Kafka Connect Source Connector

JR provides a Source Connector for Apache Kafka Connect. Like Kafka Connect Datagen, it is used to generate synthetic data.

Configuration

The JR Source Connector supports the following configuration parameters:

| Parameter | Description | Default |
|---|---|---|
| template | Name of an existing JR template. Ignored when embedded_template is set. For a list of available templates see: https://jrnd.io/docs/#listing-existing-templates | net_device |
| embedded_template | Location of a file or URL containing a valid custom JR template. This property takes precedence over template. The file must exist on the Kafka Connect Worker nodes. | |
| topic | Destination topic on Kafka. | |
| frequency | Repeat the creation of random objects every frequency milliseconds. | 5000 |
| duration | Time bound, in milliseconds, for the entire object creation, measured from the first run. At least one run is always scheduled, regardless of the value of duration. If not set, creation runs forever. | -1 |
| objects | Number of objects to create at every run. | 1 |
| key_field_name | Name of the key field, for example ID. Optional: if not set, objects are created without a key. Ignored when key_embedded_template is set. The key value is calculated with the JR function key, see https://jrnd.io/docs/functions/#key | |
| key_value_interval_max | Maximum value for the key, for example 150 (keys range from 0 to key_value_interval_max). Ignored when key_embedded_template is set. | 100 |
| key_embedded_template | Location of a file or URL containing a valid custom JR template for keys. This property takes precedence over key_field_name and key_value_interval_max. The file must exist on the Kafka Connect Worker nodes. | |
| jr_executable_path | Location of the JR executable on workers. If not set, the jr executable is looked up using the $PATH variable. | |
| value.converter | One of org.apache.kafka.connect.storage.StringConverter, io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter. | org.apache.kafka.connect.storage.StringConverter |
| value.converter.schema.registry.url | URL for Schema Registry. Only applies when value.converter is set to io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter. | |
| key.converter | One of org.apache.kafka.connect.storage.StringConverter, io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter. | org.apache.kafka.connect.storage.StringConverter |
| key.converter.schema.registry.url | URL for Schema Registry. Only applies when key.converter is set to io.confluent.connect.avro.AvroConverter, io.confluent.connect.json.JsonSchemaConverter or io.confluent.connect.protobuf.ProtobufConverter. | |
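
The precedence rules among these parameters can be sketched as a small pre-flight check. This is only an illustration: `describe_config` and `SCHEMA_CONVERTERS` are hypothetical names, not part of the connector, and the connector's own validation may behave differently.

```python
# Hypothetical pre-flight check; not part of the JR connector itself.
SCHEMA_CONVERTERS = {
    "io.confluent.connect.avro.AvroConverter",
    "io.confluent.connect.json.JsonSchemaConverter",
    "io.confluent.connect.protobuf.ProtobufConverter",
}


def describe_config(config):
    """Return notes about settings that are ignored or missing."""
    notes = []
    if "embedded_template" in config and "template" in config:
        notes.append("template is ignored: embedded_template takes precedence")
    if "key_embedded_template" in config:
        for name in ("key_field_name", "key_value_interval_max"):
            if name in config:
                notes.append(f"{name} is ignored: key_embedded_template takes precedence")
    for side in ("key", "value"):
        converter = config.get(f"{side}.converter")
        url = config.get(f"{side}.converter.schema.registry.url")
        # Schema-based converters need a Schema Registry URL.
        if converter in SCHEMA_CONVERTERS and not url:
            notes.append(f"{side}.converter.schema.registry.url is not set")
    return notes


notes = describe_config({
    "template": "store",
    "embedded_template": "/tmp/customer-template.json",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
})
for note in notes:
    print(note)
```

Running such a check before submitting a job makes it easier to spot a setting that would be silently skipped.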

The following example defines a JR connector job that uses the net_device template and produces 5 new random messages to the net_device topic every 5 seconds.

{
    "name" : "jr-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "net_device",
        "topic": "net_device",
        "frequency" : 5000,
        "objects": 5,
        "tasks.max": 1
    }
}
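
A job definition like this one is typically submitted to a Kafka Connect worker through its REST API with a POST to /connectors. The sketch below builds that request with the Python standard library. The worker address http://localhost:8083 is an assumption (8083 is the default Connect REST port), and `build_create_request` is a hypothetical helper name.

```python
import json
import urllib.request

# Assumed address of a Kafka Connect worker; adjust to your environment.
CONNECT_URL = "http://localhost:8083"

job = {
    "name": "jr-quickstart",
    "config": {
        "connector.class": "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template": "net_device",
        "topic": "net_device",
        "frequency": 5000,
        "objects": 5,
        "tasks.max": 1,
    },
}


def build_create_request(connect_url, job):
    """Build (but do not send) the POST /connectors request for a job."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=json.dumps(job).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request(CONNECT_URL, job)
print(request.get_method(), request.full_url)

# Uncomment to submit the job to a running worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

The request is built separately from the network call so that the payload can be inspected before anything is sent.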

Consume from net_device topic:

kafka-console-consumer --bootstrap-server localhost:9092 --topic net_device --from-beginning --property print.key=true

null	{"VLAN": "BETA","IPV4_SRC_ADDR": "10.1.98.6","IPV4_DST_ADDR": "10.1.185.254","IN_BYTES": 1756,"FIRST_SWITCHED": 1724287965,"LAST_SWITCHED": 1725353374,"L4_SRC_PORT": 80,"L4_DST_PORT": 443,"TCP_FLAGS": 0,"PROTOCOL": 3,"SRC_TOS": 190,"SRC_AS": 1,"DST_AS": 1,"L7_PROTO": 81,"L7_PROTO_NAME": "TCP","L7_PROTO_CATEGORY": "Transport"}
null	{"VLAN": "BETA","IPV4_SRC_ADDR": "10.1.95.4","IPV4_DST_ADDR": "10.1.239.68","IN_BYTES": 1592,"FIRST_SWITCHED": 1722620372,"LAST_SWITCHED": 1724586369,"L4_SRC_PORT": 443,"L4_DST_PORT": 22,"TCP_FLAGS": 0,"PROTOCOL": 0,"SRC_TOS": 165,"SRC_AS": 3,"DST_AS": 1,"L7_PROTO": 443,"L7_PROTO_NAME": "HTTP","L7_PROTO_CATEGORY": "Transport"}
null	{"VLAN": "DELTA","IPV4_SRC_ADDR": "10.1.126.149","IPV4_DST_ADDR": "10.1.219.156","IN_BYTES": 1767,"FIRST_SWITCHED": 1721931269,"LAST_SWITCHED": 1724976862,"L4_SRC_PORT": 631,"L4_DST_PORT": 80,"TCP_FLAGS": 0,"PROTOCOL": 1,"SRC_TOS": 139,"SRC_AS": 0,"DST_AS": 1,"L7_PROTO": 22,"L7_PROTO_NAME": "TCP","L7_PROTO_CATEGORY": "Application"}

Usage of keys

The connector can be configured to create messages with keys.

In this example a JR connector job for template user will be instantiated and produce 5 new random messages to user topic every 5 seconds, using a message key field named ‘guid’ set with a random integer value between 0 and 150.

{
    "name" : "jr-keys-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "user",
        "topic": "user",
        "frequency" : 5000,
        "objects": 5,
        "key_field_name": "guid",
        "key_value_interval_max": 150,
        "jr_executable_path": "/usr/bin",
        "tasks.max": 1
    }
}

Consume from user topic:

kafka-console-consumer --bootstrap-server localhost:9092 --topic user --from-beginning --property print.key=true

{"guid":131}	{  "guid":"131",  "isActive": false,  "balance": "€328.52",  "picture": "http://placehold.it/32x32",  "age": 21,  "eyeColor": "brown",  "name": "Megan Peterson",  "gender": "F",  "company": "Evil Partners",  "work_email": "megan.peterson@evilpartners.com",  "email": "megan.peterson@gmail.com",  "about": "Lorem ipsum dolor sit amet, consectetur adipiscing elit.Fusce elit magna, lobortis nec semper non, aliquam at nisl. Vestibulum elementum suscipit",  "country": "US",  "address": "Tucson, South Street 54, 05602",  "phone_number": "928 59979355",  "mobile": "7109146",  "latitude": 17.1992,  "longitude": -56.3007}
{"guid":70}	{  "guid":"70",  "isActive": true,  "balance": "€633.72",  "picture": "http://placehold.it/32x32",  "age": 24,  "eyeColor": "blue",  "name": "Doris Sanders",  "gender": "F",  "company": "Angels Investors",  "work_email": "doris.sanders@angelsinvestors.com",  "email": "doris.sanders@email.com",  "about": "Lorem ipsum dolor sit amet, consectetur adipiscing elit.Fusce elit magna, lobortis nec semper non, aliquam at nisl. Vestibulum elementum suscipit",  "country": "US",  "address": "Memphis, Oakwood Avenue 8, 02201",  "phone_number": "502 96273890",  "mobile": "7958446",  "latitude": -45.6278,  "longitude": 124.5713}
{"guid":36}	{  "guid":"36",  "isActive": true,  "balance": "€7783.02",  "picture": "http://placehold.it/32x32",  "age": 40,  "eyeColor": "green",  "name": "Sharon Alvarez",  "gender": "F",  "company": "Initech",  "work_email": "sharon.alvarez@initech.com",  "email": "sharon.alvarez@email.com",  "about": "Lorem ipsum dolor sit amet, laoreet ligula. Curabitur id nisl ut Lorem sit amet justo pulvinar aliquet accumsan sit amet",  "country": "US",  "address": "Columbus, Park Place 05, 32301",  "phone_number": "220 06092006",  "mobile": "1856616",  "latitude": 76.7921,  "longitude": 10.1295}
{"guid":9}	{  "guid":"9",  "isActive": false,  "balance": "€6071.06",  "picture": "http://placehold.it/32x32",  "age": 40,  "eyeColor": "brown",  "name": "Michael Jones",  "gender": "M",  "company": "Initech",  "work_email": "michael.jones@initech.com",  "email": "michael.jones@aol.com",  "about": "Lorem ipsum dolor sit amet, consectetur adipiscing elit.Fusce elit magna, lobortis nec semper non, aliquam at nisl. Vestibulum elementum suscipit",  "country": "US",  "address": "Kansas City, South Street 3, 40601",  "phone_number": "689 17290457",  "mobile": "8620336",  "latitude": -61.7961,  "longitude": -167.5185}
{"guid":43}	{  "guid":"43",  "isActive": false,  "balance": "€8298.22",  "picture": "http://placehold.it/32x32",  "age": 38,  "eyeColor": "blue",  "name": "Denise Parker",  "gender": "F",  "company": "Veement Capital Partners",  "work_email": "denise.parker@veementcapitalpartners.com",  "email": "denise.parker@yahoo.com",  "about": "Lorem ipsum dolor sit amet, consectetur adipiscing elit. In ullamcorper non eros eget porta. Aliquam erat volutpat. Mauris molestie lobortis",  "country": "US",  "address": "Charlotte, Queen Street 6, 95814",  "phone_number": "980 95836260",  "mobile": "8203291",  "latitude": 2.5160,  "longitude": 63.2610}
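
With print.key=true, the console consumer prints the key and the value on one line, separated by a tab. The sketch below splits such a line and checks the key against the configured range. The sample line is an abbreviated copy of the first record above, `split_record` is a hypothetical helper, and treating the upper bound as inclusive is an assumption.

```python
import json

# Abbreviated copy of the first record above; key and value are separated by a tab.
line = '{"guid":131}\t{"guid":"131", "isActive": false, "age": 21}'


def split_record(line):
    """Split a console-consumer line printed with print.key=true into (key, value)."""
    raw_key, raw_value = line.split("\t", 1)
    return json.loads(raw_key), json.loads(raw_value)


key, value = split_record(line)
# key_value_interval_max was 150 in the job above; the inclusive bound is an assumption.
in_range = 0 <= key["guid"] <= 150
print(key, in_range, value["guid"])
```

Note that in this output the key carries guid as a number while the value carries it as a string.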

Usage of duration

The connector can be configured to run for a limited duration.

In this example a JR connector job for template marketing_campaign_finance will be instantiated and produce 5 new random messages to marketing_campaign_finance topic every 10 seconds for a total duration of 30 seconds.

{
    "name" : "jr-duration-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "marketing_campaign_finance",
        "topic": "marketing_campaign_finance",
        "frequency" : 10000,
        "duration" : 30000,
        "objects": 5,
        "tasks.max": 1
    }
}

Consume from marketing_campaign_finance topic:

kafka-console-consumer --bootstrap-server localhost:9092 --topic marketing_campaign_finance --from-beginning

{  "time": 1610894253695,  "candidate_id": "A3272238",  "party_affiliation": "DEM",  "contribution": 1684}
{  "time": 1614389092497,  "candidate_id": "G8822487",  "party_affiliation": "DEM",  "contribution": 3166}
{  "time": 1600022334958,  "candidate_id": "G5165512",  "party_affiliation": "REP",  "contribution": 2933}
{  "time": 1594525458073,  "candidate_id": "X2839265",  "party_affiliation": "DEM",  "contribution": 824}
{  "time": 1606508742842,  "candidate_id": "T5688428",  "party_affiliation": "IND",  "contribution": 966}
{  "time": 1614055215125,  "candidate_id": "E4299542",  "party_affiliation": "DEM",  "contribution": 1240}
{  "time": 1610035678542,  "candidate_id": "H9769974",  "party_affiliation": "IND",  "contribution": 1793}
{  "time": 1609662702352,  "candidate_id": "S2314618",  "party_affiliation": "DEM",  "contribution": 1531}
{  "time": 1601632523200,  "candidate_id": "A8111647",  "party_affiliation": "IND",  "contribution": 2650}
{  "time": 1612493464065,  "candidate_id": "B1157343",  "party_affiliation": "DEM",  "contribution": 628}
{  "time": 1617678398100,  "candidate_id": "S7362235",  "party_affiliation": "REP",  "contribution": 3405}
{  "time": 1608939902703,  "candidate_id": "N9165865",  "party_affiliation": "REP",  "contribution": 1909}
{  "time": 1599100684111,  "candidate_id": "B2399959",  "party_affiliation": "REP",  "contribution": 1472}
{  "time": 1606312277382,  "candidate_id": "J1118736",  "party_affiliation": "IND",  "contribution": 1156}
{  "time": 1589668105856,  "candidate_id": "Q8211968",  "party_affiliation": "REP",  "contribution": 3457}

Processed a total of 15 messages
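
The total of 15 messages is consistent with three runs of 5 objects each. A rough way to estimate the count for other settings is sketched below. It assumes runs start at 0, frequency, 2 × frequency and so on, and stop once duration has elapsed. This is a reading of the example, not the connector's documented scheduling, and `estimated_messages` is a hypothetical helper.

```python
def estimated_messages(frequency_ms, duration_ms, objects):
    """Estimate how many messages a bounded job produces.

    Assumes runs start at 0, frequency, 2 * frequency, ... and stop once
    duration_ms has elapsed; returns None for an unbounded job.
    """
    if duration_ms < 0:
        return None  # -1 (the default) means the job runs forever
    # Ceiling division, with at least one run always scheduled.
    runs = max(1, -(-duration_ms // frequency_ms))
    return runs * objects


print(estimated_messages(10000, 30000, 5))
```

Actual counts may differ slightly, depending on how the connector schedules the last run.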

Schema Registry: Avro

The connector can be configured to produce objects serialized using Avro.

In this example a JR connector job for template store will be instantiated and produce 5 new random messages to store topic every 5 seconds, using the Confluent Schema Registry to register the Avro schema.

{
    "name" : "jr-avro-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "store",
        "topic": "store",
        "frequency" : 5000,
        "objects": 5,
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "tasks.max": 1
    }
}

Consume from store topic:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic store --from-beginning --property schema.registry.url=http://localhost:8081

{"store_id":1,"city":"Minneapolis","state":"AR"}
{"store_id":2,"city":"Baltimore","state":"LA"}
{"store_id":3,"city":"Chicago","state":"IL"}
{"store_id":4,"city":"Chicago","state":"MN"}
{"store_id":5,"city":"Washington","state":"OH"}

Show the Avro schema registered:

curl -v http://localhost:8081/subjects/store-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json


{"type":"record","name":"storeRecord","fields":[{"name":"store_id","type":"int"},{"name":"city","type":"string"},{"name":"state","type":"string"}],"connect.name":"storeRecord"}
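
The schema lookups on this page all follow the same URL pattern: the subject is the topic name followed by -value or -key. A small helper for building such URLs could look like the sketch below. `schema_url` is a hypothetical name, and the subject naming is assumed to match the examples shown here.

```python
def schema_url(registry_url, topic, part="value", version=1):
    """Build the Schema Registry URL for the schema of a topic's key or value."""
    if part not in ("key", "value"):
        raise ValueError("part must be 'key' or 'value'")
    subject = f"{topic}-{part}"  # subject naming as seen in the examples
    return f"{registry_url}/subjects/{subject}/versions/{version}/schema"


print(schema_url("http://localhost:8081", "store"))
print(schema_url("http://localhost:8081", "customer_full", part="key"))
```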

Schema Registry: JSON Schema

The connector can be configured to produce objects serialized using JSON Schema.

In this example a JR connector job for template payment_credit_card will be instantiated and produce 5 new random messages to payment_credit_card topic every 5 seconds, using the Confluent Schema Registry to register the JSON Schema.

{
    "name" : "jr-jsonschema-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "payment_credit_card",
        "topic": "payment_credit_card",
        "frequency" : 5000,
        "objects": 5,
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "tasks.max": 1
    }
}

Consume from payment_credit_card topic:

kafka-json-schema-console-consumer --bootstrap-server localhost:9092 --topic payment_credit_card --from-beginning --property schema.registry.url=http://localhost:8081

{"cvv":"070","card_number":"4086489674117803","expiration_date":"10/24","card_id":1.0}
{"cvv":"505","card_number":"346185299753204","expiration_date":"09/27","card_id":2.0}
{"cvv":"690","card_number":"47606709930001","expiration_date":"12/24","card_id":3.0}
{"cvv":"706","card_number":"4936815806226074","expiration_date":"08/24","card_id":4.0}
{"cvv":"855","card_number":"4782025916077384","expiration_date":"09/22","card_id":5.0}

Show the JSON Schema registered:

curl -v http://localhost:8081/subjects/payment_credit_card-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json


{"type":"object","properties":{"cvv":{"type":"string","connect.index":2},"card_number":{"type":"string","connect.index":1},"expiration_date":{"type":"string","connect.index":3},"card_id":{"type":"number","connect.index":0,"connect.type":"float64"}}}
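
The connect.index attributes in this schema appear to record each field's position in the underlying Connect schema. Assuming that reading, the original field order can be recovered by sorting the properties on that attribute, as in this sketch:

```python
import json

# Schema text copied from the response above.
schema = json.loads(
    '{"type":"object","properties":{'
    '"cvv":{"type":"string","connect.index":2},'
    '"card_number":{"type":"string","connect.index":1},'
    '"expiration_date":{"type":"string","connect.index":3},'
    '"card_id":{"type":"number","connect.index":0,"connect.type":"float64"}}}'
)

properties = schema["properties"]
# Sort field names by their connect.index value.
ordered = sorted(properties, key=lambda name: properties[name]["connect.index"])
print(ordered)
```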

Schema Registry: Protobuf

The connector can be configured to produce objects serialized using Protobuf.

In this example a JR connector job for template shopping_rating will be instantiated and produce 5 new random messages to shopping_rating topic every 5 seconds, using the Confluent Schema Registry to register the Protobuf schema.

{
    "name" : "jr-protobuf-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "template" : "shopping_rating",
        "topic": "shopping_rating",
        "frequency" : 5000,
        "objects": 5,
        "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "tasks.max": 1
    }
}

Consume from shopping_rating topic:

kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --topic shopping_rating --from-beginning --property schema.registry.url=http://localhost:8081

{"ratingId":1,"userId":0,"stars":2,"routeId":2348,"ratingTime":1,"channel":"iOS-test","message":"thank you for the most friendly,helpful experience today at your new lounge"}
{"ratingId":2,"userId":0,"stars":1,"routeId":6729,"ratingTime":13,"channel":"iOS","message":"why is it so difficult to keep the bathrooms clean ?"}
{"ratingId":3,"userId":0,"stars":3,"routeId":1137,"ratingTime":25,"channel":"ios","message":"Surprisingly good,maybe you are getting your mojo back at long last!"}
{"ratingId":4,"userId":0,"stars":2,"routeId":7306,"ratingTime":37,"channel":"android","message":"worst. flight. ever. #neveragain"}
{"ratingId":5,"userId":0,"stars":3,"routeId":2982,"ratingTime":49,"channel":"android","message":"meh"}

Show the Protobuf schema registered:

curl -v http://localhost:8081/subjects/shopping_rating-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json


syntax = "proto3";

message shopping_rating {
  int32 rating_id = 1;
  int32 user_id = 2;
  int32 stars = 3;
  int32 route_id = 4;
  int32 rating_time = 5;
  string channel = 6;
  string message = 7;
}
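
The consumer output above uses camelCase keys such as ratingId, while the registered schema uses snake_case field names such as rating_id. This matches the Protobuf JSON mapping, which renders field names in lowerCamelCase by default. The conversion can be sketched as follows (`to_json_name` is a hypothetical helper, written only to illustrate the naming rule):

```python
def to_json_name(field_name):
    """Convert a snake_case Protobuf field name to its lowerCamelCase JSON name."""
    head, *rest = field_name.split("_")
    # Upper-case the first letter after each underscore, leave the rest unchanged.
    return head + "".join(part[:1].upper() + part[1:] for part in rest)


fields = ["rating_id", "user_id", "stars", "route_id", "rating_time", "channel", "message"]
json_names = [to_json_name(name) for name in fields]
print(json_names)
```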

Custom templates

The connector can be configured to use custom templates for keys and values.

In this example a JR connector job with a custom template for values will be instantiated and produce 5 new random messages to customer topic every 5 seconds, using the Confluent Schema Registry to register the Avro schema.

Template definition is loaded from file /tmp/customer-template.json existing on Kafka Connect Worker nodes.

Definition for customer-template.json:

{
    "customer_id": "{{uuid}}",
    "first_name": "{{name}}",
    "last_name": "{{surname}}",
    "email": "{{email}}",
    "phone_number": "{{phone}}",
    "street_address": "{{city}}, {{street}} {{building 2}}, {{zip}}",
    "state": "{{state}}",
    "zip_code": "{{zip}}",
    "country": "United States",
    "country_code": "US"
}

Connector job:

{
    "name" : "jr-avro-custom-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "embedded_template" : "/tmp/customer-template.json",
        "topic": "customer",
        "frequency" : 5000,
        "objects": 5,
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "tasks.max": 1
    }
}

Show the Avro schema registered for value:

curl -v http://localhost:8081/subjects/customer-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json


{"type":"record","name":"recordvalueRecord","fields":[{"name":"customer_id","type":"string"},{"name":"first_name","type":"string"},{"name":"last_name","type":"string"},{"name":"email","type":"string"},{"name":"phone_number","type":"string"},{"name":"street_address","type":"string"},{"name":"state","type":"string"},{"name":"zip_code","type":"string"},{"name":"country","type":"string"},{"name":"country_code","type":"string"}],"connect.name":"recordvalueRecord"}

Consume from customer topic:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic customer --from-beginning --property schema.registry.url=http://localhost:8081

{"customer_id":"6775933f-89c2-43b0-9eaf-e52e5f23293c","first_name":"Cynthia","last_name":"Foster","email":"cynthia.foster@hotmail.com","phone_number":"623 27678252","street_address":"Louisville, Cedar Lane 99, 21401","state":"Massachusetts","zip_code":"21401","country":"United States","country_code":"US"}
{"customer_id":"a15f891e-a3e7-4720-bf59-28202596c667","first_name":"Zachary","last_name":"Harris","email":"zachary.harris@aol.com","phone_number":"747 95821702","street_address":"Austin, River Road 8, 99801","state":"Illinois","zip_code":"99801","country":"United States","country_code":"US"}
{"customer_id":"8906111f-d6d3-4115-bd1a-3e231e3caaa2","first_name":"Julie","last_name":"Long","email":"julie.long@email.com","phone_number":"718 08720661","street_address":"Raleigh, Peachtree Street 43, 58501","state":"Georgia","zip_code":"58501","country":"United States","country_code":"US"}
{"customer_id":"9864ef53-eadf-4012-9cd0-c79e755169df","first_name":"Bryan","last_name":"Wilson","email":"bryan.wilson@mac.com","phone_number":"984 61669636","street_address":"San Antonio, Juniper Drive 23, 17101","state":"Illinois","zip_code":"17101","country":"United States","country_code":"US"}
{"customer_id":"a57911e5-dc9e-4da4-b280-1c0b0143538e","first_name":"Charles","last_name":"Thompson","email":"charles.thompson@gmail.com","phone_number":"726 39040449","street_address":"Richmond, Hillcrest Road 6, 43215","state":"Indiana","zip_code":"43215","country":"United States","country_code":"US"}

In this second example a JR connector job with a custom template for values will be instantiated and produce 5 new random messages to jobs topic every 5 seconds, using the Confluent Schema Registry to register the Avro schema.

Template definition is loaded from the URL http://web/job-template.json

Definition for job-template.json:

{
  "job_title": "{{randoms "Software Engineer|Data Scientist|DevOps Engineer|Product Manager|UI/UX Designer"}}",
  "job_department": "{{randoms "Engineering|Data Science|Operations|Product|Design"}}",
  "job_location": "{{randoms "New York|San Francisco|Remote|London|Berlin"}}",
  "job_description": "{{randoms "Join our innovative team to build cutting-edge software solutions|Lead projects in developing next-gen data products|Help design and implement cloud infrastructure for our services|Collaborate with cross-functional teams to enhance our product line|Design user-centered interfaces for our web and mobile applications"}}",
  "required_skills": [
    "{{randoms "Python|Java|JavaScript|AWS|Docker"}}",
    "{{randoms "Agile methodologies|SQL|React|Node.js|Machine Learning"}}",
    "{{randoms "Kubernetes|Figma|Project Management|Data Visualization|Microservices Architecture"}}"],
  "salary": "{{randoms "$80,000 - $100,000|$100,000 - $120,000|$120,000 - $150,000"}}",
  "experience_level": "{{randoms "Entry Level|Mid Level|Senior Level"}}"
}

Connector job:

{
    "name" : "jr-avro-job-template-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "embedded_template" : "http://web/job-template.json",
        "topic": "jobs",
        "frequency" : 5000,
        "objects": 5,
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "tasks.max": 1
    }
}

Show the Avro schema registered for value:

curl -v http://localhost:8081/subjects/jobs-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json


{"type":"record","name":"recordvalueRecord","fields":[{"name":"job_title","type":"string"},{"name":"job_department","type":"string"},{"name":"job_location","type":"string"},{"name":"job_description","type":"string"},{"name":"required_skills","type":{"type":"array","items":"string"}},{"name":"salary","type":"string"},{"name":"experience_level","type":"string"}],"connect.name":"recordvalueRecord"}

Consume from jobs topic:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic jobs --from-beginning --property schema.registry.url=http://localhost:8081

{"job_title":"UI/UX Designer","job_department":"Data Science","job_location":"Remote","job_description":"Collaborate with cross-functional teams to enhance our product line","required_skills":["JavaScript","Node.js","Figma"],"salary":"$120,000 - $150,000","experience_level":"Senior Level"}
{"job_title":"DevOps Engineer","job_department":"Design","job_location":"New York","job_description":"Help design and implement cloud infrastructure for our services","required_skills":["JavaScript","Machine Learning","Figma"],"salary":"$120,000 - $150,000","experience_level":"Senior Level"}
{"job_title":"DevOps Engineer","job_department":"Product","job_location":"London","job_description":"Lead projects in developing next-gen data products","required_skills":["JavaScript","Node.js","Microservices Architecture"],"salary":"$100,000 - $120,000","experience_level":"Mid Level"}
{"job_title":"Product Manager","job_department":"Engineering","job_location":"Remote","job_description":"Collaborate with cross-functional teams to enhance our product line","required_skills":["Java","Machine Learning","Project Management"],"salary":"$80,000 - $100,000","experience_level":"Mid Level"}
{"job_title":"UI/UX Designer","job_department":"Product","job_location":"London","job_description":"Design user-centered interfaces for our web and mobile applications","required_skills":["JavaScript","Machine Learning","Data Visualization"],"salary":"$80,000 - $100,000","experience_level":"Entry Level"}

Custom templates for keys

In this example a JR connector job using a custom template for values will be instantiated and produce 5 new random messages to customer_full topic every 5 seconds, using the Confluent Schema Registry to register the Avro schema. Message keys are also created using a custom template, using the Confluent Schema Registry to register the Avro schema.

Template definition is loaded from file /tmp/customer-template.json existing on Kafka Connect Worker nodes.

Key Template definition is loaded from file /tmp/key-customer-template.json existing on Kafka Connect Worker nodes.

Definition for customer-template.json:

{
    "customer_id": "{{uuid}}",
    "first_name": "{{name}}",
    "last_name": "{{surname}}",
    "email": "{{email}}",
    "phone_number": "{{phone}}",
    "street_address": "{{city}}, {{street}} {{building 2}}, {{zip}}",
    "state": "{{state}}",
    "zip_code": "{{zip}}",
    "country": "United States",
    "country_code": "US"
}

Definition for key-customer-template.json:

{
  "customer_id": "{{uuid}}",
  "last_name": "{{surname}}"
}

Connector job:

{
    "name" : "jr-avro-custom-full-quickstart",
    "config": {
        "connector.class" : "io.jrnd.kafka.connect.connector.JRSourceConnector",
        "embedded_template" : "/tmp/customer-template.json",
        "key_embedded_template" : "/tmp/key-customer-template.json",
        "topic": "customer_full",
        "frequency" : 5000,
        "objects": 5,
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "tasks.max": 1
    }
}

Show the Avro schema registered for value:

curl -v http://localhost:8081/subjects/customer_full-value/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json


{"type":"record","name":"recordvalueRecord","fields":[{"name":"customer_id","type":"string"},{"name":"first_name","type":"string"},{"name":"last_name","type":"string"},{"name":"email","type":"string"},{"name":"phone_number","type":"string"},{"name":"street_address","type":"string"},{"name":"state","type":"string"},{"name":"zip_code","type":"string"},{"name":"country","type":"string"},{"name":"country_code","type":"string"}],"connect.name":"recordvalueRecord"}

Show the Avro schema registered for key:

curl -v http://localhost:8081/subjects/customer_full-key/versions/1/schema
< HTTP/1.1 200 OK
< Content-Type: application/vnd.schemaregistry.v1+json


{"type":"record","name":"recordkeyRecord","fields":[{"name":"customer_id","type":"string"},{"name":"last_name","type":"string"}],"connect.name":"recordkeyRecord"}

Consume from customer_full topic:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic customer_full --from-beginning --property schema.registry.url=http://localhost:8081 --property print.key=true

{"customer_id":"e3beafc6-4916-4da4-a624-a8b80f689f51","last_name":"Gonzalez"}	{"customer_id":"e3beafc6-4916-4da4-a624-a8b80f689f51","first_name":"Linda","last_name":"Gonzalez","email":"linda.james@yahoo.com","phone_number":"414 91888379","street_address":"Orlando, Cypress Street 14, 03301","state":"Maryland","zip_code":"03301","country":"United States","country_code":"US"}
{"customer_id":"351ccdd0-c4da-4361-a0fd-2b88c2c28599","last_name":"Gonzalez"}	{"customer_id":"351ccdd0-c4da-4361-a0fd-2b88c2c28599","first_name":"Samantha","last_name":"Gonzalez","email":"samantha.gutierrez@hotmail.com","phone_number":"405 66116008","street_address":"Tampa, River Street 3, 84111","state":"Maine","zip_code":"84111","country":"United States","country_code":"US"}
{"customer_id":"56b99aef-0764-4cc2-9cc6-82d0f9d0d97a","last_name":"Taylor"}	{"customer_id":"56b99aef-0764-4cc2-9cc6-82d0f9d0d97a","first_name":"Frances","last_name":"Taylor","email":"frances.reed@yahoo.com","phone_number":"813 51891158","street_address":"Washington, Franklin Avenue 3, 23219","state":"Arizona","zip_code":"23219","country":"United States","country_code":"US"}
{"customer_id":"ddea0697-1218-40f0-81e8-3d56e324f5c6","last_name":"Baker"}	{"customer_id":"ddea0697-1218-40f0-81e8-3d56e324f5c6","first_name":"Wayne","last_name":"Baker","email":"wayne.brooks@aol.com","phone_number":"571 29789830","street_address":"Richmond, River Street 04, 43215","state":"Iowa","zip_code":"43215","country":"United States","country_code":"US"}
{"customer_id":"0a0ea230-035e-441f-b969-9c6ad5d6f91b","last_name":"Campbell"}	{"customer_id":"0a0ea230-035e-441f-b969-9c6ad5d6f91b","first_name":"Donald","last_name":"Campbell","email":"donald.carter@aol.com","phone_number":"804 33076187","street_address":"Dallas, Orange Street 43, 30303","state":"Wyoming","zip_code":"30303","country":"United States","country_code":"US"}

Additional info

Additional details are listed in the official repository.

JR Source Connector is available on Confluent Hub: https://www.confluent.io/hub/jrndio/jr-source-connector