Answer these questions using Spark code. Submit your code (in a .py file) and the answers to the questions (in a text file). The answers should come from the full dataset, not the small dataset. Start with the code shown below. (Hint: for any task that asks for a max or largest value, don’t use sortByKey, because it is much slower than a better option.)
- Which day had the largest number of installed drives, and what was this number?
- How many distinct drives (by model+serial) are installed (i.e., that exist in the data) in each year?
- What’s the max drive capacity per year?
Full dataset: change the file path to file:///ssd/data/backblaze.csv (146 million rows). My solution took 17 minutes.
Run Spark like this: spark-submit --master=local[5] backblaze-spark.py
Or to hide log messages: spark-submit --master=local[5] backblaze-spark.py 2> /dev/null
Look at /home/jeckroth/cinf201/2022-spring/spark/backblaze.py for some more example code.
Starting code with some examples that you can remove:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Backblaze").getOrCreate()
schema = "day DATE, serial STRING, model STRING, capacity LONG, failure INTEGER"
d = spark.read.schema(schema).load("file:///home/jeckroth/cinf201/spark/assignment/small-backblaze.csv", format="csv", sep=",", header="true")
d = d.rdd

# print first 10 rows
print(d.take(10))

## How many failures occurred each year?
# make key (year) & value (failure 0/1)
d2 = d.map(lambda row: (row.day.year, row.failure))
# add up failures per year
failureCounts = d2.reduceByKey(lambda cnt, rowcnt: cnt + rowcnt)
print(failureCounts.collect())

## Which model (not serial number) has the most failures overall?
# grab model & failure from data, model is the key
d3 = d.map(lambda row: (row.model, row.failure))
# count failures for that model; result so far: [(modelX, 55), (modelY, 2100)]
d3 = d3.reduceByKey(lambda cnt, rowcnt: cnt + rowcnt)
# flip keys and values; result so far: [(55, modelX), (2100, modelY)]
d3 = d3.map(lambda pair: (pair[1], pair[0]))
# sort by value (second in the pair)
d3 = d3.sortByKey(ascending=False) ### NOT EFFICIENT TECHNIQUE
print(d3.collect())
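
The last example sorts every (count, model) pair just to read off the largest one. One possible alternative, sketched here under assumptions, is a single reduce pass that keeps the larger of two pairs at each step. The block runs on a plain Python list with functools.reduce so it can be tried without Spark. The helper name larger_count and the sample counts are hypothetical. The same function should be usable as the argument to an RDD's reduce method, but that line is left as a comment and is not run here.

```python
from functools import reduce

# Hypothetical (failure_count, model) pairs, shaped like d3 after the key/value flip.
pairs = [(55, "modelX"), (2100, "modelY"), (7, "modelZ")]


def larger_count(a, b):
    """Return whichever pair has the larger failure count (first element)."""
    return a if a[0] >= b[0] else b


# One pass over the data; nothing is sorted.
top = reduce(larger_count, pairs)
print(top)  # (2100, 'modelY')

# Assumed Spark equivalent (not run here): top = d3.reduce(larger_count)
```

A reduce like this only compares pairs two at a time, so it avoids the full sort and shuffle that sortByKey performs when only the top entry is needed.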