const fs = require('fs') const os = require('os') const md5 = require('md5') const { promisify } = require('util') const PythonWorker = require('./pythonWorker') const readdir = promisify(fs.readdir) const state = { workers: [], interfaces: [], ports: [], connections: [] } const typeDefs = ` type Option { name: String! type: String! description: String values: [String!] } type Port { id: String! interfaceName: String! host: String! device: String! name: String description: String } type Worker { pid: Int killed: Boolean! signalCode: String exitCode: Int spawnfile: String! spawnargs: [String]! error: Int! data: Int! } type Connection { id: ID! interfaceName: String! host: String! device: String! workerInfo: Worker } type Interface { interfaceName: String! host: String! workerScript: String! workerInfo: Worker ports: [Port]! connections: [Connection]! options: [Option]! } extend type Query { interfaces(force: Boolean): [Interface]! ports(interfaceName: String, force: Boolean): [Port]! connections: [Connection]! connection(id: ID!): Connection! } extend type Mutation { connect(interfaceName: String!, device: String!): Connection! spawnWorker(id: ID!): Worker! endWorker(id: ID!): Worker! killWorker(id: ID!): Worker! connectionCommand(connectionId: ID!, type: String!, string: String!, options: String): String! } ` async function findWorkers() { // Find all files in ./python_workers that end in _worker.py const fileNames = await readdir(`${__dirname}/python_workers`) const workerFiles = fileNames.filter(fileName => fileName.includes('_worker.py')) // Find the added workers workerFiles.forEach(workerFile => { const interfaceName = workerFile.replace(/_worker\.py/, '') if (state.workers.find(worker => worker.interfaceName === interfaceName)) return null const workerScript = `${__dirname}/python_workers/${workerFile}` state.workers.push({ interfaceName, workerScript }) }) } async function findInterfaces() { // Try to identify workers if necessary //if (!state.workers.length) await findWorkers() // Generate an interface for each worker const workerPromises = state.workers.map(async worker => { const { workerScript, interfaceName } = worker // Skip existing interfaces if (state.interfaces.find(iface => iface.interfaceName === interfaceName)) return null const pythonWorker = new PythonWorker(workerScript) const { res, error } = await pythonWorker.spawn() if (error) { return } state.interfaces.push({ interfaceName, workerScript, worker: pythonWorker, ports: [], connections: [], options: [] }) }) await Promise.all(workerPromises) } async function interfaces(parent, args, ctx, info) { const { force } = args // Try to identify interfaces if necessary //if (!state.interfaces.length || force) await findInterfaces() return state.interfaces.map(iface => { const { workerScript, interfaceName, worker } = iface return { interfaceName, host: os.hostname(), workerScript, workerInfo: workerInfo(worker, args, ctx, info), ports: ports(interfaceName, args, ctx, info), connections: connections(interfaceName, args, ctx, info), options: options(interfaceName, args, ctx, info) } }) } async function findPorts(interfaceName) { // Generate all ports for the interface const iface = state.interfaces.find(iface => iface.interfaceName === interfaceName) const { data, error, pythonError } = await iface.worker.send({ type: 'ports' }) if (error) throw new Error(error) if (pythonError) throw new Error(pythonError) data.forEach(port => { const id = port.name || port.device // Skip existing ports if (state.ports.find(port => port.id === id)) return null const newPort = { id, interfaceName, host: os.hostname(), ...port } state.ports.push(newPort) iface.ports.push(newPort) }) } async function ports(parent, args, ctx, info) { const { force, interfaceName } = args const ifName = interfaceName || parent if (ifName) { const iface = state.interfaces.find(iface => iface.interfaceName === ifName) // Try to find ports if necessary if (!iface.ports.length || force) await findPorts(ifName) return iface.ports } else { return state.ports } } async function findOptions(interfaceName) { const iface = state.interfaces.find(iface => iface.interfaceName === interfaceName) const { data, error, pythonError } = await iface.worker.send({ type: 'options' }) if (error) throw new Error(error) if (pythonError) throw new Error(pythonError) iface.options.push(...data) } async function options(parent, args, ctx, info) { const iface = state.interfaces.find(iface => iface.interfaceName === parent) // Try to find options if necessary if (!iface.options.length) await findOptions(parent) return iface.options } function workerInfo(parent, args, ctx, info) { const { killed, exitCode, signalCode, spawnargs, spawnfile, pid } = parent.pythonShell return { pid, killed, exitCode, signalCode, spawnfile, spawnargs, error: parent.error.length, data: parent.data.length } } async function connections(parent, args, ctx, info) { if (parent) { const iface = state.interfaces.find(iface => iface.interfaceName === parent) return iface.connections } else { return state.connections } } async function connection(parent, args, context, info) { const connection = state.connections.find(connection => connection.id === args.id) return connection } async function connect(parent, args, ctx, info) { const { interfaceName, device } = args const iface = state.interfaces.find(iface => iface.interfaceName === interfaceName) const id = md5(interfaceName + device) if (iface.connections.find(connection => connection.id === id)) throw new Error('already connected.') const pythonWorker = new PythonWorker(iface.workerScript) const spawnData = await pythonWorker.spawn() if (spawnData.error) throw new Error(spawnData.error) const connection = { id, device, interfaceName, host: os.hostname(), worker: pythonWorker, workerInfo: (parent, args, context, info) => workerInfo(pythonWorker, args, context, info) } const connectionData = await connection.worker.send({ type: 'connect', device }) if (connectionData.error) throw new Error(connectionData.error) if (connectionData.pythonError) throw new Error(connectionData.pythonError) iface.connections.push(connection) state.connections.push(connection) return connection } async function connectionCommand(parent, args, ctx, info) { const { connectionId, type, string, options } = args const connection = state.connections.find(connection => connection.id === connectionId) const { data, error, pythonError } = await connection.worker.send({ type, string, options }) if (error) throw new Error(JSON.stringify(error)) if (pythonError) throw new Error(JSON.stringify(pythonError)) return data.response } //TODO Also find connections in interfaces. async function endWorker(parent, args, ctx, info) { const { id } = args const connection = state.connections.find(connection => connection.id === id) const { data, error, pythonError } = await connection.worker.end() if (error) throw new Error(JSON.stringify(error)) if (pythonError.error !== 0) throw new Error(JSON.stringify(pythonError)) return connection.workerInfo() } async function spawnWorker(parent, args, ctx, info) { const { id } = args const connection = state.connections.find(connection => connection.id === id) const { data, error, pythonError } = await connection.worker.spawn() console.log(data, error, pythonError) if (error) throw new Error(JSON.stringify(error)) if (pythonError) throw new Error(JSON.stringify(pythonError)) const connectionData = await connection.worker.send({ type: 'connect', device: connection.device }) if (connectionData.error) throw new Error(connectionData.error) if (connectionData.pythonError) throw new Error(connectionData.pythonError) return connection.workerInfo() } async function killWorker(parent, args, ctx, info) { const { id } = args const connection = state.connections.find(connection => connection.id === id) const { data, error, pythonError } = await connection.worker.kill() console.log(data, error, pythonError) if (error) throw new Error(JSON.stringify(error)) if (pythonError) throw new Error(JSON.stringify(pythonError)) return connection.workerInfo() } const resolvers = { Query: { interfaces, ports, connections, connection }, Mutation: { connect, connectionCommand, killWorker, endWorker, spawnWorker } } module.exports = { typeDefs, resolvers }