cannectors

Execution flow

From `cannectors run pipeline.yaml` to a record landing at the output.

This is the exact sequence the runtime walks on every `cannectors run` invocation. It is useful to read alongside the source: the config steps live under internal/config/, and most of the rest lives under internal/runtime/.

Steps

1. parse YAML        →  config tree
2. validate          →  errors or OK
3. resolve env vars  →  concrete config
4. build runtime     →  input, filters[], output, scheduler
5. tick              →  scheduler fires (CRON or one-shot)
6. fetch             →  input emits a batch of records
7. for each record:
   7a. for each filter in order:
       → mutate / drop / branch
   7b. queue to output
8. flush             →  output sends once (batch) or once per record (single)
9. persist state     →  cursors written
10. log result       →  summary on stdout
11. loop or exit     →  scheduled? back to step 5. else exit 0.

Step-by-step

1. Parse

internal/config/parser.go opens the file, normalizes whitespace, and parses it into a YAML AST. JSON files work too, because JSON is, for practical purposes, a subset of YAML and gopkg.in/yaml.v3 parses both. The output is a Go struct tree.

2. Validate

internal/config/validator.go validates the parsed tree against the JSON Schema. Validation is recursive: each module type's schema is walked, with allOf and oneOf resolved to pick the right shape.

Errors include the line and column from the original YAML, so each message points at the exact location of the problem.

3. Resolve env vars

internal/config/env.go walks the tree, substituting every ${VAR_NAME} with the corresponding value from os.Environ(). Substitution happens once at startup, so a rotated secret needs a process restart.

If a ${VAR} doesn't resolve, the runtime exits before step 4 with a clear error.
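
A minimal sketch of this kind of substitution, not the actual code in internal/config/env.go. The function name resolve, the placeholder pattern, and the error text are assumptions made for illustration, and the sketch treats a variable that is set but empty as resolved.

```go
package main

import (
	"fmt"
	"os"
	"regexp"
)

// varPattern matches ${VAR_NAME} placeholders (assumed syntax).
var varPattern = regexp.MustCompile(`\$\{([A-Za-z_][A-Za-z0-9_]*)\}`)

// resolve replaces every ${VAR} in s using lookup and fails on the first
// placeholder that has no value. lookup has the same shape as os.LookupEnv.
func resolve(s string, lookup func(string) (string, bool)) (string, error) {
	var missing string
	out := varPattern.ReplaceAllStringFunc(s, func(m string) string {
		name := m[2 : len(m)-1] // strip "${" and "}"
		val, ok := lookup(name)
		if !ok && missing == "" {
			missing = name
		}
		return val
	})
	if missing != "" {
		return "", fmt.Errorf("environment variable %q is not set", missing)
	}
	return out, nil
}

func main() {
	// Hypothetical config value; API_KEY must be set for this to succeed.
	url, err := resolve("https://api.example.com/orders?key=${API_KEY}", os.LookupEnv)
	if err != nil {
		fmt.Fprintln(os.Stderr, "config error:", err)
		os.Exit(1)
	}
	fmt.Println(url)
}
```

Passing the lookup function in, rather than calling os.LookupEnv directly, keeps the substitution easy to test without touching the process environment.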

4. Build runtime

internal/runtime/builder.go walks the validated config and instantiates one input, the filter chain, and one output by calling each module's New(config) constructor. The constructor returns a typed module value (e.g. *HTTPPollingInput).

The builder also wires the shared StateStore and the scheduler.

5. Tick

If the input has a schedule, the scheduler runs the pipeline on every CRON match. Without a schedule, it runs exactly once.

If the runtime is still processing when the next tick fires, the runtime skips that tick and logs a warning. Runs never overlap.
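
One common way to get this skip-if-busy behaviour is an atomic flag that a run must claim before it starts. The sketch below illustrates that idea only; the runner type and tryRun method are hypothetical names, and the real scheduler may be implemented differently.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// runner guards a pipeline run so that two runs never overlap.
type runner struct {
	busy atomic.Bool
}

// tryRun executes run unless a previous run is still in progress.
// It reports whether run was executed.
func (r *runner) tryRun(run func()) bool {
	if !r.busy.CompareAndSwap(false, true) {
		return false // previous run still active: skip this tick
	}
	defer r.busy.Store(false)
	run()
	return true
}

func main() {
	var r runner
	r.tryRun(func() {
		// Simulate a tick firing while this run is still in progress.
		if !r.tryRun(func() {}) {
			fmt.Println("warn: previous run still in progress, skipping tick")
		}
	})
	// The guard is released once the run returns, so the next tick proceeds.
	fmt.Println("next tick ran:", r.tryRun(func() {}))
}
```

Because the flag is claimed with a compare-and-swap, the guard also holds when ticks arrive from a separate scheduler goroutine.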

6. Fetch

The input's Fetch(ctx) method runs. Implementations:

  • httpPolling: builds the URL (incl. pagination + state query params), sends GET, parses the JSON, returns dataField as the records.
  • webhook: returns the current delivery (one per HTTP POST).
  • database: opens a connection, runs the query, scans rows.

Each Fetch returns []Record plus an opaque state delta that the runtime will persist later.
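
To make the httpPolling case concrete, here is a rough sketch of its two mechanical parts: adding query parameters to the URL and pulling the records out of the response by dataField. The helper names buildURL and extractRecords, the Record type, and the parameter names are illustrative assumptions, and the sketch assumes dataField names a top-level key.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// Record is a stand-in for the runtime's record type.
type Record map[string]any

// buildURL adds pagination and state parameters to the base URL.
func buildURL(base string, params map[string]string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	for k, v := range params {
		q.Set(k, v)
	}
	u.RawQuery = q.Encode() // Encode sorts keys, so the result is stable
	return u.String(), nil
}

// extractRecords decodes body and returns the array stored under dataField.
func extractRecords(body []byte, dataField string) ([]Record, error) {
	var envelope map[string]json.RawMessage
	if err := json.Unmarshal(body, &envelope); err != nil {
		return nil, err
	}
	raw, ok := envelope[dataField]
	if !ok {
		return nil, fmt.Errorf("field %q not found in response", dataField)
	}
	var records []Record
	if err := json.Unmarshal(raw, &records); err != nil {
		return nil, err
	}
	return records, nil
}

func main() {
	// Hypothetical endpoint and parameter names.
	u, _ := buildURL("https://api.example.com/orders", map[string]string{"page": "2", "since": "abc"})
	fmt.Println(u)

	recs, err := extractRecords([]byte(`{"data":[{"id":1},{"id":2}]}`), "data")
	fmt.Println(len(recs), err)
}
```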

7. Filter chain

For each record, the executor calls each filter's Process(ctx, record) in declared order. Each call returns one of:

  • A modified record (continue down the chain),
  • nil (drop the record),
  • An error (handled by onError).

The chain stops at the first drop. A record only reaches the output if every filter passed it through.
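
The chain semantics above can be sketched as a short loop. The Record, Filter, and FilterFunc types here are illustrative stand-ins rather than the project's real definitions, and the two demo filters are invented for the example.

```go
package main

import (
	"context"
	"fmt"
)

// Record and Filter are stand-ins for the runtime's types.
type Record map[string]any

type Filter interface {
	Process(ctx context.Context, rec Record) (Record, error)
}

// FilterFunc adapts a plain function to the Filter interface.
type FilterFunc func(ctx context.Context, rec Record) (Record, error)

func (f FilterFunc) Process(ctx context.Context, rec Record) (Record, error) {
	return f(ctx, rec)
}

// runChain passes rec through filters in order.
// A nil record with a nil error means the record was dropped.
func runChain(ctx context.Context, filters []Filter, rec Record) (Record, error) {
	for _, f := range filters {
		out, err := f.Process(ctx, rec)
		if err != nil {
			return nil, err // the caller would apply the onError policy here
		}
		if out == nil {
			return nil, nil // dropped: later filters never see the record
		}
		rec = out
	}
	return rec, nil
}

// Two hypothetical filters for the demo.
var dropCancelled = FilterFunc(func(_ context.Context, rec Record) (Record, error) {
	if rec["status"] == "cancelled" {
		return nil, nil
	}
	return rec, nil
})

var addCurrency = FilterFunc(func(_ context.Context, rec Record) (Record, error) {
	rec["currency"] = "EUR"
	return rec, nil
})

// demo runs two records through the chain and returns the survivors.
func demo() ([]Record, error) {
	ctx := context.Background()
	filters := []Filter{dropCancelled, addCurrency}
	in := []Record{{"id": 1, "status": "paid"}, {"id": 2, "status": "cancelled"}}
	var out []Record
	for _, rec := range in {
		res, err := runChain(ctx, filters, rec)
		if err != nil {
			return nil, err
		}
		if res != nil {
			out = append(out, res) // queue for the output
		}
	}
	return out, nil
}

func main() {
	out, err := demo()
	fmt.Println(out, err)
}
```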

8. Flush

The output's Send(ctx, records) runs. In batch mode, this is one call with all records that survived. In single mode, it's one call per record.

Retries happen inside Send. The runtime above only sees success or the post-retry error.
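
The difference between the two modes can be sketched as follows. The Output interface mirrors the Send(ctx, records) signature named above; the flush helper, the mode strings, and the choice to skip Send for an empty batch are assumptions for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Record and Output are stand-ins for the runtime's types.
type Record map[string]any

type Output interface {
	Send(ctx context.Context, records []Record) error
}

// flush delivers records with one Send call (batch mode)
// or one Send call per record (single mode).
func flush(ctx context.Context, out Output, mode string, records []Record) error {
	if len(records) == 0 {
		return nil // assumption: nothing survived, so nothing is sent
	}
	if mode == "single" {
		for _, rec := range records {
			if err := out.Send(ctx, []Record{rec}); err != nil {
				return err
			}
		}
		return nil
	}
	return out.Send(ctx, records)
}

// countingOutput records how many times Send was called.
type countingOutput struct{ calls int }

func (c *countingOutput) Send(_ context.Context, _ []Record) error {
	c.calls++
	return nil
}

// sendCalls reports how many Send calls flush makes for n records.
func sendCalls(mode string, n int) int {
	out := &countingOutput{}
	if err := flush(context.Background(), out, mode, make([]Record, n)); err != nil {
		return -1
	}
	return out.calls
}

func main() {
	fmt.Println("batch:", sendCalls("batch", 3), "single:", sendCalls("single", 3))
}
```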

9. Persist state

After the batch is sent successfully, the runtime takes the state delta the input returned from Fetch and writes it to <storagePath>/<pipeline-name>.json. The write is atomic (tempfile + rename), so a crash mid-write leaves the previous state intact.
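
The tempfile + rename pattern looks roughly like this in Go. This is a generic sketch of the technique, not the runtime's own code; writeAtomic, the demo helper, and the cursor JSON shape are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeAtomic writes data to a temp file in the same directory as path and
// then renames it over path, so readers see either the old or the new file.
func writeAtomic(path string, data []byte) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path)+".tmp-*")
	if err != nil {
		return err
	}
	// Cleans up on failure; fails harmlessly once the rename has succeeded.
	defer os.Remove(tmp.Name())

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil { // flush to disk before the rename
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

// demo writes two successive states and returns what is on disk afterwards.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "state-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "sync-orders.json")
	for _, s := range []string{`{"cursor":"100"}`, `{"cursor":"193"}`} {
		if err := writeAtomic(path, []byte(s)); err != nil {
			return "", err
		}
	}
	b, err := os.ReadFile(path)
	return string(b), err
}

func main() {
	fmt.Println(demo())
}
```

Creating the temp file in the target directory matters: a rename is only atomic when source and destination are on the same filesystem.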

10. Log

A summary line goes to stdout:

{"time":"…","level":"info","pipeline":"sync-orders","msg":"complete","records":93,"duration_ms":1840}

11. Loop or exit

If the input has a schedule and the process hasn't received a stop signal, the runtime sleeps until the next tick. Otherwise it exits 0.

Graceful shutdown

SIGINT / SIGTERM cancels the context passed through every layer. The runtime then:

  1. Stops accepting new ticks (scheduled mode) or new webhook deliveries (listener mode).
  2. Finishes the current batch — Fetch and Process calls already in flight see a cancelled context and decide whether to abort cleanly or finish what they started.
  3. Flushes the output for completed records.
  4. Persists state.
  5. Closes connections (DB pools, HTTP transports).
  6. Exits 0.

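A stop timeout such as TimeoutStopSec=30s (systemd) or terminationGracePeriodSeconds: 60 (Kubernetes) should give the runtime enough time for steps 2–5. If the timeout expires first, the supervisor kills the process and the remaining steps may not run.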
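
A minimal sketch of the signal-to-context wiring, using signal.NotifyContext from the standard library. The runLoop and demo functions are hypothetical, and the 300 ms timeout in main exists only so the example terminates without a signal.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"time"
)

// runLoop calls step on every tick until ctx is cancelled, lets a step that
// is already running finish, then runs cleanup once. It returns the number
// of completed steps.
func runLoop(ctx context.Context, interval time.Duration, step, cleanup func()) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	defer cleanup()

	steps := 0
	for ctx.Err() == nil { // stop accepting new ticks once cancelled
		select {
		case <-ctx.Done():
		case <-ticker.C:
			step()
			steps++
		}
	}
	return steps
}

// demo cancels the context from inside the third step, standing in for a
// signal that arrives while a batch is being processed.
func demo() (steps int, cleaned bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	n := 0
	steps = runLoop(ctx, time.Millisecond, func() {
		n++
		if n == 3 {
			cancel()
		}
	}, func() { cleaned = true })
	return steps, cleaned
}

func main() {
	// SIGINT or SIGTERM cancels ctx.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	// Demo only: also stop after 300ms so the example ends on its own.
	ctx, cancel := context.WithTimeout(ctx, 300*time.Millisecond)
	defer cancel()

	steps := runLoop(ctx, 50*time.Millisecond,
		func() { fmt.Println("tick: fetch, filter, flush") },
		func() { fmt.Println("shutdown: flush output, persist state, close connections") })
	fmt.Println("completed runs:", steps)
}
```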

See also