LogoLogo
1.3
1.3
  • Overview
  • Social Registry
    • Features
      • Individuals and Groups
        • 📔User Guides
          • 📔Create an Individual Registrant
          • 📔Create a Group and Add Individual Registrants to the Group
          • 📔Import CSV file to Social Registry
      • Deduplication
        • 📔User Guides
          • 📔Configure ID Deduplication, Deduplicate, and Save Duplicate Groups/Individuals
        • Deduplicator Service
      • Lock and Unlock
      • Enumerator
        • Enumerator ID
      • Dynamic Updates
      • Document Upload
      • ODK Importer
        • 📔User Guide
          • 📔Configure and Import ODK Form
          • 📔Import Specific ODK Forms using ODK Instance ID
      • Registration Portal
        • 📔User Guides
          • 📔Create a New Household
          • 📔Create a New Individual in Registration Portal
          • 📔Create a New Portal User
          • 📔Configure Portal User to Limit Accessing Location
      • Configurations
        • 📔User Guide
          • 📔Configure ID Types
          • 📔Configure Registrant Tags
          • 📔Configure Gender Types
          • 📔Configure Relationships
          • 📔Configure Group Types
          • 📔Configure Group Membership Kind
      • User Management
        • 📔User Guide
          • 📔Create User
          • 📔Assign a Role to a User
      • Geographic
      • Data Share
      • Languages Support
        • 📔User Guides
          • 📔Set Language Preference
      • API
        • Search APIs
        • Individual APIs
        • Group APIs
      • Privacy and Security
      • Interoperability
      • Monitoring and Reporting
      • ID Integration
        • ID Validation and Tokenisation
        • ID Authentication
          • 📔User Guides
            • 📔Configure eSignet Auth Provider for ID Authentication
            • 📔ID Authentication Process
            • 📔eSignet Client Creation
        • Fayda ID Integration
      • Verifiable Credentials Issuance
        • 📔User Guides
          • 📔Configure Inji to download Social Registry VCs
      • Computed fields
      • Record Revision History
      • SPAR Integration for Account Info
      • Self Service Registration Portal
      • Unique Reference ID
      • Logging
        • Audit Logs
        • System Logs
        • Change log
    • Versions
    • Deployment
      • Domain names and Certificates
      • Install Odoo Modules
    • Developer Zone
      • Technology Stack
      • Repositories
      • Developer Install
        • 📘Developer Install of OpenG2P Package on Linux
      • Packaging
        • 📘Docker Packaging Guide
        • 📘Helm Packaging Guide
      • Odoo Modules
        • ODK App User Mapping
  • PBMS
    • Features
      • Program Management
        • Role of a Program Manager
        • Program Life Cycle
        • 📔User Guides
          • 📔Create Program
          • 📔Create Eligibility Manager under Program
          • 📔Create Program Manager for a Program
          • 📔Create Deduplication Manager under Program
          • 📔Create Manager Type
            • 📔Create Payment Manager Types
              • 📔Create Payment Hub EE Payment Manager
              • 📔Create Payment Interoperability Layer Payment Manager
              • 📔Create Default Payment Manager
              • 📔Create Cash Payment Manager
              • 📔Create File Payment Manager
          • 📔Configure Entitlement Manager under Program
          • 📔Configure Payment Manager in Program
          • 📔Configure Default Program Manager
          • 📔Archive, Delete, End, and Re-activate a Program
      • Program Disbursement Cycles
        • 📔User Guides
          • 📔Create Program Fund
          • 📔Create Cycle Manager for a Program
      • Beneficiary Management
        • Beneficiary Registry
          • 📔User Guides
            • 📔Create an Individual Registrant
            • 📔Create a Group and Add Individual Registrants to the Group
            • 📔Assign a Program to a Group
            • 📔Assign a Program to an Individual
        • Beneficiary Registry Configurations
          • 📔User Guides
            • 📔Configure ID Types
            • 📔Configure Registrant Tags
            • 📔Configure Gender Types
            • 📔Configure Relationships
            • 📔Configure Group Types
            • 📔Configure Group Membership Kind
        • Registration
          • 📔User Guides
            • 📔Import CSV File to Registry Module
      • ID Verification
      • Eligibility
        • Proxy Means Test
        • 📔User Guides
          • 📔Create Eligibility Manager Types
            • 📔Configure Default Eligibility Manager
            • 📔Create ID Document Eligibility Manager
            • 📔Create Phone Number Eligibility Manager
          • 📔Configure Proxy Means Test
          • 📔Verify Eligibility of Enrolled Registrants
      • Deduplication
        • 📔User Guides
          • 📔Deduplicate Registrants
          • 📔Create Deduplication Manager Types
            • 📔Configure Default Deduplication Manager
            • 📔Create ID Deduplication Manager
            • 📔Create Phone Number Deduplication
      • Enrolment
        • 📔User Guides
          • 📔Enroll Registrants into Program
          • 📔Auto-Enroll New Registrants into a Program
          • 📔Enroll Eligible Individual Registrants into a Program
      • Entitlement
        • 📔User Guides
          • 📔Multi-Stage Approval
          • 📔Create Entitlement Manager Type
            • 📔Create Default Entitlement Manager
            • 📔Create Voucher Entitlement Manager
            • 📔Configure Cash Entitlement Manager
          • 📔Create Entitlement Voucher Template
          • 📔Configure the Payments File with QR Code
          • 📔Configure Default Cycle Managers
          • 📔Export Beneficiaries Approved Entitlement
      • Disbursement
        • Payment Batches
        • In-Kind Transfer
          • 📔User Guides
            • 📔Create a Product in Inventory
            • 📔Configure In-Kind Entitlement Manager
            • 📔Create and Approve Program Cycle
            • 📔Verify Eligibility of Registrants in a Cycle
        • Digital Cash Transfer
        • e-Voucher
        • 📔User Guides
          • Prepare and Send Payment
      • Self Service Portal
        • 📔User Guides
          • 📔Self Register Online
          • 📔Create Self Service Portal Form
          • 📔Map Self Service Portal Form
      • Document Management
      • Multi-tenancy
      • Notifications
        • 📔User Guides
          • 📔Send Notifications to Individual Registrants
          • 📔Create Notification Manager Types
            • 📔Create SMS Notification Manager
            • 📔Create Email Notification Manager
            • 📔Create Fast2SMS Notification Manager
          • 📔Create Notification Manager under Program
      • Accounting
      • Administration
        • RBAC
          • 📔User Guides
            • 📔Create User and Assign Role
            • 📔Configure Keycloak Authentication Provider for User Log in
        • i18n
      • ODK Importer
        • 📔User Guides
          • 📔Configure and Import ODK Form
          • 📔Import Specific ODK Forms using ODK Instance ID
          • 📔Import Social Registry Data into PBMS
      • MTS Connector
        • 📔User Guides
          • 📔Create MTS Connector
            • 📔Create ODK MTS Connector
            • 📔Create OpenG2P Registry MTS Connector
      • Audit Logs
      • Service Provider Portal
        • 📔User Guides
          • 📔Submit Reimbursement Using the Service Provider Portal
          • 📔Reimburse the Service Provider
      • Interoperability
      • Privacy and Security
      • Periodic Biometric Authentication for Beneficiaries
      • Beneficiary Exit Process
      • Verifiable Credential Issuance
        • 📔User Guides
          • 📔Configure Inji to download Beneficiary VCs
      • Deduplication
      • Manual In-Kind Entitlement
      • Print Disbursement Summary
      • Monitoring & Reporting
        • Logging
    • Versions
    • Developer Zone
      • Odoo Modules
        • G2P Enumerator
        • OpenG2P Registry MTS Connector
        • G2P Documents Store
        • MTS Connector
        • G2P Formio
        • G2P Registry: Rest API Extension Demo
        • G2P Registry: Additional Info REST API
        • G2P Registry: Bank Details Rest API
        • G2P Registry: Additional Info
        • G2P Registry: Membership
        • G2P Registry: Groups
        • G2P Registry: Individual
        • G2P Registry: Base
        • G2P Registry: Rest API
        • G2P Registry: Bank Details
        • OpenG2P Program Payment (Payment Hub EE)
        • OpenG2P Program Payments: In Files
        • G2P Program : Program Registrant Info Rest API
        • OpenG2P Entitlement: Differential
        • OpenG2P Program: Approval
        • OpenG2P Program: Assessment
        • G2P Program: Registrant Info
        • OpenG2P Program Payment: Simple Mpesa Payment Manager
        • OpenG2P Programs: Cycleless
        • OpenG2P Entitlement: In-Kind
        • G2P Notifications: Wiserv SMS Service Provider
        • G2P: Proxy Means Test
        • G2P Programs: REST API
        • G2P Program Payment (Payment Interoperability Layer)
        • OpenG2P Entitlement: Voucher
        • OpenG2P Programs: Reimbursement
        • OpenG2P Program Payment: Cash
        • OpenG2P Program: Documents
        • OpenG2P Program Payment: G2P Connect Payment Manager
        • OpenG2P Programs: Autoenrol
        • G2P ODK Importer
        • G2P Service Provider Beneficiary Management
        • OpenID Connect Authentication
        • G2P Auth: OIDC - Reg ID
        • G2P OpenID VCI: Base
        • G2P OpenID VCI: Programs
        • G2P OpenID VCI: Rest API
      • Developer Install on Linux
      • Repositories
        • openg2p-fastapi-common
          • OpenG2P FastAPI Common
          • OpenG2P FastAPI Auth
          • OpenG2P Common: G2P Connect ID Mapper
        • social-payments-account-registry
        • g2p-bridge
        • openg2p-packaging
        • openg2p-security
        • spar-load-test
        • 4sure
        • G2P SelfServicePortal
      • Technology Stack
      • Testing
        • Test Workflow
        • Automation Framework
    • Deployment
      • i18n
      • Installation of Odoo Modules
      • Domain names and Certificates
      • Helm Charts
  • SPAR
    • Features
      • SPAR Mapper
      • SPAR Self Service
      • Privacy & Security
      • Interoperability
      • Monitoring & Reporting
    • Deployment
      • Domain Names and Certificates
      • Helm Charts
    • 📔User Guides
      • 📔Link FA (Self Service)
      • 📔Link FA (Admin)
    • Development
      • Jira Board
      • Testing
        • Unit Testing
        • Functional Testing
        • Performance Testing
          • Mapper
            • Resolve API
            • Link API
            • Unlink API
            • Update API
      • Developer Install
        • SPAR Mapper API
        • SPAR Self Service API
        • SPAR Self Service UI
      • Repositories
      • API Reference
      • Tech Guides
    • Releases
      • 1.0.0
      • 1.1.0 - WIP
    • Roadmap
  • G2P Bridge
    • Features
      • Privacy & Security
      • Interoperability
      • Monitoring & Reporting
    • Deployment
    • Development
      • Design
        • IN APIs from PBMS
          • create_disbursement_envelope
          • cancel_disbursement_envelope
          • create_disbursements
          • cancel_disbursements
          • get_disbursement_envelope_status
          • get_disbursement_status
        • OUT APIs to Mapper
          • resolve
        • OUT APIs to Bank
          • check_funds_with_bank
          • block_funds_with_bank
          • disburse_funds_from_bank
        • IN APIs from Bank
          • upload_mt940
        • Helper Tables
          • benefit_program_configuration
        • Configuration parameters
        • Interfaces
        • Physical Organization
        • Example Bank
          • example-bank-models
          • example-bank-api
          • example-bank-celery
      • Testing
        • Unit Testing
        • Functional Testing
        • Performance Testing
      • Repositories
    • Tech Guides
    • 📔User Guides
      • 📔Configure G2P Connect Payment Manager
    • Releases
  • Utilities and Tools
    • ODK
      • 📔User Guides
        • 📔Create a Project for a Program
        • 📔Create a Form
        • 📔Upload a Form
        • 📔Upload revised Form
        • 📔Test a Form
        • 📔Publish a Form
        • 📔Provide Form Access to Field Agent
        • 📔Download a Form on ODK Collect
        • 📔Delete a Form
        • 📔Register Offline
    • 4Sure Verifier App
      • Installation Guide for 4Sure Application
      • 📔User Guides
        • 📔Verify Digital Credentials using 4Sure Application
        • 📔Verify and Populate the form in ODK Collect using 4Sure Application
      • 4Sure Test Summary
    • Smartscanner
      • 📔User Guides
    • Registration Tool Kit
  • Monitoring and Reporting
    • Apache Superset
    • Reporting Framework
      • 📔User Guides
        • 📔Connector Creation Guide
        • 📔Dashboards Creation Guide
        • 📔Installation & Troubleshooting
        • Page 1
      • Kafka Connect Transform Reference
    • System Logging
    • System Health
  • Privacy and Security
    • Key Manager
  • Interoperability
  • Integrations
    • eSignet Integration
    • M-Pesa Integration
    • Mojaloop Integration
    • 📔User Guides
  • Deployment
    • Base Infrastructure
      • Wireguard Bastion
        • Install WireGuard Client on Android Device
        • Wireguard Access to Users
        • Install WireGuard Client on Desktop
      • NFS Server
      • Rancher Cluster
      • OpenG2P Cluster
        • Kubernetes
          • Firewall
          • Istio
          • Adding Nodes to Cluster
          • Deleting Nodes from Cluster
        • Prometheus & Grafana
        • Fluentd & OpenSearch
          • DEPRECATED - OpenSearch
      • Load Balancer
        • Nginx
        • AWS
    • Resource Requirements
    • Helm Charts
    • Upgrades
    • Production
    • OpenG2P In a Box
    • Packaging
    • Versioning
    • Additional Guides
      • Automatic Build and Upload of Private Dockers
      • Generate SSL Certificates using Letsencrypt
      • Packaging Odoo based Docker
      • AWS
        • Create ACM Certificate on AWS
        • Create Security Group on AWS
        • Domain mapping on AWS Route53
        • Make Environment Publicly Accessible using AWS LB Configuration
      • Private Access Channel
      • Odoo Post Install Configuration
      • Pulling Docker from Private Repository on Docker Hub
      • Keycloak Client Creation
      • Troubleshooting: "fsnotify watcher" warning
      • Uninstalling Applications from Rancher UI
      • Access a Database from Outside the Cluster
      • Configure External Database to Connect OpenG2P Environment
      • Configure IPSec VPN Gateway to Connect to External Systems using Strongswan
      • Troubleshooting
        • PostgreSQL Database not Starting due to Replication Checkpoint Error
        • No Space Left on the Device Warning
      • Restart Deployment or StatefulSets to Redistribute Pods across Nodes
      • Rerun Jobs in Kubernetes Cluster
      • Finding URLs in the System
      • Transitioning PostgreSQL From Docker on K8s to Standalone PostgreSQL
      • Restore a PVC from an NFS Folder and Attach it to a Pod
      • View System Logs on the OpenSearch Dashboard
    • Persistent Storage
      • Resizing Persistent Volume Claim in Kubernetes Cluster
  • Documentation Guides
    • Documentation Guidelines
      • Embed a Miro diagram
      • Set an Image for a Start View
    • OpenG2P Module Doc Template
  • Use Cases
    • Technology for Inclusion
      • Registration in Low Connectivity Areas
      • Registration using Self Service Portal
    • Digital Cash Transfer Program
    • Create Social Registry
    • Case Studies
      • Immediate Assistance on Demand
      • Service Provider Reimbursement
      • Customise ODK Form - Add Ethiopian Calendar
  • Releases
    • 1.1.0
      • Release Notes
  • License
    • OpenG2P Support Policy
  • Community
    • Contributing
    • Code of Conduct
  • Blogs
    • OpenG2P and SDG Goals
    • OpenG2P - A Building Block for DPI
