Your First Pipeline
Build a Cannectors pipeline from scratch and run it locally.
In this guide, you'll write a pipeline that polls an HTTP API on a schedule, maps a few fields, drops records you don't care about, and POSTs the rest to another endpoint. Everything stays local until you're ready to point it at real systems.
Scenario
You're consuming an /api/orders endpoint and forwarding only paid
orders to an internal /api/orders/import endpoint. Both endpoints
require a Bearer token, and you want a 15-minute schedule with retries
on 429 and 5xx responses.
Create the pipeline file
Create a new file named sync-orders.yaml. Pipelines need four top-level
keys: input, filters, output, and a name for logs. Start with the name
and some descriptive metadata:
name: sync-orders
version: 1.0.0
description: Forward paid orders to the internal import API.
Wire the input
httpPolling calls a URL on a schedule and reads records from a JSON
field in the response.
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
  authentication:
    type: bearer
    credentials:
      token: ${SOURCE_BEARER_TOKEN}
Notable parts:
- schedule is a 5-field cron expression. Without it, the pipeline runs once and exits.
- dataField is the JSON key in the response that holds the array of records (here, { "orders": [...] }).
- ${SOURCE_BEARER_TOKEN} is read from your shell environment at startup, so secrets never live in the YAML.
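To make the role of dataField concrete, here is a minimal Python sketch of the idea: parse the response body and pull out the array stored under the configured key. It is an illustration only, not Cannectors code; the function name extract_records and the sample body are hypothetical.

```python
import json


def extract_records(body: str, data_field: str) -> list:
    """Return the array stored under data_field in a JSON response body."""
    payload = json.loads(body)
    records = payload.get(data_field, [])
    if not isinstance(records, list):
        raise ValueError(f"{data_field!r} does not hold an array of records")
    return records


# Hypothetical response from the source API; only the "orders" key matters.
sample_body = (
    '{"orders": ['
    '{"order_id": 1, "status": "paid"}, '
    '{"order_id": 2, "status": "pending"}'
    '], "total": 2}'
)

records = extract_records(sample_body, "orders")
print(len(records))
```

Anything outside the orders array (such as total in this sample) is ignored in the sketch, which is the behavior the dataField description suggests.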
Add filters
Filters run in declared order. Two are enough here:
- mapping: rename order_id to id, flatten the nested customer.email field to email, and lowercase it.
- condition: drop anything that is not paid.
filters:
  - type: mapping
    mappings:
      - source: order_id
        target: id
      - source: customer.email
        target: email
        transforms:
          - op: lowercase
  - type: condition
    expression: "status == 'paid'"
    else:
      - type: drop
The absent then branch keeps paid records unchanged. The else branch
explicitly drops every non-paid record, so the output never sees it.
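The effect of the two filters can be sketched in plain Python. This is a rough model rather than the real implementation, and it rests on one assumption worth calling out: unmapped fields such as status are taken to stay on the record after the mapping step, since the condition that follows still reads status. All function names and sample records are hypothetical.

```python
def get_path(record, path):
    """Follow a dotted path such as 'customer.email' through nested dicts."""
    value = record
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def apply_mapping(record):
    # Assumption: fields that are not mapped (such as status) are kept.
    out = dict(record)
    out["id"] = get_path(record, "order_id")
    email = get_path(record, "customer.email")
    out["email"] = email.lower() if isinstance(email, str) else email
    return out


def run_filters(records):
    # Filters run in declared order: mapping first, then condition.
    mapped = [apply_mapping(r) for r in records]
    # Paid records pass through unchanged; the else branch drops the rest.
    return [r for r in mapped if r.get("status") == "paid"]


sample = [
    {"order_id": "A-1", "status": "paid", "customer": {"email": "Ada@Example.COM"}},
    {"order_id": "A-2", "status": "pending", "customer": {"email": "bob@example.com"}},
]

result = run_filters(sample)
print(result)
```

Only the first sample record survives, with id set from order_id and email lowercased.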
Wire the output
httpRequest POSTs records to an endpoint, in batch (one request for
the whole set) or single (one request per record). For an internal
import API, batch is the right shape.
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  authentication:
    type: bearer
    credentials:
      token: ${DESTINATION_BEARER_TOKEN}
  retry:
    maxAttempts: 3
    delayMs: 500
    backoffMultiplier: 2
    maxDelayMs: 5000
    retryableStatusCodes: [429, 500, 502, 503, 504]
The retry block honors Retry-After automatically and backs off
exponentially with a jittered base of 500 ms.
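To get a feel for what these retry settings mean in wait time, the following Python sketch computes the nominal delay before each retry. It is a simplified model under stated assumptions: maxAttempts is taken to include the first try, and jitter and Retry-After are left out, so real waits will differ. The function name nominal_delays is hypothetical.

```python
def nominal_delays(max_attempts, delay_ms, multiplier, max_delay_ms):
    """Nominal wait (in ms) before each retry, without jitter.

    Assumption: max_attempts counts the first try, so there are
    max_attempts - 1 waits at most.
    """
    delays = []
    for retry_index in range(max_attempts - 1):
        # Exponential growth from the base delay, capped at max_delay_ms.
        delays.append(min(delay_ms * multiplier ** retry_index, max_delay_ms))
    return delays


# Values from the retry block above.
configured = nominal_delays(3, 500, 2, 5000)

# A longer run, to show where the maxDelayMs cap takes effect.
longer = nominal_delays(6, 500, 2, 5000)

print(configured)
print(longer)
```

With the configured values the model gives waits of 500 ms and 1000 ms; the 5000 ms cap would only come into play with more attempts.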
Final file
Put it all together:
name: sync-orders
version: 1.0.0
description: Forward paid orders to the internal import API.
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
  authentication:
    type: bearer
    credentials:
      token: ${SOURCE_BEARER_TOKEN}
filters:
  - type: mapping
    mappings:
      - source: order_id
        target: id
      - source: customer.email
        target: email
        transforms:
          - op: lowercase
  - type: condition
    expression: "status == 'paid'"
    else:
      - type: drop
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  authentication:
    type: bearer
    credentials:
      token: ${DESTINATION_BEARER_TOKEN}
  retry:
    maxAttempts: 3
    delayMs: 500
    backoffMultiplier: 2
    maxDelayMs: 5000
    retryableStatusCodes: [429, 500, 502, 503, 504]
Validate it
Before running, validate the YAML against the schema:
cannectors validate sync-orders.yaml
This catches typos, missing required fields, and invalid types.
Dry-run it
Set the environment variables, then preview what would be sent:
export SOURCE_BEARER_TOKEN=…
export DESTINATION_BEARER_TOKEN=…
cannectors run --dry-run sync-orders.yaml
You'll see the input fetch, the mapping pass, the condition filter, and a preview of the batch that would be POSTed.
--dry-run executes the input and filters for real (so it hits the source
API), but stops before the output side effects. Use it on staging data
before you point a fresh pipeline at production endpoints.
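Since the pipeline reads its tokens from the environment, a small pre-flight check can save a failed run. The sketch below scans pipeline text for ${NAME} placeholders and reports which ones are unset. It is a standalone helper, not part of the cannectors CLI, and the name missing_variables is hypothetical.

```python
import os
import re

# Matches placeholders of the form ${NAME}.
PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def missing_variables(pipeline_text, environ):
    """Return the placeholder names that are unset or empty in environ."""
    names = sorted(set(PLACEHOLDER.findall(pipeline_text)))
    return [name for name in names if not environ.get(name)]


# Abbreviated copy of the pipeline, kept inline so the sketch is self-contained.
pipeline_text = """
input:
  authentication:
    credentials:
      token: ${SOURCE_BEARER_TOKEN}
output:
  authentication:
    credentials:
      token: ${DESTINATION_BEARER_TOKEN}
"""

# Example environment with only one of the two tokens set.
example_env = {"SOURCE_BEARER_TOKEN": "example-value"}
missing = missing_variables(pipeline_text, example_env)
print(missing)

# Against the real shell environment, pass os.environ instead.
missing_in_shell = missing_variables(pipeline_text, os.environ)
```

To check the actual file, read sync-orders.yaml into a string and pass it in place of the inline text.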
Run it for real
Remove --dry-run:
cannectors run sync-orders.yaml
Because the input has a schedule, the process stays alive and triggers
every 15 minutes. Use Ctrl-C (or your container runtime's stop signal)
to shut it down cleanly.
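If you want to predict when the next trigger will fire, the sketch below computes the next quarter-hour boundary for the schedule "*/15 * * * *". It assumes standard cron semantics, where the expression matches minutes 0, 15, 30, and 45 of every hour on the wall clock; whether Cannectors also runs once immediately at startup is not covered here. The function name next_quarter_hour is hypothetical.

```python
from datetime import datetime, timedelta


def next_quarter_hour(now):
    """Next time strictly after `now` whose minute is 0, 15, 30, or 45."""
    base = now.replace(second=0, microsecond=0)
    minutes_to_add = 15 - (base.minute % 15)
    return base + timedelta(minutes=minutes_to_add)


started = datetime(2024, 1, 1, 10, 7, 30)
print(next_quarter_hour(started))
```

A process started at 10:07:30 would, under these assumptions, first trigger at 10:15 and then every 15 minutes after that.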