
Distributed Logging System

Centralized logging infrastructure enables observability across distributed systems. This article covers log data models, collection architectures, storage engines, indexing strategies, and scaling approaches—with design trade-offs and real-world implementations from Netflix (5 PB/day), Uber, and others.

[Architecture diagram: Service A/B/C → per-host agents (collection layer) → message queue (Kafka/Kinesis, buffer layer) → stream processor (parse, enrich, route) → hot (SSD), warm (HDD), and cold (object storage) tiers → index service → search API and visualization (query layer).]

End-to-end distributed logging architecture: collection agents ship logs through a message queue buffer, stream processors parse and route data to tiered storage, and an index service powers search queries.

Distributed logging solves the observability problem: reconstructing system behavior from scattered, ephemeral log streams across hundreds of services. The core tension is between query flexibility (full-text search, ad-hoc filtering) and cost efficiency (storage, indexing overhead).

Key design decisions:

  • Data model: Structured logging (JSON/Protobuf) enables schema evolution and efficient compression, but requires upfront discipline
  • Collection topology: Sidecar agents provide isolation but consume 10-20x more resources than DaemonSet agents; choose based on multi-tenancy needs
  • Index strategy: Full inverted indexes (Elasticsearch) enable rich queries but triple storage costs; label-only indexes (Loki) reduce costs 10x but require brute-force content scanning
  • Storage tiering: Hot/warm/cold tiers balance query latency against cost—Netflix keeps 3 days hot, 30 days warm, years cold

The mental model: logs are write-heavy, read-sparse time-series data with unpredictable query patterns. Systems optimized for write throughput (LSM trees, columnar storage) outperform row-oriented databases by 10-100x, but indexing strategy determines whether queries take milliseconds or minutes.

Unstructured logs (free-form text) are easy to emit but expensive to query. Every search requires regex parsing across terabytes of data.

Structured logging (JSON, Protocol Buffers) shifts parsing cost from query time to write time:

// Structured log entry
{
  "timestamp": "2024-01-15T10:23:45.123Z",
  "level": "ERROR",
  "service": "payment-service",
  "trace_id": "abc123",
  "message": "Payment failed",
  "error_code": "INSUFFICIENT_FUNDS",
  "amount_cents": 5000,
  "user_id": "usr_789"
}
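
To make "parsing at write time" concrete, here is a minimal Python sketch that emits records in this shape with the standard logging module. The JsonFormatter class and the fields-via-extra convention are illustrative choices, not a particular library's API; the logger name doubles as the service field.

import json
import logging
import sys
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""
    def format(self, record):
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "service": record.name,
            "message": record.getMessage(),
        }
        entry.update(getattr(record, "fields", {}))   # merge fields passed via extra=
        return json.dumps(entry)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("payment-service")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.error("Payment failed", extra={"fields": {
    "trace_id": "abc123", "error_code": "INSUFFICIENT_FUNDS",
    "amount_cents": 5000, "user_id": "usr_789",
}})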

Trade-offs:

| Aspect | Unstructured | Structured (JSON) | Structured (Protobuf) |
|---|---|---|---|
| Write simplicity | printf-style | Requires log library | Requires codegen |
| Query flexibility | Regex only | Field extraction | Field extraction |
| Schema evolution | N/A | Implicit (any field) | Explicit (field numbers) |
| Compression ratio | 2-5x | 5-10x | 10-20x |
| Cross-language | Universal | Universal | Requires runtime |

Design choice: JSON is the de-facto standard for application logs because it balances flexibility with tooling support. Protocol Buffers excel for high-volume internal telemetry where schema discipline is enforced.

JSON’s implicit schema makes additions trivial but creates drift. Protocol Buffers enforce evolution rules:

  • Field numbers: Never reuse deleted field numbers (reserve them)
  • Adding fields: Safe—Proto3 defaults missing fields to zero values
  • Removing fields: Mark as reserved, never reuse the number
  • Type changes: Incompatible—requires new field

The buf toolchain automates breaking change detection for Protobuf schemas.

Real-world example: Datadog recommends log entries under 25KB for optimal performance, with a hard limit of 1MB. Large logs (stack traces, request bodies) should be truncated or sampled.
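
A hedged sketch of enforcing such a size budget at the emit point: the 25 KB entry limit mirrors the guidance above, while the 8 KB per-field budget, the 5% DEBUG sample rate, and the bound_log_entry helper are assumed purely for illustration.

import json
import random
from typing import Optional

MAX_FIELD_BYTES = 8 * 1024      # per-field truncation budget (assumed value)
MAX_ENTRY_BYTES = 25 * 1024     # soft limit mirroring the guidance above
DEBUG_SAMPLE_RATE = 0.05        # keep 5% of oversized DEBUG entries (assumed value)

def bound_log_entry(entry: dict) -> Optional[dict]:
    """Truncate oversized string fields; sample away oversized low-value entries."""
    for key, value in entry.items():
        if isinstance(value, str) and len(value) > MAX_FIELD_BYTES:
            entry[key] = value[:MAX_FIELD_BYTES] + "...[truncated]"
    if len(json.dumps(entry).encode()) > MAX_ENTRY_BYTES:
        if entry.get("level") == "DEBUG" and random.random() > DEBUG_SAMPLE_RATE:
            return None  # drop it: oversized and low value
    return entry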

Two deployment patterns dominate log collection in Kubernetes environments:

DaemonSet Pattern (one agent per node):

# Fluentd DaemonSet configuration (abridged)
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
spec:
  template:
    spec:
      containers:
      - name: fluentd
        resources:
          requests:
            memory: "200Mi"
            cpu: "100m"

  • Pros: Resource-efficient (one agent serves all pods), simpler management
  • Cons: Single point of failure per node, limited tenant isolation
  • Best for: Clusters with <500 log configurations, homogeneous workloads

Sidecar Pattern (one agent per pod):

  • Pros: Tenant isolation, per-application configuration, failure isolation
  • Cons: 10-20x higher resource overhead, complex lifecycle management
  • Best for: Multi-tenant platforms, >500 configurations, PaaS environments

Decision factor: WePay’s engineering team found the threshold at approximately 500 collection configurations—below this, DaemonSets suffice; above, sidecars provide necessary isolation.

Push vs Pull:

| Model | Latency | Failure Mode | Backpressure |
|---|---|---|---|
| Push | Lower (immediate) | Agent buffers on failure | Agent-side |
| Pull | Higher (polling interval) | Central buffer | Server-side |

Batching is critical for efficiency. Fluent Bit defaults to 2KB chunks, flushing every 1 second. Larger batches reduce network overhead but increase latency and memory pressure.

Backpressure handling:

  1. Bounded memory buffers: Drop the oldest logs when full (acceptable for most logs); see the sketch after this list
  2. Disk spillover: Write to local disk when memory exhausted (Fluentd’s buffer plugin)
  3. Sampling: Reduce rate dynamically under pressure (Vector’s adaptive sampling)
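
A minimal sketch of strategy 1: a bounded buffer that drops the oldest entries and counts what it loses. Real agents such as Fluentd and Vector implement far more elaborate versions (disk spillover, adaptive sampling), so treat this as illustrative only.

from collections import deque

class BoundedLogBuffer:
    """Fixed-capacity buffer that drops the oldest entries under pressure."""
    def __init__(self, max_entries: int = 10_000):
        self.entries = deque(maxlen=max_entries)  # deque evicts the oldest item on overflow
        self.dropped = 0

    def append(self, entry: str) -> None:
        if len(self.entries) == self.entries.maxlen:
            self.dropped += 1  # count losses so they can be exported as a metric
        self.entries.append(entry)

    def drain(self, batch_size: int = 500) -> list[str]:
        """Hand a batch to the shipper; called by the periodic flush loop."""
        batch = []
        while self.entries and len(batch) < batch_size:
            batch.append(self.entries.popleft())
        return batch
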
Agent comparison:

| Agent | Language | Memory | Throughput | Best For |
|---|---|---|---|---|
| Fluent Bit | C | ~20MB | High | Edge, IoT, resource-constrained |
| Fluentd | Ruby | ~100MB | Medium | Plugin ecosystem, complex routing |
| Vector | Rust | ~50MB | Very high | Performance-critical, modern stacks |
| Filebeat | Go | ~30MB | Medium | Elastic ecosystem |
| Logstash | Java | ~500MB | Medium | Complex transformations |

Direct shipping from agents to storage creates coupling and cascading failures. A message queue buffer (Kafka, Kinesis) provides:

  1. Absorption: Handle ingestion spikes without dropping logs
  2. Decoupling: Storage maintenance doesn’t block collection
  3. Replay: Reprocess historical logs for new pipelines
  4. Fan-out: Multiple consumers from single stream

Trade-off: Adds operational complexity and ~10-50ms latency.

Kafka’s partitioned log model aligns naturally with log data:

Topic: application-logs
├── Partition 0: [service-a logs, ordered by offset]
├── Partition 1: [service-b logs, ordered by offset]
└── Partition 2: [service-c logs, ordered by offset]

Partition key choices:

| Key | Pros | Cons |
|---|---|---|
| Service name | Co-located logs, good compression | Hot partitions for high-volume services |
| Trace ID | Correlated logs together | Uneven distribution |
| Round-robin | Even distribution | No ordering guarantees |
| Timestamp bucket | Time locality | Clock skew issues |
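
For example, keying on service name with the kafka-python client could look like the sketch below; the topic name, broker address, and batching settings are placeholders. The same key always hashes to the same partition, which preserves per-service ordering at the risk of hot partitions.

import json
from kafka import KafkaProducer  # kafka-python client

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",               # placeholder broker address
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    linger_ms=100,        # small batching window to amortize network round trips
    compression_type="lz4",
)

entry = {"level": "ERROR", "service": "payment-service", "message": "Payment failed"}
# All logs for one service hash to the same partition.
producer.send("application-logs", key=entry["service"], value=entry)
producer.flush()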

Backpressure in Kafka consumers (a consumer sketch follows this list):

  1. Disable auto-commit; acknowledge only after successful processing
  2. Use bounded thread pools to cap concurrent processing
  3. Dynamically pause partitions when downstream systems stress
  4. Monitor consumer lag as health signal
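
A sketch of points 1 and 3 with the kafka-python client; downstream_is_overloaded and write_batch are hypothetical stand-ins for the pipeline's real health check and sink (the sink itself would use a bounded pool, per point 2).

from kafka import KafkaConsumer  # kafka-python client

def downstream_is_overloaded() -> bool:
    return False          # placeholder health check, e.g. storage bulk-queue depth

def write_batch(records) -> None:
    pass                  # placeholder sink: bulk-write the batch downstream

consumer = KafkaConsumer(
    "application-logs",
    bootstrap_servers="kafka:9092",   # placeholder broker address
    group_id="log-indexer",
    enable_auto_commit=False,         # point 1: commit only after a successful write
    max_poll_records=500,
)

while True:
    if downstream_is_overloaded():
        consumer.pause(*consumer.assignment())    # point 3: stop fetching, keep membership
    else:
        consumer.resume(*consumer.assignment())
    polled = consumer.poll(timeout_ms=1000)
    for partition, batch in polled.items():
        write_batch(batch)
    if polled:
        consumer.commit()                         # acknowledge only what was processed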

Stream processors transform raw logs before storage:

  • Parsing: Extract structured fields from unstructured text
  • Enrichment: Add metadata (geo-IP, user attributes, service ownership)
  • Routing: Direct logs to different storage tiers by level/service
  • Sampling: Reduce volume for high-cardinality, low-value logs

Design choice: Lightweight processing (regex, JSON parsing) can happen in agents. Heavy enrichment (database lookups, ML classification) belongs in dedicated stream processors.
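
A compressed sketch of one processor stage following that split; the SERVICE_OWNERS lookup and the hot/warm routing rule are assumptions for illustration.

import json

SERVICE_OWNERS = {"payment-service": "payments-team"}   # assumed enrichment lookup

def process(raw_line: str):
    """Parse one raw line, enrich it, and choose a destination tier."""
    try:
        entry = json.loads(raw_line)                       # parse structured logs directly
    except json.JSONDecodeError:
        entry = {"message": raw_line, "level": "INFO"}     # fall back for unstructured lines
    entry["owner"] = SERVICE_OWNERS.get(entry.get("service"), "unknown")   # enrichment
    # Routing: keep errors queryable on the hot tier, send the rest to cheaper storage.
    destination = "hot" if entry.get("level") in ("ERROR", "WARN") else "warm"
    return destination, entry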

Logs are write-heavy: a single service might emit 10,000 logs/second but queries happen once per incident. Two architectures dominate:

LSM Trees (Log-Structured Merge Trees):

Write path:
Log Entry → MemTable (memory) → Flush → SSTable (disk)
SSTables → (background compaction) → Merged SSTables

  • Writes: Sequential, batched, O(1) amortized
  • Reads: Check MemTable, then each SSTable level (bloom filters help)
  • Used in: Elasticsearch (Lucene segments), RocksDB, Cassandra (a toy sketch of the write/read path follows below)
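
To make that write/read path concrete, a toy Python sketch follows; it keeps a memtable and immutable sorted runs but omits the write-ahead log, bloom filters, and compaction that real engines rely on.

class ToyLSM:
    """Illustrative only: memtable plus immutable sorted runs, no WAL or compaction."""
    def __init__(self, memtable_limit: int = 4):
        self.memtable: dict[str, str] = {}
        self.sstables: list[list[tuple[str, str]]] = []   # newest run last
        self.memtable_limit = memtable_limit

    def put(self, key: str, value: str) -> None:
        self.memtable[key] = value                        # O(1) in-memory write
        if len(self.memtable) >= self.memtable_limit:
            self.sstables.append(sorted(self.memtable.items()))  # flush a sorted run
            self.memtable = {}

    def get(self, key: str):
        if key in self.memtable:                          # check the memtable first
            return self.memtable[key]
        for run in reversed(self.sstables):               # then runs, newest first
            for k, v in run:                              # real engines binary-search here
                if k == key:
                    return v
        return None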

Columnar Storage:

Row-oriented (one record's values stored together):
[t1, INFO, A] [t2, ERROR, B] [t3, INFO, C]

Columnar (one column's values stored together):
ts_col:    [t1, t2, t3]
level_col: [INFO, ERROR, INFO]
msg_col:   [A, B, C]

  • Compression: Same-type columns compress dramatically better (10-100x vs row)
  • Query efficiency: Read only needed columns; skip irrelevant data
  • Used in: ClickHouse, Druid, Parquet files

Netflix’s ClickHouse deployment ingests 5 petabytes daily (10.6M events/second average, 12.5M peak) using columnar storage with aggressive compression.

ClickHouse achieves 170x compression on nginx access logs through layered compression:

| Technique | How It Works | Best For |
|---|---|---|
| Dictionary encoding | Store unique values in a dictionary, reference by ID | Low-cardinality fields (level, service) |
| Delta encoding | Store differences between consecutive values | Timestamps, monotonic IDs |
| LZ4 | Fast block compression | General purpose, read-heavy |
| ZSTD | Higher compression, more CPU | Archive, I/O-bound queries |

Codec selection rule: ZSTD for large range scans where decompression is amortized; LZ4 when decompression latency dominates (point queries).
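
The first two techniques are simple enough to sketch directly; the values below are made up purely to show the transformations.

timestamps = [1705314225123, 1705314225125, 1705314225129]   # millisecond epoch values
levels = ["INFO", "ERROR", "INFO", "INFO"]

# Delta encoding: store the first value plus small differences.
deltas = [timestamps[0]] + [b - a for a, b in zip(timestamps, timestamps[1:])]
# -> [1705314225123, 2, 4]; the deltas fit in far fewer bits than raw timestamps

# Dictionary encoding: store each unique string once, then small integer IDs.
dictionary = {v: i for i, v in enumerate(dict.fromkeys(levels))}
encoded = [dictionary[v] for v in levels]
# -> dictionary {"INFO": 0, "ERROR": 1}, encoded [0, 1, 0, 0]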

Hot/warm/cold tiering balances query performance against storage cost:

| Tier | Storage | Indexing | Retention | Query Latency |
|---|---|---|---|---|
| Hot | NVMe SSD | Full | 1-7 days | <100ms |
| Warm | HDD/SSD | Partial | 7-90 days | 1-10s |
| Cold | Object storage (S3) | Metadata only | Years | 30s-minutes |

Elasticsearch ILM (Index Lifecycle Management) automates transitions:

// ILM policy example (abridged)
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_size": "50GB", "max_age": "1d" }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {}
        }
      }
    }
  }
}

Splunk SmartStore decouples compute from storage: indexers use local SSD as cache while persisting warm/cold data to S3-compatible object storage. This enables elastic scaling—compute scales independently from storage.

Elasticsearch/Lucene builds inverted indexes mapping terms to documents:

Term dictionary → postings lists:
"error"   → [doc1, doc3, doc7]
"payment" → [doc2, doc3]
"timeout" → [doc1, doc5, doc7]

Query execution: Look up term in dictionary (O(log n)), retrieve posting list, intersect/union lists for boolean queries.

Storage overhead: Inverted indexes typically 2-3x the original data size. With position information (for phrase queries), overhead reaches 3-4x.

Trade-off: Rich query capabilities (wildcards, fuzzy matching, phrase search) at significant storage and write cost.
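
A toy sketch of the structure and of the boolean intersection step; Lucene layers postings compression, skip lists, and segment merging on top of the same idea.

from collections import defaultdict

docs = {
    1: "connection timeout while calling payment service",
    2: "payment accepted",
    3: "payment error insufficient funds",
}

# Build: map each term to the set of documents containing it.
index: dict[str, set[int]] = defaultdict(set)
for doc_id, text in docs.items():
    for term in text.split():
        index[term].add(doc_id)

# Query "payment AND timeout": intersect the two postings lists.
hits = index["payment"] & index["timeout"]
print(sorted(hits))   # -> [1]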

Grafana Loki indexes only label metadata, not log content:

Labels: {service="payment", level="error", env="prod"}
Chunks: [compressed log lines matching these labels]

Query execution: Filter by labels (indexed), then brute-force scan chunk content.

LogQL query:

{service="payment"} |= "timeout" | json | latency_ms > 500

Trade-offs:

| Aspect | Inverted Index (Elasticsearch) | Label-Only (Loki) |
|---|---|---|
| Index size | 2-4x original | <5% original |
| Storage cost | High | Low |
| Full-text search | Fast | Scan required |
| High cardinality | Handles well | ⚠️ Label explosion |
| Query latency | Consistent | Varies with scan size |

Cardinality constraint: Loki performs poorly with high-cardinality labels (user IDs, request IDs). Keep label cardinality under 100,000 unique combinations.

Bloom filters provide probabilistic existence checks with minimal memory:

Query: "Does chunk X contain 'error'?"
Bloom filter: "Probably yes" or "Definitely no"

Characteristics:

  • False positives possible (check chunk, find nothing)
  • False negatives impossible (if filter says no, it’s no)
  • Memory: ~10 bits per element for 1% false positive rate

ClickHouse uses bloom filters as skip indexes—they tell the engine where NOT to look, reducing disk I/O without full indexing overhead.

Break-even analysis: For existence queries on moderate datasets, bloom filters outperform inverted indexes up to ~7,200 documents. Beyond that, inverted indexes’ O(1) lookup dominates.
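
A compact bloom filter sketch using double hashing; the sizing line encodes the standard formula m/n = -ln(p) / (ln 2)^2, which works out to roughly 9.6 bits per element at a 1% false positive rate, matching the figure above.

import hashlib
import math

class BloomFilter:
    def __init__(self, expected_items: int, false_positive_rate: float = 0.01):
        # m/n = -ln(p) / (ln 2)^2  ->  ~9.6 bits per element at p = 0.01
        self.size = math.ceil(-expected_items * math.log(false_positive_rate) / (math.log(2) ** 2))
        self.hash_count = max(1, round((self.size / expected_items) * math.log(2)))
        self.bits = bytearray((self.size + 7) // 8)

    def _positions(self, item: str):
        digest = hashlib.sha256(item.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big")
        return [(h1 + i * h2) % self.size for i in range(self.hash_count)]

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item: str) -> bool:
        # False -> definitely absent; True -> probably present (may be a false positive)
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))

chunk_terms = BloomFilter(expected_items=10_000)
chunk_terms.add("error")
print(chunk_terms.might_contain("error"), chunk_terms.might_contain("segfault"))  # True, (almost certainly) False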

Matching query types to index strategy and storage tier:

| Query Type | Latency SLA | Index Strategy | Storage Tier |
|---|---|---|---|
| Live tail | <1s | In-memory only | Hot |
| Incident investigation | <10s | Full index | Hot + Warm |
| Compliance audit | Minutes OK | Partial/None | Warm + Cold |
| Analytics/trending | Minutes OK | Aggregated | All tiers |

Distributed tracing enables log correlation:

Request flow:
API Gateway   →   Auth Service   →   Payment Service   →   Notification
trace_id=abc      trace_id=abc       trace_id=abc           trace_id=abc

All logs share trace_id, enabling reconstruction of the full request path.

Uber’s Jaeger records thousands of traces per second across hundreds of microservices, using trace IDs to correlate logs, metrics, and traces.

Common patterns:

-- Error rate by service (last hour)
SELECT service, count(*) as errors
FROM logs
WHERE level = 'ERROR' AND timestamp > now() - interval 1 hour
GROUP BY service
ORDER BY errors DESC

-- P99 latency by endpoint
SELECT endpoint, quantile(0.99)(latency_ms) as p99
FROM logs
WHERE timestamp > now() - interval 1 day
GROUP BY endpoint

Columnar storage (ClickHouse) excels here—only the referenced columns are read from disk.

Time-based partitioning (most common):

logs-2024-01-15
logs-2024-01-16
logs-2024-01-17
  • Natural fit for retention policies (drop old partitions)
  • Query locality for time-range queries
  • Risk: Recent partitions are hot; older partitions cold

Composite partitioning (time + source):

logs-payment-2024-01-15
logs-auth-2024-01-15
logs-payment-2024-01-16
  • Better load distribution
  • Service-specific retention policies
  • Complexity: More partitions to manage

Key insight: Each partition is independently ordered, but there’s no global ordering across partitions. This enables horizontal scaling but complicates cross-partition queries.

Standard replication for durability:

| Strategy | Consistency | Write Latency | Failure Tolerance |
|---|---|---|---|
| Sync 2 replicas | Strong | Higher | 1 node |
| Async replication | Eventual | Lower | Data loss window |
| Quorum (2 of 3) | Strong | Medium | 1 node |

Logs typically tolerate eventual consistency—losing a few log lines during failures is acceptable for most use cases.

When a single partition can’t handle write volume:

logs-payment (logical) →
logs-payment-shard-0 (physical, 33% of writes)
logs-payment-shard-1 (physical, 33% of writes)
logs-payment-shard-2 (physical, 33% of writes)

Shard key considerations:

  • Hash of trace ID: Even distribution, but scatter-gather queries
  • Round-robin: Maximum distribution, no locality
  • Consistent hashing: Smooth rebalancing when adding shards (see the sketch below)
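
A minimal consistent-hash ring for picking a shard; virtual nodes smooth the distribution, and adding a shard only remaps the keys that land on its virtual nodes. Shard names and counts here are arbitrary.

import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, shards: list[str], vnodes: int = 100):
        self.ring: list[tuple[int, str]] = []
        for shard in shards:
            for i in range(vnodes):   # virtual nodes smooth the key distribution
                self.ring.append((self._hash(f"{shard}#{i}"), shard))
        self.ring.sort()
        self._keys = [h for h, _ in self.ring]

    @staticmethod
    def _hash(value: str) -> int:
        return int.from_bytes(hashlib.md5(value.encode()).digest()[:8], "big")

    def shard_for(self, key: str) -> str:
        idx = bisect.bisect(self._keys, self._hash(key)) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["logs-payment-shard-0", "logs-payment-shard-1", "logs-payment-shard-2"])
print(ring.shard_for("trace-abc123"))   # stable assignment for a given trace ID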

ELK stack (Elasticsearch, Logstash, Kibana) architecture layers:

  1. Collection: Filebeat/Metricbeat ship logs
  2. Processing: Logstash ingests, parses, enriches
  3. Storage: Elasticsearch indexes into sharded indices
  4. Visualization: Kibana queries via REST API

Design characteristics:

  • Inverted indexes for full-text search
  • Shard sizing: 10-50GB per shard for optimal performance
  • Replica count: Typically 1 (2 copies total)

When to use: Need rich full-text search, complex queries, established ecosystem.

Grafana Loki architecture:

  1. Distributor: Receives logs, validates, forwards to ingesters
  2. Ingester: Batches logs into chunks, builds label index
  3. Querier: Executes LogQL queries, merges results
  4. Chunk Store: Object storage (S3/GCS) for compressed log chunks
  5. Index Store: Label index in BoltDB, Cassandra, or object storage

Design characteristics:

  • Label-only indexing (like Prometheus for logs)
  • Chunk compression: Snappy or LZ4
  • Cost: 10x cheaper than full-text indexing at scale

When to use: Already using Grafana, cost-sensitive, structured logs with low-cardinality labels.

Netflix case study (5 PB/day):

Three optimizations reduced query latency from 3s to 700ms:

  1. Generated lexers: Custom log fingerprinting achieved 8-10x faster parsing than regex
  2. Native protocol serialization: Bypassed HTTP overhead for bulk inserts
  3. Sharded tag maps: Distributed high-cardinality tag lookups across nodes

Design characteristics:

  • Columnar storage with MergeTree engine
  • Compression: 170x possible with proper codecs
  • Materialized views for pre-aggregation

When to use: Extreme scale (petabytes), analytical queries, cost efficiency over query flexibility.

Splunk architecture:

  1. Forwarders: Ship raw data
  2. Indexers: Parse, extract fields, build indexes
  3. Search heads: Execute SPL queries

SmartStore innovation: Separates compute from storage—indexers cache recent data on SSD while warm/cold data lives in S3. Enables elastic scaling without full reindexing.

When to use: Enterprise requirements, compliance, integrated security analytics.

Datadog CloudPrem architecture (self-hosted option):

  • Indexers write splits directly to object storage
  • Central metastore tracks splits for instant query availability
  • Horizontal scaling via load balancer
  • Observability Pipelines Worker for complex routing

Design characteristics:

  • Log entries optimally <25KB, max 1MB
  • Separation of indexing from storage
  • Multi-tenant isolation

The mistake: Using request IDs, user IDs, or timestamps as index labels.

Why it happens: Developers want to query by these fields.

The consequence: Index explosion—Loki becomes unusable; Elasticsearch shard counts explode.

The fix: High-cardinality values go in log content, not labels. Query them with full-text search or store in a separate trace store.

The mistake: Logging every request at DEBUG level in production.

Why it happens: “We might need it for debugging.”

The consequence: Storage costs spiral; query performance degrades.

The fix:

  • Sample high-volume, low-value logs
  • Use log levels appropriately (ERROR/WARN for alerts, INFO for business events, DEBUG for local dev)
  • Set per-service quotas

The mistake: No trace ID propagation across services.

Why it happens: Requires cross-team coordination.

The consequence: Incident investigation requires manual correlation across time windows.

The fix: Mandate trace ID headers (e.g., X-Request-ID) at API gateway; propagate through all services.
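
One way to wire this up in Python is a contextvar set by request-handling middleware plus a logging filter that stamps the ID onto every record. The header name follows the X-Request-ID convention above; handle_request is a hypothetical stand-in for whatever framework hook actually receives the headers.

import contextvars
import logging
import uuid

trace_id_var = contextvars.ContextVar("trace_id", default="-")

class TraceIdFilter(logging.Filter):
    """Attach the current trace ID to every log record before formatting."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = trace_id_var.get()
        return True

def handle_request(headers: dict) -> None:
    # Reuse the gateway-issued ID if present; otherwise mint one at the edge.
    trace_id_var.set(headers.get("X-Request-ID", str(uuid.uuid4())))
    logging.getLogger("payment-service").info("Payment started")

logging.basicConfig(format="%(asctime)s %(levelname)s trace_id=%(trace_id)s %(message)s",
                    level=logging.INFO)
for handler in logging.getLogger().handlers:
    handler.addFilter(TraceIdFilter())

handle_request({"X-Request-ID": "abc123"})   # logs: ... trace_id=abc123 Payment started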

The mistake: Keeping all logs on hot storage indefinitely.

Why it happens: “Storage is cheap.”

The consequence: At scale, SSD costs dwarf compute. Query performance degrades as index size grows.

The fix: Implement hot/warm/cold tiering. Most logs are never queried after 7 days.

The mistake: Agents with unbounded memory buffers.

Why it happens: “We can’t lose logs.”

The consequence: Agent OOMs during ingestion spikes; entire node destabilized.

The fix: Bounded buffers with overflow to disk, or sampling under pressure. Some log loss is better than node failure.

Distributed logging infrastructure requires balancing query flexibility against cost efficiency. The key design decisions are:

  1. Data model: Structured logging (JSON for flexibility, Protobuf for performance) enables downstream efficiency
  2. Collection: DaemonSets for simplicity, sidecars for isolation—threshold around 500 configurations
  3. Indexing: Full inverted indexes (Elasticsearch) for query richness; label-only indexes (Loki) for cost efficiency
  4. Storage: Tiered architecture with ILM automation; most logs never queried after 7 days
  5. Scaling: Time-based partitioning with consistent hashing for writes; replication for durability

Netflix’s 5 PB/day deployment demonstrates that extreme scale is achievable with columnar storage (ClickHouse), aggressive compression (170x), and careful data modeling. The trade-off: less query flexibility than full-text search systems.

  • Understanding of distributed systems fundamentals (replication, partitioning)
  • Familiarity with time-series data characteristics
  • Basic knowledge of search indexing concepts

  • LSM Tree (Log-Structured Merge Tree): Write-optimized data structure that batches writes in memory and flushes to immutable sorted files
  • Inverted Index: Mapping from terms to documents containing those terms; enables full-text search
  • Bloom Filter: Probabilistic data structure for set membership with false positives but no false negatives
  • ILM (Index Lifecycle Management): Automated policy for transitioning data through storage tiers
  • LogQL: Grafana Loki’s query language, inspired by PromQL

  • Structured logging (JSON/Protobuf) shifts parsing cost from query time to write time
  • Collection agents trade resource efficiency (DaemonSet) against isolation (sidecar)
  • Message queue buffers (Kafka) decouple collection from storage, enabling replay and fan-out
  • Columnar storage (ClickHouse) achieves 170x compression; LSM trees (Elasticsearch) enable rich indexing
  • Label-only indexing (Loki) costs 10x less than full-text indexing but requires content scanning
  • Hot/warm/cold tiering balances query latency against storage cost—automate with ILM
