|
@@ -5,15 +5,60 @@ const { promisify } = require('util')
|
|
|
const PythonWorker = require('./pythonWorker')
|
|
|
|
|
|
const readdir = promisify(fs.readdir)
|
|
|
+const stat = promisify(fs.stat)
|
|
|
+
|
|
|
+/**
|
|
|
+ * INTERFACE BACKEND
|
|
|
+ *
|
|
|
+ * Communication with workers (for now only Python)
|
|
|
+ * The workers are implemented as REPL clients that
|
|
|
+ * communicate with JSON.
|
|
|
+ *
|
|
|
+ * Names:
|
|
|
+ * * Worker: a Python file that implements a REPL client (e.g. serial)
|
|
|
+ * * Port:
|
|
|
+ * * Connection:
|
|
|
+ */
|
|
|
+
|
|
|
+const WORKER_DIR = `${__dirname}/python_workers`
|
|
|
+const HOST = os.hostname()
|
|
|
|
|
|
const state = {
|
|
|
workers: [],
|
|
|
interfaces: [],
|
|
|
ports: [],
|
|
|
- connections: []
|
|
|
+ connections: [],
|
|
|
+ lastScan: {
|
|
|
+ workers: Date.now() - 100000,
|
|
|
+ ports: Date.now() - 100000,
|
|
|
+ connections: Date.now() - 100000
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
const typeDefs = `
|
|
|
+ type Worker {
|
|
|
+ id: ID!
|
|
|
+ interfaceName: String!
|
|
|
+ workerScript: String!
|
|
|
+ mtime: DateTime!
|
|
|
+ updated: DateTime
|
|
|
+ workerProcess: WorkerProcess
|
|
|
+ ports: [Port]!
|
|
|
+ connections: [Connection]!
|
|
|
+ options: [Option!]
|
|
|
+ }
|
|
|
+
|
|
|
+ type WorkerProcess {
|
|
|
+ pid: Int
|
|
|
+ killed: Boolean!
|
|
|
+ signalCode: String
|
|
|
+ exitCode: Int
|
|
|
+ spawnfile: String!
|
|
|
+ spawnargs: [String]!
|
|
|
+ error: [String]!
|
|
|
+ data: [String]!
|
|
|
+ }
|
|
|
+
|
|
|
type Option {
|
|
|
name: String!
|
|
|
type: String!
|
|
@@ -30,37 +75,17 @@ const typeDefs = `
|
|
|
description: String
|
|
|
}
|
|
|
|
|
|
- type Worker {
|
|
|
- pid: Int
|
|
|
- killed: Boolean!
|
|
|
- signalCode: String
|
|
|
- exitCode: Int
|
|
|
- spawnfile: String!
|
|
|
- spawnargs: [String]!
|
|
|
- error: Int!
|
|
|
- data: Int!
|
|
|
- }
|
|
|
-
|
|
|
type Connection {
|
|
|
id: ID!
|
|
|
interfaceName: String!
|
|
|
host: String!
|
|
|
device: String!
|
|
|
- workerInfo: Worker
|
|
|
- }
|
|
|
-
|
|
|
- type Interface {
|
|
|
- interfaceName: String!
|
|
|
- host: String!
|
|
|
- workerScript: String!
|
|
|
- workerInfo: Worker
|
|
|
- ports: [Port]!
|
|
|
- connections: [Connection]!
|
|
|
- options: [Option]!
|
|
|
+ workerProcess: WorkerProcess
|
|
|
}
|
|
|
|
|
|
extend type Query {
|
|
|
- interfaces(force: Boolean): [Interface]!
|
|
|
+ workers: [Worker]!
|
|
|
+ worker(id: ID, interfaceName: String): Worker!
|
|
|
ports(interfaceName: String, force: Boolean): [Port]!
|
|
|
connections: [Connection]!
|
|
|
connection(id: ID!): Connection!
|
|
@@ -75,42 +100,43 @@ const typeDefs = `
|
|
|
}
|
|
|
`
|
|
|
|
|
|
-async function findWorkers() {
|
|
|
- // Find all files in ./python_workers that end in _worker.py
|
|
|
- const fileNames = await readdir(`${__dirname}/python_workers`)
|
|
|
+/**
|
|
|
+ * WORKER SECTION
|
|
|
+ */
|
|
|
+async function findWorkers () {
|
|
|
+ // 1. Don't check more frequently than once per second.
|
|
|
+ // if (state.lastScan.workers + 1000 > Date.now()) return null
|
|
|
+ // state.lastScan.workers = Date.now()
|
|
|
+
|
|
|
+ // 2. Find all files in ./python_workers that end in _worker.py
|
|
|
+ const fileNames = await readdir(WORKER_DIR)
|
|
|
const workerFiles = fileNames.filter(fileName => fileName.includes('_worker.py'))
|
|
|
|
|
|
- // Find the added workers
|
|
|
- workerFiles.forEach(workerFile => {
|
|
|
+ // 3. For every worker script
|
|
|
+ const workerPromises = workerFiles.map(async workerFile => {
|
|
|
const interfaceName = workerFile.replace(/_worker\.py/, '')
|
|
|
- if (state.workers.find(worker => worker.interfaceName === interfaceName)) return null
|
|
|
- const workerScript = `${__dirname}/python_workers/${workerFile}`
|
|
|
- state.workers.push({
|
|
|
- interfaceName,
|
|
|
- workerScript
|
|
|
- })
|
|
|
- })
|
|
|
-}
|
|
|
-
|
|
|
-async function findInterfaces() {
|
|
|
- // Try to identify workers if necessary
|
|
|
- //if (!state.workers.length)
|
|
|
- await findWorkers()
|
|
|
-
|
|
|
- // Generate an interface for each worker
|
|
|
- const workerPromises = state.workers.map(async worker => {
|
|
|
- const { workerScript, interfaceName } = worker
|
|
|
- // Skip existing interfaces
|
|
|
- if (state.interfaces.find(iface => iface.interfaceName === interfaceName)) return null
|
|
|
- const pythonWorker = new PythonWorker(workerScript)
|
|
|
- const { res, error } = await pythonWorker.spawn()
|
|
|
- if (error) {
|
|
|
+ const workerScript = `${WORKER_DIR}/${workerFile}`
|
|
|
+ // a. Find out if it was modified.
|
|
|
+ const { mtime } = await stat(workerScript)
|
|
|
+ const foundWorker = state.workers.find(worker => worker.interfaceName === interfaceName)
|
|
|
+ if (foundWorker) {
|
|
|
+ // b. If it was modified, save the modification time.
|
|
|
+ if (foundWorker.mtime < mtime) foundWorker.updated = mtime
|
|
|
return
|
|
|
}
|
|
|
- state.interfaces.push({
|
|
|
+ // c. Spawn a new worker connection.
|
|
|
+ const workerProcess = new PythonWorker(workerScript)
|
|
|
+ const { data, error, pythonError } = await workerProcess.spawn()
|
|
|
+ if (error) throw new Error(error)
|
|
|
+ if (pythonError) throw new Error(pythonError)
|
|
|
+ // c. Save the worker in the state.
|
|
|
+ state.workers.push({
|
|
|
+ id: md5(`${interfaceName}${workerScript}${mtime}`),
|
|
|
interfaceName,
|
|
|
workerScript,
|
|
|
- worker: pythonWorker,
|
|
|
+ mtime,
|
|
|
+ updated: null,
|
|
|
+ workerProcess,
|
|
|
ports: [],
|
|
|
connections: [],
|
|
|
options: []
|
|
@@ -119,33 +145,45 @@ async function findInterfaces() {
|
|
|
await Promise.all(workerPromises)
|
|
|
}
|
|
|
|
|
|
-async function interfaces(parent, args, ctx, info) {
|
|
|
- const { force } = args
|
|
|
- // Try to identify interfaces if necessary
|
|
|
- //if (!state.interfaces.length || force)
|
|
|
- await findInterfaces()
|
|
|
+async function workers (parent, args, context, info) {
|
|
|
+ await findWorkers()
|
|
|
+ return state.workers.map(worker => ({
|
|
|
+ ...worker,
|
|
|
+ workerProcess: workerProcess(worker.workerProcess, args, context, info)
|
|
|
+ }))
|
|
|
+}
|
|
|
|
|
|
- return state.interfaces.map(iface => {
|
|
|
- const { workerScript, interfaceName, worker } = iface
|
|
|
- return {
|
|
|
- interfaceName,
|
|
|
- host: os.hostname(),
|
|
|
- workerScript,
|
|
|
- workerInfo: workerInfo(worker, args, ctx, info),
|
|
|
- ports: ports(interfaceName, args, ctx, info),
|
|
|
- connections: connections(interfaceName, args, ctx, info),
|
|
|
- options: options(interfaceName, args, ctx, info)
|
|
|
- }
|
|
|
- })
|
|
|
+async function worker (parent, args, context, info) {
|
|
|
+ await findWorkers()
|
|
|
+ const { id, interfaceName } = args
|
|
|
+ if (!id && !interfaceName) throw new Error('Either id or interfaceName needs to be provided!')
|
|
|
+ const worker = state.workers.find(worker => worker.id === id || worker.interfaceName === interfaceName)
|
|
|
+ if (!worker) throw new Error(`Worker id=${id}, interfaceName=${interfaceName} not found`)
|
|
|
+ return worker
|
|
|
}
|
|
|
|
|
|
-async function findPorts(interfaceName) {
|
|
|
+/**
|
|
|
+ * PORTS SECTION
|
|
|
+ */
|
|
|
+async function findPorts (interfaceName) {
|
|
|
+ console.log('find ports.')
|
|
|
+ // 1. Make sure, workers are updated.
|
|
|
+ await findWorkers()
|
|
|
+
|
|
|
+ // 2. Don't check more frequently than once per second.
|
|
|
+ // if (state.lastScan.ports + 1000 > Date.now()) return null
|
|
|
+ // state.lastScan.ports = Date.now()
|
|
|
+
|
|
|
+ const portsPromises = state.workers.map(worker => {
|
|
|
+
|
|
|
+ })
|
|
|
// Generate all ports for the interface
|
|
|
const iface = state.interfaces.find(iface => iface.interfaceName === interfaceName)
|
|
|
|
|
|
const { data, error, pythonError } = await iface.worker.send({ type: 'ports' })
|
|
|
if (error) throw new Error(error)
|
|
|
if (pythonError) throw new Error(pythonError)
|
|
|
+ console.log(data)
|
|
|
data.forEach(port => {
|
|
|
const id = port.name || port.device
|
|
|
// Skip existing ports
|
|
@@ -159,9 +197,10 @@ async function findPorts(interfaceName) {
|
|
|
state.ports.push(newPort)
|
|
|
iface.ports.push(newPort)
|
|
|
})
|
|
|
+ console.log('found ports.')
|
|
|
}
|
|
|
|
|
|
-async function ports(parent, args, ctx, info) {
|
|
|
+async function ports (parent, args, ctx, info) {
|
|
|
const { force, interfaceName } = args
|
|
|
const ifName = interfaceName || parent
|
|
|
|
|
@@ -176,7 +215,7 @@ async function ports(parent, args, ctx, info) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-async function findOptions(interfaceName) {
|
|
|
+async function findOptions (interfaceName) {
|
|
|
const iface = state.interfaces.find(iface => iface.interfaceName === interfaceName)
|
|
|
const { data, error, pythonError } = await iface.worker.send({ type: 'options' })
|
|
|
if (error) throw new Error(error)
|
|
@@ -184,7 +223,7 @@ async function findOptions(interfaceName) {
|
|
|
iface.options.push(...data)
|
|
|
}
|
|
|
|
|
|
-async function options(parent, args, ctx, info) {
|
|
|
+async function options (parent, args, ctx, info) {
|
|
|
const iface = state.interfaces.find(iface => iface.interfaceName === parent)
|
|
|
|
|
|
// Try to find options if necessary
|
|
@@ -192,8 +231,9 @@ async function options(parent, args, ctx, info) {
|
|
|
return iface.options
|
|
|
}
|
|
|
|
|
|
-function workerInfo(parent, args, ctx, info) {
|
|
|
+function workerProcess (parent, args, ctx, info) {
|
|
|
const { killed, exitCode, signalCode, spawnargs, spawnfile, pid } = parent.pythonShell
|
|
|
+ const { error, data } = parent
|
|
|
return {
|
|
|
pid,
|
|
|
killed,
|
|
@@ -201,12 +241,12 @@ function workerInfo(parent, args, ctx, info) {
|
|
|
signalCode,
|
|
|
spawnfile,
|
|
|
spawnargs,
|
|
|
- error: parent.error.length,
|
|
|
- data: parent.data.length
|
|
|
+ error,
|
|
|
+ data
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-async function connections(parent, args, ctx, info) {
|
|
|
+async function connections (parent, args, ctx, info) {
|
|
|
if (parent) {
|
|
|
const iface = state.interfaces.find(iface => iface.interfaceName === parent)
|
|
|
return iface.connections
|
|
@@ -215,12 +255,12 @@ async function connections(parent, args, ctx, info) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-async function connection(parent, args, context, info) {
|
|
|
+async function connection (parent, args, context, info) {
|
|
|
const connection = state.connections.find(connection => connection.id === args.id)
|
|
|
return connection
|
|
|
}
|
|
|
|
|
|
-async function connect(parent, args, ctx, info) {
|
|
|
+async function connect (parent, args, ctx, info) {
|
|
|
const { interfaceName, device } = args
|
|
|
const iface = state.interfaces.find(iface => iface.interfaceName === interfaceName)
|
|
|
const id = md5(interfaceName + device)
|
|
@@ -234,7 +274,7 @@ async function connect(parent, args, ctx, info) {
|
|
|
interfaceName,
|
|
|
host: os.hostname(),
|
|
|
worker: pythonWorker,
|
|
|
- workerInfo: (parent, args, context, info) => workerInfo(pythonWorker, args, context, info)
|
|
|
+ workerProcess: (parent, args, context, info) => workerProcess(pythonWorker, args, context, info)
|
|
|
}
|
|
|
const connectionData = await connection.worker.send({ type: 'connect', device })
|
|
|
if (connectionData.error) throw new Error(connectionData.error)
|
|
@@ -244,7 +284,7 @@ async function connect(parent, args, ctx, info) {
|
|
|
return connection
|
|
|
}
|
|
|
|
|
|
-async function connectionCommand(parent, args, ctx, info) {
|
|
|
+async function connectionCommand (parent, args, ctx, info) {
|
|
|
const { connectionId, type, string, options } = args
|
|
|
const connection = state.connections.find(connection => connection.id === connectionId)
|
|
|
const { data, error, pythonError } = await connection.worker.send({ type, string, options })
|
|
@@ -253,17 +293,17 @@ async function connectionCommand(parent, args, ctx, info) {
|
|
|
return data.response
|
|
|
}
|
|
|
|
|
|
-//TODO Also find connections in interfaces.
|
|
|
-async function endWorker(parent, args, ctx, info) {
|
|
|
+// TODO Also find connections in interfaces.
|
|
|
+async function endWorker (parent, args, ctx, info) {
|
|
|
const { id } = args
|
|
|
const connection = state.connections.find(connection => connection.id === id)
|
|
|
const { data, error, pythonError } = await connection.worker.end()
|
|
|
if (error) throw new Error(JSON.stringify(error))
|
|
|
if (pythonError.error !== 0) throw new Error(JSON.stringify(pythonError))
|
|
|
- return connection.workerInfo()
|
|
|
+ return connection.workerProcess()
|
|
|
}
|
|
|
|
|
|
-async function spawnWorker(parent, args, ctx, info) {
|
|
|
+async function spawnWorker (parent, args, ctx, info) {
|
|
|
const { id } = args
|
|
|
const connection = state.connections.find(connection => connection.id === id)
|
|
|
const { data, error, pythonError } = await connection.worker.spawn()
|
|
@@ -273,22 +313,23 @@ async function spawnWorker(parent, args, ctx, info) {
|
|
|
const connectionData = await connection.worker.send({ type: 'connect', device: connection.device })
|
|
|
if (connectionData.error) throw new Error(connectionData.error)
|
|
|
if (connectionData.pythonError) throw new Error(connectionData.pythonError)
|
|
|
- return connection.workerInfo()
|
|
|
+ return connection.workerProcess()
|
|
|
}
|
|
|
|
|
|
-async function killWorker(parent, args, ctx, info) {
|
|
|
+async function killWorker (parent, args, ctx, info) {
|
|
|
const { id } = args
|
|
|
const connection = state.connections.find(connection => connection.id === id)
|
|
|
const { data, error, pythonError } = await connection.worker.kill()
|
|
|
console.log(data, error, pythonError)
|
|
|
if (error) throw new Error(JSON.stringify(error))
|
|
|
if (pythonError) throw new Error(JSON.stringify(pythonError))
|
|
|
- return connection.workerInfo()
|
|
|
+ return connection.workerProcess()
|
|
|
}
|
|
|
|
|
|
const resolvers = {
|
|
|
Query: {
|
|
|
- interfaces,
|
|
|
+ workers,
|
|
|
+ worker,
|
|
|
ports,
|
|
|
connections,
|
|
|
connection
|