network.best_effort_reliable: use curio APIs instead of asyncio
This commit is contained in:
@@ -37,6 +37,7 @@ import traceback
|
|||||||
import concurrent
|
import concurrent
|
||||||
from concurrent import futures
|
from concurrent import futures
|
||||||
import copy
|
import copy
|
||||||
|
import functools
|
||||||
|
|
||||||
import aiorpcx
|
import aiorpcx
|
||||||
from aiorpcx import TaskGroup, ignore_after
|
from aiorpcx import TaskGroup, ignore_after
|
||||||
@@ -829,40 +830,39 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def best_effort_reliable(func):
|
def best_effort_reliable(func):
|
||||||
|
@functools.wraps(func)
|
||||||
async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
|
async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
iface = self.interface
|
iface = self.interface
|
||||||
# retry until there is a main interface
|
# retry until there is a main interface
|
||||||
if not iface:
|
if not iface:
|
||||||
try:
|
async with ignore_after(1):
|
||||||
await asyncio.wait_for(self.default_server_changed_event.wait(), 1)
|
await self.default_server_changed_event.wait()
|
||||||
except asyncio.TimeoutError:
|
|
||||||
pass
|
|
||||||
continue # try again
|
continue # try again
|
||||||
assert iface.ready.done(), "interface not ready yet"
|
assert iface.ready.done(), "interface not ready yet"
|
||||||
# try actual request
|
# try actual request
|
||||||
success_fut = asyncio.ensure_future(func(self, *args, **kwargs))
|
try:
|
||||||
await asyncio.wait([success_fut, iface.got_disconnected.wait()], return_when=asyncio.FIRST_COMPLETED)
|
async with TaskGroup(wait=any) as group:
|
||||||
if success_fut.done() and not success_fut.cancelled():
|
task = await group.spawn(func(self, *args, **kwargs))
|
||||||
if success_fut.exception():
|
await group.spawn(iface.got_disconnected.wait())
|
||||||
try:
|
except RequestTimedOut:
|
||||||
raise success_fut.exception()
|
await iface.close()
|
||||||
except RequestTimedOut:
|
await iface.got_disconnected.wait()
|
||||||
await iface.close()
|
continue # try again
|
||||||
await iface.got_disconnected.wait()
|
except RequestCorrupted as e:
|
||||||
continue # try again
|
# TODO ban server?
|
||||||
except RequestCorrupted as e:
|
iface.logger.exception(f"RequestCorrupted: {e}")
|
||||||
# TODO ban server?
|
await iface.close()
|
||||||
iface.logger.exception(f"RequestCorrupted: {e}")
|
await iface.got_disconnected.wait()
|
||||||
await iface.close()
|
continue # try again
|
||||||
await iface.got_disconnected.wait()
|
if task.done() and not task.cancelled():
|
||||||
continue # try again
|
return task.result()
|
||||||
return success_fut.result()
|
|
||||||
# otherwise; try again
|
# otherwise; try again
|
||||||
raise BestEffortRequestFailed('no interface to do request on... gave up.')
|
raise BestEffortRequestFailed('no interface to do request on... gave up.')
|
||||||
return make_reliable_wrapper
|
return make_reliable_wrapper
|
||||||
|
|
||||||
def catch_server_exceptions(func):
|
def catch_server_exceptions(func):
|
||||||
|
@functools.wraps(func)
|
||||||
async def wrapper(self, *args, **kwargs):
|
async def wrapper(self, *args, **kwargs):
|
||||||
try:
|
try:
|
||||||
return await func(self, *args, **kwargs)
|
return await func(self, *args, **kwargs)
|
||||||
|
|||||||
Reference in New Issue
Block a user