perf: added bloom filters for sstables to improve performance
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Failing after 5m2s

This commit is contained in:
Jeremy Tregunna 2025-04-23 19:04:10 -06:00
parent 2e3c70e913
commit d5a90cf2e4
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
9 changed files with 1596 additions and 48 deletions

View File

@ -0,0 +1,228 @@
package bloomfilter
import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
	"io"
	"math"
	"os"
	"sync"
)
// BloomFilter is a probabilistic data structure that is used to test whether an element
// is a member of a set. False positives are possible, but false negatives are not.
//
// All methods that touch the bit array or the insertion counter take mu,
// so a single BloomFilter may be shared by concurrent goroutines.
type BloomFilter struct {
	mu         sync.RWMutex
	bits       []byte // The bit array
	size       uint64 // The size of the bit array in bits
	hashFuncs  uint64 // The number of hash functions
	expectedN  uint64 // The expected number of elements
	insertions uint64 // The number of elements inserted
}
// NewBloomFilter creates a new Bloom filter with the given parameters.
// falsePositiveRate: The desired false positive rate (e.g., 0.01 for 1%)
// expectedElements: The expected number of elements to be inserted
//
// Degenerate inputs (expectedElements == 0, or a rate outside (0, 1)) are
// clamped to safe values: without the clamp, calculateOptimalSize could
// return 0, and hash() would then panic on `position % 0`.
func NewBloomFilter(falsePositiveRate float64, expectedElements uint64) *BloomFilter {
	// Clamp degenerate parameters so the filter is always usable.
	if expectedElements == 0 {
		expectedElements = 1
	}
	if falsePositiveRate <= 0 || falsePositiveRate >= 1 {
		falsePositiveRate = 0.01
	}

	// Calculate optimal size and number of hash functions
	size := calculateOptimalSize(expectedElements, falsePositiveRate)
	if size == 0 {
		size = 1 // never allow an empty bit array (hash() does % size)
	}
	hashFuncs := calculateOptimalHashFuncs(size, expectedElements)

	// Allocate the bit array (divide by 8 to convert bits to bytes)
	bits := make([]byte, (size+7)/8)

	return &BloomFilter{
		bits:       bits,
		size:       size,
		hashFuncs:  hashFuncs,
		expectedN:  expectedElements,
		insertions: 0,
	}
}
// Add inserts an element into the Bloom filter.
func (bf *BloomFilter) Add(key []byte) {
	bf.mu.Lock()
	defer bf.mu.Unlock()

	// Set the bit chosen by each of the k derived hash functions.
	for fn := uint64(0); fn < bf.hashFuncs; fn++ {
		bf.setBit(bf.hash(key, fn))
	}
	bf.insertions++
}
// Contains checks if an element might be in the Bloom filter.
// Returns true if the element might be in the set (could be a false positive).
// Returns false if the element is definitely not in the set.
func (bf *BloomFilter) Contains(key []byte) bool {
	bf.mu.RLock()
	defer bf.mu.RUnlock()

	// A single unset probe bit is definitive proof of absence.
	for fn := uint64(0); fn < bf.hashFuncs; fn++ {
		if !bf.testBit(bf.hash(key, fn)) {
			return false
		}
	}

	// Every probed bit was set, so the element is possibly present.
	return true
}
// Clear removes all elements from the Bloom filter.
func (bf *BloomFilter) Clear() {
	bf.mu.Lock()
	defer bf.mu.Unlock()

	// Zero the bit array in place and reset the insertion counter.
	clear(bf.bits)
	bf.insertions = 0
}
// EstimatedFalsePositiveRate returns the estimated false positive rate
// based on the current number of insertions.
//
// With k hash functions, m bits, and n insertions, the standard estimate
// is (1 - e^(-k*n/m))^k.
func (bf *BloomFilter) EstimatedFalsePositiveRate() float64 {
	bf.mu.RLock()
	defer bf.mu.RUnlock()

	k := float64(bf.hashFuncs)
	// Probability that any single bit is set after n insertions.
	bitSetProb := 1 - math.Exp(-k*float64(bf.insertions)/float64(bf.size))
	return math.Pow(bitSetProb, k)
}
// SaveToFile saves the Bloom filter to a file.
//
// On-disk layout: a 32-byte little-endian header (size, hashFuncs,
// expectedN, insertions) followed by the raw bit array.
//
// The file's Close error is returned to the caller instead of being
// discarded by a defer: for a persistence routine, a failed flush on
// close would otherwise silently produce a truncated filter file.
func (bf *BloomFilter) SaveToFile(filePath string) error {
	bf.mu.RLock()
	defer bf.mu.RUnlock()

	file, err := os.Create(filePath)
	if err != nil {
		return err
	}

	// Write header: size, hash functions, expected elements, insertions
	header := make([]byte, 32)
	binary.LittleEndian.PutUint64(header[0:8], bf.size)
	binary.LittleEndian.PutUint64(header[8:16], bf.hashFuncs)
	binary.LittleEndian.PutUint64(header[16:24], bf.expectedN)
	binary.LittleEndian.PutUint64(header[24:32], bf.insertions)

	// Write header
	if _, err := file.Write(header); err != nil {
		file.Close() // best effort; the write error takes precedence
		return err
	}

	// Write bit array
	if _, err := file.Write(bf.bits); err != nil {
		file.Close() // best effort; the write error takes precedence
		return err
	}

	// Close may surface deferred write errors; report them to the caller.
	return file.Close()
}
// LoadBloomFilter loads a Bloom filter from a file previously written by
// SaveToFile.
//
// io.ReadFull is used for both the header and the bit array: a plain
// file.Read is permitted to return fewer bytes than requested, which would
// silently construct a corrupt filter. The header is also validated so a
// zero size or zero hash count (which would later panic in hash() with a
// modulo by zero) is rejected up front.
func LoadBloomFilter(filePath string) (*BloomFilter, error) {
	file, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	// Read header: size, hash functions, expected elements, insertions
	header := make([]byte, 32)
	if _, err := io.ReadFull(file, header); err != nil {
		return nil, err
	}

	size := binary.LittleEndian.Uint64(header[0:8])
	hashFuncs := binary.LittleEndian.Uint64(header[8:16])
	expectedN := binary.LittleEndian.Uint64(header[16:24])
	insertions := binary.LittleEndian.Uint64(header[24:32])

	// Reject headers that would produce an unusable filter.
	if size == 0 || hashFuncs == 0 {
		return nil, fmt.Errorf("invalid bloom filter header in %s: size=%d hashFuncs=%d",
			filePath, size, hashFuncs)
	}

	// Read bit array
	bits := make([]byte, (size+7)/8)
	if _, err := io.ReadFull(file, bits); err != nil {
		return nil, err
	}

	return &BloomFilter{
		bits:       bits,
		size:       size,
		hashFuncs:  hashFuncs,
		expectedN:  expectedN,
		insertions: insertions,
	}, nil
}
// hash generates a hash value for a key using a variant of the FNV hash.
// The index i is mixed into the hashed input (as 8 little-endian bytes)
// so that a single base hash yields hashFuncs independent probe positions.
func (bf *BloomFilter) hash(key []byte, i uint64) uint64 {
	// Encode i exactly as the little-endian byte sequence of the index.
	var seed [8]byte
	binary.LittleEndian.PutUint64(seed[:], i)

	// FNV-1a over key || seed.
	h := fnv.New64a()
	h.Write(key)
	h.Write(seed[:])

	// Map the 64-bit digest onto a bit position.
	return h.Sum64() % bf.size
}
// setBit sets the bit at the specified position.
func (bf *BloomFilter) setBit(position uint64) {
	// position>>3 selects the byte, position&7 the bit within it.
	bf.bits[position>>3] |= 1 << (position & 7)
}
// testBit checks if the bit at the specified position is set.
func (bf *BloomFilter) testBit(position uint64) bool {
	// position>>3 selects the byte, position&7 the bit within it.
	return bf.bits[position>>3]&(1<<(position&7)) != 0
}
// Size returns the size of the Bloom filter in bits.
// size is written only at construction time, so no lock is taken here.
func (bf *BloomFilter) Size() uint64 {
	return bf.size
}
// HashFunctions returns the number of hash functions used.
// hashFuncs is written only at construction time, so no lock is taken here.
func (bf *BloomFilter) HashFunctions() uint64 {
	return bf.hashFuncs
}
// Insertions returns the number of elements inserted.
func (bf *BloomFilter) Insertions() uint64 {
	bf.mu.RLock()
	n := bf.insertions
	bf.mu.RUnlock()
	return n
}
// calculateOptimalSize calculates the optimal size of the bit array
// based on the expected number of elements n and the desired false
// positive rate p, using m = -n*ln(p) / (ln 2)^2.
//
// The result is clamped to at least 1 bit: n == 0 or p outside (0, 1)
// would otherwise yield 0, Inf, or NaN, and a zero-size filter panics
// in hash() with a modulo by zero.
func calculateOptimalSize(n uint64, p float64) uint64 {
	if n == 0 || p <= 0 || p >= 1 {
		return 1
	}
	// m = -n*ln(p) / (ln(2)^2)
	m := -(float64(n) * math.Log(p)) / (math.Log(2) * math.Log(2))
	if math.IsNaN(m) || math.IsInf(m, 1) || m < 1 {
		return 1
	}
	return uint64(math.Ceil(m))
}
// calculateOptimalHashFuncs calculates the optimal number of hash functions
// based on the size of the bit array m and the expected number of elements
// n, using k = (m/n) * ln(2). The result is always at least 1.
//
// n == 0 is guarded explicitly: the division would produce +Inf, and
// converting +Inf to uint64 is undefined behavior in Go.
func calculateOptimalHashFuncs(m, n uint64) uint64 {
	if n == 0 {
		return 1
	}
	// k = (m/n) * ln(2)
	k := float64(m) / float64(n) * math.Log(2)
	if math.IsNaN(k) || math.IsInf(k, 1) || k < 1 {
		return 1
	}
	return uint64(math.Round(k))
}

