From 2d311443759c56edeb7467f2443d865cb88f8033 Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Tue, 7 Jan 2025 09:57:09 -0500
Subject: [PATCH] chore(dataobj): add bitmap encoding

This commit adds bitmap encoding, the third and final type of encoding
needed for the data object prototype. Bitmap encoding efficiently stores
sequences of uint64 values as a combination of RLE runs and bitpacked runs.
RLE runs are long sequences of the same value, while bitpacked runs are sets
of 8 values packed together into the smallest possible bit width.

Bitmap encoding is based on the RLE encoding format used by Parquet, with
some notable changes to facilitate streaming writes:

- Our bitmap encoding doesn't use a fixed width for values. Instead, the
  width is determined upon flushing a bitpacked set. Bitpacked sets of the
  same width are then combined into a single run. This comes at the cost of
  an extra byte per bitpacked run.

- Because values are streamed, the final length of the bitmap isn't included
  in a header. Callers can choose to prepend the length by writing the bitmap
  into a separate buffer and then writing a custom header. Without this,
  readers must know the exact number of encoded values so they don't read
  past the end of the RLE sequence.

This code is unfortunately quite complex. I've tried to add comments for as
much of it as I could, but if there's an easier way to do the bitpacking, I
would love to move over to that.
---
 .../internal/dataset/value_encoding_bitmap.go | 601 ++++++++++++++++++
 .../dataset/value_encoding_bitmap_test.go     | 265 ++++++++
 .../metadata/datasetmd/datasetmd.pb.go        |  26 +-
 .../metadata/datasetmd/datasetmd.proto        |   4 +
 4 files changed, 886 insertions(+), 10 deletions(-)
 create mode 100644 pkg/dataobj/internal/dataset/value_encoding_bitmap.go
 create mode 100644 pkg/dataobj/internal/dataset/value_encoding_bitmap_test.go

diff --git a/pkg/dataobj/internal/dataset/value_encoding_bitmap.go b/pkg/dataobj/internal/dataset/value_encoding_bitmap.go
new file mode 100644
index 0000000000000..97f09afc61825
--- /dev/null
+++ b/pkg/dataobj/internal/dataset/value_encoding_bitmap.go
@@ -0,0 +1,601 @@
+package dataset
+
+import (
+	"fmt"
+	"io"
+	"math/bits"
+
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
+)
+
+func init() {
+	// Register the encoding so instances of it can be dynamically created.
+	registerValueEncoding(
+		datasetmd.VALUE_TYPE_UINT64,
+		datasetmd.ENCODING_TYPE_BITMAP,
+		func(w streamio.Writer) valueEncoder { return newBitmapEncoder(w) },
+		func(r streamio.Reader) valueDecoder { return newBitmapDecoder(r) },
+	)
+}
+
+const maxRunLength uint64 = 1<<63 - 1 // 2^63-1
+
+// bitmapEncoder encodes and decodes bitmaps of unsigned numbers up to 64 bits
+// wide. To use bitmaps with signed integers, callers should first encode the
+// integers using zig-zag encoding to minimize the number of bits needed for
+// negative values.
+//
+// Data is encoded with a hybrid of run-length encoding and bitpacking. Longer
+// sequences of the same value are encoded with run-length encoding, while
+// shorter sequences are bitpacked when possible. To avoid padding, bitpacking
+// is only used when there is a multiple of 8 values to encode.
+//
+// Bitpacking is done using a dynamic bit width. The bit width is determined by
+// the largest value in the set of 8 values. The bit width can be any value
+// from 1 to 64, inclusive.
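+//
+// For example, the eight values 0 through 7 have a maximum value of 7
+// (0b111), so the set is packed at a bit width of 3 and takes exactly 3
+// bytes.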
+//
+// # Format
+//
+// The bitmap format is a slight modification of the Parquet format to support
+// longer runs and support streaming. The EBNF grammar is as follows:
+//
+//	bitmap            = run+;
+//	run               = bit_packed_run | rle_run;
+//	bit_packed_run    = bit_packed_header bit_packed_values;
+//	bit_packed_header = (* uvarint(bit_packed_sets << 7 | (bit_width - 1) << 1 | 1) *)
+//	bit_packed_sets   = (* value between 1 and 2^57-1, inclusive; each set has 8 elements *)
+//	bit_width         = (* bit size of element in set; value between 1 and 64, inclusive *)
+//	bit_packed_values = (* values packed from the least significant bit of each byte to the most significant bit *)
+//	rle_run           = rle_header repeated_value;
+//	rle_header        = (* uvarint(rle_run_len << 1) *)
+//	rle_run_len       = (* value between 1 and 2^63-1, inclusive *)
+//	repeated_value    = (* repeated value encoded as uvarint *)
+//
+// Where this differs from Parquet:
+//
+//   - We don't use a fixed width for bitpacked values, to allow for streaming
+//     calls to Encode without knowing the width in advance. Instead, the width
+//     is determined when flushing a bitpacked set to an internal buffer.
+//
+//     To minimize the overhead of encoding the width dynamically, we store
+//     each bitpacked set with the smallest number of bits possible. If two
+//     sets have different widths, they are flushed as two different runs.
+//
+//   - For simplicity, repeated_value is encoded as uvarint rather than
+//     flushing the value in its entirety.
+//
+//   - To facilitate streaming, we don't prepend the length of all bytes
+//     written. Callers may choose to prepend the length. Without the length,
+//     readers must know exactly how many values were encoded so they don't
+//     read past the end of the RLE sequence.
+type bitmapEncoder struct {
+	w streamio.Writer
+
+	// bitmapEncoder is a basic state machine with three states:
+	//
+	// READY   The default state; no values are being tracked. Appending a new
+	//         value moves to the RLE state.
+	//
+	// RLE     bitmapEncoder is tracking a run of runValue. Active when
+	//         runLength>0. If a value not matching runValue is appended and
+	//         runLength<8, the state moves to BITPACK. Otherwise, the previous
+	//         run is flushed and a new RLE run is started.
+	//
+	// BITPACK bitmapEncoder is tracking a set of values to bitpack. Active when
+	//         setSize>0. Once the set reaches 8 values, the set is flushed and
+	//         the encoder resets to READY.
+
+	runValue  uint64 // Value in the current run.
+	runLength uint64 // Length of the current run.
+
+	set     [8]uint64 // Set of values to bitpack.
+	setSize byte      // Current number of elements in set.
+
+	buf *bitpackBuffer // Buffer for accumulating bitpacked sets of the same bit width into a single run.
+}
+
+// newBitmapEncoder creates a new bitmap encoder that writes encoded numbers to w.
+func newBitmapEncoder(w streamio.Writer) *bitmapEncoder {
+	return &bitmapEncoder{
+		w:   w,
+		buf: newBitpackBuffer(),
+	}
+}
+
+// ValueType returns [datasetmd.VALUE_TYPE_UINT64].
+func (enc *bitmapEncoder) ValueType() datasetmd.ValueType {
+	return datasetmd.VALUE_TYPE_UINT64
+}
+
+// EncodingType returns [datasetmd.ENCODING_TYPE_BITMAP].
+func (enc *bitmapEncoder) EncodingType() datasetmd.EncodingType {
+	return datasetmd.ENCODING_TYPE_BITMAP
+}
+
+// Encode appends a new uint64 value to enc.
+//
+// When flushing, Encode returns an error if writing to the underlying
+// [streamio.Writer] fails.
+//
+// Call [bitmapEncoder.Flush] to end the current run and flush any remaining
+// values.
+func (enc *bitmapEncoder) Encode(v Value) error {
+	if v.Type() != datasetmd.VALUE_TYPE_UINT64 {
+		return fmt.Errorf("invalid value type %s", v.Type())
+	}
+	uv := v.Uint64()
+
+	switch {
+	case enc.runLength == 0 && enc.setSize == 0: // READY; start a new run.
+		enc.runValue = uv
+		enc.runLength = 1
+		return nil
+
+	case enc.runLength > 0 && uv == enc.runValue: // RLE with matching value; continue the run.
+		enc.runLength++
+
+		// If we hit the maximum run length, flush immediately.
+		if enc.runLength == maxRunLength {
+			return enc.flushRLE()
+		}
+		return nil
+
+	case enc.runLength > 0 && uv != enc.runValue: // RLE with different value; the run ended.
+		// If the run lasted less than 8 values, we switch to the BITPACK state.
+		if enc.runLength < 8 {
+			// Copy over the existing run to the set and then add the new value.
+			enc.setSize = byte(enc.runLength)
+			for i := byte(0); i < enc.setSize; i++ {
+				enc.set[i] = enc.runValue
+			}
+
+			enc.runLength = 0
+			enc.set[enc.setSize] = uv
+			enc.setSize++
+
+			if enc.setSize == 8 {
+				return enc.flushBitpacked()
+			}
+			return nil
+		}
+
+		// Otherwise, flush the previous run and start a new one.
+		if err := enc.flushRLE(); err != nil {
+			return err
+		}
+
+		enc.runValue = uv
+		enc.runLength = 1
+		return nil
+
+	case enc.setSize > 0: // BITPACK; add the value to the set.
+		enc.set[enc.setSize] = uv
+		enc.setSize++
+
+		if enc.setSize == 8 {
+			return enc.flushBitpacked()
+		}
+		return nil
+
+	default:
+		panic("dataset.bitmapEncoder: invalid state")
+	}
+}
+
+// Flush writes any remaining values to the underlying [streamio.Writer].
+func (enc *bitmapEncoder) Flush() error {
+	// We always flush using RLE. If bitmapEncoder was in the BITPACK state, we
+	// don't have 8 values yet; if we did, they would've already been flushed in
+	// Encode.
+	return enc.flushRLE()
+}
+
+func (enc *bitmapEncoder) flushRLE() error {
+	// Flush anything in the bitpack buffer.
+	if err := enc.buf.Flush(enc.w); err != nil {
+		return err
+	}
+
+	switch {
+	case enc.runLength > 0:
+		if enc.runLength > maxRunLength {
+			return fmt.Errorf("run length too large")
+		}
+
+		// Header.
+		if err := streamio.WriteUvarint(enc.w, enc.runLength<<1); err != nil {
+			return err
+		}
+
+		// Value.
+		if err := streamio.WriteUvarint(enc.w, enc.runValue); err != nil {
+			return err
+		}
+
+		enc.runLength = 0
+		enc.runValue = 0
+		return nil
+
+	case enc.setSize > 0:
+		// Consume the set as a sequence of runs.
+		for off := byte(0); off < enc.setSize; {
+			var (
+				val = enc.set[off]
+				run = byte(1)
+			)
+
+			for j := off + 1; j < enc.setSize; j++ {
+				if enc.set[j] != val {
+					break
+				}
+				run++
+			}
+
+			off += run
+
+			// Header.
+			if err := streamio.WriteUvarint(enc.w, uint64(run<<1)); err != nil {
+				return err
+			}
+
+			// Value.
+			if err := streamio.WriteUvarint(enc.w, val); err != nil {
+				return err
+			}
+		}
+
+		enc.setSize = 0
+		return nil
+
+	default:
+		return nil
+	}
+}
+
+// flushBitpacked flushes the current set of 8 values into the buffer used for
+// accumulating bitpacked runs. If the buffer is full, the buffer is flushed
+// immediately to the underlying writer.
+func (enc *bitmapEncoder) flushBitpacked() error {
+	if enc.setSize != 8 {
+		panic("dataset.bitmapEncoder: flushBitpacked called with less than 8 values")
+	}
+
+	// Detect the bit width of the set.
+	width := 1
+	for i := 0; i < int(enc.setSize); i++ {
+		if bitLength := bits.Len64(enc.set[i]); bitLength > width {
+			width = bitLength
+		}
+	}
+
+	// Write out the bitpacked values. Bitpacking 8 values of bit width N always
+	// requires exactly N bytes.
+	//
+	// Each value is packed from the least significant bit of each byte to the
+	// most significant bit, while still retaining the order of bits from the
+	// original value.
+	//
+	// This bitpacking algorithm is challenging to reason about, but is retained
+	// to align with Parquet's behaviour. I (rfratto) found it easier to reason
+	// about by considering how the output bits map to the input bits.
+	//
+	// This means that for width == 3:
+	//
+	//	index:     0    1    2    3    4    5    6    7
+	//	dec value: 0    1    2    3    4    5    6    7
+	//	bit value: 000  001  010  011  100  101  110  111
+	//	bit label: ABC  DEF  GHI  JKL  MNO  PQR  STU  VWX
+	//
+	//	index:     22111000  54443332  77766655
+	//	bit value: 10001000  11000110  11111010
+	//	bit label: HIDEFABC  RMNOJKLG  VWXSTUPQ
+	//
+	// Formatting it as a table better demonstrates the mapping:
+	//
+	//	Index  Labels  Input bit  Output byte  Output bit (for byte)
+	//	-----  ------  ---------  -----------  ---------------------
+	//	0      ABC     2 1 0      0 0 0        2 1 0
+	//	1      DEF     5 4 3      0 0 0        5 4 3
+	//	2      GHI     8 7 6      1 0 0        0 7 6
+	//	3      JKL     11 10 9    1 1 1        3 2 1
+	//	4      MNO     14 13 12   1 1 1        6 5 4
+	//	5      PQR     17 16 15   2 2 1        1 0 7
+	//	6      STU     20 19 18   2 2 2        4 3 2
+	//	7      VWX     23 22 21   2 2 2        7 6 5
+	//
+	// So, for any given output bit, its value originates from:
+	//
+	//	* enc.set index:       output_bit/width
+	//	* enc.set element bit: output_bit%width
+	//
+	// If there's a much simpler way to understand and do this packing, I'd love
+	// to know.
+	buf := make([]byte, 0, width)
+
+	for outputByte := 0; outputByte < width; outputByte++ {
+		var b byte
+
+		for i := 0; i < 8; i++ {
+			outputBit := outputByte*8 + i
+			inputIndex := outputBit / width
+			inputBit := outputBit % width
+
+			// Set the bit in b.
+			if enc.set[inputIndex]&(1<<inputBit) != 0 {
+				b |= 1 << i
+			}
+		}
+
+		buf = append(buf, b)
+	}
+
+	// The set is fully packed into buf; reset it for the next 8 values.
+	enc.setSize = 0
+
+	// Hand the packed set to the buffer so it can be combined with other sets
+	// of the same width into a single run. If the buffer rejects the set (it's
+	// full or the width changed), flush the buffer to the underlying writer and
+	// try again; after a flush the buffer is empty and always accepts the set.
+	if err := enc.buf.Append(width, buf); err != nil {
+		if err := enc.buf.Flush(enc.w); err != nil {
+			return err
+		}
+		return enc.buf.Append(width, buf)
+	}
+	return nil
+}
+
+// bitpackBuffer accumulates bitpacked sets of the same bit width so that
+// multiple sets can be written out as one bitpacked run.
+type bitpackBuffer struct {
+	width int    // Bit width of the buffered sets; 0 when the buffer is empty.
+	sets  int    // Number of buffered sets; each set holds 8 values.
+	data  []byte // Raw bitpacked bytes of the buffered sets.
+
+	maxBufferSize int // Upper bound on len(data) before Append reports a full buffer.
+}
+
+// newBitpackBuffer creates an empty bitpackBuffer. The cap below only bounds
+// how many bytes are combined into a single bitpacked run; the exact value is
+// not significant.
+func newBitpackBuffer() *bitpackBuffer {
+	return &bitpackBuffer{maxBufferSize: 1024}
+}
+
+// Append appends a bitpacked set of the given width to the buffer. Append
+// fails if the buffer is full or if width doesn't match the width of
+// previously buffered sets; callers should then Flush the buffer and retry.
+func (b *bitpackBuffer) Append(width int, data []byte) error {
+	if width < 1 || width > 64 {
+		return fmt.Errorf("invalid width: %d", width)
+	}
+
+	// Error conditions
+	switch {
+	case len(b.data)+len(data) > b.maxBufferSize && b.sets > 0:
+		return fmt.Errorf("buffer full")
+	case b.width != width && b.sets > 0:
+		return fmt.Errorf("width changed")
+	}
+
+	b.sets++
+	b.width = width
+	b.data = append(b.data, data...)
+	return nil
+}
+
+// Flush flushes buffered data to w and resets state for more writes.
+func (b *bitpackBuffer) Flush(w streamio.Writer) error {
+	if b.sets == 0 {
+		return nil
+	}
+
+	// The header of the bitpacked sequence encodes:
+	//
+	//	* The number of sets
+	//	* The width of elements in the sets (1-64)
+	//	* A flag indicating the header type
+	//
+	// To encode the width in 6 bits, we encode width-1. That reserves the bottom
+	// 7 bits for metadata, and the remaining 57 bits for the number of sets.
+	const maxSets = 1<<57 - 1
+
+	// Validate constraints for safety.
+	switch {
+	case b.width < 1 || b.width > 64:
+		return fmt.Errorf("invalid width: %d", b.width)
+	case b.sets > maxSets:
+		// This shouldn't ever happen; 2^57-1 sets, in the best case (width of 1),
+		// would require our buffer to be 144PB.
+		return fmt.Errorf("too many sets: %d", b.sets)
+	}
+
+	// Width can be between 1 and 64. To pack it into 6 bits, we subtract 1 from
+	// the value.
+	header := (uint64(b.sets) << 7) | (uint64(b.width-1) << 1) | 1
+	if err := streamio.WriteUvarint(w, header); err != nil {
+		return err
+	}
+
+	if n, err := w.Write(b.data); err != nil {
+		return err
+	} else if n != len(b.data) {
+		return fmt.Errorf("short write: %d != %d", n, len(b.data))
+	}
+
+	b.width = 0
+	b.sets = 0
+	b.data = b.data[:0]
+	return nil
+}
+
+// Reset resets the buffer to its initial state without flushing.
+func (b *bitpackBuffer) Reset() {
+	b.width = 0
+	b.sets = 0
+	b.data = b.data[:0]
+}
+
+// bitmapDecoder decodes uint64s from a bitmap-encoded stream. See the doc
+// comment on [bitmapEncoder] for detail on the format.
+type bitmapDecoder struct {
+	r streamio.Reader
+
+	// Like [bitmapEncoder], bitmapDecoder is a basic state machine with four
+	// states:
+	//
+	// READY         The default state; the decoder needs to pull a new run
+	//               header and move to RLE or BITPACK-READY.
+	//
+	// RLE           The decoder is in the middle of an RLE-encoded run. Active
+	//               when runLength>0. runLength should decrease by 1 each time
+	//               Decode is called, returning runValue.
+	//
+	// BITPACK-READY The decoder is ready to read a new bitpacked set. Active
+	//               when sets>0 and setSize==0. The decoder needs to pull the
+	//               next set, update setSize and decrement sets by 1.
+	//
+	// BITPACK-SET   The decoder is in the middle of a bitpacked set. Active
+	//               when setSize>0. setSize decreases by 1 each time Decode is
+	//               called, and the next bitpacked value in the set is
+	//               returned.
+	//
+	// bitmapDecoder always starts in the READY state, and the header it pulls
+	// determines whether it moves to RLE or BITPACK-READY. After fully consuming
+	// a run, it reverts back to READY.
+
+	runValue  uint64 // Value of the current RLE run.
+	runLength uint64 // Remaining values in the current RLE run.
+
+	sets     int    // Number of bitpacked sets left to read, each of which contains 8 elements.
+	setWidth int    // Number of bits to use for each value. Must be no greater than 64.
+	setSize  byte   // Number of values left in the current bitpacked set.
+	set      []byte // Current set of bitpacked values.
+}
+
+// newBitmapDecoder creates a new bitmap decoder that reads encoded numbers
+// from r.
+func newBitmapDecoder(r streamio.Reader) *bitmapDecoder {
+	return &bitmapDecoder{r: r}
+}
+
+// ValueType returns [datasetmd.VALUE_TYPE_UINT64].
+func (dec *bitmapDecoder) ValueType() datasetmd.ValueType {
+	return datasetmd.VALUE_TYPE_UINT64
+}
+
+// EncodingType returns [datasetmd.ENCODING_TYPE_BITMAP].
+func (dec *bitmapDecoder) EncodingType() datasetmd.EncodingType {
+	return datasetmd.ENCODING_TYPE_BITMAP
+}
+
+// Decode reads the next uint64 value from the stream.
+func (dec *bitmapDecoder) Decode() (Value, error) {
+	// See comment inside [bitmapDecoder] for the state machine details.
+
+NextState:
+	switch {
+	case dec.runLength == 0 && dec.sets == 0 && dec.setSize == 0: // READY
+		if err := dec.readHeader(); err != nil {
+			return Uint64Value(0), fmt.Errorf("reading header: %w", err)
+		}
+		goto NextState
+
+	case dec.runLength > 0: // RLE
+		dec.runLength--
+		return Uint64Value(dec.runValue), nil
+
+	case dec.sets > 0 && dec.setSize == 0: // BITPACK-READY
+		if err := dec.nextBitpackSet(); err != nil {
+			return Uint64Value(0), fmt.Errorf("reading bitpacked set: %w", err)
+		}
+		goto NextState
+
+	case dec.setSize > 0: // BITPACK-SET
+		elem := 8 - dec.setSize
+
+		var val uint64
+		for b := 0; b < dec.setWidth; b++ {
+			// Bit b of element elem lives in byte (elem*width + b) / 8 of the set,
+			// at bit offset (elem*width + b) % 8 within that byte.
+			i := (int(elem)*dec.setWidth + b) / 8
+			offset := (int(elem)*dec.setWidth + b) % 8
+			bitValue := dec.set[i] & (1 << offset) >> offset
+
+			val |= uint64(bitValue) << b
+		}
+
+		dec.setSize--
+		return Uint64Value(val), nil
+
+	default:
+		panic("dataset.bitmapDecoder: invalid state")
+	}
+}
+
+// readHeader reads the next header from the stream.
+func (dec *bitmapDecoder) readHeader() error {
+	// Read the next uvarint.
+	header, err := streamio.ReadUvarint(dec.r)
+	if err != nil {
+		return err
+	}
+
+	if header&1 == 1 {
+		// Start of a bitpacked run.
+		dec.sets = int(header >> 7)
+		dec.setWidth = int((header>>1)&0x3f) + 1
+		dec.setSize = 0 // Sets will be loaded in [bitmapDecoder.nextBitpackSet].
+		dec.set = make([]byte, dec.setWidth)
+	} else {
+		// RLE run.
+		runLength := header >> 1
+
+		val, err := streamio.ReadUvarint(dec.r)
+		if err != nil {
+			return err
+		}
+
+		dec.runLength = runLength
+		dec.runValue = val
+	}
+
+	return nil
+}
+
+// nextBitpackSet loads the next bitpack set and decrements the sets counter.
+func (dec *bitmapDecoder) nextBitpackSet() error {
+	if dec.sets == 0 {
+		return fmt.Errorf("no bitpacked sets left")
+	}
+
+	// dec.set is allocated in [bitmapDecoder.readHeader].
+	if _, err := io.ReadFull(dec.r, dec.set); err != nil {
+		return err
+	}
+
+	dec.setSize = 8 // Always 8 elements in each set.
+	dec.sets--
+	return nil
+}
+
+// Reset resets dec to read from r.
+func (dec *bitmapDecoder) Reset(r streamio.Reader) {
+	dec.r = r
+	dec.runValue = 0
+	dec.runLength = 0
+	dec.sets = 0
+	dec.setWidth = 0
+	dec.setSize = 0
+	dec.set = nil
+}
diff --git a/pkg/dataobj/internal/dataset/value_encoding_bitmap_test.go b/pkg/dataobj/internal/dataset/value_encoding_bitmap_test.go
new file mode 100644
index 0000000000000..34264b0ea4867
--- /dev/null
+++ b/pkg/dataobj/internal/dataset/value_encoding_bitmap_test.go
@@ -0,0 +1,265 @@
+package dataset
+
+import (
+	"bytes"
+	"math"
+	"math/rand"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func Test_bitmap(t *testing.T) {
+	var buf bytes.Buffer
+
+	var (
+		enc = newBitmapEncoder(&buf)
+		dec = newBitmapDecoder(&buf)
+	)
+
+	count := 1500
+	for i := 0; i < count; i++ {
+		require.NoError(t, enc.Encode(Uint64Value(uint64(1))))
+	}
+	require.NoError(t, enc.Flush())
+
+	t.Logf("Buffer size: %d", buf.Len())
+
+	for range count {
+		v, err := dec.Decode()
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), v.Uint64())
+	}
+}
+
+func Test_bitmap_bitpacking(t *testing.T) {
+	var buf bytes.Buffer
+
+	var (
+		enc = newBitmapEncoder(&buf)
+		dec = newBitmapDecoder(&buf)
+	)
+
+	expect := []uint64{0, 1, 2, 3, 4, 5, 6, 7}
+	for _, v := range expect {
+		require.NoError(t, enc.Encode(Uint64Value(v)))
+	}
+	require.NoError(t, enc.Flush())
+
+	var actual []uint64
+	for range len(expect) {
+		v, err := dec.Decode()
+		require.NoError(t, err)
+		actual = append(actual, v.Uint64())
+	}
+	require.NoError(t, enc.Flush())
+
+	require.Equal(t, expect, actual)
+}
+
+func Test_bitmap_bitpacking_partial(t *testing.T) {
+	var buf bytes.Buffer
+
+	var (
+		enc = newBitmapEncoder(&buf)
+		dec = newBitmapDecoder(&buf)
+	)
+
+	expect := []uint64{0, 1, 2, 3, 4}
+	for _, v := range expect {
+		require.NoError(t, enc.Encode(Uint64Value(v)))
+	}
+	require.NoError(t, enc.Flush())
+
+	var actual []uint64
+	for range len(expect) {
+		v, err := dec.Decode()
+		require.NoError(t, err)
+		actual = append(actual, v.Uint64())
+	}
+
+	require.Equal(t, expect, actual)
+}
+
+func Fuzz_bitmap(f *testing.F) {
+	f.Add(int64(775972800), 1, 10)
+	f.Add(int64(758350800), 8, 25)
+	f.Add(int64(1718425412), 32, 50)
+	f.Add(int64(1734130411), 64, 75)
+
+	f.Fuzz(func(t *testing.T, seed int64, width int, count int) {
+		if width < 1 || width > 64 {
+			t.Skip()
+		} else if count <= 0 {
+			t.Skip()
+		}
+
+		rnd := rand.New(rand.NewSource(seed))
+
+		var buf bytes.Buffer
+
+		var (
+			enc = newBitmapEncoder(&buf)
+			dec = newBitmapDecoder(&buf)
+		)
+
+		var numbers []uint64
+		for i := 0; i < count; i++ {
+			var mask uint64 = math.MaxUint64
+			if width < 64 {
+				mask = (1 << width) - 1
+			}
+
+			v := uint64(rnd.Int63()) & mask
+			numbers = append(numbers, v)
+			require.NoError(t, enc.Encode(Uint64Value(v)))
+		}
+		require.NoError(t, enc.Flush())
+
+		var actual []uint64
+		for i := 0; i < count; i++ {
+			v, err := dec.Decode()
+			require.NoError(t, err)
+			actual = append(actual, v.Uint64())
+		}
+
+		require.Equal(t, numbers, actual)
+	})
+}
+
+func Benchmark_bitmapEncoder(b *testing.B) {
+	b.Run("width=1", func(b *testing.B) { benchmarkBitmapEncoder(b, 1) })
+	b.Run("width=3", func(b *testing.B) { benchmarkBitmapEncoder(b, 3) })
+	b.Run("width=5", func(b *testing.B) { benchmarkBitmapEncoder(b, 5) })
+	b.Run("width=8", func(b *testing.B) { benchmarkBitmapEncoder(b, 8) })
+	b.Run("width=32", func(b *testing.B) { benchmarkBitmapEncoder(b, 32) })
+	b.Run("width=64", func(b *testing.B) { benchmarkBitmapEncoder(b, 64) })
+}
+
+func benchmarkBitmapEncoder(b *testing.B, width int) {
+	b.Run("variance=none", func(b *testing.B) {
+		var cw countingWriter
+		enc := newBitmapEncoder(&cw)
+
+		b.ResetTimer()
+
+		for i := 0; i < b.N; i++ {
+			_ = enc.Encode(Uint64Value(1))
+		}
+		_ = enc.Flush()
+
+		b.ReportMetric(float64(cw.n), "encoded_bytes")
+	})
+
+	b.Run("variance=alternating", func(b *testing.B) {
+		var cw countingWriter
+		enc := newBitmapEncoder(&cw)
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			_ = enc.Encode(Uint64Value(uint64(i % width)))
+		}
+		_ = enc.Flush()
+
+		b.ReportMetric(float64(cw.n), "encoded_bytes")
+	})
+
+	b.Run("variance=random", func(b *testing.B) {
+		rnd := rand.New(rand.NewSource(0))
+
+		var cw countingWriter
+		enc := newBitmapEncoder(&cw)
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			_ = enc.Encode(Uint64Value(uint64(rnd.Int63()) % uint64(width)))
+		}
+		_ = enc.Flush()
+
+		b.ReportMetric(float64(cw.n), "encoded_bytes")
+	})
+}
+
+func Benchmark_bitmapDecoder(b *testing.B) {
+	b.Run("width=1", func(b *testing.B) { benchmarkBitmapDecoder(b, 1) })
+	b.Run("width=3", func(b *testing.B) { benchmarkBitmapDecoder(b, 3) })
+	b.Run("width=5", func(b *testing.B) { benchmarkBitmapDecoder(b, 5) })
+	b.Run("width=8", func(b *testing.B) { benchmarkBitmapDecoder(b, 8) })
+	b.Run("width=32", func(b *testing.B) { benchmarkBitmapDecoder(b, 32) })
+	b.Run("width=64", func(b *testing.B) { benchmarkBitmapDecoder(b, 64) })
+}
+
+func benchmarkBitmapDecoder(b *testing.B, width int) {
+	b.Run("variance=none", func(b *testing.B) {
+		var buf bytes.Buffer
+
+		var (
+			enc = newBitmapEncoder(&buf)
+			dec = newBitmapDecoder(&buf)
+		)
+
+		for i := 0; i < b.N; i++ {
+			_ = enc.Encode(Uint64Value(1))
+		}
+		_ = enc.Flush()
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			_, _ = dec.Decode()
+		}
+	})
+
+	b.Run("variance=alternating", func(b *testing.B) {
+		var buf bytes.Buffer
+
+		var (
+			enc = newBitmapEncoder(&buf)
+			dec = newBitmapDecoder(&buf)
+		)
+
+		for i := 0; i < b.N; i++ {
+			_ = enc.Encode(Uint64Value(uint64(i % width)))
+		}
+		_ = enc.Flush()
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			_, _ = dec.Decode()
+		}
+	})
+
+	b.Run("variance=random", func(b *testing.B) {
+		rnd := rand.New(rand.NewSource(0))
+		var buf bytes.Buffer
+
+		var (
+			enc = newBitmapEncoder(&buf)
+			dec = newBitmapDecoder(&buf)
+		)
+
+		for i := 0; i < b.N; i++ {
+			_ = enc.Encode(Uint64Value(uint64(rnd.Int63()) % uint64(width)))
+		}
+		_ = enc.Flush()
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			_, _ = dec.Decode()
+		}
+	})
+}
+
+type countingWriter struct {
+	n int64
+}
+
+func (w *countingWriter) Write(p []byte) (n int, err error) {
+	n = len(p)
w.n += int64(n) + return +} + +func (w *countingWriter) WriteByte(_ byte) error { + w.n++ + return nil +} diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go index a395f51a24713..52f2a35c53e9d 100644 --- a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go +++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go @@ -65,18 +65,23 @@ const ( // Delta encoding. The first value within the page is stored as-is, and // subsequent values are stored as the delta from the previous value. ENCODING_TYPE_DELTA EncodingType = 2 + // Bitmap encoding. Bitmaps effiently store repeating sequences of unsigned + // integers using a combination of run-length encoding and bitpacking. + ENCODING_TYPE_BITMAP EncodingType = 3 ) var EncodingType_name = map[int32]string{ 0: "ENCODING_TYPE_UNSPECIFIED", 1: "ENCODING_TYPE_PLAIN", 2: "ENCODING_TYPE_DELTA", + 3: "ENCODING_TYPE_BITMAP", } var EncodingType_value = map[string]int32{ "ENCODING_TYPE_UNSPECIFIED": 0, "ENCODING_TYPE_PLAIN": 1, "ENCODING_TYPE_DELTA": 2, + "ENCODING_TYPE_BITMAP": 3, } func (EncodingType) EnumDescriptor() ([]byte, []int) { @@ -93,7 +98,7 @@ func init() { } var fileDescriptor_7ab9d5b21b743868 = []byte{ - // 296 bytes of a gzipped FileDescriptorProto + // 310 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x2f, 0xc8, 0x4e, 0xd7, 0x4f, 0x49, 0x2c, 0x49, 0xcc, 0x4f, 0xca, 0xd2, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0xd1, 0xcf, 0x4d, 0x2d, 0x49, 0x04, 0x09, 0x82, 0x65, 0x8a, 0x53, 0x4b, 0x72, 0x53, 0x10, 0x2c, @@ -102,17 +107,18 @@ var fileDescriptor_7ab9d5b21b743868 = []byte{ 0x21, 0x29, 0x2e, 0xb1, 0x30, 0x47, 0x9f, 0x50, 0xd7, 0xf8, 0x90, 0xc8, 0x00, 0xd7, 0xf8, 0x50, 0xbf, 0xe0, 0x00, 0x57, 0x67, 0x4f, 0x37, 0x4f, 0x57, 0x17, 0x01, 0x06, 0x21, 0x11, 0x2e, 0x01, 0x24, 0x39, 0x4f, 0xbf, 0x10, 0x33, 0x13, 0x01, 0x46, 0x21, 0x51, 0x2e, 0x41, 0x64, 0x1d, 0x10, - 0x61, 0x26, 0x34, 0xe1, 0xe0, 0x90, 0x20, 0x4f, 0x3f, 0x77, 0x01, 0x66, 0xad, 0x78, 0x2e, 0x1e, + 0x61, 0x26, 0x34, 0xe1, 0xe0, 0x90, 0x20, 0x4f, 0x3f, 0x77, 0x01, 0x66, 0xad, 0x4a, 0x2e, 0x1e, 0xd7, 0xbc, 0xe4, 0xfc, 0x94, 0xcc, 0xbc, 0x74, 0xb0, 0x7d, 0xb2, 0x5c, 0x92, 0xae, 0x7e, 0xce, 0xfe, 0x2e, 0x9e, 0x7e, 0xee, 0xd8, 0xac, 0x14, 0xe7, 0x12, 0x46, 0x95, 0x0e, 0xf0, 0x71, 0xf4, - 0xf4, 0x13, 0x60, 0xc4, 0x94, 0x70, 0x71, 0xf5, 0x09, 0x71, 0x14, 0x60, 0x72, 0xaa, 0xb8, 0xf0, - 0x50, 0x8e, 0xe1, 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, 0x31, 0xae, - 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, - 0xbe, 0x78, 0x24, 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, - 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0xe5, 0x94, 0x9e, 0x59, 0x92, 0x51, 0x9a, 0xa4, 0x97, 0x9c, - 0x9f, 0xab, 0x9f, 0x5e, 0x94, 0x98, 0x96, 0x98, 0x97, 0xa8, 0x9f, 0x93, 0x9f, 0x9d, 0xa9, 0x5f, - 0x66, 0xac, 0x4f, 0x64, 0xa0, 0x27, 0xb1, 0x81, 0xc3, 0xda, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, - 0x03, 0x3b, 0xd9, 0x58, 0xa6, 0x01, 0x00, 0x00, + 0xf4, 0x13, 0x60, 0xc4, 0x94, 0x70, 0x71, 0xf5, 0x09, 0x71, 0x14, 0x60, 0x12, 0x92, 0xe0, 0x12, + 0x41, 0x95, 0x70, 0xf2, 0x0c, 0xf1, 0x75, 0x0c, 0x10, 0x60, 0x76, 0xaa, 0xb8, 0xf0, 0x50, 0x8e, + 0xe1, 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, 0x31, 0xae, 0x78, 0x24, + 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0xbe, 0x78, + 0x24, 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 
+	0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, 0x31, 0xdc,
+	0x78, 0x2c, 0xc7, 0x10, 0xe5, 0x94, 0x9e, 0x59, 0x92, 0x51, 0x9a, 0xa4, 0x97, 0x9c, 0x9f, 0xab,
+	0x9f, 0x5e, 0x94, 0x98, 0x96, 0x98, 0x97, 0xa8, 0x9f, 0x93, 0x9f, 0x9d, 0xa9, 0x5f, 0x66, 0xac,
+	0x4f, 0x64, 0x74, 0x24, 0xb1, 0x81, 0x63, 0xc1, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xee, 0xfd,
+	0x13, 0xfe, 0xc0, 0x01, 0x00, 0x00,
 }
 
 func (x ValueType) String() string {
diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto
index eeb300d03da49..478f74037871f 100644
--- a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto
+++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto
@@ -32,4 +32,8 @@ enum EncodingType {
   // Delta encoding. The first value within the page is stored as-is, and
   // subsequent values are stored as the delta from the previous value.
   ENCODING_TYPE_DELTA = 2;
+
+  // Bitmap encoding. Bitmaps efficiently store repeating sequences of unsigned
+  // integers using a combination of run-length encoding and bitpacking.
+  ENCODING_TYPE_BITMAP = 3;
 }
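
Two reviewer notes with illustrative code; neither snippet is part of the patch. First, a minimal round-trip sketch of the intended call pattern, written as a hypothetical test living in the same dataset package (the encoder and decoder are unexported, and bytes.Buffer satisfies streamio.Writer/streamio.Reader, as in the tests above). It also shows why the stream needs an external value count, since the encoding carries no length:

package dataset

import (
	"bytes"
	"testing"

	"github.com/stretchr/testify/require"
)

// TestBitmapRoundTripSketch is a hypothetical example, not in the patch:
// stream values into the encoder, Flush to terminate the final run, then
// decode exactly as many values as were written.
func TestBitmapRoundTripSketch(t *testing.T) {
	var buf bytes.Buffer
	enc := newBitmapEncoder(&buf)

	// 40 copies of the same value collapse into one RLE run; the trailing
	// values 0..7 form a single bitpacked set with a 3-bit width.
	input := make([]uint64, 0, 48)
	for i := 0; i < 40; i++ {
		input = append(input, 9)
	}
	for v := uint64(0); v < 8; v++ {
		input = append(input, v)
	}

	for _, v := range input {
		require.NoError(t, enc.Encode(Uint64Value(v)))
	}
	require.NoError(t, enc.Flush())

	// The encoding doesn't carry its own length, so the reader must know how
	// many values to decode.
	dec := newBitmapDecoder(&buf)
	for _, want := range input {
		got, err := dec.Decode()
		require.NoError(t, err)
		require.Equal(t, want, got.Uint64())
	}
}

Second, a tiny standalone illustration of the bit layout documented in flushBitpacked. packWidth3 is a hypothetical helper; the expected output bytes come straight from the worked example in the comments (0x88, 0xC6, 0xFA for the values 0..7 at width 3):

// packWidth3 packs the eight 3-bit values 0..7 using the output-bit to
// input-bit mapping described in flushBitpacked.
func packWidth3() [3]byte {
	values := [8]uint64{0, 1, 2, 3, 4, 5, 6, 7}
	const width = 3

	var out [3]byte
	for outputByte := 0; outputByte < width; outputByte++ {
		for i := 0; i < 8; i++ {
			outputBit := outputByte*8 + i
			// Output bit comes from element outputBit/width, bit outputBit%width.
			if values[outputBit/width]&(1<<(outputBit%width)) != 0 {
				out[outputByte] |= 1 << i
			}
		}
	}
	return out // [0x88, 0xC6, 0xFA]
}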