feat: enhance WAL recovery statistics

Jeremy Tregunna 2025-04-22 13:25:26 -06:00
parent 3b3d1c27a4
commit e7974e008d
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
6 changed files with 204 additions and 86 deletions


@@ -12,6 +12,7 @@ import (
"strings"
"syscall"
"time"
"unicode"
"github.com/chzyer/readline"
@@ -411,15 +412,89 @@ func runInteractive(eng *engine.Engine, dbPath string) {
// Print statistics
stats := eng.GetStats()
fmt.Println("Database Statistics:")
fmt.Printf(" Operations: %d puts, %d gets (%d hits, %d misses), %d deletes\n",
stats["put_ops"], stats["get_ops"], stats["get_hits"], stats["get_misses"], stats["delete_ops"])
fmt.Printf(" Transactions: %d started, %d committed, %d aborted\n",
stats["tx_started"], stats["tx_completed"], stats["tx_aborted"])
fmt.Printf(" Storage: %d bytes read, %d bytes written, %d flushes\n",
stats["total_bytes_read"], stats["total_bytes_written"], stats["flush_count"])
fmt.Printf(" Tables: %d sstables, %d immutable memtables\n",
stats["sstable_count"], stats["immutable_memtable_count"])
// Format human-readable times for the last operation timestamps
var lastPutTime, lastGetTime, lastDeleteTime time.Time
if putTime, ok := stats["last_put_time"].(int64); ok && putTime > 0 {
lastPutTime = time.Unix(0, putTime)
}
if getTime, ok := stats["last_get_time"].(int64); ok && getTime > 0 {
lastGetTime = time.Unix(0, getTime)
}
if deleteTime, ok := stats["last_delete_time"].(int64); ok && deleteTime > 0 {
lastDeleteTime = time.Unix(0, deleteTime)
}
// Operations section
fmt.Println("📊 Operations:")
fmt.Printf(" • Puts: %d\n", stats["put_ops"])
fmt.Printf(" • Gets: %d (Hits: %d, Misses: %d)\n", stats["get_ops"], stats["get_hits"], stats["get_misses"])
fmt.Printf(" • Deletes: %d\n", stats["delete_ops"])
// Last Operation Times
fmt.Println("\n⏱ Last Operation Times:")
if !lastPutTime.IsZero() {
fmt.Printf(" • Last Put: %s\n", lastPutTime.Format(time.RFC3339))
} else {
fmt.Printf(" • Last Put: Never\n")
}
if !lastGetTime.IsZero() {
fmt.Printf(" • Last Get: %s\n", lastGetTime.Format(time.RFC3339))
} else {
fmt.Printf(" • Last Get: Never\n")
}
if !lastDeleteTime.IsZero() {
fmt.Printf(" • Last Delete: %s\n", lastDeleteTime.Format(time.RFC3339))
} else {
fmt.Printf(" • Last Delete: Never\n")
}
// Transactions
fmt.Println("\n💼 Transactions:")
fmt.Printf(" • Started: %d\n", stats["tx_started"])
fmt.Printf(" • Completed: %d\n", stats["tx_completed"])
fmt.Printf(" • Aborted: %d\n", stats["tx_aborted"])
// Storage metrics
fmt.Println("\n💾 Storage:")
fmt.Printf(" • Total Bytes Read: %d\n", stats["total_bytes_read"])
fmt.Printf(" • Total Bytes Written: %d\n", stats["total_bytes_written"])
fmt.Printf(" • Flush Count: %d\n", stats["flush_count"])
// Table stats
fmt.Println("\n📋 Tables:")
fmt.Printf(" • SSTable Count: %d\n", stats["sstable_count"])
fmt.Printf(" • Immutable MemTable Count: %d\n", stats["immutable_memtable_count"])
fmt.Printf(" • Current MemTable Size: %d bytes\n", stats["memtable_size"])
// WAL recovery stats
fmt.Println("\n🔄 WAL Recovery:")
fmt.Printf(" • Files Recovered: %d\n", stats["wal_files_recovered"])
fmt.Printf(" • Entries Recovered: %d\n", stats["wal_entries_recovered"])
fmt.Printf(" • Corrupted Entries: %d\n", stats["wal_corrupted_entries"])
if recoveryDuration, ok := stats["wal_recovery_duration_ms"]; ok {
fmt.Printf(" • Recovery Duration: %d ms\n", recoveryDuration)
}
// Error counts
fmt.Println("\n⚠ Errors:")
fmt.Printf(" • Read Errors: %d\n", stats["read_errors"])
fmt.Printf(" • Write Errors: %d\n", stats["write_errors"])
// Compaction stats (if available)
if compactionOutputCount, ok := stats["compaction_last_outputs_count"]; ok {
fmt.Println("\n🧹 Compaction:")
fmt.Printf(" • Last Output Files Count: %d\n", compactionOutputCount)
// Display other compaction stats as available
for key, value := range stats {
if strings.HasPrefix(key, "compaction_") && key != "compaction_last_outputs_count" && key != "compaction_last_outputs" {
// Format the key for display (remove prefix, replace underscores with spaces)
displayKey := toTitle(strings.Replace(strings.TrimPrefix(key, "compaction_"), "_", " ", -1))
fmt.Printf(" • %s: %v\n", displayKey, value)
}
}
}
case ".flush":
if eng == nil {
@@ -734,4 +809,20 @@ func makeKeySuccessor(prefix []byte) []byte {
copy(successor, prefix)
successor[len(prefix)] = 0xFF
return successor
}
// toTitle replaces strings.Title which is deprecated
// It converts the first character of each word to title case
func toTitle(s string) string {
prev := ' '
return strings.Map(
func(r rune) rune {
if unicode.IsSpace(prev) || unicode.IsPunct(prev) {
prev = r
return unicode.ToTitle(r)
}
prev = r
return r
},
s)
}
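
For reference, a minimal sketch of how the new toTitle helper formats a compaction stat key for display; the key name below is illustrative, not an actual engine stat:

// Sketch only: "compaction_last_duration_ms" is a hypothetical key, used to
// illustrate the prefix-strip / underscore-replace / title-case pipeline above.
key := "compaction_last_duration_ms"
display := toTitle(strings.Replace(strings.TrimPrefix(key, "compaction_"), "_", " ", -1))
fmt.Println(display) // prints "Last Duration Ms"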


@@ -61,6 +61,12 @@ type EngineStats struct {
TxCompleted atomic.Uint64
TxAborted atomic.Uint64
// Recovery stats
WALFilesRecovered atomic.Uint64
WALEntriesRecovered atomic.Uint64
WALCorruptedEntries atomic.Uint64
WALRecoveryDuration atomic.Int64 // nanoseconds
// Mutex for accessing non-atomic fields
mu sync.RWMutex
}
@@ -666,21 +672,22 @@ func (e *Engine) loadSSTables() error {
// recoverFromWAL recovers memtables from existing WAL files
func (e *Engine) recoverFromWAL() error {
startTime := time.Now()
// Check if WAL directory exists
if _, err := os.Stat(e.walDir); os.IsNotExist(err) {
return nil // No WAL directory, nothing to recover
}
// List all WAL files for diagnostic purposes
// List all WAL files
walFiles, err := wal.FindWALFiles(e.walDir)
if err != nil {
if !wal.DisableRecoveryLogs {
fmt.Printf("Error listing WAL files: %v\n", err)
}
} else {
if !wal.DisableRecoveryLogs {
fmt.Printf("Found %d WAL files: %v\n", len(walFiles), walFiles)
}
e.stats.ReadErrors.Add(1)
return fmt.Errorf("error listing WAL files: %w", err)
}
if len(walFiles) > 0 {
e.stats.WALFilesRecovered.Add(uint64(len(walFiles)))
}
// Get recovery options
@@ -690,17 +697,11 @@ func (e *Engine) recoverFromWAL() error {
memTables, maxSeqNum, err := memtable.RecoverFromWAL(e.cfg, recoveryOpts)
if err != nil {
// If recovery fails, let's try cleaning up WAL files
if !wal.DisableRecoveryLogs {
fmt.Printf("WAL recovery failed: %v\n", err)
fmt.Printf("Attempting to recover by cleaning up WAL files...\n")
}
e.stats.ReadErrors.Add(1)
// Create a backup directory
backupDir := filepath.Join(e.walDir, "backup_"+time.Now().Format("20060102_150405"))
if err := os.MkdirAll(backupDir, 0755); err != nil {
if !wal.DisableRecoveryLogs {
fmt.Printf("Failed to create backup directory: %v\n", err)
}
return fmt.Errorf("failed to recover from WAL: %w", err)
}
@@ -708,11 +709,7 @@ func (e *Engine) recoverFromWAL() error {
for _, walFile := range walFiles {
destFile := filepath.Join(backupDir, filepath.Base(walFile))
if err := os.Rename(walFile, destFile); err != nil {
if !wal.DisableRecoveryLogs {
fmt.Printf("Failed to move WAL file %s: %v\n", walFile, err)
}
} else if !wal.DisableRecoveryLogs {
fmt.Printf("Moved problematic WAL file to %s\n", destFile)
e.stats.ReadErrors.Add(1)
}
}
@@ -723,15 +720,28 @@
}
e.wal = newWal
// No memtables to recover, starting fresh
if !wal.DisableRecoveryLogs {
fmt.Printf("Starting with a fresh WAL after recovery failure\n")
}
// Record recovery duration
e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
return nil
}
// Update recovery statistics based on actual entries recovered
if len(walFiles) > 0 {
// Replay the WAL directory again via ReplayWALDir just to gather stats
recoveryStats, statErr := wal.ReplayWALDir(e.cfg.WALDir, func(entry *wal.Entry) error {
return nil // Just counting, not processing
})
if statErr == nil && recoveryStats != nil {
e.stats.WALEntriesRecovered.Add(recoveryStats.EntriesProcessed)
e.stats.WALCorruptedEntries.Add(recoveryStats.EntriesSkipped)
}
}
// No memtables recovered or empty WAL
if len(memTables) == 0 {
// Record recovery duration
e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
return nil
}
@@ -755,10 +765,9 @@
}
}
if !wal.DisableRecoveryLogs {
fmt.Printf("Recovered %d memtables from WAL with max sequence number %d\n",
len(memTables), maxSeqNum)
}
// Record recovery stats
e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
return nil
}
@@ -924,6 +933,15 @@ func (e *Engine) GetStats() map[string]interface{} {
// Add error statistics
stats["read_errors"] = e.stats.ReadErrors.Load()
stats["write_errors"] = e.stats.WriteErrors.Load()
// Add WAL recovery statistics
stats["wal_files_recovered"] = e.stats.WALFilesRecovered.Load()
stats["wal_entries_recovered"] = e.stats.WALEntriesRecovered.Load()
stats["wal_corrupted_entries"] = e.stats.WALCorruptedEntries.Load()
recoveryDuration := e.stats.WALRecoveryDuration.Load()
if recoveryDuration > 0 {
stats["wal_recovery_duration_ms"] = recoveryDuration / int64(time.Millisecond)
}
// Add timing information
e.stats.mu.RLock()
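
The new counters surface through GetStats like any other stat; a minimal sketch of reading them back, assuming an already-opened *engine.Engine (as in the CLI changes above):

// Sketch only: eng is assumed to be an initialized *engine.Engine.
stats := eng.GetStats()
fmt.Printf("WAL recovery: %v files, %v entries, %v corrupted\n",
	stats["wal_files_recovered"], stats["wal_entries_recovered"], stats["wal_corrupted_entries"])
// wal_recovery_duration_ms is only set once a recovery has run; it is derived
// from the nanosecond value stored in WALRecoveryDuration.
if ms, ok := stats["wal_recovery_duration_ms"]; ok {
	fmt.Printf("WAL recovery took %v ms\n", ms)
}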


@@ -31,7 +31,7 @@ func DefaultRecoveryOptions(cfg *config.Config) *RecoveryOptions {
}
// RecoverFromWAL rebuilds MemTables from the write-ahead log
// Returns a list of recovered MemTables and the maximum sequence number seen
// Returns a list of recovered MemTables and the maximum sequence number seen; recovery stats are gathered separately by the engine via wal.ReplayWALDir
func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uint64, error) {
if opts == nil {
opts = DefaultRecoveryOptions(cfg)
@@ -76,10 +76,13 @@ func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uint64, error) {
}
// Replay the WAL directory
if err := wal.ReplayWALDir(cfg.WALDir, entryHandler); err != nil {
_, err := wal.ReplayWALDir(cfg.WALDir, entryHandler)
if err != nil {
return nil, 0, fmt.Errorf("failed to replay WAL: %w", err)
}
// Stats will be captured in the engine directly
// For batch operations, we need to adjust maxSeqNum
finalTable := memTables[len(memTables)-1]
nextSeq := finalTable.GetNextSequenceNumber()
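
A minimal caller sketch for the updated recovery path, assuming a previously loaded *config.Config; passing nil options falls back to DefaultRecoveryOptions as shown above:

// Sketch only: cfg is an assumed, already-loaded *config.Config.
memTables, maxSeqNum, err := memtable.RecoverFromWAL(cfg, nil)
if err != nil {
	log.Fatalf("WAL recovery failed: %v", err)
}
fmt.Printf("recovered %d memtables, max sequence number %d\n", len(memTables), maxSeqNum)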


@@ -75,7 +75,7 @@ func TestBatchEncoding(t *testing.T) {
// Replay and decode
var decodedBatch *Batch
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypeBatch {
var err error
decodedBatch, err = DecodeBatch(entry)


@@ -229,6 +229,17 @@ func (r *Reader) Close() error {
// EntryHandler is a function that processes WAL entries during replay
type EntryHandler func(*Entry) error
// RecoveryStats tracks statistics about WAL recovery
type RecoveryStats struct {
EntriesProcessed uint64
EntriesSkipped uint64
}
// NewRecoveryStats creates a new RecoveryStats instance
func NewRecoveryStats() *RecoveryStats {
return &RecoveryStats{}
}
// FindWALFiles returns a list of WAL files in the given directory
func FindWALFiles(dir string) ([]string, error) {
pattern := filepath.Join(dir, "*.wal")
@@ -267,16 +278,15 @@ func getEntryCount(path string) int {
return count
}
func ReplayWALFile(path string, handler EntryHandler) error {
func ReplayWALFile(path string, handler EntryHandler) (*RecoveryStats, error) {
reader, err := OpenReader(path)
if err != nil {
return err
return nil, err
}
defer reader.Close()
// Track statistics for reporting
entriesProcessed := 0
entriesSkipped := 0
// Track statistics
stats := NewRecoveryStats()
for {
entry, err := reader.ReadEntry()
@@ -290,14 +300,11 @@ func ReplayWALFile(path string, handler EntryHandler) error {
if strings.Contains(err.Error(), "corrupt") ||
strings.Contains(err.Error(), "invalid") {
// Skip this corrupted entry
if !DisableRecoveryLogs {
fmt.Printf("Skipping corrupted entry in %s: %v\n", path, err)
}
entriesSkipped++
stats.EntriesSkipped++
// If we've seen too many corrupted entries in a row, give up on this file
if entriesSkipped > 5 && entriesProcessed == 0 {
return fmt.Errorf("too many corrupted entries at start of file %s", path)
if stats.EntriesSkipped > 5 && stats.EntriesProcessed == 0 {
return stats, fmt.Errorf("too many corrupted entries at start of file %s", path)
}
// Try to recover by scanning ahead
@@ -310,7 +317,7 @@ func ReplayWALFile(path string, handler EntryHandler) error {
break
}
// Couldn't recover
return fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
return stats, fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
}
// Successfully recovered, continue to the next entry
@@ -318,23 +325,18 @@ func ReplayWALFile(path string, handler EntryHandler) error {
}
// For other errors, fail the replay
return fmt.Errorf("error reading entry from %s: %w", path, err)
return stats, fmt.Errorf("error reading entry from %s: %w", path, err)
}
// Process the entry
if err := handler(entry); err != nil {
return fmt.Errorf("error handling entry: %w", err)
return stats, fmt.Errorf("error handling entry: %w", err)
}
entriesProcessed++
stats.EntriesProcessed++
}
if !DisableRecoveryLogs {
fmt.Printf("Processed %d entries from %s (skipped %d corrupted entries)\n",
entriesProcessed, path, entriesSkipped)
}
return nil
return stats, nil
}
// recoverFromCorruption attempts to recover from a corrupted record by scanning ahead
@@ -356,54 +358,58 @@ func recoverFromCorruption(reader *Reader) error {
}
// ReplayWALDir replays all WAL files in the given directory in order
func ReplayWALDir(dir string, handler EntryHandler) error {
func ReplayWALDir(dir string, handler EntryHandler) (*RecoveryStats, error) {
files, err := FindWALFiles(dir)
if err != nil {
return err
return nil, err
}
// Track overall recovery stats
totalStats := NewRecoveryStats()
// Track number of files processed successfully
successfulFiles := 0
var lastErr error
// Try to process each file, but continue on recoverable errors
for _, file := range files {
err := ReplayWALFile(file, handler)
fileStats, err := ReplayWALFile(file, handler)
if err != nil {
if !DisableRecoveryLogs {
fmt.Printf("Error processing WAL file %s: %v\n", file, err)
}
// Record the error, but continue
lastErr = err
// If we got some stats from the file before the error, add them to our totals
if fileStats != nil {
totalStats.EntriesProcessed += fileStats.EntriesProcessed
totalStats.EntriesSkipped += fileStats.EntriesSkipped
}
// Check if this is a file-level error or just a corrupt record
if !strings.Contains(err.Error(), "corrupt") &&
!strings.Contains(err.Error(), "invalid") {
return fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
return totalStats, fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
}
// Continue to the next file for corrupt/invalid errors
continue
}
if !DisableRecoveryLogs {
fmt.Printf("Processed %d entries from %s (skipped 0 corrupted entries)\n",
getEntryCount(file), file)
}
// Add stats from this file to our totals
totalStats.EntriesProcessed += fileStats.EntriesProcessed
totalStats.EntriesSkipped += fileStats.EntriesSkipped
successfulFiles++
}
// If we processed at least one file successfully, the WAL recovery is considered successful
if successfulFiles > 0 {
return nil
return totalStats, nil
}
// If no files were processed successfully and we had errors, return the last error
if lastErr != nil {
return fmt.Errorf("failed to process any WAL files: %w", lastErr)
return totalStats, fmt.Errorf("failed to process any WAL files: %w", lastErr)
}
return nil
return totalStats, nil
}
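
Callers of ReplayWALDir (and ReplayWALFile) now receive a *RecoveryStats alongside the error; a minimal sketch of the updated call pattern, mirroring how the engine counts entries without applying them (walDir is an assumed directory path):

// Sketch only: walDir is an assumed path to a WAL directory.
stats, err := wal.ReplayWALDir(walDir, func(entry *wal.Entry) error {
	return nil // counting only; entries are not applied
})
if err != nil {
	log.Fatalf("WAL replay failed: %v", err)
}
fmt.Printf("replayed %d entries, skipped %d corrupted\n",
	stats.EntriesProcessed, stats.EntriesSkipped)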


@@ -56,7 +56,7 @@ func TestWALWrite(t *testing.T) {
// Verify entries by replaying
entries := make(map[string]string)
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut {
entries[string(entry.Key)] = string(entry.Value)
} else if entry.Type == OpTypeDelete {
@@ -115,7 +115,7 @@ func TestWALDelete(t *testing.T) {
// Verify entries by replaying
var deleted bool
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut && bytes.Equal(entry.Key, key) {
if deleted {
deleted = false // Key was re-added
@@ -171,7 +171,7 @@ func TestWALLargeEntry(t *testing.T) {
// Verify by replaying
var foundLargeEntry bool
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut && len(entry.Key) == len(key) && len(entry.Value) == len(value) {
// Verify key
for i := range key {
@@ -240,7 +240,7 @@ func TestWALBatch(t *testing.T) {
entries := make(map[string]string)
batchCount := 0
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypeBatch {
batchCount++
@@ -336,7 +336,7 @@ func TestWALRecovery(t *testing.T) {
// Verify entries by replaying all WAL files in order
entries := make(map[string]string)
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut {
entries[string(entry.Key)] = string(entry.Value)
} else if entry.Type == OpTypeDelete {
@@ -410,7 +410,7 @@ func TestWALSyncModes(t *testing.T) {
// Verify entries by replaying
count := 0
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut {
count++
}
@@ -471,7 +471,7 @@ func TestWALFragmentation(t *testing.T) {
var reconstructedValue []byte
var foundPut bool
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut {
foundPut = true
reconstructedKey = entry.Key
@@ -580,7 +580,7 @@ func TestWALErrorHandling(t *testing.T) {
// Try to replay a non-existent file
nonExistentPath := filepath.Join(dir, "nonexistent.wal")
err = ReplayWALFile(nonExistentPath, func(entry *Entry) error {
_, err = ReplayWALFile(nonExistentPath, func(entry *Entry) error {
return nil
})