
Async encoding of nodejs Stream #82

Open
misos1 opened this issue Aug 8, 2018 · 12 comments
@misos1

misos1 commented Aug 8, 2018

It should be possible to asynchronously encode an object containing values of type Stream, returning a stream that emits the encoded data continuously.

CBOR supports indefinite-length byte strings (https://tools.ietf.org/html/rfc7049#section-2.2.2), so the size of an input stream does not have to be known before writing data.

CBOR also supports indefinite-length arrays and maps, so input object streams are doable as well.
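The wire format for such an indefinite-length byte string is simple enough to sketch. This helper is illustrative only, not part of the cbor package, and it assumes every chunk is shorter than 24 bytes so each chunk header fits in a single byte:

```javascript
// Wrap Buffer chunks as an indefinite-length CBOR byte string
// per RFC 7049 §2.2.2. Longer chunks would need multi-byte headers.
function indefiniteByteString(chunks) {
  const parts = [Buffer.from([0x5f])]; // major type 2, additional info 31
  for (const chunk of chunks) {
    parts.push(Buffer.from([0x40 | chunk.length]), chunk); // definite-length chunk
  }
  parts.push(Buffer.from([0xff])); // "break" stop code
  return Buffer.concat(parts);
}

const encoded = indefiniteByteString([Buffer.from('he'), Buffer.from('llo')]);
// bytes: 5f 42 68 65 43 6c 6c 6f ff
```

A decoder reassembles the chunks into one logical byte string, which is why the encoder never needs to know the total length up front.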

@hildjj
Owner

hildjj commented Aug 9, 2018

I am open to this. Can you talk more about how you would want the API to look? Would you want to be able to send some other CBOR to prefix or wrap the stream? I assume you would want to set the type to either bytes or text on creation, and to either be able to write buffers or strings to one side and get CBOR bytes out the other, right? Would you be willing to have a maximum chunk size? Would you mind the stream adding chunk boundaries at will? In string mode, would you want to ensure that only fully-formed UTF-8 sequences were written?

I'm imagining a transform stream. In backpressure situations, it's possible that the stream would get called with different boundaries than the writer intended.

@misos1
Author

misos1 commented Aug 9, 2018

Something simple, for example an encodeAsync with the same parameters as the encode function, but returning a readable stream.

Would you want to be able to send some other CBOR to prefix or wrap the stream?

I am not sure I understand this.

I assume you would want to set the type to either bytes or text on creation, and to either be able to write buffers or strings to one side, and get cbor bytes out the other, right?

Seems good to me. Maybe with bytes as the default.

would you be willing to have a maximum chunk size? would you mind the stream adding chunk boundaries at will?

I think when implementing this with a transform stream, encodeAsync should, on finding a stream-typed value, push type 2 with indefinite length (31); then each _transform call writes its chunk of data to the CBOR output as a type-2 chunk of the same length the chunk arrived with. Maybe it can do some optimisations, for example if _transform gets very small buffers (say 1 byte each), it could concatenate them into a bigger CBOR chunk.

I do not see a reason for a maximum chunk size (but it could be added as an option).

Yes, chunk boundaries at the stream's will are OK. A reader or decoder on the other side of the output CBOR stream should not care how an indefinite-length CBOR byte string is chunked.

In string mode, would you want to ensure that only fully-formed UTF8 sequences were written?

Yes, maybe invalid UTF-8 written to a type-3 string should emit an error (as decoding probably does).

@hildjj
Owner

hildjj commented Aug 9, 2018

I like the idea of just sticking a ReadableStream in the input. That makes things a lot more straightforward. If you wanted to write [{header}, Stream, {footer}], the footer just gets written after the stream closes. I don't think I'll do any application of Nagle's algorithm to coalesce, but I might need to split things up to avoid buffer stalls. Will do some testing to make sure.

Plan:

  • Add support for writing generators, ReadableStreams, and TransformStreams to an Encoder. TODO: async generators.
  • Valid stream modes: binary, string, array, map
  • When the first item is read from the stream, it will set the type of the CBOR stream. A string will set to string mode, a Buffer or ArrayBuffer will set to binary mode. Anything else will set to array mode. If you have marked your stream with the cbor.STREAM_MODE symbol, I'll use that mode instead of detecting.
  • If the stream close happens without any items being read, and no stream type is set, you'll get binary.
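The detection rule in that plan might look roughly like this. cbor.STREAM_MODE is the proposed symbol, not an existing export, and the function name here is made up for illustration:

```javascript
// Proposed symbol for marking a stream's CBOR mode explicitly.
const STREAM_MODE = Symbol('cbor.STREAM_MODE');

// Decide the CBOR stream mode from the first item read from the stream,
// unless the stream was explicitly marked.
function detectStreamMode(stream, firstItem) {
  if (stream[STREAM_MODE]) return stream[STREAM_MODE];   // explicit override
  if (firstItem === undefined) return 'binary';          // closed before any read
  if (typeof firstItem === 'string') return 'string';
  if (Buffer.isBuffer(firstItem) ||
      firstItem instanceof ArrayBuffer) return 'binary';
  return 'array';                                        // anything else
}
```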

@misos1
Author

misos1 commented Aug 9, 2018

That makes things a lot more straightforward. If you wanted to write [{header}, Stream, {footer}], the footer just gets written after the stream closes.

I do not think this would be very useful, and it can also be done on the user's side.

I hope I was understood correctly. An input stream should be possible not only as the input argument to the encode function but also inside a nested structure. For example, right now if I want to send this:

let buffer = cbor.encode({
  a: fs.readFileSync("some big file"),
  b: [{
    c: fs.readFileSync("some other big file") 
  }]
});
send(buffer);

Then I first need to have 2x the size of both files together in memory before sending. My idea was to make something like this possible:

let stream = cbor.encodeAsync({
  a: fs.createReadStream("some big file"),
  b: [{
    c: fs.createReadStream("some other big file") 
  }]
});
send(stream);

And the memory footprint is about as small as it would be when sending a file as a stream directly.

@hildjj
Owner

hildjj commented Aug 9, 2018

cbor.Encoder is already a Transform stream.

const stream = new cbor.Encoder()
stream.pipe(process.stdout)
stream.end({foo: 1})

@hildjj
Owner

hildjj commented Aug 9, 2018

In my proposed approach, you would write:

const e = new cbor.Encoder()
e.pipe(stream)
e.write({
  a: fs.createReadStream("some big file"),
  b: [{
    c: fs.createReadStream("some other big file") 
  }]
})

And you would get the memory usage you expect, along with appropriate backpressure.

@misos1
Author

misos1 commented Aug 9, 2018

But fs.createReadStream returns a stream. Using buffers, this would keep 1x the size of both files together in memory; stream support here would avoid that. Say I could stream two files, from the file system or from two network connections, directly into a CBOR stream and send it to another location without first loading the files into memory.

@hildjj
Owner

hildjj commented Aug 10, 2018

I'll put together a quick patch, and we'll see if it solves your problem. If not, I'll ask a bunch more questions until I understand better.

@misos1
Author

misos1 commented Aug 10, 2018

I suppose this would require a more asynchronous approach during encoding. Here is what I expect should happen, step by step:

e.write(
{ // send cbor type map with size 3
  a: // send type string with size 1, send "a"
    stream1, // send type buffer/string/array with indefinite length, start sending
             // chunks as they arrive on the stream as buffer/string/any (could be
             // done with stream1.pipe to the output through a transform stream
             // that prefixes chunks with a cbor type+size and encodes the data);
             // must asynchronously wait until stream1 ends, then send the break
             // stop code, then continue with key b
  b: // send type string with size 1, send "b"
    [ // send type array with size 1
      { // send type map with size 2
        c: // send type string with size 1, send "c"
          stream2,  // same as for stream1 ... async wait for stream2, then after
                    // the break stop code continue with key d, then key e
        d: 10 // ...
      }
    ],
  e: 20 // ...
})

So stream1, stream2, and the output CBOR are never wholly present in memory (at least if they did not arrive as a single chunk). Streams can be nested at any sane depth, inside maps and arrays, just like other types.
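The per-stream step described above can be sketched as an async generator. The name is made up for illustration, and it assumes chunks shorter than 24 bytes so each chunk header is a single byte:

```javascript
// Yield the indefinite-length header, one definite-length CBOR chunk per
// stream chunk as it arrives, then the break code once the stream ends —
// the encoder then continues with the next key.
async function* encodeStreamAsBytes(readable) {
  yield Buffer.from([0x5f]);            // major type 2, indefinite length
  for await (const chunk of readable) { // asynchronously waits for each chunk
    yield Buffer.from([0x40 | chunk.length]);
    yield chunk;
  }
  yield Buffer.from([0xff]);            // break stop code
}
```

Nothing here buffers the whole stream; memory use is bounded by one chunk at a time, which matches the footprint argument above.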

@AshfordN

AshfordN commented Nov 14, 2022

Any update on this? I'm trying to encode File objects as ArrayBuffers, but File.prototype.arrayBuffer() returns a promise, so there's no way to achieve this, since the entire library appears to be synchronous.
There is currently an encodeAsync() method on the encoder, and while I understand it was not designed with this use case in mind, it seems like a good place to define this functionality: that function would wait for any asynchronous sub-encoders/streams.

@hildjj
Owner

hildjj commented Nov 16, 2022

I presume you can't transform the File instance before calling encode?

@AshfordN

Well yes, that's the current workaround. But I was under the impression that the whole point of implementing the encodeCBOR method and the addSemanticType interface was to streamline encoding and avoid these kinds of ad-hoc transformations.
