package engine

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"git.canoozie.net/jer/go-storage/pkg/config"
	"git.canoozie.net/jer/go-storage/pkg/memtable"
	"git.canoozie.net/jer/go-storage/pkg/sstable"
	"git.canoozie.net/jer/go-storage/pkg/wal"
)

const (
	// sstableFilenameFormat is the SSTable filename layout:
	// level_sequence_timestamp.sst
	sstableFilenameFormat = "%d_%06d_%020d.sst"

	// sstableFilenameScanFormat parses names produced by
	// sstableFilenameFormat (widths are irrelevant when scanning).
	sstableFilenameScanFormat = "%d_%d_%d.sst"
)

var (
	// ErrEngineClosed is returned when operations are performed on a closed engine.
	ErrEngineClosed = errors.New("engine is closed")
	// ErrKeyNotFound is returned when a key is not found.
	ErrKeyNotFound = errors.New("key not found")
)

// Engine implements the core storage engine functionality.
//
// Lock ordering: flushMu is always acquired before mu. mu guards wal,
// immutableMTs, sstables and lastSeqNum; flushMu serializes whole flush
// cycles (and Close against them).
type Engine struct {
	// State counters. Kept first so the 64-bit words are 64-bit aligned on
	// 32-bit platforms; nextFileNum is updated with atomic.AddUint64.
	nextFileNum uint64
	lastSeqNum  uint64

	// Configuration and paths
	cfg        *config.Config
	dataDir    string
	sstableDir string
	walDir     string

	// Write-ahead log
	wal *wal.WAL

	// Memory tables
	memTablePool *memtable.MemTablePool
	immutableMTs []*memtable.MemTable

	// Storage layer
	sstables []*sstable.Reader

	// State management
	bgFlushCh chan struct{}
	closed    atomic.Bool

	// Concurrency control
	mu      sync.RWMutex
	flushMu sync.Mutex
}

// NewEngine creates a new storage engine rooted at dataDir.
//
// It creates dataDir if needed, loads the configuration manifest (writing a
// default one when none exists), creates the SSTable and WAL directories,
// opens a WAL, loads any existing SSTables and starts the background flush
// goroutine. On failure every resource opened so far is released.
func NewEngine(dataDir string) (*Engine, error) {
	// Create the data directory if it doesn't exist
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create data directory: %w", err)
	}

	// Load the configuration or create a new one if it doesn't exist
	cfg, err := config.LoadConfigFromManifest(dataDir)
	if err != nil {
		if !errors.Is(err, config.ErrManifestNotFound) {
			return nil, fmt.Errorf("failed to load configuration: %w", err)
		}
		cfg = config.NewDefaultConfig(dataDir)
		if err := cfg.SaveManifest(dataDir); err != nil {
			return nil, fmt.Errorf("failed to save configuration: %w", err)
		}
	}

	// Create directories
	sstableDir := cfg.SSTDir
	walDir := cfg.WALDir
	if err := os.MkdirAll(sstableDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create sstable directory: %w", err)
	}
	if err := os.MkdirAll(walDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create wal directory: %w", err)
	}

	// Create the WAL. The local is deliberately not named "wal" so the
	// package identifier is not shadowed.
	walLog, err := wal.NewWAL(cfg, walDir)
	if err != nil {
		return nil, fmt.Errorf("failed to create WAL: %w", err)
	}

	e := &Engine{
		cfg:          cfg,
		dataDir:      dataDir,
		sstableDir:   sstableDir,
		walDir:       walDir,
		wal:          walLog,
		memTablePool: memtable.NewMemTablePool(cfg),
		immutableMTs: make([]*memtable.MemTable, 0),
		sstables:     make([]*sstable.Reader, 0),
		bgFlushCh:    make(chan struct{}, 1),
		nextFileNum:  1,
	}

	// Load existing SSTables
	if err := e.loadSSTables(); err != nil {
		// Best-effort cleanup: the load error is what the caller needs to
		// see, so secondary Close errors are intentionally dropped.
		for _, reader := range e.sstables {
			_ = reader.Close()
		}
		_ = walLog.Close()
		return nil, fmt.Errorf("failed to load SSTables: %w", err)
	}

	// Start background flush goroutine; it exits once the engine is closed.
	go e.backgroundFlush()

	return e, nil
}

