Skip to content

Repository Tracing Decorator

instrument_repo is a class decorator that automatically wraps every public async method of a repository class with an OTel trace span. Apply it once at class definition time and every DB operation becomes visible in distributed traces — no per-method boilerplate required.

Quick Start

Python
from obskit import instrument_repo

@instrument_repo(component="postgres")
class NotesRepo:
    async def insert_note(self, title: str, body: str) -> None:
        await self._db.execute(
            "INSERT INTO notes (title, body) VALUES ($1, $2)", title, body
        )

    async def get_notes(self, limit: int = 100) -> list[dict]:
        return await self._db.fetch("SELECT * FROM notes LIMIT $1", limit)

Each method call creates a span:

Call Span name component
repo.insert_note(...) NotesRepo.insert_note postgres
repo.get_notes() NotesRepo.get_notes postgres

Parameters

Parameter Default Description
component "db" Span component attribute — appears in trace views. Use "postgres", "redis", "mongo", etc.
span_prefix class name Override the prefix in span names. Useful for aliasing a repo to a shorter name.
slow_threshold_ms None Emit a slow_repo_operation warning log when any method exceeds this duration in milliseconds.

Slow Operation Warnings

Pass slow_threshold_ms to get automatic warnings for any method that exceeds the threshold:

Python
from obskit import instrument_repo

@instrument_repo(component="postgres", slow_threshold_ms=200.0)
class OrderRepo:
    async def get_orders(self, tenant_id: str) -> list[dict]:
        return await self._db.fetch("SELECT * FROM orders WHERE tid=$1", tenant_id)

When get_orders takes longer than 200 ms, a structured warning is emitted:

JSON
{
  "event": "slow_repo_operation",
  "operation": "OrderRepo.get_orders",
  "duration_ms": 347.8,
  "threshold_ms": 200.0
}

The warning fires even when the method raises an exception, so slow failures are always visible.

Custom Prefix

Python
@instrument_repo(component="postgres", span_prefix="tags")
class TagsRepository:
    async def upsert_tags(self, entity_id: int, tags: list[str]) -> None: ...
    async def delete_tags(self, entity_id: int) -> None: ...
# Spans: "tags.upsert_tags", "tags.delete_tags"

What Gets Wrapped

Method type Wrapped?
Public async methods (async def method(self, ...)) Yes
Private/dunder methods (_method, __init__) No
Synchronous methods (def method(self, ...)) No
@staticmethod No
@classmethod No

Exceptions Are Propagated

If a wrapped method raises, the exception propagates unchanged after the span closes:

Python
@instrument_repo(component="postgres")
class AssignmentRepo:
    async def upsert_assignment(self, data: dict) -> None:
        raise DatabaseError("connection lost")

# Span ends, exception propagates — caller handles it normally

Multiple Repos

Python
@instrument_repo(component="postgres")
class AssignmentRepo: ...

@instrument_repo(component="postgres")
class NotesRepo: ...

@instrument_repo(component="postgres")
class TagsRepo: ...

@instrument_repo(component="postgres")
class CustomFieldsRepo: ...

Event Handler Instrumentation

instrument_event_handler is an async decorator factory that wraps event handlers with an OTel span and Prometheus metrics — no per-handler boilerplate required.

Quick Start

Python
from obskit import instrument_event_handler

@instrument_event_handler(name="order_created")
async def handle_order_created(event: dict) -> None:
    order_id = event["order_id"]
    await process_order(order_id)

Each invocation:

  • Creates a child OTel span named event_handler.order_created
  • Records latency in event_handler_duration_seconds{name="order_created"}
  • Increments event_handler_errors_total{name="order_created"} on any exception

Parameters

Parameter Description
name Logical handler name — used as the name label in all metrics and appended to the OTel span name: event_handler.<name>.

Emitted Metrics

Metric Type Labels Description
event_handler_duration_seconds{name} Histogram name Handler duration in seconds. Always recorded — including when the handler raises.
event_handler_errors_total{name} Counter name Exceptions raised by the handler.

Span Name Convention

The OTel span is named event_handler.<name>:

