123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- const fs = require('fs')
- const os = require('os')
- const md5 = require('md5')
- const { promisify } = require('util')
- const PythonWorker = require('./pythonWorker')
- const readdir = promisify(fs.readdir)
- const stat = promisify(fs.stat)
- /**
- * INTERFACE BACKEND
- *
- * Communication with workers (for now only Python)
- * The workers are implemented as REPL clients that
- * communicate with JSON.
- *
- * Names:
- * * Worker: a Python file that implements a REPL client (e.g. serial)
- * * Port:
- * * Connection:
- */
- const WORKER_DIR = `${__dirname}/python_workers`
- const HOST = os.hostname()
- const state = {
- interfaces: [],
- ports: [],
- connections: [],
- lastScan: {
- workers: Date.now() - 100000,
- ports: Date.now() - 100000,
- connections: Date.now() - 100000,
- },
- }
- const typeDefs = `
- type Interface {
- id: ID!
- interfaceName: String!
- workerScript: WorkerScript!
- workerProcess: WorkerProcess
- ports: [Port]!
- connections: [Connection]!
- options: [Option!]
- }
- type WorkerScript {
- path: String!
- mtime: DateTime!
- updated: DateTime
- }
- type WorkerProcess {
- pid: Int!
- killed: Boolean!
- signalCode: String
- exitCode: Int
- spawnfile: String!
- spawnargs: [String]!
- }
- type Option {
- name: String!
- type: String!
- description: String
- values: [String!]
- }
- type Port {
- id: String!
- interfaceName: String!
- host: String!
- device: String
- name: String
- description: String
- }
- type Connection {
- id: ID!
- port: Port!
- workerProcess: WorkerProcess
- }
- input ConnectionCommand {
- type: String!
- data: String
- options: String
- }
- extend type Query {
- interfaces: [Interface]!
- interface(id: ID, interfaceName: String): Interface!
- ports(interfaceName: String, force: Boolean): [Port]!
- port(id: ID!): Port!
- connections: [Connection]!
- connection(id: ID!): Connection!
- }
- extend type Mutation {
- connect(portId: ID!): Connection!
- endConnection(connectionId: ID!): Connection!
- killConnection(connectionId: ID!): Connection!
- sendCommand(connectionId: ID!, command: ConnectionCommand!): String!
- }
- `
- /**
- * INTERFACE SECTION
- */
- async function findInterfaces() {
- // 1. Don't check more frequently than once per second.
- // if (state.lastScan.workers + 1000 > Date.now()) return null
- // state.lastScan.workers = Date.now()
- // 2. Find all files in ./python_workers that end in _worker.py
- const fileNames = await readdir(WORKER_DIR)
- const workerFiles = fileNames.filter(fileName =>
- fileName.includes('_worker.py')
- )
- // 3. For every worker script
- const workerPromises = workerFiles.map(async workerFile => {
- const interfaceName = workerFile.replace(/_worker\.py/, '')
- const path = `${WORKER_DIR}/${workerFile}`
- // a. Find out if it was modified.
- const { mtime } = await stat(path)
- const workerScript = { path, mtime, updated: null }
- const foundInterface = state.interfaces.find(
- iface => iface.interfaceName === interfaceName
- )
- if (foundInterface) {
- // b. If it was modified, save the modification time.
- if (foundInterface.workerScript.mtime < mtime)
- foundInterface.workerScript.updated = mtime
- return
- }
- // c. Spawn a new worker connection.
- const workerProcess = new PythonWorker(workerScript)
- const { data, error } = await workerProcess.spawn()
- if (error) throw new Error(error)
- // d. Save the worker in the state.
- state.interfaces.push({
- id: md5(`${interfaceName}${workerScript}${mtime}`),
- interfaceName,
- workerScript,
- workerProcess,
- ports: [],
- connections: [],
- options: await options(workerProcess),
- })
- })
- await Promise.all(workerPromises)
- }
- async function interfaces(parent, args, context, info) {
- await findInterfaces()
- return state.interfaces.map(iface => ({
- ...iface,
- ports: ports(iface.interfaceName, args, context, info),
- // Serialize worker process
- workerProcess: workerProcess(iface.workerProcess, args, context, info),
- }))
- }
- async function interface(parent, args, context, info) {
- await findInterfaces()
- const { id, interfaceName } = args
- if (!id && !interfaceName) {
- throw new Error('Either id or interfaceName needs to be provided!')
- }
- const iface = state.interfaces.find(
- iface => iface.id === id || iface.interfaceName === interfaceName
- )
- if (!iface) {
- throw new Error(`Worker id=${id}, interfaceName=${interfaceName} not found`)
- }
- return {
- ...iface,
- workerProcess: workerProcess(iface.workerProcess, args, context, info),
- }
- }
- async function options(workerProcess) {
- const { data, error } = await workerProcess.send({
- type: 'options',
- })
- if (error) throw new Error(error)
- return data
- }
- function workerProcess(parent, args, ctx, info) {
- const {
- killed,
- exitCode,
- signalCode,
- spawnargs,
- spawnfile,
- pid,
- } = parent.pythonShell
- return {
- pid,
- killed,
- exitCode,
- signalCode,
- spawnfile,
- spawnargs,
- }
- }
- /**
- * PORTS SECTION
- */
- async function findPorts() {
- // 1. Make sure, workers are updated.
- await findInterfaces()
- // 2. Don't check more frequently than once per second.
- // if (state.lastScan.ports + 1000 > Date.now()) return null
- // state.lastScan.ports = Date.now()
- // 3. Loop through all workers to find available ports.
- const portsPromises = state.interfaces.map(async iface => {
- // a) Ask interface for ports.
- const { data, error, pythonError } = await iface.workerProcess.send({
- type: 'ports',
- })
- if (error) throw new Error(error)
- // b) Add all ports that are not in the list.
- data.forEach(port => {
- const id = port.name || port.device
- if (state.ports.find(port => port.id === id)) return null
- const newPort = {
- id,
- interfaceName: iface.interfaceName,
- host: HOST,
- ...port,
- }
- state.ports.push(newPort)
- iface.ports.push(newPort)
- })
- })
- await Promise.all(portsPromises)
- }
- async function ports(parent, args, ctx, info) {
- await findPorts()
- const { interfaceName } = args
- const ifName = interfaceName || parent
- if (ifName) {
- const iface = state.interfaces.find(iface => iface.interfaceName === ifName)
- if (!iface) throw new Error(`Interface ${ifName} not found.`)
- return iface.ports
- } else {
- return state.ports
- }
- }
- async function port(parent, args, ctx, info) {
- await findPorts()
- const { id } = args
- if (!id) throw new Error('Need an id.')
- const port = state.ports.find(port => port.id === id)
- return port
- }
- /**
- * CONNECTION SECTION
- */
- async function connect(parent, args, ctx, info) {
- await findPorts()
- const { portId } = args
- const port = state.ports.find(port => port.id === portId)
- if (!port) throw new Error(`Port ${portId} not found`)
- const iface = state.interfaces.find(
- iface => iface.interfaceName === port.interfaceName
- )
- const id = md5(iface.interfaceName + iface.device)
- if (iface.connections.find(connection => connection.id === id)) {
- throw new Error('already connected.')
- }
- const pythonWorker = new PythonWorker(iface.workerScript)
- const spawnData = await pythonWorker.spawn()
- if (spawnData.error) throw new Error(spawnData.error)
- const connection = {
- id,
- port,
- workerProcess: pythonWorker,
- }
- const connectionData = await connection.workerProcess.send({
- type: 'connect',
- device: port.device,
- })
- if (connectionData.error) throw new Error(connectionData.error)
- iface.connections.push(connection)
- state.connections.push(connection)
- return {
- ...connection,
- workerProcess: workerProcess(pythonWorker),
- }
- }
- async function connections(parent, args, ctx, info) {
- if (parent) {
- const iface = state.interfaces.find(iface => iface.interfaceName === parent)
- return iface.connections.map(connection => {
- return {
- ...connection,
- workerProcess: workerProcess(connection.workerProcess),
- }
- })
- } else {
- return state.connections.map(connection => {
- return {
- ...connection,
- workerProcess: workerProcess(connection.workerProcess),
- }
- })
- }
- }
- async function connection(parent, args, context, info) {
- const connection = state.connections.find(
- connection => connection.id === args.id
- )
- return {
- ...connection,
- workerProcess: workerProcess(connection.workerProcess),
- }
- }
- async function sendCommand(parent, args, ctx, info) {
- const { connectionId, command } = args
- const connection = state.connections.find(
- connection => connection.id === connectionId
- )
- const { data, error } = await connection.workerProcess.send({ ...command })
- if (error) throw new Error(JSON.stringify(error))
- return data.response
- }
- // TODO Also find connections in interfaces.
- async function endConnection(parent, args, ctx, info) {
- const { connectionId } = args
- const connectionIndex = state.connections.findIndex(
- connection => connection.id === connectionId
- )
- const connection = state.connections[connectionIndex]
- const iface = state.interfaces.find(
- iface => (iface.interfaceName = connection.interfaceName)
- )
- const { data, error } = await connection.workerProcess.end()
- if (error) throw new Error(JSON.stringify(error))
- state.connections.splice(connectionIndex, 1)
- return connection
- }
- async function killConnection(parent, args, ctx, info) {
- const { connectionId } = args
- const connectionIndex = state.connections.findIndex(
- connection => connection.id === connectionId
- )
- const connection = state.connections[connectionIndex]
- const { data, error } = await connection.workerProcess.kill()
- if (error) throw new Error(JSON.stringify(error))
- state.connections.splice(connectionIndex, 1)
- return connection
- }
- const resolvers = {
- Query: {
- interfaces,
- interface,
- ports,
- port,
- connections,
- connection,
- },
- Mutation: {
- connect,
- sendCommand,
- endConnection,
- killConnection,
- },
- }
- module.exports = { typeDefs, resolvers }
|