From 737417fb800caa0fcad4e6d0bb473cde99d5b22d Mon Sep 17 00:00:00 2001 From: ThomasV Date: Fri, 4 Apr 2025 14:23:52 +0200 Subject: [PATCH] Userspace plugins: - Allow plugins saved as zipfiles in user data dir - plugins are authorized with a user chosen password - pubkey derived from password is saved with admin permissions --- electrum/gui/qt/password_dialog.py | 26 +++ electrum/gui/qt/plugins_dialog.py | 251 +++++++++++++++++++++------ electrum/plugin.py | 261 ++++++++++++++++++++--------- 3 files changed, 403 insertions(+), 135 deletions(-) diff --git a/electrum/gui/qt/password_dialog.py b/electrum/gui/qt/password_dialog.py index 354fea5ff..48d59be3f 100644 --- a/electrum/gui/qt/password_dialog.py +++ b/electrum/gui/qt/password_dialog.py @@ -236,6 +236,32 @@ class ChangePasswordDialogBase(WindowModalDialog): raise NotImplementedError() +class NewPasswordDialog(WindowModalDialog): + + def __init__(self, parent, msg): + self.msg = msg + WindowModalDialog.__init__(self, parent) + OK_button = OkButton(self) + self.playout = PasswordLayout( + msg=self.msg, + kind=PW_CHANGE, + OK_button=OK_button, + wallet=None) + self.setWindowTitle(self.playout.title()) + vbox = QVBoxLayout(self) + vbox.addLayout(self.playout.layout()) + vbox.addStretch(1) + vbox.addLayout(Buttons(CancelButton(self), OK_button)) + + def run(self): + try: + if not self.exec(): + return None + return self.playout.new_password() + finally: + self.playout.clear_password_fields() + + class ChangePasswordDialogForSW(ChangePasswordDialogBase): def create_password_layout(self, wallet, is_encrypted, OK_button): diff --git a/electrum/gui/qt/plugins_dialog.py b/electrum/gui/qt/plugins_dialog.py index ba703aaa0..7b9e7081f 100644 --- a/electrum/gui/qt/plugins_dialog.py +++ b/electrum/gui/qt/plugins_dialog.py @@ -1,7 +1,7 @@ from typing import TYPE_CHECKING, Optional from functools import partial -from PyQt6.QtWidgets import QLabel, QVBoxLayout, QGridLayout, QPushButton, QWidget, QScrollArea, QCheckBox, QFormLayout +from PyQt6.QtWidgets import QLabel, QVBoxLayout, QGridLayout, QPushButton, QWidget, QScrollArea, QFormLayout from electrum.i18n import _ from electrum.plugin import run_hook @@ -11,12 +11,13 @@ from .util import WindowModalDialog, Buttons, CloseButton, WWLabel, insert_space if TYPE_CHECKING: from .main_window import ElectrumWindow + from electrum_cc import ECPrivkey class PluginDialog(WindowModalDialog): - def __init__(self, name, metadata, cb: 'QCheckBox', window: 'ElectrumWindow', index:int): - display_name = metadata.get('display_name', '') + def __init__(self, name, metadata, status_button: Optional['PluginStatusButton'], window: 'ElectrumWindow'): + display_name = metadata.get('fullname', '') author = metadata.get('author', '') description = metadata.get('description', '') requires = metadata.get('requires') @@ -24,14 +25,13 @@ class PluginDialog(WindowModalDialog): zip_hash = metadata.get('zip_hash_sha256', None) WindowModalDialog.__init__(self, window, 'Plugin') - self.setMinimumSize(400,250) - self.index = index + self.setMinimumSize(400, 250) self.window = window self.metadata = metadata self.plugins = self.window.plugins self.name = name - self.cb = cb - p = self.plugins.get(name) # is installed + self.status_button = status_button + p = self.plugins.get(name) # is enabled vbox = QVBoxLayout(self) form = QFormLayout(None) form.addRow(QLabel(_('Name') + ':'), QLabel(display_name)) @@ -44,24 +44,75 @@ class PluginDialog(WindowModalDialog): msg = '\n'.join(map(lambda x: x[1], requires)) form.addRow(QLabel(_('Requires') + ':'), WWLabel(msg)) vbox.addLayout(form) - text = _('Disable') if p else _('Enable') - toggle_button = QPushButton(text) - toggle_button.clicked.connect(partial(self.do_toggle, toggle_button, name)) + toggle_button = QPushButton('') + if not self.plugins.is_installed(name): + toggle_button.setText(_('Install...')) + toggle_button.clicked.connect(self.accept) + else: + text = (_('Disable') if p else _('Enable')) if self.plugins.is_authorized(name) else _('Authorize...') + toggle_button.setText(text) + toggle_button.clicked.connect(partial(self.do_toggle, toggle_button, name)) close_button = CloseButton(self) - close_button.setText(_('Cancel')) + close_button.setText(_('Close')) buttons = [toggle_button, close_button] + # add settings widget + if p and p.requires_settings() and p.is_enabled(): + widget = p.settings_widget(self) + buttons.insert(0, widget) vbox.addLayout(Buttons(*buttons)) - def do_toggle(self, button, name): - button.setEnabled(False) - p = self.plugins.toggle(name) - self.cb.setChecked(bool(p)) + def do_toggle(self, toggle_button, name): + toggle_button.setEnabled(False) + if not self.plugins.is_authorized(name): + privkey = self.window.get_plugins_privkey() + if not privkey: + return + filename = self.plugins.zip_plugin_path(name) + self.window.plugins.authorize_plugin(name, filename, privkey) + self.status_button.update() + self.close() + return + p = self.plugins.get(name) + if not p: + self.plugins.enable(name) + else: + self.plugins.disable(name) + self.status_button.update() self.close() - self.window.enable_settings_widget(name, self.index) # note: all enabled plugins will receive this hook: run_hook('init_qt', self.window.window.gui_object) +class PluginStatusButton(QPushButton): + + def __init__(self, window, name): + QPushButton.__init__(self, '') + self.window = window + self.plugins = window.plugins + self.name = name + self.clicked.connect(self.show_plugin_dialog) + self.update() + + def show_plugin_dialog(self): + metadata = self.plugins.descriptions[self.name] + d = PluginDialog(self.name, metadata, self, self.window) + d.exec() + + def update(self): + from .util import ColorScheme + p = self.plugins.get(self.name) + plugin_is_loaded = p is not None + enabled = ( + not plugin_is_loaded and self.plugins.is_available(self.name, self.window.wallet) + or plugin_is_loaded and p.can_user_disable() + ) + self.setEnabled(enabled) + text, color = (_('Unauthorized'), ColorScheme.RED) if not self.window.plugins.is_authorized(self.name)\ + else ((_('Enabled'), ColorScheme.BLUE) if p is not None and p.is_enabled() else (_('Disabled'), ColorScheme.DEFAULT)) + self.setStyleSheet(color.as_stylesheet()) + self.setText(text) + + class PluginsDialog(WindowModalDialog): def __init__(self, window: 'ElectrumWindow'): @@ -70,63 +121,157 @@ class PluginsDialog(WindowModalDialog): self.wallet = self.window.wallet self.config = window.config self.plugins = self.window.gui_object.plugins - self.settings_widgets = {} vbox = QVBoxLayout(self) scroll = QScrollArea() scroll.setEnabled(True) scroll.setWidgetResizable(True) - scroll.setMinimumSize(400,250) + scroll.setMinimumSize(400, 250) scroll_w = QWidget() scroll.setWidget(scroll_w) self.grid = QGridLayout() - self.grid.setColumnStretch(0,1) + self.grid.setColumnStretch(0, 1) scroll_w.setLayout(self.grid) vbox.addWidget(scroll) - vbox.addLayout(Buttons(CloseButton(self))) + add_button = QPushButton(_('Add')) + add_button.clicked.connect(self.add_plugin_dialog) + #add_button.clicked.connect(self.download_plugin_dialog) + vbox.addLayout(Buttons(add_button, CloseButton(self))) self.show_list() - def enable_settings_widget(self, name: str, i: int): - p = self.plugins.get(name) - widget = self.settings_widgets.get(name) # type: Optional[QWidget] - if widget and not p: - # plugin got disabled, rm widget - self.grid.removeWidget(widget) - widget.setParent(None) - self.settings_widgets.pop(name) - elif widget is None and p and p.requires_settings() and p.is_enabled(): - # plugin got enabled, add widget - widget = self.settings_widgets[name] = p.settings_widget(self) - self.grid.addWidget(widget, i, 1) + def get_plugins_privkey(self) -> Optional['ECPrivkey']: + pubkey, salt = self.plugins.get_pubkey_bytes() + if not pubkey: + self.init_plugins_password() + return + # ask for url and password, same window + pw = self.window.password_dialog( + msg=' '.join([ + _('Warning: Third-party plugins are not endorsed by Electrum!'), + '

