Compare commits

..

1 Commits

Author SHA1 Message Date
f9c3f17391
refactor: optimize two pass deduping into a single pass across the codebase
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m38s
2025-04-21 15:59:10 -06:00
2 changed files with 21 additions and 137 deletions

View File

@ -510,31 +510,37 @@ func (e *Engine) flushMemTable(mem *memtable.MemTable) error {
count := 0
var bytesWritten uint64
// Write all entries to the SSTable
// Since memtable's skiplist returns keys in sorted order,
// but possibly with duplicates (newer versions of same key first),
// we need to track all processed keys (including tombstones)
var processedKeys = make(map[string]struct{})
// we need to track the latest key we've seen to avoid duplicates
var lastKeyWritten []byte
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
key := iter.Key()
keyStr := string(key) // Use as map key
// Skip keys we've already processed (including tombstones)
if _, seen := processedKeys[keyStr]; seen {
continue
}
// Mark this key as processed regardless of whether it's a value or tombstone
processedKeys[keyStr] = struct{}{}
// Only write non-tombstone entries to the SSTable
// Skip deletion markers, only add value entries
if value := iter.Value(); value != nil {
key := iter.Key()
// Skip duplicate keys (we've already written the newest version)
if lastKeyWritten != nil && bytes.Equal(key, lastKeyWritten) {
continue
}
bytesWritten += uint64(len(key) + len(value))
if err := writer.Add(key, value); err != nil {
writer.Abort()
e.stats.WriteErrors.Add(1)
return fmt.Errorf("failed to add entry to SSTable: %w", err)
}
// Remember this key to avoid duplicates
if lastKeyWritten == nil {
lastKeyWritten = make([]byte, len(key))
} else {
lastKeyWritten = lastKeyWritten[:0] // Reuse the slice
}
lastKeyWritten = append(lastKeyWritten, key...)
count++
}
}

View File

@ -74,128 +74,6 @@ func TestEngine_BasicOperations(t *testing.T) {
}
}
func TestEngine_SameKeyMultipleOperationsFlush(t *testing.T) {
_, engine, cleanup := setupTest(t)
defer cleanup()
// Simulate exactly the bug scenario from the CLI
// Add the same key multiple times with different values
key := []byte("foo")
// First add
if err := engine.Put(key, []byte("23")); err != nil {
t.Fatalf("Failed to put first value: %v", err)
}
// Delete it
if err := engine.Delete(key); err != nil {
t.Fatalf("Failed to delete key: %v", err)
}
// Add it again with different value
if err := engine.Put(key, []byte("42")); err != nil {
t.Fatalf("Failed to re-add key: %v", err)
}
// Add another key
if err := engine.Put([]byte("bar"), []byte("23")); err != nil {
t.Fatalf("Failed to add another key: %v", err)
}
// Add another key
if err := engine.Put([]byte("user:1"), []byte(`{"name":"John"}`)); err != nil {
t.Fatalf("Failed to add another key: %v", err)
}
// Verify before flush
value, err := engine.Get(key)
if err != nil {
t.Fatalf("Failed to get key before flush: %v", err)
}
if !bytes.Equal(value, []byte("42")) {
t.Errorf("Got incorrect value before flush. Expected: %s, Got: %s", "42", string(value))
}
// Force a flush of the memtable - this would have failed before the fix
tables := engine.memTablePool.GetMemTables()
if err := engine.flushMemTable(tables[0]); err != nil {
t.Fatalf("Error in flush with same key multiple operations: %v", err)
}
// Verify all keys after flush
value, err = engine.Get(key)
if err != nil {
t.Fatalf("Failed to get key after flush: %v", err)
}
if !bytes.Equal(value, []byte("42")) {
t.Errorf("Got incorrect value after flush. Expected: %s, Got: %s", "42", string(value))
}
value, err = engine.Get([]byte("bar"))
if err != nil {
t.Fatalf("Failed to get 'bar' after flush: %v", err)
}
if !bytes.Equal(value, []byte("23")) {
t.Errorf("Got incorrect value for 'bar' after flush. Expected: %s, Got: %s", "23", string(value))
}
value, err = engine.Get([]byte("user:1"))
if err != nil {
t.Fatalf("Failed to get 'user:1' after flush: %v", err)
}
if !bytes.Equal(value, []byte(`{"name":"John"}`)) {
t.Errorf("Got incorrect value for 'user:1' after flush. Expected: %s, Got: %s", `{"name":"John"}`, string(value))
}
}
func TestEngine_DuplicateKeysFlush(t *testing.T) {
_, engine, cleanup := setupTest(t)
defer cleanup()
// Test with a key that will be deleted and re-added multiple times
key := []byte("foo")
// Add the key
if err := engine.Put(key, []byte("42")); err != nil {
t.Fatalf("Failed to put initial value: %v", err)
}
// Delete the key
if err := engine.Delete(key); err != nil {
t.Fatalf("Failed to delete key: %v", err)
}
// Re-add the key with a different value
if err := engine.Put(key, []byte("43")); err != nil {
t.Fatalf("Failed to re-add key: %v", err)
}
// Delete again
if err := engine.Delete(key); err != nil {
t.Fatalf("Failed to delete key again: %v", err)
}
// Re-add once more
if err := engine.Put(key, []byte("44")); err != nil {
t.Fatalf("Failed to re-add key again: %v", err)
}
// Force a flush of the memtable
tables := engine.memTablePool.GetMemTables()
if err := engine.flushMemTable(tables[0]); err != nil {
t.Fatalf("Error flushing with duplicate keys: %v", err)
}
// Verify the key has the latest value
value, err := engine.Get(key)
if err != nil {
t.Fatalf("Failed to get key after flush: %v", err)
}
if !bytes.Equal(value, []byte("44")) {
t.Errorf("Got incorrect value after flush. Expected: %s, Got: %s", "44", string(value))
}
}
func TestEngine_MemTableFlush(t *testing.T) {
dir, engine, cleanup := setupTest(t)
defer cleanup()