LogoLogo
1.3
1.3
  • Overview
  • Social Registry
    • Features
      • Individuals and Groups
        • 📔User Guides
          • 📔Create an Individual Registrant
          • 📔Create a Group and Add Individual Registrants to the Group
          • 📔Import CSV file to Social Registry
      • Deduplication
        • 📔User Guides
          • 📔Configure ID Deduplication, Deduplicate, and Save Duplicate Groups/Individuals
        • Deduplicator Service
      • Lock and Unlock
      • Enumerator
        • Enumerator ID
      • Dynamic Updates
      • Document Upload
      • ODK Importer
        • 📔User Guide
          • 📔Configure and Import ODK Form
          • 📔Import Specific ODK Forms using ODK Instance ID
      • Registration Portal
        • 📔User Guides
          • 📔Create a New Household
          • 📔Create a New Individual in Registration Portal
          • 📔Create a New Portal User
          • 📔Configure Portal User to Limit Accessing Location
      • Configurations
        • 📔User Guide
          • 📔Configure ID Types
          • 📔Configure Registrant Tags
          • 📔Configure Gender Types
          • 📔Configure Relationships
          • 📔Configure Group Types
          • 📔Configure Group Membership Kind
      • User Management
        • 📔User Guide
          • 📔Create User
          • 📔Assign a Role to a User
      • Geographic
      • Data Share
      • Languages Support
        • 📔User Guides
          • 📔Set Language Preference
      • API
        • Search APIs
        • Individual APIs
        • Group APIs
      • Privacy and Security
      • Interoperability
      • Monitoring and Reporting
      • ID Integration
        • ID Validation and Tokenisation
        • ID Authentication
          • 📔User Guides
            • 📔Configure eSignet Auth Provider for ID Authentication
            • 📔ID Authentication Process
            • 📔eSignet Client Creation
        • Fayda ID Integration
      • Verifiable Credentials Issuance
        • 📔User Guides
          • 📔Configure Inji to download Social Registry VCs
      • Computed fields
      • Record Revision History
      • SPAR Integration for Account Info
      • Self Service Registration Portal
      • Unique Reference ID
      • Logging
        • Audit Logs
        • System Logs
        • Change log
    • Versions
    • Deployment
      • Domain names and Certificates
      • Install Odoo Modules
    • Developer Zone
      • Technology Stack
      • Repositories
      • Developer Install
        • 📘Developer Install of OpenG2P Package on Linux
      • Packaging
        • 📘Docker Packaging Guide
        • 📘Helm Packaging Guide
      • Odoo Modules
        • ODK App User Mapping
  • PBMS
    • Features
      • Program Management
        • Role of a Program Manager
        • Program Life Cycle
        • 📔User Guides
          • 📔Create Program
          • 📔Create Eligibility Manager under Program
          • 📔Create Program Manager for a Program
          • 📔Create Deduplication Manager under Program
          • 📔Create Manager Type
            • 📔Create Payment Manager Types
              • 📔Create Payment Hub EE Payment Manager
              • 📔Create Payment Interoperability Layer Payment Manager
              • 📔Create Default Payment Manager
              • 📔Create Cash Payment Manager
              • 📔Create File Payment Manager
          • 📔Configure Entitlement Manager under Program
          • 📔Configure Payment Manager in Program
          • 📔Configure Default Program Manager
          • 📔Archive, Delete, End, and Re-activate a Program
      • Program Disbursement Cycles
        • 📔User Guides
          • 📔Create Program Fund
          • 📔Create Cycle Manager for a Program
      • Beneficiary Management
        • Beneficiary Registry
          • 📔User Guides
            • 📔Create an Individual Registrant
            • 📔Create a Group and Add Individual Registrants to the Group
            • 📔Assign a Program to a Group
            • 📔Assign a Program to an Individual
        • Beneficiary Registry Configurations
          • 📔User Guides
            • 📔Configure ID Types
            • 📔Configure Registrant Tags
            • 📔Configure Gender Types
            • 📔Configure Relationships
            • 📔Configure Group Types
            • 📔Configure Group Membership Kind
        • Registration
          • 📔User Guides
            • 📔Import CSV File to Registry Module
      • ID Verification
      • Eligibility
        • Proxy Means Test
        • 📔User Guides
          • 📔Create Eligibility Manager Types
            • 📔Configure Default Eligibility Manager
            • 📔Create ID Document Eligibility Manager
            • 📔Create Phone Number Eligibility Manager
          • 📔Configure Proxy Means Test
          • 📔Verify Eligibility of Enrolled Registrants
      • Deduplication
        • 📔User Guides
          • 📔Deduplicate Registrants
          • 📔Create Deduplication Manager Types
            • 📔Configure Default Deduplication Manager
            • 📔Create ID Deduplication Manager
            • 📔Create Phone Number Deduplication
      • Enrolment
        • 📔User Guides
          • 📔Enroll Registrants into Program
          • 📔Auto-Enroll New Registrants into a Program
          • 📔Enroll Eligible Individual Registrants into a Program
      • Entitlement
        • 📔User Guides
          • 📔Multi-Stage Approval
          • 📔Create Entitlement Manager Type
            • 📔Create Default Entitlement Manager
            • 📔Create Voucher Entitlement Manager
            • 📔Configure Cash Entitlement Manager
          • 📔Create Entitlement Voucher Template
          • 📔Configure the Payments File with QR Code
          • 📔Configure Default Cycle Managers
          • 📔Export Beneficiaries Approved Entitlement
      • Disbursement
        • Payment Batches
        • In-Kind Transfer
          • 📔User Guides
            • 📔Create a Product in Inventory
            • 📔Configure In-Kind Entitlement Manager
            • 📔Create and Approve Program Cycle
            • 📔Verify Eligibility of Registrants in a Cycle
        • Digital Cash Transfer
        • e-Voucher
        • 📔User Guides
          • Prepare and Send Payment
      • Self Service Portal
        • 📔User Guides
          • 📔Self Register Online
          • 📔Create Self Service Portal Form
          • 📔Map Self Service Portal Form
      • Document Management
      • Multi-tenancy
      • Notifications
        • 📔User Guides
          • 📔Send Notifications to Individual Registrants
          • 📔Create Notification Manager Types
            • 📔Create SMS Notification Manager
            • 📔Create Email Notification Manager
            • 📔Create Fast2SMS Notification Manager
          • 📔Create Notification Manager under Program
      • Accounting
      • Administration
        • RBAC
          • 📔User Guides
            • 📔Create User and Assign Role
            • 📔Configure Keycloak Authentication Provider for User Log in
        • i18n
      • ODK Importer
        • 📔User Guides
          • 📔Configure and Import ODK Form
          • 📔Import Specific ODK Forms using ODK Instance ID
          • 📔Import Social Registry Data into PBMS
      • MTS Connector
        • 📔User Guides
          • 📔Create MTS Connector
            • 📔Create ODK MTS Connector
            • 📔Create OpenG2P Registry MTS Connector
      • Audit Logs
      • Service Provider Portal
        • 📔User Guides
          • 📔Submit Reimbursement Using the Service Provider Portal
          • 📔Reimburse the Service Provider
      • Interoperability
      • Privacy and Security
      • Periodic Biometric Authentication for Beneficiaries
      • Beneficiary Exit Process
      • Verifiable Credential Issuance
        • 📔User Guides
          • 📔Configure Inji to download Beneficiary VCs
      • Deduplication
      • Manual In-Kind Entitlement
      • Print Disbursement Summary
      • Monitoring & Reporting
        • Logging
    • Versions
    • Developer Zone
      • Odoo Modules
        • G2P Enumerator
        • OpenG2P Registry MTS Connector
        • G2P Documents Store
        • MTS Connector
        • G2P Formio
        • G2P Registry: Rest API Extension Demo
        • G2P Registry: Additional Info REST API
        • G2P Registry: Bank Details Rest API
        • G2P Registry: Additional Info
        • G2P Registry: Membership
        • G2P Registry: Groups
        • G2P Registry: Individual
        • G2P Registry: Base
        • G2P Registry: Rest API
        • G2P Registry: Bank Details
        • OpenG2P Program Payment (Payment Hub EE)
        • OpenG2P Program Payments: In Files
        • G2P Program : Program Registrant Info Rest API
        • OpenG2P Entitlement: Differential
        • OpenG2P Program: Approval
        • OpenG2P Program: Assessment
        • G2P Program: Registrant Info
        • OpenG2P Program Payment: Simple Mpesa Payment Manager
        • OpenG2P Programs: Cycleless
        • OpenG2P Entitlement: In-Kind
        • G2P Notifications: Wiserv SMS Service Provider
        • G2P: Proxy Means Test
        • G2P Programs: REST API
        • G2P Program Payment (Payment Interoperability Layer)
        • OpenG2P Entitlement: Voucher
        • OpenG2P Programs: Reimbursement
        • OpenG2P Program Payment: Cash
        • OpenG2P Program: Documents
        • OpenG2P Program Payment: G2P Connect Payment Manager
        • OpenG2P Programs: Autoenrol
        • G2P ODK Importer
        • G2P Service Provider Beneficiary Management
        • OpenID Connect Authentication
        • G2P Auth: OIDC - Reg ID
        • G2P OpenID VCI: Base
        • G2P OpenID VCI: Programs
        • G2P OpenID VCI: Rest API
      • Developer Install on Linux
      • Repositories
        • openg2p-fastapi-common
          • OpenG2P FastAPI Common
          • OpenG2P FastAPI Auth
          • OpenG2P Common: G2P Connect ID Mapper
        • social-payments-account-registry
        • g2p-bridge
        • openg2p-packaging
        • openg2p-security
        • spar-load-test
        • 4sure
        • G2P SelfServicePortal
      • Technology Stack
      • Testing
        • Test Workflow
        • Automation Framework
    • Deployment
      • i18n
      • Installation of Odoo Modules
      • Domain names and Certificates
      • Helm Charts
  • SPAR
    • Features
      • SPAR Mapper
      • SPAR Self Service
      • Privacy & Security
      • Interoperability
      • Monitoring & Reporting
    • Deployment
      • Domain Names and Certificates
      • Helm Charts
    • 📔User Guides
      • 📔Link FA (Self Service)
      • 📔Link FA (Admin)
    • Development
      • Jira Board
      • Testing
        • Unit Testing
        • Functional Testing
        • Performance Testing
          • Mapper
            • Resolve API
            • Link API
            • Unlink API
            • Update API
      • Developer Install
        • SPAR Mapper API
        • SPAR Self Service API
        • SPAR Self Service UI
      • Repositories
      • API Reference
      • Tech Guides
    • Releases
      • 1.0.0
      • 1.1.0 - WIP
    • Roadmap
  • G2P Bridge
    • Features
      • Privacy & Security
      • Interoperability
      • Monitoring & Reporting
    • Deployment
    • Development
      • Design
        • IN APIs from PBMS
          • create_disbursement_envelope
          • cancel_disbursement_envelope
          • create_disbursements
          • cancel_disbursements
          • get_disbursement_envelope_status
          • get_disbursement_status
        • OUT APIs to Mapper
          • resolve
        • OUT APIs to Bank
          • check_funds_with_bank
          • block_funds_with_bank
          • disburse_funds_from_bank
        • IN APIs from Bank
          • upload_mt940
        • Helper Tables
          • benefit_program_configuration
        • Configuration parameters
        • Interfaces
        • Physical Organization
        • Example Bank
          • example-bank-models
          • example-bank-api
          • example-bank-celery
      • Testing
        • Unit Testing
        • Functional Testing
        • Performance Testing
      • Repositories
    • Tech Guides
    • 📔User Guides
      • 📔Configure G2P Connect Payment Manager
    • Releases
  • Utilities and Tools
    • ODK
      • 📔User Guides
        • 📔Create a Project for a Program
        • 📔Create a Form
        • 📔Upload a Form
        • 📔Upload revised Form
        • 📔Test a Form
        • 📔Publish a Form
        • 📔Provide Form Access to Field Agent
        • 📔Download a Form on ODK Collect
        • 📔Delete a Form
        • 📔Register Offline
    • 4Sure Verifier App
      • Installation Guide for 4Sure Application
      • 📔User Guides
        • 📔Verify Digital Credentials using 4Sure Application
        • 📔Verify and Populate the form in ODK Collect using 4Sure Application
      • 4Sure Test Summary
    • Smartscanner
      • 📔User Guides
    • Registration Tool Kit
  • Monitoring and Reporting
    • Apache Superset
    • Reporting Framework
      • 📔User Guides
        • 📔Connector Creation Guide
        • 📔Dashboards Creation Guide
        • 📔Installation & Troubleshooting
        • Page 1
      • Kafka Connect Transform Reference
    • System Logging
    • System Health
  • Privacy and Security
    • Key Manager
  • Interoperability
  • Integrations
    • eSignet Integration
    • M-Pesa Integration
    • Mojaloop Integration
    • 📔User Guides
  • Deployment
    • Base Infrastructure
      • Wireguard Bastion
        • Install WireGuard Client on Android Device
        • Wireguard Access to Users
        • Install WireGuard Client on Desktop
      • NFS Server
      • Rancher Cluster
      • OpenG2P Cluster
        • Kubernetes
          • Firewall
          • Istio
          • Adding Nodes to Cluster
          • Deleting Nodes from Cluster
        • Prometheus & Grafana
        • Fluentd & OpenSearch
          • DEPRECATED - OpenSearch
      • Load Balancer
        • Nginx
        • AWS
    • Resource Requirements
    • Helm Charts
    • Upgrades
    • Production
    • OpenG2P In a Box
    • Packaging
    • Versioning
    • Additional Guides
      • Automatic Build and Upload of Private Dockers
      • Generate SSL Certificates using Letsencrypt
      • Packaging Odoo based Docker
      • AWS
        • Create ACM Certificate on AWS
        • Create Security Group on AWS
        • Domain mapping on AWS Route53
        • Make Environment Publicly Accessible using AWS LB Configuration
      • Private Access Channel
      • Odoo Post Install Configuration
      • Pulling Docker from Private Repository on Docker Hub
      • Keycloak Client Creation
      • Troubleshooting: "fsnotify watcher" warning
      • Uninstalling Applications from Rancher UI
      • Access a Database from Outside the Cluster
      • Configure External Database to Connect OpenG2P Environment
      • Configure IPSec VPN Gateway to Connect to External Systems using Strongswan
      • Troubleshooting
        • PostgreSQL Database not Starting due to Replication Checkpoint Error
        • No Space Left on the Device Warning
      • Restart Deployment or StatefulSets to Redistribute Pods across Nodes
      • Rerun Jobs in Kubernetes Cluster
      • Finding URLs in the System
      • Transitioning PostgreSQL From Docker on K8s to Standalone PostgreSQL
      • Restore a PVC from an NFS Folder and Attach it to a Pod
      • View System Logs on the OpenSearch Dashboard
    • Persistent Storage
      • Resizing Persistent Volume Claim in Kubernetes Cluster
  • Documentation Guides
    • Documentation Guidelines
      • Embed a Miro diagram
      • Set an Image for a Start View
    • OpenG2P Module Doc Template
  • Use Cases
    • Technology for Inclusion
      • Registration in Low Connectivity Areas
      • Registration using Self Service Portal
    • Digital Cash Transfer Program
    • Create Social Registry
    • Case Studies
      • Immediate Assistance on Demand
      • Service Provider Reimbursement
      • Customise ODK Form - Add Ethiopian Calendar
  • Releases
    • 1.1.0
      • Release Notes
  • License
    • OpenG2P Support Policy
  • Community
    • Contributing
    • Code of Conduct
  • Blogs
    • OpenG2P and SDG Goals
    • OpenG2P - A Building Block for DPI
