Posts

how to use dynamic allocation in a oozie spark action on CDH5

Using Spark’s dynamic allocation feature in an oozie spark action can be tricky.

enable dynamic allocation

First you need to make sure that dynamic allocation is actually available on your cluster. Navigate to your “Spark” service, then “Configuration” and search for “dynamic”.

[screenshot: spark_oozie_action_dynamic_allocation_enabled – dynamic allocation settings in Cloudera Manager]

Both settings (the shuffle service and dynamic allocation) need to be enabled.
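On a cluster not managed by Cloudera Manager, these two checkboxes correspond roughly to the following entries in spark-defaults.conf (a sketch; the shuffle service additionally has to be registered as a YARN auxiliary service on the NodeManagers):

```
# spark-defaults.conf – equivalent of the two Cloudera Manager checkboxes
spark.dynamicAllocation.enabled  true
spark.shuffle.service.enabled    true
```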

how to configure the oozie spark action

If you just omit --num-executors in your spark-args definition your job will fall back to configuration defaults and will utilize only two executors:

[screenshot: spark_oozie_action_no_num_executors – job running with only two executors]

If you go for --num-executors 0 your oozie workflow will fail with this strange error message:

Number of executors was 0, but must be at least 1 (or 0 if dynamic executor allocation is enabled).

Usage: org.apache.spark.deploy.yarn.Client [options]
Options:
  --jar JAR_PATH       Path to your application's JAR file (required in yarn-cluster mode)
  --class CLASS_NAME   Name of your application's main class (required)
  --primary-py-file    A main Python file
  --primary-r-file     A main R file
  --arg ARG            Argument to be passed to

So even if you enabled the shuffle service on your cluster and set spark.dynamicAllocation.enabled to true, the spark action seems unaware of these settings.

adding missing configuration properties

To get this working you need to provide spark.dynamicAllocation.enabled=true and spark.shuffle.service.enabled=true together with --num-executors 0 in the spark-args section:

<master>yarn-cluster</master>
<mode>cluster</mode>
<name>spark-job-name</name>
<class>com.syscrest.demo.CustomSparkOozieAction</class>
<jar>${sparkJarPath}</jar>
<spark-opts>
... your job specific options ...
--num-executors 0 
--conf spark.dynamicAllocation.enabled=true 
--conf spark.shuffle.service.enabled=true 
</spark-opts> 
<arg>-t</arg>
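For context, the fragment above sits inside a regular `<spark>` action node. A minimal complete action might look like this (the `${jobTracker}`/`${nameNode}` properties and the ok/error transition targets are placeholders for your workflow):

```xml
<!-- sketch of a complete workflow action wrapping the fragment above -->
<action name="spark-job">
  <spark xmlns="uri:oozie:spark-action:0.1">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <master>yarn-cluster</master>
    <mode>cluster</mode>
    <name>spark-job-name</name>
    <class>com.syscrest.demo.CustomSparkOozieAction</class>
    <jar>${sparkJarPath}</jar>
    <spark-opts>--num-executors 0 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true</spark-opts>
    <arg>-t</arg>
  </spark>
  <ok to="end"/>
  <error to="fail"/>
</action>
```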

spark oozie action jobs not showing up on spark history server

If you execute spark jobs within an oozie workflow using a <spark> action node on a Cloudera CDH5 cluster, your jobs may not show up on your spark history server. Even if you configured the history server via cloudera manager, it may only list jobs started on the command line using spark-submit.

adding missing configuration properties

When using mode = cluster with master = yarn-cluster (or yarn-client), you need to provide spark.yarn.historyServer.address with the actual address of your spark history server, spark.eventLog.dir with the fully qualified HDFS path of the applicationHistory directory, and set spark.eventLog.enabled to true.

<master>yarn-cluster</master>
<mode>cluster</mode>
<name>spark-job-name</name>
<class>com.syscrest.demo.CustomSparkOozieAction</class>
<jar>${sparkJarPath}</jar>
<spark-opts>
... your job specific options ...
--conf spark.yarn.historyServer.address=http://historyservernode:18088
--conf spark.eventLog.dir=hdfs://nameservicehost:8020/user/spark/applicationHistory 
--conf spark.eventLog.enabled=true
</spark-opts> 
<arg>-t</arg>

With these additional configuration properties these oozie-controlled spark jobs should also show up on your spark history server.
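Instead of repeating the three --conf flags in every workflow, the same values (copied from the example above) could also be set cluster-wide, e.g. via a spark-defaults.conf safety valve:

```
spark.eventLog.enabled            true
spark.eventLog.dir                hdfs://nameservicehost:8020/user/spark/applicationHistory
spark.yarn.historyServer.address  http://historyservernode:18088
```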

fixing spark classpath issues on CDH5 accessing Accumulo 1.7.2

We experienced a strange NoSuchMethodError while migrating an Accumulo based application from 1.6.0 to 1.7.2 running on CDH5. A couple of code changes were necessary moving from 1.6.0 to 1.7.2, but these were pretty straightforward (member visibility changed, some getters were introduced). Everything compiled fine, but when we executed the spark application on the cluster we got an exception pointing directly to a line we had changed during the migration:

Exception in thread "main" java.lang.NoSuchMethodError: 
        com.syscrest.HelloworldDriver$Opts.getPrincipal()Ljava/lang/String;
        at com.syscrest.HelloworldDriver.main(HelloworldDriver.java:29)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

We double-checked that our fat jar bundles the right version, and then inspected the full classpath of the CDH5 spark service.

the distributed spark classpath on CDH5

We noticed that CDH5 (CDH 5.8.2 in our case) already bundles four accumulo jars:

/opt/cloudera/parcels/CDH/jars/accumulo-core-1.6.0.jar
/opt/cloudera/parcels/CDH/jars/accumulo-fate-1.6.0.jar
/opt/cloudera/parcels/CDH/jars/accumulo-start-1.6.0.jar
/opt/cloudera/parcels/CDH/jars/accumulo-trace-1.6.0.jar

And all jars in /opt/cloudera/parcels/CDH/jars are automatically inserted into the spark distributed classpath, as they are listed in /etc/spark/conf/classpath.txt.

spark.{driver,executor}.userClassPathFirst

Before tampering with the spark distributed classpath we tried to get it working using

spark-submit \
  --conf "spark.driver.userClassPathFirst=true" \
  --conf "spark.executor.userClassPathFirst=true" \
  ....

but this resulted in another library mismatch on the classpath and did not solve our initial problem:

org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
        at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
        at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
        at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
        at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:157)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:199)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:199)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:199)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
        at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
        at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
        at org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:516)

So we tried to fix the distributed spark classpath directly. Unfortunately /etc/spark/conf/classpath.txt is not modifiable via cloudera manager, and we did not want to change it manually on all nodes.

But as /etc/spark/conf/classpath.txt is read by /etc/spark/conf/spark-env.sh

# Set distribution classpath. This is only used in CDH 5.3 and later.
export SPARK_DIST_CLASSPATH=$(paste -sd: "$SELF/classpath.txt")

we were able to use Spark Service Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh – Spark (Service-Wide) to tamper with SPARK_DIST_CLASSPATH.
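The mechanism itself is easy to reproduce locally: spark-env.sh simply joins the lines of classpath.txt with colons. A self-contained simulation (using a temporary file with two of the jar paths from above instead of the real classpath.txt):

```shell
# simulate how spark-env.sh builds SPARK_DIST_CLASSPATH from classpath.txt
CPFILE=$(mktemp)
printf '%s\n' \
  "/opt/cloudera/parcels/CDH/jars/accumulo-core-1.6.0.jar" \
  "/opt/cloudera/parcels/CDH/jars/accumulo-fate-1.6.0.jar" > "$CPFILE"
# paste -sd: joins all lines of the file into one colon-separated string
SPARK_DIST_CLASSPATH=$(paste -sd: "$CPFILE")
echo "$SPARK_DIST_CLASSPATH"
rm -f "$CPFILE"
```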

In our case it was sufficient to add the correct accumulo jars (version 1.7.2 from the installed parcel) at the beginning of the list so they take precedence over the outdated ones:

SPARK_DIST_CLASSPATH=/opt/cloudera/parcels/ACCUMULO/lib/accumulo/lib/accumulo-core.jar:$SPARK_DIST_CLASSPATH
SPARK_DIST_CLASSPATH=/opt/cloudera/parcels/ACCUMULO/lib/accumulo/lib/accumulo-fate.jar:$SPARK_DIST_CLASSPATH
SPARK_DIST_CLASSPATH=/opt/cloudera/parcels/ACCUMULO/lib/accumulo/lib/accumulo-start.jar:$SPARK_DIST_CLASSPATH
SPARK_DIST_CLASSPATH=/opt/cloudera/parcels/ACCUMULO/lib/accumulo/lib/accumulo-trace.jar:$SPARK_DIST_CLASSPATH
export SPARK_DIST_CLASSPATH
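The ordering matters because the JVM resolves a class from the first classpath entry that contains it. A quick sanity check of the prepend logic (jar paths copied from the snippets above):

```shell
# start with the bundled 1.6.0 jar, then prepend the parcel's 1.7.2 jar
SPARK_DIST_CLASSPATH="/opt/cloudera/parcels/CDH/jars/accumulo-core-1.6.0.jar"
SPARK_DIST_CLASSPATH="/opt/cloudera/parcels/ACCUMULO/lib/accumulo/lib/accumulo-core.jar:$SPARK_DIST_CLASSPATH"
# the first matching entry wins during class loading
echo "$SPARK_DIST_CLASSPATH" | tr ':' '\n' | head -n 1
```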

Increasing mapreduce.job.counters.max on CDH5 YARN (MR2)

How to increase mapreduce.job.counters.max on YARN (MR2) for HUE / HIVE / OOZIE.