GDELT on SCDF 1.7.0 : Implementing a filter application

by Thomas Memenga on 2018-12-16

This blog post has been updated for SCDF 2.2.0; please continue reading here: GDELT on SCDF 2.2.0 : Implementing a filter application.

In this blog post (part of the series on processing GDELT data with SCDF on Kubernetes) we filter the stream created in the last post (Implementing a custom reactive source application), as it contains a lot of duplicate data.

Source Code

You can find the source code on GitHub:

git clone
cd gdelt-on-spring-cloud-data-flow
git checkout scdf-1-7-0
cd gdelt-article-simple-deduplication-filter

Maven project setup

We use the same Maven-based setup (Spring Maven plugins and Google’s Jib) to build the necessary jar files and the Docker image. See the predecessor blog post (Implementing a custom reactive source application) for details about the project structure.


Let’s start with our configuration properties. As we want to check whether we have already seen an article (i.e. its URL) before, we use a very simple cache that keeps the URLs for a certain amount of time. So let’s expose a single configuration property, timeToLive:

public class GDELTDeduplicationFilterProperties {

   /**
    * time in minutes to remember already seen article urls
    */
   private long timeToLive = 24 * 60L;

   /** ... getter and setter omitted */
}
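For readers who want to see the omitted accessors spelled out, the following is a minimal sketch of such a properties holder as a plain Java class. The class name DeduplicationPropertiesSketch is hypothetical and the Spring annotations are deliberately left out, so this is an illustration and not the class from the repository. Spring Boot's relaxed binding would map an option such as time-to-live onto a setter named setTimeToLive.

```java
/** Hypothetical plain-Java mirror of the properties class; Spring annotations are left out. */
class DeduplicationPropertiesSketch {

    /** Time in minutes to remember already seen article URLs (default: one day). */
    private long timeToLive = 24 * 60L;

    /** Returns the configured time to live in minutes. */
    public long getTimeToLive() {
        return timeToLive;
    }

    /** Overrides the default time to live (in minutes). */
    public void setTimeToLive(long timeToLive) {
        this.timeToLive = timeToLive;
    }
}
```

The default of 24 * 60 minutes equals 1440, which is the same value passed explicitly as --time-to-live in the stream definition later in this post.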

To limit the configuration options to just our own class add a file named to src/main/resources/META-INF containing:


The actual filter is a very simple Spring Boot application that uses @PostConstruct to configure the cache and @Filter to annotate the filter method:

@EnableConfigurationProperties(GDELTDeduplicationFilterProperties.class)
@EnableBinding(Processor.class)
@SpringBootApplication
public class GDELTDeduplicationFilter {

   private static final Log logger = LogFactory.getLog(GDELTDeduplicationFilter.class);

   public static void main(String[] args) {
      SpringApplication.run(GDELTDeduplicationFilter.class, args);
   }

   @Autowired
   private GDELTDeduplicationFilterProperties configuration;

   // url -> number of times the article has been seen
   private Cache<String, Long> cache;

   @PostConstruct
   private void postConstruct() {
      cache = CacheBuilder.newBuilder().expireAfterWrite(configuration.getTimeToLive(), TimeUnit.MINUTES).build();
   }

   @Filter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
   public boolean filter(Message<GDELTArticle> message) {
      GDELTArticle article = message.getPayload();
      Long currentCount = cache.getIfPresent(article.getUrl());
      if (currentCount != null) {
         // duplicate: bump the counter and drop the message
         cache.put(article.getUrl(), ++currentCount);
         logger.info("already seen " + currentCount + " times, filtering out article = " + article);
         return false;
      } else {
         // first sighting: remember the url and let the message pass
         cache.put(article.getUrl(), 1L);
         logger.info("seen for first time, passing thru = " + article);
         return true;
      }
   }
}

Note: This is a very simple implementation of a deduplication filter. It does not guarantee 100 percent accuracy, as URLs are only kept in memory, so pod restarts may result in duplicate articles.
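To experiment with the deduplication logic outside of a Spring context, here is a minimal sketch that uses only the JDK. It is not the code from the repository: the class SimpleUrlDeduplicator, its method names and the injectable clock are assumptions made for illustration, and a plain HashMap stands in for the Guava cache. Like the filter above, it refreshes an entry on every sighting, so the time to live is effectively measured from the last time a URL was seen, not the first.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical JDK-only stand-in for the cache-backed filter; not the author's class. */
class SimpleUrlDeduplicator {

    /** Per-URL bookkeeping: how often it was seen and when the entry was last written. */
    private static final class Entry {
        long count;
        long lastWriteMillis;
    }

    private final Map<String, Entry> seen = new HashMap<>();
    private final long timeToLiveMillis;
    private final LongSupplier clock; // injectable so that tests can control time

    SimpleUrlDeduplicator(long timeToLiveMinutes, LongSupplier clock) {
        this.timeToLiveMillis = timeToLiveMinutes * 60_000L;
        this.clock = clock;
    }

    /** Returns true if the URL should pass, i.e. it has no live entry yet. */
    synchronized boolean filter(String url) {
        long now = clock.getAsLong();
        // Evict every entry whose last write is at least one TTL in the past.
        seen.values().removeIf(e -> now - e.lastWriteMillis >= timeToLiveMillis);

        Entry entry = seen.get(url);
        if (entry == null) {
            entry = new Entry();
            entry.count = 1;
            entry.lastWriteMillis = now;
            seen.put(url, entry);
            return true; // first sighting: let it pass
        }
        entry.count++;
        entry.lastWriteMillis = now; // each duplicate refreshes the expiry
        return false; // duplicate: filter it out
    }

    /** Number of sightings recorded for the URL, or 0 if there is no entry. */
    synchronized long timesSeen(String url) {
        Entry entry = seen.get(url);
        return entry == null ? 0 : entry.count;
    }
}
```

In production code one would pass System::currentTimeMillis as the clock. Passing a controllable supplier instead makes the expiry behaviour easy to verify without waiting for real time to elapse.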

Build and deploy

You can package the application, create the Docker image and upload it to Docker Hub with a single command. This requires a Docker Hub account; please replace the placeholders accordingly.

Note: you could skip this step and use our docker image (syscrest/gdelt-article-simple-deduplication-filter).

./mvnw clean package jib:build \ \ \

The Docker image has been pushed to Docker Hub. But we also want to use the metadata jar (target/gdelt-article-simple-deduplication-filter-1.0.0-SNAPSHOT-metadata.jar). SCDF can pull jar files not only from Maven Central but also from any HTTP server, so we uploaded it to our website to make it available to the SCDF server (you can find the URL in the next section).

Register the app

Open your Spring Cloud Data Flow UI and select “Apps” and then “+ Add Application”:


Select “Register one or more applications”:

Register the app (our prebuilt image + jar files) using:


Afterwards browse the application list and click on “gdelt-article-simple-deduplication-filter” to verify that all configuration options have been picked up from the metadata jar file:


Creating a stream

Let’s enhance our stream from the last blog post (Implementing a custom reactive source application) and plug in our filtering processor:

gdelt-article-feed-source --query='climate change' | splitter --expression="#jsonPath(payload,'$.*')" | gdelt-article-simple-deduplication-filter --time-to-live=1440 > :filtered-gdelt-articles


Save the stream (we chose “stream-filtered” as its name), but do not check the “deploy” box yet. On the deployment configuration page, make sure to create just a single instance per application (there will be a new blog post about scaling and proper routing later on) and add a required deployer property (as the source, the processor and the starter apps are Spring Boot 2.0-based):



Afterwards we can use kubectl to peek into the deduplication-filter pod:

kubectl -n scdf-170 get pods

stream-filtered-gdelt-article-feed-source-6cd4468594-c5p9j        1/1       Running   0          16m
stream-filtered-gdelt-article-simple-deduplication-filter-srx7p   1/1       Running   0          16m
stream-filtered-splitter-6df58fbcc7-zjckp                         1/1       Running   0          16m

kubectl -n scdf-170 logs -f stream-filtered-gdelt-article-simple-deduplication-filter-srx7p

first pull - let the articles pass:

.... : seen for first time, passing thru = GDELTArticle [url=, title=  Planetary emergency : After 30 years , leaders are still fighting about basic truths of climate science, language=English, sourcecountry=United States,, seendate=20181216T191500Z]

subsequent pulls (still containing the article - drop them):

.... :already seen 2 times, filtering out article = GDELTArticle [url=, title=  Planetary emergency : After 30 years , leaders are still fighting about basic truths of climate science, language=English, sourcecountry=United States,, seendate=20181216T191500Z]