📔Connector Creation Guide

Creating Dashboards for Reporting involves the following steps:

  • Understanding what database tables are required to be indexed to OpenSearch for the dashboard.

  • Creating a pipeline for the data flow to OpenSearch. This pipeline involves:

    • Creating one Debezium Connector containing the database table.

    • Creating one OpenSearch Connector for the database table.

  • Creating a dashboard on OpenSearch

Follow the guides on this page to learn more about each step in the process above.

This document contains instructions for the developers (or dashboard creators) on creating the required connectors and dashboards to visualize reports on OpenSearch.

Follow the Installation guide to install/update the connector configuration.

Prerequisites

  • Create a GitHub repository (or create a new directory in an existing repository) which is going to store the configuration for the connectors and the dashboards for OpenSearch.

  • Create a directory in the repository with these three folders debezium-connectors , opensearch-connectors and opensearch-dashboards .

  • Identify the tables from the database whose data will be required for the reports.

Debezium connector creation

  • One debezium connector is sufficient for indexing all the required tables of one database. So create one connector for each database (rather than one for each table).

  • Create a json file in the debezium-connectors . Each json file corresponds to one debezium connector. With the following contents:

    {
        "name": "${DB_PREFIX_INDEX}_${DB_NAME}",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",
            "publication.autocreate.mode": "filtered",
            "slot.name": "dbz_${DB_PREFIX_INDEX}_${DB_NAME}",
            "publication.name": "dbz_pub_${DB_PREFIX_INDEX}_${DB_NAME}",
            "database.hostname": "${DB_HOSTNAME}",
            "database.port": "${DB_PORT}",
            "database.user": "${DB_USER}",
            "database.password": "${DB_PASS}",
            "database.dbname": "${DB_NAME}",
            "topic.prefix": "${DB_PREFIX_INDEX}",
            "table.include.list": "",
            "heartbeat.interval.ms": "${DEFAULT_DEBEZIUM_CONNECTOR_HEARTBEAT_MS}",
            "decimal.handling.mode": "double"
        }
    }

Each $ in the json file will be treated as an environment variable. Environment variables will be automatically picked up during installation. If you want to use a dollar in the file and not parse it as env variable during installation, replace your $ with ${dollar} .

