Compare commits
1 Commits
001934e7b5
...
f9c3f17391
Author | SHA1 | Date | |
---|---|---|---|
f9c3f17391 |
@ -94,48 +94,54 @@ func (h *HierarchicalIterator) Seek(target []byte) bool {
|
|||||||
iter.Seek(target)
|
iter.Seek(target)
|
||||||
}
|
}
|
||||||
|
|
||||||
// For seek, we need to treat it differently than findNextUniqueKey since we want
|
// For seek, we need to find the smallest key >= target
|
||||||
// keys >= target, not strictly > target
|
var bestKey []byte
|
||||||
var minKey []byte
|
var bestValue []byte
|
||||||
var minValue []byte
|
var bestIterIdx int = -1
|
||||||
var seenKeys = make(map[string]bool)
|
|
||||||
h.valid = false
|
h.valid = false
|
||||||
|
|
||||||
// Find the smallest key >= target from all iterators
|
// First pass: find the smallest key >= target
|
||||||
for _, iter := range h.iterators {
|
for i, iter := range h.iterators {
|
||||||
if !iter.Valid() {
|
if !iter.Valid() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
key := iter.Key()
|
key := iter.Key()
|
||||||
value := iter.Value()
|
|
||||||
|
|
||||||
// Skip keys < target (Seek should return keys >= target)
|
// Skip keys < target (Seek should return keys >= target)
|
||||||
if bytes.Compare(key, target) < 0 {
|
if bytes.Compare(key, target) < 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert key to string for map lookup
|
// If we haven't found a valid key yet, or this key is smaller than the current best key
|
||||||
keyStr := string(key)
|
if bestIterIdx == -1 || bytes.Compare(key, bestKey) < 0 {
|
||||||
|
// This becomes our best candidate so far
|
||||||
// Only use this key if we haven't seen it from a newer iterator
|
bestKey = key
|
||||||
if !seenKeys[keyStr] {
|
bestValue = iter.Value()
|
||||||
// Mark as seen
|
bestIterIdx = i
|
||||||
seenKeys[keyStr] = true
|
|
||||||
|
|
||||||
// Update min key if needed
|
|
||||||
if minKey == nil || bytes.Compare(key, minKey) < 0 {
|
|
||||||
minKey = key
|
|
||||||
minValue = value
|
|
||||||
h.valid = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the found key/value
|
// Now we need to check if any newer iterators have the same key
|
||||||
if h.valid {
|
if bestIterIdx != -1 {
|
||||||
h.key = minKey
|
// Check all newer iterators (earlier in the slice) for the same key
|
||||||
h.value = minValue
|
for i := 0; i < bestIterIdx; i++ {
|
||||||
|
iter := h.iterators[i]
|
||||||
|
if !iter.Valid() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a newer iterator has the same key, use its value
|
||||||
|
if bytes.Equal(iter.Key(), bestKey) {
|
||||||
|
bestValue = iter.Value()
|
||||||
|
break // Since iterators are in newest-to-oldest order, we can stop at the first match
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the found key/value
|
||||||
|
h.key = bestKey
|
||||||
|
h.value = bestValue
|
||||||
|
h.valid = true
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,23 +224,20 @@ func (h *HierarchicalIterator) GetSourceIterators() []iterator.Iterator {
|
|||||||
// Returns true if a valid key was found
|
// Returns true if a valid key was found
|
||||||
func (h *HierarchicalIterator) findNextUniqueKey(prevKey []byte) bool {
|
func (h *HierarchicalIterator) findNextUniqueKey(prevKey []byte) bool {
|
||||||
// Find the smallest key among all iterators that is > prevKey
|
// Find the smallest key among all iterators that is > prevKey
|
||||||
var minKey []byte
|
var bestKey []byte
|
||||||
var minValue []byte
|
var bestValue []byte
|
||||||
var seenKeys = make(map[string]bool)
|
var bestIterIdx int = -1
|
||||||
h.valid = false
|
h.valid = false
|
||||||
|
|
||||||
// First pass: collect all valid keys and find min key > prevKey
|
// First pass: advance all iterators past prevKey and find the smallest next key
|
||||||
for _, iter := range h.iterators {
|
for i, iter := range h.iterators {
|
||||||
// Skip invalid iterators
|
// Skip invalid iterators
|
||||||
if !iter.Valid() {
|
if !iter.Valid() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
key := iter.Key()
|
|
||||||
value := iter.Value()
|
|
||||||
|
|
||||||
// Skip keys <= prevKey if we're looking for the next key
|
// Skip keys <= prevKey if we're looking for the next key
|
||||||
if prevKey != nil && bytes.Compare(key, prevKey) <= 0 {
|
if prevKey != nil && bytes.Compare(iter.Key(), prevKey) <= 0 {
|
||||||
// Advance to find a key > prevKey
|
// Advance to find a key > prevKey
|
||||||
for iter.Valid() && bytes.Compare(iter.Key(), prevKey) <= 0 {
|
for iter.Valid() && bytes.Compare(iter.Key(), prevKey) <= 0 {
|
||||||
if !iter.Next() {
|
if !iter.Next() {
|
||||||
@ -246,38 +249,40 @@ func (h *HierarchicalIterator) findNextUniqueKey(prevKey []byte) bool {
|
|||||||
if !iter.Valid() {
|
if !iter.Valid() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the new key after advancing
|
|
||||||
key = iter.Key()
|
|
||||||
value = iter.Value()
|
|
||||||
|
|
||||||
// If key is still <= prevKey after advancing, skip this iterator
|
|
||||||
if bytes.Compare(key, prevKey) <= 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert key to string for map lookup
|
// Get the current key
|
||||||
keyStr := string(key)
|
key := iter.Key()
|
||||||
|
|
||||||
// If this key hasn't been seen before, or this is a newer source for the same key
|
// If we haven't found a valid key yet, or this key is smaller than the current best key
|
||||||
if !seenKeys[keyStr] {
|
if bestIterIdx == -1 || bytes.Compare(key, bestKey) < 0 {
|
||||||
// Mark this key as seen - it's from the newest source
|
// This becomes our best candidate so far
|
||||||
seenKeys[keyStr] = true
|
bestKey = key
|
||||||
|
bestValue = iter.Value()
|
||||||
// Check if this is a new minimum key
|
bestIterIdx = i
|
||||||
if minKey == nil || bytes.Compare(key, minKey) < 0 {
|
|
||||||
minKey = key
|
|
||||||
minValue = value
|
|
||||||
h.valid = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the key/value if we found a valid one
|
// Now we need to check if any newer iterators have the same key
|
||||||
if h.valid {
|
if bestIterIdx != -1 {
|
||||||
h.key = minKey
|
// Check all newer iterators (earlier in the slice) for the same key
|
||||||
h.value = minValue
|
for i := 0; i < bestIterIdx; i++ {
|
||||||
|
iter := h.iterators[i]
|
||||||
|
if !iter.Valid() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a newer iterator has the same key, use its value
|
||||||
|
if bytes.Equal(iter.Key(), bestKey) {
|
||||||
|
bestValue = iter.Value()
|
||||||
|
break // Since iterators are in newest-to-oldest order, we can stop at the first match
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the found key/value
|
||||||
|
h.key = bestKey
|
||||||
|
h.value = bestValue
|
||||||
|
h.valid = true
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -511,16 +511,36 @@ func (e *Engine) flushMemTable(mem *memtable.MemTable) error {
|
|||||||
var bytesWritten uint64
|
var bytesWritten uint64
|
||||||
|
|
||||||
// Write all entries to the SSTable
|
// Write all entries to the SSTable
|
||||||
|
// Since memtable's skiplist returns keys in sorted order,
|
||||||
|
// but possibly with duplicates (newer versions of same key first),
|
||||||
|
// we need to track the latest key we've seen to avoid duplicates
|
||||||
|
var lastKeyWritten []byte
|
||||||
|
|
||||||
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
|
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
|
||||||
// Skip deletion markers, only add value entries
|
// Skip deletion markers, only add value entries
|
||||||
if value := iter.Value(); value != nil {
|
if value := iter.Value(); value != nil {
|
||||||
key := iter.Key()
|
key := iter.Key()
|
||||||
|
|
||||||
|
// Skip duplicate keys (we've already written the newest version)
|
||||||
|
if lastKeyWritten != nil && bytes.Equal(key, lastKeyWritten) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
bytesWritten += uint64(len(key) + len(value))
|
bytesWritten += uint64(len(key) + len(value))
|
||||||
if err := writer.Add(key, value); err != nil {
|
if err := writer.Add(key, value); err != nil {
|
||||||
writer.Abort()
|
writer.Abort()
|
||||||
e.stats.WriteErrors.Add(1)
|
e.stats.WriteErrors.Add(1)
|
||||||
return fmt.Errorf("failed to add entry to SSTable: %w", err)
|
return fmt.Errorf("failed to add entry to SSTable: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remember this key to avoid duplicates
|
||||||
|
if lastKeyWritten == nil {
|
||||||
|
lastKeyWritten = make([]byte, len(key))
|
||||||
|
} else {
|
||||||
|
lastKeyWritten = lastKeyWritten[:0] // Reuse the slice
|
||||||
|
}
|
||||||
|
lastKeyWritten = append(lastKeyWritten, key...)
|
||||||
|
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -440,41 +440,23 @@ func (c *chainedIterator) SeekToFirst() {
|
|||||||
iter.SeekToFirst()
|
iter.SeekToFirst()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Maps to track the best (newest) source for each key
|
// Find the iterator with the smallest key from the newest source
|
||||||
keyToSource := make(map[string]int) // Key -> best source index
|
c.current = -1
|
||||||
keyToLevel := make(map[string]int) // Key -> best source level (lower is better)
|
|
||||||
keyToPos := make(map[string][]byte) // Key -> binary key value (for ordering)
|
|
||||||
|
|
||||||
// First pass: Find the best source for each key
|
// Find the smallest valid key
|
||||||
for i, iter := range c.iterators {
|
for i, iter := range c.iterators {
|
||||||
if !iter.Valid() {
|
if !iter.Valid() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use string key for map
|
// If we haven't found a key yet, or this key is smaller than the current smallest
|
||||||
keyStr := string(iter.Key())
|
if c.current == -1 || bytes.Compare(iter.Key(), c.iterators[c.current].Key()) < 0 {
|
||||||
keyBytes := iter.Key()
|
c.current = i
|
||||||
level := c.sources[i].GetLevel()
|
} else if bytes.Equal(iter.Key(), c.iterators[c.current].Key()) {
|
||||||
|
// If keys are equal, prefer the newer source (lower level)
|
||||||
// If we haven't seen this key yet, or this source is newer
|
if c.sources[i].GetLevel() < c.sources[c.current].GetLevel() {
|
||||||
bestLevel, seen := keyToLevel[keyStr]
|
c.current = i
|
||||||
if !seen || level < bestLevel {
|
}
|
||||||
keyToSource[keyStr] = i
|
|
||||||
keyToLevel[keyStr] = level
|
|
||||||
keyToPos[keyStr] = keyBytes
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the smallest key in our deduplicated set
|
|
||||||
c.current = -1
|
|
||||||
var smallestKey []byte
|
|
||||||
|
|
||||||
for keyStr, sourceIdx := range keyToSource {
|
|
||||||
keyBytes := keyToPos[keyStr]
|
|
||||||
|
|
||||||
if c.current == -1 || bytes.Compare(keyBytes, smallestKey) < 0 {
|
|
||||||
c.current = sourceIdx
|
|
||||||
smallestKey = keyBytes
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -515,41 +497,23 @@ func (c *chainedIterator) Seek(target []byte) bool {
|
|||||||
iter.Seek(target)
|
iter.Seek(target)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Maps to track the best (newest) source for each key
|
// Find the iterator with the smallest key from the newest source
|
||||||
keyToSource := make(map[string]int) // Key -> best source index
|
c.current = -1
|
||||||
keyToLevel := make(map[string]int) // Key -> best source level (lower is better)
|
|
||||||
keyToPos := make(map[string][]byte) // Key -> binary key value (for ordering)
|
|
||||||
|
|
||||||
// First pass: Find the best source for each key
|
// Find the smallest valid key
|
||||||
for i, iter := range c.iterators {
|
for i, iter := range c.iterators {
|
||||||
if !iter.Valid() {
|
if !iter.Valid() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use string key for map
|
// If we haven't found a key yet, or this key is smaller than the current smallest
|
||||||
keyStr := string(iter.Key())
|
if c.current == -1 || bytes.Compare(iter.Key(), c.iterators[c.current].Key()) < 0 {
|
||||||
keyBytes := iter.Key()
|
c.current = i
|
||||||
level := c.sources[i].GetLevel()
|
} else if bytes.Equal(iter.Key(), c.iterators[c.current].Key()) {
|
||||||
|
// If keys are equal, prefer the newer source (lower level)
|
||||||
// If we haven't seen this key yet, or this source is newer
|
if c.sources[i].GetLevel() < c.sources[c.current].GetLevel() {
|
||||||
bestLevel, seen := keyToLevel[keyStr]
|
c.current = i
|
||||||
if !seen || level < bestLevel {
|
}
|
||||||
keyToSource[keyStr] = i
|
|
||||||
keyToLevel[keyStr] = level
|
|
||||||
keyToPos[keyStr] = keyBytes
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the smallest key in our deduplicated set
|
|
||||||
c.current = -1
|
|
||||||
var smallestKey []byte
|
|
||||||
|
|
||||||
for keyStr, sourceIdx := range keyToSource {
|
|
||||||
keyBytes := keyToPos[keyStr]
|
|
||||||
|
|
||||||
if c.current == -1 || bytes.Compare(keyBytes, smallestKey) < 0 {
|
|
||||||
c.current = sourceIdx
|
|
||||||
smallestKey = keyBytes
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -571,46 +535,28 @@ func (c *chainedIterator) Next() bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Maps to track the best (newest) source for each key
|
// Find the iterator with the smallest key from the newest source
|
||||||
keyToSource := make(map[string]int) // Key -> best source index
|
c.current = -1
|
||||||
keyToLevel := make(map[string]int) // Key -> best source level (lower is better)
|
|
||||||
keyToPos := make(map[string][]byte) // Key -> binary key value (for ordering)
|
|
||||||
|
|
||||||
// First pass: Find the best source for each key
|
// Find the smallest valid key that is greater than the current key
|
||||||
for i, iter := range c.iterators {
|
for i, iter := range c.iterators {
|
||||||
if !iter.Valid() {
|
if !iter.Valid() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use string key for map
|
// Skip if the key is the same as the current key (we've already advanced past it)
|
||||||
keyStr := string(iter.Key())
|
if bytes.Equal(iter.Key(), currentKey) {
|
||||||
keyBytes := iter.Key()
|
|
||||||
level := c.sources[i].GetLevel()
|
|
||||||
|
|
||||||
// If this key is the same as current, skip it
|
|
||||||
if bytes.Equal(keyBytes, currentKey) {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we haven't seen this key yet, or this source is newer
|
// If we haven't found a key yet, or this key is smaller than the current smallest
|
||||||
bestLevel, seen := keyToLevel[keyStr]
|
if c.current == -1 || bytes.Compare(iter.Key(), c.iterators[c.current].Key()) < 0 {
|
||||||
if !seen || level < bestLevel {
|
c.current = i
|
||||||
keyToSource[keyStr] = i
|
} else if bytes.Equal(iter.Key(), c.iterators[c.current].Key()) {
|
||||||
keyToLevel[keyStr] = level
|
// If keys are equal, prefer the newer source (lower level)
|
||||||
keyToPos[keyStr] = keyBytes
|
if c.sources[i].GetLevel() < c.sources[c.current].GetLevel() {
|
||||||
}
|
c.current = i
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find the smallest key in our deduplicated set
|
|
||||||
c.current = -1
|
|
||||||
var smallestKey []byte
|
|
||||||
|
|
||||||
for keyStr, sourceIdx := range keyToSource {
|
|
||||||
keyBytes := keyToPos[keyStr]
|
|
||||||
|
|
||||||
if c.current == -1 || bytes.Compare(keyBytes, smallestKey) < 0 {
|
|
||||||
c.current = sourceIdx
|
|
||||||
smallestKey = keyBytes
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user