Powered by GitBook
LogoLogo

Copyright © OpenG2P. This work is licensed under Creative Common Attribution (CC-BY-4.0) International license unless otherwise noted.

On this page
  • Prerequisites
  • Debezium connector creation
  • OpenSearch connector creation
  • OpenSearch dashboard creation
  1. Monitoring and Reporting
  2. Reporting Framework
  3. User Guides

Connector Creation Guide

PreviousUser GuidesNextDashboards Creation Guide

Last updated 6 months ago

Creating Dashboards for Reporting involves the following steps:

  • Understanding what database tables are required to be indexed to OpenSearch for the dashboard.

  • Creating a pipeline for the data flow to OpenSearch. This pipeline involves:

    • Creating one Debezium Connector containing the database table.

    • Creating one OpenSearch Connector for the database table.

  • Creating a dashboard on OpenSearch

Follow the guides on this page to learn more about each step in the process above.

This document contains instructions for the developers (or dashboard creators) on creating the required connectors and dashboards to visualize reports on OpenSearch.

Follow the to install/update the connector configuration.

Prerequisites

  • Create a GitHub repository (or create a new directory in an existing repository) which is going to store the configuration for the connectors and the dashboards for OpenSearch.

  • Create a directory in the repository with these three folders debezium-connectors , opensearch-connectors and opensearch-dashboards .

  • For example .

  • Identify the tables from the database whose data will be required for the reports.

