Ingestion Pipeline
End-to-end asynchronous ingestion flow -- from partner payload receipt to change request creation.
The ingestion pipeline is responsible for receiving data from external systems, classifying and transforming incoming payloads, and creating change requests in the registry. The pipeline is fully asynchronous and designed for high-throughput, horizontally scalable processing.
Requirements
The ingestion pipeline is designed to satisfy the following requirements:
Data can arrive from multiple sources -- staff portal, beneficiary portal, agency applications, and other registries.
Each source may send data in different schemas -- both standards-based and custom formats.
The pipeline supports multiple interoperability standards such as DCI and national registry formats.
Some standards follow a notification-then-search philosophy (e.g. DCI sends an event with an identifier; the receiver must invoke a search API on the source to retrieve the full record).
Processing must be asynchronous to decouple API response times from pipeline execution.
The pipeline must scale out horizontally by adding Celery workers.
High throughput -- the system must handle bulk ingestion without blocking.
Traceability -- every raw incoming message must be traceable all the way through to the resulting change log and register record.
Auditability -- raw payloads are persisted before any transformation occurs.
Error handling and retries -- every stage supports rollback and retry up to a configurable worker_max_attempts threshold.
Configuration models
The ingestion pipeline uses several configuration tables to control how incoming payloads are identified, classified, and transformed.
incoming_partners
partner_id -- Primary key
partner_mnemonic -- Short name for the partner system
keymanager_reference_id -- Reference to the partner's public key in the key manager
is_active -- Whether the partner is currently active
data_models
data_model_id -- Primary key
data_model_mnemonic -- Short name for the data model (e.g. dci_v1, custom_farmer)
pattern_for_data_model -- JSONPath or regex pattern used to identify this data model from a raw payload
is_active -- Whether the data model is currently active
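The lookup that pattern_for_data_model enables can be sketched as follows. This is a minimal illustration, assuming the pattern is a regex applied to the raw payload text; a JSONPath pattern would need a JSONPath library instead. The DataModel class, the identify_data_model function, and the two example patterns are hypothetical and not taken from the registry code.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DataModel:
    # Mirrors the columns of the data_models table (illustrative only).
    data_model_id: int
    data_model_mnemonic: str
    pattern_for_data_model: str
    is_active: bool = True


def identify_data_model(raw_payload: str, models: list[DataModel]) -> Optional[DataModel]:
    """Return the first active data model whose regex matches the raw payload."""
    for model in models:
        if model.is_active and re.search(model.pattern_for_data_model, raw_payload):
            return model
    return None


# Example patterns are invented for this sketch.
models = [
    DataModel(1, "dci_v1", r'"message_id"\s*:'),
    DataModel(2, "custom_farmer", r'"farmer_id"\s*:'),
]
match = identify_data_model('{"farmer_id": "F-001", "name": "Amina"}', models)
print(match.data_model_mnemonic if match else None)
```

Inactive models are skipped, so a data model can be retired by clearing is_active without deleting its row.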
incoming_model_signature_patterns
signature_pattern_id -- Primary key
data_model_id -- Foreign key to the data model
key_path_for_sender -- JSONPath to extract the sender identifier
key_path_for_signature -- JSONPath to extract the signature value
key_path_for_signature_payload -- JSONPath to extract the payload that was signed
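The three key paths could be applied roughly as in the sketch below. It rests on several assumptions: resolve_path handles only simple "$.a.b" paths and stands in for a real JSONPath engine, the verify callable is a placeholder for the key manager lookup and cryptographic check, and serializing the signed portion with sorted keys is an invented canonicalization, not the documented one.

```python
import json
from typing import Any, Callable


def resolve_path(document: Any, path: str) -> Any:
    """Resolve a simplified '$.a.b' path (a stand-in for a real JSONPath engine)."""
    node = document
    for key in path.removeprefix("$.").split("."):
        node = node[key]  # raises KeyError if the path is absent
    return node


def check_signature(raw_payload: str, pattern: dict,
                    verify: Callable[[str, str, str], bool]) -> bool:
    """Extract sender, signature, and signed payload, then delegate verification."""
    document = json.loads(raw_payload)
    sender = resolve_path(document, pattern["key_path_for_sender"])
    signature = resolve_path(document, pattern["key_path_for_signature"])
    signed_part = resolve_path(document, pattern["key_path_for_signature_payload"])
    # Assumed canonical form: JSON with sorted keys.
    return verify(sender, signature, json.dumps(signed_part, sort_keys=True))


# Hypothetical pattern row and payload layout.
pattern = {
    "key_path_for_sender": "$.header.sender_id",
    "key_path_for_signature": "$.signature",
    "key_path_for_signature_payload": "$.message",
}
raw = json.dumps({
    "header": {"sender_id": "agency_a"},
    "signature": "abc123",
    "message": {"id": "42"},
})

seen = []


def fake_verify(sender: str, signature: str, signed_text: str) -> bool:
    # Placeholder: a real verifier would fetch the partner's public key.
    seen.append((sender, signature, signed_text))
    return signature == "abc123"


print(check_signature(raw, pattern, fake_verify))
```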
incoming_model_semantic_patterns
semantic_pattern_id -- Primary key
data_model_id -- Foreign key to the data model
register_id -- Target register for payloads matching this pattern
operation_id -- Target section/operation
pattern_for_register -- Pattern to match the register from the payload
pattern_for_operation -- Pattern to match the operation from the payload
key_path_for_business_payload -- JSONPath to extract the business-relevant portion of the payload
incoming_templates
template_id -- Primary key
data_model_id -- Foreign key to the data model
register_id -- Target register
operation_id -- Target section/operation
template_file_id -- Reference to the Jinja template file in object storage
incoming_payload_enricher
enricher_id -- Primary key
data_model_id -- Foreign key to the data model
register_id -- Target register
operation_id -- Target section/operation
raw_payload_enricher_class -- Fully qualified class name of the enricher implementation
The enricher is used for standards that follow the notification-then-search philosophy. When a payload contains only a reference identifier, the enricher fetches the full record from the source system before classification proceeds.
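A minimal sketch of what an enricher might look like, and how a fully qualified class name can be resolved at runtime, is given below. The RawPayloadEnricher base class, its enrich method, the SearchByIdEnricher implementation, and the record_id key are all hypothetical; the actual interface expected by raw_payload_enricher_class is not described here. The search callable stands in for the source system's search API.

```python
import importlib
from abc import ABC, abstractmethod
from typing import Callable


def load_class(qualified_name: str) -> type:
    """Resolve 'package.module.ClassName' to the class object."""
    module_name, _, class_name = qualified_name.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


class RawPayloadEnricher(ABC):
    """Hypothetical base class an enricher implementation might extend."""

    @abstractmethod
    def enrich(self, payload: dict) -> dict:
        ...


class SearchByIdEnricher(RawPayloadEnricher):
    """Fetches the full record for a notification that carries only an identifier."""

    def __init__(self, search: Callable[[str], dict], id_key: str = "record_id"):
        self._search = search
        self._id_key = id_key

    def enrich(self, payload: dict) -> dict:
        record = self._search(payload[self._id_key])
        # Keep the original notification fields and attach the fetched record.
        return {**payload, "record": record}


def fake_search(record_id: str) -> dict:
    # Stand-in for an HTTP call to the source system's search API.
    return {"id": record_id, "name": "Amina"}


enricher = SearchByIdEnricher(fake_search)
enriched = enricher.enrich({"record_id": "R-7", "event": "updated"})
print(enriched)
```

Injecting the search function keeps the enricher testable without network access; a production implementation would also need timeouts and error handling for the remote call.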
Pipeline stages
API Ingestion
The partner system calls the /partner/ingest_data endpoint. The API layer performs the following synchronous operations:
Validates the request structure and partner credentials.
Matches the payload against incoming_model_signature_patterns to verify the cryptographic signature.
Stores the raw payload in MinIO without any transformation.
Returns a correlation_id to the caller for traceability.
A Celery task is then dispatched to begin asynchronous processing.
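The store-then-dispatch part of this flow can be sketched as below. This is an illustration under stated assumptions, not the endpoint's implementation: a plain dict stands in for MinIO, a list stands in for the task queue, the raw/ key prefix is invented, and request validation and signature verification are omitted. With Celery, the dispatch callable would typically be a task's delay method.

```python
import uuid
from typing import Callable


def ingest(raw_body: bytes, store: dict, dispatch: Callable[[str], None]) -> dict:
    """Persist the raw payload unchanged, dispatch async work, return a correlation_id."""
    correlation_id = str(uuid.uuid4())
    # Stored byte-for-byte before any transformation, for auditability.
    store[f"raw/{correlation_id}"] = raw_body
    # Hand off to the asynchronous pipeline; only the id travels on the queue.
    dispatch(correlation_id)
    return {"correlation_id": correlation_id}


object_store: dict = {}   # stand-in for object storage
queue: list = []          # stand-in for the task queue
response = ingest(b'{"farmer_id": "F-001"}', object_store, queue.append)
print(response["correlation_id"] in queue)
```

Passing only the correlation_id to the worker keeps queue messages small and makes the stored raw payload the single source of truth for later stages.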
Classification Worker
The classification worker picks up the raw payload and applies semantic pattern matching:
Matches the payload against configured incoming_model_semantic_patterns.
Determines the target register_id and operation_id (section).
Extracts the business payload using the configured JSONPath.
If an incoming_payload_enricher is configured for this pattern, the enricher is invoked to fetch additional data from the source system.
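The matching and extraction steps above might look like the following sketch. It assumes pattern_for_register and pattern_for_operation are regexes applied to the serialized payload, and it uses a simplified "$.a.b" resolver in place of a real JSONPath engine. The classify function, the example pattern row, and the payload layout are hypothetical; the enricher step is left out.

```python
import json
import re
from typing import Any, Optional


def resolve_path(document: Any, path: str) -> Any:
    """Resolve a simplified '$.a.b' path (a stand-in for a real JSONPath engine)."""
    node = document
    for key in path.removeprefix("$.").split("."):
        node = node[key]
    return node


def classify(payload: dict, patterns: list[dict]) -> Optional[dict]:
    """Return register, operation, and business payload for the first matching pattern."""
    text = json.dumps(payload)
    for p in patterns:
        if (re.search(p["pattern_for_register"], text)
                and re.search(p["pattern_for_operation"], text)):
            return {
                "register_id": p["register_id"],
                "operation_id": p["operation_id"],
                "business_payload": resolve_path(payload, p["key_path_for_business_payload"]),
            }
    return None  # unclassifiable payloads need separate handling


# Invented pattern row for illustration.
patterns = [{
    "register_id": 10,
    "operation_id": 3,
    "pattern_for_register": r'"register"\s*:\s*"farmer"',
    "pattern_for_operation": r'"action"\s*:\s*"update"',
    "key_path_for_business_payload": "$.data",
}]
payload = {"register": "farmer", "action": "update", "data": {"farmer_id": "F-001"}}
classified = classify(payload, patterns)
print(classified)
```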
Transformation Worker
The transformation worker converts the classified payload from the external schema to the internal registry schema:
Looks up the appropriate incoming_template for the resolved register and operation.
Applies the Jinja template to transform the external payload into the registry's internal change payload format.
Validates the transformed output against the target section's JSON form schema.
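The template step can be illustrated with a small Jinja example. This is a sketch only: the template text, the field names on both sides, and the transform function are invented, and in the pipeline the template would be loaded from object storage via template_file_id rather than defined inline. Schema validation is not shown; it would follow the json.loads call.

```python
import json

from jinja2 import Environment, StrictUndefined

# Hypothetical template mapping an external farmer payload to an internal shape.
TEMPLATE = """
{
  "given_name": {{ payload.first_name | tojson }},
  "family_name": {{ payload.last_name | tojson }},
  "land_area_ha": {{ payload.farm.area_ha | tojson }}
}
"""


def transform(business_payload: dict, template_source: str) -> dict:
    """Render the template with the payload and parse the result as JSON."""
    # StrictUndefined makes a missing source field fail loudly instead of
    # silently rendering an empty value.
    env = Environment(undefined=StrictUndefined)
    rendered = env.from_string(template_source).render(payload=business_payload)
    return json.loads(rendered)


result = transform(
    {"first_name": "Amina", "last_name": "Okoro", "farm": {"area_ha": 2.5}},
    TEMPLATE,
)
print(result)
```

Using the tojson filter for every value lets the template emit correctly quoted and escaped JSON regardless of what characters the source data contains.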
Ingestion Worker
The ingestion worker creates the actual registry change request:
Calls G2PRegisterService.create_change_log() with the transformed payload.
The change request enters the standard verification and approval workflow.
If auto-approval is configured for this section and source channel, the change request is approved immediately.
Every stage has built-in rollback and retry logic. If a stage fails, it is retried up to the configured worker_max_attempts before being moved to a dead-letter queue for manual inspection.
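The retry and dead-letter behaviour described above can be sketched in plain Python. The run_stage function, the rollback callable, and the list used as a dead-letter queue are hypothetical stand-ins; in a Celery deployment the task framework's own retry support would normally carry this logic.

```python
from typing import Any, Callable, Optional


def run_stage(stage: Callable[[Any], Any], message: Any, worker_max_attempts: int,
              rollback: Callable[[Any], None], dead_letter: list) -> Optional[Any]:
    """Run a stage, rolling back and retrying on failure, then dead-letter the message."""
    last_error: Optional[Exception] = None
    for _attempt in range(1, worker_max_attempts + 1):
        try:
            return stage(message)
        except Exception as exc:
            last_error = exc
            rollback(message)  # undo partial work before the next attempt
    # All attempts exhausted: park the message for manual inspection.
    dead_letter.append({
        "message": message,
        "error": repr(last_error),
        "attempts": worker_max_attempts,
    })
    return None


calls = {"n": 0}


def flaky(message: str) -> str:
    # Fails twice, then succeeds, to exercise the retry path.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return f"processed {message}"


def always_fail(message: str) -> str:
    raise ValueError("bad payload")


dead_letters: list = []
rolled_back: list = []
ok = run_stage(flaky, "msg-1", 3, rolled_back.append, dead_letters)
failed = run_stage(always_fail, "msg-2", 2, rolled_back.append, dead_letters)
print(ok, failed, len(dead_letters))
```

The sketch retries immediately; a real worker would usually add a delay or backoff between attempts so that a struggling downstream system is not hit again at once.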