Skip to content

Commit

Permalink
Fix child_process.fork() so that IPC works between parent and child
Browse files Browse the repository at this point in the history
and vice versa.
We still can't send network handles back and forth.
  • Loading branch information
Gregory Brail committed Apr 11, 2014
1 parent 14363e7 commit 92320d5
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 58 deletions.
18 changes: 13 additions & 5 deletions core/src/main/java/io/apigee/trireme/core/NodeScript.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.apigee.trireme.core.internal.ModuleRegistry;
import io.apigee.trireme.core.internal.ScriptRunner;
import io.apigee.trireme.core.modules.ProcessWrap;
import org.mozilla.javascript.Scriptable;

import java.io.File;
Expand All @@ -46,7 +47,7 @@ public class NodeScript
private ScriptRunner runner;
private Object attachment;
private Sandbox sandbox;
private Scriptable parentProcess;
private Object parentProcess;
private boolean pin;
private boolean forceRepl;
private boolean printEval;
Expand Down Expand Up @@ -105,7 +106,7 @@ public ScriptFuture execute()
runner = new ScriptRunner(this, env, sandbox, scriptFile, args);
}
runner.setRegistry(registry);
runner.setParentProcess(parentProcess);
runner.setParentProcess((ProcessWrap.ProcessImpl)parentProcess);
if (workingDir != null) {
try {
runner.setWorkingDirectory(workingDir);
Expand Down Expand Up @@ -144,7 +145,7 @@ public ScriptFuture executeModule()

runner = new ScriptRunner(this, env, sandbox, scriptName,
makeModuleScript(), args);
runner.setParentProcess(parentProcess);
runner.setParentProcess((ProcessWrap.ProcessImpl)parentProcess);
runner.setRegistry(registry);
if (workingDir != null) {
try {
Expand Down Expand Up @@ -314,11 +315,11 @@ public void addEnvironment(String name, String value)
/**
* An internal method to identify the child process argument of the parent who forked this script.
*/
public void _setParentProcess(Scriptable parent)
public void _setParentProcess(Object parent)
{
this.parentProcess = parent;
if (runner != null) {
runner.setParentProcess(parent);
runner.setParentProcess((ProcessWrap.ProcessImpl)parent);
}
}

Expand All @@ -333,5 +334,12 @@ public Scriptable _getProcessObject()
runner.awaitInitialization();
return runner.getProcess();
}

/**
* An internal method to get the runtime for this script.
*/
public ScriptRunner _getRuntime() {
return runner;
}
}

121 changes: 118 additions & 3 deletions core/src/main/java/io/apigee/trireme/core/internal/ScriptRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.apigee.trireme.core.modules.Buffer;
import io.apigee.trireme.core.modules.NativeModule;
import io.apigee.trireme.core.modules.Process;
import io.apigee.trireme.core.modules.ProcessWrap;
import io.apigee.trireme.net.SelectorHandler;
import org.mozilla.javascript.Context;
import org.mozilla.javascript.ContextAction;
Expand All @@ -47,6 +48,7 @@
import org.mozilla.javascript.Script;
import org.mozilla.javascript.Scriptable;
import org.mozilla.javascript.ScriptableObject;
import org.mozilla.javascript.Undefined;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,6 +58,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashMap;
Expand Down Expand Up @@ -111,7 +114,7 @@ public class ScriptRunner
private Buffer.BufferModuleImpl buffer;
private String workingDirectory;
private String scriptFileName;
private Scriptable parentProcess;
private ProcessWrap.ProcessImpl parentProcess;
private boolean forceRepl;

private ScriptableObject scope;
Expand Down Expand Up @@ -286,15 +289,16 @@ public OutputStream getStderr() {
return ((sandbox != null) && (sandbox.getStderr() != null)) ? sandbox.getStderr() : System.err;
}

public Scriptable getParentProcess() {
public ProcessWrap.ProcessImpl getParentProcess() {
return parentProcess;
}

public Process.ProcessImpl getProcess() {
return process;
}

public void setParentProcess(Scriptable parentProcess) {
public void setParentProcess(ProcessWrap.ProcessImpl parentProcess)
{
this.parentProcess = parentProcess;
}

Expand Down Expand Up @@ -378,6 +382,115 @@ public void enqueueTask(ScriptTask task, Scriptable domain)
selector.wakeup();
}

/**
* This method is used by the "child_process" module when sending an IPC message between child processes
* in the same JVM.
*
* @param message A JavaScript object, String, or Buffer. We will make a copy to prevent confusion.
* @param child If null, deliver the message to the "process" object. Otherwise, deliver it to the
* specified child.
*/
public void enqueueIpc(Context cx, Object message, final ProcessWrap.ProcessImpl child)
{
Object toDeliver;
String event = "message";

if (message == ProcessWrap.IPC_DISCONNECT) {
event = "disconnect";
toDeliver = Undefined.instance;

} else if (message instanceof Buffer.BufferImpl) {
// Copy the bytes, because a buffer might be modified between apps
ByteBuffer bb = ((Buffer.BufferImpl)message).getBuffer();
toDeliver = Buffer.BufferImpl.newBuffer(cx, scope, bb, true);

} else if (message instanceof Scriptable) {
// Copy the object because we can't rely on safely sharing them between apps.
Scriptable s = (Scriptable)message;
toDeliver = copy(cx, s);
if (s.has("cmd", s)) {
String cmd = Context.toString(s.get("cmd", s));
if (cmd.startsWith("NODE_")) {
event = "internalMessage";
}
}

} else if (message instanceof String) {
// Strings are immutable in Java!
toDeliver = message;
} else {
throw new AssertionError("Unsupported object type for IPC");
}

final Object reallyDeliver = toDeliver;
final String fevent = event;
if (child == null) {
// We are called on child's script runtime, so enqueue a task here
enqueueTask(new ScriptTask() {
@Override
public void execute(Context cx, Scriptable scope)
{
if ("disconnect".equals(fevent)) {
// Special handling for a disconnect from the parent
if (process.isConnected()) {
process.setConnected(false);
process.getEmit().call(cx, scope, process, new Object[] { fevent });
}
} else {
process.getEmit().call(cx, scope, process,
new Object[] { fevent, reallyDeliver });
}
}
});

} else {
// We are the child's script runtime. Enqueue task that sends to the parent
// "child" here actually refers to the "child_process" object inside the parent!
assert(child.getRuntime() != this);
child.getRuntime().enqueueTask(new ScriptTask()
{
@Override
public void execute(Context cx, Scriptable scope)
{
// Now we should be running inside the script thread of the other script
child.getOnMessage().call(cx, scope, null, new Object[] { fevent, reallyDeliver });
}
});
}
}

/**
* Copy one JavaScript object to another, taking nested objects into account. Don't copy primitive fields
* because we assume that they are immutable (string, boolean, and number).
*/
private Scriptable copy(Context cx, Scriptable s)
{
if (s instanceof Function) {
return null;
}
Scriptable t = cx.newObject(scope);
for (Object id : s.getIds()) {
if (id instanceof String) {
String n = (String)id;
Object val = s.get(n, s);
if (val instanceof Scriptable) {
val = copy(cx, (Scriptable)val);
}
t.put(n, t, val);
} else if (id instanceof Number) {
int i = ((Number)id).intValue();
Object val = s.get(i, s);
if (val instanceof Scriptable) {
val = copy(cx, (Scriptable)val);
}
t.put(i, t, val);
} else {
throw new AssertionError();
}
}
return t;
}

public Scriptable getDomain()
{
return ArgUtils.ensureValid(process.getDomain());
Expand Down Expand Up @@ -909,6 +1022,8 @@ private void initGlobals(Context cx)

// Next we need "process" which takes a bit more care
process = (Process.ProcessImpl)require(Process.MODULE_NAME, cx);
// Check if we are connected to a parent via API
process.setConnected(parentProcess != null);

// The buffer module needs special handling because of the "charsWritten" variable
buffer = (Buffer.BufferModuleImpl)require("buffer", cx);
Expand Down
76 changes: 70 additions & 6 deletions core/src/main/java/io/apigee/trireme/core/modules/Process.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public static class ProcessImpl
private boolean needImmediateCallback;
private Function immediateCallback;
private Function fatalException;
private Function emit;
private Object domain;
private boolean exiting;
private int umask = DEFAULT_UMASK;
Expand All @@ -114,6 +115,7 @@ public static class ProcessImpl
private String eval;
private boolean printEval;
private boolean forceRepl;
private boolean connected;
private Scriptable tickInfoBox;

@JSConstructor
Expand Down Expand Up @@ -179,6 +181,18 @@ public Object getTickInfoBox() {
return tickInfoBox;
}

@JSGetter("connected")
@SuppressWarnings("undefined")
public boolean isConnected() {
return connected;
}

@JSSetter("connected")
@SuppressWarnings("undefined")
public void setConnected(boolean c) {
this.connected = c;
}

/**
* Implement process.binding. This works like the rest of the module loading but uses a different
* namespace and a different cache.
Expand Down Expand Up @@ -458,19 +472,44 @@ public int getPid()
return System.identityHashCode(runner) % 65536;
}

/**
* Send a message back to our parent process if there is one.
*/
@JSFunction
@SuppressWarnings("unused")
public static void send(Context cx, Scriptable thisObj, Object[] args, Function func)
{
Object message = objArg(args, 0, Object.class, true);
ProcessImpl self = (ProcessImpl)thisObj;

if (!self.connected) {
throw Utils.makeError(cx, thisObj, "IPC to the parent is disconnected");
}
if (self.runner.getParentProcess() == null) {
throw Utils.makeError(cx, thisObj, "IPC is not enabled back to the parent");
}

// We have a parent, which has a reference to its own "child_process" object that
// refers back to us. Put a message on THAT script's queue that came from us.
ProcessWrap.ProcessImpl childObj = self.runner.getParentProcess();
childObj.getRuntime().enqueueIpc(cx, message, childObj);
}

@JSFunction
@SuppressWarnings("unused")
public static void disconnect(Context cx, Scriptable thisObj, Object[] args, Function func)
{
ProcessImpl self = (ProcessImpl)thisObj;

if (self.runner.getParentProcess() == null) {
throw Utils.makeError(cx, thisObj, "IPC is not enabled back to the parent");
}

ProcessWrap.ProcessImpl pw = (ProcessWrap.ProcessImpl)self.runner.getParentProcess();
pw.getOnMessage().call(cx, pw, pw, new Object[] { message });
ProcessWrap.ProcessImpl childObj = self.runner.getParentProcess();

self.emit.call(cx, self.emit, thisObj, new Object[] { "disconnected" });
self.connected = false;
childObj.getRuntime().enqueueIpc(cx, ProcessWrap.IPC_DISCONNECT, childObj);
}

@JSGetter("_errno")
Expand Down Expand Up @@ -501,10 +540,6 @@ public static Object memoryUsage(Context cx, Scriptable thisObj, Object[] args,

public void fireExit(Context cx, int code)
{
Function emit = (Function)ScriptableObject.getProperty(this, "emit");
if (emit == null) {
throw new AssertionError("process.emit not defined");
}
emit.call(cx, emit, this, new Object[] { "exit", code });
}

Expand All @@ -523,6 +558,23 @@ public Function getSubmitTick() {
return submitTick;
}

/**
* We use these functions when our own JS code needs to control whether the event loop stays alive.
*/
@JSFunction("_pin")
@SuppressWarnings("unused")
public void pin()
{
runner.pin();
}

@JSFunction("_unpin")
@SuppressWarnings("unused")
public void unPin()
{
runner.unPin();
}

/**
* trireme.js (aka node.js) calls this whenever nextTick is called and it thinks that we
* don't know that it needs us to do stuff.
Expand All @@ -549,6 +601,18 @@ public boolean isNeedTickCallback() {
return needTickCallback;
}

@JSSetter("emit")
@SuppressWarnings("unused")
public void setEmit(Function f) {
this.emit = f;
}

@JSGetter("emit")
@SuppressWarnings("unused")
public Function getEmit() {
return emit;
}

@JSSetter("_tickCallback")
@SuppressWarnings("unused")
public void setTickCallback(Function f) {
Expand Down
Loading

0 comments on commit 92320d5

Please sign in to comment.