Debezium connector creation

  • One debezium connector is sufficient for indexing all the required tables of one database. So create one connector for each database (rather than one for each table).

  • Create a json file in the debezium-connectors . Each json file corresponds to one debezium connector. With the following contents:

    {
        "name": "${DB_PREFIX_INDEX}_${DB_NAME}",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",
            "publication.autocreate.mode": "filtered",
            "slot.name": "dbz_${DB_PREFIX_INDEX}_${DB_NAME}",
            "publication.name": "dbz_pub_${DB_PREFIX_INDEX}_${DB_NAME}",
            "database.hostname": "${DB_HOSTNAME}",
            "database.port": "${DB_PORT}",
            "database.user": "${DB_USER}",
            "database.password": "${DB_PASS}",
            "database.dbname": "${DB_NAME}",
            "topic.prefix": "${DB_PREFIX_INDEX}",
            "table.include.list": "",
            "heartbeat.interval.ms": "${DEFAULT_DEBEZIUM_CONNECTOR_HEARTBEAT_MS}",
            "decimal.handling.mode": "double"
        }
    }

Each $ in the json file will be treated as an environment variable. Environment variables will be automatically picked up during installation. If you want to use a dollar in the file and not parse it as env variable during installation, replace your $ with ${dollar} .

  • Add the list of all tables required from this database into the table.include.list field (in no particular order) (Accepts regex). For example

    "table.include.list": "public.res_partner,public.g2p_program_membership,public.g2p_programs"
    • This list needs to include relationship tables of the current table. For example: if you want to index g2p_program_membership but would also like to retrieve the name of the program in which the beneficiary belongs, then you have to add g2p_program as well.

