Skip to content

Commit

Permalink
Switch to testing the distributed runtime (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman authored Nov 29, 2024
1 parent abe3ce8 commit fee8743
Show file tree
Hide file tree
Showing 9 changed files with 480 additions and 205 deletions.
8 changes: 8 additions & 0 deletions conf.crash.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"bootstrap" : true,
"seed" : "foo",
"keys" : 100000 ,
"tests" : 100000,
"maxProgramSize" : 20,
"crashInterval" : 50000
}
7 changes: 7 additions & 0 deletions conf.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"bootstrap" : true,
"seed" : "foo",
"keys" : 100000 ,
"tests" : 100000,
"maxProgramSize" : 20
}
113 changes: 113 additions & 0 deletions src/interpreter/dist_spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

import { ClusterSpec, ContainerSpec } from "./infra";

const RESTATE_IMAGE =
process.env.RESTATE_CONTAINER_IMAGE ?? "ghcr.io/restatedev/restate:main";

const SDK_IMAGE =
process.env.SERVICES_CONTAINER_IMAGE ??
"localhost/restatedev/test-services:latest";

export const RESTATE_LEADER: ContainerSpec = {
image: RESTATE_IMAGE,
name: "n1",
ports: [8080, 9070, 5122, 5123],
pull: "always",
env: {
RESTATE_LOG_FILTER: "restate=warn",
RESTATE_LOG_FORMAT: "json",
RESTATE_ROLES: "[worker,log-server,admin,metadata-store]",
RESTATE_CLUSTER_NAME: "foobar",
RESTATE_BIFROST__DEFAULT_PROVIDER: "replicated",
RESTATE_ALLOW_BOOTSTRAP: "true",
RESTATE_ADVERTISED_ADDRESS: "http://n1:5122",
},
};

export const RESTATE_FOLLOWER = (n: number): ContainerSpec => {
const name = `n${n + 2}`; // followers start at 2, leader is 1.
return {
image: RESTATE_IMAGE,
name,
ports: [8080],
pull: "always",
env: {
RESTATE_LOG_FILTER: "restate=warn",
RESTATE_LOG_FORMAT: "json",
RESTATE_ROLES: "[worker,admin,log-server]",
RESTATE_CLUSTER_NAME: "foobar",
RESTATE_BIFROST__DEFAULT_PROVIDER: "replicated",
RESTATE_ALLOW_BOOTSTRAP: "true",
RESTATE_METADATA_STORE_CLIENT__ADDRESS: "http://n1:5123",
RESTATE_ADVERTISED_ADDRESS: `http://${name}:5122`,
},
};
};

export const INTERPRETER = (n: number): ContainerSpec => {
let english: string;
switch (n) {
case 0:
english = "zero";
break;
case 1:
english = "one";
break;
case 2:
english = "two";
break;
default:
throw new Error("Invalid interpreter number");
}
const name = `interpreter_${english}`;
return {
image: SDK_IMAGE,
name,
ports: [9000 + n],
pull: "never",
env: {
PORT: `${9000 + n}`,
RESTATE_LOGGING: "ERROR",
NODE_ENV: "production",
SERVICES: `ObjectInterpreterL${n}`,
},
};
};

export const SERVICES: ContainerSpec = {
image: SDK_IMAGE,
name: "services",
ports: [9003],
pull: "never",
env: {
PORT: "9003",
RESTATE_LOGGING: "ERROR",
NODE_ENV: "production",
SERVICES: "ServiceInterpreterHelper",
},
};

export const CLUSTER: ClusterSpec = (() => {
const containers = [];

containers.push(RESTATE_LEADER);

for (let i = 0; i < 2; i++) {
containers.push(RESTATE_FOLLOWER(i));
}

containers.push(INTERPRETER(0));
containers.push(INTERPRETER(1));
containers.push(INTERPRETER(2));
containers.push(SERVICES);

return { containers };
})();
201 changes: 201 additions & 0 deletions src/interpreter/infra.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

import {
GenericContainer,
ImagePullPolicy,
Network,
PullPolicy,
StartedNetwork,
StartedTestContainer,
} from "testcontainers";

export type ClusterSpec = {
containers: ContainerSpec[];
};

export type ContainerSpec = {
image: string;
name: string;
ports: number[];
env?: Record<string, string>;
pull: "always" | "never";
};

export type Container = {
name: string;
port(port: number): number;
ports(): Record<number, number>;
url(port: number): string;
host(): string;
stop(): Promise<void>;
restart(): Promise<void>;
};

export type Cluster = {
start(): Promise<void>;
stop(): Promise<void>;
container(name: string): Container;
hostContainerUrl(name: string, port: number): string;
internalContainerUrl(name: string, port: number): string;
};

