kevo/pkg/compaction/tiered.go
Jeremy Tregunna 62c40ba608
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m36s
refactor: cleaning up iterators, SRP, common interface control, etc.
2025-04-20 11:28:49 -06:00

397 lines
10 KiB
Go

package compaction
import (
"bytes"
"fmt"
"path/filepath"
"sort"
"time"
"git.canoozie.net/jer/go-storage/pkg/common/iterator"
"git.canoozie.net/jer/go-storage/pkg/common/iterator/composite"
"git.canoozie.net/jer/go-storage/pkg/config"
"git.canoozie.net/jer/go-storage/pkg/sstable"
)
// TieredCompactor implements a tiered compaction strategy.
// It embeds the base Compactor and layers level-selection policy on top:
// L0 is compacted when it accumulates too many files, and deeper levels
// are compacted when they outgrow the next level by the configured ratio.
type TieredCompactor struct {
*Compactor
// Next file sequence number
// Used to generate unique output file names in MergeCompact;
// monotonically increasing, starts at 1.
nextFileSeq uint64
}
// NewTieredCompactor creates a new tiered compaction manager.
// It wraps a base Compactor built from the same configuration and
// starts the output-file sequence counter at 1.
func NewTieredCompactor(cfg *config.Config, sstableDir string, tracker *TombstoneTracker) *TieredCompactor {
	base := NewCompactor(cfg, sstableDir, tracker)
	return &TieredCompactor{
		Compactor:   base,
		nextFileSeq: 1,
	}
}
// SelectCompaction selects files for tiered compaction.
// Returns (nil, nil) when no compaction is currently needed.
// Policy, in priority order: compact L0 once it holds at least
// cfg.MaxMemTables files; otherwise promote into an empty next level;
// otherwise compact a level whose size exceeds the next level's by
// cfg.CompactionRatio.
func (tc *TieredCompactor) SelectCompaction() (*CompactionTask, error) {
	// Refresh the on-disk SSTable view before making any decision.
	if err := tc.LoadSSTables(); err != nil {
		return nil, fmt.Errorf("failed to load SSTables: %w", err)
	}
	// Find the deepest populated level.
	maxLevel := 0
	for lvl := range tc.levels {
		if lvl > maxLevel {
			maxLevel = lvl
		}
	}
	// L0 is special-cased: its files may overlap one another.
	if len(tc.levels[0]) >= tc.cfg.MaxMemTables {
		return tc.selectL0Compaction()
	}
	// Size-based triggers for the remaining levels.
	for lvl := 0; lvl < maxLevel; lvl++ {
		curSize := tc.GetLevelSize(lvl)
		if curSize == 0 {
			continue // empty level, nothing to do
		}
		nextSize := tc.GetLevelSize(lvl + 1)
		// Next level empty: promote a file rather than merge.
		if nextSize == 0 && len(tc.levels[lvl]) > 0 {
			return tc.selectPromotionCompaction(lvl)
		}
		// Trigger when this level outgrows the next by the configured ratio.
		// (nextSize > 0 here, guaranteed by the promotion branch above.)
		if float64(curSize)/float64(nextSize) >= tc.cfg.CompactionRatio {
			return tc.selectOverlappingCompaction(lvl)
		}
	}
	// Nothing eligible.
	return nil, nil
}
// selectL0Compaction selects files from L0 for compaction.
// It picks up to cfg.MaxMemTables of the oldest L0 files (by sequence
// number), computes the key range they cover, and pulls in every L1
// file overlapping that range so the merge produces non-overlapping
// output in L1. Returns (nil, nil) when L0 has fewer than two files.
func (tc *TieredCompactor) selectL0Compaction() (*CompactionTask, error) {
	// Require at least some files in L0.
	if len(tc.levels[0]) < 2 {
		return nil, nil
	}
	// Sort a copy of the L0 list by sequence number so the oldest files
	// are compacted first.
	files := make([]*SSTableInfo, len(tc.levels[0]))
	copy(files, tc.levels[0])
	sort.Slice(files, func(i, j int) bool {
		return files[i].Sequence < files[j].Sequence
	})
	// Take up to maxCompactFiles from L0.
	maxCompactFiles := tc.cfg.MaxMemTables
	if maxCompactFiles > len(files) {
		maxCompactFiles = len(files)
	}
	selectedFiles := files[:maxCompactFiles]
	// Determine the key range covered by the selected files.
	var minKey, maxKey []byte
	for _, file := range selectedFiles {
		if len(minKey) == 0 || bytes.Compare(file.FirstKey, minKey) < 0 {
			minKey = file.FirstKey
		}
		if len(maxKey) == 0 || bytes.Compare(file.LastKey, maxKey) > 0 {
			maxKey = file.LastKey
		}
	}
	// Build the probe range once — it is identical for every L1 file,
	// so there is no need to allocate it inside the loop.
	rangeInfo := &SSTableInfo{
		FirstKey: minKey,
		LastKey:  maxKey,
	}
	// Find overlapping files in L1.
	var l1Files []*SSTableInfo
	for _, file := range tc.levels[1] {
		if file.Overlaps(rangeInfo) {
			l1Files = append(l1Files, file)
		}
	}
	// Create the compaction task targeting L1.
	task := &CompactionTask{
		InputFiles: map[int][]*SSTableInfo{
			0: selectedFiles,
			1: l1Files,
		},
		TargetLevel:        1,
		OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"),
	}
	return task, nil
}
// selectPromotionCompaction selects a file to promote to the next level.
// The next level is known to be empty (checked by the caller), so no
// merge partners are needed: the oldest file (lowest sequence number)
// is simply moved down one level. Returns (nil, nil) if the level is
// empty, rather than panicking.
func (tc *TieredCompactor) selectPromotionCompaction(level int) (*CompactionTask, error) {
	files := tc.levels[level]
	if len(files) == 0 {
		return nil, nil
	}
	// A linear scan for the minimum sequence number avoids copying and
	// sorting the whole level just to pick one file.
	oldest := files[0]
	for _, f := range files[1:] {
		if f.Sequence < oldest.Sequence {
			oldest = f
		}
	}
	// Create a task that promotes this single file to the next level.
	task := &CompactionTask{
		InputFiles: map[int][]*SSTableInfo{
			level: {oldest},
		},
		TargetLevel:        level + 1,
		OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"),
	}
	return task, nil
}
// selectOverlappingCompaction selects files for compaction based on key overlap.
// It takes the oldest file (lowest sequence number) in the given level
// and merges it with every file in the next level whose key range
// overlaps it. Returns (nil, nil) if the level is empty, rather than
// panicking.
func (tc *TieredCompactor) selectOverlappingCompaction(level int) (*CompactionTask, error) {
	files := tc.levels[level]
	if len(files) == 0 {
		return nil, nil
	}
	// A linear scan for the minimum sequence number avoids copying and
	// sorting the whole level just to pick one file.
	oldest := files[0]
	for _, f := range files[1:] {
		if f.Sequence < oldest.Sequence {
			oldest = f
		}
	}
	// Collect all overlapping files in the next level.
	var nextLevelFiles []*SSTableInfo
	for _, nextFile := range tc.levels[level+1] {
		if oldest.Overlaps(nextFile) {
			nextLevelFiles = append(nextLevelFiles, nextFile)
		}
	}
	// Create the compaction task targeting the next level.
	task := &CompactionTask{
		InputFiles: map[int][]*SSTableInfo{
			level:     {oldest},
			level + 1: nextLevelFiles,
		},
		TargetLevel:        level + 1,
		OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"),
	}
	return task, nil
}
// Compact performs compaction of the selected files: it merges the
// task's inputs into new output files, deletes the originals, and
// refreshes the in-memory SSTable view. A nil task is a no-op.
func (tc *TieredCompactor) Compact(task *CompactionTask) error {
	if task == nil {
		return nil // nothing selected
	}
	// Run the merge itself.
	if _, err := tc.CompactFiles(task); err != nil {
		return fmt.Errorf("compaction failed: %w", err)
	}
	// Collect every input path so the originals can be removed.
	var inputPaths []string
	for _, levelFiles := range task.InputFiles {
		for _, f := range levelFiles {
			inputPaths = append(inputPaths, f.Path)
		}
	}
	if err := tc.DeleteCompactedFiles(inputPaths); err != nil {
		return fmt.Errorf("failed to clean up compacted files: %w", err)
	}
	// The set of files on disk changed; reload our view of it.
	if err := tc.LoadSSTables(); err != nil {
		return fmt.Errorf("failed to reload SSTables: %w", err)
	}
	return nil
}
// CompactRange performs compaction on a specific key range: every file
// in every level whose key range intersects [minKey, maxKey] is merged
// into a new level one below the current deepest level. A range that
// touches no files is a no-op.
func (tc *TieredCompactor) CompactRange(minKey, maxKey []byte) error {
	// Probe range used for the overlap tests below.
	queryRange := &SSTableInfo{
		FirstKey: minKey,
		LastKey:  maxKey,
	}
	// Find the deepest populated level.
	maxLevel := 0
	for lvl := range tc.levels {
		if lvl > maxLevel {
			maxLevel = lvl
		}
	}
	// Prepare the task; the target level is fixed up once we know
	// there is anything to do.
	task := &CompactionTask{
		InputFiles:         make(map[int][]*SSTableInfo),
		TargetLevel:        0, // updated below
		OutputPathTemplate: filepath.Join(tc.sstableDir, "%d_%06d_%020d.sst"),
	}
	// Gather overlapping files, level by level.
	total := 0
	for lvl := 0; lvl <= maxLevel; lvl++ {
		var hits []*SSTableInfo
		for _, f := range tc.levels[lvl] {
			if f.Overlaps(queryRange) {
				hits = append(hits, f)
			}
		}
		if len(hits) > 0 {
			task.InputFiles[lvl] = hits
			total += len(hits)
		}
	}
	// Nothing intersects the range: no compaction needed.
	if total == 0 {
		return nil
	}
	// Write the merged output below the deepest existing level.
	task.TargetLevel = maxLevel + 1
	return tc.Compact(task)
}
// RunCompaction performs a full compaction cycle: pick candidate files
// via the tiered selection policy, then compact them. Returns nil when
// nothing is eligible.
func (tc *TieredCompactor) RunCompaction() error {
	task, err := tc.SelectCompaction()
	if err != nil {
		return fmt.Errorf("failed to select files for compaction: %w", err)
	}
	if task == nil {
		return nil // no compaction needed right now
	}
	return tc.Compact(task)
}
// MergeCompact merges iterators from multiple SSTables and writes to new files.
// All input readers are merged into a single sorted stream, duplicate keys
// are collapsed to one entry, tombstones are dropped or kept according to
// the tombstone filter / target level, and the surviving entries are written
// to a single new SSTable. Returns the list of output paths (one file, or
// nil when every entry was filtered out).
func (tc *TieredCompactor) MergeCompact(readers []*sstable.Reader, targetLevel int) ([]string, error) {
// Create iterators for all input files
var iterators []iterator.Iterator
for _, reader := range readers {
iterators = append(iterators, reader.NewIterator())
}
// Create a merged iterator over all inputs.
// NOTE(review): duplicate-key resolution below keeps the FIRST entry the
// hierarchical iterator yields for a key — presumably the newest version;
// confirm against composite.NewHierarchicalIterator's ordering contract.
mergedIter := composite.NewHierarchicalIterator(iterators)
// Create a new output file. The name encodes target level, a per-compactor
// sequence number, and a nanosecond timestamp for uniqueness.
timestamp := time.Now().UnixNano()
outputPath := filepath.Join(tc.sstableDir,
fmt.Sprintf("%d_%06d_%020d.sst",
targetLevel, tc.nextFileSeq, timestamp))
tc.nextFileSeq++
writer, err := sstable.NewWriter(outputPath)
if err != nil {
return nil, fmt.Errorf("failed to create SSTable writer: %w", err)
}
// Create a tombstone filter if we have a TombstoneTracker
// This is passed from CompactionManager to Compactor
var tombstoneFilter *BasicTombstoneFilter
if tc.tombstoneTracker != nil {
tombstoneFilter = NewBasicTombstoneFilter(
targetLevel,
tc.cfg.MaxLevelWithTombstones,
tc.tombstoneTracker,
)
}
// Write all entries from the merged iterator
var lastKey []byte
var entriesWritten int
mergedIter.SeekToFirst()
for mergedIter.Valid() {
key := mergedIter.Key()
value := mergedIter.Value()
// Skip duplicates: only the first occurrence of each key survives.
if lastKey != nil && bytes.Equal(key, lastKey) {
mergedIter.Next()
continue
}
// Check if this is a tombstone entry (using the IsTombstone method)
isTombstone := mergedIter.IsTombstone()
// Determine if we should keep this entry
// If we have a tombstone filter, use it, otherwise use the default logic
var shouldKeep bool
if tombstoneFilter != nil && isTombstone {
// Use the tombstone filter for tombstones
shouldKeep = tombstoneFilter.ShouldKeep(key, nil)
} else {
// Default logic - keep regular entries and tombstones in lower levels
// (tombstones are only dropped once past cfg.MaxLevelWithTombstones)
shouldKeep = !isTombstone || targetLevel <= tc.cfg.MaxLevelWithTombstones
}
if shouldKeep {
var err error
// Use the explicit AddTombstone method if this is a tombstone
if isTombstone {
err = writer.AddTombstone(key)
} else {
err = writer.Add(key, value)
}
if err != nil {
// Abandon the partially-written output file on any write error.
writer.Abort()
return nil, fmt.Errorf("failed to add entry to SSTable: %w", err)
}
entriesWritten++
}
// Remember the key (copied into lastKey's reused backing array)
// so the next iteration can detect duplicates.
lastKey = append(lastKey[:0], key...)
mergedIter.Next()
}
// Finish writing
if entriesWritten > 0 {
if err := writer.Finish(); err != nil {
return nil, fmt.Errorf("failed to finish SSTable: %w", err)
}
return []string{outputPath}, nil
}
// No entries written, abort the file so no empty SSTable is left behind.
writer.Abort()
return nil, nil
}