Powered by GitBook
LogoLogo

Copyright © OpenG2P. This work is licensed under Creative Common Attribution (CC-BY-4.0) International license unless otherwise noted.

On this page
  • Transformations
  • DynamicNewField
  • DynamicNewFieldInsertBack
  • ApplyJq
  • StringToJson
  • TimestampConverterAdv
  • TimestampSelector
  • TriggerDeduplication
  • Source code
  1. Monitoring and Reporting
  2. Reporting Framework

Kafka Connect Transform Reference

PreviousPage 1NextSystem Logging

Last updated 6 months ago

This document is the configuration reference guide for Kafka SMTs developed by OpenG2P, that can be used on .

Following is a list of some of the other transformations available on the OpenSearch Connectors, apart from the ones developed by OpenG2P:

  • .

  • .

Transformations

DynamicNewField

Class name:

  • org.openg2p.reporting.kafka.connect.DynamicNewField$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.DynamicNewField$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation can be used to query external data sources to retrieve new fields and add them to the current record, based on the values of some existing fields of this record.

  • Currently, only Elasticsearch-based queries are supported. This means any index on Elasticsearch(or OpenSearch) can be queried and some new fields can be populated based on fields from the current record.

    • Some selected from the current record will be taken. ES will be queried for records where the selected field values match. The top response will be picked. Fields from that response can be added back to the current record.

