HTTP message exchange client and server.
To start the exchange:
python -m academy.exchange.cloud --config exchange.toml
Connect to the exchange through the client.
from academy.exchange import HttpExchangeFactory
with HttpExchangeFactory(
'http://localhost:1234'
).create_user_client() as exchange:
aid, agent_info = exchange.register_agent()
...
StatusCode
Bases: Enum
HTTP status codes.
get_client_info
Reconstitute client info from Request.
Source code in academy/exchange/cloud/app.py
def get_client_info(request: Request) -> ClientInfo:
    """Reconstitute client info from Request.

    Reads the ``client_id`` and ``client_groups`` headers of the request.
    ``client_groups`` is parsed as a comma-separated list of group names.

    Args:
        request: Request whose headers carry the client identity.

    Returns:
        Client info holding the client id (empty string when the header is
        absent) and the set of group memberships (empty set when the header
        is absent or empty).
    """
    raw_groups = request.headers.get('client_groups', '')
    # ''.split(',') yields [''], so empty entries are dropped; otherwise a
    # client with no groups would be reported as a member of the group ''.
    group_memberships = {group for group in raw_groups.split(',') if group}
    return ClientInfo(
        client_id=request.headers.get('client_id', ''),
        group_memberships=group_memberships,
    )
|
exception_to_response
Convert exceptions to responses.
Parameters:
-
request_name
(str)
–
Request name for logging.
Source code in academy/exchange/cloud/app.py
def exception_to_response(request_name: str) -> Callable[..., Any]:
    """Build a decorator that turns handler exceptions into responses.

    Args:
        request_name: Request name for logging.

    Returns:
        A decorator for coroutine handlers. The decorated handler returns
        the handler's own response on success, or an error response when
        one of the known exceptions is raised.
    """

    def wrap_handler(
        func: Callable[[Any], Coroutine[None, None, Response]],
    ) -> Callable[[Any], Coroutine[None, None, Response]]:
        @functools.wraps(func)
        async def guarded(*args: Any, **kwargs: Any) -> Response:
            try:
                response = await func(*args, **kwargs)
            except (KeyError, ValidationError) as err:
                logger.warning(
                    f'Missing or invalid field in {request_name} request.',
                )
                response = Response(
                    text=f'Missing or invalid field: {err}',
                    status=StatusCode.BAD_REQUEST.value,
                )
            except BadEntityIdError:
                logger.exception(f'Unknown mailbox id in {request_name}')
                response = Response(
                    text='Unknown mailbox ID',
                    status=StatusCode.NOT_FOUND.value,
                )
            except ForbiddenError:
                logger.exception(f'Incorrect permissions in {request_name}')
                response = Response(
                    text='Incorrect permissions',
                    status=StatusCode.FORBIDDEN.value,
                )
            except MailboxTerminatedError:
                logger.exception(
                    f'Mailbox in {request_name} request was terminated.',
                )
                response = Response(
                    text='Mailbox was closed',
                    status=StatusCode.TERMINATED.value,
                )
            except MessageTooLargeError as err:
                logger.exception('Message to mailbox too large.')
                too_large_text = (
                    f'Message of size {err.size} '
                    f'larger than limit {err.limit}.'
                )
                response = Response(
                    text=too_large_text,
                    status=StatusCode.TOO_LARGE.value,
                )
            except ConnectionResetError:  # pragma: no cover
                # Raised when the client cancels its task and closes its
                # connection. Nothing needs to be done because the client
                # disconnected itself; catching it here keeps aiohttp from
                # logging an error message every time this happens.
                response = Response(status=StatusCode.NO_RESPONSE.value)
            return response

        return guarded

    return wrap_handler
|
authenticate_factory
Create an authentication middleware for a given authenticator.
Parameters:
-
authenticator
(Authenticator)
–
Used to validate client id and transform token into id.
Returns:
-
Any
–
An aiohttp.web.middleware function that will only allow authenticated
requests.
Source code in academy/exchange/cloud/app.py
def authenticate_factory(
    authenticator: Authenticator,
) -> Any:
    """Build an authentication middleware around the given authenticator.

    Args:
        authenticator: Used to validate client id and transform token into id.

    Returns:
        An aiohttp.web.middleware function that will only allow authenticated
        requests.
    """

    @middleware
    async def authenticate(
        request: Request,
        handler: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        try:
            identity: ClientInfo = await authenticator.authenticate_user(
                request.headers,
            )
        except ForbiddenError:
            logger.exception('Could not authenticate.')
            return Response(
                text='Token expired or revoked.',
                status=StatusCode.FORBIDDEN.value,
            )
        except UnauthorizedError:
            logger.exception('Could not authenticate.')
            return Response(
                text='Missing required headers.',
                status=StatusCode.UNAUTHORIZED.value,
            )

        # Record the authenticated identity on a copy of the request
        # headers so the downstream handler can read it back.
        forwarded_headers = request.headers.copy()
        forwarded_headers['client_id'] = identity.client_id
        forwarded_headers['client_groups'] = ','.join(
            identity.group_memberships,
        )

        # Handle early client-side disconnect in Issue #142
        # This is somewhat hard to reproduce in tests:
        # https://github.com/aio-libs/aiohttp/issues/6978
        transport = request.transport
        if transport is None or transport.is_closing():  # pragma: no cover
            return Response(status=StatusCode.NO_RESPONSE.value)

        return await handler(request.clone(headers=forwarded_headers))

    return authenticate
|
create_app
Create a new server application.
Source code in academy/exchange/cloud/app.py
def create_app(
    config: ExchangeServingConfig | None = None,
) -> Application:
    """Create a new server application.

    Args:
        config: Serving configuration. A default ``ExchangeServingConfig``
            is constructed when omitted.

    Returns:
        Application with the authentication middleware installed, the
        backend and listen timeout stored on it, and all routes registered.
    """
    serving_config = ExchangeServingConfig() if config is None else config

    backend = serving_config.backend.get_backend()
    authenticator = get_authenticator(serving_config.auth)

    app = Application(middlewares=[authenticate_factory(authenticator)])
    app[MANAGER_KEY] = backend
    app[TIMEOUT_KEY] = serving_config.listen_timeout_s

    router = app.router
    # (registration method, path, handler), applied in order.
    route_table = (
        (router.add_post, '/mailbox', _create_mailbox_route),
        (router.add_post, '/mailbox/share', _share_mailbox_route),
        (router.add_get, '/mailbox/share', _get_mailbox_shares_route),
        (router.add_delete, '/mailbox/share', _remove_mailbox_shares_route),
        (router.add_delete, '/mailbox', _terminate_route),
        (router.add_get, '/mailbox', _check_mailbox_route),
        (router.add_put, '/message', _send_message_route),
        (router.add_get, '/message', _recv_message_route),
        (router.add_get, '/discover', _discover_route),
        (router.add_get, '/mailbox/listen', _listen_mailbox_route),
        (router.add_get, '/mailbox/heartbeat', _get_heartbeat_route),
    )
    for register, path, route_handler in route_table:
        register(path, route_handler)

    return app
|