OpenSearch connector creation

  • Each json file in the opensearch-connectors folder will be considered a connector. Create one connector file for each table with the following content:

    {
        "name": "res_partner_${DB_PREFIX_INDEX}",
        "config": {
            "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
            "connection.url": "${OPENSEARCH_URL}",
            "connection.username": "${OPENSEARCH_USERNAME}",
            "connection.password": "${OPENSEARCH_PASSWORD}",
            "tasks.max": "1",
            "topics": "${DB_PREFIX_INDEX}.public.res_partner",
            "key.ignore": "false",
            "schema.ignore": "true",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "true",
            "value.converter.schemas.enable": "false",
    
            "behavior.on.null.values": "delete",
            "behavior.on.malformed.documents": "warn",
            "behavior.on.version.conflict": "warn",
    
            "transforms": "keyExtId,valExt,tsconvert01,...",
    
            "transforms.keyExtId.type": "org.apache.kafka.connect.transforms.ExtractField${dollar}Key",
            "transforms.keyExtId.field": "id",
    
            "transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
            "transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
    
            "transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
            "transforms.tsconvert01.field": "source_ts_ms",
            
            ...
    }
  • Replace name with the appropriate table names.

  • Replace topics field with the name of table.

    "topics": "${DB_PREFIX_INDEX}.public.g2p_program",
  • After the base file is configured, you can now add transformations to your connector at the end of the file (denoted by ... in the above example). Each transformation (SMT) will apply some change to the data or a particular field from the table, before pushing the entry to OpenSearch.

  • Add the following transformations to your connector based on the data available in the table.

    • For every Datetime field / Date field in the table add the following transform.

      "transforms.tsconvert02.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
      "transforms.tsconvert02.field": "create_date",
      "transforms.tsconvert02.input.type": "micro_sec",
    • At the end of all the transformations, add a TimestampSelector transform, which creates a new @timestamp_gen field whose value can be selected as any of the available Datetime fields in the table. This will be useful while creating a Dashboard on OpenSearch, where we can use this new @timestamp_gen field as the IndexPattern timestamp.

      "transforms.tsSelect.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampSelector${dollar}Value",
      "transforms.tsSelect.ts.order": "write_date,create_date",
      "transforms.tsSelect.output.field": "@timestamp_gen"
    • If you want to pull data from another table (which is already indexed into OpenSearch) into this table that the connector is pointing to, use the DynamicNewField transform. For example; g2p_program_membership contains the beneficiary list. But the demographic info of the beneficiary is present in res_partner table. Say you want to pull gender, and address of the beneficiary, and name of the program that the beneficiary is part of, then create two transforms like this:

      "transforms.join01.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewField${dollar}Value",
      "transforms.join01.input.fields": "program_id",
      "transforms.join01.output.fields": "program_name",
      "transforms.join01.es.index": "${DB_PREFIX_INDEX}.public.g2p_program",
      "transforms.join01.es.input.fields": "id",
      "transforms.join01.es.output.fields": "name",
      "transforms.join01.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}",
      "transforms.join01.es.url": "${OPENSEARCH_URL}",
      "transforms.join01.es.username": "${OPENSEARCH_USERNAME}",
      "transforms.join01.es.password": "${OPENSEARCH_PASSWORD}",
      
      "transforms.join02.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewField${dollar}Value",
      "transforms.join02.input.fields": "partner_id",
      "transforms.join02.output.fields": "beneficiary_gender,beneficiary_address",
      "transforms.join02.es.index": "${DB_PREFIX_INDEX}.public.res_partner",
      "transforms.join02.es.input.fields": "id",
      "transforms.join02.es.output.fields": "gender,address",
      "transforms.join02.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}",
      "transforms.join02.es.url": "${OPENSEARCH_URL}",
      "transforms.join02.es.username": "${OPENSEARCH_USERNAME}",
      "transforms.join02.es.password": "${OPENSEARCH_PASSWORD}",
    • If you want to add data/fields from one connector to another index on OpenSearch, use the DynamicNewFieldInsertBack transform. For example; NATIONAL IDs of registrants are saved in g2p_reg_id table. But if that field data is needed on res_partner index (main registrant data table) the following can be done on the g2p_reg_id connector. (The following adds reg_id_NATIONAL_ID field into res_partner index from g2p_reg_id connector into the document with ID from partner_id field) :

      "transforms.insertBack1.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewFieldInsertBack${dollar}Value",
      "transforms.insertBack1.id.expr": ".partner_id",
      "transforms.insertBack1.condition": ".id_type_name == \"NATIONAL ID\"",
      "transforms.insertBack1.value": "{reg_id_NATIONAL_ID: .value}",
      "transforms.insertBack1.es.index": "${DB_PREFIX_INDEX}.public.res_partner",
      "transforms.insertBack1.es.url": "${OPENSEARCH_URL}",
      "transforms.insertBack1.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}",
      "transforms.insertBack1.es.username": "${OPENSEARCH_USERNAME}",
      "transforms.insertBack1.es.password": "${OPENSEARCH_PASSWORD}",
    • If you wish to apply a Jq filter on the record, use ApplyJq transform. The current record will be replaced with the result after applying Jq. Example:

      "transforms.jqApply1.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq{dollar}Value",
      "transforms.jqApply1.expr": "{new_field: .payload.old_field, operation: .source.op}"
    • The connector by default indexes all the fields from the DB into OpenSearch. If you want to exclude fields from getting indexed, they must be explicitly deleted using a transform like given below. For example PII fields like name, phone number, address, etc. As a general rule, fields that are not required for dashboards must be excluded explicitly.

      "transforms.excludeFields.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq{dollar}Value",
      "transforms.excludeFields.expr": "del(.name,.address)",
      • column.exclude.list property can also be used to remove specific columns from being indexed (Not preferred method). The disadvantage is that this excludes the fields from Kafka topics itself. If there are multiple OpenSearch Connectors, referring to the same topic, each with different data requirements, then this is not possible to control at the SINK connector side.

    • If you wish to change the name of the Index into which data is supposed to be inserted, use RenameTopic transform. The default index name before rename will be that of the topic name given in the topics config field. Example :

      "transforms.renameTopic.type": "org.openg2p.reporting.kafka.connect.transforms.RenameTopic",
      "transforms.renameTopic.topic": "res_partner_new"
    • After configuring all the transforms, add the names of all transforms, in the order in which they have to be applied, in the transforms field.

      "transforms": "keyExtId,valExt1,valExt2,tsconvert01,tsconvert02,tsSelect,excludeFields,renameTopic",

Each $ in the json file will be treated as an environment variable. Environment variables will be automatically picked up during installation. If you want to use a dollar in the file and not parse it as env variable during installation, replace your $ with ${dollar} .

Capturing Change History

  • If you also wish to record all the changes that are made to the records of a table, create a new OpenSearch connector for the same topic as given in this section and change the following properties.

    {
        "name": "res_partner_history_${DB_PREFIX_INDEX}",
        "config": {
            ...
            "key.ignore": "true",
            ...
            "behavior.on.null.values": "ignore",
            ...
            "transforms.renameTopic.type": "org.openg2p.reporting.kafka.connect.transforms.RenameTopic",
            "transforms.renameTopic.topic": "${DB_PREFIX_INDEX}.public.res_partner_history"
        }
    }
  • With this configuration, you will have two OpenSearch connectors. One that tracks the latest data of a table. And one that tracks all the changes. Correspondingly you have two indexes on OpenSearch (one with _history and one with regular data).

OpenSearch dashboard creation

Refer to OpenSearch Dashboard Creation Guide.

Last updated