export function createCluster(spec: ClusterSpec): Cluster {
return new ConfiguredCluster(spec);
}

class ConfiguredContainer implements Container {
private started: StartedTestContainer | undefined;

constructor(
private readonly spec: ContainerSpec,
private readonly container: StartedTestContainer,
) {
this.started = container;
}

get name() {
return this.spec.name;
}

port(port: number): number {
if (this.started === undefined) {
throw new Error("Container not started");
}
return this.started.getMappedPort(port);
}

ports(): Record<number, number> {
if (this.started === undefined) {
throw new Error("Container not started");
}
const started = this.started;
return this.spec.ports.reduce(
(acc, port) => {
acc[port] = started.getMappedPort(port);
return acc;
},
{} as Record<number, number>,
);
}

url(port: number): string {
return `http://${this.host()}:${this.port(port)}`;
}

host(): string {
if (this.started === undefined) {
throw new Error("Container not started");
}
return this.started.getHost();
}

async stop() {
if (this.started === undefined) {
return;
}
await this.started.stop();
this.started = undefined;
}

async restart() {
if (this.started === undefined) {
throw new Error("Container not started");
}
await this.started.restart({ timeout: 1 });
}
}

class ConfiguredCluster implements Cluster {
private containers: Map<string, ConfiguredContainer> | undefined;
private network: StartedNetwork | undefined;

constructor(private readonly spec: ClusterSpec) {}

hostContainerUrl(name: string, port: number): string {
if (this.containers === undefined) {
throw new Error("Cluster not started");
}
const container = this.containers.get(name);
if (!container) {
throw new Error(`Container ${name} not found`);
}
return container.url(port);
}

internalContainerUrl(name: string, port: number): string {
if (this.containers === undefined) {
throw new Error("Cluster not started");
}
const container = this.containers.get(name);
if (!container) {
throw new Error(`Container ${name} not found`);
}
return `http://${container.host()}:${port}`;
}

async start(): Promise<void> {
const network = await new Network().start();

const neverPoll: ImagePullPolicy = {
shouldPull: function (): boolean {
return false;
},
};

const containerPromises = this.spec.containers.map(async (spec) => {
const container = new GenericContainer(spec.image)
.withExposedPorts(...spec.ports)
.withNetwork(network)
.withNetworkAliases(spec.name)
.withPullPolicy(
spec.pull === "always" ? PullPolicy.alwaysPull() : neverPoll,
)
.withEnvironment(spec.env ?? {});

const startedContainer = await container.start();

return new ConfiguredContainer(spec, startedContainer);
});

const containers = await Promise.all(containerPromises);

this.network = network;
this.containers = containers.reduce((acc, container) => {
acc.set(container.name, container);
return acc;
}, new Map<string, ConfiguredContainer>());
}

async stop(): Promise<void> {
const c = this.containers;
this.containers = undefined;

if (c) {
const startedContainers = [...c.values()];
const futures = startedContainers.map((c) => c.stop());
await Promise.all(futures);
}
if (this.network) {
await this.network.stop();
this.network = undefined;
}
}

container(name: string): Container {
if (this.containers === undefined) {
throw new Error("Cluster not started");
}
const container = this.containers.get(name);
if (!container) {
throw new Error(`Container ${name} not found`);
}
return container;
}
}
52 changes: 52 additions & 0 deletions src/interpreter/raw_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import http from "node:http";
import type { Program } from "./commands";

const agent = new http.Agent({
keepAlive: true,
});

export function sendInterpreter(opts: {
ingressUrl: URL;
idempotencyKey: string;
interpreterId: string;
program: Program;
}): Promise<void> {
const { ingressUrl, idempotencyKey, interpreterId, program } = opts;

const options = {
method: "POST",
hostname: ingressUrl.hostname,
port: ingressUrl.port,
path: `/ObjectInterpreterL0/${interpreterId}/interpret/send`,
agent,
headers: {
"idempotency-key": idempotencyKey,
"Content-Type": "application/json",
Accept: "application/json",
},
};

const { promise, resolve, reject } = Promise.withResolvers<void>();

const req = http.request(options, function (res) {
if (res.statusCode === 200 || res.statusCode === 202) {
resolve();
} else {
reject(new Error(`Failed to send: ${res.statusCode}`));
}

res.on("data", function (chunk) {});
res.on("error", function (e) {
reject(e);
});
res.on("end", function () {});
});

const jsBody = JSON.stringify(program);

req.end(jsBody).on("error", function (e) {
reject(e);
});

return promise;
}
Loading

0 comments on commit fee8743

Please sign in to comment.