View File

@ -0,0 +1,228 @@
package bloomfilter
import (
"fmt"
"os"
"path/filepath"
"testing"
)
// TestBloomFilterBasic verifies membership answers for a handful of
// inserted keys and one key that was never added.
func TestBloomFilterBasic(t *testing.T) {
	// 1% false positive rate, 1000 expected elements.
	bf := NewBloomFilter(0.01, 1000)

	// Insert three keys.
	for _, fruit := range []string{"apple", "banana", "cherry"} {
		bf.Add([]byte(fruit))
	}

	// Inserted keys must always be reported as present (no false negatives).
	if !bf.Contains([]byte("apple")) {
		t.Error("Expected 'apple' to be in the filter")
	}
	if !bf.Contains([]byte("banana")) {
		t.Error("Expected 'banana' to be in the filter")
	}
	if !bf.Contains([]byte("cherry")) {
		t.Error("Expected 'cherry' to be in the filter")
	}

	// A key that was never added should be reported absent.
	if bf.Contains([]byte("durian")) {
		t.Error("Did not expect 'durian' to be in the filter")
	}
}
// TestBloomFilterFalsePositiveRate checks that both the analytically
// estimated and the empirically measured false positive rates stay within
// a tolerant band around each configured target rate.
func TestBloomFilterFalsePositiveRate(t *testing.T) {
	// Create Bloom filters with different false positive rates
	testCases := []struct {
		fpr      float64 // Target false positive rate
		elements uint64  // Number of elements to insert
	}{
		{0.01, 1000},  // 1% false positive rate
		{0.001, 1000}, // 0.1% false positive rate
		{0.1, 1000},   // 10% false positive rate
	}
	for _, tc := range testCases {
		t.Run(fmt.Sprintf("FPR=%.3f", tc.fpr), func(t *testing.T) {
			bf := NewBloomFilter(tc.fpr, tc.elements)
			// Insert the expected number of elements
			for i := uint64(0); i < tc.elements; i++ {
				key := []byte(fmt.Sprintf("key-%d", i))
				bf.Add(key)
			}
			// Estimated FPR should be close to the target
			estimatedFPR := bf.EstimatedFalsePositiveRate()
			t.Logf("Target FPR: %.6f, Estimated FPR: %.6f", tc.fpr, estimatedFPR)
			// The estimated FPR should be within a reasonable range of the
			// target — a factor of three either way, since the sizing math
			// rounds m and k to integers.
			if estimatedFPR > tc.fpr*3 || estimatedFPR < tc.fpr/3 {
				t.Errorf("Estimated FPR (%.6f) is not within expected range of target FPR (%.6f)",
					estimatedFPR, tc.fpr)
			}
			// Test negative lookups to estimate actual false positive rate.
			// Keys offset past tc.elements were never inserted.
			falsePositives := 0
			testCount := 10000
			for i := 0; i < testCount; i++ {
				key := []byte(fmt.Sprintf("non-existent-key-%d", i+int(tc.elements)))
				if bf.Contains(key) {
					falsePositives++
				}
			}
			actualFPR := float64(falsePositives) / float64(testCount)
			t.Logf("Actual FPR from tests: %.6f", actualFPR)
			// Actual FPR should be close to the target as well
			// But this is probabilistic, so be generous with the range
			if actualFPR > tc.fpr*100 {
				t.Errorf("Actual FPR (%.6f) is significantly higher than target FPR (%.6f)",
					actualFPR, tc.fpr)
			}
		})
	}
}
// TestBloomFilterClear verifies that Clear empties the filter and resets
// the insertion counter.
func TestBloomFilterClear(t *testing.T) {
	bf := NewBloomFilter(0.01, 1000)

	// Populate with two keys and confirm they are visible.
	bf.Add([]byte("apple"))
	bf.Add([]byte("banana"))
	if !bf.Contains([]byte("apple")) || !bf.Contains([]byte("banana")) {
		t.Fatal("Elements not added to filter correctly")
	}

	bf.Clear()

	// After Clear, neither key may be reported present.
	if bf.Contains([]byte("apple")) || bf.Contains([]byte("banana")) {
		t.Error("Filter was not cleared correctly")
	}

	// The insertion counter must also be reset.
	if n := bf.Insertions(); n != 0 {
		t.Errorf("Insertion count should be 0 after clear, got %d", n)
	}
}
// TestBloomFilterSaveLoad round-trips a populated filter through
// SaveToFile/LoadBloomFilter and verifies that the metadata and the
// membership answers survive serialization.
func TestBloomFilterSaveLoad(t *testing.T) {
	// Create a temporary directory for test files
	tempDir, err := os.MkdirTemp("", "bloom_filter_test")
	if err != nil {
		t.Fatalf("Failed to create temp directory: %v", err)
	}
	defer os.RemoveAll(tempDir)
	filterPath := filepath.Join(tempDir, "test_filter.bloom")
	// Create and populate a filter
	bf := NewBloomFilter(0.01, 1000)
	testKeys := []string{"apple", "banana", "cherry", "date", "elderberry"}
	for _, key := range testKeys {
		bf.Add([]byte(key))
	}
	// Save to file
	err = bf.SaveToFile(filterPath)
	if err != nil {
		t.Fatalf("Failed to save filter: %v", err)
	}
	// Load from file
	loadedBF, err := LoadBloomFilter(filterPath)
	if err != nil {
		t.Fatalf("Failed to load filter: %v", err)
	}
	// Verify properties: size, hash-function count, and insertion count
	// must all match the original filter.
	if loadedBF.Size() != bf.Size() {
		t.Errorf("Size mismatch: expected %d, got %d", bf.Size(), loadedBF.Size())
	}
	if loadedBF.HashFunctions() != bf.HashFunctions() {
		t.Errorf("Hash function count mismatch: expected %d, got %d",
			bf.HashFunctions(), loadedBF.HashFunctions())
	}
	if loadedBF.Insertions() != bf.Insertions() {
		t.Errorf("Insertion count mismatch: expected %d, got %d",
			bf.Insertions(), loadedBF.Insertions())
	}
	// Verify all test keys are found
	for _, key := range testKeys {
		if !loadedBF.Contains([]byte(key)) {
			t.Errorf("Loaded filter doesn't contain key: %s", key)
		}
	}
	// Verify non-existent key behavior is preserved: both filters must
	// agree, whatever the probabilistic answer happens to be.
	if loadedBF.Contains([]byte("non-existent")) != bf.Contains([]byte("non-existent")) {
		t.Error("Loaded filter behavior differs for non-existent key")
	}
}
// TestBloomFilterPerformance inserts one million keys and then samples a
// thousand of them to confirm the filter never yields a false negative.
func TestBloomFilterPerformance(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping performance test in short mode")
	}

	const total = 1000000
	// 1% FPR, 1 million elements.
	bf := NewBloomFilter(0.01, total)

	// Insert a large number of elements.
	for i := 0; i < total; i++ {
		bf.Add([]byte(fmt.Sprintf("key-%d", i)))
	}

	// Sample 1000 keys; every one must be reported present.
	for i := 0; i < 1000; i++ {
		key := []byte(fmt.Sprintf("key-%d", i))
		if !bf.Contains(key) {
			t.Errorf("False negative: key %s should be in the filter", key)
		}
	}
}
// BenchmarkBloomFilterAdd measures the cost of Add alone.
//
// Keys are pre-generated before ResetTimer: the previous version called
// fmt.Sprintf inside the timed loop, so the benchmark measured string
// formatting and allocation along with the filter insert.
func BenchmarkBloomFilterAdd(b *testing.B) {
	bf := NewBloomFilter(0.01, uint64(b.N))

	// Build all keys up front so only Add is timed.
	keys := make([][]byte, b.N)
	for i := range keys {
		keys[i] = []byte(fmt.Sprintf("key-%d", i))
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		bf.Add(keys[i])
	}
}
// BenchmarkBloomFilterContains measures the cost of Contains alone,
// alternating between keys that are present and keys that are absent.
//
// Lookup keys are pre-generated before ResetTimer: the previous version
// called fmt.Sprintf inside the timed loop, so the benchmark measured
// string formatting and allocation along with the membership check.
func BenchmarkBloomFilterContains(b *testing.B) {
	const numKeys = 1000000
	bf := NewBloomFilter(0.01, numKeys)

	// Insert some elements first.
	for i := 0; i < numKeys; i++ {
		bf.Add([]byte(fmt.Sprintf("key-%d", i)))
	}

	// Pre-generate the lookup keys: even indices exist, odd ones do not.
	keys := make([][]byte, b.N)
	for i := range keys {
		if i%2 == 0 {
			keys[i] = []byte(fmt.Sprintf("key-%d", i%numKeys))
		} else {
			keys[i] = []byte(fmt.Sprintf("non-existent-%d", i))
		}
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		bf.Contains(keys[i])
	}
}

