const fs = require('fs')
const os = require('os')
const md5 = require('md5')
const { promisify } = require('util')
const PythonWorker = require('./pythonWorker')

const readdir = promisify(fs.readdir)
const stat = promisify(fs.stat)

/**
 * INTERFACE BACKEND
 *
 * Communication with workers (for now only Python).
 * The workers are implemented as REPL clients that
 * communicate with JSON.
 *
 * Names:
 * * Worker: a Python file that implements a REPL client (e.g. serial).
 *   One worker process is spawned per worker script to answer `options`
 *   and `ports` requests.
 * * Port: an entry returned by a worker's `ports` command, identified by
 *   its `name` (or `device` when there is no name).
 * * Connection: a dedicated worker process spawned by `connect` and sent
 *   a `connect` command for one port's device.
 */

const WORKER_DIR = `${__dirname}/python_workers`
// File name suffix that marks a script in WORKER_DIR as a worker.
const WORKER_SUFFIX = '_worker.py'
const HOST = os.hostname()

// In-memory registry shared by all resolvers.
// NOTE: `lastScan` is only initialised here; the throttling that used it
// ("don't check more frequently than once per second") is currently disabled,
// so every resolver call re-scans.
const state = {
  interfaces: [],
  ports: [],
  connections: [],
  lastScan: {
    workers: Date.now() - 100000,
    ports: Date.now() - 100000,
    connections: Date.now() - 100000,
  },
}

const typeDefs = `
  type Interface {
    id: ID!
    interfaceName: String!
    workerScript: WorkerScript!
    workerProcess: WorkerProcess
    ports: [Port]!
    connections: [Connection]!
    options: [Option!]
  }

  type WorkerScript {
    path: String!
    mtime: DateTime!
    updated: DateTime
  }

  type WorkerProcess {
    pid: Int!
    killed: Boolean!
    signalCode: String
    exitCode: Int
    spawnfile: String!
    spawnargs: [String]!
  }

  type Option {
    name: String!
    type: String!
    description: String
    values: [String!]
  }

  type Port {
    id: String!
    interfaceName: String!
    host: String!
    device: String
    name: String
    description: String
  }

  type Connection {
    id: ID!
    port: Port!
    workerProcess: WorkerProcess
  }

  input ConnectionCommand {
    type: String!
    data: String
    options: String
  }

  extend type Query {
    interfaces: [Interface]!
    interface(id: ID, interfaceName: String): Interface!
    ports(interfaceName: String, force: Boolean): [Port]!
    port(id: ID!): Port!
    connections: [Connection]!
    connection(id: ID!): Connection!
  }

  extend type Mutation {
    connect(portId: ID!): Connection!
    endConnection(connectionId: ID!): Connection!
    killConnection(connectionId: ID!): Connection!
    sendCommand(connectionId: ID!, command: ConnectionCommand!): String!
  }
`

/**
 * INTERFACE SECTION
 */

// Promise of the scan that is currently running, or null when idle.
let pendingInterfaceScan = null

/**
 * Makes sure `state.interfaces` has one entry per worker script.
 *
 * Overlapping callers share a single scan: the "is this interface already
 * known?" check and the `push` into the state are separated by awaits, so two
 * independent scans could otherwise spawn two workers for the same script.
 *
 * @returns {Promise<void>}
 * @throws {Error} When reading the directory fails or a worker reports an error.
 */
function findInterfaces() {
  if (!pendingInterfaceScan) {
    pendingInterfaceScan = scanInterfaces().finally(() => {
      pendingInterfaceScan = null
    })
  }
  return pendingInterfaceScan
}

/**
 * Scans WORKER_DIR and registers every worker script that is not known yet.
 *
 * @returns {Promise<void>}
 */
