Example Implementation Workflow
The RegistryFarmer class demonstrates a custom implementation of the RegistryInterface, tailored for integrating with a Farmer Registry data source.
Below is the typical workflow for building a similar registry connector:
Define a Custom Registry Class and Update Factory
Create a class (e.g., RegistryFarmer) that inherits from RegistryInterface.
This ensures the connector implements all abstract methods required by the PBMS framework — including summaries, searches, and entitlement computations.
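To make the contract enforcement concrete, here is a minimal, standalone sketch. It assumes RegistryInterface is defined with Python's abc module, which the reference to "abstract methods" suggests but this page does not confirm. DemoRegistryInterface, DemoRegistryType, DemoRegistryFarmer and IncompleteRegistry are hypothetical stand-ins, not PBMS classes, and the method signatures are trimmed for brevity.

```python
from abc import ABC, abstractmethod
from enum import Enum


class DemoRegistryType(Enum):
    """Hypothetical stand-in for G2PRegistryType."""
    FARMER = "farmer"


class DemoRegistryInterface(ABC):
    """Hypothetical, trimmed-down stand-in for RegistryInterface."""

    @abstractmethod
    def get_summary_sync(self, beneficiary_list_id: str) -> dict:
        ...

    @abstractmethod
    def get_entitlement_multiplier(self, multiplier: str, registrant_id: str) -> int:
        ...


class IncompleteRegistry(DemoRegistryInterface):
    """Implements only one abstract method, so it cannot be instantiated."""

    def get_summary_sync(self, beneficiary_list_id: str) -> dict:
        return {"beneficiary_list_id": beneficiary_list_id}


class DemoRegistryFarmer(DemoRegistryInterface):
    """Implements every abstract method, so instantiation succeeds."""

    def get_summary_sync(self, beneficiary_list_id: str) -> dict:
        return {"beneficiary_list_id": beneficiary_list_id, "registry": "farmer"}

    def get_entitlement_multiplier(self, multiplier: str, registrant_id: str) -> int:
        # Placeholder logic for the sketch: always fall back to 1.
        return 1


def get_registry_class(target_registry: str) -> DemoRegistryInterface:
    """Dispatch on the registry type string and return a connector instance."""
    if target_registry == DemoRegistryType.FARMER.value:
        return DemoRegistryFarmer()
    raise ValueError(f"Unsupported registry type: {target_registry}")


if __name__ == "__main__":
    try:
        IncompleteRegistry()
    except TypeError as exc:
        # Python refuses to instantiate a class with unimplemented abstract methods.
        print(f"Rejected: {exc}")
    print(type(get_registry_class("farmer")).__name__)
```

If the real interface works this way, a connector that misses a method fails when the factory instantiates it, not later when a background task calls the missing method.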
# /computations/registry_farmer.py
class RegistryFarmer(RegistryInterface):
    ...  # implement the abstract methods of RegistryInterface here

Update the /factory/registry_factory.py file to include this new registry class:
# /factory/registry_factory.py
class RegistryFactory:
    """Get the appropriate summary computation class based on the registrant type"""

    @staticmethod
    def get_registry_class(
        target_registry,
    ) -> RegistryInterface:
        if target_registry == G2PRegistryType.FARMER.value:
            return RegistryFarmer()
        # add multiple interfaces using elif blocks
        else:
            raise BGTaskException(code=BGTaskErrorCodes.INVALID_REQUEST)

Create Custom Schema (/schemas) and Model Definitions (/models)
Define a pydantic schema to structure registry-specific summary data. Each registry schema extends its base schema so that the payload integrates with the existing response models.
# /schemas/beneficiary_list_summary_farmer.py
from typing import Optional

from pydantic import BaseModel

from .beneficiary_list_summary import BeneficiaryListSummaryPayload

class BeneficiaryListSummaryFarmer(BaseModel):
    # ... registry-specific stats ...
    # computation logic is expected in the registry connector implementation
    pass

class BeneficiaryListSummaryFarmerPayload(BeneficiaryListSummaryPayload):
    registry_summary: BeneficiaryListSummaryFarmer

Extend the base SQLAlchemy model BeneficiaryListSummary to persist registry-specific statistics. The inheritance ensures all common fields (e.g., beneficiary_list_id, timestamps) are available automatically.
# /models/beneficiary_list_summary_farmer.py
from openg2p_bg_task_models.models import BeneficiaryListSummary
from sqlalchemy import JSON, Float, String
from sqlalchemy.orm import mapped_column

class BeneficiaryListSummaryFarmer(BeneficiaryListSummary):
    __tablename__ = "beneficiary_list_summary_farmer"

    land_holding_mean = mapped_column(Float, default=0)
    annual_income_mean = mapped_column(Float, default=0)
    average_entitlement_female = mapped_column(JSON, nullable=True)
    average_entitlement_male = mapped_column(JSON, nullable=True)
    # ... other numeric/statistical columns ...
    # computation logic is expected in the registry connector implementation

After creating the new model, update the migration script in migrate.py to include it.
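To see how the schema pieces from this step fit together, the following standalone sketch mirrors the nesting of a registry-specific summary inside a base payload. It uses standard-library dataclasses as a stand-in for pydantic so that it runs without the PBMS packages. The Demo* class names and the base payload fields (beneficiary_list_id, total_beneficiaries) are placeholders, not the actual BeneficiaryListSummaryPayload definition.

```python
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class DemoBaseSummaryPayload:
    """Hypothetical stand-in for BeneficiaryListSummaryPayload (common fields)."""
    beneficiary_list_id: str
    total_beneficiaries: int = 0


@dataclass
class DemoFarmerSummary:
    """Registry-specific statistics; names follow the farmer model columns."""
    land_holding_mean: float = 0.0
    annual_income_mean: float = 0.0
    average_entitlement_female: Optional[dict] = None
    average_entitlement_male: Optional[dict] = None


@dataclass
class DemoFarmerSummaryPayload(DemoBaseSummaryPayload):
    """Base payload extended with a nested registry_summary block."""
    registry_summary: DemoFarmerSummary = field(default_factory=DemoFarmerSummary)


if __name__ == "__main__":
    payload = DemoFarmerSummaryPayload(
        beneficiary_list_id="list-001",
        total_beneficiaries=2,
        registry_summary=DemoFarmerSummary(
            land_holding_mean=1.5,
            annual_income_mean=42000.0,
        ),
    )
    # Common fields and registry-specific fields serialise into one structure.
    print(asdict(payload))
```

The common fields stay on the base class, so a second registry type only needs its own summary class and a payload subclass that points at it.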
The registry views must also be added as models under /models, with their related payloads under /schema. These models are used for lookups against the registry database. registry_type.py should hold all the target model mappings.
# /schema/registry_farmer.py
class G2PFarmerRegistryPayload(G2PRegistryPayload):
    # ... add the fields defined in the registry view ...
    pass

# /models/registry_farmer.py
class G2PFarmerRegistry(G2PRegistry):
    __tablename__ = "g2p_farmer_registry"  # table name in registry view
    # ... add the fields defined in the registry view ...

Implement the computation and registry methods. You can use the SQL utility methods provided in the interface by passing the target_registry string to get a TextClause SQL query. Refer to the Code Anatomy for Registry Connector Interface below to populate your custom interface from the current interface template.
| Method | Type | Description | Parameters | Returns | Notes |
| --- | --- | --- | --- | --- | --- |
| get_summary | Async | Retrieves summary statistics for a given beneficiary list asynchronously. | beneficiary_list_id: str, bg_task_session: AsyncSession, formated: bool | BeneficiaryListSummaryPayload | Used in API calls; fetches formatted summary metrics from summary table. |
| get_summary_sync | Sync | Same as get_summary but executed synchronously (for Celery or background tasks). | beneficiary_list_id: str, bg_task_session: Session | BeneficiaryListSummaryPayload | Ideal for heavy computation where async isn’t needed. |
| compute_eligibility_statistics | Sync | Computes eligibility-based summary metrics for beneficiaries. | beneficiary_list_details: List[BeneficiaryListDetails], base_summary, sr_session, bg_task_session | None | Uses NumPy for percentile and mean computations; updates summary model. |
| compute_entitlement_statistics | Sync | Computes entitlement statistics (e.g., payment distribution by gender). | beneficiary_list_id: str, bg_task_session: Session, sr_session: Session | None | Groups entitlements by benefit_code_id; calculates mean, Q1, Q2, Q3. |
| get_registrants_by_ids | Sync | Fetches registrant data from the registry database. | registrant_ids: List[str], sr_session: Session | List[G2PRegistry] | Uses chunked loading (yield_per(500)) for performance on large datasets. |
| get_is_registant_entitled | Sync | Checks if a registrant satisfies entitlement criteria using a SQL query. | registrant_id: str, sql_query: str, sr_session: Session | bool | Constructs validated dynamic SQL using construct_get_is_registrant_entitled_sql_query. |
| get_entitlement_multiplier | Sync | Retrieves multiplier value for entitlement scaling. | multiplier: str, registrant_id: str, sr_session: Session | int | Executes a SQL query; defaults to 1 if not found or multiplier is "none". |
| search_beneficiaries | Async | Performs paginated and filtered beneficiary searches. | bg_task_session: AsyncSession, sr_session: AsyncSession, beneficiary_list_id: str, target_registry: str, search_query, page, page_size, order_by | BeneficiarySearchResponsePayload | Builds dynamic SQL queries with construct_beneficiary_search_sql_query and applies caching. |
| construct_multiplier_sql_query | Utility | Builds SQL query to fetch multiplier column from registry table. | multiplier: str, target_registry: str | TextClause | Returns a prepared SQLAlchemy text() object. |
| construct_beneficiary_search_sql_query | Utility | Constructs SQL for paginated search with WHERE and ORDER BY. | registrant_ids: List[str], target_registry: str, where_clause: str, order_by: str, page_size: int, page: int | (TextClause, Dict[str, Any]) | Replaces curly quotes in filters; dynamically injects pagination params. |
| construct_beneficiary_search_count_sql_query | Utility | Builds SQL query to count total search results. | registrant_ids: List[str], target_registry: str, where_clause: str | (TextClause, Dict[str, Any]) | Mirrors main query but replaces SELECT * with SELECT COUNT(*). |
| construct_get_is_registrant_entitled_sql_query | Utility | Prepares validated entitlement SQL query with a dynamic WHERE clause. | registrant_id: str, target_registry: str, sql_query: str | TextClause | Validates SQL starts with SELECT; appends correct registry table reference. |
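The search and count utilities described above can be approximated with a short standalone sketch. This is not the PBMS implementation: the real helpers return SQLAlchemy TextClause objects, while this version returns a plain SQL string and a parameter dictionary and runs against an in-memory SQLite database. The function name build_search_query, the ALLOWED_TABLES allow-list, and the column names registrant_id, gender and land_holding are all assumptions made for illustration.

```python
import sqlite3
from typing import Any, Dict, List, Tuple

# Hypothetical allow-list: table names cannot be bound as parameters,
# so they are checked against known values before being interpolated.
ALLOWED_TABLES = {"g2p_farmer_registry"}


def _normalise_quotes(clause: str) -> str:
    """Replace curly quotes (common in copy-pasted filters) with straight ones."""
    return (
        clause.replace("\u2018", "'").replace("\u2019", "'")
        .replace("\u201c", '"').replace("\u201d", '"')
    )


def build_search_query(
    registrant_ids: List[str],
    table: str,
    where_clause: str = "",
    order_by: str = "registrant_id",
    page: int = 1,
    page_size: int = 10,
    count_only: bool = False,
) -> Tuple[str, Dict[str, Any]]:
    """Return (sql, params) for a paginated search or its matching COUNT(*) query."""
    if table not in ALLOWED_TABLES:
        raise ValueError(f"Unknown registry table: {table}")
    if not registrant_ids:
        raise ValueError("registrant_ids must not be empty")

    # One named placeholder per registrant id keeps the values out of the SQL text.
    params: Dict[str, Any] = {f"id_{i}": rid for i, rid in enumerate(registrant_ids)}
    placeholders = ", ".join(f":id_{i}" for i in range(len(registrant_ids)))

    select = "COUNT(*)" if count_only else "*"
    sql = f"SELECT {select} FROM {table} WHERE registrant_id IN ({placeholders})"
    if where_clause:
        # The filter text is interpolated as-is; this sketch assumes it is trusted.
        sql += f" AND ({_normalise_quotes(where_clause)})"
    if not count_only:
        # order_by is also interpolated and assumed trusted; paging values are bound.
        sql += f" ORDER BY {order_by} LIMIT :limit OFFSET :offset"
        params["limit"] = page_size
        params["offset"] = (page - 1) * page_size
    return sql, params


def demo() -> None:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE g2p_farmer_registry "
        "(registrant_id TEXT, gender TEXT, land_holding REAL)"
    )
    conn.executemany(
        "INSERT INTO g2p_farmer_registry VALUES (?, ?, ?)",
        [("R1", "female", 1.5), ("R2", "male", 3.0),
         ("R3", "female", 0.8), ("R4", "female", 2.2)],
    )
    ids = ["R1", "R2", "R3", "R4"]
    where = "gender = \u2018female\u2019"  # filter pasted with curly quotes

    sql, params = build_search_query(
        ids, "g2p_farmer_registry", where, order_by="land_holding", page=1, page_size=2
    )
    print(conn.execute(sql, params).fetchall())

    count_sql, count_params = build_search_query(
        ids, "g2p_farmer_registry", where, count_only=True
    )
    print(conn.execute(count_sql, count_params).fetchone()[0])


if __name__ == "__main__":
    demo()
```

Sharing one builder between the page query and the count query keeps the two filters identical, which is the property the "mirrors main query" note in the table relies on.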
After pushing this custom adapter code to GitHub, you can proceed to create a custom Docker image for your setup. Simply follow the existing Docker creation guide for PBMS Background Tasks, updating the path for the extensions package.
This approach ensures your environment remains consistent with the PBMS deployment standards while allowing flexibility to integrate your custom logic and components seamlessly.