Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dandelion_commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ pub enum DandelionError {
PromiseDroppedDebt,
/// there was a non recoverable issue when spawning or running the MMU worker
MmuWorkerError,
/// invalid WorkToDos
InvalidWorkToDo,
// system engine errors
/// The arguments in the context handed to the system function are malformed or otherwise insufissient
/// the string identifies the argument that was malformed or gives other information about the issue
Expand All @@ -82,6 +84,8 @@ pub enum DandelionError {
SystemFuncResponseError,
/// Tried to call parser for system function
CalledSystemFuncParser,
/// Can't post or subscribe data from other Dandelion server
PostDataError(String),
// dispatcher errors
/// dispatcher does not find a loader for this engine type
DispatcherMissingLoader(String),
Expand Down
8 changes: 8 additions & 0 deletions dandelion_commons/src/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ pub enum RecordPoint {
EngineEnd,
/// Return from execution engine
FutureReturn,
/// queue to post data
PostDataQueue,
/// start post data
PostDataStart,
/// end post data
PostDataEnd,
/// return post data
PostDataReturn,
/// Send response back to the client
EndService,
}
Expand Down
33 changes: 31 additions & 2 deletions dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use futures::{
use itertools::Itertools;
use log::trace;
use machine_interface::{
function_driver::{Driver, FunctionConfig, WorkToDo},
function_driver::{Driver, FunctionConfig, ReqwestWorkToDo, WorkToDo, WorkDone},
machine_config::{
get_available_domains, get_available_drivers, get_compatibilty_table, DomainType,
EngineType,
},
memory_domain::{Context, MemoryDomain},
memory_domain::{Context, MemoryDomain}, DataSet,
};
use std::{
collections::{BTreeMap, BTreeSet},
Expand Down Expand Up @@ -97,6 +97,35 @@ impl Dispatcher {
.await;
}

pub async fn post_data(
&self,
id: String,
content: DataSet,
binary: Vec<u8>,
mut recorder: Recorder,
) -> DandelionResult<()> {
let engine_queue = match self.engine_queues.get(&EngineType::Reqwest) {
Some(q) => q,
None => return Err(DandelionError::DispatcherConfigError),
};
let subrecoder = recorder.get_sub_recorder()?;
let args = WorkToDo::Reqwest {
work: ReqwestWorkToDo::PostData { id, content, binary },
recorder: subrecoder,
};
recorder.record(RecordPoint::PostDataQueue)?;
let result = engine_queue.enqueu_work(args).await?;
recorder.record(RecordPoint::PostDataReturn)?;
match result {
WorkDone::PostData => {
return Ok(())
}
_ => {
return Err(DandelionError::PostDataError(String::from("should have WorkDone::PostData")))
}
}
}

pub async fn queue_function_by_name(
&self,
function_name: String,
Expand Down
2 changes: 1 addition & 1 deletion dispatcher/src/function_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl FunctionRegistry {
// get the config from the parser
let function_config = driver
.parse_function(
String::from(""),
system_function.to_string(),
*domains.get(type_map.get(engine_type).unwrap()).unwrap(),
)
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions machine_interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ http = { version = "1.1", optional = true }
bytes = { version = "1.6", optional = true}
libloading = { version = "0.8.1" }
log = "0.4.20"
bson = "2.9.0"
bincode = "1.3.3"

# disable benchmarks in library, to not run all unit tests on every benchmark
# also needs to be disabled for criterion flags to work that are not available for tests
Expand Down
34 changes: 32 additions & 2 deletions machine_interface/src/function_driver.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{
memory_domain::{Context, MemoryDomain},
DataRequirementList, Position,
memory_domain::{Context, MemoryDomain}, DataRequirementList, DataSet, Position
};
extern crate alloc;
use alloc::sync::Arc;
Expand Down Expand Up @@ -29,16 +28,34 @@ pub struct ElfConfig {
#[derive(Clone, Copy)]
pub enum SystemFunction {
HTTP,
SEND,
RECV
}

impl core::fmt::Display for SystemFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result {
return match self {
SystemFunction::HTTP => write!(f, "HTTP"),
SystemFunction::SEND => write!(f, "SEND"),
SystemFunction::RECV => write!(f, "RECV")
};
}
}

impl core::str::FromStr for SystemFunction {

type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"HTTP" => Ok(SystemFunction::HTTP),
"SEND" => Ok(SystemFunction::SEND),
"RECV" => Ok(SystemFunction::RECV),
_ => Err(format!("'{}' is not a valid SystemFunction", s)),
}
}
}

#[derive(Clone)]
pub struct WasmConfig {
#[cfg(feature = "wasm")]
Expand Down Expand Up @@ -91,6 +108,14 @@ pub enum ComputeResource {
GPU(u8),
}

pub enum ReqwestWorkToDo {
PostData{
id: String,
content: DataSet,
binary: Vec<u8>
}
}

pub enum WorkToDo {
FunctionArguments {
config: FunctionConfig,
Expand All @@ -115,13 +140,18 @@ pub enum WorkToDo {
static_domain: &'static dyn MemoryDomain,
recorder: Recorder,
},
Reqwest {
work: ReqwestWorkToDo,
recorder: Recorder,
},
Shutdown(),
}

pub enum WorkDone {
Context(Context),
Function(Function),
Resources(Vec<ComputeResource>),
PostData,
}

impl WorkDone {
Expand Down
29 changes: 23 additions & 6 deletions machine_interface/src/function_driver/system_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ use crate::function_driver::SystemFunction;
#[cfg(feature = "reqwest_io")]
pub mod reqwest;

#[cfg(feature = "reqwest_io")]
pub mod http;

#[cfg(feature = "reqwest_io")]
pub mod distributed;

#[cfg(feature = "reqwest_io")]
pub mod context_util;

/// HTTP function currently expects one set with requests formated by HTTP standard (in text).
/// This means one line with the reqest method, a space, request url, another space and the protocol version
/// ex.: "PUT /images/logo.png HTTP/1.1"
Expand All @@ -20,22 +29,30 @@ const HTTP_INPUT_SETS: [&str; 1] = ["request"];
/// Additionally there is a set that only contains the bodies with the same item names as the requests.
const HTTP_OUTPUT_SETS: [&str; 2] = ["response", "body"];

const SEND_INPUT_SETS: [&str; 2] = ["send_meta", "send_data"];

const SEND_OUTPUT_SETS: [&str; 0] = [];

const RECV_INPUT_SETS: [&str; 1] = ["recv_meta"];

const RECV_OUTPUT_SETS: [&str; 1] = ["recv_data"];

/// Provides the input set names for a given system function
pub fn get_system_function_input_sets(function: SystemFunction) -> Vec<String> {
return match function {
SystemFunction::HTTP => HTTP_INPUT_SETS,
SystemFunction::HTTP => HTTP_INPUT_SETS.iter().map(|&name| name.to_string()).collect(),
SystemFunction::SEND => SEND_INPUT_SETS.iter().map(|&name| name.to_string()).collect(),
SystemFunction::RECV => RECV_INPUT_SETS.iter().map(|&name| name.to_string()).collect()
}
.map(|name| name.to_string())
.to_vec();
}

/// Provies the output set names for a given system function
pub fn get_system_function_output_sets(function: SystemFunction) -> Vec<String> {
return match function {
SystemFunction::HTTP => &HTTP_OUTPUT_SETS,
SystemFunction::HTTP => HTTP_OUTPUT_SETS.iter().map(|&name| name.to_string()).collect(),
SystemFunction::SEND => SEND_OUTPUT_SETS.iter().map(|&name| name.to_string()).collect(),
SystemFunction::RECV => RECV_OUTPUT_SETS.iter().map(|&name| name.to_string()).collect()
}
.map(|name| name.to_string())
.to_vec();
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use dandelion_commons::{DandelionError, DandelionResult};

use crate::{memory_domain::{Context, ContextTrait}, DataSet, DataItem};

pub fn get_dataset_by_ident(context: &Context, ident: String) -> DandelionResult<&DataSet> {
return match context.content.iter().find(|set_option| {
if let Some(set) = set_option {
return set.ident == ident;
} else {
return false;
}
}) {
Some(Some(set)) => Ok(set),
_ => {
Err(DandelionError::MalformedSystemFuncArg(format!("No dataset {ident}")))
}
};
}

pub fn get_dataitem_by_ident(dataset: &DataSet, ident: String) -> DandelionResult<&DataItem> {
return match dataset.buffers.iter().find(|dataitem| {
return dataitem.ident == ident;
}) {
Some(dataitem) => Ok(dataitem),
_ => {
Err(DandelionError::MalformedSystemFuncArg(format!("No dataitem {ident}")))
}
};
}

pub fn read_dataitem_content(context: &Context, dataitem: &DataItem) -> DandelionResult<Vec<u8>> {
let mut request_buffer = Vec::with_capacity(dataitem.data.size);
request_buffer.resize(dataitem.data.size, 0);
context.read(dataitem.data.offset, &mut request_buffer)?;
return Ok(request_buffer);
}
Loading