async function scanInterfaces() {
  // 1. Find all files in ./python_workers that end in _worker.py
  const fileNames = await readdir(WORKER_DIR)
  const workerFiles = fileNames.filter(fileName =>
    fileName.endsWith(WORKER_SUFFIX)
  )

  // 2. For every worker script
  const workerPromises = workerFiles.map(async workerFile => {
    const interfaceName = workerFile.slice(0, -WORKER_SUFFIX.length)
    const path = `${WORKER_DIR}/${workerFile}`

    // a. Find out if it was modified.
    const { mtime } = await stat(path)
    const foundInterface = state.interfaces.find(
      iface => iface.interfaceName === interfaceName
    )
    if (foundInterface) {
      // b. If it was modified, save the modification time.
      if (foundInterface.workerScript.mtime < mtime) {
        foundInterface.workerScript.updated = mtime
      }
      return
    }

    // c. Spawn a new worker process.
    const workerScript = { path, mtime, updated: null }
    const workerProcess = new PythonWorker(workerScript)
    const { error } = await workerProcess.spawn()
    if (error) throw new Error(error)
    const interfaceOptions = await options(workerProcess)

    // d. Save the worker in the state. The id is derived from the script
    //    path (not the workerScript object, which would stringify to
    //    "[object Object]").
    state.interfaces.push({
      id: md5(`${interfaceName}${path}${mtime}`),
      interfaceName,
      workerScript,
      workerProcess,
      ports: [],
      connections: [],
      options: interfaceOptions,
    })
  })
  await Promise.all(workerPromises)
}

/**
 * Returns a copy of an interface entry in which every worker handle
 * (its own and those of its connections) is replaced by plain data.
 *
 * @param {object} iface - Entry of `state.interfaces`.
 * @returns {object}
 */
function serializeInterface(iface) {
  return {
    ...iface,
    connections: iface.connections.map(serializeConnection),
    workerProcess: workerProcess(iface.workerProcess),
  }
}

/**
 * Query.interfaces: all known interfaces with up-to-date ports.
 *
 * @returns {Promise<object[]>}
 */
async function interfaces(parent, args, context, info) {
  // findPorts() refreshes the interfaces first and fills iface.ports for
  // every interface in one pass.
  await findPorts()
  return state.interfaces.map(serializeInterface)
}

/**
 * Query.interface: a single interface looked up by id or interfaceName.
 *
 * Not named `interface` because that identifier is reserved in strict mode;
 * it is still exposed under the `interface` key of the resolver map.
 *
 * @param {*} parent - Unused.
 * @param {{id?: string, interfaceName?: string}} args
 * @returns {Promise<object>}
 * @throws {Error} When neither argument is given or nothing matches.
 */
async function getInterface(parent, args, context, info) {
  const { id, interfaceName } = args
  if (!id && !interfaceName) {
    throw new Error('Either id or interfaceName needs to be provided!')
  }
  await findPorts()
  const iface = state.interfaces.find(
    candidate =>
      candidate.id === id || candidate.interfaceName === interfaceName
  )
  if (!iface) {
    throw new Error(`Worker id=${id}, interfaceName=${interfaceName} not found`)
  }
  return serializeInterface(iface)
}

/**
 * Asks a worker for its options.
 *
 * @param {PythonWorker} workerProcess - A spawned worker.
 * @returns {Promise<*>} The `data` part of the worker's reply.
 * @throws {Error} When the worker reports an error.
 */
async function options(workerProcess) {
  const { data, error } = await workerProcess.send({
    type: 'options',
  })
  if (error) throw new Error(error)
  return data
}

/**
 * Reduces a PythonWorker to the plain fields of the `WorkerProcess` type,
 * read from its `pythonShell` property.
 *
 * @param {PythonWorker} parent - The worker to serialize.
 * @returns {object|null} null when there is no worker or no `pythonShell`
 *   (the schema field is nullable).
 */
function workerProcess(parent, args, ctx, info) {
  if (!parent || !parent.pythonShell) return null
  const {
    killed,
    exitCode,
    signalCode,
    spawnargs,
    spawnfile,
    pid,
  } = parent.pythonShell
  return {
    pid,
    killed,
    exitCode,
    signalCode,
    spawnfile,
    spawnargs,
  }
}

/**
 * PORTS SECTION
 */

/**
 * Asks every interface for its ports and records the ones not seen before
 * in both `state.ports` and the owning interface's `ports` list.
 *
 * @returns {Promise<void>}
 * @throws {Error} When a worker reports an error.
 */
