Kafka Connect Transform Reference

This document is the configuration reference for the Kafka Connect Single Message Transforms (SMTs) developed by OpenG2P for use with OpenSearch Sink Connectors.

Following is a list of some of the other transformations available on the OpenSearch Connectors, apart from the ones developed by OpenG2P:

Transformations

DynamicNewField

Class name:

  • org.openg2p.reporting.kafka.connect.DynamicNewField$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.DynamicNewField$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation can be used to query external data sources to retrieve new fields and add them to the current record, based on the values of some existing fields of this record.

  • Currently, only Elasticsearch-based queries are supported. Any index on Elasticsearch (or OpenSearch) can be queried, and new fields can be populated based on fields from the current record.

    • Selected fields are taken from the current record, and ES is queried for documents whose corresponding field values match. The top result is picked, and fields from that result can be added back to the current record.

Configuration:

| Field name | Field title | Description | Default Value |
|---|---|---|---|
| query.type | Query Type | This is the type of query made to retrieve new field values. Supported values: es (Elasticsearch based). | es |
| input.fields | Input Fields | List of comma-separated fields that will be considered as input fields in the current record. Nested input fields are supported, like: profile.name,profile.birthdate (where profile is JSON that contains name and birthdate fields). | |
| output.fields | Output Fields | List of comma-separated fields to be added to this record. | |
| input.default.values | Input Default Values | List of comma-separated values to give in place of the input fields when an input field is empty or null. Length of this has to match that of input.fields. | |
| es.index | ES Index | Elasticsearch (or OpenSearch) index to query for. | |
| es.input.fields | ES Input Fields | List of comma-separated fields, to be queried on the ES index, each of which maps to the fields on input.fields. Length of this has to match that of input.fields. | |
| es.output.fields | ES Output Fields | List of comma-separated fields, to be retrieved from the ES query response document, each of which maps to the fields on output.fields. Length of this has to match that of output.fields. | |
| es.input.query.add.keyword | ES Input Query Add Keyword | Whether or not to add .keyword to the es.input.fields during the term query. Supported values: true / false. | false |
| es.security.enabled | ES Security Enabled | If this value is given as true, then Security is enabled on ES. | |
| es.url | ES Url | Elasticsearch/OpenSearch base URL. | |
| es.username | ES Username | | |
| es.password | ES Password | | |
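The lookup described above can be illustrated with a small Python sketch. It is not the connector's actual implementation (the transform itself is a Java class); it only shows how the documented settings might fit together. The exact query shape (a bool query with one term clause per input field and size 1) is an assumption, as are the helper names `get_nested`, `build_query` and `add_output_fields`.

```python
from typing import Any


def get_nested(record: dict, path: str) -> Any:
    """Follow a dotted path such as 'profile.name'; return None if any part is missing."""
    current: Any = record
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def build_query(record: dict, config: dict) -> dict:
    """Build one term clause per input field (the query shape is an assumption)."""
    input_fields = config["input.fields"].split(",")
    es_fields = config["es.input.fields"].split(",")
    defaults = config["input.default.values"].split(",")
    if not len(input_fields) == len(es_fields) == len(defaults):
        raise ValueError("input.fields, es.input.fields and input.default.values must have equal length")
    add_keyword = config.get("es.input.query.add.keyword", "false") == "true"
    suffix = ".keyword" if add_keyword else ""
    clauses = []
    for field, es_field, default in zip(input_fields, es_fields, defaults):
        value = get_nested(record, field)
        if value is None or value == "":
            value = default  # fall back when the input field is empty or null
        clauses.append({"term": {es_field + suffix: value}})
    # size=1 keeps only the top hit, matching the "top response" in the description.
    return {"size": 1, "query": {"bool": {"must": clauses}}}


def add_output_fields(record: dict, top_hit_source: dict, config: dict) -> dict:
    """Copy es.output.fields from the top hit into output.fields of a copy of the record."""
    output_fields = config["output.fields"].split(",")
    es_output_fields = config["es.output.fields"].split(",")
    enriched = dict(record)
    for out_field, es_field in zip(output_fields, es_output_fields):
        enriched[out_field] = get_nested(top_hit_source, es_field)
    return enriched
```

With `input.fields` set to `profile.name,profile.birthdate`, `es.input.fields` set to `name,birthdate` and `es.input.query.add.keyword` set to `true`, the sketch queries the hypothetical ES fields `name.keyword` and `birthdate.keyword`, substituting the matching entry of `input.default.values` for any input that is empty or null.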

StringToJson

Class name:

  • org.openg2p.reporting.kafka.connect.StringToJson$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.StringToJson$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation can be used to convert a JSON string, present in a field of the record, into a JSON object. Example:

    {"profile": "{\"name\":\"Temp\"}"} -> {"profile": {"name": "Temp"}}
  • Currently, this transform only works in schemaless mode (value.converter.schemas.enable=false).

Configuration:

| Field name | Field title | Description | Default Value |
|---|---|---|---|
| input.field | Input Field | Input Field that contains JSON string. | |
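As an illustration only, the conversion can be sketched in Python as follows. This is not the Java transform itself; the function name `string_to_json` is hypothetical, and passing non-string values through unchanged is an assumption.

```python
import json


def string_to_json(record: dict, config: dict) -> dict:
    """Parse the JSON string held in config['input.field'] into a nested object."""
    field = config["input.field"]
    value = record.get(field)
    if not isinstance(value, str):
        return record  # assumption: values that are not strings are left untouched
    converted = dict(record)
    converted[field] = json.loads(value)
    return converted


record = {"profile": "{\"name\":\"Temp\"}"}
print(string_to_json(record, {"input.field": "profile"}))
# prints {'profile': {'name': 'Temp'}}
```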

TimestampConverterAdv

Class name:

  • org.openg2p.reporting.kafka.connect.TimestampConverterAdv$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.TimestampConverterAdv$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation can be used to convert a timestamp, present in a field of the record, to another format. Example (with input.type milli_sec and the default output.format):

    {"create_date": 1723667450069} -> {"create_date": "2024-08-14T20:30:50.069Z"}
  • Currently, the output can only be in the form of a string.

Configuration:

| Field name | Field title | Description | Default Value |
|---|---|---|---|
| field | Input Field | Input Field that contains the Timestamp. | |
| input.type | Input Type | Supported values: milli_sec (Input is present as milliseconds since epoch); micro_sec (Input is present as microseconds since epoch. Useful for converting Datetime field of PostgreSQL); days_epoch (Input is present as days since epoch. Useful for converting Date field of PostgreSQL). | milli_sec |
| output.type | Output Type | Supported values: string (Gives output as string). | string |
| output.format | Output Format | Format of string output. | yyyy-MM-dd'T'HH:mm:ss.SSS'Z' |
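The three input types differ only in the unit counted from the epoch. A minimal Python sketch of that arithmetic is shown below; it is not the connector's code. It assumes the timestamp is interpreted as UTC and always renders the default output.format rather than parsing an arbitrary pattern. The name `convert_timestamp` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# One entry per documented input.type value.
UNIT = {
    "milli_sec": lambda v: timedelta(milliseconds=v),
    "micro_sec": lambda v: timedelta(microseconds=v),
    "days_epoch": lambda v: timedelta(days=v),
}


def convert_timestamp(value: int, input_type: str = "milli_sec") -> str:
    """Render an epoch-based value in the layout of the default output.format (UTC assumed)."""
    moment = EPOCH + UNIT[input_type](value)
    millis = moment.microsecond // 1000
    return moment.strftime("%Y-%m-%dT%H:%M:%S") + f".{millis:03d}Z"


print(convert_timestamp(1723667450069))                 # 2024-08-14T20:30:50.069Z
print(convert_timestamp(1723667450069000, "micro_sec"))  # 2024-08-14T20:30:50.069Z
print(convert_timestamp(19949, "days_epoch"))            # 2024-08-14T00:00:00.000Z
```

Choosing the wrong input.type shifts the result by orders of magnitude: a microsecond value read as milliseconds lands tens of thousands of years in the future, so it is worth checking the unit the source database emits.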

TimestampSelector

Class name:

  • org.openg2p.reporting.kafka.connect.TimestampSelector$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.TimestampSelector$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation can be used to create a new timestamp field whose value is taken from the first field, in a configured order, that is not empty. Example: (when ts.order is profile.write_date,profile.create_date and output.field is @timestamp_gen)

    {"profile": {"create_date": 7415, "write_date": null}} -> {"@timestamp_gen": 7415, "profile": {"create_date": 7415, "write_date": null}}
    {"profile": {"create_date": 2945, "write_date": 3442}} -> {"@timestamp_gen": 3442, "profile": {"create_date": 2945, "write_date": 3442}}

Configuration:

| Field name | Field title | Description | Default Value |
|---|---|---|---|
| ts.order | Timestamp order | List of comma-separated fields to select output from. The output will be selected based on whichever field in the order is not null first. Nested fields are supported. | |
| output.field | Output Field | Name of the output field into which the selected timestamp is put. | @ts_generated |
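The selection rule amounts to "first non-empty value wins". A small Python sketch of it follows, offered as an illustration rather than the transform's real code. Treating an empty string as empty alongside null, and returning the record unchanged when no field matches, are both assumptions; `get_nested` and `select_timestamp` are hypothetical names.

```python
from typing import Any


def get_nested(record: dict, path: str) -> Any:
    """Follow a dotted path such as 'profile.write_date'; return None if any part is missing."""
    current: Any = record
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def select_timestamp(record: dict, config: dict) -> dict:
    """Copy the first non-empty field listed in ts.order into output.field."""
    output_field = config.get("output.field", "@ts_generated")
    for path in config["ts.order"].split(","):
        value = get_nested(record, path.strip())
        if value is not None and value != "":
            return {output_field: value, **record}
    return dict(record)  # assumption: nothing is added when every candidate is empty


config = {"ts.order": "profile.write_date,profile.create_date", "output.field": "@timestamp_gen"}
print(select_timestamp({"profile": {"create_date": 7415, "write_date": None}}, config))
# prints {'@timestamp_gen': 7415, 'profile': {'create_date': 7415, 'write_date': None}}
```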

Source code

https://github.com/OpenG2P/openg2p-reporting/tree/develop/opensearch-kafka-connector

Copyright © 2024 OpenG2P. This work is licensed under Creative Commons Attribution International License (CC-BY-4.0) unless otherwise noted.