Source code for tpm2_pytss.TCTISPIHelper

# SPDX-License-Identifier: BSD-2

from .internal.utils import _chkrc, _lib_version_atleast
from ._libtpm2_pytss import ffi, lib
from .constants import TSS2_RC, TPM2_RC
from .TSS2_Exception import TSS2_Exception
from .TCTI import TCTI

if not _lib_version_atleast("tss2-tcti-spi-helper", "0.0.0"):
    raise NotImplementedError("Package tss2-tcti-spi-helper not present")


@ffi.def_extern()
def _tcti_spi_helper_sleep_ms(userdata, milliseconds):
    thiz = TCTISPIHelper._cffi_cast(userdata)
    if not hasattr(thiz, "on_sleep_ms"):
        return TSS2_RC.TCTI_RC_NOT_IMPLEMENTED
    try:
        thiz.on_sleep_ms(milliseconds)
    except Exception as e:
        rc = e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.TCTI_RC_GENERAL_FAILURE
        thiz._set_last_exception(e)
        return rc

    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _tcti_spi_helper_start_timeout(userdata, milliseconds):
    thiz = TCTISPIHelper._cffi_cast(userdata)
    if not hasattr(thiz, "on_start_timeout"):
        return TSS2_RC.TCTI_RC_NOT_IMPLEMENTED
    try:
        thiz.on_start_timeout(milliseconds)
    except Exception as e:
        rc = e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.TCTI_RC_GENERAL_FAILURE
        thiz._set_last_exception(e)
        return rc

    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _tcti_spi_helper_timeout_expired(userdata, is_time_expired) -> bool:

    thiz = TCTISPIHelper._cffi_cast(userdata)
    if not hasattr(thiz, "on_timeout_expired"):
        return TSS2_RC.TCTI_RC_NOT_IMPLEMENTED
    try:
        result = thiz.on_timeout_expired()
        is_time_expired[0] = result
    except Exception as e:
        rc = e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.TCTI_RC_GENERAL_FAILURE
        thiz._set_last_exception(e)
        return rc

    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _tcti_spi_helper_spi_acquire(userdata):
    thiz = TCTISPIHelper._cffi_cast(userdata)
    if not hasattr(thiz, "on_start_timeout"):
        return TSS2_RC.TCTI_RC_NOT_IMPLEMENTED
    try:
        thiz.on_spi_acquire()
    except Exception as e:
        rc = e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.TCTI_RC_GENERAL_FAILURE
        thiz._set_last_exception(e)
        return rc

    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _tcti_spi_helper_spi_release(userdata):
    thiz = TCTISPIHelper._cffi_cast(userdata)
    if not hasattr(thiz, "on_spi_release"):
        return TSS2_RC.TCTI_RC_NOT_IMPLEMENTED
    try:
        thiz.on_spi_release()
    except Exception as e:
        rc = e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.TCTI_RC_GENERAL_FAILURE
        thiz._set_last_exception(e)
        return rc

    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _tcti_spi_helper_spi_transfer(userdata, data_out, data_in, cnt):
    thiz = TCTISPIHelper._cffi_cast(userdata)
    if not hasattr(thiz, "on_spi_transfer"):
        return TSS2_RC.TCTI_RC_NOT_IMPLEMENTED
    try:
        # setup for the transaction
        dout = None if data_out == ffi.NULL else bytes(ffi.buffer(data_out, cnt))

        # call for transaction
        data_got = thiz.on_spi_transfer(dout)

        # handle response should None be OK?
        if data_got is None and data_in != ffi.NULL:
            raise RuntimeError("Response data CANNOT be None")
        elif data_got is None and data_in == ffi.NULL:
            return TPM2_RC.SUCCESS

        # current interface is hardcoded to full duplex, so input
        # must equal output
        if len(data_got) != cnt:
            raise ValueError(
                f"Transactions is expected to be {cnt} bytes, got {len(data_got)} bytes"
            )

        # copy the data
        raw_data_got = ffi.from_buffer(data_got)
        ffi.memmove(data_in, raw_data_got, len(data_got))

    except Exception as e:
        rc = e.rc if isinstance(e, TSS2_Exception) else TSS2_RC.TCTI_RC_GENERAL_FAILURE
        thiz._set_last_exception(e)
        return rc

    return TPM2_RC.SUCCESS


@ffi.def_extern()
def _tcti_spi_helper_finalize(userdata):
    thiz = TCTISPIHelper._cffi_cast(userdata)
    if hasattr(thiz, "on_finalize"):
        thiz.on_finalize(thiz)


[docs] class TCTISPIHelper(TCTI): """The TCTI for interacting with SPI devices. Users should *extend* a TCTISPIHelper object and implement the following callbacks: All Users: - on_sleep_ms - on_start_timeout - on_timeout_expired - on_spi_transfer with_wait_state=true: - on_spi_acquire - on_spi_release Optional: - on_finalize Args: with_wait_state (bool): True if you intend to use wait states. Defaults to False. """ def __init__(self, with_wait_state=False): self._with_wait_state = with_wait_state size = ffi.new("size_t *") self._callbacks = ffi.new("TSS2_TCTI_SPI_HELPER_PLATFORM *") self._callbacks.sleep_ms = lib._tcti_spi_helper_sleep_ms self._callbacks.start_timeout = lib._tcti_spi_helper_start_timeout self._callbacks.timeout_expired = lib._tcti_spi_helper_timeout_expired self._callbacks.spi_transfer = lib._tcti_spi_helper_spi_transfer self._callbacks.finalize = lib._tcti_spi_helper_finalize self._callbacks.user_data = self._thiz = ffi.new_handle(self) missing_implementation = [] if self._with_wait_state: self._callbacks.spi_acquire = lib._tcti_spi_helper_spi_acquire self._callbacks.spi_release = lib._tcti_spi_helper_spi_release if "TCTISPIHelper.on_spi_acquire" in str(self.on_spi_acquire): missing_implementation.append("on_spi_acquire") if "TCTISPIHelper.on_spi_release" in str(self.on_spi_release): missing_implementation.append("on_spi_release") if "TCTISPIHelper.on_spi_transfer" in str(self.on_spi_transfer): missing_implementation.append("on_spi_transfer") if "TCTISPIHelper.on_timeout_expired" in str(self.on_timeout_expired): missing_implementation.append("on_timeout_expired") if "TCTISPIHelper.on_start_timeout" in str(self.on_start_timeout): missing_implementation.append("on_start_timeout") if "TCTISPIHelper.on_sleep_ms" in str(self.on_sleep_ms): missing_implementation.append("on_sleep_ms") if len(missing_implementation) > 0: raise NotImplementedError( f"Subclasses must implement {','.join(missing_implementation)}" ) _chkrc(lib.Tss2_Tcti_Spi_Helper_Init(ffi.NULL, size, self._callbacks)) self._tcti_mem = ffi.new(f"uint8_t [{size[0]}]") self._opaque_tcti_ctx = ffi.cast("TSS2_TCTI_CONTEXT *", self._tcti_mem) try: self._clear_exceptions() _chkrc( lib.Tss2_Tcti_Spi_Helper_Init( self._opaque_tcti_ctx, size, self._callbacks ) ) except Exception as e: e = self._get_current_exception(e) self._clear_exceptions() raise e super().__init__(self._opaque_tcti_ctx) @property def waitstate(self): """Gets the wait state property. Returns(bool): True if this TCTI implements wait states, false otherwise. """ return self._with_wait_state @staticmethod def _cffi_cast(userdata): return ffi.from_handle(userdata)
[docs] def on_sleep_ms(self, milliseconds: int) -> None: """Sleeps for a specified amount of time in millisecons. This callback is REQUIRED. Args: milliseconds(int): The time to sleep. Raises: Exception: Implementations are free to raise any Exception. Exceptions are retained across the native boundary. """ pass
[docs] def on_start_timeout(self, milliseconds: int) -> None: """Called when a timeout is occurring with the sleep duration in millisecons. This callback is REQUIRED. Args: milliseconds(int): The time to sleep. Raises: Exception: Implementations are free to raise any Exception. Exceptions are retained across the native boundary. """ pass
[docs] def on_timeout_expired(self) -> bool: """Called to determine if a timeout is expired. This callback is REQUIRED. No errors may occur across this boundary. """ pass
[docs] def on_spi_transfer(self, data_in: bytes) -> bytes: """Called to transfer data across the SPI bus. This callback is REQUIRED. Args: data_in(bytes): The data to send. Returns(bytes): The bytes to send. Raises: Exception: Implementations are free to raise any Exception. Exceptions are retained across the native boundary. """ pass
[docs] def on_finalize(self) -> None: """Called when the TCTI is finalized. This callback is OPTIONAL. Raises: Exception: Implementations are free to raise any Exception. Exceptions are retained across the native boundary. """ pass
[docs] def on_spi_acquire(self) -> None: """Called when the SPI bus needs to be acquired for wait states. This callback is REQUIRED for WAIT STATES. No errors may occur across this boundary. """ pass
[docs] def on_spi_release(self) -> None: """Called when the SPI bus needs to be released for wait states. This callback is REQUIRED for WAIT STATES. Raises: Exception: Implementations are free to raise any Exception. Exceptions are retained across the native boundary. """ pass