Observability¶
relayna exposes backend-agnostic runtime observations through async sink
callbacks. The library emits typed dataclass events from long-running loops such
as SSE streaming, worker consumption, and status fanout, but it does not ship a
logging backend, metrics registry, or tracing exporter.
How it works¶
Observability in relayna is built around two public concepts from
relayna.observability:
RelaynaObservationA protocol implemented by all observation dataclasses.ObservationSinkAn async callback with the shapeasync def sink(event) -> None.
Every observation includes:
componentOne of"sse","consumer", or"status_hub".timestampA UTCdatetimerecorded when the event object was created.
Important behavior:
- sinks are async-only
- sink failures are suppressed by Relayna
- observations are best-effort and never block the main workflow intentionally
- Relayna does not include raw message bodies in observation events
Basic usage¶
from relayna.consumer import TaskConsumer
from relayna.observability import RelaynaObservation
async def sink(event: RelaynaObservation) -> None:
print(event)
consumer = TaskConsumer(
rabbitmq=client,
handler=handle_task,
consume_timeout_seconds=1.0,
observation_sink=sink,
)
await consumer.run_forever()
The same sink pattern works with:
TaskConsumerAggregationConsumerAggregationWorkerRuntimeSSEStatusStreamStatusHub
Example with multiple components:
from relayna.consumer import TaskConsumer
from relayna.sse import SSEStatusStream
from relayna.status_hub import StatusHub
async def sink(event) -> None:
print(event.component, event)
consumer = TaskConsumer(
rabbitmq=client,
handler=handle_task,
consume_timeout_seconds=1.0,
observation_sink=sink,
)
hub = StatusHub(rabbitmq=client, store=store, observation_sink=sink)
stream = SSEStatusStream(store=store, observation_sink=sink)
Filtering by event type¶
Use normal Python isinstance(...) checks against the observation dataclasses.
from relayna.observability import (
ConsumerRetryScheduled,
SSEStreamStarted,
StatusHubStoredEvent,
TaskMessageAcked,
)
async def sink(event) -> None:
if isinstance(event, SSEStreamStarted):
print("stream started", event.task_id, event.resume_requested)
elif isinstance(event, TaskMessageAcked):
print("task acked", event.task_id)
elif isinstance(event, ConsumerRetryScheduled):
print("retry scheduled", event.task_id, event.retry_attempt, event.max_retries)
elif isinstance(event, StatusHubStoredEvent):
print("stored event", event.task_id, event.status)
Structured logging example¶
If you want simple structured logs, convert dataclasses with dataclasses.asdict
before handing them to your logger.
from dataclasses import asdict, is_dataclass
import json
async def sink(event) -> None:
payload = asdict(event) if is_dataclass(event) else {"event": repr(event)}
print(json.dumps(payload, default=str))
This is a good default when you want observability quickly without introducing a metrics or tracing dependency.
Event groups¶
SSE events¶
SSEStatusStream can emit:
SSEStreamStartedSSEResumeRequestedSSEHistoryReplayedSSEKeepaliveSentSSELiveEventSentSSEMalformedPubsubPayloadSSEStreamEnded
Use these when you want to understand:
- how often clients reconnect with
Last-Event-ID - how many history events are replayed
- whether streams are mostly idle keepalives or live updates
- when malformed Redis pubsub payloads are being ignored
Consumer events¶
TaskConsumer emits:
TaskConsumerStartedTaskMessageReceivedTaskMessageAckedTaskMessageRejectedTaskHandlerFailedTaskLifecycleStatusPublishedTaskConsumerLoopErrorConsumerRetryScheduledConsumerDeadLetterPublishedConsumerDLQRecordPersistFailed
AggregationConsumer and AggregationWorkerRuntime currently share the retry
and dead-letter observation events:
ConsumerRetryScheduledConsumerDeadLetterPublishedConsumerDLQRecordPersistFailed
Use these when you want to monitor:
- queue start-up and reconnect loops
- message validation failures
- handler exceptions
- lifecycle status automation
- retry and DLQ volume
- DLQ publish success paired with Redis index write failures
Status hub events¶
StatusHub emits:
StatusHubStartedStatusHubStoredEventStatusHubMalformedMessageStatusHubStoreWriteFailedStatusHubLoopError
Use these when you want visibility into the RabbitMQ-to-Redis bridge, especially for malformed status payloads or Redis write failures.
Practical patterns¶
Use one sink for multiple outcomes:
- logging Serialize each event and write it to your app logger.
- metrics
Increment counters based on event type or
component. - tracing Attach event fields to spans or breadcrumb-style traces.
- debugging Print or buffer recent events while developing a worker or SSE endpoint.
Example metrics-style sink:
from collections import Counter
counts = Counter()
async def sink(event) -> None:
counts[type(event).__name__] += 1
counts[f"component:{event.component}"] += 1
Notes and limitations¶
- Observability is opt-in. Relayna does nothing unless you pass an
observation_sink=.... - The FastAPI lifespan helper does not currently accept observation sinks
directly. If you want observability there, construct
StatusHuborSSEStatusStreammanually and pass the sink yourself. - Observation dataclasses are intended for operational use, not as a stable wire protocol between services.