feat: add a standard stats collector, needs integration

Jeremy Tregunna 2025-04-23 20:01:18 -06:00
parent d5a90cf2e4
commit 7dd816bdf5
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
3 changed files with 623 additions and 0 deletions

pkg/stats/collector.go (new file, 340 lines)

@@ -0,0 +1,340 @@
package stats
import (
"sync"
"sync/atomic"
"time"
)
// OperationType defines the type of operation being tracked
type OperationType string
// Common operation types
const (
OpPut OperationType = "put"
OpGet OperationType = "get"
OpDelete OperationType = "delete"
OpTxBegin OperationType = "tx_begin"
OpTxCommit OperationType = "tx_commit"
OpTxRollback OperationType = "tx_rollback"
OpFlush OperationType = "flush"
OpCompact OperationType = "compact"
OpSeek OperationType = "seek"
OpScan OperationType = "scan"
)
// AtomicCollector provides centralized statistics collection with minimal contention
// using atomic operations for thread safety
type AtomicCollector struct {
// Operation counters using atomic values
counts map[OperationType]*atomic.Uint64
countsMu sync.RWMutex // Only used when creating new counter entries
// Timing measurements for last operation timestamps
lastOpTime map[OperationType]time.Time
lastOpTimeMu sync.RWMutex // Only used for timestamp updates
// Usage metrics
memTableSize atomic.Uint64
totalBytesRead atomic.Uint64
totalBytesWritten atomic.Uint64
// Error tracking
errors map[string]*atomic.Uint64
errorsMu sync.RWMutex // Only used when creating new error entries
// Performance metrics
flushCount atomic.Uint64
compactionCount atomic.Uint64
// Recovery statistics
recoveryStats RecoveryStats
// Latency tracking
latencies map[OperationType]*LatencyTracker
latenciesMu sync.RWMutex // Only used when creating new latency trackers
}
// RecoveryStats tracks statistics related to WAL recovery
type RecoveryStats struct {
WALFilesRecovered atomic.Uint64
WALEntriesRecovered atomic.Uint64
WALCorruptedEntries atomic.Uint64
WALRecoveryDuration atomic.Int64 // nanoseconds
}
// LatencyTracker maintains running statistics about operation latencies
type LatencyTracker struct {
count atomic.Uint64
sum atomic.Uint64 // sum in nanoseconds
max atomic.Uint64 // max in nanoseconds
min atomic.Uint64 // min in nanoseconds (zero means no samples recorded yet)
}
// NewCollector creates a new statistics collector
func NewCollector() *AtomicCollector {
return &AtomicCollector{
counts: make(map[OperationType]*atomic.Uint64),
lastOpTime: make(map[OperationType]time.Time),
errors: make(map[string]*atomic.Uint64),
latencies: make(map[OperationType]*LatencyTracker),
}
}
// TrackOperation increments the counter for the specified operation type
func (c *AtomicCollector) TrackOperation(op OperationType) {
counter := c.getOrCreateCounter(op)
counter.Add(1)
// Update last operation time (not on the hot path, so a mutex is acceptable)
c.lastOpTimeMu.Lock()
c.lastOpTime[op] = time.Now()
c.lastOpTimeMu.Unlock()
}
// TrackOperationWithLatency tracks an operation and its latency
func (c *AtomicCollector) TrackOperationWithLatency(op OperationType, latencyNs uint64) {
// Track operation count
counter := c.getOrCreateCounter(op)
counter.Add(1)
// Update last operation time
c.lastOpTimeMu.Lock()
c.lastOpTime[op] = time.Now()
c.lastOpTimeMu.Unlock()
// Update latency statistics
tracker := c.getOrCreateLatencyTracker(op)
tracker.count.Add(1)
tracker.sum.Add(latencyNs)
// Update max (using compare-and-swap pattern)
for {
current := tracker.max.Load()
if latencyNs <= current {
break // No need to update
}
if tracker.max.CompareAndSwap(current, latencyNs) {
break // Successfully updated
}
// If we get here, someone else updated it between our load and CAS, so we retry
}
// Update min (using compare-and-swap pattern)
for {
current := tracker.min.Load()
if current == 0 {
// First value
if tracker.min.CompareAndSwap(0, latencyNs) {
break
}
continue // Race condition, try again
}
if latencyNs >= current {
break // No need to update
}
if tracker.min.CompareAndSwap(current, latencyNs) {
break // Successfully updated
}
// If we get here, someone else updated it between our load and CAS, so we retry
}
}
// TrackError increments the counter for the specified error type
func (c *AtomicCollector) TrackError(errorType string) {
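// Fast path: read lock to look up an existing counter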
c.errorsMu.RLock()
counter, exists := c.errors[errorType]
c.errorsMu.RUnlock()
if !exists {
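// Slow path: take the write lock and re-check before creating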
c.errorsMu.Lock()
if counter, exists = c.errors[errorType]; !exists {
counter = &atomic.Uint64{}
c.errors[errorType] = counter
}
c.errorsMu.Unlock()
}
counter.Add(1)
}
// TrackBytes adds the specified number of bytes to the read or write counter
func (c *AtomicCollector) TrackBytes(isWrite bool, bytes uint64) {
if isWrite {
c.totalBytesWritten.Add(bytes)
} else {
c.totalBytesRead.Add(bytes)
}
}
// TrackMemTableSize records the current memtable size
func (c *AtomicCollector) TrackMemTableSize(size uint64) {
c.memTableSize.Store(size)
}
// TrackFlush increments the flush counter
func (c *AtomicCollector) TrackFlush() {
c.flushCount.Add(1)
}
// TrackCompaction increments the compaction counter
func (c *AtomicCollector) TrackCompaction() {
c.compactionCount.Add(1)
}
// StartRecovery initializes recovery statistics
func (c *AtomicCollector) StartRecovery() time.Time {
// Reset recovery stats
c.recoveryStats.WALFilesRecovered.Store(0)
c.recoveryStats.WALEntriesRecovered.Store(0)
c.recoveryStats.WALCorruptedEntries.Store(0)
c.recoveryStats.WALRecoveryDuration.Store(0)
return time.Now()
}
// FinishRecovery completes recovery statistics
func (c *AtomicCollector) FinishRecovery(startTime time.Time, filesRecovered, entriesRecovered, corruptedEntries uint64) {
c.recoveryStats.WALFilesRecovered.Store(filesRecovered)
c.recoveryStats.WALEntriesRecovered.Store(entriesRecovered)
c.recoveryStats.WALCorruptedEntries.Store(corruptedEntries)
c.recoveryStats.WALRecoveryDuration.Store(time.Since(startTime).Nanoseconds())
}
// GetStats returns all statistics as a map
func (c *AtomicCollector) GetStats() map[string]interface{} {
stats := make(map[string]interface{})
// Add operation counters
c.countsMu.RLock()
for op, counter := range c.counts {
stats[string(op)+"_ops"] = counter.Load()
}
c.countsMu.RUnlock()
// Add timing information
c.lastOpTimeMu.RLock()
for op, timestamp := range c.lastOpTime {
stats["last_"+string(op)+"_time"] = timestamp.UnixNano()
}
c.lastOpTimeMu.RUnlock()
// Add performance metrics
stats["memtable_size"] = c.memTableSize.Load()
stats["total_bytes_read"] = c.totalBytesRead.Load()
stats["total_bytes_written"] = c.totalBytesWritten.Load()
stats["flush_count"] = c.flushCount.Load()
stats["compaction_count"] = c.compactionCount.Load()
// Add error statistics
c.errorsMu.RLock()
errorStats := make(map[string]uint64)
for errType, counter := range c.errors {
errorStats[errType] = counter.Load()
}
c.errorsMu.RUnlock()
stats["errors"] = errorStats
// Add recovery statistics
recoveryStats := map[string]interface{}{
"wal_files_recovered": c.recoveryStats.WALFilesRecovered.Load(),
"wal_entries_recovered": c.recoveryStats.WALEntriesRecovered.Load(),
"wal_corrupted_entries": c.recoveryStats.WALCorruptedEntries.Load(),
}
recoveryDuration := c.recoveryStats.WALRecoveryDuration.Load()
if recoveryDuration > 0 {
recoveryStats["wal_recovery_duration_ms"] = recoveryDuration / int64(time.Millisecond)
}
stats["recovery"] = recoveryStats
// Add latency statistics
c.latenciesMu.RLock()
for op, tracker := range c.latencies {
count := tracker.count.Load()
if count == 0 {
continue
}
latencyStats := map[string]interface{}{
"count": count,
"avg_ns": tracker.sum.Load() / count,
}
// Only include min/max if we have values
if min := tracker.min.Load(); min != 0 {
latencyStats["min_ns"] = min
}
if max := tracker.max.Load(); max != 0 {
latencyStats["max_ns"] = max
}
stats[string(op)+"_latency"] = latencyStats
}
c.latenciesMu.RUnlock()
return stats
}
// GetStatsFiltered returns statistics filtered by prefix
func (c *AtomicCollector) GetStatsFiltered(prefix string) map[string]interface{} {
allStats := c.GetStats()
filtered := make(map[string]interface{})
for key, value := range allStats {
// Add entries that start with the prefix
if len(prefix) == 0 || startsWith(key, prefix) {
filtered[key] = value
}
}
return filtered
}
// getOrCreateCounter gets or creates an atomic counter for the operation
func (c *AtomicCollector) getOrCreateCounter(op OperationType) *atomic.Uint64 {
// Try read lock first (fast path)
c.countsMu.RLock()
counter, exists := c.counts[op]
c.countsMu.RUnlock()
if !exists {
// Slow path with write lock
c.countsMu.Lock()
if counter, exists = c.counts[op]; !exists {
counter = &atomic.Uint64{}
c.counts[op] = counter
}
c.countsMu.Unlock()
}
return counter
}
// getOrCreateLatencyTracker gets or creates a latency tracker for the operation
func (c *AtomicCollector) getOrCreateLatencyTracker(op OperationType) *LatencyTracker {
// Try read lock first (fast path)
c.latenciesMu.RLock()
tracker, exists := c.latencies[op]
c.latenciesMu.RUnlock()
if !exists {
// Slow path with write lock
c.latenciesMu.Lock()
if tracker, exists = c.latencies[op]; !exists {
tracker = &LatencyTracker{}
c.latencies[op] = tracker
}
c.latenciesMu.Unlock()
}
return tracker
}
// startsWith reports whether s begins with prefix; a local stand-in for
// strings.HasPrefix that avoids importing strings
func startsWith(s, prefix string) bool {
if len(s) < len(prefix) {
return false
}
return s[:len(prefix)] == prefix
}

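The tests below double as examples, but as a quick usage sketch of the collector on its own (the import path and the byte counts are placeholder assumptions, not part of this commit):

package main

import (
	"fmt"
	"time"

	"yourmodule/pkg/stats" // placeholder import path for this package
)

func main() {
	collector := stats.NewCollector()

	// Time a put and record both its count and its latency in nanoseconds
	start := time.Now()
	// ... perform the actual put against the storage engine here ...
	collector.TrackOperationWithLatency(stats.OpPut, uint64(time.Since(start).Nanoseconds()))

	// Record I/O volume and the current memtable size
	collector.TrackBytes(true, 128) // example figure: 128 bytes written
	collector.TrackMemTableSize(4096)

	// Dump only the put-prefixed keys: put_ops and put_latency
	fmt.Println(collector.GetStatsFiltered("put"))
}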
pkg/stats/collector_test.go (new file, 236 lines)

@@ -0,0 +1,236 @@
package stats
import (
"sync"
"testing"
"time"
)
func TestCollector_TrackOperation(t *testing.T) {
collector := NewCollector()
// Track operations
collector.TrackOperation(OpPut)
collector.TrackOperation(OpPut)
collector.TrackOperation(OpGet)
// Get stats
stats := collector.GetStats()
// Verify counts
if stats["put_ops"].(uint64) != 2 {
t.Errorf("Expected 2 put operations, got %v", stats["put_ops"])
}
if stats["get_ops"].(uint64) != 1 {
t.Errorf("Expected 1 get operation, got %v", stats["get_ops"])
}
// Verify last operation times exist
if _, exists := stats["last_put_time"]; !exists {
t.Errorf("Expected last_put_time to exist in stats")
}
if _, exists := stats["last_get_time"]; !exists {
t.Errorf("Expected last_get_time to exist in stats")
}
}
func TestCollector_TrackOperationWithLatency(t *testing.T) {
collector := NewCollector()
// Track operations with latency
collector.TrackOperationWithLatency(OpGet, 100)
collector.TrackOperationWithLatency(OpGet, 200)
collector.TrackOperationWithLatency(OpGet, 300)
// Get stats
stats := collector.GetStats()
// Check latency stats
latencyStats, ok := stats["get_latency"].(map[string]interface{})
if !ok {
t.Fatalf("Expected get_latency to be a map, got %T", stats["get_latency"])
}
if count := latencyStats["count"].(uint64); count != 3 {
t.Errorf("Expected 3 latency records, got %v", count)
}
if avg := latencyStats["avg_ns"].(uint64); avg != 200 {
t.Errorf("Expected average latency 200ns, got %v", avg)
}
if min := latencyStats["min_ns"].(uint64); min != 100 {
t.Errorf("Expected min latency 100ns, got %v", min)
}
if max := latencyStats["max_ns"].(uint64); max != 300 {
t.Errorf("Expected max latency 300ns, got %v", max)
}
}
func TestCollector_ConcurrentAccess(t *testing.T) {
collector := NewCollector()
const numGoroutines = 10
const opsPerGoroutine = 1000
var wg sync.WaitGroup
wg.Add(numGoroutines)
// Launch goroutines to track operations concurrently
for i := 0; i < numGoroutines; i++ {
go func(id int) {
defer wg.Done()
for j := 0; j < opsPerGoroutine; j++ {
// Mix different operations
switch j % 3 {
case 0:
collector.TrackOperation(OpPut)
case 1:
collector.TrackOperation(OpGet)
case 2:
collector.TrackOperationWithLatency(OpDelete, uint64(j))
}
}
}(i)
}
wg.Wait()
// Get stats
stats := collector.GetStats()
// Each goroutine splits its ops evenly by j % 3, so each operation type
// should see numGoroutines * opsPerGoroutine / 3 operations (give or take
// one per goroutine from the integer split; the atomic counters themselves
// never drop updates)
expectedOps := uint64(numGoroutines * opsPerGoroutine / 3)
// Use 99% of expected as a lenient minimum threshold
minThreshold := expectedOps * 99 / 100
if ops := stats["put_ops"].(uint64); ops < minThreshold {
t.Errorf("Expected approximately %d put operations, got %v (below threshold %d)",
expectedOps, ops, minThreshold)
}
if ops := stats["get_ops"].(uint64); ops < minThreshold {
t.Errorf("Expected approximately %d get operations, got %v (below threshold %d)",
expectedOps, ops, minThreshold)
}
if ops := stats["delete_ops"].(uint64); ops < minThreshold {
t.Errorf("Expected approximately %d delete operations, got %v (below threshold %d)",
expectedOps, ops, minThreshold)
}
}
func TestCollector_GetStatsFiltered(t *testing.T) {
collector := NewCollector()
// Track different operations
collector.TrackOperation(OpPut)
collector.TrackOperation(OpGet)
collector.TrackOperation(OpGet)
collector.TrackOperation(OpDelete)
collector.TrackError("io_error")
collector.TrackError("network_error")
// Filter by "get" prefix
getStats := collector.GetStatsFiltered("get")
// Should only contain get_ops and related stats
if len(getStats) == 0 {
t.Errorf("Expected non-empty filtered stats")
}
if _, exists := getStats["get_ops"]; !exists {
t.Errorf("Expected get_ops in filtered stats")
}
if _, exists := getStats["put_ops"]; exists {
t.Errorf("Did not expect put_ops in get-filtered stats")
}
// Filter by "error" prefix
errorStats := collector.GetStatsFiltered("error")
if _, exists := errorStats["errors"]; !exists {
t.Errorf("Expected errors in error-filtered stats")
}
}
func TestCollector_TrackBytes(t *testing.T) {
collector := NewCollector()
// Track read and write bytes
collector.TrackBytes(true, 1000) // write
collector.TrackBytes(false, 500) // read
stats := collector.GetStats()
if bytesWritten := stats["total_bytes_written"].(uint64); bytesWritten != 1000 {
t.Errorf("Expected 1000 bytes written, got %v", bytesWritten)
}
if bytesRead := stats["total_bytes_read"].(uint64); bytesRead != 500 {
t.Errorf("Expected 500 bytes read, got %v", bytesRead)
}
}
func TestCollector_TrackMemTableSize(t *testing.T) {
collector := NewCollector()
// Track memtable size
collector.TrackMemTableSize(2048)
stats := collector.GetStats()
if size := stats["memtable_size"].(uint64); size != 2048 {
t.Errorf("Expected memtable size 2048, got %v", size)
}
// Update memtable size
collector.TrackMemTableSize(4096)
stats = collector.GetStats()
if size := stats["memtable_size"].(uint64); size != 4096 {
t.Errorf("Expected updated memtable size 4096, got %v", size)
}
}
func TestCollector_RecoveryStats(t *testing.T) {
collector := NewCollector()
// Start recovery
startTime := collector.StartRecovery()
// Simulate some work
time.Sleep(10 * time.Millisecond)
// Finish recovery
collector.FinishRecovery(startTime, 5, 1000, 2)
stats := collector.GetStats()
recoveryStats, ok := stats["recovery"].(map[string]interface{})
if !ok {
t.Fatalf("Expected recovery stats to be a map")
}
if filesRecovered := recoveryStats["wal_files_recovered"].(uint64); filesRecovered != 5 {
t.Errorf("Expected 5 files recovered, got %v", filesRecovered)
}
if entriesRecovered := recoveryStats["wal_entries_recovered"].(uint64); entriesRecovered != 1000 {
t.Errorf("Expected 1000 entries recovered, got %v", entriesRecovered)
}
if corruptedEntries := recoveryStats["wal_corrupted_entries"].(uint64); corruptedEntries != 2 {
t.Errorf("Expected 2 corrupted entries, got %v", corruptedEntries)
}
if _, exists := recoveryStats["wal_recovery_duration_ms"]; !exists {
t.Errorf("Expected recovery duration to be recorded")
}
}

pkg/stats/interface.go (new file, 47 lines)

@@ -0,0 +1,47 @@
package stats
import "time"
// Provider defines the interface for components that provide statistics
type Provider interface {
// GetStats returns all statistics
GetStats() map[string]interface{}
// GetStatsFiltered returns statistics filtered by prefix
GetStatsFiltered(prefix string) map[string]interface{}
}
// Collector interface defines methods for collecting statistics
type Collector interface {
Provider
// TrackOperation records a single operation
TrackOperation(op OperationType)
// TrackOperationWithLatency records an operation with its latency
TrackOperationWithLatency(op OperationType, latencyNs uint64)
// TrackError increments the counter for the specified error type
TrackError(errorType string)
// TrackBytes adds the specified number of bytes to the read or write counter
TrackBytes(isWrite bool, bytes uint64)
// TrackMemTableSize records the current memtable size
TrackMemTableSize(size uint64)
// TrackFlush increments the flush counter
TrackFlush()
// TrackCompaction increments the compaction counter
TrackCompaction()
// StartRecovery initializes recovery statistics
StartRecovery() time.Time
// FinishRecovery completes recovery statistics
FinishRecovery(startTime time.Time, filesRecovered, entriesRecovered, corruptedEntries uint64)
}
// Ensure AtomicCollector implements the Collector interface
var _ Collector = (*AtomicCollector)(nil)
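Since the commit title flags that the collector still needs integration, one plausible wiring is to inject the Collector interface into the storage engine. This is a hypothetical sketch, not code from this commit; the Engine type, its fields, and the import path are all assumptions:

package engine

import (
	"time"

	"yourmodule/pkg/stats" // placeholder import path
)

// Engine is a hypothetical storage engine that reports into any stats.Collector.
type Engine struct {
	stats stats.Collector
}

func NewEngine(c stats.Collector) *Engine {
	return &Engine{stats: c}
}

func (e *Engine) Put(key, value []byte) error {
	start := time.Now()
	// ... append to the WAL and write to the memtable here ...
	e.stats.TrackOperationWithLatency(stats.OpPut, uint64(time.Since(start).Nanoseconds()))
	e.stats.TrackBytes(true, uint64(len(key)+len(value)))
	return nil
}

Accepting the Collector interface rather than *AtomicCollector keeps the engine testable with a stub collector and leaves room for alternative implementations later.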