I've designed a function diversify() that takes an expensive function f() and runs it in parallel on every core of the machine. I've also set it up so that as soon as f() returns a value on one core, all the other worker processes are killed. The code is below:
let diversify = f => {
// Split a function `f` into a thread for each core of the machine.
// The first thread to finish will return its result and end all others.
if ( cluster.isMaster ) {
let children = []
for ( let i = 0; i < os.cpus().length; i++ ) {
children.push(cluster.fork())
}
cluster.on('message', (_, msg) => {
if ( msg.cmd == 'stop' ) { children.forEach(child => {
child.process.kill()
})}
return msg.out
})
} else {
let out = f()
process.send({ cmd: 'stop', out })
}
}
The problem is that after one process tells the master process to stop via msg.cmd == 'stop', there is no way to return the output msg.out. As implemented above, the return msg.out statement sits inside an anonymous function nested in diversify, so the value never reaches the caller: running something like diversify(() => { return true }) yields undefined. Is there any way to return what's being sent in msg.out?
Answer:
Messaging between the master and its workers is asynchronous by design.
A value returned from an event-handler callback cannot reach the caller of the enclosing function, since the event may fire at any time, typically long after that function has already returned.
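To illustrate why the return value gets lost, here is a minimal sketch that uses a plain EventEmitter as a stand-in for cluster (the names listen and emitter are hypothetical, chosen only for this example). The handler's return value is handed back to the emitter, which ignores it, while the outer function has already returned undefined:

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for the cluster object: any EventEmitter behaves the same way.
const emitter = new EventEmitter();

const listen = () => {
  emitter.on('message', (msg) => {
    // This value goes back to the emitter, which discards it.
    return msg.out;
  });
  // listen() itself has no return statement, so it yields undefined.
};

const result = listen();
// emit() only reports whether any listeners were registered.
const hadListeners = emitter.emit('message', { out: 42 });

console.log(result);       // undefined
console.log(hadListeners); // true
```

The value 42 is never visible outside the handler, which is the same situation as return msg.out in the question.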
The canonical solution to this problem is via promises. Simply wrap the function body in a promise and resolve the value instead of returning it.
const cluster = require('cluster');
const os = require('os');
const diversify = (f) =>
new Promise((resolve) => {
// Split a function `f` into a thread for each core of the machine.
// The first thread to finish will return its result and end all others.
if (cluster.isMaster) {
let children = [];
for (let i = 0; i < os.cpus().length; i++) {
children.push(cluster.fork());
}
cluster.on('message', (_, msg) => {
if (msg.cmd == 'stop') {
children.forEach((child) => {
child.process.kill();
});
// Resolve only on a 'stop' message, with the winning worker's output.
resolve(msg.out);
}
});
} else {
let out = f();
process.send({ cmd: 'stop', out });
}
});
The result is then accessible in out:
const getPid = () => {
return process.pid;
};
diversify(() => {
console.log(getPid());
return true;
}).then((out) => {
console.log(out); // true
});
// or using async/await
const diversifyAsync = async () => {
const out = await diversify(() => {
console.log(getPid());
return true;
});
console.log(out); // true
};
diversifyAsync();
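One detail worth knowing about the promise version: a promise settles only once, so a message from a slower worker that arrives after the first one does not change the result. If you want to make that explicit, registering the handler with once() removes it after the first message. The sketch below shows the idea with a plain EventEmitter standing in for cluster; firstMessage, bus, and seen are hypothetical names used only for illustration, not part of the cluster API:

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: resolves with the `out` field of the first 'message' event.
const firstMessage = (emitter, seen) =>
  new Promise((resolve) => {
    // once() removes the listener after its first invocation.
    emitter.once('message', (msg) => {
      seen.push(msg.out); // record what the handler actually received
      resolve(msg.out);
    });
  });

const bus = new EventEmitter(); // stand-in for cluster
const seen = [];
const first = firstMessage(bus, seen);

bus.emit('message', { cmd: 'stop', out: 'fast worker' });
bus.emit('message', { cmd: 'stop', out: 'slow worker' }); // no listener left

first.then((out) => console.log(out)); // fast worker
```

With real workers the second message would usually never arrive, because the other processes are killed, but the one-shot listener keeps the behavior well defined if two workers finish at nearly the same time.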