async function findPorts() {
  // 1. Make sure workers are updated.
  await findInterfaces()

  // 2. Loop through all workers to find available ports.
  const portsPromises = state.interfaces.map(async iface => {
    // a) Ask interface for ports.
    const { data, error } = await iface.workerProcess.send({
      type: 'ports',
    })
    if (error) throw new Error(error)

    // b) Add all ports that are not in the list. This loop has no await, so
    //    overlapping scans cannot register the same port twice.
    for (const port of data || []) {
      const id = port.name || port.device
      if (state.ports.some(known => known.id === id)) continue
      const newPort = {
        id,
        interfaceName: iface.interfaceName,
        host: HOST,
        ...port,
      }
      state.ports.push(newPort)
      iface.ports.push(newPort)
    }
  })
  await Promise.all(portsPromises)
}

/**
 * Query.ports: all ports, or only those of one interface.
 *
 * @param {string} [parent] - Interface name when called for one interface;
 *   any non-string parent (e.g. a GraphQL root value) is ignored.
 * @param {{interfaceName?: string}} args - `interfaceName` wins over `parent`.
 *   The schema's `force` argument is currently not used.
 * @returns {Promise<object[]>}
 * @throws {Error} When the requested interface does not exist.
 */
async function ports(parent, args, ctx, info) {
  await findPorts()
  const { interfaceName } = args
  const ifName = interfaceName || (typeof parent === 'string' ? parent : '')
  if (!ifName) return state.ports

  const iface = state.interfaces.find(
    candidate => candidate.interfaceName === ifName
  )
  if (!iface) throw new Error(`Interface ${ifName} not found.`)
  return iface.ports
}

/**
 * Query.port: a single port by id.
 *
 * @param {*} parent - Unused.
 * @param {{id: string}} args
 * @returns {Promise<object>}
 * @throws {Error} When the id is missing or unknown (the field is non-null).
 */
async function port(parent, args, ctx, info) {
  await findPorts()
  const { id } = args
  if (!id) throw new Error('Need an id.')
  const found = state.ports.find(candidate => candidate.id === id)
  if (!found) throw new Error(`Port ${id} not found`)
  return found
}

/**
 * CONNECTION SECTION
 */

/**
 * Returns a copy of a connection whose worker handle is replaced by plain data.
 *
 * @param {object} connection - Entry of `state.connections`.
 * @returns {object}
 */
function serializeConnection(connection) {
  return {
    ...connection,
    workerProcess: workerProcess(connection.workerProcess),
  }
}

/**
 * Looks up a connection by id.
 *
 * @param {string} connectionId
 * @returns {object} The entry of `state.connections`.
 * @throws {Error} When no connection has that id.
 */
function getConnectionOrThrow(connectionId) {
  const found = state.connections.find(
    candidate => candidate.id === connectionId
  )
  if (!found) throw new Error(`Connection ${connectionId} not found`)
  return found
}

/**
 * Removes a connection from `state.connections` and from the `connections`
 * list of the interface that owns its port.
 *
 * @param {object} connection - The same object that `connect` registered.
 */
function forgetConnection(connection) {
  const stateIndex = state.connections.indexOf(connection)
  if (stateIndex !== -1) state.connections.splice(stateIndex, 1)

  const iface = state.interfaces.find(
    candidate => candidate.interfaceName === connection.port.interfaceName
  )
  if (!iface) return
  const ifaceIndex = iface.connections.indexOf(connection)
  if (ifaceIndex !== -1) iface.connections.splice(ifaceIndex, 1)
}

/**
 * Mutation.connect: spawns a dedicated worker for a port and connects it.
 *
 * @param {*} parent - Unused.
 * @param {{portId: string}} args
 * @returns {Promise<object>} The new connection, serialized.
 * @throws {Error} When the port or its interface is unknown, the port is
 *   already connected, or the worker reports an error.
 */
