
feat: stability #1165

Merged
chenquan merged 1 commit into main from feat/stability
Apr 11, 2026

Conversation

@chenquan
Member

@chenquan chenquan commented Apr 11, 2026

Summary by CodeRabbit

  • New Features

    • Added authentication middleware with login attempt tracking and automatic account lockouts
    • Added rate limiting capability for API requests
    • Added environment-based credential validation for secure authentication
    • Improved HTTP authentication with constant-time comparison to prevent timing attacks
  • Refactor

    • Consolidated configuration parsing across all input and output components
    • Optimized internal message queue performance for improved throughput
    • Implemented session context pooling for SQL operations
  • Chores

    • Added enhanced error types for authentication, rate limiting, and configuration issues

@coderabbitai
Contributor

coderabbitai bot commented Apr 11, 2026

Walkthrough

This PR introduces security and resource management infrastructure. It adds authentication middleware with login attempt tracking, HTTP constant-time credential validation, a token-bucket rate limiter, a DataFusion SessionContext pool for efficient query execution, and refactors configuration parsing across all input/output builders to use a centralized helper. Three workspace dependencies (subtle, governor, dashmap) support these features.

Changes

  • Workspace Dependencies (Cargo.toml, crates/arkflow-plugin/Cargo.toml): Added subtle, governor, and dashmap dependencies to the workspace and plugin crate.
  • Error Handling Core (crates/arkflow-core/src/error_helpers.rs, crates/arkflow-core/src/lib.rs): Introduced the parse_config<T> helper, error-mapping utilities (map_arrow_error, map_datafusion_error), and the config_error! macro. Added four new Error variants: Authentication, RateLimit, LockTimeout, and InvalidConfig.
  • Authentication & Rate Limiting (crates/arkflow-plugin/src/auth_middleware.rs, crates/arkflow-plugin/src/rate_limiter.rs): Implemented AuthMiddleware with failed-login attempt tracking and lockout enforcement. Implemented SimpleRateLimiter using a token-bucket algorithm with CAS-based atomic token decrement.
  • Resource Pooling (crates/arkflow-plugin/src/context_pool.rs): Introduced SessionContextPool, managing fixed-size reusable SessionContext instances with async acquire/release semantics.
  • Buffer Concurrency Improvement (crates/arkflow-plugin/src/buffer/window.rs): Replaced the nested Arc<RwLock<HashMap<...>>> queue structure with DashMap for lock-free concurrent access; updated the process_window, write, and queue_is_empty methods.
  • SQL Query Execution (crates/arkflow-plugin/src/processor/sql.rs): Integrated SessionContextPool for context reuse (pool size hardcoded to 4); replaced per-request context creation with an acquire()/release_context() flow.
  • Configuration Refactoring (crates/arkflow-plugin/src/input/{file,http,kafka,memory,mqtt,multiple_inputs,nats,pulsar,sql,websocket}.rs, crates/arkflow-plugin/src/output/{http,influxdb,mqtt,nats,pulsar,redis,sql,stdout}.rs): Unified input/output builder configuration parsing by delegating to the centralized parse_config helper; removed manual Option/serde_json unwrapping and hardcoded error messages.
  • HTTP Input Security (crates/arkflow-plugin/src/input/http.rs): Added constant-time equality checking for Basic auth credentials (byte-wise comparison) and Bearer tokens using the subtle crate; added a test verifying timing resistance.
  • Module Exports (crates/arkflow-plugin/src/lib.rs): Added context_pool and rate_limiter module declarations.
  • Minor Refactoring (crates/arkflow-plugin/src/processor/vrl.rs, crates/arkflow-plugin/src/output/influxdb.rs): Reordered variable bindings in VRL conversion; adjusted formatting and function signatures in the InfluxDB output without behavioral changes.

Sequence Diagrams

sequenceDiagram
    participant Client
    participant AuthMiddleware
    participant Environment
    participant App

    Client->>AuthMiddleware: record_failure(identifier)
    AuthMiddleware->>AuthMiddleware: increment attempt count<br/>check if max_attempts reached
    alt max_attempts exceeded
        AuthMiddleware->>AuthMiddleware: set locked_until = now + lock_duration
    end

    Client->>AuthMiddleware: is_locked(identifier)
    AuthMiddleware->>AuthMiddleware: compare Instant::now()<br/>against locked_until
    AuthMiddleware-->>Client: bool (locked or available)

    Client->>AuthMiddleware: check_credentials_from_env(username, password)
    AuthMiddleware->>Environment: read *_USER and *_PASSWORD env vars
    Environment-->>AuthMiddleware: env values
    AuthMiddleware->>AuthMiddleware: constant-time comparison<br/>of credentials
    AuthMiddleware-->>Client: bool (valid or invalid)

    Client->>AuthMiddleware: record_success(identifier)
    AuthMiddleware->>AuthMiddleware: remove identifier entry
sequenceDiagram
    participant Client
    participant SqlProcessor
    participant SessionContextPool
    participant DataFusion as DataFusion<br/>SessionContext

    Client->>SqlProcessor: process(message_batch)
    SqlProcessor->>SessionContextPool: acquire()
    loop until context available
        SessionContextPool->>SessionContextPool: check available indices<br/>wait 1ms if none
    end
    SessionContextPool-->>SqlProcessor: Arc<SessionContext>

    SqlProcessor->>DataFusion: register_batch(input_batch)
    SqlProcessor->>DataFusion: execute_statement()
    DataFusion-->>SqlProcessor: result

    SqlProcessor->>DataFusion: deregister_table()
    SqlProcessor->>SessionContextPool: release_context(context)
    SessionContextPool->>SessionContextPool: find context in pool<br/>push index to available

    SqlProcessor-->>Client: output_batch
sequenceDiagram
    participant Client
    participant SimpleRateLimiter
    participant AtomicState

    Client->>SimpleRateLimiter: check()
    SimpleRateLimiter->>SimpleRateLimiter: compute elapsed_nanos<br/>since last_update
    SimpleRateLimiter->>SimpleRateLimiter: tokens_to_add = elapsed_nanos<br/>* requests_per_second / 1_000_000_000
    SimpleRateLimiter->>SimpleRateLimiter: cap tokens at burst_size
    SimpleRateLimiter->>AtomicState: CAS loop: decrement tokens<br/>if tokens > 0
    alt tokens decremented successfully
        SimpleRateLimiter-->>Client: true (request allowed)
    else insufficient tokens
        SimpleRateLimiter-->>Client: false (request denied)
    end
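The diagram's token-bucket flow can be sketched with std atomics alone. This is an illustrative assumption about SimpleRateLimiter's shape (the actual implementation lives in rate_limiter.rs alongside the governor/dashmap dependencies); the nanosecond bookkeeping and CAS decrement mirror the steps above:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

// Illustrative token bucket; field names mirror the diagram, not the crate.
struct SimpleRateLimiter {
    requests_per_second: u64,
    burst_size: u64,
    tokens: AtomicU64,
    last_update: AtomicU64, // nanoseconds since `start`
    start: Instant,
}

impl SimpleRateLimiter {
    fn new(requests_per_second: u64, burst_size: u64) -> Self {
        Self {
            requests_per_second,
            burst_size,
            tokens: AtomicU64::new(burst_size), // start full
            last_update: AtomicU64::new(0),
            start: Instant::now(),
        }
    }

    fn check(&self) -> bool {
        // Refill based on elapsed time, capped at burst_size.
        let now = self.start.elapsed().as_nanos() as u64;
        let last = self.last_update.swap(now, Ordering::AcqRel);
        let tokens_to_add = now.saturating_sub(last) * self.requests_per_second / 1_000_000_000;
        if tokens_to_add > 0 {
            let _ = self.tokens.fetch_update(Ordering::AcqRel, Ordering::Acquire, |t| {
                Some(t.saturating_add(tokens_to_add).min(self.burst_size))
            });
        }
        // CAS loop: take one token only if any remain; None aborts the update.
        self.tokens
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |t| t.checked_sub(1))
            .is_ok()
    }
}
```

With burst_size 2 and a low refill rate, the first two check() calls succeed and the third is denied, matching the alt branches in the diagram.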

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels

enhancement, security, performance


🐰 Hops with glee through fresh configs clean,
Sessions pooled and rate-limited keen,
AuthMiddleware guards with constant-time care,
DashMap concurrency floats through the air,
Infrastructure springs forth, robust and bright!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 inconclusive)

  • Title check: ❓ Inconclusive. The title 'feat: stability' is vague and generic, and doesn't convey meaningful information about the specific changes in this substantial pull request. Resolution: consider a more descriptive title that captures the primary changes, such as 'feat: add rate limiting, authentication, and session pooling utilities' or 'feat: enhance error handling and add security middleware'.

✅ Passed checks (2 passed)

  • Description Check: ✅ Passed. Check skipped because CodeRabbit’s high-level summary is enabled.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which meets the required threshold of 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 17

🧹 Nitpick comments (4)
crates/arkflow-plugin/src/context_pool.rs (1)

91-102: Replace the polling loop with a wake-driven notification pattern.

The fixed-interval sleep at line 101 causes unnecessary scheduler wakes and latency under contention. Replace the Mutex-based retry loop with tokio::sync::Notify, signaling waiters only when contexts are actually released back to the pool.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/context_pool.rs` around lines 91 - 102, The acquire
loop in acquire() polls with tokio::time::sleep causing wasted wakes; replace it
with a tokio::sync::Notify-based wait/notify: add a Notify field to the pool
struct (e.g., notify: Notify), change acquire() to lock available, pop an index
if present and return the SessionContext clone, otherwise drop the mutex and
await notify.notified().await (do not sleep) and then retry; ensure the code
that returns/releases a context (the release/return function that pushes back
into available and current code paths that free contexts) calls
notify.notify_one() or notify.notify_waiters() after pushing the index so
waiting tasks are woken. This removes the fixed-interval sleep and uses
wake-driven notifications while preserving the available mutex around the
available Vec and contexts access.
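The suggested pattern is async (tokio::sync::Notify), but the same wake-driven shape can be shown synchronously with std's Mutex and Condvar. This is a sketch, not the actual SessionContextPool API; T stands in for SessionContext:

```rust
use std::sync::{Arc, Condvar, Mutex};

// Wake-driven pool sketch: acquire() blocks on a condition variable instead
// of polling with a fixed-interval sleep; release() wakes exactly one waiter.
struct Pool<T> {
    slots: Mutex<Vec<T>>,
    cond: Condvar,
}

impl<T> Pool<T> {
    fn new(items: Vec<T>) -> Arc<Self> {
        Arc::new(Pool { slots: Mutex::new(items), cond: Condvar::new() })
    }

    fn acquire(&self) -> T {
        let mut slots = self.slots.lock().unwrap();
        loop {
            if let Some(item) = slots.pop() {
                return item;
            }
            // Releases the lock while waiting; re-acquires it when signaled.
            slots = self.cond.wait(slots).unwrap();
        }
    }

    fn release(&self, item: T) {
        self.slots.lock().unwrap().push(item);
        self.cond.notify_one(); // wake one waiter now that a slot is free
    }
}
```

In the async version, cond.wait would become notify.notified().await after dropping the mutex, and notify_one() would run after pushing the index back, exactly as the prompt describes.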
crates/arkflow-plugin/src/processor/sql.rs (2)

235-240: Inconsistent config parsing: SqlProcessorBuilder doesn't use parse_config.

Other builders in this PR have migrated to parse_config, but SqlProcessorBuilder still uses manual config.is_none() check and serde_json::from_value. For consistency, this should also use the centralized helper.

Proposed fix

Add import at top of file:

use arkflow_core::error_helpers::parse_config;

Then update the build method:

     fn build(
         &self,
         _name: Option<&String>,
         config: &Option<serde_json::Value>,
         resource: &Resource,
     ) -> Result<Arc<dyn Processor>, Error> {
-        if config.is_none() {
-            return Err(Error::Config(
-                "Batch processor configuration is missing".to_string(),
-            ));
-        }
-        let config: SqlProcessorConfig = serde_json::from_value(config.clone().unwrap())?;
+        let config: SqlProcessorConfig = parse_config(config, "SQL processor")?;

         Ok(Arc::new(SqlProcessor::new(config, resource)?))
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/processor/sql.rs` around lines 235 - 240, The
SqlProcessorBuilder::build method currently manually checks config.is_none() and
calls serde_json::from_value; replace that with the centralized parse_config
helper: add use arkflow_core::error_helpers::parse_config; then call
parse_config(config, "SqlProcessorConfig") (or appropriate type hint) to produce
a SqlProcessorConfig, handling/propagating the returned Result and removing the
manual is_none()/from_value logic so the builder matches other builders' error
messages and parsing behavior.

88-89: Consider making pool size configurable.

The pool size is hardcoded to 4. For production workloads with varying concurrency requirements, this should ideally be configurable via SqlProcessorConfig.

Suggested config extension
 struct SqlProcessorConfig {
     /// SQL query statement
     query: String,
     /// Table name (used in SQL queries)
     table_name: Option<String>,
     temporary_list: Option<Vec<TemporaryConfig>>,
+    /// Context pool size (default: 4)
+    #[serde(default = "default_pool_size")]
+    pool_size: usize,
 }

+fn default_pool_size() -> usize {
+    4
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/processor/sql.rs` around lines 88 - 89, The
SessionContextPool size is hardcoded to 4; update the SqlProcessorConfig (add a
field like session_pool_size: usize or pool_size: usize with a default of 4) and
use that value when constructing the pool instead of the literal 4 (replace
SessionContextPool::new(4)? with
SessionContextPool::new(config.session_pool_size)?), ensuring context_pool
remains Arc::new(SessionContextPool::new(...)?); also add validation/defaulting
for missing/zero values in the SqlProcessorConfig constructor or builder.
crates/arkflow-core/src/lib.rs (1)

98-109: Consolidate InvalidConfig with existing Config variant.

The new InvalidConfig variant ("Invalid configuration: {0}") is semantically redundant with the established Config variant ("Configuration error: {0}"). The Config variant is already widely used throughout the codebase for configuration-related errors, while InvalidConfig remains unused. Using both introduces unnecessary confusion about which to employ.

Remove InvalidConfig and use the existing Config variant instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-core/src/lib.rs` around lines 98 - 109, Remove the unused
InvalidConfig enum variant and consolidate configuration errors under the
existing Config variant: delete the InvalidConfig("Invalid configuration: {0}")
declaration and replace any references to InvalidConfig with Config (which uses
"Configuration error: {0}"). Update any match arms, error construction sites,
and documentation/comments that reference InvalidConfig to use Config instead,
ensuring all usages compile against the remaining Config variant and its message
format.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/arkflow-plugin/src/auth_middleware.rs`:
- Around line 72-87: In record_failure (and similarly the block at 125-134)
check the Failed_login_attempt.last_attempt timestamp before incrementing: if
the previous last_attempt is older than the configured lock window/decay period,
reset entry.count to 0 and entry.locked_until to None so stale counters don't
persist and re-lock immediately; then set last_attempt = Instant::now() and
increment. Also update cleanup_expired to remove entries whose last_attempt is
older than that same decay window even when locked_until is None, preventing
unbounded growth of the attempts map. Use the existing fields
(Failed_login_attempt, last_attempt, locked_until) and the struct's config
(max_attempts/lock_duration/decay window) to implement these checks and resets.
- Around line 200-208: check_credentials_from_env currently mixes using its
first parameter as both the env-var namespace and the expected username; decide
on a single contract and implement it consistently (recommendation: treat the
first parameter as the namespace/key used to build env vars, e.g.
"{NAMESPACE}_USER" and "{NAMESPACE}_PASSWORD", where env_user holds the allowed
username). Change check_credentials_from_env(username, password) to: 1) build
env var names from the namespace param (username.to_uppercase().replace('-',
"_")), 2) return false if either env var is missing, 3) compare the supplied
username to env_user using normal equality and compare the supplied password to
env_pass using a constant-time comparison (e.g., subtle::ConstantTimeEq or
another constant-time equality helper) to avoid timing attacks. Apply the same
consistent change to the other corresponding implementation referenced (the
block at lines ~286-294).
- Around line 72-80: The code references nonexistent symbols: replace the
incorrect struct name Failed_login_attempt with the defined FailedLoginAttempt
wherever used (e.g., in record_failure's or_insert call) and fix the module path
stdenv::remove_var to std::env::remove_var; update any other occurrences of
those exact identifiers (e.g., in functions record_failure and the removal call
near the token/lock handling) so the names match the declared FailedLoginAttempt
type and the standard env module.

In `@crates/arkflow-plugin/src/buffer/window.rs`:
- Around line 184-187: The per-input queue is being written with push_front
causing newest batches to be popped first by process_window() (which drains with
pop_front()) and reversing arrival order before concat_batches(); change the
enqueue to push_back when inserting into self.queue (the entry used in the
snippet) so that FIFO order is preserved for processing in process_window() and
concat_batches().
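The ordering point is easy to demonstrate with a VecDeque drained by pop_front(), as process_window() is described to do; the helper name below is made up for illustration:

```rust
use std::collections::VecDeque;

// Enqueue batches 1, 2, 3, then drain with pop_front() like process_window().
fn drain_after_enqueue(use_push_back: bool) -> Vec<i32> {
    let mut queue: VecDeque<i32> = VecDeque::new();
    for batch in [1, 2, 3] {
        if use_push_back {
            queue.push_back(batch); // fix: FIFO, preserves arrival order
        } else {
            queue.push_front(batch); // bug: LIFO, reverses arrival order
        }
    }
    std::iter::from_fn(move || queue.pop_front()).collect()
}
```

With push_front the consumer sees [3, 2, 1]; with push_back it sees [1, 2, 3], so concat_batches() receives batches in arrival order.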

In `@crates/arkflow-plugin/src/context_pool.rs`:
- Around line 110-116: release_context currently blindly pushes the slot index
into self.available which allows duplicate releases of the same Arc and causes
multiple tasks to acquire the same pooled SessionContext; fix by adding an
idempotency guard: either add a checked-out tracking structure (e.g., a
Vec<bool> or HashSet<usize> protected by the same async Mutex alongside
available) and in release_context check that the index is still marked
checked-out before pushing and clear the flag, or change the API to return a
non-cloneable CheckoutGuard type from acquire (implement Drop for CheckoutGuard
to call release_context once) so each checkout can only be released once; locate
and modify release_context (and the corresponding acquire/checkout logic and the
shared state fields self.available/self.contexts) to perform the
existence/checked-out check and no-op if already released.

In `@crates/arkflow-plugin/src/input/http.rs`:
- Around line 349-437: The test test_constant_time_comparison is flaky because
it asserts a wall-clock timing threshold; remove all timing measurement and
time-difference assertions and only assert functional correctness of
validate_auth (keep the assert!(result_valid) and assert!(!result_wrong) /
bearers). Locate and edit test_constant_time_comparison to delete the
Instant::now() calls, the time_* variables, and both blocks that compute/compare
time_diff for Basic and Bearer cases, leaving only the credential construction
and the validation result assertions against validate_auth.
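Independent of the flaky timing assertion, the property the subtle crate provides can be illustrated with a std-only sketch: accumulate differences across every byte instead of returning at the first mismatch. This shows the idea only, not the crate's hardened implementation:

```rust
// Compare two byte slices without short-circuiting: an early return on the
// first mismatch would leak how long the matching prefix is via timing.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false; // comparing lengths is fine when length is not secret
    }
    let mut diff: u8 = 0;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y; // accumulate every byte difference
    }
    diff == 0
}
```

subtle's ConstantTimeEq adds compiler barriers so the loop isn't optimized back into a short-circuiting comparison, which is why the real code should prefer it over a hand-rolled loop.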

In `@crates/arkflow-plugin/src/lib.rs`:
- Around line 18-24: lib.rs is missing a module declaration for the existing
auth_middleware file so the auth middleware isn't compiled or exposed; add a
public module declaration for auth_middleware (i.e., add pub mod
auth_middleware;) in the crate root lib.rs alongside the other pub mod lines so
the auth_middleware module (auth_middleware) is compiled and available to
import/use.

In `@crates/arkflow-plugin/src/output/http.rs`:
- Line 227: The config error context string is incorrect: change the label
passed to parse_config so output-side errors are labeled correctly. In the call
where you parse the HttpOutput config (the line using parse_config(config,
"HttpOutput input") ), replace the second argument with an appropriate output
label such as "HttpOutput output" (or simply "HttpOutput") so parse_config
reports output-side configuration errors accurately.

In `@crates/arkflow-plugin/src/output/influxdb.rs`:
- Line 515: The config parse call currently passes the wrong component label;
update the parse_config invocation that creates an InfluxDBOutputConfig (the
call using parse_config(config, "InfluxDBOutput input")) to use the correct
component name for this builder (e.g., "InfluxDBOutput") so Error::Config
reports the right component; change the string passed to parse_config near the
InfluxDBOutputConfig construction to the correct label.

In `@crates/arkflow-plugin/src/output/mqtt.rs`:
- Line 215: The config parsing call uses the wrong component label which will
misattribute errors; in the call to parse_config where you construct an
MqttOutputConfig (parse_config(config, "MqttOutput input")), change the
component name string to something appropriate for output-side errors (e.g.,
"MqttOutput") so parse_config reports the correct component; update the
invocation that creates the MqttOutputConfig to use the corrected label.

In `@crates/arkflow-plugin/src/output/nats.rs`:
- Line 226: The config parse call uses the misleading component name "NatsOutput
input"; update the parse_config invocation so the component name reflects the
output (e.g., "NatsOutput" or "NatsOutput output") to produce correct error
messages—modify the call where NatsOutputConfig is parsed (parse_config(config,
"...") for the NatsOutputConfig binding) to use the output-specific name.

In `@crates/arkflow-plugin/src/output/pulsar.rs`:
- Line 190: The component name passed to parse_config is incorrect—change the
string literal "PulsarOutput input" to "PulsarOutput" where PulsarOutputConfig
is parsed (the parse_config call that returns a PulsarOutputConfig in
pulsar.rs), so error messages and metrics reference the correct output component
name; ensure this occurs in the same parse_config(...) invocation that
initializes the PulsarOutputConfig.

In `@crates/arkflow-plugin/src/output/redis.rs`:
- Line 185: The call to parse_config incorrectly labels the component as
"RedisOutput input", causing misleading error messages; update the parse_config
invocation that produces RedisOutputConfig (the line using parse_config(config,
"RedisOutput input")?) to use the correct component name "RedisOutput" so
errors/readable messages reflect the output builder, leaving RedisOutputConfig
and surrounding logic unchanged.

In `@crates/arkflow-plugin/src/output/sql.rs`:
- Line 431: The parse_config call uses the wrong component name string causing
confusing errors; update the call to parse_config(config, "SqlOutput input") in
crates/arkflow-plugin/src/output/sql.rs so the component name is "SqlOutput"
instead of "SqlOutput input" (i.e., change the literal passed to parse_config
when constructing SqlOutputConfig), and scan nearby output builder usages (e.g.,
any other calls in the same file referencing SqlOutput or SqlOutputConfig) to
ensure they use the consistent "SqlOutput" component name.

In `@crates/arkflow-plugin/src/output/stdout.rs`:
- Line 103: Update the parse_config context string to reflect this is an output
component: in crates/arkflow-plugin/src/output/stdout.rs change the parse_config
call that currently reads parse_config(config, "StdoutOutput input") to use a
clearer output context such as "StdoutOutput output" (or "StdoutOutput output
config") so the StdoutOutputConfig parsing error messages correctly identify the
component as an output.

In `@crates/arkflow-plugin/src/processor/sql.rs`:
- Around line 416-424: The test's comment and assertion are inconsistent and it
contains a debug println: update the assertion to match the comment's intended
threshold (use duration.as_millis() < 100) or alternatively update the comment
to reflect the intended 500ms threshold, and remove the println! debug artifact
(or replace it with a proper logger call such as log::debug! or
tracing::debug!); locate the assertion referencing duration.as_millis() and the
println! in the Context pool performance test block and apply the change.

In `@crates/arkflow-plugin/src/rate_limiter.rs`:
- Around line 58-109: refill_tokens currently reads tokens, computes new_tokens
and unconditionally stores it, which can overwrite concurrent consumes from
check() and permit bursts; change refill_tokens to perform an atomic update
(e.g., AtomicU32::fetch_update or a compare_exchange loop) on state.tokens so it
reads the current token value and only writes min(current + tokens_to_add,
burst_size) if the token value hasn't changed (retry on Err), then update
state.last_update only after successfully updating tokens (use compare_exchange
on state.last_update or store now only when last_update < now); apply the same
atomic-safe approach in remaining_capacity so both refill_tokens, check, and
remaining_capacity coordinate via atomic compare_exchange/fetch_update on
state.tokens and last_update rather than blind loads/stores.
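The atomic-safe refill the comment asks for boils down to a compare-exchange loop; AtomicU32::fetch_update expresses it directly. A sketch with illustrative values, not the rate_limiter.rs code:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Read-modify-write atomically: fetch_update retries if another thread
// consumed tokens between the load and the store, so concurrent check()
// calls are never clobbered by a blind store.
fn refill(tokens: &AtomicU32, tokens_to_add: u32, burst_size: u32) {
    let _ = tokens.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
        Some(current.saturating_add(tokens_to_add).min(burst_size))
    });
}
```

A plain load-compute-store of the same expression would lose any decrement that lands between the load and the store; the closure-based update re-reads and retries instead.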


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bf2a0e27-e22f-4758-9de4-40e152861aba

📥 Commits

Reviewing files that changed from the base of the PR and between 7fe5d17 and 7c4ccbf.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (29)
  • Cargo.toml
  • crates/arkflow-core/src/error_helpers.rs
  • crates/arkflow-core/src/lib.rs
  • crates/arkflow-plugin/Cargo.toml
  • crates/arkflow-plugin/src/auth_middleware.rs
  • crates/arkflow-plugin/src/buffer/window.rs
  • crates/arkflow-plugin/src/context_pool.rs
  • crates/arkflow-plugin/src/input/file.rs
  • crates/arkflow-plugin/src/input/http.rs
  • crates/arkflow-plugin/src/input/kafka.rs
  • crates/arkflow-plugin/src/input/memory.rs
  • crates/arkflow-plugin/src/input/mqtt.rs
  • crates/arkflow-plugin/src/input/multiple_inputs.rs
  • crates/arkflow-plugin/src/input/nats.rs
  • crates/arkflow-plugin/src/input/pulsar.rs
  • crates/arkflow-plugin/src/input/sql.rs
  • crates/arkflow-plugin/src/input/websocket.rs
  • crates/arkflow-plugin/src/lib.rs
  • crates/arkflow-plugin/src/output/http.rs
  • crates/arkflow-plugin/src/output/influxdb.rs
  • crates/arkflow-plugin/src/output/mqtt.rs
  • crates/arkflow-plugin/src/output/nats.rs
  • crates/arkflow-plugin/src/output/pulsar.rs
  • crates/arkflow-plugin/src/output/redis.rs
  • crates/arkflow-plugin/src/output/sql.rs
  • crates/arkflow-plugin/src/output/stdout.rs
  • crates/arkflow-plugin/src/processor/sql.rs
  • crates/arkflow-plugin/src/processor/vrl.rs
  • crates/arkflow-plugin/src/rate_limiter.rs

Comment on lines +72 to +87
pub async fn record_failure(&self, identifier: &str) {
    let mut attempts = self.failed_attempts.write().await;
    let entry = attempts
        .entry(identifier.to_string())
        .or_insert(Failed_login_attempt {
            count: 0,
            last_attempt: Instant::now(),
            locked_until: None,
        });

    entry.count += 1;
    entry.last_attempt = Instant::now();

    if entry.count >= self.max_attempts {
        entry.locked_until = Some(Instant::now() + self.lock_duration);
    }

⚠️ Potential issue | 🟠 Major

Expired lockouts never fully reset.

Once locked_until has passed, record_failure() keeps incrementing the stale counter, so the next bad login immediately re-locks the account unless cleanup_expired() happened to run first. cleanup_expired() also keeps every non-locked identifier forever, so spray-and-pray attempts can grow this map without bound. Use last_attempt to expire stale counters before incrementing.

Also applies to: 125-134
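A minimal synchronous sketch of the suggested decay reset, using std's Mutex instead of tokio's RwLock; the decay_window knob and the tracker struct's exact shape are assumptions mirroring the review, not the auth_middleware.rs code:

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

struct FailedLoginAttempt {
    count: u32,
    last_attempt: Instant,
    locked_until: Option<Instant>,
}

struct AuthTracker {
    attempts: Mutex<HashMap<String, FailedLoginAttempt>>,
    max_attempts: u32,
    lock_duration: Duration,
    decay_window: Duration, // assumed config: age after which stale counters reset
}

impl AuthTracker {
    fn record_failure(&self, identifier: &str) {
        let mut attempts = self.attempts.lock().unwrap();
        let now = Instant::now();
        let entry = attempts.entry(identifier.to_string()).or_insert(FailedLoginAttempt {
            count: 0,
            last_attempt: now,
            locked_until: None,
        });

        // Expire stale counters so a lapsed lockout does not re-lock on the next failure.
        if now.duration_since(entry.last_attempt) > self.decay_window {
            entry.count = 0;
            entry.locked_until = None;
        }

        entry.count += 1;
        entry.last_attempt = now;
        if entry.count >= self.max_attempts {
            entry.locked_until = Some(now + self.lock_duration);
        }
    }

    fn is_locked(&self, identifier: &str) -> bool {
        self.attempts
            .lock()
            .unwrap()
            .get(identifier)
            .and_then(|e| e.locked_until)
            .map_or(false, |until| Instant::now() < until)
    }
}
```

The same decay check, applied inside cleanup_expired, would also let the map drop identifiers that were never locked, addressing the unbounded-growth concern.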


Comment on lines +72 to +80
pub async fn record_failure(&self, identifier: &str) {
    let mut attempts = self.failed_attempts.write().await;
    let entry = attempts
        .entry(identifier.to_string())
        .or_insert(Failed_login_attempt {
            count: 0,
            last_attempt: Instant::now(),
            locked_until: None,
        });

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -n 'struct FailedLoginAttempt|Failed_login_attempt|stdenv::remove_var' crates/arkflow-plugin/src/auth_middleware.rs

Repository: arkflow-rs/arkflow

Length of output: 190


Fix the unresolved identifiers before wiring this module in.

Failed_login_attempt on line 76 and stdenv::remove_var on line 301 do not exist. The struct is defined as FailedLoginAttempt (line 27), and the correct module path is std::env. As soon as auth_middleware.rs is declared in lib.rs, this file fails to compile.

Proposed fixes
         let entry = attempts
             .entry(identifier.to_string())
-            .or_insert(Failed_login_attempt {
+            .or_insert(FailedLoginAttempt {
                 count: 0,
                 last_attempt: Instant::now(),
                 locked_until: None,
             });
-        stdenv::remove_var("TEST_PASSWORD");
+        std::env::remove_var("TEST_PASSWORD");
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/auth_middleware.rs` around lines 72 - 80, The code
references nonexistent symbols: replace the incorrect struct name
Failed_login_attempt with the defined FailedLoginAttempt wherever used (e.g., in
record_failure's or_insert call) and fix the module path stdenv::remove_var to
std::env::remove_var; update any other occurrences of those exact identifiers
(e.g., in functions record_failure and the removal call near the token/lock
handling) so the names match the declared FailedLoginAttempt type and the
standard env module.

Comment on lines +200 to +208
pub fn check_credentials_from_env(username: &str, password: &str) -> bool {
// Convert username to uppercase for environment variable lookup
let env_username = format!("{}_USER", username.to_uppercase().replace('-', "_"));
let env_password = format!("{}_PASSWORD", username.to_uppercase().replace('-', "_"));

if let (Ok(env_user), Ok(env_pass)) =
(std::env::var(&env_username), std::env::var(&env_password))
{
username == env_user && password == env_pass

⚠️ Potential issue | 🟠 Major

check_credentials_from_env() has an inconsistent lookup contract.

The first parameter is used both to choose *_USER / *_PASSWORD and to validate the stored username, so TEST_USER=testuser still makes check_credentials_from_env("test", ...) return false in the case exercised below. That ambiguity is enough to reject valid credentials, and the password check is still plain ==. Pick one contract here (lookup namespace vs. actual username) and implement it consistently.

Also applies to: 286-294

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/auth_middleware.rs` around lines 200 - 208,
check_credentials_from_env currently mixes using its first parameter as both the
env-var namespace and the expected username; decide on a single contract and
implement it consistently (recommendation: treat the first parameter as the
namespace/key used to build env vars, e.g. "{NAMESPACE}_USER" and
"{NAMESPACE}_PASSWORD", where env_user holds the allowed username). Change
check_credentials_from_env(username, password) to: 1) build env var names from
the namespace param (username.to_uppercase().replace('-', "_")), 2) return false
if either env var is missing, 3) compare the supplied username to env_user using
normal equality and compare the supplied password to env_pass using a
constant-time comparison (e.g., subtle::ConstantTimeEq or another constant-time
equality helper) to avoid timing attacks. Apply the same consistent change to
the other corresponding implementation referenced (the block at lines ~286-294).
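The constant-time comparison the review asks for is what `subtle::ConstantTimeEq` provides; the principle can be shown std-only by XOR-folding every byte so the loop never exits early on the first mismatch. A sketch under the recommended contract (first parameter is only the env-var namespace, `env_user` holds the allowed username), not the PR's code:

```rust
/// Compare two byte strings in time dependent only on their lengths,
/// never on the position of the first differing byte.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Accumulate differences instead of returning at the first mismatch.
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

fn check_credentials(namespace: &str, username: &str, password: &str) -> bool {
    // The namespace only selects which env vars to read; the stored
    // username is what the supplied username must match.
    let key = namespace.to_uppercase().replace('-', "_");
    match (
        std::env::var(format!("{key}_USER")),
        std::env::var(format!("{key}_PASSWORD")),
    ) {
        (Ok(user), Ok(pass)) => {
            username == user && constant_time_eq(password.as_bytes(), pass.as_bytes())
        }
        _ => false, // missing env vars reject, never panic
    }
}

fn main() {
    assert!(constant_time_eq(b"s3cret", b"s3cret"));
    assert!(!constant_time_eq(b"s3cret", b"s3cres"));
    // No vars set for this namespace, so the check fails closed.
    assert!(!check_credentials("nonexistent-ns", "u", "p"));
}
```

In production the `subtle` crate (already added to the workspace in this PR) is the better choice, since it is hardened against compiler optimizations that can reintroduce early exits.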

Comment on lines +184 to +187
self.queue
.entry(input_name)
.or_insert(Arc::new(RwLock::new(VecDeque::new())));
.or_insert_with(|| VecDeque::new())
.push_front((msg, ack));

⚠️ Potential issue | 🟠 Major

Preserve FIFO order when enqueuing window batches.

process_window() drains each per-input deque with pop_front(), so switching the writer to push_front() makes the newest batch come out first. That reverses arrival order inside each input before concat_batches() builds the merged batch.

🐛 Proposed fix
         self.queue
             .entry(input_name)
             .or_insert_with(|| VecDeque::new())
-            .push_front((msg, ack));
+            .push_back((msg, ack));
📝 Committable suggestion

Suggested change
self.queue
.entry(input_name)
.or_insert(Arc::new(RwLock::new(VecDeque::new())));
.or_insert_with(|| VecDeque::new())
.push_front((msg, ack));
self.queue
.entry(input_name)
.or_insert_with(|| VecDeque::new())
.push_back((msg, ack));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/buffer/window.rs` around lines 184 - 187, The
per-input queue is being written with push_front causing newest batches to be
popped first by process_window() (which drains with pop_front()) and reversing
arrival order before concat_batches(); change the enqueue to push_back when
inserting into self.queue (the entry used in the snippet) so that FIFO order is
preserved for processing in process_window() and concat_batches().
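The ordering argument is easy to check in isolation: pairing `push_back` with `pop_front` preserves arrival order, while `push_front` reverses it (std-only sketch, with integers standing in for the `(msg, ack)` pairs):

```rust
use std::collections::VecDeque;

fn main() {
    // Writer appends newest batches at the back...
    let mut q: VecDeque<u32> = VecDeque::new();
    for batch in [1, 2, 3] {
        q.push_back(batch);
    }
    // ...so the drain side, popping from the front, sees arrival order.
    let drained: Vec<u32> = std::iter::from_fn(|| q.pop_front()).collect();
    assert_eq!(drained, vec![1, 2, 3]);

    // With push_front the same drain yields newest-first, reversing order.
    let mut q: VecDeque<u32> = VecDeque::new();
    for batch in [1, 2, 3] {
        q.push_front(batch);
    }
    let drained: Vec<u32> = std::iter::from_fn(|| q.pop_front()).collect();
    assert_eq!(drained, vec![3, 2, 1]);
}
```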

Comment on lines +110 to +116
pub async fn release_context(&self, context: Arc<SessionContext>) {
// Find the index of this context
for (i, ctx) in self.contexts.iter().enumerate() {
if Arc::ptr_eq(ctx, &context) {
let mut available = self.available.lock().await;
available.push(i);
return;

⚠️ Potential issue | 🔴 Critical

release_context() needs an idempotency guard.

Because callers receive a clonable Arc<SessionContext>, the same pointer can be released more than once. This method always pushes the matching index, so available can accumulate duplicates and multiple tasks can acquire the same pooled context concurrently. Track checked-out state or return a non-cloneable checkout guard that releases exactly once.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/context_pool.rs` around lines 110 - 116,
release_context currently blindly pushes the slot index into self.available
which allows duplicate releases of the same Arc and causes multiple tasks to
acquire the same pooled SessionContext; fix by adding an idempotency guard:
either add a checked-out tracking structure (e.g., a Vec<bool> or HashSet<usize>
protected by the same async Mutex alongside available) and in release_context
check that the index is still marked checked-out before pushing and clear the
flag, or change the API to return a non-cloneable CheckoutGuard type from
acquire (implement Drop for CheckoutGuard to call release_context once) so each
checkout can only be released once; locate and modify release_context (and the
corresponding acquire/checkout logic and the shared state fields
self.available/self.contexts) to perform the existence/checked-out check and
no-op if already released.
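The guard approach the comment recommends can be sketched std-only: a non-cloneable guard owns the slot index and returns it in `Drop`, so each checkout is released exactly once. This is a synchronous sketch with strings standing in for `Arc<SessionContext>`; the real pool is async:

```rust
use std::sync::{Arc, Mutex};

struct Pool {
    contexts: Vec<String>,        // stand-in for Vec<Arc<SessionContext>>
    available: Mutex<Vec<usize>>, // indices of free slots
}

/// Non-cloneable: dropping it is the only way to release, and Drop
/// runs at most once, so `available` can never collect duplicates.
struct CheckoutGuard {
    pool: Arc<Pool>,
    index: usize,
}

impl Pool {
    fn acquire(pool: &Arc<Pool>) -> Option<CheckoutGuard> {
        let index = pool.available.lock().unwrap().pop()?;
        Some(CheckoutGuard {
            pool: Arc::clone(pool),
            index,
        })
    }
}

impl CheckoutGuard {
    fn context(&self) -> &str {
        &self.pool.contexts[self.index]
    }
}

impl Drop for CheckoutGuard {
    fn drop(&mut self) {
        self.pool.available.lock().unwrap().push(self.index);
    }
}

fn main() {
    let pool = Arc::new(Pool {
        contexts: vec!["ctx0".into(), "ctx1".into()],
        available: Mutex::new(vec![0, 1]),
    });
    let a = Pool::acquire(&pool).unwrap();
    let b = Pool::acquire(&pool).unwrap();
    assert_eq!(a.context(), "ctx1");
    assert!(Pool::acquire(&pool).is_none()); // pool exhausted
    drop(a);
    drop(b);
    assert_eq!(pool.available.lock().unwrap().len(), 2); // no duplicates
}
```

Because the guard is not `Clone` and `release` happens only in `Drop`, double-release is unrepresentable at the type level, which is stronger than a runtime checked-out flag.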

));
}
let config: RedisOutputConfig = serde_json::from_value(config.clone().unwrap())?;
let config: RedisOutputConfig = parse_config(config, "RedisOutput input")?;

⚠️ Potential issue | 🟡 Minor

Inconsistent component name: "RedisOutput input" should be "RedisOutput".

The component name passed to parse_config contains "input" but this is an output builder. This will produce confusing error messages like "RedisOutput input configuration is missing" when configuration is absent.

Proposed fix
-        let config: RedisOutputConfig = parse_config(config, "RedisOutput input")?;
+        let config: RedisOutputConfig = parse_config(config, "RedisOutput")?;
📝 Committable suggestion

Suggested change
let config: RedisOutputConfig = parse_config(config, "RedisOutput input")?;
let config: RedisOutputConfig = parse_config(config, "RedisOutput")?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/output/redis.rs` at line 185, The call to
parse_config incorrectly labels the component as "RedisOutput input", causing
misleading error messages; update the parse_config invocation that produces
RedisOutputConfig (the line using parse_config(config, "RedisOutput input")?) to
use the correct component name "RedisOutput" so errors/readable messages reflect
the output builder, leaving RedisOutputConfig and surrounding logic unchanged.

}

let config: SqlOutputConfig = serde_json::from_value(config.clone().unwrap())?;
let config: SqlOutputConfig = parse_config(config, "SqlOutput input")?;

⚠️ Potential issue | 🟡 Minor

Inconsistent component name: "SqlOutput input" should be "SqlOutput".

Same issue as other output builders: using an "input" suffix for an output component will produce confusing error messages.

Proposed fix
-        let config: SqlOutputConfig = parse_config(config, "SqlOutput input")?;
+        let config: SqlOutputConfig = parse_config(config, "SqlOutput")?;
📝 Committable suggestion

Suggested change
let config: SqlOutputConfig = parse_config(config, "SqlOutput input")?;
let config: SqlOutputConfig = parse_config(config, "SqlOutput")?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/output/sql.rs` at line 431, The parse_config call
uses the wrong component name string causing confusing errors; update the call
to parse_config(config, "SqlOutput input") in
crates/arkflow-plugin/src/output/sql.rs so the component name is "SqlOutput"
instead of "SqlOutput input" (i.e., change the literal passed to parse_config
when constructing SqlOutputConfig), and scan nearby output builder usages (e.g.,
any other calls in the same file referencing SqlOutput or SqlOutputConfig) to
ensure they use the consistent "SqlOutput" component name.

));
}
let config: StdoutOutputConfig = serde_json::from_value(config.clone().unwrap())?;
let config: StdoutOutputConfig = parse_config(config, "StdoutOutput input")?;

⚠️ Potential issue | 🟡 Minor

Adjust parse context string for clearer error messages.

Line 103 uses "StdoutOutput input"; this should identify an output component.

Suggested patch
-        let config: StdoutOutputConfig = parse_config(config, "StdoutOutput input")?;
+        let config: StdoutOutputConfig = parse_config(config, "Stdout output")?;
📝 Committable suggestion

Suggested change
let config: StdoutOutputConfig = parse_config(config, "StdoutOutput input")?;
let config: StdoutOutputConfig = parse_config(config, "Stdout output")?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/output/stdout.rs` at line 103, Update the
parse_config context string to reflect this is an output component: in
crates/arkflow-plugin/src/output/stdout.rs change the parse_config call that
currently reads parse_config(config, "StdoutOutput input") to use a clearer
output context such as "StdoutOutput output" (or "StdoutOutput output config")
so the StdoutOutputConfig parsing error messages correctly identify the
component as an output.

Comment on lines +416 to +424
// With context pool, 10 queries should complete in < 100ms
// Without pool, this would typically take > 500ms
assert!(
duration.as_millis() < 500,
"Context pool performance test failed: {}ms",
duration.as_millis()
);

println!("10 queries completed in {:?}", duration);

⚠️ Potential issue | 🟡 Minor

Test comment/assertion mismatch and debug artifact.

  1. Comment says "< 100ms" but assertion checks "< 500ms"
  2. println! is a debug artifact that should be removed or converted to proper logging
Proposed fix
-        // With context pool, 10 queries should complete in < 100ms
-        // Without pool, this would typically take > 500ms
+        // With context pool, 10 queries should complete quickly
         assert!(
             duration.as_millis() < 500,
             "Context pool performance test failed: {}ms",
             duration.as_millis()
         );
-
-        println!("10 queries completed in {:?}", duration);
     }
📝 Committable suggestion

Suggested change
// With context pool, 10 queries should complete in < 100ms
// Without pool, this would typically take > 500ms
assert!(
duration.as_millis() < 500,
"Context pool performance test failed: {}ms",
duration.as_millis()
);
println!("10 queries completed in {:?}", duration);
// With context pool, 10 queries should complete quickly
assert!(
duration.as_millis() < 500,
"Context pool performance test failed: {}ms",
duration.as_millis()
);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/processor/sql.rs` around lines 416 - 424, The
test's comment and assertion are inconsistent and it contains a debug println:
update the assertion to match the comment's intended threshold (use
duration.as_millis() < 100) or alternatively update the comment to reflect the
intended 500ms threshold, and remove the println! debug artifact (or replace it
with a proper logger call such as log::debug! or tracing::debug!); locate the
assertion referencing duration.as_millis() and the println! in the Context pool
performance test block and apply the change.

Comment on lines +58 to +109
pub fn check(&self) -> bool {
self.refill_tokens();

// Try to consume one token
let mut current = self.state.tokens.load(Ordering::Relaxed);
loop {
if current == 0 {
return false;
}
let new = current - 1;
match self.state.tokens.compare_exchange(
current,
new,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => return true,
Err(actual) => current = actual,
}
}
}

/// Get the number of requests remaining in the current period
pub fn remaining_capacity(&self) -> u32 {
self.refill_tokens();
self.state.tokens.load(Ordering::Relaxed)
}

fn refill_tokens(&self) {
let now = Self::now_nanos();
let last_update = self.state.last_update.load(Ordering::Relaxed);

if now <= last_update {
return;
}

let elapsed_nanos = now - last_update;
let elapsed_seconds = elapsed_nanos as f64 / 1_000_000_000.0;

// Calculate how many tokens to add based on elapsed time
let tokens_to_add = (elapsed_seconds * self.requests_per_second as f64) as u32;

if tokens_to_add == 0 {
return;
}

// Update tokens, but don't exceed burst_size
let current = self.state.tokens.load(Ordering::Relaxed);
let new_tokens = std::cmp::min(current + tokens_to_add, self.burst_size);

self.state.tokens.store(new_tokens, Ordering::Relaxed);
self.state.last_update.store(now, Ordering::Relaxed);

⚠️ Potential issue | 🔴 Critical

refill_tokens can overwrite concurrent consumes.

Line 105 snapshots tokens, but Line 108 stores a derived value without coordinating with concurrent check() calls. If another thread decrements the bucket in between, this write puts tokens back, so parallel callers can exceed the configured rate limit.

🔒 Suggested direction
-use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
+use std::time::Instant;
@@
 pub struct SimpleRateLimiter {
     requests_per_second: u32,
     burst_size: u32,
-    state: Arc<RateLimiterState>,
+    state: Arc<Mutex<RateLimiterState>>,
 }
 
 struct RateLimiterState {
-    tokens: AtomicU32,
-    last_update: AtomicU64, // Stores nanoseconds since epoch
+    tokens: u32,
+    last_update: Instant,
 }
@@
             requests_per_second,
             burst_size,
-            state: Arc::new(RateLimiterState {
-                tokens: AtomicU32::new(burst_size),
-                last_update: AtomicU64::new(Self::now_nanos()),
-            }),
+            state: Arc::new(Mutex::new(RateLimiterState {
+                tokens: burst_size,
+                last_update: Instant::now(),
+            })),
         }
     }
@@
     pub fn check(&self) -> bool {
-        self.refill_tokens();
-
-        // Try to consume one token
-        let mut current = self.state.tokens.load(Ordering::Relaxed);
-        loop {
-            if current == 0 {
-                return false;
-            }
-            let new = current - 1;
-            match self.state.tokens.compare_exchange(
-                current,
-                new,
-                Ordering::Relaxed,
-                Ordering::Relaxed,
-            ) {
-                Ok(_) => return true,
-                Err(actual) => current = actual,
-            }
+        let mut state = self.state.lock().unwrap();
+        self.refill_tokens_locked(&mut state);
+        if state.tokens == 0 {
+            return false;
         }
+        state.tokens -= 1;
+        true
     }

Hold the same lock while refilling and consuming, and do the same in remaining_capacity, so one request cannot be refunded by a concurrent refill.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/arkflow-plugin/src/rate_limiter.rs` around lines 58 - 109,
refill_tokens currently reads tokens, computes new_tokens and unconditionally
stores it, which can overwrite concurrent consumes from check() and permit
bursts; change refill_tokens to perform an atomic update (e.g.,
AtomicU32::fetch_update or a compare_exchange loop) on state.tokens so it reads
the current token value and only writes min(current + tokens_to_add, burst_size)
if the token value hasn't changed (retry on Err), then update state.last_update
only after successfully updating tokens (use compare_exchange on
state.last_update or store now only when last_update < now); apply the same
atomic-safe approach in remaining_capacity so both refill_tokens, check, and
remaining_capacity coordinate via atomic compare_exchange/fetch_update on
state.tokens and last_update rather than blind loads/stores.
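Put together, the lock-based direction above amounts to refilling and consuming under one lock so a concurrent `check()` can never be refunded by an interleaved refill. A self-contained sketch under the review's assumptions (fractional `f64` tokens also avoid losing sub-second refill to integer truncation), not the PR's code:

```rust
use std::sync::Mutex;
use std::time::Instant;

struct State {
    tokens: f64,
    last_update: Instant,
}

pub struct SimpleRateLimiter {
    requests_per_second: f64,
    burst_size: f64,
    state: Mutex<State>,
}

impl SimpleRateLimiter {
    pub fn new(requests_per_second: u32, burst_size: u32) -> Self {
        SimpleRateLimiter {
            requests_per_second: requests_per_second as f64,
            burst_size: burst_size as f64,
            state: Mutex::new(State {
                tokens: burst_size as f64,
                last_update: Instant::now(),
            }),
        }
    }

    /// Refill and consume under the same lock: no other thread can
    /// observe (or clobber) the token count between the two steps.
    pub fn check(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        let now = Instant::now();
        let elapsed = now.duration_since(state.last_update).as_secs_f64();
        state.tokens =
            (state.tokens + elapsed * self.requests_per_second).min(self.burst_size);
        state.last_update = now;
        if state.tokens >= 1.0 {
            state.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let limiter = SimpleRateLimiter::new(10, 3);
    // The burst allows exactly three immediate requests...
    assert!(limiter.check());
    assert!(limiter.check());
    assert!(limiter.check());
    // ...and the fourth is rejected until tokens refill.
    assert!(!limiter.check());
}
```

The trade-off versus the atomic version is one uncontended mutex acquisition per call, which is usually negligible next to the request it is gating; the `governor` crate added in this PR offers a lock-free alternative if that ever matters.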

@chenquan chenquan enabled auto-merge (squash) April 11, 2026 17:44
@chenquan chenquan disabled auto-merge April 11, 2026 17:45
@chenquan chenquan merged commit 63455e8 into main Apr 11, 2026
3 of 4 checks passed