In the third part of our blog post series “processing GDELT data with SCDF on Kubernetes”, we want to filter the stream created in the last post, as it contains a lot of duplicate data.
This is a repost of ‘GDELT on SCDF 1.7.0: Implementing a filter application’, with the instructions and code updated from SCDF 1.7.0 to 2.2.0.
Source Code
You can find the source code on GitHub:
git clone https://github.com/syscrest/gdelt-on-spring-cloud-data-flow
cd gdelt-on-spring-cloud-data-flow
git checkout scdf-2-2-0
cd gdelt-article-simple-deduplication-filter
Maven project setup
We use the same setup as before to build the necessary jar files and Docker image, based on Maven, the Spring Maven plugins and Google’s Jib. See the predecessor blog post (Implementing a custom reactive source application) for details about the project structure.
Implementation
Let’s start with our configuration properties. As we want to check whether we have already seen an article before (identified by its URL), we use a very simple cache that keeps the URLs for a certain amount of time. So let’s expose a single configuration property, timeToLive (in minutes):
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
@ConfigurationProperties("gdelt")
@Validated
public class GDELTDeduplicationFilterProperties {
    /**
     * time in minutes to remember already seen article urls
     */
    private long timeToLive = 24 * 60L;
    public long getTimeToLive() { return timeToLive; }
    public void setTimeToLive(long timeToLive) { this.timeToLive = timeToLive; }
}
To limit the exposed configuration options to just our own class, add a file named spring-configuration-metadata-whitelist.properties to src/main/resources/META-INF containing:
configuration-properties.classes=com.syscrest.blogposts.scdf.gdeltprocessor.simplearticlededuplicator.GDELTDeduplicationFilterProperties
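The filter itself is a very simple Spring Boot application that uses @PostConstruct to configure the cache and @Filter to mark the actual filter method:
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Filter;
import org.springframework.messaging.Message;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@EnableConfigurationProperties(GDELTDeduplicationFilterProperties.class)
@EnableBinding(Processor.class)
@SpringBootApplication
public class GDELTDeduplicationFilter {
    private static final Log logger = LogFactory.getLog(GDELTDeduplicationFilter.class);
    public static void main(String[] args) {
        SpringApplication.run(GDELTDeduplicationFilter.class, args);
    }
    @Autowired
    private GDELTDeduplicationFilterProperties configuration;
    // article URL -> number of times it has been seen
    private Cache<String, Long> cache;
    @PostConstruct
    private void postConstruct() {
        // entries expire timeToLive minutes after their most recent write
        cache = CacheBuilder.newBuilder().expireAfterWrite(configuration.getTimeToLive(), TimeUnit.MINUTES).build();
    }
    // returning true forwards the message to the output channel, returning false drops it
    @Filter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public boolean filter(Message<GDELTArticle> message) {
        GDELTArticle article = message.getPayload();
        Long currentCount = cache.getIfPresent(article.getUrl());
        if (currentCount != null) {
            cache.put(article.getUrl(), ++currentCount);
            logger.info("already seen " + currentCount + " times, filtering out article = " + article);
            return false;
        } else {
            cache.put(article.getUrl(), 1L);
            logger.info("seen for first time, passing thru = " + article);
            return true;
        }
    }
}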
Note: this is a very simple implementation of a deduplication filter. It does not guarantee 100 percent accuracy, as the URLs are only kept in memory, so a pod restart may result in duplicate articles.
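To make the filter’s behavior easier to reason about outside of Spring, here is a minimal, framework-free sketch of the same deduplication logic in plain Java. The class UrlDeduplicator, its accept and timesSeen methods, and the injectable clock are hypothetical illustrations and not part of the project’s source; a HashMap with timestamps stands in for Guava’s cache. It mirrors one subtle detail of the implementation above: because every duplicate is put into the cache again and expireAfterWrite restarts on each write, the time-to-live is measured from the most recent sighting of a URL, not from the first one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical, framework-free sketch of the deduplication logic (not part of the project).
class UrlDeduplicator {

    // Per-URL bookkeeping: when the entry was last written and how often the URL was seen
    private static final class Entry {
        final long writtenAtMillis;
        final long count;

        Entry(long writtenAtMillis, long count) {
            this.writtenAtMillis = writtenAtMillis;
            this.count = count;
        }
    }

    private final Map<String, Entry> seen = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable clock (milliseconds) so expiry can be tested

    UrlDeduplicator(long timeToLiveMinutes, LongSupplier clock) {
        this.ttlMillis = TimeUnit.MINUTES.toMillis(timeToLiveMinutes);
        this.clock = clock;
    }

    // Like expireAfterWrite: an entry is gone once ttlMillis have passed since its last write.
    private boolean isAlive(Entry entry, long now) {
        return entry != null && now - entry.writtenAtMillis < ttlMillis;
    }

    // Returns true if the article should pass, false if it is a duplicate.
    boolean accept(String url) {
        long now = clock.getAsLong();
        Entry entry = seen.get(url);
        if (isAlive(entry, now)) {
            // like cache.put(...) in the filter, this rewrite restarts the TTL
            seen.put(url, new Entry(now, entry.count + 1));
            return false;
        }
        seen.put(url, new Entry(now, 1L));
        return true;
    }

    // Number of sightings within the current TTL window (0 if unknown or expired).
    long timesSeen(String url) {
        Entry entry = seen.get(url);
        return isAlive(entry, clock.getAsLong()) ? entry.count : 0L;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock, in milliseconds
        UrlDeduplicator dedup = new UrlDeduplicator(24 * 60L, () -> now[0]);
        System.out.println(dedup.accept("http://example.com/a")); // true: first sighting
        System.out.println(dedup.accept("http://example.com/a")); // false: duplicate
        now[0] += TimeUnit.MINUTES.toMillis(24 * 60L); // one full TTL after the last sighting
        System.out.println(dedup.accept("http://example.com/a")); // true: entry has expired
    }
}
```

Creating a new UrlDeduplicator starts with an empty map, which is effectively what happens to the real filter when its pod restarts, and why duplicates can slip through afterwards. Unlike Guava’s cache, this sketch never evicts expired entries proactively, so it is meant for illustration only.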
Build and deploy
You can package the application, create the Docker image and upload it to Docker Hub with a single command (this requires a Docker Hub account; please replace the placeholders accordingly).
Note: you can skip this step and use our Docker image (syscrest/gdelt-article-simple-deduplication-filter-greenwich).
./mvnw clean package jib:build \
-Djib.to.auth.username=yourdockerhubaccount \
-Djib.to.auth.password=yourdockerhubpassword \
-Dimage=registry.hub.docker.com/yourdockerhubaccount/gdelt-article-simple-deduplication-filter-greenwich
We also want to use the metadata jar (target/gdelt-article-simple-deduplication-filter-1.0.0-SNAPSHOT-metadata.jar). SCDF can pull jar files not only from Maven Central but from any HTTP server, so we uploaded it to our website to make it available to the SCDF server (you can find the URL in the next section).
Register the app
Open your Spring Cloud Data Flow UI, select “Apps” and then “+ Add Application”.
Select “Register one or more applications”.
Register the app (our prebuilt Docker image plus the metadata jar file) using:
- Name: gdelt-simple-dedup-filter
- Type: Processor
- URI: docker:syscrest/gdelt-article-simple-deduplication-filter-greenwich:latest
- Metadata URI: https://www.syscrest.com/gdelt-on-scdf-artifacts/gdelt-article-simple-deduplication-filter-greenwich-metadata.jar
Afterwards, browse the application list and click on “gdelt-simple-dedup-filter” to verify that all configuration options have been picked up from the metadata jar file.
Creating a stream
Let’s enhance our stream from the last blog post (Implementing a custom reactive source application) and plug in our filtering processor:
gdelt-article-feed-source --query='climate change' | splitter --expression="#jsonPath(payload,'$.*')" | gdelt-simple-dedup-filter --time-to-live=1440 > :filtered-gdelt-articles
Save the stream (we chose “stream-filtered” as its name), but do not check the “deploy” box yet. When deploying the stream, make sure on the deployment configuration page to create just a single instance per application (there will be a separate blog post about scaling and proper routing later on).
Afterwards we can use kubectl to peek into the deduplication-filter pod:
kubectl -n scdf-220 get pods
stream-filtered-gdelt-article-feed-source-v1-5f894b5df8-pxc7x 1/1 Running 0 18m
stream-filtered-gdelt-simple-dedup-filter-v1-77bd6c69f4-j69z8 1/1 Running 0 18m
stream-filtered-splitter-v1-c76787fc6-kxkmz 1/1 Running 0 18m
kubectl -n scdf-220 logs -f stream-filtered-gdelt-simple-dedup-filter-v1-77bd6c69f4-j69z8
The first pull lets the articles pass:
...
.... : seen for first time, passing thru = GDELTArticle [url=http://www.kabc.com/news/planetary-emergency-after-30-years-leaders-are-still-fighting-about-basic-truths-of-climate-science/, title= Planetary emergency : After 30 years , leaders are still fighting about basic truths of climate science, language=English, sourcecountry=United States, domain=kabc.com, seendate=20181216T191500Z]
...
Subsequent pulls still contain the article, so it is dropped:
...
.... :already seen 2 times, filtering out article = GDELTArticle [url=http://www.kabc.com/news/planetary-emergency-after-30-years-leaders-are-still-fighting-about-basic-truths-of-climate-science/, title= Planetary emergency : After 30 years , leaders are still fighting about basic truths of climate science, language=English, sourcecountry=United States, domain=kabc.com, seendate=20181216T191500Z]
...
The blog post series continues by leveraging Kafka Streams sessionization to implement an alternative filter solution.