Compare commits

..

3 Commits

Author SHA1 Message Date
a0a1c0512f
chore: formatting
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m50s
2025-04-22 14:09:54 -06:00
e7974e008d
feat: enhance wal recover statistics 2025-04-22 14:09:45 -06:00
dependabot[bot]
3b3d1c27a4 chore(deps): bump golang.org/x/net from 0.35.0 to 0.38.0
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.35.0 to 0.38.0.
- [Commits](https://github.com/golang/net/compare/v0.35.0...v0.38.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-version: 0.38.0
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-22 11:10:02 -06:00
35 changed files with 449 additions and 449 deletions

View File

@ -81,14 +81,14 @@ Commands (interactive mode only):
// Config holds the application configuration // Config holds the application configuration
type Config struct { type Config struct {
ServerMode bool ServerMode bool
DaemonMode bool DaemonMode bool
ListenAddr string ListenAddr string
DBPath string DBPath string
TLSEnabled bool TLSEnabled bool
TLSCertFile string TLSCertFile string
TLSKeyFile string TLSKeyFile string
TLSCAFile string TLSCAFile string
} }
func main() { func main() {
@ -168,14 +168,14 @@ func parseFlags() Config {
} }
return Config{ return Config{
ServerMode: *serverMode, ServerMode: *serverMode,
DaemonMode: *daemonMode, DaemonMode: *daemonMode,
ListenAddr: *listenAddr, ListenAddr: *listenAddr,
DBPath: dbPath, DBPath: dbPath,
TLSEnabled: *tlsEnabled, TLSEnabled: *tlsEnabled,
TLSCertFile: *tlsCertFile, TLSCertFile: *tlsCertFile,
TLSKeyFile: *tlsKeyFile, TLSKeyFile: *tlsKeyFile,
TLSCAFile: *tlsCAFile, TLSCAFile: *tlsCAFile,
} }
} }

View File

@ -154,12 +154,12 @@ func (tr *TransactionRegistry) GracefulShutdown(ctx context.Context) error {
// Server represents the Kevo server // Server represents the Kevo server
type Server struct { type Server struct {
eng *engine.Engine eng *engine.Engine
txRegistry *TransactionRegistry txRegistry *TransactionRegistry
listener net.Listener listener net.Listener
grpcServer *grpc.Server grpcServer *grpc.Server
kevoService *grpcservice.KevoServiceServer kevoService *grpcservice.KevoServiceServer
config Config config Config
} }
// NewServer creates a new server instance // NewServer creates a new server instance

6
go.mod
View File

@ -10,8 +10,8 @@ require (
) )
require ( require (
golang.org/x/net v0.35.0 // indirect golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.30.0 // indirect golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.22.0 // indirect golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
) )

12
go.sum
View File

@ -28,13 +28,13 @@ go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4= google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM= google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=

View File

