1
0

allow all plugins to be either zip or directory based

This commit is contained in:
f321x
2025-03-17 14:22:01 +01:00
parent bea4c617f6
commit 246f03fe20
4 changed files with 98 additions and 67 deletions

View File

@@ -460,7 +460,7 @@ class ElectrumQmlApplication(QGuiApplication):
"MEMPOOL_MB": electrum.util.UI_UNIT_NAME_MEMPOOL_MB,
})
self.plugins.load_internal_plugin('trustedcoin')
self.plugins.load_plugin_by_name('trustedcoin')
qInstallMessageHandler(self.message_handler)

View File

@@ -159,7 +159,7 @@ class ElectrumGui(BaseElectrumGui, Logger):
self.reload_app_stylesheet()
# always load 2fa
self.plugins.load_internal_plugin('trustedcoin')
self.plugins.load_plugin_by_name('trustedcoin')
run_hook('init_qt', self)

View File

@@ -38,6 +38,7 @@ import concurrent
import zipimport
from concurrent import futures
from functools import wraps, partial
from itertools import chain
from .i18n import _
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
@@ -91,9 +92,9 @@ class Plugins(DaemonThread):
def descriptions(self):
return dict(list(self.internal_plugin_metadata.items()) + list(self.external_plugin_metadata.items()))
def find_internal_plugins(self):
"""Populates self.internal_plugin_metadata"""
iter_modules = list(pkgutil.iter_modules([self.pkgpath]))
def find_directory_plugins(self, pkg_path: str, external: bool):
"""Finds plugins in directory form from the given pkg_path and populates the metadata dicts"""
iter_modules = list(pkgutil.iter_modules([pkg_path]))
for loader, name, ispkg in iter_modules:
# FIXME pyinstaller binaries are packaging each built-in plugin twice:
# once as data and once as code. To honor the "no duplicates" rule below,
@@ -102,8 +103,19 @@ class Plugins(DaemonThread):
continue
if self.cmd_only and self.config.get('enable_plugin_' + name) is not True:
continue
full_name = f'electrum.plugins.{name}' + ('.commands' if self.cmd_only else '')
spec = importlib.util.find_spec(full_name)
base_name = 'electrum.plugins' if not external else 'electrum_external_plugins'
full_name = f'{base_name}.{name}' + ('.commands' if self.cmd_only else '')
if external:
module_path = os.path.join(pkg_path, name)
if not self._has_recursive_root_permissions(module_path):
self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
continue
module_path = os.path.join(module_path, 'commands.py' if self.cmd_only else '__init__.py')
if not os.path.exists(module_path):
continue
spec = importlib.util.spec_from_file_location(full_name, module_path)
else:
spec = importlib.util.find_spec(full_name)
if spec is None:
if self.cmd_only:
continue # no commands module in this plugin
@@ -129,10 +141,13 @@ class Plugins(DaemonThread):
if d.get('requires_wallet_type'):
# trustedcoin will not be added to list
continue
if name in self.internal_plugin_metadata:
if name in self.internal_plugin_metadata or name in self.external_plugin_metadata:
_logger.info(f"Found the following plugin modules: {iter_modules=}")
raise Exception(f"duplicate plugins? for {name=}")
self.internal_plugin_metadata[name] = d
if not external:
self.internal_plugin_metadata[name] = d
else:
self.external_plugin_metadata[name] = d
@staticmethod
def exec_module_from_spec(spec, path):
@@ -147,44 +162,36 @@ class Plugins(DaemonThread):
return module
def find_plugins(self):
self.find_internal_plugins()
self.find_external_plugins()
internal_plugins_path = (self.pkgpath, False)
external_plugins_path = (self.get_external_plugin_dir(), True)
for pkg_path, external in (internal_plugins_path, external_plugins_path):
# external plugins enforce root permissions on the directory
if pkg_path and os.path.exists(pkg_path):
self.find_directory_plugins(pkg_path=pkg_path, external=external)
self.find_zip_plugins(pkg_path=pkg_path, external=external)
def load_plugins(self):
self.load_internal_plugins()
self.load_external_plugins()
def load_internal_plugins(self):
for name, d in self.internal_plugin_metadata.items():
for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
if not d.get('requires_wallet_type') and self.config.get('enable_plugin_' + name):
try:
self.load_internal_plugin(name)
self.load_plugin_by_name(name)
except BaseException as e:
self.logger.exception(f"cannot initialize plugin {name}: {e}")
def load_external_plugin(self, name):
if name in self.plugins:
return self.plugins[name]
# If we do not have the metadata, it was not detected by `load_external_plugins`
# on startup, or added by manual user installation after that point.
metadata = self.external_plugin_metadata.get(name)
if metadata is None:
self.logger.exception(f"attempted to load unknown external plugin {name}")
return
full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
spec = importlib.util.find_spec(full_name)
if spec is None:
raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
module = self.exec_module_from_spec(spec, full_name)
plugin = module.Plugin(self, self.config, name)
self.add_jobs(plugin.thread_jobs())
self.plugins[name] = plugin
self.logger.info(f"loaded external plugin {name}")
return plugin
def _has_root_permissions(self, path):
return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
@profiler(min_threshold=0.5)
def _has_recursive_root_permissions(self, path):
"""Check if a directory and all its subdirectories have root permissions"""
for root, dirs, files in os.walk(path):
if not self._has_root_permissions(root):
return False
for f in files:
if not self._has_root_permissions(os.path.join(root, f)):
return False
return True
def get_external_plugin_dir(self):
if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
return
@@ -197,18 +204,23 @@ class Plugins(DaemonThread):
return
return pkg_path
def external_plugin_path(self, name):
metadata = self.external_plugin_metadata[name]
filename = metadata['filename']
return os.path.join(self.get_external_plugin_dir(), filename)
def zip_plugin_path(self, name):
filename = self.get_metadata(name)['filename']
if name in self.internal_plugin_metadata:
pkg_path = self.pkgpath
else:
pkg_path = self.get_external_plugin_dir()
return os.path.join(pkg_path, filename)
def find_external_plugins(self):
pkg_path = self.get_external_plugin_dir()
def find_zip_plugins(self, pkg_path: str, external: bool):
"""Finds plugins in zip form in the given pkg_path and populates the metadata dicts"""
if pkg_path is None:
return
for filename in os.listdir(pkg_path):
path = os.path.join(pkg_path, filename)
if not self._has_root_permissions(path):
if not filename.endswith('.zip'):
continue
if external and not self._has_root_permissions(path):
self.logger.info(f'not loading {path}: file has user write permissions')
continue
try:
@@ -223,7 +235,7 @@ class Plugins(DaemonThread):
raise Exception(f"duplicate plugins for name={name}")
if name in self.external_plugin_metadata:
raise Exception(f"duplicate plugins for name={name}")
module_path = f'electrum_external_plugins.{name}'
module_path = f'electrum_external_plugins.{name}' if external else f'electrum.plugins.{name}'
spec = zipfile.find_spec(name)
module = self.exec_module_from_spec(spec, module_path)
if self.cmd_only:
@@ -245,16 +257,11 @@ class Plugins(DaemonThread):
continue
d['display_name'] = d['fullname']
d['zip_hash_sha256'] = get_file_hash256(path)
self.external_plugin_metadata[name] = d
def load_external_plugins(self):
for name, d in self.external_plugin_metadata.items():
if self.config.get('enable_plugin_' + name):
try:
self.load_external_plugin(name)
except BaseException as e:
traceback.print_exc(file=sys.stdout) # shouldn't this be... suppressed unless -v?
self.logger.exception(f"cannot initialize plugin {name} {e!r}")
d['is_zip'] = True
if external:
self.external_plugin_metadata[name] = d
else:
self.internal_plugin_metadata[name] = d
def get(self, name):
return self.plugins.get(name)
@@ -266,23 +273,31 @@ class Plugins(DaemonThread):
"""Imports the code of the given plugin.
note: can be called from any thread.
"""
if name in self.internal_plugin_metadata:
return self.load_internal_plugin(name)
elif name in self.external_plugin_metadata:
return self.load_external_plugin(name)
if self.get_metadata(name):
return self.load_plugin_by_name(name)
else:
raise Exception(f"could not find plugin {name!r}")
def load_internal_plugin(self, name) -> 'BasePlugin':
def load_plugin_by_name(self, name) -> 'BasePlugin':
if name in self.plugins:
return self.plugins[name]
full_name = f'electrum.plugins.{name}.{self.gui_name}'
is_zip = self.is_plugin_zip(name)
is_external = name in self.external_plugin_metadata
if not is_external:
full_name = f'electrum.plugins.{name}.{self.gui_name}'
else:
full_name = f'electrum_external_plugins.{name}.{self.gui_name}'
spec = importlib.util.find_spec(full_name)
if spec is None:
raise RuntimeError(f"{self.gui_name} implementation for {name} plugin not found")
try:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if is_zip:
module = self.exec_module_from_spec(spec, full_name)
else:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
plugin = module.Plugin(self, self.config, name)
except Exception as e:
raise Exception(f"Error loading {name} plugin: {repr(e)}") from e
@@ -376,6 +391,19 @@ class Plugins(DaemonThread):
self.load_plugin(name)
return self.plugins[name]
def is_plugin_zip(self, name: str) -> bool:
"""Returns True if the plugin is a zip file"""
if (metadata := self.get_metadata(name)) is None:
return False
return metadata.get('is_zip', False)
def get_metadata(self, name: str) -> Optional[dict]:
"""Returns the metadata of the plugin"""
metadata = self.internal_plugin_metadata.get(name) or self.external_plugin_metadata.get(name)
if not metadata:
return None
return metadata
def run(self):
while self.is_running():
self.wake_up_event.wait(0.1) # time.sleep(0.1) OR event
@@ -470,13 +498,16 @@ class BasePlugin(Logger):
def read_file(self, filename: str) -> bytes:
import zipfile
if self.name in self.parent.external_plugin_metadata:
plugin_filename = self.parent.external_plugin_path(self.name)
if self.parent.is_plugin_zip(self.name):
plugin_filename = self.parent.zip_plugin_path(self.name)
with zipfile.ZipFile(plugin_filename) as myzip:
with myzip.open(os.path.join(self.name, filename)) as myfile:
return myfile.read()
else:
path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
if filename in self.parent.internal_plugin_metadata:
path = os.path.join(os.path.dirname(__file__), 'plugins', self.name, filename)
else:
path = os.path.join(self.parent.get_external_plugin_dir(), self.name, filename)
with open(path, 'rb') as myfile:
return myfile.read()

View File

@@ -43,7 +43,7 @@ class WizardTestCase(ElectrumTestCase):
})
self.wallet_path = os.path.join(self.electrum_path, "somewallet")
self.plugins = Plugins(self.config, gui_name='cmdline')
self.plugins.load_internal_plugin('trustedcoin')
self.plugins.load_plugin_by_name('trustedcoin')
def tearDown(self):
self.plugins.stop()