GDELT on SCDF 2.2.0: Implementing a basic filter application to drop duplicate data

by Thomas Memenga on 2019-08-06

In the third part of our blog post series “processing GDELT data with SCDF on kubernetes”, we filter the stream created in the previous post, as it contains a lot of duplicate data.

This is a repost of “GDELT on SCDF 1.7.0: Implementing a filter application”, with instructions and code updated from SCDF 1.7.0 to 2.2.0.

Source Code

You can find the source code on github:

git clone https://github.com/syscrest/gdelt-on-spring-cloud-data-flow
cd gdelt-on-spring-cloud-data-flow
git checkout scdf-2-2-0
cd gdelt-article-simple-deduplication-filter

Maven project setup

We use the same setup as before to build the necessary jar files and the Docker image, based on Maven, the Spring Maven plugins and Google’s Jib. See the predecessor blog post (Implementing a custom reactive source application) for details about the project structure.

Implementation

Let’s start with our configuration properties. Because we want to check whether we have already seen an article (identified by the URL of the article), we use a very simple cache that keeps the URLs for a certain amount of time. So let’s expose a single configuration property named timeToLive:

@ConfigurationProperties("gdelt")
@Validated
public class GDELTDeduplicationFilterProperties {

/**
 * time in minutes to remember already seen article urls
 */
private long timeToLive = 24 * 60l;

   /** ... getter and setter omitted */

To limit the exposed configuration options to just our own class, add a file named spring-configuration-metadata-whitelist.properties to src/main/resources/META-INF containing:

configuration-properties.classes=com.syscrest.blogposts.scdf.gdeltprocessor.simplearticlededuplicator.GDELTDeduplicationFilterProperties

The filter itself is a very simple Spring Boot application that uses @PostConstruct to set up the cache and @Filter to annotate the actual filter method:

@EnableConfigurationProperties(GDELTDeduplicationFilterProperties.class)
@EnableBinding(Processor.class)
@SpringBootApplication
public class GDELTDeduplicationFilter {

    private static final Log logger = LogFactory.getLog(GDELTDeduplicationFilter.class);

    public static void main(String[] args) {
        SpringApplication.run(GDELTDeduplicationFilter.class, args);
    }

    @Autowired
    private GDELTDeduplicationFilterProperties configuration;

    private Cache<String, Long> cache;

    @PostConstruct
    private void postConstruct() {
        // evict article urls after the configured time to live (in minutes)
        cache = CacheBuilder.newBuilder().expireAfterWrite(configuration.getTimeToLive(), TimeUnit.MINUTES).build();
    }

    @Filter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public boolean filter(Message<GDELTArticle> message) {
        GDELTArticle article = message.getPayload();
        Long currentCount = cache.getIfPresent(article.getUrl());
        if (currentCount != null) {
            // url is still in the cache -> duplicate, drop the message
            cache.put(article.getUrl(), ++currentCount);
            logger.info("already seen " + currentCount + " times, filtering out article = " + article);
            return false;
        } else {
            // url not seen (recently) -> remember it and let the message pass
            cache.put(article.getUrl(), 1L);
            logger.info("seen for first time, passing thru = " + article);
            return true;
        }
    }

}

Note: This is a very simple deduplication filter. It does not guarantee 100 percent accuracy, because the URLs are only kept in memory; a pod restart loses the cache and may let duplicate articles pass.
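If you want to see the cache semantics in isolation, here is a small standalone snippet (not part of the repository; class name and example URL are made up for illustration) that exercises the same Guava calls the filter relies on: an entry expires a fixed time after it was written, so a URL is only remembered for timeToLive minutes.

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class CacheBehaviourDemo {

    public static void main(String[] args) {
        // same construction as in postConstruct(), here with a ttl of 1 minute
        Cache<String, Long> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(1, TimeUnit.MINUTES)
                .build();

        String url = "http://example.org/some-article";

        // unknown url -> the filter would let the article pass
        System.out.println("known before put: " + (cache.getIfPresent(url) != null)); // false

        cache.put(url, 1L);

        // same url within the ttl -> the filter would drop it
        System.out.println("known after put:  " + (cache.getIfPresent(url) != null)); // true
    }
}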

build and deploy

You can package the application, create the Docker image and upload it to Docker Hub with a single command (this requires a Docker Hub account; please replace the placeholders accordingly).

Note: you could skip this step and use our Docker image (syscrest/gdelt-article-simple-deduplication-filter-greenwich).

./mvnw clean package jib:build \
-Djib.to.auth.username=yourdockerhubaccount \
-Djib.to.auth.password=yourdockerhubpassword \
-Dimage=registry.hub.docker.com/yourdockerhubaccount/gdelt-article-simple-deduplication-filter-greenwich

We also want to use the metadata jar (target/gdelt-article-simple-deduplication-filter-1.0.0-SNAPSHOT-metadata.jar). SCDF can pull jar files not only from Maven Central but also from any HTTP server, so we uploaded it to our website to make it available to the SCDF server (you can find the URL in the next section).

Register the app

Browse your Spring Cloud Data Flow UI and select “Apps” and then “+ Add Application”:

Select “Register one or more applications”:

Register the app (our prebuilt image + metadata jar) using:

Afterwards browse the application list and click on “gdelt-simple-dedup-filter” to verify that all configuration options have been picked up from the metadata jar file:
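Alternatively, the same registration can be done from the Data Flow shell with app register. Roughly, it would look like the following; the metadata host below is only a placeholder for wherever you host the metadata jar, and the latest tag is an assumption:

app register --name gdelt-simple-dedup-filter --type processor --uri docker:syscrest/gdelt-article-simple-deduplication-filter-greenwich:latest --metadata-uri https://<your-host>/gdelt-article-simple-deduplication-filter-1.0.0-SNAPSHOT-metadata.jar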

Creating a stream

Let’s enhance our stream from the last blog post (Implementing a custom reactive source application) and plug in our filtering processor, routing the filtered articles to the named destination :filtered-gdelt-articles:

gdelt-article-feed-source --query='climate change' | splitter --expression="#jsonPath(payload,'$.*')" | gdelt-simple-dedup-filter --time-to-live=1440 > :filtered-gdelt-articles

Save the stream (we chose “stream-filtered” as its name), but do not check the “deploy” box yet. On the deployment configuration page, make sure to create just a single instance per application (there will be a new blog post about scaling and proper routing later on).
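If you prefer deployment properties over the UI form, the instance count can also be pinned explicitly. The keys below follow the standard SCDF deployer.<app>.count convention; setting them is optional here, since 1 is the default anyway:

deployer.gdelt-article-feed-source.count=1
deployer.splitter.count=1
deployer.gdelt-simple-dedup-filter.count=1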

Afterwards we can use kubectl to peek into the deduplication-filter pod:

kubectl -n scdf-220 get pods

stream-filtered-gdelt-article-feed-source-v1-5f894b5df8-pxc7x   1/1     Running   0          18m
stream-filtered-gdelt-simple-dedup-filter-v1-77bd6c69f4-j69z8   1/1     Running   0          18m
stream-filtered-splitter-v1-c76787fc6-kxkmz                     1/1     Running   0          18m

kubectl -n scdf-220 logs -f stream-filtered-gdelt-simple-dedup-filter-v1-77bd6c69f4-j69z8

first pull - let the articles pass:

...
.... : seen for first time, passing thru = GDELTArticle [url=http://www.kabc.com/news/planetary-emergency-after-30-years-leaders-are-still-fighting-about-basic-truths-of-climate-science/, title=  Planetary emergency : After 30 years , leaders are still fighting about basic truths of climate science, language=English, sourcecountry=United States, domain=kabc.com, seendate=20181216T191500Z]
...

subsequent pulls (still containing the article - drop them):

...
.... : already seen 2 times, filtering out article = GDELTArticle [url=http://www.kabc.com/news/planetary-emergency-after-30-years-leaders-are-still-fighting-about-basic-truths-of-climate-science/, title=  Planetary emergency : After 30 years , leaders are still fighting about basic truths of climate science, language=English, sourcecountry=United States, domain=kabc.com, seendate=20181216T191500Z]
...

The blog post series continues by leveraging Kafka Streams sessionization to implement an alternative filter solution.