GDELT on SCDF: Implementing a filter application

In this blog post (part of the series on processing GDELT data with SCDF on Kubernetes), we filter the stream created in the last post (Implementing a custom reactive source application), as it contains a lot of duplicate data.

Source Code

You can find the source code on GitHub:


git clone https://github.com/syscrest/gdelt-on-spring-cloud-data-flow

cd gdelt-article-simple-deduplication-filter

Maven project setup

We use the same Maven-based setup to build the necessary jar files and Docker image, using the Spring Maven plugins and Google’s JIB. See the predecessor blog post (Implementing a custom reactive source application) for details about the project structure.

Implementation

Let’s start with our configuration properties. As we want to check whether we have already seen an article (that is, the url of the article) before, we use a very simple cache that keeps the urls for a certain amount of time. So let’s expose a single configuration property named timeToLive:

@ConfigurationProperties("gdelt")
@Validated
public class GDELTDeduplicationFilterProperties {

	/**
	 * time in minutes to remember already seen article urls
	 */
	private long timeToLive = 24 * 60L;

	/* ... getter and setter omitted ... */
}

To limit the configuration options shown by SCDF to just our own class, add a file named spring-configuration-metadata-whitelist.properties to src/main/resources/META-INF containing:

configuration-properties.classes=com.syscrest.blogposts.scdf.gdeltprocessor.simplearticlededuplicator.GDELTDeduplicationFilterProperties

The actual filter is a very simple Spring Boot application that uses @PostConstruct to configure the cache and @Filter to annotate the filter method:

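import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Filter;
import org.springframework.messaging.Message;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// GDELTArticle is part of the project itself (import omitted)

@EnableConfigurationProperties(GDELTDeduplicationFilterProperties.class)
@EnableBinding(Processor.class)
@SpringBootApplication
public class GDELTDeduplicationFilter {

   private static final Log logger = LogFactory.getLog(GDELTDeduplicationFilter.class);

   public static void main(String[] args) {
      SpringApplication.run(GDELTDeduplicationFilter.class, args);
   }

   @Autowired
   private GDELTDeduplicationFilterProperties configuration;

   // maps article url -> number of times it has been seen
   private Cache<String, Long> cache;

   @PostConstruct
   private void postConstruct() {
      // entries expire timeToLive minutes after they were last written
      cache = CacheBuilder.newBuilder().expireAfterWrite(configuration.getTimeToLive(), TimeUnit.MINUTES).build();
   }

   // returning false drops the message, returning true forwards it to the output channel
   @Filter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
   public boolean filter(Message<GDELTArticle> message) {
      GDELTArticle article = message.getPayload();
      Long currentCount = cache.getIfPresent(article.getUrl());
      if (currentCount != null) {
         // writing the updated count also restarts the expiry timer for this url
         cache.put(article.getUrl(), ++currentCount);
         logger.info("already seen " + (currentCount) + " times, filtering out article = " + article);
         return false;
      } else {
         cache.put(article.getUrl(), 1L);
         logger.info("seen for first time, passing thru = " + article);
         return true;
      }
   }
}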

Note: This is a very simple deduplication filter. It does not guarantee 100 percent accuracy, as the urls are only kept in memory, so restarts of pods may result in duplicate articles.
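To make the note above concrete, here is a minimal, dependency-free sketch of the same idea. It is not the code from the project: TtlUrlFilter and its injectable clock are hypothetical names introduced for illustration, and a plain HashMap stands in for the Guava cache (so, unlike the cache, it never evicts expired entries on its own). Like the filter above, it refreshes an entry every time a url is seen again.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the Guava-backed filter; for illustration only. */
final class TtlUrlFilter {

    // url -> time (in milliseconds) at which the url was last seen
    private final Map<String, Long> lastSeenMillis = new HashMap<>();
    private final long ttlMillis;
    // injectable clock (assumption of this sketch) so expiry can be tried without sleeping
    private final LongSupplier clock;

    TtlUrlFilter(long ttlMinutes, LongSupplier clock) {
        this.ttlMillis = ttlMinutes * 60_000L;
        this.clock = clock;
    }

    /** Returns true if the url may pass, false if it was seen within the time to live. */
    synchronized boolean accept(String url) {
        long now = clock.getAsLong();
        Long seenAt = lastSeenMillis.get(url);
        // remember the url; on a duplicate this restarts its time to live
        lastSeenMillis.put(url, now);
        return seenAt == null || now - seenAt >= ttlMillis;
    }
}
```

Creating a second TtlUrlFilter instance behaves like a restarted pod: it starts with an empty memory, so a url that was filtered out before passes again.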

Build and deploy

You can package the application, create the Docker image and upload it to Docker Hub with a single command (this requires a Docker Hub account; please replace the placeholders accordingly).

Note: you could skip this step and use our docker image (syscrest/gdelt-article-simple-deduplication-filter).


./mvnw clean package jib:build \
-Djib.to.auth.username=yourdockerhubaccount \
-Djib.to.auth.password=yourdockerhubpassword \
-Dimage=registry.hub.docker.com/yourdockerhubaccount/gdelt-article-simple-deduplication-filter

The Docker image has now been pushed to hub.docker.com. But we also want to use the metadata jar (target/gdelt-article-simple-deduplication-filter-1.0.0-SNAPSHOT-metadata.jar). SCDF can pull jar files not only from Maven Central but also from any HTTP server, so we uploaded it to our website to make it available to the SCDF server (you can find the url in the next section).

Register the app

Open your Spring Cloud Data Flow UI, select “Apps” and then “+ Add Application”:

[Screenshot: scdf-gdelt-source-add-application]

Select “Register one or more applications”:

[Screenshot: scdf-gdelt-source-add-application-register-one-application]

Register the app (our prebuilt image and metadata jar) using:

  • Name: gdelt-article-simple-deduplication-filter
  • Type: Processor
  • URI: docker:syscrest/gdelt-article-simple-deduplication-filter:latest
  • Metadata URI: http://www.syscrest.com/gdelt-on-scdf-artifacts/gdelt-article-simple-deduplication-filter-1.0.0-SNAPSHOT-metadata.jar

[Screenshot: scdl-gdelt-simple-filter-add]

Afterwards browse the application list and click on “gdelt-article-simple-deduplication-filter” to verify that all configuration options have been picked up from the metadata jar file:

[Screenshot: scdl-gdelt-simple-filter-properties]

Creating a stream

Let’s enhance our stream from the last blog post (Implementing a custom reactive source application) and plug in our filtering processor:

gdelt-article-feed-source --query='climate change' | splitter --expression="#jsonPath(payload,'$.*')" | gdelt-article-simple-deduplication-filter --time-to-live=1440 > :filtered-gdelt-articles

[Screenshot: scdl-gdelt-simple-filter-flow-A]
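Conceptually, the stream definition does the following: each pull of the source delivers a batch of articles, the splitter turns the batch into individual messages, and the filter drops every article whose url is already known. The sketch below models just that flow inside a single JVM. It is not SCDF code; PipelineSketch is a hypothetical name, articles are reduced to their url, and expiry of remembered urls is ignored to keep it short.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical in-process model of source | splitter | filter; for illustration only. */
final class PipelineSketch {

    // urls that have already been forwarded (no expiry in this sketch)
    private final Set<String> seenUrls = new HashSet<>();

    /** Takes the urls of one source pull and returns those that would reach the destination. */
    List<String> process(List<String> pulledBatch) {
        return pulledBatch.stream()     // "splitter": handle each article on its own
                .filter(seenUrls::add)  // "filter": Set.add returns false for known urls
                .collect(Collectors.toList());
    }
}
```

Calling process twice with overlapping batches returns only the new urls the second time, which is the behaviour we expect to see in the filter’s log output once the stream is deployed.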

Save the stream (we chose “stream-filtered” as its name), but do not check the “deploy” box yet. On the deployment configuration page, make sure to create just a single instance per application (there will be a later blog post about scaling and proper routing) and add a required deployer property (as the source, the processor and the starter apps are all based on Spring Boot 2.0):

kubernetes.bootMajorVersion=2

[Screenshot: kubernetes.bootMajorVersion_2]

Afterwards we can use kubectl to peek into the deduplication-filter pod:


kubectl -n scdf-170 get pods

stream-filtered-gdelt-article-feed-source-6cd4468594-c5p9j        1/1       Running   0          16m
stream-filtered-gdelt-article-simple-deduplication-filter-srx7p   1/1       Running   0          16m
stream-filtered-splitter-6df58fbcc7-zjckp                         1/1       Running   0          16m


kubectl -n scdf-170 logs -f stream-filtered-gdelt-article-simple-deduplication-filter-srx7p

first pull – let the articles pass:

...
.... : seen for first time, passing thru = GDELTArticle [url=http://www.kabc.com/news/planetary-emergency-after-30-years-leaders-are-still-fighting-about-basic-truths-of-climate-science/, title=  Planetary emergency : After 30 years , leaders are still fighting about basic truths of climate science, language=English, sourcecountry=United States, domain=kabc.com, seendate=20181216T191500Z]
...

subsequent pulls (still containing the article – drop them):

...
.... :already seen 2 times, filtering out article = GDELTArticle [url=http://www.kabc.com/news/planetary-emergency-after-30-years-leaders-are-still-fighting-about-basic-truths-of-climate-science/, title=  Planetary emergency : After 30 years , leaders are still fighting about basic truths of climate science, language=English, sourcecountry=United States, domain=kabc.com, seendate=20181216T191500Z]
...