feat: add support for TTL tracking to transactions, unified tx buffers
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Has been cancelled

This commit is contained in:
Jeremy Tregunna 2025-05-02 22:31:05 -06:00
parent 047e41e0b1
commit eefc915f3a
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
6 changed files with 193 additions and 258 deletions

View File

@ -57,6 +57,14 @@ type Config struct {
CompactionInterval int64 `json:"compaction_interval"`
MaxLevelWithTombstones int `json:"max_level_with_tombstones"` // Levels higher than this discard tombstones
// Transaction configuration
ReadOnlyTxTTL int64 `json:"read_only_tx_ttl"` // Time-to-live for read-only transactions in seconds (default: 180s)
ReadWriteTxTTL int64 `json:"read_write_tx_ttl"` // Time-to-live for read-write transactions in seconds (default: 60s)
IdleTxTimeout int64 `json:"idle_tx_timeout"` // Time after which an inactive transaction is considered idle in seconds (default: 30s)
TxCleanupInterval int64 `json:"tx_cleanup_interval"` // Interval for checking transaction TTLs in seconds (default: 30s)
TxWarningThreshold int `json:"tx_warning_threshold"` // Percentage of TTL after which to log warnings (default: 75)
TxCriticalThreshold int `json:"tx_critical_threshold"` // Percentage of TTL after which to log critical warnings (default: 90)
mu sync.RWMutex
}
@ -92,6 +100,14 @@ func NewDefaultConfig(dbPath string) *Config {
CompactionThreads: 2,
CompactionInterval: 30, // 30 seconds
MaxLevelWithTombstones: 1, // Keep tombstones in levels 0 and 1
// Transaction defaults
ReadOnlyTxTTL: 180, // 3 minutes
ReadWriteTxTTL: 60, // 1 minute
IdleTxTimeout: 30, // 30 seconds
TxCleanupInterval: 30, // 30 seconds
TxWarningThreshold: 75, // 75% of TTL
TxCriticalThreshold: 90, // 90% of TTL
}
}
@ -136,6 +152,31 @@ func (c *Config) Validate() error {
return fmt.Errorf("%w: Compaction ratio must be greater than 1.0", ErrInvalidConfig)
}
// Validate Transaction settings
if c.ReadOnlyTxTTL <= 0 {
return fmt.Errorf("%w: Read-only transaction TTL must be positive", ErrInvalidConfig)
}
if c.ReadWriteTxTTL <= 0 {
return fmt.Errorf("%w: Read-write transaction TTL must be positive", ErrInvalidConfig)
}
if c.IdleTxTimeout <= 0 {
return fmt.Errorf("%w: Idle transaction timeout must be positive", ErrInvalidConfig)
}
if c.TxCleanupInterval <= 0 {
return fmt.Errorf("%w: Transaction cleanup interval must be positive", ErrInvalidConfig)
}
if c.TxWarningThreshold <= 0 || c.TxWarningThreshold >= 100 {
return fmt.Errorf("%w: Transaction warning threshold must be between 1 and 99", ErrInvalidConfig)
}
if c.TxCriticalThreshold <= c.TxWarningThreshold || c.TxCriticalThreshold >= 100 {
return fmt.Errorf("%w: Transaction critical threshold must be between warning threshold and 99", ErrInvalidConfig)
}
return nil
}

View File

