mvcc/main.go

175 lines
3.3 KiB
Go

package main
import (
"fmt"
"sync"
"time"
)
// Timestamp is the store's logical clock value. It is used both as a
// transaction's snapshot (read) timestamp and as the commit timestamp
// recorded on every stored version.
type Timestamp int64
// VersionedValue is a single committed value of a key together with the
// commit timestamp at which it was written.
type VersionedValue struct {
// Value is the committed payload.
Value string
// Timestamp is the commit timestamp assigned to this version.
Timestamp Timestamp
}
// KeyHistory holds every committed version of one key.
type KeyHistory struct {
// Versions is appended to by Transaction.Commit, so the newest version is
// the last element; Store.Read scans it from the end.
Versions []VersionedValue
}
// Store is an in-memory multi-version key-value store. Each key maps to the
// history of values committed for it.
type Store struct {
// mu is read-locked by Read and write-locked by Transaction.Commit while
// it inspects and mutates data.
mu sync.RWMutex
// data maps a key to its version history.
data map[string]*KeyHistory
// clock is the most recently issued timestamp; nextTimestamp advances it.
clock Timestamp
}
// Transaction is a unit of work against a Store. Writes are buffered locally
// and only reach the store when Commit succeeds.
type Transaction struct {
// store is the Store this transaction reads from and commits to.
store *Store
// readTS is the snapshot timestamp: reads see the newest version whose
// timestamp is <= readTS.
readTS Timestamp
// writeBuffer holds pending writes (key -> value) not yet committed.
writeBuffer map[string]string
// committed is set once Commit has succeeded; a second Commit is rejected.
committed bool
}
// NewStore returns an empty Store whose logical clock is seeded with the
// current wall-clock time in nanoseconds.
func NewStore() *Store {
	s := new(Store)
	s.data = map[string]*KeyHistory{}
	s.clock = Timestamp(time.Now().UnixNano())
	return s
}
// nextTimestamp advances the store clock by one tick and returns the new
// value. It performs no locking of its own, so concurrent callers need
// external synchronisation.
func (s *Store) nextTimestamp() Timestamp {
	next := s.clock + 1
	s.clock = next
	return next
}
// Read returns the value of key as of snapshot timestamp ts: the newest
// stored version whose timestamp is <= ts. The boolean is false when the key
// is unknown or every version is newer than ts. The store is read-locked for
// the duration of the lookup.
func (s *Store) Read(key string, ts Timestamp) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	history, known := s.data[key]
	if !known {
		return "", false
	}

	// Walk from newest to oldest and stop at the first visible version.
	versions := history.Versions
	for n := len(versions); n > 0; n-- {
		if v := versions[n-1]; v.Timestamp <= ts {
			return v.Value, true
		}
	}
	return "", false
}
// Begin starts a new transaction whose snapshot timestamp is the next tick
// of the store clock. The returned transaction has an empty write buffer and
// sees every version committed at or before its snapshot timestamp.
//
// The clock is advanced while holding s.mu: Commit mutates the same field
// under that lock, so advancing it unlocked would be a data race and could
// hand out duplicate timestamps to concurrent callers.
func (s *Store) Begin() *Transaction {
	s.mu.Lock()
	readTS := s.nextTimestamp()
	s.mu.Unlock()

	return &Transaction{
		store:       s,
		readTS:      readTS,
		writeBuffer: make(map[string]string),
	}
}
// Read returns the value of key as seen by this transaction. A pending write
// in the transaction's own buffer takes precedence; otherwise the value is
// looked up in the store at the transaction's snapshot timestamp.
func (tx *Transaction) Read(key string) (string, bool) {
	buffered, pending := tx.writeBuffer[key]
	if !pending {
		return tx.store.Read(key, tx.readTS)
	}
	return buffered, true
}
// Write records value as this transaction's pending write for key, replacing
// any earlier pending write to the same key. Nothing reaches the store until
// Commit.
func (tx *Transaction) Write(key, value string) {
	pending := tx.writeBuffer
	pending[key] = value
}
// Commit publishes the transaction's buffered writes to the store.
//
// It returns an error if the transaction was already committed, or if any
// key in the write buffer has a stored version newer than the transaction's
// snapshot timestamp (a write-write conflict). On conflict nothing is written
// and no timestamp is consumed. Otherwise one fresh commit timestamp is
// allocated and every buffered value is appended to its key's history under
// that timestamp. The store is write-locked for the whole operation.
func (tx *Transaction) Commit() error {
	if tx.committed {
		return fmt.Errorf("transaction already committed")
	}

	st := tx.store
	st.mu.Lock()
	defer st.mu.Unlock()

	// Validation pass: every key must be checked before anything is written
	// so that a conflicting transaction leaves the store untouched.
	for key := range tx.writeBuffer {
		if hist, ok := st.data[key]; ok {
			if newest := hist.Versions[len(hist.Versions)-1]; newest.Timestamp > tx.readTS {
				return fmt.Errorf("write-write conflict on key '%s'", key)
			}
		}
	}

	// Apply pass: all writes share a single commit timestamp.
	commitTS := st.nextTimestamp()
	for key, val := range tx.writeBuffer {
		hist, ok := st.data[key]
		if !ok {
			hist = new(KeyHistory)
			st.data[key] = hist
		}
		hist.Versions = append(hist.Versions, VersionedValue{Timestamp: commitTS, Value: val})
	}

	tx.committed = true
	return nil
}
// main walks through a small MVCC scenario: a seeding write, a snapshot
// read, a non-conflicting overwrite, and finally two overlapping
// transactions writing the same key, where the second commit must fail.
func main() {
	store := NewStore()

	// report prints the outcome of a commit using the "<name> committed" /
	// "<name> commit error: <err>" wording.
	report := func(name string, err error) {
		if err != nil {
			fmt.Println(name+" commit error:", err)
			return
		}
		fmt.Println(name + " committed")
	}

	// tx1 seeds the store with foo=bar.
	tx1 := store.Begin()
	tx1.Write("foo", "bar")
	report("tx1", tx1.Commit())

	// tx2 only reads; it began after tx1 committed, so it sees foo.
	tx2 := store.Begin()
	if got, found := tx2.Read("foo"); !found {
		fmt.Println("tx2 reads foo: not found")
	} else {
		fmt.Printf("tx2 reads foo: %s\n", got)
	}

	// tx3 overwrites foo. Its snapshot was taken after tx1's commit and tx2
	// never wrote or committed anything, so no newer version of foo exists
	// and the commit is expected to succeed.
	tx3 := store.Begin()
	tx3.Write("foo", "baz")
	report("tx3", tx3.Commit())

	// tx4 began after tx3 committed and therefore reads the latest foo.
	tx4 := store.Begin()
	latest, _ := tx4.Read("foo")
	fmt.Printf("tx4 reads foo: %s\n", latest)

	// tx5 and tx6 take their snapshots before either commits, then both
	// write x: the first committer wins and the second hits a conflict.
	tx5 := store.Begin()
	tx6 := store.Begin()

	tx5.Write("x", "b")
	report("tx5", tx5.Commit())

	tx6.Write("x", "c")
	if err := tx6.Commit(); err == nil {
		fmt.Println("tx6 committed (unexpectedly)")
	} else {
		fmt.Println("tx6 commit error (expected conflict):", err)
	}
}