feat: add suffix scanning
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m49s
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m49s
This commit is contained in:
parent
3219e7527e
commit
6f83fa1ade
@ -134,6 +134,11 @@ kevo> SCAN user:
|
||||
user:1: {"name":"John","email":"john@example.com"}
|
||||
user:2: {"name":"Jane","email":"jane@example.com"}
|
||||
2 entries found
|
||||
|
||||
kevo> SCAN SUFFIX @example.com
|
||||
user:1: {"name":"John","email":"john@example.com"}
|
||||
user:2: {"name":"Jane","email":"jane@example.com"}
|
||||
2 entries found
|
||||
```
|
||||
|
||||
Type `.help` in the CLI for more commands.
|
||||
|
@ -42,6 +42,7 @@ var completer = readline.NewPrefixCompleter(
|
||||
readline.PcItem("DELETE"),
|
||||
readline.PcItem("SCAN",
|
||||
readline.PcItem("RANGE"),
|
||||
readline.PcItem("SUFFIX"),
|
||||
),
|
||||
)
|
||||
|
||||
@ -75,6 +76,7 @@ Commands (interactive mode only):
|
||||
|
||||
SCAN - Scan all key-value pairs
|
||||
SCAN prefix - Scan key-value pairs with given prefix
|
||||
SCAN SUFFIX suffix - Scan key-value pairs with given suffix
|
||||
SCAN RANGE start end - Scan key-value pairs in range [start, end)
|
||||
- Note: start and end are treated as string keys, not numeric indices
|
||||
`
|
||||
@ -701,6 +703,9 @@ func runInteractive(eng *engine.Engine, dbPath string) {
|
||||
if len(parts) == 1 {
|
||||
// Full scan
|
||||
iter = tx.NewIterator()
|
||||
} else if len(parts) == 3 && strings.ToUpper(parts[1]) == "SUFFIX" {
|
||||
// Suffix scan - we'll create a regular iterator and filter for the suffix later
|
||||
iter = tx.NewIterator()
|
||||
} else if len(parts) == 2 {
|
||||
// Prefix scan
|
||||
prefix := []byte(parts[1])
|
||||
@ -713,7 +718,7 @@ func runInteractive(eng *engine.Engine, dbPath string) {
|
||||
} else if len(parts) == 4 && strings.ToUpper(parts[1]) == "RANGE" {
|
||||
// Range scan with explicit RANGE keyword
|
||||
iter = tx.NewRangeIterator([]byte(parts[2]), []byte(parts[3]))
|
||||
} else if len(parts) == 3 {
|
||||
} else if len(parts) == 3 && strings.ToUpper(parts[1]) != "SUFFIX" {
|
||||
// Old style range scan
|
||||
fmt.Println("Warning: Using deprecated range syntax. Use 'SCAN RANGE start end' instead.")
|
||||
iter = tx.NewRangeIterator([]byte(parts[1]), []byte(parts[2]))
|
||||
@ -733,6 +738,9 @@ func runInteractive(eng *engine.Engine, dbPath string) {
|
||||
if len(parts) == 1 {
|
||||
// Full scan
|
||||
iter, iterErr = eng.GetIterator()
|
||||
} else if len(parts) == 3 && strings.ToUpper(parts[1]) == "SUFFIX" {
|
||||
// Suffix scan - create a regular iterator and filter in the scan loop
|
||||
iter, iterErr = eng.GetIterator()
|
||||
} else if len(parts) == 2 {
|
||||
// Prefix scan
|
||||
prefix := []byte(parts[1])
|
||||
@ -745,7 +753,7 @@ func runInteractive(eng *engine.Engine, dbPath string) {
|
||||
} else if len(parts) == 4 && strings.ToUpper(parts[1]) == "RANGE" {
|
||||
// Range scan with explicit RANGE keyword
|
||||
iter, iterErr = eng.GetRangeIterator([]byte(parts[2]), []byte(parts[3]))
|
||||
} else if len(parts) == 3 {
|
||||
} else if len(parts) == 3 && strings.ToUpper(parts[1]) != "SUFFIX" {
|
||||
// Old style range scan
|
||||
fmt.Println("Warning: Using deprecated range syntax. Use 'SCAN RANGE start end' instead.")
|
||||
iter, iterErr = eng.GetRangeIterator([]byte(parts[1]), []byte(parts[2]))
|
||||
@ -760,6 +768,13 @@ func runInteractive(eng *engine.Engine, dbPath string) {
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we're doing a suffix scan
|
||||
isSuffixScan := len(parts) == 3 && strings.ToUpper(parts[1]) == "SUFFIX"
|
||||
suffix := []byte{}
|
||||
if isSuffixScan {
|
||||
suffix = []byte(parts[2])
|
||||
}
|
||||
|
||||
// Perform the scan
|
||||
count := 0
|
||||
seenKeys := make(map[string]bool)
|
||||
@ -770,6 +785,14 @@ func runInteractive(eng *engine.Engine, dbPath string) {
|
||||
continue
|
||||
}
|
||||
|
||||
// For suffix scans, check if the key ends with the suffix
|
||||
if isSuffixScan {
|
||||
key := iter.Key()
|
||||
if len(key) < len(suffix) || !hasSuffix(key, suffix) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Mark this key as seen
|
||||
seenKeys[keyStr] = true
|
||||
|
||||
@ -811,6 +834,19 @@ func makeKeySuccessor(prefix []byte) []byte {
|
||||
return successor
|
||||
}
|
||||
|
||||
// hasSuffix checks if a byte slice ends with a specific suffix
|
||||
func hasSuffix(data, suffix []byte) bool {
|
||||
if len(data) < len(suffix) {
|
||||
return false
|
||||
}
|
||||
for i := 0; i < len(suffix); i++ {
|
||||
if data[len(data)-len(suffix)+i] != suffix[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// toTitle replaces strings.Title which is deprecated
|
||||
// It converts the first character of each word to title case
|
||||
func toTitle(s string) string {
|
||||
|
@ -65,6 +65,23 @@ Bounded iterators limit the range of keys an iterator will traverse:
|
||||
- Filter out keys outside the desired range
|
||||
- Maintain the underlying iterator's properties otherwise
|
||||
|
||||
### Filtering Iterators
|
||||
|
||||
Filtering iterators apply specific criteria to the underlying data:
|
||||
|
||||
1. **Prefix Iterator**:
|
||||
- Only returns keys that start with a specific prefix
|
||||
- Efficiently filters keys during iteration
|
||||
|
||||
2. **Suffix Iterator**:
|
||||
- Only returns keys that end with a specific suffix
|
||||
- Checks suffix condition during iteration
|
||||
|
||||
3. **Implementation Approach**:
|
||||
- Wrap an existing iterator
|
||||
- Apply filtering logic in the Next() method
|
||||
- Allow combining with other iterator types (e.g., bounded)
|
||||
|
||||
### Composite Iterators
|
||||
|
||||
Composite iterators combine multiple source iterators into a single view:
|
||||
@ -180,6 +197,26 @@ for rangeIter.SeekToFirst(); rangeIter.Valid(); rangeIter.Next() {
|
||||
}
|
||||
```
|
||||
|
||||
### Prefix and Suffix Iterator
|
||||
|
||||
```go
|
||||
// Create a prefix iterator
|
||||
prefixIter := newPrefixIterator(sourceIter, []byte("user:"))
|
||||
|
||||
// Iterate through keys with prefix
|
||||
for prefixIter.SeekToFirst(); prefixIter.Valid(); prefixIter.Next() {
|
||||
fmt.Printf("Key with prefix: %s\n", prefixIter.Key())
|
||||
}
|
||||
|
||||
// Create a suffix iterator
|
||||
suffixIter := newSuffixIterator(sourceIter, []byte("@example.com"))
|
||||
|
||||
// Iterate through keys with suffix
|
||||
for suffixIter.SeekToFirst(); suffixIter.Valid(); suffixIter.Next() {
|
||||
fmt.Printf("Key with suffix: %s\n", suffixIter.Key())
|
||||
}
|
||||
```
|
||||
|
||||
### Hierarchical Multi-Source Iterator
|
||||
|
||||
```go
|
||||
|
@ -14,6 +14,8 @@ import (
|
||||
type ScanOptions struct {
|
||||
// Prefix limit the scan to keys with this prefix
|
||||
Prefix []byte
|
||||
// Suffix limit the scan to keys with this suffix
|
||||
Suffix []byte
|
||||
// StartKey sets the starting point for the scan (inclusive)
|
||||
StartKey []byte
|
||||
// EndKey sets the ending point for the scan (exclusive)
|
||||
@ -74,11 +76,13 @@ func (c *Client) Scan(ctx context.Context, options ScanOptions) (Scanner, error)
|
||||
// Create the scan request
|
||||
req := struct {
|
||||
Prefix []byte `json:"prefix"`
|
||||
Suffix []byte `json:"suffix"`
|
||||
StartKey []byte `json:"start_key"`
|
||||
EndKey []byte `json:"end_key"`
|
||||
Limit int32 `json:"limit"`
|
||||
}{
|
||||
Prefix: options.Prefix,
|
||||
Suffix: options.Suffix,
|
||||
StartKey: options.StartKey,
|
||||
EndKey: options.EndKey,
|
||||
Limit: options.Limit,
|
||||
@ -206,12 +210,14 @@ func (tx *Transaction) Scan(ctx context.Context, options ScanOptions) (Scanner,
|
||||
req := struct {
|
||||
TransactionID string `json:"transaction_id"`
|
||||
Prefix []byte `json:"prefix"`
|
||||
Suffix []byte `json:"suffix"`
|
||||
StartKey []byte `json:"start_key"`
|
||||
EndKey []byte `json:"end_key"`
|
||||
Limit int32 `json:"limit"`
|
||||
}{
|
||||
TransactionID: tx.id,
|
||||
Prefix: options.Prefix,
|
||||
Suffix: options.Suffix,
|
||||
StartKey: options.StartKey,
|
||||
EndKey: options.EndKey,
|
||||
Limit: options.Limit,
|
||||
|
@ -1,511 +1,586 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/KevoDB/kevo/pkg/common/iterator"
|
||||
"github.com/KevoDB/kevo/pkg/engine"
|
||||
pb "github.com/KevoDB/kevo/proto/kevo"
|
||||
"github.com/KevoDB/kevo/pkg/common/iterator"
|
||||
"github.com/KevoDB/kevo/pkg/engine"
|
||||
pb "github.com/KevoDB/kevo/proto/kevo"
|
||||
)
|
||||
|
||||
// TxRegistry is the interface we need for the transaction registry
|
||||
type TxRegistry interface {
|
||||
Begin(ctx context.Context, eng *engine.Engine, readOnly bool) (string, error)
|
||||
Get(txID string) (engine.Transaction, bool)
|
||||
Remove(txID string)
|
||||
Begin(ctx context.Context, eng *engine.Engine, readOnly bool) (string, error)
|
||||
Get(txID string) (engine.Transaction, bool)
|
||||
Remove(txID string)
|
||||
}
|
||||
|
||||
// KevoServiceServer implements the gRPC KevoService interface
|
||||
type KevoServiceServer struct {
|
||||
pb.UnimplementedKevoServiceServer
|
||||
engine *engine.Engine
|
||||
txRegistry TxRegistry
|
||||
activeTx sync.Map // map[string]engine.Transaction
|
||||
txMu sync.Mutex
|
||||
compactionSem chan struct{} // Semaphore for limiting concurrent compactions
|
||||
maxKeySize int // Maximum allowed key size
|
||||
maxValueSize int // Maximum allowed value size
|
||||
maxBatchSize int // Maximum number of operations in a batch
|
||||
maxTransactions int // Maximum number of concurrent transactions
|
||||
transactionTTL int64 // Maximum time in seconds a transaction can be idle
|
||||
activeTransCount int32 // Count of active transactions
|
||||
pb.UnimplementedKevoServiceServer
|
||||
engine *engine.Engine
|
||||
txRegistry TxRegistry
|
||||
activeTx sync.Map // map[string]engine.Transaction
|
||||
txMu sync.Mutex
|
||||
compactionSem chan struct{} // Semaphore for limiting concurrent compactions
|
||||
maxKeySize int // Maximum allowed key size
|
||||
maxValueSize int // Maximum allowed value size
|
||||
maxBatchSize int // Maximum number of operations in a batch
|
||||
maxTransactions int // Maximum number of concurrent transactions
|
||||
transactionTTL int64 // Maximum time in seconds a transaction can be idle
|
||||
activeTransCount int32 // Count of active transactions
|
||||
}
|
||||
|
||||
// NewKevoServiceServer creates a new KevoServiceServer
|
||||
func NewKevoServiceServer(engine *engine.Engine, txRegistry TxRegistry) *KevoServiceServer {
|
||||
return &KevoServiceServer{
|
||||
engine: engine,
|
||||
txRegistry: txRegistry,
|
||||
compactionSem: make(chan struct{}, 1), // Allow only one compaction at a time
|
||||
maxKeySize: 4096, // 4KB
|
||||
maxValueSize: 10 * 1024 * 1024, // 10MB
|
||||
maxBatchSize: 1000,
|
||||
maxTransactions: 1000,
|
||||
transactionTTL: 300, // 5 minutes
|
||||
}
|
||||
return &KevoServiceServer{
|
||||
engine: engine,
|
||||
txRegistry: txRegistry,
|
||||
compactionSem: make(chan struct{}, 1), // Allow only one compaction at a time
|
||||
maxKeySize: 4096, // 4KB
|
||||
maxValueSize: 10 * 1024 * 1024, // 10MB
|
||||
maxBatchSize: 1000,
|
||||
maxTransactions: 1000,
|
||||
transactionTTL: 300, // 5 minutes
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves a value for a given key
|
||||
func (s *KevoServiceServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) {
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
|
||||
value, err := s.engine.Get(req.Key)
|
||||
if err != nil {
|
||||
return &pb.GetResponse{Found: false}, nil
|
||||
}
|
||||
value, err := s.engine.Get(req.Key)
|
||||
if err != nil {
|
||||
return &pb.GetResponse{Found: false}, nil
|
||||
}
|
||||
|
||||
return &pb.GetResponse{
|
||||
Value: value,
|
||||
Found: true,
|
||||
}, nil
|
||||
return &pb.GetResponse{
|
||||
Value: value,
|
||||
Found: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Put stores a key-value pair
|
||||
func (s *KevoServiceServer) Put(ctx context.Context, req *pb.PutRequest) (*pb.PutResponse, error) {
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
|
||||
if len(req.Value) > s.maxValueSize {
|
||||
return nil, fmt.Errorf("value too large")
|
||||
}
|
||||
if len(req.Value) > s.maxValueSize {
|
||||
return nil, fmt.Errorf("value too large")
|
||||
}
|
||||
|
||||
if err := s.engine.Put(req.Key, req.Value); err != nil {
|
||||
return &pb.PutResponse{Success: false}, err
|
||||
}
|
||||
if err := s.engine.Put(req.Key, req.Value); err != nil {
|
||||
return &pb.PutResponse{Success: false}, err
|
||||
}
|
||||
|
||||
return &pb.PutResponse{Success: true}, nil
|
||||
return &pb.PutResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// Delete removes a key-value pair
|
||||
func (s *KevoServiceServer) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error) {
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
|
||||
if err := s.engine.Delete(req.Key); err != nil {
|
||||
return &pb.DeleteResponse{Success: false}, err
|
||||
}
|
||||
if err := s.engine.Delete(req.Key); err != nil {
|
||||
return &pb.DeleteResponse{Success: false}, err
|
||||
}
|
||||
|
||||
return &pb.DeleteResponse{Success: true}, nil
|
||||
return &pb.DeleteResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// BatchWrite performs multiple operations in a batch
|
||||
func (s *KevoServiceServer) BatchWrite(ctx context.Context, req *pb.BatchWriteRequest) (*pb.BatchWriteResponse, error) {
|
||||
if len(req.Operations) == 0 {
|
||||
return &pb.BatchWriteResponse{Success: true}, nil
|
||||
}
|
||||
if len(req.Operations) == 0 {
|
||||
return &pb.BatchWriteResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
if len(req.Operations) > s.maxBatchSize {
|
||||
return nil, fmt.Errorf("batch size exceeds maximum allowed (%d)", s.maxBatchSize)
|
||||
}
|
||||
if len(req.Operations) > s.maxBatchSize {
|
||||
return nil, fmt.Errorf("batch size exceeds maximum allowed (%d)", s.maxBatchSize)
|
||||
}
|
||||
|
||||
// Start a transaction for atomic batch operations
|
||||
tx, err := s.engine.BeginTransaction(false) // Read-write transaction
|
||||
if err != nil {
|
||||
return &pb.BatchWriteResponse{Success: false}, fmt.Errorf("failed to start transaction: %w", err)
|
||||
}
|
||||
// Start a transaction for atomic batch operations
|
||||
tx, err := s.engine.BeginTransaction(false) // Read-write transaction
|
||||
if err != nil {
|
||||
return &pb.BatchWriteResponse{Success: false}, fmt.Errorf("failed to start transaction: %w", err)
|
||||
}
|
||||
|
||||
// Ensure we either commit or rollback
|
||||
defer func() {
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
}
|
||||
}()
|
||||
// Ensure we either commit or rollback
|
||||
defer func() {
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
// Process each operation
|
||||
for _, op := range req.Operations {
|
||||
if len(op.Key) == 0 || len(op.Key) > s.maxKeySize {
|
||||
err = fmt.Errorf("invalid key size in batch operation")
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
// Process each operation
|
||||
for _, op := range req.Operations {
|
||||
if len(op.Key) == 0 || len(op.Key) > s.maxKeySize {
|
||||
err = fmt.Errorf("invalid key size in batch operation")
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
|
||||
switch op.Type {
|
||||
case pb.Operation_PUT:
|
||||
if len(op.Value) > s.maxValueSize {
|
||||
err = fmt.Errorf("value too large in batch operation")
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
if err = tx.Put(op.Key, op.Value); err != nil {
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
case pb.Operation_DELETE:
|
||||
if err = tx.Delete(op.Key); err != nil {
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
default:
|
||||
err = fmt.Errorf("unknown operation type")
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
}
|
||||
switch op.Type {
|
||||
case pb.Operation_PUT:
|
||||
if len(op.Value) > s.maxValueSize {
|
||||
err = fmt.Errorf("value too large in batch operation")
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
if err = tx.Put(op.Key, op.Value); err != nil {
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
case pb.Operation_DELETE:
|
||||
if err = tx.Delete(op.Key); err != nil {
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
default:
|
||||
err = fmt.Errorf("unknown operation type")
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Commit the transaction
|
||||
if err = tx.Commit(); err != nil {
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
// Commit the transaction
|
||||
if err = tx.Commit(); err != nil {
|
||||
return &pb.BatchWriteResponse{Success: false}, err
|
||||
}
|
||||
|
||||
return &pb.BatchWriteResponse{Success: true}, nil
|
||||
return &pb.BatchWriteResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// Scan iterates over a range of keys
|
||||
func (s *KevoServiceServer) Scan(req *pb.ScanRequest, stream pb.KevoService_ScanServer) error {
|
||||
var limit int32 = 0
|
||||
if req.Limit > 0 {
|
||||
limit = req.Limit
|
||||
}
|
||||
var limit int32 = 0
|
||||
if req.Limit > 0 {
|
||||
limit = req.Limit
|
||||
}
|
||||
|
||||
// Create a read-only transaction for consistent snapshot
|
||||
tx, err := s.engine.BeginTransaction(true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback() // Always rollback read-only TX when done
|
||||
// Create a read-only transaction for consistent snapshot
|
||||
tx, err := s.engine.BeginTransaction(true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback() // Always rollback read-only TX when done
|
||||
|
||||
// Create appropriate iterator based on request parameters
|
||||
var iter iterator.Iterator
|
||||
if len(req.Prefix) > 0 {
|
||||
// Create a prefix iterator
|
||||
prefixIter := tx.NewIterator()
|
||||
iter = newPrefixIterator(prefixIter, req.Prefix)
|
||||
} else if len(req.StartKey) > 0 || len(req.EndKey) > 0 {
|
||||
// Create a range iterator
|
||||
iter = tx.NewRangeIterator(req.StartKey, req.EndKey)
|
||||
} else {
|
||||
// Create a full scan iterator
|
||||
iter = tx.NewIterator()
|
||||
}
|
||||
// Create appropriate iterator based on request parameters
|
||||
var iter iterator.Iterator
|
||||
if len(req.Prefix) > 0 && len(req.Suffix) > 0 {
|
||||
// Create a combined prefix-suffix iterator
|
||||
baseIter := tx.NewIterator()
|
||||
prefixIter := newPrefixIterator(baseIter, req.Prefix)
|
||||
iter = newSuffixIterator(prefixIter, req.Suffix)
|
||||
} else if len(req.Prefix) > 0 {
|
||||
// Create a prefix iterator
|
||||
prefixIter := tx.NewIterator()
|
||||
iter = newPrefixIterator(prefixIter, req.Prefix)
|
||||
} else if len(req.Suffix) > 0 {
|
||||
// Create a suffix iterator
|
||||
suffixIter := tx.NewIterator()
|
||||
iter = newSuffixIterator(suffixIter, req.Suffix)
|
||||
} else if len(req.StartKey) > 0 || len(req.EndKey) > 0 {
|
||||
// Create a range iterator
|
||||
iter = tx.NewRangeIterator(req.StartKey, req.EndKey)
|
||||
} else {
|
||||
// Create a full scan iterator
|
||||
iter = tx.NewIterator()
|
||||
}
|
||||
|
||||
count := int32(0)
|
||||
// Position iterator at the first entry
|
||||
iter.SeekToFirst()
|
||||
count := int32(0)
|
||||
// Position iterator at the first entry
|
||||
iter.SeekToFirst()
|
||||
|
||||
// Iterate through all valid entries
|
||||
for iter.Valid() {
|
||||
if limit > 0 && count >= limit {
|
||||
break
|
||||
}
|
||||
// Iterate through all valid entries
|
||||
for iter.Valid() {
|
||||
if limit > 0 && count >= limit {
|
||||
break
|
||||
}
|
||||
|
||||
// Skip tombstones (deletion markers)
|
||||
if !iter.IsTombstone() {
|
||||
if err := stream.Send(&pb.ScanResponse{
|
||||
Key: iter.Key(),
|
||||
Value: iter.Value(),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
}
|
||||
// Skip tombstones (deletion markers)
|
||||
if !iter.IsTombstone() {
|
||||
if err := stream.Send(&pb.ScanResponse{
|
||||
Key: iter.Key(),
|
||||
Value: iter.Value(),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
// Move to the next entry
|
||||
iter.Next()
|
||||
}
|
||||
// Move to the next entry
|
||||
iter.Next()
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// prefixIterator wraps another iterator and filters for a prefix
|
||||
type prefixIterator struct {
|
||||
iter iterator.Iterator
|
||||
prefix []byte
|
||||
err error
|
||||
iter iterator.Iterator
|
||||
prefix []byte
|
||||
err error
|
||||
}
|
||||
|
||||
func newPrefixIterator(iter iterator.Iterator, prefix []byte) *prefixIterator {
|
||||
return &prefixIterator{
|
||||
iter: iter,
|
||||
prefix: prefix,
|
||||
}
|
||||
return &prefixIterator{
|
||||
iter: iter,
|
||||
prefix: prefix,
|
||||
}
|
||||
}
|
||||
|
||||
func (pi *prefixIterator) Next() bool {
|
||||
for pi.iter.Next() {
|
||||
// Check if current key has the prefix
|
||||
key := pi.iter.Key()
|
||||
if len(key) >= len(pi.prefix) &&
|
||||
equalByteSlice(key[:len(pi.prefix)], pi.prefix) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
for pi.iter.Next() {
|
||||
// Check if current key has the prefix
|
||||
key := pi.iter.Key()
|
||||
if len(key) >= len(pi.prefix) &&
|
||||
equalByteSlice(key[:len(pi.prefix)], pi.prefix) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (pi *prefixIterator) Key() []byte {
|
||||
return pi.iter.Key()
|
||||
return pi.iter.Key()
|
||||
}
|
||||
|
||||
func (pi *prefixIterator) Value() []byte {
|
||||
return pi.iter.Value()
|
||||
return pi.iter.Value()
|
||||
}
|
||||
|
||||
func (pi *prefixIterator) Valid() bool {
|
||||
return pi.iter.Valid()
|
||||
return pi.iter.Valid()
|
||||
}
|
||||
|
||||
func (pi *prefixIterator) IsTombstone() bool {
|
||||
return pi.iter.IsTombstone()
|
||||
return pi.iter.IsTombstone()
|
||||
}
|
||||
|
||||
func (pi *prefixIterator) SeekToFirst() {
|
||||
pi.iter.SeekToFirst()
|
||||
pi.iter.SeekToFirst()
|
||||
}
|
||||
|
||||
func (pi *prefixIterator) SeekToLast() {
|
||||
pi.iter.SeekToLast()
|
||||
pi.iter.SeekToLast()
|
||||
}
|
||||
|
||||
func (pi *prefixIterator) Seek(target []byte) bool {
|
||||
return pi.iter.Seek(target)
|
||||
return pi.iter.Seek(target)
|
||||
}
|
||||
|
||||
// equalByteSlice compares two byte slices for equality
|
||||
func equalByteSlice(a, b []byte) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
for i := 0; i < len(a); i++ {
|
||||
if a[i] != b[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
for i := 0; i < len(a); i++ {
|
||||
if a[i] != b[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// suffixIterator wraps another iterator and filters for a suffix
|
||||
type suffixIterator struct {
|
||||
iter iterator.Iterator
|
||||
suffix []byte
|
||||
err error
|
||||
}
|
||||
|
||||
func newSuffixIterator(iter iterator.Iterator, suffix []byte) *suffixIterator {
|
||||
return &suffixIterator{
|
||||
iter: iter,
|
||||
suffix: suffix,
|
||||
}
|
||||
}
|
||||
|
||||
func (si *suffixIterator) Next() bool {
|
||||
for si.iter.Next() {
|
||||
// Check if current key has the suffix
|
||||
key := si.iter.Key()
|
||||
if len(key) >= len(si.suffix) {
|
||||
// Compare the suffix portion
|
||||
suffixStart := len(key) - len(si.suffix)
|
||||
if equalByteSlice(key[suffixStart:], si.suffix) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (si *suffixIterator) Key() []byte {
|
||||
return si.iter.Key()
|
||||
}
|
||||
|
||||
func (si *suffixIterator) Value() []byte {
|
||||
return si.iter.Value()
|
||||
}
|
||||
|
||||
func (si *suffixIterator) Valid() bool {
|
||||
return si.iter.Valid()
|
||||
}
|
||||
|
||||
func (si *suffixIterator) IsTombstone() bool {
|
||||
return si.iter.IsTombstone()
|
||||
}
|
||||
|
||||
func (si *suffixIterator) SeekToFirst() {
|
||||
si.iter.SeekToFirst()
|
||||
}
|
||||
|
||||
func (si *suffixIterator) SeekToLast() {
|
||||
si.iter.SeekToLast()
|
||||
}
|
||||
|
||||
func (si *suffixIterator) Seek(target []byte) bool {
|
||||
return si.iter.Seek(target)
|
||||
}
|
||||
|
||||
// BeginTransaction starts a new transaction
|
||||
func (s *KevoServiceServer) BeginTransaction(ctx context.Context, req *pb.BeginTransactionRequest) (*pb.BeginTransactionResponse, error) {
|
||||
txID, err := s.txRegistry.Begin(ctx, s.engine, req.ReadOnly)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
txID, err := s.txRegistry.Begin(ctx, s.engine, req.ReadOnly)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
|
||||
return &pb.BeginTransactionResponse{
|
||||
TransactionId: txID,
|
||||
}, nil
|
||||
return &pb.BeginTransactionResponse{
|
||||
TransactionId: txID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CommitTransaction commits an ongoing transaction
|
||||
func (s *KevoServiceServer) CommitTransaction(ctx context.Context, req *pb.CommitTransactionRequest) (*pb.CommitTransactionResponse, error) {
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return &pb.CommitTransactionResponse{Success: false}, err
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return &pb.CommitTransactionResponse{Success: false}, err
|
||||
}
|
||||
|
||||
s.txRegistry.Remove(req.TransactionId)
|
||||
return &pb.CommitTransactionResponse{Success: true}, nil
|
||||
s.txRegistry.Remove(req.TransactionId)
|
||||
return &pb.CommitTransactionResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// RollbackTransaction aborts an ongoing transaction
|
||||
func (s *KevoServiceServer) RollbackTransaction(ctx context.Context, req *pb.RollbackTransactionRequest) (*pb.RollbackTransactionResponse, error) {
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
|
||||
if err := tx.Rollback(); err != nil {
|
||||
return &pb.RollbackTransactionResponse{Success: false}, err
|
||||
}
|
||||
if err := tx.Rollback(); err != nil {
|
||||
return &pb.RollbackTransactionResponse{Success: false}, err
|
||||
}
|
||||
|
||||
s.txRegistry.Remove(req.TransactionId)
|
||||
return &pb.RollbackTransactionResponse{Success: true}, nil
|
||||
s.txRegistry.Remove(req.TransactionId)
|
||||
return &pb.RollbackTransactionResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// TxGet retrieves a value for a given key within a transaction
|
||||
func (s *KevoServiceServer) TxGet(ctx context.Context, req *pb.TxGetRequest) (*pb.TxGetResponse, error) {
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
|
||||
value, err := tx.Get(req.Key)
|
||||
if err != nil {
|
||||
return &pb.TxGetResponse{Found: false}, nil
|
||||
}
|
||||
value, err := tx.Get(req.Key)
|
||||
if err != nil {
|
||||
return &pb.TxGetResponse{Found: false}, nil
|
||||
}
|
||||
|
||||
return &pb.TxGetResponse{
|
||||
Value: value,
|
||||
Found: true,
|
||||
}, nil
|
||||
return &pb.TxGetResponse{
|
||||
Value: value,
|
||||
Found: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TxPut stores a key-value pair within a transaction
|
||||
func (s *KevoServiceServer) TxPut(ctx context.Context, req *pb.TxPutRequest) (*pb.TxPutResponse, error) {
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
|
||||
if tx.IsReadOnly() {
|
||||
return nil, fmt.Errorf("cannot write to read-only transaction")
|
||||
}
|
||||
if tx.IsReadOnly() {
|
||||
return nil, fmt.Errorf("cannot write to read-only transaction")
|
||||
}
|
||||
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
|
||||
if len(req.Value) > s.maxValueSize {
|
||||
return nil, fmt.Errorf("value too large")
|
||||
}
|
||||
if len(req.Value) > s.maxValueSize {
|
||||
return nil, fmt.Errorf("value too large")
|
||||
}
|
||||
|
||||
if err := tx.Put(req.Key, req.Value); err != nil {
|
||||
return &pb.TxPutResponse{Success: false}, err
|
||||
}
|
||||
if err := tx.Put(req.Key, req.Value); err != nil {
|
||||
return &pb.TxPutResponse{Success: false}, err
|
||||
}
|
||||
|
||||
return &pb.TxPutResponse{Success: true}, nil
|
||||
return &pb.TxPutResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// TxDelete removes a key-value pair within a transaction
|
||||
func (s *KevoServiceServer) TxDelete(ctx context.Context, req *pb.TxDeleteRequest) (*pb.TxDeleteResponse, error) {
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
|
||||
if tx.IsReadOnly() {
|
||||
return nil, fmt.Errorf("cannot delete in read-only transaction")
|
||||
}
|
||||
if tx.IsReadOnly() {
|
||||
return nil, fmt.Errorf("cannot delete in read-only transaction")
|
||||
}
|
||||
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
if len(req.Key) == 0 || len(req.Key) > s.maxKeySize {
|
||||
return nil, fmt.Errorf("invalid key size")
|
||||
}
|
||||
|
||||
if err := tx.Delete(req.Key); err != nil {
|
||||
return &pb.TxDeleteResponse{Success: false}, err
|
||||
}
|
||||
if err := tx.Delete(req.Key); err != nil {
|
||||
return &pb.TxDeleteResponse{Success: false}, err
|
||||
}
|
||||
|
||||
return &pb.TxDeleteResponse{Success: true}, nil
|
||||
return &pb.TxDeleteResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
// TxScan iterates over a range of keys within a transaction
|
||||
func (s *KevoServiceServer) TxScan(req *pb.TxScanRequest, stream pb.KevoService_TxScanServer) error {
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
tx, exists := s.txRegistry.Get(req.TransactionId)
|
||||
if !exists {
|
||||
return fmt.Errorf("transaction not found: %s", req.TransactionId)
|
||||
}
|
||||
|
||||
var limit int32 = 0
|
||||
if req.Limit > 0 {
|
||||
limit = req.Limit
|
||||
}
|
||||
var limit int32 = 0
|
||||
if req.Limit > 0 {
|
||||
limit = req.Limit
|
||||
}
|
||||
|
||||
// Create appropriate iterator based on request parameters
|
||||
var iter iterator.Iterator
|
||||
if len(req.Prefix) > 0 {
|
||||
// Create a prefix iterator
|
||||
rawIter := tx.NewIterator()
|
||||
iter = newPrefixIterator(rawIter, req.Prefix)
|
||||
} else if len(req.StartKey) > 0 || len(req.EndKey) > 0 {
|
||||
// Create a range iterator
|
||||
iter = tx.NewRangeIterator(req.StartKey, req.EndKey)
|
||||
} else {
|
||||
// Create a full scan iterator
|
||||
iter = tx.NewIterator()
|
||||
}
|
||||
// Create appropriate iterator based on request parameters
|
||||
var iter iterator.Iterator
|
||||
if len(req.Prefix) > 0 && len(req.Suffix) > 0 {
|
||||
// Create a combined prefix-suffix iterator
|
||||
baseIter := tx.NewIterator()
|
||||
prefixIter := newPrefixIterator(baseIter, req.Prefix)
|
||||
iter = newSuffixIterator(prefixIter, req.Suffix)
|
||||
} else if len(req.Prefix) > 0 {
|
||||
// Create a prefix iterator
|
||||
rawIter := tx.NewIterator()
|
||||
iter = newPrefixIterator(rawIter, req.Prefix)
|
||||
} else if len(req.Suffix) > 0 {
|
||||
// Create a suffix iterator
|
||||
rawIter := tx.NewIterator()
|
||||
iter = newSuffixIterator(rawIter, req.Suffix)
|
||||
} else if len(req.StartKey) > 0 || len(req.EndKey) > 0 {
|
||||
// Create a range iterator
|
||||
iter = tx.NewRangeIterator(req.StartKey, req.EndKey)
|
||||
} else {
|
||||
// Create a full scan iterator
|
||||
iter = tx.NewIterator()
|
||||
}
|
||||
|
||||
count := int32(0)
|
||||
// Position iterator at the first entry
|
||||
iter.SeekToFirst()
|
||||
count := int32(0)
|
||||
// Position iterator at the first entry
|
||||
iter.SeekToFirst()
|
||||
|
||||
// Iterate through all valid entries
|
||||
for iter.Valid() {
|
||||
if limit > 0 && count >= limit {
|
||||
break
|
||||
}
|
||||
// Iterate through all valid entries
|
||||
for iter.Valid() {
|
||||
if limit > 0 && count >= limit {
|
||||
break
|
||||
}
|
||||
|
||||
// Skip tombstones (deletion markers)
|
||||
if !iter.IsTombstone() {
|
||||
if err := stream.Send(&pb.TxScanResponse{
|
||||
Key: iter.Key(),
|
||||
Value: iter.Value(),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
}
|
||||
// Skip tombstones (deletion markers)
|
||||
if !iter.IsTombstone() {
|
||||
if err := stream.Send(&pb.TxScanResponse{
|
||||
Key: iter.Key(),
|
||||
Value: iter.Value(),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
// Move to the next entry
|
||||
iter.Next()
|
||||
}
|
||||
// Move to the next entry
|
||||
iter.Next()
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetStats retrieves database statistics
|
||||
func (s *KevoServiceServer) GetStats(ctx context.Context, req *pb.GetStatsRequest) (*pb.GetStatsResponse, error) {
|
||||
// Collect basic stats that we know are available
|
||||
keyCount := int64(0)
|
||||
sstableCount := int32(0)
|
||||
memtableCount := int32(1) // At least 1 active memtable
|
||||
// Collect basic stats that we know are available
|
||||
keyCount := int64(0)
|
||||
sstableCount := int32(0)
|
||||
memtableCount := int32(1) // At least 1 active memtable
|
||||
|
||||
// Create a read-only transaction to count keys
|
||||
tx, err := s.engine.BeginTransaction(true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to begin transaction for stats: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
// Create a read-only transaction to count keys
|
||||
tx, err := s.engine.BeginTransaction(true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to begin transaction for stats: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Use an iterator to count keys
|
||||
iter := tx.NewIterator()
|
||||
// Use an iterator to count keys
|
||||
iter := tx.NewIterator()
|
||||
|
||||
// Count keys and estimate size
|
||||
var totalSize int64
|
||||
for iter.Next() {
|
||||
keyCount++
|
||||
totalSize += int64(len(iter.Key()) + len(iter.Value()))
|
||||
}
|
||||
// Count keys and estimate size
|
||||
var totalSize int64
|
||||
for iter.Next() {
|
||||
keyCount++
|
||||
totalSize += int64(len(iter.Key()) + len(iter.Value()))
|
||||
}
|
||||
|
||||
return &pb.GetStatsResponse{
|
||||
KeyCount: keyCount,
|
||||
StorageSize: totalSize,
|
||||
MemtableCount: memtableCount,
|
||||
SstableCount: sstableCount,
|
||||
WriteAmplification: 1.0, // Placeholder
|
||||
ReadAmplification: 1.0, // Placeholder
|
||||
}, nil
|
||||
return &pb.GetStatsResponse{
|
||||
KeyCount: keyCount,
|
||||
StorageSize: totalSize,
|
||||
MemtableCount: memtableCount,
|
||||
SstableCount: sstableCount,
|
||||
WriteAmplification: 1.0, // Placeholder
|
||||
ReadAmplification: 1.0, // Placeholder
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Compact triggers database compaction
|
||||
func (s *KevoServiceServer) Compact(ctx context.Context, req *pb.CompactRequest) (*pb.CompactResponse, error) {
|
||||
// Use a semaphore to prevent multiple concurrent compactions
|
||||
select {
|
||||
case s.compactionSem <- struct{}{}:
|
||||
// We got the semaphore, proceed with compaction
|
||||
defer func() { <-s.compactionSem }()
|
||||
default:
|
||||
// Semaphore is full, compaction is already running
|
||||
return &pb.CompactResponse{Success: false}, fmt.Errorf("compaction is already in progress")
|
||||
}
|
||||
// Use a semaphore to prevent multiple concurrent compactions
|
||||
select {
|
||||
case s.compactionSem <- struct{}{}:
|
||||
// We got the semaphore, proceed with compaction
|
||||
defer func() { <-s.compactionSem }()
|
||||
default:
|
||||
// Semaphore is full, compaction is already running
|
||||
return &pb.CompactResponse{Success: false}, fmt.Errorf("compaction is already in progress")
|
||||
}
|
||||
|
||||
// For now, Compact just performs a memtable flush as we don't have a public
|
||||
// Compact method on the engine yet
|
||||
tx, err := s.engine.BeginTransaction(false)
|
||||
if err != nil {
|
||||
return &pb.CompactResponse{Success: false}, err
|
||||
}
|
||||
// For now, Compact just performs a memtable flush as we don't have a public
|
||||
// Compact method on the engine yet
|
||||
tx, err := s.engine.BeginTransaction(false)
|
||||
if err != nil {
|
||||
return &pb.CompactResponse{Success: false}, err
|
||||
}
|
||||
|
||||
// Do a dummy write to force a flush
|
||||
if req.Force {
|
||||
err = tx.Put([]byte("__compact_marker__"), []byte("force"))
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return &pb.CompactResponse{Success: false}, err
|
||||
}
|
||||
}
|
||||
// Do a dummy write to force a flush
|
||||
if req.Force {
|
||||
err = tx.Put([]byte("__compact_marker__"), []byte("force"))
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return &pb.CompactResponse{Success: false}, err
|
||||
}
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return &pb.CompactResponse{Success: false}, err
|
||||
}
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return &pb.CompactResponse{Success: false}, err
|
||||
}
|
||||
|
||||
return &pb.CompactResponse{Success: true}, nil
|
||||
return &pb.CompactResponse{Success: true}, nil
|
||||
}
|
||||
|
@ -525,6 +525,7 @@ func (x *BatchWriteResponse) GetSuccess() bool {
|
||||
type ScanRequest struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Prefix []byte `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"`
|
||||
Suffix []byte `protobuf:"bytes,5,opt,name=suffix,proto3" json:"suffix,omitempty"`
|
||||
StartKey []byte `protobuf:"bytes,2,opt,name=start_key,json=startKey,proto3" json:"start_key,omitempty"`
|
||||
EndKey []byte `protobuf:"bytes,3,opt,name=end_key,json=endKey,proto3" json:"end_key,omitempty"`
|
||||
Limit int32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit,omitempty"`
|
||||
@ -569,6 +570,13 @@ func (x *ScanRequest) GetPrefix() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *ScanRequest) GetSuffix() []byte {
|
||||
if x != nil {
|
||||
return x.Suffix
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *ScanRequest) GetStartKey() []byte {
|
||||
if x != nil {
|
||||
return x.StartKey
|
||||
@ -1215,6 +1223,7 @@ type TxScanRequest struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
TransactionId string `protobuf:"bytes,1,opt,name=transaction_id,json=transactionId,proto3" json:"transaction_id,omitempty"`
|
||||
Prefix []byte `protobuf:"bytes,2,opt,name=prefix,proto3" json:"prefix,omitempty"`
|
||||
Suffix []byte `protobuf:"bytes,6,opt,name=suffix,proto3" json:"suffix,omitempty"`
|
||||
StartKey []byte `protobuf:"bytes,3,opt,name=start_key,json=startKey,proto3" json:"start_key,omitempty"`
|
||||
EndKey []byte `protobuf:"bytes,4,opt,name=end_key,json=endKey,proto3" json:"end_key,omitempty"`
|
||||
Limit int32 `protobuf:"varint,5,opt,name=limit,proto3" json:"limit,omitempty"`
|
||||
@ -1266,6 +1275,13 @@ func (x *TxScanRequest) GetPrefix() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *TxScanRequest) GetSuffix() []byte {
|
||||
if x != nil {
|
||||
return x.Suffix
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *TxScanRequest) GetStartKey() []byte {
|
||||
if x != nil {
|
||||
return x.StartKey
|
||||
@ -1585,9 +1601,10 @@ const file_proto_kevo_service_proto_rawDesc = "" +
|
||||
"\n" +
|
||||
"\x06DELETE\x10\x01\".\n" +
|
||||
"\x12BatchWriteResponse\x12\x18\n" +
|
||||
"\asuccess\x18\x01 \x01(\bR\asuccess\"q\n" +
|
||||
"\asuccess\x18\x01 \x01(\bR\asuccess\"\x89\x01\n" +
|
||||
"\vScanRequest\x12\x16\n" +
|
||||
"\x06prefix\x18\x01 \x01(\fR\x06prefix\x12\x1b\n" +
|
||||
"\x06prefix\x18\x01 \x01(\fR\x06prefix\x12\x16\n" +
|
||||
"\x06suffix\x18\x05 \x01(\fR\x06suffix\x12\x1b\n" +
|
||||
"\tstart_key\x18\x02 \x01(\fR\bstartKey\x12\x17\n" +
|
||||
"\aend_key\x18\x03 \x01(\fR\x06endKey\x12\x14\n" +
|
||||
"\x05limit\x18\x04 \x01(\x05R\x05limit\"6\n" +
|
||||
@ -1622,10 +1639,11 @@ const file_proto_kevo_service_proto_rawDesc = "" +
|
||||
"\x0etransaction_id\x18\x01 \x01(\tR\rtransactionId\x12\x10\n" +
|
||||
"\x03key\x18\x02 \x01(\fR\x03key\",\n" +
|
||||
"\x10TxDeleteResponse\x12\x18\n" +
|
||||
"\asuccess\x18\x01 \x01(\bR\asuccess\"\x9a\x01\n" +
|
||||
"\asuccess\x18\x01 \x01(\bR\asuccess\"\xb2\x01\n" +
|
||||
"\rTxScanRequest\x12%\n" +
|
||||
"\x0etransaction_id\x18\x01 \x01(\tR\rtransactionId\x12\x16\n" +
|
||||
"\x06prefix\x18\x02 \x01(\fR\x06prefix\x12\x1b\n" +
|
||||
"\x06prefix\x18\x02 \x01(\fR\x06prefix\x12\x16\n" +
|
||||
"\x06suffix\x18\x06 \x01(\fR\x06suffix\x12\x1b\n" +
|
||||
"\tstart_key\x18\x03 \x01(\fR\bstartKey\x12\x17\n" +
|
||||
"\aend_key\x18\x04 \x01(\fR\x06endKey\x12\x14\n" +
|
||||
"\x05limit\x18\x05 \x01(\x05R\x05limit\"8\n" +
|
||||
@ -1659,7 +1677,7 @@ const file_proto_kevo_service_proto_rawDesc = "" +
|
||||
"\bTxDelete\x12\x15.kevo.TxDeleteRequest\x1a\x16.kevo.TxDeleteResponse\x125\n" +
|
||||
"\x06TxScan\x12\x13.kevo.TxScanRequest\x1a\x14.kevo.TxScanResponse0\x01\x129\n" +
|
||||
"\bGetStats\x12\x15.kevo.GetStatsRequest\x1a\x16.kevo.GetStatsResponse\x126\n" +
|
||||
"\aCompact\x12\x14.kevo.CompactRequest\x1a\x15.kevo.CompactResponseB5Z3github.com/KevoDB/kevo/pkg/grpc/proto;protob\x06proto3"
|
||||
"\aCompact\x12\x14.kevo.CompactRequest\x1a\x15.kevo.CompactResponseB5Z3github.com/jeremytregunna/kevo/pkg/grpc/proto;protob\x06proto3"
|
||||
|
||||
var (
|
||||
file_proto_kevo_service_proto_rawDescOnce sync.Once
|
||||
|
@ -84,6 +84,7 @@ message BatchWriteResponse {
|
||||
// Iterator operations
|
||||
message ScanRequest {
|
||||
bytes prefix = 1;
|
||||
bytes suffix = 5;
|
||||
bytes start_key = 2;
|
||||
bytes end_key = 3;
|
||||
int32 limit = 4;
|
||||
@ -151,6 +152,7 @@ message TxDeleteResponse {
|
||||
message TxScanRequest {
|
||||
string transaction_id = 1;
|
||||
bytes prefix = 2;
|
||||
bytes suffix = 6;
|
||||
bytes start_key = 3;
|
||||
bytes end_key = 4;
|
||||
int32 limit = 5;
|
||||
|
Loading…
Reference in New Issue
Block a user