let { spawn } = require('child_process')
const { Lock } = require('semaphore-async-await')
const { serializeError } = require('./utils')

/**
 * Wraps a long-lived Python child process and serialises request/response
 * style communication with it over stdin/stdout.
 *
 * Every public operation (`spawn`, `send`, `end`, `kill`) runs as a
 * "transaction": one command is in flight at a time, and the transaction
 * resolves once the child emits its next stdout/stderr/close/error event.
 *
 * @param {{ path: string }} workerScript - Script descriptor; `path` is passed
 *   to the Python executable as its first argument.
 * @param {object} [shellOptions] - Accepted for interface compatibility;
 *   currently not used by this implementation.
 */
function PythonWorker (workerScript, shellOptions) {
  // Serialises incoming commands so only one transaction runs at a time.
  this.commandLock = new Lock()
  // Held while waiting for the Python process to answer; the child's event
  // handlers release it to wake the pending transaction.
  this.pythonLock = new Lock()

  // Results and errors pushed by the child's event handlers.
  this.data = []
  this.error = []

  this.workerScript = workerScript

  // The Python process handle; created by this.spawn.
  this.pythonShell = null

  /**
   * @returns {boolean} true while a spawned child exists, has not been sent a
   *   kill signal and has not terminated.
   */
  this.isProcessRunning = () =>
    Boolean(
      this.pythonShell &&
        !this.pythonShell.killed &&
        // Node keeps exitCode/signalCode at null while the child is alive.
        this.pythonShell.exitCode === null &&
        this.pythonShell.signalCode === null
    )

  /**
   * Runs `func(args)` and waits for the next event from the Python process.
   *
   * @param {function(*): void} func - Triggers the work (write, spawn, ...).
   * @param {*} [args] - Single argument forwarded to `func`.
   * @returns {Promise<object>} The last worker result merged with the last
   *   error (error keys win). Rejects if `func` throws.
   */
  this.transaction = async (func, args) => {
    // 1. Block new commands
    await this.commandLock.acquire()
    try {
      // 2. Take the python lock; it is handed back by a data/error/close event
      await this.pythonLock.acquire()
      try {
        func(args)
        // 3. Wait for the Python process to respond
        await this.pythonLock.acquire()
        const pythonError = this.error.pop()
        const workerResult = this.data.pop()
        // 5. Return result
        return { ...workerResult, ...pythonError }
      } finally {
        // Release even when func throws, otherwise later commands deadlock.
        this.pythonLock.release()
      }
    } finally {
      // 4. Unblock new commands
      this.commandLock.release()
    }
  }

  /**
   * Sends a command to the Python worker as one line of JSON on stdin.
   *
   * @param {*} command - JSON-serialisable command.
   * @returns {Promise<object>|{error: string}} The transaction result, or an
   *   error object when no process is running.
   */
  this.send = command => {
    if (!this.isProcessRunning()) return { error: 'Process not running' }
    return this.transaction(payload => {
      // Write the command as a JSON object, terminated by a newline.
      this.pythonShell.stdin.write(JSON.stringify(payload) + '\n')
    }, command)
  }

  /**
   * Starts the Python process and wires up its event handlers.
   *
   * @returns {Promise<object>} Resolves with the first event emitted by the
   *   child, or with an error object when a process is already running.
   */
  this.spawn = async () => {
    if (this.isProcessRunning()) return { error: 'Process already running' }
    return this.transaction(() => {
      this.pythonShell = spawn(
        `${process.env.PWD}/${process.env.PYTHON_PATH}`,
        [this.workerScript.path]
      )
      this.pythonShell.stdout.on('data', message => {
        // The python worker returns JSON {data, error}
        try {
          this.data.push(JSON.parse(message))
        } catch (error) {
          // Report malformed output as an error instead of throwing from the
          // event handler, which would leave the transaction waiting forever.
          this.error.push({ error })
        }
        this.pythonLock.release()
      })
      this.pythonShell.stderr.on('data', error => {
        this.error.push({ error })
        this.pythonLock.release()
      })
      this.pythonShell.on('close', exitCode => {
        this.data.push({ data: exitCode })
        this.pythonLock.release()
      })
      this.pythonShell.on('error', error => {
        this.error.push({ error })
        this.pythonLock.release()
      })
    })
  }

  /**
   * Closes the worker's stdin and waits for the child's next event.
   *
   * @returns {Promise<object>|{error: string}} The transaction result, or an
   *   error object when no process is running.
   */
  this.end = () => {
    if (!this.isProcessRunning()) return { error: 'Process not running' }
    return this.transaction(() => {
      this.pythonShell.stdin.end()
    })
  }

  /**
   * Sends a signal to the worker and waits for the child's next event.
   *
   * @param {string|number} [signal] - Signal forwarded to `subprocess.kill`.
   * @returns {Promise<object>|{error: string}} The transaction result, or an
   *   error object when no process is running.
   */
  this.kill = signal => {
    if (!this.isProcessRunning()) return { error: 'Process not running' }
    return this.transaction(sig => {
      this.pythonShell.kill(sig)
    }, signal)
  }
}

module.exports = PythonWorker