
Your First Pipeline

Build a Cannectors pipeline from scratch and run it locally.

In this guide, you'll write a pipeline that polls an HTTP API on a schedule, maps a few fields, drops records you don't care about, and POSTs the rest to another endpoint. Everything stays local until you're ready to point it at real systems.

Scenario

You're consuming an /api/orders endpoint and forwarding only paid orders to an internal /api/orders/import endpoint. Both endpoints require a Bearer token, and you want a 15-minute schedule with retries on 429 and 5xx responses.

Create the pipeline file

Create a new file named sync-orders.yaml. A pipeline needs four top-level keys: name (used in logs), input, filters, and output. Start with the name, plus version and description metadata:

sync-orders.yaml
name: sync-orders
version: 1.0.0
description: Forward paid orders to the internal import API.

Wire the input

httpPolling calls a URL on a schedule and reads records from a JSON field in the response.

sync-orders.yaml
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
  authentication:
    type: bearer
    credentials:
      token: ${SOURCE_BEARER_TOKEN}

Notable parts:

  • schedule is a 5-field CRON expression. Without it, the pipeline runs once and exits.
  • dataField is the JSON key in the response that holds the array of records (here, { "orders": [...] }).
  • ${SOURCE_BEARER_TOKEN} is read from your shell environment at startup, so secrets never live in the YAML.
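As a rough mental model of what the input does at runtime, here is a minimal Python sketch that expands ${VAR} placeholders from the environment and then pulls the record array out of a response body using dataField. It is not Cannectors' implementation. The function names expand_env and extract_records are hypothetical, and so is the choice to fail loudly when a variable is unset.

```python
import json
import os
import re

# Matches ${NAME} placeholders (assumed syntax: letters, digits, underscores).
PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value, env=None):
    """Replace each ${NAME} in value with the matching environment variable.

    Hypothetical behavior: an unset variable raises KeyError rather than
    silently expanding to an empty string.
    """
    env = os.environ if env is None else env

    def lookup(match):
        name = match.group(1)
        if name not in env:
            raise KeyError(f"environment variable {name} is not set")
        return env[name]

    return PLACEHOLDER.sub(lookup, value)


def extract_records(body, data_field):
    """Parse a JSON response body and return the array stored under data_field."""
    payload = json.loads(body)
    records = payload[data_field]
    if not isinstance(records, list):
        raise TypeError(f"{data_field!r} does not hold a JSON array")
    return records


if __name__ == "__main__":
    # Prints: Bearer abc123
    print(expand_env("Bearer ${SOURCE_BEARER_TOKEN}", {"SOURCE_BEARER_TOKEN": "abc123"}))
    # Prints: [{'order_id': 1}, {'order_id': 2}]
    print(extract_records('{"orders": [{"order_id": 1}, {"order_id": 2}]}', "orders"))
```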

Add filters

Filters run in declared order. Two are enough here:

  1. mapping: copy order_id to id, and flatten the nested customer.email field into a lowercased email.
  2. condition: drop any record whose status is not paid.

sync-orders.yaml
filters:
  - type: mapping
    mappings:
      - source: order_id
        target: id
      - source: customer.email
        target: email
        transforms:
          - op: lowercase

  - type: condition
    expression: "status == 'paid'"
    else:
      - type: drop

There is no then branch, so paid records pass through unchanged. The else branch drops every non-paid record, so it never reaches the output.
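To trace one record through the two filters, here is a Python sketch that models them under the semantics described above. mapping copies order_id to id and the dotted path customer.email to a lowercased email. condition keeps a record only when status == 'paid'. One point is an assumption, not documented behavior: the sketch assumes mapping leaves unmapped fields such as status in place, because otherwise the condition would have nothing to test. All function names are hypothetical.

```python
from typing import Any, Optional


def get_path(record: dict, path: str) -> Any:
    """Follow a dotted path like 'customer.email'; return None if any key is missing."""
    value: Any = record
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def apply_mapping(record: dict) -> dict:
    """Hypothetical model of the mapping filter.

    Assumption: unmapped fields (e.g. status) pass through unchanged.
    """
    out = dict(record)
    out["id"] = get_path(record, "order_id")
    email = get_path(record, "customer.email")
    out["email"] = email.lower() if isinstance(email, str) else email
    return out


def apply_condition(record: dict) -> Optional[dict]:
    """Model of the condition filter: keep paid records, drop the rest (else: drop)."""
    return record if record.get("status") == "paid" else None


def run_filters(records: list) -> list:
    kept = []
    for record in records:
        mapped = apply_mapping(record)  # filters run in declared order
        result = apply_condition(mapped)
        if result is not None:
            kept.append(result)
    return kept


if __name__ == "__main__":
    sample = [
        {"order_id": "A1", "status": "paid", "customer": {"email": "Ada@Example.com"}},
        {"order_id": "A2", "status": "pending", "customer": {"email": "bob@example.com"}},
    ]
    for r in run_filters(sample):
        print(r["id"], r["email"])  # prints: A1 ada@example.com
```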

Wire the output

httpRequest POSTs records to an endpoint in one of two request modes: batch (one request for the whole set) or single (one request per record). For an internal import API, batch is the better fit: one call per run instead of one per order.
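The Python sketch below contrasts the two modes by building the request bodies without sending anything. It assumes batch mode sends the records as a single JSON array and that an empty batch sends no request. Both are assumptions about the wire behavior, and plan_requests is a hypothetical name.

```python
import json


def plan_requests(records: list, request_mode: str) -> list:
    """Return the JSON bodies that would be POSTed for the given requestMode.

    Assumptions: batch sends all records as one JSON array (and nothing when
    there are no records); single sends each record as its own JSON object.
    """
    if request_mode == "batch":
        return [json.dumps(records)] if records else []
    if request_mode == "single":
        return [json.dumps(record) for record in records]
    raise ValueError(f"unknown requestMode: {request_mode!r}")


if __name__ == "__main__":
    paid = [{"id": "A1"}, {"id": "A3"}]
    print(len(plan_requests(paid, "batch")))   # 1
    print(len(plan_requests(paid, "single")))  # 2
```

With two paid orders, batch mode produces one request and single mode produces two. The gap grows with every order in the run.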

sync-orders.yaml
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  authentication:
    type: bearer
    credentials:
      token: ${DESTINATION_BEARER_TOKEN}
  retry:
    maxAttempts: 3
    delayMs: 500
    backoffMultiplier: 2
    maxDelayMs: 5000
    retryableStatusCodes: [429, 500, 502, 503, 504]

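The retry block retries only on the listed status codes (429 and the common 5xx codes). It honors a Retry-After header automatically. Otherwise it backs off exponentially from a jittered 500 ms base, doubling the delay on each retry (backoffMultiplier: 2) up to the 5000 ms cap (maxDelayMs).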
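To see what those numbers produce, here is a small Python sketch of the delay schedule. Jitter is left out so the values are deterministic. Two points are assumptions rather than documented behavior. First, maxAttempts is taken to count the initial request, so 3 means at most two retries. Second, a Retry-After value given in seconds replaces the computed delay and is not capped by maxDelayMs. The function names are hypothetical.

```python
from typing import Optional

RETRYABLE = {429, 500, 502, 503, 504}  # mirrors retryableStatusCodes


def retry_delay_ms(
    retry_number: int,
    delay_ms: int = 500,
    backoff_multiplier: float = 2.0,
    max_delay_ms: int = 5000,
    retry_after: Optional[str] = None,
) -> int:
    """Delay before the given retry (1 = first retry), jitter omitted.

    A Retry-After header in delay-seconds form takes precedence; the
    HTTP-date form is not handled in this sketch.
    """
    if retry_after is not None and retry_after.strip().isdigit():
        return int(retry_after.strip()) * 1000
    delay = delay_ms * backoff_multiplier ** (retry_number - 1)
    return int(min(delay, max_delay_ms))


def delay_schedule(max_attempts: int = 3) -> list:
    """Delays between attempts, assuming max_attempts includes the first try."""
    return [retry_delay_ms(n) for n in range(1, max_attempts)]


def should_retry(status: int, attempt: int, max_attempts: int = 3) -> bool:
    """Retry only retryable statuses, and only while attempts remain."""
    return status in RETRYABLE and attempt < max_attempts


if __name__ == "__main__":
    print(delay_schedule(3))  # [500, 1000]
    print(delay_schedule(6))  # [500, 1000, 2000, 4000, 5000]
```

The second call shows where maxDelayMs matters: without the cap, the fifth delay would be 8000 ms.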

Final file

Put it all together:

sync-orders.yaml
name: sync-orders
version: 1.0.0
description: Forward paid orders to the internal import API.

input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
  authentication:
    type: bearer
    credentials:
      token: ${SOURCE_BEARER_TOKEN}

filters:
  - type: mapping
    mappings:
      - source: order_id
        target: id
      - source: customer.email
        target: email
        transforms:
          - op: lowercase

  - type: condition
    expression: "status == 'paid'"
    else:
      - type: drop

output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  authentication:
    type: bearer
    credentials:
      token: ${DESTINATION_BEARER_TOKEN}
  retry:
    maxAttempts: 3
    delayMs: 500
    backoffMultiplier: 2
    maxDelayMs: 5000
    retryableStatusCodes: [429, 500, 502, 503, 504]

Validate it

Before running, validate the YAML against the schema:

cannectors validate sync-orders.yaml

This catches typos, missing required fields, and invalid types.
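For intuition about the kinds of errors validate reports, here is a deliberately simplified Python check over an already-parsed config. It is not the real Cannectors schema. The required keys come from this guide, and flagging unknown top-level keys as probable typos is an assumption made for the sketch. validate_pipeline is a hypothetical name.

```python
def validate_pipeline(config: dict) -> list:
    """Return a list of problems found in a parsed pipeline config (empty = valid).

    Hypothetical, simplified checks based on this guide, not the real schema.
    """
    problems = []
    expected = {"name": str, "input": dict, "filters": list, "output": dict}
    for key, expected_type in expected.items():
        if key not in config:
            problems.append(f"missing required key: {key}")
        elif not isinstance(config[key], expected_type):
            problems.append(f"{key} should be {expected_type.__name__}")
    # Both ends of the pipeline must say which connector type they use.
    for section in ("input", "output"):
        block = config.get(section)
        if isinstance(block, dict) and "type" not in block:
            problems.append(f"{section}.type is required")
    # Flag unrecognized keys, which usually means a typo such as 'ouptut'.
    known = set(expected) | {"version", "description"}
    for key in config:
        if key not in known:
            problems.append(f"unknown top-level key: {key}")
    return problems


if __name__ == "__main__":
    typo = {"name": "x", "input": {}, "filters": [], "ouptut": {"type": "httpRequest"}}
    for problem in validate_pipeline(typo):
        print(problem)
```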

Dry-run it

Set the environment variables, then preview what would be sent:

export SOURCE_BEARER_TOKEN=
export DESTINATION_BEARER_TOKEN=
cannectors run --dry-run sync-orders.yaml

You'll see the input fetch, the mapping pass, the condition filter, and a preview of the batch that would be POSTed.

--dry-run executes the input and filters for real (so it hits the source API), but stops before the output side effects. Use it on staging data before you point a fresh pipeline at production endpoints.
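That split can be modeled as a runner that always executes the fetch and filter steps but calls the output step only when dry_run is false. This Python sketch is a hypothetical illustration. run_pipeline and its callable parameters are not part of the Cannectors CLI.

```python
import json
from typing import Callable, Iterable


def run_pipeline(
    fetch: Callable[[], list],
    filters: Iterable[Callable[[list], list]],
    send: Callable[[list], None],
    dry_run: bool = False,
) -> list:
    """Fetch, filter, then send; with dry_run the send step is skipped.

    The input and filters always execute for real; only the output side
    effect is replaced by a printed preview.
    """
    records = fetch()  # in a real pipeline, this hits the source API
    for apply_filter in filters:
        records = apply_filter(records)
    if dry_run:
        print("would POST:", json.dumps(records))
    else:
        send(records)
    return records


if __name__ == "__main__":
    orders = lambda: [{"status": "paid"}, {"status": "pending"}]
    keep_paid = lambda rs: [r for r in rs if r["status"] == "paid"]
    run_pipeline(orders, [keep_paid], print, dry_run=True)
```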

Run it for real

Remove --dry-run:

cannectors run sync-orders.yaml

Because the input has a schedule, the process stays alive and triggers every 15 minutes. Use Ctrl-C (or your container runtime's stop signal) to shut it down cleanly.
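A long-running scheduled process with clean shutdown can be sketched in Python as a loop that sleeps until the next quarter hour and exits when SIGINT (Ctrl-C) or SIGTERM (the usual container stop signal) arrives. This is an illustration, not Cannectors' scheduler. It handles only the */15 minute pattern, and letting an in-progress run finish before exiting is an assumed shutdown policy. The function names are hypothetical.

```python
import signal
import threading
from datetime import datetime, timedelta

stop = threading.Event()


def next_quarter_hour(now: datetime) -> datetime:
    """Next time strictly after now matching the minute field */15 (0, 15, 30, 45)."""
    base = now.replace(second=0, microsecond=0)
    return base + timedelta(minutes=15 - base.minute % 15)


def run_forever(job) -> None:
    """Run job at each quarter hour until SIGINT or SIGTERM sets the stop event.

    Must be called from the main thread, since signal.signal requires it.
    A job already running is allowed to finish (assumed policy).
    """
    def request_stop(signum, frame):
        stop.set()

    signal.signal(signal.SIGINT, request_stop)
    signal.signal(signal.SIGTERM, request_stop)
    while not stop.is_set():
        now = datetime.now()
        wait_s = (next_quarter_hour(now) - now).total_seconds()
        if stop.wait(timeout=max(wait_s, 0)):
            break  # stop requested while waiting: exit without starting a new run
        job()
```

Calling run_forever(lambda: print("poll")) from a script's main thread prints once per quarter hour until you press Ctrl-C or the process receives SIGTERM.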

Where to go next