Kafka

Kafka Producer

JR has been built with Apache Kafka in mind since its inception, so the Kafka support is quite advanced.

Writing data to Apache Kafka

Just use the --output kafka flag (the output defaults to console) and the --topic flag to indicate the topic name:

jr run net_device -n 5 -f 500ms -o kafka -t test

If you don’t specify a key, a null key will be used for each record. With --key you can provide a template for the key, embedded directly in the command.

For example:

jr run -k '{{key "KEY" 20}}' -f 1s -d 10s net_device -o kafka -t test

Another example:

jr run -k '{{randoms "ONE|TWO|THREE"}}' -f 1s -d 10s net_device -o kafka -t test

Autocreate topics

Topic autocreation is disabled in Confluent Cloud. If you are really lazy you can use the -a option, and JR will create the topic for you.

jr run -a -k '{{randoms "ONE|TWO|THREE"}}' -f 1s -d 10s net_device -o kafka -t mynewtopic

Alternatively, you can also create it explicitly from JR:

jr createTopic mynewtopic

If you want to specify the number of partitions and the replication factor, you can use the -p and -r flags:

jr createTopic mynewtopic -p 10 -r 2

kcat (formerly kafkacat)

Using JR to pipe data to kcat

If you don’t want to use the JR embedded Kafka support, you can use the wonderful kcat CLI tool together with JR.

JR supports kcat out of the box. With the --kcat flag, the standard output is formatted as K,V on a single line; --kcat is in fact just a shorthand for --output stdout --outputTemplate '{{.K}},{{.V}}' --oneline

jr run -k '{{randoms "ONE|TWO|THREE"}}' -f 1s -d 5s net_device --kcat | kcat -F kafka/config.properties -K , -P -t test

Confluent Cloud

The first thing to do is to create a Kafka cluster and the corresponding kafka/config.properties file. The easiest way to do that is to use Confluent Cloud.

Here we document three different ways of doing that. Choose the one that fits you best!

Confluent Cloud and downloading the config file

Just create a basic (free!) cluster with the web console in Confluent Cloud and copy-paste the configuration from the HOME > ENVIRONMENTS > YOUR ENVIRONMENT > YOUR CLUSTER > CLIENTS > New Client section.

Confluent Cloud and config file via Confluent CLI

You can use the confluent CLI to create a cluster and the configuration programmatically:

Configure your variables as you see fit, for example:

export CONFLUENT_CLUSTER_NAME=jr-test
export CONFLUENT_CLUSTER_CLOUD_PROVIDER=aws
export CONFLUENT_CLUSTER_REGION=eu-west-1 

Then execute the following commands:


confluent login --save

OUTPUT=$(confluent kafka cluster create "$CONFLUENT_CLUSTER_NAME" --cloud $CONFLUENT_CLUSTER_CLOUD_PROVIDER --region $CONFLUENT_CLUSTER_REGION --output json 2>&1)
(($? != 0)) && { echo "$OUTPUT"; exit 1; }
CONFLUENT_CLUSTER_ID=$(echo "$OUTPUT" | jq -r .id)
confluent kafka cluster use $CONFLUENT_CLUSTER_ID 2>/dev/null
echo "Cluster $CONFLUENT_CLUSTER_NAME created, Id: $CONFLUENT_CLUSTER_ID"

OUTPUT=$(confluent api-key create --resource $CONFLUENT_CLUSTER_ID -o json)
CONFLUENT_CLUSTER_API_KEY=$(echo "$OUTPUT" | jq -r ".api_key")
CONFLUENT_CLUSTER_API_SECRET=$(echo "$OUTPUT" | jq -r ".api_secret")

echo "API KEY:SECRET  -> $CONFLUENT_CLUSTER_API_KEY:$CONFLUENT_CLUSTER_API_SECRET"

confluent kafka topic create test --cluster $CONFLUENT_CLUSTER_ID

confluent kafka client-config create go --cluster $CONFLUENT_CLUSTER_ID --api-key $CONFLUENT_CLUSTER_API_KEY --api-secret $CONFLUENT_CLUSTER_API_SECRET 1> kafka/config.properties 2>&1
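
As a quick sanity check, you can then produce a few records with JR and read them back with kcat, using the generated kafka/config.properties and the test topic created above:

# produce 3 records to the test topic
jr run net_device -n 3 -f 500ms -o kafka -t test

# read them back with kcat
kcat -F kafka/config.properties -C -t test -c 3 -e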

An existing Kafka cluster & manually creating config file

If you have an existing cluster, just fill in the fields in the provided kafka/config.properties.example:

# Kafka configuration
# https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

bootstrap.servers=
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=
sasl.password=
compression.type=gzip
compression.level=9
# statistics.interval.ms=1000
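
Once the file is filled in, a quick way to verify that the bootstrap servers and credentials are correct is to list the cluster metadata with kcat:

kcat -F kafka/config.properties -L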

Confluent Schema Registry support

There is also support for Confluent Schema Registry. At the moment json-schema and avro are supported.

To use Confluent Schema Registry you first need to fill in the provided registry.properties example with the URL and user/password:

schemaRegistryURL=https://blabla.europe-west3.gcp.confluent.cloud
schemaRegistryUser=blablabla-saslkey
schemaRegistryPassword=blablabla-saslpwd

then use the --schema and the --serializer flags

Example usage:

jr run user -o kafka -t topic1 -s --serializer avro-generic

or

jr run net_device -o kafka -t topic2 -s --serializer json-schema

Remember that once you run these commands, topic1 will be associated with an avro-generic schema representing a user object, and topic2 with a json-schema representing a net_device object. You can manage/evolve the schemas directly in Confluent Cloud.
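
For example, to inspect the schema that JR registered for topic1, the Confluent CLI should be able to describe it (assuming the default TopicNameStrategy, i.e. a subject named topic1-value):

confluent schema-registry schema describe --subject topic1-value --version latest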

Support for Client-Side Field Level Encryption on Confluent Cloud (CSFLE)

JR supports end-to-end (E2E) encryption for Confluent Cloud.

In order to use the functionality, first follow the official documentation on Confluent Cloud for prerequisites and configuration: https://docs.confluent.io/cloud/current/security/encrypt/csfle/overview.html

After setting everything up according to the documentation, you can enable CSFLE in JR simply by filling in these properties in registry.properties:

kekName=a name identifying the key encryption key 
kmsType=valid options are "aws-kms" or "azure-kms" or "gcp-kms"
kmsKeyID=id of the kek
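
For example, with AWS KMS the filled-in properties might look like the following (the key name and ARN are purely illustrative placeholders):

kekName=jr-csfle-kek
kmsType=aws-kms
kmsKeyID=arn:aws:kms:eu-west-1:123456789012:key/00000000-0000-0000-0000-000000000000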

In order to use CSFLE, you need an AVRO schema with fields marked as sensitive (PII). The default schemas in JR are located in the pkg/types folder; at the moment the ones containing PII fields are:

  • payment_credit_card.avsc, field card_number

If you need additional PII fields or additional templates with PII, you should update the .avsc files in the pkg/types folder and then recompile JR, as sketched below.
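
As an illustration, CSFLE relies on the sensitive fields being tagged in the schema. A minimal, simplified sketch of how such a field could be marked in an .avsc file (not the full payment_credit_card.avsc shipped with JR) is:

{
  "type": "record",
  "name": "payment_credit_card",
  "fields": [
    { "name": "card_number", "type": "string", "confluent:tags": [ "PII" ] }
  ]
}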

Dynamic AVRO

System-wide templates can natively serialize to AVRO, but user/embedded templates cannot (unless you recompile JR), due to the static nature of the Go libraries used. Here is a recipe to dynamically serialize your user/embedded templates to AVRO without recompiling.