Compare commits
3 Commits
f9c3f17391
...
001934e7b5
Author | SHA1 | Date | |
---|---|---|---|
001934e7b5 | |||
1a0de922af | |||
4e813aa3fd |
@ -510,37 +510,31 @@ func (e *Engine) flushMemTable(mem *memtable.MemTable) error {
|
|||||||
count := 0
|
count := 0
|
||||||
var bytesWritten uint64
|
var bytesWritten uint64
|
||||||
|
|
||||||
// Write all entries to the SSTable
|
|
||||||
// Since memtable's skiplist returns keys in sorted order,
|
// Since memtable's skiplist returns keys in sorted order,
|
||||||
// but possibly with duplicates (newer versions of same key first),
|
// but possibly with duplicates (newer versions of same key first),
|
||||||
// we need to track the latest key we've seen to avoid duplicates
|
// we need to track all processed keys (including tombstones)
|
||||||
var lastKeyWritten []byte
|
var processedKeys = make(map[string]struct{})
|
||||||
|
|
||||||
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
|
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
|
||||||
// Skip deletion markers, only add value entries
|
|
||||||
if value := iter.Value(); value != nil {
|
|
||||||
key := iter.Key()
|
key := iter.Key()
|
||||||
|
keyStr := string(key) // Use as map key
|
||||||
|
|
||||||
// Skip duplicate keys (we've already written the newest version)
|
// Skip keys we've already processed (including tombstones)
|
||||||
if lastKeyWritten != nil && bytes.Equal(key, lastKeyWritten) {
|
if _, seen := processedKeys[keyStr]; seen {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark this key as processed regardless of whether it's a value or tombstone
|
||||||
|
processedKeys[keyStr] = struct{}{}
|
||||||
|
|
||||||
|
// Only write non-tombstone entries to the SSTable
|
||||||
|
if value := iter.Value(); value != nil {
|
||||||
bytesWritten += uint64(len(key) + len(value))
|
bytesWritten += uint64(len(key) + len(value))
|
||||||
if err := writer.Add(key, value); err != nil {
|
if err := writer.Add(key, value); err != nil {
|
||||||
writer.Abort()
|
writer.Abort()
|
||||||
e.stats.WriteErrors.Add(1)
|
e.stats.WriteErrors.Add(1)
|
||||||
return fmt.Errorf("failed to add entry to SSTable: %w", err)
|
return fmt.Errorf("failed to add entry to SSTable: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remember this key to avoid duplicates
|
|
||||||
if lastKeyWritten == nil {
|
|
||||||
lastKeyWritten = make([]byte, len(key))
|
|
||||||
} else {
|
|
||||||
lastKeyWritten = lastKeyWritten[:0] // Reuse the slice
|
|
||||||
}
|
|
||||||
lastKeyWritten = append(lastKeyWritten, key...)
|
|
||||||
|
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -74,6 +74,128 @@ func TestEngine_BasicOperations(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEngine_SameKeyMultipleOperationsFlush(t *testing.T) {
|
||||||
|
_, engine, cleanup := setupTest(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Simulate exactly the bug scenario from the CLI
|
||||||
|
// Add the same key multiple times with different values
|
||||||
|
key := []byte("foo")
|
||||||
|
|
||||||
|
// First add
|
||||||
|
if err := engine.Put(key, []byte("23")); err != nil {
|
||||||
|
t.Fatalf("Failed to put first value: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete it
|
||||||
|
if err := engine.Delete(key); err != nil {
|
||||||
|
t.Fatalf("Failed to delete key: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add it again with different value
|
||||||
|
if err := engine.Put(key, []byte("42")); err != nil {
|
||||||
|
t.Fatalf("Failed to re-add key: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add another key
|
||||||
|
if err := engine.Put([]byte("bar"), []byte("23")); err != nil {
|
||||||
|
t.Fatalf("Failed to add another key: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add another key
|
||||||
|
if err := engine.Put([]byte("user:1"), []byte(`{"name":"John"}`)); err != nil {
|
||||||
|
t.Fatalf("Failed to add another key: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify before flush
|
||||||
|
value, err := engine.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to get key before flush: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(value, []byte("42")) {
|
||||||
|
t.Errorf("Got incorrect value before flush. Expected: %s, Got: %s", "42", string(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force a flush of the memtable - this would have failed before the fix
|
||||||
|
tables := engine.memTablePool.GetMemTables()
|
||||||
|
if err := engine.flushMemTable(tables[0]); err != nil {
|
||||||
|
t.Fatalf("Error in flush with same key multiple operations: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify all keys after flush
|
||||||
|
value, err = engine.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to get key after flush: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(value, []byte("42")) {
|
||||||
|
t.Errorf("Got incorrect value after flush. Expected: %s, Got: %s", "42", string(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err = engine.Get([]byte("bar"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to get 'bar' after flush: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(value, []byte("23")) {
|
||||||
|
t.Errorf("Got incorrect value for 'bar' after flush. Expected: %s, Got: %s", "23", string(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err = engine.Get([]byte("user:1"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to get 'user:1' after flush: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(value, []byte(`{"name":"John"}`)) {
|
||||||
|
t.Errorf("Got incorrect value for 'user:1' after flush. Expected: %s, Got: %s", `{"name":"John"}`, string(value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEngine_DuplicateKeysFlush(t *testing.T) {
|
||||||
|
_, engine, cleanup := setupTest(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Test with a key that will be deleted and re-added multiple times
|
||||||
|
key := []byte("foo")
|
||||||
|
|
||||||
|
// Add the key
|
||||||
|
if err := engine.Put(key, []byte("42")); err != nil {
|
||||||
|
t.Fatalf("Failed to put initial value: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the key
|
||||||
|
if err := engine.Delete(key); err != nil {
|
||||||
|
t.Fatalf("Failed to delete key: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-add the key with a different value
|
||||||
|
if err := engine.Put(key, []byte("43")); err != nil {
|
||||||
|
t.Fatalf("Failed to re-add key: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete again
|
||||||
|
if err := engine.Delete(key); err != nil {
|
||||||
|
t.Fatalf("Failed to delete key again: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-add once more
|
||||||
|
if err := engine.Put(key, []byte("44")); err != nil {
|
||||||
|
t.Fatalf("Failed to re-add key again: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force a flush of the memtable
|
||||||
|
tables := engine.memTablePool.GetMemTables()
|
||||||
|
if err := engine.flushMemTable(tables[0]); err != nil {
|
||||||
|
t.Fatalf("Error flushing with duplicate keys: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the key has the latest value
|
||||||
|
value, err := engine.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to get key after flush: %v", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(value, []byte("44")) {
|
||||||
|
t.Errorf("Got incorrect value after flush. Expected: %s, Got: %s", "44", string(value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestEngine_MemTableFlush(t *testing.T) {
|
func TestEngine_MemTableFlush(t *testing.T) {
|
||||||
dir, engine, cleanup := setupTest(t)
|
dir, engine, cleanup := setupTest(t)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
Loading…
Reference in New Issue
Block a user