@ -1,223 +0,0 @@
package transaction
import (
"bytes"
"encoding/base64"
"sort"
"sync"
)
// Operation represents a single operation in the transaction buffer
type Operation struct {
Key []byte
Value []byte
IsDelete bool
}
// Buffer stores pending changes for a transaction
type Buffer struct {
operations map[string]*Operation // Key string -> Operation (using base64 encoding for binary safety)
mu sync.RWMutex
}
// NewBuffer creates a new transaction buffer
func NewBuffer() *Buffer {
return &Buffer{
operations: make(map[string]*Operation),
}
}
// Put adds or updates a key-value pair in the buffer
func (b *Buffer) Put(key, value []byte) {
b.mu.Lock()
defer b.mu.Unlock()
// Copy the key and value to avoid external modification
keyCopy := make([]byte, len(key))
valueCopy := make([]byte, len(value))
copy(keyCopy, key)
copy(valueCopy, value)
// Create or update the operation - use base64 encoding for map key to avoid binary encoding issues
b.operations[base64.StdEncoding.EncodeToString(key)] = &Operation{
Key: keyCopy,
Value: valueCopy,
IsDelete: false,
}
}
// Delete marks a key for deletion in the buffer
func (b *Buffer) Delete(key []byte) {
b.mu.Lock()
defer b.mu.Unlock()
// Copy the key to avoid external modification
keyCopy := make([]byte, len(key))
copy(keyCopy, key)
// Create or update the operation - use base64 encoding for map key to avoid binary encoding issues
b.operations[base64.StdEncoding.EncodeToString(key)] = &Operation{
Key: keyCopy,
Value: nil,
IsDelete: true,
}
}
// Get retrieves a value for the given key from the buffer
// Returns the value and a boolean indicating if the key was found
func (b *Buffer) Get(key []byte) ([]byte, bool) {
b.mu.RLock()
defer b.mu.RUnlock()
encodedKey := base64.StdEncoding.EncodeToString(key)
op, ok := b.operations[encodedKey]
if !ok {
return nil, false
}
// If this is a deletion marker, return nil
if op.IsDelete {
return nil, true
}
// Return a copy of the value to prevent modification
valueCopy := make([]byte, len(op.Value))
copy(valueCopy, op.Value)
return valueCopy, true
}
// Clear removes all operations from the buffer
func (b *Buffer) Clear() {
b.mu.Lock()
defer b.mu.Unlock()
// Create a new operations map
b.operations = make(map[string]*Operation)
}
// Size returns the number of operations in the buffer
func (b *Buffer) Size() int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.operations)
}
// Operations returns a sorted list of operations
// This is used for applying the changes in order
func (b *Buffer) Operations() []*Operation {
b.mu.RLock()
defer b.mu.RUnlock()
// Create a list of operations
ops := make([]*Operation, 0, len(b.operations))
for _, op := range b.operations {
ops = append(ops, op)
}
// Sort by key for consistent application order
sort.Slice(ops, func(i, j int) bool {
return bytes.Compare(ops[i].Key, ops[j].Key) < 0
})
return ops
}
// Iterator returns a new iterator over the buffer
func (b *Buffer) NewIterator() *BufferIterator {
// Get all operations
ops := b.Operations()
return &BufferIterator{
operations: ops,
position: -1,
}
}
// BufferIterator is an iterator over the transaction buffer
type BufferIterator struct {
operations []*Operation
position int
}
// SeekToFirst positions the iterator at the first key
func (it *BufferIterator) SeekToFirst() {
if len(it.operations) > 0 {
it.position = 0
} else {
it.position = -1
}
}
// SeekToLast positions the iterator at the last key
func (it *BufferIterator) SeekToLast() {
if len(it.operations) > 0 {
it.position = len(it.operations) - 1
} else {
it.position = -1
}
}
// Seek positions the iterator at the first key >= target
func (it *BufferIterator) Seek(target []byte) bool {
if len(it.operations) == 0 {
return false
}
// Binary search to find the first key >= target
i := sort.Search(len(it.operations), func(i int) bool {
return bytes.Compare(it.operations[i].Key, target) >= 0
})
if i >= len(it.operations) {
it.position = -1
return false
}
it.position = i
return true
}
// Next advances to the next key
func (it *BufferIterator) Next() bool {
if it.position < 0 || it.position >= len(it.operations)-1 {
it.position = -1
return false
}
it.position++
return true
}
// Key returns the current key
func (it *BufferIterator) Key() []byte {
if it.position < 0 || it.position >= len(it.operations) {
return nil
}
return it.operations[it.position].Key
}
// Value returns the current value
func (it *BufferIterator) Value() []byte {
if it.position < 0 || it.position >= len(it.operations) {
return nil
}
return it.operations[it.position].Value
}
// Valid returns true if the iterator is valid
func (it *BufferIterator) Valid() bool {
return it.position >= 0 && it.position < len(it.operations)
}
// IsTombstone returns true if the current entry is a deletion marker
func (it *BufferIterator) IsTombstone() bool {
if it.position < 0 || it.position >= len(it.operations) {
return false
}
return it.operations[it.position].IsDelete
}

View File

