database (output)
Write records to PostgreSQL, MySQL, or SQLite.
The database output executes a SQL query for each record, either
individually or with all of them inside one transaction. Placeholders
are bound as positional parameters, transactional wrapping is optional,
and prepared statements are reused across records.
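
As a rough illustration of the per-record execution model described above, the sketch below uses Python's standard-library `sqlite3` module with an in-memory database. The `records` list and the `write_records` helper are hypothetical names chosen for this example; this is not the module's implementation, only one way the behavior could look.

```python
import sqlite3

# Hypothetical records, as they might arrive from upstream filters.
records = [
    {"id": 1, "email": "a@example.com", "amount": 10.5},
    {"id": 2, "email": "o'brien@example.com", "amount": 20.0},
]

# One statement text for every record; values travel separately as parameters.
SQL = "INSERT INTO orders (id, email, amount) VALUES (?, ?, ?)"


def write_records(conn, rows):
    """Execute SQL once per record, binding the values positionally."""
    cur = conn.cursor()
    for row in rows:
        cur.execute(SQL, (row["id"], row["email"], row["amount"]))
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, email TEXT, amount REAL)")
written = write_records(conn, records)
stored = conn.execute("SELECT id, email, amount FROM orders ORDER BY id").fetchall()
```

Because the values are bound rather than spliced into the SQL text, the apostrophe in the second email address needs no escaping.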
Minimal example

output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  query: |
    INSERT INTO orders (id, email, amount)
    VALUES ({{record.id}}, {{record.email}}, {{record.amount}})
    ON CONFLICT (id) DO UPDATE SET
      email = EXCLUDED.email,
      amount = EXCLUDED.amount

Options
| property | type | default | description |
|---|---|---|---|
| type (required) | "database" | — | Module type discriminator. Must be `database` for this module. |
| id | string | — | Unique identifier within the pipeline. |
| name | string | — | Human-readable name. |
| description | string | — | — |
| enabled | boolean | true | Whether module is active. |
| tags | array<string> | — | — |
| onError | string | "fail" | Default error action. Case-insensitive; normalized to lowercase by the runtime. |
| transaction | boolean | false | Wrap operations in a transaction. |
| query | string | — | SQL query. Supports module-specific placeholders (e.g. {{record.field}}, {{lastRunTimestamp}}). |
| queryFile | string | — | Path to SQL file. Supports module-specific placeholders. |
| connectionString | string | — | Database connection string (DSN). Avoid in production - use connectionStringRef instead. |
| connectionStringRef | string | — | Environment variable reference for connection string. Format: ${ENV_VAR_NAME} |
| driver | string | — | Database driver. Auto-detected from connection string if not specified. One of: postgres, mysql, sqlite. |
| maxOpenConns | integer | 10 | Maximum number of open connections in the pool. |
| maxIdleConns | integer | 5 | Maximum number of idle connections in the pool. |
| connMaxLifetimeSeconds | integer | 1800 | Maximum lifetime of a connection in seconds. |
| connMaxIdleTimeSeconds | integer | 300 | Maximum idle time for a connection in seconds. |
| timeoutMs | integer | 30000 | Query timeout in milliseconds. |
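
To make the `connectionStringRef` format concrete, here is a minimal Python sketch of how a `${ENV_VAR_NAME}` reference might be resolved. The `resolve_connection_string` function and its strict pattern are assumptions made for illustration; the runtime's actual parsing and error handling may differ.

```python
import os
import re

# Assumed format: exactly "${NAME}", where NAME is a typical environment variable name.
REF_PATTERN = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$")


def resolve_connection_string(ref, env=None):
    """Return the DSN stored in the environment variable named by ref."""
    env = os.environ if env is None else env
    match = REF_PATTERN.match(ref)
    if match is None:
        raise ValueError(f"expected ${{ENV_VAR_NAME}}, got {ref!r}")
    name = match.group(1)
    if name not in env:
        raise KeyError(f"environment variable {name} is not set")
    return env[name]


# Usage with an explicit mapping, so the example does not depend on the real environment.
fake_env = {"WAREHOUSE_DATABASE_URL": "postgres://user@localhost:5432/warehouse"}
dsn = resolve_connection_string("${WAREHOUSE_DATABASE_URL}", fake_env)
```

Keeping the DSN in the environment means the pipeline file can be committed without credentials, which is the reason the table advises against `connectionString` in production.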
Query or query file
Provide one of query (inline) or queryFile (external). Inline is
convenient for short statements; external files are clearer for
anything beyond a one-liner.

queryFile: ./sql/upsert_order.sql

INSERT INTO orders (id, email, amount, paid_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE SET
  email = EXCLUDED.email,
  amount = EXCLUDED.amount,
  paid_at = EXCLUDED.paid_at;

The placeholders ($1, $2, … for PostgreSQL; ? for MySQL/SQLite)
are bound positionally, in the order in which {{record.x}} appears in
the query (when using query) or via parameters (when using
queryFile).
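
The positional binding described above can be sketched in Python as a small rewrite step. The `bind_placeholders` function, its regular expression, and the `driver` argument are hypothetical; the sketch only shows the idea of replacing each `{{record.x}}` with a driver-specific placeholder while collecting values in order of appearance.

```python
import re

# Matches {{record.field}}; the exact syntax the runtime accepts is assumed here.
PLACEHOLDER = re.compile(r"\{\{\s*record\.([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def bind_placeholders(query, record, driver="postgres"):
    """Rewrite {{record.x}} markers as positional placeholders.

    Returns (sql, params), where params follow the order of appearance.
    """
    params = []

    def replace(match):
        params.append(record[match.group(1)])
        # PostgreSQL numbers its placeholders; MySQL and SQLite use "?".
        return f"${len(params)}" if driver == "postgres" else "?"

    return PLACEHOLDER.sub(replace, query), params


template = "INSERT INTO orders (id, email) VALUES ({{record.id}}, {{record.email}})"
record = {"id": 7, "email": "a@example.com"}

pg_sql, pg_params = bind_placeholders(template, record)
lite_sql, lite_params = bind_placeholders(template, record, driver="sqlite")
```

With these inputs, `pg_sql` ends in `VALUES ($1, $2)` and `lite_sql` ends in `VALUES (?, ?)`. Both parameter lists are `[7, "a@example.com"]`.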
Transactional writes
For atomic batch writes, where either all records persist or none do, wrap the output in a transaction:

output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  queryFile: ./sql/upsert_order.sql
  transaction: true

The runtime opens one transaction per output batch, executes the query once per record, then commits at the end. On any error, the transaction rolls back.
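
The all-or-nothing behavior can be seen in a short Python sketch using the standard-library `sqlite3` module. The `write_batch` helper and the sample batches are hypothetical, and SQLite stands in for whichever database the pipeline targets; the runtime's own batching logic is not shown here.

```python
import sqlite3

SQL = "INSERT INTO orders (id, email) VALUES (?, ?)"


def write_batch(conn, batch):
    """Write every record in one transaction; roll back all of them on any error."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            for rec in batch:
                conn.execute(SQL, (rec["id"], rec["email"]))
    except sqlite3.Error:
        return False
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
conn.commit()

good = [{"id": 1, "email": "a@example.com"}, {"id": 2, "email": "b@example.com"}]
# The second record repeats id 3, so the primary-key constraint fails mid-batch.
bad = [{"id": 3, "email": "c@example.com"}, {"id": 3, "email": "dup@example.com"}]

ok_first = write_batch(conn, good)
ok_second = write_batch(conn, bad)
count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```

The first record of the failing batch was inserted before the error, yet it is not in the table afterwards: the rollback discards the whole batch, leaving only the two rows from the first call.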
Connection
Connection handling is the same as for the database input; see the
Connection section of that page for driver auto-detection and
connection-pool tuning. The output uses the same pool settings
(maxOpenConns, maxIdleConns, etc.).
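
As a loose illustration of driver auto-detection, the Python sketch below infers a driver from the scheme of a URL-style connection string. The scheme-to-driver mapping and the `detect_driver` function are assumptions made for this example; the runtime's actual detection rules are not documented on this page and may differ.

```python
from urllib.parse import urlsplit

# Assumed scheme-to-driver mapping; the runtime's real rules may differ.
SCHEME_TO_DRIVER = {
    "postgres": "postgres",
    "postgresql": "postgres",
    "mysql": "mysql",
    "sqlite": "sqlite",
    "file": "sqlite",
}


def detect_driver(dsn, explicit=None):
    """Return the explicit driver if given, else guess from the DSN scheme."""
    if explicit:
        return explicit
    scheme = urlsplit(dsn).scheme.lower()
    try:
        return SCHEME_TO_DRIVER[scheme]
    except KeyError:
        raise ValueError(
            f"cannot detect driver from {dsn!r}; set driver explicitly"
        ) from None


pg = detect_driver("postgres://user@db.example.com:5432/warehouse")
lite = detect_driver("sqlite:///var/data/warehouse.db")
forced = detect_driver("warehouse.db", explicit="sqlite")
```

A connection string without a recognizable scheme raises an error in this sketch, which mirrors the practical advice of setting `driver` explicitly whenever detection could be ambiguous.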
Examples

name: database-output-transaction-query-file
version: 1.0.0
description: Write records with a SQL query file in a transaction.
tags:
  - database-output
  - transaction
input:
  type: httpPolling
  schedule: "0 * * * *"
  endpoint: https://source.example.com/api/products
  dataField: products
filters:
  - type: mapping
    mappings:
      - source: id
        target: product_id
      - source: sku
        target: sku
      - source: price
        target: price
        transforms:
          - op: toFloat
      - source: updated_at
        target: updated_at
output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  driver: postgres
  queryFile: examples/assets/sql/upsert_product.sql
  transaction: true
  onError: fail