Sunday, January 25, 2015

#apachespark for Hadoop programmers

 

Apache Spark provides many advantages over Hadoop MapReduce. The following are important differences to consider before starting with Spark.

Apache Spark API vs. Hadoop API

  • Input: in Spark the input is an RDD of Strings only, not of key-value pairs; in Hadoop, Mappers and Reducers always use key-value pairs as input and output.
  • Key-value pairs: in Spark a Tuple is the equivalent of the key-value pair and reduceByKey is the equivalent of a Reducer; in Hadoop a Reducer reduces values per key only.
  • Record counts: Spark's map() always returns exactly one record, so filter() has to be used to remove unwanted records; a Hadoop Mapper or Reducer may emit 0, 1 or more key-value pairs for every input.
  • Typing: Spark always returns typed results, and functions like flatten, flatMap, map and reduce have to be used in combination with groupByKey (a worker may run out of memory if these functions are applied improperly); Hadoop Mappers and Reducers may emit any arbitrary keys or values, not just subsets or transformations of those in the input.
  • Lifecycle: Spark's map() and flatMap() operate on one input at a time and provide no means to execute code before or after transforming a batch of values, the nearest equivalent being mapPartitions(); Hadoop Mapper and Reducer objects have a lifecycle that spans many map() and reduce() calls, with setup() and cleanup() methods that can be used to take actions before or after a batch of records is processed.
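
To make the mapping concrete, here is a minimal Scala word-count sketch that touches each of the equivalences above. The SparkContext sc and the input and output paths are illustrative assumptions, not code from the original post.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // pair-RDD functions such as reduceByKey (Spark 1.2)

def wordCount(sc: SparkContext): Unit = {
  val lines = sc.textFile("input.txt")            // RDD[String]: plain strings, not key-value pairs

  val counts = lines
    .flatMap(_.split("\\s+"))                     // emit 0..n records per input, like a Mapper
    .filter(_.nonEmpty)                           // map() alone cannot drop records; filter() does
    .map(word => (word, 1))                       // a Tuple2 plays the role of the key-value pair
    .reduceByKey(_ + _)                           // reduceByKey plays the role of the Reducer

  // mapPartitions is the nearest equivalent of setup()/cleanup(): the code around the
  // iterator runs once per partition, i.e. once per batch of records.
  val formatted = counts.mapPartitions { records =>
    val sep = "\t"                                // "setup" work, done once per partition
    records.map { case (word, count) => s"$word$sep$count" }
  }

  formatted.saveAsTextFile("word-counts")         // hypothetical output directory
}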

Other than the API differences, there are a lot of fundamental differences in the way Apache Spark works.

It provides

  • Caching + in memory computation
  • RDD (Resilient Distributed Dataset): an RDD is the main abstraction of Spark. It allows recovery of failed nodes by re-computation of the DAG, while also supporting a recovery style more similar to Hadoop's by way of checkpointing, which reduces the dependencies of an RDD. Storing a Spark job as a DAG allows lazy computation of RDDs and also lets Spark's optimization engine schedule the flow in ways that make a big difference in performance
  • Spark API: Hadoop MapReduce has a very strict API that doesn't allow for as much versatility. Since Spark abstracts away many of the low-level details, it allows for more productivity. Also, broadcast variables and accumulators are much more versatile than DistributedCache and counters (see the sketch after this list)
  • As a product of in-memory computation, Spark sort of acts as its own flow scheduler, whereas with standard MR you need an external job scheduler like Azkaban or Oozie to schedule complex flows
  • Scala API: Scala stands for Scalable Language and is arguably the best language to choose for parallel processing. They say Scala cuts down code by 2-5x, but in my experience refactoring code in other languages, especially Java MapReduce code, it is more like 10-100x less code. Seriously, I have refactored hundreds of lines of Java into a handful of lines of Scala / Spark. It is also much easier to read and reason about. Spark is even more concise and easy to use than Hadoop abstraction tools like Pig and Hive; it is even better than Scalding
  • Spark has a REPL / shell. The need for a compile-and-deploy cycle to run simple jobs is eliminated; one can interactively play with data just as one uses Bash to poke around a system
  • Spark has much lower per-job and per-task overhead. This makes it applicable to cases where Hadoop MR is not, such as when a reply is needed within 1-30 seconds.
    Low per-task overhead also makes Spark more efficient even for big jobs with a lot of short tasks. As a very rough estimate, when a task takes 1 second, Spark will be about 2 times more efficient than Hadoop MR
  • Spark has a lower-level abstraction than MR: a graph of computations. As a result it is possible to implement more efficient processing than MR, specifically in cases where sorting is not needed. In other words, in MR we always pay for the sort, but in Spark we do not have to.
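
A minimal Scala sketch of the broadcast-variable and accumulator point above, together with caching. The SparkContext sc, the lookup map and the events.csv file are hypothetical.

import org.apache.spark.SparkContext

def enrich(sc: SparkContext): Unit = {
  // Broadcast a small lookup table to every worker once (the role DistributedCache plays in Hadoop)
  val cityByCode = sc.broadcast(Map("BLR" -> "Bangalore", "NYC" -> "New York"))

  // An accumulator plays the role of a Hadoop counter
  val badRecords = sc.accumulator(0L, "bad records")

  // cache() keeps the RDD in memory so later actions reuse it instead of re-reading the file
  val events = sc.textFile("events.csv").cache()

  val enriched = events.flatMap { line =>
    line.split(",") match {
      case Array(user, code) => Some((user, cityByCode.value.getOrElse(code, "unknown")))
      case _                 => badRecords += 1L; None   // count malformed lines, emit nothing
    }
  }

  println(s"kept: ${enriched.count()}, bad records: ${badRecords.value}")
}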

     

References:

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

http://stackoverflow.com/questions/24705724/is-caching-the-only-advantage-of-spark-over-map-reduce

Wednesday, January 14, 2015

Apache Spark Design Patterns - Using #Scala #apache-spark - Series 1: The word count

A simple word count using Scala in Spark.
Simple word count example:
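The original snippet sits behind a "Click to see code" link on the blog; below is a minimal sketch of a simple word count along those lines, assuming a SparkContext sc and the Posts.xml file from the setup post.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // pair-RDD functions such as reduceByKey (Spark 1.2)

def simpleWordCount(sc: SparkContext): Unit = {
  val counts = sc.textFile("Posts.xml")   // every line of the dump, metadata included
    .flatMap(_.split("\\s+"))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  counts.take(20).foreach(println)        // sample of the (word, count) pairs
}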
There are many limitations in the above code. The objective is to count words in the posts, but Posts.xml carries a lot of metadata like OwnerUserId, Title, Tags etc.; the information we need is in the Body.
The missing logic is:
1) Count words in the Body only
2) Error handling
3) Data clean-up: single quotes and special characters should not be counted
The enhanced example uses case classes and XML parsing, both of which are built into Scala.
Enhanced word count example:
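The enhanced code is also behind a link on the blog; the following is a hedged sketch based only on the description above. The Body attribute name and the clean-up regex are assumptions.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import scala.util.Try
import scala.xml.XML

case class Post(body: String)

// Each data line of Posts.xml is a <row .../> element; lines that fail to parse
// (the XML prolog, the <posts> wrapper, malformed rows) are simply dropped.
def parse(line: String): Option[Post] =
  Try(Post((XML.loadString(line.trim) \ "@Body").text)).toOption

def enhancedWordCount(sc: SparkContext): Unit = {
  val counts = sc.textFile("Posts.xml")
    .flatMap(parse)                                       // error handling: keep only well-formed rows
    .flatMap(_.body.split("\\s+"))                        // count words in the Body only
    .map(_.replaceAll("[^A-Za-z0-9]", "").toLowerCase)    // clean-up: drop quotes and special characters
    .filter(_.nonEmpty)
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  counts.take(20).foreach(println)
}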

Thursday, January 08, 2015

Apache Spark Design Patterns - Using Scala #-1 The Setup

The Hardware and Software stack used  

Spark version 1.2.0
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)

scala -version
Scala code runner version 2.11.4 -- Copyright 2002-2013, LAMP/EPFL

java -version
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b14)
Java HotSpot(TM) 64-Bit Server VM (build 24.71-b01, mixed mode)

uname -a
Linux SERVER 3.11.10-301.fc20.x86_64 #1 SMP Thu Dec 5 14:01:17 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

cat /etc/redhat-release
Fedora release 20 (Heisenbug)

Data files used
8.0G Sep 18 03:06 Comments.xml
29G Sep 18 04:34 Posts.xml
1.8G Sep 23 02:01 stackoverflow.com-Comments.7z
5.8G Sep 27 01:26 stackoverflow.com-Posts.7z
101M Sep 23 21:49 stackoverflow.com-Users.7z
895M Sep 18 04:36 Users.xml
cat /proc/cpuinfo
processor       : 0
vendor_id       : GenuineIntel
cpu family      : 6
model           : 23
model name      : Intel(R) Core(TM)2 Duo CPU     E8400  @ 3.00GHz
stepping        : 10
microcode       : 0xa0b
cpu MHz         : 1998.000
cache size      : 6144 KB
physical id     : 0
siblings        : 2
core id         : 0
cpu cores       : 2
apicid          : 0
initial apicid  : 0
fpu             : yes
fpu_exception   : yes
cpuid level     : 13
wp              : yes
flags           : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx lm constant_tsc arch_perfmon pebs bts rep_good nopl aperfmperf pni dtes64 monitor ds_cpl vmx smx est tm2 ssse3 cx16 xtpr pdcm sse4_1 xsave lahf_lm dtherm tpr_shadow vnmi flexpriority
bogomips        : 5985.62
clflush size    : 64
cache_alignment : 64
address sizes   : 36 bits physical, 48 bits virtual
power management:

processor       : 1
vendor_id       : GenuineIntel
cpu family      : 6
model           : 23
model name      : Intel(R) Core(TM)2 Duo CPU     E8400  @ 3.00GHz
stepping        : 10
microcode       : 0xa0b
cpu MHz         : 1998.000
cache size      : 6144 KB
physical id     : 0
siblings        : 2
core id         : 1
cpu cores       : 2
apicid          : 1
initial apicid  : 1
fpu             : yes
fpu_exception   : yes
cpuid level     : 13
wp              : yes
flags           : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx lm constant_tsc arch_perfmon pebs bts rep_good nopl aperfmperf pni dtes64 monitor ds_cpl vmx smx est tm2 ssse3 cx16 xtpr pdcm sse4_1 xsave lahf_lm dtherm tpr_shadow vnmi flexpriority
bogomips        : 5985.62
clflush size    : 64
cache_alignment : 64
address sizes   : 36 bits physical, 48 bits virtual
power management:

Apache Spark Design Patterns - Using Scala #0

Apache Spark supports both batch and streaming analysis, meaning you can use a single framework for your batch processing as well as your near-real-time use cases. Spark also introduces a fantastic functional programming model, which is arguably better suited for data analysis than Hadoop's MapReduce API.
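
A small Scala sketch of that single-framework point: the same functional transformation applied once to a batch input and once to a streaming input. The socket source, port and file paths are hypothetical.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// One transformation, written once...
def tagCounts(lines: RDD[String]): RDD[(String, Int)] =
  lines.flatMap(_.split("\\s+")).filter(_.startsWith("#")).map((_, 1)).reduceByKey(_ + _)

def run(sc: SparkContext): Unit = {
  // ...used for batch processing of a whole file
  tagCounts(sc.textFile("Posts.xml")).take(10).foreach(println)

  // ...and reused on every 10-second micro-batch of a stream
  val ssc = new StreamingContext(sc, Seconds(10))
  ssc.socketTextStream("localhost", 9999)
     .foreachRDD(rdd => tagCounts(rdd).take(10).foreach(println))
  ssc.start()
  ssc.awaitTermination()
}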

This blog series attempts to find out whether a common set of use cases can be solved using Spark.
The use-cases are based on 

http://oreil.ly/mapreduce-design-patterns
“MapReduce Design Patterns by Donald Miner and Adam Shook (O’Reilly). Copyright 2013 Donald Miner and Adam Shook, 978-1-449-32717-0.”



Tuesday, August 02, 2011

Enable DataNucleus logs in Jboss AS7

JBoss AS7 has a new logging system with a centralized configuration. There are only two configuration files: a) standalone.xml and b) domain.xml. standalone.xml is used when JBoss runs in standalone mode; domain.xml is used in domain mode. It is recommended that these files be changed only through the management API or the command-line tools provided by JBoss, but it is convenient for a developer to know standalone.xml. Almost everything in AS7 is a module or a subsystem, so to get the desired result one has to locate the right subsystem and add the changes there. For logging, the subsystem is urn:jboss:domain:logging:1.0

There are two handlers, a <console-handler> and a <periodic-rotating-file-handler>.
There can be many <logger> elements.

The logger below will log everything from DataNucleus:
<logger category="DataNucleus">
    <level name="DEBUG"/>
</logger>

This one restricts logging to JDO:
<logger category="DataNucleus.JDO">
    <level name="DEBUG"/>
</logger>

A detailed list of logger categories is here: http://www.datanucleus.org/products/accessplatform_3_0/logging.html


Wait, why can't I see the debug logs? Because you have to raise the log level of your preferred handler. I have chosen to log to the FILE handler, as below:
<periodic-rotating-file-handler name="FILE" autoflush="true">
    <level name="DEBUG"/>
    <formatter>
        <pattern-formatter pattern="%d{HH:mm:ss,SSS} %-5p [%c] (%t) %s%E%n"/>
    </formatter>
    <file relative-to="jboss.server.log.dir" path="server.log"/>
    <suffix value=".yyyy-MM-dd"/>
</periodic-rotating-file-handler>

Friday, July 29, 2011

NetBeans7 integration with Datanucleus JDO

The most important step in developing an application with DataNucleus is the enhancement of compiled classes. NetBeans provides powerful features for integrating the DataNucleus build environment without any need for a plugin.

Maven :- NetBeans has native integration with Maven. Any DataNucleus project based on Maven will open and run as-is in NetBeans; no changes are needed in the project or in NetBeans.

Ant :- The default build system in NetBeans is Ant. Follow through for the steps involved. There are two types of enhancement: 1) when we need type-safe queries, DataNucleus provides an annotation processor; 2) byte-code enhancement by the DataNucleus enhancer.

Requirements
1. DataNucleus from http://sourceforge.net/projects/datanucleus/files/datanucleus-accessplatform/ - choose datanucleus-accessplatform-full-deps-3.0.0-m6.zip, which has most of the dependencies.
2. http://sourceforge.net/projects/datanucleus/files/datanucleus-jca/ - needed if you are working with Java EE.

Set up NetBeans libraries for DataNucleus
  • Datanucleus - containing all files from the lib folder
  • Datanucleusdeps - containing all files from the deps folder

TypeSafe Queries :- DataNucleus generates additional code to support type-safe queries. Ensure that the "Enable Annotation Processing" check box is selected, under Project Properties | Build | Compiling.
Enhancer :- DataNucleus provides an Ant task for enhancement. This task has to be executed just after all the classes in the project are compiled. Open the Files tab and locate build.xml. Paste the code below just before the closing project tag:
<target name="-post-compile" depends="init">
    <path id="module.enhancer.classpath">
        <pathelement path="${javac.classpath}"/>
        <pathelement location="${build.classes.dir}"/>
    </path>
    <taskdef name="datanucleusenhancer"
             classpathref="module.enhancer.classpath"
             classname="org.datanucleus.enhancer.tools.EnhancerTask"/>
    <echo message="start datanucleusenhancer"/>
    <datanucleusenhancer classpathref="module.enhancer.classpath" dir="${build.classes.dir}" verbose="true">
        <fileset dir="${build.classes.dir}/com/blogspot/jkook/daytrader/jdo/">
            <include name="**/*.class"/>
        </fileset>
    </datanucleusenhancer>
    <echo message="end datanucleusenhancer"/>
</target>

Your output window will show a log similar to the one below.

Compiling 5 source files to /NetBeansProjects/JDOTutorial/build/web/WEB-INF/classes
DataNucleus : JDO Query - com.blogspot.jkook.daytrader.jdo.JDOOrderData -> com.blogspot.jkook.daytrader.jdo.QJDOOrderData
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
Copying 2 files to /NetBeansProjects/JDOTutorial/build/web/WEB-INF/classes
start datanucleusenhancer
Jul 29, 2011 2:40:36 PM org.datanucleus.enhancer.DataNucleusEnhancer
INFO: DataNucleus Enhancer : Using ClassEnhancer "ASM" for API "JDO"
Jul 29, 2011 2:40:37 PM org.datanucleus.enhancer.DataNucleusEnhancer main
INFO: DataNucleus Enhancer (version 3.0.0.m6) : Enhancement of classes
DataNucleus Enhancer (version 3.0.0.m6) : Enhancement of classes
Jul 29, 2011 2:40:38 PM org.datanucleus.api.jdo.metadata.JDOAnnotationReader processClassAnnotations
INFO: Class "com.blogspot.jkook.daytrader.jdo.JDOOrderData" has been specified with JDO annotations so using those.
Jul 29, 2011 2:40:38 PM org.datanucleus.metadata.MetaDataManager loadClasses
INFO: Class "com.blogspot.jkook.daytrader.jdo.QJDOOrderData" has no MetaData or annotations.
Jul 29, 2011 2:40:38 PM org.datanucleus.enhancer.AbstractClassEnhancer save
INFO: Writing class file "/NetBeansProjects/JDOTutorial/build/web/WEB-INF/classes/com/blogspot/jkook/daytrader/jdo/JDOOrderData.class" with enhanced definition
Jul 29, 2011 2:40:38 PM org.datanucleus.enhancer.DataNucleusEnhancer addMessage
INFO: DataNucleus Enhancer completed with success for 1 classes. Timings : input=514 ms, enhance=290 ms, total=804 ms. Consult the log for full details
DataNucleus Enhancer completed with success for 1 classes. Timings : input=514 ms, enhance=290 ms, total=804 ms. Consult the log for full details
end datanucleusenhancer
 
compile:
compile-jsps:
Created dir: /NetBeansProjects/JDOTutorial/dist
Building jar: /NetBeansProjects/JDOTutorial/dist/JDOTutorial.war
do-dist:
dist:
BUILD SUCCESSFUL (total time: 6 seconds)


The "DataNucleus : JDO Query ..." line near the top of the log is from the DataNucleus annotation processor.
The "start datanucleusenhancer" and "end datanucleusenhancer" lines are from the Ant task we just added.