diff --git a/electrum/gui/qml/qeapp.py b/electrum/gui/qml/qeapp.py index dde6bb3cb..22f70745b 100644 --- a/electrum/gui/qml/qeapp.py +++ b/electrum/gui/qml/qeapp.py @@ -460,7 +460,7 @@ class ElectrumQmlApplication(QGuiApplication): "MEMPOOL_MB": electrum.util.UI_UNIT_NAME_MEMPOOL_MB, }) - self.plugins.load_internal_plugin('trustedcoin') + self.plugins.load_plugin_by_name('trustedcoin') qInstallMessageHandler(self.message_handler) diff --git a/electrum/gui/qt/__init__.py b/electrum/gui/qt/__init__.py index 93cea0652..8dc7d6b73 100644 --- a/electrum/gui/qt/__init__.py +++ b/electrum/gui/qt/__init__.py @@ -159,7 +159,7 @@ class ElectrumGui(BaseElectrumGui, Logger): self.reload_app_stylesheet() # always load 2fa - self.plugins.load_internal_plugin('trustedcoin') + self.plugins.load_plugin_by_name('trustedcoin') run_hook('init_qt', self) diff --git a/electrum/plugin.py b/electrum/plugin.py index ca99ed530..7ed7e02cc 100644 --- a/electrum/plugin.py +++ b/electrum/plugin.py @@ -38,6 +38,7 @@ import concurrent import zipimport from concurrent import futures from functools import wraps, partial +from itertools import chain from .i18n import _ from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException) @@ -91,9 +92,9 @@ class Plugins(DaemonThread): def descriptions(self): return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items())) - def find_internal_plugins(self): - """Populates self.internal_plugin_metadata""" - iter_modules = list(pkgutil.iter_modules([self.pkgpath])) + def find_directory_plugins(self, pkg_path: str, external: bool): + """Finds plugins in directory form from the given pkg_path and populates the metadata dicts""" + iter_modules = list(pkgutil.iter_modules([pkg_path])) for loader, name, ispkg in iter_modules: # FIXME pyinstaller binaries are packaging each built-in plugin twice: # once as data and once as code. To honor the "no duplicates" rule below, @@ -102,8 +103,19 @@ class Plugins(DaemonThread): continue if self.cmd_only and self.config.get('enable_plugin_' + name) is not True: continue - full_name = f'electrum.plugins.{name}' + ('.commands' if self.cmd_only else '') - spec = importlib.util.find_spec(full_name) + base_name = 'electrum.plugins' if not external else 'electrum_external_plugins' + full_name = f'{base_name}.{name}' + ('.commands' if self.cmd_only else '') + if external: + module_path = os.path.join(pkg_path, name) + if not self._has_recursive_root_permissions(module_path): + self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions") + continue + module_path = os.path.join(module_path, 'commands.py' if self.cmd_only else '__init__.py') + if not os.path.exists(module_path): + continue + spec = importlib.util.spec_from_file_location(full_name, module_path) + else: + spec = importlib.util.find_spec(full_name) if spec is None: if self.cmd_only: continue # no commands module in this plugin @@ -129,10 +141,13 @@ class Plugins(DaemonThread): if d.get('requires_wallet_type'): # trustedcoin will not be added to list continue - if name in self.internal_plugin_metadata: + if name in self.internal_plugin_metadata or name in self.external_plugin_metadata: _logger.info(f"Found the following plugin modules: {iter_modules=}") raise Exception(f"duplicate plugins? for {name=}") - self.internal_plugin_metadata[name] = d + if not external: + self.internal_plugin_metadata[name] = d + else: + self.external_plugin_metadata[name] = d @staticmethod def exec_module_from_spec(spec, path): @@ -147,44 +162,36 @@ class Plugins(DaemonThread): return module def find_plugins(self): - self.find_internal_plugins() - self.find_external_plugins() + internal_plugins_path = (self.pkgpath, False) + external_plugins_path = (self.get_external_plugin_dir(), True) + for pkg_path, external in (internal_plugins_path, external_plugins_path): + # external plugins enforce root permissions on the directory + if pkg_path and os.path.exists(pkg_path): + self.find_directory_plugins(pkg_path=pkg_path, external=external) + self.find_zip_plugins(pkg_path=pkg_path, external=external) def load_plugins(self): - self.load_internal_plugins() - self.load_external_plugins() - - def load_internal_plugins(self): - for name, d in self.internal_plugin_metadata.items(): + for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()): if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name): try: - self.load_internal_plugin(name) + self.load_plugin_by_name(name) except BaseException as e: self.logger.exception(f"cannot initialize plugin {name}: {e}") - def load_external_plugin(self, name): - if name in self.plugins: - return self.plugins[name] - # If we do not have the metadata, it was not detected by `load_external_plugins` - # on startup, or added by manual user installation after that point. - metadata = self.external_plugin_metadata.get(name) - if metadata is None: - self.logger.exception(f"attempted to load unknown external plugin {name}") - return - full_name = f'electrum_external_plugins.{name}.{self.gui_name}' - spec = importlib.util.find_spec(full_name) - if spec is None: - raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found") - module = self.exec_module_from_spec(spec, full_name) - plugin = module.Plugin(self, self.config, name) - self.add_jobs(plugin.thread_jobs()) - self.plugins[name] = plugin - self.logger.info(f"loaded external plugin {name}") - return plugin - def _has_root_permissions(self, path): return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK) + @profiler(min_threshold=0.5) + def _has_recursive_root_permissions(self, path): + """Check if a directory and all its subdirectories have root permissions""" + for root, dirs, files in os.walk(path): + if not self._has_root_permissions(root): + return False + for f in files: + if not self._has_root_permissions(os.path.join(root, f)): + return False + return True + def get_external_plugin_dir(self): if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'): return @@ -197,18 +204,23 @@ class Plugins(DaemonThread): return return pkg_path - def external_plugin_path(self, name): - metadata = self.external_plugin_metadata[name] - filename = metadata['filename'] - return os.path.join(self.get_external_plugin_dir(), filename) + def zip_plugin_path(self, name): + filename = self.get_metadata(name)['filename'] + if name in self.internal_plugin_metadata: + pkg_path = self.pkgpath + else: + pkg_path = self.get_external_plugin_dir() + return os.path.join(pkg_path, filename) - def find_external_plugins(self): - pkg_path = self.get_external_plugin_dir() + def find_zip_plugins(self, pkg_path: str, external: bool): + """Finds plugins in zip form in the given pkg_path and populates the metadata dicts""" if pkg_path is None: return for filename in os.listdir(pkg_path): path = os.path.join(pkg_path, filename) - if not self._has_root_permissions(path): + if not filename.endswith('.zip'): + continue + if external and not self._has_root_permissions(path): self.logger.info(f'not loading {path}: file has user write permissions') continue try: @@ -223,7 +235,7 @@ class Plugins(DaemonThread): raise Exception(f"duplicate plugins for name={name}") if name in self.external_plugin_metadata: raise Exception(f"duplicate plugins for name={name}") - module_path = f'electrum_external_plugins.{name}' + module_path = f'electrum_external_plugins.{name}' if external else f'electrum.plugins.{name}' spec = zipfile.find_spec(name) module = self.exec_module_from_spec(spec, module_path) if self.cmd_only: @@ -245,16 +257,11 @@ class Plugins(DaemonThread): continue d['display_name'] = d['fullname'] d['zip_hash_sha256'] = get_file_hash256(path) - self.external_plugin_metadata[name] = d - - def load_external_plugins(self): - for name, d in self.external_plugin_metadata.items(): - if self.config.get('enable_plugin_' + name): - try: - self.load_external_plugin(name) - except BaseException as e: - traceback.print_exc(file=sys.stdout) # shouldn't this be... suppressed unless -v? - self.logger.exception(f"cannot initialize plugin {name} {e!r}") + d['is_zip'] = True + if external: + self.external_plugin_metadata[name] = d + else: + self.internal_plugin_metadata[name] = d def get(self, name): return self.plugins.get(name) @@ -266,23 +273,31 @@ class Plugins(DaemonThread): """Imports the code of the given plugin. note: can be called from any thread. """ - if name in self.internal_plugin_metadata: - return self.load_internal_plugin(name) - elif name in self.external_plugin_metadata: - return self.load_external_plugin(name) + if self.get_metadata(name): + return self.load_plugin_by_name(name) else: raise Exception(f"could not find plugin {name!r}") - def load_internal_plugin(self, name) -> 'BasePlugin': + def load_plugin_by_name(self, name) -> 'BasePlugin': if name in self.plugins: return self.plugins[name] - full_name = f'electrum.plugins.{name}.{self.gui_name}' + + is_zip = self.is_plugin_zip(name) + is_external = name in self.external_plugin_metadata + if not is_external: + full_name = f'electrum.plugins.{name}.{self.gui_name}' + else: + full_name = f'electrum_external_plugins.{name}.{self.gui_name}' + spec = importlib.util.find_spec(full_name) if spec is None: raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found") try: - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) + if is_zip: + module = self.exec_module_from_spec(spec, full_name) + else: + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) plugin = module.Plugin(self, self.config, name) except Exception as e: raise Exception(f"Error loading {name} plugin: {repr(e)}") from e @@ -376,6 +391,19 @@ class Plugins(DaemonThread): self.load_plugin(name) return self.plugins[name] + def is_plugin_zip(self, name: str) -> bool: + """Returns True if the plugin is a zip file""" + if (metadata := self.get_metadata(name)) is None: + return False + return metadata.get('is_zip', False) + + def get_metadata(self, name: str) -> Optional[dict]: + """Returns the metadata of the plugin""" + metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name) + if not metadata: + return None + return metadata + def run(self): while self.is_running(): self.wake_up_event.wait(0.1) # time.sleep(0.1) OR event @@ -470,13 +498,16 @@ class BasePlugin(Logger): def read_file(self, filename: str) -> bytes: import zipfile - if self.name in self.parent.external_plugin_metadata: - plugin_filename = self.parent.external_plugin_path(self.name) + if self.parent.is_plugin_zip(self.name): + plugin_filename = self.parent.zip_plugin_path(self.name) with zipfile.ZipFile(plugin_filename) as myzip: with myzip.open(os.path.join(self.name, filename)) as myfile: return myfile.read() else: - path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename) + if filename in self.parent.internal_plugin_metadata: + path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename) + else: + path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename) with open(path, 'rb') as myfile: return myfile.read() diff --git a/tests/test_wizard.py b/tests/test_wizard.py index 822c22098..c3f009faa 100644 --- a/tests/test_wizard.py +++ b/tests/test_wizard.py @@ -43,7 +43,7 @@ class WizardTestCase(ElectrumTestCase): }) self.wallet_path = os.path.join(self.electrum_path, "somewallet") self.plugins = Plugins(self.config, gui_name='cmdline') - self.plugins.load_internal_plugin('trustedcoin') + self.plugins.load_plugin_by_name('trustedcoin') def tearDown(self): self.plugins.stop()