From 0dbfc577ee09baf53646c21ff6061052c5962040 Mon Sep 17 00:00:00 2001 From: Jacob Gillespie Date: Fri, 11 Oct 2024 12:24:20 +0100 Subject: [PATCH] Use ReportMachineHealth method for health stream --- proto/depot/cloud/v3/machine.proto | 9 +++ src/gen/ts/depot/cloud/v3/machine_connect.ts | 13 +++- src/gen/ts/depot/cloud/v3/machine_pb.ts | 82 +++++++++++++++++++- src/tasks/buildkit.ts | 2 +- src/tasks/engine.ts | 2 +- src/tasks/engineHealth.ts | 45 ++++++----- src/tasks/health.ts | 45 ++++++----- 7 files changed, 148 insertions(+), 50 deletions(-) diff --git a/proto/depot/cloud/v3/machine.proto b/proto/depot/cloud/v3/machine.proto index 524887e..323b723 100644 --- a/proto/depot/cloud/v3/machine.proto +++ b/proto/depot/cloud/v3/machine.proto @@ -5,6 +5,7 @@ package depot.cloud.v3; service MachineService { rpc RegisterMachine(RegisterMachineRequest) returns (stream RegisterMachineResponse); rpc PingMachineHealth(PingMachineHealthRequest) returns (PingMachineHealthResponse); + rpc ReportMachineHealth(stream ReportMachineHealthRequest) returns (ReportMachineHealthResponse); rpc Usage(UsageRequest) returns (UsageResponse); } @@ -134,6 +135,14 @@ message PingMachineHealthResponse { bool should_terminate = 1; } +message ReportMachineHealthRequest { + repeated DiskSpace disks = 1; +} + +message ReportMachineHealthResponse { + bool should_terminate = 1; +} + message Cert { string cert = 1; string key = 2; diff --git a/src/gen/ts/depot/cloud/v3/machine_connect.ts b/src/gen/ts/depot/cloud/v3/machine_connect.ts index a4c6298..e35c3a1 100644 --- a/src/gen/ts/depot/cloud/v3/machine_connect.ts +++ b/src/gen/ts/depot/cloud/v3/machine_connect.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-connect-es v0.12.0 with parameter "target=ts,import_extension=none" +// @generated by protoc-gen-connect-es v1.4.0 with parameter "target=ts,import_extension=none" // @generated from file depot/cloud/v3/machine.proto (package depot.cloud.v3, syntax proto3) /* eslint-disable */ // @ts-nocheck @@ -9,6 +9,8 @@ import { PingMachineHealthResponse, RegisterMachineRequest, RegisterMachineResponse, + ReportMachineHealthRequest, + ReportMachineHealthResponse, UsageRequest, UsageResponse, } from './machine_pb' @@ -37,6 +39,15 @@ export const MachineService = { O: PingMachineHealthResponse, kind: MethodKind.Unary, }, + /** + * @generated from rpc depot.cloud.v3.MachineService.ReportMachineHealth + */ + reportMachineHealth: { + name: 'ReportMachineHealth', + I: ReportMachineHealthRequest, + O: ReportMachineHealthResponse, + kind: MethodKind.ClientStreaming, + }, /** * @generated from rpc depot.cloud.v3.MachineService.Usage */ diff --git a/src/gen/ts/depot/cloud/v3/machine_pb.ts b/src/gen/ts/depot/cloud/v3/machine_pb.ts index e319909..246f3bf 100644 --- a/src/gen/ts/depot/cloud/v3/machine_pb.ts +++ b/src/gen/ts/depot/cloud/v3/machine_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.3.0 with parameter "target=ts,import_extension=none" +// @generated by protoc-gen-es v1.10.0 with parameter "target=ts,import_extension=none" // @generated from file depot/cloud/v3/machine.proto (package depot.cloud.v3, syntax proto3) /* eslint-disable */ // @ts-nocheck @@ -900,6 +900,86 @@ export class PingMachineHealthResponse extends Message { + /** + * @generated from field: repeated depot.cloud.v3.DiskSpace disks = 1; + */ + disks: DiskSpace[] = [] + + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'depot.cloud.v3.ReportMachineHealthRequest' + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + {no: 1, name: 'disks', kind: 'message', T: DiskSpace, repeated: true}, + ]) + + static fromBinary(bytes: Uint8Array, options?: Partial): ReportMachineHealthRequest { + return new ReportMachineHealthRequest().fromBinary(bytes, options) + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ReportMachineHealthRequest { + return new ReportMachineHealthRequest().fromJson(jsonValue, options) + } + + static fromJsonString(jsonString: string, options?: Partial): ReportMachineHealthRequest { + return new ReportMachineHealthRequest().fromJsonString(jsonString, options) + } + + static equals( + a: ReportMachineHealthRequest | PlainMessage | undefined, + b: ReportMachineHealthRequest | PlainMessage | undefined, + ): boolean { + return proto3.util.equals(ReportMachineHealthRequest, a, b) + } +} + +/** + * @generated from message depot.cloud.v3.ReportMachineHealthResponse + */ +export class ReportMachineHealthResponse extends Message { + /** + * @generated from field: bool should_terminate = 1; + */ + shouldTerminate = false + + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'depot.cloud.v3.ReportMachineHealthResponse' + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + {no: 1, name: 'should_terminate', kind: 'scalar', T: 8 /* ScalarType.BOOL */}, + ]) + + static fromBinary(bytes: Uint8Array, options?: Partial): ReportMachineHealthResponse { + return new ReportMachineHealthResponse().fromBinary(bytes, options) + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ReportMachineHealthResponse { + return new ReportMachineHealthResponse().fromJson(jsonValue, options) + } + + static fromJsonString(jsonString: string, options?: Partial): ReportMachineHealthResponse { + return new ReportMachineHealthResponse().fromJsonString(jsonString, options) + } + + static equals( + a: ReportMachineHealthResponse | PlainMessage | undefined, + b: ReportMachineHealthResponse | PlainMessage | undefined, + ): boolean { + return proto3.util.equals(ReportMachineHealthResponse, a, b) + } +} + /** * @generated from message depot.cloud.v3.Cert */ diff --git a/src/tasks/buildkit.ts b/src/tasks/buildkit.ts index 3153859..476b68c 100644 --- a/src/tasks/buildkit.ts +++ b/src/tasks/buildkit.ts @@ -258,7 +258,7 @@ keepBytes = ${cacheSizeBytes} try { await Promise.all([ buildkit, - reportHealth({machineId, signal, headers, path: rootDir}), + reportHealth({signal, headers, path: rootDir}), reportUsage({machineId, signal, headers}), ]) } catch (error) { diff --git a/src/tasks/engine.ts b/src/tasks/engine.ts index a6870fa..edad31b 100644 --- a/src/tasks/engine.ts +++ b/src/tasks/engine.ts @@ -110,7 +110,7 @@ export async function startEngine(message: RegisterMachineResponse, task: Regist try { await Promise.all([ engine, - reportEngineHealth({machineId, signal, headers, path: '/var/lib/engine'}), + reportEngineHealth({signal, headers, path: '/var/lib/engine'}), // reportUsage({machineId, signal, headers}), ]) } catch (error) { diff --git a/src/tasks/engineHealth.ts b/src/tasks/engineHealth.ts index 2897655..f509ae1 100644 --- a/src/tasks/engineHealth.ts +++ b/src/tasks/engineHealth.ts @@ -1,41 +1,40 @@ +import {PlainMessage} from '@bufbuild/protobuf' import {execa} from 'execa' +import {DiskSpace} from '../gen/ts/depot/cloud/v3/machine_pb' import {sleep} from '../utils/common' import {stats} from '../utils/disk' import {client} from '../utils/grpc' export interface ReportHealthParams { - machineId: string signal: AbortSignal headers: HeadersInit path: string } -export async function reportEngineHealth({machineId, signal, headers, path}: ReportHealthParams) { - while (true) { - if (signal.aborted) return - +export async function reportEngineHealth({signal, headers, path}: ReportHealthParams) { + while (!signal.aborted) { await waitForWorkers(signal) try { - while (true) { - if (signal.aborted) return - - const disk_stats = await stats(path) - const disk_space = disk_stats - ? [ - { - path, - freeMb: disk_stats.freeMb, - totalMb: disk_stats.totalMb, - freeInodes: disk_stats.freeInodes, - totalInodes: disk_stats.totalInodes, - }, - ] - : undefined - - await client.pingMachineHealth({machineId, disks: disk_space}, {headers, signal}) - await sleep(1000) + async function* stream() { + while (!signal.aborted) { + const diskStats = await stats(path) + if (diskStats) { + const diskSpace: PlainMessage = { + path, + freeMb: diskStats.freeMb, + totalMb: diskStats.totalMb, + freeInodes: diskStats.freeInodes, + totalInodes: diskStats.totalInodes, + } + yield {disks: [diskSpace]} + } + await sleep(1000) + } } + + const res = await client.reportMachineHealth(stream(), {headers, signal}) + if (res.shouldTerminate) return } catch (error) { console.log('Error reporting health:', error) } diff --git a/src/tasks/health.ts b/src/tasks/health.ts index ebb403b..4249b47 100644 --- a/src/tasks/health.ts +++ b/src/tasks/health.ts @@ -1,41 +1,40 @@ +import {PlainMessage} from '@bufbuild/protobuf' import {execa} from 'execa' +import {DiskSpace} from '../gen/ts/depot/cloud/v3/machine_pb' import {sleep} from '../utils/common' import {stats} from '../utils/disk' import {client} from '../utils/grpc' export interface ReportHealthParams { - machineId: string signal: AbortSignal headers: HeadersInit path: string } -export async function reportHealth({machineId, signal, headers, path}: ReportHealthParams) { - while (true) { - if (signal.aborted) return - +export async function reportHealth({signal, headers, path}: ReportHealthParams) { + while (!signal.aborted) { await waitForBuildKitWorkers(signal) try { - while (true) { - if (signal.aborted) return - - const disk_stats = await stats(path) - const disk_space = disk_stats - ? [ - { - path, - freeMb: disk_stats.freeMb, - totalMb: disk_stats.totalMb, - freeInodes: disk_stats.freeInodes, - totalInodes: disk_stats.totalInodes, - }, - ] - : undefined - - await client.pingMachineHealth({machineId, disks: disk_space}, {headers, signal}) - await sleep(1000) + async function* stream() { + while (!signal.aborted) { + const diskStats = await stats(path) + if (diskStats) { + const diskSpace: PlainMessage = { + path, + freeMb: diskStats.freeMb, + totalMb: diskStats.totalMb, + freeInodes: diskStats.freeInodes, + totalInodes: diskStats.totalInodes, + } + yield {disks: [diskSpace]} + } + await sleep(1000) + } } + + const res = await client.reportMachineHealth(stream(), {headers, signal}) + if (res.shouldTerminate) return } catch (error) { console.log('Error reporting health:', error) }