|
@@ -18,7 +18,7 @@ const fs = require('fs')
|
|
|
const os = require('os')
|
|
|
const md5 = require('md5')
|
|
|
const { promisify } = require('util')
|
|
|
-const PythonWorker = require('./pythonWorker')
|
|
|
+let PythonWorker = require('./pythonWorker')
|
|
|
|
|
|
const readdir = promisify(fs.readdir)
|
|
|
const stat = promisify(fs.stat)
|
|
@@ -127,6 +127,7 @@ const typeDefs = `
|
|
|
* Find worker files in a directory based on filename convention
|
|
|
* (ending in _worker.py).
|
|
|
* @param {string} directory - Python worker directory
|
|
|
+ * @return {string[]} Array with found worker scripts
|
|
|
*/
|
|
|
async function findWorkers (directory) {
|
|
|
// Only accept strings
|
|
@@ -157,17 +158,28 @@ async function getOptions (workerProcess) {
|
|
|
* Takes a Python worker script and generates an interface for it.
|
|
|
* You need to call the spawn() function to launch the Python worker
|
|
|
* process.
|
|
|
+ * @typedef {object} WorkerScript
|
|
|
+ * @property {string} path - Path of the worker script
|
|
|
+ * @property {number} mtime - Last modification time
|
|
|
+ * @property {number} updated - Last modification time, if changed after spawn
|
|
|
+ *
|
|
|
+ * @typedef {object} Interface
|
|
|
+ * @property {string} id - ID of the interface
|
|
|
+ * @property {string} interfaceName - Name of the interface
|
|
|
+ * @property {WorkerScript} workerScript - Information about the worker script
|
|
|
+ * @property {boolean} active - Is the worker still available
|
|
|
+ *
|
|
|
* @param {string} directory - Python worker directory
|
|
|
* @param {string} workerFile - Python worker file
|
|
|
- * @param {Object} state - State with already connected interfaces
|
|
|
- * @return {Object} Updated state
|
|
|
+ * @param {Interface[]} interfaces - State with already connected interfaces
|
|
|
+ * @return {Object} created interface
|
|
|
*/
|
|
|
async function createInterface (directory, workerFile, interfaces) {
|
|
|
// Assert that arguments are strings.
|
|
|
- if (!(typeof directory === 'string')) {
|
|
|
+ if (!(typeof directory === 'string' || directory instanceof String)) {
|
|
|
throw new Error('Directory argument must be a string.')
|
|
|
}
|
|
|
- if (!(typeof workerFile === 'string')) {
|
|
|
+ if (!(typeof workerFile === 'string' || workerFile instanceof String)) {
|
|
|
throw new Error('workerFile argument must be a string.')
|
|
|
}
|
|
|
const interfaceName = workerFile.replace(/_worker\.py/, '')
|
|
@@ -176,45 +188,39 @@ async function createInterface (directory, workerFile, interfaces) {
|
|
|
const { mtime } = await stat(path)
|
|
|
const workerScript = { path, mtime, updated: null }
|
|
|
// 2. Check if the interface already exists
|
|
|
- const ifaceIndex = interfaces.findIndex(
|
|
|
- iface => iface.interfaceName === interfaceName
|
|
|
- )
|
|
|
- if (ifaceIndex >= 0) {
|
|
|
+ const iface = interfaces.find(iface => iface.interfaceName === interfaceName)
|
|
|
+ if (iface) {
|
|
|
// b. If it was modified, save the modification time.
|
|
|
- if (interfaces[ifaceIndex].workerScript.mtime < mtime) {
|
|
|
- const iface = interfaces[ifaceIndex]
|
|
|
- const newWorkerScript = { ...iface.workerScript, updated: mtime }
|
|
|
- return [
|
|
|
- ...interfaces.slice(0, ifaceIndex),
|
|
|
- {
|
|
|
- ...iface,
|
|
|
- workerScript: newWorkerScript
|
|
|
- },
|
|
|
- ...interfaces.slice(ifaceIndex + 1)
|
|
|
- ]
|
|
|
- } else {
|
|
|
- return interfaces
|
|
|
+ if (iface.workerScript.mtime < mtime) {
|
|
|
+ iface.workerScript.update = mtime
|
|
|
}
|
|
|
+ return iface
|
|
|
}
|
|
|
// c. Spawn a new worker connection.
|
|
|
const workerProcess = new PythonWorker(workerScript)
|
|
|
const { data, error } = await workerProcess.spawn()
|
|
|
if (error) throw new Error(error)
|
|
|
- const id = md5(`${interfaceName}${workerScript.path}${mtime.toISOString()}`)
|
|
|
+ const id = md5(
|
|
|
+ `${HOST}${interfaceName}${workerScript.path}${mtime.toISOString()}`
|
|
|
+ )
|
|
|
const options = await getOptions(workerProcess)
|
|
|
- // d. Save the worker in the state.
|
|
|
- return [
|
|
|
- ...interfaces,
|
|
|
- { id, interfaceName, workerScript, workerProcess, options }
|
|
|
- ]
|
|
|
+ // d. Return the interface.
|
|
|
+ return {
|
|
|
+ id,
|
|
|
+ interfaceName,
|
|
|
+ workerScript,
|
|
|
+ workerProcess,
|
|
|
+ options,
|
|
|
+ active: true
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * INTERFACE SECTION
|
|
|
+ * Generate interface list
|
|
|
*/
|
|
|
-async function findInterfaces () {
|
|
|
+async function generateInterfaceList () {
|
|
|
// 1. Don't check more frequently than once per second.
|
|
|
- // if (state.lastScan.workers + 1000 > Date.now()) return null
|
|
|
+ // if (state.lastScan.workers + 1000 > Date.now()) return state.interfaces
|
|
|
// state.lastScan.workers = Date.now()
|
|
|
|
|
|
// 2. Find all files in ./python_workers that end in _worker.py
|
|
@@ -222,23 +228,32 @@ async function findInterfaces () {
|
|
|
|
|
|
// 3. For every worker script
|
|
|
const workerPromises = workerFiles.map(workerFile =>
|
|
|
- createInterface(WORKER_DIR, workerFile, state)
|
|
|
+ createInterface(WORKER_DIR, workerFile, state.interfaces)
|
|
|
)
|
|
|
- await Promise.all(workerPromises).then(results => console.log(results))
|
|
|
+ return Promise.all(workerPromises)
|
|
|
}
|
|
|
|
|
|
async function interfaces (parent, args, context, info) {
|
|
|
- await findInterfaces()
|
|
|
+ const incomingInterfaces = await generateInterfaceList()
|
|
|
+ const newInterfaces = incomingInterfaces.filter(
|
|
|
+ inIface => !state.interfaces.find(stIface => stIface.id === inIface.id)
|
|
|
+ )
|
|
|
+ state.interfaces.forEach(stIface => {
|
|
|
+ if (!incomingInterfaces.find(inIface => inIface.id === stIface.id)) {
|
|
|
+ stIface.active = false
|
|
|
+ }
|
|
|
+ })
|
|
|
+ state.interfaces = [...state.interfaces, ...newInterfaces]
|
|
|
return state.interfaces
|
|
|
}
|
|
|
|
|
|
async function iface (parent, { id }, context, info) {
|
|
|
- await findInterfaces()
|
|
|
+ await generateInterfaceList()
|
|
|
const iface = state.interfaces.find(
|
|
|
iface => iface.id === id || iface.interfaceName === interfaceName
|
|
|
)
|
|
|
if (!iface) {
|
|
|
- throw new Error(`Worker id=${id}, interfaceName=${interfaceName} not found`)
|
|
|
+ throw new Error(`Worker id=${id} not found`)
|
|
|
}
|
|
|
return iface
|
|
|
}
|
|
@@ -273,43 +288,55 @@ function serializeWorkerProcess (parent, args, context, info) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+async function createInterfacePorts (iface, ports) {
|
|
|
+ // a) Ask interface for ports.
|
|
|
+ const { data, error } = await iface.workerProcess.send({
|
|
|
+ type: 'ports'
|
|
|
+ })
|
|
|
+ if (error) throw new Error(error)
|
|
|
+ // b) Add all ports that are not in the list.
|
|
|
+ return data.map(port => {
|
|
|
+ const id = port.name || port.device
|
|
|
+ return {
|
|
|
+ id,
|
|
|
+ interface: iface,
|
|
|
+ interfaceName: iface.interfaceName,
|
|
|
+ host: HOST,
|
|
|
+ ...port
|
|
|
+ }
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
/**
|
|
|
- * PORTS SECTION
|
|
|
+ * Search for available ports on all interfaces
|
|
|
+ * @typedef {object} Port
|
|
|
+ * @property {string} id - Port ID
|
|
|
+ * @property {string} interfaceName - Interface name
|
|
|
+ * @property {host} host - Hostname
|
|
|
+ * @property {string} device - Physical device name
|
|
|
+ * @property {string} name - System name for the port
|
|
|
+ * @property {string} description - Description if available by system
|
|
|
+ *
|
|
|
+ * @param {Port[]} ports - Array of already found ports
|
|
|
+ * @return {Promise<Port[]>} Promise to return found ports
|
|
|
*/
|
|
|
-async function findPorts () {
|
|
|
+async function generatePortList (ports) {
|
|
|
// 1. Make sure, workers are updated.
|
|
|
- await findInterfaces()
|
|
|
+ const incomingPorts = await generateInterfaceList()
|
|
|
|
|
|
// 2. Don't check more frequently than once per second.
|
|
|
// if (state.lastScan.ports + 1000 > Date.now()) return null
|
|
|
// state.lastScan.ports = Date.now()
|
|
|
|
|
|
// 3. Loop through all workers to find available ports.
|
|
|
- const portsPromises = state.interfaces.map(async iface => {
|
|
|
- // a) Ask interface for ports.
|
|
|
- const { data, error } = await iface.workerProcess.send({
|
|
|
- type: 'ports'
|
|
|
- })
|
|
|
- if (error) throw new Error(error)
|
|
|
- // b) Add all ports that are not in the list.
|
|
|
- data.forEach(port => {
|
|
|
- const id = port.name || port.device
|
|
|
- if (state.ports.find(port => port.id === id)) return null
|
|
|
- const newPort = {
|
|
|
- id,
|
|
|
- interface: iface,
|
|
|
- interfaceName: iface.interfaceName,
|
|
|
- host: HOST,
|
|
|
- ...port
|
|
|
- }
|
|
|
- state.ports.push(newPort)
|
|
|
- })
|
|
|
+ const portsPromises = interfaces.map(async iface => {
|
|
|
+ createInterfacePorts(iface, state.ports)
|
|
|
})
|
|
|
await Promise.all(portsPromises)
|
|
|
}
|
|
|
|
|
|
async function ports (parent, args, context, info) {
|
|
|
- await findPorts()
|
|
|
+ await generatePortList()
|
|
|
|
|
|
if (parent) {
|
|
|
return state.ports.filter(
|
|
@@ -321,7 +348,7 @@ async function ports (parent, args, context, info) {
|
|
|
}
|
|
|
|
|
|
async function port (parent, { id }, context, info) {
|
|
|
- await findPorts()
|
|
|
+ await generatePortList()
|
|
|
const port = state.ports.find(port => port.id === id)
|
|
|
return port
|
|
|
}
|
|
@@ -330,7 +357,7 @@ async function port (parent, { id }, context, info) {
|
|
|
* CONNECTION SECTION
|
|
|
*/
|
|
|
async function connect (parent, { portId }, ctx, info) {
|
|
|
- await findPorts()
|
|
|
+ await generatePortList()
|
|
|
const port = state.ports.find(port => port.id === portId)
|
|
|
if (!port) throw new Error(`Port ${portId} not found`)
|
|
|
|
|
@@ -431,20 +458,11 @@ const resolvers = {
|
|
|
Interface: {
|
|
|
ports,
|
|
|
connections,
|
|
|
- serializeWorkerProcess
|
|
|
+ workerProcess: serializeWorkerProcess
|
|
|
},
|
|
|
Connection: {
|
|
|
- serializeWorkerProcess
|
|
|
+ workerProcess: serializeWorkerProcess
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-const __test__ = {
|
|
|
- findWorkers,
|
|
|
- createInterface,
|
|
|
- getOptions,
|
|
|
- findInterfaces,
|
|
|
- serializeWorkerProcess,
|
|
|
- state
|
|
|
-}
|
|
|
-
|
|
|
-module.exports = { typeDefs, resolvers, __test__ }
|
|
|
+module.exports = { typeDefs, resolvers }
|