Installation

pip install timberlogs-client

Basic Usage

from timberlogs import create_timberlogs

timber = create_timberlogs(
    source="my-app",
    environment="production",
    api_key="tb_live_xxxxxxxxxxxxx",
)

timber.info("Hello, Timberlogs!")

Exports

The SDK exports the following:
from timberlogs import (
    create_timberlogs,    # Factory function
    TimberlogsClient,     # Client class
    Flow,                 # Flow class for tracking
    LogEntry,             # Log entry dataclass
    LogOptions,           # Options for logging methods
    TimberlogsConfig,     # Configuration dataclass
    FormatName,           # Literal type for ingest formats
    IngestRawOptions,     # Options for ingest_raw()
)

Logging Methods

debug(message, data?, options?)

Log debug-level messages for detailed diagnostic information.
# Simple debug
timber.debug("Cache lookup")

# With data
timber.debug("Cache hit", {"key": "user:123", "ttl": 3600})

# With tags
from timberlogs import LogOptions
timber.debug("Cache miss", {"key": "user:456"}, LogOptions(tags=["cache", "performance"]))

info(message, data?, options?)

Log informational messages about normal operations.
timber.info("User signed in", {"user_id": "user_123", "method": "oauth"})

timber.info("Order placed", {"order_id": "ord_xyz", "total": 99.99})

warn(message, data?, options?)

Log warning conditions that might indicate a problem.
timber.warn("Rate limit approaching", {"current": 950, "limit": 1000, "endpoint": "/api/users"})

timber.warn("Deprecated API called", {"endpoint": "/v1/legacy", "recommendation": "Use /v2/modern instead"})

error(message, error_or_data?, options?)

Log errors. Accepts either an Exception object or a data dictionary.
# With Exception object (extracts name, message, traceback)
try:
    risky_operation()
except Exception as e:
    timber.error("Operation failed", e)

# With data object
timber.error("Validation failed", {"field": "email", "value": "invalid", "reason": "Invalid email format"})

# With tags (e must be bound inside the except block)
from timberlogs import LogOptions
try:
    risky_operation()
except Exception as e:
    timber.error("Payment failed", e, LogOptions(tags=["payments", "critical"]))
When you pass an Exception object, the SDK automatically extracts:
  • error_name — The exception’s class name (e.g. ValueError)
  • error_stack — The full traceback
  • The exception message, which is included in the data

log(entry)

Low-level logging method with full control over the log entry.
from timberlogs import LogEntry

timber.log(LogEntry(
    level="info",
    message="Custom log entry",
    data={"custom": "data", "nested": {"value": 123}},
    user_id="user_123",
    session_id="sess_abc",
    request_id="req_xyz",
    tags=["important", "billing"],
))

Error Handling

When you pass an Exception object to error(), the SDK extracts structured fields:
try:
    risky_operation()
except Exception as e:
    timber.error("Operation failed", e)
    # Creates a log entry with:
    # - error_name: "ValueError" (type(e).__name__)
    # - error_stack: full traceback string
    # - message includes the exception message
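The extraction described above can be sketched in a few lines. This is an illustrative reimplementation, not the SDK's actual code:

```python
import traceback

def extract_error_fields(e: BaseException) -> dict:
    # Mirrors the documented behavior: class name plus full traceback string
    return {
        "error_name": type(e).__name__,
        "error_stack": "".join(
            traceback.format_exception(type(e), e, e.__traceback__)
        ),
    }
```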

Data Object

The data parameter accepts any JSON-serializable dictionary:
timber.info("Request completed", {
    # Primitives
    "status": 200,
    "duration": 123.45,
    "cached": True,

    # Nested objects
    "user": {
        "id": "user_123",
        "role": "admin",
    },

    # Arrays
    "permissions": ["read", "write", "delete"],
})

Tags

Tags help categorize and filter logs. Add them via the options parameter:
from timberlogs import LogOptions

timber.info("Payment processed", {"amount": 99.99}, LogOptions(tags=["payments", "success"]))

timber.error("Payment failed", e, LogOptions(tags=["payments", "critical", "pagerduty"]))  # e from a surrounding except block

Flow Tracking

Flows group related logs across a multi-step process with automatic flow IDs and step indexing. See Flows for a conceptual overview.

Creating a Flow

Synchronous — generates a local flow ID:
flow = timber.flow("checkout")

flow.info("Started checkout")
flow.info("Validated cart", {"items": 3})
flow.info("Payment processed", {"amount": 99.99})
flow.info("Order confirmed", {"order_id": "ord_123"})
Asynchronous — creates the flow on the server for a server-generated ID:
flow = await timber.flow_async("checkout")

flow.info("Started checkout")
flow.info("Payment processed", {"amount": 99.99})
Both produce logs with:
  • flow_id: "checkout-a1b2c3d4" (auto-generated)
  • step_index: 0, 1, 2, 3 (auto-incrementing)

Flow Properties

flow = timber.flow("user-onboarding")

print(flow.id)    # "user-onboarding-a1b2c3d4"
print(flow.name)  # "user-onboarding"

Flow Logging Methods

Flows have the same logging methods as the main client:
flow = timber.flow("data-pipeline")

flow.debug("Debug info", {"stage": "init"})
flow.info("Processing started")
flow.warn("Slow operation", {"duration": 5000})
flow.error("Processing failed", Exception("Timeout"))

Flow Chaining

Flow methods return the flow for chaining:
flow = timber.flow("checkout")
flow.info("Cart validated").info("Payment authorized").info("Order created").info("Confirmation sent")

Real-World Examples

Data Pipeline

def process_pipeline(pipeline_id: str, records: list):
    flow = timber.flow(f"pipeline-{pipeline_id}")

    flow.info("Pipeline started", {"record_count": len(records)})

    for i, record in enumerate(records):
        flow.debug("Processing record", {"index": i, "id": record["id"]})
        transform(record)

    flow.info("Pipeline completed", {"processed": len(records)})

Flow ID Generation

Flow IDs are generated using the pattern: {name}-{random8chars}
checkout-a1b2c3d4
user-onboarding-x7y8z9ab
api-request-mn0p1q2r
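The {name}-{random8chars} pattern can be sketched as below. This is a hypothetical illustration; the SDK's actual generator may use a different random source:

```python
import secrets

def make_flow_id(name: str) -> str:
    # token_hex(4) yields 8 hex characters, matching the pattern above
    return f"{name}-{secrets.token_hex(4)}"
```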

Level Filtering with Flows

When using min_level configuration, filtered logs don’t increment the step index:
timber = create_timberlogs(
    # ...
    min_level="info",
)

flow = timber.flow("example")
flow.debug("Not sent")      # Filtered, step_index not incremented
flow.info("First log")      # step_index: 0
flow.debug("Not sent")      # Filtered, step_index not incremented
flow.info("Second log")     # step_index: 1
This ensures your step indices remain sequential without gaps.
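The filtering rule amounts to a simple rank comparison. A minimal sketch, assuming the four documented levels in ascending severity:

```python
LEVELS = ("debug", "info", "warn", "error")

def should_send(level: str, min_level: str) -> bool:
    # A log is sent only when its level ranks at or above min_level
    return LEVELS.index(level) >= LEVELS.index(min_level)
```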

Raw Format Ingestion

Send pre-formatted log data directly to the ingestion endpoint, bypassing the structured log pipeline. Useful for forwarding logs from external systems (syslog daemons, CSV exports, JSONL streams).

ingest_raw(body, format, options?)

Synchronous version:
timber.ingest_raw(
    "<165>1 2024-01-15T10:30:00.000Z myhost api 1234 - - Connection refused",
    "syslog",
    IngestRawOptions(source="syslog-relay", environment="production"),
)

ingest_raw_async(body, format, options?) (async)

Asynchronous version:
await timber.ingest_raw_async(
    "<165>1 2024-01-15T10:30:00.000Z myhost api 1234 - - Connection refused",
    "syslog",
    IngestRawOptions(source="syslog-relay", environment="production"),
)
Parameters:
  • body (str): The raw log data
  • format (FormatName): One of json, jsonl, syslog, text, csv, obl
  • options (IngestRawOptions, optional): Defaults for source, environment, level, dataset
The SDK sets the correct Content-Type header automatically and retries with exponential backoff on failure.
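Exponential backoff doubles the wait between attempts up to a cap. An illustrative model of the delay schedule, using the retry defaults from the Configuration section (not the SDK's actual retry code):

```python
def backoff_delays(max_retries=3, initial_delay_ms=1000, max_delay_ms=30000):
    # Yields the wait (in ms) before each retry, doubling and capping at max_delay_ms
    delay = initial_delay_ms
    for _ in range(max_retries):
        yield delay
        delay = min(delay * 2, max_delay_ms)
```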

Examples

CSV:
timber.ingest_raw(
    "level,message,source\nerror,Connection refused,api\ninfo,Request completed,api",
    "csv",
    IngestRawOptions(environment="production"),
)
JSONL:
timber.ingest_raw(
    '{"level":"info","message":"line 1","source":"api","environment":"production"}\n{"level":"error","message":"line 2","source":"api","environment":"production"}',
    "jsonl",
)
Plain text:
timber.ingest_raw(
    "2024-01-15 10:30:00 ERROR Connection refused\n2024-01-15 10:30:01 INFO Retrying...",
    "text",
    IngestRawOptions(source="nginx", environment="production"),
)
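A "json" format body is a JSON array of log objects. A minimal sketch of building one; the commented ingest_raw call assumes the same signature shown above:

```python
import json

# Build a body for the "json" format: an array of log objects
logs = [
    {"level": "info", "message": "started", "source": "api", "environment": "production"},
    {"level": "error", "message": "failed", "source": "api", "environment": "production"},
]
body = json.dumps(logs)
# timber.ingest_raw(body, "json")
```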

Supported Formats

Each format name maps to a Content-Type:
  • json (application/json): JSON array or { logs: [...] }
  • jsonl (application/x-ndjson): One JSON object per line
  • syslog (application/x-syslog): RFC 5424 / RFC 3164
  • text (text/plain): One log per line
  • csv (text/csv): Header row + data rows
  • obl (application/x-obl): Open Board Logging
See Log Ingestion for full format details.

Client Methods

set_user_id(user_id)

Set the default user ID for subsequent logs.
timber.set_user_id("user_123")

set_session_id(session_id)

Set the default session ID for subsequent logs.
timber.set_session_id("sess_abc")

flush()

Immediately send all queued logs.
timber.flush()

flush_async() (async)

Asynchronously send all queued logs.
await timber.flush_async()

disconnect()

Flush logs and stop the auto-flush timer.
timber.disconnect()

disconnect_async() (async)

Asynchronously flush and stop the client.
await timber.disconnect_async()

Method Chaining

All methods return self for chaining:
timber.set_user_id("user_123").set_session_id("sess_abc").info("User action")

Log Entry Dataclass

@dataclass
class LogEntry:
    level: Literal["debug", "info", "warn", "error"]
    message: str
    data: Optional[Dict[str, Any]] = None
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    request_id: Optional[str] = None
    error_name: Optional[str] = None
    error_stack: Optional[str] = None
    tags: Optional[List[str]] = None
    flow_id: Optional[str] = None
    step_index: Optional[int] = None
    dataset: Optional[str] = None
    ip_address: Optional[str] = None
    country: Optional[str] = None

Options Object

All logging methods accept an optional options object:
from timberlogs import LogOptions

timber.info("Message", {"data": "here"}, LogOptions(
    tags=["important", "billing"],
))

Context Manager

Use the client as a context manager for automatic cleanup:
with create_timberlogs(
    source="my-app",
    environment="production",
    api_key="tb_live_xxx",
) as timber:
    timber.info("Auto-flushed on exit")

Async Context Manager

async with create_timberlogs(
    source="my-app",
    environment="production",
    api_key="tb_live_xxx",
) as timber:
    timber.info("Auto-flushed on exit")

Configuration

timber = create_timberlogs(
    # Required
    source="my-app",
    environment="production",  # development, staging, production
    api_key="tb_live_xxx",

    # Optional
    version="1.2.3",
    dataset="analytics",         # Default dataset for log routing
    user_id="user_123",
    session_id="sess_abc",
    batch_size=10,           # Logs to batch before sending
    flush_interval=5.0,      # Auto-flush interval in seconds
    min_level="debug",       # Minimum level to send
    on_error=lambda e: print(e),  # Error callback
    max_retries=3,
    initial_delay_ms=1000,
    max_delay_ms=30000,
)
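To show how batch_size interacts with flush(), here is a minimal model of the queueing behavior described above (an illustrative sketch, not the SDK's implementation):

```python
class BatchQueue:
    """Queues entries and flushes whenever batch_size is reached."""

    def __init__(self, batch_size=10):
        self.batch_size = batch_size
        self.queue = []
        self.sent_batches = []

    def add(self, entry):
        self.queue.append(entry)
        if len(self.queue) >= self.batch_size:
            self.flush()

    def flush(self):
        # Sends whatever is queued, even a partial batch
        if self.queue:
            self.sent_batches.append(list(self.queue))
            self.queue.clear()
```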

Requirements

  • Python 3.8+
  • httpx >= 0.24.0
Ready to start logging? Sign up free — send your first log in under 5 minutes.