Basic Pulsar producer and consumer

by Thomas.Memenga on 2019-10-18


In the second part of our blog post series “getting started with pulsar on kubernetes” we create a very simple Pulsar producer and consumer. They exchange JSON messages using Pulsar’s built-in JSON schema support, and the project setup includes dockerizing the applications and a simple helm chart to deploy each of them on Kubernetes.

prerequisites

You’ve followed the installation instructions in the first part of the blog post series, Installing Pulsar on Kubernetes using Helm, so you have a Pulsar cluster running on Kubernetes and your local client setup includes working helm and kubectl installations.

Source Code

You can find the source code on GitHub:

git clone https://github.com/syscrest/pulsar-on-kubernetes
cd pulsar-on-kubernetes

gradle project setup

Each component (producer/consumer) has its own subdirectory and must be built separately.

Besides the mandatory Pulsar client dependency:

compile group: 'org.apache.pulsar', name: 'pulsar-client', version:'2.4.0'

there are also some 3rd-party libraries for logging and command line option parsing:

compile group: 'org.slf4j', name: 'slf4j-api', version:'1.7.25'
compile group: 'org.slf4j', name: 'slf4j-simple', version:'1.7.21'
compile group: 'com.beust', name: 'jcommander', version: '1.78'

and the application is dockerized using Google’s Jib Gradle plugin:

plugins {
    id 'com.google.cloud.tools.jib' version '1.7.0'
}

...

jib {
    to {
        image = 'registry.hub.docker.com/syscrest/blog-post-basic-pulsar-producer'
    }
    container {
        mainClass = 'com.syscrest.demo.basicpulsarproducer.BasicPulsarProducer'
    }
}

Implementation

The application could send raw messages (byte arrays) and do the serialization/deserialization in our own code, but Pulsar has built-in support for automatically transforming POJOs to JSON and vice versa. Therefore both projects (producer and consumer) need a simple POJO:

Code on GitHub: BasicMessage

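public class BasicMessage {

    private String body;

    // no-arg constructor, required to deserialize JSON back into the POJO
    public BasicMessage() {
    }

    // convenience constructor used by the producer
    public BasicMessage(String body) {
        this.body = body;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

    // produces the "BasicMessage [body=...]" format seen in the consumer log
    @Override
    public String toString() {
        return "BasicMessage [body=" + body + "]";
    }
}
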
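The JSON handling can be tried without a broker, because a Pulsar schema object can encode and decode values on its own. The following is a minimal sketch, assuming pulsar-client 2.4.0 on the classpath and that JSONSchema.encode/decode behave here as they do inside a producer and consumer. The class JsonSchemaRoundTrip and its helper methods are hypothetical, and BasicMessage is repeated as a package-private class so the file compiles by itself.

```java
import org.apache.pulsar.client.impl.schema.JSONSchema;

import java.nio.charset.StandardCharsets;

// Same shape as the POJO above, repeated so this file compiles on its own.
class BasicMessage {
    private String body;

    public BasicMessage() {
    }

    public BasicMessage(String body) {
        this.body = body;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }
}

public class JsonSchemaRoundTrip {

    // Hypothetical helper: serializes a message to the JSON bytes a typed producer would send.
    static String encodeToJson(String body) {
        JSONSchema<BasicMessage> schema = JSONSchema.of(BasicMessage.class);
        byte[] bytes = schema.encode(new BasicMessage(body));
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Hypothetical helper: encodes and decodes again, as happens between producer and consumer.
    static String roundTrip(String body) {
        JSONSchema<BasicMessage> schema = JSONSchema.of(BasicMessage.class);
        BasicMessage decoded = schema.decode(schema.encode(new BasicMessage(body)));
        return decoded.getBody();
    }

    public static void main(String[] args) {
        System.out.println(encodeToJson("this is message # 1"));
        System.out.println(roundTrip("this is message # 1"));
    }
}
```

The first line printed is the JSON payload that ends up in each message; the second is the body recovered from it.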
Producer

Let’s have a look at the main class of the producer application. Since we are going to use helm to make certain settings configurable at deployment time, we can pass them to the application either as environment variables or as command line arguments. We went for arguments and used JCommander for option parsing and validation:

Code on GitHub: BasicPulsarProducer

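package com.syscrest.demo.basicpulsarproducer;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BasicPulsarProducer {

    static final Logger logger = LoggerFactory.getLogger(BasicPulsarProducer.class);

    @Parameter(names = { "-t", "--topic" }, description = "topic-name")
    private String topic;

    @Parameter(names = { "--brokerServiceURL" }, description = "broker address")
    private String brokerServiceURL = "pulsar://localhost:6650";

    @Parameter(names = { "--waitMs" }, description = "wait time in milliseconds between messages")
    private long waitMs = 1000L;

    public static void main(String[] argscli) {

        BasicPulsarProducer main = new BasicPulsarProducer();

        // populate the @Parameter fields above from the command line arguments
        JCommander.newBuilder().addObject(main).build().parse(argscli);
        main.run();
    }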

The actual application logic (run()) initializes a new client for the given brokerServiceURL and then uses JSONSchema.of(<our POJO>) to create a typed producer for the given topic. Afterwards the application keeps sending one message every waitMs milliseconds until it is killed.

    private void run() {
        // try-with-resources closes the producer and the client if sending fails
        try (PulsarClient client = PulsarClient.builder()
                    .serviceUrl(this.brokerServiceURL)
                    .statsInterval(1, TimeUnit.SECONDS) // log client stats every second
                    .build();
             Producer<BasicMessage> producer = client
                    .newProducer(JSONSchema.of(BasicMessage.class)) // typed producer, schema inferred from the POJO
                    .topic(this.topic)
                    .create()) {
            long i = 0;
            while (true) {
                i++;
                logger.info("sending message # {}", i);
                producer.newMessage()
                        .key(String.valueOf(i))
                        .value(new BasicMessage("this is message # " + i))
                        .send(); // blocks until the broker has acknowledged the message
                Thread.sleep(this.waitMs);
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }
}

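The command line handling can be checked without a broker as well. This sketch mirrors the producer’s three options in a standalone class and assumes jcommander 1.78 on the classpath. ProducerOptions and parse() are hypothetical names, and marking --topic as required = true is our assumption (the producer above only gives it a description): with it, JCommander rejects a missing topic with a ParameterException instead of the error surfacing later during producer setup.

```java
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;

public class ProducerOptions {

    // Assumption: required = true is not in the original producer code.
    @Parameter(names = { "-t", "--topic" }, description = "topic-name", required = true)
    String topic;

    @Parameter(names = { "--brokerServiceURL" }, description = "broker address")
    String brokerServiceURL = "pulsar://localhost:6650";

    @Parameter(names = { "--waitMs" }, description = "wait time in milliseconds between messages")
    long waitMs = 1000L;

    // Parses the same flags the producer declares; unspecified options keep their defaults.
    static ProducerOptions parse(String... args) {
        ProducerOptions options = new ProducerOptions();
        JCommander.newBuilder().addObject(options).build().parse(args);
        return options;
    }

    public static void main(String[] args) {
        ProducerOptions o = parse("--topic", "my-basic-messages-in-json", "--waitMs", "250");
        System.out.println(o.topic + " " + o.brokerServiceURL + " " + o.waitMs);
        try {
            parse("--waitMs", "250"); // no topic given
        } catch (ParameterException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The same approach applies to the consumer’s --topic and --subscription options.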
Consumer

The consumer also has configuration options. This time we do not need a wait period (it receives messages as soon as they are available), but we need a configurable subscription name.

Code on GitHub: BasicPulsarConsumer

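package com.syscrest.demo.basicpulsarconsumer;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BasicPulsarConsumer {

    static final Logger logger = LoggerFactory.getLogger(BasicPulsarConsumer.class);

    @Parameter(names = "--topic", description = "topic-name")
    private String topic;

    @Parameter(names = "--subscription", description = "subscription name")
    private String subscription;

    @Parameter(names = { "--brokerServiceURL" }, description = "broker address")
    private String brokerServiceURL = "pulsar://localhost:6650";

    public static void main(String[] argscli) {

        BasicPulsarConsumer main = new BasicPulsarConsumer();

        // populate the @Parameter fields above from the command line arguments
        JCommander.newBuilder().addObject(main).build().parse(argscli);
        main.run();
    }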

Similar to the producer, it initializes a new client for the given brokerServiceURL and then uses JSONSchema.of(<our POJO>) to create a typed consumer for the given topic. Afterwards the application keeps receiving messages until it is killed. Note that consumer.receive() (not subscribe()) blocks until a new message is available.

    private void run() {
        // try-with-resources closes the consumer and the client if receiving fails
        try (PulsarClient client = PulsarClient.builder()
                    .serviceUrl(this.brokerServiceURL)
                    .statsInterval(1, TimeUnit.SECONDS)
                    .build();
             Consumer<BasicMessage> consumer = client
                    .newConsumer(JSONSchema.of(BasicMessage.class))
                    .topic(this.topic)
                    .subscriptionName(this.subscription)
                    .subscribe()) {

            while (true) {
                // blocks until the next message is available; messages are not acknowledged here
                Message<BasicMessage> msg = consumer.receive();
                logger.info("recieved message = " + msg.getValue());
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }
}
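The consumer above never acknowledges what it receives, which matches the "Ack sent rate: 0.00 ack/s" in its log; unacknowledged messages stay in the subscription backlog and are delivered again when the consumer reconnects. Below is a minimal sketch of a receive loop that acknowledges each message, assuming pulsar-client 2.4.0 and a reachable broker. AckingConsumer, consume(), the five-second timeout and the subscription name in main are hypothetical. receive(timeout, unit) returns null when nothing arrives in time, which lets the loop end instead of blocking forever.

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.JSONSchema;

import java.util.concurrent.TimeUnit;

// Same shape as the blog's POJO, repeated so this file compiles on its own.
class BasicMessage {
    private String body;

    public BasicMessage() {
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }
}

public class AckingConsumer {

    // Hypothetical timeout: how long each receive() call waits before the loop gives up.
    static final int RECEIVE_TIMEOUT_SECONDS = 5;

    // Receives up to maxMessages and acknowledges each one so the broker can remove it
    // from this subscription's backlog. Returns the number of acknowledged messages.
    static int consume(String serviceUrl, String topic, String subscription, int maxMessages)
            throws PulsarClientException {
        int acknowledged = 0;
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Consumer<BasicMessage> consumer = client.newConsumer(JSONSchema.of(BasicMessage.class))
                     .topic(topic)
                     .subscriptionName(subscription)
                     .subscribe()) {
            while (acknowledged < maxMessages) {
                // null means no message arrived within the timeout
                Message<BasicMessage> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                System.out.println("received: " + msg.getValue().getBody());
                consumer.acknowledge(msg);
                acknowledged++;
            }
        }
        return acknowledged;
    }

    public static void main(String[] args) throws PulsarClientException {
        // Assumed values: local broker URL, the chart's default topic, a hypothetical subscription.
        int n = consume("pulsar://localhost:6650", "my-basic-messages-in-json",
                "acking-demo-subscription", 10);
        System.out.println("acknowledged " + n + " messages");
    }
}
```

Acknowledging after the message has been processed means a crash in between leads to redelivery rather than loss.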

Build the project

You can package each application, create the docker image and upload it to Docker Hub with a single gradlew command. This requires a Docker Hub account; please replace the placeholders accordingly.

Note: you can skip this step and use our prebuilt docker images, which are the defaults in the helm chart, if you just want to run the original code without modifications.

Build the producer docker image
# how to build the producer
cd basic-pulsar-producer
./gradlew jib \
-Djib.to.auth.username=yourdockerhubaccount \
-Djib.to.auth.password=yourdockerhubpassword \
-Dimage=registry.hub.docker.com/yourdockerhubaccount/blog-post-basic-pulsar-producer
cd ..
Build the consumer docker image
cd basic-pulsar-consumer
./gradlew jib \
-Djib.to.auth.username=yourdockerhubaccount \
-Djib.to.auth.password=yourdockerhubpassword \
-Dimage=registry.hub.docker.com/yourdockerhubaccount/blog-post-basic-pulsar-consumer
cd ..
deploy the producer

Use the helm chart located in basic-pulsar-producer/src/main/helm. Its default values pull the prebuilt images from https://hub.docker.com/u/syscrest and use pulsar://pulsar-broker:5560 as the broker URL, which matches the setup created in the first part of this series:

helm install ./basic-pulsar-producer/src/main/helm --name basic-pulsar-producer --namespace pulsar-demo

This uses the helm chart’s default topic name my-basic-messages-in-json; to use a different topic, append --set producer.topic=your-topic-name.

Your output should look like this:

NAME:   basic-pulsar-producer
...
RESOURCES:
==> v1/Deployment
NAME                   READY  UP-TO-DATE  AVAILABLE  AGE
basic-pulsar-producer  0/1    1           0          0s

==> v1/Pod(related)
NAME                                    READY  STATUS             RESTARTS  AGE
basic-pulsar-producer-565584d577-flk65  0/1    ContainerCreating  0         0s

Take a look at the producer pod log (use the pod name from your helm output):

kubectl -n pulsar-demo logs basic-pulsar-producer-565584d577-flk65

output:

...
[main] INFO com.syscrest.demo.basicpulsarproducer.BasicPulsarProducer - sending message # 1
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [my-basic-messages-in-json] [pulsar-8-6] Pending messages: 1 --- Publish throughput: 1.00 msg/s --- 0.00 Mbit/s --- Latency: med: 0.000 ms - 95pct: 0.000 ms - 99pct: 0.000 ms - 99.9pct: 0.000 ms - max: -? ms --- Ack received rate: 0.00 ack/s --- Failed messages: 0
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [my-basic-messages-in-json] [pulsar-8-6] Pending messages: 0 --- Publish throughput: 0.00 msg/s --- 0.00 Mbit/s --- Latency: med: 312.299 ms - 95pct: 312.299 ms - 99pct: 312.299 ms - 99.9pct: 312.299 ms - max: 312.299 ms --- Ack received rate: 0.99 ack/s --- Failed messages: 0
[main] INFO com.syscrest.demo.basicpulsarproducer.BasicPulsarProducer - sending message # 2
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [my-basic-messages-in-json] [pulsar-8-6] Pending messages: 0 --- Publish throughput: 1.00 msg/s --- 0.00 Mbit/s --- Latency: med: 15.195 ms - 95pct: 15.195 ms - 99pct: 15.195 ms - 99.9pct: 15.195 ms - max: 15.195 ms --- Ack received rate: 1.00 ack/s --- Failed messages: 0
[main] INFO com.syscrest.demo.basicpulsarproducer.BasicPulsarProducer - sending message # 3
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [my-basic-messages-in-json] [pulsar-8-6] Pending messages: 0 --- Publish throughput: 1.00 msg/s --- 0.00 Mbit/s --- Latency: med: 12.030 ms - 95pct: 12.030 ms - 99pct: 12.030 ms - 99.9pct: 12.030 ms - max: 12.030 ms --- Ack received rate: 1.00 ack/s --- Failed messages: 0
[main] INFO com.syscrest.demo.basicpulsarproducer.BasicPulsarProducer - sending message # 4
...

It’s adding messages to the topic my-basic-messages-in-json! Let’s continue with the consumer.

deploy the consumer

The consumer also has its own small helm chart, with defaults pointing to the prebuilt docker images and pulsar://pulsar-broker:5560 as the broker.

Install:

helm install ./basic-pulsar-consumer/src/main/helm --name basic-pulsar-consumer --namespace pulsar-demo

This uses the helm chart’s default topic name my-basic-messages-in-json; to use a different topic, append --set consumer.topic=your-topic-name. Your output should look like this:

NAME:   basic-pulsar-consumer
...
RESOURCES:
==> v1/Deployment
NAME                   READY  UP-TO-DATE  AVAILABLE  AGE
basic-pulsar-consumer  0/1    1           0          0s

==> v1/Pod(related)
NAME                                    READY  STATUS             RESTARTS  AGE
basic-pulsar-consumer-746dbcdf87-xttqj  0/1    ContainerCreating  0         0s

Take a look at the consumer pod log (use the pod name from your helm output):

kubectl -n pulsar-demo logs basic-pulsar-consumer-746dbcdf87-xttqj

output:

....
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [my-basic-messages-in-json] [basic-pulsar-consumer-subscription-name] [9ecfa] Prefetched messages: 0 --- Consume throughput received: 1.00 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- Failed acks: 0
[main] INFO com.syscrest.demo.basicpulsarconsumer.BasicPulsarConsumer - recieved message = BasicMessage [body=this is message # 156]
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [my-basic-messages-in-json] [basic-pulsar-consumer-subscription-name] [9ecfa] Prefetched messages: 0 --- Consume throughput received: 1.00 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- Failed acks: 0
[main] INFO com.syscrest.demo.basicpulsarconsumer.BasicPulsarConsumer - recieved message = BasicMessage [body=this is message # 157]
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [my-basic-messages-in-json] [basic-pulsar-consumer-subscription-name] [9ecfa] Prefetched messages: 0 --- Consume throughput received: 1.00 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- Failed acks: 0
[main] INFO com.syscrest.demo.basicpulsarconsumer.BasicPulsarConsumer - recieved message = BasicMessage [body=this is message # 158]
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [my-basic-messages-in-json] [basic-pulsar-consumer-subscription-name] [9ecfa] Prefetched messages: 0 --- Consume throughput received: 1.00 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- Failed acks: 0
[main] INFO com.syscrest.demo.basicpulsarconsumer.BasicPulsarConsumer - recieved message = BasicMessage [body=this is message # 159]
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [my-basic-messages-in-json] [basic-pulsar-consumer-subscription-name] [9ecfa] Prefetched messages: 0 --- Consume throughput received: 1.00 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- Failed acks: 0
[main] INFO com.syscrest.demo.basicpulsarconsumer.BasicPulsarConsumer - recieved message = BasicMessage [body=this is message # 160]
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [my-basic-messages-in-json] [basic-pulsar-consumer-subscription-name] [9ecfa] Prefetched messages: 0 --- Consume throughput received: 1.00 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- Failed acks: 0
...

It’s receiving the messages sent by our producer!

topic schema

Because we created the typed producer with JSONSchema.of(), Pulsar automatically inferred a schema from our POJO and registered it for the topic. You can read more about Pulsar’s internal schema handling in the Pulsar schema documentation.

Use pulsar-admin to display the schema information:

pulsar-admin schemas get persistent://public/default/my-basic-messages-in-json

Output:

{
    "name": "my-basic-messages-in-json",
    "schema": {
        "type": "record",
        "name": "BasicMessage",
        "namespace": "com.syscrest.demo.basicpulsarproducer",
        "fields": [
            {
                "name": "body",
                "type": [
                    "null",
                    "string"
                ]
            }
        ]
    },
    "type": "JSON",
    "properties": {
        "__alwaysAllowNull": "true"
    }
}
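The schema that pulsar-admin shows is derived on the client side from the POJO, so the same information can be printed without a broker. A minimal sketch, assuming pulsar-client 2.4.0 on the classpath; ShowSchema and its helper methods are hypothetical. Because this copy of BasicMessage sits in the default package, its namespace will differ from com.syscrest.demo.basicpulsarproducer in the output above.

```java
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.SchemaInfo;

import java.nio.charset.StandardCharsets;

// Same shape as the blog's POJO, repeated so this file compiles on its own.
class BasicMessage {
    private String body;

    public BasicMessage() {
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }
}

public class ShowSchema {

    // Schema information derived from the POJO, as a JSONSchema.of(BasicMessage.class) producer would use it.
    static SchemaInfo info() {
        return JSONSchema.of(BasicMessage.class).getSchemaInfo();
    }

    // Corresponds to the "type" field of the pulsar-admin output.
    static String type() {
        return info().getType().name();
    }

    // Corresponds to the "schema" field: the record definition as a JSON string.
    static String definition() {
        return new String(info().getSchema(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(type());
        System.out.println(definition());
        System.out.println(info().getProperties());
    }
}
```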

Pulsar has several schema compatibility check strategies for handling evolving data structures. We will come back to this in an upcoming blog post.

undeploy both applications

If you want to remove both applications, use helm delete:

helm delete basic-pulsar-consumer

helm delete basic-pulsar-producer