fix: fixes an issue where the engine type would not be supported for transactions sometimes
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Failing after 15m6s

This commit is contained in:
Jeremy Tregunna 2025-05-17 12:40:45 -06:00
parent 7f825cae46
commit 1ff166fe77
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
9 changed files with 68 additions and 742 deletions

View File

@ -105,7 +105,7 @@ func main() {
config := parseFlags()
// Open database if path provided
var eng *engine.Engine
var eng *engine.EngineFacade
var err error
if config.DBPath != "" {
@ -210,7 +210,7 @@ func parseFlags() Config {
}
// runServer initializes and runs the Kevo server
func runServer(eng *engine.Engine, config Config) {
func runServer(eng *engine.EngineFacade, config Config) {
// Set up daemon mode if requested
if config.DaemonMode {
setupDaemonMode()
@ -274,7 +274,7 @@ func setupDaemonMode() {
}
// setupGracefulShutdown configures graceful shutdown on signals
func setupGracefulShutdown(server *Server, eng *engine.Engine) {
func setupGracefulShutdown(server *Server, eng *engine.EngineFacade) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
@ -299,7 +299,7 @@ func setupGracefulShutdown(server *Server, eng *engine.Engine) {
}
// runInteractive starts the interactive CLI mode
func runInteractive(eng *engine.Engine, dbPath string) {
func runInteractive(eng *engine.EngineFacade, dbPath string) {
fmt.Println("Kevo (kevo) version 1.0.2")
fmt.Println("Enter .help for usage hints.")

View File

@ -7,10 +7,10 @@ import (
"net"
"time"
"github.com/KevoDB/kevo/pkg/engine/interfaces"
"github.com/KevoDB/kevo/pkg/engine/transaction"
"github.com/KevoDB/kevo/pkg/engine"
grpcservice "github.com/KevoDB/kevo/pkg/grpc/service"
"github.com/KevoDB/kevo/pkg/replication"
"github.com/KevoDB/kevo/pkg/transaction"
pb "github.com/KevoDB/kevo/proto/kevo"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -19,8 +19,8 @@ import (
// Server represents the Kevo server
type Server struct {
eng interfaces.Engine
txRegistry interfaces.TxRegistry
eng *engine.EngineFacade
txRegistry transaction.Registry
listener net.Listener
grpcServer *grpc.Server
kevoService *grpcservice.KevoServiceServer
@ -29,10 +29,14 @@ type Server struct {
}
// NewServer creates a new server instance
func NewServer(eng interfaces.Engine, config Config) *Server {
func NewServer(eng *engine.EngineFacade, config Config) *Server {
// Create a transaction registry directly from the transaction package
// The transaction registry can work with any type that implements BeginTransaction
txRegistry := transaction.NewRegistry()
return &Server{
eng: eng,
txRegistry: transaction.NewRegistry(),
txRegistry: txRegistry,
config: config,
}
}

View File

@ -12,8 +12,8 @@ import (
"github.com/KevoDB/kevo/pkg/engine/compaction"
"github.com/KevoDB/kevo/pkg/engine/interfaces"
"github.com/KevoDB/kevo/pkg/engine/storage"
"github.com/KevoDB/kevo/pkg/engine/transaction"
"github.com/KevoDB/kevo/pkg/stats"
"github.com/KevoDB/kevo/pkg/transaction"
"github.com/KevoDB/kevo/pkg/wal"
)
@ -30,7 +30,7 @@ type EngineFacade struct {
// Core components
storage interfaces.StorageManager
txManager interfaces.TransactionManager
txManager *transaction.Manager
compaction interfaces.CompactionManager
stats stats.Collector
@ -346,23 +346,6 @@ func (e *EngineFacade) BeginTransaction(readOnly bool) (interfaces.Transaction,
// Track the operation start
e.stats.TrackOperation(stats.OpTxBegin)
// Check if we have a registered transaction creator for legacy compatibility
creator := GetRegisteredTransactionCreator()
if creator != nil {
// For backward compatibility with existing code that might be using the legacy transaction system
// Try to use the registered creator
legacyTx, err := CreateTransactionWithCreator(e, readOnly)
if err == nil {
// Track that we successfully created a transaction
e.stats.TrackOperation(stats.OpTxBegin)
// We need to adapt between the legacy and new interfaces
// Both have the same methods, so we can use type assertion safely if we're
// sure the LegacyTransaction also implements interfaces.Transaction
return legacyTx.(interfaces.Transaction), nil
}
// If legacy creator fails, fall back to the new implementation
}
// Track operation latency
start := time.Now()
tx, err := e.txManager.BeginTransaction(readOnly)
@ -561,7 +544,7 @@ func (e *EngineFacade) GetStats() map[string]interface{} {
}
// GetTransactionManager returns the transaction manager
func (e *EngineFacade) GetTransactionManager() interfaces.TransactionManager {
func (e *EngineFacade) GetTransactionManager() transaction.TransactionManager {
return e.txManager
}
@ -593,6 +576,11 @@ func (e *EngineFacade) GetCompactionStats() (map[string]interface{}, error) {
}, nil
}
// IsReadOnly returns true if the engine is in read-only mode
func (e *EngineFacade) IsReadOnly() bool {
return e.readOnly.Load()
}
// Close closes the storage engine
func (e *EngineFacade) Close() error {
// First set the closed flag to prevent new operations

View File

@ -36,7 +36,4 @@ func (e *EngineFacade) SetReadOnly(readOnly bool) {
log.Info("Engine read-only mode set to: %v", readOnly)
}
// IsReadOnly returns whether the engine is in read-only mode
func (e *EngineFacade) IsReadOnly() bool {
return e.readOnly.Load()
}
// IsReadOnly moved to facade.go

View File

@ -1,155 +0,0 @@
package transaction
import (
"context"
"sync"
"github.com/KevoDB/kevo/pkg/common/iterator"
"github.com/KevoDB/kevo/pkg/engine/interfaces"
"github.com/KevoDB/kevo/pkg/stats"
tx "github.com/KevoDB/kevo/pkg/transaction"
"github.com/KevoDB/kevo/pkg/wal"
)
// Forward engine transaction functions to the new implementation
// This is a transitional approach until all call sites are updated
// storageAdapter adapts the engine storage interface to the new transaction package
type storageAdapter struct {
storage interfaces.StorageManager
}
// Implement the transaction.StorageBackend interface
func (a *storageAdapter) Get(key []byte) ([]byte, error) {
return a.storage.Get(key)
}
func (a *storageAdapter) ApplyBatch(entries []*wal.Entry) error {
return a.storage.ApplyBatch(entries)
}
func (a *storageAdapter) GetIterator() (iterator.Iterator, error) {
return a.storage.GetIterator()
}
func (a *storageAdapter) GetRangeIterator(startKey, endKey []byte) (iterator.Iterator, error) {
return a.storage.GetRangeIterator(startKey, endKey)
}
// Create a wrapper for the transaction manager interface
type managerWrapper struct {
inner *tx.Manager
}
// Implement interfaces.TransactionManager methods
func (w *managerWrapper) BeginTransaction(readOnly bool) (interfaces.Transaction, error) {
transaction, err := w.inner.BeginTransaction(readOnly)
if err != nil {
return nil, err
}
// Since our transaction implements the same interface, wrap it
return &transactionWrapper{transaction}, nil
}
func (w *managerWrapper) GetRWLock() *sync.RWMutex {
return w.inner.GetRWLock()
}
func (w *managerWrapper) IncrementTxCompleted() {
w.inner.IncrementTxCompleted()
}
func (w *managerWrapper) IncrementTxAborted() {
w.inner.IncrementTxAborted()
}
func (w *managerWrapper) GetTransactionStats() map[string]interface{} {
return w.inner.GetTransactionStats()
}
// Create a wrapper for the transaction interface
type transactionWrapper struct {
inner tx.Transaction
}
// Implement interfaces.Transaction methods
func (w *transactionWrapper) Get(key []byte) ([]byte, error) {
return w.inner.Get(key)
}
func (w *transactionWrapper) Put(key, value []byte) error {
return w.inner.Put(key, value)
}
func (w *transactionWrapper) Delete(key []byte) error {
return w.inner.Delete(key)
}
func (w *transactionWrapper) NewIterator() iterator.Iterator {
return w.inner.NewIterator()
}
func (w *transactionWrapper) NewRangeIterator(startKey, endKey []byte) iterator.Iterator {
return w.inner.NewRangeIterator(startKey, endKey)
}
func (w *transactionWrapper) Commit() error {
return w.inner.Commit()
}
func (w *transactionWrapper) Rollback() error {
return w.inner.Rollback()
}
func (w *transactionWrapper) IsReadOnly() bool {
return w.inner.IsReadOnly()
}
// Create a wrapper for the registry interface
type registryWrapper struct {
inner tx.Registry
}
// Implement interfaces.TxRegistry methods
func (w *registryWrapper) Begin(ctx context.Context, eng interfaces.Engine, readOnly bool) (string, error) {
return w.inner.Begin(ctx, eng, readOnly)
}
func (w *registryWrapper) Get(txID string) (interfaces.Transaction, bool) {
transaction, found := w.inner.Get(txID)
if !found {
return nil, false
}
return &transactionWrapper{transaction}, true
}
func (w *registryWrapper) Remove(txID string) {
w.inner.Remove(txID)
}
func (w *registryWrapper) CleanupConnection(connectionID string) {
w.inner.CleanupConnection(connectionID)
}
func (w *registryWrapper) GracefulShutdown(ctx context.Context) error {
return w.inner.GracefulShutdown(ctx)
}
// NewManager forwards to the new implementation while maintaining the same signature
func NewManager(storage interfaces.StorageManager, statsCollector stats.Collector) interfaces.TransactionManager {
// Create a storage adapter that works with our new transaction implementation
adapter := &storageAdapter{storage: storage}
// Create the new transaction manager and wrap it
return &managerWrapper{
inner: tx.NewManager(adapter, statsCollector),
}
}
// NewRegistry forwards to the new implementation while maintaining the same signature
func NewRegistry() interfaces.TxRegistry {
// Create the new registry and wrap it
return &registryWrapper{
inner: tx.NewRegistry(),
}
}

View File

@ -1,223 +0,0 @@
package transaction
import (
"testing"
"time"
"github.com/KevoDB/kevo/pkg/common/iterator"
"github.com/KevoDB/kevo/pkg/stats"
"github.com/KevoDB/kevo/pkg/wal"
)
// MockStorage implements the StorageManager interface for testing
type MockStorage struct{}
func (m *MockStorage) Get(key []byte) ([]byte, error) {
return []byte("value"), nil
}
func (m *MockStorage) Put(key, value []byte) error {
return nil
}
func (m *MockStorage) Delete(key []byte) error {
return nil
}
func (m *MockStorage) IsDeleted(key []byte) (bool, error) {
return false, nil
}
func (m *MockStorage) GetIterator() (iterator.Iterator, error) {
return &MockIterator{}, nil
}
func (m *MockStorage) GetRangeIterator(startKey, endKey []byte) (iterator.Iterator, error) {
return &MockIterator{}, nil
}
func (m *MockStorage) ApplyBatch(entries []*wal.Entry) error {
return nil
}
func (m *MockStorage) FlushMemTables() error {
return nil
}
func (m *MockStorage) Close() error {
return nil
}
func (m *MockStorage) GetMemTableSize() uint64 {
return 0
}
func (m *MockStorage) IsFlushNeeded() bool {
return false
}
func (m *MockStorage) GetSSTables() []string {
return []string{}
}
func (m *MockStorage) ReloadSSTables() error {
return nil
}
func (m *MockStorage) RotateWAL() error {
return nil
}
func (m *MockStorage) GetStorageStats() map[string]interface{} {
return map[string]interface{}{}
}
// MockIterator is a simple iterator for testing
type MockIterator struct{}
func (it *MockIterator) SeekToFirst() {}
func (it *MockIterator) SeekToLast() {}
func (it *MockIterator) Seek(key []byte) bool {
return false
}
func (it *MockIterator) Next() bool {
return false
}
func (it *MockIterator) Key() []byte {
return nil
}
func (it *MockIterator) Value() []byte {
return nil
}
func (it *MockIterator) Valid() bool {
return false
}
func (it *MockIterator) IsTombstone() bool {
return false
}
// MockStatsCollector implements the stats.Collector interface for testing
type MockStatsCollector struct{}
func (m *MockStatsCollector) GetStats() map[string]interface{} {
return nil
}
func (m *MockStatsCollector) GetStatsFiltered(prefix string) map[string]interface{} {
return nil
}
func (m *MockStatsCollector) TrackOperation(op stats.OperationType) {}
func (m *MockStatsCollector) TrackOperationWithLatency(op stats.OperationType, latencyNs uint64) {}
func (m *MockStatsCollector) TrackError(errorType string) {}
func (m *MockStatsCollector) TrackBytes(isWrite bool, bytes uint64) {}
func (m *MockStatsCollector) TrackMemTableSize(size uint64) {}
func (m *MockStatsCollector) TrackFlush() {}
func (m *MockStatsCollector) TrackCompaction() {}
func (m *MockStatsCollector) StartRecovery() time.Time {
return time.Now()
}
func (m *MockStatsCollector) FinishRecovery(startTime time.Time, filesRecovered, entriesRecovered, corruptedEntries uint64) {
}
func TestForwardingLayer(t *testing.T) {
// Create mocks
storage := &MockStorage{}
statsCollector := &MockStatsCollector{}
// Create the manager through the forwarding layer
manager := NewManager(storage, statsCollector)
// Verify the manager was created
if manager == nil {
t.Fatal("Expected manager to be created, got nil")
}
// Get the RWLock
rwLock := manager.GetRWLock()
if rwLock == nil {
t.Fatal("Expected non-nil RWLock")
}
// Test transaction creation
tx, err := manager.BeginTransaction(true)
if err != nil {
t.Fatalf("Unexpected error beginning transaction: %v", err)
}
// Verify it's a read-only transaction
if !tx.IsReadOnly() {
t.Error("Expected read-only transaction")
}
// Test some operations
_, err = tx.Get([]byte("key"))
if err != nil {
t.Errorf("Unexpected error in Get: %v", err)
}
// Commit the transaction
err = tx.Commit()
if err != nil {
t.Errorf("Unexpected error committing transaction: %v", err)
}
// Create a read-write transaction
tx, err = manager.BeginTransaction(false)
if err != nil {
t.Fatalf("Unexpected error beginning transaction: %v", err)
}
// Verify it's a read-write transaction
if tx.IsReadOnly() {
t.Error("Expected read-write transaction")
}
// Test put operation
err = tx.Put([]byte("key"), []byte("value"))
if err != nil {
t.Errorf("Unexpected error in Put: %v", err)
}
// Test delete operation
err = tx.Delete([]byte("key"))
if err != nil {
t.Errorf("Unexpected error in Delete: %v", err)
}
// Test iterator
it := tx.NewIterator()
if it == nil {
t.Error("Expected non-nil iterator")
}
// Test range iterator
rangeIt := tx.NewRangeIterator([]byte("a"), []byte("z"))
if rangeIt == nil {
t.Error("Expected non-nil range iterator")
}
// Rollback the transaction
err = tx.Rollback()
if err != nil {
t.Errorf("Unexpected error rolling back transaction: %v", err)
}
// Verify IncrementTxCompleted and IncrementTxAborted are working
manager.IncrementTxCompleted()
manager.IncrementTxAborted()
// Test the registry creation
registry := NewRegistry()
if registry == nil {
t.Fatal("Expected registry to be created, got nil")
}
}

View File

@ -1,310 +0,0 @@
package transaction
import (
"testing"
"github.com/KevoDB/kevo/pkg/common/iterator"
"github.com/KevoDB/kevo/pkg/engine/interfaces"
"github.com/KevoDB/kevo/pkg/stats"
"github.com/KevoDB/kevo/pkg/wal"
)
// MockStorageManager is a simple mock for the interfaces.StorageManager
type MockStorageManager struct {
data map[string][]byte
}
func NewMockStorageManager() *MockStorageManager {
return &MockStorageManager{
data: make(map[string][]byte),
}
}
func (m *MockStorageManager) Put(key, value []byte) error {
m.data[string(key)] = value
return nil
}
func (m *MockStorageManager) Get(key []byte) ([]byte, error) {
value, ok := m.data[string(key)]
if !ok {
return nil, interfaces.ErrKeyNotFound
}
return value, nil
}
func (m *MockStorageManager) Delete(key []byte) error {
delete(m.data, string(key))
return nil
}
func (m *MockStorageManager) IsDeleted(key []byte) (bool, error) {
_, exists := m.data[string(key)]
return !exists, nil
}
func (m *MockStorageManager) FlushMemTables() error {
return nil
}
func (m *MockStorageManager) GetIterator() (iterator.Iterator, error) {
return nil, nil // Not needed for these tests
}
func (m *MockStorageManager) GetRangeIterator(startKey, endKey []byte) (iterator.Iterator, error) {
return nil, nil // Not needed for these tests
}
func (m *MockStorageManager) ApplyBatch(entries []*wal.Entry) error {
// Process each entry in the batch
for _, entry := range entries {
switch entry.Type {
case wal.OpTypePut:
m.data[string(entry.Key)] = entry.Value
case wal.OpTypeDelete:
delete(m.data, string(entry.Key))
}
}
return nil
}
func (m *MockStorageManager) GetStorageStats() map[string]interface{} {
return nil // Not needed for these tests
}
func (m *MockStorageManager) Close() error {
return nil
}
// Additional methods required by the StorageManager interface
func (m *MockStorageManager) GetMemTableSize() uint64 {
return 0
}
func (m *MockStorageManager) IsFlushNeeded() bool {
return false
}
func (m *MockStorageManager) GetSSTables() []string {
return []string{}
}
func (m *MockStorageManager) ReloadSSTables() error {
return nil
}
func (m *MockStorageManager) RotateWAL() error {
return nil
}
func TestTransactionManager_BasicOperations(t *testing.T) {
// Create dependencies
storage := NewMockStorageManager()
collector := stats.NewAtomicCollector()
// Create the transaction manager
manager := NewManager(storage, collector)
// Begin a new read-write transaction
tx, err := manager.BeginTransaction(false)
if err != nil {
t.Fatalf("Failed to begin transaction: %v", err)
}
// Put a key-value pair
err = tx.Put([]byte("test-key"), []byte("test-value"))
if err != nil {
t.Fatalf("Failed to put key in transaction: %v", err)
}
// Verify we can get the value within the transaction
value, err := tx.Get([]byte("test-key"))
if err != nil {
t.Fatalf("Failed to get key from transaction: %v", err)
}
if string(value) != "test-value" {
t.Errorf("Got incorrect value in transaction. Expected: test-value, Got: %s", string(value))
}
// The value should not be in the storage yet (not committed)
_, err = storage.Get([]byte("test-key"))
if err == nil {
t.Errorf("Key should not be in storage before commit")
}
// Commit the transaction
err = tx.Commit()
if err != nil {
t.Fatalf("Failed to commit transaction: %v", err)
}
// Now the value should be in the storage
value, err = storage.Get([]byte("test-key"))
if err != nil {
t.Fatalf("Key not found in storage after commit: %v", err)
}
if string(value) != "test-value" {
t.Errorf("Got incorrect value in storage. Expected: test-value, Got: %s", string(value))
}
// Check transaction metrics
stats := manager.GetTransactionStats()
if count, ok := stats["tx_started"]; !ok || count.(uint64) != 1 {
t.Errorf("Incorrect tx_started count. Got: %v", count)
}
if count, ok := stats["tx_completed"]; !ok || count.(uint64) != 1 {
t.Errorf("Incorrect tx_completed count. Got: %v", count)
}
}
func TestTransactionManager_RollbackAndReadOnly(t *testing.T) {
// Create dependencies
storage := NewMockStorageManager()
collector := stats.NewAtomicCollector()
// Create the transaction manager
manager := NewManager(storage, collector)
// Test rollback
rwTx, err := manager.BeginTransaction(false)
if err != nil {
t.Fatalf("Failed to begin read-write transaction: %v", err)
}
// Make some changes
err = rwTx.Put([]byte("rollback-key"), []byte("rollback-value"))
if err != nil {
t.Fatalf("Failed to put key in transaction: %v", err)
}
// Rollback the transaction
err = rwTx.Rollback()
if err != nil {
t.Fatalf("Failed to rollback transaction: %v", err)
}
// Verify the changes were not applied
_, err = storage.Get([]byte("rollback-key"))
if err == nil {
t.Errorf("Key should not be in storage after rollback")
}
// Test read-only transaction
roTx, err := manager.BeginTransaction(true)
if err != nil {
t.Fatalf("Failed to begin read-only transaction: %v", err)
}
// Try to write in a read-only transaction (should fail)
err = roTx.Put([]byte("readonly-key"), []byte("readonly-value"))
if err == nil {
t.Errorf("Put should fail in a read-only transaction")
}
// Add data to storage directly
storage.Put([]byte("readonly-test"), []byte("readonly-value"))
// Read-only transaction should be able to read
value, err := roTx.Get([]byte("readonly-test"))
if err != nil {
t.Fatalf("Failed to get key in read-only transaction: %v", err)
}
if string(value) != "readonly-value" {
t.Errorf("Got incorrect value in read-only transaction. Expected: readonly-value, Got: %s", string(value))
}
// Commit should work for read-only transaction
err = roTx.Commit()
if err != nil {
t.Fatalf("Failed to commit read-only transaction: %v", err)
}
// Check transaction metrics
stats := manager.GetTransactionStats()
if count, ok := stats["tx_started"]; !ok || count.(uint64) != 2 {
t.Errorf("Incorrect tx_started count. Got: %v", count)
}
if count, ok := stats["tx_completed"]; !ok || count.(uint64) != 1 {
t.Errorf("Incorrect tx_completed count. Got: %v", count)
}
if count, ok := stats["tx_aborted"]; !ok || count.(uint64) != 1 {
t.Errorf("Incorrect tx_aborted count. Got: %v", count)
}
}
func TestTransactionManager_Isolation(t *testing.T) {
// Create dependencies
storage := NewMockStorageManager()
collector := stats.NewAtomicCollector()
// Create the transaction manager
manager := NewManager(storage, collector)
// Add initial data
storage.Put([]byte("isolation-key"), []byte("initial-value"))
// In a real scenario with proper locking, we'd test isolation across transactions
// But for unit testing, we'll simplify to avoid deadlocks
// Test part 1: uncommitted changes aren't visible to new transactions
{
// Begin a transaction and modify data
tx1, err := manager.BeginTransaction(false)
if err != nil {
t.Fatalf("Failed to begin transaction: %v", err)
}
// Modify the key in the transaction
err = tx1.Put([]byte("isolation-key"), []byte("tx1-value"))
if err != nil {
t.Fatalf("Failed to put key in transaction: %v", err)
}
// Ensure the change is in the transaction buffer but not committed yet
txValue, err := tx1.Get([]byte("isolation-key"))
if err != nil || string(txValue) != "tx1-value" {
t.Fatalf("Transaction doesn't see its own changes. Got: %s, err: %v", txValue, err)
}
// Storage should still have the original value
storageValue, err := storage.Get([]byte("isolation-key"))
if err != nil || string(storageValue) != "initial-value" {
t.Fatalf("Storage changed before commit. Got: %s, err: %v", storageValue, err)
}
// Commit the transaction
err = tx1.Commit()
if err != nil {
t.Fatalf("Failed to commit transaction: %v", err)
}
// Now storage should have the updated value
storageValue, err = storage.Get([]byte("isolation-key"))
if err != nil || string(storageValue) != "tx1-value" {
t.Fatalf("Storage not updated after commit. Got: %s, err: %v", storageValue, err)
}
}
// Test part 2: reading committed data
{
// A new transaction should see the updated value
tx2, err := manager.BeginTransaction(true)
if err != nil {
t.Fatalf("Failed to begin read-only transaction: %v", err)
}
value, err := tx2.Get([]byte("isolation-key"))
if err != nil {
t.Fatalf("Failed to get key in transaction: %v", err)
}
if string(value) != "tx1-value" {
t.Errorf("Transaction doesn't see committed changes. Expected: tx1-value, Got: %s", string(value))
}
// Commit the read-only transaction
err = tx2.Commit()
if err != nil {
t.Fatalf("Failed to commit read-only transaction: %v", err)
}
}
}

View File

@ -10,23 +10,18 @@ import (
"github.com/KevoDB/kevo/pkg/common/log"
"github.com/KevoDB/kevo/pkg/engine/interfaces"
"github.com/KevoDB/kevo/pkg/replication"
"github.com/KevoDB/kevo/pkg/transaction"
pb "github.com/KevoDB/kevo/proto/kevo"
)
// TxRegistry is the interface we need for the transaction registry
type TxRegistry interface {
Begin(ctx context.Context, eng interfaces.Engine, readOnly bool) (string, error)
Get(txID string) (interfaces.Transaction, bool)
Remove(txID string)
CleanupConnection(connectionID string)
}
// Using the transaction registry directly
// KevoServiceServer implements the gRPC KevoService interface
type KevoServiceServer struct {
pb.UnimplementedKevoServiceServer
engine interfaces.Engine
txRegistry TxRegistry
activeTx sync.Map // map[string]interfaces.Transaction
txRegistry transaction.Registry
activeTx sync.Map // map[string]transaction.Transaction
txMu sync.Mutex
compactionSem chan struct{} // Semaphore for limiting concurrent compactions
maxKeySize int // Maximum allowed key size
@ -58,7 +53,7 @@ type ReplicationInfoProvider interface {
type ReplicaInfo = replication.ReplicationNodeInfo
// NewKevoServiceServer creates a new KevoServiceServer
func NewKevoServiceServer(engine interfaces.Engine, txRegistry TxRegistry, replicationManager ReplicationInfoProvider) *KevoServiceServer {
func NewKevoServiceServer(engine interfaces.Engine, txRegistry transaction.Registry, replicationManager ReplicationInfoProvider) *KevoServiceServer {
return &KevoServiceServer{
engine: engine,
txRegistry: txRegistry,
@ -249,8 +244,8 @@ func (s *KevoServiceServer) Scan(req *pb.ScanRequest, stream pb.KevoService_Scan
// BeginTransaction starts a new transaction
func (s *KevoServiceServer) BeginTransaction(ctx context.Context, req *pb.BeginTransactionRequest) (*pb.BeginTransactionResponse, error) {
// Force clean up of old transactions before creating new ones
if cleaner, ok := s.txRegistry.(interface{ removeStaleTransactions() }); ok {
cleaner.removeStaleTransactions()
if cleaner, ok := s.txRegistry.(*transaction.RegistryImpl); ok {
cleaner.CleanupStaleTransactions()
}
txID, err := s.txRegistry.Begin(ctx, s.engine, req.ReadOnly)

View File

@ -3,8 +3,11 @@ package transaction
import (
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/KevoDB/kevo/pkg/common/log"
)
// Registry manages transaction lifecycle and connections
@ -64,7 +67,7 @@ func (r *RegistryImpl) cleanupStaleTx() {
for {
select {
case <-r.cleanupTicker.C:
r.cleanupStaleTransactions()
r.CleanupStaleTransactions()
case <-r.stopCleanup:
r.cleanupTicker.Stop()
return
@ -72,8 +75,9 @@ func (r *RegistryImpl) cleanupStaleTx() {
}
}
// cleanupStaleTransactions removes transactions that have been idle for too long
func (r *RegistryImpl) cleanupStaleTransactions() {
// CleanupStaleTransactions removes transactions that have been idle for too long
// This is exported so that the service can call it directly
func (r *RegistryImpl) CleanupStaleTransactions() {
r.mu.Lock()
defer r.mu.Unlock()
@ -193,11 +197,37 @@ func (r *RegistryImpl) Begin(ctx context.Context, engine interface{}, readOnly b
var tx Transaction
var err error
// Attempt to cast to different engine types
if manager, ok := engine.(TransactionManager); ok {
tx, err = manager.BeginTransaction(readOnly)
// Check for different types of engines
if engine != nil {
// Just directly try to get a transaction, without complex type checking
// The only real requirement is that the engine has a BeginTransaction method
// that returns a transaction that matches our Transaction interface
// Get the method using reflection to avoid type compatibility issues
val := reflect.ValueOf(engine)
method := val.MethodByName("BeginTransaction")
if !method.IsValid() {
err = fmt.Errorf("engine does not have BeginTransaction method")
return
}
// Call the method
log.Debug("Calling BeginTransaction via reflection")
args := []reflect.Value{reflect.ValueOf(readOnly)}
results := method.Call(args)
// Check for errors
if !results[1].IsNil() {
err = results[1].Interface().(error)
return
}
// Get the transaction
txVal := results[0].Interface()
tx = txVal.(Transaction)
} else {
err = fmt.Errorf("unsupported engine type for transactions")
err = fmt.Errorf("nil engine provided to transaction registry")
}
select {
@ -367,4 +397,4 @@ func (r *RegistryImpl) GracefulShutdown(ctx context.Context) error {
r.connectionTxs = make(map[string]map[string]struct{})
return lastErr
}
}