academy.message

ActionRequest pydantic-model

Bases: BaseModel

Agent action request message.

Warning

The positional and keyword arguments for the invoked action are serialized with the strategy given in serialization when the message is serialized to JSON. This can add non-trivial time and space overhead for large arguments.

Config:

  • default: DEFAULT_MUTABLE_CONFIG

Fields:

action pydantic-field

action: str

Name of the requested action.

serialization pydantic-field

serialization: SerializationStrategy

Serialization strategy used to send the arguments.

result_serialization pydantic-field

result_serialization: SerializationStrategy | None = None

Requested serialization of results. If None or empty, results use the same strategy as the arguments.

exception_serialization pydantic-field

exception_serialization: SerializationStrategy | None = None

Requested serialization of exceptions. If None or empty, exceptions use the same strategy as results.

pargs pydantic-field

pargs: SkipValidation[tuple[Any, ...]]

Positional arguments to the action method.

kargs pydantic-field

Keyword arguments to the action method.

get_args

get_args() -> tuple[Any, ...]

Get the positional arguments.

Lazily deserializes and returns the positional arguments. Caches the result to avoid redundant decoding.

Returns:

  • tuple[Any, ...]

    The deserialized tuple of positional arguments.

Source code in academy/message.py
def get_args(self) -> tuple[Any, ...]:
    """Get the positional arguments.

    Lazily deserializes and returns the positional arguments.
    Caches the result to avoid redundant decoding.

    Returns:
        The deserialized tuple of positional arguments.
    """
    if isinstance(self.pargs, str):
        self.pargs = deserialize(self.pargs, self.serialization)
    return self.pargs

get_kwargs

get_kwargs() -> dict[str, Any]

Get the keyword arguments.

Lazily deserializes and returns the keyword arguments. Caches the result to avoid redundant decoding.

Returns:

  • dict[str, Any]

    The deserialized dictionary of keyword arguments.

Source code in academy/message.py
def get_kwargs(self) -> dict[str, Any]:
    """Get the keyword arguments.

    Lazily deserializes and returns the keyword arguments.
    Caches the result to avoid redundant decoding.

    Returns:
        The deserialized dictionary of keyword arguments.
    """
    if isinstance(self.kargs, str):
        self.kargs = deserialize(self.kargs, self.serialization)
    return self.kargs
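
The lazy-decoding pattern behind get_args() and get_kwargs() can be sketched without Academy itself. The example below is a minimal illustration under stated assumptions: LazyRequest, encode, and decode are hypothetical names, and pickle plus base64 stand in for Academy's serialization strategies, which are not reproduced here.

```python
from __future__ import annotations

import base64
import pickle
from dataclasses import dataclass
from typing import Any


def encode(obj: Any) -> str:
    """Hypothetical stand-in for a serializer: pickle, then base64 text."""
    return base64.b64encode(pickle.dumps(obj)).decode('ascii')


def decode(text: str) -> Any:
    """Hypothetical stand-in for the matching deserializer."""
    return pickle.loads(base64.b64decode(text.encode('ascii')))


@dataclass
class LazyRequest:
    """Illustrative only; not the real ActionRequest."""

    action: str
    pargs: Any  # either an encoded str or the decoded tuple
    kargs: Any  # either an encoded str or the decoded dict

    def get_args(self) -> tuple[Any, ...]:
        # Decode on first access, then keep the decoded value on the instance.
        if isinstance(self.pargs, str):
            self.pargs = decode(self.pargs)
        return self.pargs

    def get_kwargs(self) -> dict[str, Any]:
        if isinstance(self.kargs, str):
            self.kargs = decode(self.kargs)
        return self.kargs


request = LazyRequest('add', pargs=encode((1, 2)), kargs=encode({'scale': 3}))
first = request.get_args()
second = request.get_args()  # second call returns the cached tuple
```

Because the decoded value replaces the encoded string on the instance, repeated calls return the same object and the decoding cost is paid once.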

PingRequest pydantic-model

Bases: BaseModel

Agent ping request message.

Config:

  • default: DEFAULT_FROZEN_CONFIG

Fields:

CancelRequest pydantic-model

Bases: BaseModel

Cancel running action.

Config:

  • default: DEFAULT_FROZEN_CONFIG

Fields:

ShutdownRequest pydantic-model

Bases: BaseModel

Agent shutdown request message.

Config:

  • default: DEFAULT_FROZEN_CONFIG

Fields:

terminate pydantic-field

terminate: bool | None = None

Override the termination behavior of the agent.

ActionResponse pydantic-model

Bases: BaseModel

Agent action response message.

Warning

The result is serialized with the strategy given in serialization when the response is serialized to JSON. This can add non-trivial time and space overhead for large results.

Config:

  • default: DEFAULT_MUTABLE_CONFIG

Fields:

result pydantic-field

Result of the action, if successful.

serialization pydantic-field

serialization: SerializationStrategy

Serialization strategy used to send result.

get_result

get_result() -> Any

Get the result.

Lazily deserializes and returns the result of the action. Caches the result to avoid redundant decoding.

Returns:

  • Any

    The deserialized result of the action.

Source code in academy/message.py
def get_result(self) -> Any:
    """Get the result.

    Lazily deserializes and returns the result of the action.
    Caches the result to avoid redundant decoding.

    Returns:
        The deserialized result of the action.
    """
    if (
        isinstance(self.result, list)
        and len(self.result) == 2  # noqa PLR2004
        and self.result[0] == '__serialized__'
    ):
        self.result = deserialize(self.result[1], self.serialization)
    return self.result
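
get_result() recognizes an encoded payload by its two-element ['__serialized__', payload] shape rather than by type alone. One plausible reason is that a result which is itself a string must not be mistaken for serialized data. The sketch below illustrates the idea; wrap and unwrap are hypothetical helpers, and pickle plus base64 stand in for Academy's serialization.

```python
from __future__ import annotations

import base64
import pickle
from typing import Any

SENTINEL = '__serialized__'  # marker string taken from get_result() above


def wrap(result: Any) -> list[str]:
    """Hypothetical helper: tag an encoded payload with the sentinel."""
    payload = base64.b64encode(pickle.dumps(result)).decode('ascii')
    return [SENTINEL, payload]


def unwrap(value: Any) -> Any:
    """Decode only values that match the two-element sentinel form."""
    if isinstance(value, list) and len(value) == 2 and value[0] == SENTINEL:
        return pickle.loads(base64.b64decode(value[1]))
    return value  # already decoded; returned unchanged


plain = unwrap('already a plain string')
decoded = unwrap(wrap({'answer': 42}))
```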

ErrorResponse

Bases: Protocol

Protocol for all error messages.

get_exception

get_exception() -> Exception

Get the exception.

Returns:

  • Exception

    The exception.

Source code in academy/message.py
def get_exception(self) -> Exception:
    """Get the exception.

    Returns:
        The exception.
    """
    ...

ErrorCode

Bases: IntEnum

Error codes returned by requests.

These error codes allow us to return errors without serialization.

AcademyErrorResponse pydantic-model

Bases: BaseModel

Error response created by Academy.

Fields:

error_code pydantic-field

error_code: ErrorCode

Error code.

mailbox_id pydantic-field

mailbox_id: EntityId | None = None

Mailbox ID, if the error requires one.

get_exception

get_exception() -> Exception

Get the exception.

Returns:

  • Exception

    The exception.

Source code in academy/message.py
def get_exception(self) -> Exception:
    """Get the exception.

    Returns:
        The exception.
    """
    match self.error_code:
        case ErrorCode.MAILBOX_TERMINATED:
            assert self.mailbox_id is not None, (
                'Improper error response while decoding exception. '
                'Missing mailbox_id field in MailboxTerminatedError.'
            )
            return MailboxTerminatedError(self.mailbox_id)
        case ErrorCode.PING_CANCELLED:
            return PingCancelledError()
        case ErrorCode.ACTION_INVALID_STATE:
            return ActionInvalidStateError()
        case ErrorCode.ACTION_CANCELLED:
            return ActionCancelledError()
        case ErrorCode.INVALID_CLIENT:
            return TypeError(f'{self.mailbox_id} cannot fulfill requests.')
        case ErrorCode.INCOMPATIBLE_PROTOCOL:
            return IncompatibleNetworkProtocolError(None, PROTOCOL_VERSION)
    raise AssertionError('Unreachable.')
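
Returning an error as an integer code means only a small number crosses the wire, and the receiver rebuilds the exception locally. The following sketch shows that idea in isolation. Every name in it (DemoErrorCode, the Demo exception classes, exception_from_code) is hypothetical, and the numeric values are invented for illustration because Academy's ErrorCode values are not shown on this page.

```python
from __future__ import annotations

from enum import IntEnum


class DemoErrorCode(IntEnum):
    """Hypothetical codes with made-up values."""

    PING_CANCELLED = 1
    ACTION_CANCELLED = 2


class DemoPingCancelledError(Exception):
    """Hypothetical exception type for the sketch."""


class DemoActionCancelledError(Exception):
    """Hypothetical exception type for the sketch."""


_EXCEPTIONS: dict[DemoErrorCode, type[Exception]] = {
    DemoErrorCode.PING_CANCELLED: DemoPingCancelledError,
    DemoErrorCode.ACTION_CANCELLED: DemoActionCancelledError,
}


def exception_from_code(code: int) -> Exception:
    """Rebuild an exception from the integer that travelled on the wire."""
    return _EXCEPTIONS[DemoErrorCode(code)]()


wire_value = int(DemoErrorCode.ACTION_CANCELLED)  # only an int is transmitted
error = exception_from_code(wire_value)
```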

UserErrorResponse pydantic-model

Bases: BaseModel

Error response message.

Contains the exception raised by a failed request.

Config:

  • default: DEFAULT_MUTABLE_CONFIG

Fields:

serialization pydantic-field

serialization: SerializationStrategy

Serialization strategy used to send exception.

exception pydantic-field

Exception of the failed request.

get_exception

get_exception() -> Exception

Get the exception.

Lazily deserializes and returns the exception object. Caches the result to avoid redundant decoding.

Returns:

  • Exception

    The deserialized exception.

Source code in academy/message.py
def get_exception(self) -> Exception:
    """Get the exception.

    Lazily deserializes and returns the exception object.
    Caches the result to avoid redundant decoding.

    Returns:
        The deserialized exception.
    """
    if isinstance(self.exception, str):
        self.exception = deserialize(self.exception, self.serialization)
    return self.exception

SuccessResponse pydantic-model

Bases: BaseModel

Success response message.

Config:

  • default: DEFAULT_FROZEN_CONFIG

Fields:

  • kind (Literal['success-response'])

Header pydantic-model

Bases: BaseModel

Message metadata header.

Contains information about the sender, receiver, and message context.

Config:

  • default: DEFAULT_FROZEN_CONFIG

Fields:

src pydantic-field

src: EntityId

Message source ID.

dest pydantic-field

dest: EntityId

Message destination ID.

tag pydantic-field

tag: UUID

Unique message tag used to match requests and responses.

label pydantic-field

label: UUID | None = None

Optional label used to disambiguate response messages when multiple objects (i.e., handles) share the same mailbox. This is a different usage from the tag.

protocol_version pydantic-field

protocol_version: str | None = str(PROTOCOL_VERSION)

Version of the academy protocol used. Messages within a major version are intended to be mutually compatible, while minor version changes might introduce optional fields or remove existing fields without prohibiting their existence.

create_response_header

create_response_header() -> Self

Create a response header based on the current request header.

Swaps the source and destination, retains the tag and label, and sets the kind to 'response'.

Returns:

  • Self

    A new header instance with reversed roles.

Raises:

  • ValueError

    If the current header is already a response.

Source code in academy/message.py
def create_response_header(self) -> Self:
    """Create a response header based on the current request header.

    Swaps the source and destination, retains the tag and label,
    and sets the kind to 'response'.

    Returns:
        A new header instance with reversed roles.

    Raises:
        ValueError: If the current header is already a response.
    """
    if self.kind == 'response':
        raise ValueError(
            'Cannot create response header from another response header',
        )
    return type(self)(
        tag=self.tag,
        src=self.dest,
        dest=self.src,
        label=self.label,
        kind='response',
    )
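
The source/destination swap can be tried out with a small stand-in. The sketch below is a simplified illustration, assuming a hypothetical DemoHeader dataclass in which entity IDs are plain strings; it is not the pydantic Header model.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoHeader:
    """Illustrative stand-in for Header; entity IDs are plain strings here."""

    src: str
    dest: str
    tag: uuid.UUID
    kind: str  # 'request' or 'response'
    label: uuid.UUID | None = None

    def create_response_header(self) -> DemoHeader:
        if self.kind == 'response':
            raise ValueError(
                'Cannot create response header from another response header',
            )
        # Swap src and dest; keep tag and label so the reply can be matched.
        return DemoHeader(
            src=self.dest,
            dest=self.src,
            tag=self.tag,
            kind='response',
            label=self.label,
        )


request_header = DemoHeader(
    src='client',
    dest='agent',
    tag=uuid.uuid4(),
    kind='request',
)
response_header = request_header.create_response_header()
```

Calling create_response_header() on response_header raises ValueError, mirroring the guard in the method above.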

Message pydantic-model

Bases: BaseModel, Generic[BodyT]

A complete message with header and body.

Wraps a header and a typed request/response body. Supports lazy deserialization of message bodies, metadata access, and convenient construction.

Note

The body value is ignored when testing equality or hashing an instance because the body value could be in either a serialized or deserialized state until get_body() is called.
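
The effect of excluding the body from comparison can be shown with a standard-library dataclass. This is only a sketch of the behavior described in the note: DemoMessage and its (src, dest, tag) header triple are hypothetical, and the real model achieves this through pydantic rather than dataclasses.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(unsafe_hash=True)
class DemoMessage:
    """Illustrative only: equality and hashing use the header, never the body."""

    header: tuple[str, str, str]  # hypothetical (src, dest, tag) triple
    body: Any = field(compare=False)  # excluded from __eq__ and __hash__


# The same logical message, once with an encoded body and once decoded.
encoded = DemoMessage(('agent', 'client', 'tag-1'), body='{"kind": "success-response"}')
decoded = DemoMessage(('agent', 'client', 'tag-1'), body={'kind': 'success-response'})
```

Here encoded == decoded holds and both hash to the same value, so a message keeps its identity in sets and dictionaries whether or not its body has been decoded yet.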

Config:

  • default: DEFAULT_MUTABLE_CONFIG

Fields:

src property

src: EntityId

Message source ID.

dest property

dest: EntityId

Message destination ID.

tag property

tag: UUID

Message tag.

label property

label: UUID | None

Message label.

protocol_version property

protocol_version: str | None

Message protocol version.

create classmethod

create(
    src: EntityId,
    dest: EntityId,
    body: BodyT,
    *,
    label: UUID | None = None,
    tag: UUID | None = None
) -> Message[BodyT]

Create a new message with the specified header and body.

Parameters:

  • src (EntityId) –

    Source entity ID.

  • dest (EntityId) –

    Destination entity ID.

  • body (BodyT) –

    Message body.

  • label (UUID | None, default: None ) –

    Optional label for disambiguation.

  • tag (UUID | None, default: None ) –

    Optional tag for relating responses to requests.

Returns:

  • Message[BodyT]

    A new message instance.

Source code in academy/message.py
@classmethod
def create(
    cls,
    src: EntityId,
    dest: EntityId,
    body: BodyT,
    *,
    label: uuid.UUID | None = None,
    tag: uuid.UUID | None = None,
) -> Message[BodyT]:
    """Create a new message with the specified header and body.

    Args:
        src: Source entity ID.
        dest: Destination entity ID.
        body: Message body.
        label: Optional label for disambiguation.
        tag: Optional tag for relating responses to requests.

    Returns:
        A new message instance.
    """
    if isinstance(body, get_args(Request)):
        kind = 'request'
    elif isinstance(body, get_args(Response)):
        kind = 'response'
    else:
        raise AssertionError('Unreachable.')

    if tag is None:
        tag = uuid.uuid4()

    header = Header(src=src, dest=dest, label=label, kind=kind, tag=tag)
    request: Message[BodyT] = Message(header=header, body=body)
    return request

create_response

create_response(body: ResponseT) -> Message[ResponseT]

Create a response message from this request message.

Parameters:

  • body (ResponseT) –

    Response message body.

Returns:

  • Message[ResponseT]

    A new response message instance.

Raises:

  • ValueError

    If this message is already a response.

Source code in academy/message.py
def create_response(self, body: ResponseT) -> Message[ResponseT]:
    """Create a response message from this request message.

    Args:
        body: Response message body.

    Returns:
        A new response message instance.

    Raises:
        ValueError: If this message is already a response.
    """
    header = self.header.create_response_header()
    response: Message[ResponseT] = Message(header=header, body=body)
    return response
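
Because a response keeps the tag of its request, a sender can pair replies with outstanding requests even when they arrive out of order. The sketch below illustrates that bookkeeping under stated assumptions: the pending table, send_request, and handle_response are hypothetical and do not describe how Academy tracks requests internally.

```python
from __future__ import annotations

import uuid

# Hypothetical client-side table of outstanding requests, keyed by message tag.
pending: dict[uuid.UUID, str] = {}


def send_request(action: str) -> uuid.UUID:
    """Record an outgoing request and return the tag that identifies it."""
    tag = uuid.uuid4()
    pending[tag] = action
    return tag


def handle_response(tag: uuid.UUID, result: object) -> tuple[str, object]:
    """Pair a response with the request that carries the same tag."""
    action = pending.pop(tag)  # KeyError means no matching request
    return action, result


first_tag = send_request('add')
second_tag = send_request('multiply')
# Responses may arrive in any order; the tag decides which request they answer.
matched = handle_response(second_tag, 6)
```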

get_body

get_body() -> BodyT

Return the message body, deserializing if needed.

Lazily deserializes and returns the body object. Caches the body to avoid redundant decoding.

Returns:

  • BodyT

    The deserialized body.

Source code in academy/message.py
def get_body(self) -> BodyT:
    """Return the message body, deserializing if needed.

    Lazily deserializes and returns the body object.
    Caches the body to avoid redundant decoding.

    Returns:
        The deserialized body.
    """
    if isinstance(self.body, get_args(Body)):
        return self.body

    if not check_version(self.protocol_version):
        # If we are deserializing from another protocol version,
        # we expect the validation to fail, but we would like to
        # raise a more informative error.
        raise IncompatibleNetworkProtocolError(
            self.protocol_version,
            PROTOCOL_VERSION,
        )

    adapter: TypeAdapter[BodyT] = TypeAdapter(Body)
    body = (
        adapter.validate_json(self.body)
        if isinstance(self.body, str)
        else adapter.validate_python(self.body)
    )
    self.body = body
    return self.body

is_request

is_request() -> bool

Check if the message is a request.

Source code in academy/message.py
def is_request(self) -> bool:
    """Check if the message is a request."""
    return self.header.kind == 'request'

is_response

is_response() -> bool

Check if the message is a response.

Source code in academy/message.py
def is_response(self) -> bool:
    """Check if the message is a response."""
    return self.header.kind == 'response'

model_deserialize classmethod

model_deserialize(data: bytes) -> Message[BodyT]

Deserialize a message from bytes.

Parameters:

  • data (bytes) –

    The serialized message as bytes.

Returns:

  • Message[BodyT]

    The deserialized message instance.

Source code in academy/message.py
@classmethod
def model_deserialize(cls, data: bytes) -> Message[BodyT]:
    """Deserialize a message from bytes.

    Args:
        data: The serialized message as bytes.

    Returns:
        The deserialized message instance.
    """
    message = cls.model_validate_json(data.decode('utf-8'))
    return message

model_serialize

model_serialize() -> bytes

Serialize the message to bytes.

Returns:

  • bytes

    The serialized message as bytes.

Source code in academy/message.py
def model_serialize(self) -> bytes:
    """Serialize the message to bytes.

    Returns:
        The serialized message as bytes.
    """
    return self.model_dump_json().encode('utf-8')
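
model_serialize() and model_deserialize() are inverses: JSON text encoded as UTF-8 bytes, then decoded and parsed again. The round trip can be sketched with the standard library alone; to_bytes and from_bytes are hypothetical helpers, and a plain dictionary stands in for the pydantic model.

```python
from __future__ import annotations

import json


def to_bytes(message: dict[str, object]) -> bytes:
    """Analogue of model_serialize(): JSON text encoded as UTF-8."""
    return json.dumps(message).encode('utf-8')


def from_bytes(data: bytes) -> dict[str, object]:
    """Analogue of model_deserialize(): UTF-8 decode, then JSON parse."""
    return json.loads(data.decode('utf-8'))


original = {
    'header': {'src': 'agent', 'dest': 'client'},
    'body': {'kind': 'success-response'},
}
restored = from_bytes(to_bytes(original))
```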

log_extra

log_extra() -> dict[str, object]

Returns extra info useful in logs about this Message.

Source code in academy/message.py
def log_extra(self) -> dict[str, object]:
    """Returns extra info useful in logs about this Message."""
    return {
        'academy.message_type': type(self.body).__name__,
        'academy.src': self.src,
        'academy.dest': self.dest,
        'academy.message_tag': self.tag,
        'academy.message_label': self.label,
    }
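
A dictionary shaped like the one returned by log_extra() can be passed to the standard logging module through the extra parameter. The sketch below assumes that usage (this page does not show how Academy itself consumes log_extra()), and the field values are made up.

```python
from __future__ import annotations

import logging

records: list[logging.LogRecord] = []


class ListHandler(logging.Handler):
    """Collects records in memory so the extra fields can be inspected."""

    def emit(self, record: logging.LogRecord) -> None:
        records.append(record)


logger = logging.getLogger('demo.academy.message')
logger.setLevel(logging.INFO)
logger.addHandler(ListHandler())
logger.propagate = False

# Hypothetical values shaped like the dictionary returned by log_extra().
extra = {
    'academy.message_type': 'PingRequest',
    'academy.src': 'client',
    'academy.dest': 'agent',
}
logger.info('Sent message', extra=extra)

# Dotted keys are not valid attribute names, so read them from __dict__.
src = records[0].__dict__['academy.src']
```

Structured-logging handlers and formatters can read these keys from the record in the same way.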

check_version

check_version(version: str | None) -> bool

Checks if a protocol_version is compatible.

Messages within a major version are intended to be mutually compatible, while minor version changes might introduce optional fields or remove existing fields without prohibiting their existence. Local tags may be added for branches/forks and must match exactly.

Parameters:

  • version (str | None) –

The version string to check.

Returns:

  • bool

    True if the version is compatible.

Source code in academy/message.py
def check_version(version: str | None) -> bool:
    """Checks if a `protocol_version` is compatible.

    Messages within a major version are intended to be mutually
    compatible, while minor version changes might introduce
    optional fields or remove existing fields without prohibiting
    their existence. Local tags may be added for branches/forks
    and must match exactly.

    Args:
        version: The version string to check

    Returns:
        True if the version is compatible.
    """
    return (
        version is not None
        and parse(version).major == PROTOCOL_VERSION.major
        and parse(version).local == PROTOCOL_VERSION.local
    )
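
The compatibility rule (same major version, same local tag) can be exercised with a simplified stand-in. The sketch below makes several assumptions: DEMO_PROTOCOL_VERSION is an invented value, and split_version is a hypothetical parser that handles only simple strings such as '1.2' or '1.2+fork', not the full version grammar accepted by the parse function used in the source.

```python
from __future__ import annotations

DEMO_PROTOCOL_VERSION = '1.0'  # hypothetical; the real value is not shown here


def split_version(version: str) -> tuple[int, str | None]:
    """Simplified stand-in for a version parser: return (major, local tag)."""
    public, _, local = version.partition('+')
    major = int(public.split('.')[0])
    return major, (local or None)


def demo_check_version(version: str | None) -> bool:
    """Compatible when the major number and the local tag both match."""
    if version is None:
        return False
    return split_version(version) == split_version(DEMO_PROTOCOL_VERSION)


same_major = demo_check_version('1.7')       # minor differences are tolerated
other_major = demo_check_version('2.0')      # major mismatch is rejected
forked = demo_check_version('1.0+fork')      # local tag must match exactly
missing = demo_check_version(None)           # no version is never compatible
```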