Skip to content

Commit

Permalink
chore(dataobj): initial commit of value encoding (#15606)
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto authored Jan 7, 2025
1 parent 7033091 commit 9a21590
Show file tree
Hide file tree
Showing 11 changed files with 968 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/dataobj/internal/dataset/dataset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package dataset contains utilities for working with datasets. Datasets hold
// columnar data across multiple pages.
package dataset
125 changes: 125 additions & 0 deletions pkg/dataobj/internal/dataset/value.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package dataset

import (
"fmt"
"unsafe"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)

// Helper types
type (
// stringptr points at the first byte of a string's data. A [Value] keeps a
// stringptr in its any field (with the string length in num) instead of
// holding a string directly.
stringptr *byte
)

// A Value represents a single value within a dataset. Unlike [any], Values can
// be constructed without allocations. The zero Value corresponds to nil.
type Value struct {
// The internal representation of Value is based on log/slog.Value, which is
// also designed to avoid allocations.
//
// While usage of any typically causes an allocation (due to any being a fat
// pointer), our usage avoids it:
//
// * Go will avoid allocating integer values that can be stored in a single
// byte, which applies to datasetmd.ValueType.
//
// * If any is referring to a pointer, then wrapping the pointer in an any
// does not cause an allocation. This is why we use stringptr instead of a
// string.

_ [0]func() // Disallow equality checking of two Values: func types are not comparable.

// num holds the value for numeric types, or the string length for string
// types.
num uint64

// If any is nil, the Value is nil.
//
// If any is of type [datasetmd.ValueType], then the value is in num as
// described above.
//
// If any is of type stringptr, then the value is of type
// [datasetmd.VALUE_TYPE_STRING] and the string value consists of the length
// in num and the pointer in any.
any any
}

// Int64Value returns a [Value] for an int64. The integer's bits are kept in
// num and the Value is tagged with [datasetmd.VALUE_TYPE_INT64].
func Int64Value(v int64) Value {
	var val Value
	val.any = datasetmd.VALUE_TYPE_INT64
	val.num = uint64(v)
	return val
}

// Uint64Value returns a [Value] for a uint64. The integer is kept in num and
// the Value is tagged with [datasetmd.VALUE_TYPE_UINT64].
func Uint64Value(v uint64) Value {
	var val Value
	val.any = datasetmd.VALUE_TYPE_UINT64
	val.num = v
	return val
}

// StringValue returns a [Value] for a string. The Value stores a pointer to
// the string's data along with its length in bytes; the bytes of v are not
// copied.
func StringValue(v string) Value {
	data := stringptr(unsafe.StringData(v))
	size := uint64(len(v))
	return Value{any: data, num: size}
}

// IsNil reports whether v is nil, that is, whether it carries neither a type
// tag nor a string pointer.
func (v Value) IsNil() bool {
	hasPayload := v.any != nil
	return !hasPayload
}

// IsZero reports whether v is the zero value.
func (v Value) IsZero() bool {
	// num is the number itself for numeric Values and the length for string
	// Values, so a num of 0 covers both a numeric zero and an empty string.
	zero := v.num == 0
	return zero
}

// Type returns the [datasetmd.ValueType] of v. If v is nil, Type returns
// [datasetmd.VALUE_TYPE_UNSPECIFIED]. Type panics if v holds anything other
// than a type tag or a string pointer.
func (v Value) Type() datasetmd.ValueType {
	switch tag := v.any.(type) {
	case nil:
		// A nil Value has no tag at all.
		return datasetmd.VALUE_TYPE_UNSPECIFIED
	case datasetmd.ValueType:
		// Numeric Values store their type directly.
		return tag
	case stringptr:
		return datasetmd.VALUE_TYPE_STRING
	default:
		panic(fmt.Sprintf("dataset.Value has unexpected type %T", tag))
	}
}

// Int64 returns v's value as an int64. It panics if v is not a
// [datasetmd.VALUE_TYPE_INT64].
func (v Value) Int64() int64 {
	actual := v.Type()
	if actual != datasetmd.VALUE_TYPE_INT64 {
		panic(fmt.Sprintf("dataset.Value type is %s, not %s", actual, datasetmd.VALUE_TYPE_INT64))
	}
	return int64(v.num)
}

// Uint64 returns v's value as a uint64. It panics if v is not a
// [datasetmd.VALUE_TYPE_UINT64].
func (v Value) Uint64() uint64 {
	actual := v.Type()
	if actual != datasetmd.VALUE_TYPE_UINT64 {
		panic(fmt.Sprintf("dataset.Value type is %s, not %s", actual, datasetmd.VALUE_TYPE_UINT64))
	}
	return v.num
}

// String returns v's value as a string. Because of Go's String method
// convention, if v is not a string, String does not panic; it returns a string
// of the form "VALUE_TYPE_T", where T is the underlying type of v.
func (v Value) String() string {
	sp, isString := v.any.(stringptr)
	if !isString {
		return v.Type().String()
	}
	// Rebuild the string from the stored data pointer and byte length.
	return unsafe.String(sp, v.num)
}
123 changes: 123 additions & 0 deletions pkg/dataobj/internal/dataset/value_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package dataset

import (
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)

// A valueEncoder encodes sequences of [Value], writing them to an underlying
// [streamio.Writer]. Implementations of encoding types must call
// registerValueEncoding to register themselves.
type valueEncoder interface {
// ValueType returns the type of values supported by the valueEncoder.
// Together with EncodingType it matches the pair the implementation is
// registered under.
ValueType() datasetmd.ValueType

// EncodingType returns the encoding type used by the valueEncoder.
EncodingType() datasetmd.EncodingType

// Encode encodes an individual [Value]. Encode returns an error if encoding
// fails or if value is an unsupported type.
Encode(value Value) error

// Flush encodes any buffered data and immediately writes it to the
// underlying [streamio.Writer]. Flush returns an error if encoding fails.
// Implementations that do not buffer may treat Flush as a no-op.
Flush() error

// Reset discards any state and resets the valueEncoder to write to w. This
// permits reusing a valueEncoder rather than allocating a new one.
Reset(w streamio.Writer)
}

// A valueDecoder decodes sequences of [Value] from an underlying
// [streamio.Reader]. Implementations of encoding types must call
// registerValueEncoding to register themselves.
type valueDecoder interface {
// ValueType returns the type of values supported by the valueDecoder.
// Together with EncodingType it matches the pair the implementation is
// registered under.
ValueType() datasetmd.ValueType

// EncodingType returns the encoding type used by the valueDecoder.
EncodingType() datasetmd.EncodingType

// Decode decodes an individual [Value]. Decode returns an error if decoding
// fails.
Decode() (Value, error)

// Reset discards any state and resets the valueDecoder to read from r. This
// permits reusing a valueDecoder rather than allocating a new one.
Reset(r streamio.Reader)
}

// registry stores known value encoders and decoders. We use a global variable
// to track implementations to allow encoding implementations to be
// self-contained in a single file.
//
// Entries are added by registerValueEncoding and looked up by newValueEncoder
// and newValueDecoder.
var registry = map[registryKey]registryEntry{}

type (
// registryKey identifies an encoding implementation by the value type it
// handles and the encoding type it produces.
registryKey struct {
Value datasetmd.ValueType
Encoding datasetmd.EncodingType
}

// registryEntry holds the constructors registered for a registryKey.
registryEntry struct {
NewEncoder func(streamio.Writer) valueEncoder
NewDecoder func(streamio.Reader) valueDecoder
}
)

// registerValueEncoding registers the constructors for a [valueEncoder] and
// [valueDecoder] under the given valueType and encodingType pair. It panics if
// an encoding has already been registered for the same pair.
//
// registerValueEncoding should be called in an init method of files
// implementing encodings.
func registerValueEncoding(
	valueType datasetmd.ValueType,
	encodingType datasetmd.EncodingType,
	newEncoder func(streamio.Writer) valueEncoder,
	newDecoder func(streamio.Reader) valueDecoder,
) {
	key := registryKey{Value: valueType, Encoding: encodingType}

	// Registering the same pair twice is a programming error.
	_, duplicate := registry[key]
	if duplicate {
		panic(fmt.Sprintf("dataset: registerValueEncoding already called for %s/%s", valueType, encodingType))
	}

	registry[key] = registryEntry{NewEncoder: newEncoder, NewDecoder: newDecoder}
}

// newValueEncoder creates a new valueEncoder for the specified valueType and
// encodingType that writes to w. If no encoding is registered for the
// specified combination of valueType and encodingType, newValueEncoder returns
// nil and false.
func newValueEncoder(valueType datasetmd.ValueType, encodingType datasetmd.EncodingType, w streamio.Writer) (valueEncoder, bool) {
	if entry, ok := registry[registryKey{Value: valueType, Encoding: encodingType}]; ok {
		return entry.NewEncoder(w), true
	}
	return nil, false
}

// newValueDecoder creates a new valueDecoder for the specified valueType and
// encodingType that reads from r. If no encoding is registered for the
// specified combination of valueType and encodingType, newValueDecoder returns
// nil and false.
func newValueDecoder(valueType datasetmd.ValueType, encodingType datasetmd.EncodingType, r streamio.Reader) (valueDecoder, bool) {
	if entry, ok := registry[registryKey{Value: valueType, Encoding: encodingType}]; ok {
		return entry.NewDecoder(r), true
	}
	return nil, false
}
110 changes: 110 additions & 0 deletions pkg/dataobj/internal/dataset/value_encoding_delta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package dataset

import (
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)

func init() {
// Register the encoding so instances of it can be dynamically created.
registerValueEncoding(
datasetmd.VALUE_TYPE_INT64,
datasetmd.ENCODING_TYPE_DELTA,
func(w streamio.Writer) valueEncoder { return newDeltaEncoder(w) },
func(r streamio.Reader) valueDecoder { return newDeltaDecoder(r) },
)
}

// deltaEncoder encodes delta-encoded int64s. Values are encoded as varint,
// with each subsequent value being the delta from the previous value. The
// first value after a Reset is encoded as its delta from 0.
type deltaEncoder struct {
// w is the destination for encoded deltas.
w streamio.Writer
// prev is the most recently encoded value; it is 0 after Reset.
prev int64
}

// Compile-time check that *deltaEncoder implements valueEncoder.
var _ valueEncoder = (*deltaEncoder)(nil)

// newDeltaEncoder creates a deltaEncoder that writes encoded numbers to w. The
// returned encoder starts in the same state Reset leaves it in.
func newDeltaEncoder(w streamio.Writer) *deltaEncoder {
	enc := new(deltaEncoder)
	enc.Reset(w)
	return enc
}

// ValueType returns [datasetmd.VALUE_TYPE_INT64], the only value type the
// deltaEncoder accepts.
func (*deltaEncoder) ValueType() datasetmd.ValueType {
	return datasetmd.VALUE_TYPE_INT64
}

// EncodingType returns [datasetmd.ENCODING_TYPE_DELTA], the encoding produced
// by the deltaEncoder.
func (*deltaEncoder) EncodingType() datasetmd.EncodingType {
	return datasetmd.ENCODING_TYPE_DELTA
}

// Encode encodes a new value by writing its difference from the previously
// encoded value as a varint. It returns an error if v is not a
// [datasetmd.VALUE_TYPE_INT64] or if the write fails.
func (enc *deltaEncoder) Encode(v Value) error {
	if typ := v.Type(); typ != datasetmd.VALUE_TYPE_INT64 {
		return fmt.Errorf("delta: invalid value type %v", typ)
	}

	cur := v.Int64()
	last := enc.prev

	// The new value is remembered before writing, so prev advances even if
	// the write below returns an error.
	enc.prev = cur
	return streamio.WriteVarint(enc.w, cur-last)
}

// Flush implements [valueEncoder]. It is a no-op for deltaEncoder and always
// returns nil, since Encode writes every value immediately.
func (*deltaEncoder) Flush() error {
	return nil
}

// Reset resets the encoder to its initial state: the previous value is
// cleared to 0 and subsequent values are written to w.
func (enc *deltaEncoder) Reset(w streamio.Writer) {
	enc.w, enc.prev = w, 0
}

// deltaDecoder decodes delta-encoded numbers. Values are decoded as varint,
// with each subsequent value being the delta from the previous value. The
// first value after a Reset is decoded as a delta from 0.
type deltaDecoder struct {
// r is the source of encoded deltas.
r streamio.Reader
// prev is the most recently decoded value; it is 0 after Reset.
prev int64
}

// Compile-time check that *deltaDecoder implements valueDecoder.
var _ valueDecoder = (*deltaDecoder)(nil)

// newDeltaDecoder creates a deltaDecoder that reads encoded numbers from r.
// The returned decoder starts in the same state Reset leaves it in.
func newDeltaDecoder(r streamio.Reader) *deltaDecoder {
	dec := new(deltaDecoder)
	dec.Reset(r)
	return dec
}

// ValueType returns [datasetmd.VALUE_TYPE_INT64], the type of every value the
// deltaDecoder produces.
func (*deltaDecoder) ValueType() datasetmd.ValueType {
	return datasetmd.VALUE_TYPE_INT64
}

// EncodingType returns [datasetmd.ENCODING_TYPE_DELTA], the encoding read by
// the deltaDecoder.
func (*deltaDecoder) EncodingType() datasetmd.EncodingType {
	return datasetmd.ENCODING_TYPE_DELTA
}

// Decode decodes the next value by reading a varint delta and adding it to the
// previously decoded value.
//
// If reading fails, the error is returned together with a Value holding the
// last successfully decoded number (0 if nothing has been decoded since the
// last Reset), and the decoder's state is left unchanged.
func (dec *deltaDecoder) Decode() (Value, error) {
	delta, err := streamio.ReadVarint(dec.r)
	if err == nil {
		// Only advance the running value when a delta was actually read.
		dec.prev += delta
	}
	return Int64Value(dec.prev), err
}

// Reset resets the deltaDecoder to its initial state: the previous value is
// cleared to 0 and subsequent values are read from r.
func (dec *deltaDecoder) Reset(r streamio.Reader) {
	dec.r, dec.prev = r, 0
}
Loading

0 comments on commit 9a21590

Please sign in to comment.