fix: update keepalive and grpc connection options to be standard across all sdks
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Failing after 15m5s

This commit is contained in:
Jeremy Tregunna 2025-05-17 13:20:12 -06:00
parent 1ff166fe77
commit acc6d7ee98
Signed by: jer
GPG Key ID: 1278B36BA6F5D5E4
5 changed files with 58 additions and 20 deletions

View File

@ -80,13 +80,13 @@ func (s *Server) Start() error {
MaxConnectionIdle: 60 * time.Second,
MaxConnectionAge: 5 * time.Minute,
MaxConnectionAgeGrace: 5 * time.Second,
Time: 15 * time.Second,
Timeout: 5 * time.Second,
Time: 30 * time.Second, // Send pings every 30 seconds if there is no activity
Timeout: 10 * time.Second, // Wait 10 seconds for ping ack before assuming connection is dead
}
kaPolicy := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second,
PermitWithoutStream: true,
MinTime: 10 * time.Second, // Minimum time a client should wait between pings
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
serverOpts = append(serverOpts,

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/KevoDB/kevo/pkg/transport"
"google.golang.org/grpc/keepalive"
)
// CompressionType represents a compression algorithm
@ -46,24 +47,30 @@ type ClientOptions struct {
// Performance options
Compression CompressionType // Compression algorithm
MaxMessageSize int // Maximum message size
// Keepalive options
KeepaliveTime time.Duration // Time between keepalive pings (0 for default)
KeepaliveTimeout time.Duration // Time to wait for ping ack (0 for default)
}
// DefaultClientOptions returns sensible default client options
func DefaultClientOptions() ClientOptions {
return ClientOptions{
Endpoint: "localhost:50051",
ConnectTimeout: time.Second * 5,
RequestTimeout: time.Second * 10,
TransportType: "grpc",
PoolSize: 5,
TLSEnabled: false,
MaxRetries: 3,
InitialBackoff: time.Millisecond * 100,
MaxBackoff: time.Second * 2,
BackoffFactor: 1.5,
RetryJitter: 0.2,
Compression: CompressionNone,
MaxMessageSize: 16 * 1024 * 1024, // 16MB
Endpoint: "localhost:50051",
ConnectTimeout: time.Second * 5,
RequestTimeout: time.Second * 10,
TransportType: "grpc",
PoolSize: 5,
TLSEnabled: false,
MaxRetries: 3,
InitialBackoff: time.Millisecond * 100,
MaxBackoff: time.Second * 2,
BackoffFactor: 1.5,
RetryJitter: 0.2,
Compression: CompressionNone,
MaxMessageSize: 16 * 1024 * 1024, // 16MB
KeepaliveTime: 30 * time.Second, // 30 seconds keepalive time
KeepaliveTimeout: 10 * time.Second, // 10 seconds timeout
}
}
@ -101,6 +108,16 @@ func NewClient(options ClientOptions) (*Client, error) {
return nil, errors.New("endpoint is required")
}
// Configure keepalive parameters if specified
var keepaliveParams *keepalive.ClientParameters
if options.KeepaliveTime > 0 && options.KeepaliveTimeout > 0 {
keepaliveParams = &keepalive.ClientParameters{
Time: options.KeepaliveTime,
Timeout: options.KeepaliveTimeout,
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
}
transportOpts := transport.TransportOptions{
Timeout: options.ConnectTimeout,
MaxMessageSize: options.MaxMessageSize,
@ -109,6 +126,7 @@ func NewClient(options ClientOptions) (*Client, error) {
CertFile: options.CertFile,
KeyFile: options.KeyFile,
CAFile: options.CAFile,
KeepaliveParams: keepaliveParams,
RetryPolicy: transport.RetryPolicy{
MaxRetries: options.MaxRetries,
InitialBackoff: options.InitialBackoff,
@ -216,6 +234,16 @@ func (c *Client) discoverTopology(ctx context.Context) error {
// createTransportOptions converts client options to transport options
func (c *Client) createTransportOptions(options ClientOptions) transport.TransportOptions {
// Configure keepalive parameters if specified
var keepaliveParams *keepalive.ClientParameters
if options.KeepaliveTime > 0 && options.KeepaliveTimeout > 0 {
keepaliveParams = &keepalive.ClientParameters{
Time: options.KeepaliveTime,
Timeout: options.KeepaliveTimeout,
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
}
return transport.TransportOptions{
Timeout: options.ConnectTimeout,
MaxMessageSize: options.MaxMessageSize,
@ -224,6 +252,7 @@ func (c *Client) createTransportOptions(options ClientOptions) transport.Transpo
CertFile: options.CertFile,
KeyFile: options.KeyFile,
CAFile: options.CAFile,
KeepaliveParams: keepaliveParams,
RetryPolicy: transport.RetryPolicy{
MaxRetries: options.MaxRetries,
InitialBackoff: options.InitialBackoff,

View File

@ -357,11 +357,11 @@ func (m *Manager) startPrimary() error {
// Configure gRPC server options
opts := []grpc.ServerOption{
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 10 * time.Second, // Send pings every 10 seconds if there is no activity
Timeout: 5 * time.Second, // Wait 5 seconds for ping ack before assuming connection is dead
Time: 30 * time.Second, // Send pings every 30 seconds if there is no activity
Timeout: 10 * time.Second, // Wait 10 seconds for ping ack before assuming connection is dead
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // Minimum time a client should wait before sending a ping
MinTime: 10 * time.Second, // Minimum time a client should wait between pings
PermitWithoutStream: true, // Allow pings even when there are no active streams
}),
grpc.MaxRecvMsgSize(16 * 1024 * 1024), // 16MB max message size

View File

@ -15,6 +15,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@ -818,6 +819,11 @@ func (c *DefaultPrimaryConnector) Connect(r *Replica) error {
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTimeout(r.config.Connection.DialTimeout),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, // Send pings every 30 seconds if there is no activity
Timeout: 10 * time.Second, // Wait 10 seconds for ping ack before assuming connection is dead
PermitWithoutStream: true, // Allow pings even when there are no active streams
}),
}
// Set up transport security

View File

@ -3,6 +3,8 @@ package transport
import (
"context"
"time"
"google.golang.org/grpc/keepalive"
)
// CompressionType defines the compression algorithm used
@ -34,6 +36,7 @@ type TransportOptions struct {
CertFile string
KeyFile string
CAFile string
KeepaliveParams *keepalive.ClientParameters // Optional keepalive parameters for gRPC clients
}
// TransportStatus contains information about the current transport state