', + _('If you install a third-party plugin, you trust the software not to be malicious.'), + _('Electrum will not be responsible in case of theft, loss of funds or privacy that might result from third-party plugins.'), + _('You should at minimum check who the author of the plugin is, and be careful of imposters.'), + '

', + _('Please enter your plugin authorization password') + ':' + ]) + ) + if not pw: + return + privkey = self.plugins.derive_privkey(pw, salt) + if pubkey != privkey.get_public_key_bytes(): + keyfile_path, keyfile_help = self.plugins.get_keyfile_path() + self.window.show_error( + ''.join([ + _('Incorrect password.'), '\n\n', + _('Your plugin authorization password is required to install plugins.'), ' ', + _('If you need to reset it, remove the following file:'), '\n\n', + keyfile_path + ])) + return + return privkey + + def init_plugins_password(self): + from .password_dialog import NewPasswordDialog + msg = ' '.join([ + _('In order to install third-party plugins, you need to choose a plugin authorization password.'), + _('Its purpose is to prevent unauthorized users (or malware) from installing plugins.'), + ]) + d = NewPasswordDialog(self, msg=msg) + pw = d.run() + if not pw: + return + key_hex = self.plugins.create_new_key(pw) + keyfile_path, keyfile_help = self.plugins.get_keyfile_path() + msg = ''.join([ + _('Your plugins key is:'), '\n\n', key_hex, '\n\n', + _('Please save this key in'), '\n\n' + keyfile_path, '\n\n', keyfile_help + ]) + self.window.do_copy(key_hex, title=_('Plugins key')) + self.window.show_message(msg) + + def download_plugin_dialog(self): + import os + from .util import line_dialog + from electrum.util import UserCancelled + pubkey, salt = self.plugins.get_pubkey_bytes() + if not pubkey: + self.init_plugins_password() + return + url = line_dialog(self, 'url', _('Enter plugin URL'), _('Download')) + if not url: + return + coro = self.plugins.download_external_plugin(url) + try: + path = self.window.run_coroutine_dialog(coro, "Downloading plugin...") + except UserCancelled: + return + except Exception as e: + self.window.show_error(f"{e}") + return + try: + success = self.confirm_add_plugin(path) + except Exception as e: + self.window.show_error(f"{e}") + success = False + if not success: + os.unlink(path) + + def add_plugin_dialog(self): + from PyQt6.QtWidgets import QFileDialog + import shutil, os + pubkey, salt = self.plugins.get_pubkey_bytes() + if not pubkey: + self.init_plugins_password() + return + filename, __ = QFileDialog.getOpenFileName(self, "Select your plugin zipfile", "", "*.zip") + if not filename: + return + plugins_dir = self.plugins.get_external_plugin_dir() + path = os.path.join(plugins_dir, os.path.basename(filename)) + shutil.copyfile(filename, path) + try: + success = self.confirm_add_plugin(path) + except Exception as e: + self.window.show_error(f"{e}") + success = False + if not success: + os.unlink(path) + + def confirm_add_plugin(self, path): + manifest = self.plugins.read_manifest(path) + name = manifest['name'] + d = PluginDialog(name, manifest, None, self) + if not d.exec(): + return False + # ask password once user has approved + privkey = self.get_plugins_privkey() + if not privkey: + return False + self.plugins.external_plugin_metadata[name] = manifest + self.plugins.authorize_plugin(name, path, privkey) + self.window.show_message(_('Plugin installed successfully.')) + self.show_list() + return True def show_list(self): descriptions = self.plugins.descriptions descriptions = sorted(descriptions.items()) grid = self.grid + # clear existing items + for i in reversed(range(grid.count())): + grid.itemAt(i).widget().setParent(None) + # populate i = 0 for name, metadata in descriptions: i += 1 - p = self.plugins.get(name) if metadata.get('registers_keystore'): continue - display_name = metadata.get('display_name') + display_name = metadata.get('fullname') if not display_name: continue - #try: - cb = QCheckBox(display_name) - plugin_is_loaded = p is not None - cb_enabled = (not plugin_is_loaded and self.plugins.is_available(name, self.wallet) - or plugin_is_loaded and p.can_user_disable()) - cb.setEnabled(cb_enabled) - cb.setChecked(plugin_is_loaded and p.is_enabled()) - grid.addWidget(cb, i, 0) - self.enable_settings_widget(name, i) - cb.clicked.connect(partial(self.show_plugin_dialog, name, cb, i)) - - #grid.setRowStretch(len(descriptions), 1) - - def show_plugin_dialog(self, name, cb, i): - p = self.plugins.get(name) - metadata = self.plugins.descriptions[name] - cb.setChecked(p is not None and p.is_enabled()) - d = PluginDialog(name, metadata, cb, self, i) - d.exec() + label = QLabel(display_name) + grid.addWidget(label, i, 0) + status_button = PluginStatusButton(self, name) + grid.addWidget(status_button, i, 1) + # add stretch + grid.setRowStretch(i + 1, 1) diff --git a/electrum/plugin.py b/electrum/plugin.py index 5b296ca73..7f1eed5c8 100644 --- a/electrum/plugin.py +++ b/electrum/plugin.py @@ -29,24 +29,27 @@ import pkgutil import importlib.util import time import threading -import traceback import sys import aiohttp import zipfile as zipfile_lib +from urllib.parse import urlparse from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple, Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping) import concurrent import zipimport -from concurrent import futures from functools import wraps, partial from itertools import chain +from electrum_ecc import ECPrivkey, ECPubkey + +from ._vendor.distutils.version import StrictVersion +from .version import ELECTRUM_VERSION from .i18n import _ from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException) from . import bip32 from . import plugins -from .simple_config import ConfigVar, SimpleConfig +from .simple_config import SimpleConfig from .logging import get_logger, Logger from .crypto import sha256 @@ -60,17 +63,20 @@ _logger = get_logger(__name__) plugin_loaders = {} hook_names = set() hooks = {} -_root_permission_cache = {} _exec_module_failure = {} # type: Dict[str, Exception] +PLUGIN_PASSWORD_VERSION = 1 + class Plugins(DaemonThread): LOGGING_SHORTCUT = 'p' pkgpath = os.path.dirname(plugins.__file__) + keyfile_linux = '/etc/electrum/plugins_key' + keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey' @profiler - def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False): + def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False): self.config = config self.cmd_only = cmd_only # type: bool self.internal_plugin_metadata = {} @@ -106,9 +112,6 @@ class Plugins(DaemonThread): if loader.__class__.__qualname__ == "PyiFrozenImporter": continue module_path = os.path.join(pkg_path, name) - if external and not self._has_recursive_root_permissions(module_path): - self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions") - continue if self.cmd_only and not self.config.get('enable_plugin_' + name) is True: continue try: @@ -119,7 +122,6 @@ class Plugins(DaemonThread): continue if 'fullname' not in d: continue - d['display_name'] = d['fullname'] d['path'] = module_path if not self.cmd_only: gui_good = self.gui_name in d.get('available_for', []) @@ -164,10 +166,11 @@ class Plugins(DaemonThread): internal_plugins_path = (self.pkgpath, False) external_plugins_path = (self.get_external_plugin_dir(), True) for pkg_path, external in (internal_plugins_path, external_plugins_path): - # external plugins enforce root permissions on the directory if pkg_path and os.path.exists(pkg_path): - self.find_directory_plugins(pkg_path=pkg_path, external=external) - self.find_zip_plugins(pkg_path=pkg_path, external=external) + if not external: + self.find_directory_plugins(pkg_path=pkg_path, external=external) + else: + self.find_zip_plugins(pkg_path=pkg_path, external=external) def load_plugins(self): for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()): @@ -183,35 +186,96 @@ class Plugins(DaemonThread): def _has_root_permissions(self, path): return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK) - @profiler(min_threshold=0.5) - def _has_recursive_root_permissions(self, path): - """Check if a directory and all its subdirectories have root permissions""" - if _root_permission_cache.get(path) is not None: - return _root_permission_cache[path] - _root_permission_cache[path] = False - for root, dirs, files in os.walk(path): - if not self._has_root_permissions(root): - return False - for f in files: - if not self._has_root_permissions(os.path.join(root, f)): - return False - _root_permission_cache[path] = True - return True + def get_keyfile_path(self) -> Tuple[str, str]: + if sys.platform in ['windows', 'win32']: + keyfile_path = self.keyfile_windows + keyfile_help = _('This file can be edited with Regdit') + elif 'ANDROID_DATA' in os.environ: + raise Exception('platform not supported') + else: + # treat unknown platforms as linux-like + keyfile_path = self.keyfile_linux + keyfile_help = _('The file must have root permissions') + return keyfile_path, keyfile_help - def get_external_plugin_dir(self): - if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'): - return - pkg_path = '/opt/electrum_plugins' + def create_new_key(self, password:str) -> str: + salt = os.urandom(32) + privkey = self.derive_privkey(password, salt) + pubkey = privkey.get_public_key_bytes() + key = chr(PLUGIN_PASSWORD_VERSION) + salt + pubkey + return key.hex() + + def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]: + """ + returns pubkey, salt + returns None, None if the pubkey has not been set + """ + if sys.platform in ['windows', 'win32']: + import winreg + with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey: + try: + with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key: + key_hex = winreg.QueryValue(key, "PluginsKey") + except Exception as e: + self.logger.info(f'winreg error: {e}') + return None, None + elif 'ANDROID_DATA' in os.environ: + return None, None + else: + # treat unknown platforms as linux-like + if not os.path.exists(self.keyfile_linux): + return None, None + if not self._has_root_permissions(self.keyfile_linux): + return + with open(self.keyfile_linux) as f: + key_hex = f.read() + key = bytes.fromhex(key_hex) + version = key[0] + if version != PLUGIN_PASSWORD_VERSION: + self.logger.info(f'unknown plugin password version: {version}') + return None, None + # all good + salt = key[1:1+32] + pubkey = key[1+32:] + return pubkey, salt + + def get_external_plugin_dir(self) -> str: + pkg_path = os.path.join(self.config.electrum_path(), 'plugins') if not os.path.exists(pkg_path): - self.logger.info(f'directory {pkg_path} does not exist') - return - if not self._has_root_permissions(pkg_path): - self.logger.info(f'not loading {pkg_path}: directory has user write permissions') - return + os.mkdir(pkg_path) return pkg_path - def zip_plugin_path(self, name): - filename = self.get_metadata(name)['filename'] + async def download_external_plugin(self, url): + filename = os.path.basename(urlparse(url).path) + pkg_path = self.get_external_plugin_dir() + path = os.path.join(pkg_path, filename) + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + if resp.status == 200: + with open(path, 'wb') as fd: + async for chunk in resp.content.iter_chunked(10): + fd.write(chunk) + return path + + def read_manifest(self, path) -> dict: + """ return json dict """ + with zipfile_lib.ZipFile(path) as file: + for filename in file.namelist(): + if filename.endswith('manifest.json'): + break + else: + raise Exception('could not find manifest.json in zip archive') + with file.open(filename, 'r') as f: + manifest = json.load(f) + manifest['path'] = path # external, path of the zipfile + manifest['dirname'] = os.path.dirname(filename) # internal + manifest['is_zip'] = True + manifest['zip_hash_sha256'] = get_file_hash256(path).hex() + return manifest + + def zip_plugin_path(self, name) -> str: + path = self.get_metadata(name)['path'] + filename = os.path.basename(path) if name in self.internal_plugin_metadata: pkg_path = self.pkgpath else: @@ -226,46 +290,37 @@ class Plugins(DaemonThread): path = os.path.join(pkg_path, filename) if not filename.endswith('.zip'): continue - if external and not self._has_root_permissions(path): - self.logger.info(f'not loading {path}: file has user write permissions') - continue try: - zipfile = zipimport.zipimporter(path) - except zipimport.ZipImportError: - self.logger.exception(f"unable to load zip plugin '{filename}'") + d = self.read_manifest(path) + name = d['name'] + except Exception: + self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True) continue - for name, b in pkgutil.iter_zipimport_modules(zipfile): - if b is False: + if name in self.internal_plugin_metadata: + raise Exception(f"duplicate plugins for name={name}") + if name in self.external_plugin_metadata: + raise Exception(f"duplicate plugins for name={name}") + if self.cmd_only and not self.config.get('enable_plugin_' + name): + continue + min_version = d.get('min_electrum_version') + if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION): + self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True) + continue + max_version = d.get('max_electrum_version') + if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION): + self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True) + continue + + if not self.cmd_only: + gui_good = self.gui_name in d.get('available_for', []) + if not gui_good: continue - if name in self.internal_plugin_metadata: - raise Exception(f"duplicate plugins for name={name}") - if name in self.external_plugin_metadata: - raise Exception(f"duplicate plugins for name={name}") - if self.cmd_only and not self.config.get('enable_plugin_' + name): + if 'fullname' not in d: continue - try: - with zipfile_lib.ZipFile(path) as file: - manifest_path = os.path.join(name, 'manifest.json') - with file.open(manifest_path, 'r') as f: - d = json.load(f) - except Exception: - self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True) - continue - d['filename'] = filename - d['is_zip'] = True - d['path'] = path - if not self.cmd_only: - gui_good = self.gui_name in d.get('available_for', []) - if not gui_good: - continue - if 'fullname' not in d: - continue - d['display_name'] = d['fullname'] - d['zip_hash_sha256'] = get_file_hash256(path) - if external: - self.external_plugin_metadata[name] = d - else: - self.internal_plugin_metadata[name] = d + if external: + self.external_plugin_metadata[name] = d + else: + self.internal_plugin_metadata[name] = d def get(self, name): return self.plugins.get(name) @@ -285,20 +340,23 @@ class Plugins(DaemonThread): def maybe_load_plugin_init_method(self, name: str) -> None: """Loads the __init__.py module of the plugin if it is not already loaded.""" is_external = name in self.external_plugin_metadata - base_name = (f'electrum_external_plugins.' if is_external else 'electrum.plugins.') + name + base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name if base_name not in sys.modules: metadata = self.get_metadata(name) is_zip = metadata.get('is_zip', False) # if the plugin was not enabled on startup the init module hasn't been loaded yet if not is_zip: if is_external: + # this branch is deprecated: external plugins are always zip files path = os.path.join(metadata['path'], '__init__.py') init_spec = importlib.util.spec_from_file_location(base_name, path) else: init_spec = importlib.util.find_spec(base_name) else: zipfile = zipimport.zipimporter(metadata['path']) - init_spec = zipfile.find_spec(name) + dirname = metadata['dirname'] + init_spec = zipfile.find_spec(dirname) + self.exec_module_from_spec(init_spec, base_name) if name == "trustedcoin": # removes trustedcoin after loading to not show it in the list of plugins @@ -307,11 +365,12 @@ class Plugins(DaemonThread): def load_plugin_by_name(self, name: str) -> 'BasePlugin': if name in self.plugins: return self.plugins[name] - # if the plugin was not enabled on startup the init module hasn't been loaded yet self.maybe_load_plugin_init_method(name) - is_external = name in self.external_plugin_metadata + if is_external and not self.is_authorized(name): + self.logger.info(f'plugin not authorized {name}') + return if not is_external: full_name = f'electrum.plugins.{name}.{self.gui_name}' else: @@ -333,6 +392,41 @@ class Plugins(DaemonThread): def close_plugin(self, plugin): self.remove_jobs(plugin.thread_jobs()) + def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey: + from hashlib import pbkdf2_hmac + secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5) + return ECPrivkey(secret) + + def is_installed(self, name) -> bool: + """an external plugin may be installed but not authorized """ + return name in self.internal_plugin_metadata or name in self.external_plugin_metadata + + def is_authorized(self, name) -> bool: + if name in self.internal_plugin_metadata: + return True + if name not in self.external_plugin_metadata: + return False + pubkey_bytes, salt = self.get_pubkey_bytes() + if not pubkey_bytes: + return False + if not self.is_plugin_zip(name): + return False + filename = self.zip_plugin_path(name) + plugin_hash = get_file_hash256(filename) + sig = self.config.get('authorize_plugin_' + name) + if not sig: + return False + pubkey = ECPubkey(pubkey_bytes) + return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash) + + def authorize_plugin(self, name: str, filename, privkey: ECPrivkey): + pubkey_bytes, salt = self.get_pubkey_bytes() + assert pubkey_bytes == privkey.get_public_key_bytes() + plugin_hash = get_file_hash256(filename) + sig = privkey.ecdsa_sign(plugin_hash) + value = sig.hex() + self.config.set_key('authorize_plugin_' + name, value, save=True) + def enable(self, name: str) -> 'BasePlugin': self.config.set_key('enable_plugin_' + name, True, save=True) p = self.get(name) @@ -351,7 +445,7 @@ class Plugins(DaemonThread): @classmethod def is_plugin_enabler_config_key(cls, key: str) -> bool: - return key.startswith('enable_plugin_') + return key.startswith('enable_plugin_') or key.startswith('authorize_plugin_') def toggle(self, name: str) -> Optional['BasePlugin']: p = self.get(name) @@ -435,10 +529,11 @@ class Plugins(DaemonThread): self.on_stop() -def get_file_hash256(path: str) -> str: - '''Get the sha256 hash of a file in hex, similar to `sha256sum`.''' +def get_file_hash256(path: str) -> bytes: + '''Get the sha256 hash of a file, similar to `sha256sum`.''' with open(path, 'rb') as f: - return sha256(f.read()).hex() + return sha256(f.read()) + def hook(func): hook_names.add(func.__name__) @@ -523,8 +618,10 @@ class BasePlugin(Logger): def read_file(self, filename: str) -> bytes: if self.parent.is_plugin_zip(self.name): plugin_filename = self.parent.zip_plugin_path(self.name) + metadata = self.parent.external_plugin_metadata[self.name] + dirname = metadata['dirname'] with zipfile_lib.ZipFile(plugin_filename) as myzip: - with myzip.open(os.path.join(self.name, filename)) as myfile: + with myzip.open(os.path.join(dirname, filename)) as myfile: return myfile.read() else: if self.name in self.parent.internal_plugin_metadata: