tidy: cleanups and doc refactoring
felixangell committed Oct 10, 2024
1 parent ab7d0cc commit 1d7674f
Showing 12 changed files with 631 additions and 403 deletions.
27 changes: 17 additions & 10 deletions README.md
@@ -1,25 +1,32 @@
# `carnax`

carnax is a "distributed" object-store based write ahead log. It uses Raft to maintain consensus and provide a CP
system.
# Carnax
Carnax is a "distributed" object-store based commit log. It uses Raft to maintain consensus and provide a CP system.

## Summary
Carnax aims to be:

carnax aims to be:

* A kafka-like distributed write ahead log
* A Kafka-inspired distributed commit log
* Built on top of S3 for those sweet juicy guarantees of durability and availability
* Simple and easy to read (the code), improve (the code), and understand (the software)
* Simple and easy to read (the code), improve (the code), and understand/operate (the software)

What you can hopefully use it for:

* Brokering messages to/from places
* Persistent data storage in log-form

Caveats:
Foundational caveats:

* Whilst Carnax is heavily inspired by Kafka, it is not the same and does not aim to be a slot-in replacement. The log structure, API, and configuration are fairly similar to Kafka; however, compatibility is not guaranteed for the long term.
* Due to the use of object stores, the 'ack' patterns, and the deliberate optimisation for lower cost and efficiency, Carnax has some important trade-offs around latency. In other words, sub-millisecond message throughput is a strong anti-pattern; seek alternatives such as Redis or Kafka.

**Short-term** caveats:

* Whilst Carnax is heavily inspired by Kafka it is not the same and does not aim to be a slot-in replacement.
* Carnax is still trying to achieve a 'product-market fit'; things are subject to change.
* The current design is _not yet_ built around long retention periods, i.e. periods longer than a few days or weeks of data.
    * Index lookups are not currently optimised for this, but in the long term Carnax should be fit for long-term data storage and retrieval.
* There are some bottlenecks around the use of Raft for consensus.
    * For now, a cluster-per-topic deployment may help avoid this when running Carnax.

> 💡 In the future, as the design matures, these caveats can be addressed and optimised for more general use-cases.

## Build

29 changes: 27 additions & 2 deletions api/controller.go
Expand Up @@ -601,13 +601,38 @@ func (m *CarnaxController) softCommit(id string, clientId string, index uint32,
// OffsetsForTimes ...
// default.api.timeout.ms
func (m *CarnaxController) OffsetsForTimes(id string, clientId string, m2 map[*TopicPartitionHash]*apiv1.SeekIndex) {
// todo: shift into apply...
// todo: shift into apply?

for tph, seekIndex := range m2 {
// for a given partition, do index seeks by timestamp to an offset.
log.Println(tph.String(), "to", seekIndex)

// soft commit
m.seekOffset(id, clientId, tph, seekIndex)

// soft commit whatever we get back
//m.softCommit(id, clientId, 0, 0)
}
}

func (m *CarnaxController) seekOffset(id string, id2 string, tph *TopicPartitionHash, index *apiv1.SeekIndex) {
commandBytes, err := proto.Marshal(&commandv1.Command{
Type: commandv1.CommandType_COMMAND_TYPE_SEEK_OFFSET,
Command: &commandv1.Command_SeekOffset{
SeekOffset: &commandv1.SeekOffsetCommand{
ConsumerGroupId: id,
ClientId: id2,
TopicPartition: (*apiv1.TopicPartition)(tph),
SeekIndex: index,
},
},
})
if err != nil {
panic(err)
}

res := m.getRaft().Apply(commandBytes, m.config.raftTimeout)
if err := res.Error(); err != nil {
panic(err)
}
res.Response()
}
18 changes: 17 additions & 1 deletion api/controller_fsm.go
Expand Up @@ -81,6 +81,9 @@ func (f *CarnaxControllerFSM) Apply(l *raft.Log) interface{} {
case commandv1.CommandType_COMMAND_TYPE_SOFT_COMMIT:
sc := cmd.GetSoftCommit()
return f.applySoftCommit(sc.ConsumerGroupId, sc.ClientId, sc.PartitionIndex, sc.Offset)
case commandv1.CommandType_COMMAND_TYPE_SEEK_OFFSET:
so := cmd.GetSeekOffset()
return f.applySeekOffset(so.ConsumerGroupId, so.ClientId, so.TopicPartition, so.SeekIndex)
default:
panic("Command is not handled " + cmd.Type.String())
}
Expand Down Expand Up @@ -176,7 +179,7 @@ func (f *CarnaxControllerFSM) applyWrite(topic string, rec *apiv1.Record, addres
}

offset := address.Offset
segment.activeSegments[address.PartitionIndex].Append(rec, offset)
segment.activeSegments[address.PartitionIndex].CommitRecord(rec, offset)

return &commandv1.WriteMessageCommand_Response{
Address: address,
Expand Down Expand Up @@ -624,3 +627,16 @@ func (f *CarnaxControllerFSM) applySoftCommit(consumerGroupId string, clientId s

return nil
}

/**
Segment metadata exists IN MEMORY covering the range of offsets:
the low and high watermarks of offset and timestamp for each segment.
Kafka can rebuild internal metadata from index files quickly, since disk writes are cheap there.
However, when backed by an object-store we should think about an alternative strategy, e.g.
checkpointing.
*/

func (f *CarnaxControllerFSM) applySeekOffset(id string, clientId string, topicPartition *apiv1.TopicPartition, seekIndex *apiv1.SeekIndex) interface{} {
return nil
}
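
The comment above describes the in-memory metadata that applySeekOffset will eventually need. As a rough sketch of that idea (the SegmentMeta type and findSegmentForTimestamp helper are hypothetical, not part of this commit), per-segment watermarks could be held in memory and periodically checkpointed to the object store:

```go
// SegmentMeta is a hypothetical in-memory summary of one segment's watermarks,
// matching the comment above: low/high offset and low/high timestamp.
type SegmentMeta struct {
	BaseOffset uint64 // offset the segment file starts at
	LowOffset  uint64
	HighOffset uint64
	LowTs      int64
	HighTs     int64
}

// findSegmentForTimestamp returns the base offset of the earliest segment whose
// high timestamp is >= ts, i.e. the first segment that could contain a record
// at or after ts. Segments are assumed to be ordered by BaseOffset.
func findSegmentForTimestamp(segments []SegmentMeta, ts int64) (uint64, bool) {
	for _, seg := range segments {
		if seg.HighTs >= ts {
			return seg.BaseOffset, true
		}
	}
	return 0, false
}
```

Checkpointing a structure like this would avoid re-reading every index object from the store on startup, which is the alternative strategy the comment alludes to.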
8 changes: 7 additions & 1 deletion api/controller_test.go
Expand Up @@ -554,6 +554,8 @@ func (c *carnaxTestSuite) TestIndexByTimestampToOffset() {
err = c.controller.Flush()
assert.NoError(c.T(), err)

minRaftPropagationSleep()

// setup a consumer,
// seek to offset by timestamp
// poll message
Expand All @@ -570,5 +572,9 @@ func (c *carnaxTestSuite) TestIndexByTimestampToOffset() {
},
})

c.controller.Poll(cg.ConsumerGroupId, cg.ClientId, 15*time.Second)
pollResult, e := c.controller.Poll(cg.ConsumerGroupId, cg.ClientId, 15*time.Second)
assert.NoError(c.T(), e)

first := pollResult.Records[0]
assert.Equal(c.T(), "Record written on day 2", string(first.Payload))
}
39 changes: 24 additions & 15 deletions api/search.go
Expand Up @@ -17,6 +17,29 @@ func (s SegmentPath) Valid() bool {
func findLowestSegmentFile(paths []string, offset uint64) string {
log.Println("SEG_LU", offset, "IN", strings.Join(paths, ";"))

pathByOffset := parseSegmentPaths(paths)

left, right := 0, len(pathByOffset)-1
best := uint64(0)

for left <= right {
mid := left + ((right - left) / 2)
if pathByOffset[mid] <= offset {
left = mid + 1
best = pathByOffset[mid]
} else {
right = mid - 1
}
}

return offsetToPath(best)
}

func offsetToPath(best uint64) string {
return fmt.Sprintf("%020d", best)
}

func parseSegmentPaths(paths []string) []uint64 {
offs := make([]uint64, len(paths))

for i, p := range paths {
Expand All @@ -40,19 +63,5 @@ func findLowestSegmentFile(paths []string, offset uint64) string {

offs[i] = v
}

left, right := 0, len(offs)-1
best := uint64(0)

for left <= right {
mid := left + ((right - left) / 2)
if offs[mid] <= offset {
left = mid + 1
best = offs[mid]
} else {
right = mid - 1
}
}

return fmt.Sprintf("%020d", best)
return offs
}
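
To make the contract of findLowestSegmentFile concrete: it resolves an offset to the segment whose base offset is the greatest value not exceeding it, assuming parseSegmentPaths returns the base offsets in ascending order. A self-contained sketch of the same lower-bound search (the sample offsets below are illustrative, not taken from the repository):

```go
// lowestSegmentBase mirrors the binary search in findLowestSegmentFile:
// it returns the largest base offset that is <= target, or 0 if none is.
func lowestSegmentBase(bases []uint64, target uint64) uint64 {
	best := uint64(0)
	left, right := 0, len(bases)-1
	for left <= right {
		mid := left + (right-left)/2
		if bases[mid] <= target {
			best = bases[mid]
			left = mid + 1
		} else {
			right = mid - 1
		}
	}
	return best
}
```

For segments starting at offsets 0, 100, and 200, a lookup for offset 150 yields 100, which offsetToPath renders as "00000000000000000100".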
85 changes: 67 additions & 18 deletions api/segment.go
Expand Up @@ -14,21 +14,25 @@ import (
type TopicPartitionSegment struct {
dataLog *bytes.Buffer // .log
offsIndex *bytes.Buffer // .index
// .timeindex TBD
timeIndex *bytes.Buffer

len uint64
start uint64

// watermark of this segment
low, high uint64
low, high uint64
lowTs, highTs int64

bytesSinceLastIndexWrite uint64
config CarnaxConfig

timeIndex *bytes.Buffer
}

func (s *TopicPartitionSegment) Append(rec *apiv1.Record, offset uint64) {
func (s *TopicPartitionSegment) CommitRecord(rec *apiv1.Record, offset uint64) {
/**
Here be dragons: the steps below are carefully ordered to preserve particular guarantees.
*/

// specify watermark for this topic
if offset < s.low {
s.low = offset
Expand All @@ -38,7 +42,9 @@ func (s *TopicPartitionSegment) Append(rec *apiv1.Record, offset uint64) {
// which is the position this message will be written at.
initialWritePosition := s.len

if s.shouldWriteIndex() {
writeIndex := s.shouldWriteIndex()

if writeIndex {
_, err := protodelim.MarshalTo(s.offsIndex, &apiv1.Index{
Offset: offset,
Position: s.len,
Expand All @@ -49,15 +55,7 @@ func (s *TopicPartitionSegment) Append(rec *apiv1.Record, offset uint64) {
s.bytesSinceLastIndexWrite = 0
}

// if should write to time offsIndex...
//s.timeIndex.Write()

recordBytes, err := proto.Marshal(rec)
if err != nil {
panic("Failed to marshal record")
}

// in most cases the metadata will not be set
// In most cases the metadata will not be set
// however, it is permissible, though unsafe, for
// producing clients to mangle this data if absolutely necessary.
if rec.Metadata != nil {
Expand All @@ -72,18 +70,53 @@ func (s *TopicPartitionSegment) Append(rec *apiv1.Record, offset uint64) {
}
}

// defensive measure to ensure checksums are only set once.
// Commit the timestamp watermark
timestamp := rec.Metadata.Timestamp
if rec.Headers.Timestamp != 0 {
// the header timestamp takes precedence
timestamp = rec.Headers.Timestamp
}
if timestamp < s.lowTs {
s.lowTs = timestamp
}
if timestamp > s.highTs {
s.highTs = timestamp
}

// Write a timestamp index log entry if necessary
if writeIndex {
_, err := protodelim.MarshalTo(s.timeIndex, &apiv1.TimeIndex{
Offset: offset,
Timestamp: timestamp,
})
if err != nil {
panic(err)
}
}

// Defensive measure to ensure checksums are only set once.
if rec.Checksum != 0 {
panic("invalid state: checksum must not be set")
}

// Marshal the record ONCE and generate a checksum.
// The ordering here is CRUCIAL: the metadata has now been
// committed, so the checksum includes it.
recordBytes, err := proto.Marshal(rec)
if err != nil {
panic("Failed to marshal record")
}

// Write the final checksum just before we commit to the log.
rec.Checksum = crc32.ChecksumIEEE(recordBytes)

// Commit the record to log
numBytesWritten, err := protodelim.MarshalTo(s.dataLog, rec)
if err != nil {
panic(err)
}

// Vaguely calculates the next offset (it's not guaranteed with "SegmentIncrement")
nextOffs := offset + uint64(numBytesWritten)
if nextOffs > s.high {
s.high = nextOffs
Expand All @@ -96,6 +129,8 @@ func (s *TopicPartitionSegment) Append(rec *apiv1.Record, offset uint64) {
// hm...
type IndexFile []*apiv1.Index

type TimeIndexFile []*apiv1.TimeIndex

func (i IndexFile) Search(offs uint64) *apiv1.Index {
if len(i) == 0 {
return nil
Expand Down Expand Up @@ -126,8 +161,22 @@ func (i IndexFile) Search(offs uint64) *apiv1.Index {
return i[best]
}

func TimeIndexFromBytes(data []byte) TimeIndexFile {
buf := bytes.NewReader(data)

var res TimeIndexFile
for buf.Len() > 0 {
timeIndexEntry := new(apiv1.TimeIndex)
if err := protodelim.UnmarshalFrom(buf, timeIndexEntry); err != nil {
panic(err)
}
res = append(res, timeIndexEntry)
}
return res
}
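
TimeIndexFromBytes makes the time index readable, but this commit does not yet add a lookup over it. A minimal sketch of what such a search might look like, mirroring IndexFile.Search and assuming entries are ordered by timestamp (this Search method is an illustration, not part of the commit):

```go
// Search returns the last time-index entry whose Timestamp is <= ts, giving
// the offset from which a scan for the first record at or after ts can start.
// It returns nil if ts precedes every entry or the index is empty.
func (t TimeIndexFile) Search(ts int64) *apiv1.TimeIndex {
	best := -1
	left, right := 0, len(t)-1
	for left <= right {
		mid := left + (right-left)/2
		if t[mid].Timestamp <= ts {
			best = mid
			left = mid + 1
		} else {
			right = mid - 1
		}
	}
	if best == -1 {
		return nil
	}
	return t[best]
}
```

A seek-by-timestamp would then use the returned Offset as the starting point for a scan of the data log.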

func IndexFromBytes(data []byte) IndexFile {
buf := bytes.NewBuffer(data)
buf := bytes.NewReader(data)

var res IndexFile

Expand All @@ -152,7 +201,7 @@ func (s *TopicPartitionSegment) Data() []byte {
}

func (s *TopicPartitionSegment) TimeIndex() []byte {
return []byte{}
return s.timeIndex.Bytes()
}

func (s *TopicPartitionSegment) shouldWriteIndex() bool {

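The checksum ordering in CommitRecord also implies the read-side check: the CRC is computed over the record as marshalled before the Checksum field is set, so a reader verifies by clearing the field and re-marshalling. A sketch of that check, in the same package as segment.go (verifyRecordChecksum is hypothetical and assumes proto.Marshal yields identical bytes for identical message contents):

```go
// verifyRecordChecksum is the read-side counterpart to the ordering above:
// the CRC was computed over the record marshalled *before* Checksum was set,
// so verification clears the field, re-marshals, and compares.
func verifyRecordChecksum(rec *apiv1.Record) (bool, error) {
	want := rec.Checksum
	rec.Checksum = 0
	data, err := proto.Marshal(rec)
	rec.Checksum = want // restore the field after marshalling
	if err != nil {
		return false, err
	}
	return crc32.ChecksumIEEE(data) == want, nil
}
```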