Source code for tpm2_pytss.FAPI

# SPDX-License-Identifier: BSD-2
from .internal.utils import _lib_version_atleast

if not _lib_version_atleast("tss2-fapi", "3.0.0"):
    raise NotImplementedError("FAPI Not installed or version is not 3.0.0")

import contextlib
import json
import logging
import os
import tempfile
from typing import Any, Callable, List, Optional, Tuple, Union

from ._libtpm2_pytss import ffi, lib
from .fapi_info import FapiInfo
from .internal.utils import _chkrc, _check_bug_fixed, _get_dptr, _to_bytes_or_null
from .TCTI import TCTI
from .types import TPM2B_PUBLIC, TPM2B_PRIVATE
from .constants import TSS2_RC, TPM2_ALG
from .TSS2_Exception import TSS2_Exception

logger = logging.getLogger(__name__)

FAPI_CONFIG_ENV = "TSS2_FAPICONF"
FAPI_CONFIG_PATHS = [
    "/etc/tpm2-tss/fapi-config.json",
    "/usr/local/etc/tpm2-tss/fapi-config.json",
]

_cffi_malloc = ffi.new_allocator(free=None)


class FAPIConfig(contextlib.ExitStack):
    """Context to create a temporary Fapi environment."""

    def __init__(self, config: Optional[dict] = None, temp_dirs: bool = True, **kwargs):
        f"""Create a temporary Fapi environment. Get the fapi_conf in this order:
        * `config` if given
        * File specified with environment variable `{FAPI_CONFIG_ENV}` if defined
        * Installed config at `{FAPI_CONFIG_PATHS}`

        Single entries are overridden if additional named arguments are given
        and/or if `temp_dirs` is True.

        Args:
            config (dict): Fapi configuration to use instead of the installed `fapi-config.json`. Defaults to None.
            temp_dirs (bool): Create temporary keystore and log directories and set the respective config entries. Defaults to True.
            **kwargs: Single configuration entries which override those in `config` or `fapi-config.json`.
        """
        super().__init__()

        self.config_env_backup = None
        self.config_tmp_path = None
        self.config = config

        # Return if no custom fapi config is used
        if not (config is not None or temp_dirs or kwargs):
            return

        if self.config is None:
            # Load the currently active fapi-config.json
            config_path = os.environ.get(FAPI_CONFIG_ENV, None)
            if config_path is None:
                for p in FAPI_CONFIG_PATHS:
                    try:
                        with open(p) as file:
                            self.config = json.load(file)
                            break
                    except FileNotFoundError:
                        # keep trying
                        pass

                if self.config is None:
                    raise RuntimeError(
                        f"Could not find fapi config at {FAPI_CONFIG_PATHS}, "
                        f"set env var {FAPI_CONFIG_ENV}"
                    )
            else:
                with open(config_path) as file:
                    self.config = json.load(file)

        self.config = {**self.config, **kwargs}

        if temp_dirs:
            temp_dir_config = {
                "user_dir": self.enter_context(tempfile.TemporaryDirectory()),
                "system_dir": self.enter_context(tempfile.TemporaryDirectory()),
                "log_dir": self.enter_context(tempfile.TemporaryDirectory()),
            }
            conflicting_keys = [k for k in temp_dir_config.keys() if k in kwargs]
            if conflicting_keys:
                raise ValueError(
                    f"Conflicting config entries from temp_dirs and **kwargs: {conflicting_keys}"
                )

            self.config = {**self.config, **temp_dir_config}

        with tempfile.NamedTemporaryFile(mode="w", delete=False) as fapi_conf_file:
            self.config_tmp_path = fapi_conf_file.name
            fapi_conf_file.write(json.dumps(self.config))

        logger.debug(
            f"Creating FAPIConfig: {self.config_tmp_path}:\n{json.dumps(self.config, indent=4)}"
        )

        # Set fapi config env variable
        self.config_env_backup = os.environ.get(FAPI_CONFIG_ENV, None)
        os.environ[FAPI_CONFIG_ENV] = self.config_tmp_path

    def __exit__(self, exc_type, exc_val, exc_tb):
        super().__exit__(exc_type, exc_val, exc_tb)

        del os.environ[FAPI_CONFIG_ENV]

        if self.config_tmp_path is not None:
            os.unlink(self.config_tmp_path)


class _FAPI_CB_UDATA:
    def __init__(self, cb, udata):
        self.cur_exc = None
        self.udata = udata
        self.cb = cb


@ffi.def_extern()
def _fapi_auth_callback(object_path, description, auth, user_data):
    cb_udata: _FAPI_CB_UDATA = ffi.from_handle(user_data)

    if not cb_udata.cb:
        return TSS2_RC.FAPI_RC_NOT_IMPLEMENTED
    try:

        got_auth: Optional[bytes, str] = cb_udata.cb(
            ffi.string(object_path), ffi.string(description), cb_udata.udata
        )

        auth_bytes = got_auth.decode() if isinstance(got_auth, str) else got_auth
        auth[0] = _cffi_malloc("char[]", auth_bytes)
    except Exception as e:
        rc = e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.FAPI_RC_NOT_IMPLEMENTED
        cb_udata.cur_exc = e
        return rc

    return TSS2_RC.RC_SUCCESS


@ffi.def_extern()
def _fapi_policy_action_callback(object_path, action, user_data):
    cb_udata: _FAPI_CB_UDATA = ffi.from_handle(user_data)

    if not cb_udata.cb:
        return TSS2_RC.FAPI_RC_NOT_IMPLEMENTED
    try:
        cb_udata.cb(
            ffi.string(object_path).decode(),
            ffi.string(action).decode(),
            cb_udata.udata,
        )
    except Exception as e:
        rc = e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.FAPI_RC_NOT_IMPLEMENTED
        cb_udata.cur_exc = e
        return rc

    return TSS2_RC.RC_SUCCESS


