interface: throttle messages
This commit is contained in:
@@ -49,6 +49,7 @@ class NotificationSession(ClientSession):
|
|||||||
super(NotificationSession, self).__init__(*args, **kwargs)
|
super(NotificationSession, self).__init__(*args, **kwargs)
|
||||||
self.subscriptions = defaultdict(list)
|
self.subscriptions = defaultdict(list)
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
|
self.in_flight_requests_semaphore = asyncio.Semaphore(100)
|
||||||
|
|
||||||
async def handle_request(self, request):
|
async def handle_request(self, request):
|
||||||
# note: if server sends malformed request and we raise, the superclass
|
# note: if server sends malformed request and we raise, the superclass
|
||||||
@@ -64,11 +65,14 @@ class NotificationSession(ClientSession):
|
|||||||
assert False, request.method
|
assert False, request.method
|
||||||
|
|
||||||
async def send_request(self, *args, timeout=-1, **kwargs):
|
async def send_request(self, *args, timeout=-1, **kwargs):
|
||||||
|
# note: the timeout starts after the request touches the wire!
|
||||||
if timeout == -1:
|
if timeout == -1:
|
||||||
timeout = 20 if not self.proxy else 30
|
timeout = 20 if not self.proxy else 30
|
||||||
return await asyncio.wait_for(
|
# note: the semaphore implementation guarantees no starvation
|
||||||
super().send_request(*args, **kwargs),
|
async with self.in_flight_requests_semaphore:
|
||||||
timeout)
|
return await asyncio.wait_for(
|
||||||
|
super().send_request(*args, **kwargs),
|
||||||
|
timeout)
|
||||||
|
|
||||||
async def subscribe(self, method, params, queue):
|
async def subscribe(self, method, params, queue):
|
||||||
# note: until the cache is written for the first time,
|
# note: until the cache is written for the first time,
|
||||||
|
|||||||
Reference in New Issue
Block a user