Skip to content

academy.exchange.cloud.globus

AcademyAPIError

Bases: GlobusAPIError

Error class to represent error responses from Academy.

AcademyGlobusClient

Bases: BaseClient

A globus service client to make requests to hosted exchange.

The GlobusExchangeClient acts as a wrapper through which authenticated requests are issued. The exchange automatically handles things like retrying, refreshing tokens, and exponential backoff. The BaseClient is implemented using requests, so calls are synchronous, and must be run within a thread when using asyncio.

GlobusAgentRegistration dataclass

GlobusAgentRegistration(
    agent_id: AgentId[AgentT],
    client_id: UUID,
    token: str,
    secret: str,
)

Bases: Generic[AgentT]

Agent registration for hosted globus exchange.

agent_id instance-attribute

agent_id: AgentId[AgentT]

Unique identifier for the agent created by the exchange.

client_id instance-attribute

client_id: UUID

Client ID of Globus resource server.

Each agent is a resources server. This allows the agent to exchange delegated tokens to act on the exchange on behalf of the client, and to create it's own delegated tokens so other agents can act on its behalf.

token instance-attribute

token: str

Auth. token provided by launching client (user or another agent).

secret instance-attribute

secret: str

Secret for agent to use to authenticate itself with Globus Auth.

Agents are created as hybrid resource servers within the Globus ecosystem. This allows them to use delegated tokens, but also requires the them to be able to store secrets. In order to support the launching of agents, we pass a secret as part of the agent initialization. We assume the security of the launching channel (typically Globus Compute in the Academy ecosystem).

GlobusExchangeTransport

GlobusExchangeTransport(
    mailbox_id: EntityId,
    *,
    project_id: UUID,
    app: GlobusApp | None = None,
    authorizer: GlobusAuthorizer | None = None,
    client_params: dict[str, Any] | None = None
)

Bases: ExchangeTransportMixin, NoPickleMixin

Globus exchange client.

Parameters:

  • mailbox_id (EntityId) –

    Identifier of the mailbox on the exchange. If there is not an id provided, the exchange will create a new client mailbox.

  • project_id (UUID) –

    Globus Identifier of project to create agents under.

  • app (GlobusApp | None, default: None ) –

    For user authorization through token retrieval.

  • authorizer (GlobusAuthorizer | None, default: None ) –

    For service authorization through token retrieval.

  • client_params (dict[str, Any] | None, default: None ) –

    Additional parameters for globus client.

Source code in academy/exchange/cloud/globus.py
def __init__(
    self,
    mailbox_id: EntityId,
    *,
    project_id: uuid.UUID,
    app: GlobusApp | None = None,
    authorizer: GlobusAuthorizer | None = None,
    client_params: dict[str, Any] | None = None,
) -> None:
    self._mailbox_id = mailbox_id
    self.project = project_id
    self.child_clients: list[uuid.UUID] = []
    self.client_params = client_params or {}

    self.login_time = datetime.min
    self._app_lock = threading.Lock()
    self._app = app
    self._authorizer = authorizer
    self._local_data = threading.local()
    self.executor = ThreadPoolExecutor(
        thread_name_prefix='exchange-globus-thread',
    )

exchange_client property

exchange_client: AcademyGlobusClient

A thread local copy of the Globus AuthClient.

auth_client property

auth_client: AuthClient

A thread local copy of the Globus AuthClient.

new async classmethod

new(
    *,
    project_id: UUID,
    app: GlobusApp | None = None,
    authorizer: GlobusAuthorizer | None = None,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
    client_params: dict[str, Any] | None = None
) -> Self

Instantiate a new transport.

Parameters:

  • project_id (UUID) –

    Globus Identifier of project to create agents under.

  • app (GlobusApp | None, default: None ) –

    For user authorization through token retrieval

  • authorizer (GlobusAuthorizer | None, default: None ) –

    For service authorization through token retrieval

  • mailbox_id (EntityId | None, default: None ) –

    Bind the transport to the specific mailbox. If None, a new user entity will be registered and the transport will be bound to that mailbox.

  • name (str | None, default: None ) –

    Display name of the registered entity if mailbox_id is None.

  • client_params (dict[str, Any] | None, default: None ) –

    Additional parameters for globus client.

Returns:

  • Self

    An instantiated transport bound to a specific mailbox.

Source code in academy/exchange/cloud/globus.py
@classmethod
async def new(  # noqa: PLR0913
    cls,
    *,
    project_id: uuid.UUID,
    app: GlobusApp | None = None,
    authorizer: GlobusAuthorizer | None = None,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
    client_params: dict[str, Any] | None = None,
) -> Self:
    """Instantiate a new transport.

    Args:
        project_id: Globus Identifier of project to create agents under.
        app: For user authorization through token retrieval
        authorizer: For service authorization through token retrieval
        mailbox_id: Bind the transport to the specific mailbox. If `None`,
            a new user entity will be registered and the transport will be
            bound to that mailbox.
        name: Display name of the registered entity if `mailbox_id` is
            `None`.
        client_params: Additional parameters for globus client.

    Returns:
        An instantiated transport bound to a specific mailbox.
    """
    loop = asyncio.get_running_loop()

    if mailbox_id is None:
        mailbox_id = UserId.new(name=name)
        client = cls(
            mailbox_id,
            project_id=project_id,
            app=app,
            authorizer=authorizer,
            client_params=client_params,
        )
        await loop.run_in_executor(
            client.executor,
            client._register_client,
        )
        logger.info('Registered %s in exchange', mailbox_id)
        return client

    return cls(
        mailbox_id,
        project_id=project_id,
        app=app,
        authorizer=authorizer,
        client_params=client_params,
    )

GlobusExchangeFactory

GlobusExchangeFactory(
    project_id: UUID,
    client_params: dict[str, Any] | None = None,
)

Bases: ExchangeFactory[GlobusExchangeTransport]

Globus exchange client factory.

Parameters:

  • project_id (UUID) –

    Project to create new clients under. Must be able to authenticate as a administrator.

  • client_params (dict[str, Any] | None, default: None ) –

    Additional parameters for globus client.

Source code in academy/exchange/cloud/globus.py
def __init__(
    self,
    project_id: uuid.UUID,
    client_params: dict[str, Any] | None = None,
) -> None:
    self.project = project_id
    self.client_params = client_params

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = factory.create_user_client()
registration = user_client.register_agent(...)
agent_client = factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler[RequestT_co]) –

    Agent request message handler.

Returns:

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/factory.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = factory.create_user_client()
    registration = user_client.register_agent(...)
    agent_client = factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Returns:

Source code in academy/exchange/factory.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )