GDELT on SCDF 2.2.0: Implementing an advanced processor to drop duplicate data with Kafka Streams

by Thomas Memenga on 2019-10-10

In the 4th part of our blog post series “processing GDELT data with SCDF on kubernetes” we will reimplement the deduplication filter from the last post as a Kafka Streams application, including custom SerDes.

Source Code

You can find the source code on GitHub:

git clone https://github.com/syscrest/gdelt-on-spring-cloud-data-flow
cd gdelt-on-spring-cloud-data-flow
git checkout scdf-2-2-0
cd gdelt-article-kafka-streams-deduplication-filter

maven project setup

We reuse the setup to build the necessary jar files and the docker image based on Maven, the Spring Maven plugins and Google's Jib. See the predecessor blog post (Implementing a custom reactive source application) for details about the project structure.

In contrast to the first two projects (article source and simple deduplication filter), we do not use the latest Spring Cloud release Greenwich but the milestone release Hoxton.M3. Kafka Streams support is already available in Greenwich, but we want to use features that are only available in the current version of Kafka Streams.

You can find a good introduction on how Kafka Streams was integrated into the Spring Cloud Stream programming model here. If you are already familiar with Kafka Streams, you will see in this example that you interact directly with Kafka Streams types (KStream and KTable).

Implementation

The previous (simple) implementation kept an internal cache of all urls, counted the occurrences of each GDELTArticle and emitted an article only on its first occurrence. Kafka Streams is also able to group a stream by a custom key and count occurrences, but you cannot control when the aggregation takes place (it might be batched) and when intermediate results are emitted.

But we can leverage a windowing operation called session windows to suppress duplicate articles.

Our article source periodically pulls the latest 250 articles from gdelt.org, causing duplicates of articles until an article is no longer part of the response (because only newer articles are returned). So it is safe to assume that if we have not seen an article for x seconds (with x greater than the poll interval of the source), it will not show up again.
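For example, with a source poll interval of 120 seconds and an inactivity gap of 250 seconds (the values used later in this post), an article first returned at t=0 will typically show up again at roughly t=120 and t=240 while it is still among the 250 newest articles, and each occurrence extends its session. Once it drops out of the response, no further occurrences arrive, and 250 seconds after the last one the session closes and the article can be emitted exactly once.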

Let’s start with our configuration properties to make the inactivityGap configurable:

@ConfigurationProperties("gdelt")
@Validated
public class GDELTKStreamsDeduplicationProperties {

	/**
	 * Time in seconds before a seen url (without any further occurrences) is
	 * considered final and can be emitted. For the deduplication to work this
	 * value must be greater than the poll interval of the source.
	 */
	private long inactivityGap = 180;

	public long getInactivityGap() {
		return inactivityGap;
	}

	public void setInactivityGap(long inactivityGap) {
		this.inactivityGap = inactivityGap;
	}

}
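When running the processor standalone (outside of SCDF), the inactivity gap could be overridden via Spring's relaxed property binding, for example in application.properties (the value below is just an illustration; inside SCDF the same property is exposed as --inactivity-gap, as shown in the stream definition later on):

gdelt.inactivity-gap=250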

To limit the configuration options exposed to just our own class, add a file named spring-configuration-metadata-whitelist.properties to src/main/resources/META-INF containing:

configuration-properties.classes=com.syscrest.blogposts.scdf.gdeltprocessor.kstreamsdeduplicator.GDELTKStreamsDeduplicationProperties

The actual Kafka Streams application:

@EnableConfigurationProperties(GDELTKStreamsDeduplicationProperties.class)
@EnableBinding(KafkaStreamsProcessor.class)
@SpringBootApplication
public class GDELTKStreamsDeduplicationApplication {

	private static final Logger logger = LoggerFactory.getLogger(GDELTKStreamsDeduplicationApplication.class);

	public static void main(String[] args) {
		SpringApplication.run(GDELTKStreamsDeduplicationApplication.class, args);
	}

	@Autowired
	private GDELTKStreamsDeduplicationProperties configuration;

	@StreamListener("input")
	@SendTo("output")
	public KStream<String, GDELTArticle> process(KStream<Object, GDELTArticle> input) {

		return input
				.groupBy((k, v) -> v.getUrl(), Grouped.with(Serdes.String(), CustomSerdes.GDELTArticleSerde()))
				.windowedBy(
						SessionWindows
								.with(Duration.ofSeconds(configuration.getInactivityGap()))
								.grace(Duration.ZERO))
				.reduce((a1, a2) -> a1)
				.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
				.toStream((windowed, value) -> windowed.key())
				.peek((k, v) -> {
					logger.info("emitting article = " + v);
				});
	}
}

Let’s go through each operation in detail:

As in any other Spring Cloud Stream application, we need to define the channels our processor should read from / write to. But in contrast, you directly consume and return KStreams:

@StreamListener("input")
@SendTo("output")
public KStream<String, GDELTArticle> process(KStream<Object, GDELTArticle> input) {

As our messages have no explicit key, we use the url from the payload as the key. Because we want to process our GDELTArticle POJO, we need to provide a custom Serde (see CustomSerdes on GitHub):

.groupBy(
	(k, v) -> v.getUrl(), 
	Grouped.with(Serdes.String(), CustomSerdes.GDELTArticleSerde())
)
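The actual CustomSerdes class ships with the project on GitHub. As a rough sketch only (not necessarily the exact implementation), a JSON-based Serde for the GDELTArticle POJO could be built on top of Spring Kafka's JsonSerializer/JsonDeserializer, mirroring the naming pattern of org.apache.kafka.common.serialization.Serdes so that it works both as a factory method (as in the groupBy above) and as a class name (as in application.properties below):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

// sketch only -- see CustomSerdes in the GitHub repository for the actual implementation
public final class CustomSerdes {

	private CustomSerdes() {
	}

	// usable as a class name, e.g. CustomSerdes$GDELTArticleSerde in application.properties
	public static final class GDELTArticleSerde extends Serdes.WrapperSerde<GDELTArticle> {
		public GDELTArticleSerde() {
			// assumes GDELTArticle is a plain POJO that Jackson can (de)serialize
			super(new JsonSerializer<>(), new JsonDeserializer<>(GDELTArticle.class));
		}
	}

	// usable as a factory method, e.g. CustomSerdes.GDELTArticleSerde() in the groupBy operation
	public static Serde<GDELTArticle> GDELTArticleSerde() {
		return new GDELTArticleSerde();
	}
}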

Now we apply the session window operator on our by-url-grouped data. It is very important to set an explicit value for grace, otherwise the default of 24 hours (the time to wait for late arrivals) would be used.

.windowedBy(
	SessionWindows
		.with(Duration.ofSeconds(configuration.getInactivityGap()))
		.grace(Duration.ZERO)
)

As all values within a session are identical, it does not matter which element we keep as the final output:

.reduce(
	(a1, a2) -> a1
)

All steps up to now would emit aggregated sessions whenever new data arrives, even if the session is not yet finalized. Using Suppressed.untilWindowCloses, the stream only emits a session once the window has been closed, i.e. the gap time has passed.

.suppress(
    Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
)

A windowing operation does not return the original key as the key, but wraps it in a Windowed, which also contains the start and end time of the session. So the last step converts the result back into a KStream and remaps the key so that it again contains the url (the previously used key).

.toStream((windowed, value) -> windowed.key())
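As a side note: if you also wanted to log the session boundaries (for example while tuning the inactivity gap), you could call toStream() without a mapper first and re-key afterwards. A hypothetical variant, not part of the actual filter:

// hypothetical debugging variant: inspect the session window before dropping it
.toStream()
.peek((windowedKey, article) -> logger.info("session [" + windowedKey.window().start()
		+ " - " + windowedKey.window().end() + "] closed for url " + windowedKey.key()))
.selectKey((windowedKey, article) -> windowedKey.key())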

The last operation is just logging, so we can see every emitted article in the pod's log:

.peek((k, v) -> {
	logger.info("emitting article = " + v);
});

You have already seen the custom Serde being used in the groupBy operation; you also need to define it as the default value Serde in your application.properties:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=com.syscrest.blogposts.scdf.gdeltprocessor.kstreamsdeduplicator.CustomSerdes$GDELTArticleSerde

build and deploy

You can package the application, create the docker image and upload it to Docker Hub with a single command (this requires a Docker Hub account; please replace the placeholders accordingly).

Note: you could skip this step and use our docker image (hub.docker.com/syscrest/gdelt-article-kafka-streams-deduplication-filter-hoxton).

./mvnw clean package jib:build \
-Djib.to.auth.username=yourdockerhubaccount \
-Djib.to.auth.password=yourdockerhubpassword \
-Dimage=registry.hub.docker.com/yourdockerhubaccount/gdelt-article-kafka-streams-deduplication-filter-hoxton

We also want to use the metadata jar (target/gdelt-article-kafka-streams-deduplication-filter-1.0.0-SNAPSHOT-metadata.jar). SCDF can pull jar files not only from Maven Central but also from any http server, so we uploaded it to our website to make it available to the SCDF server (you can find the url in the next section).

Register the app

Browse your Spring Cloud Data Flow UI and select “Apps” and then “+ Add Application”:

Select “Register one or more applications”:

Register the app (our prebuilt image + jar files) using:
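In text form the registration looks roughly like this (the metadata URI is the jar mentioned above; the exact url and image tag depend on where you host them):

type:         processor
name:         gdelt-kstreams-dedup-filter
URI:          docker:syscrest/gdelt-article-kafka-streams-deduplication-filter-hoxton
metadata URI: <url of gdelt-article-kafka-streams-deduplication-filter-1.0.0-SNAPSHOT-metadata.jar>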

Afterwards browse the application list and click on “gdelt-kstreams-dedup-filter” to verify that all configuration options have been picked up from the metadata jar file:

Creating a stream

Let’s enhance our stream from the last blog post (Implementing a basic filter application to drop duplicate data) and use our new deduplication logic. We will configure the source to poll every 120 seconds and our KStreams application to wait for 250 seconds. An article is therefore emitted only once it has not been part of the responses of the last two API calls:

Hint: Make sure to undeploy and delete the previous stream, otherwise it would also process the data and write to the topic filtered-gdelt-articles.

gdelt-article-feed-source --query='climate change' --trigger-delay=120 | splitter --expression="#jsonPath(payload,'$.*')" | gdelt-kstreams-dedup-filter --inactivity-gap=250 > :filtered-gdelt-articles

Save and then deploy the stream. Depending on your query (the volume of new articles) and the current time of day (most articles are published during daytime), it might take up to 45 minutes before the first articles are emitted by the filter:

GDELTKStreamsDeduplicationApplication : emitting article = GDELTArticle [url=https://www.ctvnews.ca/world/lights-out-power-cut-in-california-to-prevent-deadly-fires-1.4632186, title=Lights out : Power cut in California to prevent deadly fires, language=English, sourcecountry=Canada, domain=ctvnews.ca, seendate=20191010T070000Z]