// Package main demonstrates a small in-memory, multi-version key-value store:
// transactions buffer their writes, read from a timestamped snapshot, and are
// checked for write-write conflicts when they commit.
package main
import (
	"fmt"
	"sync"
	"time"
)
// Timestamp is a logical clock value used to order committed versions and
// transaction snapshots. NewStore seeds the clock from wall-clock nanoseconds
// and nextTimestamp increments it from there.
type Timestamp int64
type VersionedValue struct {
|
|
Value string
|
|
Timestamp Timestamp
|
|
}
type KeyHistory struct {
|
|
Versions []VersionedValue
|
|
}
type Store struct {
|
|
mu sync.RWMutex
|
|
data map[string]*KeyHistory
|
|
clock Timestamp
|
|
}
type Transaction struct {
|
|
store *Store
|
|
readTS Timestamp
|
|
writeBuffer map[string]string
|
|
committed bool
|
|
}
func NewStore() *Store {
|
|
return &Store{
|
|
data: make(map[string]*KeyHistory),
|
|
clock: Timestamp(time.Now().UnixNano()),
|
|
}
|
|
}
func (s *Store) nextTimestamp() Timestamp {
|
|
s.clock++
|
|
return s.clock
|
|
}
func (s *Store) Read(key string, ts Timestamp) (string, bool) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
history, exists := s.data[key]
|
|
if !exists {
|
|
return "", false
|
|
}
|
|
|
|
for i := len(history.Versions) - 1; i >= 0; i-- {
|
|
if history.Versions[i].Timestamp <= ts {
|
|
return history.Versions[i].Value, true
|
|
}
|
|
}
|
|
|
|
return "", false
|
|
}
func (s *Store) Begin() *Transaction {
|
|
readTS := s.nextTimestamp()
|
|
return &Transaction{
|
|
store: s,
|
|
readTS: readTS,
|
|
writeBuffer: make(map[string]string),
|
|
}
|
|
}
func (tx *Transaction) Read(key string) (string, bool) {
|
|
if val, ok := tx.writeBuffer[key]; ok {
|
|
return val, true
|
|
}
|
|
return tx.store.Read(key, tx.readTS)
|
|
}
func (tx *Transaction) Write(key, value string) {
|
|
tx.writeBuffer[key] = value
|
|
}
func (tx *Transaction) Commit() error {
|
|
if tx.committed {
|
|
return fmt.Errorf("transaction already committed")
|
|
}
|
|
|
|
tx.store.mu.Lock()
|
|
defer tx.store.mu.Unlock()
|
|
|
|
// Conflict detection
|
|
for key := range tx.writeBuffer {
|
|
history, exists := tx.store.data[key]
|
|
if !exists {
|
|
continue
|
|
}
|
|
latest := history.Versions[len(history.Versions)-1]
|
|
if latest.Timestamp > tx.readTS {
|
|
return fmt.Errorf("write-write conflict on key '%s'", key)
|
|
}
|
|
}
|
|
|
|
commitTS := tx.store.nextTimestamp()
|
|
for key, val := range tx.writeBuffer {
|
|
history, exists := tx.store.data[key]
|
|
if !exists {
|
|
history = &KeyHistory{}
|
|
tx.store.data[key] = history
|
|
}
|
|
history.Versions = append(history.Versions, VersionedValue{
|
|
Value: val,
|
|
Timestamp: commitTS,
|
|
})
|
|
}
|
|
|
|
tx.committed = true
|
|
return nil
|
|
}
func main() {
|
|
store := NewStore()
|
|
|
|
// Transaction 1: Writes "foo" -> "bar"
|
|
tx1 := store.Begin()
|
|
tx1.Write("foo", "bar")
|
|
err := tx1.Commit()
|
|
if err != nil {
|
|
fmt.Println("tx1 commit error:", err)
|
|
} else {
|
|
fmt.Println("tx1 committed")
|
|
}
|
|
|
|
// Transaction 2: Reads "foo"
|
|
tx2 := store.Begin()
|
|
val, ok := tx2.Read("foo")
|
|
if ok {
|
|
fmt.Printf("tx2 reads foo: %s\n", val)
|
|
} else {
|
|
fmt.Println("tx2 reads foo: not found")
|
|
}
|
|
|
|
// Transaction 3: Conflicting write
|
|
tx3 := store.Begin()
|
|
tx3.Write("foo", "baz")
|
|
// tx2 committed first, so tx3 will conflict if tx2 also wrote to "foo"
|
|
err = tx3.Commit()
|
|
if err != nil {
|
|
fmt.Println("tx3 commit error:", err)
|
|
} else {
|
|
fmt.Println("tx3 committed")
|
|
}
|
|
|
|
// Transaction 4: Should read latest "foo"
|
|
tx4 := store.Begin()
|
|
val, _ = tx4.Read("foo")
|
|
fmt.Printf("tx4 reads foo: %s\n", val)
|
|
|
|
tx5 := store.Begin()
|
|
tx6 := store.Begin()
|
|
|
|
tx5.Write("x", "b")
|
|
if err := tx5.Commit(); err != nil {
|
|
fmt.Println("tx5 commit error:", err)
|
|
} else {
|
|
fmt.Println("tx5 committed")
|
|
}
|
|
|
|
tx6.Write("x", "c")
|
|
if err := tx6.Commit(); err != nil {
|
|
fmt.Println("tx6 commit error (expected conflict):", err)
|
|
} else {
|
|
fmt.Println("tx6 committed (unexpectedly)")
|
|
}
|
|
}