// Put adds a key-value pair to the database.
//
// The write is appended to the WAL first and then applied to the MemTable.
// Returns ErrEngineClosed if the engine has been closed.
func (e *Engine) Put(key, value []byte) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.closed.Load() {
		return ErrEngineClosed
	}

	// Append to WAL
	seqNum, err := e.wal.Append(wal.OpTypePut, key, value)
	if err != nil {
		return fmt.Errorf("failed to append to WAL: %w", err)
	}

	// Add to MemTable
	e.memTablePool.Put(key, value, seqNum)
	e.lastSeqNum = seqNum

	// Check if MemTable needs to be flushed
	if e.memTablePool.IsFlushNeeded() {
		if err := e.scheduleFlush(); err != nil {
			return fmt.Errorf("failed to schedule flush: %w", err)
		}
	}

	return nil
}

// Get retrieves the value for the given key.
//
// The MemTable pool is consulted first, then the SSTables from the end of
// the list towards the beginning. Returns ErrKeyNotFound when the key is
// absent or its newest MemTable entry is a deletion marker, and
// ErrEngineClosed if the engine has been closed.
func (e *Engine) Get(key []byte) ([]byte, error) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.closed.Load() {
		return nil, ErrEngineClosed
	}

	// Check the MemTablePool (active + immutables)
	if val, found := e.memTablePool.Get(key); found {
		// A found entry with a nil value is a deletion marker.
		if val == nil {
			return nil, ErrKeyNotFound
		}
		return val, nil
	}

	// Check the SSTables (searching from newest to oldest)
	for i := len(e.sstables) - 1; i >= 0; i-- {
		val, err := e.sstables[i].Get(key)
		if err == nil {
			return val, nil
		}
		if !errors.Is(err, sstable.ErrNotFound) {
			return nil, fmt.Errorf("SSTable error: %w", err)
		}
	}

	return nil, ErrKeyNotFound
}

// Delete removes a key from the database by recording a deletion marker.
//
// The deletion is appended to the WAL first and then applied to the
// MemTable. Returns ErrEngineClosed if the engine has been closed.
func (e *Engine) Delete(key []byte) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.closed.Load() {
		return ErrEngineClosed
	}

	// Append to WAL
	seqNum, err := e.wal.Append(wal.OpTypeDelete, key, nil)
	if err != nil {
		return fmt.Errorf("failed to append to WAL: %w", err)
	}

	// Add deletion marker to MemTable
	e.memTablePool.Delete(key, seqNum)
	e.lastSeqNum = seqNum

	// Check if MemTable needs to be flushed
	if e.memTablePool.IsFlushNeeded() {
		if err := e.scheduleFlush(); err != nil {
			return fmt.Errorf("failed to schedule flush: %w", err)
		}
	}

	return nil
}

// scheduleFlush switches to a new MemTable and schedules flushing of the old
// one. The caller must hold e.mu for writing.
//
// The flush itself is performed only by the background goroutine. Flushing
// here as well would write the same MemTable to two SSTables.
func (e *Engine) scheduleFlush() error {
	// Get the MemTable that needs to be flushed and queue it.
	immutable := e.memTablePool.SwitchToNewMemTable()
	e.immutableMTs = append(e.immutableMTs, immutable)

	// Wake the background flusher without blocking; a signal that is
	// already pending covers this MemTable too, because the flusher
	// snapshots the queue when it runs.
	select {
	case e.bgFlushCh <- struct{}{}:
	default:
	}

	return nil
}

