Compare commits
3 Commits
5a926633bb ... a0a1c0512f

Commits:
a0a1c0512f
e7974e008d
3b3d1c27a4
cmd/kevo/main.go (141 changed lines)
@@ -12,6 +12,7 @@ import (
 	"strings"
 	"syscall"
 	"time"
+	"unicode"
 
 	"github.com/chzyer/readline"
 
@@ -80,14 +81,14 @@ Commands (interactive mode only):
 
 // Config holds the application configuration
 type Config struct {
-	ServerMode bool
-	DaemonMode bool
-	ListenAddr string
-	DBPath string
-	TLSEnabled bool
-	TLSCertFile string
-	TLSKeyFile string
-	TLSCAFile string
+	ServerMode  bool
+	DaemonMode  bool
+	ListenAddr  string
+	DBPath      string
+	TLSEnabled  bool
+	TLSCertFile string
+	TLSKeyFile  string
+	TLSCAFile   string
 }
 
 func main() {
@@ -167,14 +168,14 @@ func parseFlags() Config {
 	}
 
 	return Config{
-		ServerMode: *serverMode,
-		DaemonMode: *daemonMode,
-		ListenAddr: *listenAddr,
-		DBPath: dbPath,
-		TLSEnabled: *tlsEnabled,
-		TLSCertFile: *tlsCertFile,
-		TLSKeyFile: *tlsKeyFile,
-		TLSCAFile: *tlsCAFile,
+		ServerMode:  *serverMode,
+		DaemonMode:  *daemonMode,
+		ListenAddr:  *listenAddr,
+		DBPath:      dbPath,
+		TLSEnabled:  *tlsEnabled,
+		TLSCertFile: *tlsCertFile,
+		TLSKeyFile:  *tlsKeyFile,
+		TLSCAFile:   *tlsCAFile,
 	}
 }
@@ -411,15 +412,89 @@ func runInteractive(eng *engine.Engine, dbPath string) {
 
 			// Print statistics
 			stats := eng.GetStats()
-			fmt.Println("Database Statistics:")
-			fmt.Printf("  Operations: %d puts, %d gets (%d hits, %d misses), %d deletes\n",
-				stats["put_ops"], stats["get_ops"], stats["get_hits"], stats["get_misses"], stats["delete_ops"])
-			fmt.Printf("  Transactions: %d started, %d committed, %d aborted\n",
-				stats["tx_started"], stats["tx_completed"], stats["tx_aborted"])
-			fmt.Printf("  Storage: %d bytes read, %d bytes written, %d flushes\n",
-				stats["total_bytes_read"], stats["total_bytes_written"], stats["flush_count"])
-			fmt.Printf("  Tables: %d sstables, %d immutable memtables\n",
-				stats["sstable_count"], stats["immutable_memtable_count"])
+
+			// Format human-readable time for the last operation timestamps
+			var lastPutTime, lastGetTime, lastDeleteTime time.Time
+			if putTime, ok := stats["last_put_time"].(int64); ok && putTime > 0 {
+				lastPutTime = time.Unix(0, putTime)
+			}
+			if getTime, ok := stats["last_get_time"].(int64); ok && getTime > 0 {
+				lastGetTime = time.Unix(0, getTime)
+			}
+			if deleteTime, ok := stats["last_delete_time"].(int64); ok && deleteTime > 0 {
+				lastDeleteTime = time.Unix(0, deleteTime)
+			}
+
+			// Operations section
+			fmt.Println("📊 Operations:")
+			fmt.Printf("  • Puts: %d\n", stats["put_ops"])
+			fmt.Printf("  • Gets: %d (Hits: %d, Misses: %d)\n", stats["get_ops"], stats["get_hits"], stats["get_misses"])
+			fmt.Printf("  • Deletes: %d\n", stats["delete_ops"])
+
+			// Last Operation Times
+			fmt.Println("\n⏱️ Last Operation Times:")
+			if !lastPutTime.IsZero() {
+				fmt.Printf("  • Last Put: %s\n", lastPutTime.Format(time.RFC3339))
+			} else {
+				fmt.Printf("  • Last Put: Never\n")
+			}
+			if !lastGetTime.IsZero() {
+				fmt.Printf("  • Last Get: %s\n", lastGetTime.Format(time.RFC3339))
+			} else {
+				fmt.Printf("  • Last Get: Never\n")
+			}
+			if !lastDeleteTime.IsZero() {
+				fmt.Printf("  • Last Delete: %s\n", lastDeleteTime.Format(time.RFC3339))
+			} else {
+				fmt.Printf("  • Last Delete: Never\n")
+			}
+
+			// Transactions
+			fmt.Println("\n💼 Transactions:")
+			fmt.Printf("  • Started: %d\n", stats["tx_started"])
+			fmt.Printf("  • Completed: %d\n", stats["tx_completed"])
+			fmt.Printf("  • Aborted: %d\n", stats["tx_aborted"])
+
+			// Storage metrics
+			fmt.Println("\n💾 Storage:")
+			fmt.Printf("  • Total Bytes Read: %d\n", stats["total_bytes_read"])
+			fmt.Printf("  • Total Bytes Written: %d\n", stats["total_bytes_written"])
+			fmt.Printf("  • Flush Count: %d\n", stats["flush_count"])
+
+			// Table stats
+			fmt.Println("\n📋 Tables:")
+			fmt.Printf("  • SSTable Count: %d\n", stats["sstable_count"])
+			fmt.Printf("  • Immutable MemTable Count: %d\n", stats["immutable_memtable_count"])
+			fmt.Printf("  • Current MemTable Size: %d bytes\n", stats["memtable_size"])
+
+			// WAL recovery stats
+			fmt.Println("\n🔄 WAL Recovery:")
+			fmt.Printf("  • Files Recovered: %d\n", stats["wal_files_recovered"])
+			fmt.Printf("  • Entries Recovered: %d\n", stats["wal_entries_recovered"])
+			fmt.Printf("  • Corrupted Entries: %d\n", stats["wal_corrupted_entries"])
+			if recoveryDuration, ok := stats["wal_recovery_duration_ms"]; ok {
+				fmt.Printf("  • Recovery Duration: %d ms\n", recoveryDuration)
+			}
+
+			// Error counts
+			fmt.Println("\n⚠️ Errors:")
+			fmt.Printf("  • Read Errors: %d\n", stats["read_errors"])
+			fmt.Printf("  • Write Errors: %d\n", stats["write_errors"])
+
+			// Compaction stats (if available)
+			if compactionOutputCount, ok := stats["compaction_last_outputs_count"]; ok {
+				fmt.Println("\n🧹 Compaction:")
+				fmt.Printf("  • Last Output Files Count: %d\n", compactionOutputCount)
+
+				// Display other compaction stats as available
+				for key, value := range stats {
+					if strings.HasPrefix(key, "compaction_") && key != "compaction_last_outputs_count" && key != "compaction_last_outputs" {
+						// Format the key for display (remove prefix, replace underscores with spaces)
+						displayKey := toTitle(strings.Replace(strings.TrimPrefix(key, "compaction_"), "_", " ", -1))
+						fmt.Printf("  • %s: %v\n", displayKey, value)
+					}
+				}
+			}
+
 		case ".flush":
 			if eng == nil {
@@ -735,3 +810,19 @@ func makeKeySuccessor(prefix []byte) []byte {
 	successor[len(prefix)] = 0xFF
 	return successor
 }
+
+// toTitle replaces strings.Title which is deprecated
+// It converts the first character of each word to title case
+func toTitle(s string) string {
+	prev := ' '
+	return strings.Map(
+		func(r rune) rune {
+			if unicode.IsSpace(prev) || unicode.IsPunct(prev) {
+				prev = r
+				return unicode.ToTitle(r)
+			}
+			prev = r
+			return r
+		},
+		s)
+}
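
The toTitle helper added above stands in for the deprecated strings.Title. A minimal standalone sketch of how it behaves, mirroring the display-key formatting in the stats command (the sample stat key is illustrative, not from the commit):

	package main

	import (
		"fmt"
		"strings"
		"unicode"
	)

	// toTitle upper-cases the first rune of each word (copied from the hunk above).
	func toTitle(s string) string {
		prev := ' '
		return strings.Map(
			func(r rune) rune {
				if unicode.IsSpace(prev) || unicode.IsPunct(prev) {
					prev = r
					return unicode.ToTitle(r)
				}
				prev = r
				return r
			},
			s)
	}

	func main() {
		// "compaction_bytes_written" is a hypothetical key used only for illustration.
		key := strings.Replace(strings.TrimPrefix("compaction_bytes_written", "compaction_"), "_", " ", -1)
		fmt.Println(toTitle(key)) // Output: Bytes Written
	}
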
@@ -154,12 +154,12 @@ func (tr *TransactionRegistry) GracefulShutdown(ctx context.Context) error {
 
 // Server represents the Kevo server
 type Server struct {
-	eng *engine.Engine
-	txRegistry *TransactionRegistry
-	listener net.Listener
-	grpcServer *grpc.Server
+	eng         *engine.Engine
+	txRegistry  *TransactionRegistry
+	listener    net.Listener
+	grpcServer  *grpc.Server
 	kevoService *grpcservice.KevoServiceServer
-	config Config
+	config      Config
 }
 
 // NewServer creates a new server instance
go.mod (6 changed lines)
@@ -10,8 +10,8 @@ require (
 )
 
 require (
-	golang.org/x/net v0.35.0 // indirect
-	golang.org/x/sys v0.30.0 // indirect
-	golang.org/x/text v0.22.0 // indirect
+	golang.org/x/net v0.38.0 // indirect
+	golang.org/x/sys v0.31.0 // indirect
+	golang.org/x/text v0.23.0 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
 )
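
A note on the module bumps: golang.org/x/net, x/sys, and x/text typically move together, since newer x/net releases require matching x/sys and x/text versions; a plain `go get golang.org/x/net@v0.38.0` followed by `go mod tidy` would produce this kind of paired go.mod/go.sum change.
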
go.sum (12 changed lines)
@@ -28,13 +28,13 @@ go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce
 go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
 go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
 go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
-golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
-golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
+golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
+golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
 golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
-golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
-golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
+golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
+golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
+golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
 google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
@@ -23,11 +23,11 @@ const (
 
 // ClientOptions configures a Kevo client
 type ClientOptions struct {
 	// Connection options
-	Endpoint string // Server address
-	ConnectTimeout time.Duration // Timeout for connection attempts
-	RequestTimeout time.Duration // Default timeout for requests
-	TransportType string // Transport type (e.g. "grpc")
-	PoolSize int // Connection pool size
+	Endpoint       string        // Server address
+	ConnectTimeout time.Duration // Timeout for connection attempts
+	RequestTimeout time.Duration // Default timeout for requests
+	TransportType  string        // Transport type (e.g. "grpc")
+	PoolSize       int           // Connection pool size
 
 	// Security options
 	TLSEnabled bool // Enable TLS
@@ -50,19 +50,19 @@ type ClientOptions struct {
 // DefaultClientOptions returns sensible default client options
 func DefaultClientOptions() ClientOptions {
 	return ClientOptions{
-		Endpoint: "localhost:50051",
-		ConnectTimeout: time.Second * 5,
-		RequestTimeout: time.Second * 10,
-		TransportType: "grpc",
-		PoolSize: 5,
-		TLSEnabled: false,
-		MaxRetries: 3,
-		InitialBackoff: time.Millisecond * 100,
-		MaxBackoff: time.Second * 2,
-		BackoffFactor: 1.5,
-		RetryJitter: 0.2,
-		Compression: CompressionNone,
-		MaxMessageSize: 16 * 1024 * 1024, // 16MB
+		Endpoint:       "localhost:50051",
+		ConnectTimeout: time.Second * 5,
+		RequestTimeout: time.Second * 10,
+		TransportType:  "grpc",
+		PoolSize:       5,
+		TLSEnabled:     false,
+		MaxRetries:     3,
+		InitialBackoff: time.Millisecond * 100,
+		MaxBackoff:     time.Second * 2,
+		BackoffFactor:  1.5,
+		RetryJitter:    0.2,
+		Compression:    CompressionNone,
+		MaxMessageSize: 16 * 1024 * 1024, // 16MB
 	}
 }
@@ -12,11 +12,11 @@ import (
 
 // Transaction represents a database transaction
 type Transaction struct {
-	client *Client
-	id string
-	readOnly bool
-	closed bool
-	mu sync.RWMutex
+	client   *Client
+	id       string
+	readOnly bool
+	closed   bool
+	mu       sync.RWMutex
 }
 
 // ErrTransactionClosed is returned when attempting to use a closed transaction
@@ -134,7 +134,7 @@ func (h *HierarchicalIterator) Seek(target []byte) bool {
 			// If a newer iterator has the same key, use its value
 			if bytes.Equal(iter.Key(), bestKey) {
 				bestValue = iter.Value()
-				break // Since iterators are in newest-to-oldest order, we can stop at the first match
+				break // Since iterators are in newest-to-oldest order, we can stop at the first match
 			}
 		}
 
@@ -275,7 +275,7 @@ func (h *HierarchicalIterator) findNextUniqueKey(prevKey []byte) bool {
 			// If a newer iterator has the same key, use its value
 			if bytes.Equal(iter.Key(), bestKey) {
 				bestValue = iter.Value()
-				break // Since iterators are in newest-to-oldest order, we can stop at the first match
+				break // Since iterators are in newest-to-oldest order, we can stop at the first match
 			}
 		}
 
@@ -61,6 +61,12 @@ type EngineStats struct {
 	TxCompleted atomic.Uint64
 	TxAborted   atomic.Uint64
 
+	// Recovery stats
+	WALFilesRecovered   atomic.Uint64
+	WALEntriesRecovered atomic.Uint64
+	WALCorruptedEntries atomic.Uint64
+	WALRecoveryDuration atomic.Int64 // nanoseconds
+
 	// Mutex for accessing non-atomic fields
 	mu sync.RWMutex
 }
@@ -666,21 +672,22 @@ func (e *Engine) loadSSTables() error {
 
 // recoverFromWAL recovers memtables from existing WAL files
 func (e *Engine) recoverFromWAL() error {
+	startTime := time.Now()
+
 	// Check if WAL directory exists
 	if _, err := os.Stat(e.walDir); os.IsNotExist(err) {
 		return nil // No WAL directory, nothing to recover
 	}
 
-	// List all WAL files for diagnostic purposes
+	// List all WAL files
 	walFiles, err := wal.FindWALFiles(e.walDir)
 	if err != nil {
-		if !wal.DisableRecoveryLogs {
-			fmt.Printf("Error listing WAL files: %v\n", err)
-		}
-	} else {
-		if !wal.DisableRecoveryLogs {
-			fmt.Printf("Found %d WAL files: %v\n", len(walFiles), walFiles)
-		}
+		e.stats.ReadErrors.Add(1)
+		return fmt.Errorf("error listing WAL files: %w", err)
 	}
 
+	if len(walFiles) > 0 {
+		e.stats.WALFilesRecovered.Add(uint64(len(walFiles)))
+	}
+
 	// Get recovery options
@@ -690,17 +697,11 @@ func (e *Engine) recoverFromWAL() error {
 	memTables, maxSeqNum, err := memtable.RecoverFromWAL(e.cfg, recoveryOpts)
 	if err != nil {
 		// If recovery fails, let's try cleaning up WAL files
-		if !wal.DisableRecoveryLogs {
-			fmt.Printf("WAL recovery failed: %v\n", err)
-			fmt.Printf("Attempting to recover by cleaning up WAL files...\n")
-		}
+		e.stats.ReadErrors.Add(1)
 
 		// Create a backup directory
 		backupDir := filepath.Join(e.walDir, "backup_"+time.Now().Format("20060102_150405"))
 		if err := os.MkdirAll(backupDir, 0755); err != nil {
-			if !wal.DisableRecoveryLogs {
-				fmt.Printf("Failed to create backup directory: %v\n", err)
-			}
 			return fmt.Errorf("failed to recover from WAL: %w", err)
 		}
 
@@ -708,11 +709,7 @@ func (e *Engine) recoverFromWAL() error {
 		for _, walFile := range walFiles {
 			destFile := filepath.Join(backupDir, filepath.Base(walFile))
 			if err := os.Rename(walFile, destFile); err != nil {
-				if !wal.DisableRecoveryLogs {
-					fmt.Printf("Failed to move WAL file %s: %v\n", walFile, err)
-				}
-			} else if !wal.DisableRecoveryLogs {
-				fmt.Printf("Moved problematic WAL file to %s\n", destFile)
+				e.stats.ReadErrors.Add(1)
 			}
 		}
 
@@ -723,15 +720,28 @@ func (e *Engine) recoverFromWAL() error {
 		}
 		e.wal = newWal
 
 		// No memtables to recover, starting fresh
-		if !wal.DisableRecoveryLogs {
-			fmt.Printf("Starting with a fresh WAL after recovery failure\n")
-		}
+		// Record recovery duration
+		e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
 		return nil
 	}
 
+	// Update recovery statistics based on actual entries recovered
+	if len(walFiles) > 0 {
+		// Use WALDir function directly to get stats
+		recoveryStats, statErr := wal.ReplayWALDir(e.cfg.WALDir, func(entry *wal.Entry) error {
+			return nil // Just counting, not processing
+		})
+
+		if statErr == nil && recoveryStats != nil {
+			e.stats.WALEntriesRecovered.Add(recoveryStats.EntriesProcessed)
+			e.stats.WALCorruptedEntries.Add(recoveryStats.EntriesSkipped)
+		}
+	}
+
 	// No memtables recovered or empty WAL
 	if len(memTables) == 0 {
+		// Record recovery duration
+		e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
 		return nil
 	}
 
@@ -755,10 +765,9 @@ func (e *Engine) recoverFromWAL() error {
 		}
 	}
 
-	if !wal.DisableRecoveryLogs {
-		fmt.Printf("Recovered %d memtables from WAL with max sequence number %d\n",
-			len(memTables), maxSeqNum)
-	}
+	// Record recovery stats
+	e.stats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
 
 	return nil
 }
@@ -925,6 +934,15 @@ func (e *Engine) GetStats() map[string]interface{} {
 	stats["read_errors"] = e.stats.ReadErrors.Load()
 	stats["write_errors"] = e.stats.WriteErrors.Load()
 
+	// Add WAL recovery statistics
+	stats["wal_files_recovered"] = e.stats.WALFilesRecovered.Load()
+	stats["wal_entries_recovered"] = e.stats.WALEntriesRecovered.Load()
+	stats["wal_corrupted_entries"] = e.stats.WALCorruptedEntries.Load()
+	recoveryDuration := e.stats.WALRecoveryDuration.Load()
+	if recoveryDuration > 0 {
+		stats["wal_recovery_duration_ms"] = recoveryDuration / int64(time.Millisecond)
+	}
+
 	// Add timing information
 	e.stats.mu.RLock()
 	defer e.stats.mu.RUnlock()
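
Because GetStats returns map[string]interface{}, consumers of the new WAL recovery keys need type assertions; the atomic counters surface as uint64 and the duration as int64 milliseconds. A minimal consumer sketch (the surrounding variable names are illustrative):

	stats := eng.GetStats()
	if files, ok := stats["wal_files_recovered"].(uint64); ok && files > 0 {
		fmt.Printf("WAL recovery touched %d file(s)\n", files)
	}
	if ms, ok := stats["wal_recovery_duration_ms"].(int64); ok {
		fmt.Printf("recovery took %d ms\n", ms)
	}
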
@@ -226,7 +226,7 @@ func (pi *prefixIterator) Next() bool {
 		// Check if current key has the prefix
 		key := pi.iter.Key()
 		if len(key) >= len(pi.prefix) &&
-			equalByteSlice(key[:len(pi.prefix)], pi.prefix) {
+			equalByteSlice(key[:len(pi.prefix)], pi.prefix) {
 			return true
 		}
 	}
@@ -9,8 +9,8 @@ import (
 	"sync"
 	"time"
 
-	pb "github.com/KevoDB/kevo/proto/kevo"
 	"github.com/KevoDB/kevo/pkg/transport"
+	pb "github.com/KevoDB/kevo/proto/kevo"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
@@ -19,13 +19,13 @@ import (
 
 // GRPCClient implements the transport.Client interface for gRPC
 type GRPCClient struct {
-	endpoint string
-	options transport.TransportOptions
-	conn *grpc.ClientConn
-	client pb.KevoServiceClient
-	status transport.TransportStatus
-	statusMu sync.RWMutex
-	metrics transport.MetricsCollector
+	endpoint string
+	options  transport.TransportOptions
+	conn     *grpc.ClientConn
+	client   pb.KevoServiceClient
+	status   transport.TransportStatus
+	statusMu sync.RWMutex
+	metrics  transport.MetricsCollector
 }
 
 // NewGRPCClient creates a new gRPC client
@@ -7,8 +7,8 @@ import (
 	"sync"
 	"time"
 
-	pb "github.com/KevoDB/kevo/proto/kevo"
 	"github.com/KevoDB/kevo/pkg/transport"
+	pb "github.com/KevoDB/kevo/proto/kevo"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
@@ -7,8 +7,8 @@ import (
 	"sync"
 	"time"
 
-	pb "github.com/KevoDB/kevo/proto/kevo"
 	"github.com/KevoDB/kevo/pkg/transport"
+	pb "github.com/KevoDB/kevo/proto/kevo"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/keepalive"
@@ -16,13 +16,13 @@ import (
 
 // GRPCServer implements the transport.Server interface for gRPC
 type GRPCServer struct {
-	address string
-	tlsConfig *tls.Config
-	server *grpc.Server
+	address        string
+	tlsConfig      *tls.Config
+	server         *grpc.Server
 	requestHandler transport.RequestHandler
-	started bool
-	mu sync.Mutex
-	metrics *transport.ExtendedMetricsCollector
+	started        bool
+	mu             sync.Mutex
+	metrics        *transport.ExtendedMetricsCollector
 }
 
 // NewGRPCServer creates a new gRPC server
@@ -5,8 +5,8 @@ import (
 	"sync"
 	"time"
 
-	pb "github.com/KevoDB/kevo/proto/kevo"
 	"github.com/KevoDB/kevo/pkg/transport"
+	pb "github.com/KevoDB/kevo/proto/kevo"
 	"google.golang.org/grpc"
 )
 
@@ -31,7 +31,7 @@ func DefaultRecoveryOptions(cfg *config.Config) *RecoveryOptions {
 }
 
 // RecoverFromWAL rebuilds MemTables from the write-ahead log
-// Returns a list of recovered MemTables and the maximum sequence number seen
+// Returns a list of recovered MemTables, the maximum sequence number seen, and stats
 func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uint64, error) {
 	if opts == nil {
 		opts = DefaultRecoveryOptions(cfg)
@@ -76,10 +76,13 @@ func RecoverFromWAL(cfg *config.Config, opts *RecoveryOptions) ([]*MemTable, uin
 	}
 
 	// Replay the WAL directory
-	if err := wal.ReplayWALDir(cfg.WALDir, entryHandler); err != nil {
+	_, err := wal.ReplayWALDir(cfg.WALDir, entryHandler)
+	if err != nil {
 		return nil, 0, fmt.Errorf("failed to replay WAL: %w", err)
 	}
 
+	// Stats will be captured in the engine directly
+
 	// For batch operations, we need to adjust maxSeqNum
 	finalTable := memTables[len(memTables)-1]
 	nextSeq := finalTable.GetNextSequenceNumber()
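
Note that RecoverFromWAL now deliberately discards the stats value from ReplayWALDir (the `_, err :=` form): as the new comment says, recovery statistics are captured in the engine, which re-replays the WAL directory with a no-op handler purely to count entries (see the recoverFromWAL hunk above). This keeps RecoverFromWAL's own signature at ([]*MemTable, uint64, error), even though its doc comment now mentions stats.
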
@@ -6,21 +6,21 @@ import (
 
 // Standard request/response type constants
 const (
-	TypeGet = "get"
-	TypePut = "put"
-	TypeDelete = "delete"
-	TypeBatchWrite = "batch_write"
-	TypeScan = "scan"
-	TypeBeginTx = "begin_tx"
-	TypeCommitTx = "commit_tx"
-	TypeRollbackTx = "rollback_tx"
-	TypeTxGet = "tx_get"
-	TypeTxPut = "tx_put"
-	TypeTxDelete = "tx_delete"
-	TypeTxScan = "tx_scan"
-	TypeGetStats = "get_stats"
-	TypeCompact = "compact"
-	TypeError = "error"
+	TypeGet        = "get"
+	TypePut        = "put"
+	TypeDelete     = "delete"
+	TypeBatchWrite = "batch_write"
+	TypeScan       = "scan"
+	TypeBeginTx    = "begin_tx"
+	TypeCommitTx   = "commit_tx"
+	TypeRollbackTx = "rollback_tx"
+	TypeTxGet      = "tx_get"
+	TypeTxPut      = "tx_put"
+	TypeTxDelete   = "tx_delete"
+	TypeTxScan     = "tx_scan"
+	TypeGetStats   = "get_stats"
+	TypeCompact    = "compact"
+	TypeError      = "error"
 )
 
 // Common errors
@@ -82,7 +82,7 @@ func (c *BasicMetricsCollector) RecordRequest(requestType string, startTime time
 	if exists {
 		// Update running average - the common case for better branch prediction
 		// new_avg = (old_avg * count + new_value) / (count + 1)
-		totalDuration := currentAvg * time.Duration(currentCount) + latency
+		totalDuration := currentAvg*time.Duration(currentCount) + latency
 		newCount := currentCount + 1
 		c.avgLatencyByType[requestType] = totalDuration / time.Duration(newCount)
 		c.requestCountByType[requestType] = newCount
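
The comment above documents the standard incremental mean, new_avg = (old_avg * count + new_value) / (count + 1). A quick numeric check in the same Duration arithmetic the code uses (the latency values are made up; note that integer division truncates, so the running average can drift slightly low, which is acceptable for metrics):

	// Latencies 10ms, 20ms, 30ms; the plain mean is 20ms.
	avg := 10 * time.Millisecond
	for i, latency := range []time.Duration{20 * time.Millisecond, 30 * time.Millisecond} {
		count := time.Duration(i + 1)
		avg = (avg*count + latency) / (count + 1) // (10*1+20)/2 = 15ms, then (15*2+30)/3 = 20ms
	}
	fmt.Println(avg) // 20ms
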
@@ -9,9 +9,9 @@ import (
 // Metrics struct extensions for server metrics
 type ServerMetrics struct {
 	Metrics
-	ServerStarted uint64
-	ServerErrored uint64
-	ServerStopped uint64
+	ServerStarted uint64
+	ServerErrored uint64
+	ServerStopped uint64
 }
 
 // Connection represents a connection to a remote endpoint
@@ -31,11 +31,11 @@ type Connection interface {
 
 // ConnectionStatus represents the status of a connection
 type ConnectionStatus struct {
-	Connected bool
-	LastActivity time.Time
-	ErrorCount int
-	RequestCount int
-	LatencyAvg time.Duration
+	Connected    bool
+	LastActivity time.Time
+	ErrorCount   int
+	RequestCount int
+	LatencyAvg   time.Duration
 }
 
 // TransportManager is an interface for managing transport layer operations
@@ -7,7 +7,7 @@ import (
 
 // registry implements the Registry interface
 type registry struct {
-	mu sync.RWMutex
+	mu              sync.RWMutex
 	clientFactories map[string]ClientFactory
 	serverFactories map[string]ServerFactory
 }
@@ -75,7 +75,7 @@ func TestBatchEncoding(t *testing.T) {
 	// Replay and decode
 	var decodedBatch *Batch
 
-	err = ReplayWALDir(dir, func(entry *Entry) error {
+	_, err = ReplayWALDir(dir, func(entry *Entry) error {
 		if entry.Type == OpTypeBatch {
 			var err error
 			decodedBatch, err = DecodeBatch(entry)
@@ -229,6 +229,17 @@ func (r *Reader) Close() error {
 // EntryHandler is a function that processes WAL entries during replay
 type EntryHandler func(*Entry) error
 
+// RecoveryStats tracks statistics about WAL recovery
+type RecoveryStats struct {
+	EntriesProcessed uint64
+	EntriesSkipped   uint64
+}
+
+// NewRecoveryStats creates a new RecoveryStats instance
+func NewRecoveryStats() *RecoveryStats {
+	return &RecoveryStats{}
+}
+
 // FindWALFiles returns a list of WAL files in the given directory
 func FindWALFiles(dir string) ([]string, error) {
 	pattern := filepath.Join(dir, "*.wal")
@@ -267,16 +278,15 @@ func getEntryCount(path string) int {
 	return count
 }
 
-func ReplayWALFile(path string, handler EntryHandler) error {
+func ReplayWALFile(path string, handler EntryHandler) (*RecoveryStats, error) {
 	reader, err := OpenReader(path)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	defer reader.Close()
 
-	// Track statistics for reporting
-	entriesProcessed := 0
-	entriesSkipped := 0
+	// Track statistics
+	stats := NewRecoveryStats()
 
 	for {
 		entry, err := reader.ReadEntry()
@@ -290,14 +300,11 @@ func ReplayWALFile(path string, handler EntryHandler) (*RecoveryStats, error) {
 			if strings.Contains(err.Error(), "corrupt") ||
 				strings.Contains(err.Error(), "invalid") {
 				// Skip this corrupted entry
-				if !DisableRecoveryLogs {
-					fmt.Printf("Skipping corrupted entry in %s: %v\n", path, err)
-				}
-				entriesSkipped++
+				stats.EntriesSkipped++
 
 				// If we've seen too many corrupted entries in a row, give up on this file
-				if entriesSkipped > 5 && entriesProcessed == 0 {
-					return fmt.Errorf("too many corrupted entries at start of file %s", path)
+				if stats.EntriesSkipped > 5 && stats.EntriesProcessed == 0 {
+					return stats, fmt.Errorf("too many corrupted entries at start of file %s", path)
 				}
 
 				// Try to recover by scanning ahead
@@ -310,7 +317,7 @@ func ReplayWALFile(path string, handler EntryHandler) (*RecoveryStats, error) {
 					break
 				}
 				// Couldn't recover
-				return fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
+				return stats, fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
 			}
 
 			// Successfully recovered, continue to the next entry
@@ -318,23 +325,18 @@ func ReplayWALFile(path string, handler EntryHandler) (*RecoveryStats, error) {
 		}
 
 			// For other errors, fail the replay
-			return fmt.Errorf("error reading entry from %s: %w", path, err)
+			return stats, fmt.Errorf("error reading entry from %s: %w", path, err)
 		}
 
 		// Process the entry
 		if err := handler(entry); err != nil {
-			return fmt.Errorf("error handling entry: %w", err)
+			return stats, fmt.Errorf("error handling entry: %w", err)
 		}
 
-		entriesProcessed++
+		stats.EntriesProcessed++
 	}
 
-	if !DisableRecoveryLogs {
-		fmt.Printf("Processed %d entries from %s (skipped %d corrupted entries)\n",
-			entriesProcessed, path, entriesSkipped)
-	}
-
-	return nil
+	return stats, nil
 }
 
 // recoverFromCorruption attempts to recover from a corrupted record by scanning ahead
@@ -356,54 +358,58 @@ func recoverFromCorruption(reader *Reader) error {
 }
 
 // ReplayWALDir replays all WAL files in the given directory in order
-func ReplayWALDir(dir string, handler EntryHandler) error {
+func ReplayWALDir(dir string, handler EntryHandler) (*RecoveryStats, error) {
 	files, err := FindWALFiles(dir)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
+	// Track overall recovery stats
+	totalStats := NewRecoveryStats()
+
 	// Track number of files processed successfully
 	successfulFiles := 0
 	var lastErr error
 
 	// Try to process each file, but continue on recoverable errors
 	for _, file := range files {
-		err := ReplayWALFile(file, handler)
+		fileStats, err := ReplayWALFile(file, handler)
 		if err != nil {
 			if !DisableRecoveryLogs {
 				fmt.Printf("Error processing WAL file %s: %v\n", file, err)
 			}
 
 			// Record the error, but continue
 			lastErr = err
 
+			// If we got some stats from the file before the error, add them to our totals
+			if fileStats != nil {
+				totalStats.EntriesProcessed += fileStats.EntriesProcessed
+				totalStats.EntriesSkipped += fileStats.EntriesSkipped
+			}
+
 			// Check if this is a file-level error or just a corrupt record
 			if !strings.Contains(err.Error(), "corrupt") &&
 				!strings.Contains(err.Error(), "invalid") {
-				return fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
+				return totalStats, fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
 			}
 
 			// Continue to the next file for corrupt/invalid errors
 			continue
 		}
 
-		if !DisableRecoveryLogs {
-			fmt.Printf("Processed %d entries from %s (skipped 0 corrupted entries)\n",
-				getEntryCount(file), file)
-		}
+		// Add stats from this file to our totals
+		totalStats.EntriesProcessed += fileStats.EntriesProcessed
+		totalStats.EntriesSkipped += fileStats.EntriesSkipped
 
 		successfulFiles++
 	}
 
 	// If we processed at least one file successfully, the WAL recovery is considered successful
 	if successfulFiles > 0 {
-		return nil
+		return totalStats, nil
 	}
 
 	// If no files were processed successfully and we had errors, return the last error
 	if lastErr != nil {
-		return fmt.Errorf("failed to process any WAL files: %w", lastErr)
+		return totalStats, fmt.Errorf("failed to process any WAL files: %w", lastErr)
 	}
 
-	return nil
+	return totalStats, nil
 }
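
With ReplayWALFile and ReplayWALDir now returning (*RecoveryStats, error), callers receive entry counts even on partial failure. A migration sketch for a hypothetical caller (ReplayWALDir returns nil stats only when listing the directory itself fails, so the nil check matters on the error path):

	stats, err := ReplayWALDir(dir, handler)
	if err != nil {
		if stats != nil {
			fmt.Printf("replay failed after %d entries (%d skipped): %v\n",
				stats.EntriesProcessed, stats.EntriesSkipped, err)
		}
		return err
	}
	fmt.Printf("replayed %d entries, skipped %d corrupted\n",
		stats.EntriesProcessed, stats.EntriesSkipped)

The test updates below follow the simplest form of this migration, discarding the stats with a blank identifier.
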
@@ -56,7 +56,7 @@ func TestWALWrite(t *testing.T) {
 	// Verify entries by replaying
 	entries := make(map[string]string)
 
-	err = ReplayWALDir(dir, func(entry *Entry) error {
+	_, err = ReplayWALDir(dir, func(entry *Entry) error {
 		if entry.Type == OpTypePut {
 			entries[string(entry.Key)] = string(entry.Value)
 		} else if entry.Type == OpTypeDelete {
@@ -115,7 +115,7 @@ func TestWALDelete(t *testing.T) {
 	// Verify entries by replaying
 	var deleted bool
 
-	err = ReplayWALDir(dir, func(entry *Entry) error {
+	_, err = ReplayWALDir(dir, func(entry *Entry) error {
 		if entry.Type == OpTypePut && bytes.Equal(entry.Key, key) {
 			if deleted {
 				deleted = false // Key was re-added
@@ -171,7 +171,7 @@ func TestWALLargeEntry(t *testing.T) {
 	// Verify by replaying
 	var foundLargeEntry bool
 
-	err = ReplayWALDir(dir, func(entry *Entry) error {
+	_, err = ReplayWALDir(dir, func(entry *Entry) error {
 		if entry.Type == OpTypePut && len(entry.Key) == len(key) && len(entry.Value) == len(value) {
 			// Verify key
 			for i := range key {
@@ -240,7 +240,7 @@ func TestWALBatch(t *testing.T) {
 	entries := make(map[string]string)
 	batchCount := 0
 
-	err = ReplayWALDir(dir, func(entry *Entry) error {
+	_, err = ReplayWALDir(dir, func(entry *Entry) error {
 		if entry.Type == OpTypeBatch {
 			batchCount++
 
@@ -336,7 +336,7 @@ func TestWALRecovery(t *testing.T) {
 	// Verify entries by replaying all WAL files in order
 	entries := make(map[string]string)
 
-	err = ReplayWALDir(dir, func(entry *Entry) error {
+	_, err = ReplayWALDir(dir, func(entry *Entry) error {
 		if entry.Type == OpTypePut {
 			entries[string(entry.Key)] = string(entry.Value)
 		} else if entry.Type == OpTypeDelete {
@@ -410,7 +410,7 @@ func TestWALSyncModes(t *testing.T) {
 
 	// Verify entries by replaying
 	count := 0
-	err = ReplayWALDir(dir, func(entry *Entry) error {
+	_, err = ReplayWALDir(dir, func(entry *Entry) error {
 		if entry.Type == OpTypePut {
 			count++
 		}
@@ -471,7 +471,7 @@ func TestWALFragmentation(t *testing.T) {
 	var reconstructedValue []byte
 	var foundPut bool
 
-	err = ReplayWALDir(dir, func(entry *Entry) error {
+	_, err = ReplayWALDir(dir, func(entry *Entry) error {
 		if entry.Type == OpTypePut {
 			foundPut = true
 			reconstructedKey = entry.Key
@@ -580,7 +580,7 @@ func TestWALErrorHandling(t *testing.T) {
 
 	// Try to replay a non-existent file
 	nonExistentPath := filepath.Join(dir, "nonexistent.wal")
-	err = ReplayWALFile(nonExistentPath, func(entry *Entry) error {
+	_, err = ReplayWALFile(nonExistentPath, func(entry *Entry) error {
 		return nil
 	})
 