@ffi.def_extern()
def _fapi_sign_callback(
    object_path,
    description,
    publickey,
    publickey_hint,
    hashalg,
    data_to_sign,
    data_to_sign_size,
    signature,
    signature_size,
    user_data,
):
    cb_udata: _FAPI_CB_UDATA = ffi.from_handle(user_data)

    if not cb_udata.cb:
        return TSS2_RC.FAPI_RC_NOT_IMPLEMENTED
    try:
        sig: bytes = cb_udata.cb(
            ffi.string(object_path).decode(),
            ffi.string(description).decode(),
            ffi.string(publickey).decode(),
            ffi.string(publickey_hint).decode(),
            TPM2_ALG(hashalg),
            ffi.buffer(data_to_sign, data_to_sign_size)[:],
            cb_udata.udata,
        )
        signature_size[0] = len(sig)
        signature[0] = _cffi_malloc("char[]", sig)
    except Exception as e:
        rc = e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.FAPI_RC_GENERAL_FAILURE
        cb_udata.cur_exc = e
        return rc

    return TSS2_RC.RC_SUCCESS


@ffi.def_extern()
def _fapi_branch_callback(
    object_path, description, branch_names, num_branches, selected_branch, user_data
):
    cb_udata: _FAPI_CB_UDATA = ffi.from_handle(user_data)

    if not cb_udata.cb:
        return TSS2_RC.FAPI_RC_NOT_IMPLEMENTED
    try:

        branch_list = [
            ffi.string(x).decode() for x in ffi.unpack(branch_names, num_branches)
        ]

        position: int = cb_udata.cb(
            ffi.string(object_path).decode(),
            ffi.string(description).decode(),
            branch_list,
            cb_udata.udata,
        )
        selected_branch[0] = position
    except Exception as e:
        rc = e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.FAPI_RC_NOT_IMPLEMENTED
        cb_udata.cur_exc = e
        return rc

    return TSS2_RC.RC_SUCCESS


