pydux_sqlite.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import sqlite3
  2. import json
  3. import datetime, time
  4. import threading
  5. import queue
  6. db_path = 'state.db'
  7. def sql_thread(command_queue, response_queue):
  8. connection = sqlite3.connect(db_path)
  9. cursor = connection.cursor()
  10. cursor.execute('CREATE TABLE IF NOT EXISTS actions (id INTEGER PRIMARY KEY, date TEXT, action TEXT)')
  11. cursor.execute('CREATE TABLE IF NOT EXISTS state (id INTEGER PRIMARY KEY, date TEXT, state TEXT)')
  12. connection.commit()
  13. task_done = False
  14. while not task_done:
  15. table, data = command_queue.get()
  16. try:
  17. if table == 'actions':
  18. cursor.execute('INSERT INTO actions VALUES (NULL, ?, ?)', data)
  19. connection.commit()
  20. elif table == 'state':
  21. cursor.execute('INSERT INTO state VALUES (NULL, ?, ?)', data)
  22. connection.commit()
  23. elif table == 'DONE':
  24. task_done = True
  25. response_queue.put('Bye!')
  26. else:
  27. response_queue.put(ValueError('table {} doesn\'t exist.'.format(table)))
  28. except Exception as error:
  29. response_queue.put(error)
  30. command_queue = queue.Queue()
  31. response_queue = queue.Queue()
  32. t = threading.Thread(target=sql_thread, args=(command_queue, response_queue))
  33. t.start()
  34. def middleware(store):
  35. dispatch = store['dispatch']
  36. get_state = store['get_state']
  37. def wrapper(next_):
  38. def update_db(action):
  39. state = get_state()
  40. now = datetime.datetime.now()
  41. command_queue.put(('actions', [now, json.dumps(action)]))
  42. command_queue.put(('state', [now, json.dumps(state)]))
  43. return next_(action)
  44. return update_db
  45. return wrapper