1
0

imports, whitespace

This commit is contained in:
Sander van Grieken
2025-02-10 14:22:50 +01:00
parent 7e5ad079b2
commit 36efae3875
4 changed files with 44 additions and 19 deletions

View File

@@ -23,18 +23,16 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import hashlib
from typing import List, Tuple, TYPE_CHECKING, Optional, Union, Sequence, Any
from typing import Tuple, TYPE_CHECKING, Optional, Union, Sequence
import enum
from enum import IntEnum, Enum
import electrum_ecc as ecc
from .util import bfh, BitcoinException, assert_bytes, to_bytes, inv_dict, is_hex_str, classproperty
from . import version
from . import segwit_addr
from . import constants
from .crypto import sha256d, sha256, hash_160, hmac_oneshot
from .crypto import sha256d, sha256, hash_160
if TYPE_CHECKING:
from .network import Network
@@ -375,29 +373,36 @@ def hash160_to_p2pkh(h160: bytes, *, net=None) -> str:
if net is None: net = constants.net
return hash160_to_b58_address(h160, net.ADDRTYPE_P2PKH)
def hash160_to_p2sh(h160: bytes, *, net=None) -> str:
if net is None: net = constants.net
return hash160_to_b58_address(h160, net.ADDRTYPE_P2SH)
def public_key_to_p2pkh(public_key: bytes, *, net=None) -> str:
return hash160_to_p2pkh(hash_160(public_key), net=net)
def hash_to_segwit_addr(h: bytes, witver: int, *, net=None) -> str:
if net is None: net = constants.net
addr = segwit_addr.encode_segwit_address(net.SEGWIT_HRP, witver, h)
assert addr is not None
return addr
def public_key_to_p2wpkh(public_key: bytes, *, net=None) -> str:
return hash_to_segwit_addr(hash_160(public_key), witver=0, net=net)
def script_to_p2wsh(script: bytes, *, net=None) -> str:
return hash_to_segwit_addr(sha256(script), witver=0, net=net)
def p2wsh_nested_script(witness_script: bytes) -> bytes:
wsh = sha256(witness_script)
return construct_script([0, wsh])
def pubkey_to_address(txin_type: str, pubkey: str, *, net=None) -> str:
from . import descriptor
desc = descriptor.get_singlesig_descriptor_from_legacy_leaf(pubkey=pubkey, script_type=txin_type)
@@ -593,12 +598,12 @@ def DecodeBase58Check(psz: Union[bytes, str]) -> bytes:
# extended WIF for segwit (used in 3.0.x; but still used internally)
# the keys in this dict should be a superset of what Imported Wallets can import
WIF_SCRIPT_TYPES = {
'p2pkh':0,
'p2wpkh':1,
'p2wpkh-p2sh':2,
'p2sh':5,
'p2wsh':6,
'p2wsh-p2sh':7
'p2pkh': 0,
'p2wpkh': 1,
'p2wpkh-p2sh': 2,
'p2sh': 5,
'p2wsh': 6,
'p2wsh-p2sh': 7
}
WIF_SCRIPT_TYPES_INV = inv_dict(WIF_SCRIPT_TYPES)
@@ -679,6 +684,7 @@ def address_from_private_key(sec: str) -> str:
public_key = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
return pubkey_to_address(txin_type, public_key)
def is_segwit_address(addr: str, *, net=None) -> bool:
if net is None: net = constants.net
try:
@@ -687,6 +693,7 @@ def is_segwit_address(addr: str, *, net=None) -> bool:
return False
return witprog is not None
def is_taproot_address(addr: str, *, net=None) -> bool:
if net is None: net = constants.net
try:
@@ -695,6 +702,7 @@ def is_taproot_address(addr: str, *, net=None) -> bool:
return False
return witver == 1
def is_b58_address(addr: str, *, net=None) -> bool:
if net is None: net = constants.net
try:
@@ -706,6 +714,7 @@ def is_b58_address(addr: str, *, net=None) -> bool:
return False
return True
def is_address(addr: str, *, net=None) -> bool:
return is_segwit_address(addr, net=net) \
or is_b58_address(addr, net=net)
@@ -733,6 +742,7 @@ def is_minikey(text: str) -> bool:
and all(ord(c) in __b58chars for c in text)
and sha256(text + '?')[0] == 0x00)
def minikey_to_private_key(text: str) -> bytes:
return sha256(text)
@@ -740,7 +750,10 @@ def minikey_to_private_key(text: str) -> bytes:
def _get_dummy_address(purpose: str) -> str:
return redeem_script_to_address('p2wsh', sha256(bytes(purpose, "utf8")))
_dummy_addr_funcs = set()
class DummyAddress:
"""dummy address for fee estimation of funding tx
Use e.g. as: DummyAddress.CHANNEL
@@ -799,22 +812,24 @@ def taproot_tweak_seckey(seckey0: bytes, h: bytes) -> bytes:
TapTreeLeaf = Tuple[int, bytes]
TapTree = Union[TapTreeLeaf, Sequence['TapTree']]
# FIXME just use electrum_ecc.util.bip340_tagged_hash instead
def bip340_tagged_hash(tag: bytes, msg: bytes) -> bytes:
# note: _libsecp256k1.secp256k1_tagged_sha256 benchmarks about 70% slower than this (on my machine)
return sha256(sha256(tag) + sha256(tag) + msg)
def taproot_tree_helper(script_tree: TapTree):
if isinstance(script_tree, tuple):
leaf_version, script = script_tree
h = bip340_tagged_hash(b"TapLeaf", bytes([leaf_version]) + witness_push(script))
return ([((leaf_version, script), bytes())], h)
return [((leaf_version, script), bytes())], h
left, left_h = taproot_tree_helper(script_tree[0])
right, right_h = taproot_tree_helper(script_tree[1])
ret = [(l, c + right_h) for l, c in left] + [(l, c + left_h) for l, c in right]
if right_h < left_h:
left_h, right_h = right_h, left_h
return (ret, bip340_tagged_hash(b"TapBranch", left_h + right_h))
return ret, bip340_tagged_hash(b"TapBranch", left_h + right_h)
def taproot_output_script(internal_pubkey: bytes, *, script_tree: Optional[TapTree]) -> bytes:
@@ -850,11 +865,13 @@ def usermessage_magic(message: bytes) -> bytes:
length = var_int(len(message))
return b"\x18Bitcoin Signed Message:\n" + length + message
def ecdsa_sign_usermessage(ec_privkey, message: Union[bytes, str], *, is_compressed: bool) -> bytes:
message = to_bytes(message, 'utf8')
msg32 = sha256d(usermessage_magic(message))
return ec_privkey.ecdsa_sign_recoverable(msg32, is_compressed=is_compressed)
def verify_usermessage_with_address(address: str, sig65: bytes, message: bytes, *, net=None) -> bool:
from electrum_ecc import ECPubkey
assert_bytes(sig65, message)

View File

@@ -46,9 +46,11 @@ MAX_TARGET = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
class MissingHeader(Exception):
pass
class InvalidHeader(Exception):
pass
def serialize_header(header_dict: dict) -> bytes:
s = (
int.to_bytes(header_dict['version'], length=4, byteorder="little", signed=False)
@@ -59,6 +61,7 @@ def serialize_header(header_dict: dict) -> bytes:
+ int.to_bytes(int(header_dict['nonce']), length=4, byteorder="little", signed=False))
return s
def deserialize_header(s: bytes, height: int) -> dict:
if not s:
raise InvalidHeader('Invalid header: {}'.format(s))
@@ -74,6 +77,7 @@ def deserialize_header(s: bytes, height: int) -> dict:
h['block_height'] = height
return h
def hash_header(header: dict) -> str:
if header is None:
return '0' * 64
@@ -161,6 +165,7 @@ def read_blockchains(config: 'SimpleConfig'):
def get_best_chain() -> 'Blockchain':
return blockchains[constants.net.GENESIS]
# block hash -> chain work; up to and including that block
_CHAINWORK_CACHE = {
"0000000000000000000000000000000000000000000000000000000000000000": 0, # virtual block at height -1

View File

@@ -37,16 +37,13 @@ from enum import IntEnum
import functools
from aiorpcx import NetAddress
import electrum_ecc as ecc
from electrum_ecc import ECPubkey
from .sql_db import SqlDB, sql
from . import constants, util
from .util import profiler, get_headers_dir, is_ip_address, json_normalize, UserFacingException, is_private_netaddress
from .logging import Logger
from .lntransport import LNPeerAddr
from .lnutil import (format_short_channel_id, ShortChannelID,
validate_features, IncompatibleOrInsaneFeatures, InvalidGossipMsg)
from .lnutil import ShortChannelID, validate_features, IncompatibleOrInsaneFeatures, InvalidGossipMsg
from .lnverifier import LNChannelVerifier, verify_sig_for_channel_update
from .lnmsg import decode_msg
from .crypto import sha256d
@@ -238,6 +235,7 @@ class NodeInfo(NamedTuple):
@staticmethod
def parse_addresses_field(addresses_field):
buf = addresses_field
def read(n):
nonlocal buf
data, buf = buf[0:n], buf[n:]
@@ -286,6 +284,7 @@ class UpdateStatus(IntEnum):
UNCHANGED = 3
GOOD = 4
class CategorizedChannelUpdates(NamedTuple):
orphaned: List # no channel announcement for channel update
expired: List # update older than two weeks
@@ -303,6 +302,7 @@ def get_mychannel_info(short_channel_id: ShortChannelID,
ci = ChannelInfo.from_raw_msg(raw_msg)
return ci._replace(capacity_sat=chan.constraints.capacity)
def get_mychannel_policy(short_channel_id: bytes, node_id: bytes,
my_channels: Dict[ShortChannelID, 'Channel']) -> Optional[Policy]:
chan = my_channels.get(short_channel_id) # type: Optional[Channel]
@@ -583,7 +583,6 @@ class ChannelDB(SqlDB):
unchanged=unchanged,
good=good)
def create_database(self):
c = self.conn.cursor()
c.execute(create_node_info)
@@ -802,6 +801,7 @@ class ChannelDB(SqlDB):
def load_data(self):
if self.data_loaded.is_set():
return
# Note: this method takes several seconds... mostly due to lnmsg.decode_msg being slow.
def maybe_abort():
if self.stopping:
@@ -817,6 +817,7 @@ class ChannelDB(SqlDB):
except Exception:
continue
self._addresses[node_id][net_addr] = int(timestamp or 0)
def newest_ts_for_node_id(node_id):
newest_ts = 0
for addr, ts in self._addresses[node_id].items():

View File

@@ -24,11 +24,11 @@
# SOFTWARE.
from collections import defaultdict
from math import floor, log10
from typing import NamedTuple, List, Callable, Sequence, Union, Dict, Tuple, Mapping, Type, TYPE_CHECKING
from typing import NamedTuple, List, Callable, Sequence, Dict, Tuple, Mapping, Type, TYPE_CHECKING
from decimal import Decimal
from .bitcoin import sha256, COIN, is_address
from .transaction import Transaction, TxOutput, PartialTransaction, PartialTxInput, PartialTxOutput
from .transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput
from .util import NotEnoughFunds
from .logging import Logger
@@ -503,12 +503,14 @@ COIN_CHOOSERS = {
'Privacy': CoinChooserPrivacy,
} # type: Mapping[str, Type[CoinChooserBase]]
def get_name(config: 'SimpleConfig') -> str:
kind = config.WALLET_COIN_CHOOSER_POLICY
if kind not in COIN_CHOOSERS:
kind = config.cv.WALLET_COIN_CHOOSER_POLICY.get_default_value()
return kind
def get_coin_chooser(config: 'SimpleConfig') -> CoinChooserBase:
klass = COIN_CHOOSERS[get_name(config)]
# note: we enable enable_output_value_rounding by default as