Implementing a source as a function (supplier) with spring cloud stream function for spring cloud dataflow

by thomas.memenga on 2020-03-30

In the 5th part of our blog post series “processing GDELT data with SCDF on kubernetes” we will use spring cloud stream’s built-in support for spring cloud functions to implement a Source as a simple Supplier function.

This is a reimplementation of “GDELT on SCDF 2.2.0: Implementing a reactive source application”, based on the new capabilities provided by Spring Cloud Function that are available in Spring Cloud Hoxton.

Since Spring Cloud Stream v2.1, another alternative for defining stream handlers and sources is to use the built-in support for Spring Cloud Function, where they can be expressed as beans of type ‘java.util.function.Supplier’. Read more about it here in the official documentation of spring cloud stream.

This blog post will guide you through all necessary steps to implement a spring cloud dataflow (scdf) source application as a java.util.function.Supplier, including dockerization and deployment on scdf.

There are also blog posts available for processor and sink applications implemented as functions.

Source Code

You can find the source code on github:

git clone https://github.com/syscrest/gdelt-on-spring-cloud-data-flow
cd gdelt-on-spring-cloud-data-flow
cd gdelt-article-feed-functional-source

maven project setup

The project will be based on Spring Cloud Stream and we will use the Spring Cloud dependency management with the latest Spring Cloud release, Hoxton.SR3 (https://spring.io/blog/2020/03/05/spring-cloud-hoxton-service-release-3-sr3-is-available):

...
<properties>
        <spring-cloud.version>Hoxton.SR3</spring-cloud.version>
</properties>
...
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
...

Even if the implementation itself is not kafka-specific (more on binder abstraction) we include the Spring Cloud Kafka Binder directly in our project to build artifacts deployable on our target setup (Kubernetes + Kafka).

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Unlike the other projects of this series this project does not use maven but gradle. Unfortunately, there is no gradle version of the Spring Cloud Stream & Task Metadata Plugin available to aggregate spring boot metadata into a separate lightweight jar. We will use the fat jar later on.

Use the spring boot maven plugin to package the application itself:

<plugin>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>

Besides the actual fat jar (that will be dockerized later on) we also want to create a so-called “metadata-only” jar using the Spring Cloud Stream & Task Metadata Plugin to aggregate spring boot metadata into a separate lightweight jar. As we are using kubernetes as our deployment target, we need to provide docker-based artifacts for deployment. Spring cloud data flow cannot determine the actual configuration properties of an application directly from the docker image, but you can provide an additional jar alongside the docker image that carries the necessary metadata about configuration options (names, descriptions, default values).

<plugin>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-app-starter-metadata-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>aggregate-metadata</id>
            <phase>compile</phase>
            <goals>
                <goal>aggregate-metadata</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Dockerize the spring boot application using Google’s JIB Maven Plugin:

<plugin>
    <groupId>com.google.cloud.tools</groupId>
    <artifactId>jib-maven-plugin</artifactId>
    <version>1.6.1</version>
    <configuration>
        <useOnlyProjectCache>true</useOnlyProjectCache>
        <to>
            <image>registry.hub.docker.com/syscrest/gdelt-article-feed-source-greenwich</image>
        </to>
    </configuration>
</plugin>

Implementation

We want our source application to properly expose its configuration parameters, so we create a dedicated configuration properties class. The javadoc comment of each member will be displayed as its description, and the initial values will automatically be noted as the default values (no need to mention them in the javadoc description):

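import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

@Component
@ConfigurationProperties("gdelt")
@Validated
public class GDELTArticleSourceProperties {

    /**
     * The query to use to select data.
     *
     * Example: ("climate change" or "global warming")
     */
    private String query = "global";

    public String getQuery() {
        return query;
    }

    public void setQuery(String query) {
        this.query = query;
    }
}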

The actual spring boot application contains our java.util.function.Supplier implementation that will fetch data from the GDELT.org REST API, create a POJO (GDELTArticle) for each dataset and return a list. Because we use @PollableBean(splittable = true), this list will be automatically split into individual messages and each POJO will be serialized as json.

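import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.function.context.PollableBean;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

@SpringBootApplication
public class GDELTArticleSourceApplication {

    private static final Logger logger = LoggerFactory.getLogger(GDELTArticleSourceApplication.class);

    // GDELT expects timestamps as yyyyMMddHHmmss in UTC
    private static final DateTimeFormatter START_DATETIME_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmmss")
            .withZone(DateTimeZone.UTC);

    private final RestTemplate restTemplate = new RestTemplate();

    @Autowired
    private GDELTArticleSourceProperties configuration;

    public static void main(String[] args) {
        SpringApplication.run(GDELTArticleSourceApplication.class, args);
    }

    // splittable = true: the returned list is split into one message per article
    @PollableBean(splittable = true)
    public Supplier<List<GDELTArticle>> pullArticles() {
        return () -> {
            try {
                // Query the articles seen during the last 31 minutes
                UriComponentsBuilder builder = UriComponentsBuilder
                        .fromHttpUrl("https://api.gdeltproject.org/api/v2/doc/doc")
                        .queryParam("query", configuration.getQuery())
                        .queryParam("mode", "artlist")
                        .queryParam("maxrecords", "10")
                        .queryParam("startdatetime",
                                START_DATETIME_FORMATTER
                                        .print(new DateTime()
                                        .withZone(DateTimeZone.UTC)
                                        .minusMinutes(31)))
                        .queryParam("sort", "HybridRel")
                        .queryParam("format", "json");

                GDELTResponse data = restTemplate.getForObject(builder.toUriString(),
                                                               GDELTResponse.class);
                logger.info("data = {}", data);
                if (data != null && data.getArticles() != null && data.getArticles().length > 0) {
                    return Arrays.asList(data.getArticles());
                } else {
                    // null signals that there is nothing to emit for this poll
                    return null;
                }
            } catch (Exception e) {
                logger.error("Failed to fetch articles from GDELT", e);
                throw new RuntimeException("Failed to fetch articles from GDELT", e);
            }
        };
    }
}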
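The startdatetime parameter in the supplier above is a UTC timestamp in yyyyMMddHHmmss form, 31 minutes in the past. As a minimal sketch, assuming you would rather use java.time than Joda-Time, the same value could be computed as follows. The class and method names are hypothetical and not part of the project:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of the original project.
class StartDateTime {

    // Same pattern as START_DATETIME_FORMATTER in the post, pinned to UTC.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneOffset.UTC);

    // Formats "now minus lookbackMinutes" as a startdatetime value.
    static String startDateTime(Instant now, long lookbackMinutes) {
        return FORMATTER.format(now.minus(lookbackMinutes, ChronoUnit.MINUTES));
    }

    public static void main(String[] args) {
        // 31 minutes is the lookback window used by the supplier in this post.
        System.out.println(startDateTime(Instant.now(), 31));
    }
}
```

Passing the current time in as a parameter keeps the helper easy to test with a fixed Instant.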

As Spring boot applications are aware of a lot of common configuration properties, we create a file named META-INF/spring-configuration-metadata-whitelist.properties to explicitly limit the displayed configuration options to our class and the configuration properties available on the default poller. Read more about whitelisting here.

configuration-properties.classes=com.syscrest.blogposts.scdf.gdeltsource.GDELTArticleSourceProperties

configuration-properties.names=\
spring.cloud.stream.poller.fixed-delay,\
spring.cloud.stream.poller.max-messages-per-poll,\
spring.cloud.stream.poller.cron,\
spring.cloud.stream.poller.initial-delay
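The whitelist above is a regular Java properties file, so the trailing backslashes are line continuations and configuration-properties.names ends up as a single comma-separated value. The following sketch only illustrates that parsing with java.util.Properties. It is not the code Spring Cloud Data Flow uses, and WhitelistSketch and its method are hypothetical names:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration of the file format, not SCDF's own implementation.
class WhitelistSketch {

    // Returns the property names listed under configuration-properties.names.
    static List<String> whitelistedNames(String fileContent) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(fileContent));
        String names = props.getProperty("configuration-properties.names", "").trim();
        if (names.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(names.split("\\s*,\\s*"));
    }

    public static void main(String[] args) throws IOException {
        // Shortened copy of the whitelist file shown in this post.
        String content = "configuration-properties.names=\\\n"
                + "spring.cloud.stream.poller.fixed-delay,\\\n"
                + "spring.cloud.stream.poller.cron\n";
        whitelistedNames(content).forEach(System.out::println);
    }
}
```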

Spring Cloud Stream's function support (documentation) automatically creates input and output channels based on the function name (see the docs for more details). As our function is named “pullArticles”, it creates an output channel named pullArticles-out-0. But spring cloud data flow is not aware of this channel; it expects a single output channel named output on a source implementation. We can define an alias for pullArticles-out-0 so it matches the channel name expected by spring cloud dataflow:

src/main/resources/application.properties:

spring.cloud.stream.function.bindings.pullArticles-out-0=output

And we also need to tell spring cloud stream which function to execute:

src/main/resources/application.properties:

spring.cloud.function.definition=pullArticles
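The binding name pullArticles-out-0 used in the alias property above follows Spring Cloud Stream's naming convention: function name, then -in- or -out-, then the index of the input or output. A small sketch that spells this out; BindingNames and its methods are hypothetical helpers for illustration and not part of any Spring API:

```java
// Hypothetical helper that spells out the binding naming convention.
class BindingNames {

    // Output binding: <functionName>-out-<index>
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Input binding: <functionName>-in-<index>
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Builds the alias line as it would appear in application.properties.
    static String aliasProperty(String bindingName, String alias) {
        return "spring.cloud.stream.function.bindings." + bindingName + "=" + alias;
    }

    public static void main(String[] args) {
        // A supplier has no input, so only the first output binding needs an alias.
        System.out.println(aliasProperty(outputBinding("pullArticles", 0), "output"));
    }
}
```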

Build the project

You can package the application, create the docker image and upload it to Docker Hub with a single command. This requires a Docker Hub account; replace the placeholders accordingly.

Note: you could skip this step and use our docker image (syscrest/gdelt-article-feed-functional-source-hoxton).

./mvnw clean package jib:build \
-Djib.to.auth.username=yourdockerhubaccount \
-Djib.to.auth.password=yourdockerhubpassword \
-Dimage=registry.hub.docker.com/yourdockerhubaccount/gdelt-article-feed-functional-source-hoxton

The output should look like this:

[INFO] Scanning for projects...
[INFO]
[INFO] --< com.syscrest.gdelt-on-scdf:gdelt-article-feed-functional-source >---
[INFO] Building gdelt-article-feed-functional-source 1.0.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
....
....
[INFO] --- jib-maven-plugin:1.6.1:build (default-cli) @ gdelt-article-feed-functional-source ---
[INFO]
[INFO] Containerizing application to registry.hub.docker.com/syscrest/gdelt-article-feed-functional-source-hoxton...
[INFO]
[INFO] Container entrypoint set to [java, -cp, /app/resources:/app/classes:/app/libs/*, com.syscrest.blogposts.scdf.gdeltsource.GDELTArticleSourceApplication]
[INFO]
[INFO] Built and pushed image as registry.hub.docker.com/syscrest/gdelt-article-feed-functional-source-hoxton
[INFO] Executing tasks:
[INFO] [===========================   ] 90,0% complete
[INFO] > launching layer pushers
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

The docker image has been pushed to hub.docker.com. But we also want to use the metadata-jar (target\gdelt-article-feed-functional-source-1.0.0-SNAPSHOT-metadata.jar). SCDF can pull jar files not only from maven central but also from any http server, so we uploaded it to our website to make it available for the scdf server (you can find the url in the next section).

Register the app

Browse your Spring Cloud Data Flow UI and select “Apps” and then “+ Add Application”:

scdf-gdelt-source-add-application

Select “Register one or more applications”:

scdf-gdelt-source-add-application-register-one-application

Register the app using:

scdf-gdelt-source-add-application-registration-details

Afterwards browse the application list and click on “gdelt-article-feed-source”:

scdf-gdelt-source-add-application-verify

to verify that all configuration options have been picked up from the metadata jar file:

scdf-gdelt-source-add-application-verify-options

Creating a stream

Let’s create a simple stream that uses our custom application as the source and uses the very basic log sink to just dump the messages into the logfile of a pod. Select Streams on the left sidebar and then click Create stream(s):

scdf-gdelt-source-add-stream

Just copy and paste our example to query all current articles containing climate into the textbox:

gdelt-functional-source --query='climate' --spring.cloud.stream.poller.fixed-delay=60000 | log

scdf-gdelt-source-add-stream-stream-1-just-log

Afterwards save the stream (don’t check ‘Deploy Stream(s)’):

scdf-gdelt-source-add-stream-stream-1-just-log-save-stream

Locate your previously saved stream in the stream list:

scdf-gdelt-source-add-stream-stream-1-just-log-in-list

When you click on deploy you can define deployment-specific settings like memory and cpu assignments (not necessary, the default values are sufficient):

scdf-gdelt-source-add-stream-stream-1-just-log-deployment-options

Navigate back to the “streams” section and you will see that your stream is now in the state “deploying”:

scdf-gdelt-source-add-stream-stream-1-just-log-deployment-options

After a couple of minutes it should be listed as “deployed”.

Your spring cloud data flow instance will now deploy pods in the namespace it is running in, named after the stream name plus the app names:

kubectl -n scdf-240 get pods


NAME                                                      READY     STATUS             RESTARTS   AGE
...
functional-source-log-gdelt-functional-source-v1-54cffd8cfv25jk   1/1     Running   0          2m32s
functional-source-log-log-v1-589b75bcdd-97ztl                     1/1     Running   0          2m32s
...

Let’s peek into the “log” pod to see the data that has been emitted by our custom source:

kubectl -n scdf-240 logs -f functional-source-log-log-v1-589b75bcdd-97ztl

output (reformatted for better readability):

2020-04-03 23:06:03.447  INFO 1 --- [container-0-C-1] log-sink
  { "url": "https://www.marketwatch.com/press-release/medical-collagen-market-growth-by-application-technology-service-forecast-to-2025-2020-04-03",
    "title":"Medical Collagen Market Growth By Application , Technology Service , Forecast to 2025",
    "language":"English",
    "sourcecountry":"United States",
    "domain":"marketwatch.com",
    "seendate":"20200403T210000Z"
  }
...
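The seendate field in the output above is a compact UTC timestamp (20200403T210000Z). A minimal sketch of how a downstream consumer might parse it with java.time. The class name is hypothetical, and the pattern is inferred from the sample value only:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper; the pattern is an assumption based on the sample output.
class SeenDate {

    // 'T' and 'Z' are matched as literals; the value is interpreted as UTC.
    private static final DateTimeFormatter SEEN_DATE =
            DateTimeFormatter.ofPattern("uuuuMMdd'T'HHmmss'Z'");

    static Instant parse(String seenDate) {
        return LocalDateTime.parse(seenDate, SEEN_DATE).toInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(parse("20200403T210000Z"));
    }
}
```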

Because we enabled splitting, the list of GDELTArticles of our supplier function is already split into individual messages and we don’t need to use the splitter processor.
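To illustrate what splitting means here, the following plain-Java sketch polls a supplier once and hands every list element to a sink as its own "message". It only mimics the observable behaviour and is not how Spring Cloud Stream is implemented internally. SplitSketch and pollOnce are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration of "one list in, many messages out".
class SplitSketch {

    // Polls the supplier once and emits each element separately.
    // A null result emits nothing, mirroring the null return in the post's supplier.
    static <T> int pollOnce(Supplier<List<T>> supplier, Consumer<T> sink) {
        List<T> batch = supplier.get();
        if (batch == null) {
            return 0;
        }
        batch.forEach(sink);
        return batch.size();
    }

    public static void main(String[] args) {
        Supplier<List<String>> articles = () -> Arrays.asList("article-1", "article-2", "article-3");
        List<String> messages = new ArrayList<>();
        int emitted = pollOnce(articles, messages::add);
        System.out.println(emitted + " messages: " + messages);
    }
}
```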

The source is now fully functional. You can continue with the next blog post, “Implementing a processor as a function”, to add a custom processor to your stream that will download the content of the article.