Compare commits
1 Commits
a0a1c0512f
...
5a926633bb
Author | SHA1 | Date | |
---|---|---|---|
5a926633bb |
109
cmd/kevo/main.go
109
cmd/kevo/main.go
@ -12,6 +12,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
"unicode"
|
||||||
|
|
||||||
"github.com/chzyer/readline"
|
"github.com/chzyer/readline"
|
||||||
|
|
||||||
@ -411,15 +412,89 @@ func runInteractive(eng *engine.Engine, dbPath string) {
|
|||||||
|
|
||||||
// Print statistics
|
// Print statistics
|
||||||
stats := eng.GetStats()
|
stats := eng.GetStats()
|
||||||
fmt.Println("Database Statistics:")
|
|
||||||
fmt.Printf(" Operations: %d puts, %d gets (%d hits, %d misses), %d deletes\n",
|
// Format human-readable time for the last operation timestamps
|
||||||
stats["put_ops"], stats["get_ops"], stats["get_hits"], stats["get_misses"], stats["delete_ops"])
|
var lastPutTime, lastGetTime, lastDeleteTime time.Time
|
||||||
fmt.Printf(" Transactions: %d started, %d committed, %d aborted\n",
|
if putTime, ok := stats["last_put_time"].(int64); ok && putTime > 0 {
|
||||||
stats["tx_started"], stats["tx_completed"], stats["tx_aborted"])
|
lastPutTime = time.Unix(0, putTime)
|
||||||
fmt.Printf(" Storage: %d bytes read, %d bytes written, %d flushes\n",
|
}
|
||||||
stats["total_bytes_read"], stats["total_bytes_written"], stats["flush_count"])
|
if getTime, ok := stats["last_get_time"].(int64); ok && getTime > 0 {
|
||||||
fmt.Printf(" Tables: %d sstables, %d immutable memtables\n",
|
lastGetTime = time.Unix(0, getTime)
|
||||||
stats["sstable_count"], stats["immutable_memtable_count"])
|
}
|
||||||
|
if deleteTime, ok := stats["last_delete_time"].(int64); ok && deleteTime > 0 {
|
||||||
|
lastDeleteTime = time.Unix(0, deleteTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Operations section
|
||||||
|
fmt.Println("📊 Operations:")
|
||||||
|
fmt.Printf(" • Puts: %d\n", stats["put_ops"])
|
||||||
|
fmt.Printf(" • Gets: %d (Hits: %d, Misses: %d)\n", stats["get_ops"], stats["get_hits"], stats["get_misses"])
|
||||||
|
fmt.Printf(" • Deletes: %d\n", stats["delete_ops"])
|
||||||
|
|
||||||
|
// Last Operation Times
|
||||||
|
fmt.Println("\n⏱️ Last Operation Times:")
|
||||||
|
if !lastPutTime.IsZero() {
|
||||||
|
fmt.Printf(" • Last Put: %s\n", lastPutTime.Format(time.RFC3339))
|
||||||
|
} else {
|
||||||
|
fmt.Printf(" • Last Put: Never\n")
|
||||||
|
}
|
||||||
|
if !lastGetTime.IsZero() {
|
||||||
|
fmt.Printf(" • Last Get: %s\n", lastGetTime.Format(time.RFC3339))
|
||||||
|
} else {
|
||||||
|
fmt.Printf(" • Last Get: Never\n")
|
||||||
|
}
|
||||||
|
if !lastDeleteTime.IsZero() {
|
||||||
|
fmt.Printf(" • Last Delete: %s\n", lastDeleteTime.Format(time.RFC3339))
|
||||||
|
} else {
|
||||||
|
fmt.Printf(" • Last Delete: Never\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transactions
|
||||||
|
fmt.Println("\n💼 Transactions:")
|
||||||
|
fmt.Printf(" • Started: %d\n", stats["tx_started"])
|
||||||
|
fmt.Printf(" • Completed: %d\n", stats["tx_completed"])
|
||||||
|
fmt.Printf(" • Aborted: %d\n", stats["tx_aborted"])
|
||||||
|
|
||||||
|
// Storage metrics
|
||||||
|
fmt.Println("\n💾 Storage:")
|
||||||
|
fmt.Printf(" • Total Bytes Read: %d\n", stats["total_bytes_read"])
|
||||||
|
fmt.Printf(" • Total Bytes Written: %d\n", stats["total_bytes_written"])
|
||||||
|
fmt.Printf(" • Flush Count: %d\n", stats["flush_count"])
|
||||||
|
|
||||||
|
// Table stats
|
||||||
|
fmt.Println("\n📋 Tables:")
|
||||||
|
fmt.Printf(" • SSTable Count: %d\n", stats["sstable_count"])
|
||||||
|
fmt.Printf(" • Immutable MemTable Count: %d\n", stats["immutable_memtable_count"])
|
||||||
|
fmt.Printf(" • Current MemTable Size: %d bytes\n", stats["memtable_size"])
|
||||||
|
|
||||||
|
// WAL recovery stats
|
||||||
|
fmt.Println("\n🔄 WAL Recovery:")
|
||||||
|
fmt.Printf(" • Files Recovered: %d\n", stats["wal_files_recovered"])
|
||||||
|
fmt.Printf(" • Entries Recovered: %d\n", stats["wal_entries_recovered"])
|
||||||
|
fmt.Printf(" • Corrupted Entries: %d\n", stats["wal_corrupted_entries"])
|
||||||
|
if recoveryDuration, ok := stats["wal_recovery_duration_ms"]; ok {
|
||||||
|
fmt.Printf(" • Recovery Duration: %d ms\n", recoveryDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error counts
|
||||||
|
fmt.Println("\n⚠️ Errors:")
|
||||||
|
fmt.Printf(" • Read Errors: %d\n", stats["read_errors"])
|
||||||
|
fmt.Printf(" • Write Errors: %d\n", stats["write_errors"])
|
||||||
|
|
||||||
|
// Compaction stats (if available)
|
||||||
|
if compactionOutputCount, ok := stats["compaction_last_outputs_count"]; ok {
|
||||||
|
fmt.Println("\n🧹 Compaction:")
|
||||||
|
fmt.Printf(" • Last Output Files Count: %d\n", compactionOutputCount)
|
||||||
|
|
||||||
|
// Display other compaction stats as available
|
||||||
|
for key, value := range stats {
|
||||||
|
if strings.HasPrefix(key, "compaction_") && key != "compaction_last_outputs_count" && key != "compaction_last_outputs" {
|
||||||
|
// Format the key for display (remove prefix, replace underscores with spaces)
|
||||||
|
displayKey := toTitle(strings.Replace(strings.TrimPrefix(key, "compaction_"), "_", " ", -1))
|
||||||
|
fmt.Printf(" • %s: %v\n", displayKey, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
case ".flush":
|
case ".flush":
|
||||||
if eng == nil {
|
if eng == nil {
|
||||||
@ -735,3 +810,19 @@ func makeKeySuccessor(prefix []byte) []byte {
|
|||||||
successor[len(prefix)] = 0xFF
|
successor[len(prefix)] = 0xFF
|
||||||
return successor
|
return successor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// toTitle replaces strings.Title which is deprecated
|
||||||
|
// It converts the first character of each word to title case
|
||||||
|
func toTitle(s string) string {
|
||||||
|
prev := ' '
|
||||||
|
return strings.Map(
|
||||||
|
func(r rune) rune {
|
||||||
|
if unicode.IsSpace(prev) || unicode.IsPunct(prev) {
|
||||||
|
prev = r
|
||||||
|
return unicode.ToTitle(r)
|
||||||
|
}
|
||||||
|
prev = r
|
||||||
|
return r
|
||||||
|
},
|
||||||
|
s)
|
||||||
|
}
|
@ -61,6 +61,12 @@ type EngineStats struct {
|
|||||||
TxCompleted atomic.Uint64
|
TxCompleted atomic.Uint64
|
||||||
TxAborted atomic.Uint64
|
TxAborted atomic.Uint64
|
||||||
|
|
||||||
|
// Recovery stats
|
||||||
|
WALFilesRecovered atomic.Uint64
|
||||||
|
WALEntriesRecovered atomic.Uint64
|
||||||
|
WALCorruptedEntries atomic.Uint64
|
||||||
|
WALRecoveryDuration atomic.Int64 // nanoseconds
|
||||||
|
|
||||||
// Mutex for accessing non-atomic fields
|
// Mutex for accessing non-atomic fields
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
@ -666,21 +672,22 @@ func (e *Engine) loadSSTables() error {
|
|||||||
|
|
||||||
// recoverFromWAL recovers memtables from existing WAL files
|
// recoverFromWAL recovers memtables from existing WAL files
|
||||||
func (e *Engine) recoverFromWAL() error {
|
func (e *Engine) recoverFromWAL() error {
|
||||||
|
startTime := time.Now()
|
||||||
|
|
||||||
// Check if WAL directory exists
|
// Check if WAL directory exists
|
||||||
if _, err := os.Stat(e.walDir); os.IsNotExist(err) {
|
if _, err := os.Stat(e.walDir); os.IsNotExist(err) {
|
||||||
return nil // No WAL directory, nothing to recover
|
return nil // No WAL directory, nothing to recover
|
||||||
}
|
}
|
||||||
|
|
||||||
// List all WAL files for diagnostic purposes
|
// List all WAL files
|
||||||
walFiles, err := wal.FindWALFiles(e.walDir)
|
walFiles, err := wal.FindWALFiles(e.walDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !wal.DisableRecoveryLogs {
|
e.stats.ReadErrors.Add(1)
|
||||||
fmt.Printf("Error listing WAL files: %v\n", err)
|
return fmt.Errorf("error listing WAL files: %w", err)
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if !wal.DisableRecoveryLogs {
|
|
||||||
fmt.Printf("Found %d WAL files: %v\n", len(walFiles), walFiles)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(walFiles) > 0 {
|
||||||
|
e.stats.WALFilesRecovered.Add(uint64(len(walFiles)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get recovery options
|
// Get recovery options
|
||||||
@ -690,17 +697,11 @@ func (e *Engine) recoverFromWAL() error {
|
|||||||
memTables, maxSeqNum, err := memtable.RecoverFromWAL(e.cfg, recoveryOpts)
|
memTables, maxSeqNum, err := memtable.RecoverFromWAL(e.cfg, recoveryOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If recovery fails, let's try cleaning up WAL files
|
// If recovery fails, let's try cleaning up WAL files
|
||||||
if !wal.DisableRecoveryLogs {
|
e.stats.ReadErrors.Add(1)
|
||||||
fmt.Printf("WAL recovery failed: %v\n", err)
|
|
||||||
fmt.Printf("Attempting to recover by cleaning up WAL files...\n")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a backup directory
|
// Create a backup directory
|
||||||
backupDir := filepath.Join(e.walDir, "backup_"+time.Now().Format("20060102_150405"))
|
backupDir := filepath.Join(e.walDir, "backup_"+time.Now().Format("20060102_150405"))
|
||||||
if err := os.MkdirAll(backupDir, 0755); err != nil {
|
if err := os.MkdirAll(backupDir, 0755); err != nil {
|
||||||
if !wal.DisableRecoveryLogs {
|
|
||||||
fmt.Printf("Failed to create backup directory: %v\n", err)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("failed to recover from WAL: %w", err)
|
return fmt.Errorf("failed to recover from WAL: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -708,11 +709,7 @@ func (e *Engine) recoverFromWAL() error {
|
|||||||
for _, walFile := range walFiles {
|
for _, walFile := range walFiles {
|
||||||
destFile := filepath.Join(backupDir, filepath.Base(walFile))
|
destFile := filepath.Join(backupDir, filepath.Base(walFile))
|
||||||
if err := os.Rename(walFile, destFile); err != nil {
|
if err := os.Rename(walFile, destFile); err != nil {
|
||||||
if !wal.DisableRecoveryLogs {
|
e.stats.ReadErrors.Add(1)
|
||||||
fmt.Printf("Failed to move WAL file %s: %v\n", walFile, err)
|
|
||||||
}
|
|
||||||
} else if !wal.DisableRecoveryLogs {
|
|
||||||
fmt.Printf("Moved problematic WAL file to %s\n", destFile)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -723,15 +720,28 @@ func (e *Engine) recoverFromWAL() error {
|
|||||||
}
|
}
|
||||||
e.wal = newWal
|
e.wal = newWal
|
||||||
|
|
||||||
// No memtables to recover, starting fresh
|
// Record recovery duration
|
||||||
if !wal.DisableRecoveryLogs {
|
e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
|
||||||
fmt.Printf("Starting with a fresh WAL after recovery failure\n")
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update recovery statistics based on actual entries recovered
|
||||||
|
if len(walFiles) > 0 {
|
||||||
|
// Use WALDir function directly to get stats
|
||||||
|
recoveryStats, statErr := wal.ReplayWALDir(e.cfg.WALDir, func(entry *wal.Entry) error {
|
||||||
|
return nil // Just counting, not processing
|
||||||
|
})
|
||||||
|
|
||||||
|
if statErr == nil && recoveryStats != nil {
|
||||||
|
e.stats.WALEntriesRecovered.Add(recoveryStats.EntriesProcessed)
|
||||||
|
e.stats.WALCorruptedEntries.Add(recoveryStats.EntriesSkipped)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// No memtables recovered or empty WAL
|
// No memtables recovered or empty WAL
|
||||||
if len(memTables) == 0 {
|
if len(memTables) == 0 {
|
||||||
|
// Record recovery duration
|
||||||
|
e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -755,10 +765,9 @@ func (e *Engine) recoverFromWAL() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !wal.DisableRecoveryLogs {
|
// Record recovery stats
|
||||||
fmt.Printf("Recovered %d memtables from WAL with max sequence number %d\n",
|
e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
|
||||||
len(memTables), maxSeqNum)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -925,6 +934,15 @@ func (e *Engine) GetStats() map[string]interface{} {
|
|||||||
stats["read_errors"] = e.stats.ReadErrors.Load()
|
stats["read_errors"] = e.stats.ReadErrors.Load()
|
||||||
stats["write_errors"] = e.stats.WriteErrors.Load()
|
stats["write_errors"] = e.stats.WriteErrors.Load()
|
||||||
|
|
||||||
|
// Add WAL recovery statistics
|
||||||
|
stats["wal_files_recovered"] = e.stats.WALFilesRecovered.Load()
|
||||||
|
stats["wal_entries_recovered"] = e.stats.WALEntriesRecovered.Load()
|
||||||
|
stats["wal_corrupted_entries"] = e.stats.WALCorruptedEntries.Load()
|
||||||
|
recoveryDuration := e.stats.WALRecoveryDuration.Load()
|
||||||
|
if recoveryDuration > 0 {
|
||||||
|
stats["wal_recovery_duration_ms"] = recoveryDuration / int64(time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
// Add timing information
|
// Add timing information
|
||||||
e.stats.mu.RLock()
|
e.stats.mu.RLock()
|
||||||
defer e.stats.mu.RUnlock()
|
defer e.stats.mu.RUnlock()
|
||||||
|
@ -31,7 +31,7 @@ func DefaultRecoveryOptions(cfg *config.Config) *RecoveryOptions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RecoverFromWAL rebuilds MemTables from the write-ahead log
|
// RecoverFromWAL rebuilds MemTables from the write-ahead log
|
||||||
// Returns a list of recovered MemTables and the maximum sequence number seen
|
// Returns a list of recovered MemTables, the maximum sequence number seen, and stats
|
||||||
func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uint64, error) {
|
func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uint64, error) {
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
opts = DefaultRecoveryOptions(cfg)
|
opts = DefaultRecoveryOptions(cfg)
|
||||||
@ -76,10 +76,13 @@ func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uin
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Replay the WAL directory
|
// Replay the WAL directory
|
||||||
if err := wal.ReplayWALDir(cfg.WALDir, entryHandler); err != nil {
|
_, err := wal.ReplayWALDir(cfg.WALDir, entryHandler)
|
||||||
|
if err != nil {
|
||||||
return nil, 0, fmt.Errorf("failed to replay WAL: %w", err)
|
return nil, 0, fmt.Errorf("failed to replay WAL: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stats will be captured in the engine directly
|
||||||
|
|
||||||
// For batch operations, we need to adjust maxSeqNum
|
// For batch operations, we need to adjust maxSeqNum
|
||||||
finalTable := memTables[len(memTables)-1]
|
finalTable := memTables[len(memTables)-1]
|
||||||
nextSeq := finalTable.GetNextSequenceNumber()
|
nextSeq := finalTable.GetNextSequenceNumber()
|
||||||
|
@ -75,7 +75,7 @@ func TestBatchEncoding(t *testing.T) {
|
|||||||
// Replay and decode
|
// Replay and decode
|
||||||
var decodedBatch *Batch
|
var decodedBatch *Batch
|
||||||
|
|
||||||
err = ReplayWALDir(dir, func(entry *Entry) error {
|
_, err = ReplayWALDir(dir, func(entry *Entry) error {
|
||||||
if entry.Type == OpTypeBatch {
|
if entry.Type == OpTypeBatch {
|
||||||
var err error
|
var err error
|
||||||
decodedBatch, err = DecodeBatch(entry)
|
decodedBatch, err = DecodeBatch(entry)
|
||||||
|
@ -229,6 +229,17 @@ func (r *Reader) Close() error {
|
|||||||
// EntryHandler is a function that processes WAL entries during replay
|
// EntryHandler is a function that processes WAL entries during replay
|
||||||
type EntryHandler func(*Entry) error
|
type EntryHandler func(*Entry) error
|
||||||
|
|
||||||
|
// RecoveryStats tracks statistics about WAL recovery
|
||||||
|
type RecoveryStats struct {
|
||||||
|
EntriesProcessed uint64
|
||||||
|
EntriesSkipped uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRecoveryStats creates a new RecoveryStats instance
|
||||||
|
func NewRecoveryStats() *RecoveryStats {
|
||||||
|
return &RecoveryStats{}
|
||||||
|
}
|
||||||
|
|
||||||
// FindWALFiles returns a list of WAL files in the given directory
|
// FindWALFiles returns a list of WAL files in the given directory
|
||||||
func FindWALFiles(dir string) ([]string, error) {
|
func FindWALFiles(dir string) ([]string, error) {
|
||||||
pattern := filepath.Join(dir, "*.wal")
|
pattern := filepath.Join(dir, "*.wal")
|
||||||
@ -267,16 +278,15 @@ func getEntryCount(path string) int {
|
|||||||
return count
|
return count
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReplayWALFile(path string, handler EntryHandler) error {
|
func ReplayWALFile(path string, handler EntryHandler) (*RecoveryStats, error) {
|
||||||
reader, err := OpenReader(path)
|
reader, err := OpenReader(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
|
|
||||||
// Track statistics for reporting
|
// Track statistics
|
||||||
entriesProcessed := 0
|
stats := NewRecoveryStats()
|
||||||
entriesSkipped := 0
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
entry, err := reader.ReadEntry()
|
entry, err := reader.ReadEntry()
|
||||||
@ -290,14 +300,11 @@ func ReplayWALFile(path string, handler EntryHandler) error {
|
|||||||
if strings.Contains(err.Error(), "corrupt") ||
|
if strings.Contains(err.Error(), "corrupt") ||
|
||||||
strings.Contains(err.Error(), "invalid") {
|
strings.Contains(err.Error(), "invalid") {
|
||||||
// Skip this corrupted entry
|
// Skip this corrupted entry
|
||||||
if !DisableRecoveryLogs {
|
stats.EntriesSkipped++
|
||||||
fmt.Printf("Skipping corrupted entry in %s: %v\n", path, err)
|
|
||||||
}
|
|
||||||
entriesSkipped++
|
|
||||||
|
|
||||||
// If we've seen too many corrupted entries in a row, give up on this file
|
// If we've seen too many corrupted entries in a row, give up on this file
|
||||||
if entriesSkipped > 5 && entriesProcessed == 0 {
|
if stats.EntriesSkipped > 5 && stats.EntriesProcessed == 0 {
|
||||||
return fmt.Errorf("too many corrupted entries at start of file %s", path)
|
return stats, fmt.Errorf("too many corrupted entries at start of file %s", path)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to recover by scanning ahead
|
// Try to recover by scanning ahead
|
||||||
@ -310,7 +317,7 @@ func ReplayWALFile(path string, handler EntryHandler) error {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Couldn't recover
|
// Couldn't recover
|
||||||
return fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
|
return stats, fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Successfully recovered, continue to the next entry
|
// Successfully recovered, continue to the next entry
|
||||||
@ -318,23 +325,18 @@ func ReplayWALFile(path string, handler EntryHandler) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// For other errors, fail the replay
|
// For other errors, fail the replay
|
||||||
return fmt.Errorf("error reading entry from %s: %w", path, err)
|
return stats, fmt.Errorf("error reading entry from %s: %w", path, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process the entry
|
// Process the entry
|
||||||
if err := handler(entry); err != nil {
|
if err := handler(entry); err != nil {
|
||||||
return fmt.Errorf("error handling entry: %w", err)
|
return stats, fmt.Errorf("error handling entry: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
entriesProcessed++
|
stats.EntriesProcessed++
|
||||||
}
|
}
|
||||||
|
|
||||||
if !DisableRecoveryLogs {
|
return stats, nil
|
||||||
fmt.Printf("Processed %d entries from %s (skipped %d corrupted entries)\n",
|
|
||||||
entriesProcessed, path, entriesSkipped)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// recoverFromCorruption attempts to recover from a corrupted record by scanning ahead
|
// recoverFromCorruption attempts to recover from a corrupted record by scanning ahead
|
||||||
@ -356,54 +358,58 @@ func recoverFromCorruption(reader *Reader) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ReplayWALDir replays all WAL files in the given directory in order
|
// ReplayWALDir replays all WAL files in the given directory in order
|
||||||
func ReplayWALDir(dir string, handler EntryHandler) error {
|
func ReplayWALDir(dir string, handler EntryHandler) (*RecoveryStats, error) {
|
||||||
files, err := FindWALFiles(dir)
|
files, err := FindWALFiles(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Track overall recovery stats
|
||||||
|
totalStats := NewRecoveryStats()
|
||||||
|
|
||||||
// Track number of files processed successfully
|
// Track number of files processed successfully
|
||||||
successfulFiles := 0
|
successfulFiles := 0
|
||||||
var lastErr error
|
var lastErr error
|
||||||
|
|
||||||
// Try to process each file, but continue on recoverable errors
|
// Try to process each file, but continue on recoverable errors
|
||||||
for _, file := range files {
|
for _, file := range files {
|
||||||
err := ReplayWALFile(file, handler)
|
fileStats, err := ReplayWALFile(file, handler)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !DisableRecoveryLogs {
|
|
||||||
fmt.Printf("Error processing WAL file %s: %v\n", file, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Record the error, but continue
|
// Record the error, but continue
|
||||||
lastErr = err
|
lastErr = err
|
||||||
|
|
||||||
|
// If we got some stats from the file before the error, add them to our totals
|
||||||
|
if fileStats != nil {
|
||||||
|
totalStats.EntriesProcessed += fileStats.EntriesProcessed
|
||||||
|
totalStats.EntriesSkipped += fileStats.EntriesSkipped
|
||||||
|
}
|
||||||
|
|
||||||
// Check if this is a file-level error or just a corrupt record
|
// Check if this is a file-level error or just a corrupt record
|
||||||
if !strings.Contains(err.Error(), "corrupt") &&
|
if !strings.Contains(err.Error(), "corrupt") &&
|
||||||
!strings.Contains(err.Error(), "invalid") {
|
!strings.Contains(err.Error(), "invalid") {
|
||||||
return fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
|
return totalStats, fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Continue to the next file for corrupt/invalid errors
|
// Continue to the next file for corrupt/invalid errors
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if !DisableRecoveryLogs {
|
// Add stats from this file to our totals
|
||||||
fmt.Printf("Processed %d entries from %s (skipped 0 corrupted entries)\n",
|
totalStats.EntriesProcessed += fileStats.EntriesProcessed
|
||||||
getEntryCount(file), file)
|
totalStats.EntriesSkipped += fileStats.EntriesSkipped
|
||||||
}
|
|
||||||
|
|
||||||
successfulFiles++
|
successfulFiles++
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we processed at least one file successfully, the WAL recovery is considered successful
|
// If we processed at least one file successfully, the WAL recovery is considered successful
|
||||||
if successfulFiles > 0 {
|
if successfulFiles > 0 {
|
||||||
return nil
|
return totalStats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no files were processed successfully and we had errors, return the last error
|
// If no files were processed successfully and we had errors, return the last error
|
||||||
if lastErr != nil {
|
if lastErr != nil {
|
||||||
return fmt.Errorf("failed to process any WAL files: %w", lastErr)
|
return totalStats, fmt.Errorf("failed to process any WAL files: %w", lastErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return totalStats, nil
|
||||||
}
|
}
|
||||||
|
@ -56,7 +56,7 @@ func TestWALWrite(t *testing.T) {
|
|||||||
// Verify entries by replaying
|
// Verify entries by replaying
|
||||||
entries := make(map[string]string)
|
entries := make(map[string]string)
|
||||||
|
|
||||||
err = ReplayWALDir(dir, func(entry *Entry) error {
|
_, err = ReplayWALDir(dir, func(entry *Entry) error {
|
||||||
if entry.Type == OpTypePut {
|
if entry.Type == OpTypePut {
|
||||||
entries[string(entry.Key)] = string(entry.Value)
|
entries[string(entry.Key)] = string(entry.Value)
|
||||||
} else if entry.Type == OpTypeDelete {
|
} else if entry.Type == OpTypeDelete {
|
||||||
@ -115,7 +115,7 @@ func TestWALDelete(t *testing.T) {
|
|||||||
// Verify entries by replaying
|
// Verify entries by replaying
|
||||||
var deleted bool
|
var deleted bool
|
||||||
|
|
||||||
err = ReplayWALDir(dir, func(entry *Entry) error {
|
_, err = ReplayWALDir(dir, func(entry *Entry) error {
|
||||||
if entry.Type == OpTypePut && bytes.Equal(entry.Key, key) {
|
if entry.Type == OpTypePut && bytes.Equal(entry.Key, key) {
|
||||||
if deleted {
|
if deleted {
|
||||||
deleted = false // Key was re-added
|
deleted = false // Key was re-added
|
||||||
@ -171,7 +171,7 @@ func TestWALLargeEntry(t *testing.T) {
|
|||||||
// Verify by replaying
|
// Verify by replaying
|
||||||
var foundLargeEntry bool
|
var foundLargeEntry bool
|
||||||
|
|
||||||
err = ReplayWALDir(dir, func(entry *Entry) error {
|
_, err = ReplayWALDir(dir, func(entry *Entry) error {
|
||||||
if entry.Type == OpTypePut && len(entry.Key) == len(key) && len(entry.Value) == len(value) {
|
if entry.Type == OpTypePut && len(entry.Key) == len(key) && len(entry.Value) == len(value) {
|
||||||
// Verify key
|
// Verify key
|
||||||
for i := range key {
|
for i := range key {
|
||||||
@ -240,7 +240,7 @@ func TestWALBatch(t *testing.T) {
|
|||||||
entries := make(map[string]string)
|
entries := make(map[string]string)
|
||||||
batchCount := 0
|
batchCount := 0
|
||||||
|
|
||||||
err = ReplayWALDir(dir, func(entry *Entry) error {
|
_, err = ReplayWALDir(dir, func(entry *Entry) error {
|
||||||
if entry.Type == OpTypeBatch {
|
if entry.Type == OpTypeBatch {
|
||||||
batchCount++
|
batchCount++
|
||||||
|
|
||||||
@ -336,7 +336,7 @@ func TestWALRecovery(t *testing.T) {
|
|||||||
// Verify entries by replaying all WAL files in order
|
// Verify entries by replaying all WAL files in order
|
||||||
entries := make(map[string]string)
|
entries := make(map[string]string)
|
||||||
|
|
||||||
err = ReplayWALDir(dir, func(entry *Entry) error {
|
_, err = ReplayWALDir(dir, func(entry *Entry) error {
|
||||||
if entry.Type == OpTypePut {
|
if entry.Type == OpTypePut {
|
||||||
entries[string(entry.Key)] = string(entry.Value)
|
entries[string(entry.Key)] = string(entry.Value)
|
||||||
} else if entry.Type == OpTypeDelete {
|
} else if entry.Type == OpTypeDelete {
|
||||||
@ -410,7 +410,7 @@ func TestWALSyncModes(t *testing.T) {
|
|||||||
|
|
||||||
// Verify entries by replaying
|
// Verify entries by replaying
|
||||||
count := 0
|
count := 0
|
||||||
err = ReplayWALDir(dir, func(entry *Entry) error {
|
_, err = ReplayWALDir(dir, func(entry *Entry) error {
|
||||||
if entry.Type == OpTypePut {
|
if entry.Type == OpTypePut {
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
@ -471,7 +471,7 @@ func TestWALFragmentation(t *testing.T) {
|
|||||||
var reconstructedValue []byte
|
var reconstructedValue []byte
|
||||||
var foundPut bool
|
var foundPut bool
|
||||||
|
|
||||||
err = ReplayWALDir(dir, func(entry *Entry) error {
|
_, err = ReplayWALDir(dir, func(entry *Entry) error {
|
||||||
if entry.Type == OpTypePut {
|
if entry.Type == OpTypePut {
|
||||||
foundPut = true
|
foundPut = true
|
||||||
reconstructedKey = entry.Key
|
reconstructedKey = entry.Key
|
||||||
@ -580,7 +580,7 @@ func TestWALErrorHandling(t *testing.T) {
|
|||||||
|
|
||||||
// Try to replay a non-existent file
|
// Try to replay a non-existent file
|
||||||
nonExistentPath := filepath.Join(dir, "nonexistent.wal")
|
nonExistentPath := filepath.Join(dir, "nonexistent.wal")
|
||||||
err = ReplayWALFile(nonExistentPath, func(entry *Entry) error {
|
_, err = ReplayWALFile(nonExistentPath, func(entry *Entry) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user