@ -10,6 +10,7 @@ import (
"github.com/KevoDB/kevo/pkg/common/iterator/composite"
"github.com/KevoDB/kevo/pkg/engine/interfaces"
engineIterator "github.com/KevoDB/kevo/pkg/engine/iterator"
tx "github.com/KevoDB/kevo/pkg/transaction"
"github.com/KevoDB/kevo/pkg/wal"
)
@ -32,7 +33,7 @@ type Transaction struct {
readOnly bool
// Buffer for transaction operations
buffer *Buffer
buffer *tx.Buffer
// Transaction state
active atomic.Bool
@ -56,7 +57,7 @@ func NewTransaction(manager interfaces.TransactionManager, storage interfaces.St
manager: manager,
storage: storage,
readOnly: readOnly,
buffer: NewBuffer(),
buffer: tx.NewBuffer(),
iterFactory: engineIterator.NewFactory(),
startTime: time.Now(),
}

View File

@ -3,6 +3,7 @@ package transaction
import (
"sync"
"sync/atomic"
"time"
"github.com/KevoDB/kevo/pkg/stats"
)
@ -22,13 +23,32 @@ type Manager struct {
txStarted atomic.Uint64
txCompleted atomic.Uint64
txAborted atomic.Uint64
// TTL settings
readOnlyTxTTL time.Duration
readWriteTxTTL time.Duration
idleTxTimeout time.Duration
}
// NewManager creates a new transaction manager
// NewManager creates a new transaction manager with default TTL settings
func NewManager(storage StorageBackend, stats stats.Collector) *Manager {
return &Manager{
storage: storage,
stats: stats,
storage: storage,
stats: stats,
readOnlyTxTTL: 3 * time.Minute, // 3 minutes
readWriteTxTTL: 1 * time.Minute, // 1 minute
idleTxTimeout: 30 * time.Second, // 30 seconds
}
}
// NewManagerWithTTL creates a new transaction manager with custom TTL settings
func NewManagerWithTTL(storage StorageBackend, stats stats.Collector, readOnlyTTL, readWriteTTL, idleTimeout time.Duration) *Manager {
return &Manager{
storage: storage,
stats: stats,
readOnlyTxTTL: readOnlyTTL,
readWriteTxTTL: readWriteTTL,
idleTxTimeout: idleTimeout,
}
}
@ -47,12 +67,25 @@ func (m *Manager) BeginTransaction(readOnly bool) (Transaction, error) {
}
// Create a new transaction
now := time.Now()
// Set TTL based on transaction mode
var ttl time.Duration
if mode == ReadOnly {
ttl = m.readOnlyTxTTL
} else {
ttl = m.readWriteTxTTL
}
tx := &TransactionImpl{
storage: m.storage,
mode: mode,
buffer: NewBuffer(),
rwLock: &m.txLock,
stats: m,
storage: m.storage,
mode: mode,
buffer: NewBuffer(),
rwLock: &m.txLock,
stats: m,
creationTime: now,
lastActiveTime: now,
ttl: ttl,
}
// Set transaction as active

View File

@ -9,22 +9,28 @@ import (
// Registry manages transaction lifecycle and connections
type RegistryImpl struct {
mu sync.RWMutex
transactions map[string]Transaction
nextID uint64
cleanupTicker *time.Ticker
stopCleanup chan struct{}
connectionTxs map[string]map[string]struct{}
txTTL time.Duration
mu sync.RWMutex
transactions map[string]Transaction
nextID uint64
cleanupTicker *time.Ticker
stopCleanup chan struct{}
connectionTxs map[string]map[string]struct{}
txTTL time.Duration
txWarningThreshold int
txCriticalThreshold int
idleTxTTL time.Duration
}
// NewRegistry creates a new transaction registry
// NewRegistry creates a new transaction registry with default settings
func NewRegistry() Registry {
r := &RegistryImpl{
transactions: make(map[string]Transaction),
connectionTxs: make(map[string]map[string]struct{}),
stopCleanup: make(chan struct{}),
txTTL: 5 * time.Minute, // Default TTL
transactions: make(map[string]Transaction),
connectionTxs: make(map[string]map[string]struct{}),
stopCleanup: make(chan struct{}),
txTTL: 5 * time.Minute, // Default TTL
idleTxTTL: 30 * time.Second, // Idle timeout
txWarningThreshold: 75, // 75% of TTL
txCriticalThreshold: 90, // 90% of TTL
}
// Start periodic cleanup
@ -35,12 +41,15 @@ func NewRegistry() Registry {
}
// NewRegistryWithTTL creates a new transaction registry with a specific TTL
func NewRegistryWithTTL(ttl time.Duration) Registry {
func NewRegistryWithTTL(ttl time.Duration, idleTimeout time.Duration, warningThreshold, criticalThreshold int) Registry {
r := &RegistryImpl{
transactions: make(map[string]Transaction),
connectionTxs: make(map[string]map[string]struct{}),
stopCleanup: make(chan struct{}),
txTTL: ttl,
transactions: make(map[string]Transaction),
connectionTxs: make(map[string]map[string]struct{}),
stopCleanup: make(chan struct{}),
txTTL: ttl,
idleTxTTL: idleTimeout,
txWarningThreshold: warningThreshold,
txCriticalThreshold: criticalThreshold,
}
// Start periodic cleanup
@ -68,18 +77,71 @@ func (r *RegistryImpl) cleanupStaleTransactions() {
r.mu.Lock()
defer r.mu.Unlock()
// Use the configured TTL (TODO: Add TTL tracking)
// Find stale transactions
now := time.Now()
var staleIDs []string
for id := range r.transactions {
// For simplicity, we don't check the creation time for now
// A more sophisticated implementation would track last activity time
staleIDs = append(staleIDs, id)
var warningIDs []string
var criticalIDs []string
// Find stale, warning, and critical transactions
for id, tx := range r.transactions {
// Skip if not a TransactionImpl
txImpl, ok := tx.(*TransactionImpl)
if !ok {
continue
}
// Check transaction age
txAge := now.Sub(txImpl.creationTime)
if txAge > txImpl.ttl {
staleIDs = append(staleIDs, id)
continue
}
// Check idle time
idleTime := now.Sub(txImpl.lastActiveTime)
if idleTime > r.idleTxTTL {
staleIDs = append(staleIDs, id)
continue
}
// Check warning threshold
warningThresholdDuration := time.Duration(float64(txImpl.ttl) * (float64(r.txWarningThreshold) / 100.0))
if txAge > warningThresholdDuration && txAge <= txImpl.ttl {
warningIDs = append(warningIDs, id)
}
// Check critical threshold
criticalThresholdDuration := time.Duration(float64(txImpl.ttl) * (float64(r.txCriticalThreshold) / 100.0))
if txAge > criticalThresholdDuration && txAge <= txImpl.ttl {
criticalIDs = append(criticalIDs, id)
}
}
// Log warnings
for _, id := range warningIDs {
if tx, exists := r.transactions[id]; exists {
if txImpl, ok := tx.(*TransactionImpl); ok {
fmt.Printf("WARNING: Transaction %s has been running for %s (%.1f%% of TTL)\n",
id, now.Sub(txImpl.creationTime).String(),
(float64(now.Sub(txImpl.creationTime)) / float64(txImpl.ttl)) * 100)
}
}
}
// Log critical warnings
for _, id := range criticalIDs {
if tx, exists := r.transactions[id]; exists {
if txImpl, ok := tx.(*TransactionImpl); ok {
fmt.Printf("CRITICAL: Transaction %s has been running for %s (%.1f%% of TTL)\n",
id, now.Sub(txImpl.creationTime).String(),
(float64(now.Sub(txImpl.creationTime)) / float64(txImpl.ttl)) * 100)
}
}
}
// Log stale transactions
if len(staleIDs) > 0 {
fmt.Printf("Cleaning up %d potentially stale transactions\n", len(staleIDs))
fmt.Printf("Cleaning up %d stale transactions\n", len(staleIDs))
}
// Clean up stale transactions

View File

@ -3,6 +3,7 @@ package transaction
import (
"sync"
"sync/atomic"
"time"
"github.com/KevoDB/kevo/pkg/common/iterator"
"github.com/KevoDB/kevo/pkg/common/iterator/bounded"
@ -38,6 +39,11 @@ type TransactionImpl struct {
// Stats collector
stats StatsCollector
// TTL tracking
creationTime time.Time
lastActiveTime time.Time
ttl time.Duration
}
// StatsCollector defines the interface for collecting transaction statistics
@ -56,6 +62,9 @@ func (tx *TransactionImpl) Get(key []byte) ([]byte, error) {
if !tx.active.Load() {
return nil, ErrTransactionClosed
}
// Update last active time
tx.lastActiveTime = time.Now()
// First check the transaction buffer for any pending changes
if val, found := tx.buffer.Get(key); found {
@ -80,6 +89,9 @@ func (tx *TransactionImpl) Put(key, value []byte) error {
if !tx.active.Load() {
return ErrTransactionClosed
}
// Update last active time
tx.lastActiveTime = time.Now()
// Check if transaction is read-only
if tx.mode == ReadOnly {
@ -101,6 +113,9 @@ func (tx *TransactionImpl) Delete(key []byte) error {
if !tx.active.Load() {
return ErrTransactionClosed
}
// Update last active time
tx.lastActiveTime = time.Now()
// Check if transaction is read-only
if tx.mode == ReadOnly {
@ -123,6 +138,9 @@ func (tx *TransactionImpl) NewIterator() iterator.Iterator {
// Return an empty iterator
return &emptyIterator{}
}
// Update last active time
tx.lastActiveTime = time.Now()
// Get the storage iterator
storageIter, err := tx.storage.GetIterator()
@ -154,6 +172,9 @@ func (tx *TransactionImpl) NewRangeIterator(startKey, endKey []byte) iterator.It
// Return an empty iterator
return &emptyIterator{}
}
// Update last active time
tx.lastActiveTime = time.Now()
// Get the storage iterator for the range
storageIter, err := tx.storage.GetRangeIterator(startKey, endKey)