Skip to content

academy.exchange.cloud.globus

AcademyAPIError

Bases: GlobusAPIError

Error class to represent error responses from Academy.

AcademyGlobusClient

Bases: BaseClient

A globus service client to make requests to hosted exchange.

The GlobusExchangeClient acts as a wrapper through which authenticated requests are issued. The exchange automatically handles things like retrying, refreshing tokens, and exponential backoff. The BaseClient is implemented using requests, so calls are synchronous, and must be run within a thread when using asyncio.

GlobusAgentRegistration

Bases: BaseModel, Generic[AgentT]

Agent registration for hosted globus exchange.

agent_id instance-attribute

agent_id: AgentId[AgentT]

Unique identifier for the agent created by the exchange.

client_id instance-attribute

client_id: UUID

Client ID of Globus resource server.

Each agent is a resources server. This allows the agent to exchange delegated tokens to act on the exchange on behalf of the client, and to create it's own delegated tokens so other agents can act on its behalf.

token instance-attribute

token: str

Auth. token provided by launching client (user or another agent).

secret instance-attribute

secret: str

Secret for agent to use to authenticate itself with Globus Auth.

Agents are created as hybrid resource servers within the Globus ecosystem. This allows them to use delegated tokens, but also requires the them to be able to store secrets. In order to support the launching of agents, we pass a secret as part of the agent initialization. We assume the security of the launching channel (typically Globus Compute in the Academy ecosystem).

GlobusExchangeTransport

GlobusExchangeTransport(
    mailbox_id: EntityId,
    *,
    connection_info: _AcademyConnectionInfo,
    app: GlobusApp | None = None,
    authorizer: GlobusAuthorizer | None = None
)

Bases: ExchangeTransportMixin, NoPickleMixin

Globus exchange client.

Parameters:

  • mailbox_id (EntityId) –

    Identifier of the mailbox on the exchange. If there is not an id provided, the exchange will create a new client mailbox.

  • connection_info (_AcademyConnectionInfo) –

    Project id, client parameters and other information about the connection to the service.

  • app (GlobusApp | None, default: None ) –

    For user authorization through token retrieval.

  • authorizer (GlobusAuthorizer | None, default: None ) –

    For service authorization through token retrieval.

Source code in academy/exchange/cloud/globus.py
def __init__(
    self,
    mailbox_id: EntityId,
    *,
    connection_info: _AcademyConnectionInfo,
    app: GlobusApp | None = None,
    authorizer: GlobusAuthorizer | None = None,
) -> None:
    self._mailbox_id = mailbox_id
    self.project = connection_info.project_id
    self.child_clients: list[uuid.UUID] = []
    self.client_params = connection_info.client_params or {}
    self.request_timeout_s = connection_info.request_timeout_s

    self.login_time = datetime.min
    self._app = app
    self._authorizer = authorizer
    self._auth_lock = threading.Lock()
    self._local_data = threading.local()
    self.executor = ThreadPoolExecutor(
        thread_name_prefix='exchange-globus-thread',
    )

exchange_client property

exchange_client: AcademyGlobusClient

A thread local copy of the Globus AuthClient.

auth_client property

auth_client: AuthClient

A thread local copy of the Globus AuthClient.

new async classmethod

new(
    *,
    connection_info: _AcademyConnectionInfo,
    app: GlobusApp | None = None,
    authorizer: GlobusAuthorizer | None = None,
    mailbox_id: EntityId | None = None,
    name: str | None = None
) -> Self

Instantiate a new transport.

Parameters:

  • connection_info (_AcademyConnectionInfo) –

    Project id, client parameters and other information about the connection to the service.

  • app (GlobusApp | None, default: None ) –

    For user authorization through token retrieval

  • authorizer (GlobusAuthorizer | None, default: None ) –

    For service authorization through token retrieval

  • mailbox_id (EntityId | None, default: None ) –

    Bind the transport to the specific mailbox. If None, a new user entity will be registered and the transport will be bound to that mailbox.

  • name (str | None, default: None ) –

    Display name of the registered entity if mailbox_id is None.

