kevo/pkg/compaction/manager.go

package compaction

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"git.canoozie.net/jer/go-storage/pkg/config"
)

// CompactionManager coordinates the compaction process
type CompactionManager struct {
	// Configuration
	cfg *config.Config

	// SSTable directory
	sstableDir string

	// Compactor implementation
	compactor *TieredCompactor

	// Tombstone tracker
	tombstones *TombstoneTracker

	// Next sequence number for SSTable files
	nextSeq uint64

	// Compaction state
	running      bool
	stopCh       chan struct{}
	compactingMu sync.Mutex

	// File tracking
	// Map of file path -> true for files that have been obsoleted by compaction
	obsoleteFiles map[string]bool
	// Map of file path -> true for files that are currently being compacted
	pendingFiles map[string]bool
	// Last set of files produced by compaction
	lastCompactionOutputs []string
	filesMu               sync.RWMutex
}
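
// File lifecycle during compaction (a summary inferred from the methods below,
// added for illustration): input SSTables are marked pending before a compaction
// runs, unmarked when it finishes, and marked obsolete once their contents have
// been rewritten to the target level; CleanupObsoleteFiles then deletes obsolete
// files, skipping any that are still pending.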

// NewCompactionManager creates a new compaction manager
func NewCompactionManager(cfg *config.Config, sstableDir string) *CompactionManager {
	return &CompactionManager{
		cfg:                   cfg,
		sstableDir:            sstableDir,
		compactor:             NewTieredCompactor(cfg, sstableDir),
		tombstones:            NewTombstoneTracker(24 * time.Hour), // Default 24-hour retention
		nextSeq:               1,
		stopCh:                make(chan struct{}),
		obsoleteFiles:         make(map[string]bool),
		pendingFiles:          make(map[string]bool),
		lastCompactionOutputs: make([]string, 0),
	}
}

// Start begins background compaction
func (cm *CompactionManager) Start() error {
	cm.compactingMu.Lock()
	defer cm.compactingMu.Unlock()

	if cm.running {
		return nil // Already running
	}

	// Load existing SSTables
	if err := cm.compactor.LoadSSTables(); err != nil {
		return fmt.Errorf("failed to load SSTables: %w", err)
	}

	cm.running = true
	cm.stopCh = make(chan struct{})

	// Start background worker
	go cm.compactionWorker()

	return nil
}

// Stop halts background compaction
func (cm *CompactionManager) Stop() error {
	cm.compactingMu.Lock()
	defer cm.compactingMu.Unlock()

	if !cm.running {
		return nil // Already stopped
	}

	// Signal the worker to stop
	close(cm.stopCh)
	cm.running = false

	// Close compactor
	return cm.compactor.Close()
}

// MarkFileObsolete marks a file as obsolete
func (cm *CompactionManager) MarkFileObsolete(path string) {
	cm.filesMu.Lock()
	defer cm.filesMu.Unlock()

	cm.obsoleteFiles[path] = true
}

// MarkFilePending marks a file as being used in a compaction
func (cm *CompactionManager) MarkFilePending(path string) {
	cm.filesMu.Lock()
	defer cm.filesMu.Unlock()

	cm.pendingFiles[path] = true
}

// UnmarkFilePending removes the pending mark
func (cm *CompactionManager) UnmarkFilePending(path string) {
	cm.filesMu.Lock()
	defer cm.filesMu.Unlock()

	delete(cm.pendingFiles, path)
}

// IsFileObsolete checks if a file is marked as obsolete
func (cm *CompactionManager) IsFileObsolete(path string) bool {
	cm.filesMu.RLock()
	defer cm.filesMu.RUnlock()

	return cm.obsoleteFiles[path]
}

// IsFilePending checks if a file is pending compaction
func (cm *CompactionManager) IsFilePending(path string) bool {
	cm.filesMu.RLock()
	defer cm.filesMu.RUnlock()

	return cm.pendingFiles[path]
}

// CleanupObsoleteFiles removes files that are no longer needed
func (cm *CompactionManager) CleanupObsoleteFiles() error {
	cm.filesMu.Lock()
	defer cm.filesMu.Unlock()

	// Safely remove obsolete files that aren't pending
	for path := range cm.obsoleteFiles {
		// Skip files that are still being used in a compaction
		if cm.pendingFiles[path] {
			continue
		}

		// Try to delete the file
		if err := os.Remove(path); err != nil {
			if !os.IsNotExist(err) {
				return fmt.Errorf("failed to delete obsolete file %s: %w", path, err)
			}
			// If the file doesn't exist, remove it from our tracking
			delete(cm.obsoleteFiles, path)
		} else {
			// Successfully deleted, remove from tracking
			delete(cm.obsoleteFiles, path)
		}
	}

	return nil
}

// compactionWorker runs the compaction loop
func (cm *CompactionManager) compactionWorker() {
	// Ensure a minimum interval of 1 second
	interval := cm.cfg.CompactionInterval
	if interval <= 0 {
		interval = 1
	}

	ticker := time.NewTicker(time.Duration(interval) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-cm.stopCh:
			return
		case <-ticker.C:
			// Only one compaction at a time
			cm.compactingMu.Lock()

			// Run a compaction cycle
			err := cm.runCompactionCycle()
			if err != nil {
				// In a real system, we'd log this error
				// fmt.Printf("Compaction error: %v\n", err)
			}

			// Try to clean up obsolete files
			err = cm.CleanupObsoleteFiles()
			if err != nil {
				// In a real system, we'd log this error
				// fmt.Printf("Cleanup error: %v\n", err)
			}

			// Collect tombstone garbage periodically
			cm.tombstones.CollectGarbage()

			cm.compactingMu.Unlock()
		}
	}
}

// runCompactionCycle performs a single compaction cycle
func (cm *CompactionManager) runCompactionCycle() error {
	// Reload SSTables to get fresh information
	if err := cm.compactor.LoadSSTables(); err != nil {
		return fmt.Errorf("failed to load SSTables: %w", err)
	}

	// Select files for compaction
	task, err := cm.compactor.SelectCompaction()
	if err != nil {
		return fmt.Errorf("failed to select files for compaction: %w", err)
	}

	// If no compaction needed, return
	if task == nil {
		return nil
	}

	// Mark files as pending
	for _, files := range task.InputFiles {
		for _, file := range files {
			cm.MarkFilePending(file.Path)
		}
	}

	// Perform compaction
	outputFiles, err := cm.compactor.CompactFiles(task)

	// Unmark files as pending
	for _, files := range task.InputFiles {
		for _, file := range files {
			cm.UnmarkFilePending(file.Path)
		}
	}

	// Track the compaction outputs for statistics
	if err == nil && len(outputFiles) > 0 {
		// Record the compaction result
		cm.filesMu.Lock()
		cm.lastCompactionOutputs = outputFiles
		cm.filesMu.Unlock()
	}

	// Handle compaction errors
	if err != nil {
		return fmt.Errorf("compaction failed: %w", err)
	}

	// Mark input files as obsolete
	for _, files := range task.InputFiles {
		for _, file := range files {
			cm.MarkFileObsolete(file.Path)
		}
	}

	// Try to clean up the files immediately
	return cm.CleanupObsoleteFiles()
}

// TriggerCompaction forces a compaction cycle
func (cm *CompactionManager) TriggerCompaction() error {
	cm.compactingMu.Lock()
	defer cm.compactingMu.Unlock()

	return cm.runCompactionCycle()
}

// CompactRange triggers compaction on a specific key range
func (cm *CompactionManager) CompactRange(minKey, maxKey []byte) error {
	cm.compactingMu.Lock()
	defer cm.compactingMu.Unlock()

	// Load current SSTable information
	if err := cm.compactor.LoadSSTables(); err != nil {
		return fmt.Errorf("failed to load SSTables: %w", err)
	}

	// Find files that overlap with the target range
	rangeInfo := &SSTableInfo{
		FirstKey: minKey,
		LastKey:  maxKey,
	}

	var filesToCompact []*SSTableInfo
	for level, files := range cm.compactor.levels {
		for _, file := range files {
			if file.Overlaps(rangeInfo) {
				// Add level information to the file
				file.Level = level
				filesToCompact = append(filesToCompact, file)

				// Mark as pending
				cm.MarkFilePending(file.Path)
			}
		}
	}

	// If no files to compact, we're done
	if len(filesToCompact) == 0 {
		return nil
	}

	// Determine the target level - use the highest level found + 1
	targetLevel := 0
	for _, file := range filesToCompact {
		if file.Level > targetLevel {
			targetLevel = file.Level
		}
	}
	targetLevel++ // Compact to next level

	// Create the compaction task
	task := &CompactionTask{
		InputFiles:         make(map[int][]*SSTableInfo),
		TargetLevel:        targetLevel,
		OutputPathTemplate: filepath.Join(cm.sstableDir, "%d_%06d_%020d.sst"),
	}

	// Group files by level
	for _, file := range filesToCompact {
		task.InputFiles[file.Level] = append(task.InputFiles[file.Level], file)
	}

	// Perform the compaction
	outputFiles, err := cm.compactor.CompactFiles(task)

	// Unmark files as pending
	for _, file := range filesToCompact {
		cm.UnmarkFilePending(file.Path)
	}

	// Track the compaction outputs for statistics
	if err == nil && len(outputFiles) > 0 {
		cm.filesMu.Lock()
		cm.lastCompactionOutputs = outputFiles
		cm.filesMu.Unlock()
	}

	// Handle errors
	if err != nil {
		return fmt.Errorf("failed to compact range: %w", err)
	}

	// Mark input files as obsolete
	for _, file := range filesToCompact {
		cm.MarkFileObsolete(file.Path)
	}

	// Try to clean up immediately
	return cm.CleanupObsoleteFiles()
}

// GetCompactionStats returns statistics about the compaction state
func (cm *CompactionManager) GetCompactionStats() map[string]interface{} {
	cm.filesMu.RLock()
	defer cm.filesMu.RUnlock()

	stats := make(map[string]interface{})

	// Count files and total bytes by level
	levelCounts := make(map[int]int)
	levelSizes := make(map[int]int64)
	for level, files := range cm.compactor.levels {
		levelCounts[level] = len(files)

		var totalSize int64
		for _, file := range files {
			totalSize += file.Size
		}
		levelSizes[level] = totalSize
	}

	stats["level_counts"] = levelCounts
	stats["level_sizes"] = levelSizes
	stats["obsolete_files"] = len(cm.obsoleteFiles)
	stats["pending_files"] = len(cm.pendingFiles)
	stats["last_outputs_count"] = len(cm.lastCompactionOutputs)

	// If there are recent compaction outputs, include them
	if len(cm.lastCompactionOutputs) > 0 {
		stats["last_outputs"] = cm.lastCompactionOutputs
	}

	return stats
}
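
// exampleUsage is a hedged usage sketch added for illustration; it is not part
// of the original manager.go. It shows how a caller that already owns a
// *config.Config and an SSTable directory (both assumed to come from the
// surrounding engine) might drive the manager: start background compaction,
// force a cycle, compact an assumed key range, inspect stats, and shut down.
// Only identifiers defined above are used; the "user:" range bounds are
// hypothetical.
func exampleUsage(cfg *config.Config, sstableDir string) error {
	cm := NewCompactionManager(cfg, sstableDir)

	// Start the background compaction worker.
	if err := cm.Start(); err != nil {
		return fmt.Errorf("start compaction: %w", err)
	}
	defer cm.Stop()

	// Force an immediate cycle instead of waiting for the ticker.
	if err := cm.TriggerCompaction(); err != nil {
		return fmt.Errorf("trigger compaction: %w", err)
	}

	// Compact a specific key range (hypothetical prefix bounds).
	if err := cm.CompactRange([]byte("user:"), []byte("user;")); err != nil {
		return fmt.Errorf("compact range: %w", err)
	}

	// Report per-level file counts and sizes.
	fmt.Printf("compaction stats: %v\n", cm.GetCompactionStats())
	return nil
}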