async function connect(parent, args, ctx, info) {
  await findPorts()
  const { portId } = args
  const port = state.ports.find(candidate => candidate.id === portId)
  if (!port) throw new Error(`Port ${portId} not found`)
  const iface = state.interfaces.find(
    candidate => candidate.interfaceName === port.interfaceName
  )
  if (!iface) throw new Error(`Interface ${port.interfaceName} not found.`)

  // The id has to be specific to the port: port ids are unique within
  // state.ports, so each port of an interface gets its own connection id.
  const id = md5(`${iface.interfaceName}${port.id}`)
  if (iface.connections.some(connection => connection.id === id)) {
    throw new Error('already connected.')
  }

  const pythonWorker = new PythonWorker(iface.workerScript)
  const spawnData = await pythonWorker.spawn()
  if (spawnData.error) throw new Error(spawnData.error)

  const connectionData = await pythonWorker.send({
    type: 'connect',
    device: port.device,
  })
  if (connectionData.error) {
    // Best-effort cleanup so the freshly spawned worker is not leaked.
    // A failure to kill it must not hide the connect error reported below.
    try {
      await pythonWorker.kill()
    } catch (killError) {
      // Intentionally ignored; see above.
    }
    throw new Error(connectionData.error)
  }

  const connection = {
    id,
    port,
    workerProcess: pythonWorker,
  }
  iface.connections.push(connection)
  state.connections.push(connection)
  return serializeConnection(connection)
}

/**
 * Query.connections: all connections, or only those of one interface.
 *
 * @param {string} [parent] - Interface name when called for one interface;
 *   any non-string parent is ignored.
 * @returns {Promise<object[]>}
 * @throws {Error} When the requested interface does not exist.
 */
async function connections(parent, args, ctx, info) {
  if (typeof parent !== 'string' || !parent) {
    return state.connections.map(serializeConnection)
  }
  const iface = state.interfaces.find(
    candidate => candidate.interfaceName === parent
  )
  if (!iface) throw new Error(`Interface ${parent} not found.`)
  return iface.connections.map(serializeConnection)
}

/**
 * Query.connection: a single connection by id.
 *
 * @param {*} parent - Unused.
 * @param {{id: string}} args
 * @returns {Promise<object>}
 * @throws {Error} When the id is unknown.
 */
async function connection(parent, args, context, info) {
  return serializeConnection(getConnectionOrThrow(args.id))
}

/**
 * Mutation.sendCommand: forwards a command to a connection's worker.
 *
 * @param {*} parent - Unused.
 * @param {{connectionId: string, command: object}} args
 * @returns {Promise<string>} The `response` field of the worker's reply data.
 * @throws {Error} When the connection is unknown or the worker reports an error.
 */
async function sendCommand(parent, args, ctx, info) {
  const { connectionId, command } = args
  const target = getConnectionOrThrow(connectionId)
  const { data, error } = await target.workerProcess.send({ ...command })
  if (error) throw new Error(JSON.stringify(error))
  return data.response
}

/**
 * Mutation.endConnection: calls `end()` on the connection's worker and
 * removes the connection from the state and from its interface.
 *
 * @param {*} parent - Unused.
 * @param {{connectionId: string}} args
 * @returns {Promise<object>} The ended connection, serialized.
 * @throws {Error} When the connection is unknown or the worker reports an error.
 */
async function endConnection(parent, args, ctx, info) {
  const target = getConnectionOrThrow(args.connectionId)
  const { error } = await target.workerProcess.end()
  if (error) throw new Error(JSON.stringify(error))
  forgetConnection(target)
  return serializeConnection(target)
}

/**
 * Mutation.killConnection: calls `kill()` on the connection's worker and
 * removes the connection from the state and from its interface.
 *
 * @param {*} parent - Unused.
 * @param {{connectionId: string}} args
 * @returns {Promise<object>} The killed connection, serialized.
 * @throws {Error} When the connection is unknown or the worker reports an error.
 */
async function killConnection(parent, args, ctx, info) {
  const target = getConnectionOrThrow(args.connectionId)
  const { error } = await target.workerProcess.kill()
  if (error) throw new Error(JSON.stringify(error))
  forgetConnection(target)
  return serializeConnection(target)
}

const resolvers = {
  Query: {
    interfaces,
    interface: getInterface,
    ports,
    port,
    connections,
    connection,
  },
  Mutation: {
    connect,
    sendCommand,
    endConnection,
    killConnection,
  },
}

module.exports = { typeDefs, resolvers }