fix: handle "WAL is closed" errors during rotation in tight write loops; increase durability

Jeremy Tregunna 2025-04-26 00:15:50 -06:00
parent 7e226825df
commit ae75f2935f
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
3 changed files with 165 additions and 82 deletions


@@ -184,6 +184,7 @@ func runWriteBenchmark(e *engine.EngineFacade) string {
var opsCount int
var consecutiveErrors int
maxConsecutiveErrors := 10
var walRotationCount int
for time.Now().Before(deadline) {
// Process in batches
@@ -200,6 +201,19 @@ func runWriteBenchmark(e *engine.EngineFacade) string {
continue
}
// Handle WAL rotation errors more gracefully
if strings.Contains(err.Error(), "WAL is rotating") ||
strings.Contains(err.Error(), "WAL is closed") {
// These are expected during WAL rotation, just retry after a short delay
walRotationCount++
if walRotationCount % 100 == 0 {
fmt.Printf("Retrying due to WAL rotation (%d retries so far)...\n", walRotationCount)
}
time.Sleep(20 * time.Millisecond)
i-- // Retry this key
continue
}
fmt.Fprintf(os.Stderr, "Write error (key #%d): %v\n", opsCount, err)
consecutiveErrors++
if consecutiveErrors >= maxConsecutiveErrors {
@@ -233,6 +247,7 @@ benchmarkEnd:
result := fmt.Sprintf("\nWrite Benchmark Results:")
result += fmt.Sprintf("\n Status: %s", status)
result += fmt.Sprintf("\n Operations: %d", opsCount)
result += fmt.Sprintf("\n WAL rotation retries: %d", walRotationCount)
result += fmt.Sprintf("\n Data Written: %.2f MB", float64(opsCount)*float64(*valueSize)/(1024*1024))
result += fmt.Sprintf("\n Time: %.2f seconds", elapsed.Seconds())
result += fmt.Sprintf("\n Throughput: %.2f ops/sec (%.2f MB/sec)", opsPerSecond, mbPerSecond)
@@ -524,4 +539,4 @@ func generateKey(counter int) []byte {
// Random key with counter to ensure uniqueness
return []byte(fmt.Sprintf("key-%s-%010d",
strconv.FormatUint(rand.Uint64(), 16), counter))
}
}
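The benchmark detects rotation by substring-matching the error text. Since the WAL package now exports ErrWALRotating and ErrWALClosed as sentinel errors (added in the wal.go hunk further down), a caller could instead match them with errors.Is, provided every layer wraps the error with %w rather than flattening it to a string. A minimal sketch under that assumption; the wal import path is hypothetical:

package bench

import (
	"errors"

	"example.com/project/pkg/wal" // hypothetical import path for the WAL package
)

// isRotationError reports whether err originates from a WAL that is mid-rotation
// or was just closed, assuming the error chain is preserved with %w end to end.
func isRotationError(err error) bool {
	return errors.Is(err, wal.ErrWALRotating) || errors.Is(err, wal.ErrWALClosed)
}

String matching works no matter how intermediate layers wrap the error, which is likely why the benchmark does it that way; errors.Is becomes the tidier option only once the whole chain uses %w.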


@@ -150,29 +150,38 @@ func (m *Manager) Put(key, value []byte) error {
return ErrStorageClosed
}
// Append to WAL
seqNum, err := m.wal.Append(wal.OpTypePut, key, value)
if err != nil {
m.stats.TrackError("wal_append_error")
return fmt.Errorf("failed to append to WAL: %w", err)
}
// Add to MemTable
m.memTablePool.Put(key, value, seqNum)
m.lastSeqNum = seqNum
// Update memtable size estimate
m.stats.TrackMemTableSize(uint64(m.memTablePool.TotalSize()))
// Check if MemTable needs to be flushed
if m.memTablePool.IsFlushNeeded() {
if err := m.scheduleFlush(); err != nil {
m.stats.TrackError("flush_schedule_error")
return fmt.Errorf("failed to schedule flush: %w", err)
// Define the operation with retry support
operation := func() error {
// Append to WAL with retry support
seqNum, err := m.wal.Append(wal.OpTypePut, key, value)
if err != nil {
if err != wal.ErrWALRotating {
m.stats.TrackError("wal_append_error")
return fmt.Errorf("failed to append to WAL: %w", err)
}
return err // Return ErrWALRotating for retry handling
}
// Add to MemTable
m.memTablePool.Put(key, value, seqNum)
m.lastSeqNum = seqNum
// Update memtable size estimate
m.stats.TrackMemTableSize(uint64(m.memTablePool.TotalSize()))
// Check if MemTable needs to be flushed
if m.memTablePool.IsFlushNeeded() {
if flushErr := m.scheduleFlush(); flushErr != nil {
m.stats.TrackError("flush_schedule_error")
return fmt.Errorf("failed to schedule flush: %w", flushErr)
}
}
return nil
}
return nil
// Execute with retry mechanism
return m.RetryOnWALRotating(operation)
}
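Put, Delete, and ApplyBatch all hand their work to m.RetryOnWALRotating, which is referenced here but not shown in this commit. A minimal sketch of what such a helper might look like inside the same manager file; the retry bound and backoff are assumed values, not taken from the source:

// Hypothetical sketch of the retry helper used above; the real implementation
// is not part of this diff, and these constants are guesses.
func (m *Manager) RetryOnWALRotating(operation func() error) error {
	const (
		maxRetries = 100                  // assumed upper bound on retries
		backoff    = 5 * time.Millisecond // assumed delay between attempts
	)
	var err error
	for attempt := 0; attempt < maxRetries; attempt++ {
		err = operation()
		if err != wal.ErrWALRotating {
			return err // success, or an error unrelated to rotation
		}
		time.Sleep(backoff) // give rotateWAL time to swap in the new WAL
	}
	return err // still rotating after all attempts; surface the error
}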
// Get retrieves the value for the given key
@@ -234,29 +243,38 @@ func (m *Manager) Delete(key []byte) error {
return ErrStorageClosed
}
// Append to WAL
seqNum, err := m.wal.Append(wal.OpTypeDelete, key, nil)
if err != nil {
m.stats.TrackError("wal_append_error")
return fmt.Errorf("failed to append to WAL: %w", err)
}
// Add deletion marker to MemTable
m.memTablePool.Delete(key, seqNum)
m.lastSeqNum = seqNum
// Update memtable size estimate
m.stats.TrackMemTableSize(uint64(m.memTablePool.TotalSize()))
// Check if MemTable needs to be flushed
if m.memTablePool.IsFlushNeeded() {
if err := m.scheduleFlush(); err != nil {
m.stats.TrackError("flush_schedule_error")
return fmt.Errorf("failed to schedule flush: %w", err)
// Define the operation with retry support
operation := func() error {
// Append to WAL with retry support
seqNum, err := m.wal.Append(wal.OpTypeDelete, key, nil)
if err != nil {
if err != wal.ErrWALRotating {
m.stats.TrackError("wal_append_error")
return fmt.Errorf("failed to append to WAL: %w", err)
}
return err // Return ErrWALRotating for retry handling
}
// Add deletion marker to MemTable
m.memTablePool.Delete(key, seqNum)
m.lastSeqNum = seqNum
// Update memtable size estimate
m.stats.TrackMemTableSize(uint64(m.memTablePool.TotalSize()))
// Check if MemTable needs to be flushed
if m.memTablePool.IsFlushNeeded() {
if flushErr := m.scheduleFlush(); flushErr != nil {
m.stats.TrackError("flush_schedule_error")
return fmt.Errorf("failed to schedule flush: %w", flushErr)
}
}
return nil
}
return nil
// Execute with retry mechanism
return m.RetryOnWALRotating(operation)
}
// IsDeleted returns true if the key exists and is marked as deleted
@@ -339,39 +357,48 @@ func (m *Manager) ApplyBatch(entries []*wal.Entry) error {
return ErrStorageClosed
}
// Append batch to WAL
startSeqNum, err := m.wal.AppendBatch(entries)
if err != nil {
m.stats.TrackError("wal_append_batch_error")
return fmt.Errorf("failed to append batch to WAL: %w", err)
}
// Apply each entry to the MemTable
for i, entry := range entries {
seqNum := startSeqNum + uint64(i)
switch entry.Type {
case wal.OpTypePut:
m.memTablePool.Put(entry.Key, entry.Value, seqNum)
case wal.OpTypeDelete:
m.memTablePool.Delete(entry.Key, seqNum)
// Define the operation with retry support
operation := func() error {
// Append batch to WAL with retry support
startSeqNum, err := m.wal.AppendBatch(entries)
if err != nil {
if err != wal.ErrWALRotating {
m.stats.TrackError("wal_append_batch_error")
return fmt.Errorf("failed to append batch to WAL: %w", err)
}
return err // Return ErrWALRotating for retry handling
}
m.lastSeqNum = seqNum
}
// Apply each entry to the MemTable
for i, entry := range entries {
seqNum := startSeqNum + uint64(i)
// Update memtable size
m.stats.TrackMemTableSize(uint64(m.memTablePool.TotalSize()))
switch entry.Type {
case wal.OpTypePut:
m.memTablePool.Put(entry.Key, entry.Value, seqNum)
case wal.OpTypeDelete:
m.memTablePool.Delete(entry.Key, seqNum)
}
// Check if MemTable needs to be flushed
if m.memTablePool.IsFlushNeeded() {
if err := m.scheduleFlush(); err != nil {
m.stats.TrackError("flush_schedule_error")
return fmt.Errorf("failed to schedule flush: %w", err)
m.lastSeqNum = seqNum
}
// Update memtable size
m.stats.TrackMemTableSize(uint64(m.memTablePool.TotalSize()))
// Check if MemTable needs to be flushed
if m.memTablePool.IsFlushNeeded() {
if flushErr := m.scheduleFlush(); flushErr != nil {
m.stats.TrackError("flush_schedule_error")
return fmt.Errorf("failed to schedule flush: %w", flushErr)
}
}
return nil
}
return nil
// Execute with retry mechanism
return m.RetryOnWALRotating(operation)
}
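For reference, a caller builds the batch as a slice of *wal.Entry values and submits it in one call. A small illustrative sketch; the keys, values, and helper name are made up, while the field names follow the reads in the loop above:

// Illustrative only: assembling a batch for Manager.ApplyBatch.
func writeUserBatch(m *Manager) error {
	entries := []*wal.Entry{
		{Type: wal.OpTypePut, Key: []byte("user:1"), Value: []byte("alice")},
		{Type: wal.OpTypePut, Key: []byte("user:2"), Value: []byte("bob")},
		{Type: wal.OpTypeDelete, Key: []byte("user:3")}, // a delete carries no value
	}
	// With this commit the call retries internally while the WAL rotates,
	// so an error returned here is not a transient rotation failure.
	return m.ApplyBatch(entries)
}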
// FlushMemTables flushes all immutable MemTables to disk
@@ -501,18 +528,26 @@ func (m *Manager) RotateWAL() error {
// rotateWAL is the internal implementation of RotateWAL
func (m *Manager) rotateWAL() error {
// Close the current WAL
if err := m.wal.Close(); err != nil {
return fmt.Errorf("failed to close WAL: %w", err)
}
// Create a new WAL
wal, err := wal.NewWAL(m.cfg, m.walDir)
// Create a new WAL first before closing the old one
newWAL, err := wal.NewWAL(m.cfg, m.walDir)
if err != nil {
return fmt.Errorf("failed to create new WAL: %w", err)
}
m.wal = wal
// Store the old WAL for proper closure
oldWAL := m.wal
// Atomically update the WAL reference
m.wal = newWAL
// Now close the old WAL after the new one is in place
if err := oldWAL.Close(); err != nil {
// Just log the error but don't fail the rotation
// since we've already switched to the new WAL
m.stats.TrackError("wal_close_error")
fmt.Printf("Warning: error closing old WAL: %v\n", err)
}
return nil
}
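The reordering above (create the new WAL first, swap the reference, then close the old one) keeps concurrent writers from observing a closed WAL for more than an instant; combined with the retry wrapper, a tight write loop should now ride out rotations. A hedged sketch of a test exercising that, where NewManager and testConfig are assumed names for the constructor and test configuration:

// Sketch of a concurrency test for rotation; NewManager and testConfig are
// hypothetical, everything else uses identifiers visible in this diff.
func TestPutDuringRotation(t *testing.T) {
	m, err := NewManager(testConfig(), t.TempDir())
	if err != nil {
		t.Fatal(err)
	}
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 10000; i++ {
			key := []byte(fmt.Sprintf("key-%d", i))
			if putErr := m.Put(key, []byte("value")); putErr != nil {
				t.Errorf("put %d: %v", i, putErr) // retries should absorb rotation errors
				return
			}
		}
	}()
	for i := 0; i < 10; i++ {
		if rotErr := m.RotateWAL(); rotErr != nil {
			t.Fatalf("rotate: %v", rotErr)
		}
	}
	<-done
}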


@@ -45,6 +45,7 @@ var (
ErrInvalidRecordType = errors.New("invalid record type")
ErrInvalidOpType = errors.New("invalid operation type")
ErrWALClosed = errors.New("WAL is closed")
ErrWALRotating = errors.New("WAL is rotating")
ErrWALFull = errors.New("WAL file is full")
)
@@ -59,6 +60,13 @@ type Entry struct {
// Global variable to control whether to print recovery logs
var DisableRecoveryLogs bool = false
// WAL status constants
const (
WALStatusActive = 0
WALStatusRotating = 1
WALStatusClosed = 2
)
// WAL represents a write-ahead log
type WAL struct {
cfg *config.Config
@@ -69,7 +77,7 @@ type WAL struct {
bytesWritten int64
lastSync time.Time
batchByteSize int64
closed bool
status int32 // Using atomic int32 for status flags
mu sync.Mutex
}
@@ -99,6 +107,7 @@ func NewWAL(cfg *config.Config, dir string) (*WAL, error) {
writer: bufio.NewWriterSize(file, 64*1024), // 64KB buffer
nextSequence: 1,
lastSync: time.Now(),
status: WALStatusActive,
}
return wal, nil
@@ -169,6 +178,7 @@ func ReuseWAL(cfg *config.Config, dir string, nextSeq uint64) (*WAL, error) {
nextSequence: nextSeq,
bytesWritten: stat.Size(),
lastSync: time.Now(),
status: WALStatusActive,
}
return wal, nil
@@ -179,8 +189,11 @@ func (w *WAL) Append(entryType uint8, key, value []byte) (uint64, error) {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
status := atomic.LoadInt32(&w.status)
if status == WALStatusClosed {
return 0, ErrWALClosed
} else if status == WALStatusRotating {
return 0, ErrWALRotating
}
if entryType != OpTypePut && entryType != OpTypeDelete && entryType != OpTypeMerge {
@@ -409,8 +422,11 @@ func (w *WAL) maybeSync() error {
// syncLocked performs the sync operation assuming the mutex is already held
func (w *WAL) syncLocked() error {
if w.closed {
status := atomic.LoadInt32(&w.status)
if status == WALStatusClosed {
return ErrWALClosed
} else if status == WALStatusRotating {
return ErrWALRotating
}
if err := w.writer.Flush(); err != nil {
@@ -440,8 +456,11 @@ func (w *WAL) AppendBatch(entries []*Entry) (uint64, error) {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
status := atomic.LoadInt32(&w.status)
if status == WALStatusClosed {
return 0, ErrWALClosed
} else if status == WALStatusRotating {
return 0, ErrWALRotating
}
if len(entries) == 0 {
@@ -506,12 +525,16 @@ func (w *WAL) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
status := atomic.LoadInt32(&w.status)
if status == WALStatusClosed {
return nil
}
// Mark as rotating first to block new operations
atomic.StoreInt32(&w.status, WALStatusRotating)
// Use syncLocked to flush and sync
if err := w.syncLocked(); err != nil {
if err := w.syncLocked(); err != nil && err != ErrWALRotating {
return err
}
@@ -519,10 +542,20 @@
return fmt.Errorf("failed to close WAL file: %w", err)
}
w.closed = true
atomic.StoreInt32(&w.status, WALStatusClosed)
return nil
}
// SetRotating marks the WAL as rotating
func (w *WAL) SetRotating() {
atomic.StoreInt32(&w.status, WALStatusRotating)
}
// SetActive marks the WAL as active
func (w *WAL) SetActive() {
atomic.StoreInt32(&w.status, WALStatusActive)
}
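SetRotating and SetActive are added here but have no callers in this commit; presumably they let the storage manager fence off the old WAL before swapping in a new one, so that Append fails fast with ErrWALRotating and the retry wrapper picks it up. A hedged sketch of how a caller might use them; the function name and exact ordering are assumptions:

// Hypothetical caller-side use of the new status setters; rotateWAL in this
// commit does not call them, so the sequence below is an assumption.
func (m *Manager) rotateWALWithFence() error {
	m.wal.SetRotating() // new appends now fail fast with ErrWALRotating

	newWAL, err := wal.NewWAL(m.cfg, m.walDir)
	if err != nil {
		m.wal.SetActive() // rotation failed; let writers resume on the old WAL
		return fmt.Errorf("failed to create new WAL: %w", err)
	}

	oldWAL := m.wal
	m.wal = newWAL // retried writers land on the new WAL from here on

	if closeErr := oldWAL.Close(); closeErr != nil {
		m.stats.TrackError("wal_close_error") // log-and-continue, matching rotateWAL above
	}
	return nil
}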
// UpdateNextSequence sets the next sequence number for the WAL
// This is used after recovery to ensure new entries have increasing sequence numbers
func (w *WAL) UpdateNextSequence(nextSeq uint64) {