interfaces.js 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. const fs = require('fs')
  2. const os = require('os')
  3. const md5 = require('md5')
  4. const { promisify } = require('util')
  5. const PythonWorker = require('./pythonWorker')
  6. const readdir = promisify(fs.readdir)
  7. const stat = promisify(fs.stat)
  /**
   * INTERFACE BACKEND
   *
   * Communication with workers (for now only Python).
   * The workers are implemented as REPL clients that
   * communicate via JSON messages.
   *
   * Names:
   * * Worker: a Python file that implements a REPL client (e.g. serial)
   * * Port: a device reported by a worker in reply to a `ports` request
   * * Connection: a separately spawned worker process that was sent a
   *   `connect` request for one port
   */
  // Directory that is scanned for `*_worker.py` scripts.
  const WORKER_DIR = [__dirname, 'python_workers'].join('/')

  // Host name that is stamped onto every discovered port.
  const HOST = os.hostname()

  // In-memory registry shared by all resolvers in this module.
  const state = {
    // One entry per discovered worker script.
    interfaces: [],
    // Every port reported by any interface.
    ports: [],
    // Every open connection, across all interfaces.
    connections: [],
    // Timestamps of the last scans, initialised 100 seconds in the past.
    lastScan: {
      workers: Date.now() - 100 * 1000,
      ports: Date.now() - 100 * 1000,
      connections: Date.now() - 100 * 1000,
    },
  }
  // GraphQL schema fragment for interfaces, ports and connections.
  // It only *extends* Query and Mutation, so the base types (and the
  // DateTime scalar used below) have to be declared by another module.
  const typeDefs = `
    type Interface {
      id: ID!
      interfaceName: String!
      workerScript: WorkerScript!
      workerProcess: WorkerProcess
      ports: [Port]!
      connections: [Connection]!
      options: [Option!]
    }

    type WorkerScript {
      path: String!
      mtime: DateTime!
      updated: DateTime
    }

    type WorkerProcess {
      pid: Int!
      killed: Boolean!
      signalCode: String
      exitCode: Int
      spawnfile: String!
      spawnargs: [String]!
    }

    type Option {
      name: String!
      type: String!
      description: String
      values: [String!]
    }

    type Port {
      id: String!
      interfaceName: String!
      host: String!
      device: String
      name: String
      description: String
    }

    type Connection {
      id: ID!
      port: Port!
      workerProcess: WorkerProcess
    }

    input ConnectionCommand {
      type: String!
      data: String
      options: String
    }

    extend type Query {
      interfaces: [Interface]!
      interface(id: ID, interfaceName: String): Interface!
      ports(interfaceName: String, force: Boolean): [Port]!
      port(id: ID!): Port!
      connections: [Connection]!
      connection(id: ID!): Connection!
    }

    extend type Mutation {
      connect(portId: ID!): Connection!
      endConnection(connectionId: ID!): Connection!
      killConnection(connectionId: ID!): Connection!
      sendCommand(connectionId: ID!, command: ConnectionCommand!): String!
    }
  `
  94. /**
  95. * INTERFACE SECTION
  96. */
  /**
   * Scans WORKER_DIR for worker scripts and registers every script that is
   * not yet known as an interface in `state.interfaces`.
   *
   * For a script that is already registered only `workerScript.updated` is
   * set, and only when the file on disk is newer than the registered `mtime`.
   * For a new script a PythonWorker is spawned and asked for its options.
   *
   * NOTE: throttling via `state.lastScan.workers` is not active; every call
   * rescans the directory.
   *
   * @returns {Promise<void>} resolves once every worker script was processed
   * @throws {Error} if a worker reports an error on spawn or on `options`
   */
  async function findInterfaces() {
    const workerSuffix = '_worker.py'

    // 1. Find all files in ./python_workers that end in _worker.py.
    //    `endsWith` keeps files such as `x_worker.pyc` or `x_worker.py.bak`
    //    from being spawned as workers.
    const fileNames = await readdir(WORKER_DIR)
    const workerFiles = fileNames.filter(fileName =>
      fileName.endsWith(workerSuffix)
    )

    // 2. For every worker script
    const workerPromises = workerFiles.map(async workerFile => {
      const interfaceName = workerFile.slice(0, -workerSuffix.length)
      const path = `${WORKER_DIR}/${workerFile}`

      // a. Find out if it was modified.
      const { mtime } = await stat(path)
      const workerScript = { path, mtime, updated: null }
      const foundInterface = state.interfaces.find(
        iface => iface.interfaceName === interfaceName
      )
      if (foundInterface) {
        // b. If it was modified, save the modification time.
        if (foundInterface.workerScript.mtime < mtime) {
          foundInterface.workerScript.updated = mtime
        }
        return
      }

      // c. Spawn a new worker connection.
      const workerProcess = new PythonWorker(workerScript)
      const { error } = await workerProcess.spawn()
      if (error) throw new Error(error)

      // d. Save the worker in the state. The id is derived from the script
      //    path; interpolating the `workerScript` object itself would only
      //    contribute the constant text "[object Object]" to the hash.
      state.interfaces.push({
        id: md5(`${interfaceName}${path}${mtime}`),
        interfaceName,
        workerScript,
        workerProcess,
        ports: [],
        connections: [],
        options: await options(workerProcess),
      })
    })
    await Promise.all(workerPromises)
  }
  /**
   * Resolver for `Query.interfaces`: rescans the worker directory and returns
   * every known interface in a GraphQL-friendly shape.
   *
   * @param {*} parent unused
   * @param {object} args resolver arguments, forwarded to `ports`
   * @param {*} context forwarded unchanged
   * @param {*} info forwarded unchanged
   * @returns {Promise<object[]>} one object per entry in `state.interfaces`
   */
  async function interfaces(parent, args, context, info) {
    await findInterfaces()
    return state.interfaces.map(iface => ({
      ...iface,
      // `ports` returns a promise; the interface name is passed as `parent`
      // so that only the ports of this interface are returned.
      ports: ports(iface.interfaceName, args, context, info),
      // Serialize the worker of every connection, exactly like the
      // `connections` resolver does, instead of exposing the raw worker.
      connections: iface.connections.map(connection => ({
        ...connection,
        workerProcess: workerProcess(
          connection.workerProcess,
          args,
          context,
          info
        ),
      })),
      // Serialize worker process
      workerProcess: workerProcess(iface.workerProcess, args, context, info),
    }))
  }
  148. async function interface(parent, args, context, info) {
  149. await findInterfaces()
  150. const { id, interfaceName } = args
  151. if (!id && !interfaceName) {
  152. throw new Error('Either id or interfaceName needs to be provided!')
  153. }
  154. const iface = state.interfaces.find(
  155. iface => iface.id === id || iface.interfaceName === interfaceName
  156. )
  157. if (!iface) {
  158. throw new Error(`Worker id=${id}, interfaceName=${interfaceName} not found`)
  159. }
  160. return {
  161. ...iface,
  162. workerProcess: workerProcess(iface.workerProcess, args, context, info),
  163. }
  164. }
  /**
   * Sends an `options` request to a worker and returns the `data` of its
   * reply.
   *
   * @param {{ send: function(object): Promise<object> }} workerProcess
   *   worker to query
   * @returns {Promise<*>} the `data` field of the worker's reply
   * @throws {Error} if the reply carries an `error` field
   */
  async function options(workerProcess) {
    const reply = await workerProcess.send({ type: 'options' })
    if (reply.error) {
      throw new Error(reply.error)
    }
    return reply.data
  }
  /**
   * Reduces a worker to the plain fields of the `WorkerProcess` GraphQL type
   * by copying them from `parent.pythonShell`.
   *
   * @param {{ pythonShell: object }} parent worker whose shell is read
   * @param {*} [args] unused
   * @param {*} [ctx] unused
   * @param {*} [info] unused
   * @returns {{ pid, killed, exitCode, signalCode, spawnfile, spawnargs }}
   */
  function workerProcess(parent, args, ctx, info) {
    const shell = parent.pythonShell
    return {
      pid: shell.pid,
      killed: shell.killed,
      exitCode: shell.exitCode,
      signalCode: shell.signalCode,
      spawnfile: shell.spawnfile,
      spawnargs: shell.spawnargs,
    }
  }
  190. /**
  191. * PORTS SECTION
  192. */
  /**
   * Asks every known interface for its ports and records each port that is
   * not yet known in both `state.ports` and the owning interface's `ports`.
   *
   * A port is identified by its `name`, falling back to its `device`.
   * Ports are only ever added here; nothing in this function removes them.
   *
   * NOTE: throttling via `state.lastScan.ports` is not active; every call
   * queries all workers.
   *
   * @returns {Promise<void>} resolves once every interface was queried
   * @throws {Error} if a worker answers the `ports` request with an error
   */
  async function findPorts() {
    // 1. Make sure, workers are updated.
    await findInterfaces()

    // 2. Loop through all workers to find available ports.
    const portsPromises = state.interfaces.map(async iface => {
      // a) Ask interface for ports.
      const { data, error } = await iface.workerProcess.send({
        type: 'ports',
      })
      if (error) throw new Error(error)

      // A reply without a port list is treated as "no ports".
      const reportedPorts = Array.isArray(data) ? data : []

      // b) Add all ports that are not in the list.
      reportedPorts.forEach(reported => {
        const id = reported.name || reported.device
        if (state.ports.some(known => known.id === id)) return
        const newPort = {
          host: HOST,
          ...reported,
          // Set after the spread so a worker-supplied `id` or
          // `interfaceName` cannot replace the key used for de-duplication
          // above or the owner that `connect` looks the port up by.
          id,
          interfaceName: iface.interfaceName,
        }
        state.ports.push(newPort)
        iface.ports.push(newPort)
      })
    })
    await Promise.all(portsPromises)
  }
  /**
   * Resolver for `Query.ports`, also called by `interfaces` with an
   * interface name as `parent`.
   *
   * @param {string} [parent] interface name used when `args.interfaceName`
   *   is not set
   * @param {{ interfaceName?: string }} args optional interface filter
   * @param {*} ctx unused
   * @param {*} info unused
   * @returns {Promise<object[]>} the ports of the selected interface, or all
   *   known ports when no interface is selected
   * @throws {Error} if the selected interface is unknown
   */
  async function ports(parent, args, ctx, info) {
    await findPorts()
    const ifName = args.interfaceName || parent
    if (!ifName) {
      return state.ports
    }
    const owner = state.interfaces.find(
      candidate => candidate.interfaceName === ifName
    )
    if (!owner) {
      throw new Error(`Interface ${ifName} not found.`)
    }
    return owner.ports
  }
  /**
   * Resolver for `Query.port`: rescans the ports and returns the one with
   * the given id.
   *
   * @param {*} parent unused
   * @param {{ id: string }} args id of the requested port
   * @param {*} ctx unused
   * @param {*} info unused
   * @returns {Promise<object>} the matching port
   * @throws {Error} if no id is given or no port has that id
   */
  async function port(parent, args, ctx, info) {
    await findPorts()
    const { id } = args
    if (!id) throw new Error('Need an id.')
    const found = state.ports.find(candidate => candidate.id === id)
    // The schema declares `Port!`; fail with the same message `connect`
    // uses instead of returning undefined for a non-null field.
    if (!found) throw new Error(`Port ${id} not found`)
    return found
  }
  241. /**
  242. * CONNECTION SECTION
  243. */
  /**
   * Resolver for `Mutation.connect`: spawns a dedicated worker for a port,
   * sends it a `connect` request and registers the resulting connection in
   * `state.connections` and in the owning interface's `connections`.
   *
   * @param {*} parent unused
   * @param {{ portId: string }} args id of the port to connect to
   * @param {*} ctx unused
   * @param {*} info unused
   * @returns {Promise<object>} the connection with a serialized workerProcess
   * @throws {Error} if the port or its interface is unknown, the port is
   *   already connected, or the worker reports an error
   */
  async function connect(parent, args, ctx, info) {
    await findPorts()
    const { portId } = args
    const port = state.ports.find(candidate => candidate.id === portId)
    if (!port) throw new Error(`Port ${portId} not found`)

    const iface = state.interfaces.find(
      candidate => candidate.interfaceName === port.interfaceName
    )
    if (!iface) throw new Error(`Interface ${port.interfaceName} not found.`)

    // The id has to depend on the port. Interfaces carry no `device` field,
    // so hashing `iface.device` gave every port of an interface the same id
    // and a second port could never be connected.
    const id = md5(`${iface.interfaceName}${port.id}`)
    if (iface.connections.some(existing => existing.id === id)) {
      throw new Error('already connected.')
    }

    const pythonWorker = new PythonWorker(iface.workerScript)
    const spawnData = await pythonWorker.spawn()
    if (spawnData.error) throw new Error(spawnData.error)

    const connection = {
      id,
      port,
      workerProcess: pythonWorker,
    }
    const connectionData = await pythonWorker.send({
      type: 'connect',
      device: port.device,
    })
    // NOTE(review): on failure the freshly spawned worker is not stopped
    // here; confirm whether PythonWorker cleans up after itself.
    if (connectionData.error) throw new Error(connectionData.error)

    iface.connections.push(connection)
    state.connections.push(connection)
    return {
      ...connection,
      workerProcess: workerProcess(pythonWorker),
    }
  }
  /**
   * Resolver for `Query.connections`. When `parent` is set it is taken as an
   * interface name and only that interface's connections are returned;
   * otherwise all connections are returned.
   *
   * @param {string} [parent] interface name to restrict the result to
   * @param {*} args unused
   * @param {*} ctx unused
   * @param {*} info unused
   * @returns {Promise<object[]>} connections with a serialized workerProcess
   * @throws {Error} if `parent` names an unknown interface
   */
  async function connections(parent, args, ctx, info) {
    let source = state.connections
    if (parent) {
      const iface = state.interfaces.find(
        candidate => candidate.interfaceName === parent
      )
      // Same message as `ports` uses for an unknown interface, instead of a
      // TypeError from reading `.connections` of undefined.
      if (!iface) throw new Error(`Interface ${parent} not found.`)
      source = iface.connections
    }
    return source.map(connection => ({
      ...connection,
      workerProcess: workerProcess(connection.workerProcess),
    }))
  }
  /**
   * Resolver for `Query.connection`: returns the connection with the given
   * id.
   *
   * @param {*} parent unused
   * @param {{ id: string }} args id of the requested connection
   * @param {*} context unused
   * @param {*} info unused
   * @returns {Promise<object>} the connection with a serialized workerProcess
   * @throws {Error} if no connection has that id
   */
  async function connection(parent, args, context, info) {
    const found = state.connections.find(
      candidate => candidate.id === args.id
    )
    // Without this guard an unknown id surfaces as a TypeError from reading
    // `.workerProcess` of undefined.
    if (!found) throw new Error(`Connection ${args.id} not found`)
    return {
      ...found,
      workerProcess: workerProcess(found.workerProcess),
    }
  }
  /**
   * Resolver for `Mutation.sendCommand`: forwards a command to the worker of
   * a connection and returns the `response` field of the reply's data.
   *
   * @param {*} parent unused
   * @param {{ connectionId: string, command: object }} args target
   *   connection and the command to send
   * @param {*} ctx unused
   * @param {*} info unused
   * @returns {Promise<*>} `data.response` of the worker's reply
   * @throws {Error} if the connection is unknown or the worker reports an
   *   error (the error is JSON-encoded into the message)
   */
  async function sendCommand(parent, args, ctx, info) {
    const { connectionId, command } = args
    const target = state.connections.find(
      candidate => candidate.id === connectionId
    )
    // Without this guard an unknown id surfaces as a TypeError from reading
    // `.workerProcess` of undefined.
    if (!target) throw new Error(`Connection ${connectionId} not found`)

    // The command is copied so the worker never sees the args object itself.
    const { data, error } = await target.workerProcess.send({ ...command })
    if (error) throw new Error(JSON.stringify(error))
    return data.response
  }
  /**
   * Resolver for `Mutation.endConnection`: calls `end()` on the connection's
   * worker and removes the connection from `state.connections` and from every
   * interface that lists it.
   *
   * @param {*} parent unused
   * @param {{ connectionId: string }} args id of the connection to end
   * @param {*} ctx unused
   * @param {*} info unused
   * @returns {Promise<object>} the ended connection with a serialized
   *   workerProcess
   * @throws {Error} if the connection is unknown or `end()` reports an error
   */
  async function endConnection(parent, args, ctx, info) {
    const { connectionId } = args
    const connection = state.connections.find(
      candidate => candidate.id === connectionId
    )
    // An unknown id must not reach `splice(-1, 1)`, which would drop the
    // last (unrelated) connection.
    if (!connection) throw new Error(`Connection ${connectionId} not found`)

    const { error } = await connection.workerProcess.end()
    if (error) throw new Error(JSON.stringify(error))

    // Look the position up only now: the list may have changed while the
    // worker was shutting down.
    const stateIndex = state.connections.indexOf(connection)
    if (stateIndex !== -1) state.connections.splice(stateIndex, 1)

    // Also drop the connection from its interface; `connect` checks that
    // list and would otherwise keep answering "already connected.".
    // (The previous lookup used `=` instead of `===` and thereby overwrote
    // the name of every interface with undefined.)
    for (const iface of state.interfaces) {
      const ifaceIndex = iface.connections.indexOf(connection)
      if (ifaceIndex !== -1) iface.connections.splice(ifaceIndex, 1)
    }

    return {
      ...connection,
      workerProcess: workerProcess(connection.workerProcess),
    }
  }
  /**
   * Resolver for `Mutation.killConnection`: calls `kill()` on the
   * connection's worker and removes the connection from `state.connections`
   * and from every interface that lists it.
   *
   * @param {*} parent unused
   * @param {{ connectionId: string }} args id of the connection to kill
   * @param {*} ctx unused
   * @param {*} info unused
   * @returns {Promise<object>} the killed connection with a serialized
   *   workerProcess
   * @throws {Error} if the connection is unknown or `kill()` reports an error
   */
  async function killConnection(parent, args, ctx, info) {
    const { connectionId } = args
    const connection = state.connections.find(
      candidate => candidate.id === connectionId
    )
    // An unknown id must not reach `splice(-1, 1)`, which would drop the
    // last (unrelated) connection.
    if (!connection) throw new Error(`Connection ${connectionId} not found`)

    const { error } = await connection.workerProcess.kill()
    if (error) throw new Error(JSON.stringify(error))

    // Look the position up only now: the list may have changed while the
    // worker was being killed.
    const stateIndex = state.connections.indexOf(connection)
    if (stateIndex !== -1) state.connections.splice(stateIndex, 1)

    // Also drop the connection from its interface; `connect` checks that
    // list and would otherwise keep answering "already connected.".
    for (const iface of state.interfaces) {
      const ifaceIndex = iface.connections.indexOf(connection)
      if (ifaceIndex !== -1) iface.connections.splice(ifaceIndex, 1)
    }

    return {
      ...connection,
      workerProcess: workerProcess(connection.workerProcess),
    }
  }
  338. const resolvers = {
  339. Query: {
  340. interfaces,
  341. interface,
  342. ports,
  343. port,
  344. connections,
  345. connection,
  346. },
  347. Mutation: {
  348. connect,
  349. sendCommand,
  350. endConnection,
  351. killConnection,
  352. },
  353. }
  354. module.exports = { typeDefs, resolvers }