1
0
Files
electrum/run_electrum
SomberNight 8a99219456 follow-up prev: electrum-ecc should search for libsecp dll in electrum/
The hack needs to be applied before we try importing electrum_ecc, i.e. it needs to be in the main script.
However, it should also be applied if the main script is not invoked directly, but e.g. the user imports electrum directly.
Hence the duplication.
2025-06-15 18:25:42 +00:00

613 lines
25 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- mode: python -*-
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2011 thomasv@gitorious
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import sys
MIN_PYTHON_VERSION = "3.10.0" # FIXME duplicated from setup.py
_min_python_version_tuple = tuple(map(int, (MIN_PYTHON_VERSION.split("."))))
if sys.version_info[:3] < _min_python_version_tuple:
sys.exit("Error: Electrum requires Python version >= %s..." % MIN_PYTHON_VERSION)
import warnings
import asyncio
from typing import TYPE_CHECKING, Optional, Dict
script_dir = os.path.dirname(os.path.realpath(__file__))
is_pyinstaller = getattr(sys, 'frozen', False)
is_android = 'ANDROID_DATA' in os.environ
is_appimage = 'APPIMAGE' in os.environ
is_binary_distributable = is_pyinstaller or is_android or is_appimage
# is_local: unpacked tar.gz but not pip installed, or git clone
is_local = (not is_binary_distributable
and os.path.exists(os.path.join(script_dir, "electrum.desktop")))
is_git_clone = is_local and os.path.exists(os.path.join(script_dir, ".git"))
if is_git_clone:
# developers should probably see all deprecation warnings unless explicitly overruled
if not any(['DeprecationWarning' in x for x in sys.warnoptions]):
warnings.simplefilter('default', DeprecationWarning)
if is_local or is_android:
sys.path.insert(0, os.path.join(script_dir, 'packages'))
if is_pyinstaller:
# Keep an open file handle for the binary that started us. On Windows, this
# prevents users from moving or renaming the exe file while running (doing which
# causes ImportErrors and other runtime failures). (see #4072)
_file = open(sys.executable, 'rb')
# when running from source, on Windows, also search for DLLs in inner 'electrum' folder
if is_local and os.name == 'nt': # fixme: duplicated between main script and __init__.py :(
os.add_dll_directory(os.path.join(os.path.dirname(__file__), 'electrum'))
def check_imports():
# pure-python dependencies need to be imported here for pyinstaller
try:
import dns
import certifi
import qrcode
import google.protobuf
import aiorpcx
import aiohttp
import aiohttp_socks
import electrum_ecc
import jsonpatch
import electrum_aionostr
except ImportError as e:
sys.exit(f"Error: {str(e)}. Some dependencies are missing. Have you read the README? Or just try '$ python3 -m pip install -r contrib/requirements/requirements.txt'")
if not ((0, 25, 0) <= aiorpcx._version < (0, 26)):
raise RuntimeError(f'aiorpcX version {aiorpcx._version} does not match required: 0.25.0<=ver<0.26')
# the following imports are for pyinstaller
from google.protobuf import descriptor
from google.protobuf import message
from google.protobuf import reflection
from google.protobuf import descriptor_pb2
# make sure that certificates are here
assert os.path.exists(certifi.where())
if not is_android:
check_imports()
sys._ELECTRUM_RUNNING_VIA_RUNELECTRUM = True # used by logging.py
from electrum.logging import get_logger, configure_logging # import logging submodule first
from electrum import util
from electrum.payment_identifier import PaymentIdentifier
from electrum import SimpleConfig
from electrum.wallet_db import WalletDB
from electrum.wallet import Wallet
from electrum.storage import WalletStorage
from electrum.util import print_msg, print_stderr, json_encode, json_decode, UserCancelled
from electrum.util import InvalidPassword
from electrum.plugin import Plugins
from electrum.commands import get_parser, get_simple_parser, known_commands, Commands, config_variables
from electrum import daemon
from electrum.util import create_and_start_event_loop, UserFacingException, JsonRPCError
from electrum.i18n import set_language
if TYPE_CHECKING:
import threading
_logger = get_logger(__name__)
# get password routine
def prompt_password(prompt: str, *, confirm: bool = True) -> Optional[str]:
import getpass
password = getpass.getpass(prompt, stream=None)
if password and confirm:
password2 = getpass.getpass("Confirm: ")
if password != password2:
sys.exit("Error: Passwords do not match.")
if not password:
password = None
return password
def init_cmdline(config_options, wallet_path, *, rpcserver: bool, config: 'SimpleConfig'):
cmdname = config.get('cmd')
cmd = known_commands[cmdname]
if cmdname in ['payto', 'paytomany'] and config.get('unsigned'):
cmd.requires_password = False
if cmdname in ['payto', 'paytomany'] and config.get('broadcast'):
cmd.requires_network = True
if cmd.requires_wallet and not wallet_path:
print_msg("wallet path not provided.")
sys_exit(1)
# instantiate wallet for command-line
storage = WalletStorage(wallet_path) if wallet_path else None
if cmd.requires_wallet and not storage.file_exists():
print_msg("Error: Wallet file not found.")
print_msg("Type 'electrum create' to create a new wallet, or provide a path to a wallet with the -w option")
sys_exit(1)
# important warning
if cmd.name in ['getprivatekeys']:
print_stderr("WARNING: ALL your private keys are secret.")
print_stderr("Exposing a single private key can compromise your entire wallet!")
print_stderr("In particular, DO NOT use 'redeem private key' services proposed by third parties.")
# commands needing password
if ((cmd.requires_wallet and storage.is_encrypted() and not rpcserver)
or (cmdname == 'load_wallet' and storage.is_encrypted())
or (cmdname in ['password', 'unlock'])
or (cmd.requires_password and not rpcserver)):
if storage.is_encrypted_with_hw_device():
# this case is handled later in the control flow
password = None
elif config.get('password') is not None:
password = config.get('password')
if password == '':
password = None
else:
password = prompt_password('Password:', confirm=False)
else:
password = None
config_options['password'] = config_options.get('password') or password
if cmd.name == 'password' and 'new_password' not in config_options:
new_password = prompt_password('New password:')
config_options['new_password'] = new_password
def get_connected_hw_devices(plugins: 'Plugins'):
supported_plugins = plugins.get_hardware_support()
# scan devices
devices = []
devmgr = plugins.device_manager
for splugin in supported_plugins:
name, plugin = splugin.name, splugin.plugin
if not plugin:
e = splugin.exception
_logger.error(f"{name}: error during plugin init: {repr(e)}")
continue
try:
u = devmgr.list_pairable_device_infos(handler=None, plugin=plugin)
except Exception as e:
_logger.error(f'error getting device infos for {name}: {repr(e)}')
continue
devices += list(map(lambda x: (name, x), u))
return devices
def get_password_for_hw_device_encrypted_storage(plugins: 'Plugins') -> str:
devices = get_connected_hw_devices(plugins)
if len(devices) == 0:
print_msg("Error: No connected hw device found. Cannot decrypt this wallet.")
sys.exit(1)
elif len(devices) > 1:
print_msg("Warning: multiple hardware devices detected. "
"The first one will be used to decrypt the wallet.")
# FIXME we use the "first" device, in case of multiple ones
name, device_info = devices[0]
devmgr = plugins.device_manager
try:
client = devmgr.client_by_id(device_info.device.id_)
client.handler = client.plugin.create_handler(None)
return client.get_password_for_storage_encryption()
except UserCancelled:
sys.exit(0)
async def run_offline_command(config, config_options, wallet_path, plugins: 'Plugins'):
cmdname = config.get('cmd')
cmd = known_commands[cmdname]
password = config_options.get('password')
if 'wallet_path' in cmd.options and config_options.get('wallet_path') is None:
config_options['wallet_path'] = wallet_path
if cmd.requires_wallet:
storage = WalletStorage(wallet_path)
if storage.is_encrypted():
if storage.is_encrypted_with_hw_device():
password = get_password_for_hw_device_encrypted_storage(plugins)
config_options['password'] = password
storage.decrypt(password)
db = WalletDB(storage.read(), storage=storage, upgrade=True)
wallet = Wallet(db, config=config)
config_options['wallet'] = wallet
else:
wallet = None
# check password
if cmd.requires_password and wallet.has_password():
try:
wallet.check_password(password)
except InvalidPassword:
print_msg("Error: This password does not decode this wallet.")
sys.exit(1)
if cmd.requires_network:
print_msg("Warning: running command offline")
# arguments passed to function
args = [config.get(x) for x in cmd.params]
# decode json arguments
if cmdname not in ('setconfig',):
args = list(map(json_decode, args))
# options
kwargs = {}
for x in cmd.options:
kwargs[x] = (config_options.get(x) if x in ['wallet_path', 'wallet', 'password', 'new_password'] else config.get(x))
cmd_runner = Commands(config=config)
func = getattr(cmd_runner, cmd.name)
result = await func(*args, **kwargs)
# save wallet
if wallet:
wallet.save_db()
return result
loop = None # type: Optional[asyncio.AbstractEventLoop]
stop_loop = None # type: Optional[asyncio.Future]
loop_thread = None # type: Optional[threading.Thread]
def sys_exit(i):
# stop event loop and exit
if loop:
loop.call_soon_threadsafe(stop_loop.set_result, 1)
loop_thread.join(timeout=1)
sys.exit(i)
def read_config(config_options: dict) -> SimpleConfig:
"""
Reads the config file and returns SimpleConfig, on failure it will potentially
show a GUI error dialog if a gui is available, and then re-raise the exception.
"""
try:
return SimpleConfig(config_options)
except Exception as config_error:
# parse full cmd to find out which UI is being used
full_config_options = parse_command_line(simple_parser=False)
if full_config_options.get("cmd") == 'gui':
gui_name = full_config_options.get(SimpleConfig.GUI_NAME.key(), 'qt')
try:
gui = __import__(f'electrum.gui.{gui_name}', fromlist=['electrum'])
gui.standalone_exception_dialog(config_error) # type: ignore
except Exception as e:
print_stderr(f"Error showing standalone gui dialog: {e}")
raise
def parse_command_line(simple_parser=False) -> Dict:
# parse command line from sys.argv
if simple_parser:
parser = get_simple_parser()
options, args = parser.parse_args()
config_options = options.__dict__
config_options['cmd'] = 'gui'
else:
parser = get_parser()
args = parser.parse_args()
config_options = args.__dict__
f = lambda key: config_options[key] is not None and key not in config_variables.get(args.cmd, {}).keys()
config_options = {key: config_options[key] for key in filter(f, config_options.keys())}
if config_options.get(SimpleConfig.NETWORK_SERVER.key()):
config_options[SimpleConfig.NETWORK_AUTO_CONNECT.key()] = False
config_options['cwd'] = cwd = os.getcwd()
# fixme: this can probably be achieved with a runtime hook (pyinstaller)
if is_pyinstaller and os.path.exists(os.path.join(sys._MEIPASS, 'is_portable')):
config_options['portable'] = True
if config_options.get('portable'):
if is_local:
# running from git clone or local source: put datadir next to main script
datadir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'electrum_data')
else:
# Running a binary or installed source. The most generic but still reasonable thing
# is to use the current working directory. (see #7732)
# note: The main script is often unpacked to a temporary directory from a bundled executable,
# and we don't want to put the datadir inside a temp dir.
# note: Re the portable .exe on Windows, when the user double-clicks it, CWD gets set
# to the parent dir, i.e. we will put the datadir next to the exe.
datadir = os.path.join(os.path.realpath(cwd), 'electrum_data')
config_options['electrum_path'] = datadir
if not config_options.get('verbosity'):
warnings.simplefilter('ignore', DeprecationWarning)
return config_options
def main():
global loop, stop_loop, loop_thread
# The hook will only be used in the Qt GUI right now
util.setup_thread_excepthook()
# on macOS, delete Process Serial Number arg generated for apps launched in Finder
sys.argv = list(filter(lambda x: not x.startswith('-psn'), sys.argv))
# old 'help' syntax
if len(sys.argv) > 1 and sys.argv[1] == 'help':
sys.argv.remove('help')
sys.argv.append('-h')
# old '-v' syntax
# Due to this workaround that keeps old -v working,
# more advanced usages of -v need to use '-v='.
# e.g. -v=debug,network=warning,interface=error
try:
i = sys.argv.index('-v')
except ValueError:
pass
else:
sys.argv[i] = '-v*'
# read arguments from stdin pipe and prompt
for i, arg in enumerate(sys.argv):
if arg == '-':
if not sys.stdin.isatty():
sys.argv[i] = sys.stdin.read()
break
else:
raise Exception('Cannot get argument from stdin')
elif arg == '?':
sys.argv[i] = input("Enter argument:")
elif arg == ':':
sys.argv[i] = prompt_password('Enter argument (will not echo):', confirm=False)
# config is an object passed to the various constructors (wallet, interface, gui)
if is_android:
import importlib.util
config_options = {
'verbosity': '*' if util.is_android_debug_apk() else '',
'cmd': 'gui',
SimpleConfig.GUI_NAME.key(): 'qml',
SimpleConfig.WALLET_USE_SINGLE_PASSWORD.key(): True,
}
if util.get_android_package_name() == "org.electrum.testnet.electrum":
# ~hack for easier testnet builds. pkgname subject to change.
config_options['testnet'] = True
elif util.get_android_package_name() == "org.electrum.regtest.electrum":
# ~hack for easier regtest builds. pkgname subject to change.
config_options['regtest'] = True
else:
# save sys args for next parser
saved_sys_argv = sys.argv[:]
# disable help, the next parser will display it
for x in sys.argv:
if x in ['-h', '--help']:
sys.argv.remove(x)
# parse first without plugins
config_options = parse_command_line(simple_parser=True)
tmp_config = read_config(config_options)
# load (only) the commands modules of plugins so their commands are registered
_plugin_commands = Plugins(tmp_config, cmd_only=True)
# re-parse command line
sys.argv = saved_sys_argv[:]
config_options = parse_command_line()
config = read_config(config_options)
cmdname = config.get('cmd')
# set language as early as possible
# Note: we are already too late for strings that are declared in the global scope
# of an already imported module. However, the GUI and the plugins at least have
# not been imported yet. (see #4621)
# Note: it is ok to call set_language() again later, but note that any call only applies
# to not-yet-evaluated strings.
# Note: the CLI is intentionally always non-localized.
# Note: Some unit tests might rely on the default non-localized strings.
if cmdname == 'gui':
gui_name = config.GUI_NAME
lang = config.LOCALIZATION_LANGUAGE
if not lang:
try:
from electrum.gui.default_lang import get_default_language
lang = get_default_language(gui_name=gui_name)
_logger.info(f"get_default_language: detected default as {lang=!r}")
except ImportError as e:
_logger.info(f"get_default_language: failed. got exc={e!r}")
set_language(lang)
chain = config.get_selected_chain()
chain.set_as_network()
# check if we received a valid payment identifier
uri = config_options.get('url')
if uri and not PaymentIdentifier(None, uri).is_valid():
print_stderr('unknown command:', uri)
sys.exit(1)
if sys.platform == "linux" and not is_android:
import electrum.harden_memory_linux
electrum.harden_memory_linux.set_dumpable_safe(False)
if cmdname == 'daemon' and config.get("detach"):
# detect lockfile.
# This is not as good as get_file_descriptor, but that would require the asyncio loop
lockfile = daemon.get_lockfile(config)
if os.path.exists(lockfile):
print_stderr("Daemon already running (lockfile detected).")
print_stderr("Run 'electrum stop' to stop the daemon.")
sys.exit(1)
# Initialise rpc credentials to random if not set yet. This would normally be done
# later anyway, but we need to avoid the two sides of the fork setting conflicting random creds.
daemon.get_rpc_credentials(config) # inits creds as side-effect
# fork before creating the asyncio event loop
try:
pid = os.fork()
except AttributeError as e:
print_stderr(f"Error: {e!r}")
print_stderr("Running daemon in detached mode (-d) is not supported on this platform.")
print_stderr("Try running the daemon in the foreground (without -d).")
sys.exit(1)
if pid:
print_stderr("starting daemon (PID %d)" % pid)
loop, stop_loop, loop_thread = create_and_start_event_loop()
ready = daemon.wait_until_daemon_becomes_ready(config=config, timeout=5)
if ready:
sys_exit(0)
else:
print_stderr("timed out waiting for daemon to get ready")
sys_exit(1)
else:
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(os.devnull, 'r')
so = open(os.devnull, 'w')
se = open(os.devnull, 'w')
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
loop, stop_loop, loop_thread = create_and_start_event_loop()
try:
handle_cmd(
cmdname=cmdname,
config=config,
config_options=config_options,
)
except Exception:
_logger.exception("")
sys_exit(1)
def handle_cmd(*, cmdname: str, config: 'SimpleConfig', config_options: dict):
if cmdname == 'gui':
configure_logging(config)
fd = daemon.get_file_descriptor(config)
if fd is not None:
d = daemon.Daemon(config, fd, start_network=False)
try:
d.run_gui()
except BaseException as e:
_logger.exception('daemon.run_gui errored')
sys_exit(1)
else:
sys_exit(0)
else:
try:
result = daemon.request(config, 'gui', (config_options,))
except JsonRPCError as e:
if e.code == JsonRPCError.Codes.USERFACING:
print_stderr(e.message)
elif e.code == JsonRPCError.Codes.INTERNAL:
print_stderr("(inside daemon): " + e.data["traceback"])
print_stderr(e.message)
else:
raise Exception(f"unknown error code {e.code}")
sys_exit(1)
elif cmdname == 'daemon':
configure_logging(config)
fd = daemon.get_file_descriptor(config)
if fd is not None:
# run daemon
d = daemon.Daemon(config, fd)
d.run_daemon()
sys_exit(0)
else:
# FIXME this message is lost in detached mode (parent process already exited after forking)
print_msg("Daemon already running")
sys_exit(1)
else:
# command line
configure_logging(config, log_to_file=False) # don't spam logfiles for each client-side RPC, but support "-v"
cmd = known_commands[cmdname]
wallet_path = config.get_wallet_path()
if cmd.requires_wallet and not wallet_path:
print_stderr('wallet path not provided')
sys_exit(1)
if not config.NETWORK_OFFLINE:
init_cmdline(config_options, wallet_path, rpcserver=True, config=config)
timeout = config.CLI_TIMEOUT
try:
result = daemon.request(config, 'run_cmdline', (config_options,), timeout)
except daemon.DaemonNotRunning:
print_msg("Daemon not running; try 'electrum daemon -d'")
if not cmd.requires_network:
print_msg("To run this command without a daemon, use --offline")
if cmd.name == "stop": # remove lockfile if it exists, as daemon looks dead
lockfile = daemon.get_lockfile(config)
if os.path.exists(lockfile):
print_msg("Found lingering lockfile for daemon. Removing.")
daemon.remove_lockfile(lockfile)
sys_exit(1)
except JsonRPCError as e:
if e.code == JsonRPCError.Codes.USERFACING:
print_stderr(e.message)
elif e.code == JsonRPCError.Codes.INTERNAL:
print_stderr("(inside daemon): " + e.data["traceback"])
print_stderr(e.message)
else:
raise Exception(f"unknown error code {e.code}")
sys_exit(1)
except Exception as e:
_logger.exception("error running command (with daemon)")
sys_exit(1)
else:
if cmd.requires_network:
print_msg("This command cannot be run offline")
sys_exit(1)
lockfile = daemon.get_lockfile(config)
if os.path.exists(lockfile):
print_stderr("Daemon already running (lockfile detected)")
print_stderr("Run 'electrum stop' to stop the daemon.")
print_stderr("Run this command without --offline to interact with the daemon")
sys_exit(1)
init_cmdline(config_options, wallet_path, rpcserver=False, config=config)
plugins = Plugins(config, 'cmdline')
coro = run_offline_command(config, config_options, wallet_path, plugins)
fut = asyncio.run_coroutine_threadsafe(coro, loop)
try:
try:
result = fut.result()
finally:
plugins.stop()
plugins.stopped_event.wait(1)
except UserFacingException as e:
print_stderr(str(e))
sys_exit(1)
except Exception as e:
_logger.exception("error running command (without daemon)")
sys_exit(1)
# print result
if isinstance(result, str):
print_msg(result)
elif result is not None:
print_msg(json_encode(result))
sys_exit(0)
if __name__ == '__main__':
main()