# Cannectors AI Pipeline Generation Context

Purpose: this raw text page gives an online AI enough project context to generate valid Cannectors YAML pipelines.

Canonical URLs:
- AI context: https://cannectors.com/ai-context.txt
- Documentation: https://cannectors.com/docs
- Examples: https://cannectors.com/docs/examples
- LLM index: https://cannectors.com/llms.txt
- Full documentation text: https://cannectors.com/llms-full.txt

Generation rules for AI agents:
- Generate YAML pipeline files for the Cannectors CLI.
- Use the JSON Schemas in this page as the authoritative contract for keys, types, required fields, enum values, and defaults.
- Prefer the maintained examples in this page as templates before inventing a new structure.
- A pipeline moves records through input -> filters -> output.
- Keep secrets in environment variables instead of hard-coding credentials.
- Recommend `cannectors validate ` before running a generated pipeline.
- Recommend `cannectors run --dry-run ` for previews.

High-level capabilities:
- Inputs: HTTP polling, SOAP polling, webhook server, database queries.
- Filters: mapping, condition routing, loop (per-record array iteration), script, HTTP enrichment, SOAP enrichment, SQL enrichment, set, remove, drop.
- Outputs: HTTP requests, SOAP requests, database writes.
- Cross-cutting features: authentication, retries, scheduling, state persistence, pagination, dry-run previews, logging, secrets via environment variables, defaults inheritance.

# Exhaustive Module Options Summary

## input: httpPolling
- type: "httpPolling"; required - Module type discriminator. Must be `httpPolling` for this module.
- id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline.
- name: string; optional; inheritedFrom=moduleBase - Human-readable name.
- description: string; optional; inheritedFrom=moduleBase
- enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active.
- tags: array; optional; inheritedFrom=moduleBase
- onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime.
- endpoint: string; required; inheritedFrom=httpRequestBase - HTTP endpoint URL. Templates are accepted; the final URL is validated at runtime after template resolution.
- method: object; optional; inheritedFrom=httpRequestBase
- headers: object; optional; inheritedFrom=httpRequestBase
  nested: map
- queryParams: object; optional; inheritedFrom=httpRequestBase - Query parameters.
  nested: map
- body: string; optional; inheritedFrom=httpRequestBase - Inline request body. Templates ({{record.field}}) are evaluated at runtime. The body is sent regardless of the HTTP method.
- bodyTemplateFile: string; optional; inheritedFrom=httpRequestBase - Path to an external template file used as the request body. Mutual exclusivity with `body` is not enforced; if both are set, the inline body wins.
- authentication: object; optional; inheritedFrom=httpRequestBase
  variant: api-key
  - type: "api-key"; required
  - credentials: object; required
  nested: object properties
  - key: string; required - API key value or env reference (${VAR}).
  - location: string; optional; default="header"; enum=header|query - Where to send the key.
  - headerName: string; optional; default="X-API-Key" - Header name when location=header.
  - paramName: string; optional; default="api_key" - Query param name when location=query.
  variant: bearer
  - type: "bearer"; required
  - credentials: object; required
  nested: object properties
  - token: string; required - Bearer token value or env reference (${VAR}).
  variant: basic
  - type: "basic"; required
  - credentials: object; required
  nested: object properties
  - username: string; required - Username or env reference (${VAR}).
  - password: string; required - Password or env reference (${VAR}).
  variant: oauth2
  - type: "oauth2"; required
  - credentials: object; required
  nested: object properties
  - tokenUrl: string; required - Token endpoint URL.
  - clientId: string; required - Client ID or env reference (${VAR}).
  - clientSecret: string; required - Client secret or env reference (${VAR}).
  - scope: string; optional - OAuth2 scopes as a space-separated string (RFC 6749 §3.3).
- timeoutMs: integer; optional; inheritedFrom=httpRequestBase - Request timeout in milliseconds. When omitted, each module applies its own runtime default.
- schedule: string; optional; inheritedFrom=httpPollingInputConfig - Optional CRON expression for polling. Validated at runtime.
- pagination: object; optional; inheritedFrom=httpPollingInputConfig
  nested: object properties
  - type: string; required; enum=cursor|offset|page - Pagination type.
  - param: string; required - Canonical query parameter name carrying the cursor, offset, or page value.
  - limitParam: string; optional - Query parameter name for the page size (when applicable).
  - limit: integer; optional - Number of items per page. When omitted, the runtime applies its default.
  - nextCursorField: string; optional - Field containing the next cursor value (cursor pagination).
  - totalPagesField: string; optional - Field containing the total pages count (page pagination).
  - totalField: string; optional - Field containing the total items count (offset pagination).
- statePersistence: object; optional; inheritedFrom=httpPollingInputConfig
  nested: object properties
  - timestamp: object; optional - Timestamp-based state persistence.
  nested: object properties
  - enabled: boolean; optional; default=false - Enable timestamp persistence.
  - queryParam: string; optional - Query parameter name for API filtering (e.g., 'updated_after').
  - id: object; optional - ID-based state persistence.
  nested: object properties
  - enabled: boolean; optional; default=false - Enable ID persistence.
  - field: string; optional - Field path to extract ID from records (supports dot notation).
  - queryParam: string; optional - Query parameter name for API filtering (e.g., 'last_id').
  - storagePath: string; optional - Custom storage directory path for state files.
- dataField: string; optional; inheritedFrom=httpPollingInputConfig - JSON field path containing the array of records to extract from the response.
- retry: object; optional; inheritedFrom=httpPollingInputConfig
  nested: object properties
  - maxAttempts: integer; optional; default=3 - Maximum retry attempts (0 = no retry).
  - delayMs: integer; optional; default=1000 - Initial delay between retries in milliseconds.
  - backoffMultiplier: number; optional; default=2 - Multiplier for exponential backoff.
  - maxDelayMs: integer; optional; default=30000 - Maximum delay between retries.
  - retryableStatusCodes: array; optional; default=[429,500,502,503,504] - HTTP status codes that trigger retry.
  - useRetryAfterHeader: boolean; optional; default=false - Use Retry-After response header to determine delay before retrying. Supports seconds format (e.g., '120') or HTTP-date format (e.g., 'Fri, 31 Dec 2025 23:59:59 GMT'). The delay is capped by maxDelayMs. If header is invalid or absent, falls back to exponential backoff.
  - retryHintFromBody: string; optional; default="" - expr expression to evaluate against JSON response body. The parsed JSON body is available as the 'body' variable. If expression returns true, error is retryable (if status code is in retryableStatusCodes). If false, error is NOT retryable (overrides status code). If body is not valid JSON, falls back to status code only. Example: 'body.retryable == true' or 'body.error.code != "PERMANENT"'.

## input: soapPolling
- type: "soapPolling"; required - Module type discriminator. Must be `soapPolling` for this module.
- id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline.
- name: string; optional; inheritedFrom=moduleBase - Human-readable name.
- description: string; optional; inheritedFrom=moduleBase
- enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active.
- tags: array; optional; inheritedFrom=moduleBase
- onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime.
- endpoint: string; required; inheritedFrom=soapRequestBase - SOAP endpoint URL. Templates are accepted; the final URL is validated at runtime after template resolution.
- soapVersion: string; optional; default="1.1"; enum=1.1|1.2; inheritedFrom=soapRequestBase - SOAP envelope and HTTP binding version.
- soapAction: string; optional; inheritedFrom=soapRequestBase - SOAP action. SOAP 1.1 sends this as the SOAPAction header; SOAP 1.2 sends it as a Content-Type action parameter.
- operation: string; required; inheritedFrom=soapRequestBase - Logical SOAP operation name.
- body: string; required; inheritedFrom=soapRequestBase - Raw XML body fragment. Templates ({{record.field}}) are XML-escaped at runtime.
- headers: array; optional; inheritedFrom=soapRequestBase - Raw SOAP header XML fragments.
  nested: array items
  - xml: string; required
- authentication: object; optional; inheritedFrom=soapRequestBase - HTTP transport authentication.
  variant: api-key
  - type: "api-key"; required
  - credentials: object; required
  nested: object properties
  - key: string; required - API key value or env reference (${VAR}).
  - location: string; optional; default="header"; enum=header|query - Where to send the key.
  - headerName: string; optional; default="X-API-Key" - Header name when location=header.
  - paramName: string; optional; default="api_key" - Query param name when location=query.
  variant: bearer
  - type: "bearer"; required
  - credentials: object; required
  nested: object properties
  - token: string; required - Bearer token value or env reference (${VAR}).
  variant: basic
  - type: "basic"; required
  - credentials: object; required
  nested: object properties
  - username: string; required - Username or env reference (${VAR}).
  - password: string; required - Password or env reference (${VAR}).
  variant: oauth2
  - type: "oauth2"; required
  - credentials: object; required
  nested: object properties
  - tokenUrl: string; required - Token endpoint URL.
  - clientId: string; required - Client ID or env reference (${VAR}).
  - clientSecret: string; required - Client secret or env reference (${VAR}).
  - scope: string; optional - OAuth2 scopes as a space-separated string (RFC 6749 §3.3).
- wsSecurity: object; optional; inheritedFrom=soapRequestBase
  nested: object properties
  - username: string; required
  - password: string; required
  - passwordType: string; optional; default="PasswordText"; enum=PasswordText|PasswordDigest
  - tokenId: string; optional
  - mustUnderstand: boolean; optional; default=false
- mtom: object; optional; inheritedFrom=soapRequestBase
  nested: object properties
  - enabled: boolean; optional; default=false
  - attachments: array; optional
  nested: array items
  - contentId: string; required
  - contentType: string; required
  - sourceField: string; required - Record path containing the attachment bytes.
  - encoding: string; optional; default="binary"; enum=binary|base64 - Encoding of string values read from sourceField. 'binary' sends the string bytes as-is; 'base64' decodes the string before attaching it.
- httpHeaders: object; optional; inheritedFrom=soapRequestBase - Additional HTTP headers. Content-Type and SOAPAction are controlled by the SOAP version.
  nested: map
- timeoutMs: integer; optional; inheritedFrom=soapRequestBase - Request timeout in milliseconds. When omitted, each module applies its own runtime default.
- schedule: string; optional; inheritedFrom=soapPollingInputConfig - Optional CRON expression for polling. Validated at runtime.
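
As a minimal sketch of how the soapPolling options listed so far might combine, the fragment below sets the required `endpoint`, `operation`, and `body` keys and adds WS-Security credentials. The URL, operation name, XML namespace, and environment variable names are hypothetical. Using ${VAR} references inside `wsSecurity` is an assumption, since this summary documents them only for `authentication` credentials, and the CRON field layout is assumed as well. Only the module body is shown, not its position inside a pipeline file.

```yaml
# Hypothetical soapPolling input module; every value is a placeholder.
type: soapPolling
id: orders-soap
endpoint: "https://erp.example.com/soap/orders"   # hypothetical URL
soapVersion: "1.2"                                # enum: 1.1 | 1.2 (default 1.1)
operation: GetOrders                              # hypothetical operation name
body: |
  <ord:GetOrders xmlns:ord="urn:example:orders">
    <ord:Status>OPEN</ord:Status>
  </ord:GetOrders>
wsSecurity:
  username: "${SOAP_USER}"        # assumption: env references are accepted here
  password: "${SOAP_PASSWORD}"
  passwordType: PasswordDigest    # enum: PasswordText | PasswordDigest
schedule: "*/15 * * * *"          # assumed 5-field CRON syntax
```

Keeping the credentials in environment variables follows the generation rule about not hard-coding secrets.
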
- pagination: object; optional; inheritedFrom=soapPollingInputConfig
  nested: object properties
  - type: string; required; enum=cursor|offset|page - Pagination type.
  - param: string; required - Canonical query parameter name carrying the cursor, offset, or page value.
  - limitParam: string; optional - Query parameter name for the page size (when applicable).
  - limit: integer; optional - Number of items per page. When omitted, the runtime applies its default.
  - nextCursorField: string; optional - Field containing the next cursor value (cursor pagination).
  - totalPagesField: string; optional - Field containing the total pages count (page pagination).
  - totalField: string; optional - Field containing the total items count (offset pagination).
- statePersistence: object; optional; inheritedFrom=soapPollingInputConfig
  nested: object properties
  - timestamp: object; optional - Timestamp-based state persistence.
  nested: object properties
  - enabled: boolean; optional; default=false - Enable timestamp persistence.
  - queryParam: string; optional - Query parameter name for API filtering (e.g., 'updated_after').
  - id: object; optional - ID-based state persistence.
  nested: object properties
  - enabled: boolean; optional; default=false - Enable ID persistence.
  - field: string; optional - Field path to extract ID from records (supports dot notation).
  - queryParam: string; optional - Query parameter name for API filtering (e.g., 'last_id').
  - storagePath: string; optional - Custom storage directory path for state files.
- dataField: string; optional; inheritedFrom=soapPollingInputConfig - XML map field path containing the array of records to extract from the SOAP response.
- retry: object; optional; inheritedFrom=soapPollingInputConfig
  nested: object properties
  - maxAttempts: integer; optional; default=3 - Maximum retry attempts (0 = no retry).
  - delayMs: integer; optional; default=1000 - Initial delay between retries in milliseconds.
  - backoffMultiplier: number; optional; default=2 - Multiplier for exponential backoff.
  - maxDelayMs: integer; optional; default=30000 - Maximum delay between retries.
  - retryableStatusCodes: array; optional; default=[429,500,502,503,504] - HTTP status codes that trigger retry.
  - useRetryAfterHeader: boolean; optional; default=false - Use Retry-After response header to determine delay before retrying. Supports seconds format (e.g., '120') or HTTP-date format (e.g., 'Fri, 31 Dec 2025 23:59:59 GMT'). The delay is capped by maxDelayMs. If header is invalid or absent, falls back to exponential backoff.
  - retryHintFromBody: string; optional; default="" - expr expression to evaluate against JSON response body. The parsed JSON body is available as the 'body' variable. If expression returns true, error is retryable (if status code is in retryableStatusCodes). If false, error is NOT retryable (overrides status code). If body is not valid JSON, falls back to status code only. Example: 'body.retryable == true' or 'body.error.code != "PERMANENT"'.

## input: webhook
- type: "webhook"; required - Module type discriminator. Must be `webhook` for this module.
- id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline.
- name: string; optional; inheritedFrom=moduleBase - Human-readable name.
- description: string; optional; inheritedFrom=moduleBase
- enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active.
- tags: array; optional; inheritedFrom=moduleBase
- onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime.
- path: string; required; inheritedFrom=webhookInputConfig - Webhook endpoint path (e.g. /webhook/orders).
- listenAddress: string; optional; default="0.0.0.0:8080"; inheritedFrom=webhookInputConfig - Webhook server listen address.
- requestTimeoutMs: integer; optional; inheritedFrom=webhookInputConfig - Per-request HTTP read/write timeout in milliseconds. Does NOT bound the webhook server lifetime.
- signature: object; optional; inheritedFrom=webhookInputConfig
  nested: object properties
  - type: string; required; enum=hmac-sha256 - Signature algorithm.
  - header: string; optional - Header name containing the signature. Defaults to X-Hub-Signature-256 when omitted.
  - secret: string; required - Secret key for signature validation.
- rateLimit: object; optional; inheritedFrom=webhookInputConfig
  nested: object properties
  - requestsPerSecond: integer; required - Maximum requests per second.
  - burst: integer; optional - Maximum burst size.
- queueSize: integer; optional; inheritedFrom=webhookInputConfig - Queue size for async webhook processing.
- maxConcurrent: integer; optional; inheritedFrom=webhookInputConfig - Maximum concurrent webhook workers.
- dataField: string; optional; inheritedFrom=webhookInputConfig - JSON field path containing the array of records to extract from the request body.

## input: database
- type: "database"; required - Module type discriminator. Must be `database` for this module.
- id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline.
- name: string; optional; inheritedFrom=moduleBase - Human-readable name.
- description: string; optional; inheritedFrom=moduleBase
- enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active.
- tags: array; optional; inheritedFrom=moduleBase
- onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime.
- schedule: string; optional; inheritedFrom=databaseInputConfig - Optional CRON expression to run the database input on a schedule. Validated at runtime.
- parameters: object; optional; inheritedFrom=databaseInputConfig - Query parameters. Keys are parameter names (used with :paramName syntax).
- pagination: object; optional; inheritedFrom=databaseInputConfig
  nested: object properties
  - type: string; optional; enum=limit-offset|cursor - Pagination type.
  - limit: integer; optional - Number of records per page. Optional; runtime applies default when omitted.
  - param: string; optional - Named placeholder used in the SQL query to inject the cursor or offset value (e.g. ':cursor' / ':offset').
  - cursorField: string; optional - Field of the previous page's last record used as the next cursor value (cursor pagination only).
- incremental: object; optional; inheritedFrom=databaseInputConfig
  nested: object properties
  - enabled: boolean; optional; default=false - Enable incremental queries.
  - timestampField: string; optional - Field name for timestamp-based incremental queries.
  - timestampParam: string; optional - Parameter name for timestamp in query.
  - idField: string; optional - Field name for ID-based incremental queries.
  - idParam: string; optional - Parameter name for ID in query.
- query: string; optional; inheritedFrom=sqlRequestBase - SQL query. Supports module-specific placeholders (e.g. {{record.field}}, {{lastRunTimestamp}}).
- queryFile: string; optional; inheritedFrom=sqlRequestBase - Path to SQL file. Supports module-specific placeholders.
- connectionString: string; optional; inheritedFrom=databaseConnectionConfig - Database connection string (DSN). Avoid in production - use connectionStringRef instead.
- connectionStringRef: string; optional; inheritedFrom=databaseConnectionConfig - Environment variable reference for connection string. Format: ${ENV_VAR_NAME}
- driver: string; optional; enum=postgres|mysql|sqlite; inheritedFrom=databaseConnectionConfig - Database driver. Auto-detected from connection string if not specified.
- maxOpenConns: integer; optional; default=10; inheritedFrom=databaseConnectionConfig - Maximum number of open connections in the pool.
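
The database input options above could be combined roughly as follows. This is a sketch under stated assumptions: the table, column names, and environment variable are hypothetical, and it assumes that `incremental.timestampParam` is bound in the SQL with the same `:paramName` syntax described for `parameters`. Only the module body is shown, not its position inside a pipeline file.

```yaml
# Hypothetical database input module; table, columns, and env var are placeholders.
type: database
id: changed-customers
connectionStringRef: "${CUSTOMERS_DB_DSN}"   # keeps the DSN out of the file
driver: postgres                             # enum: postgres | mysql | sqlite
query: |
  SELECT id, email, updated_at
  FROM customers
  WHERE updated_at > :since
  ORDER BY updated_at
incremental:
  enabled: true
  timestampField: updated_at   # record field tracked between runs
  timestampParam: since        # assumption: bound as :since in the query
schedule: "0 * * * *"          # assumed 5-field CRON syntax
```

`connectionStringRef` is used instead of `connectionString` because the summary advises against inline DSNs in production.
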
- maxIdleConns: integer; optional; default=5; inheritedFrom=databaseConnectionConfig - Maximum number of idle connections in the pool.
- connMaxLifetimeSeconds: integer; optional; default=1800; inheritedFrom=databaseConnectionConfig - Maximum lifetime of a connection in seconds.
- connMaxIdleTimeSeconds: integer; optional; default=300; inheritedFrom=databaseConnectionConfig - Maximum idle time for a connection in seconds.
- timeoutMs: integer; optional; default=30000; inheritedFrom=databaseConnectionConfig - Query timeout in milliseconds.

## filter: mapping
- type: "mapping"; required - Module type discriminator. Must be `mapping` for this module.
- id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline.
- name: string; optional; inheritedFrom=moduleBase - Human-readable name.
- description: string; optional; inheritedFrom=moduleBase
- enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active.
- tags: array; optional; inheritedFrom=moduleBase
- onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime.
- mappings: array; required; inheritedFrom=mappingFilterConfig
  nested: array items
  - source: string; optional - Source field path. Omit to delete the target field.
  - target: string; required - Target field path.
  - transforms: array; optional
  nested: array items
  - op: string; required; enum=trim|lowercase|uppercase|dateFormat|replace|split|join|toString|toInt|toFloat|toBool|toArray|toObject - Operation name.
  - format: string; optional - Date format pattern.
  - pattern: string; optional - Regex pattern for replace/match operations.
  - replacement: string; optional - Replacement string.
  - separator: string; optional - Separator for split/join operations.
  - defaultValue: any; optional
  - onMissing: string; optional; default="setNull"; enum=setNull|skipField|useDefault|fail

## filter: condition
- type: "condition"; required - Module type discriminator. Must be `condition` for this module.
- id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline.
- name: string; optional; inheritedFrom=moduleBase - Human-readable name.
- description: string; optional; inheritedFrom=moduleBase
- enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active.
- tags: array; optional; inheritedFrom=moduleBase
- onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime.
- expression: string; required; inheritedFrom=conditionFilterConfig - Boolean expression evaluated against each record.
- then: array; optional; inheritedFrom=conditionFilterConfig - Nested filters to execute when condition is true.
  nested: array items
  - type: string; required - Filter type. Canonical: mapping, condition, drop, loop, set, remove, script, http_call, soap_call, sql_call.
  - id: string; optional - Unique identifier within the pipeline.
  - name: string; optional - Human-readable name.
  - description: string; optional
  - enabled: boolean; optional; default=true - Whether module is active.
  - tags: array; optional
  - onError: string; optional; default="fail" - Default error action. Case-insensitive; normalized to lowercase by the runtime.
- else: array; optional; inheritedFrom=conditionFilterConfig - Nested filters to execute when condition is false.
  nested: array items
  - type: string; required - Filter type. Canonical: mapping, condition, drop, loop, set, remove, script, http_call, soap_call, sql_call.
  - id: string; optional - Unique identifier within the pipeline.
  - name: string; optional - Human-readable name.
  - description: string; optional
  - enabled: boolean; optional; default=true - Whether module is active.
  - tags: array; optional
  - onError: string; optional; default="fail" - Default error action. Case-insensitive; normalized to lowercase by the runtime.

## filter: script
- type: "script"; required - Module type discriminator. Must be `script` for this module.
- id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline.
- name: string; optional; inheritedFrom=moduleBase - Human-readable name.
- description: string; optional; inheritedFrom=moduleBase
- enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active.
- tags: array; optional; inheritedFrom=moduleBase
- onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime.
- script: string; optional; inheritedFrom=scriptFilterConfig - Inline JavaScript source code containing a transform(record) function.
- scriptFile: string; optional; inheritedFrom=scriptFilterConfig - Path to JavaScript file containing the transform(record) function.

## filter: http_call
- type: "http_call"; required - Module type discriminator. Must be `http_call` for this module.
- id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline.
- name: string; optional; inheritedFrom=moduleBase - Human-readable name.
- description: string; optional; inheritedFrom=moduleBase
- enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active.
- tags: array; optional; inheritedFrom=moduleBase
- onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime.
- endpoint: string; required; inheritedFrom=httpRequestBase - HTTP endpoint URL. Templates are accepted; the final URL is validated at runtime after template resolution.
- method: object; optional; inheritedFrom=httpRequestBase
- headers: object; optional; inheritedFrom=httpRequestBase
  nested: map
- queryParams: object; optional; inheritedFrom=httpRequestBase - Query parameters.
  nested: map
- body: string; optional; inheritedFrom=httpRequestBase - Inline request body. Templates ({{record.field}}) are evaluated at runtime. The body is sent regardless of the HTTP method.
- bodyTemplateFile: string; optional; inheritedFrom=httpRequestBase - Path to an external template file used as the request body. Mutual exclusivity with `body` is not enforced; if both are set, the inline body wins.
- authentication: object; optional; inheritedFrom=httpRequestBase
  variant: api-key
  - type: "api-key"; required
  - credentials: object; required
  nested: object properties
  - key: string; required - API key value or env reference (${VAR}).
  - location: string; optional; default="header"; enum=header|query - Where to send the key.
  - headerName: string; optional; default="X-API-Key" - Header name when location=header.
  - paramName: string; optional; default="api_key" - Query param name when location=query.
  variant: bearer
  - type: "bearer"; required
  - credentials: object; required
  nested: object properties
  - token: string; required - Bearer token value or env reference (${VAR}).
  variant: basic
  - type: "basic"; required
  - credentials: object; required
  nested: object properties
  - username: string; required - Username or env reference (${VAR}).
  - password: string; required - Password or env reference (${VAR}).
  variant: oauth2
  - type: "oauth2"; required
  - credentials: object; required
  nested: object properties
  - tokenUrl: string; required - Token endpoint URL.
  - clientId: string; required - Client ID or env reference (${VAR}).
  - clientSecret: string; required - Client secret or env reference (${VAR}).
  - scope: string; optional - OAuth2 scopes as a space-separated string (RFC 6749 §3.3).
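
To illustrate the `authentication` object described above, here is a hedged sketch of an http_call filter using the oauth2 variant. The URLs, scope names, record field, and environment variable names are hypothetical. The fragment assumes that `scope` sits inside `credentials` with the other OAuth2 keys and that the endpoint accepts the same {{record.field}} template syntax documented for `body`. Only the module body is shown, not its position inside a pipeline file.

```yaml
# Hypothetical http_call filter with OAuth2 client credentials; values are placeholders.
type: http_call
id: enrich-customer
endpoint: "https://api.example.com/customers/{{record.customerId}}"  # assumed template syntax
authentication:
  type: oauth2
  credentials:
    tokenUrl: "https://auth.example.com/oauth/token"   # hypothetical token endpoint
    clientId: "${CRM_CLIENT_ID}"
    clientSecret: "${CRM_CLIENT_SECRET}"
    scope: "customers.read orders.read"                # space-separated, per RFC 6749 §3.3
```

The other variants follow the same shape: `type` selects the variant and `credentials` carries its keys, such as `token` for bearer or `username` and `password` for basic.
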
- timeoutMs: integer; optional; inheritedFrom=httpRequestBase - Request timeout in milliseconds. When omitted, each module applies its own runtime default.
- keys: array; optional; inheritedFrom=httpCallFilterConfig - List of key configurations for extracting values from records and using them in requests.
  nested: array items
  - field: string; required - Dot-notation path to extract value from record (e.g., 'customer.id').
  - paramType: string; required; enum=query|path|header - How to include value in request.
  - paramName: string; required - Parameter name (path placeholder, query param, or header name).
- cache: object; optional; inheritedFrom=httpCallFilterConfig
  nested: object properties
  - enabled: boolean; optional; default=false - Enable caching of query results.
  - maxSize: integer; optional; default=1000 - Maximum number of cached entries.
  - ttlSeconds: integer; optional; default=300 - Cache entry TTL in seconds.
  - key: string; optional - Cache key template using {{record.field}} syntax.
- mergeStrategy: string; optional; default="merge"; enum=merge|replace|append; inheritedFrom=httpCallFilterConfig - How to merge HTTP response with input records.
- dataField: string; optional; inheritedFrom=httpCallFilterConfig - Field to extract from HTTP response.
- retry: object; optional; inheritedFrom=httpCallFilterConfig
  nested: object properties
  - maxAttempts: integer; optional; default=3 - Maximum retry attempts (0 = no retry).
  - delayMs: integer; optional; default=1000 - Initial delay between retries in milliseconds.
  - backoffMultiplier: number; optional; default=2 - Multiplier for exponential backoff.
  - maxDelayMs: integer; optional; default=30000 - Maximum delay between retries.
  - retryableStatusCodes: array; optional; default=[429,500,502,503,504] - HTTP status codes that trigger retry.
  - useRetryAfterHeader: boolean; optional; default=false - Use Retry-After response header to determine delay before retrying. Supports seconds format (e.g., '120') or HTTP-date format (e.g., 'Fri, 31 Dec 2025 23:59:59 GMT'). The delay is capped by maxDelayMs. If header is invalid or absent, falls back to exponential backoff.
  - retryHintFromBody: string; optional; default="" - expr expression to evaluate against JSON response body. The parsed JSON body is available as the 'body' variable. If expression returns true, error is retryable (if status code is in retryableStatusCodes). If false, error is NOT retryable (overrides status code). If body is not valid JSON, falls back to status code only. Example: 'body.retryable == true' or 'body.error.code != "PERMANENT"'.

## filter: soap_call
- type: "soap_call"; required - Module type discriminator. Must be `soap_call` for this module.
- id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline.
- name: string; optional; inheritedFrom=moduleBase - Human-readable name.
- description: string; optional; inheritedFrom=moduleBase
- enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active.
- tags: array; optional; inheritedFrom=moduleBase
- onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime.
- endpoint: string; required; inheritedFrom=soapRequestBase - SOAP endpoint URL. Templates are accepted; the final URL is validated at runtime after template resolution.
- soapVersion: string; optional; default="1.1"; enum=1.1|1.2; inheritedFrom=soapRequestBase - SOAP envelope and HTTP binding version.
- soapAction: string; optional; inheritedFrom=soapRequestBase - SOAP action. SOAP 1.1 sends this as the SOAPAction header; SOAP 1.2 sends it as a Content-Type action parameter.
- operation: string; required; inheritedFrom=soapRequestBase - Logical SOAP operation name.
- body: string; required; inheritedFrom=soapRequestBase - Raw XML body fragment. Templates ({{record.field}}) are XML-escaped at runtime.
- headers: array; optional; inheritedFrom=soapRequestBase - Raw SOAP header XML fragments.
  nested: array items
  - xml: string; required
- authentication: object; optional; inheritedFrom=soapRequestBase - HTTP transport authentication.
  variant: api-key
  - type: "api-key"; required
  - credentials: object; required
  nested: object properties
  - key: string; required - API key value or env reference (${VAR}).
  - location: string; optional; default="header"; enum=header|query - Where to send the key.
  - headerName: string; optional; default="X-API-Key" - Header name when location=header.
  - paramName: string; optional; default="api_key" - Query param name when location=query.
  variant: bearer
  - type: "bearer"; required
  - credentials: object; required
  nested: object properties
  - token: string; required - Bearer token value or env reference (${VAR}).
  variant: basic
  - type: "basic"; required
  - credentials: object; required
  nested: object properties
  - username: string; required - Username or env reference (${VAR}).
  - password: string; required - Password or env reference (${VAR}).
  variant: oauth2
  - type: "oauth2"; required
  - credentials: object; required
  nested: object properties
  - tokenUrl: string; required - Token endpoint URL.
  - clientId: string; required - Client ID or env reference (${VAR}).
  - clientSecret: string; required - Client secret or env reference (${VAR}).
  - scope: string; optional - OAuth2 scopes as a space-separated string (RFC 6749 §3.3).
- wsSecurity: object; optional; inheritedFrom=soapRequestBase nested: object properties - username: string; required - password: string; required - passwordType: string; optional; default="PasswordText"; enum=PasswordText|PasswordDigest - tokenId: string; optional - mustUnderstand: boolean; optional; default=false - mtom: object; optional; inheritedFrom=soapRequestBase nested: object properties - enabled: boolean; optional; default=false - attachments: array; optional nested: array items - contentId: string; required - contentType: string; required - sourceField: string; required - Record path containing the attachment bytes. - encoding: string; optional; default="binary"; enum=binary|base64 - Encoding of string values read from sourceField. 'binary' sends the string bytes as-is; 'base64' decodes the string before attaching it. - httpHeaders: object; optional; inheritedFrom=soapRequestBase - Additional HTTP headers. Content-Type and SOAPAction are controlled by the SOAP version. nested: map - timeoutMs: integer; optional; inheritedFrom=soapRequestBase - Request timeout in milliseconds. When omitted, each module applies its own runtime default. - keys: array; optional; inheritedFrom=soapCallFilterConfig - List of key configurations for extracting values from records and using them in request metadata. nested: array items - field: string; required - Dot-notation path to extract value from record (e.g., 'customer.id'). - paramType: string; required; enum=query|path|header - How to include value in request. - paramName: string; required - Parameter name (path placeholder, query param, or header name). - cache: object; optional; inheritedFrom=soapCallFilterConfig nested: object properties - enabled: boolean; optional; default=false - Enable caching of query results. - maxSize: integer; optional; default=1000 - Maximum number of cached entries. - ttlSeconds: integer; optional; default=300 - Cache entry TTL in seconds. 
- key: string; optional - Cache key template using {{record.field}} syntax. - mergeStrategy: string; optional; default="merge"; enum=merge|replace|append; inheritedFrom=soapCallFilterConfig - How to merge SOAP response with input records. - resultKey: string; optional; inheritedFrom=soapCallFilterConfig - Key for storing the SOAP response when append mode is used. - dataField: string; optional; inheritedFrom=soapCallFilterConfig - Field path to extract from the parsed SOAP response map before merge/replace/append. When omitted, the full parsed SOAP response is used. - retry: object; optional; inheritedFrom=soapCallFilterConfig nested: object properties - maxAttempts: integer; optional; default=3 - Maximum retry attempts (0 = no retry). - delayMs: integer; optional; default=1000 - Initial delay between retries in milliseconds. - backoffMultiplier: number; optional; default=2 - Multiplier for exponential backoff. - maxDelayMs: integer; optional; default=30000 - Maximum delay between retries. - retryableStatusCodes: array; optional; default=[429,500,502,503,504] - HTTP status codes that trigger retry. - useRetryAfterHeader: boolean; optional; default=false - Use Retry-After response header to determine delay before retrying. Supports seconds format (e.g., '120') or HTTP-date format (e.g., 'Fri, 31 Dec 2025 23:59:59 GMT'). The delay is capped by maxDelayMs. If header is invalid or absent, falls back to exponential backoff. - retryHintFromBody: string; optional; default="" - expr expression to evaluate against JSON response body. The parsed JSON body is available as the 'body' variable. If expression returns true, error is retryable (if status code is in retryableStatusCodes). If false, error is NOT retryable (overrides status code). If body is not valid JSON, falls back to status code only. Example: 'body.retryable == true' or 'body.error.code != "PERMANENT"'. ## filter: sql_call - type: "sql_call"; required - Module type discriminator. 
Must be `sql_call` for this module. - id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline. - name: string; optional; inheritedFrom=moduleBase - Human-readable name. - description: string; optional; inheritedFrom=moduleBase - enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active. - tags: array; optional; inheritedFrom=moduleBase - onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime. - mergeStrategy: string; optional; default="merge"; enum=merge|replace|append; inheritedFrom=sqlCallFilterConfig - How to merge query results with input records. - resultKey: string; optional; inheritedFrom=sqlCallFilterConfig - Key for storing result in append mode. - cache: object; optional; inheritedFrom=sqlCallFilterConfig nested: object properties - enabled: boolean; optional; default=false - Enable caching of query results. - maxSize: integer; optional; default=1000 - Maximum number of cached entries. - ttlSeconds: integer; optional; default=300 - Cache entry TTL in seconds. - key: string; optional - Cache key template using {{record.field}} syntax. - query: string; optional; inheritedFrom=sqlRequestBase - SQL query. Supports module-specific placeholders (e.g. {{record.field}}, {{lastRunTimestamp}}). - queryFile: string; optional; inheritedFrom=sqlRequestBase - Path to SQL file. Supports module-specific placeholders. - connectionString: string; optional; inheritedFrom=databaseConnectionConfig - Database connection string (DSN). Avoid in production - use connectionStringRef instead. - connectionStringRef: string; optional; inheritedFrom=databaseConnectionConfig - Environment variable reference for connection string. Format: ${ENV_VAR_NAME} - driver: string; optional; enum=postgres|mysql|sqlite; inheritedFrom=databaseConnectionConfig - Database driver. Auto-detected from connection string if not specified. 
- maxOpenConns: integer; optional; default=10; inheritedFrom=databaseConnectionConfig - Maximum number of open connections in the pool. - maxIdleConns: integer; optional; default=5; inheritedFrom=databaseConnectionConfig - Maximum number of idle connections in the pool. - connMaxLifetimeSeconds: integer; optional; default=1800; inheritedFrom=databaseConnectionConfig - Maximum lifetime of a connection in seconds. - connMaxIdleTimeSeconds: integer; optional; default=300; inheritedFrom=databaseConnectionConfig - Maximum idle time for a connection in seconds. - timeoutMs: integer; optional; default=30000; inheritedFrom=databaseConnectionConfig - Query timeout in milliseconds. ## filter: set - type: "set"; required - Module type discriminator. Must be `set` for this module. - id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline. - name: string; optional; inheritedFrom=moduleBase - Human-readable name. - description: string; optional; inheritedFrom=moduleBase - enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active. - tags: array; optional; inheritedFrom=moduleBase - onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime. - target: string; required; inheritedFrom=setFilterConfig - Field path to set (e.g. id, user.id, metadata.version). Supports dot notation for nested paths. - value: any; required; inheritedFrom=setFilterConfig - Literal value to set. Can be string, number, boolean, or null. Type is preserved. ## filter: remove - type: "remove"; required - Module type discriminator. Must be `remove` for this module. - id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline. - name: string; optional; inheritedFrom=moduleBase - Human-readable name. 
- description: string; optional; inheritedFrom=moduleBase - enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active. - tags: array; optional; inheritedFrom=moduleBase - onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime. - target: union; required; inheritedFrom=removeFilterConfig - Field path(s) to remove. Either a single non-empty string or a non-empty array of non-empty strings. Supports dot notation for nested paths. ## filter: drop - type: "drop"; required - Module type discriminator. Must be `drop` for this module. - id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline. - name: string; optional; inheritedFrom=moduleBase - Human-readable name. - description: string; optional; inheritedFrom=moduleBase - enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active. - tags: array; optional; inheritedFrom=moduleBase - onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime. ## filter: loop - type: "loop"; required - Module type discriminator. Must be `loop` for this module. - id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline. - name: string; optional; inheritedFrom=moduleBase - Human-readable name. - description: string; optional; inheritedFrom=moduleBase - enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active. - tags: array; optional; inheritedFrom=moduleBase - onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime. - field: string; required; inheritedFrom=loopFilterConfig - Path to the array field, relative to the input record (root record for the outermost loop, parent scope for nested loops). 
Supports dot notation. - itemName: string; required; inheritedFrom=loopFilterConfig - Alias exposed to nested filters for the current item. Cannot be 'record', '_metadata', or 'loop', and must not duplicate an active parent loop alias. - filters: array; required; inheritedFrom=loopFilterConfig - Nested filters executed for every item. nested: array items - type: string; required - Filter type. Canonical: mapping, condition, drop, loop, set, remove, script, http_call, soap_call, sql_call. - id: string; optional - Unique identifier within the pipeline. - name: string; optional - Human-readable name. - description: string; optional - enabled: boolean; optional; default=true - Whether module is active. - tags: array; optional - onError: string; optional; default="fail" - Default error action. Case-insensitive; normalized to lowercase by the runtime. ## output: httpRequest - type: "httpRequest"; required - Module type discriminator. Must be `httpRequest` for this module. - id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline. - name: string; optional; inheritedFrom=moduleBase - Human-readable name. - description: string; optional; inheritedFrom=moduleBase - enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active. - tags: array; optional; inheritedFrom=moduleBase - onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime. - endpoint: string; required; inheritedFrom=httpRequestBase - HTTP endpoint URL. Templates are accepted; the final URL is validated at runtime after template resolution. - method: object; optional; inheritedFrom=httpRequestBase - headers: object; optional; inheritedFrom=httpRequestBase nested: map - queryParams: object; optional; inheritedFrom=httpRequestBase - Query parameters. nested: map - body: string; optional; inheritedFrom=httpRequestBase - Inline request body. 
Templates ({{record.field}}) are evaluated at runtime. The body is sent regardless of the HTTP method. - bodyTemplateFile: string; optional; inheritedFrom=httpRequestBase - Path to an external template file used as the request body. Mutual exclusivity with `body` is not enforced; if both are set, the inline body wins. - authentication: object; optional; inheritedFrom=httpRequestBase variant: api-key - type: "api-key"; required - credentials: object; required nested: object properties - key: string; required - API key value or env reference (${VAR}). - location: string; optional; default="header"; enum=header|query - Where to send the key. - headerName: string; optional; default="X-API-Key" - Header name when location=header. - paramName: string; optional; default="api_key" - Query param name when location=query. variant: bearer - type: "bearer"; required - credentials: object; required nested: object properties - token: string; required - Bearer token value or env reference (${VAR}). variant: basic - type: "basic"; required - credentials: object; required nested: object properties - username: string; required - Username or env reference (${VAR}). - password: string; required - Password or env reference (${VAR}). variant: oauth2 - type: "oauth2"; required - credentials: object; required nested: object properties - tokenUrl: string; required - Token endpoint URL. - clientId: string; required - Client ID or env reference (${VAR}). - clientSecret: string; required - Client secret or env reference (${VAR}). - scope: string; optional - OAuth2 scopes as a space-separated string (RFC 6749 §3.3). - timeoutMs: integer; optional; inheritedFrom=httpRequestBase - Request timeout in milliseconds. When omitted, each module applies its own runtime default. - success: object; optional; inheritedFrom=httpRequestOutputConfig nested: object properties - expression: string; optional - expr expression evaluated against the response.
Available variables: `statusCode` (int), `headers` (map[string][]string), `body` (parsed JSON when applicable). Must return a boolean. - statusCodes: array; optional; default=[200,201,202,203,204] - HTTP status codes considered successful. - retry: object; optional; inheritedFrom=httpRequestOutputConfig nested: object properties - maxAttempts: integer; optional; default=3 - Maximum retry attempts (0 = no retry). - delayMs: integer; optional; default=1000 - Initial delay between retries in milliseconds. - backoffMultiplier: number; optional; default=2 - Multiplier for exponential backoff. - maxDelayMs: integer; optional; default=30000 - Maximum delay between retries. - retryableStatusCodes: array; optional; default=[429,500,502,503,504] - HTTP status codes that trigger retry. - useRetryAfterHeader: boolean; optional; default=false - Use Retry-After response header to determine delay before retrying. Supports seconds format (e.g., '120') or HTTP-date format (e.g., 'Fri, 31 Dec 2025 23:59:59 GMT'). The delay is capped by maxDelayMs. If header is invalid or absent, falls back to exponential backoff. - retryHintFromBody: string; optional; default="" - expr expression to evaluate against JSON response body. The parsed JSON body is available as the 'body' variable. If expression returns true, error is retryable (if status code is in retryableStatusCodes). If false, error is NOT retryable (overrides status code). If body is not valid JSON, falls back to status code only. Example: 'body.retryable == true' or 'body.error.code != "PERMANENT"'. - keys: array; optional; inheritedFrom=httpRequestOutputConfig - List of key configurations for extracting values from records and using them in path, query, or headers. nested: array items - field: string; required - Dot-notation path to extract value from record (e.g., 'customer.id'). - paramType: string; required; enum=query|path|header - How to include value in request. 
- paramName: string; required - Parameter name (path placeholder, query param, or header name). - requestMode: string; optional; default="batch"; enum=batch|single; inheritedFrom=httpRequestOutputConfig - Request mode: 'batch' (all records in one request, default) or 'single' (one request per record). ## output: soapRequest - type: "soapRequest"; required - Module type discriminator. Must be `soapRequest` for this module. - id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline. - name: string; optional; inheritedFrom=moduleBase - Human-readable name. - description: string; optional; inheritedFrom=moduleBase - enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active. - tags: array; optional; inheritedFrom=moduleBase - onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime. - endpoint: string; required; inheritedFrom=soapRequestBase - SOAP endpoint URL. Templates are accepted; the final URL is validated at runtime after template resolution. - soapVersion: string; optional; default="1.1"; enum=1.1|1.2; inheritedFrom=soapRequestBase - SOAP envelope and HTTP binding version. - soapAction: string; optional; inheritedFrom=soapRequestBase - SOAP action. SOAP 1.1 sends this as the SOAPAction header; SOAP 1.2 sends it as a Content-Type action parameter. - operation: string; required; inheritedFrom=soapRequestBase - Logical SOAP operation name. - body: string; required; inheritedFrom=soapRequestBase - Raw XML body fragment. Templates ({{record.field}}) are XML-escaped at runtime. - headers: array; optional; inheritedFrom=soapRequestBase - Raw SOAP header XML fragments. nested: array items - xml: string; required - authentication: object; optional; inheritedFrom=soapRequestBase - HTTP transport authentication. 
variant: api-key - type: "api-key"; required - credentials: object; required nested: object properties - key: string; required - API key value or env reference (${VAR}). - location: string; optional; default="header"; enum=header|query - Where to send the key. - headerName: string; optional; default="X-API-Key" - Header name when location=header. - paramName: string; optional; default="api_key" - Query param name when location=query. variant: bearer - type: "bearer"; required - credentials: object; required nested: object properties - token: string; required - Bearer token value or env reference (${VAR}). variant: basic - type: "basic"; required - credentials: object; required nested: object properties - username: string; required - Username or env reference (${VAR}). - password: string; required - Password or env reference (${VAR}). variant: oauth2 - type: "oauth2"; required - credentials: object; required nested: object properties - tokenUrl: string; required - Token endpoint URL. - clientId: string; required - Client ID or env reference (${VAR}). - clientSecret: string; required - Client secret or env reference (${VAR}). - scope: string; optional - OAuth2 scopes as a space-separated string (RFC 6749 §3.3). - wsSecurity: object; optional; inheritedFrom=soapRequestBase nested: object properties - username: string; required - password: string; required - passwordType: string; optional; default="PasswordText"; enum=PasswordText|PasswordDigest - tokenId: string; optional - mustUnderstand: boolean; optional; default=false - mtom: object; optional; inheritedFrom=soapRequestBase nested: object properties - enabled: boolean; optional; default=false - attachments: array; optional nested: array items - contentId: string; required - contentType: string; required - sourceField: string; required - Record path containing the attachment bytes. - encoding: string; optional; default="binary"; enum=binary|base64 - Encoding of string values read from sourceField. 
'binary' sends the string bytes as-is; 'base64' decodes the string before attaching it. - httpHeaders: object; optional; inheritedFrom=soapRequestBase - Additional HTTP headers. Content-Type and SOAPAction are controlled by the SOAP version. nested: map - timeoutMs: integer; optional; inheritedFrom=soapRequestBase - Request timeout in milliseconds. When omitted, each module applies its own runtime default. - success: object; optional; inheritedFrom=soapRequestOutputConfig nested: object properties - expression: string; optional - expr expression evaluated against the response. Available variables: `statusCode` (int), `headers` (map[string][]string), `body` (parsed JSON when applicable). Must return a boolean. - statusCodes: array; optional; default=[200,201,202,203,204] - HTTP status codes considered successful. - retry: object; optional; inheritedFrom=soapRequestOutputConfig nested: object properties - maxAttempts: integer; optional; default=3 - Maximum retry attempts (0 = no retry). - delayMs: integer; optional; default=1000 - Initial delay between retries in milliseconds. - backoffMultiplier: number; optional; default=2 - Multiplier for exponential backoff. - maxDelayMs: integer; optional; default=30000 - Maximum delay between retries. - retryableStatusCodes: array; optional; default=[429,500,502,503,504] - HTTP status codes that trigger retry. - useRetryAfterHeader: boolean; optional; default=false - Use Retry-After response header to determine delay before retrying. Supports seconds format (e.g., '120') or HTTP-date format (e.g., 'Fri, 31 Dec 2025 23:59:59 GMT'). The delay is capped by maxDelayMs. If header is invalid or absent, falls back to exponential backoff. - retryHintFromBody: string; optional; default="" - expr expression to evaluate against JSON response body. The parsed JSON body is available as the 'body' variable. If expression returns true, error is retryable (if status code is in retryableStatusCodes). 
If false, error is NOT retryable (overrides status code). If body is not valid JSON, falls back to status code only. Example: 'body.retryable == true' or 'body.error.code != "PERMANENT"'. - requestMode: string; optional; default="batch"; enum=batch|single; inheritedFrom=soapRequestOutputConfig - Request mode: 'batch' (all records in one request, default) or 'single' (one request per record). ## output: database - type: "database"; required - Module type discriminator. Must be `database` for this module. - id: string; optional; inheritedFrom=moduleBase - Unique identifier within the pipeline. - name: string; optional; inheritedFrom=moduleBase - Human-readable name. - description: string; optional; inheritedFrom=moduleBase - enabled: boolean; optional; default=true; inheritedFrom=moduleBase - Whether module is active. - tags: array; optional; inheritedFrom=moduleBase - onError: string; optional; default="fail"; inheritedFrom=moduleBase - Default error action. Case-insensitive; normalized to lowercase by the runtime. - transaction: boolean; optional; default=false; inheritedFrom=databaseOutputConfig - Wrap operations in a transaction. - query: string; optional; inheritedFrom=sqlRequestBase - SQL query. Supports module-specific placeholders (e.g. {{record.field}}, {{lastRunTimestamp}}). - queryFile: string; optional; inheritedFrom=sqlRequestBase - Path to SQL file. Supports module-specific placeholders. - connectionString: string; optional; inheritedFrom=databaseConnectionConfig - Database connection string (DSN). Avoid in production - use connectionStringRef instead. - connectionStringRef: string; optional; inheritedFrom=databaseConnectionConfig - Environment variable reference for connection string. Format: ${ENV_VAR_NAME} - driver: string; optional; enum=postgres|mysql|sqlite; inheritedFrom=databaseConnectionConfig - Database driver. Auto-detected from connection string if not specified. 
- maxOpenConns: integer; optional; default=10; inheritedFrom=databaseConnectionConfig - Maximum number of open connections in the pool. - maxIdleConns: integer; optional; default=5; inheritedFrom=databaseConnectionConfig - Maximum number of idle connections in the pool. - connMaxLifetimeSeconds: integer; optional; default=1800; inheritedFrom=databaseConnectionConfig - Maximum lifetime of a connection in seconds. - connMaxIdleTimeSeconds: integer; optional; default=300; inheritedFrom=databaseConnectionConfig - Maximum idle time for a connection in seconds. - timeoutMs: integer; optional; default=30000; inheritedFrom=databaseConnectionConfig - Query timeout in milliseconds. # Important Conditional Schema Constraints The module summary above lists direct required fields. The raw JSON Schemas below remain authoritative for conditional requirements and mutual exclusions. Key generation constraints: - `script` filters must provide exactly one of `script` or `scriptFile`. - SQL-shaped modules (`database` input, `database` output, `sql_call`) must provide exactly one of `query` or `queryFile` and exactly one of `connectionString` or `connectionStringRef`. - `soap_call` and `sql_call` require `resultKey` when `mergeStrategy: append`. - HTTP/SOAP pagination requires type-specific fields: `cursor` requires `nextCursorField`, `page` requires `totalPagesField`, and `offset` requires `limitParam` plus `totalField`. - Database pagination requires `type`; `limit-offset` requires `param`, and `cursor` requires both `param` and `cursorField`. - Mapping entries with `onMissing: useDefault` require `defaultValue`. - Output `success` requires at least one of `expression` or `statusCodes` when the `success` block is present. - Top-level `defaults.retry` is inherited field-by-field by retry-capable modules; module retry fields override only the fields they declare. 
- `loop` filters require `field`, `itemName`, and `filters`; `itemName` cannot be `record`, `_metadata`, or `loop`, and cannot duplicate an active parent loop alias when nesting. Nested filters must not write under `_metadata.loop` (read-only runtime state). Returning zero records removes the item; more than one record is rejected. Sub-path writes on non-object items are rejected.

# Full Documentation Text

The following pages are extracted from the documentation source and preserve the canonical documentation order from the site source.

# Cannectors Documentation (/docs)

Cannectors is a cross-platform CLI for running declarative data pipelines. It reads YAML pipeline files and moves records through an `input → filters → output` flow.

## What it does [#what-it-does]

* Pull records from HTTP APIs, SOAP APIs, webhooks, PostgreSQL, MySQL, or SQLite.
* Transform records with mapping, condition, script, set, remove, HTTP-call, SOAP-call, and SQL-call filters.
* Send records to HTTP APIs, SOAP APIs, or databases.
* Handle authentication, retries, scheduling, state persistence, and dry-run previews — all from configuration.

## Minimal pipeline [#minimal-pipeline]

```yaml
name: sync-orders
version: 1.0.0
description: Poll orders from an API and send them as a batch.

input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders

filters:
  - type: mapping
    mappings:
      - source: order_id
        target: id
      - source: customer.email
        target: email

output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
```

This documentation site is in active development. Get Started, Concepts, and Modules pages are being written in upcoming phases. See the [roadmap](https://github.com/alexandrecano/cannectors-doc/blob/main/docs/ROADMAP.md).
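As a sketch of how the minimal pipeline might grow, the variant below adds a `sql_call` enrichment and an authenticated, retrying output. It uses only keys listed in the module summary and tries to respect the conditional constraints: one of `query`/`queryFile`, one of `connectionString`/`connectionStringRef`, and `resultKey` when `mergeStrategy` is `append`. The table, column, and environment variable names are hypothetical. How the `{{record.email}}` placeholder is quoted or bound inside SQL is an assumption worth checking with `cannectors validate` and `cannectors run --dry-run`.

```yaml
name: sync-orders-enriched
version: 1.0.0
description: Poll orders, enrich them from a database, and send them as a batch.

input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders

filters:
  - type: mapping
    mappings:
      - source: order_id
        target: id
      - source: customer.email
        target: email

  - type: sql_call
    # Exactly one of query / queryFile.
    # Hypothetical table and column; placeholder binding is an assumption.
    query: "SELECT tier FROM customers WHERE email = {{record.email}}"
    # Exactly one of connectionString / connectionStringRef.
    connectionStringRef: "${CUSTOMERS_DB_DSN}"   # hypothetical variable name
    driver: postgres
    mergeStrategy: append
    resultKey: customer   # required because mergeStrategy is append

output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  authentication:
    type: bearer
    credentials:
      token: "${DEST_API_TOKEN}"   # hypothetical variable name
  success:
    statusCodes: [200, 201]
  retry:
    maxAttempts: 5
    delayMs: 500
    useRetryAfterHeader: true
```

Retry fields that are left out (`backoffMultiplier`, `maxDelayMs`, `retryableStatusCodes`) would fall back to the documented defaults or to a top-level `defaults.retry` block if one is present.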
# Exit codes (/docs/cli/exit-codes)

Cannectors uses a small, stable set of exit codes so CI and process supervisors can branch on them.

| Code  | Meaning                                                   | Emitted by                   |
| ----- | --------------------------------------------------------- | ---------------------------- |
| **0** | Success — pipeline valid, or run completed cleanly.       | `validate`, `run`, `version` |
| **1** | Validation errors — schema check failed.                  | `validate`, `run`            |
| **2** | Parse error — the file is not valid YAML/JSON.            | `validate`, `run`            |
| **3** | Runtime error — a stage failed past its `onError` policy. | `run` only                   |

## When each code is set [#when-each-code-is-set]

### `0` — success [#0--success]

* `validate`: schema passed.
* `run`: the pipeline ran to completion (one-shot) or was stopped via SIGINT/SIGTERM after gracefully draining in-flight work.
* `version`: always.

### `1` — validation error [#1--validation-error]

The YAML/JSON parses, but doesn't match the pipeline schema. The error message names the offending module and field.

```
✗ filters[1] (mapping): mappings is required
```

This is the code that should fail a CI gate.

### `2` — parse error [#2--parse-error]

The file isn't valid YAML/JSON at all — indentation broken, unclosed quote, invalid escape. The error names the line and column.

```
✗ parse error: line 14:7 — could not find expected ':'
```

Treat the same as code 1 in CI.

### `3` — runtime error [#3--runtime-error]

Only `run` emits this. It means the runtime started executing, something failed past the configured `onError` policy, and the pipeline aborted. Examples:

* Output endpoint returns a non-retryable 4xx after the `retry` block is exhausted.
* A `script` filter throws and `onError` is `fail` (the default).
* Database connection drops mid-transaction.

The log carries the root cause and the record index that triggered the failure.
Process supervisors should treat code 3 as a normal "restart me" signal — there's nothing wrong with the YAML, just a transient external problem.

## Branching in shell [#branching-in-shell]

```bash
cannectors run pipeline.yaml
case $? in
  0) echo "done" ;;
  1|2) echo "fix the YAML and retry" ;;
  3) echo "transient runtime error — will be restarted" ;;
esac
```

# Flags (/docs/cli/flags)

All four flags work on every command (`validate`, `run`, `version`), though `--dry-run` is only meaningful on `run`.

## `--verbose` [#--verbose]

Print extra detail. On `validate`, that's the parsed pipeline structure (module list, defaults inheritance, env-var positions). On `run`, it's per-record trace lines as records move through the filter chain.

```bash
cannectors run --verbose pipeline.yaml
```

Output:

```
· loading pipeline sync-orders v1.0.0
· defaults: timeoutMs=15000, onError=log, retry.maxAttempts=2
· input httpPolling GET https://source.example.com/api/orders
· record 1/48 → mapping (kept)
· record 1/48 → condition (status==paid: kept)
· …
✓ complete 47 records · 1.84s
```

Useful when you need to know **why** the runtime made a particular decision. Don't ship `--verbose` to production logs — it's noisy.

## `--quiet` [#--quiet]

Suppress informational logs. Warnings and errors still print.

```bash
cannectors validate --quiet pipeline.yaml && echo "ok"
```

The exit code is still set correctly, so this is the right flag for CI gating: silence on success, errors on failure.

## `--log-file <path>` [#--log-file-path]

Write logs to a file instead of stdout. The file gets the same structured records that would have been printed; stdout stays clean.

```bash
cannectors run --log-file /var/log/cannectors/sync-orders.log pipeline.yaml
```

The log format is line-delimited JSON (one event per line). Pipe it into your log aggregator (Loki, Datadog, etc.) without further parsing.

## `--dry-run` [#--dry-run]

`run`-only.
Validates, fetches the input, runs every filter, then prints what would have been sent to the output instead of sending it.

```bash
cannectors run --dry-run pipeline.yaml
```

See [Dry-run mode](/docs/concepts/dry-run) for the full semantics.

## Combining flags [#combining-flags]

Flags compose freely:

```bash
cannectors run --dry-run --verbose --log-file run.log pipeline.yaml
```

The order doesn't matter. The pipeline file argument can come anywhere (Cobra-style flag parsing).

# CLI (/docs/cli)

The Cannectors CLI is small on purpose. Three commands, four global flags. The interesting surface lives in the YAML.

## A typical loop [#a-typical-loop]

```bash
# 1. Edit the YAML
$EDITOR pipeline.yaml

# 2. Sanity-check it
cannectors validate pipeline.yaml

# 3. Preview against the real source, no destination write
cannectors run --dry-run pipeline.yaml

# 4. Ship it
cannectors run pipeline.yaml
```

That's the whole development feedback loop. CI can do step 2 as a schema gate; step 3 is the safety net before a fresh pipeline hits production destinations.

# run (/docs/cli/run)

```bash
cannectors run [--dry-run]
```

Reads the pipeline, validates it (same checks as [`validate`](/docs/cli/validate)), resolves environment variables, then executes it. The process lifetime depends on the input.

## Examples [#examples]

```bash
# Run once and exit
cannectors run ./examples/01-http-polling-basic-to-http-batch.yaml

# Preview without writing to the output
cannectors run --dry-run ./examples/19-http-output-single-template.yaml

# Stay alive on CRON
cannectors run ./examples/04-http-polling-cursor-oauth2.yaml
```

## Lifetime [#lifetime]

| Input | Lifetime |
| --- | --- |
| `httpPolling` without `schedule` | One-shot. Exits 0 on success. |
| `httpPolling` with `schedule` | Stays alive. Triggers on every CRON match. Exits on SIGINT/SIGTERM. |
| `database` without `schedule` | One-shot. |
| `database` with `schedule` | Stays alive on CRON. |
| `webhook` | Long-running HTTP listener. Exits on SIGINT/SIGTERM. |

The runtime drains in-flight work and persists state on shutdown, so a graceful stop never loses a batch.

## Dry-run mode [#dry-run-mode]

`--dry-run` runs everything except the output's side effect: the input fetches, every filter executes, then the would-be output payload is printed to stdout instead of being sent.

```bash
cannectors run --dry-run sync-orders.yaml
```

See [Dry-run mode](/docs/concepts/dry-run) for the full mental model and `dryRunOptions` tuning.

## Flags [#flags]

| Flag | Effect |
| --- | --- |
| `--dry-run` | Skip the output side effect; print a preview instead. |
| `--verbose` | Per-record trace lines on stdout. |
| `--quiet` | Suppress informational logs (errors still print). |
| `--log-file <path>` | Write structured logs to a file. |

See [Flags](/docs/cli/flags) for the full reference.

## Exit codes [#exit-codes]

| Code | Meaning |
| --- | --- |
| `0` | Pipeline completed successfully (one-shot mode) or was stopped cleanly. |
| `1` | Validation errors before execution. |
| `2` | Parse error. |
| `3` | Runtime error: a stage failed past its `onError` policy. |

## Production process management [#production-process-management]

The `run` process is single-replica by design. Don't run two copies of the same scheduled pipeline: they would fire on the same CRON tick and double-process records. Use a single-replica `Deployment`, `StatefulSet`, or `systemd` unit.

See [Operations · scheduling in production](/docs/operations/scheduling-in-production) for the deployment patterns.
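
On a single host without a supervisor, the "one copy only" rule can be approximated with a lock. The following is a minimal sketch, not a Cannectors feature: `run_exclusive` and the lock path are hypothetical names, the lock relies on `mkdir` being atomic, and a crashed process leaves a stale lock directory that has to be removed by hand. It does nothing to prevent duplicates across several hosts.

```bash
#!/usr/bin/env bash
# Hypothetical wrapper: refuse to start a command while another copy holds the lock.
run_exclusive() {
  lock_dir=$1
  shift
  # mkdir either creates the directory or fails atomically, so it doubles as a lock.
  if ! mkdir "$lock_dir" 2>/dev/null; then
    echo "lock $lock_dir is held; refusing to start a second copy" >&2
    return 75
  fi
  "$@"
  status=$?
  rmdir "$lock_dir"
  return "$status"
}

# Intended use (commented out so the sketch runs without the CLI installed):
# run_exclusive /tmp/sync-orders.lock cannectors run sync-orders.yaml
```

A process supervisor with a single replica remains the more robust option; the wrapper only guards against an accidental second launch from a shell or crontab.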
# validate (/docs/cli/validate)

```bash
cannectors validate
```

Reads the file, parses it as YAML, validates every block against the JSON Schema the runtime uses, and reports the result. Doesn't run the pipeline.

## Examples [#examples]

```bash
cannectors validate ./examples/01-http-polling-basic-to-http-batch.yaml
cannectors validate --verbose ./examples/10-mapping-transforms-all.yaml
cannectors validate --quiet ./examples/22-defaults-inheritance.yaml
```

## What gets checked [#what-gets-checked]

* **Top-level shape**: required keys (`name`, `input`, `filters`, `output`) are present and the right type.
* **Module discriminator**: `input.type`, `output.type`, and each `filters[].type` resolve to a supported module.
* **Module options**: every option's type, enum, pattern, and required flag against the module's own schema.
* **Cross-cutting blocks**: `authentication`, `retry`, `pagination`, `statePersistence`, and `cache` are all validated against their shared schemas.

`validate` does **not** resolve environment variables, hit external services, or open database connections. It's a pure parse-and-check pass, safe to run in CI on every commit.

## Flags [#flags]

| Flag | Effect |
| --- | --- |
| `--verbose` | Print the parsed pipeline structure (modules, defaults inheritance, env-var positions). |
| `--quiet` | Suppress the success banner. Errors still go to stderr. |
| `--log-file <path>` | Write logs to a file instead of stdout. |

## Exit codes [#exit-codes]

| Code | Meaning |
| --- | --- |
| `0` | Pipeline is valid. |
| `1` | Schema validation errors. Details on stderr. |
| `2` | Parse error: the file isn't valid YAML/JSON. |

See [Exit codes](/docs/cli/exit-codes) for the full table.
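
The three `validate` exit codes can be turned into readable CI messages with a small helper. This is a sketch only: `explain_validate_status` is a hypothetical function name, and the messages simply restate the table above.

```bash
#!/usr/bin/env bash
# Hypothetical helper: translate a `cannectors validate` exit status into a message.
explain_validate_status() {
  case "$1" in
    0) echo "valid" ;;
    1) echo "schema validation errors" ;;
    2) echo "parse error" ;;
    *) echo "unexpected status $1" ;;
  esac
}

# Intended use (commented out so the sketch runs without the CLI installed):
# cannectors validate --quiet pipeline.yaml; explain_validate_status "$?"
explain_validate_status 2
```

The catch-all branch is there because any status outside 0, 1, and 2 would mean the command did not behave as documented and deserves attention rather than a silent pass.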
## In CI [#in-ci]

A typical pre-merge gate:

```bash
cannectors validate ./pipelines/*.yaml
```

Exits non-zero on the first invalid file, so the CI step fails fast. To check every file and report all failures in one pass, loop manually and collect the result:

```bash
fail=0
for f in pipelines/*.yaml; do
  cannectors validate --quiet "$f" || fail=1
done
exit "$fail"
```

# version (/docs/cli/version)

```bash
cannectors version
```

Prints the build version, the Git commit it was built from, the build date, and the Go runtime version. No flags, no side effects.

## Example output [#example-output]

```
cannectors v0.1.0
commit: 9e3a2c1
built: 2026-04-21T12:30:00Z
runtime: go1.25.0 linux/amd64
```

The exact format may evolve, but the first line (`cannectors` followed by the version) is stable and CI-parseable.

## CI usage [#ci-usage]

A common pattern is to print the version at the top of each CI step so log triage stays sane across binary upgrades:

```bash
cannectors version
cannectors validate pipeline.yaml
cannectors run --dry-run pipeline.yaml
```

## Exit code [#exit-code]

Always `0`. The command takes no input that can fail.

# Authentication (/docs/concepts/authentication)

Every HTTP-shaped and SOAP-shaped module (`httpPolling`, `webhook`, `http_call`, `httpRequest`, `soapPolling`, `soap_call`, `soapRequest`) accepts the same `authentication` block for transport authentication. Credentials live in environment variables, never in the YAML.

SOAP services may also require WS-Security UsernameToken. That is configured with `wsSecurity`, not `authentication`; see [SOAP](/docs/concepts/soap).

## The shape [#the-shape]

```yaml
authentication:
  type:            # api-key | bearer | basic | oauth2
  credentials:
    # type-specific fields
```

## `api-key` [#api-key]

A key passed either as an HTTP header or a query parameter.
```yaml
authentication:
  type: api-key
  credentials:
    key: ${SOURCE_API_KEY}
    location: header        # header | query
    headerName: X-Api-Key   # used when location=header
    paramName: api_key      # used when location=query
```

| Field | Required | Meaning |
| --- | --- | --- |
| `key` | yes | The credential, usually an env var reference. |
| `location` | no (default `header`) | `header` or `query`. |
| `headerName` | no (default `X-API-Key`) | HTTP header name, used when `location=header`. |
| `paramName` | no (default `api_key`) | Query string key, used when `location=query`. |

## `bearer` [#bearer]

The most common HTTP auth flavor: an `Authorization: Bearer` header carrying the token.

```yaml
authentication:
  type: bearer
  credentials:
    token: ${SOURCE_BEARER_TOKEN}
```

## `basic` [#basic]

Sends an `Authorization: Basic …` header. Credentials are Base64-encoded by Cannectors at request time.

```yaml
authentication:
  type: basic
  credentials:
    username: ${SOURCE_USERNAME}
    password: ${SOURCE_PASSWORD}
```

HTTP Basic auth has no built-in transport security. Only use it over HTTPS endpoints.

## `oauth2` [#oauth2]

OAuth2 client credentials grant. Cannectors acquires a token on first use, caches it, and refreshes automatically before expiry. No manual refresh handling required.

```yaml
authentication:
  type: oauth2
  credentials:
    tokenUrl: ${OAUTH_TOKEN_URL}
    clientId: ${OAUTH_CLIENT_ID}
    clientSecret: ${OAUTH_CLIENT_SECRET}
    scope: read:orders   # optional, space-separated for multiple
```

| Field | Required | Meaning |
| --- | --- | --- |
| `tokenUrl` | yes | The OAuth2 token endpoint. |
| `clientId` | yes | OAuth2 client ID. |
| `clientSecret` | yes | OAuth2 client secret. |
| `scope` | no | Space-separated list of scopes to request. |

Only the **client\_credentials** grant is supported. For user-delegated flows, fetch a token externally and feed it via `bearer`.
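
Feeding an externally fetched token through `bearer` could look roughly like the wrapper below. It is a sketch under assumptions: `run_with_bearer_token` is a hypothetical helper, the token-producing command is whatever your identity provider tooling offers, and `SOURCE_BEARER_TOKEN` is only the variable name used in the `bearer` example above.

```bash
#!/usr/bin/env bash
# Hypothetical helper: obtain a token from a command of your choice, then run
# another command with SOURCE_BEARER_TOKEN set only for that one invocation.
run_with_bearer_token() {
  token_cmd=$1
  shift
  token=$("$token_cmd") || return 1
  if [ -z "$token" ]; then
    echo "token command returned an empty token" >&2
    return 1
  fi
  SOURCE_BEARER_TOKEN=$token "$@"
}

# Intended use (commented out; my_fetch_token is a placeholder you would write):
# run_with_bearer_token my_fetch_token cannectors run pipeline.yaml
```

Because substitution happens once at startup, a token handed over this way is fixed for the lifetime of the process; a long-running scheduled pipeline would need a restart when the token expires.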
## Per-module overrides [#per-module-overrides]

Authentication is per-module: the input, each filter, and the output can all use a different scheme. The [`23-auth-basic-bearer-query-key.yaml`](https://github.com/alexandrecano/cannectors/blob/main/examples/23-auth-basic-bearer-query-key.yaml) example uses bearer on input, basic on enrichment, and an API key on output, all in one pipeline.

```yaml
input:
  type: httpPolling
  authentication:
    type: bearer
    credentials:
      token: ${SOURCE_BEARER_TOKEN}
filters:
  - type: http_call
    authentication:
      type: basic
      credentials:
        username: ${DIR_USERNAME}
        password: ${DIR_PASSWORD}
    …
output:
  type: httpRequest
  authentication:
    type: api-key
    credentials:
      key: ${DEST_API_KEY}
      location: query
      paramName: api_key
```

## Secrets, never in YAML [#secrets-never-in-yaml]

Credentials are always read from environment variables via the `${VAR}` syntax. The runtime substitutes them at startup; the resolved values are never logged. See [Environment variables](/docs/concepts/env-vars).

# Defaults inheritance (/docs/concepts/defaults-inheritance)

When several modules in a pipeline need the same timeout, retry, or `onError` policy, hoist them into a top-level `defaults` block. Any module that doesn't redefine the same key inherits the default.

## Top-level shape [#top-level-shape]

```yaml
defaults:
  timeoutMs: 15000
  onError: log
  retry:
    maxAttempts: 2
    delayMs: 1000
    backoffMultiplier: 2
    maxDelayMs: 10000
    retryableStatusCodes:
      - 429
      - 500
      - 503
    useRetryAfterHeader: true
```

All three keys are independently optional.

| Key | What it sets |
| --- | --- |
| `timeoutMs` | HTTP request timeout for every HTTP-shaped module. |
| `onError` | Default error policy. One of `fail`, `skip`, `log`. |
| `retry` | Default retry block (full object), see [Retry & error handling](/docs/concepts/retry-error-handling). |

## Inheritance rules [#inheritance-rules]

Each module reads its setting in this order, **first match wins**:

1. The value declared directly on the module.
2. The value declared in top-level `defaults`.
3. The module's built-in default (usually conservative: `onError: fail`, no retry).

For scalar settings such as `timeoutMs` and `onError`, a module-level value completely overrides the corresponding default. For `retry`, inheritance is field-by-field: the top-level `defaults.retry` block is used as the base, then any fields declared in the module's own `retry` block override only those fields. This lets a module tune one retry setting without repeating the whole block.

## Override example [#override-example]

```yaml
defaults:
  timeoutMs: 15000
  onError: log
  retry:
    maxAttempts: 2
    delayMs: 1000
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/accounts
  # inherits timeoutMs=15000, onError=log, retry.maxAttempts=2
filters:
  - type: mapping
    onError: skip   # override: drop the record on error
    mappings: …
output:
  type: httpRequest
  endpoint: …
  method: POST
  requestMode: batch
  retry:
    maxAttempts: 4   # raised from the default of 2
    delayMs: 1000
    backoffMultiplier: 2
```

The `mapping` filter overrides `onError` only; it does not use `timeoutMs` or `retry`. The output overrides the retry fields it declares. Any retry fields omitted from the output's `retry` block still come from `defaults.retry`.

## When to hoist vs duplicate [#when-to-hoist-vs-duplicate]

Hoist when the same value appears on **two or more** modules. Don't hoist a one-off; keep it on the module itself, where the reader will look for it.

# Dry-run mode (/docs/concepts/dry-run)

`cannectors run --dry-run` runs everything up to (but not including) the output's side effects. It's the safest way to validate a fresh pipeline against a real source before you let it write to production.

## What dry-run does [#what-dry-run-does]

1. **Validates** the YAML against the JSON Schema. Same as `cannectors validate`.
2. **Resolves env vars** the same way a normal run does.
3. **Executes the input** for real. The source API gets hit, the database query runs, the webhook listener starts.
4. **Runs every filter** for real. HTTP enrichments, SQL enrichments, script transforms all execute against real services.
5. **Prepares the output**: formats the records as they would be sent, serializes them, etc.
6. **Prints a preview**: the prepared payloads are printed to stdout instead of being sent to the destination.

Steps 3 and 4 are unchanged from a real run. Only the **output side effect** is suppressed.

## Why steps 3–4 still run [#why-steps-34-still-run]

A pipeline isn't just YAML; it depends on the actual data shape coming out of the source. Real-running the input and filters is the only way to surface "this field is missing", "the API returns 401", "the condition expression doesn't compile against this record".

If you don't want the source API hit either, point the pipeline at a fixture or a local mock. The [test lab](/docs/operations/test-lab) exists for exactly this case.

## Tuning the preview [#tuning-the-preview]

The optional top-level `dryRunOptions` block controls what gets printed:

```yaml
dryRunOptions:
  previewLimit: 5        # max number of preview entries to print
  includeHeaders: true   # include the request headers in the preview
  includeBody: true      # include the request body in the preview
```

Default is `previewLimit: 10`, both inclusions on. Lower the limit when the source returns large batches and you only need a sanity check.
## Sample output [#sample-output]

```
$ cannectors run --dry-run sync-orders.yaml
· loading pipeline sync-orders v1.0.0
✓ validated 1 input · 3 filters · 1 output
· httpPolling GET https://source.example.com/api/orders
· fetched 48 records
· mapping 48 → 48
· condition 37 kept · 11 skipped
· dry-run output preview (first 5 of 37):
POST https://destination.example.com/api/orders/import
Content-Type: application/json
Authorization: Bearer ******
[
  {"id": "ord_001", "email": "alice@example.com", "amount": 42.00},
  {"id": "ord_002", "email": "bob@example.com", "amount": 19.50},
  …
]
✓ dry-run complete — 0 destination writes
```

The body is the actual JSON that would be POSTed. Auth headers are redacted.

## When NOT to use dry-run [#when-not-to-use-dry-run]

Dry-run isn't a substitute for testing. The input and filters all run for real, which means:

* The source API sees a real request (and may rate-limit you).
* `http_call` enrichments are billed/quota-consumed.
* `script` filters run and can mutate external state if you've written them to.

For deterministic testing, use the [local test lab](/docs/operations/test-lab) which spins up WireMock + PostgreSQL containers.

# Environment variables (/docs/concepts/env-vars)

Cannectors substitutes `${VAR_NAME}` references in the YAML with the corresponding environment variable at startup. This is the **only** way to pass secrets into a pipeline: credentials never live in the YAML, and the resolved values are never logged.

## Syntax [#syntax]

```yaml
authentication:
  type: bearer
  credentials:
    token: ${SOURCE_BEARER_TOKEN}
```

The expression is `${NAME}`. Curly braces are required. The variable name must match `[A-Z_][A-Z0-9_]*` (uppercase letters, digits, underscores; can't start with a digit).

## When substitution happens [#when-substitution-happens]

Substitution runs **once, at pipeline startup**, before the runtime is built and before the first input fetch.
The resolved configuration is held in memory for the lifetime of the process.

Implication: if you rotate a secret, you have to **restart the process** for Cannectors to pick up the new value. There's no hot-reload.

## Where it works [#where-it-works]

Substitution happens on **string** values in the YAML, regardless of which module they belong to. The most common slots:

| Slot | Example |
| --- | --- |
| Bearer / Basic / OAuth2 credentials | `token: ${SRC_TOKEN}` |
| API key | `key: ${API_KEY}` |
| Database connection strings | `connectionStringRef: ${WAREHOUSE_DATABASE_URL}` |
| Webhook HMAC secrets | `secret: ${WEBHOOK_HMAC_SECRET}` |

You can use `${VAR}` anywhere a string is expected, including in endpoints if you really want to:

```yaml
endpoint: ${SOURCE_BASE_URL}/api/orders
```

## What isn't substituted [#what-isnt-substituted]

* **Numbers, booleans, arrays**: only strings get substituted. Don't write `maxAttempts: ${MAX_ATTEMPTS}`; keep numeric tuning inside the YAML.
* **Templates**: the `{{record.x}}` syntax is **not** env-var substitution. Those are record placeholders, resolved per record at runtime. See [Records](/docs/concepts/records).
* **Defaults**: if `${VAR}` is missing or empty, Cannectors fails fast at startup with a clear error. There's no implicit "default if unset" mechanism.

## Loading vars [#loading-vars]

Cannectors reads `os.Environ()` and nothing else. Use whatever pattern your shell/runtime already uses to populate env:

```bash
# direct
export SOURCE_BEARER_TOKEN=…
cannectors run pipeline.yaml

# inline (one-shot)
SOURCE_BEARER_TOKEN=… cannectors run pipeline.yaml

# via .env file (your shell, not Cannectors)
set -a; source .env; set +a
cannectors run pipeline.yaml
```

For systemd, set `Environment=` or `EnvironmentFile=` in the unit. For Kubernetes, use `envFrom: secretRef` or `env: valueFrom: secretKeyRef`.

Cannectors does **not** read `.env` files itself.
Use your shell or a loader like `direnv`, `dotenv-cli`, or your container platform's secret injection.

## What gets logged [#what-gets-logged]

The runtime logs the **structure** of the pipeline (module types, counts, schedule expression) but never the values resolved from env vars. If you see `${VAR}` in logs, it's because substitution failed and the value was never resolved.

`cannectors validate --verbose` never prints credential values either: you'll see `credentials: {token: "[redacted]"}` instead of the literal token.

# Input → Filters → Output (/docs/concepts/input-filter-output)

Every pipeline is the same three-stage shape. The input produces records; the filter chain transforms or drops them; the output consumes what survives. Filters run in **declared order**, top to bottom.

## The flow [#the-flow]

```
┌─────────┐   batch    ┌──────────┐   record    ┌──────────┐
│  INPUT  │ ─────────▶ │ FILTERS  │ ──────────▶ │  OUTPUT  │
└─────────┘    of N    └──────────┘   one by    └──────────┘
                                        one
```

1. The input fetches a **batch** of records (one HTTP response page, one SQL query result, one webhook delivery).
2. The runtime hands each record to the filter chain, **one at a time**, in the order filters are declared.
3. Each filter can transform the record, drop it, or pass it through.
4. Records that reach the end of the chain are buffered, then handed to the output.
5. The output sends records either **per-record** (`requestMode: single`) or as a **batch** (`requestMode: batch`).

## What's a "record"? [#whats-a-record]

A record is one row of data, essentially a JSON object. Inputs unwrap the source's payload format to produce a list of records:

* `httpPolling` reads an array from a JSON response (`dataField: orders` pulls `response.orders`).
* `webhook` treats the POST body as a single record, or unwraps a `dataField` if configured.
* `database` returns one record per row.
Records are passed by value between filters, so you can mutate them freely without affecting upstream stages. See [Records](/docs/concepts/records) for the access model.

## Filters in order [#filters-in-order]

The order in the YAML **is** the order of execution, and it affects both cost and results.

```yaml
filters:
  - type: mapping     # 1. flatten fields, normalize
    mappings: …
  - type: condition   # 2. drop unpaid orders
    expression: "status == 'paid'"
    else:
      - type: drop
  - type: http_call   # 3. enrich the ones that survived
    endpoint: …
```

If you swap the `condition` and `http_call` filters, you'd pay for HTTP enrichments on records you're about to drop.

## What filters can do [#what-filters-can-do]

| Action | Filters |
| --- | --- |
| Reshape fields | `mapping`, `set`, `remove` |
| Decide whether a record continues | `condition` |
| Enrich with external data | `http_call`, `soap_call`, `sql_call` |
| Run arbitrary JavaScript | `script` |
| Drop the whole record | `drop` |
| Iterate an array field on each record | `loop` |

See [Filters reference](/docs/modules/filters) for every option of every filter.

## When a filter drops a record [#when-a-filter-drops-a-record]

A dropped record is **silently removed from the chain**. Downstream filters and the output never see it. This is how a `condition` branch containing `drop` works, and it's also how a filter that errors out under `onError: skip` behaves.

The output stats will reflect the drop: the output only sees records that made it all the way through.

## Pass-through pipelines [#pass-through-pipelines]

If you want input → output with no transformation, declare `filters: []`:

```yaml
filters: []
```

Cannectors will still run validation, retry, auth, and state for you; you just get no record-level transformations.

# Pipelines (/docs/concepts/pipelines)

A Cannectors pipeline is a YAML file with four required top-level keys.
JSON files are accepted too, because any valid JSON document is also valid YAML 1.2, but maintained examples and this documentation use YAML throughout.

## Required keys [#required-keys]

```yaml
name: sync-orders
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
filters: []
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
```

| Key | Purpose |
| --- | --- |
| `name` | Pipeline identifier shown in logs and execution results. |
| `input` | Source module configuration. See [inputs](/docs/modules/inputs). |
| `filters` | Ordered list of transformations. Use `[]` for a pass-through chain. |
| `output` | Destination module configuration. See [outputs](/docs/modules/outputs). |

`filters` is required even when empty. Use `filters: []` for a pass-through pipeline; leaving the key off makes validation fail with `missing required property "filters"`.

## Optional top-level keys [#optional-top-level-keys]

```yaml
name: sync-orders
version: 1.0.0
description: Forward paid orders to the internal import API.
tags:
  - http-polling
  - http-output
defaults:
  timeoutMs: 15000
  onError: log
  retry:
    maxAttempts: 2
    delayMs: 1000
dryRunOptions:
  previewLimit: 5
input: …
filters: …
output: …
```

| Key | Purpose |
| --- | --- |
| `version` | Free-form pipeline version string. Logged on start. |
| `description` | One-line summary. Logged on start, shown in `validate --verbose`. |
| `tags` | List of free-form tags. Useful for grouping pipelines in tooling. |
| `defaults` | Cross-cutting defaults. See [Defaults inheritance](/docs/concepts/defaults-inheritance). |
| `dryRunOptions` | Tuning for `--dry-run`. See [Dry run](/docs/concepts/dry-run). |

## The pipeline lifecycle [#the-pipeline-lifecycle]

1. **Parse**: YAML is parsed into an internal config tree.
2. **Validate**: the tree is validated against the JSON Schema for every module type. This catches typos, missing required fields, invalid enum values, and shape mismatches.
3. **Resolve env vars**: every `${VAR}` reference is substituted from the process environment.
4. **Build runtime**: inputs, filters, and outputs are instantiated.
5. **Run**: the input emits batches of records; each batch flows through the filter chain in declared order; the output consumes what survives.
6. **Schedule or exit**: if the input has a `schedule`, the process stays alive and runs on CRON. Otherwise it exits with code 0.

The runtime stops on the first fatal error unless `onError: skip` or `onError: log` is set somewhere along the chain. See [Retry & error handling](/docs/concepts/retry-error-handling).

## Module shape [#module-shape]

Every input, filter, and output has a `type` field that selects the module, plus module-specific keys.

```yaml
filters:
  - type: mapping   # selects the module
    mappings:       # module-specific options
      - source: id
        target: account_id
```

The full option set for every module is documented in the [Modules reference](/docs/modules).

# Records (/docs/concepts/records)

A **record** is the unit of data that moves through a pipeline. It's a JSON object whose values can be a `string`, `number`, `boolean`, `null`, an array, or a nested object.

```json
{
  "id": "ord_1Q2W3E",
  "status": "paid",
  "customer": { "email": "Hello@Example.com", "country": "FR" },
  "items": [
    { "sku": "SKU-1", "qty": 2 },
    { "sku": "SKU-2", "qty": 1 }
  ]
}
```

There's no schema requirement. Cannectors doesn't enforce field presence or types between filters; that's your call, per pipeline.
## Where records come from [#where-records-come-from]

| Input | Record shape |
| --- | --- |
| `httpPolling` | One element of the array pulled from `dataField`. |
| `soapPolling` | One object or array element pulled from the parsed SOAP response `dataField`; received MTOM parts are exposed under `_soapAttachments`. |
| `webhook` | The POST body (or one element of `dataField` if configured). |
| `database` | One row of the query result, with column names as keys. |

## Field paths [#field-paths]

Most modules accept **dot-notated paths** to point at nested fields:

```yaml
filters:
  - type: mapping
    mappings:
      - source: customer.email   # reads customer.email
        target: email            # writes top-level email
        transforms:
          - op: lowercase
```

Paths work for reading and writing. Writing to a missing intermediate object creates it.

```yaml
- type: set
  target: metadata.source
  value: cannectors   # creates `metadata` if absent
```

Array indexing isn't supported in paths. To iterate over an array, use the `loop` filter or work on it inside a `script` filter.

## Templates [#templates]

A few modules (HTTP endpoints, body templates, SQL queries) accept `{{record.field}}` placeholders. These are substituted **once per record** before the request fires.

```yaml
output:
  type: httpRequest
  endpoint: https://api.example.com/customers/{{record.customerId}}
  method: GET
```

For `sql_call` and `database` outputs, placeholders are bound as **positional parameters** (`$1`, `$2`, …) and are never string-spliced into the SQL. That makes them safe by construction.

Never wrap a `{{record.x}}` placeholder in quotes inside a SQL query. It's already a parameter, not a string.

## Mutation between filters [#mutation-between-filters]

Each filter receives a record, can mutate it, and returns it. The next filter sees the mutated version. Inputs are not consulted again once they've handed off a batch.
This is important for filters like `http_call`, `soap_call`, and `sql_call` with `mergeStrategy: merge`: they overlay the fetched fields onto the current record before passing it down the chain.

# Retry & error handling (/docs/concepts/retry-error-handling)

Cannectors gives you two knobs for failures: **retry** (try the same thing again with backoff) and **`onError`** (what to do when retries are exhausted).

## `onError` [#onerror]

Every module accepts an `onError` key. It picks what happens when the module fails after retries.

| Value | Behavior |
| --- | --- |
| `fail` (default) | Stop the pipeline. Exit with code 3. |
| `skip` | Drop the affected record and continue. |
| `log` | Log the error at WARN level and continue. The record passes through unchanged. |

```yaml
filters:
  - type: mapping
    onError: skip   # drop records whose mapping fails
    mappings: …
  - type: http_call
    onError: log    # keep going, log warnings on failures
    endpoint: …
```

For inputs and outputs, `onError: skip` is rarely useful: there's no record to skip yet for an input, and the output is the last stage anyway. `fail` and `log` are the common choices there.

## Retry block [#retry-block]

For HTTP-shaped and SOAP-shaped modules (`httpPolling`, `http_call`, `httpRequest`, `soapPolling`, `soap_call`, `soapRequest`), the `retry` block defines how the module retries before giving up and applying `onError`.

```yaml
retry:
  maxAttempts: 3         # total attempts, including the first
  delayMs: 500           # initial backoff
  backoffMultiplier: 2   # exponential factor
  maxDelayMs: 5000       # cap on the backoff
  retryableStatusCodes:
    - 429
    - 500
    - 502
    - 503
    - 504
  useRetryAfterHeader: true
```

### Fields [#fields]

| Field | Default | Meaning |
| --- | --- | --- |
| `maxAttempts` | `1` | Total attempts. `1` means "no retry". |
| `delayMs` | `1000` | Backoff before the first retry. |
| `backoffMultiplier` | `2` | Multiplied into the delay for each subsequent retry. |
| `maxDelayMs` | `10000` | Upper bound on the computed delay. |
| `retryableStatusCodes` | `[429, 500, 502, 503, 504]` | HTTP statuses considered retryable. |
| `useRetryAfterHeader` | `true` | If the server sends `Retry-After`, honor it. |

### What gets retried [#what-gets-retried]

* **HTTP status codes** in `retryableStatusCodes`.
* **Transport errors** (TCP refused, DNS, TLS handshake failures, read timeouts).
* **`429 Too Many Requests`** with a `Retry-After` header: Cannectors sleeps for the indicated duration before retrying, ignoring the computed backoff.

Everything else fails immediately, no retry.

## Defaults inheritance [#defaults-inheritance]

Drop a `retry` block into top-level `defaults` and every HTTP-shaped module inherits it. Module-level retry blocks merge field-by-field with the default: declared module fields override the default, while omitted fields keep the inherited value. See [Defaults inheritance](/docs/concepts/defaults-inheritance) for the exact rules.

```yaml
defaults:
  onError: log
  retry:
    maxAttempts: 3
    delayMs: 500
    backoffMultiplier: 2
    retryableStatusCodes: [429, 500, 502, 503, 504]
```

## What "fail" actually does [#what-fail-actually-does]

When a module fails with `onError: fail` (or is left as default and fails), Cannectors:

1. Logs the error at ERROR level with context (pipeline, module type, record index, root cause).
2. Stops the pipeline immediately. No further records are fetched or processed.
3. Exits the process with code **3** (`runtime errors`).

For scheduled pipelines, the process exits and your supervisor decides whether to restart.

## Exit codes [#exit-codes]

| Code | Meaning |
| --- | --- |
| `0` | Success |
| `1` | Validation errors |
| `2` | Parse errors |
| `3` | Runtime errors |

`validate` exits with 0, 1, or 2. `run` can exit with any of those, plus 3.
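
The retry fields described earlier on this page combine into a capped exponential schedule. The sketch below works through the arithmetic under stated assumptions: the wait before the n-th retry is taken to be `delayMs` multiplied by `backoffMultiplier` once per previous retry, capped at `maxDelayMs`. The function name `backoff_schedule` is hypothetical, only integer multipliers are handled, and any jitter or `Retry-After` override the runtime may apply is ignored.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the wait before each retry for a given retry block.
# Arguments: maxAttempts delayMs backoffMultiplier maxDelayMs
backoff_schedule() {
  max_attempts=$1
  delay=$2
  multiplier=$3
  max_delay=$4
  retry=1
  # maxAttempts counts the first attempt, so there are maxAttempts - 1 retries.
  while [ "$retry" -lt "$max_attempts" ]; do
    if [ "$delay" -gt "$max_delay" ]; then
      delay=$max_delay
    fi
    echo "retry $retry after ${delay}ms"
    delay=$((delay * multiplier))
    retry=$((retry + 1))
  done
}

# The retry block shown in "Retry block": 3 attempts, 500 ms, x2, capped at 5000 ms.
backoff_schedule 3 500 2 5000
```

With `maxAttempts: 1` the loop body never runs, which matches the table's note that `1` means "no retry".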
# Scheduling (/docs/concepts/scheduling)

Cannectors has two execution modes, picked by whether the input has a `schedule` field.

## Run once [#run-once]

Without a `schedule`, `cannectors run` fires the pipeline exactly once and exits with code 0 on success, or a non-zero code on failure.

```yaml
input:
  type: httpPolling
  endpoint: https://source.example.com/api/orders
  dataField: orders
  # no schedule → run once
```

Useful for one-shot syncs, backfills, or CI-driven jobs where an external scheduler (GitHub Actions, Airflow, Argo Workflows) handles the cadence.

## Run on a schedule [#run-on-a-schedule]

With a `schedule`, `cannectors run` stays alive and triggers the pipeline on every CRON match.

```yaml
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
```

The CLI process blocks until interrupted with `SIGINT` (Ctrl-C) or `SIGTERM` (your container runtime stopping the container). Cannectors finishes the current run before shutting down.

## CRON format [#cron-format]

Cannectors uses the standard **5-field CRON** format:

```
┌───────────── minute (0–59)
│ ┌───────────── hour (0–23)
│ │ ┌───────────── day of month (1–31)
│ │ │ ┌───────────── month (1–12)
│ │ │ │ ┌───────────── day of week (0–6, Sunday = 0)
│ │ │ │ │
* * * * *
```

| Expression | Meaning |
| --- | --- |
| `*/15 * * * *` | Every 15 minutes |
| `0 * * * *` | Every hour, on the hour |
| `0 */6 * * *` | Every 6 hours, on the hour |
| `0 9 * * 1-5` | 9 AM on weekdays |
| `0 0 1 * *` | First day of each month, midnight |

Schedule strings must be at least 9 characters long (this is the only length validation). The CRON parser is the one from [`github.com/robfig/cron`](https://pkg.go.dev/github.com/robfig/cron), which is what most Go schedulers use.
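
A quick shape check can catch a malformed schedule before the pipeline is even validated. This is a rough sketch, not the runtime's parser: `check_schedule` is a hypothetical helper that only confirms the two documented properties (at least 9 characters, exactly 5 whitespace-separated fields) and does not verify that each field holds a legal value.

```bash
#!/usr/bin/env bash
# Hypothetical helper: succeed when the string has the documented 5-field shape.
check_schedule() {
  expr=$1
  # Documented minimum length is 9 characters (for example "0 * * * *").
  [ "${#expr}" -ge 9 ] || return 1
  set -f         # keep '*' from being glob-expanded while splitting
  set -- $expr   # split on whitespace into positional parameters
  set +f
  [ "$#" -eq 5 ]
}

if check_schedule "*/15 * * * *"; then
  echo "schedule has 5 fields"
fi
```

`cannectors validate` remains the authoritative check; the helper is only useful as a cheap early guard in scripts that generate pipeline files.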
## Inputs that accept `schedule` [#inputs-that-accept-schedule] | Input | `schedule` | | ------------- | ----------------------------------------- | | `httpPolling` | yes | | `soapPolling` | yes | | `database` | yes | | `webhook` | no — it's a long-running listener instead | A webhook input doesn't schedule; it starts an HTTP server and waits for callers. The process stays alive the same way a scheduled pipeline does. ## Overlap behavior [#overlap-behavior] If a scheduled run is still in flight when the next tick fires, Cannectors **skips the next tick** and logs a warning. Runs do not overlap. Pick a schedule comfortably longer than your worst-case run time, or shrink the work each run does (smaller pages, cursors). ## Production deployment [#production-deployment] For scheduled pipelines, run the binary under a process supervisor: * `systemd` — see [Operations · Scheduling in production](/docs/operations/scheduling-in-production) * Kubernetes Deployment / StatefulSet (one replica) * Container platforms (ECS, Cloud Run, Fly.io) The runtime is single-process and single-replica by design — multiple replicas would all fire on the same CRON tick and double-process records. ## Cross-references [#cross-references] # SOAP (/docs/concepts/soap) Cannectors supports SOAP without WSDL code generation. You provide the raw XML fragment that belongs inside ``, then the runtime builds the envelope, applies templating, sends the request, and parses the XML response back into records. Use the SOAP modules when an integration exposes SOAP operations but you still want the same pipeline model as HTTP modules: polling, per-record enrichment, or output delivery. ## Runtime model [#runtime-model] There is no WSDL parsing at runtime. The WSDL remains a design-time artifact you can inspect with a vendor document, SoapUI, or Postman. 
In the pipeline, configure: ```yaml endpoint: https://soap.example.com/orders soapVersion: "1.1" soapAction: urn:SubmitOrder operation: SubmitOrder body: | {{record.orderId}} ``` `operation` is a logical name for logs and observability. The actual SOAP payload is the XML in `body`, plus `soapAction` when the service requires it. ## SOAP 1.1 vs 1.2 [#soap-11-vs-12] Set `soapVersion` to `"1.1"` or `"1.2"`. The default is `"1.1"`. | Setting | SOAP 1.1 | SOAP 1.2 | | ------------------ | ------------------------------------------- | --------------------------------------------------- | | Envelope namespace | `http://schemas.xmlsoap.org/soap/envelope/` | `http://www.w3.org/2003/05/soap-envelope` | | HTTP content type | `text/xml; charset=utf-8` | `application/soap+xml; charset=utf-8; action="..."` | | SOAP action | `SOAPAction` header | `action` parameter on `Content-Type` | | Fault shape | `faultcode`, `faultstring`, `detail` | `Code`, `Reason`, `Detail` | Do not set your own `Content-Type` or `SOAPAction` in `httpHeaders`. The SOAP client owns those headers so the binding stays consistent with `soapVersion`. ## XML templating [#xml-templating] SOAP `body`, SOAP header fragments, `endpoint`, HTTP headers, MTOM `contentId`, and WS-Security credentials can be templated with record values. Text inserted into XML is escaped at runtime, so a value containing `<`, `&`, or `` remains text rather than becoming markup. ```yaml body: | {{record.customerId}} ``` The template engine also supports defaults: ```yaml {{record.pagination.cursor | default: ""}} ``` If you need XML elements or attributes as markup, write them directly in the template. Record substitutions are for values. ## Response paths [#response-paths] SOAP responses are parsed as XML maps. `dataField` selects the sub-tree that becomes records or the value to merge: ```yaml dataField: Envelope.Body.ListOrdersResponse.Orders.Order ``` Namespaces are decoded according to the XML parser's map output. 
In most raw responses the path starts with `Envelope.Body...`; when an upstream preserves namespace prefixes in element names, include those names exactly as they appear in the parsed response. If `dataField` is omitted on `soap_call`, the whole parsed SOAP response is merged, replaced, or appended. On `soapPolling`, `dataField` should point at the array or object that represents emitted records. ## MTOM send [#mtom-send] Outgoing MTOM uses explicit XOP references. The body must contain the `xop:Include` element, and the `contentId` in `mtom.attachments` must resolve to the same value. ```yaml body: | {{record.documentId}} mtom: enabled: true attachments: - contentId: "{{record.documentId}}" contentType: application/pdf sourceField: documentBase64 encoding: base64 ``` `encoding: base64` decodes a string field before sending the MIME part. The default, `binary`, sends string bytes as-is and passes byte arrays through. Keep the `cid:` in the XML and the attachment `contentId` in sync. If one changes without the other, the receiver will see an XOP reference with no matching MIME part. ## MTOM receive [#mtom-receive] When a SOAP response is `multipart/related`, Cannectors parses the root XML part and exposes MIME attachments on each emitted record under `_soapAttachments`: ```yaml _soapAttachments: file-1: contentId: file-1 contentType: application/pdf data: ``` Use a mapping or script filter after `soapPolling` if you want to copy an attachment into a domain-specific field before sending it onward. ## WS-Security UsernameToken [#ws-security-usernametoken] `wsSecurity` supports UsernameToken only: `PasswordText` and `PasswordDigest`. It does not implement X.509 certificates, XML signing, or XML encryption. ```yaml wsSecurity: username: soap-user password: ${SOAP_PASSWORD} passwordType: PasswordDigest mustUnderstand: true ``` Prefer environment variables or your deployment secret mechanism for passwords. 
Use `PasswordDigest` when the SOAP service accepts it; use `PasswordText` only when the service requires plain-text UsernameToken passwords. ## Faults and retries [#faults-and-retries] SOAP faults are surfaced as typed SOAP errors before the module decides whether the pipeline failed, skipped, or logged the record. Retry still follows the module `retry` block and the HTTP status code returned by the server. ```yaml retry: maxAttempts: 2 retryableStatusCodes: [500, 502, 503, 504] ``` For SOAP outputs, `success.statusCodes` is evaluated after the SOAP client has accepted the HTTP response. A non-2xx status with no SOAP fault is treated as an HTTP error, so custom SOAP success codes are mainly useful for distinguishing successful 2xx responses. ## Known limits [#known-limits] * No WSDL parsing, code generation, or SOAP/XSD schema validation. * No WS-Security signing, encryption, or X.509 support. * Unknown fields in SOAP module YAML follow the current project-wide schema behavior and may not be rejected in every nested position. * Received MTOM attachments use the `_soapAttachments` convention until a later version adds a configurable attachment destination. # State persistence (/docs/concepts/state-persistence) When a scheduled pipeline restarts — because of a deploy, an OOM, or your process supervisor — you usually don't want to refetch everything from scratch. **State persistence** keeps a small cursor file on disk that the input reads at boot and writes after every successful batch. ## The three persistable cursors [#the-three-persistable-cursors] | Type | What's tracked | Typical query parameter | | ----------- | ---------------------------------------- | ------------------------ | | `id` | The largest record ID seen so far | `after_id`, `since_id` | | `timestamp` | The largest record timestamp seen so far | `updated_after`, `since` | | `offset` | Pagination offset | `offset` | Most inputs support a mix. 
You pick the ones that match the source API's pagination semantics. ## httpPolling example [#httppolling-example] ```yaml input: type: httpPolling schedule: "*/10 * * * *" endpoint: https://source.example.com/api/events dataField: events statePersistence: timestamp: enabled: true field: event.timestamp # which record field is the timestamp queryParam: updated_after # how the source API expects it id: enabled: true field: event.id # which record field is the ID queryParam: after_id storagePath: ./.cannectors-state ``` | Field | Meaning | | ------------- | ------------------------------------------------------------- | | `enabled` | Master switch for this cursor type. | | `field` | Dot-notated path to the field on the record. | | `queryParam` | The query parameter name the source API expects. | | `storagePath` | Directory where state files live. Shared across cursor types. | After each successful batch, Cannectors writes the max-seen values into `./.cannectors-state/.json`. On the next run, it reads that file and appends the relevant query parameters to the request. ## database example [#database-example] For `database` inputs, the cursor is a SQL column instead of a query parameter: ```yaml input: type: database connectionStringRef: ${SOURCE_DATABASE_URL} query: | SELECT id, updated_at, payload FROM events WHERE updated_at > $1 ORDER BY updated_at ASC LIMIT 500 statePersistence: timestamp: enabled: true field: updated_at storagePath: ./.cannectors-state ``` The placeholder `$1` is bound to the persisted timestamp at runtime. The first run, when no state exists yet, gets `NULL` — handle that in your SQL (`COALESCE($1::timestamptz, '-infinity'::timestamptz)` is a common pattern). 
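To make the "max-seen value" idea concrete, here is a minimal Go sketch of how a timestamp cursor could be derived from a batch using a dot-notated `field` path. It is an illustration, not Cannectors' implementation. The names `lookup` and `maxTimestamp` are hypothetical, and it assumes timestamps are RFC 3339 strings.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// lookup resolves a dot-notated path such as "event.timestamp" inside a record.
func lookup(record map[string]any, path string) (any, bool) {
	var current any = record
	for _, key := range strings.Split(path, ".") {
		m, ok := current.(map[string]any)
		if !ok {
			return nil, false
		}
		current, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return current, true
}

// maxTimestamp returns the latest RFC 3339 timestamp found at path across the
// batch. Records whose field is missing, not a string, or unparseable are ignored.
func maxTimestamp(batch []map[string]any, path string) (time.Time, bool) {
	var latest time.Time
	found := false
	for _, record := range batch {
		raw, ok := lookup(record, path)
		if !ok {
			continue
		}
		text, ok := raw.(string)
		if !ok {
			continue
		}
		ts, err := time.Parse(time.RFC3339, text)
		if err != nil {
			continue
		}
		if !found || ts.After(latest) {
			latest, found = ts, true
		}
	}
	return latest, found
}

func main() {
	batch := []map[string]any{
		{"event": map[string]any{"id": 1, "timestamp": "2026-01-01T00:00:00Z"}},
		{"event": map[string]any{"id": 2, "timestamp": "2026-01-03T12:30:00Z"}},
		{"event": map[string]any{"id": 3}}, // no timestamp: skipped
	}
	if latest, ok := maxTimestamp(batch, "event.timestamp"); ok {
		fmt.Println("next cursor:", latest.Format(time.RFC3339))
	}
}
```

Returning a `found` flag keeps the "no state yet" case explicit, which corresponds to the first-run situation described above.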
## Where to put `storagePath` [#where-to-put-storagepath] | Environment | Recommended path | | ----------------------------- | ------------------------------------------------------------------------ | | Local dev | `./.cannectors-state` (`.gitignore` it) | | Single host (systemd) | `/var/lib/cannectors/state` | | Kubernetes | A `PersistentVolumeClaim` mounted at e.g. `/state` | | Container with ephemeral disk | Mount an attached block volume; **never** trust the container filesystem | If the storage disappears between runs, the pipeline restarts from scratch — possibly re-processing already-shipped records. Before v22.6, the `storagePath` was only read by the input. The executor would save to a different default path, so cursors never made the round-trip cleanly. From v22.6 onwards, the executor shares the input's storage. If you ever see `state file written but never read` behaviour on an old version, upgrade. ## Backfills [#backfills] To force a fresh full sync, delete the relevant state file: ```bash rm ./.cannectors-state/.json ``` The next run will treat itself as a fresh start. There is no "backfill from date X" knob in the YAML — instead, write to the state file manually if you really need to seed it: ```bash echo '{"timestamp": "2026-01-01T00:00:00Z"}' \ > ./.cannectors-state/.json ``` ## Cross-references [#cross-references] # Architecture (/docs/developer/architecture) Cannectors is a single Go process organized as four cooperating layers. Records move through them in one direction. 
## The four layers [#the-four-layers]

```
┌─────────────────────────────────────────────────┐
│  cmd/cannectors     CLI entry point             │
│      ↓ parses flags, picks a subcommand         │
├─────────────────────────────────────────────────┤
│  internal/config    YAML → pipeline tree        │
│      ↓ validates against JSON Schema            │
│      ↓ resolves ${ENV} references               │
├─────────────────────────────────────────────────┤
│  internal/runtime   builds modules, scheduler   │
│      ↓ wires input → filters → output           │
├─────────────────────────────────────────────────┤
│  internal/modules   inputs · filters · outputs  │
│      ↓ each module is one Go package            │
└─────────────────────────────────────────────────┘
```

Each layer depends only on the layer below. The runtime never reaches into `cmd/`, and modules never reach into `runtime/`.

## `cmd/cannectors` [#cmdcannectors]

The CLI. Parses subcommand + flags, opens the YAML, hands off to `internal/config`. Tiny: under 200 lines.

## `internal/config` [#internalconfig]

Owns the **pipeline tree**, the in-memory representation between "YAML on disk" and "running modules". Responsibilities:

* Parse YAML / JSON into a typed config struct.
* Validate against the JSON Schemas under `internal/config/schema/*.json`.
* Substitute `${ENV}` references.
* Apply top-level `defaults` to module-level configs.
* Surface validation errors with line + column.

The schemas live here, not in the modules, because the validator needs to type-check before any module code runs.

## `internal/runtime` [#internalruntime]

Turns the validated config into running modules and wires them together. Responsibilities:

* Instantiate one input, N filters, one output from their config.
* Build the scheduler if the input has a `schedule`.
* Build the per-batch executor that feeds records through the filter chain.
* Manage the `StateStore` shared between input and executor.
* Handle process signals (SIGINT/SIGTERM) → graceful drain.

The runtime is what gives "input → filters → output" its strong single-direction guarantee.
Modules don't know about each other; the runtime is the only place that holds the whole graph. ## `internal/modules/{input,filter,output}/` [#internalmodulesinputfilteroutput] The leaf layer. One Go package per module type. Each module implements a small interface (`Fetch` / `Process` / `Send` depending on the slot) plus a `Config` struct that the validator can populate. Module packages own: * Their own retry, auth, cache, and template logic — these are package-private helpers. * Tests against the real protocol (HTTP servers, sqlmock for SQL). * Their slice of the JSON Schema is what we vendor into this site. See [Module extensibility](/docs/developer/module-extensibility) for the contract a new module needs to satisfy. ## Where to start reading [#where-to-start-reading] | You want to … | Read | | --------------------------- | ---------------------------------------------------------------- | | Add a CLI flag | `cmd/cannectors/main.go` | | Tighten a schema | `internal/config/schema/*.json` | | Change scheduling behaviour | `internal/runtime/scheduler.go` | | Add a new transform | `internal/modules/filter/mapping_transforms.go` | | Add a new input type | `internal/modules/input/` — copy `http_polling.go` as a template | ## See also [#see-also] # Contributing (/docs/developer/contributing) Cannectors welcomes contributions. The codebase is small, the tests are quick, and the bar for code review is "is this consistent with the surrounding code". 
## Local dev setup [#local-dev-setup] ```bash git clone https://github.com/alexandrecano/cannectors.git cd cannectors go mod download make build # builds bin/cannectors make test # full test suite (~30s) make lint # golangci-lint ``` Requirements: * **Go 1.25+** (see `go.mod`) * **golangci-lint v2.7.1+** for `make lint` * **Docker** for the local test lab (`make test-lab-up`) * **`make`** for the canonical commands; the Makefile is short and readable if you'd rather invoke `go` directly ## What to run before opening a PR [#what-to-run-before-opening-a-pr] ```bash make test # everything must pass make lint # zero issues make validate-examples # the 33 maintained examples still validate ``` Three quick gates that match what CI runs. ## Conventions [#conventions] ### Package layout [#package-layout] Module packages live one-per-type under `internal/modules/{input,filter,output}/`. Don't introduce sub-packages inside a module unless the module is genuinely large. ### Tests [#tests] * Co-locate test files with source: `foo.go` + `foo_test.go`. * Use `t.Helper()` in test helpers so failure messages point at the call site. * Integration tests against external services (HTTP, SQL) use real containers in the test lab or in-memory test servers — not mocks. Mocks drift; real servers don't. * Prefer table-driven tests for cases that share scaffolding. ### Errors [#errors] * Use `errors.Join` for multi-error returns (validation, batch output). The runtime aggregates them for the user. * Wrap with `fmt.Errorf("...: %w", err)` to preserve causality. * Don't return raw third-party errors at module boundaries — they leak implementation details. ### Logging [#logging] * Use `log/slog` with the standard fields (`pipeline`, `module`, `msg`). The output goes to stdout as JSON. * Don't log credentials. Don't log entire records at `info` level — use `debug` if you must. ## Commit messages [#commit-messages] One-line summary, optional body. Reference issues by number. 
Examples that match the existing log: * `feat(input): add kafkaConsumer input module` * `fix(http_call): honor Retry-After header in 429 responses` * `docs: clarify state-persistence ordering across cursors` * `chore: bump golangci-lint to v2.8.0` Don't rebase or amend after a review starts — push fix-up commits and squash on merge. ## PR checklist [#pr-checklist] Before clicking "Ready for review": * [ ] `make test` is green. * [ ] `make lint` reports zero issues. * [ ] Any new module is documented under `cannectors-doc/content/docs/modules/...` and rendered options match the schema. * [ ] A test was added or updated. * [ ] The PR description names the user-facing change (feature, bug fix, refactor). ## Reporting bugs / proposing changes [#reporting-bugs--proposing-changes] Open an issue on [github.com/alexandrecano/cannectors/issues](https://github.com/alexandrecano/cannectors/issues). For non-trivial proposals (new module, schema change), open the issue first to align on the design before opening the PR. ## License [#license] Apache 2.0. Contributions are licensed under the same terms by opening the PR. # Execution flow (/docs/developer/execution-flow) This is the exact sequence the runtime walks every time `run` fires. Useful to read alongside the source — most of it lives under `internal/runtime/`. ## Steps [#steps] ``` 1. parse YAML → config tree 2. validate → errors or OK 3. resolve env vars → concrete config 4. build runtime → input, filters[], output, scheduler 5. tick → scheduler fires (CRON or one-shot) 6. fetch → input emits a batch of records 7. for each record: 7a. for each filter in order: → mutate / drop / branch 7b. queue to output 8. flush → output sends (batch) or per-record (single) 9. persist state → cursors written 10. log result → summary on stdout 11. loop or exit → scheduled? back to step 5. else exit 0. ``` ## Step-by-step [#step-by-step] ### 1. 
Parse [#1-parse] `internal/config/parser.go` opens the file, normalizes whitespace, and converts to YAML AST. JSON works too — `gopkg.in/yaml.v3` parses both. Output is a Go struct tree. ### 2. Validate [#2-validate] `internal/config/validator.go` runs the parsed tree through the JSON Schema. Validation is recursive — each module type's schema is walked, with `allOf` and `oneOf` resolved to pick the right shape. Errors include line + column from the original YAML so the user sees useful messages. ### 3. Resolve env vars [#3-resolve-env-vars] `internal/config/env.go` walks the tree, substituting every `${VAR_NAME}` with the corresponding value from `os.Environ()`. Substitution happens **once at startup**, so a rotated secret needs a process restart. If a `${VAR}` doesn't resolve, the runtime exits before step 4 with a clear error. ### 4. Build runtime [#4-build-runtime] `internal/runtime/builder.go` walks the validated config and instantiates one input, the filter chain, and one output by calling each module's `New(config)` constructor. The constructor returns a typed module value (e.g. `*HTTPPollingInput`). The builder also wires the shared `StateStore` and the scheduler. ### 5. Tick [#5-tick] If the input has a `schedule`, the scheduler runs the pipeline on every CRON match. Without a schedule, it runs exactly once. If the runtime is still processing when the next tick fires, the runtime **skips** that tick and logs a warning. Runs never overlap. ### 6. Fetch [#6-fetch] The input's `Fetch(ctx)` method runs. Implementations: * `httpPolling`: builds the URL (incl. pagination + state query params), sends GET, parses the JSON, returns `dataField` as the records. * `webhook`: returns the current delivery (one per HTTP POST). * `database`: opens a connection, runs the query, scans rows. Each `Fetch` returns `[]Record` plus an opaque state delta that the runtime will persist later. ### 7. 
Filter chain [#7-filter-chain] For each record, the executor calls each filter's `Process(ctx, record)` in declared order. Filters return: * A modified record (continue down the chain), * `nil` (drop the record), * An error (handled by `onError`). The chain stops at the first drop. A record only reaches the output if **every** filter passed it through. ### 8. Flush [#8-flush] The output's `Send(ctx, records)` runs. In `batch` mode, this is one call with all records that survived. In `single` mode, it's one call per record. Retries happen inside `Send`. The runtime above only sees success or the post-retry error. ### 9. Persist state [#9-persist-state] After a successful batch, the runtime asks the input for its state delta and writes it to `/.json`. The write is atomic (tempfile + rename) — a crash mid-write leaves the previous state intact. ### 10. Log [#10-log] A summary line goes to stdout: ```json {"time":"…","level":"info","pipeline":"sync-orders","msg":"complete","records":93,"duration_ms":1840} ``` ### 11. Loop or exit [#11-loop-or-exit] If the input has a schedule and the process hasn't received a stop signal, the runtime sleeps until the next tick. Otherwise it exits 0. ## Graceful shutdown [#graceful-shutdown] SIGINT / SIGTERM cancels the context passed through every layer. The runtime then: 1. Stops accepting new ticks (scheduled mode) or new webhook deliveries (listener mode). 2. Finishes the current batch — `Fetch` and `Process` calls already in flight see a cancelled context and decide whether to abort cleanly or finish what they started. 3. Flushes the output for completed records. 4. Persists state. 5. Closes connections (DB pools, HTTP transports). 6. Exits 0. `TimeoutStopSec=30s` (systemd) or `terminationGracePeriodSeconds: 60` (Kubernetes) gives the runtime enough time for steps 2–5. 
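The "stop accepting new ticks, finish the current batch" behaviour can be sketched in Go with a cancellable context. This is a simplified illustration under stated assumptions, not the runtime's actual code. `runLoop` and `simulateShutdown` are hypothetical names, and a buffered channel stands in for the scheduler.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// runLoop executes runOnce for every tick until the context is cancelled or the
// tick channel is closed. Cancellation is checked between runs, so a run that
// has already started is allowed to finish. It returns the number of completed runs.
func runLoop(ctx context.Context, ticks <-chan struct{}, runOnce func()) int {
	completed := 0
	for {
		if ctx.Err() != nil {
			return completed // stop signal received: accept no further ticks
		}
		select {
		case <-ctx.Done():
			return completed
		case _, ok := <-ticks:
			if !ok {
				return completed // no more ticks (one-shot or demo finished)
			}
			runOnce()
			completed++
		}
	}
}

// simulateShutdown queues `queued` ticks and cancels the context while run
// number cancelDuring is in flight, then reports how many runs completed.
// Passing cancelDuring = 0 never cancels.
func simulateShutdown(queued, cancelDuring int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ticks := make(chan struct{}, queued)
	for i := 0; i < queued; i++ {
		ticks <- struct{}{}
	}
	close(ticks)
	started := 0
	return runLoop(ctx, ticks, func() {
		started++
		if started == cancelDuring {
			cancel() // stands in for SIGINT / SIGTERM arriving mid-run
		}
	})
}

func main() {
	// In a real process the context would be tied to SIGINT and SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	ticks := make(chan struct{}, 3)
	for i := 0; i < 3; i++ {
		ticks <- struct{}{}
	}
	close(ticks)

	runs := runLoop(ctx, ticks, func() { fmt.Println("running one batch") })
	fmt.Println("completed runs:", runs)
	fmt.Println("simulated shutdown during run 2 of 5:", simulateShutdown(5, 2))
}
```

Checking `ctx.Err()` at the top of the loop makes the outcome deterministic when a tick and a cancellation are both pending: the in-flight run completes and no later tick starts.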
## See also [#see-also] # Developer (/docs/developer) This section is for developers who want to **extend** Cannectors — add a new input/filter/output module, contribute a transform, or just understand why the runtime does what it does. Cannectors is open source under [Apache 2.0](https://github.com/alexandrecano/cannectors/blob/main/LICENSE). The codebase is small and well-organized — about 30k lines of Go, mostly under `internal/`. Most contributions land in a single package. # Module extensibility (/docs/developer/module-extensibility) Adding a module is a four-step process: implement the interface, declare the JSON Schema, register the module type, ship tests. ## The interfaces [#the-interfaces] Each module group has its own interface in `internal/modules/`. ### Input [#input] ```go type Input interface { // Fetch reads the next batch. For scheduled inputs, called on each // CRON tick. For one-shot inputs, called exactly once. Fetch(ctx context.Context) ([]Record, StateDelta, error) // Schedule returns the CRON expression, or "" for one-shot inputs. Schedule() string } ``` Webhook-style inputs use a slightly different callback-based shape — see `webhook.go` for the pattern. ### Filter [#filter] ```go type Filter interface { // Process transforms or drops a single record. Returning // (nil, nil) drops the record from the chain. Process(ctx context.Context, r Record) (Record, error) } ``` ### Output [#output] ```go type Output interface { // Send delivers a batch (requestMode: batch) or a single record // wrapped in a one-element slice (requestMode: single). Send(ctx context.Context, batch []Record) error // Preview renders what would be sent without actually sending. // Called only in --dry-run mode. Preview(ctx context.Context, batch []Record) (string, error) } ``` ## Walkthrough — a new input module [#walkthrough--a-new-input-module] Let's add a hypothetical `kafkaConsumer` input. ### 1. 
Create the package [#1-create-the-package] ``` internal/modules/input/kafka_consumer.go internal/modules/input/kafka_consumer_test.go ``` ### 2. Define the config struct [#2-define-the-config-struct] ```go type kafkaConsumerConfig struct { Brokers []string `yaml:"brokers"` Topic string `yaml:"topic"` GroupID string `yaml:"groupId"` // … } ``` ### 3. Implement `Input` [#3-implement-input] ```go type kafkaConsumer struct { cfg kafkaConsumerConfig client *kafka.Reader } func newKafkaConsumer(cfg kafkaConsumerConfig) (*kafkaConsumer, error) { // construct the kafka reader return &kafkaConsumer{cfg: cfg, client: client}, nil } func (k *kafkaConsumer) Fetch(ctx context.Context) ([]Record, StateDelta, error) { // poll until ctx is done or a batch is full // return records + offset delta } func (k *kafkaConsumer) Schedule() string { return "" // long-running, not CRON-driven } ``` ### 4. Register the type [#4-register-the-type] In `internal/modules/input/input.go`: ```go var registry = map[string]func(cfg map[string]any) (Input, error){ "httpPolling": newHTTPPolling, "webhook": newWebhook, "database": newDatabase, "kafkaConsumer": newKafkaConsumer, // ← here } ``` ### 5. Declare the schema [#5-declare-the-schema] Add to `internal/config/schema/input-schema.json`: ```json { "$defs": { "kafkaConsumerInputConfig": { "type": "object", "required": ["brokers", "topic", "groupId"], "properties": { "brokers": { "type": "array", "items": { "type": "string" }, "minItems": 1 }, "topic": { "type": "string", "minLength": 1 }, "groupId": { "type": "string", "minLength": 1 } } }, "inputModule": { "allOf": [ { "$ref": "common-schema.json#/$defs/moduleBase" }, { "oneOf": [ ...existing branches..., { "allOf": [ { "$ref": "#/$defs/kafkaConsumerInputConfig" }, { "properties": { "type": { "const": "kafkaConsumer" } } } ] } ] } ] } } } ``` ### 6. Tests [#6-tests] At minimum: * A contract test that the new module satisfies `Input`. 
* A schema test confirming a valid YAML parses and an invalid one fails. * An integration test using a real Kafka container (or a Sarama mock). The existing modules' `*_test.go` files are the templates to follow. ## Documentation [#documentation] Once the module is in `main`: 1. `pnpm sync-schemas ../cannectors` in the docs repo. 2. Add a `content/docs/modules/inputs/kafka-consumer/index.mdx` with a minimal example + ``. 3. `pnpm generate-subpages` to create sub-pages for any complex options. The options table renders itself from the schema you just added, with zero manual table-writing. ## Conventions [#conventions] * Module type names are **camelCase** for input/output families (`httpPolling`, `httpRequest`, `soapPolling`, `soapRequest`) and **snake\_case** for per-record call filters (`http_call`, `soap_call`, `sql_call`). Pick one consistent with the existing peers. * Config field names are `camelCase` in YAML, mapped via `yaml:"…"` tags. * Module package names match the type and use underscores; dashes are not used in package names. ## See also [#see-also] # JSON Schemas (/docs/developer/schemas) Cannectors uses JSON Schema Draft 2020-12 to validate pipelines. The schemas live under [`internal/config/schema/`](https://github.com/alexandrecano/cannectors/tree/main/internal/config/schema) in the main repository and are the single source of truth for what a pipeline can declare. ## The six files [#the-six-files] | File | Purpose | | ---------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------- | | `pipeline-schema.json` | Top-level shape: `name`, `version`, `input`, `filters`, `output`, `defaults`. | | `common-schema.json` | Cross-cutting types: `httpRequestBase`, `soapRequestBase`, `retryConfig`, `pagination`, `databaseConnectionConfig`, `statePersistenceConfig`. | | `input-schema.json` | The input variants (`httpPolling`, `soapPolling`, `webhook`, `database`).
| | `filter-schema.json` | The filter variants (`mapping`, `condition`, `script`, `http_call`, `soap_call`, `sql_call`, `set`, `remove`, `drop`). | | `output-schema.json` | The output variants (`httpRequest`, `soapRequest`, `database`). | | `auth-schema.json` | The four authentication variants (`api-key`, `bearer`, `basic`, `oauth2`). | Files reference each other via `$ref` to keep the common pieces shared. The validator resolves these cross-file references during `cannectors validate` and the `run` startup path. ## Composition pattern [#composition-pattern] Each module config is structured the same way: ```json { "httpPollingInputConfig": { "allOf": [ { "$ref": "common-schema.json#/$defs/httpRequestBase" }, { "type": "object", "properties": { "schedule": { ... }, "pagination": { ... }, "statePersistence": { ... }, "dataField": { ... }, "retry": { ... } } } ] } } ``` The `allOf` lists shared bases first (here `httpRequestBase` contributes `endpoint`, `method`, `headers`, `queryParams`, `body`, `authentication`, `timeoutMs`), then adds module-specific properties. The wrapper (`inputModule`) uses `oneOf` over `{ type: const }` to pick the right config: ```json { "inputModule": { "allOf": [ { "$ref": "common-schema.json#/$defs/moduleBase" }, { "oneOf": [ { "allOf": [ { "$ref": "#/$defs/httpPollingInputConfig" }, { "properties": { "type": { "const": "httpPolling" } } } ] }, ... ] } ] } } ``` This pattern keeps every module's options discoverable through one known path: `-schema.json#/$defs/InputConfig`. ## How this site uses them [#how-this-site-uses-them] The `` you see on every module reference page is built by walking these schemas at build time. The vendored copy lives under `cannectors-doc/schemas/cannectors/` and is synced from the upstream repo via `pnpm sync-schemas`. `pnpm check-schemas` asserts the vendored manifest matches the disk — a CI hook fails the build if the docs drift from the binary. 
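As a rough illustration of what "walking a schema" means, the Go sketch below reads `required` and `properties` from a single schema object and lists each option. It is not the docs site's tooling (which runs via `pnpm`), and it deliberately ignores `$ref`, `allOf`, and `oneOf`. The schema fragment and the names `schemaNode` and `listOptions` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// schemaNode holds the two JSON Schema keywords this sketch reads.
type schemaNode struct {
	Required   []string                   `json:"required"`
	Properties map[string]json.RawMessage `json:"properties"`
}

// listOptions returns one "name: required|optional" line per property,
// sorted by property name.
func listOptions(schemaJSON string) ([]string, error) {
	var node schemaNode
	if err := json.Unmarshal([]byte(schemaJSON), &node); err != nil {
		return nil, fmt.Errorf("parse schema: %w", err)
	}
	required := make(map[string]bool, len(node.Required))
	for _, name := range node.Required {
		required[name] = true
	}
	names := make([]string, 0, len(node.Properties))
	for name := range node.Properties {
		names = append(names, name)
	}
	sort.Strings(names) // map iteration order is random, so sort for stable output
	lines := make([]string, 0, len(names))
	for _, name := range names {
		status := "optional"
		if required[name] {
			status = "required"
		}
		lines = append(lines, fmt.Sprintf("%s: %s", name, status))
	}
	return lines, nil
}

func main() {
	// A hypothetical fragment shaped like a flattened module config.
	const fragment = `{
		"type": "object",
		"required": ["endpoint"],
		"properties": {
			"endpoint":  { "type": "string" },
			"schedule":  { "type": "string" },
			"dataField": { "type": "string" }
		}
	}`
	lines, err := listOptions(fragment)
	if err != nil {
		fmt.Println("invalid schema:", err)
		return
	}
	for _, line := range lines {
		fmt.Println(line)
	}
}
```

A full walker would also need to follow cross-file `$ref` values and merge `allOf` branches, which is where inherited options such as those from `httpRequestBase` come from.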
## Adding a property [#adding-a-property] To make a new option visible: 1. Add it under the relevant `properties` in the right schema file. 2. Bump the schema's `$id` (the version string) if the change is non-backward-compatible. 3. Re-run the cannectors test suite — schema-driven tests will catch missing field handling in the runtime. 4. Sync the docs site: `pnpm sync-schemas ../cannectors` then `pnpm generate-subpages`. The same JSON object now drives validation in the binary and documentation on this site — no risk of the two going out of sync. ## See also [#see-also] # Examples (/docs/examples) The `cannectors/examples/` directory holds 34 maintained YAML pipelines that the test suite validates on every commit. They cover every module and most combinations — copying one of them is the fastest way to bootstrap a new pipeline. Every example is rendered live elsewhere in this site via [``](/docs/modules) — open any module page and scroll to "Examples" to see the YAML inline. This page is the index. ## Running them [#running-them] ```bash # Validate all make validate-examples # Validate one cannectors validate ./examples/01-http-polling-basic-to-http-batch.yaml # Dry-run one cannectors run --dry-run ./examples/01-http-polling-basic-to-http-batch.yaml ``` They target `https://source.example.com` / `https://destination.example.com` placeholder hosts. Edit the endpoints or copy the file before pointing at real systems. 
## HTTP polling [#http-polling]

| File | Covers |
| --- | --- |
| [01-http-polling-basic-to-http-batch.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/01-http-polling-basic-to-http-batch.yaml) | Basic polling, no auth, batched HTTP output |
| [02-http-polling-page-pagination.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/02-http-polling-page-pagination.yaml) | Page-based pagination |
| [03-http-polling-offset-pagination-state.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/03-http-polling-offset-pagination-state.yaml) | Offset pagination + state persistence |
| [04-http-polling-cursor-oauth2.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/04-http-polling-cursor-oauth2.yaml) | Cursor pagination + OAuth2 |

## Webhook [#webhook]

| File | Covers |
| --- | --- |
| [05-webhook-hmac-to-http-single.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/05-webhook-hmac-to-http-single.yaml) | HMAC verification, per-record HTTP output |
| [06-webhook-queue-rate-limit-to-database.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/06-webhook-queue-rate-limit-to-database.yaml) | Async queue + rate limit, database output |

## Database I/O [#database-io]

| File | Covers |
| --- | --- |
| [07-database-input-basic-to-http.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/07-database-input-basic-to-http.yaml) | Database input → HTTP output |
| [08-database-input-limit-offset-to-database.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/08-database-input-limit-offset-to-database.yaml) | Limit/offset pagination, DB → DB |
| [09-database-input-cursor-incremental.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/09-database-input-cursor-incremental.yaml) | Cursor + incremental queries |

## Transformations [#transformations]

| File | Covers |
| --- | --- |
| [10-mapping-transforms-all.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/10-mapping-transforms-all.yaml) | Every mapping transform |
| [11-condition-nested-routing.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/11-condition-nested-routing.yaml) | Nested condition routing |
| [12-script-inline-transform.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/12-script-inline-transform.yaml) | Inline JavaScript transform |
| [13-script-file-transform.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/13-script-file-transform.yaml) | External JavaScript file |
| [25-loop-cells-extraction.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/25-loop-cells-extraction.yaml) | Iterate `cells[]` and extract values by columnId |

## Enrichment [#enrichment]

| File | Covers |
| --- | --- |
| [14-http-call-get-merge-cache.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/14-http-call-get-merge-cache.yaml) | GET enrichment, merge, cache |
| [15-http-call-query-header-append.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/15-http-call-query-header-append.yaml) | Query + header keys, append strategy |
| [16-http-call-post-template-replace.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/16-http-call-post-template-replace.yaml) | POST + body template, replace strategy |
| [17-sql-call-merge-cache.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/17-sql-call-merge-cache.yaml) | SQL enrichment, merge + cache |
| [18-sql-call-append-query-file.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/18-sql-call-append-query-file.yaml) | SQL enrichment, append + external query file |
| [42-soap-call-enrichment.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/42-soap-call-enrichment.yaml) | SOAP enrichment, append + cache |

## HTTP output specifics [#http-output-specifics]

| File | Covers |
| --- | --- |
| [19-http-output-single-template.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/19-http-output-single-template.yaml) | Single-record mode + URL template |
| [20-http-output-retry-auth-api-key.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/20-http-output-retry-auth-api-key.yaml) | Retry block + API key auth |

## Database output [#database-output]

| File | Covers |
| --- | --- |
| [21-database-output-transaction-query-file.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/21-database-output-transaction-query-file.yaml) | Transactional output + external query file |

## SOAP [#soap]

| File | Covers |
| --- | --- |
| [40-soap-polling-basic-v11.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/40-soap-polling-basic-v11.yaml) | SOAP 1.1 polling |
| [40b-soap-polling-basic-v12.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/40b-soap-polling-basic-v12.yaml) | SOAP 1.2 polling |
| [41-soap-polling-cursor.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/41-soap-polling-cursor.yaml) | SOAP cursor pagination |
| [43-soap-output-batch.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/43-soap-output-batch.yaml) | SOAP batch output |
| [44-soap-output-mtom-emission.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/44-soap-output-mtom-emission.yaml) | MTOM attachment emission |
| [44b-soap-input-mtom-reception.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/44b-soap-input-mtom-reception.yaml) | MTOM attachment reception |
| [45-soap-output-wssecurity-passwordtext.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/45-soap-output-wssecurity-passwordtext.yaml) | WS-Security PasswordText |
| [45b-soap-output-wssecurity-passworddigest.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/45b-soap-output-wssecurity-passworddigest.yaml) | WS-Security PasswordDigest |

## Pipeline-wide concerns [#pipeline-wide-concerns]

| File | Covers |
| --- | --- |
| [22-defaults-inheritance.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/22-defaults-inheritance.yaml) | Top-level `defaults` + per-module overrides |
| [23-auth-basic-bearer-query-key.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/23-auth-basic-bearer-query-key.yaml) | Bearer + basic + API key in one pipeline |
| [24-empty-filter-pass-through.yaml](https://github.com/alexandrecano/cannectors/blob/main/examples/24-empty-filter-pass-through.yaml) | Empty filter chain (`filters: []`) |

# AI context (/docs/get-started/ai-context)

Use the raw text endpoint when an AI agent needs the full Cannectors context before generating a pipeline:

[`/ai-context.txt`](/ai-context.txt)

The endpoint is intentionally plain text. It includes:

* Generation rules for AI agents.
* The full documentation text.
* An exhaustive module options summary.
* The vendored Cannectors JSON Schemas.
* Every maintained YAML example pipeline.

Agents should treat the JSON Schemas as the authoritative contract for keys, types, required fields, enum values, defaults, and nested shapes.

## Recommended prompt [#recommended-prompt]

```text
Read https://cannectors.com/ai-context.txt first.

Then generate a valid Cannectors YAML pipeline for this integration:

Source:
- ...

Transformations:
- ...

Destination:
- ...

Constraints:
- Keep secrets in environment variables.
- Use maintained examples as templates.
- Explain which example or schema section you used.
- Include the command to validate the file.
```

## Validation loop [#validation-loop]

```bash
cannectors validate pipeline.yaml
cannectors run --dry-run pipeline.yaml
```

# Your First Pipeline (/docs/get-started/first-pipeline)

In this guide, you'll write a pipeline that polls an HTTP API on a schedule, maps a few fields, drops records you don't care about, and POSTs the rest to another endpoint. Everything stays local until you're ready to point it at real systems.
## Scenario [#scenario]

You're consuming an `/api/orders` endpoint and forwarding only paid orders to an internal `/api/orders/import` endpoint. Both endpoints require a Bearer token, and you want a 15-minute schedule with retries on 5xx responses.

## Create the pipeline file [#create-the-pipeline-file]

Create a new file `sync-orders.yaml`. Pipelines need four top-level keys: `input`, `filters`, `output`, plus a `name` for logs.

```yaml title="sync-orders.yaml"
name: sync-orders
version: 1.0.0
description: Forward paid orders to the internal import API.
```

## Wire the input [#wire-the-input]

`httpPolling` calls a URL on a schedule and reads records from a JSON field in the response.

```yaml title="sync-orders.yaml"
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
  authentication:
    type: bearer
    credentials:
      token: ${SOURCE_BEARER_TOKEN}
```

Notable parts:

* **`schedule`** is a 5-field CRON expression. Without it, the pipeline runs once and exits.
* **`dataField`** is the JSON key in the response that holds the array of records (here, `{ "orders": [...] }`).
* **`${SOURCE_BEARER_TOKEN}`** is read from your shell environment at startup, so secrets never live in the YAML.

## Add filters [#add-filters]

Filters run in declared order. Two are enough here:

1. **`mapping`**: flatten the `customer.email` nested field and lowercase it.
2. **`condition`**: drop anything not paid.

```yaml title="sync-orders.yaml"
filters:
  - type: mapping
    mappings:
      - source: order_id
        target: id
      - source: customer.email
        target: email
        transforms:
          - op: lowercase
  - type: condition
    expression: "status == 'paid'"
    else:
      - type: drop
```

The absent `then` branch keeps paid records unchanged. The `else` branch explicitly drops every non-paid record, so the output never sees it.

## Wire the output [#wire-the-output]

`httpRequest` POSTs records to an endpoint, in batch (one request for the whole set) or single (one request per record).
For an internal import API, batch is the right shape.

```yaml title="sync-orders.yaml"
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  authentication:
    type: bearer
    credentials:
      token: ${DESTINATION_BEARER_TOKEN}
  retry:
    maxAttempts: 3
    delayMs: 500
    backoffMultiplier: 2
    maxDelayMs: 5000
    retryableStatusCodes: [429, 500, 502, 503, 504]
```

The retry block honors `Retry-After` automatically and backs off exponentially with a jittered base of 500 ms.

## Final file [#final-file]

Put it all together:

```yaml title="sync-orders.yaml"
name: sync-orders
version: 1.0.0
description: Forward paid orders to the internal import API.

input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
  authentication:
    type: bearer
    credentials:
      token: ${SOURCE_BEARER_TOKEN}

filters:
  - type: mapping
    mappings:
      - source: order_id
        target: id
      - source: customer.email
        target: email
        transforms:
          - op: lowercase
  - type: condition
    expression: "status == 'paid'"
    else:
      - type: drop

output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  authentication:
    type: bearer
    credentials:
      token: ${DESTINATION_BEARER_TOKEN}
  retry:
    maxAttempts: 3
    delayMs: 500
    backoffMultiplier: 2
    maxDelayMs: 5000
    retryableStatusCodes: [429, 500, 502, 503, 504]
```

## Validate it [#validate-it]

Before running, validate the YAML against the schema:

```bash
cannectors validate sync-orders.yaml
```

This catches typos, missing required fields, and invalid types.

## Dry-run it [#dry-run-it]

Set the environment variables, then preview what would be sent:

```bash
export SOURCE_BEARER_TOKEN=…
export DESTINATION_BEARER_TOKEN=…
cannectors run --dry-run sync-orders.yaml
```

You'll see the input fetch, the mapping pass, the condition filter, and a preview of the batch that would be POSTed.
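Because the tokens come from the environment, it can help to check that they are actually set before starting a dry run. The sketch below assumes nothing about how Cannectors itself reacts to an unset variable (this page does not describe that); `require_env` is a hypothetical helper, not a CLI feature.

```bash
# Hypothetical pre-flight helper: fail early when a required variable is unset
# or empty in the environment. Not part of the Cannectors CLI.
require_env() {
  missing=0
  for name in "$@"; do
    # printenv only sees exported variables, which is what a child process gets.
    if [ -z "$(printenv "$name")" ]; then
      echo "missing environment variable: $name" >&2
      missing=1
    fi
  done
  [ "$missing" -eq 0 ]
}

# Usage:
#   require_env SOURCE_BEARER_TOKEN DESTINATION_BEARER_TOKEN \
#     && cannectors run --dry-run sync-orders.yaml
```

Checking with `printenv` rather than a plain shell variable test catches the case where a variable was assigned but never exported.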
`--dry-run` executes the input and filters for real (so it hits the source API), but stops before the output side effects. Use it on staging data before you point a fresh pipeline at production endpoints.

## Run it for real [#run-it-for-real]

Remove `--dry-run`:

```bash
cannectors run sync-orders.yaml
```

Because the input has a `schedule`, the process stays alive and triggers every 15 minutes. Use Ctrl-C (or your container runtime's stop signal) to shut it down cleanly.

# Installation (/docs/get-started/installation)

Cannectors is a single static Go binary. There's no runtime to install, no Python virtualenv, no Docker required. Pick the option that fits your setup.

## Requirements [#requirements]

* **Go 1.25+** if you want to build from source
* **A POSIX shell** for the commands below (a recent PowerShell works too)
* Optional: **`make`** if you want to use the included Makefile targets

## Option 1: `go install` [#option-1--go-install]

The fastest way if you already have Go set up:

```bash
go install github.com/alexandrecano/cannectors/cmd/cannectors@latest
```

This drops a `cannectors` binary into `$GOPATH/bin` (or `$HOME/go/bin`). Make sure that directory is on your `PATH`.

## Option 2: Build from source [#option-2--build-from-source]

Clone the repository and build a binary into the current directory:

```bash
git clone https://github.com/alexandrecano/cannectors.git
cd cannectors
go build -o cannectors ./cmd/cannectors
```

Or, if you have `make`:

```bash
make build # builds into ./bin/cannectors
```

The build is fully static and has no link-time dependencies, so you can copy the resulting binary into a container image or a CI runner without installing anything else.
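Both options leave the binary in a directory that may not be on your `PATH` (`$GOPATH/bin` for `go install`, the build directory for a source build). A small check like the one below can confirm that before you try to run `cannectors`. `dir_on_path` is a hypothetical helper written for this page.

```bash
# Hypothetical helper: succeed when the given directory is an entry in PATH.
# Wrapping both sides in ':' avoids matching a mere prefix of another entry.
dir_on_path() {
  case ":$PATH:" in
    *":$1:"*) return 0 ;;
    *) return 1 ;;
  esac
}

# Usage (go env GOPATH prints the active GOPATH):
#   dir_on_path "$(go env GOPATH)/bin" || echo "add the Go bin directory to PATH"
```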
## Cross-compilation [#cross-compilation]

Build for a different OS or architecture from the same machine:

```bash
make build-linux    # → ./bin/cannectors-linux-amd64
make build-darwin   # → ./bin/cannectors-darwin-arm64
make build-windows  # → ./bin/cannectors-windows-amd64.exe
make build-all      # all of the above
```

## Verify the install [#verify-the-install]

```bash
cannectors version
```

You should see the build version and Go runtime printed. If you get `command not found`, double-check that `$GOPATH/bin` (or the directory holding your binary) is on your `PATH`.

Cannectors is currently distributed as source only. Pre-built binaries and container images are planned but not published yet; track progress in [issue #1](https://github.com/alexandrecano/cannectors/issues).

# Next Steps (/docs/get-started/next-steps)

You've got the binary installed, run a maintained example, and written your own pipeline. Here's where to go next, depending on what you're trying to do.

## Build the right mental model [#build-the-right-mental-model]

Cannectors has a small, fixed shape (input → filters → output) and a handful of cross-cutting concerns (auth, retries, state, scheduling). The **Concepts** section walks through them.

## Reach for a specific module [#reach-for-a-specific-module]

The **Modules** reference covers every input, filter, and output with all their options, defaults, and pitfalls. It is generated from the same JSON Schemas that the binary uses to validate.

## Learn the CLI [#learn-the-cli]

The CLI is small (`validate`, `run`, `version`), but the flags matter.

## Crib from real pipelines [#crib-from-real-pipelines]

The `examples/` directory contains 34 maintained YAML pipelines covering every module and most combinations. Each is small, validated, and documented.

## Run it in production [#run-it-in-production]

Cannectors is one process. Use whatever process manager you already use (`systemd`, Kubernetes, ECS, Nomad, your CI runner).
The Operations section covers the patterns that matter at scale.

## Contribute [#contribute]

Cannectors is Apache 2.0. The codebase is small and well-organized; adding a new module or transform is a matter of implementing one interface.

# Quick Start (/docs/get-started/quick-start)

Cannectors ships with 34 maintained example pipelines under `examples/`. They are validated by the test suite and ready to run. This page walks you through running one without touching the YAML.

## Prerequisites [#prerequisites]

You should already have the `cannectors` binary on your `PATH`. If not, see [Installation](/docs/get-started/installation).

## Step-by-step [#step-by-step]

### Clone the examples [#clone-the-examples]

The example files live in the main Cannectors repository:

```bash
git clone https://github.com/alexandrecano/cannectors.git
cd cannectors
```

### Validate the example [#validate-the-example]

Validate the pipeline against the JSON Schema. This catches typos, missing required fields, and invalid module options before anything runs.

```bash
cannectors validate ./examples/01-http-polling-basic-to-http-batch.yaml
```

You should see:

```
✓ pipeline is valid
```

Add `--verbose` to print the parsed pipeline structure. Add `--quiet` to suppress the success banner (useful in CI).

### Preview with `--dry-run` [#preview-with---dry-run]

Run the pipeline in **dry-run mode**. Cannectors will execute everything up to the output stage, then print a preview of what would be sent, without actually calling the destination.

```bash
cannectors run --dry-run ./examples/01-http-polling-basic-to-http-batch.yaml
```

You'll see the trace for each stage (input → filters → output) plus a preview of the batch that would be POSTed. This is the safest way to test a pipeline against a real source.
### Run for real [#run-for-real]

Remove `--dry-run` to actually call the output:

```bash
cannectors run ./examples/01-http-polling-basic-to-http-batch.yaml
```

If the pipeline has a `schedule`, the process stays alive and re-runs on CRON. If not, it runs once and exits.

The maintained examples target `https://source.example.com` and `https://destination.example.com`, which are placeholder hosts. You'll want to edit the endpoints or copy the file before running it for real.

## What just happened [#what-just-happened]

The pipeline you ran is shaped like every Cannectors pipeline: one input, zero or more filters, one output.

```yaml
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders

filters: []

output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
```

The input polls the source on CRON, parses `orders` out of the JSON response, hands each record to the filter chain (here, empty), and the output batches everything into one POST.

# Modules (/docs/modules)

A pipeline plugs **one input**, **zero or more filters**, and **one output** into a fixed shape. The available modules in each slot are listed below. Click through for the exhaustive option reference, generated directly from the JSON Schema the binary uses to validate.

## Shared concepts [#shared-concepts]

Most HTTP-shaped modules (`httpPolling`, `http_call`, `httpRequest`) share a common base. SOAP modules (`soapPolling`, `soap_call`, `soapRequest`) share raw XML body templating, SOAP version handling, MTOM, and WS-Security. Most SQL-shaped modules (`database` input, `sql_call`, `database` output) share another.

Cross-cutting concerns live in their own pages:

# Cross-platform builds (/docs/operations/cross-platform-builds)

Cannectors is a pure-Go binary: no cgo, no SQLite dynamic links, no locale tables baked in.
That means you can build for any Go-supported OS/arch from any other host with a single `go build` invocation, or the Makefile targets that wrap it.

## Makefile targets [#makefile-targets]

```bash
make build          # current host
make build-linux    # → bin/cannectors-linux-amd64
make build-darwin   # → bin/cannectors-darwin-arm64
make build-windows  # → bin/cannectors-windows-amd64.exe
make build-all      # all three cross-compiled targets
```

All of these targets produce stripped, statically-linked binaries suitable for dropping into a container image or copying to a fresh host.

## Going manual [#going-manual]

If you want a target outside the Makefile (e.g. `linux/arm64` for a Raspberry Pi or AWS Graviton):

```bash
GOOS=linux GOARCH=arm64 \
  go build -trimpath -ldflags="-s -w" \
  -o bin/cannectors-linux-arm64 \
  ./cmd/cannectors
```

`-trimpath` strips local filesystem paths from the binary; `-s -w` strip the symbol table and DWARF info. Together they cut the binary size roughly in half.

## Supported targets [#supported-targets]

The toolchain supports anything Go does; see `go tool dist list`. Tested in CI:

| OS | Arch | Notes |
| --- | --- | --- |
| `linux` | `amd64` | Primary platform for production. |
| `linux` | `arm64` | Graviton, Raspberry Pi 4+, Apple-on-Linux CI. |
| `darwin` | `amd64` | Intel Mac dev machines. |
| `darwin` | `arm64` | Apple Silicon dev machines. |
| `windows` | `amd64` | Tested manually. |

## Static link guarantees [#static-link-guarantees]

The binary doesn't depend on:

* libc (`CGO_ENABLED=0` by default)
* the system OpenSSL (Go ships its own TLS stack)
* the system timezone data (tzdata is embedded in the binary; in Go that requires importing `time/tzdata` or building with `-tags timetzdata`, since `time.LoadLocation` alone reads the system database)
* a SQLite shared library (modernc.org/sqlite is pure Go)

You can copy `bin/cannectors-linux-amd64` into a `scratch` container image and it will run. No `apt install` step.
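The manual `go build` invocation under "Going manual" generalizes to any OS/arch pair. The sketch below wraps it in two hypothetical helpers, `artifact_name` and `build_target`; neither exists in the repository, and the output naming simply mirrors the `bin/cannectors-<os>-<arch>` pattern of the Makefile targets listed above.

```bash
# Hypothetical helper: derive the output path for an OS/arch pair, following
# the bin/cannectors-<os>-<arch>[.exe] naming used by the Makefile targets.
artifact_name() {
  os=$1
  arch=$2
  suffix=""
  if [ "$os" = "windows" ]; then
    suffix=".exe"
  fi
  echo "bin/cannectors-${os}-${arch}${suffix}"
}

# Hypothetical helper: cross-compile one target with the flags shown above.
# Must be run from the repository root so ./cmd/cannectors resolves.
build_target() {
  out=$(artifact_name "$1" "$2")
  GOOS=$1 GOARCH=$2 go build -trimpath -ldflags="-s -w" -o "$out" ./cmd/cannectors
}

# Usage:
#   build_target linux arm64
#   build_target windows amd64
```

Keeping the naming in its own function makes it easy to reuse the same path when copying the binary into an image.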
## Container images [#container-images]

A minimal Dockerfile:

```dockerfile
FROM scratch
COPY bin/cannectors-linux-amd64 /usr/local/bin/cannectors
ENTRYPOINT ["/usr/local/bin/cannectors"]
CMD ["version"]
```

For TLS connections (any HTTP-shaped input/output/filter), you'll need CA certificates:

```dockerfile
FROM alpine:3.20 AS certs
RUN apk add --no-cache ca-certificates

FROM scratch
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY bin/cannectors-linux-amd64 /usr/local/bin/cannectors
ENTRYPOINT ["/usr/local/bin/cannectors"]
```

Built image size: about 15 MB.

# Operations (/docs/operations)

Cannectors is one Go binary. Everything below is about how to run it reliably: process supervisors, durable state, secret injection, log shipping, cross-platform builds.

# Logging (/docs/operations/logging)

The runtime emits structured logs to stdout by default. Use `--log-file` to redirect, or let your container platform pick stdout up automatically.

## Format [#format]

Line-delimited JSON, one event per line.

```json
{"time":"2026-04-21T12:34:56Z","level":"info","pipeline":"sync-orders","module":"httpPolling","msg":"fetched","records":128,"cursor":"eyJpZCI6MTI4fQ"}
{"time":"2026-04-21T12:34:57Z","level":"info","pipeline":"sync-orders","module":"mapping","msg":"processed","records_in":128,"records_out":128,"dropped":0}
{"time":"2026-04-21T12:34:58Z","level":"warn","pipeline":"sync-orders","module":"http_call","msg":"cache miss","key":"cust_42"}
```

Standard fields on every event:

| Field | Meaning |
| --- | --- |
| `time` | RFC 3339 UTC. |
| `level` | `debug`, `info`, `warn`, `error`. |
| `pipeline` | The pipeline's `name`. |
| `module` | The module type that emitted the event. |
| `msg` | Short human label. Stable enough to use in alerts. |

Plus module-specific fields (record counts, cursor values, HTTP status, etc.).
## Levels [#levels]

| Level | When |
| --- | --- |
| `debug` | Per-record traces. Only emitted with `--verbose`. |
| `info` | Stage start/end, batch totals, cache hits, scheduled-tick fires. Default. |
| `warn` | Retry attempts, soft failures, deprecated YAML fields. |
| `error` | Stage failed past `onError`. The runtime is about to abort. |

`error` events always include the root cause and the record index that triggered it.

## Shipping [#shipping]

Pipe stdout into your aggregator:

| Aggregator | Pattern |
| --- | --- |
| Loki | Promtail tailing the container stdout |
| Datadog | Datadog Agent autodiscovery on the container |
| CloudWatch | ECS/EKS log driver, no extra config |
| Vector | Sidecar tailing `/var/log/cannectors/*.log` |
| OpenTelemetry | `otelcol` with the `filelog` receiver |

All of them parse line-delimited JSON without further work: point them at the stream and the fields land as labels/attributes.

## What to monitor [#what-to-monitor]

Alert on:

* **`level=error`**: any error event is worth a page. Cannectors doesn't recover from them automatically.
* **`msg=retry`** at a sustained high rate: sustained retries usually mean the destination is degrading.
* **No `msg=tick` event in N CRON intervals**: the scheduler is stuck or the process is dead.

Don't alert on every `level=warn`; warnings cover normal operating conditions (transient retries, cache misses, etc.).

## Local dev [#local-dev]

`--verbose` adds per-record trace lines and is the right mode for debugging a pipeline locally. `--quiet` flips it the other way: nothing on success.

```bash
cannectors run --verbose pipeline.yaml | jq -c '.'
```

`jq -c` re-emits each event as compact, single-line JSON, so the stream stays line-delimited while `jq` validates and colorizes it. Drop `-c` if you want each event pretty-printed across multiple lines.
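For a quick look at how many events of a given level a log contains, a plain `grep` count is often enough. The sketch below is an assumption-laden shortcut: `count_level` is a hypothetical helper, and it relies on the compact `"level":"…"` spelling seen in the sample events under "Format". A JSON-aware tool such as `jq` is the sturdier choice if the serialization ever changes.

```bash
# Hypothetical helper: count events at a given level on stdin.
# Assumes compact JSON as in the sample events ("level":"error" with no
# spaces around the colon); it does not parse JSON.
count_level() {
  # grep -c prints the count but exits 1 on zero matches, hence "|| true".
  grep -c -F "\"level\":\"$1\"" || true
}

# Usage, against a file written via --log-file (the path is an example):
#   count_level error < /var/log/cannectors/orders.log
```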
# Scheduling in production (/docs/operations/scheduling-in-production)

`cannectors run` is a single long-running process. Use whatever process supervisor you already use to keep it alive, restart on crash, and stop it cleanly on shutdown.

## Single-replica rule [#single-replica-rule]

The runtime fires on every CRON tick of its own scheduler. Two copies of the same scheduled pipeline both fire on the same tick, so records get double-processed.

Run **exactly one replica** of any given scheduled pipeline:

* Kubernetes: `Deployment` with `replicas: 1` and `strategy.type: Recreate` (not RollingUpdate, which briefly runs two pods).
* ECS / Fly.io: 1 task per pipeline.
* systemd: 1 unit per pipeline.

For high availability, use the supervisor's restart mechanism rather than running multiple replicas. The runtime persists state on every successful batch, so a restart resumes where it left off.

## systemd [#systemd]

```ini title="/etc/systemd/system/cannectors-orders.service"
[Unit]
Description=Cannectors — orders sync
After=network-online.target

[Service]
Type=simple
User=cannectors
WorkingDirectory=/var/lib/cannectors
EnvironmentFile=/etc/cannectors/orders.env
ExecStart=/usr/local/bin/cannectors run /etc/cannectors/orders.yaml
Restart=on-failure
# Do not restart on exit codes 1/2 (YAML problems that need a manual fix).
RestartPreventExitStatus=1 2
RestartSec=10s
KillSignal=SIGTERM
TimeoutStopSec=30s

[Install]
WantedBy=multi-user.target
```

Key points:

* **`EnvironmentFile`** holds secrets, outside the YAML and outside the service file. See [Secrets management](/docs/operations/secrets-management).
* **`KillSignal=SIGTERM`**: Cannectors handles SIGTERM gracefully.
* **`TimeoutStopSec=30s`**: give the runtime time to finish the current batch and persist state before systemd sends SIGKILL.
* **`Restart=on-failure`** restarts on any unclean exit, including exit code 3 (runtime error). On its own it would also restart on exit code 1/2 (a YAML problem you have to fix anyway), which is why the unit adds `RestartPreventExitStatus=1 2`.
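Outside systemd, for example in a shell wrapper or a CI job, the same exit-code distinction can drive the decision to retry. The sketch below uses only the meanings stated above (3 for runtime errors, 1 and 2 for YAML problems); treating 0 as success and lumping every other code together is an assumption, and `classify_exit` is a hypothetical helper.

```bash
# Exit-code meanings as described above: 3 is a runtime error, 1 and 2 are
# YAML problems. Treating 0 as success and everything else as "other" is an
# assumption of this sketch.
classify_exit() {
  case "$1" in
    0) echo "ok" ;;
    1|2) echo "config" ;;    # fix the YAML; retrying will not help
    3) echo "runtime" ;;     # a restart or retry may succeed
    *) echo "other" ;;       # e.g. killed by a signal
  esac
}

# Usage:
#   cannectors run /etc/cannectors/orders.yaml
#   classify_exit "$?"
```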
## Kubernetes [#kubernetes]

```yaml title="orders-sync.yaml"
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orders-sync
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels: { app: orders-sync }
  template:
    metadata:
      labels: { app: orders-sync }
    spec:
      terminationGracePeriodSeconds: 60
      containers:
        - name: cannectors
          image: ghcr.io/your-org/cannectors:v0.1.0
          args: ["run", "/etc/cannectors/orders.yaml"]
          envFrom:
            - secretRef:
                name: orders-sync-secrets
          volumeMounts:
            - name: state
              mountPath: /state
            - name: config
              mountPath: /etc/cannectors
      volumes:
        - name: state
          persistentVolumeClaim:
            claimName: orders-sync-state
        - name: config
          configMap:
            name: orders-sync-config
```

Key points:

* **`Recreate` strategy** avoids running two pods during a rollout.
* **`terminationGracePeriodSeconds: 60`**: at least as long as your longest expected batch.
* **PersistentVolumeClaim** for `state`: without it, restarting the pod loses the cursor and the next batch re-fetches everything.

## CronJob is the wrong shape [#cronjob-is-the-wrong-shape]

A Kubernetes `CronJob` looks tempting, but it doesn't fit Cannectors:

* Cannectors does its own CRON scheduling.
* `CronJob` would spawn a new process per tick, so the runtime would re-validate, re-acquire OAuth tokens, and re-open DB pools every time.
* State persistence works across ticks **within the same process**; starting fresh each time loses warm caches and connection pools.

Use a `Deployment` and let the runtime tick itself.

## Webhook inputs [#webhook-inputs]

Webhook inputs are long-running HTTP listeners with no `schedule`. Same deployment patterns apply, except expose a `Service` (or ingress) on the port `listenAddress` binds to.

```yaml
input:
  type: webhook
  path: /webhooks/orders
  listenAddress: "0.0.0.0:8080"
```

Set a `livenessProbe` / `readinessProbe` against the listen address; Cannectors handles `GET /healthz` out of the box.
# Secrets management (/docs/operations/secrets-management)

Cannectors reads every secret from environment variables via the `${VAR}` syntax; see [Environment variables](/docs/concepts/env-vars) for the resolution rules. This page is about wiring those vars into your deployment without leaking them.

## Sources of truth [#sources-of-truth]

| Where the secret lives | Loader |
| --- | --- |
| HashiCorp Vault | `vault agent` → env file |
| AWS Secrets Manager | `aws secretsmanager get-secret-value` in entrypoint, or sidecar |
| GCP Secret Manager | Workload Identity + secret accessor, or sidecar |
| Kubernetes Secret | `envFrom: secretRef:` on the pod |
| systemd | `EnvironmentFile=` pointing to a chmod-600 file |
| 1Password CLI (dev only) | `op run --env-file=…` wrapping `cannectors run` |

Pick whichever your platform already uses. Cannectors itself doesn't talk to any secret store directly; it only consumes `os.Environ()`.

## What lives where [#what-lives-where]

| Type | Storage | YAML reference |
| --- | --- | --- |
| OAuth2 client ID/secret | Secret store | `${OAUTH_CLIENT_ID}`, `${OAUTH_CLIENT_SECRET}` |
| Bearer tokens | Secret store | `${SOURCE_BEARER_TOKEN}` |
| API keys | Secret store | `${SOURCE_API_KEY}` |
| Database URLs (incl. password) | Secret store | `${WAREHOUSE_DATABASE_URL}` |
| HMAC webhook secrets | Secret store | `${WEBHOOK_HMAC_SECRET}` |
| Endpoint hostnames | ConfigMap / config file | hard-coded in the YAML or via `${SOURCE_BASE_URL}` |
| CRON schedules | YAML | hard-coded |

Default to env vars for anything sensitive. Hard-code the rest.

## systemd EnvironmentFile [#systemd-environmentfile]

```bash title="/etc/cannectors/orders.env"
# permissions: chmod 600, owner: cannectors
SOURCE_BEARER_TOKEN=eyJhbGciOiJIUzI1...
DESTINATION_BEARER_TOKEN=eyJ...
WAREHOUSE_DATABASE_URL=postgres://user:pass@host/db
```

```ini title="orders.service"
[Service]
EnvironmentFile=/etc/cannectors/orders.env
ExecStart=/usr/local/bin/cannectors run /etc/cannectors/orders.yaml
```

Keep the env file at mode `600`, owned by the `cannectors` user, with no group read access.

## Kubernetes Secret [#kubernetes-secret]

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: orders-sync-secrets
type: Opaque
stringData:
  SOURCE_BEARER_TOKEN: eyJhbGciOiJIUzI1...
  WAREHOUSE_DATABASE_URL: postgres://...
```

```yaml
# In the Deployment spec
spec:
  containers:
    - name: cannectors
      envFrom:
        - secretRef:
            name: orders-sync-secrets
```

For real production, store the Secret content in your secret manager (External Secrets Operator, Sealed Secrets, Vault Secrets Operator) rather than committing it to git.

## Redaction [#redaction]

The runtime never logs resolved credentials. `cannectors validate --verbose` also redacts:

```
authentication:
  type: bearer
  credentials:
    token: [redacted]
```

If you ever see a literal token in logs, that's a bug; open an issue.

## Rotation [#rotation]

Cannectors reads env vars **once at startup**. After rotating a secret in your store, restart the process to pick it up:

```bash
sudo systemctl restart cannectors-orders
# or
kubectl rollout restart deployment/orders-sync
```

Both stop the old process before the new one starts (`kubectl rollout restart` follows the Deployment's `Recreate` strategy), so the new replica starts with the new env and the old one stops cleanly.

# State management (/docs/operations/state-management)

State persistence keeps a small per-pipeline JSON file on disk. The input reads it at startup and writes it after every successful batch; see [State persistence](/docs/concepts/state-persistence) for the YAML config. This page is about operating it.
## Storage location [#storage-location] | Environment | Recommended `storagePath` | | ----------------------------- | ------------------------------------------------------------ | | Local dev | `./.cannectors-state` (add to `.gitignore`) | | Single host, systemd | `/var/lib/cannectors/state` (owned by the `cannectors` user) | | Kubernetes | A `PersistentVolumeClaim` mounted at e.g. `/state` | | Fly.io / Railway | An attached volume mounted at `/state` | | Container with ephemeral disk | **Don't.** State must survive container replacement. | If the storage disappears between runs, the pipeline restarts from scratch — possibly re-processing already-shipped records. ## What the file looks like [#what-the-file-looks-like] One file per pipeline, named after `pipeline.name`: ``` /var/lib/cannectors/state/sync-orders.json ``` ```json { "id": 4821, "timestamp": "2026-04-21T12:34:56Z" } ``` Only the cursor types your `statePersistence` config enables show up. Don't hand-edit while the pipeline is running. ## Backups [#backups] The state file is small (\< 1 KB). Snapshot it with whatever you use for the rest of the volume: ```bash # Linux, daily sudo cp /var/lib/cannectors/state/*.json /backup/cannectors-state/$(date +%F)/ ``` For Kubernetes, a `VolumeSnapshot` on the PVC works the same way. Restoring a state file is just dropping it back in place — the next run reads it. 
## Backfills [#backfills] To re-process everything from scratch, delete the state file: ```bash sudo systemctl stop cannectors-orders sudo rm /var/lib/cannectors/state/sync-orders.json sudo systemctl start cannectors-orders ``` To re-process from a specific point, seed the state file: ```bash sudo systemctl stop cannectors-orders sudo tee /var/lib/cannectors/state/sync-orders.json <<'EOF' { "timestamp": "2026-01-01T00:00:00Z" } EOF sudo chown cannectors:cannectors /var/lib/cannectors/state/sync-orders.json sudo systemctl start cannectors-orders ``` ## Monitoring [#monitoring] Two things are worth alerting on: * **State file age** — if the file hasn't been touched in N CRON ticks, the pipeline isn't making progress. `stat -c %Y state-file.json` gives the mtime in seconds. * **State file growth** — if the cursor value isn't advancing, the source might be returning empty pages even though new data exists. Diff between two snapshots tells you. ## Idempotence at the destination [#idempotence-at-the-destination] State persistence is best-effort restart-safety, not exactly-once delivery. After a crash mid-batch, the runtime re-fetches and re-sends some records that the destination already saw. Design your destination to handle that — `INSERT … ON CONFLICT DO UPDATE`, idempotency keys, or a dedup layer. ## See also [#see-also] # Local test lab (/docs/operations/test-lab) The cannectors repository ships a **local test lab** under `test-lab/`. It spins up a Postgres + WireMock stack via Docker Compose, loads fixtures, and gives you a deterministic sandbox to run pipelines against — no calls to real external services. ## Why it exists [#why-it-exists] `--dry-run` still hits the real input (its source API, your database). 
That's great for a one-off smoke test, less great for: * CI that needs to be deterministic * Pipelines that aren't built yet (the source doesn't exist) * Reproducing a bug with a specific record shape The test lab solves all three by mocking the source and the destination behind URLs your pipelines can point at. ## Layout [#layout] ``` cannectors/test-lab/ ├── docker-compose.yml postgres + wiremock ├── pipelines/ example pipelines pointing at the stack ├── scenarios/ named scenarios with expected outputs ├── wiremock/__files/ response bodies served by WireMock ├── wiremock/mappings/ WireMock request → response rules ├── postgres/ init SQL for the source/dest databases └── run.py scenario runner ``` ## Bringing the stack up [#bringing-the-stack-up] ```bash cd cannectors make test-lab-up # docker compose up -d, waits for healthchecks ``` After \~10 seconds, WireMock is at `http://localhost:8080` and Postgres is at `postgres://lab:lab@localhost:5432/lab`. ## Running a scenario [#running-a-scenario] ```bash make test-lab-run # runs every scenario under test-lab/scenarios/ ``` Each scenario: 1. Resets WireMock mappings and Postgres tables to a known state. 2. Runs a specific pipeline from `test-lab/pipelines/`. 3. Checks the result against expected output (records in Postgres, requests in WireMock's journal). The runner is `test-lab/run.py` — a Python script, \~500 lines. Reading the scenarios is the fastest way to learn the patterns. ## Tearing it down [#tearing-it-down] ```bash make test-lab-down # docker compose down -v ``` The `-v` flag drops volumes too, so the next `make test-lab-up` starts clean. ## CI usage [#ci-usage] In CI, run the lab as part of the integration step: ```yaml - run: make test-lab-up - run: make test-lab-run - run: make test-lab-down if: always() ``` The compose stack starts under 15s on a warm runner. Total scenario time is a few seconds. 
## See also [#see-also] # drop (/docs/modules/filters/drop) The `drop` filter discards every record it sees. It has **no options**. It's almost always placed inside a `condition` branch — the condition selects which records get dropped. ## Minimal example [#minimal-example] Drop cancelled orders: ```yaml filters: - type: condition expression: "status == 'cancelled'" then: - type: drop ``` ## How "drop" differs from `remove` [#how-drop-differs-from-remove] | Filter | What it drops | | -------- | ---------------------------------------------------------------- | | `drop` | The whole record. Downstream stages never see it. | | `remove` | One or more fields from the record. The record itself continues. | They're complementary, not interchangeable. ## When to use it alone [#when-to-use-it-alone] Putting `drop` outside a `condition` is technically valid — it'll drop every record. That's a useful debugging shortcut for "execute the input and filters but produce no output", but it's mostly equivalent to `--dry-run` (which also avoids the network round-trip on the output side). ```yaml filters: - type: drop # nothing reaches the output ``` For production use, always pair `drop` with a `condition`. ## Cross-references [#cross-references] # Filters (/docs/modules/filters) Filters run **in declared order** between the input and the output. Each filter sees one record at a time and can mutate it, drop it, branch on it, or enrich it from an external system. 
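To make the "declared order" rule concrete, here is a minimal sketch in plain JavaScript. It is not Cannectors code: filters are modelled as hypothetical functions that return the record, or `null` to drop it, and `runChain`, `lowercaseStatus`, and `dropCancelled` are names invented for this illustration.

```javascript
// Illustrative sketch only: filters are modelled as plain functions.
// Each one returns the (possibly mutated) record, or null to drop it.
function runChain(record, filters) {
  var current = record;
  for (var i = 0; i < filters.length; i++) {
    current = filters[i](current);
    if (current === null) {
      return null; // dropped: later filters never see the record
    }
  }
  return current;
}

// Two hypothetical filters used to show that order matters.
function lowercaseStatus(record) {
  record.status = String(record.status).toLowerCase();
  return record;
}

function dropCancelled(record) {
  return record.status === 'cancelled' ? null : record;
}

// Normalize first, then drop: the record is removed.
var a = runChain({ status: 'CANCELLED' }, [lowercaseStatus, dropCancelled]);

// Drop first, then normalize: the check runs on the raw value and misses it.
var b = runChain({ status: 'CANCELLED' }, [dropCancelled, lowercaseStatus]);

console.log(a, b);
```

Swapping the two filters changes the outcome for the same record, which is why the position of a normalizing filter relative to a `condition` deserves attention.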
## Picking the right filter [#picking-the-right-filter] | You want to … | Reach for | | ------------------------------------- | ------------------------------------ | | Rename or normalize a field | `mapping` | | Set a fixed value on every record | `set` | | Get rid of a field | `remove` | | Drop the record entirely | `drop` (usually under a `condition`) | | Route records by a boolean expression | `condition` | | Walk an array field on each record | `loop` | | Look something up from an HTTP API | `http_call` | | Look something up from a SOAP API | `soap_call` | | Look something up from a SQL database | `sql_call` | | Run arbitrary JavaScript | `script` | See [Input → Filters → Output](/docs/concepts/input-filter-output) for how filter order affects record flow. # remove (/docs/modules/filters/remove) The `remove` filter deletes fields from each record. It accepts either a single path or an array of paths. Both top-level fields and nested paths (`debug.internal.notes`) are supported. ## Minimal example [#minimal-example] ```yaml filters: - type: remove target: debug ``` ## Options [#options] ## Multiple fields [#multiple-fields] `target` accepts an array of paths. The schema requires at least one non-empty entry: ```yaml - type: remove target: - debug - internal.notes - metadata.audit ``` ## When the path is missing [#when-the-path-is-missing] Removing a field that doesn't exist is a **no-op** — no error, the filter just continues. This is useful for "cleanup" filters that drop optional fields without caring whether each record actually has them. ## Common patterns [#common-patterns] ### Stripping internals before output [#stripping-internals-before-output] Most pipelines accumulate temporary fields during enrichment (cache keys, internal IDs, debug data). 
Use `remove` right before the output to keep the output clean: ```yaml filters: - type: http_call endpoint: … # adds many fields, some of which we don't want to ship - type: remove target: - _response - _cache_key - internal ``` ### Reset PII for non-production destinations [#reset-pii-for-non-production-destinations] ```yaml - type: remove target: - customer.full_name - customer.phone - customer.address ``` ## Cross-references [#cross-references] # script (/docs/modules/filters/script) The `script` filter runs a JavaScript function on every record. It uses [Goja](https://github.com/dop251/goja), a pure-Go ECMAScript 5.1+ implementation — no `eval`, no Node.js APIs, no I/O. Just record → record. ## Minimal example [#minimal-example] ```yaml filters: - type: script script: | function transform(record) { record.processed = true; return record; } ``` ## Options [#options] Provide **exactly one** of `script` or `scriptFile` — never both. The schema enforces this. ## The `transform` function [#the-transform-function] The runtime calls `transform(record)` once per record. The returned value must be an object. Returning `null`, `undefined`, or a primitive is an execution error, handled according to `onError`. 
To **drop** a record from a script, throw — the runtime catches and applies `onError`: ```yaml filters: - type: script onError: skip script: | function transform(record) { if (!record.email) { throw new Error('missing email'); } return record; } ``` ## File-backed scripts [#file-backed-scripts] For non-trivial logic, factor the script out into a `.js` file: ```yaml filters: - type: script scriptFile: ./scripts/normalize-customer.js ``` ```js title="scripts/normalize-customer.js" function transform(record) { if (record.email) { record.email = record.email.trim().toLowerCase(); } if (record.country) { record.country = record.country.toUpperCase(); } record.normalized_at = new Date().toISOString(); return record; } ``` ## What's available [#whats-available] Inside the script: * `record` is a plain JavaScript object — read and mutate freely. * Standard ES5+ APIs: `JSON.stringify/parse`, `Math.*`, `Date.*`, string methods, array methods. * `console.error`, `console.warn`, `console.info`, `console.log`, and `console.debug` are available and routed to the Cannectors logger. * **No** Node.js APIs (`fs`, `http`, `require`) and no network or file I/O. Each record gets a fresh script execution. There's no shared state between records. The script filter is the most expensive filter per record. Reach for `mapping`, `condition`, `set`, or `remove` first — they're orders of magnitude faster. Use `script` when the logic genuinely doesn't fit into those. ## Examples [#examples] ## Cross-references [#cross-references] # set (/docs/modules/filters/set) The `set` filter writes a literal value to a target path on every record. If the path includes nested objects that don't exist yet, they're created. ## Minimal example [#minimal-example] ```yaml filters: - type: set target: metadata.source value: cannectors ``` After this filter, every record carries `metadata.source = "cannectors"`. 
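The way `set` creates missing parent objects can be pictured with a few lines of plain JavaScript. This is only an illustrative sketch: `setPath` is a hypothetical helper, not the runtime's implementation, and it assumes dot-separated paths with no array indices.

```javascript
// Illustrative sketch only: `setPath` is a hypothetical helper, not Cannectors code.
// Assumes dot-separated paths without array indices.
function setPath(record, path, value) {
  var parts = path.split('.');
  var node = record;
  for (var i = 0; i < parts.length - 1; i++) {
    var key = parts[i];
    // Create the intermediate object when it is missing or not an object
    // (overwriting a non-object parent is an assumption of this sketch).
    if (typeof node[key] !== 'object' || node[key] === null) {
      node[key] = {};
    }
    node = node[key];
  }
  node[parts[parts.length - 1]] = value;
  return record;
}

var stamped = setPath({ id: 1 }, 'metadata.source', 'cannectors');
console.log(JSON.stringify(stamped)); // {"id":1,"metadata":{"source":"cannectors"}}
```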
## Options [#options] ## Type preservation [#type-preservation] The YAML type of `value` is preserved — strings stay strings, numbers stay numbers, booleans stay booleans, `null` stays `null`. Use YAML quoting if you need an explicit string: ```yaml - type: set target: version value: "1.0" # string "1.0", not number 1.0 - type: set target: count value: 0 # number 0 - type: set target: tags value: ["api", "internal"] # array ``` ## Creating nested paths [#creating-nested-paths] Writing to a nested path creates intermediate objects automatically: ```yaml - type: set target: metadata.audit.source value: cannectors ``` Even if the record had no `metadata` key, the filter creates `metadata: { audit: { source: "cannectors" } }`. ## When to use `set` vs `mapping` [#when-to-use-set-vs-mapping] | Need | Reach for | | ------------------------------------------------------ | --------- | | Set a literal value (no transform) | `set` | | Map a source field to a target, optionally transformed | `mapping` | | Drop a field | `remove` | `set` is the cheapest filter — no expressions, no transforms, no lookups. Use it freely for tagging, marking, or stamping. ## Cross-references [#cross-references] # Inputs (/docs/modules/inputs) Every pipeline has **exactly one input**. It produces records the filter chain consumes, either on a CRON schedule (HTTP polling, SOAP polling, database) or as a long-running listener (webhook). ## What an input does [#what-an-input-does] 1. Fetches a batch of records — one HTTP response page, one SQL query result, one webhook delivery. 2. Hands the batch off to the filter chain, **one record at a time**. 3. On scheduled inputs, persists state (cursor, last ID, timestamp) between runs so the next batch picks up where the previous one left off. See [Input → Filters → Output](/docs/concepts/input-filter-output) for the full record-flow model. 
# database (output) (/docs/modules/outputs/database)

The `database` output executes a SQL query for each record, optionally wrapping the whole batch in one transaction. Placeholders are bound as positional parameters, and prepared statements are reused across records.

## Minimal example [#minimal-example]

```yaml
output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  query: |
    INSERT INTO orders (id, email, amount)
    VALUES ({{record.id}}, {{record.email}}, {{record.amount}})
    ON CONFLICT (id) DO UPDATE
    SET email = EXCLUDED.email, amount = EXCLUDED.amount
```

## Options [#options]

## Query or query file [#query-or-query-file]

Provide one of `query` (inline) or `queryFile` (external). Inline is convenient for short statements; external files are clearer for anything beyond a one-liner.

```yaml
queryFile: ./sql/upsert_order.sql
```

```sql title="sql/upsert_order.sql"
INSERT INTO orders (id, email, amount, paid_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE
SET email = EXCLUDED.email, amount = EXCLUDED.amount, paid_at = EXCLUDED.paid_at;
```

The placeholders (`$1`, `$2`, … for PostgreSQL; `?` for MySQL/SQLite) are bound positionally, either in the order in which the `{{record.x}}` placeholders appear in the query (when using `query`) or via `parameters` (when using `queryFile`).

## Transactional writes [#transactional-writes]

For atomic batch writes — either all records persist or none do — wrap in a transaction:

```yaml
output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  queryFile: ./sql/upsert_order.sql
  transaction: true
```

The runtime opens one transaction per output batch, executes the query once per record, then commits at the end. On any error, the transaction rolls back.

## Connection [#connection]

Same as the database input — see [Connection](/docs/modules/inputs/database#connection) for driver auto-detection and connection-pool tuning. The output reuses the same pool config (`maxOpenConns`, `maxIdleConns`, etc.).
## Examples [#examples] ## Cross-references [#cross-references] # Outputs (/docs/modules/outputs) Every pipeline has **exactly one output**. It receives the records that made it through the filter chain and writes them somewhere external — an API, a SOAP service, a database table, your data warehouse. ## Batch vs single [#batch-vs-single] HTTP and SOAP outputs accept two request modes: * **`batch`** — one request carries every record. Cheaper, fewer round trips, and the natural shape for bulk import endpoints. Default. * **`single`** — one request per record. Required when the destination only accepts one record at a time, or when the URL is templated per record (`/api/customers/{{record.id}}`). Database outputs always operate per-record (one query per row), but can wrap a whole batch in a transaction with `transaction: true`. # else (/docs/modules/filters/condition/else) {/* generated by scripts/generate-module-subpages.ts */} # condition (/docs/modules/filters/condition) The `condition` filter evaluates a boolean expression against each record. Records matching the expression go through the `then` branch; non-matching records go through `else`. Both branches are themselves filter chains, so you can nest as deeply as the YAML lets you read. ## Minimal example [#minimal-example] Keep paid orders, drop everything else: ```yaml filters: - type: condition expression: "status == 'paid'" then: [] # paid records continue unchanged else: - type: drop # everything else is removed ``` ## Options [#options] ## Expression syntax [#expression-syntax] Expressions are evaluated by [`expr`](https://expr-lang.org/), the same engine the runtime uses for retry hints and HTTP success conditions. The current record is the root context — fields are accessible by name (including dot paths). 
Operators you'll use most: | Operator | Example | | -------------------- | -------------------------------- | | `==`, `!=` | `status == 'paid'` | | `>`, `<`, `>=`, `<=` | `amount >= 1000` | | `&&`, `\|\|`, `!` | `status == 'paid' && amount > 0` | | `in` | `region in ['EU', 'US']` | | `contains` | `email contains '@example.com'` | | `matches` | `phone matches '^\+33'` | ## Nested branches [#nested-branches] `then` and `else` are full filter arrays. Common pattern: route by field, then enrich each branch differently. ```yaml - type: condition expression: "kind == 'B2B'" then: - type: http_call endpoint: https://accounts.example.com/api/companies/{accountId} keys: - field: account_id paramType: path paramName: accountId mergeStrategy: merge else: - type: http_call endpoint: https://accounts.example.com/api/individuals/{accountId} keys: - field: account_id paramType: path paramName: accountId mergeStrategy: merge ``` ## Filtering records out [#filtering-records-out] `condition` itself never drops records — both branches just produce a modified record. To remove a record, put a `drop` filter inside the relevant branch: ```yaml - type: condition expression: "status == 'cancelled'" then: - type: drop ``` ## Examples [#examples] ## Cross-references [#cross-references] # then (/docs/modules/filters/condition/then) {/* generated by scripts/generate-module-subpages.ts */} # authentication (/docs/modules/filters/http-call/authentication) {/* generated by scripts/generate-module-subpages.ts */} # cache (/docs/modules/filters/http-call/cache) {/* generated by scripts/generate-module-subpages.ts */} # http_call (/docs/modules/filters/http-call) The `http_call` filter makes an HTTP request **per record**, then merges the response back onto the record. Supports path / query / header keys extracted from record fields, LRU + TTL caching, and three merge strategies. 
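As a rough picture of how record fields end up in the path, query string, or headers of the outgoing request, consider the sketch below. It is not the runtime's code: `buildRequest` and `getPath` are hypothetical helpers, URL-encoding of values is an assumption, and the `region` and `tenant` fields and the `X-Tenant` header are invented for the illustration.

```javascript
// Illustrative sketch only: `getPath` and `buildRequest` are hypothetical helpers.
function getPath(record, path) {
  return path.split('.').reduce(function (node, key) {
    return node == null ? undefined : node[key];
  }, record);
}

function buildRequest(endpoint, keys, record) {
  var url = endpoint;
  var query = [];
  var headers = {};
  keys.forEach(function (k) {
    var value = String(getPath(record, k.field));
    if (k.paramType === 'path') {
      // Replace the {paramName} placeholder in the endpoint.
      url = url.split('{' + k.paramName + '}').join(encodeURIComponent(value));
    } else if (k.paramType === 'query') {
      query.push(encodeURIComponent(k.paramName) + '=' + encodeURIComponent(value));
    } else if (k.paramType === 'header') {
      headers[k.paramName] = value;
    }
  });
  if (query.length > 0) {
    url += (url.indexOf('?') === -1 ? '?' : '&') + query.join('&');
  }
  return { url: url, headers: headers };
}

var req = buildRequest(
  'https://profiles.example.com/api/customers/{customerId}',
  [
    { field: 'customer.id', paramType: 'path', paramName: 'customerId' },
    { field: 'region', paramType: 'query', paramName: 'region' },
    { field: 'tenant', paramType: 'header', paramName: 'X-Tenant' }
  ],
  { customer: { id: 42 }, region: 'EU', tenant: 'acme' }
);
console.log(req.url); // https://profiles.example.com/api/customers/42?region=EU
```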
## Minimal example [#minimal-example]

```yaml
filters:
  - type: http_call
    endpoint: https://profiles.example.com/api/customers/{customerId}
    method: GET
    keys:
      - field: customer_id
        paramType: path
        paramName: customerId
    mergeStrategy: merge
```

## Options [#options]

## Keys [#keys]

A `key` extracts a value from the record and uses it in the outgoing request, in one of three positions:

```yaml
keys:
  - field: customer.id # dot path on the record
    paramType: path # path | query | header
    paramName: customerId
```

| `paramType` | Effect                                                |
| ----------- | ----------------------------------------------------- |
| `path`      | Replaces the `{paramName}` placeholder in `endpoint`. |
| `query`     | Appended as `?paramName=value` to the URL.            |
| `header`    | Sent as the HTTP header named `paramName`.            |

Each key is one entry — list multiple if you need more than one parameter.

## Caching [#caching]

Per-record HTTP calls multiply quickly. The built-in cache (LRU with TTL) deduplicates calls that resolve to the same key.

```yaml
cache:
  enabled: true
  maxSize: 1000 # default 1000
  ttlSeconds: 600 # default 300
  key: "{{record.customerId}}" # template
```

| Field        | Meaning                                                            |
| ------------ | ------------------------------------------------------------------ |
| `enabled`    | Master switch.                                                     |
| `maxSize`    | LRU capacity. Older entries are evicted past this.                 |
| `ttlSeconds` | Per-entry TTL.                                                     |
| `key`        | Optional template — if omitted, the resolved URL is the cache key. |

If you don't set `keys` or `cache.key`, every record resolves to the same URL, so all records share a single cache slot and you'll observe only one HTTP request per pipeline run. To force a per-record call, configure at least one `key` or set `cache.key` to a per-record template.
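The shared-slot effect is easier to see with a toy cache. The sketch below is a generic LRU-with-TTL cache in plain JavaScript, not the Cannectors implementation: `TtlLruCache`, `enrichAll`, and `fetchTier` are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```javascript
// Illustrative sketch only: `TtlLruCache` is a hypothetical class, not Cannectors code.
// A Map keeps insertion order, so its first key is always the least recently used.
function TtlLruCache(maxSize, ttlSeconds) {
  this.maxSize = maxSize;
  this.ttlMs = ttlSeconds * 1000;
  this.entries = new Map();
}

TtlLruCache.prototype.get = function (key, now) {
  var entry = this.entries.get(key);
  if (!entry) return undefined;
  if (now >= entry.expiresAt) {
    this.entries.delete(key); // expired
    return undefined;
  }
  // Re-insert to mark the entry as most recently used.
  this.entries.delete(key);
  this.entries.set(key, entry);
  return entry.value;
};

TtlLruCache.prototype.set = function (key, value, now) {
  this.entries.delete(key);
  if (this.entries.size >= this.maxSize) {
    // Evict the least recently used entry.
    this.entries.delete(this.entries.keys().next().value);
  }
  this.entries.set(key, { value: value, expiresAt: now + this.ttlMs });
};

// Enrich every record, calling `fetchFn` only on a cache miss.
// Returns the number of upstream calls that were actually made.
function enrichAll(records, cache, keyOf, fetchFn, now) {
  var calls = 0;
  records.forEach(function (record) {
    var key = keyOf(record);
    var hit = cache.get(key, now);
    if (hit === undefined) {
      calls += 1;
      hit = fetchFn(record);
      cache.set(key, hit, now);
    }
    record.tier = hit.tier;
  });
  return calls;
}

var records = [{ customerId: 'a' }, { customerId: 'b' }, { customerId: 'a' }];
function fetchTier(r) {
  return { tier: r.customerId === 'a' ? 'gold' : 'basic' };
}

// Per-record key: two distinct customers, so two upstream calls.
var perRecord = enrichAll(records, new TtlLruCache(1000, 600),
  function (r) { return r.customerId; }, fetchTier, 0);

// Constant key: every record shares one slot, so only one upstream call.
var shared = enrichAll(records, new TtlLruCache(1000, 600),
  function () { return 'same-url'; }, fetchTier, 0);

console.log(perRecord, shared); // 2 1
```

With the constant key, every record also receives the first response, which is exactly the situation a per-record key avoids.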
## Merge strategies [#merge-strategies]

How the response is combined with the record that is already in flight:

| Strategy          | Effect                                                                                                      |
| ----------------- | ----------------------------------------------------------------------------------------------------------- |
| `merge` (default) | Deep-merge response fields into the record. Nested objects are merged; response values overwrite conflicts. |
| `replace`         | Overlay response fields onto the record. Existing fields not present in the response are preserved.         |
| `append`          | Store the response under `_response` (the filter has no `resultKey` of its own).                            |

```yaml
mergeStrategy: merge # response fields overlay on the record
```

For `append`, the response sits at `_response` until you map or remove it explicitly.

## Examples [#examples]

## Cross-references [#cross-references]

# keys (/docs/modules/filters/http-call/keys)

{/* generated by scripts/generate-module-subpages.ts */}

# retry (/docs/modules/filters/http-call/retry)

{/* generated by scripts/generate-module-subpages.ts */}

# filters (/docs/modules/filters/loop/filters)

{/* generated by scripts/generate-module-subpages.ts */}

# loop (/docs/modules/filters/loop)

The `loop` filter walks an array field on each record and runs a nested filter chain for every item. The current item is exposed under the configured `itemName` alias; the root record stays available as `record`.

Use it to flatten records that carry an array of "cells", "rows", or "lines" into top-level fields without dropping to a `script` filter.
## Minimal example [#minimal-example]

Extract a `displayValue` keyed by `columnId` into a flat `record.eventId`:

```yaml
filters:
  - type: loop
    field: cells
    itemName: cell
    filters:
      - type: condition
        expression: "cell.columnId == 8150579298996100"
        then:
          - type: mapping
            mappings:
              - source: cell.displayValue
                target: record.eventId
```

## Options [#options]

## Scope inside the loop [#scope-inside-the-loop]

Nested filters operate on a *scope* — a synthetic record built once per item:

| Key            | What it is                                                                                                    |
| -------------- | ------------------------------------------------------------------------------------------------------------- |
| `record`       | The root record. Writes to `record.*` mutate the original record.                                             |
| `<itemName>`   | The current item. Writes to `<itemName>.*` are persisted back into the array at the same position.            |
| Parent aliases | Every active parent loop alias when loops are nested.                                                         |
| `_metadata`    | The root metadata. Loop iteration state lives at `_metadata.loop.<itemName>.index` (read-only).               |

The scope is built fresh per item, but `record` and `_metadata` are shared by reference — mutations are visible to subsequent items and filters after the loop.

## Loop metadata [#loop-metadata]

The current index is exposed at `_metadata.loop.<itemName>.index`, where `<itemName>` is the alias configured on the loop:

```yaml
- type: loop
  field: rows
  itemName: row
  filters:
    - type: mapping
      mappings:
        - source: _metadata.loop.row.index
          target: row.position
```

`_metadata.loop` is **read-only** for nested filters. Any attempt to write under that path is rejected with `loop nested filters wrote to _metadata.loop which is read-only`, including replacement or deletion of `_metadata` itself.

## Nested loops [#nested-loops]

Loops can be nested.
Each inner loop adds its alias to the scope and keeps every parent alias available: ```yaml - type: loop field: cells itemName: cell filters: - type: loop field: cell.children itemName: x filters: - type: condition expression: >- _metadata.loop.cell.index == 2 && _metadata.loop.x.index == 0 then: - type: mapping mappings: - source: x.label target: record.first ``` Constraints on `itemName`: * Cannot be `record`, `_metadata`, or `loop` (reserved scope keys). * Cannot duplicate an active parent loop alias. ## Item removal and expansion [#item-removal-and-expansion] | Nested filters return … | Effect on the array | | ------------------------ | ------------------------------------------------------ | | Zero records for an item | Item is removed from the array. | | Exactly one record | Item is updated with the result. | | More than one record | Pipeline fails — item expansion is out of scope in v1. | Use a nested `condition` + `drop` to remove items conditionally: ```yaml - type: loop field: items itemName: item filters: - type: condition expression: "item.keep == false" then: - type: drop ``` ## Non-object items [#non-object-items] Scalar / array / `null` items pass through unchanged when nested filters don't touch the alias. Sub-path writes such as `item.foo = ...` on a non-object item are **rejected** to prevent silent map auto-creation by the path engine. ## Examples [#examples] ## Cross-references [#cross-references] # mapping (/docs/modules/filters/mapping) The `mapping` filter rewrites records: it reads a value from a source path, optionally runs it through a chain of transforms (trim, lowercase, dateFormat, …), and writes it to a target path. 
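The read, transform, write sequence can be sketched in a few lines of plain JavaScript. This is not the runtime's implementation: `applyMapping`, `getPath`, `setPath`, and the `OPS` table are hypothetical, only three of the documented ops are modelled, and handling of a missing source is left out.

```javascript
// Illustrative sketch only: hypothetical helpers, not Cannectors code.
var OPS = {
  trim: function (v) { return String(v).trim(); },
  lowercase: function (v) { return String(v).toLowerCase(); },
  uppercase: function (v) { return String(v).toUpperCase(); }
};

function getPath(record, path) {
  return path.split('.').reduce(function (node, key) {
    return node == null ? undefined : node[key];
  }, record);
}

function setPath(record, path, value) {
  var parts = path.split('.');
  var node = record;
  for (var i = 0; i < parts.length - 1; i++) {
    if (typeof node[parts[i]] !== 'object' || node[parts[i]] === null) {
      node[parts[i]] = {};
    }
    node = node[parts[i]];
  }
  node[parts[parts.length - 1]] = value;
}

function applyMapping(record, mapping) {
  // 1. Read the source value.
  var value = getPath(record, mapping.source);
  // 2. Run the transforms in order, each one feeding the next.
  (mapping.transforms || []).forEach(function (t) {
    value = OPS[t.op](value);
  });
  // 3. Write the result to the target path.
  setPath(record, mapping.target, value);
  return record;
}

var out = applyMapping(
  { customer: { email: '  Jane@Example.COM ' } },
  {
    source: 'customer.email',
    target: 'contact.email',
    transforms: [{ op: 'trim' }, { op: 'lowercase' }]
  }
);
console.log(out.contact.email); // jane@example.com
```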
## Minimal example [#minimal-example] ```yaml filters: - type: mapping mappings: - source: order_id target: id - source: customer.email target: email transforms: - op: lowercase ``` ## Options [#options] ## Field mappings [#field-mappings] Each entry in `mappings` is a `fieldMapping` — see the [mappings option page](/docs/modules/filters/mapping/mappings) for the full per-item shape. ### `onMissing` [#onmissing] What to do when the source path is absent on the record: | Value | Behavior | | ------------------- | ------------------------------------------------------ | | `setNull` (default) | Set target to `null`. | | `skipField` | Leave the target alone — don't write anything. | | `useDefault` | Use `defaultValue`. Requires `defaultValue` to be set. | | `fail` | Abort the pipeline (or trigger `onError`). | ```yaml - source: customer.region target: region onMissing: useDefault defaultValue: "unknown" ``` ## Transforms [#transforms] Each entry in `transforms` is one operation applied left to right on the value pulled from `source`. The shape of each `transformOp` is documented on the [mappings option page](/docs/modules/filters/mapping/mappings) (under each item's `transforms` field). The full list of `op` values: | `op` | Notes | | ---------------------------------------- | ----------------------------------------------------------- | | `trim` | `strings.TrimSpace` | | `lowercase` | `strings.ToLower` | | `uppercase` | `strings.ToUpper` | | `dateFormat` | Reformat with `format` field — accepts Go layout strings. | | `replace` | Regex-based, requires `pattern` and `replacement`. | | `split` | Split a string by `separator` into an array. | | `join` | Join an array by `separator` into a string. | | `toString`, `toInt`, `toFloat`, `toBool` | Coerce types. | | `toArray`, `toObject` | Wrap a scalar into a single-item array / single-key object. 
| ```yaml - source: customer.email target: contact.email transforms: - op: trim - op: lowercase - source: created_at target: createdAt transforms: - op: dateFormat format: "2006-01-02T15:04:05Z07:00" ``` ## Deleting a field [#deleting-a-field] Omit `source` to delete the target: ```yaml - target: internal_notes # field is removed from the record ``` ## Examples [#examples] ## Cross-references [#cross-references] # mappings (/docs/modules/filters/mapping/mappings) {/* generated by scripts/generate-module-subpages.ts */} # authentication (/docs/modules/filters/soap-call/authentication) {/* generated by scripts/generate-module-subpages.ts */} # cache (/docs/modules/filters/soap-call/cache) {/* generated by scripts/generate-module-subpages.ts */} # headers (/docs/modules/filters/soap-call/headers) {/* generated by scripts/generate-module-subpages.ts */} # soap_call (/docs/modules/filters/soap-call) The `soap_call` filter sends a SOAP request for each record and merges the parsed SOAP response back into that record. It supports the same cache and merge strategy model as `http_call`, plus SOAP-specific XML body templating, WS-Security, and MTOM. ## Minimal example [#minimal-example] ```yaml filters: - type: soap_call endpoint: https://soap.example.com/customers soapAction: urn:GetCustomer operation: GetCustomer body: | {{record.customerId}} dataField: Envelope.Body.GetCustomerResponse.Customer mergeStrategy: append resultKey: customer ``` ## Options [#options] ## Merge strategies [#merge-strategies] | Strategy | Effect | | ----------------- | ------------------------------------------------------------------------------------------------------------------- | | `merge` (default) | Deep-merge response fields into the current record. Nested objects are merged; response values overwrite conflicts. | | `replace` | Overlay SOAP response fields onto the current record. Existing fields not present in the response are preserved. 
| | `append` | Store the SOAP response under `resultKey`. | `append` requires `resultKey` so the filter has a stable destination for the SOAP result. ## Caching [#caching] Use the `cache` block to deduplicate SOAP enrichment calls that would resolve to the same result: ```yaml cache: enabled: true maxSize: 1000 ttlSeconds: 300 key: "{{record.customerId}}" ``` If `cache.key` is omitted, the resolved endpoint and operation shape are used as the cache identity. Set an explicit per-record key when the SOAP body varies by record. ## Error handling [#error-handling] `onError` uses the same strategies as other filters: | Value | Effect | | ------ | ------------------------------------------------ | | `fail` | Stop the pipeline on the first SOAP error. | | `skip` | Drop the failing record. | | `log` | Log the SOAP error and keep the original record. | SOAP faults are surfaced as SOAP errors, so downstream logs include the fault code and reason when the server returns one. ## Examples [#examples] ## Cross-references [#cross-references] # keys (/docs/modules/filters/soap-call/keys) {/* generated by scripts/generate-module-subpages.ts */} # mtom (/docs/modules/filters/soap-call/mtom) {/* generated by scripts/generate-module-subpages.ts */} # retry (/docs/modules/filters/soap-call/retry) {/* generated by scripts/generate-module-subpages.ts */} # wsSecurity (/docs/modules/filters/soap-call/ws-security) {/* generated by scripts/generate-module-subpages.ts */} # cache (/docs/modules/filters/sql-call/cache) {/* generated by scripts/generate-module-subpages.ts */} # sql_call (/docs/modules/filters/sql-call) The `sql_call` filter runs a SQL query **per record**, then merges the result back onto the record. Placeholders like `{{record.x}}` are substituted as **positional parameters** at execution time — they are never string-spliced into the SQL, so they're safe by construction. 
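To see why parameter binding is safer than string splicing, the sketch below rewrites `{{record.x}}` placeholders into positional markers and collects the values separately. It is an illustration under assumptions, not the runtime's code: `bindPlaceholders` and `getPath` are hypothetical helpers, and giving each occurrence its own parameter is a simplification chosen for this example.

```javascript
// Illustrative sketch only: `bindPlaceholders` is a hypothetical helper.
function getPath(record, path) {
  return path.split('.').reduce(function (node, key) {
    return node == null ? undefined : node[key];
  }, record);
}

// style: 'postgres' emits $1, $2, ...; anything else emits ?
function bindPlaceholders(query, record, style) {
  var params = [];
  var sql = query.replace(/\{\{\s*record\.([A-Za-z0-9_.]+)\s*\}\}/g, function (match, path) {
    params.push(getPath(record, path));
    return style === 'postgres' ? '$' + params.length : '?';
  });
  return { sql: sql, params: params };
}

var bound = bindPlaceholders(
  'SELECT tier FROM customers WHERE id = {{record.customerId}} AND region = {{record.region}}',
  { customerId: "7'; DROP TABLE customers; --", region: 'EU' },
  'postgres'
);

console.log(bound.sql);
// SELECT tier FROM customers WHERE id = $1 AND region = $2
console.log(JSON.stringify(bound.params));
// ["7'; DROP TABLE customers; --","EU"]
```

The hostile value never becomes part of the SQL text; it travels to the database as data in the parameter list.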
## Minimal example [#minimal-example]

```yaml
filters:
  - type: sql_call
    connectionStringRef: ${REFERENCE_DATABASE_URL}
    query: SELECT tier FROM customers WHERE id = {{record.customerId}}
    mergeStrategy: merge
```

## Options [#options]

## Positional parameters [#positional-parameters]

Every `{{record.x}}` placeholder in `query` is bound as a positional parameter at execution time (`$1`, `$2`, … in PostgreSQL; `?` in MySQL/SQLite). The order matches first-appearance order in the query string.

```yaml
query: |
  SELECT plan, mrr
  FROM accounts
  WHERE id = {{record.account_id}}
    AND region = {{record.region}}
```

Never wrap a `{{record.x}}` placeholder in quotes. It's already a parameter — the database will reject quoted parameters as syntax errors, or worse, treat them as literal strings.

## Connection [#connection]

Same shape as the `database` input — `connectionString` or `connectionStringRef`. Use the ref form for production:

```yaml
connectionStringRef: ${REFERENCE_DATABASE_URL}
```

See [database input · connection](/docs/modules/inputs/database#connection) for driver auto-detection and tuning.

## Merge strategies [#merge-strategies]

| Strategy          | Effect                                                                                                        |
| ----------------- | ------------------------------------------------------------------------------------------------------------- |
| `merge` (default) | Deep-merge the first returned row onto the record. Nested objects are merged; SQL values overwrite conflicts. |
| `replace`         | Overlay the first returned row onto the record. Existing fields not present in the row are preserved.         |
| `append`          | Store the first returned row under `resultKey`. Requires `resultKey` to be set.                               |

```yaml
mergeStrategy: append
resultKey: enrichments
```

`sql_call` currently consumes only the first row returned by the query. For one-to-many enrichment, aggregate the related data in SQL so the query returns a single row that carries the aggregated value in one column.

## Caching [#caching]

`sql_call` has the same cache config as `http_call` — LRU with TTL.
```yaml
cache:
  enabled: true
  maxSize: 1000
  ttlSeconds: 600
  key: "{{record.customerId}}"
```

The same warning applies: without an explicit `cache.key` or per-record parameters, all records will hit the same cache slot.

## Examples [#examples]

## Cross-references [#cross-references]

# incremental (/docs/modules/inputs/database/incremental)

{/* generated by scripts/generate-module-subpages.ts */}

# database (input) (/docs/modules/inputs/database)

The `database` input executes a SQL query, returns one record per row, and feeds them into the filter chain. Supports cursor or limit-offset pagination, incremental queries via a tracked timestamp/ID, and CRON scheduling.

## Minimal example [#minimal-example]

```yaml
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  query: SELECT id, name, email FROM customers
```

## Options [#options]

## Connection [#connection]

Provide one of `connectionString` or `connectionStringRef`. Use the **ref** form in production: it reads from the environment, keeping secrets out of the YAML.

```yaml
connectionStringRef: ${SOURCE_DATABASE_URL}
```

The driver is auto-detected from the URL scheme:

| Scheme                                       | Driver     |
| -------------------------------------------- | ---------- |
| `postgres://…`                               | `postgres` |
| `mysql://…` or `user:pass@tcp(host:port)/db` | `mysql`    |
| `file:./db.sqlite`                           | `sqlite`   |

Explicitly set `driver:` if you need to override the detection.
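As a minimal sketch of that override, the `driver` key can sit next to the connection settings. This assumes `driver` accepts the names listed in the Driver column above; the environment variable name is reused from the minimal example.

```yaml
input:
  type: database
  # Explicit driver; assumed to take precedence over URL-scheme detection.
  driver: mysql
  connectionStringRef: ${SOURCE_DATABASE_URL}
  query: SELECT id, name, email FROM customers
```

Leaving `driver` out and relying on the scheme is usually enough; the explicit form is mainly useful when the connection string does not match one of the listed patterns.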
## Pagination [#pagination]

### Limit + offset [#limit--offset]

```yaml
pagination:
  type: limit-offset
  limit: 500
  param: offset        # placeholder name inside the SQL
query: |
  SELECT id, payload
  FROM events
  ORDER BY id
  LIMIT 500 OFFSET :offset
```

### Cursor [#cursor]

```yaml
pagination:
  type: cursor
  cursorField: id      # which column to use as the cursor value
  param: cursor        # placeholder name in the SQL
query: |
  SELECT id, payload
  FROM events
  WHERE id > :cursor
  ORDER BY id
  LIMIT 500
```

Cursor pagination is the right choice for any large incremental dataset, because offset scans slow down once the table holds millions of rows.

## Incremental queries [#incremental-queries]

For queries that should only return rows changed since the last run, combine `incremental` with a `:lastRun`-style named placeholder. The example below names it `lastTs` via `timestampParam` and references it as `:lastTs`:

```yaml
incremental:
  enabled: true
  timestampField: updated_at
  timestampParam: lastTs
query: |
  SELECT id, payload
  FROM events
  WHERE updated_at > :lastTs
  ORDER BY updated_at
```

On the first run, when no state has been persisted yet, the placeholder resolves to `NULL`. Handle that in your SQL with `COALESCE(:lastTs, '-infinity'::timestamptz)` (PostgreSQL syntax) or similar.

## Examples [#examples]

## Cross-references [#cross-references]

# pagination (/docs/modules/inputs/database/pagination)

{/* generated by scripts/generate-module-subpages.ts */}

# authentication (/docs/modules/inputs/http-polling/authentication)

{/* generated by scripts/generate-module-subpages.ts */}

# httpPolling (/docs/modules/inputs/http-polling)

The `httpPolling` input GETs a JSON endpoint, extracts an array of records from the response, and emits each batch into the filter chain. With a `schedule`, it runs on CRON. Without one, it runs once.

## Minimal example [#minimal-example]

```yaml
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
```

## Options [#options]

## Pagination [#pagination]

`httpPolling` supports three pagination styles. Each requires a different combination of fields.
### Page-based [#page-based]

```yaml
pagination:
  type: page
  param: page            # query param name
  limitParam: per_page
  limit: 100
  totalPagesField: meta.total_pages
```

### Offset-based [#offset-based]

```yaml
pagination:
  type: offset
  param: offset
  limitParam: limit
  limit: 250
  totalField: meta.total
```

### Cursor-based [#cursor-based]

```yaml
pagination:
  type: cursor
  param: cursor
  limit: 100
  nextCursorField: meta.next_cursor
```

The runtime keeps requesting pages until the source signals there are no more.

## State persistence [#state-persistence]

To resume from where you left off between runs, configure `statePersistence`. The state file is written after every successful batch and read on startup.

```yaml
statePersistence:
  timestamp:
    enabled: true
    queryParam: updated_after
  id:
    enabled: true
    field: id
    queryParam: after_id
  storagePath: ./.cannectors-state
```

See [State persistence](/docs/concepts/state-persistence) for the full mental model and per-environment storage recommendations.

## Authentication [#authentication]

`authentication` accepts the standard four schemes: `api-key`, `bearer`, `basic`, `oauth2`. See [Authentication](/docs/concepts/authentication) for examples.

## Examples [#examples]

## Cross-references [#cross-references]

# pagination (/docs/modules/inputs/http-polling/pagination)

{/* generated by scripts/generate-module-subpages.ts */}

# retry (/docs/modules/inputs/http-polling/retry)

{/* generated by scripts/generate-module-subpages.ts */}

# statePersistence (/docs/modules/inputs/http-polling/state-persistence)

{/* generated by scripts/generate-module-subpages.ts */}

# authentication (/docs/modules/inputs/soap-polling/authentication)

{/* generated by scripts/generate-module-subpages.ts */}

# headers (/docs/modules/inputs/soap-polling/headers)

{/* generated by scripts/generate-module-subpages.ts */}

# soapPolling (/docs/modules/inputs/soap-polling)

`soapPolling` is the SOAP counterpart to `httpPolling`.
It sends a SOAP request, parses the XML response into a map, extracts records with `dataField`, and then hands those records to the filter chain. With a `schedule`, it runs on CRON. Without one, it runs once.

## Minimal example [#minimal-example]

```yaml
input:
  type: soapPolling
  endpoint: https://soap.example.com/orders/v11
  soapVersion: "1.1"
  soapAction: urn:ListOrders
  operation: ListOrders
  body: |
    ready
  dataField: Envelope.Body.ListOrdersResponse.Orders.Order
```

## Options [#options]

## Record extraction [#record-extraction]

`dataField` points into the parsed SOAP response map. For a response shaped like `Envelope.Body.ListOrdersResponse.Orders.Order`, configure that exact path:

```yaml
dataField: Envelope.Body.ListOrdersResponse.Orders.Order
```

If the selected value is an array, each item becomes one record. If it is a single object, one record is emitted.

## Pagination and state [#pagination-and-state]

SOAP pagination is configured with the same `pagination` and `statePersistence` blocks as HTTP polling. The cursor, page, or offset values are exposed through the synthetic record used to render the SOAP body.

```yaml
body: |
  {{record.pagination.cursor | default: ""}}
  100
pagination:
  type: cursor
  param: cursor
  nextCursorField: Envelope.Body.ListOrdersPageResponse.NextCursor
```

## MTOM responses [#mtom-responses]

When the SOAP response is `multipart/related`, records include received attachments under `_soapAttachments`. Each entry is keyed by Content-ID and contains `contentId`, `contentType`, and `data`.
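For illustration, a record carrying one received attachment might look like the sketch below, written as YAML only to show the shape. The `orderId` field, the Content-ID, and the `data` value are hypothetical, and how `data` is encoded is an assumption that is not stated here.

```yaml
# Illustrative record shape only; this is not pipeline configuration.
orderId: "A-1001"                        # hypothetical business field
_soapAttachments:
  "invoice-1@soap.example.com":          # hypothetical Content-ID used as the key
    contentId: "invoice-1@soap.example.com"
    contentType: application/pdf
    data: "JVBERi0xLjQK"                 # placeholder payload; encoding is assumed
```

Downstream filters would then address an attachment through the `_soapAttachments` map, using the Content-ID as the key.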
## Examples [#examples]

## Cross-references [#cross-references]

# mtom (/docs/modules/inputs/soap-polling/mtom)

{/* generated by scripts/generate-module-subpages.ts */}

# pagination (/docs/modules/inputs/soap-polling/pagination)

{/* generated by scripts/generate-module-subpages.ts */}

# retry (/docs/modules/inputs/soap-polling/retry)

{/* generated by scripts/generate-module-subpages.ts */}

# statePersistence (/docs/modules/inputs/soap-polling/state-persistence)

{/* generated by scripts/generate-module-subpages.ts */}

# wsSecurity (/docs/modules/inputs/soap-polling/ws-security)

{/* generated by scripts/generate-module-subpages.ts */}

# webhook (/docs/modules/inputs/webhook)

The `webhook` input starts an HTTP server and treats each incoming POST as a delivery to process. Each delivery is either treated as a single record, or unwrapped via `dataField` into an array of records. The process stays alive as long as the server is running; there is no `schedule` for webhook inputs.

## Minimal example [#minimal-example]

```yaml
input:
  type: webhook
  path: /webhooks/shipments
```

This starts an HTTP server listening on `0.0.0.0:8080` and routes POSTs on `/webhooks/shipments` into the filter chain.

## Options [#options]

## Signature verification [#signature-verification]

Webhook deliveries should be authenticated. Cannectors supports **HMAC-SHA256** signature verification out of the box.

```yaml
signature:
  type: hmac-sha256
  header: X-Hub-Signature-256   # optional, defaults to this
  secret: ${WEBHOOK_HMAC_SECRET}
```

The server computes HMAC-SHA256 over the raw body using the shared secret, then compares to the `header` value. Requests that don't match get rejected with `401 Unauthorized`.

## Queue and rate limiting [#queue-and-rate-limiting]

By default, the server processes deliveries synchronously: the HTTP response is held until the pipeline finishes the record.
For bursty sources, configure an in-memory queue:

```yaml
queueSize: 1024
maxConcurrent: 4
rateLimit:
  requestsPerSecond: 50
  burst: 100
```

| Field                         | What it does                                                 |
| ----------------------------- | ------------------------------------------------------------ |
| `queueSize`                   | Number of deliveries buffered before the server returns 503. |
| `maxConcurrent`               | Number of pipeline workers consuming the queue concurrently. |
| `rateLimit.requestsPerSecond` | Steady-state rate the source can deliver.                    |
| `rateLimit.burst`             | Maximum burst above the steady rate.                         |

Deliveries served by a worker run **after** the HTTP response is sent. That means the HTTP request context is already done, so the runtime uses the server's context for downstream stages; that way `http_call`/`sql_call` aren't cancelled mid-flight.

## Examples [#examples]

## Cross-references [#cross-references]

# rateLimit (/docs/modules/inputs/webhook/rate-limit)

{/* generated by scripts/generate-module-subpages.ts */}

# signature (/docs/modules/inputs/webhook/signature)

{/* generated by scripts/generate-module-subpages.ts */}

# authentication (/docs/modules/outputs/http-request/authentication)

{/* generated by scripts/generate-module-subpages.ts */}

# httpRequest (/docs/modules/outputs/http-request)

The `httpRequest` output ships records to an HTTP destination. Two request modes: **batch** (everything that reached the output in one request) or **single** (one request per record). Supports retries, authentication, and custom success conditions.
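To see how this output can pair with the `webhook` input, here is a sketch of a complete pipeline file. The pipeline `name` is hypothetical; the other keys are reused from the webhook and `httpRequest` snippets on these pages, and the top-level `name`/`input`/`output` layout follows the required fields of the pipeline schema.

```yaml
name: shipments-webhook-forwarder   # hypothetical pipeline name
input:
  type: webhook
  path: /webhooks/shipments
  signature:
    type: hmac-sha256
    secret: ${WEBHOOK_HMAC_SECRET}  # kept in the environment, not in the YAML
  queueSize: 1024
  maxConcurrent: 4
  rateLimit:
    requestsPerSecond: 50
    burst: 100
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
```

Running `cannectors validate` against a file like this before starting it is a cheap way to catch schema mismatches.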
## Minimal example [#minimal-example]

```yaml
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
```

## Options [#options]

## Request mode [#request-mode]

```yaml
requestMode: batch     # one request with all records (default)
# requestMode: single  # alternative: one request per record
```

| Mode     | When to use                                                                                  |
| -------- | -------------------------------------------------------------------------------------------- |
| `batch`  | Destination accepts arrays: fewer requests, lower overhead.                                  |
| `single` | Destination only accepts one record at a time, or you need per-record templating in the URL. |

In `single` mode, you can template the URL with `{{record.x}}`:

```yaml
endpoint: https://destination.example.com/api/orders/{{record.id}}
method: PUT
requestMode: single
```

The URL is resolved once per record, then the runtime fires one request for each.

## Success conditions [#success-conditions]

By default, Cannectors considers status codes `200`, `201`, `202`, `203`, and `204` as success. Override via the `success` block:

```yaml
success:
  statusCodes: [200, 201, 202]
```

You can also evaluate an `expr` expression against the response. The variables `statusCode` (int), `headers` (map), and `body` (parsed JSON if applicable) are exposed:

```yaml
success:
  expression: 'statusCode == 200 && body.status == "ok"'
```

When both `statusCodes` and `expression` are set, **both must hold** for the request to count as successful.

## Templating [#templating]

The body, headers, and URL path (in `single` mode) all support `{{record.x}}` templates. They're substituted at request time, once per record.
```yaml
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/customers/{{record.id}}
  method: PUT
  requestMode: single
  body: |
    {
      "id": "{{record.id}}",
      "email": "{{record.email}}",
      "tier": "{{record.tier}}"
    }
```

Or, for larger templates, externalize the body:

```yaml
bodyTemplateFile: ./templates/customer-import.json
```

## Examples [#examples]

## Cross-references [#cross-references]

# keys (/docs/modules/outputs/http-request/keys)

{/* generated by scripts/generate-module-subpages.ts */}

# retry (/docs/modules/outputs/http-request/retry)

{/* generated by scripts/generate-module-subpages.ts */}

# success (/docs/modules/outputs/http-request/success)

{/* generated by scripts/generate-module-subpages.ts */}

# authentication (/docs/modules/outputs/soap-request/authentication)

{/* generated by scripts/generate-module-subpages.ts */}

# headers (/docs/modules/outputs/soap-request/headers)

{/* generated by scripts/generate-module-subpages.ts */}

# soapRequest (/docs/modules/outputs/soap-request)

`soapRequest` is the SOAP output module. It builds a SOAP envelope from a raw XML body fragment and sends either one request for the whole batch or one request per record. It supports SOAP 1.1, SOAP 1.2, HTTP transport authentication, WS-Security UsernameToken, MTOM, retries, and success conditions.

## Minimal example [#minimal-example]

```yaml
output:
  type: soapRequest
  endpoint: https://soap.example.com/orders
  soapAction: urn:SubmitOrder
  operation: SubmitOrder
  requestMode: single
  body: |
    {{record.orderId}}
```

## Options [#options]

## Request mode [#request-mode]

```yaml
requestMode: batch     # one SOAP request for the whole batch
# requestMode: single  # alternative: one SOAP request per record
```

Use `single` when the SOAP operation accepts one business object at a time or when the body references scalar fields from each record. Use `batch` when the operation accepts an array or wrapper object.
## MTOM output [#mtom-output]

Outgoing MTOM requires both an XOP include in the XML and a matching attachment declaration:

```yaml
body: |
mtom:
  enabled: true
  attachments:
    - contentId: "{{record.documentId}}"
      contentType: application/pdf
      sourceField: documentBase64
      encoding: base64
```

Use `encoding: base64` for JSON-originated binary payloads encoded as strings.

## WS-Security [#ws-security]

```yaml
wsSecurity:
  username: soap-user
  password: ${SOAP_PASSWORD}
  passwordType: PasswordDigest
  mustUnderstand: true
```

Only UsernameToken `PasswordText` and `PasswordDigest` are supported. Use the standard `authentication` block separately when the HTTP transport itself needs Basic, bearer, API key, or OAuth2 authentication.

## Success conditions [#success-conditions]

By default, successful SOAP requests are 2xx responses with no SOAP fault. Add a `success` block when a destination distinguishes multiple successful 2xx statuses or requires a response-body expression.

```yaml
success:
  statusCodes: [200, 202]
```

Non-2xx responses with no SOAP fault are treated as HTTP errors.

## Examples [#examples]

## Cross-references [#cross-references]

# mtom (/docs/modules/outputs/soap-request/mtom)

{/* generated by scripts/generate-module-subpages.ts */}

# retry (/docs/modules/outputs/soap-request/retry)

{/* generated by scripts/generate-module-subpages.ts */}

# success (/docs/modules/outputs/soap-request/success)

{/* generated by scripts/generate-module-subpages.ts */}

# wsSecurity (/docs/modules/outputs/soap-request/ws-security)

{/* generated by scripts/generate-module-subpages.ts */}

# Raw JSON Schemas

These schemas are vendored from the Cannectors repository and are authoritative for pipeline generation.
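As a rough sketch of a document these schemas are meant to accept, the pipeline below sticks to keys that appear in the schemas or in the module pages: the required top-level `name`, `input`, and `output`, plus `mapping`, `set`, and `remove` filters. The pipeline name, field paths, and literal values are hypothetical.

```yaml
name: orders-sync                    # hypothetical; `name` is required
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
filters:
  - type: mapping
    mappings:
      - source: customer.email       # hypothetical source path
        target: email
        transforms:
          - op: trim
          - op: lowercase
  - type: set
    target: metadata.source          # dot notation creates a nested path
    value: orders-api                # literal value; the type is preserved
  - type: remove
    target: [internalNotes]          # hypothetical field; a single string also works
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
```

Treat it as a starting template rather than a guaranteed-valid file: the schemas remain the contract, and `cannectors validate` is the way to confirm a concrete pipeline against them.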
## schemas/cannectors/pipeline-schema.json ```json { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://cannectors.io/schemas/pipeline/v1.1.0/pipeline-schema.json", "title": "Cannectors Pipeline Configuration Schema", "description": "Schema for defining declarative modular pipeline configurations. Supports Input → Filter → Output pattern with extensible module types.", "type": "object", "required": [ "name", "input", "output" ], "properties": { "id": { "type": "string", "description": "Optional pipeline identifier. Defaults to name when omitted.", "pattern": "^[a-z][a-z0-9_-]*$" }, "name": { "type": "string", "description": "Unique identifier for this connector.", "minLength": 1, "maxLength": 128 }, "version": { "type": "string", "description": "Connector version. Free-form non-empty string.", "minLength": 1 }, "description": { "type": "string", "maxLength": 1024 }, "tags": { "type": "array", "description": "User-facing tags. Accepted but ignored by the runtime.", "items": { "type": "string" }, "uniqueItems": true }, "enabled": { "type": "boolean", "description": "Whether the pipeline is active.", "default": true }, "dryRunOptions": { "type": "object", "description": "Dry-run mode options.", "properties": { "showCredentials": { "type": "boolean", "description": "When true, dry-run shows resolved credentials instead of masked values.", "default": false } }, "additionalProperties": false }, "defaults": { "$ref": "common-schema.json#/$defs/moduleDefaults", "description": "Default settings for all modules. Module-level settings override these." 
}, "input": { "$ref": "input-schema.json#/$defs/inputModule" }, "filters": { "type": "array", "items": { "$ref": "filter-schema.json#/$defs/filterModule" } }, "output": { "$ref": "output-schema.json#/$defs/outputModule" } }, "additionalProperties": false } ``` ## schemas/cannectors/input-schema.json ```json { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://cannectors.io/schemas/pipeline/v1.1.0/input-schema.json", "title": "Cannectors Input Module Schema", "description": "Schema definitions for input modules.", "$defs": { "inputModule": { "type": "object", "description": "Input module configuration.", "required": [ "type" ], "unevaluatedProperties": false, "properties": { "type": { "type": "string", "description": "Input type. Canonical: httpPolling, soapPolling, webhook, database.", "minLength": 1 } }, "allOf": [ { "$ref": "common-schema.json#/$defs/moduleBase" }, { "oneOf": [ { "allOf": [ { "$ref": "#/$defs/webhookInputConfig" }, { "properties": { "type": { "const": "webhook" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/httpPollingInputConfig" }, { "properties": { "type": { "const": "httpPolling" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/soapPollingInputConfig" }, { "properties": { "type": { "const": "soapPolling" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/databaseInputConfig" }, { "properties": { "type": { "const": "database" } }, "required": [ "type" ] } ] } ] } ] }, "httpPollingInputConfig": { "allOf": [ { "$ref": "common-schema.json#/$defs/httpRequestBase" }, { "type": "object", "description": "HTTP polling input module configuration. Without 'schedule', runs once. With 'schedule', registered with the CRON scheduler.", "properties": { "schedule": { "type": "string", "description": "Optional CRON expression for polling. 
Validated at runtime.", "minLength": 9 }, "pagination": { "$ref": "common-schema.json#/$defs/pagination" }, "statePersistence": { "$ref": "common-schema.json#/$defs/statePersistenceConfig" }, "dataField": { "type": "string", "description": "JSON field path containing the array of records to extract from the response." }, "retry": { "$ref": "common-schema.json#/$defs/retryConfig" } } } ] }, "soapPollingInputConfig": { "allOf": [ { "$ref": "common-schema.json#/$defs/soapRequestBase" }, { "type": "object", "description": "SOAP polling input module configuration. Without 'schedule', runs once. With 'schedule', registered with the CRON scheduler.", "properties": { "schedule": { "type": "string", "description": "Optional CRON expression for polling. Validated at runtime.", "minLength": 9 }, "pagination": { "$ref": "common-schema.json#/$defs/pagination" }, "statePersistence": { "$ref": "common-schema.json#/$defs/statePersistenceConfig" }, "dataField": { "type": "string", "description": "XML map field path containing the array of records to extract from the SOAP response." }, "retry": { "$ref": "common-schema.json#/$defs/retryConfig" } } } ] }, "webhookInputConfig": { "type": "object", "description": "Webhook input module configuration. Listens for incoming HTTP requests.", "required": [ "path" ], "properties": { "path": { "type": "string", "description": "Webhook endpoint path (e.g. /webhook/orders)." }, "listenAddress": { "type": "string", "description": "Webhook server listen address.", "default": "0.0.0.0:8080" }, "requestTimeoutMs": { "type": "integer", "minimum": 1, "description": "Per-request HTTP read/write timeout in milliseconds. Does NOT bound the webhook server lifetime." 
}, "signature": { "$ref": "#/$defs/webhookSignature" }, "rateLimit": { "$ref": "#/$defs/webhookRateLimit" }, "queueSize": { "type": "integer", "description": "Queue size for async webhook processing.", "minimum": 1 }, "maxConcurrent": { "type": "integer", "description": "Maximum concurrent webhook workers.", "minimum": 1 }, "dataField": { "type": "string", "description": "JSON field path containing the array of records to extract from the request body." } } }, "databaseInputConfig": { "type": "object", "description": "Database input module configuration. Executes SQL queries to fetch data. Supports {{lastRunTimestamp}} placeholder for incremental queries. Without 'schedule', runs once. With 'schedule', registered with the CRON scheduler.", "allOf": [ { "$ref": "common-schema.json#/$defs/sqlRequestBase" } ], "properties": { "schedule": { "type": "string", "description": "Optional CRON expression to run the database input on a schedule. Validated at runtime.", "minLength": 9 }, "parameters": { "type": "object", "description": "Query parameters. Keys are parameter names (used with :paramName syntax).", "additionalProperties": true }, "pagination": { "$ref": "common-schema.json#/$defs/databasePaginationConfig" }, "incremental": { "$ref": "common-schema.json#/$defs/databaseIncrementalConfig" } } }, "webhookSignature": { "type": "object", "description": "Webhook signature validation configuration.", "properties": { "type": { "type": "string", "description": "Signature algorithm.", "enum": [ "hmac-sha256" ] }, "header": { "type": "string", "minLength": 1, "description": "Header name containing the signature. Defaults to X-Hub-Signature-256 when omitted." }, "secret": { "type": "string", "minLength": 1, "description": "Secret key for signature validation." 
} }, "required": [ "type", "secret" ], "additionalProperties": false }, "webhookRateLimit": { "type": "object", "description": "Webhook rate limiting configuration.", "properties": { "requestsPerSecond": { "type": "integer", "description": "Maximum requests per second.", "minimum": 1 }, "burst": { "type": "integer", "description": "Maximum burst size.", "minimum": 1 } }, "required": [ "requestsPerSecond" ], "additionalProperties": false } } } ``` ## schemas/cannectors/filter-schema.json ```json { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://cannectors.io/schemas/pipeline/v1.1.0/filter-schema.json", "title": "Cannectors Filter Module Schema", "description": "Schema definitions for filter modules.", "$defs": { "filterModule": { "type": "object", "description": "Filter module configuration.", "required": [ "type" ], "unevaluatedProperties": false, "properties": { "type": { "type": "string", "description": "Filter type. Canonical: mapping, condition, drop, loop, set, remove, script, http_call, soap_call, sql_call.", "minLength": 1 } }, "allOf": [ { "$ref": "common-schema.json#/$defs/moduleBase" }, { "oneOf": [ { "allOf": [ { "$ref": "#/$defs/mappingFilterConfig" }, { "properties": { "type": { "const": "mapping" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/conditionFilterConfig" }, { "properties": { "type": { "const": "condition" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/sqlCallFilterConfig" }, { "properties": { "type": { "const": "sql_call" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/scriptFilterConfig" }, { "properties": { "type": { "const": "script" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/httpCallFilterConfig" }, { "properties": { "type": { "const": "http_call" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/soapCallFilterConfig" }, { "properties": { "type": { "const": "soap_call" } }, "required": [ "type" ] } ] }, { 
"allOf": [ { "$ref": "#/$defs/setFilterConfig" }, { "properties": { "type": { "const": "set" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/removeFilterConfig" }, { "properties": { "type": { "const": "remove" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/dropFilterConfig" }, { "properties": { "type": { "const": "drop" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/loopFilterConfig" }, { "properties": { "type": { "const": "loop" } }, "required": [ "type" ] } ] } ] } ] }, "mappingFilterConfig": { "type": "object", "description": "Mapping filter module configuration. Maps fields from source to target paths.", "required": [ "mappings" ], "properties": { "mappings": { "type": "array", "items": { "$ref": "#/$defs/fieldMapping" } } } }, "conditionFilterConfig": { "type": "object", "description": "Condition filter module. Routes records based on an expression evaluated against each record. Records matching the expression go through the 'then' branch; others go through 'else'. An absent branch keeps the record unchanged. To drop records use the explicit 'drop' filter inside a branch.", "required": [ "expression" ], "properties": { "expression": { "type": "string", "minLength": 1, "description": "Boolean expression evaluated against each record." }, "then": { "type": "array", "description": "Nested filters to execute when condition is true.", "items": { "$ref": "#/$defs/filterModule" } }, "else": { "type": "array", "description": "Nested filters to execute when condition is false.", "items": { "$ref": "#/$defs/filterModule" } } } }, "dropFilterConfig": { "type": "object", "description": "Drop filter module. Removes every record it receives from the stream. Typically placed inside a condition branch to filter records out explicitly." }, "loopFilterConfig": { "type": "object", "description": "Loop filter module. Iterates over an array field on each record, running a nested filter chain per item. 
The current item is exposed under the configured 'itemName' alias; the root record stays available as 'record'. Loop metadata is exposed read-only at '_metadata.loop..index'.", "required": [ "field", "itemName", "filters" ], "properties": { "field": { "type": "string", "minLength": 1, "description": "Path to the array field, relative to the input record (root record for the outermost loop, parent scope for nested loops). Supports dot notation." }, "itemName": { "type": "string", "minLength": 1, "description": "Alias exposed to nested filters for the current item. Cannot be 'record', '_metadata', or 'loop', and must not duplicate an active parent loop alias.", "not": { "enum": [ "record", "_metadata", "loop" ] } }, "filters": { "type": "array", "description": "Nested filters executed for every item.", "items": { "$ref": "#/$defs/filterModule" } } } }, "setFilterConfig": { "type": "object", "description": "Set filter module configuration. Sets a field to a literal value on each record (create or overwrite).", "required": [ "target", "value" ], "properties": { "target": { "type": "string", "minLength": 1, "description": "Field path to set (e.g. id, user.id, metadata.version). Supports dot notation for nested paths." }, "value": { "description": "Literal value to set. Can be string, number, boolean, or null. Type is preserved." } } }, "removeFilterConfig": { "type": "object", "description": "Remove filter module configuration. Removes one or more fields from each record.", "required": [ "target" ], "properties": { "target": { "description": "Field path(s) to remove. Either a single non-empty string or a non-empty array of non-empty strings. Supports dot notation for nested paths.", "oneOf": [ { "type": "string", "minLength": 1 }, { "type": "array", "items": { "type": "string", "minLength": 1 }, "minItems": 1 } ] } } }, "scriptFilterConfig": { "type": "object", "description": "Script filter module configuration. 
Executes JavaScript transformations using Goja.", "properties": { "script": { "type": "string", "minLength": 1, "description": "Inline JavaScript source code containing a transform(record) function." }, "scriptFile": { "type": "string", "minLength": 1, "description": "Path to JavaScript file containing the transform(record) function." } }, "oneOf": [ { "required": [ "script" ], "not": { "required": [ "scriptFile" ] } }, { "required": [ "scriptFile" ], "not": { "required": [ "script" ] } } ] }, "httpCallFilterConfig": { "allOf": [ { "$ref": "common-schema.json#/$defs/httpRequestBase" }, { "type": "object", "description": "HTTP call filter module configuration. Makes HTTP requests to external APIs for record enrichment.", "properties": { "keys": { "type": "array", "description": "List of key configurations for extracting values from records and using them in requests.", "items": { "$ref": "common-schema.json#/$defs/httpCallKeyConfig" } }, "cache": { "$ref": "#/$defs/cacheConfig" }, "mergeStrategy": { "type": "string", "description": "How to merge HTTP response with input records.", "enum": [ "merge", "replace", "append" ], "default": "merge" }, "dataField": { "type": "string", "description": "Field to extract from HTTP response." }, "retry": { "$ref": "common-schema.json#/$defs/retryConfig" } } } ] }, "soapCallFilterConfig": { "allOf": [ { "$ref": "common-schema.json#/$defs/soapRequestBase" }, { "type": "object", "description": "SOAP call filter module configuration. 
Makes SOAP requests to external APIs for record enrichment.", "properties": { "keys": { "type": "array", "description": "List of key configurations for extracting values from records and using them in request metadata.", "items": { "$ref": "common-schema.json#/$defs/httpCallKeyConfig" } }, "cache": { "$ref": "#/$defs/cacheConfig" }, "mergeStrategy": { "type": "string", "description": "How to merge SOAP response with input records.", "enum": [ "merge", "replace", "append" ], "default": "merge" }, "resultKey": { "type": "string", "minLength": 1, "description": "Key for storing the SOAP response when append mode is used." }, "dataField": { "type": "string", "description": "Field path to extract from the parsed SOAP response map before merge/replace/append. When omitted, the full parsed SOAP response is used." }, "retry": { "$ref": "common-schema.json#/$defs/retryConfig" } } }, { "if": { "properties": { "mergeStrategy": { "const": "append" } }, "required": [ "mergeStrategy" ] }, "then": { "required": [ "resultKey" ] } } ] }, "sqlCallFilterConfig": { "type": "object", "description": "SQL call filter module configuration. Executes SQL queries for record enrichment.", "allOf": [ { "$ref": "common-schema.json#/$defs/sqlRequestBase" }, { "if": { "properties": { "mergeStrategy": { "const": "append" } }, "required": [ "mergeStrategy" ] }, "then": { "required": [ "resultKey" ] } }, { "not": { "required": [ "retry" ] } } ], "properties": { "mergeStrategy": { "type": "string", "description": "How to merge query results with input records.", "enum": [ "merge", "replace", "append" ], "default": "merge" }, "resultKey": { "type": "string", "minLength": 1, "description": "Key for storing result in append mode." 
}, "cache": { "$ref": "#/$defs/cacheConfig" } } }, "cacheConfig": { "type": "object", "description": "Cache configuration for SQL call filter.", "properties": { "enabled": { "type": "boolean", "description": "Enable caching of query results.", "default": false }, "maxSize": { "type": "integer", "description": "Maximum number of cached entries.", "minimum": 1, "default": 1000 }, "ttlSeconds": { "type": "integer", "description": "Cache entry TTL in seconds.", "minimum": 1, "default": 300 }, "key": { "type": "string", "minLength": 1, "description": "Cache key template using {{record.field}} syntax." } }, "additionalProperties": false }, "fieldMapping": { "type": "object", "description": "Field mapping. Requires target. Omit source to delete the target field.", "required": [ "target" ], "properties": { "source": { "type": "string", "minLength": 1, "description": "Source field path. Omit to delete the target field." }, "target": { "type": "string", "minLength": 1, "description": "Target field path." }, "transforms": { "type": "array", "items": { "$ref": "#/$defs/transformOp" } }, "defaultValue": {}, "onMissing": { "type": "string", "enum": [ "setNull", "skipField", "useDefault", "fail" ], "default": "setNull" } }, "additionalProperties": false, "allOf": [ { "if": { "properties": { "onMissing": { "const": "useDefault" } }, "required": [ "onMissing" ] }, "then": { "required": [ "defaultValue" ] } } ] }, "transformOp": { "type": "object", "required": [ "op" ], "properties": { "op": { "type": "string", "description": "Operation name.", "enum": [ "trim", "lowercase", "uppercase", "dateFormat", "replace", "split", "join", "toString", "toInt", "toFloat", "toBool", "toArray", "toObject" ] }, "format": { "type": "string", "description": "Date format pattern." }, "pattern": { "type": "string", "minLength": 1, "description": "Regex pattern for replace/match operations." }, "replacement": { "type": "string", "description": "Replacement string." 
}, "separator": { "type": "string", "description": "Separator for split/join operations." } }, "additionalProperties": false, "allOf": [ { "if": { "properties": { "op": { "const": "replace" } }, "required": [ "op" ] }, "then": { "required": [ "pattern" ] } } ] } } }
```

## schemas/cannectors/output-schema.json

```json
{ "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://cannectors.io/schemas/pipeline/v1.1.0/output-schema.json", "title": "Cannectors Output Module Schema", "description": "Schema definitions for output modules.", "$defs": { "outputModule": { "type": "object", "description": "Output module configuration.", "required": [ "type" ], "unevaluatedProperties": false, "properties": { "type": { "type": "string", "description": "Output type. Canonical: httpRequest, soapRequest, database.", "minLength": 1 } }, "allOf": [ { "$ref": "common-schema.json#/$defs/moduleBase" }, { "oneOf": [ { "allOf": [ { "$ref": "#/$defs/httpRequestOutputConfig" }, { "properties": { "type": { "const": "httpRequest" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/soapRequestOutputConfig" }, { "properties": { "type": { "const": "soapRequest" } }, "required": [ "type" ] } ] }, { "allOf": [ { "$ref": "#/$defs/databaseOutputConfig" }, { "properties": { "type": { "const": "database" } }, "required": [ "type" ] } ] } ] } ] }, "httpRequestOutputConfig": { "allOf": [ { "$ref": "common-schema.json#/$defs/httpRequestBase" }, { "type": "object", "description": "HTTP request output module configuration.
Sends records to an HTTP endpoint.", "properties": { "success": { "$ref": "#/$defs/successCondition" }, "retry": { "$ref": "common-schema.json#/$defs/retryConfig" }, "keys": { "type": "array", "description": "List of key configurations for extracting values from records and using them in path, query, or headers.", "items": { "$ref": "common-schema.json#/$defs/httpCallKeyConfig" } }, "requestMode": { "type": "string", "description": "Request mode: 'batch' (all records in one request, default) or 'single' (one request per record).", "enum": [ "batch", "single" ], "default": "batch" } } } ] }, "soapRequestOutputConfig": { "allOf": [ { "$ref": "common-schema.json#/$defs/soapRequestBase" }, { "type": "object", "description": "SOAP request output module configuration. Sends records to a SOAP endpoint.", "properties": { "success": { "$ref": "#/$defs/successCondition" }, "retry": { "$ref": "common-schema.json#/$defs/retryConfig" }, "requestMode": { "type": "string", "description": "Request mode: 'batch' (all records in one request, default) or 'single' (one request per record).", "enum": [ "batch", "single" ], "default": "batch" } } } ] }, "databaseOutputConfig": { "type": "object", "description": "Database output module configuration. Executes SQL queries to write records.", "allOf": [ { "$ref": "common-schema.json#/$defs/sqlRequestBase" } ], "properties": { "transaction": { "type": "boolean", "description": "Wrap operations in a transaction.", "default": false } } }, "successCondition": { "type": "object", "description": "Determines when an HTTP response is considered successful. When omitted, success defaults to status codes [200, 201, 202, 203, 204]. When `expression` and `statusCodes` are both provided, both must be true. 
When only `expression` is provided, it alone determines success.", "additionalProperties": false, "anyOf": [ { "required": [ "expression" ] }, { "required": [ "statusCodes" ] } ], "properties": { "expression": { "type": "string", "description": "expr expression evaluated against the response. Available variables: `statusCode` (int), `headers` (map[string][]string), `body` (parsed JSON when applicable). Must return a boolean.", "minLength": 1 }, "statusCodes": { "type": "array", "description": "HTTP status codes considered successful.", "items": { "type": "integer", "minimum": 100, "maximum": 599 }, "minItems": 1, "uniqueItems": true, "default": [ 200, 201, 202, 203, 204 ] } } } } }
```

## schemas/cannectors/auth-schema.json

```json
{ "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://cannectors.io/schemas/pipeline/v1.1.0/auth-schema.json", "title": "Cannectors Authentication Schema Definitions", "description": "Authentication definitions shared across pipeline modules.", "$defs": { "auth": { "description": "Authentication configuration.
Supports api-key, bearer, basic, and oauth2 types.", "oneOf": [ { "$ref": "#/$defs/authApiKey" }, { "$ref": "#/$defs/authBearer" }, { "$ref": "#/$defs/authBasic" }, { "$ref": "#/$defs/authOAuth2" } ] }, "authApiKey": { "type": "object", "description": "API key authentication.", "required": [ "type", "credentials" ], "properties": { "type": { "const": "api-key" }, "credentials": { "$ref": "#/$defs/credentialsApiKey" } }, "additionalProperties": false }, "authBearer": { "type": "object", "description": "Bearer token authentication.", "required": [ "type", "credentials" ], "properties": { "type": { "const": "bearer" }, "credentials": { "$ref": "#/$defs/credentialsBearer" } }, "additionalProperties": false }, "authBasic": { "type": "object", "description": "HTTP Basic authentication.", "required": [ "type", "credentials" ], "properties": { "type": { "const": "basic" }, "credentials": { "$ref": "#/$defs/credentialsBasic" } }, "additionalProperties": false }, "authOAuth2": { "type": "object", "description": "OAuth2 client credentials authentication.", "required": [ "type", "credentials" ], "properties": { "type": { "const": "oauth2" }, "credentials": { "$ref": "#/$defs/credentialsOAuth2" } }, "additionalProperties": false }, "credentialsApiKey": { "type": "object", "description": "API key credentials.", "required": [ "key" ], "properties": { "key": { "type": "string", "description": "API key value or env reference (${VAR})." }, "location": { "type": "string", "enum": [ "header", "query" ], "default": "header", "description": "Where to send the key." }, "headerName": { "type": "string", "default": "X-API-Key", "description": "Header name when location=header." }, "paramName": { "type": "string", "default": "api_key", "description": "Query param name when location=query." 
} }, "additionalProperties": false }, "credentialsBearer": { "type": "object", "description": "Bearer token credentials.", "required": [ "token" ], "properties": { "token": { "type": "string", "description": "Bearer token value or env reference (${VAR})." } }, "additionalProperties": false }, "credentialsBasic": { "type": "object", "description": "Basic auth credentials.", "required": [ "username", "password" ], "properties": { "username": { "type": "string", "description": "Username or env reference (${VAR})." }, "password": { "type": "string", "description": "Password or env reference (${VAR})." } }, "additionalProperties": false }, "credentialsOAuth2": { "type": "object", "description": "OAuth2 client credentials.", "required": [ "tokenUrl", "clientId", "clientSecret" ], "properties": { "tokenUrl": { "type": "string", "format": "uri", "description": "Token endpoint URL." }, "clientId": { "type": "string", "description": "Client ID or env reference (${VAR})." }, "clientSecret": { "type": "string", "description": "Client secret or env reference (${VAR})." }, "scope": { "type": "string", "description": "OAuth2 scopes as a space-separated string (RFC 6749 §3.3)." 
} }, "additionalProperties": false } } }
```

## schemas/cannectors/common-schema.json

```json
{ "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://cannectors.io/schemas/pipeline/v1.1.0/common-schema.json", "title": "Cannectors Common Schema Definitions", "description": "Common definitions shared across pipeline modules.", "$defs": { "moduleBase": { "type": "object", "description": "Common properties for all modules.", "properties": { "id": { "type": "string", "description": "Unique identifier within the pipeline.", "pattern": "^[a-z][a-z0-9_-]*$" }, "name": { "type": "string", "description": "Human-readable name.", "maxLength": 128 }, "description": { "type": "string", "maxLength": 1024 }, "enabled": { "type": "boolean", "description": "Whether module is active.", "default": true }, "tags": { "type": "array", "items": { "type": "string" } }, "onError": { "type": "string", "description": "Default error action. Case-insensitive; normalized to lowercase by the runtime.", "pattern": "^(?i)(fail|skip|log)$", "default": "fail" } } }, "moduleDefaults": { "type": "object", "description": "Default settings applied to Input and Output modules unless overridden at module level.", "properties": { "timeoutMs": { "type": "integer", "description": "Default timeout in milliseconds.", "minimum": 0, "default": 30000 }, "retry": { "$ref": "#/$defs/retryConfig" }, "onError": { "type": "string", "description": "Default error handling strategy. Case-insensitive; normalized to lowercase by the runtime.", "pattern": "^(?i)(fail|skip|log)$" } }, "additionalProperties": false }, "retryConfig": { "type": "object", "description": "Retry configuration.
Precedence: module > defaults.", "properties": { "maxAttempts": { "type": "integer", "description": "Maximum retry attempts (0 = no retry).", "minimum": 0, "maximum": 10, "default": 3 }, "delayMs": { "type": "integer", "description": "Initial delay between retries in milliseconds.", "minimum": 0, "default": 1000 }, "backoffMultiplier": { "type": "number", "description": "Multiplier for exponential backoff.", "minimum": 1, "maximum": 10, "default": 2 }, "maxDelayMs": { "type": "integer", "description": "Maximum delay between retries.", "minimum": 0, "default": 30000 }, "retryableStatusCodes": { "type": "array", "description": "HTTP status codes that trigger retry.", "items": { "type": "integer", "minimum": 100, "maximum": 599 }, "default": [ 429, 500, 502, 503, 504 ] }, "useRetryAfterHeader": { "type": "boolean", "description": "Use Retry-After response header to determine delay before retrying. Supports seconds format (e.g., '120') or HTTP-date format (e.g., 'Wed, 31 Dec 2025 23:59:59 GMT'). The delay is capped by maxDelayMs. If header is invalid or absent, falls back to exponential backoff.", "default": false }, "retryHintFromBody": { "type": "string", "description": "expr expression to evaluate against JSON response body. The parsed JSON body is available as the 'body' variable. If expression returns true, error is retryable (if status code is in retryableStatusCodes). If false, error is NOT retryable (overrides status code). If body is not valid JSON, falls back to status code only. Example: 'body.retryable == true' or 'body.error.code != \"PERMANENT\"'.", "default": "" } }, "additionalProperties": false }, "httpMethod": { "type": "string", "description": "HTTP method. Any RFC 7230 token is accepted (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS, custom verbs, etc.).
The runtime normalizes to upper-case and rejects malformed tokens.", "pattern": "^[!#$%&'*+\\-.^_`|~0-9A-Za-z]+$", "minLength": 1 }, "httpHeaders": { "type": "object", "additionalProperties": { "type": "string" } }, "httpCallKeyConfig": { "type": "object", "description": "Key configuration for extracting a value from a record and using it in HTTP requests (path, query, or header).", "required": [ "field", "paramType", "paramName" ], "additionalProperties": false, "properties": { "field": { "type": "string", "description": "Dot-notation path to extract value from record (e.g., 'customer.id').", "minLength": 1 }, "paramType": { "type": "string", "description": "How to include value in request.", "enum": [ "query", "path", "header" ] }, "paramName": { "type": "string", "description": "Parameter name (path placeholder, query param, or header name).", "minLength": 1 } } }, "httpRequestBase": { "type": "object", "description": "Common HTTP request configuration shared by httpPolling, httpCall, and httpRequest modules.", "required": [ "endpoint" ], "properties": { "endpoint": { "type": "string", "description": "HTTP endpoint URL. Templates are accepted; the final URL is validated at runtime after template resolution.", "minLength": 1 }, "method": { "$ref": "#/$defs/httpMethod" }, "headers": { "$ref": "#/$defs/httpHeaders" }, "queryParams": { "type": "object", "description": "Query parameters.", "additionalProperties": { "type": "string" } }, "body": { "type": "string", "description": "Inline request body. Templates ({{record.field}}) are evaluated at runtime. The body is sent regardless of the HTTP method." }, "bodyTemplateFile": { "type": "string", "description": "Path to an external template file used as the request body. Mutual exclusivity with `body` is not enforced; if both are set, the inline body wins.", "minLength": 1 }, "authentication": { "$ref": "auth-schema.json#/$defs/auth" }, "timeoutMs": { "type": "integer", "description": "Request timeout in milliseconds.
When omitted, each module applies its own runtime default.", "minimum": 1 } } }, "soapRequestBase": { "type": "object", "description": "Common SOAP request configuration shared by soapPolling, soap_call, and soapRequest modules. The body is raw XML; no WSDL parsing is performed at runtime.", "required": [ "endpoint", "operation", "body" ], "properties": { "endpoint": { "type": "string", "description": "SOAP endpoint URL. Templates are accepted; the final URL is validated at runtime after template resolution.", "minLength": 1 }, "soapVersion": { "type": "string", "description": "SOAP envelope and HTTP binding version.", "enum": [ "1.1", "1.2" ], "default": "1.1" }, "soapAction": { "type": "string", "description": "SOAP action. SOAP 1.1 sends this as the SOAPAction header; SOAP 1.2 sends it as a Content-Type action parameter." }, "operation": { "type": "string", "description": "Logical SOAP operation name.", "minLength": 1 }, "body": { "type": "string", "description": "Raw XML body fragment. Templates ({{record.field}}) are XML-escaped at runtime.", "minLength": 1 }, "headers": { "type": "array", "description": "Raw SOAP header XML fragments.", "items": { "$ref": "#/$defs/soapHeaderTemplate" } }, "authentication": { "$ref": "auth-schema.json#/$defs/auth", "description": "HTTP transport authentication." }, "wsSecurity": { "$ref": "#/$defs/soapSecurityConfig" }, "mtom": { "$ref": "#/$defs/soapMTOMConfig" }, "httpHeaders": { "$ref": "#/$defs/httpHeaders", "description": "Additional HTTP headers. Content-Type and SOAPAction are controlled by the SOAP version." }, "timeoutMs": { "type": "integer", "description": "Request timeout in milliseconds. 
When omitted, each module applies its own runtime default.", "minimum": 1 } }, "allOf": [ { "if": { "properties": { "soapVersion": { "const": "1.2" } }, "required": [ "soapVersion" ] }, "then": { "properties": { "httpHeaders": { "propertyNames": { "not": { "pattern": "^(?i)soapaction$" } } } } } } ] }, "soapHeaderTemplate": { "type": "object", "description": "Raw SOAP header XML fragment.", "required": [ "xml" ], "additionalProperties": false, "properties": { "xml": { "type": "string", "minLength": 1 } } }, "soapSecurityConfig": { "type": "object", "description": "WS-Security UsernameToken configuration. Only PasswordText and PasswordDigest are supported.", "required": [ "username", "password" ], "additionalProperties": false, "properties": { "username": { "type": "string", "minLength": 1 }, "password": { "type": "string", "minLength": 1 }, "passwordType": { "type": "string", "enum": [ "PasswordText", "PasswordDigest" ], "default": "PasswordText" }, "tokenId": { "type": "string", "minLength": 1 }, "mustUnderstand": { "type": "boolean", "default": false } } }, "soapMTOMConfig": { "type": "object", "description": "MTOM/XOP configuration. The raw XML body must contain matching xop:Include cid references for outgoing attachments.", "additionalProperties": false, "properties": { "enabled": { "type": "boolean", "default": false }, "attachments": { "type": "array", "items": { "$ref": "#/$defs/soapAttachmentTemplate" } } } }, "soapAttachmentTemplate": { "type": "object", "description": "Outgoing MTOM attachment template.", "required": [ "contentId", "contentType", "sourceField" ], "additionalProperties": false, "properties": { "contentId": { "type": "string", "minLength": 1 }, "contentType": { "type": "string", "minLength": 1 }, "sourceField": { "type": "string", "description": "Record path containing the attachment bytes.", "minLength": 1 }, "encoding": { "type": "string", "description": "Encoding of string values read from sourceField. 
'binary' sends the string bytes as-is; 'base64' decodes the string before attaching it.", "enum": [ "binary", "base64" ], "default": "binary" } } }, "pagination": { "type": "object", "description": "Pagination configuration for HTTP polling.", "unevaluatedProperties": false, "required": [ "type", "param" ], "properties": { "type": { "type": "string", "description": "Pagination type.", "enum": [ "cursor", "offset", "page" ] }, "param": { "type": "string", "minLength": 1, "description": "Canonical query parameter name carrying the cursor, offset, or page value." }, "limitParam": { "type": "string", "minLength": 1, "description": "Query parameter name for the page size (when applicable)." }, "limit": { "type": "integer", "minimum": 1, "description": "Number of items per page. When omitted, the runtime applies its default." }, "nextCursorField": { "type": "string", "description": "Field containing the next cursor value (cursor pagination)." }, "totalPagesField": { "type": "string", "description": "Field containing the total pages count (page pagination)." }, "totalField": { "type": "string", "description": "Field containing the total items count (offset pagination)." } }, "oneOf": [ { "properties": { "type": { "const": "cursor" } }, "required": [ "nextCursorField" ] }, { "properties": { "type": { "const": "page" } }, "required": [ "totalPagesField" ] }, { "properties": { "type": { "const": "offset" } }, "required": [ "limitParam", "totalField" ] } ] }, "databaseConnectionConfig": { "type": "object", "description": "Database connection configuration. Supports PostgreSQL, MySQL, SQLite.", "properties": { "connectionString": { "type": "string", "minLength": 1, "description": "Database connection string (DSN). 
Avoid in production - use connectionStringRef instead.", "examples": [ "postgres://user:pass@host:5432/db?sslmode=require", "user:pass@tcp(host:3306)/db?tls=true", "file:./database.db" ] }, "connectionStringRef": { "type": "string", "description": "Environment variable reference for connection string. Format: ${ENV_VAR_NAME}", "pattern": "^\\$\\{[A-Z_][A-Z0-9_]*\\}$", "examples": [ "${DATABASE_URL}", "${POSTGRES_CONNECTION_STRING}" ] }, "driver": { "type": "string", "description": "Database driver. Auto-detected from connection string if not specified.", "enum": [ "postgres", "mysql", "sqlite" ] }, "maxOpenConns": { "type": "integer", "description": "Maximum number of open connections in the pool.", "minimum": 1, "default": 10 }, "maxIdleConns": { "type": "integer", "description": "Maximum number of idle connections in the pool.", "minimum": 1, "default": 5 }, "connMaxLifetimeSeconds": { "type": "integer", "description": "Maximum lifetime of a connection in seconds.", "minimum": 1, "default": 1800 }, "connMaxIdleTimeSeconds": { "type": "integer", "description": "Maximum idle time for a connection in seconds.", "minimum": 1, "default": 300 }, "timeoutMs": { "type": "integer", "description": "Query timeout in milliseconds.", "minimum": 1, "default": 30000 } }, "oneOf": [ { "required": [ "connectionString" ] }, { "required": [ "connectionStringRef" ] } ] }, "sqlRequestBase": { "type": "object", "description": "Base configuration for SQL-based modules. Combines database connection with query or queryFile.", "properties": { "query": { "type": "string", "minLength": 1, "description": "SQL query. Supports module-specific placeholders (e.g. {{record.field}}, {{lastRunTimestamp}})." }, "queryFile": { "type": "string", "minLength": 1, "description": "Path to SQL file. Supports module-specific placeholders." 
} }, "allOf": [ { "$ref": "#/$defs/databaseConnectionConfig" }, { "oneOf": [ { "required": [ "query" ] }, { "required": [ "queryFile" ] } ] }, { "not": { "required": [ "query", "queryFile" ] } } ] }, "databasePaginationConfig": { "type": "object", "description": "Pagination configuration for database queries. Uses canonical 'param' field for both cursor and limit-offset placeholders.", "unevaluatedProperties": false, "properties": { "type": { "type": "string", "description": "Pagination type.", "enum": [ "limit-offset", "cursor" ] }, "limit": { "type": "integer", "description": "Number of records per page. Optional; runtime applies default when omitted.", "minimum": 1 }, "param": { "type": "string", "minLength": 1, "description": "Named placeholder used in the SQL query to inject the cursor or offset value (e.g. ':cursor' / ':offset')." }, "cursorField": { "type": "string", "minLength": 1, "description": "Field of the previous page's last record used as the next cursor value (cursor pagination only)." } }, "allOf": [ { "required": [ "type" ] }, { "oneOf": [ { "allOf": [ { "properties": { "type": { "const": "limit-offset" } } }, { "required": [ "param" ] } ] }, { "allOf": [ { "properties": { "type": { "const": "cursor" } } }, { "required": [ "cursorField", "param" ] } ] } ] } ] }, "databaseIncrementalConfig": { "type": "object", "description": "Incremental query configuration. Tracks last processed timestamp or ID.", "properties": { "enabled": { "type": "boolean", "description": "Enable incremental queries.", "default": false }, "timestampField": { "type": "string", "description": "Field name for timestamp-based incremental queries." }, "timestampParam": { "type": "string", "description": "Parameter name for timestamp in query." }, "idField": { "type": "string", "description": "Field name for ID-based incremental queries." }, "idParam": { "type": "string", "description": "Parameter name for ID in query." 
} }, "additionalProperties": false }, "statePersistenceConfig": { "type": "object", "description": "State persistence configuration for input modules. Tracks execution state between runs.", "properties": { "timestamp": { "type": "object", "description": "Timestamp-based state persistence.", "properties": { "enabled": { "type": "boolean", "description": "Enable timestamp persistence.", "default": false }, "queryParam": { "type": "string", "description": "Query parameter name for API filtering (e.g., 'updated_after')." } }, "additionalProperties": false }, "id": { "type": "object", "description": "ID-based state persistence.", "properties": { "enabled": { "type": "boolean", "description": "Enable ID persistence.", "default": false }, "field": { "type": "string", "description": "Field path to extract ID from records (supports dot notation)." }, "queryParam": { "type": "string", "description": "Query parameter name for API filtering (e.g., 'last_id')." } }, "additionalProperties": false }, "storagePath": { "type": "string", "description": "Custom storage directory path for state files." } }, "additionalProperties": false } } } ``` # Maintained YAML Examples These examples are copied from the Cannectors repository and validated by the test suite. ## schemas/examples/01-http-polling-basic-to-http-batch.yaml ```yaml name: http-polling-basic-to-http-batch version: 1.0.0 description: Poll a JSON array from an HTTP API and send it as one batch request. 
tags:
  - http-polling
  - http-output
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/orders
  headers:
    Accept: application/json
  dataField: orders
filters: []
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  headers:
    Content-Type: application/json
  success:
    statusCodes:
      - 200
      - 201
      - 202
```

## schemas/examples/02-http-polling-page-pagination.yaml

```yaml
name: http-polling-page-pagination
version: 1.0.0
description: Poll page-number pagination and normalize fields before output.
tags:
  - http-polling
  - pagination
input:
  type: httpPolling
  schedule: "0 */1 * * *"
  endpoint: https://source.example.com/api/customers
  dataField: data
  pagination:
    type: page
    param: page
    limitParam: per_page
    limit: 100
    totalPagesField: total_pages
filters:
  - type: mapping
    mappings:
      - source: id
        target: customer.id
        transforms:
          - op: toString
      - source: email
        target: customer.email
        transforms:
          - op: trim
          - op: lowercase
      - source: name
        target: customer.name
        onMissing: useDefault
        defaultValue: Unknown
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/customers
  method: POST
  requestMode: batch
```

## schemas/examples/03-http-polling-offset-pagination-state.yaml

```yaml
name: http-polling-offset-pagination-state
version: 1.0.0
description: Poll offset pagination with timestamp and ID state persistence.
tags:
  - http-polling
  - state
input:
  type: httpPolling
  schedule: "*/10 * * * *"
  endpoint: https://source.example.com/api/events
  dataField: events
  pagination:
    type: offset
    param: offset
    limitParam: limit
    limit: 250
    totalField: total
  statePersistence:
    timestamp:
      enabled: true
      queryParam: updated_after
    id:
      enabled: true
      field: event.id
      queryParam: after_id
    storagePath: ./.cannectors-state
filters:
  - type: set
    target: metadata.source
    value: events-api
  - type: remove
    target:
      - debug
      - internal.notes
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/events
  method: POST
  requestMode: batch
```

## schemas/examples/04-http-polling-cursor-oauth2.yaml

```yaml
name: http-polling-cursor-oauth2
version: 1.0.0
description: Poll cursor pagination with OAuth2 client credentials.
tags:
  - http-polling
  - oauth2
input:
  type: httpPolling
  schedule: "0 */6 * * *"
  endpoint: https://source.example.com/api/invoices
  dataField: items
  authentication:
    type: oauth2
    credentials:
      tokenUrl: https://source.example.com/oauth/token
      clientId: ${SOURCE_CLIENT_ID}
      clientSecret: ${SOURCE_CLIENT_SECRET}
      scope: invoices.read customers.read
  pagination:
    type: cursor
    param: cursor
    limitParam: limit
    limit: 100
    nextCursorField: next_cursor
filters:
  - type: condition
    expression: status == "paid"
    else:
      - type: drop
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/invoices
  method: POST
  requestMode: batch
```

## schemas/examples/05-webhook-hmac-to-http-single.yaml

```yaml
name: webhook-hmac-to-http-single
version: 1.0.0
description: Receive signed webhook payloads and forward each record separately.
tags:
  - webhook
  - hmac
input:
  type: webhook
  path: /webhooks/orders
  listenAddress: 0.0.0.0:8080
  dataField: orders
  signature:
    type: hmac-sha256
    header: X-Webhook-Signature
    secret: ${WEBHOOK_SECRET}
filters:
  - type: set
    target: metadata.received_by
    value: cannectors
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/{orderId}
  method: PATCH
  requestMode: single
  keys:
    - field: id
      paramType: path
      paramName: orderId
  success:
    statusCodes:
      - 200
      - 204
```

## schemas/examples/06-webhook-queue-rate-limit-to-database.yaml

```yaml
name: webhook-queue-rate-limit-to-database
version: 1.0.0
description: Receive queued webhooks with rate limiting and insert records into SQL.
tags:
  - webhook
  - database-output
input:
  type: webhook
  path: /webhooks/leads
  listenAddress: 0.0.0.0:8081
  dataField: leads
  queueSize: 500
  maxConcurrent: 4
  rateLimit:
    requestsPerSecond: 20
    burst: 40
filters:
  - type: mapping
    onError: skip
    mappings:
      - source: id
        target: lead_id
      - source: email
        target: email
        transforms:
          - op: trim
          - op: lowercase
      - source: source
        target: source
        onMissing: useDefault
        defaultValue: webhook
output:
  type: database
  connectionStringRef: ${CRM_DATABASE_URL}
  driver: postgres
  query: |
    insert into leads (lead_id, email, source)
    values ({{record.lead_id}}, {{record.email}}, {{record.source}})
  transaction: true
```

## schemas/examples/07-database-input-basic-to-http.yaml

```yaml
name: database-input-basic-to-http
version: 1.0.0
description: Read rows from a database and send them as an HTTP batch.
tags:
  - database-input
  - http-output
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  driver: postgres
  query: |
    select id, email, updated_at
    from customers
    where active = true
    order by updated_at asc
filters:
  - type: mapping
    mappings:
      - source: id
        target: id
        transforms:
          - op: toString
      - source: email
        target: email
        transforms:
          - op: lowercase
      - source: updated_at
        target: updatedAt
        transforms:
          - op: dateFormat
            format: YYYY-MM-DDTHH:mm:ss
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/customers/sync
  method: POST
  requestMode: batch
```

## schemas/examples/08-database-input-limit-offset-to-database.yaml

```yaml
name: database-input-limit-offset-to-database
version: 1.0.0
description: Read a paginated SQL source and write rows into another database.
tags:
  - database-input
  - pagination
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  driver: postgres
  query: |
    select id, sku, quantity
    from inventory
    order by id asc
    limit :limit offset :offset
  pagination:
    type: limit-offset
    limit: 500
    param: offset
filters:
  - type: set
    target: sync_source
    value: source-db
output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  driver: postgres
  query: |
    insert into inventory_snapshot (id, sku, quantity, sync_source)
    values ({{record.id}}, {{record.sku}}, {{record.quantity}}, {{record.sync_source}})
  transaction: true
```

## schemas/examples/09-database-input-cursor-incremental.yaml

```yaml
name: database-input-cursor-incremental
version: 1.0.0
description: Read database rows incrementally using cursor pagination metadata.
tags:
  - database-input
  - incremental
input:
  type: database
  connectionStringRef: ${SOURCE_DATABASE_URL}
  driver: postgres
  query: |
    select id, customer_id, total, updated_at
    from orders
    where updated_at > {{lastRunTimestamp}} and id > :last_id
    order by id asc
    limit :limit
  pagination:
    type: cursor
    limit: 200
    cursorField: id
    param: last_id
  incremental:
    enabled: true
    timestampField: updated_at
    timestampParam: lastRunTimestamp
    idField: id
    idParam: last_id
filters:
  - type: mapping
    mappings:
      - source: id
        target: order.id
      - source: customer_id
        target: order.customerId
      - source: total
        target: order.total
        transforms:
          - op: toFloat
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders
  method: POST
  requestMode: batch
```

## schemas/examples/10-mapping-transforms-all.yaml

```yaml
name: mapping-transforms-all
version: 1.0.0
description: Demonstrate every mapping transform implemented by the mapping filter.
tags:
  - mapping
  - transforms
input:
  type: httpPolling
  schedule: "0 * * * *"
  endpoint: https://source.example.com/api/raw-records
  dataField: records
filters:
  - type: mapping
    onError: log
    mappings:
      - source: name
        target: name
        transforms:
          - op: trim
          - op: uppercase
      - source: email
        target: email
        transforms:
          - op: trim
          - op: lowercase
      - source: phone
        target: phone
        transforms:
          - op: replace
            pattern: "[^0-9+]"
            replacement: ""
      - source: created_at
        target: createdDate
        transforms:
          - op: dateFormat
            format: YYYY-MM-DD
      - source: amount
        target: amountInt
        transforms:
          - op: toInt
      - source: score
        target: scoreFloat
        transforms:
          - op: toFloat
      - source: active
        target: active
        transforms:
          - op: toBool
      - source: tags_csv
        target: tags
        transforms:
          - op: split
            separator: ","
      - source: tag_list
        target: tagString
        transforms:
          - op: join
            separator: "|"
      - source: external_id
        target: externalId
        transforms:
          - op: toString
      - source: payload
        target: payloadArray
        transforms:
          - op: toArray
      - source: attributes
        target: attributes
        transforms:
          - op: toObject
      - target: deprecatedField
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/normalized-records
  method: POST
  requestMode: batch
```

## schemas/examples/11-condition-nested-routing.yaml

```yaml
name: condition-nested-routing
version: 1.0.0
description: Route records with condition filters and nested filter modules.
tags:
  - condition
  - nested-filters
input:
  type: httpPolling
  schedule: "*/30 * * * *"
  endpoint: https://source.example.com/api/payments
  dataField: payments
filters:
  - type: condition
    expression: status == "paid" && amount > 0
    then:
      - type: set
        target: routing.bucket
        value: billable
      - type: mapping
        mappings:
          - source: id
            target: payment.id
          - source: amount
            target: payment.amount
    else:
      - type: set
        target: routing.bucket
        value: review
      - type: remove
        target:
          - card.number
          - card.cvv
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/payments/routed
  method: POST
  requestMode: batch
```

## schemas/examples/12-script-inline-transform.yaml

```yaml
name: script-inline-transform
version: 1.0.0
description: Transform records using inline JavaScript.
tags:
  - script
  - inline
input:
  type: httpPolling
  schedule: "0 * * * *"
  endpoint: https://source.example.com/api/subscriptions
  dataField: subscriptions
filters:
  - type: script
    onError: skip
    script: |
      function transform(record) {
        record.normalizedStatus = String(record.status || "").toLowerCase();
        record.isActive = record.normalizedStatus === "active";
        record.planCode = String(record.plan || "unknown").toUpperCase();
        return record;
      }
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/subscriptions
  method: POST
  requestMode: batch
```

## schemas/examples/13-script-file-transform.yaml

```yaml
name: script-file-transform
version: 1.0.0
description: Transform records with a JavaScript file referenced by scriptFile.
tags:
  - script
  - script-file
input:
  type: httpPolling
  schedule: "0 */2 * * *"
  endpoint: https://source.example.com/api/customers
  dataField: customers
filters:
  - type: script
    scriptFile: examples/assets/scripts/customer_enrichment.js
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/customers/enriched
  method: POST
  requestMode: batch
```

## schemas/examples/14-http-call-get-merge-cache.yaml

```yaml
name: http-call-get-merge-cache
version: 1.0.0
description: Enrich each record with an HTTP GET response and merge it recursively.
tags:
  - http-call
  - cache
input:
  type: httpPolling
  schedule: "*/20 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
filters:
  - type: http_call
    endpoint: https://profiles.example.com/api/customers/{customerId}
    method: GET
    keys:
      - field: customer.id
        paramType: path
        paramName: customerId
    dataField: profile
    mergeStrategy: merge
    cache:
      enabled: true
      maxSize: 10000
      ttlSeconds: 900
      key: customer.id
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/enriched
  method: POST
  requestMode: batch
```

## schemas/examples/15-http-call-query-header-append.yaml

```yaml
name: http-call-query-header-append
version: 1.0.0
description: Use query and header keys for an HTTP enrichment call and append the response.
tags:
  - http-call
  - append
input:
  type: httpPolling
  schedule: "*/20 * * * *"
  endpoint: https://source.example.com/api/tickets
  dataField: tickets
filters:
  - type: http_call
    endpoint: https://support.example.com/api/sla
    method: GET
    headers:
      X-Tenant: "{{record.tenantId}}"
    keys:
      - field: priority
        paramType: query
        paramName: priority
      - field: tenantId
        paramType: header
        paramName: X-Tenant-Id
    mergeStrategy: append
    cache:
      enabled: true
      maxSize: 1000
      ttlSeconds: 300
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/tickets
  method: POST
  requestMode: batch
```

## schemas/examples/16-http-call-post-template-replace.yaml

```yaml
name: http-call-post-template-replace
version: 1.0.0
description: Use POST enrichment with a body template and replace overlapping response fields.
tags:
  - http-call
  - body-template
input:
  type: httpPolling
  schedule: "0 * * * *"
  endpoint: https://source.example.com/api/addresses
  dataField: addresses
filters:
  - type: http_call
    endpoint: https://geo.example.com/api/normalize
    method: POST
    headers:
      X-Request-Source: cannectors
    bodyTemplateFile: examples/assets/templates/geocode_request.json
    dataField: result
    mergeStrategy: replace
    cache:
      enabled: true
      maxSize: 5000
      ttlSeconds: 86400
      key: address.hash
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/addresses/normalized
  method: POST
  requestMode: batch
```

## schemas/examples/17-sql-call-merge-cache.yaml

```yaml
name: sql-call-merge-cache
version: 1.0.0
description: Enrich records with a SQL lookup and deep merge the result.
tags:
  - sql-call
  - cache
input:
  type: httpPolling
  schedule: "*/30 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
filters:
  - type: sql_call
    connectionStringRef: ${REFERENCE_DATABASE_URL}
    driver: postgres
    query: |
      select segment, account_owner
      from customer_reference
      where customer_id = {{record.customer.id}}
      limit 1
    mergeStrategy: merge
    cache:
      enabled: true
      maxSize: 2000
      ttlSeconds: 600
      key: "customer:{{record.customer.id}}"
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/enriched
  method: POST
  requestMode: batch
```

## schemas/examples/18-sql-call-append-query-file.yaml

```yaml
name: sql-call-append-query-file
version: 1.0.0
description: Use queryFile for SQL enrichment and append the lookup result under a custom key.
tags:
  - sql-call
  - query-file
input:
  type: httpPolling
  schedule: "0 * * * *"
  endpoint: https://source.example.com/api/customers
  dataField: customers
filters:
  - type: sql_call
    connectionStringRef: ${REFERENCE_DATABASE_URL}
    driver: postgres
    queryFile: examples/assets/sql/customer_lookup.sql
    mergeStrategy: append
    resultKey: reference
    cache:
      enabled: true
      maxSize: 1000
      ttlSeconds: 300
      key: "customer:{{record.id}}"
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/customers/reference
  method: POST
  requestMode: batch
```

## schemas/examples/19-http-output-single-template.yaml

```yaml
name: http-output-single-template
version: 1.0.0
description: Send one templated HTTP request per record with path, query, and header keys.
tags:
  - http-output
  - body-template
input:
  type: httpPolling
  schedule: "*/10 * * * *"
  endpoint: https://source.example.com/api/tasks
  dataField: tasks
filters:
  - type: remove
    target:
      - internal
      - debug
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/projects/{projectId}/tasks/{{record.id}}
  method: PUT
  requestMode: single
  bodyTemplateFile: examples/assets/templates/task_update.json
  queryParams:
    source: cannectors
  keys:
    - field: project.id
      paramType: path
      paramName: projectId
    - field: updatedAt
      paramType: query
      paramName: updated_at
    - field: tenantId
      paramType: header
      paramName: X-Tenant-Id
  headers:
    Content-Type: application/json
    X-Trace-Source: "{{record.source | default: \"unknown\"}}"
  success:
    statusCodes:
      - 200
      - 204
```

## schemas/examples/20-http-output-retry-auth-api-key.yaml

```yaml
name: http-output-retry-auth-api-key
version: 1.0.0
description: Send records with API key authentication and output retry handling.
tags:
  - http-output
  - retry
input:
  type: httpPolling
  schedule: "*/5 * * * *"
  endpoint: https://source.example.com/api/shipments
  dataField: shipments
filters:
  - type: condition
    expression: id != nil
    else:
      - type: drop
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/shipments
  method: POST
  requestMode: batch
  authentication:
    type: api-key
    credentials:
      key: ${DESTINATION_API_KEY}
      location: header
      headerName: X-API-Key
  retry:
    maxAttempts: 3
    delayMs: 500
    backoffMultiplier: 2
    maxDelayMs: 5000
    retryableStatusCodes:
      - 408
      - 429
      - 500
      - 502
      - 503
      - 504
    useRetryAfterHeader: true
    retryHintFromBody: body.retryable == true
  success:
    statusCodes:
      - 200
      - 201
      - 202
```

## schemas/examples/21-database-output-transaction-query-file.yaml

```yaml
name: database-output-transaction-query-file
version: 1.0.0
description: Write records with a SQL query file in a transaction.
tags:
  - database-output
  - transaction
input:
  type: httpPolling
  schedule: "0 * * * *"
  endpoint: https://source.example.com/api/products
  dataField: products
filters:
  - type: mapping
    mappings:
      - source: id
        target: product_id
      - source: sku
        target: sku
      - source: price
        target: price
        transforms:
          - op: toFloat
      - source: updated_at
        target: updated_at
output:
  type: database
  connectionStringRef: ${WAREHOUSE_DATABASE_URL}
  driver: postgres
  queryFile: examples/assets/sql/upsert_product.sql
  transaction: true
  onError: fail
```

## schemas/examples/22-defaults-inheritance.yaml

```yaml
name: defaults-inheritance
version: 1.0.0
description: Apply top-level timeout, retry, and onError defaults across modules.
tags:
  - defaults
  - retry
defaults:
  timeoutMs: 15000
  onError: log
  retry:
    maxAttempts: 2
    delayMs: 1000
    backoffMultiplier: 2
    maxDelayMs: 10000
    retryableStatusCodes:
      - 429
      - 500
      - 503
    useRetryAfterHeader: true
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/accounts
  dataField: accounts
filters:
  - type: mapping
    onError: skip
    mappings:
      - source: id
        target: account.id
      - source: name
        target: account.name
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/accounts
  method: POST
  requestMode: batch
  retry:
    maxAttempts: 4
```

## schemas/examples/23-auth-basic-bearer-query-key.yaml

```yaml
name: auth-basic-bearer-query-key
version: 1.0.0
description: Combine bearer input authentication, basic enrichment auth, and query API key output auth.
tags:
  - authentication
input:
  type: httpPolling
  schedule: "*/30 * * * *"
  endpoint: https://source.example.com/api/accounts
  dataField: accounts
  authentication:
    type: bearer
    credentials:
      token: ${SOURCE_BEARER_TOKEN}
filters:
  - type: http_call
    endpoint: https://directory.example.com/api/accounts/{accountId}
    method: GET
    authentication:
      type: basic
      credentials:
        username: ${DIRECTORY_USERNAME}
        password: ${DIRECTORY_PASSWORD}
    keys:
      - field: id
        paramType: path
        paramName: accountId
    mergeStrategy: merge
    cache:
      enabled: true
      maxSize: 1000
      ttlSeconds: 600
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/accounts
  method: POST
  requestMode: batch
  authentication:
    type: api-key
    credentials:
      key: ${DESTINATION_API_KEY}
      location: query
      paramName: api_key
```

## schemas/examples/24-empty-filter-pass-through.yaml

```yaml
name: empty-filter-pass-through
version: 1.0.0
description: Pass records from input to output without filter modules.
tags:
  - pass-through
input:
  type: httpPolling
  schedule: "0 * * * *"
  endpoint: https://source.example.com/api/items
  dataField: items
filters: []
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/items
  method: POST
  requestMode: batch
```

## schemas/examples/25-loop-cells-extraction.yaml

```yaml
name: loop-cells-extraction
version: 1.0.0
description: Iterate over a cells[] array and extract values into flat record fields by columnId.
tags:
  - loop
  - smartsheet
input:
  type: httpPolling
  schedule: "*/15 * * * *"
  endpoint: https://source.example.com/api/rows
  dataField: rows
filters:
  - type: loop
    field: cells
    itemName: cell
    filters:
      - type: condition
        expression: cell.columnId == 1
        then:
          - type: mapping
            mappings:
              - source: cell.displayValue
                target: record.eventId
      - type: condition
        expression: cell.columnId == 2
        then:
          - type: mapping
            mappings:
              - source: cell.displayValue
                target: record.coordinates
      - type: condition
        expression: cell.columnId == 3
        then:
          - type: mapping
            mappings:
              - source: cell.displayValue
                target: record.customerName
  - type: remove
    target:
      - cells
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/events
  method: POST
  requestMode: single
```

## schemas/examples/40-soap-polling-basic-v11.yaml

```yaml
name: soap-polling-basic-v11
version: 1.0.0
description: Poll a SOAP 1.1 operation and forward extracted records as an HTTP batch.
tags:
  - soap
  - soap-polling
input:
  type: soapPolling
  endpoint: https://soap.example.com/orders/v11
  soapVersion: "1.1"
  soapAction: urn:ListOrders
  operation: ListOrders
  body: |
    ready
  dataField: Envelope.Body.ListOrdersResponse.Orders.Order
filters: []
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  headers:
    Content-Type: application/json
```

## schemas/examples/40b-soap-polling-basic-v12.yaml

```yaml
name: soap-polling-basic-v12
version: 1.0.0
description: Poll a SOAP 1.2 operation. The SOAP action is sent as a content-type parameter.
tags:
  - soap
  - soap12
input:
  type: soapPolling
  endpoint: https://soap.example.com/orders/v12
  soapVersion: "1.2"
  soapAction: urn:ListOrders
  operation: ListOrders
  body: |
    ready
  dataField: Envelope.Body.ListOrdersResponse.Orders.Order
filters: []
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  headers:
    Content-Type: application/json
```

## schemas/examples/41-soap-polling-cursor.yaml

```yaml
name: soap-polling-cursor
version: 1.0.0
description: Poll a cursor-paginated SOAP operation by injecting the cursor into the XML body.
tags:
  - soap
  - pagination
input:
  type: soapPolling
  endpoint: https://soap.example.com/orders/cursor
  soapAction: urn:ListOrdersPage
  operation: ListOrdersPage
  body: |
    {{record.pagination.cursor | default: ""}} 100
  dataField: Envelope.Body.ListOrdersPageResponse.Orders.Order
  pagination:
    type: cursor
    param: cursor
    nextCursorField: Envelope.Body.ListOrdersPageResponse.NextCursor
filters: []
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/import
  method: POST
  requestMode: batch
  headers:
    Content-Type: application/json
```

## schemas/examples/42-soap-call-enrichment.yaml

```yaml
name: soap-call-enrichment
version: 1.0.0
description: Poll orders every five minutes, enrich each record with a SOAP lookup, cache responses, and append the SOAP result under a result key.
tags:
  - soap
  - enrichment
input:
  type: httpPolling
  schedule: "*/5 * * * *"
  endpoint: https://source.example.com/api/orders
  dataField: orders
filters:
  - type: soap_call
    endpoint: https://soap.example.com/customers
    soapAction: urn:GetCustomer
    operation: GetCustomer
    body: |
      {{record.customerId}}
    dataField: Envelope.Body.GetCustomerResponse.Customer
    mergeStrategy: append
    resultKey: customer
    cache:
      enabled: true
      maxSize: 1000
      ttlSeconds: 300
      key: "{{record.customerId}}"
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/orders/enriched
  method: POST
  requestMode: batch
  headers:
    Content-Type: application/json
```

## schemas/examples/43-soap-output-batch.yaml

```yaml
name: soap-output-batch
version: 1.0.0
description: Send records to a SOAP operation in batch mode.
tags:
  - soap
  - soap-output
input:
  type: webhook
  path: /webhooks/orders
filters: []
output:
  type: soapRequest
  endpoint: https://soap.example.com/import
  soapAction: urn:ImportOrders
  operation: ImportOrders
  requestMode: batch
  body: |
    {{record.recordCount}} {{record.records[0].orderId | default: ""}}
  success:
    statusCodes:
      - 200
      - 202
```

## schemas/examples/44-soap-output-mtom-emission.yaml

```yaml
name: soap-output-mtom-emission
version: 1.0.0
description: Send a SOAP request with a base64 record field decoded into an outgoing MTOM attachment.
tags:
  - soap
  - mtom
input:
  type: webhook
  path: /webhooks/documents
filters: []
output:
  type: soapRequest
  endpoint: https://soap.example.com/documents/upload
  soapAction: urn:UploadDocument
  operation: UploadDocument
  requestMode: single
  body: |
    {{record.documentId}}
  mtom:
    enabled: true
    attachments:
      - contentId: "{{record.documentId}}"
        contentType: application/pdf
        sourceField: documentBase64
        encoding: base64
```

## schemas/examples/44b-soap-input-mtom-reception.yaml

```yaml
name: soap-input-mtom-reception
version: 1.0.0
description: Poll a SOAP operation whose response may be multipart/related MTOM; parsed attachments are exposed under each record's _soapAttachments map.
tags:
  - soap
  - mtom
input:
  type: soapPolling
  endpoint: https://soap.example.com/documents/download
  soapAction: urn:ListDocuments
  operation: ListDocuments
  body: |
    inbox
  dataField: Envelope.Body.ListDocumentsResponse.Documents.Document
filters: []
output:
  type: httpRequest
  endpoint: https://destination.example.com/api/documents
  method: POST
  requestMode: batch
  headers:
    Content-Type: application/json
# Runtime convention for received MTOM parts:
# each returned record includes _soapAttachments[contentId] with contentId,
# contentType, and data ([]byte). Phase 5 docs show mapping patterns for
# copying these values into domain-specific fields.
```

## schemas/examples/45-soap-output-wssecurity-passwordtext.yaml

```yaml
name: soap-output-wssecurity-passwordtext
version: 1.0.0
description: Send a SOAP request using WS-Security UsernameToken PasswordText.
tags:
  - soap
  - ws-security
input:
  type: webhook
  path: /webhooks/orders
filters: []
output:
  type: soapRequest
  endpoint: https://soap.example.com/secure/orders
  soapAction: urn:SubmitOrder
  operation: SubmitOrder
  requestMode: single
  body: |
    {{record.orderId}}
  wsSecurity:
    username: soap-user
    password: ${SOAP_PASSWORD}
    passwordType: PasswordText
    mustUnderstand: true
```

## schemas/examples/45b-soap-output-wssecurity-passworddigest.yaml

```yaml
name: soap-output-wssecurity-passworddigest
version: 1.0.0
description: Send a SOAP request using WS-Security UsernameToken PasswordDigest.
tags:
  - soap
  - ws-security
input:
  type: webhook
  path: /webhooks/orders
filters: []
output:
  type: soapRequest
  endpoint: https://soap.example.com/secure/orders
  soapAction: urn:SubmitOrder
  operation: SubmitOrder
  requestMode: single
  body: |
    {{record.orderId}}
  wsSecurity:
    username: soap-user
    password: ${SOAP_PASSWORD}
    passwordType: PasswordDigest
    mustUnderstand: true
```
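
The two WS-Security examples differ only in `passwordType`. As a rough illustration of what `PasswordDigest` means on the wire, the sketch below computes the digest defined by the OASIS UsernameToken Profile: Base64(SHA-1(nonce + created + password)). This is not taken from the Cannectors runtime, whose header construction is not shown on this page; the function names `password_digest` and `new_username_token` and the returned dictionary layout are hypothetical and exist only for this example.

```python
import base64
import hashlib
import os
from datetime import datetime, timezone


def password_digest(password: str, nonce: bytes, created: str) -> str:
    """Return Base64(SHA-1(nonce + created + password)).

    `nonce` is the raw (already base64-decoded) nonce bytes and `created`
    is the UTC timestamp string sent alongside the digest.
    """
    sha1 = hashlib.sha1(nonce + created.encode("utf-8") + password.encode("utf-8"))
    return base64.b64encode(sha1.digest()).decode("ascii")


def new_username_token(username: str, password: str) -> dict:
    """Build the values a UsernameToken with PasswordDigest would carry.

    Hypothetical helper: the dictionary keys are illustrative, not a
    Cannectors data structure.
    """
    nonce = os.urandom(16)  # fresh random nonce for every request
    created = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "username": username,
        "nonce": base64.b64encode(nonce).decode("ascii"),
        "created": created,
        "passwordDigest": password_digest(password, nonce, created),
    }


if __name__ == "__main__":
    token = new_username_token("soap-user", os.environ.get("SOAP_PASSWORD", "example"))
    print(token)
```

With `PasswordText` the password itself travels in the header, so it should only be used over TLS; with `PasswordDigest` the server recomputes the same hash from the nonce, the timestamp, and its stored password. In both cases the pipeline keeps the secret in `${SOAP_PASSWORD}` rather than in the YAML file.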