View File

@ -0,0 +1,113 @@
package sstable
import (
"fmt"
"os"
"path/filepath"
"testing"
)
// BenchmarkBloomFilterGet measures SSTable point lookups with the bloom
// filter disabled vs. enabled. Odd iterations probe keys known to be
// absent — exactly the case a bloom filter can short-circuit.
func BenchmarkBloomFilterGet(b *testing.B) {
	// Test with and without bloom filters
	for _, enableBloomFilter := range []bool{false, true} {
		name := "WithoutBloomFilter"
		if enableBloomFilter {
			name = "WithBloomFilter"
		}
		b.Run(name, func(b *testing.B) {
			// Create temporary directory for the test
			tmpDir, err := os.MkdirTemp("", "sstable_bloom_benchmark")
			if err != nil {
				b.Fatalf("Failed to create temp dir: %v", err)
			}
			defer os.RemoveAll(tmpDir)
			// Create SSTable file path
			tablePath := filepath.Join(tmpDir, fmt.Sprintf("bench_%s.sst", name))
			// Create writer with or without bloom filters
			options := DefaultWriterOptions()
			options.EnableBloomFilter = enableBloomFilter
			writer, err := NewWriterWithOptions(tablePath, options)
			if err != nil {
				b.Fatalf("Failed to create writer: %v", err)
			}
			// Insert some known keys
			// Use fewer keys for faster benchmarking
			const numKeys = 1000
			// Create sorted keys (SSTable requires sorted keys)
			keys := make([]string, numKeys)
			for i := 0; i < numKeys; i++ {
				keys[i] = fmt.Sprintf("key%08d", i)
			}
			// Add them to the SSTable
			for _, key := range keys {
				value := []byte(fmt.Sprintf("val-%s", key))
				if err := writer.Add([]byte(key), value); err != nil {
					b.Fatalf("Failed to add key %s: %v", key, err)
				}
			}
			// Finish writing
			if err := writer.Finish(); err != nil {
				b.Fatalf("Failed to finish writer: %v", err)
			}
			// Open reader
			reader, err := OpenReader(tablePath)
			if err != nil {
				b.Fatalf("Failed to open reader: %v", err)
			}
			defer reader.Close()
			// Test a few specific lookups to ensure the table was written correctly
			for i := 0; i < 5; i++ {
				testKey := []byte(fmt.Sprintf("key%08d", i))
				expectedValue := []byte(fmt.Sprintf("val-key%08d", i))
				val, err := reader.Get(testKey)
				if err != nil {
					b.Fatalf("Verification failed: couldn't find key %s: %v", testKey, err)
				}
				if string(val) != string(expectedValue) {
					b.Fatalf("Value mismatch for key %s: got %q, expected %q", testKey, val, expectedValue)
				}
				b.Logf("Successfully verified key: %s", testKey)
			}
			// Reset timer for the benchmark (all setup above is excluded)
			b.ResetTimer()
			// Run benchmark - alternate between existing and non-existing keys
			for i := 0; i < b.N; i++ {
				var key []byte
				if i%2 == 0 {
					// Existing key
					keyIdx := i % numKeys
					key = []byte(fmt.Sprintf("key%08d", keyIdx))
					// Should find this key
					_, err := reader.Get(key)
					if err != nil {
						b.Fatalf("Failed to find existing key %s: %v", key, err)
					}
				} else {
					// Non-existing key - this is where bloom filters really help
					key = []byte(fmt.Sprintf("nonexistent%08d", i))
					// Should not find this key
					_, err := reader.Get(key)
					if err != ErrNotFound {
						b.Fatalf("Expected ErrNotFound for key %s, got: %v", key, err)
					}
				}
			}
		})
	}
}

548
pkg/sstable/bench_test.go Normal file
View File

