Kafka Connect Source Connector

Kafka Connect Source Connector

JR provides a Source Connector for Apache Kafka Connect. This is useful if you are migrating from Kafka Connect Datagen

JR Source Connector can be configured with:

  • template: A valid JR existing template name. For a list of available templates see: https://jrnd.io/docs/#listing-existing-templates
  • topic: target topic
  • frequency: Repeat the creation of a random object every X milliseconds.
  • objects: Number of objects to create at every run. Default is 1.
  • key_field_name: Name for key field, for example ‘ID’. This is an OPTIONAL config, if not set, objects will be created without a key. Value for key will be calculated using JR function key, https://jrnd.io/docs/functions/#key
  • key_value_interval_max: Maximum interval value for key value, for example 150 (0 to key_value_interval_max). Default is 100.
  • jr_executable_path: Location for JR executable on workers. If not set, jr executable will be searched using $PATH variable.
  • value.converter: one between org.apache.kafka.connect.storage.StringConverter, io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter
  • value.converter.schema.registry.url: Only if value.converter is set to io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter. URL for Confluent Schema Registry.

At the moment for keys the supported format is String. For values there is also support for Confluent Schema Registry with Avro, Json and Protobuf schemas.

Following example is for a JR connector job using template net_device and producing 5 new random messages to net_device topic every 5 seconds.

{
    "name" : "jr-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "net_device",
        "topic": "net_device",
        "frequency" : 5000,
        "objects": 5,
        "tasks.max": 1
    }
}
kafka-console-consumer --bootstrap-server localhost:9092 --topic net_device --from-beginning --property print.key=true
null	{"VLAN": "BETA","IPV4_SRC_ADDR": "10.1.98.6","IPV4_DST_ADDR": "10.1.185.254","IN_BYTES": 1756,"FIRST_SWITCHED": 1724287965,"LAST_SWITCHED": 1725353374,"L4_SRC_PORT": 80,"L4_DST_PORT": 443,"TCP_FLAGS": 0,"PROTOCOL": 3,"SRC_TOS": 190,"SRC_AS": 1,"DST_AS": 1,"L7_PROTO": 81,"L7_PROTO_NAME": "TCP","L7_PROTO_CATEGORY": "Transport"}
null	{"VLAN": "BETA","IPV4_SRC_ADDR": "10.1.95.4","IPV4_DST_ADDR": "10.1.239.68","IN_BYTES": 1592,"FIRST_SWITCHED": 1722620372,"LAST_SWITCHED": 1724586369,"L4_SRC_PORT": 443,"L4_DST_PORT": 22,"TCP_FLAGS": 0,"PROTOCOL": 0,"SRC_TOS": 165,"SRC_AS": 3,"DST_AS": 1,"L7_PROTO": 443,"L7_PROTO_NAME": "HTTP","L7_PROTO_CATEGORY": "Transport"}
null	{"VLAN": "DELTA","IPV4_SRC_ADDR": "10.1.126.149","IPV4_DST_ADDR": "10.1.219.156","IN_BYTES": 1767,"FIRST_SWITCHED": 1721931269,"LAST_SWITCHED": 1724976862,"L4_SRC_PORT": 631,"L4_DST_PORT": 80,"TCP_FLAGS": 0,"PROTOCOL": 1,"SRC_TOS": 139,"SRC_AS": 0,"DST_AS": 1,"L7_PROTO": 22,"L7_PROTO_NAME": "TCP","L7_PROTO_CATEGORY": "Application"}

Usage of keys

A JR connector job for template users will be instantiated and produce 5 new random messages to users topic every 5 seconds, using a message key field named USERID set with a random integer value between 0 and 150.

{
    "name" : "jr-keys-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "users",
        "topic": "users",
        "frequency" : 5000,
        "objects": 5,
        "key_field_name": "USERID",
        "key_value_interval_max": 150,
        "jr_executable_path": "/usr/bin",
        "tasks.max": 1
    }
}
kafka-console-consumer --bootstrap-server localhost:9092 --topic users --from-beginning --property print.key=true

{"USERID":40}	{    "registertime": 1490191925954,    "USERID":40,    "regionid": "Region_1",    "gender": "MALE"}
{"USERID":53}	{    "registertime": 1490996658353,    "USERID":53,    "regionid": "Region_8",    "gender": "FEMALE"}
{"USERID":61}	{    "registertime": 1491758270753,    "USERID":61,    "regionid": "Region_8",    "gender": "FEMALE"}
{"USERID":86}	{    "registertime": 1515055706490,    "USERID":86,    "regionid": "Region_6",    "gender": "MALE"}
{"USERID":71}	{    "registertime": 1491441559667,    "USERID":71,    "regionid": "Region_6",    "gender": "OTHER"}

