1
0

plugins: add functionality to allow setting plugin pubkey from gui

Adds functionality that allows the user to store the plugin authorization pubkey without having to edit files/registry manually.
On Linux systems it spawns the commands in a subprocess with pkexec which will trigger a OS prompt to execute the commands as root.
The user sees the executed commands and can either authorize with the root password or decline.
On windows it uses the windows `ShellExecuteExW` api to edit the registry, this also triggers a OS dialog to accept or decline (UAC dialog).
There is also functionality to reset the key again, which works in the same way.
This commit is contained in:
f321x
2025-04-28 10:17:53 +02:00
parent 2b68f65aac
commit abc50a97e1
2 changed files with 321 additions and 38 deletions

View File

@@ -3,7 +3,8 @@ from functools import partial
import shutil import shutil
import os import os
from PyQt6.QtWidgets import QLabel, QVBoxLayout, QGridLayout, QPushButton, QWidget, QScrollArea, QFormLayout, QFileDialog, QMenu, QApplication from PyQt6.QtWidgets import QLabel, QVBoxLayout, QGridLayout, QPushButton, QWidget, QScrollArea, \
QFormLayout, QFileDialog, QMenu, QApplication, QMessageBox
from PyQt6.QtCore import QTimer from PyQt6.QtCore import QTimer
from electrum.i18n import _ from electrum.i18n import _
@@ -15,7 +16,7 @@ from .util import (WindowModalDialog, Buttons, CloseButton, WWLabel, insert_spac
if TYPE_CHECKING: if TYPE_CHECKING:
from . import ElectrumGui from . import ElectrumGui
from electrum_cc import ECPrivkey from electrum_ecc import ECPrivkey
from electrum.simple_config import SimpleConfig from electrum.simple_config import SimpleConfig
from electrum.plugin import Plugins from electrum.plugin import Plugins
@@ -174,6 +175,7 @@ class PluginsDialog(WindowModalDialog, MessageBoxMixin):
scroll_w.setLayout(self.grid) scroll_w.setLayout(self.grid)
vbox.addWidget(scroll) vbox.addWidget(scroll)
add_button = QPushButton(_('Add')) add_button = QPushButton(_('Add'))
add_button.setMinimumWidth(40) # looks better on windows, no difference on linux
menu = QMenu() menu = QMenu()
for name, item in self.plugins.internal_plugin_metadata.items(): for name, item in self.plugins.internal_plugin_metadata.items():
fullname = item['fullname'] fullname = item['fullname']
@@ -194,7 +196,7 @@ class PluginsDialog(WindowModalDialog, MessageBoxMixin):
pubkey, salt = self.plugins.get_pubkey_bytes() pubkey, salt = self.plugins.get_pubkey_bytes()
if not pubkey: if not pubkey:
self.init_plugins_password() self.init_plugins_password()
return return None
# ask for url and password, same window # ask for url and password, same window
pw = self.password_dialog( pw = self.password_dialog(
msg=' '.join([ msg=' '.join([
@@ -208,18 +210,39 @@ class PluginsDialog(WindowModalDialog, MessageBoxMixin):
]) ])
) )
if not pw: if not pw:
return return None
privkey = self.plugins.derive_privkey(pw, salt) privkey = self.plugins.derive_privkey(pw, salt)
if pubkey != privkey.get_public_key_bytes(): if pubkey != privkey.get_public_key_bytes():
keyfile_path, keyfile_help = self.plugins.get_keyfile_path() keyfile_path, _keyfile_help = self.plugins.get_keyfile_path(None)
self.show_error(
''.join([ while True:
_('Incorrect password.'), '\n\n', exit_dialog = True
_('Your plugin authorization password is required to install plugins.'), ' ', auto_reset_btn = QPushButton(_('Try Auto-Reset'))
_('If you need to reset it, remove the following file:'), '\n\n', def on_try_auto_reset_clicked():
keyfile_path nonlocal exit_dialog
])) if not self.plugins.try_auto_key_reset():
return self.show_error(_("Auto-Reset not possible. Delete the file manually."))
exit_dialog = False
else:
self.show_message(_("Auto-Reset successful. You can now setup a new password."))
auto_reset_btn.clicked.connect(on_try_auto_reset_clicked)
buttons = [
QMessageBox.StandardButton.Ok,
(auto_reset_btn, QMessageBox.ButtonRole.ActionRole, 0),
]
if self.show_error(
''.join([
_('Incorrect password.'), '\n\n',
_('Your plugin authorization password is required to install plugins.'), ' ',
_('If you need to reset it, remove the following file:'), '\n\n',
keyfile_path
]),
buttons=buttons
) or exit_dialog:
break
return None
return privkey return privkey
def init_plugins_password(self): def init_plugins_password(self):
@@ -233,7 +256,7 @@ class PluginsDialog(WindowModalDialog, MessageBoxMixin):
if not pw: if not pw:
return return
key_hex = self.plugins.create_new_key(pw) key_hex = self.plugins.create_new_key(pw)
keyfile_path, keyfile_help = self.plugins.get_keyfile_path() keyfile_path, keyfile_help = self.plugins.get_keyfile_path(key_hex)
msg = '\n\n'.join([ msg = '\n\n'.join([
_('Your plugins key is:'), key_hex, _('Your plugins key is:'), key_hex,
_('This key has been copied to your clipboard. Please save it in:'), _('This key has been copied to your clipboard. Please save it in:'),
@@ -243,10 +266,30 @@ class PluginsDialog(WindowModalDialog, MessageBoxMixin):
]) ])
clipboard = QApplication.clipboard() clipboard = QApplication.clipboard()
clipboard.setText(key_hex) clipboard.setText(key_hex)
self.show_message(msg)
while True:
exit_dialog = True
# the button has to be recreated inside the loop, as qt destroys it when the dialog is closed
auto_setup_btn = QPushButton(_('Try Auto-Setup'))
def on_auto_setup_clicked():
nonlocal exit_dialog
if not self.plugins.try_auto_key_setup(key_hex):
self.show_error(_("Auto-Setup not possible. Try the manual setup."))
exit_dialog = False
else:
self.show_message(_("Auto-Setup successful. You can now install plugins."))
auto_setup_btn.clicked.connect(on_auto_setup_clicked)
# on windows, the auto-setup button is shown right of the ok button,
# apparently due to OS conventions
buttons = [
(auto_setup_btn, QMessageBox.ButtonRole.ActionRole, 0),
QMessageBox.StandardButton.Ok,
]
if self.show_message(msg, buttons=buttons) or exit_dialog:
break
def download_plugin_dialog(self): def download_plugin_dialog(self):
import os
from .util import line_dialog from .util import line_dialog
from electrum.util import UserCancelled from electrum.util import UserCancelled
pubkey, salt = self.plugins.get_pubkey_bytes() pubkey, salt = self.plugins.get_pubkey_bytes()
@@ -263,18 +306,10 @@ class PluginsDialog(WindowModalDialog, MessageBoxMixin):
except UserCancelled: except UserCancelled:
return return
except Exception as e: except Exception as e:
self._logger.exception("")
self.show_error(f"{e}") self.show_error(f"{e}")
return return
try: self._try_add_external_plugin_from_path(path)
success = self.add_external_plugin(path)
except Exception as e:
self.show_error(f"{e}")
success = False
if not success:
try:
os.unlink(path)
except FileNotFoundError:
self._logger.debug("", exc_info=True)
def add_plugin_dialog(self): def add_plugin_dialog(self):
pubkey, salt = self.plugins.get_pubkey_bytes() pubkey, salt = self.plugins.get_pubkey_bytes()
@@ -290,9 +325,13 @@ class PluginsDialog(WindowModalDialog, MessageBoxMixin):
self.show_warning(_('Plugin already installed.')) self.show_warning(_('Plugin already installed.'))
return return
shutil.copyfile(filename, path) shutil.copyfile(filename, path)
self._try_add_external_plugin_from_path(path)
def _try_add_external_plugin_from_path(self, path: str):
try: try:
success = self.add_external_plugin(path) success = self.add_external_plugin(path)
except Exception as e: except Exception as e:
self._logger.exception("")
self.show_error(f"{e}") self.show_error(f"{e}")
success = False success = False
if not success: if not success:

View File

@@ -72,8 +72,9 @@ class Plugins(DaemonThread):
LOGGING_SHORTCUT = 'p' LOGGING_SHORTCUT = 'p'
pkgpath = os.path.dirname(plugins.__file__) pkgpath = os.path.dirname(plugins.__file__)
keyfile_linux = '/etc/electrum/plugins_key' # TODO: use XDG Base Directory Specification instead of hardcoding /etc
keyfile_windows = 'C:\\HKEY_LOCAL_MACHINE\\SOFTWARE\\Electrum\\PluginsKey' keyfile_posix = '/etc/electrum/plugins_key'
keyfile_windows = r'HKEY_LOCAL_MACHINE\SOFTWARE\Electrum\PluginsKey'
@profiler @profiler
def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False): def __init__(self, config: SimpleConfig, gui_name: str = None, cmd_only: bool = False):
@@ -187,18 +188,256 @@ class Plugins(DaemonThread):
def _has_root_permissions(self, path): def _has_root_permissions(self, path):
return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK) return os.stat(path).st_uid == 0 and not os.access(path, os.W_OK)
def get_keyfile_path(self) -> Tuple[str, str]: def get_keyfile_path(self, key_hex: Optional[str]) -> Tuple[str, str]:
if sys.platform in ['windows', 'win32']: if sys.platform in ['windows', 'win32']:
keyfile_path = self.keyfile_windows keyfile_path = self.keyfile_windows
keyfile_help = _('This file can be edited with Regdit') keyfile_help = _('This file can be edited with Regdit')
elif 'ANDROID_DATA' in os.environ: elif 'ANDROID_DATA' in os.environ:
raise Exception('platform not supported') raise Exception('platform not supported')
else: else:
# treat unknown platforms as linux-like # treat unknown platforms and macOS as linux-like
keyfile_path = self.keyfile_linux keyfile_path = self.keyfile_posix
keyfile_help = _('The file must have root permissions') keyfile_help = "" if not key_hex else "".join([
_('The file must have root permissions'),
".\n\n",
_("To set it you can also use the Auto-Setup or run "
"the following terminal command"),
":\n\n",
f"sudo sh -c \"{self._posix_plugin_key_creation_command(key_hex)}\"",
])
return keyfile_path, keyfile_help return keyfile_path, keyfile_help
def try_auto_key_setup(self, pubkey_hex: str) -> bool:
"""Can be called from the GUI to store the plugin pubkey as root/admin user"""
try:
if sys.platform in ['windows', 'win32']:
self._write_key_to_regedit_windows(pubkey_hex)
elif 'ANDROID_DATA' in os.environ:
raise Exception('platform not supported')
elif sys.platform.startswith('darwin'): # macOS
self._write_key_to_root_file_macos(pubkey_hex)
else:
self._write_key_to_root_file_linux(pubkey_hex)
except Exception:
self.logger.exception(f"auto-key setup for {pubkey_hex} failed")
return False
return True
def try_auto_key_reset(self) -> bool:
try:
if sys.platform in ['windows', 'win32']:
self._delete_plugin_key_from_windows_registry()
elif 'ANDROID_DATA' in os.environ:
raise Exception('platform not supported')
elif sys.platform.startswith('darwin'): # macOS
self._delete_macos_plugin_keyfile()
else:
self._delete_linux_plugin_keyfile()
except Exception:
self.logger.exception(f'auto-reset of plugin key failed')
return False
return True
def _posix_plugin_key_creation_command(self, pubkey_hex: str) -> str:
"""creates the dir (dir_path), writes the key in file, and sets permissions to 644"""
dir_path: str = os.path.dirname(self.keyfile_posix)
sh_command = (
f"mkdir -p {dir_path} " # create the /etc/electrum dir
f"&& printf '%s' '{pubkey_hex}' > {self.keyfile_posix} " # write the key to the file
f"&& chmod 644 {self.keyfile_posix} " # set read permissions for the file
f"&& chmod 755 {dir_path}" # set read permissions for the dir
)
return sh_command
@staticmethod
def _get_macos_osascript_command(commands: List[str]) -> List[str]:
"""
Inspired by
https://github.com/barneygale/elevate/blob/01263b690288f022bf6fa702711ac96816bc0e74/elevate/posix.py
Wraps the given commands in a macOS osascript command to prompt for root permissions.
"""
from shlex import quote
def quote_shell(args):
return " ".join(quote(arg) for arg in args)
def quote_applescript(string):
charmap = {
"\n": "\\n",
"\r": "\\r",
"\t": "\\t",
"\"": "\\\"",
"\\": "\\\\",
}
return '"%s"' % "".join(charmap.get(char, char) for char in string)
commands = [
"osascript",
"-e",
"do shell script %s "
"with administrator privileges "
"without altering line endings"
% quote_applescript(quote_shell(commands))
]
return commands
@staticmethod
def _run_win_regedit_as_admin(reg_exe_command: str) -> None:
"""
Runs reg.exe reg_exe_command and requests admin privileges through UAC prompt.
"""
# has to use ShellExecuteEx as ShellExecuteW (the simpler api) doesn't allow to wait
# for the result of the process (returns no process handle)
from ctypes import byref, sizeof, windll, Structure, c_ulong
from ctypes.wintypes import HANDLE, DWORD, HWND, HINSTANCE, HKEY, LPCWSTR
# https://learn.microsoft.com/en-us/windows/win32/api/shellapi/ns-shellapi-shellexecuteinfoa
class SHELLEXECUTEINFO(Structure):
_fields_ = [
('cbSize', DWORD),
('fMask', c_ulong),
('hwnd', HWND),
('lpVerb', LPCWSTR),
('lpFile', LPCWSTR),
('lpParameters', LPCWSTR),
('lpDirectory', LPCWSTR),
('nShow', c_ulong),
('hInstApp', HINSTANCE),
('lpIDList', c_ulong),
('lpClass', LPCWSTR),
('hkeyClass', HKEY),
('dwHotKey', DWORD),
('hIcon', HANDLE),
('hProcess', HANDLE)
]
info = SHELLEXECUTEINFO()
info.cbSize = sizeof(SHELLEXECUTEINFO)
info.fMask = 0x00000040 # SEE_MASK_NOCLOSEPROCESS (so we can check the result of the process)
info.hwnd = None
info.lpVerb = 'runas' # run as administrator
info.lpFile = 'reg.exe' # the executable to run
info.lpParameters = reg_exe_command # the registry edit command
info.lpDirectory = None
info.nShow = 1
# Execute and wait
if not windll.shell32.ShellExecuteExW(byref(info)):
error = windll.kernel32.GetLastError()
raise Exception(f'Error executing registry command: {error}')
# block until the process is done or 5 sec timeout
windll.kernel32.WaitForSingleObject(info.hProcess, 0x1338)
# Close handle
windll.kernel32.CloseHandle(info.hProcess)
@staticmethod
def _execute_commands_in_subprocess(commands: List[str]) -> None:
"""
Executes the given commands in a subprocess and asserts that it was successful.
"""
import subprocess
process = subprocess.Popen(
commands,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate()
if process.returncode != 0:
raise Exception(f'error executing command ({process.returncode}): {stderr}')
def _write_key_to_root_file_linux(self, key_hex: str) -> None:
"""
Spawns a pkexec subprocess to write the key to a file with root permissions.
This will open an OS dialog asking for the root password. Can only succeed if
the system has polkit installed.
"""
assert os.path.exists("/etc"), "System does not have /etc directory"
sh_command: str = self._posix_plugin_key_creation_command(key_hex)
commands = ['pkexec', 'sh', '-c', sh_command]
self._execute_commands_in_subprocess(commands)
# check if the key was written correctly
with open(self.keyfile_posix, 'r') as f:
assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
def _delete_linux_plugin_keyfile(self) -> None:
"""
Deletes the root owned key file at self.keyfile_posix.
"""
if not os.path.exists(self.keyfile_posix):
self.logger.debug(f'file {self.keyfile_posix} does not exist')
return
if not self._has_root_permissions(self.keyfile_posix):
os.unlink(self.keyfile_posix)
return
# use pkexec to delete the file as root user
commands = ['pkexec', 'rm', self.keyfile_posix]
self._execute_commands_in_subprocess(commands)
assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
def _write_key_to_root_file_macos(self, key_hex: str) -> None:
assert os.path.exists("/etc"), "System does not have /etc directory"
sh_command: str = self._posix_plugin_key_creation_command(key_hex)
macos_commands = self._get_macos_osascript_command(["sh", "-c", sh_command])
self._execute_commands_in_subprocess(macos_commands)
with open(self.keyfile_posix, 'r') as f:
assert f.read() == key_hex, f'file content mismatch: {f.read()} != {key_hex}'
self.logger.debug(f'file saved successfully to {self.keyfile_posix}')
def _delete_macos_plugin_keyfile(self) -> None:
if not os.path.exists(self.keyfile_posix):
self.logger.debug(f'file {self.keyfile_posix} does not exist')
return
if not self._has_root_permissions(self.keyfile_posix):
os.unlink(self.keyfile_posix)
return
# use osascript to delete the file as root user
macos_commands = self._get_macos_osascript_command(["rm", self.keyfile_posix])
self._execute_commands_in_subprocess(macos_commands)
assert not os.path.exists(self.keyfile_posix), f'file {self.keyfile_posix} still exists'
def _write_key_to_regedit_windows(self, key_hex: str) -> None:
"""
Writes the key to the Windows registry with windows UAC prompt.
"""
from winreg import ConnectRegistry, OpenKey, QueryValue, HKEY_LOCAL_MACHINE
value_type = 'REG_SZ'
command = f'add "{self.keyfile_windows}" /ve /t {value_type} /d "{key_hex}" /f'
self._run_win_regedit_as_admin(command)
# check if the key was written correctly
with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
with OpenKey(hkey, r'SOFTWARE\Electrum') as key:
assert key_hex == QueryValue(key, 'PluginsKey'), "incorrect registry key value"
self.logger.debug(f'key saved successfully to {self.keyfile_windows}')
def _delete_plugin_key_from_windows_registry(self) -> None:
"""
Deletes the PluginsKey dir in the Windows registry.
"""
from winreg import ConnectRegistry, OpenKey, HKEY_LOCAL_MACHINE
command = f'delete "{self.keyfile_windows}" /f'
self._run_win_regedit_as_admin(command)
try:
# do a sanity check to see if the key has been deleted
with ConnectRegistry(None, HKEY_LOCAL_MACHINE) as hkey:
with OpenKey(hkey, r'SOFTWARE\Electrum\PluginsKey'):
raise Exception(f'Key {self.keyfile_windows} still exists, deletion failed')
except FileNotFoundError:
pass
def create_new_key(self, password:str) -> str: def create_new_key(self, password:str) -> str:
salt = os.urandom(32) salt = os.urandom(32)
privkey = self.derive_privkey(password, salt) privkey = self.derive_privkey(password, salt)
@@ -224,14 +463,18 @@ class Plugins(DaemonThread):
return None, None return None, None
else: else:
# treat unknown platforms as linux-like # treat unknown platforms as linux-like
if not os.path.exists(self.keyfile_linux): if not os.path.exists(self.keyfile_posix):
return None, None return None, None
if not self._has_root_permissions(self.keyfile_linux): if not self._has_root_permissions(self.keyfile_posix):
return return
with open(self.keyfile_linux) as f: with open(self.keyfile_posix) as f:
key_hex = f.read() key_hex = f.read()
key = bytes.fromhex(key_hex) try:
version = key[0] key = bytes.fromhex(key_hex)
version = key[0]
except Exception:
self.logger.exception(f'{key_hex=} invalid')
return None, None
if version != PLUGIN_PASSWORD_VERSION: if version != PLUGIN_PASSWORD_VERSION:
self.logger.info(f'unknown plugin password version: {version}') self.logger.info(f'unknown plugin password version: {version}')
return None, None return None, None
@@ -409,7 +652,8 @@ class Plugins(DaemonThread):
self.authorize_plugin(name, path, privkey) self.authorize_plugin(name, path, privkey)
def uninstall(self, name: str): def uninstall(self, name: str):
self.config.set_key(f'plugins.{name}', None) if self.config.get(f'plugins.{name}'):
self.config.set_key(f'plugins.{name}', None)
if name in self.external_plugin_metadata: if name in self.external_plugin_metadata:
zipfile = self.zip_plugin_path(name) zipfile = self.zip_plugin_path(name)
os.unlink(zipfile) os.unlink(zipfile)