Sunday, January 25, 2015

#apachespark for Hadoop programmers

 

Apache spark provides many advantages over Hadoop. Following are the important differences to consider before starting with Spark

Apache Spark API Hadoop API
The input is an RDD of Strings only, not of key-value pairs Mappers and Reducers always use key-value pairs as input and output
Tuple is the equivalent of key values. ReduceByKey is the equivalent

A Reducer reduces values per key only

Mapper should always return 1 record. Filter has to be used to remove unwanted records A Mapper or Reducer may emit 0, 1 or more key-value pairs for every input
Always returns typed results. Functions like flatten,flatmap, map and reduce have to be used in combination with GroupByKey. A worker may run out of memory if above function are improperly applied Mappers and Reducers may emit any arbitrary keys or values, not just subsets or transformations of those in the input
The Spark map() and flatMap() methods only operate on one input at a time though, and provide no means to execute code before or after transforming a batch of values. The nearest equivalent is mapPartitions. Mapper and Reducer objects have a lifecycle that spans many map() and reduce() calls. They support a setup() and cleanup() method, which can be used to take actions before or after a batch of records is processed

Other than the API differences there a lot of fundamental differences the way apache spark works

It provides

  • Caching + in memory computation
  • RDD(Resilient Distributed Data set): an RDD is the main abstraction of spark. It allows recovery of failed nodes by re-computation of the DAG while also supporting a more similar recovery style to Hadoop by way of checkpointing, to reduce the dependencies of an RDD. Storing a spark job in a DAG allows for lazy computation of RDD's and can also allow spark's optimization engine to schedule the flow in ways that make a big difference in performance
  • Spark API: Hadoop MapReduce has a very strict API that doesn't allow for as much versatility. Since spark abstracts away many of the low level details it allows for more productivity. Also things like broadcast variables and accumulators are much more versatile than DistributedCache and counters
  • As a product of in memory computation spark sort of acts as it's own flow scheduler. Whereas with standard MR you need an external job scheduler like Azkaban or Oozie to schedule complex flows
  • Scala API. Scala stands for Scalable Language and is clearly the best language to choose for parallel processing. They say Scala cuts down code by 2-5x, but in my experience from refactoring code in other languages - especially java mapreduce code, its more like 10-100x less code. Seriously I have refactored 100s of LOC from java into a handful of Scala / Spark. Its also much easier to read and reason about. Spark is even more concise and easy to use than the Hadoop abstraction tools like pig & hive, its even better than Scalding.
  • Spark has a repl / shell. The need for a compilation-deployment cycle in order to run simple jobs is eliminated. One can interactively play with data just like one uses Bash to poke around a system
  • Spark has much lower per job and per task overhead. It gives it ability to be applied to the cases where Hadoop MR is not applicable. It is cases when reply is needed in 1-30 seconds.
    Low per task overhead makes Spark more efficient for even big jobs with a lot of short tasks. As a very rough estimation - when task takes 1 second Spark will be 2 times more efficient then Hadoop MR
  • Spark has lower abstraction then MR - it is graph of computations. As a result it is possible to implement more efficient processing then MR - specifically in cases when sorting is not needed. In other words - in MR we always pay for the sorting, but in Spark - we do not have to.

     

References :

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

http://stackoverflow.com/questions/24705724/is-caching-the-only-advantage-of-spark-over-map-reduce

Wednesday, January 14, 2015

Apache Spark Design Patterns - Using #Scala #apache-spark - Series -1 ; The word count

A simple word count using scala in Spark
Simple word count example - Click to see code
There are many limitations in the above code The objective is to count words in the post, however the Posts.xml has lot of meta-data like OwnerUserId,Title,Tags etc..The info we need is in the Body.
The missing logic is
1) Count words in the Body
2) Error handling
3) Data clean up - we don’t count single quotes, special characters This example uses case classes and xml parsing which in in-built Scala.
Enhanced word count example - Click to see code

Thursday, January 08, 2015

Apache Spark Design Patterns - Using Scala #-1 The Setup

The Hardware and Software stack used  

Spark version 1.2.0
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)

scala -version
Scala code runner version 2.11.4 -- Copyright 2002-2013, LAMP/EPFL

java -version
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b14)
Java HotSpot(TM) 64-Bit Server VM (build 24.71-b01, mixed mode)

uname -a
Linux SERVER 3.11.10-301.fc20.x86_64 #1 SMP Thu Dec 5 14:01:17 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

cat /etc/redhat-release
Fedora release 20 (Heisenbug)

Data files used
8.0G Sep 18 03:06 Comments.xml
29G Sep 18 04:34 Posts.xml
1.8G Sep 23 02:01 stackoverflow.com-Comments.7z
5.8G Sep 27 01:26 stackoverflow.com-Posts.7z
101M Sep 23 21:49 stackoverflow.com-Users.7z
895M Sep 18 04:36 Users.xml
cat /proc/cpuinfo -Click to see details
processor       : 0
vendor_id       : GenuineIntel
cpu family      : 6
model           : 23
model name      : Intel(R) Core(TM)2 Duo CPU     E8400  @ 3.00GHz
stepping        : 10
microcode       : 0xa0b
cpu MHz         : 1998.000
cache size      : 6144 KB
physical id     : 0
siblings        : 2
core id         : 0
cpu cores       : 2
apicid          : 0
initial apicid  : 0
fpu             : yes
fpu_exception   : yes
cpuid level     : 13
wp              : yes
flags           : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx lm constant_tsc arch_perfmon pebs bts rep_good nopl aperfmperf pni dtes64 monitor ds_cpl vmx smx est tm2 ssse3 cx16 xtpr pdcm sse4_1 xsave lahf_lm dtherm tpr_shadow vnmi flexpriority
bogomips        : 5985.62
clflush size    : 64
cache_alignment : 64
address sizes   : 36 bits physical, 48 bits virtual
power management:

processor       : 1
vendor_id       : GenuineIntel
cpu family      : 6
model           : 23
model name      : Intel(R) Core(TM)2 Duo CPU     E8400  @ 3.00GHz
stepping        : 10
microcode       : 0xa0b
cpu MHz         : 1998.000
cache size      : 6144 KB
physical id     : 0
siblings        : 2
core id         : 1
cpu cores       : 2
apicid          : 1
initial apicid  : 1
fpu             : yes
fpu_exception   : yes
cpuid level     : 13
wp              : yes
flags           : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx lm constant_tsc arch_perfmon pebs bts rep_good nopl aperfmperf pni dtes64 monitor ds_cpl vmx smx est tm2 ssse3 cx16 xtpr pdcm sse4_1 xsave lahf_lm dtherm tpr_shadow vnmi flexpriority
bogomips        : 5985.62
clflush size    : 64
cache_alignment : 64
address sizes   : 36 bits physical, 48 bits virtual
power management:

Apache Spark Design Patterns - Using Scala #0

Apache Spark supports both batch and streaming analysis, meaning you can use a single framework for your batch processing as well as your near real time use cases. And Spark introduces a fantastic functional programming model, which is arguably better suited for data analysis than Hadoop’s Map/Reduce API

This blog series attempts to find out if the common set of use cases can be solved using Spark.
The use-cases are based on 

http://oreil.ly/mapreduce-design-patterns
“MapReduce Design Patterns by Donald Miner and Adam Shook (O’Reilly). Copyright 2013 Donald Miner and Adam Shook, 978-1-449-32717-0.”