1
0

Userspace plugins:

- Allow plugins saved as zipfiles in user data dir
 - plugins are authorized with a user chosen password
 - pubkey derived from password is saved with admin permissions
This commit is contained in:
ThomasV
2025-04-04 14:23:52 +02:00
parent bd5de52768
commit 737417fb80
3 changed files with 403 additions and 135 deletions

View File

@@ -236,6 +236,32 @@ class ChangePasswordDialogBase(WindowModalDialog):
raise NotImplementedError()
class NewPasswordDialog(WindowModalDialog):
def __init__(self, parent, msg):
self.msg = msg
WindowModalDialog.__init__(self, parent)
OK_button = OkButton(self)
self.playout = PasswordLayout(
msg=self.msg,
kind=PW_CHANGE,
OK_button=OK_button,
wallet=None)
self.setWindowTitle(self.playout.title())
vbox = QVBoxLayout(self)
vbox.addLayout(self.playout.layout())
vbox.addStretch(1)
vbox.addLayout(Buttons(CancelButton(self), OK_button))
def run(self):
try:
if not self.exec():
return None
return self.playout.new_password()
finally:
self.playout.clear_password_fields()
class ChangePasswordDialogForSW(ChangePasswordDialogBase):
def create_password_layout(self, wallet, is_encrypted, OK_button):

View File

