Module extensibility
Add a custom input, filter, or output module to Cannectors.
Adding a module is a four-step process: implement the interface, register the module type, declare the JSON Schema, and ship tests.
The interfaces
Each module group has its own interface in internal/modules/.
Input
type Input interface {
	// Fetch reads the next batch. For scheduled inputs, called on each
	// CRON tick. For one-shot inputs, called exactly once.
	Fetch(ctx context.Context) ([]Record, StateDelta, error)
	// Schedule returns the CRON expression, or "" for one-shot inputs.
	Schedule() string
}
Webhook-style inputs use a slightly different callback-based shape;
see webhook.go for the pattern.
Filter
type Filter interface {
	// Process transforms or drops a single record. Returning
	// (nil, nil) drops the record from the chain.
	Process(ctx context.Context, r Record) (Record, error)
}
Output
type Output interface {
	// Send delivers a batch (requestMode: batch) or a single record
	// wrapped in a one-element slice (requestMode: single).
	Send(ctx context.Context, batch []Record) error
	// Preview renders what would be sent without actually sending.
	// Called only in --dry-run mode.
	Preview(ctx context.Context, batch []Record) (string, error)
}
Walkthrough: a new input module
Let's add a hypothetical kafkaConsumer input.
1. Create the package
internal/modules/input/kafka_consumer.go
internal/modules/input/kafka_consumer_test.go
2. Define the config struct
type kafkaConsumerConfig struct {
	Brokers []string `yaml:"brokers"`
	Topic   string   `yaml:"topic"`
	GroupID string   `yaml:"groupId"`
	// …
}
3. Implement Input
type kafkaConsumer struct {
	cfg    kafkaConsumerConfig
	client *kafka.Reader
}

func newKafkaConsumer(cfg kafkaConsumerConfig) (*kafkaConsumer, error) {
	// construct the kafka reader; the *kafka.Reader type suggests
	// github.com/segmentio/kafka-go, which is assumed here
	client := kafka.NewReader(kafka.ReaderConfig{
		Brokers: cfg.Brokers,
		Topic:   cfg.Topic,
		GroupID: cfg.GroupID,
	})
	return &kafkaConsumer{cfg: cfg, client: client}, nil
}

func (k *kafkaConsumer) Fetch(ctx context.Context) ([]Record, StateDelta, error) {
	// poll until ctx is done or a batch is full
	// return records + offset delta
}

func (k *kafkaConsumer) Schedule() string {
	return "" // long-running, not CRON-driven
}
4. Register the type
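One thing to check before adding the registry entry: the registry in this step stores constructors of type func(cfg map[string]any) (Input, error), whereas the constructor from step 3 takes a typed kafkaConsumerConfig and returns *kafkaConsumer, so the two signatures do not line up as written. One possible way to bridge them is a small adapter that decodes the raw map first. The sketch below is an assumption, not the project's actual mechanism: decodeKafkaConfig and kafkaConsumerFromMap are hypothetical names, Input is reduced to a single method, the reader setup is omitted, and the existing modules may well share a decoding helper instead.

```go
package main

import (
	"errors"
	"fmt"
)

// Input is reduced to one method so the sketch compiles on its own.
type Input interface{ Schedule() string }

type kafkaConsumerConfig struct {
	Brokers []string
	Topic   string
	GroupID string
}

type kafkaConsumer struct{ cfg kafkaConsumerConfig }

func (k *kafkaConsumer) Schedule() string { return "" }

// newKafkaConsumer keeps the typed signature from step 3 (reader setup omitted).
func newKafkaConsumer(cfg kafkaConsumerConfig) (*kafkaConsumer, error) {
	return &kafkaConsumer{cfg: cfg}, nil
}

// decodeKafkaConfig (hypothetical) copies the raw map into the typed config.
func decodeKafkaConfig(raw map[string]any) (kafkaConsumerConfig, error) {
	var cfg kafkaConsumerConfig
	list, ok := raw["brokers"].([]any)
	if !ok {
		return cfg, errors.New("brokers: expected a list")
	}
	for _, item := range list {
		s, ok := item.(string)
		if !ok {
			return cfg, errors.New("brokers: expected strings")
		}
		cfg.Brokers = append(cfg.Brokers, s)
	}
	if cfg.Topic, ok = raw["topic"].(string); !ok {
		return cfg, errors.New("topic: expected a string")
	}
	if cfg.GroupID, ok = raw["groupId"].(string); !ok {
		return cfg, errors.New("groupId: expected a string")
	}
	return cfg, nil
}

// kafkaConsumerFromMap (hypothetical) has the registry's constructor signature.
func kafkaConsumerFromMap(raw map[string]any) (Input, error) {
	cfg, err := decodeKafkaConfig(raw)
	if err != nil {
		return nil, fmt.Errorf("kafkaConsumer: %w", err)
	}
	k, err := newKafkaConsumer(cfg)
	if err != nil {
		return nil, err // avoid wrapping a nil pointer in a non-nil interface
	}
	return k, nil
}

var registry = map[string]func(cfg map[string]any) (Input, error){
	"kafkaConsumer": kafkaConsumerFromMap,
}

func main() {
	in, err := registry["kafkaConsumer"](map[string]any{
		"brokers": []any{"localhost:9092"},
		"topic":   "events",
		"groupId": "cannectors",
	})
	fmt.Println(in != nil, err)
}
```

With an adapter along these lines, the registry entry would point at the adapter rather than at the typed constructor.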
In internal/modules/input/input.go:
var registry = map[string]func(cfg map[string]any) (Input, error){
	"httpPolling":   newHTTPPolling,
	"webhook":       newWebhook,
	"database":      newDatabase,
	"kafkaConsumer": newKafkaConsumer, // ← here
}
5. Declare the schema
Add to internal/config/schema/input-schema.json:
{
  "$defs": {
    "kafkaConsumerInputConfig": {
      "type": "object",
      "required": ["brokers", "topic", "groupId"],
      "properties": {
        "brokers": {
          "type": "array",
          "items": { "type": "string" },
          "minItems": 1
        },
        "topic": { "type": "string", "minLength": 1 },
        "groupId": { "type": "string", "minLength": 1 }
      }
    },
    "inputModule": {
      "allOf": [
        { "$ref": "common-schema.json#/$defs/moduleBase" },
        {
          "oneOf": [
            ...existing branches...,
            { "allOf": [
              { "$ref": "#/$defs/kafkaConsumerInputConfig" },
              { "properties": { "type": { "const": "kafkaConsumer" } } }
            ] }
          ]
        }
      ]
    }
  }
}
6. Tests
At minimum:
- A contract test that the new module satisfies Input.
- A schema test confirming that a valid YAML config parses and an invalid one fails.
- An integration test using a real Kafka container (or a Sarama mock).
The existing modules' *_test.go files are the templates to follow.
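To give a feel for the valid/invalid check in the second bullet, here is a minimal table-driven sketch. It is an approximation under stated assumptions: validate is a hypothetical function that re-implements the step 5 schema rules in Go (at least one broker, non-empty topic and groupId), whereas the real test would presumably run YAML through the project's schema validator. It is written as a plain program so it runs on its own; in the repository the loop would live in a func TestXxx(t *testing.T).

```go
package main

import (
	"errors"
	"fmt"
)

type kafkaConsumerConfig struct {
	Brokers []string
	Topic   string
	GroupID string
}

// validate (hypothetical) mirrors the schema rules from step 5:
// at least one broker, non-empty topic and groupId.
func validate(cfg kafkaConsumerConfig) error {
	switch {
	case len(cfg.Brokers) == 0:
		return errors.New("brokers: at least one item required")
	case cfg.Topic == "":
		return errors.New("topic: must not be empty")
	case cfg.GroupID == "":
		return errors.New("groupId: must not be empty")
	}
	return nil
}

type testCase struct {
	name    string
	cfg     kafkaConsumerConfig
	wantErr bool
}

var cases = []testCase{
	{"valid", kafkaConsumerConfig{[]string{"localhost:9092"}, "events", "g1"}, false},
	{"no brokers", kafkaConsumerConfig{nil, "events", "g1"}, true},
	{"empty topic", kafkaConsumerConfig{[]string{"localhost:9092"}, "", "g1"}, true},
	{"empty groupId", kafkaConsumerConfig{[]string{"localhost:9092"}, "events", ""}, true},
}

// runCases returns the names of cases whose outcome differs from wantErr.
func runCases() []string {
	var failed []string
	for _, tc := range cases {
		if err := validate(tc.cfg); (err != nil) != tc.wantErr {
			failed = append(failed, tc.name)
		}
	}
	return failed
}

func main() {
	fmt.Println("failed cases:", runCases())
}
```

Keeping one table row per schema constraint makes it easy to see which rule a regression broke.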
Documentation
Once the module is in main:
- Run pnpm sync-schemas ../cannectors in the docs repo.
- Add a content/docs/modules/inputs/kafka-consumer/index.mdx with a minimal example and <ModuleOptionsTable group="input" type="kafkaConsumer" />.
- Run pnpm generate-subpages to create sub-pages for any complex options.
The options table renders from the schema you just added, so there is no table to write by hand.
Conventions
- Module type names are camelCase for input/output families (httpPolling, httpRequest, soapPolling, soapRequest) and snake_case for per-record call filters (http_call, soap_call, sql_call). Pick one consistent with the existing peers.
- Config field names are camelCase in YAML, mapped via yaml:"…" tags.
- Module package names match the type, written with underscores (kafka_consumer for kafkaConsumer); dashes are not used in package names.
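The mapping from a camelCase type name to its underscore form can be sketched as below. snakeName is a hypothetical helper written for this guide, not something the project is known to ship, and it only handles names without runs of capitals, which matches the type names listed above. Only kafka_consumer.go is confirmed by the walkthrough.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// snakeName converts a lowerCamelCase module type such as "kafkaConsumer"
// into its underscore form ("kafka_consumer"). It assumes the name has no
// runs of capitals (acronyms); each capital starts a new word.
func snakeName(moduleType string) string {
	var b strings.Builder
	for i, r := range moduleType {
		if unicode.IsUpper(r) {
			if i > 0 {
				b.WriteByte('_')
			}
			r = unicode.ToLower(r)
		}
		b.WriteRune(r)
	}
	return b.String()
}

func main() {
	fmt.Printf("%s -> %s.go\n", "kafkaConsumer", snakeName("kafkaConsumer"))
}
```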