Files
bitwarden-desktop-agent/native_messaging.py

136 lines
4.8 KiB
Python
Raw Normal View History

2026-03-18 17:47:10 +09:00
import base64
import json
import time
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding as asym_padding
import log
2026-03-19 12:42:29 +09:00
from askpass import Prompter
2026-03-18 17:47:10 +09:00
from crypto import SymmetricKey, enc_string_encrypt, enc_string_decrypt, enc_string_to_dict, dict_to_enc_string
2026-03-19 12:42:29 +09:00
from secmem import wipe
2026-03-18 17:47:10 +09:00
from storage import KeyStore
class BiometricBridge:
2026-03-19 12:42:29 +09:00
def __init__(self, store: KeyStore, user_id: str, prompter: Prompter):
2026-03-18 17:47:10 +09:00
self._store = store
self._uid = user_id
2026-03-19 12:42:29 +09:00
self._prompt = prompter
2026-03-18 17:47:10 +09:00
self._sessions: dict[str, SymmetricKey] = {}
def __call__(self, msg: dict) -> dict | None:
app_id = msg.get("appId", "")
message = msg.get("message")
if message is None:
return None
if isinstance(message, dict) and message.get("command") == "setupEncryption":
return self._handshake(app_id, message)
if isinstance(message, dict) and ("encryptedString" in message or "encryptionType" in message):
return self._encrypted(app_id, message)
if isinstance(message, str) and message.startswith("2."):
return self._encrypted(app_id, {"encryptedString": message})
return None
def _handshake(self, app_id: str, msg: dict) -> dict:
pub_bytes = base64.b64decode(msg.get("publicKey", ""))
pub_key = serialization.load_der_public_key(pub_bytes)
shared = SymmetricKey.generate()
self._sessions[app_id] = shared
encrypted = pub_key.encrypt(
2026-03-19 12:42:29 +09:00
bytes(shared.raw),
2026-03-18 17:47:10 +09:00
asym_padding.OAEP(
mgf=asym_padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(), label=None,
),
)
log.info(f"handshake complete, app={app_id[:12]}")
return {
"appId": app_id,
"command": "setupEncryption",
"messageId": -1,
"sharedSecret": base64.b64encode(encrypted).decode(),
}
def _encrypted(self, app_id: str, enc_msg: dict) -> dict | None:
if app_id not in self._sessions:
log.warn(f"no session for app={app_id[:12]}")
return {"appId": app_id, "command": "invalidateEncryption"}
key = self._sessions[app_id]
try:
plaintext = enc_string_decrypt(dict_to_enc_string(enc_msg), key)
except Exception:
log.error("message decryption failed")
return {"appId": app_id, "command": "invalidateEncryption"}
data = json.loads(plaintext)
cmd = data.get("command", "")
mid = data.get("messageId", 0)
log.info(f"<- {cmd} (msg={mid})")
resp = self._dispatch(cmd, mid)
if resp is None:
log.warn(f"unhandled command: {cmd}")
return None
encrypted = enc_string_encrypt(json.dumps(resp), key)
return {"appId": app_id, "messageId": mid, "message": enc_string_to_dict(encrypted)}
2026-03-19 12:42:29 +09:00
def _reply(self, cmd: str, mid: int, **kwargs) -> dict:
return {"command": cmd, "messageId": mid, "timestamp": int(time.time() * 1000), **kwargs}
2026-03-18 17:47:10 +09:00
2026-03-19 12:42:29 +09:00
def _dispatch(self, cmd: str, mid: int) -> dict | None:
handlers = {
"unlockWithBiometricsForUser": self._handle_unlock,
"getBiometricsStatus": self._handle_status,
"getBiometricsStatusForUser": self._handle_status,
"authenticateWithBiometrics": self._handle_auth,
}
handler = handlers.get(cmd)
if handler is None:
return None
return handler(cmd, mid)
def _handle_unlock(self, cmd: str, mid: int) -> dict:
key_b64 = self._unseal_key()
if key_b64 is None:
log.warn("unlock denied or failed")
return self._reply(cmd, mid, response=False)
log.info("-> unlock granted")
resp = self._reply(cmd, mid, response=True, userKeyB64=key_b64)
key_b64 = None
return resp
def _handle_status(self, cmd: str, mid: int) -> dict:
log.info("-> biometrics available")
return self._reply(cmd, mid, response=0)
def _handle_auth(self, cmd: str, mid: int) -> dict:
log.info("-> authenticated")
return self._reply(cmd, mid, response=True)
2026-03-18 17:47:10 +09:00
def _unseal_key(self) -> str | None:
2026-03-19 12:42:29 +09:00
pw = self._prompt(f"Enter {self._store.name} password:")
2026-03-18 17:47:10 +09:00
if pw is None:
2026-03-19 12:42:29 +09:00
log.info("cancelled")
2026-03-18 17:47:10 +09:00
return None
try:
raw = self._store.load(self._uid, pw)
pw = None
2026-03-19 12:42:29 +09:00
b64 = base64.b64encode(bytes(raw)).decode()
if isinstance(raw, bytearray):
wipe(raw)
log.info(f"unsealed {len(raw)}B from {self._store.name}")
2026-03-18 17:47:10 +09:00
return b64
except Exception as e:
log.error(f"unseal failed: {e}")
return None