A Kubebuilder-based Kubernetes operator SDK for building flow-based workflow engines. This SDK provides the complete infrastructure for developing modular operators that can be composed into visual workflows.
TinySystems Module SDK enables developers to create module operators (like common-module, http-module, grpc-module) that bring specific functionality into a Kubernetes-native flow engine. Each module provides reusable components that can be connected through a port-based architecture to create complex workflows.
{{expression}} syntax with JSONPath for flexible data mapping
The SDK is built around several key abstractions:
TinyNode: Core execution unit representing a component instance in a flow
TinyModule: Registry of available modules for service discovery
TinyFlow: Logical grouping of nodes representing a workflow
TinyProject: Top-level organizational unit grouping multiple flows
TinySignal: External trigger mechanism for node execution
TinyTracker: Execution monitoring for detailed tracing
TinyWidgetPage: Custom UI dashboard pages for visualization
Components are the building blocks of workflows. Each component:
Implements the Component interface (module/component.go)
Processes messages through its Handle() method
Standard message flow:
External Trigger (TinySignal)
→ TinySignal Controller
→ Scheduler.Handle()
→ Runner.MsgHandler()
→ Component.Handle()
→ output() callback
→ Next node via edges
→ TinySignal deleted (one-off)
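The flow above can be pictured with a small in-memory sketch. It is not the SDK's scheduler: the `router`, `edge`, `component` and `handler` types are hypothetical stand-ins, and the TinySignal resource and its deletion are left out. It only illustrates how a component's output callback hands a message to the next node over an edge and how the result travels back to the caller.

```go
package main

import "fmt"

// handler mirrors the shape of an output callback: a port name and a
// payload go in, a result comes back. (Hypothetical, simplified type.)
type handler func(port string, msg any) any

// component is a hypothetical stand-in for a node's Handle method.
type component func(out handler, port string, msg any) any

// edge connects one node's output port to another node's input port.
type edge struct {
	fromNode, fromPort string
	toNode, toPort     string
}

// router is a toy scheduler: it looks up edges and forwards messages.
type router struct {
	nodes map[string]component
	edges []edge
}

// deliver invokes the target node and wires its output callback back
// into the router so the message continues along the matching edge.
func (r *router) deliver(node, port string, msg any) any {
	c, ok := r.nodes[node]
	if !ok {
		return fmt.Errorf("unknown node %q", node)
	}
	out := func(outPort string, data any) any {
		for _, e := range r.edges {
			if e.fromNode == node && e.fromPort == outPort {
				return r.deliver(e.toNode, e.toPort, data)
			}
		}
		return nil // no edge attached to this port
	}
	return c(out, port, msg)
}

// buildDemo wires greet.output to sink.input.
func buildDemo() *router {
	return &router{
		nodes: map[string]component{
			"greet": func(out handler, port string, msg any) any {
				name, ok := msg.(string)
				if !ok {
					return fmt.Errorf("greet: unexpected message type %T", msg)
				}
				// Return the downstream result so it reaches the caller.
				return out("output", "Hello, "+name+"!")
			},
			"sink": func(out handler, port string, msg any) any {
				return msg // end of the chain: echo what arrived
			},
		},
		edges: []edge{{fromNode: "greet", fromPort: "output", toNode: "sink", toPort: "input"}},
	}
}

func main() {
	fmt.Println(buildDemo().deliver("greet", "input", "World"))
}
```

In the real SDK the hop between nodes may cross module boundaries; this sketch keeps everything in one process.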
Ports enable component communication:
_reconcile: Triggers node reconciliation
_client: Receives Kubernetes client for resource operations
_settings: Configuration port
_control: Dashboard control port
Each port can have:
Edges connect ports between nodes and support data transformation using mustache-style expressions:
Expression Syntax: Use {{expression}} to evaluate JSONPath expressions against incoming data.
{
"body": "{{$.request.body}}",
"statusCode": 200,
"greeting": "Hello {{$.user.name}}!",
"isAdmin": "{{$.user.role == 'admin'}}"
}
Expression Types:
("{{$.field}}") - Returns the actual type (string, number, boolean, object)
("hello", 200, true) - Static values passed through as-is
("Hello {{$.name}}!") - Embeds expression results in strings
("{{$.count + 1}}", "{{$.method == 'GET'}}") - Supports arithmetic and comparison
Key Features:
"{{$.count}}" returns a number, not a string
nil
Built-in Functions:
The expression engine includes many built-in functions:
| Category | Functions |
|---|---|
| Array | length(), first(), last(), avg(), sum(), size() |
| String | upper(), lower(), trim(), reverse(), b64encode(), b64decode() |
| String (multi-arg) | split(str, sep), join(arr, sep), contains(str, sub), hasprefix(str, prefix), hassuffix(str, suffix), replace(str, old, new), substr(str, start[, len]), index(str, sub) |
| Math | abs(), ceil(), floor(), round(), sqrt(), pow10() |
| Logic | not(), ternary ? : |
String Function Examples:
{
"kind": "{{first(split($.target, '/'))}}",
"name": "{{last(split($.target, '/'))}}",
"hasNamespace": "{{contains($.args, '-n ')}}",
"upperName": "{{upper($.name)}}",
"label": "{{replace($.label, '=', ': ')}}",
"domain": "{{first(split(last(split($.url, '//')), '/'))}}",
"message": "{{hasprefix($.error, 'NotFound') ? 'Resource not found' : $.error}}"
}
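The difference between a pure expression (type preserved) and string interpolation can be sketched in a few lines. This is a toy, not the SDK's evaluator: `render` and `lookup` are hypothetical names, only plain `$.a.b` paths are resolved, and operators and built-in functions are not handled.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// exprRe matches one {{ ... }} placeholder and captures its contents.
var exprRe = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)

// lookup resolves a simple "$.a.b" path against nested maps.
// A missing key or a non-map intermediate value yields nil.
func lookup(path string, data map[string]any) any {
	path = strings.TrimPrefix(path, "$.")
	var cur any = data
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil
		}
		cur = m[key]
	}
	return cur
}

// render returns the raw value when the whole template is a single
// expression, and an interpolated string otherwise.
func render(tmpl string, data map[string]any) any {
	loc := exprRe.FindStringSubmatchIndex(tmpl)
	if loc != nil && loc[0] == 0 && loc[1] == len(tmpl) {
		// Pure expression: keep the value's own type.
		return lookup(tmpl[loc[2]:loc[3]], data)
	}
	// Interpolation (or a literal with no placeholders): build a string.
	return exprRe.ReplaceAllStringFunc(tmpl, func(m string) string {
		sub := exprRe.FindStringSubmatch(m)
		return fmt.Sprint(lookup(sub[1], data))
	})
}

func main() {
	data := map[string]any{
		"count": 3,
		"user":  map[string]any{"name": "Ada"},
	}
	for _, tmpl := range []string{"{{$.count}}", "Hello {{$.user.name}}!", "hello"} {
		v := render(tmpl, data)
		fmt.Printf("%-26q -> %v (%T)\n", tmpl, v, v)
	}
}
```

The key design point is the whole-string check: only when the placeholder spans the entire value can the result stay a number, boolean or object instead of being flattened into text.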
The SDK provides several packages for module developers:
/module/ - Core Interfaces
Component: Main interface for component implementation
Port: Port definitions and configuration
Handler: Callback function type for message routing
/pkg/resource/ - Resource Manager
Unified Kubernetes client providing:
/pkg/schema/ - JSON Schema Generator
configurable, shared, propertyOrder, tab, align
/pkg/evaluator/ - Expression Evaluator
{{expression}} syntax processing
/pkg/metrics/ - Observability
/internal/scheduler/ - Message Routing
/internal/client/ - Client Pool
/cli/ - Command-Line Tools
run: Complete operator runtime
build: Module building and publishing
When nodes belong to the same module, messages are routed directly through the scheduler for optimal performance.
When nodes belong to different modules:
PermanentError wrapper to stop retries
{{expression}} syntax with JSONPath enables flexible data mapping without code changes
shared:true or configurable:true for cross-node type safety
The SDK supports horizontal scaling of module operators with leader election for coordination.
k8s.io/client-go/tools/leaderelection with Lease resources
HOSTNAME environment variable
isLeader atomic boolean.
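A minimal sketch of the leader-gating idea, using only the standard library. The `coordinator` type and its methods are hypothetical; the actual election is done by client-go and is not reproduced here. The sketch assumes the pod identity is read from `HOSTNAME` and that election callbacks flip an atomic flag which leader-only work checks before running.

```go
package main

import (
	"fmt"
	"os"
	"sync/atomic"
)

// coordinator is a hypothetical holder for the replica's identity and
// its current leadership state.
type coordinator struct {
	identity string
	isLeader atomic.Bool
}

// newCoordinator derives the identity from HOSTNAME (an assumption about
// how replicas are told apart), falling back to a placeholder.
func newCoordinator() *coordinator {
	id := os.Getenv("HOSTNAME")
	if id == "" {
		id = "unknown"
	}
	return &coordinator{identity: id}
}

// These two methods are what election callbacks would call.
func (c *coordinator) onStartedLeading() { c.isLeader.Store(true) }
func (c *coordinator) onStoppedLeading() { c.isLeader.Store(false) }

// runIfLeader executes fn only while this replica is the leader and
// reports whether fn ran.
func (c *coordinator) runIfLeader(fn func()) bool {
	if !c.isLeader.Load() {
		return false
	}
	fn()
	return true
}

func main() {
	c := newCoordinator()
	fmt.Println("ran as follower:", c.runIfLeader(func() {}))
	c.onStartedLeading()
	c.runIfLeader(func() { fmt.Println(c.identity, "is leading") })
}
```

An atomic flag keeps the check cheap on hot paths; work that every replica must do simply skips the gate.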
├── api/v1alpha1/ # CRD definitions (TinyNode, TinyModule, TinyFlow, etc.)
├── module/ # Core SDK interfaces for component developers
│ ├── component.go # Component interface
│ ├── node.go # Port definitions
│ └── handler.go # Handler function type
├── pkg/ # Reusable SDK packages
│ ├── resource/ # Kubernetes resource manager
│ ├── schema/ # JSON schema generator
│ ├── evaluator/ # JSONPath expression evaluator
│ ├── errors/ # Error handling utilities
│ └── metrics/ # OpenTelemetry integration
├── internal/ # Internal operator implementation
│ ├── controller/ # Kubernetes controllers
│ ├── scheduler/ # Message routing and execution
│ ├── server/ # gRPC server
│ └── client/ # gRPC client pool
├── cli/ # Command-line tools (run, build)
├── registry/ # Component registration system
├── config/ # Kubernetes manifests and CRD definitions
│ ├── crd/ # CRD YAML files
│ ├── samples/ # Example resources
│ └── rbac/ # RBAC configurations
└── charts/ # Helm charts for deployment
You’ll need a Kubernetes cluster to run against. You can use KIND to get a local cluster for testing, or run against a remote cluster.
Note: Your controller will automatically use the current context in your kubeconfig file (i.e. whatever cluster kubectl cluster-info shows).
helm repo add tinysystems https://tiny-systems.github.io/module/
helm repo update # if you already added repo before
helm install my-corp-data-processing-tools --set controllerManager.manager.image.repository=registry.mycorp/tools/data-processing tinysystems/tinysystems-operator
kubectl apply -f config/samples/
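Build and push your image to the location specified by IMG:
make docker-build docker-push IMG=<some-registry>/operator:tag
Deploy the controller to the cluster with the image specified by IMG:
make deploy IMG=<some-registry>/operator:tag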
UnDeploy the controller from the cluster:
make undeploy
// TODO(user): Add detailed information on how you would like others to contribute to this project
This project aims to follow the Kubernetes Operator pattern.
It uses Controllers, which provide a reconcile function responsible for synchronizing resources until the desired state is reached on the cluster.
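As a rough illustration of the reconcile idea, the sketch below converges an in-memory "cluster" toward a desired value one step per pass. The `cluster` type and both functions are hypothetical; a real controller reads and writes Kubernetes objects and is re-invoked by the controller runtime rather than by a local loop.

```go
package main

import "fmt"

// cluster is a hypothetical in-memory stand-in for observed state.
type cluster struct{ replicas int }

// reconcile moves the observed state one step toward the desired state
// and reports whether another pass is needed.
func reconcile(desired int, c *cluster) (requeue bool) {
	switch {
	case c.replicas < desired:
		c.replicas++
	case c.replicas > desired:
		c.replicas--
	}
	return c.replicas != desired
}

// converge calls reconcile until no further pass is requested and
// returns the number of passes it took.
func converge(desired int, c *cluster) int {
	passes := 1
	for reconcile(desired, c) {
		passes++
	}
	return passes
}

func main() {
	c := &cluster{}
	passes := converge(3, c)
	fmt.Println("passes:", passes, "replicas:", c.replicas)
}
```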
make install
make run
NOTE: You can also run this in one step by running: make install run
If you are editing the API definitions, generate the manifests such as CRs or CRDs using:
make manifests
kubebuilder create api --group operator --version v1alpha1 --kind TinySignal
NOTE: Run make --help for more information on all potential make targets
More information can be found via the Kubebuilder Documentation
This SDK provides everything you need to build custom module operators. Here's a complete guide to developing your own module.
mkdir my-module
cd my-module
go mod init github.com/myorg/my-module
go get github.com/tiny-systems/module
Create components/hello.go:
package components
import (
"context"
"github.com/tiny-systems/module/module"
)
type Hello struct{}
// Configuration for the input port
type HelloInput struct {
Name string `json:"name" configurable:"true"`
}
// Configuration for the output
type HelloOutput struct {
Greeting string `json:"greeting" shared:"true"`
}
func (h *Hello) Instance() module.Component {
return &Hello{}
}
func (h *Hello) GetInfo() module.ComponentInfo {
return module.ComponentInfo{
Name: "hello",
Description: "Greets a person by name",
Info: "Simple greeting component example",
Tags: []string{"example", "greeting"},
}
}
func (h *Hello) Ports() []module.Port {
return []module.Port{
{
Name: "input",
Label: "Input",
Source: false, // This is an input port
Position: module.Left,
Configuration: &HelloInput{},
},
{
Name: "output",
Label: "Output",
Source: true, // This is an output port
Position: module.Right,
Configuration: &HelloOutput{},
},
{
Name: "error",
Label: "Error",
Source: true,
Position: module.Bottom,
},
}
}
func (h *Hello) Handle(ctx context.Context, output module.Handler, port string, message any) any {
	if port != "input" {
		return nil
	}
	// Parse the input; the checked assertion avoids a panic on unexpected types
	input, ok := message.(*HelloInput)
	if !ok {
		return nil // ignore messages of an unexpected type
	}
	// Send the greeting to the output port and return the handler's result
	return output(ctx, "output", &HelloOutput{
		Greeting: "Hello, " + input.Name + "!",
	})
}
Create main.go:
package main
import (
"github.com/myorg/my-module/components"
"github.com/tiny-systems/module/cli"
"github.com/tiny-systems/module/registry"
)
func main() {
// Register all components
registry.Register(&components.Hello{})
// Run the operator
cli.Run()
}
# Build
go build -o my-module
# Run locally (connects to your current kubectl context)
./my-module run --name=my-module --version=1.0.0 --namespace=tinysystems
# Build and push Docker image
docker build -t myregistry/my-module:1.0.0 .
docker push myregistry/my-module:1.0.0
# Install using Helm
helm repo add tinysystems https://tiny-systems.github.io/module/
helm install my-module \
--set controllerManager.manager.image.repository=myregistry/my-module \
--set controllerManager.manager.image.tag=1.0.0 \
tinysystems/tinysystems-operator
func (c *MyComponent) GetInfo() module.ComponentInfo {
return module.ComponentInfo{
Name: "my-component", // Unique identifier
Description: "Does something", // Short description
Info: "Detailed info...", // Long description
Tags: []string{"tag1"}, // Searchable tags
}
}
Ports define how components connect to each other:
func (c *MyComponent) Ports() []module.Port {
return []module.Port{
{
Name: "input", // Unique port name
Label: "Input Data", // Display label
Source: false, // Input port
Position: module.Left, // Visual position
Configuration: &InputConfig{}, // Expected data structure
},
{
Name: "output",
Label: "Output Data",
Source: true, // Output port
Position: module.Right,
Configuration: &OutputConfig{}, // Output data structure
},
}
}
Port Positions: module.Top, module.Right, module.Bottom, module.Left
The Handle method is called when a message arrives on a port:
func (c *MyComponent) Handle(
	ctx context.Context,
	output module.Handler,
	port string,
	message any,
) any {
	switch port {
	case "input":
		// Type assert the message; the checked form avoids a panic
		input, ok := message.(*InputConfig)
		if !ok {
			return nil
		}
		// Do work
		result := processData(input)
		// Send to the output port and return the handler's result
		return output(ctx, "output", &OutputConfig{
			Result: result,
		})
	case "_reconcile":
		// Handle reconciliation (called periodically)
		// Use this for cleanup, state sync, etc.
	}
	return nil
}
Key Points:
ctx: Context with tracing span and cancellation
output: Callback function to send data to other ports
port: Name of the port that received the message
message: The actual data (type assert to your config struct)
The SDK automatically generates JSON Schemas from your Go structs. Use struct tags to control the UI:
type Config struct {
// Basic field
Name string `json:"name"`
// Configurable in UI (can reference other node outputs)
UserID string `json:"userId" configurable:"true"`
// Shared definition (other nodes can reference this)
Result string `json:"result" shared:"true"`
// Control UI layout
APIKey string `json:"apiKey" propertyOrder:"1" tab:"auth"`
// Nested object
Settings struct {
Timeout int `json:"timeout" configurable:"true"`
} `json:"settings"`
// Array
Items []string `json:"items" configurable:"true"`
}
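To make the role of the tags concrete, here is a small sketch that reads them with the standard `reflect` package. It is not the SDK's schema generator: `fieldHint` and `hints` are hypothetical names, and only a few of the tags from the struct above are collected.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// fieldHint is a hypothetical summary of the UI-related tags on a field.
type fieldHint struct {
	JSONName     string
	Configurable bool
	Shared       bool
	Tab          string
}

// Config repeats a few fields from the example above.
type Config struct {
	Name   string `json:"name"`
	UserID string `json:"userId" configurable:"true"`
	Result string `json:"result" shared:"true"`
	APIKey string `json:"apiKey" propertyOrder:"1" tab:"auth"`
}

// hints walks the struct's top-level fields and collects their tags.
// It returns nil for anything that is not a struct or pointer to one.
func hints(v any) []fieldHint {
	t := reflect.TypeOf(v)
	if t == nil {
		return nil
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil
	}
	out := make([]fieldHint, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		out = append(out, fieldHint{
			// Drop options such as ",omitempty" from the json tag.
			JSONName:     strings.Split(f.Tag.Get("json"), ",")[0],
			Configurable: f.Tag.Get("configurable") == "true",
			Shared:       f.Tag.Get("shared") == "true",
			Tab:          f.Tag.Get("tab"),
		})
	}
	return out
}

func main() {
	for _, h := range hints(&Config{}) {
		fmt.Printf("%+v\n", h)
	}
}
```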
Struct Tags:
configurable:"true": Field can accept values from other nodes via {{expression}} syntax
shared:"true": Field definition is available to other nodes for type-safe mapping
propertyOrder:"N": Controls field order in UI
tab:"name": Groups field under a tab in UI
align:"horizontal": Layout hint for UI
Special ports available to all components:
_reconcile Port
Called periodically (every 5 minutes) and on node changes:
case "_reconcile":
// Clean up resources
// Sync state
// Check for drift
_client Port
Provides Kubernetes client for resource operations:
case "_client":
client := message.(resource.Manager)
// Create a signal
client.CreateSignal(ctx, resource.CreateSignalRequest{
Node: "target-node",
Port: "input",
Data: map[string]any{"key": "value"},
})
// Get node information
node, err := client.GetNode(ctx, "node-name")
_settings Port
Receives initial configuration (no "from" connection required):
case "_settings":
settings := message.(*MyConfig)
// Store settings for later use
There is no guaranteed delivery order between system ports. The _settings, _reconcile, and _control ports may fire in any sequence after a pod restart or during normal operation. Module creators must handle all possible orderings.
Common pitfall: a component restores state from metadata via _reconcile, then a _settings delivery arrives with stale CRD values and overwrites it. The SDK does not enforce any ordering — it is the module creator's responsibility to handle this.
Example: if your component stores user-provided context via _control and persists it to metadata, you must protect it from being overwritten by _settings:
type Component struct {
settings Settings
contextFromControl bool // tracks whether context was set by control/metadata
}
case v1alpha1.SettingsPort:
in := msg.(Settings)
if c.contextFromControl {
in.Context = c.settings.Context // preserve control-set context
}
c.settings = in
case v1alpha1.ReconcilePort:
// restore from metadata
if cfg, ok := restoreFromMetadata(node); ok {
c.settings = cfg
c.contextFromControl = true
}
case v1alpha1.ControlPort:
ctrl := msg.(Control)
c.settings.Context = ctrl.Context
c.contextFromControl = true
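One way to gain confidence in such a guard is to replay the deliveries in every order and compare the end state. The sketch below does that for two ports only. It is self-contained and simplified: the types are local stand-ins, the port names are plain strings rather than the `v1alpha1` constants, and the control payload is assumed to be a bare string.

```go
package main

import "fmt"

type settings struct{ Context string }

type component struct {
	settings           settings
	contextFromControl bool // set once control has provided the context
}

const (
	settingsPort = "_settings"
	controlPort  = "_control"
)

// handle applies one delivery, protecting a control-set context from
// being overwritten by a later settings delivery.
func (c *component) handle(port string, msg any) {
	switch port {
	case settingsPort:
		in, ok := msg.(settings)
		if !ok {
			return
		}
		if c.contextFromControl {
			in.Context = c.settings.Context // preserve control-set context
		}
		c.settings = in
	case controlPort:
		ctx, ok := msg.(string)
		if !ok {
			return
		}
		c.settings.Context = ctx
		c.contextFromControl = true
	}
}

type delivery struct {
	port string
	msg  any
}

// finalContext replays deliveries on a fresh component and returns the
// context it ends up with.
func finalContext(order []delivery) string {
	c := &component{}
	for _, d := range order {
		c.handle(d.port, d.msg)
	}
	return c.settings.Context
}

func main() {
	stale := delivery{settingsPort, settings{Context: "stale"}}
	user := delivery{controlPort, "user"}
	fmt.Println(finalContext([]delivery{stale, user}))
	fmt.Println(finalContext([]delivery{user, stale}))
}
```

Both orderings should end with the user-provided context; extending the list of deliveries with a reconcile step follows the same pattern.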
Return regular errors for automatic retry with exponential backoff:
func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
data, err := fetchFromAPI()
if err != nil {
// Will retry automatically
return err
}
// ...
}
Use PermanentError to stop retries:
import (
	"fmt"

	"github.com/tiny-systems/module/pkg/errors"
)
func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	if !isValid(msg) {
		// Won't retry - send to the error port and return the handler's result
		return output(ctx, "error", errors.PermanentError{
			Err: fmt.Errorf("invalid input"),
		})
	}
	// ...
	return nil
}
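The retry behaviour described above can be approximated with a short standalone loop. This is a sketch under assumptions, not the SDK's scheduler: `permanentError` is a local stand-in for the SDK wrapper, `retry`, `flaky` and `alwaysPermanent` are hypothetical helpers, and the doubling delay is just one common backoff choice.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// permanentError is a local stand-in for a "do not retry" wrapper.
type permanentError struct{ Err error }

func (e permanentError) Error() string { return e.Err.Error() }
func (e permanentError) Unwrap() error { return e.Err }

// retry calls fn up to attempts times, doubling the delay after each
// failure. It stops early on success or on a permanentError and returns
// the number of calls made together with the last error.
func retry(attempts int, base time.Duration, fn func() error) (int, error) {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := base
	for i := 1; i <= attempts; i++ {
		if err = fn(); err == nil {
			return i, nil
		}
		var perm permanentError
		if errors.As(err, &perm) {
			return i, err // retrying would not help
		}
		if i < attempts {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return attempts, err
}

// flaky returns a function that fails the given number of times and
// then succeeds.
func flaky(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("temporary failure")
		}
		return nil
	}
}

// alwaysPermanent fails with an error that must not be retried.
func alwaysPermanent() error {
	return permanentError{Err: errors.New("invalid input")}
}

func main() {
	n, err := retry(5, time.Millisecond, flaky(2))
	fmt.Println("calls:", n, "err:", err)
	n, err = retry(5, time.Millisecond, alwaysPermanent)
	fmt.Println("calls:", n, "err:", err)
}
```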
Access Kubernetes resources from your component:
import "github.com/tiny-systems/module/pkg/resource"
func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	if port == "_client" {
		// Store the client delivered on the system port
		c.client = msg.(resource.Manager)
		return nil
	}
	if port == "create-flow" {
		// Create a new flow
		flow, err := c.client.CreateFlow(ctx, resource.CreateFlowRequest{
			Name:    "dynamic-flow",
			Project: "my-project",
		})
		if err != nil {
			return err // regular errors are retried
		}
		// Create nodes in the flow
		node, err := c.client.CreateNode(ctx, resource.CreateNodeRequest{
			Name:      "node-1",
			Flow:      flow.Name,
			Module:    "http-module",
			Component: "request",
			Settings: map[string]any{
				"url": "https://api.example.com",
			},
		})
		if err != nil {
			return err
		}
		// Trigger the node
		c.client.CreateSignal(ctx, resource.CreateSignalRequest{
			Node: node.Name,
			Port: "trigger",
			Data: map[string]any{},
		})
	}
	return nil
}
OpenTelemetry is built-in. The context includes a span:
import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)
func (c *MyComponent) Handle(ctx context.Context, output module.Handler, port string, msg any) any {
	// Get tracer
	tracer := otel.Tracer("my-module")
	// Create child span
	ctx, span := tracer.Start(ctx, "processing")
	defer span.End()
	// Add attributes
	span.SetAttributes(
		attribute.String("input.size", "large"),
	)
	// Do work...
	result := doWork(ctx)
	// Return the handler's result so responses propagate to the caller
	return output(ctx, "output", result)
}
package components_test
import (
"context"
"testing"
"github.com/myorg/my-module/components"
)
func TestHello(t *testing.T) {
	component := &components.Hello{}
	var outputData *components.HelloOutput
	outputHandler := func(ctx context.Context, port string, data any) any {
		if port == "output" {
			outputData = data.(*components.HelloOutput)
		}
		return nil
	}
	// Send message to input port
	component.Handle(context.Background(), outputHandler, "input", &components.HelloInput{
		Name: "World",
	})
	// Verify output; fail early if nothing reached the output port
	if outputData == nil {
		t.Fatal("expected a message on the output port, got none")
	}
	if outputData.Greeting != "Hello, World!" {
		t.Errorf("Expected 'Hello, World!', got '%s'", outputData.Greeting)
	}
}
_reconcile for cleanup
ctx.Done() for graceful shutdown
shared:true for type-safe flows
GetInfo() for discoverability
ALWAYS return the result of handler() calls. Never ignore the return value.
The TinySystems SDK uses blocking I/O for request-response patterns. When a component like HTTP Server sends a request, it blocks waiting for a response to flow back through the same handler chain. If any component in the chain ignores the handler return value, the response is lost and the original caller times out.
BAD - Breaks blocking I/O:
func (c *Component) handleError(ctx context.Context, handler module.Handler, req Request, errMsg string) any {
if c.settings.EnableErrorPort {
_ = handler(ctx, "error", Error{...}) // WRONG: ignores return value!
return nil // Response is lost, HTTP Server times out
}
return errors.New(errMsg)
}
GOOD - Propagates response correctly:
func (c *Component) handleError(ctx context.Context, handler module.Handler, req Request, errMsg string) any {
if c.settings.EnableErrorPort {
return handler(ctx, "error", Error{...}) // CORRECT: returns handler result
}
return errors.New(errMsg)
}
Why this matters:
When HTTP Server → Slack Command → Router → HTTP Server:Response:
handler(ctx, "request", req)
handler(ctx, "error", error)
handleResponse() returns the Response
If Slack Command does _ = handler(...) and returns nil, the Response is lost.
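The effect is easy to reproduce outside the SDK. In the sketch below every name is hypothetical: `terminal` plays the node that produces the response, and the two stage functions differ only in whether they return the downstream result.

```go
package main

import "fmt"

// handler is a simplified stand-in for the output callback.
type handler func(port string, msg any) any

// terminal simulates the last node in the chain, which produces the
// response that the original caller is waiting for.
func terminal(port string, msg any) any {
	return "response for " + fmt.Sprint(msg)
}

// goodStage returns whatever the downstream handler returns.
func goodStage(next handler, msg any) any {
	return next("output", msg)
}

// badStage calls downstream but discards the result, so the caller
// only ever sees nil.
func badStage(next handler, msg any) any {
	_ = next("output", msg)
	return nil
}

func main() {
	fmt.Println("good:", goodStage(terminal, "req"))
	fmt.Println("bad: ", badStage(terminal, "req"))
}
```

In both cases the downstream node runs; only the stage that returns the result lets the response reach the caller.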
Rules:
return handler(ctx, port, data) for output ports
_reconcile port calls can ignore returns (internal system port)
All component names must follow a consistent naming pattern using snake_case:
Pattern: [technology_][resource]_[action] or [resource]_[action]
Rules:
snake_case (lowercase with underscores)
Examples by category:
| Category | Name | Description |
|---|---|---|
| HTTP | http_server | HTTP server |
| | http_request | Make HTTP request |
| | http_auth_parse | Parse auth header |
| Encoding | json_encode | Encode to JSON |
| | json_decode | Decode from JSON |
| | go_template | Render Go template |
| | smtp_send | Send email via SMTP |
| | sendgrid_send | Send email via SendGrid API |
| Messaging | slack_send | Send Slack message |
| | slack_command | Receive Slack command |
| Kubernetes | pod_status_get | Get pod status |
| | pod_logs_get | Get pod logs |
| | deployment_restart | Restart deployment |
| | resource_watch | Watch K8s resources |
| Utilities | transform | Transform/passthrough data |
| | delay | Delay execution |
| | router | Route messages |
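A name check along these lines could be added to a module's own tests. The regular expression and the `validName` helper are assumptions for illustration, not part of the SDK, and they verify only the snake_case shape, not whether a name follows the resource and action ordering.

```go
package main

import (
	"fmt"
	"regexp"
)

// snakeCase accepts lowercase words separated by single underscores,
// starting with a letter.
var snakeCase = regexp.MustCompile(`^[a-z][a-z0-9]*(_[a-z0-9]+)*$`)

// validName reports whether a component name has the snake_case shape.
func validName(name string) bool {
	return snakeCase.MatchString(name)
}

func main() {
	for _, name := range []string{"http_server", "router", "HttpServer", "http-server", "http__server"} {
		fmt.Printf("%-14s %v\n", name, validName(name))
	}
}
```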
When to include technology prefix:
(smtp_send vs sendgrid_send)
(go_template vs handlebars_template)
(grpc_call vs http_request)
Follow idiomatic Go patterns:
if err != nil { return } pattern
// Good - early return
func (c *Component) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
if port != "request" {
return fmt.Errorf("unknown port: %s", port)
}
req, ok := msg.(Request)
if !ok {
return errors.New("invalid request")
}
result, err := c.process(ctx, req)
if err != nil {
return c.handleError(ctx, handler, req, err)
}
return handler(ctx, "output", result)
}
// Bad - nested ifs
func (c *Component) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
if port == "request" {
if req, ok := msg.(Request); ok {
if result, err := c.process(ctx, req); err == nil {
return handler(ctx, "output", result)
} else {
return c.handleError(ctx, handler, req, err)
}
}
}
return fmt.Errorf("unknown port: %s", port)
}
1. Minimize Settings
Settings should ONLY contain:
EnableErrorPort, EnableStatusPort (affect port visibility)
Settings should NOT contain:
// Good - minimal settings
type Settings struct {
EnableErrorPort bool `json:"enableErrorPort" title:"Enable Error Port"`
DefaultLines int64 `json:"defaultLines" title:"Default Lines"`
}
// Bad - credentials and runtime config in settings
type Settings struct {
APIKey string `json:"apiKey"` // Should be in request
Endpoint string `json:"endpoint"` // Should be in request
Namespace string `json:"namespace"` // Should be in request
}
2. Credentials via Input Ports
All credentials and runtime configuration come through input ports:
type Request struct {
Context any `json:"context,omitempty" configurable:"true"`
// Credentials - from upstream (e.g., secret manager)
APIKey string `json:"apiKey" required:"true" configurable:"true"`
// Runtime config - varies per execution
Endpoint string `json:"endpoint" required:"true" configurable:"true"`
Namespace string `json:"namespace" required:"true" configurable:"true"`
}
Why: Settings are spread across flows, not programmatically configurable, and storing credentials in settings is a security anti-pattern.
3. Context Passthrough
Every component MUST pass context through for correlation:
type Request struct {
Context any `json:"context,omitempty" configurable:"true" title:"Context" description:"Arbitrary data passed through to output"`
// ... other fields
}
type Response struct {
Context any `json:"context,omitempty" title:"Context"`
// ... other fields
}
func (c *Component) Handle(ctx context.Context, handler module.Handler, port string, msg any) any {
	req, ok := msg.(Request)
	if !ok {
		return errors.New("invalid request")
	}
	result := c.process(req)
	// Always pass context through, and return the handler's result
	return handler(ctx, "output", Response{
		Context: req.Context, // Pass through!
		Data:    result,
	})
}
4. Flow-Driven Configuration
Everything should be configurable via edges/signals, not the settings panel:
Check out these example modules for reference:
The SDK includes a CLI for running and building modules:
# Run module locally
./my-module run --name=my-module --version=1.0.0 --namespace=tinysystems
# Build (if custom build logic is needed)
./my-module build
# Get help
./my-module --help
Copyright 2026 Tiny Systems Limited. All rights reserved.
This project is licensed under the Business Source License 1.1.
Key terms:
For commercial licensing inquiries, contact Tiny Systems Limited.