@ -0,0 +1,548 @@
package sstable
import (
"fmt"
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
)
// keyForIndex generates a consistent key format for a given index.
func keyForIndex(idx int) []byte {
	// fmt.Appendf formats straight into a fresh byte slice, avoiding the
	// intermediate string of Sprintf + []byte conversion.
	return fmt.Appendf(nil, "key-%08d", idx)
}
// setupBenchmarkSSTable creates a temporary SSTable file with benchmark data.
// It returns the path of the finished table; the caller is responsible for
// removing the containing directory (see cleanup).
func setupBenchmarkSSTable(b *testing.B, numEntries int) (string, error) {
	// Create a temporary directory to hold the table.
	dir, err := os.MkdirTemp("", "sstable-bench-*")
	if err != nil {
		return "", fmt.Errorf("failed to create temp dir: %w", err)
	}

	// fail drops the partially written directory on any later error.
	fail := func(werr error) (string, error) {
		os.RemoveAll(dir)
		return "", werr
	}

	path := filepath.Join(dir, "benchmark.sst")
	writer, err := NewWriter(path)
	if err != nil {
		return fail(fmt.Errorf("failed to create SSTable writer: %w", err))
	}

	// Add entries in sorted key order.
	for i := 0; i < numEntries; i++ {
		value := []byte(fmt.Sprintf("value%08d", i))
		if err := writer.Add(keyForIndex(i), value); err != nil {
			writer.Finish() // Ignore error here as we're already in an error path
			return fail(fmt.Errorf("failed to add entry: %w", err))
		}
	}

	// Finalize the SSTable.
	if err := writer.Finish(); err != nil {
		return fail(fmt.Errorf("failed to finalize SSTable: %w", err))
	}
	return path, nil
}
// cleanup removes the temporary directory and SSTable file
func cleanup(path string) {
dir := filepath.Dir(path)
os.RemoveAll(dir)
}
// TestSSTableBasicOps round-trips a single key/value pair through a writer
// and a reader to confirm the minimal SSTable path works at all.
func TestSSTableBasicOps(t *testing.T) {
	// Work inside a throwaway directory.
	dir, err := os.MkdirTemp("", "sstable-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "basic.sst")
	writer, err := NewWriter(path)
	if err != nil {
		t.Fatalf("Failed to create SSTable writer: %v", err)
	}

	// Write exactly one entry and finalize.
	key, value := []byte("test-key"), []byte("test-value")
	if err := writer.Add(key, value); err != nil {
		writer.Finish() // Ignore error here as we're already in an error path
		t.Fatalf("Failed to add entry: %v", err)
	}
	if err := writer.Finish(); err != nil {
		t.Fatalf("Failed to finalize SSTable: %v", err)
	}

	// Read the entry back and compare.
	reader, err := OpenReader(path)
	if err != nil {
		t.Fatalf("Failed to open SSTable reader: %v", err)
	}
	defer reader.Close()

	result, err := reader.Get(key)
	if err != nil {
		t.Fatalf("Failed to get key: %v", err)
	}
	if string(result) != string(value) {
		t.Fatalf("Expected value %q, got %q", value, result)
	}
	t.Logf("Basic SSTable operations work correctly!")
}
// BenchmarkSSTableGet benchmarks the Get operation, once without and once
// with bloom filters enabled, alternating between present and absent keys
// so the negative-lookup benefit of the filter is visible.
func BenchmarkSSTableGet(b *testing.B) {
	// Test with and without bloom filters
	for _, enableBloomFilter := range []bool{false, true} {
		name := "WithoutBloomFilter"
		if enableBloomFilter {
			name = "WithBloomFilter"
		}
		b.Run(name, func(b *testing.B) {
			// Create a temporary directory
			dir, err := os.MkdirTemp("", "sstable-bench-*")
			if err != nil {
				b.Fatalf("Failed to create temp dir: %v", err)
			}
			defer os.RemoveAll(dir)
			// Create a temporary SSTable file
			path := filepath.Join(dir, fmt.Sprintf("benchmark_%s.sst", name))
			// Create writer with appropriate options
			options := DefaultWriterOptions()
			options.EnableBloomFilter = enableBloomFilter
			writer, err := NewWriterWithOptions(path, options)
			if err != nil {
				b.Fatalf("Failed to create SSTable writer: %v", err)
			}
			// Add entries (larger number for a more realistic benchmark)
			const numEntries = 10000
			for i := 0; i < numEntries; i++ {
				key := []byte(fmt.Sprintf("key%08d", i))
				value := []byte(fmt.Sprintf("value%08d", i))
				if err := writer.Add(key, value); err != nil {
					b.Fatalf("Failed to add entry: %v", err)
				}
			}
			// Finalize the SSTable
			if err := writer.Finish(); err != nil {
				b.Fatalf("Failed to finalize SSTable: %v", err)
			}
			// Open the SSTable reader
			reader, err := OpenReader(path)
			if err != nil {
				b.Fatalf("Failed to open SSTable reader: %v", err)
			}
			defer reader.Close()
			// Verify that we can read a key before the timed section starts
			testKey := []byte("key00000000")
			_, err = reader.Get(testKey)
			if err != nil {
				b.Fatalf("Failed to get test key %q: %v", testKey, err)
			}
			// Set up keys for benchmarking
			var key []byte
			r := rand.New(rand.NewSource(42)) // Use fixed seed for reproducibility
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				// Alternate between existing and non-existing keys
				// This highlights the benefit of bloom filters for negative lookups
				if i%2 == 0 {
					// Existing key
					idx := r.Intn(numEntries)
					key = []byte(fmt.Sprintf("key%08d", idx))
					// Perform the Get operation
					_, err := reader.Get(key)
					if err != nil {
						b.Fatalf("Failed to get key %s: %v", key, err)
					}
				} else {
					// Non-existing key
					key = []byte(fmt.Sprintf("nonexistent%08d", i))
					// Perform the Get operation (expect not found)
					_, err := reader.Get(key)
					if err != ErrNotFound {
						b.Fatalf("Expected ErrNotFound for key %s, got: %v", key, err)
					}
				}
			}
		})
	}
}
// BenchmarkSSTableIterator benchmarks iterator operations: positioning at
// the first entry, scanning the whole table, and random seeks.
func BenchmarkSSTableIterator(b *testing.B) {
	// Create a temporary directory
	dir, err := os.MkdirTemp("", "sstable-bench-*")
	if err != nil {
		b.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(dir)
	// Create a temporary SSTable file with just 100 entries
	path := filepath.Join(dir, "benchmark.sst")
	writer, err := NewWriter(path)
	if err != nil {
		b.Fatalf("Failed to create SSTable writer: %v", err)
	}
	// Add entries
	const numEntries = 100
	for i := 0; i < numEntries; i++ {
		key := []byte(fmt.Sprintf("key-%05d", i))
		value := []byte(fmt.Sprintf("value-%05d", i))
		if err := writer.Add(key, value); err != nil {
			b.Fatalf("Failed to add entry: %v", err)
		}
	}
	// Finalize the SSTable
	if err := writer.Finish(); err != nil {
		b.Fatalf("Failed to finalize SSTable: %v", err)
	}
	// Open the SSTable reader
	reader, err := OpenReader(path)
	if err != nil {
		b.Fatalf("Failed to open SSTable reader: %v", err)
	}
	defer reader.Close()
	b.ResetTimer()
	b.Run("SeekFirst", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			iter := reader.NewIterator()
			iter.SeekToFirst()
			if !iter.Valid() {
				b.Fatal("Iterator should be valid after SeekToFirst")
			}
		}
	})
	b.Run("FullScan", func(b *testing.B) {
		// Do a test scan to determine the actual number of entries,
		// used below to validate every timed scan sees the same count.
		testIter := reader.NewIterator()
		actualCount := 0
		for testIter.SeekToFirst(); testIter.Valid(); testIter.Next() {
			actualCount++
		}
		for i := 0; i < b.N; i++ {
			// Exclude iterator construction from the timed region.
			b.StopTimer()
			iter := reader.NewIterator()
			b.StartTimer()
			count := 0
			for iter.SeekToFirst(); iter.Valid(); iter.Next() {
				// Just access the key and value to ensure they're loaded
				_ = iter.Key()
				_ = iter.Value()
				count++
			}
			if count != actualCount {
				b.Fatalf("Expected %d entries, got %d", actualCount, count)
			}
		}
	})
	b.Run("RandomSeek", func(b *testing.B) {
		// Use a fixed iterator for all seeks
		iter := reader.NewIterator()
		r := rand.New(rand.NewSource(42)) // fixed seed for reproducibility
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			// Generate a key to seek
			idx := r.Intn(numEntries)
			key := []byte(fmt.Sprintf("key-%05d", idx))
			// Perform the seek
			found := iter.Seek(key)
			if !found || !iter.Valid() {
				b.Fatalf("Failed to seek to key %s", key)
			}
		}
	})
}
// BenchmarkConcurrentSSTableGet benchmarks concurrent Get operations on a
// shared reader via b.RunParallel.
//
// Failures inside the RunParallel body are reported with b.Errorf rather
// than b.Fatalf: the body runs on worker goroutines, and the testing
// package requires FailNow/Fatalf to be called only from the goroutine
// running the benchmark function.
func BenchmarkConcurrentSSTableGet(b *testing.B) {
	// Create a temporary directory.
	dir, err := os.MkdirTemp("", "sstable-bench-*")
	if err != nil {
		b.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(dir)

	// Create a temporary SSTable file with just 100 entries.
	path := filepath.Join(dir, "benchmark.sst")
	writer, err := NewWriter(path)
	if err != nil {
		b.Fatalf("Failed to create SSTable writer: %v", err)
	}
	const numEntries = 100
	for i := 0; i < numEntries; i++ {
		key := []byte(fmt.Sprintf("key-%05d", i))
		value := []byte(fmt.Sprintf("value-%05d", i))
		if err := writer.Add(key, value); err != nil {
			b.Fatalf("Failed to add entry: %v", err)
		}
	}
	if err := writer.Finish(); err != nil {
		b.Fatalf("Failed to finalize SSTable: %v", err)
	}

	// Open the SSTable reader shared by all workers.
	reader, err := OpenReader(path)
	if err != nil {
		b.Fatalf("Failed to open SSTable reader: %v", err)
	}
	defer reader.Close()

	// Sanity check before the timed section.
	testKey := []byte("key-00000")
	if _, err = reader.Get(testKey); err != nil {
		b.Fatalf("Failed to get test key %q: %v", testKey, err)
	}

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		// Each goroutine gets its own random number generator.
		r := rand.New(rand.NewSource(rand.Int63()))
		for pb.Next() {
			// Use a random key from our range.
			key := []byte(fmt.Sprintf("key-%05d", r.Intn(numEntries)))
			if _, err := reader.Get(key); err != nil {
				// Errorf is safe from any goroutine; Fatalf is not.
				b.Errorf("Failed to get key %s: %v", key, err)
				return
			}
		}
	})
}
// BenchmarkConcurrentSSTableIterators benchmarks concurrent iterators; each
// goroutine randomly mixes seeks, restarts, and short forward scans on its
// own iterator over a shared reader.
func BenchmarkConcurrentSSTableIterators(b *testing.B) {
	// Use a smaller number of entries to make test setup faster
	const numEntries = 10000
	path, err := setupBenchmarkSSTable(b, numEntries)
	if err != nil {
		b.Fatalf("Failed to set up benchmark SSTable: %v", err)
	}
	defer cleanup(path)
	// Open the SSTable reader
	reader, err := OpenReader(path)
	if err != nil {
		b.Fatalf("Failed to open SSTable reader: %v", err)
	}
	defer reader.Close()
	// Create a pool of random keys for seeking (read-only, shared by all
	// worker goroutines)
	seekKeys := make([][]byte, 1000)
	r := rand.New(rand.NewSource(42))
	for i := 0; i < len(seekKeys); i++ {
		idx := r.Intn(numEntries)
		seekKeys[i] = keyForIndex(idx)
	}
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		// Each goroutine gets its own iterator and random number generator
		iter := reader.NewIterator()
		localRand := rand.New(rand.NewSource(rand.Int63()))
		for pb.Next() {
			// Choose a random operation type:
			// 0 = Seek, 1 = SeekToFirst, 2 = Next
			op := localRand.Intn(3)
			switch op {
			case 0:
				// Random seek
				keyIdx := localRand.Intn(len(seekKeys))
				found := iter.Seek(seekKeys[keyIdx])
				if !found {
					// It's okay if we occasionally don't find a key
					// (e.g., if we seek past the end)
					continue
				}
				// Access the key/value to ensure they're loaded
				if iter.Valid() {
					_ = iter.Key()
					_ = iter.Value()
				}
			case 1:
				// Seek to first and read a few entries
				iter.SeekToFirst()
				if iter.Valid() {
					_ = iter.Key()
					_ = iter.Value()
					// Read a few more entries
					count := 0
					max := localRand.Intn(10) + 1 // 1-10 entries
					for count < max && iter.Next() {
						_ = iter.Key()
						_ = iter.Value()
						count++
					}
				}
			case 2:
				// If we have a valid position, move to next
				if iter.Valid() {
					iter.Next()
					if iter.Valid() {
						_ = iter.Key()
						_ = iter.Value()
					}
				} else {
					// If not valid, seek to first
					iter.SeekToFirst()
					if iter.Valid() {
						_ = iter.Key()
						_ = iter.Value()
					}
				}
			}
		}
	})
}
// BenchmarkMultipleSSTableReaders benchmarks operations with multiple
// SSTable readers, both serially and with a pool of worker goroutines.
//
// Two fixes over the previous version:
//  1. The lookup pool was sized with the PARENT benchmark's b.N, but
//     indexed with the SUB-benchmark's (independent, typically much
//     larger) b.N inside b.Run — with sub-benchmarks the parent body runs
//     with b.N == 1, so this panicked with an index out of range. The
//     pool now has a fixed size and is indexed modulo its length.
//  2. Worker goroutines called b.Fatalf; the testing package requires
//     FailNow/Fatalf to run on the benchmark goroutine, so workers now
//     use b.Errorf (goroutine-safe) and stop.
func BenchmarkMultipleSSTableReaders(b *testing.B) {
	const numSSTables = 5
	const entriesPerTable = 5000

	// Create multiple SSTable files.
	paths := make([]string, numSSTables)
	for i := 0; i < numSSTables; i++ {
		path, err := setupBenchmarkSSTable(b, entriesPerTable)
		if err != nil {
			// Clean up any created files before failing.
			for j := 0; j < i; j++ {
				cleanup(paths[j])
			}
			b.Fatalf("Failed to set up benchmark SSTable %d: %v", i, err)
		}
		paths[i] = path
	}
	// Make sure we clean up all files.
	defer func() {
		for _, path := range paths {
			cleanup(path)
		}
	}()

	// Open readers for all the SSTable files.
	readers := make([]*Reader, numSSTables)
	for i, path := range paths {
		reader, err := OpenReader(path)
		if err != nil {
			b.Fatalf("Failed to open SSTable reader for %s: %v", path, err)
		}
		readers[i] = reader
		defer reader.Close()
	}

	// Pre-generate a fixed pool of random (table, key) lookups; both
	// sub-benchmarks walk it modulo poolSize, so any b.N is safe.
	const poolSize = 4096
	keys := make([][]byte, poolSize)
	tableIdx := make([]int, poolSize)
	r := rand.New(rand.NewSource(42))
	for i := range keys {
		tableIdx[i] = r.Intn(numSSTables)
		keys[i] = keyForIndex(r.Intn(entriesPerTable))
	}

	b.Run("SerialGet", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			j := i % poolSize
			if _, err := readers[tableIdx[j]].Get(keys[j]); err != nil {
				b.Fatalf("Failed to get key %s from SSTable %d: %v",
					keys[j], tableIdx[j], err)
			}
		}
	})

	b.Run("ConcurrentGet", func(b *testing.B) {
		var wg sync.WaitGroup
		// Use 10 goroutines, splitting b.N between them.
		numWorkers := 10
		batchSize := b.N / numWorkers
		if batchSize == 0 {
			batchSize = 1
		}
		for w := 0; w < numWorkers; w++ {
			wg.Add(1)
			go func(workerID int) {
				defer wg.Done()
				start := workerID * batchSize
				end := (workerID + 1) * batchSize
				if end > b.N {
					end = b.N
				}
				for i := start; i < end; i++ {
					j := i % poolSize
					if _, err := readers[tableIdx[j]].Get(keys[j]); err != nil {
						// Errorf is safe from worker goroutines.
						b.Errorf("Worker %d: Failed to get key %s from SSTable %d: %v",
							workerID, keys[j], tableIdx[j], err)
						return
					}
				}
			}(w)
		}
		wg.Wait()
	})
}

