Skip to content

Commit

Permalink
Fix load calculation, report worker status (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
bcherry authored Sep 5, 2024
1 parent ad463b2 commit 0b2b4b6
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 129 deletions.
9 changes: 9 additions & 0 deletions .changeset/few-llamas-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@livekit/agents": minor
"@livekit/agents-plugin-elevenlabs": minor
"livekit-agents-examples": minor
---

bump underlying dependencies
fix load calculation
report worker status
2 changes: 0 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ jobs:
- name: REUSE-3.2 compliance check
uses: fsfe/reuse-action@v4
- uses: pnpm/action-setup@v4
with:
version: 9
- name: Setup node
uses: actions/setup-node@v4
with:
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: pnpm/action-setup@v4
with:
version: 9
- name: Use Node.js 20
uses: actions/setup-node@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"typescript": "^5.0.0"
},
"dependencies": {
"@livekit/rtc-node": "^0.5.1",
"@livekit/rtc-node": "^0.7.0",
"@livekit/protocol": "^1.21.0",
"commander": "^12.0.0",
"livekit-server-sdk": "^2.6.1",
Expand Down
43 changes: 34 additions & 9 deletions agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
ParticipantPermission,
ServerMessage,
WorkerMessage,
WorkerStatus,
} from '@livekit/protocol';
import { EventEmitter } from 'events';
import { AccessToken } from 'livekit-server-sdk';
Expand All @@ -25,15 +26,15 @@ const MAX_RECONNECT_ATTEMPTS = 10;
const ASSIGNMENT_TIMEOUT = 15 * 1000;
const LOAD_INTERVAL = 5 * 1000;

const cpuLoad = (): number =>
(os
const defaultCpuLoad = (): number =>
1 -
os
.cpus()
.reduce(
(acc, x) => acc + x.times.idle / Object.values(x.times).reduce((acc, x) => acc + x, 0),
0,
) /
os.cpus().length) *
100;
os.cpus().length;

export class WorkerPermissions {
canPublish: boolean;
Expand All @@ -59,7 +60,8 @@ export class WorkerPermissions {

export class WorkerOptions {
requestFunc: (arg: JobRequest) => Promise<void>;
cpuLoadFunc: () => number;
loadFunc: () => number;
loadThreshold: number;
namespace: string;
permissions: WorkerPermissions;
workerType: JobType;
Expand All @@ -73,7 +75,8 @@ export class WorkerOptions {

constructor({
requestFunc,
cpuLoadFunc = cpuLoad,
loadFunc = defaultCpuLoad,
loadThreshold = 0.65,
namespace = 'default',
permissions = new WorkerPermissions(),
workerType = JobType.JT_ROOM,
Expand All @@ -86,7 +89,10 @@ export class WorkerOptions {
logLevel = 'info',
}: {
requestFunc: (arg: JobRequest) => Promise<void>;
cpuLoadFunc?: () => number;
/** Called to determine the current load of the worker. Should return a value between 0 and 1. */
loadFunc?: () => number;
/** When the load exceeds this threshold, the worker will be marked as unavailable. */
loadThreshold?: number;
namespace?: string;
permissions?: WorkerPermissions;
workerType?: JobType;
Expand All @@ -99,7 +105,8 @@ export class WorkerOptions {
logLevel?: string;
}) {
this.requestFunc = requestFunc;
this.cpuLoadFunc = cpuLoadFunc;
this.loadFunc = loadFunc;
this.loadThreshold = loadThreshold;
this.namespace = namespace;
this.permissions = permissions;
this.workerType = workerType;
Expand Down Expand Up @@ -305,15 +312,33 @@ export class Worker {
}),
);

let currentStatus = WorkerStatus.WS_AVAILABLE;
const loadMonitor = setInterval(() => {
if (closingWS) clearInterval(loadMonitor);

const oldStatus = currentStatus;
const currentLoad = this.#opts.loadFunc();
const isFull = currentLoad >= this.#opts.loadThreshold;
const currentlyAvailable = !isFull;
currentStatus = currentlyAvailable ? WorkerStatus.WS_AVAILABLE : WorkerStatus.WS_FULL;

if (oldStatus != currentStatus) {
const extra = { load: currentLoad, loadThreshold: this.#opts.loadThreshold };
if (isFull) {
log.child(extra).info('worker is at full capacity, marking as unavailable');
} else {
log.child(extra).info('worker is below capacity, marking as available');
}
}

this.#event.emit(
'worker_msg',
new WorkerMessage({
message: {
case: 'updateWorker',
value: {
load: cpuLoad(),
load: currentLoad,
status: currentStatus,
},
},
}),
Expand Down
2 changes: 1 addition & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
"dependencies": {
"@livekit/agents": "workspace:*",
"@livekit/agents-plugin-elevenlabs": "workspace:*",
"@livekit/rtc-node": "^0.5.0"
"@livekit/rtc-node": "^0.7.0"
}
}
57 changes: 34 additions & 23 deletions examples/src/minimal.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,43 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import {
type Agent,
type JobContext,
type JobRequest,
WorkerOptions,
cli,
defineAgent,
} from '@livekit/agents';
import { fileURLToPath } from 'url';
import { type JobContext, type JobRequest, WorkerOptions, cli, defineAgent } from '@livekit/agents';
import { TTS } from '@livekit/agents-plugin-elevenlabs';
import { AudioSource, LocalAudioTrack, TrackPublishOptions, TrackSource } from '@livekit/rtc-node';

const requestFunc = async (req: JobRequest) => {
console.log('received request', req);
await req.accept(fileURLToPath(import.meta.url));
};
export default defineAgent({
entry: async (job: JobContext) => {
console.log('starting TTS example agent');

if (process.argv[1] === fileURLToPath(import.meta.url)) {
cli.runApp(new WorkerOptions({ requestFunc }));
}
// prepare our audio track and start publishing it to the room
const source = new AudioSource(24000, 1);
const track = LocalAudioTrack.createAudioTrack('agent-mic', source);
const options = new TrackPublishOptions();
options.source = TrackSource.SOURCE_MICROPHONE;
await job.room.localParticipant?.publishTrack(track, options);

const myAgent: Agent = {
entry: async (job: JobContext) => {
console.log('starting voice assistant...');
job;
// ask ElevenLabs to synthesize "Hello!"
const tts = new TTS();
console.log('speaking "Hello!"');
await tts
.synthesize('Hello!')
.then((output) => output.collect())
.then((output) => {
// send the audio to our track
source.captureFrame(output);
});
},
});

// the requestFunc function allows us to do some things on the main thread after worker connection
const requestFunc = async (req: JobRequest) => {
// this line needs to be exact.
// we are passing this file's path to Agents, in order to import it later and run our entry function.
await req.accept(import.meta.filename);
};

// your entry file has to provide a default export of type Agent.
// use the defineAgent() helper function to generate your agent.
export default defineAgent(myAgent);
// check that we're running this file and not importing functions from it
// without this if closure, our code would start` a new Agents process on every job process.
if (process.argv[1] === import.meta.filename) {
cli.runApp(new WorkerOptions({ requestFunc }));
}
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@
"typedoc": "^0.25.13",
"typescript": "^5.4.5",
"vitest": "^1.6.0"
}
},
"packageManager": "[email protected]"
}
2 changes: 1 addition & 1 deletion plugins/elevenlabs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
},
"dependencies": {
"@livekit/agents": "workspace:*",
"@livekit/rtc-node": "^0.5.0",
"@livekit/rtc-node": "^0.7.0",
"ws": "^8.16.0"
}
}
Loading

0 comments on commit 0b2b4b6

Please sign in to comment.