name parameter Span name
"order_created" event_handler.order_created
"engagement_insert" event_handler.engagement_insert
"status_update" event_handler.status_update

Error Handling

The decorator re-raises the original exception unchanged after incrementing the error counter and recording duration:

Python
@instrument_event_handler(name="payment_processed")
async def handle_payment(event: dict) -> None:
    raise ValueError("missing payment_id")

# ValueError propagates to the caller unchanged
# event_handler_errors_total{name="payment_processed"} += 1
# event_handler_duration_seconds{name="payment_processed"} observed

Class Methods

The decorator works correctly on class methods:

Python
class EngagementInsertHandler:
    @instrument_event_handler(name="engagement_insert")
    async def handle(self, event: dict) -> None:
        await self.repo.insert(event["engagement"])

Multiple Handlers

Python
@instrument_event_handler(name="order_created")
async def handle_order_created(event: dict) -> None: ...

@instrument_event_handler(name="order_cancelled")
async def handle_order_cancelled(event: dict) -> None: ...

@instrument_event_handler(name="payment_processed")
async def handle_payment_processed(event: dict) -> None: ...

Each handler has its own independent metric series.

Grafana alert example

PromQL
# Alert when any event handler error rate rises
rate(event_handler_errors_total[5m]) > 0

# P99 handler latency by name
histogram_quantile(0.99,
  rate(event_handler_duration_seconds_bucket[5m])
) by (name)

API Reference

obskit.decorators.repo.instrument_repo

Python
instrument_repo(
    *,
    component: str = "db",
    span_prefix: str | None = None,
    slow_threshold_ms: float | None = None,
) -> Callable[[type], type]

Class decorator that wraps all public async methods with OTel trace spans.

Parameters

component : str Span component attribute, e.g. "postgres", "redis", "mongo". Default: "db". span_prefix : str, optional Prefix for span names. If None, the decorated class name is used. Span name format: "{prefix}.{method_name}".

Returns

callable Class decorator.

Example

::

Text Only
@instrument_repo(component="postgres")
class TagsRepo:
    async def upsert_tags(self, entity_id: int, tags: list[str]) -> None:
        ...
    async def delete_tags(self, entity_id: int) -> None:
        ...
# Spans: "TagsRepo.upsert_tags", "TagsRepo.delete_tags"

Notes

Only public async methods (names not starting with _) defined directly on the class are wrapped. Static methods, class methods, and synchronous methods are left untouched. Span attributes are set on the OTel span: component=<component>. slow_threshold_ms : float, optional If set, emit a slow_repo_operation warning log for any method call whose wall-clock duration exceeds this threshold (in milliseconds).

Source code in src/obskit/decorators/repo.py
Python
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def instrument_repo(
    *,
    component: str = "db",
    span_prefix: str | None = None,
    slow_threshold_ms: float | None = None,
) -> Callable[[type], type]:
    """Class decorator that wraps all public async methods with OTel trace spans.

    Parameters
    ----------
    component : str
        Span ``component`` attribute, e.g. ``"postgres"``, ``"redis"``,
        ``"mongo"``.  Default: ``"db"``.
    span_prefix : str, optional
        Prefix for span names.  If *None*, the decorated class name is used.
        Span name format: ``"{prefix}.{method_name}"``.

    Returns
    -------
    callable
        Class decorator.

    Example
    -------
    ::

        @instrument_repo(component="postgres")
        class TagsRepo:
            async def upsert_tags(self, entity_id: int, tags: list[str]) -> None:
                ...
            async def delete_tags(self, entity_id: int) -> None:
                ...
        # Spans: "TagsRepo.upsert_tags", "TagsRepo.delete_tags"

    Notes
    -----
    Only **public async** methods (names not starting with ``_``) defined
    *directly* on the class are wrapped.  Static methods, class methods,
    and synchronous methods are left untouched.
    Span attributes are set on the OTel span: ``component=<component>``.
    slow_threshold_ms : float, optional
        If set, emit a ``slow_repo_operation`` warning log for any method call
        whose wall-clock duration exceeds this threshold (in milliseconds).
    """

    def decorator(cls: type) -> type:
        prefix = span_prefix if span_prefix is not None else cls.__name__

        for attr_name, value in vars(cls).items():
            if attr_name.startswith("_"):
                continue
            # Skip staticmethod / classmethod descriptor objects
            if isinstance(value, (staticmethod, classmethod)):
                continue
            if not asyncio.iscoroutinefunction(value):
                continue

            span_name = f"{prefix}.{attr_name}"
            _component = component
            _threshold_ms = slow_threshold_ms

            @functools.wraps(value)
            async def _wrapped(
                *args: Any,
                _fn: Any = value,
                _span: str = span_name,
                _comp: str = _component,
                _thr: float | None = _threshold_ms,
                **kwargs: Any,
            ) -> Any:
                from obskit.tracing.tracer import async_trace_span  # noqa: PLC0415

                t0 = time.monotonic()
                try:
                    async with async_trace_span(_span, component=_comp):
                        return await _fn(*args, **kwargs)
                finally:
                    if _thr is not None:
                        elapsed_ms = (time.monotonic() - t0) * 1000.0
                        if elapsed_ms > _thr:
                            from obskit.logging.logger import get_logger  # noqa: PLC0415

                            get_logger().warning(
                                "slow_repo_operation",
                                operation=_span,
                                duration_ms=round(elapsed_ms, 2),
                                threshold_ms=_thr,
                            )

            setattr(cls, attr_name, _wrapped)

        return cls

    return decorator

