Przeglądaj źródła

improved python worker interface. end and kill dont remove the connection from the interface yet.

Tomi Cvetic 6 lat temu
rodzic
commit
6bb640c8f4

+ 148 - 162
backend/src/interfaces.js

@@ -24,31 +24,35 @@ const WORKER_DIR = `${__dirname}/python_workers`
 const HOST = os.hostname()
 
 const state = {
-  workers: [],
+  interfaces: [],
   ports: [],
   connections: [],
   lastScan: {
     workers: Date.now() - 100000,
     ports: Date.now() - 100000,
-    connections: Date.now() - 100000
-  }
+    connections: Date.now() - 100000,
+  },
 }
 
 const typeDefs = `
-  type Worker {
+  type Interface {
     id: ID!
     interfaceName: String!
-    workerScript: String!
-    mtime: DateTime!
-    updated: DateTime
+    workerScript: WorkerScript!
     workerProcess: WorkerProcess
     ports: [Port]!
     connections: [Connection]!
     options: [Option!]
   }
 
+  type WorkerScript {
+    path: String!
+    mtime: DateTime!
+    updated: DateTime
+  }
+
   type WorkerProcess {
-    pid: Int
+    pid: Int!
     killed: Boolean!
     signalCode: String
     exitCode: Int
@@ -67,45 +71,44 @@ const typeDefs = `
     id: String!
     interfaceName: String!
     host: String!
-    device: String!
+    device: String
     name: String
     description: String
   }
 
   type Connection {
     id: ID!
-    interfaceName: String!
-    host: String!
-    device: String!
+    port: Port!
     workerProcess: WorkerProcess
   }
 
