123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- let { spawn } = require('child_process')
- const { Lock } = require('semaphore-async-await')
- const { serializeError } = require('./utils')
/**
 * Wraps a Python child process and serialises command/response exchanges
 * with it over stdin/stdout.
 *
 * Every public operation (`spawn`, `send`, `end`, `kill`) runs as a
 * "transaction": one action is performed on the child process and the call
 * then waits until one of the process event handlers (stdout/stderr data,
 * `close`, `error`) signals that a reply is available.
 *
 * @constructor
 * @param {{path: string}} workerScript - Descriptor of the Python script;
 *   `workerScript.path` is passed as the single argument to the interpreter.
 * @param {*} shellOptions - Accepted for interface compatibility; not read
 *   anywhere in this constructor.
 */
function PythonWorker (workerScript, shellOptions) {
  // Lock for incoming commands: only one transaction may run at a time
  this.commandLock = new Lock()
  // Lock for python communication: held while a transaction waits for the
  // child process, released by the process event handlers set up in spawn()
  this.pythonLock = new Lock()
  // Replies / errors pushed by the event handlers, popped by transaction()
  this.data = []
  this.error = []
  this.workerScript = workerScript
  // The python shell is started with this.spawn
  this.pythonShell = null

  /**
   * @returns {boolean} true when a child process has been spawned, has not
   *   been sent a kill signal, and has not exited yet. `exitCode` stays
   *   `null` for as long as the child is still running.
   */
  this.isProcessRunning = () =>
    Boolean(
      this.pythonShell &&
      !this.pythonShell.killed &&
      this.pythonShell.exitCode === null
    )

  /**
   * Runs `func(args)` and waits for the child process to answer.
   *
   * NOTE(review): this relies on a process event handler calling
   * `pythonLock.release()` exactly once per transaction; if several events
   * fire for one action the lock is released more than once. Confirm how the
   * Lock implementation treats surplus releases.
   *
   * @param {function(*): void} func - Action that triggers a reply.
   * @param {*} [args] - Single argument forwarded to `func`.
   * @returns {Promise<Object>} The most recent reply merged with the most
   *   recent error (error keys win). Rejects if `func` throws.
   */
  this.transaction = async (func, args) => {
    // 1. Block new commands
    await this.commandLock.acquire()
    try {
      // 2. Take the python lock; an event handler releases it on reply
      await this.pythonLock.acquire()
      try {
        func(args)
        // 3. Wait for data from the Python shell
        await this.pythonLock.acquire()
        const pythonError = this.error.pop()
        const workerResult = this.data.pop()
        // 5. Return result (the locks are released by the finally blocks)
        return { ...workerResult, ...pythonError }
      } finally {
        // Always hand the lock back, even when func throws, so that later
        // transactions are not blocked forever
        this.pythonLock.release()
      }
    } finally {
      // 4. Unblock new commands
      this.commandLock.release()
    }
  }

  /**
   * Sends a command to the python worker.
   *
   * @param {*} command - JSON-serialisable value written to the child's stdin.
   * @returns {Promise<Object>|{error: string}} The transaction result, or a
   *   plain error object (synchronously) when no process is running.
   */
  this.send = command => {
    if (!this.isProcessRunning()) return { error: 'Process not running' }
    return this.transaction(payload => {
      // Write the command as a JSON object, end with a newline!
      this.pythonShell.stdin.write(`${JSON.stringify(payload)}\n`)
    }, command)
  }

  /**
   * Starts the Python interpreter and wires up its event handlers.
   *
   * @returns {Promise<Object>} Resolves with the first reply produced by the
   *   child, or with an error object when a process is already running.
   */
  this.spawn = async () => {
    if (this.isProcessRunning()) return { error: 'Process already running' }
    return this.transaction(() => {
      this.pythonShell = spawn(
        `${process.env.PWD}/${process.env.PYTHON_PATH}`,
        [this.workerScript.path]
      )
      this.pythonShell.stdout.on('data', message => {
        // The python worker returns JSON {data, error}
        try {
          this.data.push(JSON.parse(message))
        } catch (error) {
          // A malformed chunk must not throw out of the event handler:
          // report it as an error and still wake the waiting transaction
          this.error.push({ error })
        }
        this.pythonLock.release()
      })
      this.pythonShell.stderr.on('data', error => {
        this.error.push({ error })
        this.pythonLock.release()
      })
      this.pythonShell.on('close', exitCode => {
        this.data.push({ data: exitCode })
        this.pythonLock.release()
      })
      this.pythonShell.on('error', error => {
        this.error.push({ error })
        this.pythonLock.release()
      })
    })
  }

  /**
   * Closes the child's stdin and waits for the resulting process event.
   *
   * @returns {Promise<Object>|{error: string}} The transaction result, or a
   *   plain error object (synchronously) when no process is running.
   */
  this.end = () => {
    if (!this.isProcessRunning()) return { error: 'Process not running' }
    return this.transaction(() => {
      this.pythonShell.stdin.end()
    })
  }

  /**
   * Sends `signal` to the child and waits for the resulting process event.
   *
   * @param {string|number} [signal] - Forwarded to `pythonShell.kill`.
   * @returns {Promise<Object>|{error: string}} The transaction result, or a
   *   plain error object (synchronously) when no process is running.
   */
  this.kill = signal => {
    if (!this.isProcessRunning()) return { error: 'Process not running' }
    return this.transaction(requestedSignal => {
      this.pythonShell.kill(requestedSignal)
    }, signal)
  }
}
- module.exports = PythonWorker
|