Hadoop MapReduce & Spark Program

Homework #5 (Hadoop MapReduce & Spark). Due: 11:59pm, Friday, April 21, 2023
Points: 100
1. [40 points] Write a Hadoop MapReduce program named SQL2MR.java to find answers to the
following SQL query on the aqi.csv data set (for Air Quality). The file has 10,129 rows, each row
has four values: date, country, status, value.
select status, avg(value)
from aqi
where date like '2022-08%'
group by status
having count(value) >= 100;
You are provided with a template for SQL2MR.java where you can provide the missing code to
produce a complete program. The template also has some hints that may help you.
Before you compile and run the program, please complete the following:
● You should remove the header of the aqi.csv file and save it under a directory called
aqi-input on EC2.
● You should comment out or remove the xxx part in core-site.xml.
The path to the core-site.xml file is /etc/hadoop/core-site.xml.
● Remember to stop your MySQL server before running Hadoop (sudo service mysql stop).
You are reminded of the following steps to compile and run the program.



hadoop com.sun.tools.javac.Main SQL2MR.java
jar cf sql2mr.jar SQL2MR*.class
hadoop jar sql2mr.jar SQL2MR aqi-input aqi-output
Submission: SQL2MR.java, sql2mr.jar, and part-r-00000 file under aqi-output.
2. [30 points] Using the JSON files in country-db.zip and the aqi.csv file, answer the following
questions using the Spark DataFrame API. You can use "import pyspark.sql.functions as fc". Note:
you should not use Spark SQL in this question.
Remember to stop your MySQL server before running Spark (sudo service mysql stop).
Submission: Copy your Spark DataFrame scripts and outputs into one file and generate a PDF to
submit.
a. [8 points] Find countries that are in both country.json and aqi.csv.
i. Using join
ii. Using set operation
b. [8 points] Find (names of) countries that are in aqi.csv but not in country.json. Output
the same countries only once.
i. Using join
ii. Using set operation
c. [6 points] Find countries that are in country.json but not in aqi.csv.
i. Using join
ii. Using set operation
d. [8 points] Find the answer to the SQL query in Task 1, copied below:
select status, avg(value)
from aqi
where date like '2022-08%'
group by status
having count(value) >= 100;
Note that if the "date" column is a timestamp, you may proceed as follows to extract the
year and month of the dates:
fc.year('date') # this will get the year
fc.month('date') # this will get the month
3. [30 points] Using the JSON files in country-db.zip and the aqi.csv file, answer the same questions
as in Task 2 but using Spark RDD API. Note that you should first convert the dataframe for the
entire data set (e.g., aqi for aqi.csv) to rdd (e.g., using aqi.rdd) and work on the RDDs to solve the
questions.
Hint: if date is a datetime, e.g., datetime.datetime(2022, 8, 1, 0, 0), you can use date.year and
date.month to get the year and month, respectively.
[Row(date=datetime.datetime(2022, 8, 1, 0, 0), country='Albania', ...]
Submission: Copy your Spark RDD scripts and outputs into one file and generate PDF to submit.
a. [8 points] Find countries that are in both country.json and aqi.csv.
i. Using join
ii. Using set operation
b. [8 points] Find (names of) countries that are in aqi.csv but not in country.json.
i. Using join
ii. Using set operation
c. [6 points] Find countries that are in country.json but not in aqi.csv.
i. Using join
ii. Using set operation
d. [8 points] Find the answer to the SQL query in Task 1, copied below:
select status, avg(value)
from aqi
where date like '2022-08%'
group by status
having count(value) >= 100;
Note: you are required to use aggregateByKey in question d.
Submission:
1. Please ZIP your files as a single .zip file (do not use any other compression type such as '.rar') and
submit it. Otherwise you may not be able to submit because of D2L security restrictions.
2. For Q1: SQL2MR.java, sql2mr.jar, and part-r-00000 file under aqi-output
3. For Q2: Copy your Spark DataFrame scripts and outputs into one file and generate PDF to
submit.
4. For Q3: Copy your Spark RDD scripts and outputs into one file and generate PDF to submit.
Hadoop MapReduce
DSCI 55x
Wensheng Wu
1
Hadoop
• A large-scale distributed batch-processing
infrastructure
• Large-scale:
– Handle a large amount of data and computation
• Distributed:
– Distribute data & work across a number of machines
• Batch processing
– Process a series of jobs without human intervention
2
[Figure: typical cluster architecture. A 2-10 Gbps backbone connects the rack switches; nodes within a rack are connected at 1 Gbps. Each rack contains 16-64 nodes, each with its own CPU, memory, and disk.]
3
In 2011 it was guesstimated that Google had 1M machines, http://bit.ly/Shh0RO
(Source: Jure Leskovec, Stanford CS246: Mining Massive Datasets, http://cs246.stanford.edu)
4
Roadmap
• Hadoop architecture
– HDFS
– MapReduce
• MapReduce implementation
• Compile & run MapReduce programs
5
Key components
• HDFS (Hadoop distributed file system)
– Distributed data storage with high reliability
• MapReduce
– A parallel, distributed computational paradigm
– With a simplified programming model
6
HDFS
• Data are distributed among multiple data nodes
– Data nodes may be added on demand for more
storage space
• Data are replicated to cope with node failure
– Typically replication factor = 2 or 3
• Requests can go to any replica
– Removing the bottleneck (in single file server)
7
HDFS architecture
8
HDFS has …
• A single NameNode, storing meta data:
– A hierarchy of directories and files
– Attributes of directories and files
– Mapping of files to blocks on data nodes
• A number of DataNodes:
– Storing contents/blocks of files
9
Compute nodes
• Data nodes are compute nodes too
• Advantage:
– Allow schedule computation close to data
10
HDFS also has …
• A SecondaryNameNode
– Maintaining checkpoints of NameNode
– For recovery
• In a single-machine setup
– all nodes correspond to the same machine
11
Metadata in NameNode
• NameNode has an inode for each file and dir
• Record attributes of file/dir such as
– Permission
– Access time
– Modification time
• Also record mapping of files to blocks
12
Mapping information in NameNode
• E.g., file /user/aaron/foo consists of blocks 1,
2, and 4
• Block 1 is stored on data nodes 1 and 3
• Block 2 is stored on data nodes 1 and 2
• …
13
Block size
• HDFS: 128MB
– Much larger than disk block size (4KB)
• Why larger size in HDFS?
– Reduce metadata required per file
– Fast streaming read of data (since larger amount
of data are sequentially laid out on disk)
– Good for workload with largely sequential read of
large file
14
Roadmap
• Hadoop architecture
– HDFS
– MapReduce
• MapReduce implementation
• Compile & run MapReduce programs
15
MapReduce job
• A MapReduce job consists of a number of
– Map tasks
– Reduce tasks
– (Internally) shuffle tasks
16
Map, reduce, and shuffle tasks
• Map task performs data transformation
• Reduce task combines results of map tasks
• Shuffle task sends output of map tasks to the
right reduce tasks
[Diagram: map tasks M1, M2 emit key-value pairs that are routed to reduce tasks R1, R2]
17
Hadoop cluster
Each may be run on
a separate machine
18
Job tracker
• Takes requests from clients (MapReduce
programs)
• Ask name node for location of data
• Assign tasks to task trackers near the data
– Compared to: bring data to computation
• Reassign tasks if failed
19
Task tracker
• Accept (map, reduce, shuffle) tasks from job
trackers
• Send heart beats to job trackers: I am alive
• Monitor status of tasks and notify job tracker
20
Roadmap
• Hadoop architecture
– HDFS
– MapReduce
• MapReduce implementation
– Map & reduce functions and tasks
– Shuffling & merging
– Input and output format
– Combiner
• Compile & run MapReduce programs
21
Roots in functional programming
• Functional programming languages:
– Lisp (list processor), Scheme, Erlang, Haskell (Python also supports a functional style)
• Two functions:
– Map: mapping a list => list
– Reduce: reducing a list => value
• map() and reduce() in Python
– https://docs.python.org/2/library/functions.html#map
22
map() and reduce() in Python
• list = [1, 2, 3]
• def sqr(x): return x ** 2
• list1 = map(sqr, list)
• def add(x, y): return x + y
• z = reduce(add, list)
What are the values of list1 and z?
23
Lambda function
• Anonymous function (not bound to a name)
• list = [1, 2, 3]
• list1 = map(lambda x: x ** 2, list)
• z = reduce(lambda x, y: x + y, list)
24
How is reduce() in Python evaluated?
• z = reduce(f, list) where f is add function
• Initially, z (an accumulator) is set to list[0]
• Next, repeat z = add(z, list[i]) for each i > 0
• Return final z
• Example: z = reduce(add, [1, 2, 3])
– i = 0, z = 1; i = 1, z = 3; i = 2, z = 6
25
Python 3
• map() returns an iterator
– g = map(lambda x: x* 2, [1,2,3])
– next(g)
• reduce() moved to functools
– import functools as fc
– fc.reduce(lambda U, x: U + x, [1,2,3])
– fc.reduce(lambda U, x: U + x, [1,2,3], 0)
26
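A minimal, runnable recap of the last two slides (Python 3); the variable names are illustrative:

from functools import reduce

nums = [1, 2, 3]

# map() returns a lazy iterator in Python 3; wrap it in list() to see all values
squares = list(map(lambda x: x ** 2, nums))     # [1, 4, 9]

# reduce() now lives in functools; without an initializer it starts from nums[0]
total = reduce(lambda u, x: u + x, nums)        # (1 + 2) + 3 = 6

# with an initializer, the accumulator starts at 0 (safe even for an empty list)
total0 = reduce(lambda u, x: u + x, [], 0)      # 0

print(squares, total, total0)                   # [1, 4, 9] 6 0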
MapReduce
• Map function:
– Input: a (k, v) pair
– Output: a list of (k', v') pairs // e.g., ('LA', 2), ('LA', 3)
• Reduce function:
– Input: (k', list of v') // e.g., ('LA', [2, 3]); the keys k' are those output by map
– Output: a list of (k', v'') pairs
27
Input: key-value pairs
Output: (intermediate) key-value pairs
[Figure: each map task transforms its input key-value pairs (k, v) into intermediate key-value pairs.]
28
[Figure: the intermediate key-value pairs are grouped by key into key-value groups; each group (k, [v, v, ...]) is fed to a reduce call, which emits the output key-value pairs.]
29
Single map & reduce task
• Map: read the input and produce a set of key-value pairs
• Group by key: collect all pairs with the same key
• Reduce: reduce all values belonging to the key and output
30
Shuffling
Multiple map & reduce tasks
[Figure: each intermediate key is hashed (e.g., h(k1) = 2) to decide which reduce task receives its pairs, so all pairs with the same key (e.g., that => [2, 1], hello => [3, 5]) end up at the same reduce task.]
31
Example: WordCount
• Counting the number of occurrences of words
in a collection of documents
• helloworld.txt (stored under an input
directory, among possible other documents)
– hello world
– hello this world
– hello hello world
32
Example: WordCount
• Map function:
– Input: (line offset, line) // line = a line of text in a document
– Output: for each word in line, output (word, 1)
• Reduce function:
– Input: (word, list of 1's)
– Output: (word, count), where count is the number
of 1's in the input list
33
Pseudocode
map(key, value):
  // key: line offset; value: line content
  for each word w in value:
    output (w, 1)

reduce(key, values):
  // key: a word; values: an iterator over counts
  result = 0
  for each count v in values:
    result += v
  output (key, result)
34
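To make the data flow concrete, here is a small pure-Python simulation of the same map / group-by-key / reduce pipeline (an illustration of the model, not Hadoop code):

from collections import defaultdict

lines = ["hello world", "hello this world", "hello hello world"]

# map phase: emit (word, 1) for every word
mapped = [(w, 1) for line in lines for w in line.split()]

# shuffle / group-by-key phase: collect all values for each key
groups = defaultdict(list)
for k, v in mapped:
    groups[k].append(v)

# reduce phase: sum the 1's for each word
counts = {k: sum(vs) for k, vs in groups.items()}
print(counts)   # {'hello': 4, 'world': 3, 'this': 1}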
Group by
• System groups the intermediate key-value
pairs from map tasks by key
• E.g., (hello, 1), (hello, 1) => (hello, [1, 1]),
(world, 1), (world, 1) => (world, [1, 1])
35
Example: WordCount
• hadoop jar wc.jar WordCount input output
• Output:
– hello 4
– this 1
– world 3
36
WordCount: Mapper
[Figure: Java source of the WordCount Mapper class, with annotations:]
• Object (the input key type) can be replaced with LongWritable
• Data types of the input key-value pair
• Data types of the output key-value pair
• Key-value pairs are emitted with the specified data types
37
WordCount: Reducer
[Figure: Java source of the WordCount Reducer class, with annotations:]
• Data types of the input key-value pair
(should be the same as the output data types of the mapper)
• Data types of the output key-value pair
• The values parameter is a list (iterable) of values
38
Checking map input
• map input: key=0, value=hello world
• map input: key=12, value=hello this world
• map input: key=29, value=hello hello world
39
Checking reduce input
• reduce input: key=hello, values=1 1 1 1
• reduce input: key=this, values=1
• reduce input: key=world, values=1 1 1
40
Map and reduce tasks in Hadoop
• A node may run multiple map/reduce tasks
• Typically, one map task per input split (chunk
of data)
• One reduce task per partition of map output
– E.g., partition by key range or hashing
41
Mapper and Reducer
• Each map task runs an instance of Mapper
– Mapper has a map function
– Map task invokes the map function of the Mapper
once for each input key-value pair
• Each reduce task runs an instance of Reducer
– Reducer has a reduce function
– Reduce task invokes the reduce function of the
Reducer once for every different intermediate key
42
Reduce function
• Input: a key and an iterator over the values for
the key
• Values are NOT in any particular order
• Reduce function is called once for every
different key (received by the reduce task)
43
Roadmap
• Hadoop architecture
– HDFS
– MapReduce
• MapReduce implementation
– Map & reduce functions and tasks
– Shuffling task
– Input and output format
– Combiner
• Compile & run MapReduce programs
44
Shuffling
• Process of distributing intermediate key-values
to the right reduce tasks
• It is the only communication among map and
reduce tasks
– Individual map tasks do not exchange data directly
with other map tasks
– They are not even aware of existence of their peers
45
Shuffling
• Begins when a map task completes on a node
• All intermediate key-value pairs with the same
key are sent to the same reducer task
• Partitioning method defined in Partitioner class
– Default rule: partition by hashing the key
46
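Conceptually, the default rule behaves like the small Python sketch below (Hadoop's HashPartitioner actually uses the Java key's hashCode(); Python's hash() merely stands in for it here):

def partition_for(key, num_reduce_tasks):
    # all pairs with the same key hash to the same reduce task
    return hash(key) % num_reduce_tasks

for word in ["hello", "this", "world", "that"]:
    print(word, "-> reduce task", partition_for(word, 2))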
Internals of shuffling
• Map side
– Partition, sort, spill & merge
• Reduce side
– Fetch & merge
47
Shuffling process
Keys in the same partition are sorted (keys from different partitions may not be)
Merging by partition
Merging by key
From “Hadoop: The Definitive Guide” by T. White. 48
Map side
• Partition data in the buffer into R parts
– R = # of reduce tasks
• Sort data in each partition by key
• Spill/write data in the buffer to disk
• Merge the spills
• Notify job tracker: output complete
49
Reduce side
• Task tracker notified by job tracker: data ready
• Fetch/copy data from map side
• Merge the data
– Some data may sit on disk once fetched
– This depends on the buffer size
• Figure out groups from sorted data
50
Roadmap
• Hadoop architecture
– HDFS
– MapReduce
• MapReduce implementation
– Map & reduce functions and tasks
– Shuffling & merging
– Input and output format
– Combiner
• Compile & run MapReduce programs
51
52
InputFormat
• Determine how input files are split and read
• Defined in the Java interface InputFormat
• Job:
– Split input file into chunks called InputSplits
– Implement RecordReader to read data from splits
53
InputFormat implementations
• FileInputFormat (input from files in given dirs)
• DBInputFormat (input data from a database)
• CombineFileInputFormat (input data by
combining multiple files)
• …
54
FileInputFormat
• Job:
– Takes paths to files
– Read all files in the paths
– Divide each file into one or more InputSplits
• Subclasses:
– TextInputFormat
– KeyValueTextInputFormat
– SequenceFileInputFormat
55
Subclasses of FileInputFormat
• TextInputFormat
– Description: default format; reads lines of text files
– Key: the byte offset of the line
– Value: the line content
• KeyValueTextInputFormat
– Description: parses lines into key, value pairs
– Key: everything up to the first tab character
– Value: the remainder of the line
• SequenceFileInputFormat
– Description: a Hadoop-specific high-performance binary format
– Key: user-defined
– Value: user-defined
56
Use non-default input format
• If the input file contains tab-separated key-value
pairs, e.g.,
– John \t 5
– David \t 6
– ...
• job.setInputFormatClass(KeyValueTextInputFormat.class);
– Both key and value are of type "Text"
57
InputSplits
• If a file is big, multiple splits may be created
– Typical split size = 128MB
• A map task is created for each split
– i.e., a chunk of some input file
58
RecordReader (RR)
• InputFormat defines an instance of RR
– E.g., TextInputFormat provides LineRecordReader
• LineRecordReader
– Form a key-value pair for every line of file
– Data type for key: LongWritable; value: Text
• Reader is repeatedly called
– Until all data in the split are processed
59
OutputFormat
• Define the format of output from Reducers
– Output stored in a file
• Defined in the Java interface OutputFormat
• Implementation: FileOutputFormat
– Subclasses: TextOutputFormat,
SequenceFileOutputFormat
60
OutputFormat
• TextOutputFormat
– Default; writes lines in "key \t value" form
• SequenceFileOutputFormat
– Writes binary files suitable for reading
into subsequent MapReduce jobs
61
Outputs
• All Reducers write to the same directory
– Each writes a separate file, named part-r-nnnnn
– r: output from Reducers
– nnnnn: partition id associated with reduce task
• Output directory
– Set by FileOutputFormat.setOutputPath() method
• OutputFormat defines a RecordWriter
– which handles the write
62
WordCount: setting up job
[Figure: Java code that sets up the WordCount job, with annotations:]
• Take multiple directories as input
• Set output key and value types for both map and reduce tasks
• If the Mapper has different types, use setMapOutputKeyClass and setMapOutputValueClass
63
setJarByClass
• Mapper and reducer code may be in a
different jar
– E.g., a jar in Hadoop class search path
– (wc.jar contains only the job submission code)
• setJarByClass tells Hadoop where to find jar
containing mapper and reducer code
– By specifying its class name (e.g., WordCount)
64
Roadmap
• Hadoop architecture
– HDFS
– MapReduce
• MapReduce implementation
– Map & reduce functions and tasks
– Shuffling & merging
– Input and output format
– Combiner
• Compile & run MapReduce programs
65
Combiner
66
Combiner
• Run on the node running the Mapper
– Perform local (or mini-) reduction
• Combine Mapper results
– Before they are sent to the Reducers
– Reduce communication costs
• E.g., may use a combiner in WordCount
– (cat, 1), (cat, 1), (cat, 1) => (cat, 3)
– One key-value pair per unique word
67
Without combiner
• Mapper 1 outputs:
– (cat, 1), (cat, 1), (cat, 1), (dog, 1)
• Mapper 2 outputs:
– (dog, 1), (dog, 1), (cat, 1)
• Suppose there is only one Reducer
– It will receive: (cat, [1, 1, 1, 1]), (dog, [1, 1, 1])
68
Implementing combiner
• May directly use the reduce function
– If it is commutative and associative
– Meaning operations can be grouped & performed in
any order
• Operation ‘op’ is commutative if:
– A op B = B op A
• Op is associative if:
– A op (B op C) = (A op B) op C
69
Example: without combiner
• Consider two map tasks
– M1 => 1, 2, 3 for some key x
– M2 => 4, 5 for the same key
• Reducer adds all values for x
– Result = (((1 + 2) + 3) + 4) + 5
70
Example: with combiner
• M1 => 1, 2, 3 => combiner: (1 + 2) + 3 => 6
• M2 => 4, 5 => combiner: 4 + 5 => 9
• Reducer now computes 6 + 9,
– I.e., (((1 + 2) + 3) + (4 + 5))
– Question: is it the same as ((((1 + 2) + 3) + 4) + 5)?
• Yes, since ‘+’ is associative
71
Example: with combiner
• M1 => 1, 2, 3 => combiner: (1 + 2) + 3 => 6
• M2 => 4, 5 => combiner: 4 + 5 => 9
• Reducer may also compute 9 + 6,
– I.e., (4 + 5) + ((1 + 2) + 3)
– Since values may arrive at reducer in any order
– Question: is it the same as (((1 + 2) + 3) + 4) + 5?
• Yes, since ‘+’ is also commutative
72
General requirements
• To use reduce function 'f' for a combiner
– Consider a set of values S and its subsets S1, ..., Sk
– It must be that: f(S) = f(f(S1), ..., f(Sk))
• sum works: sum(S) = sum(sum(S1), sum(S2))
• avg does not: avg([1,2,3,4,5]) = 3, but avg(avg([1,2,3]), avg([4,5])) = avg(2, 4.5) = 3.25
• Workaround for avg: have the combiner emit partial (sum, count) pairs
– Combiners: [1,2,3] => ('LA', (6, 3)); [4,5] => ('LA', (9, 2))
– Reducer: ('LA', [(6, 3), (9, 2)]) => ('LA', 15/5 = 3)
– i.e., the reducer adds up the local sums => global sum, and the local counts => global count, then divides
• E.g., in WordCount:
– f = sum
– S = a list of integers
73
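A small pure-Python sketch of the (sum, count) workaround above; the combiner emits partial pairs and the reducer merges them before dividing (function names are illustrative):

def combiner(values):
    # local reduction on one map task's values for a key, e.g. ('LA', [1, 2, 3])
    return (sum(values), len(values))            # partial (sum, count)

def reducer(partials):
    # global reduction over the partial pairs from all map tasks
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count                          # the true average

p1 = combiner([1, 2, 3])     # (6, 3)
p2 = combiner([4, 5])        # (9, 2)
print(reducer([p1, p2]))     # 3.0 == avg([1, 2, 3, 4, 5])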
Commutative and associative
• Examples
– Sum
– Max
– Min
• Non-examples
– Count
– Average
– Median
74
Custom combiner
• Key & value data types of both input & output
– Should be the same as those of the output of the Mapper
– (Also the same as those for the input to the Reducer)
• So if the Mapper outputs (Text, IntWritable), then:
– public static class MyCombiner
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    ...
  }
75
Enabling combiner
• job.setCombinerClass(IntSumReducer.class)
– To use reduce function for combiner
76
Roadmap
• Hadoop architecture
• MapReduce framework
• Compile & run MapReduce programs
– On Amazon EC2
77
Hadoop installation
• Install the Hadoop package
– Log into your EC2 instance and then execute:
• wget https://downloads.apache.org/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz
• Might want to remove installation package
(~200MB) to save space
78
Install java sdk
• sudo yum install java-1.8.0-devel
– 1.8 is needed for spark
79
Setup environment variables
• Edit ~/.bashrc by adding the following:
– export JAVA_HOME=/usr/lib/jvm/java
– export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
– export HADOOP_HOME=/home/ec2-user/hadoop-3.3.1
(this assumes that you installed Hadoop right under the home directory)
– export PATH=${JAVA_HOME}/bin:${HADOOP_HOME}/bin:${PATH}
• source ~/.bashrc
– This is to get the new variables in effect
– Or you may also log out and log in again
80
Run Hadoop in standalone mode
• Comment out the configuration element (if one exists) in
/etc/hadoop/core-site.xml
– If you have one...
81
Examples
• You may find example code here:
– /home/ec2-user/hadoop-3.3.1/share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-3.3.1-sources.jar
• unzip hadoop-mapreduce-examples-3.3.1-sources.jar
• Find WordCount.java and more under
org/apache/hadoop/examples/
82
WordCount.java
• Copy WordCount.java to a working directory
of your choice
– E.g., ~/dsci551
• Comment out first line:
– // package org.apache.hadoop.examples;
83
Compile & run
• Go to the directory that has WordCount.java
• hadoop com.sun.tools.javac.Main WordCount.java
• jar cf wc.jar WordCount*.class
• hadoop jar wc.jar WordCount input-hello output-hello
84
More examples
• Two input splits
• Multiple reducers
• SQL implemented in MapReduce
– Selection, projection, group by, having
– Join
85
Two-split example
• Now input directory has two files
=> Two splits (hence two map tasks) generated, one
for each file
86
Processing one split (Mapper)
• Split for “helloworld2.txt”
– This shows map function is called 3 times
– One for each line of text
87
Processing one split (combiner)
• This shows input key-values for combiner
– Note combiner uses the same reduce function
88
Process the other split (Mapper)
• helloworld1.txt
89
Process the other split (combiner)
• This shows the input to the combiner for the
2nd Map task
90
Reducer input (one Reducer)
• Assume only one Reducer is used
– Note the input values now contain local counts
91
Setting number of Reducers
• job.setNumReduceTasks(2);
– Two reduce tasks
92
Two-Reducer case
• Note “that” is in one partition
• “hello”, “this”, “world” in the other
93
Two output files
• part-r-00000 for partition 00000
• part-r-00001 for partition 00001
94
More examples
• Two input splits
• Multiple reducers
• SQL implemented in MapReduce
– Selection, projection, group by, having
– Join
95
Example
SELECT name
FROM Emp
WHERE gender = 'M'

Emp data (one line per row: id, name, age, gender):
100,John,25,M
200,Mary,23,F
300,David,23,M
400,Bill,26,M
500,Jennifer,20,F
600,Maria,28,F

Map(key, value):
  toks = tokenize(value, ',')
  name = toks[1]
  gender = toks[3]
  if (gender == 'M'):
    output(name, 1)

Map output: (John, 1), (David, 1), ...
96
Example
SELECT name
FROM Emp
WHERE gender = 'M'

(Same Emp data as on the previous slide.)

Map(key, value):
  toks = tokenize(value, ',')
  gender = toks[3]
  name = toks[1]
  if (gender == 'M'):
    output(name, 1)
97
Example
SELECT name
FROM Emp
WHERE gender = 'M'

(Same Emp data and Map function as on the previous slides.)

No need for reduce
98
Example
SELECT gender, count(*)
FROM Emp
WHERE age > 22
GROUP BY gender
HAVING count(*) > 2

Employee.txt (id, name, age, gender):
100,John,25,M
200,Mary,23,F
300,David,23,M
400,Bill,26,M
500,Jennifer,20,F
600,Maria,28,F

Map(key, value):
  toks = tokenize(value, ',')
  age = toks[2]
  gender = toks[3]
  if (age > 22):
    output(gender, 1)

Map output: (M, 1), (F, 1), (M, 1), (M, 1), (F, 1)
After group by key: (M, [1, 1, 1]), (F, [1, 1])
Group counts: M, 3; F, 2
99
Example
SELECT gender, count(*)
FROM Emp
WHERE age > 22
GROUP BY gender
HAVING count(*) > 2

(Same Employee.txt data and Map function as on the previous slide.)

Reduce(key, values):
  cnt = sum(values) // expand it yourself
  if (cnt > 2):
    output(key, cnt)

Group counts: (M, 3), (F, 2); only (M, 3) passes the HAVING filter and is output.
100
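The same pipeline can be traced in plain Python (again just an illustration of the model, not Hadoop code):

from collections import defaultdict

rows = ["100,John,25,M", "200,Mary,23,F", "300,David,23,M",
        "400,Bill,26,M", "500,Jennifer,20,F", "600,Maria,28,F"]

# map: apply WHERE age > 22, emit (gender, 1)
mapped = []
for value in rows:
    toks = value.split(",")
    if int(toks[2]) > 22:
        mapped.append((toks[3], 1))        # ('M', 1), ('F', 1), ...

# group by key (done by the framework between map and reduce)
groups = defaultdict(list)
for k, v in mapped:
    groups[k].append(v)                    # {'M': [1, 1, 1], 'F': [1, 1]}

# reduce: apply HAVING count(*) > 2
for gender, ones in groups.items():
    cnt = sum(ones)
    if cnt > 2:
        print(gender, cnt)                 # M 3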
Join
• R(A, B)
S(A, C)
• Map:
– r(a, b) => (a, ('R', b))
– s(a, c) => (a, ('S', c))
• Reduce:
– Join every R tuple with every S tuple that has the same key
– (a, [('R', b), ('S', c1), ('S', c2)]) => (a, (b, c1)), (a, (b, c2))
101
Join
• Dangling tuples:
– Key with values from only one relation
– (a, [('R', b)]) => left dangling
– (a, [('S', c)]) => right dangling
102
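A plain-Python sketch of the reduce-side logic: values arrive tagged with their relation, every R value is paired with every S value for the key, and keys with values from only one relation are the dangling tuples (dropped for an inner join). The function name is illustrative:

def join_reduce(key, values):
    # values: the tagged list for one key, e.g. [('R', b), ('S', c1), ('S', c2)]
    r_vals = [b for tag, b in values if tag == 'R']
    s_vals = [c for tag, c in values if tag == 'S']
    if not r_vals or not s_vals:
        return []                          # dangling: key appears in only one relation
    return [(key, (b, c)) for b in r_vals for c in s_vals]

print(join_reduce('a', [('R', 'b'), ('S', 'c1'), ('S', 'c2')]))
# [('a', ('b', 'c1')), ('a', ('b', 'c2'))]
print(join_reduce('x', [('R', 'b')]))      # []  (left dangling, dropped)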
Implementation
• Each relation stored as a text file
– In different input directories (say R and S)
– E.g., R/tuples.txt, S/tuples.txt
• bin/hadoop jar join.jar R S output
• Need to use MultipleInputs class
– org.apache.hadoop.mapreduce.lib.input.MultipleInputs
103
Implementation
• MultipleInputs.addInputPath(job,
      new Path(args[0]),
      KeyValueTextInputFormat.class,
      RMapper.class);
– This reads the tuples of R
• tuples.txt contains tab-separated key-value tuples, e.g.,
– john \t 25
– mary \t 36
– ...
• RMapper handles:
– r(a, b) => (a, ('R', b))
104
Removing verbose messages
• Modify:
– /home/ec2-user/hadoop-3.3.1/etc/hadoop/hadoop-env.sh
# Default log4j setting for interactive commands
# Java property: hadoop.root.logger
export HADOOP_ROOT_LOGGER=ERROR,console
105
Resources & readings
• MapReduce tutorial from Apache:
– https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
• MapReduce tutorial from Yahoo! – module 4
– https://developer.yahoo.com/hadoop/tutorial/module4.html
106
Readings
• J. Dean and S. Ghemawat, "MapReduce:
simplified data processing on large clusters,"
Communications of the ACM, vol. 51, pp. 107-113, 2008.
107
Apache Spark
DSCI 551
Wensheng Wu
1
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
• Persistence in Spark
2
History
Apache took over Hadoop
3
Characteristics of Hadoop
• Acyclic data flow model
– Data loaded from stable storage (e.g., HDFS)
– Processed through a sequence of steps
– Results written to disk
• Batch processing
– No interactions permitted during processing
4
Problems
• Ill-suited for iterative algorithms that require
repeated reuse of data
– E.g., machine learning and data mining algorithms
such as k-means (clustering), PageRank, logistic
regression
• Ill-suited for interactive exploration of data
– E.g., OLAP on big data
5
Spark
• Support working sets (of data) through RDD
– Enabling reuse & fault-tolerance
• 10x faster than Hadoop in iterative jobs
• Interactively explore 39GB (Wikipedia dump)
with sub-second response time
– Data were distributed over 15 EC2 instances
6
Spark
• Provides libraries to support
– embedded use of SQL
– stream data processing
– machine learning algorithms
– processing of graph data
7
Spark
• Support diverse data sources including HDFS,
Cassandra, HBase, and Amazon S3
8
RDD: Resilient Distributed Dataset
• RDD
– Read-only, partitioned collection of records
– Operations performed on partitions in parallel
– Maintain lineage for efficient fault-tolerance
• Methods of creating an RDD
– from an existing collection (e.g., Python list/tuple)
– from an external file
9
RDD: Resilient Distributed Dataset
• Distributed
– Data are divided into a number of partitions
– & distributed across nodes of a cluster to be
processed in parallel
• Resilient
– Spark keeps track of transformations to dataset
– Enable efficient recovery on failure (no need to
replicate large amount of data across network)
10
Architecture
• SparkContext (SC) object coordinates the
execution of application in multiple nodes
– Similar to Job Tracker in Hadoop MapReduce
SC: sending tasks
Acquiring resources
Executor: sending responses
11
Components
• Cluster manager
– Allocate resources across applications
– Can run Spark’s own cluster manager or
– Apache YARN (Yet Another Resource Negotiator)
• Executors
– Run tasks & store data
12
Accessing Spark from Python
• Interactive shell:
– pyspark
– A SparkContext object sc will be automatically
created
• pyspark --master local[4]
– This starts Spark on the local host with 4 threads
– "--master" specifies the location of the Spark master
node
13
Accessing Spark from Python
• Standalone program
– Executed using spark-submit script
– E.g., spark-submit wc.py
• You may find many Python Spark examples
under
– examples/src/main/python
14
wc.py
from pyspark import SparkContext
from operator import add

sc = SparkContext(appName="dsci351")

# make sure hello.txt is under the same directory where wc.py is located
lines = sc.textFile('hello.txt')
counts = lines.flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add)
output = counts.collect()
for v in output:
    print(v[0], v[1])
15
hello.txt
hello world
hello this world
16
Suppress verbose log messages
• cd conf
• cp log4j.properties.template log4j.properties
• edit log4j.properties
– change first line to:
• log4j.rootCategory=ERROR, console
– Or to:
• log4j.rootCategory=WARN, console
17
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
• Persistence in Spark
18
Creating an initial RDD
• From an external file
– textFile(path, [# of partitions])
– lines = sc.textFile("hello.txt", 2)
• From an existing Python collection (e.g., list,
tuple, and dictionary)
– data = sc.parallelize([1, 2, 3, 4, 5], 2)
– creates two partitions from the given list
19
Creating RDD from an external file
• lines = sc.textFile("hello.txt") # lines is an RDD
– Return a collection of lines
– Spark does not check if file exists right away
– Nor does it read from the file now
20
Action
• Perform a computation on an RDD
– Return a final value (not an RDD) to client
• Usually the last operation on an RDD
• E.g., reduce(func)
– aggregates all elements in the RDD using func
– returns aggregated value to client
21
Actions
• getNumPartitions()
• foreachPartition(func)
• collect()
• take(n)
• count(), sum(), max(), min(), mean()
• reduce(func)
• aggregate(zeroVal, seqOp, combOp)
• takeSample(withReplacement, num, [seed])
• countByKey()
22
getNumPartitions()
• How many partitions does an RDD have?
• E.g., lines.getNumPartitions()
=> 1
• E.g., data.getNumPartitions()
=> 2
23
foreachPartition(func)
• What is in each partition?
• def printf(iterator):
      # iterator over the list of elements in the partition
      par = list(iterator)
      print('partition:', par)
• sc.parallelize([1, 2, 3, 4, 5], 2).foreachPartition(printf)
=>
partition: [3, 4, 5]
partition: [1, 2]
24
collect()
• Show the entire content of an RDD
• sc.parallelize([1, 2, 3, 4, 5], 2).collect()
• collect()
– Fetch the entire RDD as a Python list
– RDD may be partitioned among multiple nodes
– collect() brings all partitions to the client’s node
• Problem:
– may run out of memory when the data set is large
25
take(n)
• take(n): collect first n elements from an RDD
• l = [1,2,3,4,5]
• rdd = sc.parallelize(l, 2)
• rdd.take(3)
=>
[1,2,3]
26
count()
• Return the number of elements in the dataset
– It first counts in each partition
– Then sum them up in the client
• l = [1,2,3,4,5]
• rdd = sc.parallelize(l, 2)
• rdd.count()
=> 5
27
sum()
• Add up the elements in the dataset
• l = [1,2,3,4,5]
• rdd = sc.parallelize(l)
• rdd.sum()
=> 15
28
reduce(func)
• Use func to aggregate the elements in RDD
• func(a,b):
– Takes two input arguments, e.g., a and b
– Outputs a value, e.g., a + b
• func should be commutative and associative
– Applied to each partition (like a combiner)
29
reduce(func)
• func is continually applied to elements in RDD
– [1, 2, 3]
– First, compute func(1, 2) => x
– Then, compute func(x, 3)
• If RDD has only one element x, it outputs x
• Similar to reduce() in Python
30
Recall Python example
• def add(a, b): return a + b
• reduce(add, [1, 2, 3])
6
Or simply reduce(lambda a, b: a + b, [1, 2, 3])
31
Spark example
• def add(a, b): return a + b
• data = sc.parallelize([1, 2, 3], 2)
• data.reduce(add)
6
Or simply: data.reduce(lambda a, b: a + b)
32
Implementation of reduce(func)
• Suppose [1, 2, 3, 4, 5] => two partitions:
– [1, 2] and [3, 4, 5]
• rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
• Consider reduce(add)
33
Local reduction
• Apply add to reduce each partition locally
– Using mapPartition(func) (see transformations)
• Func: apply ‘add’ function to reduce a
partition
– E.g., using Python reduce function
– reduce(add, [1, 2]) => 3
– reduce(add, [3, 4, 5]) => 12
34
Global reduction
• Collect all local results
– using collect()
=> res = [3, 12]
• Use Python reduce to obtain final result
– reduce(add, res) => reduce(add, [3, 12]) =15
35
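A sketch of this two-level reduction, assuming the pyspark shell's SparkContext sc; the helper reduce_partition is illustrative and skips empty partitions:

from functools import reduce as py_reduce

def add(a, b):
    return a + b

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)    # partitions: [1, 2] and [3, 4, 5]

def reduce_partition(iterator):
    elems = list(iterator)
    if elems:                               # skip empty partitions
        yield py_reduce(add, elems)         # local reduction: 3 and 12

local_results = rdd.mapPartitions(reduce_partition).collect()   # [3, 12]
print(py_reduce(add, local_results))        # global reduction: 15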
Example: finding largest integers
• data = [5, 4, 4, 1, 2, 3, 3, 1, 2, 5, 4, 5]
• pdata = sc.parallelize(data)
• pdata.reduce(lambda x, y: max(x, y))
5
• Or simply: pdata.reduce(max)
36
aggregate(zeroValue, seqOp, combOp)
• For each partition p (the values in the partition), compute:
– "reduce"(seqOp, p, zeroValue)
– Note: if p is empty, it will return zeroValue
– (This "reduce" differs from Python's reduce:
zeroValue can have a different type than the values in p)
• For the list of values, vals, from all partitions, execute:
– reduce(combOp, vals, zeroValue)
37
seqOp and combOp
• seqOp(U, v):
– how to aggregate values v’s in the partition into U
– U: accumulator, initially U = zeroValue
– Note: U and v may be of different data type
• combOp(U, p):
– how to combine results from multiple partitions
– U: accumulator, initially U = zeroValue
– p: result from a partition
38
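For instance, computing a (sum, count) pair with aggregate(), where the accumulator's type (a tuple) differs from the element type (an int); a sketch assuming the pyspark shell's sc:

data = sc.parallelize([1, 2, 3, 4, 5], 2)

# U is a (sum, count) tuple; v is a single integer from the RDD
seqOp  = lambda U, v: (U[0] + v, U[1] + 1)               # fold one value into U
combOp = lambda U1, U2: (U1[0] + U2[0], U1[1] + U2[1])   # merge partition results

total, count = data.aggregate((0, 0), seqOp, combOp)
print(total, count, total / count)                       # 15 5 3.0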
Python reduce() w/o initial value
• reduce(func, list)
• If list is empty => ERROR
• Else if list contains a single element v, return v
• Otherwise, set accumulator x = list[0]
– for each of remaining element list[i]
• x = func(x, list[i])
– Return final value of x
39
Python reduce() with initial value
• reduce(func, list, initialValue)
• Same as:
– reduce(func, [initialValue] + list)
• Note: list can be empty now
– reduce() will return initialValue when list is empty
40
reduce(f) vs aggregate(z, f1, f2)
• func in reduce(func) needs to be commutative
and associative
– While f1 and f2 in aggregate(z, f1, f2) do not need
to be
– f1: similar to the combiner function in Hadoop
• Need to specify initial value for aggregate()
– & it can be of different type than values in RDD
41
Example
• data = sc.parallelize([1], 2)
• data.foreachPartition(printf)
– P1: []
– P2: [1]
• data.aggregate(1, add, add)
– P1 => [1] => after reduction => 1
– P2 => [1] + [1] = [1, 1] => 2
– final: [1] + [1, 2] => [1, 1, 2] => 4
42
Example
• data.aggregate(2, add, lambda U, v: U * v)
– P1 => 2
– P2 => 3
– Final: [2] + [2, 3] => 2 * 2 * 3 = 12
(where [2] is zeroValue, [2,3] is the list of values
from partitions)
43
Implementing count() using
aggregate()
• data = sc.parallelize([1, 2, 3, 4, 5])
• …
44
Implementing mean() using
aggregate()
• data = sc.parallelize([1, 2, 3, 4, 5])
• …
45
takeSample(withReplacement, num,
[seed])
• Take a random sample of elements in rdd
• withReplacement: True if with replacement
• num: sample size
• optional seed: for random number generator
• Useful in many applications, e.g., k-means
clustering
46
Example
• data = sc.parallelize(xrange(10))
• data.takeSample(False, 2, 1)
– [8, 0]
47
countByKey()
• Only available on RDDs of type (K, V)
– i.e., RDD that contains a list of key-value pairs,
e.g., (‘hello’, 3)
• Return a hashmap (dictionary in Python) of (K,
Int) pairs with count for each unique key in
RDD
– Count for key k = # of tuples whose key is k
48
Example
• d = [('hello', 1), ('world', 1), ('hello', 2), ('this', 1), ('world', 0)]
• data = sc.parallelize(d)
• data.countByKey()
=> {'this': 1, 'world': 2, 'hello': 2}
49
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
• Persistence in Spark
50
Transformation
• Create a new RDD from an existing one
• E.g., map(func)
– Applies func to each element of an RDD
– Returns a new RDD representing mapped result
51
Lazy transformations
• Spark does not apply them to RDD right away
– Just remember what needs to be done
– Perform transformations until an action is applied
• Advantage
– Results of transformations pipelined to the action
– No need to return intermediate results to clients
=> more efficient
52
Avoid re-computation
• However, this means that the same RDD may
be recomputed multiple times if it is used in
multiple actions
=> All transformations need to be redone
=> Consequence: costly
• Solution: allow caching of RDDs in memory
– May also persist them on disk
53
Transformations
• map(func)
• filter(func)
• flatMap(func)
• reduceByKey(func)
• groupByKey()
• sortByKey(True/False)
– sortBy(keyfunc, ascending=True/False)
• distinct()
• mapPartitions(func)
54
Transformations
• join(rdd, [numTasks])
– leftOuterJoin
– rightOuterJoin
– fullOuterJoin
• aggregateByKey(zeroValue, seqOp, combOp,
[numTasks])
• mapValues(func)
• flatMapValues(func)
• union/intersection/subtract
• subtractByKey
55
Transformations
• groupBy(f, numPartitions=None, partitionFunc=<hash function>)
– f is a function that produces the group key
56
map(func)
• map(func): Apply a function func to each
element in input RDD
– func returns a value (could be a list)
• Output the new RDD containing the
transformed values produced by func
57
Example
• lines = sc.textFile("hello.txt")
• lineSplit = lines.map(lambda s: s.split())
=> [['hello', 'world'], ['hello', 'this', 'world']]
• lineLengths = lines.map(lambda s: len(s))
=> [11, 16]
58
filter(func)
• filter(func): return a new RDD with elements
of existing RDD for which func returns true
• func should be a boolean function
• lines1 = lines.filter(lambda line: "this" in line)
=> ['hello this world']
• What about: lines.filter(lambda s: len(s) > 11)?
59
Notes
• data = sc.parallelize([1, 2, 3, 4, 5, 1, 3, 5], 2)
• data.map(lambda x: x if x % 2 == 0 else None).collect()
Result: [None, 2, None, 4, None, None, None, None]
• def f(x):
      if x % 2 == 0:
          return x
      else:
          pass      # same as "return None"
• data.map(f).collect()
Produces the same result as above
60
Python filter
• l = [1, 2, 3, 4, 5, 1, 3, 5]
• filter(lambda x: x % 2 == 0, l)
– [2, 4]
61
Spark implementation of filter
• def even(x): return x % 2 == 0
• data.filter(even)
Implemented as follows:
• def processPartition(iterator):
return filter(even, iterator)
• data.mapPartitions(processPartition)
62
mapPartitions(func)
• Apply transformation to a partition
– input to func is an iterator (over the elements in
the partition)
– func must return an iterable (a list or use yield to
return a generator)
• Different from map(func)
– func in map(func) applies to an element
63
Implementing aggregate()
• rdd.aggregate((0,0), combFunc, reduFunc)
• def combFunc(U, x): return (U[0] + x, U[1] + 1)
• def reduFunc(U, V): return (U[0] + V[0], U[1] + V[1])
• from functools import reduce # needed in Python 3
• def sumf(iterator):
      return [reduce(combFunc, iterator, (0, 0))]
• rdd.mapPartitions(sumf).reduce(reduFunc)
64
Exercise
• Implement count() using mapPartitions() and
reduce() only
– rdd = sc.parallelize([1, 1, 2, 3, 3, 3], 2)
– rdd.count() => 6
65
flatMap(func)
• flatMap(func):
– similar to map
– But func here must return a list (or generator) of
elements
– & flatMap merges these lists into a single list
• lines.flatMap(lambda x: x.split())
=> rdd: ['hello', 'world', 'hello', 'this', 'world']
66
reduceByKey()
• reduceByKey(func)
– Input: a collection of (k, v) pairs
– Output: a collection of (k, v’) pairs
• v’: aggregated value of v’s in all (k, v) pairs
with the same key k by applying func
• func is the aggregation function
– Similar to func in the reduce(func, list) in Python
67
reduceByKey(func)
• It first performs partition-side reduction & then
global reduction
– By executing the same reduce function
• In other words, func needs to be commutative
and associative
• More details:
– http://spark.apache.org/docs/latest/api/python/pyspark.html
68
Example
• rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4),
(3,5), (2,4), (1,5), (2,6)], 2)
• def printf(part):
      print(list(part))
• rddp.foreachPartition(printf)
– Partition 1: [(1, 2), (1, 3), (2, 2), (1, 4)]
– Partition 2: [(3, 5), (2, 4), (1, 5), (2, 6)]
69
Example
• from operator import add
• rddp.reduceByKey(add)
• It will first execute local reduce:
– Partition 1: [(1, 2), (1, 3), (2, 2), (1, 4)] => (1, 9), (2, 2)
– Partition 2: [(3, 5), (2, 4), (1, 5), (2, 6)] => (3, 5), (1, 5), (2, 10)
70
Example
• Final reduce at reducer side
– (1, 9), (1, 5) => (1, 14)
– (2, 2), (2, 10) => (2, 12)
– (3, 5) => (3, 5)
• Note that if there are two reducers, then:
– Some keys, e.g., 1, may be reduced by one reducer
– Others, e.g., 2 and 3, by the other
71
reduceByKey() vs. reduce()
• reduceByKey() returns an RDD
– Reduce values per key
• reduce() returns a non-RDD value
– Reduce all values!
72
Exercise
• Implement countByKey using reduceByKey
– rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4), (3,5),
(2, 4), (1, 5), (2, 6)], 2)
– rddp.countByKey() => {1: 4, 2: 3, 3: 1}
73
aggregateByKey
• aggregateByKey(zeroValue, combOp, reduOp)
– Input RDD: a list of (k, v) pairs
– Aggregate values for each key
• Return a value U for each key
– Note that U may be a tuple
– zeroValue: initial value for U
– combOp(U, v): (function for) local reduction
– reduOp(U1, U2): global reduction
74
Computing group averages
• rdd1 = rddp.aggregateByKey((0,0),
      lambda U, v: (U[0] + v, U[1] + 1),
      lambda U1, U2: (U1[0] + U2[0], U1[1] + U2[1]))
– [(2, (12, 3)), (1, (14, 4)), (3, (5, 1))]
• rdd1.map(lambda kv: (kv[0], float(kv[1][0]) / kv[1][1]))
– [(2, 4.0), (1, 3.5), (3, 5.0)]
75
Example: aggregateByKey
• data = sc.parallelize([(1, 1), (1,2), (1,3)], 2)
• data.foreachPartition(printf)
– [(1, 1)]
– [(1, 2), (1, 3)]
• data.aggregateByKey(1, add, add).collect()
– [(1, 8)]
76
Compared with aggregate()
• data = sc.parallelize([1, 2, 3], 2)
• data.foreachPartition(printf)
– [1]
– [2, 3]
• data.aggregate(1, add, add)
–9
77
aggregateByKey vs. aggregate
• zeroValue in aggregateByKey
– Used only in combOp (i.e., the reduction within a
partition)
• zeroValue in aggregate
– Used in both combOp and reduOp
– E.g., data.aggregate(1, add, add) => 9
78
aggregateByKey vs. reduceByKey
• aggregateByKey is more general than reduceByKey
– Can specify different functions for combiner and
reducer
– can specify initial value for U, the accumulator
– aggregated value may have different type than
that of value v of input RDD
• E.g., in previous example:
– v is an integer, while U is a tuple (sum, count)
79
Exercise
• Implement reduceByKey(add) using
aggregateByKey()
• rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4),
(3,5), (2, 4), (1, 5), (2, 6)], 2)
– rddp.reduceByKey(add) => [(2, 12), (1, 14), (3, 5)]
80
groupByKey()
• groupByKey()
– Similar to reduceByKey(func)
– But without func & returning (k, Iterable(v))
instead
• rddp.groupByKey()
[(2, ), (1, …), (3, …)]
81
Example
• rddp.groupByKey().mapValues(list).collect()
– mapValues converts iterable value into a list
=> [(2, [2, 4, 6]), (1, [2, 3, 4, 5]), (3, [5])]
82
groupBy()
• rddp.groupBy(lambda t:
t[0]).mapValues(list).collect()
=>
[(2, [(2, 2), (2, 4), (2, 6)]), (1, [(1, 2), (1, 3), (1, 4),
(1, 5)]), (3, [(3, 5)])]
83
sortByKey(True/False)
• sortByKey([asc])
– Sort input RDD with (k, v) pairs by key
– Ascending if asc (a boolean value) is True
• rddp.sortByKey(False).collect()
=> [(3, 5), (2, 2), (2, 4), (2, 6), (1, 2), (1, 3), (1, 4),
(1, 5)]
84
sortBy(keyfunc, ascending=True/False)
• >>> rddp.sortBy(lambda p: p[1]).collect()
– [(1, 2), (2, 2), (1, 3), (1, 4), (2, 4), (3, 5), (1, 5), (2, 6)]
• >>> rddp.sortBy(lambda p: -p[1]).collect()
– [(2, 6), (3, 5), (1, 5), (1, 4), (2, 4), (1, 3), (1, 2), (2, 2)]
• >>> rddp.sortBy(lambda p: p[1],
ascending=False).collect()
– [(2, 6), (3, 5), (1, 5), (1, 4), (2, 4), (1, 3), (1, 2), (2, 2)]
85
distinct()
• Return an RDD with distinct elements of source RDD
• data = [5, 4, 4, 1, 2, 3, 3, 1, 2, 5, 4, 5]
• pdata = sc.parallelize(data, 2)
• pdata.distinct().collect()
=> [2, 4, 1, 3, 5]
86
Exercise
• Implement distinct() using
reduceByKey()/groupByKey()
• rdd = sc.parallelize([3, 1, 2, 3, 1, 3, 3, 2])
• rdd.distinct()
=> [1, 2, 3]
87
join(rdd)
• rdd1.join(rdd2)
– Joining tuples of two RDDs on the key
– rdd1: an RDD containing a list of (k, v)’s
– rdd2: another RDD containing a list of (k, w)’s
• Output an RDD containing (k, (v, w))’s
– That is, (k, v) joins with (k, w) => (k, (v, w))
88
Example
• ds1 = sc.parallelize([(1,2), (2,3)])
• ds2 = sc.parallelize([(2,4), (3,5)])
• ds1.join(ds2)
– [(2, (3, 4))]
89
Outer joins
• Also retain dangling tuples
• ds1.leftOuterJoin(ds2)
– [(1, (2, None)), (2, (3, 4))]
• ds1.rightOuterJoin(ds2)
– [(2, (3, 4)), (3, (None, 5))]
• ds1.fullOuterJoin(ds2)
– [(1, (2, None)), (2, (3, 4)), (3, (None, 5))]
90
mapValues
• mapValues(func)
– For each key, apply func to each value of the key
• x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
• x.mapValues(lambda l: len(l)).collect()
– [('a', 3), ('b', 1)]
91
flatMapValues(func)
• mapValues part
– For each key k, apply func to its value, return a list
[i1, i2, …]
• flatMap part
– flatten the lists into a single list but retain the key
=> [(k, i1), (k, i2), ..., (k', i1'), (k', i2'), ...]
92
Example
• rdd = sc.parallelize([(1, "hello world"), (2, "hello this world")])
– For example, 1 and 2 may be document ids
• rdd2 = rdd.flatMapValues(lambda s: s.split())
– [(1, 'hello'), (1, 'world'), (2, 'hello'), (2, 'this'), (2, 'world')]
93
Exercise
• Use mapValues() and flatMap() implement
flatMapValues() in the previous slide
94
union(rdd)
• rdd1.union(rdd2)
– Returns all elements in rdd1 and rdd2
– Does not remove duplicates (so bag union)
• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3], 2) # 2 partitions
• rdd2 = sc.parallelize([1, 2, 2, 5], 2) # 2 partitions
• rdd1.union(rdd2) # the result has 4 partitions
– [1, 1, 2, 3, 3, 3, 1, 2, 2, 5]
95
intersection(rdd)
• rdd1.intersection(rdd2)
– Returns elements in both rdd1 and rdd2
– Duplicates will be removed! (so set-semantics)
• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3])
• rdd2 = sc.parallelize([1, 2, 2, 5])
• rdd1.intersection(rdd2)
– [2, 1]
96
subtract(rdd)
• rdd1.subtract(rdd2)
– Return values in rdd1 that do not appear in rdd2
– Note: neither set nor bag semantics!
• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3])
• rdd2 = sc.parallelize([1, 2, 2, 5])
• rdd1.subtract(rdd2)
– [3, 3, 3]
– Note: 1 not included in result (unlike bag difference)
97
subtractByKey(rdd)
• rdd1.subtractByKey(rdd2)
– Return each (key, value) pair in rdd1 that has no pair
with matching key in rdd2
• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3]).map(lambda x: (x, 1))
• rdd2 = sc.parallelize([1, 2, 2, 5]).map(lambda x: (x, 1))
• rdd1.subtractByKey(rdd2)
– [(3, 1), (3, 1), (3, 1)]
98
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
99
WordCount
• from operator import add
• lines = sc.textFile("hello.txt")
• counts = lines.flatMap(lambda x: x.split(' ')) \
      .map(lambda x: (x, 1)) \
      .reduceByKey(add)
• counts.collect()
=> [(u'this', 1), (u'world', 2), (u'hello', 2)]
100
Word length histogram
• long: if > 4 letters
• short: otherwise
• def myFunc(x):
      if len(x) > 4:
          return ('long', 1)
      else:
          return ('short', 1)
101
Word length histogram
• sc.textFile("hello.txt") \
      .flatMap(lambda x: x.split(" ")) \
      .map(myFunc) \
      .reduceByKey(add) \
      .collect()
=> [('short', 1), ('long', 4)]
102
Adding ratings for each person
Ratings.txt
(patrick, 4)
(matei, 3)
(patrick, 1)
(aaron, 2)
(aaron, 2)
(reynold, 1)
(aaron, 5)
...
Desired result (sum of ratings per person): (aaron, 9), (patrick, 5), ...
103
Adding ratings for each person
• sc.textFile("ratings.txt") \
      .map(lambda s: s[1:-1].split(",")) \
      .collect()
(s[1:-1] strips off the parentheses)
=>
[[u'patrick', u'4'], [u'matei', u'3'], [u'patrick', u'1'],
[u'aaron', u'2'], [u'aaron', u'2'], [u'reynold', u'1'],
[u'aaron', u'5']]
104
Adding ratings for each person
• sc.textFile("ratings.txt") \
      .map(lambda s: s[1:-1].split(",")) \
      .map(lambda p: (p[0], int(p[1]))) \
      .reduceByKey(lambda a, b: a + b) \
      .collect()
=> [(u'patrick', 5), (u'aaron', 9), (u'reynold', 1),
(u'matei', 3)]
105
Execution steps
• Note that reduceByKey requires shuffling
[Figure: execution steps of the job — strip off the parentheses, tokenize by ',', turn the values into integers, then reduceByKey (which shuffles).]
106
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
• Persistence in Spark
107
Shuffling
• Data are essentially repartitioned
– E.g., reduceByKey repartitions the data by key
• A costly operation: a lot of local & network
I/O’s
108
Another example: sortByKey
• Sampling stage:
– Sample data to create a range-partitioner
– Ensure even partitioning
• “Map” stage:
– Write (sorted) data to destined partition for reduce
stage
Data are shuffled between Map and Reduce stage
• “Reduce” stage:
– get map output for specific partition
– Merge the sorted data
109
Transformations that require shuffling
• reduceByKey(func)
• groupByKey()
• sortByKey([asc])
• distinct()
110
Transformations that require shuffling
• join(rdd):
– leftOuterJoin
– rightOuterJoin
– fullOuterJoin
• aggregateByKey(zeroValue, seqOp, combOp)
• intersection/subtract
• subtractByKey
111
Transformations that do not need
shuffling
• map(func)
• filter(func)
• flatMap(func)
• mapValues(func)
• union
• mapPartitions(func)
112
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
• Persistence in Spark
113
RDD persistence
• rdd.persist()
• Store the content of RDD for later reuse
– storageLevel specifies where content is stored
– E.g., in memory (default) or on disk
• rdd.persist() or rdd.cache()
– Content stored in main memory
114
RDD persistence
• Executed at nodes having partitions of RDD
• Avoid re-computation of RDD in reuse
115
Example
• ratings = sc.textFile("ratings.txt") \
      .map(lambda s: s[1:-1].split(",")) \
      .map(lambda p: (p[0], int(p[1]))) \
      .cache()
• ratings.reduceByKey(lambda a, b: a + b).collect()
– The ratings RDD will be computed for the first time &
its result cached
116
Example
• ratings.countByKey()
– It will use the cached content of the "ratings" RDD
117
Automatic persistence
• Spark automatically persists intermediate data
in shuffling operations (e.g., reduceByKey)
• This avoids re-computation when node fails
118
K-means clustering
• Find k clusters in a data set
– k is pre-determined
• Iterative process
– Start with initial guess of centers of clusters
– Repeatedly refine the guess until stable (e.g.,
centers do not change much)
• Need to use data set at each iteration
119
K-means clustering
• Assign point p to the closest center c
– Distance = Euclidean distance between p and c
• Re-compute the centers based on assignments
• Coordinates of center of a cluster =
– Average coordinate of all points in the cluster
– E.g., (1, 1, 1) (3, 3, 3) => center: (2, 2, 2)
120
K-means clustering
[Figure: scatter plot of the data points and the centers of clusters 1-5 at iteration 6 of k-means; x roughly in [-2, 2], y roughly in [0, 3].]
121
Persist data points in memory
[Code on slide: the k-means driver, with callouts marking where the initial centers are chosen, where the new centers are computed, and where the sum of distances between the new and old centers is measured.]
122
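The driver code shown on this slide is not reproduced here; the next slides walk through its pieces. Below is a condensed sketch in the spirit of Spark's bundled examples/src/main/python/kmeans.py (illustrative, not the slide's exact code), assuming the pyspark shell's SparkContext sc, NumPy, and a local kmeans-data.txt; names such as closest_center and convergeDist are assumptions:

import numpy as np

def closest_center(p, centers):
    # index of the center with the smallest squared Euclidean distance to p
    return min(range(len(centers)), key=lambda i: np.sum((p - centers[i]) ** 2))

data = sc.textFile("kmeans-data.txt") \
         .map(lambda line: np.array([float(x) for x in line.split()])) \
         .cache()                            # points are reused every iteration

K, convergeDist = 2, 0.1
centers = data.takeSample(False, K, 1)       # initial guess of the centers
tempDist = 1.0
while tempDist > convergeDist:
    # assign each point to its closest center, carrying (point, 1)
    closest = data.map(lambda p: (closest_center(p, centers), (p, 1)))
    # per center: (sum of coordinates, number of points)
    stats = closest.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    newCenters = stats.mapValues(lambda sc_: sc_[0] / sc_[1]).collectAsMap()
    # sum of squared distances between old and new centers
    tempDist = sum(np.sum((centers[i] - newCenters[i]) ** 2) for i in newCenters)
    for i, c in newCenters.items():
        centers[i] = c
print("Final centers:", centers)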
Parse input & find closest center
123
kmeans-data.txt
• A text file contains the following lines
– 0.0 0.0 0.0
– 0.1 0.1 0.1
– 0.2 0.2 0.2
– 9.0 9.0 9.0
– 9.1 9.1 9.1
– 9.2 9.2 9.2
• Each line is a 3-dimensional data point
124
Parse & cache the input dataset
• “data” RDD is now cached in main memory
125
Generating initial centers
• Recall takeSample() action
– False: sample without replacement
– K = 2
126
Assign point to its closest center
• Center 0 has points: (0, 0, 0) and (.1, .1, .1)
• Center 1 has the rest: (.2, .2, .2), (.9, .9, .9), …
127
Getting statistics for each center
• pointStats has a key-value pair for each center
• Key is center # (0 or 1 for this example)
• Value is a tuple (sum, count)
– sum = the sum of coordinates over all points in
the cluster
– Count = # of points in the cluster
128
Computing coordinates of new centers
• Coordinate = sum of point coordinates/count
– E.g., center 0: [.1, .1, .1] /2 = [.05, .05, .05]
Can use mapValues here too:
newPoints1 = pointStats.mapValues(lambda stv: stv[0]/stv[1]).collect()
129
Distance btw new & old centers
• Old centers: [.1, .1, .1] and [.2, .2, .2]
• New centers: [.05, .05, .05] and [6.875, 6.875, 6.875]
• Distance = (.1 - .05)^2 * 3 + (6.875 - .2)^2 * 3 ≈ 133.67
– To be more exact, it is sqrt(133.67) ≈ 11.56
130
RDD operations
• A complete list:
– http://spark.apache.org/docs/latest/api/python/pyspark.html
131
Resources
• Spark programming guide:
– https://spark.apache.org/docs/latest/
• Lambda, filter, reduce and map:
– http://www.python-course.eu/lambda.php
• Improving Sort Performance in Apache Spark: It's a Double
– http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
132
Readings
• Spark: Cluster Computing with Working Sets,
2010.
• Resilient Distributed Datasets: A Fault-Tolerant
Abstraction for In-Memory Cluster Computing,
2012.
133
References
• Functional programming in Python
– https://docs.python.org/2/howto/functional.html
• Learning Spark by Matei Zaharia et al.,
O'Reilly, 2015
– https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/
134
References
• Sort-based shuffle implementation
– https://issues.apache.org/jira/browse/SPARK-2045
• Sort-Based Shuffle in Spark
– https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf
135
