Example Implementation Workflow

The RegistryFarmer class demonstrates a custom implementation of the RegistryInterface, tailored for integrating with a Farmer Registry data source.

Below is the typical workflow for building a similar registry connector:

Define a Custom Registry Class and Update Factory

Create a class (e.g., RegistryFarmer) that inherits from RegistryInterface. This ensures the connector implements all abstract methods required by the PBMS framework — including summaries, searches, and entitlement computations.

# /computations/registry_farmer.py
class RegistryFarmer(RegistryInterface):
    ...  # implement the abstract methods of RegistryInterface here

Update the /factory/registry_factory.py file so that the factory returns the new registry class:

# /factory/registry_factory.py
class RegistryFactory:
    """Get the appropriate summary computation class based on the registrant type"""

    @staticmethod
    def get_registry_class(
        target_registry,
    ) -> RegistryInterface:
        if target_registry == G2PRegistryType.FARMER.value:
            return RegistryFarmer()

        # add further registry types here using elif branches

        else:
            raise BGTaskException(code=BGTaskErrorCodes.INVALID_REQUEST)
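
As a rough illustration of how the elif chain grows, the sketch below adds a second registry type. Every class in it is a local stand-in so that the snippet runs on its own: the real RegistryInterface, G2PRegistryType, and BGTaskException come from the PBMS packages. The HOUSEHOLD type, the RegistryHousehold connector, the enum values, and the registry_name method are all invented for the example.

```python
from abc import ABC, abstractmethod
from enum import Enum


# Stand-ins for the PBMS framework types, so the sketch runs on its own.
class G2PRegistryType(Enum):
    FARMER = "farmer"        # assumed value
    HOUSEHOLD = "household"  # hypothetical second registry type


class BGTaskException(Exception):
    def __init__(self, code: str):
        super().__init__(code)
        self.code = code


class RegistryInterface(ABC):
    @abstractmethod
    def registry_name(self) -> str:  # hypothetical method, for illustration only
        ...


class RegistryFarmer(RegistryInterface):
    def registry_name(self) -> str:
        return "farmer"


class RegistryHousehold(RegistryInterface):
    def registry_name(self) -> str:
        return "household"


class RegistryFactory:
    """Return the registry connector that matches the target registry."""

    @staticmethod
    def get_registry_class(target_registry: str) -> RegistryInterface:
        if target_registry == G2PRegistryType.FARMER.value:
            return RegistryFarmer()
        elif target_registry == G2PRegistryType.HOUSEHOLD.value:
            return RegistryHousehold()
        else:
            # The real factory raises with BGTaskErrorCodes.INVALID_REQUEST.
            raise BGTaskException(code="INVALID_REQUEST")
```

Because RegistryInterface is abstract, a connector that misses one of the abstract methods fails at instantiation inside the factory rather than later during a computation.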

Create Custom Schema (/schemas) and Model Definitions (/models)

Define a Pydantic schema to structure the registry-specific summary data. Each registry payload extends the base payload schema, so the registry summary fits into the existing response models.

# /schemas/beneficiary_list_summary_farmer.py
from typing import Optional
from pydantic import BaseModel
from .beneficiary_list_summary import BeneficiaryListSummaryPayload

class BeneficiaryListSummaryFarmer(BaseModel):
    # ... registry-specific stats ...
    # computation logic is expected in the registry connector implementation
    pass

class BeneficiaryListSummaryFarmerPayload(BeneficiaryListSummaryPayload):
    registry_summary: BeneficiaryListSummaryFarmer
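
To make the nesting concrete, here is a minimal sketch of a filled-in schema. It assumes the schema fields mirror two of the columns of the farmer summary model (land_holding_mean and annual_income_mean); the actual field list is up to your registry. BeneficiaryListSummaryPayload is redefined locally as a stand-in with a single assumed field, since the real base payload lives in the framework.

```python
from typing import Optional

from pydantic import BaseModel


class BeneficiaryListSummaryPayload(BaseModel):
    # Stand-in for the framework's base payload; the real fields may differ.
    beneficiary_list_id: str


class BeneficiaryListSummaryFarmer(BaseModel):
    # Field names borrowed from the farmer summary model, as an assumption.
    land_holding_mean: Optional[float] = None
    annual_income_mean: Optional[float] = None


class BeneficiaryListSummaryFarmerPayload(BeneficiaryListSummaryPayload):
    # The registry-specific block rides along with the common payload fields.
    registry_summary: BeneficiaryListSummaryFarmer


payload = BeneficiaryListSummaryFarmerPayload(
    beneficiary_list_id="list-001",
    registry_summary=BeneficiaryListSummaryFarmer(land_holding_mean=2.5),
)
```

Statistics that have not been computed yet stay None, so a partially populated summary still validates.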

Extend the base SQLAlchemy model BeneficiaryListSummary to persist registry-specific statistics. The inheritance ensures all common fields (e.g., beneficiary_list_id, timestamps) are available automatically.

# /models/beneficiary_list_summary_farmer.py
from openg2p_bg_task_models.models import BeneficiaryListSummary
from sqlalchemy import JSON, Float, String
from sqlalchemy.orm import mapped_column

class BeneficiaryListSummaryFarmer(BeneficiaryListSummary):
    __tablename__ = "beneficiary_list_summary_farmer"

    land_holding_mean = mapped_column(Float, default=0)
    annual_income_mean = mapped_column(Float, default=0)
    average_entitlement_female = mapped_column(JSON, nullable=True)
    average_entitlement_male = mapped_column(JSON, nullable=True)
    # ... other numeric/statistical columns ...
    # computation logic is expected in the registry connector implementation

After creating the new model, update the migration script in migrate.py so that it includes the new model.

Similarly, add the registry views as models under /models and their related payloads under /schema. These models are used for lookups against the registry database. registry_type.py should hold all the target model mappings.

# /schema/registry_farmer.py
class G2PFarmerRegistryPayload(G2PRegistryPayload):
    # ... add the fields defined in the registry view ...
    pass


# /models/registry_farmer.py
class G2PFarmerRegistry(G2PRegistry):
    __tablename__ = "g2p_farmer_registry"  # table name in registry view

    # ... add the fields defined in the registry view ...

Implement the computation and registry methods. You can use the SQL utility methods provided in the interface: pass the target_registry string to get a TextClause SQL query. Refer to the Code Anatomy for Registry Connector Interface below to populate your custom connector from the current interface template.
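
The method reference below describes the utility methods as replacing curly quotes in filters, injecting pagination parameters, and checking that entitlement SQL starts with SELECT. The sketch below shows one way such helpers could look. It is not the interface's code: the function names are hypothetical, the id column and 1-based page numbering are assumptions, and it returns a plain string plus a parameter dict where the interface returns a SQLAlchemy TextClause.

```python
from typing import Any, Dict, List, Tuple


def normalize_quotes(clause: str) -> str:
    """Replace curly quotes (common after copy-paste) with straight ones."""
    for curly, straight in (
        ("\u2018", "'"), ("\u2019", "'"), ("\u201c", '"'), ("\u201d", '"'),
    ):
        clause = clause.replace(curly, straight)
    return clause


def require_select(sql_query: str) -> str:
    """Reject anything that is not a SELECT statement."""
    if not sql_query.lstrip().lower().startswith("select"):
        raise ValueError("only SELECT statements are accepted")
    return sql_query


def build_search_query(
    target_registry: str,
    registrant_ids: List[str],
    where_clause: str,
    order_by: str,
    page: int,
    page_size: int,
) -> Tuple[str, Dict[str, Any]]:
    """Build a paginated search query and its bind parameters."""
    if not target_registry.isidentifier():
        raise ValueError(f"unsafe table name: {target_registry!r}")
    if not registrant_ids:
        raise ValueError("registrant_ids must not be empty")

    # One bind parameter per id keeps the values out of the SQL string.
    params: Dict[str, Any] = {f"id_{i}": rid for i, rid in enumerate(registrant_ids)}
    placeholders = ", ".join(f":{name}" for name in params)
    sql = f"SELECT * FROM {target_registry} WHERE id IN ({placeholders})"

    # NOTE: where_clause and order_by are interpolated as-is in this sketch;
    # a real connector must make sure they come from a trusted source.
    if where_clause:
        sql += f" AND ({normalize_quotes(where_clause)})"
    if order_by:
        sql += f" ORDER BY {order_by}"

    sql += " LIMIT :limit OFFSET :offset"
    params["limit"] = page_size
    params["offset"] = (page - 1) * page_size  # assumes pages start at 1
    return sql, params


sql, params = build_search_query(
    "g2p_farmer_registry", ["r1", "r2"], "gender = \u2018female\u2019", "name",
    page=2, page_size=50,
)
```

A count query would reuse the same WHERE construction with SELECT COUNT(*) and without the ORDER BY and pagination parts.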

| Method Name | Type | Purpose | Key Arguments | Returns | Implementation Notes |
| --- | --- | --- | --- | --- | --- |
| get_summary | Async | Retrieves summary statistics for a given beneficiary list asynchronously. | beneficiary_list_id: str, bg_task_session: AsyncSession, formated: bool | BeneficiaryListSummaryPayload | Used in API calls; fetches formatted summary metrics from summary table. |
| get_summary_sync | Sync | Same as get_summary but executed synchronously (for Celery or background tasks). | beneficiary_list_id: str, bg_task_session: Session | BeneficiaryListSummaryPayload | Ideal for heavy computation where async isn’t needed. |
| compute_eligibility_statistics | Sync | Computes eligibility-based summary metrics for beneficiaries. | beneficiary_list_details: List[BeneficiaryListDetails], base_summary, sr_session, bg_task_session | None | Uses NumPy for percentile and mean computations; updates summary model. |
| compute_entitlement_statistics | Sync | Computes entitlement statistics (e.g., payment distribution by gender). | beneficiary_list_id: str, bg_task_session: Session, sr_session: Session | None | Groups entitlements by benefit_code_id; calculates mean, Q1, Q2, Q3. |
| get_registrants_by_ids | Sync | Fetches registrant data from the registry database. | registrant_ids: List[str], sr_session: Session | List[G2PRegistry] | Uses chunked loading (yield_per(500)) for performance on large datasets. |
| get_is_registant_entitled | Sync | Checks if a registrant satisfies entitlement criteria using a SQL query. | registrant_id: str, sql_query: str, sr_session: Session | bool | Constructs validated dynamic SQL using construct_get_is_registrant_entitled_sql_query. |
| get_entitlement_multiplier | Sync | Retrieves multiplier value for entitlement scaling. | multiplier: str, registrant_id: str, sr_session: Session | int | Executes a SQL query; defaults to 1 if not found or multiplier is "none". |
| search_beneficiaries | Async | Performs paginated and filtered beneficiary searches. | bg_task_session: AsyncSession, sr_session: AsyncSession, beneficiary_list_id: str, target_registry: str, search_query, page, page_size, order_by | BeneficiarySearchResponsePayload | Builds dynamic SQL queries with construct_beneficiary_search_sql_query and applies caching. |
| construct_multiplier_sql_query | Utility | Builds SQL query to fetch multiplier column from registry table. | multiplier: str, target_registry: str | TextClause | Returns a prepared SQLAlchemy text() object. |
| construct_beneficiary_search_sql_query | Utility | Constructs SQL for paginated search with WHERE and ORDER BY. | registrant_ids: List[str], target_registry: str, where_clause: str, order_by: str, page_size: int, page: int | (TextClause, Dict[str, Any]) | Replaces curly quotes in filters; dynamically injects pagination params. |
| construct_beneficiary_search_count_sql_query | Utility | Builds SQL query to count total search results. | registrant_ids: List[str], target_registry: str, where_clause: str | (TextClause, Dict[str, Any]) | Mirrors main query but replaces SELECT * with SELECT COUNT(*). |
| construct_get_is_registrant_entitled_sql_query | Utility | Prepares validated entitlement SQL query with a dynamic WHERE clause. | registrant_id: str, target_registry: str, sql_query: str | TextClause | Validates SQL starts with SELECT; appends correct registry table reference. |
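
The entitlement statistics row describes grouping by benefit_code_id and calculating the mean and quartiles. A minimal sketch of that computation follows. It swaps NumPy for the standard library statistics module so it runs without extra dependencies, and both the function name summarize_entitlements and the input shape (pairs of benefit code and amount) are assumptions made for the example.

```python
from collections import defaultdict
from statistics import mean, quantiles
from typing import Dict, Iterable, List, Tuple


def summarize_entitlements(
    entitlements: Iterable[Tuple[str, float]],
) -> Dict[str, Dict[str, float]]:
    """Group (benefit_code_id, amount) pairs; compute mean and quartiles."""
    grouped: Dict[str, List[float]] = defaultdict(list)
    for benefit_code_id, amount in entitlements:
        grouped[benefit_code_id].append(amount)

    summary: Dict[str, Dict[str, float]] = {}
    for benefit_code_id, amounts in grouped.items():
        if len(amounts) == 1:
            # A single value is its own quartile; quantiles() requires at
            # least two data points on older Python versions.
            q1 = q2 = q3 = float(amounts[0])
        else:
            # method="inclusive" interpolates linearly between data points.
            q1, q2, q3 = quantiles(amounts, n=4, method="inclusive")
        summary[benefit_code_id] = {
            "mean": mean(amounts), "q1": q1, "q2": q2, "q3": q3,
        }
    return summary


stats = summarize_entitlements(
    [("cash", 100), ("cash", 200), ("cash", 300), ("cash", 400),
     ("cash", 500), ("seed", 40)]
)
```

In a connector, the resulting dictionaries would be written to the JSON columns of the summary model (for example, one per gender) rather than returned.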

After pushing this custom adapter code to GitHub, you can proceed to create a custom Docker image for your setup. Simply follow the existing Docker creation guide for PBMS Background Tasks, updating the path for the extensions package.

This approach ensures your environment remains consistent with the PBMS deployment standards while allowing flexibility to integrate your custom logic and components seamlessly.
