feat: implement WAL retentation management functionality
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Has been cancelled
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Has been cancelled
This commit is contained in:
parent
bbb54a0d6c
commit
fed04a8f38
@ -778,13 +778,51 @@ func (p *Primary) getSession(id string) *ReplicaSession {
|
||||
|
||||
// maybeManageWALRetention checks if WAL retention management should be triggered
|
||||
func (p *Primary) maybeManageWALRetention() {
|
||||
// This method would analyze all replica acknowledgments to determine
|
||||
// the minimum acknowledged sequence across all replicas, then use that
|
||||
// to decide which WAL files can be safely deleted.
|
||||
|
||||
// For now, this is a placeholder that would need to be connected to the
|
||||
// actual WAL retention management logic
|
||||
// TODO: Implement WAL retention management
|
||||
p.mu.RLock()
|
||||
|
||||
// Find minimum acknowledged sequence across all connected replicas
|
||||
minAcknowledgedSeq := uint64(^uint64(0)) // Start with max value
|
||||
activeReplicas := 0
|
||||
|
||||
for id, session := range p.sessions {
|
||||
if session.Connected && session.Active {
|
||||
activeReplicas++
|
||||
if session.LastAckSequence < minAcknowledgedSeq {
|
||||
minAcknowledgedSeq = session.LastAckSequence
|
||||
}
|
||||
log.Info("Replica %s has acknowledged up to sequence %d",
|
||||
id, session.LastAckSequence)
|
||||
}
|
||||
}
|
||||
p.mu.RUnlock()
|
||||
|
||||
// Only proceed if we have valid data and active replicas
|
||||
if minAcknowledgedSeq == uint64(^uint64(0)) || minAcknowledgedSeq == 0 {
|
||||
log.Info("No minimum acknowledged sequence found, skipping WAL retention")
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("WAL retention: minimum acknowledged sequence across %d active replicas: %d",
|
||||
activeReplicas, minAcknowledgedSeq)
|
||||
|
||||
// Apply the retention policy using the existing WAL API
|
||||
config := wal.WALRetentionConfig{
|
||||
MaxAge: time.Duration(p.retentionConfig.MaxAgeHours) * time.Hour,
|
||||
MinSequenceKeep: minAcknowledgedSeq,
|
||||
}
|
||||
|
||||
filesDeleted, err := p.wal.ManageRetention(config)
|
||||
if err != nil {
|
||||
log.Error("Failed to manage WAL retention: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if filesDeleted > 0 {
|
||||
log.Info("WAL retention: deleted %d files, min sequence kept: %d",
|
||||
filesDeleted, minAcknowledgedSeq)
|
||||
} else {
|
||||
log.Info("WAL retention: no files eligible for deletion")
|
||||
}
|
||||
}
|
||||
|
||||
// Close shuts down the primary, unregistering from WAL and cleaning up resources
|
||||
|
Loading…
Reference in New Issue
Block a user