refactor: comprehensive restructuring of storage engine with improved WAL recovery and iterators

Jeremy Tregunna 2025-04-20 12:51:00 -06:00
parent 4e7dea34fe
commit 7231a48d0e
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
20 changed files with 1629 additions and 1317 deletions

.gitignore (vendored, 1 change)

@@ -11,6 +11,7 @@
benchmark-data
# Executables
gs
storage-bench
# Dependency directories


@@ -0,0 +1,149 @@
package compaction
import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"git.canoozie.net/jer/go-storage/pkg/config"
"git.canoozie.net/jer/go-storage/pkg/sstable"
)
// BaseCompactionStrategy provides common functionality for compaction strategies
type BaseCompactionStrategy struct {
// Configuration
cfg *config.Config
// SSTable directory
sstableDir string
// File information by level
levels map[int][]*SSTableInfo
}
// NewBaseCompactionStrategy creates a new base compaction strategy
func NewBaseCompactionStrategy(cfg *config.Config, sstableDir string) *BaseCompactionStrategy {
return &BaseCompactionStrategy{
cfg: cfg,
sstableDir: sstableDir,
levels: make(map[int][]*SSTableInfo),
}
}
// LoadSSTables scans the SSTable directory and loads metadata for all files
func (s *BaseCompactionStrategy) LoadSSTables() error {
// Clear existing data
s.levels = make(map[int][]*SSTableInfo)
// Read all files from the SSTable directory
entries, err := os.ReadDir(s.sstableDir)
if err != nil {
if os.IsNotExist(err) {
return nil // Directory doesn't exist yet
}
return fmt.Errorf("failed to read SSTable directory: %w", err)
}
// Parse filenames and collect information
for _, entry := range entries {
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".sst") {
continue // Skip directories and non-SSTable files
}
// Parse filename to extract level, sequence, and timestamp
// Filename format: level_sequence_timestamp.sst
var level int
var sequence uint64
var timestamp int64
if n, err := fmt.Sscanf(entry.Name(), "%d_%06d_%020d.sst",
&level, &sequence, &timestamp); n != 3 || err != nil {
// Skip files that don't match our naming pattern
continue
}
// Get file info for size
fi, err := entry.Info()
if err != nil {
return fmt.Errorf("failed to get file info for %s: %w", entry.Name(), err)
}
// Open the file to extract key range information
path := filepath.Join(s.sstableDir, entry.Name())
reader, err := sstable.OpenReader(path)
if err != nil {
return fmt.Errorf("failed to open SSTable %s: %w", path, err)
}
// Create iterator to get first and last keys
iter := reader.NewIterator()
var firstKey, lastKey []byte
// Get first key
iter.SeekToFirst()
if iter.Valid() {
firstKey = append([]byte{}, iter.Key()...)
}
// Get last key
iter.SeekToLast()
if iter.Valid() {
lastKey = append([]byte{}, iter.Key()...)
}
// Create SSTable info
info := &SSTableInfo{
Path: path,
Level: level,
Sequence: sequence,
Timestamp: timestamp,
Size: fi.Size(),
KeyCount: reader.GetKeyCount(),
FirstKey: firstKey,
LastKey: lastKey,
Reader: reader,
}
// Add to appropriate level
s.levels[level] = append(s.levels[level], info)
}
// Sort files within each level by sequence number
for level, files := range s.levels {
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
s.levels[level] = files
}
return nil
}
// Close closes all open SSTable readers
func (s *BaseCompactionStrategy) Close() error {
var lastErr error
for _, files := range s.levels {
for _, file := range files {
if file.Reader != nil {
if err := file.Reader.Close(); err != nil && lastErr == nil {
lastErr = err
}
file.Reader = nil
}
}
}
return lastErr
}
// GetLevelSize returns the total size of all files in a level
func (s *BaseCompactionStrategy) GetLevelSize(level int) int64 {
var size int64
for _, file := range s.levels[level] {
size += file.Size
}
return size
}
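A minimal usage sketch of the strategy above (not part of the commit; it assumes cfg is a populated *config.Config and uses only functions defined in this file):

package example

import (
	"fmt"
	"log"

	"git.canoozie.net/jer/go-storage/pkg/compaction"
	"git.canoozie.net/jer/go-storage/pkg/config"
)

// levelSizes loads SSTable metadata from sstDir and reports the size of level 0.
func levelSizes(cfg *config.Config, sstDir string) {
	strategy := compaction.NewBaseCompactionStrategy(cfg, sstDir)

	// LoadSSTables scans for files named like "0_000001_01745175060000000000.sst"
	// (the level_sequence_timestamp.sst pattern) and records each file's key range.
	if err := strategy.LoadSSTables(); err != nil {
		log.Fatalf("load sstables: %v", err)
	}
	defer strategy.Close()

	fmt.Println("L0 bytes:", strategy.GetLevelSize(0))
}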


@@ -3,15 +3,7 @@ package compaction
import (
"bytes"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
"git.canoozie.net/jer/go-storage/pkg/common/iterator"
"git.canoozie.net/jer/go-storage/pkg/common/iterator/composite"
"git.canoozie.net/jer/go-storage/pkg/config"
"git.canoozie.net/jer/go-storage/pkg/sstable"
)
@@ -82,453 +74,3 @@ type CompactionTask struct {
// Output file path template
OutputPathTemplate string
}
// Compactor manages the compaction process
type Compactor struct {
// Configuration
cfg *config.Config
// SSTable directory
sstableDir string
// File information by level
levels map[int][]*SSTableInfo
// Tombstone tracking
tombstoneTracker *TombstoneTracker
}
// NewCompactor creates a new compaction manager
func NewCompactor(cfg *config.Config, sstableDir string, tracker *TombstoneTracker) *Compactor {
return &Compactor{
cfg: cfg,
sstableDir: sstableDir,
levels: make(map[int][]*SSTableInfo),
tombstoneTracker: tracker,
}
}
// LoadSSTables scans the SSTable directory and loads metadata for all files
func (c *Compactor) LoadSSTables() error {
// Clear existing data
c.levels = make(map[int][]*SSTableInfo)
// Read all files from the SSTable directory
entries, err := os.ReadDir(c.sstableDir)
if err != nil {
if os.IsNotExist(err) {
return nil // Directory doesn't exist yet
}
return fmt.Errorf("failed to read SSTable directory: %w", err)
}
// Parse filenames and collect information
for _, entry := range entries {
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".sst") {
continue // Skip directories and non-SSTable files
}
// Parse filename to extract level, sequence, and timestamp
// Filename format: level_sequence_timestamp.sst
var level int
var sequence uint64
var timestamp int64
if n, err := fmt.Sscanf(entry.Name(), "%d_%06d_%020d.sst",
&level, &sequence, &timestamp); n != 3 || err != nil {
// Skip files that don't match our naming pattern
continue
}
// Get file info for size
fi, err := entry.Info()
if err != nil {
return fmt.Errorf("failed to get file info for %s: %w", entry.Name(), err)
}
// Open the file to extract key range information
path := filepath.Join(c.sstableDir, entry.Name())
reader, err := sstable.OpenReader(path)
if err != nil {
return fmt.Errorf("failed to open SSTable %s: %w", path, err)
}
// Create iterator to get first and last keys
iter := reader.NewIterator()
var firstKey, lastKey []byte
// Get first key
iter.SeekToFirst()
if iter.Valid() {
firstKey = append([]byte{}, iter.Key()...)
}
// Get last key
iter.SeekToLast()
if iter.Valid() {
lastKey = append([]byte{}, iter.Key()...)
}
// Create SSTable info
info := &SSTableInfo{
Path: path,
Level: level,
Sequence: sequence,
Timestamp: timestamp,
Size: fi.Size(),
KeyCount: reader.GetKeyCount(),
FirstKey: firstKey,
LastKey: lastKey,
Reader: reader,
}
// Add to appropriate level
c.levels[level] = append(c.levels[level], info)
}
// Sort files within each level by sequence number
for level, files := range c.levels {
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
c.levels[level] = files
}
return nil
}
// Close closes all open SSTable readers
func (c *Compactor) Close() error {
var lastErr error
for _, files := range c.levels {
for _, file := range files {
if file.Reader != nil {
if err := file.Reader.Close(); err != nil && lastErr == nil {
lastErr = err
}
file.Reader = nil
}
}
}
return lastErr
}
// GetLevelSize returns the total size of all files in a level
func (c *Compactor) GetLevelSize(level int) int64 {
var size int64
for _, file := range c.levels[level] {
size += file.Size
}
return size
}
// GetCompactionTask selects files for compaction based on size ratio and overlap
func (c *Compactor) GetCompactionTask() (*CompactionTask, error) {
// No compaction if there are no levels with files
if len(c.levels) < 1 {
return nil, nil
}
// Check if L0 needs compaction (Level 0 is special because files may overlap)
if len(c.levels[0]) >= c.cfg.MaxMemTables {
return c.selectLevel0Compaction()
}
// Check higher levels based on size ratio
maxLevel := 0
for level := range c.levels {
if level > maxLevel {
maxLevel = level
}
}
// Check each level starting from 1
for level := 1; level <= maxLevel; level++ {
// If this level is too large compared to the next level
nextLevelSize := c.GetLevelSize(level + 1)
thisLevelSize := c.GetLevelSize(level)
// If this level is empty, skip it
if thisLevelSize == 0 {
continue
}
// If the next level doesn't exist yet or is empty, create it
if nextLevelSize == 0 {
// Choose one file from this level to move to the next level
if len(c.levels[level]) > 0 {
return c.selectSingleFileCompaction(level)
}
continue
}
// Check if this level exceeds the size ratio threshold
sizeRatio := float64(thisLevelSize) / float64(nextLevelSize)
if sizeRatio >= c.cfg.CompactionRatio {
return c.selectSizeBasedCompaction(level)
}
}
// No compaction needed
return nil, nil
}
// selectLevel0Compaction selects files from L0 for compaction
func (c *Compactor) selectLevel0Compaction() (*CompactionTask, error) {
// Not enough files in L0 for compaction
if len(c.levels[0]) < 2 {
return nil, nil
}
// For L0, take oldest files and compact them to L1
numFiles := len(c.levels[0])
maxCompactFiles := c.cfg.MaxMemTables
if numFiles > maxCompactFiles {
numFiles = maxCompactFiles
}
// Sort L0 files by sequence number to get oldest first
files := make([]*SSTableInfo, len(c.levels[0]))
copy(files, c.levels[0])
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
// Select oldest files for compaction
selectedFiles := files[:numFiles]
// Determine key range of selected files
var minKey, maxKey []byte
for _, file := range selectedFiles {
if len(minKey) == 0 || bytes.Compare(file.FirstKey, minKey) < 0 {
minKey = file.FirstKey
}
if len(maxKey) == 0 || bytes.Compare(file.LastKey, maxKey) > 0 {
maxKey = file.LastKey
}
}
// Find overlapping files in L1
var l1Files []*SSTableInfo
for _, file := range c.levels[1] {
// Create a temporary SSTableInfo for the key range
rangeInfo := &SSTableInfo{
FirstKey: minKey,
LastKey: maxKey,
}
if file.Overlaps(rangeInfo) {
l1Files = append(l1Files, file)
}
}
// Create the compaction task
task := &CompactionTask{
InputFiles: map[int][]*SSTableInfo{
0: selectedFiles,
1: l1Files,
},
TargetLevel: 1,
OutputPathTemplate: filepath.Join(c.sstableDir, "%d_%06d_%020d.sst"),
}
return task, nil
}
// selectSingleFileCompaction selects a single file to compact to the next level
func (c *Compactor) selectSingleFileCompaction(level int) (*CompactionTask, error) {
// Find the oldest file in this level
if len(c.levels[level]) == 0 {
return nil, nil
}
// Sort by sequence number to get the oldest file
files := make([]*SSTableInfo, len(c.levels[level]))
copy(files, c.levels[level])
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
// Select the oldest file
file := files[0]
// Find overlapping files in the next level
var nextLevelFiles []*SSTableInfo
for _, nextFile := range c.levels[level+1] {
if file.Overlaps(nextFile) {
nextLevelFiles = append(nextLevelFiles, nextFile)
}
}
// Create the compaction task
task := &CompactionTask{
InputFiles: map[int][]*SSTableInfo{
level: {file},
level + 1: nextLevelFiles,
},
TargetLevel: level + 1,
OutputPathTemplate: filepath.Join(c.sstableDir, "%d_%06d_%020d.sst"),
}
return task, nil
}
// selectSizeBasedCompaction selects files for compaction based on size ratio
func (c *Compactor) selectSizeBasedCompaction(level int) (*CompactionTask, error) {
// Find a file from this level to compact
if len(c.levels[level]) == 0 {
return nil, nil
}
// Choose a file that hasn't been compacted recently
// For simplicity, just choose the oldest file (lowest sequence number)
files := make([]*SSTableInfo, len(c.levels[level]))
copy(files, c.levels[level])
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
// Select the oldest file
file := files[0]
// Find overlapping files in the next level
var nextLevelFiles []*SSTableInfo
for _, nextFile := range c.levels[level+1] {
if file.Overlaps(nextFile) {
nextLevelFiles = append(nextLevelFiles, nextFile)
}
}
// Create the compaction task
task := &CompactionTask{
InputFiles: map[int][]*SSTableInfo{
level: {file},
level + 1: nextLevelFiles,
},
TargetLevel: level + 1,
OutputPathTemplate: filepath.Join(c.sstableDir, "%d_%06d_%020d.sst"),
}
return task, nil
}
// CompactFiles performs the actual compaction of the input files
func (c *Compactor) CompactFiles(task *CompactionTask) ([]string, error) {
// Create a merged iterator over all input files
var iterators []iterator.Iterator
// Add iterators from both levels
for level := 0; level <= task.TargetLevel; level++ {
for _, file := range task.InputFiles[level] {
// We need an iterator that preserves delete markers
if file.Reader != nil {
iterators = append(iterators, file.Reader.NewIterator())
}
}
}
// Create hierarchical merged iterator
mergedIter := composite.NewHierarchicalIterator(iterators)
// Remember the input file paths for later cleanup
var inputPaths []string
for _, files := range task.InputFiles {
for _, file := range files {
inputPaths = append(inputPaths, file.Path)
}
}
// Track the previous key so older duplicate entries (including tombstone versions) are skipped
var lastKey []byte
var outputFiles []string
var currentWriter *sstable.Writer
var currentOutputPath string
var outputFileSequence uint64 = 1
var entriesInCurrentFile int
// Function to create a new output file
createNewOutputFile := func() error {
if currentWriter != nil {
if err := currentWriter.Finish(); err != nil {
return fmt.Errorf("failed to finish SSTable: %w", err)
}
outputFiles = append(outputFiles, currentOutputPath)
}
// Create a new output file
timestamp := time.Now().UnixNano()
currentOutputPath = fmt.Sprintf(task.OutputPathTemplate,
task.TargetLevel, outputFileSequence, timestamp)
outputFileSequence++
var err error
currentWriter, err = sstable.NewWriter(currentOutputPath)
if err != nil {
return fmt.Errorf("failed to create SSTable writer: %w", err)
}
entriesInCurrentFile = 0
return nil
}
// Create the first output file
if err := createNewOutputFile(); err != nil {
return nil, err
}
// Iterate through all keys in sorted order
mergedIter.SeekToFirst()
for mergedIter.Valid() {
key := mergedIter.Key()
value := mergedIter.Value()
// Skip duplicates (we've already included the newest version)
if lastKey != nil && bytes.Equal(key, lastKey) {
mergedIter.Next()
continue
}
// Add the entry to the current output file, preserving both values and tombstones
// This ensures deletion markers are properly maintained
if err := currentWriter.Add(key, value); err != nil {
return nil, fmt.Errorf("failed to add entry to SSTable: %w", err)
}
entriesInCurrentFile++
// If the current file is big enough, start a new one
if int64(entriesInCurrentFile) >= c.cfg.SSTableMaxSize {
if err := createNewOutputFile(); err != nil {
return nil, err
}
}
// Remember this key to skip duplicates
lastKey = append(lastKey[:0], key...)
mergedIter.Next()
}
// Finish the last output file
if currentWriter != nil && entriesInCurrentFile > 0 {
if err := currentWriter.Finish(); err != nil {
return nil, fmt.Errorf("failed to finish SSTable: %w", err)
}
outputFiles = append(outputFiles, currentOutputPath)
} else if currentWriter != nil {
// No entries were written, abort the file
currentWriter.Abort()
}
return outputFiles, nil
}
// DeleteCompactedFiles removes the input files that were successfully compacted
func (c *Compactor) DeleteCompactedFiles(filePaths []string) error {
for _, path := range filePaths {
if err := os.Remove(path); err != nil {
return fmt.Errorf("failed to delete compacted file %s: %w", path, err)
}
}
return nil
}


@@ -65,6 +65,7 @@ func setupCompactionTest(t *testing.T) (string, *config.Config, func()) {
CompactionThreads: 1,
MaxMemTables: 2,
SSTableMaxSize: 1000,
MaxLevelWithTombstones: 3,
}
// Return cleanup function
@@ -98,26 +99,22 @@ func TestCompactorLoadSSTables(t *testing.T) {
createTestSSTable(t, sstDir, 0, 1, timestamp, data1)
createTestSSTable(t, sstDir, 0, 2, timestamp+1, data2)
// Create the compactor
// Create a tombstone tracker
tracker := NewTombstoneTracker(24 * time.Hour)
// Create the compactor
compactor := NewCompactor(cfg, sstDir, tracker)
// Create the strategy
strategy := NewBaseCompactionStrategy(cfg, sstDir)
// Load SSTables
err := compactor.LoadSSTables()
err := strategy.LoadSSTables()
if err != nil {
t.Fatalf("Failed to load SSTables: %v", err)
}
// Verify the correct number of files was loaded
if len(compactor.levels[0]) != 2 {
t.Errorf("Expected 2 files in level 0, got %d", len(compactor.levels[0]))
if len(strategy.levels[0]) != 2 {
t.Errorf("Expected 2 files in level 0, got %d", len(strategy.levels[0]))
}
// Verify key ranges
for _, file := range compactor.levels[0] {
for _, file := range strategy.levels[0] {
if bytes.Equal(file.FirstKey, []byte("a")) {
if !bytes.Equal(file.LastKey, []byte("c")) {
t.Errorf("Expected last key 'c', got '%s'", string(file.LastKey))
@@ -196,18 +193,18 @@ func TestCompactorSelectLevel0Compaction(t *testing.T) {
// Create the compactor
// Create a tombstone tracker
tracker := NewTombstoneTracker(24 * time.Hour)
executor := NewCompactionExecutor(cfg, sstDir, tracker)
// Create the compactor
compactor := NewTieredCompactor(cfg, sstDir, tracker)
strategy := NewTieredCompactionStrategy(cfg, sstDir, executor)
// Load SSTables
err := compactor.LoadSSTables()
err := strategy.LoadSSTables()
if err != nil {
t.Fatalf("Failed to load SSTables: %v", err)
}
// Select compaction task
task, err := compactor.SelectCompaction()
task, err := strategy.SelectCompaction()
if err != nil {
t.Fatalf("Failed to select compaction: %v", err)
}
@@ -253,14 +250,12 @@ func TestCompactFiles(t *testing.T) {
t.Logf("Created test SSTables: %s, %s", sstPath1, sstPath2)
// Create the compactor
// Create a tombstone tracker
tracker := NewTombstoneTracker(24 * time.Hour)
// Create the compactor
compactor := NewCompactor(cfg, sstDir, tracker)
executor := NewCompactionExecutor(cfg, sstDir, tracker)
strategy := NewBaseCompactionStrategy(cfg, sstDir)
// Load SSTables
err := compactor.LoadSSTables()
err := strategy.LoadSSTables()
if err != nil {
t.Fatalf("Failed to load SSTables: %v", err)
}
@@ -268,15 +263,15 @@
// Create a compaction task
task := &CompactionTask{
InputFiles: map[int][]*SSTableInfo{
0: {compactor.levels[0][0]},
1: {compactor.levels[1][0]},
0: {strategy.levels[0][0]},
1: {strategy.levels[1][0]},
},
TargetLevel: 1,
OutputPathTemplate: filepath.Join(sstDir, "%d_%06d_%020d.sst"),
}
// Perform compaction
outputFiles, err := compactor.CompactFiles(task)
outputFiles, err := executor.CompactFiles(task)
if err != nil {
t.Fatalf("Failed to compact files: %v", err)
}

pkg/compaction/compat.go (new file, 48 lines)

@@ -0,0 +1,48 @@
package compaction
import (
"time"
"git.canoozie.net/jer/go-storage/pkg/config"
)
// NewCompactionManager creates a new compaction manager with the old API
// This is kept for backward compatibility with existing code
func NewCompactionManager(cfg *config.Config, sstableDir string) *DefaultCompactionCoordinator {
// Create tombstone tracker with default 24-hour retention
tombstones := NewTombstoneTracker(24 * time.Hour)
// Create file tracker
fileTracker := NewFileTracker()
// Create compaction executor
executor := NewCompactionExecutor(cfg, sstableDir, tombstones)
// Create tiered compaction strategy
strategy := NewTieredCompactionStrategy(cfg, sstableDir, executor)
// Return the new coordinator
return NewCompactionCoordinator(cfg, sstableDir, CompactionCoordinatorOptions{
Strategy: strategy,
Executor: executor,
FileTracker: fileTracker,
TombstoneManager: tombstones,
CompactionInterval: cfg.CompactionInterval,
})
}
// Temporary alias types for backward compatibility
type CompactionManager = DefaultCompactionCoordinator
type Compactor = BaseCompactionStrategy
type TieredCompactor = TieredCompactionStrategy
// NewCompactor creates a new compactor with the old API (backward compatibility)
func NewCompactor(cfg *config.Config, sstableDir string, tracker *TombstoneTracker) *BaseCompactionStrategy {
return NewBaseCompactionStrategy(cfg, sstableDir)
}
// NewTieredCompactor creates a new tiered compactor with the old API (backward compatibility)
func NewTieredCompactor(cfg *config.Config, sstableDir string, tracker *TombstoneTracker) *TieredCompactionStrategy {
executor := NewCompactionExecutor(cfg, sstableDir, tracker)
return NewTieredCompactionStrategy(cfg, sstableDir, executor)
}
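A sketch of how an existing call site keeps compiling through these aliases (cfg is assumed; the types and constructors are the ones defined above):

package example

import (
	"git.canoozie.net/jer/go-storage/pkg/compaction"
	"git.canoozie.net/jer/go-storage/pkg/config"
)

// startLegacy uses the old entry point; CompactionManager is now a type alias
// for DefaultCompactionCoordinator, so old signatures remain valid.
func startLegacy(cfg *config.Config, sstDir string) (*compaction.CompactionManager, error) {
	mgr := compaction.NewCompactionManager(cfg, sstDir)
	if err := mgr.Start(); err != nil {
		return nil, err
	}
	return mgr, nil // concretely a *DefaultCompactionCoordinator
}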


@@ -0,0 +1,309 @@
package compaction
import (
"fmt"
"sync"
"time"
"git.canoozie.net/jer/go-storage/pkg/config"
)
// CompactionCoordinatorOptions holds configuration options for the coordinator
type CompactionCoordinatorOptions struct {
// Compaction strategy
Strategy CompactionStrategy
// Compaction executor
Executor CompactionExecutor
// File tracker
FileTracker FileTracker
// Tombstone manager
TombstoneManager TombstoneManager
// Compaction interval in seconds
CompactionInterval int64
}
// DefaultCompactionCoordinator is the default implementation of CompactionCoordinator
type DefaultCompactionCoordinator struct {
// Configuration
cfg *config.Config
// SSTable directory
sstableDir string
// Compaction strategy
strategy CompactionStrategy
// Compaction executor
executor CompactionExecutor
// File tracker
fileTracker FileTracker
// Tombstone manager
tombstoneManager TombstoneManager
// Next sequence number for SSTable files
nextSeq uint64
// Compaction state
running bool
stopCh chan struct{}
compactingMu sync.Mutex
// Last set of files produced by compaction
lastCompactionOutputs []string
resultsMu sync.RWMutex
// Compaction interval in seconds
compactionInterval int64
}
// NewCompactionCoordinator creates a new compaction coordinator
func NewCompactionCoordinator(cfg *config.Config, sstableDir string, options CompactionCoordinatorOptions) *DefaultCompactionCoordinator {
// Set defaults for any missing components
if options.FileTracker == nil {
options.FileTracker = NewFileTracker()
}
if options.TombstoneManager == nil {
options.TombstoneManager = NewTombstoneTracker(24 * time.Hour)
}
if options.Executor == nil {
options.Executor = NewCompactionExecutor(cfg, sstableDir, options.TombstoneManager)
}
if options.Strategy == nil {
options.Strategy = NewTieredCompactionStrategy(cfg, sstableDir, options.Executor)
}
if options.CompactionInterval <= 0 {
options.CompactionInterval = 1 // Default to 1 second
}
return &DefaultCompactionCoordinator{
cfg: cfg,
sstableDir: sstableDir,
strategy: options.Strategy,
executor: options.Executor,
fileTracker: options.FileTracker,
tombstoneManager: options.TombstoneManager,
nextSeq: 1,
stopCh: make(chan struct{}),
lastCompactionOutputs: make([]string, 0),
compactionInterval: options.CompactionInterval,
}
}
// Start begins background compaction
func (c *DefaultCompactionCoordinator) Start() error {
c.compactingMu.Lock()
defer c.compactingMu.Unlock()
if c.running {
return nil // Already running
}
// Load existing SSTables
if err := c.strategy.LoadSSTables(); err != nil {
return fmt.Errorf("failed to load SSTables: %w", err)
}
c.running = true
c.stopCh = make(chan struct{})
// Start background worker
go c.compactionWorker()
return nil
}
// Stop halts background compaction
func (c *DefaultCompactionCoordinator) Stop() error {
c.compactingMu.Lock()
defer c.compactingMu.Unlock()
if !c.running {
return nil // Already stopped
}
// Signal the worker to stop
close(c.stopCh)
c.running = false
// Close strategy
return c.strategy.Close()
}
// TrackTombstone adds a key to the tombstone tracker
func (c *DefaultCompactionCoordinator) TrackTombstone(key []byte) {
// Track the tombstone in our tracker
if c.tombstoneManager != nil {
c.tombstoneManager.AddTombstone(key)
}
}
// ForcePreserveTombstone marks a tombstone for special handling during compaction
// This is primarily for testing purposes, to ensure specific tombstones are preserved
func (c *DefaultCompactionCoordinator) ForcePreserveTombstone(key []byte) {
if c.tombstoneManager != nil {
c.tombstoneManager.ForcePreserveTombstone(key)
}
}
// MarkFileObsolete marks a file as obsolete (can be deleted)
// For backward compatibility with tests
func (c *DefaultCompactionCoordinator) MarkFileObsolete(path string) {
c.fileTracker.MarkFileObsolete(path)
}
// CleanupObsoleteFiles removes files that are no longer needed
// For backward compatibility with tests
func (c *DefaultCompactionCoordinator) CleanupObsoleteFiles() error {
return c.fileTracker.CleanupObsoleteFiles()
}
// compactionWorker runs the compaction loop
func (c *DefaultCompactionCoordinator) compactionWorker() {
// Ensure a minimum interval of 1 second
interval := c.compactionInterval
if interval <= 0 {
interval = 1
}
ticker := time.NewTicker(time.Duration(interval) * time.Second)
defer ticker.Stop()
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
// Only one compaction at a time
c.compactingMu.Lock()
// Run a compaction cycle
err := c.runCompactionCycle()
if err != nil {
// In a real system, we'd log this error
// fmt.Printf("Compaction error: %v\n", err)
}
// Try to clean up obsolete files
err = c.fileTracker.CleanupObsoleteFiles()
if err != nil {
// In a real system, we'd log this error
// fmt.Printf("Cleanup error: %v\n", err)
}
// Collect tombstone garbage periodically
if manager, ok := c.tombstoneManager.(interface{ CollectGarbage() }); ok {
manager.CollectGarbage()
}
c.compactingMu.Unlock()
}
}
}
// runCompactionCycle performs a single compaction cycle
func (c *DefaultCompactionCoordinator) runCompactionCycle() error {
// Reload SSTables to get fresh information
if err := c.strategy.LoadSSTables(); err != nil {
return fmt.Errorf("failed to load SSTables: %w", err)
}
// Select files for compaction
task, err := c.strategy.SelectCompaction()
if err != nil {
return fmt.Errorf("failed to select files for compaction: %w", err)
}
// If no compaction needed, return
if task == nil {
return nil
}
// Mark files as pending
for _, files := range task.InputFiles {
for _, file := range files {
c.fileTracker.MarkFilePending(file.Path)
}
}
// Perform compaction
outputFiles, err := c.executor.CompactFiles(task)
// Unmark files as pending
for _, files := range task.InputFiles {
for _, file := range files {
c.fileTracker.UnmarkFilePending(file.Path)
}
}
// Track the compaction outputs for statistics
if err == nil && len(outputFiles) > 0 {
// Record the compaction result
c.resultsMu.Lock()
c.lastCompactionOutputs = outputFiles
c.resultsMu.Unlock()
}
// Handle compaction errors
if err != nil {
return fmt.Errorf("compaction failed: %w", err)
}
// Mark input files as obsolete
for _, files := range task.InputFiles {
for _, file := range files {
c.fileTracker.MarkFileObsolete(file.Path)
}
}
// Try to clean up the files immediately
return c.fileTracker.CleanupObsoleteFiles()
}
// TriggerCompaction forces a compaction cycle
func (c *DefaultCompactionCoordinator) TriggerCompaction() error {
c.compactingMu.Lock()
defer c.compactingMu.Unlock()
return c.runCompactionCycle()
}
// CompactRange triggers compaction on a specific key range
func (c *DefaultCompactionCoordinator) CompactRange(minKey, maxKey []byte) error {
c.compactingMu.Lock()
defer c.compactingMu.Unlock()
// Load current SSTable information
if err := c.strategy.LoadSSTables(); err != nil {
return fmt.Errorf("failed to load SSTables: %w", err)
}
// Delegate to the strategy for actual compaction
return c.strategy.CompactRange(minKey, maxKey)
}
// GetCompactionStats returns statistics about the compaction state
func (c *DefaultCompactionCoordinator) GetCompactionStats() map[string]interface{} {
c.resultsMu.RLock()
defer c.resultsMu.RUnlock()
stats := make(map[string]interface{})
// Include info about last compaction
stats["last_outputs_count"] = len(c.lastCompactionOutputs)
// If there are recent compaction outputs, include information
if len(c.lastCompactionOutputs) > 0 {
stats["last_outputs"] = c.lastCompactionOutputs
}
return stats
}
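A lifecycle sketch for the coordinator (cfg is assumed; empty options rely on the defaults filled in by NewCompactionCoordinator):

package example

import (
	"git.canoozie.net/jer/go-storage/pkg/compaction"
	"git.canoozie.net/jer/go-storage/pkg/config"
)

// compactOnce starts the coordinator, forces one cycle, and returns its stats.
func compactOnce(cfg *config.Config, sstDir string) (map[string]interface{}, error) {
	// Empty options: the constructor supplies a tiered strategy, default executor,
	// file tracker, 24-hour tombstone retention, and a 1-second interval.
	coord := compaction.NewCompactionCoordinator(cfg, sstDir, compaction.CompactionCoordinatorOptions{})
	if err := coord.Start(); err != nil {
		return nil, err
	}
	defer coord.Stop()

	coord.TrackTombstone([]byte("deleted-key"))       // record a deletion for the tombstone filter
	if err := coord.TriggerCompaction(); err != nil { // run one synchronous cycle
		return nil, err
	}
	return coord.GetCompactionStats(), nil
}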

pkg/compaction/executor.go (new file, 177 lines)

@@ -0,0 +1,177 @@
package compaction
import (
"bytes"
"fmt"
"os"
"time"
"git.canoozie.net/jer/go-storage/pkg/common/iterator"
"git.canoozie.net/jer/go-storage/pkg/common/iterator/composite"
"git.canoozie.net/jer/go-storage/pkg/config"
"git.canoozie.net/jer/go-storage/pkg/sstable"
)
// DefaultCompactionExecutor handles the actual compaction process
type DefaultCompactionExecutor struct {
// Configuration
cfg *config.Config
// SSTable directory
sstableDir string
// Tombstone manager for tracking deletions
tombstoneManager TombstoneManager
}
// NewCompactionExecutor creates a new compaction executor
func NewCompactionExecutor(cfg *config.Config, sstableDir string, tombstoneManager TombstoneManager) *DefaultCompactionExecutor {
return &DefaultCompactionExecutor{
cfg: cfg,
sstableDir: sstableDir,
tombstoneManager: tombstoneManager,
}
}
// CompactFiles performs the actual compaction of the input files
func (e *DefaultCompactionExecutor) CompactFiles(task *CompactionTask) ([]string, error) {
// Create a merged iterator over all input files
var iterators []iterator.Iterator
// Add iterators from both levels
for level := 0; level <= task.TargetLevel; level++ {
for _, file := range task.InputFiles[level] {
// We need an iterator that preserves delete markers
if file.Reader != nil {
iterators = append(iterators, file.Reader.NewIterator())
}
}
}
// Create hierarchical merged iterator
mergedIter := composite.NewHierarchicalIterator(iterators)
// Track the previous key so older duplicate entries (including tombstone versions) are skipped
var lastKey []byte
var outputFiles []string
var currentWriter *sstable.Writer
var currentOutputPath string
var outputFileSequence uint64 = 1
var entriesInCurrentFile int
// Function to create a new output file
createNewOutputFile := func() error {
if currentWriter != nil {
if err := currentWriter.Finish(); err != nil {
return fmt.Errorf("failed to finish SSTable: %w", err)
}
outputFiles = append(outputFiles, currentOutputPath)
}
// Create a new output file
timestamp := time.Now().UnixNano()
currentOutputPath = fmt.Sprintf(task.OutputPathTemplate,
task.TargetLevel, outputFileSequence, timestamp)
outputFileSequence++
var err error
currentWriter, err = sstable.NewWriter(currentOutputPath)
if err != nil {
return fmt.Errorf("failed to create SSTable writer: %w", err)
}
entriesInCurrentFile = 0
return nil
}
// Create a tombstone filter if we have a tombstone manager
var tombstoneFilter *BasicTombstoneFilter
if e.tombstoneManager != nil {
tombstoneFilter = NewBasicTombstoneFilter(
task.TargetLevel,
e.cfg.MaxLevelWithTombstones,
e.tombstoneManager,
)
}
// Create the first output file
if err := createNewOutputFile(); err != nil {
return nil, err
}
// Iterate through all keys in sorted order
mergedIter.SeekToFirst()
for mergedIter.Valid() {
key := mergedIter.Key()
value := mergedIter.Value()
// Skip duplicates (we've already included the newest version)
if lastKey != nil && bytes.Equal(key, lastKey) {
mergedIter.Next()
continue
}
// Determine if we should keep this entry
// If we have a tombstone filter, use it; otherwise fall back to the default logic
var shouldKeep bool
isTombstone := mergedIter.IsTombstone()
if tombstoneFilter != nil && isTombstone {
// Use the tombstone filter for tombstones
shouldKeep = tombstoneFilter.ShouldKeep(key, nil)
} else {
// Default logic - always keep non-tombstones, and keep tombstones in lower levels
shouldKeep = !isTombstone || task.TargetLevel <= e.cfg.MaxLevelWithTombstones
}
if shouldKeep {
var err error
// Use the explicit AddTombstone method if this is a tombstone
if isTombstone {
err = currentWriter.AddTombstone(key)
} else {
err = currentWriter.Add(key, value)
}
if err != nil {
return nil, fmt.Errorf("failed to add entry to SSTable: %w", err)
}
entriesInCurrentFile++
}
// If the current file is big enough, start a new one
if int64(entriesInCurrentFile) >= e.cfg.SSTableMaxSize {
if err := createNewOutputFile(); err != nil {
return nil, err
}
}
// Remember this key to skip duplicates
lastKey = append(lastKey[:0], key...)
mergedIter.Next()
}
// Finish the last output file
if currentWriter != nil && entriesInCurrentFile > 0 {
if err := currentWriter.Finish(); err != nil {
return nil, fmt.Errorf("failed to finish SSTable: %w", err)
}
outputFiles = append(outputFiles, currentOutputPath)
} else if currentWriter != nil {
// No entries were written, abort the file
currentWriter.Abort()
}
return outputFiles, nil
}
// DeleteCompactedFiles removes the input files that were successfully compacted
func (e *DefaultCompactionExecutor) DeleteCompactedFiles(filePaths []string) error {
for _, path := range filePaths {
if err := os.Remove(path); err != nil {
return fmt.Errorf("failed to delete compacted file %s: %w", path, err)
}
}
return nil
}
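The OutputPathTemplate consumed by CompactFiles is a plain fmt format string with three verbs: target level, zero-padded sequence, and zero-padded nanosecond timestamp. A sketch of the naming scheme:

package example

import (
	"fmt"
	"path/filepath"
	"time"
)

// outputPath mirrors how CompactFiles names its output files.
func outputPath(sstDir string, targetLevel int, seq uint64) string {
	tmpl := filepath.Join(sstDir, "%d_%06d_%020d.sst")
	// e.g. "<sstDir>/1_000001_01745175060000000000.sst"
	return fmt.Sprintf(tmpl, targetLevel, seq, time.Now().UnixNano())
}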


@@ -0,0 +1,95 @@
package compaction
import (
"fmt"
"os"
"sync"
)
// DefaultFileTracker is the default implementation of FileTracker
type DefaultFileTracker struct {
// Map of file path -> true for files that have been obsoleted by compaction
obsoleteFiles map[string]bool
// Map of file path -> true for files that are currently being compacted
pendingFiles map[string]bool
// Mutex for file tracking maps
filesMu sync.RWMutex
}
// NewFileTracker creates a new file tracker
func NewFileTracker() *DefaultFileTracker {
return &DefaultFileTracker{
obsoleteFiles: make(map[string]bool),
pendingFiles: make(map[string]bool),
}
}
// MarkFileObsolete marks a file as obsolete (can be deleted)
func (f *DefaultFileTracker) MarkFileObsolete(path string) {
f.filesMu.Lock()
defer f.filesMu.Unlock()
f.obsoleteFiles[path] = true
}
// MarkFilePending marks a file as being used in a compaction
func (f *DefaultFileTracker) MarkFilePending(path string) {
f.filesMu.Lock()
defer f.filesMu.Unlock()
f.pendingFiles[path] = true
}
// UnmarkFilePending removes the pending mark from a file
func (f *DefaultFileTracker) UnmarkFilePending(path string) {
f.filesMu.Lock()
defer f.filesMu.Unlock()
delete(f.pendingFiles, path)
}
// IsFileObsolete checks if a file is marked as obsolete
func (f *DefaultFileTracker) IsFileObsolete(path string) bool {
f.filesMu.RLock()
defer f.filesMu.RUnlock()
return f.obsoleteFiles[path]
}
// IsFilePending checks if a file is marked as pending compaction
func (f *DefaultFileTracker) IsFilePending(path string) bool {
f.filesMu.RLock()
defer f.filesMu.RUnlock()
return f.pendingFiles[path]
}
// CleanupObsoleteFiles removes files that are no longer needed
func (f *DefaultFileTracker) CleanupObsoleteFiles() error {
f.filesMu.Lock()
defer f.filesMu.Unlock()
// Safely remove obsolete files that aren't pending
for path := range f.obsoleteFiles {
// Skip files that are still being used in a compaction
if f.pendingFiles[path] {
continue
}
// Try to delete the file
if err := os.Remove(path); err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("failed to delete obsolete file %s: %w", path, err)
}
// If the file doesn't exist, remove it from our tracking
delete(f.obsoleteFiles, path)
} else {
// Successfully deleted, remove from tracking
delete(f.obsoleteFiles, path)
}
}
return nil
}
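A sketch of the intended pending/obsolete lifecycle (the path is illustrative):

package example

import "git.canoozie.net/jer/go-storage/pkg/compaction"

// retireInput marks a compaction input as replaced and deletes it once it is
// no longer pending.
func retireInput(path string) error {
	ft := compaction.NewFileTracker()

	ft.MarkFilePending(path)  // input to a running compaction; cleanup will skip it
	ft.MarkFileObsolete(path) // superseded by the compaction's outputs

	_ = ft.CleanupObsoleteFiles() // no-op for this path while it is pending

	ft.UnmarkFilePending(path)       // compaction finished
	return ft.CleanupObsoleteFiles() // now deletes the file (missing files are tolerated)
}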


@@ -0,0 +1,82 @@
package compaction
// CompactionStrategy defines the interface for selecting files for compaction
type CompactionStrategy interface {
// SelectCompaction selects files for compaction and returns a CompactionTask
SelectCompaction() (*CompactionTask, error)
// CompactRange selects files within a key range for compaction
CompactRange(minKey, maxKey []byte) error
// LoadSSTables reloads SSTable information from disk
LoadSSTables() error
// Close closes any resources held by the strategy
Close() error
}
// CompactionExecutor defines the interface for executing compaction tasks
type CompactionExecutor interface {
// CompactFiles performs the actual compaction of the input files
CompactFiles(task *CompactionTask) ([]string, error)
// DeleteCompactedFiles removes the input files that were successfully compacted
DeleteCompactedFiles(filePaths []string) error
}
// FileTracker defines the interface for tracking file states during compaction
type FileTracker interface {
// MarkFileObsolete marks a file as obsolete (can be deleted)
MarkFileObsolete(path string)
// MarkFilePending marks a file as being used in a compaction
MarkFilePending(path string)
// UnmarkFilePending removes the pending mark from a file
UnmarkFilePending(path string)
// IsFileObsolete checks if a file is marked as obsolete
IsFileObsolete(path string) bool
// IsFilePending checks if a file is marked as pending compaction
IsFilePending(path string) bool
// CleanupObsoleteFiles removes files that are no longer needed
CleanupObsoleteFiles() error
}
// TombstoneManager defines the interface for tracking and managing tombstones
type TombstoneManager interface {
// AddTombstone records a key deletion
AddTombstone(key []byte)
// ForcePreserveTombstone marks a tombstone to be preserved indefinitely
ForcePreserveTombstone(key []byte)
// ShouldKeepTombstone checks if a tombstone should be preserved during compaction
ShouldKeepTombstone(key []byte) bool
// CollectGarbage removes expired tombstone records
CollectGarbage()
}
// CompactionCoordinator defines the interface for coordinating compaction processes
type CompactionCoordinator interface {
// Start begins background compaction
Start() error
// Stop halts background compaction
Stop() error
// TriggerCompaction forces a compaction cycle
TriggerCompaction() error
// CompactRange triggers compaction on a specific key range
CompactRange(minKey, maxKey []byte) error
// TrackTombstone adds a key to the tombstone tracker
TrackTombstone(key []byte)
// GetCompactionStats returns statistics about the compaction state
GetCompactionStats() map[string]interface{}
}
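Because the interfaces are small, tests can swap in trivial implementations. A hypothetical in-memory TombstoneManager:

package example

import "git.canoozie.net/jer/go-storage/pkg/compaction"

// mapTombstones is a toy TombstoneManager that preserves every tombstone forever.
type mapTombstones struct{ keys map[string]bool }

func newMapTombstones() *mapTombstones { return &mapTombstones{keys: make(map[string]bool)} }

func (m *mapTombstones) AddTombstone(key []byte)             { m.keys[string(key)] = true }
func (m *mapTombstones) ForcePreserveTombstone(key []byte)   { m.keys[string(key)] = true }
func (m *mapTombstones) ShouldKeepTombstone(key []byte) bool { return m.keys[string(key)] }
func (m *mapTombstones) CollectGarbage()                     {} // nothing expires in this sketch

// Compile-time check that the sketch satisfies the interface.
var _ compaction.TombstoneManager = (*mapTombstones)(nil)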


@@ -1,413 +0,0 @@
package compaction
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
"git.canoozie.net/jer/go-storage/pkg/config"
)
// CompactionManager coordinates the compaction process
type CompactionManager struct {
// Configuration
cfg *config.Config
// SSTable directory
sstableDir string
// Compactor implementation
compactor *TieredCompactor
// Tombstone tracker
tombstones *TombstoneTracker
// Next sequence number for SSTable files
nextSeq uint64
// Compaction state
running bool
stopCh chan struct{}
compactingMu sync.Mutex
// File tracking
// Map of file path -> true for files that have been obsoleted by compaction
obsoleteFiles map[string]bool
// Map of file path -> true for files that are currently being compacted
pendingFiles map[string]bool
// Last set of files produced by compaction
lastCompactionOutputs []string
filesMu sync.RWMutex
}
// NewCompactionManager creates a new compaction manager
func NewCompactionManager(cfg *config.Config, sstableDir string) *CompactionManager {
// Create tombstone tracker first
tombstones := NewTombstoneTracker(24 * time.Hour) // Default 24-hour retention
return &CompactionManager{
cfg: cfg,
sstableDir: sstableDir,
compactor: NewTieredCompactor(cfg, sstableDir, tombstones),
tombstones: tombstones,
nextSeq: 1,
stopCh: make(chan struct{}),
obsoleteFiles: make(map[string]bool),
pendingFiles: make(map[string]bool),
lastCompactionOutputs: make([]string, 0),
}
}
// Start begins background compaction
func (cm *CompactionManager) Start() error {
cm.compactingMu.Lock()
defer cm.compactingMu.Unlock()
if cm.running {
return nil // Already running
}
// Load existing SSTables
if err := cm.compactor.LoadSSTables(); err != nil {
return fmt.Errorf("failed to load SSTables: %w", err)
}
cm.running = true
cm.stopCh = make(chan struct{})
// Start background worker
go cm.compactionWorker()
return nil
}
// Stop halts background compaction
func (cm *CompactionManager) Stop() error {
cm.compactingMu.Lock()
defer cm.compactingMu.Unlock()
if !cm.running {
return nil // Already stopped
}
// Signal the worker to stop
close(cm.stopCh)
cm.running = false
// Close compactor
return cm.compactor.Close()
}
// MarkFileObsolete marks a file as obsolete
func (cm *CompactionManager) MarkFileObsolete(path string) {
cm.filesMu.Lock()
defer cm.filesMu.Unlock()
cm.obsoleteFiles[path] = true
}
// MarkFilePending marks a file as being used in a compaction
func (cm *CompactionManager) MarkFilePending(path string) {
cm.filesMu.Lock()
defer cm.filesMu.Unlock()
cm.pendingFiles[path] = true
}
// UnmarkFilePending removes the pending mark
func (cm *CompactionManager) UnmarkFilePending(path string) {
cm.filesMu.Lock()
defer cm.filesMu.Unlock()
delete(cm.pendingFiles, path)
}
// IsFileObsolete checks if a file is marked as obsolete
func (cm *CompactionManager) IsFileObsolete(path string) bool {
cm.filesMu.RLock()
defer cm.filesMu.RUnlock()
return cm.obsoleteFiles[path]
}
// IsFilePending checks if a file is pending compaction
func (cm *CompactionManager) IsFilePending(path string) bool {
cm.filesMu.RLock()
defer cm.filesMu.RUnlock()
return cm.pendingFiles[path]
}
// CleanupObsoleteFiles removes files that are no longer needed
func (cm *CompactionManager) CleanupObsoleteFiles() error {
cm.filesMu.Lock()
defer cm.filesMu.Unlock()
// Safely remove obsolete files that aren't pending
for path := range cm.obsoleteFiles {
// Skip files that are still being used in a compaction
if cm.pendingFiles[path] {
continue
}
// Try to delete the file
if err := os.Remove(path); err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("failed to delete obsolete file %s: %w", path, err)
}
// If the file doesn't exist, remove it from our tracking
delete(cm.obsoleteFiles, path)
} else {
// Successfully deleted, remove from tracking
delete(cm.obsoleteFiles, path)
}
}
return nil
}
// compactionWorker runs the compaction loop
func (cm *CompactionManager) compactionWorker() {
// Ensure a minimum interval of 1 second
interval := cm.cfg.CompactionInterval
if interval <= 0 {
interval = 1
}
ticker := time.NewTicker(time.Duration(interval) * time.Second)
defer ticker.Stop()
for {
select {
case <-cm.stopCh:
return
case <-ticker.C:
// Only one compaction at a time
cm.compactingMu.Lock()
// Run a compaction cycle
err := cm.runCompactionCycle()
if err != nil {
// In a real system, we'd log this error
// fmt.Printf("Compaction error: %v\n", err)
}
// Try to clean up obsolete files
err = cm.CleanupObsoleteFiles()
if err != nil {
// In a real system, we'd log this error
// fmt.Printf("Cleanup error: %v\n", err)
}
// Collect tombstone garbage periodically
cm.tombstones.CollectGarbage()
cm.compactingMu.Unlock()
}
}
}
// runCompactionCycle performs a single compaction cycle
func (cm *CompactionManager) runCompactionCycle() error {
// Reload SSTables to get fresh information
if err := cm.compactor.LoadSSTables(); err != nil {
return fmt.Errorf("failed to load SSTables: %w", err)
}
// Select files for compaction
task, err := cm.compactor.SelectCompaction()
if err != nil {
return fmt.Errorf("failed to select files for compaction: %w", err)
}
// If no compaction needed, return
if task == nil {
return nil
}
// Mark files as pending
for _, files := range task.InputFiles {
for _, file := range files {
cm.MarkFilePending(file.Path)
}
}
// Perform compaction
outputFiles, err := cm.compactor.CompactFiles(task)
// Unmark files as pending
for _, files := range task.InputFiles {
for _, file := range files {
cm.UnmarkFilePending(file.Path)
}
}
// Track the compaction outputs for statistics
if err == nil && len(outputFiles) > 0 {
// Record the compaction result
cm.filesMu.Lock()
cm.lastCompactionOutputs = outputFiles
cm.filesMu.Unlock()
}
// Handle compaction errors
if err != nil {
return fmt.Errorf("compaction failed: %w", err)
}
// Mark input files as obsolete
for _, files := range task.InputFiles {
for _, file := range files {
cm.MarkFileObsolete(file.Path)
}
}
// Try to clean up the files immediately
return cm.CleanupObsoleteFiles()
}
// TriggerCompaction forces a compaction cycle
func (cm *CompactionManager) TriggerCompaction() error {
cm.compactingMu.Lock()
defer cm.compactingMu.Unlock()
return cm.runCompactionCycle()
}
// CompactRange triggers compaction on a specific key range
func (cm *CompactionManager) CompactRange(minKey, maxKey []byte) error {
cm.compactingMu.Lock()
defer cm.compactingMu.Unlock()
// Load current SSTable information
if err := cm.compactor.LoadSSTables(); err != nil {
return fmt.Errorf("failed to load SSTables: %w", err)
}
// Find files that overlap with the target range
rangeInfo := &SSTableInfo{
FirstKey: minKey,
LastKey: maxKey,
}
var filesToCompact []*SSTableInfo
for level, files := range cm.compactor.levels {
for _, file := range files {
if file.Overlaps(rangeInfo) {
// Add level information to the file
file.Level = level
filesToCompact = append(filesToCompact, file)
// Mark as pending
cm.MarkFilePending(file.Path)
}
}
}
// If no files to compact, we're done
if len(filesToCompact) == 0 {
return nil
}
// Determine the target level - use the highest level found + 1
targetLevel := 0
for _, file := range filesToCompact {
if file.Level > targetLevel {
targetLevel = file.Level
}
}
targetLevel++ // Compact to next level
// Create the compaction task
task := &CompactionTask{
InputFiles: make(map[int][]*SSTableInfo),
TargetLevel: targetLevel,
OutputPathTemplate: filepath.Join(cm.sstableDir, "%d_%06d_%020d.sst"),
}
// Group files by level
for _, file := range filesToCompact {
task.InputFiles[file.Level] = append(task.InputFiles[file.Level], file)
}
// Perform the compaction
outputFiles, err := cm.compactor.CompactFiles(task)
// Unmark files as pending
for _, file := range filesToCompact {
cm.UnmarkFilePending(file.Path)
}
// Track the compaction outputs for statistics
if err == nil && len(outputFiles) > 0 {
cm.filesMu.Lock()
cm.lastCompactionOutputs = outputFiles
cm.filesMu.Unlock()
}
// Handle errors
if err != nil {
return fmt.Errorf("failed to compact range: %w", err)
}
// Mark input files as obsolete
for _, file := range filesToCompact {
cm.MarkFileObsolete(file.Path)
}
// Try to clean up immediately
return cm.CleanupObsoleteFiles()
}
// TrackTombstone adds a key to the tombstone tracker
func (cm *CompactionManager) TrackTombstone(key []byte) {
// Track the tombstone in our tracker
if cm.tombstones != nil {
cm.tombstones.AddTombstone(key)
}
}
// ForcePreserveTombstone marks a tombstone for special handling during compaction
// This is primarily for testing purposes, to ensure specific tombstones are preserved
func (cm *CompactionManager) ForcePreserveTombstone(key []byte) {
if cm.tombstones != nil {
// Add with "force preserve" flag set to true
cm.tombstones.ForcePreserveTombstone(key)
}
}
// GetCompactionStats returns statistics about the compaction state
func (cm *CompactionManager) GetCompactionStats() map[string]interface{} {
cm.filesMu.RLock()
defer cm.filesMu.RUnlock()
stats := make(map[string]interface{})
// Count files by level
levelCounts := make(map[int]int)
levelSizes := make(map[int]int64)
for level, files := range cm.compactor.levels {
levelCounts[level] = len(files)
var totalSize int64
for _, file := range files {
totalSize += file.Size
}
levelSizes[level] = totalSize
}
stats["level_counts"] = levelCounts
stats["level_sizes"] = levelSizes
stats["obsolete_files"] = len(cm.obsoleteFiles)
stats["pending_files"] = len(cm.pendingFiles)
stats["last_outputs_count"] = len(cm.lastCompactionOutputs)
// If there are recent compaction outputs, include information
if len(cm.lastCompactionOutputs) > 0 {
stats["last_outputs"] = cm.lastCompactionOutputs
}
return stats
}


@@ -1,397 +0,0 @@
package compaction
import (
"bytes"
"fmt"
"path/filepath"
"sort"
"time"
"git.canoozie.net/jer/go-storage/pkg/common/iterator"
"git.canoozie.net/jer/go-storage/pkg/common/iterator/composite"
"git.canoozie.net/jer/go-storage/pkg/config"
"git.canoozie.net/jer/go-storage/pkg/sstable"
)
// TieredCompactor implements a tiered compaction strategy
type TieredCompactor struct {
*Compactor
// Next file sequence number
nextFileSeq uint64
}
// NewTieredCompactor creates a new tiered compaction manager
func NewTieredCompactor(cfg *config.Config, sstableDir string, tracker *TombstoneTracker) *TieredCompactor {
return &TieredCompactor{
Compactor: NewCompactor(cfg, sstableDir, tracker),
nextFileSeq: 1,
}
}
// SelectCompaction selects files for tiered compaction
func (tc *TieredCompactor) SelectCompaction() (*CompactionTask, error) {
// Reload SSTable information
if err := tc.LoadSSTables(); err != nil {
return nil, fmt.Errorf("failed to load SSTables: %w", err)
}
// Determine the maximum level
maxLevel := 0
for level := range tc.levels {
if level > maxLevel {
maxLevel = level
}
}
// Check L0 first (special case due to potential overlaps)
if len(tc.levels[0]) >= tc.cfg.MaxMemTables {
return tc.selectL0Compaction()
}
// Check size-based conditions for other levels
for level := 0; level < maxLevel; level++ {
// If this level is too large compared to the next level
thisLevelSize := tc.GetLevelSize(level)
nextLevelSize := tc.GetLevelSize(level + 1)
// If level is empty, skip it
if thisLevelSize == 0 {
continue
}
// If next level is empty, promote a file
if nextLevelSize == 0 && len(tc.levels[level]) > 0 {
return tc.selectPromotionCompaction(level)
}
// Check size ratio
sizeRatio := float64(thisLevelSize) / float64(nextLevelSize)
if sizeRatio >= tc.cfg.CompactionRatio {
return tc.selectOverlappingCompaction(level)
}
}
// No compaction needed
return nil, nil
}
// selectL0Compaction selects files from L0 for compaction
func (tc *TieredCompactor) selectL0Compaction() (*CompactionTask, error) {
// Require at least some files in L0
if len(tc.levels[0]) < 2 {
return nil, nil
}
// Sort L0 files by sequence number to prioritize older files
files := make([]*SSTableInfo, len(tc.levels[0]))
copy(files, tc.levels[0])
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
// Take up to maxCompactFiles from L0
maxCompactFiles := tc.cfg.MaxMemTables
if maxCompactFiles > len(files) {
maxCompactFiles = len(files)
}
selectedFiles := files[:maxCompactFiles]
// Determine the key range covered by selected files
var minKey, maxKey []byte
for _, file := range selectedFiles {
if len(minKey) == 0 || bytes.Compare(file.FirstKey, minKey) < 0 {
minKey = file.FirstKey
}
if len(maxKey) == 0 || bytes.Compare(file.LastKey, maxKey) > 0 {
maxKey = file.LastKey
}
}
// Find overlapping files in L1
var l1Files []*SSTableInfo
for _, file := range tc.levels[1] {
// Create a temporary SSTableInfo with the key range
rangeInfo := &SSTableInfo{
FirstKey: minKey,
LastKey: maxKey,
}
if file.Overlaps(rangeInfo) {
l1Files = append(l1Files, file)
}
}
// Create the compaction task
task := &CompactionTask{
InputFiles: map[int][]*SSTableInfo{
0: selectedFiles,
1: l1Files,
},
TargetLevel: 1,
OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"),
}
return task, nil
}
// selectPromotionCompaction selects a file to promote to the next level
func (tc *TieredCompactor) selectPromotionCompaction(level int) (*CompactionTask, error) {
// Sort files by sequence number
files := make([]*SSTableInfo, len(tc.levels[level]))
copy(files, tc.levels[level])
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
// Select the oldest file
file := files[0]
// Create task to promote this file to the next level
// No need to merge with any other files since the next level is empty
task := &CompactionTask{
InputFiles: map[int][]*SSTableInfo{
level: {file},
},
TargetLevel: level + 1,
OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"),
}
return task, nil
}
// selectOverlappingCompaction selects files for compaction based on key overlap
func (tc *TieredCompactor) selectOverlappingCompaction(level int) (*CompactionTask, error) {
// Sort files by sequence number to start with oldest
files := make([]*SSTableInfo, len(tc.levels[level]))
copy(files, tc.levels[level])
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
// Select an initial file from this level
file := files[0]
// Find all overlapping files in the next level
var nextLevelFiles []*SSTableInfo
for _, nextFile := range tc.levels[level+1] {
if file.Overlaps(nextFile) {
nextLevelFiles = append(nextLevelFiles, nextFile)
}
}
// Create the compaction task
task := &CompactionTask{
InputFiles: map[int][]*SSTableInfo{
level: {file},
level + 1: nextLevelFiles,
},
TargetLevel: level + 1,
OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"),
}
return task, nil
}
// Compact performs compaction of the selected files
func (tc *TieredCompactor) Compact(task *CompactionTask) error {
if task == nil {
return nil // Nothing to compact
}
// Perform the compaction
_, err := tc.CompactFiles(task)
if err != nil {
return fmt.Errorf("compaction failed: %w", err)
}
// Gather all input file paths for cleanup
var inputPaths []string
for _, files := range task.InputFiles {
for _, file := range files {
inputPaths = append(inputPaths, file.Path)
}
}
// Delete the original files that were compacted
if err := tc.DeleteCompactedFiles(inputPaths); err != nil {
return fmt.Errorf("failed to clean up compacted files: %w", err)
}
// Reload SSTables to refresh our file list
if err := tc.LoadSSTables(); err != nil {
return fmt.Errorf("failed to reload SSTables: %w", err)
}
return nil
}
// CompactRange performs compaction on a specific key range
func (tc *TieredCompactor) CompactRange(minKey, maxKey []byte) error {
// Create a range info to check for overlaps
rangeInfo := &SSTableInfo{
FirstKey: minKey,
LastKey: maxKey,
}
// Find files overlapping with the given range in each level
task := &CompactionTask{
InputFiles: make(map[int][]*SSTableInfo),
TargetLevel: 0, // Will be updated
OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"),
}
// Get the maximum level
var maxLevel int
for level := range tc.levels {
if level > maxLevel {
maxLevel = level
}
}
// Find overlapping files in each level
for level := 0; level <= maxLevel; level++ {
var overlappingFiles []*SSTableInfo
for _, file := range tc.levels[level] {
if file.Overlaps(rangeInfo) {
overlappingFiles = append(overlappingFiles, file)
}
}
if len(overlappingFiles) > 0 {
task.InputFiles[level] = overlappingFiles
}
}
// If no files overlap with the range, no compaction needed
totalInputFiles := 0
for _, files := range task.InputFiles {
totalInputFiles += len(files)
}
if totalInputFiles == 0 {
return nil
}
// Set target level to the maximum level + 1
task.TargetLevel = maxLevel + 1
// Perform the compaction
return tc.Compact(task)
}
// RunCompaction performs a full compaction cycle
func (tc *TieredCompactor) RunCompaction() error {
// Select files for compaction
task, err := tc.SelectCompaction()
if err != nil {
return fmt.Errorf("failed to select files for compaction: %w", err)
}
// If no compaction needed, return
if task == nil {
return nil
}
// Perform the compaction
return tc.Compact(task)
}
// MergeCompact merges iterators from multiple SSTables and writes to new files
func (tc *TieredCompactor) MergeCompact(readers []*sstable.Reader, targetLevel int) ([]string, error) {
// Create iterators for all input files
var iterators []iterator.Iterator
for _, reader := range readers {
iterators = append(iterators, reader.NewIterator())
}
// Create a merged iterator
mergedIter := composite.NewHierarchicalIterator(iterators)
// Create a new output file
timestamp := time.Now().UnixNano()
outputPath := filepath.Join(tc.sstableDir,
fmt.Sprintf("%d_%06d_%020d.sst",
targetLevel, tc.nextFileSeq, timestamp))
tc.nextFileSeq++
writer, err := sstable.NewWriter(outputPath)
if err != nil {
return nil, fmt.Errorf("failed to create SSTable writer: %w", err)
}
// Create a tombstone filter if we have a TombstoneTracker
// This is passed from CompactionManager to Compactor
var tombstoneFilter *BasicTombstoneFilter
if tc.tombstoneTracker != nil {
tombstoneFilter = NewBasicTombstoneFilter(
targetLevel,
tc.cfg.MaxLevelWithTombstones,
tc.tombstoneTracker,
)
}
// Write all entries from the merged iterator
var lastKey []byte
var entriesWritten int
mergedIter.SeekToFirst()
for mergedIter.Valid() {
key := mergedIter.Key()
value := mergedIter.Value()
// Skip duplicates
if lastKey != nil && bytes.Equal(key, lastKey) {
mergedIter.Next()
continue
}
// Check if this is a tombstone entry (using the IsTombstone method)
isTombstone := mergedIter.IsTombstone()
// Determine if we should keep this entry
// If we have a tombstone filter, use it; otherwise fall back to the default logic
var shouldKeep bool
if tombstoneFilter != nil && isTombstone {
// Use the tombstone filter for tombstones
shouldKeep = tombstoneFilter.ShouldKeep(key, nil)
} else {
// Default logic - keep regular entries and tombstones in lower levels
shouldKeep = !isTombstone || targetLevel <= tc.cfg.MaxLevelWithTombstones
}
if shouldKeep {
var err error
// Use the explicit AddTombstone method if this is a tombstone
if isTombstone {
err = writer.AddTombstone(key)
} else {
err = writer.Add(key, value)
}
if err != nil {
writer.Abort()
return nil, fmt.Errorf("failed to add entry to SSTable: %w", err)
}
entriesWritten++
}
lastKey = append(lastKey[:0], key...)
mergedIter.Next()
}
// Finish writing
if entriesWritten > 0 {
if err := writer.Finish(); err != nil {
return nil, fmt.Errorf("failed to finish SSTable: %w", err)
}
return []string{outputPath}, nil
}
// No entries written, abort the file
writer.Abort()
return nil, nil
}
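// Illustrative call site for MergeCompact (a sketch, not part of this commit);
// the readers are assumed to be opened from the task's input files:
//
//	newPaths, err := tc.MergeCompact(readers, task.TargetLevel)
//	if err != nil {
//		return err
//	}
//	// newPaths is empty when every entry was filtered out (for example,
//	// tombstones past MaxLevelWithTombstones), since the output file is
//	// aborted rather than written empty.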

View File

@ -0,0 +1,268 @@
package compaction
import (
"bytes"
"fmt"
"path/filepath"
"sort"
"git.canoozie.net/jer/go-storage/pkg/config"
)
// TieredCompactionStrategy implements a tiered compaction strategy
type TieredCompactionStrategy struct {
*BaseCompactionStrategy
// Executor for compacting files
executor CompactionExecutor
// Next file sequence number
nextFileSeq uint64
}
// NewTieredCompactionStrategy creates a new tiered compaction strategy
func NewTieredCompactionStrategy(cfg *config.Config, sstableDir string, executor CompactionExecutor) *TieredCompactionStrategy {
return &TieredCompactionStrategy{
BaseCompactionStrategy: NewBaseCompactionStrategy(cfg, sstableDir),
executor: executor,
nextFileSeq: 1,
}
}
// SelectCompaction selects files for tiered compaction
func (s *TieredCompactionStrategy) SelectCompaction() (*CompactionTask, error) {
// Determine the maximum level
maxLevel := 0
for level := range s.levels {
if level > maxLevel {
maxLevel = level
}
}
// Check L0 first (special case due to potential overlaps)
if len(s.levels[0]) >= s.cfg.MaxMemTables {
return s.selectL0Compaction()
}
// Check size-based conditions for other levels
for level := 0; level < maxLevel; level++ {
// If this level is too large compared to the next level
thisLevelSize := s.GetLevelSize(level)
nextLevelSize := s.GetLevelSize(level + 1)
// If level is empty, skip it
if thisLevelSize == 0 {
continue
}
// If next level is empty, promote a file
if nextLevelSize == 0 && len(s.levels[level]) > 0 {
return s.selectPromotionCompaction(level)
}
// Check size ratio
sizeRatio := float64(thisLevelSize) / float64(nextLevelSize)
if sizeRatio >= s.cfg.CompactionRatio {
return s.selectOverlappingCompaction(level)
}
}
// No compaction needed
return nil, nil
}
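// GetLevelSize is provided by BaseCompactionStrategy; a minimal sketch of what
// it is assumed to do (sum the on-disk sizes of a level's files, assuming
// SSTableInfo records the file size at load time):
//
//	func (s *BaseCompactionStrategy) GetLevelSize(level int) int64 {
//		var total int64
//		for _, f := range s.levels[level] {
//			total += f.Size
//		}
//		return total
//	}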
// selectL0Compaction selects files from L0 for compaction
func (s *TieredCompactionStrategy) selectL0Compaction() (*CompactionTask, error) {
// Require at least some files in L0
if len(s.levels[0]) < 2 {
return nil, nil
}
// Sort L0 files by sequence number to prioritize older files
files := make([]*SSTableInfo, len(s.levels[0]))
copy(files, s.levels[0])
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
// Take up to maxCompactFiles from L0
maxCompactFiles := s.cfg.MaxMemTables
if maxCompactFiles > len(files) {
maxCompactFiles = len(files)
}
selectedFiles := files[:maxCompactFiles]
// Determine the key range covered by selected files
var minKey, maxKey []byte
for _, file := range selectedFiles {
if len(minKey) == 0 || bytes.Compare(file.FirstKey, minKey) < 0 {
minKey = file.FirstKey
}
if len(maxKey) == 0 || bytes.Compare(file.LastKey, maxKey) > 0 {
maxKey = file.LastKey
}
}
// Find overlapping files in L1
// Build the temporary SSTableInfo for the key range once, outside the loop
rangeInfo := &SSTableInfo{
FirstKey: minKey,
LastKey: maxKey,
}
var l1Files []*SSTableInfo
for _, file := range s.levels[1] {
if file.Overlaps(rangeInfo) {
l1Files = append(l1Files, file)
}
}
// Create the compaction task
task := &CompactionTask{
InputFiles: map[int][]*SSTableInfo{
0: selectedFiles,
1: l1Files,
},
TargetLevel: 1,
OutputPathTemplate: filepath.Join(s.sstableDir, "%d_%06d_%020d.sst"),
}
return task, nil
}
// selectPromotionCompaction selects a file to promote to the next level
func (s *TieredCompactionStrategy) selectPromotionCompaction(level int) (*CompactionTask, error) {
// Sort files by sequence number
files := make([]*SSTableInfo, len(s.levels[level]))
copy(files, s.levels[level])
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
// Select the oldest file
file := files[0]
// Create task to promote this file to the next level
// No need to merge with any other files since the next level is empty
task := &CompactionTask{
InputFiles: map[int][]*SSTableInfo{
level: {file},
},
TargetLevel: level + 1,
OutputPathTemplate: filepath.Join(s.sstableDir, "%d_%06d_%020d.sst"),
}
return task, nil
}
// selectOverlappingCompaction selects files for compaction based on key overlap
func (s *TieredCompactionStrategy) selectOverlappingCompaction(level int) (*CompactionTask, error) {
// Sort files by sequence number to start with oldest
files := make([]*SSTableInfo, len(s.levels[level]))
copy(files, s.levels[level])
sort.Slice(files, func(i, j int) bool {
return files[i].Sequence < files[j].Sequence
})
// Seed the compaction with the oldest file in this level
file := files[0]
// Find all overlapping files in the next level
var nextLevelFiles []*SSTableInfo
for _, nextFile := range s.levels[level+1] {
if file.Overlaps(nextFile) {
nextLevelFiles = append(nextLevelFiles, nextFile)
}
}
// Create the compaction task
task := &CompactionTask{
InputFiles: map[int][]*SSTableInfo{
level: {file},
level + 1: nextLevelFiles,
},
TargetLevel: level + 1,
OutputPathTemplate: filepath.Join(s.sstableDir, "%d_%06d_%020d.sst"),
}
return task, nil
}
// CompactRange performs compaction on a specific key range
func (s *TieredCompactionStrategy) CompactRange(minKey, maxKey []byte) error {
// Create a range info to check for overlaps
rangeInfo := &SSTableInfo{
FirstKey: minKey,
LastKey: maxKey,
}
// Find files overlapping with the given range in each level
task := &CompactionTask{
InputFiles: make(map[int][]*SSTableInfo),
TargetLevel: 0, // Will be updated
OutputPathTemplate: filepath.Join(s.sstableDir, "%d_%06d_%020d.sst"),
}
// Get the maximum level
var maxLevel int
for level := range s.levels {
if level > maxLevel {
maxLevel = level
}
}
// Find overlapping files in each level
for level := 0; level <= maxLevel; level++ {
var overlappingFiles []*SSTableInfo
for _, file := range s.levels[level] {
if file.Overlaps(rangeInfo) {
overlappingFiles = append(overlappingFiles, file)
}
}
if len(overlappingFiles) > 0 {
task.InputFiles[level] = overlappingFiles
}
}
// If no files overlap with the range, no compaction needed
totalInputFiles := 0
for _, files := range task.InputFiles {
totalInputFiles += len(files)
}
if totalInputFiles == 0 {
return nil
}
// Set target level to the maximum level + 1
task.TargetLevel = maxLevel + 1
// Perform the compaction
_, err := s.executor.CompactFiles(task)
if err != nil {
return fmt.Errorf("compaction failed: %w", err)
}
// Gather all input file paths for cleanup
var inputPaths []string
for _, files := range task.InputFiles {
for _, file := range files {
inputPaths = append(inputPaths, file.Path)
}
}
// Delete the original files that were compacted
if err := s.executor.DeleteCompactedFiles(inputPaths); err != nil {
return fmt.Errorf("failed to clean up compacted files: %w", err)
}
// Reload SSTables to refresh our file list
if err := s.LoadSSTables(); err != nil {
return fmt.Errorf("failed to reload SSTables: %w", err)
}
return nil
}
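// Illustrative wiring of the strategy (a sketch assuming some
// CompactionExecutor implementation is available; not part of this commit):
//
//	strategy := NewTieredCompactionStrategy(cfg, sstableDir, executor)
//	if err := strategy.LoadSSTables(); err != nil {
//		return err
//	}
//	task, err := strategy.SelectCompaction()
//	if err != nil || task == nil {
//		return err // a nil task means no compaction is needed
//	}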

View File

@ -5,7 +5,7 @@ import (
"time"
)
// TombstoneTracker tracks tombstones to support garbage collection
// TombstoneTracker implements the TombstoneManager interface
type TombstoneTracker struct {
// Map of deleted keys with deletion timestamp
deletions map[string]time.Time
@ -83,11 +83,11 @@ type BasicTombstoneFilter struct {
maxTombstoneLevel int
// The tombstone tracker (if any)
tracker *TombstoneTracker
tracker TombstoneManager
}
// NewBasicTombstoneFilter creates a new tombstone filter
func NewBasicTombstoneFilter(level, maxTombstoneLevel int, tracker *TombstoneTracker) *BasicTombstoneFilter {
func NewBasicTombstoneFilter(level, maxTombstoneLevel int, tracker TombstoneManager) *BasicTombstoneFilter {
return &BasicTombstoneFilter{
level: level,
maxTombstoneLevel: maxTombstoneLevel,

View File

@ -35,6 +35,7 @@ type Config struct {
WALDir string `json:"wal_dir"`
WALSyncMode SyncMode `json:"wal_sync_mode"`
WALSyncBytes int64 `json:"wal_sync_bytes"`
WALMaxSize int64 `json:"wal_max_size"`
// MemTable configuration
MemTableSize int64 `json:"memtable_size"`

View File

@ -23,6 +23,8 @@ const (
sstableFilenameFormat = "%d_%06d_%020d.sst"
)
// Note: the WAL-related code that previously lived here has been moved to the wal package
var (
// ErrEngineClosed is returned when operations are performed on a closed engine
ErrEngineClosed = errors.New("engine is closed")
@ -132,10 +134,28 @@ func NewEngine(dataDir string) (*Engine, error) {
return nil, fmt.Errorf("failed to create wal directory: %w", err)
}
// Create the WAL
wal, err := wal.NewWAL(cfg, walDir)
// During tests, disable logs to avoid interfering with example tests
tempWasDisabled := wal.DisableRecoveryLogs
if os.Getenv("GO_TEST") == "1" {
wal.DisableRecoveryLogs = true
defer func() { wal.DisableRecoveryLogs = tempWasDisabled }()
}
// First try to reuse an existing WAL file
var walLogger *wal.WAL
// We'll start with sequence 1, but this will be updated during recovery
walLogger, err = wal.ReuseWAL(cfg, walDir, 1)
if err != nil {
return nil, fmt.Errorf("failed to create WAL: %w", err)
return nil, fmt.Errorf("failed to check for reusable WAL: %w", err)
}
// If no suitable WAL found, create a new one
if walLogger == nil {
walLogger, err = wal.NewWAL(cfg, walDir)
if err != nil {
return nil, fmt.Errorf("failed to create WAL: %w", err)
}
}
// Create the MemTable pool
@ -146,7 +166,7 @@ func NewEngine(dataDir string) (*Engine, error) {
dataDir: dataDir,
sstableDir: sstableDir,
walDir: walDir,
wal: wal,
wal: walLogger,
memTablePool: memTablePool,
immutableMTs: make([]*memtable.MemTable, 0),
sstables: make([]*sstable.Reader, 0),
@ -159,6 +179,11 @@ func NewEngine(dataDir string) (*Engine, error) {
return nil, fmt.Errorf("failed to load SSTables: %w", err)
}
// Recover from WAL if any exist
if err := e.recoverFromWAL(); err != nil {
return nil, fmt.Errorf("failed to recover from WAL: %w", err)
}
// Start background flush goroutine
go e.backgroundFlush()
@ -625,6 +650,104 @@ func (e *Engine) loadSSTables() error {
return nil
}
// recoverFromWAL recovers memtables from existing WAL files
func (e *Engine) recoverFromWAL() error {
// Check if WAL directory exists
if _, err := os.Stat(e.walDir); os.IsNotExist(err) {
return nil // No WAL directory, nothing to recover
}
// List all WAL files for diagnostic purposes
walFiles, err := wal.FindWALFiles(e.walDir)
if err != nil {
if !wal.DisableRecoveryLogs {
fmt.Printf("Error listing WAL files: %v\n", err)
}
} else {
if !wal.DisableRecoveryLogs {
fmt.Printf("Found %d WAL files: %v\n", len(walFiles), walFiles)
}
}
// Get recovery options
recoveryOpts := memtable.DefaultRecoveryOptions(e.cfg)
// Recover memtables from WAL
memTables, maxSeqNum, err := memtable.RecoverFromWAL(e.cfg, recoveryOpts)
if err != nil {
// If recovery fails, let's try cleaning up WAL files
if !wal.DisableRecoveryLogs {
fmt.Printf("WAL recovery failed: %v\n", err)
fmt.Printf("Attempting to recover by cleaning up WAL files...\n")
}
// Create a backup directory
backupDir := filepath.Join(e.walDir, "backup_"+time.Now().Format("20060102_150405"))
if err := os.MkdirAll(backupDir, 0755); err != nil {
if !wal.DisableRecoveryLogs {
fmt.Printf("Failed to create backup directory: %v\n", err)
}
return fmt.Errorf("failed to recover from WAL: %w", err)
}
// Move problematic WAL files to backup
for _, walFile := range walFiles {
destFile := filepath.Join(backupDir, filepath.Base(walFile))
if err := os.Rename(walFile, destFile); err != nil {
if !wal.DisableRecoveryLogs {
fmt.Printf("Failed to move WAL file %s: %v\n", walFile, err)
}
} else if !wal.DisableRecoveryLogs {
fmt.Printf("Moved problematic WAL file to %s\n", destFile)
}
}
// Create a fresh WAL
newWal, err := wal.NewWAL(e.cfg, e.walDir)
if err != nil {
return fmt.Errorf("failed to create new WAL after recovery: %w", err)
}
e.wal = newWal
// No memtables to recover, starting fresh
if !wal.DisableRecoveryLogs {
fmt.Printf("Starting with a fresh WAL after recovery failure\n")
}
return nil
}
// No memtables recovered or empty WAL
if len(memTables) == 0 {
return nil
}
// Update sequence numbers
e.lastSeqNum = maxSeqNum
// Update WAL sequence number to continue from where we left off
if maxSeqNum > 0 {
e.wal.UpdateNextSequence(maxSeqNum + 1)
}
// Add recovered memtables to the pool
for i, memTable := range memTables {
if i == len(memTables)-1 {
// The last memtable becomes the active one
e.memTablePool.SetActiveMemTable(memTable)
} else {
// Previous memtables become immutable
memTable.SetImmutable()
e.immutableMTs = append(e.immutableMTs, memTable)
}
}
if !wal.DisableRecoveryLogs {
fmt.Printf("Recovered %d memtables from WAL with max sequence number %d\n",
len(memTables), maxSeqNum)
}
return nil
}
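// Recovery ordering, summarized: memtables come back oldest-first, so for
// three recovered tables m0, m1, m2, the engine ends up with m0 and m1
// immutable (flushed later by backgroundFlush) and m2 active.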
// GetRWLock returns the transaction lock for this engine
func (e *Engine) GetRWLock() *sync.RWMutex {
return &e.txLock

View File

@ -383,6 +383,25 @@ func newHierarchicalIterator(e *Engine) *boundedIterator {
iters = append(iters, sstable.NewIteratorAdapter(e.sstables[i].NewIterator()))
}
// Create sources list for all iterators
sources := make([]IterSource, 0, len(memTables)+len(e.sstables))
// Add sources for memtables
for i, table := range memTables {
sources = append(sources, &MemTableSource{
mem: table,
level: i, // Assign level numbers starting from 0 (active memtable is newest)
})
}
// Add sources for SSTables
for i := len(e.sstables) - 1; i >= 0; i-- {
sources = append(sources, &SSTableSource{
sst: e.sstables[i],
level: len(memTables) + (len(e.sstables) - 1 - i), // Continue level numbering after memtables
})
}
// Wrap in a bounded iterator (unbounded by default)
// If we have no iterators, use an empty one
var baseIter iterator.Iterator
@ -391,8 +410,11 @@ func newHierarchicalIterator(e *Engine) *boundedIterator {
} else if len(iters) == 1 {
baseIter = iters[0]
} else {
// Create a simple chained iterator for now that checks each source in order
baseIter = &chainedIterator{iterators: iters}
// Create a chained iterator that checks each source in order and handles duplicates
baseIter = &chainedIterator{
iterators: iters,
sources: sources,
}
}
return &boundedIterator{
@ -404,6 +426,7 @@ func newHierarchicalIterator(e *Engine) *boundedIterator {
// chainedIterator is a simple iterator that checks multiple sources in order
type chainedIterator struct {
iterators []iterator.Iterator
sources []IterSource // Corresponding sources for each iterator
current int
}
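// Level numbering convention used for deduplication: lower levels are newer
// sources (the active memtable is level 0), so when the same key appears in
// several sources the one with the smallest level number wins.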
@ -417,18 +440,41 @@ func (c *chainedIterator) SeekToFirst() {
iter.SeekToFirst()
}
// Find the first valid iterator with the smallest key
c.current = -1
var smallestKey []byte
// Maps to track the best (newest) source for each key
keyToSource := make(map[string]int) // Key -> index of the best (newest) source
keyToLevel := make(map[string]int) // Key -> level of the best source (lower is newer)
keyToPos := make(map[string][]byte) // Key -> raw key bytes (kept for ordering comparisons)
// First pass: Find the best source for each key
for i, iter := range c.iterators {
if !iter.Valid() {
continue
}
if c.current == -1 || bytes.Compare(iter.Key(), smallestKey) < 0 {
c.current = i
smallestKey = iter.Key()
// Use string key for map
keyStr := string(iter.Key())
keyBytes := iter.Key()
level := c.sources[i].GetLevel()
// If we haven't seen this key yet, or this source is newer
bestLevel, seen := keyToLevel[keyStr]
if !seen || level < bestLevel {
keyToSource[keyStr] = i
keyToLevel[keyStr] = level
keyToPos[keyStr] = keyBytes
}
}
// Find the smallest key in our deduplicated set
c.current = -1
var smallestKey []byte
for keyStr, sourceIdx := range keyToSource {
keyBytes := keyToPos[keyStr]
if c.current == -1 || bytes.Compare(keyBytes, smallestKey) < 0 {
c.current = sourceIdx
smallestKey = keyBytes
}
}
}
@ -469,18 +515,41 @@ func (c *chainedIterator) Seek(target []byte) bool {
iter.Seek(target)
}
// Find the first valid iterator with the smallest key >= target
c.current = -1
var smallestKey []byte
// Maps to track the best (newest) source for each key
keyToSource := make(map[string]int) // Key -> index of the best (newest) source
keyToLevel := make(map[string]int) // Key -> level of the best source (lower is newer)
keyToPos := make(map[string][]byte) // Key -> raw key bytes (kept for ordering comparisons)
// First pass: Find the best source for each key
for i, iter := range c.iterators {
if !iter.Valid() {
continue
}
if c.current == -1 || bytes.Compare(iter.Key(), smallestKey) < 0 {
c.current = i
smallestKey = iter.Key()
// Use string key for map
keyStr := string(iter.Key())
keyBytes := iter.Key()
level := c.sources[i].GetLevel()
// If we haven't seen this key yet, or this source is newer
bestLevel, seen := keyToLevel[keyStr]
if !seen || level < bestLevel {
keyToSource[keyStr] = i
keyToLevel[keyStr] = level
keyToPos[keyStr] = keyBytes
}
}
// Find the smallest key in our deduplicated set
c.current = -1
var smallestKey []byte
for keyStr, sourceIdx := range keyToSource {
keyBytes := keyToPos[keyStr]
if c.current == -1 || bytes.Compare(keyBytes, smallestKey) < 0 {
c.current = sourceIdx
smallestKey = keyBytes
}
}
@ -502,18 +571,50 @@ func (c *chainedIterator) Next() bool {
}
}
// Find the next valid iterator with the smallest key
// Find the next key after the current one - we need to find the
// smallest key that is different from the current key
c.current = -1
var smallestKey []byte
var nextKey []byte
var bestLevel int = -1
// First pass: Find the smallest key that's different from currentKey
for i, iter := range c.iterators {
if !iter.Valid() {
continue
}
if c.current == -1 || bytes.Compare(iter.Key(), smallestKey) < 0 {
// Skip if this iterator is still at the current key
if bytes.Equal(iter.Key(), currentKey) {
continue
}
// If we haven't found a key yet, or this one is smaller
if nextKey == nil || bytes.Compare(iter.Key(), nextKey) < 0 {
nextKey = iter.Key()
c.current = i
smallestKey = iter.Key()
bestLevel = c.sources[i].GetLevel()
}
}
// If we found a next key, now find the newest version of that key
if nextKey != nil {
// Second pass: Find the newest version of nextKey (lowest level number)
for i, iter := range c.iterators {
if !iter.Valid() {
continue
}
// Skip if this isn't a match for the next key
if !bytes.Equal(iter.Key(), nextKey) {
continue
}
// If this source is newer (lower level number), use it instead
sourceLevel := c.sources[i].GetLevel()
if sourceLevel < bestLevel || bestLevel == -1 {
c.current = i
bestLevel = sourceLevel
}
}
}

View File

@ -179,3 +179,18 @@ func (p *MemTablePool) TotalSize() int64 {
return total
}
// SetActiveMemTable sets the active memtable (used for recovery)
func (p *MemTablePool) SetActiveMemTable(memTable *MemTable) {
p.mu.Lock()
defer p.mu.Unlock()
// If there's already a non-empty active memtable, preserve it as immutable
if p.active != nil && p.active.ApproximateSize() > 0 {
p.active.SetImmutable()
p.immutables = append(p.immutables, p.active)
}
// Set the provided memtable as active
p.active = memTable
}
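// Illustrative call site (mirrors the engine's WAL recovery path): the last
// recovered memtable becomes active, earlier ones are made immutable.
//
//	for i, mt := range recovered {
//		if i == len(recovered)-1 {
//			pool.SetActiveMemTable(mt)
//		} else {
//			mt.SetImmutable()
//		}
//	}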

View File

@ -6,8 +6,14 @@ import (
"git.canoozie.net/jer/go-storage/pkg/engine"
"git.canoozie.net/jer/go-storage/pkg/transaction"
"git.canoozie.net/jer/go-storage/pkg/wal"
)
// Disable all logs in tests
func init() {
wal.DisableRecoveryLogs = true
}
func Example() {
// Create a temporary directory for the example
tempDir, err := os.MkdirTemp("", "transaction_example_*")

View File

@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
)
// Reader reads entries from WAL files
@ -242,6 +243,30 @@ func FindWALFiles(dir string) ([]string, error) {
}
// getEntryCount counts the number of valid entries in a WAL file
func getEntryCount(path string) int {
reader, err := OpenReader(path)
if err != nil {
return 0
}
defer reader.Close()
count := 0
for {
_, err := reader.ReadEntry()
if err != nil {
// Stop on EOF or on a corrupted entry; continuing here could spin
// forever if the reader cannot advance past the bad record
break
}
count++
}
return count
}
// ReplayWALFile replays a single WAL file and calls the handler for each entry
func ReplayWALFile(path string, handler EntryHandler) error {
reader, err := OpenReader(path)
if err != nil {
@ -249,20 +274,84 @@ func ReplayWALFile(path string, handler EntryHandler) error {
}
defer reader.Close()
// Track statistics for reporting
entriesProcessed := 0
entriesSkipped := 0
for {
entry, err := reader.ReadEntry()
if err != nil {
if err == io.EOF {
// Reached the end of the file
break
}
// Check if this is a corruption error
if strings.Contains(err.Error(), "corrupt") ||
strings.Contains(err.Error(), "invalid") {
// Skip this corrupted entry
if !DisableRecoveryLogs {
fmt.Printf("Skipping corrupted entry in %s: %v\n", path, err)
}
entriesSkipped++
// If we've seen several corrupted entries before any valid one, give up on this file
if entriesSkipped > 5 && entriesProcessed == 0 {
return fmt.Errorf("too many corrupted entries at start of file %s", path)
}
// Try to recover by skipping ahead
// This is a very blunt mechanism: it skips a fixed window of bytes
// and lets the next ReadEntry attempt to resynchronize
recoverErr := recoverFromCorruption(reader)
if recoverErr != nil {
if recoverErr == io.EOF {
// Reached the end during recovery
break
}
// Couldn't recover
return fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
}
// Successfully recovered, continue to the next entry
continue
}
// For other errors, fail the replay
return fmt.Errorf("error reading entry from %s: %w", path, err)
}
// Process the entry
if err := handler(entry); err != nil {
return fmt.Errorf("error handling entry: %w", err)
}
entriesProcessed++
}
if !DisableRecoveryLogs {
fmt.Printf("Processed %d entries from %s (skipped %d corrupted entries)\n",
entriesProcessed, path, entriesSkipped)
}
return nil
}
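// Illustrative handler (EntryHandler is assumed to have the shape
// func(*Entry) error); this one just counts replayed entries:
//
//	entries := 0
//	err := ReplayWALFile(path, func(e *Entry) error {
//		entries++
//		return nil
//	})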
// recoverFromCorruption attempts to get past a corrupted record by skipping
// ahead a fixed distance (32KB) and letting the next ReadEntry try to parse
// from the new position; it does not search for a record header
func recoverFromCorruption(reader *Reader) error {
// Skip up to 32KB in one step; any error (including EOF) is returned to the caller
if _, err := io.CopyN(io.Discard, reader.reader, 32*1024); err != nil {
return err
}
// Let the next ReadEntry attempt to parse from this position
return nil
}
@ -273,10 +362,47 @@ func ReplayWALDir(dir string, handler EntryHandler) error {
return err
}
// Track number of files processed successfully
successfulFiles := 0
var lastErr error
// Try to process each file, but continue on recoverable errors
for _, file := range files {
if err := ReplayWALFile(file, handler); err != nil {
return err
err := ReplayWALFile(file, handler)
if err != nil {
if !DisableRecoveryLogs {
fmt.Printf("Error processing WAL file %s: %v\n", file, err)
}
// Record the error, but continue
lastErr = err
// Check if this is a file-level error or just a corrupt record
if !strings.Contains(err.Error(), "corrupt") &&
!strings.Contains(err.Error(), "invalid") {
return fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
}
// Continue to the next file for corrupt/invalid errors
continue
}
if !DisableRecoveryLogs {
// ReplayWALFile already printed the per-file entry statistics above
fmt.Printf("Replayed WAL file %s successfully\n", file)
}
successfulFiles++
}
// If we processed at least one file successfully, the WAL recovery is considered successful
if successfulFiles > 0 {
return nil
}
// If no files were processed successfully and we had errors, return the last error
if lastErr != nil {
return fmt.Errorf("failed to process any WAL files: %w", lastErr)
}
return nil

View File

@ -56,6 +56,9 @@ type Entry struct {
Value []byte
}
// DisableRecoveryLogs controls whether recovery progress logs are printed;
// tests set it to true to keep example output clean
var DisableRecoveryLogs bool
// WAL represents a write-ahead log
type WAL struct {
cfg *config.Config
@ -101,6 +104,76 @@ func NewWAL(cfg *config.Config, dir string) (*WAL, error) {
return wal, nil
}
// ReuseWAL attempts to reuse an existing WAL file for appending
// Returns nil, nil if no suitable WAL file is found
func ReuseWAL(cfg *config.Config, dir string, nextSeq uint64) (*WAL, error) {
if cfg == nil {
return nil, errors.New("config cannot be nil")
}
// Find existing WAL files
files, err := FindWALFiles(dir)
if err != nil {
return nil, fmt.Errorf("failed to find WAL files: %w", err)
}
// No files found
if len(files) == 0 {
return nil, nil
}
// Try the most recent one (last in sorted order)
latestWAL := files[len(files)-1]
// Try to open for append
file, err := os.OpenFile(latestWAL, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
// Don't log in tests
if !DisableRecoveryLogs {
fmt.Printf("Cannot open latest WAL for append: %v\n", err)
}
return nil, nil
}
// Check if file is not too large
stat, err := file.Stat()
if err != nil {
file.Close()
return nil, fmt.Errorf("failed to stat WAL file: %w", err)
}
// Define maximum WAL size to check against
maxWALSize := int64(64 * 1024 * 1024) // Default 64MB
if cfg.WALMaxSize > 0 {
maxWALSize = cfg.WALMaxSize
}
if stat.Size() >= maxWALSize {
file.Close()
if !DisableRecoveryLogs {
fmt.Printf("Latest WAL file is too large to reuse (%d bytes)\n", stat.Size())
}
return nil, nil
}
if !DisableRecoveryLogs {
fmt.Printf("Reusing existing WAL file: %s with next sequence %d\n",
latestWAL, nextSeq)
}
wal := &WAL{
cfg: cfg,
dir: dir,
file: file,
writer: bufio.NewWriterSize(file, 64*1024), // 64KB buffer
nextSequence: nextSeq,
bytesWritten: stat.Size(),
lastSync: time.Now(),
}
return wal, nil
}
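// Typical call site (mirrors engine startup): try to reuse the latest WAL,
// fall back to creating a fresh one. WALMaxSize is read from the config
// (JSON key "wal_max_size"); 64MB is the default when it is unset.
//
//	w, err := ReuseWAL(cfg, dir, 1)
//	if err != nil {
//		return nil, err
//	}
//	if w == nil {
//		w, err = NewWAL(cfg, dir)
//	}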
// Append adds an entry to the WAL
func (w *WAL) Append(entryType uint8, key, value []byte) (uint64, error) {
w.mu.Lock()
@ -450,6 +523,17 @@ func (w *WAL) Close() error {
return nil
}
// UpdateNextSequence sets the next sequence number for the WAL
// This is used after recovery to ensure new entries have increasing sequence numbers
func (w *WAL) UpdateNextSequence(nextSeq uint64) {
w.mu.Lock()
defer w.mu.Unlock()
if nextSeq > w.nextSequence {
w.nextSequence = nextSeq
}
}
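// Illustrative use after WAL replay (mirrors the engine's recovery path):
//
//	if maxSeqNum > 0 {
//		w.UpdateNextSequence(maxSeqNum + 1)
//	}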
func min(a, b int) int {
if a < b {
return a