123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- import sqlite3
- import json
- import datetime
- import threading
- import queue
# Default SQLite database file; sql_thread reads this name when it connects.
db_path = "state.db"
def sql_thread(command_queue, response_queue, path=None):
    """Consume ``(table, data)`` commands and persist them to SQLite.

    Intended to run as the target of a worker thread. It opens its own
    connection, makes sure the ``actions`` and ``state`` tables exist, and
    then blocks on ``command_queue`` until it receives ``('DONE', ...)``.

    Args:
        command_queue: Queue of ``(table, data)`` tuples. ``table`` is
            ``'actions'`` or ``'state'`` (``data`` is then the
            ``[date, payload]`` pair to insert) or ``'DONE'`` to stop.
        response_queue: Queue that receives every exception raised while
            processing, a ``ValueError`` for unknown tables, and the string
            ``'Bye!'`` once ``'DONE'`` has been handled.
        path: Optional database file to open. When omitted, the module-level
            ``db_path`` is used, which is the original behaviour.

    No exception propagates to the caller; failures are reported through
    ``response_queue`` only.
    """
    insert_statements = {
        'actions': 'INSERT INTO actions VALUES (NULL, ?, ?)',
        'state': 'INSERT INTO state VALUES (NULL, ?, ?)',
    }

    connection = None
    setup_error = None
    try:
        connection = sqlite3.connect(db_path if path is None else path)
        connection.execute(
            'CREATE TABLE IF NOT EXISTS actions '
            '(id INTEGER PRIMARY KEY, date TEXT, action TEXT)'
        )
        connection.execute(
            'CREATE TABLE IF NOT EXISTS state '
            '(id INTEGER PRIMARY KEY, date TEXT, state TEXT)'
        )
        connection.commit()
    except Exception as error:
        response_queue.put(error)
        setup_error = error
        if connection is not None:
            connection.close()
            connection = None

    try:
        while True:
            try:
                table, data = command_queue.get()
                if table == 'DONE':
                    response_queue.put('Bye!')
                    break
                statement = insert_statements.get(table)
                if statement is None:
                    response_queue.put(
                        ValueError("table {} doesn't exist.".format(table))
                    )
                elif connection is None:
                    # Setup failed: keep draining the queue so producers and
                    # the final 'DONE' handshake still work, but report each
                    # write as failed instead of hitting an unbound name.
                    response_queue.put(
                        sqlite3.OperationalError(
                            'database unavailable: {}'.format(setup_error)
                        )
                    )
                else:
                    connection.execute(statement, data)
                    connection.commit()
            except Exception as error:
                # A malformed command or failed insert must not kill the
                # worker; report it and keep serving the queue.
                response_queue.put(error)
    finally:
        if connection is not None:
            connection.close()
# Module-wide channels: the middleware puts commands on command_queue, and
# sql_thread reports errors and its final 'Bye!' on response_queue.
command_queue, response_queue = queue.Queue(), queue.Queue()

# Writer thread bound to those queues; it is only constructed here, not started.
thread = threading.Thread(
    target=sql_thread,
    args=(command_queue, response_queue),
)
def middleware(store):
    """Build a store middleware that queues every action for persistence.

    Args:
        store: Mapping that provides a ``'get_state'`` callable returning the
            current (JSON-serialisable) state.

    Returns:
        ``wrapper(next_)``, which in turn returns ``update_db(action)``.
        ``update_db`` reads the state as it is *before* ``next_`` runs, puts
        one ``('actions', [timestamp, json])`` and one
        ``('state', [timestamp, json])`` command on the module-level
        ``command_queue``, then returns ``next_(action)``.

    Raises:
        KeyError: If ``store`` has no ``'get_state'`` entry.
        TypeError: From ``update_db`` when the action or state cannot be
            encoded by ``json.dumps``.
    """
    get_state = store['get_state']

    def wrapper(next_):
        def update_db(action):
            state = get_state()
            # Queue the timestamp as text. Passing a datetime straight to
            # sqlite3 relies on its implicit adapter, deprecated since
            # Python 3.12; isoformat(' ') is the exact text it stored.
            timestamp = datetime.datetime.now().isoformat(' ')
            command_queue.put(('actions', [timestamp, json.dumps(action)]))
            command_queue.put(('state', [timestamp, json.dumps(state)]))
            return next_(action)

        return update_db

    return wrapper
if __name__ == "__main__":
    import os
    import shutil
    import tempfile
    import unittest

    class TestSQLite(unittest.TestCase):
        """Self-test for the middleware and the SQLite writer thread."""

        def setUp(self):
            # middleware() and sql_thread() read these module globals, so the
            # test must rebind the globals themselves (a local assignment has
            # no effect) and restore them afterwards.
            global db_path, command_queue
            self._saved_globals = (db_path, command_queue)
            self.tmp_dir = tempfile.mkdtemp()
            db_path = os.path.join(self.tmp_dir, 'test_state.db')
            command_queue = queue.Queue()
            self.command_queue = command_queue
            self.response_queue = queue.Queue()

        def tearDown(self):
            global db_path, command_queue
            db_path, command_queue = self._saved_globals
            shutil.rmtree(self.tmp_dir, ignore_errors=True)

        def test_middleware(self):
            # No writer thread runs here: a consumer on the same queue would
            # race with the assertions below for the queued commands.
            store = {
                'dispatch': lambda state, action: state,
                'get_state': lambda: {},
            }
            wrapper = middleware(store)
            self.assertTrue(callable(wrapper))
            forwarded = []
            update_fn = wrapper(forwarded.append)
            self.assertTrue(callable(update_fn))
            action = {'type': 'module/ACTION', 'value': 5}
            state = {}
            update_fn(action)
            self.assertEqual(forwarded, [action])
            action_table, action_row = self.command_queue.get_nowait()
            state_table, state_row = self.command_queue.get_nowait()
            self.assertTrue(self.command_queue.empty())
            self.assertEqual(action_table, 'actions')
            self.assertEqual(state_table, 'state')
            # Both commands carry the same timestamp; its exact value cannot
            # be predicted, so only equality between the two is checked.
            self.assertEqual(action_row[0], state_row[0])
            self.assertEqual(action_row[1], json.dumps(action))
            self.assertEqual(state_row[1], json.dumps(state))

        def test_sql_thread(self):
            worker = threading.Thread(
                target=sql_thread,
                args=(self.command_queue, self.response_queue),
                daemon=True,  # never keep the interpreter alive on failure
            )
            worker.start()
            row = ['2020-01-01 00:00:00', json.dumps({'type': 'module/ACTION'})]
            self.command_queue.put(('actions', row))
            self.command_queue.put(('DONE', None))
            worker.join(timeout=5)
            self.assertFalse(worker.is_alive())
            self.assertEqual(self.response_queue.get_nowait(), 'Bye!')
            connection = sqlite3.connect(db_path)
            try:
                stored = connection.execute(
                    'SELECT date, action FROM actions'
                ).fetchall()
            finally:
                connection.close()
            self.assertEqual(stored, [tuple(row)])

    unittest.main()
|