kevo/pkg/sstable/sstable.go
Jeremy Tregunna 1fba7f27f3
All checks were successful
Go Tests / Run Tests (1.24.2) (push) Successful in 9m38s
feat: implement SSTable package with block handling and restart points
2025-04-19 17:39:43 -06:00

779 lines
19 KiB
Go

package sstable
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"git.canoozie.net/jer/go-storage/pkg/sstable/block"
"git.canoozie.net/jer/go-storage/pkg/sstable/footer"
)
const (
// IndexBlockEntrySize is the approximate size, in bytes, of an index entry.
IndexBlockEntrySize = 20
// DefaultBlockSize is the target size for data blocks; it mirrors block.BlockSize.
DefaultBlockSize = block.BlockSize
// IndexKeyInterval is the estimated block size, in bytes, at which Writer.Add
// flushes the current data block. Every flushed block produces one index
// entry, so this also sets the spacing of index entries.
IndexKeyInterval = 64 * 1024 // Add index entry every ~64KB
)
var (
// ErrNotFound indicates a key was not found in the SSTable.
// Reader.Get returns it after every data block has been searched.
ErrNotFound = errors.New("key not found in sstable")
// ErrCorruption indicates data corruption was detected.
// It is wrapped (with %w) by the iterator's block-loading errors, so test
// for it with errors.Is rather than ==.
ErrCorruption = errors.New("sstable corruption detected")
)
// IndexEntry represents a block index entry: the location of one data block
// plus the key used to find it. Writer.Finish serializes each entry into the
// index block as key=FirstKey, value=BlockOffset (8 bytes, little-endian)
// followed by BlockSize (4 bytes, little-endian).
type IndexEntry struct {
// BlockOffset is the offset of the block in the file
BlockOffset uint64
// BlockSize is the size of the block in bytes
BlockSize uint32
// FirstKey is the first key in the block
FirstKey []byte
}
// Writer writes an SSTable file.
// Data is written to a temporary file (tmpPath) and renamed to path by Finish;
// Abort discards the temporary file instead.
type Writer struct {
// path is the final location of the SSTable.
path string
// tmpPath is the temporary file that receives all writes until Finish renames it.
tmpPath string
// file is the open handle on tmpPath; it is set to nil once closed by Finish or Abort.
file *os.File
// blockBuilder accumulates entries for the data block currently being built.
blockBuilder *block.Builder
// indexBuilder builds the index block written by Finish.
indexBuilder *block.Builder
// dataOffset is the number of data-block bytes written so far, i.e. the
// file offset at which the next block will start.
dataOffset uint64
// pendingIndexEntries holds one entry per flushed data block until Finish
// writes them into the index block.
pendingIndexEntries []*IndexEntry
// firstKey and lastKey are copies of the first and most recent keys passed to Add.
firstKey []byte
lastKey []byte
// entriesAdded counts the key-value pairs added; it is stored in the footer.
entriesAdded uint32
}
// NewWriter creates a new SSTable writer for path.
//
// All output goes to a hidden sibling file named ".<base>.tmp" in the same
// directory; the file only appears under path once Finish succeeds. It returns
// an error if the temporary file cannot be created.
func NewWriter(path string) (*Writer, error) {
	tmpName := fmt.Sprintf(".%s.tmp", filepath.Base(path))
	tmpPath := filepath.Join(filepath.Dir(path), tmpName)

	f, err := os.Create(tmpPath)
	if err != nil {
		return nil, fmt.Errorf("failed to create temporary file: %w", err)
	}

	// Remaining fields (dataOffset, entriesAdded, key range) start at their zero values.
	w := &Writer{
		path:                path,
		tmpPath:             tmpPath,
		file:                f,
		blockBuilder:        block.NewBuilder(),
		indexBuilder:        block.NewBuilder(),
		pendingIndexEntries: make([]*IndexEntry, 0),
	}
	return w, nil
}
// Add adds a key-value pair to the SSTable.
// Keys must be added in sorted order; Add does not verify the ordering itself.
//
// The pair is appended to the current data block. Once the block's estimated
// size reaches IndexKeyInterval the block is flushed to the file. An error is
// returned if the block builder rejects the entry or the flush fails.
func (w *Writer) Add(key, value []byte) error {
	if err := w.blockBuilder.Add(key, value); err != nil {
		return fmt.Errorf("failed to add to block: %w", err)
	}

	// Record the key range only after the builder accepted the entry, so a
	// rejected key never ends up in firstKey/lastKey. Keys are copied because
	// the caller may reuse its buffer.
	if w.entriesAdded == 0 {
		w.firstKey = append([]byte(nil), key...)
	}
	w.lastKey = append([]byte(nil), key...)
	w.entriesAdded++

	// Flush once the accumulated block reaches the size threshold.
	if w.blockBuilder.EstimatedSize() >= IndexKeyInterval {
		if err := w.flushBlock(); err != nil {
			return err
		}
	}
	return nil
}
// flushBlock writes the current data block to the file and queues an index
// entry (offset, size, first key) for it. It is a no-op when the block builder
// holds no entries. On success the builder is reset and dataOffset advances by
// the number of bytes written.
func (w *Writer) flushBlock() error {
	// Skip if the block is empty
	if w.blockBuilder.Entries() == 0 {
		return nil
	}

	// Record the offset of this block
	blockOffset := w.dataOffset

	entries := w.blockBuilder.GetEntries()
	if len(entries) == 0 {
		return fmt.Errorf("block has no entries")
	}
	// Copy the first key: the index entry outlives the builder's Reset below,
	// and the slice returned by GetEntries may alias builder-owned memory.
	firstKey := append([]byte(nil), entries[0].Key...)

	// Serialize the block
	var blockBuf bytes.Buffer
	if _, err := w.blockBuilder.Finish(&blockBuf); err != nil {
		return fmt.Errorf("failed to finish block: %w", err)
	}
	blockData := blockBuf.Bytes()
	blockSize := uint32(len(blockData))

	// Write the block to file
	n, err := w.file.Write(blockData)
	if err != nil {
		return fmt.Errorf("failed to write block to file: %w", err)
	}
	if n != len(blockData) {
		return fmt.Errorf("wrote incomplete block: %d of %d bytes", n, len(blockData))
	}

	// Queue the index entry; Finish serializes these into the index block.
	w.pendingIndexEntries = append(w.pendingIndexEntries, &IndexEntry{
		BlockOffset: blockOffset,
		BlockSize:   blockSize,
		FirstKey:    firstKey,
	})

	// Update offset for next block
	w.dataOffset += uint64(n)

	// Reset the block builder for next block
	w.blockBuilder.Reset()
	return nil
}
// Finish completes the SSTable writing process.
//
// It flushes any partially filled data block, writes the index block and the
// footer, syncs and closes the temporary file, and finally renames it to the
// writer's target path. The resulting file layout is:
//
//	data blocks | index block | footer
//
// On any error the file handle is closed. If the failure happens before the
// file was closed successfully, Abort can still be called to remove the
// temporary file.
func (w *Writer) Finish() error {
	defer func() {
		// Best-effort close on early-return paths; the primary error is
		// already being reported to the caller.
		if w.file != nil {
			w.file.Close()
		}
	}()

	// Flush any pending data block (only if we have entries that haven't been flushed)
	if w.blockBuilder.Entries() > 0 {
		if err := w.flushBlock(); err != nil {
			return err
		}
	}

	// The index block starts right after the last data block.
	indexOffset := w.dataOffset

	// Add all index entries to the index block.
	// Index entry format: key=firstKey, value=blockOffset (8 bytes LE) + blockSize (4 bytes LE).
	for _, entry := range w.pendingIndexEntries {
		// A fresh buffer per entry: the builder may retain the slice.
		value := make([]byte, 12)
		binary.LittleEndian.PutUint64(value[:8], entry.BlockOffset)
		binary.LittleEndian.PutUint32(value[8:], entry.BlockSize)
		if err := w.indexBuilder.Add(entry.FirstKey, value); err != nil {
			return fmt.Errorf("failed to add index entry: %w", err)
		}
	}

	// Write index block
	var indexBuf bytes.Buffer
	if _, err := w.indexBuilder.Finish(&indexBuf); err != nil {
		return fmt.Errorf("failed to finish index block: %w", err)
	}
	indexData := indexBuf.Bytes()
	indexSize := uint32(len(indexData))
	n, err := w.file.Write(indexData)
	if err != nil {
		return fmt.Errorf("failed to write index block: %w", err)
	}
	if n != len(indexData) {
		return fmt.Errorf("wrote incomplete index block: %d of %d bytes",
			n, len(indexData))
	}

	// Create footer
	ft := footer.NewFooter(
		indexOffset,
		indexSize,
		w.entriesAdded,
		0, // MinKeyOffset - not implemented yet
		0, // MaxKeyOffset - not implemented yet
	)

	// Write footer
	if _, err := ft.WriteTo(w.file); err != nil {
		return fmt.Errorf("failed to write footer: %w", err)
	}

	// Sync the file
	if err := w.file.Sync(); err != nil {
		return fmt.Errorf("failed to sync file: %w", err)
	}

	// Close the file before renaming
	if err := w.file.Close(); err != nil {
		return fmt.Errorf("failed to close file: %w", err)
	}
	w.file = nil

	// Rename the temp file to the final path
	if err := os.Rename(w.tmpPath, w.path); err != nil {
		// w.file is already nil, so Abort would be a no-op from here on;
		// remove the temporary file now (best effort) so it is not orphaned.
		os.Remove(w.tmpPath)
		return fmt.Errorf("failed to rename temp file: %w", err)
	}
	return nil
}
// Abort cancels the SSTable writing process: it closes the temporary file and
// removes it from disk. Calling Abort when the file is no longer open (after a
// successful Finish or a previous Abort) is a no-op that returns nil.
//
// Errors from closing or removing the file are returned joined together. An
// "already closed" close error (as happens after a failed Finish) and a
// "does not exist" remove error are not failures of the abort and are ignored.
func (w *Writer) Abort() error {
	if w.file == nil {
		return nil
	}

	closeErr := w.file.Close()
	w.file = nil
	if errors.Is(closeErr, os.ErrClosed) {
		closeErr = nil
	}

	removeErr := os.Remove(w.tmpPath)
	if errors.Is(removeErr, os.ErrNotExist) {
		removeErr = nil
	}

	// errors.Join returns nil when both errors are nil.
	return errors.Join(closeErr, removeErr)
}
// Reader reads an SSTable file.
// OpenReader populates it from the file's footer and index block.
type Reader struct {
// path is the file path passed to OpenReader.
path string
// file is the open handle used for block reads; Close sets it to nil.
file *os.File
// fileSize is the size of the file in bytes, captured when it was opened.
fileSize int64
// indexOffset, indexSize and numEntries are copied from the decoded footer.
indexOffset uint64
indexSize uint32
numEntries uint32
// indexBlock is the parsed index block; its values locate the data blocks.
indexBlock *block.Reader
// ft is the decoded footer.
ft *footer.Footer
// mu is read-locked by Get and NewIterator and write-locked by Close.
mu sync.RWMutex
}
// OpenReader opens an SSTable file for reading.
//
// It reads the fixed-size footer from the end of the file, validates that the
// index block it describes lies within the file ahead of the footer, and then
// loads and parses the index block. The file handle is closed on every failure
// path; on success it is owned by the returned Reader until Close is called.
//
// A footer whose index range falls outside the file yields an error wrapping
// ErrCorruption.
func OpenReader(path string) (*Reader, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	// Release the handle unless ownership is handed to the Reader below.
	handedOff := false
	defer func() {
		if !handedOff {
			file.Close()
		}
	}()

	// Get file size
	stat, err := file.Stat()
	if err != nil {
		return nil, fmt.Errorf("failed to stat file: %w", err)
	}
	fileSize := stat.Size()

	// Ensure file is large enough for a footer
	footerSize := int64(footer.FooterSize)
	if fileSize < footerSize {
		return nil, fmt.Errorf("file too small to be valid SSTable: %d bytes", fileSize)
	}
	footerStart := fileSize - footerSize

	// Read footer
	footerData := make([]byte, footer.FooterSize)
	if _, err := file.ReadAt(footerData, footerStart); err != nil {
		return nil, fmt.Errorf("failed to read footer: %w", err)
	}
	ft, err := footer.Decode(footerData)
	if err != nil {
		return nil, fmt.Errorf("failed to decode footer: %w", err)
	}

	// Validate the index location before allocating: a corrupt footer must not
	// drive an arbitrary-size allocation or a read past the data region.
	indexEnd := ft.IndexOffset + uint64(ft.IndexSize)
	if indexEnd < ft.IndexOffset || indexEnd > uint64(footerStart) {
		return nil, fmt.Errorf("invalid index block (offset %d, size %d, file size %d): %w",
			ft.IndexOffset, ft.IndexSize, fileSize, ErrCorruption)
	}

	// Read index block
	indexData := make([]byte, ft.IndexSize)
	if _, err := file.ReadAt(indexData, int64(ft.IndexOffset)); err != nil {
		return nil, fmt.Errorf("failed to read index block: %w", err)
	}
	indexBlock, err := block.NewReader(indexData)
	if err != nil {
		return nil, fmt.Errorf("failed to create index block reader: %w", err)
	}

	handedOff = true
	return &Reader{
		path:        path,
		file:        file,
		fileSize:    fileSize,
		indexOffset: ft.IndexOffset,
		indexSize:   ft.IndexSize,
		numEntries:  ft.NumEntries,
		indexBlock:  indexBlock,
		ft:          ft,
	}, nil
}
// Get returns the value stored for key, or ErrNotFound when no data block
// contains it.
//
// The lookup walks the index from the first entry and scans every referenced
// data block once, in index order, comparing keys for exact equality. Index
// values shorter than 12 bytes are skipped. A failure to read or parse a data
// block aborts the lookup with that error.
func (r *Reader) Get(key []byte) ([]byte, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	// Offsets of blocks already scanned; several index entries may refer to
	// the same block.
	visited := make(map[uint64]struct{})

	idx := r.indexBlock.Iterator()
	for idx.SeekToFirst(); idx.Valid(); idx.Next() {
		loc := idx.Value()
		if len(loc) < 12 { // offset (8) + size (4)
			continue
		}
		offset := binary.LittleEndian.Uint64(loc[:8])
		size := binary.LittleEndian.Uint32(loc[8:12])

		if _, dup := visited[offset]; dup {
			continue
		}
		visited[offset] = struct{}{}

		// Load and parse the data block.
		raw := make([]byte, size)
		if _, err := r.file.ReadAt(raw, int64(offset)); err != nil {
			return nil, fmt.Errorf("failed to read data block at offset %d: %w", offset, err)
		}
		blk, err := block.NewReader(raw)
		if err != nil {
			return nil, fmt.Errorf("failed to create block reader: %w", err)
		}

		// Linear scan of the block for an exact key match.
		entries := blk.Iterator()
		entries.SeekToFirst()
		for entries.Valid() {
			if bytes.Equal(entries.Key(), key) {
				return entries.Value(), nil
			}
			entries.Next()
		}
	}
	return nil, ErrNotFound
}
// NewIterator returns an iterator over the entire SSTable.
// The iterator starts out uninitialized: it has its own index iterator
// (positioned at the first index entry) but no data block loaded yet.
func (r *Reader) NewIterator() *Iterator {
	r.mu.RLock()
	defer r.mu.RUnlock()

	idx := r.indexBlock.Iterator()
	idx.SeekToFirst()

	// dataBlockIter, currentBlock and initialized keep their zero values.
	it := &Iterator{
		reader:        r,
		indexIterator: idx,
	}
	return it
}
// Close closes the SSTable reader's file handle and returns the close error,
// if any. Closing an already-closed reader does nothing and returns nil.
func (r *Reader) Close() error {
	r.mu.Lock()
	defer r.mu.Unlock()

	if f := r.file; f != nil {
		// Clear the handle regardless of the close result.
		r.file = nil
		return f.Close()
	}
	return nil
}
// Iterator iterates over key-value pairs in an SSTable.
// It walks the reader's index block and loads one data block at a time.
type Iterator struct {
// reader is the SSTable the data blocks are read from.
reader *Reader
// indexIterator walks the index block; its current value locates a data block.
indexIterator *block.Iterator
// dataBlockIter iterates over the currently loaded data block; nil when no block is loaded.
dataBlockIter *block.Iterator
// currentBlock is the parsed data block behind dataBlockIter; nil when no block is loaded.
currentBlock *block.Reader
// err holds the most recent block-loading error; the Seek* methods reset it.
err error
// initialized is false until one of the positioning methods has run.
initialized bool
// mu is locked by every exported method of the iterator.
mu sync.Mutex
}
// SeekToFirst positions the iterator at the first key of the SSTable.
// It clears any previous error, rewinds the index, skips leading index entries
// whose value is shorter than 8 bytes, and loads the first remaining data
// block. If no block can be loaded the iterator ends up invalid.
func (it *Iterator) SeekToFirst() {
	it.mu.Lock()
	defer it.mu.Unlock()

	it.err = nil

	idx := it.indexIterator
	idx.SeekToFirst()
	if idx.Valid() {
		// Advance past malformed leading entries.
		if len(idx.Value()) < 8 {
			for idx.Next() && len(idx.Value()) < 8 {
			}
		}
		if idx.Valid() {
			it.loadCurrentDataBlock()
			if it.dataBlockIter != nil {
				it.dataBlockIter.SeekToFirst()
			}
		}
	}

	// Nothing usable was found: drop any previously loaded block.
	if !idx.Valid() || it.dataBlockIter == nil {
		it.currentBlock = nil
		it.dataBlockIter = nil
	}
	it.initialized = true
}
// SeekToLast positions the iterator at the last key of the SSTable.
//
// The index is scanned twice: the first pass finds the offset of the last
// block that appears for the first time (index entries may repeat a block),
// the second pass repositions the index on the first entry with that offset.
// That block is then loaded and its iterator moved to its last key. With no
// usable index entry the iterator ends up invalid.
func (it *Iterator) SeekToLast() {
	it.mu.Lock()
	defer it.mu.Unlock()

	it.err = nil
	idx := it.indexIterator

	// Pass 1: remember the most recently discovered distinct block offset.
	var (
		visited    = make(map[uint64]bool)
		lastOffset uint64
		haveLast   bool
	)
	for idx.SeekToFirst(); idx.Valid(); idx.Next() {
		if len(idx.Value()) < 8 {
			continue
		}
		offset := binary.LittleEndian.Uint64(idx.Value()[:8])
		if visited[offset] {
			continue
		}
		visited[offset] = true
		lastOffset = offset
		haveLast = true
	}

	if !haveLast {
		// No valid index entries
		it.currentBlock = nil
		it.dataBlockIter = nil
		it.initialized = true
		return
	}

	// Pass 2: stop on the first index entry that points at that block.
	for idx.SeekToFirst(); idx.Valid(); idx.Next() {
		if len(idx.Value()) >= 8 &&
			binary.LittleEndian.Uint64(idx.Value()[:8]) == lastOffset {
			break
		}
	}

	it.loadCurrentDataBlock()
	if it.dataBlockIter != nil {
		it.dataBlockIter.SeekToLast()
	}
	it.initialized = true
}
// Seek positions the iterator at the first key >= target.
// It returns true when the iterator ends up on an entry, and false when no
// block could be loaded or no such entry was found. Any previous error is
// cleared first; block-loading failures are reported through Error.
//
// NOTE(review): the index stores the first key of each block. If
// block.Iterator.Seek positions on the first entry >= target, the index lookup
// below selects the first block whose first key is >= target, and a target
// lying inside the preceding block would be skipped — confirm Seek's
// semantics in the block package.
func (it *Iterator) Seek(target []byte) bool {
it.mu.Lock()
defer it.mu.Unlock()
// Reset error state
it.err = nil
it.initialized = true
// Find the block that might contain the key
// The index contains the first key of each block
if !it.indexIterator.Seek(target) {
// If seeking in the index fails, try the last block
it.indexIterator.SeekToLast()
if !it.indexIterator.Valid() {
// No blocks in the SSTable
it.currentBlock = nil
it.dataBlockIter = nil
return false
}
}
// Load the data block at the current index position
it.loadCurrentDataBlock()
if it.dataBlockIter == nil {
// Loading failed; loadCurrentDataBlock has recorded the reason in it.err
return false
}
// Try to find the target key in this block
if it.dataBlockIter.Seek(target) {
// Found a key >= target in this block
return true
}
// If we didn't find the key in this block, it might be in a later block
// The target is greater than all keys in the current block
var foundValidKey bool
// Store current block offset to skip duplicates
var currentBlockOffset uint64
if len(it.indexIterator.Value()) >= 8 {
currentBlockOffset = binary.LittleEndian.Uint64(it.indexIterator.Value()[:8])
}
// Try subsequent blocks, skipping duplicates
for it.indexIterator.Next() {
// Skip invalid entries or duplicates of the current block
if !it.indexIterator.Valid() || len(it.indexIterator.Value()) < 8 {
continue
}
nextBlockOffset := binary.LittleEndian.Uint64(it.indexIterator.Value()[:8])
if nextBlockOffset == currentBlockOffset {
// This is a duplicate index entry pointing to the same block, skip it
continue
}
// Found a new block, update current offset
currentBlockOffset = nextBlockOffset
it.loadCurrentDataBlock()
if it.dataBlockIter == nil {
// Loading failed; give up rather than trying later blocks
return false
}
// Position at the first key in the next block
it.dataBlockIter.SeekToFirst()
if it.dataBlockIter.Valid() {
foundValidKey = true
break
}
}
return foundValidKey
}
// Next advances the iterator to the next key and reports whether it now rests
// on a valid entry.
//
// On an iterator that has not been positioned yet, Next behaves like
// SeekToFirst. Otherwise it first advances within the current data block and,
// once that block is exhausted, moves on to the next index entry that points
// at a different block. When no further block exists the iterator becomes
// invalid and Next returns false.
func (it *Iterator) Next() bool {
	it.mu.Lock()
	if !it.initialized {
		// SeekToFirst and Valid acquire it.mu themselves and sync.Mutex is not
		// reentrant, so the lock must be released before delegating; calling
		// them with the lock held would deadlock.
		it.mu.Unlock()
		it.SeekToFirst()
		return it.Valid()
	}
	defer it.mu.Unlock()

	if it.dataBlockIter == nil {
		// If we don't have a current block, attempt to load the one at the current index position
		if it.indexIterator.Valid() {
			it.loadCurrentDataBlock()
			if it.dataBlockIter != nil {
				it.dataBlockIter.SeekToFirst()
				return it.dataBlockIter.Valid()
			}
		}
		return false
	}

	// Try to advance within current block
	if it.dataBlockIter.Next() {
		return true
	}

	// The current block is exhausted. Remember its offset so index entries
	// that point at the same block can be skipped.
	var currentBlockOffset uint64
	if len(it.indexIterator.Value()) >= 8 {
		currentBlockOffset = binary.LittleEndian.Uint64(it.indexIterator.Value()[:8])
	}

	// Find next block with a different offset
	var nextBlockFound bool
	for it.indexIterator.Next() {
		// Skip invalid entries or entries pointing to the same block
		if !it.indexIterator.Valid() || len(it.indexIterator.Value()) < 8 {
			continue
		}
		nextBlockOffset := binary.LittleEndian.Uint64(it.indexIterator.Value()[:8])
		if nextBlockOffset != currentBlockOffset {
			nextBlockFound = true
			break
		}
	}
	if !nextBlockFound || !it.indexIterator.Valid() {
		// No more unique blocks in the index
		it.currentBlock = nil
		it.dataBlockIter = nil
		return false
	}

	// Load the next block
	it.loadCurrentDataBlock()
	if it.dataBlockIter == nil {
		return false
	}

	// Start at the beginning of the new block
	it.dataBlockIter.SeekToFirst()
	return it.dataBlockIter.Valid()
}
// Key returns the key of the entry the iterator is positioned on, or nil when
// the iterator is uninitialized, has no data block, or is past the block's end.
func (it *Iterator) Key() []byte {
	it.mu.Lock()
	defer it.mu.Unlock()

	if it.initialized && it.dataBlockIter != nil && it.dataBlockIter.Valid() {
		return it.dataBlockIter.Key()
	}
	return nil
}
// Value returns the value of the entry the iterator is positioned on, or nil
// when the iterator is uninitialized, has no data block, or is past the
// block's end.
func (it *Iterator) Value() []byte {
	it.mu.Lock()
	defer it.mu.Unlock()

	if it.initialized && it.dataBlockIter != nil && it.dataBlockIter.Valid() {
		return it.dataBlockIter.Value()
	}
	return nil
}
// Valid returns true if the iterator is positioned at a valid entry: it has
// been positioned at least once, a data block is loaded, and that block's
// iterator is itself valid.
func (it *Iterator) Valid() bool {
	it.mu.Lock()
	defer it.mu.Unlock()

	if !it.initialized || it.dataBlockIter == nil {
		return false
	}
	return it.dataBlockIter.Valid()
}
// Error returns the error recorded by the most recent block load, or nil if
// there is none.
func (it *Iterator) Error() error {
	it.mu.Lock()
	recorded := it.err
	it.mu.Unlock()
	return recorded
}
// loadCurrentDataBlock loads the data block referenced by the index iterator's
// current entry and installs it as currentBlock/dataBlockIter.
//
// On any failure (invalid index position, malformed index value, zero block
// size, offset beyond the file, read error, short read, unparsable block) it
// records the error in it.err and clears currentBlock and dataBlockIter.
// The caller must hold it.mu.
func (it *Iterator) loadCurrentDataBlock() {
	// fail records the error and drops any loaded block.
	fail := func(err error) {
		it.err = err
		it.currentBlock = nil
		it.dataBlockIter = nil
	}

	if !it.indexIterator.Valid() {
		fail(fmt.Errorf("index iterator not valid"))
		return
	}

	// Index value layout: offset (8 bytes LE) + size (4 bytes LE).
	loc := it.indexIterator.Value()
	if len(loc) < 12 {
		fail(fmt.Errorf("invalid index entry (too short, length=%d): %w",
			len(loc), ErrCorruption))
		return
	}
	offset := binary.LittleEndian.Uint64(loc[:8])
	size := binary.LittleEndian.Uint32(loc[8:12])

	// Sanity checks on the decoded location.
	switch {
	case size == 0:
		fail(fmt.Errorf("invalid block size (0): %w", ErrCorruption))
		return
	case offset >= uint64(it.reader.fileSize):
		fail(fmt.Errorf("invalid block offset (%d >= file size %d): %w",
			offset, it.reader.fileSize, ErrCorruption))
		return
	}

	// Read the raw block bytes.
	raw := make([]byte, size)
	n, err := it.reader.file.ReadAt(raw, int64(offset))
	if err != nil {
		fail(fmt.Errorf("failed to read data block at offset %d: %w",
			offset, err))
		return
	}
	if n != int(size) {
		fail(fmt.Errorf("incomplete block read: got %d bytes, expected %d: %w",
			n, size, ErrCorruption))
		return
	}

	// Parse the block and expose an iterator over it.
	blk, err := block.NewReader(raw)
	if err != nil {
		fail(fmt.Errorf("failed to create block reader for block at offset %d: %w",
			offset, err))
		return
	}
	it.currentBlock = blk
	it.dataBlockIter = blk.Iterator()
}