sql_call
Per-record SQL enrichment with caching and merge strategies.
The sql_call filter runs a SQL query per record, then merges
the result back onto the record. Placeholders like {{record.x}} are
substituted as positional parameters at execution time — they are
never string-spliced into the SQL, so they're safe by construction.
Minimal example
filters:
- type: sql_call
connectionStringRef: ${REFERENCE_DATABASE_URL}
query: SELECT tier FROM customers WHERE id = {{record.customerId}}
mergeStrategy: mergeOptions
| property | type | default | description |
|---|---|---|---|
typerequired | "sql_call" | — | Module type discriminator. Must be `sql_call` for this module. |
id | string | — | Unique identifier within the pipeline. |
name | string | — | Human-readable name. |
description | string | — | — |
enabled | boolean | true | Whether module is active. |
tags | array<string> | — | — |
onError | string | "fail" | Default error action. Case-insensitive; normalized to lowercase by the runtime. |
mergeStrategy | string | "merge" | How to merge query results with input records. mergereplaceappend |
resultKey | string | — | Key for storing result in append mode. |
| object | — | — | |
query | string | — | SQL query. Supports module-specific placeholders (e.g. {{record.field}}, {{lastRunTimestamp}}). |
queryFile | string | — | Path to SQL file. Supports module-specific placeholders. |
connectionString | string | — | Database connection string (DSN). Avoid in production - use connectionStringRef instead. |
connectionStringRef | string | — | Environment variable reference for connection string. Format: ${ENV_VAR_NAME} |
driver | string | — | Database driver. Auto-detected from connection string if not specified. postgresmysqlsqlite |
maxOpenConns | integer | 10 | Maximum number of open connections in the pool. |
maxIdleConns | integer | 5 | Maximum number of idle connections in the pool. |
connMaxLifetimeSeconds | integer | 1800 | Maximum lifetime of a connection in seconds. |
connMaxIdleTimeSeconds | integer | 300 | Maximum idle time for a connection in seconds. |
timeoutMs | integer | 30000 | Query timeout in milliseconds. |
Positional parameters
Every {{record.<path>}} placeholder in query is bound as a
positional parameter at execution time ($1, $2, …in PostgreSQL;
? in MySQL/SQLite). The order matches first-appearance order in the
query string.
query: |
SELECT plan, mrr
FROM accounts
WHERE id = {{record.account_id}}
AND region = {{record.region}}Never wrap a {{record.x}} placeholder in quotes. It's already a
parameter — the database will reject quoted parameters as syntax errors,
or worse, treat them as literal strings.
Connection
Same shape as the database input — connectionString or
connectionStringRef. Use the ref form for production:
connectionStringRef: ${REFERENCE_DATABASE_URL}See database input · connection for driver auto-detection and tuning.
Merge strategies
| Strategy | Effect |
|---|---|
merge (default) | Deep-merge the first returned row onto the record. Nested objects are merged; SQL values overwrite conflicts. |
replace | Overlay the first returned row onto the record. Existing fields not present in the row are preserved. |
append | Store the first returned row under resultKey. Requires resultKey to be set. |
mergeStrategy: append
resultKey: enrichmentssql_call currently consumes only the first row returned by the query.
For one-to-many enrichment, aggregate the related data in SQL so the query
returns one row with the desired value under a column.
Caching
sql_call has the same cache config as http_call — LRU with TTL.
cache:
enabled: true
maxSize: 1000
ttlSeconds: 600
key: "{{record.customerId}}"The same warning applies: without an explicit cache.key or per-record
parameters, all records will hit the same cache slot.
Examples
name: sql-call-merge-cache
version: 1.0.0
description: Enrich records with a SQL lookup and deep merge the result.
tags:
- sql-call
- cache
input:
type: httpPolling
schedule: "*/30 * * * *"
endpoint: https://source.example.com/api/orders
dataField: orders
filters:
- type: sql_call
connectionStringRef: ${REFERENCE_DATABASE_URL}
driver: postgres
query: |
select segment, account_owner
from customer_reference
where customer_id = {{record.customer.id}}
limit 1
mergeStrategy: merge
cache:
enabled: true
maxSize: 2000
ttlSeconds: 600
key: "customer:{{record.customer.id}}"
output:
type: httpRequest
endpoint: https://destination.example.com/api/orders/enriched
method: POST
requestMode: batchname: sql-call-append-query-file
version: 1.0.0
description: Use queryFile for SQL enrichment and append the lookup result under a custom key.
tags:
- sql-call
- query-file
input:
type: httpPolling
schedule: "0 * * * *"
endpoint: https://source.example.com/api/customers
dataField: customers
filters:
- type: sql_call
connectionStringRef: ${REFERENCE_DATABASE_URL}
driver: postgres
queryFile: examples/assets/sql/customer_lookup.sql
mergeStrategy: append
resultKey: reference
cache:
enabled: true
maxSize: 1000
ttlSeconds: 300
key: "customer:{{record.id}}"
output:
type: httpRequest
endpoint: https://destination.example.com/api/customers/reference
method: POST
requestMode: batch