Skip to content

academy.exchange.cloud.app

HTTP message exchange client and server.

To start the exchange:

python -m academy.exchange.cloud --config exchange.toml

Connect to the exchange through the client.

from academy.exchange import HttpExchangeFactory

with HttpExchangeFactory(
    'http://localhost:1234'
).create_user_client() as exchange:
    aid, agent_info = exchange.register_agent()
    ...

StatusCode

Bases: Enum

Http status codes.

get_client_info

get_client_info(request: Request) -> ClientInfo

Reconstitute client info from Request.

Source code in academy/exchange/cloud/app.py
def get_client_info(request: Request) -> ClientInfo:
    """Reconstitute client info from Request."""
    client_info = ClientInfo(
        client_id=request.headers.get('client_id', ''),
        group_memberships=set(
            request.headers.get('client_groups', '').split(','),
        ),
    )
    return client_info

exception_to_response

exception_to_response(
    request_name: str,
) -> Callable[..., Any]

Convert exceptions to responses.

Parameters:

  • request_name (str) –

    Request name for logging.

Source code in academy/exchange/cloud/app.py
def exception_to_response(request_name: str) -> Callable[..., Any]:
    """Convert exceptions to responses.

    Args:
        request_name: Request name for logging.
    """

    def decorator(
        func: Callable[[Any], Coroutine[None, None, Response]],
    ) -> Callable[[Any], Coroutine[None, None, Response]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Response:  # noqa: PLR0911
            try:
                return await func(*args, **kwargs)
            except (KeyError, ValidationError) as e:
                logger.warning(
                    f'Missing or invalid field in {request_name} request.',
                )
                return Response(
                    status=StatusCode.BAD_REQUEST.value,
                    text=f'Missing or invalid field: {e}',
                )
            except BadEntityIdError:
                logger.exception(f'Unknown mailbox id in {request_name}')
                return Response(
                    status=StatusCode.NOT_FOUND.value,
                    text='Unknown mailbox ID',
                )
            except ForbiddenError:
                logger.exception(f'Incorrect permissions in {request_name}')
                return Response(
                    status=StatusCode.FORBIDDEN.value,
                    text='Incorrect permissions',
                )
            except MailboxTerminatedError:
                logger.exception(
                    f'Mailbox in {request_name} request was terminated.',
                )
                return Response(
                    status=StatusCode.TERMINATED.value,
                    text='Mailbox was closed',
                )
            except MessageTooLargeError as e:
                logger.exception('Message to mailbox too large.')
                return Response(
                    status=StatusCode.TOO_LARGE.value,
                    text=(
                        f'Message of size {e.size} larger than limit '
                        f'{e.limit}.'
                    ),
                )
            except ConnectionResetError:  # pragma: no cover
                # This happens when the client cancels it's task, and
                # closes its connection. In this case, we don't
                # need to do anything because the client disconnected
                # itself. If we don't catch this aiohttp will
                # just log an error message each time this happens.
                return Response(status=StatusCode.NO_RESPONSE.value)

        return wrapper

    return decorator

authenticate_factory

authenticate_factory(authenticator: Authenticator) -> Any

Create an authentication middleware for a given authenticator.

Parameters:

  • authenticator (Authenticator) –

    Used to validate client id and transform token into id.

Returns:

  • Any

    A aiohttp.web.middleware function that will only allow authenticated requests.

Source code in academy/exchange/cloud/app.py
def authenticate_factory(
    authenticator: Authenticator,
) -> Any:
    """Create an authentication middleware for a given authenticator.

    Args:
        authenticator: Used to validate client id and transform token into id.

    Returns:
        A aiohttp.web.middleware function that will only allow authenticated
            requests.
    """

    @middleware
    async def authenticate(
        request: Request,
        handler: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        try:
            client_info: ClientInfo = await authenticator.authenticate_user(
                request.headers,
            )
        except ForbiddenError:
            logger.exception('Could not authenticate.')
            return Response(
                status=StatusCode.FORBIDDEN.value,
                text='Token expired or revoked.',
            )
        except UnauthorizedError:
            logger.exception('Could not authenticate.')
            return Response(
                status=StatusCode.UNAUTHORIZED.value,
                text='Missing required headers.',
            )

        headers = request.headers.copy()
        headers['client_id'] = client_info.client_id
        headers['client_groups'] = ','.join(client_info.group_memberships)

        # Handle early client-side disconnect in Issue #142
        # This is somewhat hard to reproduce in tests:
        # https://github.com/aio-libs/aiohttp/issues/6978
        if (
            request.transport is None or request.transport.is_closing()
        ):  # pragma: no cover
            return Response(status=StatusCode.NO_RESPONSE.value)

        request = request.clone(headers=headers)
        return await handler(request)

    return authenticate

create_app

create_app(
    config: ExchangeServingConfig | None = None,
) -> Application

Create a new server application.

Source code in academy/exchange/cloud/app.py
def create_app(
    config: ExchangeServingConfig | None = None,
) -> Application:
    """Create a new server application."""
    if config is None:
        config = ExchangeServingConfig()

    backend = config.backend.get_backend()
    authenticator = get_authenticator(config.auth)
    middlewares = [authenticate_factory(authenticator)]

    app = Application(middlewares=middlewares)
    app[MANAGER_KEY] = backend
    app[TIMEOUT_KEY] = config.listen_timeout_s

    app.router.add_post('/mailbox', _create_mailbox_route)
    app.router.add_post('/mailbox/share', _share_mailbox_route)
    app.router.add_get('/mailbox/share', _get_mailbox_shares_route)
    app.router.add_delete('/mailbox/share', _remove_mailbox_shares_route)
    app.router.add_delete('/mailbox', _terminate_route)
    app.router.add_get('/mailbox', _check_mailbox_route)
    app.router.add_put('/message', _send_message_route)
    app.router.add_get('/message', _recv_message_route)
    app.router.add_get('/discover', _discover_route)
    app.router.add_get('/mailbox/listen', _listen_mailbox_route)
    app.router.add_get('/mailbox/heartbeat', _get_heartbeat_route)

    return app