// FlushImMemTables flushes all immutable MemTables to disk.
// This is exported for testing purposes.
//
// When no immutable MemTable is queued but the first pool table is
// non-empty, that table is flushed instead. The WAL is rotated before any
// table is written. MemTables that were flushed successfully are removed
// from the queue even if a later one fails, so a retry does not write them
// again. Returns ErrEngineClosed if the engine has been closed.
func (e *Engine) FlushImMemTables() error {
	e.flushMu.Lock()
	defer e.flushMu.Unlock()

	// Close holds flushMu while releasing resources, so a closed flag seen
	// here means the WAL and SSTables may already be gone.
	if e.closed.Load() {
		return ErrEngineClosed
	}

	// Snapshot the queue under the lock: Put/Delete append to it while
	// holding e.mu.
	e.mu.RLock()
	pending := append([]*memtable.MemTable(nil), e.immutableMTs...)
	var active *memtable.MemTable
	if len(pending) == 0 {
		// In testing, we might want to force flush the active table too.
		if tables := e.memTablePool.GetMemTables(); len(tables) > 0 && tables[0].ApproximateSize() > 0 {
			active = tables[0]
		}
	}
	e.mu.RUnlock()

	if len(pending) == 0 && active == nil {
		return nil
	}

	// Create a new WAL file for future writes
	if err := e.rotateWAL(); err != nil {
		return fmt.Errorf("failed to rotate WAL: %w", err)
	}

	if active != nil {
		if err := e.flushMemTable(active); err != nil {
			return fmt.Errorf("failed to flush active MemTable: %w", err)
		}
		return nil
	}

	// Flush each immutable MemTable
	for i, imMem := range pending {
		if err := e.flushMemTable(imMem); err != nil {
			e.dropFlushed(i)
			return fmt.Errorf("failed to flush MemTable %d: %w", i, err)
		}
	}
	e.dropFlushed(len(pending))

	return nil
}

// dropFlushed removes the first n entries from the immutable queue. Only
// FlushImMemTables (under flushMu) removes entries and writers only append,
// so the first n entries are exactly the ones that were just flushed.
func (e *Engine) dropFlushed(n int) {
	if n == 0 {
		return
	}

	e.mu.Lock()
	defer e.mu.Unlock()

	remaining := copy(e.immutableMTs, e.immutableMTs[n:])
	// Clear the vacated tail so the flushed MemTables are not kept alive by
	// the backing array.
	for i := remaining; i < len(e.immutableMTs); i++ {
		e.immutableMTs[i] = nil
	}
	e.immutableMTs = e.immutableMTs[:remaining]
}

// flushMemTable flushes a MemTable to disk as an SSTable and registers a
// reader for it. An empty MemTable, or one holding only deletion markers,
// produces no file. The caller must not hold e.mu.
func (e *Engine) flushMemTable(mem *memtable.MemTable) error {
	// Verify the memtable has data to flush
	if mem.ApproximateSize() == 0 {
		return nil
	}

	// Ensure the SSTable directory exists
	if err := os.MkdirAll(e.sstableDir, 0755); err != nil {
		return fmt.Errorf("failed to create SSTable directory: %w", err)
	}

	// Generate the SSTable filename: level_sequence_timestamp.sst
	fileNum := atomic.AddUint64(&e.nextFileNum, 1) - 1
	timestamp := time.Now().UnixNano()
	filename := fmt.Sprintf(sstableFilenameFormat, 0, fileNum, timestamp)
	sstPath := filepath.Join(e.sstableDir, filename)

	// Create a new SSTable writer
	writer, err := sstable.NewWriter(sstPath)
	if err != nil {
		return fmt.Errorf("failed to create SSTable writer: %w", err)
	}

	// Write all value entries to the SSTable.
	//
	// NOTE(review): deletion markers are skipped, so a key deleted in this
	// MemTable may become visible again from an older SSTable once the
	// MemTable is no longer consulted. Confirm whether sstable.Writer can
	// persist tombstones.
	iter := mem.NewIterator()
	count := 0
	for iter.SeekToFirst(); iter.Valid(); iter.Next() {
		value := iter.Value()
		if value == nil {
			continue
		}
		if err := writer.Add(iter.Key(), value); err != nil {
			writer.Abort()
			return fmt.Errorf("failed to add entry to SSTable: %w", err)
		}
		count++
	}

	// Nothing but deletion markers: discard the file.
	if count == 0 {
		writer.Abort()
		return nil
	}

	// Finish writing the SSTable
	if err := writer.Finish(); err != nil {
		return fmt.Errorf("failed to finish SSTable: %w", err)
	}

	// Verify the file was created
	if _, err := os.Stat(sstPath); os.IsNotExist(err) {
		return fmt.Errorf("SSTable file was not created at %s", sstPath)
	}

	// Open the new SSTable for reading
	reader, err := sstable.OpenReader(sstPath)
	if err != nil {
		return fmt.Errorf("failed to open SSTable: %w", err)
	}

	// Add the SSTable to the list
	e.mu.Lock()
	e.sstables = append(e.sstables, reader)
	e.mu.Unlock()

	return nil
}

