feat: implement detailed status information in replication manager
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Has been cancelled

This commit is contained in:
Jeremy Tregunna 2025-05-02 22:57:20 -06:00
parent fed04a8f38
commit 33a8a41e7d
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
2 changed files with 285 additions and 13 deletions

View File

@ -168,6 +168,155 @@ func (m *Manager) Stop() error {
return nil
}
// getPrimaryStatus returns detailed status information for a primary node
func (m *Manager) getPrimaryStatus(status map[string]interface{}) map[string]interface{} {
if m.primary == nil {
return status
}
status["listen_address"] = m.config.ListenAddr
// Get detailed primary status
m.primary.mu.RLock()
defer m.primary.mu.RUnlock()
// Add information about connected replicas
replicaCount := len(m.primary.sessions)
activeReplicas := 0
connectedReplicas := 0
// Create a replicas list with detailed information
replicas := make([]map[string]interface{}, 0, replicaCount)
for id, session := range m.primary.sessions {
// Track active and connected counts
if session.Connected {
connectedReplicas++
}
if session.Active && session.Connected {
activeReplicas++
}
// Create detailed replica info
replicaInfo := map[string]interface{}{
"id": id,
"connected": session.Connected,
"active": session.Active,
"last_activity": session.LastActivity.UnixNano() / int64(time.Millisecond),
"last_ack_sequence": session.LastAckSequence,
"listener_address": session.ListenerAddress,
"start_sequence": session.StartSequence,
"idle_time_seconds": time.Since(session.LastActivity).Seconds(),
}
replicas = append(replicas, replicaInfo)
}
// Get WAL sequence information
currentWalSeq := uint64(0)
if m.primary.wal != nil {
currentWalSeq = m.primary.wal.GetNextSequence() - 1 // Last used sequence
}
// Add primary-specific information to status
status["replica_count"] = replicaCount
status["connected_replica_count"] = connectedReplicas
status["active_replica_count"] = activeReplicas
status["replicas"] = replicas
status["current_wal_sequence"] = currentWalSeq
status["last_synced_sequence"] = m.primary.lastSyncedSeq
status["retention_config"] = map[string]interface{}{
"max_age_hours": m.primary.retentionConfig.MaxAgeHours,
"min_sequence_keep": m.primary.retentionConfig.MinSequenceKeep,
}
status["compression_enabled"] = m.primary.enableCompression
status["default_codec"] = m.primary.defaultCodec.String()
return status
}
// getReplicaStatus returns detailed status information for a replica node
func (m *Manager) getReplicaStatus(status map[string]interface{}) map[string]interface{} {
if m.replica == nil {
return status
}
// Basic replica information
status["primary_address"] = m.config.PrimaryAddr
status["last_applied_sequence"] = m.lastApplied
// Detailed state information
currentState := m.replica.GetStateString()
status["state"] = currentState
// Get the state tracker for more detailed information
stateTracker := m.replica.stateTracker
if stateTracker != nil {
// Add state duration
stateTime := stateTracker.GetStateDuration()
status["state_duration_seconds"] = stateTime.Seconds()
// Add error information if in error state
if currentState == "ERROR" {
if err := stateTracker.GetError(); err != nil {
status["last_error"] = err.Error()
}
}
// Get state transitions
transitions := stateTracker.GetTransitions()
if len(transitions) > 0 {
stateHistory := make([]map[string]interface{}, 0, len(transitions))
for _, t := range transitions {
stateHistory = append(stateHistory, map[string]interface{}{
"from": t.From.String(),
"to": t.To.String(),
"timestamp": t.Timestamp.UnixNano() / int64(time.Millisecond),
})
}
// Only include the last 10 transitions to keep the response size reasonable
if len(stateHistory) > 10 {
stateHistory = stateHistory[len(stateHistory)-10:]
}
status["state_history"] = stateHistory
}
}
// Add connection information
if m.replica.conn != nil {
status["connection_status"] = "connected"
} else {
status["connection_status"] = "disconnected"
}
// Add replication listener information
status["replication_listener_address"] = m.config.ListenAddr
// Include statistics
if m.replica.stats != nil {
status["entries_received"] = m.replica.stats.GetEntriesReceived()
status["entries_applied"] = m.replica.stats.GetEntriesApplied()
status["bytes_received"] = m.replica.stats.GetBytesReceived()
status["batch_count"] = m.replica.stats.GetBatchCount()
status["errors"] = m.replica.stats.GetErrorCount()
// Add last batch time information
lastBatchTime := m.replica.stats.GetLastBatchTime()
if lastBatchTime > 0 {
status["last_batch_time"] = lastBatchTime
status["seconds_since_last_batch"] = m.replica.stats.GetLastBatchTimeDuration().Seconds()
}
}
// Add configuration information
status["force_read_only"] = m.config.ForceReadOnly
return status
}
// Status returns the current status of the replication service
func (m *Manager) Status() map[string]interface{} {
m.mu.RLock()
@ -182,18 +331,9 @@ func (m *Manager) Status() map[string]interface{} {
// Add mode-specific status
switch m.config.Mode {
case ReplicationModePrimary:
if m.primary != nil {
// Add information about connected replicas, etc.
status["listen_address"] = m.config.ListenAddr
// TODO: Add more detailed primary status
}
status = m.getPrimaryStatus(status)
case ReplicationModeReplica:
if m.replica != nil {
status["primary_address"] = m.config.PrimaryAddr
status["last_applied_sequence"] = m.lastApplied
status["state"] = m.replica.GetStateString()
// TODO: Add more detailed replica status
}
status = m.getReplicaStatus(status)
}
return status

View File

@ -6,6 +6,7 @@ import (
"io"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/KevoDB/kevo/pkg/wal"
@ -18,6 +19,81 @@ import (
"google.golang.org/grpc/status"
)
// ReplicaStats tracks statistics for a replica node
type ReplicaStats struct {
entriesReceived atomic.Uint64 // Number of WAL entries received
entriesApplied atomic.Uint64 // Number of WAL entries successfully applied
bytesReceived atomic.Uint64 // Number of bytes received from the primary
batchCount atomic.Uint64 // Number of batches received
errors atomic.Uint64 // Number of errors during replication
lastBatchTime atomic.Int64 // Timestamp of the last batch received (Unix nanos)
}
// TrackEntriesReceived increments the count of received entries
func (s *ReplicaStats) TrackEntriesReceived(count uint64) {
s.entriesReceived.Add(count)
}
// TrackEntriesApplied increments the count of applied entries
func (s *ReplicaStats) TrackEntriesApplied(count uint64) {
s.entriesApplied.Add(count)
}
// TrackBytesReceived increments the count of bytes received
func (s *ReplicaStats) TrackBytesReceived(bytes uint64) {
s.bytesReceived.Add(bytes)
}
// TrackBatchReceived increments the batch count and updates the last batch time
func (s *ReplicaStats) TrackBatchReceived() {
s.batchCount.Add(1)
s.lastBatchTime.Store(time.Now().UnixNano())
}
// TrackError increments the error count
func (s *ReplicaStats) TrackError() {
s.errors.Add(1)
}
// GetEntriesReceived returns the number of entries received
func (s *ReplicaStats) GetEntriesReceived() uint64 {
return s.entriesReceived.Load()
}
// GetEntriesApplied returns the number of entries applied
func (s *ReplicaStats) GetEntriesApplied() uint64 {
return s.entriesApplied.Load()
}
// GetBytesReceived returns the number of bytes received
func (s *ReplicaStats) GetBytesReceived() uint64 {
return s.bytesReceived.Load()
}
// GetBatchCount returns the number of batches received
func (s *ReplicaStats) GetBatchCount() uint64 {
return s.batchCount.Load()
}
// GetErrorCount returns the number of errors encountered
func (s *ReplicaStats) GetErrorCount() uint64 {
return s.errors.Load()
}
// GetLastBatchTime returns the timestamp of the last batch
func (s *ReplicaStats) GetLastBatchTime() int64 {
return s.lastBatchTime.Load()
}
// GetLastBatchTimeDuration returns the duration since the last batch
func (s *ReplicaStats) GetLastBatchTimeDuration() time.Duration {
lastBatch := s.lastBatchTime.Load()
if lastBatch == 0 {
return 0
}
return time.Since(time.Unix(0, lastBatch))
}
// WALEntryApplier interface is defined in interfaces.go
// ConnectionConfig contains configuration for connecting to the primary
@ -111,6 +187,9 @@ type Replica struct {
// Stream client for receiving WAL entries
streamClient replication_proto.WALReplicationService_StreamWALClient
// Statistics for the replica
stats *ReplicaStats
// Session ID for communication with primary
sessionID string
@ -169,6 +248,7 @@ func NewReplica(lastAppliedSeq uint64, applier WALEntryApplier, config *ReplicaC
cancel: cancel,
shutdown: false,
connector: &DefaultPrimaryConnector{},
stats: &ReplicaStats{}, // Initialize statistics tracker
}
return replica, nil
@ -781,7 +861,21 @@ func (r *Replica) connectToPrimary() error {
// processEntriesWithoutStateTransitions processes a batch of WAL entries without attempting state transitions
// This function is called from handleStreamingState and skips the state transitions at the end
func (r *Replica) processEntriesWithoutStateTransitions(response *replication_proto.WALStreamResponse) error {
fmt.Printf("Processing %d entries (no state transitions)\n", len(response.Entries))
entryCount := len(response.Entries)
fmt.Printf("Processing %d entries (no state transitions)\n", entryCount)
// Track statistics
if r.stats != nil {
r.stats.TrackBatchReceived()
r.stats.TrackEntriesReceived(uint64(entryCount))
// Calculate total bytes received
var totalBytes uint64
for _, entry := range response.Entries {
totalBytes += uint64(len(entry.Payload))
}
r.stats.TrackBytesReceived(totalBytes)
}
// Check if entries are compressed
entries := response.Entries
@ -839,6 +933,18 @@ func (r *Replica) processEntriesWithoutStateTransitions(response *replication_pr
r.mu.Lock()
r.lastAppliedSeq = maxSeq
r.mu.Unlock()
// Track applied entries in statistics
if r.stats != nil {
// Calculate the number of entries that were successfully applied
appliedCount := uint64(0)
for _, entry := range entries {
if entry.SequenceNumber <= maxSeq {
appliedCount++
}
}
r.stats.TrackEntriesApplied(appliedCount)
}
// Perform fsync directly without transitioning state
fmt.Printf("Performing direct fsync to ensure entries are persisted\n")
@ -853,7 +959,21 @@ func (r *Replica) processEntriesWithoutStateTransitions(response *replication_pr
// processEntries processes a batch of WAL entries
func (r *Replica) processEntries(response *replication_proto.WALStreamResponse) error {
fmt.Printf("Processing %d entries\n", len(response.Entries))
entryCount := len(response.Entries)
fmt.Printf("Processing %d entries\n", entryCount)
// Track statistics
if r.stats != nil {
r.stats.TrackBatchReceived()
r.stats.TrackEntriesReceived(uint64(entryCount))
// Calculate total bytes received
var totalBytes uint64
for _, entry := range response.Entries {
totalBytes += uint64(len(entry.Payload))
}
r.stats.TrackBytesReceived(totalBytes)
}
// Check if entries are compressed
entries := response.Entries
@ -911,6 +1031,18 @@ func (r *Replica) processEntries(response *replication_proto.WALStreamResponse)
r.mu.Lock()
r.lastAppliedSeq = maxSeq
r.mu.Unlock()
// Track applied entries in statistics
if r.stats != nil {
// Calculate the number of entries that were successfully applied
appliedCount := uint64(0)
for _, entry := range entries {
if entry.SequenceNumber <= maxSeq {
appliedCount++
}
}
r.stats.TrackEntriesApplied(appliedCount)
}
// Move to fsync state
fmt.Printf("Moving to FSYNC_PENDING state\n")