!pip install pyspark
from pyspark.sql import SparkSession  
from pyspark.sql.functions import concat, col
import findspark
#findspark.init()
# create a SparkSession
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()
# read CSV file
df = spark.read.csv("/content/drive/MyDrive/Phaul/small1.csv", 
                    header=True, inferSchema=True)
# show first 5 rows
df.show(5)
# stop SparkSession
#spark.stop()
cols = df.columns 
cols
from pyspark.sql.functions import col, count
df.select([count(col(c).isNull().alias(c)) for c in df.columns]).show()
df = df.na.drop(how = "any").show()
from pyspark import SparkConf, SparkContext
def SON_algorithm(data, support_threshold, num_partitions):
    # Step 1: Partition the data and find frequent itemsets in each partition
    partitioned_data = data.repartition(num_partitions)
    frequent_items = partitioned_data.mapPartitions(lambda partition: apriori(partition, support_threshold))
    frequent_items = frequent_items.reduceByKey(lambda x, y: x + y).collect()
    # Step 2: Find frequent itemsets by combining frequent itemsets found in each partition
    candidates = [itemset for itemset, support in frequent_items if support >= support_threshold]
    frequent_itemsets = set(candidates)
    for i in range(2, len(candidates)):
        candidate_itemsets = partitioned_data.flatMap(lambda partition: generate_candidates(partition, frequent_itemsets, i))
        frequent_itemsets = set(candidate_itemsets.reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] >= support_threshold).map(lambda x: x[0]).collect())
    return frequent_itemsets
def apriori(partition, support_threshold):
    # Generate candidate itemsets of size 1
    items = partition.flatMap(lambda transaction: transaction).map(lambda item: (item, 1))
    frequent_items = items.reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] >= support_threshold)
    # Generate candidate itemsets of size > 1 using the Apriori algorithm
    candidates = set(frequent_items.map(lambda x: frozenset([x[0]])).collect())
    while len(candidates) > 0:
        frequent_items = items.filter(lambda x: x[0] in candidates).reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] >= support_threshold)
        new_candidates = frequent_items.map(lambda x: frozenset(candidates.union([frozenset([x[0]])])))
        candidates = set(new_candidates.collect())
        frequent_items = frequent_items.collect()
        for itemset in frequent_items:
            yield (frozenset([itemset[0]]), itemset[1])
def generate_candidates(partition, frequent_itemsets, k):
    # Generate candidate itemsets of size k using the frequent itemsets of size k-1
    candidates = set()
    for itemset in frequent_itemsets:
        for item in partition:
            if item not in itemset:
                candidate = frozenset(itemset.union([item]))
                if candidate not in candidates:
                    candidates.add(candidate)
                    yield (candidate, 1)
conf = SparkConf().setAppName("SON Algorithm")
sc = SparkContext.getOrCreate(conf)
data = sc.textFile("/content/drive/MyDrive/Phaul/small1.csv")
cols = ['user_id', 'business_id']
#pythonLines = data.filter(lambda data: "color" in data )
#pythonLines.collect()
support_threshold = 100
num_partitions = 4
frequent_itemsets = SON_algorithm(data, support_threshold, num_partitions) Write, Run & Share Scala code online using OneCompiler's Scala online compiler for free. It's one of the robust, feature-rich online compilers for Scala language, running on the latest version 2.13.8. Getting started with the OneCompiler's Scala compiler is simple and pretty fast. The editor shows sample boilerplate code when you choose language as Scala and start coding.
OneCompiler's Scala online editor supports stdin and users can give inputs to programs using the STDIN textbox under the I/O tab. Following is a sample Scala program which takes name as input and prints hello message with your name.
object Hello {
	def main(args: Array[String]): Unit = {
	  val name = scala.io.StdIn.readLine()        // Read input from STDIN
    println("Hello " + name ) 
	}
}
Scala is both object-oriented and functional programming language by Martin Odersky in the year 2003.
Variable is a name given to the storage area in order to identify them in our programs.
var or val Variable-name [: Data-Type] = [Initial Value];
If, If-else, Nested-Ifs are used when you want to perform a certain set of operations based on conditional expressions.
if(conditional-expression){    
//code    
} 
if(conditional-expression) {  
//code if condition is true  
} else {  
//code if condition is false  
} 
if(condition-expression1) {  
//code if above condition is true  
} else if (condition-expression2) {  
//code if above condition is true  
}  
else if(condition-expression3) {  
//code if above condition is true  
}  
...  
else {  
//code if all the above conditions are false  
}  
For loop is used to iterate a set of statements based on a criteria.
for(index <- range){  
  // code  
} 
While is also used to iterate a set of statements based on a condition. Usually while is preferred when number of iterations are not known in advance.
while(condition) {  
 // code 
}  
Do-while is also used to iterate a set of statements based on a condition. It is mostly used when you need to execute the statements atleast once.
do {
  // code 
} while (condition) 
Function is a sub-routine which contains set of statements. Usually functions are written when multiple calls are required to same set of statements which increases re-usuability and modularity.
def functionname(parameters : parameters-type) : returntype = {   //code
}
You can either use = or not in the function definition. If = is not present, function will not return any value.