pythonWorker.js 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. let { spawn } = require('child_process')
  2. const { Lock } = require('semaphore-async-await')
  3. const { serializeError } = require('./utils')
  4. function PythonWorker (workerScript, shellOptions) {
  5. // create a lock for incomming commands
  6. this.commandLock = new Lock()
  7. // create a lock for python communication
  8. this.pythonLock = new Lock()
  9. this.data = []
  10. this.error = []
  11. this.workerScript = workerScript
  12. // The python shell is started with this.spawn
  13. this.pythonShell = null
  14. this.isProcessRunning = () =>
  15. this.pythonShell &&
  16. !this.pythonShell.killed &&
  17. this.pythonShell.exitCode !== null
  18. this.transaction = async (func, args) => {
  19. // 1. Block new commands
  20. await this.commandLock.acquire()
  21. // 2. Send data to Python shell (lock will be released by data/error event)
  22. await this.pythonLock.acquire()
  23. func(args)
  24. // 3. Wait for data from the Python shell
  25. await this.pythonLock.acquire()
  26. const pythonError = this.error.pop()
  27. const workerResult = this.data.pop()
  28. this.pythonLock.release()
  29. // 4. Unblock new commands
  30. this.commandLock.release()
  31. // 5. Return result
  32. return { ...workerResult, ...pythonError }
  33. }
  34. // Use send a command to the python worker.
  35. this.send = command => {
  36. if (!this.isProcessRunning) return { error: 'Process not running' }
  37. return this.transaction(command => {
  38. // console.log('[STDIN]', command)
  39. this.pythonShell.stdin.write(
  40. // Write the command as a JSON object, end with a newline!
  41. JSON.stringify(command) + '\n'
  42. )
  43. }, command)
  44. }
  45. this.spawn = async () => {
  46. if (this.isProcessRunning()) return { error: 'Process already running' }
  47. return this.transaction(() => {
  48. this.pythonShell = spawn(
  49. `${process.env.PWD}/${process.env.PYTHON_PATH}`,
  50. [this.workerScript.path]
  51. )
  52. this.pythonShell.stdout.on('data', message => {
  53. // The python worker returns JSON {data, error}
  54. const parsed = JSON.parse(message)
  55. // console.log('[STDOUT]', parsed)
  56. this.data.push(parsed)
  57. this.pythonLock.release()
  58. })
  59. this.pythonShell.stderr.on('data', error => {
  60. // console.log('[STDERR]', error)
  61. this.error.push({ error })
  62. this.pythonLock.release()
  63. })
  64. this.pythonShell.on('close', exitCode => {
  65. // console.log('[CLOSE]', exitCode)
  66. this.data.push({ data: exitCode })
  67. this.pythonLock.release()
  68. })
  69. this.pythonShell.on('error', error => {
  70. // console.log('[ERROR]', error)
  71. this.error.push({ error })
  72. this.pythonLock.release()
  73. })
  74. })
  75. }
  76. this.end = () => {
  77. if (!this.isProcessRunning) return { error: 'Process not running' }
  78. return this.transaction(() => {
  79. this.pythonShell.stdin.end()
  80. })
  81. }
  82. this.kill = signal => {
  83. if (!this.isProcessRunning) return { error: 'Process not running' }
  84. return this.transaction(signal => {
  85. this.pythonShell.kill(signal)
  86. })
  87. }
  88. }
  89. module.exports = PythonWorker