cannectors

database (input)

Pull records from PostgreSQL, MySQL, or SQLite.

The database input executes a SQL query, returns one record per row, and feeds them into the filter chain. Supports cursor or limit-offset pagination, incremental queries via a tracked timestamp/ID, and CRON scheduling.

Minimal example

input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  query: SELECT id, name, email FROM customers

Options

| property | type | default | description |
|---|---|---|---|
| `type` (required) | `"database"` | | Module type discriminator. Must be `database` for this module. |
| `id` | string | | Unique identifier within the pipeline. |
| `name` | string | | Human-readable name. |
| `description` | string | | |
| `enabled` | boolean | `true` | Whether the module is active. |
| `tags` | array<string> | | |
| `onError` | string | `"fail"` | Default error action. Case-insensitive; normalized to lowercase by the runtime. |
| `schedule` | string | | Optional CRON expression to run the database input on a schedule. Validated at runtime. |
| `parameters` | object | | Query parameters. Keys are parameter names, referenced in SQL with `:paramName` syntax. |
| | object | | |
| | object | | |
| `query` | string | | SQL query. Supports module-specific placeholders (e.g. `{{record.field}}`, `{{lastRunTimestamp}}`). |
| `queryFile` | string | | Path to a SQL file. Supports module-specific placeholders. |
| `connectionString` | string | | Database connection string (DSN). Avoid in production; use `connectionStringRef` instead. |
| `connectionStringRef` | string | | Environment variable reference for the connection string. Format: `${ENV_VAR_NAME}` |
| `driver` | string | | Database driver: `postgres`, `mysql`, or `sqlite`. Auto-detected from the connection string if not specified. |
| `maxOpenConns` | integer | `10` | Maximum number of open connections in the pool. |
| `maxIdleConns` | integer | `5` | Maximum number of idle connections in the pool. |
| `connMaxLifetimeSeconds` | integer | `1800` | Maximum lifetime of a connection, in seconds. |
| `connMaxIdleTimeSeconds` | integer | `300` | Maximum idle time for a connection, in seconds. |
| `timeoutMs` | integer | `30000` | Query timeout in milliseconds. |
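The `parameters` option binds values to `:paramName` placeholders by name. The runtime's own binding code is not shown here. As a minimal illustration of the same placeholder style, Python's built-in `sqlite3` module accepts `:name` placeholders with a dict of values. The `orders` table and the `status` and `minTotal` parameter names below are hypothetical.

```python
import sqlite3

# Hypothetical `parameters` map, as it might look after the YAML is loaded.
parameters = {"status": "active", "minTotal": 100}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, total REAL)")
conn.executemany(
    "INSERT INTO orders (id, status, total) VALUES (?, ?, ?)",
    [(1, "active", 250.0), (2, "inactive", 500.0), (3, "active", 50.0)],
)

# Each :name placeholder is looked up by key in the mapping.
rows = conn.execute(
    "SELECT id FROM orders WHERE status = :status AND total >= :minTotal ORDER BY id",
    parameters,
).fetchall()
print(rows)  # [(1,)]
```

Binding by name keeps values out of the SQL text. The same query can run with different parameter maps, and the values are never parsed as SQL.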

Connection

Provide either `connectionString` or `connectionStringRef`. Use the ref form in production. It reads the connection string from an environment variable, which keeps secrets out of the YAML.

connectionStringRef: ${SOURCE_DATABASE_URL}
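Here is a minimal sketch of how a `${ENV_VAR_NAME}` reference can be resolved. It assumes the reference is exactly one variable name wrapped in `${` and `}`. `resolve_ref` is a hypothetical helper. Failing on an unset variable is a choice made for this sketch, because the runtime's behaviour in that case isn't documented here.

```python
import os
import re

# Accept exactly "${NAME}", where NAME is a conventional environment variable name.
_REF = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")

def resolve_ref(ref: str) -> str:
    """Return the value of the environment variable named in a ${NAME} reference."""
    match = _REF.fullmatch(ref)
    if match is None:
        raise ValueError(f"expected ${{NAME}}, got {ref!r}")
    name = match.group(1)
    value = os.environ.get(name)
    if value is None:
        # Fail fast rather than connect with an empty DSN (a choice of this sketch).
        raise KeyError(f"environment variable {name} is not set")
    return value

os.environ["SOURCE_DATABASE_URL"] = "postgres://app:secret@localhost:5432/app"
print(resolve_ref("${SOURCE_DATABASE_URL}"))  # postgres://app:secret@localhost:5432/app
```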

The driver is auto-detected from the URL scheme:

| Scheme | Driver |
|---|---|
| `postgres://…` | `postgres` |
| `mysql://…` or `user:pass@tcp(host:port)/db` | `mysql` |
| `file:./db.sqlite` | `sqlite` |

Set `driver` explicitly if you need to override detection.
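The detection rule in the table can be sketched as a few prefix and shape checks. `detect_driver` is hypothetical and covers only the table rows above. The runtime may recognise other forms, so treat this as an illustration of the lookup rather than its implementation.

```python
import re

def detect_driver(dsn: str) -> str:
    """Map a DSN to a driver name using only the scheme table (hypothetical helper)."""
    if dsn.startswith("postgres://"):
        return "postgres"
    # The tcp(host:port) MySQL form has no URL scheme, so match on its shape instead.
    if dsn.startswith("mysql://") or re.search(r"tcp\([^)]*\)/", dsn):
        return "mysql"
    if dsn.startswith("file:"):
        return "sqlite"
    raise ValueError(f"cannot detect driver from {dsn!r}; set driver explicitly")

for dsn in (
    "postgres://app:secret@localhost:5432/app",
    "app:secret@tcp(localhost:3306)/shop",
    "file:./db.sqlite",
):
    print(detect_driver(dsn))
```

Raising on an unrecognised DSN mirrors the advice above. When detection can't decide, `driver` has to be set explicitly.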

Pagination

Limit + offset

pagination:
  type: limit-offset
  limit: 500
  param: offset       # placeholder name inside the SQL
query: |
  SELECT id, payload
  FROM events
  ORDER BY id
  LIMIT 500 OFFSET :offset
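Here is a sketch of the paging loop this configuration implies, using SQLite so it runs anywhere. It assumes the runtime starts at offset 0, adds `limit` after each page, and stops at the first page with fewer than `limit` rows. The page size is 5 instead of 500 to keep the data small, and `fetch_limit_offset` is a hypothetical name.

```python
import sqlite3

def fetch_limit_offset(conn, query, limit):
    """Yield rows page by page, binding :offset and advancing it by `limit`.

    Assumption: paging stops at the first page shorter than `limit`
    (if the row count is a multiple of `limit`, one final empty page is fetched).
    """
    offset = 0
    while True:
        rows = conn.execute(query, {"offset": offset}).fetchall()
        yield from rows
        if len(rows) < limit:
            break
        offset += limit

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany(
    "INSERT INTO events (id, payload) VALUES (?, ?)",
    [(i, f"e{i}") for i in range(1, 13)],  # 12 rows: pages of 5, 5 and 2
)

query = "SELECT id, payload FROM events ORDER BY id LIMIT 5 OFFSET :offset"
rows = list(fetch_limit_offset(conn, query, limit=5))
print(len(rows))  # 12
```

The `LIMIT` in the SQL and the `limit` the loop uses must agree. If the SQL returns fewer rows per page than the loop expects, the loop stops after the first page.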

Cursor

pagination:
  type: cursor
  cursorField: id        # which column to use as the cursor value
  param: cursor          # placeholder name in the SQL
query: |
  SELECT id, payload
  FROM events
  WHERE id > :cursor
  ORDER BY id
  LIMIT 500

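Cursor pagination is the better choice for large datasets, and for incremental ones especially. With `OFFSET`, the database still reads and discards every skipped row, so each page gets slower as the offset grows. By contrast, `WHERE id > :cursor` can jump straight to the next page through an index on the cursor column. Offsets can also skip or repeat rows if the table changes between pages.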
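Here is the same loop with a cursor instead of an offset. Each page binds `:cursor` to the `id` of the last row returned. This sketch assumes the first page starts from 0, the cursor column is the first one selected, and paging stops on a short page. `fetch_cursor` is hypothetical. The ids are deliberately non-contiguous to show that the cursor follows values, not row positions.

```python
import sqlite3

def fetch_cursor(conn, query, limit, start=0):
    """Yield rows page by page, binding :cursor to the id of the last row seen.

    Assumptions: the first page starts from `start`, the cursor column is the
    first column selected, and paging stops at the first short page.
    """
    cursor = start
    while True:
        rows = conn.execute(query, {"cursor": cursor}).fetchall()
        yield from rows
        if len(rows) < limit:
            break
        cursor = rows[-1][0]  # cursorField: id

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
# Even ids only, so the cursor has to skip gaps rather than count rows.
conn.executemany(
    "INSERT INTO events (id, payload) VALUES (?, ?)",
    [(i, f"e{i}") for i in range(2, 26, 2)],
)

query = "SELECT id, payload FROM events WHERE id > :cursor ORDER BY id LIMIT 5"
rows = list(fetch_cursor(conn, query, limit=5))
print([r[0] for r in rows])  # [2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24]
```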

Incremental queries

For queries that should return only rows changed since the last run, enable `incremental` and reference its `timestampParam` (here `lastTs`) as a `:lastTs` placeholder in the SQL:

incremental:
  enabled: true
  timestampField: updated_at
  timestampParam: lastTs

query: |
  SELECT id, payload
  FROM events
  WHERE updated_at > :lastTs
  ORDER BY updated_at

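On the first run, before any state has been persisted, the parameter is bound to NULL. Because `updated_at > NULL` is never true, the query would return nothing. Handle that case in SQL, for example with `COALESCE(:lastTs, '-infinity'::timestamptz)` on PostgreSQL or an equivalent for your database.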
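Here is a sketch of consecutive incremental runs against SQLite. SQLite has no `-infinity` timestamp, so timestamps are stored as ISO-8601 text and `COALESCE(:lastTs, '')` plays the same role. The sketch assumes the tracked state is the largest `updated_at` returned by the previous run. The `state` dict stands in for whatever the runtime persists, and `run_incremental` is hypothetical.

```python
import sqlite3

# COALESCE turns the first run's NULL into '', which sorts before any ISO-8601 string.
QUERY = """
SELECT id, payload, updated_at
FROM events
WHERE updated_at > COALESCE(:lastTs, '')
ORDER BY updated_at
"""

def run_incremental(conn, state):
    """One incremental run; `state` stands in for the runtime's persisted state."""
    rows = conn.execute(QUERY, {"lastTs": state.get("lastTs")}).fetchall()
    if rows:
        # Rows are ordered by updated_at, so the last one carries the new high-water mark.
        state["lastTs"] = rows[-1][2]
    return rows

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [(1, "a", "2024-01-01T00:00:00"), (2, "b", "2024-01-02T00:00:00")],
)

# Without COALESCE, a NULL parameter matches nothing: `x > NULL` is never true.
bare_count = conn.execute(
    "SELECT count(*) FROM events WHERE updated_at > :lastTs", {"lastTs": None}
).fetchone()[0]
print(bare_count)  # 0

state = {}
first = run_incremental(conn, state)   # first run: both rows
second = run_incremental(conn, state)  # nothing changed since
conn.execute("INSERT INTO events VALUES (3, 'c', '2024-01-03T00:00:00')")
third = run_incremental(conn, state)   # only the new row
print(len(first), len(second), [r[0] for r in third])  # 2 0 [3]
```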

Examples

examples/07-database-input-basic-to-http.yaml
name: database-input-basic-to-http
version: 1.0.0
description: Read rows from a database and send them as an HTTP batch.
tags:
  - database-input
  - http-output
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  driver: postgres
  query: |
    select id, email, updated_at
    from customers
    where active = true
    order by updated_at asc
filters:
  - type: mapping
    mappings:
      - source: id
        target: id
        transforms:
          - op: toString
      - source: email
        target: email
        transforms:
          - op: lowercase
      - source: updated_at
        target: updatedAt
        transforms:
          - op: dateFormat
            format: YYYY-MM-DDTHH:mm:ss
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/customers/sync
  method: POST
  requestMode: batch

examples/08-database-input-limit-offset-to-database.yaml
name: database-input-limit-offset-to-database
version: 1.0.0
description: Read a paginated SQL source and write rows into another database.
tags:
  - database-input
  - pagination
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  driver: postgres
  query: |
    select id, sku, quantity
    from inventory
    order by id asc
    limit :limit offset :offset
  pagination:
    type: limit-offset
    limit: 500
    param: offset
filters:
  - type: set
    target: sync_source
    value: source-db
output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  driver: postgres
  query: |
    insert into inventory_snapshot (id, sku, quantity, sync_source)
    values ({{record.id}}, {{record.sku}}, {{record.quantity}}, {{record.sync_source}})
  transaction: true

examples/09-database-input-cursor-incremental.yaml
name: database-input-cursor-incremental
version: 1.0.0
description: Read database rows incrementally using cursor pagination metadata.
tags:
  - database-input
  - incremental
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  driver: postgres
  query: |
    select id, customer_id, total, updated_at
    from orders
    where updated_at > {{lastRunTimestamp}}
      and id > :last_id
    order by id asc
    limit :limit
  pagination:
    type: cursor
    limit: 200
    cursorField: id
    param: last_id
  incremental:
    enabled: true
    timestampField: updated_at
    timestampParam: lastRunTimestamp
    idField: id
    idParam: last_id
filters:
  - type: mapping
    mappings:
      - source: id
        target: order.id
      - source: customer_id
        target: order.customerId
      - source: total
        target: order.total
        transforms:
          - op: toFloat
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders
  method: POST
  requestMode: batch

Cross-references