1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- import sqlite3
- import json
- import datetime, time
- import threading
- import queue
- db_path = 'state.db'
- def sql_thread(command_queue, response_queue):
- connection = sqlite3.connect(db_path)
- cursor = connection.cursor()
- cursor.execute('CREATE TABLE IF NOT EXISTS actions (id INTEGER PRIMARY KEY, date TEXT, action TEXT)')
- cursor.execute('CREATE TABLE IF NOT EXISTS state (id INTEGER PRIMARY KEY, date TEXT, state TEXT)')
- connection.commit()
- task_done = False
- while not task_done:
- table, data = command_queue.get()
- try:
- if table == 'actions':
- cursor.execute('INSERT INTO actions VALUES (NULL, ?, ?)', data)
- connection.commit()
- elif table == 'state':
- cursor.execute('INSERT INTO state VALUES (NULL, ?, ?)', data)
- connection.commit()
- elif table == 'DONE':
- task_done = True
- response_queue.put('Bye!')
- else:
- response_queue.put(ValueError('table {} doesn\'t exist.'.format(table)))
- except Exception as error:
- response_queue.put(error)
- command_queue = queue.Queue()
- response_queue = queue.Queue()
- t = threading.Thread(target=sql_thread, args=(command_queue, response_queue))
- t.start()
- def middleware(store):
- dispatch = store['dispatch']
- get_state = store['get_state']
- def wrapper(next_):
- def update_db(action):
- state = get_state()
- now = datetime.datetime.now()
- command_queue.put(('actions', [now, json.dumps(action)]))
- command_queue.put(('state', [now, json.dumps(state)]))
- return next_(action)
- return update_db
- return wrapper
|