Repository Tracing Decorator
instrument_repo is a class decorator that automatically wraps every public async method of a repository class with an OTel trace span. Apply it once at class definition time and every DB operation becomes visible in distributed traces — no per-method boilerplate required.
Quick Start

    from obskit import instrument_repo


    @instrument_repo(component="postgres")
    class NotesRepo:
        # self._db is assumed to be a connection or pool object set up elsewhere.
        async def insert_note(self, title: str, body: str) -> None:
            await self._db.execute(
                "INSERT INTO notes (title, body) VALUES ($1, $2)", title, body
            )

        async def get_notes(self, limit: int = 100) -> list[dict]:
            return await self._db.fetch("SELECT * FROM notes LIMIT $1", limit)

Each method call creates a span:
| Call | Span name | component |
|---|---|---|
| `repo.insert_note(...)` | `NotesRepo.insert_note` | `postgres` |
| `repo.get_notes()` | `NotesRepo.get_notes` | `postgres` |
Parameters

| Parameter | Default | Description |
|---|---|---|
| `component` | `"db"` | Span component attribute; appears in trace views. Use `"postgres"`, `"redis"`, `"mongo"`, etc. |
| `span_prefix` | class name | Override the prefix in span names. Useful for aliasing a repo to a shorter name. |
| `slow_threshold_ms` | `None` | Emit a `slow_repo_operation` warning log when any method exceeds this duration in milliseconds. |
Slow Operation Warnings

Pass slow_threshold_ms to get automatic warnings for any method that exceeds the threshold:

    from obskit import instrument_repo


    @instrument_repo(component="postgres", slow_threshold_ms=200.0)
    class OrderRepo:
        async def get_orders(self, tenant_id: str) -> list[dict]:
            return await self._db.fetch("SELECT * FROM orders WHERE tid=$1", tenant_id)

When get_orders takes longer than 200 ms, a structured warning is emitted:

    {
      "event": "slow_repo_operation",
      "operation": "OrderRepo.get_orders",
      "duration_ms": 347.8,
      "threshold_ms": 200.0
    }

The warning fires even when the method raises an exception, so slow failures are always visible.
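
A warning that fires on both success and failure follows naturally when the timing check sits in a `finally` block. The following is a minimal standard-library sketch of that pattern, not obskit's actual implementation; `warn_if_slow` and the logger name are hypothetical.

```python
import asyncio
import functools
import logging
import time

logger = logging.getLogger("slow_repo_sketch")  # hypothetical logger name


def warn_if_slow(operation: str, threshold_ms: float):
    """Hypothetical helper: log a warning when the wrapped coroutine is slow."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                # Runs on success and on failure, so slow errors are reported too.
                duration_ms = (time.perf_counter() - start) * 1000.0
                if duration_ms > threshold_ms:
                    logger.warning(
                        "slow_repo_operation operation=%s duration_ms=%.1f threshold_ms=%.1f",
                        operation,
                        duration_ms,
                        threshold_ms,
                    )

        return wrapper

    return decorator


@warn_if_slow("OrderRepo.get_orders", threshold_ms=10.0)
async def slow_and_failing() -> None:
    await asyncio.sleep(0.05)  # well above the 10 ms threshold
    raise RuntimeError("connection lost")
```

Awaiting `slow_and_failing()` raises `RuntimeError` and still logs one `slow_repo_operation` warning, because the `finally` clause runs before the exception leaves the wrapper.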
Custom Prefix

    from obskit import instrument_repo


    @instrument_repo(component="postgres", span_prefix="tags")
    class TagsRepository:
        async def upsert_tags(self, entity_id: int, tags: list[str]) -> None: ...
        async def delete_tags(self, entity_id: int) -> None: ...

    # Spans: "tags.upsert_tags", "tags.delete_tags"
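
The naming rule can be written as a one-line function. This sketch assumes the documented format `"{prefix}.{method_name}"` with the class name as the fallback; `span_name` is a hypothetical helper, not part of obskit, and how obskit treats an empty-string prefix is not covered here.

```python
from typing import Optional


def span_name(cls: type, method_name: str, span_prefix: Optional[str] = None) -> str:
    """Hypothetical helper: build '{prefix}.{method_name}', defaulting to the class name."""
    prefix = span_prefix if span_prefix is not None else cls.__name__
    return f"{prefix}.{method_name}"


class TagsRepository:
    """Empty stand-in class; only its name matters here."""


print(span_name(TagsRepository, "upsert_tags", span_prefix="tags"))  # tags.upsert_tags
print(span_name(TagsRepository, "delete_tags"))  # TagsRepository.delete_tags
```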
What Gets Wrapped

| Method type | Wrapped? |
|---|---|
| Public async methods (`async def method(self, ...)`) | Yes |
| Private/dunder methods (`_method`, `__init__`) | No |
| Synchronous methods (`def method(self, ...)`) | No |
| `@staticmethod` | No |
| `@classmethod` | No |
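
The selection rules in the table can be expressed with the standard `inspect` module. This is a sketch of one way to apply them, not obskit's source; `public_async_method_names` and `DemoRepo` are hypothetical names used only for illustration.

```python
import inspect


def public_async_method_names(cls: type) -> list:
    """Return the method names a decorator following the table above would wrap."""
    names = []
    # vars(cls) only contains attributes defined directly on the class and
    # yields raw staticmethod/classmethod objects, which are skipped below.
    for name, attr in vars(cls).items():
        if name.startswith("_"):
            continue  # private and dunder methods
        if isinstance(attr, (staticmethod, classmethod)):
            continue
        if inspect.iscoroutinefunction(attr):
            names.append(name)  # public `async def` method
    return names


class DemoRepo:
    async def get_notes(self): ...

    async def _helper(self): ...

    def sync_method(self): ...

    @staticmethod
    async def static_async(): ...

    @classmethod
    async def class_async(cls): ...


print(public_async_method_names(DemoRepo))  # ['get_notes']
```

Iterating over `vars(cls)` rather than `dir(cls)` is a deliberate choice in this sketch: inherited methods are not visited, so only methods defined directly on the class are selected.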
Exceptions Are Propagated

If a wrapped method raises, the exception propagates unchanged after the span closes:

    from obskit import instrument_repo


    class DatabaseError(Exception):
        """Placeholder for your database driver's error type."""


    @instrument_repo(component="postgres")
    class AssignmentRepo:
        async def upsert_assignment(self, data: dict) -> None:
            raise DatabaseError("connection lost")

    # The span ends, then the exception propagates; the caller handles it normally.
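
The ordering (span closes first, then the caller sees the exception) can be observed with a stand-in span. This sketch uses a hypothetical `fake_span` context manager and `traced` decorator in place of a real tracer, so it makes no claim about obskit internals.

```python
import asyncio
import contextlib
import functools

events = []  # stand-in for a tracing backend


@contextlib.contextmanager
def fake_span(name: str):
    """Hypothetical stand-in for a tracer span; records start and end."""
    events.append(f"start {name}")
    try:
        yield
    finally:
        events.append(f"end {name}")


def traced(name: str):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with fake_span(name):
                # No except clause: an exception leaves the with-block,
                # the span ends, and the same exception reaches the caller.
                return await func(*args, **kwargs)

        return wrapper

    return decorator


class DatabaseError(Exception):
    """Placeholder error type for the example."""


@traced("AssignmentRepo.upsert_assignment")
async def upsert_assignment(data: dict) -> None:
    raise DatabaseError("connection lost")


async def main() -> None:
    try:
        await upsert_assignment({})
    except DatabaseError as exc:
        events.append(f"caught {exc}")


asyncio.run(main())
print(events)
```

The recorded order is start, end, caught: the span is closed before the caller's `except` block runs.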
Multiple Repos

    @instrument_repo(component="postgres")
    class AssignmentRepo: ...


    @instrument_repo(component="postgres")
    class NotesRepo: ...


    @instrument_repo(component="postgres")
    class TagsRepo: ...


    @instrument_repo(component="postgres")
    class CustomFieldsRepo: ...

Event Handler Instrumentation
instrument_event_handler is a decorator factory that wraps async event handlers with an OTel span and Prometheus metrics, with no per-handler boilerplate required.
Quick Start

    from obskit import instrument_event_handler


    @instrument_event_handler(name="order_created")
    async def handle_order_created(event: dict) -> None:
        order_id = event["order_id"]
        await process_order(order_id)  # process_order: your own application coroutine

Each invocation:
- Creates a child OTel span named `event_handler.order_created`
- Records latency in `event_handler_duration_seconds{name="order_created"}`
- Increments `event_handler_errors_total{name="order_created"}` on any exception
Parameters

| Parameter | Description |
|---|---|
| `name` | Logical handler name; used as the `name` label in all metrics and appended to the OTel span name: `event_handler.<name>`. |
Emitted Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| `event_handler_duration_seconds{name}` | Histogram | `name` | Handler duration in seconds. Always recorded, including when the handler raises. |
| `event_handler_errors_total{name}` | Counter | `name` | Exceptions raised by the handler. |
Span Name Convention

The OTel span is named `event_handler.<name>`:

| `name` parameter | Span name |
|---|---|
| `"order_created"` | `event_handler.order_created` |
| `"engagement_insert"` | `event_handler.engagement_insert` |
| `"status_update"` | `event_handler.status_update` |
Error Handling

The decorator re-raises the original exception unchanged after incrementing the error counter and recording duration:

    @instrument_event_handler(name="payment_processed")
    async def handle_payment(event: dict) -> None:
        raise ValueError("missing payment_id")

    # ValueError propagates to the caller unchanged
    # event_handler_errors_total{name="payment_processed"} += 1
    # event_handler_duration_seconds{name="payment_processed"} observed
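
The combination described here (count the error, re-raise unchanged, always record duration) maps onto a `try` / `except` / `finally` structure. The sketch below uses in-memory dictionaries as stand-ins for the Prometheus counter and histogram; `instrument_handler_sketch` is a hypothetical name, and obskit's real implementation may differ.

```python
import asyncio
import functools
import time
from collections import defaultdict

# In-memory stand-ins for the Prometheus counter and histogram (assumption).
errors_total = defaultdict(int)
durations_seconds = defaultdict(list)


def instrument_handler_sketch(name: str):
    """Hypothetical decorator factory mirroring the documented ordering."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            except Exception:
                errors_total[name] += 1  # counted before re-raising
                raise  # bare raise keeps the original exception and traceback
            finally:
                # Observed for successes and failures alike.
                durations_seconds[name].append(time.perf_counter() - start)

        return wrapper

    return decorator


@instrument_handler_sketch(name="payment_processed")
async def handle_payment(event: dict) -> None:
    if "payment_id" not in event:
        raise ValueError("missing payment_id")


async def main() -> None:
    await handle_payment({"payment_id": 1})  # success: duration only
    try:
        await handle_payment({})  # failure: error count and duration
    except ValueError:
        pass


asyncio.run(main())
```

After `main()` runs, the stand-in counter holds one error and the stand-in histogram holds two observations, one per invocation.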
Class Methods

The decorator also works on async methods defined on a class (instance methods):

    class EngagementInsertHandler:
        @instrument_event_handler(name="engagement_insert")
        async def handle(self, event: dict) -> None:
            await self.repo.insert(event["engagement"])

Multiple Handlers

    @instrument_event_handler(name="order_created")
    async def handle_order_created(event: dict) -> None: ...


    @instrument_event_handler(name="order_cancelled")
    async def handle_order_cancelled(event: dict) -> None: ...


    @instrument_event_handler(name="payment_processed")
    async def handle_payment_processed(event: dict) -> None: ...

Each handler has its own independent metric series.
Grafana alert example

    # Alert when any event handler error rate rises
    rate(event_handler_errors_total[5m]) > 0

    # P99 handler latency by name
    histogram_quantile(
      0.99,
      sum by (le, name) (rate(event_handler_duration_seconds_bucket[5m]))
    )

API Reference
obskit.decorators.repo.instrument_repo

    instrument_repo(
        *,
        component: str = "db",
        span_prefix: str | None = None,
        slow_threshold_ms: float | None = None,
    ) -> Callable[[type], type]

Class decorator that wraps all public async methods with OTel trace spans.
Parameters

component : str
    Span component attribute, e.g. "postgres", "redis", "mongo". Default: "db".

span_prefix : str, optional
    Prefix for span names. If None, the decorated class name is used. Span name format: "{prefix}.{method_name}".

slow_threshold_ms : float, optional
    If set, emit a slow_repo_operation warning log for any method call whose wall-clock duration exceeds this threshold (in milliseconds).

Returns

callable
    Class decorator.

Example

    @instrument_repo(component="postgres")
    class TagsRepo:
        async def upsert_tags(self, entity_id: int, tags: list[str]) -> None:
            ...

        async def delete_tags(self, entity_id: int) -> None:
            ...

    # Spans: "TagsRepo.upsert_tags", "TagsRepo.delete_tags"

Notes

Only public async methods (names not starting with `_`) defined directly on the class are wrapped. Static methods, class methods, and synchronous methods are left untouched.

Span attributes are set on the OTel span: component=<component>.
Source code in src/obskit/decorators/repo.py
obskit.decorators.event_handler.instrument_event_handler

    instrument_event_handler(
        name: str,
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator factory: wrap an async handler with a child OTel span + metrics.
Parameters

name : str
    Handler name; used as the span operation name (`event_handler.<name>`) and as the metric label value.

Returns

callable
    Decorator that wraps the target async function.

Example

    @instrument_event_handler(name="status_update")
    async def handle(self, event_data: dict) -> None:
        await self._use_case.execute(event_data)

The emitted span name is event_handler.status_update. When
use_span_context is active on the calling thread (e.g. after
extract_trace_context_from_headers), this span is automatically
parented under the publisher's trace.
Notes

- Duration is recorded in the `finally` block, so it captures the full wall time including exceptions.
- The error counter is incremented before re-raising so the metric is always recorded even if the caller swallows the exception.
- OTel tracing degrades gracefully: when `obskit[otlp]` is not installed the span is a no-op and only the Prometheus metrics are emitted.
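
Graceful degradation of this kind is commonly implemented with an optional import and a no-op fallback. How obskit detects the missing extra is not shown in this document, so the following is only a sketch of the general pattern; `start_span`, `handle`, and the tracer name are hypothetical, while `trace.get_tracer` and `start_as_current_span` come from the `opentelemetry-api` package.

```python
import contextlib

try:
    # Real API from the opentelemetry-api package, if it is installed.
    from opentelemetry import trace

    _tracer = trace.get_tracer("event_handler_sketch")  # hypothetical tracer name
except ImportError:
    _tracer = None


def start_span(name: str):
    """Return a context manager: a real span if OTel is importable, else a no-op."""
    if _tracer is None:
        return contextlib.nullcontext()
    return _tracer.start_as_current_span(name)


def handle(event: dict) -> str:
    # The handler body is identical whether or not tracing is available.
    with start_span("event_handler.status_update"):
        return f"processed {event['id']}"


print(handle({"id": 7}))  # processed 7
```

Keeping the fallback behind a single `start_span` function means the calling code needs no conditional logic of its own.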
Source code in src/obskit/decorators/event_handler.py