obskit.decorators.event_handler.instrument_event_handler

Python
instrument_event_handler(
    name: str,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator factory: wrap an async handler with a child OTel span + metrics.

Parameters

name : str Handler name — used as the span operation name (event_handler.<name>) and as the metric label value.

Returns

callable Decorator that wraps the target async function.

Example

::

Text Only
@instrument_event_handler(name="status_update")
async def handle(self, event_data: dict) -> None:
    await self._use_case.execute(event_data)

The emitted span name is event_handler.status_update. When use_span_context is active on the calling thread (e.g. after extract_trace_context_from_headers), this span is automatically parented under the publisher's trace.

Notes

  • Duration is recorded in the finally block — it captures the full wall time including exceptions.
  • The error counter is incremented before re-raising so the metric is always recorded even if the caller swallows the exception.
  • OTel tracing degrades gracefully: when obskit[otlp] is not installed the span is a no-op and only the Prometheus metrics are emitted.
Source code in src/obskit/decorators/event_handler.py
Python
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def instrument_event_handler(
    name: str,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator factory: wrap an async handler with a child OTel span + metrics.

    Parameters
    ----------
    name : str
        Handler name — used as the span operation name
        (``event_handler.<name>``) and as the metric label value.

    Returns
    -------
    callable
        Decorator that wraps the target async function.

    Example
    -------
    ::

        @instrument_event_handler(name="status_update")
        async def handle(self, event_data: dict) -> None:
            await self._use_case.execute(event_data)

    The emitted span name is ``event_handler.status_update``.  When
    ``use_span_context`` is active on the calling thread (e.g. after
    ``extract_trace_context_from_headers``), this span is automatically
    parented under the publisher's trace.

    Notes
    -----
    * Duration is recorded in the ``finally`` block — it captures the full
      wall time including exceptions.
    * The error counter is incremented **before** re-raising so the metric
      is always recorded even if the caller swallows the exception.
    * OTel tracing degrades gracefully: when ``obskit[otlp]`` is not
      installed the span is a no-op and only the Prometheus metrics are
      emitted.
    """
    _name = name
    _span_name = f"event_handler.{name}"

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            from obskit.tracing.tracer import async_trace_span  # noqa: PLC0415

            start = time.perf_counter()
            try:
                async with async_trace_span(_span_name):
                    return await fn(*args, **kwargs)
            except Exception:
                EVENT_HANDLER_ERRORS_TOTAL.labels(name=_name).inc()
                raise
            finally:
                elapsed = time.perf_counter() - start
                EVENT_HANDLER_DURATION_SECONDS.labels(name=_name).observe(elapsed)

        return wrapper

    return decorator