Compare commits
No commits in common. "c0bfd835f75595bd159ebe364a84ca2c867cff07" and "9fb40779c7389ac41d56d1bf57154c0c2f6f194f" have entirely different histories.
c0bfd835f7
...
9fb40779c7
@ -1,45 +0,0 @@
|
|||||||
package replication
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// LamportClock implements a logical clock based on Lamport timestamps
|
|
||||||
// for maintaining causal ordering of events in a distributed system.
|
|
||||||
type LamportClock struct {
|
|
||||||
counter uint64
|
|
||||||
mu sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewLamportClock creates a new LamportClock instance
|
|
||||||
func NewLamportClock() *LamportClock {
|
|
||||||
return &LamportClock{counter: 0}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tick increments the clock and returns the new timestamp value
|
|
||||||
func (c *LamportClock) Tick() uint64 {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
c.counter++
|
|
||||||
return c.counter
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update updates the clock based on a received timestamp,
|
|
||||||
// ensuring the local clock is at least as large as the received timestamp,
|
|
||||||
// then increments and returns the new value
|
|
||||||
func (c *LamportClock) Update(received uint64) uint64 {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
if received > c.counter {
|
|
||||||
c.counter = received
|
|
||||||
}
|
|
||||||
c.counter++
|
|
||||||
return c.counter
|
|
||||||
}
|
|
||||||
|
|
||||||
// Current returns the current timestamp without incrementing the clock
|
|
||||||
func (c *LamportClock) Current() uint64 {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
return c.counter
|
|
||||||
}
|
|
@ -1,90 +0,0 @@
|
|||||||
package replication
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestLamportClockTick(t *testing.T) {
|
|
||||||
clock := NewLamportClock()
|
|
||||||
|
|
||||||
// Initial tick should return 1
|
|
||||||
if ts := clock.Tick(); ts != 1 {
|
|
||||||
t.Errorf("First tick should return 1, got %d", ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Second tick should return 2
|
|
||||||
if ts := clock.Tick(); ts != 2 {
|
|
||||||
t.Errorf("Second tick should return 2, got %d", ts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLamportClockUpdate(t *testing.T) {
|
|
||||||
clock := NewLamportClock()
|
|
||||||
|
|
||||||
// Update with lower value should increment
|
|
||||||
ts := clock.Update(0)
|
|
||||||
if ts != 1 {
|
|
||||||
t.Errorf("Update with lower value should return 1, got %d", ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update with same value should increment
|
|
||||||
ts = clock.Update(1)
|
|
||||||
if ts != 2 {
|
|
||||||
t.Errorf("Update with same value should return 2, got %d", ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update with higher value should use that value and increment
|
|
||||||
ts = clock.Update(10)
|
|
||||||
if ts != 11 {
|
|
||||||
t.Errorf("Update with higher value should return 11, got %d", ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subsequent tick should continue from updated value
|
|
||||||
ts = clock.Tick()
|
|
||||||
if ts != 12 {
|
|
||||||
t.Errorf("Tick after update should return 12, got %d", ts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLamportClockCurrent(t *testing.T) {
|
|
||||||
clock := NewLamportClock()
|
|
||||||
|
|
||||||
// Initial current should be 0
|
|
||||||
if ts := clock.Current(); ts != 0 {
|
|
||||||
t.Errorf("Initial current should be 0, got %d", ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// After tick, current should reflect new value
|
|
||||||
clock.Tick()
|
|
||||||
if ts := clock.Current(); ts != 1 {
|
|
||||||
t.Errorf("Current after tick should be 1, got %d", ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Current should not increment the clock
|
|
||||||
if ts := clock.Current(); ts != 1 {
|
|
||||||
t.Errorf("Multiple calls to Current should return same value, got %d", ts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLamportClockConcurrency(t *testing.T) {
|
|
||||||
clock := NewLamportClock()
|
|
||||||
iterations := 1000
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
// Run multiple goroutines calling Tick concurrently
|
|
||||||
wg.Add(iterations)
|
|
||||||
for i := 0; i < iterations; i++ {
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
clock.Tick()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
// After iterations concurrent ticks, value should be iterations
|
|
||||||
if ts := clock.Current(); ts != uint64(iterations) {
|
|
||||||
t.Errorf("After %d concurrent ticks, expected value %d, got %d",
|
|
||||||
iterations, iterations, ts)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,26 +0,0 @@
|
|||||||
package wal
|
|
||||||
|
|
||||||
// LamportClock is an interface for a logical clock based on Lamport timestamps
|
|
||||||
// for maintaining causal ordering of events in a distributed system.
|
|
||||||
type LamportClock interface {
|
|
||||||
// Tick increments the clock and returns the new timestamp value
|
|
||||||
Tick() uint64
|
|
||||||
|
|
||||||
// Update updates the clock based on a received timestamp,
|
|
||||||
// ensuring the local clock is at least as large as the received timestamp,
|
|
||||||
// then increments and returns the new value
|
|
||||||
Update(received uint64) uint64
|
|
||||||
|
|
||||||
// Current returns the current timestamp without incrementing the clock
|
|
||||||
Current() uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReplicationHook is an interface for capturing WAL entries for replication purposes.
|
|
||||||
// It provides hook points for the WAL to notify when entries are written.
|
|
||||||
type ReplicationHook interface {
|
|
||||||
// OnEntryWritten is called when a single WAL entry is written
|
|
||||||
OnEntryWritten(entry *Entry)
|
|
||||||
|
|
||||||
// OnBatchWritten is called when a batch of WAL entries is written
|
|
||||||
OnBatchWritten(entries []*Entry)
|
|
||||||
}
|
|
@ -1,356 +0,0 @@
|
|||||||
package wal
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/KevoDB/kevo/pkg/config"
|
|
||||||
)
|
|
||||||
|
|
||||||
// MockLamportClock implements the LamportClock interface for testing
|
|
||||||
type MockLamportClock struct {
|
|
||||||
counter uint64
|
|
||||||
mu sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMockLamportClock() *MockLamportClock {
|
|
||||||
return &MockLamportClock{counter: 0}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *MockLamportClock) Tick() uint64 {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
c.counter++
|
|
||||||
return c.counter
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *MockLamportClock) Update(received uint64) uint64 {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
if received > c.counter {
|
|
||||||
c.counter = received
|
|
||||||
}
|
|
||||||
c.counter++
|
|
||||||
return c.counter
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *MockLamportClock) Current() uint64 {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
return c.counter
|
|
||||||
}
|
|
||||||
|
|
||||||
// MockReplicationHook implements the ReplicationHook interface for testing
|
|
||||||
type MockReplicationHook struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
entries []*Entry
|
|
||||||
batchEntries [][]*Entry
|
|
||||||
entriesReceived int
|
|
||||||
batchesReceived int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockReplicationHook) OnEntryWritten(entry *Entry) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
|
|
||||||
// Make a deep copy of the entry to ensure tests are not affected by later modifications
|
|
||||||
entryCopy := &Entry{
|
|
||||||
SequenceNumber: entry.SequenceNumber,
|
|
||||||
Type: entry.Type,
|
|
||||||
Key: append([]byte{}, entry.Key...),
|
|
||||||
}
|
|
||||||
if entry.Value != nil {
|
|
||||||
entryCopy.Value = append([]byte{}, entry.Value...)
|
|
||||||
}
|
|
||||||
|
|
||||||
m.entries = append(m.entries, entryCopy)
|
|
||||||
m.entriesReceived++
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockReplicationHook) OnBatchWritten(entries []*Entry) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
|
|
||||||
// Make a deep copy of all entries
|
|
||||||
entriesCopy := make([]*Entry, len(entries))
|
|
||||||
for i, entry := range entries {
|
|
||||||
entriesCopy[i] = &Entry{
|
|
||||||
SequenceNumber: entry.SequenceNumber,
|
|
||||||
Type: entry.Type,
|
|
||||||
Key: append([]byte{}, entry.Key...),
|
|
||||||
}
|
|
||||||
if entry.Value != nil {
|
|
||||||
entriesCopy[i].Value = append([]byte{}, entry.Value...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
m.batchEntries = append(m.batchEntries, entriesCopy)
|
|
||||||
m.batchesReceived++
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockReplicationHook) GetEntries() []*Entry {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
return m.entries
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockReplicationHook) GetBatchEntries() [][]*Entry {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
return m.batchEntries
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockReplicationHook) GetStats() (int, int) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
return m.entriesReceived, m.batchesReceived
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWALReplicationHook(t *testing.T) {
|
|
||||||
// Create a temporary directory for the WAL
|
|
||||||
dir, err := os.MkdirTemp("", "wal_replication_test")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create temp dir: %v", err)
|
|
||||||
}
|
|
||||||
defer os.RemoveAll(dir)
|
|
||||||
|
|
||||||
// Create a mock replication hook
|
|
||||||
hook := &MockReplicationHook{}
|
|
||||||
|
|
||||||
// Create a Lamport clock
|
|
||||||
clock := NewMockLamportClock()
|
|
||||||
|
|
||||||
// Create a WAL with the replication hook
|
|
||||||
cfg := config.NewDefaultConfig(dir)
|
|
||||||
wal, err := NewWALWithReplication(cfg, dir, clock, hook)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create WAL: %v", err)
|
|
||||||
}
|
|
||||||
defer wal.Close()
|
|
||||||
|
|
||||||
// Test single entry writes
|
|
||||||
key1 := []byte("key1")
|
|
||||||
value1 := []byte("value1")
|
|
||||||
|
|
||||||
seq1, err := wal.Append(OpTypePut, key1, value1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to append to WAL: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that the hook received the entry
|
|
||||||
entries := hook.GetEntries()
|
|
||||||
if len(entries) != 1 {
|
|
||||||
t.Fatalf("Expected 1 entry, got %d", len(entries))
|
|
||||||
}
|
|
||||||
|
|
||||||
entry := entries[0]
|
|
||||||
if entry.SequenceNumber != seq1 {
|
|
||||||
t.Errorf("Expected sequence number %d, got %d", seq1, entry.SequenceNumber)
|
|
||||||
}
|
|
||||||
if entry.Type != OpTypePut {
|
|
||||||
t.Errorf("Expected type %d, got %d", OpTypePut, entry.Type)
|
|
||||||
}
|
|
||||||
if string(entry.Key) != string(key1) {
|
|
||||||
t.Errorf("Expected key %q, got %q", key1, entry.Key)
|
|
||||||
}
|
|
||||||
if string(entry.Value) != string(value1) {
|
|
||||||
t.Errorf("Expected value %q, got %q", value1, entry.Value)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test batch writes
|
|
||||||
key2 := []byte("key2")
|
|
||||||
value2 := []byte("value2")
|
|
||||||
key3 := []byte("key3")
|
|
||||||
value3 := []byte("value3")
|
|
||||||
|
|
||||||
batchEntries := []*Entry{
|
|
||||||
{Type: OpTypePut, Key: key2, Value: value2},
|
|
||||||
{Type: OpTypePut, Key: key3, Value: value3},
|
|
||||||
}
|
|
||||||
|
|
||||||
batchSeq, err := wal.AppendBatch(batchEntries)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to append batch to WAL: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that the hook received the batch
|
|
||||||
batches := hook.GetBatchEntries()
|
|
||||||
if len(batches) != 1 {
|
|
||||||
t.Fatalf("Expected 1 batch, got %d", len(batches))
|
|
||||||
}
|
|
||||||
|
|
||||||
batch := batches[0]
|
|
||||||
if len(batch) != 2 {
|
|
||||||
t.Fatalf("Expected 2 entries in batch, got %d", len(batch))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check first entry in batch
|
|
||||||
if batch[0].SequenceNumber != batchSeq {
|
|
||||||
t.Errorf("Expected sequence number %d, got %d", batchSeq, batch[0].SequenceNumber)
|
|
||||||
}
|
|
||||||
if batch[0].Type != OpTypePut {
|
|
||||||
t.Errorf("Expected type %d, got %d", OpTypePut, batch[0].Type)
|
|
||||||
}
|
|
||||||
if string(batch[0].Key) != string(key2) {
|
|
||||||
t.Errorf("Expected key %q, got %q", key2, batch[0].Key)
|
|
||||||
}
|
|
||||||
if string(batch[0].Value) != string(value2) {
|
|
||||||
t.Errorf("Expected value %q, got %q", value2, batch[0].Value)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check second entry in batch
|
|
||||||
if batch[1].SequenceNumber != batchSeq+1 {
|
|
||||||
t.Errorf("Expected sequence number %d, got %d", batchSeq+1, batch[1].SequenceNumber)
|
|
||||||
}
|
|
||||||
if batch[1].Type != OpTypePut {
|
|
||||||
t.Errorf("Expected type %d, got %d", OpTypePut, batch[1].Type)
|
|
||||||
}
|
|
||||||
if string(batch[1].Key) != string(key3) {
|
|
||||||
t.Errorf("Expected key %q, got %q", key3, batch[1].Key)
|
|
||||||
}
|
|
||||||
if string(batch[1].Value) != string(value3) {
|
|
||||||
t.Errorf("Expected value %q, got %q", value3, batch[1].Value)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check call counts
|
|
||||||
entriesReceived, batchesReceived := hook.GetStats()
|
|
||||||
if entriesReceived != 1 {
|
|
||||||
t.Errorf("Expected 1 single entry received, got %d", entriesReceived)
|
|
||||||
}
|
|
||||||
if batchesReceived != 1 {
|
|
||||||
t.Errorf("Expected 1 batch received, got %d", batchesReceived)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWALWithLamportClock(t *testing.T) {
|
|
||||||
// Create a temporary directory for the WAL
|
|
||||||
dir, err := os.MkdirTemp("", "wal_lamport_test")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create temp dir: %v", err)
|
|
||||||
}
|
|
||||||
defer os.RemoveAll(dir)
|
|
||||||
|
|
||||||
// Create a Lamport clock
|
|
||||||
clock := NewMockLamportClock()
|
|
||||||
|
|
||||||
// Create a WAL with the Lamport clock but no hook
|
|
||||||
cfg := config.NewDefaultConfig(dir)
|
|
||||||
wal, err := NewWALWithReplication(cfg, dir, clock, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create WAL: %v", err)
|
|
||||||
}
|
|
||||||
defer wal.Close()
|
|
||||||
|
|
||||||
// Pre-tick the clock a few times to simulate a distributed system
|
|
||||||
for i := 0; i < 5; i++ {
|
|
||||||
clock.Tick()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Current clock value should be 5
|
|
||||||
if clock.Current() != 5 {
|
|
||||||
t.Fatalf("Expected clock value 5, got %d", clock.Current())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that the WAL uses the Lamport clock for sequence numbers
|
|
||||||
key1 := []byte("key1")
|
|
||||||
value1 := []byte("value1")
|
|
||||||
|
|
||||||
seq1, err := wal.Append(OpTypePut, key1, value1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to append to WAL: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sequence number should be 6 (previous 5 + 1 for this operation)
|
|
||||||
if seq1 != 6 {
|
|
||||||
t.Errorf("Expected sequence number 6, got %d", seq1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clock should have incremented
|
|
||||||
if clock.Current() != 6 {
|
|
||||||
t.Errorf("Expected clock value 6, got %d", clock.Current())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test with a batch
|
|
||||||
entries := []*Entry{
|
|
||||||
{Type: OpTypePut, Key: []byte("key2"), Value: []byte("value2")},
|
|
||||||
{Type: OpTypePut, Key: []byte("key3"), Value: []byte("value3")},
|
|
||||||
}
|
|
||||||
|
|
||||||
batchSeq, err := wal.AppendBatch(entries)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to append batch to WAL: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Batch sequence should be 7
|
|
||||||
if batchSeq != 7 {
|
|
||||||
t.Errorf("Expected batch sequence number 7, got %d", batchSeq)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clock should have incremented again
|
|
||||||
if clock.Current() != 7 {
|
|
||||||
t.Errorf("Expected clock value 7, got %d", clock.Current())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWALHookAfterCreation(t *testing.T) {
|
|
||||||
// Create a temporary directory for the WAL
|
|
||||||
dir, err := os.MkdirTemp("", "wal_hook_after_test")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create temp dir: %v", err)
|
|
||||||
}
|
|
||||||
defer os.RemoveAll(dir)
|
|
||||||
|
|
||||||
// Create a WAL without hook initially
|
|
||||||
cfg := config.NewDefaultConfig(dir)
|
|
||||||
wal, err := NewWAL(cfg, dir)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create WAL: %v", err)
|
|
||||||
}
|
|
||||||
defer wal.Close()
|
|
||||||
|
|
||||||
// Write an entry before adding a hook
|
|
||||||
key1 := []byte("key1")
|
|
||||||
value1 := []byte("value1")
|
|
||||||
|
|
||||||
_, err = wal.Append(OpTypePut, key1, value1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to append to WAL: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create and add a hook after the fact
|
|
||||||
hook := &MockReplicationHook{}
|
|
||||||
wal.SetReplicationHook(hook)
|
|
||||||
|
|
||||||
// Create and add a Lamport clock after the fact
|
|
||||||
clock := NewMockLamportClock()
|
|
||||||
wal.SetLamportClock(clock)
|
|
||||||
|
|
||||||
// Write another entry, this should trigger the hook
|
|
||||||
key2 := []byte("key2")
|
|
||||||
value2 := []byte("value2")
|
|
||||||
|
|
||||||
seq2, err := wal.Append(OpTypePut, key2, value2)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to append to WAL: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify hook received the entry
|
|
||||||
entries := hook.GetEntries()
|
|
||||||
if len(entries) != 1 {
|
|
||||||
t.Fatalf("Expected 1 entry in hook, got %d", len(entries))
|
|
||||||
}
|
|
||||||
|
|
||||||
if entries[0].SequenceNumber != seq2 {
|
|
||||||
t.Errorf("Expected sequence number %d, got %d", seq2, entries[0].SequenceNumber)
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(entries[0].Key) != string(key2) {
|
|
||||||
t.Errorf("Expected key %q, got %q", key2, entries[0].Key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify the clock was used
|
|
||||||
if seq2 != 1 { // First tick of the clock
|
|
||||||
t.Errorf("Expected sequence from clock to be 1, got %d", seq2)
|
|
||||||
}
|
|
||||||
}
|
|
112
pkg/wal/wal.go
112
pkg/wal/wal.go
@ -81,19 +81,10 @@ type WAL struct {
|
|||||||
status int32 // Using atomic int32 for status flags
|
status int32 // Using atomic int32 for status flags
|
||||||
closed int32 // Atomic flag indicating if WAL is closed
|
closed int32 // Atomic flag indicating if WAL is closed
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
// Replication support
|
|
||||||
clock LamportClock // Lamport clock for logical timestamps
|
|
||||||
replicationHook ReplicationHook // Hook for replication events
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWAL creates a new write-ahead log
|
// NewWAL creates a new write-ahead log
|
||||||
func NewWAL(cfg *config.Config, dir string) (*WAL, error) {
|
func NewWAL(cfg *config.Config, dir string) (*WAL, error) {
|
||||||
return NewWALWithReplication(cfg, dir, nil, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewWALWithReplication creates a new write-ahead log with replication support
|
|
||||||
func NewWALWithReplication(cfg *config.Config, dir string, clock LamportClock, hook ReplicationHook) (*WAL, error) {
|
|
||||||
if cfg == nil {
|
if cfg == nil {
|
||||||
return nil, errors.New("config cannot be nil")
|
return nil, errors.New("config cannot be nil")
|
||||||
}
|
}
|
||||||
@ -112,15 +103,13 @@ func NewWALWithReplication(cfg *config.Config, dir string, clock LamportClock, h
|
|||||||
}
|
}
|
||||||
|
|
||||||
wal := &WAL{
|
wal := &WAL{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
dir: dir,
|
dir: dir,
|
||||||
file: file,
|
file: file,
|
||||||
writer: bufio.NewWriterSize(file, 64*1024), // 64KB buffer
|
writer: bufio.NewWriterSize(file, 64*1024), // 64KB buffer
|
||||||
nextSequence: 1,
|
nextSequence: 1,
|
||||||
lastSync: time.Now(),
|
lastSync: time.Now(),
|
||||||
status: WALStatusActive,
|
status: WALStatusActive,
|
||||||
clock: clock,
|
|
||||||
replicationHook: hook,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return wal, nil
|
return wal, nil
|
||||||
@ -129,12 +118,6 @@ func NewWALWithReplication(cfg *config.Config, dir string, clock LamportClock, h
|
|||||||
// ReuseWAL attempts to reuse an existing WAL file for appending
|
// ReuseWAL attempts to reuse an existing WAL file for appending
|
||||||
// Returns nil, nil if no suitable WAL file is found
|
// Returns nil, nil if no suitable WAL file is found
|
||||||
func ReuseWAL(cfg *config.Config, dir string, nextSeq uint64) (*WAL, error) {
|
func ReuseWAL(cfg *config.Config, dir string, nextSeq uint64) (*WAL, error) {
|
||||||
return ReuseWALWithReplication(cfg, dir, nextSeq, nil, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReuseWALWithReplication attempts to reuse an existing WAL file for appending with replication support
|
|
||||||
// Returns nil, nil if no suitable WAL file is found
|
|
||||||
func ReuseWALWithReplication(cfg *config.Config, dir string, nextSeq uint64, clock LamportClock, hook ReplicationHook) (*WAL, error) {
|
|
||||||
if cfg == nil {
|
if cfg == nil {
|
||||||
return nil, errors.New("config cannot be nil")
|
return nil, errors.New("config cannot be nil")
|
||||||
}
|
}
|
||||||
@ -190,16 +173,14 @@ func ReuseWALWithReplication(cfg *config.Config, dir string, nextSeq uint64, clo
|
|||||||
}
|
}
|
||||||
|
|
||||||
wal := &WAL{
|
wal := &WAL{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
dir: dir,
|
dir: dir,
|
||||||
file: file,
|
file: file,
|
||||||
writer: bufio.NewWriterSize(file, 64*1024), // 64KB buffer
|
writer: bufio.NewWriterSize(file, 64*1024), // 64KB buffer
|
||||||
nextSequence: nextSeq,
|
nextSequence: nextSeq,
|
||||||
bytesWritten: stat.Size(),
|
bytesWritten: stat.Size(),
|
||||||
lastSync: time.Now(),
|
lastSync: time.Now(),
|
||||||
status: WALStatusActive,
|
status: WALStatusActive,
|
||||||
clock: clock,
|
|
||||||
replicationHook: hook,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return wal, nil
|
return wal, nil
|
||||||
@ -221,16 +202,9 @@ func (w *WAL) Append(entryType uint8, key, value []byte) (uint64, error) {
|
|||||||
return 0, ErrInvalidOpType
|
return 0, ErrInvalidOpType
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sequence number for this entry - use Lamport clock if available
|
// Sequence number for this entry
|
||||||
var seqNum uint64
|
seqNum := w.nextSequence
|
||||||
if w.clock != nil {
|
w.nextSequence++
|
||||||
// Generate Lamport timestamp (reusing SequenceNumber field)
|
|
||||||
seqNum = w.clock.Tick()
|
|
||||||
} else {
|
|
||||||
// Use traditional sequence number
|
|
||||||
seqNum = w.nextSequence
|
|
||||||
}
|
|
||||||
w.nextSequence = seqNum + 1
|
|
||||||
|
|
||||||
// Encode the entry
|
// Encode the entry
|
||||||
// Format: type(1) + seq(8) + keylen(4) + key + vallen(4) + val
|
// Format: type(1) + seq(8) + keylen(4) + key + vallen(4) + val
|
||||||
@ -257,17 +231,6 @@ func (w *WAL) Append(entryType uint8, key, value []byte) (uint64, error) {
|
|||||||
if err := w.maybeSync(); err != nil {
|
if err := w.maybeSync(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify replication hook if available
|
|
||||||
if w.replicationHook != nil {
|
|
||||||
entry := &Entry{
|
|
||||||
SequenceNumber: seqNum, // This now represents the Lamport timestamp
|
|
||||||
Type: entryType,
|
|
||||||
Key: key,
|
|
||||||
Value: value,
|
|
||||||
}
|
|
||||||
w.replicationHook.OnEntryWritten(entry)
|
|
||||||
}
|
|
||||||
|
|
||||||
return seqNum, nil
|
return seqNum, nil
|
||||||
}
|
}
|
||||||
@ -506,15 +469,8 @@ func (w *WAL) AppendBatch(entries []*Entry) (uint64, error) {
|
|||||||
return w.nextSequence, nil
|
return w.nextSequence, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start sequence number for the batch - use Lamport clock if available
|
// Start sequence number for the batch
|
||||||
var startSeqNum uint64
|
startSeqNum := w.nextSequence
|
||||||
if w.clock != nil {
|
|
||||||
// Generate Lamport timestamp for the batch
|
|
||||||
startSeqNum = w.clock.Tick()
|
|
||||||
} else {
|
|
||||||
// Use traditional sequence number
|
|
||||||
startSeqNum = w.nextSequence
|
|
||||||
}
|
|
||||||
|
|
||||||
// Record this as a batch operation with the number of entries
|
// Record this as a batch operation with the number of entries
|
||||||
batchHeader := make([]byte, 1+8+4) // opType(1) + seqNum(8) + entryCount(4)
|
batchHeader := make([]byte, 1+8+4) // opType(1) + seqNum(8) + entryCount(4)
|
||||||
@ -536,17 +492,10 @@ func (w *WAL) AppendBatch(entries []*Entry) (uint64, error) {
|
|||||||
return 0, fmt.Errorf("failed to write batch header: %w", err)
|
return 0, fmt.Errorf("failed to write batch header: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare entries for replication notification
|
|
||||||
entriesForReplication := make([]*Entry, len(entries))
|
|
||||||
|
|
||||||
// Process each entry in the batch
|
// Process each entry in the batch
|
||||||
for i, entry := range entries {
|
for i, entry := range entries {
|
||||||
// Assign sequential sequence numbers to each entry
|
// Assign sequential sequence numbers to each entry
|
||||||
seqNum := startSeqNum + uint64(i)
|
seqNum := startSeqNum + uint64(i)
|
||||||
|
|
||||||
// Save sequence number in the entry for replication
|
|
||||||
entry.SequenceNumber = seqNum
|
|
||||||
entriesForReplication[i] = entry
|
|
||||||
|
|
||||||
// Write the entry
|
// Write the entry
|
||||||
if entry.Value == nil {
|
if entry.Value == nil {
|
||||||
@ -569,11 +518,6 @@ func (w *WAL) AppendBatch(entries []*Entry) (uint64, error) {
|
|||||||
if err := w.maybeSync(); err != nil {
|
if err := w.maybeSync(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify replication hook if available
|
|
||||||
if w.replicationHook != nil {
|
|
||||||
w.replicationHook.OnBatchWritten(entriesForReplication)
|
|
||||||
}
|
|
||||||
|
|
||||||
return startSeqNum, nil
|
return startSeqNum, nil
|
||||||
}
|
}
|
||||||
@ -625,22 +569,6 @@ func (w *WAL) UpdateNextSequence(nextSeq uint64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetReplicationHook sets or updates the replication hook
|
|
||||||
func (w *WAL) SetReplicationHook(hook ReplicationHook) {
|
|
||||||
w.mu.Lock()
|
|
||||||
defer w.mu.Unlock()
|
|
||||||
|
|
||||||
w.replicationHook = hook
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetLamportClock sets or updates the Lamport clock
|
|
||||||
func (w *WAL) SetLamportClock(clock LamportClock) {
|
|
||||||
w.mu.Lock()
|
|
||||||
defer w.mu.Unlock()
|
|
||||||
|
|
||||||
w.clock = clock
|
|
||||||
}
|
|
||||||
|
|
||||||
func min(a, b int) int {
|
func min(a, b int) int {
|
||||||
if a < b {
|
if a < b {
|
||||||
return a
|
return a
|
||||||
|
Loading…
Reference in New Issue
Block a user