[docs]class FAPI: """The TPM2 Feature API. This class can be used as a python context or be closed manually via :meth:`~tpm2_pytss.FAPI.close`. """ def __init__(self, uri: Optional[Union[bytes, str]] = None): self.encoding = "utf-8" self._ctx_pp = ffi.new("FAPI_CONTEXT **") uri = _to_bytes_or_null(uri) ret = lib.Fapi_Initialize(self._ctx_pp, uri) _chkrc(ret) self._callback_metadata = {} @property def _ctx(self): """Get the Feature API C context used by the library to hold state. Returns: The Feature API C context. """ return self._ctx_pp[0] def __enter__(self): return self def __exit__(self, _type, value, traceback): self.close()
[docs] def close(self) -> None: """Finalize the Feature API. This frees allocated memory and invalidates the FAPI object.""" lib.Fapi_Finalize(self._ctx_pp)
# TODO flesh out info class @property def version(self): """ Get the tpm2-tss library version. Returns: str: The Feature API C context. """ info = json.loads(self.get_info()) return FapiInfo(info).version @property def config(self): # TODO doc, test info = json.loads(self.get_info()) return FapiInfo(info).fapi_config @property def tcti(self): # TODO doc, test tcti = ffi.new("TSS2_TCTI_CONTEXT **") # returns the actual tcti context, not a copy (so no extra memory is allocated by the fapi) ret = lib.Fapi_GetTcti(self._ctx, tcti) _chkrc(ret) return TCTI(tcti[0])
[docs] def provision( self, auth_value_eh: Optional[Union[bytes, str]] = None, auth_value_sh: Optional[Union[bytes, str]] = None, auth_value_lockout: Optional[Union[bytes, str]] = None, is_provisioned_ok: bool = True, ) -> bool: """Provision the Feature API. Creates the keystore and creates some TPM objects. See also config file `/etc/tpm2-tss/fapi-config.json`. Args: auth_value_eh (bytes or str): Endorsement Hierarchy password. Defaults to None. auth_value_sh (bytes or str): Storage/Owner Hierarchy password. Defaults to None. auth_value_lockout (bytes or str): Lockout Hierarchy password. Defaults to None. is_provisioned_ok (bool): Do not throw a TSS2_Exception if Fapi is already provisioned. Defaults to True. Raises: TSS2_Exception: If Fapi returned an error code. Returns: bool: True if Fapi was provisioned, False otherwise. """ auth_value_eh = _to_bytes_or_null(auth_value_eh) auth_value_sh = _to_bytes_or_null(auth_value_sh) auth_value_lockout = _to_bytes_or_null(auth_value_lockout, allow_null=False) ret = lib.Fapi_Provision( self._ctx, auth_value_eh, auth_value_sh, auth_value_lockout ) _chkrc( ret, acceptable=[lib.TSS2_FAPI_RC_ALREADY_PROVISIONED] if is_provisioned_ok else None, ) return ret == lib.TPM2_RC_SUCCESS
[docs] def get_random(self, num_bytes: int) -> bytes: """Get true random bytes, generated by the TPM. Args: num_bytes (int): Number of bytes to generate. Raises: TSS2_Exception: If Fapi returned an error code. Returns: bytes: The random bytes. """ if num_bytes > 1024: logger.warning( "Requesting a large number of bytes. This may take a while: {num_bytes}" ) data = ffi.new("uint8_t **") ret = lib.Fapi_GetRandom(self._ctx, num_bytes, data) _chkrc(ret) return bytes(ffi.unpack(_get_dptr(data, lib.Fapi_Free), num_bytes))
[docs] def get_info(self) -> str: """Get Fapi information, containing library info, TPM capabilities and more. Raises: TSS2_Exception: If Fapi returned an error code. Returns: str: JSON-encoded info string. """ info = ffi.new("char **") ret = lib.Fapi_GetInfo(self._ctx, info) _chkrc(ret) return ffi.string(_get_dptr(info, lib.Fapi_Free)).decode(self.encoding)
[docs] def list(self, search_path: Optional[Union[bytes, str]] = None) -> List[str]: """Get a list of all Fapi current object paths. Args: search_path (bytes or str): If given, only list children of `search_path`. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. Returns: List[str]: List of all current Fapi object paths. """ search_path = _to_bytes_or_null(search_path, allow_null=False) path_list = ffi.new("char **") ret = lib.Fapi_List(self._ctx, search_path, path_list) _chkrc(ret) return ( ffi.string(_get_dptr(path_list, lib.Fapi_Free)) .decode(self.encoding) .split(":") )
[docs] def create_key( self, path: Union[bytes, str], type_: Optional[Union[bytes, str]] = None, # TODO enum policy_path: Optional[Union[bytes, str]] = None, auth_value: Optional[Union[bytes, str]] = None, exists_ok: bool = False, ) -> bool: """Create a cryptographic key inside the TPM. Args: path (bytes or str): Path to the new key object, e.g. `/HS/SRK/new_signing_key`. type_ (bytes or str): Comma separated list. Possible values: system, sign, decrypt, restricted, exportable, noda, 0x81000000. Defaults to None. auth_value (bytes or str): Password to key. Defaults to None. exists_ok (bool): Do not throw a TSS2_Exception if an object with the given path already exists. Defaults to False. Raises: TSS2_Exception: If Fapi returned an error code. Returns: bool: True if the key was created. False otherwise. """ path = _to_bytes_or_null(path) type_ = _to_bytes_or_null(type_) policy_path = _to_bytes_or_null(policy_path) auth_value = _to_bytes_or_null(auth_value) ret = lib.Fapi_CreateKey(self._ctx, path, type_, policy_path, auth_value) _chkrc( ret, acceptable=lib.TSS2_FAPI_RC_PATH_ALREADY_EXISTS if exists_ok else None ) return ret == lib.TPM2_RC_SUCCESS
[docs] def sign( self, path: Union[bytes, str], digest: bytes, padding: Optional[Union[bytes, str]] = None, # TODO enum ) -> Tuple[bytes, str, str]: """Create a signature over a given digest. Args: path (bytes or str): Path to the signing key. digest (bytes): Digest to sign. padding (bytes or str): `"rsa_ssa"` or `"rsa_pss"`. Defaults to None (using the scheme specified in the crypto profile). Raises: TSS2_Exception: If Fapi returned an error code. Returns: Tuple[bytes, str, str]: (signature (DER), public key (PEM), certificate (PEM)) """ path = _to_bytes_or_null(path) padding = _to_bytes_or_null(padding) # enum digest = _to_bytes_or_null(digest) signature = ffi.new("uint8_t **") signature_size = ffi.new("size_t *") public_key = ffi.new("char **") certificate = ffi.new("char **") ret = lib.Fapi_Sign( self._ctx, path, padding, digest, len(digest), signature, signature_size, public_key, certificate, ) _chkrc(ret) return ( bytes(ffi.unpack(_get_dptr(signature, lib.Fapi_Free), signature_size[0])), ffi.string(_get_dptr(public_key, lib.Fapi_Free)), ffi.string(_get_dptr(certificate, lib.Fapi_Free)), )
[docs] def verify_signature( self, path: Union[bytes, str], digest: bytes, signature: bytes ): """Verify a signature on a given digest. Args: path (bytes or str): Path to the signing key. digest (bytes): Digest which was signed. signature (bytes): Signature to be verified. Raises: TSS2_Exception: If Fapi returned an error code, e.g. if the signature cannot be verified successfully. """ path = _to_bytes_or_null(path) ret = lib.Fapi_VerifySignature( self._ctx, path, digest, len(digest), signature, len(signature) ) _chkrc(ret)
[docs] def encrypt( self, path: Union[bytes, str], plaintext: Union[bytes, str] ) -> bytes: # TODO difference seal/unseal """Encrypt the plaintext and return the ciphertext. Args: path (bytes or str): The decrypt key used for encryption. plaintext (bytes or str): The data to be encrypted. Raises: TSS2_Exception: If Fapi returned an error code. Returns: bytes: The ciphertext. """ _check_bug_fixed( fixed_in="3.2", backports=["2.4.7", "3.0.5", "3.1.1"], details="Faulty free of FAPI Encrypt might lead to Segmentation Fault. See https://github.com/tpm2-software/tpm2-tss/issues/2092", ) path = _to_bytes_or_null(path) plaintext = _to_bytes_or_null(plaintext) ciphertext = ffi.new("uint8_t **") ciphertext_size = ffi.new("size_t *") ret = lib.Fapi_Encrypt( self._ctx, path, plaintext, len(plaintext), ciphertext, ciphertext_size ) _chkrc(ret) return bytes( ffi.unpack(_get_dptr(ciphertext, lib.Fapi_Free), ciphertext_size[0]) )
[docs] def decrypt(self, path: Union[bytes, str], ciphertext: bytes) -> bytes: """Decrypt the ciphertext and return the plaintext. Args: path (bytes or str): The decrypt key used for decryption. ciphertext (bytes or str): The data to be decrypted. Raises: TSS2_Exception: If Fapi returned an error code. Returns: bytes: The plaintext. """ path = _to_bytes_or_null(path) plaintext = ffi.new("uint8_t **") plaintext_size = ffi.new("size_t *") ret = lib.Fapi_Decrypt( self._ctx, path, ciphertext, len(ciphertext), plaintext, plaintext_size ) _chkrc(ret) return bytes(ffi.unpack(plaintext[0], plaintext_size[0]))
[docs] def create_seal( self, path: Union[bytes, str], data: Optional[Union[bytes, str]] = None, type_: Optional[Union[bytes, str]] = None, policy_path: Optional[Union[bytes, str]] = None, auth_value: Optional[Union[bytes, str]] = None, size: Optional[int] = None, exists_ok: bool = False, ) -> bool: """Create a Fapi sealed (= encrypted) object, that is data sealed a Fapi parent key. Oftentimes, the data is a digest. Args: path (bytes or str): The path of the new sealed object. data (bytes or str): Data to be sealed (often a digest). If None, random data will be generated. Defaults to None. type_ (bytes or str): Comma separated list. Possible values: system, sign, decrypt, restricted, exportable, noda, 0x81000000. Defaults to None. policy_path (bytes or str): The path to the policy which will be associated with the sealed object. Defaults to None. auth_value (bytes or str): Password to protect the new sealed object. Defaults to None. size (int): If data is None, random bytes of length size are generated. Parameters data and size cannot be given at the same time. Defaults to None. exists_ok (bool): Do not throw a TSS2_Exception if an object with the given path already exists. Defaults to False. Raises: TSS2_Exception: If Fapi returned an error code. Returns: bool: True if the sealed object was created. False otherwise. """ path = _to_bytes_or_null(path) if data is not None and size is not None: raise ValueError("Parameters data and size cannot be given at same time.") if data is None and size is None: raise ValueError("Either parameter data or parameter size must be given.") if data is None: data_len = size else: data_len = len(data) data = _to_bytes_or_null(data) type_ = _to_bytes_or_null(type_) policy_path = _to_bytes_or_null(policy_path) auth_value = _to_bytes_or_null(auth_value) ret = lib.Fapi_CreateSeal( self._ctx, path, type_, data_len, policy_path, auth_value, data ) _chkrc( ret, acceptable=lib.TSS2_FAPI_RC_PATH_ALREADY_EXISTS if exists_ok else None ) return ret == lib.TPM2_RC_SUCCESS
[docs] def unseal(self, path: Union[bytes, str]) -> bytes: """Unseal a sealed (= encrypted) Fapi object and return the data in plaintext. Args: path (Union[bytes, str]): The path to the sealed object. Raises: TSS2_Exception: If Fapi returned an error code. Returns: bytes: The unsealed data in plaintext. """ path = _to_bytes_or_null(path) data = ffi.new("uint8_t **") data_size = ffi.new("size_t *") ret = lib.Fapi_Unseal(self._ctx, path, data, data_size) _chkrc(ret) return bytes(ffi.unpack(_get_dptr(data, lib.Fapi_Free), data_size[0]))
[docs] def import_object( self, path: Union[bytes, str], import_data: Union[bytes, str], exists_ok: bool = False, ) -> bool: """Import policy, policy template or key into the keystore. Args: path (bytes or str): Path of the future Fapi object. import_data (bytes or str): JSON-encoded data to import. exists_ok (bool): Do not throw a TSS2_Exception if an object with the given path already exists. Defaults to False. Raises: TSS2_Exception: If Fapi returned an error code. Returns: bool: True if the object was imported. False otherwise. """ _check_bug_fixed( fixed_in="3.2", details="FAPI Import will overwrite existing objects with same path silently. See https://github.com/tpm2-software/tpm2-tss/issues/2028", ) path = _to_bytes_or_null(path) import_data = _to_bytes_or_null(import_data) ret = lib.Fapi_Import(self._ctx, path, import_data) _chkrc( ret, acceptable=lib.TSS2_FAPI_RC_PATH_ALREADY_EXISTS if exists_ok else None ) return ret == lib.TPM2_RC_SUCCESS
[docs] def delete(self, path: Union[bytes, str]) -> None: """Delete Fapi object. Args: path (bytes or str): Path to the Fapi object to delete. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) ret = lib.Fapi_Delete(self._ctx, path) _chkrc(ret)
[docs] def change_auth( self, path: Union[bytes, str], auth_value: Optional[Union[bytes, str]] = None ) -> None: """Change the password to a Fapi object. Args: path (bytes or str): Path to the Fapi object. auth_value (bytes or str): New password. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) auth_value = _to_bytes_or_null(auth_value) ret = lib.Fapi_ChangeAuth(self._ctx, path, auth_value) _chkrc(ret)
[docs] def export_key( self, path: Union[bytes, str], new_path: Union[bytes, str] = None ) -> str: """Export a Fapi object as a JSON-encoded string. Args: path (bytes or str): Path to the existing Fapi object. new_path (bytes or str): New path to the Fapi object. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. Returns: str: The exported data. """ path = _to_bytes_or_null(path) new_path = _to_bytes_or_null(new_path) exported_data = ffi.new("char **") ret = lib.Fapi_ExportKey(self._ctx, path, new_path, exported_data) _chkrc(ret) return ffi.string(_get_dptr(exported_data, lib.Fapi_Free)).decode(self.encoding)
[docs] def set_description( self, path: Union[bytes, str], description: Optional[Union[bytes, str]] = None ) -> None: """Set the description of a Fapi object. Args: path (bytes or str): Path to the Fapi object. description (bytes or str): New description of the Fapi object. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) description = _to_bytes_or_null(description) ret = lib.Fapi_SetDescription(self._ctx, path, description) _chkrc(ret)
[docs] def get_description(self, path: Union[bytes, str] = None) -> str: """Get the description of a Fapi object. Args: path (bytes or str): Path to the Fapi object. Raises: TSS2_Exception: If Fapi returned an error code. Returns: str: The description of the Fapi object. """ path = _to_bytes_or_null(path) description = ffi.new("char **") ret = lib.Fapi_GetDescription(self._ctx, path, description) _chkrc(ret) # description is guaranteed to be a null-terminated string return ffi.string(_get_dptr(description, lib.Fapi_Free)).decode()
[docs] def set_app_data( self, path: Union[bytes, str], app_data: Optional[Union[bytes, str]] = None ) -> None: """Add custom application data to a Fapi object. This data is saved alongside the object and can be used by the application. Args: path (bytes or str): Path to the Fapi object. app_data (bytes or str): Custom application data to be associated with the Fapi object. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) if app_data is None: app_data_len = 0 else: app_data_len = len(app_data) app_data = _to_bytes_or_null(app_data) ret = lib.Fapi_SetAppData(self._ctx, path, app_data, app_data_len) _chkrc(ret)
[docs] def get_app_data(self, path: Union[bytes, str]) -> Optional[bytes]: """Get the custom application data of a Fapi object. Args: path (bytes or str): Path to the Fapi object. Raises: TSS2_Exception: If Fapi returned an error code. Returns: Optional[bytes]: The application data or None. """ path = _to_bytes_or_null(path) app_data = ffi.new("uint8_t **") app_data_size = ffi.new("size_t *") ret = lib.Fapi_GetAppData(self._ctx, path, app_data, app_data_size) _chkrc(ret) if app_data[0] == ffi.NULL: return None return bytes(ffi.unpack(_get_dptr(app_data, lib.Fapi_Free), app_data_size[0]))
[docs] def set_certificate( self, path: Union[bytes, str], certificate: Optional[Union[bytes, str]] = None ) -> None: """Add x509 certificate to a Fapi object. This data is saved alongside the object and can be used by the application. Args: path (bytes or str): Path to the Fapi object. certificate (bytes or str): x509 certificate to be associated with the Fapi object. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) certificate = _to_bytes_or_null(certificate) ret = lib.Fapi_SetCertificate(self._ctx, path, certificate) _chkrc(ret)
[docs] def get_certificate(self, path: Union[bytes, str]) -> str: """Get the custom application data of a Fapi object. Args: path (bytes or str): Path to the Fapi object. Raises: TSS2_Exception: If Fapi returned an error code. Returns: bytes: The application data. """ path = _to_bytes_or_null(path) certificate = ffi.new("char **") ret = lib.Fapi_GetCertificate(self._ctx, path, certificate) _chkrc(ret) # certificate is guaranteed to be a null-terminated string return ffi.string(_get_dptr(certificate, lib.Fapi_Free)).decode()
[docs] def get_platform_certificates(self, no_cert_ok: bool = False) -> bytes: """Get the platform certificate and the so-called delta certificates. Args: no_cert_ok (bool): If True, an empty byte string is returned if no certificate is found. If False, in this case a TSS2_Exception is raised. Defaults to False. Raises: TSS2_Exception: If Fapi returned an error code. Returns: bytes: The platform certificates """ _check_bug_fixed( fixed_in="3.2", backports=["2.4.7", "3.0.5", "3.1.1"], details="FAPI Get Platform Certificate might lead wrong sequence errors. See https://github.com/tpm2-software/tpm2-tss/issues/2091", ) # TODO split certificates into list # TODO why bytes? is this DER? certificate = ffi.new("uint8_t **") certificates_size = ffi.new("size_t *") ret = lib.Fapi_GetPlatformCertificates( self._ctx, certificate, certificates_size ) _chkrc(ret, acceptable=lib.TSS2_FAPI_RC_NO_CERT if no_cert_ok else None) if no_cert_ok and ret == lib.TSS2_FAPI_RC_NO_CERT: return b"" return bytes( ffi.unpack(_get_dptr(certificate, lib.Fapi_Free), certificates_size) )
[docs] def get_tpm_blobs( self, path: Union[bytes, str] ) -> Tuple[TPM2B_PUBLIC, TPM2B_PRIVATE, str]: """Get the TPM data blobs and the policy associates with a Fapi object. Args: path (bytes or str): Path to the Fapi object. Raises: TSS2_Exception: If Fapi returned an error code. Returns: Tuple[TPM2B_PUBLIC, TPM2B_PRIVATE, str]: (tpm_2b_public, tpm_2b_private, policy) """ path = _to_bytes_or_null(path) tpm_2b_public = ffi.new("uint8_t **") tpm_2b_public_size = ffi.new("size_t *") tpm_2b_private = ffi.new("uint8_t **") tpm_2b_private_size = ffi.new("size_t *") policy = ffi.new("char **") ret = lib.Fapi_GetTpmBlobs( self._ctx, path, tpm_2b_public, tpm_2b_public_size, tpm_2b_private, tpm_2b_private_size, policy, ) _chkrc(ret) policy_str = ffi.string(policy[0]).decode(self.encoding) tpm_2b_public_buffer = bytes( ffi.buffer(tpm_2b_public[0], tpm_2b_public_size[0]) ) tpm_2b_public_unmarsh, _ = TPM2B_PUBLIC.unmarshal(tpm_2b_public_buffer) tpm_2b_private_buffer = bytes( ffi.buffer(tpm_2b_private[0], tpm_2b_private_size[0]) ) tpm_2b_private_unmarsh, _ = TPM2B_PRIVATE.unmarshal(tpm_2b_private_buffer) return ( tpm_2b_public_unmarsh, tpm_2b_private_unmarsh, policy_str, )
[docs] def get_esys_blob(self, path: Union[bytes, str]) -> Tuple[bytes, Any]: """Return the ESAPI binary blob associated with a Fapi object. This blob can be easily loaded with :meth:`~tpm2_pytss.ESAPI.load_blob()`. Args: path (bytes or str): Path to the Fapi object. Raises: TSS2_Exception: If Fapi returned an error code. Returns: Tuple[bytes, Any]: A tuple of the binary blob and its type (:const:`constants.FAPI_ESYSBLOB.CONTEXTLOAD` or :const:`constants.FAPI_ESYSBLOB.DESERIALIZE`) """ path = _to_bytes_or_null(path) type_ = ffi.new("uint8_t *") data = ffi.new("uint8_t **") length = ffi.new("size_t *") ret = lib.Fapi_GetEsysBlob(self._ctx, path, type_, data, length) _chkrc(ret) return bytes(ffi.unpack(_get_dptr(data, lib.Fapi_Free), length[0])), type_[0]
[docs] def export_policy(self, path: Union[bytes, str]) -> str: """Export a policy from the key store as a JSON-encoded string. Args: path (bytes or str): Path to the FAPI policy. Raises: TSS2_Exception: If Fapi returned an error code. Returns: str: JSON-encoded policy. """ path = _to_bytes_or_null(path) policy = ffi.new("char **") ret = lib.Fapi_ExportPolicy(self._ctx, path, policy) _chkrc(ret) return ffi.string(_get_dptr(policy, lib.Fapi_Free)).decode()
[docs] def authorize_policy( self, policy_path: Union[bytes, str], key_path: Union[bytes, str], policy_ref: Optional[Union[bytes, str]] = None, ): """Specify the underlying policy/policies for a policy Authorize. Args: policy_path (bytes or str): Path to the underlying policy. key_path (bytes or str): Path to the key associated with the policy Authorize. policy_ref (bytes or str): Additional application data (e.g. a reference to another policy). Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. """ policy_path = _to_bytes_or_null(policy_path) key_path = _to_bytes_or_null(key_path) if policy_ref is None: policy_ref_len = 0 else: policy_ref_len = len(policy_ref) policy_ref = _to_bytes_or_null(policy_ref) ret = lib.Fapi_AuthorizePolicy( self._ctx, policy_path, key_path, policy_ref, policy_ref_len ) _chkrc(ret)
[docs] def pcr_read(self, index: int) -> Tuple[bytes, str]: """Read the value of a TPM Platform Configuration Register (PCR) and its associated event log. Args: index (int): Index of the PCR (in the range of 0-23 in most cases). Raises: TSS2_Exception: If Fapi returned an error code. Returns: Tuple[bytes, str]: (pcr_value, event_log) """ value = ffi.new("uint8_t **") value_size = ffi.new("size_t *") log = ffi.new("char **") ret = lib.Fapi_PcrRead(self._ctx, index, value, value_size, log) _chkrc(ret) return ( bytes(ffi.unpack(_get_dptr(value, lib.Fapi_Free), value_size[0])), ffi.string(_get_dptr(log, lib.Fapi_Free)).decode(), )
[docs] def pcr_extend( self, index: int, data: Union[bytes, str], log: Optional[Union[bytes, str]] = None, ) -> None: """Extend the value of a TPM Platform Configuration Register (PCR). The data given by the user and the previous PCR value are hashed together. The resulting digest is stored as the new PCR value. As a result, a PCR value depends on every piece of data given via the extend command (until the PCR is reset). Args: index (int): Index of the PCR (in the range of 0-23 in most cases). data (bytes or str): Input data to the extend operation. log (bytes or str): JSON-encoded event log data. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. Returns: Tuple[bytes, str]: PCR value and its associated event log. """ # TODO "extend", formula in doc log = _to_bytes_or_null(log) data = _to_bytes_or_null(data) ret = lib.Fapi_PcrExtend(self._ctx, index, data, len(data), log) _chkrc(ret)
[docs] def quote( self, path: Union[bytes, str], pcrs: List[int], quote_type: Optional[Union[bytes, str]] = None, qualifying_data: Optional[Union[bytes, str]] = None, ) -> Tuple[str, bytes, str, str]: """Create a TPM quote, that is a signed data structure of the TPM Platform Configuration Registers (PCRs), reset count, firmware version and more. Args: path (bytes or str): Path to the key used for signing. pcrs (List[int]): List of PCR indices to be included in the quote. quote_type (bytes or str): Type of quote to create. The default "TPM-Quote" is used if None is given. Defaults to None. qualifying_data (bytes or str): Additional application-defined data. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. Returns: Tuple[str, bytes, str, str]: info, signature, pcr_log, certificate """ _check_bug_fixed( fixed_in="3.2", backports=["2.4.7", "3.0.5", "3.1.1"], details="Multiple calls of FAPI Quote might lead to TPM out of memory errors. See https://github.com/tpm2-software/tpm2-tss/issues/2084", ) path = _to_bytes_or_null(path) quote_type = _to_bytes_or_null(quote_type) if qualifying_data is None: qualifying_data_len = 0 else: qualifying_data_len = len(qualifying_data) qualifying_data = _to_bytes_or_null(qualifying_data) quote_info = ffi.new("char **") signature = ffi.new("uint8_t **") signature_len = ffi.new("size_t *") pcr_log = ffi.new("char **") certificate = ffi.new("char **") ret = lib.Fapi_Quote( self._ctx, pcrs, len(pcrs), path, quote_type, qualifying_data, qualifying_data_len, quote_info, signature, signature_len, pcr_log, certificate, ) _chkrc(ret) return ( ffi.string(_get_dptr(quote_info, lib.Fapi_Free)).decode(), bytes(ffi.unpack(_get_dptr(signature, lib.Fapi_Free), signature_len[0])), ffi.string(_get_dptr(pcr_log, lib.Fapi_Free)).decode(), ffi.string( _get_dptr(certificate, lib.Fapi_Free) or ffi.new("char *") ).decode(), )
[docs] def verify_quote( self, path: Union[bytes, str], signature: bytes, quote_info: Union[bytes, str], qualifying_data: Optional[Union[bytes, str]] = None, pcr_log: Optional[Union[bytes, str]] = None, ): """Verify the signature to a TPM quote. Args: path (bytes or str): Path to the key used for verifying the signature. signature (bytes): Signature to the quote. quote_info (bytes or str): Quote info structure. qualifying_data (bytes or str): Additional application-defined data. Defaults to None. pcr_log (bytes or str): JSON-encoded PCR log entry. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) signature = _to_bytes_or_null(signature) if qualifying_data is None: qualifying_data_len = 0 else: qualifying_data_len = len(qualifying_data) qualifying_data = _to_bytes_or_null(qualifying_data) quote_info = _to_bytes_or_null(quote_info) pcr_log = _to_bytes_or_null(pcr_log) ret = lib.Fapi_VerifyQuote( self._ctx, path, qualifying_data, qualifying_data_len, quote_info, signature, len(signature), pcr_log, ) _chkrc(ret)
[docs] def create_nv( self, path: Union[bytes, str], size: int, type_: Optional[Union[bytes, str]] = None, policy_path: Optional[Union[bytes, str]] = None, auth_value: Optional[Union[bytes, str]] = None, ) -> None: """Create non-volatile (NV) storage on the TPM. Args: path (bytes or str): Path to the NV storage area. size (int): Size of the storage area in bytes. type_ (bytes or str): Type of the storage area. A combination of `bitfield`, `counter`, `pcr`, `system`, `noda`. Defaults to None. policy_path (bytes or str): The path to the policy which will be associated with the storage area. Defaults to None. auth_value (bytes or str): Password to protect the new storage area. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) type_ = _to_bytes_or_null(type_) policy_path = _to_bytes_or_null(policy_path) auth_value = _to_bytes_or_null(auth_value) ret = lib.Fapi_CreateNv(self._ctx, path, type_, size, policy_path, auth_value) _chkrc(ret)
[docs] def nv_read(self, path: Union[bytes, str]) -> Tuple[bytes, str]: """Read from non-volatile (NV) TPM storage. Args: path (bytes or str): Path to the NV storage area. Raises: TSS2_Exception: If Fapi returned an error code. Returns: Tuple[bytes, str]: Data stored in the NV storage area and its associated event log. """ path = _to_bytes_or_null(path) data = ffi.new("uint8_t **") data_size = ffi.new("size_t *") log = ffi.new("char **") ret = lib.Fapi_NvRead(self._ctx, path, data, data_size, log) _chkrc(ret) return ( bytes(ffi.unpack(_get_dptr(data, lib.Fapi_Free), data_size[0])), ffi.string(_get_dptr(log, lib.Fapi_Free)).decode(), )
[docs] def nv_write(self, path: Union[bytes, str], data: Union[bytes, str]) -> None: """Write data to a non-volatile (NV) TPM storage and the associated event log. Args: path (bytes or str): Path to the NV storage area. data (bytes or str): Data to write to the NV storage area. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) data = _to_bytes_or_null(data) ret = lib.Fapi_NvWrite(self._ctx, path, data, len(data)) _chkrc(ret)
[docs] def nv_extend( self, path: Union[bytes, str], data: Union[bytes, str], log: Optional[Union[bytes, str]] = None, ) -> None: """Perform an extend operation on a non-volatile TPM storage area. Requires an NV object of type `pcr`. For more information on the extend operation, see :meth:`~tpm2_pytss.FAPI.pcr_extend`. Args: path (bytes or str): Path to the NV storage area. data (bytes or str): Input data to the extend operation. log (bytes or str): JSON-encoded event to be associated with this change. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) data = _to_bytes_or_null(data) log = _to_bytes_or_null(log) ret = lib.Fapi_NvExtend(self._ctx, path, data, len(data), log) _chkrc(ret)
[docs] def nv_increment(self, path: Union[bytes, str]) -> None: """Increment the counter value stored in non-volatile (NV) TPM storage. Args: path (bytes or str): Path to the NV storage area. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) ret = lib.Fapi_NvIncrement(self._ctx, path) _chkrc(ret)
[docs] def nv_set_bits(self, path: Union[bytes, str], bitmap: int) -> None: """Set bits of bitfielad, stored in non-volatile (NV) TPM storage. Args: path (bytes or str): Path to the NV storage area. bitmap (int): Bits to set in the NV storage area. Raises: TSS2_Exception: If Fapi returned an error code. """ path = _to_bytes_or_null(path) ret = lib.Fapi_NvSetBits(self._ctx, path, bitmap) _chkrc(ret)
[docs] def write_authorize_nv( self, nv_path: Union[bytes, str], policy_path: Union[bytes, str] ) -> None: """Write a policy to non-volatile (NV) TPM storage. Args: nv_path (bytes or str): Path to the NV storage area. policy_path (bytes or str): Path to the policy to be written. Raises: TSS2_Exception: If Fapi returned an error code. """ nv_path = _to_bytes_or_null(nv_path) policy_path = _to_bytes_or_null(policy_path) ret = lib.Fapi_WriteAuthorizeNv(self._ctx, nv_path, policy_path) _chkrc(ret)
[docs] def set_auth_callback( self, callback: Optional[ Callable[[str, str, Optional[Any]], Union[bytes, str]] ] = None, user_data: Optional[Any] = None, ) -> None: """Register a callback that provides the password for Fapi objects when needed. Typically, this callback implements a password prompt. If `callback` is None, the callback function is reset. Args: callback (Optional[Callable[[str, str, Optional[Any]], Union[bytes, str]]]): A callback function `callback(path, description, user_data=None)` which returns the password (:class:`str`). Defaults to None. user_data (Any): Bytes that will be handed to the callback. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. RuntimeError: If callback is None and user_data is NOT None. """ if callback is None and user_data is not None: raise RuntimeError("If callback is None, user_data must be None, too.") if callback: cb_udata = _FAPI_CB_UDATA(callback, user_data) cb_user_data_handle = ffi.new_handle(cb_udata) # keep this alive, overwrite old so it be gc'd self._callback_metadata["auth"] = cb_user_data_handle lib_callback = lib._fapi_auth_callback else: lib_callback = ffi.NULL cb_user_data_handle = ffi.NULL self._callback_metadata["auth"] = None ret = lib.Fapi_SetAuthCB(self._ctx, lib_callback, cb_user_data_handle) _chkrc(ret)
[docs] def set_branch_callback( self, callback: Optional[Callable[[str, str, List[str], Optional[Any]], int]] = None, user_data: Optional[Any] = None, ): """Set the Fapi policy branch callback, called to decide which policy path to take in a policy Or. If `callback` is None, the callback function is reset. Args: callback (Callable[[str, str, List[str], Optional[bytes]], int]): A callback function `callback(path, description, branch_names, user_data=None)` which returns the index (:class:`int`) of the selected branch in `branch_names`. Defaults to None. user_data (bytes or str): Custom data passed to the callback function. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. RuntimeError: If callback is None and user_data is NOT None. """ if callback is None: _check_bug_fixed( fixed_in="3.1", backports=["2.4.3", "3.0.1", "3.1.0"], details="A NULL FAPI SetBranchCallback Action might lead to crashes. See https://github.com/tpm2-software/tpm2-tss/pull/2500", ) if callback is None and user_data is not None: raise RuntimeError("If callback is None, user_data must be None, too.") if callback: cb_udata = _FAPI_CB_UDATA(callback, user_data) cb_user_data_handle = ffi.new_handle(cb_udata) # keep this alive, overwrite old so it be gc'd self._callback_metadata["branch"] = cb_user_data_handle lib_callback = lib._fapi_branch_callback else: lib_callback = ffi.NULL cb_user_data_handle = ffi.NULL self._callback_metadata["branch"] = None ret = lib.Fapi_SetBranchCB(self._ctx, lib_callback, cb_user_data_handle) _chkrc(ret)
[docs] def set_sign_callback( self, callback: Optional[ Callable[[str, str, str, str, int, bytes, Optional[Any]], bytes] ] = None, user_data: Optional[Any] = None, ): """Set the Fapi signing callback which is called to satisfy the policy Signed. If `callback` is None, the callback function is reset. Args: callback (Callable[[str, str, str, str, int, bytes, Optional[bytes]], bytes]): A callback function `callback(path, description, public_key, public_key_hint, hash_alg, data_to_sign, user_data=None)` which returns a signature (:class:`bytes`) of `data_to_sign`. Defaults to None. user_data (Any): Custom data passed to the callback function. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. RuntimeError: If callback is None and user_data is NOT None. """ _check_bug_fixed( fixed_in="3.2", details="FAPI PolicySigned default nameAlg might be SHA1 unexpectedly. See https://github.com/tpm2-software/tpm2-tss/issues/2080. Fixed in https://github.com/tpm2-software/tpm2-tss/commit/b843960b6e601a786b469832392dc0a12e13cf34", ) if callback is None and user_data is not None: raise RuntimeError("If callback is None, user_data must be None, too.") if callback: cb_udata = _FAPI_CB_UDATA(callback, user_data) cb_user_data_handle = ffi.new_handle(cb_udata) # keep this alive, overwrite old so it be gc'd self._callback_metadata["sign"] = cb_user_data_handle lib_callback = lib._fapi_sign_callback else: lib_callback = ffi.NULL cb_user_data_handle = ffi.NULL self._callback_metadata["sign"] = None ret = lib.Fapi_SetSignCB(self._ctx, lib_callback, cb_user_data_handle) _chkrc(ret)
[docs] def set_policy_action_callback( self, callback: Optional[Callable[[str, str, Optional[bytes]], None]] = None, user_data: Optional[Any] = None, ): """Set the policy Action callback which is called to satisfy the policy Action. If `callback` is None, the callback function is reset. Args: callback (Callable[[str, str, Optional[bytes]], None]): A callback function `callback(path, action, user_data=None)`. Defaults to None. user_data (Any): Custom data passed to the callback function. Defaults to None. Raises: TSS2_Exception: If Fapi returned an error code. RuntimeError: If callback is None and user_data is NOT None. """ _check_bug_fixed( fixed_in="3.2", backports=["2.4.7", "3.0.5", "3.1.1"], details="FAPI Policy Action might lead to crashes. See https://github.com/tpm2-software/tpm2-tss/issues/2089", ) if callback is None and user_data is not None: raise RuntimeError("If callback is None, user_data must be None, too.") if callback: cb_udata = _FAPI_CB_UDATA(callback, user_data) cb_user_data_handle = ffi.new_handle(cb_udata) # keep this alive, overwrite old so it be gc'd self._callback_metadata["policy"] = cb_user_data_handle lib_callback = lib._fapi_policy_action_callback else: lib_callback = ffi.NULL cb_user_data_handle = ffi.NULL self._callback_metadata["policy"] = None ret = lib.Fapi_SetPolicyActionCB(self._ctx, lib_callback, cb_user_data_handle) _chkrc(ret)