add encryption version to channel backups
This commit is contained in:
@@ -189,23 +189,21 @@ def _hash_password(password: Union[bytes, str], *, version: int) -> bytes:
|
||||
raise UnexpectedPasswordHashVersion(version)
|
||||
|
||||
|
||||
def pw_encode_bytes(data: bytes, password: Union[bytes, str], *, version: int) -> str:
|
||||
"""plaintext bytes -> base64 ciphertext"""
|
||||
def pw_encode_raw(data: bytes, password: Union[bytes, str], *, version: int) -> str:
|
||||
"""bytes -> bytes"""
|
||||
if version not in KNOWN_PW_HASH_VERSIONS:
|
||||
raise UnexpectedPasswordHashVersion(version)
|
||||
# derive key from password
|
||||
secret = _hash_password(password, version=version)
|
||||
# encrypt given data
|
||||
ciphertext = EncodeAES_bytes(secret, data)
|
||||
ciphertext_b64 = base64.b64encode(ciphertext)
|
||||
return ciphertext_b64.decode('utf8')
|
||||
return ciphertext
|
||||
|
||||
|
||||
def pw_decode_bytes(data: str, password: Union[bytes, str], *, version: int) -> bytes:
|
||||
"""base64 ciphertext -> plaintext bytes"""
|
||||
def pw_decode_raw(data_bytes: bytes, password: Union[bytes, str], *, version: int) -> bytes:
|
||||
"""bytes -> bytes"""
|
||||
if version not in KNOWN_PW_HASH_VERSIONS:
|
||||
raise UnexpectedPasswordHashVersion(version)
|
||||
data_bytes = bytes(base64.b64decode(data))
|
||||
# derive key from password
|
||||
secret = _hash_password(password, version=version)
|
||||
# decrypt given data
|
||||
@@ -216,6 +214,38 @@ def pw_decode_bytes(data: str, password: Union[bytes, str], *, version: int) ->
|
||||
return d
|
||||
|
||||
|
||||
def pw_encode_bytes(data: bytes, password: Union[bytes, str], *, version: int) -> str:
|
||||
"""plaintext bytes -> base64 ciphertext"""
|
||||
ciphertext = pw_encode_raw(data, password, version=version)
|
||||
ciphertext_b64 = base64.b64encode(ciphertext)
|
||||
return ciphertext_b64.decode('utf8')
|
||||
|
||||
|
||||
def pw_decode_bytes(data: str, password: Union[bytes, str], *, version:int) -> bytes:
|
||||
"""base64 ciphertext -> plaintext bytes"""
|
||||
if version not in KNOWN_PW_HASH_VERSIONS:
|
||||
raise UnexpectedPasswordHashVersion(version)
|
||||
data_bytes = bytes(base64.b64decode(data))
|
||||
return pw_decode_raw(data_bytes, password, version=version)
|
||||
|
||||
|
||||
def pw_encode_b64_with_version(data: bytes, password: Union[bytes, str]) -> str:
|
||||
"""plaintext bytes -> base64 ciphertext"""
|
||||
version = PW_HASH_VERSION_LATEST
|
||||
ciphertext = pw_encode_raw(data, password, version=version)
|
||||
ciphertext_b64 = base64.b64encode(bytes([version]) + ciphertext)
|
||||
return ciphertext_b64.decode('utf8')
|
||||
|
||||
|
||||
def pw_decode_b64_with_version(data: str, password: Union[bytes, str]) -> bytes:
|
||||
"""base64 ciphertext -> plaintext bytes"""
|
||||
data_bytes = bytes(base64.b64decode(data))
|
||||
version = int(data_bytes[0])
|
||||
if version not in KNOWN_PW_HASH_VERSIONS:
|
||||
raise UnexpectedPasswordHashVersion(version)
|
||||
return pw_decode_raw(data_bytes[1:], password, version=version)
|
||||
|
||||
|
||||
def pw_encode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
|
||||
"""plaintext str -> base64 ciphertext"""
|
||||
if not password:
|
||||
|
||||
Reference in New Issue
Block a user