Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions core/src/agent/loop_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use crate::tools::filesystem::{EditFileTool, ListDirTool, ReadFileTool, WriteFil
use crate::tools::shell::ExecTool;
use crate::tools::web::{WebFetchTool, WebSearchTool};
use crate::tools::{MessageTool, SpawnTool, ToolRegistry, ToolRegistryExecutor};
use crate::workspace::SharedWorkspace;
use crate::workspace::types::{AgentId, TaskStatus};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{error, info};
Expand Down Expand Up @@ -61,6 +63,8 @@ pub struct AgentLoop {
running: Arc<RwLock<bool>>,
/// Task orchestrator for subagent spawning
task_orchestrator: Arc<TaskOrchestrator>,
/// Shared workspace for cross-agent coordination
workspace: Arc<SharedWorkspace>,
/// Max tool iterations
max_iterations: usize,
/// Default model
Expand All @@ -81,7 +85,6 @@ impl AgentLoop {
bus: MessageBus,
sessions: Arc<SessionManager>,
) -> Result<Self> {
let workspace = config.workspace_path();
let max_iterations = config.agents.defaults.max_tool_iterations;
let default_model = config.agents.defaults.model.clone();
let temperature = Some(config.agents.defaults.temperature as f32);
Expand All @@ -90,10 +93,12 @@ impl AgentLoop {

let context = ContextBuilder::new(config);
let tools = Arc::new(RwLock::new(ToolRegistry::new()));
let workspace_path = config.workspace_path();
let workspace = Arc::new(SharedWorkspace::open(&workspace_path).await?);

// Register default tools
let mut tools_guard = tools.write().await;
Self::register_default_tools(&mut tools_guard, &workspace, brave_api_key, bus.clone());
Self::register_default_tools(&mut tools_guard, &workspace_path, brave_api_key, bus.clone());
drop(tools_guard);

// Create mofa TaskOrchestrator for subagent spawning
Expand All @@ -108,6 +113,7 @@ impl AgentLoop {
context,
running: Arc::new(RwLock::new(false)),
task_orchestrator,
workspace,
max_iterations,
default_model,
temperature,
Expand All @@ -132,6 +138,7 @@ impl AgentLoop {
let temperature = Some(config.agents.defaults.temperature as f32);
let max_tokens = Some(config.agents.defaults.max_tokens as u32);
let context = ContextBuilder::new(config);
let workspace = Arc::new(SharedWorkspace::open(config.workspace_path()).await?);

// Create mofa TaskOrchestrator for subagent spawning
let task_orchestrator = Arc::new(TaskOrchestrator::with_defaults(provider.clone()));
Expand All @@ -145,6 +152,7 @@ impl AgentLoop {
context,
running: Arc::new(RwLock::new(false)),
task_orchestrator,
workspace,
max_iterations,
default_model,
temperature,
Expand Down Expand Up @@ -389,27 +397,56 @@ impl AgentLoop {

// Create task origin for routing results
let origin = TaskOrigin::from_channel(origin_channel, origin_chat_id);
let workspace_agent = AgentId::from(format!("subagent:{origin_channel}:{origin_chat_id}"));
let task_record = self
.workspace
.add_task_with_status(
workspace_agent.clone(),
prompt.to_string(),
TaskStatus::InProgress,
)
.await?;

// Spawn using mofa's TaskOrchestrator
let task_id = self
.task_orchestrator
.spawn(prompt, origin)
.await
.map_err(|e| crate::error::AgentError::ProviderError(e.to_string()))?;
.map_err(|e| crate::error::AgentError::ProviderError(e.to_string()));
let task_id = match task_id {
Ok(task_id) => task_id,
Err(err) => {
let _ = self
.workspace
.update_task_status(task_record.id, TaskStatus::Failed)
.await;
return Err(err.into());
}
};

// Subscribe to results and forward to message bus
let mut result_rx = self.task_orchestrator.subscribe_results();
let bus = self.bus.clone();
let workspace = self.workspace.clone();
let task_id_clone = task_id.clone();
let label_clone = display_label.clone();
let prompt_clone = prompt.to_string();
let origin_channel_clone = origin_channel.to_string();
let origin_chat_id_clone = origin_chat_id.to_string();
let task_record_id = task_record.id;

tokio::spawn(async move {
// Wait for this task's result
while let Ok(result) = result_rx.recv().await {
if result.task_id == task_id_clone {
let status = if result.success {
TaskStatus::Completed
} else {
TaskStatus::Failed
};
if let Err(err) = workspace.update_task_status(task_record_id, status).await {
error!("Failed to update shared workspace task state: {}", err);
}
Self::announce_subagent_result(
&bus,
&label_clone,
Expand Down Expand Up @@ -511,6 +548,11 @@ Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not men
pub fn tools(&self) -> &Arc<RwLock<ToolRegistry>> {
&self.tools
}

/// Get the shared workspace instance used by the agent runtime.
pub fn workspace(&self) -> &Arc<SharedWorkspace> {
&self.workspace
}
}

/// Implement the spawn tool's SubagentManager trait directly on AgentLoop
Expand Down
38 changes: 38 additions & 0 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::path::PathBuf;
use thiserror::Error;
use uuid::Uuid;

/// Result type for Mofaclaw operations
pub type Result<T> = std::result::Result<T, MofaclawError>;
Expand Down Expand Up @@ -33,6 +34,10 @@ pub enum MofaclawError {
#[error("Agent error: {0}")]
Agent(#[from] AgentError),

/// Workspace errors
#[error("Workspace error: {0}")]
Workspace(#[from] WorkspaceError),

/// IO errors
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
Expand Down Expand Up @@ -183,6 +188,39 @@ pub enum AgentError {
Cron(String),
}

/// Workspace-related errors
#[derive(Error, Debug)]
pub enum WorkspaceError {
#[error("Artifact not found: {0}")]
ArtifactNotFound(Uuid),

#[error("Artifact {artifact_id} version conflict: expected v{expected}, found v{actual}")]
VersionConflict {
artifact_id: Uuid,
expected: u32,
actual: u32,
},

#[error("Artifact {artifact_id} is locked by agent {held_by}")]
ArtifactLocked {
artifact_id: Uuid,
held_by: crate::workspace::types::AgentId,
},

#[error("Lock on artifact {artifact_id} is held by {held_by}, not {requester}")]
LockNotOwned {
artifact_id: Uuid,
held_by: crate::workspace::types::AgentId,
requester: crate::workspace::types::AgentId,
},

#[error("Version {1} not found for artifact {0}")]
VersionNotFound(Uuid, u32),

#[error("Workspace resource is busy: {0}")]
Busy(String),
}

impl From<anyhow::Error> for MofaclawError {
fn from(err: anyhow::Error) -> Self {
MofaclawError::Other(err.to_string())
Expand Down
2 changes: 2 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod rbac;
pub mod session;
pub mod tools;
pub mod types;
pub mod workspace;

// Re-exports for convenience
pub use agent::{AgentLoop, ContextBuilder, SubagentManager};
Expand All @@ -43,3 +44,4 @@ pub use session::{
};
pub use tools::ToolRegistry;
pub use types::*;
pub use workspace::SharedWorkspace;
Loading