Ingestion Pipeline
Operational capabilities exposed by the ingestion pipeline - APIs, configuration management, observability, and user-facing behavior.
Partner data ingestion
External partner systems submit data through the partner API.
Endpoint: POST /partner/ingest_data
Query parameters:
Parameter | Required | Description
data_model | No | Data model mnemonic. If omitted, all active data models are scanned against pattern_for_data_model.
register_id | No | Target register. When supplied together with intake_form_id, skips the classification stage.
intake_form_id | No | Target intake form. When supplied together with register_id, skips the classification stage.
Request body: Arbitrary JSON envelope defined by the partner data model. Structure is not enforced at the API layer - validation is driven by configured key paths and patterns.
Response: A correlation_id rendered through a partner-specific Jinja response template stored in MinIO (data_models.response_template_file_id). All records created from a single API call (including list fan-out) share the same correlation_id.
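As a rough illustration of how a caller might assemble the request, the sketch below builds the endpoint URL with the optional query parameters. Only the path and the parameter names come from the description above; the base URL, the helper name build_ingest_url, and the string-typed identifiers are assumptions made for the example.

```python
from typing import Optional
from urllib.parse import urlencode


def build_ingest_url(
    base_url: str,
    data_model: Optional[str] = None,
    register_id: Optional[str] = None,
    intake_form_id: Optional[str] = None,
) -> str:
    """Build the partner ingest URL, leaving out any parameter that is not set.

    Hypothetical helper: identifier types are assumed to be strings.
    """
    params = {
        "data_model": data_model,
        "register_id": register_id,
        "intake_form_id": intake_form_id,
    }
    # All three parameters are optional, so send only the ones supplied.
    query = urlencode({k: v for k, v in params.items() if v is not None})
    url = f"{base_url.rstrip('/')}/partner/ingest_data"
    return f"{url}?{query}" if query else url
```

Supplying register_id and intake_form_id together produces a request that, per the parameter descriptions, skips the classification stage.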
Staff data ingestion
Staff users can submit data through the staff portal using the same core ingest service.
Endpoint: POST /input-mechanism-data/ingest-data
Permission: intakeSubmission:edit
Behaviour: Same ingestion pipeline as the partner API, but returns a plain JSON response (no MinIO response template rendering).
Classification bypass
When both register_id and intake_form_id are provided on ingest, the classification worker is skipped:
incoming_raw_data.classification_status is set to PROCESSED immediately.
incoming_classified_data is inserted with transformation_status = PENDING.
The transformation beat producer picks up the row on its next poll.
Useful when the caller already knows the target register and intake form (e.g. staff-initiated or pre-classified submissions).
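The bypass rule can be sketched as a small decision function. This is a minimal illustration, not the pipeline's actual code: the InitialStatus type and the function name are hypothetical, and the non-bypass starting value of PENDING for classification_status is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class InitialStatus:
    classification_status: str
    # None means no incoming_classified_data row is created at ingest time.
    transformation_status: Optional[str]


def initial_status(
    register_id: Optional[str], intake_form_id: Optional[str]
) -> InitialStatus:
    """Decide the starting statuses for a newly ingested record."""
    if register_id is not None and intake_form_id is not None:
        # Both targets are known: skip classification, queue for transformation.
        return InitialStatus("PROCESSED", "PENDING")
    # Assumption: otherwise the row waits for the classification worker.
    return InitialStatus("PENDING", None)
```

Note that supplying only one of the two identifiers does not trigger the bypass.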
ADD and UPDATE pipeline actions
Classification resolves whether the payload creates a new record or updates an existing one.
Action | Condition | Result
ADD | External record identifier not found in registry | Intake form submission created and finalized (draft_status = FINAL)
UPDATE | External record identifier matches an existing registry record | Change request created via G2PChangeRequestWorkerService.create_change_request()
The ingestion beat producer routes to ingest_data_worker (ADD) or change_request_ingest_worker (UPDATE) based on pipeline_action on the classified row.
UPDATE change requests enter the standard verification and approval workflow. Auto-approval applies when configured for the target section and source channel.
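The resolution and routing described above can be sketched as follows. The worker names and action values are taken from the text; the function names and the in-memory set standing in for the registry lookup are assumptions for illustration.

```python
from typing import Set

# Worker names as given in the text, keyed by pipeline_action.
WORKER_BY_ACTION = {
    "ADD": "ingest_data_worker",
    "UPDATE": "change_request_ingest_worker",
}


def resolve_pipeline_action(external_id: str, known_ids: Set[str]) -> str:
    """Return UPDATE when the external identifier already exists, else ADD.

    known_ids is a stand-in for the registry lookup.
    """
    return "UPDATE" if external_id in known_ids else "ADD"


def route(pipeline_action: str) -> str:
    """Pick the worker for a classified row based on its pipeline_action."""
    try:
        return WORKER_BY_ACTION[pipeline_action]
    except KeyError:
        raise ValueError(f"Unknown pipeline_action: {pipeline_action!r}") from None
```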
Bulk and list ingestion
When incoming_model_key_paths.is_list = true, a single API call containing a list of records is fanned out into multiple ingest rows:
Each list element gets its own ingest_id.
All records share a single correlation_id for end-to-end traceability.
Each record progresses independently through classification, transformation, and ingestion.
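A minimal sketch of the fan-out, assuming UUID strings for both identifiers and a payload that is already the list located by the key path. The function name and row layout are hypothetical.

```python
import uuid
from typing import Any, Dict, List


def fan_out(payload: Any, is_list: bool) -> List[Dict[str, Any]]:
    """Split one API call into ingest rows that share a correlation_id."""
    # One correlation_id per API call, regardless of how many rows result.
    correlation_id = str(uuid.uuid4())
    elements = payload if (is_list and isinstance(payload, list)) else [payload]
    return [
        {
            # Each element gets its own ingest_id (UUIDs are an assumption).
            "ingest_id": str(uuid.uuid4()),
            "correlation_id": correlation_id,
            "payload": element,
        }
        for element in elements
    ]
```

Because every row carries its own ingest_id, a failure in one element does not block the others.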
Payload enrichment
For standards that follow notification-then-search (e.g. DCI), the incoming payload may contain only a reference identifier. An enricher implementation resolves the full business record before transformation.
Configured via raw_payload_enricher_class on incoming_model_semantic_patterns.
Instantiated by G2PPayloadEnricherFactory from extension packages (e.g. openg2p_registry_nsr_extension).
Runs in the transformation worker, after business payload extraction and before Jinja template rendering.
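The ordering of the three steps can be illustrated with a simplified sketch. Everything here is hypothetical: the ReferenceLookupEnricher class, the "message" and "reference_id" keys, and the render callable (which stands in for Jinja rendering) are not the actual interfaces of G2PPayloadEnricherFactory or the transformation worker.

```python
from typing import Any, Callable, Dict, Optional

Payload = Dict[str, Any]


class ReferenceLookupEnricher:
    """Hypothetical enricher: swaps a reference identifier for the full record."""

    def __init__(self, records: Dict[str, Payload]) -> None:
        # Stand-in for the search call a real enricher would make.
        self._records = records

    def enrich(self, business_payload: Payload) -> Payload:
        return self._records[business_payload["reference_id"]]


def transform(
    raw: Payload,
    enricher: Optional[ReferenceLookupEnricher],
    render: Callable[[Payload], str],
) -> str:
    # 1. Business payload extraction (the "message" key is an assumption).
    business_payload = raw["message"]
    # 2. Enrichment, only when an enricher is configured.
    if enricher is not None:
        business_payload = enricher.enrich(business_payload)
    # 3. Template rendering (Jinja in the pipeline; a plain callable here).
    return render(business_payload)
```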
Traceability and observability
Staff can search and inspect ingestion records at every pipeline stage.
Prefix: /ingestion-data
Endpoint | Permission | Description
POST /get_ingestion_summary_data | - | Summary statistics across ingestion records
POST /search_in_ingestion_data | incomingMessage:view | Search and paginate ingestion records
POST /get_raw_payload | incomingMessage:view | Retrieve the original raw payload by ingest_id
POST /get_enriched_and_transformed_payload | incomingMessage:view | Retrieve enriched and transformed payload by ingest_id
Traceability chain: correlation_id → ingest_id → classified data → enriched/transformed data → intake_form_submission_id (ADD) or change_request_id (UPDATE).
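To make the chain concrete, the sketch below walks from a correlation_id to the terminal reference of each record. The row layout (a flat dict per ingest record) and the function name are assumptions; the field names follow the chain above.

```python
from typing import Any, Dict, List


def trace(correlation_id: str, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return one trace entry per ingest_id belonging to the correlation_id."""
    result = []
    for row in rows:
        if row["correlation_id"] != correlation_id:
            continue
        # ADD rows end in an intake form submission, UPDATE rows in a change request.
        terminal_field = (
            "intake_form_submission_id"
            if row["pipeline_action"] == "ADD"
            else "change_request_id"
        )
        result.append(
            {
                "ingest_id": row["ingest_id"],
                "terminal_field": terminal_field,
                # None when the record has not reached the final stage yet.
                "terminal_id": row.get(terminal_field),
            }
        )
    return result
```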
Ingestion configuration management
Staff configure how payloads are identified, classified, enriched, and transformed through the ingestion config API.
Prefix: /ingestion-config
Key paths
Endpoint | Permission | Description
POST /create_incoming_key_path | ingestKeyPath:create | Create signature/message key paths for a data model
POST /get_incoming_key_path | ingestKeyPath:view | Get key path by ID
POST /get_all_incoming_key_paths | ingestKeyPath:view | List all key paths
POST /update_incoming_key_path | ingestKeyPath:edit | Update key path
POST /delete_incoming_key_path | ingestKeyPath:delete | Delete key path
Semantic patterns
Endpoint | Permission | Description
POST /create_semantic_pattern | ingestExpression:create | Create semantic pattern
POST /get_semantic_pattern | ingestExpression:view | Get semantic pattern by ID
POST /get_all_semantic_patterns | ingestExpression:view | List all semantic patterns
POST /update_semantic_pattern | ingestExpression:edit | Update semantic pattern
POST /delete_semantic_pattern | ingestExpression:delete | Delete semantic pattern
Register semantic patterns
Endpoint | Permission | Description
POST /create_register_semantic_pattern | ingestExpression:create | Create register-level pattern
POST /get_register_semantic_pattern | ingestExpression:view | Get register pattern by ID
POST /get_all_register_semantic_patterns | ingestExpression:view | List all register patterns
POST /update_register_semantic_pattern | ingestExpression:edit | Update register pattern
POST /delete_register_semantic_pattern | ingestExpression:delete | Delete register pattern
Templates
Endpoint | Permission | Description
POST /create_template | ingestTemplate:create | Create Jinja transformation template reference
POST /get_template | ingestTemplate:view | Get template by ID
POST /get_all_templates | ingestTemplate:view | List all templates
POST /update_template | ingestTemplate:edit | Update template
POST /delete_template | ingestTemplate:delete | Delete template
Subscription activity logs
Endpoint | Permission | Description
POST /create_subscription_activity_log | ingestSubscription:create | Log WebSub subscription activity
POST /get_subscription_activity_logs_by_partner | ingestSubscription:view | Get logs for a partner
POST /get_all_subscription_activity_logs | ingestSubscription:view | List all subscription logs
Partner response templating
Partner API success and error responses are not returned as fixed JSON schemas. Instead, each data model can define a response_template_file_id pointing to a Jinja template in MinIO. The template receives the correlation_id (and error details on failure) and renders the partner-specific response envelope.
This allows each interoperability standard (DCI, custom formats) to return responses in its own schema without changing the core API.
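The idea of per-data-model response envelopes can be sketched with Python's standard-library string.Template, used here purely as a dependency-free stand-in for Jinja. The two templates, their field names, and the render_response helper are hypothetical; in the pipeline the templates are Jinja files in MinIO referenced by response_template_file_id.

```python
from string import Template
from typing import Optional

# Hypothetical envelopes for one data model (success and failure).
ACK_TEMPLATE = Template('{"ack": "ACK", "correlation_id": "$correlation_id"}')
ERR_TEMPLATE = Template(
    '{"ack": "ERR", "correlation_id": "$correlation_id", "error": "$error"}'
)


def render_response(
    template: Template, correlation_id: str, error: Optional[str] = None
) -> str:
    """Render the partner-facing envelope from the data model's template."""
    # The template receives the correlation_id, plus error details on failure.
    # Values are substituted verbatim; this sketch does no JSON escaping.
    return template.substitute(correlation_id=correlation_id, error=error or "")
```

Swapping the template changes the response schema without touching the calling code, which is the property the templating approach is after.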
Error handling and failed records
When a pipeline stage fails:
The worker rolls back the current transaction and records the error.
The row status is reset to PENDING for retry, up to worker_max_attempts (default: 5).
After exhausting retries, the row is marked FAILED with the error message in *_latest_error_code.
Failed records can be inspected via /ingestion-data/search_in_ingestion_data and retried manually by resetting the relevant status field to PENDING.
There is no separate broker dead-letter queue.
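The retry behaviour can be sketched as a small state update. This is an illustration under stated assumptions: the Row type and its attempts counter are hypothetical, worker_max_attempts is read as the total number of attempts allowed, and the transaction rollback is not modelled.

```python
from dataclasses import dataclass
from typing import Optional

WORKER_MAX_ATTEMPTS = 5  # default stated in the text


@dataclass
class Row:
    status: str = "PENDING"
    attempts: int = 0  # hypothetical counter; the real column name is not given
    latest_error_code: Optional[str] = None


def record_failure(
    row: Row, error: str, max_attempts: int = WORKER_MAX_ATTEMPTS
) -> Row:
    """Update a row after a failed attempt at one pipeline stage."""
    row.attempts += 1
    row.latest_error_code = error
    # Stay PENDING while attempts remain; mark FAILED once they are used up.
    row.status = "PENDING" if row.attempts < max_attempts else "FAILED"
    return row
```

Since rows are retried by status polling rather than by a broker, a FAILED row simply stays in the table until someone resets its status.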
Configuration:
Setting | Default | Environment variable prefix
worker_max_attempts | 5 | REGISTRY_CELERY_WORKERS_
Beat polling frequency | configurable | REGISTRY_CELERY_BEAT_
Worker queue | registry_worker_queue | REGISTRY_CELERY_BEAT_ / REGISTRY_CELERY_WORKERS_
Permissions summary
Permission | Used for
intakeSubmission:edit | Staff data ingestion
incomingMessage:view | Search and inspect ingestion records
ingestKeyPath:* | Key path configuration
ingestExpression:* | Semantic and register semantic pattern configuration
ingestTemplate:* | Transformation template configuration
ingestSubscription:* | WebSub subscription activity logs