
Module extensibility

Add a custom input, filter, or output module to Cannectors.

Adding a module takes four things: implement the interface, declare the JSON Schema, register the module type, and ship tests. The walkthrough below splits these into six smaller steps.

The interfaces

Each module group has its own interface in internal/modules/.

Input

type Input interface {
  // Fetch reads the next batch. For scheduled inputs, called on each
  // CRON tick. For one-shot inputs, called exactly once.
  Fetch(ctx context.Context) ([]Record, StateDelta, error)

  // Schedule returns the CRON expression, or "" for one-shot inputs.
  Schedule() string
}

Webhook-style inputs use a slightly different callback-based shape — see webhook.go for the pattern.

Filter

type Filter interface {
  // Process transforms or drops a single record. Returning
  // (nil, nil) drops the record from the chain.
  Process(ctx context.Context, r Record) (Record, error)
}

Output

type Output interface {
  // Send delivers a batch (requestMode: batch) or a single record
  // wrapped in a one-element slice (requestMode: single).
  Send(ctx context.Context, batch []Record) error

  // Preview renders what would be sent without actually sending.
  // Called only in --dry-run mode.
  Preview(ctx context.Context, batch []Record) (string, error)
}

Walkthrough — a new input module

Let's add a hypothetical kafkaConsumer input.

1. Create the package

internal/modules/input/kafka_consumer.go
internal/modules/input/kafka_consumer_test.go

2. Define the config struct

type kafkaConsumerConfig struct {
  Brokers   []string `yaml:"brokers"`
  Topic     string   `yaml:"topic"`
  GroupID   string   `yaml:"groupId"`
  // …
}

3. Implement Input

type kafkaConsumer struct {
  cfg    kafkaConsumerConfig
  client *kafka.Reader
}

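func newKafkaConsumer(cfg kafkaConsumerConfig) (*kafkaConsumer, error) {
  // Construct the Kafka reader. NewReader and ReaderConfig assume the
  // segmentio/kafka-go client, which the *kafka.Reader field suggests.
  client := kafka.NewReader(kafka.ReaderConfig{
    Brokers: cfg.Brokers,
    Topic:   cfg.Topic,
    GroupID: cfg.GroupID,
  })
  return &kafkaConsumer{cfg: cfg, client: client}, nil
}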

func (k *kafkaConsumer) Fetch(ctx context.Context) ([]Record, StateDelta, error) {
  // poll until ctx is done or a batch is full
  // return records + offset delta
}

func (k *kafkaConsumer) Schedule() string {
  return "" // long-running, not CRON-driven
}

4. Register the type

In internal/modules/input/input.go:

var registry = map[string]func(cfg map[string]any) (Input, error){
  "httpPolling":   newHTTPPolling,
  "webhook":       newWebhook,
  "database":      newDatabase,
  "kafkaConsumer": newKafkaConsumer, // ← here
}
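Note that the registry stores constructors of type func(cfg map[string]any) (Input, error), while newKafkaConsumer in step 3 takes a typed kafkaConsumerConfig and returns *kafkaConsumer, so some adapter has to sit between the two. The sketch below shows one possible shape. decodeKafkaConfig, newKafkaConsumerFromMap, build, and stubConsumer are all hypothetical names, and the decoding assumes the raw map holds a []any of strings for brokers, which is how common YAML decoders represent sequences.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the real types.
type Record map[string]any
type StateDelta map[string]any

type Input interface {
	Fetch(ctx context.Context) ([]Record, StateDelta, error)
	Schedule() string
}

type kafkaConsumerConfig struct {
	Brokers []string
	Topic   string
	GroupID string
}

// decodeKafkaConfig copies the raw map into the typed struct by hand.
func decodeKafkaConfig(raw map[string]any) (kafkaConsumerConfig, error) {
	var cfg kafkaConsumerConfig
	list, _ := raw["brokers"].([]any)
	for _, b := range list {
		s, ok := b.(string)
		if !ok {
			return cfg, fmt.Errorf("brokers: expected string, got %T", b)
		}
		cfg.Brokers = append(cfg.Brokers, s)
	}
	cfg.Topic, _ = raw["topic"].(string)
	cfg.GroupID, _ = raw["groupId"].(string)
	if len(cfg.Brokers) == 0 || cfg.Topic == "" || cfg.GroupID == "" {
		return cfg, errors.New("kafkaConsumer: brokers, topic and groupId are required")
	}
	return cfg, nil
}

// stubConsumer replaces kafkaConsumer so the sketch runs without Kafka.
type stubConsumer struct{ cfg kafkaConsumerConfig }

func (s *stubConsumer) Fetch(ctx context.Context) ([]Record, StateDelta, error) {
	return nil, nil, nil
}
func (s *stubConsumer) Schedule() string { return "" }

// newKafkaConsumerFromMap has the signature the registry expects.
func newKafkaConsumerFromMap(raw map[string]any) (Input, error) {
	cfg, err := decodeKafkaConfig(raw)
	if err != nil {
		return nil, err
	}
	return &stubConsumer{cfg: cfg}, nil
}

var registry = map[string]func(cfg map[string]any) (Input, error){
	"kafkaConsumer": newKafkaConsumerFromMap,
}

// build looks up a module type and runs its constructor.
func build(moduleType string, raw map[string]any) (Input, error) {
	ctor, ok := registry[moduleType]
	if !ok {
		return nil, fmt.Errorf("unknown input type %q", moduleType)
	}
	return ctor(raw)
}

func main() {
	raw := map[string]any{"brokers": []any{"localhost:9092"}, "topic": "events", "groupId": "g1"}
	in, err := build("kafkaConsumer", raw)
	fmt.Println(in != nil, err)
}
```

Returning the Input interface from the adapter, rather than the concrete pointer, is what lets the function value fit the map's element type.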

5. Declare the schema

Add to internal/config/schema/input-schema.json:

{
  "$defs": {
    "kafkaConsumerInputConfig": {
      "type": "object",
      "required": ["brokers", "topic", "groupId"],
      "properties": {
        "brokers": {
          "type": "array",
          "items": { "type": "string" },
          "minItems": 1
        },
        "topic":   { "type": "string", "minLength": 1 },
        "groupId": { "type": "string", "minLength": 1 }
      }
    },

    "inputModule": {
      "allOf": [
        { "$ref": "common-schema.json#/$defs/moduleBase" },
        {
          "oneOf": [
            ...existing branches...,
            { "allOf": [
              { "$ref": "#/$defs/kafkaConsumerInputConfig" },
              { "properties": { "type": { "const": "kafkaConsumer" } } }
            ] }
          ]
        }
      ]
    }
  }
}

6. Tests

At minimum:

  • A contract test that the new module satisfies Input.
  • A schema test confirming that a valid YAML config parses and an invalid one fails validation.
  • An integration test against a real Kafka container (or a mock of the Kafka client).

The existing modules' *_test.go files are the templates to follow.

Documentation

Once the module is in main:

  1. Run pnpm sync-schemas ../cannectors in the docs repo.
  2. Add content/docs/modules/inputs/kafka-consumer/index.mdx with a minimal example and <ModuleOptionsTable group="input" type="kafkaConsumer" />.
  3. Run pnpm generate-subpages to create sub-pages for any complex options.

The options table renders itself from the schema you just added — zero manual table-writing.

Conventions

  • Module type names are camelCase for input/output families (httpPolling, httpRequest, soapPolling, soapRequest) and snake_case for per-record call filters (http_call, soap_call, sql_call). Pick whichever style matches the existing peers.
  • Config field names are camelCase in YAML, mapped via yaml:"…" tags.
  • Module file names match the type, written with underscores (kafkaConsumer → kafka_consumer.go). Do not use dashes.

See also