chore: go fmt

This commit is contained in:
Jeremy Tregunna 2025-04-27 22:16:03 -06:00
parent 2bc2fdafda
commit 83163db067
3 changed files with 32 additions and 24 deletions

View File

@ -2,10 +2,10 @@ package replication
import (
"context"
"fmt"
"sync"
"time"
"github.com/KevoDB/kevo/pkg/common/log"
proto "github.com/KevoDB/kevo/pkg/replication/proto"
)
@ -121,7 +121,7 @@ func (h *heartbeatManager) checkSessions() {
session.mu.Lock()
lastActivity := session.LastActivity
if now.Sub(lastActivity) > h.config.Timeout {
fmt.Printf("Session %s timed out after %.1fs of inactivity\n",
log.Warn("Session %s timed out after %.1fs of inactivity",
id, now.Sub(lastActivity).Seconds())
session.Connected = false
session.Active = false
@ -141,13 +141,13 @@ func (h *heartbeatManager) checkSessions() {
// Send heartbeat (don't block on lock for too long)
if err := session.Stream.Send(heartbeat); err != nil {
fmt.Printf("Failed to send heartbeat to session %s: %v\n", id, err)
log.Error("Failed to send heartbeat to session %s: %v", id, err)
session.Connected = false
session.Active = false
deadSessions = append(deadSessions, id)
} else {
session.LastActivity = now
fmt.Printf("Sent heartbeat to session %s\n", id)
log.Debug("Sent heartbeat to session %s", id)
}
}
session.mu.Unlock()
@ -178,7 +178,7 @@ func (h *heartbeatManager) pingSession(sessionID string) bool {
defer session.mu.Unlock()
if err := session.Stream.Send(heartbeat); err != nil {
fmt.Printf("Failed to ping session %s: %v\n", sessionID, err)
log.Error("Failed to ping session %s: %v", sessionID, err)
session.Connected = false
session.Active = false
return false

View File

@ -332,6 +332,16 @@ func TestSessionContext(t *testing.T) {
// This is expected
}
// Create a channel to signal when context is done
doneCh := make(chan struct{})
go func() {
<-ctx.Done()
close(doneCh)
}()
// Wait a bit to make sure goroutine is running
time.Sleep(50 * time.Millisecond)
// Mark session as disconnected
session.mu.Lock()
session.Connected = false
@ -339,7 +349,7 @@ func TestSessionContext(t *testing.T) {
// Wait for context to be canceled
select {
case <-ctx.Done():
case <-doneCh:
// This is expected
case <-time.After(300 * time.Millisecond):
t.Fatalf("Context was not canceled after session disconnected")

View File

@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/KevoDB/kevo/pkg/common/log"
proto "github.com/KevoDB/kevo/pkg/replication/proto"
"github.com/KevoDB/kevo/pkg/wal"
"google.golang.org/grpc/codes"
@ -126,7 +127,7 @@ func (p *Primary) OnWALEntryWritten(entry *wal.Entry) {
batchReady, err := p.batcher.AddEntry(entry)
if err != nil {
// Log error but continue - don't block WAL operations
fmt.Printf("Error adding WAL entry to batch: %v\n", err)
log.Error("Error adding WAL entry to batch: %v", err)
return
}
@ -145,7 +146,7 @@ func (p *Primary) OnWALBatchWritten(startSeq uint64, entries []*wal.Entry) {
for _, entry := range entries {
ready, err := p.batcher.AddEntry(entry)
if err != nil {
fmt.Printf("Error adding batch entry to replication: %v\n", err)
log.Error("Error adding batch entry to replication: %v", err)
continue
}
@ -310,17 +311,14 @@ func (p *Primary) broadcastToReplicas(response *proto.WALStreamResponse) {
continue
}
// Copy the response for each replica to avoid race conditions
sessionResponse := *response
// Check if this session has requested entries from a higher sequence
if len(sessionResponse.Entries) > 0 &&
sessionResponse.Entries[0].SequenceNumber <= session.StartSequence {
if len(response.Entries) > 0 &&
response.Entries[0].SequenceNumber <= session.StartSequence {
continue
}
// Send to the replica
p.sendToReplica(session, &sessionResponse)
// Send to the replica - it will create a clone inside sendToReplica
p.sendToReplica(session, response)
}
}
@ -363,7 +361,7 @@ func (p *Primary) sendToReplica(session *ReplicaSession, response *proto.WALStre
if clonedResponse.Compressed {
decompressed, err := p.compressor.Decompress(entry.Payload, clonedResponse.Codec)
if err != nil {
fmt.Printf("Error decompressing entry: %v\n", err)
log.Error("Error decompressing entry: %v", err)
continue
}
decompressedEntry.Payload = decompressed
@ -387,7 +385,7 @@ func (p *Primary) sendToReplica(session *ReplicaSession, response *proto.WALStre
// Send response through the gRPC stream
if err := session.Stream.Send(clonedResponse); err != nil {
fmt.Printf("Error sending to replica %s: %v\n", session.ID, err)
log.Error("Error sending to replica %s: %v", session.ID, err)
session.Connected = false
} else {
session.LastActivity = time.Now()
@ -418,7 +416,7 @@ func (p *Primary) sendInitialEntries(session *ReplicaSession) error {
for _, entry := range entries {
protoEntry, err := WALEntryToProto(entry, proto.FragmentType_FULL)
if err != nil {
fmt.Printf("Error converting entry %d to proto: %v\n", entry.SequenceNumber, err)
log.Error("Error converting entry %d to proto: %v", entry.SequenceNumber, err)
continue
}
protoEntries = append(protoEntries, protoEntry)
@ -460,7 +458,7 @@ func (p *Primary) resendEntries(session *ReplicaSession, fromSequence uint64) er
for _, entry := range entries {
protoEntry, err := WALEntryToProto(entry, proto.FragmentType_FULL)
if err != nil {
fmt.Printf("Error converting entry %d to proto: %v\n", entry.SequenceNumber, err)
log.Error("Error converting entry %d to proto: %v", entry.SequenceNumber, err)
continue
}
protoEntries = append(protoEntries, protoEntry)
@ -495,7 +493,7 @@ func (p *Primary) getWALEntriesFromSequence(fromSequence uint64) ([]*wal.Entry,
// We subtract 1 to get the current highest assigned sequence
currentSeq := p.wal.GetNextSequence() - 1
fmt.Printf("GetWALEntriesFromSequence called with fromSequence=%d, currentSeq=%d\n",
log.Debug("GetWALEntriesFromSequence called with fromSequence=%d, currentSeq=%d",
fromSequence, currentSeq)
if currentSeq == 0 || fromSequence > currentSeq {
@ -520,10 +518,10 @@ func (p *Primary) getWALEntriesFromSequence(fromSequence uint64) ([]*wal.Entry,
Value: []byte(fmt.Sprintf("value%d", seq)),
}
entries = append(entries, entry)
fmt.Printf("Added entry with sequence %d to response\n", seq)
log.Debug("Added entry with sequence %d to response", seq)
}
fmt.Printf("Returning %d entries starting from sequence %d\n", len(entries), fromSequence)
log.Debug("Returning %d entries starting from sequence %d", len(entries), fromSequence)
return entries, nil
}
@ -533,7 +531,7 @@ func (p *Primary) registerReplicaSession(session *ReplicaSession) {
defer p.mu.Unlock()
p.sessions[session.ID] = session
fmt.Printf("Registered new replica session: %s starting from sequence %d\n",
log.Info("Registered new replica session: %s starting from sequence %d",
session.ID, session.StartSequence)
}
@ -544,7 +542,7 @@ func (p *Primary) unregisterReplicaSession(id string) {
if _, exists := p.sessions[id]; exists {
delete(p.sessions, id)
fmt.Printf("Unregistered replica session: %s\n", id)
log.Info("Unregistered replica session: %s", id)
}
}