Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions source/extensions/dynamic_modules/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ go_library(
srcs = [
"sdk/go/shared/api.go",
"sdk/go/shared/base.go",
"sdk/go/shared/network_api.go",
"sdk/go/shared/network_base.go",
],
cgo = True,
importpath = "github.com/envoyproxy/envoy/source/extensions/dynamic_modules/sdk/go/shared",
Expand All @@ -75,6 +77,7 @@ go_library(
name = "go_sdk_abi",
srcs = [
"sdk/go/abi/internal.go",
"sdk/go/abi/network.go",
"//source/extensions/dynamic_modules/abi:abi.h",
],
cgo = True,
Expand Down
16 changes: 13 additions & 3 deletions source/extensions/dynamic_modules/sdk/cpp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@ licenses(["notice"]) # Apache 2
# All necessary abstractions for creating plugins.
cc_library(
name = "sdk",
srcs = ["sdk.cc"],
hdrs = ["sdk.h"],
srcs = [
"sdk.cc",
"sdk_network.cc",
],
hdrs = [
"sdk.h",
"sdk_network.h",
],
visibility = ["//visibility:public"],
alwayslink = True,
)
Expand All @@ -19,7 +25,11 @@ cc_library(
# the plugins lib.
cc_library(
name = "sdk_abi",
srcs = ["sdk_internal.cc"],
srcs = [
"sdk_internal.cc",
"sdk_internal_common.h",
Comment on lines +28 to +30
"sdk_internal_network.cc",
],
Comment on lines +30 to +32
visibility = ["//visibility:public"],
deps = [
":sdk",
Expand Down
77 changes: 8 additions & 69 deletions source/extensions/dynamic_modules/sdk/cpp/sdk_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "source/extensions/dynamic_modules/abi/abi.h"

#include "sdk.h"
#include "sdk_internal_common.h"

namespace Envoy {
namespace DynamicModules {
Expand Down Expand Up @@ -146,75 +147,13 @@ using ResponseHeaders = HeaderMapImpl<envoy_dynamic_module_type_http_header_type
using ResponseTrailers = HeaderMapImpl<envoy_dynamic_module_type_http_header_type_ResponseTrailer>;

// Scheduler implementation
template <bool IsConfigScheduler> class SchedulerImplBase : public Scheduler {
public:
SchedulerImplBase(void* host_ptr) : scheduler_ptr_(newScheduler(host_ptr)) {}

void schedule(std::function<void()> func) override {
uint64_t task_id = 0;

// Lock to protect access to tasks_ and next_task_id_ manually
{
std::lock_guard<std::mutex> lock(mutex_);
task_id = next_task_id_++;
tasks_[task_id] = std::move(func);
}

commitScheduler(scheduler_ptr_, task_id);
}

void onScheduled(uint64_t task_id) {
std::function<void()> func;

{
// Lock to protect access to tasks_ manually
std::lock_guard<std::mutex> lock(mutex_);
auto it = tasks_.find(task_id);
if (it != tasks_.end()) {
func = std::move(it->second);
tasks_.erase(it);
}
}

if (func) {
func();
}
}

~SchedulerImplBase() override { deleteScheduler(scheduler_ptr_); }

private:
static void* newScheduler(void* host_ptr) {
if constexpr (IsConfigScheduler) {
return envoy_dynamic_module_callback_http_filter_config_scheduler_new(host_ptr);
} else {
return envoy_dynamic_module_callback_http_filter_scheduler_new(host_ptr);
}
}
static void deleteScheduler(void* scheduler_ptr) {
if constexpr (IsConfigScheduler) {
envoy_dynamic_module_callback_http_filter_config_scheduler_delete(scheduler_ptr);
} else {
envoy_dynamic_module_callback_http_filter_scheduler_delete(scheduler_ptr);
}
}
static void commitScheduler(void* scheduler_ptr, uint64_t task_id) {
if constexpr (IsConfigScheduler) {
envoy_dynamic_module_callback_http_filter_config_scheduler_commit(scheduler_ptr, task_id);
} else {
envoy_dynamic_module_callback_http_filter_scheduler_commit(scheduler_ptr, task_id);
}
}

void* scheduler_ptr_{};

std::mutex mutex_;
uint64_t next_task_id_{1}; // 0 is reserved.
std::map<uint64_t, std::function<void()>> tasks_;
};

using SchedulerImpl = SchedulerImplBase<false>;
using ConfigSchedulerImpl = SchedulerImplBase<true>;
using SchedulerImpl = SchedulerImplBase<envoy_dynamic_module_callback_http_filter_scheduler_new,
envoy_dynamic_module_callback_http_filter_scheduler_commit,
envoy_dynamic_module_callback_http_filter_scheduler_delete>;
using ConfigSchedulerImpl =
SchedulerImplBase<envoy_dynamic_module_callback_http_filter_config_scheduler_new,
envoy_dynamic_module_callback_http_filter_config_scheduler_commit,
envoy_dynamic_module_callback_http_filter_config_scheduler_delete>;

std::optional<std::string_view> bufferViewToOptionalStringView(const BufferView& value,
bool found) {
Expand Down
64 changes: 64 additions & 0 deletions source/extensions/dynamic_modules/sdk/cpp/sdk_internal_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#pragma once

#include <cstdint>
#include <functional>
#include <map>
#include <mutex>

#include "sdk.h"

namespace Envoy {
namespace DynamicModules {

/**
* Generic Scheduler implementation backed by a host-managed event dispatcher.
*
* All host-side scheduler pointer types are typedef void*, so the three ABI functions are
* passed as non-type template parameters and called with plain void* arguments.
*
* @tparam NewFn ABI function that allocates the native scheduler: void*(void* host_ptr)
* @tparam CommitFn ABI function that enqueues a task_id on the worker thread:
* void(void* scheduler_ptr, uint64_t task_id)
* @tparam DeleteFn ABI function that destroys the native scheduler: void(void* scheduler_ptr)
*/
template <void* (*NewFn)(void*), void (*CommitFn)(void*, uint64_t), void (*DeleteFn)(void*)>
class SchedulerImplBase : public Scheduler {
public:
explicit SchedulerImplBase(void* host_ptr) : scheduler_ptr_(NewFn(host_ptr)) {}

void schedule(std::function<void()> func) override {
uint64_t task_id = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
task_id = next_task_id_++;
tasks_[task_id] = std::move(func);
}
CommitFn(scheduler_ptr_, task_id);
}

void onScheduled(uint64_t task_id) {
std::function<void()> func;
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = tasks_.find(task_id);
if (it != tasks_.end()) {
func = std::move(it->second);
tasks_.erase(it);
}
}
if (func) {
func();
}
}

~SchedulerImplBase() override { DeleteFn(scheduler_ptr_); }

private:
void* scheduler_ptr_{};
std::mutex mutex_;
uint64_t next_task_id_{1}; // 0 is reserved.
std::map<uint64_t, std::function<void()>> tasks_;
};

} // namespace DynamicModules
} // namespace Envoy
Loading
Loading