BDAT 1008 Spark Schemas Worksheet

Assignment

Save Time On Research and Writing
Hire a Pro to Write You a 100% Plagiarism-Free Paper.
Get My Paper

– Loading Data with SchemaINSTRUCTIONS In the lecture on Spark Structured API, we did not specify the schema of our dataset. We reliedon the inference of Spark engine which may not always be accurate. We can create a schemaby using an object of a class called StructType consisting of an array of StructFields.Moredetails on Spark Schemas can be found at this link

https://sparkbyexamples.com/spark/spark-schema-exp…

The code to load the youtube dataset used in the lectures with a schema has been provided asa guide. Once you are familiar with how to create schemas, load the stocks dataset into Spark. You can get the stocks dataset by running the following wget command:wget

https://www.dropbox.com/s/ia779cdcjfctd84/stocks

Save Time On Research and Writing
Hire a Pro to Write You a 100% Plagiarism-Free Paper.
Get My Paper

Note that dates in Spark are only recognized if they have a special format. You can treat datesas strings for simplicity. Once you have loaded the stocks datasets with the correct schema inSpark, answer ONE of the following query questions:

1. Find the top 5 stocks with the maximum average trading volume

2. Find the top 5 stocks with the maximum closing price

3. Find the top 5 stocks with the highest price change during any trading day

