API Reference
Python API reference for the desktop automation benchmarking framework
cua-bench SDK - A framework for desktop automation tasks with batch processing.
Classes
| Class | Description |
|---|---|
| Task | Represents a single task to be executed. |
| Desktop | Desktop environment manager. |
| Environment | A minimal environment wrapper that delegates everything to a provider. |
| BenchmarkResult | Result of a benchmark run. |
| TaskResult | Result of a single task execution. |
| ClickAction | No description |
| DoneAction | No description |
| DoubleClickAction | No description |
| DragAction | No description |
| HotkeyAction | No description |
| KeyAction | No description |
| MiddleClickAction | No description |
| MoveToAction | No description |
| RightClickAction | No description |
| ScrollAction | No description |
| TypeAction | No description |
| WaitAction | No description |
Functions
| Function | Description |
|---|---|
| repr_to_action | Parse an action from repr format string. |
| interact | Run an environment interactively with simplified output. |
| make | Create an Environment by loading the env's main.py as a module. |
| evaluate_task | Decorator for the function that evaluates a task. |
| setup_task | Decorator for the function that sets up a task. |
| solve_task | Decorator for the function that solves a task. |
| tasks_config | Decorator for the function that loads tasks. |
| run_benchmark | Run a benchmark on a dataset using the gym interface. |
| run_interactive | Run an environment interactively using the gym interface. |
| run_single_task | Run a single task using the gym interface. |
Task
Represents a single task to be executed.
Constructor
Task(self, description: str, task_id: Optional[str] = None, metadata: Optional[dict] = None, computer: Optional[dict] = None) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| description | str | |
| task_id | Optional[str] | |
| metadata | Optional[dict] | |
| computer | Optional[dict] | |
Desktop
Desktop environment manager.
Constructor
Desktop(self, env)
Attributes
| Name | Type | Description |
|---|---|---|
| env | Any | |
| state | Any | |
| template | Any | |
Methods
Desktop.configure
def configure(self, os_type: Optional[str] = None, width: Optional[int] = None, height: Optional[int] = None, background: Optional[str] = None, dock_state: Optional[Dict[str, List[Union[str, Dict[str, str]]]]] = None, randomize_dock: bool = True, taskbar_state: Optional[Dict[str, List[Union[str, Dict[str, str]]]]] = None, randomize_taskbar: bool = True)
Configure desktop appearance.
Parameters:
| Name | Type | Description |
|---|---|---|
| os_type | Optional[str] | OS appearance (win11, win10, win7, macos, winxp, win98, android, ios) |
| width | Optional[int] | Screen width in pixels |
| height | Optional[int] | Screen height in pixels |
| background | Optional[str] | Background color |
| dock_state | Optional[Dict[str, List[Union[str, Dict[str, str]]]]] | Explicit dock state to set, with keys 'pinned_apps', 'recent_apps', 'pinned_folders' |
| randomize_dock | bool | If True, populate dock_state using macOS icon sets |
| taskbar_state | Optional[Dict[str, List[Union[str, Dict[str, str]]]]] | Explicit taskbar state to set, with keys 'pinned_apps', 'open_apps' |
| randomize_taskbar | bool | If True, populate taskbar_state using Windows 11 icon sets |
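The dock_state and taskbar_state arguments share one type, Dict[str, List[Union[str, Dict[str, str]]]]. The sketch below builds such a value and checks its shape. The helper check_state, the app names, and the inner dict keys "name" and "icon" are hypothetical and only illustrate the documented type; they are not part of cua-bench.

```python
from typing import Dict, List, Set, Union

StateEntry = Union[str, Dict[str, str]]
State = Dict[str, List[StateEntry]]

# Keys taken from the parameter descriptions above.
DOCK_KEYS = {"pinned_apps", "recent_apps", "pinned_folders"}
TASKBAR_KEYS = {"pinned_apps", "open_apps"}


def check_state(state: State, allowed_keys: Set[str]) -> List[str]:
    """Return a list of shape problems (hypothetical helper, not a cua-bench API)."""
    problems: List[str] = []
    for key, entries in state.items():
        if key not in allowed_keys:
            problems.append(f"unexpected key: {key}")
            continue
        for entry in entries:
            is_str = isinstance(entry, str)
            is_str_dict = isinstance(entry, dict) and all(
                isinstance(k, str) and isinstance(v, str) for k, v in entry.items()
            )
            if not (is_str or is_str_dict):
                problems.append(f"bad entry under {key}: {entry!r}")
    return problems


# Example value; the entry contents are made up for illustration.
dock_state: State = {
    "pinned_apps": ["Finder", {"name": "Notes", "icon": "notes.png"}],
    "recent_apps": [],
    "pinned_folders": ["Downloads"],
}
```

A value that passes this check matches the annotated type and could be passed as dock_state; which entry formats the framework actually accepts is not specified here.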
Desktop.launch
def launch(self, content: str, title: str = 'Window', x: Optional[int] = None, y: Optional[int] = None, width: int = 600, height: int = 400, icon: Optional[str] = None, use_inner_size: bool = False, title_bar_style: str = 'default') -> Window
Launch a new window on the desktop.
Parameters:
| Name | Type | Description |
|---|---|---|
| content | str | HTML content for the window body |
| title | str | Window title |
| x | Optional[int] | X position (auto-calculated if None) |
| y | Optional[int] | Y position (auto-calculated if None) |
| width | int | Window width |
| height | int | Window height |
| use_inner_size | bool | Whether width and height refer to the inner (content) size of the window |
Returns: Window instance
Environment
A minimal environment wrapper that delegates everything to a provider.
Functions can be injected directly, or discovered from a module via
make_from_module based on cua-bench decorators (_td_type, _td_split).
Constructor
Environment(self, env_name: Optional[str] = None, split: str = 'train', tasks_config_fn: Optional[Callable[..., Any]] = None, setup_task_fn: Optional[Callable[..., Any]] = None, solve_task_fn: Optional[Callable[..., Any]] = None, evaluate_task_fn: Optional[Callable[..., Any]] = None) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| session | Optional[Any] | |
| env_name | Optional[str] | |
| split | Optional[str] | |
| headless | bool | |
| print_actions | bool | |
| bot | Optional[Bot] | |
| tracing | Optional[Tracing] | |
| step_count | int | |
| max_steps | Optional[int] | |
| tasks_config_fn | Any | |
| setup_task_fn | Any | |
| solve_task_fn | Any | |
| evaluate_task_fn | Any | |
| tasks | Optional[list] | |
| current_task | Optional[Any] | |
| session_name | Optional[str] | |
| session_config | Dict[str, Any] | |
| setup_config | DesktopSetupConfig | |
| page | Optional[Any] | |
Methods
Environment.make_from_module
def make_from_module(cls, module: Any, env_path: str | Path, split: str = 'train') -> 'Environment'
Environment.create_sandbox
async def create_sandbox(self, provider: str, provider_config: Dict[str, Any] | None = None, setup_config: DesktopSetupConfig | None = None) -> None
Environment.reset
async def reset(self, task_id: Optional[int] = None, run_id: Optional[str] = None) -> Tuple[bytes, Dict]
Environment.step
async def step(self, action: Action, dry_run: bool | Literal['before', 'after'] = False) -> bytes
Environment.solve
async def solve(self) -> bytes
Environment.evaluate
async def evaluate(self) -> Any
Environment.close
async def close(self) -> None
BenchmarkResult
Result of a benchmark run.
Constructor
BenchmarkResult(self, run_id: str, task_results: List[Dict[str, Any]], total_tasks: int, success_count: int, failed_count: int, avg_reward: float, duration_seconds: float, output_dir: Optional[str] = None) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| run_id | str | Unique identifier for this run |
| task_results | List[Dict[str, Any]] | List of individual task results |
| total_tasks | int | Total number of tasks in the benchmark |
| success_count | int | Number of successful tasks |
| failed_count | int | Number of failed tasks |
| avg_reward | float | Average reward across all tasks |
| duration_seconds | float | Total duration of the benchmark |
| output_dir | Optional[str] | Output directory for results (if any) |
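As a sketch of how the aggregate fields relate, the block below uses a stand-in dataclass that mirrors the documented constructor and computes a success rate that tolerates an empty run. The stand-in class, the helper success_rate, and the keys inside task_results are assumptions for illustration; the real class may behave differently.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class BenchmarkResult:
    """Stand-in mirroring the documented fields; not the cua-bench class itself."""
    run_id: str
    task_results: List[Dict[str, Any]]
    total_tasks: int
    success_count: int
    failed_count: int
    avg_reward: float
    duration_seconds: float
    output_dir: Optional[str] = None


def success_rate(result: BenchmarkResult) -> float:
    """Fraction of successful tasks; returns 0.0 instead of dividing by zero."""
    if result.total_tasks == 0:
        return 0.0
    return result.success_count / result.total_tasks


result = BenchmarkResult(
    run_id="demo",
    # The dict keys here are hypothetical; the real per-task schema is not documented above.
    task_results=[{"success": True, "reward": 1.0}, {"success": False, "reward": 0.0}],
    total_tasks=2,
    success_count=1,
    failed_count=1,
    avg_reward=0.5,
    duration_seconds=3.2,
)
print(f"Success rate: {success_rate(result):.0%}")  # prints "Success rate: 50%"
```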
TaskResult
Result of a single task execution.
Constructor
TaskResult(self, task_path: str, variant_id: int, success: bool, reward: float, steps: int, error: Optional[str] = None) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| task_path | str | Path to the task |
| variant_id | int | Task variant index |
| success | bool | Whether the task succeeded |
| reward | float | Reward from evaluation |
| steps | int | Number of steps taken |
| error | Optional[str] | Error message if failed |
ClickAction
Constructor
ClickAction(self, x: int, y: int) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| x | int | |
| y | int | |
DoneAction
Constructor
DoneAction(self) -> None
DoubleClickAction
Constructor
DoubleClickAction(self, x: int, y: int) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| x | int | |
| y | int | |
DragAction
Constructor
DragAction(self, from_x: int, from_y: int, to_x: int, to_y: int, duration: float = 1.0) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| from_x | int | |
| from_y | int | |
| to_x | int | |
| to_y | int | |
| duration | float | |
HotkeyAction
Constructor
HotkeyAction(self, keys: List[str]) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| keys | List[str] | |
KeyAction
Constructor
KeyAction(self, key: str) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| key | str | |
MiddleClickAction
Constructor
MiddleClickAction(self, x: int, y: int) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| x | int | |
| y | int | |
MoveToAction
Constructor
MoveToAction(self, x: int, y: int, duration: float = 0.0) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| x | int | |
| y | int | |
| duration | float | |
RightClickAction
Constructor
RightClickAction(self, x: int, y: int) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| x | int | |
| y | int | |
ScrollAction
Constructor
ScrollAction(self, direction: Literal['up', 'down'] = 'up', amount: int = 100) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| direction | Literal['up', 'down'] | |
| amount | int | |
TypeAction
Constructor
TypeAction(self, text: str) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| text | str | |
WaitAction
Constructor
WaitAction(self, seconds: float = 1.0) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| seconds | float | |
repr_to_action
def repr_to_action(action_repr: str) -> Action
Parse an action from a repr-format string.
Parameters:
| Name | Type | Description |
|---|---|---|
| action_repr | str | Action string in repr format, e.g., "ClickAction(x=100, y=200)" |
Returns: Parsed Action object
Raises:
ValueError - If the action string cannot be parsed
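To make the repr format concrete, here is a minimal sketch of one way such a string could be parsed safely with the standard ast module. It is not the library's implementation: the function parse_repr, the KNOWN registry, and the two stand-in dataclasses are hypothetical.

```python
import ast
from dataclasses import dataclass


@dataclass
class ClickAction:  # stand-in with the documented fields
    x: int
    y: int


@dataclass
class TypeAction:  # stand-in with the documented fields
    text: str


# Hypothetical registry of action classes the sketch knows about.
KNOWN = {cls.__name__: cls for cls in (ClickAction, TypeAction)}


def parse_repr(action_repr: str):
    """Parse 'ClassName(key=value, ...)' without evaluating arbitrary code."""
    try:
        node = ast.parse(action_repr.strip(), mode="eval").body
    except SyntaxError as exc:
        raise ValueError(f"cannot parse action: {action_repr!r}") from exc
    if not (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)):
        raise ValueError(f"not an action call: {action_repr!r}")
    cls = KNOWN.get(node.func.id)
    if cls is None or node.args:
        raise ValueError(f"unknown action or positional arguments: {action_repr!r}")
    try:
        # literal_eval only accepts literals, so names and calls are rejected.
        kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in node.keywords}
        return cls(**kwargs)
    except (ValueError, TypeError) as exc:
        raise ValueError(f"bad arguments in: {action_repr!r}") from exc


print(parse_repr("ClickAction(x=100, y=200)"))  # prints ClickAction(x=100, y=200)
```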
interact
def interact(env_path: str, task_id: int = 0) -> None
Run an environment interactively with simplified output.
Parameters:
| Name | Type | Description |
|---|---|---|
| env_path | str | Path to the environment directory |
| task_id | int | Task ID to run (default: 0) |
make
def make(env_name: str, split: str = 'train') -> Any
Create an Environment by loading the env's main.py as a module.
Parameters:
| Name | Type | Description |
|---|---|---|
| env_name | str | Path to the environment directory (must contain main.py) |
| split | str | Dataset split to use for decorated functions (e.g., 'train', 'test') |
Returns: Environment instance
evaluate_task
def evaluate_task(_arg: Optional[Callable] = None, args = (), kwargs = {}) -> Callable
Decorator for the function that evaluates a task.
Can be used as @cb.evaluate_task or @cb.evaluate_task("train").
The decorated function receives task_cfg and should return evaluation results.
setup_task
def setup_task(_arg: Optional[Callable] = None, args = (), kwargs = {}) -> Callable
Decorator for the function that sets up a task.
Can be used as @cb.setup_task or @cb.setup_task("train").
The decorated function receives task_cfg and should initialize the environment.
solve_task
def solve_task(_arg: Optional[Callable] = None, args = (), kwargs = {}) -> Callable
Decorator for the function that solves a task.
Can be used as @cb.solve_task or @cb.solve_task("train").
The decorated function receives task_cfg and should execute the solution.
tasks_config
def tasks_config(_arg: Optional[Callable] = None, args = (), kwargs = {}) -> Callable
Decorator for the function that loads tasks.
Can be used as @cb.tasks_config or @cb.tasks_config("train").
The decorated function should return a list of Task objects.
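The four decorators above can each be applied bare or with a split name. The sketch below shows one common way to write a decorator that supports both forms and tags the function with the _td_type and _td_split attributes mentioned in the Environment description. The name evaluate_task_sketch, the attribute values, and the default split of "train" are assumptions, not the library's actual behavior.

```python
from typing import Callable, Union


def evaluate_task_sketch(_arg: Union[Callable, str, None] = None) -> Callable:
    """Usable as @evaluate_task_sketch or @evaluate_task_sketch("test")."""

    def mark(fn: Callable, split: str) -> Callable:
        # Attribute names come from the docs; the stored values are assumed.
        fn._td_type = "evaluate_task"
        fn._td_split = split
        return fn

    if callable(_arg):
        # Bare form: _arg is the decorated function itself.
        return mark(_arg, "train")
    # Called form: _arg is the split name (or None), so return the real decorator.
    split = _arg if isinstance(_arg, str) else "train"
    return lambda fn: mark(fn, split)


@evaluate_task_sketch
def evaluate_default(task_cfg):
    return 1.0


@evaluate_task_sketch("test")
def evaluate_on_test(task_cfg):
    return 0.0
```

A loader could then discover the tagged functions in a module by scanning for callables that carry these attributes and match the requested split.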
run_benchmark
async def run_benchmark(dataset_path: Path, agent_fn: Optional[Callable[[bytes, Task], Action]] = None, max_steps: int = 100, max_parallel: int = 4, oracle: bool = False, max_variants: Optional[int] = None, task_filter: Optional[str] = None, split: str = 'train') -> BenchmarkResult
Run a benchmark on a dataset using the gym interface.
This function runs multiple tasks in parallel using the core gym interface (make, reset, step, evaluate).
Parameters:
| Name | Type | Description |
|---|---|---|
| dataset_path | Path | Path to the dataset directory |
| agent_fn | Optional[Callable[[bytes, Task], Action]] | Optional agent function that takes (screenshot, task_config) and returns an Action. Required if oracle=False. |
| max_steps | int | Maximum steps per task (default: 100) |
| max_parallel | int | Maximum parallel workers (default: 4) |
| oracle | bool | Run oracle/solver mode (default: False) |
| max_variants | Optional[int] | Maximum variants per task (optional) |
| task_filter | Optional[str] | Glob pattern to filter tasks (optional) |
| split | str | Dataset split (default: "train") |
Returns: BenchmarkResult with run statistics and task results
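How run_benchmark schedules its workers internally is not described here. As an illustration of what a max_parallel limit typically means for async tasks, the sketch below caps concurrency with asyncio.Semaphore. The names run_bounded and fake_task are hypothetical stand-ins.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_bounded(
    jobs: List[Callable[[], Awaitable[float]]], max_parallel: int = 4
) -> List[float]:
    """Run all jobs, with at most max_parallel of them in flight at once."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(job: Callable[[], Awaitable[float]]) -> float:
        async with semaphore:
            return await job()

    # gather preserves the order of the input jobs in its result list.
    return await asyncio.gather(*(guarded(job) for job in jobs))


async def fake_task(reward: float) -> float:
    """Stand-in for running one task and returning its reward."""
    await asyncio.sleep(0.01)
    return reward


rewards = asyncio.run(
    run_bounded([lambda r=r: fake_task(r) for r in (1.0, 0.0, 1.0)], max_parallel=2)
)
print(rewards)  # prints [1.0, 0.0, 1.0]
```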
Example:
    from pathlib import Path
    import random

    # run_benchmark, Task, Action, ClickAction, and DoneAction come from the cua-bench package.
    # run_benchmark is a coroutine, so call it from inside an async function.

    # Run oracle benchmark
    result = await run_benchmark(
        Path("./datasets/cua-bench-basic"),
        oracle=True,
        max_parallel=8,
    )
    print(f"Success rate: {result.success_count / result.total_tasks:.2%}")

    # Run with custom agent
    def random_agent(screenshot: bytes, task: Task) -> Action:
        return random.choice([
            ClickAction(x=random.randint(0, 1920), y=random.randint(0, 1080)),
            DoneAction(),
        ])

    result = await run_benchmark(
        Path("./datasets/my-dataset"),
        agent_fn=random_agent,
        max_parallel=4,
    )
run_interactive
async def run_interactive(env_path: Path, task_index: int = 0, split: str = 'train', headless: bool = False) -> Tuple[Environment, bytes, Task]
Run an environment interactively using the gym interface.
This function sets up an environment for interactive use, returning the environment instance, initial screenshot, and task configuration.
Parameters:
| Name | Type | Description |
|---|---|---|
| env_path | Path | Path to the environment directory |
| task_index | int | Task variant index (default: 0) |
| split | str | Dataset split (default: "train") |
| headless | bool | Run in headless mode (default: False) |
Returns: Tuple of (env, screenshot, task_config)
- env: Environment instance (caller should call env.close() when done)
- screenshot: Initial screenshot bytes
- task_config: Task configuration
Example:
    from pathlib import Path

    # run_interactive is a coroutine, so call it from inside an async function.
    env, screenshot, task_cfg = await run_interactive(Path("./task"))
    print(f"Task: {task_cfg.description}")

    # Execute actions...
    screenshot = await env.step(ClickAction(x=100, y=200))

    # Evaluate
    reward = await env.evaluate()
    print(f"Reward: {reward}")

    # Cleanup
    await env.close()
run_single_task
async def run_single_task(env_path: Path, task_index: int = 0, split: str = 'train', agent_fn: Optional[Callable[[bytes, Task], Action]] = None, max_steps: int = 100, oracle: bool = False) -> TaskResult
Run a single task using the gym interface.
This function uses the core gym interface (make, reset, step, evaluate) to run a task with either an agent function or the oracle solver.
Parameters:
| Name | Type | Description |
|---|---|---|
| env_path | Path | Path to the task environment directory |
| task_index | int | Task variant index (default: 0) |
| split | str | Dataset split (default: "train") |
| agent_fn | Optional[Callable[[bytes, Task], Action]] | Optional agent function that takes (screenshot, task_config) and returns an Action. If None and oracle=False, returns after setup. |
| max_steps | int | Maximum steps per task (default: 100) |
| oracle | bool | Run oracle/solver mode (default: False) |
Returns: TaskResult with execution results
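The reset, step, evaluate cycle that this function is described as using can be sketched against an in-memory stand-in. Everything below is hypothetical scaffolding: FakeEnv, run_episode, and the rule that a DoneAction ends the loop without being passed to step are assumptions, not the library's documented behavior.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass
class Task:  # stand-in with only the field used here
    description: str


@dataclass
class ClickAction:
    x: int
    y: int


@dataclass
class DoneAction:
    pass


class FakeEnv:
    """In-memory stand-in for Environment; no provider or sandbox involved."""

    def __init__(self) -> None:
        self.clicks = 0

    async def reset(self) -> Tuple[bytes, dict]:
        return b"screenshot-0", {}

    async def step(self, action) -> bytes:
        self.clicks += 1
        return f"screenshot-{self.clicks}".encode()

    async def evaluate(self) -> float:
        return 1.0 if self.clicks >= 2 else 0.0

    async def close(self) -> None:
        pass


async def run_episode(env, task: Task, agent_fn: Callable, max_steps: int = 100) -> Tuple[float, int]:
    """Step until the agent returns DoneAction or max_steps is reached, then evaluate."""
    screenshot, _info = await env.reset()
    steps = 0
    try:
        while steps < max_steps:
            action = agent_fn(screenshot, task)
            if isinstance(action, DoneAction):
                break
            screenshot = await env.step(action)
            steps += 1
        return await env.evaluate(), steps
    finally:
        await env.close()


def two_clicks(screenshot: bytes, task: Task):
    """Toy agent: click until the second screenshot appears, then finish."""
    return DoneAction() if screenshot == b"screenshot-2" else ClickAction(x=10, y=20)


reward, steps = asyncio.run(run_episode(FakeEnv(), Task("click twice"), two_clicks))
print(reward, steps)  # prints 1.0 2
```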
Example:
    from pathlib import Path

    # run_single_task is a coroutine, so call it from inside an async function.

    # Run with oracle
    result = await run_single_task(Path("./task"), oracle=True)

    # Run with custom agent
    def my_agent(screenshot: bytes, task: Task) -> Action:
        return DoneAction()  # Simple agent that immediately finishes

    result = await run_single_task(Path("./task"), agent_fn=my_agent)
tracing
Tracing
Lightweight trajectory tracing using Hugging Face Datasets.
Records events with arbitrary JSON metadata and a list of PIL images. Exposes a datasets.Dataset-compatible interface for saving/pushing.
Constructor
Tracing(self, env: Any) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| env | Any | |
| trajectory_id | Optional[str] | |
| dataset | Dataset | HF Dataset built from the current rows, constructed lazily. |
Methods
Tracing.start
def start(self, trajectory_id: Optional[str] = None) -> str
Start a new trajectory. Resets any previously recorded rows.
Returns the trajectory_id used.
Tracing.record
def record(self, event_name: str, data_dict: Dict[str, Any], data_images: List[Image.Image | bytes] | None = None) -> None
Tracing.save_to_disk
def save_to_disk(self, output_dir: str, save_pngs: bool = False, image_dir: Optional[str] = None, filter_events: Optional[List[str]] = None) -> None
Tracing.push_to_hub
def push_to_hub(self, repo_id: str, private: bool | None = None) -> str
Tracing.bytes_to_image
def bytes_to_image(png_bytes: bytes) -> Image.Image
actions
This module also exposes the action classes (ClickAction, DoneAction, DoubleClickAction, DragAction, HotkeyAction, KeyAction, MiddleClickAction, MoveToAction, RightClickAction, ScrollAction, TypeAction, WaitAction) and repr_to_action, all documented above.
snake_case_to_action
def snake_case_to_action(action_str: str) -> Action
Parse an action from a snake_case-format string.
Parameters:
| Name | Type | Description |
|---|---|---|
| action_str | str | Action string in snake_case format, e.g., "click(0.5, 0.5)" |
Returns: Parsed Action object
Raises:
ValueError - If the action string cannot be parsed
parse_action_string
def parse_action_string(action_str: str) -> Action
Parse an action from either repr or snake_case format.
This is the unified entry point for parsing action strings. It automatically detects the format and delegates to the appropriate parser.
Parameters:
| Name | Type | Description |
|---|---|---|
| action_str | str | Action string in either format: repr ("ClickAction(x=100, y=200)") or snake_case ("click(0.5, 0.5)") |
Returns: Parsed Action object
Raises:
ValueError - If the action string cannot be parsed in either format
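The detection rule itself is not documented. One plausible heuristic, sketched below, is to look at the call name: a capitalized name ending in "Action" suggests repr format, and an all-lowercase name suggests snake_case. The function detect_format and this rule are assumptions for illustration only.

```python
import re

# Matches "name(...)" and captures the name; the arguments are not inspected here.
_CALL = re.compile(r"^\s*([A-Za-z_][A-Za-z0-9_]*)\s*\((.*)\)\s*$", re.DOTALL)


def detect_format(action_str: str) -> str:
    """Return 'repr' or 'snake_case' based on the call name (hypothetical heuristic)."""
    match = _CALL.match(action_str)
    if match is None:
        raise ValueError(f"not a call-like action string: {action_str!r}")
    name = match.group(1)
    if name[0].isupper() and name.endswith("Action"):
        return "repr"
    if name == name.lower():
        return "snake_case"
    raise ValueError(f"unrecognized action name: {name!r}")


print(detect_format("ClickAction(x=100, y=200)"))  # prints repr
print(detect_format("click(0.5, 0.5)"))            # prints snake_case
```

A dispatcher built on this would then hand the string to the matching parser and let that parser validate the arguments.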
action_to_dict
def action_to_dict(action: Action) -> Dict[str, Any]
Convert an Action object to a dictionary.
Parameters:
| Name | Type | Description |
|---|---|---|
| action | Action | Action object to convert |
Returns: Dictionary representation of the action with 'type' key
dict_to_action
def dict_to_action(action_dict: Dict[str, Any]) -> Action
Convert a dictionary to an Action object.
Parameters:
| Name | Type | Description |
|---|---|---|
| action_dict | Dict[str, Any] | Dictionary with 'type' key and action parameters |
Returns: Action object
Raises:
ValueError - If the action type is unknown
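A round trip between an action and a dictionary with a 'type' key can be sketched with dataclasses. The helpers to_dict and from_dict, the REGISTRY mapping, and the choice to store the class name under 'type' are assumptions; the library may use a different 'type' value.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class ClickAction:  # stand-in with the documented fields
    x: int
    y: int


@dataclass
class WaitAction:  # stand-in with the documented default
    seconds: float = 1.0


# Hypothetical lookup from the 'type' value to a class.
REGISTRY = {"ClickAction": ClickAction, "WaitAction": WaitAction}


def to_dict(action) -> Dict[str, Any]:
    """Serialize an action as its fields plus a 'type' key."""
    return {"type": type(action).__name__, **asdict(action)}


def from_dict(action_dict: Dict[str, Any]):
    """Rebuild an action; raises ValueError for an unknown or missing type."""
    data = dict(action_dict)  # copy so the caller's dict is not mutated
    type_name = data.pop("type", None)
    cls = REGISTRY.get(type_name)
    if cls is None:
        raise ValueError(f"unknown action type: {type_name!r}")
    return cls(**data)


print(to_dict(ClickAction(x=1, y=2)))  # prints {'type': 'ClickAction', 'x': 1, 'y': 2}
```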
core
Core classes and functions for cua-bench.
This module exposes Task, make, and interact, all documented above.
types
WindowSnapshot
Constructor
WindowSnapshot(self, window_type: Literal['webview', 'process', 'desktop'], pid: Optional[str] = None, url: Optional[str] = None, html: Optional[str] = None, title: str = '', x: int = 0, y: int = 0, width: int = 0, height: int = 0, active: bool = False, minimized: bool = False) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| window_type | Literal['webview', 'process', 'desktop'] | |
| pid | Optional[str] | |
| url | Optional[str] | |
| html | Optional[str] | |
| title | str | |
| x | int | |
| y | int | |
| width | int | |
| height | int | |
| active | bool | |
| minimized | bool | |
Snapshot
Constructor
Snapshot(self, windows: List[WindowSnapshot]) -> None
Attributes
| Name | Type | Description |
|---|---|---|
| windows | List[WindowSnapshot] | |
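As a sketch of how a Snapshot might be inspected, the block below mirrors the two constructors with stand-in dataclasses and picks out the active, non-minimized window. The helper active_window and the sample window values are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Literal, Optional


@dataclass
class WindowSnapshot:
    """Stand-in mirroring the documented constructor."""
    window_type: Literal["webview", "process", "desktop"]
    pid: Optional[str] = None
    url: Optional[str] = None
    html: Optional[str] = None
    title: str = ""
    x: int = 0
    y: int = 0
    width: int = 0
    height: int = 0
    active: bool = False
    minimized: bool = False


@dataclass
class Snapshot:
    windows: List[WindowSnapshot]


def active_window(snapshot: Snapshot) -> Optional[WindowSnapshot]:
    """Return the first window that is active and not minimized, if any."""
    for window in snapshot.windows:
        if window.active and not window.minimized:
            return window
    return None


snapshot = Snapshot(windows=[
    WindowSnapshot("desktop"),
    WindowSnapshot("webview", pid="42", title="Settings", width=600, height=400, active=True),
])
print(active_window(snapshot).title)  # prints Settings
```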
This module also exposes the action classes (ClickAction, RightClickAction, DoubleClickAction, MiddleClickAction, DragAction, MoveToAction, ScrollAction, TypeAction, KeyAction, HotkeyAction, DoneAction, WaitAction), all documented above.
bot
This module also exposes ClickAction and RightClickAction, both documented above.
Bot
Helper class for writing trajectories for task solutions.
Constructor
Bot(self, env: Any)
Attributes
| Name | Type | Description |
|---|---|---|
| env | Any | |
Methods
Bot.click_element
def click_element(self, pid: int, selector: str) -> None
Find element by CSS selector and click its center.
Uses the provider's bench-ui bridge to fetch the element rect in screen space, then dispatches a ClickAction via env.step().
Bot.right_click_element
def right_click_element(self, pid: int, selector: str) -> None
utils
Utility functions for synthetic data generation.
DesktopSetupConfig
Inherits from: TypedDict
Configuration for desktop setup provided to providers.
Fields mirror high-level desktop appearance and workspace options.
Attributes
| Name | Type | Description |
|---|---|---|
| os_type | Literal['win11', 'win10', 'win7', 'winxp', 'win98', 'macos', 'linux', 'android', 'ios', 'windows'] | |
| width | int | |
| height | int | |
| background | str | |
| wallpaper | str | |
| installed_apps | List[str] | |
| image | str | |
| storage | str | |
| memory | str | |
| cpu | str | |
| provider_type | str | |
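Because DesktopSetupConfig is a TypedDict, a configuration is an ordinary dict at runtime. The sketch below redeclares the documented fields in a stand-in and builds a partial config. Declaring it with total=False (all keys optional) is an assumption, as are the example values.

```python
from typing import List, Literal, TypedDict


class DesktopSetupConfig(TypedDict, total=False):
    """Stand-in with the documented fields; total=False is assumed, not confirmed."""
    os_type: Literal["win11", "win10", "win7", "winxp", "win98",
                     "macos", "linux", "android", "ios", "windows"]
    width: int
    height: int
    background: str
    wallpaper: str
    installed_apps: List[str]
    image: str
    storage: str
    memory: str
    cpu: str
    provider_type: str


# Example values are illustrative only; accepted formats are not documented above.
config: DesktopSetupConfig = {
    "os_type": "win11",
    "width": 1920,
    "height": 1080,
    "background": "#000000",
}

print(sorted(config))  # prints ['background', 'height', 'os_type', 'width']
```

A static type checker would flag a misspelled key or an os_type outside the listed literals, while the runtime object stays a plain dict that can be passed as setup_config.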
This module also exposes Environment and Snapshot, both documented above.
render_snapshot_async
async def render_snapshot_async(setup_config: Dict[str, Any], snapshot: Dict[str, Any], screenshot_delay: float = 0, provider: Literal['webtop', 'computer'] = 'webtop') -> bytes
Render a snapshot and return screenshot bytes (async).
Parameters:
| Name | Type | Description |
|---|---|---|
| setup_config | Dict[str, Any] | Configuration dict for create_sandbox setup_config parameter |
| snapshot | Dict[str, Any] | Snapshot dict containing windows and other state |
| screenshot_delay | float | Delay in seconds before taking screenshot |
| provider | Literal['webtop', 'computer'] | Provider name ("webtop" or "computer") |
Returns: Screenshot as bytes
render_windows_async
async def render_windows_async(setup_config: Dict[str, Any], windows: List[Dict[str, Any]], screenshot_delay: float = 0, provider: Literal['webtop', 'computer'] = 'webtop', return_snapshot: bool = False, scroll_into_view: Optional[str] = None) -> bytes | Tuple[bytes, Snapshot]
Render windows and return screenshot bytes (async).
Parameters:
| Name | Type | Description |
|---|---|---|
| setup_config | Dict[str, Any] | Configuration dict for create_sandbox setup_config parameter |
| windows | List[Dict[str, Any]] | List of window dicts to pass directly to launch_window |
| screenshot_delay | float | Delay in seconds before taking screenshot |
| provider | Literal['webtop', 'computer'] | Provider name ("webtop" or "computer") |
| return_snapshot | bool | If True, return tuple of (bytes, Snapshot) instead of just bytes |
| scroll_into_view | Optional[str] | Optional CSS selector for an element to scroll into view |
Returns: Screenshot as bytes, or tuple of (bytes, Snapshot) if return_snapshot=True
render_snapshot
def render_snapshot(setup_config: Dict[str, Any], snapshot: Dict[str, Any], screenshot_delay: float = 0, provider: Literal['webtop', 'computer'] = 'webtop') -> bytes
Render a snapshot and return screenshot bytes (sync wrapper).
Parameters:
| Name | Type | Description |
|---|---|---|
| setup_config | Dict[str, Any] | Configuration dict for create_sandbox setup_config parameter |
| snapshot | Dict[str, Any] | Snapshot dict containing windows and other state |
| screenshot_delay | float | Delay in seconds before taking screenshot |
| provider | Literal['webtop', 'computer'] | Provider name ("webtop" or "computer") |
Returns: Screenshot as bytes
render_windows
def render_windows(setup_config: Dict[str, Any], windows: List[Dict[str, Any]], screenshot_delay: float = 0, provider: Literal['webtop', 'computer'] = 'webtop', return_snapshot: bool = False, scroll_into_view: Optional[str] = None) -> bytes | Tuple[bytes, Snapshot]
Render windows and return screenshot bytes (sync wrapper).
Parameters:
| Name | Type | Description |
|---|---|---|
| setup_config | Dict[str, Any] | Configuration dict for create_sandbox setup_config parameter |
| windows | List[Dict[str, Any]] | List of window dicts to pass directly to launch_window |
| screenshot_delay | float | Delay in seconds before taking screenshot |
| provider | Literal['webtop', 'computer'] | Provider name ("webtop" or "computer") |
| return_snapshot | bool | If True, return tuple of (bytes, Snapshot) instead of just bytes |
| scroll_into_view | Optional[str] | Optional CSS selector for an element to scroll into view |
Returns: Screenshot as bytes, or tuple of (bytes, Snapshot) if return_snapshot=True
runners
Benchmark runner functions for cua-bench.
This module provides programmatic interfaces for running benchmarks and interactive environments, using the core gym interface (make, reset, step, evaluate).
Task
Represents a single task to be executed.
Constructor
Task(self, description: str, task_id: Optional[str] = None, metadata: Optional[dict] = None, computer: Optional[dict] = None) -> NoneAttributes
| Name | Type | Description |
|---|---|---|
description | str | |
task_id | Optional[str] | |
metadata | Optional[dict] | |
computer | Optional[dict] |
Environment
A minimal environment wrapper that delegates everything to a provider.
Functions can be injected directly, or discovered from a module via
make_from_module based on cua-bench decorators (_td_type, _td_split).
Constructor
Environment(self, env_name: Optional[str] = None, split: str = 'train', tasks_config_fn: Optional[Callable[..., Any]] = None, setup_task_fn: Optional[Callable[..., Any]] = None, solve_task_fn: Optional[Callable[..., Any]] = None, evaluate_task_fn: Optional[Callable[..., Any]] = None) -> NoneAttributes
| Name | Type | Description |
|---|---|---|
session | Optional[Any] | |
env_name | Optional[str] | |
split | Optional[str] | |
headless | bool | |
print_actions | bool | |
bot | Optional[Bot] | |
tracing | Optional[Tracing] | |
step_count | int | |
max_steps | Optional[int] | |
tasks_config_fn | Any | |
setup_task_fn | Any | |
solve_task_fn | Any | |
evaluate_task_fn | Any | |
tasks | Optional[list] | |
current_task | Optional[Any] | |
session_name | Optional[str] | |
session_config | Dict[str, Any] | |
setup_config | DesktopSetupConfig | |
page | Optional[Any] |
Methods
Environment.make_from_module
def make_from_module(cls, module: Any, env_path: str | Path, split: str = 'train') -> 'Environment'
Environment.create_sandbox
async def create_sandbox(self, provider: str, provider_config: Dict[str, Any] | None = None, setup_config: DesktopSetupConfig | None = None) -> None
Environment.reset
async def reset(self, task_id: Optional[int] = None, run_id: Optional[str] = None) -> Tuple[bytes, Dict]
Environment.step
async def step(self, action: Action, dry_run: bool | Literal['before', 'after'] = False) -> bytes
Environment.solve
async def solve(self) -> bytes
Environment.evaluate
async def evaluate(self) -> Any
Environment.close
async def close(self) -> None
DoneAction
Constructor
DoneAction(self) -> None
BenchmarkResult
Result of a benchmark run.
Attributes:
- run_id: Unique identifier for this run
- task_results: List of individual task results
- total_tasks: Total number of tasks in the benchmark
- success_count: Number of successful tasks
- failed_count: Number of failed tasks
- avg_reward: Average reward across all tasks
- duration_seconds: Total duration of the benchmark
- output_dir: Output directory for results (if any)
Constructor
BenchmarkResult(self, run_id: str, task_results: List[Dict[str, Any]], total_tasks: int, success_count: int, failed_count: int, avg_reward: float, duration_seconds: float, output_dir: Optional[str] = None) -> None
Attributes
| Name | Type | Description |
|---|---|---|
run_id | str | |
task_results | List[Dict[str, Any]] | |
total_tasks | int | |
success_count | int | |
failed_count | int | |
avg_reward | float | |
duration_seconds | float | |
output_dir | Optional[str] |
TaskResult
Result of a single task execution.
Attributes:
- task_path: Path to the task
- variant_id: Task variant index
- success: Whether the task succeeded
- reward: Reward from evaluation
- steps: Number of steps taken
- error: Error message if failed
Constructor
TaskResult(self, task_path: str, variant_id: int, success: bool, reward: float, steps: int, error: Optional[str] = None) -> None
Attributes
| Name | Type | Description |
|---|---|---|
task_path | str | |
variant_id | int | |
success | bool | |
reward | float | |
steps | int | |
error | Optional[str] |
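The summary fields of BenchmarkResult (success_count, failed_count, avg_reward) look like simple aggregates over the per-task results. The sketch below shows one plausible way to compute them. It is an illustration, not the library's implementation: `TaskResultSketch` is a local stand-in that mirrors the documented TaskResult fields, and `summarize` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TaskResultSketch:
    """Local stand-in mirroring the documented TaskResult fields."""
    task_path: str
    variant_id: int
    success: bool
    reward: float
    steps: int
    error: Optional[str] = None


def summarize(results: List[TaskResultSketch]) -> dict:
    """Hypothetical helper: derive BenchmarkResult-style counters."""
    total = len(results)
    success_count = sum(1 for r in results if r.success)
    # Guard against an empty run so the average never divides by zero.
    avg_reward = sum(r.reward for r in results) / total if total else 0.0
    return {
        "total_tasks": total,
        "success_count": success_count,
        "failed_count": total - success_count,
        "avg_reward": avg_reward,
    }
```

The empty-run guard matters when a task filter matches nothing, since a success rate computed as success_count / total_tasks would otherwise raise ZeroDivisionError.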
make
def make(env_name: str, split: str = 'train') -> Any
Create an Environment by loading the env's main.py as a module.
Parameters:
| Name | Type | Description |
|---|---|---|
env_name | Any | Path to the environment directory (must contain main.py) |
split | Any | Dataset split to use for decorated functions (e.g., 'train', 'test') |
Returns: Environment instance
run_single_task
async def run_single_task(env_path: Path, task_index: int = 0, split: str = 'train', agent_fn: Optional[Callable[[bytes, Task], Action]] = None, max_steps: int = 100, oracle: bool = False) -> TaskResult
Run a single task using the gym interface.
This function uses the core gym interface (make, reset, step, evaluate) to run a task with either an agent function or the oracle solver.
Parameters:
| Name | Type | Description |
|---|---|---|
env_path | Any | Path to the task environment directory |
task_index | Any | Task variant index (default: 0) |
split | Any | Dataset split (default: "train") |
agent_fn | Any | Optional agent function that takes (screenshot, task_config) and returns an Action. If None and oracle=False, returns after setup. |
max_steps | Any | Maximum steps per task (default: 100) |
oracle | Any | Run oracle/solver mode (default: False) |
Returns: TaskResult with execution results
Example:
# Run with oracle
result = await run_single_task(Path("./task"), oracle=True)
# Run with custom agent
def my_agent(screenshot: bytes, task: Task) -> Action:
    return DoneAction()  # Simple agent that immediately finishes
result = await run_single_task(Path("./task"), agent_fn=my_agent)
run_benchmark
async def run_benchmark(dataset_path: Path, agent_fn: Optional[Callable[[bytes, Task], Action]] = None, max_steps: int = 100, max_parallel: int = 4, oracle: bool = False, max_variants: Optional[int] = None, task_filter: Optional[str] = None, split: str = 'train') -> BenchmarkResult
Run a benchmark on a dataset using the gym interface.
This function runs multiple tasks in parallel using the core gym interface (make, reset, step, evaluate).
Parameters:
| Name | Type | Description |
|---|---|---|
dataset_path | Any | Path to the dataset directory |
agent_fn | Any | Optional agent function that takes (screenshot, task_config) and returns an Action. Required if oracle=False. |
max_steps | Any | Maximum steps per task (default: 100) |
max_parallel | Any | Maximum parallel workers (default: 4) |
oracle | Any | Run oracle/solver mode (default: False) |
max_variants | Any | Maximum variants per task (optional) |
task_filter | Any | Glob pattern to filter tasks (optional) |
split | Any | Dataset split (default: "train") |
Returns: BenchmarkResult with run statistics and task results
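The max_parallel parameter caps how many tasks run at once. A common way to get that behaviour in asyncio is a semaphore around each job. The sketch below shows the pattern in isolation. It is an assumption about the general technique, not a description of how run_benchmark is implemented, and `run_bounded` is a hypothetical name.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_bounded(
    jobs: List[Callable[[], Awaitable[int]]],
    max_parallel: int = 4,
) -> List[int]:
    """Run coroutine factories with at most max_parallel in flight."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(job: Callable[[], Awaitable[int]]) -> int:
        # Each job waits here until a slot is free.
        async with semaphore:
            return await job()

    # gather returns results in the same order as the input jobs.
    return await asyncio.gather(*(guarded(job) for job in jobs))
```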
Example:
# Run oracle benchmark
result = await run_benchmark(
    Path("./datasets/cua-bench-basic"),
    oracle=True,
    max_parallel=8,
)
print(f"Success rate: {result.success_count / result.total_tasks:.2%}")
# Run with custom agent
def random_agent(screenshot: bytes, task: Task) -> Action:
    import random
    return random.choice([
        ClickAction(x=random.randint(0, 1920), y=random.randint(0, 1080)),
        DoneAction(),
    ])
result = await run_benchmark(
    Path("./datasets/my-dataset"),
    agent_fn=random_agent,
    max_parallel=4,
)
run_interactive
async def run_interactive(env_path: Path, task_index: int = 0, split: str = 'train', headless: bool = False) -> Tuple[Environment, bytes, Task]
Run an environment interactively using the gym interface.
This function sets up an environment for interactive use, returning the environment instance, initial screenshot, and task configuration.
Parameters:
| Name | Type | Description |
|---|---|---|
env_path | Any | Path to the environment directory |
task_index | Any | Task variant index (default: 0) |
split | Any | Dataset split (default: "train") |
headless | Any | Run in headless mode (default: False) |
Returns: Tuple of (env, screenshot, task_config)
- env: Environment instance (caller should call env.close() when done)
- screenshot: Initial screenshot bytes
- task_config: Task configuration
Example:
env, screenshot, task_cfg = await run_interactive(Path("./task"))
print(f"Task: {task_cfg.description}")
# Execute actions...
screenshot = await env.step(ClickAction(x=100, y=200))
# Evaluate
reward = await env.evaluate()
print(f"Reward: {reward}")
# Cleanup
await env.close()
environment
Simplified, provider-driven environment.
Bot
Helper class for writing trajectories for task solutions.
Constructor
Bot(self, env: Any)
Attributes
| Name | Type | Description |
|---|---|---|
env | Any |
Methods
Bot.click_element
def click_element(self, pid: int, selector: str) -> None
Find element by CSS selector and click its center.
Uses provider's bench-ui bridge to fetch element rect in screen space and then dispatches a ClickAction via env.step().
Bot.right_click_element
def right_click_element(self, pid: int, selector: str) -> None
Tracing
Lightweight trajectory tracing using Hugging Face Datasets.
Records events with arbitrary JSON metadata and a list of PIL images. Exposes a datasets.Dataset-compatible interface for saving/pushing.
Constructor
Tracing(self, env: Any) -> None
Attributes
| Name | Type | Description |
|---|---|---|
env | Any | |
trajectory_id | Optional[str] | |
dataset | Dataset | Return a HF Dataset built from current rows, constructing lazily. |
Methods
Tracing.start
def start(self, trajectory_id: Optional[str] = None) -> str
Start a new trajectory. Resets any previously recorded rows.
Returns the trajectory_id used.
Tracing.record
def record(self, event_name: str, data_dict: Dict[str, Any], data_images: List[Image.Image | bytes] | None = None) -> None
Tracing.save_to_disk
def save_to_disk(self, output_dir: str, save_pngs: bool = False, image_dir: Optional[str] = None, filter_events: Optional[List[str]] = None) -> None
Tracing.push_to_hub
def push_to_hub(self, repo_id: str, private: bool | None = None) -> str
Tracing.bytes_to_image
def bytes_to_image(png_bytes: bytes) -> Image.Image
MaxStepsExceeded
Inherits from: Exception
Raised when the environment's max step budget is exhausted.
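To illustrate how a step budget and this exception could interact with the step_count and max_steps attributes listed for Environment, here is a small self-contained sketch. The `MaxStepsExceeded` class defined here is a local stand-in and `StepBudget` is hypothetical. Treating `max_steps=None` as "no limit" is an assumption, not documented behaviour.

```python
from typing import Optional


class MaxStepsExceeded(Exception):
    """Local stand-in for the documented exception."""


class StepBudget:
    """Hypothetical counter mirroring step_count / max_steps."""

    def __init__(self, max_steps: Optional[int] = None) -> None:
        self.step_count = 0
        self.max_steps = max_steps  # None means unlimited in this sketch

    def consume(self) -> int:
        """Count one step, or raise once the budget is used up."""
        if self.max_steps is not None and self.step_count >= self.max_steps:
            raise MaxStepsExceeded(f"step budget of {self.max_steps} exhausted")
        self.step_count += 1
        return self.step_count
```

An agent loop would typically catch the exception, record the failure, and still close the environment.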
iconify
Iconify icon processing module for cua_bench.
This module provides functionality to process HTML containing iconify-icon elements and replace them with inline SVG content fetched from the Iconify API.
Key features:
- Processes <iconify-icon icon="prefix:name"> elements
- Supports custom icons.json for icon resolution
- Option to ignore icon set prefixes for randomization
- Caches SVG content for performance
- Preserves element attributes (width, height, class, etc.)
process_icons
def process_icons(html: str, icons_json: Optional[str] = None, ignore_iconset: bool = False) -> str
Process HTML containing iconify-icon elements and replace them with inline SVGs.
Parameters:
| Name | Type | Description |
|---|---|---|
html | Any | HTML content containing iconify-icon elements |
icons_json | Any | Path to custom icons.json file. If None, uses default iconsets/icons.json |
ignore_iconset | Any | If True, ignores the iconset prefix and searches for icon name only. Useful for shuffling/randomizing icon sets. For example: - eva:people-outline becomes */people-outline - mingcute:ad-circle-line becomes */ad-circle-line |
Returns: HTML with iconify-icon elements replaced by inline SVG content
Example:
>>> html = '<iconify-icon icon="eva:people-outline"></iconify-icon>'
>>> process_icons(html)
'<svg>...</svg>'
>>> # With ignore_iconset=True for randomization
>>> process_icons(html, ignore_iconset=True)  # May use different iconset
clear_cache
def clear_cache()
Clear the SVG cache. Useful for testing or memory management.
get_cache_size
def get_cache_size() -> int
Get the number of cached SVG entries.
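The caching behaviour described for this module (cache SVG content, clear it, report its size) can be pictured as a module-level dictionary. The sketch below is an assumption about the general shape, not the module's actual code. `get_svg`, the `fetch` callback, and `_svg_cache` are hypothetical names, and no network call is made.

```python
from typing import Callable, Dict

# Hypothetical module-level cache keyed by "prefix:name".
_svg_cache: Dict[str, str] = {}


def get_svg(icon: str, fetch: Callable[[str], str]) -> str:
    """Return the SVG for icon, calling fetch only on a cache miss."""
    if icon not in _svg_cache:
        _svg_cache[icon] = fetch(icon)
    return _svg_cache[icon]


def clear_cache() -> None:
    """Drop every cached entry."""
    _svg_cache.clear()


def get_cache_size() -> int:
    """Number of cached SVG entries."""
    return len(_svg_cache)
```

Calling clear_cache() between tests keeps one test's cached icons from masking a lookup failure in the next.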
main
Main entry point for cua-bench CLI.
main
def main()
Main CLI entry point.
desktop
Desktop environment management for cua-bench.
Window
Represents a window in the desktop environment.
Constructor
Window(self, x: int, y: int, width: int, height: int, title: str, content: str, focused: bool = False, icon: Optional[str] = None, title_bar_style: str = 'hidden') -> None
Attributes
| Name | Type | Description |
|---|---|---|
x | int | |
y | int | |
width | int | |
height | int | |
title | str | |
content | str | |
focused | bool | |
icon | Optional[str] | |
title_bar_style | str |
DesktopState
State of the unified desktop environment.
Constructor
DesktopState(self, os_type: str = 'win11', width: int = 1024, height: int = 768, background: str = '#000', windows: List[Window] = list(), dock_state: Dict[str, List[Dict[str, str]]] = (lambda: {'pinned_apps': [], 'recent_apps': [], 'pinned_folders': []})(), taskbar_state: Dict[str, List[Dict[str, str]]] = (lambda: {'pinned_apps': [], 'open_apps': []})()) -> None
Attributes
| Name | Type | Description |
|---|---|---|
os_type | str | |
width | int | |
height | int | |
background | str | |
windows | List[Window] | |
dock_state | Dict[str, List[Dict[str, str]]] | |
taskbar_state | Dict[str, List[Dict[str, str]]] |
Desktop
Desktop environment manager.
Constructor
Desktop(self, env)
Attributes
| Name | Type | Description |
|---|---|---|
env | Any | |
state | Any | |
template | Any |
Methods
Desktop.configure
def configure(self, os_type: Optional[str] = None, width: Optional[int] = None, height: Optional[int] = None, background: Optional[str] = None, dock_state: Optional[Dict[str, List[Union[str, Dict[str, str]]]]] = None, randomize_dock: bool = True, taskbar_state: Optional[Dict[str, List[Union[str, Dict[str, str]]]]] = None, randomize_taskbar: bool = True)
Configure desktop appearance.
Parameters:
| Name | Type | Description |
|---|---|---|
os_type | Any | OS appearance (win11, win10, win7, macos, winxp, win98, android, ios) |
width | Any | Screen width in pixels |
height | Any | Screen height in pixels |
background | Any | Background color |
dock_state | Any | Explicit dock state to set with keys 'pinned_apps', 'recent_apps', 'pinned_folders' |
randomize_dock | Any | If True, populate dock_state using macOS icon sets |
taskbar_state | Any | Explicit taskbar state to set with keys 'pinned_apps', 'open_apps' |
randomize_taskbar | Any | If True, populate taskbar_state using Windows 11 icon sets |
Desktop.launch
def launch(self, content: str, title: str = 'Window', x: Optional[int] = None, y: Optional[int] = None, width: int = 600, height: int = 400, icon: Optional[str] = None, use_inner_size: bool = False, title_bar_style: str = 'default') -> Window
Launch a new window on the desktop.
Parameters:
| Name | Type | Description |
|---|---|---|
content | Any | HTML content for the window body |
title | Any | Window title |
x | Any | X position (auto-calculated if None) |
y | Any | Y position (auto-calculated if None) |
width | Any | Window width |
height | Any | Window height |
use_inner_size | Any | Whether to use the inner size of the window (i.e. content size) |
Returns: Window instance
decorators
Decorators for defining cua-bench environments.
tasks_config
def tasks_config(_arg: Optional[Callable] = None, args = (), kwargs = {}) -> Callable
Decorator for the function that loads tasks.
Can be used as @cb.tasks_config or @cb.tasks_config("train").
The decorated function should return a list of Task objects.
setup_task
def setup_task(_arg: Optional[Callable] = None, args = (), kwargs = {}) -> Callable
Decorator for the function that sets up a task.
Can be used as @cb.setup_task or @cb.setup_task("train").
The decorated function receives task_cfg and should initialize the environment.
solve_task
def solve_task(_arg: Optional[Callable] = None, args = (), kwargs = {}) -> Callable
Decorator for the function that solves a task.
Can be used as @cb.solve_task or @cb.solve_task("train").
The decorated function receives task_cfg and should execute the solution.
evaluate_task
def evaluate_task(_arg: Optional[Callable] = None, args = (), kwargs = {}) -> Callable
Decorator for the function that evaluates a task.
Can be used as @cb.evaluate_task or @cb.evaluate_task("train").
The decorated function receives task_cfg and should return evaluation results.
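All four decorators accept both a bare form (@cb.tasks_config) and a parameterized form (@cb.tasks_config("train")). The sketch below shows one common way to support both forms, tagging the function with the _td_type and _td_split markers that the Environment description mentions. It is an illustration only: `make_task_decorator` is a hypothetical factory, and using "train" as the split for the bare form is an assumption.

```python
from typing import Callable, Optional, Union

DEFAULT_SPLIT = "train"  # assumption: split used by the bare form


def make_task_decorator(td_type: str) -> Callable:
    """Build a decorator usable as @deco or @deco("train")."""

    def decorator(_arg: Union[Callable, str, None] = None) -> Callable:
        def tag(fn: Callable, split: Optional[str]) -> Callable:
            # Mark the function so it can be discovered later.
            fn._td_type = td_type
            fn._td_split = split or DEFAULT_SPLIT
            return fn

        if callable(_arg):
            # Bare form: the decorated function arrives directly.
            return tag(_arg, None)
        # Parameterized form: _arg is the split name (or None).
        return lambda fn: tag(fn, _arg)

    return decorator
```

A loader could then scan a module for callables whose `_td_type` and `_td_split` match the requested role and split.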
computers
DesktopSession
Inherits from: Protocol
Desktop session interface for environment backends.
Usage:
Preferred: async context manager
async with get_session("native")(os_type="linux") as session:
    await session.screenshot()
Alternative: manual lifecycle
session = get_session("native")(os_type="linux")
await session.start()
try:
    await session.screenshot()
finally:
    await session.close()
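The two usage patterns above should release resources in the same way: the context manager form is shorthand for start, try, and close in a finally block. The sketch below demonstrates that relationship with a stand-in class. `FakeSession` is hypothetical and only records lifecycle calls. It does not connect to any desktop, and how the real sessions implement `__aenter__` and `__aexit__` is an assumption.

```python
import asyncio
from typing import List


class FakeSession:
    """Hypothetical stand-in that records lifecycle calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def start(self) -> None:
        self.events.append("start")

    async def close(self) -> None:
        self.events.append("close")

    async def screenshot(self) -> bytes:
        self.events.append("screenshot")
        return b""

    async def __aenter__(self) -> "FakeSession":
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs even when the body raised, like the finally block.
        await self.close()


async def demo() -> List[str]:
    async with FakeSession() as session:
        await session.screenshot()
    return session.events
```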
Constructor
DesktopSession(self, env: Any)
Attributes
| Name | Type | Description |
|---|---|---|
page | Any | |
vnc_url | str | Return the VNC URL for accessing the desktop environment. |
apps | 'AppsProxy' | Access registered apps via session.apps.{app_name}. |
Methods
DesktopSession.start
async def start(self, config: Optional[DesktopSetupConfig] = None, headless: Optional[bool] = None) -> None
Start the session and connect to the environment.
Parameters:
| Name | Type | Description |
|---|---|---|
config | Any | Optional configuration to apply before starting. |
headless | Any | If False, shows browser/VNC preview. Defaults to True. |
DesktopSession.serve_static
async def serve_static(self, url_path: str, local_path: str) -> None
DesktopSession.launch_window
async def launch_window(self, url: Optional[str] = None, html: Optional[str] = None, folder: Optional[str] = None, title: str = 'Window', x: Optional[int] = None, y: Optional[int] = None, width: int = 600, height: int = 400, icon: Optional[str] = None, use_inner_size: bool = False, title_bar_style: str = 'default') -> int | str
Launch a window and return its process ID.
DesktopSession.get_element_rect
async def get_element_rect(self, pid: int | str, selector: str, space: Literal['window', 'screen'] = 'window', timeout: float = 0.5) -> dict[str, Any] | None
DesktopSession.execute_javascript
async def execute_javascript(self, pid: int | str, javascript: str) -> Any
DesktopSession.execute_action
async def execute_action(self, action: Any) -> None
DesktopSession.screenshot
async def screenshot(self) -> bytes
DesktopSession.get_snapshot
async def get_snapshot(self) -> Snapshot
Return a lightweight snapshot of the desktop state (windows, etc.).
Implementations should populate the list of open windows with geometry and metadata. If not supported, raise NotImplementedError.
DesktopSession.close
async def close(self) -> None
DesktopSession.close_all_windows
async def close_all_windows(self) -> None
Close or clear all open windows in the desktop environment.
DesktopSession.click_element
async def click_element(self, pid: int | str, selector: str) -> None
Find element by CSS selector and click its center.
Uses the session's get_element_rect to fetch element rect in screen space and then dispatches a ClickAction.
Parameters:
| Name | Type | Description |
|---|---|---|
pid | Any | Process ID of the window |
selector | Any | CSS selector for the element |
DesktopSession.right_click_element
async def right_click_element(self, pid: int | str, selector: str) -> None
Find element by CSS selector and right-click its center.
Parameters:
| Name | Type | Description |
|---|---|---|
pid | Any | Process ID of the window |
selector | Any | CSS selector for the element |
DesktopSession.run_command
async def run_command(self, command: str, timeout: Optional[float] = None, check: bool = True) -> 'CommandResult'
Execute a shell command on the native desktop environment.
This method is only available with the native provider (Docker/QEMU). It will raise NotImplementedError on simulated sessions.
Parameters:
| Name | Type | Description |
|---|---|---|
command | Any | Shell command to execute |
timeout | Any | Optional timeout in seconds |
check | Any | If True (default), raise an exception if the command fails (non-zero return code). If False, return the result regardless. |
Returns: CommandResult with stdout, stderr, and return_code
Raises:
- NotImplementedError - If called on a simulated provider
- RuntimeError - If check=True and command returns non-zero exit code
Example:
result = await session.run_command("ls -la /home/user")
print(result.stdout)
DesktopSession.install_app
async def install_app(self, app_name: str, with_shortcut: bool = True, **kwargs) -> None
Install a registered app on the native desktop environment.
Uses the app registry to find platform-specific install functions. This method is only available with the native provider (Docker/QEMU).
Parameters:
| Name | Type | Description |
|---|---|---|
app_name | Any | Name of the app to install (e.g., "godot", "firefox") |
with_shortcut | Any | Create desktop shortcut (default True) |
**kwargs | Any | App-specific arguments (e.g., version="4.2.1") |
Raises:
- ValueError - If app is not registered
- NotImplementedError - If app doesn't support the current platform
Example:
await session.install_app("godot", version="4.2.1")
await session.install_app("firefox", with_shortcut=True)
DesktopSession.launch_app
async def launch_app(self, app_name: str, **kwargs) -> None
Launch a registered app on the native desktop environment.
Uses the app registry to find platform-specific launch functions. This method is only available with the native provider (Docker/QEMU).
Parameters:
| Name | Type | Description |
|---|---|---|
app_name | Any | Name of the app to launch |
**kwargs | Any | App-specific arguments (e.g., project_path="/path") |
Raises:
- ValueError - If app is not registered
- NotImplementedError - If app doesn't support the current platform
Example:
await session.launch_app("godot", project_path="~/project", editor=True)
DesktopSetupConfig
Inherits from: TypedDict
Configuration for desktop setup provided to providers.
Fields mirror high-level desktop appearance and workspace options.
Attributes
| Name | Type | Description |
|---|---|---|
os_type | Literal['win11', 'win10', 'win7', 'winxp', 'win98', 'macos', 'linux', 'android', 'ios', 'windows'] | |
width | int | |
height | int | |
background | str | |
wallpaper | str | |
installed_apps | List[str] | |
image | str | |
storage | str | |
memory | str | |
cpu | str | |
provider_type | str |
RemoteDesktopSession
Unified desktop session using cua-computer SDK.
Supports two modes:
- Full lifecycle mode (default): Computer SDK manages container/VM
  - Pass config via constructor kwargs or start(config={...})
  - SDK starts container, waits for boot, connects
- Client-only mode: Connect to pre-existing cua-computer-server
  - Pass api_url to connect to existing server
  - Used by 2-container architecture, batch execution
Works with any golden environment type:
- linux-docker: trycua/cua-xfce container
- windows-qemu: Windows 11 VM
- linux-qemu: Linux VM
- android-qemu: Android VM
Supports full bench_ui integration when bench_ui is installed in the remote environment, enabling:
- launch_window() with HTML content via pywebview
- execute_javascript() for DOM manipulation
- get_element_rect() for element location queries
- click_element() / right_click_element() for element-based interaction
Constructor
RemoteDesktopSession(self, api_url: str = '', vnc_url: str = '', width: int = 1920, height: int = 1080, os_type: str = 'linux', image: str = '', provider_type: str = 'docker', memory: str = '8GB', cpu: str = '4', name: str = '', storage: str = '', ephemeral: bool = True, headless: bool = True, kwargs = {})
Attributes
| Name | Type | Description |
|---|---|---|
DEFAULT_TIMEOUT | Any | |
SCREENSHOT_TIMEOUT | Any | |
computer | Any | Get the Computer SDK instance for advanced operations. |
interface | Any | Get the computer interface for direct SDK access. |
page | Any | Return underlying page object - not applicable for remote. |
vnc_url | str | Return the VNC URL for accessing the environment. |
apps | 'AppsProxy' | Access registered apps via session.apps.{app_name}. |
os_type | str | Return the OS type for this session. |
Methods
RemoteDesktopSession.step
async def step(self, action: Action) -> None
Execute an action (alias for execute_action, for env.step() compatibility).
RemoteDesktopSession.start
async def start(self, config: Optional[DesktopSetupConfig] = None, headless: Optional[bool] = None) -> None
Start the session and connect to the environment.
Parameters:
| Name | Type | Description |
|---|---|---|
config | Any | Optional configuration to apply before starting. |
headless | Any | If False, opens VNC preview in browser. Defaults to constructor value if not specified. |
Example:
# Using constructor params (preferred)
async with RemoteDesktopSession(os_type="linux") as session:
    await session.screenshot()
# Or with config dict
session = RemoteDesktopSession()
await session.start(config={"os_type": "linux", "width": 1920})
RemoteDesktopSession.serve_static
async def serve_static(self, url_path: str, local_path: str) -> None
Serve static files - not applicable for remote environments.
RemoteDesktopSession.launch_window
async def launch_window(self, url: Optional[str] = None, html: Optional[str] = None, folder: Optional[str] = None, title: str = 'Window', x: Optional[int] = None, y: Optional[int] = None, width: int = 600, height: int = 400, icon: Optional[str] = None, use_inner_size: bool = False, title_bar_style: str = 'default') -> int | str
Launch a window in the remote environment using bench_ui (pywebview).
Supports:
- url: Open a URL in a pywebview window
- html: Display HTML content in a pywebview window
- folder: Copy folder to remote and serve it in a pywebview window
Returns: Process ID of the pywebview window (int)
RemoteDesktopSession.get_element_rect
async def get_element_rect(self, pid: int | str, selector: str, space: Literal['window', 'screen'] = 'window', timeout: float = 0.5) -> dict[str, Any] | None
Get element rect by CSS selector using bench_ui.
Parameters:
| Name | Type | Description |
|---|---|---|
pid | Any | Process ID of the pywebview window |
selector | Any | CSS selector for the element |
space | Any | Coordinate space - "window" or "screen" |
timeout | Any | Maximum time to wait for element |
Returns: Dict with x, y, width, height or None if not found
RemoteDesktopSession.execute_javascript
async def execute_javascript(self, pid: int | str, javascript: str) -> Any
Execute JavaScript in a pywebview window using bench_ui.
Parameters:
| Name | Type | Description |
|---|---|---|
pid | Any | Process ID of the pywebview window |
javascript | Any | JavaScript code to execute |
Returns: Result of the JavaScript execution
RemoteDesktopSession.execute_action
async def execute_action(self, action: Action) -> None
Execute an action on the remote desktop using the SDK.
RemoteDesktopSession.screenshot
async def screenshot(self) -> bytes
Capture screenshot from remote environment.
Returns: PNG image bytes
RemoteDesktopSession.get_snapshot
async def get_snapshot(self) -> Snapshot
Get snapshot of desktop state with active window info.
Uses pywinctl on remote to get active window, and if it's a webview we launched, extracts HTML via snapshot.js.
RemoteDesktopSession.close
async def close(self) -> None
Close the session and cleanup resources.
RemoteDesktopSession.close_all_windows
async def close_all_windows(self) -> None
Close all windows - best effort.
RemoteDesktopSession.click_element
async def click_element(self, pid: int | str, selector: str) -> None
Find element by CSS selector and click its center.
Uses get_element_rect to fetch element rect in screen space and then dispatches a ClickAction.
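The center computation behind click_element follows directly from the rect shape that get_element_rect is documented to return (a dict with x, y, width, height, or None if the element is not found). The helper below is a minimal sketch of that arithmetic. `rect_center` is a hypothetical name, and rounding to whole pixels is an assumption.

```python
from typing import Any, Dict, Optional, Tuple


def rect_center(rect: Optional[Dict[str, Any]]) -> Optional[Tuple[int, int]]:
    """Return the (x, y) center of a rect, or None if no rect was found."""
    if rect is None:
        return None
    center_x = rect["x"] + rect["width"] / 2
    center_y = rect["y"] + rect["height"] / 2
    # Pointer coordinates are whole pixels in this sketch.
    return round(center_x), round(center_y)
```

The rect has to be requested in screen space for the resulting point to be usable as a click target, since window-space coordinates are relative to the window origin.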
RemoteDesktopSession.right_click_element
async def right_click_element(self, pid: int | str, selector: str) -> None
Find element by CSS selector and right-click its center.
RemoteDesktopSession.get_accessibility_tree
async def get_accessibility_tree(self) -> Dict[str, Any]
Get the accessibility tree if supported.
RemoteDesktopSession.shell_command
async def shell_command(self, command: str, check: bool = True) -> Dict[str, Any]
Execute a shell command.
Parameters:
| Name | Type | Description |
|---|---|---|
command | Any | Shell command to execute |
check | Any | If True (default), raise an exception if the command fails (non-zero return code). If False, return the result regardless. |
Returns: Command result with stdout/stderr
Raises:
RuntimeError- If check=True and command returns non-zero exit code
RemoteDesktopSession.read_file
async def read_file(self, path: str) -> str
Read a text file from the environment.
RemoteDesktopSession.write_file
async def write_file(self, path: str, content: str) -> None
Write a text file to the environment.
RemoteDesktopSession.read_bytes
async def read_bytes(self, path: str) -> bytes
Read a file as bytes from the environment.
RemoteDesktopSession.write_bytes
async def write_bytes(self, path: str, data: bytes) -> None
Write bytes to a file in the environment.
RemoteDesktopSession.file_exists
async def file_exists(self, path: str) -> bool
Check if a file exists in the environment.
RemoteDesktopSession.directory_exists
async def directory_exists(self, path: str) -> bool
Check if a directory exists in the environment.
RemoteDesktopSession.list_dir
async def list_dir(self, path: str) -> list[str]
List contents of a directory in the environment.
RemoteDesktopSession.run_command
async def run_command(self, command: str, check: bool = True) -> Dict[str, Any]
Execute a shell command (alias for shell_command).
Parameters:
| Name | Type | Description |
|---|---|---|
command | Any | Shell command to execute |
check | Any | If True (default), raise an exception if the command fails (non-zero return code). If False, return the result regardless. |
Returns: Command result with stdout/stderr
Raises:
RuntimeError- If check=True and command returns non-zero exit code
RemoteDesktopSession.launch_application
async def launch_application(self, app_name: str) -> None
Launch an application by name.
RemoteDesktopSession.check_status
async def check_status(self) -> bool
Check if the environment is responsive.
Returns: True if environment is ready, False otherwise
RemoteDesktopSession.wait_until_ready
async def wait_until_ready(self, timeout: int = 60, poll_interval: float = 2.0) -> bool
Wait until the environment is ready.
Parameters:
| Name | Type | Description |
|---|---|---|
timeout | Any | Maximum time to wait in seconds |
poll_interval | Any | Time between status checks |
Returns: True if environment became ready, False if timeout
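The timeout and poll_interval parameters describe a standard polling loop: check status, sleep, repeat until ready or out of time. The sketch below shows that pattern on its own. It is an assumption about the general approach rather than the session's actual code. `wait_until` is a hypothetical function, and the `check` callback stands in for something like check_status.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_until(
    check: Callable[[], Awaitable[bool]],
    timeout: float = 60,
    poll_interval: float = 2.0,
) -> bool:
    """Poll check() until it returns True or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if await check():
            return True
        if time.monotonic() >= deadline:
            return False  # timed out without a positive status
        await asyncio.sleep(poll_interval)
```

Using a monotonic clock keeps the deadline stable if the system clock is adjusted while waiting for a VM to boot.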
RemoteDesktopSession.click
async def click(self, x: int, y: int) -> None
Click at coordinates.
RemoteDesktopSession.right_click
async def right_click(self, x: int, y: int) -> None
Right-click at coordinates.
RemoteDesktopSession.double_click
async def double_click(self, x: int, y: int) -> None
Double-click at coordinates.
RemoteDesktopSession.type
async def type(self, text: str) -> None
Type text.
RemoteDesktopSession.key
async def key(self, key: str) -> None
Press a key.
RemoteDesktopSession.hotkey
async def hotkey(self, keys: list[str]) -> None
Press a key combination.
RemoteDesktopSession.scroll
async def scroll(self, direction: str = 'down', amount: int = 300) -> None
Scroll the screen.
RemoteDesktopSession.move_to
async def move_to(self, x: int, y: int) -> None
Move cursor to coordinates.
RemoteDesktopSession.drag
async def drag(self, from_x: int, from_y: int, to_x: int, to_y: int) -> None
Drag from one position to another.
RemoteDesktopSession.install_app
async def install_app(self, app_name: str, with_shortcut: bool = True, **kwargs) -> None
Install a registered app on the native desktop environment.
Uses the app registry to find platform-specific install functions.
Parameters:
| Name | Type | Description |
|---|---|---|
app_name | Any | Name of the app to install (e.g., "godot", "firefox") |
with_shortcut | Any | Create desktop shortcut (default True) |
**kwargs | Any | App-specific arguments (e.g., version="4.2.1") |
Raises:
- ValueError - If app is not registered
- NotImplementedError - If app doesn't support the current platform
Example:
await session.install_app("godot", version="4.2.1")
await session.install_app("firefox", with_shortcut=True)
RemoteDesktopSession.launch_app
async def launch_app(self, app_name: str, **kwargs) -> None
Launch a registered app on the native desktop environment.
Uses the app registry to find platform-specific launch functions.
Parameters:
| Name | Type | Description |
|---|---|---|
app_name | Any | Name of the app to launch |
**kwargs | Any | App-specific arguments (e.g., project_path="/path") |
Raises:
- ValueError - If app is not registered
- NotImplementedError - If app doesn't support the current platform
Example:
await session.launch_app("godot", project_path="~/project", editor=True)
get_session
def get_session(name: Optional[str] = None) -> type[DesktopSession]
Return session class by name.
Provider names:
- "simulated" (alias: "webtop"): Playwright-based browser simulation. Fast, no Docker required. UI is an HTML/CSS rendering of the desktop. Good for web-app testing and UI benchmarks.
- "native" (alias: "computer"): Real OS in a Docker/QEMU container. Actual desktop environment with real applications. Requires Docker. Good for real app testing and OS-level tasks.
create_remote_session
def create_remote_session(api_url: str, vnc_url: str = '', os_type: str = 'linux', width: int = 1920, height: int = 1080) -> RemoteDesktopSession
Create a RemoteDesktopSession.
Parameters:
| Name | Type | Description |
|---|---|---|
api_url | Any | URL of the environment's API endpoint |
vnc_url | Any | URL for VNC access |
os_type | Any | Operating system type |
width | Any | Screen width |
height | Any | Screen height |
Returns: Configured RemoteDesktopSession instance
config
Configuration module for cua-bench.
ConfigLoader
Load and merge configuration from .cua/ directory.
Constructor
ConfigLoader(self, search_path: Path | None = None)
Attributes
| Name | Type | Description |
|---|---|---|
CONFIG_DIR_NAME | Any | |
CONFIG_FILE_NAME | Any | |
AGENTS_FILE_NAME | Any | |
search_path | Any |
Methods
ConfigLoader.find_config_dir
def find_config_dir(self) -> Path | None
Walk up the directory tree to find the .cua/ directory.
Returns: Path to .cua/ directory if found, None otherwise.
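The upward search described for find_config_dir can be sketched with pathlib. This is an illustration only, not the library's implementation: the helper name find_dir_upwards and the temporary directory layout are hypothetical, and only the .cua directory name comes from the text above.

```python
import tempfile
from pathlib import Path
from typing import Optional


def find_dir_upwards(start: Path, dir_name: str = ".cua") -> Optional[Path]:
    """Return the nearest dir_name directory in start or any of its ancestors."""
    current = start.resolve()
    # Check the starting directory first, then each parent up to the filesystem root.
    for candidate in (current, *current.parents):
        target = candidate / dir_name
        if target.is_dir():
            return target
    return None


# Demo in a throwaway tree: <tmp>/project/.cua and <tmp>/project/src/pkg
with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp) / "project"
    nested = project / "src" / "pkg"
    nested.mkdir(parents=True)
    (project / ".cua").mkdir()
    found = find_dir_upwards(nested)
    found_matches = found == (project / ".cua").resolve()
```

Starting the search from a nested directory returns the closest enclosing match, which is why a command run from a subfolder can still pick up project-level configuration.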
ConfigLoader.load_config
def load_config(self) -> CuaConfig | None
Load .cua/config.yaml if it exists.
Returns: CuaConfig object if config file exists, None otherwise.
ConfigLoader.load_agents
def load_agents(self) -> list[CustomAgentEntry]
Load .cua/agents.yaml if it exists.
Returns: List of CustomAgentEntry objects.
ConfigLoader.get_agent_by_name
def get_agent_by_name(self, name: str) -> CustomAgentEntry | None
Get a custom agent entry by name.
Parameters:
| Name | Type | Description |
|---|---|---|
name | Any | Agent name to look up. |
Returns: CustomAgentEntry if found, None otherwise.
ConfigLoader.get_effective_config
def get_effective_config(self, cli_args: dict[str, Any], env_type: str | None = None) -> dict[str, Any]
Merge configuration sources into effective config.
Priority (highest to lowest):
- CLI arguments
- Environment-specific overrides
- Agent defaults from agents.yaml
- Agent config from config.yaml
- Defaults from config.yaml
Parameters:
| Name | Type | Description |
|---|---|---|
cli_args | Any | Command line arguments as dictionary. |
env_type | Any | Environment type for env-specific overrides (e.g., "webtop", "winarena"). |
Returns: Merged configuration dictionary.
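The priority order listed for get_effective_config can be pictured as a layered dictionary merge. The sketch below is an assumption about how such a merge might work, not the library's code: merge_layers, the sample values, and the rule that None values are skipped are all hypothetical.

```python
from typing import Any


def merge_layers(*layers: dict[str, Any]) -> dict[str, Any]:
    """Merge dicts given from lowest to highest priority; later layers win."""
    merged: dict[str, Any] = {}
    for layer in layers:
        # Assumption: None means "not set", so it never overrides a lower layer.
        merged.update({k: v for k, v in layer.items() if v is not None})
    return merged


# Hypothetical values for each source, listed from lowest to highest priority.
config_defaults = {"model": "model-a", "max_steps": 100, "output_dir": "./results"}
config_agent = {"model": "model-b"}
agents_yaml_defaults = {"max_steps": 50}
env_overrides = {"max_steps": 30}
cli_args = {"model": None, "output_dir": "./out"}

effective = merge_layers(
    config_defaults, config_agent, agents_yaml_defaults, env_overrides, cli_args
)
print(effective)
```

Passing the layers from lowest to highest priority keeps the function itself trivial: each later update simply overwrites earlier keys.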
AgentConfig
Agent configuration from .cua/config.yaml.
Constructor
AgentConfig(self, name: str | None = None, import_path: str | None = None, model: str | None = None, max_steps: int = 100, environments: dict[str, dict[str, Any]] | None = None) -> None
Attributes
| Name | Type | Description |
|---|---|---|
name | `str | None` |
import_path | `str | None` |
model | `str | None` |
max_steps | int | |
environments | `dict[str, dict[str, Any]] | None` |
Methods
AgentConfig.from_dict
def from_dict(cls, data: dict[str, Any]) -> AgentConfig
Create AgentConfig from dictionary.
AgentsConfig
Configuration from .cua/agents.yaml.
Supports two formats:
- Legacy: custom_agents list
- New: agents list (preferred)
Example .cua/agents.yaml:
agents:
  - name: my-agent
    image: myregistry/my-agent:latest
    defaults:
      model: gpt-4o
  - name: dev-agent
    import_path: my_agents.dev:DevAgent
Constructor
AgentsConfig(self, custom_agents: list[CustomAgentEntry] = list()) -> None
Attributes
| Name | Type | Description |
|---|---|---|
custom_agents | list[CustomAgentEntry] |
Methods
AgentsConfig.from_dict
def from_dict(cls, data: dict[str, Any]) -> AgentsConfig
Create AgentsConfig from dictionary.
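Since AgentsConfig accepts both the legacy custom_agents key and the newer agents key, a loader has to decide which one to read. The following is a minimal sketch under the assumption that the preferred agents key wins when both are present; read_agent_entries is a hypothetical helper, not part of the documented API.

```python
from typing import Any


def read_agent_entries(data: dict[str, Any]) -> list[dict[str, Any]]:
    """Return entries from the `agents` key, else from the legacy `custom_agents` key."""
    # Assumption: when both keys are present, the preferred `agents` list wins.
    entries = data.get("agents")
    if entries is None:
        entries = data.get("custom_agents", [])
    return list(entries)


new_style = {"agents": [{"name": "my-agent", "image": "myregistry/my-agent:latest"}]}
legacy_style = {"custom_agents": [{"name": "dev-agent", "import_path": "my_agents.dev:DevAgent"}]}

new_names = [entry["name"] for entry in read_agent_entries(new_style)]
legacy_names = [entry["name"] for entry in read_agent_entries(legacy_style)]
```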
CuaConfig
Root configuration from .cua/config.yaml.
Constructor
CuaConfig(self, defaults: DefaultsConfig | None = None, agent: AgentConfig | None = None) -> None
Attributes
| Name | Type | Description |
|---|---|---|
defaults | `DefaultsConfig | None` |
agent | `AgentConfig | None` |
Methods
CuaConfig.from_dict
def from_dict(cls, data: dict[str, Any]) -> CuaConfig
Create CuaConfig from dictionary.
CustomAgentEntry
Entry for a custom agent in .cua/agents.yaml.
Agents can be defined in two ways:
- Docker image (cloud-ready): Specify the image field with a Docker image
- Import path (local dev): Specify import_path for Python import
Examples:
# Docker image agent
- name: my-agent
  image: myregistry/my-agent:latest
# Import path agent (uses default cua-agent image)
- name: dev-agent
  import_path: my_agents.dev:DevAgent
# Built-in agent
- name: cua-agent
  builtin: true
Constructor
CustomAgentEntry(self, name: str, image: Optional[str] = None, import_path: Optional[str] = None, builtin: bool = False, command: Optional[list[str]] = None, defaults: dict[str, Any] = dict()) -> None
Attributes
| Name | Type | Description |
|---|---|---|
name | str | |
image | Optional[str] | |
import_path | Optional[str] | |
builtin | bool | |
command | Optional[list[str]] | |
defaults | dict[str, Any] |
Methods
CustomAgentEntry.get_image
def get_image(self) -> str
Get the Docker image to use for this agent.
Returns: Docker image name. Uses custom image if specified, otherwise returns the default cua-agent image.
CustomAgentEntry.is_docker_agent
def is_docker_agent(self) -> bool
Check if this agent is defined as a Docker image.
Returns: True if agent has a custom Docker image specified.
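The relationship between get_image and is_docker_agent can be illustrated with a small stand-in dataclass. This is a sketch under stated assumptions: AgentEntrySketch is hypothetical, and DEFAULT_IMAGE is a placeholder because the actual default image name is not given in this reference.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical placeholder; the real default image name is not stated in this reference.
DEFAULT_IMAGE = "example/default-agent:latest"


@dataclass
class AgentEntrySketch:
    name: str
    image: Optional[str] = None
    import_path: Optional[str] = None

    def is_docker_agent(self) -> bool:
        # True only when a custom image was specified.
        return bool(self.image)

    def get_image(self) -> str:
        # Custom image if present, otherwise the default image.
        return self.image if self.image else DEFAULT_IMAGE


docker_entry = AgentEntrySketch(name="my-agent", image="myregistry/my-agent:latest")
dev_entry = AgentEntrySketch(name="dev-agent", import_path="my_agents.dev:DevAgent")
```

In this sketch an import-path agent still resolves to an image, which mirrors the note above that import path agents use the default cua-agent image.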
DefaultsConfig
Default configuration values from .cua/config.yaml.
Constructor
DefaultsConfig(self, model: str | None = None, max_steps: int = 100, output_dir: str = './results') -> None
Attributes
| Name | Type | Description |
|---|---|---|
model | `str | None` |
max_steps | int | |
output_dir | str |
Methods
DefaultsConfig.from_dict
def from_dict(cls, data: dict[str, Any]) -> DefaultsConfig
Create DefaultsConfig from dictionary.
detect_env_type
def detect_env_type(env_path: str) -> str | None
Detect environment type from path.
Parameters:
| Name | Type | Description |
|---|---|---|
env_path | Any | Path to the environment. |
Returns: Environment type string ("webtop" or "winarena"), or None if unknown.
runner
Runner module for 2-container task execution.
TaskResult
Result of a task execution.
Constructor
TaskResult(self, success: bool, exit_code: int, agent_logs: str, env_logs: str, output_dir: Optional[str] = None, error: Optional[str] = None) -> None
Attributes
| Name | Type | Description |
|---|---|---|
success | bool | |
exit_code | int | |
agent_logs | str | |
env_logs | str | |
output_dir | Optional[str] | |
error | Optional[str] |
TaskRunner
Orchestrates 2-container task execution.
Architecture:
- Creates isolated Docker network per task
- Creates task overlay to protect golden image (QEMU types)
- Starts environment container (base image with QCOW2 disk)
- Starts agent container (runs solver)
- Agent connects to env via network hostname
- Waits for agent completion
- Collects results and cleans up (including overlay)
Constructor
TaskRunner(self, agent_image: str = DEFAULT_AGENT_IMAGE, env_hostname: str = 'cua-env', agent_hostname: str = 'cua-agent')
Attributes
| Name | Type | Description |
|---|---|---|
agent_image | Any | |
env_hostname | Any | |
agent_hostname | Any |
Methods
TaskRunner.run_task
async def run_task(self, env_path: Path, task_index: int, env_type: str, golden_name: Optional[str] = None, agent: Optional[str] = None, agent_image: Optional[str] = None, agent_command: Optional[List[str]] = None, agent_import_path: Optional[str] = None, model: Optional[str] = None, max_steps: int = 100, oracle: bool = False, memory: str = '8G', cpus: str = '8', vnc_port: Optional[int] = None, api_port: Optional[int] = None, output_dir: Optional[str] = None, stream_agent_logs: bool = False, timeout: Optional[int] = None, cleanup_before: bool = True, remove_images_after: bool = False, provider_type: Optional[str] = None) -> TaskResult
Run a task with the 2-container architecture.
Parameters:
| Name | Type | Description |
|---|---|---|
env_path | Any | Path to task environment directory |
task_index | Any | Task index to run |
env_type | Any | Environment type (linux-docker, windows-qemu, etc.) |
golden_name | Any | Image name to use (defaults to env_type). See: cb image list |
agent | Any | Agent name (for built-in agents) |
agent_image | Any | Docker image for agent container (overrides default) |
agent_command | Any | Custom command for agent container |
agent_import_path | Any | Custom agent import path |
model | Any | Model to use |
max_steps | Any | Maximum agent steps |
oracle | Any | Run oracle solution instead of agent |
memory | Any | Memory for environment (QEMU only) |
cpus | Any | CPUs for environment (QEMU only) |
vnc_port | Any | Host port to map VNC (for debugging) |
api_port | Any | Host port to map API (for debugging) |
output_dir | Any | Output directory for results |
stream_agent_logs | Any | Stream agent logs to <output_dir>/run.log in real-time (default: False) |
timeout | Any | Timeout in seconds (None = no timeout) |
cleanup_before | Any | Clean up stale containers before starting (default: True) |
remove_images_after | Any | Remove Docker images after task (default: False). Note: this removes Docker images but NOT base VM disk images. |
provider_type | Any | Provider type ("simulated", "webtop", "native", "computer", None). If "simulated" or "webtop", the agent container will use a local Playwright session instead of connecting to a remote environment. |
Returns: TaskResult with execution details
TaskRunner.run_task_interactively
async def run_task_interactively(self, env_type: str, golden_name: Optional[str] = None, env_path: Optional[Path] = None, task_index: int = 0, memory: str = '8G', cpus: str = '8', vnc_port: Optional[int] = None, api_port: Optional[int] = None, auto_allocate_ports: bool = True, cleanup_before: bool = True) -> tuple[str, str, callable, Optional[dict]]
Start an environment container interactively (without agent).
This method starts only the environment container with VNC and API ports exposed to the host, allowing manual interaction or agent connection. If env_path is provided, it will also load the task and run the setup.
Parameters:
| Name | Type | Description |
|---|---|---|
env_type | Any | Environment type (linux-docker, windows-qemu, etc.) |
golden_name | Any | Image name to use (defaults to env_type) |
env_path | Any | Path to task directory (optional, for running task setup) |
task_index | Any | Task index to run (default: 0) |
memory | Any | Memory for environment (QEMU only) |
cpus | Any | CPUs for environment (QEMU only) |
vnc_port | Any | Host port to map VNC (None = auto-allocate) |
api_port | Any | Host port to map API (None = auto-allocate) |
auto_allocate_ports | Any | Auto-allocate ports if not specified (default: True) |
cleanup_before | Any | Clean up stale containers before starting (default: True) |
Returns: Tuple of (vnc_url, api_url, cleanup_func, task_config, env, session):
- vnc_url: URL to access VNC (e.g., http://localhost:8006)
- api_url: URL to access API (e.g., http://localhost:5000)
- cleanup_func: Async function to call when done to clean up resources
- task_config: Task configuration dict (None if env_path not provided)
- env: Environment object (None if env_path not provided)
- session: RemoteDesktopSession object (None if env_path not provided)
Example:
```python
from pathlib import Path

runner = TaskRunner()
vnc_url, api_url, cleanup, task_cfg, env, session = await runner.run_task_interactively(
    "linux-docker",
    env_path=Path("./my_task"),
    task_index=0
)
print(f"VNC: {vnc_url}")
print(f"Task: {task_cfg.get('description')}")
# ... do interactive work ...
# Evaluate before cleanup
if env and env.evaluate_task_fn:
    result = await env.evaluate_task_fn(task_cfg['_task_cfg'], session)
    print(f"Result: {result}")
await cleanup()
```
TaskRunner.cleanup_all
async def cleanup_all(self) -> None
Clean up all running tasks.
TaskRunner.force_cleanup
async def force_cleanup() -> dict
Force cleanup of all stale cua-bench containers and networks.
Use this when containers are left behind from previous runs.
Returns: Dict with counts: {"containers": N, "networks": N}
agents
AgentResult
Result of agent execution.
Constructor
AgentResult(self, total_input_tokens: int = 0, total_output_tokens: int = 0, failure_mode: FailureMode = FailureMode.UNSET) -> None
Attributes
| Name | Type | Description |
|---|---|---|
total_input_tokens | int | |
total_output_tokens | int | |
failure_mode | FailureMode |
BaseAgent
Inherits from: ABC
Base class for agents that can perform tasks.
Constructor
BaseAgent(self, kwargs = {})
Attributes
| Name | Type | Description |
|---|---|---|
version | `str | None` |
prompt_template | `str | None` |
Methods
BaseAgent.name
def name() -> str
Return the name of the agent.
BaseAgent.perform_task
async def perform_task(self, task_description: str, session: DesktopSession, logging_dir: Path | None = None, tracer = None) -> AgentResult
Perform a task using the agent.
Parameters:
| Name | Type | Description |
|---|---|---|
task_description | Any | The task description/instruction |
session | Any | The desktop or mobile session to interact with |
logging_dir | Any | Optional directory for logging agent execution |
tracer | Any | Optional tracer object for recording agent actions |
Returns: AgentResult with token counts and failure mode
FailureMode
Inherits from: Enum
Failure mode for agent execution.
Attributes
| Name | Type | Description |
|---|---|---|
UNSET | Any | |
NONE | Any | |
UNKNOWN | Any | |
MAX_STEPS_EXCEEDED | Any |
CuaAgent
Inherits from: BaseAgent
Agent implementation using the CUA Computer Agent SDK.
Constructor
CuaAgent(self, kwargs = {})
Attributes
| Name | Type | Description |
|---|---|---|
model | Any | |
max_steps | Any |
Methods
CuaAgent.name
def name() -> str
CuaAgent.perform_task
async def perform_task(self, task_description: str, session: DesktopSession, logging_dir: Path | None = None, tracer = None) -> AgentResult
Perform a task using the CUA Computer Agent.
Parameters:
| Name | Type | Description |
|---|---|---|
task_description | Any | The task description/instruction |
session | Any | The desktop session to interact with |
logging_dir | Any | Optional directory for logging agent execution |
tracer | Any | Optional tracer object for recording agent actions |
Returns: AgentResult with token counts and failure mode
GeminiAgent
Inherits from: BaseAgent
Agent implementation using Google's Gemini API with Computer Use.
Constructor
GeminiAgent(self, kwargs = {})
Attributes
| Name | Type | Description |
|---|---|---|
model | Any | |
api_key | Any | |
thinking_level | Any | |
media_resolution | Any | |
max_steps | Any |
Methods
GeminiAgent.name
def name() -> str
GeminiAgent.perform_task
async def perform_task(self, task_description: str, session: DesktopSession, logging_dir: Path | None = None, tracer = None) -> AgentResult
Perform a task using the Gemini Computer Use agent.
Parameters:
| Name | Type | Description |
|---|---|---|
task_description | Any | The task description/instruction |
session | Any | The desktop session to interact with |
logging_dir | Any | Optional directory for logging agent execution |
tracer | Any | Optional tracer object for recording agent actions |
Returns: AgentResult with token counts and failure mode
register_agent
def register_agent(name: str)
Decorator to register an agent class with a given name.
load_agent_from_path
def load_agent_from_path(import_path: str) -> type[BaseAgent]
Load an agent class from an import path.
Parameters:
| Name | Type | Description |
|---|---|---|
import_path | Any | Import path in format 'module.path:ClassName' |
Returns: Agent class
Raises:
- ValueError: If import path format is invalid
- ImportError: If module cannot be imported
- AttributeError: If class is not found in module
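Resolving a 'module.path:ClassName' string can be sketched with importlib. This is a minimal illustration of the format and the three error cases listed above, not the library's implementation; load_class is a hypothetical name, and a standard-library class is used so the example runs anywhere.

```python
import importlib


def load_class(import_path: str) -> type:
    """Load a class from a 'module.path:ClassName' string."""
    module_name, sep, class_name = import_path.partition(":")
    if not sep or not module_name or not class_name:
        raise ValueError(f"Expected 'module.path:ClassName', got {import_path!r}")
    module = importlib.import_module(module_name)  # raises ImportError if missing
    return getattr(module, class_name)  # raises AttributeError if the class is absent


# Demonstrated with a standard-library class instead of a real agent.
ordered_dict_cls = load_class("collections:OrderedDict")
```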
get_agent
def get_agent(name: str, config_loader: 'ConfigLoader | None' = None) -> type[BaseAgent] | None
Get an agent class by name.
Lookup order:
- Local registry (.cua/agents.yaml) - if config_loader provided
- Built-in registry (_AGENT_REGISTRY)
Parameters:
| Name | Type | Description |
|---|---|---|
name | Any | Agent name to look up |
config_loader | Any | Optional ConfigLoader for local registry lookup |
Returns: Agent class if found, None otherwise
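The decorator-based registration and the two-step lookup order can be sketched together. Everything here is illustrative: register, lookup, and the demo classes are hypothetical stand-ins, and a plain dict plays the role of the local registry that a ConfigLoader would supply.

```python
from typing import Optional

_BUILTIN_REGISTRY: dict[str, type] = {}


def register(name: str):
    """Decorator that stores the decorated class in the built-in registry under `name`."""
    def decorator(cls: type) -> type:
        _BUILTIN_REGISTRY[name] = cls
        return cls
    return decorator


def lookup(name: str, local_registry: Optional[dict[str, type]] = None) -> Optional[type]:
    """Check the local registry first (if given), then the built-in registry."""
    if local_registry is not None and name in local_registry:
        return local_registry[name]
    return _BUILTIN_REGISTRY.get(name)


@register("demo-agent")
class BuiltinDemoAgent:
    pass


class LocalDemoAgent:
    pass


builtin_hit = lookup("demo-agent")
local_hit = lookup("demo-agent", {"demo-agent": LocalDemoAgent})
missing = lookup("unknown")
```

Checking the local registry first lets a project shadow a built-in agent name without modifying the package.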
list_agents
def list_agents(config_loader: 'ConfigLoader | None' = None) -> list[str]
List all registered agent names.
Parameters:
| Name | Type | Description |
|---|---|---|
config_loader | Any | Optional ConfigLoader to include local agents |
Returns: List of agent names (local + built-in, deduplicated)
processors
Snapshot processors for converting batch outputs into various dataset formats.
AgUVisStage1Processor
Inherits from: BaseProcessor
Processor for aguvis-stage-1 format (action augmentation dataset).
Methods
AgUVisStage1Processor.get_dataset_name
def get_dataset_name(self) -> str
AgUVisStage1Processor.process
def process(self) -> List[Dict[str, Any]]
Process snapshots into aguvis-stage-1 format.
BaseProcessor
Inherits from: ABC
Base class for snapshot processors.
A processor converts batch dump outputs (screenshots + snapshots) into a specific dataset format.
Constructor
BaseProcessor(self, args: ProcessorArgs)
Attributes
| Name | Type | Description |
|---|---|---|
args | Any |
Methods
BaseProcessor.process
def process(self) -> List[Dict[str, Any]]
Process the snapshots and return a list of dataset rows.
Returns: List of dictionaries, where each dict is a row in the dataset. The schema depends on the specific processor implementation.
BaseProcessor.get_dataset_name
def get_dataset_name(self) -> str
Get the default dataset name for this processor.
BaseProcessor.save_jsonl
def save_jsonl(self, rows: List[Dict[str, Any]], save_dir: Path, dataset_name: str) -> Path
Save dataset rows as a JSONL file.
Parameters:
| Name | Type | Description |
|---|---|---|
rows | Any | List of dataset row dictionaries |
save_dir | Any | Directory to save to |
dataset_name | Any | Name of the dataset file (without extension) |
Returns: Path to the saved file
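JSONL stores one JSON object per line, which is what makes it convenient for row-oriented datasets. A minimal sketch of writing rows this way follows; write_jsonl is a hypothetical helper, and the ".jsonl" suffix is an assumption based on the dataset_name being given without extension.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def write_jsonl(rows: list[dict[str, Any]], save_dir: Path, dataset_name: str) -> Path:
    """Write one JSON object per line to <save_dir>/<dataset_name>.jsonl."""
    save_dir.mkdir(parents=True, exist_ok=True)
    out_path = save_dir / f"{dataset_name}.jsonl"
    with out_path.open("w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")
    return out_path


rows = [{"instruction": "click OK", "x": 10, "y": 20}, {"instruction": "scroll", "x": 0, "y": 0}]
with tempfile.TemporaryDirectory() as tmp:
    path = write_jsonl(rows, Path(tmp), "demo")
    # Read the file back while the temporary directory still exists.
    loaded = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]
```

Rows holding values that JSON cannot represent, such as PIL images, would fail in json.dumps, which is the case save_to_disk is described as handling.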
BaseProcessor.save_to_disk
def save_to_disk(self, rows: List[Dict[str, Any]], save_dir: Path, dataset_name: str) -> Path
Save dataset rows using HuggingFace's save_to_disk method.
This method properly handles PIL images and other complex data types that cannot be serialized to JSON.
Parameters:
| Name | Type | Description |
|---|---|---|
rows | Any | List of dataset row dictionaries |
save_dir | Any | Directory to save to |
dataset_name | Any | Name of the dataset directory |
Returns: Path to the saved dataset directory
BaseProcessor.push_to_hub
def push_to_hub(self, rows: List[Dict[str, Any]], repo_id: str, private: bool) -> None
Push dataset to Hugging Face Hub.
Parameters:
| Name | Type | Description |
|---|---|---|
rows | Any | List of dataset row dictionaries |
repo_id | Any | HuggingFace repository ID (e.g., "username/dataset-name") |
private | Any | Whether to make the dataset private |
GuiR1Processor
Inherits from: BaseProcessor
Processor for gui-r1 format (low-level click instructions).
Methods
GuiR1Processor.get_dataset_name
def get_dataset_name(self) -> str
GuiR1Processor.process
def process(self) -> List[Dict[str, Any]]
Process snapshots into gui-r1 format.
get_processor
def get_processor(name: str) -> type[BaseProcessor]
Get a processor class by name.
sessions
Sessions module for async container management.
SessionProvider
Inherits from: ABC
Base class for session providers (Docker, CUA Cloud, etc.).
Methods
SessionProvider.start_session
async def start_session(self, session_id: str, env_path: Path, container_script: str, image_uri: Optional[str] = None, output_dir: Optional[str] = None, **kwargs) -> Dict[str, Any]
Start a new session.
Parameters:
| Name | Type | Description |
|---|---|---|
session_id | Any | Unique identifier for the session |
env_path | Any | Path to the environment directory |
container_script | Any | Script to run in the container |
image_uri | Any | Container image to use |
output_dir | Any | Directory to save outputs |
**kwargs | Any | Additional provider-specific arguments |
Returns: Dict containing session metadata (container_id, status, etc.)
SessionProvider.get_session_status
async def get_session_status(self, session_id: str) -> Dict[str, Any]
Get the status of a running session.
Parameters:
| Name | Type | Description |
|---|---|---|
session_id | Any | Session identifier |
Returns: Dict containing session status information
SessionProvider.stop_session
async def stop_session(self, session_id: str) -> None
Stop a running session.
Parameters:
| Name | Type | Description |
|---|---|---|
session_id | Any | Session identifier |
SessionProvider.get_session_logs
async def get_session_logs(self, session_id: str, tail: Optional[int] = None) -> str
Get logs from a session.
Parameters:
| Name | Type | Description |
|---|---|---|
session_id | Any | Session identifier |
tail | Any | Number of lines to return from the end (None for all) |
Returns: Log output as string
list_sessions
def list_sessions(provider: Optional[str] = None) -> List[Dict[str, Any]]
List all stored sessions.
Parameters:
| Name | Type | Description |
|---|---|---|
provider | Any | Optional provider filter ("docker", "cua-cloud", etc.) |
Returns: List of session metadata dicts
make
def make(provider_name: str, env_type: Optional[str] = None) -> SessionProvider
Create a session provider for the specified provider.
Parameters:
| Name | Type | Description |
|---|---|---|
provider_name | Any | Name of the provider: "local" runs locally using Docker (webtop) or QEMU/KVM (winarena); "cloud" runs on CUA Cloud (GCP Batch for webtop, Azure Batch for winarena); "docker" is a legacy alias for "local" |
env_type | Any | Optional environment type hint ("webtop" or "winarena"). Used by local provider to select appropriate backend. |
Returns: SessionProvider instance
Raises:
- ValueError: If provider is not supported
batch
Batch integration for cua-bench.
execute_batch
async def execute_batch(job_name: str, env_path: Path, container_script: str, task_count: int = 4, task_parallelism: int = 4, run_local: bool = False, image_uri: Optional[str] = None, auto_cleanup: bool = True, output_dir: Optional[str] = None) -> List[str]
Execute a batch job for a cua-bench environment.
Parameters:
| Name | Type | Description |
|---|---|---|
job_name | Any | Name of the batch job |
env_path | Any | Path to the environment directory |
container_script | Any | Script to run in the container |
task_count | Any | Number of tasks to run |
task_parallelism | Any | Max concurrent tasks |
run_local | Any | Run locally using Docker instead of GCP |
image_uri | Any | Custom container image |
auto_cleanup | Any | Clean up resources after completion |
Returns: List of log lines from the job
run_local_docker
async def run_local_docker(env_path: Path, container_script: str, image_uri: Optional[str] = None, output_dir: Optional[str] = None, task_count: int = 1, parallelism: int = 1) -> List[str]
Run the batch job locally using Docker.
Parameters:
| Name | Type | Description |
|---|---|---|
env_path | Any | Path to environment directory |
container_script | Any | Script to run |
image_uri | Any | Docker image to use |
output_dir | Any | Local directory to mount as /tmp/td_output for results |
task_count | Any | Total number of tasks to run |
parallelism | Any | Maximum number of concurrent containers |
Returns: List of output lines
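The interplay of task_count and parallelism (total tasks versus the maximum number running at once) is commonly implemented with a semaphore. The sketch below shows that pattern with dummy tasks; it is an assumption about one reasonable approach, not the library's code, and run_bounded is a hypothetical name.

```python
import asyncio


async def run_bounded(task_count: int, parallelism: int) -> tuple[list[str], int]:
    """Run task_count dummy tasks with at most `parallelism` running at once."""
    semaphore = asyncio.Semaphore(parallelism)
    running = 0
    peak = 0

    async def one_task(index: int) -> str:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for launching a container and waiting
            running -= 1
            return f"task {index} done"

    lines = await asyncio.gather(*(one_task(i) for i in range(task_count)))
    return list(lines), peak


lines, peak = asyncio.run(run_bounded(task_count=6, parallelism=2))
```

asyncio.gather returns results in submission order, so the output lines stay aligned with task indices even though the tasks finish at different times.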
workers
Worker-based gym system for parallel environment management.
This module provides a FastAPI-based worker system for running CUA-Bench environments in parallel, enabling efficient RL training and evaluation.
Components:
- worker_server: FastAPI server wrapping Environment instances
- worker_client: HTTP client for interacting with worker servers
- worker_manager: Utilities for spawning and managing multiple workers
- dataloader: MultiTurnDataloader and ReplayBuffer for RL training
MultiTurnDataloader
Dataloader for RL training with parallel environment workers.
Each env_config must contain a 'task_configs' key with a list of task configurations that the client will use internally.
Constructor
MultiTurnDataloader(self, env_class, env_configs, tokenizer, processor = None, is_multi_modal = True, batch_size = 8, replay_capacity = 10000, replay_reward_discount = 0.9, max_prompt_length = 1024, max_response_length = 1024, only_keep_outcome_in_replay = False)
Attributes
| Name | Type | Description |
|---|---|---|
num_envs | Any | |
batch_size | Any | |
replay | Any |
Methods
MultiTurnDataloader.async_step
def async_step(self, batch_return)
MultiTurnDataloader.sample_from_buffer
def sample_from_buffer(self, batch_size = None)
MultiTurnDataloader.clear_replay_buffer
def clear_replay_buffer(self)
MultiTurnDataloader.get_balance_stats
def get_balance_stats(self)
MultiTurnDataloader.calculate_outcome_reward
def calculate_outcome_reward(self)
MultiTurnDataloader.print_examples
def print_examples(self, n = 2)
MultiTurnDataloader.print_stats_in_replay_buffer
def print_stats_in_replay_buffer(self)
MultiTurnDataloader.running_outcome_reward
def running_outcome_reward(self)
MultiTurnDataloader.close
def close(self)
Close all workers and clean up resources.
ReplayBuffer
Constructor
ReplayBuffer(self, capacity = 10000, gamma = 1.0, only_keep_outcome = False, balance_thres = 0.1)
Attributes
| Name | Type | Description |
|---|---|---|
capacity | Any | |
gamma | Any | |
only_keep_outcome | Any | |
balance_thres | Any | |
ready_buffer | Any | |
ready_position | Any | |
ready_count | Any | |
episode_buffer | Any |
Methods
ReplayBuffer.add
def add(self, data)
Add data to the replay buffer.
Parameters:
| Name | Type | Description |
|---|---|---|
data | tuple | A tuple of (worker_id, env_ret, meta_info) |
ReplayBuffer.get_balance_stats
def get_balance_stats(self)
ReplayBuffer.should_keep
def should_keep(self, curr_below, curr_above, curr_ret)
ReplayBuffer.sample
def sample(self, batch_size)
Sample experiences from the ready buffer.
Parameters:
| Name | Type | Description |
|---|---|---|
batch_size | int | Number of experiences to sample |
Returns: list: List of sampled experiences
ReplayBuffer.clear
def clear(self)
Clear both the ready buffer and the episode buffer.
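The gamma parameter is not documented above. If it plays the usual role of a reward discount factor (an assumption, not something this reference confirms), per-step returns for a finished episode would be computed as in the sketch below; discounted_returns is a hypothetical helper.

```python
def discounted_returns(rewards: list[float], gamma: float) -> list[float]:
    """Return G_t = r_t + gamma * G_{t+1} for each step of one episode."""
    returns = [0.0] * len(rewards)
    running = 0.0
    # Walk backwards so each step can reuse the return of the step after it.
    for t in reversed(range(len(rewards))):
        running = rewards[t] + gamma * running
        returns[t] = running
    return returns


episode_returns = discounted_returns([0.0, 0.0, 1.0], gamma=0.5)
```

With gamma = 1.0, the constructor default, every step of an episode would receive the full final outcome reward under this reading.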
CBEnvWorkerClient
HTTP client for CUA-Bench worker servers.
This client manages communication with the worker server, image processing, observation history tracking, and action normalization.
Args: env_config: Configuration dict with keys:
- server_url: URL of the worker server
- task_configs: List of task configs, each with env_path, task_index, split
- img_w: Image width (default: 1920)
- img_h: Image height (default: 1080)
- max_step: Maximum steps per episode (default: 50)
- max_hist: Maximum observation history length (default: 10)
- timeout: Environment timeout in seconds (default: 300)
Constructor
CBEnvWorkerClient(self, env_config)
Attributes
| Name | Type | Description |
|---|---|---|
vision_start_token | Any | |
vision_end_token | Any | |
think_start_token | Any | |
think_end_token | Any | |
action_start_token | Any | |
action_end_token | Any | |
valid_fn_names | Any | |
vlm_img_w | Any | |
vlm_img_h | Any | |
dynamic_img_size | Any | |
env_config | Any | |
server_url | Any | |
max_step | Any | |
max_hist | Any | |
task_configs | List[Dict[str, Any]] | |
img_h | Any | |
img_w | Any | |
timeout | Any | |
env_id | Any | |
uid | Any | |
step_count | Any | |
done | Any | |
prompt | Any |
Methods
CBEnvWorkerClient.reset
def reset(self)
CBEnvWorkerClient.reset_attempt
def reset_attempt(self)
CBEnvWorkerClient.prompt_to_input_obs
def prompt_to_input_obs(self, prompt)
CBEnvWorkerClient.check_and_fix_action
def check_and_fix_action(self, action_str)
Parse action string and return (normalized_str, Action object for server).
CBEnvWorkerClient.reward_shaping
def reward_shaping(self, reward)
CBEnvWorkerClient.check_and_resize_image
def check_and_resize_image(self, jpg_string)
CBEnvWorkerClient.step
def step(self, action)
CBEnvWorkerClient.step_attempt
def step_attempt(self, action)
CBEnvWorkerClient.render
def render(self)
Render the current state in self.prompt, a sequence of text-image pairs, into a single image.
Returns: PIL.Image: Combined image showing the instruction and interaction history
WorkerHandle
Handle for a running worker server.
Attributes:
- worker_id: Unique identifier for this worker
- port: Port the worker is listening on
- process: Subprocess running the worker
- api_url: Full URL for API requests
Constructor
WorkerHandle(self, worker_id: str, port: int, process: subprocess.Popen, api_url: str) -> None
Attributes
| Name | Type | Description |
|---|---|---|
worker_id | str | |
port | int | |
process | subprocess.Popen | |
api_url | str | |
is_running | bool | Check if the worker process is still running. |
Methods
WorkerHandle.health_check
async def health_check(self, timeout: float = 5.0) -> bool
Check if the worker is healthy.
Parameters:
| Name | Type | Description |
|---|---|---|
timeout | Any | Request timeout in seconds |
Returns: True if healthy, False otherwise
WorkerHandle.stop
def stop(self) -> None
Stop the worker process.
WorkerPool
Context manager for a pool of worker servers.
Example:
async with WorkerPool(n_workers=4, allowed_ips=["127.0.0.1"]) as pool:
    for url in pool.urls:
        client = CBEnvWorkerClient({ "server_url": url })
        # Use client...
Constructor
WorkerPool(self, n_workers: int, allowed_ips: List[str], startup_timeout: float = 30.0, host: str = '0.0.0.0')
Attributes
| Name | Type | Description |
|---|---|---|
n_workers | Any | |
allowed_ips | Any | |
startup_timeout | Any | |
host | Any | |
workers | List[WorkerHandle] | Get the list of worker handles. |
urls | List[str] | Get the list of worker URLs. |
Methods
WorkerPool.health_check_all
async def health_check_all(self) -> dict
Check health of all workers.
Returns: Dict mapping worker_id to health status
cleanup_workers
async def cleanup_workers(workers: List[WorkerHandle]) -> None
Stop all workers.
Parameters:
| Name | Type | Description |
|---|---|---|
workers | Any | List of WorkerHandle objects to stop |
create_workers
async def create_workers(n_workers: int, allowed_ips: List[str], startup_timeout: float = 30.0, host: str = '0.0.0.0') -> List[WorkerHandle]
Spawn N worker servers on automatically allocated free ports.
Parameters:
| Name | Type | Description |
|---|---|---|
n_workers | Any | Number of worker servers to spawn |
allowed_ips | Any | List of IPs allowed to access workers |
startup_timeout | Any | Max time to wait for each worker to become healthy |
host | Any | Host for workers to bind to |
Returns: List of WorkerHandle objects
Raises:
- RuntimeError: If any worker fails to start
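Automatic allocation of free ports is typically done by binding a socket to port 0 and reading back the port the OS chose. The sketch below shows that standard technique; it is not confirmed to be what create_workers does, and find_free_port is a hypothetical helper.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        # getsockname() returns (host, port); the port is the one the OS assigned.
        return sock.getsockname()[1]


ports = [find_free_port() for _ in range(4)]
```

The socket is closed before the port is handed to a worker, so another process could claim it in between; a startup health check with a timeout is one way to surface that kind of failure.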
Example:
workers = await create_workers(
    n_workers=4,
    allowed_ips=["127.0.0.1", "10.0.0.5"],
)
# Each worker manages up to 2 envs, so 4 workers = 8 parallel envs
telemetry
Telemetry module for cua-bench.
This module provides analytics for tracking feature usage, user workflows, and system performance. All telemetry is routed through cua-core's PostHog infrastructure for consistency across the CUA ecosystem.
Events tracked:
- Tier 1 (Core): command_invoked, task_execution_started, task_evaluation_completed, batch_job_started
- Tier 2 (High Value): task_step_executed, batch_task_completed, dataset_processing_completed, task_execution_failed
Usage:
from cua_bench.telemetry import record_event, track_command
# Track CLI command usage
@track_command
def my_command(args):
    ...
# Track custom events
record_event("custom_event", {"property": "value"})
Environment Variables:
- CUA_TELEMETRY_ENABLED: Set to "false" to disable telemetry (default: "true")
- CUA_TELEMETRY_DEBUG: Set to "on" for debug logging
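The opt-out behaviour of CUA_TELEMETRY_ENABLED can be sketched as a simple environment check. This is an illustration only: telemetry_enabled is a hypothetical function, and the case-insensitive comparison is an assumption, since the actual check is delegated to cua-core.

```python
import os
from typing import Mapping, Optional


def telemetry_enabled(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Treat telemetry as enabled unless CUA_TELEMETRY_ENABLED is set to "false"."""
    env = os.environ if environ is None else environ
    # Assumption: the comparison ignores case and surrounding whitespace.
    return env.get("CUA_TELEMETRY_ENABLED", "true").strip().lower() != "false"


enabled_by_default = telemetry_enabled({})
disabled_explicitly = telemetry_enabled({"CUA_TELEMETRY_ENABLED": "false"})
```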
flush_telemetry
def flush_telemetry() -> None
Flush pending telemetry events.
Delegates to cua-core's PostHog client.
is_telemetry_enabled
def is_telemetry_enabled() -> bool
Check if telemetry is enabled.
Delegates to cua-core's telemetry check.
record_event
def record_event(event_name: str, properties: Optional[Dict[str, Any]] = None) -> None
Record a telemetry event.
Routes through cua-core's telemetry infrastructure.
Parameters:
| Name | Type | Description |
|---|---|---|
event_name | Any | Name of the event (e.g., "cb_command_invoked") |
properties | Any | Optional dict of event properties |
track_batch_job_started
def track_batch_job_started(dataset_name: str, task_count: int, variant_count: int, parallelism: int = 1, agent: Optional[str] = None, model: Optional[str] = None, run_id: Optional[str] = None, provider_type: Optional[str] = None) -> None
Track batch job start.
Parameters:
| Name | Type | Description |
|---|---|---|
dataset_name | str | Name of the dataset |
task_count | int | Number of unique tasks |
variant_count | int | Total variants to run |
parallelism | int | Max parallel workers (default 1) |
agent | Optional[str] | Agent name if specified |
model | Optional[str] | Model name if specified |
run_id | Optional[str] | Run ID for correlation |
provider_type | Optional[str] | Provider type |
track_batch_task_completed
def track_batch_task_completed(env_name: str, task_index: int, success: bool, reward: Optional[float] = None, total_steps: int = 0, duration_seconds: float = 0, run_id: Optional[str] = None, error: Optional[str] = None) -> None
Track individual task completion in batch.
Parameters:
| Name | Type | Description |
|---|---|---|
env_name | str | Name of the environment/task |
task_index | int | Task variant index |
success | bool | Whether task succeeded |
reward | Optional[float] | Reward/score if available |
total_steps | int | Steps taken (default 0) |
duration_seconds | float | Task duration in seconds (default 0) |
run_id | Optional[str] | Run ID for correlation |
error | Optional[str] | Error message if failed |
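As an illustration of how these fields might be gathered, the sketch below times a task callable and builds a dict whose keys match the parameter names in the table above. `run_and_time` is a hypothetical helper, and treating "no exception" as success is an assumption made for the example, not the framework's definition.

```python
import time
from typing import Any, Callable, Dict, Optional


def run_and_time(task: Callable[[], float], env_name: str, task_index: int) -> Dict[str, Any]:
    """Run `task`, measure wall-clock duration, and collect completion fields."""
    start = time.perf_counter()
    reward: Optional[float] = None
    error: Optional[str] = None
    try:
        reward = task()
    except Exception as exc:  # record the failure instead of propagating it
        error = str(exc)
    return {
        "env_name": env_name,
        "task_index": task_index,
        "success": error is None,  # assumption: success means no exception
        "reward": reward,
        "duration_seconds": time.perf_counter() - start,
        "error": error,
    }


def failing_task() -> float:
    raise RuntimeError("boom")


ok = run_and_time(lambda: 1.0, "demo-env", 0)
bad = run_and_time(failing_task, "demo-env", 1)
```

Because the keys line up with the signature, a caller could pass the result as `track_batch_task_completed(**ok)`.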
track_command
def track_command(func: Callable) -> Callable
Decorator to track command invocation.
Usage:
@track_command
def cmd_run_task(args):
    ...
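For readers unfamiliar with tracking decorators, here is a minimal sketch of the general pattern. It is not the cua-bench implementation: `track_command_sketch` and the in-memory `recorded` list are stand-ins invented for this example, whereas the real decorator routes events through cua-core.

```python
import functools
from typing import Any, Callable, Dict, List, Tuple

# Stand-in event sink for this sketch only.
recorded: List[Tuple[str, Dict[str, Any]]] = []


def track_command_sketch(func: Callable) -> Callable:
    """Hypothetical decorator: record an event, then run the wrapped command."""

    @functools.wraps(func)  # keep the command's name and docstring
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        recorded.append(("command_invoked", {"command": func.__name__}))
        return func(*args, **kwargs)

    return wrapper


@track_command_sketch
def cmd_run_task(args):
    return f"ran {args}"


result = cmd_run_task("demo")
```

Using `functools.wraps` keeps `cmd_run_task.__name__` intact, which matters if the command name is derived from the function.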
track_command_async
def track_command_async(func: Callable) -> Callable
Async decorator to track command invocation.
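An async variant of a tracking decorator has to await the wrapped coroutine rather than call it directly. The sketch below shows that shape under the same caveats: `track_command_async_sketch` and the `invocations` list are hypothetical names used only here.

```python
import asyncio
import functools
from typing import Any, Callable, List

# Stand-in event sink for this sketch only.
invocations: List[str] = []


def track_command_async_sketch(func: Callable) -> Callable:
    """Hypothetical async decorator: record the call, then await the command."""

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        invocations.append(func.__name__)
        return await func(*args, **kwargs)

    return wrapper


@track_command_async_sketch
async def cmd_interact(args):
    await asyncio.sleep(0)  # placeholder for real async work
    return args


outcome = asyncio.run(cmd_interact({"env": "demo"}))
```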
track_command_invoked
def track_command_invoked(command: str, subcommand: Optional[str] = None, args: Optional[Dict[str, Any]] = None) -> None
Track CLI command invocation.
This is the primary event for understanding feature usage.
Parameters:
| Name | Type | Description |
|---|---|---|
command | str | Main command (e.g., "run", "interact", "trace") |
subcommand | Optional[str] | Optional subcommand (e.g., "task", "dataset", "list") |
args | Optional[Dict[str, Any]] | Optional sanitized arguments (no sensitive data) |
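Since `args` is expected to arrive already sanitized, callers need some filtering step of their own. One possible approach is sketched below. `sanitize_args` and the `SENSITIVE_MARKERS` deny-list are assumptions for illustration and are not part of cua-bench.

```python
from typing import Any, Dict

# Assumed deny-list for this sketch; adjust to the arguments your CLI accepts.
SENSITIVE_MARKERS = ("key", "token", "password", "secret")


def sanitize_args(args: Dict[str, Any]) -> Dict[str, Any]:
    """Drop entries whose name looks sensitive and keep only simple values."""
    clean: Dict[str, Any] = {}
    for name, value in args.items():
        if any(marker in name.lower() for marker in SENSITIVE_MARKERS):
            continue  # never forward credentials to telemetry
        if value is None or isinstance(value, (str, int, float, bool)):
            clean[name] = value
    return clean


safe = sanitize_args({"dataset": "demo", "api_key": "abc123", "max_steps": 50})
```

The filtered dict could then be passed as `track_command_invoked("run", subcommand="task", args=safe)`.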
track_dataset_processing_completed
def track_dataset_processing_completed(processor_mode: str, rows_processed: int, duration_seconds: float, success: bool = True, output_format: Optional[str] = None) -> None
Track dataset processing completion.
Parameters:
| Name | Type | Description |
|---|---|---|
processor_mode | str | Processing mode (aguvis-stage-1, gui-r1, etc.) |
rows_processed | int | Number of rows processed |
duration_seconds | float | Processing duration in seconds |
success | bool | Whether processing succeeded (default True) |
output_format | Optional[str] | Output format (disk, hub, jsonl) |
track_task_evaluation_completed
def track_task_evaluation_completed(env_name: str, task_index: int, result: Any, success: bool, total_steps: int, duration_seconds: float, run_id: Optional[str] = None, agent: Optional[str] = None, model: Optional[str] = None) -> None
Track task evaluation completion.
Parameters:
| Name | Type | Description |
|---|---|---|
env_name | str | Name of the environment/task |
task_index | int | Task variant index |
result | Any | Evaluation result (reward/score) |
success | bool | Whether task was successful |
total_steps | int | Total steps taken |
duration_seconds | float | Total duration in seconds |
run_id | Optional[str] | Run ID for correlation |
agent | Optional[str] | Agent name if used |
model | Optional[str] | Model name if used |
track_task_execution_failed
def track_task_execution_failed(env_name: str, task_index: int, error_type: str, error_message: str, stage: str, run_id: Optional[str] = None) -> None
Track task execution failure.
Parameters:
| Name | Type | Description |
|---|---|---|
env_name | str | Name of the environment/task |
task_index | int | Task variant index |
error_type | str | Exception class name |
error_message | str | Error message (truncated) |
stage | str | Stage where error occurred |
run_id | Optional[str] | Run ID for correlation |
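The `error_type` and `error_message` fields can be derived from a caught exception. The sketch below shows one way to do that. `failure_fields` is a hypothetical helper, and both the 200-character limit and the `"setup"` stage name are assumptions, since the reference does not state the truncation length or the allowed stage values.

```python
from typing import Any, Dict, Optional

MAX_MESSAGE_LENGTH = 200  # assumed limit; the documented value is not specified


def failure_fields(exc: BaseException, stage: str, run_id: Optional[str] = None) -> Dict[str, Any]:
    """Build the error-related keyword arguments from an exception."""
    message = str(exc)
    if len(message) > MAX_MESSAGE_LENGTH:
        message = message[:MAX_MESSAGE_LENGTH]
    return {
        "error_type": type(exc).__name__,  # exception class name
        "error_message": message,
        "stage": stage,
        "run_id": run_id,
    }


try:
    raise TimeoutError("x" * 500)
except TimeoutError as exc:
    fields = failure_fields(exc, stage="setup")
```

A caller could then forward them with `track_task_execution_failed(env_name=..., task_index=..., **fields)`.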
track_task_execution_started
def track_task_execution_started(env_name: str, task_index: int, provider_type: Optional[str] = None, os_type: Optional[str] = None, agent: Optional[str] = None, model: Optional[str] = None, max_steps: Optional[int] = None, execution_mode: str = 'single', run_id: Optional[str] = None) -> None
Track task execution start.
Parameters:
| Name | Type | Description |
|---|---|---|
env_name | str | Name of the environment/task |
task_index | int | Task variant index |
provider_type | Optional[str] | Provider type (simulated, webtop, native, computer) |
os_type | Optional[str] | OS type (linux, windows, android) |
agent | Optional[str] | Agent name if specified |
model | Optional[str] | Model name if specified |
max_steps | Optional[int] | Max steps budget |
execution_mode | str | Execution mode (single, batch, interactive); default 'single' |
run_id | Optional[str] | Run ID for correlation |
track_task_step_executed
def track_task_step_executed(action_type: str, step_count: int, duration_ms: Optional[float] = None, run_id: Optional[str] = None) -> None
Track individual step execution.
Note: This should be sampled to avoid high event volume.
Parameters:
| Name | Type | Description |
|---|---|---|
action_type | str | Type of action (ClickAction, TypeAction, etc.) |
step_count | int | Current step number |
duration_ms | Optional[float] | Step duration in milliseconds |
run_id | Optional[str] | Run ID for correlation |
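The note about sampling leaves the strategy to the caller. A simple deterministic option is to report the first step and then every Nth step, as sketched below. `should_track_step` and the default interval of 10 are assumptions for this example, as is the 1-based step numbering.

```python
def should_track_step(step_count: int, every_n: int = 10) -> bool:
    """Return True for the first step and then for every `every_n`-th step."""
    if every_n <= 1:
        return True  # sampling disabled: track everything
    return step_count == 1 or step_count % every_n == 0


# Steps 1..30 with the default interval.
tracked = [step for step in range(1, 31) if should_track_step(step)]
# tracked == [1, 10, 20, 30]
```

A deterministic rule keeps event counts predictable per run. Random sampling would spread events more evenly but makes runs harder to compare.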
apps
App Registry for cua-bench.
A decorator-based API for registering platform-specific app installers and launchers, designed to make it easy for contributors to add support for new applications.
Example - Defining an app:
# cua_bench/apps/godot.py
from cua_bench.apps import App, install, launch

class Godot(App):
    name = "godot"
    description = "Godot game engine"

    @install("linux")
    async def install_linux(session, *, with_shortcut=True, version="4.2.1"):
        await session.run_command(
            f"cd ~/Desktop && "
            f"wget -q https://github.com/godotengine/godot/releases/download/{version}-stable/Godot_v{version}-stable_linux.x86_64.zip && "
            f"unzip -q Godot_v{version}-stable_linux.x86_64.zip"
        )
        if with_shortcut:
            await session.run_command(
                f"ln -sf ~/Desktop/Godot_v{version}-stable_linux.x86_64 ~/Desktop/Godot"
            )

    @install("windows")
    async def install_windows(session, *, with_shortcut=True, version="4.2.1"):
        await session.run_command(f"choco install godot --version={version} -y")

    @launch("linux", "windows")
    async def launch_editor(session, *, project_path=None):
        cmd = "~/Desktop/Godot" if session.os_type == "linux" else "godot"
        if project_path:
            cmd += f" --editor --path {project_path}"
        await session.run_command(f"{cmd} &")
Example - Using in a task:
@cb.setup_task(split="train")
async def start(task_cfg: cb.Task, session: cb.DesktopSession):
    # Install app (auto-selects platform)
    await session.install_app("godot", with_shortcut=True, version="4.2.1")

    # Launch app
    await session.launch_app("godot", project_path="~/project")
App
Base class for app definitions.
Subclass this and define platform-specific methods using decorators:
class MyApp(App):
    name = "myapp"
    description = "My application"

    @install("linux")
    async def install_linux(session, **kwargs): ...

    @install("windows")
    async def install_windows(session, **kwargs): ...

    @launch("linux", "windows")
    async def launch(session, **kwargs): ...
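To make the decorator mechanism less opaque, the following is a simplified model of how platform-tagged methods could be collected on a subclass. It is a sketch of the general pattern, not the cua-bench source: `AppSketch`, `_mark`, and the local `install`/`launch` functions are stand-ins defined here, platforms are plain strings instead of the `Platform` type, and `get_method` is a classmethod for brevity.

```python
from typing import Callable, Dict, Optional, Tuple


def _mark(method_type: str, *platforms: str) -> Callable:
    """Tag a function with its method type and supported platforms."""
    def decorator(func: Callable) -> Callable:
        func._app_method = (method_type, platforms)  # read by AppSketch below
        return func
    return decorator


def install(*platforms: str) -> Callable:
    return _mark("install", *platforms)


def launch(*platforms: str) -> Callable:
    return _mark("launch", *platforms)


class AppSketch:
    """Hypothetical base class that indexes tagged methods per subclass."""
    name = ""
    _methods: Dict[Tuple[str, str], Callable] = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls._methods = {}
        for attr in vars(cls).values():
            tag = getattr(attr, "_app_method", None)
            if tag:
                method_type, platforms = tag
                for platform in platforms:
                    cls._methods[(method_type, platform)] = attr

    @classmethod
    def get_method(cls, method_type: str, platform: str) -> Optional[Callable]:
        return cls._methods.get((method_type, platform))


class MyApp(AppSketch):
    name = "myapp"

    @install("linux")
    async def install_linux(session, **kwargs): ...

    @launch("linux", "windows")
    async def launch_editor(session, **kwargs): ...
```

With this model, `MyApp.get_method("install", "windows")` returns `None`, mirroring the `Optional[AppMethod]` return type documented for the real accessors.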
Attributes
| Name | Type | Description |
|---|---|---|
name | str | |
description | str |
Methods
App.get_method
def get_method(self, method_type: str, platform: Platform) -> Optional[AppMethod]
Get a method for the given type and platform.
App.get_install
def get_install(self, platform: Platform) -> Optional[AppMethod]
Get the install method for a platform.
App.get_launch
def get_launch(self, platform: Platform) -> Optional[AppMethod]
Get the launch method for a platform.
App.get_uninstall
def get_uninstall(self, platform: Platform) -> Optional[AppMethod]
Get the uninstall method for a platform.
App.supported_platforms
def supported_platforms(self, method_type: str = 'install') -> Set[Platform]
Get platforms supported for a method type.
AppRegistry
Registry access for DesktopSession integration.
This class provides the interface used by DesktopSession to install/launch apps.
Methods
AppRegistry.install_app
async def install_app(session: Any, app_name: str, with_shortcut: bool = True, **kwargs) -> None
Install an app on the session's platform.
Parameters:
| Name | Type | Description |
|---|---|---|
session | Any | DesktopSession instance |
app_name | str | Name of the app to install |
with_shortcut | bool | Whether to create desktop shortcut (default True) |
**kwargs | Any | Additional app-specific arguments |
AppRegistry.launch_app
async def launch_app(session: Any, app_name: str, **kwargs) -> None
Launch an app on the session's platform.
Parameters:
| Name | Type | Description |
|---|---|---|
session | Any | DesktopSession instance |
app_name | str | Name of the app to launch |
**kwargs | Any | App-specific launch arguments |
AppRegistry.uninstall_app
async def uninstall_app(session: Any, app_name: str, **kwargs) -> None
Uninstall an app from the session's platform.
Parameters:
| Name | Type | Description |
|---|---|---|
session | Any | DesktopSession instance |
app_name | str | Name of the app to uninstall |
**kwargs | Any | App-specific arguments |
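The AppRegistry methods select an implementation from the session's platform and forward app-specific keyword arguments. A toy version of that dispatch is sketched below. `FakeSession`, `INSTALLERS`, and `install_app_sketch` are hypothetical names, and raising `ValueError` for an unsupported app or platform is an assumption, since the reference does not document the error behavior.

```python
import asyncio
from typing import Any, Callable, Dict, List, Tuple

# Toy registry keyed by (app name, platform); not the cua-bench data structure.
INSTALLERS: Dict[Tuple[str, str], Callable] = {}


class FakeSession:
    """Stand-in for DesktopSession that only records commands."""
    os_type = "linux"

    def __init__(self) -> None:
        self.commands: List[str] = []

    async def run_command(self, cmd: str) -> None:
        self.commands.append(cmd)


async def install_demo_linux(session, *, with_shortcut=True, version="1.0"):
    await session.run_command(f"install demo {version}")
    if with_shortcut:
        await session.run_command("create shortcut")


INSTALLERS[("demo", "linux")] = install_demo_linux


async def install_app_sketch(session: Any, app_name: str, with_shortcut: bool = True, **kwargs) -> None:
    """Pick the installer for the session's platform and forward kwargs."""
    installer = INSTALLERS.get((app_name, session.os_type))
    if installer is None:
        # Assumed error type for this sketch.
        raise ValueError(f"no installer for {app_name!r} on {session.os_type!r}")
    await installer(session, with_shortcut=with_shortcut, **kwargs)


session = FakeSession()
asyncio.run(install_app_sketch(session, "demo", version="2.0"))
```

Forwarding `**kwargs` unchanged is what lets each app define its own options, such as `version`, without the registry knowing about them.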
get_app
def get_app(name: str) -> Optional[App]
Get a registered app by name.
list_apps
def list_apps() -> List[str]
List all registered app names.