Avro objects

A JR connector job for template store will be instantiated and produce 5 new random messages to store topic every 5 seconds, using the Confluent Schema Registry to register the Avro schema.

{
    "name" : "jr-avro-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "store",
        "topic": "store",
        "frequency" : 5000,
        "objects": 5,
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "tasks.max": 1
    }
}
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic store --from-beginning --property schema.registry.url=http://localhost:8081

{"store_id":1,"city":"Minneapolis","state":"AR"}
{"store_id":2,"city":"Baltimore","state":"LA"}
{"store_id":3,"city":"Chicago","state":"IL"}
{"store_id":4,"city":"Chicago","state":"MN"}
{"store_id":5,"city":"Washington","state":"OH"}

Show the Avro schema registered:

curl -v http://localhost:8081/subjects/store-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json


{"type":"record","name":"storeRecord","fields":[{"name":"store_id","type":"int"},{"name":"city","type":"string"},{"name":"state","type":"string"}],"connect.name":"storeRecord"}

Json schema objects

A JR connector job for template payment_credit_card will be instantiated and produce 5 new random messages to payment_credit_card topic every 5 seconds, using the Confluent Schema Registry to register the Json schema.

{
    "name" : "jr-jsonschema-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "payment_credit_card",
        "topic": "payment_credit_card",
        "frequency" : 5000,
        "objects": 5,
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "tasks.max": 1
    }
}
kafka-json-schema-console-consumer --bootstrap-server localhost:9092 --topic payment_credit_card --from-beginning --property schema.registry.url=http://localhost:8081

{"cvv":"070","card_number":"4086489674117803","expiration_date":"10/24","card_id":1.0}
{"cvv":"505","card_number":"346185299753204","expiration_date":"09/27","card_id":2.0}
{"cvv":"690","card_number":"47606709930001","expiration_date":"12/24","card_id":3.0}
{"cvv":"706","card_number":"4936815806226074","expiration_date":"08/24","card_id":4.0}
{"cvv":"855","card_number":"4782025916077384","expiration_date":"09/22","card_id":5.0}

Show the Json schema registered:

curl -v http://localhost:8081/subjects/payment_credit_card-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json


{"type":"object","properties":{"cvv":{"type":"string","connect.index":2},"card_number":{"type":"string","connect.index":1},"expiration_date":{"type":"string","connect.index":3},"card_id":{"type":"number","connect.index":0,"connect.type":"float64"}}}

Protobuf objects

A JR connector job for template shopping_rating will be instantiated and produce 5 new random messages to shopping_rating topic every 5 seconds, using the Confluent Schema Registry to register the Protobuf schema.

{
    "name" : "jr-protobuf-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "shopping_rating",
        "topic": "shopping_rating",
        "frequency" : 5000,
        "objects": 5,
        "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "tasks.max": 1
    }
}
kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --topic shopping_rating --from-beginning --property schema.registry.url=http://localhost:8081

{"ratingId":1,"userId":0,"stars":2,"routeId":2348,"ratingTime":1,"channel":"iOS-test","message":"thank you for the most friendly,helpful experience today at your new lounge"}
{"ratingId":2,"userId":0,"stars":1,"routeId":6729,"ratingTime":13,"channel":"iOS","message":"why is it so difficult to keep the bathrooms clean ?"}
{"ratingId":3,"userId":0,"stars":3,"routeId":1137,"ratingTime":25,"channel":"ios","message":"Surprisingly good,maybe you are getting your mojo back at long last!"}
{"ratingId":4,"userId":0,"stars":2,"routeId":7306,"ratingTime":37,"channel":"android","message":"worst. flight. ever. #neveragain"}
{"ratingId":5,"userId":0,"stars":3,"routeId":2982,"ratingTime":49,"channel":"android","message":"meh"}

Show the Protobuf schema registered:

curl -v http://localhost:8081/subjects/shopping_rating-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json


syntax = "proto3";

message shopping_rating {
  int32 rating_id = 1;
  int32 user_id = 2;
  int32 stars = 3;
  int32 route_id = 4;
  int32 rating_time = 5;
  string channel = 6;
  string message = 7;
}

Additional details are listed in the official repository.