Source code for tpm2_pytss.policy

# SPDX-License-Identifier: BSD-2
from .internal.utils import _lib_version_atleast, _chkrc

if not _lib_version_atleast("tss2-policy", "4.0.0"):
    raise NotImplementedError("tss2-policy not installed or version is less then 4.0.0")

from .types import (
    TPM2B_DIGEST,
    TPMS_NV_PUBLIC,
    TPM2B_NAME,
    TPMT_PUBLIC,
    TPM2B_NONCE,
    TSS2_OBJECT,
    TSS2_POLICY_PCR_SELECTION,
    TPM2_HANDLE,
)
from .constants import TPM2_ALG, ESYS_TR, TSS2_RC, TPM2_RC
from .TSS2_Exception import TSS2_Exception
from ._libtpm2_pytss import ffi, lib
from .ESAPI import ESAPI
from enum import Enum
from typing import Callable, Union


class policy_cb_types(Enum):
    """Policy callback types"""

    CALC_PCR = 0
    CALC_NAME = 1
    CALC_PUBLIC = 2
    CALC_NVPUBLIC = 3
    EXEC_AUTH = 4
    EXEC_POLSEL = 5
    EXEC_SIGN = 6
    EXEC_POLAUTH = 7
    EXEC_POLAUTHNV = 8
    EXEC_POLDUP = 9
    EXEC_POLACTION = 10


