package engine

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"

	"github.com/google/uuid"
	"github.com/jeremytregunna/kevo/pkg/common/clock"
	"github.com/jeremytregunna/kevo/pkg/common/log"
	"github.com/jeremytregunna/kevo/pkg/wal"
)

// NodeIDFile is the name of the file that stores the database's node ID.
const NodeIDFile = "NODE_ID"

// loadNodeID reads the node ID stored in dbDir/NodeIDFile.
//
// The error from os.ReadFile is returned unwrapped so callers can test it
// with os.IsNotExist. A file whose length is not exactly 16 bytes is rejected
// with a descriptive error.
func loadNodeID(dbDir string) (clock.NodeID, error) {
	idPath := filepath.Join(dbDir, NodeIDFile)

	data, err := os.ReadFile(idPath)
	if err != nil {
		// Return the original error for better diagnostics.
		return clock.NodeID{}, err
	}

	if len(data) != 16 {
		return clock.NodeID{}, fmt.Errorf("invalid node ID file format (expected 16 bytes, got %d)", len(data))
	}

	var nodeID clock.NodeID
	copy(nodeID[:], data)
	return nodeID, nil
}

// createNodeID generates a new random (UUID v4) node ID and persists it to
// dbDir/NodeIDFile, creating dbDir first if it does not exist.
func createNodeID(dbDir string) (clock.NodeID, error) {
	var nodeID clock.NodeID

	id, err := uuid.NewRandom()
	if err != nil {
		return clock.NodeID{}, fmt.Errorf("failed to generate UUID: %w", err)
	}
	copy(nodeID[:], id[:])

	if err := os.MkdirAll(dbDir, 0755); err != nil {
		return clock.NodeID{}, fmt.Errorf("failed to create database directory: %w", err)
	}

	idPath := filepath.Join(dbDir, NodeIDFile)
	if err := os.WriteFile(idPath, nodeID[:], 0644); err != nil {
		return clock.NodeID{}, fmt.Errorf("failed to write node ID file: %w", err)
	}

	return nodeID, nil
}

// loadOrCreateNodeID returns the node ID stored in dbDir, creating and
// persisting a new one when the ID file does not exist. Any other load
// failure (unreadable or malformed file) is returned as an error rather than
// silently replacing the existing identity.
func loadOrCreateNodeID(dbDir string) (clock.NodeID, error) {
	nodeID, err := loadNodeID(dbDir)
	if err == nil {
		return nodeID, nil
	}

	if os.IsNotExist(err) {
		// File doesn't exist, create a new ID.
		return createNodeID(dbDir)
	}

	// Some other error occurred while loading.
	return clock.NodeID{}, fmt.Errorf("failed to access node ID file: %w", err)
}

// initializeReplication sets up the replication components for the engine:
// it loads (or creates) the node ID, builds the Lamport clock, attaches the
// clock to the WAL, and installs a new ReplicationManager as the WAL's
// replication hook. It should be called during engine initialization.
func (e *Engine) initializeReplication() error {
	nodeID, err := loadOrCreateNodeID(e.dataDir)
	if err != nil {
		return fmt.Errorf("failed to initialize node ID: %w", err)
	}

	e.lamportClock = clock.NewLamportClock(nodeID)
	e.nodeID = nodeID

	if err := e.wal.SetLogicalClock(e.lamportClock); err != nil {
		return fmt.Errorf("failed to set logical clock on WAL: %w", err)
	}

	e.replicationMgr = NewReplicationManager(e)
	e.wal.SetReplicationHook(e.replicationMgr)

	return nil
}

// GetNodeID returns the database instance's stable node ID.
func (e *Engine) GetNodeID() clock.NodeID {
	return e.nodeID
}

// GetLamportClock returns the engine's Lamport clock.
func (e *Engine) GetLamportClock() *clock.LamportClock {
	return e.lamportClock
}

