obskit-db¶
Database observability — query tracking, slow-query detection, and OTel auto-instrumentation for SQLAlchemy, psycopg2, and psycopg3.
Installation¶
```bash
pip install "obskit[sqlalchemy]"  # SQLAlchemy OTel auto-instrumentation
pip install "obskit[psycopg2]"    # psycopg2 OTel auto-instrumentation (sync)
pip install "obskit[psycopg3]"    # psycopg3 OTel auto-instrumentation (sync + async)
pip install "obskit[db]"          # all three DB drivers
```
Overview¶
obskit.integrations.db gives every database query RED metrics, distributed traces, SLO tracking, and slow-query logging.
| Component | What it does |
|---|---|
| `instrument_sqlalchemy` | Zero-code: attaches event listeners to a SQLAlchemy engine |
| `instrument_psycopg2` | Auto-instruments psycopg2 connections |
| `instrument_psycopg3` | Auto-instruments psycopg3 connections (sync + async) |
| `DatabaseTracker` | Fine-grained per-operation tracking with tenant context |
| `track_query` | Convenience function — no class needed |
Quick Start¶
SQLAlchemy:

```python
from sqlalchemy import create_engine, text

from obskit.integrations.db.sqlalchemy import instrument_sqlalchemy

engine = create_engine("postgresql://user:pass@localhost/mydb")
instrument_sqlalchemy(engine, database_name="postgres")

# All queries are now automatically tracked — no other changes needed
with engine.connect() as conn:
    conn.execute(text("SELECT * FROM orders"))
```

psycopg2:

```python
from obskit.integrations.db.psycopg2 import instrument_psycopg2

instrument_psycopg2()
# All psycopg2 connections now emit OTel spans automatically
```

psycopg3:

```python
from obskit.integrations.db.psycopg3 import instrument_psycopg3

instrument_psycopg3()
# All psycopg3 connections now emit OTel spans automatically
```

`DatabaseTracker`:

```python
from obskit.integrations.db.tracker import DatabaseTracker

db = DatabaseTracker("postgres", default_slow_threshold_ms=500.0)

def get_user(user_id: str, tenant_id: str):
    with db.track_query("get_user", tenant_id=tenant_id):
        return session.query(User).filter_by(id=user_id).first()
```

`track_query` convenience function:

```python
from obskit.integrations.db.tracker import track_query

with track_query("create_order", database_name="postgres",
                 tenant_id="acme", slow_query_threshold_ms=200.0):
    session.add(order)
    session.commit()
```
instrument_sqlalchemy¶
Attaches SQLAlchemy event listeners to track every query automatically.
```python
from obskit.integrations.db.sqlalchemy import instrument_sqlalchemy

instrument_sqlalchemy(engine, database_name="database")
```
What it tracks:

- Query execution time
- Slow queries — logs a warning when duration exceeds 1 second
- Query errors — logs every `handle_error` event
- Connection pool saturation on each new connection
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `engine` | `Engine` | required | SQLAlchemy engine |
| `database_name` | `str` | `"database"` | Label used in all metrics and logs |
```python
# Multiple databases
instrument_sqlalchemy(read_engine, database_name="postgres_read")
instrument_sqlalchemy(write_engine, database_name="postgres_write")
```
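Instrumentation of this kind is typically built on SQLAlchemy's cursor-execute events. The following standalone sketch illustrates the mechanism (it is not obskit's actual implementation) using an in-memory SQLite engine:

```python
import time

from sqlalchemy import create_engine, event, text

engine = create_engine("sqlite:///:memory:")
durations = []

@event.listens_for(engine, "before_cursor_execute")
def _before(conn, cursor, statement, parameters, context, executemany):
    # Stash the start time on the connection's info dict
    conn.info.setdefault("query_start", []).append(time.perf_counter())

@event.listens_for(engine, "after_cursor_execute")
def _after(conn, cursor, statement, parameters, context, executemany):
    durations.append(time.perf_counter() - conn.info["query_start"].pop())

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

print(f"timed {len(durations)} query/queries")
```

A real instrumentation layer would feed these durations into metrics and slow-query logging instead of a list.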
instrument_psycopg2¶
Auto-instruments all psycopg2 connections to emit OTel spans.
```python
from obskit.integrations.db.psycopg2 import instrument_psycopg2

instrument_psycopg2()
```

Requires `pip install "obskit[psycopg2]"`.
instrument_psycopg3¶
Auto-instruments all psycopg3 connections to emit OTel spans. Supports both synchronous and async usage.
```python
from obskit.integrations.db.psycopg3 import instrument_psycopg3

instrument_psycopg3()
```

Requires `pip install "obskit[psycopg3]"`.
DatabaseTracker¶
Fine-grained tracking with tenant context, SLO integration, and per-operation thresholds.
```python
from obskit.integrations.db.tracker import DatabaseTracker

db = DatabaseTracker(
    database_name="postgres",
    default_slo_name="db_latency",        # optional: wire to SLO tracker
    default_slow_threshold_ms=1000.0,     # warn on queries > 1s
)
```
track_query(operation, ...) — context manager¶
```python
with db.track_query(
    operation="get_orders",
    query="SELECT * FROM orders WHERE tenant_id = :tid",  # for logs
    slow_query_threshold_ms=200.0,  # override per operation
    tenant_id="acme",
    slo_name="order_read_latency",
    attributes={"order_status": "pending"},
    enable_tracing=True,
    enable_slo=True,
):
    return session.execute(stmt).fetchall()
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `operation` | `str` | required | Operation name for metrics/traces |
| `query` | `str \| None` | `None` | SQL text (truncated to 200 chars in logs) |
| `slow_query_threshold_ms` | `float \| None` | instance default | Override slow threshold |
| `tenant_id` | `str \| None` | `None` | Multi-tenant context |
| `slo_name` | `str \| None` | instance default | SLO to record against |
| `attributes` | `dict \| None` | `None` | Extra span attributes |
| `enable_tracing` | `bool` | `True` | Create an OTLP span |
| `enable_slo` | `bool` | `True` | Record SLO measurement |
record_query(operation, duration_seconds, ...) — manual¶
For when you can't use a context manager (e.g., wrapping an ORM callback):
```python
import time

start = time.perf_counter()
try:
    result = execute_raw(sql)
    db.record_query("raw_query", time.perf_counter() - start, success=True)
except Exception as e:
    db.record_query("raw_query", time.perf_counter() - start,
                    success=False, error_type=type(e).__name__)
    raise
```
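If many call sites need this manual pattern, it can be factored into a decorator. This sketch runs without obskit by using a stand-in `record` function whose arguments mirror the documented `record_query` call (`operation`, `duration_seconds`, `success`, `error_type`):

```python
import time
from functools import wraps

recorded = []

def record(operation, duration_seconds, success, error_type=None):
    # Stand-in for db.record_query, so the sketch is self-contained
    recorded.append((operation, duration_seconds, success, error_type))

def tracked(operation):
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = fn(*args, **kwargs)
            except Exception as e:
                record(operation, time.perf_counter() - start,
                       success=False, error_type=type(e).__name__)
                raise
            record(operation, time.perf_counter() - start, success=True)
            return result
        return wrapper
    return decorator

@tracked("raw_query")
def run():
    return 42

run()
print(recorded[0][0], recorded[0][2])  # raw_query True
```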
track_query — Convenience Function¶
Stateless wrapper — creates a temporary DatabaseTracker internally.
```python
from obskit.integrations.db.tracker import track_query

with track_query(
    operation="create_order",
    database_name="postgres",
    query="INSERT INTO orders ...",
    slow_query_threshold_ms=500.0,
    tenant_id="acme",
):
    session.add(order)
    session.commit()
```
What Gets Emitted¶
Structured Logs¶
Every query emits a structured log entry on completion:
```json
{
  "event": "db_query_completed",
  "database": "postgres",
  "operation": "get_orders",
  "duration_ms": 12.4,
  "success": true,
  "tenant_id": "acme"
}
```
Slow queries emit an additional warning:
```json
{
  "event": "slow_sql_query",
  "database": "postgres",
  "duration_ms": 1234.5,
  "query": "SELECT * FROM orders WHERE ..."
}
```
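Because these entries are JSON, slow queries can be pulled out of a log stream with the standard library alone. A small sketch (the log lines here are illustrative samples, not real output):

```python
import json

log_lines = [
    '{"event": "db_query_completed", "database": "postgres", "duration_ms": 12.4}',
    '{"event": "slow_sql_query", "database": "postgres", "duration_ms": 1234.5}',
]

entries = [json.loads(line) for line in log_lines]
slow = [e for e in entries if e["event"] == "slow_sql_query"]
print(len(slow), slow[0]["duration_ms"])  # 1 1234.5
```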
Prometheus Metrics¶
DatabaseTracker records RED metrics (shared across all instrumented databases):
| Metric | Source |
|---|---|
| `request_duration_seconds` | `track_query` duration |
| `errors_total` | Failed queries |
Distributed Traces¶
Each track_query call creates an OTLP span with:
- `db.system` = database name
- `db.operation` = operation name
- `db.statement` = query text (if provided)
- `tenant.id` = tenant ID (if provided)
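Spans only leave the process if an exporter is configured. A minimal wiring sketch using the standard OpenTelemetry SDK packages (`opentelemetry-sdk`, `opentelemetry-exporter-otlp`); the collector endpoint is an assumption, so adjust it for your environment:

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Send spans in batches to a local OTLP collector
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)
trace.set_tracer_provider(provider)
```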
Pattern: Reusable Tracker per Repository¶
```python
# src/repositories/base.py
from obskit.integrations.db.tracker import DatabaseTracker

_db = DatabaseTracker("postgres", default_slow_threshold_ms=500.0)

class OrderRepository:
    def get(self, order_id: str, tenant_id: str):
        with _db.track_query("get_order", tenant_id=tenant_id):
            return self.session.get(Order, order_id)

    def create(self, order: Order, tenant_id: str):
        with _db.track_query("create_order", tenant_id=tenant_id):
            self.session.add(order)
            self.session.commit()
            return order
```
See Also¶
- SLO Tracking — wire `slo_name` to an SLO target