@@ -1,7 +1,7 @@
from typing import TYPE_CHECKING, Optional
from functools import partial
from PyQt6.QtWidgets import QLabel, QVBoxLayout, QGridLayout, QPushButton, QWidget, QScrollArea, QCheckBox, QFormLayout
from PyQt6.QtWidgets import QLabel, QVBoxLayout, QGridLayout, QPushButton, QWidget, QScrollArea, QFormLayout
from electrum.i18n import _
from electrum.plugin import run_hook
@@ -11,12 +11,13 @@ from .util import WindowModalDialog, Buttons, CloseButton, WWLabel, insert_space
if TYPE_CHECKING:
from .main_window import ElectrumWindow
from electrum_cc import ECPrivkey
class PluginDialog(WindowModalDialog):
def __init__(self, name, metadata, cb: 'QCheckBox', window: 'ElectrumWindow', index:int):
display_name = metadata.get('display_name', '')
def __init__(self, name, metadata, status_button: Optional['PluginStatusButton'], window: 'ElectrumWindow'):
display_name = metadata.get('fullname', '')
author = metadata.get('author', '')
description = metadata.get('description', '')
requires = metadata.get('requires')
@@ -24,14 +25,13 @@ class PluginDialog(WindowModalDialog):
zip_hash = metadata.get('zip_hash_sha256', None)
WindowModalDialog.__init__(self, window, 'Plugin')
self.setMinimumSize(400,250)
self.index = index
self.setMinimumSize(400, 250)
self.window = window
self.metadata = metadata
self.plugins = self.window.plugins
self.name = name
self.cb = cb
p = self.plugins.get(name) # is installed
self.status_button = status_button
p = self.plugins.get(name) # is enabled
vbox = QVBoxLayout(self)
form = QFormLayout(None)
form.addRow(QLabel(_('Name') + ':'), QLabel(display_name))
@@ -44,24 +44,75 @@ class PluginDialog(WindowModalDialog):
msg = '\n'.join(map(lambda x: x[1], requires))
form.addRow(QLabel(_('Requires') + ':'), WWLabel(msg))
vbox.addLayout(form)
text = _('Disable') if p else _('Enable')
toggle_button = QPushButton(text)
toggle_button.clicked.connect(partial(self.do_toggle, toggle_button, name))
toggle_button = QPushButton('')
if not self.plugins.is_installed(name):
toggle_button.setText(_('Install...'))
toggle_button.clicked.connect(self.accept)
else:
text = (_('Disable') if p else _('Enable')) if self.plugins.is_authorized(name) else _('Authorize...')
toggle_button.setText(text)
toggle_button.clicked.connect(partial(self.do_toggle, toggle_button, name))
close_button = CloseButton(self)
close_button.setText(_('Cancel'))
close_button.setText(_('Close'))
buttons = [toggle_button, close_button]
# add settings widget
if p and p.requires_settings() and p.is_enabled():
widget = p.settings_widget(self)
buttons.insert(0, widget)
vbox.addLayout(Buttons(*buttons))
def do_toggle(self, button, name):
button.setEnabled(False)
p = self.plugins.toggle(name)
self.cb.setChecked(bool(p))
def do_toggle(self, toggle_button, name):
toggle_button.setEnabled(False)
if not self.plugins.is_authorized(name):
privkey = self.window.get_plugins_privkey()
if not privkey:
return
filename = self.plugins.zip_plugin_path(name)
self.window.plugins.authorize_plugin(name, filename, privkey)
self.status_button.update()
self.close()
return
p = self.plugins.get(name)
if not p:
self.plugins.enable(name)
else:
self.plugins.disable(name)
self.status_button.update()
self.close()
self.window.enable_settings_widget(name, self.index)
# note: all enabled plugins will receive this hook:
run_hook('init_qt', self.window.window.gui_object)
class PluginStatusButton(QPushButton):
def __init__(self, window, name):
QPushButton.__init__(self, '')
self.window = window
self.plugins = window.plugins
self.name = name
self.clicked.connect(self.show_plugin_dialog)
self.update()
def show_plugin_dialog(self):
metadata = self.plugins.descriptions[self.name]
d = PluginDialog(self.name, metadata, self, self.window)
d.exec()
def update(self):
from .util import ColorScheme
p = self.plugins.get(self.name)
plugin_is_loaded = p is not None
enabled = (
not plugin_is_loaded and self.plugins.is_available(self.name, self.window.wallet)
or plugin_is_loaded and p.can_user_disable()
)
self.setEnabled(enabled)
text, color = (_('Unauthorized'), ColorScheme.RED) if not self.window.plugins.is_authorized(self.name)\
else ((_('Enabled'), ColorScheme.BLUE) if p is not None and p.is_enabled() else (_('Disabled'), ColorScheme.DEFAULT))
self.setStyleSheet(color.as_stylesheet())
self.setText(text)
class PluginsDialog(WindowModalDialog):
def __init__(self, window: 'ElectrumWindow'):
@@ -70,63 +121,157 @@ class PluginsDialog(WindowModalDialog):
self.wallet = self.window.wallet
self.config = window.config
self.plugins = self.window.gui_object.plugins
self.settings_widgets = {}
vbox = QVBoxLayout(self)
scroll = QScrollArea()
scroll.setEnabled(True)
scroll.setWidgetResizable(True)
scroll.setMinimumSize(400,250)
scroll.setMinimumSize(400, 250)
scroll_w = QWidget()
scroll.setWidget(scroll_w)
self.grid = QGridLayout()
self.grid.setColumnStretch(0,1)
self.grid.setColumnStretch(0, 1)
scroll_w.setLayout(self.grid)
vbox.addWidget(scroll)
vbox.addLayout(Buttons(CloseButton(self)))
add_button = QPushButton(_('Add'))
add_button.clicked.connect(self.add_plugin_dialog)
#add_button.clicked.connect(self.download_plugin_dialog)
vbox.addLayout(Buttons(add_button, CloseButton(self)))
self.show_list()
def enable_settings_widget(self, name: str, i: int):
p = self.plugins.get(name)
widget = self.settings_widgets.get(name) # type: Optional[QWidget]
if widget and not p:
# plugin got disabled, rm widget
self.grid.removeWidget(widget)
widget.setParent(None)
self.settings_widgets.pop(name)
elif widget is None and p and p.requires_settings() and p.is_enabled():
# plugin got enabled, add widget
widget = self.settings_widgets[name] = p.settings_widget(self)
self.grid.addWidget(widget, i, 1)
def get_plugins_privkey(self) -> Optional['ECPrivkey']:
pubkey, salt = self.plugins.get_pubkey_bytes()
if not pubkey:
self.init_plugins_password()
return
# ask for url and password, same window
pw = self.window.password_dialog(
msg=' '.join([
_('<b>Warning</b>: Third-party plugins are not endorsed by Electrum!'),
'<br/><br/>',
_('If you install a third-party plugin, you trust the software not to be malicious.'),
_('Electrum will not be responsible in case of theft, loss of funds or privacy that might result from third-party plugins.'),
_('You should at minimum check who the author of the plugin is, and be careful of imposters.'),
'<br/><br/>',
_('Please enter your plugin authorization password') + ':'
])
)
if not pw:
return
privkey = self.plugins.derive_privkey(pw, salt)
if pubkey != privkey.get_public_key_bytes():
keyfile_path, keyfile_help = self.plugins.get_keyfile_path()
self.window.show_error(
''.join([
_('Incorrect password.'), '\n\n',
_('Your plugin authorization password is required to install plugins.'), ' ',
_('If you need to reset it, remove the following file:'), '\n\n',
keyfile_path
]))
return
return privkey
def init_plugins_password(self):
from .password_dialog import NewPasswordDialog
msg = ' '.join([
_('In order to install third-party plugins, you need to choose a plugin authorization password.'),
_('Its purpose is to prevent unauthorized users (or malware) from installing plugins.'),
])
d = NewPasswordDialog(self, msg=msg)
pw = d.run()
if not pw:
return
key_hex = self.plugins.create_new_key(pw)
keyfile_path, keyfile_help = self.plugins.get_keyfile_path()
msg = ''.join([
_('Your plugins key is:'), '\n\n', key_hex, '\n\n',
_('Please save this key in'), '\n\n' + keyfile_path, '\n\n', keyfile_help
])
self.window.do_copy(key_hex, title=_('Plugins key'))
self.window.show_message(msg)
def download_plugin_dialog(self):
import os
from .util import line_dialog
from electrum.util import UserCancelled
pubkey, salt = self.plugins.get_pubkey_bytes()
if not pubkey:
self.init_plugins_password()
return
url = line_dialog(self, 'url', _('Enter plugin URL'), _('Download'))
if not url:
return
coro = self.plugins.download_external_plugin(url)
try:
path = self.window.run_coroutine_dialog(coro, "Downloading plugin...")
except UserCancelled:
return
except Exception as e:
self.window.show_error(f"{e}")
return
try:
success = self.confirm_add_plugin(path)
except Exception as e:
self.window.show_error(f"{e}")
success = False
if not success:
os.unlink(path)
def add_plugin_dialog(self):
from PyQt6.QtWidgets import QFileDialog
import shutil, os
pubkey, salt = self.plugins.get_pubkey_bytes()
if not pubkey:
self.init_plugins_password()
return
filename, __ = QFileDialog.getOpenFileName(self, "Select your plugin zipfile", "", "*.zip")
if not filename:
return
plugins_dir = self.plugins.get_external_plugin_dir()
path = os.path.join(plugins_dir, os.path.basename(filename))
shutil.copyfile(filename, path)
try:
success = self.confirm_add_plugin(path)
except Exception as e:
self.window.show_error(f"{e}")
success = False
if not success:
os.unlink(path)
def confirm_add_plugin(self, path):
manifest = self.plugins.read_manifest(path)
name = manifest['name']
d = PluginDialog(name, manifest, None, self)
if not d.exec():
return False
# ask password once user has approved
privkey = self.get_plugins_privkey()
if not privkey:
return False
self.plugins.external_plugin_metadata[name] = manifest
self.plugins.authorize_plugin(name, path, privkey)
self.window.show_message(_('Plugin installed successfully.'))
self.show_list()
return True
def show_list(self):
descriptions = self.plugins.descriptions
descriptions = sorted(descriptions.items())
grid = self.grid
# clear existing items
for i in reversed(range(grid.count())):
grid.itemAt(i).widget().setParent(None)
# populate
i = 0
for name, metadata in descriptions:
i += 1
p = self.plugins.get(name)
if metadata.get('registers_keystore'):
continue
display_name = metadata.get('display_name')
display_name = metadata.get('fullname')
if not display_name:
continue
#try:
cb = QCheckBox(display_name)
plugin_is_loaded = p is not None
cb_enabled = (not plugin_is_loaded and self.plugins.is_available(name, self.wallet)
or plugin_is_loaded and p.can_user_disable())
cb.setEnabled(cb_enabled)
cb.setChecked(plugin_is_loaded and p.is_enabled())
grid.addWidget(cb, i, 0)
self.enable_settings_widget(name, i)
cb.clicked.connect(partial(self.show_plugin_dialog, name, cb, i))
#grid.setRowStretch(len(descriptions), 1)
def show_plugin_dialog(self, name, cb, i):
p = self.plugins.get(name)
metadata = self.plugins.descriptions[name]
cb.setChecked(p is not None and p.is_enabled())
d = PluginDialog(name, metadata, cb, self, i)
d.exec()
label = QLabel(display_name)
grid.addWidget(label, i, 0)
status_button = PluginStatusButton(self, name)
grid.addWidget(status_button, i, 1)
# add stretch
grid.setRowStretch(i + 1, 1)

