feat: implement filtered iterator and reduce prefix/suffix iterator into its terms
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Has been cancelled

This commit is contained in:
Jeremy Tregunna 2025-05-02 23:43:46 -06:00
parent 0bcd547c28
commit 86194e5daa
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
3 changed files with 396 additions and 189 deletions

View File

@ -0,0 +1,136 @@
// Package filtered provides iterators that filter keys based on different criteria
package filtered
import (
"bytes"
"github.com/KevoDB/kevo/pkg/common/iterator"
)
// KeyFilterFunc decides whether a key is visible through a FilteredIterator.
// It returns true to keep the entry and false to have the iterator skip it.
type KeyFilterFunc func(key []byte) bool
// FilteredIterator wraps another iterator and exposes only the entries whose
// keys are accepted by a KeyFilterFunc.
type FilteredIterator struct {
// iter is the wrapped iterator that supplies the entries.
iter iterator.Iterator
// keyFilter is applied to each key to decide whether it is visible.
keyFilter KeyFilterFunc
}
// NewFilteredIterator wraps iter so that only entries whose key satisfies
// filter are visible. The constructor stores both arguments and does not move
// iter.
func NewFilteredIterator(iter iterator.Iterator, filter KeyFilterFunc) *FilteredIterator {
	fi := new(FilteredIterator)
	fi.iter = iter
	fi.keyFilter = filter
	return fi
}
// Next moves the wrapped iterator forward until it lands on a key accepted by
// the filter. It returns true when such a key is found and false once the
// wrapped iterator has no more entries.
func (fi *FilteredIterator) Next() bool {
	for {
		if !fi.iter.Next() {
			return false
		}
		if fi.keyFilter(fi.iter.Key()) {
			return true
		}
	}
}
// Key returns the key at the wrapped iterator's current position. The filter
// is not consulted here; use Valid to learn whether the position is visible.
func (fi *FilteredIterator) Key() []byte {
	key := fi.iter.Key()
	return key
}
// Value returns the value at the wrapped iterator's current position. The
// filter is not consulted here; use Valid to learn whether the position is
// visible.
func (fi *FilteredIterator) Value() []byte {
	value := fi.iter.Value()
	return value
}
// Valid reports whether the iterator is on a visible entry: the wrapped
// iterator must be valid and its current key must pass the filter.
func (fi *FilteredIterator) Valid() bool {
	if !fi.iter.Valid() {
		return false
	}
	return fi.keyFilter(fi.iter.Key())
}
// IsTombstone reports whether the wrapped iterator's current entry is a
// deletion marker.
func (fi *FilteredIterator) IsTombstone() bool {
	deleted := fi.iter.IsTombstone()
	return deleted
}
// SeekToFirst positions the iterator at the first key accepted by the filter.
// If no key is accepted the wrapped iterator ends up exhausted, which Valid
// reports as false.
func (fi *FilteredIterator) SeekToFirst() {
	fi.iter.SeekToFirst()
	if !fi.iter.Valid() {
		return
	}
	if fi.keyFilter(fi.iter.Key()) {
		return
	}
	// The first underlying key was rejected; walk forward to the first match.
	// The result is intentionally ignored because callers check Valid.
	fi.Next()
}
// SeekToLast positions the iterator at the last key accepted by the filter.
//
// When the wrapped iterator's final key is rejected, the method rescans from
// the first key, remembers the most recent accepted key, and then seeks the
// wrapped iterator back to it. That fallback costs a full pass over the
// wrapped iterator. If no key is accepted, the wrapped iterator is left on its
// first key, which Valid reports as false.
func (fi *FilteredIterator) SeekToLast() {
	fi.iter.SeekToLast()

	// Nothing more to do when the wrapped iterator is empty or its last key
	// already satisfies the filter.
	if !fi.iter.Valid() || fi.keyFilter(fi.iter.Key()) {
		return
	}

	var (
		lastMatch []byte
		found     bool
	)
	for fi.iter.SeekToFirst(); fi.iter.Valid(); fi.iter.Next() {
		key := fi.iter.Key()
		if !fi.keyFilter(key) {
			continue
		}
		// Copy the key: the slice returned by Key may not outlive the next
		// move of the wrapped iterator.
		lastMatch = append(lastMatch[:0], key...)
		found = true
	}

	if found {
		fi.iter.Seek(lastMatch)
		return
	}
	// No key matched; park on the first key so Valid evaluates to false.
	fi.iter.SeekToFirst()
}
// Seek positions the iterator at the first key >= target that is accepted by
// the filter. It returns false when the wrapped iterator has no key >= target
// or when none of the remaining keys pass the filter.
func (fi *FilteredIterator) Seek(target []byte) bool {
	switch {
	case !fi.iter.Seek(target):
		return false
	case fi.keyFilter(fi.iter.Key()):
		return true
	default:
		// The landing key was rejected; continue to the next accepted key.
		return fi.Next()
	}
}
// PrefixFilterFunc returns a KeyFilterFunc that accepts exactly the keys that
// begin with prefix. An empty prefix accepts every key.
func PrefixFilterFunc(prefix []byte) KeyFilterFunc {
	hasPrefix := func(key []byte) bool {
		return bytes.HasPrefix(key, prefix)
	}
	return hasPrefix
}
// SuffixFilterFunc returns a KeyFilterFunc that accepts exactly the keys that
// end with suffix. An empty suffix accepts every key.
func SuffixFilterFunc(suffix []byte) KeyFilterFunc {
	hasSuffix := func(key []byte) bool {
		return bytes.HasSuffix(key, suffix)
	}
	return hasSuffix
}
// NewPrefixIterator returns a FilteredIterator over iter that exposes only
// the keys beginning with prefix.
func NewPrefixIterator(iter iterator.Iterator, prefix []byte) *FilteredIterator {
	hasPrefix := PrefixFilterFunc(prefix)
	return NewFilteredIterator(iter, hasPrefix)
}
// NewSuffixIterator returns a FilteredIterator over iter that exposes only
// the keys ending with suffix.
func NewSuffixIterator(iter iterator.Iterator, suffix []byte) *FilteredIterator {
	hasSuffix := SuffixFilterFunc(suffix)
	return NewFilteredIterator(iter, hasSuffix)
}

View File

@ -0,0 +1,247 @@
package filtered
import (
"bytes"
"testing"
"github.com/KevoDB/kevo/pkg/common/iterator"
)
// MockEntry is one record served by MockIterator.
type MockEntry struct {
	Key       []byte
	Value     []byte
	Tombstone bool
}

// MockIterator is an in-memory iterator over a fixed slice of entries, used
// as the wrapped iterator in tests.
type MockIterator struct {
	entries []MockEntry
	// currentIdx is the cursor: -1 before the first entry, len(entries) past
	// the last one, and an index into entries otherwise.
	currentIdx int
}

// NewMockIterator returns a MockIterator over entries, positioned before the
// first entry.
func NewMockIterator(entries []MockEntry) *MockIterator {
	mi := &MockIterator{entries: entries}
	mi.currentIdx = -1
	return mi
}

// SeekToFirst moves the cursor to the first entry, or to -1 when there are no
// entries.
func (mi *MockIterator) SeekToFirst() {
	if len(mi.entries) == 0 {
		mi.currentIdx = -1
		return
	}
	mi.currentIdx = 0
}

// SeekToLast moves the cursor to the last entry, or to -1 when there are no
// entries.
func (mi *MockIterator) SeekToLast() {
	// For an empty slice this evaluates to -1, the "before first" position.
	mi.currentIdx = len(mi.entries) - 1
}

// Seek moves the cursor to the first entry, in slice order, whose key is
// >= target and returns true. When there is none, the cursor is moved past
// the end and Seek returns false.
func (mi *MockIterator) Seek(target []byte) bool {
	for idx := range mi.entries {
		if bytes.Compare(mi.entries[idx].Key, target) < 0 {
			continue
		}
		mi.currentIdx = idx
		return true
	}
	mi.currentIdx = len(mi.entries)
	return false
}

// Next advances the cursor by one entry and returns true. When the cursor is
// already on or beyond the last entry, it is moved past the end and Next
// returns false.
func (mi *MockIterator) Next() bool {
	last := len(mi.entries) - 1
	if mi.currentIdx >= last {
		mi.currentIdx = len(mi.entries)
		return false
	}
	mi.currentIdx++
	return true
}

// Key returns the key under the cursor, or nil when the cursor is not on an
// entry.
func (mi *MockIterator) Key() []byte {
	if !mi.Valid() {
		return nil
	}
	return mi.entries[mi.currentIdx].Key
}

// Value returns the value under the cursor, or nil when the cursor is not on
// an entry.
func (mi *MockIterator) Value() []byte {
	if !mi.Valid() {
		return nil
	}
	return mi.entries[mi.currentIdx].Value
}

// Valid reports whether the cursor is on an entry.
func (mi *MockIterator) Valid() bool {
	idx := mi.currentIdx
	return 0 <= idx && idx < len(mi.entries)
}

// IsTombstone reports the Tombstone flag of the entry under the cursor, or
// false when the cursor is not on an entry.
func (mi *MockIterator) IsTombstone() bool {
	if !mi.Valid() {
		return false
	}
	return mi.entries[mi.currentIdx].Tombstone
}
// Compile-time assertion that *MockIterator satisfies iterator.Iterator, so
// the mock can be handed to the constructors in this package that take one.
var _ iterator.Iterator = (*MockIterator)(nil)
// Test the FilteredIterator with a simple filter
func TestFilteredIterator(t *testing.T) {
entries := []MockEntry{
{Key: []byte("a1"), Value: []byte("val1"), Tombstone: false},
{Key: []byte("b2"), Value: []byte("val2"), Tombstone: false},
{Key: []byte("a3"), Value: []byte("val3"), Tombstone: true},
{Key: []byte("c4"), Value: []byte("val4"), Tombstone: false},
{Key: []byte("a5"), Value: []byte("val5"), Tombstone: false},
}
baseIter := NewMockIterator(entries)
// Filter for keys starting with 'a'
filter := func(key []byte) bool {
return bytes.HasPrefix(key, []byte("a"))
}
filtered := NewFilteredIterator(baseIter, filter)
// Test SeekToFirst and Next
filtered.SeekToFirst()
if !filtered.Valid() {
t.Fatal("Expected valid position after SeekToFirst")
}
if string(filtered.Key()) != "a1" {
t.Errorf("Expected key 'a1', got '%s'", string(filtered.Key()))
}
if string(filtered.Value()) != "val1" {
t.Errorf("Expected value 'val1', got '%s'", string(filtered.Value()))
}
if filtered.IsTombstone() {
t.Error("Expected non-tombstone for first entry")
}
// Advance to next matching entry
if !filtered.Next() {
t.Fatal("Expected successful Next() call")
}
if string(filtered.Key()) != "a3" {
t.Errorf("Expected key 'a3', got '%s'", string(filtered.Key()))
}
if !filtered.IsTombstone() {
t.Error("Expected tombstone for second entry")
}
// Advance again
if !filtered.Next() {
t.Fatal("Expected successful Next() call")
}
if string(filtered.Key()) != "a5" {
t.Errorf("Expected key 'a5', got '%s'", string(filtered.Key()))
}
// No more entries
if filtered.Next() {
t.Fatal("Expected end of iteration")
}
if filtered.Valid() {
t.Fatal("Expected invalid position at end of iteration")
}
}
// Test the PrefixIterator
func TestPrefixIterator(t *testing.T) {
entries := []MockEntry{
{Key: []byte("apple1"), Value: []byte("val1"), Tombstone: false},
{Key: []byte("banana2"), Value: []byte("val2"), Tombstone: false},
{Key: []byte("apple3"), Value: []byte("val3"), Tombstone: true},
{Key: []byte("cherry4"), Value: []byte("val4"), Tombstone: false},
{Key: []byte("apple5"), Value: []byte("val5"), Tombstone: false},
}
baseIter := NewMockIterator(entries)
prefixIter := NewPrefixIterator(baseIter, []byte("apple"))
// Count matching entries
prefixIter.SeekToFirst()
count := 0
for prefixIter.Valid() {
count++
prefixIter.Next()
}
if count != 3 {
t.Errorf("Expected 3 entries with prefix 'apple', got %d", count)
}
// Test Seek
prefixIter.Seek([]byte("apple3"))
if !prefixIter.Valid() {
t.Fatal("Expected valid position after Seek")
}
if string(prefixIter.Key()) != "apple3" {
t.Errorf("Expected key 'apple3', got '%s'", string(prefixIter.Key()))
}
}
// Test the SuffixIterator
func TestSuffixIterator(t *testing.T) {
entries := []MockEntry{
{Key: []byte("key1_suffix"), Value: []byte("val1"), Tombstone: false},
{Key: []byte("key2_other"), Value: []byte("val2"), Tombstone: false},
{Key: []byte("key3_suffix"), Value: []byte("val3"), Tombstone: true},
{Key: []byte("key4_test"), Value: []byte("val4"), Tombstone: false},
{Key: []byte("key5_suffix"), Value: []byte("val5"), Tombstone: false},
}
baseIter := NewMockIterator(entries)
suffixIter := NewSuffixIterator(baseIter, []byte("_suffix"))
// Count matching entries
suffixIter.SeekToFirst()
count := 0
for suffixIter.Valid() {
count++
suffixIter.Next()
}
if count != 3 {
t.Errorf("Expected 3 entries with suffix '_suffix', got %d", count)
}
// Test seeking to find entries with suffix
suffixIter.Seek([]byte("key3"))
if !suffixIter.Valid() {
t.Fatal("Expected valid position after Seek")
}
if string(suffixIter.Key()) != "key3_suffix" {
t.Errorf("Expected key 'key3_suffix', got '%s'", string(suffixIter.Key()))
}
}

View File

@ -6,6 +6,7 @@ import (
"sync"
"github.com/KevoDB/kevo/pkg/common/iterator"
"github.com/KevoDB/kevo/pkg/common/iterator/filtered"
"github.com/KevoDB/kevo/pkg/common/log"
"github.com/KevoDB/kevo/pkg/engine/interfaces"
"github.com/KevoDB/kevo/pkg/replication"
@ -198,16 +199,16 @@ func (s *KevoServiceServer) Scan(req *pb.ScanRequest, stream pb.KevoService_Scan
if len(req.Prefix) > 0 && len(req.Suffix) > 0 {
// Create a combined prefix-suffix iterator
baseIter := tx.NewIterator()
prefixIter := newPrefixIterator(baseIter, req.Prefix)
iter = newSuffixIterator(prefixIter, req.Suffix)
prefixIter := filtered.NewPrefixIterator(baseIter, req.Prefix)
iter = filtered.NewSuffixIterator(prefixIter, req.Suffix)
} else if len(req.Prefix) > 0 {
// Create a prefix iterator
prefixIter := tx.NewIterator()
iter = newPrefixIterator(prefixIter, req.Prefix)
baseIter := tx.NewIterator()
iter = filtered.NewPrefixIterator(baseIter, req.Prefix)
} else if len(req.Suffix) > 0 {
// Create a suffix iterator
suffixIter := tx.NewIterator()
iter = newSuffixIterator(suffixIter, req.Suffix)
baseIter := tx.NewIterator()
iter = filtered.NewSuffixIterator(baseIter, req.Suffix)
} else if len(req.StartKey) > 0 || len(req.EndKey) > 0 {
// Create a range iterator
iter = tx.NewRangeIterator(req.StartKey, req.EndKey)
@ -244,183 +245,6 @@ func (s *KevoServiceServer) Scan(req *pb.ScanRequest, stream pb.KevoService_Scan
return nil
}
// prefixIterator wraps another iterator and exposes only the entries whose
// key starts with prefix.
type prefixIterator struct {
// iter is the wrapped iterator that supplies the entries.
iter iterator.Iterator
// prefix is the byte sequence every visible key must start with.
prefix []byte
// err is not read or assigned by any method visible in this file section.
// NOTE(review): appears unused — confirm before removing.
err error
}
// newPrefixIterator returns a prefixIterator over iter restricted to keys that
// start with prefix. It does not move iter.
func newPrefixIterator(iter iterator.Iterator, prefix []byte) *prefixIterator {
	pi := new(prefixIterator)
	pi.iter = iter
	pi.prefix = prefix
	return pi
}
// Next advances the wrapped iterator until it reaches a key that starts with
// the prefix. It returns false once the wrapped iterator is exhausted.
func (pi *prefixIterator) Next() bool {
	n := len(pi.prefix)
	for pi.iter.Next() {
		key := pi.iter.Key()
		if len(key) < n {
			// Too short to carry the prefix.
			continue
		}
		if equalByteSlice(key[:n], pi.prefix) {
			return true
		}
	}
	return false
}
// Key returns the key at the wrapped iterator's current position.
func (pi *prefixIterator) Key() []byte {
	key := pi.iter.Key()
	return key
}
// Value returns the value at the wrapped iterator's current position.
func (pi *prefixIterator) Value() []byte {
	value := pi.iter.Value()
	return value
}
// Valid reports whether the wrapped iterator is valid and its current key
// starts with the prefix.
func (pi *prefixIterator) Valid() bool {
	if !pi.iter.Valid() {
		return false
	}
	key, n := pi.iter.Key(), len(pi.prefix)
	return len(key) >= n && equalByteSlice(key[:n], pi.prefix)
}
// IsTombstone reports whether the wrapped iterator's current entry is a
// tombstone.
func (pi *prefixIterator) IsTombstone() bool {
	deleted := pi.iter.IsTombstone()
	return deleted
}
// SeekToFirst positions the iterator at the first key that starts with the
// prefix. If no key matches, the wrapped iterator ends up exhausted.
func (pi *prefixIterator) SeekToFirst() {
	pi.iter.SeekToFirst()
	if !pi.iter.Valid() {
		return
	}
	key, n := pi.iter.Key(), len(pi.prefix)
	if len(key) >= n && equalByteSlice(key[:n], pi.prefix) {
		// The very first key already matches.
		return
	}
	// Skip forward to the first matching key.
	pi.Next()
}
// SeekToLast positions the iterator at the last key that starts with the
// prefix.
//
// The wrapped iterator's last key is tried first. When it lacks the prefix,
// the method rescans from the first matching key, remembers the final match,
// and seeks the wrapped iterator back to it. Previously the wrapped iterator
// was left on its last key regardless of the prefix, so Valid could report
// false even though matching keys existed. If no key matches, the wrapped
// iterator is left exhausted and Valid reports false.
func (pi *prefixIterator) SeekToLast() {
	pi.iter.SeekToLast()
	if !pi.iter.Valid() || pi.Valid() {
		return
	}

	var (
		last  []byte
		found bool
	)
	for pi.SeekToFirst(); pi.Valid(); pi.Next() {
		// Copy the key: the slice returned by Key may not outlive the next
		// move of the wrapped iterator.
		last = append(last[:0], pi.iter.Key()...)
		found = true
	}
	if found {
		pi.iter.Seek(last)
	}
}
// Seek positions the iterator at the first key >= target that starts with the
// prefix. It returns false when the wrapped iterator has no key >= target or
// none of the remaining keys match.
//
// Previously the wrapped iterator's result was returned as-is, so Seek could
// return true while positioned on a key that Valid rejects.
func (pi *prefixIterator) Seek(target []byte) bool {
	if !pi.iter.Seek(target) {
		return false
	}
	if pi.Valid() {
		return true
	}
	// The landing key lacks the prefix; continue to the next matching key.
	return pi.Next()
}
// equalByteSlice reports whether a and b have the same length and contents.
// A nil slice and an empty slice compare equal.
func equalByteSlice(a, b []byte) bool {
	// Comparing the string conversions is a byte-wise equality check.
	return string(a) == string(b)
}
// suffixIterator wraps another iterator and exposes only the entries whose
// key ends with suffix.
type suffixIterator struct {
// iter is the wrapped iterator that supplies the entries.
iter iterator.Iterator
// suffix is the byte sequence every visible key must end with.
suffix []byte
// err is not read or assigned by any method visible in this file section.
// NOTE(review): appears unused — confirm before removing.
err error
}
// newSuffixIterator returns a suffixIterator over iter restricted to keys that
// end with suffix. It does not move iter.
func newSuffixIterator(iter iterator.Iterator, suffix []byte) *suffixIterator {
	si := new(suffixIterator)
	si.iter = iter
	si.suffix = suffix
	return si
}
// Next advances the wrapped iterator until it reaches a key that ends with
// the suffix. It returns false once the wrapped iterator is exhausted.
func (si *suffixIterator) Next() bool {
	n := len(si.suffix)
	for si.iter.Next() {
		key := si.iter.Key()
		if len(key) < n {
			// Too short to carry the suffix.
			continue
		}
		if tail := key[len(key)-n:]; equalByteSlice(tail, si.suffix) {
			return true
		}
	}
	return false
}
// Key returns the key at the wrapped iterator's current position.
func (si *suffixIterator) Key() []byte {
	key := si.iter.Key()
	return key
}
// Value returns the value at the wrapped iterator's current position.
func (si *suffixIterator) Value() []byte {
	value := si.iter.Value()
	return value
}
// Valid reports whether the wrapped iterator is valid and its current key
// ends with the suffix.
func (si *suffixIterator) Valid() bool {
	if !si.iter.Valid() {
		return false
	}
	key, n := si.iter.Key(), len(si.suffix)
	return len(key) >= n && equalByteSlice(key[len(key)-n:], si.suffix)
}
// IsTombstone reports whether the wrapped iterator's current entry is a
// tombstone.
func (si *suffixIterator) IsTombstone() bool {
	deleted := si.iter.IsTombstone()
	return deleted
}
// SeekToFirst positions the iterator at the first key that ends with the
// suffix. If no key matches, the wrapped iterator ends up exhausted.
func (si *suffixIterator) SeekToFirst() {
	si.iter.SeekToFirst()
	if !si.iter.Valid() {
		return
	}
	key, n := si.iter.Key(), len(si.suffix)
	if len(key) >= n && equalByteSlice(key[len(key)-n:], si.suffix) {
		// The very first key already matches.
		return
	}
	// Skip forward to the first matching key.
	si.Next()
}
// SeekToLast positions the iterator at the last key that ends with the
// suffix.
//
// The wrapped iterator's last key is tried first. When it lacks the suffix,
// the method rescans from the first matching key, remembers the final match,
// and seeks the wrapped iterator back to it. Previously the wrapped iterator
// was left on its last key regardless of the suffix, so Valid could report
// false even though matching keys existed. If no key matches, the wrapped
// iterator is left exhausted and Valid reports false.
func (si *suffixIterator) SeekToLast() {
	si.iter.SeekToLast()
	if !si.iter.Valid() || si.Valid() {
		return
	}

	var (
		last  []byte
		found bool
	)
	for si.SeekToFirst(); si.Valid(); si.Next() {
		// Copy the key: the slice returned by Key may not outlive the next
		// move of the wrapped iterator.
		last = append(last[:0], si.iter.Key()...)
		found = true
	}
	if found {
		si.iter.Seek(last)
	}
}
// Seek positions the iterator at the first key >= target that ends with the
// suffix. It returns false when the wrapped iterator has no key >= target or
// none of the remaining keys match.
//
// Previously the wrapped iterator's result was returned as-is, so Seek could
// return true while positioned on a key that Valid rejects.
func (si *suffixIterator) Seek(target []byte) bool {
	if !si.iter.Seek(target) {
		return false
	}
	if si.Valid() {
		return true
	}
	// The landing key lacks the suffix; continue to the next matching key.
	return si.Next()
}
// BeginTransaction starts a new transaction
func (s *KevoServiceServer) BeginTransaction(ctx context.Context, req *pb.BeginTransactionRequest) (*pb.BeginTransactionResponse, error) {
@ -590,16 +414,16 @@ func (s *KevoServiceServer) TxScan(req *pb.TxScanRequest, stream pb.KevoService_
if len(req.Prefix) > 0 && len(req.Suffix) > 0 {
// Create a combined prefix-suffix iterator
baseIter := tx.NewIterator()
prefixIter := newPrefixIterator(baseIter, req.Prefix)
iter = newSuffixIterator(prefixIter, req.Suffix)
prefixIter := filtered.NewPrefixIterator(baseIter, req.Prefix)
iter = filtered.NewSuffixIterator(prefixIter, req.Suffix)
} else if len(req.Prefix) > 0 {
// Create a prefix iterator
rawIter := tx.NewIterator()
iter = newPrefixIterator(rawIter, req.Prefix)
baseIter := tx.NewIterator()
iter = filtered.NewPrefixIterator(baseIter, req.Prefix)
} else if len(req.Suffix) > 0 {
// Create a suffix iterator
rawIter := tx.NewIterator()
iter = newSuffixIterator(rawIter, req.Suffix)
baseIter := tx.NewIterator()
iter = filtered.NewSuffixIterator(baseIter, req.Suffix)
} else if len(req.StartKey) > 0 || len(req.EndKey) > 0 {
// Create a range iterator
iter = tx.NewRangeIterator(req.StartKey, req.EndKey)