94
pkg/sstable/bloom_test.go Normal file
View File

@ -0,0 +1,94 @@
package sstable
import (
"bytes"
"fmt"
"os"
"path/filepath"
"testing"
)
// TestBasicBloomFilter writes a tiny SSTable with bloom filters enabled and
// verifies positive lookups, negative lookups, and that the reader actually
// loaded a filter.
func TestBasicBloomFilter(t *testing.T) {
	// Work inside a throwaway directory.
	tempDir, err := os.MkdirTemp("", "bloom_test")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tempDir)

	sst := filepath.Join(tempDir, "test_bloom.sst")

	// Writer with bloom filters switched on.
	options := DefaultWriterOptions()
	options.EnableBloomFilter = true
	writer, err := NewWriterWithOptions(sst, options)
	if err != nil {
		t.Fatalf("Failed to create writer: %v", err)
	}

	// Add just a few keys, each mapped to "value-<key>".
	keys := []string{"apple", "banana", "cherry", "date", "elderberry"}
	for _, key := range keys {
		if err := writer.Add([]byte(key), []byte(fmt.Sprintf("value-%s", key))); err != nil {
			t.Fatalf("Failed to add key %s: %v", key, err)
		}
	}
	if err := writer.Finish(); err != nil {
		t.Fatalf("Failed to finish writer: %v", err)
	}

	// Open the reader and confirm the filter was picked up.
	reader, err := OpenReader(sst)
	if err != nil {
		t.Fatalf("Failed to open reader: %v", err)
	}
	defer reader.Close()

	if !reader.hasBloomFilter {
		t.Errorf("Reader does not have bloom filters even though they were enabled")
	}

	// Positive lookups: every inserted key must be found with its value.
	for _, key := range keys {
		expectedValue := []byte(fmt.Sprintf("value-%s", key))
		value, err := reader.Get([]byte(key))
		if err != nil {
			t.Errorf("Failed to find key %s: %v", key, err)
			continue
		}
		if !bytes.Equal(value, expectedValue) {
			t.Errorf("Value mismatch for key %s: got %q, expected %q", key, value, expectedValue)
		} else {
			t.Logf("Successfully found key %s", key)
		}
	}

	// Negative lookups: absent keys must yield ErrNotFound.
	for _, key := range []string{"fig", "grape", "honeydew"} {
		if _, err := reader.Get([]byte(key)); err != ErrNotFound {
			t.Errorf("Expected ErrNotFound for key %s, got: %v", key, err)
		} else {
			t.Logf("Correctly reported key %s as not found", key)
		}
	}
}

