academy.exchange.transport
AgentRegistrationT
module-attribute
AgentRegistrationT = TypeVar(
"AgentRegistrationT", bound=AgentRegistration[Any]
)
Type variable bound to AgentRegistration.
ExchangeTransportT
module-attribute
ExchangeTransportT = TypeVar(
"ExchangeTransportT", bound=ExchangeTransport[Any]
)
Type variable bound to ExchangeTransport.
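As a rough illustration of what a bounded type variable provides, the sketch below uses hypothetical stand-in classes (`Registration`, `HttpRegistration`, `RegistrationT`, and `echo` are not part of the library). A function annotated with the bounded variable accepts any subclass of the bound and hands back the same concrete type.

```python
from typing import Any, Generic, TypeVar

T = TypeVar("T")


class Registration(Generic[T]):
    """Hypothetical stand-in for a generic registration class."""

    def __init__(self, agent_id: T) -> None:
        self.agent_id = agent_id


class HttpRegistration(Registration[str]):
    """Hypothetical concrete registration type."""


# Bounded type variable, mirroring the pattern used above.
RegistrationT = TypeVar("RegistrationT", bound=Registration[Any])


def echo(registration: RegistrationT) -> RegistrationT:
    # A type checker infers the caller's concrete subclass here,
    # rather than widening the result to Registration[Any].
    return registration


reg = echo(HttpRegistration("agent-1"))
```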
MailboxStatus
AgentRegistration
ExchangeTransport
Bases: Protocol[AgentRegistrationT_co]
Low-level exchange communicator.
A message exchange hosts mailboxes for each entity (i.e., agent or user) in a multi-agent system. This transport protocol defines mechanisms for entity management (e.g., registration, discovery, status, termination) and for sending/receiving messages from a mailbox. As such, each transport instance is "bound" to a specific mailbox in the exchange.
Warning
A specific exchange transport should not be replicated because multiple client instances receiving from the same mailbox produce undefined agent behavior.
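To see why competing receivers are a problem, consider the following minimal sketch. It assumes a mailbox behaves like a queue in which each message is delivered to exactly one receiver; the `asyncio.Queue` and the `receiver` coroutine are hypothetical stand-ins, not the library's implementation.

```python
import asyncio


async def demo_competing_receivers() -> dict:
    # Hypothetical stand-in for a mailbox: a plain asyncio.Queue.
    mailbox = asyncio.Queue()
    for i in range(6):
        mailbox.put_nowait(i)

    received = {"a": [], "b": []}

    async def receiver(name: str) -> None:
        # Each receiver takes whatever message it manages to grab.
        while True:
            try:
                item = mailbox.get_nowait()
            except asyncio.QueueEmpty:
                return
            received[name].append(item)
            # Yield control so the other receiver gets a turn.
            await asyncio.sleep(0)

    await asyncio.gather(receiver("a"), receiver("b"))
    return received


result = asyncio.run(demo_competing_receivers())
```

Every message is consumed once, but neither receiver is guaranteed to see the full stream, so an agent that relies on message order or completeness can misbehave.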
close
async
Close the exchange client.
Note
This does not alter the state of the mailbox this client is bound to. I.e., the mailbox will not be terminated.
discover
async
Discover peer agents of a given agent type.
Warning
Implementations of this method are often O(n) and scan the types of all agents registered to the exchange.
Parameters:
-
agent(type[Agent]) –Agent type of interest.
-
allow_subclasses(bool, default:True) –Return agents implementing subclasses of the agent.
Returns:
Raises:
-
ExchangeError–Error returned by the exchange.
Source code in academy/exchange/transport.py
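The linear scan mentioned in the warning can be pictured with a small sketch. Everything here is an assumption made for illustration: the registry is a plain dict from string identifiers to agent types, the `Agent` class is a stand-in, and the function returns a tuple of identifiers. The actual return type is not shown on this page.

```python
from __future__ import annotations


class Agent:
    """Hypothetical stand-in for the library's Agent base class."""


class Worker(Agent):
    pass


class GpuWorker(Worker):
    pass


class Logger(Agent):
    pass


def discover(
    registry: dict[str, type[Agent]],
    agent: type[Agent],
    allow_subclasses: bool = True,
) -> tuple[str, ...]:
    matches = []
    # O(n): every registered agent type is inspected once.
    for uid, registered in registry.items():
        if registered is agent:
            matches.append(uid)
        elif allow_subclasses and issubclass(registered, agent):
            matches.append(uid)
    return tuple(matches)


registry = {"w1": Worker, "g1": GpuWorker, "l1": Logger}
with_subclasses = discover(registry, Worker)
exact_only = discover(registry, Worker, allow_subclasses=False)
```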
factory
factory() -> ExchangeFactory[Self]
recv
async
Receive the next message sent to the mailbox.
This blocks until the next message is received, there is a timeout, or the mailbox is terminated.
Parameters:
-
timeout(float | None, default:None) –Optional timeout in seconds to wait for the next message. If None, the default, block forever until the next message or the mailbox is closed.
Raises:
-
MailboxTerminatedError–If the mailbox was closed.
-
ExchangeError–Error returned by the exchange.
-
TimeoutError–If a timeout was specified and exceeded.
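The three outcomes of recv() described above (a message, a timeout, or termination) can be sketched with a toy mailbox. This is a sketch under stated assumptions, not the library's implementation: `ToyMailbox`, its `deliver` and `terminate` helpers, and the local `MailboxTerminatedError` class are hypothetical stand-ins built on `asyncio.Queue`.

```python
from __future__ import annotations

import asyncio


class MailboxTerminatedError(Exception):
    """Stand-in for the library's error of the same name."""


_CLOSED = object()  # Sentinel used to wake a blocked receiver.


class ToyMailbox:
    """Hypothetical single-receiver mailbox."""

    def __init__(self) -> None:
        self._queue = asyncio.Queue()
        self._terminated = False

    def deliver(self, message: str) -> None:
        self._queue.put_nowait(message)

    def terminate(self) -> None:
        self._terminated = True
        self._queue.put_nowait(_CLOSED)

    async def recv(self, timeout: float | None = None) -> str:
        if self._terminated:
            raise MailboxTerminatedError()
        try:
            # timeout=None waits indefinitely.
            item = await asyncio.wait_for(self._queue.get(), timeout)
        except asyncio.TimeoutError:
            raise TimeoutError("no message before timeout") from None
        if item is _CLOSED:
            raise MailboxTerminatedError()
        return item


async def main() -> list[str]:
    mailbox = ToyMailbox()
    outcomes = []
    mailbox.deliver("hello")
    outcomes.append(await mailbox.recv(timeout=0.1))
    try:
        await mailbox.recv(timeout=0.01)
    except TimeoutError:
        outcomes.append("timeout")
    mailbox.terminate()
    try:
        await mailbox.recv()
    except MailboxTerminatedError:
        outcomes.append("terminated")
    return outcomes


outcomes = asyncio.run(main())
```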
register_agent
async
Register a new agent and associated mailbox with the exchange.
Parameters:
-
agent(type[AgentT]) –Agent type of the agent.
-
name(str | None, default:None) –Optional display name for the agent.
Returns:
-
AgentRegistrationT_co–Agent registration info.
Raises:
-
ExchangeError–Error returned by the exchange.
send
async
Send a message to a mailbox.
Parameters:
Raises:
-
BadEntityIdError–If a mailbox for message.dest does not exist.
-
MailboxTerminatedError–If the mailbox was closed.
-
ExchangeError–Error returned by the exchange.
status
async
status(uid: EntityId) -> MailboxStatus
Check the status of a mailbox in the exchange.
Parameters:
-
uid(EntityId) –Entity identifier of the mailbox to check.
Raises:
-
ExchangeError–Error returned by the exchange.
terminate
async
terminate(uid: EntityId) -> None
Terminate a mailbox in the exchange.
Once an entity's mailbox is terminated:
- All request messages in the mailbox will be replied to with a MailboxTerminatedError.
- All calls to recv() will raise a MailboxTerminatedError.
- All attempts to send() to this mailbox by other entities will raise a MailboxTerminatedError.
Note
This method is a no-op if the mailbox does not exist.
Parameters:
-
uid(EntityId) –Entity identifier of the mailbox to close.
Raises:
-
ExchangeError–Error returned by the exchange.
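The termination rules above can be approximated with a toy, synchronous exchange. This is only a sketch: `ToyExchange`, its string identifiers, the `_error_name` helper, and the choice to return pending messages from `terminate` (standing in for the error replies a real exchange would send) are all assumptions made for illustration.

```python
from __future__ import annotations


class BadEntityIdError(Exception):
    """Stand-in: no mailbox exists for the destination."""


class MailboxTerminatedError(Exception):
    """Stand-in: the mailbox was terminated."""


class ToyExchange:
    """Hypothetical in-memory exchange keyed by string identifiers."""

    def __init__(self) -> None:
        self._mailboxes: dict[str, list[str]] = {}
        self._terminated: set[str] = set()

    def register(self, uid: str) -> None:
        self._mailboxes[uid] = []

    def send(self, dest: str, message: str) -> None:
        if dest not in self._mailboxes:
            raise BadEntityIdError(dest)
        if dest in self._terminated:
            raise MailboxTerminatedError(dest)
        self._mailboxes[dest].append(message)

    def terminate(self, uid: str) -> list[str]:
        # No-op when the mailbox does not exist.
        if uid not in self._mailboxes:
            return []
        self._terminated.add(uid)
        # Pending messages are the ones a real exchange would answer
        # with an error reply; here they are simply returned.
        pending, self._mailboxes[uid] = self._mailboxes[uid], []
        return pending


def _error_name(call) -> str:
    # Helper for the demo: report which error, if any, a call raises.
    try:
        call()
    except Exception as exc:
        return type(exc).__name__
    return "ok"


exchange = ToyExchange()
exchange.register("agent-a")
exchange.send("agent-a", "ping")

unknown_result = exchange.terminate("no-such-mailbox")
pending = exchange.terminate("agent-a")
after_terminate = _error_name(lambda: exchange.send("agent-a", "late"))
never_registered = _error_name(lambda: exchange.send("ghost", "hi"))
```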
ExchangeTransportMixin
Magic method mixin for exchange transport implementations.
Adds __repr__, __str__, and context manager support.
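A mixin of this kind might look roughly like the sketch below. The names `ToyTransportMixin` and `ToyTransport`, the `mailbox` attribute, and the output formats are hypothetical. Using the async context manager protocol is also an assumption, made here because close() is a coroutine; the real ExchangeTransportMixin may differ.

```python
from __future__ import annotations

import asyncio


class ToyTransportMixin:
    """Hypothetical mixin; not the library's implementation."""

    def __repr__(self) -> str:
        return f"{type(self).__name__}(mailbox={self.mailbox!r})"

    def __str__(self) -> str:
        return f"{type(self).__name__}<{self.mailbox}>"

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        # Close the transport when leaving the `async with` block.
        await self.close()


class ToyTransport(ToyTransportMixin):
    def __init__(self, mailbox: str) -> None:
        self.mailbox = mailbox
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def main() -> tuple[bool, bool]:
    async with ToyTransport("mbox-1") as transport:
        open_inside = not transport.closed
    return open_inside, transport.closed


transport = ToyTransport("mbox-1")
open_inside, closed_after = asyncio.run(main())
```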