Skip to content

Commit

Permalink
bug fix: listpack backlen;
Browse files Browse the repository at this point in the history
optimize listpack parsing speed
  • Loading branch information
HDT3213 committed Jan 25, 2024
1 parent eac5035 commit 83d771e
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 84 deletions.
5 changes: 3 additions & 2 deletions core/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"encoding/binary"
"errors"

"github.com/hdt3213/rdb/model"
)

Expand Down Expand Up @@ -163,11 +164,11 @@ func (dec *Decoder) readListPackHash() (map[string][]byte, *model.ListpackDetail
size := readListPackLength(buf, &cursor)
m := make(map[string][]byte)
for i := 0; i < size; i += 2 {
key, _, err := dec.readListPackEntry(buf, &cursor)
key, err := dec.readListPackEntryAsString(buf, &cursor)
if err != nil {
return nil, nil, err
}
val, _, err := dec.readListPackEntry(buf, &cursor)
val, err := dec.readListPackEntryAsString(buf, &cursor)
if err != nil {
return nil, nil, err
}
Expand Down
175 changes: 105 additions & 70 deletions core/listpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ func (dec *Decoder) readListPack() ([][]byte, []uint32, error) {
entries := make([][]byte, 0, size)
entrySizes := make([]uint32, 0, size)
for i := 0; i < size; i++ {
entry, length, err := dec.readListPackEntry(buf, &cursor)
str, intval, length, err := dec.readListPackEntry(buf, &cursor)
if err != nil {
return nil, nil, err
}
entries = append(entries, entry)
if str == nil {
str = []byte(strconv.FormatInt(intval, 10))
}
entries = append(entries, str)
entrySizes = append(entrySizes, length)
}
return entries, entrySizes, nil
Expand All @@ -37,135 +40,167 @@ func readListPackLength(buf []byte, cursor *int) int {
return size
}

func readVarInt(buf []byte, cursor *int) uint32 {
var v uint32
shift := 0
for *cursor < len(buf) {
x := buf[*cursor]
*cursor++
v |= uint32(x&0x7f) << shift
shift += 7
if x&0x80 == 0 {
break
}
func getBackLen(elementLen uint32) uint32 {
if elementLen <= 127 {
return 1
} else if elementLen < (1<<14)-1 {
return 2
} else if elementLen < (1<<21)-1 {
return 3
} else if elementLen < (1<<28)-1 {
return 4
} else {
return 5
}
return v
}

// readListPackEntry returns: content(string), length, error
func (dec *Decoder) readListPackEntry(buf []byte, cursor *int) ([]byte, uint32, error) {

// readListPackEntry returns: string content, int content, entry length(encoding+content+backlen), error
func (dec *Decoder) readListPackEntry(buf []byte, cursor *int) ([]byte, int64, uint32, error) {
header, err := readByte(buf, cursor)
if err != nil {
return nil, 0, err
return nil, 0, 0, err
}
var result []byte
var length uint32
switch header >> 6 {
case 0, 1: // 0xxx xxxx -> uint7 [0, 127]
result = []byte(strconv.FormatInt(int64(int8(header)), 10))
length = readVarInt(buf, cursor) // read element length
return result, length, nil
case 2: // 10xx xxxx -> str, len<= 63
case 0, 1: // 0xxxxxxx, uint7
result := int64(int8(header))
var contentLen uint32 = 1
backlen := getBackLen(contentLen)
*cursor += int(backlen)
return nil, result, contentLen + backlen, nil
case 2: // 10xxxxxx + content, string(len<=63)
strLen := int(header & 0x3f)
result, err := readBytes(buf, cursor, strLen)
if err != nil {
return nil, 0, err
return nil, 0, 0, err
}
length = readVarInt(buf, cursor) // read element length
return result, length, nil
var contentLen = uint32(1 + strLen)
backlen := getBackLen(contentLen)
*cursor += int(backlen)
return result, 0, contentLen + backlen, nil
}
// assert header == 11xx xxxx
// assert header == 11xxxxxx
switch header >> 4 {
case 12, 13: // 110x xxxx -> int13
case 12, 13: // 110xxxxx yyyyyyyy, int13
// see https://github.com/CN-annotation-team/redis7.0-chinese-annotated/blob/fba43c524524cbdb54955a28af228b513420d78d/src/listpack.c#L586
next, err := readByte(buf, cursor)
if err != nil {
return nil, 0, err
return nil, 0, 0, err
}
val := ((uint(header) & 0x1F) << 8) | uint(next)
if val >= uint(1<<12) {
val = -(8191 - val) - 1 // val is uint, must use -(8191 - val), val - 8191 will cause overflow
}
result = []byte(strconv.FormatInt(int64(val), 10))
length = readVarInt(buf, cursor) // read element length
return result, length, nil
case 14: // 1110 xxxx -> str, type(len) == uint12
result := int64(val)
var contentLen uint32 = 2
backlen := getBackLen(contentLen)
*cursor += int(backlen)
return nil, result, contentLen + backlen, nil
case 14: // 1110xxxx yyyyyyyy + content, string(len < 1<<12)
dec.buffer[0] = header & 0x0f
dec.buffer[1], err = readByte(buf, cursor)
if err != nil {
return nil, 0, 0, err
}
strLen := binary.BigEndian.Uint16(dec.buffer[:2])
result, err := readBytes(buf, cursor, int(strLen))
if err != nil {
return nil, 0, err
return nil, 0, 0, err
}
length = readVarInt(buf, cursor) // read element length
return result, length, nil
var contentLen = uint32(2 + strLen)
backlen := getBackLen(contentLen)
*cursor += int(backlen)
return result, 0, contentLen + backlen, nil
}
// assert header == 1111 xxxx
// assert header == 1111xxxx
switch header & 0x0f {
case 0: // 1111 0000 -> str, 4 bytes len
case 0: // 11110000 aaaaaaaa bbbbbbbb cccccccc dddddddd + content, string(len < 1<<32)
var lenBytes []byte
lenBytes, err = readBytes(buf, cursor, 4)
if err != nil {
return nil, 0, err
return nil, 0, 0, err
}
strLen := int(binary.LittleEndian.Uint32(lenBytes))
result, err := readBytes(buf, cursor, strLen)
if err != nil {
return nil, 0, err
return nil, 0, 0, err
}
length = readVarInt(buf, cursor) // read element length
return result, length, nil
case 1: // 1111 0001 -> int16
var contentLen = uint32(1 + 4 + strLen)
backlen := getBackLen(contentLen)
*cursor += int(backlen)
return result, 0, contentLen + backlen, nil
case 1: // 11110001 aaaaaaaa bbbbbbbb, int16
var bs []byte
bs, err = readBytes(buf, cursor, 2)
if err != nil {
return nil, 0, err
return nil, 0, 0, err
}
result = []byte(strconv.FormatInt(int64(int16(binary.LittleEndian.Uint16(bs))), 10))
length = readVarInt(buf, cursor)
return result, length, nil
case 2: // 1111 0010 -> int24
result := int64(int16(binary.LittleEndian.Uint16(bs)))
var contentLen uint32 = 3
backlen := getBackLen(contentLen)
*cursor += int(backlen)
return nil, result, contentLen + backlen, nil
case 2: // 11110010 aaaaaaaa bbbbbbbb cccccccc, int24
var bs []byte
bs, err = readBytes(buf, cursor, 3)
if err != nil {
return nil, 0, err
return nil, 0, 0, err
}
bs = append([]byte{0}, bs...)
result = []byte(strconv.FormatInt(int64(int32(binary.LittleEndian.Uint32(bs))>>8), 10))
length = readVarInt(buf, cursor)
return result, length, nil
result := int64(int32(binary.LittleEndian.Uint32(bs))>>8)
var contentLen uint32 = 4
backlen := getBackLen(contentLen)
*cursor += int(backlen)
return nil, result, contentLen + backlen, nil
case 3: // 1111 0011 -> int32
var bs []byte
bs, err = readBytes(buf, cursor, 4)
if err != nil {
return nil, 0, err
return nil, 0, 0, err
}
result = []byte(strconv.FormatInt(int64(int32(binary.LittleEndian.Uint32(bs))), 10))
length = readVarInt(buf, cursor)
return result, length, nil
case 4: // 1111 0100 -> int64
result := int64(int32(binary.LittleEndian.Uint32(bs)))
var contentLen uint32 = 5
backlen := getBackLen(contentLen)
*cursor += int(backlen)
return nil, result, contentLen + backlen, nil
case 4: // 11110100 8Byte -> int64
var bs []byte
bs, err = readBytes(buf, cursor, 8)
if err != nil {
return nil, 0, err
return nil, 0, 0, err
}
result = []byte(strconv.FormatInt(int64(binary.LittleEndian.Uint64(bs)), 10))
length = readVarInt(buf, cursor)
return result, length, nil
case 15: // 1111 1111 -> end
return nil, 0, errors.New("unexpected end")
result := int64(binary.LittleEndian.Uint64(bs))
var contentLen uint32 = 9
backlen := getBackLen(contentLen)
*cursor += int(backlen)
return nil, result, contentLen + backlen, nil
case 15: // 11111111 -> end
return nil, 0, 0, errors.New("unexpected end")
}
return nil, 0, fmt.Errorf("unknown entry header")
return nil, 0, 0, fmt.Errorf("unknown entry header")
}

// readListPackEntryAsString return a string representation of entry
// It means if the entry is a integer, then format it as string
func (dec *Decoder) readListPackEntryAsString(buf []byte, cursor *int) ([]byte, error) {
str, intval, _, err := dec.readListPackEntry(buf, cursor)
if err != nil {
return nil, fmt.Errorf("read from failed: %v", err)
}
if str != nil {
return str, nil
}
str = []byte(strconv.FormatInt(intval, 10))
return str, nil
}

func (dec *Decoder) readListPackEntryAsInt(buf []byte, cursor *int) (int64, error) {
bin, _, err := dec.readListPackEntry(buf, cursor)
str, intval, _, err := dec.readListPackEntry(buf, cursor)
if err != nil {
return 0, fmt.Errorf("read from failed: %v", err)
}
i, err := strconv.ParseInt(string(bin), 10, 64)
if err != nil {
return 0, fmt.Errorf("%s is not a uint", string(bin))
if str != nil {
return 0, fmt.Errorf("%s is not a integer", string(str))
}
return i, nil
return intval, nil
}
5 changes: 3 additions & 2 deletions core/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package core
import (
"encoding/binary"
"fmt"
"github.com/hdt3213/rdb/model"
"math"
"sort"
"strconv"

"github.com/hdt3213/rdb/model"
)

func (dec *Decoder) readSet() ([][]byte, error) {
Expand Down Expand Up @@ -73,7 +74,7 @@ func (dec *Decoder) readListPackSet() ([][]byte, *model.ListpackDetail, error) {
size := readListPackLength(buf, &cursor)
values := make([][]byte, 0, size)
for i := 0; i < size; i += 1 {
member, _, err := dec.readListPackEntry(buf, &cursor)
member, err := dec.readListPackEntryAsString(buf, &cursor)
if err != nil {
return nil, nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions core/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ func (dec *Decoder) readStreamEntryContent(buf []byte, cursor *int, firstId *mod
masterFieldNum := int(fieldNum0)
masterFieldNames := make([]string, masterFieldNum)
for i := 0; i < masterFieldNum; i++ {
name, _, err := dec.readListPackEntry(buf, cursor)
name, err := dec.readListPackEntryAsString(buf, cursor)
if err != nil {
return nil, fmt.Errorf("read field name of stream entry failed: %v", err)
}
masterFieldNames[i] = string(name)
}
// read end flag
if _, _, err = dec.readListPackEntry(buf, cursor); err != nil {
// read lp count of master entry
if _, err = dec.readListPackEntryAsString(buf, cursor); err != nil {
return nil, fmt.Errorf("read fields end flag failed: %v", err)
}

Expand Down Expand Up @@ -191,20 +191,20 @@ func (dec *Decoder) readStreamEntryContent(buf []byte, cursor *int, firstId *mod
if flag&StreamItemFlagSameFields > 0 {
fieldName = masterFieldNames[i]
} else {
fieldNameBin, _, err := dec.readListPackEntry(buf, cursor)
fieldNameBin, err := dec.readListPackEntryAsString(buf, cursor)
if err != nil {
return nil, fmt.Errorf("read stream item field name failed: %v", err)
}
fieldName = unsafeBytes2Str(fieldNameBin)
}
fieldValue, _, err := dec.readListPackEntry(buf, cursor)
fieldValue, err := dec.readListPackEntryAsString(buf, cursor)
if err != nil {
return nil, fmt.Errorf("read stream item field value failed: %v", err)
}
msg.Fields[fieldName] = unsafeBytes2Str(fieldValue)
}
// read end flag
if _, _, err = dec.readListPackEntry(buf, cursor); err != nil {
// read lp count
if _, err = dec.readListPackEntryAsString(buf, cursor); err != nil {
return nil, fmt.Errorf("read fields end flag failed: %v", err)
}
msgs = append(msgs, msg)
Expand Down
7 changes: 4 additions & 3 deletions core/zset.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package core

import (
"github.com/hdt3213/rdb/model"
"strconv"

"github.com/hdt3213/rdb/model"
)

func (dec *Decoder) readZSet(zset2 bool) ([]*model.ZSetEntry, error) {
Expand Down Expand Up @@ -74,11 +75,11 @@ func (dec *Decoder) readListPackZSet() ([]*model.ZSetEntry, *model.ListpackDetai
size := readListPackLength(buf, &cursor)
entries := make([]*model.ZSetEntry, 0, size)
for i := 0; i < size; i += 2 {
member, _, err := dec.readListPackEntry(buf, &cursor)
member, err := dec.readListPackEntryAsString(buf, &cursor)
if err != nil {
return nil, nil, err
}
scoreLiteral, _, err := dec.readListPackEntry(buf, &cursor)
scoreLiteral, err := dec.readListPackEntryAsString(buf, &cursor)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit 83d771e

Please sign in to comment.