network: fix another race in session.subscribe
key in session.subscriptions does not imply key in session.cache
This commit is contained in:
@@ -29,6 +29,7 @@ import sys
|
|||||||
import traceback
|
import traceback
|
||||||
import asyncio
|
import asyncio
|
||||||
from typing import Tuple, Union
|
from typing import Tuple, Union
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
import aiorpcx
|
import aiorpcx
|
||||||
from aiorpcx import ClientSession, Notification
|
from aiorpcx import ClientSession, Notification
|
||||||
@@ -46,7 +47,7 @@ class NotificationSession(ClientSession):
|
|||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super(NotificationSession, self).__init__(*args, **kwargs)
|
super(NotificationSession, self).__init__(*args, **kwargs)
|
||||||
self.subscriptions = {}
|
self.subscriptions = defaultdict(list)
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
|
|
||||||
async def handle_request(self, request):
|
async def handle_request(self, request):
|
||||||
@@ -70,12 +71,13 @@ class NotificationSession(ClientSession):
|
|||||||
timeout)
|
timeout)
|
||||||
|
|
||||||
async def subscribe(self, method, params, queue):
|
async def subscribe(self, method, params, queue):
|
||||||
|
# note: until the cache is written for the first time,
|
||||||
|
# each 'subscribe' call might make a request on the network.
|
||||||
key = self.get_index(method, params)
|
key = self.get_index(method, params)
|
||||||
if key in self.subscriptions:
|
self.subscriptions[key].append(queue)
|
||||||
self.subscriptions[key].append(queue)
|
if key in self.cache:
|
||||||
result = self.cache[key]
|
result = self.cache[key]
|
||||||
else:
|
else:
|
||||||
self.subscriptions[key] = [queue]
|
|
||||||
result = await self.send_request(method, params)
|
result = await self.send_request(method, params)
|
||||||
self.cache[key] = result
|
self.cache[key] = result
|
||||||
await queue.put(params + [result])
|
await queue.put(params + [result])
|
||||||
|
|||||||
Reference in New Issue
Block a user