@ffi.def_extern()
def _policy_cb_calc_pcr(selection, out_selection, out_digest, userdata):
    """Callback wrapper for policy PCR calculations

    Args:
        selection (struct TSS2_POLICY_PCR_SELECTION): in
        out_selection (struct TPML_PCR_SELECTION): out
        out_digest (struct TPML_DIGEST): out
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.CALC_PCR)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        selcopy = ffi.new("TSS2_POLICY_PCR_SELECTION *", selection[0])
        sel = TSS2_POLICY_PCR_SELECTION(_cdata=selcopy)
        cb_selection, cb_digest = cb(sel)
        out_selection.count = cb_selection.count
        for i in range(0, cb_selection.count):
            out_selection.pcrSelections[i] = cb_selection[i]._cdata
        out_digest.count = cb_digest.count
        for i in range(0, cb_digest.count):
            out_digest.digests[i].buffer = cb_digest[i]
            out_digest.digests[i].size = len(cb_digest[i])
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _policy_cb_calc_name(path, name, userdata):
    """Callback wrapper for policy name calculations

    Args:
        path (C string): in
        name (struct TPM2B_DIGEST): out
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.CALC_NAME)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        pth = ffi.string(path)
        cb_name = cb(pth)
        name.size = len(cb_name)
        name.name = bytes(cb_name.name)
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _policy_cb_calc_public(path, public, userdata):
    """Callback wrapper for getting the public part for a key path

    Args:
        path (C string): in
        public (struct TPMT_PUBLIC): out
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.CALC_PUBLIC)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        pth = ffi.string(path)
        cb_public = cb(pth)
        public.type = cb_public.type
        public.nameAlg = cb_public.nameAlg
        public.objectAttributes = cb_public.objectAttributes
        public.authPolicy.buffer = bytes(cb_public.authPolicy)
        public.authPolicy.size = cb_public.authPolicy.size
        public.parameters = cb_public.parameters._cdata
        public.unique = cb_public.unique._cdata
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _policy_cb_calc_nvpublic(path, nv_index, nv_public, userdata):
    """Callback wrapper for getting the public part for a NV path

    Args:
        path (C string or NULL): in
        nv_index (TPMI_RH_NV_INDEX or zero): in
        nv_public (struct TPMS_NV_PUBLIC): out
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.CALC_NVPUBLIC)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        pth = ffi.string(path) if path != ffi.NULL else None
        index = TPM2_HANDLE(nv_index)
        cb_nv_public = cb(pth, index)
        nv_public.nvIndex = cb_nv_public.nvIndex
        nv_public.nameAlg = cb_nv_public.nameAlg
        nv_public.attributes = cb_nv_public.attributes
        nv_public.authPolicy.buffer = bytes(cb_nv_public.authPolicy)
        nv_public.authPolicy.size = cb_nv_public.authPolicy.size
        nv_public.dataSize = cb_nv_public.dataSize
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _policy_cb_exec_auth(name, object_handle, auth_handle, auth_session, userdata):
    """Callback wrapper for getting authorization sessions for a name

    Args:
        name (struct TPM2B_NAME): in
        object_handle (ESYS_TR): out
        auth_handle (ESYS_TR): out
        auth_session (ESYS_TR): out
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.EXEC_AUTH)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        nb = ffi.unpack(name.name, name.size)
        name2b = TPM2B_NAME(nb)
        cb_object_handle, cb_auth_handle, cb_auth_session = cb(name2b)
        object_handle[0] = cb_object_handle
        auth_handle[0] = cb_auth_handle
        auth_session[0] = cb_auth_session
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _policy_cb_exec_polsel(
    auth_object, branch_names, branch_count, branch_idx, userdata
):
    """Callback wrapper selection of a policy branch

    Args:
        auth_object (struct TSS2_OBJECT): in
        branch_names (array of C strings): in
        branch_count (int): in
        branch_idx (int): out
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.EXEC_POLSEL)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        obj = None
        if auth_object:
            obj = TSS2_OBJECT(handle=auth_object.handle)
        branches = list()
        for i in range(0, branch_count):
            branch = ffi.string(branch_names[i])
            branches.append(branch)
        indx = cb(obj, branches)
        branch_idx[0] = indx
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _policy_cb_exec_sign(
    key_pem,
    public_key_hint,
    key_pem_hash_alg,
    buf,
    buf_size,
    signature,
    signature_size,
    userdata,
):
    """Callback wrapper to signing an operation

    Args:
        key_pem (C string): in
        public_key_hint (C string): in
        key_pem_hash_alg (TPMI_ALG_HASH): in
        buf: (uint8_t array): in
        buf_size (size_t): in
        signature (pointer to uint8_t array): out
        signature_size (pointer to size_t): out
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.EXEC_SIGN)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        pem = ffi.string(key_pem)
        key_hint = ffi.string(public_key_hint)
        hash_alg = TPM2_ALG(key_pem_hash_alg)
        b = bytes(ffi.unpack(buf, buf_size))
        cb_signature = cb(pem, key_hint, hash_alg, b)
        signature[0] = ffi.new("char[]", cb_signature)
        signature_size[0] = len(cb_signature)
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _policy_cb_exec_polauth(
    key_public, hash_alg, digest, policy_ref, signature, userdata
):
    """Callback for signing a policy

    Args:
        key_public (struct TPMT_PUBLIC): in
        hash_alg (TPM2_ALG_ID): in
        digest (struct TPM2B_DIGEST): in
        policy_ref (struct TPM2B_NONCE): in
        signature (struct TPMT_SIGNATURE): out
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.EXEC_POLAUTH)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        key_pub = TPMT_PUBLIC(_cdata=key_public)
        halg = TPM2_ALG(hash_alg)
        db = ffi.unpack(digest.buffer, digest.size)
        pb = ffi.unpack(policy_ref.buffer, policy_ref.size)
        dig = TPM2B_DIGEST(db)
        polref = TPM2B_NONCE(pb)
        cb_signature = cb(key_pub, halg, dig, polref)
        signature.sigAlg = cb_signature.sigAlg
        signature.signature = cb_signature.signature._cdata
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _policy_cb_exec_polauthnv(nv_public, hash_alg, userdata):
    """Callback wrapper for NV policy authorization

    Args:
        nv_public (struct TPMS_NV_PUBLIC): in
        hash_alg (TPM2_ALG_ID): in
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.EXEC_POLAUTHNV)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        nvp = TPMS_NV_PUBLIC(nv_public)
        halg = TPM2_ALG(hash_alg)
        cb(nvp, halg)
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _policy_cb_exec_poldup(name, userdata):
    """Callback wrapper to get name for duplication selection

    Args:
        name (struct TPM2B_NAME): out
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.EXEC_POLDUP)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        cb_name = cb()
        name.size = len(cb_name)
        name.name = bytes(cb_name)
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _policy_cb_exec_polaction(action, userdata):
    """Callback wrapper for policy action

    Args:
        action (C string): in
        userdata (ffi handle): in
    """
    pi = ffi.from_handle(userdata)
    cb = pi._get_callback(policy_cb_types.EXEC_POLACTION)
    if not cb:
        return TSS2_RC.POLICY_RC_NULL_CALLBACK
    try:
        ab = ffi.string(action)
        cb(ab)
    except Exception as e:
        rc = (
            e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.POLICY_RC_GENERAL_FAILURE
        )
        pi._callback_exception = e
        return rc
    return TPM2_RC.SUCCESS


