Flow
Agent Flow lets you define multiple agents in a single flow.yaml file, wire them together with delegate sinks, and run them all with one command. Each agent runs as an asyncio task on a shared event loop, with agent executions dispatched to a thread pool. Delegate sinks route output from one agent to another via in-memory queues.
Agents start in dependency tiers derived from each agent's needs list (see Startup Order). Each agent is a standalone unit connected to the others via delegate sinks.
Quick Start
```yaml
# flow.yaml
apiVersion: initrunner/v1
kind: Flow
metadata:
  name: my-pipeline
  description: Simple producer-consumer pipeline
spec:
  agents:
    producer:
      role: roles/producer.yaml
      sink:
        type: delegate
        target: consumer
    consumer:
      role: roles/consumer.yaml
      needs:
        - producer
```

```bash
# Validate
initrunner flow validate flow.yaml

# Start (foreground, Ctrl+C to stop)
initrunner flow up flow.yaml
```

Scaffold a Project
Use initrunner flow new to generate a complete multi-agent project with role files and a flow.yaml:
```bash
initrunner flow new my-pipeline                      # default: pipeline pattern
initrunner flow new my-pipeline --pattern fan-out    # dispatcher + parallel workers
initrunner flow new my-pipeline --pattern route      # intake with sense-based routing
initrunner flow new my-pipeline --agents 4           # customize agent count
initrunner flow new my-pipeline --shared-memory      # enable shared memory
initrunner flow new --list-patterns                  # show available patterns
```

Three patterns are available:
| Pattern | Description |
|---|---|
| pipeline | Linear chain of agents. Configurable agent count. |
| fan-out | A dispatcher fans work out to parallel workers. |
| route | An intake agent routes messages to specialized agents (researcher, responder, escalator) using sense-based scoring. |
Each pattern generates a ready-to-run project directory with role YAML files and a flow.yaml. Review the generated files, customize as needed, then run with initrunner flow up flow.yaml.
Flow Definition
The top-level structure follows the apiVersion/kind/metadata/spec pattern:
| Field | Type | Default | Description |
|---|---|---|---|
| apiVersion | str | (required) | e.g. initrunner/v1 |
| kind | str | (required) | Must be "Flow" |
| metadata.name | str | (required) | Flow definition name |
| metadata.description | str | "" | Human-readable description |
| spec.agents | dict | (required) | Map of agent name to configuration |
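The field table above can be checked mechanically. Below is a minimal sketch of such a check over an already-parsed flow.yaml mapping (for example, the dict a YAML loader returns). check_flow_definition is a hypothetical name, not part of initrunner, and initrunner flow validate may check more or different things; the needs cross-check in particular is an assumption about what a sensible validator would also catch.

```python
from typing import Any

REQUIRED_TOP_LEVEL = ("apiVersion", "kind", "metadata", "spec")


def check_flow_definition(doc: dict[str, Any]) -> list[str]:
    """Hypothetical checker: return problems found in a parsed flow.yaml mapping."""
    problems = [f"missing required field: {f}" for f in REQUIRED_TOP_LEVEL if f not in doc]
    if "kind" in doc and doc["kind"] != "Flow":
        problems.append(f'kind must be "Flow", got {doc["kind"]!r}')
    if "name" not in (doc.get("metadata") or {}):
        problems.append("missing required field: metadata.name")
    agents = (doc.get("spec") or {}).get("agents")
    if not isinstance(agents, dict):
        problems.append("spec.agents must be a mapping of agent name to configuration")
        return problems
    for name, cfg in agents.items():
        cfg = cfg or {}
        if "role" not in cfg:
            problems.append(f"agent {name!r} is missing role")
        # Assumption: every needs entry should name another agent in this flow.
        for dep in cfg.get("needs", []):
            if dep not in agents:
                problems.append(f"agent {name!r} needs unknown agent {dep!r}")
    return problems


example = {
    "apiVersion": "initrunner/v1",
    "kind": "Flow",
    "metadata": {"name": "my-pipeline"},
    "spec": {"agents": {
        "producer": {"role": "roles/producer.yaml"},
        "consumer": {"role": "roles/consumer.yaml", "needs": ["producer"]},
    }},
}
print(check_flow_definition(example))  # []
```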
Agent Configuration
```yaml
agents:
  my-agent:
    role: roles/my-role.yaml
    sink:
      type: delegate
      target: other-agent
    needs:
      - dependency-agent
    restart:
      condition: on-failure
      max_retries: 3
      delay_seconds: 5
    environment: {}
```

| Field | Type | Default | Description |
|---|---|---|---|
| role | str | (required) | Path to role YAML (relative to flow file) |
| sink | object \| null | null | Delegate sink for routing output |
| needs | list[str] | [] | Agents that must start first |
| restart.condition | str | "none" | "none", "on-failure", or "always" |
| restart.max_retries | int | 3 | Maximum restart attempts |
| restart.delay_seconds | int | 5 | Seconds to wait before restarting |
| environment | dict | {} | Additional environment variables |
Delegate Sinks
Route an agent's output to other agents via in-memory queues.
```yaml
# Single target
sink:
  type: delegate
  target: consumer
  queue_size: 100
  timeout_seconds: 60
```

```yaml
# Multiple targets, routed by sense
sink:
  type: delegate
  strategy: sense
  target:
    - researcher
    - responder
  keep_existing_sinks: true
```

| Field | Type | Default | Description |
|---|---|---|---|
| type | str | (required) | Must be "delegate" |
| target | str \| list[str] | (required) | Target agent name(s) |
| strategy | "all" \| "keyword" \| "sense" | "all" | Routing strategy for multi-target delegates |
| keep_existing_sinks | bool | false | Also activate role-level sinks |
| queue_size | int | 100 | Max buffered events in the target's inbox |
| timeout_seconds | int | 60 | Seconds to block when the target's queue is full before dropping the event |
| circuit_breaker_threshold | int \| null | null | Consecutive failures before the circuit opens |
| circuit_breaker_reset_seconds | int | 60 | Seconds the circuit stays open before a probe is allowed |
Only successful runs are forwarded. Failed runs are silently skipped.
Routing Strategy
When a delegate sink has multiple targets, the strategy field controls how messages are routed.
| Strategy | Behavior | API calls |
|---|---|---|
| all | Fan-out: every target receives every message (default, backward compatible) | None |
| keyword | Intent Sensing keyword scoring picks the best target | None |
| sense | Keyword scoring first; LLM tiebreaker when ambiguous | 0 or 1 per message |
The keyword and sense strategies use the same two-pass Intent Sensing logic used by --sense in the CLI. They score the agent's output text against each target agent's metadata.name, metadata.description, and metadata.tags from its role definition.
Before (static fan-out): every message goes to ALL targets:
```yaml
triager:
  role: roles/triager.yaml
  sink:
    type: delegate
    target: [researcher, responder, escalator]
```

After (sense picks the right target):

```yaml
triager:
  role: roles/triager.yaml
  sink:
    type: delegate
    strategy: sense   # ← one line added
    target: [researcher, responder, escalator]
```

How routing works
- The upstream agent's output is scored against each target's role metadata (name, description, tags) using keyword matching.
- If the output doesn't produce a confident match, the original user prompt (preserved from the head of the delegation chain) is also scored.
- For the sense strategy, if both attempts are inconclusive, an LLM tiebreaker call selects the best target.
- The message is forwarded to the selected target only (not fanned out).
Routing diagnostics are injected into the payload's trigger metadata as _flow_route_reason for audit visibility.
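A rough sketch of the two-pass flow described above follows, using metadata like the role examples further down. The word-overlap scorer is a deliberately naive stand-in for Intent Sensing (whose real scoring is not documented on this page), the tiebreak callable stands in for the LLM call, and falling back to the top keyword score when nothing is confident is an assumption. route, confident_pick, and score are hypothetical names.

```python
import re
from typing import Callable, Optional, Sequence

Meta = dict  # {"name": str, "description": str, "tags": list[str]}


def _words(text: str) -> set[str]:
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def score(text: str, meta: Meta) -> int:
    """Naive keyword score: words shared with the target's name, description, tags."""
    vocab = (_words(meta["name"]) | _words(meta.get("description", ""))
             | _words(" ".join(meta.get("tags", []))))
    return len(_words(text) & vocab)


def confident_pick(text: str, targets: Sequence[Meta], margin: int = 1) -> Optional[str]:
    """Top-scoring target name if it beats the runner-up by `margin`, else None."""
    ranked = sorted(targets, key=lambda m: score(text, m), reverse=True)
    best = score(text, ranked[0])
    runner_up = score(text, ranked[1]) if len(ranked) > 1 else 0
    return ranked[0]["name"] if best > 0 and best - runner_up >= margin else None


def route(output: str, prompt: str, targets: Sequence[Meta], strategy: str = "all",
          tiebreak: Optional[Callable[[str, list], str]] = None) -> list[str]:
    names = [m["name"] for m in targets]
    if strategy == "all" or len(names) == 1:
        return names                      # fan out, or the only possible target
    # Pass 1 scores the upstream output; pass 2 scores the original user prompt.
    choice = confident_pick(output, targets) or confident_pick(prompt, targets)
    if choice is None and strategy == "sense" and tiebreak is not None:
        choice = tiebreak(output, names)  # placeholder for the LLM tiebreaker
    if choice is None:                    # assumption: fall back to the top score
        choice = max(targets, key=lambda m: score(output, m))["name"]
    return [choice]


TARGETS = [
    {"name": "researcher", "description": "Researches topics in depth and gathers supporting evidence",
     "tags": ["research", "analysis", "investigation", "evidence"]},
    {"name": "responder", "description": "Responds directly to user queries with concise answers",
     "tags": ["response", "chat", "answer", "reply"]},
    {"name": "escalator", "description": "Escalates complex issues to human operators",
     "tags": ["escalation", "support", "human", "complex"]},
]
print(route("Needs research and evidence gathering", "", TARGETS, "keyword"))  # ['researcher']
```

Note how the second pass matters: an uninformative output such as "ok" still routes correctly if the original prompt mentions escalating to a human.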
Optimizing roles for routing
The same tips from Intent Sensing — Writing Roles That Sense Well apply. Each target agent's role should have specific, non-overlapping tags and a clear description:
```yaml
# roles/researcher.yaml
metadata:
  name: researcher
  description: Researches topics in depth and gathers supporting evidence
  tags: [research, analysis, investigation, evidence]
```

```yaml
# roles/responder.yaml
metadata:
  name: responder
  description: Responds directly to user queries with concise answers
  tags: [response, chat, answer, reply]
```

```yaml
# roles/escalator.yaml
metadata:
  name: escalator
  description: Escalates complex issues to human operators
  tags: [escalation, support, human, complex]
```

Single target behavior
When only one target is specified, strategy has no effect — the message always goes to that target regardless of the strategy setting.
Startup Order
Agents start in topological order based on needs. Agents without dependencies start first, forming tiers of parallel startup. Shutdown happens in reverse order.
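The tiering rule can be reproduced with Python's standard graphlib module, using the same agents as the example that follows. This is a sketch of the ordering only, not initrunner's scheduler; startup_tiers is a hypothetical helper, and it sorts names within a tier only to make the output deterministic. The shutdown order is the reversed list.

```python
from graphlib import CycleError, TopologicalSorter


def startup_tiers(needs: dict[str, list[str]]) -> list[list[str]]:
    """Group agents into tiers; an agent starts only after everything it needs."""
    sorter = TopologicalSorter(needs)       # maps agent -> agents it needs
    try:
        sorter.prepare()
    except CycleError as exc:
        raise ValueError(f"circular needs: {exc.args[1]}") from exc
    tiers: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # agents whose needs have all started
        tiers.append(ready)
        sorter.done(*ready)
    return tiers


needs = {
    "inbox-watcher": [],
    "triager": ["inbox-watcher"],
    "researcher": ["triager"],
    "responder": ["triager"],
}
tiers = startup_tiers(needs)
print(tiers)                  # [['inbox-watcher'], ['triager'], ['researcher', 'responder']]
print(list(reversed(tiers)))  # shutdown order
```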
```yaml
agents:
  inbox-watcher:
    role: roles/inbox-watcher.yaml
    sink: { type: delegate, target: triager }
  triager:
    role: roles/triager.yaml
    needs: [inbox-watcher]
    sink: { type: delegate, target: [researcher, responder] }
  researcher:
    role: roles/researcher.yaml
    needs: [triager]
  responder:
    role: roles/responder.yaml
    needs: [triager]
```

```
Tier 0: inbox-watcher (no dependencies)
Tier 1: triager (depends on inbox-watcher)
Tier 2: researcher, responder (both depend on triager)
```

Restart Policies
| Condition | Restart when... |
|---|---|
| none | Never restart |
| on-failure | Restart only if errors were recorded |
| always | Restart whenever the agent thread exits |
A health monitor checks every 10 seconds and applies restart policies. In flow mode, it runs as an asyncio task on the shared event loop; in standalone mode, it runs as a thread.
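The policy table and the 10-second monitor can be sketched as follows. AgentState, should_restart, and health_monitor are hypothetical. Counting restarts against max_retries and waiting delay_seconds before each restart are assumptions drawn from the field descriptions, and a production monitor would probably schedule delayed restarts without blocking its own loop.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class AgentState:
    """Hypothetical per-agent bookkeeping that the monitor inspects."""
    name: str
    condition: str = "none"      # "none", "on-failure", or "always"
    max_retries: int = 3
    delay_seconds: float = 5
    running: bool = True
    error_count: int = 0
    restarts: int = 0


def should_restart(agent: AgentState) -> bool:
    """Apply the restart-policy table to an agent that is no longer running."""
    if agent.running or agent.restarts >= agent.max_retries:
        return False
    if agent.condition == "always":
        return True
    if agent.condition == "on-failure":
        return agent.error_count > 0   # only if errors were recorded
    return False                       # "none"


async def health_monitor(agents: list[AgentState],
                         restart: Callable[[AgentState], Awaitable[None]],
                         stop: asyncio.Event, interval: float = 10.0) -> None:
    """Every `interval` seconds, restart agents whose policy allows it."""
    while not stop.is_set():
        for agent in agents:
            if should_restart(agent):
                agent.restarts += 1
                await asyncio.sleep(agent.delay_seconds)
                await restart(agent)
        try:
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass                       # no stop request yet: check again


async def demo() -> list[str]:
    stop = asyncio.Event()
    restarted: list[str] = []

    async def restart(agent: AgentState) -> None:
        agent.running = True
        restarted.append(agent.name)
        stop.set()                     # end the demo after one pass

    crashed = AgentState("responder", "on-failure", delay_seconds=0, running=False, error_count=1)
    exited = AgentState("researcher", "on-failure", delay_seconds=0, running=False)
    await health_monitor([crashed, exited], restart, stop, interval=0.01)
    return restarted
```

In the demo only the responder is restarted: both agents have stopped, but only the responder recorded an error, which is what on-failure requires.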
Runtime Architecture
Graph-Based Execution
Since v2026.3.8, flow and team runners use pydantic-graph for orchestration instead of thread-per-agent. Agents are modeled as graph nodes with edges representing delegate sinks. Fan-out, routing, and delegation run as graph steps with native async agent execution.
Since v2026.4.8, tool call start/complete events and a usage event (with token counts and cost) are streamed via SSE for flow runs, matching the agent stream contract. The dashboard displays these in the unified bottom panel with live tool activity.
```
Flow YAML
  │
  └── pydantic-graph
        ├── Step: agent-a (Tier 0)
        ├── Fork: agent-b, agent-c (Tier 1)
        ├── Join
        └── Step: agent-d (Tier 2)
```

The graph topology is derived from needs declarations. Sequential chains become linear step sequences, fan-out patterns become fork/join nodes, and sense routing is handled at graph edges. Agent executions run as native async calls within each graph step.
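As a plain-asyncio illustration of the Step/Fork/Join shape above (not pydantic-graph's API), the sketch below runs each tier in turn and gathers multi-agent tiers concurrently. For simplicity it assumes every agent in a tier receives the joined output of the previous tier; in a real flow, the delegate sinks decide what each agent receives. run_graph and fake_agent are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable

RunAgent = Callable[[str, str], Awaitable[str]]


async def run_graph(tiers: list[list[str]], run_agent: RunAgent, prompt: str) -> dict[str, str]:
    results: dict[str, str] = {}
    upstream = prompt
    for tier in tiers:
        if len(tier) == 1:                   # Step: one node, awaited directly
            outputs = [await run_agent(tier[0], upstream)]
        else:                                # Fork: branches run concurrently
            outputs = list(await asyncio.gather(*(run_agent(n, upstream) for n in tier)))
        results.update(zip(tier, outputs))   # Join: collect every branch
        upstream = "\n".join(outputs)
    return results


async def fake_agent(name: str, text: str) -> str:
    await asyncio.sleep(0)                   # stands in for an async agent.run()
    return f"{name}({text})"


print(asyncio.run(run_graph([["agent-a"], ["agent-b", "agent-c"], ["agent-d"]], fake_agent, "p")))
```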
Async Tool Execution
Flow agents build with prefer_async=True, which gives I/O-bound tools (http, web_reader, web_scraper, search) async closures. When the agent runs via PydanticAI's async agent.run(), these tools execute natively on the event loop without thread-pool overhead. Sync tools are auto-wrapped by PydanticAI.
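The difference between the two kinds of tools can be shown with plain asyncio. This is a generic sketch of the pattern (await coroutine functions directly, run blocking functions in a worker thread via asyncio.to_thread), not PydanticAI's internal code; call_tool, fetch_page, and count_words are hypothetical.

```python
import asyncio
import inspect
import time
from typing import Any, Callable


async def call_tool(tool: Callable[..., Any], *args: Any) -> Any:
    """Await async tools on the loop; run sync tools in a worker thread."""
    if inspect.iscoroutinefunction(tool):
        return await tool(*args)                 # native async: no thread hop
    return await asyncio.to_thread(tool, *args)  # sync: keep the loop responsive


async def fetch_page(url: str) -> str:           # stand-in for an async http tool
    await asyncio.sleep(0.05)                    # simulated network wait
    return f"<html>{url}</html>"


def count_words(text: str) -> int:               # stand-in for a blocking sync tool
    time.sleep(0.05)                             # simulated blocking work
    return len(text.split())


async def demo() -> list:
    # Both calls run concurrently; neither blocks the event loop.
    return await asyncio.gather(call_tool(fetch_page, "https://example.com"),
                                call_tool(count_words, "one two three"))
```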
Shutdown Semantics
- First Ctrl+C (or SIGTERM) sets the stop event; agents stop accepting new inbox events.
- Each agent waits for any in-flight run to complete (grace period, default 30s).
- After the grace period, the agent detaches from the in-flight run.
- Agent tasks are cancelled on the shared loop.
- The ThreadPoolExecutor shuts down, letting in-flight runs complete.
- Delegate sinks flush their audit threads.
A second Ctrl+C force-exits immediately.
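A condensed asyncio sketch of the first five shutdown steps is below. The names are hypothetical, and it assumes in-flight runs are futures returned by loop.run_in_executor. asyncio.shield makes the grace-period timeout detach from a run instead of cancelling it, and pool.shutdown(wait=False) returns immediately while already-running work finishes in the background. Audit flushing and the second-Ctrl+C force exit are left out.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


async def shutdown(stop: asyncio.Event, in_flight: dict[str, asyncio.Future],
                   tasks: list[asyncio.Task], pool: ThreadPoolExecutor,
                   grace: float = 30.0) -> list[str]:
    """Return the names of agents whose in-flight run outlived the grace period."""
    stop.set()                                  # agents stop taking inbox events

    async def wait_for_run(name: str, run: asyncio.Future) -> Optional[str]:
        try:
            await asyncio.wait_for(asyncio.shield(run), grace)
        except asyncio.TimeoutError:
            return name                         # detach; the run keeps going
        except Exception:
            pass                                # the run finished, with an error
        return None

    results = await asyncio.gather(*(wait_for_run(n, r) for n, r in in_flight.items()))
    for task in tasks:                          # cancel agent tasks on the loop
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    pool.shutdown(wait=False)                   # running work still completes
    return [name for name in results if name is not None]


async def demo() -> list[str]:
    loop = asyncio.get_running_loop()
    pool = ThreadPoolExecutor(max_workers=2)
    stop = asyncio.Event()
    runs = {
        "fast": loop.run_in_executor(pool, time.sleep, 0.01),
        "slow": loop.run_in_executor(pool, time.sleep, 0.6),
    }
    agent_task = asyncio.create_task(stop.wait())  # stands in for an agent loop
    detached = await shutdown(stop, runs, [agent_task], pool, grace=0.2)
    await asyncio.gather(*runs.values())        # detached runs still finish on their own
    return detached
```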
Executor Pool Sizing
The thread pool for agent runs defaults to min(32, len(agents) + 4). This is configurable via max_agent_workers on FlowOrchestrator.
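The default formula mirrors the one Python's own ThreadPoolExecutor uses (min(32, os.cpu_count() + 4)), with the agent count in place of the CPU count. Below is a small sketch of that sizing and of dispatching blocking runs to the pool from the event loop; default_agent_workers and dispatch are hypothetical helpers, and the max_agent_workers argument only imitates the FlowOrchestrator option.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional


def default_agent_workers(agent_count: int) -> int:
    """Default pool size: min(32, len(agents) + 4)."""
    return min(32, agent_count + 4)


async def dispatch(agents: list[str], run_sync: Callable[[str], str],
                   max_agent_workers: Optional[int] = None) -> list[str]:
    """Run one blocking agent execution per agent on a shared thread pool."""
    workers = max_agent_workers or default_agent_workers(len(agents))
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [loop.run_in_executor(pool, run_sync, name) for name in agents]
        return list(await asyncio.gather(*futures))


print(default_agent_workers(4))    # 8
print(default_agent_workers(40))   # 32
```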
Shared Memory
When spec.shared_memory.enabled is true, all agents in the flow share a single memory database. One agent's remember() calls become visible to every other agent's recall().
Configuration
```yaml
spec:
  shared_memory:
    enabled: true
    store_path: null        # default: ~/.initrunner/memory/{name}-shared.db
    max_memories: 1000
    store_backend: lancedb
```

| Field | Type | Default | Description |
|---|---|---|---|
| shared_memory.enabled | bool | false | Enable shared memory across all agents. |
| shared_memory.store_path | str \| null | null | Path to the shared memory store. Default: ~/.initrunner/memory/{name}-shared.db. |
| shared_memory.max_memories | int | 1000 | Maximum number of memories in the shared store. |
| shared_memory.store_backend | str | "lancedb" | Store backend. |
All agents sharing a memory store must use compatible embedding models (same dimensions). Keep memory.embeddings consistent across roles, or omit it to let all agents derive from their spec.model.provider defaults.
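One way to catch a mismatch before the flow starts is to compare each agent's embedding dimension. The helper below is a hypothetical sketch; how each role's dimension is obtained (from its memory.embeddings setting or its provider default) is left out and assumed to be known already.

```python
def shared_embedding_dimension(dims_by_agent: dict[str, int]) -> int:
    """Return the single embedding dimension all agents share, or raise."""
    if not dims_by_agent:
        raise ValueError("no agents share the memory store")
    distinct = set(dims_by_agent.values())
    if len(distinct) > 1:
        detail = ", ".join(f"{agent}={dim}" for agent, dim in sorted(dims_by_agent.items()))
        raise ValueError(f"shared memory needs one embedding dimension, got {detail}")
    return distinct.pop()


print(shared_embedding_dimension({"researcher": 1536, "writer": 1536}))  # 1536
```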
Shared Documents
When spec.shared_documents.enabled is true, all agents in the flow share a single document store. This lets you ingest documents once (e.g. via one agent's ingest config) and have every agent's search_documents tool query the same store.
Unlike shared memory, shared documents requires explicit embedding configuration at the flow level. This prevents embedding model mismatches between roles querying the same store.
Configuration
```yaml
spec:
  shared_documents:
    enabled: true
    store_path: ./shared-docs.lance   # optional, default: ~/.initrunner/stores/{name}-shared.lance
    embeddings:
      provider: openai                # required when enabled
      model: text-embedding-3-small   # required when enabled
  agents:
    researcher:
      role: roles/researcher.yaml     # has ingest config with sources
    writer:
      role: roles/writer.yaml         # no ingest config needed
```

| Field | Type | Default | Description |
|---|---|---|---|
| shared_documents.enabled | bool | false | Enable a shared document store across all agents. |
| shared_documents.store_path | str \| null | null | Path to the shared document store. Default: ~/.initrunner/stores/{name}-shared.lance. |
| shared_documents.store_backend | str | "lancedb" | Store backend. |
| shared_documents.embeddings.provider | str | (required when enabled) | Embedding provider. Must be set explicitly when enabled: true. |
| shared_documents.embeddings.model | str | (required when enabled) | Embedding model. Must be set explicitly when enabled: true. |
How It Works
At startup, apply_shared_documents() patches each agent's role definition:
- Roles with an ingest: section: the existing store_path, store_backend, and embeddings are overridden with the shared values. All other ingest settings (sources, chunking) are preserved.
- Roles without an ingest: section: a minimal IngestConfig is injected with empty sources and the shared store settings. This registers the search_documents retrieval tool so the role can query the shared store without needing its own ingest config.
Shared documents is a flow-time config patch only. It does not run ingestion automatically. Run initrunner ingest against the role that has sources configured to populate the shared store.
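The patching rule above can be sketched with plain dataclasses. IngestConfig, Embeddings, and apply_shared_documents_sketch here are hypothetical stand-ins, not initrunner's classes or its apply_shared_documents() function. The sketch only shows which fields are overridden and which are preserved, and, like the real patch, it does not ingest anything.

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass(frozen=True)
class Embeddings:
    provider: str
    model: str


@dataclass
class IngestConfig:                       # hypothetical stand-in
    sources: list[str] = field(default_factory=list)
    chunking: dict = field(default_factory=dict)
    store_path: Optional[str] = None
    store_backend: str = "lancedb"
    embeddings: Optional[Embeddings] = None


def apply_shared_documents_sketch(
    roles: dict[str, Optional[IngestConfig]],
    store_path: str,
    embeddings: Embeddings,
    store_backend: str = "lancedb",
) -> dict[str, IngestConfig]:
    """Point every role at the shared store; keep sources and chunking."""
    patched: dict[str, IngestConfig] = {}
    for name, ingest in roles.items():
        # Roles without ingest get a minimal config with empty sources
        # (in initrunner, that is what registers search_documents).
        base = ingest if ingest is not None else IngestConfig()
        patched[name] = replace(base, store_path=store_path,
                                store_backend=store_backend, embeddings=embeddings)
    return patched


shared = Embeddings("openai", "text-embedding-3-small")
patched = apply_shared_documents_sketch(
    {"researcher": IngestConfig(sources=["./docs/*.md"]), "writer": None},
    "./shared-docs.lance", shared)
print(patched["writer"])
```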
Embedding Consistency
The flow definition owns the embedding configuration for the shared store. When shared_documents.enabled is true, both embeddings.provider and embeddings.model must be set explicitly. This is validated at parse time and prevents the situation where different roles derive different embedding models from their spec.model.provider.
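Parse-time validation of this kind can be sketched with a dataclass __post_init__ check. SharedDocumentsConfig is a hypothetical model that flattens embeddings.provider and embeddings.model into two fields. Substituting the flow's metadata.name for {name} in the default path is an assumption.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class SharedDocumentsConfig:              # hypothetical, mirrors the field table
    enabled: bool = False
    store_path: Optional[str] = None
    store_backend: str = "lancedb"
    embeddings_provider: Optional[str] = None
    embeddings_model: Optional[str] = None

    def __post_init__(self) -> None:
        required = {
            "embeddings.provider": self.embeddings_provider,
            "embeddings.model": self.embeddings_model,
        }
        missing = [key for key, value in required.items() if not value]
        if self.enabled and missing:
            raise ValueError("shared_documents.enabled requires " + ", ".join(missing))

    def resolved_store_path(self, flow_name: str) -> Path:
        """Explicit store_path, else ~/.initrunner/stores/{name}-shared.lance."""
        if self.store_path:
            return Path(self.store_path)
        return Path.home() / ".initrunner" / "stores" / f"{flow_name}-shared.lance"


cfg = SharedDocumentsConfig(enabled=True, embeddings_provider="openai",
                            embeddings_model="text-embedding-3-small")
print(cfg.resolved_store_path("support-desk").name)  # support-desk-shared.lance
```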
Usage Pattern
- Configure one role (e.g. researcher) with ingest.sources pointing at your documents.
- Enable shared_documents with the same embedding model the researcher would use.
- Run initrunner ingest roles/researcher.yaml to populate the shared store.
- Start the flow. All agents can now query the shared documents via search_documents.
Systemd Deployment
Install flow pipelines as systemd user services for production:
```bash
# Install the unit
initrunner flow install flow.yaml

# Start
initrunner flow start my-pipeline

# Enable on boot
systemctl --user enable initrunner-my-pipeline.service

# Monitor
initrunner flow status my-pipeline
initrunner flow logs my-pipeline -f
```

Environment Variables
Systemd services don't inherit shell exports. Provide secrets via environment files:
- {flow_dir}/.env: project-level secrets
- ~/.initrunner/.env: user-level defaults
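The two-file layering can be illustrated in Python. This sketch assumes the project-level file overrides the user-level defaults (the ordering implied by "defaults", but not stated here) and uses a deliberately minimal KEY=VALUE parser; parse_env_text and load_service_env are hypothetical and are not how initrunner or systemd actually read these files.

```python
from pathlib import Path
from typing import Optional


def parse_env_text(text: str) -> dict[str, str]:
    """Minimal KEY=VALUE parser: skips blanks and comments, strips matching quotes."""
    values: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        values[key.strip()] = value
    return values


def load_service_env(flow_dir: Path, home: Optional[Path] = None) -> dict[str, str]:
    """Layer ~/.initrunner/.env (defaults) under {flow_dir}/.env (project)."""
    home = home if home is not None else Path.home()
    merged: dict[str, str] = {}
    for path in (home / ".initrunner" / ".env", flow_dir / ".env"):
        if path.is_file():
            merged.update(parse_env_text(path.read_text()))  # later files win
    return merged
```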
Use --generate-env to create a template .env file:
```bash
initrunner flow install flow.yaml --generate-env
```

User Lingering
To keep services running after logout:
```bash
loginctl enable-linger $USER
```

Example: Email Pipeline
```
inbox-watcher ──> triager ──> researcher
                     │
                     └──────> responder
```

```yaml
apiVersion: initrunner/v1
kind: Flow
metadata:
  name: email-pipeline
  description: Multi-agent email processing pipeline
spec:
  agents:
    inbox-watcher:
      role: roles/inbox-watcher.yaml
      sink:
        type: delegate
        target: triager
    triager:
      role: roles/triager.yaml
      needs: [inbox-watcher]
      sink:
        type: delegate
        target: [researcher, responder]
        circuit_breaker_threshold: 5
    researcher:
      role: roles/researcher.yaml
      needs: [triager]
    responder:
      role: roles/responder.yaml
      needs: [triager]
      restart: { condition: on-failure, max_retries: 3, delay_seconds: 5 }
```

Agent Roles
Each agent points to a standalone role YAML. Here are the two key roles in this pipeline:
roles/triager.yaml — routes emails to the right handler:
```yaml
apiVersion: initrunner/v1
kind: Agent
metadata:
  name: triager
  description: Routes emails to the right handler
spec:
  role: >
    You are an email triage agent. Analyze the email summary and
    determine if it needs research (technical questions, data requests)
    or a direct response (simple inquiries, acknowledgments).
    Output your decision and reasoning clearly.
  model:
    provider: openai
    name: gpt-4o-mini
    temperature: 0.1
  guardrails:
    max_tokens_per_run: 2000
    timeout_seconds: 30
```

roles/responder.yaml — drafts email responses:
```yaml
apiVersion: initrunner/v1
kind: Agent
metadata:
  name: responder
  description: Drafts email responses
spec:
  role: >
    You are an email response agent. Given a triaged email that needs
    a direct response, draft a professional, helpful reply. Keep the
    tone friendly and concise.
  model:
    provider: openai
    name: gpt-4o-mini
    temperature: 0.5
  guardrails:
    max_tokens_per_run: 3000
    timeout_seconds: 30
```

These roles are minimal: they focus on a single task and don't need triggers or sinks (the flow file handles routing). This keeps each agent simple and independently testable.
Example: CI Pipeline
A webhook-driven pipeline that processes CI events, diagnoses build failures, and sends notifications.
```
webhook-receiver ──> build-analyzer ──> notifier
```

flow.yaml

```yaml
apiVersion: initrunner/v1
kind: Flow
metadata:
  name: ci-pipeline
  description: CI event processing pipeline
spec:
  agents:
    webhook-receiver:
      role: roles/webhook-receiver.yaml
      sink:
        type: delegate
        target: build-analyzer
    build-analyzer:
      role: roles/build-analyzer.yaml
      needs: [webhook-receiver]
      sink:
        type: delegate
        target: notifier
    notifier:
      role: roles/notifier.yaml
      needs: [build-analyzer]
      restart: { condition: on-failure, max_retries: 3, delay_seconds: 5 }
```

roles/notifier.yaml
This is the most interesting agent of the three. It combines Slack messaging with the GitHub commit status API:
```yaml
apiVersion: initrunner/v1
kind: Agent
metadata:
  name: ci-notifier
  description: Sends Slack notifications and updates GitHub commit status
spec:
  role: |
    You are a CI notification agent. You receive analyzed build events and:
    1. Send a formatted Slack notification:
       - Success: "✅ Build passed — [repo] @ [branch] ([sha])"
       - Failure: "❌ Build failed — [repo] @ [branch] ([sha])\n
         Diagnosis: [diagnosis]\nCategory: [category]"
       - Include the build URL as a link
       - Add a timestamp via get_current_time
    2. Update the GitHub commit status using the create_commit_status API
       endpoint:
       - state: "success" or "failure"
       - description: brief status message
       - context: "ci-pipeline/initrunner"
    Always send both the Slack message and the GitHub status update.
  model:
    provider: openai
    name: gpt-4o-mini
    temperature: 0.0
  tools:
    - type: slack
      webhook_url: "${SLACK_WEBHOOK_URL}"
      default_channel: "#ci-alerts"
      username: CI Pipeline
      icon_emoji: ":construction_worker:"
    - type: api
      name: github-status
      description: GitHub commit status API
      base_url: https://api.github.com
      headers:
        Accept: application/vnd.github.v3+json
      auth:
        Authorization: "Bearer ${GITHUB_TOKEN}"
      endpoints:
        - name: create_commit_status
          method: POST
          path: "/repos/{owner}/{repo}/statuses/{sha}"
          description: Create a commit status check
          parameters:
            - name: owner
              type: string
              required: true
            - name: repo
              type: string
              required: true
            - name: sha
              type: string
              required: true
            - name: state
              type: string
              required: true
              description: "pending, success, failure, or error"
            - name: description
              type: string
              required: false
            - name: context
              type: string
              required: false
              default: "ci-pipeline/initrunner"
          body_template:
            state: "{state}"
            description: "{description}"
            context: "{context}"
          timeout_seconds: 15
    - type: datetime
  guardrails:
    max_tokens_per_run: 15000
    max_tool_calls: 10
    timeout_seconds: 60
```

Test the webhook
```bash
# Start the pipeline
initrunner flow up flow.yaml

# In another terminal, send a test event
curl -X POST http://localhost:9090/ci-webhook \
  -H "Content-Type: application/json" \
  -d '{
    "source": "github-actions",
    "repo": "myorg/myapp",
    "branch": "main",
    "sha": "abc12345",
    "status": "failure",
    "author": "dev@example.com",
    "message": "fix: update auth middleware",
    "url": "https://github.com/myorg/myapp/actions/runs/12345"
  }'
```

What to notice: The notifier combines two tool types: slack for human-readable alerts and api for machine-readable GitHub status updates. The webhook receiver uses a webhook trigger (port 9090), and the flow file wires all three agents together with delegate sinks.
Example: Support Desk
```
intake ──[sense]──> researcher | responder | escalator
```

A support desk pipeline where strategy: sense on the intake's delegate sink auto-routes each message to the best-matching handler, with no static fan-out.
flow.yaml
```yaml
apiVersion: initrunner/v1
kind: Flow
metadata:
  name: support-desk
  description: >
    Support desk pipeline with intelligent auto-routing. An intake agent
    summarizes incoming requests, then sense routing automatically sends
    each request to the right handler -- researcher for technical issues,
    responder for quick answers, or escalator for urgent/complex cases.
    No static fan-out: each message goes to exactly one target.
spec:
  agents:
    intake:
      role: roles/intake.yaml
      sink:
        type: delegate
        # strategy: sense uses keyword scoring + LLM tiebreak to pick the
        # best target for each message. Use "keyword" for zero API calls,
        # or "all" to fan out to every target (default).
        strategy: sense
        target:
          - researcher
          - responder
          - escalator
    researcher:
      role: roles/researcher.yaml
      needs:
        - intake
    responder:
      role: roles/responder.yaml
      needs:
        - intake
      restart:
        condition: on-failure
        max_retries: 3
        delay_seconds: 5
    escalator:
      role: roles/escalator.yaml
      needs:
        - intake
```

Agent Roles
roles/intake.yaml — receives and summarizes support requests:
```yaml
apiVersion: initrunner/v1
kind: Agent
metadata:
  name: intake
  description: Receives support requests and summarizes them for triage
  tags:
    - support
    - intake
spec:
  role: >
    You are a support intake agent. When you receive a support request,
    produce a concise summary including: the customer's issue, urgency
    level, and the type of action needed (research, direct response,
    or human escalation). Be factual and brief.
  model:
    provider: openai
    name: gpt-5-mini
    temperature: 0.1
  guardrails:
    max_tokens_per_run: 1000
    timeout_seconds: 30
```

roles/researcher.yaml — investigates technical issues:
```yaml
apiVersion: initrunner/v1
kind: Agent
metadata:
  name: researcher
  description: Investigates technical issues and gathers diagnostic information
  tags:
    - research
    - analysis
    - investigation
    - technical
    - diagnose
spec:
  role: >
    You are a technical research agent for a support desk. When you
    receive a triaged support request that requires investigation,
    research the issue thoroughly. Produce a structured report with:
    root cause analysis, relevant documentation references, and
    recommended resolution steps.
  model:
    provider: openai
    name: gpt-5-mini
    temperature: 0.3
  guardrails:
    max_tokens_per_run: 4000
    timeout_seconds: 60
```

```bash
initrunner flow up flow.yaml
```

What to notice: The strategy: sense on the intake's delegate sink means each message is scored against the three targets' role metadata (name, description, tags). Because the tags are non-overlapping (researcher uses [research, analysis, investigation, technical, diagnose] while responder and escalator cover different domains), keyword scoring alone resolves most messages without an LLM call. See Routing Strategy for details.
Example: Content Pipeline
```
content-watcher ──> researcher ──> writer
                       │
                       └──────> reviewer
```

The content-watcher agent uses process_existing: true on its file watch trigger to handle files already in the directory on startup. See Triggers for details.
See also: Team Mode for single-file multi-persona collaboration. It is simpler than Flow when you need multiple perspectives on the same task rather than independent agents.