Execution flow
From `cannectors run pipeline.yaml` to a record landing at the output.
This is the exact sequence the runtime walks every time `cannectors run` is invoked.
Useful to read alongside the source — most of it lives under
internal/runtime/.
Steps
1. parse YAML → config tree
2. validate → errors or OK
3. resolve env vars → concrete config
4. build runtime → input, filters[], output, scheduler
5. tick → scheduler fires (CRON or one-shot)
6. fetch → input emits a batch of records
7. for each record:
7a. for each filter in order:
→ mutate / drop / branch
7b. queue to output
8. flush → output sends (batch) or per-record (single)
9. persist state → cursors written
10. log result → summary on stdout
11. loop or exit → scheduled? back to step 5. else exit 0.
Step-by-step
1. Parse
internal/config/parser.go opens the file, normalizes whitespace,
and converts it to a YAML AST. JSON input works too, because gopkg.in/yaml.v3
parses both. The output is a Go struct tree.
2. Validate
internal/config/validator.go runs the parsed tree through the JSON
Schema. Validation is recursive — each module type's schema is
walked, with allOf and oneOf resolved to pick the right shape.
Errors carry the line and column from the original YAML, so each message points the user at the exact spot that failed.
3. Resolve env vars
internal/config/env.go walks the tree, substituting every
${VAR_NAME} with the corresponding value from os.Environ().
Substitution happens once at startup, so a rotated secret needs
a process restart.
If a ${VAR} doesn't resolve, the runtime exits before step 4 with
a clear error.
4. Build runtime
internal/runtime/builder.go walks the validated config and
instantiates one input, the filter chain, and one output by calling
each module's New(config) constructor. The constructor returns a
typed module value (e.g. *HTTPPollingInput).
The builder also wires the shared StateStore and the scheduler.
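One way to picture the builder is a registry keyed by module type. This sketch assumes a `type` key in each module's config and a single `Name()` method on inputs; `Input`, `inputConstructors`, `buildInput`, and `NewHTTPPollingInput` are hypothetical, and only the `*HTTPPollingInput` type name comes from the text above.

```go
package main

import "fmt"

// Input is a hypothetical, trimmed-down input contract.
type Input interface {
	Name() string
}

// HTTPPollingInput stands in for the typed module value named above.
type HTTPPollingInput struct {
	URL string
}

func (h *HTTPPollingInput) Name() string { return "httpPolling" }

// NewHTTPPollingInput is a hypothetical New(config) constructor.
func NewHTTPPollingInput(cfg map[string]any) (Input, error) {
	url, ok := cfg["url"].(string)
	if !ok || url == "" {
		return nil, fmt.Errorf("httpPolling: missing url")
	}
	return &HTTPPollingInput{URL: url}, nil
}

// inputConstructors maps a config "type" value to its constructor.
var inputConstructors = map[string]func(map[string]any) (Input, error){
	"httpPolling": NewHTTPPollingInput,
}

// buildInput looks up the constructor for cfg["type"] and calls it.
func buildInput(cfg map[string]any) (Input, error) {
	typ, _ := cfg["type"].(string)
	ctor, ok := inputConstructors[typ]
	if !ok {
		return nil, fmt.Errorf("unknown input type %q", typ)
	}
	return ctor(cfg)
}
```

The same lookup would repeat for each filter in the chain and for the output.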
5. Tick
If the input has a schedule, the scheduler runs the pipeline on
every CRON match. Without a schedule, it runs exactly once.
If a run is still in progress when the next tick fires, the runtime skips that tick and logs a warning, so runs never overlap.
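The no-overlap rule can be sketched with an atomic flag: a tick that finds a run in progress logs a warning and returns instead of queueing. `tickGuard` is a hypothetical name; the real scheduler may do this differently, for example with `sync.Mutex.TryLock`.

```go
package main

import (
	"log"
	"sync/atomic"
)

// tickGuard (hypothetical) allows at most one run in flight.
type tickGuard struct {
	running atomic.Bool
}

// onTick is what the scheduler would call on each CRON match. It returns
// false, after logging a warning, when the previous run has not finished.
func (g *tickGuard) onTick(run func()) bool {
	if !g.running.CompareAndSwap(false, true) {
		log.Println("warning: previous run still in progress, skipping tick")
		return false
	}
	defer g.running.Store(false) // free the guard once the run returns
	run()
	return true
}
```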
6. Fetch
The input's Fetch(ctx) method runs. Implementations:
- httpPolling: builds the URL (including pagination and state query params), sends a GET, parses the JSON, and returns `dataField` as the records.
- webhook: returns the current delivery (one per HTTP POST).
- database: opens a connection, runs the query, and scans rows.
Each Fetch returns []Record plus an opaque state delta that
the runtime will persist later.
7. Filter chain
For each record, the executor calls each filter's `Process(ctx, record)` in declared order. Filters return one of:
- a modified record (continue down the chain),
- `nil` (drop the record), or
- an error (handled by `onError`).
The chain stops at the first drop. A record only reaches the output if every filter passed it through.
8. Flush
The output's Send(ctx, records) runs. In batch mode, this is one
call with all records that survived. In single mode, it's one call
per record.
Retries happen inside Send. The runtime above only sees success or
the post-retry error.
9. Persist state
After a successful batch, the runtime asks the input for its state
delta and writes it to <storagePath>/<pipeline-name>.json. The
write is atomic (tempfile + rename) — a crash mid-write leaves the
previous state intact.
10. Log
A summary line goes to stdout:
{"time":"…","level":"info","pipeline":"sync-orders","msg":"complete","records":93,"duration_ms":1840}
11. Loop or exit
If the input has a schedule and the process hasn't received a stop signal, the runtime sleeps until the next tick. Otherwise it exits 0.
Graceful shutdown
SIGINT / SIGTERM cancels the context passed through every layer. The runtime then:
- Stops accepting new ticks (scheduled mode) or new webhook deliveries (listener mode).
- Finishes the current batch: `Fetch` and `Process` calls already in flight see a cancelled context and decide whether to abort cleanly or finish what they started.
- Flushes the output for completed records.
- Persists state.
- Closes connections (DB pools, HTTP transports).
- Exits 0.
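A stop timeout such as TimeoutStopSec=30s (systemd) or terminationGracePeriodSeconds: 60
(Kubernetes) leaves the runtime time to finish the current batch, flush the output, persist state, and close connections before it is killed.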