View File

@ -11,11 +11,11 @@ import (
const (
// FooterSize is the fixed size of the footer in bytes
FooterSize = 52
FooterSize = 68
// FooterMagic is a magic number to verify we're reading a valid footer
FooterMagic = uint64(0xFACEFEEDFACEFEED)
// CurrentVersion is the current file format version
CurrentVersion = uint32(1)
CurrentVersion = uint32(2) // Updated for bloom filter support
)
// Footer contains metadata for an SSTable file
@ -36,24 +36,30 @@ type Footer struct {
MinKeyOffset uint32
// Largest key in the file
MaxKeyOffset uint32
// Bloom filter offset (0 if no bloom filter)
BloomFilterOffset uint64
// Bloom filter size (0 if no bloom filter)
BloomFilterSize uint32
// Checksum of all footer fields excluding the checksum itself
Checksum uint64
}
// NewFooter creates a new footer with the given parameters
func NewFooter(indexOffset uint64, indexSize uint32, numEntries uint32,
minKeyOffset, maxKeyOffset uint32) *Footer {
minKeyOffset, maxKeyOffset uint32, bloomFilterOffset uint64, bloomFilterSize uint32) *Footer {
return &Footer{
Magic: FooterMagic,
Version: CurrentVersion,
Timestamp: time.Now().UnixNano(),
IndexOffset: indexOffset,
IndexSize: indexSize,
NumEntries: numEntries,
MinKeyOffset: minKeyOffset,
MaxKeyOffset: maxKeyOffset,
Checksum: 0, // Will be calculated during serialization
Magic: FooterMagic,
Version: CurrentVersion,
Timestamp: time.Now().UnixNano(),
IndexOffset: indexOffset,
IndexSize: indexSize,
NumEntries: numEntries,
MinKeyOffset: minKeyOffset,
MaxKeyOffset: maxKeyOffset,
BloomFilterOffset: bloomFilterOffset,
BloomFilterSize: bloomFilterSize,
Checksum: 0, // Will be calculated during serialization
}
}
@ -70,10 +76,13 @@ func (f *Footer) Encode() []byte {
binary.LittleEndian.PutUint32(result[32:36], f.NumEntries)
binary.LittleEndian.PutUint32(result[36:40], f.MinKeyOffset)
binary.LittleEndian.PutUint32(result[40:44], f.MaxKeyOffset)
binary.LittleEndian.PutUint64(result[44:52], f.BloomFilterOffset)
binary.LittleEndian.PutUint32(result[52:56], f.BloomFilterSize)
// 4 bytes of padding (56:60)
// Calculate checksum of all fields excluding the checksum itself
f.Checksum = xxhash.Sum64(result[:44])
binary.LittleEndian.PutUint64(result[44:], f.Checksum)
f.Checksum = xxhash.Sum64(result[:60])
binary.LittleEndian.PutUint64(result[60:], f.Checksum)
return result
}
@ -101,7 +110,21 @@ func Decode(data []byte) (*Footer, error) {
NumEntries: binary.LittleEndian.Uint32(data[32:36]),
MinKeyOffset: binary.LittleEndian.Uint32(data[36:40]),
MaxKeyOffset: binary.LittleEndian.Uint32(data[40:44]),
Checksum: binary.LittleEndian.Uint64(data[44:]),
}
// Check version to determine how to decode the rest
// Version 1: Original format without bloom filters
// Version 2+: Format with bloom filters
if footer.Version >= 2 {
footer.BloomFilterOffset = binary.LittleEndian.Uint64(data[44:52])
footer.BloomFilterSize = binary.LittleEndian.Uint32(data[52:56])
// 4 bytes of padding (56:60)
footer.Checksum = binary.LittleEndian.Uint64(data[60:])
} else {
// Legacy format without bloom filters
footer.BloomFilterOffset = 0
footer.BloomFilterSize = 0
footer.Checksum = binary.LittleEndian.Uint64(data[44:52])
}
// Verify magic number
@ -110,8 +133,14 @@ func Decode(data []byte) (*Footer, error) {
footer.Magic, FooterMagic)
}
// Verify checksum
expectedChecksum := xxhash.Sum64(data[:44])
// Verify checksum based on version
var expectedChecksum uint64
if footer.Version >= 2 {
expectedChecksum = xxhash.Sum64(data[:60])
} else {
expectedChecksum = xxhash.Sum64(data[:44])
}
if footer.Checksum != expectedChecksum {
return nil, fmt.Errorf("footer checksum mismatch: file has %d, calculated %d",
footer.Checksum, expectedChecksum)

View File

@ -14,6 +14,8 @@ func TestFooterEncodeDecode(t *testing.T) {
1234, // numEntries
100, // minKeyOffset
200, // maxKeyOffset
5000, // bloomFilterOffset
300, // bloomFilterSize
)
// Encode the footer
@ -76,6 +78,8 @@ func TestFooterWriteTo(t *testing.T) {
1234, // numEntries
100, // minKeyOffset
200, // maxKeyOffset
5000, // bloomFilterOffset
300, // bloomFilterSize
)
// Write to a buffer
@ -115,6 +119,8 @@ func TestFooterCorruption(t *testing.T) {
1234, // numEntries
100, // minKeyOffset
200, // maxKeyOffset
5000, // bloomFilterOffset
300, // bloomFilterSize
)
// Encode the footer
@ -150,7 +156,7 @@ func TestFooterCorruption(t *testing.T) {
func TestFooterVersionCheck(t *testing.T) {
// Create a footer with the current version
f := NewFooter(1000, 500, 1234, 100, 200)
f := NewFooter(1000, 500, 1234, 100, 200, 5000, 300)
// Create a modified version
f.Version = 9999

View File

@ -7,6 +7,7 @@ import (
"os"
"sync"
bloomfilter "github.com/KevoDB/kevo/pkg/bloom_filter"
"github.com/KevoDB/kevo/pkg/sstable/block"
"github.com/KevoDB/kevo/pkg/sstable/footer"
)
@ -131,6 +132,55 @@ func ParseBlockLocator(key, value []byte) (BlockLocator, error) {
}, nil
}
// BlockCache is a bounded cache of data-block readers keyed by their
// file offset. NOTE: despite the original "LRU" label, eviction is
// currently random (first map-iteration entry); a proper LRU with a
// linked list could be dropped in later without changing this interface.
type BlockCache struct {
	blocks    map[uint64]*block.Reader // cached readers keyed by block offset
	maxBlocks int                      // capacity before eviction kicks in
	mu        sync.RWMutex             // guards blocks
}

// NewBlockCache creates a new block cache holding at most capacity blocks.
func NewBlockCache(capacity int) *BlockCache {
	return &BlockCache{
		blocks:    make(map[uint64]*block.Reader, capacity),
		maxBlocks: capacity,
	}
}

// Get retrieves the cached reader for the block at the given offset.
// Returns (reader, true) on a hit and (nil, false) on a miss.
func (c *BlockCache) Get(offset uint64) (*block.Reader, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	// Renamed the local from "block" to "br": the old name shadowed the
	// imported block package inside this method body.
	br, found := c.blocks[offset]
	return br, found
}

// Put adds a block reader to the cache, evicting one arbitrary entry
// first if the cache is at capacity.
func (c *BlockCache) Put(offset uint64, br *block.Reader) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if len(c.blocks) >= c.maxBlocks {
		// Evict an arbitrary entry (Go map iteration order is random).
		for k := range c.blocks {
			delete(c.blocks, k)
			break
		}
	}

	c.blocks[offset] = br
}
// BlockBloomFilter associates a bloom filter with a block offset,
// i.e. the file offset of the data block whose keys the filter summarizes.
type BlockBloomFilter struct {
	blockOffset uint64                   // offset of the data block within the SSTable file
	filter      *bloomfilter.BloomFilter // filter over the keys stored in that block
}
// Reader reads an SSTable file
type Reader struct {
ioManager *IOManager
@ -141,6 +191,11 @@ type Reader struct {
indexBlock *block.Reader
ft *footer.Footer
mu sync.RWMutex
// Add block cache
blockCache *BlockCache
// Add bloom filters
bloomFilters []BlockBloomFilter
hasBloomFilter bool
}
// OpenReader opens an SSTable file for reading
@ -188,15 +243,82 @@ func OpenReader(path string) (*Reader, error) {
return nil, fmt.Errorf("failed to create index block reader: %w", err)
}
return &Reader{
ioManager: ioManager,
blockFetcher: blockFetcher,
indexOffset: ft.IndexOffset,
indexSize: ft.IndexSize,
numEntries: ft.NumEntries,
indexBlock: indexBlock,
ft: ft,
}, nil
// Initialize reader with basic fields
reader := &Reader{
ioManager: ioManager,
blockFetcher: blockFetcher,
indexOffset: ft.IndexOffset,
indexSize: ft.IndexSize,
numEntries: ft.NumEntries,
indexBlock: indexBlock,
ft: ft,
blockCache: NewBlockCache(100), // Cache up to 100 blocks by default
bloomFilters: make([]BlockBloomFilter, 0),
hasBloomFilter: ft.BloomFilterOffset > 0 && ft.BloomFilterSize > 0,
}
// Load bloom filters if they exist
if reader.hasBloomFilter {
// Read the bloom filter data
bloomFilterData := make([]byte, ft.BloomFilterSize)
_, err = ioManager.ReadAt(bloomFilterData, int64(ft.BloomFilterOffset))
if err != nil {
ioManager.Close()
return nil, fmt.Errorf("failed to read bloom filter data: %w", err)
}
// Process the bloom filter data
var pos uint32 = 0
for pos < ft.BloomFilterSize {
// Read the block offset and filter size
if pos+12 > ft.BloomFilterSize {
break // Not enough data for header
}
blockOffset := binary.LittleEndian.Uint64(bloomFilterData[pos:pos+8])
filterSize := binary.LittleEndian.Uint32(bloomFilterData[pos+8:pos+12])
pos += 12
// Ensure we have enough data for the filter
if pos+filterSize > ft.BloomFilterSize {
break
}
// Create a temporary file to load the bloom filter
tempFile, err := os.CreateTemp("", "bloom-filter-*.tmp")
if err != nil {
continue // Skip this filter if we can't create temp file
}
tempPath := tempFile.Name()
// Write the bloom filter data to the temp file
_, err = tempFile.Write(bloomFilterData[pos:pos+filterSize])
tempFile.Close()
if err != nil {
os.Remove(tempPath)
continue
}
// Load the bloom filter
filter, err := bloomfilter.LoadBloomFilter(tempPath)
os.Remove(tempPath) // Clean up temp file
if err != nil {
continue // Skip this filter
}
// Add the bloom filter to our list
reader.bloomFilters = append(reader.bloomFilters, BlockBloomFilter{
blockOffset: blockOffset,
filter: filter,
})
// Move to the next filter
pos += filterSize
}
}
return reader, nil
}
// FindBlockForKey finds the block that might contain the given key
@ -265,9 +387,43 @@ func (r *Reader) Get(key []byte) ([]byte, error) {
// Search through each block
for _, locator := range blocks {
blockReader, err := r.blockFetcher.FetchBlock(locator.Offset, locator.Size)
if err != nil {
return nil, err
// Check bloom filter first if available
if r.hasBloomFilter {
// Find the bloom filter for this block
var shouldSkip = true
for _, bf := range r.bloomFilters {
if bf.blockOffset == locator.Offset {
// Found a bloom filter for this block
// If the key might be in this block, we'll check it
if bf.filter.Contains(key) {
shouldSkip = false
}
break
}
}
// If the bloom filter says the key definitely isn't in this block, skip it
if shouldSkip {
continue
}
}
var blockReader *block.Reader
// Try to get the block from cache first
cachedBlock, found := r.blockCache.Get(locator.Offset)
if found {
// Use cached block
blockReader = cachedBlock
} else {
// Block not in cache, fetch from disk
blockReader, err = r.blockFetcher.FetchBlock(locator.Offset, locator.Size)
if err != nil {
return nil, err
}
// Add to cache for future use
r.blockCache.Put(locator.Offset, blockReader)
}
// Search for the key in this block

View File

@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
bloomfilter "github.com/KevoDB/kevo/pkg/bloom_filter"
"github.com/KevoDB/kevo/pkg/sstable/block"
"github.com/KevoDB/kevo/pkg/sstable/footer"
)
@ -173,31 +174,112 @@ func (ib *IndexBuilder) Serialize() ([]byte, error) {
return buf.Bytes(), nil
}
// Writer writes an SSTable file
type Writer struct {
fileManager *FileManager
blockManager *BlockManager
indexBuilder *IndexBuilder
dataOffset uint64
firstKey []byte
lastKey []byte
entriesAdded uint32
// BlockBloomFilterBuilder collects keys for a data block's bloom filter
// while the block is being written, so the finished filter can be emitted
// alongside the block when the table is finalized.
type BlockBloomFilterBuilder struct {
	blockOffset uint64                   // offset of the data block this filter describes
	filter      *bloomfilter.BloomFilter // accumulates the block's keys
}
// NewWriter creates a new SSTable writer
// NewBlockBloomFilterBuilder creates a bloom-filter builder for the data
// block starting at blockOffset, sized for expectedEntries keys.
// A 1% false-positive rate gives a good balance of filter size and accuracy.
func NewBlockBloomFilterBuilder(blockOffset uint64, expectedEntries uint64) *BlockBloomFilterBuilder {
	return &BlockBloomFilterBuilder{
		blockOffset: blockOffset,
		filter:      bloomfilter.NewBloomFilter(0.01, expectedEntries),
	}
}
// AddKey records a key in the block's bloom filter so later reads can
// probe the filter before touching the block on disk.
func (b *BlockBloomFilterBuilder) AddKey(key []byte) {
	b.filter.Add(key)
}
// Serialize returns the bloom filter's on-disk byte representation.
// The filter is round-tripped through a temporary file because the
// bloomfilter package appears to expose only file-based persistence
// (SaveToFile / LoadBloomFilter). NOTE(review): an in-memory
// serialization method on BloomFilter would avoid two disk writes and
// a read per block — worth adding; confirm against the filter package.
func (b *BlockBloomFilterBuilder) Serialize() ([]byte, error) {
	// Create a temporary file to save the filter
	tempFile, err := os.CreateTemp("", "bloom-filter-*.tmp")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp file for bloom filter: %w", err)
	}
	tempPath := tempFile.Name()
	tempFile.Close()
	// Best-effort cleanup on every exit path, success or failure.
	defer os.Remove(tempPath)

	// Save the filter to the temp file
	if err := b.filter.SaveToFile(tempPath); err != nil {
		return nil, fmt.Errorf("failed to save bloom filter: %w", err)
	}

	// Read the file contents back as the serialized form.
	data, err := os.ReadFile(tempPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read bloom filter data: %w", err)
	}

	return data, nil
}
// Writer writes an SSTable file: data blocks, optional per-block bloom
// filters, an index block, and a footer.
type Writer struct {
	fileManager  *FileManager  // owns the underlying output file
	blockManager *BlockManager // accumulates entries for the in-progress block
	indexBuilder *IndexBuilder // builds the index block written at Finish
	dataOffset   uint64        // file offset where the next write will land
	firstKey     []byte        // first key added — presumably the smallest; verify keys arrive sorted
	lastKey      []byte        // most recently added key
	entriesAdded uint32        // total number of entries written
	// Bloom filter support
	bloomFilterEnabled bool                       // whether per-block filters are built
	bloomFilters       []*BlockBloomFilterBuilder // finished filters, one per flushed block
	currentBloomFilter *BlockBloomFilterBuilder   // filter for the block currently being built
}
// WriterOptions configures the SSTable writer.
type WriterOptions struct {
	// EnableBloomFilter enables per-block bloom filters for faster
	// negative lookups (recommended).
	EnableBloomFilter bool
	// ExpectedEntriesPerBlock sizes each block's bloom filter; the closer
	// the estimate, the closer the filter tracks its target false-positive rate.
	ExpectedEntriesPerBlock uint64
}
// DefaultWriterOptions returns the writer configuration used when the
// caller does not supply one: bloom filters enabled, with each filter
// sized for roughly 1000 entries per block.
func DefaultWriterOptions() WriterOptions {
	var opts WriterOptions
	opts.EnableBloomFilter = true
	opts.ExpectedEntriesPerBlock = 1000 // reasonable default for many workloads
	return opts
}
// NewWriter creates a new SSTable writer at path using the defaults from
// DefaultWriterOptions (bloom filters enabled).
func NewWriter(path string) (*Writer, error) {
	return NewWriterWithOptions(path, DefaultWriterOptions())
}
// NewWriterWithOptions creates a new SSTable writer at path with custom
// options. The output file is created immediately; entries are staged via
// Add and the table is finalized by Finish. Returns an error when the
// underlying file cannot be created.
func NewWriterWithOptions(path string, options WriterOptions) (*Writer, error) {
	fileManager, err := NewFileManager(path)
	if err != nil {
		return nil, err
	}

	w := &Writer{
		fileManager:        fileManager,
		blockManager:       NewBlockManager(),
		indexBuilder:       NewIndexBuilder(),
		dataOffset:         0,
		entriesAdded:       0,
		bloomFilterEnabled: options.EnableBloomFilter,
		bloomFilters:       make([]*BlockBloomFilterBuilder, 0),
	}

	// The first data block starts at file offset 0, so its bloom filter
	// is keyed by offset 0 as well.
	if w.bloomFilterEnabled {
		w.currentBloomFilter = NewBlockBloomFilterBuilder(0, options.ExpectedEntriesPerBlock)
	}

	return w, nil
}
// Add adds a key-value pair to the SSTable
@ -209,6 +291,11 @@ func (w *Writer) Add(key, value []byte) error {
}
w.lastKey = append([]byte(nil), key...)
// Add to bloom filter if enabled
if w.bloomFilterEnabled && w.currentBloomFilter != nil {
w.currentBloomFilter.AddKey(key)
}
// Add to block
if err := w.blockManager.Add(key, value); err != nil {
return fmt.Errorf("failed to add to block: %w", err)
@ -274,6 +361,15 @@ func (w *Writer) flushBlock() error {
FirstKey: firstKey,
})
// Finalize the current bloom filter for this block
if w.bloomFilterEnabled && w.currentBloomFilter != nil {
// Store the bloom filter for this block
w.bloomFilters = append(w.bloomFilters, w.currentBloomFilter)
// Create a new bloom filter for the next block
w.currentBloomFilter = NewBlockBloomFilterBuilder(w.dataOffset, DefaultWriterOptions().ExpectedEntriesPerBlock)
}
// Update offset for next block
w.dataOffset += uint64(n)
@ -296,6 +392,51 @@ func (w *Writer) Finish() error {
}
}
// Write bloom filters if enabled
var bloomFilterOffset uint64 = 0
var bloomFilterSize uint32 = 0
if w.bloomFilterEnabled && len(w.bloomFilters) > 0 {
bloomFilterOffset = w.dataOffset
// Write each bloom filter to the file
for _, bf := range w.bloomFilters {
// Serialize the bloom filter
bfData, err := bf.Serialize()
if err != nil {
return fmt.Errorf("failed to serialize bloom filter: %w", err)
}
// First write the block offset and size of this filter
// Format: 8 bytes for offset, 4 bytes for filter size
offsetBytes := make([]byte, 12)
binary.LittleEndian.PutUint64(offsetBytes[:8], bf.blockOffset)
binary.LittleEndian.PutUint32(offsetBytes[8:12], uint32(len(bfData)))
// Write the offset/size header
n, err := w.fileManager.Write(offsetBytes)
if err != nil {
return fmt.Errorf("failed to write bloom filter header: %w", err)
}
if n != len(offsetBytes) {
return fmt.Errorf("wrote incomplete bloom filter header: %d of %d bytes", n, len(offsetBytes))
}
// Write the actual bloom filter data
n, err = w.fileManager.Write(bfData)
if err != nil {
return fmt.Errorf("failed to write bloom filter data: %w", err)
}
if n != len(bfData) {
return fmt.Errorf("wrote incomplete bloom filter data: %d of %d bytes", n, len(bfData))
}
// Update the data offset
w.dataOffset += uint64(len(offsetBytes) + len(bfData))
bloomFilterSize += uint32(len(offsetBytes) + len(bfData))
}
}
// Create index block
indexOffset := w.dataOffset
@ -321,13 +462,18 @@ func (w *Writer) Finish() error {
n, len(indexData))
}
// Create footer
// Update offset after writing index
w.dataOffset += uint64(n)
// Create footer with bloom filter information
ft := footer.NewFooter(
indexOffset,
indexSize,
w.entriesAdded,
0, // MinKeyOffset - not implemented yet
0, // MaxKeyOffset - not implemented yet
bloomFilterOffset,
bloomFilterSize,
)
// Serialize footer