All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m36s
534 lines
14 KiB
Go
534 lines
14 KiB
Go
package compaction
|
|
|
|
import (
	"bytes"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"git.canoozie.net/jer/go-storage/pkg/common/iterator"
	"git.canoozie.net/jer/go-storage/pkg/common/iterator/composite"
	"git.canoozie.net/jer/go-storage/pkg/config"
	"git.canoozie.net/jer/go-storage/pkg/sstable"
)
|
|
|
|
// SSTableInfo represents metadata about a single SSTable file on disk:
// its place in the level hierarchy, its size, and the key range it covers.
type SSTableInfo struct {
	// Path of the SSTable file
	Path string

	// Level number (0 to N); level 0 files may have overlapping key ranges
	Level int

	// Sequence number for the file within its level (lower = older)
	Sequence uint64

	// Timestamp when the file was created
	Timestamp int64

	// Approximate size of the file in bytes
	Size int64

	// Estimated key count (may be approximate)
	KeyCount int

	// First key in the SSTable (empty when the table holds no keys)
	FirstKey []byte

	// Last key in the SSTable (empty when the table holds no keys)
	LastKey []byte

	// Reader for the SSTable; nil once closed
	Reader *sstable.Reader
}
|
|
|
|
// Overlaps checks if this SSTable's key range overlaps with another SSTable
|
|
func (s *SSTableInfo) Overlaps(other *SSTableInfo) bool {
|
|
// If either SSTable has no keys, they don't overlap
|
|
if len(s.FirstKey) == 0 || len(s.LastKey) == 0 ||
|
|
len(other.FirstKey) == 0 || len(other.LastKey) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Check for overlap: not (s ends before other starts OR s starts after other ends)
|
|
// s.LastKey < other.FirstKey || s.FirstKey > other.LastKey
|
|
return !(bytes.Compare(s.LastKey, other.FirstKey) < 0 ||
|
|
bytes.Compare(s.FirstKey, other.LastKey) > 0)
|
|
}
|
|
|
|
// KeyRange returns a string representation of the key range in this SSTable
|
|
func (s *SSTableInfo) KeyRange() string {
|
|
return fmt.Sprintf("[%s, %s]",
|
|
string(s.FirstKey), string(s.LastKey))
|
|
}
|
|
|
|
// String returns a string representation of the SSTable info
|
|
func (s *SSTableInfo) String() string {
|
|
return fmt.Sprintf("L%d-%06d-%020d.sst Size:%d Keys:%d Range:%s",
|
|
s.Level, s.Sequence, s.Timestamp, s.Size, s.KeyCount, s.KeyRange())
|
|
}
|
|
|
|
// CompactionTask represents a set of SSTables to be compacted into a
// target level.
type CompactionTask struct {
	// Input SSTables to compact, grouped by level
	InputFiles map[int][]*SSTableInfo

	// Target level for compaction output
	TargetLevel int

	// Output file path template; an fmt pattern taking level, sequence,
	// and timestamp (see CompactFiles)
	OutputPathTemplate string
}
|
|
|
|
// Compactor manages the compaction process: it tracks SSTables by level,
// decides what to compact, and performs the merge.
type Compactor struct {
	// Configuration
	cfg *config.Config

	// SSTable directory
	sstableDir string

	// File information by level; populated by LoadSSTables,
	// sorted oldest-first (by sequence) within each level
	levels map[int][]*SSTableInfo

	// Tombstone tracking
	// NOTE(review): stored but not referenced in this file — presumably
	// used by compaction logic elsewhere; verify before removing.
	tombstoneTracker *TombstoneTracker
}
|
|
|
|
// NewCompactor creates a new compaction manager
|
|
func NewCompactor(cfg *config.Config, sstableDir string, tracker *TombstoneTracker) *Compactor {
|
|
return &Compactor{
|
|
cfg: cfg,
|
|
sstableDir: sstableDir,
|
|
levels: make(map[int][]*SSTableInfo),
|
|
tombstoneTracker: tracker,
|
|
}
|
|
}
|
|
|
|
// LoadSSTables scans the SSTable directory and loads metadata for all files
|
|
func (c *Compactor) LoadSSTables() error {
|
|
// Clear existing data
|
|
c.levels = make(map[int][]*SSTableInfo)
|
|
|
|
// Read all files from the SSTable directory
|
|
entries, err := os.ReadDir(c.sstableDir)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return nil // Directory doesn't exist yet
|
|
}
|
|
return fmt.Errorf("failed to read SSTable directory: %w", err)
|
|
}
|
|
|
|
// Parse filenames and collect information
|
|
for _, entry := range entries {
|
|
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".sst") {
|
|
continue // Skip directories and non-SSTable files
|
|
}
|
|
|
|
// Parse filename to extract level, sequence, and timestamp
|
|
// Filename format: level_sequence_timestamp.sst
|
|
var level int
|
|
var sequence uint64
|
|
var timestamp int64
|
|
|
|
if n, err := fmt.Sscanf(entry.Name(), "%d_%06d_%020d.sst",
|
|
&level, &sequence, ×tamp); n != 3 || err != nil {
|
|
// Skip files that don't match our naming pattern
|
|
continue
|
|
}
|
|
|
|
// Get file info for size
|
|
fi, err := entry.Info()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get file info for %s: %w", entry.Name(), err)
|
|
}
|
|
|
|
// Open the file to extract key range information
|
|
path := filepath.Join(c.sstableDir, entry.Name())
|
|
reader, err := sstable.OpenReader(path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open SSTable %s: %w", path, err)
|
|
}
|
|
|
|
// Create iterator to get first and last keys
|
|
iter := reader.NewIterator()
|
|
var firstKey, lastKey []byte
|
|
|
|
// Get first key
|
|
iter.SeekToFirst()
|
|
if iter.Valid() {
|
|
firstKey = append([]byte{}, iter.Key()...)
|
|
}
|
|
|
|
// Get last key
|
|
iter.SeekToLast()
|
|
if iter.Valid() {
|
|
lastKey = append([]byte{}, iter.Key()...)
|
|
}
|
|
|
|
// Create SSTable info
|
|
info := &SSTableInfo{
|
|
Path: path,
|
|
Level: level,
|
|
Sequence: sequence,
|
|
Timestamp: timestamp,
|
|
Size: fi.Size(),
|
|
KeyCount: reader.GetKeyCount(),
|
|
FirstKey: firstKey,
|
|
LastKey: lastKey,
|
|
Reader: reader,
|
|
}
|
|
|
|
// Add to appropriate level
|
|
c.levels[level] = append(c.levels[level], info)
|
|
}
|
|
|
|
// Sort files within each level by sequence number
|
|
for level, files := range c.levels {
|
|
sort.Slice(files, func(i, j int) bool {
|
|
return files[i].Sequence < files[j].Sequence
|
|
})
|
|
c.levels[level] = files
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes all open SSTable readers
|
|
func (c *Compactor) Close() error {
|
|
var lastErr error
|
|
|
|
for _, files := range c.levels {
|
|
for _, file := range files {
|
|
if file.Reader != nil {
|
|
if err := file.Reader.Close(); err != nil && lastErr == nil {
|
|
lastErr = err
|
|
}
|
|
file.Reader = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return lastErr
|
|
}
|
|
|
|
// GetLevelSize returns the total size of all files in a level
|
|
func (c *Compactor) GetLevelSize(level int) int64 {
|
|
var size int64
|
|
for _, file := range c.levels[level] {
|
|
size += file.Size
|
|
}
|
|
return size
|
|
}
|
|
|
|
// GetCompactionTask selects files for compaction based on size ratio and overlap
|
|
func (c *Compactor) GetCompactionTask() (*CompactionTask, error) {
|
|
// No compaction if we don't have at least 2 levels with files
|
|
if len(c.levels) < 1 {
|
|
return nil, nil
|
|
}
|
|
|
|
// Check if L0 needs compaction (Level 0 is special because files may overlap)
|
|
if len(c.levels[0]) >= c.cfg.MaxMemTables {
|
|
return c.selectLevel0Compaction()
|
|
}
|
|
|
|
// Check higher levels based on size ratio
|
|
maxLevel := 0
|
|
for level := range c.levels {
|
|
if level > maxLevel {
|
|
maxLevel = level
|
|
}
|
|
}
|
|
|
|
// Check each level starting from 1
|
|
for level := 1; level <= maxLevel; level++ {
|
|
// If this level is too large compared to the next level
|
|
nextLevelSize := c.GetLevelSize(level + 1)
|
|
thisLevelSize := c.GetLevelSize(level)
|
|
|
|
// If this level is empty, skip it
|
|
if thisLevelSize == 0 {
|
|
continue
|
|
}
|
|
|
|
// If the next level doesn't exist yet or is empty, create it
|
|
if nextLevelSize == 0 {
|
|
// Choose one file from this level to move to the next level
|
|
if len(c.levels[level]) > 0 {
|
|
return c.selectSingleFileCompaction(level)
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Check if this level exceeds the size ratio threshold
|
|
sizeRatio := float64(thisLevelSize) / float64(nextLevelSize)
|
|
if sizeRatio >= c.cfg.CompactionRatio {
|
|
return c.selectSizeBasedCompaction(level)
|
|
}
|
|
}
|
|
|
|
// No compaction needed
|
|
return nil, nil
|
|
}
|
|
|
|
// selectLevel0Compaction selects files from L0 for compaction
|
|
func (c *Compactor) selectLevel0Compaction() (*CompactionTask, error) {
|
|
// Not enough files in L0 for compaction
|
|
if len(c.levels[0]) < 2 {
|
|
return nil, nil
|
|
}
|
|
|
|
// For L0, take oldest files and compact them to L1
|
|
numFiles := len(c.levels[0])
|
|
maxCompactFiles := c.cfg.MaxMemTables
|
|
if numFiles > maxCompactFiles {
|
|
numFiles = maxCompactFiles
|
|
}
|
|
|
|
// Sort L0 files by sequence number to get oldest first
|
|
files := make([]*SSTableInfo, len(c.levels[0]))
|
|
copy(files, c.levels[0])
|
|
sort.Slice(files, func(i, j int) bool {
|
|
return files[i].Sequence < files[j].Sequence
|
|
})
|
|
|
|
// Select oldest files for compaction
|
|
selectedFiles := files[:numFiles]
|
|
|
|
// Determine key range of selected files
|
|
var minKey, maxKey []byte
|
|
for _, file := range selectedFiles {
|
|
if len(minKey) == 0 || bytes.Compare(file.FirstKey, minKey) < 0 {
|
|
minKey = file.FirstKey
|
|
}
|
|
if len(maxKey) == 0 || bytes.Compare(file.LastKey, maxKey) > 0 {
|
|
maxKey = file.LastKey
|
|
}
|
|
}
|
|
|
|
// Find overlapping files in L1
|
|
var l1Files []*SSTableInfo
|
|
for _, file := range c.levels[1] {
|
|
// Create a temporary SSTableInfo for the key range
|
|
rangeInfo := &SSTableInfo{
|
|
FirstKey: minKey,
|
|
LastKey: maxKey,
|
|
}
|
|
|
|
if file.Overlaps(rangeInfo) {
|
|
l1Files = append(l1Files, file)
|
|
}
|
|
}
|
|
|
|
// Create the compaction task
|
|
task := &CompactionTask{
|
|
InputFiles: map[int][]*SSTableInfo{
|
|
0: selectedFiles,
|
|
1: l1Files,
|
|
},
|
|
TargetLevel: 1,
|
|
OutputPathTemplate: filepath.Join(c.sstableDir, "%d_%06d_%020d.sst"),
|
|
}
|
|
|
|
return task, nil
|
|
}
|
|
|
|
// selectSingleFileCompaction selects a single file to compact to the next level
|
|
func (c *Compactor) selectSingleFileCompaction(level int) (*CompactionTask, error) {
|
|
// Find the oldest file in this level
|
|
if len(c.levels[level]) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// Sort by sequence number to get the oldest file
|
|
files := make([]*SSTableInfo, len(c.levels[level]))
|
|
copy(files, c.levels[level])
|
|
sort.Slice(files, func(i, j int) bool {
|
|
return files[i].Sequence < files[j].Sequence
|
|
})
|
|
|
|
// Select the oldest file
|
|
file := files[0]
|
|
|
|
// Find overlapping files in the next level
|
|
var nextLevelFiles []*SSTableInfo
|
|
for _, nextFile := range c.levels[level+1] {
|
|
if file.Overlaps(nextFile) {
|
|
nextLevelFiles = append(nextLevelFiles, nextFile)
|
|
}
|
|
}
|
|
|
|
// Create the compaction task
|
|
task := &CompactionTask{
|
|
InputFiles: map[int][]*SSTableInfo{
|
|
level: {file},
|
|
level + 1: nextLevelFiles,
|
|
},
|
|
TargetLevel: level + 1,
|
|
OutputPathTemplate: filepath.Join(c.sstableDir, "%d_%06d_%020d.sst"),
|
|
}
|
|
|
|
return task, nil
|
|
}
|
|
|
|
// selectSizeBasedCompaction selects files for compaction based on size ratio
|
|
func (c *Compactor) selectSizeBasedCompaction(level int) (*CompactionTask, error) {
|
|
// Find a file from this level to compact
|
|
if len(c.levels[level]) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// Choose a file that hasn't been compacted recently
|
|
// For simplicity, just choose the oldest file (lowest sequence number)
|
|
files := make([]*SSTableInfo, len(c.levels[level]))
|
|
copy(files, c.levels[level])
|
|
sort.Slice(files, func(i, j int) bool {
|
|
return files[i].Sequence < files[j].Sequence
|
|
})
|
|
|
|
// Select the oldest file
|
|
file := files[0]
|
|
|
|
// Find overlapping files in the next level
|
|
var nextLevelFiles []*SSTableInfo
|
|
for _, nextFile := range c.levels[level+1] {
|
|
if file.Overlaps(nextFile) {
|
|
nextLevelFiles = append(nextLevelFiles, nextFile)
|
|
}
|
|
}
|
|
|
|
// Create the compaction task
|
|
task := &CompactionTask{
|
|
InputFiles: map[int][]*SSTableInfo{
|
|
level: {file},
|
|
level + 1: nextLevelFiles,
|
|
},
|
|
TargetLevel: level + 1,
|
|
OutputPathTemplate: filepath.Join(c.sstableDir, "%d_%06d_%020d.sst"),
|
|
}
|
|
|
|
return task, nil
|
|
}
|
|
|
|
// CompactFiles performs the actual compaction of the input files
|
|
func (c *Compactor) CompactFiles(task *CompactionTask) ([]string, error) {
|
|
// Create a merged iterator over all input files
|
|
var iterators []iterator.Iterator
|
|
|
|
// Add iterators from both levels
|
|
for level := 0; level <= task.TargetLevel; level++ {
|
|
for _, file := range task.InputFiles[level] {
|
|
// We need an iterator that preserves delete markers
|
|
if file.Reader != nil {
|
|
iterators = append(iterators, file.Reader.NewIterator())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create hierarchical merged iterator
|
|
mergedIter := composite.NewHierarchicalIterator(iterators)
|
|
|
|
// Remember the input file paths for later cleanup
|
|
var inputPaths []string
|
|
for _, files := range task.InputFiles {
|
|
for _, file := range files {
|
|
inputPaths = append(inputPaths, file.Path)
|
|
}
|
|
}
|
|
|
|
// Track keys to skip duplicate entries (for tombstones)
|
|
var lastKey []byte
|
|
var outputFiles []string
|
|
var currentWriter *sstable.Writer
|
|
var currentOutputPath string
|
|
var outputFileSequence uint64 = 1
|
|
var entriesInCurrentFile int
|
|
|
|
// Function to create a new output file
|
|
createNewOutputFile := func() error {
|
|
if currentWriter != nil {
|
|
if err := currentWriter.Finish(); err != nil {
|
|
return fmt.Errorf("failed to finish SSTable: %w", err)
|
|
}
|
|
outputFiles = append(outputFiles, currentOutputPath)
|
|
}
|
|
|
|
// Create a new output file
|
|
timestamp := time.Now().UnixNano()
|
|
currentOutputPath = fmt.Sprintf(task.OutputPathTemplate,
|
|
task.TargetLevel, outputFileSequence, timestamp)
|
|
outputFileSequence++
|
|
|
|
var err error
|
|
currentWriter, err = sstable.NewWriter(currentOutputPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create SSTable writer: %w", err)
|
|
}
|
|
|
|
entriesInCurrentFile = 0
|
|
return nil
|
|
}
|
|
|
|
// Create the first output file
|
|
if err := createNewOutputFile(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Iterate through all keys in sorted order
|
|
mergedIter.SeekToFirst()
|
|
for mergedIter.Valid() {
|
|
key := mergedIter.Key()
|
|
value := mergedIter.Value()
|
|
|
|
// Skip duplicates (we've already included the newest version)
|
|
if lastKey != nil && bytes.Equal(key, lastKey) {
|
|
mergedIter.Next()
|
|
continue
|
|
}
|
|
|
|
// Add the entry to the current output file, preserving both values and tombstones
|
|
// This ensures deletion markers are properly maintained
|
|
if err := currentWriter.Add(key, value); err != nil {
|
|
return nil, fmt.Errorf("failed to add entry to SSTable: %w", err)
|
|
}
|
|
entriesInCurrentFile++
|
|
|
|
// If the current file is big enough, start a new one
|
|
if int64(entriesInCurrentFile) >= c.cfg.SSTableMaxSize {
|
|
if err := createNewOutputFile(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Remember this key to skip duplicates
|
|
lastKey = append(lastKey[:0], key...)
|
|
mergedIter.Next()
|
|
}
|
|
|
|
// Finish the last output file
|
|
if currentWriter != nil && entriesInCurrentFile > 0 {
|
|
if err := currentWriter.Finish(); err != nil {
|
|
return nil, fmt.Errorf("failed to finish SSTable: %w", err)
|
|
}
|
|
outputFiles = append(outputFiles, currentOutputPath)
|
|
} else if currentWriter != nil {
|
|
// No entries were written, abort the file
|
|
currentWriter.Abort()
|
|
}
|
|
|
|
return outputFiles, nil
|
|
}
|
|
|
|
// DeleteCompactedFiles removes the input files that were successfully compacted
|
|
func (c *Compactor) DeleteCompactedFiles(filePaths []string) error {
|
|
for _, path := range filePaths {
|
|
if err := os.Remove(path); err != nil {
|
|
return fmt.Errorf("failed to delete compacted file %s: %w", path, err)
|
|
}
|
|
}
|
|
return nil
|
|
} |