17 min read

Design a Distributed File System

A comprehensive system design for a distributed file system like GFS or HDFS covering metadata management, chunk storage, replication strategies, consistency models, and failure handling. This design addresses petabyte-scale storage with high throughput for batch processing workloads while maintaining fault tolerance across commodity hardware.

[Diagram: storage and metadata layers. Three clients (reader, writer, appender) send metadata requests (1) to the master server (NameNode) and receive chunk locations (2); the master is backed by a shadow master (hot standby) and an operation log with checkpoints. Clients then read or write data (3) directly against four chunk servers (DataNodes), with writes replicated through a pipeline (4).]
High-level architecture: Clients query the master for chunk locations, then read/write directly to chunk servers. Writes pipeline through replicas.

Distributed file systems solve the problem of storing and accessing files that exceed single-machine capacity while providing fault tolerance and high throughput. The core architectural tension is between metadata scalability (how many files can the system track) and data throughput (how fast can clients read/write).

Core architectural decisions:

| Decision | Choice | Rationale |
|---|---|---|
| Metadata management | Single master | Simplifies placement, enables global optimization |
| Chunk size | 64-128 MB | Amortizes metadata overhead, optimizes for large files |
| Replication | 3 replicas, rack-aware | Survives rack failure, balances write bandwidth |
| Consistency | Relaxed (defined regions) | Enables concurrent appends, simplifies implementation |
| Write model | Append-only preferred | Eliminates random write complexity, enables atomic appends |

Key trade-offs accepted:

  • Single master limits metadata operations to ~10K ops/sec (sufficient for batch workloads)
  • Large chunks waste space for small files and create hotspots
  • Relaxed consistency requires application-level handling of duplicates

What this design optimizes:

  • High throughput for large sequential reads/writes (100s MB/s per client)
  • Automatic failure recovery with minimal data loss
  • Linear storage scaling to petabytes

Functional requirements:

| Requirement | Priority | Notes |
|---|---|---|
| File creation/deletion | Core | Hierarchical namespace |
| Large file read | Core | Multi-GB to TB files, sequential access |
| Large file write | Core | Streaming writes, immutable after close |
| Record append | Core | Multiple clients appending concurrently |
| Snapshot | Extended | Point-in-time copy for backups |
| Namespace operations | Extended | Rename, move, permissions |
| Small file support | Extended | Not optimized, but functional |

Non-functional requirements:

| Requirement | Target | Rationale |
|---|---|---|
| Availability | 99.9% (3 nines) | Batch processing tolerates brief outages |
| Read throughput | 100+ MB/s per client | Saturate network, not disk |
| Write throughput | 50+ MB/s per client | Pipeline limits write speed |
| Append latency | p99 < 100 ms | Real-time log ingestion |
| Durability | 99.9999% | Data survives multiple simultaneous failures |
| Recovery time | < 10 min for node failure | Re-replication must not overwhelm cluster |

Cluster size (large deployment):

  • Nodes: 1,000-5,000 chunk servers
  • Storage per node: 12 × 4TB disks = 48TB raw
  • Total raw capacity: 5,000 × 48 TB = 240 PB at full scale (2.4 PB per 50 nodes)
  • Usable capacity (3x replication): ~80 PB at 5,000 nodes

Files and chunks:

  • Files: 10 million (typical large deployment)
  • Average file size: 1 GB
  • Chunks per file: 1 GB / 64 MB = 16 chunks
  • Total chunks: 160 million
  • Metadata per chunk: ~150 bytes (GFS design)
  • Total metadata: 160M × 150B = 24 GB (fits in master memory)
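These figures can be sanity-checked with a few lines of arithmetic, using only the numbers from this section:

```python
# Back-of-envelope metadata sizing for the single-master design.
FILES = 10_000_000            # typical large deployment
AVG_FILE_SIZE = 1024**3       # 1 GB average file
CHUNK_SIZE = 64 * 1024**2     # 64 MB chunks
META_PER_CHUNK = 150          # bytes of master metadata per chunk (GFS design)

chunks_per_file = AVG_FILE_SIZE // CHUNK_SIZE      # 16
total_chunks = FILES * chunks_per_file             # 160 million
metadata_bytes = total_chunks * META_PER_CHUNK     # 24 GB, fits in master RAM
```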

Traffic:

  • Concurrent clients: 10,000
  • Read-heavy workload: 90% reads, 10% writes
  • Aggregate read throughput: 10,000 × 100 MB/s = 1 TB/s cluster-wide
  • Aggregate write throughput: 1,000 × 50 MB/s = 50 GB/s cluster-wide

Metadata operations:

  • File opens per second: 5,000-10,000
  • Chunk location lookups: 50,000-100,000 per second (batch prefetch)

Best when:

  • Metadata operations are not the bottleneck (batch processing)
  • Simplicity and operational ease are priorities
  • Global optimization of chunk placement is valuable
  • File count is under 100 million

Architecture:

[Diagram: a client issues metadata ops to the master, which holds the namespace (in-memory tree), the file→chunk mapping, and chunk→server locations; data ops go directly to the chunk servers (1000s, Server 1 through Server N), each of which sends heartbeats with its chunk list back to the master.]

Key characteristics:

  • All metadata in single master’s memory
  • Chunk locations not persisted—rebuilt from heartbeats on startup
  • Operation log replicated to shadow masters
  • Clients cache chunk locations, reducing master load

Trade-offs:

  • Simple design, easy to reason about
  • Global knowledge enables optimal placement
  • Single point for consistency
  • Memory limits file count (~100M files with 64GB RAM)
  • Master CPU limits metadata ops (~10K ops/sec)
  • Single point of failure (mitigated by shadows)

Real-world example: Google File System (2003-2010) used this model. Clusters grew to several thousand chunk servers and hundreds of terabytes before metadata limits became problematic. Google eventually replaced GFS with Colossus for exabyte-scale needs.

Best when:

  • Multiple independent workloads share infrastructure
  • File count exceeds single-master memory limits
  • Namespace isolation is desirable
  • Gradual scaling without re-architecture is needed

Architecture:

[Diagram: federation with shared DataNodes. Three NameNodes each own an independent namespace (A, B, C); DataNode 1 hosts block pools A and B, DataNode 2 hosts A and C, and DataNode 3 hosts B and C, so every DataNode serves multiple NameNodes.]

Key characteristics:

  • Each NameNode manages independent namespace
  • Block pools are isolated per NameNode
  • DataNodes serve all NameNodes
  • No coordination between NameNodes

Trade-offs:

  • Scales metadata horizontally
  • Namespace isolation for multi-tenancy
  • Incremental scaling
  • No cross-namespace operations (hardlinks, moves)
  • Uneven namespace utilization requires manual balancing
  • Client must know which NameNode to contact

Real-world example: HDFS Federation was introduced in Hadoop 2.0 (2012). Yahoo deployed clusters with multiple NameNodes managing separate namespaces for different teams.

Best when:

  • Exabyte-scale storage is required
  • Billions of files are expected
  • Multi-tenancy with strong isolation is critical
  • Team has expertise in distributed databases

Architecture:

[Diagram: clients talk to a metadata service of stateless curators backed by a BigTable/Spanner metadata store, with custodians handling background operations; the storage service consists of simple D servers.]

Key characteristics:

  • Metadata in distributed database (BigTable, Spanner)
  • Curators are stateless, scale horizontally
  • Custodians handle background operations
  • D servers are simple storage targets

Trade-offs:

  • Exabyte scale, billions of files
  • No single point of failure
  • True horizontal scaling
  • Complex multi-layer architecture
  • Higher latency for metadata operations
  • Requires distributed database expertise

Real-world example: Google Colossus (2010+) stores exabytes of data across Google’s infrastructure. Facebook Tectonic (2021) consolidated multiple storage systems into one, reducing data warehouse clusters by 10x.

| Factor | Single Master | Federation | Distributed |
|---|---|---|---|
| Files | ~100M | ~1B (sum of namespaces) | Unlimited |
| Metadata ops/sec | 10K | 10K × N namespaces | 100K+ |
| Complexity | Low | Medium | High |
| Cross-namespace ops | N/A | No | Yes |
| Operational burden | Low | Medium | High |
| Best for | Most deployments | Large enterprises | Hyperscalers |

This article focuses on Path A (Single Master) because:

  1. It’s the foundational model (GFS, early HDFS)
  2. Sufficient for 95% of deployments (up to 100M files)
  3. Simpler to understand and operate
  4. Concepts transfer to federated/distributed models

Path B (Federation) is covered briefly in the scaling section. Path C (Distributed) requires a separate article on metadata-at-scale architectures.

[Diagram: detailed component view. The client library holds a chunk location cache, a lease manager, and a write buffer; it sends metadata requests to the master cluster (primary master, shadow masters 1 and 2, operation log, checkpoints) and data requests to a chunk server pool spanning Rack 1 (CS1-CS3), Rack 2 (CS4-CS6), and Rack 3 (CS7-CS9). Chunk servers heartbeat to the master, which also runs monitoring components: a heartbeat collector, a chunk rebalancer, and a garbage collector.]

Manages all filesystem metadata and coordinates cluster operations.

Responsibilities:

  • Namespace management (directory tree, file metadata)
  • File-to-chunk mapping
  • Chunk replica placement decisions
  • Lease management for write coordination
  • Garbage collection of orphaned chunks
  • Re-replication of under-replicated chunks

Design decisions:

| Decision | Choice | Rationale |
|---|---|---|
| Metadata storage | In-memory | Sub-millisecond lookups; 64 GB supports 100M files |
| Persistence | Operation log + checkpoints | Fast recovery, crash consistency |
| Chunk locations | Not persisted | Rebuilt from heartbeats in 30-60 seconds |
| Failover | Manual + shadow masters | Simplicity; automatic adds complexity |

Memory layout (per 64GB master):

| Data | Size | Count Supported |
|---|---|---|
| Namespace tree | ~200 bytes/file | 100M files = 20 GB |
| File→chunk mapping | ~100 bytes/file | 100M files = 10 GB |
| Chunk metadata | ~64 bytes/chunk | 500M chunks = 32 GB |
| Total | ~62 GB | 100M files, 500M chunks |
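The totals in the memory-layout table follow directly from the per-entry sizes (decimal GB):

```python
# Master memory budget: per-entry sizes times counts from the table above.
FILES = 100_000_000
CHUNKS = 500_000_000

namespace_gb = FILES * 200 / 1e9   # ~200 bytes/file  -> 20 GB
mapping_gb   = FILES * 100 / 1e9   # ~100 bytes/file  -> 10 GB
chunk_gb     = CHUNKS * 64 / 1e9   # ~64 bytes/chunk  -> 32 GB
total_gb     = namespace_gb + mapping_gb + chunk_gb   # 62 GB
```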

Stores chunks as local files and serves read/write requests.

Responsibilities:

  • Store chunks as Linux files on local disks
  • Serve read requests directly to clients
  • Accept writes and forward in pipeline
  • Report chunk inventory via heartbeat
  • Compute and verify checksums
  • Participate in re-replication

Design decisions:

| Decision | Choice | Rationale |
|---|---|---|
| Chunk storage | Local filesystem (ext4/xfs) | Leverage OS buffer cache, simple |
| Checksumming | 32 KB blocks, CRC32C | Detect corruption before serving |
| Heartbeat interval | 3 seconds | Balance failure detection vs overhead |
| Chunk report | Piggyback on heartbeat | Reduce message count |

Disk layout:

/data/
├── disk1/
│   ├── chunks/
│   │   ├── chunk_abc123.dat   # 64MB chunk data
│   │   ├── chunk_abc123.crc   # Checksums (2KB for 64MB)
│   │   └── chunk_def456.dat
│   └── meta/
│       └── chunk_inventory.db # Local SQLite for chunk metadata
├── disk2/
│   └── chunks/
└── disk12/
    └── chunks/

Provides file system interface and handles complexity of distributed operations.

Responsibilities:

  • Translate file operations to master/chunk server RPCs
  • Cache chunk locations (reduces master load 100x)
  • Buffer writes for efficiency
  • Handle retries and failover
  • Implement record append semantics

Design decisions:

| Decision | Choice | Rationale |
|---|---|---|
| Location cache | LRU, 10K entries, 10 min TTL | Reduces master load by 100x |
| Write buffer | 64 MB (one chunk) | Batch small writes |
| Retry policy | Exponential backoff, 3 retries | Handle transient failures |
| Checksum verification | On read | Catch corruption early |
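The location cache in the table above can be sketched as an LRU map with a per-entry TTL; the class name and structure here are illustrative, not the actual client library API:

```python
import time
from collections import OrderedDict

class ChunkLocationCache:
    """LRU cache with TTL for chunk locations (illustrative sketch)."""

    def __init__(self, capacity=10_000, ttl=600.0):
        self.capacity = capacity
        self.ttl = ttl
        self._entries = OrderedDict()  # key -> (locations, expiry)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        locations, expiry = entry
        if time.monotonic() > expiry:    # stale entry: drop it and miss
            del self._entries[key]
            return None
        self._entries.move_to_end(key)   # mark as most recently used
        return locations

    def put(self, key, locations):
        if key in self._entries:
            self._entries.move_to_end(key)
        self._entries[key] = (locations, time.monotonic() + self.ttl)
        if len(self._entries) > self.capacity:
            self._entries.popitem(last=False)  # evict the LRU entry
```

On a miss (eviction or TTL expiry) the client falls back to a GetChunkLocations RPC and re-populates the cache.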

Create File:

CreateFile(path: string, replication: int) → FileHandle
  • Validates path, creates namespace entry
  • Does not allocate chunks (lazy allocation)
  • Returns handle with initial chunk locations

Open File:

OpenFile(path: string, mode: READ|WRITE|APPEND) → FileHandle
  • For writes: grants lease to client
  • Returns current chunk locations

Delete File:

DeleteFile(path: string) → void
  • Marks file as deleted in namespace
  • Actual chunk deletion via garbage collection (72-hour delay)

Get Chunk Locations:

GetChunkLocations(file: FileHandle, chunkIndex: int) → ChunkInfo

Response:

{
  "chunkId": "chunk_abc123",
  "version": 42,
  "replicas": [
    { "server": "cs1.example.com:9000", "rack": "rack1" },
    { "server": "cs4.example.com:9000", "rack": "rack2" },
    { "server": "cs7.example.com:9000", "rack": "rack3" }
  ],
  "primary": "cs1.example.com:9000",
  "leaseExpiry": "2024-02-03T10:05:00Z"
}

Add Chunk:

AddChunk(file: FileHandle) → ChunkInfo
  • Allocates new chunk ID
  • Selects replica locations (rack-aware)
  • Grants lease to one replica as primary

Read Chunk:

ReadChunk(chunkId: string, offset: int, length: int) → bytes
  • Verifies checksum before returning data
  • Returns error if checksum fails (client retries other replica)

Write Chunk:

WriteChunk(chunkId: string, offset: int, data: bytes, replicas: []Server) → void
  • Accepts data, writes to disk
  • Forwards to next replica in pipeline
  • Returns success only when all replicas confirm

Append Chunk:

AppendChunk(chunkId: string, data: bytes) → (offset: int, error)
  • Primary determines offset
  • Broadcasts to replicas
  • Returns offset where data was appended
  • If chunk doesn’t have space, returns ChunkFull error
class DistributedFileSystem:
    def create(path: str, replication: int = 3) -> File
    def open(path: str, mode: str = "r") -> File
    def delete(path: str) -> None
    def rename(src: str, dst: str) -> None
    def list(path: str) -> List[FileInfo]
    def mkdir(path: str) -> None
    def exists(path: str) -> bool

class File:
    def read(size: int = -1) -> bytes
    def write(data: bytes) -> int
    def append(data: bytes) -> int  # Returns offset
    def seek(offset: int) -> None
    def close() -> None

Namespace (in-memory tree):

type NamespaceNode struct {
    Name        string
    IsDirectory bool
    Children    map[string]*NamespaceNode // For directories
    FileInfo    *FileMetadata             // For files
    Parent      *NamespaceNode

    // Access control
    Owner       string
    Group       string
    Permissions uint16
}

type FileMetadata struct {
    FileID      uint64
    Size        int64
    Replication int
    ChunkSize   int64 // Usually 64MB
    Chunks      []ChunkHandle
    CreatedAt   time.Time
    ModifiedAt  time.Time
}

Chunk mapping (in-memory hash table):

type ChunkHandle uint64

type ChunkMetadata struct {
    Handle      ChunkHandle
    Version     uint64          // Incremented on each mutation
    Replicas    []ChunkServerID // Current replica locations
    Primary     ChunkServerID   // Current lease holder
    LeaseExpiry time.Time
}

// Global map: O(1) lookup
var chunkTable map[ChunkHandle]*ChunkMetadata

Persistent, append-only log for crash recovery:

[Timestamp][OpType][OpData]
Example entries:
[1706900000][CREATE_FILE]["/logs/2024/app.log", replication=3]
[1706900001][ADD_CHUNK][fileId=12345, chunkHandle=67890, version=1]
[1706900002][UPDATE_REPLICAS][chunkHandle=67890, replicas=[cs1,cs4,cs7]]
[1706900003][DELETE_FILE]["/tmp/old_file.dat"]

Compaction:

  • Checkpoint: Serialize full in-memory state to disk
  • Truncate log entries before checkpoint
  • Frequency: Every 1M operations or 1 hour
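A toy model of the log-plus-checkpoint recovery path (in-memory lists stand in for fsync'd files; the op names follow the example entries above, everything else is illustrative):

```python
import json

class OperationLog:
    """Append-only metadata journal with checkpoint-based compaction (sketch)."""

    def __init__(self):
        self.entries = []     # in production: an fsync'd file on local SSD
        self.checkpoint = {}  # serialized namespace state at truncation point

    def append(self, op_type, **op_data):
        self.entries.append({"op": op_type, "data": op_data})

    def compact(self, current_state):
        # Checkpoint: serialize full in-memory state, then truncate the log.
        self.checkpoint = json.loads(json.dumps(current_state))
        self.entries = []

    def recover(self, apply):
        # Start from the last checkpoint, then replay the log suffix.
        state = json.loads(json.dumps(self.checkpoint))
        for entry in self.entries:
            apply(state, entry)
        return state

def apply_entry(state, entry):
    # Minimal interpreter for three of the op types shown above.
    if entry["op"] == "CREATE_FILE":
        state[entry["data"]["path"]] = {"chunks": []}
    elif entry["op"] == "ADD_CHUNK":
        state[entry["data"]["path"]]["chunks"].append(entry["data"]["handle"])
    elif entry["op"] == "DELETE_FILE":
        state.pop(entry["data"]["path"], None)
```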

Chunk file format:

chunk_<handle>.dat:
  [Data: 64MB or less]

chunk_<handle>.meta:
{
  "handle": 67890,
  "version": 42,
  "size": 67108864,
  "checksums": [
    {"offset": 0, "crc32c": "a1b2c3d4"},
    {"offset": 32768, "crc32c": "e5f6a7b8"},
    ... // One per 32KB block
  ],
  "createdAt": "2024-02-03T10:00:00Z"
}

Chunk server → Master (every 3 seconds):

{
  "serverId": "cs1.example.com:9000",
  "timestamp": 1706900000,
  "diskUsage": {
    "total": 48000000000000,
    "used": 32000000000000,
    "available": 16000000000000
  },
  "chunkReports": [
    { "handle": 67890, "version": 42, "size": 67108864 },
    { "handle": 67891, "version": 15, "size": 33554432 }
  ],
  "corruptChunks": [67892],
  "load": {
    "readOps": 150,
    "writeOps": 20,
    "networkBytesIn": 1073741824,
    "networkBytesOut": 5368709120
  }
}

Master → Chunk server (response):

{
  "commands": [
    { "type": "DELETE", "chunks": [67893, 67894] },
    { "type": "REPLICATE", "chunk": 67895, "target": "cs5.example.com:9000" },
    { "type": "REPORT_FULL", "reason": "version_mismatch" }
  ]
}
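On the master side, dead-server detection reduces to tracking the last heartbeat per server. A minimal sketch, assuming the 3-second heartbeat and a 30-second timeout (the class and method names are illustrative):

```python
import time

HEARTBEAT_TIMEOUT = 30.0  # seconds of silence before a server is declared dead

class HeartbeatCollector:
    """Tracks the last-heard time for each chunk server (sketch)."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock      # injectable for testing
        self.last_seen = {}

    def on_heartbeat(self, server_id):
        self.last_seen[server_id] = self.clock()

    def dead_servers(self):
        # Any server silent for longer than the timeout is considered dead.
        now = self.clock()
        return [s for s, t in self.last_seen.items()
                if now - t > HEARTBEAT_TIMEOUT]
```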
Write flow (client, master, primary chunk server, two secondaries):

  1. Client → Master: AddChunk(file)
  2. Master selects replicas (rack-aware) and grants the lease to the primary
  3. Master → Client: ChunkInfo{primary, secondaries, version}
  4. Client pushes data to the primary and both secondaries; each caches it in memory without writing yet
  5. Client → Primary: WriteChunk(offset, length)
  6. Primary writes to disk and forwards the write command to both secondaries
  7. Secondaries apply the write and ACK the primary
  8. Primary → Client: Success

Why separate data push from write command:

  1. Data flows directly to all replicas (parallel, saturates network)
  2. Write command is small, serializes at primary
  3. Primary controls ordering for concurrent writes
  4. If replica fails, retry is cheap (data already pushed)

Atomic append is the key differentiator from traditional file systems:

Concurrent append flow (both clients append to the same chunk):

  1. Client 1 → Primary: AppendChunk(data1); Client 2 → Primary: AppendChunk(data2)
  2. Primary serializes the appends: offset1 for data1, offset2 for data2
  3. Primary appends data1 at offset1 and data2 at offset2, forwarding both to the secondary
  4. Secondary ACKs both appends
  5. Primary → Client 1: Success, offset=offset1; Primary → Client 2: Success, offset=offset2

Append semantics:

  • At-least-once guarantee: If append returns success, data is durably stored
  • Atomicity: Each append is all-or-nothing (no partial appends)
  • Ordering: Primary determines global order
  • Duplicates possible: If primary fails after writing but before ACK, client retries → duplicate record

Handling append failures:

If replica fails during append:
1. Primary returns failure to client
2. Client retries (may retry different chunk if padding needed)
3. Some replicas may have the data, some may not
4. Result: "defined" region followed by "inconsistent" padding
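From the client's perspective, the retry logic looks roughly like this (the RPC helpers are stand-ins, not a real API):

```python
# Client-side record append with retry (sketch; chunk-server RPCs are stubbed).
class ChunkFull(Exception):
    """Raised when the current chunk lacks space; primary pads and closes it."""

def record_append(get_last_chunk, add_chunk, append_rpc, data, max_retries=3):
    """At-least-once append: retries may duplicate the record, never truncate it."""
    chunk = get_last_chunk()
    for attempt in range(max_retries):
        try:
            return append_rpc(chunk, data)   # primary picks the offset
        except ChunkFull:
            chunk = add_chunk()              # move to a freshly allocated chunk
        except IOError:
            continue                         # replica failure: retry same chunk
    raise IOError("append failed after retries")
```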

GFS-style distributed file systems use a relaxed consistency model:

| After Operation | Consistent | Defined |
|---|---|---|
| Write (single client) | Yes | Yes |
| Write (concurrent clients) | Yes | No (interleaved) |
| Record append (success) | Yes | Yes (at some offset) |
| Record append (failure) | No | No |

Definitions:

  • Consistent: All replicas have identical data
  • Defined: Data reflects exactly one client’s write

Application implications:

  1. Writers should use record append, not random writes
  2. Readers must handle:
    • Duplicate records (use unique IDs + deduplication)
    • Partial records (use checksums in records)
    • Inconsistent regions (validate before processing)

Example: Log file with concurrent appenders:

[Record 1: app_id=A, seq=1, checksum=valid] ← Defined
[Record 2: app_id=B, seq=1, checksum=valid] ← Defined
[Padding: zeros or garbage] ← Inconsistent (skip)
[Record 3: app_id=A, seq=2, checksum=valid] ← Defined
[Record 1: app_id=A, seq=1, checksum=valid] ← Duplicate (skip)
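A reader that applies these rules (skip padding, drop checksum failures, deduplicate by app_id and sequence number) can be sketched as follows; the record layout is illustrative, and CRC32 stands in for whatever per-record checksum the application uses:

```python
import zlib

def valid_records(records):
    """Yield each unique, checksum-valid record once (illustrative sketch).

    `records` is an iterable of dicts {"app_id", "seq", "payload", "checksum"};
    padding/garbage regions are modeled as None entries.
    """
    seen = set()
    for rec in records:
        if rec is None:                                    # inconsistent region: skip
            continue
        if zlib.crc32(rec["payload"]) != rec["checksum"]:  # partial record: skip
            continue
        key = (rec["app_id"], rec["seq"])                  # unique ID for dedup
        if key in seen:                                    # duplicate from a retry: skip
            continue
        seen.add(key)
        yield rec
```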

Goals:

  1. Survive rack failure
  2. Distribute load
  3. Minimize cross-rack traffic for writes

Algorithm (for 3 replicas):

def select_replicas(num_replicas: int, client_rack: Rack) -> List[Server]:
    replicas = []

    # First replica: prefer the client's rack (if the client is a chunk
    # server), otherwise pick the least-loaded server
    if client_is_chunk_server and has_capacity(client_rack):
        replicas.append(select_server(client_rack))
    else:
        replicas.append(select_least_loaded_server())

    # Second replica: different rack
    rack2 = select_different_rack(replicas[0].rack)
    replicas.append(select_server(rack2))

    # Third replica: same rack as second (minimize cross-rack writes)
    replicas.append(select_server(rack2, exclude=replicas[1]))

    return replicas

def select_server(rack: Rack, exclude: Server = None) -> Server:
    candidates = [s for s in rack.servers
                  if s != exclude
                  and s.available_space > THRESHOLD
                  and s.recent_creates < RATE_LIMIT]
    # Balance by available space and recent activity
    return weighted_random(candidates, weight=lambda s: s.available_space)

Write bandwidth analysis:

| Replica Placement | Cross-Rack Transfers |
|---|---|
| All same rack | 0 |
| Spread across 3 racks | 2 |
| 1 rack + 2 another rack | 1 |

The third approach (1 + 2) balances reliability and bandwidth.
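Since writes pipeline replica-to-replica, the cross-rack cost is just the number of adjacent rack changes along the pipeline:

```python
def cross_rack_transfers(pipeline_racks):
    """Cross-rack hops when a write pipelines through replicas in order."""
    return sum(1 for a, b in zip(pipeline_racks, pipeline_racks[1:]) if a != b)
```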

Chunk server failure handling:

  1. Heartbeat timeout (30 seconds): mark the server as dead
  2. Scan all chunks that lived on the failed server
  3. If a chunk now has fewer than 3 replicas, add it to the re-replication queue; otherwise skip it
  4. Prioritize the queue: single-replica chunks first, then two-replica chunks, then other under-replicated chunks
  5. Copy from a healthy replica to a new server, throttled to avoid network saturation

Re-replication throttling:

  • Limit: 10 MB/s per source server
  • Reason: Prevent recovery traffic from impacting production reads
  • Trade-off: Slower recovery vs. sustained availability
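The per-source cap maps naturally onto a token bucket on each source server; a sketch (the class name and rate are illustrative):

```python
import time

class ReplicationThrottle:
    """Token bucket capping re-replication reads at `rate` bytes/sec (sketch)."""

    def __init__(self, rate=10 * 1024**2, clock=time.monotonic):
        self.rate = rate
        self.clock = clock
        self.tokens = float(rate)   # allow up to a one-second burst
        self.last = clock()

    def try_send(self, nbytes):
        # Refill tokens for the time elapsed since the last call.
        now = self.clock()
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if nbytes <= self.tokens:
            self.tokens -= nbytes
            return True    # caller may copy this block now
        return False       # caller should back off and retry later
```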

Recovery process:

  1. Shadow master detection: Monitoring detects primary failure
  2. Promotion: Operator (or automated system) promotes shadow
  3. Log replay: Shadow applies any uncommitted log entries
  4. Chunk reports: Wait for chunk servers to report (30-60 seconds)
  5. Resume operations: Master accepts client requests

Recovery time breakdown:

| Phase | Duration |
|---|---|
| Detection | 10-30 seconds |
| Promotion decision | Manual: minutes; Auto: seconds |
| Log replay | Seconds (incremental) |
| Chunk reports | 30-60 seconds |
| Total | 1-5 minutes |

Checksum verification:

BLOCK_SIZE = 32 * 1024  # 32KB

def write_chunk(chunk_id: str, data: bytes):
    # Compute checksums for each 32KB block
    checksums = []
    for i in range(0, len(data), BLOCK_SIZE):
        block = data[i:i + BLOCK_SIZE]
        checksums.append(crc32c(block))
    # Write data and checksums atomically
    write_file(f"{chunk_id}.dat", data)
    write_file(f"{chunk_id}.crc", checksums)

def read_chunk(chunk_id: str, offset: int, length: int) -> bytes:
    # Extend the read to block boundaries so every touched block is verifiable
    start_block = offset // BLOCK_SIZE
    end_block = (offset + length - 1) // BLOCK_SIZE
    data = read_file(f"{chunk_id}.dat",
                     start_block * BLOCK_SIZE,
                     (end_block - start_block + 1) * BLOCK_SIZE)
    checksums = read_file(f"{chunk_id}.crc")
    # Verify each block before returning any data
    for i in range(start_block, end_block + 1):
        rel = (i - start_block) * BLOCK_SIZE
        block = data[rel:rel + BLOCK_SIZE]
        if crc32c(block) != checksums[i]:
            raise CorruptionError(chunk_id, i)
    # Return only the requested byte range
    skip = offset - start_block * BLOCK_SIZE
    return data[skip:skip + length]

Corruption handling:

  1. Chunk server reports corruption to master
  2. Master marks chunk as corrupt
  3. Master initiates re-replication from healthy replica
  4. Chunk server deletes corrupt chunk

Lazy deletion design:

  1. DELETE /path/file → File renamed to hidden name (.deleted_<timestamp>_<filename>)
  2. After 72 hours: Master removes file metadata
  3. During heartbeat: Master tells chunk servers to delete orphaned chunks

Why 72-hour delay:

  • Allows recovery from accidental deletion
  • Batches deletion operations
  • Reduces master load
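A minimal model of the rename-then-scrub flow, with a plain dict standing in for the namespace (the hidden-name format follows the step above; the helper names are illustrative):

```python
import time

GRACE_SECONDS = 72 * 3600  # 72-hour deletion grace period

def delete(namespace, path, now=None):
    """Rename to a hidden tombstone instead of removing metadata (sketch)."""
    now = time.time() if now is None else now
    parent, name = path.rsplit("/", 1)
    hidden = f"{parent}/.deleted_{int(now)}_{name}"
    namespace[hidden] = namespace.pop(path)
    return hidden

def scrub(namespace, now=None):
    """Drop tombstones older than the grace period; their chunks become
    orphans that the garbage collector reclaims on later heartbeats."""
    now = time.time() if now is None else now
    for p in list(namespace):
        name = p.rsplit("/", 1)[-1]
        if name.startswith(".deleted_"):
            ts = int(name.split("_", 2)[1])
            if now - ts > GRACE_SECONDS:
                del namespace[p]
```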

Orphan detection:

def garbage_collect():
    # Collect all chunks referenced by files
    referenced_chunks = set()
    for file in all_files():
        referenced_chunks.update(file.chunks)
    # On each heartbeat, check the chunk server's inventory
    for chunk_id in chunk_server.reported_chunks:
        if chunk_id not in referenced_chunks:
            # Tell the chunk server to delete it
            commands.append(DeleteChunk(chunk_id))

While distributed file systems are primarily backend infrastructure, client-facing considerations exist:

MapReduce/Spark data locality:

def get_input_splits(file_path: str) -> List[InputSplit]:
    """Return splits with location hints for the scheduler."""
    splits = []
    for chunk in file.chunks:
        locations = master.get_chunk_locations(chunk)
        splits.append(InputSplit(
            chunk_id=chunk.id,
            offset=0,
            length=chunk.size,
            # Scheduler prefers these hosts
            preferred_locations=[loc.host for loc in locations]
        ))
    return splits

Data locality statistics:

| Locality Level | Typical Rate | Impact |
|---|---|---|
| Node-local | 70-90% | Zero network for read |
| Rack-local | 95-99% | Low network overhead |
| Off-rack | 1-5% | Full network cost |

Essential operations:

# File operations
dfs put local_file.txt /hdfs/path/file.txt
dfs get /hdfs/path/file.txt local_file.txt
dfs ls /hdfs/path/
dfs rm /hdfs/path/file.txt
# Admin operations
dfs fsck /path # Check file system health
dfs balancer # Rebalance data across servers
dfs report # Cluster utilization report
dfs safemode enter|leave # Maintenance mode

Key metrics for operators:

| Metric | Warning Threshold | Critical Threshold |
|---|---|---|
| Under-replicated blocks | > 100 | > 1000 |
| Corrupt blocks | > 0 | > 10 |
| Dead nodes | > 0 | > N × 0.05 |
| Capacity used | > 70% | > 85% |
| Pending replications | > 10000 | > 100000 |
| Master heap usage | > 70% | > 85% |

| Component | Purpose | Options |
|---|---|---|
| Master storage | Operation log, checkpoints | Local SSD, NFS, cloud block storage |
| Chunk storage | Data storage | Local HDD/SSD arrays |
| Network | Data transfer | 10-100 Gbps, leaf-spine topology |
| Monitoring | Health, metrics | Prometheus, Grafana, Datadog |
| Configuration | Cluster config | ZooKeeper, etcd, Consul |

Master server (per server):

| Component | Specification | Rationale |
|---|---|---|
| CPU | 32+ cores | Metadata operations are CPU-bound |
| Memory | 128-256 GB | All metadata in RAM |
| Storage | 2× NVMe SSD (RAID 1) | Operation log durability |
| Network | 25 Gbps | Heartbeat + client traffic |

Chunk server (per server):

| Component | Specification | Rationale |
|---|---|---|
| CPU | 8-16 cores | I/O bound, not CPU bound |
| Memory | 64-128 GB | OS buffer cache |
| Storage | 12-24× 4-16 TB HDD | Cost-effective bulk storage |
| Network | 25-100 Gbps | Saturate disk throughput |

Cluster sizing formula:

Raw capacity needed = Data size × Replication factor
= 100 PB × 3 = 300 PB
Servers needed = Raw capacity / Capacity per server
= 300 PB / 48 TB = 6,250 servers
Network capacity = Expected throughput × Headroom
= 1 TB/s × 2 = 2 TB/s aggregate
= 2 TB/s ≈ 16 Tbps → ~1,600 servers at 10 Gbps each
(well under the 6,250 storage-driven count, so storage, not network, sets cluster size here)
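The storage-driven part of the formula as code (the network term is sensitive to unit choices, so only capacity is computed here):

```python
def servers_needed(data_pb, replication=3, tb_per_server=48):
    """Storage-driven sizing: raw capacity in PB and server count (sketch)."""
    raw_pb = data_pb * replication
    servers = (raw_pb * 1000) // tb_per_server  # 1 PB = 1000 TB
    return raw_pb, servers

# Example from the text: 100 PB of data, 3x replication, 48 TB per server.
raw, servers = servers_needed(100)
```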

[Diagram: a VPC with placement groups spans three AZs connected by a transit gateway. The master tier runs the primary master and two shadow masters (i3en.12xlarge, one per AZ); the storage tier runs 100 d3en.12xlarge chunk servers per AZ.]

Instance selection:

| Role | Instance | vCPUs | Memory | Storage | Network |
|---|---|---|---|---|---|
| Master | i3en.12xlarge | 48 | 384 GB | 4× 7.5 TB NVMe | 50 Gbps |
| Chunk Server | d3en.12xlarge | 48 | 192 GB | 12× 14 TB HDD | 50 Gbps |

Cost comparison (300 servers, 3-year reserved):

| Component | Monthly Cost |
|---|---|
| d3en.12xlarge × 297 | ~$400K |
| i3en.12xlarge × 3 | ~$8K |
| Network (inter-AZ) | ~$50K |
| Total | ~$460K/month |

Alternative: Self-hosted on-prem often 60-70% cheaper for sustained workloads.

This design provides a distributed file system capable of:

  1. Petabyte-scale storage across thousands of commodity servers
  2. 100+ MB/s throughput per client for large sequential operations
  3. Fault tolerance surviving simultaneous disk, server, and rack failures
  4. Atomic record append enabling concurrent producers without coordination

Key architectural decisions:

  • Single master with in-memory metadata enables global optimization but limits scale to ~100M files
  • Large chunks (64-128 MB) optimize for batch processing but penalize small files
  • Relaxed consistency trades simplicity for application-level complexity (deduplication, checksums)
  • Append-only design eliminates random write complexity

Known limitations:

  • Single master is metadata bottleneck (addressed by federation or distributed metadata)
  • Large chunks waste space for small files (addressed by tiered storage)
  • No strong consistency for concurrent writes (acceptable for batch workloads)

When to use alternatives:

| Requirement | Better Choice |
|---|---|
| Many small files (millions of < 1 MB) | Object storage (S3, MinIO) |
| POSIX semantics required | CephFS, Lustre |
| Real-time random reads | Key-value stores (Cassandra, DynamoDB) |
| Exabyte scale | Colossus-style distributed metadata |

Prerequisites:

  • Storage fundamentals (RAID, replication, erasure coding)
  • Distributed systems basics (consensus, failure detection)
  • Networking (TCP, datacenter topology)
Glossary:

| Term | Definition |
|---|---|
| Chunk | Fixed-size unit of file data (64-128 MB), called "block" in HDFS |
| Master | Server managing metadata, called "NameNode" in HDFS |
| Chunk Server | Server storing chunk data, called "DataNode" in HDFS |
| Lease | Time-limited grant to a client for write operations |
| Operation Log | Append-only journal of metadata changes for recovery |
| Checkpoint | Snapshot of in-memory metadata state |
| Rack-aware | Placement strategy considering physical rack topology |
| Re-replication | Process of copying chunks to restore replication factor |

Key takeaways:
  • Architecture: Single master manages metadata in-memory; chunk servers store data as local files
  • Chunk size: 64-128 MB balances metadata overhead against small file efficiency
  • Replication: 3 replicas across 2 racks survives rack failure with one cross-rack write
  • Consistency: Relaxed model with atomic record append; applications handle duplicates
  • Write flow: Data pushed in parallel, write command serialized at primary
  • Failure handling: Heartbeat detection, re-replication throttled to preserve production traffic

Related Concepts:

  • Erasure coding vs replication - Trade-offs for fault tolerance in distributed storage
  • Consistent hashing - Data distribution pattern used in many distributed systems