academy.manager¶
Manager
¶
Manager(
exchange_client: ExchangeClient[ExchangeTransportT],
executors: (
Executor
| MutableMapping[str, Executor | None]
| None
) = None,
*,
default_executor: str = "event_loop",
max_retries: int = 0,
log_config: LogConfig | None = None
)
Bases: Generic[ExchangeTransportT], NoPickleMixin
Launch and manage running agents.
A manager is used to launch agents in the current event loop or in
Executors and interact with/manage those
agents.
Tip
This class can be used as a context manager. Upon exiting the context, running agents will be shutdown, any agent handles created by the manager will be closed, and the executors will be shutdown.
Note
The manager takes ownership of the exchange client and executors, meaning the manager will clean up those resources when the manager is closed.
Parameters:
-
exchange_client(ExchangeClient[ExchangeTransportT]) –Exchange client.
-
executors(Executor | MutableMapping[str, Executor | None] | None, default:None) –An executor instance or mapping of names to executors to use to run agents. If a single executor is provided, it is set as the default executor with name
'default', overriding any value ofdefault_executor. Settingexecutor=None(the default) or passing{'name': None, ...}can be used to launch agents in the current event loop. -
default_executor(str, default:'event_loop') –Specify the name of the default executor to use when not specified in
launch(). -
max_retries(int, default:0) –Maximum number of times to retry running an agent if it exits with an error.
Raises:
-
ValueError–If
default_executoris specified but does not exist inexecutors, or if executors is badly typed.
Source code in academy/manager.py
exchange_client
property
¶
exchange_client: ExchangeClient[ExchangeTransportT]
User client for the exchange.
exchange_factory
property
¶
exchange_factory: ExchangeFactory[ExchangeTransportT]
Client factory for the exchange.
from_exchange_factory
async
classmethod
¶
from_exchange_factory(
factory: ExchangeFactory[ExchangeTransportT],
executors: (
Executor
| MutableMapping[str, Executor | None]
| None
) = None,
*,
default_executor: str = "event_loop",
max_retries: int = 0,
log_config: LogConfig | None = None
) -> Self
Instantiate a new exchange client and manager from a factory.
Source code in academy/manager.py
close
async
¶
close(close_exchange: bool = True) -> None
Shutdown the manager and cleanup resources.
- Request all running agents to shut down.
- Wait for all running agents to shut down.
- Close the exchange client. (if close_exchange)
- Shutdown the executors.
- Raise an exceptions returned by agents.
Parameters:
-
close_exchange(bool, default:True) –If the exchange_client should be closed.
Raises:
-
Exception–Any exceptions raised by agents.
Source code in academy/manager.py
add_executor
¶
Add an executor to the manager.
Note
It is not possible to remove an executor as this could create complications if an agent is already running in that executor.
Parameters:
-
name(str) –Name of the executor used when launching agents.
-
executor(Executor) –Executor instance.
Returns:
-
Self–Self for chaining.
Raises:
-
ValueError–If an executor with
namealready exists.
Source code in academy/manager.py
set_default_executor
¶
set_default_executor(name: str) -> Self
Set the default executor by name.
Parameters:
-
name(str) –Name of the executor to use as default.
Returns:
-
Self–Self for chaining.
Raises:
-
ValueError–If no executor with
nameexists.
Source code in academy/manager.py
launch
async
¶
launch(
agent: AgentT | type[AgentT],
*,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
config: RuntimeConfig | None = None,
executor: str | None = None,
submit_kwargs: dict[str, Any] | None = None,
name: str | None = None,
registration: AgentRegistration[AgentT] | None = None,
log_config: LogConfig | None = None
) -> Handle[AgentT]
Launch a new agent with a specified agent.
Note
Parameters are mirrored in _LaunchIntent and
_BatchLauncher.queue; keep all three in sync.
Parameters:
-
agent(AgentT | type[AgentT]) –Agent instance the agent will implement or the agent type that will be initialized on the worker using
argsandkwargs. -
args(tuple[Any, ...] | None, default:None) –Positional arguments used to initialize the agent. Ignored if
agentis already an instance. -
kwargs(dict[str, Any] | None, default:None) –Keyword arguments used to initialize the agent. Ignored if
agentis already an instance. -
config(RuntimeConfig | None, default:None) –Agent run configuration.
-
executor(str | None, default:None) –Name of the executor instance to use. If
None, uses the default executor. -
submit_kwargs(dict[str, Any] | None, default:None) –Additional arguments to pass to the submit function of the executor (i.e. a resource specification).
-
name(str | None, default:None) –Readable name of the agent used when registering a new agent.
-
registration(AgentRegistration[AgentT] | None, default:None) –If
None, a new agent will be registered with the exchange. -
log_config(LogConfig | None, default:None) –log configuration to use. If
None, log configuration will be inherited from the enclosingManager.
Returns:
-
Handle[AgentT]–Handle (client bound) used to interact with the agent.
Raises:
-
RuntimeError–If
registrationis provided and an agent with that ID has already been executed.
Source code in academy/manager.py
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 | |
get_handle
¶
get_handle(
agent: AgentId[AgentT] | AgentRegistration[AgentT],
) -> Handle[AgentT]
Create a new handle to an agent.
A handle acts like a reference to a remote agent, enabling a user to manage the agent or asynchronously invoke actions.
Parameters:
-
agent(AgentId[AgentT] | AgentRegistration[AgentT]) –Agent ID or registration indicating the agent to create a handle to. The agent must be registered with the same exchange that this manager is a client of.
Returns:
-
Handle[AgentT]–Handle to the agent.
Source code in academy/manager.py
launch_batch
¶
Open a batch context that performs multiple simultaneous launches.
On transports that batch auth (notably
GlobusExchangeFactory),
every agent queued inside the async with block is
registered under a single auth consent prompt. Other
transports fall back to sequential registration.
Handles returned by batch.queue can be passed as
arguments to sibling batch.queue calls; they are not
messageable until the batch is submitted on exit.
Example
A convenience wrapper around
register_agents
and
launch(registration=...);
you can use the pair instead when registration and launch
need to be separated in time.
Source code in academy/manager.py
register_agent
async
¶
register_agent(
agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]
Register a new agent with the exchange.
Parameters:
-
agent(type[AgentT]) –Agent type of the agent.
-
name(str | None, default:None) –Optional display name for the agent.
Returns:
-
AgentRegistration[AgentT]–Agent registration info that can be passed to
-
AgentRegistration[AgentT]–
Source code in academy/manager.py
register_agents
async
¶
register_agents(
agents: list[tuple[type[AgentT], str | None]],
) -> list[AgentRegistration[AgentT]]
Register multiple agents, batching auth when possible.
For transports that support batch registration (e.g. Globus), all auth consent is consolidated into a single prompt. For other transports this falls back to sequential registration.
Use the returned registrations with
launch(registration=...).
Parameters:
Returns:
-
list[AgentRegistration[AgentT]]–List of registrations in the same order as the input.
Source code in academy/manager.py
running
¶
Get a set of IDs of all running agents.
Returns:
-
set[AgentId[Any]]–Set of agent IDs corresponding to all agents launched by this manager that have not completed yet.
Source code in academy/manager.py
shutdown
async
¶
shutdown(
agent: AgentId[Any] | Handle[Any],
*,
blocking: bool = True,
raise_error: bool = True,
terminate: bool | None = None,
timeout: float | None = None
) -> None
Shutdown a launched agent.
Parameters:
-
agent(AgentId[Any] | Handle[Any]) –ID or handle to the launched agent.
-
blocking(bool, default:True) –Wait for the agent to exit before returning.
-
raise_error(bool, default:True) –Raise the error returned by the agent if
blocking=True. -
terminate(bool | None, default:None) –Override the termination behavior of the agent defined in the
RuntimeConfig. -
timeout(float | None, default:None) –Optional timeout is seconds when
blocking=True.
Raises:
-
BadEntityIdError–If an agent with
agent_idwas not launched by this manager. -
TimeoutError–If
timeoutwas exceeded while blocking for agent.
Source code in academy/manager.py
wait
async
¶
wait(
agents: Iterable[AgentId[Any] | Handle[Any]],
*,
raise_error: bool = False,
return_when: str = ALL_COMPLETED,
timeout: float | None = None
) -> None
Wait for launched agents to complete.
Note
Calling wait() is only valid after launch() has succeeded.
Parameters:
-
agents(Iterable[AgentId[Any] | Handle[Any]]) –An iterable of agent IDs or handles to wait on.
-
raise_error(bool, default:False) –Raise errors returned by completed agents.
-
return_when(str, default:ALL_COMPLETED) –Indicate when this function should return. The same as
asyncio.wait(). -
timeout(float | None, default:None) –Optional timeout in seconds to wait for agents.
Raises:
-
BadEntityIdError–If an agent was not launched by this manager.
-
TimeoutError–If
timeoutwas exceeded while waiting for agents. -
Exception–Any exception raised by an agent that completed due to a failure and
raise_error=Trueis set.