Returns:

  • Self

    An instantiated transport bound to a specific mailbox.

Source code in academy/exchange/cloud/globus.py
@classmethod
async def new(
    cls,
    *,
    connection_info: _AcademyConnectionInfo,
    app: GlobusApp | None = None,
    authorizer: GlobusAuthorizer | None = None,
    mailbox_id: EntityId | None = None,
    name: str | None = None,
) -> Self:
    """Instantiate a new transport.

    Args:
        connection_info: Project id, client parameters and other
            information about the connection to the service.
        app: For user authorization through token retrieval
        authorizer: For service authorization through token retrieval
        mailbox_id: Bind the transport to the specific mailbox. If `None`,
            a new user entity will be registered and the transport will be
            bound to that mailbox.
        name: Display name of the registered entity if `mailbox_id` is
            `None`.

    Returns:
        An instantiated transport bound to a specific mailbox.
    """
    loop = asyncio.get_running_loop()

    if mailbox_id is None:
        mailbox_id = UserId.new(name=name)
        client = cls(
            mailbox_id,
            connection_info=connection_info,
            app=app,
            authorizer=authorizer,
        )
        await loop.run_in_executor(
            client.executor,
            client._register_client,
        )
        logger.info(
            'Registered %s in exchange',
            mailbox_id,
            extra={'academy.mailbox_id': mailbox_id},
        )
        return client

    return cls(
        mailbox_id,
        connection_info=connection_info,
        app=app,
        authorizer=authorizer,
    )

register_agents async

register_agents(
    agents: list[tuple[type[AgentT], str | None]],
) -> list[GlobusAgentRegistration[AgentT]]

Register multiple agents with a single auth prompt.

Parameters:

  • agents (list[tuple[type[AgentT], str | None]]) –

    List of (agent_type, name) pairs to register.

Returns:

Source code in academy/exchange/cloud/globus.py
async def register_agents(
    self,
    agents: list[tuple[type[AgentT], str | None]],
) -> list[GlobusAgentRegistration[AgentT]]:
    """Register multiple agents with a single auth prompt.

    Args:
        agents: List of (agent_type, name) pairs to register.

    Returns:
        List of agent registrations in order of input.
    """
    loop = asyncio.get_running_loop()

    # Phase 1: create auth entities concurrently.
    # _prepare_registration no longer mutates shared GlobusApp
    # state, and the auth_client property serializes interactive
    # login via _auth_lock, so parallel dispatch is safe.
    aids: list[AgentId[AgentT]] = [
        AgentId.new(name=name) for _, name in agents
    ]
    prepared: list[_PendingRegistration[AgentT]] = list(
        await asyncio.gather(
            *(
                loop.run_in_executor(
                    self.executor,
                    self._prepare_registration,
                    aid,
                )
                for aid in aids
            ),
        ),
    )
    pending = list(
        zip(
            prepared,
            [at for at, _ in agents],
            strict=True,
        ),
    )

    # Accumulate scope requirements, then do a single login
    # that satisfies all of them at once.
    assert self._app is not None
    for p, _ in pending:
        self._app.add_scope_requirements(
            {str(p.client_id): [p.scope]},
        )

    if self._app.login_required():
        await loop.run_in_executor(
            self.executor,
            self._app.login,
        )

    # Phase 2: extract delegated tokens and create mailboxes.
    # Safe to parallelize — no shared state mutation, and the
    # app already holds valid tokens for all scopes.
    regs: list[GlobusAgentRegistration[AgentT]] = list(
        await asyncio.gather(
            *(
                loop.run_in_executor(
                    self.executor,
                    self._finalize_registration,
                    p,
                )
                for p, _ in pending
            ),
        ),
    )

    await asyncio.gather(
        *(
            loop.run_in_executor(
                self.executor,
                self._create_mailbox,
                reg.agent_id,
                agent_type,
            )
            for reg, (_, agent_type) in zip(
                regs,
                pending,
                strict=True,
            )
        ),
    )

    for reg in regs:
        self.child_clients.append(reg.client_id)

    return regs