// ReplicationManager handles replication operations for the engine.
//
// The role flags and replica maps are guarded by mu; all reads and writes of
// those fields must go through the locked accessors.
type ReplicationManager struct {
	engine *Engine

	mu               sync.RWMutex
	isLeader         bool
	isReplica        bool
	replicaIDs       map[string]clock.NodeID    // Maps replica IDs to their NodeIDs
	replicaPositions map[string]clock.Timestamp // Tracks the latest position for each replica

	logChan   chan *ReplicationLogEntry // Channel for log entries to be processed
	stopChan  chan struct{}             // Closed to signal the processor goroutine to stop
	closeOnce sync.Once                 // Ensures stopChan is closed at most once
	logger    log.Logger                // Logger interface for replication events
}

// ReplicationLogEntry represents a WAL entry that needs to be replicated.
type ReplicationLogEntry struct {
	entry     *wal.Entry      // The WAL entry
	timestamp clock.Timestamp // The Lamport timestamp
	batch     bool            // Whether this is part of a batch
	batchSize int             // Size of batch if part of batch
}

// NewReplicationManager creates a new replication manager for the engine and
// starts its background processor goroutine. Call Close to stop that
// goroutine.
func NewReplicationManager(engine *Engine) *ReplicationManager {
	logger := log.GetDefaultLogger().WithField("component", "replication")

	rm := &ReplicationManager{
		engine:           engine,
		replicaIDs:       make(map[string]clock.NodeID),
		replicaPositions: make(map[string]clock.Timestamp),
		logChan:          make(chan *ReplicationLogEntry, 1000), // Buffer for 1000 entries
		stopChan:         make(chan struct{}),
		logger:           logger,
	}

	// Start the replication processor goroutine.
	go rm.processReplicationEntries()

	logger.Info("Replication manager initialized")
	return rm
}

// processReplicationEntries handles queued replication entries in the
// background until stopChan is closed.
func (rm *ReplicationManager) processReplicationEntries() {
	for {
		select {
		case entry := <-rm.logChan:
			rm.handleLogEntry(entry)
		case <-rm.stopChan:
			rm.logger.Info("Stopping replication log processor")
			return
		}
	}
}

// handleLogEntry processes a single queued replication log entry.
func (rm *ReplicationManager) handleLogEntry(entry *ReplicationLogEntry) {
	// Skip processing if we're a replica. The flag is read through the locked
	// accessor because SetLeader may change it from another goroutine.
	if rm.IsReplica() {
		return
	}

	// TODO: Implement actual replication to remote nodes.
	// For now, we'll just log the event.
	if entry.batch {
		rm.logger.Debug("Processing batch entry for replication: timestamp=%s, batch_size=%d",
			entry.timestamp.String(), entry.batchSize)
	} else {
		rm.logger.Debug("Processing single entry for replication: type=%d, timestamp=%s, key_size=%d",
			entry.entry.Type, entry.timestamp.String(), len(entry.entry.Key))
	}
}

// SetLeader sets this node's replication role: leader when isLeader is true,
// replica otherwise.
func (rm *ReplicationManager) SetLeader(isLeader bool) {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	rm.isLeader = isLeader
	rm.isReplica = !isLeader

	if isLeader {
		rm.logger.Info("Node set as replication leader")
	} else {
		rm.logger.Info("Node set as replica")
	}
}

// IsLeader returns whether this node is the leader.
func (rm *ReplicationManager) IsLeader() bool {
	rm.mu.RLock()
	defer rm.mu.RUnlock()
	return rm.isLeader
}

// IsReplica returns whether this node is a replica.
func (rm *ReplicationManager) IsReplica() bool {
	rm.mu.RLock()
	defer rm.mu.RUnlock()
	return rm.isReplica
}

// AddReplica adds a replica node to the replication system.
func (rm *ReplicationManager) AddReplica(id string, nodeID clock.NodeID) {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	rm.replicaIDs[id] = nodeID
	rm.logger.Info("Added replica: %s with NodeID: %s", id, nodeID.String())
}

// RemoveReplica removes a replica node, and its tracked position, from the
// replication system.
func (rm *ReplicationManager) RemoveReplica(id string) {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	delete(rm.replicaIDs, id)
	delete(rm.replicaPositions, id)
	rm.logger.Info("Removed replica: %s", id)
}