@ -23,11 +23,11 @@ const (
// ClientOptions configures a Kevo client // ClientOptions configures a Kevo client
type ClientOptions struct { type ClientOptions struct {
// Connection options // Connection options
Endpoint string // Server address Endpoint string // Server address
ConnectTimeout time.Duration // Timeout for connection attempts ConnectTimeout time.Duration // Timeout for connection attempts
RequestTimeout time.Duration // Default timeout for requests RequestTimeout time.Duration // Default timeout for requests
TransportType string // Transport type (e.g. "grpc") TransportType string // Transport type (e.g. "grpc")
PoolSize int // Connection pool size PoolSize int // Connection pool size
// Security options // Security options
TLSEnabled bool // Enable TLS TLSEnabled bool // Enable TLS
@ -50,19 +50,19 @@ type ClientOptions struct {
// DefaultClientOptions returns sensible default client options // DefaultClientOptions returns sensible default client options
func DefaultClientOptions() ClientOptions { func DefaultClientOptions() ClientOptions {
return ClientOptions{ return ClientOptions{
Endpoint: "localhost:50051", Endpoint: "localhost:50051",
ConnectTimeout: time.Second * 5, ConnectTimeout: time.Second * 5,
RequestTimeout: time.Second * 10, RequestTimeout: time.Second * 10,
TransportType: "grpc", TransportType: "grpc",
PoolSize: 5, PoolSize: 5,
TLSEnabled: false, TLSEnabled: false,
MaxRetries: 3, MaxRetries: 3,
InitialBackoff: time.Millisecond * 100, InitialBackoff: time.Millisecond * 100,
MaxBackoff: time.Second * 2, MaxBackoff: time.Second * 2,
BackoffFactor: 1.5, BackoffFactor: 1.5,
RetryJitter: 0.2, RetryJitter: 0.2,
Compression: CompressionNone, Compression: CompressionNone,
MaxMessageSize: 16 * 1024 * 1024, // 16MB MaxMessageSize: 16 * 1024 * 1024, // 16MB
} }
} }

View File

@ -12,11 +12,11 @@ import (
// Transaction represents a database transaction // Transaction represents a database transaction
type Transaction struct { type Transaction struct {
client *Client client *Client
id string id string
readOnly bool readOnly bool
closed bool closed bool
mu sync.RWMutex mu sync.RWMutex
} }
// ErrTransactionClosed is returned when attempting to use a closed transaction // ErrTransactionClosed is returned when attempting to use a closed transaction

View File

@ -134,7 +134,7 @@ func (h *HierarchicalIterator) Seek(target []byte) bool {
// If a newer iterator has the same key, use its value // If a newer iterator has the same key, use its value
if bytes.Equal(iter.Key(), bestKey) { if bytes.Equal(iter.Key(), bestKey) {
bestValue = iter.Value() bestValue = iter.Value()
break // Since iterators are in newest-to-oldest order, we can stop at the first match break // Since iterators are in newest-to-oldest order, we can stop at the first match
} }
} }
@ -275,7 +275,7 @@ func (h *HierarchicalIterator) findNextUniqueKey(prevKey []byte) bool {
// If a newer iterator has the same key, use its value // If a newer iterator has the same key, use its value
if bytes.Equal(iter.Key(), bestKey) { if bytes.Equal(iter.Key(), bestKey) {
bestValue = iter.Value() bestValue = iter.Value()
break // Since iterators are in newest-to-oldest order, we can stop at the first match break // Since iterators are in newest-to-oldest order, we can stop at the first match
} }
} }

View File

@ -62,7 +62,7 @@ type EngineStats struct {
TxAborted atomic.Uint64 TxAborted atomic.Uint64
// Recovery stats // Recovery stats
WALFilesRecovered atomic.Uint64 WALFilesRecovered atomic.Uint64
WALEntriesRecovered atomic.Uint64 WALEntriesRecovered atomic.Uint64
WALCorruptedEntries atomic.Uint64 WALCorruptedEntries atomic.Uint64
WALRecoveryDuration atomic.Int64 // nanoseconds WALRecoveryDuration atomic.Int64 // nanoseconds

View File

@ -226,7 +226,7 @@ func (pi *prefixIterator) Next() bool {
// Check if current key has the prefix // Check if current key has the prefix
key := pi.iter.Key() key := pi.iter.Key()
if len(key) >= len(pi.prefix) && if len(key) >= len(pi.prefix) &&
equalByteSlice(key[:len(pi.prefix)], pi.prefix) { equalByteSlice(key[:len(pi.prefix)], pi.prefix) {
return true return true
} }
} }

View File

@ -9,8 +9,8 @@ import (
"sync" "sync"
"time" "time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport" "github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
@ -19,13 +19,13 @@ import (
// GRPCClient implements the transport.Client interface for gRPC // GRPCClient implements the transport.Client interface for gRPC
type GRPCClient struct { type GRPCClient struct {
endpoint string endpoint string
options transport.TransportOptions options transport.TransportOptions
conn *grpc.ClientConn conn *grpc.ClientConn
client pb.KevoServiceClient client pb.KevoServiceClient
status transport.TransportStatus status transport.TransportStatus
statusMu sync.RWMutex statusMu sync.RWMutex
metrics transport.MetricsCollector metrics transport.MetricsCollector
} }
// NewGRPCClient creates a new gRPC client // NewGRPCClient creates a new gRPC client

View File

@ -7,8 +7,8 @@ import (
"sync" "sync"
"time" "time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport" "github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"

View File

@ -7,8 +7,8 @@ import (
"sync" "sync"
"time" "time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport" "github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
@ -16,13 +16,13 @@ import (
// GRPCServer implements the transport.Server interface for gRPC // GRPCServer implements the transport.Server interface for gRPC
type GRPCServer struct { type GRPCServer struct {
address string address string
tlsConfig *tls.Config tlsConfig *tls.Config
server *grpc.Server server *grpc.Server
requestHandler transport.RequestHandler requestHandler transport.RequestHandler
started bool started bool
mu sync.Mutex mu sync.Mutex
metrics *transport.ExtendedMetricsCollector metrics *transport.ExtendedMetricsCollector
} }
// NewGRPCServer creates a new gRPC server // NewGRPCServer creates a new gRPC server

View File

@ -5,8 +5,8 @@ import (
"sync" "sync"
"time" "time"
pb "github.com/KevoDB/kevo/proto/kevo"
"github.com/KevoDB/kevo/pkg/transport" "github.com/KevoDB/kevo/pkg/transport"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc" "google.golang.org/grpc"
) )

View File

@ -6,21 +6,21 @@ import (
// Standard request/response type constants // Standard request/response type constants
const ( const (
TypeGet = "get" TypeGet = "get"
TypePut = "put" TypePut = "put"
TypeDelete = "delete" TypeDelete = "delete"
TypeBatchWrite = "batch_write" TypeBatchWrite = "batch_write"
TypeScan = "scan" TypeScan = "scan"
TypeBeginTx = "begin_tx" TypeBeginTx = "begin_tx"
TypeCommitTx = "commit_tx" TypeCommitTx = "commit_tx"
TypeRollbackTx = "rollback_tx" TypeRollbackTx = "rollback_tx"
TypeTxGet = "tx_get" TypeTxGet = "tx_get"
TypeTxPut = "tx_put" TypeTxPut = "tx_put"
TypeTxDelete = "tx_delete" TypeTxDelete = "tx_delete"
TypeTxScan = "tx_scan" TypeTxScan = "tx_scan"
TypeGetStats = "get_stats" TypeGetStats = "get_stats"
TypeCompact = "compact" TypeCompact = "compact"
TypeError = "error" TypeError = "error"
) )
// Common errors // Common errors

View File

@ -82,7 +82,7 @@ func (c *BasicMetricsCollector) RecordRequest(requestType string, startTime time
if exists { if exists {
// Update running average - the common case for better branch prediction // Update running average - the common case for better branch prediction
// new_avg = (old_avg * count + new_value) / (count + 1) // new_avg = (old_avg * count + new_value) / (count + 1)
totalDuration := currentAvg * time.Duration(currentCount) + latency totalDuration := currentAvg*time.Duration(currentCount) + latency
newCount := currentCount + 1 newCount := currentCount + 1
c.avgLatencyByType[requestType] = totalDuration / time.Duration(newCount) c.avgLatencyByType[requestType] = totalDuration / time.Duration(newCount)
c.requestCountByType[requestType] = newCount c.requestCountByType[requestType] = newCount

View File

@ -9,9 +9,9 @@ import (
// Metrics struct extensions for server metrics // Metrics struct extensions for server metrics
type ServerMetrics struct { type ServerMetrics struct {
Metrics Metrics
ServerStarted uint64 ServerStarted uint64
ServerErrored uint64 ServerErrored uint64
ServerStopped uint64 ServerStopped uint64
} }
// Connection represents a connection to a remote endpoint // Connection represents a connection to a remote endpoint
@ -31,11 +31,11 @@ type Connection interface {
// ConnectionStatus represents the status of a connection // ConnectionStatus represents the status of a connection
type ConnectionStatus struct { type ConnectionStatus struct {
Connected bool Connected bool
LastActivity time.Time LastActivity time.Time
ErrorCount int ErrorCount int
RequestCount int RequestCount int
LatencyAvg time.Duration LatencyAvg time.Duration
} }
// TransportManager is an interface for managing transport layer operations // TransportManager is an interface for managing transport layer operations

View File

@ -7,7 +7,7 @@ import (
// registry implements the Registry interface // registry implements the Registry interface
type registry struct { type registry struct {
mu sync.RWMutex mu sync.RWMutex
clientFactories map[string]ClientFactory clientFactories map[string]ClientFactory
serverFactories map[string]ServerFactory serverFactories map[string]ServerFactory
} }