From 7231a48d0e7c186d918d0ed3bd16fceb742ec508 Mon Sep 17 00:00:00 2001 From: Jeremy Tregunna Date: Sun, 20 Apr 2025 12:51:00 -0600 Subject: [PATCH] refactor: comprehensive restructuring of storage engine with improved WAL recovery and iterators --- .gitignore | 1 + pkg/compaction/base_strategy.go | 149 ++++++++++ pkg/compaction/compaction.go | 458 ------------------------------ pkg/compaction/compaction_test.go | 39 ++- pkg/compaction/compat.go | 48 ++++ pkg/compaction/coordinator.go | 309 ++++++++++++++++++++ pkg/compaction/executor.go | 177 ++++++++++++ pkg/compaction/file_tracker.go | 95 +++++++ pkg/compaction/interfaces.go | 82 ++++++ pkg/compaction/manager.go | 413 --------------------------- pkg/compaction/tiered.go | 397 -------------------------- pkg/compaction/tiered_strategy.go | 268 +++++++++++++++++ pkg/compaction/tombstone.go | 6 +- pkg/config/config.go | 1 + pkg/engine/engine.go | 131 ++++++++- pkg/engine/iterator.go | 137 +++++++-- pkg/memtable/mempool.go | 15 + pkg/transaction/example_test.go | 6 + pkg/wal/reader.go | 130 ++++++++- pkg/wal/wal.go | 84 ++++++ 20 files changed, 1629 insertions(+), 1317 deletions(-) create mode 100644 pkg/compaction/base_strategy.go create mode 100644 pkg/compaction/compat.go create mode 100644 pkg/compaction/coordinator.go create mode 100644 pkg/compaction/executor.go create mode 100644 pkg/compaction/file_tracker.go create mode 100644 pkg/compaction/interfaces.go delete mode 100644 pkg/compaction/manager.go delete mode 100644 pkg/compaction/tiered.go create mode 100644 pkg/compaction/tiered_strategy.go diff --git a/.gitignore b/.gitignore index e81844f..ae3c5db 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ benchmark-data # Executables +gs storage-bench # Dependency directories diff --git a/pkg/compaction/base_strategy.go b/pkg/compaction/base_strategy.go new file mode 100644 index 0000000..cdfb19e --- /dev/null +++ b/pkg/compaction/base_strategy.go @@ -0,0 +1,149 @@ +package compaction + +import ( + "fmt" + "os" + "path/filepath" + "sort" + "strings" + + "git.canoozie.net/jer/go-storage/pkg/config" + "git.canoozie.net/jer/go-storage/pkg/sstable" +) + +// BaseCompactionStrategy provides common functionality for compaction strategies +type BaseCompactionStrategy struct { + // Configuration + cfg *config.Config + + // SSTable directory + sstableDir string + + // File information by level + levels map[int][]*SSTableInfo +} + +// NewBaseCompactionStrategy creates a new base compaction strategy +func NewBaseCompactionStrategy(cfg *config.Config, sstableDir string) *BaseCompactionStrategy { + return &BaseCompactionStrategy{ + cfg: cfg, + sstableDir: sstableDir, + levels: make(map[int][]*SSTableInfo), + } +} + +// LoadSSTables scans the SSTable directory and loads metadata for all files +func (s *BaseCompactionStrategy) LoadSSTables() error { + // Clear existing data + s.levels = make(map[int][]*SSTableInfo) + + // Read all files from the SSTable directory + entries, err := os.ReadDir(s.sstableDir) + if err != nil { + if os.IsNotExist(err) { + return nil // Directory doesn't exist yet + } + return fmt.Errorf("failed to read SSTable directory: %w", err) + } + + // Parse filenames and collect information + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".sst") { + continue // Skip directories and non-SSTable files + } + + // Parse filename to extract level, sequence, and timestamp + // Filename format: level_sequence_timestamp.sst + var level int + var sequence uint64 + var timestamp int64 + + if n, 
err := fmt.Sscanf(entry.Name(), "%d_%06d_%020d.sst", + &level, &sequence, &timestamp); n != 3 || err != nil { + // Skip files that don't match our naming pattern + continue + } + + // Get file info for size + fi, err := entry.Info() + if err != nil { + return fmt.Errorf("failed to get file info for %s: %w", entry.Name(), err) + } + + // Open the file to extract key range information + path := filepath.Join(s.sstableDir, entry.Name()) + reader, err := sstable.OpenReader(path) + if err != nil { + return fmt.Errorf("failed to open SSTable %s: %w", path, err) + } + + // Create iterator to get first and last keys + iter := reader.NewIterator() + var firstKey, lastKey []byte + + // Get first key + iter.SeekToFirst() + if iter.Valid() { + firstKey = append([]byte{}, iter.Key()...) + } + + // Get last key + iter.SeekToLast() + if iter.Valid() { + lastKey = append([]byte{}, iter.Key()...) + } + + // Create SSTable info + info := &SSTableInfo{ + Path: path, + Level: level, + Sequence: sequence, + Timestamp: timestamp, + Size: fi.Size(), + KeyCount: reader.GetKeyCount(), + FirstKey: firstKey, + LastKey: lastKey, + Reader: reader, + } + + // Add to appropriate level + s.levels[level] = append(s.levels[level], info) + } + + // Sort files within each level by sequence number + for level, files := range s.levels { + sort.Slice(files, func(i, j int) bool { + return files[i].Sequence < files[j].Sequence + }) + s.levels[level] = files + } + + return nil +} + +// Close closes all open SSTable readers +func (s *BaseCompactionStrategy) Close() error { + var lastErr error + + for _, files := range s.levels { + for _, file := range files { + if file.Reader != nil { + if err := file.Reader.Close(); err != nil && lastErr == nil { + lastErr = err + } + file.Reader = nil + } + } + } + + return lastErr +} + +// GetLevelSize returns the total size of all files in a level +func (s *BaseCompactionStrategy) GetLevelSize(level int) int64 { + var size int64 + for _, file := range s.levels[level] { + size += file.Size + } + return size +} \ No newline at end of file diff --git a/pkg/compaction/compaction.go b/pkg/compaction/compaction.go index 02d7aed..c4fea5d 100644 --- a/pkg/compaction/compaction.go +++ b/pkg/compaction/compaction.go @@ -3,15 +3,7 @@ package compaction import ( "bytes" "fmt" - "os" - "path/filepath" - "sort" - "strings" - "time" - "git.canoozie.net/jer/go-storage/pkg/common/iterator" - "git.canoozie.net/jer/go-storage/pkg/common/iterator/composite" - "git.canoozie.net/jer/go-storage/pkg/config" "git.canoozie.net/jer/go-storage/pkg/sstable" ) @@ -81,454 +73,4 @@ type CompactionTask struct { // Output file path template OutputPathTemplate string -} - -// Compactor manages the compaction process -type Compactor struct { - // Configuration - cfg *config.Config - - // SSTable directory - sstableDir string - - // File information by level - levels map[int][]*SSTableInfo - - // Tombstone tracking - tombstoneTracker *TombstoneTracker -} - -// NewCompactor creates a new compaction manager -func NewCompactor(cfg *config.Config, sstableDir string, tracker *TombstoneTracker) *Compactor { - return &Compactor{ - cfg: cfg, - sstableDir: sstableDir, - levels: make(map[int][]*SSTableInfo), - tombstoneTracker: tracker, - } -} - -// LoadSSTables scans the SSTable directory and loads metadata for all files -func (c *Compactor) LoadSSTables() error { - // Clear existing data - c.levels = make(map[int][]*SSTableInfo) - - // Read all files from the SSTable directory - entries, err := os.ReadDir(c.sstableDir) - if err != nil { - if 
os.IsNotExist(err) { - return nil // Directory doesn't exist yet - } - return fmt.Errorf("failed to read SSTable directory: %w", err) - } - - // Parse filenames and collect information - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".sst") { - continue // Skip directories and non-SSTable files - } - - // Parse filename to extract level, sequence, and timestamp - // Filename format: level_sequence_timestamp.sst - var level int - var sequence uint64 - var timestamp int64 - - if n, err := fmt.Sscanf(entry.Name(), "%d_%06d_%020d.sst", - &level, &sequence, &timestamp); n != 3 || err != nil { - // Skip files that don't match our naming pattern - continue - } - - // Get file info for size - fi, err := entry.Info() - if err != nil { - return fmt.Errorf("failed to get file info for %s: %w", entry.Name(), err) - } - - // Open the file to extract key range information - path := filepath.Join(c.sstableDir, entry.Name()) - reader, err := sstable.OpenReader(path) - if err != nil { - return fmt.Errorf("failed to open SSTable %s: %w", path, err) - } - - // Create iterator to get first and last keys - iter := reader.NewIterator() - var firstKey, lastKey []byte - - // Get first key - iter.SeekToFirst() - if iter.Valid() { - firstKey = append([]byte{}, iter.Key()...) - } - - // Get last key - iter.SeekToLast() - if iter.Valid() { - lastKey = append([]byte{}, iter.Key()...) - } - - // Create SSTable info - info := &SSTableInfo{ - Path: path, - Level: level, - Sequence: sequence, - Timestamp: timestamp, - Size: fi.Size(), - KeyCount: reader.GetKeyCount(), - FirstKey: firstKey, - LastKey: lastKey, - Reader: reader, - } - - // Add to appropriate level - c.levels[level] = append(c.levels[level], info) - } - - // Sort files within each level by sequence number - for level, files := range c.levels { - sort.Slice(files, func(i, j int) bool { - return files[i].Sequence < files[j].Sequence - }) - c.levels[level] = files - } - - return nil -} - -// Close closes all open SSTable readers -func (c *Compactor) Close() error { - var lastErr error - - for _, files := range c.levels { - for _, file := range files { - if file.Reader != nil { - if err := file.Reader.Close(); err != nil && lastErr == nil { - lastErr = err - } - file.Reader = nil - } - } - } - - return lastErr -} - -// GetLevelSize returns the total size of all files in a level -func (c *Compactor) GetLevelSize(level int) int64 { - var size int64 - for _, file := range c.levels[level] { - size += file.Size - } - return size -} - -// GetCompactionTask selects files for compaction based on size ratio and overlap -func (c *Compactor) GetCompactionTask() (*CompactionTask, error) { - // No compaction if we don't have at least 2 levels with files - if len(c.levels) < 1 { - return nil, nil - } - - // Check if L0 needs compaction (Level 0 is special because files may overlap) - if len(c.levels[0]) >= c.cfg.MaxMemTables { - return c.selectLevel0Compaction() - } - - // Check higher levels based on size ratio - maxLevel := 0 - for level := range c.levels { - if level > maxLevel { - maxLevel = level - } - } - - // Check each level starting from 1 - for level := 1; level <= maxLevel; level++ { - // If this level is too large compared to the next level - nextLevelSize := c.GetLevelSize(level + 1) - thisLevelSize := c.GetLevelSize(level) - - // If this level is empty, skip it - if thisLevelSize == 0 { - continue - } - - // If the next level doesn't exist yet or is empty, create it - if nextLevelSize == 0 { - // Choose one file from this 
level to move to the next level - if len(c.levels[level]) > 0 { - return c.selectSingleFileCompaction(level) - } - continue - } - - // Check if this level exceeds the size ratio threshold - sizeRatio := float64(thisLevelSize) / float64(nextLevelSize) - if sizeRatio >= c.cfg.CompactionRatio { - return c.selectSizeBasedCompaction(level) - } - } - - // No compaction needed - return nil, nil -} - -// selectLevel0Compaction selects files from L0 for compaction -func (c *Compactor) selectLevel0Compaction() (*CompactionTask, error) { - // Not enough files in L0 for compaction - if len(c.levels[0]) < 2 { - return nil, nil - } - - // For L0, take oldest files and compact them to L1 - numFiles := len(c.levels[0]) - maxCompactFiles := c.cfg.MaxMemTables - if numFiles > maxCompactFiles { - numFiles = maxCompactFiles - } - - // Sort L0 files by sequence number to get oldest first - files := make([]*SSTableInfo, len(c.levels[0])) - copy(files, c.levels[0]) - sort.Slice(files, func(i, j int) bool { - return files[i].Sequence < files[j].Sequence - }) - - // Select oldest files for compaction - selectedFiles := files[:numFiles] - - // Determine key range of selected files - var minKey, maxKey []byte - for _, file := range selectedFiles { - if len(minKey) == 0 || bytes.Compare(file.FirstKey, minKey) < 0 { - minKey = file.FirstKey - } - if len(maxKey) == 0 || bytes.Compare(file.LastKey, maxKey) > 0 { - maxKey = file.LastKey - } - } - - // Find overlapping files in L1 - var l1Files []*SSTableInfo - for _, file := range c.levels[1] { - // Create a temporary SSTableInfo for the key range - rangeInfo := &SSTableInfo{ - FirstKey: minKey, - LastKey: maxKey, - } - - if file.Overlaps(rangeInfo) { - l1Files = append(l1Files, file) - } - } - - // Create the compaction task - task := &CompactionTask{ - InputFiles: map[int][]*SSTableInfo{ - 0: selectedFiles, - 1: l1Files, - }, - TargetLevel: 1, - OutputPathTemplate: filepath.Join(c.sstableDir, "%d_%06d_%020d.sst"), - } - - return task, nil -} - -// selectSingleFileCompaction selects a single file to compact to the next level -func (c *Compactor) selectSingleFileCompaction(level int) (*CompactionTask, error) { - // Find the oldest file in this level - if len(c.levels[level]) == 0 { - return nil, nil - } - - // Sort by sequence number to get the oldest file - files := make([]*SSTableInfo, len(c.levels[level])) - copy(files, c.levels[level]) - sort.Slice(files, func(i, j int) bool { - return files[i].Sequence < files[j].Sequence - }) - - // Select the oldest file - file := files[0] - - // Find overlapping files in the next level - var nextLevelFiles []*SSTableInfo - for _, nextFile := range c.levels[level+1] { - if file.Overlaps(nextFile) { - nextLevelFiles = append(nextLevelFiles, nextFile) - } - } - - // Create the compaction task - task := &CompactionTask{ - InputFiles: map[int][]*SSTableInfo{ - level: {file}, - level + 1: nextLevelFiles, - }, - TargetLevel: level + 1, - OutputPathTemplate: filepath.Join(c.sstableDir, "%d_%06d_%020d.sst"), - } - - return task, nil -} - -// selectSizeBasedCompaction selects files for compaction based on size ratio -func (c *Compactor) selectSizeBasedCompaction(level int) (*CompactionTask, error) { - // Find a file from this level to compact - if len(c.levels[level]) == 0 { - return nil, nil - } - - // Choose a file that hasn't been compacted recently - // For simplicity, just choose the oldest file (lowest sequence number) - files := make([]*SSTableInfo, len(c.levels[level])) - copy(files, c.levels[level]) - sort.Slice(files, func(i, j 
int) bool { - return files[i].Sequence < files[j].Sequence - }) - - // Select the oldest file - file := files[0] - - // Find overlapping files in the next level - var nextLevelFiles []*SSTableInfo - for _, nextFile := range c.levels[level+1] { - if file.Overlaps(nextFile) { - nextLevelFiles = append(nextLevelFiles, nextFile) - } - } - - // Create the compaction task - task := &CompactionTask{ - InputFiles: map[int][]*SSTableInfo{ - level: {file}, - level + 1: nextLevelFiles, - }, - TargetLevel: level + 1, - OutputPathTemplate: filepath.Join(c.sstableDir, "%d_%06d_%020d.sst"), - } - - return task, nil -} - -// CompactFiles performs the actual compaction of the input files -func (c *Compactor) CompactFiles(task *CompactionTask) ([]string, error) { - // Create a merged iterator over all input files - var iterators []iterator.Iterator - - // Add iterators from both levels - for level := 0; level <= task.TargetLevel; level++ { - for _, file := range task.InputFiles[level] { - // We need an iterator that preserves delete markers - if file.Reader != nil { - iterators = append(iterators, file.Reader.NewIterator()) - } - } - } - - // Create hierarchical merged iterator - mergedIter := composite.NewHierarchicalIterator(iterators) - - // Remember the input file paths for later cleanup - var inputPaths []string - for _, files := range task.InputFiles { - for _, file := range files { - inputPaths = append(inputPaths, file.Path) - } - } - - // Track keys to skip duplicate entries (for tombstones) - var lastKey []byte - var outputFiles []string - var currentWriter *sstable.Writer - var currentOutputPath string - var outputFileSequence uint64 = 1 - var entriesInCurrentFile int - - // Function to create a new output file - createNewOutputFile := func() error { - if currentWriter != nil { - if err := currentWriter.Finish(); err != nil { - return fmt.Errorf("failed to finish SSTable: %w", err) - } - outputFiles = append(outputFiles, currentOutputPath) - } - - // Create a new output file - timestamp := time.Now().UnixNano() - currentOutputPath = fmt.Sprintf(task.OutputPathTemplate, - task.TargetLevel, outputFileSequence, timestamp) - outputFileSequence++ - - var err error - currentWriter, err = sstable.NewWriter(currentOutputPath) - if err != nil { - return fmt.Errorf("failed to create SSTable writer: %w", err) - } - - entriesInCurrentFile = 0 - return nil - } - - // Create the first output file - if err := createNewOutputFile(); err != nil { - return nil, err - } - - // Iterate through all keys in sorted order - mergedIter.SeekToFirst() - for mergedIter.Valid() { - key := mergedIter.Key() - value := mergedIter.Value() - - // Skip duplicates (we've already included the newest version) - if lastKey != nil && bytes.Equal(key, lastKey) { - mergedIter.Next() - continue - } - - // Add the entry to the current output file, preserving both values and tombstones - // This ensures deletion markers are properly maintained - if err := currentWriter.Add(key, value); err != nil { - return nil, fmt.Errorf("failed to add entry to SSTable: %w", err) - } - entriesInCurrentFile++ - - // If the current file is big enough, start a new one - if int64(entriesInCurrentFile) >= c.cfg.SSTableMaxSize { - if err := createNewOutputFile(); err != nil { - return nil, err - } - } - - // Remember this key to skip duplicates - lastKey = append(lastKey[:0], key...) 
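
The duplicate-skipping loop being deleted here survives verbatim in the new pkg/compaction/executor.go, and it rests on one invariant: the hierarchical merged iterator yields entries sorted by key, with the newest version of a key emitted before any older version, so dropping every key equal to lastKey keeps exactly the newest version. A minimal standalone sketch of that invariant (the entry type and the in-memory slice are stand-ins for the real merged iterator, not part of the patch):

package main

import (
	"bytes"
	"fmt"
)

type entry struct{ key, value []byte }

func main() {
	// Entries as a hierarchical iterator would emit them: sorted by key,
	// newest version first when the same key appears in several levels.
	merged := []entry{
		{[]byte("a"), []byte("v2")}, // newest "a" (e.g. from L0)
		{[]byte("a"), []byte("v1")}, // older "a" (e.g. from L1) — skipped
		{[]byte("b"), []byte("v1")},
	}

	var lastKey []byte
	for _, e := range merged {
		if lastKey != nil && bytes.Equal(e.key, lastKey) {
			continue // duplicate key: an older version, drop it
		}
		fmt.Printf("keep %s=%s\n", e.key, e.value)
		lastKey = append(lastKey[:0], e.key...)
	}
	// Prints: keep a=v2, then keep b=v1
}
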
- mergedIter.Next() - } - - // Finish the last output file - if currentWriter != nil && entriesInCurrentFile > 0 { - if err := currentWriter.Finish(); err != nil { - return nil, fmt.Errorf("failed to finish SSTable: %w", err) - } - outputFiles = append(outputFiles, currentOutputPath) - } else if currentWriter != nil { - // No entries were written, abort the file - currentWriter.Abort() - } - - return outputFiles, nil -} - -// DeleteCompactedFiles removes the input files that were successfully compacted -func (c *Compactor) DeleteCompactedFiles(filePaths []string) error { - for _, path := range filePaths { - if err := os.Remove(path); err != nil { - return fmt.Errorf("failed to delete compacted file %s: %w", path, err) - } - } - return nil } \ No newline at end of file diff --git a/pkg/compaction/compaction_test.go b/pkg/compaction/compaction_test.go index c3d05df..768518b 100644 --- a/pkg/compaction/compaction_test.go +++ b/pkg/compaction/compaction_test.go @@ -65,6 +65,7 @@ func setupCompactionTest(t *testing.T) (string, *config.Config, func()) { CompactionThreads: 1, MaxMemTables: 2, SSTableMaxSize: 1000, + MaxLevelWithTombstones: 3, } // Return cleanup function @@ -98,26 +99,22 @@ func TestCompactorLoadSSTables(t *testing.T) { createTestSSTable(t, sstDir, 0, 1, timestamp, data1) createTestSSTable(t, sstDir, 0, 2, timestamp+1, data2) - // Create the compactor - // Create a tombstone tracker - tracker := NewTombstoneTracker(24 * time.Hour) - - // Create the compactor - compactor := NewCompactor(cfg, sstDir, tracker) + // Create the strategy + strategy := NewBaseCompactionStrategy(cfg, sstDir) // Load SSTables - err := compactor.LoadSSTables() + err := strategy.LoadSSTables() if err != nil { t.Fatalf("Failed to load SSTables: %v", err) } // Verify the correct number of files was loaded - if len(compactor.levels[0]) != 2 { - t.Errorf("Expected 2 files in level 0, got %d", len(compactor.levels[0])) + if len(strategy.levels[0]) != 2 { + t.Errorf("Expected 2 files in level 0, got %d", len(strategy.levels[0])) } // Verify key ranges - for _, file := range compactor.levels[0] { + for _, file := range strategy.levels[0] { if bytes.Equal(file.FirstKey, []byte("a")) { if !bytes.Equal(file.LastKey, []byte("c")) { t.Errorf("Expected last key 'c', got '%s'", string(file.LastKey)) @@ -196,18 +193,18 @@ func TestCompactorSelectLevel0Compaction(t *testing.T) { // Create the compactor // Create a tombstone tracker tracker := NewTombstoneTracker(24 * time.Hour) - + executor := NewCompactionExecutor(cfg, sstDir, tracker) // Create the compactor - compactor := NewTieredCompactor(cfg, sstDir, tracker) + strategy := NewTieredCompactionStrategy(cfg, sstDir, executor) // Load SSTables - err := compactor.LoadSSTables() + err := strategy.LoadSSTables() if err != nil { t.Fatalf("Failed to load SSTables: %v", err) } // Select compaction task - task, err := compactor.SelectCompaction() + task, err := strategy.SelectCompaction() if err != nil { t.Fatalf("Failed to select compaction: %v", err) } @@ -253,14 +250,12 @@ func TestCompactFiles(t *testing.T) { t.Logf("Created test SSTables: %s, %s", sstPath1, sstPath2) // Create the compactor - // Create a tombstone tracker tracker := NewTombstoneTracker(24 * time.Hour) - - // Create the compactor - compactor := NewCompactor(cfg, sstDir, tracker) + executor := NewCompactionExecutor(cfg, sstDir, tracker) + strategy := NewBaseCompactionStrategy(cfg, sstDir) // Load SSTables - err := compactor.LoadSSTables() + err := strategy.LoadSSTables() if err != nil { t.Fatalf("Failed to 
load SSTables: %v", err) } @@ -268,15 +263,15 @@ func TestCompactFiles(t *testing.T) { // Create a compaction task task := &CompactionTask{ InputFiles: map[int][]*SSTableInfo{ - 0: {compactor.levels[0][0]}, - 1: {compactor.levels[1][0]}, + 0: {strategy.levels[0][0]}, + 1: {strategy.levels[1][0]}, }, TargetLevel: 1, OutputPathTemplate: filepath.Join(sstDir, "%d_%06d_%020d.sst"), } // Perform compaction - outputFiles, err := compactor.CompactFiles(task) + outputFiles, err := executor.CompactFiles(task) if err != nil { t.Fatalf("Failed to compact files: %v", err) } diff --git a/pkg/compaction/compat.go b/pkg/compaction/compat.go new file mode 100644 index 0000000..92b2626 --- /dev/null +++ b/pkg/compaction/compat.go @@ -0,0 +1,48 @@ +package compaction + +import ( + "time" + + "git.canoozie.net/jer/go-storage/pkg/config" +) + +// NewCompactionManager creates a new compaction manager with the old API +// This is kept for backward compatibility with existing code +func NewCompactionManager(cfg *config.Config, sstableDir string) *DefaultCompactionCoordinator { + // Create tombstone tracker with default 24-hour retention + tombstones := NewTombstoneTracker(24 * time.Hour) + + // Create file tracker + fileTracker := NewFileTracker() + + // Create compaction executor + executor := NewCompactionExecutor(cfg, sstableDir, tombstones) + + // Create tiered compaction strategy + strategy := NewTieredCompactionStrategy(cfg, sstableDir, executor) + + // Return the new coordinator + return NewCompactionCoordinator(cfg, sstableDir, CompactionCoordinatorOptions{ + Strategy: strategy, + Executor: executor, + FileTracker: fileTracker, + TombstoneManager: tombstones, + CompactionInterval: cfg.CompactionInterval, + }) +} + +// Temporary alias types for backward compatibility +type CompactionManager = DefaultCompactionCoordinator +type Compactor = BaseCompactionStrategy +type TieredCompactor = TieredCompactionStrategy + +// NewCompactor creates a new compactor with the old API (backward compatibility) +func NewCompactor(cfg *config.Config, sstableDir string, tracker *TombstoneTracker) *BaseCompactionStrategy { + return NewBaseCompactionStrategy(cfg, sstableDir) +} + +// NewTieredCompactor creates a new tiered compactor with the old API (backward compatibility) +func NewTieredCompactor(cfg *config.Config, sstableDir string, tracker *TombstoneTracker) *TieredCompactionStrategy { + executor := NewCompactionExecutor(cfg, sstableDir, tracker) + return NewTieredCompactionStrategy(cfg, sstableDir, executor) +} \ No newline at end of file diff --git a/pkg/compaction/coordinator.go b/pkg/compaction/coordinator.go new file mode 100644 index 0000000..cd37da7 --- /dev/null +++ b/pkg/compaction/coordinator.go @@ -0,0 +1,309 @@ +package compaction + +import ( + "fmt" + "sync" + "time" + + "git.canoozie.net/jer/go-storage/pkg/config" +) + +// CompactionCoordinatorOptions holds configuration options for the coordinator +type CompactionCoordinatorOptions struct { + // Compaction strategy + Strategy CompactionStrategy + + // Compaction executor + Executor CompactionExecutor + + // File tracker + FileTracker FileTracker + + // Tombstone manager + TombstoneManager TombstoneManager + + // Compaction interval in seconds + CompactionInterval int64 +} + +// DefaultCompactionCoordinator is the default implementation of CompactionCoordinator +type DefaultCompactionCoordinator struct { + // Configuration + cfg *config.Config + + // SSTable directory + sstableDir string + + // Compaction strategy + strategy CompactionStrategy + + // 
Compaction executor + executor CompactionExecutor + + // File tracker + fileTracker FileTracker + + // Tombstone manager + tombstoneManager TombstoneManager + + // Next sequence number for SSTable files + nextSeq uint64 + + // Compaction state + running bool + stopCh chan struct{} + compactingMu sync.Mutex + + // Last set of files produced by compaction + lastCompactionOutputs []string + resultsMu sync.RWMutex + + // Compaction interval in seconds + compactionInterval int64 +} + +// NewCompactionCoordinator creates a new compaction coordinator +func NewCompactionCoordinator(cfg *config.Config, sstableDir string, options CompactionCoordinatorOptions) *DefaultCompactionCoordinator { + // Set defaults for any missing components + if options.FileTracker == nil { + options.FileTracker = NewFileTracker() + } + + if options.TombstoneManager == nil { + options.TombstoneManager = NewTombstoneTracker(24 * time.Hour) + } + + if options.Executor == nil { + options.Executor = NewCompactionExecutor(cfg, sstableDir, options.TombstoneManager) + } + + if options.Strategy == nil { + options.Strategy = NewTieredCompactionStrategy(cfg, sstableDir, options.Executor) + } + + if options.CompactionInterval <= 0 { + options.CompactionInterval = 1 // Default to 1 second + } + + return &DefaultCompactionCoordinator{ + cfg: cfg, + sstableDir: sstableDir, + strategy: options.Strategy, + executor: options.Executor, + fileTracker: options.FileTracker, + tombstoneManager: options.TombstoneManager, + nextSeq: 1, + stopCh: make(chan struct{}), + lastCompactionOutputs: make([]string, 0), + compactionInterval: options.CompactionInterval, + } +} + +// Start begins background compaction +func (c *DefaultCompactionCoordinator) Start() error { + c.compactingMu.Lock() + defer c.compactingMu.Unlock() + + if c.running { + return nil // Already running + } + + // Load existing SSTables + if err := c.strategy.LoadSSTables(); err != nil { + return fmt.Errorf("failed to load SSTables: %w", err) + } + + c.running = true + c.stopCh = make(chan struct{}) + + // Start background worker + go c.compactionWorker() + + return nil +} + +// Stop halts background compaction +func (c *DefaultCompactionCoordinator) Stop() error { + c.compactingMu.Lock() + defer c.compactingMu.Unlock() + + if !c.running { + return nil // Already stopped + } + + // Signal the worker to stop + close(c.stopCh) + c.running = false + + // Close strategy + return c.strategy.Close() +} + +// TrackTombstone adds a key to the tombstone tracker +func (c *DefaultCompactionCoordinator) TrackTombstone(key []byte) { + // Track the tombstone in our tracker + if c.tombstoneManager != nil { + c.tombstoneManager.AddTombstone(key) + } +} + +// ForcePreserveTombstone marks a tombstone for special handling during compaction +// This is primarily for testing purposes, to ensure specific tombstones are preserved +func (c *DefaultCompactionCoordinator) ForcePreserveTombstone(key []byte) { + if c.tombstoneManager != nil { + c.tombstoneManager.ForcePreserveTombstone(key) + } +} + +// MarkFileObsolete marks a file as obsolete (can be deleted) +// For backward compatibility with tests +func (c *DefaultCompactionCoordinator) MarkFileObsolete(path string) { + c.fileTracker.MarkFileObsolete(path) +} + +// CleanupObsoleteFiles removes files that are no longer needed +// For backward compatibility with tests +func (c *DefaultCompactionCoordinator) CleanupObsoleteFiles() error { + return c.fileTracker.CleanupObsoleteFiles() +} + +// compactionWorker runs the compaction loop +func (c 
*DefaultCompactionCoordinator) compactionWorker() { + // Ensure a minimum interval of 1 second + interval := c.compactionInterval + if interval <= 0 { + interval = 1 + } + ticker := time.NewTicker(time.Duration(interval) * time.Second) + defer ticker.Stop() + + for { + select { + case <-c.stopCh: + return + case <-ticker.C: + // Only one compaction at a time + c.compactingMu.Lock() + + // Run a compaction cycle + err := c.runCompactionCycle() + if err != nil { + // In a real system, we'd log this error + // fmt.Printf("Compaction error: %v\n", err) + } + + // Try to clean up obsolete files + err = c.fileTracker.CleanupObsoleteFiles() + if err != nil { + // In a real system, we'd log this error + // fmt.Printf("Cleanup error: %v\n", err) + } + + // Collect tombstone garbage periodically + if manager, ok := c.tombstoneManager.(interface{ CollectGarbage() }); ok { + manager.CollectGarbage() + } + + c.compactingMu.Unlock() + } + } +} + +// runCompactionCycle performs a single compaction cycle +func (c *DefaultCompactionCoordinator) runCompactionCycle() error { + // Reload SSTables to get fresh information + if err := c.strategy.LoadSSTables(); err != nil { + return fmt.Errorf("failed to load SSTables: %w", err) + } + + // Select files for compaction + task, err := c.strategy.SelectCompaction() + if err != nil { + return fmt.Errorf("failed to select files for compaction: %w", err) + } + + // If no compaction needed, return + if task == nil { + return nil + } + + // Mark files as pending + for _, files := range task.InputFiles { + for _, file := range files { + c.fileTracker.MarkFilePending(file.Path) + } + } + + // Perform compaction + outputFiles, err := c.executor.CompactFiles(task) + + // Unmark files as pending + for _, files := range task.InputFiles { + for _, file := range files { + c.fileTracker.UnmarkFilePending(file.Path) + } + } + + // Track the compaction outputs for statistics + if err == nil && len(outputFiles) > 0 { + // Record the compaction result + c.resultsMu.Lock() + c.lastCompactionOutputs = outputFiles + c.resultsMu.Unlock() + } + + // Handle compaction errors + if err != nil { + return fmt.Errorf("compaction failed: %w", err) + } + + // Mark input files as obsolete + for _, files := range task.InputFiles { + for _, file := range files { + c.fileTracker.MarkFileObsolete(file.Path) + } + } + + // Try to clean up the files immediately + return c.fileTracker.CleanupObsoleteFiles() +} + +// TriggerCompaction forces a compaction cycle +func (c *DefaultCompactionCoordinator) TriggerCompaction() error { + c.compactingMu.Lock() + defer c.compactingMu.Unlock() + + return c.runCompactionCycle() +} + +// CompactRange triggers compaction on a specific key range +func (c *DefaultCompactionCoordinator) CompactRange(minKey, maxKey []byte) error { + c.compactingMu.Lock() + defer c.compactingMu.Unlock() + + // Load current SSTable information + if err := c.strategy.LoadSSTables(); err != nil { + return fmt.Errorf("failed to load SSTables: %w", err) + } + + // Delegate to the strategy for actual compaction + return c.strategy.CompactRange(minKey, maxKey) +} + +// GetCompactionStats returns statistics about the compaction state +func (c *DefaultCompactionCoordinator) GetCompactionStats() map[string]interface{} { + c.resultsMu.RLock() + defer c.resultsMu.RUnlock() + + stats := make(map[string]interface{}) + + // Include info about last compaction + stats["last_outputs_count"] = len(c.lastCompactionOutputs) + + // If there are recent compaction outputs, include information + if 
len(c.lastCompactionOutputs) > 0 { + stats["last_outputs"] = c.lastCompactionOutputs + } + + return stats +} \ No newline at end of file diff --git a/pkg/compaction/executor.go b/pkg/compaction/executor.go new file mode 100644 index 0000000..e060688 --- /dev/null +++ b/pkg/compaction/executor.go @@ -0,0 +1,177 @@ +package compaction + +import ( + "bytes" + "fmt" + "os" + "time" + + "git.canoozie.net/jer/go-storage/pkg/common/iterator" + "git.canoozie.net/jer/go-storage/pkg/common/iterator/composite" + "git.canoozie.net/jer/go-storage/pkg/config" + "git.canoozie.net/jer/go-storage/pkg/sstable" +) + +// DefaultCompactionExecutor handles the actual compaction process +type DefaultCompactionExecutor struct { + // Configuration + cfg *config.Config + + // SSTable directory + sstableDir string + + // Tombstone manager for tracking deletions + tombstoneManager TombstoneManager +} + +// NewCompactionExecutor creates a new compaction executor +func NewCompactionExecutor(cfg *config.Config, sstableDir string, tombstoneManager TombstoneManager) *DefaultCompactionExecutor { + return &DefaultCompactionExecutor{ + cfg: cfg, + sstableDir: sstableDir, + tombstoneManager: tombstoneManager, + } +} + +// CompactFiles performs the actual compaction of the input files +func (e *DefaultCompactionExecutor) CompactFiles(task *CompactionTask) ([]string, error) { + // Create a merged iterator over all input files + var iterators []iterator.Iterator + + // Add iterators from both levels + for level := 0; level <= task.TargetLevel; level++ { + for _, file := range task.InputFiles[level] { + // We need an iterator that preserves delete markers + if file.Reader != nil { + iterators = append(iterators, file.Reader.NewIterator()) + } + } + } + + // Create hierarchical merged iterator + mergedIter := composite.NewHierarchicalIterator(iterators) + + // Track keys to skip duplicate entries (for tombstones) + var lastKey []byte + var outputFiles []string + var currentWriter *sstable.Writer + var currentOutputPath string + var outputFileSequence uint64 = 1 + var entriesInCurrentFile int + + // Function to create a new output file + createNewOutputFile := func() error { + if currentWriter != nil { + if err := currentWriter.Finish(); err != nil { + return fmt.Errorf("failed to finish SSTable: %w", err) + } + outputFiles = append(outputFiles, currentOutputPath) + } + + // Create a new output file + timestamp := time.Now().UnixNano() + currentOutputPath = fmt.Sprintf(task.OutputPathTemplate, + task.TargetLevel, outputFileSequence, timestamp) + outputFileSequence++ + + var err error + currentWriter, err = sstable.NewWriter(currentOutputPath) + if err != nil { + return fmt.Errorf("failed to create SSTable writer: %w", err) + } + + entriesInCurrentFile = 0 + return nil + } + + // Create a tombstone filter if we have a tombstone manager + var tombstoneFilter *BasicTombstoneFilter + if e.tombstoneManager != nil { + tombstoneFilter = NewBasicTombstoneFilter( + task.TargetLevel, + e.cfg.MaxLevelWithTombstones, + e.tombstoneManager, + ) + } + + // Create the first output file + if err := createNewOutputFile(); err != nil { + return nil, err + } + + // Iterate through all keys in sorted order + mergedIter.SeekToFirst() + for mergedIter.Valid() { + key := mergedIter.Key() + value := mergedIter.Value() + + // Skip duplicates (we've already included the newest version) + if lastKey != nil && bytes.Equal(key, lastKey) { + mergedIter.Next() + continue + } + + // Determine if we should keep this entry + // If we have a tombstone filter, use it, 
otherwise use the default logic + var shouldKeep bool + isTombstone := mergedIter.IsTombstone() + + if tombstoneFilter != nil && isTombstone { + // Use the tombstone filter for tombstones + shouldKeep = tombstoneFilter.ShouldKeep(key, nil) + } else { + // Default logic - always keep non-tombstones, and keep tombstones in lower levels + shouldKeep = !isTombstone || task.TargetLevel <= e.cfg.MaxLevelWithTombstones + } + + if shouldKeep { + var err error + + // Use the explicit AddTombstone method if this is a tombstone + if isTombstone { + err = currentWriter.AddTombstone(key) + } else { + err = currentWriter.Add(key, value) + } + + if err != nil { + return nil, fmt.Errorf("failed to add entry to SSTable: %w", err) + } + entriesInCurrentFile++ + } + + // If the current file is big enough, start a new one + if int64(entriesInCurrentFile) >= e.cfg.SSTableMaxSize { + if err := createNewOutputFile(); err != nil { + return nil, err + } + } + + // Remember this key to skip duplicates + lastKey = append(lastKey[:0], key...) + mergedIter.Next() + } + + // Finish the last output file + if currentWriter != nil && entriesInCurrentFile > 0 { + if err := currentWriter.Finish(); err != nil { + return nil, fmt.Errorf("failed to finish SSTable: %w", err) + } + outputFiles = append(outputFiles, currentOutputPath) + } else if currentWriter != nil { + // No entries were written, abort the file + currentWriter.Abort() + } + + return outputFiles, nil +} + +// DeleteCompactedFiles removes the input files that were successfully compacted +func (e *DefaultCompactionExecutor) DeleteCompactedFiles(filePaths []string) error { + for _, path := range filePaths { + if err := os.Remove(path); err != nil { + return fmt.Errorf("failed to delete compacted file %s: %w", path, err) + } + } + return nil +} \ No newline at end of file diff --git a/pkg/compaction/file_tracker.go b/pkg/compaction/file_tracker.go new file mode 100644 index 0000000..c276ad5 --- /dev/null +++ b/pkg/compaction/file_tracker.go @@ -0,0 +1,95 @@ +package compaction + +import ( + "fmt" + "os" + "sync" +) + +// DefaultFileTracker is the default implementation of FileTracker +type DefaultFileTracker struct { + // Map of file path -> true for files that have been obsoleted by compaction + obsoleteFiles map[string]bool + + // Map of file path -> true for files that are currently being compacted + pendingFiles map[string]bool + + // Mutex for file tracking maps + filesMu sync.RWMutex +} + +// NewFileTracker creates a new file tracker +func NewFileTracker() *DefaultFileTracker { + return &DefaultFileTracker{ + obsoleteFiles: make(map[string]bool), + pendingFiles: make(map[string]bool), + } +} + +// MarkFileObsolete marks a file as obsolete (can be deleted) +func (f *DefaultFileTracker) MarkFileObsolete(path string) { + f.filesMu.Lock() + defer f.filesMu.Unlock() + + f.obsoleteFiles[path] = true +} + +// MarkFilePending marks a file as being used in a compaction +func (f *DefaultFileTracker) MarkFilePending(path string) { + f.filesMu.Lock() + defer f.filesMu.Unlock() + + f.pendingFiles[path] = true +} + +// UnmarkFilePending removes the pending mark from a file +func (f *DefaultFileTracker) UnmarkFilePending(path string) { + f.filesMu.Lock() + defer f.filesMu.Unlock() + + delete(f.pendingFiles, path) +} + +// IsFileObsolete checks if a file is marked as obsolete +func (f *DefaultFileTracker) IsFileObsolete(path string) bool { + f.filesMu.RLock() + defer f.filesMu.RUnlock() + + return f.obsoleteFiles[path] +} + +// IsFilePending checks if a file is marked as 
pending compaction +func (f *DefaultFileTracker) IsFilePending(path string) bool { + f.filesMu.RLock() + defer f.filesMu.RUnlock() + + return f.pendingFiles[path] +} + +// CleanupObsoleteFiles removes files that are no longer needed +func (f *DefaultFileTracker) CleanupObsoleteFiles() error { + f.filesMu.Lock() + defer f.filesMu.Unlock() + + // Safely remove obsolete files that aren't pending + for path := range f.obsoleteFiles { + // Skip files that are still being used in a compaction + if f.pendingFiles[path] { + continue + } + + // Try to delete the file + if err := os.Remove(path); err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("failed to delete obsolete file %s: %w", path, err) + } + // If the file doesn't exist, remove it from our tracking + delete(f.obsoleteFiles, path) + } else { + // Successfully deleted, remove from tracking + delete(f.obsoleteFiles, path) + } + } + + return nil +} \ No newline at end of file diff --git a/pkg/compaction/interfaces.go b/pkg/compaction/interfaces.go new file mode 100644 index 0000000..3fec9ba --- /dev/null +++ b/pkg/compaction/interfaces.go @@ -0,0 +1,82 @@ +package compaction + +// CompactionStrategy defines the interface for selecting files for compaction +type CompactionStrategy interface { + // SelectCompaction selects files for compaction and returns a CompactionTask + SelectCompaction() (*CompactionTask, error) + + // CompactRange selects files within a key range for compaction + CompactRange(minKey, maxKey []byte) error + + // LoadSSTables reloads SSTable information from disk + LoadSSTables() error + + // Close closes any resources held by the strategy + Close() error +} + +// CompactionExecutor defines the interface for executing compaction tasks +type CompactionExecutor interface { + // CompactFiles performs the actual compaction of the input files + CompactFiles(task *CompactionTask) ([]string, error) + + // DeleteCompactedFiles removes the input files that were successfully compacted + DeleteCompactedFiles(filePaths []string) error +} + +// FileTracker defines the interface for tracking file states during compaction +type FileTracker interface { + // MarkFileObsolete marks a file as obsolete (can be deleted) + MarkFileObsolete(path string) + + // MarkFilePending marks a file as being used in a compaction + MarkFilePending(path string) + + // UnmarkFilePending removes the pending mark from a file + UnmarkFilePending(path string) + + // IsFileObsolete checks if a file is marked as obsolete + IsFileObsolete(path string) bool + + // IsFilePending checks if a file is marked as pending compaction + IsFilePending(path string) bool + + // CleanupObsoleteFiles removes files that are no longer needed + CleanupObsoleteFiles() error +} + +// TombstoneManager defines the interface for tracking and managing tombstones +type TombstoneManager interface { + // AddTombstone records a key deletion + AddTombstone(key []byte) + + // ForcePreserveTombstone marks a tombstone to be preserved indefinitely + ForcePreserveTombstone(key []byte) + + // ShouldKeepTombstone checks if a tombstone should be preserved during compaction + ShouldKeepTombstone(key []byte) bool + + // CollectGarbage removes expired tombstone records + CollectGarbage() +} + +// CompactionCoordinator defines the interface for coordinating compaction processes +type CompactionCoordinator interface { + // Start begins background compaction + Start() error + + // Stop halts background compaction + Stop() error + + // TriggerCompaction forces a compaction cycle + TriggerCompaction() 
error + + // CompactRange triggers compaction on a specific key range + CompactRange(minKey, maxKey []byte) error + + // TrackTombstone adds a key to the tombstone tracker + TrackTombstone(key []byte) + + // GetCompactionStats returns statistics about the compaction state + GetCompactionStats() map[string]interface{} +} \ No newline at end of file diff --git a/pkg/compaction/manager.go b/pkg/compaction/manager.go deleted file mode 100644 index 35b2bb1..0000000 --- a/pkg/compaction/manager.go +++ /dev/null @@ -1,413 +0,0 @@ -package compaction - -import ( - "fmt" - "os" - "path/filepath" - "sync" - "time" - - "git.canoozie.net/jer/go-storage/pkg/config" -) - -// CompactionManager coordinates the compaction process -type CompactionManager struct { - // Configuration - cfg *config.Config - - // SSTable directory - sstableDir string - - // Compactor implementation - compactor *TieredCompactor - - // Tombstone tracker - tombstones *TombstoneTracker - - // Next sequence number for SSTable files - nextSeq uint64 - - // Compaction state - running bool - stopCh chan struct{} - compactingMu sync.Mutex - - // File tracking - // Map of file path -> true for files that have been obsoleted by compaction - obsoleteFiles map[string]bool - // Map of file path -> true for files that are currently being compacted - pendingFiles map[string]bool - // Last set of files produced by compaction - lastCompactionOutputs []string - filesMu sync.RWMutex -} - -// NewCompactionManager creates a new compaction manager -func NewCompactionManager(cfg *config.Config, sstableDir string) *CompactionManager { - // Create tombstone tracker first - tombstones := NewTombstoneTracker(24 * time.Hour) // Default 24-hour retention - - return &CompactionManager{ - cfg: cfg, - sstableDir: sstableDir, - compactor: NewTieredCompactor(cfg, sstableDir, tombstones), - tombstones: tombstones, - nextSeq: 1, - stopCh: make(chan struct{}), - obsoleteFiles: make(map[string]bool), - pendingFiles: make(map[string]bool), - lastCompactionOutputs: make([]string, 0), - } -} - -// Start begins background compaction -func (cm *CompactionManager) Start() error { - cm.compactingMu.Lock() - defer cm.compactingMu.Unlock() - - if cm.running { - return nil // Already running - } - - // Load existing SSTables - if err := cm.compactor.LoadSSTables(); err != nil { - return fmt.Errorf("failed to load SSTables: %w", err) - } - - cm.running = true - cm.stopCh = make(chan struct{}) - - // Start background worker - go cm.compactionWorker() - - return nil -} - -// Stop halts background compaction -func (cm *CompactionManager) Stop() error { - cm.compactingMu.Lock() - defer cm.compactingMu.Unlock() - - if !cm.running { - return nil // Already stopped - } - - // Signal the worker to stop - close(cm.stopCh) - cm.running = false - - // Close compactor - return cm.compactor.Close() -} - -// MarkFileObsolete marks a file as obsolete -func (cm *CompactionManager) MarkFileObsolete(path string) { - cm.filesMu.Lock() - defer cm.filesMu.Unlock() - - cm.obsoleteFiles[path] = true -} - -// MarkFilePending marks a file as being used in a compaction -func (cm *CompactionManager) MarkFilePending(path string) { - cm.filesMu.Lock() - defer cm.filesMu.Unlock() - - cm.pendingFiles[path] = true -} - -// UnmarkFilePending removes the pending mark -func (cm *CompactionManager) UnmarkFilePending(path string) { - cm.filesMu.Lock() - defer cm.filesMu.Unlock() - - delete(cm.pendingFiles, path) -} - -// IsFileObsolete checks if a file is marked as obsolete -func (cm *CompactionManager) 
IsFileObsolete(path string) bool { - cm.filesMu.RLock() - defer cm.filesMu.RUnlock() - - return cm.obsoleteFiles[path] -} - -// IsFilePending checks if a file is pending compaction -func (cm *CompactionManager) IsFilePending(path string) bool { - cm.filesMu.RLock() - defer cm.filesMu.RUnlock() - - return cm.pendingFiles[path] -} - -// CleanupObsoleteFiles removes files that are no longer needed -func (cm *CompactionManager) CleanupObsoleteFiles() error { - cm.filesMu.Lock() - defer cm.filesMu.Unlock() - - // Safely remove obsolete files that aren't pending - for path := range cm.obsoleteFiles { - // Skip files that are still being used in a compaction - if cm.pendingFiles[path] { - continue - } - - // Try to delete the file - if err := os.Remove(path); err != nil { - if !os.IsNotExist(err) { - return fmt.Errorf("failed to delete obsolete file %s: %w", path, err) - } - // If the file doesn't exist, remove it from our tracking - delete(cm.obsoleteFiles, path) - } else { - // Successfully deleted, remove from tracking - delete(cm.obsoleteFiles, path) - } - } - - return nil -} - -// compactionWorker runs the compaction loop -func (cm *CompactionManager) compactionWorker() { - // Ensure a minimum interval of 1 second - interval := cm.cfg.CompactionInterval - if interval <= 0 { - interval = 1 - } - ticker := time.NewTicker(time.Duration(interval) * time.Second) - defer ticker.Stop() - - for { - select { - case <-cm.stopCh: - return - case <-ticker.C: - // Only one compaction at a time - cm.compactingMu.Lock() - - // Run a compaction cycle - err := cm.runCompactionCycle() - if err != nil { - // In a real system, we'd log this error - // fmt.Printf("Compaction error: %v\n", err) - } - - // Try to clean up obsolete files - err = cm.CleanupObsoleteFiles() - if err != nil { - // In a real system, we'd log this error - // fmt.Printf("Cleanup error: %v\n", err) - } - - // Collect tombstone garbage periodically - cm.tombstones.CollectGarbage() - - cm.compactingMu.Unlock() - } - } -} - -// runCompactionCycle performs a single compaction cycle -func (cm *CompactionManager) runCompactionCycle() error { - // Reload SSTables to get fresh information - if err := cm.compactor.LoadSSTables(); err != nil { - return fmt.Errorf("failed to load SSTables: %w", err) - } - - // Select files for compaction - task, err := cm.compactor.SelectCompaction() - if err != nil { - return fmt.Errorf("failed to select files for compaction: %w", err) - } - - // If no compaction needed, return - if task == nil { - return nil - } - - // Mark files as pending - for _, files := range task.InputFiles { - for _, file := range files { - cm.MarkFilePending(file.Path) - } - } - - // Perform compaction - outputFiles, err := cm.compactor.CompactFiles(task) - - // Unmark files as pending - for _, files := range task.InputFiles { - for _, file := range files { - cm.UnmarkFilePending(file.Path) - } - } - - // Track the compaction outputs for statistics - if err == nil && len(outputFiles) > 0 { - // Record the compaction result - cm.filesMu.Lock() - cm.lastCompactionOutputs = outputFiles - cm.filesMu.Unlock() - } - - // Handle compaction errors - if err != nil { - return fmt.Errorf("compaction failed: %w", err) - } - - // Mark input files as obsolete - for _, files := range task.InputFiles { - for _, file := range files { - cm.MarkFileObsolete(file.Path) - } - } - - // Try to clean up the files immediately - return cm.CleanupObsoleteFiles() -} - -// TriggerCompaction forces a compaction cycle -func (cm *CompactionManager) TriggerCompaction() 
error { - cm.compactingMu.Lock() - defer cm.compactingMu.Unlock() - - return cm.runCompactionCycle() -} - -// CompactRange triggers compaction on a specific key range -func (cm *CompactionManager) CompactRange(minKey, maxKey []byte) error { - cm.compactingMu.Lock() - defer cm.compactingMu.Unlock() - - // Load current SSTable information - if err := cm.compactor.LoadSSTables(); err != nil { - return fmt.Errorf("failed to load SSTables: %w", err) - } - - // Find files that overlap with the target range - rangeInfo := &SSTableInfo{ - FirstKey: minKey, - LastKey: maxKey, - } - - var filesToCompact []*SSTableInfo - for level, files := range cm.compactor.levels { - for _, file := range files { - if file.Overlaps(rangeInfo) { - // Add level information to the file - file.Level = level - filesToCompact = append(filesToCompact, file) - - // Mark as pending - cm.MarkFilePending(file.Path) - } - } - } - - // If no files to compact, we're done - if len(filesToCompact) == 0 { - return nil - } - - // Determine the target level - use the highest level found + 1 - targetLevel := 0 - for _, file := range filesToCompact { - if file.Level > targetLevel { - targetLevel = file.Level - } - } - targetLevel++ // Compact to next level - - // Create the compaction task - task := &CompactionTask{ - InputFiles: make(map[int][]*SSTableInfo), - TargetLevel: targetLevel, - OutputPathTemplate: filepath.Join(cm.sstableDir, "%d_%06d_%020d.sst"), - } - - // Group files by level - for _, file := range filesToCompact { - task.InputFiles[file.Level] = append(task.InputFiles[file.Level], file) - } - - // Perform the compaction - outputFiles, err := cm.compactor.CompactFiles(task) - - // Unmark files as pending - for _, file := range filesToCompact { - cm.UnmarkFilePending(file.Path) - } - - // Track the compaction outputs for statistics - if err == nil && len(outputFiles) > 0 { - cm.filesMu.Lock() - cm.lastCompactionOutputs = outputFiles - cm.filesMu.Unlock() - } - - // Handle errors - if err != nil { - return fmt.Errorf("failed to compact range: %w", err) - } - - // Mark input files as obsolete - for _, file := range filesToCompact { - cm.MarkFileObsolete(file.Path) - } - - // Try to clean up immediately - return cm.CleanupObsoleteFiles() -} - -// TrackTombstone adds a key to the tombstone tracker -func (cm *CompactionManager) TrackTombstone(key []byte) { - // Track the tombstone in our tracker - if cm.tombstones != nil { - cm.tombstones.AddTombstone(key) - } -} - -// ForcePreserveTombstone marks a tombstone for special handling during compaction -// This is primarily for testing purposes, to ensure specific tombstones are preserved -func (cm *CompactionManager) ForcePreserveTombstone(key []byte) { - if cm.tombstones != nil { - // Add with "force preserve" flag set to true - cm.tombstones.ForcePreserveTombstone(key) - } -} - -// GetCompactionStats returns statistics about the compaction state -func (cm *CompactionManager) GetCompactionStats() map[string]interface{} { - cm.filesMu.RLock() - defer cm.filesMu.RUnlock() - - stats := make(map[string]interface{}) - - // Count files by level - levelCounts := make(map[int]int) - levelSizes := make(map[int]int64) - - for level, files := range cm.compactor.levels { - levelCounts[level] = len(files) - - var totalSize int64 - for _, file := range files { - totalSize += file.Size - } - levelSizes[level] = totalSize - } - - stats["level_counts"] = levelCounts - stats["level_sizes"] = levelSizes - stats["obsolete_files"] = len(cm.obsoleteFiles) - stats["pending_files"] = len(cm.pendingFiles) 
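
Both the old manager and the new coordinator expose GetCompactionStats as an untyped map[string]interface{}, so callers recover concrete values by type assertion; keys absent from one implementation (the new coordinator drops the per-level entries shown here) simply fail the assertion and are skipped. A short usage sketch — logCompactionStats is a hypothetical caller, not part of the patch:

// Sketch: reading the stats map returned by GetCompactionStats.
// cm may be the old *CompactionManager or, via the alias in compat.go,
// the new coordinator; missing keys just fail the type assertion.
func logCompactionStats(cm *compaction.CompactionManager) {
	stats := cm.GetCompactionStats()

	if counts, ok := stats["level_counts"].(map[int]int); ok {
		for level, n := range counts {
			fmt.Printf("L%d: %d files\n", level, n)
		}
	}
	if outputs, ok := stats["last_outputs"].([]string); ok {
		fmt.Printf("last compaction wrote: %v\n", outputs)
	}
}
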
- stats["last_outputs_count"] = len(cm.lastCompactionOutputs) - - // If there are recent compaction outputs, include information - if len(cm.lastCompactionOutputs) > 0 { - stats["last_outputs"] = cm.lastCompactionOutputs - } - - return stats -} \ No newline at end of file diff --git a/pkg/compaction/tiered.go b/pkg/compaction/tiered.go deleted file mode 100644 index 1e4e624..0000000 --- a/pkg/compaction/tiered.go +++ /dev/null @@ -1,397 +0,0 @@ -package compaction - -import ( - "bytes" - "fmt" - "path/filepath" - "sort" - "time" - - "git.canoozie.net/jer/go-storage/pkg/common/iterator" - "git.canoozie.net/jer/go-storage/pkg/common/iterator/composite" - "git.canoozie.net/jer/go-storage/pkg/config" - "git.canoozie.net/jer/go-storage/pkg/sstable" -) - -// TieredCompactor implements a tiered compaction strategy -type TieredCompactor struct { - *Compactor - - // Next file sequence number - nextFileSeq uint64 -} - -// NewTieredCompactor creates a new tiered compaction manager -func NewTieredCompactor(cfg *config.Config, sstableDir string, tracker *TombstoneTracker) *TieredCompactor { - return &TieredCompactor{ - Compactor: NewCompactor(cfg, sstableDir, tracker), - nextFileSeq: 1, - } -} - -// SelectCompaction selects files for tiered compaction -func (tc *TieredCompactor) SelectCompaction() (*CompactionTask, error) { - // Reload SSTable information - if err := tc.LoadSSTables(); err != nil { - return nil, fmt.Errorf("failed to load SSTables: %w", err) - } - - // Determine the maximum level - maxLevel := 0 - for level := range tc.levels { - if level > maxLevel { - maxLevel = level - } - } - - // Check L0 first (special case due to potential overlaps) - if len(tc.levels[0]) >= tc.cfg.MaxMemTables { - return tc.selectL0Compaction() - } - - // Check size-based conditions for other levels - for level := 0; level < maxLevel; level++ { - // If this level is too large compared to the next level - thisLevelSize := tc.GetLevelSize(level) - nextLevelSize := tc.GetLevelSize(level + 1) - - // If level is empty, skip it - if thisLevelSize == 0 { - continue - } - - // If next level is empty, promote a file - if nextLevelSize == 0 && len(tc.levels[level]) > 0 { - return tc.selectPromotionCompaction(level) - } - - // Check size ratio - sizeRatio := float64(thisLevelSize) / float64(nextLevelSize) - if sizeRatio >= tc.cfg.CompactionRatio { - return tc.selectOverlappingCompaction(level) - } - } - - // No compaction needed - return nil, nil -} - -// selectL0Compaction selects files from L0 for compaction -func (tc *TieredCompactor) selectL0Compaction() (*CompactionTask, error) { - // Require at least some files in L0 - if len(tc.levels[0]) < 2 { - return nil, nil - } - - // Sort L0 files by sequence number to prioritize older files - files := make([]*SSTableInfo, len(tc.levels[0])) - copy(files, tc.levels[0]) - sort.Slice(files, func(i, j int) bool { - return files[i].Sequence < files[j].Sequence - }) - - // Take up to maxCompactFiles from L0 - maxCompactFiles := tc.cfg.MaxMemTables - if maxCompactFiles > len(files) { - maxCompactFiles = len(files) - } - - selectedFiles := files[:maxCompactFiles] - - // Determine the key range covered by selected files - var minKey, maxKey []byte - for _, file := range selectedFiles { - if len(minKey) == 0 || bytes.Compare(file.FirstKey, minKey) < 0 { - minKey = file.FirstKey - } - if len(maxKey) == 0 || bytes.Compare(file.LastKey, maxKey) > 0 { - maxKey = file.LastKey - } - } - - // Find overlapping files in L1 - var l1Files []*SSTableInfo - for _, file := range tc.levels[1] { - // 
Create a temporary SSTableInfo with the key range - rangeInfo := &SSTableInfo{ - FirstKey: minKey, - LastKey: maxKey, - } - - if file.Overlaps(rangeInfo) { - l1Files = append(l1Files, file) - } - } - - // Create the compaction task - task := &CompactionTask{ - InputFiles: map[int][]*SSTableInfo{ - 0: selectedFiles, - 1: l1Files, - }, - TargetLevel: 1, - OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"), - } - - return task, nil -} - -// selectPromotionCompaction selects a file to promote to the next level -func (tc *TieredCompactor) selectPromotionCompaction(level int) (*CompactionTask, error) { - // Sort files by sequence number - files := make([]*SSTableInfo, len(tc.levels[level])) - copy(files, tc.levels[level]) - sort.Slice(files, func(i, j int) bool { - return files[i].Sequence < files[j].Sequence - }) - - // Select the oldest file - file := files[0] - - // Create task to promote this file to the next level - // No need to merge with any other files since the next level is empty - task := &CompactionTask{ - InputFiles: map[int][]*SSTableInfo{ - level: {file}, - }, - TargetLevel: level + 1, - OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"), - } - - return task, nil -} - -// selectOverlappingCompaction selects files for compaction based on key overlap -func (tc *TieredCompactor) selectOverlappingCompaction(level int) (*CompactionTask, error) { - // Sort files by sequence number to start with oldest - files := make([]*SSTableInfo, len(tc.levels[level])) - copy(files, tc.levels[level]) - sort.Slice(files, func(i, j int) bool { - return files[i].Sequence < files[j].Sequence - }) - - // Select an initial file from this level - file := files[0] - - // Find all overlapping files in the next level - var nextLevelFiles []*SSTableInfo - for _, nextFile := range tc.levels[level+1] { - if file.Overlaps(nextFile) { - nextLevelFiles = append(nextLevelFiles, nextFile) - } - } - - // Create the compaction task - task := &CompactionTask{ - InputFiles: map[int][]*SSTableInfo{ - level: {file}, - level + 1: nextLevelFiles, - }, - TargetLevel: level + 1, - OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"), - } - - return task, nil -} - -// Compact performs compaction of the selected files -func (tc *TieredCompactor) Compact(task *CompactionTask) error { - if task == nil { - return nil // Nothing to compact - } - - // Perform the compaction - _, err := tc.CompactFiles(task) - if err != nil { - return fmt.Errorf("compaction failed: %w", err) - } - - // Gather all input file paths for cleanup - var inputPaths []string - for _, files := range task.InputFiles { - for _, file := range files { - inputPaths = append(inputPaths, file.Path) - } - } - - // Delete the original files that were compacted - if err := tc.DeleteCompactedFiles(inputPaths); err != nil { - return fmt.Errorf("failed to clean up compacted files: %w", err) - } - - // Reload SSTables to refresh our file list - if err := tc.LoadSSTables(); err != nil { - return fmt.Errorf("failed to reload SSTables: %w", err) - } - - return nil -} - -// CompactRange performs compaction on a specific key range -func (tc *TieredCompactor) CompactRange(minKey, maxKey []byte) error { - // Create a range info to check for overlaps - rangeInfo := &SSTableInfo{ - FirstKey: minKey, - LastKey: maxKey, - } - - // Find files overlapping with the given range in each level - task := &CompactionTask{ - InputFiles: make(map[int][]*SSTableInfo), - TargetLevel: 0, // Will be updated - OutputPathTemplate: 
filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"), - } - - // Get the maximum level - var maxLevel int - for level := range tc.levels { - if level > maxLevel { - maxLevel = level - } - } - - // Find overlapping files in each level - for level := 0; level <= maxLevel; level++ { - var overlappingFiles []*SSTableInfo - - for _, file := range tc.levels[level] { - if file.Overlaps(rangeInfo) { - overlappingFiles = append(overlappingFiles, file) - } - } - - if len(overlappingFiles) > 0 { - task.InputFiles[level] = overlappingFiles - } - } - - // If no files overlap with the range, no compaction needed - totalInputFiles := 0 - for _, files := range task.InputFiles { - totalInputFiles += len(files) - } - - if totalInputFiles == 0 { - return nil - } - - // Set target level to the maximum level + 1 - task.TargetLevel = maxLevel + 1 - - // Perform the compaction - return tc.Compact(task) -} - -// RunCompaction performs a full compaction cycle -func (tc *TieredCompactor) RunCompaction() error { - // Select files for compaction - task, err := tc.SelectCompaction() - if err != nil { - return fmt.Errorf("failed to select files for compaction: %w", err) - } - - // If no compaction needed, return - if task == nil { - return nil - } - - // Perform the compaction - return tc.Compact(task) -} - -// MergeCompact merges iterators from multiple SSTables and writes to new files -func (tc *TieredCompactor) MergeCompact(readers []*sstable.Reader, targetLevel int) ([]string, error) { - // Create iterators for all input files - var iterators []iterator.Iterator - for _, reader := range readers { - iterators = append(iterators, reader.NewIterator()) - } - - // Create a merged iterator - mergedIter := composite.NewHierarchicalIterator(iterators) - - // Create a new output file - timestamp := time.Now().UnixNano() - outputPath := filepath.Join(tc.sstableDir, - fmt.Sprintf("%d_%06d_%020d.sst", - targetLevel, tc.nextFileSeq, timestamp)) - tc.nextFileSeq++ - - writer, err := sstable.NewWriter(outputPath) - if err != nil { - return nil, fmt.Errorf("failed to create SSTable writer: %w", err) - } - - // Create a tombstone filter if we have a TombstoneTracker - // This is passed from CompactionManager to Compactor - var tombstoneFilter *BasicTombstoneFilter - if tc.tombstoneTracker != nil { - tombstoneFilter = NewBasicTombstoneFilter( - targetLevel, - tc.cfg.MaxLevelWithTombstones, - tc.tombstoneTracker, - ) - } - - // Write all entries from the merged iterator - var lastKey []byte - var entriesWritten int - - mergedIter.SeekToFirst() - for mergedIter.Valid() { - key := mergedIter.Key() - value := mergedIter.Value() - - // Skip duplicates - if lastKey != nil && bytes.Equal(key, lastKey) { - mergedIter.Next() - continue - } - - // Check if this is a tombstone entry (using the IsTombstone method) - isTombstone := mergedIter.IsTombstone() - - // Determine if we should keep this entry - // If we have a tombstone filter, use it, otherwise use the default logic - var shouldKeep bool - if tombstoneFilter != nil && isTombstone { - // Use the tombstone filter for tombstones - shouldKeep = tombstoneFilter.ShouldKeep(key, nil) - } else { - // Default logic - keep regular entries and tombstones in lower levels - shouldKeep = !isTombstone || targetLevel <= tc.cfg.MaxLevelWithTombstones - } - - if shouldKeep { - var err error - - // Use the explicit AddTombstone method if this is a tombstone - if isTombstone { - err = writer.AddTombstone(key) - } else { - err = writer.Add(key, value) - } - - if err != nil { - writer.Abort() - return nil, 
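// Sketch of the default tombstone-retention rule MergeCompact applies when
// no tombstone filter is installed: a tombstone survives only while the
// output level number does not exceed MaxLevelWithTombstones, past which no
// older version can remain beneath it. keepEntry is an illustrative name.
package main

import "fmt"

func keepEntry(isTombstone bool, targetLevel, maxLevelWithTombstones int) bool {
	if !isTombstone {
		return true // regular entries are always written out
	}
	return targetLevel <= maxLevelWithTombstones
}

func main() {
	fmt.Println(keepEntry(true, 1, 2)) // true: the tombstone is still needed
	fmt.Println(keepEntry(true, 3, 2)) // false: safe to drop
}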
fmt.Errorf("failed to add entry to SSTable: %w", err) - } - entriesWritten++ - } - - lastKey = append(lastKey[:0], key...) - mergedIter.Next() - } - - // Finish writing - if entriesWritten > 0 { - if err := writer.Finish(); err != nil { - return nil, fmt.Errorf("failed to finish SSTable: %w", err) - } - return []string{outputPath}, nil - } - - // No entries written, abort the file - writer.Abort() - return nil, nil -} \ No newline at end of file diff --git a/pkg/compaction/tiered_strategy.go b/pkg/compaction/tiered_strategy.go new file mode 100644 index 0000000..6b3ac57 --- /dev/null +++ b/pkg/compaction/tiered_strategy.go @@ -0,0 +1,268 @@ +package compaction + +import ( + "bytes" + "fmt" + "path/filepath" + "sort" + + "git.canoozie.net/jer/go-storage/pkg/config" +) + +// TieredCompactionStrategy implements a tiered compaction strategy +type TieredCompactionStrategy struct { + *BaseCompactionStrategy + + // Executor for compacting files + executor CompactionExecutor + + // Next file sequence number + nextFileSeq uint64 +} + +// NewTieredCompactionStrategy creates a new tiered compaction strategy +func NewTieredCompactionStrategy(cfg *config.Config, sstableDir string, executor CompactionExecutor) *TieredCompactionStrategy { + return &TieredCompactionStrategy{ + BaseCompactionStrategy: NewBaseCompactionStrategy(cfg, sstableDir), + executor: executor, + nextFileSeq: 1, + } +} + +// SelectCompaction selects files for tiered compaction +func (s *TieredCompactionStrategy) SelectCompaction() (*CompactionTask, error) { + // Determine the maximum level + maxLevel := 0 + for level := range s.levels { + if level > maxLevel { + maxLevel = level + } + } + + // Check L0 first (special case due to potential overlaps) + if len(s.levels[0]) >= s.cfg.MaxMemTables { + return s.selectL0Compaction() + } + + // Check size-based conditions for other levels + for level := 0; level < maxLevel; level++ { + // If this level is too large compared to the next level + thisLevelSize := s.GetLevelSize(level) + nextLevelSize := s.GetLevelSize(level + 1) + + // If level is empty, skip it + if thisLevelSize == 0 { + continue + } + + // If next level is empty, promote a file + if nextLevelSize == 0 && len(s.levels[level]) > 0 { + return s.selectPromotionCompaction(level) + } + + // Check size ratio + sizeRatio := float64(thisLevelSize) / float64(nextLevelSize) + if sizeRatio >= s.cfg.CompactionRatio { + return s.selectOverlappingCompaction(level) + } + } + + // No compaction needed + return nil, nil +} + +// selectL0Compaction selects files from L0 for compaction +func (s *TieredCompactionStrategy) selectL0Compaction() (*CompactionTask, error) { + // Require at least some files in L0 + if len(s.levels[0]) < 2 { + return nil, nil + } + + // Sort L0 files by sequence number to prioritize older files + files := make([]*SSTableInfo, len(s.levels[0])) + copy(files, s.levels[0]) + sort.Slice(files, func(i, j int) bool { + return files[i].Sequence < files[j].Sequence + }) + + // Take up to maxCompactFiles from L0 + maxCompactFiles := s.cfg.MaxMemTables + if maxCompactFiles > len(files) { + maxCompactFiles = len(files) + } + + selectedFiles := files[:maxCompactFiles] + + // Determine the key range covered by selected files + var minKey, maxKey []byte + for _, file := range selectedFiles { + if len(minKey) == 0 || bytes.Compare(file.FirstKey, minKey) < 0 { + minKey = file.FirstKey + } + if len(maxKey) == 0 || bytes.Compare(file.LastKey, maxKey) > 0 { + maxKey = file.LastKey + } + } + + // Find overlapping files in L1 + var l1Files 
[]*SSTableInfo + for _, file := range s.levels[1] { + // Create a temporary SSTableInfo with the key range + rangeInfo := &SSTableInfo{ + FirstKey: minKey, + LastKey: maxKey, + } + + if file.Overlaps(rangeInfo) { + l1Files = append(l1Files, file) + } + } + + // Create the compaction task + task := &CompactionTask{ + InputFiles: map[int][]*SSTableInfo{ + 0: selectedFiles, + 1: l1Files, + }, + TargetLevel: 1, + OutputPathTemplate: filepath.Join(s.sstableDir, "%d_%06d_%020d.sst"), + } + + return task, nil +} + +// selectPromotionCompaction selects a file to promote to the next level +func (s *TieredCompactionStrategy) selectPromotionCompaction(level int) (*CompactionTask, error) { + // Sort files by sequence number + files := make([]*SSTableInfo, len(s.levels[level])) + copy(files, s.levels[level]) + sort.Slice(files, func(i, j int) bool { + return files[i].Sequence < files[j].Sequence + }) + + // Select the oldest file + file := files[0] + + // Create task to promote this file to the next level + // No need to merge with any other files since the next level is empty + task := &CompactionTask{ + InputFiles: map[int][]*SSTableInfo{ + level: {file}, + }, + TargetLevel: level + 1, + OutputPathTemplate: filepath.Join(s.sstableDir, "%d_%06d_%020d.sst"), + } + + return task, nil +} + +// selectOverlappingCompaction selects files for compaction based on key overlap +func (s *TieredCompactionStrategy) selectOverlappingCompaction(level int) (*CompactionTask, error) { + // Sort files by sequence number to start with oldest + files := make([]*SSTableInfo, len(s.levels[level])) + copy(files, s.levels[level]) + sort.Slice(files, func(i, j int) bool { + return files[i].Sequence < files[j].Sequence + }) + + // Select an initial file from this level + file := files[0] + + // Find all overlapping files in the next level + var nextLevelFiles []*SSTableInfo + for _, nextFile := range s.levels[level+1] { + if file.Overlaps(nextFile) { + nextLevelFiles = append(nextLevelFiles, nextFile) + } + } + + // Create the compaction task + task := &CompactionTask{ + InputFiles: map[int][]*SSTableInfo{ + level: {file}, + level + 1: nextLevelFiles, + }, + TargetLevel: level + 1, + OutputPathTemplate: filepath.Join(s.sstableDir, "%d_%06d_%020d.sst"), + } + + return task, nil +} + +// CompactRange performs compaction on a specific key range +func (s *TieredCompactionStrategy) CompactRange(minKey, maxKey []byte) error { + // Create a range info to check for overlaps + rangeInfo := &SSTableInfo{ + FirstKey: minKey, + LastKey: maxKey, + } + + // Find files overlapping with the given range in each level + task := &CompactionTask{ + InputFiles: make(map[int][]*SSTableInfo), + TargetLevel: 0, // Will be updated + OutputPathTemplate: filepath.Join(s.sstableDir, "%d_%06d_%020d.sst"), + } + + // Get the maximum level + var maxLevel int + for level := range s.levels { + if level > maxLevel { + maxLevel = level + } + } + + // Find overlapping files in each level + for level := 0; level <= maxLevel; level++ { + var overlappingFiles []*SSTableInfo + + for _, file := range s.levels[level] { + if file.Overlaps(rangeInfo) { + overlappingFiles = append(overlappingFiles, file) + } + } + + if len(overlappingFiles) > 0 { + task.InputFiles[level] = overlappingFiles + } + } + + // If no files overlap with the range, no compaction needed + totalInputFiles := 0 + for _, files := range task.InputFiles { + totalInputFiles += len(files) + } + + if totalInputFiles == 0 { + return nil + } + + // Set target level to the maximum level + 1 + task.TargetLevel 
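// Sketch of the oldest-first ordering both promotion and overlap selection
// use above: files sort by sequence number so the oldest data moves down
// the tree first. sstFile is a simplified stand-in for *SSTableInfo.
package main

import (
	"fmt"
	"sort"
)

type sstFile struct {
	sequence uint64
	path     string
}

func oldestFirst(files []sstFile) []sstFile {
	out := make([]sstFile, len(files))
	copy(out, files)
	sort.Slice(out, func(i, j int) bool { return out[i].sequence < out[j].sequence })
	return out
}

func main() {
	files := []sstFile{{7, "b.sst"}, {3, "a.sst"}}
	fmt.Println(oldestFirst(files)[0].path) // a.sst, the oldest
}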
= maxLevel + 1 + + // Perform the compaction + _, err := s.executor.CompactFiles(task) + if err != nil { + return fmt.Errorf("compaction failed: %w", err) + } + + // Gather all input file paths for cleanup + var inputPaths []string + for _, files := range task.InputFiles { + for _, file := range files { + inputPaths = append(inputPaths, file.Path) + } + } + + // Delete the original files that were compacted + if err := s.executor.DeleteCompactedFiles(inputPaths); err != nil { + return fmt.Errorf("failed to clean up compacted files: %w", err) + } + + // Reload SSTables to refresh our file list + if err := s.LoadSSTables(); err != nil { + return fmt.Errorf("failed to reload SSTables: %w", err) + } + + return nil +} \ No newline at end of file diff --git a/pkg/compaction/tombstone.go b/pkg/compaction/tombstone.go index 970b5b7..73b9c2b 100644 --- a/pkg/compaction/tombstone.go +++ b/pkg/compaction/tombstone.go @@ -5,7 +5,7 @@ import ( "time" ) -// TombstoneTracker tracks tombstones to support garbage collection +// TombstoneTracker implements the TombstoneManager interface type TombstoneTracker struct { // Map of deleted keys with deletion timestamp deletions map[string]time.Time @@ -83,11 +83,11 @@ type BasicTombstoneFilter struct { maxTombstoneLevel int // The tombstone tracker (if any) - tracker *TombstoneTracker + tracker TombstoneManager } // NewBasicTombstoneFilter creates a new tombstone filter -func NewBasicTombstoneFilter(level, maxTombstoneLevel int, tracker *TombstoneTracker) *BasicTombstoneFilter { +func NewBasicTombstoneFilter(level, maxTombstoneLevel int, tracker TombstoneManager) *BasicTombstoneFilter { return &BasicTombstoneFilter{ level: level, maxTombstoneLevel: maxTombstoneLevel, diff --git a/pkg/config/config.go b/pkg/config/config.go index 4719750..b23bdcf 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -35,6 +35,7 @@ type Config struct { WALDir string `json:"wal_dir"` WALSyncMode SyncMode `json:"wal_sync_mode"` WALSyncBytes int64 `json:"wal_sync_bytes"` + WALMaxSize int64 `json:"wal_max_size"` // MemTable configuration MemTableSize int64 `json:"memtable_size"` diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 763f345..e2b77d3 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -23,6 +23,8 @@ const ( sstableFilenameFormat = "%d_%06d_%020d.sst" ) +// This has been moved to the wal package + var ( // ErrEngineClosed is returned when operations are performed on a closed engine ErrEngineClosed = errors.New("engine is closed") @@ -132,10 +134,28 @@ func NewEngine(dataDir string) (*Engine, error) { return nil, fmt.Errorf("failed to create wal directory: %w", err) } - // Create the WAL - wal, err := wal.NewWAL(cfg, walDir) + // During tests, disable logs to avoid interfering with example tests + tempWasDisabled := wal.DisableRecoveryLogs + if os.Getenv("GO_TEST") == "1" { + wal.DisableRecoveryLogs = true + defer func() { wal.DisableRecoveryLogs = tempWasDisabled }() + } + + // First try to reuse an existing WAL file + var walLogger *wal.WAL + + // We'll start with sequence 1, but this will be updated during recovery + walLogger, err = wal.ReuseWAL(cfg, walDir, 1) if err != nil { - return nil, fmt.Errorf("failed to create WAL: %w", err) + return nil, fmt.Errorf("failed to check for reusable WAL: %w", err) + } + + // If no suitable WAL found, create a new one + if walLogger == nil { + walLogger, err = wal.NewWAL(cfg, walDir) + if err != nil { + return nil, fmt.Errorf("failed to create WAL: %w", err) + } } // Create the MemTable pool @@ 
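// Hedged sketch of the reuse-or-create pattern NewEngine now follows: try
// to append to the most recent WAL, and fall back to a fresh one when
// nothing reusable exists. openExisting and createNew are hypothetical
// stand-ins for wal.ReuseWAL and wal.NewWAL; only the control flow is
// taken from the patch.
package main

import "fmt"

type walHandle struct{ path string }

func openExisting(dir string) (*walHandle, error) {
	return nil, nil // nil, nil signals "nothing reusable", as ReuseWAL does
}

func createNew(dir string) (*walHandle, error) {
	return &walHandle{path: dir + "/00000001.wal"}, nil
}

func openWAL(dir string) (*walHandle, error) {
	w, err := openExisting(dir)
	if err != nil {
		return nil, fmt.Errorf("failed to check for reusable WAL: %w", err)
	}
	if w != nil {
		return w, nil // reuse the existing file
	}
	return createNew(dir)
}

func main() {
	w, _ := openWAL("/tmp/wal")
	fmt.Println(w.path)
}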
-146,7 +166,7 @@ func NewEngine(dataDir string) (*Engine, error) {
 		dataDir:      dataDir,
 		sstableDir:   sstableDir,
 		walDir:       walDir,
-		wal:          wal,
+		wal:          walLogger,
 		memTablePool: memTablePool,
 		immutableMTs: make([]*memtable.MemTable, 0),
 		sstables:     make([]*sstable.Reader, 0),
@@ -159,6 +179,11 @@ func NewEngine(dataDir string) (*Engine, error) {
 		return nil, fmt.Errorf("failed to load SSTables: %w", err)
 	}
 
+	// Recover from WAL if any exist
+	if err := e.recoverFromWAL(); err != nil {
+		return nil, fmt.Errorf("failed to recover from WAL: %w", err)
+	}
+
 	// Start background flush goroutine
 	go e.backgroundFlush()
 
@@ -625,6 +650,104 @@ func (e *Engine) loadSSTables() error {
 	return nil
 }
 
+// recoverFromWAL recovers memtables from existing WAL files
+func (e *Engine) recoverFromWAL() error {
+	// Check if WAL directory exists
+	if _, err := os.Stat(e.walDir); os.IsNotExist(err) {
+		return nil // No WAL directory, nothing to recover
+	}
+
+	// List all WAL files for diagnostic purposes
+	walFiles, err := wal.FindWALFiles(e.walDir)
+	if !wal.DisableRecoveryLogs {
+		if err != nil {
+			fmt.Printf("Error listing WAL files: %v\n", err)
+		} else {
+			fmt.Printf("Found %d WAL files: %v\n", len(walFiles), walFiles)
+		}
+	}
+
+	// Get recovery options
+	recoveryOpts := memtable.DefaultRecoveryOptions(e.cfg)
+
+	// Recover memtables from WAL
+	memTables, maxSeqNum, err := memtable.RecoverFromWAL(e.cfg, recoveryOpts)
+	if err != nil {
+		// If recovery fails, try to salvage the situation by moving the
+		// problematic WAL files aside and starting fresh
+		if !wal.DisableRecoveryLogs {
+			fmt.Printf("WAL recovery failed: %v\n", err)
+			fmt.Printf("Attempting to recover by cleaning up WAL files...\n")
+		}
+
+		// Close the WAL handle opened in NewEngine so its file can be moved aside
+		if e.wal != nil {
+			e.wal.Close()
+		}
+
+		// Create a backup directory
+		backupDir := filepath.Join(e.walDir, "backup_"+time.Now().Format("20060102_150405"))
+		if err := os.MkdirAll(backupDir, 0755); err != nil {
+			if !wal.DisableRecoveryLogs {
+				fmt.Printf("Failed to create backup directory: %v\n", err)
+			}
+			return fmt.Errorf("failed to recover from WAL: %w", err)
+		}
+
+		// Move problematic WAL files to backup
+		for _, walFile := range walFiles {
+			destFile := filepath.Join(backupDir, filepath.Base(walFile))
+			if err := os.Rename(walFile, destFile); err != nil {
+				if !wal.DisableRecoveryLogs {
+					fmt.Printf("Failed to move WAL file %s: %v\n", walFile, err)
+				}
+			} else if !wal.DisableRecoveryLogs {
+				fmt.Printf("Moved problematic WAL file to %s\n", destFile)
+			}
+		}
+
+		// Create a fresh WAL
+		newWal, err := wal.NewWAL(e.cfg, e.walDir)
+		if err != nil {
+			return fmt.Errorf("failed to create new WAL after recovery: %w", err)
+		}
+		e.wal = newWal
+
+		// No memtables to recover, starting fresh
+		if !wal.DisableRecoveryLogs {
+			fmt.Printf("Starting with a fresh WAL after recovery failure\n")
+		}
+		return nil
+	}
+
+	// No memtables recovered or empty WAL
+	if len(memTables) == 0 {
+		return nil
+	}
+
+	// Update sequence numbers
+	e.lastSeqNum = maxSeqNum
+
+	// Update WAL sequence number to continue from where we left off
+	if maxSeqNum > 0 {
+		e.wal.UpdateNextSequence(maxSeqNum + 1)
+	}
+
+	// Add recovered memtables to the pool
+	for i, memTable := range memTables {
+		if i == len(memTables)-1 {
+			// The last memtable becomes the active one
			e.memTablePool.SetActiveMemTable(memTable)
+		} else {
+			// Previous memtables become immutable
+			memTable.SetImmutable()
+			e.immutableMTs = append(e.immutableMTs, memTable)
+		}
+	}
+
+	if !wal.DisableRecoveryLogs {
+		fmt.Printf("Recovered %d memtables from WAL with max sequence number %d\n",
+			len(memTables), maxSeqNum)
+	}
+	return nil
+}
+
 // 
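// Sketch of how recovered memtables are slotted back in above: every table
// but the newest becomes immutable, and the newest resumes as the active
// write target. memTable here is a simplified stand-in type.
package main

import "fmt"

type memTable struct {
	name      string
	immutable bool
}

func placeRecovered(tables []*memTable) (active *memTable, immutables []*memTable) {
	for i, t := range tables {
		if i == len(tables)-1 {
			active = t // newest table keeps accepting writes
		} else {
			t.immutable = true
			immutables = append(immutables, t)
		}
	}
	return active, immutables
}

func main() {
	tables := []*memTable{{name: "t1"}, {name: "t2"}, {name: "t3"}}
	active, imms := placeRecovered(tables)
	fmt.Println(active.name, len(imms)) // t3 2
}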
GetRWLock returns the transaction lock for this engine func (e *Engine) GetRWLock() *sync.RWMutex { return &e.txLock diff --git a/pkg/engine/iterator.go b/pkg/engine/iterator.go index a003b1d..cf9c9b7 100644 --- a/pkg/engine/iterator.go +++ b/pkg/engine/iterator.go @@ -383,6 +383,25 @@ func newHierarchicalIterator(e *Engine) *boundedIterator { iters = append(iters, sstable.NewIteratorAdapter(e.sstables[i].NewIterator())) } + // Create sources list for all iterators + sources := make([]IterSource, 0, len(memTables)+len(e.sstables)) + + // Add sources for memtables + for i, table := range memTables { + sources = append(sources, &MemTableSource{ + mem: table, + level: i, // Assign level numbers starting from 0 (active memtable is newest) + }) + } + + // Add sources for SSTables + for i := len(e.sstables) - 1; i >= 0; i-- { + sources = append(sources, &SSTableSource{ + sst: e.sstables[i], + level: len(memTables) + (len(e.sstables) - 1 - i), // Continue level numbering after memtables + }) + } + // Wrap in a bounded iterator (unbounded by default) // If we have no iterators, use an empty one var baseIter iterator.Iterator @@ -391,8 +410,11 @@ func newHierarchicalIterator(e *Engine) *boundedIterator { } else if len(iters) == 1 { baseIter = iters[0] } else { - // Create a simple chained iterator for now that checks each source in order - baseIter = &chainedIterator{iterators: iters} + // Create a chained iterator that checks each source in order and handles duplicates + baseIter = &chainedIterator{ + iterators: iters, + sources: sources, + } } return &boundedIterator{ @@ -404,6 +426,7 @@ func newHierarchicalIterator(e *Engine) *boundedIterator { // chainedIterator is a simple iterator that checks multiple sources in order type chainedIterator struct { iterators []iterator.Iterator + sources []IterSource // Corresponding sources for each iterator current int } @@ -417,18 +440,41 @@ func (c *chainedIterator) SeekToFirst() { iter.SeekToFirst() } - // Find the first valid iterator with the smallest key - c.current = -1 - var smallestKey []byte + // Maps to track the best (newest) source for each key + keyToSource := make(map[string]int) // Key -> best source index + keyToLevel := make(map[string]int) // Key -> best source level (lower is better) + keyToPos := make(map[string][]byte) // Key -> binary key value (for ordering) + // First pass: Find the best source for each key for i, iter := range c.iterators { if !iter.Valid() { continue } - if c.current == -1 || bytes.Compare(iter.Key(), smallestKey) < 0 { - c.current = i - smallestKey = iter.Key() + // Use string key for map + keyStr := string(iter.Key()) + keyBytes := iter.Key() + level := c.sources[i].GetLevel() + + // If we haven't seen this key yet, or this source is newer + bestLevel, seen := keyToLevel[keyStr] + if !seen || level < bestLevel { + keyToSource[keyStr] = i + keyToLevel[keyStr] = level + keyToPos[keyStr] = keyBytes + } + } + + // Find the smallest key in our deduplicated set + c.current = -1 + var smallestKey []byte + + for keyStr, sourceIdx := range keyToSource { + keyBytes := keyToPos[keyStr] + + if c.current == -1 || bytes.Compare(keyBytes, smallestKey) < 0 { + c.current = sourceIdx + smallestKey = keyBytes } } } @@ -469,18 +515,41 @@ func (c *chainedIterator) Seek(target []byte) bool { iter.Seek(target) } - // Find the first valid iterator with the smallest key >= target - c.current = -1 - var smallestKey []byte + // Maps to track the best (newest) source for each key + keyToSource := make(map[string]int) // Key -> best source 
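// Sketch of the newest-wins rule the chained iterator applies above: when
// several sources hold the same key, the source with the lowest level
// number (the newest data) supplies the value. Simplified to plain maps
// for illustration; versioned and dedupe are hypothetical names.
package main

import "fmt"

type versioned struct {
	level int
	value string
}

func dedupe(entries map[string][]versioned) map[string]string {
	out := make(map[string]string)
	for key, versions := range entries {
		best := versions[0]
		for _, v := range versions[1:] {
			if v.level < best.level { // lower level == newer source
				best = v
			}
		}
		out[key] = best.value
	}
	return out
}

func main() {
	entries := map[string][]versioned{
		"k": {{level: 2, value: "old"}, {level: 0, value: "new"}},
	}
	fmt.Println(dedupe(entries)["k"]) // new
}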
index + keyToLevel := make(map[string]int) // Key -> best source level (lower is better) + keyToPos := make(map[string][]byte) // Key -> binary key value (for ordering) + // First pass: Find the best source for each key for i, iter := range c.iterators { if !iter.Valid() { continue } - if c.current == -1 || bytes.Compare(iter.Key(), smallestKey) < 0 { - c.current = i - smallestKey = iter.Key() + // Use string key for map + keyStr := string(iter.Key()) + keyBytes := iter.Key() + level := c.sources[i].GetLevel() + + // If we haven't seen this key yet, or this source is newer + bestLevel, seen := keyToLevel[keyStr] + if !seen || level < bestLevel { + keyToSource[keyStr] = i + keyToLevel[keyStr] = level + keyToPos[keyStr] = keyBytes + } + } + + // Find the smallest key in our deduplicated set + c.current = -1 + var smallestKey []byte + + for keyStr, sourceIdx := range keyToSource { + keyBytes := keyToPos[keyStr] + + if c.current == -1 || bytes.Compare(keyBytes, smallestKey) < 0 { + c.current = sourceIdx + smallestKey = keyBytes } } @@ -502,18 +571,50 @@ func (c *chainedIterator) Next() bool { } } - // Find the next valid iterator with the smallest key + // Find the next key after the current one - we need to find the + // smallest key that is different from the current key c.current = -1 - var smallestKey []byte + var nextKey []byte + var bestLevel int = -1 + // First pass: Find the smallest key that's different from currentKey for i, iter := range c.iterators { if !iter.Valid() { continue } - if c.current == -1 || bytes.Compare(iter.Key(), smallestKey) < 0 { + // Skip if this iterator is still at the current key + if bytes.Equal(iter.Key(), currentKey) { + continue + } + + // If we haven't found a key yet, or this one is smaller + if nextKey == nil || bytes.Compare(iter.Key(), nextKey) < 0 { + nextKey = iter.Key() c.current = i - smallestKey = iter.Key() + bestLevel = c.sources[i].GetLevel() + } + } + + // If we found a next key, now find the newest version of that key + if nextKey != nil { + // Second pass: Find the newest version of nextKey (lowest level number) + for i, iter := range c.iterators { + if !iter.Valid() { + continue + } + + // Skip if this isn't a match for the next key + if !bytes.Equal(iter.Key(), nextKey) { + continue + } + + // If this source is newer (lower level number), use it instead + sourceLevel := c.sources[i].GetLevel() + if sourceLevel < bestLevel || bestLevel == -1 { + c.current = i + bestLevel = sourceLevel + } } } diff --git a/pkg/memtable/mempool.go b/pkg/memtable/mempool.go index b9ff728..e0ecc31 100644 --- a/pkg/memtable/mempool.go +++ b/pkg/memtable/mempool.go @@ -178,4 +178,19 @@ func (p *MemTablePool) TotalSize() int64 { } return total +} + +// SetActiveMemTable sets the active memtable (used for recovery) +func (p *MemTablePool) SetActiveMemTable(memTable *MemTable) { + p.mu.Lock() + defer p.mu.Unlock() + + // If there's already an active memtable, make it immutable + if p.active != nil && p.active.ApproximateSize() > 0 { + p.active.SetImmutable() + p.immutables = append(p.immutables, p.active) + } + + // Set the provided memtable as active + p.active = memTable } \ No newline at end of file diff --git a/pkg/transaction/example_test.go b/pkg/transaction/example_test.go index 03884b6..a447cf1 100644 --- a/pkg/transaction/example_test.go +++ b/pkg/transaction/example_test.go @@ -6,8 +6,14 @@ import ( "git.canoozie.net/jer/go-storage/pkg/engine" "git.canoozie.net/jer/go-storage/pkg/transaction" + "git.canoozie.net/jer/go-storage/pkg/wal" ) +// Disable all 
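// Sketch of the two-pass Next() above, condensed into one pass with a
// level tie-break that yields the same result: find the smallest key
// different from the current key, preferring the newest source when keys
// tie. Slices of (key, level) cursors stand in for the real iterators.
package main

import (
	"bytes"
	"fmt"
)

type cursor struct {
	key   []byte
	level int
}

func nextCursor(cursors []cursor, current []byte) *cursor {
	var best *cursor
	for i := range cursors {
		c := &cursors[i]
		if bytes.Equal(c.key, current) {
			continue // still positioned on the current key
		}
		if best == nil || bytes.Compare(c.key, best.key) < 0 ||
			(bytes.Equal(c.key, best.key) && c.level < best.level) {
			best = c
		}
	}
	return best
}

func main() {
	cursors := []cursor{{[]byte("b"), 1}, {[]byte("b"), 0}, {[]byte("c"), 0}}
	n := nextCursor(cursors, []byte("a"))
	fmt.Printf("%s from level %d\n", n.key, n.level) // b from level 0
}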
logs in tests
+func init() {
+	wal.DisableRecoveryLogs = true
+}
+
 func Example() {
 	// Create a temporary directory for the example
 	tempDir, err := os.MkdirTemp("", "transaction_example_*")
diff --git a/pkg/wal/reader.go b/pkg/wal/reader.go
index 7f9eb84..df6373d 100644
--- a/pkg/wal/reader.go
+++ b/pkg/wal/reader.go
@@ -9,6 +9,7 @@ import (
 	"os"
 	"path/filepath"
 	"sort"
+	"strings"
 )
 
 // Reader reads entries from WAL files
@@ -242,6 +243,30 @@ func FindWALFiles(dir string) ([]string, error) {
 }
 
+// getEntryCount counts the number of valid entries in a WAL file.
+// It is a diagnostic helper; entries that fail to parse are skipped
+// rather than counted.
+func getEntryCount(path string) int {
+	reader, err := OpenReader(path)
+	if err != nil {
+		return 0
+	}
+	defer reader.Close()
+
+	count := 0
+	for {
+		_, err := reader.ReadEntry()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			// Skip corrupted entries
+			continue
+		}
+		count++
+	}
+
+	return count
+}
+
 // ReplayWALFile replays a single WAL file and calls the handler for each entry
 func ReplayWALFile(path string, handler EntryHandler) error {
 	reader, err := OpenReader(path)
 	if err != nil {
@@ -249,20 +274,84 @@ func ReplayWALFile(path string, handler EntryHandler) error {
 	}
 	defer reader.Close()
 
+	// Track statistics for reporting
+	entriesProcessed := 0
+	entriesSkipped := 0
+
 	for {
 		entry, err := reader.ReadEntry()
 		if err != nil {
 			if err == io.EOF {
+				// Reached the end of the file
 				break
 			}
+
+			// Check if this is a corruption error
+			if strings.Contains(err.Error(), "corrupt") ||
+				strings.Contains(err.Error(), "invalid") {
+				// Skip this corrupted entry
+				if !DisableRecoveryLogs {
+					fmt.Printf("Skipping corrupted entry in %s: %v\n", path, err)
+				}
+				entriesSkipped++
+
+				// If we've seen too many corrupted entries before any valid
+				// one, give up on this file
+				if entriesSkipped > 5 && entriesProcessed == 0 {
+					return fmt.Errorf("too many corrupted entries at start of file %s", path)
+				}
+
+				// Try to recover by skipping a fixed window past the bad
+				// record and letting the next ReadEntry re-attempt parsing
+				recoverErr := recoverFromCorruption(reader)
+				if recoverErr != nil {
+					if recoverErr == io.EOF {
+						// Reached the end during recovery
+						break
+					}
+					// Couldn't recover
+					return fmt.Errorf("failed to recover from corruption in %s: %w", path, recoverErr)
+				}
+
+				// Successfully recovered, continue to the next entry
+				continue
+			}
+
+			// For other errors, fail the replay
 			return fmt.Errorf("error reading entry from %s: %w", path, err)
 		}
 
+		// Process the entry
 		if err := handler(entry); err != nil {
 			return fmt.Errorf("error handling entry: %w", err)
 		}
+
+		entriesProcessed++
 	}
 
+	if !DisableRecoveryLogs {
+		fmt.Printf("Processed %d entries from %s (skipped %d corrupted entries)\n",
+			entriesProcessed, path, entriesSkipped)
+	}
+
+	return nil
+}
+
+// recoverFromCorruption attempts to get past a corrupted record by skipping ahead
+func recoverFromCorruption(reader *Reader) error {
+	// Skip a fixed 32KB window. This does not search for a valid record
+	// header; it simply moves past the damaged region and lets the next
+	// ReadEntry attempt to parse from the new position.
+	if _, err := io.CopyN(io.Discard, reader.reader, 32*1024); err != nil {
+		return err
+	}
	return nil
 }
 
@@ -273,10 +362,47 @@ func ReplayWALDir(dir string, handler EntryHandler) error {
 		return err
 	}
 
+	// Track number of files processed successfully
+	successfulFiles := 0
+	var lastErr error
+
+	// Try to process each file, but continue on recoverable errors
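// Hedged sketch of driving a replay with a handler callback, in the spirit
// of ReplayWALFile's EntryHandler above. The entry struct and handler
// signature here are simplified stand-ins, not the package's real types.
package main

import "fmt"

type entry struct {
	seq   uint64
	key   []byte
	value []byte
}

type entryHandler func(*entry) error

func replay(entries []*entry, handler entryHandler) error {
	for _, e := range entries {
		if err := handler(e); err != nil {
			return fmt.Errorf("error handling entry: %w", err)
		}
	}
	return nil
}

func main() {
	log := []*entry{{1, []byte("k"), []byte("v")}}
	_ = replay(log, func(e *entry) error {
		fmt.Printf("apply seq=%d %s=%s\n", e.seq, e.key, e.value)
		return nil
	})
}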
 	for _, file := range files {
-		if err := ReplayWALFile(file, handler); err != nil {
-			return err
+		err := ReplayWALFile(file, handler)
+		if err != nil {
+			if !DisableRecoveryLogs {
+				fmt.Printf("Error processing WAL file %s: %v\n", file, err)
+			}
+
+			// Record the error, but continue
+			lastErr = err
+
+			// Check if this is a file-level error or just a corrupt record
+			if !strings.Contains(err.Error(), "corrupt") &&
+				!strings.Contains(err.Error(), "invalid") {
+				return fmt.Errorf("fatal error replaying WAL file %s: %w", file, err)
+			}
+
+			// Continue to the next file for corrupt/invalid errors
+			continue
 		}
+
+		// ReplayWALFile already logs per-file statistics; report only a
+		// simple diagnostic count here
+		if !DisableRecoveryLogs {
+			fmt.Printf("Replayed %d valid entries from %s\n",
+				getEntryCount(file), file)
+		}
+
+		successfulFiles++
+	}
+
+	// If we processed at least one file successfully, the WAL recovery is considered successful
+	if successfulFiles > 0 {
+		return nil
+	}
+
+	// If no files were processed successfully and we had errors, return the last error
+	if lastErr != nil {
+		return fmt.Errorf("failed to process any WAL files: %w", lastErr)
 	}
 
 	return nil
diff --git a/pkg/wal/wal.go b/pkg/wal/wal.go
index 387ea80..441b0ec 100644
--- a/pkg/wal/wal.go
+++ b/pkg/wal/wal.go
@@ -56,6 +56,9 @@ type Entry struct {
 	Value []byte
 }
 
+// DisableRecoveryLogs, when true, suppresses recovery log output
+var DisableRecoveryLogs bool
+
 // WAL represents a write-ahead log
 type WAL struct {
 	cfg *config.Config
@@ -101,6 +104,76 @@ func NewWAL(cfg *config.Config, dir string) (*WAL, error) {
 	return wal, nil
 }
 
+// ReuseWAL attempts to reuse an existing WAL file for appending
+// Returns nil, nil if no suitable WAL file is found
+func ReuseWAL(cfg *config.Config, dir string, nextSeq uint64) (*WAL, error) {
+	if cfg == nil {
+		return nil, errors.New("config cannot be nil")
+	}
+
+	// Find existing WAL files
+	files, err := FindWALFiles(dir)
+	if err != nil {
+		return nil, fmt.Errorf("failed to find WAL files: %w", err)
+	}
+
+	// No files found
+	if len(files) == 0 {
+		return nil, nil
+	}
+
+	// Try the most recent one (last in sorted order)
+	latestWAL := files[len(files)-1]
+
+	// Try to open for append
+	file, err := os.OpenFile(latestWAL, os.O_RDWR|os.O_APPEND, 0644)
+	if err != nil {
+		// Don't log in tests
+		if !DisableRecoveryLogs {
+			fmt.Printf("Cannot open latest WAL for append: %v\n", err)
+		}
+		return nil, nil
+	}
+
+	// Check if file is not too large
+	stat, err := file.Stat()
+	if err != nil {
+		file.Close()
+		return nil, fmt.Errorf("failed to stat WAL file: %w", err)
+	}
+
+	// Define maximum WAL size to check against
+	maxWALSize := int64(64 * 1024 * 1024) // Default 64MB
+	if cfg.WALMaxSize > 0 {
+		maxWALSize = cfg.WALMaxSize
+	}
+
+	if stat.Size() >= maxWALSize {
+		file.Close()
+		if !DisableRecoveryLogs {
+			fmt.Printf("Latest WAL file is too large to reuse (%d bytes)\n", stat.Size())
+		}
+		return nil, nil
+	}
+
+	if !DisableRecoveryLogs {
+		fmt.Printf("Reusing existing WAL file: %s with next sequence %d\n",
+			latestWAL, nextSeq)
+	}
+
+	wal := &WAL{
+		cfg:          cfg,
+		dir:          dir,
+		file:         file,
+		writer:       bufio.NewWriterSize(file, 64*1024), // 64KB buffer
+		nextSequence: nextSeq,
+		bytesWritten: stat.Size(),
+		lastSync:     time.Now(),
+	}
+
+	return wal, nil
+}
+
 // Append adds an entry to the WAL
 func (w *WAL) Append(entryType uint8, key, value []byte) (uint64, error) {
 	w.mu.Lock()
@@ -450,6 +523,17 @@ func (w *WAL) Close() error {
 	return nil
 }
 
+// UpdateNextSequence sets the next sequence number for the WAL
+// This is used after recovery to 
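// Sketch of the size guard ReuseWAL applies before appending to an old
// file: reuse only while the file is under the configured cap, falling
// back to the 64MB default when WALMaxSize is unset. reusable is an
// illustrative name for this check.
package main

import "fmt"

func reusable(fileSize, configuredMax int64) bool {
	maxSize := int64(64 * 1024 * 1024) // default 64MB
	if configuredMax > 0 {
		maxSize = configuredMax
	}
	return fileSize < maxSize
}

func main() {
	fmt.Println(reusable(10<<20, 0))     // true: 10MB is under the 64MB default
	fmt.Println(reusable(80<<20, 0))     // false: over the cap
	fmt.Println(reusable(80<<20, 1<<30)) // true under a 1GB cap
}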
ensure new entries have increasing sequence numbers +func (w *WAL) UpdateNextSequence(nextSeq uint64) { + w.mu.Lock() + defer w.mu.Unlock() + + if nextSeq > w.nextSequence { + w.nextSequence = nextSeq + } +} + func min(a, b int) int { if a < b { return a
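// Sketch of the monotonic guard UpdateNextSequence enforces above: the
// next sequence number only ever moves forward, so entries written after
// recovery can never collide with replayed ones. sequencer is a
// simplified stand-in without the WAL's mutex-guarded state.
package main

import "fmt"

type sequencer struct{ next uint64 }

func (s *sequencer) updateNext(candidate uint64) {
	if candidate > s.next {
		s.next = candidate
	}
}

func main() {
	s := &sequencer{next: 5}
	s.updateNext(3) // ignored: would move backwards
	s.updateNext(9) // accepted
	fmt.Println(s.next) // 9
}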