DELIVERABLESSubmit your code (creating the schema, loading of data as a DataFrame, and the correspondingquery) as text file. Along with your code, in a separate file, submit the screenshots of your codebeing executed. Lecture 2 – Spark Speed Details
BDAT 1008
Scala Functions
Scala Functions
• Recall Last week:
– Wrote our first program in Scala
– Set and get concept
– Type inference
• This lecture
– Functions
Scala Functions
• Why do we need a lesson on functions?
– Simple concept?
• Because of scala’s concise code, you have
to familiarize yourself with how to write
functions in different ways
• Concept of higher order functions and
anonymous functions
Scala Functions
• Create a class called TestFunction
class TestFunction {
def m(x: Int) = x + 1
val f = (x: Int) => x + 1
}
object Main1{
def main (args: Array[String]): Unit = {
}
}
Scala Functions
• Both lines of code look more or less the
same, but first one is a method and
second is a function
class TestFunction {
def m(x: Int) = x + 1
val f = (x: Int) => x + 1
}
object Main1{
def main (args: Array[String]): Unit = {
}
}
Scala Functions
• First line of code represents a method
called m. It takes an integer, adds one to
the integer and returns it
• Second line of code is a function called f
that takes in an integer, adds one and
returns it
• People refer to methods and functions in
Scala interchangeably
– Even in books
– WRONG!
Scala Functions
• In the main, create a new object of type
TestFunction and then call the method
and function and print the results
object Main1{
def main (args: Array[String]): Unit = {
val t = new TestFunction()
println(t.m(10))
println(t.f(10))
}
}
Scala Functions
• Save the code and run the application
• Should see 11 printed twice
• So output is also the same, what the
difference?
object Main1{
def main (args: Array[String]): Unit = {
val t = new TestFunction()
println(t.m(10))
println(t.f(10))
}
}
Scala Functions
• For each function in the class, Scala
creates a class file behind the scenes.
• Why? Scala treats functions as objects
• Since function is an object in Scala, you
can now pass the function as argument or
return a function from another function
Scala Methods
• Methods are not objects in Scala. Think
of the method as an attribute in a class
– Just like Java methods
• Methods are not meant to be passed as
arguments
• You can pass a method as an argument to
another method or function, but behind
the scenes Scala converts the method to
a function
– Hence the confusion
Scala Methods
• Each time a method is used as a function,
Scala will create a new instance of that
class
– This adds unnecessary overhead
When to Use Scala Functions?
• When to use Scala functions?
• If you are coding in a functional
programming context
– When there is a need to pass a function as an
argument
– Or return a function
Another function
• Let’s write a few more functions
val simpleStringFun = () => “Hi there!”
• Notice the syntax, things to the left of
the arrow are the arguments, things to
the right are the “body” of the function
• Notice that there is no return keyword
– you don’t need it, Scala adds it for you.
Another function
• Another function
val evenOrOdd = (x : Int) => {
if (x % 2 == 0)
“even”
else
“odd”
}
A few more functions
• Now let’s call these functions
println(t.simpleStringFun())
println(t.evenOrOdd(11))
A new method
• Now let’s create a new method
– Very similar to a Java method → name, list of
parameters, definition for the method
• Want to take a list of employees and
return the sum of their salaries.
def sumSalary(eList : List[Employee]) = {
var sum = 0
eList.foreach((e : Employee) => sum +=e.salary)
sum
}
Anonymous functions
• Focus on the following line of code
eList.foreach((e : Employee) => sum +=e.salary)
• What does this look like?
– It’s a function
– But what is the name of the function?
– It doesn’t have a name, it is an anonymous
function
– That is concise!
Anonymous functions
• And we can make the code more concise
• You do not need to include the type
(Employee)
– Scala’s type inference will take care of this
eList.foreach(e => sum +=e.salary)
Anonymous functions
• So let’s create a list of employees and call
this function
• First create the objects:
var joe = new Employee(“Joe”, 10000)
joe.dept = 10
var chris = new Employee(“Chris”, 5000)
chris.dept = 10
var sara = new Employee (“Sara”, 20000)
sara.dept = 10
Anonymous functions
• Then create a list and print the total
salary
val eList = List(joe, chris, sara)
println(“Total Salary: ” + t.sumSalary(eList))
• Run the program
Anonymous functions
• Now let’s say that you want to only get
the employee salaries from department 10
– Simple, take the existing code and tweak it a
bit
def sumSalaryForDept10(eList : List[Employee]) =
{
var sum = 0
eList.foreach(e => {
if (e.dept==10)
sum+=e.salary
})
sum
}
Anonymous functions
• And run the code
println(“Total Salary for department 10: ” +
t.sumSalaryForDept10(eList))
Anonymous functions
• Now let’s say that you want to get the
employee salaries not from department
10? Department 11?
• You can create more functions of course
• But you have a lot of redundant logic
• We can do something better
– Allow the user to supply the logic for which
employee to include in the sum
• How can we do this?
Anonymous functions
def sumSalarySmart (eList : List [Employee],
selection : Employee => Boolean) = {
var sum = 0
eList.foreach(e => {
if (selection(e))
sum += e.salary
})
sum
}
}
Anonymous functions
• Now print the total salary for
println(“Total Salary for dept greater than 10: ”
+ t.sumSalarySmart(eList, (x : Employee) => x.dept > 10))
Anonymous functions
• And we do not need to include the type
for the function
– Scala’s type inference will take care of it
println(“Total Salary for dept greater than 10: ”
+ t.sumSalarySmart(eList, x => x.dept > 10))
Higher order functions
• sumSalarySmart is referred to as a
higher order function
• For a function to be called ahigher order,
it has to either
– Take one or more functions as arguments
– Or return a function as a result
Higher order functions
• Higher order functions can be quite
powerful
– Duplications are eliminated
– Sending your own logic to a function is very
powerful
• It makes the code highly scalable
– To be continued ….
Summary
• Difference between functions and
methods
• How to write functions and different
style of writing function
• How to write anonymous and higher order
functions.
Next Week
• More on RDD’s
– What is it?
• Our first Spark program
– Definitely!
• More Scala programming
Lecture 2 – Spark Speed Details
Today’s Lecture
• How is Spark faster than Hadoop
– Focus on role of RDD’s
• Scala Programming
Spark Speed
• Spark fame is due to speed
– 10x to 100X claim
– We talked about the reason for the large
range
• In-memory processing is part of the
equation but not everything
• Second reason is the execution engine
• Third reason is the architecture
• Both the in-memory processing and
architecture are achieved by use of RDD’s
Spark Speed
• Most people cannot answer the reason for
Sparks speed
– Even people working in industry!
• You’ll hear buzzwords
– In-memory processing
– DAG
– RDDs
• Whenever you learn a new technology, you
need to understand fully why it is better
than present solutions
Recall Spark Solution
Hadoop Distributed File System
– fault tolerance, location of blocks, etc
Recall Spark Solution
Hadoop Distributed File System
– fault tolerance, location of blocks, etc
Spark Speed Reason #1
• Is in-memory computing special?
• Is Spark the only tool in the market that
does in-memory computing?
• NO!
• Any program or any software needs
memory to do any operation
– You need to bring data to the memory (RAM)
to operate on them
– You cannot by-pass memory
Spark Speed Reason #1: In-memory computation
• So if every enterprise software caches
data in memory …
• What’s special about Spark?
– Spark has distributed in-memory computing
– 10, 100, or 1000’s of commodity computers
– This is very unique
• In-memory computing in a distributive
environment is a whole new ballgame
– Very complex
– Main problem is to make in-memory computing
fault tolerant → not easy (more later)
Spark Speed Reason #2
• The next biggest point in achieving speed
is the Spark Execution Engine
– Similar to Hive and Pig engines
Code
Logical Plan
Optimized Logical
Plan
Physical Plans
Selected Physical
Plan
RDD s
Introduction to DAG engine
• When you write a series of commands in
Spark, it is not executed line by line
• Spark will look at all the instructions that
you have and convert them to a logical
plan
Code
Logical Plan
Optimized Logical
Plan
Physical Plans
Selected Physical
Plan
RDD s
Introduction to DAG engine
• From the logical plan, Spark creates
several physical plans
• It then choses the best physical plan and
executes it in the cluster
• This paradigm is not the same as
traditional programming
Code
Logical Plan
Optimized Logical
Plan
Physical Plans
Selected Physical
Plan
RDD s
What is a DAG?
• DAG – directed acyclic graph
– DAG is just a model for execution
– You move from one task, to another, and you
never come back to the task that you already
executed
– Think of DAG as a sequence of connected
steps.
– You can move from one step to another but
the move can only be forward and can’t be
“backward” (or cyclic)
– Your final code is DAG in nature
Hadoop’s MapReduce
• Hadoop does not have a plan for execution
• Everything is executed as is
• There is no tracking of functions that
happen on the dataset and no way to
optimize things
Spark Speed Reason #3
• Spark Architecture has been designed
ground up for efficiency and speed
• In-memory computation and DAG are
difficult
– Architecture does these in efficient manner
not to hinder overall speed of cluster
• Hadoop was a dirty design
– Just get something working
• Spark has been much more thought out
Summary
• Spark computational speed is due to a few
things
1. In-memory computation
2. Execution Engine
3. Architecture
RDD
RDDs and in-memory fault tolerance
Introduction
• We saw that in-memory computing is not
special
• But why is Spark special?
– Spark does in-memory computing in a
distributed scale
– Not easy, sounds simple but not in distributed
mode
• We want to understand what this
complexity is
– Specifically fault tolerance
Hadoop fault tolerance
MapReduce
Job
HDFS
MapReduce
Job
Block 1
Node 1
Node 5
Block 2
Node 2
Node 6
Block 3
Node 3
Node 7
Block 4
Node 4
Node 8
HDFS
Hadoop Fault Tolerance
• We assume only mappers for simplicity
• Once the first job is done, the results are
used for Job 2 (nodes 5-8)
• But when Job 2 is running, Node 7 doesn’t
care if Node 3 fails, the result is already
stored in HDFS
Spark Fault Tolerance
Spark
MapReduce Job
Spark
MapReduce Job
Block 1
Node 1
Node 5
Block 2
Node 2
Node 6
Block 3
Node 3
Node 7
Block 4
Node 4
Node 8
HDFS
Spark Fault Tolerance
• Looks the same except the everything is
kept in memory
• Data for Job 2 (in nodes 5-8) is in the
memory of nods 1-4
• KEY difference: In Spark, when Job 2 is
being executed on node 7, node 3 needs
to be alive
– Not the case with Hadoop, because result
from Node 2 was returned to HDFS
Spark Fault Tolerance
• So we have a very big problem with fault
tolerance when we keep things
intermediately in memory.
• How can we solve this?
Simple example using memory
• An example of chain calculation
+5
+9
-2
+20
43
11
Joe
Maryam
Michael
Alice
• Each person memorizes one result, each
person depends on a previous result
Simple example using memory
• Now assume Michael stops responding?
+5
+9
-2
+20
43
11
Joe
Maryam
Michael
Alice
• Without input from Michael, Alice cannot
continue with the calculation
• What’s the solution?
Simple example using memory
• We know how the results got there…
+5
+9
-2
+20
43
11
Joe
A
B=A+5
Maryam
C=B+9
Michael
D=C-2
Alice
E=D+20
• So since we know the transformations,
there is a way to get the missing data
even if Michael is sleep
Simple example using memory
+5
+9
-2
+20
43
11
Joe
A
B=A+5
Maryam
C=B+9
Michael
D=C-2
Cynthia
Alice
E=D+20
Spark Fault Tolerance
• By keeping track of all transformations
that happen to the data, we can tolerate
failure
• Spark achieves fault tolerant this way
– By keeping track of every operation or
transformation that happens to your dataset
• This information is called lineage
Interview Question
• How does Spark keep track of what you
do with your data?
• Lineage information is only possible with
RDD’s
• Resilient Distributed Dataset
– Spark tightly controls what you do with the
dataset
• You cannot work with data in Spark
without RDD
Datasets in Spark
• To refer to your dataset in Spark, you will
use functions provided by Spark to refer
to datasets
• That function will create an RDD behind
the scenes
• Can you join two datasets in Spark?
– Yes but you must use the join function
– Internally, this will turn into an RDD
• RDD’s provide information to Spark that
allow it to handle fault tolerance
Datasets in Spark
• If Spark allowed free-form data
manipulation, there is then no way to keep
track of transformation
• With Spark you have a defined set of
functions (transformations) to work with
• Hadoop MapReduce let you write Mapper
and Reducer programs to manipulate the
data
Datasets in Spark
Spark
MapReduce Job
Spark
MapReduce Job
Block 1
Node 1
Node 5
Block 2
Node 2
Node 6
Block 3
Node 3
Node 7
Block 4
Node 4
Node 8
HDFS
Datasets in Spark
Spark
MapReduce Job
Spark
MapReduce Job
Block 1
Node 1
Node 5
Block 2
Node
Node
X 2
Node 6
Block 3
Node 3
Node 7
Block 4
Node 4
Node 8
HDFS
Summary
• We see how Spark can achieve in-memory
computation and achieve fault tolerance
• Spark keeps track of all data manipulation
and transformations
– This information is called lineage
• This is possible with the help of RDDs
RDD Definition in more detail
Introduction
• We saw how RDD’s can track information
and allow fault tolerance
• But what exactly is an RDD?
• We will look into more details in this
section
What is an RDD?
• RDD is essentially a class
• Every time you apply a function to a
dataset, an RDD is object created
• An RDD object allows Spark to keep track
of all the operations and transformations
happening on the data
• An RDD object gives information about
operations and transformations that
happens to the dataset
Importance of RDDs
• RDD’s are at the core of Spark, they are
the reason for the speed improvements
behind the scenes
• Without RDD, Spark’s execution engine
will not have enough information about
the operations and transformations
• It will then not be able to form a logical
and physical plans to optimize the
execution
• RDD’s also enable fault tolerance for inmemory processing
RDDs and in-memory fault tolerance
• The most challenging problem with inmemory processing is fault-tolerance
• Spark solves the problem by keeping
track of every single operation that
happens to the data → called lineage
• All this is possible with the help of RDD
– Resilient Distributed DataSet
Need for Details of RDD
• To work with Spark you do not need to
know anything about RDD’s
• Why do we care?
– RDDs are core of spark
– Without a good technical understanding of
them it is difficult to understand faulttolerance
– We also won’t be able to understand how Spark
draws logical and physical plans
– Without understanding RDDs, we miss the
whole concept of efficiencies of the Spark
Execution Engine
What is an RDD?
• Whenever you ask a question on forums,
you get people referring you to a paper by
Matei Zaharia
• This is a research paper
• Not a good starting point for finding what
is RDD
• Paper is just confusing for most people
RDD in details
• We want to see what RDD is with a simple
use case
• We want to check the number of lines
that are in a file related errors in a log
file
Simple case code
val logfile = sc.textFile (“hdfs://localhost:8020
/BigData/logfiles.txt”)
val errors = logfile.filter(_.startsWith(“ERROR”))
val hdfs = errors.filter(_.contains(“dfs”))
hdfs.count()
Code in Detail
• First line
val logfile = sc.textFile (“hdfs://localhost:8020
/BigData/logfiles.txt”)
• We are referring to a text file in HDFS
and we are assigning to an object called
logfile
Code in Detail
val errors = logfile.filter(_.startWith(“ERROR”))
• We are filtering all lines that start with
the word “ERROR” and calling it an object
errors
Code in Detail
val hdfs = errors.filter(_.contains(“HDFS”))
• We are filtering all lines that start with
the word “ERROR” that contain the word
HDFS and calling the result hdfs
Code in Detail
hdfs.count()
• Last line, we are counting the number of
lines in the variable hdfs
Do you see the lineage?
textFile()
filter()
filter()
logfile
errors
hdfs
RDD
RDD
RDD
• We can say that HDFS is dependent on
errors and errors is dependent on log-file
• Spark refers to this dependency change as
lineage
What functions can I use with RDDs?
• If you are interested, here is a list of
functions you can use with RDDs
• https://spark.apache.org/docs/2.2.0/api/ja
va/org/apache/spark/rdd/RDD.html
Input Dataset
• Assume our dataset is in
HDFS
– And it is divided into 5
HDFS blocks as shown
Input Dataset
Block 1
Block 2
Block 3
Block 4
Block 5
logfile RDD
• When you run
the textfile()
function, our
resulting
dataset logfile
will have five
blocks
logfile
RDD
Input Dataset
textfile()
Block 1
Partition 1
Block 2
Partition 2
Block 3
Partition 3
Block 4
Partition 4
Block 5
Partition 5
Spark partitions
• Spark calls a block of the
file as partition
– So logfile refers to 5
partitions in different
nodes
– Since here we are
referring to a text file,
each partition will have
several lines of text or
records
– Spark calls records
elements
logfile
RDD
Input Dataset
textfile()
Block 1
Partition 1
Block 2
Partition 2
Block 3
Partition 3
Block 4
Partition 4
Block 5
Partition 5
errors RD
• We then use filter() function to only get
lines with word ERROR in it
RDD
Input Dataset
logfile
filter()
textfile()
Block 1
Partition 1
Block 2
Partition 2
Block 3
Partition 3
Block 4
Partition 4
Block 5
Partition 5
errors RD
• This results in errors
RDD
RDD
Input Dataset
errors
logfile
filter()
textfile()
Block 1
Partition 1
Partition 1
Block 2
Partition 2
Partition 2
Block 3
Partition 3
Partition 3
Block 4
Partition 4
Partition 4
Block 5
Partition 5
Partition 5
hdfs RDD
• We then call filter function on errors to
get hdfs
RDD
RDD
Input Dataset
RDD
errors
logfile
filter()
textfile()
hdfs
filter()
Block 1
Partition 1
Partition 1
Partition 1
Block 2
Partition 2
Partition 2
Partition 2
Block 3
Partition 3
Partition 3
Partition 3
Block 4
Partition 4
Partition 4
Partition 4
Block 5
Partition 5
Partition 5
Partition 5
Final result
• We then use the count function to tally
the results
RDD
RDD
Input Dataset
RDD
errors
logfile
filter()
textfile()
hdfs
count()
filter()
Block 1
Partition 1
Partition 1
Partition 1
Block 2
Partition 2
Partition 2
Partition 2
Block 3
Partition 3
Partition 3
Partition 3
Block 4
Partition 4
Partition 4
Partition 4
Block 5
Partition 5
Partition 5
Partition 5
RDD
RDD
Resilient Distributed Dataset
RDD
RDD
Input Dataset
RDD
errors
logfile
filter()
textfile()
hdfs
count()
filter()
Block 1
Partition 1
Partition 1
Partition 1
Block 2
Partition 2
Partition 2
Partition 2
Block 3
Partition 3
Partition 3
Partition 3
Block 4
Partition 4
Partition 4
Partition 4
Block 5
Partition 5
Partition 5
Partition 5
Is errors an RDD?
RDD
RDD
Input Dataset
RDD
errors
logfile
filter()
textfile()
hdfs
count()
filter()
Block 1
Partition 1
Partition 1
Partition 1
Block 2
Partition 2
Partition 2
Partition 2
Block 3
Partition 3
Partition 3
Partition 3
Block 4
Partition 4
Partition 4
Partition 4
Block 5
Partition 5
Partition 5
Partition 5
Is errors an RDD?
• First, check to see if errors is resilient or
not
• Assume that partition 2 resides in node 2
and for some reason node 2 went down
– Lost partition 2 in errors
– If we can recover partition 2 in errors, we can
say errors is resilient
– Simple: to get partition 2 in errors, run filter()
function on partition 2 of logfiles
– This is possible due to lineage
Is errors a distributed data set (DD)?
• Errors data is distributed in different
nodes
• Therefore it is a distributed dataset
Conclusion
• Errors is both resilient and a distributed
data set and hence an RDD
• Same is true for logfile and hdfs
• Therefore, each is an “RDD”
RDD’s in our example
RDD
Resilient Distributed Dataset
RDD
RDD
Input Dataset
RDD
errors
logfile
filter()
textfile()
hdfs
count()
filter()
Block 1
Partition 1
Partition 1
Partition 1
Block 2
Partition 2
Partition 2
Partition 2
Block 3
Partition 3
Partition 3
Partition 3
Block 4
Partition 4
Partition 4
Partition 4
Block 5
Partition 5
Partition 5
Partition 5
Interview Question
• We never specified that logfiles, errors
and hdfs are RDDs
• Is Spark really treating them as RDDs?
• Run the commands and see
• Start the spark-shell
Spark shell
• Spark shell is an interactive way to
execute Spark (scala) code
• Sparks architecture is very similar to the
master slave architecture we see in
Hadoop
RDD inference
• The reason Spark knows to treat logfile,
errors and hdfs as RDDs is because of the
functions we are using
– Textfile function
– Filter funtion
• These functions are from the Spark API
and create RDD’s behind the scenes
• This helps Spark manage the dependencies
and track all the operations we do to our
data set
RDD Rigorous Definition
• RDD is actually a type
– logfile, errors and hdfs are RDDs
• API documentation for RDD
https://spark.apache.org/docs/1.6.0/api/scal
a/index.html
RDD Type Definition
• “partitioned collection of elements that
can be operated on in parallel”
– In our illustration, each RDD is made up of
fives partitions
– Each partition has a collection of elements
– Each partition is operated on in parallel across
five different nodes
RDD
RDD
Input Dataset
RDD
errors
logfile
filter()
textfile()
hdfs
count()
filter()
Block 1
Partition 1
Partition 1
Partition 1
Block 2
Partition 2
Partition 2
Partition 2
Block 3
Partition 3
Partition 3
Partition 3
Block 4
Partition 4
Partition 4
Partition 4
Block 5
Partition 5
Partition 5
Partition 5
RDD Type Definition
• “immutable”
– Once an RDD is created you cannot change the
elements inside the RDD
– The only way to change the elements is by
applying another function to the RDD and this
will create a new RDD
RDD Properties
• Internally, RDD’s are characterized by 5
properties
• Without these properties, an RDD cannot
exist
• Last two are optional, so we will look at the
first 3
– List of partitions
– Compute function
– List of dependencies
RDD Property # 1: list of partitions
RDD
RDD
Input Dataset
RDD
errors
logfile
filter()
textfile()
hdfs
count()
filter()
Block 1
Partition 1
Partition 1
Partition 1
Block 2
Partition 2
Partition 2
Partition 2
Block 3
Partition 3
Partition 3
Partition 3
Block 4
Partition 4
Partition 4
Partition 4
Block 5
Partition 5
Partition 5
Partition 5
List of
partitions
RDD Property # 2: compute function
Compute function
Input Dataset
RDD
RDD
RDD
filter()
textfile()
hdfs
errors
logfile
filter()
count()
Block 1
Partition 1
Partition 1
Partition 1
Block 2
Partition 2
Partition 2
Partition 2
Block 3
Partition 3
Partition 3
Partition 3
Block 4
Partition 4
Partition 4
Partition 4
Block 5
Partition 5
Partition 5
Partition 5
List of
partitions
RDD Property #2 Details
• Property 2 says that each RDD must have
a function associated with it
• A function you apply to an RDD will be
applied to ALL the elements of the RDD.
• This means that RDD does not support
fine grain operations
– You cannot operate on specific elements in an
RDD
• We say RDD only supports coarse grain
operations
RDD Property # 3: list of dependencies
Compute function
Input Dataset
RDD
RDD
RDD
filter()
textfile()
hdfs
errors
logfile
filter()
count()
Partition 1
Block 1
Partition 1
Partition 1
Block 2
Partition 2
Partition 2
Partition 2
Block 3
Partition 3
Partition 3
Partition 3
Block 4
Partition 4
Partition 4
Partition 4
Block 5
Partition 5
Partition 5
Partition 5
List of
partitions
List of
Dependecies
RDD Property #3 Details
• Property 3 says that each RDD has a list
of dependencies on other RDDs
• In our example, hdfs RDD is depended on
errors, errors RDD is dependent on logfile
• Dependencies play a major role in
conversion of logical plans to physical
plans
Interview Question
• How does Spark decide how many
partitions it will create?
– If we are referring to file in HDFS, then the
number of partitions is determined by the
block size in HDFS.
– If on the local filesystem then it is usually
determined by the number of cores available.
Summary
• RDD enables spark to keep track of all the
operations that goes on with the dataset
• This is how Spark achieves fault tolerance
• What makes an RDD?
– RDD has information about the list of
partitions
– RDD has the function or operation that needs
to be applied to it’s partitions
– RDD has a list of RDDs that it depends on
Introduction to Spark
Course Overview
Purpose of the Course
• To get an understanding of Spark
• Spark will be the go to platform for data
processing, learning and analytics
– Fastest evolving tool in the Big Data Ecosystem
Pre-requisites For the course
• Basic understanding of Hadoop
• Scala
– We’ll go over it
– Not too difficult if you already know some
procedural language
Today’s lecture
• Introduction to Spark
– Compare to Hadoop
– How Spark is faster than Hadoop
– Some of the misconceptions about Spark
• Introduction to Scala Language
– Download IDE
– Program in Scala
– Compare to an equivalent Java implementation
Spark vs. Hadoop
• Did we learn an outgoing and obsolete
technology by learning about Hadoop?
– NO!
Spark vs. Hadoop
• Spark does not replace Hadoop
– We didn’t learn something that is useless
– But this perception exists
• We want to objectively compare the two
technologies
1) Storage
2) Computation method
3) Computational Speed
4) Resource management
• Goal is to make you be able to make tool
choices in a company
Introduction
• Spark is an execution engine that can do
fast computations on big data set
• Example: average volume by stock symbol
on a stock dataset
– Write a few lines of code (Scala, Python or Java)
– Spark will spin up the tasks on different machines
on your cluster and give your results very fast
• Seems like MapReduce
– Yes but much faster than Hadoop
– HOW though?
Comparing Spark to Hadoop
• Let’s compare Hadoop and Spark based on
the four components we talked about
1) Storage
2) Computation Model
3) Computational Speed
4) Resource management
Difference between Hadoop and Spark – Storage
• Spark does not offer a big data
storage solution
• What is the difference between Big Data
Storage and regular storage?
– The whole concept of replication, keeping
track of pieces of your data etc, in short
there is no “HDFS”
Difference – Storage
• So where does Spark load the data for
computation?
• Where does Spark store the results after
computation?
• Spark must leverage existing distributed file
systems like Hadoop’s HDFS or Cloud
services like Amazon S3 or big data
databases like HBase, Cassandra, etc
Difference – Storage
Storage
HDFS
None
• Storage is not Spark’s focus
• Spark focuses on fast computation
Difference – MapReduce
• False misconception
– “Spark job executions are fast because it does not
use the MapReduce model”
• Spark’s goal is not to find an alternative to
MapReduce
– We still have map, reduce, shuffle
• Spark replaces Hadoop’s implementation
of MapReduce with its own implementation
– Faster and more efficient
– And we’ll develop in some detail why this is in the
first two lectures of the course
Difference – Computationally Speed
• Spark Homepage → 100X faster
• But look at the details…
– Uses a logistic regression example
Comparison – Computational Speed
• The 100x improvement only true for
iterative machine learning cases
– Example logistic regression
– Keep doing the same computation on different
outputs
• Spark speeds things up (among other things)
by keeping output in memory
Iterative algorithms in Hadoop
Hadoop Distributed File System
– fault tolerance, location of blocks, etc
Problem with iterative algorithm in Hadoop
• Reading first input and writing the final
output to HDFS is unavoidable
– You have to save the data somewhere!
• But we also do two intermediate reads
and writes
Problem with iterative algorithms in Hadoop
• Intermediate reads and writes are
inefficient
– 50 GB intermediate input/output take a lot of
time → slow execution time
– Iterations compound the problem
Iterative algorithms in Spark
• Spark solution….
– Keep the intermediate data in memory
– Sounds simple, but adds a lot of complications
• Not a ground breaking thing, been done
before
• So what’s special?
– Spark achieves this while achieving fault
tolerance which is not trivial
– Details will be provided in Lecture 2
Spark Solution Graphically
Hadoop Distributed File System
– fault tolerance, location of blocks, etc
Comparison – Computational Speed
• What if you don’t have logistic
regression?
• In other cases, still faster but not
anywhere near 100x
– example average volume of stocks
• Good rule of thumb 10x
• We still want to see how this 10x
improvement happens
– It’s not just in memory processing
Machine Learning Library
• Spark has an extensive machine learning
library
• Very popular, we will focus on this
heavily in the later part of the course
Comparison So Far
Storage
HDFS
None
MapReduce
Built-in
Built-in, optimized
Speed
Fast
10-100 X Faster
Comparison – Resource Management
• In Hadoop, when you start a job, mappers
and reducers are running in the
background
• You need “resources” to get jobs done
– Nodes
– CPU
– Memory
• Hadoop uses YARN
– MapReduce jobs negotiate with YARN to get
resources for execution
Comparison – Resource Management
• Same thing applies to Spark, it needs to
look for resources
• Spark however has it’s own in-built
resource manager
– So we don’t need YARN
• But we can request that Spark use YARN
as the resource manager
• Generally, you see use of YARN because it
is very well developed
– Another nuance of the technology
Summary of Comparison
Storage
HDFS
None
MapReduce
Built-in
Built-in, optimized
Speed
Fast
10-100 X Faster
Resource
Management
YARN
Standalone
Setting up a Spark Cluster
A simple question?
• Do we need a Hadoop cluster to run
Spark?
– Short answer is no
• You can run Spark as standalone
– ie setup a bunch of computers, designate one
as a master, rest as workers and start Spark
daemons on each
• But Spark still has to get data from
somewhere….
– HDFS, S3, HBase, etc
Using a local file system with Spark
• Can we use our local file system to give
data to Spark?
– Yes, but this is very inefficient
– Spark needs to be able to run computational
tasks on any of the nodes in the cluster
– Therefore, data must be available on each node
– 1TB dataset must be copied to every single
node
– HDFS will fetch you the data
• So possible but defeats the whole purpose
of distributive computing
Existing Hadoop Cluster
• What if you are at a company with a nice
working Hadoop cluster?
– 1000’s of nodes
– You can still have Spark as a separate cluster
– Spark gets data from HDFS
• Two problems with this:
– There is an extra overhead cost of
maintaining two clusters
– No Data Locality → every time you need a
file, you have to do a copy from HDFS to
Spark (bandwidth intensive) and this will slow
you down
Best Solution
• Run Spark on top of
the Hadoop Cluster
• Spark will execute
its jobs in the same
node where the data
is stored
– Avoids the need to
bring data over the
network
– Saves costs
On top of
Final Question
• Is Spark a replacement for Hadoop? Or is
Hadoop “dying”?
– Short answer is no
• Spark enhances the Hadoop stack and is not
meant to replace it
– It enhances the speed of computations
• Spark is a great tool for Data Analysts using
PIG or HIVE,
– Use Spark for faster execution times
• Some even argue that Spark is just another
tool in the Hadoop framework like Hive and
Pig
Follow up Question
• I already have a bunch of jobs that were
implemented in traditional Hadoop
MapReduce, should I implement them in
Spark?
– Depends
– If you are creating new jobs look at Spark even if
your speed needs are already met
• Generally not recommended to convert all
your current jobs to Spark Jobs
– Look at the slow ones and migrate them to Spark
• At the end of the day though, hard to ignore
10x speed improvement
Summary
• Power of Spark lies in its computational
speed
• Its execution engine is faster than
Hadoop’s MapReduce implementation
• Spark is designed to work on top of
Hadoop leveraging YARN and HDFS
• 10x (at least) speed improvement is quite
compelling and you see quite a bit of
traction with Spark now
– And job demands
Scala Programming Language
Introduction
• Scala stands for Scalable language
• Developed in EPFL
• There are already 1000’s of programming
languages
• So why another programming language?
• Look at Scala homepage
Scala Homepage
The Art of Programming
• You can argue that there are two camps in
programming
• Object oriented programming
• Functional programming
Object-Oriented Programming/Languages
• Programming model based on “objects”
• Easy to visualize real life scenarios
Object-Oriented Example – Car dealership
Object-Oriented Example – Car dealership
Object-Oriented Example – Car dealership
Object-Oriented Programming
• A program essentially has two things
– Data
– Operations to do on the data
• In Object oriented programming, we
create a class and put data and operations
to do in one location
• This helps “visualize” what is happening
with your program
• We create instances of a class called an
“object”
Functional Programming
• There is no concept of classes and
objects
• Data and operations performed on the
data are not in a centralized location
• Functional programming treat data and
operations as two different things
• Hence, they should be kept separate
• The idea behind functional programming is
to break a big job into tiny manageable
functions
Comparison
Visualization
Real world
represetnation
Data transfomration
Comparison
Visualization
Real world
represetnation
Data transfomration
Changing
Data
Supports immutability
Embraces immutability
Comparison
Visualization
Real world
represetnation
Data transfomration
Changing
Data
Supports immutability
Embraces immutability
Higher order
functions
May support higher order
functions
Supports higher order
functions
Summary
• Each method has it’s benefits
• There is no clear “winner”
• So new programming languages try to take
a hybrid approach
• Java is a OO language
– Java 8 came up with a lot of functional
programming concepts
• Python is a hybrid
– But more strong towards functional
programming
– Reason it is popular for Big Data
Scala
• Scala was build from ground up to be
both functional and objected oriented
• Scala also has a rich set of features
Why do we care though?
• Spark is written in Scala
• Spark jobs are preferred to be written in
Scala
• You can use other programming languages
but Scala is gaining momentum
Scala popularity
Why Spark was written in Scala?
Why Spark was written in Scala?
• In layman’s terms, Scala was chosen
because it works on JVM so interaction
with Hadoop is easier
– Hadoop is written in Java
• And it also allows functional programming
– Need it for data manipulation
Scala attributes
• Is Scala easy to learn?
• Scala has concise code
– But harder to understand if you are new
• Runs on JVM
– When Java came out “write once, and run
anywhere“
– Java programs are compiled to bytecode
– You take the bytecode and can run it in any
operating system (irrespective of where it
was compiled)
– Scala code is also compiled into bytecode
Scala attributes
• Interoperability with Java
– You can refer any Java programs or libraries
from Scala and vice versa
• Scala developers made it easy for Java
developers to switch
• Java though is taking notice and we can
see this in Java 8
Basic Scala Program
Introduction to Scala
• We will write a basic Scala program and
compare it to Java
• Download the Scala IDE
– http://scala-ide.org/download/sdk.html
Creating Employee.Java
• In Java, for any class, we likely need:
– Constructor methods
– Getter methods
– Setter methods
• Look at Java code for Employee class
provided to you on blackboard
Same code in Scala
class Employee (var name : String , var salary : Int)
• Just one line!
• Two properties, name and salary
• Where are the constructors?
– Scala automatically creates a constructors with
two properties behind the scenes for us
• Where are the getters and setters?
– Don’t need it!
– Scala has an easy way to set values for
properties
Scala Development
• Scala developers basically decided that
the whole concept of getters, setters,
constructors, etc is redundant
– You are always writing the same code over and
over again
– We will take care of things behind the scenes
– So Scala code is very concise
• Aside: Python came along the same lines
– For example, no braces { } to separate your
conditional or loop statements
Basic Scala Program
• Just like Java we need an entry point for
the program – the main method
object Main {
def main(args: Array[String]): Unit = {
// code goes here}
}
• object here means that there can only be
one instance called Main
• def defines a method
• Unit is the return type and it is equivalent
to void in Java (and other languages)
Basic Scala Program
• Create an employee object
object Main {
def main(args: Array[String]): Unit = {
var joe = new Employee(“Joe”, 10000)
}
• Employee name is Joe and Salary is
$10000
Basic Scala Program
• Print name and salary
println(joe.name)
println(joe.salary)
• Note that there is no need for the
semicolon
– Again redundant if you think about it
– Can still put it for you own readability sake
• Run the code and make sure you get an
output
Basic Scala Program
• Now change the salary
joe.salary = 20000
• And print again
Basic Scala Program
• Let’s say we want to add another property
called department to the employee class
class Employee (var name : String , var salary :
Int) {
var dept = 0
}
• Print the department, should get zero
Basic Scala Program
• Change the department for Joe to “ten”
joe.dept = “ten”
• You’ll get an error
• Why?
• Even though we have not mentioned the
data type of the variable “dept” it was
inferred
• This is called type inference
Basic Scala Program
• This also makes Scala a “static type
language”
– Scala knows the type of the variable at
compile time
• Java is also statically typed, but you as a
developer must specify the type of the
variable
– It supports type inference in limited capacity
– Expect Java to make improvements here to be
compete with Scala
Basic Scala Program
• To fix the issue, change the department
for Joe to 10
joe.dept = 10
• Print the department, should get 10
Creating immutable values
• True functional programming must
support concept of immutability
– Once value is set, it cannot be changed
• The keyword “var” means that the
variable department can be changed
class Employee (var name : String , var salary :
Int) {
var dept = 0
}
Basic Scala Program
• But Scala also allows the concept of
immutability by using “val” → “value”
class Employee (var name : String , var salary :
Int) {
val dept = 0
}
• You cannot change dept now, try it you’ll
get an error
• val is comparable to final in Java
Primitive types versus object types
• Important concept to really understand
what’s “different” with Scala
• Try to convert the salary to string and
print it.
println(joe.salary.toString())
• Now try to do it with the Java code
provided
public int getSalary() {
return salary.toString();
}
Primitive types versus object types
• For the experts out there
• Salary is an integer which is a primitive
– We can only call methods on objects that are not
primitive types
• Java distinguishes between primitives and
objects
– You always have to make a conscious choice
• Scala does not
– In Scala everything is an object
– Scala is a “pure” object oriented language
– Makes your life easier
Simple method in Scala
• Write a method to print employee name
and salary they make
class Employee (var name : String , var salary :
Int) {
var dept = 0
def printEmployee() = {
println(name + ” makes $” + salary)
}
}
Simple method in Scala
• Call the method
joe.printEmployee()
Summary
• Scala is a very concise language compare
to Java
• Scala does type inference
• It is a statically typed language
• Scala does not support the concept of
primitive and everything is an object
Homework
• Scala Language training on
LinkedInLearning.com

Still stressed from student homework?
Get quality assistance from academic writers!

Order your essay today and save 25% with the discount code LAVENDER