Skip to content

Commit

Permalink
fix stream parser on long bytes (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanweixiao authored Aug 20, 2021
1 parent bcc7d6d commit ea3508c
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package y3

import (
"bytes"
"errors"
"fmt"
"io"

Expand Down Expand Up @@ -34,33 +33,52 @@ func ReadPacket(reader io.Reader) ([]byte, error) {
break
}
}

// parse to y3.Length
var len int32
var length int32
codec := encoding.VarCodec{}
err = codec.DecodePVarInt32(lenbuf.Bytes(), &len)
err = codec.DecodePVarInt32(lenbuf.Bytes(), &length)
if err != nil {
return nil, err
}

// validate len decoded from stream
if len < 0 {
return nil, fmt.Errorf("y3.ReadPacket() get lenbuf=(%# x), decode len=(%v)", lenbuf.Bytes(), len)
if length < 0 {
return nil, fmt.Errorf("y3.ReadPacket() get lenbuf=(%# x), decode len=(%v)", lenbuf.Bytes(), length)
}

// write y3.Length bytes
buf.Write(lenbuf.Bytes())

// read next {len} bytes as y3.Value
valbuf := make([]byte, len)
p, err := reader.Read(valbuf)
if err != nil {
return nil, err
valbuf := bytes.Buffer{}

// every batch read 512 bytes, if next reads < 512, read
var count int
for {
batchReadSize := 512
var tmpbuf = []byte{}
if int(length)-count < batchReadSize {
tmpbuf = make([]byte, int(length)-count)
} else {
tmpbuf = make([]byte, batchReadSize)
}
p, err := reader.Read(tmpbuf)
count += p
if err != nil {
return nil, fmt.Errorf("y3 parse valbuf error: %v", err)
}
valbuf.Write(tmpbuf[:p])
if count == int(length) {
break
}
}
if p < int(len) {
return nil, errors.New("[y3] p should == len when getting y3 value buffer")

if count < int(length) {
return nil, fmt.Errorf("[y3] p should == len when getting y3 value buffer, len=%d, p=%d", length, count)
}
// write y3.Value bytes
buf.Write(valbuf)
buf.Write(valbuf.Bytes())

return buf.Bytes(), nil
}
Expand Down

0 comments on commit ea3508c

Please sign in to comment.