Implementing a processor as a function with spring cloud stream function for spring cloud dataflow

by Thomas Memenga on 2020-04-02

Implementing a processor as a function with spring cloud stream function for spring cloud dataflow

In the 6th part of our blog post series “processing GDELT data with SCDF on kubernetes” we will use spring cloud stream’s built-in support for spring cloud functions to implement a processor as a simple function.

Since Spring Cloud Stream v2.1, another alternative for defining stream handlers and sources is to use build-in support for Spring Cloud Function where processors can be expressed as beans of type Function. Read more about it here in the official documentation of spring cloud stream.

This blogpost will guide you through all necessary steps to implement a spring cloud dataflow (scdf) processor application as a java.util.function.Function, including dockerization and deployment on scdf.

It’s recommended to first got through the first part (implementing a source) as this source is used later on.

There is also a blog posts available on how to implement a sink application as a Consumer function.

Source Code

You can find the source code on github:

git clone https://github.com/syscrest/gdelt-on-spring-cloud-data-flow
cd gdelt-on-spring-cloud-data-flow
cd gdelt-article-feed-functional-processor

project setup

The project uses the same project layout and plugins like the source application project. Please have a look at the first part - implementing a source - for a detailed introduction .

Implementation

We want our source application to properly expose its configuration parameters, so we create a dedicated configuration property. The javadoc comment of the members will be displayed as the description and the initial values will automatically noted as the default values (no need to mention them in the javadoc description):

@Component
@ConfigurationProperties("gdelt")
@Validated
public class GDELTArticleProcessorProperties {


    /**
	 * Whether to save the original html body or transform the markup to plain text
     */
    boolean transformHtmlToPlainText = true;

    public boolean isTransformHtmlToPlainText() {
        return transformHtmlToPlainText;
    }

    public void setTransformHtmlToPlainText(boolean transformHtmlToPlainText) {
        this.transformHtmlToPlainText = transformHtmlToPlainText;
    }

    
}