[docs] class policy(object): """Initialize policy object. Args: policy (Union(bytes, str]): The JSON policy to calculate or execute. hash_alg (TPM2_ALG): The hash algorithm to use for policy calculations. Returns: An instance of the policy object. This class implements the policy part of the TCG TSS 2.0 JSON Data Types and Policy Language Specification. The specification can be found at https://trustedcomputinggroup.org/resource/tcg-tss-json/ """ def __init__(self, policy: Union[bytes, str], hash_alg: TPM2_ALG): if isinstance(policy, str): policy = policy.encode() self._policy = policy self._hash_alg = hash_alg self._callbacks = dict() self._callback_exception = None self._ctx_pp = ffi.new("TSS2_POLICY_CTX **") _chkrc(lib.Tss2_PolicyInit(policy, hash_alg, self._ctx_pp)) self._ctx = self._ctx_pp[0] self._handle = ffi.new_handle(self) self._calc_callbacks = ffi.new("TSS2_POLICY_CALC_CALLBACKS *") self._exec_callbacks = ffi.new("TSS2_POLICY_EXEC_CALLBACKS *") def __enter__(self): return self def __exit__(self, _type, value, traceback): self.close()
[docs] def close(self): """Finalize the policy instance""" lib.Tss2_PolicyFinalize(self._ctx_pp) self._ctx_pp = ffi.NULL self._ctx = ffi.NULL
@property def policy(self) -> bytes: """bytes: The JSON policy.""" return self._policy @property def hash_alg(self) -> TPM2_ALG: """TPM2_ALG: The hash algorithm to be used during policy calculcation.""" return self._hash_alg def _get_callback(self, callback_type: policy_cb_types) -> Callable: return self._callbacks.get(callback_type)
[docs] def set_callback( self, callback_type: policy_cb_types, callback: Union[None, Callable] ): """Set callback for policy calculaction or execution Args: callback_type (policy_cb_types): Which callback to set or unset. callback (Union[None, Callable]): The callback function to call, or None to remove the callback. Raises: ValueError """ userdata = self._handle if callback is None: userdata = ffi.NULL update_calc = False update_exec = False if callback_type == policy_cb_types.CALC_PCR: self._callbacks[callback_type] = callback self._calc_callbacks.cbpcr = lib._policy_cb_calc_pcr self._calc_callbacks.cbpcr_userdata = userdata update_calc = True elif callback_type == policy_cb_types.CALC_NAME: self._callbacks[callback_type] = callback self._calc_callbacks.cbname = lib._policy_cb_calc_name self._calc_callbacks.cbname_userdata = userdata update_calc = True elif callback_type == policy_cb_types.CALC_PUBLIC: self._callbacks[callback_type] = callback self._calc_callbacks.cbpublic = lib._policy_cb_calc_public self._calc_callbacks.cbpublic_userdata = userdata update_calc = True elif callback_type == policy_cb_types.CALC_NVPUBLIC: self._callbacks[callback_type] = callback self._calc_callbacks.cbnvpublic = lib._policy_cb_calc_nvpublic self._calc_callbacks.cbnvpublic_userdata = userdata update_calc = True elif callback_type == policy_cb_types.EXEC_AUTH: self._callbacks[callback_type] = callback self._exec_callbacks.cbauth = lib._policy_cb_exec_auth self._exec_callbacks.cbauth_userdata = userdata update_exec = True elif callback_type == policy_cb_types.EXEC_POLSEL: self._callbacks[callback_type] = callback self._exec_callbacks.cbpolsel = lib._policy_cb_exec_polsel self._exec_callbacks.cbpolsel_userdata = userdata update_exec = True elif callback_type == policy_cb_types.EXEC_SIGN: self._callbacks[callback_type] = callback self._exec_callbacks.cbsign = lib._policy_cb_exec_sign self._exec_callbacks.cbsign_userdata = userdata update_exec = True elif callback_type == policy_cb_types.EXEC_POLAUTH: self._callbacks[callback_type] = callback self._exec_callbacks.cbauthpol = lib._policy_cb_exec_polauth self._exec_callbacks.cbauthpol_userdata = userdata update_exec = True elif callback_type == policy_cb_types.EXEC_POLAUTHNV: self._callbacks[callback_type] = callback self._exec_callbacks.cbauthnv = lib._policy_cb_exec_polauthnv self._exec_callbacks.cbauthnv_userdata = userdata update_exec = True elif callback_type == policy_cb_types.EXEC_POLDUP: self._callbacks[callback_type] = callback self._exec_callbacks.cbdup = lib._policy_cb_exec_poldup self._exec_callbacks.cbdup_userdata = userdata update_exec = True elif callback_type == policy_cb_types.EXEC_POLACTION: self._callbacks[callback_type] = callback self._exec_callbacks.cbaction = lib._policy_cb_exec_polaction self._exec_callbacks.cbaction_userdata = userdata update_exec = True else: raise ValueError(f"unsupported callback type {callback_type}") if update_calc: _chkrc(lib.Tss2_PolicySetCalcCallbacks(self._ctx, self._calc_callbacks)) elif update_exec: _chkrc(lib.Tss2_PolicySetExecCallbacks(self._ctx, self._exec_callbacks))
[docs] def execute(self, esys_ctx: ESAPI, session: ESYS_TR): """Executes the policy Args: esys_ctx (ESAPI): The ESAPI instance to use during policy execution. session (ESYS_TR): The policy session to use during execution. Raises: TSS2_Exception or any possible exception from a callback function. """ try: _chkrc(lib.Tss2_PolicyExecute(self._ctx, esys_ctx._ctx, session)) except Exception as e: if self._callback_exception is not None: raise self._callback_exception raise e finally: self._callback_exception = None
[docs] def calculate(self): """Calculate the policy Raises: TSS2_Exception """ try: _chkrc(lib.Tss2_PolicyCalculate(self._ctx)) except Exception as e: if self._callback_exception is not None: raise self._callback_exception raise e finally: self._callback_exception = None
[docs] def get_calculated_json(self) -> bytes: """Get the calculated policy as JSON Returns: The calculated JSON policy as bytes Raises: TSS2_Exception """ size = ffi.new("size_t *") _chkrc(lib.Tss2_PolicyGetCalculatedJSON(self._ctx, ffi.NULL, size)) cjson = ffi.new("uint8_t[]", size[0]) _chkrc(lib.Tss2_PolicyGetCalculatedJSON(self._ctx, cjson, size)) return ffi.string(cjson, size[0])
@property def description(self) -> bytes: """bytes: The policy description.""" size = ffi.new("size_t *") _chkrc(lib.Tss2_PolicyGetDescription(self._ctx, ffi.NULL, size)) desc = ffi.new("uint8_t[]", size[0]) _chkrc(lib.Tss2_PolicyGetDescription(self._ctx, desc, size)) return ffi.string(desc, size[0])
[docs] def get_calculated_digest(self) -> TPM2B_DIGEST: """Get the digest of the calculated policy Returns: The digest as a TPM2B_DIGEST. Raises: TSS2_Exception """ dig = ffi.new("TPM2B_DIGEST *") _chkrc(lib.Tss2_PolicyGetCalculatedDigest(self._ctx, dig)) return TPM2B_DIGEST(_cdata=dig[0])