-  type Interface {
-    name: String!
+  input ConnectionCommand {
+    type: String!
+    data: String
+    options: String
   }
 
   extend type Query {
     interfaces: [Interface]!
-    workers: [Worker]!
-    worker(id: ID, interfaceName: String): Worker!
+    interface(id: ID, interfaceName: String): Interface!
     ports(interfaceName: String, force: Boolean): [Port]!
+    port(id: ID!): Port!
     connections: [Connection]!
     connection(id: ID!): Connection!
   }
 
   extend type Mutation {
-    connect(interfaceName: String!, device: String!): Connection!
-    spawnWorker(id: ID!): Worker!
-    endWorker(id: ID!): Worker!
-    killWorker(id: ID!): Worker!
-    connectionCommand(connectionId: ID!, type: String!, string: String!, options: String): String!
+    connect(portId: ID!): Connection!
+    endConnection(connectionId: ID!): Connection!
+    killConnection(connectionId: ID!): Connection!
+    sendCommand(connectionId: ID!, command: ConnectionCommand!): String!
   }
 `
 
 /**
- * WORKER SECTION
+ * INTERFACE SECTION
  */
-async function findWorkers () {
+async function findInterfaces() {
   // 1. Don't check more frequently than once per second.
   // if (state.lastScan.workers + 1000 > Date.now()) return null
   // state.lastScan.workers = Date.now()
@@ -119,106 +122,114 @@ async function findWorkers () {
   // 3. For every worker script
   const workerPromises = workerFiles.map(async workerFile => {
     const interfaceName = workerFile.replace(/_worker\.py/, '')
-    const workerScript = `${WORKER_DIR}/${workerFile}`
+    const path = `${WORKER_DIR}/${workerFile}`
     // a. Find out if it was modified.
-    const { mtime } = await stat(workerScript)
-    const foundWorker = state.workers.find(
-      worker => worker.interfaceName === interfaceName
+    const { mtime } = await stat(path)
+    const workerScript = { path, mtime, updated: null }
+    const foundInterface = state.interfaces.find(
+      iface => iface.interfaceName === interfaceName
     )
-    if (foundWorker) {
+    if (foundInterface) {
       // b. If it was modified, save the modification time.
-      if (foundWorker.mtime < mtime) foundWorker.updated = mtime
+      if (foundInterface.workerScript.mtime < mtime)
+        foundInterface.workerScript.updated = mtime
       return
     }
     // c. Spawn a new worker connection.
     const workerProcess = new PythonWorker(workerScript)
-    const { data, error, pythonError } = await workerProcess.spawn()
+    const { data, error } = await workerProcess.spawn()
     if (error) throw new Error(error)
-    if (pythonError) throw new Error(pythonError)
+    console.log('Spawned', data, error)
     // d. Save the worker in the state.
-    state.workers.push({
+    state.interfaces.push({
       id: md5(`${interfaceName}${workerScript}${mtime}`),
       interfaceName,
       workerScript,
-      mtime,
-      updated: null,
       workerProcess,
       ports: [],
       connections: [],
-      options: []
+      options: await options(workerProcess),
     })
   })
   await Promise.all(workerPromises)
 }
 
-async function workers (parent, args, context, info) {
-  await findWorkers()
-  return state.workers.map(worker => ({
-    ...worker,
-    // Find ports
-    ports: ports(worker.interfaceName, args, context, info),
+async function interfaces(parent, args, context, info) {
+  await findInterfaces()
+  return state.interfaces.map(iface => ({
+    ...iface,
+    ports: ports(iface.interfaceName, args, context, info),
     // Serialize worker process
-    workerProcess: workerProcess(worker.workerProcess, args, context, info)
+    workerProcess: workerProcess(iface.workerProcess, args, context, info),
   }))
 }
 
-async function worker (parent, args, context, info) {
-  await findWorkers()
+async function interface(parent, args, context, info) {
+  await findInterfaces()
   const { id, interfaceName } = args
   if (!id && !interfaceName) {
     throw new Error('Either id or interfaceName needs to be provided!')
   }
-  const worker = state.workers.find(
-    worker => worker.id === id || worker.interfaceName === interfaceName
+  const iface = state.interfaces.find(
+    iface => iface.id === id || iface.interfaceName === interfaceName
   )
-  if (!worker) {
+  if (!iface) {
     throw new Error(`Worker id=${id}, interfaceName=${interfaceName} not found`)
   }
   return {
-    ...worker,
-    workerProcess: workerProcess(worker.workerProcess, args, context, info)
+    ...iface,
+    workerProcess: workerProcess(iface.workerProcess, args, context, info),
   }
 }
 
-function workerProcess (parent, args, ctx, info) {
+async function options(workerProcess) {
+  console.log(workerProcess.send)
+  const { data, error } = await workerProcess.send({
+    type: 'options',
+  })
+  if (error) throw new Error(error)
+  return data
+}
+
+function workerProcess(parent, args, ctx, info) {
   const {
     killed,
     exitCode,
     signalCode,
     spawnargs,
     spawnfile,
-    pid
+    pid,
   } = parent.pythonShell
+  console.log(killed)
   return {
     pid,
     killed,
     exitCode,
     signalCode,
     spawnfile,
-    spawnargs
+    spawnargs,
   }
 }
 
 /**
  * PORTS SECTION
  */
-async function findPorts () {
+async function findPorts() {
   console.log('find ports.')
   // 1. Make sure, workers are updated.
-  await findWorkers()
+  await findInterfaces()
 
   // 2. Don't check more frequently than once per second.
   // if (state.lastScan.ports + 1000 > Date.now()) return null
   // state.lastScan.ports = Date.now()
 
   // 3. Loop through all workers to find available ports.
-  const portsPromises = state.workers.map(async worker => {
-    // a) Ask worker for ports.
-    const { data, error, pythonError } = await worker.workerProcess.send({
-      type: 'ports'
+  const portsPromises = state.interfaces.map(async iface => {
+    // a) Ask interface for ports.
+    const { data, error, pythonError } = await iface.workerProcess.send({
+      type: 'ports',
     })
     if (error) throw new Error(error)
-    if (pythonError) throw new Error(pythonError)
     console.log(data)
     // b) Add all ports that are not in the list.
     data.forEach(port => {
@@ -226,12 +237,12 @@ async function findPorts () {
       if (state.ports.find(port => port.id === id)) return null
       const newPort = {
         id,
-        interfaceName: worker.interfaceName,
-        host: os.hostname(),
-        ...port
+        interfaceName: iface.interfaceName,
+        host: HOST,
+        ...port,
       }
       state.ports.push(newPort)
-      worker.ports.push(newPort)
+      iface.ports.push(newPort)
     })
   })
   await Promise.all(portsPromises)
@@ -239,14 +250,13 @@ async function findPorts () {
   console.log('found ports.')
 }
 
-async function ports (parent, args, ctx, info) {
+async function ports(parent, args, ctx, info) {
   await findPorts()
   const { interfaceName } = args
   const ifName = interfaceName || parent
 
-  console.log(ifName)
   if (ifName) {
-    const iface = state.workers.find(iface => iface.interfaceName === ifName)
+    const iface = state.interfaces.find(iface => iface.interfaceName === ifName)
     if (!iface) throw new Error(`Interface ${ifName} not found.`)
     return iface.ports
   } else {
@@ -254,7 +264,7 @@ async function ports (parent, args, ctx, info) {
   }
 }
 
-async function port (parent, args, ctx, info) {
+async function port(parent, args, ctx, info) {
   await findPorts()
   const { id } = args
   if (!id) throw new Error('Need an id.')
@@ -262,151 +272,127 @@ async function port (parent, args, ctx, info) {
   return port
 }
 
-async function findOptions () {
-  // 1. Make sure, workers are updated.
-  await findWorkers()
-
-  // 2. Don't check more frequently than once per second.
-  // if (state.lastScan.options + 1000 > Date.now()) return null
-  // state.lastScan.options = Date.now()
-
-  const optionPromises = state.workers.map(async worker => {
-    const {
-      data,
-      error,
-      pythonError
-    } = await worker.workerProcess.pythonShell.send({ type: 'options' })
-    if (error) throw new Error(error)
-    if (pythonError) throw new Error(pythonError)
-    iface.options.push(...data)
-  })
-  await Promise.all(optionPromises)
-}
-
-async function options (parent, args, ctx, info) {
-  if (!parent) throw new Error('Need a parent.')
-  const iface = state.interfaces.find(iface => iface.interfaceName === parent)
-
-  // Try to find options if necessary
-  if (!iface.options.length) await findOptions(parent)
-  return iface.options
-}
-
-async function connect (parent, args, ctx, info) {
-  const { interfaceName, device } = args
+/**
+ * CONNECTION SECTION
+ */
+async function connect(parent, args, ctx, info) {
+  await findPorts()
+  const { portId } = args
+  const port = state.ports.find(port => port.id === portId)
+  if (!port) throw new Error(`Port ${portId} not found`)
   const iface = state.interfaces.find(
-    iface => iface.interfaceName === interfaceName
+    iface => iface.interfaceName === port.interfaceName
   )
-  const id = md5(interfaceName + device)
+  const id = md5(iface.interfaceName + iface.device)
   if (iface.connections.find(connection => connection.id === id)) {
     throw new Error('already connected.')
   }
   const pythonWorker = new PythonWorker(iface.workerScript)
+  console.log(pythonWorker)
   const spawnData = await pythonWorker.spawn()
   if (spawnData.error) throw new Error(spawnData.error)
   const connection = {
     id,
-    device,
-    interfaceName,
-    host: HOST,
-    worker: pythonWorker,
-    workerProcess: (parent, args, context, info) =>
-      workerProcess(pythonWorker, args, context, info)
+    port,
+    workerProcess: pythonWorker,
   }
-  const connectionData = await connection.worker.send({
+  const connectionData = await connection.workerProcess.send({
     type: 'connect',
-    device
+    device: port.device,
   })
   if (connectionData.error) throw new Error(connectionData.error)
-  if (connectionData.pythonError) throw new Error(connectionData.pythonError)
   iface.connections.push(connection)
   state.connections.push(connection)
-  return connection
+  console.log({ ...connection, workerProcess: workerProcess(pythonWorker) })
+  return {
+    ...connection,
+    workerProcess: workerProcess(pythonWorker),
+  }
 }
 
-async function connections (parent, args, ctx, info) {
+async function connections(parent, args, ctx, info) {
   if (parent) {
     const iface = state.interfaces.find(iface => iface.interfaceName === parent)
-    return iface.connections
+    return iface.connections.map(connection => {
+      return {
+        ...connection,
+        workerProcess: workerProcess(connection.workerProcess),
+      }
+    })
   } else {
-    return state.connections
+    return state.connections.map(connection => {
+      return {
+        ...connection,
+        workerProcess: workerProcess(connection.workerProcess),
+      }
+    })
   }
 }
 
-async function connection (parent, args, context, info) {
+async function connection(parent, args, context, info) {
   const connection = state.connections.find(
     connection => connection.id === args.id
   )
-  return connection
+  return {
+    ...connection,
+    workerProcess: workerProcess(connection.workerProcess),
+  }
 }
 
-async function connectionCommand (parent, args, ctx, info) {
-  const { connectionId, type, string, options } = args
+async function sendCommand(parent, args, ctx, info) {
+  const { connectionId, command } = args
   const connection = state.connections.find(
     connection => connection.id === connectionId
   )
-  const { data, error, pythonError } = await connection.worker.send({
-    type,
-    string,
-    options
-  })
+  console.log({ ...command })
+  const { data, error } = await connection.workerProcess.send({ ...command })
   if (error) throw new Error(JSON.stringify(error))
-  if (pythonError) throw new Error(JSON.stringify(pythonError))
   return data.response
 }
 
 // TODO Also find connections in interfaces.
-async function endWorker (parent, args, ctx, info) {
-  const { id } = args
-  const connection = state.connections.find(connection => connection.id === id)
-  const { data, error, pythonError } = await connection.worker.end()
-  if (error) throw new Error(JSON.stringify(error))
-  if (pythonError.error !== 0) throw new Error(JSON.stringify(pythonError))
-  return connection.workerProcess()
-}
-
-async function spawnWorker (parent, args, ctx, info) {
-  const { id } = args
-  const connection = state.connections.find(connection => connection.id === id)
-  const { data, error, pythonError } = await connection.worker.spawn()
-  console.log(data, error, pythonError)
+async function endConnection(parent, args, ctx, info) {
+  const { connectionId } = args
+  const connectionIndex = state.connections.findIndex(
+    connection => connection.id === connectionId
+  )
+  const connection = state.connections[connectionIndex]
+  const iface = state.interfaces.find(
+    iface => (iface.interfaceName = connection.interfaceName)
+  )
+  const { data, error } = await connection.workerProcess.end()
   if (error) throw new Error(JSON.stringify(error))
-  if (pythonError) throw new Error(JSON.stringify(pythonError))
-  const connectionData = await connection.worker.send({
-    type: 'connect',
-    device: connection.device
-  })
-  if (connectionData.error) throw new Error(connectionData.error)
-  if (connectionData.pythonError) throw new Error(connectionData.pythonError)
-  return connection.workerProcess()
+  state.connections.splice(connectionIndex, 1)
+  return connection
 }
 
-async function killWorker (parent, args, ctx, info) {
-  const { id } = args
-  const connection = state.connections.find(connection => connection.id === id)
-  const { data, error, pythonError } = await connection.worker.kill()
-  console.log(data, error, pythonError)
+async function killConnection(parent, args, ctx, info) {
+  const { connectionId } = args
+  const connectionIndex = state.connections.findIndex(
+    connection => connection.id === connectionId
+  )
+  const connection = state.connections[connectionIndex]
+  const { data, error } = await connection.workerProcess.kill()
   if (error) throw new Error(JSON.stringify(error))
-  if (pythonError) throw new Error(JSON.stringify(pythonError))
-  return connection.workerProcess()
+  state.connections.splice(connectionIndex, 1)
+  return connection
 }
 
 const resolvers = {
   Query: {
-    interfaces: () => [{ name: 'serial' }, { name: 'usbtmc' }],
-    workers,
-    worker,
+    interfaces,
+    interface,
     ports,
+    port,
     connections,
-    connection
+    connection,
   },
   Mutation: {
     connect,
-    connectionCommand,
-    killWorker,
-    endWorker,
-    spawnWorker
-  }
+    sendCommand,
+    endConnection,
+    killConnection,
+  },
 }
 
 module.exports = { typeDefs, resolvers }

+ 26 - 58
backend/src/pythonWorker.js

@@ -13,6 +13,11 @@ function PythonWorker (workerScript, shellOptions) {
   // The python shell is started with this.spawn
   this.pythonShell = null
 
+  this.isProcessRunning = () =>
+    this.pythonShell &&
+    !this.pythonShell.killed &&
+    this.pythonShell.exitCode !== null
+
   this.transaction = async (func, args) => {
     // 1. Block new commands
     await this.commandLock.acquire()
@@ -27,14 +32,9 @@ function PythonWorker (workerScript, shellOptions) {
     // 4. Unblock new commands
     this.commandLock.release()
     // 5. Return result
-    return { ...workerResult, pythonError }
+    return { ...workerResult, ...pythonError }
   }
 
-  this.isProcessRunning = () =>
-    this.pythonShell &&
-    !this.pythonShell.killed &&
-    this.pythonShell.exitCode !== null
-
   // Use send a command to the python worker.
   this.send = command => {
     if (!this.isProcessRunning) return { error: 'Process not running' }
@@ -53,78 +53,46 @@ function PythonWorker (workerScript, shellOptions) {
     return this.transaction(() => {
       this.pythonShell = spawn(
         `${process.env.PWD}/${process.env.PYTHON_PATH}`,
-        [this.workerScript]
+        [this.workerScript.path]
       )
       this.pythonShell.stdout.on('data', message => {
+        console.log('[PYSTDOUT]', message)
         // The python worker returns JSON {data, error}
         this.data.push(JSON.parse(message))
         this.pythonLock.release()
       })
       this.pythonShell.stderr.on('data', error => {
-        this.error.push(serializeError(new Error(error)))
+        console.log('[PYSTDERR]', error)
+        this.error.push({ error })
         this.pythonLock.release()
       })
       this.pythonShell.on('close', error => {
-        this.error.push(serializeError(new Error(error)))
+        console.log('[PYCLOSE]', error)
+        this.error.push({ error })
         this.pythonLock.release()
       })
       this.pythonShell.on('error', error => {
-        this.error.push(serializeError(error))
+        console.log('[PYERROR]', error)
+        this.error.push({ error })
         this.pythonLock.release()
       })
     })
   }
 
-  this.end = async () => {
-    if (this.pythonShell.killed) {
-      return { error: `process ${this.pythonShell.pid} already killed.` }
-    }
-    if (this.pythonShell.exitCode !== null) {
-      return {
-        error: `process ${this.pythonShell.pid} already exited with code ${
-          this.pythonShell.exitCode
-        }`
-      }
-    }
-    await this.commandLock.acquire()
-    await this.pythonLock.acquire()
-    this.pythonShell.stdin.end()
-    await this.pythonLock.acquire()
-    const pythonError = this.error.pop()
-    if (pythonError) {
-      this.pythonLock.release()
-      this.commandLock.release()
-      return { pythonError }
-    }
-    const { data, error } = this.data.pop()
-    this.pythonLock.release()
-    this.commandLock.release()
-    return { data, error }
+  this.end = () => {
+    if (!this.isProcessRunning) return { error: 'Process not running' }
+
+    return this.transaction(() => {
+      this.pythonShell.stdin.end()
+    })
   }
 
-  this.kill = async () => {
-    if (this.pythonShell.killed) {
-      return { error: `process ${this.pythonShell.pid} already killed.` }
-    }
-    if (this.pythonShell.exitCode !== null) {
-      return {
-        error: `process ${this.pythonShell.pid} already exited with code ${
-          this.pythonShell.exitCode
-        }`
-      }
-    }
-    await this.commandLock.acquire()
-    await this.pythonLock.acquire()
-    const pythonError = this.error.pop()
-    if (pythonError) {
-      this.pythonLock.release()
-      this.commandLock.release()
-      return { pythonError }
-    }
-    const { data, error } = this.data.pop()
-    this.pythonLock.release()
-    this.commandLock.release()
-    return { data, error }
+  this.kill = signal => {
+    if (!this.isProcessRunning) return { error: 'Process not running' }
+
+    return this.transaction(signal => {
+      this.pythonShell.kill(signal)
+    })
   }
 }
 

+ 2 - 2
backend/src/python_workers/serial_worker.py

@@ -116,9 +116,9 @@ for line in sys.stdin:
         elif command['type'] == 'read':
             res = worker.read()
         elif command['type'] == 'write':
-            res = worker.write(command['string'])
+            res = worker.write(command['data'])
         elif command['type'] == 'ask':
-            res = worker.ask(command['string'])
+            res = worker.ask(command['data'])
         print(json.dumps({"data": res}), flush=True)
     except Exception as error:
         print(handle_exception(error), flush=True)

+ 4 - 4
backend/src/python_workers/test_worker.py

@@ -52,8 +52,8 @@ class TestWorker:
     @staticmethod
     def ports():
         driverPorts = [
-            {'name': 'Port 22', 'device': 22},
-            {'name': 'Port 80', 'device': 80},
+            {'name': 'Port 22', 'device': '22'},
+            {'name': 'Port 80', 'device': '80'},
         ]
 
         ports = []
@@ -120,9 +120,9 @@ for line in sys.stdin:
         elif command['type'] == 'read':
             data = worker.read()
         elif command['type'] == 'write':
-            data = worker.write(command['string'])
+            data = worker.write(command['data'])
         elif command['type'] == 'ask':
-            data = worker.ask(command['string'])
+            data = worker.ask(command['data'])
         print(json.dumps({"data": data}), flush=True)
     except Exception as error:
         print(handle_exception(error), flush=True)

+ 2 - 2
backend/src/python_workers/usbtmc_worker.py

@@ -89,9 +89,9 @@ for line in sys.stdin:
         elif command['type'] == 'read':
             res = worker.read()
         elif command['type'] == 'write':
-            res = worker.write(command['string'])
+            res = worker.write(command['data'])
         elif command['type'] == 'ask':
-            res = worker.ask(command['string'])
+            res = worker.ask(command['data'])
         print(json.dumps({"data": res}), flush=True)
     except Exception as error:
         print(handle_exception(error), flush=True)

+ 10 - 8
backend/src/utils.js

@@ -1,12 +1,14 @@
 function serializeError (error) {
-  return JSON.stringify({
-    name: error.name,
-    message: error.message,
-    stack: error.stack,
-    filename: error.filename,
-    lineNumber: error.lineNumber,
-    columnNumber: error.columnNumber
-  })
+  return {
+    error: JSON.stringify({
+      name: error.name,
+      message: error.message,
+      stack: error.stack,
+      filename: error.filename,
+      lineNumber: error.lineNumber,
+      columnNumber: error.columnNumber
+    })
+  }
 }
 
 module.exports = { serializeError }