Compare commits

...

3 Commits

Author SHA1 Message Date
a0a1c0512f
chore: formatting
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m50s
2025-04-22 14:09:54 -06:00
e7974e008d
feat: enhance wal recover statistics 2025-04-22 14:09:45 -06:00
dependabot[bot]
3b3d1c27a4 chore(deps): bump golang.org/x/net from 0.35.0 to 0.38.0
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.35.0 to 0.38.0.
- [Commits](https://github.com/golang/net/compare/v0.35.0...v0.38.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-version: 0.38.0
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-22 11:10:02 -06:00
38 changed files with 631 additions and 513 deletions

View File

@ -12,6 +12,7 @@ import (
"strings" "strings"
"syscall" "syscall"
"time" "time"
"unicode"
"github.com/chzyer/readline" "github.com/chzyer/readline"
@ -80,14 +81,14 @@ Commands (interactive mode only):
// Config holds the application configuration // Config holds the application configuration
type Config struct { type Config struct {
ServerMode bool ServerMode bool
DaemonMode bool DaemonMode bool
ListenAddr string ListenAddr string
DBPath string DBPath string
TLSEnabled bool TLSEnabled bool
TLSCertFile string TLSCertFile string
TLSKeyFile string TLSKeyFile string
TLSCAFile string TLSCAFile string
} }
func main() { func main() {
@ -167,14 +168,14 @@ func parseFlags() Config {
} }
return Config{ return Config{
ServerMode: *serverMode, ServerMode: *serverMode,
DaemonMode: *daemonMode, DaemonMode: *daemonMode,
ListenAddr: *listenAddr, ListenAddr: *listenAddr,
DBPath: dbPath, DBPath: dbPath,
TLSEnabled: *tlsEnabled, TLSEnabled: *tlsEnabled,
TLSCertFile: *tlsCertFile, TLSCertFile: *tlsCertFile,
TLSKeyFile: *tlsKeyFile, TLSKeyFile: *tlsKeyFile,
TLSCAFile: *tlsCAFile, TLSCAFile: *tlsCAFile,
} }
} }
@ -411,15 +412,89 @@ func runInteractive(eng *engine.Engine, dbPath string) {
// Print statistics // Print statistics
stats := eng.GetStats() stats := eng.GetStats()
fmt.Println("Database Statistics:")
fmt.Printf(" Operations: %d puts, %d gets (%d hits, %d misses), %d deletes\n", // Format human-readable time for the last operation timestamps
stats["put_ops"], stats["get_ops"], stats["get_hits"], stats["get_misses"], stats["delete_ops"]) var lastPutTime, lastGetTime, lastDeleteTime time.Time
fmt.Printf(" Transactions: %d started, %d committed, %d aborted\n", if putTime, ok := stats["last_put_time"].(int64); ok && putTime > 0 {
stats["tx_started"], stats["tx_completed"], stats["tx_aborted"]) lastPutTime = time.Unix(0, putTime)
fmt.Printf(" Storage: %d bytes read, %d bytes written, %d flushes\n", }
stats["total_bytes_read"], stats["total_bytes_written"], stats["flush_count"]) if getTime, ok := stats["last_get_time"].(int64); ok && getTime > 0 {
fmt.Printf(" Tables: %d sstables, %d immutable memtables\n", lastGetTime = time.Unix(0, getTime)
stats["sstable_count"], stats["immutable_memtable_count"]) }
if deleteTime, ok := stats["last_delete_time"].(int64); ok && deleteTime > 0 {
lastDeleteTime = time.Unix(0, deleteTime)
}
// Operations section
fmt.Println("📊 Operations:")
fmt.Printf(" • Puts: %d\n", stats["put_ops"])
fmt.Printf(" • Gets: %d (Hits: %d, Misses: %d)\n", stats["get_ops"], stats["get_hits"], stats["get_misses"])
fmt.Printf(" • Deletes: %d\n", stats["delete_ops"])
// Last Operation Times
fmt.Println("\n⏱ Last Operation Times:")
if !lastPutTime.IsZero() {
fmt.Printf(" • Last Put: %s\n", lastPutTime.Format(time.RFC3339))
} else {
fmt.Printf(" • Last Put: Never\n")
}
if !lastGetTime.IsZero() {
fmt.Printf(" • Last Get: %s\n", lastGetTime.Format(time.RFC3339))
} else {
fmt.Printf(" • Last Get: Never\n")
}
if !lastDeleteTime.IsZero() {
fmt.Printf(" • Last Delete: %s\n", lastDeleteTime.Format(time.RFC3339))
} else {
fmt.Printf(" • Last Delete: Never\n")
}
// Transactions
fmt.Println("\n💼 Transactions:")
fmt.Printf(" • Started: %d\n", stats["tx_started"])
fmt.Printf(" • Completed: %d\n", stats["tx_completed"])
fmt.Printf(" • Aborted: %d\n", stats["tx_aborted"])
// Storage metrics
fmt.Println("\n💾 Storage:")
fmt.Printf(" • Total Bytes Read: %d\n", stats["total_bytes_read"])
fmt.Printf(" • Total Bytes Written: %d\n", stats["total_bytes_written"])
fmt.Printf(" • Flush Count: %d\n", stats["flush_count"])
// Table stats
fmt.Println("\n📋 Tables:")
fmt.Printf(" • SSTable Count: %d\n", stats["sstable_count"])
fmt.Printf(" • Immutable MemTable Count: %d\n", stats["immutable_memtable_count"])
fmt.Printf(" • Current MemTable Size: %d bytes\n", stats["memtable_size"])
// WAL recovery stats
fmt.Println("\n🔄 WAL Recovery:")
fmt.Printf(" • Files Recovered: %d\n", stats["wal_files_recovered"])
fmt.Printf(" • Entries Recovered: %d\n", stats["wal_entries_recovered"])
fmt.Printf(" • Corrupted Entries: %d\n", stats["wal_corrupted_entries"])
if recoveryDuration, ok := stats["wal_recovery_duration_ms"]; ok {
fmt.Printf(" • Recovery Duration: %d ms\n", recoveryDuration)
}
// Error counts
fmt.Println("\n⚠ Errors:")
fmt.Printf(" • Read Errors: %d\n", stats["read_errors"])
fmt.Printf(" • Write Errors: %d\n", stats["write_errors"])
// Compaction stats (if available)
if compactionOutputCount, ok := stats["compaction_last_outputs_count"]; ok {
fmt.Println("\n🧹 Compaction:")
fmt.Printf(" • Last Output Files Count: %d\n", compactionOutputCount)
// Display other compaction stats as available
for key, value := range stats {
if strings.HasPrefix(key, "compaction_") && key != "compaction_last_outputs_count" && key != "compaction_last_outputs" {
// Format the key for display (remove prefix, replace underscores with spaces)
displayKey := toTitle(strings.Replace(strings.TrimPrefix(key, "compaction_"), "_", " ", -1))
fmt.Printf(" • %s: %v\n", displayKey, value)
}
}
}
case ".flush": case ".flush":
if eng == nil { if eng == nil {
@ -735,3 +810,19 @@ func makeKeySuccessor(prefix []byte) []byte {
successor[len(prefix)] = 0xFF successor[len(prefix)] = 0xFF
return successor return successor
} }
// toTitle replaces strings.Title which is deprecated
// It converts the first character of each word to title case
func toTitle(s string) string {
prev := ' '
return strings.Map(
func(r rune) rune {
if unicode.IsSpace(prev) || unicode.IsPunct(prev) {
prev = r
return unicode.ToTitle(r)
}
prev = r
return r
},
s)
}

View File

@ -154,12 +154,12 @@ func (tr *TransactionRegistry) GracefulShutdown(ctx context.Context) error {
// Server represents the Kevo server // Server represents the Kevo server
type Server struct { type Server struct {
eng *engine.Engine eng *engine.Engine
txRegistry *TransactionRegistry txRegistry *TransactionRegistry
listener net.Listener listener net.Listener
grpcServer *grpc.Server grpcServer *grpc.Server
kevoService *grpcservice.KevoServiceServer kevoService *grpcservice.KevoServiceServer
config Config config Config
} }
// NewServer creates a new server instance // NewServer creates a new server instance

6
go.mod
View File

@ -10,8 +10,8 @@ require (
) )
require ( require (
golang.org/x/net v0.35.0 // indirect golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.30.0 // indirect golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.22.0 // indirect golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
) )

12
go.sum
View File

@ -28,13 +28,13 @@ go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4= google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM= google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=

View File

@ -23,11 +23,11 @@ const (
// ClientOptions configures a Kevo client // ClientOptions configures a Kevo client
type ClientOptions struct { type ClientOptions struct {
// Connection options // Connection options
Endpoint string // Server address Endpoint string // Server address
ConnectTimeout time.Duration // Timeout for connection attempts ConnectTimeout time.Duration // Timeout for connection attempts
RequestTimeout time.Duration // Default timeout for requests RequestTimeout time.Duration // Default timeout for requests
TransportType string // Transport type (e.g. "grpc") TransportType string // Transport type (e.g. "grpc")
PoolSize int // Connection pool size PoolSize int // Connection pool size
// Security options // Security options
TLSEnabled bool // Enable TLS TLSEnabled bool // Enable TLS
@ -50,19 +50,19 @@ type ClientOptions struct {
// DefaultClientOptions returns sensible default client options // DefaultClientOptions returns sensible default client options
func DefaultClientOptions() ClientOptions { func DefaultClientOptions() ClientOptions {
return ClientOptions{ return ClientOptions{
Endpoint: "localhost:50051", Endpoint: "localhost:50051",
ConnectTimeout: time.Second * 5, ConnectTimeout: time.Second * 5,
RequestTimeout: time.Second * 10, RequestTimeout: time.Second * 10,
TransportType: "grpc", TransportType: "grpc",
PoolSize: 5, PoolSize: 5,
TLSEnabled: false, TLSEnabled: false,
MaxRetries: 3, MaxRetries: 3,
InitialBackoff: time.Millisecond * 100, InitialBackoff: time.Millisecond * 100,
MaxBackoff: time.Second * 2, MaxBackoff: time.Second * 2,
BackoffFactor: 1.5, BackoffFactor: 1.5,
RetryJitter: 0.2, RetryJitter: 0.2,
Compression: CompressionNone, Compression: CompressionNone,
MaxMessageSize: 16 * 1024 * 1024, // 16MB MaxMessageSize: 16 * 1024 * 1024, // 16MB
} }
} }

View File

@ -12,11 +12,11 @@ import (
// Transaction represents a database transaction // Transaction represents a database transaction
type Transaction struct { type Transaction struct {
client *Client client *Client
id string id string
readOnly bool readOnly bool
closed bool closed bool
mu sync.RWMutex mu sync.RWMutex
} }
// ErrTransactionClosed is returned when attempting to use a closed transaction // ErrTransactionClosed is returned when attempting to use a closed transaction

View File

@ -134,7 +134,7 @@ func (h *HierarchicalIterator) Seek(target []byte) bool {
// If a newer iterator has the same key, use its value // If a newer iterator has the same key, use its value
if bytes.Equal(iter.Key(), bestKey) { if bytes.Equal(iter.Key(), bestKey) {
bestValue = iter.Value() bestValue = iter.Value()
break // Since iterators are in newest-to-oldest order, we can stop at the first match break // Since iterators are in newest-to-oldest order, we can stop at the first match
} }
} }
@ -275,7 +275,7 @@ func (h *HierarchicalIterator) findNextUniqueKey(prevKey []byte) bool {
// If a newer iterator has the same key, use its value // If a newer iterator has the same key, use its value
if bytes.Equal(iter.Key(), bestKey) { if bytes.Equal(iter.Key(), bestKey) {
bestValue = iter.Value() bestValue = iter.Value()
break // Since iterators are in newest-to-oldest order, we can stop at the first match break // Since iterators are in newest-to-oldest order, we can stop at the first match
} }
} }

View File

@ -61,6 +61,12 @@ type EngineStats struct {
TxCompleted atomic.Uint64 TxCompleted atomic.Uint64
TxAborted atomic.Uint64 TxAborted atomic.Uint64
// Recovery stats
WALFilesRecovered atomic.Uint64
WALEntriesRecovered atomic.Uint64
WALCorruptedEntries atomic.Uint64
WALRecoveryDuration atomic.Int64 // nanoseconds
// Mutex for accessing non-atomic fields // Mutex for accessing non-atomic fields
mu sync.RWMutex mu sync.RWMutex
} }
@ -666,21 +672,22 @@ func (e *Engine) loadSSTables() error {
// recoverFromWAL recovers memtables from existing WAL files // recoverFromWAL recovers memtables from existing WAL files
func (e *Engine) recoverFromWAL() error { func (e *Engine) recoverFromWAL() error {
startTime := time.Now()
// Check if WAL directory exists // Check if WAL directory exists
if _, err := os.Stat(e.walDir); os.IsNotExist(err) { if _, err := os.Stat(e.walDir); os.IsNotExist(err) {
return nil // No WAL directory, nothing to recover return nil // No WAL directory, nothing to recover
} }
// List all WAL files for diagnostic purposes // List all WAL files
walFiles, err := wal.FindWALFiles(e.walDir) walFiles, err := wal.FindWALFiles(e.walDir)
if err != nil { if err != nil {
if !wal.DisableRecoveryLogs { e.stats.ReadErrors.Add(1)
fmt.Printf("Error listing WAL files: %v\n", err) return fmt.Errorf("error listing WAL files: %w", err)
} }
} else {
if !wal.DisableRecoveryLogs { if len(walFiles) > 0 {
fmt.Printf("Found %d WAL files: %v\n", len(walFiles), walFiles) e.stats.WALFilesRecovered.Add(uint64(len(walFiles)))
}
} }
// Get recovery options // Get recovery options
@ -690,17 +697,11 @@ func (e *Engine) recoverFromWAL() error {
memTables, maxSeqNum, err := memtable.RecoverFromWAL(e.cfg, recoveryOpts) memTables, maxSeqNum, err := memtable.RecoverFromWAL(e.cfg, recoveryOpts)
if err != nil { if err != nil {
// If recovery fails, let's try cleaning up WAL files // If recovery fails, let's try cleaning up WAL files
if !wal.DisableRecoveryLogs { e.stats.ReadErrors.Add(1)
fmt.Printf("WAL recovery failed: %v\n", err)
fmt.Printf("Attempting to recover by cleaning up WAL files...\n")
}
// Create a backup directory // Create a backup directory
backupDir := filepath.Join(e.walDir, "backup_"+time.Now().Format("20060102_150405")) backupDir := filepath.Join(e.walDir, "backup_"+time.Now().Format("20060102_150405"))
if err := os.MkdirAll(backupDir, 0755); err != nil { if err := os.MkdirAll(backupDir, 0755); err != nil {
if !wal.DisableRecoveryLogs {
fmt.Printf("Failed to create backup directory: %v\n", err)
}
return fmt.Errorf("failed to recover from WAL: %w", err) return fmt.Errorf("failed to recover from WAL: %w", err)
} }
@ -708,11 +709,7 @@ func (e *Engine) recoverFromWAL() error {
for _, walFile := range walFiles { for _, walFile := range walFiles {
destFile := filepath.Join(backupDir, filepath.Base(walFile)) destFile := filepath.Join(backupDir, filepath.Base(walFile))
if err := os.Rename(walFile, destFile); err != nil { if err := os.Rename(walFile, destFile); err != nil {
if !wal.DisableRecoveryLogs { e.stats.ReadErrors.Add(1)
fmt.Printf("Failed to move WAL file %s: %v\n", walFile, err)
}
} else if !wal.DisableRecoveryLogs {
fmt.Printf("Moved problematic WAL file to %s\n", destFile)
} }
} }
@ -723,15 +720,28 @@ func (e *Engine) recoverFromWAL() error {
} }
e.wal = newWal e.wal = newWal
// No memtables to recover, starting fresh // Record recovery duration
if !wal.DisableRecoveryLogs { e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
fmt.Printf("Starting with a fresh WAL after recovery failure\n")
}
return nil return nil
} }
// Update recovery statistics based on actual entries recovered
if len(walFiles) > 0 {
// Use WALDir function directly to get stats
recoveryStats, statErr := wal.ReplayWALDir(e.cfg.WALDir, func(entry *wal.Entry) error {
return nil // Just counting, not processing
})
if statErr == nil && recoveryStats != nil {
e.stats.WALEntriesRecovered.Add(recoveryStats.EntriesProcessed)
e.stats.WALCorruptedEntries.Add(recoveryStats.EntriesSkipped)
}
}
// No memtables recovered or empty WAL // No memtables recovered or empty WAL
if len(memTables) == 0 { if len(memTables) == 0 {
// Record recovery duration
e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
return nil return nil
} }
@ -755,10 +765,9 @@ func (e *Engine) recoverFromWAL() error {
} }
} }
if !wal.DisableRecoveryLogs { // Record recovery stats
fmt.Printf("Recovered %d memtables from WAL with max sequence number %d\n", e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
len(memTables), maxSeqNum)
}
return nil return nil
} }
@ -925,6 +934,15 @@ func (e *Engine) GetStats() map[string]interface{} {
stats["read_errors"] = e.stats.ReadErrors.Load() stats["read_errors"] = e.stats.ReadErrors.Load()
stats["write_errors"] = e.stats.WriteErrors.Load() stats["write_errors"] = e.stats.WriteErrors.Load()
// Add WAL recovery statistics
stats["wal_files_recovered"] = e.stats.WALFilesRecovered.Load()
stats["wal_entries_recovered"] = e.stats.WALEntriesRecovered.Load()
stats["wal_corrupted_entries"] = e.stats.WALCorruptedEntries.Load()
recoveryDuration := e.stats.WALRecoveryDuration.Load()
if recoveryDuration > 0 {
stats["wal_recovery_duration_ms"] = recoveryDuration / int64(time.Millisecond)
}
// Add timing information // Add timing information
e.stats.mu.RLock() e.stats.mu.RLock()
defer e.stats.mu.RUnlock() defer e.stats.mu.RUnlock()

View File

@ -226,7 +226,7 @@ func (pi *prefixIterator) Next() bool {
// Check if current key has the prefix // Check if current key has the prefix
key := pi.iter.Key() key := pi.iter.Key()
if len(key) >= len(pi.prefix) && if len(key) >= len(pi.prefix) &&
equalByteSlice(key[:len(pi.prefix)], pi.prefix) { equalByteSlice(key[:len(pi.prefix)], pi.prefix) {
return true return true
} }
} }

View File

@ -9,8 +9,8 @@ import (
"sync" "sync"
"time" "time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport" "github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
@ -19,13 +19,13 @@ import (
// GRPCClient implements the transport.Client interface for gRPC // GRPCClient implements the transport.Client interface for gRPC
type GRPCClient struct { type GRPCClient struct {
endpoint string endpoint string
options transport.TransportOptions options transport.TransportOptions
conn *grpc.ClientConn conn *grpc.ClientConn
client pb.KevoServiceClient client pb.KevoServiceClient
status transport.TransportStatus status transport.TransportStatus
statusMu sync.RWMutex statusMu sync.RWMutex
metrics transport.MetricsCollector metrics transport.MetricsCollector
} }
// NewGRPCClient creates a new gRPC client // NewGRPCClient creates a new gRPC client

View File

@ -7,8 +7,8 @@ import (
"sync" "sync"
"time" "time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport" "github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"

View File

@ -7,8 +7,8 @@ import (
"sync" "sync"
"time" "time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport" "github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
@ -16,13 +16,13 @@ import (
// GRPCServer implements the transport.Server interface for gRPC // GRPCServer implements the transport.Server interface for gRPC
type GRPCServer struct { type GRPCServer struct {
address string address string
tlsConfig *tls.Config tlsConfig *tls.Config
server *grpc.Server server *grpc.Server
requestHandler transport.RequestHandler requestHandler transport.RequestHandler
started bool started bool
mu sync.Mutex mu sync.Mutex
metrics *transport.ExtendedMetricsCollector metrics *transport.ExtendedMetricsCollector
} }
// NewGRPCServer creates a new gRPC server // NewGRPCServer creates a new gRPC server

View File

@ -5,8 +5,8 @@ import (
"sync" "sync"
"time" "time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport" "github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc" "google.golang.org/grpc"
) )

View File

@ -31,7 +31,7 @@ func DefaultRecoveryOptions(cfg *config.Config) *RecoveryOptions {
} }
// RecoverFromWAL rebuilds MemTables from the write-ahead log // RecoverFromWAL rebuilds MemTables from the write-ahead log
// Returns a list of recovered MemTables and the maximum sequence number seen // Returns a list of recovered MemTables, the maximum sequence number seen, and stats
func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uint64, error) { func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uint64, error) {
if opts == nil { if opts == nil {
opts = DefaultRecoveryOptions(cfg) opts = DefaultRecoveryOptions(cfg)
@ -76,10 +76,13 @@ func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uin
} }
// Replay the WAL directory // Replay the WAL directory
if err := wal.ReplayWALDir(cfg.WALDir, entryHandler); err != nil { _, err := wal.ReplayWALDir(cfg.WALDir, entryHandler)
if err != nil {
return nil, 0, fmt.Errorf("failed to replay WAL: %w", err) return nil, 0, fmt.Errorf("failed to replay WAL: %w", err)
} }
// Stats will be captured in the engine directly
// For batch operations, we need to adjust maxSeqNum // For batch operations, we need to adjust maxSeqNum
finalTable := memTables[len(memTables)-1] finalTable := memTables[len(memTables)-1]
nextSeq := finalTable.GetNextSequenceNumber() nextSeq := finalTable.GetNextSequenceNumber()

View File

@ -6,21 +6,21 @@ import (
// Standard request/response type constants // Standard request/response type constants
const ( const (
TypeGet = "get" TypeGet = "get"
TypePut = "put" TypePut = "put"
TypeDelete = "delete" TypeDelete = "delete"
TypeBatchWrite = "batch_write" TypeBatchWrite = "batch_write"
TypeScan = "scan" TypeScan = "scan"
TypeBeginTx = "begin_tx" TypeBeginTx = "begin_tx"
TypeCommitTx = "commit_tx" TypeCommitTx = "commit_tx"
TypeRollbackTx = "rollback_tx" TypeRollbackTx = "rollback_tx"
TypeTxGet = "tx_get" TypeTxGet = "tx_get"
TypeTxPut = "tx_put" TypeTxPut = "tx_put"
TypeTxDelete = "tx_delete" TypeTxDelete = "tx_delete"
TypeTxScan = "tx_scan" TypeTxScan = "tx_scan"
TypeGetStats = "get_stats" TypeGetStats = "get_stats"
TypeCompact = "compact" TypeCompact = "compact"
TypeError = "error" TypeError = "error"
) )
// Common errors // Common errors

View File

@ -82,7 +82,7 @@ func (c *BasicMetricsCollector) RecordRequest(requestType string, startTime time
if exists { if exists {
// Update running average - the common case for better branch prediction // Update running average - the common case for better branch prediction
// new_avg = (old_avg * count + new_value) / (count + 1) // new_avg = (old_avg * count + new_value) / (count + 1)
totalDuration := currentAvg * time.Duration(currentCount) + latency totalDuration := currentAvg*time.Duration(currentCount) + latency
newCount := currentCount + 1 newCount := currentCount + 1
c.avgLatencyByType[requestType] = totalDuration / time.Duration(newCount) c.avgLatencyByType[requestType] = totalDuration / time.Duration(newCount)
c.requestCountByType[requestType] = newCount c.requestCountByType[requestType] = newCount

View File

@ -9,9 +9,9 @@ import (
// Metrics struct extensions for server metrics // Metrics struct extensions for server metrics
type ServerMetrics struct { type ServerMetrics struct {
Metrics Metrics
ServerStarted uint64 ServerStarted uint64
ServerErrored uint64 ServerErrored uint64
ServerStopped uint64 ServerStopped uint64
} }
// Connection represents a connection to a remote endpoint // Connection represents a connection to a remote endpoint
@ -31,11 +31,11 @@ type Connection interface {
// ConnectionStatus represents the status of a connection // ConnectionStatus represents the status of a connection
type ConnectionStatus struct { type ConnectionStatus struct {
Connected bool Connected bool
LastActivity time.Time LastActivity time.Time
ErrorCount int ErrorCount int
RequestCount int RequestCount int
LatencyAvg time.Duration LatencyAvg time.Duration
} }
// TransportManager is an interface for managing transport layer operations // TransportManager is an interface for managing transport layer operations

View File

@ -7,7 +7,7 @@ import (
// registry implements the Registry interface // registry implements the Registry interface
type registry struct { type registry struct {
mu sync.RWMutex mu sync.RWMutex
clientFactories map[string]ClientFactory clientFactories map[string]ClientFactory
serverFactories map[string]ServerFactory serverFactories map[string]ServerFactory
} }

View File

@ -75,7 +75,7 @@ func TestBatchEncoding(t *testing.T) {
// Replay and decode // Replay and decode
var decodedBatch *Batch var decodedBatch *Batch
err = ReplayWALDir(dir, func(entry *Entry) error { _, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypeBatch { if entry.Type == OpTypeBatch {
var err error var err error
decodedBatch, err = DecodeBatch(entry) decodedBatch, err = DecodeBatch(entry)

View File

@ -229,6 +229,17 @@ func (r *Reader) Close() error {
// EntryHandler is a function that processes WAL entries during replay // EntryHandler is a function that processes WAL entries during replay
type EntryHandler func(*Entry) error type EntryHandler func(*Entry) error
// RecoveryStats tracks statistics about WAL recovery
type RecoveryStats struct {
EntriesProcessed uint64
EntriesSkipped uint64
}
// NewRecoveryStats creates a new RecoveryStats instance
func NewRecoveryStats() *RecoveryStats {
return &RecoveryStats{}
}
// FindWALFiles returns a list of WAL files in the given directory // FindWALFiles returns a list of WAL files in the given directory
func FindWALFiles(dir string) ([]string, error) { func FindWALFiles(dir string) ([]string, error) {
pattern := filepath.Join(dir, "*.wal") pattern := filepath.Join(dir, "*.wal")
@ -267,16 +278,15 @@ func getEntryCount(path string) int {
return count return count
} }
func ReplayWALFile(path string, handler EntryHandler) error { func ReplayWALFile(path string, handler EntryHandler) (*RecoveryStats, error) {
reader, err := OpenReader(path) reader, err := OpenReader(path)
if err != nil { if err != nil {
return err return nil, err
} }
defer reader.Close() defer reader.Close()
// Track statistics for reporting // Track statistics
entriesProcessed := 0 stats := NewRecoveryStats()
entriesSkipped := 0
for { for {
entry, err := reader.ReadEntry() entry, err := reader.ReadEntry()
@ -290,14 +300,11 @@ func ReplayWALFile(path string, handler EntryHandler) error {
if strings.Contains(err.Error(), "corrupt") || if strings.Contains(err.Error(), "corrupt") ||
strings.Contains(err.Error(), "invalid") { strings.Contains(err.Error(), "invalid") {
// Skip this corrupted entry // Skip this corrupted entry
if !DisableRecoveryLogs { stats.EntriesSkipped++
fmt.Printf("Skipping corrupted entry in %s: %v\n", path, err)
}
entriesSkipped++
// If we've seen too many corrupted entries in a row, give up on this file // If we've seen too many corrupted entries in a row, give up on this file
if entriesSkipped > 5 && entriesProcessed == 0 { if stats.EntriesSkipped > 5 && stats.EntriesProcessed == 0 {
return fmt.Errorf("too many corrupted entries at start of file %s", path) return stats, fmt.Errorf("too many corrupted entries at start of file %s", path)
} }
// Try to recover by scanning ahead // Try to recover by scanning ahead
@ -310,7 +317,7 @@ func ReplayWALFile(path string, handler EntryHandler) error {
break break
} }
// Couldn't recover // Couldn't recover
return fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr) return stats, fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
} }
// Successfully recovered, continue to the next entry // Successfully recovered, continue to the next entry
@ -318,23 +325,18 @@ func ReplayWALFile(path string, handler EntryHandler) error {
} }
// For other errors, fail the replay // For other errors, fail the replay
return fmt.Errorf("error reading entry from %s: %w", path, err) return stats, fmt.Errorf("error reading entry from %s: %w", path, err)
} }
// Process the entry // Process the entry
if err := handler(entry); err != nil { if err := handler(entry); err != nil {
return fmt.Errorf("error handling entry: %w", err) return stats, fmt.Errorf("error handling entry: %w", err)
} }
entriesProcessed++ stats.EntriesProcessed++
} }
if !DisableRecoveryLogs { return stats, nil
fmt.Printf("Processed %d entries from %s (skipped %d corrupted entries)\n",
entriesProcessed, path, entriesSkipped)
}
return nil
} }
// recoverFromCorruption attempts to recover from a corrupted record by scanning ahead // recoverFromCorruption attempts to recover from a corrupted record by scanning ahead
@ -356,54 +358,58 @@ func recoverFromCorruption(reader *Reader) error {
} }
// ReplayWALDir replays all WAL files in the given directory in order // ReplayWALDir replays all WAL files in the given directory in order
func ReplayWALDir(dir string, handler EntryHandler) error { func ReplayWALDir(dir string, handler EntryHandler) (*RecoveryStats, error) {
files, err := FindWALFiles(dir) files, err := FindWALFiles(dir)
if err != nil { if err != nil {
return err return nil, err
} }
// Track overall recovery stats
totalStats := NewRecoveryStats()
// Track number of files processed successfully // Track number of files processed successfully
successfulFiles := 0 successfulFiles := 0
var lastErr error var lastErr error
// Try to process each file, but continue on recoverable errors // Try to process each file, but continue on recoverable errors
for _, file := range files { for _, file := range files {
err := ReplayWALFile(file, handler) fileStats, err := ReplayWALFile(file, handler)
if err != nil { if err != nil {
if !DisableRecoveryLogs {
fmt.Printf("Error processing WAL file %s: %v\n", file, err)
}
// Record the error, but continue // Record the error, but continue
lastErr = err lastErr = err
// If we got some stats from the file before the error, add them to our totals
if fileStats != nil {
totalStats.EntriesProcessed += fileStats.EntriesProcessed
totalStats.EntriesSkipped += fileStats.EntriesSkipped
}
// Check if this is a file-level error or just a corrupt record // Check if this is a file-level error or just a corrupt record
if !strings.Contains(err.Error(), "corrupt") && if !strings.Contains(err.Error(), "corrupt") &&
!strings.Contains(err.Error(), "invalid") { !strings.Contains(err.Error(), "invalid") {
return fmt.Errorf("fatal error replaying WAL file %s: %w", file, err) return totalStats, fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
} }
// Continue to the next file for corrupt/invalid errors // Continue to the next file for corrupt/invalid errors
continue continue
} }
if !DisableRecoveryLogs { // Add stats from this file to our totals
fmt.Printf("Processed %d entries from %s (skipped 0 corrupted entries)\n", totalStats.EntriesProcessed += fileStats.EntriesProcessed
getEntryCount(file), file) totalStats.EntriesSkipped += fileStats.EntriesSkipped
}
successfulFiles++ successfulFiles++
} }
// If we processed at least one file successfully, the WAL recovery is considered successful // If we processed at least one file successfully, the WAL recovery is considered successful
if successfulFiles > 0 { if successfulFiles > 0 {
return nil return totalStats, nil
} }
// If no files were processed successfully and we had errors, return the last error // If no files were processed successfully and we had errors, return the last error
if lastErr != nil { if lastErr != nil {
return fmt.Errorf("failed to process any WAL files: %w", lastErr) return totalStats, fmt.Errorf("failed to process any WAL files: %w", lastErr)
} }
return nil return totalStats, nil
} }

View File

@ -56,7 +56,7 @@ func TestWALWrite(t *testing.T) {
// Verify entries by replaying // Verify entries by replaying
entries := make(map[string]string) entries := make(map[string]string)
err = ReplayWALDir(dir, func(entry *Entry) error { _, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut { if entry.Type == OpTypePut {
entries[string(entry.Key)] = string(entry.Value) entries[string(entry.Key)] = string(entry.Value)
} else if entry.Type == OpTypeDelete { } else if entry.Type == OpTypeDelete {
@ -115,7 +115,7 @@ func TestWALDelete(t *testing.T) {
// Verify entries by replaying // Verify entries by replaying
var deleted bool var deleted bool
err = ReplayWALDir(dir, func(entry *Entry) error { _, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut && bytes.Equal(entry.Key, key) { if entry.Type == OpTypePut && bytes.Equal(entry.Key, key) {
if deleted { if deleted {
deleted = false // Key was re-added deleted = false // Key was re-added
@ -171,7 +171,7 @@ func TestWALLargeEntry(t *testing.T) {
// Verify by replaying // Verify by replaying
var foundLargeEntry bool var foundLargeEntry bool
err = ReplayWALDir(dir, func(entry *Entry) error { _, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut && len(entry.Key) == len(key) && len(entry.Value) == len(value) { if entry.Type == OpTypePut && len(entry.Key) == len(key) && len(entry.Value) == len(value) {
// Verify key // Verify key
for i := range key { for i := range key {
@ -240,7 +240,7 @@ func TestWALBatch(t *testing.T) {
entries := make(map[string]string) entries := make(map[string]string)
batchCount := 0 batchCount := 0
err = ReplayWALDir(dir, func(entry *Entry) error { _, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypeBatch { if entry.Type == OpTypeBatch {
batchCount++ batchCount++
@ -336,7 +336,7 @@ func TestWALRecovery(t *testing.T) {
// Verify entries by replaying all WAL files in order // Verify entries by replaying all WAL files in order
entries := make(map[string]string) entries := make(map[string]string)
err = ReplayWALDir(dir, func(entry *Entry) error { _, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut { if entry.Type == OpTypePut {
entries[string(entry.Key)] = string(entry.Value) entries[string(entry.Key)] = string(entry.Value)
} else if entry.Type == OpTypeDelete { } else if entry.Type == OpTypeDelete {
@ -410,7 +410,7 @@ func TestWALSyncModes(t *testing.T) {
// Verify entries by replaying // Verify entries by replaying
count := 0 count := 0
err = ReplayWALDir(dir, func(entry *Entry) error { _, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut { if entry.Type == OpTypePut {
count++ count++
} }
@ -471,7 +471,7 @@ func TestWALFragmentation(t *testing.T) {
var reconstructedValue []byte var reconstructedValue []byte
var foundPut bool var foundPut bool
err = ReplayWALDir(dir, func(entry *Entry) error { _, err = ReplayWALDir(dir, func(entry *Entry) error {
if entry.Type == OpTypePut { if entry.Type == OpTypePut {
foundPut = true foundPut = true
reconstructedKey = entry.Key reconstructedKey = entry.Key
@ -580,7 +580,7 @@ func TestWALErrorHandling(t *testing.T) {
// Try to replay a non-existent file // Try to replay a non-existent file
nonExistentPath := filepath.Join(dir, "nonexistent.wal") nonExistentPath := filepath.Join(dir, "nonexistent.wal")
err = ReplayWALFile(nonExistentPath, func(entry *Entry) error { _, err = ReplayWALFile(nonExistentPath, func(entry *Entry) error {
return nil return nil
}) })