feat: implement comprehensive storage statistics and metrics

This commit is contained in:
Jeremy Tregunna 2025-04-20 00:28:21 -06:00
parent 72007886f7
commit 5c70c128c7
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
6 changed files with 541 additions and 7 deletions

10
TODO.md
View File

@ -133,11 +133,11 @@ This document outlines the implementation tasks for the Go Storage Engine, organ
- [x] Add crash recovery for batches
- [x] Design extensible interfaces for future transaction support
- [ ] Add basic statistics and metrics
- [ ] Implement counters for operations
- [ ] Add timing measurements for critical paths
- [ ] Create exportable metrics interface
- [ ] Test accuracy of metrics
- [x] Add basic statistics and metrics
- [x] Implement counters for operations
- [x] Add timing measurements for critical paths
- [x] Create exportable metrics interface
- [x] Test accuracy of metrics
## Phase G: Optimization and Benchmarking

View File

@ -29,6 +29,39 @@ var (
ErrKeyNotFound = errors.New("key not found")
)
// EngineStats tracks statistics and metrics for the storage engine
type EngineStats struct {
// Operation counters
PutOps atomic.Uint64
GetOps atomic.Uint64
GetHits atomic.Uint64
GetMisses atomic.Uint64
DeleteOps atomic.Uint64
// Timing measurements
LastPutTime time.Time
LastGetTime time.Time
LastDeleteTime time.Time
// Performance stats
FlushCount atomic.Uint64
MemTableSize atomic.Uint64
TotalBytesRead atomic.Uint64
TotalBytesWritten atomic.Uint64
// Error tracking
ReadErrors atomic.Uint64
WriteErrors atomic.Uint64
// Transaction stats
TxStarted atomic.Uint64
TxCompleted atomic.Uint64
TxAborted atomic.Uint64
// Mutex for accessing non-atomic fields
mu sync.RWMutex
}
// Engine implements the core storage engine functionality
type Engine struct {
// Configuration and paths
@ -56,6 +89,9 @@ type Engine struct {
bgFlushCh chan struct{}
closed atomic.Bool
// Statistics
stats EngineStats
// Concurrency control
mu sync.RWMutex // Main lock for engine state
flushMu sync.Mutex // Lock for flushing operations
@ -138,23 +174,39 @@ func (e *Engine) Put(key, value []byte) error {
e.mu.Lock()
defer e.mu.Unlock()
// Track operation and time
e.stats.PutOps.Add(1)
e.stats.mu.Lock()
e.stats.LastPutTime = time.Now()
e.stats.mu.Unlock()
if e.closed.Load() {
e.stats.WriteErrors.Add(1)
return ErrEngineClosed
}
// Append to WAL
seqNum, err := e.wal.Append(wal.OpTypePut, key, value)
if err != nil {
e.stats.WriteErrors.Add(1)
return fmt.Errorf("failed to append to WAL: %w", err)
}
// Track bytes written
e.stats.TotalBytesWritten.Add(uint64(len(key) + len(value)))
// Add to MemTable
e.memTablePool.Put(key, value, seqNum)
e.lastSeqNum = seqNum
// Update memtable size estimate
e.stats.MemTableSize.Store(uint64(e.memTablePool.TotalSize()))
// Check if MemTable needs to be flushed
if e.memTablePool.IsFlushNeeded() {
if err := e.scheduleFlush(); err != nil {
e.stats.WriteErrors.Add(1)
return fmt.Errorf("failed to schedule flush: %w", err)
}
}
@ -204,17 +256,32 @@ func (e *Engine) Get(key []byte) ([]byte, error) {
e.mu.RLock()
defer e.mu.RUnlock()
// Track operation and time
e.stats.GetOps.Add(1)
e.stats.mu.Lock()
e.stats.LastGetTime = time.Now()
e.stats.mu.Unlock()
if e.closed.Load() {
e.stats.ReadErrors.Add(1)
return nil, ErrEngineClosed
}
// Track bytes read (key only at this point)
e.stats.TotalBytesRead.Add(uint64(len(key)))
// Check the MemTablePool (active + immutables)
if val, found := e.memTablePool.Get(key); found {
// The key was found, but check if it's a deletion marker
if val == nil {
// This is a deletion marker - the key exists but was deleted
e.stats.GetMisses.Add(1)
return nil, ErrKeyNotFound
}
// Track bytes read (value part)
e.stats.TotalBytesRead.Add(uint64(len(val)))
e.stats.GetHits.Add(1)
return val, nil
}
@ -240,13 +307,18 @@ func (e *Engine) Get(key []byte) ([]byte, error) {
// This should handle nil values that are tombstones
if iter.IsTombstone() {
// Found a tombstone, so this key is definitely deleted
e.stats.GetMisses.Add(1)
return nil, ErrKeyNotFound
}
// Found a non-tombstone value for this key
return iter.Value(), nil
value := iter.Value()
e.stats.TotalBytesRead.Add(uint64(len(value)))
e.stats.GetHits.Add(1)
return value, nil
}
e.stats.GetMisses.Add(1)
return nil, ErrKeyNotFound
}
@ -255,20 +327,35 @@ func (e *Engine) Delete(key []byte) error {
e.mu.Lock()
defer e.mu.Unlock()
// Track operation and time
e.stats.DeleteOps.Add(1)
e.stats.mu.Lock()
e.stats.LastDeleteTime = time.Now()
e.stats.mu.Unlock()
if e.closed.Load() {
e.stats.WriteErrors.Add(1)
return ErrEngineClosed
}
// Append to WAL
seqNum, err := e.wal.Append(wal.OpTypeDelete, key, nil)
if err != nil {
e.stats.WriteErrors.Add(1)
return fmt.Errorf("failed to append to WAL: %w", err)
}
// Track bytes written (just the key for deletes)
e.stats.TotalBytesWritten.Add(uint64(len(key)))
// Add deletion marker to MemTable
e.memTablePool.Delete(key, seqNum)
e.lastSeqNum = seqNum
// Update memtable size estimate
e.stats.MemTableSize.Store(uint64(e.memTablePool.TotalSize()))
// If compaction manager exists, also track this tombstone
if e.compactionMgr != nil {
e.compactionMgr.TrackTombstone(key)
@ -284,6 +371,7 @@ func (e *Engine) Delete(key []byte) error {
// Check if MemTable needs to be flushed
if e.memTablePool.IsFlushNeeded() {
if err := e.scheduleFlush(); err != nil {
e.stats.WriteErrors.Add(1)
return fmt.Errorf("failed to schedule flush: %w", err)
}
}
@ -374,6 +462,7 @@ func (e *Engine) flushMemTable(mem *memtable.MemTable) error {
// Ensure the SSTable directory exists
err := os.MkdirAll(e.sstableDir, 0755)
if err != nil {
e.stats.WriteErrors.Add(1)
return fmt.Errorf("failed to create SSTable directory: %w", err)
}
@ -386,19 +475,24 @@ func (e *Engine) flushMemTable(mem *memtable.MemTable) error {
// Create a new SSTable writer
writer, err := sstable.NewWriter(sstPath)
if err != nil {
e.stats.WriteErrors.Add(1)
return fmt.Errorf("failed to create SSTable writer: %w", err)
}
// Get an iterator over the MemTable
iter := mem.NewIterator()
count := 0
var bytesWritten uint64
// Write all entries to the SSTable
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
// Skip deletion markers, only add value entries
if value := iter.Value(); value != nil {
if err := writer.Add(iter.Key(), value); err != nil {
key := iter.Key()
bytesWritten += uint64(len(key) + len(value))
if err := writer.Add(key, value); err != nil {
writer.Abort()
e.stats.WriteErrors.Add(1)
return fmt.Errorf("failed to add entry to SSTable: %w", err)
}
count++
@ -412,17 +506,26 @@ func (e *Engine) flushMemTable(mem *memtable.MemTable) error {
// Finish writing the SSTable
if err := writer.Finish(); err != nil {
e.stats.WriteErrors.Add(1)
return fmt.Errorf("failed to finish SSTable: %w", err)
}
// Track bytes written to SSTable
e.stats.TotalBytesWritten.Add(bytesWritten)
// Track flush count
e.stats.FlushCount.Add(1)
// Verify the file was created
if _, err := os.Stat(sstPath); os.IsNotExist(err) {
e.stats.WriteErrors.Add(1)
return fmt.Errorf("SSTable file was not created at %s", sstPath)
}
// Open the new SSTable for reading
reader, err := sstable.OpenReader(sstPath)
if err != nil {
e.stats.ReadErrors.Add(1)
return fmt.Errorf("failed to open SSTable: %w", err)
}
@ -526,6 +629,67 @@ func (e *Engine) GetRWLock() *sync.RWMutex {
return &e.txLock
}
// Transaction interface for interactions with the engine package
type Transaction interface {
Get(key []byte) ([]byte, error)
Put(key, value []byte) error
Delete(key []byte) error
NewIterator() Iterator
NewRangeIterator(startKey, endKey []byte) Iterator
Commit() error
Rollback() error
IsReadOnly() bool
}
// TransactionCreator is implemented by packages that can create transactions
type TransactionCreator interface {
CreateTransaction(engine interface{}, readOnly bool) (Transaction, error)
}
// transactionCreatorFunc holds the function that creates transactions
var transactionCreatorFunc TransactionCreator
// RegisterTransactionCreator registers a function that can create transactions
func RegisterTransactionCreator(creator TransactionCreator) {
transactionCreatorFunc = creator
}
// BeginTransaction starts a new transaction with the given read-only flag
func (e *Engine) BeginTransaction(readOnly bool) (Transaction, error) {
// Verify engine is open
if e.closed.Load() {
return nil, ErrEngineClosed
}
// Track transaction start
e.stats.TxStarted.Add(1)
// Check if we have a transaction creator registered
if transactionCreatorFunc == nil {
e.stats.WriteErrors.Add(1)
return nil, fmt.Errorf("no transaction creator registered")
}
// Create a new transaction
txn, err := transactionCreatorFunc.CreateTransaction(e, readOnly)
if err != nil {
e.stats.WriteErrors.Add(1)
return nil, err
}
return txn, nil
}
// IncrementTxCompleted increments the completed transaction counter
func (e *Engine) IncrementTxCompleted() {
e.stats.TxCompleted.Add(1)
}
// IncrementTxAborted increments the aborted transaction counter
func (e *Engine) IncrementTxAborted() {
e.stats.TxAborted.Add(1)
}
// ApplyBatch atomically applies a batch of operations
func (e *Engine) ApplyBatch(entries []*wal.Entry) error {
e.mu.Lock()
@ -597,6 +761,55 @@ func (e *Engine) GetRangeIterator(startKey, endKey []byte) (Iterator, error) {
return iter, nil
}
// GetStats returns the current statistics for the engine
func (e *Engine) GetStats() map[string]interface{} {
stats := make(map[string]interface{})
// Add operation counters
stats["put_ops"] = e.stats.PutOps.Load()
stats["get_ops"] = e.stats.GetOps.Load()
stats["get_hits"] = e.stats.GetHits.Load()
stats["get_misses"] = e.stats.GetMisses.Load()
stats["delete_ops"] = e.stats.DeleteOps.Load()
// Add transaction statistics
stats["tx_started"] = e.stats.TxStarted.Load()
stats["tx_completed"] = e.stats.TxCompleted.Load()
stats["tx_aborted"] = e.stats.TxAborted.Load()
// Add performance metrics
stats["flush_count"] = e.stats.FlushCount.Load()
stats["memtable_size"] = e.stats.MemTableSize.Load()
stats["total_bytes_read"] = e.stats.TotalBytesRead.Load()
stats["total_bytes_written"] = e.stats.TotalBytesWritten.Load()
// Add error statistics
stats["read_errors"] = e.stats.ReadErrors.Load()
stats["write_errors"] = e.stats.WriteErrors.Load()
// Add timing information
e.stats.mu.RLock()
defer e.stats.mu.RUnlock()
stats["last_put_time"] = e.stats.LastPutTime.UnixNano()
stats["last_get_time"] = e.stats.LastGetTime.UnixNano()
stats["last_delete_time"] = e.stats.LastDeleteTime.UnixNano()
// Add data store statistics
stats["sstable_count"] = len(e.sstables)
stats["immutable_memtable_count"] = len(e.immutableMTs)
// Add compaction statistics if available
if e.compactionMgr != nil {
compactionStats := e.compactionMgr.GetCompactionStats()
for k, v := range compactionStats {
stats["compaction_"+k] = v
}
}
return stats
}
// Close closes the storage engine
func (e *Engine) Close() error {
// First set the closed flag - use atomic operation to prevent race conditions

View File

@ -328,3 +328,99 @@ func TestEngine_Reload(t *testing.T) {
}
}
}
func TestEngine_Statistics(t *testing.T) {
_, engine, cleanup := setupTest(t)
defer cleanup()
// 1. Test Put operation stats
err := engine.Put([]byte("key1"), []byte("value1"))
if err != nil {
t.Fatalf("Failed to put key-value: %v", err)
}
stats := engine.GetStats()
if stats["put_ops"] != uint64(1) {
t.Errorf("Expected 1 put operation, got: %v", stats["put_ops"])
}
if stats["memtable_size"].(uint64) == 0 {
t.Errorf("Expected non-zero memtable size, got: %v", stats["memtable_size"])
}
if stats["get_ops"] != uint64(0) {
t.Errorf("Expected 0 get operations, got: %v", stats["get_ops"])
}
// 2. Test Get operation stats
val, err := engine.Get([]byte("key1"))
if err != nil {
t.Fatalf("Failed to get key: %v", err)
}
if !bytes.Equal(val, []byte("value1")) {
t.Errorf("Got incorrect value. Expected: %s, Got: %s", "value1", string(val))
}
_, err = engine.Get([]byte("nonexistent"))
if err != ErrKeyNotFound {
t.Errorf("Expected ErrKeyNotFound for non-existent key, got: %v", err)
}
stats = engine.GetStats()
if stats["get_ops"] != uint64(2) {
t.Errorf("Expected 2 get operations, got: %v", stats["get_ops"])
}
if stats["get_hits"] != uint64(1) {
t.Errorf("Expected 1 get hit, got: %v", stats["get_hits"])
}
if stats["get_misses"] != uint64(1) {
t.Errorf("Expected 1 get miss, got: %v", stats["get_misses"])
}
// 3. Test Delete operation stats
err = engine.Delete([]byte("key1"))
if err != nil {
t.Fatalf("Failed to delete key: %v", err)
}
stats = engine.GetStats()
if stats["delete_ops"] != uint64(1) {
t.Errorf("Expected 1 delete operation, got: %v", stats["delete_ops"])
}
// 4. Verify key is deleted
_, err = engine.Get([]byte("key1"))
if err != ErrKeyNotFound {
t.Errorf("Expected ErrKeyNotFound after delete, got: %v", err)
}
stats = engine.GetStats()
if stats["get_ops"] != uint64(3) {
t.Errorf("Expected 3 get operations, got: %v", stats["get_ops"])
}
if stats["get_misses"] != uint64(2) {
t.Errorf("Expected 2 get misses, got: %v", stats["get_misses"])
}
// 5. Test flush stats
for i := 0; i < 10; i++ {
key := []byte(fmt.Sprintf("bulk-key-%d", i))
value := []byte(fmt.Sprintf("bulk-value-%d", i))
if err := engine.Put(key, value); err != nil {
t.Fatalf("Failed to put bulk data: %v", err)
}
}
// Force a flush
if engine.memTablePool.IsFlushNeeded() {
engine.FlushImMemTables()
} else {
tables := engine.memTablePool.GetMemTables()
if len(tables) > 0 {
engine.flushMemTable(tables[0])
}
}
stats = engine.GetStats()
if stats["flush_count"].(uint64) == 0 {
t.Errorf("Expected at least 1 flush, got: %v", stats["flush_count"])
}
}

View File

@ -0,0 +1,33 @@
package transaction
import (
"git.canoozie.net/jer/go-storage/pkg/engine"
)
// TransactionCreatorImpl implements the engine.TransactionCreator interface
type TransactionCreatorImpl struct{}
// CreateTransaction creates a new transaction
func (tc *TransactionCreatorImpl) CreateTransaction(e interface{}, readOnly bool) (engine.Transaction, error) {
// Convert the interface to the engine.Engine type
eng, ok := e.(*engine.Engine)
if !ok {
return nil, ErrInvalidEngine
}
// Determine transaction mode
var mode TransactionMode
if readOnly {
mode = ReadOnly
} else {
mode = ReadWrite
}
// Create a new transaction
return NewTransaction(eng, mode)
}
// Register the transaction creator with the engine
func init() {
engine.RegisterTransactionCreator(&TransactionCreatorImpl{})
}

View File

@ -15,6 +15,7 @@ import (
var (
ErrReadOnlyTransaction = errors.New("cannot write to a read-only transaction")
ErrTransactionClosed = errors.New("transaction already committed or rolled back")
ErrInvalidEngine = errors.New("invalid engine type")
)
// EngineTransaction implements a SQLite-inspired transaction using reader-writer locks
@ -476,6 +477,9 @@ func (tx *EngineTransaction) Commit() error {
// For read-only transactions, just release the read lock
if tx.mode == ReadOnly {
tx.releaseReadLock()
// Track transaction completion
tx.engine.IncrementTxCompleted()
return nil
}
@ -515,6 +519,9 @@ func (tx *EngineTransaction) Commit() error {
tx.writeLock = nil
}
// Track transaction completion
tx.engine.IncrementTxCompleted()
return err
}
@ -539,6 +546,9 @@ func (tx *EngineTransaction) Rollback() error {
}
}
// Track transaction abort in engine stats
tx.engine.IncrementTxAborted()
return nil
}

182
pkg/transaction/tx_test.go Normal file
View File

@ -0,0 +1,182 @@
package transaction
import (
"bytes"
"os"
"testing"
"git.canoozie.net/jer/go-storage/pkg/engine"
)
func setupTest(t *testing.T) (*engine.Engine, func()) {
// Create a temporary directory for the test
dir, err := os.MkdirTemp("", "transaction-test-*")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
// Create the engine
e, err := engine.NewEngine(dir)
if err != nil {
os.RemoveAll(dir)
t.Fatalf("Failed to create engine: %v", err)
}
// Return cleanup function
cleanup := func() {
e.Close()
os.RemoveAll(dir)
}
return e, cleanup
}
func TestTransaction_BasicOperations(t *testing.T) {
e, cleanup := setupTest(t)
defer cleanup()
// Get transaction statistics before starting
stats := e.GetStats()
txStarted := stats["tx_started"].(uint64)
// Begin a read-write transaction
tx, err := e.BeginTransaction(false)
if err != nil {
t.Fatalf("Failed to begin transaction: %v", err)
}
// Verify transaction started count increased
stats = e.GetStats()
if stats["tx_started"].(uint64) != txStarted+1 {
t.Errorf("Expected tx_started to be %d, got: %d", txStarted+1, stats["tx_started"].(uint64))
}
// Put a value in the transaction
err = tx.Put([]byte("tx-key1"), []byte("tx-value1"))
if err != nil {
t.Fatalf("Failed to put value in transaction: %v", err)
}
// Get the value from the transaction
val, err := tx.Get([]byte("tx-key1"))
if err != nil {
t.Fatalf("Failed to get value from transaction: %v", err)
}
if !bytes.Equal(val, []byte("tx-value1")) {
t.Errorf("Expected value 'tx-value1', got: %s", string(val))
}
// Commit the transaction
if err := tx.Commit(); err != nil {
t.Fatalf("Failed to commit transaction: %v", err)
}
// Verify transaction completed count increased
stats = e.GetStats()
if stats["tx_completed"].(uint64) != 1 {
t.Errorf("Expected tx_completed to be 1, got: %d", stats["tx_completed"].(uint64))
}
if stats["tx_aborted"].(uint64) != 0 {
t.Errorf("Expected tx_aborted to be 0, got: %d", stats["tx_aborted"].(uint64))
}
// Verify the value is accessible from the engine
val, err = e.Get([]byte("tx-key1"))
if err != nil {
t.Fatalf("Failed to get value from engine: %v", err)
}
if !bytes.Equal(val, []byte("tx-value1")) {
t.Errorf("Expected value 'tx-value1', got: %s", string(val))
}
}
func TestTransaction_Rollback(t *testing.T) {
e, cleanup := setupTest(t)
defer cleanup()
// Begin a read-write transaction
tx, err := e.BeginTransaction(false)
if err != nil {
t.Fatalf("Failed to begin transaction: %v", err)
}
// Put a value in the transaction
err = tx.Put([]byte("tx-key2"), []byte("tx-value2"))
if err != nil {
t.Fatalf("Failed to put value in transaction: %v", err)
}
// Get the value from the transaction
val, err := tx.Get([]byte("tx-key2"))
if err != nil {
t.Fatalf("Failed to get value from transaction: %v", err)
}
if !bytes.Equal(val, []byte("tx-value2")) {
t.Errorf("Expected value 'tx-value2', got: %s", string(val))
}
// Rollback the transaction
if err := tx.Rollback(); err != nil {
t.Fatalf("Failed to rollback transaction: %v", err)
}
// Verify transaction aborted count increased
stats := e.GetStats()
if stats["tx_completed"].(uint64) != 0 {
t.Errorf("Expected tx_completed to be 0, got: %d", stats["tx_completed"].(uint64))
}
if stats["tx_aborted"].(uint64) != 1 {
t.Errorf("Expected tx_aborted to be 1, got: %d", stats["tx_aborted"].(uint64))
}
// Verify the value is not accessible from the engine
_, err = e.Get([]byte("tx-key2"))
if err != engine.ErrKeyNotFound {
t.Errorf("Expected ErrKeyNotFound, got: %v", err)
}
}
func TestTransaction_ReadOnly(t *testing.T) {
e, cleanup := setupTest(t)
defer cleanup()
// Add some data to the engine
if err := e.Put([]byte("key-ro"), []byte("value-ro")); err != nil {
t.Fatalf("Failed to put value in engine: %v", err)
}
// Begin a read-only transaction
tx, err := e.BeginTransaction(true)
if err != nil {
t.Fatalf("Failed to begin transaction: %v", err)
}
if !tx.IsReadOnly() {
t.Errorf("Expected transaction to be read-only")
}
// Read the value
val, err := tx.Get([]byte("key-ro"))
if err != nil {
t.Fatalf("Failed to get value from transaction: %v", err)
}
if !bytes.Equal(val, []byte("value-ro")) {
t.Errorf("Expected value 'value-ro', got: %s", string(val))
}
// Attempt to write (should fail)
err = tx.Put([]byte("new-key"), []byte("new-value"))
if err == nil {
t.Errorf("Expected error when putting value in read-only transaction")
}
// Commit the transaction
if err := tx.Commit(); err != nil {
t.Fatalf("Failed to commit transaction: %v", err)
}
// Verify transaction completed count increased
stats := e.GetStats()
if stats["tx_completed"].(uint64) != 1 {
t.Errorf("Expected tx_completed to be 1, got: %d", stats["tx_completed"].(uint64))
}
}