mvcc/main.go

201 lines
4.1 KiB
Go

package main
import (
"fmt"
"sync"
"time"
)
// Timestamp is a logical clock value. It orders transaction snapshots and the
// versions written at commit; larger values are later.
type Timestamp int64
// VersionedValue is a single committed version of a key: the stored value
// paired with the timestamp of the commit that wrote it.
type VersionedValue struct {
	// Value is the string stored for the key at this version.
	Value string
	// Timestamp is the commit timestamp assigned to this version.
	Timestamp Timestamp
}
// KeyHistory holds all committed versions of one key.
type KeyHistory struct {
	// Versions is only ever appended to by Commit, so entries run from the
	// oldest version to the newest; Store.Read scans it from the end.
	Versions []VersionedValue
}
// Store is an in-memory multi-version key-value store. Each key keeps a
// history of timestamped versions so readers can see a consistent snapshot.
type Store struct {
	// mu guards data: Store.Read takes it shared, Transaction.Commit takes it
	// exclusively.
	mu sync.RWMutex
	// data maps each key to its version history.
	data map[string]*KeyHistory
	// clock is the most recently issued timestamp; nextTimestamp advances it.
	clock Timestamp
}
// Transaction is a unit of work against a Store. Reads are answered from the
// snapshot at readTS (or from the transaction's own pending writes), and
// writes are buffered until Commit validates and applies them.
type Transaction struct {
	// store is the Store this transaction was begun on.
	store *Store
	// readTS is the snapshot timestamp assigned by Begin; versions newer than
	// it are invisible to Read and count as conflicts at Commit.
	readTS Timestamp
	// readSet records every key passed to Read, for commit-time validation.
	readSet map[string]struct{}
	// writeBuffer holds pending writes (key -> value) until Commit.
	writeBuffer map[string]string
	// committed is set once Commit succeeds; later Commit calls are rejected.
	committed bool
}
// NewStore returns an empty Store. Its logical clock is seeded with the
// current wall-clock time in Unix nanoseconds.
func NewStore() *Store {
	s := new(Store)
	s.data = map[string]*KeyHistory{}
	s.clock = Timestamp(time.Now().UnixNano())
	return s
}
// nextTimestamp advances the store's logical clock by one and returns the new
// value. It performs no locking of its own; Commit calls it while already
// holding s.mu.
func (s *Store) nextTimestamp() Timestamp {
	next := s.clock + 1
	s.clock = next
	return next
}
// Read returns the value of key as of timestamp ts: the newest version whose
// commit timestamp is less than or equal to ts. The boolean is false when the
// key is unknown or has no version that old. The store's read lock is held
// for the duration of the lookup.
func (s *Store) Read(key string, ts Timestamp) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	entry, found := s.data[key]
	if !found {
		return "", false
	}

	// Walk from the newest version backwards; the first one visible at ts
	// is the answer.
	versions := entry.Versions
	for idx := len(versions); idx > 0; idx-- {
		if candidate := versions[idx-1]; candidate.Timestamp <= ts {
			return candidate.Value, true
		}
	}
	return "", false
}
// Begin starts a new transaction whose snapshot timestamp is the next value
// of the store's logical clock. The returned Transaction has empty read and
// write sets.
//
// The clock is advanced while holding s.mu: nextTimestamp is an unsynchronized
// read-modify-write of s.clock, and Commit bumps the same field under s.mu, so
// taking the lock here keeps concurrent Begin/Commit calls from racing on the
// clock or handing out duplicate timestamps. The lock is released before the
// Transaction is built so it is held only for the increment.
func (s *Store) Begin() *Transaction {
	s.mu.Lock()
	readTS := s.nextTimestamp()
	s.mu.Unlock()

	return &Transaction{
		store:       s,
		readTS:      readTS,
		readSet:     make(map[string]struct{}),
		writeBuffer: make(map[string]string),
	}
}
// Read returns the value of key as seen by this transaction. A value written
// earlier in the same transaction takes precedence; otherwise the store is
// consulted at the transaction's snapshot timestamp. The key is always added
// to the read set, even when the value comes from the write buffer.
func (tx *Transaction) Read(key string) (string, bool) {
	tx.readSet[key] = struct{}{}

	buffered, pending := tx.writeBuffer[key]
	if !pending {
		return tx.store.Read(key, tx.readTS)
	}
	return buffered, true
}
// Write buffers value for key inside the transaction. Nothing reaches the
// store until Commit; writing the same key again replaces the buffered value.
func (tx *Transaction) Write(key, value string) {
	tx.writeBuffer[key] = value
}
// Commit validates the transaction against the store and, if no conflict is
// found, applies all buffered writes under a single new commit timestamp.
//
// It returns an error when the transaction was already committed, when a key
// it wrote has a version newer than its snapshot (write-write conflict), or
// when a key it only read has such a version (read-write conflict). On error
// nothing is written and the transaction is not marked committed. The store's
// write lock is held for the whole validate-and-apply sequence.
func (tx *Transaction) Commit() error {
	if tx.committed {
		return fmt.Errorf("transaction already committed")
	}

	s := tx.store
	s.mu.Lock()
	defer s.mu.Unlock()

	// newestTS reports the timestamp of the latest version of key, or 0
	// when the key has no versions yet.
	newestTS := func(key string) Timestamp {
		h, found := s.data[key]
		if !found || len(h.Versions) == 0 {
			return 0
		}
		return h.Versions[len(h.Versions)-1].Timestamp
	}

	// Write-write validation: every key we intend to write must be unchanged
	// since our snapshot.
	for key := range tx.writeBuffer {
		if newestTS(key) > tx.readTS {
			return fmt.Errorf("write-write conflict on key: %s", key)
		}
	}

	// Read-write validation: keys we read but do not write must also be
	// unchanged. Keys in the write buffer were covered by the loop above.
	for key := range tx.readSet {
		_, alsoWritten := tx.writeBuffer[key]
		if !alsoWritten && newestTS(key) > tx.readTS {
			return fmt.Errorf("read-write conflict on key: %s", key)
		}
	}

	// All checks passed: stamp every buffered write with one commit timestamp.
	stamp := s.nextTimestamp()
	for key, val := range tx.writeBuffer {
		version := VersionedValue{Value: val, Timestamp: stamp}
		if h, ok := s.data[key]; ok {
			h.Versions = append(h.Versions, version)
			continue
		}
		s.data[key] = &KeyHistory{Versions: []VersionedValue{version}}
	}

	tx.committed = true
	return nil
}
// main runs a scripted demonstration of the store: a plain commit, a snapshot
// read, a read-write conflict, and a write-write conflict, printing the
// outcome of each step.
func main() {
	db := NewStore()

	// report prints failMsg followed by the error when a commit failed, and
	// okMsg when it succeeded.
	report := func(err error, failMsg, okMsg string) {
		if err != nil {
			fmt.Println(failMsg, err)
			return
		}
		fmt.Println(okMsg)
	}

	// Transaction 1: writes "foo" -> "bar".
	tx1 := db.Begin()
	tx1.Write("foo", "bar")
	report(tx1.Commit(), "tx1 commit error:", "tx1 committed")

	// Transaction 2: reads "foo" and stays open.
	tx2 := db.Begin()
	if got, found := tx2.Read("foo"); found {
		fmt.Printf("tx2 reads foo: %s\n", got)
	} else {
		fmt.Println("tx2 reads foo: not found")
	}

	// Transaction 3: overwrites "foo" after tx2 has already read it.
	tx3 := db.Begin()
	tx3.Write("foo", "baz")
	report(tx3.Commit(), "tx3 commit error:", "tx3 committed")

	// tx2 now writes an unrelated key, but "foo" in its read set has a newer
	// version than its snapshot, so the commit should be rejected.
	tx2.Write("bar", "new")
	report(tx2.Commit(),
		"tx2 commit error (expected read-write conflict):",
		"tx2 committed (unexpected!)")

	// Transaction 4: a fresh snapshot should see the latest "foo".
	// The found flag is dropped on purpose: a missing key just prints an
	// empty value.
	tx4 := db.Begin()
	latest, _ := tx4.Read("foo")
	fmt.Printf("tx4 reads foo: %s\n", latest)

	// Write-write conflict: tx5 and tx6 start from overlapping snapshots and
	// both write "x"; the second committer should lose.
	tx5 := db.Begin()
	tx6 := db.Begin()
	tx5.Write("x", "b")
	report(tx5.Commit(), "tx5 commit error:", "tx5 committed")
	tx6.Write("x", "c")
	report(tx6.Commit(),
		"tx6 commit error (expected conflict):",
		"tx6 committed (unexpectedly)")
}