// kevo/pkg/common/clock/lamport.go
//
// 180 lines
// 5.0 KiB
// Go

// Package clock provides logical clock implementations for distributed systems.
package clock
import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"sync"
"time"
)
// NodeID is the fixed-size, 16-byte identifier of one node in the
// distributed system.
type NodeID [16]byte

// String renders the NodeID as lowercase hexadecimal, two characters per
// byte (32 characters in total).
func (id NodeID) String() string {
	encoded := make([]byte, hex.EncodedLen(len(id)))
	hex.Encode(encoded, id[:])
	return string(encoded)
}

// Equal reports whether id and other contain exactly the same bytes.
func (id NodeID) Equal(other NodeID) bool {
	mine, theirs := id[:], other[:]
	return bytes.Equal(mine, theirs)
}
// LamportClock is a Lamport logical clock used to order events in a
// distributed system. Each local event advances its counter by one; a
// timestamp received from another node moves the counter to one past the
// larger of the local and the received counter.
type LamportClock struct {
	// counter is the current logical time.
	counter uint64
	// nodeID identifies the node that owns this clock.
	nodeID NodeID
	// mu protects the clock against concurrent access.
	mu sync.RWMutex
}
// Timestamp is a Lamport timestamp: a logical counter paired with the ID of
// a node, which serves as the tie-breaker when two counters are equal.
type Timestamp struct {
	// Counter is the logical counter value.
	Counter uint64
	// Node is the NodeID used to break ties between equal counters.
	Node NodeID
}
// NewLamportClock returns a clock owned by nodeID whose counter starts at
// zero.
func NewLamportClock(nodeID NodeID) *LamportClock {
	// The zero value already has counter == 0 and an unlocked mutex, so only
	// the owner needs to be filled in.
	lc := new(LamportClock)
	lc.nodeID = nodeID
	return lc
}
// Tick advances the local counter by one and returns the resulting
// timestamp, stamped with this clock's node ID. Call it before producing any
// local event, such as a write to the WAL.
func (lc *LamportClock) Tick() Timestamp {
	lc.mu.Lock()
	lc.counter++
	issued := Timestamp{Counter: lc.counter, Node: lc.nodeID}
	lc.mu.Unlock()

	return issued
}
// Update merges a timestamp received from another node into the clock. The
// local counter becomes one more than the larger of its current value and
// ts.Counter, and the new value is returned stamped with this clock's node
// ID. Call it when processing events that originate on other nodes.
//
// NOTE: the counter is a uint64, so the final increment wraps around to zero
// if the larger value is already the maximum uint64.
func (lc *LamportClock) Update(ts Timestamp) Timestamp {
	lc.mu.Lock()
	defer lc.mu.Unlock()

	next := lc.counter
	if next < ts.Counter {
		next = ts.Counter
	}
	// Always step past the larger value so the merged event is ordered
	// after both the local history and the received timestamp.
	next++
	lc.counter = next

	return Timestamp{Counter: next, Node: lc.nodeID}
}
// GetCurrent reports the clock's present timestamp. The counter is read
// under the read lock and is not advanced.
func (lc *LamportClock) GetCurrent() Timestamp {
	var snapshot Timestamp

	lc.mu.RLock()
	snapshot.Counter = lc.counter
	snapshot.Node = lc.nodeID
	lc.mu.RUnlock()

	return snapshot
}
// ManualSet raises the counter to the given value. A value that is not
// strictly greater than the current counter is ignored, so the clock never
// moves backwards through this call. Intended for bootstrap or recovery
// scenarios.
func (lc *LamportClock) ManualSet(counter uint64) {
	lc.mu.Lock()
	defer lc.mu.Unlock()

	if counter <= lc.counter {
		return
	}
	lc.counter = counter
}
// Compare orders two timestamps by Lamport ordering: first by Counter, then,
// when the counters are equal, by the byte-wise order of the node IDs.
//
// The result is
//
//	-1 if ts1 < ts2
//	 0 if ts1 = ts2
//	+1 if ts1 > ts2
func Compare(ts1, ts2 Timestamp) int {
	switch {
	case ts1.Counter < ts2.Counter:
		return -1
	case ts1.Counter > ts2.Counter:
		return 1
	default:
		// Equal counters: the node ID decides.
		return bytes.Compare(ts1.Node[:], ts2.Node[:])
	}
}
// Less reports whether ts1 is ordered strictly before ts2 under the Lamport
// ordering implemented by Compare.
func Less(ts1, ts2 Timestamp) bool {
	order := Compare(ts1, ts2)
	return order < 0
}
// Equal reports whether two timestamps are identical, i.e. they carry the
// same counter and the same node ID.
func Equal(ts1, ts2 Timestamp) bool {
	return ts1.Counter == ts2.Counter && ts1.Node == ts2.Node
}
// String formats the timestamp as "<counter>@<node>", where <node> is the
// first eight characters of the node ID's string form.
func (ts Timestamp) String() string {
	nodeText := ts.Node.String()
	return fmt.Sprintf("%d@%s", ts.Counter, nodeText[:8])
}
// Bytes encodes the timestamp for network transmission as 24 bytes: the
// counter as an 8-byte big-endian integer followed by the 16 bytes of the
// node ID.
func (ts Timestamp) Bytes() []byte {
	const counterSize = 8

	encoded := make([]byte, counterSize+len(ts.Node))
	binary.BigEndian.PutUint64(encoded, ts.Counter)
	copy(encoded[counterSize:], ts.Node[:])
	return encoded
}
// TimestampFromBytes decodes a timestamp from data: an 8-byte big-endian
// counter followed by a 16-byte node ID. It returns an error when data holds
// fewer than 24 bytes. Longer input is accepted; only the first 24 bytes are
// read.
func TimestampFromBytes(data []byte) (Timestamp, error) {
	if got := len(data); got < 24 {
		return Timestamp{}, fmt.Errorf("invalid timestamp data: expected 24 bytes, got %d", got)
	}

	var ts Timestamp
	ts.Counter = binary.BigEndian.Uint64(data[:8])
	copy(ts.Node[:], data[8:24])
	return ts, nil
}
// WithPhysicalTimestamp wraps ts in a HybridTimestamp, pairing the logical
// timestamp with the current wall-clock time in nanoseconds since the Unix
// epoch.
func WithPhysicalTimestamp(ts Timestamp) HybridTimestamp {
	var hts HybridTimestamp
	hts.Logical = ts
	hts.Physical = time.Now().UnixNano()
	return hts
}
// HybridTimestamp pairs a logical Lamport timestamp with a physical clock
// reading, combining the causal ordering of the former with the real-time
// approximation of the latter.
type HybridTimestamp struct {
	// Logical is the Lamport timestamp.
	Logical Timestamp
	// Physical is the wall-clock time in nanoseconds since the Unix epoch.
	Physical int64
}
// String formats the hybrid timestamp as "<logical>@<physical>", where
// <logical> is the string form of the Lamport timestamp and <physical> is the
// nanosecond wall-clock value.
func (hts HybridTimestamp) String() string {
	logical := hts.Logical.String()
	return fmt.Sprintf("%s@%d", logical, hts.Physical)
}