test: add a concurrent read/write test on memtables
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m48s
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m48s
This commit is contained in:
parent
00b2566464
commit
2e3c70e913
@ -1,6 +1,10 @@
|
||||
package memtable
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -70,6 +74,149 @@ func TestMemTableSequenceNumbers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestConcurrentReadWrite tests that concurrent reads and writes work as expected
|
||||
func TestConcurrentReadWrite(t *testing.T) {
|
||||
mt := NewMemTable()
|
||||
|
||||
// Create some initial data
|
||||
const initialKeys = 1000
|
||||
for i := 0; i < initialKeys; i++ {
|
||||
key := []byte(fmt.Sprintf("initial-key-%d", i))
|
||||
value := []byte(fmt.Sprintf("initial-value-%d", i))
|
||||
mt.Put(key, value, uint64(i))
|
||||
}
|
||||
|
||||
// Perform concurrent reads and writes
|
||||
const (
|
||||
numReaders = 4
|
||||
numWriters = 2
|
||||
opsPerGoroutine = 1000
|
||||
)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numReaders + numWriters)
|
||||
|
||||
// Start reader goroutines
|
||||
for r := 0; r < numReaders; r++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
// Each reader has its own random source
|
||||
rnd := rand.New(rand.NewSource(int64(id)))
|
||||
|
||||
for i := 0; i < opsPerGoroutine; i++ {
|
||||
// Read an existing key (one we know exists)
|
||||
idx := rnd.Intn(initialKeys)
|
||||
key := []byte(fmt.Sprintf("initial-key-%d", idx))
|
||||
expectedValue := fmt.Sprintf("initial-value-%d", idx)
|
||||
|
||||
value, found := mt.Get(key)
|
||||
if !found {
|
||||
t.Errorf("Reader %d: expected to find key %s but it wasn't found", id, string(key))
|
||||
continue
|
||||
}
|
||||
|
||||
// Due to concurrent writes, the value might have been updated or deleted
|
||||
// but we at least expect to find the key
|
||||
if value != nil && string(value) != expectedValue {
|
||||
// This is ok - it means a writer updated this key while we were reading
|
||||
// Just ensure it follows the expected pattern for a writer
|
||||
if !strings.HasPrefix(string(value), "updated-value-") {
|
||||
t.Errorf("Reader %d: unexpected value for key %s: %s", id, string(key), string(value))
|
||||
}
|
||||
}
|
||||
}
|
||||
}(r)
|
||||
}
|
||||
|
||||
// Start writer goroutines
|
||||
for w := 0; w < numWriters; w++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
// Each writer has its own random source
|
||||
rnd := rand.New(rand.NewSource(int64(id + numReaders)))
|
||||
|
||||
for i := 0; i < opsPerGoroutine; i++ {
|
||||
// Pick an operation: 50% updates, 25% inserts, 25% deletes
|
||||
op := rnd.Intn(4)
|
||||
var key []byte
|
||||
|
||||
if op < 2 {
|
||||
// Update an existing key
|
||||
idx := rnd.Intn(initialKeys)
|
||||
key = []byte(fmt.Sprintf("initial-key-%d", idx))
|
||||
value := []byte(fmt.Sprintf("updated-value-%d-%d-%d", id, i, idx))
|
||||
mt.Put(key, value, uint64(initialKeys + id*opsPerGoroutine + i))
|
||||
} else if op == 2 {
|
||||
// Insert a new key
|
||||
key = []byte(fmt.Sprintf("new-key-%d-%d", id, i))
|
||||
value := []byte(fmt.Sprintf("new-value-%d-%d", id, i))
|
||||
mt.Put(key, value, uint64(initialKeys + id*opsPerGoroutine + i))
|
||||
} else {
|
||||
// Delete a key
|
||||
idx := rnd.Intn(initialKeys)
|
||||
key = []byte(fmt.Sprintf("initial-key-%d", idx))
|
||||
mt.Delete(key, uint64(initialKeys + id*opsPerGoroutine + i))
|
||||
}
|
||||
}
|
||||
}(w)
|
||||
}
|
||||
|
||||
// Wait for all goroutines to finish
|
||||
wg.Wait()
|
||||
|
||||
// Verify the memtable is in a consistent state
|
||||
verifyInitialKeys := 0
|
||||
verifyNewKeys := 0
|
||||
verifyUpdatedKeys := 0
|
||||
verifyDeletedKeys := 0
|
||||
|
||||
for i := 0; i < initialKeys; i++ {
|
||||
key := []byte(fmt.Sprintf("initial-key-%d", i))
|
||||
value, found := mt.Get(key)
|
||||
|
||||
if !found {
|
||||
// This key should always be found, but it might be deleted
|
||||
t.Errorf("expected to find key %s, but it wasn't found", string(key))
|
||||
continue
|
||||
}
|
||||
|
||||
if value == nil {
|
||||
// This key was deleted
|
||||
verifyDeletedKeys++
|
||||
} else if strings.HasPrefix(string(value), "initial-value-") {
|
||||
// This key still has its original value
|
||||
verifyInitialKeys++
|
||||
} else if strings.HasPrefix(string(value), "updated-value-") {
|
||||
// This key was updated
|
||||
verifyUpdatedKeys++
|
||||
}
|
||||
}
|
||||
|
||||
// Check for new keys that were inserted
|
||||
for w := 0; w < numWriters; w++ {
|
||||
for i := 0; i < opsPerGoroutine; i++ {
|
||||
key := []byte(fmt.Sprintf("new-key-%d-%d", w, i))
|
||||
_, found := mt.Get(key)
|
||||
if found {
|
||||
verifyNewKeys++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Log the statistics of what happened
|
||||
t.Logf("Verified keys after concurrent operations:")
|
||||
t.Logf(" - Original keys remaining: %d", verifyInitialKeys)
|
||||
t.Logf(" - Updated keys: %d", verifyUpdatedKeys)
|
||||
t.Logf(" - Deleted keys: %d", verifyDeletedKeys)
|
||||
t.Logf(" - New keys inserted: %d", verifyNewKeys)
|
||||
|
||||
// Make sure the counts add up correctly
|
||||
if verifyInitialKeys + verifyUpdatedKeys + verifyDeletedKeys != initialKeys {
|
||||
t.Errorf("Key count mismatch: %d + %d + %d != %d",
|
||||
verifyInitialKeys, verifyUpdatedKeys, verifyDeletedKeys, initialKeys)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemTableImmutability(t *testing.T) {
|
||||
mt := NewMemTable()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user