Compare commits
1 Commits
001934e7b5
...
f9c3f17391
Author | SHA1 | Date | |
---|---|---|---|
f9c3f17391 |
@ -510,31 +510,37 @@ func (e *Engine) flushMemTable(mem *memtable.MemTable) error {
|
||||
count := 0
|
||||
var bytesWritten uint64
|
||||
|
||||
// Write all entries to the SSTable
|
||||
// Since memtable's skiplist returns keys in sorted order,
|
||||
// but possibly with duplicates (newer versions of same key first),
|
||||
// we need to track all processed keys (including tombstones)
|
||||
var processedKeys = make(map[string]struct{})
|
||||
// we need to track the latest key we've seen to avoid duplicates
|
||||
var lastKeyWritten []byte
|
||||
|
||||
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
|
||||
// Skip deletion markers, only add value entries
|
||||
if value := iter.Value(); value != nil {
|
||||
key := iter.Key()
|
||||
keyStr := string(key) // Use as map key
|
||||
|
||||
// Skip keys we've already processed (including tombstones)
|
||||
if _, seen := processedKeys[keyStr]; seen {
|
||||
// Skip duplicate keys (we've already written the newest version)
|
||||
if lastKeyWritten != nil && bytes.Equal(key, lastKeyWritten) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Mark this key as processed regardless of whether it's a value or tombstone
|
||||
processedKeys[keyStr] = struct{}{}
|
||||
|
||||
// Only write non-tombstone entries to the SSTable
|
||||
if value := iter.Value(); value != nil {
|
||||
bytesWritten += uint64(len(key) + len(value))
|
||||
if err := writer.Add(key, value); err != nil {
|
||||
writer.Abort()
|
||||
e.stats.WriteErrors.Add(1)
|
||||
return fmt.Errorf("failed to add entry to SSTable: %w", err)
|
||||
}
|
||||
|
||||
// Remember this key to avoid duplicates
|
||||
if lastKeyWritten == nil {
|
||||
lastKeyWritten = make([]byte, len(key))
|
||||
} else {
|
||||
lastKeyWritten = lastKeyWritten[:0] // Reuse the slice
|
||||
}
|
||||
lastKeyWritten = append(lastKeyWritten, key...)
|
||||
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
@ -74,128 +74,6 @@ func TestEngine_BasicOperations(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_SameKeyMultipleOperationsFlush(t *testing.T) {
|
||||
_, engine, cleanup := setupTest(t)
|
||||
defer cleanup()
|
||||
|
||||
// Simulate exactly the bug scenario from the CLI
|
||||
// Add the same key multiple times with different values
|
||||
key := []byte("foo")
|
||||
|
||||
// First add
|
||||
if err := engine.Put(key, []byte("23")); err != nil {
|
||||
t.Fatalf("Failed to put first value: %v", err)
|
||||
}
|
||||
|
||||
// Delete it
|
||||
if err := engine.Delete(key); err != nil {
|
||||
t.Fatalf("Failed to delete key: %v", err)
|
||||
}
|
||||
|
||||
// Add it again with different value
|
||||
if err := engine.Put(key, []byte("42")); err != nil {
|
||||
t.Fatalf("Failed to re-add key: %v", err)
|
||||
}
|
||||
|
||||
// Add another key
|
||||
if err := engine.Put([]byte("bar"), []byte("23")); err != nil {
|
||||
t.Fatalf("Failed to add another key: %v", err)
|
||||
}
|
||||
|
||||
// Add another key
|
||||
if err := engine.Put([]byte("user:1"), []byte(`{"name":"John"}`)); err != nil {
|
||||
t.Fatalf("Failed to add another key: %v", err)
|
||||
}
|
||||
|
||||
// Verify before flush
|
||||
value, err := engine.Get(key)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get key before flush: %v", err)
|
||||
}
|
||||
if !bytes.Equal(value, []byte("42")) {
|
||||
t.Errorf("Got incorrect value before flush. Expected: %s, Got: %s", "42", string(value))
|
||||
}
|
||||
|
||||
// Force a flush of the memtable - this would have failed before the fix
|
||||
tables := engine.memTablePool.GetMemTables()
|
||||
if err := engine.flushMemTable(tables[0]); err != nil {
|
||||
t.Fatalf("Error in flush with same key multiple operations: %v", err)
|
||||
}
|
||||
|
||||
// Verify all keys after flush
|
||||
value, err = engine.Get(key)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get key after flush: %v", err)
|
||||
}
|
||||
if !bytes.Equal(value, []byte("42")) {
|
||||
t.Errorf("Got incorrect value after flush. Expected: %s, Got: %s", "42", string(value))
|
||||
}
|
||||
|
||||
value, err = engine.Get([]byte("bar"))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get 'bar' after flush: %v", err)
|
||||
}
|
||||
if !bytes.Equal(value, []byte("23")) {
|
||||
t.Errorf("Got incorrect value for 'bar' after flush. Expected: %s, Got: %s", "23", string(value))
|
||||
}
|
||||
|
||||
value, err = engine.Get([]byte("user:1"))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get 'user:1' after flush: %v", err)
|
||||
}
|
||||
if !bytes.Equal(value, []byte(`{"name":"John"}`)) {
|
||||
t.Errorf("Got incorrect value for 'user:1' after flush. Expected: %s, Got: %s", `{"name":"John"}`, string(value))
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_DuplicateKeysFlush(t *testing.T) {
|
||||
_, engine, cleanup := setupTest(t)
|
||||
defer cleanup()
|
||||
|
||||
// Test with a key that will be deleted and re-added multiple times
|
||||
key := []byte("foo")
|
||||
|
||||
// Add the key
|
||||
if err := engine.Put(key, []byte("42")); err != nil {
|
||||
t.Fatalf("Failed to put initial value: %v", err)
|
||||
}
|
||||
|
||||
// Delete the key
|
||||
if err := engine.Delete(key); err != nil {
|
||||
t.Fatalf("Failed to delete key: %v", err)
|
||||
}
|
||||
|
||||
// Re-add the key with a different value
|
||||
if err := engine.Put(key, []byte("43")); err != nil {
|
||||
t.Fatalf("Failed to re-add key: %v", err)
|
||||
}
|
||||
|
||||
// Delete again
|
||||
if err := engine.Delete(key); err != nil {
|
||||
t.Fatalf("Failed to delete key again: %v", err)
|
||||
}
|
||||
|
||||
// Re-add once more
|
||||
if err := engine.Put(key, []byte("44")); err != nil {
|
||||
t.Fatalf("Failed to re-add key again: %v", err)
|
||||
}
|
||||
|
||||
// Force a flush of the memtable
|
||||
tables := engine.memTablePool.GetMemTables()
|
||||
if err := engine.flushMemTable(tables[0]); err != nil {
|
||||
t.Fatalf("Error flushing with duplicate keys: %v", err)
|
||||
}
|
||||
|
||||
// Verify the key has the latest value
|
||||
value, err := engine.Get(key)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get key after flush: %v", err)
|
||||
}
|
||||
if !bytes.Equal(value, []byte("44")) {
|
||||
t.Errorf("Got incorrect value after flush. Expected: %s, Got: %s", "44", string(value))
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_MemTableFlush(t *testing.T) {
|
||||
dir, engine, cleanup := setupTest(t)
|
||||
defer cleanup()
|
||||
|
Loading…
Reference in New Issue
Block a user