docs: update documentation with information about replication

This commit is contained in:
Jeremy Tregunna 2025-04-28 01:15:24 -06:00
parent f9e332096c
commit 60d401a615
3 changed files with 799 additions and 331 deletions

View File

@ -20,6 +20,7 @@ Kevo is a clean, composable storage engine that follows LSM tree principles, foc
- **Interface-driven design** with clear component boundaries
- **Comprehensive statistics collection** for monitoring and debugging
- **ACID-compliant transactions** with SQLite-inspired reader-writer concurrency
- **Primary-replica replication** with automatic client request routing
## Use Cases
@ -154,7 +155,14 @@ Type `.help` in the CLI for more commands.
### Run Server
```bash
# Run as standalone node (default)
go run ./cmd/kevo/main.go -server [database_path]
# Run as primary node
go run ./cmd/kevo/main.go -server [database_path] -replication.enabled=true -replication.mode=primary -replication.listen=:50053
# Run as replica node
go run ./cmd/kevo/main.go -server [database_path] -replication.enabled=true -replication.mode=replica -replication.primary=localhost:50053
```
## Configuration
@ -192,6 +200,7 @@ Kevo implements a facade-based design over the LSM tree architecture, consisting
- **StorageManager**: Handles data storage operations across multiple layers
- **TransactionManager**: Manages transaction lifecycle and isolation
- **CompactionManager**: Coordinates background optimization processes
- **ReplicationManager**: Handles primary-replica replication and node role management
- **Statistics Collector**: Provides comprehensive metrics for monitoring
### Storage Layer
@ -201,6 +210,12 @@ Kevo implements a facade-based design over the LSM tree architecture, consisting
- **SSTables**: Immutable, sorted files for persistent storage
- **Compaction**: Background process to merge and optimize SSTables
### Replication Layer
- **Primary Node**: Single writer that streams WAL entries to replicas
- **Replica Node**: Read-only node that receives and applies WAL entries from the primary
- **Client Routing**: Smart client SDK that automatically routes reads to replicas and writes to the primary
### Interface-Driven Design
The system is designed around clear interfaces that define contracts between components:

View File

@ -0,0 +1,421 @@
# Kevo Client SDK Development Guide
This document provides technical guidance for developing client SDKs for Kevo in various programming languages. It focuses on the gRPC API, communication patterns, and best practices.
## gRPC API Overview
Kevo exposes its functionality through a gRPC service defined in `proto/kevo/service.proto`. The service provides operations for:
1. **Key-Value Operations** - Basic get, put, and delete operations
2. **Batch Operations** - Atomic multi-key operations
3. **Iterator Operations** - Range scans and prefix scans
4. **Transaction Operations** - Support for ACID transactions
5. **Administrative Operations** - Statistics and compaction
6. **Replication Operations** - Node role discovery and topology information
## Service Definition
The main service is `KevoService`, which contains the following RPC methods:
### Key-Value Operations
- `Get(GetRequest) returns (GetResponse)`: Retrieves a value by key
- `Put(PutRequest) returns (PutResponse)`: Stores a key-value pair
- `Delete(DeleteRequest) returns (DeleteResponse)`: Removes a key-value pair
### Batch Operations
- `BatchWrite(BatchWriteRequest) returns (BatchWriteResponse)`: Performs multiple operations atomically
### Iterator Operations
- `Scan(ScanRequest) returns (stream ScanResponse)`: Streams key-value pairs in a range
### Transaction Operations
- `BeginTransaction(BeginTransactionRequest) returns (BeginTransactionResponse)`: Starts a new transaction
- `CommitTransaction(CommitTransactionRequest) returns (CommitTransactionResponse)`: Commits a transaction
- `RollbackTransaction(RollbackTransactionRequest) returns (RollbackTransactionResponse)`: Aborts a transaction
- `TxGet(TxGetRequest) returns (TxGetResponse)`: Get operation in a transaction
- `TxPut(TxPutRequest) returns (TxPutResponse)`: Put operation in a transaction
- `TxDelete(TxDeleteRequest) returns (TxDeleteResponse)`: Delete operation in a transaction
- `TxScan(TxScanRequest) returns (stream TxScanResponse)`: Scan operation in a transaction
### Administrative Operations
- `GetStats(GetStatsRequest) returns (GetStatsResponse)`: Retrieves database statistics
- `Compact(CompactRequest) returns (CompactResponse)`: Triggers compaction
### Replication Operations
- `GetNodeInfo(GetNodeInfoRequest) returns (GetNodeInfoResponse)`: Retrieves information about the node's role and replication topology
## Implementation Considerations
When implementing a client SDK, consider the following aspects:
### Connection Management
1. **Establish Connection**: Create and maintain gRPC connection to the server
2. **Connection Pooling**: Implement connection pooling for performance (if the language/platform supports it)
3. **Timeout Handling**: Set appropriate timeouts for connection establishment and requests
4. **TLS Support**: Support secure communications with TLS
5. **Replication Awareness**: Discover node roles and maintain appropriate connections
```
// Connection options example
options = {
endpoint: "localhost:50051",
connectTimeout: 5000, // milliseconds
requestTimeout: 10000, // milliseconds
poolSize: 5, // number of connections
tlsEnabled: false,
certPath: "/path/to/cert.pem",
keyPath: "/path/to/key.pem",
caPath: "/path/to/ca.pem",
// Replication options
discoverTopology: true, // automatically discover node role and topology
autoRouteWrites: true, // automatically route writes to primary
autoRouteReads: true // route reads to replicas when possible
}
```
### Basic Operations
Implement clean, idiomatic methods for basic operations:
```
// Example operations (in pseudo-code)
client.get(key) -> [value, found]
client.put(key, value, sync) -> success
client.delete(key, sync) -> success
// With proper error handling
try {
value, found = client.get(key)
} catch (Exception e) {
// Handle errors
}
```
### Batch Operations
Batch operations should be atomic from the client perspective:
```
// Example batch write
operations = [
{ type: "put", key: key1, value: value1 },
{ type: "put", key: key2, value: value2 },
{ type: "delete", key: key3 }
]
success = client.batchWrite(operations, sync)
```
### Streaming Operations
For scan operations, implement both streaming and iterator patterns based on language idioms:
```
// Streaming example
client.scan(prefix, startKey, endKey, limit, function(key, value) {
// Process each key-value pair
})
// Iterator example
iterator = client.scan(prefix, startKey, endKey, limit)
while (iterator.hasNext()) {
[key, value] = iterator.next()
// Process each key-value pair
}
iterator.close()
```
### Transaction Support
Provide a transaction API with proper resource management:
```
// Transaction example
tx = client.beginTransaction(readOnly)
try {
val = tx.get(key)
tx.put(key2, value2)
tx.commit()
} catch (Exception e) {
tx.rollback()
throw e
}
```
Consider implementing a transaction callback pattern for better resource management (if the language supports it):
```
// Transaction callback pattern
client.transaction(function(tx) {
// Operations inside transaction
val = tx.get(key)
tx.put(key2, value2)
// Auto-commit if no exceptions
})
```
### Error Handling and Retries
1. **Error Categories**: Map gRPC error codes to meaningful client-side errors
2. **Retry Policy**: Implement exponential backoff with jitter for transient errors
3. **Error Context**: Provide detailed error information
```
// Retry policy example
retryPolicy = {
maxRetries: 3,
initialBackoffMs: 100,
maxBackoffMs: 2000,
backoffFactor: 1.5,
jitter: 0.2
}
```
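For reference, here is a minimal Go sketch of the backoff computation implied by the policy above; the constants mirror the example fields and are illustrative, not part of the Kevo API.
```go
package retry

import (
	"math"
	"math/rand"
	"time"
)

// backoffDelay returns the delay before the given retry attempt,
// using exponential growth capped at a maximum, plus +/-20% jitter.
func backoffDelay(attempt int) time.Duration {
	const (
		initialBackoff = 100 * time.Millisecond // initialBackoffMs
		maxBackoff     = 2 * time.Second        // maxBackoffMs
		backoffFactor  = 1.5
		jitter         = 0.2
	)
	d := float64(initialBackoff) * math.Pow(backoffFactor, float64(attempt))
	if d > float64(maxBackoff) {
		d = float64(maxBackoff)
	}
	// Randomize within [1-jitter, 1+jitter] so clients don't retry in lockstep.
	d *= 1 + (rand.Float64()*2-1)*jitter
	return time.Duration(d)
}
```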
### Performance Considerations
1. **Message Size Limits**: Handle large messages appropriately
2. **Stream Management**: Properly handle long-running streams
```
// Performance options example
options = {
maxMessageSize: 16 * 1024 * 1024 // 16MB
}
```
## Key Implementation Areas
### Key and Value Types
All keys and values are represented as binary data (`bytes` in protobuf). Your SDK should handle conversions between language-specific types and byte arrays.
### The `sync` Parameter
In operations that modify data (`Put`, `Delete`, `BatchWrite`), the `sync` parameter determines whether the operation waits for data to be durably persisted before returning. This is a critical parameter for balancing performance vs. durability.
### Transaction IDs
Transaction IDs are strings generated by the server on transaction creation. Clients must store and pass these IDs for all operations within a transaction.
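For instance, a Go client might thread the ID through like this (a sketch with assumed generated-code names; consult `proto/kevo/service.proto` for the exact fields):
```go
// Begin a transaction and capture the server-generated ID
// (field and method names here are assumptions based on the service list above).
txResp, err := client.BeginTransaction(ctx, &pb.BeginTransactionRequest{ReadOnly: false})
if err != nil {
	return err
}
txID := txResp.TransactionId

// Every operation inside the transaction carries the same ID.
getResp, err := client.TxGet(ctx, &pb.TxGetRequest{TransactionId: txID, Key: key})
```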
### Scan Operation Parameters
- `prefix`: Optional prefix to filter keys (when provided, start_key/end_key are ignored)
- `start_key`: Start of the key range (inclusive)
- `end_key`: End of the key range (exclusive)
- `limit`: Maximum number of results to return
### Node Role and Replication Support
When implementing an SDK for a Kevo cluster with replication, your client should:
1. **Discover Node Role**: On connection, query the server for node role information
2. **Connection Management**: Maintain appropriate connections based on node role:
- When connected to a primary, optionally connect to available replicas for reads
- When connected to a replica, connect to the primary for writes
3. **Operation Routing**: Direct operations to the appropriate node:
- Read operations: Can be directed to replicas when available
- Write operations: Must be directed to the primary
4. **Connection Recovery**: Handle connection failures with automatic reconnection
### Node Role Discovery
```
// Get node information on connection
nodeInfo = client.getNodeInfo()
// Check node role
if (nodeInfo.role == "primary") {
// Connected to primary
// Optionally connect to replicas for read distribution
for (replica in nodeInfo.replicas) {
if (replica.available) {
connectToReplica(replica.address)
}
}
} else if (nodeInfo.role == "replica") {
// Connected to replica
// Connect to primary for writes
connectToPrimary(nodeInfo.primaryAddress)
}
```
### Operation Routing
```
// Get operation
function get(key) {
if (nodeInfo.role == "primary" && hasReplicaConnections()) {
// Try to read from replica
try {
return readFromReplica(key)
} catch (error) {
// Fall back to primary if replica read fails
return readFromPrimary(key)
}
} else {
// Read from current connection
return readFromCurrent(key)
}
}
// Put operation
function put(key, value) {
if (nodeInfo.role == "replica" && hasPrimaryConnection()) {
// Route write to primary
return writeToPrimary(key, value)
} else {
// Write to current connection
return writeToCurrent(key, value)
}
}
```
## Common Pitfalls
1. **Stream Resource Leaks**: Always close streams properly
2. **Transaction Resource Leaks**: Always commit or rollback transactions
3. **Large Result Sets**: Implement proper pagination or streaming for large scans
4. **Connection Management**: Properly handle connection failures and reconnection
5. **Timeout Handling**: Set appropriate timeouts for different operations
6. **Role Discovery**: Discover node role at connection time and after reconnections
7. **Write Routing**: Always route writes to the primary node
8. **Read-after-Write**: Be aware of potential replica lag in read-after-write scenarios
## Example Usage Patterns
To ensure a consistent experience across different language SDKs, consider implementing these common usage patterns:
### Simple Usage
```
// Create client
client = new KevoClient("localhost:50051")
// Connect
client.connect()
// Key-value operations
client.put("key", "value")
value = client.get("key")
client.delete("key")
// Close connection
client.close()
```
### Advanced Usage with Options
```
// Create client with options
options = {
endpoint: "kevo-server:50051",
connectTimeout: 5000,
requestTimeout: 10000,
tlsEnabled: true,
certPath: "/path/to/cert.pem",
// ... more options
}
client = new KevoClient(options)
// Connect with context
client.connect(context)
// Batch operations
operations = [
{ type: "put", key: "key1", value: "value1" },
{ type: "put", key: "key2", value: "value2" },
{ type: "delete", key: "key3" }
]
client.batchWrite(operations, true) // sync=true
// Transaction
client.transaction(function(tx) {
value = tx.get("key1")
tx.put("key2", "updated-value")
tx.delete("key3")
})
// Iterator
iterator = client.scan({ prefix: "user:" })
while (iterator.hasNext()) {
[key, value] = iterator.next()
// Process each key-value pair
}
iterator.close()
// Close connection
client.close()
```
### Replication Usage
```
// Create client with replication options
options = {
endpoint: "kevo-replica:50051", // Connect to any node (primary or replica)
discoverTopology: true, // Automatically discover node role
autoRouteWrites: true, // Route writes to primary
autoRouteReads: true // Distribute reads to replicas when possible
}
client = new KevoClient(options)
// Connect and discover topology
client.connect()
// Get node role information
nodeInfo = client.getNodeInfo()
console.log("Connected to " + nodeInfo.role + " node")
if (nodeInfo.role == "primary") {
console.log("This node has " + nodeInfo.replicas.length + " replicas")
} else if (nodeInfo.role == "replica") {
console.log("Primary node is at " + nodeInfo.primaryAddr)
}
// Operations automatically routed to appropriate nodes
client.put("key1", "value1") // Routed to primary
value = client.get("key1") // May be routed to a replica if available
// Different routing behavior can be explicitly set
value = client.get("key2", { preferReplica: false }) // Force primary read
// Manual routing for advanced use cases
client.withPrimary(function(primary) {
// These operations are executed directly on the primary
primary.get("key3")
primary.put("key4", "value4")
})
// Close all connections
client.close()
```
## Testing Your SDK
When testing your SDK implementation, consider these scenarios:
1. **Basic Operations**: Simple get, put, delete operations
2. **Concurrency**: Multiple concurrent operations
3. **Error Handling**: Server errors, timeouts, network issues
4. **Connection Management**: Reconnection after server restart
5. **Large Data**: Large keys and values, many operations
6. **Transactions**: ACID properties, concurrent transactions
7. **Performance**: Throughput, latency, resource usage
8. **Replication**:
- Node role discovery
- Write redirection from replica to primary
- Read distribution to replicas
- Connection handling when nodes are unavailable
- Read-after-write scenarios with potential replica lag
## Conclusion
When implementing a Kevo client SDK, focus on providing an idiomatic experience for the target language while correctly handling the underlying gRPC communication details. The goal is to make the client API intuitive for developers familiar with the language, while ensuring correct and efficient interaction with the Kevo server.

View File

@ -1,6 +1,6 @@
# Replication System Documentation
The replication system in Kevo implements a primary-replica architecture that allows read operations to scale across multiple replica nodes while maintaining a single writer (the primary node). It ensures that replicas maintain a crash-resilient, consistent copy of the primary's data by streaming Write-Ahead Log (WAL) entries in strict logical order.
## Overview
@ -10,362 +10,394 @@ It guarantees:
- **Exactly-once application**: WAL entries are applied in order without duplication.
- **Crash resilience**: Both primary and replicas can recover cleanly after restart.
- **Simplicity**: Designed to be minimal, efficient, and extensible.
- **Transparent Client Experience**: Client SDKs automatically handle routing between primary and replicas.
The WAL sequence number acts as a **Lamport clock** to provide total ordering across all operations.
---
## Replication Architecture
The replication system is implemented across several packages:
1. **pkg/replication**: Core replication functionality
   - Primary implementation
   - Replica implementation
   - WAL streaming protocol
   - Batching and compression
2. **pkg/engine**: Engine integration
   - EngineFacade integration with ReplicationManager
   - Read-only mode for replicas
3. **pkg/client**: Client SDK integration
   - Node role discovery protocol
   - Automatic operation routing
   - Failover handling
## Node Roles
Kevo supports three node roles:
1. **Standalone**: A single node with no replication
   - Handles both reads and writes
   - Default mode when replication is not configured
2. **Primary**: The single writer node in a replication cluster
   - Processes all write operations
   - Streams WAL entries to replicas
   - Can serve read operations but typically offloads them to replicas
3. **Replica**: Read-only nodes that replicate data from the primary
   - Process read operations
   - Apply WAL entries from the primary
   - Reject write operations with redirection information
## Replication Manager
The `ReplicationManager` is the central component of the replication system. It:
1. Handles node configuration and setup
2. Starts the appropriate mode (primary or replica) based on configuration
3. Integrates with the storage engine and WAL
4. Exposes replication topology information
5. Manages the replication state machine
### Configuration
The ReplicationManager is configured via the `ManagerConfig` struct:
```go
type ManagerConfig struct {
Enabled bool // Enable replication
Mode string // "primary", "replica", or "standalone"
ListenAddr string // Address for primary to listen on (e.g., ":50053")
PrimaryAddr string // Address of the primary (for replica mode)
// Advanced settings
MaxBatchSize int // Maximum batch size for streaming
RetentionTime time.Duration // How long to retain WAL entries
CompressionEnabled bool // Enable compression
}
```
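As an illustration, a primary-node configuration built from this struct might look like the following. This is a sketch assuming the package is imported as `replication`; the field values mirror the CLI flags shown later, and the batch-size unit is assumed to be bytes.
```go
// Hypothetical primary-node setup mirroring the CLI flags shown later.
cfg := replication.ManagerConfig{
	Enabled:            true,
	Mode:               "primary",
	ListenAddr:         ":50053",
	MaxBatchSize:       512 * 1024,     // assumed unit: bytes per streamed batch
	RetentionTime:      12 * time.Hour, // keep WAL long enough for lagging replicas
	CompressionEnabled: true,
}
```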
### Status Information
The ReplicationManager provides status information through its `Status()` method:
```go
// Example status information
{
"enabled": true,
"mode": "primary",
"active": true,
"listen_address": ":50053",
"connected_replicas": 2,
"last_sequence": 12345,
"bytes_transferred": 1048576
}
```
## Primary Node Implementation
The primary node is responsible for:
1. Observing WAL entries as they are written
2. Streaming entries to connected replicas
3. Handling acknowledgments from replicas
4. Tracking replica state and lag
### WAL Observer
The primary implements the `WALEntryObserver` interface to be notified of new WAL entries:
```go
// Simplified implementation
func (p *Primary) OnEntryWritten(entry *wal.Entry) {
	p.buffer.Add(entry)
	p.notifyReplicas()
}
```
### Streaming Implementation
The primary streams entries using a gRPC service:
```go
// Simplified streaming implementation
func (p *Primary) StreamWAL(req *proto.WALStreamRequest, stream proto.WALReplication_StreamWALServer) error {
	startSeq := req.StartSequence

	// Send initial entries from WAL
	entries, err := p.wal.GetEntriesFrom(startSeq)
	if err != nil {
		return err
	}
	if err := p.sendEntries(entries, stream); err != nil {
		return err
	}

	// Subscribe to new entries
	subscription := p.subscribe()
	defer p.unsubscribe(subscription)

	for {
		select {
		case entries := <-subscription.Entries():
			if err := p.sendEntries(entries, stream); err != nil {
				return err
			}
		case <-stream.Context().Done():
			return stream.Context().Err()
		}
	}
}
```
## Replica Node Implementation
The replica node is responsible for:
1. Connecting to the primary
2. Receiving WAL entries
3. Applying entries to the local storage engine
4. Acknowledging successfully applied entries
### State Machine
The replica uses a state machine to manage its lifecycle:
```
CONNECTING → STREAMING_ENTRIES → APPLYING_ENTRIES → FSYNC_PENDING → ACKNOWLEDGING → WAITING_FOR_DATA
```
### Entry Application
Entries are applied in strict sequence order:
```go
// Simplified implementation
func (r *Replica) applyEntries(entries []*wal.Entry) error {
	// Verify entries are in proper sequence
	for _, entry := range entries {
		if entry.Sequence != r.nextExpectedSequence {
			return ErrSequenceGap
		}
		r.nextExpectedSequence++
	}

	// Apply entries to the engine
	if err := r.engine.ApplyBatch(entries); err != nil {
		return err
	}

	// Update last applied sequence
	r.lastAppliedSequence = entries[len(entries)-1].Sequence

	return nil
}
```
## Client SDK Integration
The client SDK provides a seamless experience for applications using Kevo with replication:
1. **Node Role Discovery**: On connection, clients discover the node's role and replication topology
2. **Automatic Write Redirection**: Write operations to replicas are transparently redirected to the primary
3. **Read Distribution**: When connected to a primary with replicas, reads can be distributed to replicas
4. **Connection Recovery**: Connection failures are handled with automatic retry and reconnection
### Node Information
When connecting, the client retrieves node information:
```go
// NodeInfo structure returned by the server
type NodeInfo struct {
	Role         string        // "primary", "replica", or "standalone"
	PrimaryAddr  string        // Address of the primary node (for replicas)
	Replicas     []ReplicaInfo // Available replica nodes (for primary)
	LastSequence uint64        // Last applied sequence number
	ReadOnly     bool          // Whether the node is in read-only mode
}

// Example ReplicaInfo
type ReplicaInfo struct {
	Address      string            // Host:port of the replica
	LastSequence uint64            // Last applied sequence number
	Available    bool              // Whether the replica is available
	Region       string            // Optional region information
	Meta         map[string]string // Additional metadata
}
```
### Smart Routing
The client automatically routes operations to the appropriate node:
```go
// Get retrieves a value by key.
// If connected to a primary with replicas, it will route reads to a replica.
func (c *Client) Get(ctx context.Context, key []byte) ([]byte, bool, error) {
	// Check if we should route to a replica
	shouldUseReplica := c.nodeInfo != nil &&
		c.nodeInfo.Role == "primary" &&
		len(c.replicaConn) > 0

	if shouldUseReplica {
		// Select a replica for reading
		selectedReplica := c.replicaConn[0]

		// Try the replica first
		resp, err := selectedReplica.Send(ctx, request)

		// Fall back to primary if the replica read fails
		if err != nil {
			resp, err = c.client.Send(ctx, request)
		}
	} else {
		// Use the default connection
		resp, err = c.client.Send(ctx, request)
	}

	// Process response...
}

// Put stores a key-value pair.
// If connected to a replica, it will automatically route the write to the primary.
func (c *Client) Put(ctx context.Context, key, value []byte) (bool, error) {
	// Check if we should route to the primary
	shouldUsePrimary := c.nodeInfo != nil &&
		c.nodeInfo.Role == "replica" &&
		c.primaryConn != nil

	if shouldUsePrimary {
		// Use the primary connection for writes when connected to a replica
		resp, err = c.primaryConn.Send(ctx, request)
	} else {
		// Use the default connection
		resp, err = c.client.Send(ctx, request)

		// If we get a read-only error, try to discover topology and retry
		if err != nil && isReadOnlyError(err) {
			if err := c.discoverTopology(ctx); err == nil {
				// Retry with the primary if we now have one
				if c.primaryConn != nil {
					resp, err = c.primaryConn.Send(ctx, request)
				}
			}
		}
	}

	// Process response...
}
```
## Server Configuration
To run a Kevo server with replication, use the following configuration options:
### Standalone Mode (Default)
```bash
kevo -server [database_path]
```
### Primary Mode
```bash
kevo -server [database_path] -replication.enabled=true -replication.mode=primary -replication.listen=:50053
```
### Replica Mode
```bash
kevo -server [database_path] -replication.enabled=true -replication.mode=replica -replication.primary=localhost:50053
```
## Implementation Considerations
### Durability
- Primary: All entries are durably written to WAL before being streamed
- Replica: Entries are applied and fsynced before acknowledgment
- WAL retention on primary ensures replicas can recover from short-term failures
### Consistency
- Primary is always the source of truth for writes
- Replicas may temporarily lag behind the primary
- Last sequence number indicates replication status
- Clients can choose to verify replica freshness for critical operations
### Performance
- Batch size is configurable to balance latency and throughput
- Compression can be enabled to reduce network bandwidth
- Read operations can be distributed across replicas for scaling
- Replicas operate in read-only mode, eliminating write contention
### Fault Tolerance
- Replica node restart: Recover local state, catch up missing entries
- Primary node restart: Resume serving WAL entries to replicas
- Network failures: Automatic reconnection with exponential backoff
- Gap detection: Replicas verify sequence continuity
## Protocol Details
The replication protocol is defined using Protocol Buffers:
```proto
service WALReplication {
rpc StreamWAL (WALStreamRequest) returns (stream WALStreamResponse);
rpc Acknowledge (Ack) returns (AckResponse);
}
```
The protocol ensures:
- Entries are streamed in order
- Gaps are detected using sequence numbers
- Large entries can be fragmented
- Compression is negotiated for efficiency
### Message Structures
#### WALStreamRequest
```proto
message WALStreamRequest {
  uint64 start_sequence = 1;
  uint32 protocol_version = 2;
  bool compression_supported = 3;
}
```
- `start_sequence (uint64)`: Starting Lamport clock.
- `protocol_version (uint32)`: Protocol negotiation.
- `compression_supported (bool)`: Indicates compression capability.
#### WALStreamResponse
```proto
message WALStreamResponse {
  repeated WALEntry entries = 1;
  bool compressed = 2;
}
```
- `entries (repeated WALEntry)`: WAL entries.
- `compressed (bool)`: Whether payloads are compressed.
#### WALEntry
```proto
message WALEntry {
  uint64 sequence_number = 1;
  bytes payload = 2;
  FragmentType fragment_type = 3;
}
```
- `sequence_number (uint64)`: Lamport clock.
- `payload (bytes)`: Serialized WAL record.
- `fragment_type (enum)`: FULL, FIRST, MIDDLE, LAST.
#### Ack
```proto
message Ack {
  uint64 acknowledged_up_to = 1;
}
```
- `acknowledged_up_to (uint64)`: Highest sequence number persisted.
#### AckResponse
```proto
message AckResponse {
  bool success = 1;
  string message = 2;
}
```
- `success (bool)`: Acknowledgment result.
- `message (string)`: Optional response detail.
#### Nack (Optional Extension)
- `missing_from_sequence (uint64)`: Resend from this sequence.
---
## Replication State Machine
```plaintext
+------------------+
| CONNECTING |
| (Establish gRPC) |
+--------+---------+
|
v
+----------------------+
| STREAMING_ENTRIES |
| (Receiving WAL) |
+--------+-------------+
|
v
+----------------------+
| APPLYING_ENTRIES |
| (Enqueue and order) |
+--------+-------------+
|
v
+----------------------+
| FSYNC_PENDING |
| (Group writes) |
+--------+-------------+
|
v
+----------------------+
| ACKNOWLEDGING |
| (Send Ack to primary)|
+--------+-------------+
|
v
+----------------------+
| WAITING_FOR_DATA |
| (No new WAL yet) |
+--------+-------------+
|
+----+----+
| |
v v
ERROR STREAMING_ENTRIES
(RECONNECT / NACK) (New WAL arrived)
```
The replica progresses through the following states:
| State | Description | Triggers |
|:------|:------------|:---------|
| CONNECTING | Establish secure connection to primary | Start, reconnect, or recover |
| STREAMING_ENTRIES | Actively receiving WAL entries | Connection established, WAL available |
| APPLYING_ENTRIES | WAL entries are validated and ordered | Entries received |
| FSYNC_PENDING | Buffering writes to durable storage | After applying batch |
| ACKNOWLEDGING | Sending Ack for persisted entries | After fsync complete |
| WAITING_FOR_DATA | No entries available, waiting | WAL drained |
| ERROR | Handle connection loss, gaps, invalid data | Connection failure, NACK required |
---
## Replication Session Flow
```plaintext
Replica Primary
| |
|--- Secure Connection Setup ----------------->|
| |
|--- WALStreamRequest(start_seq=X) ------------>|
| |
|<-- WALStreamResponse(entries) ---------------|
| |
| Apply entries sequentially |
| Buffer entries to durable WAL |
| fsync to disk |
| |
|--- Ack(ack_up_to_seq=Y) --------------------->|
| |
|<-- WALStreamResponse(next entries) ----------|
| |
[repeat until EOF or failure]
(if gap detected)
| |
|--- Nack(from_seq=Z) ------------------------>|
| |
|<-- WALStreamResponse(retransmit from Z) -----|
(on error)
| |
|--- Reconnect and resume -------------------->|
```
## Streaming Behavior
The replica lifecycle:
1. **Connection Setup**
- Replica establishes secure (TLS) gRPC connection to the primary.
2. **Streaming Request**
- Sends `WALStreamRequest { start_sequence = last_applied_sequence + 1 }`.
3. **WAL Entry Streaming**
- Primary streams WAL entries in batches.
- Entries are fragmented as needed (based on WAL fragmentation rules).
4. **Application and fsync**
- Replica applies entries strictly in order.
- Replica performs `fsync` after grouped application.
5. **Acknowledgment**
- Replica sends `Ack` for the highest persisted Lamport clock.
- Primary can then prune acknowledged WAL entries.
6. **Gap Recovery**
- If a sequence gap is detected, replica issues a `Nack`.
- Primary retransmits from the missing sequence.
7. **Heartbeat / Keepalive**
- Optional periodic messages to detect dead peers.
---
## WAL Entry Framing
Each gRPC `WALStreamResponse` frame includes:
| Field | Description |
|:------|:------------|
| MessageType (1 byte) | WALStreamResponse |
| CompressionFlag (1 byte) | Whether payload is compressed |
| EntryCount (4 bytes) | Number of entries |
| Entries | List of WALEntry |
Each `WALEntry`:
| Field | Description |
|:------|:------------|
| Sequence Number (8 bytes) | Logical Lamport clock |
| Fragment Type (1 byte) | FULL, FIRST, MIDDLE, LAST |
| Payload Size (4 bytes) | Payload size |
| Payload (variable) | WAL operation |
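A minimal encoding sketch of this layout in Go is shown below; the big-endian byte order and the helper signature are assumptions for illustration, not the wire format of any specific release.
```go
package framing

import (
	"bytes"
	"encoding/binary"
)

// encodeWALEntry appends one WALEntry frame following the layout above.
func encodeWALEntry(buf *bytes.Buffer, seq uint64, fragType byte, payload []byte) {
	binary.Write(buf, binary.BigEndian, seq)                  // Sequence Number (8 bytes)
	buf.WriteByte(fragType)                                   // Fragment Type (1 byte)
	binary.Write(buf, binary.BigEndian, uint32(len(payload))) // Payload Size (4 bytes)
	buf.Write(payload)                                        // Payload (variable)
}
```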
---
## Durability Guarantees
- Replica **must fsync** WAL entries before sending an acknowledgment.
- Acknowledgments cover *all entries up to and including* the acknowledged sequence number.
- If a batch is streamed, multiple entries can be fsynced together to amortize disk costs.
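A sketch of this group-commit pattern follows; the helper names (`applyEntry`, `walFile`, `sendAck`) and the `Replica` fields are illustrative, not the engine's actual API.
```go
// applyAndAck applies a streamed batch, fsyncs once for the whole
// batch, then acknowledges the highest persisted sequence number.
func (r *Replica) applyAndAck(entries []*WALEntry) error {
	for _, entry := range entries {
		if err := r.applyEntry(entry); err != nil {
			return err
		}
	}
	// One fsync amortized across the batch.
	if err := r.walFile.Sync(); err != nil {
		return err
	}
	return r.sendAck(entries[len(entries)-1].SequenceNumber)
}
```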
---
## Compression Support
- Compression is negotiated via `compression_supported` in `WALStreamRequest`.
- Payload compression uses lightweight codecs (e.g., snappy or zstd).
- Compressed payloads are transparently decompressed by the replica.
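With the snappy codec, for example, the decompression step can be as small as the following sketch (using `github.com/golang/snappy`; zstd would follow the same shape):
```go
package replication

import "github.com/golang/snappy"

// decodePayload returns the raw WAL payload, decompressing it when the
// stream negotiated compression.
func decodePayload(compressed bool, payload []byte) ([]byte, error) {
	if !compressed {
		return payload, nil
	}
	return snappy.Decode(nil, payload)
}
```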
---
## Cold Restart Semantics
### Replica Restart
1. Recover in-memory state by replaying local WAL files.
2. Identify the `last_applied_sequence`.
3. Reconnect to the primary, requesting WAL from `last_applied_sequence + 1`.
4. Continue streaming.
If the requested sequence is no longer available:
- Primary sends an `Error(OUT_OF_RANGE)`.
- Replica must trigger **full resync** (out of scope for this document).
### Primary Restart
1. Recover in-memory WAL pointers by scanning local WAL files.
2. Retain WAL files long enough for lagging replicas (configurable retention window).
3. Serve WAL entries starting from requested sequence if available.
If requested sequence is missing:
- Return `Error(OUT_OF_RANGE)`.
---
## Failure and Recovery
| Failure Type | Recovery Behavior |
|:-------------|:------------------|
| Replica crash | Replay local WAL and reconnect |
| Primary crash | Recover WAL state and resume serving |
| WAL gap detected | Replica issues `Nack`, primary retransmits |
| Stale WAL requested | Full resync required |
| Network failure | Replica reconnects and resumes |
| Data corruption | Skips bad records when possible, follows WAL corruption recovery model |
---
## Transport and Tuning Parameters
| Setting | Recommended Value |
|:--------|:------------------|
| `grpc.max_receive_message_length` | 16MB |
| `grpc.max_send_message_length` | 16MB |
| Keepalive time | 10s |
| Keepalive timeout | 5s |
| Permit without stream | true |
| TLS security | Mandatory with mTLS |
| Retry policy | Application-level reconnect |
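In Go these recommendations translate roughly to the following dial options (a sketch; `creds` stands in for your mTLS transport credentials):
```go
package transport

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/keepalive"
)

// dial connects to a peer with the tuning values recommended above.
func dial(addr string, creds credentials.TransportCredentials) (*grpc.ClientConn, error) {
	return grpc.Dial(addr,
		grpc.WithTransportCredentials(creds), // mTLS is mandatory in production
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(16*1024*1024), // 16MB receive limit
			grpc.MaxCallSendMsgSize(16*1024*1024), // 16MB send limit
		),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second, // keepalive time
			Timeout:             5 * time.Second,  // keepalive timeout
			PermitWithoutStream: true,
		}),
	)
}
```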
---
## Performance Considerations
### Batching
- Form batches of up to 256KB–512KB before sending.
- Respect transaction boundaries when batching.
#### Example smart batching algorithm
A sketch in Go (the entry type and its `Payload` size accessor are illustrative):
```go
// formBatch collects entries, in order, into a batch of at most
// maxBatchBytes; callers should also respect transaction boundaries.
func formBatch(entries []*WALEntry, maxBatchBytes int) []*WALEntry {
	var batch []*WALEntry
	batchSize := 0
	for _, entry := range entries {
		if batchSize+len(entry.Payload) > maxBatchBytes {
			break
		}
		batch = append(batch, entry)
		batchSize += len(entry.Payload)
	}
	return batch
}
```
### Flow Control
- Replica should apply and fsync frequently to avoid unbounded memory growth.
### WAL Retention
- Retain WAL files for at least the maximum replica lag window (e.g., 12 hours).
---
## Example Usage
### Starting Replication
```go
// Start replication stream from the last applied Lamport clock
client := replication.NewWALReplicationClient(conn)
stream, err := client.StreamWAL(context.TODO(), &replication.WALStreamRequest{
	StartSequence:        lastAppliedSequence + 1,
	ProtocolVersion:      1,
	CompressionSupported: true,
})
if err != nil {
	log.Fatal(err)
}

// Receive entries
for {
	resp, err := stream.Recv()
	if err != nil {
		log.Fatal(err)
	}
	for _, entry := range resp.Entries {
		applyEntry(entry)
	}
}
```
### Acknowledging Entries
```go
// After fsyncing applied entries
_, err := client.Acknowledge(context.TODO(), &replication.Ack{
	AcknowledgedUpTo: latestSequence,
})
if err != nil {
	log.Fatal(err)
}
```
## Limitations and Trade-offs
1. **Single Writer Model**: The system follows a strict single-writer architecture, limiting write throughput to the single primary node
2. **Replica Lag**: Replicas may be slightly behind the primary, requiring careful consideration for read-after-write scenarios
3. **Manual Failover**: The system does not implement automatic failover; if the primary fails, manual intervention is required
4. **Cold Start**: If WAL entries are pruned, new replicas require a full resync from the primary
## Future Work
The current implementation provides a robust foundation for replication, with several planned enhancements:
1. **Multi-region Replication**: Optimize for cross-region replication
2. **Replica Groups**: Support for replica tiers and read preferences
3. **Snapshot Transfer**: Efficient initialization of new replicas without WAL replay
4. **Flow Control**: Backpressure mechanisms to handle slow replicas