"""Persist dispatched actions and store state to SQLite from a worker thread.

``sql_thread`` owns the SQLite connection and consumes ``(table, data)``
commands from a queue; ``middleware`` is a store middleware that enqueues one
``actions`` row and one ``state`` row for every action passing through it.
"""

import sqlite3
import json
import datetime
import threading
import queue

# Default database file used by ``sql_thread`` when no explicit path is given.
db_path = 'state.db'

# Insert statement for every table the worker accepts; the first column is the
# autoincrementing primary key, hence the literal NULL.
_INSERT_STATEMENTS = {
    'actions': 'INSERT INTO actions VALUES (NULL, ?, ?)',
    'state': 'INSERT INTO state VALUES (NULL, ?, ?)',
}


def _as_sql_params(data):
    """Return *data* as a list with date/datetime values rendered as text.

    The rendering matches what sqlite3's implicit default adapters produce
    (``isoformat(' ')`` for datetimes, ``isoformat()`` for dates), so stored
    values are unchanged while the adapters deprecated in Python 3.12 are no
    longer relied upon.
    """
    params = []
    for value in data:
        if isinstance(value, datetime.datetime):
            value = value.isoformat(' ')
        elif isinstance(value, datetime.date):
            value = value.isoformat()
        params.append(value)
    return params


def sql_thread(command_queue, response_queue, path=None):
    """Run the SQLite writer loop until a ``('DONE', ...)`` command arrives.

    Args:
        command_queue: queue of ``(table, data)`` tuples. ``table`` is
            ``'actions'`` or ``'state'`` (``data`` is then the two values to
            insert after the auto-generated id) or ``'DONE'`` to stop.
        response_queue: queue receiving every exception raised while
            processing, a ``ValueError`` for unknown tables, and the string
            ``'Bye!'`` once ``'DONE'`` has been handled.
        path: database file to open; defaults to the module-level ``db_path``.

    If the database cannot be opened or the tables cannot be created, the
    error is put on ``response_queue`` and the function returns immediately
    instead of looping without a usable cursor.
    """
    # This is the thread's top-level boundary, so errors are reported through
    # the response queue rather than propagated (where they would be lost).
    try:
        connection = sqlite3.connect(db_path if path is None else path)
    except Exception as error:
        response_queue.put(error)
        return

    try:
        try:
            cursor = connection.cursor()
            cursor.execute(
                'CREATE TABLE IF NOT EXISTS actions '
                '(id INTEGER PRIMARY KEY, date TEXT, action TEXT)'
            )
            cursor.execute(
                'CREATE TABLE IF NOT EXISTS state '
                '(id INTEGER PRIMARY KEY, date TEXT, state TEXT)'
            )
            connection.commit()
        except Exception as error:
            response_queue.put(error)
            return

        while True:
            try:
                table, data = command_queue.get()
                if isinstance(table, str):
                    statement = _INSERT_STATEMENTS.get(table)
                else:
                    statement = None
                if statement is not None:
                    cursor.execute(statement, _as_sql_params(data))
                    connection.commit()
                elif table == 'DONE':
                    response_queue.put('Bye!')
                    break
                else:
                    response_queue.put(
                        ValueError('table {} doesn\'t exist.'.format(table))
                    )
            except Exception as error:
                # A bad command must not kill the worker; report and go on.
                response_queue.put(error)
    finally:
        connection.close()


command_queue = queue.Queue()
response_queue = queue.Queue()
# NOTE: the worker is only created here, never started; nothing in this module
# calls ``thread.start()`` outside of the tests below.
thread = threading.Thread(target=sql_thread, args=(command_queue, response_queue))


def middleware(store):
    """Build a middleware that logs each action and the current state.

    Args:
        store: mapping providing a zero-argument ``'get_state'`` callable.

    Returns:
        ``wrapper(next_)``, which in turn returns ``update_db(action)``.
        ``update_db`` enqueues ``('actions', [now, json(action)])`` and
        ``('state', [now, json(state)])`` on the module-level
        ``command_queue`` and then returns ``next_(action)``. The state is
        read *before* ``next_`` is called.
    """
    get_state = store['get_state']

    def wrapper(next_):
        def update_db(action):
            state = get_state()
            # One timestamp shared by both rows so they can be correlated.
            now = datetime.datetime.now()
            command_queue.put(('actions', [now, json.dumps(action)]))
            command_queue.put(('state', [now, json.dumps(state)]))
            return next_(action)

        return update_db

    return wrapper


if __name__ == "__main__":
    import os
    import tempfile
    import unittest

    class TestSQLite(unittest.TestCase):
        """Exercise the worker on private queues and a throwaway database."""

        def setUp(self):
            self.tmp_dir = tempfile.TemporaryDirectory()
            self.db_path = os.path.join(self.tmp_dir.name, 'test_state.db')
            self.command_queue = queue.Queue()
            self.response_queue = queue.Queue()
            # The worker listens on the per-test queues so it never competes
            # with the tests for items on the module-level command_queue.
            self.test_thread = threading.Thread(
                target=sql_thread,
                args=(self.command_queue, self.response_queue, self.db_path),
            )
            self.test_thread.start()

        def tearDown(self):
            self.command_queue.put(('DONE', None))
            self.test_thread.join()
            self.tmp_dir.cleanup()

        def test_middleware(self):
            store = {
                'dispatch': lambda state, action: state,
                'get_state': lambda: {},
            }
            wrapper = middleware(store)
            self.assertTrue(callable(wrapper))
            update_fn = wrapper(lambda action: None)
            self.assertTrue(callable(update_fn))

            action = {'type': 'module/ACTION', 'value': 5}
            state = {}
            update_fn(action)

            action_table, action_data = command_queue.get(timeout=5)
            state_table, state_data = command_queue.get(timeout=5)
            self.assertEqual(action_table, 'actions')
            self.assertEqual(state_table, 'state')
            self.assertEqual(action_data[1], json.dumps(action))
            self.assertEqual(state_data[1], json.dumps(state))
            # The exact time is unknowable; both rows must share one stamp.
            self.assertIsInstance(action_data[0], datetime.datetime)
            self.assertEqual(action_data[0], state_data[0])

        def test_worker_persists_rows(self):
            stamp = datetime.datetime(2020, 1, 2, 3, 4, 5)
            self.command_queue.put(('actions', [stamp, '{"type": "X"}']))
            self.command_queue.put(('DONE', None))
            self.assertEqual(self.response_queue.get(timeout=5), 'Bye!')
            self.test_thread.join()

            connection = sqlite3.connect(self.db_path)
            try:
                rows = connection.execute(
                    'SELECT date, action FROM actions'
                ).fetchall()
            finally:
                connection.close()
            self.assertEqual(rows, [('2020-01-02 03:04:05', '{"type": "X"}')])

    unittest.main()