# Ingestion pipeline

This document outlines the end-to-end ingestion flow from the partner API through to registry change request creation. The pipeline is fully asynchronous and decoupled into discrete Celery stages.
## API Ingestion (`/partner/ingest_data`)

- **Module:** `openg2p-registry-partner-api` / `openg2p-registry-core`
- **Validation:** Reconciles the incoming request against pre-configured key paths (`incoming_model_key_paths`) to extract the sender, message ID, and signature.
- **Processing:** Identifies the corresponding `DataModel` (either via query param or regex pattern matching on the payload) and verifies the payload signature via the key manager. If the payload is a list, it is split into individual elements.
- **Storage:** Persists the raw payload into the `incoming_raw_data` and `incoming_raw_data_payloads` tables with a `PENDING` classification status.
- **Response:** Immediately returns a `correlation_id` to the partner system for tracking.
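The key-path extraction step above can be sketched as follows. This is a minimal stand-alone illustration, not the actual implementation: the function name `extract_by_key_path` and the example `key_paths` config are hypothetical, mirroring what `incoming_model_key_paths` configures.

```python
from functools import reduce

def extract_by_key_path(payload: dict, key_path: str, default=None):
    """Walk a dotted key path (e.g. 'header.sender_id') into a nested payload."""
    try:
        return reduce(lambda node, key: node[key], key_path.split("."), payload)
    except (KeyError, TypeError):
        return default

# Hypothetical key-path configuration, in the spirit of incoming_model_key_paths.
key_paths = {
    "sender": "header.sender_id",
    "message_id": "header.message_id",
    "signature": "header.signature",
}

payload = {
    "header": {"sender_id": "partner-01", "message_id": "msg-42", "signature": "sig=="},
    "message": {"records": []},
}

extracted = {name: extract_by_key_path(payload, path) for name, path in key_paths.items()}
# extracted == {"sender": "partner-01", "message_id": "msg-42", "signature": "sig=="}
```

Dotted-path extraction keeps the endpoint schema-agnostic: onboarding a new partner means adding key paths to configuration, not code.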
## Classification Worker

- **Module:** `openg2p-registry-celery-workers` (`ingest_data_classification_worker.py`)
- **Trigger:** `ingest_data_classification_beat_producer` continuously polls for records in `incoming_raw_data` with a `PENDING` status.
- **Classification:** Matches the payload against `incoming_model_semantic_patterns` to determine the corresponding registry definition (`register_id`) and section (`section_id`).
- **Storage:** Successfully mapped records are stored in `incoming_classified_data`, and the original raw data row is marked as `PROCESSED`.
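The pattern-matching step can be illustrated with a small sketch. The pattern entries and registry names below are hypothetical examples of what `incoming_model_semantic_patterns` might contain; the real configuration lives in the database.

```python
import re

# Hypothetical semantic-pattern config: each entry maps a regex over the
# serialized payload to a (register_id, section_id) pair.
semantic_patterns = [
    (re.compile(r'"record_type"\s*:\s*"farmer"'), ("farmer_registry", "household")),
    (re.compile(r'"record_type"\s*:\s*"beneficiary"'), ("social_registry", "individual")),
]

def classify(raw_payload: str):
    """Return the first (register_id, section_id) whose pattern matches, else None."""
    for pattern, target in semantic_patterns:
        if pattern.search(raw_payload):
            return target
    return None  # unmatched rows stay PENDING for review

result = classify('{"record_type": "farmer", "village": "X"}')
# result == ("farmer_registry", "household")
```

First-match-wins ordering means more specific patterns should be listed before broader ones.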
## Transformation Worker

- **Module:** `openg2p-registry-celery-workers` (`ingest_data_transformation_worker.py`)
- **Trigger:** `ingest_data_transformation_beat_producer` picks up records from `incoming_classified_data` pending transformation.
- **Enrichment:** Extracts the specific business payload using configured key paths and invokes a dynamic enricher class (`G2PPayloadEnricherInterface`) to populate the payload with additional context (e.g. cross-referencing parent properties).
- **Transformation:** Uses pre-configured Jinja templates hosted in MinIO (`incoming_templates`) to transform the payload from the external partner's schema into the internal OpenG2P schema.
- **Storage:** Stores both the enriched and transformed states in `incoming_enriched_transformed_data` and updates pipeline statuses.
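The transformation step amounts to rendering a template over the enriched payload. The sketch below uses the standard library's `string.Template` purely as a stand-in for the Jinja templates fetched from the `incoming_templates` MinIO bucket; the field names are invented for illustration.

```python
import json
from string import Template

# Stand-in for a Jinja template stored in the incoming_templates bucket.
# The real pipeline uses Jinja syntax; string.Template keeps this sketch stdlib-only.
partner_to_openg2p = Template(json.dumps({
    "full_name": "$first_name $last_name",
    "external_ref": "$partner_record_id",
}))

def transform(enriched: dict) -> dict:
    """Render the partner-schema payload into the internal OpenG2P-style schema."""
    return json.loads(partner_to_openg2p.substitute(enriched))

transformed = transform(
    {"first_name": "Asha", "last_name": "Devi", "partner_record_id": "P-100"}
)
# transformed == {"full_name": "Asha Devi", "external_ref": "P-100"}
```

Keeping the mapping in a hosted template rather than in code lets each partner's schema be adjusted without redeploying the worker.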
## Ingestion Worker

- **Module:** `openg2p-registry-celery-workers` (`ingest_data_worker.py`)
- **Trigger:** `ingest_data_beat_producer` picks up rows with a `PENDING` ingestion status.
- **Change Request Creation:** Uses the previously mapped register IDs, section IDs, and fully transformed payload to create a `ChangeRequestRequestPayload`.
- **Execution:** Invokes the core method `G2PRegisterService.create_change_request()`.
- **Completion:** Adds the new data to the registry change request queue and stamps the generated `change_request_id` onto the `incoming_classified_data` record.
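Assembling the change request from a classified row can be sketched roughly as below. The dataclass is a simplified approximation of `ChangeRequestRequestPayload` (the real class lives in `openg2p-registry-core`), and the row fields are hypothetical.

```python
from dataclasses import dataclass

# Simplified approximation of ChangeRequestRequestPayload, for illustration only.
@dataclass
class ChangeRequestPayload:
    register_id: str
    section_id: str
    data: dict
    status: str = "PENDING"

def build_change_request(classified_row: dict) -> ChangeRequestPayload:
    """Assemble a change request from a fully transformed classified record."""
    return ChangeRequestPayload(
        register_id=classified_row["register_id"],
        section_id=classified_row["section_id"],
        data=classified_row["transformed_payload"],
    )

row = {
    "register_id": "farmer_registry",
    "section_id": "household",
    "transformed_payload": {"full_name": "Asha Devi"},
}
cr = build_change_request(row)
```

The actual worker would then hand this payload to `G2PRegisterService.create_change_request()` and write the returned `change_request_id` back onto the classified row.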
## Error Handling

Every stage follows a rollback-and-retry mechanism, up to a configured `worker_max_attempts`. Once the failure threshold is reached, the record fails cleanly and its specific exception code is written back onto the database record (`*_latest_error_code`), ensuring healthy records are not blocked.