// kevo/pkg/engine/iterator.go

package engine

import (
"bytes"
"container/heap"
"sync"
"git.canoozie.net/jer/go-storage/pkg/iterator"
"git.canoozie.net/jer/go-storage/pkg/memtable"
"git.canoozie.net/jer/go-storage/pkg/sstable"
)
// Iterator is an interface for iterating over key-value pairs
type Iterator interface {
// SeekToFirst positions the iterator at the first key
SeekToFirst()
// SeekToLast positions the iterator at the last key
SeekToLast()
// Seek positions the iterator at the first key >= target
Seek(target []byte) bool
// Next advances the iterator to the next key
Next() bool
// Key returns the current key
Key() []byte
// Value returns the current value
Value() []byte
// Valid returns true if the iterator is positioned at a valid entry
Valid() bool
// IsTombstone returns true if the current entry is a deletion marker
// This is used during compaction to distinguish between a regular nil value and a tombstone
IsTombstone() bool
}
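// collectLiveKeys is an illustrative sketch (not part of the original file)
// showing the intended way to drive an Iterator: position with SeekToFirst,
// loop while Valid, advance with Next, and use IsTombstone to skip deletion
// markers instead of treating their nil values as data. The function name and
// the [][]byte result are assumptions made for this example only.
func collectLiveKeys(it Iterator) [][]byte {
	var keys [][]byte
	for it.SeekToFirst(); it.Valid(); it.Next() {
		if it.IsTombstone() {
			continue // deletion marker: the key exists only to shadow older values
		}
		// Copy the key; iterators may reuse their internal buffers.
		keys = append(keys, append([]byte(nil), it.Key()...))
	}
	return keys
}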
// iterHeapItem represents an item in the priority queue of iterators
type iterHeapItem struct {
// The original source iterator
source IterSource
// The current key and value
key []byte
value []byte
// Internal heap index
index int
}
// iterHeap is a min-heap of iterators, ordered by their current key
type iterHeap []*iterHeapItem
// Implement heap.Interface
func (h iterHeap) Len() int { return len(h) }
func (h iterHeap) Less(i, j int) bool {
	// Order by key in ascending order; break ties by source level so that,
	// for duplicate keys, the entry from the newest source (lowest level) is
	// popped first.
	if c := bytes.Compare(h[i].key, h[j].key); c != 0 {
		return c < 0
	}
	return h[i].source.GetLevel() < h[j].source.GetLevel()
}
func (h iterHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *iterHeap) Push(x interface{}) {
item := x.(*iterHeapItem)
item.index = len(*h)
*h = append(*h, item)
}
func (h *iterHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1
*h = old[0 : n-1]
return item
}
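// exampleHeapOrder is an illustrative sketch (not part of the original file)
// showing that iterHeap yields items in ascending key order regardless of push
// order. The literal keys are assumptions for this example; source is left nil
// because the keys are distinct, so ordering depends only on the key.
func exampleHeapOrder() []string {
	h := make(iterHeap, 0, 3)
	for _, k := range []string{"banana", "apple", "cherry"} {
		heap.Push(&h, &iterHeapItem{key: []byte(k)})
	}
	var ordered []string
	for h.Len() > 0 {
		item := heap.Pop(&h).(*iterHeapItem)
		ordered = append(ordered, string(item.key)) // yields "apple", "banana", "cherry"
	}
	return ordered
}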
// IterSource is an interface for any source that can provide key-value pairs
type IterSource interface {
// GetIterator returns an iterator for this source
GetIterator() Iterator
// GetLevel returns the level of this source (lower is newer)
GetLevel() int
}
// MemTableSource is an iterator source backed by a MemTable
type MemTableSource struct {
mem *memtable.MemTable
level int
}
func (m *MemTableSource) GetIterator() Iterator {
return newMemTableIterAdapter(m.mem.NewIterator())
}
func (m *MemTableSource) GetLevel() int {
return m.level
}
// SSTableSource is an iterator source backed by an SSTable
type SSTableSource struct {
sst *sstable.Reader
level int
}
func (s *SSTableSource) GetIterator() Iterator {
return newSSTableIterAdapter(s.sst.NewIterator())
}
func (s *SSTableSource) GetLevel() int {
return s.level
}
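// buildSources is an illustrative sketch (not part of the original file)
// showing how iterator sources are assembled in the newest-to-oldest order
// that NewMergedIterator and Engine.GetIterator rely on: the active memtable
// first, then immutable memtables, then SSTables. The parameter names are
// assumptions for this example, and readers is assumed to already be ordered
// newest to oldest.
func buildSources(active *memtable.MemTable, immutables []*memtable.MemTable, readers []*sstable.Reader) []IterSource {
	sources := make([]IterSource, 0, 1+len(immutables)+len(readers))
	sources = append(sources, &MemTableSource{mem: active, level: 0})
	for i, mt := range immutables {
		sources = append(sources, &MemTableSource{mem: mt, level: i + 1})
	}
	base := 1 + len(immutables)
	for i, r := range readers {
		sources = append(sources, &SSTableSource{sst: r, level: base + i})
	}
	return sources
}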
// MemTableIterAdapter adapts a memtable.Iterator to our Iterator interface
type MemTableIterAdapter struct {
iter *memtable.Iterator
}
func newMemTableIterAdapter(iter *memtable.Iterator) *MemTableIterAdapter {
return &MemTableIterAdapter{iter: iter}
}
func (a *MemTableIterAdapter) SeekToFirst() {
a.iter.SeekToFirst()
}
func (a *MemTableIterAdapter) SeekToLast() {
// This is an inefficient implementation because the MemTable iterator
// doesn't directly support SeekToLast. We simulate it by scanning to the end.
a.iter.SeekToFirst()
// If no items, return early
if !a.iter.Valid() {
return
}
// Store the last key we've seen
var lastKey []byte
// Scan to find the last element
for a.iter.Valid() {
lastKey = a.iter.Key()
a.iter.Next()
}
// Re-position at the last key we found
if lastKey != nil {
a.iter.Seek(lastKey)
}
}
func (a *MemTableIterAdapter) Seek(target []byte) bool {
a.iter.Seek(target)
return a.iter.Valid()
}
func (a *MemTableIterAdapter) Next() bool {
if !a.Valid() {
return false
}
a.iter.Next()
return a.iter.Valid()
}
func (a *MemTableIterAdapter) Key() []byte {
if !a.Valid() {
return nil
}
return a.iter.Key()
}
func (a *MemTableIterAdapter) Value() []byte {
if !a.Valid() {
return nil
}
// Tombstones (deletion markers) report a nil value. Callers that need to
// distinguish a deletion from a missing key should check IsTombstone(),
// which is how compaction decides whether to preserve the marker.
if a.iter.IsTombstone() {
return nil
}
return a.iter.Value()
}
// IsTombstone returns true if the current entry is a deletion marker
func (a *MemTableIterAdapter) IsTombstone() bool {
return a.iter != nil && a.iter.IsTombstone()
}
func (a *MemTableIterAdapter) Valid() bool {
return a.iter != nil && a.iter.Valid()
}
// SSTableIterAdapter adapts an sstable.Iterator to our Iterator interface
type SSTableIterAdapter struct {
iter *sstable.Iterator
}
func newSSTableIterAdapter(iter *sstable.Iterator) *SSTableIterAdapter {
return &SSTableIterAdapter{iter: iter}
}
func (a *SSTableIterAdapter) SeekToFirst() {
a.iter.SeekToFirst()
}
func (a *SSTableIterAdapter) SeekToLast() {
a.iter.SeekToLast()
}
func (a *SSTableIterAdapter) Seek(target []byte) bool {
return a.iter.Seek(target)
}
func (a *SSTableIterAdapter) Next() bool {
return a.iter.Next()
}
func (a *SSTableIterAdapter) Key() []byte {
if !a.Valid() {
return nil
}
return a.iter.Key()
}
func (a *SSTableIterAdapter) Value() []byte {
if !a.Valid() {
return nil
}
return a.iter.Value()
}
func (a *SSTableIterAdapter) Valid() bool {
return a.iter != nil && a.iter.Valid()
}
// IsTombstone returns true if the current entry is a deletion marker
// For SSTable iterators, we have to infer this from the value being nil
func (a *SSTableIterAdapter) IsTombstone() bool {
return a.Valid() && a.Value() == nil
}
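// entryState is an illustrative sketch (not part of the original file) showing
// how callers read through any of the adapters above: a nil Value alone is
// ambiguous, so IsTombstone is what makes a deletion marker explicit. The
// function name and the string labels are assumptions for this example.
func entryState(it Iterator) string {
	switch {
	case !it.Valid():
		return "exhausted" // no entry at the current position
	case it.IsTombstone():
		return "deleted" // deletion marker: Value() is nil by convention
	default:
		return "live" // regular key-value entry
	}
}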
// MergedIterator merges multiple iterators into a single sorted view
// It uses a heap to efficiently merge the iterators
type MergedIterator struct {
sources []IterSource
iters []Iterator
heap iterHeap
current *iterHeapItem
mu sync.Mutex
}
// NewMergedIterator creates a new merged iterator from the given sources
// The sources should be provided in newest-to-oldest order
func NewMergedIterator(sources []IterSource) *MergedIterator {
	return &MergedIterator{
		sources: sources,
		// Leave iters empty so the lazy initIterators check in SeekToFirst/Seek
		// actually fires; a pre-sized slice of nil iterators would never be filled.
		iters: make([]Iterator, 0, len(sources)),
		heap:  make(iterHeap, 0, len(sources)),
	}
}
// SeekToFirst positions the iterator at the first key
func (m *MergedIterator) SeekToFirst() {
m.mu.Lock()
defer m.mu.Unlock()
// Initialize iterators if needed
if len(m.iters) != len(m.sources) {
m.initIterators()
}
// Position all iterators at their first key
m.heap = m.heap[:0] // Clear heap
for i, iter := range m.iters {
iter.SeekToFirst()
if iter.Valid() {
heap.Push(&m.heap, &iterHeapItem{
source: m.sources[i],
key: iter.Key(),
value: iter.Value(),
})
}
}
m.advanceHeap()
}
// Seek positions the iterator at the first key >= target
func (m *MergedIterator) Seek(target []byte) bool {
m.mu.Lock()
defer m.mu.Unlock()
// Initialize iterators if needed
if len(m.iters) != len(m.sources) {
m.initIterators()
}
// Position all iterators at or after the target key
m.heap = m.heap[:0] // Clear heap
for i, iter := range m.iters {
if iter.Seek(target) {
heap.Push(&m.heap, &iterHeapItem{
source: m.sources[i],
key: iter.Key(),
value: iter.Value(),
})
}
}
m.advanceHeap()
return m.current != nil
}
// SeekToLast positions the iterator at the last key
func (m *MergedIterator) SeekToLast() {
m.mu.Lock()
defer m.mu.Unlock()
// Initialize iterators if needed
if len(m.iters) != len(m.sources) {
m.initIterators()
}
// Discard any heap state from a previous positioning; SeekToLast selects the
// maximum key directly rather than going through the heap
m.heap = m.heap[:0]
// Position all iterators at their last key
var lastKey []byte
var lastValue []byte
var lastSource IterSource
var lastLevel = -1
for i, iter := range m.iters {
iter.SeekToLast()
if !iter.Valid() {
continue
}
key := iter.Key()
// If this is a new maximum key, or the same key but from a newer level
if lastKey == nil ||
bytes.Compare(key, lastKey) > 0 ||
(bytes.Equal(key, lastKey) && m.sources[i].GetLevel() < lastLevel) {
lastKey = key
lastValue = iter.Value()
lastSource = m.sources[i]
lastLevel = m.sources[i].GetLevel()
}
}
if lastKey != nil {
m.current = &iterHeapItem{
source: lastSource,
key: lastKey,
value: lastValue,
}
} else {
m.current = nil
}
}
// Next advances the iterator to the next key
func (m *MergedIterator) Next() bool {
m.mu.Lock()
defer m.mu.Unlock()
if m.current == nil {
return false
}
// Get the current key to skip duplicates
currentKey := m.current.key
// Add back the iterator for the current source if it has more keys
sourceIndex := -1
for i, s := range m.sources {
if s == m.current.source {
sourceIndex = i
break
}
}
if sourceIndex >= 0 {
iter := m.iters[sourceIndex]
if iter.Next() && !bytes.Equal(iter.Key(), currentKey) {
heap.Push(&m.heap, &iterHeapItem{
source: m.sources[sourceIndex],
key: iter.Key(),
value: iter.Value(),
})
}
}
// Skip any entries with the same key (we've already returned the value from the newest source)
for len(m.heap) > 0 && bytes.Equal(m.heap[0].key, currentKey) {
item := heap.Pop(&m.heap).(*iterHeapItem)
sourceIndex = -1
for i, s := range m.sources {
if s == item.source {
sourceIndex = i
break
}
}
if sourceIndex >= 0 {
iter := m.iters[sourceIndex]
if iter.Next() && !bytes.Equal(iter.Key(), currentKey) {
heap.Push(&m.heap, &iterHeapItem{
source: m.sources[sourceIndex],
key: iter.Key(),
value: iter.Value(),
})
}
}
}
m.advanceHeap()
return m.current != nil
}
// Key returns the current key
func (m *MergedIterator) Key() []byte {
m.mu.Lock()
defer m.mu.Unlock()
if m.current == nil {
return nil
}
return m.current.key
}
// Value returns the current value
func (m *MergedIterator) Value() []byte {
m.mu.Lock()
defer m.mu.Unlock()
if m.current == nil {
return nil
}
return m.current.value
}
// Valid returns true if the iterator is positioned at a valid entry
func (m *MergedIterator) Valid() bool {
m.mu.Lock()
defer m.mu.Unlock()
return m.current != nil
}
// IsTombstone returns true if the current entry is a deletion marker
func (m *MergedIterator) IsTombstone() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.current == nil {
		return false
	}
	// Ask the live iterator that produced the current entry. It is still
	// positioned at the current key because it is only advanced in Next;
	// a fresh iterator from source.GetIterator() would be unpositioned and
	// could not answer correctly.
	for i, source := range m.sources {
		if source == m.current.source {
			return m.iters[i].IsTombstone()
		}
	}
	return false
}
// initIterators builds a concrete iterator for each source
func (m *MergedIterator) initIterators() {
	m.iters = m.iters[:0]
	for _, source := range m.sources {
		m.iters = append(m.iters, source.GetIterator())
	}
}
// advanceHeap advances the heap and updates the current item
func (m *MergedIterator) advanceHeap() {
if len(m.heap) == 0 {
m.current = nil
return
}
// Get the smallest key
m.current = heap.Pop(&m.heap).(*iterHeapItem)
// Skip any entries with duplicate keys (keeping the one from the newest source)
// Sources are already provided in newest-to-oldest order, and we've popped
// the smallest key, so any item in the heap with the same key is from an older source
currentKey := m.current.key
for len(m.heap) > 0 && bytes.Equal(m.heap[0].key, currentKey) {
item := heap.Pop(&m.heap).(*iterHeapItem)
sourceIndex := -1
for i, s := range m.sources {
if s == item.source {
sourceIndex = i
break
}
}
if sourceIndex >= 0 {
iter := m.iters[sourceIndex]
if iter.Next() && !bytes.Equal(iter.Key(), currentKey) {
heap.Push(&m.heap, &iterHeapItem{
source: m.sources[sourceIndex],
key: iter.Key(),
value: iter.Value(),
})
}
}
}
}
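// mergeAll is an illustrative sketch (not part of the original file) showing
// how the MergedIterator is consumed: given sources in newest-to-oldest order,
// each key is visited once with the value from its newest source, and
// tombstones are surfaced so a caller such as compaction can decide whether to
// drop or preserve them. The visit callback is an assumption for this example.
func mergeAll(sources []IterSource, visit func(key, value []byte, tombstone bool)) {
	mi := NewMergedIterator(sources)
	for mi.SeekToFirst(); mi.Valid(); mi.Next() {
		visit(mi.Key(), mi.Value(), mi.IsTombstone())
	}
}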
// GetIterator returns an iterator over the entire database
func (e *Engine) GetIterator() (Iterator, error) {
e.mu.RLock()
defer e.mu.RUnlock()
if e.closed.Load() {
return nil, ErrEngineClosed
}
// Get all MemTables from the pool
memTables := e.memTablePool.GetMemTables()
// Create a list of all iterator sources in newest-to-oldest order
sources := make([]IterSource, 0, len(memTables)+len(e.sstables))
// Add MemTables (active first, then immutables)
for i, table := range memTables {
sources = append(sources, &MemTableSource{
mem: table,
level: i, // Level corresponds to position in the list
})
}
// Add SSTables (levels after MemTables)
baseLevel := len(memTables)
for i := len(e.sstables) - 1; i >= 0; i-- {
sources = append(sources, &SSTableSource{
sst: e.sstables[i],
level: baseLevel + (len(e.sstables) - 1 - i),
})
}
// Convert sources to actual iterators
iters := make([]iterator.Iterator, 0, len(sources))
for _, src := range sources {
iters = append(iters, src.GetIterator())
}
// Create and return a hierarchical iterator that understands LSM-tree structure
return iterator.NewHierarchicalIterator(iters), nil
}
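// countLiveEntries is an illustrative sketch (not part of the original file)
// showing a full scan through Engine.GetIterator: every memtable and SSTable
// is merged into a single sorted view, and tombstones are skipped rather than
// counted. The function name is an assumption for this example.
func countLiveEntries(e *Engine) (int, error) {
	it, err := e.GetIterator()
	if err != nil {
		return 0, err
	}
	count := 0
	for it.SeekToFirst(); it.Valid(); it.Next() {
		if it.IsTombstone() {
			continue // deletion marker, not a live entry
		}
		count++
	}
	return count, nil
}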
// GetRangeIterator returns an iterator over a specific key range
func (e *Engine) GetRangeIterator(start, end []byte) (Iterator, error) {
iter, err := e.GetIterator()
if err != nil {
return nil, err
}
// Position at the start key
if start != nil {
if !iter.Seek(start) {
// No keys in range
return iter, nil
}
} else {
iter.SeekToFirst()
if !iter.Valid() {
// Empty database
return iter, nil
}
}
// If we have an end key, wrap the iterator to limit the range
if end != nil {
iter = &boundedIterator{
Iterator: iter,
end: end,
}
}
return iter, nil
}
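// scanRange is an illustrative sketch (not part of the original file) showing
// how the iterator returned by GetRangeIterator is consumed: the range is
// half-open [start, end), the iterator is already positioned at start, and the
// caller only needs to loop with Valid/Next while skipping tombstones. The
// visit callback is an assumption for this example.
func scanRange(e *Engine, start, end []byte, visit func(key, value []byte)) error {
	it, err := e.GetRangeIterator(start, end)
	if err != nil {
		return err
	}
	for ; it.Valid(); it.Next() {
		if it.IsTombstone() {
			continue // skip deletion markers
		}
		visit(it.Key(), it.Value())
	}
	return nil
}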
// boundedIterator wraps an iterator and enforces an exclusive end bound.
// The start position is established by GetRangeIterator before wrapping,
// so only the end of the range needs to be checked here.
type boundedIterator struct {
Iterator
end []byte
}
func (b *boundedIterator) SeekToFirst() {
b.Iterator.SeekToFirst()
b.checkBounds()
}
func (b *boundedIterator) Seek(target []byte) bool {
if b.Iterator.Seek(target) {
return b.checkBounds()
}
return false
}
func (b *boundedIterator) Next() bool {
// First check if we're already at or beyond the end boundary
if !b.checkBounds() {
return false
}
// Then try to advance
if !b.Iterator.Next() {
return false
}
// Check if the new position is within bounds
return b.checkBounds()
}
func (b *boundedIterator) Valid() bool {
return b.Iterator.Valid() && b.checkBounds()
}
func (b *boundedIterator) Key() []byte {
if !b.Valid() {
return nil
}
return b.Iterator.Key()
}
func (b *boundedIterator) Value() []byte {
if !b.Valid() {
return nil
}
return b.Iterator.Value()
}
// IsTombstone returns true if the current entry is a deletion marker
func (b *boundedIterator) IsTombstone() bool {
if !b.Valid() {
return false
}
return b.Iterator.IsTombstone()
}
func (b *boundedIterator) checkBounds() bool {
if !b.Iterator.Valid() {
return false
}
// Check if the current key is at or beyond the end bound
if len(b.end) > 0 {
// For a range query [start, end), the end key is exclusive
if bytes.Compare(b.Iterator.Key(), b.end) >= 0 {
return false
}
}
return true
}