// rotateWAL creates a new WAL file and closes the old one.
//
// It takes e.mu for writing because Put and Delete append to e.wal while
// holding that lock; the caller must therefore not hold e.mu.
func (e *Engine) rotateWAL() error {
	e.mu.Lock()
	defer e.mu.Unlock()

	// Close the current WAL
	if err := e.wal.Close(); err != nil {
		return fmt.Errorf("failed to close WAL: %w", err)
	}

	// Create a new WAL
	next, err := wal.NewWAL(e.cfg, e.walDir)
	if err != nil {
		return fmt.Errorf("failed to create new WAL: %w", err)
	}
	e.wal = next

	return nil
}

// backgroundFlush runs in a goroutine and flushes immutable MemTables when
// signalled on bgFlushCh, and every 10 seconds when work is pending. It
// returns once it observes that the engine is closed.
func (e *Engine) backgroundFlush() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-e.bgFlushCh:
			// Received a flush (or shutdown) signal
			if e.closed.Load() {
				return
			}
			// A failed flush leaves the unflushed MemTables queued, so
			// the periodic tick below retries them.
			_ = e.FlushImMemTables()

		case <-ticker.C:
			// Periodic check
			e.mu.RLock()
			closed := e.closed.Load()
			hasWork := len(e.immutableMTs) > 0
			e.mu.RUnlock()

			if closed {
				return
			}
			if hasWork {
				// Errors are retried on the next tick; see above.
				_ = e.FlushImMemTables()
			}
		}
	}
}

// loadSSTables opens every *.sst file in the SSTable directory, in the order
// os.ReadDir returns them, and advances nextFileNum past the highest
// sequence number found so new files sort after existing ones.
//
// It is called from NewEngine before any other goroutine can see the
// engine, so it accesses the fields without locking. On error, readers that
// were already opened remain in e.sstables for the caller to close.
func (e *Engine) loadSSTables() error {
	entries, err := os.ReadDir(e.sstableDir)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // Directory doesn't exist yet
		}
		return fmt.Errorf("failed to read SSTable directory: %w", err)
	}

	for _, entry := range entries {
		name := entry.Name()
		if entry.IsDir() || filepath.Ext(name) != ".sst" {
			continue // Skip directories and non-SSTable files
		}

		// Open the SSTable
		path := filepath.Join(e.sstableDir, name)
		reader, err := sstable.OpenReader(path)
		if err != nil {
			return fmt.Errorf("failed to open SSTable %s: %w", path, err)
		}
		e.sstables = append(e.sstables, reader)

		// Resume file numbering after the largest sequence on disk. Names
		// that do not follow the expected layout are still loaded but do
		// not influence numbering.
		var (
			level     int
			seq       uint64
			timestamp int64
		)
		if _, scanErr := fmt.Sscanf(name, sstableFilenameScanFormat, &level, &seq, &timestamp); scanErr == nil && seq >= e.nextFileNum {
			e.nextFileNum = seq + 1
		}
	}

	return nil
}

// Close closes the storage engine. It is safe to call more than once; only
// the first call releases resources.
//
// Close wakes the background goroutine so it can exit, waits for any flush
// in progress, then closes the WAL and every SSTable reader. All resources
// are closed even if one of them fails; the first error is returned.
func (e *Engine) Close() error {
	// Flip the flag atomically so concurrent Close calls do the work once.
	if e.closed.Swap(true) {
		return nil // Already closed
	}

	// Wake the background goroutine so it notices the closed flag now
	// instead of at the next tick. Non-blocking: a pending signal will
	// wake it just as well.
	select {
	case e.bgFlushCh <- struct{}{}:
	default:
	}

	// Wait for an in-flight flush so it cannot register a new reader after
	// the readers have been closed. Lock order: flushMu, then mu.
	e.flushMu.Lock()
	defer e.flushMu.Unlock()
	e.mu.Lock()
	defer e.mu.Unlock()

	var firstErr error

	// Close WAL first
	if err := e.wal.Close(); err != nil {
		firstErr = fmt.Errorf("failed to close WAL: %w", err)
	}

	// Close SSTables
	for _, table := range e.sstables {
		if err := table.Close(); err != nil && firstErr == nil {
			firstErr = fmt.Errorf("failed to close SSTable: %w", err)
		}
	}

	return firstErr
}