Rename actor to worker (#174)

# PR Description
This PR renames `actor` to `worker` 

**Does not include deployment related things in
`.github/workflows/release-containers.yml`**
This commit is contained in:
Eric Gustin 2025-01-03 14:28:04 -08:00 committed by GitHub
parent 2cc9aba0f4
commit 890ee96ef4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 152 additions and 152 deletions

4
.vscode/launch.json vendored
View file

@ -18,11 +18,11 @@
"cwd": "${workspaceFolder}/examples/fastapi/arcade_example_fastapi"
},
{
"name": "Debug `arcade actorup --no-auth`",
"name": "Debug `arcade workerup --no-auth`",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/arcade/run_cli.py",
"args": ["actorup", "--no-auth"],
"args": ["workerup", "--no-auth"],
"console": "integratedTerminal",
"jinja": true,
"justMyCode": true,

View file

@ -30,25 +30,25 @@ if os.environ.get("HOMEBREW_REPOSITORY") is not None:
def start_servers(
actor_host: str,
actor_port: int,
worker_host: str,
worker_port: int,
engine_config: str | None,
engine_env: str | None = None,
debug: bool = False,
) -> None:
"""
Start the actor and engine servers.
Start the worker and engine servers.
Args:
host: Host for the actor server.
port: Port for the actor server.
host: Host for the worker server.
port: Port for the worker server.
engine_config: Path to the engine configuration file.
engine_env: Path to the engine environment file.
debug: Whether to run in debug mode.
"""
# Validate host and port
actor_host = _validate_host(actor_host)
actor_port = _validate_port(actor_port)
worker_host = _validate_host(worker_host)
worker_port = _validate_port(worker_port)
# Ensure engine_config is provided and validated
engine_config = _get_config_file(engine_config, default_filename="engine.yaml")
@ -56,14 +56,14 @@ def start_servers(
# Ensure engine_env is provided or found and either way, validated
env_file = _get_config_file(engine_env, default_filename="arcade.env", optional=True)
# Prepare command-line arguments for the actor server and engine
actor_cmd = _build_actor_command(actor_host, actor_port, debug)
# Prepare command-line arguments for the worker server and engine
worker_cmd = _build_worker_command(worker_host, worker_port, debug)
# even if the user didn't pass an env file we may have found it in the default locations
engine_cmd = _build_engine_command(engine_config, engine_env=env_file if env_file else None)
# Start and manage the processes
_manage_processes(actor_cmd, actor_host, actor_port, engine_cmd, debug=debug)
_manage_processes(worker_cmd, worker_host, worker_port, engine_cmd, debug=debug)
def _validate_host(host: str) -> str:
@ -71,7 +71,7 @@ def _validate_host(host: str) -> str:
Validates the host input.
Args:
host: Host for the actor server.
host: Host for the worker server.
Returns:
The validated host as a string.
@ -95,7 +95,7 @@ def _validate_port(port: int) -> int:
Validates the port input.
Args:
port: Port for the actor server.
port: Port for the worker server.
Returns:
The validated port as an integer.
@ -176,13 +176,13 @@ def _get_config_file(
raise RuntimeError(f"Config file '{default_filename}' not found.")
def _build_actor_command(host: str, port: int, debug: bool) -> list[str]:
def _build_worker_command(host: str, port: int, debug: bool) -> list[str]:
"""
Builds the command to start the actor server.
Builds the command to start the worker server.
Args:
host: Host for the actor server.
port: Port for the actor server.
host: Host for the worker server.
port: Port for the worker server.
debug: Whether to run in debug mode.
Returns:
@ -198,7 +198,7 @@ def _build_actor_command(host: str, port: int, debug: bool) -> list[str]:
sys.exit(1)
cmd = [
arcade_bin,
"actorup",
"workerup",
"--host",
host,
"--port",
@ -246,28 +246,28 @@ def _build_engine_command(engine_config: str | None, engine_env: str | None = No
def _manage_processes(
actor_cmd: list[str],
actor_host: str,
actor_port: int,
worker_cmd: list[str],
worker_host: str,
worker_port: int,
engine_cmd: list[str],
engine_env: dict[str, str] | None = None,
debug: bool = False,
) -> None:
"""
Manages the lifecycle of the actor and engine processes.
Manages the lifecycle of the worker and engine processes.
Args:
actor_cmd: The command to start the actor server.
worker_cmd: The command to start the worker server.
engine_cmd: The command to start the engine.
engine_env: Environment variables to set for the engine.
debug: Whether to run in debug mode.
"""
actor_process: subprocess.Popen | None = None
worker_process: subprocess.Popen | None = None
engine_process: subprocess.Popen | None = None
def terminate_processes(exit_program: bool = False) -> None:
console.print("Terminating child processes...", style="bold yellow")
_terminate_process(actor_process)
_terminate_process(worker_process)
_terminate_process(engine_process)
if exit_program:
sys.exit(0)
@ -279,18 +279,18 @@ def _manage_processes(
while retry_count <= max_retries:
try:
# Start the actor server
console.print("Starting actor server...", style="bold green")
actor_process = _start_process("Actor", actor_cmd, debug=debug)
# Start the worker server
console.print("Starting worker server...", style="bold green")
worker_process = _start_process("Worker", worker_cmd, debug=debug)
_wait_for_healthy_actor(actor_process, actor_host, actor_port)
_wait_for_healthy_worker(worker_process, worker_host, worker_port)
# Start the engine
console.print("Starting engine...", style="bold green")
engine_process = _start_process("Engine", engine_cmd, env=engine_env, debug=debug)
# Monitor processes
_monitor_processes(actor_process, engine_process)
_monitor_processes(worker_process, engine_process)
# If we reach here, one of the processes has exited
retry_count += 1
@ -345,7 +345,7 @@ def _start_process(
else:
_env["GIN_MODE"] = "release"
if name == "Actor":
if name == "Worker":
_env["PYTHONUNBUFFERED"] = "1"
try:
@ -365,26 +365,26 @@ def _start_process(
raise RuntimeError(f"Failed to start {name}")
def _wait_for_healthy_actor(
actor_process: subprocess.Popen, actor_host: str, actor_port: int
def _wait_for_healthy_worker(
worker_process: subprocess.Popen, worker_host: str, worker_port: int
) -> None:
"""Wait until an HTTP request to `host:port/actor/health` returns 200"""
"""Wait until an HTTP request to `host:port/worker/health` returns 200"""
while actor_process.poll() is None: # Continue waiting UNLESS the actor process has exited
while worker_process.poll() is None: # Continue waiting UNLESS the worker process has exited
time.sleep(1)
try:
conn = http.client.HTTPConnection(actor_host, actor_port, timeout=1)
conn.request("GET", "/actor/health")
conn = http.client.HTTPConnection(worker_host, worker_port, timeout=1)
conn.request("GET", "/worker/health")
res = conn.getresponse()
if res.status == 200:
break
conn.close()
except (socket.gaierror, http.client.HTTPException, ConnectionRefusedError, TimeoutError):
pass # Handle expected exceptions gracefully
console.print("Waiting for actor to start...", style="bold yellow")
console.print("Waiting for worker to start...", style="bold yellow")
time.sleep(1) # Wait just a little longer for everything to settle (discovered experimentally)
console.print("Actor is healthy", style="bold green")
console.print("Worker is healthy", style="bold green")
def _stream_output(process: subprocess.Popen, name: str) -> None:
@ -395,7 +395,7 @@ def _stream_output(process: subprocess.Popen, name: str) -> None:
process: The subprocess.Popen object.
name: Name of the process.
"""
stdout_style = "green" if name == "Actor" else "#87CEFA"
stdout_style = "green" if name == "Worker" else "#87CEFA"
def stream(pipe: io.TextIOWrapper | None, style: str) -> None:
if pipe is None:
@ -418,23 +418,23 @@ def _stream_output(process: subprocess.Popen, name: str) -> None:
threading.Thread(target=stream, args=(process.stderr, "red"), daemon=True).start()
def _monitor_processes(actor_process: subprocess.Popen, engine_process: subprocess.Popen) -> None:
def _monitor_processes(worker_process: subprocess.Popen, engine_process: subprocess.Popen) -> None:
"""
Monitors the actor and engine processes, restarts them if they exit.
Monitors the worker and engine processes, restarts them if they exit.
Args:
actor_process: The actor subprocess.
worker_process: The worker subprocess.
engine_process: The engine subprocess.
"""
while True:
actor_status = actor_process.poll()
worker_status = worker_process.poll()
engine_status = engine_process.poll()
if actor_status is not None or engine_status is not None:
if actor_status is not None:
if worker_status is not None or engine_status is not None:
if worker_status is not None:
console.print(
f"Actor process exited with code {actor_status}. Restarting both processes...",
f"Worker process exited with code {worker_status}. Restarting both processes...",
style="bold red",
)
if engine_status is not None:
@ -442,7 +442,7 @@ def _monitor_processes(actor_process: subprocess.Popen, engine_process: subproce
f"Engine process exited with code {engine_status}. Restarting both processes...",
style="bold red",
)
_terminate_process(actor_process)
_terminate_process(worker_process)
_terminate_process(engine_process)
time.sleep(1)
break # Exit to restart both processes

View file

@ -413,9 +413,9 @@ def evals(
@cli.command(help="Launch Arcade AI locally for tool dev", rich_help_panel="Launch")
def dev(
host: str = typer.Option("127.0.0.1", help="Host for the actor server.", show_default=True),
host: str = typer.Option("127.0.0.1", help="Host for the worker server.", show_default=True),
port: int = typer.Option(
8002, "-p", "--port", help="Port for the actor server.", show_default=True
8002, "-p", "--port", help="Port for the worker server.", show_default=True
),
engine_config: str = typer.Option(
None, "-c", "--config", help="Path to the engine configuration file."
@ -426,7 +426,7 @@ def dev(
debug: bool = typer.Option(False, "-d", "--debug", help="Show debug information"),
) -> None:
"""
Start both the actor and engine servers.
Start both the worker and engine servers.
"""
try:
start_servers(host, port, engine_config, engine_env=env_file, debug=debug)
@ -436,8 +436,8 @@ def dev(
typer.Exit(code=1)
@cli.command(help="Start a local Arcade Actor server", rich_help_panel="Launch", hidden=True)
def actorup(
@cli.command(help="Start a local Arcade Worker server", rich_help_panel="Launch", hidden=True)
def workerup(
host: str = typer.Option(
"127.0.0.1",
help="Host for the app, from settings by default.",
@ -449,7 +449,7 @@ def actorup(
disable_auth: bool = typer.Option(
False,
"--no-auth",
help="Disable authentication for the actor. Not recommended for production.",
help="Disable authentication for the worker. Not recommended for production.",
show_default=True,
),
otel_enable: bool = typer.Option(
@ -458,18 +458,18 @@ def actorup(
debug: bool = typer.Option(False, "--debug", "-d", help="Show debug information"),
) -> None:
"""
Starts the actor with host, port, and reload options. Uses
Uvicorn as ASGI actor. Parameters allow runtime configuration.
Starts the worker with host, port, and reload options. Uses
Uvicorn as ASGI worker. Parameters allow runtime configuration.
"""
from arcade.cli.serve import serve_default_actor
from arcade.cli.serve import serve_default_worker
try:
serve_default_actor(
serve_default_worker(
host, port, disable_auth=disable_auth, enable_otel=otel_enable, debug=debug
)
except KeyboardInterrupt:
typer.Exit()
except Exception as e:
error_message = f"❌ Failed to start Arcade Actor: {escape(str(e))}"
error_message = f"❌ Failed to start Arcade Worker: {escape(str(e))}"
console.print(error_message, style="bold red")
typer.Exit(code=1)

View file

@ -23,8 +23,8 @@ except ImportError:
"Uvicorn is not installed. Please install it using `pip install arcade-ai[fastapi]`."
)
from arcade.actor.fastapi.actor import FastAPIActor
from arcade.sdk import Toolkit
from arcade.worker.fastapi.worker import FastAPIWorker
class InterceptHandler(logging.Handler):
@ -80,7 +80,7 @@ async def lifespan(app: fastapi.FastAPI): # type: ignore[no-untyped-def]
logger.debug("Lifespan cancelled.")
def serve_default_actor(
def serve_default_worker(
host: str = "127.0.0.1",
port: int = 8002,
disable_auth: bool = False,
@ -91,7 +91,7 @@ def serve_default_actor(
**kwargs: Any,
) -> None:
"""
Get an instance of a FastAPI server with the Arcade Actor.
Get an instance of a FastAPI server with the Arcade Worker.
"""
# Setup unified logging
setup_logging(log_level=logging.DEBUG if debug else logging.INFO)
@ -105,27 +105,27 @@ def serve_default_actor(
num_tools = sum(len(tools) for tools in toolkit.tools.values())
logger.info(f" - {toolkit.name} ({toolkit.package_name}): {num_tools} tools")
actor_secret = os.environ.get("ARCADE_ACTOR_SECRET")
if not disable_auth and not actor_secret:
worker_secret = os.environ.get("ARCADE_WORKER_SECRET")
if not disable_auth and not worker_secret:
logger.warning(
"Warning: ARCADE_ACTOR_SECRET environment variable is not set. Using 'dev' as the actor secret.",
"Warning: ARCADE_WORKER_SECRET environment variable is not set. Using 'dev' as the worker secret.",
)
actor_secret = actor_secret or "dev"
worker_secret = worker_secret or "dev"
app = fastapi.FastAPI(
title="Arcade AI Actor",
description="Arcade AI default Actor implementation using FastAPI.",
title="Arcade AI Worker",
description="Arcade AI default Worker implementation using FastAPI.",
version="0.1.0",
lifespan=lifespan, # Use custom lifespan to catch errors, notably KeyboardInterrupt (Ctrl+C)
)
otel_handler = OTELHandler(app, enable=enable_otel)
actor = FastAPIActor(
app, secret=actor_secret, disable_auth=disable_auth, otel_meter=otel_handler.get_meter()
worker = FastAPIWorker(
app, secret=worker_secret, disable_auth=disable_auth, otel_meter=otel_handler.get_meter()
)
for toolkit in toolkits:
actor.register_toolkit(toolkit)
worker.register_toolkit(toolkit)
logger.info("Starting FastAPI server...")

View file

@ -108,7 +108,7 @@ class MaterializedTool(BaseModel):
class ToolCatalog(BaseModel):
"""Singleton class that holds all tools for a given actor"""
"""Singleton class that holds all tools for a given worker"""
_tools: dict[FullyQualifiedName, MaterializedTool] = {}

View file

@ -38,7 +38,7 @@ class OTELHandler:
"🔎 Initializing OpenTelemetry. Use environment variables to configure the connection"
)
self.resource = Resource(
attributes={SERVICE_NAME: "arcade-actor", "environment": self.environment}
attributes={SERVICE_NAME: "arcade-worker", "environment": self.environment}
)
self._init_tracer()

View file

@ -14,10 +14,10 @@ ARCADE_API_PORT=9099
# ANTHROPIC_API_KEY=
# GROQ_API_KEY=
### Actor Configuration ###
### Worker Configuration ###
ARCADE_ACTOR_URI=http://localhost:8002
# ARCADE_ACTOR_SECRET=
ARCADE_WORKER_URI=http://localhost:8002
# ARCADE_WORKER_SECRET=
### Token Storage ###
# REDIS_HOST=

View file

@ -59,7 +59,7 @@
```bash
arcade login
```
4. Start the Arcade Engine and Actor:
4. Start the Arcade Engine and Worker:
```bash
arcade dev
```

View file

@ -19,19 +19,19 @@ class SigningAlgorithm(str, Enum):
HS256 = "HS256"
def validate_engine_token(actor_secret: str, token: str) -> TokenValidationResult:
def validate_engine_token(worker_secret: str, token: str) -> TokenValidationResult:
try:
payload = jwt.decode(
token,
actor_secret,
worker_secret,
algorithms=[SigningAlgorithm.HS256],
verify=True,
audience="actor",
audience="worker",
)
except jwt.InvalidSignatureError as e:
logger.warning(
"Invalid signature. Is the Arcade Engine configured with the Actor secret '%s'?",
actor_secret,
"Invalid signature. Is the Arcade Engine configured with the Worker secret '%s'?",
worker_secret,
)
return TokenValidationResult(valid=False, error=str(e))

View file

@ -7,13 +7,6 @@ from typing import Any, Callable, ClassVar
from opentelemetry import trace
from opentelemetry.metrics import Meter
from arcade.actor.core.common import Actor, Router
from arcade.actor.core.components import (
ActorComponent,
CallToolComponent,
CatalogComponent,
HealthCheckComponent,
)
from arcade.core.catalog import ToolCatalog, Toolkit
from arcade.core.executor import ToolExecutor
from arcade.core.schema import (
@ -21,19 +14,26 @@ from arcade.core.schema import (
ToolCallResponse,
ToolDefinition,
)
from arcade.worker.core.common import Router, Worker
from arcade.worker.core.components import (
CallToolComponent,
CatalogComponent,
HealthCheckComponent,
WorkerComponent,
)
logger = logging.getLogger(__name__)
class BaseActor(Actor):
class BaseWorker(Worker):
"""
A base actor class that provides a default implementation for registering tools and invoking them.
Actor implementations for specific web frameworks will inherit from this class.
A base worker class that provides a default implementation for registering tools and invoking them.
Worker implementations for specific web frameworks will inherit from this class.
"""
base_path = "/actor" # By default, prefix all our routes with /actor
base_path = "/worker" # By default, prefix all our routes with /worker
default_components: ClassVar[tuple[type[ActorComponent], ...]] = (
default_components: ClassVar[tuple[type[WorkerComponent], ...]] = (
CatalogComponent,
CallToolComponent,
HealthCheckComponent,
@ -43,14 +43,14 @@ class BaseActor(Actor):
self, secret: str | None = None, disable_auth: bool = False, otel_meter: Meter | None = None
) -> None:
"""
Initialize the BaseActor with an empty ToolCatalog.
If no secret is provided, the actor will use the ARCADE_ACTOR_SECRET environment variable.
Initialize the BaseWorker with an empty ToolCatalog.
If no secret is provided, the worker will use the ARCADE_WORKER_SECRET environment variable.
"""
self.catalog = ToolCatalog()
self.disable_auth = disable_auth
if disable_auth:
logger.warning(
"Warning: Actor is running without authentication. Not recommended for production."
"Warning: Worker is running without authentication. Not recommended for production."
)
self.secret = self._set_secret(secret, disable_auth)
@ -71,12 +71,12 @@ class BaseActor(Actor):
return secret
# If secret is not provided, try to get it from environment variables
env_secret = os.environ.get("ARCADE_ACTOR_SECRET")
env_secret = os.environ.get("ARCADE_WORKER_SECRET")
if env_secret:
return env_secret
raise ValueError(
"No secret provided for actor. Set the ARCADE_ACTOR_SECRET environment variable."
"No secret provided for worker. Set the ARCADE_WORKER_SECRET environment variable."
)
def get_catalog(self) -> list[ToolDefinition]:
@ -173,7 +173,7 @@ class BaseActor(Actor):
def health_check(self) -> dict[str, Any]:
"""
Provide a health check that serves as a heartbeat of actor health.
Provide a health check that serves as a heartbeat of worker health.
"""
return {"status": "ok", "tool_count": len(self.catalog)}

View file

@ -8,7 +8,7 @@ from arcade.core.schema import ToolCallRequest, ToolCallResponse, ToolDefinition
class RequestData(BaseModel):
"""
The raw data for a request to an actor.
The raw data for a request to a worker.
This is not intended to represent everything about an HTTP request,
but just the essential info a framework integration will need to extract from the request.
"""
@ -23,7 +23,7 @@ class RequestData(BaseModel):
class Router(ABC):
"""
A router is responsible for adding routes to the underlying framework hosting the actor.
A router is responsible for adding routes to the underlying framework hosting the worker.
"""
@abstractmethod
@ -36,37 +36,37 @@ class Router(ABC):
pass
class Actor(ABC):
class Worker(ABC):
"""
An Actor represents a collection of tools that is hosted inside a web framework
A Worker represents a collection of tools that is hosted inside a web framework
and can be called by an Engine.
"""
@abstractmethod
def get_catalog(self) -> list[ToolDefinition]:
"""
Get the catalog of tools available in the actor.
Get the catalog of tools available in the worker.
"""
pass
@abstractmethod
async def call_tool(self, request: ToolCallRequest) -> ToolCallResponse:
"""
Send a request to call a tool to the Actor
Send a request to call a tool to the Worker
"""
pass
@abstractmethod
def health_check(self) -> dict[str, Any]:
"""
Perform a health check of the actor
Perform a health check of the worker
"""
pass
class ActorComponent(ABC):
def __init__(self, actor: Actor) -> None:
self.actor = actor
class WorkerComponent(ABC):
def __init__(self, worker: Worker) -> None:
self.worker = worker
@abstractmethod
def register(self, router: Router) -> None:

View file

@ -2,13 +2,13 @@ from typing import Any
from opentelemetry import trace
from arcade.actor.core.common import Actor, ActorComponent, RequestData, Router
from arcade.core.schema import ToolCallRequest, ToolCallResponse, ToolDefinition
from arcade.worker.core.common import RequestData, Router, Worker, WorkerComponent
class CatalogComponent(ActorComponent):
def __init__(self, actor: Actor) -> None:
self.actor = actor
class CatalogComponent(WorkerComponent):
def __init__(self, worker: Worker) -> None:
self.worker = worker
def register(self, router: Router) -> None:
"""
@ -22,12 +22,12 @@ class CatalogComponent(ActorComponent):
"""
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("Catalog"):
return self.actor.get_catalog()
return self.worker.get_catalog()
class CallToolComponent(ActorComponent):
def __init__(self, actor: Actor) -> None:
self.actor = actor
class CallToolComponent(WorkerComponent):
def __init__(self, worker: Worker) -> None:
self.worker = worker
def register(self, router: Router) -> None:
"""
@ -43,12 +43,12 @@ class CallToolComponent(ActorComponent):
with tracer.start_as_current_span("CallTool"):
call_tool_request_data = request.body_json
call_tool_request = ToolCallRequest.model_validate(call_tool_request_data)
return await self.actor.call_tool(call_tool_request)
return await self.worker.call_tool(call_tool_request)
class HealthCheckComponent(ActorComponent):
def __init__(self, actor: Actor) -> None:
self.actor = actor
class HealthCheckComponent(WorkerComponent):
def __init__(self, worker: Worker) -> None:
self.worker = worker
def register(self, router: Router) -> None:
"""
@ -62,4 +62,4 @@ class HealthCheckComponent(ActorComponent):
"""
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("HealthCheck"):
return self.actor.health_check()
return self.worker.health_check()

View file

@ -1,16 +1,16 @@
from fastapi import HTTPException
from fastapi.security import HTTPAuthorizationCredentials
from arcade.actor.core.auth import validate_engine_token
from arcade.worker.core.auth import validate_engine_token
# Dependency function to validate JWT
async def validate_engine_request(
actor_secret: str,
worker_secret: str,
credentials: HTTPAuthorizationCredentials,
) -> None:
jwt: str = credentials.credentials
validation_result = validate_engine_token(actor_secret, jwt)
validation_result = validate_engine_token(worker_secret, jwt)
if not validation_result.valid:
raise HTTPException(

View file

@ -5,18 +5,18 @@ from fastapi import Depends, FastAPI, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from opentelemetry.metrics import Meter
from arcade.actor.core.base import (
BaseActor,
from arcade.worker.core.base import (
BaseWorker,
Router,
)
from arcade.actor.core.common import RequestData
from arcade.actor.fastapi.auth import validate_engine_request
from arcade.actor.utils import is_async_callable
from arcade.worker.core.common import RequestData
from arcade.worker.fastapi.auth import validate_engine_request
from arcade.worker.utils import is_async_callable
class FastAPIActor(BaseActor):
class FastAPIWorker(BaseWorker):
"""
An Arcade Actor that is hosted inside a FastAPI app.
An Arcade Worker that is hosted inside a FastAPI app.
"""
def __init__(
@ -28,8 +28,8 @@ class FastAPIActor(BaseActor):
otel_meter: Meter | None = None,
) -> None:
"""
Initialize the FastAPIActor with a FastAPI app instance.
If no secret is provided, the actor will use the ARCADE_ACTOR_SECRET environment variable.
Initialize the FastAPIWorker with a FastAPI app instance.
If no secret is provided, the worker will use the ARCADE_WORKER_SECRET environment variable.
"""
super().__init__(secret, disable_auth, otel_meter)
self.app = app
@ -41,28 +41,28 @@ security = HTTPBearer() # Authorization: Bearer <xxx>
class FastAPIRouter(Router):
def __init__(self, app: FastAPI, actor: BaseActor) -> None:
def __init__(self, app: FastAPI, worker: BaseWorker) -> None:
self.app = app
self.actor = actor
self.worker = worker
def _wrap_handler(self, handler: Callable, require_auth: bool = True) -> Callable:
"""
Wrap the handler to handle FastAPI-specific request and response.
"""
use_auth_for_route = not self.actor.disable_auth and require_auth
use_auth_for_route = not self.worker.disable_auth and require_auth
def call_validate_engine_request(actor_secret: str) -> Callable:
def call_validate_engine_request(worker_secret: str) -> Callable:
async def dependency(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> None:
await validate_engine_request(actor_secret, credentials)
await validate_engine_request(worker_secret, credentials)
return dependency
async def wrapped_handler(
request: Request,
_: None = Depends(call_validate_engine_request(self.actor.secret))
_: None = Depends(call_validate_engine_request(self.worker.secret))
if use_auth_for_route
else None,
) -> Any:
@ -87,7 +87,7 @@ class FastAPIRouter(Router):
Add a route to the FastAPI application.
"""
self.app.add_api_route(
f"{self.actor.base_path}/{endpoint_path}",
f"{self.worker.base_path}/{endpoint_path}",
self._wrap_handler(handler, require_auth),
methods=[method],
)

View file

@ -38,7 +38,7 @@ def test_init_with_enable_true(
handler = OTELHandler(app, enable=True)
# Verify that the resource is set correctly
assert handler.resource.attributes["service.name"] == "arcade-actor"
assert handler.resource.attributes["service.name"] == "arcade-worker"
assert "environment" in handler.resource.attributes
# Verify that initialization methods are called

View file

@ -42,7 +42,7 @@ RUN python -m pip install -r ./requirements.txt
# Expose the port
EXPOSE $PORT
# Run the arcade actorup (hidden cli command)
# Run the arcade workerup (hidden cli command)
COPY docker/start.sh /app/start.sh
RUN chmod +x /app/start.sh
CMD ["/app/start.sh"]

View file

@ -9,7 +9,7 @@ ECR_ENDPOINT ?= 471112909428.dkr.ecr.us-east-1.amazonaws.com
VERSION ?= 0.1.0.dev0
COMMIT ?= $(shell git describe --dirty --always --abbrev=15)
BUILD_DATE ?= $(shell date -u +"%Y-%m-%dT%H:%M:%SZ")
IMAGE_NAME ?= actor
IMAGE_NAME ?= worker
PORT ?= 8002
@ -57,7 +57,7 @@ ecr-login: # Login to ECR
.PHONY: help
help:
@echo "🛠️ Actor Docker Commands:\n"
@echo "🛠️ Worker Docker Commands:\n"
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
.DEFAULT_GOAL := help

View file

@ -7,4 +7,4 @@ for toolkit in $(echo $TOOLKITS | tr "," " "); do
done
echo "Starting arcade..."
arcade actorup --host $HOST --port $PORT $([ "$OTEL_ENABLE" = "true" ] && echo "--otel-enable")
arcade workerup --host $HOST --port $PORT $([ "$OTEL_ENABLE" = "true" ] && echo "--otel-enable")

View file

@ -5,16 +5,16 @@ from fastapi import FastAPI, HTTPException
from openai import AsyncOpenAI
from pydantic import BaseModel
from arcade.actor.fastapi.actor import FastAPIActor
from arcade.sdk import Toolkit
from arcade.worker.fastapi.worker import FastAPIWorker
client = AsyncOpenAI(api_key=os.environ["ARCADE_API_KEY"], base_url="http://localhost:9099/v1")
app = FastAPI()
actor_secret = os.environ["ARCADE_ACTOR_SECRET"]
actor = FastAPIActor(app, secret=actor_secret)
actor.register_toolkit(Toolkit.from_module(arcade_math))
worker_secret = os.environ["ARCADE_WORKER_SECRET"]
worker = FastAPIWorker(app, secret=worker_secret)
worker.register_toolkit(Toolkit.from_module(arcade_math))
class ChatRequest(BaseModel):