How the generic pipeline works
Every Singer tap — regardless of source — flows through the same transformation pipeline. No per-source code is required.
1. meltano run tap-{name} target-postgres
↓ writes to etl_staging_{source}.{stream}
2. POST /v1/etl/meltano/webhook
↓ verifies HMAC-SHA256 signature
3. _transform_and_load()
↓ reads etl.field_mappings for this connector
↓ for each staged row: applies transform_fn per field
↓ batches 500 rows → INSERT INTO {namespace}.{table} (Spark SQL)
4. Singer STATE persisted to etl.meltano_connectors.tap_state
↓ used as --state on the next incremental run
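Step 3 above can be sketched in a few lines of Python. This is a minimal illustration, not the actual `_transform_and_load()` implementation: the helper names `apply_mappings` and `batched` are hypothetical, staged rows are assumed to be plain dicts, and the final INSERT is left out. Only the mapping field names, the `eval` call shape, and the batch size of 500 come from this page.

```python
from itertools import islice
from typing import Any, Iterable, Iterator

BATCH_SIZE = 500  # batch size stated in the pipeline description


def apply_mappings(row: dict[str, Any], mappings: list[dict[str, Any]]) -> dict[str, Any]:
    """Map one staged row to target columns (hypothetical helper)."""
    out: dict[str, Any] = {}
    for m in mappings:
        value = row.get(m["source_field"])
        fn = m.get("transform_fn")
        if fn is not None:
            # Same call shape this page documents for transform_fn.
            value = eval(fn, {}, {"v": value})
        out[m["target_column"]] = value
    return out


def batched(rows: Iterable[dict[str, Any]], size: int = BATCH_SIZE) -> Iterator[list[dict[str, Any]]]:
    """Yield lists of at most `size` rows (hypothetical helper)."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


mappings = [
    {"source_field": "Id", "target_column": "external_id", "transform_fn": None},
    {"source_field": "Amount", "target_column": "deal_value_usd",
     "transform_fn": "float(v)/100 if v else 0.0"},
]
staged = [{"Id": str(i), "Amount": 1000} for i in range(1200)]
batches = list(batched(apply_mappings(r, mappings) for r in staged))
print([len(b) for b in batches])  # [500, 500, 200]
```

Because the rows are passed as a generator, only one batch is held in memory at a time.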
LLM field mapping
POST /v1/admin/connectors/{id}/map-fields
Calls GPT-4o-mini to propose source_field → target_column mappings for a Singer stream. The model understands Salesforce objects, HubSpot deals, Bloomberg tickers, and the canonical IB platform schema conventions (dm, md, ci, pm).
Request body
| Field | Required | Description |
|---|---|---|
| source_stream | Yes | Singer stream name (e.g. Opportunity, contacts) |
| source_schema | Yes | {"field_name": "field_type"} from meltano invoke tap-{name} --discover |
| target_schema | Yes | Target schema: dm, md, ci, pm, or custom |
| target_table | Yes | Target Iceberg table name |
| target_schema_def | No | {"col_name": "col_type"} override; omit to use the canonical schema |
curl -X POST https://api.ivory.finance/v1/admin/connectors/a1b2c3d4-.../map-fields \
-H "Authorization: Bearer $IVORY_JWT" \
-H "Content-Type: application/json" \
-d '{
"source_stream": "Opportunity",
"source_schema": {
"Id": "string",
"Name": "string",
"Amount": "number",
"CloseDate": "string",
"StageName": "string",
"OwnerId": "string",
"CreatedDate": "string"
},
"target_schema": "dm",
"target_table": "deals"
}'
Response 201 Created
{
"mapped": 6,
"review_required": 1,
"message": "1 mapping has confidence < 0.7 and should be reviewed.",
"mappings": [
{
"id": "map-uuid-...",
"source_stream": "Opportunity",
"source_field": "Id",
"target_schema": "dm",
"target_table": "deals",
"target_column": "external_id",
"transform_fn": null,
"llm_mapped": true,
"confidence_score": 1.0,
"reviewed": false
},
{
"source_field": "Amount",
"target_column": "deal_value_usd",
"transform_fn": "float(v)/100 if v else 0.0",
"confidence_score": 0.9
},
{
"source_field": "CloseDate",
"target_column": "close_date",
"transform_fn": "str(v)[:10] if v else None",
"confidence_score": 1.0
},
{
"source_field": "StageName",
"target_column": "stage",
"transform_fn": "v.strip().upper() if v else None",
"confidence_score": 0.9
},
{
"source_field": "OwnerId",
"target_column": "owner_crm_id",
"transform_fn": null,
"confidence_score": 0.65,
"reviewed": false
}
]
}
Confidence scores:
| Score | Meaning |
|---|---|
| 1.0 | Exact name match |
| 0.9 | Semantic match (e.g. Amount → deal_value_usd) |
| 0.7 | Inferred mapping |
| < 0.7 | Uncertain; review before enabling |
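As a rough client-side sketch, the mappings that fall below the review threshold can be picked out of a map-fields response like this. The function name `needs_review` is hypothetical. The 0.7 threshold and the `confidence_score` field come from the table and response above.

```python
REVIEW_THRESHOLD = 0.7  # scores strictly below this are "Uncertain"


def needs_review(mappings: list[dict]) -> list[dict]:
    """Return the mappings whose confidence is below the threshold."""
    return [m for m in mappings if m["confidence_score"] < REVIEW_THRESHOLD]


# Abbreviated entries from the example response above.
response_mappings = [
    {"source_field": "Id", "target_column": "external_id", "confidence_score": 1.0},
    {"source_field": "Amount", "target_column": "deal_value_usd", "confidence_score": 0.9},
    {"source_field": "OwnerId", "target_column": "owner_crm_id", "confidence_score": 0.65},
]
flagged = needs_review(response_mappings)
print([m["source_field"] for m in flagged])  # ['OwnerId']
```

The comparison is strict, so a mapping scored exactly 0.7 (an inferred mapping) is not flagged.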
List field mappings
GET /v1/admin/connectors/{id}/mappings
Returns all mappings for the connector, ordered by source_stream, then source_field.
Response 200 OK
{
"mappings": [
{
"id": "map-uuid-...",
"source_stream": "Opportunity",
"source_field": "Amount",
"target_schema": "dm",
"target_table": "deals",
"target_column": "deal_value_usd",
"transform_fn": "float(v)/100 if v else 0.0",
"llm_mapped": true,
"confidence_score": 0.9,
"reviewed": true,
"created_at": "2026-03-28T10:00:00Z"
}
],
"count": 48
}
Update a field mapping
PUT /v1/admin/connectors/{id}/mappings/{mapping_id}
Approve a mapping (reviewed: true), correct the target column, or edit the transform expression.
Request body (all fields optional)
| Field | Type | Description |
|---|---|---|
| reviewed | bool | Mark mapping as reviewed/approved |
| target_column | string | Correct the LLM’s column suggestion |
| transform_fn | string | Python expression evaluated as eval(fn, {}, {"v": field_value}) |
Built-in transform patterns:
| Pattern | Expression |
|---|---|
| ISO-8601 date | str(v)[:10] if v else None |
| Cents → USD | float(v)/100 if v else 0.0 |
| Strip + upper-case | v.strip().upper() if v else None |
| Boolean coerce | bool(v) |
| Null-safe string | v.strip() if v else None |
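Before submitting a new `transform_fn`, it can help to dry-run the expression locally against a few sample values. The sketch below uses the same `eval(fn, {}, {"v": field_value})` call shape documented above. The helper `dry_run_transform` is hypothetical and not part of the API.

```python
from typing import Any


def dry_run_transform(fn: str, samples: list[Any]) -> list[tuple[Any, Any, Any]]:
    """Evaluate `fn` on each sample; return (input, result, error name) tuples."""
    results = []
    for v in samples:
        try:
            results.append((v, eval(fn, {}, {"v": v}), None))
        except Exception as exc:  # report the failure instead of aborting the run
            results.append((v, None, type(exc).__name__))
    return results


print(dry_run_transform("float(v)/100 if v else 0.0", [12345, None, "abc"]))
# [(12345, 123.45, None), (None, 0.0, None), ('abc', None, 'ValueError')]

print(dry_run_transform("bool(v)", ["false", "", 0]))
# [('false', True, None), ('', False, None), (0, False, None)]
```

Two things are worth noticing. `bool(v)` treats any non-empty string as true, including the string "false". And passing an empty globals dict to `eval` does not remove Python's built-ins, which is why `float`, `str`, and `round` are available inside expressions.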
# Approve a mapping
curl -X PUT https://api.ivory.finance/v1/admin/connectors/a1b2c3d4-.../mappings/map-uuid-... \
-H "Authorization: Bearer $IVORY_JWT" \
-H "Content-Type: application/json" \
-d '{"reviewed": true}'
# Fix target column + transform
curl -X PUT https://api.ivory.finance/v1/admin/connectors/a1b2c3d4-.../mappings/map-uuid-... \
-H "Authorization: Bearer $IVORY_JWT" \
-H "Content-Type: application/json" \
-d '{
"target_column": "deal_value_usd",
"transform_fn": "round(float(v) / 100, 2) if v else 0.0"
}'
Response 200 OK
Returns the updated mapping record.
Webhook callback
POST /v1/etl/meltano/webhook
Called by meltano_runner_worker when a tap subprocess completes. Not intended for direct client use — the runner handles this automatically.
Authentication
The runner sets the X-Webhook-Secret header to an HMAC-SHA256 value derived from the connector's webhook secret and the connector ID:
X-Webhook-Secret: HMAC-SHA256(connector.webhook_secret, connector_id)
Requests without a valid signature are rejected with 403 Forbidden.
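The signature check can be sketched with Python's standard `hmac` module. Several details here are assumptions, since this page does not specify them: the webhook secret is used as the HMAC key and the connector ID as the message, both are UTF-8 encoded, and the digest is sent as lowercase hex. The function names are hypothetical.

```python
import hashlib
import hmac


def sign_webhook(webhook_secret: str, connector_id: str) -> str:
    """Compute the header value (assumed: key=secret, msg=connector_id, hex digest)."""
    return hmac.new(
        webhook_secret.encode("utf-8"),
        connector_id.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()


def verify_webhook(header_value: str, webhook_secret: str, connector_id: str) -> bool:
    """Return True if the header matches; a False result maps to 403 Forbidden."""
    expected = sign_webhook(webhook_secret, connector_id)
    # Constant-time comparison; bytes are used so non-ASCII input cannot raise.
    return hmac.compare_digest(expected.encode("utf-8"), header_value.encode("utf-8"))


signature = sign_webhook("example-secret", "example-connector-id")
print(verify_webhook(signature, "example-secret", "example-connector-id"))  # True
print(verify_webhook("tampered", "example-secret", "example-connector-id"))  # False
```

`hmac.compare_digest` is used instead of `==` so that the comparison time does not leak how many leading characters matched.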
Request body
| Field | Type | Description |
|---|---|---|
| connector_id | string | UUID of the connector |
| tenant_id | string | Tenant UUID |
| source_system | string | Tap identity |
| status | string | One of completed, failed, or partial |
| rows_written | int | Rows written by the Singer tap |
| tap_state | object | Singer STATE message for next incremental run |
| error_msg | string | Error detail on failure |
| duration_s | int | Runner subprocess wall-clock seconds |
| run_id | string | etl.runner_jobs.id (optional) |
Behaviour on status = "completed"
- Runs _transform_and_load() → staging → IOMETE Iceberg
- Persists tap_state for the next incremental run
- Advances next_run_at from sync_schedule
- Resets consecutive_failures to 0
- Inserts a row in etl.sync_runs with rows_synced + rows_loaded
Behaviour on status = "failed"
- Increments consecutive_failures
- If consecutive_failures >= 3: sets status = 'error' (circuit breaker)
- Still inserts an etl.sync_runs row with error details
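The failure counter and circuit breaker described above can be modelled as a small state update. This is an illustrative sketch only: `ConnectorState`, `record_run`, and the "active" status value are hypothetical, and the handling of `partial` runs is not described on this page, so the sketch leaves the state unchanged for them.

```python
from dataclasses import dataclass

FAILURE_THRESHOLD = 3  # from the rule: consecutive_failures >= 3


@dataclass
class ConnectorState:
    status: str = "active"  # hypothetical name for the healthy status
    consecutive_failures: int = 0


def record_run(state: ConnectorState, run_status: str) -> ConnectorState:
    """Update the failure counter and trip the breaker after repeated failures."""
    if run_status == "completed":
        state.consecutive_failures = 0
    elif run_status == "failed":
        state.consecutive_failures += 1
        if state.consecutive_failures >= FAILURE_THRESHOLD:
            state.status = "error"
    # Any other status (e.g. "partial") is left untouched: behaviour unspecified.
    return state


state = ConnectorState()
for outcome in ["failed", "failed", "completed", "failed", "failed", "failed"]:
    record_run(state, outcome)
print(state)  # ConnectorState(status='error', consecutive_failures=3)
```

The successful run in the middle resets the counter, so only the last three failures in a row trip the breaker.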
Response 200 OK
{
"run_id": "run-uuid-...",
"status": "completed",
"rows_loaded": 1204,
"rows_failed": 0,
"tables": ["ivory_tenant_acme.salesforce_opportunities"]
}