OpenSearch connector creation

  • Each json file in the opensearch-connectors folder will be considered a connector. Create one connector file for each table with the following content:

    {
        "name": "res_partner_${DB_PREFIX_INDEX}",
        "config": {
            "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
            "connection.url": "${OPENSEARCH_URL}",
            "connection.username": "${OPENSEARCH_USERNAME}",
            "connection.password": "${OPENSEARCH_PASSWORD}",
            "tasks.max": "1",
            "topics": "${DB_PREFIX_INDEX}.public.res_partner",
            "key.ignore": "false",
            "schema.ignore": "true",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "true",
            "value.converter.schemas.enable": "false",
    
            "behavior.on.null.values": "delete",
            "behavior.on.malformed.documents": "warn",
            "behavior.on.version.conflict": "warn",
    
            "transforms": "keyExtId,valExt,tsconvert01,...",
    
            "transforms.keyExtId.type": "org.apache.kafka.connect.transforms.ExtractField${dollar}Key",
            "transforms.keyExtId.field": "id",
    
            "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
            "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
    
            "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
            "transforms.tsconvert01.field": "source_ts_ms",
            
            ...
    }
  • Replace name with the appropriate table names.

  • Replace topics field with the name of table.

    "topics": "${DB_PREFIX_INDEX}.public.g2p_program",
  • After the base file is configured, you can now add transformations to your connector at the end of the file (denoted by ... in the above example). Each transformation (SMT) will apply some change to the data or a particular field from the table, before pushing the entry to OpenSearch.

  • Add the following transformations to your connector based on the data available in the table.

    • For every Datetime field / Date field in the table add the following transform.

      "transforms.tsconvert02.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
      "transforms.tsconvert02.field": "create_date",
      "transforms.tsconvert02.input.type": "micro_sec",
    • At the end of all the transformations, add a TimestampSelector transform, which creates a new @timestamp_gen field whose value can be selected as any of the available Datetime fields in the table. This will be useful while creating a Dashboard on OpenSearch, where we can use this new @timestamp_gen field as the IndexPattern timestamp.

      "transforms.tsSelect.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampSelector${dollar}Value",
      "transforms.tsSelect.ts.order": "write_date,create_date",
      "transforms.tsSelect.output.field": "@timestamp_gen"
    • If you want to pull data from another table (which is already indexed into OpenSearch) into this table that the connector is pointing to, use the DynamicNewField transform. For example; g2p_program_membership contains the beneficiary list. But the demographic info of the beneficiary is present in res_partner table. Say you want to pull gender, and address of the beneficiary, and name of the program that the beneficiary is part of, then create two transforms like this:

      "transforms.join01.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewField${dollar}Value",
      "transforms.join01.input.fields": "program_id",
      "transforms.join01.output.fields": "program_name",
      "transforms.join01.es.index": "${DB_PREFIX_INDEX}.public.g2p_program",
      "transforms.join01.es.input.fields": "id",
      "transforms.join01.es.output.fields": "name",
      "transforms.join01.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}",
      "transforms.join01.es.url": "${OPENSEARCH_URL}",
      "transforms.join01.es.username": "${OPENSEARCH_USERNAME}",
      "transforms.join01.es.password": "${OPENSEARCH_PASSWORD}",
      
      "transforms.join02.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewField${dollar}Value",
      "transforms.join02.input.fields": "partner_id",
      "transforms.join02.output.fields": "beneficiary_gender,beneficiary_address",
      "transforms.join02.es.index": "${DB_PREFIX_INDEX}.public.res_partner",
      "transforms.join02.es.input.fields": "id",
      "transforms.join02.es.output.fields": "gender,address",
      "transforms.join02.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}",
      "transforms.join02.es.url": "${OPENSEARCH_URL}",
      "transforms.join02.es.username": "${OPENSEARCH_USERNAME}",
      "transforms.join02.es.password": "${OPENSEARCH_PASSWORD}",
    • If you want to add data/fields from one connector to another index on OpenSearch, use the DynamicNewFieldInsertBack transform. For example; NATIONAL IDs of registrants are saved in g2p_reg_id table. But if that field data is needed on res_partner index (main registrant data table) the following can be done on the g2p_reg_id connector. (The following adds reg_id_NATIONAL_ID field into res_partner index from g2p_reg_id connector into the document with ID from partner_id field) :

      "transforms.insertBack1.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewFieldInsertBack${dollar}Value",
      "transforms.insertBack1.id.expr": ".partner_id",
      "transforms.insertBack1.condition": ".id_type_name == \"NATIONAL ID\"",
      "transforms.insertBack1.value": "{reg_id_NATIONAL_ID: .value}",
      "transforms.insertBack1.es.index": "${DB_PREFIX_INDEX}.public.res_partner",
      "transforms.insertBack1.es.url": "${OPENSEARCH_URL}",
      "transforms.insertBack1.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}",
      "transforms.insertBack1.es.username": "${OPENSEARCH_USERNAME}",
      "transforms.insertBack1.es.password": "${OPENSEARCH_PASSWORD}",
    • "transforms.jqApply1.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq{dollar}Value",
      "transforms.jqApply1.expr": "{new_field: .payload.old_field, operation: .source.op}"
    • The connector by default indexes all the fields from the DB into OpenSearch. If you want to exclude fields from getting indexed, they must be explicitly deleted using a transform like given below. For example PII fields like name, phone number, address, etc. As a general rule, fields that are not required for dashboards must be excluded explicitly.

      "transforms.excludeFields.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq{dollar}Value",
      "transforms.excludeFields.expr": "del(.name,.address)",
      • column.exclude.list property can also be used to remove specific columns from being indexed (Not preferred method). The disadvantage is that this excludes the fields from Kafka topics itself. If there are multiple OpenSearch Connectors, referring to the same topic, each with different data requirements, then this is not possible to control at the SINK connector side.

    • If you wish to change the name of the Index into which data is supposed to be inserted, use RenameTopic transform. The default index name before rename will be that of the topic name given in the topics config field. Example :

      "transforms.renameTopic.type": "org.openg2p.reporting.kafka.connect.transforms.RenameTopic",
      "transforms.renameTopic.topic": "res_partner_new"
    • After configuring all the transforms, add the names of all transforms, in the order in which they have to be applied, in the transforms field.

      "transforms": "keyExtId,valExt1,valExt2,tsconvert01,tsconvert02,tsSelect,excludeFields,renameTopic",

Each $ in the json file will be treated as an environment variable. Environment variables will be automatically picked up during installation. If you want to use a dollar in the file and not parse it as env variable during installation, replace your $ with ${dollar} .

Capturing Change History

  • {
        "name": "res_partner_history_${DB_PREFIX_INDEX}",
        "config": {
            ...
            "key.ignore": "true",
            ...
            "behavior.on.null.values": "ignore",
            ...
            "transforms.renameTopic.type": "org.openg2p.reporting.kafka.connect.transforms.RenameTopic",
            "transforms.renameTopic.topic": "${DB_PREFIX_INDEX}.public.res_partner_history"
        }
    }
  • With this configuration, you will have two OpenSearch connectors. One that tracks the latest data of a table. And one that tracks all the changes. Correspondingly you have two indexes on OpenSearch (one with _history and one with regular data).

OpenSearch dashboard creation

Example debezium connector

Reference.

If you wish to apply a on the record, use ApplyJq transform. The current record will be replaced with the result after applying Jq. Example:

Example OpenSearch Connector

For more info on basic connector configuration, refer to .

For detailed transform configuration, refer to doc.

For a list of all available SMTs and their configs, refer to .

If you also wish to record all the changes that are made to the records of a table, create a new OpenSearch connector for the same topic as given in and change the following properties.

Refer to .

📔
📔
Installation guide
https://github.com/OpenG2P/openg2p-reporting/tree/develop/scripts/social-registry
https://github.com/OpenG2P/openg2p-reporting/blob/develop/scripts/social-registry/debezium-connectors/default.json
Debezium PostgreSQL Connector
Jq filter
https://github.com/OpenG2P/openg2p-reporting/blob/develop/scripts/pbms/opensearch-connectors/30.g2p_program_membership.json
Apacha Kafka Connect
Apache Kafka Connect Transformations
Reporting Kafka Connect Transforms
OpenSearch Dashboard Creation Guide
this section