// GetReplicaIDs returns the list of replica IDs. The order is unspecified
// because it follows map iteration order.
func (rm *ReplicationManager) GetReplicaIDs() []string {
	rm.mu.RLock()
	defer rm.mu.RUnlock()

	ids := make([]string, 0, len(rm.replicaIDs))
	for id := range rm.replicaIDs {
		ids = append(ids, id)
	}
	return ids
}

// GetReplicaPosition returns the current replication position for a replica
// and whether a position has been recorded for it.
func (rm *ReplicationManager) GetReplicaPosition(replicaID string) (clock.Timestamp, bool) {
	rm.mu.RLock()
	defer rm.mu.RUnlock()

	pos, exists := rm.replicaPositions[replicaID]
	return pos, exists
}

// UpdateReplicaPosition records position for replicaID. Positions only move
// forward: an update is ignored unless it compares greater than the currently
// recorded position (or no position is recorded yet).
func (rm *ReplicationManager) UpdateReplicaPosition(replicaID string, position clock.Timestamp) {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	current, exists := rm.replicaPositions[replicaID]
	if !exists || clock.Compare(position, current) > 0 {
		rm.replicaPositions[replicaID] = position
		rm.logger.Debug("Updated replica %s position to %s", replicaID, position.String())
	}
}

// enqueue attempts a non-blocking send of entry onto logChan. It reports
// false when the queue is full so the WAL write path is never blocked.
func (rm *ReplicationManager) enqueue(entry *ReplicationLogEntry) bool {
	select {
	case rm.logChan <- entry:
		return true
	default:
		return false
	}
}

// OnEntryWritten implements the wal.ReplicationHook interface.
// It's called when a single entry is written to the WAL. The entry is queued
// for background processing; if the queue is full the entry is dropped and an
// error is logged. It always returns nil.
func (rm *ReplicationManager) OnEntryWritten(entry *wal.Entry, timestamp clock.Timestamp) error {
	// Skip processing if we're a replica to avoid replication loops.
	if rm.IsReplica() {
		return nil
	}

	queued := rm.enqueue(&ReplicationLogEntry{
		entry:     entry,
		timestamp: timestamp,
		batch:     false,
	})
	if !queued {
		// Channel is full, log but don't block the write path.
		rm.logger.Error("Replication queue is full, dropping entry")
	}

	return nil
}

// OnBatchWritten implements the wal.ReplicationHook interface.
// It's called when a batch of entries is written to the WAL. Each entry is
// queued individually with a timestamp derived from startTimestamp by adding
// the entry's index to the counter. Entries that do not fit in the queue are
// dropped and logged. It always returns nil.
func (rm *ReplicationManager) OnBatchWritten(entries []*wal.Entry, startTimestamp clock.Timestamp) error {
	// Skip processing if we're a replica to avoid replication loops.
	if rm.IsReplica() {
		return nil
	}

	for i, entry := range entries {
		// Calculate timestamp for this entry based on batch start.
		entryTimestamp := clock.Timestamp{
			Counter: startTimestamp.Counter + uint64(i),
			Node:    startTimestamp.Node,
		}

		queued := rm.enqueue(&ReplicationLogEntry{
			entry:     entry,
			timestamp: entryTimestamp,
			batch:     true,
			batchSize: len(entries),
		})
		if !queued {
			// Channel is full, log but don't block the write path.
			rm.logger.Error("Replication queue is full, dropping batch entry %d/%d", i+1, len(entries))
		}
	}

	return nil
}

// Close shuts down the replication manager by signalling the background
// processor to stop. It is safe to call more than once; only the first call
// has any effect. It always returns nil.
func (rm *ReplicationManager) Close() error {
	rm.closeOnce.Do(func() {
		// Closing an already-closed channel panics, hence the Once guard.
		close(rm.stopChan)
		rm.logger.Info("Replication manager shut down")
	})
	return nil
}

// SetReplicationHook sets a custom replication hook on the WAL.
// This is mainly used for testing or when you need to override the default
// hook. It returns an error if hook does not implement wal.ReplicationHook.
func (e *Engine) SetReplicationHook(hook interface{}) error {
	walHook, ok := hook.(wal.ReplicationHook)
	if !ok {
		return fmt.Errorf("invalid replication hook type: %T", hook)
	}

	e.wal.SetReplicationHook(walHook)
	return nil
}