Configuration:

Field name
Field title
Description
Default Value

query.type

Query Type

This is the type of query made to retrieve new field values.

Supported values:

  • es (Elasticsearch based).

es

input.fields

Input Fields

List of comma-separated fields that will be considered as input fields in the current record.

Nested input fields are supported, like: (where profile is json that contains name and birthdate fields)

output.fields

Output Fields

List of comma-separated fields to be added to this record.

input.default.values

Input Default Values

List of comma-separated values to give in place of the input fields when an input field is empty or null. Length of this has to match that of input.fields.

es.index

ES Index

Elasticsearch(or OpenSearch) index to query for.

es.input.fields

ES Input Fields

List of comma-separated fields, to be queried on the ES index, each of which maps to the fields on input.fields. Length of this has to match that of input.fields.

es.output.fields

ES Output Fields

List of comma-separated fields, to be retrieved from the ES query response document, each of which maps to the fields on output.fields. Length of this has to match that of output.fields.

es.input.query.add.keyword

ES Input Query Add Keyword

Whether or not to add .keyword to the es.input.fields during the term query. Supported values: true / false .

false

es.security.enabled

ES Security Enabled

If this value is given as true, then Security is enabled on ES.

es.url

ES Url

Elasticsearch/OpenSearch base URL.

es.username

