refactor: consolidate writeRawRecord in WAL with other functions
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Has been cancelled
Some checks failed
Go Tests / Run Tests (1.24.2) (push) Has been cancelled
This commit is contained in:
parent
33a8a41e7d
commit
2335e9a10a
@ -352,11 +352,6 @@ func (w *WAL) writeRecord(recordType uint8, entryType uint8, seqNum uint64, key,
|
||||
return fmt.Errorf("record too large: %d > %d", payloadSize, MaxRecordSize)
|
||||
}
|
||||
|
||||
// Prepare the header
|
||||
header := make([]byte, HeaderSize)
|
||||
binary.LittleEndian.PutUint16(header[4:6], uint16(payloadSize))
|
||||
header[6] = recordType
|
||||
|
||||
// Prepare the payload
|
||||
payload := make([]byte, payloadSize)
|
||||
offset := 0
|
||||
@ -382,23 +377,8 @@ func (w *WAL) writeRecord(recordType uint8, entryType uint8, seqNum uint64, key,
|
||||
copy(payload[offset:], value)
|
||||
}
|
||||
|
||||
// Calculate CRC
|
||||
crc := crc32.ChecksumIEEE(payload)
|
||||
binary.LittleEndian.PutUint32(header[0:4], crc)
|
||||
|
||||
// Write the record
|
||||
if _, err := w.writer.Write(header); err != nil {
|
||||
return fmt.Errorf("failed to write record header: %w", err)
|
||||
}
|
||||
if _, err := w.writer.Write(payload); err != nil {
|
||||
return fmt.Errorf("failed to write record payload: %w", err)
|
||||
}
|
||||
|
||||
// Update bytes written
|
||||
w.bytesWritten += int64(HeaderSize + payloadSize)
|
||||
w.batchByteSize += int64(HeaderSize + payloadSize)
|
||||
|
||||
return nil
|
||||
// Use writeRawRecord to write the record
|
||||
return w.writeRawRecord(recordType, payload)
|
||||
}
|
||||
|
||||
// writeRawRecord writes a raw record with provided data as payload
|
||||
@ -416,17 +396,24 @@ func (w *WAL) writeRawRecord(recordType uint8, data []byte) error {
|
||||
crc := crc32.ChecksumIEEE(data)
|
||||
binary.LittleEndian.PutUint32(header[0:4], crc)
|
||||
|
||||
// Write the record using the common writeRecordData method
|
||||
return w.writeRecordData(header, data)
|
||||
}
|
||||
|
||||
// writeRecordData writes a complete record (header + payload) directly to the WAL
|
||||
// This is a lower-level method that handles the actual writing to the buffer and updating bytes written
|
||||
func (w *WAL) writeRecordData(header, payload []byte) error {
|
||||
// Write the record
|
||||
if _, err := w.writer.Write(header); err != nil {
|
||||
return fmt.Errorf("failed to write record header: %w", err)
|
||||
}
|
||||
if _, err := w.writer.Write(data); err != nil {
|
||||
if _, err := w.writer.Write(payload); err != nil {
|
||||
return fmt.Errorf("failed to write record payload: %w", err)
|
||||
}
|
||||
|
||||
// Update bytes written
|
||||
w.bytesWritten += int64(HeaderSize + len(data))
|
||||
w.batchByteSize += int64(HeaderSize + len(data))
|
||||
w.bytesWritten += int64(len(header) + len(payload))
|
||||
w.batchByteSize += int64(len(header) + len(payload))
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -463,19 +450,19 @@ func (w *WAL) AppendExactBytes(rawBytes []byte, seqNum uint64) (uint64, error) {
|
||||
w.nextSequence = seqNum + 1
|
||||
}
|
||||
|
||||
// Write the raw bytes directly to the WAL
|
||||
if _, err := w.writer.Write(rawBytes); err != nil {
|
||||
// Extract header and payload
|
||||
header := rawBytes[:HeaderSize]
|
||||
payload := rawBytes[HeaderSize:]
|
||||
|
||||
// Write the record data
|
||||
if err := w.writeRecordData(header, payload); err != nil {
|
||||
return 0, fmt.Errorf("failed to write raw WAL record: %w", err)
|
||||
}
|
||||
|
||||
// Update bytes written
|
||||
w.bytesWritten += int64(len(rawBytes))
|
||||
w.batchByteSize += int64(len(rawBytes))
|
||||
|
||||
// Notify observers (with a simplified Entry since we can't properly parse the raw bytes)
|
||||
entry := &Entry{
|
||||
SequenceNumber: seqNum,
|
||||
Type: rawBytes[HeaderSize], // Read first byte of payload as entry type
|
||||
Type: payload[0], // Read first byte of payload as entry type
|
||||
Key: []byte{},
|
||||
Value: []byte{},
|
||||
}
|
||||
@ -619,6 +606,26 @@ func (w *WAL) Sync() error {
|
||||
return w.syncLocked()
|
||||
}
|
||||
|
||||
// serializeBatch prepares a batch header with common batch information
|
||||
func (w *WAL) serializeBatchHeader(startSeqNum uint64, entryCount int) []byte {
|
||||
// Create batch header: opType(1) + seqNum(8) + entryCount(4)
|
||||
batchHeader := make([]byte, 1+8+4)
|
||||
offset := 0
|
||||
|
||||
// Write operation type (batch)
|
||||
batchHeader[offset] = OpTypeBatch
|
||||
offset++
|
||||
|
||||
// Write sequence number
|
||||
binary.LittleEndian.PutUint64(batchHeader[offset:offset+8], startSeqNum)
|
||||
offset += 8
|
||||
|
||||
// Write entry count
|
||||
binary.LittleEndian.PutUint32(batchHeader[offset:offset+4], uint32(entryCount))
|
||||
|
||||
return batchHeader
|
||||
}
|
||||
|
||||
// AppendBatch adds a batch of entries to the WAL
|
||||
func (w *WAL) AppendBatch(entries []*Entry) (uint64, error) {
|
||||
w.mu.Lock()
|
||||
@ -638,22 +645,8 @@ func (w *WAL) AppendBatch(entries []*Entry) (uint64, error) {
|
||||
// Start sequence number for the batch
|
||||
startSeqNum := w.nextSequence
|
||||
|
||||
// Record this as a batch operation with the number of entries
|
||||
batchHeader := make([]byte, 1+8+4) // opType(1) + seqNum(8) + entryCount(4)
|
||||
offset := 0
|
||||
|
||||
// Write operation type (batch)
|
||||
batchHeader[offset] = OpTypeBatch
|
||||
offset++
|
||||
|
||||
// Write sequence number
|
||||
binary.LittleEndian.PutUint64(batchHeader[offset:offset+8], startSeqNum)
|
||||
offset += 8
|
||||
|
||||
// Write entry count
|
||||
binary.LittleEndian.PutUint32(batchHeader[offset:offset+4], uint32(len(entries)))
|
||||
|
||||
// Write the batch header
|
||||
// Create and write the batch header
|
||||
batchHeader := w.serializeBatchHeader(startSeqNum, len(entries))
|
||||
if err := w.writeRawRecord(RecordTypeFull, batchHeader); err != nil {
|
||||
return 0, fmt.Errorf("failed to write batch header: %w", err)
|
||||
}
|
||||
@ -766,7 +759,7 @@ func (w *WAL) AppendBatchWithSequence(entries []*Entry, startSequence uint64) (u
|
||||
}
|
||||
}
|
||||
|
||||
// Write the batch entry to WAL
|
||||
// Write the batch entry to WAL using writeRecord
|
||||
if err := w.writeRecord(RecordTypeFull, OpTypeBatch, startSeqNum, data, nil); err != nil {
|
||||
return 0, fmt.Errorf("failed to write batch with sequence %d: %w", startSeqNum, err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user