cannectors

webhook

Listen for incoming HTTP POSTs and process them as records.

The webhook input starts an HTTP server and treats each incoming POST as a delivery to process. Each delivery is either treated as a single record, or unwrapped via dataField into an array of records.

The process stays alive as long as the server is running — there's no schedule for webhook inputs.

Minimal example

input:
  type: webhook
  path: /webhooks/shipments

This starts an HTTP server listening on 0.0.0.0:8080 and routes POSTs on /webhooks/shipments into the filter chain.

Options

propertytypedefaultdescription
typerequired
"webhook"Module type discriminator. Must be `webhook` for this module.
id
stringUnique identifier within the pipeline.
name
stringHuman-readable name.
description
string
enabled
booleantrueWhether module is active.
tags
array<string>
onError
string"fail"Default error action. Case-insensitive; normalized to lowercase by the runtime.
pathrequired
stringWebhook endpoint path (e.g. /webhook/orders).
listenAddress
string"0.0.0.0:8080"Webhook server listen address.
requestTimeoutMs
integerPer-request HTTP read/write timeout in milliseconds. Does NOT bound the webhook server lifetime.
object
object
queueSize
integerQueue size for async webhook processing.
maxConcurrent
integerMaximum concurrent webhook workers.
dataField
stringJSON field path containing the array of records to extract from the request body.

Signature verification

Webhook deliveries should be authenticated. Cannectors supports HMAC-SHA256 signature verification out of the box.

signature:
  type: hmac-sha256
  header: X-Hub-Signature-256          # optional, defaults to this
  secret: ${WEBHOOK_HMAC_SECRET}

The server computes HMAC-SHA256 over the raw body using the shared secret, then compares to the header value. Requests that don't match get rejected with 401 Unauthorized.

Queue and rate limiting

By default, the server processes deliveries synchronously — the HTTP response is held until the pipeline finishes the record. For bursty sources, configure an in-memory queue:

queueSize: 1024
maxConcurrent: 4
rateLimit:
  requestsPerSecond: 50
  burst: 100
FieldWhat it does
queueSizeNumber of deliveries buffered before the server returns 503.
maxConcurrentNumber of pipeline workers consuming the queue concurrently.
rateLimit.requestsPerSecondSteady-state rate the source can deliver.
rateLimit.burstMaximum burst above the steady rate.

Deliveries served by a worker run after the HTTP response is sent. That means the HTTP request context is already done — the runtime uses the server's context for downstream stages so that http_call/sql_call aren't cancelled mid-flight.

Examples

examples/05-webhook-hmac-to-http-single.yamlview source ↗
05-webhook-hmac-to-http-single.yaml
name: webhook-hmac-to-http-single
version: 1.0.0
description: Receive signed webhook payloads and forward each record separately.
tags:
  - webhook
  - hmac
input:
  type: webhook
  path: /webhooks/orders
  listenAddress: 0.0.0.0:8080
  dataField: orders
  signature:
    type: hmac-sha256
    header: X-Webhook-Signature
    secret: ${WEBHOOK_SECRET}
filters:
  - type: set
    target: metadata.received_by
    value: cannectors
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/{orderId}
  method: PATCH
  requestMode: single
  keys:
    - field: id
      paramType: path
      paramName: orderId
  success:
    statusCodes:
      - 200
      - 204
examples/06-webhook-queue-rate-limit-to-database.yamlview source ↗
06-webhook-queue-rate-limit-to-database.yaml
name: webhook-queue-rate-limit-to-database
version: 1.0.0
description: Receive queued webhooks with rate limiting and insert records into SQL.
tags:
  - webhook
  - database-output
input:
  type: webhook
  path: /webhooks/leads
  listenAddress: 0.0.0.0:8081
  dataField: leads
  queueSize: 500
  maxConcurrent: 4
  rateLimit:
    requestsPerSecond: 20
    burst: 40
filters:
  - type: mapping
    onError: skip
    mappings:
      - source: id
        target: lead_id
      - source: email
        target: email
        transforms:
          - op: trim
          - op: lowercase
      - source: source
        target: source
        onMissing: useDefault
        defaultValue: webhook
output:
  type: database
  connectionStringRef: ${CRM_DATABASE_URL}
  driver: postgres
  query: |
    insert into leads (lead_id, email, source)
    values ({{record.lead_id}}, {{record.email}}, {{record.source}})
  transaction: true

Cross-references