ES Username

es.password

ES Password

DynamicNewFieldInsertBack

Class name:

  • org.openg2p.reporting.kafka.connect.DynamicNewFieldInsertBack$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.DynamicNewFieldInsertBack$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation can be used to add additional data to documents of different index.

  • If record matches the configured condition, the given data will be updated into the record with given id.

Configuration:

Field name
Field title
Description
Default Value

query.type

Query Type

This is the type of query made to retrieve new field values.

Supported values:

  • es (Elasticsearch based).

es

id.expr

ID Jq Expression

Jq expression to evaluate the ID of the external document into which the data is supposed to be updated.

condition

Condition

Jq expression that evaluates to a boolean value which decides whether or not to update.

value

Value

Jq expression of the value, that evaluates to a JSON, that is to be updated into the external document.

es.index

ES Index

Elasticsearch(or OpenSearch) index to update into.

es.security.enabled

ES Security Enabled

If this value is given as true, then Security is enabled on ES.

es.url

ES Url

Elasticsearch/OpenSearch base URL.

es.username

ES Username

es.password

ES Password

ApplyJq

Class name:

  • org.openg2p.reporting.kafka.connect.ApplyJq$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.ApplyJq$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation applies the given Jq expression on the current record and replace the current record with the result from Jq.

  • This transformation can be used for operations like extracting, merging, removing, and/or renaming fields.

  • For example:

    • "expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}", : The expr field should contain a valid Jq expression.

