cannectors

database (output)

Write records to PostgreSQL, MySQL, or SQLite.

The database output executes a SQL query for each record (or all of them inside one transaction). Placeholders bound as positional parameters, optional transactional wrapping, prepared statements reused across records.

Minimal example

output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  query: |
    INSERT INTO orders (id, email, amount)
    VALUES ({{record.id}}, {{record.email}}, {{record.amount}})
    ON CONFLICT (id) DO UPDATE SET
      email = EXCLUDED.email,
      amount = EXCLUDED.amount

Options

propertytypedefaultdescription
typerequired
"database"Module type discriminator. Must be `database` for this module.
id
stringUnique identifier within the pipeline.
name
stringHuman-readable name.
description
string
enabled
booleantrueWhether module is active.
tags
array<string>
onError
string"fail"Default error action. Case-insensitive; normalized to lowercase by the runtime.
transaction
booleanfalseWrap operations in a transaction.
query
stringSQL query. Supports module-specific placeholders (e.g. {{record.field}}, {{lastRunTimestamp}}).
queryFile
stringPath to SQL file. Supports module-specific placeholders.
connectionString
stringDatabase connection string (DSN). Avoid in production - use connectionStringRef instead.
connectionStringRef
stringEnvironment variable reference for connection string. Format: ${ENV_VAR_NAME}
driver
stringDatabase driver. Auto-detected from connection string if not specified.
postgresmysqlsqlite
maxOpenConns
integer10Maximum number of open connections in the pool.
maxIdleConns
integer5Maximum number of idle connections in the pool.
connMaxLifetimeSeconds
integer1800Maximum lifetime of a connection in seconds.
connMaxIdleTimeSeconds
integer300Maximum idle time for a connection in seconds.
timeoutMs
integer30000Query timeout in milliseconds.

Query or query file

Provide one of query (inline) or queryFile (external). Inline is convenient for short statements; external files are clearer for anything beyond a one-liner.

queryFile: ./sql/upsert_order.sql
sql/upsert_order.sql
INSERT INTO orders (id, email, amount, paid_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE SET
  email     = EXCLUDED.email,
  amount    = EXCLUDED.amount,
  paid_at   = EXCLUDED.paid_at;

The placeholders ($1, $2, … for PostgreSQL; ? for MySQL/SQLite) are bound positionally from the order {{record.x}} appears in the query (when using query) or via parameters (when using queryFile).

Transactional writes

For atomic batch writes — either all records persist or none do — wrap in a transaction:

output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  queryFile: ./sql/upsert_order.sql
  transaction: true

The runtime opens one transaction per output batch, executes the query once per record, then commits at the end. On any error, the transaction rolls back.

Connection

Same as the database input — see Connection for driver auto-detection and connection-pool tuning. The output reuses the same pool config (maxOpenConns, maxIdleConns, etc.).

Examples

examples/21-database-output-transaction-query-file.yamlview source ↗
21-database-output-transaction-query-file.yaml
name: database-output-transaction-query-file
version: 1.0.0
description: Write records with a SQL query file in a transaction.
tags:
  - database-output
  - transaction
input:
  type: httpPolling
  schedule: "0 * * * *"
  endpoint: https://source.example.com/api/products
  dataField: products
filters:
  - type: mapping
    mappings:
      - source: id
        target: product_id
      - source: sku
        target: sku
      - source: price
        target: price
        transforms:
          - op: toFloat
      - source: updated_at
        target: updated_at
output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  driver: postgres
  queryFile: examples/assets/sql/upsert_product.sql
  transaction: true
  onError: fail

Cross-references