diff --git a/TODO.md b/TODO.md index 48f95e2..7cca314 100644 --- a/TODO.md +++ b/TODO.md @@ -133,11 +133,11 @@ This document outlines the implementation tasks for the Go Storage Engine, organ - [x] Add crash recovery for batches - [x] Design extensible interfaces for future transaction support -- [ ] Add basic statistics and metrics - - [ ] Implement counters for operations - - [ ] Add timing measurements for critical paths - - [ ] Create exportable metrics interface - - [ ] Test accuracy of metrics +- [x] Add basic statistics and metrics + - [x] Implement counters for operations + - [x] Add timing measurements for critical paths + - [x] Create exportable metrics interface + - [x] Test accuracy of metrics ## Phase G: Optimization and Benchmarking diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 5d23d1a..560b9b7 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -29,6 +29,39 @@ var ( ErrKeyNotFound = errors.New("key not found") ) +// EngineStats tracks statistics and metrics for the storage engine +type EngineStats struct { + // Operation counters + PutOps atomic.Uint64 + GetOps atomic.Uint64 + GetHits atomic.Uint64 + GetMisses atomic.Uint64 + DeleteOps atomic.Uint64 + + // Timing measurements + LastPutTime time.Time + LastGetTime time.Time + LastDeleteTime time.Time + + // Performance stats + FlushCount atomic.Uint64 + MemTableSize atomic.Uint64 + TotalBytesRead atomic.Uint64 + TotalBytesWritten atomic.Uint64 + + // Error tracking + ReadErrors atomic.Uint64 + WriteErrors atomic.Uint64 + + // Transaction stats + TxStarted atomic.Uint64 + TxCompleted atomic.Uint64 + TxAborted atomic.Uint64 + + // Mutex for accessing non-atomic fields + mu sync.RWMutex +} + // Engine implements the core storage engine functionality type Engine struct { // Configuration and paths @@ -56,6 +89,9 @@ type Engine struct { bgFlushCh chan struct{} closed atomic.Bool + // Statistics + stats EngineStats + // Concurrency control mu sync.RWMutex // Main lock for engine state flushMu sync.Mutex // Lock for flushing operations @@ -138,23 +174,39 @@ func (e *Engine) Put(key, value []byte) error { e.mu.Lock() defer e.mu.Unlock() + // Track operation and time + e.stats.PutOps.Add(1) + + e.stats.mu.Lock() + e.stats.LastPutTime = time.Now() + e.stats.mu.Unlock() + if e.closed.Load() { + e.stats.WriteErrors.Add(1) return ErrEngineClosed } // Append to WAL seqNum, err := e.wal.Append(wal.OpTypePut, key, value) if err != nil { + e.stats.WriteErrors.Add(1) return fmt.Errorf("failed to append to WAL: %w", err) } + // Track bytes written + e.stats.TotalBytesWritten.Add(uint64(len(key) + len(value))) + // Add to MemTable e.memTablePool.Put(key, value, seqNum) e.lastSeqNum = seqNum + + // Update memtable size estimate + e.stats.MemTableSize.Store(uint64(e.memTablePool.TotalSize())) // Check if MemTable needs to be flushed if e.memTablePool.IsFlushNeeded() { if err := e.scheduleFlush(); err != nil { + e.stats.WriteErrors.Add(1) return fmt.Errorf("failed to schedule flush: %w", err) } } @@ -204,17 +256,32 @@ func (e *Engine) Get(key []byte) ([]byte, error) { e.mu.RLock() defer e.mu.RUnlock() + // Track operation and time + e.stats.GetOps.Add(1) + + e.stats.mu.Lock() + e.stats.LastGetTime = time.Now() + e.stats.mu.Unlock() + if e.closed.Load() { + e.stats.ReadErrors.Add(1) return nil, ErrEngineClosed } + // Track bytes read (key only at this point) + e.stats.TotalBytesRead.Add(uint64(len(key))) + // Check the MemTablePool (active + immutables) if val, found := e.memTablePool.Get(key); found { // The key was found, but check if it's a deletion marker if val == nil { // This is a deletion marker - the key exists but was deleted + e.stats.GetMisses.Add(1) return nil, ErrKeyNotFound } + // Track bytes read (value part) + e.stats.TotalBytesRead.Add(uint64(len(val))) + e.stats.GetHits.Add(1) return val, nil } @@ -240,13 +307,18 @@ func (e *Engine) Get(key []byte) ([]byte, error) { // This should handle nil values that are tombstones if iter.IsTombstone() { // Found a tombstone, so this key is definitely deleted + e.stats.GetMisses.Add(1) return nil, ErrKeyNotFound } // Found a non-tombstone value for this key - return iter.Value(), nil + value := iter.Value() + e.stats.TotalBytesRead.Add(uint64(len(value))) + e.stats.GetHits.Add(1) + return value, nil } + e.stats.GetMisses.Add(1) return nil, ErrKeyNotFound } @@ -255,19 +327,34 @@ func (e *Engine) Delete(key []byte) error { e.mu.Lock() defer e.mu.Unlock() + // Track operation and time + e.stats.DeleteOps.Add(1) + + e.stats.mu.Lock() + e.stats.LastDeleteTime = time.Now() + e.stats.mu.Unlock() + if e.closed.Load() { + e.stats.WriteErrors.Add(1) return ErrEngineClosed } // Append to WAL seqNum, err := e.wal.Append(wal.OpTypeDelete, key, nil) if err != nil { + e.stats.WriteErrors.Add(1) return fmt.Errorf("failed to append to WAL: %w", err) } + // Track bytes written (just the key for deletes) + e.stats.TotalBytesWritten.Add(uint64(len(key))) + // Add deletion marker to MemTable e.memTablePool.Delete(key, seqNum) e.lastSeqNum = seqNum + + // Update memtable size estimate + e.stats.MemTableSize.Store(uint64(e.memTablePool.TotalSize())) // If compaction manager exists, also track this tombstone if e.compactionMgr != nil { @@ -284,6 +371,7 @@ func (e *Engine) Delete(key []byte) error { // Check if MemTable needs to be flushed if e.memTablePool.IsFlushNeeded() { if err := e.scheduleFlush(); err != nil { + e.stats.WriteErrors.Add(1) return fmt.Errorf("failed to schedule flush: %w", err) } } @@ -374,6 +462,7 @@ func (e *Engine) flushMemTable(mem *memtable.MemTable) error { // Ensure the SSTable directory exists err := os.MkdirAll(e.sstableDir, 0755) if err != nil { + e.stats.WriteErrors.Add(1) return fmt.Errorf("failed to create SSTable directory: %w", err) } @@ -386,19 +475,24 @@ func (e *Engine) flushMemTable(mem *memtable.MemTable) error { // Create a new SSTable writer writer, err := sstable.NewWriter(sstPath) if err != nil { + e.stats.WriteErrors.Add(1) return fmt.Errorf("failed to create SSTable writer: %w", err) } // Get an iterator over the MemTable iter := mem.NewIterator() count := 0 + var bytesWritten uint64 // Write all entries to the SSTable for iter.SeekToFirst(); iter.Valid(); iter.Next() { // Skip deletion markers, only add value entries if value := iter.Value(); value != nil { - if err := writer.Add(iter.Key(), value); err != nil { + key := iter.Key() + bytesWritten += uint64(len(key) + len(value)) + if err := writer.Add(key, value); err != nil { writer.Abort() + e.stats.WriteErrors.Add(1) return fmt.Errorf("failed to add entry to SSTable: %w", err) } count++ @@ -412,17 +506,26 @@ func (e *Engine) flushMemTable(mem *memtable.MemTable) error { // Finish writing the SSTable if err := writer.Finish(); err != nil { + e.stats.WriteErrors.Add(1) return fmt.Errorf("failed to finish SSTable: %w", err) } + // Track bytes written to SSTable + e.stats.TotalBytesWritten.Add(bytesWritten) + + // Track flush count + e.stats.FlushCount.Add(1) + // Verify the file was created if _, err := os.Stat(sstPath); os.IsNotExist(err) { + e.stats.WriteErrors.Add(1) return fmt.Errorf("SSTable file was not created at %s", sstPath) } // Open the new SSTable for reading reader, err := sstable.OpenReader(sstPath) if err != nil { + e.stats.ReadErrors.Add(1) return fmt.Errorf("failed to open SSTable: %w", err) } @@ -526,6 +629,67 @@ func (e *Engine) GetRWLock() *sync.RWMutex { return &e.txLock } +// Transaction interface for interactions with the engine package +type Transaction interface { + Get(key []byte) ([]byte, error) + Put(key, value []byte) error + Delete(key []byte) error + NewIterator() Iterator + NewRangeIterator(startKey, endKey []byte) Iterator + Commit() error + Rollback() error + IsReadOnly() bool +} + +// TransactionCreator is implemented by packages that can create transactions +type TransactionCreator interface { + CreateTransaction(engine interface{}, readOnly bool) (Transaction, error) +} + +// transactionCreatorFunc holds the function that creates transactions +var transactionCreatorFunc TransactionCreator + +// RegisterTransactionCreator registers a function that can create transactions +func RegisterTransactionCreator(creator TransactionCreator) { + transactionCreatorFunc = creator +} + +// BeginTransaction starts a new transaction with the given read-only flag +func (e *Engine) BeginTransaction(readOnly bool) (Transaction, error) { + // Verify engine is open + if e.closed.Load() { + return nil, ErrEngineClosed + } + + // Track transaction start + e.stats.TxStarted.Add(1) + + // Check if we have a transaction creator registered + if transactionCreatorFunc == nil { + e.stats.WriteErrors.Add(1) + return nil, fmt.Errorf("no transaction creator registered") + } + + // Create a new transaction + txn, err := transactionCreatorFunc.CreateTransaction(e, readOnly) + if err != nil { + e.stats.WriteErrors.Add(1) + return nil, err + } + + return txn, nil +} + +// IncrementTxCompleted increments the completed transaction counter +func (e *Engine) IncrementTxCompleted() { + e.stats.TxCompleted.Add(1) +} + +// IncrementTxAborted increments the aborted transaction counter +func (e *Engine) IncrementTxAborted() { + e.stats.TxAborted.Add(1) +} + // ApplyBatch atomically applies a batch of operations func (e *Engine) ApplyBatch(entries []*wal.Entry) error { e.mu.Lock() @@ -597,6 +761,55 @@ func (e *Engine) GetRangeIterator(startKey, endKey []byte) (Iterator, error) { return iter, nil } +// GetStats returns the current statistics for the engine +func (e *Engine) GetStats() map[string]interface{} { + stats := make(map[string]interface{}) + + // Add operation counters + stats["put_ops"] = e.stats.PutOps.Load() + stats["get_ops"] = e.stats.GetOps.Load() + stats["get_hits"] = e.stats.GetHits.Load() + stats["get_misses"] = e.stats.GetMisses.Load() + stats["delete_ops"] = e.stats.DeleteOps.Load() + + // Add transaction statistics + stats["tx_started"] = e.stats.TxStarted.Load() + stats["tx_completed"] = e.stats.TxCompleted.Load() + stats["tx_aborted"] = e.stats.TxAborted.Load() + + // Add performance metrics + stats["flush_count"] = e.stats.FlushCount.Load() + stats["memtable_size"] = e.stats.MemTableSize.Load() + stats["total_bytes_read"] = e.stats.TotalBytesRead.Load() + stats["total_bytes_written"] = e.stats.TotalBytesWritten.Load() + + // Add error statistics + stats["read_errors"] = e.stats.ReadErrors.Load() + stats["write_errors"] = e.stats.WriteErrors.Load() + + // Add timing information + e.stats.mu.RLock() + defer e.stats.mu.RUnlock() + + stats["last_put_time"] = e.stats.LastPutTime.UnixNano() + stats["last_get_time"] = e.stats.LastGetTime.UnixNano() + stats["last_delete_time"] = e.stats.LastDeleteTime.UnixNano() + + // Add data store statistics + stats["sstable_count"] = len(e.sstables) + stats["immutable_memtable_count"] = len(e.immutableMTs) + + // Add compaction statistics if available + if e.compactionMgr != nil { + compactionStats := e.compactionMgr.GetCompactionStats() + for k, v := range compactionStats { + stats["compaction_"+k] = v + } + } + + return stats +} + // Close closes the storage engine func (e *Engine) Close() error { // First set the closed flag - use atomic operation to prevent race conditions diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go index ae10304..2c61205 100644 --- a/pkg/engine/engine_test.go +++ b/pkg/engine/engine_test.go @@ -327,4 +327,100 @@ func TestEngine_Reload(t *testing.T) { t.Errorf("Got incorrect value for key %s. Expected: %s, Got: %s", data.key, data.value, string(value)) } } +} + +func TestEngine_Statistics(t *testing.T) { + _, engine, cleanup := setupTest(t) + defer cleanup() + + // 1. Test Put operation stats + err := engine.Put([]byte("key1"), []byte("value1")) + if err != nil { + t.Fatalf("Failed to put key-value: %v", err) + } + + stats := engine.GetStats() + if stats["put_ops"] != uint64(1) { + t.Errorf("Expected 1 put operation, got: %v", stats["put_ops"]) + } + if stats["memtable_size"].(uint64) == 0 { + t.Errorf("Expected non-zero memtable size, got: %v", stats["memtable_size"]) + } + if stats["get_ops"] != uint64(0) { + t.Errorf("Expected 0 get operations, got: %v", stats["get_ops"]) + } + + // 2. Test Get operation stats + val, err := engine.Get([]byte("key1")) + if err != nil { + t.Fatalf("Failed to get key: %v", err) + } + if !bytes.Equal(val, []byte("value1")) { + t.Errorf("Got incorrect value. Expected: %s, Got: %s", "value1", string(val)) + } + + _, err = engine.Get([]byte("nonexistent")) + if err != ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound for non-existent key, got: %v", err) + } + + stats = engine.GetStats() + if stats["get_ops"] != uint64(2) { + t.Errorf("Expected 2 get operations, got: %v", stats["get_ops"]) + } + if stats["get_hits"] != uint64(1) { + t.Errorf("Expected 1 get hit, got: %v", stats["get_hits"]) + } + if stats["get_misses"] != uint64(1) { + t.Errorf("Expected 1 get miss, got: %v", stats["get_misses"]) + } + + // 3. Test Delete operation stats + err = engine.Delete([]byte("key1")) + if err != nil { + t.Fatalf("Failed to delete key: %v", err) + } + + stats = engine.GetStats() + if stats["delete_ops"] != uint64(1) { + t.Errorf("Expected 1 delete operation, got: %v", stats["delete_ops"]) + } + + // 4. Verify key is deleted + _, err = engine.Get([]byte("key1")) + if err != ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound after delete, got: %v", err) + } + + stats = engine.GetStats() + if stats["get_ops"] != uint64(3) { + t.Errorf("Expected 3 get operations, got: %v", stats["get_ops"]) + } + if stats["get_misses"] != uint64(2) { + t.Errorf("Expected 2 get misses, got: %v", stats["get_misses"]) + } + + // 5. Test flush stats + for i := 0; i < 10; i++ { + key := []byte(fmt.Sprintf("bulk-key-%d", i)) + value := []byte(fmt.Sprintf("bulk-value-%d", i)) + if err := engine.Put(key, value); err != nil { + t.Fatalf("Failed to put bulk data: %v", err) + } + } + + // Force a flush + if engine.memTablePool.IsFlushNeeded() { + engine.FlushImMemTables() + } else { + tables := engine.memTablePool.GetMemTables() + if len(tables) > 0 { + engine.flushMemTable(tables[0]) + } + } + + stats = engine.GetStats() + if stats["flush_count"].(uint64) == 0 { + t.Errorf("Expected at least 1 flush, got: %v", stats["flush_count"]) + } } \ No newline at end of file diff --git a/pkg/transaction/creator.go b/pkg/transaction/creator.go new file mode 100644 index 0000000..a8ae378 --- /dev/null +++ b/pkg/transaction/creator.go @@ -0,0 +1,33 @@ +package transaction + +import ( + "git.canoozie.net/jer/go-storage/pkg/engine" +) + +// TransactionCreatorImpl implements the engine.TransactionCreator interface +type TransactionCreatorImpl struct{} + +// CreateTransaction creates a new transaction +func (tc *TransactionCreatorImpl) CreateTransaction(e interface{}, readOnly bool) (engine.Transaction, error) { + // Convert the interface to the engine.Engine type + eng, ok := e.(*engine.Engine) + if !ok { + return nil, ErrInvalidEngine + } + + // Determine transaction mode + var mode TransactionMode + if readOnly { + mode = ReadOnly + } else { + mode = ReadWrite + } + + // Create a new transaction + return NewTransaction(eng, mode) +} + +// Register the transaction creator with the engine +func init() { + engine.RegisterTransactionCreator(&TransactionCreatorImpl{}) +} \ No newline at end of file diff --git a/pkg/transaction/tx_impl.go b/pkg/transaction/tx_impl.go index 1ad7028..b34dc5d 100644 --- a/pkg/transaction/tx_impl.go +++ b/pkg/transaction/tx_impl.go @@ -15,6 +15,7 @@ import ( var ( ErrReadOnlyTransaction = errors.New("cannot write to a read-only transaction") ErrTransactionClosed = errors.New("transaction already committed or rolled back") + ErrInvalidEngine = errors.New("invalid engine type") ) // EngineTransaction implements a SQLite-inspired transaction using reader-writer locks @@ -476,6 +477,9 @@ func (tx *EngineTransaction) Commit() error { // For read-only transactions, just release the read lock if tx.mode == ReadOnly { tx.releaseReadLock() + + // Track transaction completion + tx.engine.IncrementTxCompleted() return nil } @@ -515,6 +519,9 @@ func (tx *EngineTransaction) Commit() error { tx.writeLock = nil } + // Track transaction completion + tx.engine.IncrementTxCompleted() + return err } @@ -539,6 +546,9 @@ func (tx *EngineTransaction) Rollback() error { } } + // Track transaction abort in engine stats + tx.engine.IncrementTxAborted() + return nil } diff --git a/pkg/transaction/tx_test.go b/pkg/transaction/tx_test.go new file mode 100644 index 0000000..20ee982 --- /dev/null +++ b/pkg/transaction/tx_test.go @@ -0,0 +1,182 @@ +package transaction + +import ( + "bytes" + "os" + "testing" + + "git.canoozie.net/jer/go-storage/pkg/engine" +) + +func setupTest(t *testing.T) (*engine.Engine, func()) { + // Create a temporary directory for the test + dir, err := os.MkdirTemp("", "transaction-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + + // Create the engine + e, err := engine.NewEngine(dir) + if err != nil { + os.RemoveAll(dir) + t.Fatalf("Failed to create engine: %v", err) + } + + // Return cleanup function + cleanup := func() { + e.Close() + os.RemoveAll(dir) + } + + return e, cleanup +} + +func TestTransaction_BasicOperations(t *testing.T) { + e, cleanup := setupTest(t) + defer cleanup() + + // Get transaction statistics before starting + stats := e.GetStats() + txStarted := stats["tx_started"].(uint64) + + // Begin a read-write transaction + tx, err := e.BeginTransaction(false) + if err != nil { + t.Fatalf("Failed to begin transaction: %v", err) + } + + // Verify transaction started count increased + stats = e.GetStats() + if stats["tx_started"].(uint64) != txStarted+1 { + t.Errorf("Expected tx_started to be %d, got: %d", txStarted+1, stats["tx_started"].(uint64)) + } + + // Put a value in the transaction + err = tx.Put([]byte("tx-key1"), []byte("tx-value1")) + if err != nil { + t.Fatalf("Failed to put value in transaction: %v", err) + } + + // Get the value from the transaction + val, err := tx.Get([]byte("tx-key1")) + if err != nil { + t.Fatalf("Failed to get value from transaction: %v", err) + } + if !bytes.Equal(val, []byte("tx-value1")) { + t.Errorf("Expected value 'tx-value1', got: %s", string(val)) + } + + // Commit the transaction + if err := tx.Commit(); err != nil { + t.Fatalf("Failed to commit transaction: %v", err) + } + + // Verify transaction completed count increased + stats = e.GetStats() + if stats["tx_completed"].(uint64) != 1 { + t.Errorf("Expected tx_completed to be 1, got: %d", stats["tx_completed"].(uint64)) + } + if stats["tx_aborted"].(uint64) != 0 { + t.Errorf("Expected tx_aborted to be 0, got: %d", stats["tx_aborted"].(uint64)) + } + + // Verify the value is accessible from the engine + val, err = e.Get([]byte("tx-key1")) + if err != nil { + t.Fatalf("Failed to get value from engine: %v", err) + } + if !bytes.Equal(val, []byte("tx-value1")) { + t.Errorf("Expected value 'tx-value1', got: %s", string(val)) + } +} + +func TestTransaction_Rollback(t *testing.T) { + e, cleanup := setupTest(t) + defer cleanup() + + // Begin a read-write transaction + tx, err := e.BeginTransaction(false) + if err != nil { + t.Fatalf("Failed to begin transaction: %v", err) + } + + // Put a value in the transaction + err = tx.Put([]byte("tx-key2"), []byte("tx-value2")) + if err != nil { + t.Fatalf("Failed to put value in transaction: %v", err) + } + + // Get the value from the transaction + val, err := tx.Get([]byte("tx-key2")) + if err != nil { + t.Fatalf("Failed to get value from transaction: %v", err) + } + if !bytes.Equal(val, []byte("tx-value2")) { + t.Errorf("Expected value 'tx-value2', got: %s", string(val)) + } + + // Rollback the transaction + if err := tx.Rollback(); err != nil { + t.Fatalf("Failed to rollback transaction: %v", err) + } + + // Verify transaction aborted count increased + stats := e.GetStats() + if stats["tx_completed"].(uint64) != 0 { + t.Errorf("Expected tx_completed to be 0, got: %d", stats["tx_completed"].(uint64)) + } + if stats["tx_aborted"].(uint64) != 1 { + t.Errorf("Expected tx_aborted to be 1, got: %d", stats["tx_aborted"].(uint64)) + } + + // Verify the value is not accessible from the engine + _, err = e.Get([]byte("tx-key2")) + if err != engine.ErrKeyNotFound { + t.Errorf("Expected ErrKeyNotFound, got: %v", err) + } +} + +func TestTransaction_ReadOnly(t *testing.T) { + e, cleanup := setupTest(t) + defer cleanup() + + // Add some data to the engine + if err := e.Put([]byte("key-ro"), []byte("value-ro")); err != nil { + t.Fatalf("Failed to put value in engine: %v", err) + } + + // Begin a read-only transaction + tx, err := e.BeginTransaction(true) + if err != nil { + t.Fatalf("Failed to begin transaction: %v", err) + } + if !tx.IsReadOnly() { + t.Errorf("Expected transaction to be read-only") + } + + // Read the value + val, err := tx.Get([]byte("key-ro")) + if err != nil { + t.Fatalf("Failed to get value from transaction: %v", err) + } + if !bytes.Equal(val, []byte("value-ro")) { + t.Errorf("Expected value 'value-ro', got: %s", string(val)) + } + + // Attempt to write (should fail) + err = tx.Put([]byte("new-key"), []byte("new-value")) + if err == nil { + t.Errorf("Expected error when putting value in read-only transaction") + } + + // Commit the transaction + if err := tx.Commit(); err != nil { + t.Fatalf("Failed to commit transaction: %v", err) + } + + // Verify transaction completed count increased + stats := e.GetStats() + if stats["tx_completed"].(uint64) != 1 { + t.Errorf("Expected tx_completed to be 1, got: %d", stats["tx_completed"].(uint64)) + } +} \ No newline at end of file