Getting started¶
Requirements¶
You need:
- Python >= 3.13
- RabbitMQ
- Redis
Install from GitHub Releases¶
Install the wheel:
pip install https://github.com/sarattha/relayna/releases/download/v1.3.3/relayna-1.3.3-py3-none-any.whl
Or install the source distribution:
pip install https://github.com/sarattha/relayna/releases/download/v1.3.3/relayna-1.3.3.tar.gz
For local work in this repository:
uv sync --extra dev
Topology overview¶
relayna now uses named topology classes as the primary RabbitMQ configuration
API.
- SharedTasksSharedStatusTopology: one shared task queue and one shared status queue/stream.
- SharedTasksSharedStatusShardedAggregationTopology: the same shared task and shared status plane, plus shard-bound aggregation worker queues on the status exchange.
- RoutedTasksSharedStatusTopology: task queues are bound by task_type, while status stays on one shared queue/stream.
- RoutedTasksSharedStatusShardedAggregationTopology: routed task queues plus shard-bound aggregation worker queues on the shared status exchange.
- SharedStatusWorkflowTopology: one workflow topic exchange, one durable inbox queue per consuming stage, and one shared status queue/stream.
Topology naming guidance¶
Shared topology resources are usually namespaced by exchange, queue, and Redis
prefix values. For sharded topologies, remember that the default aggregation
queue names such as aggregation.queue.0 and
aggregation.queue.shards.1-3 are global durable queues. If multiple
deployments, smoke tests, or local stacks share one RabbitMQ vhost, give the
aggregation queues a deployment-specific prefix too.
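For example, a staging deployment might namespace everything, including the aggregation queues, through the sharded topology constructor. The names below are illustrative; aggregation_queue_template and aggregation_queue_name_prefix are the same fields used in the sharded aggregation example later in this guide.
from relayna.topology import SharedTasksSharedStatusShardedAggregationTopology
# Illustrative deployment-specific naming: every exchange and queue,
# including the shard-bound aggregation queues, carries a "staging." prefix.
topology = SharedTasksSharedStatusShardedAggregationTopology(
    rabbitmq_url="amqp://guest:guest@localhost:5672/",
    tasks_exchange="staging.tasks.exchange",
    tasks_queue="staging.tasks.queue",
    tasks_routing_key="task.request",
    status_exchange="staging.status.exchange",
    status_queue="staging.status.queue",
    shard_count=4,
    aggregation_queue_template="staging.aggregation.queue.{shard}",
    aggregation_queue_name_prefix="staging.aggregation.queue.shards",
)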
Queue argument configuration¶
Topology constructors include a small curated set of first-class RabbitMQ queue arguments for worker-owned queues:
- task_consumer_timeout_ms
- task_single_active_consumer
- task_max_priority
- task_queue_type
- aggregation_consumer_timeout_ms
- aggregation_single_active_consumer
- aggregation_max_priority
- aggregation_queue_type
Priority queue max values must be between 1 and 255.
Status queues keep their existing dedicated stream and expiry settings, but all topologies also expose explicit mapping escape hatches for broker-specific queue arguments:
- task_queue_arguments_overrides
- task_queue_kwargs
- aggregation_queue_arguments_overrides
- aggregation_queue_kwargs
- status_queue_arguments_overrides
- status_queue_kwargs
Native queue-argument field mapping:
- Task queue: tasks_message_ttl_ms -> x-message-ttl
- Task queue: dead_letter_exchange -> x-dead-letter-exchange
- Task queue: task_consumer_timeout_ms -> x-consumer-timeout
- Task queue: task_single_active_consumer -> x-single-active-consumer
- Task queue: task_max_priority -> x-max-priority
- Task queue: task_queue_type -> x-queue-type
- Status queue: status_use_streams=True -> x-queue-type=stream
- Status queue: status_queue_ttl_ms -> x-expires
- Status queue: status_stream_max_length_gb -> x-max-length-bytes
- Status queue: status_stream_max_segment_size_mb -> x-stream-max-segment-size-bytes
- Aggregation queue: aggregation_consumer_timeout_ms -> x-consumer-timeout
- Aggregation queue: aggregation_single_active_consumer -> x-single-active-consumer
- Aggregation queue: aggregation_max_priority -> x-max-priority
- Aggregation queue: aggregation_queue_type -> x-queue-type
These topology timeout fields configure RabbitMQ queue arguments. They are
separate from the runtime-side consume_timeout_seconds setting on
TaskConsumer, AggregationConsumer, and AggregationWorkerRuntime, which
controls how long the local consumer loop waits for the next message before it
re-enters the loop.
status_stream_initial_offset is also native, but it affects the consumer
argument x-stream-offset rather than queue declaration arguments.
Example: task queue native fields only
topology = SharedTasksSharedStatusTopology(
rabbitmq_url="amqp://guest:guest@localhost:5672/",
tasks_exchange="tasks.exchange",
tasks_queue="tasks.queue",
tasks_routing_key="task.request",
status_exchange="status.exchange",
status_queue="status.queue",
tasks_message_ttl_ms=30000,
dead_letter_exchange="tasks.dlx",
task_consumer_timeout_ms=600000,
task_single_active_consumer=True,
task_max_priority=10,
task_queue_type="quorum",
)
Example: task queue native fields plus broker-specific kwargs
topology = SharedTasksSharedStatusTopology(
rabbitmq_url="amqp://guest:guest@localhost:5672/",
tasks_exchange="tasks.exchange",
tasks_queue="tasks.queue",
tasks_routing_key="task.request",
status_exchange="status.exchange",
status_queue="status.queue",
task_consumer_timeout_ms=600000,
task_queue_kwargs={"x-overflow": "reject-publish"},
)
Example: sharded aggregation queue native fields plus kwargs
topology = SharedTasksSharedStatusShardedAggregationTopology(
rabbitmq_url="amqp://guest:guest@localhost:5672/",
tasks_exchange="tasks.exchange",
tasks_queue="tasks.queue",
tasks_routing_key="task.request",
status_exchange="status.exchange",
status_queue="status.queue",
shard_count=4,
aggregation_consumer_timeout_ms=120000,
aggregation_single_active_consumer=True,
aggregation_queue_type="quorum",
aggregation_queue_kwargs={"x-delivery-limit": 20},
)
Example: status queue stream settings plus kwargs
topology = SharedTasksSharedStatusTopology(
rabbitmq_url="amqp://guest:guest@localhost:5672/",
tasks_exchange="tasks.exchange",
tasks_queue="tasks.queue",
tasks_routing_key="task.request",
status_exchange="status.exchange",
status_queue="status.queue",
status_use_streams=True,
status_stream_max_length_gb=2,
status_stream_max_segment_size_mb=64,
status_queue_kwargs={"x-initial-cluster-size": 3},
)
Relayna raises ValueError when the same RabbitMQ argument key is configured
through more than one source for the same queue family.
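As a sketch, the conflict below configures x-queue-type twice for the task queue family, once through the native task_queue_type field and once through task_queue_kwargs. This example assumes the duplicate-key check runs when the topology is constructed.
# Conflicting sources for the same RabbitMQ argument on the task queue:
# task_queue_type maps to x-queue-type, and task_queue_kwargs sets it again.
try:
    SharedTasksSharedStatusTopology(
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
        tasks_exchange="tasks.exchange",
        tasks_queue="tasks.queue",
        tasks_routing_key="task.request",
        status_exchange="status.exchange",
        status_queue="status.queue",
        task_queue_type="quorum",
        task_queue_kwargs={"x-queue-type": "classic"},
    )
except ValueError as exc:
    print(f"rejected duplicate queue argument: {exc}")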
Example: shared tasks + shared status¶
This is the default setup for task workers, shared status history, and FastAPI status endpoints.
from fastapi import FastAPI
from relayna.dlq import DLQService
from relayna.fastapi import create_dlq_router, create_relayna_lifespan, create_status_router, get_relayna_runtime
from relayna.rabbitmq import RelaynaRabbitClient
from relayna.topology import SharedTasksSharedStatusTopology
topology = SharedTasksSharedStatusTopology(
rabbitmq_url="amqp://guest:guest@localhost:5672/",
tasks_exchange="tasks.exchange",
tasks_queue="tasks.queue",
tasks_routing_key="task.request",
status_exchange="status.exchange",
status_queue="status.queue",
task_consumer_timeout_ms=600000,
)
client = RelaynaRabbitClient(topology=topology)
await client.initialize()
await client.publish_task({"task_id": "task-123", "payload": {"kind": "demo"}})
app = FastAPI(
lifespan=create_relayna_lifespan(
topology=topology,
redis_url="redis://localhost:6379/0",
)
)
runtime = get_relayna_runtime(app)
app.include_router(
create_status_router(
sse_stream=runtime.sse_stream,
history_reader=runtime.history_reader,
latest_status_store=runtime.store,
)
)
if runtime.dlq_store is not None:
app.include_router(
create_dlq_router(
dlq_service=DLQService(
rabbitmq=runtime.rabbitmq,
dlq_store=runtime.dlq_store,
status_store=runtime.store,
),
broker_dlq_queue_names=["tasks.queue.dlq", "aggregation.queue.0.dlq"],
)
)
This exposes:
- GET /events/{task_id}
- GET /history
- GET /status/{task_id}
For a detailed observability walkthrough, see Observability.
Contract aliases¶
Use ContractAliasConfig when external producers or API clients should use
different top-level envelope field names.
from relayna.contracts import ContractAliasConfig
alias_config = ContractAliasConfig(
field_aliases={
"task_id": "attempt_id",
"correlation_id": "request_id",
"service": "source_service",
"task_type": "job_type",
}
)
client = RelaynaRabbitClient(topology=topology, alias_config=alias_config)
await client.initialize()
await client.publish_task(
{
"attempt_id": "attempt-123",
"request_id": "req-123",
"source_service": "billing-api",
"job_type": "invoice.render",
"payload": {"kind": "demo"},
}
)
app = FastAPI(
lifespan=create_relayna_lifespan(
topology=topology,
redis_url="redis://localhost:6379/0",
alias_config=alias_config,
)
)
runtime = get_relayna_runtime(app)
app.include_router(
create_status_router(
sse_stream=runtime.sse_stream,
history_reader=runtime.history_reader,
latest_status_store=runtime.store,
alias_config=alias_config,
)
)
Example: routed tasks + shared status¶
Use RoutedTasksSharedStatusTopology when different workers should consume
different task_type values from the same task exchange while still publishing
into one shared status stream.
Each routed worker gets its own topology instance:
from relayna.topology import RoutedTasksSharedStatusTopology
generate_topology = RoutedTasksSharedStatusTopology(
rabbitmq_url="amqp://guest:guest@localhost:5672/",
tasks_exchange="tasks.exchange",
tasks_queue="tasks.generate.queue",
task_types=("draft.generate",),
status_exchange="status.exchange",
status_queue="status.queue",
)
review_topology = RoutedTasksSharedStatusTopology(
rabbitmq_url="amqp://guest:guest@localhost:5672/",
tasks_exchange="tasks.exchange",
tasks_queue="tasks.review.queue",
task_types=("draft.review",),
status_exchange="status.exchange",
status_queue="status.queue",
)
Important details:
- the routed workers share tasks_exchange, status_exchange, and status_queue
- each routed worker uses its own tasks_queue
- each routed worker declares one or more task_types that it owns
- RelaynaRabbitClient.publish_task(...) routes by TaskEnvelope.task_type under routed topologies, so task_type is required
That layout is what makes manual handoff retry possible: one worker can
republish the same task_id under a different task_type, and RabbitMQ will
deliver it to another routed worker queue.
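Publishing into a routed topology looks like the shared example, except the envelope must carry task_type so RabbitMQ can route it to the bound worker queue. A minimal sketch reusing generate_topology from above; the payload values are illustrative.
from relayna.rabbitmq import RelaynaRabbitClient
client = RelaynaRabbitClient(topology=generate_topology)
await client.initialize()
# task_type selects the routed worker queue bound for "draft.generate".
await client.publish_task(
    {
        "task_id": "task-123",
        "task_type": "draft.generate",
        "payload": {"kind": "demo"},
    }
)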
Relayna keeps the transport canonical internally and only applies aliases at the edges:
- inbound task and status payloads accept aliased top-level names such as attempt_id, request_id, source_service, and job_type
- workers still receive canonical envelope fields such as TaskEnvelope.task_id, TaskEnvelope.correlation_id, TaskEnvelope.service, and TaskEnvelope.task_type
- /events/{attempt_id}, /status/{attempt_id}, and /history?attempt_id=... use the alias when the same config is passed into FastAPI
- history, latest-status, and SSE bodies expose the aliased names instead of the canonical keys
- aliasing is top-level only; nested keys inside payload, meta, or result are not renamed
Example latest-status response with aliasing enabled:
{
"attempt_id": "attempt-123",
"event": {
"attempt_id": "attempt-123",
"request_id": "req-123",
"source_service": "billing-api",
"job_type": "invoice.render",
"status": "completed",
"message": "Task completed.",
"event_id": "8e53f4...",
"meta": {}
}
}
Example SSE event with aliasing enabled:
id: 8e53f4...
event: status
data: {"attempt_id":"attempt-123","request_id":"req-123","source_service":"billing-api","job_type":"invoice.render","status":"completed","message":"Task completed.","event_id":"8e53f4...","meta":{}}
Example worker view of the same task after Relayna normalizes aliases:
async def handle_task(task: TaskEnvelope, context: TaskContext) -> None:
assert task.task_id == "attempt-123"
assert task.correlation_id == "req-123"
assert task.service == "billing-api"
assert task.task_type == "invoice.render"
If your HTTP route parameter should differ from the payload alias, add
http_aliases:
alias_config = ContractAliasConfig(
field_aliases={
"task_id": "attempt_id",
"correlation_id": "request_id",
},
http_aliases={"task_id": "attemptId"},
)
With that config, request bodies still use attempt_id, while FastAPI routes
and query parameters use attemptId.
Example split between HTTP naming and JSON body naming:
- request path: GET /status/attempt-123 using the route template GET /status/{attemptId}
- request query: GET /history?attemptId=attempt-123
- response body:
{
"attempt_id": "attempt-123",
"event": {
"attempt_id": "attempt-123",
"status": "completed"
}
}
That split is intentional: http_aliases controls path/query parameter names,
while field_aliases controls JSON payload and response field names.
Batch publishing¶
Use publish_tasks(...) when you want one client call to submit several tasks.
Relayna supports two modes.
Individual mode¶
mode="individual" publishes one RabbitMQ message per task.
await client.publish_tasks(
[
{
"attempt_id": "task-1",
"request_id": "req-1",
"source_service": "bulk-api",
"job_type": "invoice.render",
"payload": {"kind": "demo"},
},
{
"attempt_id": "task-2",
"request_id": "req-2",
"source_service": "bulk-api",
"job_type": "invoice.render",
"payload": {"kind": "demo"},
},
],
mode="individual",
)
Batch-envelope mode¶
mode="batch_envelope" publishes one RabbitMQ message containing several task
items plus a batch_id.
await client.publish_tasks(
[
{
"attempt_id": "task-1",
"request_id": "req-1",
"source_service": "bulk-api",
"job_type": "invoice.render",
"payload": {"kind": "demo"},
},
{
"attempt_id": "task-2",
"request_id": "req-2",
"source_service": "bulk-api",
"job_type": "invoice.render",
"payload": {"kind": "demo"},
},
],
mode="batch_envelope",
batch_id="batch-123",
meta={"source": "bulk-api"},
)
Example batch-envelope transport body published by Relayna:
{
"batch_id": "batch-123",
"tasks": [
{
"task_id": "task-1",
"correlation_id": "req-1",
"service": "bulk-api",
"task_type": "invoice.render",
"payload": {
"kind": "demo"
}
},
{
"task_id": "task-2",
"correlation_id": "req-2",
"service": "bulk-api",
"task_type": "invoice.render",
"payload": {
"kind": "demo"
}
}
],
"meta": {
"source": "bulk-api"
}
}
Even if producers publish aliased fields such as attempt_id,
request_id, source_service, or job_type, Relayna normalizes the envelope
to canonical top-level fields before it reaches the worker.
When a TaskConsumer with retry_policy=... receives that batch envelope, it
does not run all items inline under the original delivery. Relayna first fans
the envelope back out into one task-queue message per item, preserving:
- task_id
- batch_id
- batch_index
- batch_size
That means later failures do not cause RabbitMQ to redeliver earlier successful items from the same original batch envelope.
Example per-item message shape after fan-out:
{
"task_id": "task-1",
"correlation_id": "req-1",
"service": "bulk-api",
"task_type": "invoice.render",
"payload": {
"kind": "demo"
},
"spec_version": "1.0"
}
Example headers on that per-item message:
{
"task_id": "task-1",
"batch_id": "batch-123",
"batch_index": 0,
"batch_size": 2
}
Task worker example¶
from relayna.consumer import RetryPolicy, TaskConsumer, TaskContext
from relayna.contracts import TaskEnvelope
async def handle_task(task: TaskEnvelope, context: TaskContext) -> None:
await context.publish_status(
status="processing",
message=f"Batch {context.batch_id} item {context.batch_index + 1}/{context.batch_size} started."
if context.batch_id is not None
else "Task processing started.",
)
await context.publish_status(
status="completed",
message="Task processing completed.",
result={
"batch_id": context.batch_id,
"batch_index": context.batch_index,
"batch_size": context.batch_size,
},
)
consumer = TaskConsumer(
rabbitmq=client,
handler=handle_task,
retry_policy=RetryPolicy(max_retries=3, delay_ms=30000),
consume_timeout_seconds=1.0,
alias_config=alias_config,
)
await consumer.run_forever()
Set consume_timeout_seconds=None when you want the worker runtime to block
indefinitely waiting for the next message instead of waking up every second.
That reduces idle wake-ups, but stop() becomes best-effort for standalone
consumers and may not complete until a message arrives or the task is canceled.
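Because stop() is best-effort in that mode, shutdown code often needs a cancellation fallback around the blocked run_forever() task. A minimal sketch using plain asyncio, assuming the consumer above was constructed with consume_timeout_seconds=None; the grace-period handling is this guide's suggestion rather than a relayna API.
import asyncio
consumer_task = asyncio.create_task(consumer.run_forever())
# ... later, during shutdown: a blocking consumer may not notice stop()
# until a message arrives, so cancel the task after a short grace period.
consumer_task.cancel()
try:
    await consumer_task
except asyncio.CancelledError:
    pass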
Inside the handler:
- task.task_id is always canonical, even when the producer sent attempt_id
- context.batch_id is set only for batch-envelope deliveries
- context.batch_index is zero-based within the batch
- context.batch_size is the total number of items in that envelope
Batch-envelope consumption requires retry_policy. When one item fails, Relayna
retries only that failed per-item message instead of retrying the original
batch envelope.
Example status response for the first item in a batch:
{
"attempt_id": "attempt-123",
"event": {
"attempt_id": "attempt-123",
"status": "completed",
"message": "Task processing completed.",
"result": {
"batch_id": "batch-123",
"batch_index": 0,
"batch_size": 2
}
}
}
When retry_policy is enabled more generally, Relayna creates a broker-delayed
retry queue and a per-source DLQ. Malformed JSON and invalid envelopes go
straight to the DLQ; handler failures retry until max_retries is exhausted,
then dead-letter.
Retry and DLQ headers¶
Relayna keeps retry metadata in RabbitMQ message headers. The task or aggregation payload body is not rewritten.
Headers added by Relayna:
- headers["x-relayna-retry-attempt"]: the current retry attempt number on the republished message. 0 means the original delivery; 1 means the first delayed retry.
- headers["x-relayna-max-retries"]: the configured retry limit for this consumer. Relayna compares the current attempt against this value to decide whether to retry again or dead-letter.
- headers["x-relayna-source-queue"]: the original worker queue that owns the message. Relayna uses this to make it clear which queue the retry or DLQ message came from.
- headers["x-relayna-failure-reason"]: a short machine-readable failure category such as handler_error, malformed_json, or invalid_envelope.
- headers["x-relayna-exception-type"]: the Python exception type name when one exists, such as RuntimeError or ValidationError. Relayna leaves this absent (null/None) when the failure was not caused by a raised exception, such as a malformed JSON decode.
Example DLQ message metadata after a handler fails on the last allowed retry:
headers = {
"x-relayna-retry-attempt": 3,
"x-relayna-max-retries": 3,
"x-relayna-source-queue": "tasks.queue",
"x-relayna-failure-reason": "handler_error",
"x-relayna-exception-type": "RuntimeError",
}
Example task body in that same DLQ message:
{
"task_id": "task-123",
"payload": {
"kind": "demo"
}
}
That example means:
- the task has already been retried three times
- the consumer limit was three retries
- the message originated from tasks.queue
- the final failure came from application handler code
- the thrown exception type was RuntimeError
DLQ monitoring router¶
If you want a dashboard or internal tool to investigate dead-lettered messages, enable the optional Redis-backed DLQ index in FastAPI and pass the same store to workers.
from redis.asyncio import Redis
from relayna.consumer import RetryPolicy, TaskConsumer
from relayna.dlq import DLQService, RedisDLQStore
from relayna.fastapi import create_dlq_router, create_relayna_lifespan, get_relayna_runtime
dlq_store_prefix = "relayna-dlq"
worker_dlq_store = RedisDLQStore(Redis.from_url("redis://localhost:6379/0"), prefix=dlq_store_prefix)
consumer = TaskConsumer(
rabbitmq=client,
handler=handle_task,
retry_policy=RetryPolicy(max_retries=3, delay_ms=30000),
consume_timeout_seconds=None,
dlq_store=worker_dlq_store,
)
app = FastAPI(
lifespan=create_relayna_lifespan(
topology=topology,
redis_url="redis://localhost:6379/0",
dlq_store_prefix=dlq_store_prefix,
)
)
runtime = get_relayna_runtime(app)
if runtime.dlq_store is not None:
app.include_router(
create_dlq_router(
dlq_service=DLQService(
rabbitmq=runtime.rabbitmq,
dlq_store=runtime.dlq_store,
status_store=runtime.store,
)
)
)
This exposes:
- GET /dlq/queues
- GET /dlq/messages
- GET /dlq/messages/{dlq_id}
- POST /dlq/messages/{dlq_id}/replay
- GET /broker/dlq/queues when broker_dlq_queue_names=... is configured
Important limitation:
- Relayna does not read live DLQ payloads directly from RabbitMQ because classic queues do not support a read-only payload peek over AMQP
- the DLQ router uses Redis for message detail and RabbitMQ only for live queue counts and replay transport
GET /dlq/queues is intentionally index-backed. It means “queues known from
indexed DLQ records plus live count lookup,” not “list all RabbitMQ DLQ
queues.”
If you want broader broker visibility, configure broker_dlq_queue_names=...
and use GET /broker/dlq/queues. That endpoint inspects the configured
candidate queue names together with queue names already present in the DLQ
index. Broker-only queues appear with indexed_count=0 and
last_indexed_at=null.
Example: multi-stage workflow + shared status¶
Use SharedStatusWorkflowTopology when work should move through multiple named
stages such as planner, re-planner, search, writer, and file-creation steps.
This topology is stage-inbox based:
- one workflow topic exchange carries stage-to-stage work
- each consuming stage owns one durable inbox queue
- producers publish to a stage routing key or named entry route
- status events still publish on the shared status exchange so
StatusHub, StreamHistoryReader, and SSE stay unchanged
This is the recommended first-class model because the durable queue belongs to the stage that consumes it. Retry, DLQ, lag, and autoscaling therefore stay attached to the worker that owns the work rather than to an upstream producer.
Routing convention¶
The examples below use these conventions:
- planner-stage routing keys: planner.<stage>.in
- replanner entry routes: replanner.<stage>.in
- writer-stage routing keys: writer.<stage>.in
- one stage may bind multiple routing keys when both planner and replanner traffic should land in the same inbox queue
Planner Flow¶
flowchart LR
API["API"] -->|publish planner.topic_planner.in| EX["workflow exchange"]
EX --> TPQ["topic_planner inbox queue"]
TPQ --> TP["Topic Planner Agent"]
TP -->|publish planner.docsearch_planner.in| EX
EX --> DSPQ["docsearch_planner inbox queue"]
DSPQ --> DSP["Docsearch Planner Agent"]
DSP -->|publish planner.planner_assembler.in| EX
EX --> PAQ["planner_assembler inbox queue"]
PAQ --> PA["Planner Assembler"]
PA -->|publish planner.document_date_finder.in| EX
EX --> DDFQ["document_date_finder inbox queue"]
DDFQ --> DDF["Document Date Finder Agent"]
TP -. publish status .-> SX["status exchange"]
DSP -. publish status .-> SX
PA -. publish status .-> SX
DDF -. publish status .-> SX
Re-planner Flow¶
The re-planner path reuses the same consuming planner stages. The only thing that changes is the entry route and, where needed, extra binding keys on the shared planner inbox queue.
flowchart LR
API["API"] -->|publish replanner.docsearch_planner.in| EX["workflow exchange"]
API -->|publish replanner.planner_assembler.in| EX
EX --> DSPQ["docsearch_planner inbox queue"]
DSPQ --> DSP["Docsearch Planner Agent"]
EX --> PAQ["planner_assembler inbox queue"]
PAQ --> PA["Planner Assembler"]
DSP -->|publish planner.planner_assembler.in| EX
DSP -. publish status .-> SX["status exchange"]
PA -. publish status .-> SX
Writer Flow¶
flowchart LR
API["API"] -->|publish writer.docsearcher.in| EX["workflow exchange"]
API -->|publish writer.websearcher.in| EX
EX --> DSQ["docsearcher inbox queue"]
DSQ --> DS["DocSearcher Agent"]
EX --> WSQ["websearcher inbox queue"]
WSQ --> WS["WebSearcher Agent"]
DS -->|publish writer.researcher_aggregator.in| EX
WS -->|publish writer.researcher_aggregator.in| EX
EX --> RAQ["researcher_aggregator inbox queue"]
RAQ --> RA["Researcher Aggregator"]
RA -->|publish writer.writer.in| EX
EX --> WQ["writer inbox queue"]
WQ --> W["Writer Agent"]
W -->|publish writer.writer_aggregator.in| EX
EX --> WAQ["writer_aggregator inbox queue"]
WAQ --> WA["Writer Aggregator"]
WA -->|publish writer.file_creator.in| EX
EX --> FCQ["file_creator inbox queue"]
FCQ --> FC["File Creator Agent"]
FC -->|publish writer.writer_assembler.in| EX
EX --> WASQ["writer_assembler inbox queue"]
WASQ --> WAS["Writer Assembler"]
Constructing the topology¶
from relayna.topology import SharedStatusWorkflowTopology, WorkflowEntryRoute, WorkflowStage
topology = SharedStatusWorkflowTopology(
rabbitmq_url="amqp://guest:guest@localhost:5672/",
workflow_exchange="ca.workflow.exchange",
status_exchange="ca.status.exchange",
status_queue="ca.status.queue",
workflow_consumer_timeout_ms=120000,
workflow_queue_type="quorum",
stages=(
WorkflowStage(
name="topic_planner",
queue="cq.topic_planner.in_queue",
binding_keys=("planner.topic_planner.in",),
publish_routing_key="planner.topic_planner.in",
),
WorkflowStage(
name="docsearch_planner",
queue="cq.docsearch_planner.in_queue",
binding_keys=(
"planner.docsearch_planner.in",
"replanner.docsearch_planner.in",
),
publish_routing_key="planner.docsearch_planner.in",
),
WorkflowStage(
name="planner_assembler",
queue="cq.planner_assembler.in_queue",
binding_keys=(
"planner.planner_assembler.in",
"replanner.planner_assembler.in",
),
publish_routing_key="planner.planner_assembler.in",
),
WorkflowStage(
name="document_date_finder",
queue="cq.document_date_finder.in_queue",
binding_keys=("planner.document_date_finder.in",),
publish_routing_key="planner.document_date_finder.in",
),
WorkflowStage(
name="docsearcher",
queue="cq.docsearcher.in_queue",
binding_keys=("writer.docsearcher.in",),
publish_routing_key="writer.docsearcher.in",
),
WorkflowStage(
name="websearcher",
queue="cq.websearcher.in_queue",
binding_keys=("writer.websearcher.in",),
publish_routing_key="writer.websearcher.in",
),
WorkflowStage(
name="researcher_aggregator",
queue="cq.researcher_aggregator.in_queue",
binding_keys=("writer.researcher_aggregator.in",),
publish_routing_key="writer.researcher_aggregator.in",
),
WorkflowStage(
name="writer",
queue="cq.writer.in_queue",
binding_keys=("writer.writer.in",),
publish_routing_key="writer.writer.in",
),
WorkflowStage(
name="writer_aggregator",
queue="cq.writer_aggregator.in_queue",
binding_keys=("writer.writer_aggregator.in",),
publish_routing_key="writer.writer_aggregator.in",
),
WorkflowStage(
name="file_creator",
queue="cq.file_creator.in_queue",
binding_keys=("writer.file_creator.in",),
publish_routing_key="writer.file_creator.in",
),
WorkflowStage(
name="writer_assembler",
queue="cq.writer_assembler.in_queue",
binding_keys=("writer.writer_assembler.in",),
publish_routing_key="writer.writer_assembler.in",
),
),
entry_routes=(
WorkflowEntryRoute(
name="planner_entry",
routing_key="planner.topic_planner.in",
target_stage="topic_planner",
),
WorkflowEntryRoute(
name="replanner_docsearch_entry",
routing_key="replanner.docsearch_planner.in",
target_stage="docsearch_planner",
),
WorkflowEntryRoute(
name="replanner_assembler_entry",
routing_key="replanner.planner_assembler.in",
target_stage="planner_assembler",
),
WorkflowEntryRoute(
name="writer_docsearch_entry",
routing_key="writer.docsearcher.in",
target_stage="docsearcher",
),
WorkflowEntryRoute(
name="writer_websearch_entry",
routing_key="writer.websearcher.in",
target_stage="websearcher",
),
),
)
Publishing to a normal entry route¶
Use publish_to_entry(...) when a producer wants the topology to own the
workflow routing decision for a named ingress route.
from relayna.contracts import WorkflowEnvelope
from relayna.rabbitmq import RelaynaRabbitClient
client = RelaynaRabbitClient(topology=topology, connection_name="planner-api")
await client.initialize()
await client.publish_to_entry(
WorkflowEnvelope(
task_id="task-001",
stage="topic_planner",
action="build-plan",
priority=7,
payload={"query": "new compliance workflow"},
meta={"source": "api"},
),
route="planner_entry",
)
Publishing to a re-planner entry route¶
Use a different named entry route when re-planner traffic should enter further down the same planner pipeline.
await client.publish_to_entry(
WorkflowEnvelope(
task_id="task-001",
stage="docsearch_planner",
action="replan-docsearch",
priority=6,
payload={"new_documents": ["doc-100", "doc-200"]},
meta={"reason": "documents_changed"},
),
route="replanner_docsearch_entry",
)
await client.publish_to_entry(
WorkflowEnvelope(
task_id="task-002",
stage="planner_assembler",
action="replan-assemble",
priority=4,
payload={"existing_documents": ["doc-001", "doc-002"]},
meta={"reason": "old_documents_only"},
),
route="replanner_assembler_entry",
)
Planner-stage worker with WorkflowConsumer¶
from relayna.consumer import RetryPolicy, RetryStatusConfig, WorkflowConsumer, WorkflowContext
from relayna.contracts import WorkflowEnvelope
async def handle_docsearch_planner(message: WorkflowEnvelope, context: WorkflowContext) -> None:
await context.publish_status(status="processing", message="Docsearch planner started.")
docs = [
{"doc_id": "doc-123", "title": "Policy Handbook"},
{"doc_id": "doc-456", "title": "Retention Matrix"},
]
await context.publish_to_stage(
"planner_assembler",
payload={"documents": docs, "source_action": message.action},
action="assemble-plan",
meta={"planner_stage": context.stage},
priority=5,
)
consumer = WorkflowConsumer(
rabbitmq=client,
stage="docsearch_planner",
handler=handle_docsearch_planner,
retry_policy=RetryPolicy(max_retries=3, delay_ms=15000),
retry_statuses=RetryStatusConfig(enabled=True),
consume_timeout_seconds=1.0,
)
await consumer.run_forever()
Publishing directly to a stage¶
Use publish_to_stage(...) when the caller already knows the destination stage
and does not need a named entry-route abstraction.
await client.publish_to_stage(
WorkflowEnvelope(
task_id="task-010",
stage="writer",
action="draft",
priority=8,
payload={"outline": ["intro", "body", "summary"]},
meta={"source_stage": "researcher_aggregator"},
),
stage="writer",
)
Writer fan-in example¶
Multiple upstream stages can target the same inbox queue. That is the normal way to model fan-in under the stage-inbox topology.
from relayna.consumer import WorkflowContext
from relayna.contracts import WorkflowEnvelope
async def handle_docsearcher(message: WorkflowEnvelope, context: WorkflowContext) -> None:
await context.publish_status(status="searching", message="Docsearch results ready.")
await context.publish_to_stage(
"researcher_aggregator",
payload={"source": "docsearcher", "documents": [{"doc_id": "doc-100"}]},
action="aggregate-research",
)
async def handle_websearcher(message: WorkflowEnvelope, context: WorkflowContext) -> None:
await context.publish_status(status="searching", message="Websearch results ready.")
await context.publish_to_stage(
"researcher_aggregator",
payload={"source": "websearcher", "documents": [{"doc_id": "web-200"}]},
action="aggregate-research",
)
In this model:
- both upstream workers publish to writer.researcher_aggregator.in
- researcher_aggregator owns the only durable inbox queue for that stage
- retries and backlog are measured on that consumer-owned inbox queue
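For completeness, a minimal sketch of the fan-in stage itself, following the WorkflowConsumer pattern shown earlier. The aggregation logic here is illustrative; collecting and merging results across deliveries is application-defined.
from relayna.consumer import WorkflowConsumer, WorkflowContext
from relayna.contracts import WorkflowEnvelope
async def handle_researcher_aggregator(message: WorkflowEnvelope, context: WorkflowContext) -> None:
    # Each delivery carries one upstream searcher result.
    documents = message.payload.get("documents", [])
    await context.publish_status(
        status="aggregating",
        message=f"Received {len(documents)} documents from {message.payload.get('source')}.",
    )
    # Hand the collected research off to the writer stage.
    await context.publish_to_stage(
        "writer",
        payload={"documents": documents},
        action="draft",
    )
aggregator_consumer = WorkflowConsumer(
    rabbitmq=client,
    stage="researcher_aggregator",
    handler=handle_researcher_aggregator,
    consume_timeout_seconds=1.0,
)
await aggregator_consumer.run_forever()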
Example: shared tasks + shared status + sharded aggregation¶
Use this topology when normal task workers stay on one shared task queue, but
aggregation work should be distributed across shard-owned queues such as
aggregation.queue.0, aggregation.queue.1, or subset queues like
aggregation.queue.shards.1-3.
Aggregation messages are still published on the shared status exchange, so
StatusHub, StreamHistoryReader, and SSE clients can see them like normal
status events.
from relayna.rabbitmq import RelaynaRabbitClient
from relayna.topology import SharedTasksSharedStatusShardedAggregationTopology
topology = SharedTasksSharedStatusShardedAggregationTopology(
rabbitmq_url="amqp://guest:guest@localhost:5672/",
tasks_exchange="tasks.exchange",
tasks_queue="tasks.queue",
tasks_routing_key="task.request",
status_exchange="status.exchange",
status_queue="status.queue",
shard_count=4,
aggregation_queue_template="aggregation.queue.dev.{shard}",
aggregation_queue_name_prefix="aggregation.queue.dev.shards",
)
client = RelaynaRabbitClient(topology=topology)
await client.initialize()
Task worker publishes aggregation work¶
from relayna.consumer import RetryPolicy, RetryStatusConfig, TaskConsumer, TaskContext
from relayna.contracts import TaskEnvelope
async def handle_task(task: TaskEnvelope, context: TaskContext) -> None:
await context.publish_status(status="processing", message="Sub-task started.")
await context.publish_aggregation_status(
status="aggregating",
message="Partial result ready for shard aggregation.",
meta={"parent_task_id": "root-task-123"},
result={"chunk_id": task.task_id, "value": 42},
)
consumer = TaskConsumer(
rabbitmq=client,
handler=handle_task,
retry_policy=RetryPolicy(max_retries=3, delay_ms=15000),
retry_statuses=RetryStatusConfig(enabled=True),
consume_timeout_seconds=1.0,
)
await consumer.run_forever()
meta.parent_task_id is required for aggregation publishing. relayna
computes the shard from that parent id, writes meta.aggregation_shard, sets
meta.aggregation_role="aggregation", and publishes the event to a routing key
like agg.0.
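As an illustration, the aggregation event published above ends up with meta along these lines; the shard number is illustrative, since relayna derives it from the parent id and shard_count:
{
  "parent_task_id": "root-task-123",
  "aggregation_shard": 0,
  "aggregation_role": "aggregation"
}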
Choosing publish_status(...) vs publish_aggregation_status(...)¶
This is the most important rule:
- TaskContext.publish_status(...) publishes a normal shared status event for the current context task
- TaskContext.publish_aggregation_status(...) publishes an aggregation input event for the current context task, routed by meta.parent_task_id
- TaskContext.manual_retry(...) republishes the current task under the same task_id for a different routed worker, usually by changing task_type
In both helpers, the canonical task_id stays the current context task id.
meta.parent_task_id does not replace task_id; it only describes the parent
relationship and shard-routing key.
That means:
- use publish_status(...) when you want to say something about the current child task itself
- use publish_aggregation_status(...) when the current child task is emitting input for parent-level aggregation workers
- if you need to publish a status whose real task_id is the parent task, do not use the context helper; publish an explicit StatusEventEnvelope with the parent task id through rabbitmq.publish_status(...)
- use manual_retry(...) when the current worker decides the task should be handed off to another routed task worker under the same task_id
manual_retry(...) is separate from broker retry/DLQ behavior. It republishes a
new task message, emits a manual_retrying status for the current task_id,
and preserves manual-retry lineage metadata for the next worker.
When top-level task priority is present, manual_retry(...) preserves it by
default. Pass priority=... to override the republished task priority.
Manual handoff retry example¶
Use manual_retry(...) when the current worker decides the task should be
reprocessed by a different routed worker with different settings.
Source worker:
from relayna.consumer import TaskContext
from relayna.contracts import TaskEnvelope
async def generate(task: TaskEnvelope, context: TaskContext) -> None:
score = float(task.payload.get("quality_score", 0))
if score < 0.9:
await context.manual_retry(
task_type="draft.review",
priority=7,
payload_merge={
"mode": "strict",
"review_required": True,
},
reason="quality gate failed",
meta={"quality_score": score},
)
return
await context.publish_status(status="completed", message="Draft accepted.")
If you publish tasks with mode="batch_envelope" and use top-level
priority, every enclosed task must share the same priority value. Relayna
rejects mixed-priority batch envelopes because RabbitMQ only accepts one AMQP
message priority per published delivery.
RabbitMQ only schedules by priority when the destination queue was declared
with x-max-priority, using Relayna topology fields such as
task_max_priority or workflow_max_priority.
Relayna also fails client-side when a task or workflow publish requests a
top-level priority above the configured queue max priority for that topology.
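A short sketch of those guardrails, assuming the client and topology from the earlier examples and a topology declared with a task_max_priority below the values used here; placing priority as a top-level task field follows the "top-level priority" wording above, and the exact exception type raised on a rejected publish is not specified in this guide.
# Both publishes below are rejected client-side by relayna (the exception
# type is not documented here, so no specific except clause is shown).
# 1) top-level priority above the topology's configured task_max_priority:
await client.publish_task({"task_id": "task-hot", "priority": 99, "payload": {"kind": "demo"}})
# 2) mixed top-level priorities inside one batch_envelope publish:
await client.publish_tasks(
    [
        {"task_id": "task-1", "priority": 3, "payload": {"kind": "demo"}},
        {"task_id": "task-2", "priority": 4, "payload": {"kind": "demo"}},
    ],
    mode="batch_envelope",
    batch_id="batch-mixed",
)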
Target worker:
from relayna.consumer import TaskContext
from relayna.contracts import TaskEnvelope
async def review(task: TaskEnvelope, context: TaskContext) -> None:
assert task.task_type == "draft.review"
assert context.manual_retry_count == 1
assert context.manual_retry_previous_task_type == "draft.generate"
await context.publish_status(
status="processing",
message="Review worker accepted manual retry handoff.",
meta={
"review_mode": task.payload["mode"],
"handoff_reason": context.manual_retry_reason,
},
)
await context.publish_status(status="completed", message="Review completed.")
Result:
- the republished task keeps the same task_id
- the republished task can use a different task_type, payload, or service
- Relayna emits manual_retrying for the original worker instead of completed
- the next worker sees lineage on TaskContext.manual_retry_count, manual_retry_previous_task_type, manual_retry_source_consumer, and manual_retry_reason
- status history, latest-status, and SSE remain on one timeline because all events still use the same task_id
Operational rule:
- the source and target workers must use routed topologies that share the same exchanges/status plane but bind different task queues to different task_type values
- if no queue is bound for the target task_type, RabbitMQ will not have a routed worker queue to deliver that handoff message to
Header behavior:
- broker retry headers such as x-relayna-retry-attempt are not reused
- manual handoff lineage is carried in separate x-relayna-manual-retry-* headers and reflected in status metadata under meta.manual_retry
- normal broker retry still applies independently if the target worker later fails and has retry_policy=...
Child task status example¶
Use publish_status(...) for ordinary child lifecycle updates:
async def handle_task(task: TaskEnvelope, context: TaskContext) -> None:
await context.publish_status(
status="processing",
message="Child task started.",
)
await context.publish_status(
status="completed",
message="Child task finished successfully.",
)
Result:
- task_id is the child task id from context
- the event goes to the shared status stream
- SSE, latest-status, and history readers will treat it as normal status for that child task
Child emits aggregation input example¶
Use publish_aggregation_status(...) when a child has partial output that a
parent-level aggregation worker should consume:
async def handle_task(task: TaskEnvelope, context: TaskContext) -> None:
await context.publish_aggregation_status(
status="aggregating",
message="Partial result ready for parent aggregation.",
meta={"parent_task_id": "root-task-123"},
result={"chunk_id": task.task_id, "value": 42},
)
Result:
- task_id is still the child task id from context
- meta.parent_task_id tells Relayna which parent/root task this child belongs to
- the event is routed to the aggregation shard queue for that parent
- aggregation workers can process it, while StatusHub, SSE, and history can still observe it from the shared status exchange
Parent task status example¶
If you want a status event whose canonical identity is the parent task itself, publish it explicitly instead of using the child context helper:
from relayna.contracts import StatusEventEnvelope
async def handle_task(task: TaskEnvelope, context: TaskContext) -> None:
await context.rabbitmq.publish_status(
StatusEventEnvelope(
task_id="root-task-123",
status="aggregation-complete",
message="Parent task finished aggregation.",
meta={"source_child_task_id": task.task_id},
correlation_id="root-task-123",
)
)
Result:
- task_id is the parent task id, not the child task id
- clients querying /events/root-task-123 or /status/root-task-123 will see this as a parent-task status
- this does not route back into aggregation queues
Practical decision guide¶
Use context.publish_status(...) when:
- the status belongs to the current task in the handler context
- you want normal status history, SSE, and latest-status behavior
- you are inside an aggregation worker and want to publish bookkeeping or final statuses without feeding the aggregation queue again
Use context.publish_aggregation_status(...) when:
- the current task is emitting work or partial output for parent aggregation
- you need shard routing based on meta.parent_task_id
- the event is intended to be consumed by AggregationConsumer
Use context.rabbitmq.publish_status(StatusEventEnvelope(...)) when:
- the event should belong to a different task id than the current context task
- you are publishing a real parent/root status from child or aggregation logic
Use context.manual_retry(...) when:
- the current task should stay on the same logical task_id
- the next processing attempt should go to a different routed task_type
- you want shared history/SSE/latest-status continuity across the handoff
- you need the next worker to see standardized handoff lineage
Aggregation worker runtime example¶
import asyncio
from relayna.consumer import AggregationWorkerRuntime, RetryPolicy, RetryStatusConfig, TaskContext
from relayna.contracts import StatusEventEnvelope
async def aggregate(event: StatusEventEnvelope, context: TaskContext) -> None:
shard = event.meta.get("aggregation_shard")
await context.publish_status(
status="aggregation-processed",
message=f"Aggregation worker handled shard {shard}.",
meta={
"parent_task_id": event.meta["parent_task_id"],
"aggregation_shard": shard,
},
)
runtime = AggregationWorkerRuntime(
topology=topology,
handler=aggregate,
shard_groups=[[0], [1, 2, 3]],
retry_policy=RetryPolicy(max_retries=3, delay_ms=15000),
retry_statuses=RetryStatusConfig(enabled=True),
consume_timeout_seconds=None,
)
await runtime.start()
try:
await asyncio.Event().wait()
finally:
await runtime.stop()
shard_groups=[[0], [1, 2, 3]] means:
- one aggregation consumer owns shard 0
- a second aggregation consumer owns shards 1, 2, and 3
consume_timeout_seconds=None makes each aggregation consumer block waiting for
the next event. AggregationWorkerRuntime.stop() still falls back to task
cancellation if a blocking consumer does not exit within the runtime stop
timeout.
Direct aggregation-queue probe example¶
Use this when you want to validate shard routing directly against RabbitMQ in a real environment.
import json
from relayna.contracts import StatusEventEnvelope
from relayna.rabbitmq import RelaynaRabbitClient
probe = RelaynaRabbitClient(topology=topology, connection_name="aggregation-probe")
await probe.initialize()
queue_name = await probe.ensure_aggregation_queue(shards=[0, 1, 2, 3])
await probe.publish_aggregation_status(
StatusEventEnvelope(
task_id="agg-task-123",
status="aggregating",
message="Aggregation input ready.",
meta={"parent_task_id": "parent-123"},
)
)
channel = await probe.acquire_channel(prefetch=1)
queue = await channel.declare_queue(
queue_name,
durable=True,
arguments=topology.aggregation_queue_arguments() or None,
)
message = await queue.get(timeout=5.0, fail=True)
payload = json.loads(message.body.decode("utf-8"))
await message.ack()
payload["meta"]["aggregation_shard"] tells you which shard routing key was
used, and the fact that the queue received the message proves that the
aggregation queue binding is live.
Custom SSE terminal statuses¶
If your aggregation workflow finishes on a status other than completed or
failed, pass a TerminalStatusSet to FastAPI so /events/{task_id} streams
terminate correctly.
from relayna.contracts import TerminalStatusSet
from relayna.fastapi import create_relayna_lifespan
app = FastAPI(
lifespan=create_relayna_lifespan(
topology=topology,
redis_url="redis://localhost:6379/0",
sse_terminal_statuses=TerminalStatusSet({"aggregation-processed"}),
)
)
Redis-backed status history¶
from redis.asyncio import Redis
from relayna.status_store import RedisStatusStore
redis = Redis.from_url("redis://localhost:6379/0")
store = RedisStatusStore(redis, prefix="relayna", ttl_seconds=86400, history_maxlen=50)
Status bridge¶
from relayna.status_hub import StatusHub
hub = StatusHub(rabbitmq=client, store=store)
await hub.run_forever()
End-to-end composition¶
For the default shared topology:
- Publish tasks with RelaynaRabbitClient.
- Consume tasks with TaskConsumer.
- Publish status updates from handlers with TaskContext.publish_status(...).
- Run StatusHub to bridge shared status messages into RedisStatusStore.
- Expose create_status_router(...) in your FastAPI service for history and SSE.
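A minimal worker-side wiring sketch for those steps, assuming the topology, handle_task handler, and Redis URL from the earlier examples; production services usually run the consumer and the status bridge as separate processes.
import asyncio
from redis.asyncio import Redis
from relayna.consumer import TaskConsumer
from relayna.rabbitmq import RelaynaRabbitClient
from relayna.status_hub import StatusHub
from relayna.status_store import RedisStatusStore
async def main() -> None:
    client = RelaynaRabbitClient(topology=topology)
    await client.initialize()
    store = RedisStatusStore(Redis.from_url("redis://localhost:6379/0"), prefix="relayna", ttl_seconds=86400, history_maxlen=50)
    consumer = TaskConsumer(rabbitmq=client, handler=handle_task, consume_timeout_seconds=1.0)
    hub = StatusHub(rabbitmq=client, store=store)
    # Run the task worker and the status bridge side by side.
    await asyncio.gather(consumer.run_forever(), hub.run_forever())
asyncio.run(main())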
For the sharded aggregation topology:
- Publish tasks with the same shared task plane.
- Task workers publish aggregation work with publish_aggregation_status(...).
- Aggregation workers consume shard-owned queues through AggregationWorkerRuntime.
- StatusHub, StreamHistoryReader, and SSE still observe aggregation events from the shared status exchange.
Real-environment smoke commands¶
If RabbitMQ is running at localhost:5672 and Redis at localhost:6379, these
scripts exercise both topology families against the real stack:
PYTHONPATH=src ./.venv/bin/python scripts/real_fastapi_status_smoke.py
PYTHONPATH=src ./.venv/bin/python scripts/real_task_worker_smoke.py
PYTHONPATH=src ./.venv/bin/python scripts/real_workflow_smoke.py
PYTHONPATH=src ./.venv/bin/python scripts/real_sharded_aggregation_smoke.py
PYTHONPATH=src ./.venv/bin/python scripts/real_queue_args_smoke.py