Other producers

Use JR to stream data to external stores

You can use JR to stream data to many different stores, not only to standard output and Kafka. JR natively supports several different producers; if your preferred store is not among them, you can still easily pipe JR's output to it with jr run template | CLI-tool-to-your-store.

Look at this Postgres recipe, for example: though Postgres is not natively supported, it's pretty easy to use JR to produce data for it.
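A minimal sketch of such a pipeline, assuming a table with a single text column to hold the raw JSON (the table name, column, and psql options here are purely illustrative, not taken from the recipe):

jr run net_device | psql -h localhost -U postgres -d jr -c "COPY net_devices_raw (payload) FROM STDIN;"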

If you think that your preferred store should be supported, why not implement it? Or just open up an issue and we’ll do that for you!

To list all the natively supported producers, run:

jr producer list

To use an output, just set the corresponding value in --output.

Every output also needs a corresponding configuration.

AWS S3 Producer

--output = s3

--s3Config parameter to add an S3 configuration

Configure your AWS credentials properly:

$ export AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>
$ export AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>

Amazon S3 Configuration in config.json

{
  "aws_region": "<aws-region>",
  "bucket": "<s3-bucket-name>"
}
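For example, with the configuration above saved as config.json (the net_device template is just an example):

jr run net_device --output s3 --s3Config ./config.json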

Azure Blob Storage Producer

--output = azblobstorage

--azBlobStorageConfig parameter to add an Azure Blob Storage configuration

Azure Blob Storage Configuration example:

{
    "account_name": "<account name>",
    "primary_account_key": "<primary account key>",
    "container": {
        "name": "<container name>",
        "create": false
    }
}
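A sample invocation (the config file name here is just an example):

jr run net_device --output azblobstorage --azBlobStorageConfig ./azBlobStorageConfig.json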

Azure Cosmos DB Producer

--output = azcosmosdb

--azCosmosDBConfig parameter to add an Azure Cosmos DB configuration

{
   "endpoint": "https://<account>.documents.azure.com:443/",
   "primary_account_key": "<security primary access key>",
   "database": "<database name>",
   "container":"<container name>",
   "partition_key": "<name of partition key field>"
}
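For example (config file name is illustrative):

jr run net_device --output azcosmosdb --azCosmosDBConfig ./azCosmosDBConfig.json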

Cassandra Producer

--output = cassandra

--cassandraConfig parameter to add a Cassandra configuration

{
  "hosts":["host1:port1","host2:port2",...],
  "keyspace": "<keyspacename>",
  "table":"<table_name>",
  "username": "<username>",
  "password": "<password>",
  "timeout": "<timeout>",
  "consistencyLevel": "<consistencyLevel>"
}
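A sample run (again, the config file name is just an example):

jr run net_device --output cassandra --cassandraConfig ./cassandraConfig.json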

Elasticsearch Producer

--output = elastic

--elasticConfig parameter to add an Elasticsearch configuration

{
  "es_uri": "http://<host>:<port>",
  "index": "<index_name>",
  "username": "<username>",
  "password": "<password>"
}
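For example:

jr run net_device --output elastic --elasticConfig ./elasticConfig.json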

Google Cloud Storage Producer

--output = gcs

--gcsConfig parameter to add a GCS configuration

The current implementation uses Google Application Default Credentials to authorize and authenticate the client. More information about Application Default Credentials and how to enable them is available at: https://developers.google.com/identity/protocols/application-default-credentials.
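If you authenticate with a service account key file, the standard way to make it available to Application Default Credentials is the GOOGLE_APPLICATION_CREDENTIALS environment variable (the path below is just a placeholder):

$ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json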

Google GCS Configuration example:

{
  "bucket_name": "<gcs-bucket-name>"
}
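For example (config file name is illustrative):

jr run net_device --output gcs --gcsConfig ./gcsConfig.json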

HTTP Producer

--output = http

--httpConfig parameter to add an HTTP configuration (default "./httpConfig.json")

Usage:

jr run net_device --output http --httpConfig ./httpconfig.json

where httpconfig.json is something like:

{
    "endpoint": {
        "url": "https://jr.io",
        "method": "POST",
        "timeout": "10s"
    },
    "session":{
        "use_cookie_jar": false
    },
    "error_handling":{
        "expect_status_code": 200,
        "ignore_status_code": false,
    },
    "headers":{
        "header01":"value01",
        "header02":"value02",
    },
    "tls":{
        "insecure_skip_verify": false,
        "cert_file": "/path/to/cert_file",
        "key_file": "/path/to/key_file",
        "root_ca_file": "/path/to/root_ca_file"
    },
    "authentication":{
        "type": "basic",
        "basic":{
            "username": "user",
            "password": "password",
        }
    }
}

MongoDB Producer

--output = mongo

--mongoConfig parameter to add a MongoDB/Atlas configuration

MongoDB Configuration:

{
  "mongo_uri": "mongodb://<host>:<port>",
  "database": "<database>",
  "collection": "<collection>",
  "username": "<username>",
  "password": "<password>"
}

MongoDB Atlas Configuration:

{
  "mongo_uri": "mongodb+srv://<username>:<password>@<cluster-address>/<database-name>?retryWrites=true&w=majority",
  "database": "<database>",
  "collection": "<collection>"
}
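In both cases the invocation is the same, for example:

jr run net_device --output mongo --mongoConfig ./mongoConfig.json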

Redis Producer

--output = redis

--redisConfig parameter to add a Redis configuration

{
  "host": "<redis_host>",
  "port": "<redis_host_port>",
  "username": "<username>",
  "password": "<password>"
}
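For example:

jr run net_device --output redis --redisConfig ./redisConfig.json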

AWS DynamoDB Producer

--output = awsdynamodb

--awsDynamoDBConfig parameter to add an AWS DynamoDB configuration

AWS DynamoDB Configuration:

TODO

Lua Script

An interesting and flexible JR output is Lua scripting. With this producer, you can basically do whatever you want, directly scripting JR with Lua.

This producer uses the GopherLua plugin and includes by default all the GopherLua libs.

To test it:

jr run net_device -o luascript --luascriptConfig ./lua/luaConfig.json

where the luaConfig.json contains

{
      "script_file":"~/examples/http.lua"
}

The http.lua script in this example just POSTs the value to a URL:

-- v holds the generated value passed to the script by JR
local http = require("http")
local client = http.client()
local request = http.request("POST", "http://localhost:8085", v)
local result, err = client:do_request(request)
if err then
    error(err)
end
if not (result.code == 200) then
    error("code")
end

WASM

Similar to Lua, you can also script JR with a WASM producer.

To test it:

jr run net_device -o wasm --wasmConfig ./wasm/wasmConfig.json

where the wasmConfig.json contains

{
      "ModulePath":"~/examples/wasm_producer_test_function.wasm"
}

The wasm_producer_test_function.wasm module is obtained by compiling the wasm_producer_test_function.go example with TinyGo. This simple example just echoes the value (upper-cased) to standard output, but you have maximum flexibility to do whatever you want.

//go:build tinygo.wasm

// tinygo build -o pkg/producers/wasm/wasm_producer_test_function.wasm -target=wasi pkg/producers/wasm/wasm_producer_test_function.go
package main

import (
	"bytes"
	"encoding/json"
	"io"
	"os"
)

const NoError = 0

//export produce
func _produce(size uint32) uint64 {
	b := make([]byte, size)

	_, err := io.ReadAtLeast(os.Stdin, b, int(size))
	if err != nil {
		return e(err)
	}

	in := make(map[string][]byte)

	err = json.Unmarshal(b, &in)
	if err != nil {
		return e(err)
	}

	out := bytes.ToUpper(in["v"])

	_, err = os.Stdout.Write(out)
	if err != nil {
		return e(err)
	}

	return NoError
}

func e(err error) uint64 {
	if err == nil {
		return NoError
	}

	_, _ = os.Stderr.WriteString(err.Error())
	return uint64(len(err.Error()))
}

func main() {}

Implementing other Producers

To add another store producer, you have to implement this interface:

type Producer interface {
	Produce(k []byte, v []byte, o any)
	io.Closer
}
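As a rough, hypothetical sketch (not part of JR; names and behavior are made up for illustration), a trivial producer that appends every key/value pair to a local file could look like this:

// A hypothetical file producer, e.g. under pkg/producers/file.
package file

import (
	"fmt"
	"os"
)

// Producer appends every key/value pair to a local file.
type Producer struct {
	f *os.File
}

// New opens (or creates) the target file in append mode.
func New(path string) (*Producer, error) {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return nil, err
	}
	return &Producer{f: f}, nil
}

// Produce writes key and value as one tab-separated line; o is ignored here.
func (p *Producer) Produce(k []byte, v []byte, o any) {
	if _, err := fmt.Fprintf(p.f, "%s\t%s\n", k, v); err != nil {
		fmt.Fprintln(os.Stderr, "file producer:", err)
	}
}

// Close satisfies io.Closer by closing the underlying file.
func (p *Producer) Close() error {
	return p.f.Close()
}

A real producer would also need to be wired to a new --output value and load its settings from a JSON configuration, like the producers described above.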