pydux_sqlite.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import sqlite3
  2. import json
  3. import datetime
  4. import threading
  5. import queue
  6. db_path = 'state.db'
  7. def sql_thread(command_queue, response_queue):
  8. task_done = False
  9. try:
  10. connection = sqlite3.connect(db_path)
  11. cursor = connection.cursor()
  12. cursor.execute('CREATE TABLE IF NOT EXISTS actions (id INTEGER PRIMARY KEY, date TEXT, action TEXT)')
  13. cursor.execute('CREATE TABLE IF NOT EXISTS state (id INTEGER PRIMARY KEY, date TEXT, state TEXT)')
  14. connection.commit()
  15. except Exception as error:
  16. response_queue.put(error)
  17. while not task_done:
  18. try:
  19. table, data = command_queue.get()
  20. if table == 'actions':
  21. cursor.execute('INSERT INTO actions VALUES (NULL, ?, ?)', data)
  22. connection.commit()
  23. elif table == 'state':
  24. cursor.execute('INSERT INTO state VALUES (NULL, ?, ?)', data)
  25. connection.commit()
  26. elif table == 'DONE':
  27. task_done = True
  28. response_queue.put('Bye!')
  29. else:
  30. response_queue.put(ValueError('table {} doesn\'t exist.'.format(table)))
  31. except Exception as error:
  32. response_queue.put(error)
  33. command_queue = queue.Queue()
  34. response_queue = queue.Queue()
  35. thread = threading.Thread(target=sql_thread, args=(command_queue, response_queue))
  36. def middleware(store):
  37. dispatch = store['dispatch']
  38. get_state = store['get_state']
  39. def wrapper(next_):
  40. def update_db(action):
  41. state = get_state()
  42. now = datetime.datetime.now()
  43. command_queue.put(('actions', [now, json.dumps(action)]))
  44. command_queue.put(('state', [now, json.dumps(state)]))
  45. return next_(action)
  46. return update_db
  47. return wrapper
  48. if __name__ == "__main__":
  49. import unittest
  50. class TestSQLite(unittest.TestCase):
  51. def setUp(self):
  52. db_path = 'test_state.db'
  53. self.command_queue = queue.Queue()
  54. self.response_queue = queue.Queue()
  55. self.test_thread = threading.Thread(target=sql_thread, args=(command_queue, response_queue))
  56. self.test_thread.start()
  57. def tearDown(self):
  58. command_queue.put(('DONE', None))
  59. self.test_thread.join()
  60. def test_middleware(self):
  61. store = {
  62. 'dispatch': lambda state, action: state,
  63. 'get_state': lambda: {}
  64. }
  65. wrapper = middleware(store)
  66. self.assertTrue(callable(wrapper))
  67. update_fn = wrapper(lambda action: None)
  68. self.assertTrue(callable(update_fn))
  69. action = {'type': 'module/ACTION', 'value': 5}
  70. state = {}
  71. update_fn(action)
  72. action_table = command_queue.get()
  73. state_table = command_queue.get()
  74. self.assertEqual(action_table, ('actions', [datetime.datetime.now(), json.dumps(action)]))
  75. self.assertEqual(state_table, ('state', [datetime.datetime.now(), json.dumps(state)]))
  76. unittest.main()