the actual spring boot implementation contains our `java.util.functions.Function’ implementation that will retrieve a POJO containing the GDEL article entry, fetches the url and returns another POJOs that also includes the content of the article.

@SpringBootApplication
public class GDELTArticleProcessorApplication {

    Logger logger = org.slf4j.LoggerFactory.getLogger(GDELTArticleProcessorApplication.class);

    @Autowired
    private GDELTArticleProcessorProperties configuration;

    public static void main(String[] args) {
        SpringApplication.run(GDELTArticleProcessorApplication.class, args);
    }

    private RestTemplate restTemplate = new RestTemplate();

    @Bean
    public Function<GDELTArticle, GDELTArticleWithContent> downloadContent() {

        return (article) -> {

            GDELTArticleWithContent articleWithContent = new GDELTArticleWithContent(article);

            try {
                logger.info(String.format("fetching url = %s", article.getUrl()));
                ResponseEntity<String> response = restTemplate.getForEntity(article.getUrl(), String.class);

                if (response.getStatusCode().is2xxSuccessful()) {
                    if (configuration.isTransformHtmlToPlainText()) {
                        articleWithContent.setContent(Jsoup.parse(response.getBody()).text());
                    } else {
                        articleWithContent.setContent(response.getBody());
                    }
                    return articleWithContent;
                } else {
                    logger.info(String.format("bad status code (%d, dropping message", response.getStatusCodeValue()));
                    return null;
                }

            } catch (Exception e) {
                logger.error("while fetching url = " + article.getUrl(), e);
                return null; // filter entity
            }
        };

    }

}

As Spring boot applications are aware of a lot common configuration properties, we create a file named META-INF/spring-configuration-metadata-whitelist.properties to explictly limit the displayed configuration options to our class. Read more about whitelisting here.

configuration-properties.classes=com.syscrest.blogposts.scdf.gdeltprocessor.GDELTArticleProcessorProperties

Spring Cloud Stream functions support (documentation) automatically creates input and output channels based on the function name see docs more more details. As our function is named “downloadContent”, it will create a channel named downloadContent-in-0 and downloadContent-out-0. But spring cloud data flow is not aware of these channels, it expects a single output channel named output and a single output channel named input on a processor implementation. So we need to define aliases so it matches the channel names expected by spring cloud data flow:

src/main/resources/application.properties:

spring.cloud.stream.function.bindings.downloadContent-in-0=input
spring.cloud.stream.function.bindings.downloadContent-out-0=output

And we also need to tell spring cloud stream which function to use.

src/main/resources/application.properties:

spring.cloud.function.definition=downloadContent

Build the project

You can package the application, create the docker image and upload it to docker-hub with a single command (It requires a docker hub account, please replace the placeholders accordingly).

Note: you could skip this step and use our docker image (syscrest/gdelt-article-feed-functional-processor-hoxton).

./mvnw clean package jib:build \
-Djib.to.auth.username=yourdockerhubaccount \
-Djib.to.auth.password=youdockerhubpassword \
-Dimage=registry.hub.docker.com/yourdockerhubaccount/gdelt-article-feed-functional-processor-hoxton

The output should look like this:

[INFO] Scanning for projects...
[WARNING]
.......
[INFO] Containerizing application to registry.hub.docker.com/syscrest/gdelt-article-functional-processor-hoxton...
[INFO]
[INFO] Container entrypoint set to [java, -cp, /app/resources:/app/classes:/app/libs/*, com.syscrest.blogposts.scdf.gdeltprocessor.GDELTArticleProcessorApplication]
[INFO]
[INFO] Built and pushed image as registry.hub.docker.com/syscrest/gdelt-article-functional-processor-hoxton
[INFO] Executing tasks:
[INFO] [===========================   ] 90,0% complete
[INFO] > launching layer pushers
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.916 s
[INFO] Finished at: 2020-04-03T22:32:11+02:00
[INFO] ------------------------------------------------------------------------

The docker image has been pushed to hub.docker.com. But we also want to use the metadata-jar (target\gdelt-article-feed-functional-processor-1.0.0-SNAPSHOT-metadata.jar). SCDF can pull jar files not only from maven central but also from any http server, so we uploaded it to our website to make it available for the scdf server (you can find the url in the next section).

Register the app

Browse your Spring Cloud Data Flow UI and select “Apps” and then “+ Add Application”:

scdf-gdelt-source-add-application

Select “Register one or more applications”:

scdf-gdelt-source-add-application-register-one-application

Register the app using:

scdf-gdelt-source-add-application-registration-details

Afterwards browse the application list and click on “gdelt-functional-processor”:

scdf-gdelt-source-add-application-verify

to verify that all configuration options have been picked up from the metadata jar file:

scdf-gdelt-source-add-application-verify-options

Creating a stream

Let’s create a simple stream that uses our custom application as the source and use the very basic log processor to just dump the messages into the logfile of a pod. Select Streams on the left sidebar and then click Create stream(s):

scdf-gdelt-source-add-stream

Just copy and paste our example to query all current articles containing ‘climate’ into the textbox:

gdelt-functional-source --query='climate' --spring.cloud.stream.poller.fixed-delay=60000 | gdelt-functional-processor | log

scdf-gdelt-source-add-stream-stream-1-just-log

Afterwards save the stream (don’t check ‘Deploy Stream(s)’):

scdf-gdelt-source-add-stream-stream-1-just-log-save-stream

Locate your previously saved stream in the stream list:

scdf-gdelt-source-add-stream-stream-1-just-log-in-list

When you click on deploy you can define deployment specific settings like memory and cpu assignments, but this is not neccessary. Just hit deploy.

2020-04-scdf-gdelt-functional-processor-deploy.png

Navigate back to the “streams” section and you will see that your stream is now in the state “deploying”:

2020-04-scdf-functional-processor-deploying.png

After a couple of minutes it should be listed as “deployed”.

Your spring cloud data flow instance will now deploy pods in the same namespace it’s running using the stream name plus source/processor names:

kubectl -n scdf-240 get pods


NAME                                                      READY     STATUS             RESTARTS   AGE
...
gdelt-download-content-gdelt-functional-processor-v1-c675c44jkt   0/1     Running   0          90s
gdelt-download-content-gdelt-functional-source-v1-6bdbd7887plwv   0/1     Running   0          89s
gdelt-download-content-log-v1-65bdb6c57f-brln7                    0/1     Running   0          90s
...

Let’s peek into the “log” pod the see the data that has been emitted by our custom source:

kubectl -n scdf-240 logs -f gdelt-download-content-log-v1-65bdb6c57f-brln7

output (reformatted for better readability) - including the content of the article:

2020-04-05 20:59:45.903  INFO 1 --- [container-0-C-1] log-sink 
{
    "url":"https://finance.yahoo.com/news/oil-prices-under-pressure-saudi-194458417.html",
    "title":"Oil prices under pressure from Saudi - Russia dispute , but tariffs provide support",
    "language":"English",
    "sourcecountry":"United States",
    "domain":"finance.yahoo.com",
    "seendate":"20200405T203000Z",
    "content":"Oil prices under pressure from Saudi-Russia dispute, ....."
}
...
]

You can now continue with the next blog post “Implementing a sink as a function” to add a custom sink to this stream that will persist the articles into a database.