State persistence
Resume from cursor, last-seen ID, or timestamp between runs.
When a scheduled pipeline restarts — because of a deploy, an OOM, or your process supervisor — you usually don't want to refetch everything from scratch. State persistence keeps a small cursor file on disk that the input reads at boot and writes after every successful batch.
The three persistable cursors
| Type | What's tracked | Typical query parameter |
|---|---|---|
| id | The largest record ID seen so far | after_id, since_id |
| timestamp | The largest record timestamp seen so far | updated_after, since |
| offset | Pagination offset | offset |
Most inputs support more than one cursor type. Enable the ones that match the source API's pagination semantics.
httpPolling example
```yaml
input:
  type: httpPolling
  schedule: "*/10 * * * *"
  endpoint: https://source.example.com/api/events
  dataField: events
  statePersistence:
    timestamp:
      enabled: true
      field: event.timestamp      # which record field is the timestamp
      queryParam: updated_after   # how the source API expects it
    id:
      enabled: true
      field: event.id             # which record field is the ID
      queryParam: after_id
    storagePath: ./.cannectors-state
```

| Field | Meaning |
|---|---|
| enabled | Master switch for this cursor type. |
| field | Dot-notated path to the field on the record. |
| queryParam | The query parameter name the source API expects. |
| storagePath | Directory where state files live. Shared across cursor types. |
After each successful batch, Cannectors writes the max-seen values into
./.cannectors-state/<pipeline-name>.json. On the next run, it reads
that file and appends the relevant query parameters to the request.
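
The read, append, and write cycle described above can be sketched in Python. This is only an illustration of the idea, not Cannectors' actual implementation: the helper names (`load_state`, `build_params`, `advance_state`, `save_state`) are hypothetical, the state-file keys `timestamp` and `id` are assumed to mirror the cursor type names, and timestamps are assumed to be ISO 8601 UTC strings in one uniform format, so that comparing them as strings matches chronological order.

```python
import json
from pathlib import Path


def load_state(path: Path) -> dict:
    """Return the persisted cursors, or an empty dict on the first run."""
    if not path.exists():
        return {}
    return json.loads(path.read_text())


def build_params(state: dict, query_params: dict) -> dict:
    """Map each persisted cursor to the query parameter the source API expects."""
    return {
        query_params[cursor]: value
        for cursor, value in state.items()
        if cursor in query_params
    }


def get_path(record: dict, dotted: str):
    """Resolve a dot-notated field path such as 'event.id' on a record."""
    value = record
    for key in dotted.split("."):
        value = value[key]
    return value


def advance_state(state: dict, batch: list, fields: dict) -> dict:
    """Return a new state holding the max-seen value per cursor after a batch."""
    new_state = dict(state)
    for cursor, field in fields.items():
        seen = [get_path(record, field) for record in batch]
        if cursor in new_state:
            seen.append(new_state[cursor])
        if seen:
            new_state[cursor] = max(seen)
    return new_state


def save_state(path: Path, state: dict) -> None:
    """Write the cursors to disk, creating the storage directory if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(state))
```

With a persisted state of `{"timestamp": "2026-01-01T00:00:00Z", "id": 41}` and the `queryParam` values from the example config, `build_params` yields `updated_after` and `after_id` entries that would be appended to the request. An empty batch leaves the state unchanged.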
database example
For database inputs, the cursor is a SQL column instead of a query
parameter:
```yaml
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  query: |
    SELECT id, updated_at, payload
    FROM events
    WHERE updated_at > $1
    ORDER BY updated_at ASC
    LIMIT 500
  statePersistence:
    timestamp:
      enabled: true
      field: updated_at
    storagePath: ./.cannectors-state
```

The placeholder $1 is bound to the persisted timestamp at runtime.
On the first run, when no state exists yet, it is bound to NULL. A
comparison against NULL matches no rows, so handle that case in your
SQL (COALESCE($1::timestamptz, '-infinity'::timestamptz) is a
common pattern).
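
To see why the first run needs a guard, here is a small self-contained sketch that uses Python's built-in `sqlite3` module as a stand-in for the source database. It is not the PostgreSQL setup from the example: SQLite has no `timestamptz` type or `'-infinity'` value, so the sketch stores ISO 8601 strings and falls back to the empty string instead. The table contents and the `fetch_ids` helper are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, updated_at TEXT)")
conn.executemany(
    "INSERT INTO events (id, updated_at) VALUES (?, ?)",
    [(1, "2026-01-01T00:00:00Z"), (2, "2026-01-02T00:00:00Z")],
)

# Same shape as the example query, without and with a NULL guard.
UNGUARDED = "SELECT id FROM events WHERE updated_at > ? ORDER BY updated_at"
GUARDED = "SELECT id FROM events WHERE updated_at > COALESCE(?, '') ORDER BY updated_at"


def fetch_ids(sql: str, cursor_value):
    """Run the query with the persisted cursor bound (None means no state yet)."""
    return [row[0] for row in conn.execute(sql, (cursor_value,))]


# First run, no state: the bare comparison against NULL matches nothing.
first_run_unguarded = fetch_ids(UNGUARDED, None)

# First run, no state: the fallback value lets every row qualify.
first_run_guarded = fetch_ids(GUARDED, None)

# Later run: only rows strictly newer than the persisted cursor come back.
later_run = fetch_ids(GUARDED, "2026-01-01T00:00:00Z")
```

The unguarded first run returns no rows at all, which is the failure mode the COALESCE pattern avoids.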
Where to put storagePath
| Environment | Recommended path |
|---|---|
| Local dev | ./.cannectors-state (.gitignore it) |
| Single host (systemd) | /var/lib/cannectors/state |
| Kubernetes | A PersistentVolumeClaim mounted at e.g. /state |
| Container with ephemeral disk | Mount an attached block volume; never trust the container filesystem |
If the storage disappears between runs, the pipeline restarts from scratch — possibly re-processing already-shipped records.
Before v22.6, storagePath was only read by the input. The
executor saved to a different default path, so cursors never made
the round trip cleanly. From v22.6 onwards, the executor shares the
input's storage. If you see "state file written but never read"
behaviour on an older version, upgrade.
Backfills
To force a fresh full sync, delete the relevant state file:
```sh
rm ./.cannectors-state/<pipeline-name>.json
```

The next run treats itself as a fresh start. There is no "backfill from date X" knob in the YAML. If you really need to start from a specific point, seed the state file manually:

```sh
echo '{"timestamp": "2026-01-01T00:00:00Z"}' \
  > ./.cannectors-state/<pipeline-name>.json
```
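
If you seed state files regularly, a short script can validate the timestamp before writing it. The sketch below is one possible approach, not part of Cannectors: `seed_state` is a hypothetical helper, and it assumes the state file holds a single `timestamp` key as in the command above. It writes through a temporary file and renames it into place so that a reader never sees a half-written file.

```python
import json
import os
import tempfile
from datetime import datetime
from pathlib import Path


def seed_state(storage_path: str, pipeline_name: str, timestamp: str) -> Path:
    """Write a state file containing only a timestamp cursor."""
    # Reject anything that is not a timezone-aware ISO 8601 timestamp.
    # The "Z" suffix is rewritten because older Pythons do not parse it.
    parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        raise ValueError("timestamp must include a timezone, e.g. a trailing Z")

    target = Path(storage_path) / f"{pipeline_name}.json"
    target.parent.mkdir(parents=True, exist_ok=True)

    # Write to a temp file in the same directory, then rename over the target.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as tmp:
        json.dump({"timestamp": timestamp}, tmp)
    os.replace(tmp_name, target)
    return target
```

For example, `seed_state("./.cannectors-state", "my-pipeline", "2026-01-01T00:00:00Z")` would produce the same file contents as the `echo` command, where `my-pipeline` is a placeholder pipeline name.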