Skip to content

Commit

Permalink
fix: migrate to streamx
Browse files Browse the repository at this point in the history
  • Loading branch information
ThaUnknown committed Jul 4, 2022
1 parent f1a492d commit fa3ac11
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 77 deletions.
125 changes: 53 additions & 72 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
const debug = require('debug')('simple-peer')
const getBrowserRTC = require('get-browser-rtc')
const randombytes = require('randombytes')
const stream = require('readable-stream')
const { Duplex } = require('streamx')
const queueMicrotask = require('queue-microtask') // TODO: remove when Node 10 is not supported
const errCode = require('err-code')
const { Buffer } = require('buffer')
Expand All @@ -25,7 +25,7 @@ function warn (message) {
* Duplex stream.
* @param {Object} opts
*/
class Peer extends stream.Duplex {
class Peer extends Duplex {
constructor (opts) {
opts = Object.assign({
allowHalfOpen: false
Expand All @@ -52,8 +52,8 @@ class Peer extends stream.Duplex {
this.allowHalfTrickle = opts.allowHalfTrickle !== undefined ? opts.allowHalfTrickle : false
this.iceCompleteTimeout = opts.iceCompleteTimeout || ICECOMPLETE_TIMEOUT

this.destroyed = false
this.destroying = false
this._destroyed = false
this._destroying = false
this._connected = false

this.remoteAddress = undefined
Expand Down Expand Up @@ -180,8 +180,8 @@ class Peer extends stream.Duplex {
}

signal (data) {
if (this.destroying) return
if (this.destroyed) throw errCode(new Error('cannot signal after peer is destroyed'), 'ERR_DESTROYED')
if (this._destroying) return
if (this._destroyed) throw errCode(new Error('cannot signal after peer is destroyed'), 'ERR_DESTROYED')
if (typeof data === 'string') {
try {
data = JSON.parse(data)
Expand Down Expand Up @@ -209,7 +209,7 @@ class Peer extends stream.Duplex {
if (data.sdp) {
this._pc.setRemoteDescription(new (this._wrtc.RTCSessionDescription)(data))
.then(() => {
if (this.destroyed) return
if (this._destroyed) return

this._pendingCandidates.forEach(candidate => {
this._addIceCandidate(candidate)
Expand Down Expand Up @@ -244,8 +244,8 @@ class Peer extends stream.Duplex {
* @param {ArrayBufferView|ArrayBuffer|Buffer|string|Blob} chunk
*/
send (chunk) {
if (this.destroying) return
if (this.destroyed) throw errCode(new Error('cannot send after peer is destroyed'), 'ERR_DESTROYED')
if (this._destroying) return
if (this._destroyed) throw errCode(new Error('cannot send after peer is destroyed'), 'ERR_DESTROYED')
this._channel.send(chunk)
}

Expand All @@ -255,8 +255,8 @@ class Peer extends stream.Duplex {
* @param {Object} init
*/
addTransceiver (kind, init) {
if (this.destroying) return
if (this.destroyed) throw errCode(new Error('cannot addTransceiver after peer is destroyed'), 'ERR_DESTROYED')
if (this._destroying) return
if (this._destroyed) throw errCode(new Error('cannot addTransceiver after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addTransceiver()')

if (this.initiator) {
Expand All @@ -279,8 +279,8 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
addStream (stream) {
if (this.destroying) return
if (this.destroyed) throw errCode(new Error('cannot addStream after peer is destroyed'), 'ERR_DESTROYED')
if (this._destroying) return
if (this._destroyed) throw errCode(new Error('cannot addStream after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addStream()')

stream.getTracks().forEach(track => {
Expand All @@ -294,8 +294,8 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
addTrack (track, stream) {
if (this.destroying) return
if (this.destroyed) throw errCode(new Error('cannot addTrack after peer is destroyed'), 'ERR_DESTROYED')
if (this._destroying) return
if (this._destroyed) throw errCode(new Error('cannot addTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addTrack()')

const submap = this._senderMap.get(track) || new Map() // nested Maps map [track, stream] to sender
Expand All @@ -319,8 +319,8 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
replaceTrack (oldTrack, newTrack, stream) {
if (this.destroying) return
if (this.destroyed) throw errCode(new Error('cannot replaceTrack after peer is destroyed'), 'ERR_DESTROYED')
if (this._destroying) return
if (this._destroyed) throw errCode(new Error('cannot replaceTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('replaceTrack()')

const submap = this._senderMap.get(oldTrack)
Expand All @@ -343,8 +343,8 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
removeTrack (track, stream) {
if (this.destroying) return
if (this.destroyed) throw errCode(new Error('cannot removeTrack after peer is destroyed'), 'ERR_DESTROYED')
if (this._destroying) return
if (this._destroyed) throw errCode(new Error('cannot removeTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('removeSender()')

const submap = this._senderMap.get(track)
Expand All @@ -370,8 +370,8 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
removeStream (stream) {
if (this.destroying) return
if (this.destroyed) throw errCode(new Error('cannot removeStream after peer is destroyed'), 'ERR_DESTROYED')
if (this._destroying) return
if (this._destroyed) throw errCode(new Error('cannot removeStream after peer is destroyed'), 'ERR_DESTROYED')
this._debug('removeSenders()')

stream.getTracks().forEach(track => {
Expand All @@ -396,8 +396,8 @@ class Peer extends stream.Duplex {
}

negotiate () {
if (this.destroying) return
if (this.destroyed) throw errCode(new Error('cannot negotiate after peer is destroyed'), 'ERR_DESTROYED')
if (this._destroying) return
if (this._destroyed) throw errCode(new Error('cannot negotiate after peer is destroyed'), 'ERR_DESTROYED')

if (this.initiator) {
if (this._isNegotiating) {
Expand All @@ -424,29 +424,16 @@ class Peer extends stream.Duplex {
this._isNegotiating = true
}

// TODO: Delete this method once readable-stream is updated to contain a default
// implementation of destroy() that automatically calls _destroy()
// See: https://github.com/nodejs/readable-stream/issues/283
destroy (err) {
this._destroy(err, () => {})
}

_destroy (err, cb) {
if (this.destroyed || this.destroying) return
this.destroying = true

this._debug('destroying (error: %s)', err && (err.message || err))
_predestroy () {
if (this._destroyed || this._destroying) return
this._destroying = true

queueMicrotask(() => { // allow events concurrent with the call to _destroy() to fire (see #692)
this.destroyed = true
this.destroying = false

this._debug('destroy (error: %s)', err && (err.message || err))

this.readable = this.writable = false
this._destroyed = true
this._destroying = false

if (!this._readableState.ended) this.push(null)
if (!this._writableState.finished) this.end()
if (!this._writableState.ended) this.end()

this._connected = false
this._pcReady = false
Expand Down Expand Up @@ -492,10 +479,6 @@ class Peer extends stream.Duplex {
}
this._pc = null
this._channel = null

if (err) this.emit('error', err)
this.emit('close')
cb()
})
}

Expand Down Expand Up @@ -548,10 +531,8 @@ class Peer extends stream.Duplex {
}, CHANNEL_CLOSING_TIMEOUT)
}

_read () {}

_write (chunk, encoding, cb) {
if (this.destroyed) return cb(errCode(new Error('cannot write after peer is destroyed'), 'ERR_DATA_CHANNEL'))
_write (chunk, cb) {
if (this._destroyed) return cb(errCode(new Error('cannot write after peer is destroyed'), 'ERR_DATA_CHANNEL'))

if (this._connected) {
try {
Expand All @@ -575,7 +556,7 @@ class Peer extends stream.Duplex {
// When stream finishes writing, close socket. Half open connections are not
// supported.
_onFinish () {
if (this.destroyed) return
if (this._destroyed) return

// Wait a bit before destroying so the socket flushes.
// TODO: is there a more reliable way to accomplish this?
Expand All @@ -591,7 +572,7 @@ class Peer extends stream.Duplex {
}

_startIceCompleteTimeout () {
if (this.destroyed) return
if (this._destroyed) return
if (this._iceCompleteTimer) return
this._debug('started iceComplete timeout')
this._iceCompleteTimer = setTimeout(() => {
Expand All @@ -605,16 +586,16 @@ class Peer extends stream.Duplex {
}

_createOffer () {
if (this.destroyed) return
if (this._destroyed) return

this._pc.createOffer(this.offerOptions)
.then(offer => {
if (this.destroyed) return
if (this._destroyed) return
if (!this.trickle && !this.allowHalfTrickle) offer.sdp = filterTrickle(offer.sdp)
offer.sdp = this.sdpTransform(offer.sdp)

const sendOffer = () => {
if (this.destroyed) return
if (this._destroyed) return
const signal = this._pc.localDescription || offer
this._debug('signal')
this.emit('signal', {
Expand All @@ -625,7 +606,7 @@ class Peer extends stream.Duplex {

const onSuccess = () => {
this._debug('createOffer success')
if (this.destroyed) return
if (this._destroyed) return
if (this.trickle || this._iceComplete) sendOffer()
else this.once('_iceComplete', sendOffer) // wait for candidates
}
Expand Down Expand Up @@ -655,16 +636,16 @@ class Peer extends stream.Duplex {
}

_createAnswer () {
if (this.destroyed) return
if (this._destroyed) return

this._pc.createAnswer(this.answerOptions)
.then(answer => {
if (this.destroyed) return
if (this._destroyed) return
if (!this.trickle && !this.allowHalfTrickle) answer.sdp = filterTrickle(answer.sdp)
answer.sdp = this.sdpTransform(answer.sdp)

const sendAnswer = () => {
if (this.destroyed) return
if (this._destroyed) return
const signal = this._pc.localDescription || answer
this._debug('signal')
this.emit('signal', {
Expand All @@ -675,7 +656,7 @@ class Peer extends stream.Duplex {
}

const onSuccess = () => {
if (this.destroyed) return
if (this._destroyed) return
if (this.trickle || this._iceComplete) sendAnswer()
else this.once('_iceComplete', sendAnswer)
}
Expand All @@ -694,14 +675,14 @@ class Peer extends stream.Duplex {
}

_onConnectionStateChange () {
if (this.destroyed) return
if (this._destroyed) return
if (this._pc.connectionState === 'failed') {
this.destroy(errCode(new Error('Connection failed.'), 'ERR_CONNECTION_FAILURE'))
}
}

_onIceStateChange () {
if (this.destroyed) return
if (this._destroyed) return
const iceConnectionState = this._pc.iceConnectionState
const iceGatheringState = this._pc.iceGatheringState

Expand Down Expand Up @@ -750,7 +731,7 @@ class Peer extends stream.Duplex {
} else if (this._pc.getStats.length > 0) {
this._pc.getStats(res => {
// If we destroy connection in `connect` callback this code might happen to run when actual connection is already closed
if (this.destroyed) return
if (this._destroyed) return

const reports = []
res.result().forEach(result => {
Expand Down Expand Up @@ -781,10 +762,10 @@ class Peer extends stream.Duplex {

// HACK: We can't rely on order here, for details see https://github.com/js-platform/node-webrtc/issues/339
const findCandidatePair = () => {
if (this.destroyed) return
if (this._destroyed) return

this.getStats((err, items) => {
if (this.destroyed) return
if (this._destroyed) return

// Treat getStats error as non-fatal. It's not essential.
if (err) items = []
Expand Down Expand Up @@ -921,7 +902,7 @@ class Peer extends stream.Duplex {
}

_onSignalingStateChange () {
if (this.destroyed) return
if (this._destroyed) return

if (this._pc.signalingState === 'stable') {
this._isNegotiating = false
Expand Down Expand Up @@ -949,7 +930,7 @@ class Peer extends stream.Duplex {
}

_onIceCandidate (event) {
if (this.destroyed) return
if (this._destroyed) return
if (event.candidate && this.trickle) {
this.emit('signal', {
type: 'candidate',
Expand All @@ -970,35 +951,35 @@ class Peer extends stream.Duplex {
}

_onChannelMessage (event) {
if (this.destroyed) return
if (this._destroyed) return
let data = event.data
if (data instanceof ArrayBuffer) data = Buffer.from(data)
this.push(data)
}

_onChannelBufferedAmountLow () {
if (this.destroyed || !this._cb) return
if (this._destroyed || !this._cb) return
this._debug('ending backpressure: bufferedAmount %d', this._channel.bufferedAmount)
const cb = this._cb
this._cb = null
cb(null)
}

_onChannelOpen () {
if (this._connected || this.destroyed) return
if (this._connected || this._destroyed) return
this._debug('on channel open')
this._channelReady = true
this._maybeReady()
}

_onChannelClose () {
if (this.destroyed) return
if (this._destroyed) return
this._debug('on channel close')
this.destroy()
}

_onTrack (event) {
if (this.destroyed) return
if (this._destroyed) return

event.streams.forEach(eventStream => {
this._debug('on track')
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"get-browser-rtc": "^1.1.0",
"queue-microtask": "^1.2.3",
"randombytes": "^2.1.0",
"readable-stream": "^3.6.0"
"streamx": "^2.12.4"
},
"devDependencies": {
"airtap": "^4.0.3",
Expand Down
Loading

0 comments on commit fa3ac11

Please sign in to comment.