GDELT on SCDF : Implementing a filter application

In this blog post (part of processing GDELT data with SCDF on kubernetes) we want to filter the stream created in the last post (Implementing a custom reactive source application) as it contains a lot of duplicate data.

Source Code

You can find the source code on github:

git clone

cd gdelt-article-simple-deduplication-filter

maven project setup

We use the same Maven-based setup to build the necessary jar files and docker image, relying on the Spring Maven plugins and Google’s JIB. See the predecessor blog post (Implementing a custom reactive source application) for details about the project structure.


Let’s start with our configuration properties. As we want to check whether we have already seen an article (that is, its url) before, we use a very simple cache that keeps the urls for a certain amount of time. So let’s expose a single configuration property named timeToLive:

public class GDELTDeduplicationFilterProperties {

	/**
	 * time in minutes to remember already seen article urls
	 */
	private long timeToLive = 24 * 60L;

	/* ... getter and setter omitted ... */

}

To limit the configuration options to just our own class add a file named to src/main/resources/META-INF containing:


The actual filter implementation is a very simple boot application that uses @PostConstruct to configure the cache and @Filter to annotate the actual filter implementation:

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableConfigurationProperties(GDELTDeduplicationFilterProperties.class)
public class GDELTDeduplicationFilter {

   private static final Log logger = LogFactory.getLog(GDELTDeduplicationFilter.class);

   public static void main(String[] args) {
      SpringApplication.run(GDELTDeduplicationFilter.class, args);
   }

   @Autowired
   private GDELTDeduplicationFilterProperties configuration;

   // url -> number of times the article has been seen
   private Cache<String, Long> cache;

   @PostConstruct
   private void postConstruct() {
      // entries expire timeToLive minutes after they were last written
      cache = CacheBuilder.newBuilder().expireAfterWrite(configuration.getTimeToLive(), TimeUnit.MINUTES).build();
   }

   @Filter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
   public boolean filter(Message<GDELTArticle> message) {
      GDELTArticle article = message.getPayload();
      Long currentCount = cache.getIfPresent(article.getUrl());
      if (currentCount != null) {
         cache.put(article.getUrl(), ++currentCount);
         logger.info("already seen " + currentCount + " times, filtering out article = " + article);
         return false;
      } else {
         cache.put(article.getUrl(), 1L);
         logger.info("seen for first time, passing thru = " + article);
         return true;
      }
   }
}

Note: This is a very simple implementation of a deduplication filter. It does not guarantee 100 percent accuracy: the urls are only kept in memory, so a pod restart may let duplicate articles through.
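To make the expiry behaviour easier to reason about, here is a minimal sketch of the same idea in plain Java, without Guava or Spring. The class name SeenUrlTracker and the injectable clock are assumptions made for illustration; they are not part of the project. Like expireAfterWrite, every sighting refreshes the entry, so a url only passes again once it has not been seen for a full time-to-live.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the Guava cache: remembers urls for a fixed time to live. */
class SeenUrlTracker {

    private final long timeToLiveMillis;
    private final LongSupplier clock;                            // injectable so the expiry can be tested
    private final Map<String, Long> lastSeen = new HashMap<>();  // url -> time of the last sighting

    SeenUrlTracker(long timeToLiveMinutes, LongSupplier clock) {
        this.timeToLiveMillis = timeToLiveMinutes * 60_000L;
        this.clock = clock;
    }

    /** Returns true if the url should pass the filter, false if it is a duplicate. */
    synchronized boolean firstSighting(String url) {
        long now = clock.getAsLong();
        Long previous = lastSeen.put(url, now);   // every sighting refreshes the entry
        return previous == null || now - previous >= timeToLiveMillis;
    }
}
```

In production code the clock would simply be System::currentTimeMillis. Unlike the Guava cache, this sketch never evicts entries, so it only illustrates the decision logic and is not a drop-in replacement.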

build and deploy

You can package the application, create the docker image and upload it to docker hub with a single command (It requires a docker hub account, please replace the placeholders accordingly).

Note: you could skip this step and use our docker image (syscrest/gdelt-article-simple-deduplication-filter).

./mvnw clean package jib:build \ \ \

The docker image has been pushed to Docker Hub, but we also want to use the metadata jar (target/gdelt-article-simple-deduplication-filter-1.0.0-SNAPSHOT-metadata.jar). SCDF can pull jar files not only from Maven Central but also from any http server, so we uploaded it to our website to make it available to the SCDF server (you can find the url in the next section).

Register the app

Browse your Spring Cloud Data Flow UI and select “Apps” and then “+ Add Application“:


Select “Register one or more applications”:

Register the app (our prebuilt image + jar files) using:

  • Name: gdelt-article-simple-deduplication-filter
  • Type: Processor
  • URI: docker:syscrest/gdelt-article-simple-deduplication-filter:latest
  • Metadata URI:


Afterwards browse the application list and click on “gdelt-article-simple-deduplication-filter” to verify that all configuration options have been picked up from the metadata jar file:


Creating a stream

Let’s enhance our stream from the last blog post (Implementing a custom reactive source application) and plug in our filtering processor:

gdelt-article-feed-source --query='climate change' | splitter --expression="#jsonPath(payload,'$.*')" | gdelt-article-simple-deduplication-filter --time-to-live=1440 > :filtered-gdelt-articles


Save the stream (we chose “stream-filtered” as its name), but do not check the “deploy” box yet. On the deployment configuration page make sure to create just a single instance per application (there will be a new blog post about scaling and proper routing later on) and add a required deployer property (as the source, the processor and the starter apps are Spring Boot 2.0-based):



Afterwards we can use kubectl to peek into the deduplication-filter pod:

kubectl -n scdf-170 get pods

stream-filtered-gdelt-article-feed-source-6cd4468594-c5p9j        1/1       Running   0          16m
stream-filtered-gdelt-article-simple-deduplication-filter-srx7p   1/1       Running   0          16m
stream-filtered-splitter-6df58fbcc7-zjckp                         1/1       Running   0          16m

kubectl -n scdf-170 logs -f stream-filtered-gdelt-article-simple-deduplication-filter-srx7p

first pull – let the articles pass:

.... : seen for first time, passing thru = GDELTArticle [url=, title=  Planetary emergency : After 30 years , leaders are still fighting about basic truths of climate science, language=English, sourcecountry=United States,, seendate=20181216T191500Z]

subsequent pulls (still containing the article – drop them):

.... :already seen 2 times, filtering out article = GDELTArticle [url=, title=  Planetary emergency : After 30 years , leaders are still fighting about basic truths of climate science, language=English, sourcecountry=United States,, seendate=20181216T191500Z]

GDELT on SCDF : Implementing a reactive source application

In the second part of our blog post series “processing GDELT data with SCDF on kubernetes” we will create a custom source application based on spring cloud stream to pull GDELT Data and use it in a very simple flow.

Source Code

You can find the source code on github:

git clone

cd gdelt-article-feed-source

maven project setup

The project is based on Spring Cloud Stream and we will use the Spring Cloud dependency management with the latest Spring Cloud release, Finchley.SR2:


Even if the implementation itself is not kafka-specific (more on binder abstraction) we include the Spring Cloud Kafka Binder directly in our project to build artifacts deployable on our target setup (Kubernetes + Kafka). We also add reactive programming support to leverage the Java Integration DSL for our source implementation.



Use the spring boot maven plugin to package the application itself:


Besides the actual fat jar (that will be dockerized later on) we also want to create a so-called “metadata-only” jar using the Spring Cloud Stream & Task Metadata Plugin to aggregate the Spring Boot metadata into a separate lightweight jar. As we are using Kubernetes as our deployment target, we need to provide docker-based artifacts for deployment. Spring Cloud Data Flow cannot determine the actual configuration properties of an application directly from the docker image, but you can provide an additional jar alongside the docker image to supply the necessary metadata about configuration options (names, descriptions, default values).


Dockerize the spring boot application using Google’s JIB Maven Plugin:



Spring Cloud Stream provides a couple of ways to implement a source application. Besides the native Spring Cloud Stream annotations you are also free to use Spring Integration or the reactive APIs. We chose to implement our custom source with the Spring Integration Java DSL, as it resulted in very few lines of wrapper code to emit a list of GDELTArticle objects. Read more about reactive spring cloud sources here.

@SpringBootApplication
@EnableBinding(Source.class)
@EnableConfigurationProperties(GDELTSourceProperties.class)
public class GDELTSourceApplication {

   private static final Log logger = LogFactory.getLog(GDELTSourceApplication.class);

   @Autowired
   private GDELTSourceProperties properties;

   @StreamEmitter
   @Output(Source.OUTPUT)
   @Bean
   public Publisher<Message<List<GDELTArticle>>> emit() {
      return IntegrationFlows.from(() -> {
         try {
            // the url-encoded query is appended to the GDELT endpoint url
            URL feedUrl = new URL(""
                  + URLEncoder.encode(properties.getQuery(), "UTF-8")
                  + "&mode=artlist&maxrecords=250&timespan=1h&sort=datedesc&format=json");
            logger.info("going to fetch data using url = " + feedUrl);
            InputStream inputStreamObject = feedUrl.openStream();
            BufferedReader streamReader = new BufferedReader(new InputStreamReader(inputStreamObject, "UTF-8"));
            StringBuilder responseStrBuilder = new StringBuilder();
            String inputStr;
            while ((inputStr = streamReader.readLine()) != null) {
               responseStrBuilder.append(inputStr);
            }
            JSONObject jsonObject = new JSONObject(responseStrBuilder.toString());
            JSONArray articles = jsonObject.getJSONArray("articles");
            List<GDELTArticle> response = new ArrayList<>();
            for (int i = 0; i < articles.length(); i++) {
               JSONObject article = articles.getJSONObject(i);
               GDELTArticle a = new GDELTArticle();
               // ... copy url, title, language, sourcecountry, domain and seendate from the JSON object ...
               response.add(a);
            }
            return new GenericMessage<>(response);
         } catch (Exception e) {
            logger.error("", e);
            // emit nothing for this poll; GenericMessage rejects a null payload
            return null;
         }
      }, e -> e.poller(p -> p.fixedDelay(properties.getTriggerDelay(), TimeUnit.SECONDS))).toReactivePublisher();
   }
}
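The query has to be url-encoded before it is appended to the endpoint, because it usually contains spaces and quotes. The following sketch shows what URLEncoder does with the queries used in this post. The class name FeedUrlSketch and the base url are placeholders chosen for illustration; the real GDELT endpoint is deliberately not reproduced here.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class FeedUrlSketch {

    // Placeholder only, not the real GDELT endpoint.
    static final String HYPOTHETICAL_BASE = "https://gdelt.example.invalid/doc?query=";

    /** Encodes the query the same way the source application does (UTF-8, form encoding). */
    static String encode(String query) {
        try {
            return URLEncoder.encode(query, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to be available on every JVM
            throw new IllegalStateException(e);
        }
    }

    /** Builds the complete feed url for the given query. */
    static String feedUrl(String query) {
        return HYPOTHETICAL_BASE
                + encode(query)
                + "&mode=artlist&maxrecords=250&timespan=1h&sort=datedesc&format=json";
    }
}
```

With this, the query climate change is sent as climate+change, and the quotes and parentheses of a query like ("climate change" or "global warming") are percent-encoded.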

We want our source application to properly expose its configuration parameters, so we create a dedicated configuration properties class. The javadoc comment of each member will be displayed as its description and the initial values will automatically be noted as the default values (no need to mention them in the javadoc description):

package com.syscrest.blogposts.scdf.gdeltsource;

public class GDELTSourceProperties {

   /**
    * The query to use to select data.
    * Example: ("climate change" or "global warming")
    */
   private String query = "climate change";

   /**
    * The delay between pulling data from gdelt (in seconds).
    */
   private long triggerDelay = 120L;

   /* ... setter and getter omitted ... */

}
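The stream definitions in this series pass these properties as dashed options (--trigger-delay, and --time-to-live for the filter), while the Java fields are written in camelCase (triggerDelay, timeToLive). Spring Boot's relaxed binding takes care of matching the two. The sketch below only mirrors the naming correspondence and is not how Spring Boot implements it; the class name OptionNames is made up for this example.

```java
class OptionNames {

    /** Converts a camelCase field name into the dashed form used in stream definitions. */
    static String toKebabCase(String fieldName) {
        StringBuilder out = new StringBuilder();
        for (char c : fieldName.toCharArray()) {
            if (Character.isUpperCase(c)) {
                // start a new word: "triggerDelay" becomes "trigger-delay"
                out.append('-').append(Character.toLowerCase(c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }
}
```

So a field named triggerDelay is the one set by --trigger-delay=300 in a stream definition.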


As Spring Boot applications are aware of a lot of common configuration properties, we create a file named META-INF/ to explicitly limit the displayed configuration options to our class (read more about whitelisting here).


Build the project

You can package the application, create the docker image and upload it to docker hub with a single command (It requires a docker hub account, please replace the placeholders accordingly).

Note: you could skip this step and use our docker image (syscrest/gdelt-article-feed-source).

./mvnw clean package jib:build \ \ \

The output should look like this:

[INFO] Containerizing application to syscrest/gdelt-article-feed-source...
[INFO] Retrieving registry credentials for
[INFO] Building classes layer...
[INFO] Building resources layer...
[INFO] Getting base image
[INFO] Building dependencies layer...
[INFO] Finalizing...
[INFO] Container entrypoint set to [java, -cp, /app/resources/:/app/classes/:/app/libs/*, com.syscrest.blogposts.scdf.gdeltsource.GDELTSourceApplication]
[INFO] Built and pushed image as syscrest/gdelt-article-feed-source
[INFO] ------------------------------------------------------------------------
[INFO] ------------------------------------------------------------------------

The docker image has been pushed to Docker Hub, but we also want to use the metadata jar (target/gdelt-article-feed-source-1.0.0-SNAPSHOT-metadata.jar). SCDF can pull jar files not only from Maven Central but also from any http server, so we uploaded it to our website to make it available to the SCDF server (you can find the url in the next section).

Register the app

Browse your Spring Cloud Data Flow UI and select “Apps” and then “+ Add Application“:


Select “Register one or more applications”:

Register the app using:

  • Name: gdelt-article-feed-source
  • Type: Source
  • URI: docker:syscrest/gdelt-article-feed-source:latest
  • Metadata URI:

Afterwards browse the application list and click on “gdelt-article-feed-source”:


to verify that all configuration options have been picked up from the metadata jar file:


Creating a stream

Let’s create a simple stream that uses our custom application as the source and the very basic log sink to just dump the messages into the logfile of a pod. Select Streams on the left sidebar and then click Create stream(s):


Just copy and paste our example to query all current articles containing ‘climate change’ into the textbox:

gdelt-article-feed-source --trigger-delay=300 --query='climate change' | log


You can also just type and use the autocompletion:


Afterwards save the stream (don’t check ‘Deploy Stream(s)’):


Locate your previously saved stream in the stream list:


When you click on deploy you can define deployment-specific settings like memory and cpu assignments (not necessary, the default values are sufficient):


But you will need to add a special deployer property (as your source and the starter apps are Spring Boot 2.0-based):



Your Spring Cloud Data Flow instance will now deploy pods into the namespace it is running in, naming them after the stream name plus the source/processor names:

kubectl -n scdf-170 get pods

NAME                                                      READY     STATUS             RESTARTS   AGE
gdelt-stream-1-log-gdelt-article-feed-source-6c457dfb9f-v28ls   0/1       Running            0          1m
gdelt-stream-1-log-log-64cfc8bc-bb6n9                           0/1       Running            0          1m
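If you want to pick the pods of one application out of such a listing programmatically, the naming scheme can be used as a prefix filter. This is a small sketch under the assumption that the stream in the listing above is called gdelt-stream-1-log; the class StreamPods is hypothetical and simply works on a list of pod names.

```java
import java.util.ArrayList;
import java.util.List;

class StreamPods {

    /** Prefix shared by all pods of one app in one stream: "<stream>-<app>-". */
    static String podPrefix(String streamName, String appName) {
        return streamName + "-" + appName + "-";
    }

    /** Returns the pod names that belong to the given app of the given stream. */
    static List<String> podsOf(String streamName, String appName, List<String> podNames) {
        String prefix = podPrefix(streamName, appName);
        List<String> matches = new ArrayList<>();
        for (String pod : podNames) {
            if (pod.startsWith(prefix)) {
                matches.add(pod);
            }
        }
        return matches;
    }
}
```

Prefix matching is only a heuristic: a stream name that ends with an app name can make two prefixes overlap, so pick stream names with that in mind.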


Let’s peek into the “log” pod to see the data that has been emitted by our custom source:

kubectl -n scdf-170 logs -f gdelt-demo-1-log-7999bb94d8-9dcw6

output (reformatted for better readability):

2018-11-30 23:06:03.447  INFO 1 --- [container-0-C-1] log-sink
      "title":"Angela Merkel nach G20 : „ Forbes  kürt sie zur mächtigsten Frau der Welt - News",
      "title":"COP24 rät wegen Klimawandel zur weniger Fleischkonsum",

Let’s create a slightly improved version that splits the array of GDELTArticle into separate messages using the splitter starter app and channel these messages into an explicit topic named climate-change-articles:

gdelt-article-feed-source --query='climate change' | splitter --expression="#jsonPath(payload,'$.*')" > :climate-change-articles


If you peek into the topic you can see that each message just contains a single article:

{"url":"","title":"Biomass Power Generation Market – Global Industry Analysis , Size , Share , Growth , Trends and Forecast 2018 – 2022 – Advertising Market","language":"English","sourcecountry":"","domain":"","seendate":"20181205T120000Z"}
{"url":"","title":"In the news today , Dec . 5","language":"English","sourcecountry":"Canada","domain":"","seendate":"20181205T120000Z"}
{"url":"","title":"Manovra | rivoluzione congedo parentale Si potrà lavorare fino al parto","language":"Italian","sourcecountry":"Italy","domain":"","seendate":"20181205T120000Z"}
{"url":",-AGTF-to-provide-funds-for-NEP","title":"AfDB , AGTF to provide funds for NEP - BusinessGhana News","language":"English","sourcecountry":"Ghana","domain":"","seendate":"20181205T120000Z"}
{"url":"","title":"Daily Star Opinions : Overview with Gwyne Dyer","language":"English","sourcecountry":"Philippines","domain":"","seendate":"20181205T120000Z"}

You will notice that calling the gdelt endpoint continuously will result in a lot of duplicate articles … we will implement a filter/deduplication processor in one of the next SCDF on GDELT blog posts.

GDELT on SCDF : Bootstrapping spring cloud data flow 1.7.0 on kubernetes using kubectl

In the first part of our planned blog posts (processing GDELT data with SCDF on kubernetes) we go through the steps to deploy the latest Spring Cloud Data Flow (SCDF) Release 1.7.0 on Kubernetes , including the latest version of starter apps that will be used in the examples.

We stick to the manual steps described here in the official spring cloud dataflow documentation to deploy all components to our kubernetes cluster into a dedicated namespace scdf-170 to run the examples.

This installation will not be production-ready. It is meant for experimenting and for ensuring compatibility, as we experienced some incompatibilities when mixing our own source/sink implementations based on Finchley.SR2 with the prepackaged starter apps based on Spring Boot 1.5 / Spring Cloud Stream 1.3.x.


Clone the git repository to retrieve the necessary Kubernetes configuration files and switch to the 1.7.0.RELEASE branch:

git clone
cd spring-cloud-dataflow-server-kubernetes
git checkout v1.7.0.RELEASE

installation with kubectl

We want to use a dedicated namespace scdf-170 for our deployment, so we create it first:

echo '{ "kind": "Namespace", "apiVersion": "v1", "metadata": { "name": "scdf-170", "labels": { "name": "scdf-170" } } }' | kubectl create -f -

Afterwards we can deploy the dependencies (kafka/mysql/redis) and the spring cloud dataflow server itself:

kubectl create -n scdf-170 -f src/kubernetes/kafka/
kubectl create -n scdf-170 -f src/kubernetes/mysql/
kubectl create -n scdf-170 -f src/kubernetes/redis/
kubectl create -n scdf-170 -f src/kubernetes/metrics/metrics-svc.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/server-roles.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/server-rolebinding.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/service-account.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/server-config-kafka.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/server-svc.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/server-deployment.yaml

verify and enable access

kubectl -n scdf-170 get all

Output should look like:

NAME                                READY     STATUS    RESTARTS   AGE
pod/kafka-broker-696786c8f7-fjp4p   1/1       Running   1          5m
pod/kafka-zk-5f9bff7d5-tmxzg        1/1       Running   0          5m
pod/mysql-f878678df-2t4d6           1/1       Running   0          5m
pod/redis-748db48b4f-8h75x          1/1       Running   0          5m
pod/scdf-server-757ccb576c-9fssd    1/1       Running   0          5m
NAME                  TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
service/kafka         ClusterIP             9092/TCP                     5m
service/kafka-zk      ClusterIP             2181/TCP,2888/TCP,3888/TCP   5m
service/metrics       ClusterIP            80/TCP                       5m
service/mysql         ClusterIP            3306/TCP                     5m
service/redis         ClusterIP              6379/TCP                     5m
service/scdf-server   LoadBalancer        80:30884/TCP                 5m
NAME                           DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/kafka-broker   1         1         1            1           5m
deployment.apps/kafka-zk       1         1         1            1           5m
deployment.apps/mysql          1         1         1            1           5m
deployment.apps/redis          1         1         1            1           5m
deployment.apps/scdf-server    1         1         1            1           5m
NAME                                      DESIRED   CURRENT   READY     AGE
replicaset.apps/kafka-broker-696786c8f7   1         1         1         5m
replicaset.apps/kafka-zk-5f9bff7d5        1         1         1         5m
replicaset.apps/mysql-f878678df           1         1         1         5m
replicaset.apps/redis-748db48b4f          1         1         1         5m
replicaset.apps/scdf-server-757ccb576c    1         1         1         5m

As the SCDF server automatically generates a random password for the user “user” on first startup, we need to grep its log output (using the name of your own scdf-server pod, see the output of the previous command):

kubectl logs -n scdf-170 scdf-server-757ccb576c-9fssd | grep "Using default security password"

output should look like

Using default security password: 8d6b58fa-bf1a-4a0a-a42b-c7b03ab15c60 with roles 'VIEW,CREATE,MANAGE'
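If you script the setup, the password can be cut out of that log line with a regular expression. The following is a minimal sketch that assumes the line has exactly the shape shown above; the class name PasswordLine is made up for this example.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PasswordLine {

    // Captures the first whitespace-free token after the fixed log message.
    private static final Pattern PASSWORD =
            Pattern.compile("Using default security password: (\\S+)");

    /** Returns the generated password if the log line contains one. */
    static Optional<String> extract(String logLine) {
        Matcher m = PASSWORD.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }
}
```

Lines that do not contain the message yield an empty Optional, so the method can be applied to every line of the pod log.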

To access the UI (and the REST API for the cli) you can create a port-forward to the scdf-server pod (using the name of your own scdf-server pod, see the output of “kubectl -n scdf-170 get all”):

kubectl -n scdf-170 port-forward scdf-server-757ccb576c-9fssd 2345:80

where 2345 would be the local port on your machine where you can now access the ui and the rest-api.

Install the dataflow cli

You can download the jar file directly from spring’s maven repository:


and start the cli:

java -jar spring-cloud-dataflow-shell-1.7.0.RELEASE.jar \
--dataflow.username=user \
--dataflow.password= \

The output should look like this:

 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/
Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".

Install starter apps

After the initial installation there are no applications (task/stream) registered. As you can see here (for stream apps), you need to pick an explicit combination of the packaging format (jar vs docker) and the messaging technology (Kafka vs RabbitMQ).

We will go for docker (because we are running on Kubernetes) and Kafka 0.10 as the messaging layer based on newer Spring Boot and Spring Cloud Stream versions (2.0.x + 2.0.x).

We can register all the available starter apps with a single cli command:

dataflow:>app import --uri


Successfully registered .........
...... processor.pmml.metadata, sink.router.metadata, sink.mongodb]

and the Spring Cloud Task Starter Apps (based on Spring Boot 2.0.x + Spring Cloud Task 2.0.x):

dataflow:> app import --uri

Successfully registered ..........
........ task.timestamp, task.timestamp.metadata]

You can verify the installation by listing all available applications:

dataflow:>app list

The output should look like this:

║app│    source    │         processor         │           sink           │        task        ║
║   │file          │bridge                     │aggregate-counter         │composed-task-runner║
║   │ftp           │filter                     │counter                   │timestamp           ║
║   │gemfire       │groovy-filter              │field-value-counter       │timestamp-batch     ║
║   │gemfire-cq    │groovy-transform           │file                      │                    ║
║   │http          │grpc                       │ftp                       │                    ║
║   │jdbc          │header-enricher            │gemfire                   │                    ║
║   │jms           │httpclient                 │hdfs                      │                    ║
║   │load-generator│image-recognition          │jdbc                      │                    ║
║   │loggregator   │object-detection           │log                       │                    ║
║   │mail          │pmml                       │mongodb                   │                    ║
║   │mongodb       │python-http                │mqtt                      │                    ║
║   │mqtt          │python-jython              │pgcopy                    │                    ║
║   │rabbit        │scriptable-transform       │rabbit                    │                    ║
║   │s3            │splitter                   │redis-pubsub              │                    ║
║   │sftp          │tasklaunchrequest-transform│router                    │                    ║
║   │syslog        │tcp-client                 │s3                        │                    ║
║   │tcp           │tensorflow                 │sftp                      │                    ║
║   │tcp-client    │transform                  │task-launcher-cloudfoundry│                    ║
║   │time          │twitter-sentiment          │task-launcher-local       │                    ║
║   │trigger       │                           │task-launcher-yarn        │                    ║
║   │triggertask   │                           │tcp                       │                    ║
║   │twitterstream │                           │throughput                │                    ║
║   │              │                           │websocket                 │                    ║

When you start using the starter apps in your first test stream definitions be aware that you need to add a special deployer property on the deployment screen (because we are using the spring 2 based starter apps):



You can read more about the internal changes between 1.5 and 2.0 on the health and readiness probes here.

Continue reading on how to implement a custom source application using the reactive framework on spring cloud streams in our next blog post: Implementing a custom reactive source application for spring cloud data flow.

Blog post series: Processing feeds with Spring Cloud Data Flow on Kubernetes

We are starting a blog post series to dig deeper into the capabilities of Spring Cloud Data Flow (SCDF) running on Kubernetes. This blog post will be updated when new posts have been published.

List of blog posts for quick access:


Let’s have a look at the data we want to process:

Supported by Google Jigsaw, the GDELT Project monitors the world’s broadcast, print, and web news from nearly every corner of every country in over 100 languages and identifies the people, locations, organizations, themes, sources, emotions, counts, quotes, images and events driving our global society every second of every day, creating a free open platform for computing on the entire world.

Besides raw data feeds there are powerful APIs to query and even to visualize the GDELT datasets. This Blog post is inspired by the “rss feed for web archiving coverage about climate change” taken from a blog post on

This searches for all articles published in the last hour mentioning
“climate change” or “global warming” and returns the first 200 articles,
ordered by date with the newest articles first and returned as an RSS feed
that includes the primary URL of each article as one item and, as a separate
item, the URL of the mobile/AMP edition of the page, if available.
This demonstrates how to use the API as a data source for web archiving.

We want to use this type of query to pull the latest articles for a configurable query from the GDELT project and do some complex processing on Spring Cloud Data Flow (SCDF). We will download the articles, do some analysis on the content, store and also visualize it.

About Spring Cloud Data Flow (SCDF)

Taken from

Spring Cloud Data Flow is a toolkit for building data integration
and real-time data processing pipelines.

Pipelines consist of Spring Boot apps, built using the Spring Cloud Stream
or Spring Cloud Task microservice frameworks. This makes Spring Cloud Data Flow
suitable for a range of data processing use cases, from import/export to
event streaming and predictive analytics.

Part 1 : Setting up the runtime environment

As there are multiple platform implementations available, it can run your streams/tasks on a local server, on Cloud Foundry, on Kubernetes, on YARN and even on Mesos.

As these different implementations require different packaging (jar vs docker), we went for kubernetes as the platform and Kafka as the messaging solution in our examples.

You can find instructions to install Spring Cloud Data Flow in our first blog post about GDELT on SCDF: Bootstrapping SCDF on Kubernetes using KUBECTL.

Part 2 : How to pull gdelt data into SCDF

Continue reading on how to implement a custom source application using the reactive framework on spring cloud streams: Implementing a custom reactive source application for spring cloud data flow.

Part 3 : How to filter duplicates

The next planned blog post will continue to enhance your first stream definition by adding a custom processor to drop duplicate articles. Stay tuned.

how to use dynamic allocation in an oozie spark action on CDH5

Using spark’s dynamic allocation feature in an oozie spark action can be tricky.

enable dynamic allocation

First you need to make sure that dynamic allocation is actually available on your cluster. Navigate to your “Spark” service, then “Configuration” and search for “dynamic”.


Both (shuffle service + dynamic allocation) need to be enabled.

how to configure the oozie spark action

If you just omit --num-executors in your spark-args definition your job will fall back to configuration defaults and will utilize only two executors:


If you go for --num-executors 0 your oozie workflow will fail with this strange error message:

Number of executors was 0, but must be at least 1 (or 0 if dynamic executor allocation is enabled). 

Usage: org.apache.spark.deploy.yarn.Client 

[options] Options: 
--jar JAR_PATH Path to your application's JAR file (required in yarn-cluster mode) 
--class CLASS_NAME Name of your application's main class (required) 
--primary-py-file A main Python file 
--primary-r-file A main R file 
--arg ARG Argument to be passed to

So even if you enabled the shuffle service on your cluster and set spark.dynamicAllocation.enabled to true, the spark action seems unaware of these settings.

adding missing configuration properties

To get this working you need to provide spark.dynamicAllocation.enabled=true and spark.shuffle.service.enabled=true together with --num-executors 0 in the spark-args section:

... your job specific options ...
--num-executors 0 
--conf spark.dynamicAllocation.enabled=true 
--conf spark.shuffle.service.enabled=true 
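The rule behind the error message can be written down as a small predicate, which makes it clear why --num-executors 0 is only accepted once dynamic allocation is switched on in the same spark-args. This sketch restates the wording of the error message; it is not taken from the Spark source code, and the class name ExecutorSettings is hypothetical.

```java
class ExecutorSettings {

    /**
     * Mirrors the rule quoted in the error message: at least one executor,
     * or zero when dynamic executor allocation is enabled.
     */
    static boolean isValid(int numExecutors, boolean dynamicAllocationEnabled) {
        if (numExecutors >= 1) {
            return true;
        }
        return numExecutors == 0 && dynamicAllocationEnabled;
    }
}
```

In the failing case from above the action effectively evaluates isValid(0, false), because the cluster-wide setting does not reach the spark action.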

spark oozie action jobs not showing up on spark history server

If you execute spark jobs within an oozie workflow using a <spark> action node on a Cloudera CDH5 cluster, your job may not show up on your spark history server. Even if you configured the history server using the cloudera manager, it may only list jobs started on the commandline using spark-submit.

adding missing configuration properties

When using mode = cluster and master = yarn-cluster or yarn-client you need to set spark.yarn.historyServer.address to the actual address of your spark history server, spark.eventLog.dir to the fully qualified hdfs path of the applicationHistory directory, and spark.eventLog.enabled to true.

... your job specific options ...
--conf spark.yarn.historyServer.address=http://historyservernode:18088
--conf spark.eventLog.dir=hdfs://nameservicehost:8020/user/spark/applicationHistory 
--conf spark.eventLog.enabled=true
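If you generate your workflow definitions, the three options can be rendered from a map instead of being typed by hand. This is just a sketch: the class SparkArgs and its methods are made up for illustration, and the host names in the usage are the placeholders from the listing above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class SparkArgs {

    /** Renders each property as a "--conf key=value" pair, in insertion order. */
    static String render(Map<String, String> conf) {
        StringJoiner joined = new StringJoiner(" ");
        for (Map.Entry<String, String> e : conf.entrySet()) {
            joined.add("--conf " + e.getKey() + "=" + e.getValue());
        }
        return joined.toString();
    }

    /** Collects the three history server properties named in the text. */
    static Map<String, String> historyServerConf(String historyServerUrl, String eventLogDir) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.yarn.historyServer.address", historyServerUrl);
        conf.put("spark.eventLog.dir", eventLogDir);
        conf.put("spark.eventLog.enabled", "true");
        return conf;
    }
}
```

Calling SparkArgs.render(SparkArgs.historyServerConf("http://historyservernode:18088", "hdfs://nameservicehost:8020/user/spark/applicationHistory")) yields the three --conf options from above on a single line.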

With these additional configuration properties these oozie-controlled spark jobs should also show up on your spark history server.

fixing spark classpath issues on CDH5 accessing Accumulo 1.7.2

We experienced a strange NoSuchMethodError while migrating an Accumulo-based application from 1.6.0 to 1.7.2 running on CDH5. A couple of code changes were necessary moving from 1.6.0 to 1.7.2, but these were pretty straightforward (member visibility changed, some getters were introduced). Everything compiled fine, but when we executed the spark application on the cluster we got an exception that pointed directly to a line we had changed during the migration:

Exception in thread "main" java.lang.NoSuchMethodError: 
        at com.syscrest.HelloworldDriver.main(
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(
        at java.lang.reflect.Method.invoke(
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

We double-checked that our fat jar bundles the right version, and then we checked the full classpath of the CDH5 spark service.

the distributed spark classpath on CDH5

We noticed that CDH5 (CDH 5.8.2 in our case) already bundles four accumulo jars:


And all jars in /opt/cloudera/parcels/cdh/jars are automatically inserted into the spark distributed classpath, as they are listed in /etc/spark/conf/classpath.txt.
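To see which versions of a library end up on the distributed classpath of your own cluster, you can search classpath.txt for it. A minimal sketch, assuming classpath.txt contains one jar path per line; the function name list_classpath_jars is made up for illustration:

```shell
# Print every classpath entry that mentions the given library name.
# $1: path to the classpath file
# $2: library name to look for (matched case-insensitively)
# (list_classpath_jars is a hypothetical helper name)
list_classpath_jars() {
  grep -i -- "$2" "$1"
}

# Example call on a cluster node:
# list_classpath_jars /etc/spark/conf/classpath.txt accumulo
```

If this prints a different version than the one bundled in your fat jar, you are most likely looking at the same kind of conflict.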


Before tampering with the spark distributed classpath we tried to get it working using

spark-submit \
  --conf "spark.driver.userClassPathFirst=true" \
  --conf "spark.executor.userClassPathFirst=true" \

but this resulted in another mismatch of libraries on the classpath and did not solve our initial problem:

java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
at org.xerial.snappy.Snappy.maxCompressedLength(
at org.xerial.snappy.SnappyOutputStream.<init>(
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:199)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:199)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:199)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)

So we tried to fix the distributed spark classpath directly. Unfortunately /etc/spark/conf/classpath.txt is not modifiable via cloudera manager, and we did not want to change it manually on all nodes.

But as /etc/spark/conf/classpath.txt is being read in /etc/spark/conf/

# Set distribution classpath. This is only used in CDH 5.3 and later.
export SPARK_DIST_CLASSPATH=$(paste -sd: "$SELF/classpath.txt")

we were able to use Spark Service Advanced Configuration Snippet (Safety Valve) for spark-conf/ – Spark (Service-Wide) to tamper with SPARK_DIST_CLASSPATH.

In our case it was sufficient to add the correct accumulo jars (version 1.7.2 from the installed parcel) at the beginning of the list so they take precedence over the outdated ones:
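The idea of prepending jars can be sketched as follows. This is not the exact snippet from our cluster: the function name prepend_to_dist_classpath and the jar paths in the example call are placeholders that you need to replace with the locations of your own Accumulo 1.7.2 jars.

```shell
# Put the given jars in front of the existing SPARK_DIST_CLASSPATH,
# so that they are found before any older versions later in the list.
# (prepend_to_dist_classpath is a hypothetical helper name)
prepend_to_dist_classpath() {
  # Nothing to prepend: leave the classpath untouched.
  [ "$#" -eq 0 ] && return 0
  prefix=""
  for jar in "$@"; do
    # Join the jars with ':' without producing a leading colon.
    prefix="${prefix:+$prefix:}$jar"
  done
  # Keep the old entries (if any) after the new ones.
  SPARK_DIST_CLASSPATH="${prefix}${SPARK_DIST_CLASSPATH:+:$SPARK_DIST_CLASSPATH}"
  export SPARK_DIST_CLASSPATH
}

# Example call (placeholder paths, not the real parcel layout):
# prepend_to_dist_classpath /path/to/accumulo-1.7.2/first.jar /path/to/accumulo-1.7.2/second.jar
```

The order matters here: the first match on the classpath wins, which is why the new jars go in front instead of being appended.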


how to collect cloudera manager usage data with google analytics

The Cloudera Manager is already capable of tracking usage data via Google Analytics, but that data is being sent to a cloudera account. This blog post is about configuring the cloudera manager and changing the tracking id so that these usage metrics are sent to your own account.

Setup Google Analytics

Log into your Google Analytics account and create a new tracking id (looks like UA-XXXXXXX-X).

Enable the tracking of usage data

Log into your Cloudera Manager instance and browse to “Administration” -> “Settings”


and make sure that “Allow Usage Data Collection” is enabled:


replace the tracking id

Now you need to edit a file on your Cloudera Manager server

cd /usr/share/cmf/webapp/static/ext/google-analytics/
nano scmx.js

And replace the existing google apps tracking id (UA-XXXXXXX-X) with your own id and save the file.
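If you prefer not to edit the file by hand (for example because you have to redo it after each upgrade), the replacement can be scripted. A minimal sketch, assuming the tracking id appears in the file in the usual UA-digits-digits form; the function name replace_tracking_id is made up for illustration:

```shell
# Replace every Google Analytics tracking id (UA-<digits>-<digits>) in a file.
# $1: file to patch
# $2: your own tracking id
# (replace_tracking_id is a hypothetical helper name)
replace_tracking_id() {
  # Keep a copy of the original file next to it.
  cp "$1" "$1.bak" || return 1
  # Read from the backup and write the patched content to the original path.
  sed "s/UA-[0-9][0-9]*-[0-9][0-9]*/$2/g" "$1.bak" > "$1"
}

# Example call on the Cloudera Manager server:
# replace_tracking_id /usr/share/cmf/webapp/static/ext/google-analytics/scmx.js UA-XXXXXXX-X
```

The backup copy scmx.js.bak lets you compare or restore the original file if something goes wrong.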


Your change will be active immediately (no need to restart anything).

Be aware that this change will be overwritten when you update your Cloudera Manager instance so you need to reapply that change after every upgrade.

Patching Oozie in a parcel-based CDH 5.8.0 Installation

This blogpost will guide you through the process of cloning, patching, building and deploying a custom version of the oozie workflow engine based on the cdh 5.8.0 source code that is available on github.

how to access a remote ha-enabled hdfs in a (oozie) distcp action

How to inject the configuration of a remote ha-hdfs in a distcp call without modifying the local cluster configuration.