Compare commits

..

1 Commits

Author SHA1 Message Date
5a926633bb
feat: enhance wal recover statistics
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m48s
2025-04-22 13:25:26 -06:00
35 changed files with 449 additions and 449 deletions

View File

@ -81,14 +81,14 @@ Commands (interactive mode only):
// Config holds the application configuration
type Config struct {
ServerMode bool
DaemonMode bool
ListenAddr string
DBPath string
TLSEnabled bool
TLSCertFile string
TLSKeyFile string
TLSCAFile string
ServerMode bool
DaemonMode bool
ListenAddr string
DBPath string
TLSEnabled bool
TLSCertFile string
TLSKeyFile string
TLSCAFile string
}
func main() {
@ -168,14 +168,14 @@ func parseFlags() Config {
}
return Config{
ServerMode: *serverMode,
DaemonMode: *daemonMode,
ListenAddr: *listenAddr,
DBPath: dbPath,
TLSEnabled: *tlsEnabled,
TLSCertFile: *tlsCertFile,
TLSKeyFile: *tlsKeyFile,
TLSCAFile: *tlsCAFile,
ServerMode: *serverMode,
DaemonMode: *daemonMode,
ListenAddr: *listenAddr,
DBPath: dbPath,
TLSEnabled: *tlsEnabled,
TLSCertFile: *tlsCertFile,
TLSKeyFile: *tlsKeyFile,
TLSCAFile: *tlsCAFile,
}
}

View File

