Compare commits

...

3 Commits

Author SHA1 Message Date
a0a1c0512f
chore: formatting
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m50s
2025-04-22 14:09:54 -06:00
e7974e008d
feat: enhance wal recover statistics 2025-04-22 14:09:45 -06:00
dependabot[bot]
3b3d1c27a4 chore(deps): bump golang.org/x/net from 0.35.0 to 0.38.0
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.35.0 to 0.38.0.
- [Commits](https://github.com/golang/net/compare/v0.35.0...v0.38.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-version: 0.38.0
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-22 11:10:02 -06:00
38 changed files with 631 additions and 513 deletions

View File

@ -12,6 +12,7 @@ import (
"strings"
"syscall"
"time"
"unicode"
"github.com/chzyer/readline"
@ -80,14 +81,14 @@ Commands (interactive mode only):
// Config holds the application configuration
type Config struct {
ServerMode bool
DaemonMode bool
ListenAddr string
DBPath string
TLSEnabled bool
TLSCertFile string
TLSKeyFile string
TLSCAFile string
ServerMode bool
DaemonMode bool
ListenAddr string
DBPath string
TLSEnabled bool
TLSCertFile string
TLSKeyFile string
TLSCAFile string
}
func main() {
@ -167,14 +168,14 @@ func parseFlags() Config {
}
return Config{
ServerMode: *serverMode,
DaemonMode: *daemonMode,
ListenAddr: *listenAddr,
DBPath: dbPath,
TLSEnabled: *tlsEnabled,
TLSCertFile: *tlsCertFile,
TLSKeyFile: *tlsKeyFile,
TLSCAFile: *tlsCAFile,
ServerMode: *serverMode,
DaemonMode: *daemonMode,
ListenAddr: *listenAddr,
DBPath: dbPath,
TLSEnabled: *tlsEnabled,
TLSCertFile: *tlsCertFile,
TLSKeyFile: *tlsKeyFile,
TLSCAFile: *tlsCAFile,
}
}
@ -411,15 +412,89 @@ func runInteractive(eng *engine.Engine, dbPath string) {
// Print statistics
stats := eng.GetStats()
fmt.Println("Database Statistics:")
fmt.Printf(" Operations: %d puts, %d gets (%d hits, %d misses), %d deletes\n",
stats["put_ops"], stats["get_ops"], stats["get_hits"], stats["get_misses"], stats["delete_ops"])
fmt.Printf(" Transactions: %d started, %d committed, %d aborted\n",
stats["tx_started"], stats["tx_completed"], stats["tx_aborted"])
fmt.Printf(" Storage: %d bytes read, %d bytes written, %d flushes\n",
stats["total_bytes_read"], stats["total_bytes_written"], stats["flush_count"])
fmt.Printf(" Tables: %d sstables, %d immutable memtables\n",
stats["sstable_count"], stats["immutable_memtable_count"])
// Format human-readable time for the last operation timestamps
var lastPutTime, lastGetTime, lastDeleteTime time.Time
if putTime, ok := stats["last_put_time"].(int64); ok && putTime > 0 {
lastPutTime = time.Unix(0, putTime)
}
if getTime, ok := stats["last_get_time"].(int64); ok && getTime > 0 {
lastGetTime = time.Unix(0, getTime)
}
if deleteTime, ok := stats["last_delete_time"].(int64); ok && deleteTime > 0 {
lastDeleteTime = time.Unix(0, deleteTime)
}
// Operations section
fmt.Println("📊 Operations:")
fmt.Printf(" • Puts: %d\n", stats["put_ops"])
fmt.Printf(" • Gets: %d (Hits: %d, Misses: %d)\n", stats["get_ops"], stats["get_hits"], stats["get_misses"])
fmt.Printf(" • Deletes: %d\n", stats["delete_ops"])
// Last Operation Times
fmt.Println("\n⏱ Last Operation Times:")
if !lastPutTime.IsZero() {
fmt.Printf(" • Last Put: %s\n", lastPutTime.Format(time.RFC3339))
} else {
fmt.Printf(" • Last Put: Never\n")
}
if !lastGetTime.IsZero() {
fmt.Printf(" • Last Get: %s\n", lastGetTime.Format(time.RFC3339))
} else {
fmt.Printf(" • Last Get: Never\n")
}
if !lastDeleteTime.IsZero() {
fmt.Printf(" • Last Delete: %s\n", lastDeleteTime.Format(time.RFC3339))
} else {
fmt.Printf(" • Last Delete: Never\n")
}
// Transactions
fmt.Println("\n💼 Transactions:")
fmt.Printf(" • Started: %d\n", stats["tx_started"])
fmt.Printf(" • Completed: %d\n", stats["tx_completed"])
fmt.Printf(" • Aborted: %d\n", stats["tx_aborted"])
// Storage metrics
fmt.Println("\n💾 Storage:")
fmt.Printf(" • Total Bytes Read: %d\n", stats["total_bytes_read"])
fmt.Printf(" • Total Bytes Written: %d\n", stats["total_bytes_written"])
fmt.Printf(" • Flush Count: %d\n", stats["flush_count"])
// Table stats
fmt.Println("\n📋 Tables:")
fmt.Printf(" • SSTable Count: %d\n", stats["sstable_count"])
fmt.Printf(" • Immutable MemTable Count: %d\n", stats["immutable_memtable_count"])
fmt.Printf(" • Current MemTable Size: %d bytes\n", stats["memtable_size"])
// WAL recovery stats
fmt.Println("\n🔄 WAL Recovery:")
fmt.Printf(" • Files Recovered: %d\n", stats["wal_files_recovered"])
fmt.Printf(" • Entries Recovered: %d\n", stats["wal_entries_recovered"])
fmt.Printf(" • Corrupted Entries: %d\n", stats["wal_corrupted_entries"])
if recoveryDuration, ok := stats["wal_recovery_duration_ms"]; ok {
fmt.Printf(" • Recovery Duration: %d ms\n", recoveryDuration)
}
// Error counts
fmt.Println("\n⚠ Errors:")
fmt.Printf(" • Read Errors: %d\n", stats["read_errors"])
fmt.Printf(" • Write Errors: %d\n", stats["write_errors"])
// Compaction stats (if available)
if compactionOutputCount, ok := stats["compaction_last_outputs_count"]; ok {
fmt.Println("\n🧹 Compaction:")
fmt.Printf(" • Last Output Files Count: %d\n", compactionOutputCount)
// Display other compaction stats as available
for key, value := range stats {
if strings.HasPrefix(key, "compaction_") && key != "compaction_last_outputs_count" && key != "compaction_last_outputs" {
// Format the key for display (remove prefix, replace underscores with spaces)
displayKey := toTitle(strings.Replace(strings.TrimPrefix(key, "compaction_"), "_", " ", -1))
fmt.Printf(" • %s: %v\n", displayKey, value)
}
}
}
case ".flush":
if eng == nil {
@ -735,3 +810,19 @@ func makeKeySuccessor(prefix []byte) []byte {
successor[len(prefix)] = 0xFF
return successor
}
// toTitle replaces strings.Title which is deprecated
// It converts the first character of each word to title case
func toTitle(s string) string {
prev := ' '
return strings.Map(
func(r rune) rune {
if unicode.IsSpace(prev) || unicode.IsPunct(prev) {
prev = r
return unicode.ToTitle(r)
}
prev = r
return r
},
s)
}
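For illustration, this mirrors how the compaction display loop above formats stat keys; the key name below is hypothetical and toTitle is assumed to be in scope in the same package:
// Hypothetical stats key, formatted the same way as in the compaction loop above.
key := strings.TrimPrefix("compaction_read_amplification", "compaction_")
fmt.Println(toTitle(strings.ReplaceAll(key, "_", " "))) // prints: Read Amplification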

View File

@ -154,12 +154,12 @@ func (tr *TransactionRegistry) GracefulShutdown(ctx context.Context) error {
// Server represents the Kevo server
type Server struct {
eng *engine.Engine
txRegistry *TransactionRegistry
listener net.Listener
grpcServer *grpc.Server
eng *engine.Engine
txRegistry *TransactionRegistry
listener net.Listener
grpcServer *grpc.Server
kevoService *grpcservice.KevoServiceServer
config Config
config Config
}
// NewServer creates a new server instance

go.mod (6 changed lines)
View File

@ -10,8 +10,8 @@ require (
)
require (
golang.org/x/net v0.35.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.22.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
)

go.sum (12 changed lines)
View File

@ -28,13 +28,13 @@ go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=

View File

@ -23,11 +23,11 @@ const (
// ClientOptions configures a Kevo client
type ClientOptions struct {
// Connection options
Endpoint string // Server address
ConnectTimeout time.Duration // Timeout for connection attempts
RequestTimeout time.Duration // Default timeout for requests
TransportType string // Transport type (e.g. "grpc")
PoolSize int // Connection pool size
Endpoint string // Server address
ConnectTimeout time.Duration // Timeout for connection attempts
RequestTimeout time.Duration // Default timeout for requests
TransportType string // Transport type (e.g. "grpc")
PoolSize int // Connection pool size
// Security options
TLSEnabled bool // Enable TLS
@ -50,19 +50,19 @@ type ClientOptions struct {
// DefaultClientOptions returns sensible default client options
func DefaultClientOptions() ClientOptions {
return ClientOptions{
Endpoint: "localhost:50051",
ConnectTimeout: time.Second * 5,
RequestTimeout: time.Second * 10,
TransportType: "grpc",
PoolSize: 5,
TLSEnabled: false,
MaxRetries: 3,
InitialBackoff: time.Millisecond * 100,
MaxBackoff: time.Second * 2,
BackoffFactor: 1.5,
RetryJitter: 0.2,
Compression: CompressionNone,
MaxMessageSize: 16 * 1024 * 1024, // 16MB
Endpoint: "localhost:50051",
ConnectTimeout: time.Second * 5,
RequestTimeout: time.Second * 10,
TransportType: "grpc",
PoolSize: 5,
TLSEnabled: false,
MaxRetries: 3,
InitialBackoff: time.Millisecond * 100,
MaxBackoff: time.Second * 2,
BackoffFactor: 1.5,
RetryJitter: 0.2,
Compression: CompressionNone,
MaxMessageSize: 16 * 1024 * 1024, // 16MB
}
}
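As a usage sketch, the defaults are meant to be taken and selectively overridden before constructing a client; the package alias below is an assumption, since the import path is not shown in this diff:
// Sketch only: "client" is an assumed alias for the package containing ClientOptions.
opts := client.DefaultClientOptions()
opts.Endpoint = "db.example.com:50051" // point at a real server instead of localhost
opts.RequestTimeout = 30 * time.Second // allow slower requests than the 10s default
opts.TLSEnabled = true                 // the remaining TLS fields would also need to be set
// opts is then handed to whatever constructor the client package exposes.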

View File

@ -12,11 +12,11 @@ import (
// Transaction represents a database transaction
type Transaction struct {
client *Client
id string
readOnly bool
closed bool
mu sync.RWMutex
client *Client
id string
readOnly bool
closed bool
mu sync.RWMutex
}
// ErrTransactionClosed is returned when attempting to use a closed transaction

View File

@ -134,7 +134,7 @@ func (h *HierarchicalIterator) Seek(target []byte) bool {
// If a newer iterator has the same key, use its value
if bytes.Equal(iter.Key(), bestKey) {
bestValue = iter.Value()
break // Since iterators are in newest-to-oldest order, we can stop at the first match
break // Since iterators are in newest-to-oldest order, we can stop at the first match
}
}
@ -275,7 +275,7 @@ func (h *HierarchicalIterator) findNextUniqueKey(prevKey []byte) bool {
// If a newer iterator has the same key, use its value
if bytes.Equal(iter.Key(), bestKey) {
bestValue = iter.Value()
break // Since iterators are in newest-to-oldest order, we can stop at the first match
break // Since iterators are in newest-to-oldest order, we can stop at the first match
}
}

View File

@ -61,6 +61,12 @@ type EngineStats struct {
TxCompleted atomic.Uint64
TxAborted atomic.Uint64
// Recovery stats
WALFilesRecovered atomic.Uint64
WALEntriesRecovered atomic.Uint64
WALCorruptedEntries atomic.Uint64
WALRecoveryDuration atomic.Int64 // nanoseconds
// Mutex for accessing non-atomic fields
mu sync.RWMutex
}
@ -666,21 +672,22 @@ func (e *Engine) loadSSTables() error {
// recoverFromWAL recovers memtables from existing WAL files
func (e *Engine) recoverFromWAL() error {
startTime := time.Now()
// Check if WAL directory exists
if _, err := os.Stat(e.walDir); os.IsNotExist(err) {
return nil // No WAL directory, nothing to recover
}
// List all WAL files for diagnostic purposes
// List all WAL files
walFiles, err := wal.FindWALFiles(e.walDir)
if err != nil {
if !wal.DisableRecoveryLogs {
fmt.Printf("Error listing WAL files: %v\n", err)
}
} else {
if !wal.DisableRecoveryLogs {
fmt.Printf("Found %d WAL files: %v\n", len(walFiles), walFiles)
}
e.stats.ReadErrors.Add(1)
return fmt.Errorf("error listing WAL files: %w", err)
}
if len(walFiles) > 0 {
e.stats.WALFilesRecovered.Add(uint64(len(walFiles)))
}
// Get recovery options
@ -690,17 +697,11 @@ func (e *Engine) recoverFromWAL() error {
memTables, maxSeqNum, err := memtable.RecoverFromWAL(e.cfg, recoveryOpts)
if err != nil {
// If recovery fails, let's try cleaning up WAL files
if !wal.DisableRecoveryLogs {
fmt.Printf("WAL recovery failed: %v\n", err)
fmt.Printf("Attempting to recover by cleaning up WAL files...\n")
}
e.stats.ReadErrors.Add(1)
// Create a backup directory
backupDir := filepath.Join(e.walDir, "backup_"+time.Now().Format("20060102_150405"))
if err := os.MkdirAll(backupDir, 0755); err != nil {
if !wal.DisableRecoveryLogs {
fmt.Printf("Failed to create backup directory: %v\n", err)
}
return fmt.Errorf("failed to recover from WAL: %w", err)
}
@ -708,11 +709,7 @@ func (e *Engine) recoverFromWAL() error {
for _, walFile := range walFiles {
destFile := filepath.Join(backupDir, filepath.Base(walFile))
if err := os.Rename(walFile, destFile); err != nil {
if !wal.DisableRecoveryLogs {
fmt.Printf("Failed to move WAL file %s: %v\n", walFile, err)
}
} else if !wal.DisableRecoveryLogs {
fmt.Printf("Moved problematic WAL file to %s\n", destFile)
e.stats.ReadErrors.Add(1)
}
}
@ -723,15 +720,28 @@ func (e *Engine) recoverFromWAL() error {
}
e.wal = newWal
// No memtables to recover, starting fresh
if !wal.DisableRecoveryLogs {
fmt.Printf("Starting with a fresh WAL after recovery failure\n")
}
// Record recovery duration
e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
return nil
}
// Update recovery statistics based on actual entries recovered
if len(walFiles) > 0 {
// Replay the WAL directory again purely to count processed and skipped entries
recoveryStats, statErr := wal.ReplayWALDir(e.cfg.WALDir, func(entry *wal.Entry) error {
return nil // Just counting, not processing
})
if statErr == nil && recoveryStats != nil {
e.stats.WALEntriesRecovered.Add(recoveryStats.EntriesProcessed)
e.stats.WALCorruptedEntries.Add(recoveryStats.EntriesSkipped)
}
}
// No memtables recovered or empty WAL
if len(memTables) == 0 {
// Record recovery duration
e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
return nil
}
@ -755,10 +765,9 @@ func (e *Engine) recoverFromWAL() error {
}
}
if !wal.DisableRecoveryLogs {
fmt.Printf("Recovered %d memtables from WAL with max sequence number %d\n",
len(memTables), maxSeqNum)
}
// Record recovery stats
e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
return nil
}
@ -925,6 +934,15 @@ func (e *Engine) GetStats() map[string]interface{} {
stats["read_errors"] = e.stats.ReadErrors.Load()
stats["write_errors"] = e.stats.WriteErrors.Load()
// Add WAL recovery statistics
stats["wal_files_recovered"] = e.stats.WALFilesRecovered.Load()
stats["wal_entries_recovered"] = e.stats.WALEntriesRecovered.Load()
stats["wal_corrupted_entries"] = e.stats.WALCorruptedEntries.Load()
recoveryDuration := e.stats.WALRecoveryDuration.Load()
if recoveryDuration > 0 {
stats["wal_recovery_duration_ms"] = recoveryDuration / int64(time.Millisecond)
}
// Add timing information
e.stats.mu.RLock()
defer e.stats.mu.RUnlock()
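A caller can read the new recovery fields straight out of the stats map; a minimal sketch using the keys added above (printed with %v to avoid assuming concrete types):
stats := eng.GetStats()
fmt.Printf("WAL files recovered: %v\n", stats["wal_files_recovered"])
fmt.Printf("WAL entries recovered: %v (corrupted: %v)\n",
	stats["wal_entries_recovered"], stats["wal_corrupted_entries"])
// Only present when recovery took a measurable amount of time, per the guard above.
if ms, ok := stats["wal_recovery_duration_ms"]; ok {
	fmt.Printf("WAL recovery duration: %v ms\n", ms)
}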

View File

@ -226,7 +226,7 @@ func (pi *prefixIterator) Next() bool {
// Check if current key has the prefix
key := pi.iter.Key()
if len(key) >= len(pi.prefix) &&
equalByteSlice(key[:len(pi.prefix)], pi.prefix) {
equalByteSlice(key[:len(pi.prefix)], pi.prefix) {
return true
}
}

View File

@ -9,8 +9,8 @@ import (
"sync"
"time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
@ -19,13 +19,13 @@ import (
// GRPCClient implements the transport.Client interface for gRPC
type GRPCClient struct {
endpoint string
options transport.TransportOptions
conn *grpc.ClientConn
client pb.KevoServiceClient
status transport.TransportStatus
statusMu sync.RWMutex
metrics transport.MetricsCollector
endpoint string
options transport.TransportOptions
conn *grpc.ClientConn
client pb.KevoServiceClient
status transport.TransportStatus
statusMu sync.RWMutex
metrics transport.MetricsCollector
}
// NewGRPCClient creates a new gRPC client

View File

@ -7,8 +7,8 @@ import (
"sync"
"time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

View File

@ -7,8 +7,8 @@ import (
"sync"
"time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
@ -16,13 +16,13 @@ import (
// GRPCServer implements the transport.Server interface for gRPC
type GRPCServer struct {
address string
tlsConfig *tls.Config
server *grpc.Server
address string
tlsConfig *tls.Config
server *grpc.Server
requestHandler transport.RequestHandler
started bool
mu sync.Mutex
metrics *transport.ExtendedMetricsCollector
started bool
mu sync.Mutex
metrics *transport.ExtendedMetricsCollector
}
// NewGRPCServer creates a new gRPC server

View File

@ -5,8 +5,8 @@ import (
"sync"
"time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc"
)

View File

@ -31,7 +31,7 @@ func DefaultRecoveryOptions(cfg *config.Config) *RecoveryOptions {
}
// RecoverFromWAL rebuilds MemTables from the write-ahead log
// Returns a list of recovered MemTables and the maximum sequence number seen
// Returns a list of recovered MemTables and the maximum sequence number seen; recovery stats are captured by the engine
func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uint64, error) {
if opts == nil {
opts = DefaultRecoveryOptions(cfg)
@ -76,10 +76,13 @@ func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uin
}
// Replay the WAL directory
if err := wal.ReplayWALDir(cfg.WALDir, entryHandler); err != nil {
_, err := wal.ReplayWALDir(cfg.WALDir, entryHandler)
if err != nil {
return nil, 0, fmt.Errorf("failed to replay WAL: %w", err)
}
// Stats will be captured in the engine directly
// For batch operations, we need to adjust maxSeqNum
finalTable := memTables[len(memTables)-1]
nextSeq := finalTable.GetNextSequenceNumber()
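For context, a hedged sketch of invoking this recovery path directly; the import paths for the config and memtable packages are assumptions, and passing nil options falls back to DefaultRecoveryOptions as shown above:
// Illustrative only; mirrors the call made by the engine's recoverFromWAL.
memTables, maxSeqNum, err := memtable.RecoverFromWAL(cfg, nil)
if err != nil {
	log.Fatalf("WAL replay failed: %v", err)
}
log.Printf("recovered %d memtables, max sequence number %d", len(memTables), maxSeqNum)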

View File

@ -6,21 +6,21 @@ import (
// Standard request/response type constants
const (
TypeGet = "get"
TypePut = "put"
TypeDelete = "delete"
TypeBatchWrite = "batch_write"
TypeScan = "scan"
TypeBeginTx = "begin_tx"
TypeCommitTx = "commit_tx"
TypeRollbackTx = "rollback_tx"
TypeTxGet = "tx_get"
TypeTxPut = "tx_put"
TypeTxDelete = "tx_delete"
TypeTxScan = "tx_scan"
TypeGetStats = "get_stats"
TypeCompact = "compact"
TypeError = "error"
TypeGet = "get"
TypePut = "put"
TypeDelete = "delete"
TypeBatchWrite = "batch_write"
TypeScan = "scan"
TypeBeginTx = "begin_tx"
TypeCommitTx = "commit_tx"
TypeRollbackTx = "rollback_tx"
TypeTxGet = "tx_get"
TypeTxPut = "tx_put"
TypeTxDelete = "tx_delete"
TypeTxScan = "tx_scan"
TypeGetStats = "get_stats"
TypeCompact = "compact"
TypeError = "error"
)
// Common errors

View File

@ -82,7 +82,7 @@ func (c *BasicMetricsCollector) RecordRequest(requestType string, startTime time
if exists {
// Update running average - the common case for better branch prediction
// new_avg = (old_avg * count + new_value) / (count + 1)
totalDuration := currentAvg * time.Duration(currentCount) + latency
totalDuration := currentAvg*time.Duration(currentCount) + latency
newCount := currentCount + 1
c.avgLatencyByType[requestType] = totalDuration / time.Duration(newCount)
c.requestCountByType[requestType] = newCount
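The running-average update above is a standard incremental mean; a self-contained sketch with made-up numbers:
package main

import (
	"fmt"
	"time"
)

func main() {
	// new_avg = (old_avg*count + new_value) / (count + 1)
	currentAvg := 40 * time.Millisecond
	currentCount := 3
	latency := 80 * time.Millisecond
	newAvg := (currentAvg*time.Duration(currentCount) + latency) / time.Duration(currentCount+1)
	fmt.Println(newAvg) // 50ms
}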

View File

@ -9,9 +9,9 @@ import (
// Metrics struct extensions for server metrics
type ServerMetrics struct {
Metrics
ServerStarted uint64
ServerErrored uint64
ServerStopped uint64
ServerStarted uint64
ServerErrored uint64
ServerStopped uint64
}
// Connection represents a connection to a remote endpoint
@ -31,11 +31,11 @@ type Connection interface {
// ConnectionStatus represents the status of a connection
type ConnectionStatus struct {
Connected bool
LastActivity time.Time
ErrorCount int
RequestCount int
LatencyAvg time.Duration
Connected bool
LastActivity time.Time
ErrorCount int
RequestCount int
LatencyAvg time.Duration
}
// TransportManager is an interface for managing transport layer operations

View File

@ -7,7 +7,7 @@ import (
// registry implements the Registry interface
type registry struct {
mu sync.RWMutex
mu sync.RWMutex
clientFactories map[string]ClientFactory
serverFactories map[string]ServerFactory
}

View File

@ -75,7 +75,7 @@ func TestBatchEncoding(t *testing.T) {
// Replay and decode
var decodedBatch *Batch
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypeBatch {
var err error
decodedBatch, err = DecodeBatch(entry)

View File

@ -229,6 +229,17 @@ func (r *Reader) Close() error {
// EntryHandler is a function that processes WAL entries during replay
type EntryHandler func(*Entry) error
// RecoveryStats tracks statistics about WAL recovery
type RecoveryStats struct {
EntriesProcessed uint64
EntriesSkipped uint64
}
// NewRecoveryStats creates a new RecoveryStats instance
func NewRecoveryStats() *RecoveryStats {
return &RecoveryStats{}
}
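An EntryHandler is just a callback invoked per decoded entry; a minimal sketch that counts puts, using the entry fields exercised by the tests further down:
// Counts put operations seen during replay; returning an error aborts the replay.
var puts int
handler := func(entry *Entry) error {
	if entry.Type == OpTypePut {
		puts++
	}
	return nil
}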
// FindWALFiles returns a list of WAL files in the given directory
func FindWALFiles(dir string) ([]string, error) {
pattern := filepath.Join(dir, "*.wal")
@ -267,16 +278,15 @@ func getEntryCount(path string) int {
return count
}
func ReplayWALFile(path string, handler EntryHandler) error {
func ReplayWALFile(path string, handler EntryHandler) (*RecoveryStats, error) {
reader, err := OpenReader(path)
if err != nil {
return err
return nil, err
}
defer reader.Close()
// Track statistics for reporting
entriesProcessed := 0
entriesSkipped := 0
// Track statistics
stats := NewRecoveryStats()
for {
entry, err := reader.ReadEntry()
@ -290,14 +300,11 @@ func ReplayWALFile(path string, handler EntryHandler) error {
if strings.Contains(err.Error(), "corrupt") ||
strings.Contains(err.Error(), "invalid") {
// Skip this corrupted entry
if !DisableRecoveryLogs {
fmt.Printf("Skipping corrupted entry in %s: %v\n", path, err)
}
entriesSkipped++
stats.EntriesSkipped++
// If we've seen too many corrupted entries in a row, give up on this file
if entriesSkipped > 5 && entriesProcessed == 0 {
return fmt.Errorf("too many corrupted entries at start of file %s", path)
if stats.EntriesSkipped > 5 && stats.EntriesProcessed == 0 {
return stats, fmt.Errorf("too many corrupted entries at start of file %s", path)
}
// Try to recover by scanning ahead
@ -310,7 +317,7 @@ func ReplayWALFile(path string, handler EntryHandler) error {
break
}
// Couldn't recover
return fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
return stats, fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
}
// Successfully recovered, continue to the next entry
@ -318,23 +325,18 @@ func ReplayWALFile(path string, handler EntryHandler) error {
}
// For other errors, fail the replay
return fmt.Errorf("error reading entry from %s: %w", path, err)
return stats, fmt.Errorf("error reading entry from %s: %w", path, err)
}
// Process the entry
if err := handler(entry); err != nil {
return fmt.Errorf("error handling entry: %w", err)
return stats, fmt.Errorf("error handling entry: %w", err)
}
entriesProcessed++
stats.EntriesProcessed++
}
if !DisableRecoveryLogs {
fmt.Printf("Processed %d entries from %s (skipped %d corrupted entries)\n",
entriesProcessed, path, entriesSkipped)
}
return nil
return stats, nil
}
// recoverFromCorruption attempts to recover from a corrupted record by scanning ahead
@ -356,54 +358,58 @@ func recoverFromCorruption(reader *Reader) error {
}
// ReplayWALDir replays all WAL files in the given directory in order
func ReplayWALDir(dir string, handler EntryHandler) error {
func ReplayWALDir(dir string, handler EntryHandler) (*RecoveryStats, error) {
files, err := FindWALFiles(dir)
if err != nil {
return err
return nil, err
}
// Track overall recovery stats
totalStats := NewRecoveryStats()
// Track number of files processed successfully
successfulFiles := 0
var lastErr error
// Try to process each file, but continue on recoverable errors
for _, file := range files {
err := ReplayWALFile(file, handler)
fileStats, err := ReplayWALFile(file, handler)
if err != nil {
if !DisableRecoveryLogs {
fmt.Printf("Error processing WAL file %s: %v\n", file, err)
}
// Record the error, but continue
lastErr = err
// If we got some stats from the file before the error, add them to our totals
if fileStats != nil {
totalStats.EntriesProcessed += fileStats.EntriesProcessed
totalStats.EntriesSkipped += fileStats.EntriesSkipped
}
// Check if this is a file-level error or just a corrupt record
if !strings.Contains(err.Error(), "corrupt") &&
!strings.Contains(err.Error(), "invalid") {
return fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
return totalStats, fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
}
// Continue to the next file for corrupt/invalid errors
continue
}
if !DisableRecoveryLogs {
fmt.Printf("Processed %d entries from %s (skipped 0 corrupted entries)\n",
getEntryCount(file), file)
}
// Add stats from this file to our totals
totalStats.EntriesProcessed += fileStats.EntriesProcessed
totalStats.EntriesSkipped += fileStats.EntriesSkipped
successfulFiles++
}
// If we processed at least one file successfully, the WAL recovery is considered successful
if successfulFiles > 0 {
return nil
return totalStats, nil
}
// If no files were processed successfully and we had errors, return the last error
if lastErr != nil {
return fmt.Errorf("failed to process any WAL files: %w", lastErr)
return totalStats, fmt.Errorf("failed to process any WAL files: %w", lastErr)
}
return nil
return totalStats, nil
}
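With the new return value, callers get aggregate statistics even when individual files were partially corrupt; a short sketch (the directory path is illustrative):
stats, err := ReplayWALDir("/var/lib/kevo/wal", func(entry *Entry) error {
	return nil // inspect or apply entries here
})
if stats != nil {
	fmt.Printf("replayed %d entries, skipped %d corrupted\n",
		stats.EntriesProcessed, stats.EntriesSkipped)
}
if err != nil {
	// Corrupt-record errors still surface here; the stats above remain meaningful.
	log.Printf("WAL replay finished with error: %v", err)
}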

View File

@ -56,7 +56,7 @@ func TestWALWrite(t *testing.T) {
// Verify entries by replaying
entries := make(map[string]string)
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut {
entries[string(entry.Key)] = string(entry.Value)
} else if entry.Type == OpTypeDelete {
@ -115,7 +115,7 @@ func TestWALDelete(t *testing.T) {
// Verify entries by replaying
var deleted bool
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut && bytes.Equal(entry.Key, key) {
if deleted {
deleted = false // Key was re-added
@ -171,7 +171,7 @@ func TestWALLargeEntry(t *testing.T) {
// Verify by replaying
var foundLargeEntry bool
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut && len(entry.Key) == len(key) && len(entry.Value) == len(value) {
// Verify key
for i := range key {
@ -240,7 +240,7 @@ func TestWALBatch(t *testing.T) {
entries := make(map[string]string)
batchCount := 0
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypeBatch {
batchCount++
@ -336,7 +336,7 @@ func TestWALRecovery(t *testing.T) {
// Verify entries by replaying all WAL files in order
entries := make(map[string]string)
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut {
entries[string(entry.Key)] = string(entry.Value)
} else if entry.Type == OpTypeDelete {
@ -410,7 +410,7 @@ func TestWALSyncModes(t *testing.T) {
// Verify entries by replaying
count := 0
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut {
count++
}
@ -471,7 +471,7 @@ func TestWALFragmentation(t *testing.T) {
var reconstructedValue []byte
var foundPut bool
err = ReplayWALDir(dir, func(entry *Entry) error {
_, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut {
foundPut = true
reconstructedKey = entry.Key
@ -580,7 +580,7 @@ func TestWALErrorHandling(t *testing.T) {
// Try to replay a non-existent file
nonExistentPath := filepath.Join(dir, "nonexistent.wal")
err = ReplayWALFile(nonExistentPath, func(entry *Entry) error {
_, err = ReplayWALFile(nonExistentPath, func(entry *Entry) error {
return nil
})