
GDELT on SCDF: Implementing a reactive source application

In the second part of our blog post series “Processing GDELT data with SCDF on Kubernetes” we will create a custom source application based on Spring Cloud Stream to pull GDELT data and use it in a very simple flow.

Source Code

You can find the source code on GitHub:


git clone https://github.com/syscrest/gdelt-on-spring-cloud-data-flow

cd gdelt-article-feed-source

Maven project setup

The project will be based on Spring Cloud Stream and we will use the Spring Cloud dependency management with the latest Spring Cloud release, Finchley.SR2:

<properties>
      <spring-cloud.version>Finchley.SR2</spring-cloud.version>
</properties>
...
<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>${spring-cloud.version}</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

Even though the implementation itself is not Kafka-specific (more on the binder abstraction), we include the Spring Cloud Stream Kafka binder directly in our project to build artifacts deployable on our target setup (Kubernetes + Kafka). We also add the reactive programming support so we can leverage the Spring Integration Java DSL for our source implementation.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream-reactive</artifactId>
	<optional>true</optional>
</dependency>

Use the Spring Boot Maven plugin to package the application itself:

<plugin>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>

Besides the actual fat jar (which will be dockerized later on) we also want to create a so-called “metadata-only” jar using the Spring Cloud Stream & Task Metadata Plugin, which aggregates the Spring Boot configuration metadata into a separate lightweight jar. As we are using Kubernetes as our deployment target, we need to provide Docker-based artifacts for deployment. Spring Cloud Data Flow cannot determine the configuration properties of an application directly from a Docker image, but you can provide an additional jar alongside the Docker image that supplies the necessary metadata about the configuration options (names, descriptions, default values).

<plugin>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-app-starter-metadata-maven-plugin</artifactId>
   <executions>
       <execution>
	  <id>aggregate-metadata</id>
	  <phase>compile</phase>
	  <goals>
	      <goal>aggregate-metadata</goal>
	  </goals>
       </execution>
    </executions>
 </plugin>

Dockerize the Spring Boot application using Google’s Jib Maven plugin:

<plugin>
   <groupId>com.google.cloud.tools</groupId>
   <artifactId>jib-maven-plugin</artifactId>
   <version>0.9.9</version>
   <configuration>
      <to>
         <image>registry.hub.docker.com/syscrest/gdelt-article-feed-source</image>
      </to>
   </configuration>
</plugin>

Implementation

Spring Cloud Stream provides a couple of ways to implement a source application. Besides the native Spring Cloud Stream annotations you are also free to use Spring Integration or the reactive APIs. We chose to implement our custom source using the Spring Integration Java DSL, as it resulted in very few lines of wrapper code to emit a list of GDELTArticle objects. Read more about reactive spring cloud sources here.

// package declaration and imports omitted for brevity

@EnableConfigurationProperties(GDELTSourceProperties.class)
@EnableBinding(Source.class)
@SpringBootApplication
public class GDELTSourceApplication {

    private static final Logger logger = LoggerFactory.getLogger(GDELTSourceApplication.class);

    @Autowired
    private GDELTSourceProperties properties;

    public static void main(String[] args) {
        SpringApplication.run(GDELTSourceApplication.class, args);
    }

    @StreamEmitter
    @Output(Source.OUTPUT)
    @Bean
    public Publisher<Message<List<GDELTArticle>>> emit() {
        return IntegrationFlows.from(() -> {
            try {
                // build the GDELT DOC 2.0 API request for the configured query
                URL feedUrl = new URL("https://api.gdeltproject.org/api/v2/doc/doc?query="
                        + URLEncoder.encode(properties.getQuery(), "UTF-8")
                        + "&mode=artlist&maxrecords=250&timespan=1h&sort=datedesc&format=json");
                logger.info("going to fetch data from gdeltproject.org using url = " + feedUrl);
                InputStream inputStreamObject = feedUrl.openStream();
                BufferedReader streamReader = new BufferedReader(new InputStreamReader(inputStreamObject, "UTF-8"));
                StringBuilder responseStrBuilder = new StringBuilder();
                String inputStr;
                while ((inputStr = streamReader.readLine()) != null) {
                    responseStrBuilder.append(inputStr);
                }
                // map the JSON response to a list of GDELTArticle objects
                JSONObject jsonObject = new JSONObject(responseStrBuilder.toString());
                JSONArray articles = jsonObject.getJSONArray("articles");
                List<GDELTArticle> response = new ArrayList<>();
                for (int i = 0; i < articles.length(); i++) {
                    JSONObject article = articles.getJSONObject(i);
                    GDELTArticle a = new GDELTArticle();
                    a.setUrl(article.getString("url"));
                    a.setTitle(article.getString("title"));
                    a.setDomain(article.getString("domain"));
                    a.setSourcecountry(article.getString("sourcecountry"));
                    a.setLanguage(article.getString("language"));
                    a.setSeendate(article.getString("seendate"));
                    response.add(a);
                }
                return new GenericMessage<>(response);
            } catch (Exception e) {
                logger.error("failed to fetch articles from gdeltproject.org", e);
                // GenericMessage does not accept null payloads, so emit an empty list on errors
                return new GenericMessage<>(Collections.<GDELTArticle>emptyList());
            }
        }, e -> e.poller(p -> p.fixedDelay(this.properties.getTriggerDelay(), TimeUnit.SECONDS))).toReactivePublisher();
    }

}
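
The articles themselves are carried in GDELTArticle objects, which are serialized to JSON further downstream. The class is a plain POJO; a minimal sketch (the field names follow the setters used in emit() above, everything else is assumed) could look like this:

// hypothetical sketch of the GDELTArticle POJO -- the actual class in the repository may differ
package com.syscrest.blogposts.scdf.gdeltsource;

public class GDELTArticle {

    private String url;
    private String title;
    private String domain;
    private String sourcecountry;
    private String language;
    private String seendate;

    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }

    public String getDomain() { return domain; }
    public void setDomain(String domain) { this.domain = domain; }

    public String getSourcecountry() { return sourcecountry; }
    public void setSourcecountry(String sourcecountry) { this.sourcecountry = sourcecountry; }

    public String getLanguage() { return language; }
    public void setLanguage(String language) { this.language = language; }

    public String getSeendate() { return seendate; }
    public void setSeendate(String seendate) { this.seendate = seendate; }
}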

We want our source application to properly expose its configuration parameters, so we create a dedicated configuration properties class. The Javadoc comments of the members will be displayed as the descriptions, and the initial values will automatically be picked up as the default values (no need to mention them in the Javadoc):

package com.syscrest.blogposts.scdf.gdeltsource;
...
@ConfigurationProperties("gdelt")
@Validated
public class GDELTSourceProperties {

/**
 * The query to use to select data.
 * 
 * Example: ("climate change" or "global warming")
 * 
 */
private String query = "climate change";

/**
 * The delay between pulling data from gdelt (in seconds).
 */
private long triggerDelay = 120L;

/* ... setter and getter omitted ... */

}

As Spring Boot applications are aware of a lot of common configuration properties, we create a file named META-INF/spring-configuration-metadata-whitelist.properties to explicitly limit the displayed configuration options to our class (read more about whitelisting here).

configuration-properties.classes=com.syscrest.blogposts.scdf.gdeltsource.GDELTSourceProperties
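
After the build, the metadata-only jar contains an aggregated META-INF/spring-configuration-metadata.json describing our two options. It should look roughly like the following excerpt (abbreviated and hand-written here for illustration; the exact content depends on the plugin version):

{
  "properties": [
    {
      "name": "gdelt.query",
      "type": "java.lang.String",
      "description": "The query to use to select data. Example: (\"climate change\" or \"global warming\")",
      "defaultValue": "climate change"
    },
    {
      "name": "gdelt.trigger-delay",
      "type": "java.lang.Long",
      "description": "The delay between pulling data from gdelt (in seconds).",
      "defaultValue": 120
    }
  ]
}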

Build the project

You can package the application, create the Docker image and upload it to Docker Hub with a single command (it requires a Docker Hub account, please replace the placeholders accordingly).

Note: you could skip this step and use our docker image (syscrest/gdelt-article-feed-source).


./mvnw clean package jib:build \
-Djib.to.auth.username=yourdockerhubaccount \
-Djib.to.auth.password=yourdockerhubpassword \
-Djib.to.image=registry.hub.docker.com/yourdockerhubaccount/gdelt-article-feed-source

The output should look like this:

....
....
[INFO]
[INFO] Containerizing application to syscrest/gdelt-article-feed-source...
[INFO]
[INFO] Retrieving registry credentials for registry.hub.docker.com...
[INFO] Building classes layer...
[INFO] Building resources layer...
[INFO] Getting base image gcr.io/distroless/java...
[INFO] Building dependencies layer...
[INFO] Finalizing...
[INFO]
[INFO] Container entrypoint set to [java, -cp, /app/resources/:/app/classes/:/app/libs/*, com.syscrest.blogposts.scdf.gdeltsource.GDELTSourceApplication]
[INFO]
[INFO] Built and pushed image as syscrest/gdelt-article-feed-source
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

The Docker image has been pushed to hub.docker.com, but we also want to use the metadata jar (target/gdelt-article-feed-source-1.0.0-SNAPSHOT-metadata.jar). SCDF can pull jar files not only from Maven repositories but also from any HTTP server, so we uploaded it to our website to make it available to the SCDF server (you can find the URL in the next section).

Register the app

Browse your Spring Cloud Data Flow UI and select “Apps” and then “+ Add Application“:

[screenshot: scdf-gdelt-source-add-application]

Select “Register one or more applications”:

[screenshot: scdf-gdelt-source-add-application-register-one-application]
Register the app using the following values (a shell-based alternative is shown after the list):

  • Name: gdelt-article-feed-source
  • Type: Source
  • URI: docker:syscrest/gdelt-article-feed-source:latest
  • Metadata URI: http://www.syscrest.com/gdelt-on-scdf-artifacts/gdelt-article-feed-source-1.0.0-SNAPSHOT-metadata.jar
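
If you prefer the Data Flow shell over the UI (the shell setup is covered in the bootstrapping post below), the same registration can be done with a single command along these lines:

dataflow:>app register --name gdelt-article-feed-source --type source --uri docker:syscrest/gdelt-article-feed-source:latest --metadata-uri http://www.syscrest.com/gdelt-on-scdf-artifacts/gdelt-article-feed-source-1.0.0-SNAPSHOT-metadata.jar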

[screenshot: scdf-gdelt-source-add-application-registration-details]
Afterwards browse the application list and click on “gdelt-article-feed-source”:

[screenshot: scdf-gdelt-source-add-application-verify]

to verify that all configuration options have been picked up from the metadata jar file:

[screenshot: scdf-gdelt-source-add-application-verify-options]

Creating a stream

Let’s create a simple stream that uses our custom application as the source and the very basic log sink to just dump the messages into the log file of a pod. Select “Streams” in the left sidebar and then click “Create stream(s)”:

[screenshot: scdf-gdelt-source-add-stream]

Just copy and paste our example, which queries all current articles containing ‘climate change’, into the textbox:


gdelt-article-feed-source --trigger-delay=300 --query='climate change' | log

[screenshot: scdf-gdelt-source-add-stream-stream-1-just-log]

You can also just type and use the autocompletion:

[screenshot: scdf-gdelt-source-add-stream-stream-1-just-log-autocomplete]

Afterwards save the stream (don’t check ‘Deploy Stream(s)’):

[screenshot: scdf-gdelt-source-add-stream-stream-1-just-log-save-stream]
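
If you prefer the shell, the same stream can be created (but not yet deployed) with a single command; the name gdelt-stream-1-log is just an example that matches the pod names shown further below:

dataflow:>stream create --name gdelt-stream-1-log --definition "gdelt-article-feed-source --trigger-delay=300 --query='climate change' | log"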

Locate your previously saved stream in the stream list:

[screenshot: scdf-gdelt-source-add-stream-stream-1-just-log-in-list]

When you click on “Deploy” you can define deployment-specific settings like memory and CPU assignments (not necessary here, the default values are sufficient):

[screenshot: scdf-gdelt-source-add-stream-stream-1-just-log-deployment-options]
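
For reference, when deploying from the shell instead of the UI, such settings can be passed as deployment properties. A rough sketch (the property names here are those of the Kubernetes deployer; please verify them against your SCDF version):

dataflow:>stream deploy --name gdelt-stream-1-log --properties "deployer.gdelt-article-feed-source.kubernetes.limits.memory=512Mi,deployer.log.kubernetes.limits.cpu=500m"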

Your Spring Cloud Data Flow instance will now deploy pods into the same namespace it is running in, named after the stream name plus the source/sink names:


kubectl -n scdf-170 get pods

NAME                                                      READY     STATUS             RESTARTS   AGE
...
gdelt-stream-1-log-gdelt-article-feed-source-6c457dfb9f-v28ls   0/1       Running            0          1m
gdelt-stream-1-log-log-64cfc8bc-bb6n9                           0/1       Running            0          1m

...

Let’s peek into the “log” pod to see the data that has been emitted by our custom source:


kubectl -n scdf-170 logs -f gdelt-stream-1-log-log-64cfc8bc-bb6n9

output (reformatted for better readability):

2018-11-30 23:06:03.447  INFO 1 --- [container-0-C-1] log-sink
: 
[  
...
   {  
      "url":"https://www.rga.de/politik/angela-merkel-nach-g20-forbes-kuert-sie-zur-maechtigsten-frau-welt-news-zr-10773201.html",
      "title":"Angela Merkel nach G20 : „ Forbes  kürt sie zur mächtigsten Frau der Welt - News",
      "language":"German",
      "sourcecountry":"Germany",
      "domain":"rga.de",
      "seendate":"20181205T113000Z"
   },
   {  
      "url":"https://www.vienna.at/cop24-raet-wegen-klimawandel-zur-weniger-fleischkonsum/6022139",
      "title":"COP24 rät wegen Klimawandel zur weniger Fleischkonsum",
      "language":"German",
      "sourcecountry":"Austria",
      "domain":"vienna.at",
      "seendate":"20181205T113000Z"
   },
  ...
...
]

Let’s create a slightly improved version that splits the array of GDELTArticle objects into separate messages using the splitter starter app and channels these messages into an explicit topic named climate-change-articles:


gdelt-article-feed-source --query='climate change' | splitter --expression="#jsonPath(payload,'$.*')" > :climate-change-articles

[screenshot: scdf-gdelt-source-add-stream-stream-2-with-split]

If you peek into the topic you can see that each message just contains a single article:

{"url":"https://advertisingmarket24.com/biomass-power-generation-market-global-industry-analysis-size-share-growth-trends-and-forecast-2018-2022/807358/","title":"Biomass Power Generation Market – Global Industry Analysis , Size , Share , Growth , Trends and Forecast 2018 – 2022 – Advertising Market","language":"English","sourcecountry":"","domain":"advertisingmarket24.com","seendate":"20181205T120000Z"}
{"url":"https://www.princegeorgecitizen.com/news/national-news/in-the-news-today-dec-5-1.23519640","title":"In the news today , Dec . 5","language":"English","sourcecountry":"Canada","domain":"princegeorgecitizen.com","seendate":"20181205T120000Z"}
{"url":"https://www.zazoom.it/2018-12-05/manovra-rivoluzione-congedo-parentale-si-potra-lavorare-fino-al-parto/4953509/","title":"Manovra | rivoluzione congedo parentale Si potrà lavorare fino al parto","language":"Italian","sourcecountry":"Italy","domain":"zazoom.it","seendate":"20181205T120000Z"}
{"url":"http://www.businessghana.com/site/news/business/177668/AfDB,-AGTF-to-provide-funds-for-NEP","title":"AfDB , AGTF to provide funds for NEP - BusinessGhana News","language":"English","sourcecountry":"Ghana","domain":"businessghana.com","seendate":"20181205T120000Z"}
{"url":"http://visayandailystar.com/2018/December/05/overview.htm","title":"Daily Star Opinions : Overview with Gwyne Dyer","language":"English","sourcecountry":"Philippines","domain":"visayandailystar.com","seendate":"20181205T120000Z"}

You will notice that calling the GDELT endpoint continuously will result in a lot of duplicate articles … we will implement a filter/deduplication processor in one of the next GDELT on SCDF blog posts.

GDELT on SCDF: Bootstrapping Spring Cloud Data Flow 1.7.0 on Kubernetes using kubectl

In the first part of our planned blog post series (processing GDELT data with SCDF on Kubernetes) we go through the steps to deploy the latest Spring Cloud Data Flow (SCDF) release 1.7.0 on Kubernetes, including the latest version of the starter apps that will be used in the examples.

We stick to the manual steps described here in the official Spring Cloud Data Flow documentation and deploy all components into a dedicated namespace scdf-170 on our Kubernetes cluster to run the examples.

This installation will not be production-ready; it is about experimenting and ensuring compatibility, as we experienced some incompatibilities when mixing our own source/sink implementations based on Finchley.SR2 with the prepackaged starter apps based on Spring Boot 1.5 / Spring Cloud Stream 1.3.x.

Preparations

Clone the git repository to retrieve the necessary Kubernetes configuration files and check out the v1.7.0.RELEASE tag:


git clone https://github.com/spring-cloud/spring-cloud-dataflow-server-kubernetes
cd spring-cloud-dataflow-server-kubernetes
git checkout v1.7.0.RELEASE

Installation with kubectl

We want to use a dedicated namespace scdf-170 for our deployment, so we create it first:


echo '{ "kind": "Namespace", "apiVersion": "v1", "metadata": { "name": "scdf-170", "labels": { "name": "scdf-170" } } }' | kubectl create -f -

Afterwards we can deploy the dependencies (Kafka/MySQL/Redis) and the Spring Cloud Data Flow server itself:


kubectl create -n scdf-170 -f src/kubernetes/kafka/
kubectl create -n scdf-170 -f src/kubernetes/mysql/
kubectl create -n scdf-170 -f src/kubernetes/redis/
kubectl create -n scdf-170 -f src/kubernetes/metrics/metrics-svc.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/server-roles.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/server-rolebinding.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/service-account.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/server-config-kafka.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/server-svc.yaml
kubectl create -n scdf-170 -f src/kubernetes/server/server-deployment.yaml

Verify and enable access


kubectl -n scdf-170 get all

Output should look like:

NAME                                READY     STATUS    RESTARTS   AGE
pod/kafka-broker-696786c8f7-fjp4p   1/1       Running   1          5m
pod/kafka-zk-5f9bff7d5-tmxzg        1/1       Running   0          5m
pod/mysql-f878678df-2t4d6           1/1       Running   0          5m
pod/redis-748db48b4f-8h75x          1/1       Running   0          5m
pod/scdf-server-757ccb576c-9fssd    1/1       Running   0          5m
 
NAME                  TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
service/kafka         ClusterIP      10.98.11.191     <none>        9092/TCP                     5m
service/kafka-zk      ClusterIP      10.111.45.25     <none>        2181/TCP,2888/TCP,3888/TCP   5m
service/metrics       ClusterIP      10.104.11.216    <none>        80/TCP                       5m
service/mysql         ClusterIP      10.99.141.105    <none>        3306/TCP                     5m
service/redis         ClusterIP      10.99.0.146      <none>        6379/TCP                     5m
service/scdf-server   LoadBalancer   10.101.180.212   <pending>     80:30884/TCP                 5m
 
NAME                           DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/kafka-broker   1         1         1            1           5m
deployment.apps/kafka-zk       1         1         1            1           5m
deployment.apps/mysql          1         1         1            1           5m
deployment.apps/redis          1         1         1            1           5m
deployment.apps/scdf-server    1         1         1            1           5m
 
NAME                                      DESIRED   CURRENT   READY     AGE
replicaset.apps/kafka-broker-696786c8f7   1         1         1         5m
replicaset.apps/kafka-zk-5f9bff7d5        1         1         1         5m
replicaset.apps/mysql-f878678df           1         1         1         5m
replicaset.apps/redis-748db48b4f          1         1         1         5m
replicaset.apps/scdf-server-757ccb576c    1         1         1         5m

As the SCDF server will automatically generate a random password for the user “user” on first startup, we need to grep the log output (using your individual scdf-server pod name, see the output of the previous command):


kubectl logs -n scdf-170 scdf-server-757ccb576c-9fssd | grep "Using default security password"

The output should look like:

Using default security password: 8d6b58fa-bf1a-4a0a-a42b-c7b03ab15c60 with roles 'VIEW,CREATE,MANAGE'

To access the UI (and the REST API for the CLI) you can create a port-forward to the scdf-server pod (again using your individual scdf-server pod name, see the output of “kubectl -n scdf-170 get all”):


kubectl -n scdf-170 port-forward scdf-server-757ccb576c-9fssd 2345:80

where 2345 is the local port on your machine on which you can now access the UI and the REST API.
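
Assuming the default settings, the dashboard should then be reachable in your browser under:

http://localhost:2345/dashboard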

Install the Data Flow CLI

You can download the jar file directly from Spring’s Maven repository:


wget https://repo.spring.io/release/org/springframework/cloud/spring-cloud-dataflow-shell/1.7.0.RELEASE/spring-cloud-dataflow-shell-1.7.0.RELEASE.jar

and start the CLI (passing the generated password from above as dataflow.password):


java -jar spring-cloud-dataflow-shell-1.7.0.RELEASE.jar \
--dataflow.username=user \
--dataflow.password= \
--dataflow.uri=http://localhost:2345

The output should look like this:

   ...
 / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |
 \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |
  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |
 |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|
  ____ |_|    _          __|___/                 __________
 |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \
 | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \
 | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /
 |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/
 
1.7.0.RELEASE
 
Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>

Install starter apps

After the initial installation there are no applications (task/stream) registered. As you can see here (for the stream apps), you need to pick an explicit combination of packaging format (jar vs. docker) and messaging technology (Kafka vs. RabbitMQ).

We will go for Docker (because we are running on Kubernetes) and Kafka 0.10 as the messaging layer, based on the newer Spring Boot and Spring Cloud Stream versions (2.0.x + 2.0.x).

We can register all the available starter apps with a single cli command:

dataflow:>app import --uri http://bit.ly/Darwin-SR2-stream-applications-kafka-docker

Output:

Successfully registered .........
.................................
...... processor.pmml.metadata, sink.router.metadata, sink.mongodb]

and the Spring Cloud Task Starter Apps (based on Spring Boot 2.0.x + Spring Cloud Task 2.0.x):


dataflow:> app import --uri http://bit.ly/Dearborn-SR1-task-applications-docker

Successfully registered ..........
..................................
........ task.timestamp, task.timestamp.metadata]

You can verify the installation by listing all available applications:


dataflow:>app list

The output should look like this:

╔═══╤══════════════╤═══════════════════════════╤══════════════════════════╤════════════════════╗
║app│    source    │         processor         │           sink           │        task        ║
╠═══╪══════════════╪═══════════════════════════╪══════════════════════════╪════════════════════╣
║   │file          │bridge                     │aggregate-counter         │composed-task-runner║
║   │ftp           │filter                     │counter                   │timestamp           ║
║   │gemfire       │groovy-filter              │field-value-counter       │timestamp-batch     ║
║   │gemfire-cq    │groovy-transform           │file                      │                    ║
║   │http          │grpc                       │ftp                       │                    ║
║   │jdbc          │header-enricher            │gemfire                   │                    ║
║   │jms           │httpclient                 │hdfs                      │                    ║
║   │load-generator│image-recognition          │jdbc                      │                    ║
║   │loggregator   │object-detection           │log                       │                    ║
║   │mail          │pmml                       │mongodb                   │                    ║
║   │mongodb       │python-http                │mqtt                      │                    ║
║   │mqtt          │python-jython              │pgcopy                    │                    ║
║   │rabbit        │scriptable-transform       │rabbit                    │                    ║
║   │s3            │splitter                   │redis-pubsub              │                    ║
║   │sftp          │tasklaunchrequest-transform│router                    │                    ║
║   │syslog        │tcp-client                 │s3                        │                    ║
║   │tcp           │tensorflow                 │sftp                      │                    ║
║   │tcp-client    │transform                  │task-launcher-cloudfoundry│                    ║
║   │time          │twitter-sentiment          │task-launcher-local       │                    ║
║   │trigger       │                           │task-launcher-yarn        │                    ║
║   │triggertask   │                           │tcp                       │                    ║
║   │twitterstream │                           │throughput                │                    ║
║   │              │                           │websocket                 │                    ║
╚═══╧══════════════╧═══════════════════════════╧══════════════════════════╧════════════════════╝

Continue reading on how to implement a custom source application using the reactive support in Spring Cloud Stream in our next blog post: Implementing a custom reactive source application for Spring Cloud Data Flow.