Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.ivory.finance/llms.txt

Use this file to discover all available pages before exploring further.

How the generic pipeline works

Every Singer tap — regardless of source — flows through the same transformation pipeline. No per-source code is required.
1. meltano run tap-{name} target-postgres
       ↓  writes to etl_staging_{source}.{stream}

2. POST /v1/etl/meltano/webhook
       ↓  verifies HMAC-SHA256 signature

3. _transform_and_load()
       ↓  reads etl.field_mappings for this connector
       ↓  for each staged row: applies transform_fn per field
       ↓  batches 500 rows → INSERT INTO {namespace}.{table} (Spark SQL)

4. Singer STATE persisted to etl.meltano_connectors.tap_state
       ↓  used as --state on the next incremental run

LLM field mapping

POST /v1/admin/connectors/{id}/map-fields
Calls GPT-4o-mini to propose source_field → target_column mappings for a Singer stream. Understands Salesforce objects, HubSpot deals, Bloomberg tickers, and canonical IB platform schema conventions (dm, md, ci, pm).

Request body

FieldRequiredDescription
source_streamYesSinger stream name (e.g. Opportunity, contacts)
source_schemaYes{"field_name": "field_type"} from meltano invoke tap-{name} --discover
target_schemaYesTarget schema: dm, md, ci, pm, or custom
target_tableYesTarget Iceberg table name
target_schema_defNo{"col_name": "col_type"} override — omit to use canonical schema
curl -X POST https://api.ivory.finance/v1/admin/connectors/a1b2c3d4-.../map-fields \
  -H "Authorization: Bearer $IVORY_JWT" \
  -H "Content-Type: application/json" \
  -d '{
    "source_stream": "Opportunity",
    "source_schema": {
      "Id": "string",
      "Name": "string",
      "Amount": "number",
      "CloseDate": "string",
      "StageName": "string",
      "OwnerId": "string",
      "CreatedDate": "string"
    },
    "target_schema": "dm",
    "target_table": "deals"
  }'

Response 201 Created

{
  "mapped": 6,
  "review_required": 1,
  "message": "1 mapping has confidence < 0.7 and should be reviewed.",
  "mappings": [
    {
      "id": "map-uuid-...",
      "source_stream": "Opportunity",
      "source_field": "Id",
      "target_schema": "dm",
      "target_table": "deals",
      "target_column": "external_id",
      "transform_fn": null,
      "llm_mapped": true,
      "confidence_score": 1.0,
      "reviewed": false
    },
    {
      "source_field": "Amount",
      "target_column": "deal_value_usd",
      "transform_fn": "float(v)/100 if v else 0.0",
      "confidence_score": 0.9
    },
    {
      "source_field": "CloseDate",
      "target_column": "close_date",
      "transform_fn": "str(v)[:10] if v else None",
      "confidence_score": 1.0
    },
    {
      "source_field": "StageName",
      "target_column": "stage",
      "transform_fn": "v.strip().upper() if v else None",
      "confidence_score": 0.9
    },
    {
      "source_field": "OwnerId",
      "target_column": "owner_crm_id",
      "transform_fn": null,
      "confidence_score": 0.65,
      "reviewed": false
    }
  ]
}
Confidence scores:
ScoreMeaning
1.0Exact name match
0.9Semantic match (e.g. Amountdeal_value_usd)
0.7Inferred mapping
< 0.7Uncertain — review before enabling

List field mappings

GET /v1/admin/connectors/{id}/mappings
Returns all mappings for the connector, ordered by source_stream, source_field.

Response 200 OK

{
  "mappings": [
    {
      "id": "map-uuid-...",
      "source_stream": "Opportunity",
      "source_field": "Amount",
      "target_schema": "dm",
      "target_table": "deals",
      "target_column": "deal_value_usd",
      "transform_fn": "float(v)/100 if v else 0.0",
      "llm_mapped": true,
      "confidence_score": 0.9,
      "reviewed": true,
      "created_at": "2026-03-28T10:00:00Z"
    }
  ],
  "count": 48
}

Update a field mapping

PUT /v1/admin/connectors/{id}/mappings/{mapping_id}
Approve a mapping (reviewed: true), correct the target column, or edit the transform expression.

Request body (all fields optional)

FieldTypeDescription
reviewedboolMark mapping as reviewed/approved
target_columnstringCorrect the LLM’s column suggestion
transform_fnstringPython expression evaluated as eval(fn, {}, {"v": field_value})
Built-in transform patterns:
PatternExpression
ISO-8601 datestr(v)[:10] if v else None
Cents → USDfloat(v)/100 if v else 0.0
Strip + upper-casev.strip().upper() if v else None
Boolean coercebool(v)
Null-safe stringv.strip() if v else None
# Approve a mapping
curl -X PUT https://api.ivory.finance/v1/admin/connectors/a1b2c3d4-.../mappings/map-uuid-... \
  -H "Authorization: Bearer $IVORY_JWT" \
  -H "Content-Type: application/json" \
  -d '{"reviewed": true}'

# Fix target column + transform
curl -X PUT https://api.ivory.finance/v1/admin/connectors/a1b2c3d4-.../mappings/map-uuid-... \
  -H "Authorization: Bearer $IVORY_JWT" \
  -H "Content-Type: application/json" \
  -d '{
    "target_column": "deal_value_usd",
    "transform_fn": "round(float(v) / 100, 2) if v else 0.0"
  }'

Response 200 OK

Returns the updated mapping record.

Webhook callback

POST /v1/etl/meltano/webhook
Called by meltano_runner_worker when a tap subprocess completes. Not intended for direct client use — the runner handles this automatically.

Authentication

The runner signs the X-Webhook-Secret header with HMAC-SHA256:
X-Webhook-Secret: HMAC-SHA256(connector.webhook_secret, connector_id)
Requests without a valid signature are rejected with 403 Forbidden.

Request body

FieldTypeDescription
connector_idstringUUID of the connector
tenant_idstringTenant UUID
source_systemstringTap identity
statusstringcompleted | failed | partial
rows_writtenintRows written by the Singer tap
tap_stateobjectSinger STATE message for next incremental run
error_msgstringError detail on failure
duration_sintRunner subprocess wall-clock seconds
run_idstringetl.runner_jobs.id (optional)

Behaviour on status = "completed"

  1. Runs _transform_and_load() → staging → IOMETE Iceberg
  2. Persists tap_state for the next incremental run
  3. Advances next_run_at from sync_schedule
  4. Resets consecutive_failures to 0
  5. Inserts a row in etl.sync_runs with rows_synced + rows_loaded

Behaviour on status = "failed"

  1. Increments consecutive_failures
  2. If consecutive_failures >= 3: sets status = 'error' (circuit breaker)
  3. Still inserts etl.sync_runs row with error details

Response 200 OK

{
  "run_id": "run-uuid-...",
  "status": "completed",
  "rows_loaded": 1204,
  "rows_failed": 0,
  "tables": ["ivory_tenant_acme.salesforce_opportunities"]
}