1
0
Files
electrum/electrum/sql_db.py
SomberNight 2c57c78ebe asyncio: stop using get_event_loop(). introduce ~singleton loop.
asyncio.get_event_loop() became deprecated in python3.10. (see https://github.com/python/cpython/issues/83710)
```
.../electrum/electrum/daemon.py:470: DeprecationWarning: There is no current event loop
  self.asyncio_loop = asyncio.get_event_loop()
.../electrum/electrum/network.py:276: DeprecationWarning: There is no current event loop
  self.asyncio_loop = asyncio.get_event_loop()
```
Also, according to that thread, "set_event_loop() [... is] not deprecated by oversight".
So, we stop using get_event_loop() and set_event_loop() in our own code.
Note that libraries we use (such as the stdlib for python <3.10), might call get_event_loop,
which then relies on us having called set_event_loop e.g. for the GUI thread. To work around
this, a custom event loop policy providing a get_event_loop implementation is used.

Previously, we have been using a single asyncio event loop, created with
util.create_and_start_event_loop, and code in many places got a reference to this loop
using asyncio.get_event_loop().
Now, we still use a single asyncio event loop, but it is now stored as a global in
util._asyncio_event_loop (access with util.get_asyncio_loop()).

I believe these changes also fix https://github.com/spesmilo/electrum/issues/5376
2022-04-29 18:49:07 +02:00

77 lines
2.4 KiB
Python

import os
import queue
import threading
import asyncio
import sqlite3
from .logging import Logger
from .util import test_read_write_permissions
def sql(func):
"""wrapper for sql methods
returns an awaitable asyncio.Future
"""
def wrapper(self: 'SqlDB', *args, **kwargs):
assert threading.current_thread() != self.sql_thread
f = self.asyncio_loop.create_future()
self.db_requests.put((f, func, args, kwargs))
return f
return wrapper
class SqlDB(Logger):
def __init__(self, asyncio_loop: asyncio.BaseEventLoop, path, commit_interval=None):
Logger.__init__(self)
self.asyncio_loop = asyncio_loop
self.stopping = False
self.stopped_event = asyncio.Event()
self.path = path
test_read_write_permissions(path)
self.commit_interval = commit_interval
self.db_requests = queue.Queue()
self.sql_thread = threading.Thread(target=self.run_sql)
self.sql_thread.start()
def stop(self):
self.stopping = True
def filesize(self):
return os.stat(self.path).st_size
def run_sql(self):
self.logger.info("SQL thread started")
self.conn = sqlite3.connect(self.path)
self.logger.info("Creating database")
self.create_database()
i = 0
while not self.stopping and self.asyncio_loop.is_running():
try:
future, func, args, kwargs = self.db_requests.get(timeout=0.1)
except queue.Empty:
continue
try:
result = func(self, *args, **kwargs)
except BaseException as e:
self.asyncio_loop.call_soon_threadsafe(future.set_exception, e)
continue
if not future.cancelled():
self.asyncio_loop.call_soon_threadsafe(future.set_result, result)
# note: in sweepstore session.commit() is called inside
# the sql-decorated methods, so commiting to disk is awaited
if self.commit_interval:
i = (i + 1) % self.commit_interval
if i == 0:
self.conn.commit()
# write
self.conn.commit()
self.conn.close()
self.logger.info("SQL thread terminated")
self.asyncio_loop.call_soon_threadsafe(self.stopped_event.set)
def create_database(self):
raise NotImplementedError()