diff --git a/proto/depot/cloud/v3/machine.proto b/proto/depot/cloud/v3/machine.proto index 323b723..54b45be 100644 --- a/proto/depot/cloud/v3/machine.proto +++ b/proto/depot/cloud/v3/machine.proto @@ -7,6 +7,7 @@ service MachineService { rpc PingMachineHealth(PingMachineHealthRequest) returns (PingMachineHealthResponse); rpc ReportMachineHealth(stream ReportMachineHealthRequest) returns (ReportMachineHealthResponse); rpc Usage(UsageRequest) returns (UsageResponse); + rpc Shutdown(ShutdownRequest) returns (ShutdownResponse); } message RegisterMachineRequest { @@ -159,3 +160,6 @@ message Cache { } message UsageResponse {} + +message ShutdownRequest {} +message ShutdownResponse {} diff --git a/src/gen/ts/depot/cloud/v3/machine_connect.ts b/src/gen/ts/depot/cloud/v3/machine_connect.ts index e35c3a1..56c30fb 100644 --- a/src/gen/ts/depot/cloud/v3/machine_connect.ts +++ b/src/gen/ts/depot/cloud/v3/machine_connect.ts @@ -11,6 +11,8 @@ import { RegisterMachineResponse, ReportMachineHealthRequest, ReportMachineHealthResponse, + ShutdownRequest, + ShutdownResponse, UsageRequest, UsageResponse, } from './machine_pb' @@ -57,5 +59,14 @@ export const MachineService = { O: UsageResponse, kind: MethodKind.Unary, }, + /** + * @generated from rpc depot.cloud.v3.MachineService.Shutdown + */ + shutdown: { + name: 'Shutdown', + I: ShutdownRequest, + O: ShutdownResponse, + kind: MethodKind.Unary, + }, }, } as const diff --git a/src/gen/ts/depot/cloud/v3/machine_pb.ts b/src/gen/ts/depot/cloud/v3/machine_pb.ts index 246f3bf..6d17910 100644 --- a/src/gen/ts/depot/cloud/v3/machine_pb.ts +++ b/src/gen/ts/depot/cloud/v3/machine_pb.ts @@ -1140,3 +1140,69 @@ export class UsageResponse extends Message { return proto3.util.equals(UsageResponse, a, b) } } + +/** + * @generated from message depot.cloud.v3.ShutdownRequest + */ +export class ShutdownRequest extends Message { + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'depot.cloud.v3.ShutdownRequest' + static readonly fields: FieldList = proto3.util.newFieldList(() => []) + + static fromBinary(bytes: Uint8Array, options?: Partial): ShutdownRequest { + return new ShutdownRequest().fromBinary(bytes, options) + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ShutdownRequest { + return new ShutdownRequest().fromJson(jsonValue, options) + } + + static fromJsonString(jsonString: string, options?: Partial): ShutdownRequest { + return new ShutdownRequest().fromJsonString(jsonString, options) + } + + static equals( + a: ShutdownRequest | PlainMessage | undefined, + b: ShutdownRequest | PlainMessage | undefined, + ): boolean { + return proto3.util.equals(ShutdownRequest, a, b) + } +} + +/** + * @generated from message depot.cloud.v3.ShutdownResponse + */ +export class ShutdownResponse extends Message { + constructor(data?: PartialMessage) { + super() + proto3.util.initPartial(data, this) + } + + static readonly runtime: typeof proto3 = proto3 + static readonly typeName = 'depot.cloud.v3.ShutdownResponse' + static readonly fields: FieldList = proto3.util.newFieldList(() => []) + + static fromBinary(bytes: Uint8Array, options?: Partial): ShutdownResponse { + return new ShutdownResponse().fromBinary(bytes, options) + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ShutdownResponse { + return new ShutdownResponse().fromJson(jsonValue, options) + } + + static fromJsonString(jsonString: string, options?: Partial): ShutdownResponse { + return new ShutdownResponse().fromJsonString(jsonString, options) + } + + static equals( + a: ShutdownResponse | PlainMessage | undefined, + b: ShutdownResponse | PlainMessage | undefined, + ): boolean { + return proto3.util.equals(ShutdownResponse, a, b) + } +} diff --git a/src/tasks/buildkit.ts b/src/tasks/buildkit.ts index 00b6550..dd86a6e 100644 --- a/src/tasks/buildkit.ts +++ b/src/tasks/buildkit.ts @@ -4,6 +4,7 @@ import * as fsp from 'fs/promises' import {onShutdown, onShutdownError} from 'node-graceful-shutdown' import {RegisterMachineResponse, RegisterMachineResponse_BuildKitTask} from '../gen/ts/depot/cloud/v3/machine_pb' import {pathExists} from '../utils/common' +import {client} from '../utils/grpc' import {ensureMounted, fstrim, mountExecutor, unmapBlockDevice, unmountDevice} from '../utils/mounts' import {reportHealth} from './health' import {reportUsage} from './usage' @@ -209,6 +210,9 @@ keepBytes = ${cacheSizeBytes} // Ignore this error, it's expected when the process is killed. } else if (isAbortError(error)) { // Ignore this error, it's expected when the process is killed. + } else if (error instanceof Error && error.message.includes('Command failed with exit code 2')) { + console.error(`BuildKit exited with panic: ${error}`) + throw error } else { throw error } @@ -237,41 +241,59 @@ keepBytes = ${cacheSizeBytes} console.log(`BuildKit exited with error: ${error}`) } - // Remove estargz cache because we will rely on the buildkit layer cache instead. - await execa('rm', ['-rf', `${rootDir}/runc-stargz/snapshots/stargz`], {stdio: 'inherit'}).catch((err) => { - console.error(err) - }) - - // Print the time it takes to sync the filesystem. - const start = Date.now() - // sync the filesystem to ensure all data is written to disk. - await execa('sync', {stdio: 'inherit'}).catch((err) => { - console.error(err) - }) - console.log(`sync took ${Date.now() - start}ms`) - - for (const mount of task.mounts) { - if (mount.cephVolume) { - if (!task.disableFstrim) { - await fstrim(mount.path) - } - await unmountDevice(mount.path) - await unmapBlockDevice(mount.cephVolume.volumeName, mount.cephVolume.imageSpec) - } else { - await unmountDevice(mount.path) - } - } + await shutdown(rootDir, task) }) try { - await Promise.all([ + const [result] = await Promise.allSettled([ buildkit, - reportHealth({signal, headers, path: rootDir}), + reportHealth({controller, headers, path: rootDir}), reportUsage({machineId, signal, headers}), ]) + if (result.status === 'rejected') { + throw result.reason + } + + // If we have successfully stopped buildkit, we can shutdown. + await shutdown(rootDir, task) } catch (error) { throw error } finally { controller.abort() } } + +async function shutdown(rootDir: string, task: RegisterMachineResponse_BuildKitTask) { + // Remove estargz cache because we will rely on the buildkit layer cache instead. + await execa('rm', ['-rf', `${rootDir}/runc-stargz/snapshots/stargz`], {stdio: 'inherit'}).catch((err) => { + console.error(err) + }) + + // Print the time it takes to sync the filesystem. + const start = Date.now() + // sync the filesystem to ensure all data is written to disk. + await execa('sync', {stdio: 'inherit'}).catch((err) => { + console.error(err) + }) + console.log(`sync took ${Date.now() - start}ms`) + + for (const mount of task.mounts) { + if (mount.cephVolume) { + if (!task.disableFstrim) { + await fstrim(mount.path) + } + await unmountDevice(mount.path) + await unmapBlockDevice(mount.cephVolume.volumeName, mount.cephVolume.imageSpec) + } else { + await unmountDevice(mount.path) + } + } + + // Report shutdown to the API to indicate that the machine is no longer available. + await reportShutdown() +} + +async function reportShutdown() { + const signal = AbortSignal.timeout(5000) + return await client.shutdown({}, {signal}) +} diff --git a/src/tasks/health.ts b/src/tasks/health.ts index 4249b47..0b95bfe 100644 --- a/src/tasks/health.ts +++ b/src/tasks/health.ts @@ -1,4 +1,5 @@ import {PlainMessage} from '@bufbuild/protobuf' +import {isAbortError} from 'abort-controller-x' import {execa} from 'execa' import {DiskSpace} from '../gen/ts/depot/cloud/v3/machine_pb' import {sleep} from '../utils/common' @@ -6,12 +7,14 @@ import {stats} from '../utils/disk' import {client} from '../utils/grpc' export interface ReportHealthParams { - signal: AbortSignal + controller: AbortController headers: HeadersInit path: string } -export async function reportHealth({signal, headers, path}: ReportHealthParams) { +export async function reportHealth({controller, headers, path}: ReportHealthParams) { + const signal = controller.signal + while (!signal.aborted) { await waitForBuildKitWorkers(signal) @@ -34,9 +37,16 @@ export async function reportHealth({signal, headers, path}: ReportHealthParams) } const res = await client.reportMachineHealth(stream(), {headers, signal}) - if (res.shouldTerminate) return + if (res.shouldTerminate) { + console.log('shutdown requested') + controller.abort() + + return + } } catch (error) { - console.log('Error reporting health:', error) + if (!isAbortError(error)) { + console.log('Error reporting health:', error) + } } await sleep(1000) }