123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- const fs = require('fs')
- const os = require('os')
- const md5 = require('md5')
- const { promisify } = require('util')
- const PythonWorker = require('./pythonWorker')
- const readdir = promisify(fs.readdir)
- const state = {
- workers: [],
- interfaces: [],
- ports: [],
- connections: []
- }
- const typeDefs = `
- type Option {
- name: String!
- type: String!
- description: String
- values: [String!]
- }
- type Port {
- id: String!
- interfaceName: String!
- host: String!
- device: String!
- name: String
- description: String
- }
- type Worker {
- pid: Int
- killed: Boolean!
- signalCode: String
- exitCode: Int
- spawnfile: String!
- spawnargs: [String]!
- error: Int!
- data: Int!
- }
- type Connection {
- id: ID!
- interfaceName: String!
- host: String!
- device: String!
- workerInfo: Worker
- }
- type Interface {
- interfaceName: String!
- host: String!
- workerScript: String!
- workerInfo: Worker
- ports: [Port]!
- connections: [Connection]!
- options: [Option]!
- }
- extend type Query {
- interfaces(force: Boolean): [Interface]!
- ports(interfaceName: String, force: Boolean): [Port]!
- connections: [Connection]!
- connection(id: ID!): Connection!
- }
- extend type Mutation {
- connect(interfaceName: String!, device: String!): Connection!
- spawnWorker(id: ID!): Worker!
- endWorker(id: ID!): Worker!
- killWorker(id: ID!): Worker!
- connectionCommand(connectionId: ID!, type: String!, string: String!, options: String): String!
- }
- `
- async function findWorkers() {
- // Find all files in ./python_workers that end in _worker.py
- const fileNames = await readdir(`${__dirname}/python_workers`)
- const workerFiles = fileNames.filter(fileName => fileName.includes('_worker.py'))
- // Find the added workers
- workerFiles.forEach(workerFile => {
- const interfaceName = workerFile.replace(/_worker\.py/, '')
- if (state.workers.find(worker => worker.interfaceName === interfaceName)) return null
- const workerScript = `${__dirname}/python_workers/${workerFile}`
- state.workers.push({
- interfaceName,
- workerScript
- })
- })
- }
- async function findInterfaces() {
- // Try to identify workers if necessary
- //if (!state.workers.length)
- await findWorkers()
- // Generate an interface for each worker
- const workerPromises = state.workers.map(async worker => {
- const { workerScript, interfaceName } = worker
- // Skip existing interfaces
- if (state.interfaces.find(iface => iface.interfaceName === interfaceName)) return null
- const pythonWorker = new PythonWorker(workerScript)
- const { res, error } = await pythonWorker.spawn()
- if (error) {
- return
- }
- state.interfaces.push({
- interfaceName,
- workerScript,
- worker: pythonWorker,
- ports: [],
- connections: [],
- options: []
- })
- })
- await Promise.all(workerPromises)
- }
- async function interfaces(parent, args, ctx, info) {
- const { force } = args
- // Try to identify interfaces if necessary
- //if (!state.interfaces.length || force)
- await findInterfaces()
- return state.interfaces.map(iface => {
- const { workerScript, interfaceName, worker } = iface
- return {
- interfaceName,
- host: os.hostname(),
- workerScript,
- workerInfo: workerInfo(worker, args, ctx, info),
- ports: ports(interfaceName, args, ctx, info),
- connections: connections(interfaceName, args, ctx, info),
- options: options(interfaceName, args, ctx, info)
- }
- })
- }
- async function findPorts(interfaceName) {
- // Generate all ports for the interface
- const iface = state.interfaces.find(iface => iface.interfaceName === interfaceName)
- const { data, error, pythonError } = await iface.worker.send({ type: 'ports' })
- if (error) throw new Error(error)
- if (pythonError) throw new Error(pythonError)
- data.forEach(port => {
- const id = port.name || port.device
- // Skip existing ports
- if (state.ports.find(port => port.id === id)) return null
- const newPort = {
- id,
- interfaceName,
- host: os.hostname(),
- ...port
- }
- state.ports.push(newPort)
- iface.ports.push(newPort)
- })
- }
- async function ports(parent, args, ctx, info) {
- const { force, interfaceName } = args
- const ifName = interfaceName || parent
- if (ifName) {
- const iface = state.interfaces.find(iface => iface.interfaceName === ifName)
- // Try to find ports if necessary
- if (!iface.ports.length || force) await findPorts(ifName)
- return iface.ports
- } else {
- return state.ports
- }
- }
- async function findOptions(interfaceName) {
- const iface = state.interfaces.find(iface => iface.interfaceName === interfaceName)
- const { data, error, pythonError } = await iface.worker.send({ type: 'options' })
- if (error) throw new Error(error)
- if (pythonError) throw new Error(pythonError)
- iface.options.push(...data)
- }
- async function options(parent, args, ctx, info) {
- const iface = state.interfaces.find(iface => iface.interfaceName === parent)
- // Try to find options if necessary
- if (!iface.options.length) await findOptions(parent)
- return iface.options
- }
- function workerInfo(parent, args, ctx, info) {
- const { killed, exitCode, signalCode, spawnargs, spawnfile, pid } = parent.pythonShell
- return {
- pid,
- killed,
- exitCode,
- signalCode,
- spawnfile,
- spawnargs,
- error: parent.error.length,
- data: parent.data.length
- }
- }
- async function connections(parent, args, ctx, info) {
- if (parent) {
- const iface = state.interfaces.find(iface => iface.interfaceName === parent)
- return iface.connections
- } else {
- return state.connections
- }
- }
- async function connection(parent, args, context, info) {
- const connection = state.connections.find(connection => connection.id === args.id)
- return connection
- }
- async function connect(parent, args, ctx, info) {
- const { interfaceName, device } = args
- const iface = state.interfaces.find(iface => iface.interfaceName === interfaceName)
- const id = md5(interfaceName + device)
- if (iface.connections.find(connection => connection.id === id)) throw new Error('already connected.')
- const pythonWorker = new PythonWorker(iface.workerScript)
- const spawnData = await pythonWorker.spawn()
- if (spawnData.error) throw new Error(spawnData.error)
- const connection = {
- id,
- device,
- interfaceName,
- host: os.hostname(),
- worker: pythonWorker,
- workerInfo: (parent, args, context, info) => workerInfo(pythonWorker, args, context, info)
- }
- const connectionData = await connection.worker.send({ type: 'connect', device })
- if (connectionData.error) throw new Error(connectionData.error)
- if (connectionData.pythonError) throw new Error(connectionData.pythonError)
- iface.connections.push(connection)
- state.connections.push(connection)
- return connection
- }
- async function connectionCommand(parent, args, ctx, info) {
- const { connectionId, type, string, options } = args
- const connection = state.connections.find(connection => connection.id === connectionId)
- const { data, error, pythonError } = await connection.worker.send({ type, string, options })
- if (error) throw new Error(JSON.stringify(error))
- if (pythonError) throw new Error(JSON.stringify(pythonError))
- return data.response
- }
- //TODO Also find connections in interfaces.
- async function endWorker(parent, args, ctx, info) {
- const { id } = args
- const connection = state.connections.find(connection => connection.id === id)
- const { data, error, pythonError } = await connection.worker.end()
- if (error) throw new Error(JSON.stringify(error))
- if (pythonError.error !== 0) throw new Error(JSON.stringify(pythonError))
- return connection.workerInfo()
- }
- async function spawnWorker(parent, args, ctx, info) {
- const { id } = args
- const connection = state.connections.find(connection => connection.id === id)
- const { data, error, pythonError } = await connection.worker.spawn()
- console.log(data, error, pythonError)
- if (error) throw new Error(JSON.stringify(error))
- if (pythonError) throw new Error(JSON.stringify(pythonError))
- const connectionData = await connection.worker.send({ type: 'connect', device: connection.device })
- if (connectionData.error) throw new Error(connectionData.error)
- if (connectionData.pythonError) throw new Error(connectionData.pythonError)
- return connection.workerInfo()
- }
- async function killWorker(parent, args, ctx, info) {
- const { id } = args
- const connection = state.connections.find(connection => connection.id === id)
- const { data, error, pythonError } = await connection.worker.kill()
- console.log(data, error, pythonError)
- if (error) throw new Error(JSON.stringify(error))
- if (pythonError) throw new Error(JSON.stringify(pythonError))
- return connection.workerInfo()
- }
- const resolvers = {
- Query: {
- interfaces,
- ports,
- connections,
- connection
- },
- Mutation: {
- connect,
- connectionCommand,
- killWorker,
- endWorker,
- spawnWorker
- }
- }
- module.exports = { typeDefs, resolvers }
|