academy.exchange.cloud¶
HttpExchangeFactory
¶
HttpExchangeFactory(
url: str = DEFAULT_EXCHANGE_URL,
auth_method: Literal["globus"] | None = None,
additional_headers: dict[str, str] | None = None,
request_timeout_s: float = 60,
ssl_verify: bool | None = None,
)
Bases: ExchangeFactory[HttpExchangeTransport]
Http exchange client factory.
Parameters:
-
url(str, default:DEFAULT_EXCHANGE_URL) –Address of HTTP exchange. Defaults to the Academy-hosted exchange
-
auth_method(Literal['globus'] | None, default:None) –Method to get authorization headers
-
additional_headers(dict[str, str] | None, default:None) –Any other information necessary to communicate with the exchange. Used for passing the Globus bearer token
-
ssl_verify(bool | None, default:None) –Same as requests.Session.verify. If the server's TLS certificate should be validated. Should be true if using HTTPS Only set to false for testing or local development.
Source code in academy/exchange/cloud/client.py
create_agent_client
async
¶
create_agent_client(
registration: AgentRegistration[AgentT],
request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]
Create a new agent exchange client.
An agent must be registered with the exchange before an exchange client can be created. For example:
factory = ExchangeFactory(...)
user_client = factory.create_user_client()
registration = user_client.register_agent(...)
agent_client = factory.create_agent_client(registration, ...)
Parameters:
-
registration(AgentRegistration[AgentT]) –Registration information returned by the exchange.
-
request_handler(RequestHandler[RequestT_co]) –Agent request message handler.
Returns:
-
AgentExchangeClient[AgentT, ExchangeTransportT]–Agent exchange client.
Raises:
-
BadEntityIdError–If an agent with
registration.agent_idis not already registered with the exchange.
Source code in academy/exchange/factory.py
create_user_client
async
¶
create_user_client(
*, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]
Create a new user in the exchange and associated client.
Parameters:
-
name(str | None, default:None) –Display name of the client on the exchange.
-
start_listener(bool, default:True) –Start a message listener thread.
Returns:
-
UserExchangeClient[ExchangeTransportT]–User exchange client.
Source code in academy/exchange/factory.py
HttpExchangeTransport
¶
HttpExchangeTransport(
mailbox_id: EntityId,
session: ClientSession,
connection_info: _HttpConnectionInfo,
)
Bases: ExchangeTransportMixin, NoPickleMixin
Http exchange client.
Parameters:
-
mailbox_id(EntityId) –Identifier of the mailbox on the exchange. If there is not an id provided, the exchange will create a new client mailbox.
-
session(ClientSession) –Http session.
-
connection_info(_HttpConnectionInfo) –Exchange connection info.
Source code in academy/exchange/cloud/client.py
new
async
classmethod
¶
new(
*,
connection_info: _HttpConnectionInfo,
mailbox_id: EntityId | None = None,
name: str | None = None
) -> Self
Instantiate a new transport.
Parameters:
-
connection_info(_HttpConnectionInfo) –Exchange connection information.
-
mailbox_id(EntityId | None, default:None) –Bind the transport to the specific mailbox. If
None, a new user entity will be registered and the transport will be bound to that mailbox. -
name(str | None, default:None) –Display name of the registered entity if
mailbox_idisNone.
Returns:
-
Self–An instantiated transport bound to a specific mailbox.
Source code in academy/exchange/cloud/client.py
GlobusExchangeFactory
¶
GlobusExchangeFactory(
project_id: UUID,
client_params: dict[str, Any] | None = None,
request_timeout_s: float = 60,
)
Bases: ExchangeFactory[GlobusExchangeTransport]
Globus exchange client factory.
Parameters:
-
project_id(UUID) –Project to create new clients under. Must be able to authenticate as a administrator.
-
client_params(dict[str, Any] | None, default:None) –Additional parameters for globus client.
-
request_timeout_s(float, default:60) –Maximum length of receive/listen requests to the exchange.
Source code in academy/exchange/cloud/globus.py
create_agent_client
async
¶
create_agent_client(
registration: AgentRegistration[AgentT],
request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]
Create a new agent exchange client.
An agent must be registered with the exchange before an exchange client can be created. For example:
factory = ExchangeFactory(...)
user_client = factory.create_user_client()
registration = user_client.register_agent(...)
agent_client = factory.create_agent_client(registration, ...)
Parameters:
-
registration(AgentRegistration[AgentT]) –Registration information returned by the exchange.
-
request_handler(RequestHandler[RequestT_co]) –Agent request message handler.
Returns:
-
AgentExchangeClient[AgentT, ExchangeTransportT]–Agent exchange client.
Raises:
-
BadEntityIdError–If an agent with
registration.agent_idis not already registered with the exchange.
Source code in academy/exchange/factory.py
create_user_client
async
¶
create_user_client(
*, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]
Create a new user in the exchange and associated client.
Parameters:
-
name(str | None, default:None) –Display name of the client on the exchange.
-
start_listener(bool, default:True) –Start a message listener thread.
Returns:
-
UserExchangeClient[ExchangeTransportT]–User exchange client.
Source code in academy/exchange/factory.py
GlobusExchangeTransport
¶
GlobusExchangeTransport(
mailbox_id: EntityId,
*,
connection_info: _AcademyConnectionInfo,
app: GlobusApp | None = None,
authorizer: GlobusAuthorizer | None = None
)
Bases: ExchangeTransportMixin, NoPickleMixin
Globus exchange client.
Parameters:
-
mailbox_id(EntityId) –Identifier of the mailbox on the exchange. If there is not an id provided, the exchange will create a new client mailbox.
-
connection_info(_AcademyConnectionInfo) –Project id, client parameters and other information about the connection to the service.
-
app(GlobusApp | None, default:None) –For user authorization through token retrieval.
-
authorizer(GlobusAuthorizer | None, default:None) –For service authorization through token retrieval.
Source code in academy/exchange/cloud/globus.py
exchange_client
property
¶
exchange_client: AcademyGlobusClient
A thread local copy of the Globus AuthClient.
new
async
classmethod
¶
new(
*,
connection_info: _AcademyConnectionInfo,
app: GlobusApp | None = None,
authorizer: GlobusAuthorizer | None = None,
mailbox_id: EntityId | None = None,
name: str | None = None
) -> Self
Instantiate a new transport.
Parameters:
-
connection_info(_AcademyConnectionInfo) –Project id, client parameters and other information about the connection to the service.
-
app(GlobusApp | None, default:None) –For user authorization through token retrieval
-
authorizer(GlobusAuthorizer | None, default:None) –For service authorization through token retrieval
-
mailbox_id(EntityId | None, default:None) –Bind the transport to the specific mailbox. If
None, a new user entity will be registered and the transport will be bound to that mailbox. -
name(str | None, default:None) –Display name of the registered entity if
mailbox_idisNone.
Returns:
-
Self–An instantiated transport bound to a specific mailbox.
Source code in academy/exchange/cloud/globus.py
register_agents
async
¶
register_agents(
agents: list[tuple[type[AgentT], str | None]],
) -> list[GlobusAgentRegistration[AgentT]]
Register multiple agents with a single auth prompt.
Parameters:
Returns:
-
list[GlobusAgentRegistration[AgentT]]–List of agent registrations in order of input.
Source code in academy/exchange/cloud/globus.py
637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 | |
spawn_http_exchange
¶
spawn_http_exchange(
host: str = "0.0.0.0",
port: int = 5463,
*,
level: int | str = WARNING,
timeout: float | None = None
) -> Generator[HttpExchangeFactory]
Context manager that spawns an HTTP exchange in a subprocess.
This function spawns a new process (rather than forking) and wait to
return until a connection with the exchange has been established.
When exiting the context manager, SIGINT will be sent to the exchange
process. If the process does not exit within 5 seconds, it will be
killed.
Warning
The exclusion of authentication and ssl configuration is intentional. This method should only be used for temporary exchanges in trusted environments (such as a fully firewalled individual workstation with no other user access).
Parameters:
-
host(str, default:'0.0.0.0') –Host the exchange should listen on.
-
port(int, default:5463) –Port the exchange should listen on.
-
level(int | str, default:WARNING) –Logging level.
-
timeout(float | None, default:None) –Connection timeout when waiting for exchange to start.
Returns:
-
Generator[HttpExchangeFactory]–Exchange interface connected to the spawned exchange.