Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ data/*
FastSAM-x.pt
yolo11n.pt

# MuJoCo dumps a GL-warning log in the CWD at runtime.
MUJOCO_LOG.TXT

/thread_monitor_report.csv

# symlink one of .envrc.* if you'd like to use
Expand Down
4 changes: 2 additions & 2 deletions data/.lfs/command_center.html.tar.gz
Git LFS file not shown
4 changes: 2 additions & 2 deletions data/.lfs/mujoco_sim.tar.gz
Git LFS file not shown
9 changes: 9 additions & 0 deletions dimos/control/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class HardwareComponent:
address: Connection address - IP for TCP, port for CAN
auto_enable: Whether to auto-enable servos
gripper_joints: Joints that use adapter gripper methods (separate from joints).
domain_id: DDS domain ID for adapters that use DDS transport
(e.g. Unitree G1). Real robot uses 0; unitree_mujoco sim
defaults to 1. Ignored by non-DDS adapters.
"""

hardware_id: HardwareId
Expand All @@ -59,6 +62,12 @@ class HardwareComponent:
address: str | None = None
auto_enable: bool = True
gripper_joints: list[JointName] = field(default_factory=list)
domain_id: int = 0
# Per-joint PD gains used by ConnectedWholeBody when translating
# position commands to MotorCommand. None → adapter/component
# defaults. Must match `joints` length when set.
kp: list[float] | None = None
kd: list[float] | None = None
Comment on lines +65 to +70
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all hardware components will need this value.

Let's flag this issue and add a more structural fix to this. I would like to have ConnectedWholeBody specific things perhaps to be part of the WholeBodyAdapter spec.


@property
def all_joints(self) -> list[JointName]:
Expand Down
192 changes: 180 additions & 12 deletions dimos/control/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from dimos.msgs.sensor_msgs import (
JointState,
)
from dimos.msgs.std_msgs.Bool import Bool
from dimos.teleop.quest.quest_types import (
Buttons,
)
Expand All @@ -83,29 +84,54 @@ class TaskConfig:

Attributes:
name: Task name (e.g., "traj_arm")
type: Task type ("trajectory", "servo", "velocity", "cartesian_ik", "teleop_ik")
type: Task type ("trajectory", "servo", "velocity", "cartesian_ik", "teleop_ik", "groot_wbc")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need a task based registry too now. Just flagging for reference

joint_names: List of joint names this task controls
priority: Task priority (higher wins arbitration)
model_path: Path to URDF/MJCF for IK solver (cartesian_ik/teleop_ik only)
model_path: Path to URDF/MJCF for IK solver (cartesian_ik/teleop_ik)
or directory containing balance.onnx/walk.onnx (groot_wbc).
ee_joint_id: End-effector joint ID in model (cartesian_ik/teleop_ik only)
hand: "left" or "right" controller hand (teleop_ik only)
gripper_joint: Joint name for gripper virtual joint
gripper_open_pos: Gripper position at trigger 0.0
gripper_closed_pos: Gripper position at trigger 1.0
hardware_id: Hardware id this task reads extra state from
(required by groot_wbc — pulls the WholeBodyAdapter for IMU
and the full joint list for observation assembly).
"""

name: str
type: str = "trajectory"
joint_names: list[str] = field(default_factory=lambda: [])
priority: int = 10
# Cartesian IK / Teleop IK specific
# Cartesian IK / Teleop IK / GR00T WBC specific
model_path: str | Path | None = None
ee_joint_id: int = 6
hand: Literal["left", "right"] | None = None # teleop_ik only
# Teleop IK gripper specific
gripper_joint: str | None = None
gripper_open_pos: float = 0.0
gripper_closed_pos: float = 0.0
# Tasks that need a hardware reference (e.g. groot_wbc for IMU + 29-DOF state)
hardware_id: str | None = None
# Servo task: optional initial target held until/unless a new one arrives.
default_positions: list[float] | None = None
# Call ``task.start()`` right after registration so the task is live
# from the first tick (e.g. GR00T balance/walk needs to drive joints
# immediately). Default False keeps the existing convention where
# tasks wait for an explicit activation (e.g. from teleop).
auto_start: bool = False
# Arm the task's policy automatically on ``start()`` (applies to
# tasks exposing ``arm()``, e.g. ``GrootWBCTask``). Simulation
# blueprints set this True; real-hardware blueprints leave it False
# so the operator arms via dashboard button after settling.
auto_arm: bool = False
# Start the task in dry-run mode (policy computes but output is
# suppressed). For real-hardware safety checks.
auto_dry_run: bool = False
# Ramp duration (seconds) used by ``arm()`` when called without an
# explicit argument — applies to tasks that interpolate from the
# current pose toward a default on arming.
default_ramp_seconds: float = 10.0


@dataclass
Expand Down Expand Up @@ -178,6 +204,17 @@ class ControlCoordinator(Module[ControlCoordinatorConfig]):
# Input: Teleop buttons for engage/disengage signaling
buttons: In[Buttons]

# Input: Arm/disarm velocity-policy tasks (e.g. GrootWBCTask). True
# → task.arm(); False → task.disarm(). Routed to every task that
# duck-types an ``arm`` method (and ``disarm`` for False).
activate: In[Bool]

# Input: Toggle dry-run on velocity-policy tasks. In dry-run the
# policy keeps computing but the coordinator forwards no command to
# the adapter — operators use this to sanity-check commands on real
# hardware before committing motor torques.
dry_run: In[Bool]

config: ControlCoordinatorConfig
default_config = ControlCoordinatorConfig

Expand All @@ -203,6 +240,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self._cartesian_command_unsub: Callable[[], None] | None = None
self._twist_command_unsub: Callable[[], None] | None = None
self._buttons_unsub: Callable[[], None] | None = None
self._activate_unsub: Callable[[], None] | None = None
self._dry_run_unsub: Callable[[], None] | None = None

logger.info(f"ControlCoordinator initialized at {self.config.tick_rate}Hz")

Expand All @@ -222,6 +261,10 @@ def _setup_from_config(self) -> None:
for task_cfg in self.config.tasks:
task = self._create_task_from_config(task_cfg)
self.add_task(task)
if task_cfg.auto_start:
start = getattr(task, "start", None)
if callable(start):
start()

except Exception:
# Rollback: clean up all successfully added hardware
Expand Down Expand Up @@ -288,6 +331,7 @@ def _create_whole_body_adapter(self, component: HardwareComponent) -> object:
return whole_body_adapter_registry.create(
component.adapter_type,
network_interface=addr if addr is not None else 0,
domain_id=component.domain_id,
)

def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask:
Expand All @@ -308,12 +352,18 @@ def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask:
elif task_type == "servo":
from dimos.control.tasks import JointServoTask, JointServoTaskConfig

servo_cfg_kwargs: dict[str, object] = {
"joint_names": cfg.joint_names,
"priority": cfg.priority,
}
if cfg.default_positions is not None:
servo_cfg_kwargs["default_positions"] = cfg.default_positions
# Zero timeout pairs naturally with default-hold — otherwise
# the task times out even though it's holding a valid target.
servo_cfg_kwargs["timeout"] = 0.0
return JointServoTask(
cfg.name,
JointServoTaskConfig(
joint_names=cfg.joint_names,
priority=cfg.priority,
),
JointServoTaskConfig(**servo_cfg_kwargs), # type: ignore[arg-type]
)

elif task_type == "velocity":
Expand Down Expand Up @@ -363,6 +413,43 @@ def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask:
),
)

elif task_type == "groot_wbc":
from dimos.control.tasks.groot_wbc_task import (
GrootWBCTask,
GrootWBCTaskConfig,
)

if cfg.model_path is None:
raise ValueError(
f"GrootWBCTask '{cfg.name}' requires model_path "
f"(directory containing balance.onnx + walk.onnx)"
)
if cfg.hardware_id is None:
raise ValueError(f"GrootWBCTask '{cfg.name}' requires hardware_id in TaskConfig")
hw = self._hardware.get(cfg.hardware_id)
if hw is None:
raise ValueError(
f"GrootWBCTask '{cfg.name}' references unknown hardware "
f"'{cfg.hardware_id}'. List the hardware before the task "
f"in the blueprint config."
)

model_dir = Path(cfg.model_path)
return GrootWBCTask(
cfg.name,
GrootWBCTaskConfig(
balance_onnx=model_dir / "balance.onnx",
walk_onnx=model_dir / "walk.onnx",
joint_names=cfg.joint_names,
all_joint_names=hw.joint_names,
priority=cfg.priority,
auto_arm=cfg.auto_arm,
auto_dry_run=cfg.auto_dry_run,
default_ramp_seconds=cfg.default_ramp_seconds,
),
adapter=hw.adapter,
)

else:
raise ValueError(f"Unknown task type: {task_type}")

Expand Down Expand Up @@ -612,12 +699,52 @@ def _on_twist_command(self, msg: Twist) -> None:
joint_state = JointState(name=names, velocity=velocities)
self._on_joint_command(joint_state)

# Also route to tasks that accept a (vx, vy, yaw_rate) command —
# e.g. locomotion policies like GrootWBCTask. Duck-typed: any
# task exposing set_velocity_command opts in.
t_now = time.perf_counter()
with self._task_lock:
for task in self._tasks.values():
set_vel = getattr(task, "set_velocity_command", None)
if set_vel is not None:
set_vel(msg.linear.x, msg.linear.y, msg.angular.z, t_now)

def _on_buttons(self, msg: Buttons) -> None:
"""Forward button state to all tasks."""
with self._task_lock:
for task in self._tasks.values():
task.on_buttons(msg)

def _on_activate(self, msg: Bool) -> None:
"""Arm/disarm every task exposing ``arm()`` / ``disarm()``.

Duck-typed to match the ``set_velocity_command`` convention used
by ``_on_twist_command``. The blueprint wires this input to a
dashboard button; operators can also drive it directly via LCM.
"""
engage = bool(msg.data)
with self._task_lock:
for task in self._tasks.values():
method_name = "arm" if engage else "disarm"
handler = getattr(task, method_name, None)
if callable(handler):
try:
handler()
except Exception:
logger.exception(f"{method_name}() raised on task {task.name!r}")

def _on_dry_run(self, msg: Bool) -> None:
"""Forward dry-run toggle to every task exposing ``set_dry_run``."""
enabled = bool(msg.data)
with self._task_lock:
for task in self._tasks.values():
handler = getattr(task, "set_dry_run", None)
if callable(handler):
try:
handler(enabled)
except Exception:
logger.exception(f"set_dry_run() raised on task {task.name!r}")

@rpc
def task_invoke(
self, task_name: TaskName, method: str, kwargs: dict[str, Any] | None = None
Expand Down Expand Up @@ -737,16 +864,25 @@ def start(self) -> None:
"Use task_invoke RPC or set transport via blueprint."
)

# Subscribe to twist commands if any twist base hardware configured
# Subscribe to twist commands if any twist base hardware is configured
# OR if any task accepts velocity commands (locomotion policies like
# GrootWBCTask duck-type with set_velocity_command). Without the
# latter check, a whole-body locomotion blueprint with no BASE
# hardware silently drops every Twist on /cmd_vel.
has_twist_base = any(c.hardware_type == HardwareType.BASE for c in self.config.hardware)
if has_twist_base:
with self._task_lock:
has_velocity_task = any(
callable(getattr(task, "set_velocity_command", None))
for task in self._tasks.values()
)
if has_twist_base or has_velocity_task:
try:
self._twist_command_unsub = self.twist_command.subscribe(self._on_twist_command)
logger.info("Subscribed to twist_command for twist base control")
logger.info("Subscribed to twist_command for twist base / velocity-capable tasks")
except Exception:
logger.warning(
"Twist base configured but could not subscribe to twist_command. "
"Use task_invoke RPC or set transport via blueprint."
"Twist base or velocity-capable task configured but could not subscribe "
"to twist_command. Use task_invoke RPC or set transport via blueprint."
)

# Subscribe to buttons if any teleop_ik tasks configured (engage/disengage)
Expand All @@ -755,6 +891,32 @@ def start(self) -> None:
self._buttons_unsub = self.buttons.subscribe(self._on_buttons)
logger.info("Subscribed to buttons for engage/disengage")

# Subscribe to activate / dry_run if any task exposes arm() / set_dry_run()
# (duck-typed, same convention as twist_command / set_velocity_command).
with self._task_lock:
has_arm = any(callable(getattr(t, "arm", None)) for t in self._tasks.values())
has_dry_run = any(
callable(getattr(t, "set_dry_run", None)) for t in self._tasks.values()
)
if has_arm:
try:
self._activate_unsub = self.activate.subscribe(self._on_activate)
logger.info("Subscribed to activate for arm()/disarm() routing")
except Exception:
logger.warning(
"Arm-capable task configured but could not subscribe to activate. "
"Use task_invoke RPC or set transport via blueprint."
)
if has_dry_run:
try:
self._dry_run_unsub = self.dry_run.subscribe(self._on_dry_run)
logger.info("Subscribed to dry_run for dry-run routing")
except Exception:
logger.warning(
"Dry-run-capable task configured but could not subscribe to dry_run. "
"Use task_invoke RPC or set transport via blueprint."
)

logger.info(f"ControlCoordinator started at {self.config.tick_rate}Hz")

@rpc
Expand All @@ -772,6 +934,12 @@ def stop(self) -> None:
if self._twist_command_unsub:
self._twist_command_unsub()
self._twist_command_unsub = None
if self._activate_unsub:
self._activate_unsub()
self._activate_unsub = None
if self._dry_run_unsub:
self._dry_run_unsub()
self._dry_run_unsub = None
if self._buttons_unsub:
self._buttons_unsub()
self._buttons_unsub = None
Expand Down
Loading
Loading