@ -154,12 +154,12 @@ func (tr *TransactionRegistry) GracefulShutdown(ctx context.Context) error {
// Server represents the Kevo server
type Server struct {
eng *engine.Engine
txRegistry *TransactionRegistry
listener net.Listener
grpcServer *grpc.Server
eng *engine.Engine
txRegistry *TransactionRegistry
listener net.Listener
grpcServer *grpc.Server
kevoService *grpcservice.KevoServiceServer
config Config
config Config
}
// NewServer creates a new server instance

6
go.mod
View File

@ -10,8 +10,8 @@ require (
)
require (
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/net v0.35.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.22.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
)

12
go.sum
View File

@ -28,13 +28,13 @@ go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=

View File

@ -23,11 +23,11 @@ const (
// ClientOptions configures a Kevo client
type ClientOptions struct {
// Connection options
Endpoint string // Server address
ConnectTimeout time.Duration // Timeout for connection attempts
RequestTimeout time.Duration // Default timeout for requests
TransportType string // Transport type (e.g. "grpc")
PoolSize int // Connection pool size
Endpoint string // Server address
ConnectTimeout time.Duration // Timeout for connection attempts
RequestTimeout time.Duration // Default timeout for requests
TransportType string // Transport type (e.g. "grpc")
PoolSize int // Connection pool size
// Security options
TLSEnabled bool // Enable TLS
@ -50,19 +50,19 @@ type ClientOptions struct {
// DefaultClientOptions returns sensible default client options
func DefaultClientOptions() ClientOptions {
return ClientOptions{
Endpoint: "localhost:50051",
ConnectTimeout: time.Second * 5,
RequestTimeout: time.Second * 10,
TransportType: "grpc",
PoolSize: 5,
TLSEnabled: false,
MaxRetries: 3,
InitialBackoff: time.Millisecond * 100,
MaxBackoff: time.Second * 2,
BackoffFactor: 1.5,
RetryJitter: 0.2,
Compression: CompressionNone,
MaxMessageSize: 16 * 1024 * 1024, // 16MB
Endpoint: "localhost:50051",
ConnectTimeout: time.Second * 5,
RequestTimeout: time.Second * 10,
TransportType: "grpc",
PoolSize: 5,
TLSEnabled: false,
MaxRetries: 3,
InitialBackoff: time.Millisecond * 100,
MaxBackoff: time.Second * 2,
BackoffFactor: 1.5,
RetryJitter: 0.2,
Compression: CompressionNone,
MaxMessageSize: 16 * 1024 * 1024, // 16MB
}
}

View File

@ -12,11 +12,11 @@ import (
// Transaction represents a database transaction
type Transaction struct {
client *Client
id string
readOnly bool
closed bool
mu sync.RWMutex
client *Client
id string
readOnly bool
closed bool
mu sync.RWMutex
}
// ErrTransactionClosed is returned when attempting to use a closed transaction

View File

@ -134,7 +134,7 @@ func (h *HierarchicalIterator) Seek(target []byte) bool {
// If a newer iterator has the same key, use its value
if bytes.Equal(iter.Key(), bestKey) {
bestValue = iter.Value()
break // Since iterators are in newest-to-oldest order, we can stop at the first match
break // Since iterators are in newest-to-oldest order, we can stop at the first match
}
}
@ -275,7 +275,7 @@ func (h *HierarchicalIterator) findNextUniqueKey(prevKey []byte) bool {
// If a newer iterator has the same key, use its value
if bytes.Equal(iter.Key(), bestKey) {
bestValue = iter.Value()
break // Since iterators are in newest-to-oldest order, we can stop at the first match
break // Since iterators are in newest-to-oldest order, we can stop at the first match
}
}

View File

@ -62,7 +62,7 @@ type EngineStats struct {
TxAborted atomic.Uint64
// Recovery stats
WALFilesRecovered atomic.Uint64
WALFilesRecovered atomic.Uint64
WALEntriesRecovered atomic.Uint64
WALCorruptedEntries atomic.Uint64
WALRecoveryDuration atomic.Int64 // nanoseconds

View File

@ -226,7 +226,7 @@ func (pi *prefixIterator) Next() bool {
// Check if current key has the prefix
key := pi.iter.Key()
if len(key) >= len(pi.prefix) &&
equalByteSlice(key[:len(pi.prefix)], pi.prefix) {
equalByteSlice(key[:len(pi.prefix)], pi.prefix) {
return true
}
}

View File

@ -9,8 +9,8 @@ import (
"sync"
"time"
"github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
@ -19,13 +19,13 @@ import (
// GRPCClient implements the transport.Client interface for gRPC
type GRPCClient struct {
endpoint string
options transport.TransportOptions
conn *grpc.ClientConn
client pb.KevoServiceClient
status transport.TransportStatus
statusMu sync.RWMutex
metrics transport.MetricsCollector
endpoint string
options transport.TransportOptions
conn *grpc.ClientConn
client pb.KevoServiceClient
status transport.TransportStatus
statusMu sync.RWMutex
metrics transport.MetricsCollector
}
// NewGRPCClient creates a new gRPC client

View File

@ -7,8 +7,8 @@ import (
"sync"
"time"
"github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

View File

@ -7,8 +7,8 @@ import (
"sync"
"time"
"github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
@ -16,13 +16,13 @@ import (
// GRPCServer implements the transport.Server interface for gRPC
type GRPCServer struct {
address string
tlsConfig *tls.Config
server *grpc.Server
address string
tlsConfig *tls.Config
server *grpc.Server
requestHandler transport.RequestHandler
started bool
mu sync.Mutex
metrics *transport.ExtendedMetricsCollector
started bool
mu sync.Mutex
metrics *transport.ExtendedMetricsCollector
}
// NewGRPCServer creates a new gRPC server

View File

@ -5,8 +5,8 @@ import (
"sync"
"time"
"github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport"
"google.golang.org/grpc"
)

View File

@ -6,21 +6,21 @@ import (
// Standard request/response type constants
const (
TypeGet = "get"
TypePut = "put"
TypeDelete = "delete"
TypeBatchWrite = "batch_write"
TypeScan = "scan"
TypeBeginTx = "begin_tx"
TypeCommitTx = "commit_tx"
TypeRollbackTx = "rollback_tx"
TypeTxGet = "tx_get"
TypeTxPut = "tx_put"
TypeTxDelete = "tx_delete"
TypeTxScan = "tx_scan"
TypeGetStats = "get_stats"
TypeCompact = "compact"
TypeError = "error"
TypeGet = "get"
TypePut = "put"
TypeDelete = "delete"
TypeBatchWrite = "batch_write"
TypeScan = "scan"
TypeBeginTx = "begin_tx"
TypeCommitTx = "commit_tx"
TypeRollbackTx = "rollback_tx"
TypeTxGet = "tx_get"
TypeTxPut = "tx_put"
TypeTxDelete = "tx_delete"
TypeTxScan = "tx_scan"
TypeGetStats = "get_stats"
TypeCompact = "compact"
TypeError = "error"
)
// Common errors

View File

@ -82,7 +82,7 @@ func (c *BasicMetricsCollector) RecordRequest(requestType string, startTime time
if exists {
// Update running average - the common case for better branch prediction
// new_avg = (old_avg * count + new_value) / (count + 1)
totalDuration := currentAvg*time.Duration(currentCount) + latency
totalDuration := currentAvg * time.Duration(currentCount) + latency
newCount := currentCount + 1
c.avgLatencyByType[requestType] = totalDuration / time.Duration(newCount)
c.requestCountByType[requestType] = newCount

View File

@ -9,9 +9,9 @@ import (
// Metrics struct extensions for server metrics
type ServerMetrics struct {
Metrics
ServerStarted uint64
ServerErrored uint64
ServerStopped uint64
ServerStarted uint64
ServerErrored uint64
ServerStopped uint64
}
// Connection represents a connection to a remote endpoint
@ -31,11 +31,11 @@ type Connection interface {
// ConnectionStatus represents the status of a connection
type ConnectionStatus struct {
Connected bool
LastActivity time.Time
ErrorCount int
RequestCount int
LatencyAvg time.Duration
Connected bool
LastActivity time.Time
ErrorCount int
RequestCount int
LatencyAvg time.Duration
}
// TransportManager is an interface for managing transport layer operations

View File

@ -7,7 +7,7 @@ import (
// registry implements the Registry interface
type registry struct {
mu sync.RWMutex
mu sync.RWMutex
clientFactories map[string]ClientFactory
serverFactories map[string]ServerFactory
}