View File

@@ -29,24 +29,27 @@ import pkgutil
import importlib.util
import time
import threading
import traceback
import sys
import aiohttp
import zipfile as zipfile_lib
from urllib.parse import urlparse
from typing import (NamedTuple, Any, Union, TYPE_CHECKING, Optional, Tuple,
Dict, Iterable, List, Sequence, Callable, TypeVar, Mapping)
import concurrent
import zipimport
from concurrent import futures
from functools import wraps, partial
from itertools import chain
from electrum_ecc import ECPrivkey, ECPubkey
from ._vendor.distutils.version import StrictVersion
from .version import ELECTRUM_VERSION
from .i18n import _
from .util import (profiler, DaemonThread, UserCancelled, ThreadJob, UserFacingException)
from . import bip32
from . import plugins
from .simple_config import ConfigVar, SimpleConfig
from .simple_config import SimpleConfig
from .logging import get_logger, Logger
from .crypto import sha256
@@ -60,17 +63,20 @@ _logger = get_logger(__name__)
plugin_loaders = {}
hook_names = set()
hooks = {}
_root_permission_cache = {}
_exec_module_failure = {} # type: Dict[str, Exception]
PLUGIN_PASSWORD_VERSION = 1
class Plugins(DaemonThread):
LOGGING_SHORTCUT = 'p'
pkgpath = os.path.dirname(plugins.__file__)
keyfile_linux = '/etc/electrum/plugins_key'
keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey'
@profiler
def __init__(self, config: SimpleConfig, gui_name = None, cmd_only: bool = False):
def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
self.config = config
self.cmd_only = cmd_only # type: bool
self.internal_plugin_metadata = {}
@@ -106,9 +112,6 @@ class Plugins(DaemonThread):
if loader.__class__.__qualname__ == "PyiFrozenImporter":
continue
module_path = os.path.join(pkg_path, name)
if external and not self._has_recursive_root_permissions(module_path):
self.logger.info(f"Not loading plugin {module_path}: directory has user write permissions")
continue
if self.cmd_only and not self.config.get('enable_plugin_' + name) is True:
continue
try:
@@ -119,7 +122,6 @@ class Plugins(DaemonThread):
continue
if 'fullname' not in d:
continue
d['display_name'] = d['fullname']
d['path'] = module_path
if not self.cmd_only:
gui_good = self.gui_name in d.get('available_for', [])
@@ -164,10 +166,11 @@ class Plugins(DaemonThread):
internal_plugins_path = (self.pkgpath, False)
external_plugins_path = (self.get_external_plugin_dir(), True)
for pkg_path, external in (internal_plugins_path, external_plugins_path):
# external plugins enforce root permissions on the directory
if pkg_path and os.path.exists(pkg_path):
self.find_directory_plugins(pkg_path=pkg_path, external=external)
self.find_zip_plugins(pkg_path=pkg_path, external=external)
if not external:
self.find_directory_plugins(pkg_path=pkg_path, external=external)
else:
self.find_zip_plugins(pkg_path=pkg_path, external=external)
def load_plugins(self):
for name, d in chain(self.internal_plugin_metadata.items(), self.external_plugin_metadata.items()):
@@ -183,35 +186,96 @@ class Plugins(DaemonThread):
def _has_root_permissions(self, path):
return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
@profiler(min_threshold=0.5)
def _has_recursive_root_permissions(self, path):
"""Check if a directory and all its subdirectories have root permissions"""
if _root_permission_cache.get(path) is not None:
return _root_permission_cache[path]
_root_permission_cache[path] = False
for root, dirs, files in os.walk(path):
if not self._has_root_permissions(root):
return False
for f in files:
if not self._has_root_permissions(os.path.join(root, f)):
return False
_root_permission_cache[path] = True
return True
def get_keyfile_path(self) -> Tuple[str, str]:
if sys.platform in ['windows', 'win32']:
keyfile_path = self.keyfile_windows
keyfile_help = _('This file can be edited with Regdit')
elif 'ANDROID_DATA' in os.environ:
raise Exception('platform not supported')
else:
# treat unknown platforms as linux-like
keyfile_path = self.keyfile_linux
keyfile_help = _('The file must have root permissions')
return keyfile_path, keyfile_help
def get_external_plugin_dir(self):
if sys.platform not in ['linux', 'darwin'] and not sys.platform.startswith('freebsd'):
return
pkg_path = '/opt/electrum_plugins'
def create_new_key(self, password:str) -> str:
salt = os.urandom(32)
privkey = self.derive_privkey(password, salt)
pubkey = privkey.get_public_key_bytes()
key = chr(PLUGIN_PASSWORD_VERSION) + salt + pubkey
return key.hex()
def get_pubkey_bytes(self) -> Tuple[Optional[bytes], bytes]:
"""
returns pubkey, salt
returns None, None if the pubkey has not been set
"""
if sys.platform in ['windows', 'win32']:
import winreg
with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as hkey:
try:
with winreg.OpenKey(hkey, r"SOFTWARE\\Electrum") as key:
key_hex = winreg.QueryValue(key, "PluginsKey")
except Exception as e:
self.logger.info(f'winreg error: {e}')
return None, None
elif 'ANDROID_DATA' in os.environ:
return None, None
else:
# treat unknown platforms as linux-like
if not os.path.exists(self.keyfile_linux):
return None, None
if not self._has_root_permissions(self.keyfile_linux):
return
with open(self.keyfile_linux) as f:
key_hex = f.read()
key = bytes.fromhex(key_hex)
version = key[0]
if version != PLUGIN_PASSWORD_VERSION:
self.logger.info(f'unknown plugin password version: {version}')
return None, None
# all good
salt = key[1:1+32]
pubkey = key[1+32:]
return pubkey, salt
def get_external_plugin_dir(self) -> str:
pkg_path = os.path.join(self.config.electrum_path(), 'plugins')
if not os.path.exists(pkg_path):
self.logger.info(f'directory {pkg_path} does not exist')
return
if not self._has_root_permissions(pkg_path):
self.logger.info(f'not loading {pkg_path}: directory has user write permissions')
return
os.mkdir(pkg_path)
return pkg_path
def zip_plugin_path(self, name):
filename = self.get_metadata(name)['filename']
async def download_external_plugin(self, url):
filename = os.path.basename(urlparse(url).path)
pkg_path = self.get_external_plugin_dir()
path = os.path.join(pkg_path, filename)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
if resp.status == 200:
with open(path, 'wb') as fd:
async for chunk in resp.content.iter_chunked(10):
fd.write(chunk)
return path
def read_manifest(self, path) -> dict:
""" return json dict """
with zipfile_lib.ZipFile(path) as file:
for filename in file.namelist():
if filename.endswith('manifest.json'):
break
else:
raise Exception('could not find manifest.json in zip archive')
with file.open(filename, 'r') as f:
manifest = json.load(f)
manifest['path'] = path # external, path of the zipfile
manifest['dirname'] = os.path.dirname(filename) # internal
manifest['is_zip'] = True
manifest['zip_hash_sha256'] = get_file_hash256(path).hex()
return manifest
def zip_plugin_path(self, name) -> str:
path = self.get_metadata(name)['path']
filename = os.path.basename(path)
if name in self.internal_plugin_metadata:
pkg_path = self.pkgpath
else:
@@ -226,46 +290,37 @@ class Plugins(DaemonThread):
path = os.path.join(pkg_path, filename)
if not filename.endswith('.zip'):
continue
if external and not self._has_root_permissions(path):
self.logger.info(f'not loading {path}: file has user write permissions')
continue
try:
zipfile = zipimport.zipimporter(path)
except zipimport.ZipImportError:
self.logger.exception(f"unable to load zip plugin '{filename}'")
d = self.read_manifest(path)
name = d['name']
except Exception:
self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
continue
for name, b in pkgutil.iter_zipimport_modules(zipfile):
if b is False:
if name in self.internal_plugin_metadata:
raise Exception(f"duplicate plugins for name={name}")
if name in self.external_plugin_metadata:
raise Exception(f"duplicate plugins for name={name}")
if self.cmd_only and not self.config.get('enable_plugin_' + name):
continue
min_version = d.get('min_electrum_version')
if min_version and StrictVersion(min_version) > StrictVersion(ELECTRUM_VERSION):
self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
continue
max_version = d.get('max_electrum_version')
if max_version and StrictVersion(max_version) < StrictVersion(ELECTRUM_VERSION):
self.logger.info(f"version mismatch for zip plugin {filename}", exc_info=True)
continue
if not self.cmd_only:
gui_good = self.gui_name in d.get('available_for', [])
if not gui_good:
continue
if name in self.internal_plugin_metadata:
raise Exception(f"duplicate plugins for name={name}")
if name in self.external_plugin_metadata:
raise Exception(f"duplicate plugins for name={name}")
if self.cmd_only and not self.config.get('enable_plugin_' + name):
if 'fullname' not in d:
continue
try:
with zipfile_lib.ZipFile(path) as file:
manifest_path = os.path.join(name, 'manifest.json')
with file.open(manifest_path, 'r') as f:
d = json.load(f)
except Exception:
self.logger.info(f"could not load manifest.json from zip plugin {filename}", exc_info=True)
continue
d['filename'] = filename
d['is_zip'] = True
d['path'] = path
if not self.cmd_only:
gui_good = self.gui_name in d.get('available_for', [])
if not gui_good:
continue
if 'fullname' not in d:
continue
d['display_name'] = d['fullname']
d['zip_hash_sha256'] = get_file_hash256(path)
if external:
self.external_plugin_metadata[name] = d
else:
self.internal_plugin_metadata[name] = d
if external:
self.external_plugin_metadata[name] = d
else:
self.internal_plugin_metadata[name] = d
def get(self, name):
return self.plugins.get(name)
@@ -285,20 +340,23 @@ class Plugins(DaemonThread):
def maybe_load_plugin_init_method(self, name: str) -> None:
"""Loads the __init__.py module of the plugin if it is not already loaded."""
is_external = name in self.external_plugin_metadata
base_name = (f'electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
base_name = ('electrum_external_plugins.' if is_external else 'electrum.plugins.') + name
if base_name not in sys.modules:
metadata = self.get_metadata(name)
is_zip = metadata.get('is_zip', False)
# if the plugin was not enabled on startup the init module hasn't been loaded yet
if not is_zip:
if is_external:
# this branch is deprecated: external plugins are always zip files
path = os.path.join(metadata['path'], '__init__.py')
init_spec = importlib.util.spec_from_file_location(base_name, path)
else:
init_spec = importlib.util.find_spec(base_name)
else:
zipfile = zipimport.zipimporter(metadata['path'])
init_spec = zipfile.find_spec(name)
dirname = metadata['dirname']
init_spec = zipfile.find_spec(dirname)
self.exec_module_from_spec(init_spec, base_name)
if name == "trustedcoin":
# removes trustedcoin after loading to not show it in the list of plugins
@@ -307,11 +365,12 @@ class Plugins(DaemonThread):
def load_plugin_by_name(self, name: str) -> 'BasePlugin':
if name in self.plugins:
return self.plugins[name]
# if the plugin was not enabled on startup the init module hasn't been loaded yet
self.maybe_load_plugin_init_method(name)
is_external = name in self.external_plugin_metadata
if is_external and not self.is_authorized(name):
self.logger.info(f'plugin not authorized {name}')
return
if not is_external:
full_name = f'electrum.plugins.{name}.{self.gui_name}'
else:
@@ -333,6 +392,41 @@ class Plugins(DaemonThread):
def close_plugin(self, plugin):
self.remove_jobs(plugin.thread_jobs())
def derive_privkey(self, pw: str, salt:bytes) -> ECPrivkey:
from hashlib import pbkdf2_hmac
secret = pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations=10**5)
return ECPrivkey(secret)
def is_installed(self, name) -> bool:
"""an external plugin may be installed but not authorized """
return name in self.internal_plugin_metadata or name in self.external_plugin_metadata
def is_authorized(self, name) -> bool:
if name in self.internal_plugin_metadata:
return True
if name not in self.external_plugin_metadata:
return False
pubkey_bytes, salt = self.get_pubkey_bytes()
if not pubkey_bytes:
return False
if not self.is_plugin_zip(name):
return False
filename = self.zip_plugin_path(name)
plugin_hash = get_file_hash256(filename)
sig = self.config.get('authorize_plugin_' + name)
if not sig:
return False
pubkey = ECPubkey(pubkey_bytes)
return pubkey.ecdsa_verify(bytes.fromhex(sig), plugin_hash)
def authorize_plugin(self, name: str, filename, privkey: ECPrivkey):
pubkey_bytes, salt = self.get_pubkey_bytes()
assert pubkey_bytes == privkey.get_public_key_bytes()
plugin_hash = get_file_hash256(filename)
sig = privkey.ecdsa_sign(plugin_hash)
value = sig.hex()
self.config.set_key('authorize_plugin_' + name, value, save=True)
def enable(self, name: str) -> 'BasePlugin':
self.config.set_key('enable_plugin_' + name, True, save=True)
p = self.get(name)
@@ -351,7 +445,7 @@ class Plugins(DaemonThread):
@classmethod
def is_plugin_enabler_config_key(cls, key: str) -> bool:
return key.startswith('enable_plugin_')
return key.startswith('enable_plugin_') or key.startswith('authorize_plugin_')
def toggle(self, name: str) -> Optional['BasePlugin']:
p = self.get(name)
@@ -435,10 +529,11 @@ class Plugins(DaemonThread):
self.on_stop()
def get_file_hash256(path: str) -> str:
'''Get the sha256 hash of a file in hex, similar to `sha256sum`.'''
def get_file_hash256(path: str) -> bytes:
'''Get the sha256 hash of a file, similar to `sha256sum`.'''
with open(path, 'rb') as f:
return sha256(f.read()).hex()
return sha256(f.read())
def hook(func):
hook_names.add(func.__name__)
@@ -523,8 +618,10 @@ class BasePlugin(Logger):
def read_file(self, filename: str) -> bytes:
if self.parent.is_plugin_zip(self.name):
plugin_filename = self.parent.zip_plugin_path(self.name)
metadata = self.parent.external_plugin_metadata[self.name]
dirname = metadata['dirname']
with zipfile_lib.ZipFile(plugin_filename) as myzip:
with myzip.open(os.path.join(self.name, filename)) as myfile:
with myzip.open(os.path.join(dirname, filename)) as myfile:
return myfile.read()
else:
if self.name in self.parent.internal_plugin_metadata: