network: rm server_queue
it's no longer needed; now it was just an extra level of indirection
This commit is contained in:
@@ -245,7 +245,6 @@ class Network(Logger):
|
|||||||
interface: Optional[Interface]
|
interface: Optional[Interface]
|
||||||
interfaces: Dict[ServerAddr, Interface]
|
interfaces: Dict[ServerAddr, Interface]
|
||||||
connecting: Set[ServerAddr]
|
connecting: Set[ServerAddr]
|
||||||
server_queue: 'Optional[queue.Queue[ServerAddr]]'
|
|
||||||
default_server: ServerAddr
|
default_server: ServerAddr
|
||||||
_recent_servers: List[ServerAddr]
|
_recent_servers: List[ServerAddr]
|
||||||
|
|
||||||
@@ -310,7 +309,6 @@ class Network(Logger):
|
|||||||
self.interfaces = {}
|
self.interfaces = {}
|
||||||
self.auto_connect = self.config.get('auto_connect', True)
|
self.auto_connect = self.config.get('auto_connect', True)
|
||||||
self.connecting = set()
|
self.connecting = set()
|
||||||
self.server_queue = None
|
|
||||||
self.proxy = None
|
self.proxy = None
|
||||||
|
|
||||||
# Dump network messages (all interfaces). Set at runtime from the console.
|
# Dump network messages (all interfaces). Set at runtime from the console.
|
||||||
@@ -537,18 +535,6 @@ class Network(Logger):
|
|||||||
out = filter_noonion(out)
|
out = filter_noonion(out)
|
||||||
return out
|
return out
|
||||||
|
|
||||||
def _start_interface(self, server: ServerAddr):
|
|
||||||
if server in self.interfaces or server in self.connecting:
|
|
||||||
return
|
|
||||||
if server == self.default_server:
|
|
||||||
self.logger.info(f"connecting to {server} as new interface")
|
|
||||||
self._set_status('connecting')
|
|
||||||
self.connecting.add(server)
|
|
||||||
self.server_queue.put(server)
|
|
||||||
# update _last_tried_server
|
|
||||||
last_time, num_attempts = self._last_tried_server.get(server, (0, 0))
|
|
||||||
self._last_tried_server[server] = time.time(), num_attempts + 1
|
|
||||||
|
|
||||||
def _can_retry_server(self, server: ServerAddr, *, now: float = None) -> bool:
|
def _can_retry_server(self, server: ServerAddr, *, now: float = None) -> bool:
|
||||||
if now is None:
|
if now is None:
|
||||||
now = time.time()
|
now = time.time()
|
||||||
@@ -700,11 +686,11 @@ class Network(Logger):
|
|||||||
if old_server and old_server != server:
|
if old_server and old_server != server:
|
||||||
await self._close_interface(old_interface)
|
await self._close_interface(old_interface)
|
||||||
if len(self.interfaces) <= self.num_server:
|
if len(self.interfaces) <= self.num_server:
|
||||||
self._start_interface(old_server)
|
await self.taskgroup.spawn(self._run_new_interface(old_server))
|
||||||
|
|
||||||
if server not in self.interfaces:
|
if server not in self.interfaces:
|
||||||
self.interface = None
|
self.interface = None
|
||||||
self._start_interface(server)
|
await self.taskgroup.spawn(self._run_new_interface(server))
|
||||||
return
|
return
|
||||||
|
|
||||||
i = self.interfaces[server]
|
i = self.interfaces[server]
|
||||||
@@ -758,9 +744,19 @@ class Network(Logger):
|
|||||||
return request_type.RELAXED
|
return request_type.RELAXED
|
||||||
return request_type.NORMAL
|
return request_type.NORMAL
|
||||||
|
|
||||||
@ignore_exceptions # do not kill main_taskgroup
|
@ignore_exceptions # do not kill outer taskgroup
|
||||||
@log_exceptions
|
@log_exceptions
|
||||||
async def _run_new_interface(self, server: ServerAddr):
|
async def _run_new_interface(self, server: ServerAddr):
|
||||||
|
if server in self.interfaces or server in self.connecting:
|
||||||
|
return
|
||||||
|
self.connecting.add(server)
|
||||||
|
if server == self.default_server:
|
||||||
|
self.logger.info(f"connecting to {server} as new interface")
|
||||||
|
self._set_status('connecting')
|
||||||
|
# update _last_tried_server
|
||||||
|
last_time, num_attempts = self._last_tried_server.get(server, (0, 0))
|
||||||
|
self._last_tried_server[server] = time.time(), num_attempts + 1
|
||||||
|
|
||||||
interface = Interface(network=self, server=server, proxy=self.proxy)
|
interface = Interface(network=self, server=server, proxy=self.proxy)
|
||||||
# note: using longer timeouts here as DNS can sometimes be slow!
|
# note: using longer timeouts here as DNS can sometimes be slow!
|
||||||
timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
|
timeout = self.get_network_timeout_seconds(NetworkTimeout.Generic)
|
||||||
@@ -1145,14 +1141,13 @@ class Network(Logger):
|
|||||||
assert not self.taskgroup
|
assert not self.taskgroup
|
||||||
self.taskgroup = taskgroup = SilentTaskGroup()
|
self.taskgroup = taskgroup = SilentTaskGroup()
|
||||||
assert not self.interface and not self.interfaces
|
assert not self.interface and not self.interfaces
|
||||||
assert not self.connecting and not self.server_queue
|
assert not self.connecting
|
||||||
self.logger.info('starting network')
|
self.logger.info('starting network')
|
||||||
self._last_tried_server.clear()
|
self._last_tried_server.clear()
|
||||||
self.protocol = self.default_server.protocol
|
self.protocol = self.default_server.protocol
|
||||||
self.server_queue = queue.Queue()
|
|
||||||
self._set_proxy(deserialize_proxy(self.config.get('proxy')))
|
self._set_proxy(deserialize_proxy(self.config.get('proxy')))
|
||||||
self._set_oneserver(self.config.get('oneserver', False))
|
self._set_oneserver(self.config.get('oneserver', False))
|
||||||
self._start_interface(self.default_server)
|
await self.taskgroup.spawn(self._run_new_interface(self.default_server))
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
self.logger.info("starting taskgroup.")
|
self.logger.info("starting taskgroup.")
|
||||||
@@ -1192,7 +1187,6 @@ class Network(Logger):
|
|||||||
self.interface = None
|
self.interface = None
|
||||||
self.interfaces = {}
|
self.interfaces = {}
|
||||||
self.connecting.clear()
|
self.connecting.clear()
|
||||||
self.server_queue = None
|
|
||||||
if not full_shutdown:
|
if not full_shutdown:
|
||||||
util.trigger_callback('network_updated')
|
util.trigger_callback('network_updated')
|
||||||
|
|
||||||
@@ -1215,16 +1209,12 @@ class Network(Logger):
|
|||||||
await self.switch_to_interface(self.default_server)
|
await self.switch_to_interface(self.default_server)
|
||||||
|
|
||||||
async def _maintain_sessions(self):
|
async def _maintain_sessions(self):
|
||||||
async def launch_already_queued_up_new_interfaces():
|
async def maybe_start_new_interfaces():
|
||||||
while self.server_queue.qsize() > 0:
|
|
||||||
server = self.server_queue.get()
|
|
||||||
await self.taskgroup.spawn(self._run_new_interface(server))
|
|
||||||
async def maybe_queue_new_interfaces_to_be_launched_later():
|
|
||||||
for i in range(self.num_server - len(self.interfaces) - len(self.connecting)):
|
for i in range(self.num_server - len(self.interfaces) - len(self.connecting)):
|
||||||
# FIXME this should try to honour "healthy spread of connected servers"
|
# FIXME this should try to honour "healthy spread of connected servers"
|
||||||
server = self._get_next_server_to_try()
|
server = self._get_next_server_to_try()
|
||||||
if server:
|
if server:
|
||||||
self._start_interface(server)
|
await self.taskgroup.spawn(self._run_new_interface(server))
|
||||||
async def maintain_healthy_spread_of_connected_servers():
|
async def maintain_healthy_spread_of_connected_servers():
|
||||||
with self.interfaces_lock: interfaces = list(self.interfaces.values())
|
with self.interfaces_lock: interfaces = list(self.interfaces.values())
|
||||||
random.shuffle(interfaces)
|
random.shuffle(interfaces)
|
||||||
@@ -1241,8 +1231,7 @@ class Network(Logger):
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
await launch_already_queued_up_new_interfaces()
|
await maybe_start_new_interfaces()
|
||||||
await maybe_queue_new_interfaces_to_be_launched_later()
|
|
||||||
await maintain_healthy_spread_of_connected_servers()
|
await maintain_healthy_spread_of_connected_servers()
|
||||||
await maintain_main_interface()
|
await maintain_main_interface()
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
|
|||||||
Reference in New Issue
Block a user