Configuration:

Field name
Field title
Description
Default value

expr

Expression

Jq expression to be applied.

behavior.on.error

Behaviour on error

What to do when encountering error applying Jq expression. Possible values:

  • halt : Throws exception upon encountering error.

  • ignore : Ignores any errors encountered.

halt

StringToJson

Class name:

  • org.openg2p.reporting.kafka.connect.StringToJson$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.StringToJson$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation can be used to convert JSON string, present in a field in the record, to JSON. Example:

    {"profile": "{\"name\":\"Temp\"}"} -> {"profile": {"name": "Temp"}}
  • Currently, this transform only works in schemaless mode. (value.converter.schemas.enable=false).

Configuration

Field name
Field title
Description
Default Value

input.field

Input Field

Input Field that contains JSON string.

TimestampConverterAdv

Class name:

  • org.openg2p.reporting.kafka.connect.TimestampConverterAdv$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.TimestampConverterAdv$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation can be used to convert a Timestamp present in a field in the record, to another format. Example:

    {"create_date": 1723667415} -> {"profile": "2024-08-14'T'20:30:50.069'Z'"}
  • Currently, the output can only be in the form of a string.

Configuration

Field name
Field title
Description
Default Value

field

Input Field

Input Field that contains the Timestamp.

input.type

