cannectors

sql_call

Per-record SQL enrichment with caching and merge strategies.

The sql_call filter runs a SQL query per record, then merges the result back onto the record. Placeholders like {{record.x}} are substituted as positional parameters at execution time — they are never string-spliced into the SQL, so they're safe by construction.

Minimal example

filters:
  - type: sql_call
    connectionStringRef: ${REFERENCE_DATABASE_URL}
    query: SELECT tier FROM customers WHERE id = {{record.customerId}}
    mergeStrategy: merge

Options

propertytypedefaultdescription
typerequired
"sql_call"Module type discriminator. Must be `sql_call` for this module.
id
stringUnique identifier within the pipeline.
name
stringHuman-readable name.
description
string
enabled
booleantrueWhether module is active.
tags
array<string>
onError
string"fail"Default error action. Case-insensitive; normalized to lowercase by the runtime.
mergeStrategy
string"merge"How to merge query results with input records.
mergereplaceappend
resultKey
stringKey for storing result in append mode.
object
query
stringSQL query. Supports module-specific placeholders (e.g. {{record.field}}, {{lastRunTimestamp}}).
queryFile
stringPath to SQL file. Supports module-specific placeholders.
connectionString
stringDatabase connection string (DSN). Avoid in production - use connectionStringRef instead.
connectionStringRef
stringEnvironment variable reference for connection string. Format: ${ENV_VAR_NAME}
driver
stringDatabase driver. Auto-detected from connection string if not specified.
postgresmysqlsqlite
maxOpenConns
integer10Maximum number of open connections in the pool.
maxIdleConns
integer5Maximum number of idle connections in the pool.
connMaxLifetimeSeconds
integer1800Maximum lifetime of a connection in seconds.
connMaxIdleTimeSeconds
integer300Maximum idle time for a connection in seconds.
timeoutMs
integer30000Query timeout in milliseconds.

Positional parameters

Every {{record.<path>}} placeholder in query is bound as a positional parameter at execution time ($1, $2, …in PostgreSQL; ? in MySQL/SQLite). The order matches first-appearance order in the query string.

query: |
  SELECT plan, mrr
  FROM accounts
  WHERE id = {{record.account_id}}
    AND region = {{record.region}}

Never wrap a {{record.x}} placeholder in quotes. It's already a parameter — the database will reject quoted parameters as syntax errors, or worse, treat them as literal strings.

Connection

Same shape as the database input — connectionString or connectionStringRef. Use the ref form for production:

connectionStringRef: ${REFERENCE_DATABASE_URL}

See database input · connection for driver auto-detection and tuning.

Merge strategies

StrategyEffect
merge (default)Deep-merge the first returned row onto the record. Nested objects are merged; SQL values overwrite conflicts.
replaceOverlay the first returned row onto the record. Existing fields not present in the row are preserved.
appendStore the first returned row under resultKey. Requires resultKey to be set.
mergeStrategy: append
resultKey: enrichments

sql_call currently consumes only the first row returned by the query. For one-to-many enrichment, aggregate the related data in SQL so the query returns one row with the desired value under a column.

Caching

sql_call has the same cache config as http_call — LRU with TTL.

cache:
  enabled: true
  maxSize: 1000
  ttlSeconds: 600
  key: "{{record.customerId}}"

The same warning applies: without an explicit cache.key or per-record parameters, all records will hit the same cache slot.

Examples

examples/17-sql-call-merge-cache.yamlview source ↗
17-sql-call-merge-cache.yaml
name: sql-call-merge-cache
version: 1.0.0
description: Enrich records with a SQL lookup and deep merge the result.
tags:
  - sql-call
  - cache
input:
  type: httpPolling
  schedule: "*/30 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
filters:
  - type: sql_call
    connectionStringRef: ${REFERENCE_DATABASE_URL}
    driver: postgres
    query: |
      select segment, account_owner
      from customer_reference
      where customer_id = {{record.customer.id}}
      limit 1
    mergeStrategy: merge
    cache:
      enabled: true
      maxSize: 2000
      ttlSeconds: 600
      key: "customer:{{record.customer.id}}"
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/enriched
  method: POST
  requestMode: batch
examples/18-sql-call-append-query-file.yamlview source ↗
18-sql-call-append-query-file.yaml
name: sql-call-append-query-file
version: 1.0.0
description: Use queryFile for SQL enrichment and append the lookup result under a custom key.
tags:
  - sql-call
  - query-file
input:
  type: httpPolling
  schedule: "0 * * * *"
  endpoint: https://source.example.com/api/customers
  dataField: customers
filters:
  - type: sql_call
    connectionStringRef: ${REFERENCE_DATABASE_URL}
    driver: postgres
    queryFile: examples/assets/sql/customer_lookup.sql
    mergeStrategy: append
    resultKey: reference
    cache:
      enabled: true
      maxSize: 1000
      ttlSeconds: 300
      key: "customer:{{record.id}}"
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/customers/reference
  method: POST
  requestMode: batch

Cross-references