webhook
Listen for incoming HTTP POSTs and process them as records.
The webhook input starts an HTTP server and treats each incoming POST
as a delivery to process. Each delivery is either treated as a single
record, or unwrapped via dataField into an array of records.
The process stays alive as long as the server is running — there's no
schedule for webhook inputs.
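The difference between a single-record delivery and an unwrapped one can be pictured with a short Python sketch. It is an illustration only, not the runtime's implementation: the function name `extract_records` is hypothetical, and the sketch assumes that `dataField` is a dot-separated path and that a missing or non-array field is treated as an error, neither of which is stated on this page.

```python
import json
from typing import Any, List, Optional


def extract_records(body: bytes, data_field: Optional[str] = None) -> List[Any]:
    """Turn one webhook delivery into a list of records (illustrative only)."""
    payload = json.loads(body)
    if not data_field:
        # No dataField configured: the whole delivery is a single record.
        return [payload]
    node = payload
    # Assumption: dataField is a dot-separated path such as "data.orders".
    for key in data_field.split("."):
        if not isinstance(node, dict) or key not in node:
            raise ValueError(f"dataField {data_field!r} not found in body")
        node = node[key]
    if not isinstance(node, list):
        raise ValueError(f"dataField {data_field!r} is not an array")
    return node


delivery = b'{"orders": [{"id": 1}, {"id": 2}]}'
print(extract_records(delivery))            # one record: the whole body
print(extract_records(delivery, "orders"))  # two records: the array items
```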
Minimal example

    input:
      type: webhook
      path: /webhooks/shipments

This starts an HTTP server listening on 0.0.0.0:8080 and routes POSTs
on /webhooks/shipments into the filter chain.
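To try the minimal example, a test delivery can be built with Python's standard library. This is a sketch under assumptions: the pipeline is presumed to run locally on the default listen address, the helper name `build_delivery` is hypothetical, and the payload fields are made up for illustration.

```python
import json
import urllib.request


def build_delivery(url: str, payload: dict) -> urllib.request.Request:
    """Build a JSON POST request for a webhook endpoint."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Assumption: the pipeline runs on this machine with the default listen address.
request = build_delivery(
    "http://localhost:8080/webhooks/shipments",
    {"shipmentId": "S-1001", "status": "in_transit"},  # made-up payload
)

# Sending only works while the webhook server is running:
# with urllib.request.urlopen(request, timeout=5) as response:
#     print(response.status)
```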
Options
| property | type | default | description |
|---|---|---|---|
| type (required) | "webhook" | — | Module type discriminator. Must be `webhook` for this module. |
| id | string | — | Unique identifier within the pipeline. |
| name | string | — | Human-readable name. |
| description | string | — | — |
| enabled | boolean | true | Whether module is active. |
| tags | `array<string>` | — | — |
| onError | string | "fail" | Default error action. Case-insensitive; normalized to lowercase by the runtime. |
| path (required) | string | — | Webhook endpoint path (e.g. /webhook/orders). |
| listenAddress | string | "0.0.0.0:8080" | Webhook server listen address. |
| requestTimeoutMs | integer | — | Per-request HTTP read/write timeout in milliseconds. Does NOT bound the webhook server lifetime. |
| | object | — | — |
| | object | — | — |
| queueSize | integer | — | Queue size for async webhook processing. |
| maxConcurrent | integer | — | Maximum concurrent webhook workers. |
| dataField | string | — | JSON field path containing the array of records to extract from the request body. |

Signature verification
Webhook deliveries should be authenticated. Cannectors supports HMAC-SHA256 signature verification out of the box.

    signature:
      type: hmac-sha256
      header: X-Hub-Signature-256  # optional, defaults to this
      secret: ${WEBHOOK_HMAC_SECRET}

The server computes HMAC-SHA256 over the raw body using the shared
secret, then compares to the header value. Requests that don't match
get rejected with 401 Unauthorized.
Queue and rate limiting
By default, the server processes deliveries synchronously — the HTTP response is held until the pipeline finishes the record. For bursty sources, configure an in-memory queue:

    queueSize: 1024
    maxConcurrent: 4
    rateLimit:
      requestsPerSecond: 50
      burst: 100

| Field | What it does |
|---|---|
| queueSize | Number of deliveries buffered before the server returns 503. |
| maxConcurrent | Number of pipeline workers consuming the queue concurrently. |
| rateLimit.requestsPerSecond | Steady-state rate the source can deliver. |
| rateLimit.burst | Maximum burst above the steady rate. |

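One common way to combine a steady rate with a burst allowance is a token bucket. Whether the runtime uses this exact algorithm is not stated here, so the following Python sketch is an assumption meant only to show how `requestsPerSecond` and `burst` could interact. The class name `TokenBucket` and the injectable clock are hypothetical.

```python
import time


class TokenBucket:
    """Refill `rate` tokens per second, holding at most `burst` tokens."""

    def __init__(self, rate: float, burst: int, clock=time.monotonic):
        self.rate = rate              # plays the role of requestsPerSecond
        self.capacity = burst         # plays the role of burst
        self.tokens = float(burst)    # start with a full bucket
        self.clock = clock
        self.last = clock()

    def allow(self) -> bool:
        """Spend one token if available; otherwise refuse the request."""
        now = self.clock()
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# A fake clock keeps the demonstration deterministic.
fake_now = [0.0]
bucket = TokenBucket(rate=50, burst=100, clock=lambda: fake_now[0])

print(sum(bucket.allow() for _ in range(150)))  # 100: the burst is spent
fake_now[0] += 1.0                              # one second passes
print(sum(bucket.allow() for _ in range(150)))  # 50: one second of refill
```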
Deliveries served by a worker run after the HTTP response is sent.
That means the HTTP request context is already done — the runtime
uses the server's context for downstream stages so that
http_call/sql_call aren't cancelled mid-flight.
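The queued mode can be pictured as a bounded buffer in front of a fixed pool of workers. The Python sketch below is illustrative only and uses hypothetical names throughout. The 503 on a full queue comes from the table above, while the 202 returned on acceptance is an assumption, because the success status is not documented here.

```python
import queue
import threading
from typing import List, Optional

QUEUE_SIZE = 2       # plays the role of queueSize
MAX_CONCURRENT = 2   # plays the role of maxConcurrent

deliveries: "queue.Queue[Optional[dict]]" = queue.Queue(maxsize=QUEUE_SIZE)
processed: List[dict] = []
processed_lock = threading.Lock()


def handle_post(record: dict) -> int:
    """Enqueue a delivery and answer at once, without waiting for the pipeline."""
    try:
        deliveries.put_nowait(record)
    except queue.Full:
        return 503  # buffer exhausted, as described for queueSize
    return 202      # assumption: the accepted status is not documented here


def worker() -> None:
    """Run queued deliveries through a stand-in pipeline after the response."""
    while True:
        record = deliveries.get()
        try:
            if record is None:  # sentinel: stop this worker
                return
            with processed_lock:
                processed.append(record)  # stand-in for filters and output
        finally:
            deliveries.task_done()


# Three deliveries arrive before any worker is free: the third overflows.
statuses = [handle_post({"id": i}) for i in range(3)]
print(statuses)  # [202, 202, 503]

workers = [threading.Thread(target=worker) for _ in range(MAX_CONCURRENT)]
for t in workers:
    t.start()
deliveries.join()          # wait until the queued deliveries are processed
for _ in workers:
    deliveries.put(None)   # one sentinel per worker
for t in workers:
    t.join()
print(sorted(r["id"] for r in processed))  # [0, 1]
```

Because the response has already been sent when a worker picks up the record, any work the worker does has to be tied to something longer-lived than the request, which is the role the server's context plays in the paragraph above.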
Examples
Signed deliveries, forwarded one record at a time:

    name: webhook-hmac-to-http-single
    version: 1.0.0
    description: Receive signed webhook payloads and forward each record separately.
    tags:
      - webhook
      - hmac
    input:
      type: webhook
      path: /webhooks/orders
      listenAddress: 0.0.0.0:8080
      dataField: orders
      signature:
        type: hmac-sha256
        header: X-Webhook-Signature
        secret: ${WEBHOOK_SECRET}
    filters:
      - type: set
        target: metadata.received_by
        value: cannectors
    output:
      type: httpRequest
      endpoint: https://destination.example.com/api/orders/{orderId}
      method: PATCH
      requestMode: single
      keys:
        - field: id
          paramType: path
          paramName: orderId
      success:
        statusCodes:
          - 200
          - 204

Queued, rate-limited deliveries, inserted into a SQL database:

    name: webhook-queue-rate-limit-to-database
    version: 1.0.0
    description: Receive queued webhooks with rate limiting and insert records into SQL.
    tags:
      - webhook
      - database-output
    input:
      type: webhook
      path: /webhooks/leads
      listenAddress: 0.0.0.0:8081
      dataField: leads
      queueSize: 500
      maxConcurrent: 4
      rateLimit:
        requestsPerSecond: 20
        burst: 40
    filters:
      - type: mapping
        onError: skip
        mappings:
          - source: id
            target: lead_id
          - source: email
            target: email
            transforms:
              - op: trim
              - op: lowercase
          - source: source
            target: source
            onMissing: useDefault
            defaultValue: webhook
    output:
      type: database
      connectionStringRef: ${CRM_DATABASE_URL}
      driver: postgres
      query: |
        insert into leads (lead_id, email, source)
        values ({{record.lead_id}}, {{record.email}}, {{record.source}})
      transaction: true