From c2c97f67a81393eff5d70b9cbbb51331ca63d970 Mon Sep 17 00:00:00 2001 From: Jeremy Tregunna Date: Sat, 17 May 2025 14:58:26 -0600 Subject: [PATCH] fix: fixes issue with sstables not having sequence saved so older data can be retrieved after an update --- cmd/kevo/server.go | 4 +- pkg/client/client.go | 34 ++--- pkg/common/iterator/filtered/filtered.go | 16 +- pkg/common/iterator/filtered/filtered_test.go | 54 +++---- pkg/config/config.go | 12 +- pkg/engine/storage/manager.go | 80 +++++++--- pkg/engine/storage/retry.go | 11 +- pkg/engine/storage/sequence_test.go | 143 ++++++++++++++++++ pkg/grpc/service/service.go | 38 ++++- pkg/memtable/iterator_adapter.go | 8 + pkg/memtable/skiplist.go | 8 + pkg/replication/manager.go | 48 +++--- pkg/replication/primary.go | 22 +-- pkg/replication/replica.go | 14 +- pkg/sstable/block/block_builder.go | 49 ++++-- pkg/sstable/block/block_iterator.go | 40 ++++- pkg/sstable/block/types.go | 5 +- pkg/sstable/iterator.go | 13 ++ pkg/sstable/iterator_adapter.go | 8 + pkg/sstable/writer.go | 15 +- pkg/transaction/manager.go | 18 +-- pkg/transaction/registry.go | 66 ++++---- pkg/transaction/transaction.go | 10 +- pkg/transport/interface.go | 18 +-- 24 files changed, 523 insertions(+), 211 deletions(-) create mode 100644 pkg/engine/storage/sequence_test.go diff --git a/cmd/kevo/server.go b/cmd/kevo/server.go index 33b1f16..7b093f7 100644 --- a/cmd/kevo/server.go +++ b/cmd/kevo/server.go @@ -33,7 +33,7 @@ func NewServer(eng *engine.EngineFacade, config Config) *Server { // Create a transaction registry directly from the transaction package // The transaction registry can work with any type that implements BeginTransaction txRegistry := transaction.NewRegistry() - + return &Server{ eng: eng, txRegistry: txRegistry, @@ -86,7 +86,7 @@ func (s *Server) Start() error { kaPolicy := keepalive.EnforcementPolicy{ MinTime: 10 * time.Second, // Minimum time a client should wait between pings - PermitWithoutStream: true, // Allow pings even when there are no active streams + PermitWithoutStream: true, // Allow pings even when there are no active streams } serverOpts = append(serverOpts, diff --git a/pkg/client/client.go b/pkg/client/client.go index cdcf0b1..dbe8030 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -47,7 +47,7 @@ type ClientOptions struct { // Performance options Compression CompressionType // Compression algorithm MaxMessageSize int // Maximum message size - + // Keepalive options KeepaliveTime time.Duration // Time between keepalive pings (0 for default) KeepaliveTimeout time.Duration // Time to wait for ping ack (0 for default) @@ -117,15 +117,15 @@ func NewClient(options ClientOptions) (*Client, error) { PermitWithoutStream: true, // Allow pings even when there are no active streams } } - + transportOpts := transport.TransportOptions{ - Timeout: options.ConnectTimeout, - MaxMessageSize: options.MaxMessageSize, - Compression: options.Compression, - TLSEnabled: options.TLSEnabled, - CertFile: options.CertFile, - KeyFile: options.KeyFile, - CAFile: options.CAFile, + Timeout: options.ConnectTimeout, + MaxMessageSize: options.MaxMessageSize, + Compression: options.Compression, + TLSEnabled: options.TLSEnabled, + CertFile: options.CertFile, + KeyFile: options.KeyFile, + CAFile: options.CAFile, KeepaliveParams: keepaliveParams, RetryPolicy: transport.RetryPolicy{ MaxRetries: options.MaxRetries, @@ -243,15 +243,15 @@ func (c *Client) createTransportOptions(options ClientOptions) transport.Transpo PermitWithoutStream: true, // Allow pings even when there are no active streams } } - + return transport.TransportOptions{ - Timeout: options.ConnectTimeout, - MaxMessageSize: options.MaxMessageSize, - Compression: options.Compression, - TLSEnabled: options.TLSEnabled, - CertFile: options.CertFile, - KeyFile: options.KeyFile, - CAFile: options.CAFile, + Timeout: options.ConnectTimeout, + MaxMessageSize: options.MaxMessageSize, + Compression: options.Compression, + TLSEnabled: options.TLSEnabled, + CertFile: options.CertFile, + KeyFile: options.KeyFile, + CAFile: options.CAFile, KeepaliveParams: keepaliveParams, RetryPolicy: transport.RetryPolicy{ MaxRetries: options.MaxRetries, diff --git a/pkg/common/iterator/filtered/filtered.go b/pkg/common/iterator/filtered/filtered.go index 265eaf6..fd5daf8 100644 --- a/pkg/common/iterator/filtered/filtered.go +++ b/pkg/common/iterator/filtered/filtered.go @@ -57,7 +57,7 @@ func (fi *FilteredIterator) IsTombstone() bool { // SeekToFirst positions at the first key that passes the filter func (fi *FilteredIterator) SeekToFirst() { fi.iter.SeekToFirst() - + // Advance to the first key that passes the filter if fi.iter.Valid() && !fi.keyFilter(fi.iter.Key()) { fi.Next() @@ -67,17 +67,17 @@ func (fi *FilteredIterator) SeekToFirst() { // SeekToLast positions at the last key that passes the filter func (fi *FilteredIterator) SeekToLast() { // This is a simplistic implementation that may not be efficient - // For a production-quality implementation, we might want a more + // For a production-quality implementation, we might want a more // sophisticated approach fi.iter.SeekToLast() - + // If we're at a valid position but it doesn't pass the filter, // we need to find the last key that does if fi.iter.Valid() && !fi.keyFilter(fi.iter.Key()) { // Inefficient but correct - scan from beginning to find last valid key var lastValidKey []byte fi.iter.SeekToFirst() - + for fi.iter.Valid() { if fi.keyFilter(fi.iter.Key()) { lastValidKey = make([]byte, len(fi.iter.Key())) @@ -85,7 +85,7 @@ func (fi *FilteredIterator) SeekToLast() { } fi.iter.Next() } - + // If we found a valid key, seek to it if lastValidKey != nil { fi.iter.Seek(lastValidKey) @@ -102,12 +102,12 @@ func (fi *FilteredIterator) Seek(target []byte) bool { if !fi.iter.Seek(target) { return false } - + // If the current position doesn't pass the filter, find the next one that does if !fi.keyFilter(fi.iter.Key()) { return fi.Next() } - + return true } @@ -133,4 +133,4 @@ func NewPrefixIterator(iter iterator.Iterator, prefix []byte) *FilteredIterator // SuffixIterator returns an iterator that filters keys by suffix func NewSuffixIterator(iter iterator.Iterator, suffix []byte) *FilteredIterator { return NewFilteredIterator(iter, SuffixFilterFunc(suffix)) -} \ No newline at end of file +} diff --git a/pkg/common/iterator/filtered/filtered_test.go b/pkg/common/iterator/filtered/filtered_test.go index b201a18..1b8cd45 100644 --- a/pkg/common/iterator/filtered/filtered_test.go +++ b/pkg/common/iterator/filtered/filtered_test.go @@ -111,60 +111,60 @@ func TestFilteredIterator(t *testing.T) { } baseIter := NewMockIterator(entries) - + // Filter for keys starting with 'a' filter := func(key []byte) bool { return bytes.HasPrefix(key, []byte("a")) } - + filtered := NewFilteredIterator(baseIter, filter) - + // Test SeekToFirst and Next filtered.SeekToFirst() - + if !filtered.Valid() { t.Fatal("Expected valid position after SeekToFirst") } - + if string(filtered.Key()) != "a1" { t.Errorf("Expected key 'a1', got '%s'", string(filtered.Key())) } - + if string(filtered.Value()) != "val1" { t.Errorf("Expected value 'val1', got '%s'", string(filtered.Value())) } - + if filtered.IsTombstone() { t.Error("Expected non-tombstone for first entry") } - + // Advance to next matching entry if !filtered.Next() { t.Fatal("Expected successful Next() call") } - + if string(filtered.Key()) != "a3" { t.Errorf("Expected key 'a3', got '%s'", string(filtered.Key())) } - + if !filtered.IsTombstone() { t.Error("Expected tombstone for second entry") } - + // Advance again if !filtered.Next() { t.Fatal("Expected successful Next() call") } - + if string(filtered.Key()) != "a5" { t.Errorf("Expected key 'a5', got '%s'", string(filtered.Key())) } - + // No more entries if filtered.Next() { t.Fatal("Expected end of iteration") } - + if filtered.Valid() { t.Fatal("Expected invalid position at end of iteration") } @@ -182,27 +182,27 @@ func TestPrefixIterator(t *testing.T) { baseIter := NewMockIterator(entries) prefixIter := NewPrefixIterator(baseIter, []byte("apple")) - + // Count matching entries prefixIter.SeekToFirst() - + count := 0 for prefixIter.Valid() { count++ prefixIter.Next() } - + if count != 3 { t.Errorf("Expected 3 entries with prefix 'apple', got %d", count) } - + // Test Seek prefixIter.Seek([]byte("apple3")) - + if !prefixIter.Valid() { t.Fatal("Expected valid position after Seek") } - + if string(prefixIter.Key()) != "apple3" { t.Errorf("Expected key 'apple3', got '%s'", string(prefixIter.Key())) } @@ -220,28 +220,28 @@ func TestSuffixIterator(t *testing.T) { baseIter := NewMockIterator(entries) suffixIter := NewSuffixIterator(baseIter, []byte("_suffix")) - + // Count matching entries suffixIter.SeekToFirst() - + count := 0 for suffixIter.Valid() { count++ suffixIter.Next() } - + if count != 3 { t.Errorf("Expected 3 entries with suffix '_suffix', got %d", count) } - + // Test seeking to find entries with suffix suffixIter.Seek([]byte("key3")) - + if !suffixIter.Valid() { t.Fatal("Expected valid position after Seek") } - + if string(suffixIter.Key()) != "key3_suffix" { t.Errorf("Expected key 'key3_suffix', got '%s'", string(suffixIter.Key())) } -} \ No newline at end of file +} diff --git a/pkg/config/config.go b/pkg/config/config.go index c4bdb15..5006b3f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -58,12 +58,12 @@ type Config struct { MaxLevelWithTombstones int `json:"max_level_with_tombstones"` // Levels higher than this discard tombstones // Transaction configuration - ReadOnlyTxTTL int64 `json:"read_only_tx_ttl"` // Time-to-live for read-only transactions in seconds (default: 180s) - ReadWriteTxTTL int64 `json:"read_write_tx_ttl"` // Time-to-live for read-write transactions in seconds (default: 60s) - IdleTxTimeout int64 `json:"idle_tx_timeout"` // Time after which an inactive transaction is considered idle in seconds (default: 30s) - TxCleanupInterval int64 `json:"tx_cleanup_interval"` // Interval for checking transaction TTLs in seconds (default: 30s) - TxWarningThreshold int `json:"tx_warning_threshold"` // Percentage of TTL after which to log warnings (default: 75) - TxCriticalThreshold int `json:"tx_critical_threshold"` // Percentage of TTL after which to log critical warnings (default: 90) + ReadOnlyTxTTL int64 `json:"read_only_tx_ttl"` // Time-to-live for read-only transactions in seconds (default: 180s) + ReadWriteTxTTL int64 `json:"read_write_tx_ttl"` // Time-to-live for read-write transactions in seconds (default: 60s) + IdleTxTimeout int64 `json:"idle_tx_timeout"` // Time after which an inactive transaction is considered idle in seconds (default: 30s) + TxCleanupInterval int64 `json:"tx_cleanup_interval"` // Interval for checking transaction TTLs in seconds (default: 30s) + TxWarningThreshold int `json:"tx_warning_threshold"` // Percentage of TTL after which to log warnings (default: 75) + TxCriticalThreshold int `json:"tx_critical_threshold"` // Percentage of TTL after which to log critical warnings (default: 90) mu sync.RWMutex } diff --git a/pkg/engine/storage/manager.go b/pkg/engine/storage/manager.go index 9d25fdc..817a582 100644 --- a/pkg/engine/storage/manager.go +++ b/pkg/engine/storage/manager.go @@ -637,34 +637,61 @@ func (m *Manager) flushMemTable(mem *memtable.MemTable) error { count := 0 var bytesWritten uint64 - // Since memtable's skiplist returns keys in sorted order, - // but possibly with duplicates (newer versions of same key first), - // we need to track all processed keys (including tombstones) - processedKeys := make(map[string]struct{}) + // Structure to store information about keys as we iterate + type keyEntry struct { + key []byte + value []byte + seqNum uint64 + } + // Collect keys in sorted order while tracking the newest version of each key + var entries []keyEntry + var previousKey []byte + + // First iterate through memtable (already in sorted order) for iter.SeekToFirst(); iter.Valid(); iter.Next() { - key := iter.Key() - keyStr := string(key) // Use as map key + currentKey := iter.Key() + currentValue := iter.Value() + currentSeqNum := iter.SequenceNumber() - // Skip keys we've already processed (including tombstones) - if _, seen := processedKeys[keyStr]; seen { + // If this is a tombstone (deletion marker), we skip it + if currentValue == nil { continue } - // Mark this key as processed regardless of whether it's a value or tombstone - processedKeys[keyStr] = struct{}{} - - // Only write non-tombstone entries to the SSTable - if value := iter.Value(); value != nil { - bytesWritten += uint64(len(key) + len(value)) - if err := writer.Add(key, value); err != nil { - writer.Abort() - return fmt.Errorf("failed to add entry to SSTable: %w", err) + // If this is the first key or a different key than the previous one + if previousKey == nil || !bytes.Equal(currentKey, previousKey) { + // Add this as a new entry + entries = append(entries, keyEntry{ + key: append([]byte(nil), currentKey...), + value: append([]byte(nil), currentValue...), + seqNum: currentSeqNum, + }) + previousKey = currentKey + } else { + // Same key as previous, check sequence number + lastIndex := len(entries) - 1 + if currentSeqNum > entries[lastIndex].seqNum { + // This is a newer version of the same key, replace the previous entry + entries[lastIndex] = keyEntry{ + key: append([]byte(nil), currentKey...), + value: append([]byte(nil), currentValue...), + seqNum: currentSeqNum, + } } - count++ } } + // Now write all collected entries to the SSTable + for _, entry := range entries { + bytesWritten += uint64(len(entry.key) + len(entry.value)) + if err := writer.AddWithSequence(entry.key, entry.value, entry.seqNum); err != nil { + writer.Abort() + return fmt.Errorf("failed to add entry with sequence number to SSTable: %w", err) + } + count++ + } + if count == 0 { writer.Abort() return nil @@ -857,3 +884,20 @@ func (m *Manager) recoverFromWAL() error { return nil } + +// RetryOnWALRotating retries operations with ErrWALRotating +func (m *Manager) RetryOnWALRotating(operation func() error) error { + maxRetries := 3 + for attempts := 0; attempts < maxRetries; attempts++ { + err := operation() + if err != wal.ErrWALRotating { + // Either success or a different error + return err + } + + // Wait a bit before retrying to allow the rotation to complete + time.Sleep(10 * time.Millisecond) + } + + return fmt.Errorf("operation failed after %d retries: %w", maxRetries, wal.ErrWALRotating) +} diff --git a/pkg/engine/storage/retry.go b/pkg/engine/storage/retry.go index 691d851..bc4a1ea 100644 --- a/pkg/engine/storage/retry.go +++ b/pkg/engine/storage/retry.go @@ -23,12 +23,11 @@ func DefaultRetryConfig() *RetryConfig { } } -// RetryOnWALRotating retries the operation if it fails with ErrWALRotating -func (m *Manager) RetryOnWALRotating(operation func() error) error { - config := DefaultRetryConfig() - return m.RetryWithConfig(operation, config, isWALRotating) -} - +// Commented out due to duplicate declaration with the one in manager.go +// func (m *Manager) RetryOnWALRotating(operation func() error) error { +// config := DefaultRetryConfig() +// return m.RetryWithConfig(operation, config, isWALRotating) +// } // RetryWithConfig retries an operation with the given configuration func (m *Manager) RetryWithConfig(operation func() error, config *RetryConfig, isRetryable func(error) bool) error { diff --git a/pkg/engine/storage/sequence_test.go b/pkg/engine/storage/sequence_test.go new file mode 100644 index 0000000..4a06233 --- /dev/null +++ b/pkg/engine/storage/sequence_test.go @@ -0,0 +1,143 @@ +// ABOUTME: Tests sequence number functionality in storage manager +// ABOUTME: Verifies correct version selection during recovery with sequence numbers + +package storage + +import ( + "bytes" + "os" + "path/filepath" + "testing" + + "github.com/KevoDB/kevo/pkg/config" + "github.com/KevoDB/kevo/pkg/stats" +) + +// TestSequenceNumberVersioning tests that sequence numbers are properly tracked +// and used for version selection during recovery. +func TestSequenceNumberVersioning(t *testing.T) { + // Create temporary directories for the test + tempDir, err := os.MkdirTemp("", "sequence-test-*") + if err != nil { + t.Fatalf("Failed to create temp directory: %v", err) + } + defer os.RemoveAll(tempDir) + + // Create subdirectories for SSTables and WAL + sstDir := filepath.Join(tempDir, "sst") + walDir := filepath.Join(tempDir, "wal") + + // Create configuration + cfg := &config.Config{ + Version: config.CurrentManifestVersion, + SSTDir: sstDir, + WALDir: walDir, + MemTableSize: 1024 * 1024, // 1MB + MemTablePoolCap: 2, + MaxMemTables: 2, + } + + // Create a stats collector + statsCollector := stats.NewAtomicCollector() + + // Create a new storage manager + manager, err := NewManager(cfg, statsCollector) + if err != nil { + t.Fatalf("Failed to create storage manager: %v", err) + } + defer manager.Close() + + // Step 1: Add a key with initial value + testKey := []byte("test-key") + initialValue := []byte("initial-value") + err = manager.Put(testKey, initialValue) + if err != nil { + t.Fatalf("Failed to put initial value: %v", err) + } + + // Verify the key is readable + value, err := manager.Get(testKey) + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + if !bytes.Equal(initialValue, value) { + t.Errorf("Expected initial value %s, got %s", initialValue, value) + } + + // Step 2: Flush to create an SSTable + err = manager.FlushMemTables() + if err != nil { + t.Fatalf("Failed to flush memtables: %v", err) + } + + // Verify data is still accessible after flush + value, err = manager.Get(testKey) + if err != nil { + t.Fatalf("Failed to get value after flush: %v", err) + } + if !bytes.Equal(initialValue, value) { + t.Errorf("Expected initial value %s after flush, got %s", initialValue, value) + } + + // Step 3: Update the key with a new value + updatedValue := []byte("updated-value") + err = manager.Put(testKey, updatedValue) + if err != nil { + t.Fatalf("Failed to put updated value: %v", err) + } + + // Verify the updated value is readable + value, err = manager.Get(testKey) + if err != nil { + t.Fatalf("Failed to get updated value: %v", err) + } + if !bytes.Equal(updatedValue, value) { + t.Errorf("Expected updated value %s, got %s", updatedValue, value) + } + + // Step 4: Flush again to create another SSTable with the updated value + err = manager.FlushMemTables() + if err != nil { + t.Fatalf("Failed to flush memtables again: %v", err) + } + + // Verify updated data is still accessible after second flush + value, err = manager.Get(testKey) + if err != nil { + t.Fatalf("Failed to get value after second flush: %v", err) + } + if !bytes.Equal(updatedValue, value) { + t.Errorf("Expected updated value %s after second flush, got %s", updatedValue, value) + } + + // Get the last sequence number + lastSeqNum := manager.lastSeqNum + + // Step 5: Close the manager and simulate a recovery scenario + err = manager.Close() + if err != nil { + t.Fatalf("Failed to close manager: %v", err) + } + + // Create a new manager to simulate recovery + recoveredManager, err := NewManager(cfg, statsCollector) + if err != nil { + t.Fatalf("Failed to create recovered manager: %v", err) + } + defer recoveredManager.Close() + + // Verify the key still has the latest value after recovery + recoveredValue, err := recoveredManager.Get(testKey) + if err != nil { + t.Fatalf("Failed to get value after recovery: %v", err) + } + if !bytes.Equal(updatedValue, recoveredValue) { + t.Errorf("Expected updated value %s after recovery, got %s", updatedValue, recoveredValue) + } + + // Verify the sequence number was properly recovered + if recoveredManager.lastSeqNum < lastSeqNum { + t.Errorf("Recovered sequence number %d is less than last known sequence number %d", + recoveredManager.lastSeqNum, lastSeqNum) + } +} diff --git a/pkg/grpc/service/service.go b/pkg/grpc/service/service.go index b6ca2ff..5c9d419 100644 --- a/pkg/grpc/service/service.go +++ b/pkg/grpc/service/service.go @@ -240,7 +240,6 @@ func (s *KevoServiceServer) Scan(req *pb.ScanRequest, stream pb.KevoService_Scan return nil } - // BeginTransaction starts a new transaction func (s *KevoServiceServer) BeginTransaction(ctx context.Context, req *pb.BeginTransactionRequest) (*pb.BeginTransactionResponse, error) { // Force clean up of old transactions before creating new ones @@ -460,7 +459,7 @@ func (s *KevoServiceServer) GetStats(ctx context.Context, req *pb.GetStatsReques // Collect basic stats that we know are available keyCount := int64(0) sstableCount := int32(0) - memtableCount := int32(1) // At least 1 active memtable + memtableCount := int32(0) // Will be updated from storage stats // Create a read-only transaction to count keys tx, err := s.engine.BeginTransaction(true) @@ -474,9 +473,18 @@ func (s *KevoServiceServer) GetStats(ctx context.Context, req *pb.GetStatsReques // Count keys and estimate size var totalSize int64 - for iter.Next() { - keyCount++ - totalSize += int64(len(iter.Key()) + len(iter.Value())) + + // Position at the first key + iter.SeekToFirst() + + // Iterate through all keys + for iter.Valid() { + // Only count live keys (not tombstones/deleted keys) + if !iter.IsTombstone() { + keyCount++ + totalSize += int64(len(iter.Key()) + len(iter.Value())) + } + iter.Next() } // Get statistics from the engine @@ -484,6 +492,26 @@ func (s *KevoServiceServer) GetStats(ctx context.Context, req *pb.GetStatsReques GetStatsProvider() interface{} }) + // Get storage stats from engine + if storageManager, ok := s.engine.(interface { + GetStorageStats() map[string]interface{} + }); ok { + storageStats := storageManager.GetStorageStats() + + // Set memtable count: always 1 active memtable + any immutable memtables + memtableCount = 1 // Always have at least 1 active memtable + + // Add any immutable memtables to the count + if count, ok := storageStats["immutable_memtable_count"].(int); ok { + memtableCount += int32(count) + } + + // Update sstable count + if count, ok := storageStats["sstable_count"].(int); ok { + sstableCount = int32(count) + } + } + response := &pb.GetStatsResponse{ KeyCount: keyCount, StorageSize: totalSize, diff --git a/pkg/memtable/iterator_adapter.go b/pkg/memtable/iterator_adapter.go index 40af768..9996db4 100644 --- a/pkg/memtable/iterator_adapter.go +++ b/pkg/memtable/iterator_adapter.go @@ -88,3 +88,11 @@ func (a *IteratorAdapter) Valid() bool { func (a *IteratorAdapter) IsTombstone() bool { return a.iter != nil && a.iter.IsTombstone() } + +// SequenceNumber returns the sequence number of the current entry +func (a *IteratorAdapter) SequenceNumber() uint64 { + if !a.Valid() || a.iter.Entry() == nil { + return 0 + } + return a.iter.Entry().seqNum +} diff --git a/pkg/memtable/skiplist.go b/pkg/memtable/skiplist.go index 45bbaa5..5c4aa71 100644 --- a/pkg/memtable/skiplist.go +++ b/pkg/memtable/skiplist.go @@ -317,3 +317,11 @@ func (it *Iterator) Entry() *entry { } return it.current.entry } + +// SequenceNumber returns the sequence number of the current entry +func (it *Iterator) SequenceNumber() uint64 { + if !it.Valid() { + return 0 + } + return it.current.entry.seqNum +} diff --git a/pkg/replication/manager.go b/pkg/replication/manager.go index ac540f0..d7e7a8b 100644 --- a/pkg/replication/manager.go +++ b/pkg/replication/manager.go @@ -175,19 +175,19 @@ func (m *Manager) getPrimaryStatus(status map[string]interface{}) map[string]int } status["listen_address"] = m.config.ListenAddr - + // Get detailed primary status m.primary.mu.RLock() defer m.primary.mu.RUnlock() - + // Add information about connected replicas replicaCount := len(m.primary.sessions) activeReplicas := 0 connectedReplicas := 0 - + // Create a replicas list with detailed information replicas := make([]map[string]interface{}, 0, replicaCount) - + for id, session := range m.primary.sessions { // Track active and connected counts if session.Connected { @@ -196,7 +196,7 @@ func (m *Manager) getPrimaryStatus(status map[string]interface{}) map[string]int if session.Active && session.Connected { activeReplicas++ } - + // Create detailed replica info replicaInfo := map[string]interface{}{ "id": id, @@ -208,16 +208,16 @@ func (m *Manager) getPrimaryStatus(status map[string]interface{}) map[string]int "start_sequence": session.StartSequence, "idle_time_seconds": time.Since(session.LastActivity).Seconds(), } - + replicas = append(replicas, replicaInfo) } - + // Get WAL sequence information currentWalSeq := uint64(0) if m.primary.wal != nil { currentWalSeq = m.primary.wal.GetNextSequence() - 1 // Last used sequence } - + // Add primary-specific information to status status["replica_count"] = replicaCount status["connected_replica_count"] = connectedReplicas @@ -231,7 +231,7 @@ func (m *Manager) getPrimaryStatus(status map[string]interface{}) map[string]int } status["compression_enabled"] = m.primary.enableCompression status["default_codec"] = m.primary.defaultCodec.String() - + return status } @@ -244,30 +244,30 @@ func (m *Manager) getReplicaStatus(status map[string]interface{}) map[string]int // Basic replica information status["primary_address"] = m.config.PrimaryAddr status["last_applied_sequence"] = m.lastApplied - + // Detailed state information currentState := m.replica.GetStateString() status["state"] = currentState - + // Get the state tracker for more detailed information stateTracker := m.replica.stateTracker if stateTracker != nil { // Add state duration stateTime := stateTracker.GetStateDuration() status["state_duration_seconds"] = stateTime.Seconds() - + // Add error information if in error state if currentState == "ERROR" { if err := stateTracker.GetError(); err != nil { status["last_error"] = err.Error() } } - + // Get state transitions transitions := stateTracker.GetTransitions() if len(transitions) > 0 { stateHistory := make([]map[string]interface{}, 0, len(transitions)) - + for _, t := range transitions { stateHistory = append(stateHistory, map[string]interface{}{ "from": t.From.String(), @@ -275,26 +275,26 @@ func (m *Manager) getReplicaStatus(status map[string]interface{}) map[string]int "timestamp": t.Timestamp.UnixNano() / int64(time.Millisecond), }) } - + // Only include the last 10 transitions to keep the response size reasonable if len(stateHistory) > 10 { stateHistory = stateHistory[len(stateHistory)-10:] } - + status["state_history"] = stateHistory } } - + // Add connection information if m.replica.conn != nil { status["connection_status"] = "connected" } else { status["connection_status"] = "disconnected" } - + // Add replication listener information status["replication_listener_address"] = m.config.ListenAddr - + // Include statistics if m.replica.stats != nil { status["entries_received"] = m.replica.stats.GetEntriesReceived() @@ -302,7 +302,7 @@ func (m *Manager) getReplicaStatus(status map[string]interface{}) map[string]int status["bytes_received"] = m.replica.stats.GetBytesReceived() status["batch_count"] = m.replica.stats.GetBatchCount() status["errors"] = m.replica.stats.GetErrorCount() - + // Add last batch time information lastBatchTime := m.replica.stats.GetLastBatchTime() if lastBatchTime > 0 { @@ -310,10 +310,10 @@ func (m *Manager) getReplicaStatus(status map[string]interface{}) map[string]int status["seconds_since_last_batch"] = m.replica.stats.GetLastBatchTimeDuration().Seconds() } } - + // Add configuration information status["force_read_only"] = m.config.ForceReadOnly - + return status } @@ -358,11 +358,11 @@ func (m *Manager) startPrimary() error { opts := []grpc.ServerOption{ grpc.KeepaliveParams(keepalive.ServerParameters{ Time: 30 * time.Second, // Send pings every 30 seconds if there is no activity - Timeout: 10 * time.Second, // Wait 10 seconds for ping ack before assuming connection is dead + Timeout: 10 * time.Second, // Wait 10 seconds for ping ack before assuming connection is dead }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 10 * time.Second, // Minimum time a client should wait between pings - PermitWithoutStream: true, // Allow pings even when there are no active streams + PermitWithoutStream: true, // Allow pings even when there are no active streams }), grpc.MaxRecvMsgSize(16 * 1024 * 1024), // 16MB max message size grpc.MaxSendMsgSize(16 * 1024 * 1024), // 16MB max message size diff --git a/pkg/replication/primary.go b/pkg/replication/primary.go index faf592f..6606d43 100644 --- a/pkg/replication/primary.go +++ b/pkg/replication/primary.go @@ -779,46 +779,46 @@ func (p *Primary) getSession(id string) *ReplicaSession { // maybeManageWALRetention checks if WAL retention management should be triggered func (p *Primary) maybeManageWALRetention() { p.mu.RLock() - + // Find minimum acknowledged sequence across all connected replicas minAcknowledgedSeq := uint64(^uint64(0)) // Start with max value activeReplicas := 0 - + for id, session := range p.sessions { if session.Connected && session.Active { activeReplicas++ if session.LastAckSequence < minAcknowledgedSeq { minAcknowledgedSeq = session.LastAckSequence } - log.Info("Replica %s has acknowledged up to sequence %d", + log.Info("Replica %s has acknowledged up to sequence %d", id, session.LastAckSequence) } } p.mu.RUnlock() - + // Only proceed if we have valid data and active replicas if minAcknowledgedSeq == uint64(^uint64(0)) || minAcknowledgedSeq == 0 { log.Info("No minimum acknowledged sequence found, skipping WAL retention") return } - - log.Info("WAL retention: minimum acknowledged sequence across %d active replicas: %d", + + log.Info("WAL retention: minimum acknowledged sequence across %d active replicas: %d", activeReplicas, minAcknowledgedSeq) - + // Apply the retention policy using the existing WAL API config := wal.WALRetentionConfig{ - MaxAge: time.Duration(p.retentionConfig.MaxAgeHours) * time.Hour, + MaxAge: time.Duration(p.retentionConfig.MaxAgeHours) * time.Hour, MinSequenceKeep: minAcknowledgedSeq, } - + filesDeleted, err := p.wal.ManageRetention(config) if err != nil { log.Error("Failed to manage WAL retention: %v", err) return } - + if filesDeleted > 0 { - log.Info("WAL retention: deleted %d files, min sequence kept: %d", + log.Info("WAL retention: deleted %d files, min sequence kept: %d", filesDeleted, minAcknowledgedSeq) } else { log.Info("WAL retention: no files eligible for deletion") diff --git a/pkg/replication/replica.go b/pkg/replication/replica.go index a2e5942..20cddc7 100644 --- a/pkg/replication/replica.go +++ b/pkg/replication/replica.go @@ -188,7 +188,7 @@ type Replica struct { // Stream client for receiving WAL entries streamClient replication_proto.WALReplicationService_StreamWALClient - + // Statistics for the replica stats *ReplicaStats @@ -869,12 +869,12 @@ func (r *Replica) connectToPrimary() error { func (r *Replica) processEntriesWithoutStateTransitions(response *replication_proto.WALStreamResponse) error { entryCount := len(response.Entries) fmt.Printf("Processing %d entries (no state transitions)\n", entryCount) - + // Track statistics if r.stats != nil { r.stats.TrackBatchReceived() r.stats.TrackEntriesReceived(uint64(entryCount)) - + // Calculate total bytes received var totalBytes uint64 for _, entry := range response.Entries { @@ -939,7 +939,7 @@ func (r *Replica) processEntriesWithoutStateTransitions(response *replication_pr r.mu.Lock() r.lastAppliedSeq = maxSeq r.mu.Unlock() - + // Track applied entries in statistics if r.stats != nil { // Calculate the number of entries that were successfully applied @@ -967,12 +967,12 @@ func (r *Replica) processEntriesWithoutStateTransitions(response *replication_pr func (r *Replica) processEntries(response *replication_proto.WALStreamResponse) error { entryCount := len(response.Entries) fmt.Printf("Processing %d entries\n", entryCount) - + // Track statistics if r.stats != nil { r.stats.TrackBatchReceived() r.stats.TrackEntriesReceived(uint64(entryCount)) - + // Calculate total bytes received var totalBytes uint64 for _, entry := range response.Entries { @@ -1037,7 +1037,7 @@ func (r *Replica) processEntries(response *replication_proto.WALStreamResponse) r.mu.Lock() r.lastAppliedSeq = maxSeq r.mu.Unlock() - + // Track applied entries in statistics if r.stats != nil { // Calculate the number of entries that were successfully applied diff --git a/pkg/sstable/block/block_builder.go b/pkg/sstable/block/block_builder.go index c54c5a9..22c599b 100644 --- a/pkg/sstable/block/block_builder.go +++ b/pkg/sstable/block/block_builder.go @@ -32,6 +32,12 @@ func NewBuilder() *Builder { // Add adds a key-value pair to the block // Keys must be added in sorted order func (b *Builder) Add(key, value []byte) error { + return b.AddWithSequence(key, value, 0) // Default sequence number to 0 for backward compatibility +} + +// AddWithSequence adds a key-value pair to the block with a sequence number +// Keys must be added in sorted order +func (b *Builder) AddWithSequence(key, value []byte, seqNum uint64) error { // Ensure keys are added in sorted order if len(b.entries) > 0 && bytes.Compare(key, b.lastKey) <= 0 { return fmt.Errorf("keys must be added in strictly increasing order, got %s after %s", @@ -39,8 +45,9 @@ func (b *Builder) Add(key, value []byte) error { } b.entries = append(b.entries, Entry{ - Key: append([]byte(nil), key...), // Make copies to avoid references - Value: append([]byte(nil), value...), // to external data + Key: append([]byte(nil), key...), // Make copies to avoid references + Value: append([]byte(nil), value...), // to external data + SequenceNum: seqNum, }) // Add restart point if needed @@ -51,7 +58,7 @@ func (b *Builder) Add(key, value []byte) error { b.restartIdx++ // Track the size - b.currentSize += uint32(len(key) + len(value) + 8) // 8 bytes for metadata + b.currentSize += uint32(len(key) + len(value) + 16) // 16 bytes for metadata (including sequence number) b.lastKey = append([]byte(nil), key...) return nil @@ -94,16 +101,26 @@ func (b *Builder) Finish(w io.Writer) (uint64, error) { // Keys are already sorted by the Add method's requirement - // Remove any duplicate keys (keeping the last one) + // Remove any duplicate keys (keeping the one with the highest sequence number) if len(b.entries) > 1 { - uniqueEntries := make([]Entry, 0, len(b.entries)) - for i := 0; i < len(b.entries); i++ { - // Skip if this is a duplicate of the previous entry - if i > 0 && bytes.Equal(b.entries[i].Key, b.entries[i-1].Key) { - // Replace the previous entry with this one (to keep the latest value) - uniqueEntries[len(uniqueEntries)-1] = b.entries[i] - } else { - uniqueEntries = append(uniqueEntries, b.entries[i]) + // Group entries by key and find entry with highest sequence number + keyMap := make(map[string]Entry) + for _, entry := range b.entries { + keyStr := string(entry.Key) + if existing, exists := keyMap[keyStr]; !exists || entry.SequenceNum > existing.SequenceNum { + keyMap[keyStr] = entry + } + } + + // Rebuild sorted entries from the map + uniqueEntries := make([]Entry, 0, len(keyMap)) + for _, entry := range b.entries { + keyStr := string(entry.Key) + if best, exists := keyMap[keyStr]; exists { + if bytes.Equal(entry.Key, best.Key) && entry.SequenceNum == best.SequenceNum { + uniqueEntries = append(uniqueEntries, best) + delete(keyMap, keyStr) // Delete to avoid duplicates + } } } b.entries = uniqueEntries @@ -176,9 +193,15 @@ func (b *Builder) Finish(w io.Writer) (uint64, error) { } } + // Write sequence number + err := binary.Write(buffer, binary.LittleEndian, entry.SequenceNum) + if err != nil { + return 0, fmt.Errorf("failed to write sequence number: %w", err) + } + // Write value valueLen := uint32(len(entry.Value)) - err := binary.Write(buffer, binary.LittleEndian, valueLen) + err = binary.Write(buffer, binary.LittleEndian, valueLen) if err != nil { return 0, fmt.Errorf("failed to write value length: %w", err) } diff --git a/pkg/sstable/block/block_iterator.go b/pkg/sstable/block/block_iterator.go index a062875..befc239 100644 --- a/pkg/sstable/block/block_iterator.go +++ b/pkg/sstable/block/block_iterator.go @@ -7,13 +7,14 @@ import ( // Iterator allows iterating through key-value pairs in a block type Iterator struct { - reader *Reader - currentPos uint32 - currentKey []byte - currentVal []byte - restartIdx int - initialized bool - dataEnd uint32 // Position where the actual entries data ends (before restart points) + reader *Reader + currentPos uint32 + currentKey []byte + currentVal []byte + currentSeqNum uint64 // Sequence number of the current entry + restartIdx int + initialized bool + dataEnd uint32 // Position where the actual entries data ends (before restart points) } // SeekToFirst positions the iterator at the first entry @@ -199,6 +200,14 @@ func (it *Iterator) IsTombstone() bool { return it.Valid() && it.currentVal == nil } +// SequenceNumber returns the sequence number of the current entry +func (it *Iterator) SequenceNumber() uint64 { + if !it.Valid() { + return 0 + } + return it.currentSeqNum +} + // decodeCurrent decodes the entry at the current position func (it *Iterator) decodeCurrent() ([]byte, []byte, bool) { if it.currentPos >= it.dataEnd { @@ -221,6 +230,13 @@ func (it *Iterator) decodeCurrent() ([]byte, []byte, bool) { copy(key, data[:keyLen]) data = data[keyLen:] + // Read sequence number if format includes it (check if enough data for both seq num and value len) + seqNum := uint64(0) + if len(data) >= 12 { // 8 for seq num + 4 for value len + seqNum = binary.LittleEndian.Uint64(data) + data = data[8:] + } + // Read value if len(data) < 4 { return nil, nil, false @@ -238,6 +254,7 @@ func (it *Iterator) decodeCurrent() ([]byte, []byte, bool) { it.currentKey = key it.currentVal = value + it.currentSeqNum = seqNum return key, value, true } @@ -303,6 +320,14 @@ func (it *Iterator) decodeNext() ([]byte, []byte, bool) { it.currentPos += 4 + uint32(unsharedLen) } + // Read sequence number if format includes it (check if enough data for both seq num and value len) + seqNum := uint64(0) + if len(data) >= 12 { // 8 for seq num + 4 for value len + seqNum = binary.LittleEndian.Uint64(data) + data = data[8:] + it.currentPos += 8 + } + // Read value if len(data) < 4 { return nil, nil, false @@ -318,6 +343,7 @@ func (it *Iterator) decodeNext() ([]byte, []byte, bool) { value := make([]byte, valueLen) copy(value, data[:valueLen]) + it.currentSeqNum = seqNum it.currentPos += 4 + uint32(valueLen) return key, value, true diff --git a/pkg/sstable/block/types.go b/pkg/sstable/block/types.go index bde9304..548427e 100644 --- a/pkg/sstable/block/types.go +++ b/pkg/sstable/block/types.go @@ -2,8 +2,9 @@ package block // Entry represents a key-value pair within the block type Entry struct { - Key []byte - Value []byte + Key []byte + Value []byte + SequenceNum uint64 // Sequence number for versioning } const ( diff --git a/pkg/sstable/iterator.go b/pkg/sstable/iterator.go index 111c162..8612bda 100644 --- a/pkg/sstable/iterator.go +++ b/pkg/sstable/iterator.go @@ -209,6 +209,19 @@ func (it *Iterator) IsTombstone() bool { return it.dataBlockIter.Value() == nil } +// SequenceNumber returns the sequence number of the current entry +func (it *Iterator) SequenceNumber() uint64 { + it.mu.Lock() + defer it.mu.Unlock() + + // Not valid means sequence number 0 + if !it.initialized || it.dataBlockIter == nil || !it.dataBlockIter.Valid() { + return 0 + } + + return it.dataBlockIter.SequenceNumber() +} + // Error returns any error encountered during iteration func (it *Iterator) Error() error { it.mu.Lock() diff --git a/pkg/sstable/iterator_adapter.go b/pkg/sstable/iterator_adapter.go index 41add4b..a1ab57e 100644 --- a/pkg/sstable/iterator_adapter.go +++ b/pkg/sstable/iterator_adapter.go @@ -57,3 +57,11 @@ func (a *IteratorAdapter) Valid() bool { func (a *IteratorAdapter) IsTombstone() bool { return a.Valid() && a.iter.IsTombstone() } + +// SequenceNumber returns the sequence number of the current entry +func (a *IteratorAdapter) SequenceNumber() uint64 { + if !a.Valid() { + return 0 + } + return a.iter.SequenceNumber() +} diff --git a/pkg/sstable/writer.go b/pkg/sstable/writer.go index 04458db..fa28fc0 100644 --- a/pkg/sstable/writer.go +++ b/pkg/sstable/writer.go @@ -99,6 +99,11 @@ func (bm *BlockManager) Add(key, value []byte) error { return bm.builder.Add(key, value) } +// AddWithSequence adds a key-value pair with a sequence number to the current block +func (bm *BlockManager) AddWithSequence(key, value []byte, seqNum uint64) error { + return bm.builder.AddWithSequence(key, value, seqNum) +} + // EstimatedSize returns the estimated size of the current block func (bm *BlockManager) EstimatedSize() uint32 { return bm.builder.EstimatedSize() @@ -285,6 +290,12 @@ func NewWriterWithOptions(path string, options WriterOptions) (*Writer, error) { // Add adds a key-value pair to the SSTable // Keys must be added in sorted order func (w *Writer) Add(key, value []byte) error { + return w.AddWithSequence(key, value, 0) // Default to sequence number 0 for backward compatibility +} + +// AddWithSequence adds a key-value pair with a sequence number to the SSTable +// Keys must be added in sorted order +func (w *Writer) AddWithSequence(key, value []byte, seqNum uint64) error { // Keep track of first and last keys if w.entriesAdded == 0 { w.firstKey = append([]byte(nil), key...) @@ -296,8 +307,8 @@ func (w *Writer) Add(key, value []byte) error { w.currentBloomFilter.AddKey(key) } - // Add to block - if err := w.blockManager.Add(key, value); err != nil { + // Add to block with sequence number + if err := w.blockManager.AddWithSequence(key, value, seqNum); err != nil { return fmt.Errorf("failed to add to block: %w", err) } diff --git a/pkg/transaction/manager.go b/pkg/transaction/manager.go index 45916ce..80e417f 100644 --- a/pkg/transaction/manager.go +++ b/pkg/transaction/manager.go @@ -68,7 +68,7 @@ func (m *Manager) BeginTransaction(readOnly bool) (Transaction, error) { // Create a new transaction now := time.Now() - + // Set TTL based on transaction mode var ttl time.Duration if mode == ReadOnly { @@ -76,16 +76,16 @@ func (m *Manager) BeginTransaction(readOnly bool) (Transaction, error) { } else { ttl = m.readWriteTxTTL } - + tx := &TransactionImpl{ - storage: m.storage, - mode: mode, - buffer: NewBuffer(), - rwLock: &m.txLock, - stats: m, - creationTime: now, + storage: m.storage, + mode: mode, + buffer: NewBuffer(), + rwLock: &m.txLock, + stats: m, + creationTime: now, lastActiveTime: now, - ttl: ttl, + ttl: ttl, } // Set transaction as active diff --git a/pkg/transaction/registry.go b/pkg/transaction/registry.go index 8128bb6..48b4fca 100644 --- a/pkg/transaction/registry.go +++ b/pkg/transaction/registry.go @@ -6,34 +6,34 @@ import ( "reflect" "sync" "time" - + "github.com/KevoDB/kevo/pkg/common/log" ) // Registry manages transaction lifecycle and connections type RegistryImpl struct { - mu sync.RWMutex - transactions map[string]Transaction - nextID uint64 - cleanupTicker *time.Ticker - stopCleanup chan struct{} - connectionTxs map[string]map[string]struct{} - txTTL time.Duration - txWarningThreshold int + mu sync.RWMutex + transactions map[string]Transaction + nextID uint64 + cleanupTicker *time.Ticker + stopCleanup chan struct{} + connectionTxs map[string]map[string]struct{} + txTTL time.Duration + txWarningThreshold int txCriticalThreshold int - idleTxTTL time.Duration + idleTxTTL time.Duration } // NewRegistry creates a new transaction registry with default settings func NewRegistry() Registry { r := &RegistryImpl{ - transactions: make(map[string]Transaction), - connectionTxs: make(map[string]map[string]struct{}), - stopCleanup: make(chan struct{}), - txTTL: 5 * time.Minute, // Default TTL - idleTxTTL: 30 * time.Second, // Idle timeout - txWarningThreshold: 75, // 75% of TTL - txCriticalThreshold: 90, // 90% of TTL + transactions: make(map[string]Transaction), + connectionTxs: make(map[string]map[string]struct{}), + stopCleanup: make(chan struct{}), + txTTL: 5 * time.Minute, // Default TTL + idleTxTTL: 30 * time.Second, // Idle timeout + txWarningThreshold: 75, // 75% of TTL + txCriticalThreshold: 90, // 90% of TTL } // Start periodic cleanup @@ -46,12 +46,12 @@ func NewRegistry() Registry { // NewRegistryWithTTL creates a new transaction registry with a specific TTL func NewRegistryWithTTL(ttl time.Duration, idleTimeout time.Duration, warningThreshold, criticalThreshold int) Registry { r := &RegistryImpl{ - transactions: make(map[string]Transaction), - connectionTxs: make(map[string]map[string]struct{}), - stopCleanup: make(chan struct{}), - txTTL: ttl, - idleTxTTL: idleTimeout, - txWarningThreshold: warningThreshold, + transactions: make(map[string]Transaction), + connectionTxs: make(map[string]map[string]struct{}), + stopCleanup: make(chan struct{}), + txTTL: ttl, + idleTxTTL: idleTimeout, + txWarningThreshold: warningThreshold, txCriticalThreshold: criticalThreshold, } @@ -126,8 +126,8 @@ func (r *RegistryImpl) CleanupStaleTransactions() { if tx, exists := r.transactions[id]; exists { if txImpl, ok := tx.(*TransactionImpl); ok { fmt.Printf("WARNING: Transaction %s has been running for %s (%.1f%% of TTL)\n", - id, now.Sub(txImpl.creationTime).String(), - (float64(now.Sub(txImpl.creationTime)) / float64(txImpl.ttl)) * 100) + id, now.Sub(txImpl.creationTime).String(), + (float64(now.Sub(txImpl.creationTime))/float64(txImpl.ttl))*100) } } } @@ -137,8 +137,8 @@ func (r *RegistryImpl) CleanupStaleTransactions() { if tx, exists := r.transactions[id]; exists { if txImpl, ok := tx.(*TransactionImpl); ok { fmt.Printf("CRITICAL: Transaction %s has been running for %s (%.1f%% of TTL)\n", - id, now.Sub(txImpl.creationTime).String(), - (float64(now.Sub(txImpl.creationTime)) / float64(txImpl.ttl)) * 100) + id, now.Sub(txImpl.creationTime).String(), + (float64(now.Sub(txImpl.creationTime))/float64(txImpl.ttl))*100) } } } @@ -202,27 +202,27 @@ func (r *RegistryImpl) Begin(ctx context.Context, engine interface{}, readOnly b // Just directly try to get a transaction, without complex type checking // The only real requirement is that the engine has a BeginTransaction method // that returns a transaction that matches our Transaction interface - + // Get the method using reflection to avoid type compatibility issues val := reflect.ValueOf(engine) method := val.MethodByName("BeginTransaction") - + if !method.IsValid() { err = fmt.Errorf("engine does not have BeginTransaction method") return } - + // Call the method log.Debug("Calling BeginTransaction via reflection") args := []reflect.Value{reflect.ValueOf(readOnly)} results := method.Call(args) - + // Check for errors if !results[1].IsNil() { err = results[1].Interface().(error) return } - + // Get the transaction txVal := results[0].Interface() tx = txVal.(Transaction) @@ -397,4 +397,4 @@ func (r *RegistryImpl) GracefulShutdown(ctx context.Context) error { r.connectionTxs = make(map[string]map[string]struct{}) return lastErr -} \ No newline at end of file +} diff --git a/pkg/transaction/transaction.go b/pkg/transaction/transaction.go index 6bcddd8..01f7c80 100644 --- a/pkg/transaction/transaction.go +++ b/pkg/transaction/transaction.go @@ -62,7 +62,7 @@ func (tx *TransactionImpl) Get(key []byte) ([]byte, error) { if !tx.active.Load() { return nil, ErrTransactionClosed } - + // Update last active time tx.lastActiveTime = time.Now() @@ -89,7 +89,7 @@ func (tx *TransactionImpl) Put(key, value []byte) error { if !tx.active.Load() { return ErrTransactionClosed } - + // Update last active time tx.lastActiveTime = time.Now() @@ -113,7 +113,7 @@ func (tx *TransactionImpl) Delete(key []byte) error { if !tx.active.Load() { return ErrTransactionClosed } - + // Update last active time tx.lastActiveTime = time.Now() @@ -138,7 +138,7 @@ func (tx *TransactionImpl) NewIterator() iterator.Iterator { // Return an empty iterator return &emptyIterator{} } - + // Update last active time tx.lastActiveTime = time.Now() @@ -172,7 +172,7 @@ func (tx *TransactionImpl) NewRangeIterator(startKey, endKey []byte) iterator.It // Return an empty iterator return &emptyIterator{} } - + // Update last active time tx.lastActiveTime = time.Now() diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go index a38754a..8ca7ede 100644 --- a/pkg/transport/interface.go +++ b/pkg/transport/interface.go @@ -3,7 +3,7 @@ package transport import ( "context" "time" - + "google.golang.org/grpc/keepalive" ) @@ -28,14 +28,14 @@ type RetryPolicy struct { // TransportOptions contains common configuration across all transport types type TransportOptions struct { - Timeout time.Duration - RetryPolicy RetryPolicy - Compression CompressionType - MaxMessageSize int - TLSEnabled bool - CertFile string - KeyFile string - CAFile string + Timeout time.Duration + RetryPolicy RetryPolicy + Compression CompressionType + MaxMessageSize int + TLSEnabled bool + CertFile string + KeyFile string + CAFile string KeepaliveParams *keepalive.ClientParameters // Optional keepalive parameters for gRPC clients }