Input Type

Supported values:

  • milli_sec (Input is present as milliseconds since epoch)

  • micro_sec (Input is present as microseconds since epoch. Useful for converting Datetime field of PostgreSQL)

  • days_epoch (Input is present as days since epoch. Useful for converting Date field of PostgreSQL)

milli_sec

output.type

Output Type

Supported values:

  • string (Gives output as string)

string

output.format

Output Format

Format of string output

TimestampSelector

Class name:

  • org.openg2p.reporting.kafka.connect.TimestampSelector$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.TimestampSelector$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation can be used to create a new timestamp field, whose value can be selected from other fields, in the order of whichever is not empty first. Example: (when ts.order is profile.write_date,profile.create_date)

    {"profile": {"create_date": 7415, "write_date": null}} -> {"@timestamp_gen": 7415, "profile": {"create_date": 7415, "write_date": null}}
    {"profile": {"create_date": 2945, "write_date": 3442}} -> {"@timestamp_gen": 3442, "profile": {"create_date": 2945, "write_date": 3442}}

Configuration

Field name
Field title
Description
Default Value

ts.order

Timestamp order

List of comma-separated fields to select output from. The output will be selected based on whichever field in the order is not null first. Nested fields are supported.

output.field

Output Field

Name of the output field into which the selected timestamp is put.

TriggerDeduplication

Class name:

  • org.openg2p.reporting.kafka.connect.TriggerDeduplication$Key - Applies transform only to the Key of Kafka Connect Record.

  • org.openg2p.reporting.kafka.connect.TriggerDeduplication$Value - Applies transform only to the Value of Kafka Connect Record.

Description:

  • This transformation can be used to trigger deduplication when there is a change in any one of the configured fields.

  • This transformation is best used before applying any other transformation.

Configuration

Field name
Field title
Description
Default Value

deduplication.base.url

Base URL of Deduplicator Service

dedupe.config.name

Dedupe Config name

Name of config used for deduplication by deduplicator

default

id.expr

ID Jq Expression

Jq expression that evaluates the ID of the document that is to be deduplicated

before.expr

Before Jq Expression

Jq expression that evaluates the before part of the change. (Used to compare fields with the after part of the change).

after.expr

After Jq Expression

Jq expression that evaluates the after part of the change. (Used to compare fields with the before part of the change).

wait.before.exec.secs

Wait before Exec (in secs)

Time to wait (in secs) before starting deduplication. Useful so that the transformations get applied and the record get indexed into OpenSearch

10

Source code

profile.name,profile.birthdate
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
@ts_generated
.payload.after.id
.payload.before
.payload.after
OpenSearch Sink Connectors
Apache Kafka Connect SMTs
Debezium Kafka Connect Transformations
https://github.com/OpenG2P/openg2p-reporting/tree/develop/opensearch-kafka-connector