academy.manager
Manager
Manager(
exchange_client: ExchangeClient[ExchangeTransportT],
executors: (
Executor
| MutableMapping[str, Executor | None]
| None
) = None,
*,
default_executor: str = "event_loop",
max_retries: int = 0
)
Bases: Generic[ExchangeTransportT], NoPickleMixin
Launch and manage running agents.
A manager is used to launch agents in the current event loop or in Executors, and to interact with and manage those agents.
Tip
This class can be used as a context manager. Upon exiting the context, running agents will be shut down, any agent handles created by the manager will be closed, and the executors will be shut down.
Tip
When using ProcessPoolExecutors, use the initializer argument to configure logging in the worker processes that will execute agents.
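The tip above can be sketched with the standard library alone. The helper names `init_worker_logging` and `make_logging_executor` are hypothetical and are not part of academy; only `ProcessPoolExecutor(initializer=..., initargs=...)` and `logging.basicConfig` are real standard-library calls.

```python
import logging
from concurrent.futures import ProcessPoolExecutor


def init_worker_logging(level: int = logging.INFO) -> None:
    """Configure the root logger; meant to run once in each worker process."""
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(processName)s %(levelname)s %(name)s: %(message)s",
        force=True,  # replace any handlers that were already installed
    )


def make_logging_executor(max_workers: int = 2) -> ProcessPoolExecutor:
    """Build a pool whose workers call init_worker_logging() on startup."""
    return ProcessPoolExecutor(
        max_workers=max_workers,
        initializer=init_worker_logging,
        initargs=(logging.INFO,),
    )
```

An executor built this way could then be passed as the `executors` argument described below.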
Note
The manager takes ownership of the exchange client and executors, meaning the manager will clean up those resources when the manager is closed.
Parameters:
-
exchange_client(ExchangeClient[ExchangeTransportT]) –Exchange client.
-
executors(Executor | MutableMapping[str, Executor | None] | None, default:None) –An executor instance or mapping of names to executors to use to run agents. If a single executor is provided, it is set as the default executor with name 'default', overriding any value of default_executor. Setting executors=None (the default) or passing {'name': None, ...} can be used to launch agents in the current event loop.
-
default_executor(str, default:'event_loop') –Specify the name of the default executor to use when not specified in launch().
-
max_retries(int, default:0) –Maximum number of times to retry running an agent if it exits with an error.
Raises:
-
ValueError–If default_executor is specified but does not exist in executors.
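One plausible reading of the executor rules above, written as a standalone sketch. `resolve_executors` is a hypothetical helper, not academy's implementation, and it assumes that an `'event_loop'` entry mapped to `None` is always available.

```python
from collections.abc import MutableMapping
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional, Union

ExecutorTable = dict[str, Optional[Executor]]


def resolve_executors(
    executors: Union[Executor, MutableMapping[str, Optional[Executor]], None] = None,
    default_executor: str = "event_loop",
) -> tuple[ExecutorTable, str]:
    """Return (name -> executor table, name of the default executor)."""
    # Assumption: None means "run in the current event loop".
    table: ExecutorTable = {"event_loop": None}
    if isinstance(executors, Executor):
        # A single executor is registered as 'default' and overrides
        # whatever default_executor the caller passed.
        table["default"] = executors
        return table, "default"
    table.update(executors or {})
    if default_executor not in table:
        raise ValueError(f"No executor named {default_executor!r}.")
    return table, default_executor


pool = ThreadPoolExecutor(max_workers=1)
table, default = resolve_executors(pool, default_executor="ignored")
pool.shutdown()
```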
Source code in academy/manager.py
exchange_client
property
exchange_client: ExchangeClient[ExchangeTransportT]
User client for the exchange.
exchange_factory
property
exchange_factory: ExchangeFactory[ExchangeTransportT]
Client factory for the exchange.
from_exchange_factory
async
classmethod
from_exchange_factory(
factory: ExchangeFactory[ExchangeTransportT],
executors: (
Executor
| MutableMapping[str, Executor | None]
| None
) = None,
*,
default_executor: str = "event_loop",
max_retries: int = 0
) -> Self
Instantiate a new exchange client and manager from a factory.
close
async
close(close_exchange: bool = True) -> None
Shut down the manager and clean up resources.
- Request all running agents to shut down.
- Wait for all running agents to shut down.
- Close the exchange client (if close_exchange is True).
- Shut down the executors.
- Raise any exceptions returned by agents.
Parameters:
-
close_exchange(bool, default:True) –Whether the exchange_client should be closed.
Raises:
-
Exception–Any exceptions raised by agents.
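The cleanup order listed for close() can be illustrated with a toy class. `TinyManager` is a hypothetical stand-in that only records the steps; it does not talk to an exchange or run agents, and its use of `async with` is an assumption based on close() being a coroutine.

```python
import asyncio


class TinyManager:
    """Hypothetical stand-in that records the order of cleanup steps."""

    def __init__(self) -> None:
        self.steps = []
        self.agent_errors = []

    async def __aenter__(self) -> "TinyManager":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Leaving the context triggers the same cleanup as calling close().
        await self.close()

    async def close(self, close_exchange: bool = True) -> None:
        self.steps.append("request agent shutdown")
        self.steps.append("wait for agents")
        if close_exchange:
            self.steps.append("close exchange client")
        self.steps.append("shut down executors")
        if self.agent_errors:
            # Errors surface last, after every resource has been released.
            raise self.agent_errors[0]


async def demo() -> list:
    async with TinyManager() as manager:
        pass  # agents would be launched and used here
    return manager.steps
```

Raising agent errors only at the end means a failing agent does not prevent the exchange client or executors from being released.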
add_executor
Add an executor to the manager.
Note
It is not possible to remove an executor as this could create complications if an agent is already running in that executor.
Parameters:
-
name(str) –Name of the executor used when launching agents.
-
executor(Executor) –Executor instance.
Returns:
-
Self–Self for chaining.
Raises:
-
ValueError–If an executor with name already exists.
set_default_executor
set_default_executor(name: str) -> Self
Set the default executor by name.
Parameters:
-
name(str) –Name of the executor to use as default.
Returns:
-
Self–Self for chaining.
Raises:
-
ValueError–If no executor with name exists.
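Because add_executor and set_default_executor both return Self, calls can be chained. The sketch below mimics that pattern with a hypothetical `ExecutorRegistry` class; it is not the Manager itself and only reproduces the documented return values and ValueError conditions.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional


class ExecutorRegistry:
    """Hypothetical registry mirroring the documented chaining behavior."""

    def __init__(self) -> None:
        # Assumption: None stands for "run in the current event loop".
        self.executors: dict[str, Optional[Executor]] = {"event_loop": None}
        self.default = "event_loop"

    def add_executor(self, name: str, executor: Executor) -> "ExecutorRegistry":
        if name in self.executors:
            raise ValueError(f"Executor {name!r} already exists.")
        self.executors[name] = executor
        return self  # returning self is what enables chaining

    def set_default_executor(self, name: str) -> "ExecutorRegistry":
        if name not in self.executors:
            raise ValueError(f"No executor named {name!r}.")
        self.default = name
        return self


pool = ThreadPoolExecutor(max_workers=1)
registry = ExecutorRegistry().add_executor("threads", pool).set_default_executor("threads")
pool.shutdown()
```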
launch
async
launch(
agent: AgentT | type[AgentT],
*,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
config: RuntimeConfig | None = None,
executor: str | None = None,
submit_kwargs: dict[str, Any] | None = None,
name: str | None = None,
registration: AgentRegistration[AgentT] | None = None,
init_logging: bool = False,
loglevel: int | str = INFO,
logfile: str | None = None
) -> Handle[AgentT]
Launch a new agent from the specified agent instance or agent type.
Parameters:
-
agent(AgentT | type[AgentT]) –Agent instance to run, or the agent type that will be initialized on the worker using args and kwargs.
-
args(tuple[Any, ...] | None, default:None) –Positional arguments used to initialize the agent. Ignored if agent is already an instance.
-
kwargs(dict[str, Any] | None, default:None) –Keyword arguments used to initialize the agent. Ignored if agent is already an instance.
-
config(RuntimeConfig | None, default:None) –Agent run configuration.
-
executor(str | None, default:None) –Name of the executor instance to use. If None, uses the default executor.
-
submit_kwargs(dict[str, Any] | None, default:None) –Additional arguments to pass to the submit function of the executor (e.g., a resource specification).
-
name(str | None, default:None) –Readable name of the agent used when registering a new agent.
-
registration(AgentRegistration[AgentT] | None, default:None) –If None, a new agent will be registered with the exchange.
-
init_logging(bool, default:False) –Initialize logging before running agent.
-
loglevel(int | str, default:INFO) –Level of logging.
-
logfile(str | None, default:None) –Location to write logs.
Returns:
-
Handle[AgentT]–Handle to the launched agent.
Raises:
-
RuntimeError–If registration is provided and an agent with that ID has already been executed.
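The rule that args and kwargs apply only when agent is a type can be written as a small helper. `materialize_agent` and `Counter` are hypothetical names for illustration; this is a sketch of the documented semantics, not the code that launch() runs on the worker.

```python
from typing import Any, Optional


class Counter:
    """Toy class standing in for an agent type."""

    def __init__(self, start: int = 0) -> None:
        self.start = start


def materialize_agent(
    agent: Any,
    args: Optional[tuple] = None,
    kwargs: Optional[dict] = None,
) -> Any:
    """Return an instance, constructing one only when given a type."""
    if isinstance(agent, type):
        # A type is initialized with the provided arguments.
        return agent(*(args or ()), **(kwargs or {}))
    # An existing instance is used as-is; args and kwargs are ignored.
    return agent


from_type = materialize_agent(Counter, kwargs={"start": 5})
existing = Counter(start=1)
from_instance = materialize_agent(existing, kwargs={"start": 99})
```

Passing a type defers construction to wherever the agent runs, which can matter when the instance holds state that cannot be pickled for a process-based executor.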
get_handle
Create a new handle to an agent.
A handle acts like a reference to a remote agent, enabling a user to manage the agent or asynchronously invoke actions.
Parameters:
-
agent(AgentId[AgentT] | AgentRegistration[AgentT]) –Agent ID or registration indicating the agent to create a handle to. The agent must be registered with the same exchange that this manager is a client of.
Returns:
register_agent
async
register_agent(
agent: type[AgentT], *, name: str | None = None
) -> AgentRegistration[AgentT]
Register a new agent with the exchange.
Parameters:
-
agent(type[AgentT]) –Type of the agent to register.
-
name(str | None, default:None) –Optional display name for the agent.
Returns:
-
AgentRegistration[AgentT]–Agent registration info that can be passed to launch().
running
Get a set of IDs of all running agents.
Returns:
-
set[AgentId[Any]]–Set of agent IDs corresponding to all agents launched by this manager that have not completed yet.
shutdown
async
shutdown(
agent: AgentId[Any] | Handle[Any],
*,
blocking: bool = True,
raise_error: bool = True,
terminate: bool | None = None,
timeout: float | None = None
) -> None
Shut down a launched agent.
Parameters:
-
agent(AgentId[Any] | Handle[Any]) –ID or handle to the launched agent.
-
blocking(bool, default:True) –Wait for the agent to exit before returning.
-
raise_error(bool, default:True) –Raise the error returned by the agent if blocking=True.
-
terminate(bool | None, default:None) –Override the termination behavior of the agent defined in the RuntimeConfig.
-
timeout(float | None, default:None) –Optional timeout in seconds when blocking=True.
Raises:
-
BadEntityIdError–If an agent with agent_id was not launched by this manager.
-
TimeoutError–If timeout was exceeded while blocking for agent.
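The interaction between blocking and timeout can be pictured with plain asyncio. This is a generic sketch, not how Manager.shutdown() is implemented: `agent_run` and `block_until_exit` are hypothetical names, and a task stands in for a running agent.

```python
import asyncio


async def agent_run(delay: float) -> str:
    """Stand-in for an agent that takes `delay` seconds to exit."""
    await asyncio.sleep(delay)
    return "exited"


async def block_until_exit(task: asyncio.Task, timeout=None) -> str:
    # shield() keeps the underlying task alive if the wait times out,
    # so a timeout stops the waiting without cancelling the agent.
    return await asyncio.wait_for(asyncio.shield(task), timeout=timeout)


async def demo() -> tuple:
    task = asyncio.create_task(agent_run(0.2))
    try:
        await block_until_exit(task, timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    # The agent task was not cancelled and can still be awaited.
    result = await task
    return timed_out, result
```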
wait
async
wait(
agents: Iterable[AgentId[Any] | Handle[Any]],
*,
raise_error: bool = False,
return_when: str = ALL_COMPLETED,
timeout: float | None = None
) -> None
Wait for launched agents to complete.
Note
Calling wait() is only valid after launch() has succeeded.
Parameters:
-
agents(Iterable[AgentId[Any] | Handle[Any]]) –An iterable of agent IDs or handles to wait on.
-
raise_error(bool, default:False) –Raise errors returned by completed agents.
-
return_when(str, default:ALL_COMPLETED) –Indicate when this function should return. The same as asyncio.wait().
timeout(float | None, default:None) –Optional timeout in seconds to wait for agents.
Raises:
-
BadEntityIdError–If an agent was not launched by this manager.
-
TimeoutError–If timeout was exceeded while waiting for agents.
-
Exception–Any exception raised by an agent that completed due to a failure and raise_error=True is set.
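Since return_when follows asyncio.wait(), its effect can be shown with the standard library alone. The sketch below uses plain tasks in place of agents; `worker` and `demo` are hypothetical names, and Manager.wait() itself is not called.

```python
import asyncio


async def worker(delay: float, label: str) -> str:
    """Stand-in for an agent that finishes after `delay` seconds."""
    await asyncio.sleep(delay)
    return label


async def demo() -> tuple:
    tasks = [
        asyncio.create_task(worker(0.01, "fast")),
        asyncio.create_task(worker(0.2, "slow")),
    ]
    # FIRST_COMPLETED returns as soon as any one task has finished.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = sorted(task.result() for task in done)
    # ALL_COMPLETED (the default) returns only after every task has finished.
    done_all, pending_all = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    return first, len(pending), len(done_all), len(pending_all)
```

Note that asyncio.wait() itself returns the pending set when its timeout elapses rather than raising, whereas the Raises list above documents a TimeoutError for Manager.wait().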