|
@@ -1,4 +1,6 @@
|
|
|
-const fs = require('fs')
|
|
|
+/** @module interfaces */
|
|
|
+
|
|
|
+let fs = require('fs')
|
|
|
const os = require('os')
|
|
|
const md5 = require('md5')
|
|
|
const { promisify } = require('util')
|
|
@@ -20,9 +22,13 @@ const stat = promisify(fs.stat)
|
|
|
* * Connection:
|
|
|
*/
|
|
|
|
|
|
+/** Directory with the Python worker scripts */
|
|
|
const WORKER_DIR = `${__dirname}/python_workers`
|
|
|
+
|
|
|
+/** Hostname is used to identify ports across machines */
|
|
|
const HOST = os.hostname()
|
|
|
|
|
|
+/** Local state for the interface module */
|
|
|
const state = {
|
|
|
interfaces: [],
|
|
|
ports: [],
|
|
@@ -34,6 +40,7 @@ const state = {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/** GraphQL types for the interface module */
|
|
|
const typeDefs = `
|
|
|
type Interface {
|
|
|
id: ID!
|
|
@@ -104,6 +111,79 @@ const typeDefs = `
|
|
|
sendCommand(connectionId: ID!, command: ConnectionCommand!): String!
|
|
|
}
|
|
|
`
|
|
|
+/**
|
|
|
+ * Find worker files in a directory based on filename convention
|
|
|
+ * (ending in _worker.py).
|
|
|
+ * @param {string} directory - Python worker directory
|
|
|
+ */
|
|
|
+async function findWorkers (directory) {
|
|
|
+ // Only accept strings
|
|
|
+ if (!(typeof directory === 'string')) {
|
|
|
+ throw new Error('Directory argument must be a string.')
|
|
|
+ }
|
|
|
+ const fileNames = await readdir(directory)
|
|
|
+ return fileNames.filter(fileName => fileName.includes('_worker.py'))
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Fetch options from Python worker process.
|
|
|
+ * @param {Object} workerProcess - Worker process to use for fetching the options
|
|
|
+ */
|
|
|
+async function getOptions (workerProcess) {
|
|
|
+ // Check that workerProcess has a send method.
|
|
|
+ if (!workerProcess || !workerProcess.send) throw new Error('workerProcess not configured properly.')
|
|
|
+ const { data, error } = await workerProcess.send({ type: 'options' })
|
|
|
+ if (error) throw new Error(error)
|
|
|
+ return data
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Takes a Python worker script and generates an interface for it.
|
|
|
+ * You need to call the spawn() function to launch the Python worker
|
|
|
+ * process.
|
|
|
+ * @param {string} directory - Python worker directory
|
|
|
+ * @param {string} workerFile - Python worker file
|
|
|
+ */
|
|
|
+async function createInterface (directory, workerFile, state) {
|
|
|
+ // Assert that arguments are strings.
|
|
|
+ if (!(typeof directory === 'string')) {
|
|
|
+ throw new Error('Directory argument must be a string.')
|
|
|
+ }
|
|
|
+ if (!(typeof workerFile === 'string')) {
|
|
|
+ throw new Error('workerFile argument must be a string.')
|
|
|
+ }
|
|
|
+ const interfaceName = workerFile.replace(/_worker\.py/, '')
|
|
|
+ const path = `${directory}/${workerFile}`
|
|
|
+ // 1. Find out the last modification time.
|
|
|
+ const { mtime } = await stat(path)
|
|
|
+ const workerScript = { path, mtime, updated: null }
|
|
|
+ // 2. Check if the interface already exists
|
|
|
+ console.log('bugu', state)
|
|
|
+ const foundInterface = state.interfaces.find(
|
|
|
+ iface => iface.interfaceName === interfaceName
|
|
|
+ )
|
|
|
+ if (foundInterface) {
|
|
|
+ // b. If it was modified, save the modification time.
|
|
|
+ if (foundInterface.workerScript.mtime < mtime) {
|
|
|
+ foundInterface.workerScript.updated = mtime
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // c. Spawn a new worker connection.
|
|
|
+ const workerProcess = new PythonWorker(workerScript)
|
|
|
+ const { data, error } = await workerProcess.spawn()
|
|
|
+ if (error) throw new Error(error)
|
|
|
+ const id = md5(`${interfaceName}${workerScript.path}${mtime.toISOString()}`)
|
|
|
+ const options = await getOptions(workerProcess)
|
|
|
+ // d. Save the worker in the state.
|
|
|
+ return {
|
|
|
+ id,
|
|
|
+ interfaceName,
|
|
|
+ workerScript,
|
|
|
+ workerProcess,
|
|
|
+ options
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
/**
|
|
|
* INTERFACE SECTION
|
|
@@ -114,42 +194,13 @@ async function findInterfaces () {
|
|
|
// state.lastScan.workers = Date.now()
|
|
|
|
|
|
// 2. Find all files in ./python_workers that end in _worker.py
|
|
|
- const fileNames = await readdir(WORKER_DIR)
|
|
|
- const workerFiles = fileNames.filter(fileName =>
|
|
|
- fileName.includes('_worker.py')
|
|
|
- )
|
|
|
+ const workerFiles = await findWorkers(WORKER_DIR)
|
|
|
|
|
|
// 3. For every worker script
|
|
|
- const workerPromises = workerFiles.map(async workerFile => {
|
|
|
- const interfaceName = workerFile.replace(/_worker\.py/, '')
|
|
|
- const path = `${WORKER_DIR}/${workerFile}`
|
|
|
- // a. Find out if it was modified.
|
|
|
- const { mtime } = await stat(path)
|
|
|
- const workerScript = { path, mtime, updated: null }
|
|
|
- const foundInterface = state.interfaces.find(
|
|
|
- iface => iface.interfaceName === interfaceName
|
|
|
- )
|
|
|
- if (foundInterface) {
|
|
|
- // b. If it was modified, save the modification time.
|
|
|
- if (foundInterface.workerScript.mtime < mtime) {
|
|
|
- foundInterface.workerScript.updated = mtime
|
|
|
- }
|
|
|
- return
|
|
|
- }
|
|
|
- // c. Spawn a new worker connection.
|
|
|
- const workerProcess = new PythonWorker(workerScript)
|
|
|
- const { data, error } = await workerProcess.spawn()
|
|
|
- if (error) throw new Error(error)
|
|
|
- // d. Save the worker in the state.
|
|
|
- state.interfaces.push({
|
|
|
- id: md5(`${interfaceName}${workerScript}${mtime}`),
|
|
|
- interfaceName,
|
|
|
- workerScript,
|
|
|
- workerProcess,
|
|
|
- options: await options(workerProcess)
|
|
|
- })
|
|
|
- })
|
|
|
- await Promise.all(workerPromises)
|
|
|
+ const workerPromises = workerFiles.map(workerFile =>
|
|
|
+ createInterface(WORKER_DIR, workerFile, state)
|
|
|
+ )
|
|
|
+ await Promise.all(workerPromises).then(results => console.log(results))
|
|
|
}
|
|
|
|
|
|
async function interfaces (parent, args, context, info) {
|
|
@@ -171,14 +222,6 @@ async function iface (parent, { id }, context, info) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-async function options (workerProcess) {
|
|
|
- const { data, error } = await workerProcess.send({
|
|
|
- type: 'options'
|
|
|
- })
|
|
|
- if (error) throw new Error(error)
|
|
|
- return data
|
|
|
-}
|
|
|
-
|
|
|
function workerProcess (parent, args, context, info) {
|
|
|
const {
|
|
|
killed,
|
|
@@ -363,4 +406,12 @@ const resolvers = {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-module.exports = { typeDefs, resolvers }
|
|
|
+const __test__ = {
|
|
|
+ findWorkers,
|
|
|
+ createInterface,
|
|
|
+ getOptions,
|
|
|
+ workerProcess,
|
|
|
+ state
|
|
|
+}
|
|
|
+
|
|
|
+module.exports = { typeDefs, resolvers, __test__ }
|