fix: resolve the build error from the previous commit
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Failing after 5m2s

This commit is contained in:
Jeremy Tregunna 2025-04-26 04:11:32 -06:00
parent ae75f2935f
commit 8e04c2cea3
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
3 changed files with 461 additions and 4 deletions

View File

@@ -21,12 +21,13 @@ const (
var (
// Command line flags
benchmarkType = flag.String("type", "all", "Type of benchmark to run (write, read, scan, mixed, tune, or all)")
benchmarkType = flag.String("type", "all", "Type of benchmark to run (write, random-write, sequential-write, read, random-read, scan, range-scan, mixed, tune, compaction, or all)")
duration = flag.Duration("duration", 10*time.Second, "Duration to run the benchmark")
numKeys = flag.Int("keys", defaultKeyCount, "Number of keys to use")
valueSize = flag.Int("value-size", defaultValueSize, "Size of values in bytes")
dataDir = flag.String("data-dir", "./benchmark-data", "Directory to store benchmark data")
sequential = flag.Bool("sequential", false, "Use sequential keys instead of random")
scanSize = flag.Int("scan-size", 100, "Number of entries to scan in range scan benchmarks")
cpuProfile = flag.String("cpu-profile", "", "Write CPU profile to file")
memProfile = flag.String("mem-profile", "", "Write memory profile to file")
resultsFile = flag.String("results", "", "File to write results to (in addition to stdout)")
@@ -97,15 +98,44 @@ func main() {
case "write":
result := runWriteBenchmark(e)
results = append(results, result)
case "random-write":
oldSequential := *sequential
*sequential = false
*valueSize = 1024 // Set to 1KB for random write benchmarks
result := runRandomWriteBenchmark(e)
*sequential = oldSequential
results = append(results, result)
case "sequential-write":
oldSequential := *sequential
*sequential = true
result := runSequentialWriteBenchmark(e)
*sequential = oldSequential
results = append(results, result)
case "read":
result := runReadBenchmark(e)
results = append(results, result)
case "random-read":
oldSequential := *sequential
*sequential = false
result := runRandomReadBenchmark(e)
*sequential = oldSequential
results = append(results, result)
case "scan":
result := runScanBenchmark(e)
results = append(results, result)
case "range-scan":
result := runRangeScanBenchmark(e)
results = append(results, result)
case "mixed":
result := runMixedBenchmark(e)
results = append(results, result)
case "compaction":
fmt.Println("Running compaction benchmark...")
if err := CustomCompactionBenchmark(*numKeys, *valueSize, *duration); err != nil {
fmt.Fprintf(os.Stderr, "Compaction benchmark failed: %v\n", err)
continue
}
return // Exit after compaction benchmark
case "tune":
fmt.Println("Running configuration tuning benchmarks...")
if err := RunFullTuningBenchmark(); err != nil {
@@ -115,8 +145,12 @@ func main() {
return // Exit after tuning
case "all":
results = append(results, runWriteBenchmark(e))
results = append(results, runRandomWriteBenchmark(e))
results = append(results, runSequentialWriteBenchmark(e))
results = append(results, runReadBenchmark(e))
results = append(results, runRandomReadBenchmark(e))
results = append(results, runScanBenchmark(e))
results = append(results, runRangeScanBenchmark(e))
results = append(results, runMixedBenchmark(e))
default:
fmt.Fprintf(os.Stderr, "Unknown benchmark type: %s\n", typ)
@@ -246,6 +280,7 @@ benchmarkEnd:
result := fmt.Sprintf("\nWrite Benchmark Results:")
result += fmt.Sprintf("\n Status: %s", status)
result += fmt.Sprintf("\n Key Mode: %s", keyMode())
result += fmt.Sprintf("\n Operations: %d", opsCount)
result += fmt.Sprintf("\n WAL rotation retries: %d", walRotationCount)
result += fmt.Sprintf("\n Data Written: %.2f MB", float64(opsCount)*float64(*valueSize)/(1024*1024))
@@ -257,6 +292,198 @@ benchmarkEnd:
return result
}
// runRandomWriteBenchmark benchmarks random write performance with 1KB values
func runRandomWriteBenchmark(e *engine.EngineFacade) string {
fmt.Println("Running Random Write Benchmark (1KB values)...")
// For random writes with 1KB values, use a moderate batch size
batchSize := 500
start := time.Now()
deadline := start.Add(*duration)
// Create 1KB value
value := make([]byte, 1024) // Fixed at 1KB
for i := range value {
value[i] = byte(i % 256)
}
var opsCount int
var consecutiveErrors int
maxConsecutiveErrors := 10
var walRotationCount int
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for time.Now().Before(deadline) {
// Process in batches
for i := 0; i < batchSize && time.Now().Before(deadline); i++ {
// Generate a random key with a counter to ensure uniqueness
key := []byte(fmt.Sprintf("key-%s-%010d",
strconv.FormatUint(r.Uint64(), 16), opsCount))
if err := e.Put(key, value); err != nil {
if err == engine.ErrEngineClosed {
fmt.Fprintf(os.Stderr, "Engine closed, stopping benchmark\n")
consecutiveErrors++
if consecutiveErrors >= maxConsecutiveErrors {
goto benchmarkEnd
}
time.Sleep(10 * time.Millisecond)
continue
}
// Handle WAL rotation errors
if strings.Contains(err.Error(), "WAL is rotating") ||
strings.Contains(err.Error(), "WAL is closed") {
walRotationCount++
if walRotationCount%100 == 0 {
fmt.Printf("Retrying due to WAL rotation (%d retries so far)...\n", walRotationCount)
}
time.Sleep(20 * time.Millisecond)
i-- // Retry this key
continue
}
fmt.Fprintf(os.Stderr, "Write error (key #%d): %v\n", opsCount, err)
consecutiveErrors++
if consecutiveErrors >= maxConsecutiveErrors {
fmt.Fprintf(os.Stderr, "Too many consecutive errors, stopping benchmark\n")
goto benchmarkEnd
}
continue
}
consecutiveErrors = 0 // Reset error counter on successful writes
opsCount++
}
// Pause between batches to give background operations time to complete
time.Sleep(5 * time.Millisecond)
}
benchmarkEnd:
elapsed := time.Since(start)
opsPerSecond := float64(opsCount) / elapsed.Seconds()
mbPerSecond := float64(opsCount) * 1024.0 / (1024 * 1024) / elapsed.Seconds()
var status string
if consecutiveErrors >= maxConsecutiveErrors {
status = "COMPLETED WITH ERRORS (expected during WAL rotation)"
} else {
status = "COMPLETED SUCCESSFULLY"
}
result := fmt.Sprintf("\nRandom Write Benchmark Results (1KB values):")
result += fmt.Sprintf("\n Status: %s", status)
result += fmt.Sprintf("\n Operations: %d", opsCount)
result += fmt.Sprintf("\n WAL rotation retries: %d", walRotationCount)
result += fmt.Sprintf("\n Data Written: %.2f MB", float64(opsCount)*1024.0/(1024*1024))
result += fmt.Sprintf("\n Time: %.2f seconds", elapsed.Seconds())
result += fmt.Sprintf("\n Throughput: %.2f ops/sec (%.2f MB/sec)", opsPerSecond, mbPerSecond)
result += fmt.Sprintf("\n Latency: %.3f µs/op", 1000000.0/opsPerSecond)
result += fmt.Sprintf("\n Note: This benchmark specifically tests random writes with 1KB values")
return result
}
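// Worked example for the throughput math above, with illustrative numbers
// rather than measured results: 50,000 ops in 10 s is 5,000 ops/sec; at 1 KB
// per op that is 5,000 * 1024 / (1024 * 1024) ≈ 4.88 MB/sec, and the reported
// latency is 1,000,000 / 5,000 = 200 µs/op.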
// runSequentialWriteBenchmark benchmarks sequential write performance for throughput measurement
func runSequentialWriteBenchmark(e *engine.EngineFacade) string {
fmt.Println("Running Sequential Write Benchmark (throughput measurement)...")
// Use larger batch sizes for sequential writes to maximize throughput
batchSize := 2000
if *valueSize > 4096 {
batchSize = 200
} else if *valueSize > 1024 {
batchSize = 1000
}
start := time.Now()
deadline := start.Add(*duration)
value := make([]byte, *valueSize)
for i := range value {
value[i] = byte(i % 256)
}
var opsCount int
var consecutiveErrors int
maxConsecutiveErrors := 10
var walRotationCount int
for time.Now().Before(deadline) {
// Process in batches
for i := 0; i < batchSize && time.Now().Before(deadline); i++ {
// Generate sequential keys
key := []byte(fmt.Sprintf("seq-key-%010d", opsCount))
if err := e.Put(key, value); err != nil {
if err == engine.ErrEngineClosed {
fmt.Fprintf(os.Stderr, "Engine closed, stopping benchmark\n")
consecutiveErrors++
if consecutiveErrors >= maxConsecutiveErrors {
goto benchmarkEnd
}
time.Sleep(10 * time.Millisecond)
continue
}
// Handle WAL rotation errors
if strings.Contains(err.Error(), "WAL is rotating") ||
strings.Contains(err.Error(), "WAL is closed") {
walRotationCount++
if walRotationCount%100 == 0 {
fmt.Printf("Retrying due to WAL rotation (%d retries so far)...\n", walRotationCount)
}
time.Sleep(20 * time.Millisecond)
i-- // Retry this key
continue
}
fmt.Fprintf(os.Stderr, "Write error (key #%d): %v\n", opsCount, err)
consecutiveErrors++
if consecutiveErrors >= maxConsecutiveErrors {
fmt.Fprintf(os.Stderr, "Too many consecutive errors, stopping benchmark\n")
goto benchmarkEnd
}
continue
}
consecutiveErrors = 0 // Reset error counter on successful writes
opsCount++
}
// Use a shorter pause between batches for sequential writes to maximize throughput
time.Sleep(1 * time.Millisecond)
}
benchmarkEnd:
elapsed := time.Since(start)
opsPerSecond := float64(opsCount) / elapsed.Seconds()
mbPerSecond := float64(opsCount) * float64(*valueSize) / (1024 * 1024) / elapsed.Seconds()
var status string
if consecutiveErrors >= maxConsecutiveErrors {
status = "COMPLETED WITH ERRORS (expected during WAL rotation)"
} else {
status = "COMPLETED SUCCESSFULLY"
}
result := fmt.Sprintf("\nSequential Write Benchmark Results:")
result += fmt.Sprintf("\n Status: %s", status)
result += fmt.Sprintf("\n Operations: %d", opsCount)
result += fmt.Sprintf("\n WAL rotation retries: %d", walRotationCount)
result += fmt.Sprintf("\n Data Written: %.2f MB", float64(opsCount)*float64(*valueSize)/(1024*1024))
result += fmt.Sprintf("\n Time: %.2f seconds", elapsed.Seconds())
result += fmt.Sprintf("\n Throughput: %.2f ops/sec", opsPerSecond)
result += fmt.Sprintf("\n Data Throughput: %.2f MB/sec", mbPerSecond)
result += fmt.Sprintf("\n Latency: %.3f µs/op", 1000000.0/opsPerSecond)
result += fmt.Sprintf("\n Note: This benchmark measures maximum sequential write throughput")
return result
}
// runReadBenchmark benchmarks read performance
func runReadBenchmark(e *engine.EngineFacade) string {
fmt.Println("Preparing data for Read Benchmark...")
@@ -328,6 +555,7 @@ benchmarkEnd:
hitRate := float64(hitCount) / float64(opsCount) * 100
result := fmt.Sprintf("\nRead Benchmark Results:")
result += fmt.Sprintf("\n Key Mode: %s", keyMode())
result += fmt.Sprintf("\n Operations: %d", opsCount)
result += fmt.Sprintf("\n Hit Rate: %.2f%%", hitRate)
result += fmt.Sprintf("\n Time: %.2f seconds", elapsed.Seconds())
@@ -337,6 +565,94 @@ benchmarkEnd:
return result
}
// runRandomReadBenchmark benchmarks random read performance
func runRandomReadBenchmark(e *engine.EngineFacade) string {
fmt.Println("Preparing data for Random Read Benchmark...")
// First, write data to read, using random keys
actualNumKeys := *numKeys
if actualNumKeys > 100000 {
actualNumKeys = 100000
fmt.Println("Limiting to 100,000 keys for random read preparation phase")
}
// Prepare both the keys and a value
r := rand.New(rand.NewSource(time.Now().UnixNano()))
keys := make([][]byte, actualNumKeys)
value := make([]byte, 1024) // Use 1KB values for random reads
for i := range value {
value[i] = byte(i % 256)
}
// Write the test data with random keys
for i := 0; i < actualNumKeys; i++ {
keys[i] = []byte(fmt.Sprintf("rand-key-%s-%06d",
strconv.FormatUint(r.Uint64(), 16), i))
if err := e.Put(keys[i], value); err != nil {
if err == engine.ErrEngineClosed {
fmt.Fprintf(os.Stderr, "Engine closed during preparation\n")
return "Random Read Benchmark Failed: Engine closed"
}
fmt.Fprintf(os.Stderr, "Write error during preparation: %v\n", err)
return "Random Read Benchmark Failed: Error preparing data"
}
// Add small pause every 1000 keys
if i > 0 && i%1000 == 0 {
time.Sleep(5 * time.Millisecond)
}
}
// Wait a bit for any compaction to settle
time.Sleep(1 * time.Second)
fmt.Println("Running Random Read Benchmark...")
start := time.Now()
deadline := start.Add(*duration)
var opsCount, hitCount int
readRand := rand.New(rand.NewSource(time.Now().UnixNano()))
for time.Now().Before(deadline) {
// Process in smaller batches
batchSize := 200
for i := 0; i < batchSize && time.Now().Before(deadline); i++ {
// Read a random key from our set
idx := readRand.Intn(actualNumKeys)
key := keys[idx]
val, err := e.Get(key)
if err == engine.ErrEngineClosed {
fmt.Fprintf(os.Stderr, "Engine closed, stopping benchmark\n")
goto benchmarkEnd
}
if err == nil && val != nil {
hitCount++
}
opsCount++
}
// Small pause between batches
time.Sleep(1 * time.Millisecond)
}
benchmarkEnd:
elapsed := time.Since(start)
opsPerSecond := float64(opsCount) / elapsed.Seconds()
hitRate := float64(hitCount) / float64(opsCount) * 100
result := fmt.Sprintf("\nRandom Read Benchmark Results:")
result += fmt.Sprintf("\n Operations: %d", opsCount)
result += fmt.Sprintf("\n Hit Rate: %.2f%%", hitRate)
result += fmt.Sprintf("\n Time: %.2f seconds", elapsed.Seconds())
result += fmt.Sprintf("\n Throughput: %.2f ops/sec", opsPerSecond)
result += fmt.Sprintf("\n Latency: %.3f µs/op", 1000000.0/opsPerSecond)
result += fmt.Sprintf("\n Note: This benchmark specifically tests random key access patterns")
return result
}
// runScanBenchmark benchmarks range scan performance
func runScanBenchmark(e *engine.EngineFacade) string {
fmt.Println("Preparing data for Scan Benchmark...")
@@ -421,13 +737,151 @@ benchmarkEnd:
scansPerSecond := float64(opsCount) / elapsed.Seconds()
entriesPerSecond := float64(entriesScanned) / elapsed.Seconds()
// Calculate data throughput in MB/sec
dataThroughputMB := float64(entriesScanned) * float64(*valueSize) / (1024 * 1024) / elapsed.Seconds()
result := fmt.Sprintf("\nScan Benchmark Results:")
result += fmt.Sprintf("\n Scan Operations: %d", opsCount)
result += fmt.Sprintf("\n Entries Scanned: %d", entriesScanned)
result += fmt.Sprintf("\n Total Data Read: %.2f MB", float64(entriesScanned)*float64(*valueSize)/(1024*1024))
result += fmt.Sprintf("\n Time: %.2f seconds", elapsed.Seconds())
result += fmt.Sprintf("\n Throughput: %.2f scans/sec", scansPerSecond)
result += fmt.Sprintf("\n Scan Throughput: %.2f scans/sec", scansPerSecond)
result += fmt.Sprintf("\n Entry Throughput: %.2f entries/sec", entriesPerSecond)
result += fmt.Sprintf("\n Latency: %.3f ms/scan", 1000.0/scansPerSecond)
result += fmt.Sprintf("\n Data Throughput: %.2f MB/sec", dataThroughputMB)
result += fmt.Sprintf("\n Scan Latency: %.3f ms/scan", 1000.0/scansPerSecond)
result += fmt.Sprintf("\n Entry Latency: %.3f µs/entry", 1000000.0/entriesPerSecond)
return result
}
// runRangeScanBenchmark benchmarks range scan performance with configurable scan size
func runRangeScanBenchmark(e *engine.EngineFacade) string {
fmt.Println("Preparing data for Range Scan Benchmark...")
// First, write data to scan - use sequential keys with fixed prefixes
// to make range scans more representative
actualNumKeys := *numKeys
if actualNumKeys > 100000 {
actualNumKeys = 100000
fmt.Println("Limiting to 100,000 keys for range scan benchmark")
}
// Create keys with prefix that can be used for range scans
// Keys will be organized into buckets for realistic scanning
const BUCKETS = 100
keysPerBucket := actualNumKeys / BUCKETS
value := make([]byte, *valueSize)
for i := range value {
value[i] = byte(i % 256)
}
fmt.Printf("Creating %d buckets with approximately %d keys each...\n",
BUCKETS, keysPerBucket)
for bucket := 0; bucket < BUCKETS; bucket++ {
bucketPrefix := fmt.Sprintf("bucket-%03d:", bucket)
// Create keys within this bucket
for i := 0; i < keysPerBucket; i++ {
key := []byte(fmt.Sprintf("%s%06d", bucketPrefix, i))
if err := e.Put(key, value); err != nil {
if err == engine.ErrEngineClosed {
fmt.Fprintf(os.Stderr, "Engine closed during preparation\n")
return "Range Scan Benchmark Failed: Engine closed"
}
fmt.Fprintf(os.Stderr, "Write error during preparation: %v\n", err)
return "Range Scan Benchmark Failed: Error preparing data"
}
}
// Pause after each bucket to allow for compaction
if bucket > 0 && bucket%10 == 0 {
time.Sleep(100 * time.Millisecond)
fmt.Printf("Created %d/%d buckets...\n", bucket, BUCKETS)
}
}
// Wait a bit for any pending compactions
time.Sleep(1 * time.Second)
fmt.Println("Running Range Scan Benchmark...")
start := time.Now()
deadline := start.Add(*duration)
var opsCount, entriesScanned int
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Use the configured scan size (the -scan-size flag defaults to 100)
scanSize := *scanSize
for time.Now().Before(deadline) {
// Pick a random bucket to scan
bucket := r.Intn(BUCKETS)
bucketPrefix := fmt.Sprintf("bucket-%03d:", bucket)
// Determine scan range - either full bucket or partial depending on scan size
var startKey, endKey []byte
if scanSize >= keysPerBucket {
// Scan whole bucket
startKey = []byte(fmt.Sprintf("%s%06d", bucketPrefix, 0))
endKey = []byte(fmt.Sprintf("%s%06d", bucketPrefix, keysPerBucket))
} else {
// Scan part of the bucket
startIdx := r.Intn(keysPerBucket - scanSize)
endIdx := startIdx + scanSize
startKey = []byte(fmt.Sprintf("%s%06d", bucketPrefix, startIdx))
endKey = []byte(fmt.Sprintf("%s%06d", bucketPrefix, endIdx))
}
iter, err := e.GetRangeIterator(startKey, endKey)
if err != nil {
if err == engine.ErrEngineClosed {
fmt.Fprintf(os.Stderr, "Engine closed, stopping benchmark\n")
goto benchmarkEnd
}
fmt.Fprintf(os.Stderr, "Failed to create iterator: %v\n", err)
continue
}
// Perform the range scan
var scanned int
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
// Access both key and value to simulate real-world usage
_ = iter.Key()
_ = iter.Value()
scanned++
}
entriesScanned += scanned
opsCount++
// Small pause between scans
time.Sleep(5 * time.Millisecond)
}
benchmarkEnd:
elapsed := time.Since(start)
scansPerSecond := float64(opsCount) / elapsed.Seconds()
entriesPerSecond := float64(entriesScanned) / elapsed.Seconds()
avgEntriesPerScan := float64(entriesScanned) / float64(opsCount)
// Calculate data throughput in MB/sec
dataThroughputMB := float64(entriesScanned) * float64(*valueSize) / (1024 * 1024) / elapsed.Seconds()
result := fmt.Sprintf("\nRange Scan Benchmark Results:")
result += fmt.Sprintf("\n Target Scan Size: %d entries", scanSize)
result += fmt.Sprintf("\n Scan Operations: %d", opsCount)
result += fmt.Sprintf("\n Total Entries Scanned: %d", entriesScanned)
result += fmt.Sprintf("\n Avg Entries Per Scan: %.1f", avgEntriesPerScan)
result += fmt.Sprintf("\n Total Data Read: %.2f MB", float64(entriesScanned)*float64(*valueSize)/(1024*1024))
result += fmt.Sprintf("\n Time: %.2f seconds", elapsed.Seconds())
result += fmt.Sprintf("\n Scan Throughput: %.2f scans/sec", scansPerSecond)
result += fmt.Sprintf("\n Entry Throughput: %.2f entries/sec", entriesPerSecond)
result += fmt.Sprintf("\n Data Throughput: %.2f MB/sec", dataThroughputMB)
result += fmt.Sprintf("\n Scan Latency: %.3f ms/scan", 1000.0/scansPerSecond)
result += fmt.Sprintf("\n Entry Latency: %.3f µs/entry", 1000000.0/entriesPerSecond)
return result
}
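// The WAL-rotation handling above is duplicated verbatim in both write
// benchmarks. One way to factor it out is a shared helper along these lines
// (putWithRetry is hypothetical, not part of this commit):
func putWithRetry(e *engine.EngineFacade, key, value []byte, rotations *int) error {
    const maxRetries = 1000
    for attempt := 0; attempt < maxRetries; attempt++ {
        err := e.Put(key, value)
        if err == nil {
            return nil
        }
        // Mirror the benchmarks' retry condition: back off while the WAL
        // rotates, and surface every other error to the caller.
        if strings.Contains(err.Error(), "WAL is rotating") ||
            strings.Contains(err.Error(), "WAL is closed") {
            *rotations++
            time.Sleep(20 * time.Millisecond)
            continue
        }
        return err
    }
    return fmt.Errorf("put retried %d times during WAL rotation", maxRetries)
}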

View File

@@ -4,6 +4,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync/atomic"
)
const (
@@ -133,7 +134,7 @@ func (b *Batch) Write(w *WAL) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
if atomic.LoadInt32(&w.closed) == 1 {
return ErrWALClosed
}

View File

@@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/KevoDB/kevo/pkg/config"
@@ -78,6 +79,7 @@ type WAL struct {
lastSync time.Time
batchByteSize int64
status int32 // Using atomic int32 for status flags
closed int32 // Atomic flag indicating if WAL is closed
mu sync.Mutex
}
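
// The fix above adds the closed field and reads it atomically where
// Batch.Write previously treated it as a bool. For the pattern to be
// complete, the close path has to store the flag atomically as well;
// a minimal sketch, assuming a Close method shaped like this (the actual
// method is not shown in the diff):
func (w *WAL) Close() error {
    w.mu.Lock()
    defer w.mu.Unlock()

    // Publish the closed state before tearing anything down so that
    // concurrent writers checking the flag fail fast with ErrWALClosed.
    atomic.StoreInt32(&w.closed, 1)
    return nil
}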