GlobusExchangeFactory

GlobusExchangeFactory(
    project_id: UUID,
    client_params: dict[str, Any] | None = None,
    request_timeout_s: float = 60,
)

Bases: ExchangeFactory[GlobusExchangeTransport]

Globus exchange client factory.

Parameters:

  • project_id (UUID) –

    Project to create new clients under. Must be able to authenticate as a administrator.

  • client_params (dict[str, Any] | None, default: None ) –

    Additional parameters for globus client.

  • request_timeout_s (float, default: 60 ) –

    Maximum length of receive/listen requests to the exchange.

Source code in academy/exchange/cloud/globus.py
def __init__(
    self,
    project_id: uuid.UUID,
    client_params: dict[str, Any] | None = None,
    request_timeout_s: float = 60,
) -> None:
    self.info = _AcademyConnectionInfo(
        project_id=project_id,
        client_params=client_params,
        request_timeout_s=request_timeout_s,
    )

create_agent_client async

create_agent_client(
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]

Create a new agent exchange client.

An agent must be registered with the exchange before an exchange client can be created. For example:

factory = ExchangeFactory(...)
user_client = factory.create_user_client()
registration = user_client.register_agent(...)
agent_client = factory.create_agent_client(registration, ...)

Parameters:

  • registration (AgentRegistration[AgentT]) –

    Registration information returned by the exchange.

  • request_handler (RequestHandler[RequestT_co]) –

    Agent request message handler.

Returns:

Raises:

  • BadEntityIdError

    If an agent with registration.agent_id is not already registered with the exchange.

Source code in academy/exchange/factory.py
async def create_agent_client(
    self,
    registration: AgentRegistration[AgentT],
    request_handler: RequestHandler[RequestT_co],
) -> AgentExchangeClient[AgentT, ExchangeTransportT]:
    """Create a new agent exchange client.

    An agent must be registered with the exchange before an exchange
    client can be created. For example:
    ```python
    factory = ExchangeFactory(...)
    user_client = factory.create_user_client()
    registration = user_client.register_agent(...)
    agent_client = factory.create_agent_client(registration, ...)
    ```

    Args:
        registration: Registration information returned by the exchange.
        request_handler: Agent request message handler.

    Returns:
        Agent exchange client.

    Raises:
        BadEntityIdError: If an agent with `registration.agent_id` is not
            already registered with the exchange.
    """
    agent_id: AgentId[AgentT] = registration.agent_id
    transport = await self._create_transport(
        mailbox_id=agent_id,
        registration=registration,
    )
    assert transport.mailbox_id == agent_id
    status = await transport.status(agent_id)
    if status != MailboxStatus.ACTIVE:
        await transport.close()
        raise BadEntityIdError(agent_id)
    return AgentExchangeClient(
        agent_id,
        transport,
        request_handler=request_handler,
    )

create_user_client async

create_user_client(
    *, name: str | None = None, start_listener: bool = True
) -> UserExchangeClient[ExchangeTransportT]

Create a new user in the exchange and associated client.

Parameters:

  • name (str | None, default: None ) –

    Display name of the client on the exchange.

  • start_listener (bool, default: True ) –

    Start a message listener thread.

Returns:

Source code in academy/exchange/factory.py
async def create_user_client(
    self,
    *,
    name: str | None = None,
    start_listener: bool = True,
) -> UserExchangeClient[ExchangeTransportT]:
    """Create a new user in the exchange and associated client.

    Args:
        name: Display name of the client on the exchange.
        start_listener: Start a message listener thread.

    Returns:
        User exchange client.
    """
    transport = await self._create_transport(mailbox_id=None, name=name)
    user_id = transport.mailbox_id
    assert isinstance(user_id, UserId)
    return UserExchangeClient(
        user_id,
        transport,
        start_listener=start_listener,
    )