// interfaces.js 8.8 KB


  const fs = require('fs')
  const os = require('os')
  const { promisify } = require('util')

  const md5 = require('md5')

  const PythonWorker = require('./pythonWorker')

  // Promise-returning variant of fs.readdir so it can be awaited.
  const readdir = promisify(fs.readdir)
  // Module-level, in-memory registry shared by the resolvers in this file.
  const state = {
    // { interfaceName, workerScript } records, one per discovered worker script
    workers: [],
    // interface records, each owning a worker plus its ports/connections/options
    interfaces: [],
    // every discovered port, across all interfaces
    ports: [],
    // every open connection, across all interfaces
    connections: []
  }
  // GraphQL schema (SDL) for the interface / port / connection API.
  // Query and Mutation are extended, so the root types must be declared elsewhere.
  const typeDefs = `
    type Option {
      name: String!
      type: String!
      description: String
      values: [String!]
    }

    type Port {
      id: String!
      interfaceName: String!
      host: String!
      device: String!
      name: String
      description: String
    }

    type Worker {
      pid: Int
      killed: Boolean!
      signalCode: String
      exitCode: Int
      spawnfile: String!
      spawnargs: [String]!
      error: Int!
      data: Int!
    }

    type Connection {
      id: ID!
      interfaceName: String!
      host: String!
      device: String!
      workerInfo: Worker
    }

    type Interface {
      interfaceName: String!
      host: String!
      workerScript: String!
      workerInfo: Worker
      ports: [Port]!
      connections: [Connection]!
      options: [Option]!
    }

    extend type Query {
      interfaces(force: Boolean): [Interface]!
      ports(interfaceName: String, force: Boolean): [Port]!
      connections: [Connection]!
      connection(id: ID!): Connection!
    }

    extend type Mutation {
      connect(interfaceName: String!, device: String!): Connection!
      spawnWorker(id: ID!): Worker!
      endWorker(id: ID!): Worker!
      killWorker(id: ID!): Worker!
      connectionCommand(connectionId: ID!, type: String!, string: String!, options: String): String!
    }
  `
  /**
   * Registers every `<name>_worker.py` script found in `./python_workers`
   * (relative to this module) that is not yet listed in `state.workers`.
   *
   * @returns {Promise<void>} resolves once `state.workers` has been updated
   * @throws whatever `readdir` rejects with (e.g. the directory is missing)
   */
  async function findWorkers() {
    const WORKER_SUFFIX = '_worker.py'
    const workerDir = `${__dirname}/python_workers`
    const fileNames = await readdir(workerDir)

    for (const fileName of fileNames) {
      // Require the suffix at the very end of the name: a substring match would
      // also accept files such as `foo_worker.pyc` or `foo_worker.py.bak`.
      if (!fileName.endsWith(WORKER_SUFFIX)) continue

      const interfaceName = fileName.slice(0, -WORKER_SUFFIX.length)
      const alreadyKnown = state.workers.some(worker => worker.interfaceName === interfaceName)
      if (alreadyKnown) continue

      state.workers.push({
        interfaceName,
        workerScript: `${workerDir}/${fileName}`
      })
    }
  }
  /**
   * Makes sure every known worker script has an interface record in
   * `state.interfaces`, spawning one PythonWorker per new interface.
   *
   * A worker whose spawn reports an error is skipped (no interface is
   * registered for it) and the error is logged, so the remaining interfaces
   * are still set up.
   *
   * @returns {Promise<void>} resolves after all spawn attempts have settled
   */
  async function findInterfaces() {
    // Refresh the worker list on every call so newly added scripts are seen.
    await findWorkers()

    const spawnAttempts = state.workers.map(async ({ workerScript, interfaceName }) => {
      // Skip interfaces that were set up by an earlier call.
      const alreadyKnown = state.interfaces.some(iface => iface.interfaceName === interfaceName)
      if (alreadyKnown) return

      const pythonWorker = new PythonWorker(workerScript)
      const { error } = await pythonWorker.spawn()
      if (error) {
        // Leave a trace instead of dropping the interface silently.
        console.error(`Could not spawn worker for interface "${interfaceName}":`, error)
        return
      }

      state.interfaces.push({
        interfaceName,
        workerScript,
        worker: pythonWorker,
        ports: [],
        connections: [],
        options: []
      })
    })

    await Promise.all(spawnAttempts)
  }
  /**
   * Resolver for `Query.interfaces`: lists every interface known on this host.
   *
   * `ports`, `connections` and `options` are returned as functions rather than
   * already-started promises. GraphQL's default field resolver calls
   * function-valued properties only for fields the query selects, so no worker
   * round-trip is made for unselected fields, and a failing lookup cannot
   * surface as an unhandled promise rejection. `connect` exposes `workerInfo`
   * the same way.
   *
   * @param {*} parent - unused
   * @param {Object} args - query arguments; forwarded unchanged to the nested lookups
   * @param {*} ctx - resolver context, forwarded unchanged
   * @param {*} info - resolver info, forwarded unchanged
   * @returns {Promise<Object[]>} one record per entry of `state.interfaces`
   */
  async function interfaces(parent, args, ctx, info) {
    // Discovery runs on every call; already-registered interfaces are skipped there.
    await findInterfaces()

    const host = os.hostname()
    return state.interfaces.map(({ workerScript, interfaceName, worker }) => ({
      interfaceName,
      host,
      workerScript,
      workerInfo: workerInfo(worker, args, ctx, info),
      // Lazy: evaluated only when the field is actually resolved.
      ports: () => ports(interfaceName, args, ctx, info),
      connections: () => connections(interfaceName, args, ctx, info),
      options: () => options(interfaceName, args, ctx, info)
    }))
  }
  /**
   * Asks an interface's worker for its ports and records every port not yet
   * known for that interface in both `state.ports` and the interface record.
   *
   * A port is identified by its id together with its interface name, so two
   * interfaces that report the same device each get their own entry.
   *
   * @param {string} interfaceName - name of a registered interface
   * @returns {Promise<void>}
   * @throws {Error} if the interface is unknown or the worker reports an error
   */
  async function findPorts(interfaceName) {
    const iface = state.interfaces.find(candidate => candidate.interfaceName === interfaceName)
    if (!iface) throw new Error(`Unknown interface: ${interfaceName}`)

    const { data, error, pythonError } = await iface.worker.send({ type: 'ports' })
    if (error) throw new Error(error)
    if (pythonError) throw new Error(pythonError)

    const host = os.hostname()
    for (const port of data || []) {
      const id = port.name || port.device
      const alreadyKnown = state.ports.some(
        known => known.id === id && known.interfaceName === interfaceName
      )
      if (alreadyKnown) continue

      // Worker-supplied fields are spread last, as before, so they take precedence.
      const newPort = { id, interfaceName, host, ...port }
      state.ports.push(newPort)
      iface.ports.push(newPort)
    }
  }
  /**
   * Resolver for `Query.ports`, also called directly with an interface name as
   * `parent` when the ports of a single interface are needed.
   *
   * @param {string|*} parent - interface name when called for one interface;
   *   any non-string value (e.g. the GraphQL root value) is ignored
   * @param {Object} args
   * @param {string} [args.interfaceName] - restricts the result to this interface
   * @param {boolean} [args.force] - re-query the worker even if ports are cached
   * @returns {Promise<Object[]>} the interface's ports, or all known ports when
   *   no interface is selected
   * @throws {Error} if the selected interface is unknown
   */
  async function ports(parent, args, ctx, info) {
    const { force, interfaceName } = args
    const parentName = typeof parent === 'string' ? parent : undefined
    const ifName = interfaceName || parentName
    if (!ifName) return state.ports

    const iface = state.interfaces.find(candidate => candidate.interfaceName === ifName)
    if (!iface) throw new Error(`Unknown interface: ${ifName}`)

    // Query the worker only when nothing is cached yet or a refresh is forced.
    if (iface.ports.length === 0 || force) await findPorts(ifName)
    return iface.ports
  }
  /**
   * Asks an interface's worker for its options and stores them on the
   * interface record.
   *
   * The stored list is replaced in place rather than appended to, so calling
   * this more than once does not accumulate duplicate options.
   *
   * @param {string} interfaceName - name of a registered interface
   * @returns {Promise<void>}
   * @throws {Error} if the interface is unknown or the worker reports an error
   */
  async function findOptions(interfaceName) {
    const iface = state.interfaces.find(candidate => candidate.interfaceName === interfaceName)
    if (!iface) throw new Error(`Unknown interface: ${interfaceName}`)

    const { data, error, pythonError } = await iface.worker.send({ type: 'options' })
    if (error) throw new Error(error)
    if (pythonError) throw new Error(pythonError)

    // splice keeps the same array object, so existing references stay valid.
    iface.options.splice(0, iface.options.length, ...(data || []))
  }
  /**
   * Returns the options of one interface, asking its worker for them the
   * first time (i.e. while the cached list is still empty).
   *
   * @param {string} parent - name of a registered interface
   * @returns {Promise<Object[]>} the interface's option list
   * @throws {Error} if the interface is unknown
   */
  async function options(parent, args, ctx, info) {
    const iface = state.interfaces.find(candidate => candidate.interfaceName === parent)
    if (!iface) throw new Error(`Unknown interface: ${parent}`)

    if (iface.options.length === 0) await findOptions(parent)
    return iface.options
  }
  /**
   * Builds the plain record behind the `Worker` GraphQL type from a worker.
   *
   * @param {Object} parent - a worker exposing `pythonShell`, `error` and `data`
   *   (presumably a PythonWorker; `pythonShell` looks like a child-process
   *   handle — confirm against ./pythonWorker)
   * @returns {Object} process fields copied from `parent.pythonShell`, plus the
   *   lengths of `parent.error` and `parent.data` as `error` and `data`
   */
  function workerInfo(parent, args, ctx, info) {
    const shell = parent.pythonShell
    return {
      pid: shell.pid,
      killed: shell.killed,
      exitCode: shell.exitCode,
      signalCode: shell.signalCode,
      spawnfile: shell.spawnfile,
      spawnargs: shell.spawnargs,
      // The schema exposes counts, not the collected entries themselves.
      error: parent.error.length,
      data: parent.data.length
    }
  }
  /**
   * Resolver for `Query.connections`, also called directly with an interface
   * name as `parent` to list the connections of a single interface.
   *
   * @param {string|*} parent - interface name when called for one interface;
   *   any non-string or empty value (e.g. the GraphQL root value) selects all
   * @returns {Promise<Object[]>} the interface's connections, or every connection
   * @throws {Error} if the named interface is unknown
   */
  async function connections(parent, args, ctx, info) {
    const wantsSingleInterface = typeof parent === 'string' && parent !== ''
    if (!wantsSingleInterface) return state.connections

    const iface = state.interfaces.find(candidate => candidate.interfaceName === parent)
    if (!iface) throw new Error(`Unknown interface: ${parent}`)
    return iface.connections
  }
  /**
   * Resolver for `Query.connection`: looks a connection up by id.
   *
   * @param {Object} args
   * @param {string} args.id - connection id
   * @returns {Promise<Object|undefined>} the matching entry of
   *   `state.connections`, or undefined when no connection has that id
   */
  async function connection(parent, args, context, info) {
    return state.connections.find(candidate => candidate.id === args.id)
  }
  /**
   * Resolver for `Mutation.connect`: spawns a dedicated worker for a device,
   * tells it to connect and registers the resulting connection.
   *
   * The connection id is the md5 of `interfaceName + device`, so connecting
   * the same device on the same interface twice is rejected.
   *
   * @param {Object} args
   * @param {string} args.interfaceName - name of a registered interface
   * @param {string} args.device - device identifier handed to the worker
   * @returns {Promise<Object>} the new connection record
   * @throws {Error} if the interface is unknown, the device is already
   *   connected, or spawning / connecting fails
   */
  async function connect(parent, args, ctx, info) {
    const { interfaceName, device } = args
    const iface = state.interfaces.find(candidate => candidate.interfaceName === interfaceName)
    if (!iface) throw new Error(`Unknown interface: ${interfaceName}`)

    const id = md5(interfaceName + device)
    const alreadyConnected = iface.connections.some(existing => existing.id === id)
    if (alreadyConnected) throw new Error('already connected.')

    const pythonWorker = new PythonWorker(iface.workerScript)
    const spawnData = await pythonWorker.spawn()
    if (spawnData.error) throw new Error(spawnData.error)

    try {
      const connectionData = await pythonWorker.send({ type: 'connect', device })
      if (connectionData.error) throw new Error(connectionData.error)
      if (connectionData.pythonError) throw new Error(connectionData.pythonError)
    } catch (connectError) {
      // The worker is already running but will never be registered, so nothing
      // else could stop it later. Shut it down best-effort and report the
      // original failure.
      try {
        await pythonWorker.kill()
      } catch (killError) {
        console.error('Could not stop worker after failed connect:', killError)
      }
      throw connectError
    }

    const connection = {
      id,
      device,
      interfaceName,
      host: os.hostname(),
      worker: pythonWorker,
      // Evaluated on demand so the reported worker state is current.
      workerInfo: () => workerInfo(pythonWorker)
    }
    iface.connections.push(connection)
    state.connections.push(connection)
    return connection
  }
  /**
   * Resolver for `Mutation.connectionCommand`: forwards a command to the
   * worker behind a connection and returns the worker's response.
   *
   * @param {Object} args
   * @param {string} args.connectionId - id of an open connection
   * @param {string} args.type - command type, passed to the worker as-is
   * @param {string} args.string - command payload, passed to the worker as-is
   * @param {string} [args.options] - extra options, passed to the worker as-is
   * @returns {Promise<*>} the `response` field of the worker's reply data
   * @throws {Error} if the connection is unknown or the worker reports an error
   */
  async function connectionCommand(parent, args, ctx, info) {
    const { connectionId, type, string, options } = args
    const target = state.connections.find(candidate => candidate.id === connectionId)
    if (!target) throw new Error(`Unknown connection: ${connectionId}`)

    const { data, error, pythonError } = await target.worker.send({ type, string, options })
    if (error) throw new Error(JSON.stringify(error))
    if (pythonError) throw new Error(JSON.stringify(pythonError))
    return data.response
  }
  // TODO: Also find connections in interfaces.
  /**
   * Resolver for `Mutation.endWorker`: asks the worker behind a connection to
   * end and returns its refreshed worker info.
   *
   * @param {Object} args
   * @param {string} args.id - id of an open connection
   * @returns {Promise<Object>} the result of `connection.workerInfo()`
   * @throws {Error} if the connection is unknown, the worker reports an error,
   *   or it reports a `pythonError` whose `error` field is not 0
   */
  async function endWorker(parent, args, ctx, info) {
    const { id } = args
    const target = state.connections.find(candidate => candidate.id === id)
    if (!target) throw new Error(`Unknown connection: ${id}`)

    const { error, pythonError } = await target.worker.end()
    if (error) throw new Error(JSON.stringify(error))
    // `pythonError` may be absent altogether; only a present, non-zero code is a
    // failure (presumably the exit status — confirm against ./pythonWorker).
    if (pythonError && pythonError.error !== 0) throw new Error(JSON.stringify(pythonError))
    return target.workerInfo()
  }
  /**
   * Resolver for `Mutation.spawnWorker`: spawns the worker of an existing
   * connection again and re-sends the `connect` command for its device.
   *
   * @param {Object} args
   * @param {string} args.id - id of a registered connection
   * @returns {Promise<Object>} the result of `connection.workerInfo()`
   * @throws {Error} if the connection is unknown, or spawning / connecting
   *   reports an error
   */
  async function spawnWorker(parent, args, ctx, info) {
    const { id } = args
    const target = state.connections.find(candidate => candidate.id === id)
    if (!target) throw new Error(`Unknown connection: ${id}`)

    const { error, pythonError } = await target.worker.spawn()
    if (error) throw new Error(JSON.stringify(error))
    if (pythonError) throw new Error(JSON.stringify(pythonError))

    // A freshly spawned worker has no device attached yet, so connect it again.
    const connectionData = await target.worker.send({ type: 'connect', device: target.device })
    if (connectionData.error) throw new Error(connectionData.error)
    if (connectionData.pythonError) throw new Error(connectionData.pythonError)
    return target.workerInfo()
  }
  /**
   * Resolver for `Mutation.killWorker`: kills the worker behind a connection
   * and returns its refreshed worker info.
   *
   * @param {Object} args
   * @param {string} args.id - id of a registered connection
   * @returns {Promise<Object>} the result of `connection.workerInfo()`
   * @throws {Error} if the connection is unknown or the kill reports an error
   */
  async function killWorker(parent, args, ctx, info) {
    const { id } = args
    const target = state.connections.find(candidate => candidate.id === id)
    if (!target) throw new Error(`Unknown connection: ${id}`)

    const { error, pythonError } = await target.worker.kill()
    if (error) throw new Error(JSON.stringify(error))
    if (pythonError) throw new Error(JSON.stringify(pythonError))
    return target.workerInfo()
  }
  // Resolver map matching the `extend type Query` / `extend type Mutation`
  // fields declared in typeDefs. Nested fields of Interface and Connection have
  // no entry here; they are served from the records the resolvers return.
  const resolvers = {
    Query: {
      interfaces,
      ports,
      connections,
      connection
    },
    Mutation: {
      connect,
      connectionCommand,
      killWorker,
      endWorker,
      spawnWorker
    }
  }

  module.exports = { typeDefs, resolvers }