
SON algorithm for Frequent Itemsets

Python 3 implementations of:

  • Randomized Algorithm (Chapter 6.4.1)
  • Savasere, Omiecinski, and Navathe (SON) Algorithm (Chapter 6.4.3)

Both algorithms are described in the book Mining of Massive Datasets, which is available online:

http://infolab.stanford.edu/~ullman/mmds/ch6.pdf

Datasets used (T10I4D100K, T40I10D100K):

http://fimi.ua.ac.be/data/
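
Each file is plain text with one basket per line and items given as space-separated integer IDs, which is the layout the scripts below assume. A quick sanity check of that format (the file name is just an example):

# Peek at the first few baskets of a FIMI-style transaction file.
# Assumes T10I4D100K.dat sits in the working directory.
with open("T10I4D100K.dat") as f:
    for _ in range(3):
        basket = tuple(next(f).split())   # items are space-separated IDs
        print(len(basket), basket[:5])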


Arguments:

  • -f, --file: input transaction file name
  • -s, --min_support: minimum support threshold (default 0.5; see the worked example below)
  • -c, --min_confidence: minimum confidence threshold (default 0.5)
  • -t, --process: number of worker processes; set it to your CPU core count (default 4, SON algorithm only)
  • -p, --probability: per-basket sampling probability (default 1.0 in the script, Randomized Algorithm only)
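
Both thresholds are the usual fractions used by efficient-apriori: support(X) is the share of baskets containing every item of X, and confidence(X -> Y) = support(X and Y together) / support(X). A tiny worked example on toy baskets (not from the datasets above):

from efficient_apriori import apriori

# {"beer", "chips"} appears in 2 of 3 baskets, so its support is about 0.67.
baskets = [("beer", "chips"), ("beer", "chips", "salsa"), ("beer",)]
itemsets, rules = apriori(baskets, min_support=0.5, min_confidence=0.6)
print(itemsets[2])   # {('beer', 'chips'): 2}
print(rules)         # includes {chips} -> {beer} with confidence 1.0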

Third-party library needed:

pip3 install efficient-apriori

Sample Run:

python3 random.py -f T40I10D100K.dat -p 0.1 -s 0.05 -c 0.05
python3 son.py -f T40I10D100K.dat -t 10 -s 0.05 -c 0.05

(Note: a script named random.py shadows the standard-library random module, because Python puts the script's directory first on sys.path; if the import misbehaves, rename the file, e.g. to randomized.py, and adjust the command.)

Observed performance:

The original Apriori algorithm takes about 10 minutes.
The Randomized Algorithm with a 50% sampling rate roughly halves the running time.
The SON algorithm on a 10-core CPU cuts the running time by about 90%.
The SON implementation uses multiprocessing for both passes.

Note: hyper-threading does not reduce the running time, since the workload is CPU-bound.


Randomized Algorithm:

import argparse
import time
import sys
import random
from efficient_apriori import apriori

parser = argparse.ArgumentParser(description="datasets")
parser.add_argument('-f', '--file', default="sample.txt",
                    help="transaction file, e.g. from http://fimi.uantwerpen.be/data/")
parser.add_argument('-s', '--min_support', default=0.5,
                    help="minimum support, set to 0.5 by default")
parser.add_argument('-c', '--min_confidence', default=0.5,
                    help="minimum confidence, set to 0.5 by default")
parser.add_argument('-p', '--probability', default=1.0,
                    help="per-basket sampling probability, set to 1.0 by default (keep every basket)")
args = parser.parse_args()

# memory_data[basket_number][item_number] holds every basket read from the file
memory_data = []
# data_list holds only the sampled baskets that are passed to apriori()
data_list = []

def decision(probability):
    return random.random() < float(probability)
#------------------------------------------File Handling------------------------------------------

def file_to_array():
    total_lines = 0
    data_file = open(args.file, "r")
    print("Reading File into Array...\n")
    for line in data_file:
        line = line.rstrip()
        line_row = line.split()
        memory_data.append(line_row)
        # Keep this basket in the sample with probability args.probability
        if decision(args.probability):
            data_list.append(tuple(line_row))
        total_lines += 1
    data_file.close()
    print("Reading complete,", total_lines, "lines in total.")


def main():
    start_time = time.time()
    file_to_array()
    # Mine only the sampled baskets
    itemsets, rules = apriori(data_list,
                              min_support=float(args.min_support),
                              min_confidence=float(args.min_confidence))
    print(rules)

    print("--- Total run time: %s seconds ---" % (time.time() - start_time))


if __name__ == "__main__":
    main()
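
The script above reads every basket into memory_data but mines only the sampled data_list, so rules found on the sample are never re-checked against the full file. Chapter 6.4.2 of the book suggests a second pass over the whole dataset to eliminate false positives; a minimal sketch of such a pass, assuming efficient-apriori's itemsets layout of {size: {itemset: count}} (verify_itemsets is a hypothetical helper, not part of the original script):

def verify_itemsets(itemsets, min_support):
    # Recount each candidate itemset over the full dataset (memory_data)
    # and keep only those whose global support still meets min_support.
    verified = {}
    total = len(memory_data)
    for size, counted in itemsets.items():
        for itemset in counted:
            hits = sum(1 for basket in memory_data
                       if all(item in basket for item in itemset))
            if hits / total >= min_support:
                verified.setdefault(size, {})[itemset] = hits
    return verified

False negatives (itemsets frequent overall but missed by the sample) remain possible; the book's remedy is to mine the sample with a slightly lower support threshold (e.g. 0.9 of the target).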

Savasere, Omiecinski, and Navathe (SON) Algorithm:

import argparse
import time
import sys
import multiprocessing
from efficient_apriori import apriori


parser = argparse.ArgumentParser(description="datasets")
parser.add_argument('-f', '--file', default="sample.txt",
                    help="transaction file, e.g. from http://fimi.uantwerpen.be/data/")
parser.add_argument('-s', '--min_support', default=0.5,
                    help="minimum support, set to 0.5 by default")
parser.add_argument('-c', '--min_confidence', default=0.5,
                    help="minimum confidence, set to 0.5 by default")
parser.add_argument('-t', '--process', default=4,
                    help="number of worker processes, set to 4 by default")
args = parser.parse_args()

# memory_data[basket_number][item_number] holds every basket read from the file
memory_data = []
# async results of the first (per-chunk) pass
result_first_round = []

def file_to_array():
    total_lines = 0
    data_file = open(args.file, "r")
    print("Reading File into Array...\n")
    for line in data_file:
        total_lines += 1
        line = line.rstrip()
        line_row = line.split()
        memory_data.append(line_row)
    data_file.close()
    print(total_lines," lines in total.\n")
    return total_lines

def split_list(k):
    # Round-robin chunk k of the baskets, one chunk per worker process
    data_list = []
    for i in range(len(memory_data)):
        if i % int(args.process) == k:
            data_list.append(tuple(memory_data[i]))
    return data_list

def Son_Algorithm_single_process(data_list):
    # First pass: run Apriori on one chunk with the same fractional thresholds
    itemsets, rules = apriori(data_list,
                              min_support=float(args.min_support),
                              min_confidence=float(args.min_confidence))
    return list(rules)

def Son_Algorithm(k):
    temp_list = split_list(k)
    if not temp_list:
        return []    # keep the return type consistent for accumulate_result()
    return Son_Algorithm_single_process(temp_list)

def accumulate_result(results):
    # Flatten the per-process result lists and drop duplicate rules
    merged = []
    for item in results:
        merged += item
    deduped = []
    for result in merged:
        if result not in deduped:
            deduped.append(result)
    return deduped

def verify_result_split(temp_result, k):
    # Round-robin share of the candidate rules for worker process k
    split_result = []
    for i in range(len(temp_result)):
        if i % int(args.process) == k:
            split_result.append(temp_result[i])
    return split_result

def verify_result_single_process(temp_result, k, total_lines):
    # Second pass: recount every item of each candidate rule over the FULL dataset
    result = []
    temp_list = verify_result_split(temp_result, k)

    for pair in temp_list:
        items = pair.lhs + pair.rhs
        temp_count = 0
        for basket in memory_data:
            if all(str(item) in basket for item in items):
                temp_count += 1
        if temp_count / total_lines >= float(args.min_support):
            result.append(pair)
    return result

def main():
    start_time = time.time()
    total_lines = file_to_array()

    #First Scan
    pool = multiprocessing.Pool(processes=int(args.process))
    for i in range(int(args.process)):
        result_first_round.append(pool.apply_async(Son_Algorithm, (i,)))
    pool.close()
    pool.join()

    first_temp = []
    for res in result_first_round:
        first_temp.append(res.get())  
     
    #print(accumulate_result(first_temp))
    
    #Second Scan
    result_final_round = []
    temp_result = accumulate_result(first_temp)

    pool2 = multiprocessing.Pool(processes=int(args.process))
    for i in range(int(args.process)):
        result_final_round.append(pool2.apply_async(verify_result_single_process, (temp_result,i,total_lines,)))
    pool2.close()
    pool2.join()
    
    final_result = []
    for res in result_final_round:
        final_result.append(res.get())
    print (accumulate_result(final_result))

    print("--- Total run time: %s seconds ---" % (time.time() - start_time))


if __name__ == "__main__":
    main()
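
Why the first pass cannot miss anything: if an itemset's support is at least s over the whole file, it must reach s in at least one chunk, because a fraction below s in every chunk would average to below s overall. The second pass then removes chunk-local false positives by recounting over the full file. A toy check of that argument, independent of the scripts above:

# Toy illustration of the SON argument: an itemset frequent overall
# (fraction >= s) must be frequent in at least one chunk of the partition.
baskets = [("a", "b"), ("a",), ("a", "b"), ("b",), ("a", "b"), ("a", "b")]
s = 0.5
chunks = [baskets[0::2], baskets[1::2]]   # round-robin split, like split_list()

def support(itemset, part):
    return sum(all(i in b for i in itemset) for b in part) / len(part)

print(support(("a", "b"), baskets))                  # 4/6, above s
print([support(("a", "b"), c) for c in chunks])      # at least one chunk reaches s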
