Skip to content

Commit

Permalink
Harden scope of class properties to match Python
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp committed May 22, 2024
1 parent 1032098 commit bc98e3f
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 132 deletions.
2 changes: 1 addition & 1 deletion agents/src/generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
import type { JobContext } from './job_context.js';

type entryFunction = (job: JobContext) => Promise<void>;
export type entryFunction = (job: JobContext) => Promise<void>;

export interface Agent {
entry: entryFunction;
Expand Down
1 change: 1 addition & 0 deletions agents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ export * from './worker.js';
export * from './utils.js';
export * from './log.js';
export * from './generator.js';
export * from './tokenize.js';

export { cli, stt, tts };
6 changes: 3 additions & 3 deletions agents/src/job_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class JobContext {
#job: Job;
#room: Room;
#publisher?: RemoteParticipant;
tx: EventEmitter;
#tx: EventEmitter;

constructor(
tx: EventEmitter,
Expand All @@ -20,7 +20,7 @@ export class JobContext {
this.#job = job;
this.#room = room;
this.#publisher = publisher;
this.tx = tx;
this.#tx = tx;
}

get id(): string {
Expand All @@ -44,6 +44,6 @@ export class JobContext {
}

async shutdown() {
this.tx.emit('close');
this.#tx.emit('close');
}
}
16 changes: 8 additions & 8 deletions agents/src/job_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class AnsweredError extends Error {
}
}

enum AutoDisconnect {
export enum AutoDisconnect {
ROOM_EMPTY,
PUBLISHER_LEFT,
NONE,
Expand Down Expand Up @@ -44,12 +44,12 @@ export type AvailRes = {
export class JobRequest {
#job: Job;
#answered = false;
tx: EventEmitter;
logger = log.child({ job: this.job });
#tx: EventEmitter;
#logger = log.child({ job: this.job });

constructor(job: Job, tx: EventEmitter) {
this.#job = job;
this.tx = tx;
this.#tx = tx;
}

get id(): string {
Expand Down Expand Up @@ -77,8 +77,8 @@ export class JobRequest {
throw new AnsweredError();
}
this.#answered = true;
this.tx.emit('recv', { avail: false, data: undefined } as AvailRes);
this.logger.info('rejected job', this.id);
this.#tx.emit('recv', { avail: false, data: undefined } as AvailRes);
this.#logger.info('rejected job', this.id);
}

async accept(
Expand Down Expand Up @@ -111,8 +111,8 @@ export class JobRequest {
assign,
};

this.tx.emit('recv', { avail: true, data } as AvailRes);
this.#tx.emit('recv', { avail: true, data } as AvailRes);

this.logger.info('accepted job', this.id);
this.#logger.info('accepted job', this.id);
}
}
2 changes: 1 addition & 1 deletion agents/src/stt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
//
// SPDX-License-Identifier: Apache-2.0

export { STT, SpeechEvent, SpeechEventType, SpeechStream } from './stt.js';
export { STT, SpeechEvent, SpeechEventType, SpeechStream, SpeechData } from './stt.js';
export { StreamAdapter, StreamAdapterWrapper } from './stream_adapter.js';
64 changes: 32 additions & 32 deletions agents/src/stt/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@ import { VADEventType, type VADStream } from '../vad.js';
import { STT, SpeechEvent, SpeechEventType, SpeechStream } from './stt.js';

export class StreamAdapterWrapper extends SpeechStream {
closed: boolean;
stt: STT;
vadStream: VADStream;
eventQueue: (SpeechEvent | undefined)[];
language?: string;
task: {
#closed: boolean;
#stt: STT;
#vadStream: VADStream;
#eventQueue: (SpeechEvent | undefined)[];
#language?: string;
#task: {
run: Promise<void>;
cancel: () => void;
};

constructor(stt: STT, vadStream: VADStream, language: string | undefined = undefined) {
super();
this.closed = false;
this.stt = stt;
this.vadStream = vadStream;
this.eventQueue = [];
this.language = language;
this.task = {
this.#closed = false;
this.#stt = stt;
this.#vadStream = vadStream;
this.#eventQueue = [];
this.#language = language;
this.#task = {
run: new Promise((_, reject) => {
this.run(reject);
}),
Expand All @@ -33,46 +33,46 @@ export class StreamAdapterWrapper extends SpeechStream {
}

async run(reject: (arg: Error) => void) {
this.task.cancel = () => {
this.closed = true;
this.#task.cancel = () => {
this.#closed = true;
reject(new Error('cancelled'));
};

for (const event of this.vadStream) {
for (const event of this.#vadStream) {
if (event.type == VADEventType.START_OF_SPEECH) {
const startEvent = new SpeechEvent(SpeechEventType.START_OF_SPEECH);
this.eventQueue.push(startEvent);
this.#eventQueue.push(startEvent);
} else if (event.type == VADEventType.END_OF_SPEECH) {
const mergedFrames = mergeFrames(event.speech);
const endEvent = await this.stt.recognize(mergedFrames, this.language);
this.eventQueue.push(endEvent);
const endEvent = await this.#stt.recognize(mergedFrames, this.#language);
this.#eventQueue.push(endEvent);
}
}

this.eventQueue.push(undefined);
this.#eventQueue.push(undefined);
}

pushFrame(frame: AudioFrame) {
if (this.closed) {
if (this.#closed) {
throw new TypeError('cannot push frame to closed stream');
}

this.vadStream.pushFrame(frame);
this.#vadStream.pushFrame(frame);
}

async close(wait: boolean = true): Promise<void> {
this.closed = true;
this.#closed = true;

if (!wait) {
this.task.cancel();
this.#task.cancel();
}

await this.vadStream.close(wait);
await this.task.run;
await this.#vadStream.close(wait);
await this.#task.run;
}

next(): IteratorResult<SpeechEvent> {
const item = this.eventQueue.shift();
const item = this.#eventQueue.shift();
if (item) {
return { done: false, value: item };
} else {
Expand All @@ -82,23 +82,23 @@ export class StreamAdapterWrapper extends SpeechStream {
}

export class StreamAdapter extends STT {
stt: STT;
vadStream: VADStream;
#stt: STT;
#vadStream: VADStream;

constructor(stt: STT, vadStream: VADStream) {
super(true);
this.stt = stt;
this.vadStream = vadStream;
this.#stt = stt;
this.#vadStream = vadStream;
}

async recognize(
buffer: AudioBuffer,
language: string | undefined = undefined,
): Promise<SpeechEvent> {
return await this.stt.recognize(buffer, language);
return await this.#stt.recognize(buffer, language);
}

stream(language: string | undefined = undefined) {
return new StreamAdapterWrapper(this.stt, this.vadStream, language);
return new StreamAdapterWrapper(this.#stt, this.#vadStream, language);
}
}
60 changes: 30 additions & 30 deletions agents/src/tts/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ import type { SentenceStream, SentenceTokenizer } from '../tokenize.js';
import { ChunkedStream, SynthesisEvent, SynthesisEventType, SynthesizeStream, TTS } from './tts.js';

export class StreamAdapterWrapper extends SynthesizeStream {
closed: boolean;
tts: TTS;
sentenceStream: SentenceStream;
eventQueue: (SynthesisEvent | undefined)[];
task: {
#closed: boolean;
#tts: TTS;
#sentenceStream: SentenceStream;
#eventQueue: (SynthesisEvent | undefined)[];
#task: {
run: Promise<void>;
cancel: () => void;
};

constructor(tts: TTS, sentenceStream: SentenceStream) {
super();
this.closed = false;
this.tts = tts;
this.sentenceStream = sentenceStream;
this.eventQueue = [];
this.task = {
this.#closed = false;
this.#tts = tts;
this.#sentenceStream = sentenceStream;
this.#eventQueue = [];
this.#task = {
run: new Promise((_, reject) => {
this.run(reject);
}),
Expand All @@ -29,32 +29,32 @@ export class StreamAdapterWrapper extends SynthesizeStream {
}

async run(reject: (arg: Error) => void) {
while (!this.closed) {
this.task.cancel = () => {
this.closed = true;
while (!this.#closed) {
this.#task.cancel = () => {
this.#closed = true;
reject(new Error('cancelled'));
};
for await (const sentence of this.sentenceStream) {
const audio = await this.tts.synthesize(sentence.text).then((data) => data.next());
for await (const sentence of this.#sentenceStream) {
const audio = await this.#tts.synthesize(sentence.text).then((data) => data.next());
if (!audio.done) {
this.eventQueue.push(new SynthesisEvent(SynthesisEventType.STARTED));
this.eventQueue.push(new SynthesisEvent(SynthesisEventType.AUDIO, audio.value));
this.eventQueue.push(new SynthesisEvent(SynthesisEventType.FINISHED));
this.#eventQueue.push(new SynthesisEvent(SynthesisEventType.STARTED));
this.#eventQueue.push(new SynthesisEvent(SynthesisEventType.AUDIO, audio.value));
this.#eventQueue.push(new SynthesisEvent(SynthesisEventType.FINISHED));
}
}
}
}

pushText(token: string) {
this.sentenceStream.pushText(token);
this.#sentenceStream.pushText(token);
}

async flush() {
await this.sentenceStream.flush();
await this.#sentenceStream.flush();
}

next(): IteratorResult<SynthesisEvent> {
const event = this.eventQueue.shift();
const event = this.#eventQueue.shift();
if (event) {
return { done: false, value: event };
} else {
Expand All @@ -63,30 +63,30 @@ export class StreamAdapterWrapper extends SynthesizeStream {
}

async close(): Promise<void> {
this.task.cancel();
this.#task.cancel();
try {
await this.task.run;
await this.#task.run;
} finally {
this.eventQueue.push(undefined);
this.#eventQueue.push(undefined);
}
}
}

export class StreamAdapter extends TTS {
tts: TTS;
tokenizer: SentenceTokenizer;
#tts: TTS;
#tokenizer: SentenceTokenizer;

constructor(tts: TTS, tokenizer: SentenceTokenizer) {
super(true);
this.tts = tts;
this.tokenizer = tokenizer;
this.#tts = tts;
this.#tokenizer = tokenizer;
}

synthesize(text: string): Promise<ChunkedStream> {
return this.tts.synthesize(text);
return this.#tts.synthesize(text);
}

stream() {
return new StreamAdapterWrapper(this.tts, this.tokenizer.stream(undefined));
return new StreamAdapterWrapper(this.#tts, this.#tokenizer.stream(undefined));
}
}
Loading

0 comments on commit bc98e3f

Please sign in to comment.