database (input)
Pull records from PostgreSQL, MySQL, or SQLite.
The database input executes a SQL query, returns one record per row,
and feeds them into the filter chain. Supports cursor or limit-offset
pagination, incremental queries via a tracked timestamp/ID, and CRON
scheduling.
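As a mental model of "one record per row", the sketch below uses Python's standard-library `sqlite3` module to turn each result row into a dict keyed by column name. `read_records` is a hypothetical helper for illustration only. This page does not describe the module's own record type or runtime.

```python
import sqlite3
from typing import Any, Dict, Iterator


def read_records(conn: sqlite3.Connection, query: str) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: execute `query` and yield one dict per result row."""
    cur = conn.execute(query)
    # cursor.description holds one 7-tuple per column; item 0 is the column name
    columns = [col[0] for col in cur.description]
    for row in cur:
        yield dict(zip(columns, row))


# Demo against an in-memory table shaped like the minimal example's `customers`
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.executemany(
    "INSERT INTO customers (name, email) VALUES (?, ?)",
    [("Ada", "ada@example.com"), ("Linus", "linus@example.com")],
)
for record in read_records(conn, "SELECT id, name, email FROM customers ORDER BY id"):
    print(record)
```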
Minimal example
```yaml
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  query: SELECT id, name, email FROM customers
```

Options
| property | type | default | description |
|---|---|---|---|
| `type` (required) | `"database"` | — | Module type discriminator. Must be `database` for this module. |
| `id` | string | — | Unique identifier within the pipeline. |
| `name` | string | — | Human-readable name. |
| `description` | string | — | — |
| `enabled` | boolean | `true` | Whether the module is active. |
| `tags` | `array<string>` | — | — |
| `onError` | string | `"fail"` | Default error action. Case-insensitive; normalized to lowercase by the runtime. |
| `schedule` | string | — | Optional CRON expression that runs the database input on a schedule. Validated at runtime. |
| `parameters` | object | — | Query parameters. Keys are parameter names, referenced in the SQL as `:paramName`. |
|  | object | — | — |
|  | object | — | — |
| `query` | string | — | SQL query. Supports module-specific placeholders (e.g. `{{record.field}}`, `{{lastRunTimestamp}}`). |
| `queryFile` | string | — | Path to a SQL file. Supports module-specific placeholders. |
| `connectionString` | string | — | Database connection string (DSN). Avoid in production; use `connectionStringRef` instead. |
| `connectionStringRef` | string | — | Environment variable reference for the connection string. Format: `${ENV_VAR_NAME}`. |
| `driver` | string | — | Database driver: `postgres`, `mysql`, or `sqlite`. Auto-detected from the connection string if not specified. |
| `maxOpenConns` | integer | `10` | Maximum number of open connections in the pool. |
| `maxIdleConns` | integer | `5` | Maximum number of idle connections in the pool. |
| `connMaxLifetimeSeconds` | integer | `1800` | Maximum lifetime of a connection, in seconds. |
| `connMaxIdleTimeSeconds` | integer | `300` | Maximum idle time for a connection, in seconds. |
| `timeoutMs` | integer | `30000` | Query timeout, in milliseconds. |
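Python's `sqlite3` module accepts the same `:name` placeholder style. That makes it a convenient stand-in for showing how a `parameters` map lines up with placeholders in `query`. The `status` and `minId` names and the table are hypothetical. The sketch also assumes values are bound as driver parameters rather than spliced into the SQL text, which this page does not state.

```python
import sqlite3

# Hypothetical `parameters` map, as it might be parsed from the pipeline YAML
parameters = {"status": "active", "minId": 2}

query = """
SELECT id, email
FROM customers
WHERE status = :status AND id >= :minId
ORDER BY id
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, email TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO customers (email, status) VALUES (?, ?)",
    [("a@example.com", "active"), ("b@example.com", "active"), ("c@example.com", "inactive")],
)

# sqlite3 fills each :name placeholder from the dict key of the same name; the
# values travel separately from the SQL text instead of being pasted into it.
rows = conn.execute(query, parameters).fetchall()
print(rows)  # [(2, 'b@example.com')]
```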
Connection
Provide one of `connectionString` or `connectionStringRef`. Use the ref form in production: it reads the value from the environment, keeping secrets out of the YAML.

```yaml
connectionStringRef: ${SOURCE_DATABASE_URL}
```

The driver is auto-detected from the URL scheme:
| Scheme | Driver |
|---|---|
| `postgres://…` | `postgres` |
| `mysql://…` or `user:pass@tcp(host:port)/db` | `mysql` |
| `file:./db.sqlite` | `sqlite` |
Set `driver` explicitly if you need to override the detection.
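The two connection steps, resolving a `${ENV_VAR_NAME}` reference and picking a driver from the scheme, can be sketched in Python. `resolve_ref` and `detect_driver` are hypothetical helpers that encode only the rows of the scheme table. The module's actual parsing rules, and any schemes it accepts beyond those listed, are not documented here.

```python
import os
import re

# Matches the whole-string form ${ENV_VAR_NAME}
_REF = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$")


def resolve_ref(ref: str) -> str:
    """Hypothetical: look up a ${ENV_VAR_NAME} reference in the process environment."""
    match = _REF.match(ref)
    if match is None:
        raise ValueError(f"not a ${{ENV_VAR_NAME}} reference: {ref!r}")
    name = match.group(1)
    try:
        return os.environ[name]
    except KeyError:
        raise KeyError(f"environment variable {name} is not set") from None


def detect_driver(dsn: str) -> str:
    """Hypothetical: map a DSN to a driver using only the scheme table's rows."""
    if dsn.startswith("postgres://"):
        return "postgres"
    if dsn.startswith("mysql://") or "@tcp(" in dsn:
        return "mysql"
    if dsn.startswith("file:"):
        return "sqlite"
    # Anything else would need an explicit `driver` setting
    raise ValueError("cannot detect driver; set `driver` explicitly")


os.environ["SOURCE_DATABASE_URL"] = "postgres://app:secret@db.internal:5432/app"
dsn = resolve_ref("${SOURCE_DATABASE_URL}")
print(detect_driver(dsn))  # postgres
```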
Pagination
Limit + offset
```yaml
pagination:
  type: limit-offset
  limit: 500
  param: offset   # placeholder name inside the SQL
query: |
  SELECT id, payload
  FROM events
  ORDER BY id
  LIMIT 500 OFFSET :offset
```

Cursor
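```yaml
pagination:
  type: cursor
  cursorField: id   # which column to use as the cursor value
  param: cursor     # placeholder name in the SQL
query: |
  SELECT id, payload
  FROM events
  WHERE id > :cursor
  ORDER BY id
  LIMIT 500
```

Prefer cursor pagination for large incremental datasets. With limit-offset, the database still reads and discards every skipped row, so each page gets slower as the offset grows into the millions. A cursor query seeks directly past the last seen value, as long as `cursorField` is indexed and the query orders by it.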
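The two strategies differ only in what gets bound between pages: a growing `:offset`, or the last row's `cursorField` value as `:cursor`. This Python sketch uses `sqlite3` with a page size of 3 instead of 500. Two details are assumptions because this page does not specify them: the loop stops on a page shorter than the limit, and the first cursor value is 0.

```python
import sqlite3
from typing import Iterator, List, Tuple

PAGE_SIZE = 3  # stands in for `limit: 500`
Page = List[Tuple[int, str]]


def limit_offset_pages(conn: sqlite3.Connection) -> Iterator[Page]:
    """Limit-offset: re-run the query with a growing :offset until a short page."""
    offset = 0
    while True:
        page = conn.execute(
            "SELECT id, payload FROM events ORDER BY id LIMIT :limit OFFSET :offset",
            {"limit": PAGE_SIZE, "offset": offset},
        ).fetchall()
        if page:
            yield page
        if len(page) < PAGE_SIZE:
            return
        offset += len(page)


def cursor_pages(conn: sqlite3.Connection) -> Iterator[Page]:
    """Cursor (keyset): bind the last seen `id` as :cursor for the next page."""
    cursor = 0  # assumption: ids are positive, so 0 precedes the first row
    while True:
        page = conn.execute(
            "SELECT id, payload FROM events WHERE id > :cursor ORDER BY id LIMIT :limit",
            {"cursor": cursor, "limit": PAGE_SIZE},
        ).fetchall()
        if page:
            yield page
        if len(page) < PAGE_SIZE:
            return
        cursor = page[-1][0]  # cursorField (`id`) of the last row on this page


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany("INSERT INTO events (payload) VALUES (?)", [(f"e{i}",) for i in range(7)])

print([len(p) for p in cursor_pages(conn)])  # [3, 3, 1]
```

Both loops return the same rows here. The cursor version stays correct and fast when rows are inserted between pages, because it never depends on how many rows precede the current position.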
Incremental queries
For queries that should only return rows changed since the last run, enable `incremental` and reference the tracked value through a named placeholder such as `:lastTs`:
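```yaml
incremental:
  enabled: true
  timestampField: updated_at
  timestampParam: lastTs
query: |
  SELECT id, payload
  FROM events
  WHERE updated_at > :lastTs
  ORDER BY updated_at
```

On the first run, before any state has been persisted, `:lastTs` is bound to NULL. A bare `updated_at > NULL` comparison evaluates to NULL for every row, so the query would return nothing. Handle that case in your SQL with `COALESCE(:lastTs, '-infinity'::timestamptz)` (PostgreSQL) or the equivalent for your database.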
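The following Python sketch reproduces the first-run behavior with `sqlite3`. SQLite has no `-infinity` literal, so the sketch substitutes a sentinel ISO-8601 string that sorts before every real timestamp. This page does not describe how the module persists the watermark between runs, so the hypothetical `run_once` simply returns it. Taking the last row's `updated_at` as the new watermark assumes the query orders by `timestampField`, as the example above does.

```python
import sqlite3
from typing import List, Optional, Tuple

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO events (payload, updated_at) VALUES (?, ?)",
    [("a", "2024-01-01T10:00:00"), ("b", "2024-01-02T10:00:00")],
)

# Sentinel timestamp plays the role of PostgreSQL's '-infinity'
QUERY = """
SELECT id, payload, updated_at
FROM events
WHERE updated_at > COALESCE(:lastTs, '0000-01-01T00:00:00')
ORDER BY updated_at
"""


def run_once(last_ts: Optional[str]) -> Tuple[List[tuple], Optional[str]]:
    """Hypothetical: run the incremental query, return (rows, new watermark)."""
    rows = conn.execute(QUERY, {"lastTs": last_ts}).fetchall()
    # Advance to the newest updated_at seen; keep the old value if nothing changed
    new_ts = rows[-1][2] if rows else last_ts
    return rows, new_ts


# Without COALESCE, `updated_at > NULL` is NULL for every row, so nothing matches
bare = conn.execute("SELECT COUNT(*) FROM events WHERE updated_at > :lastTs", {"lastTs": None})
print(bare.fetchone()[0])  # 0

rows, state = run_once(None)  # first run: no persisted state yet
print(len(rows), state)       # 2 2024-01-02T10:00:00

conn.execute("INSERT INTO events (payload, updated_at) VALUES ('c', '2024-01-03T09:00:00')")
rows, state = run_once(state)  # later run: only the new row
print([r[1] for r in rows])    # ['c']
```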
Examples
```yaml
name: database-input-basic-to-http
version: 1.0.0
description: Read rows from a database and send them as an HTTP batch.
tags:
  - database-input
  - http-output
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  driver: postgres
  query: |
    select id, email, updated_at
    from customers
    where active = true
    order by updated_at asc
filters:
  - type: mapping
    mappings:
      - source: id
        target: id
        transforms:
          - op: toString
      - source: email
        target: email
        transforms:
          - op: lowercase
      - source: updated_at
        target: updatedAt
        transforms:
          - op: dateFormat
            format: YYYY-MM-DDTHH:mm:ss
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/customers/sync
  method: POST
  requestMode: batch
```

```yaml
name: database-input-limit-offset-to-database
version: 1.0.0
description: Read a paginated SQL source and write rows into another database.
tags:
  - database-input
  - pagination
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  driver: postgres
  query: |
    select id, sku, quantity
    from inventory
    order by id asc
    limit :limit offset :offset
  pagination:
    type: limit-offset
    limit: 500
    param: offset
filters:
  - type: set
    target: sync_source
    value: source-db
output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  driver: postgres
  query: |
    insert into inventory_snapshot (id, sku, quantity, sync_source)
    values ({{record.id}}, {{record.sku}}, {{record.quantity}}, {{record.sync_source}})
  transaction: true
```

```yaml
name: database-input-cursor-incremental
version: 1.0.0
description: Read database rows incrementally using cursor pagination metadata.
tags:
  - database-input
  - incremental
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  driver: postgres
  query: |
    select id, customer_id, total, updated_at
    from orders
    where updated_at > {{lastRunTimestamp}}
      and id > :last_id
    order by id asc
    limit :limit
  pagination:
    type: cursor
    limit: 200
    cursorField: id
    param: last_id
  incremental:
    enabled: true
    timestampField: updated_at
    timestampParam: lastRunTimestamp
    idField: id
    idParam: last_id
filters:
  - type: mapping
    mappings:
      - source: id
        target: order.id
      - source: customer_id
        target: order.customerId
      - source: total
        target: order.total
        transforms:
          - op: toFloat
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders
  method: POST
  requestMode: batch
```
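The `database-input-limit-offset-to-database` example pages through `inventory`, adds `sync_source` with a `set` filter, and writes into `inventory_snapshot` with `transaction: true`. The following Python sketch reproduces that flow with two in-memory `sqlite3` databases. Two details are assumptions, since this page states neither: each page is written in its own transaction, and the `{{record.field}}` placeholders are bound as `:name` parameters here rather than interpolated into the SQL.

```python
import sqlite3

source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE inventory (id INTEGER PRIMARY KEY, sku TEXT, quantity INTEGER)")
source.executemany(
    "INSERT INTO inventory (sku, quantity) VALUES (?, ?)",
    [("SKU-1", 4), ("SKU-2", 0), ("SKU-3", 12)],
)

warehouse = sqlite3.connect(":memory:")
warehouse.execute(
    "CREATE TABLE inventory_snapshot "
    "(id INTEGER PRIMARY KEY, sku TEXT, quantity INTEGER, sync_source TEXT)"
)

LIMIT = 2  # stands in for `limit: 500`
offset = 0
while True:
    page = source.execute(
        "SELECT id, sku, quantity FROM inventory ORDER BY id ASC LIMIT :limit OFFSET :offset",
        {"limit": LIMIT, "offset": offset},
    ).fetchall()
    if not page:
        break
    # Equivalent of the `set` filter: add a constant field to every record
    records = [
        {"id": i, "sku": s, "quantity": q, "sync_source": "source-db"} for i, s, q in page
    ]
    # The connection's context manager commits on success and rolls back if any
    # insert raises, so each page is written completely or not at all
    with warehouse:
        warehouse.executemany(
            "INSERT INTO inventory_snapshot (id, sku, quantity, sync_source) "
            "VALUES (:id, :sku, :quantity, :sync_source)",
            records,
        )
    offset += len(page)

print(warehouse.execute("SELECT COUNT(*) FROM inventory_snapshot").fetchone()[0])  # 3
```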