Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ namespace isobus

std::shared_ptr<CANHardwarePlugin> frameHandler; ///< The CAN driver to use for a CAN channel

LockFreeQueue<CANMessageFrame> messagesToBeTransmittedQueue; ///< Transmit message queue for a CAN channel
LockFreeQueue<CANMessageFrame> receivedMessagesQueue; ///< Receive message queue for a CAN channel
Queue<CANMessageFrame> messagesToBeTransmittedQueue; ///< Transmit message queue for a CAN channel
Queue<CANMessageFrame> receivedMessagesQueue; ///< Receive message queue for a CAN channel
};

/// @brief Singleton instance of the CANHardwareInterface class
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ set(TEST_INCLUDE helpers/control_function_helpers.hpp

# Set test source files
set(TEST_SRC
utility_queue_tests.cpp
core_network_management_tests.cpp
identifier_tests.cpp
transport_protocol_tests.cpp
Expand Down
156 changes: 156 additions & 0 deletions test/utility_queue_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <iostream>
#include <isobus/utility/thread_synchronization.hpp>
#include <random>
#include <thread>
#include <vector>
class QUEUE_TESTS : public ::testing::Test
{
protected:
static constexpr std::size_t QUEUE_SIZE = 500;
Queue<int> queue{ QUEUE_SIZE };
};

TEST_F(QUEUE_TESTS, MultipleProducersMultipleConsumersStressTest)
{
const int NUM_PRODUCERS = 16;
const int NUM_CONSUMERS = 4;
const int ITEMS_PER_PRODUCER = 100000; // Reduced for faster testing
const int TOTAL_ITEMS = NUM_PRODUCERS * ITEMS_PER_PRODUCER;

std::atomic<int> produced_count{ 0 };
std::atomic<int> consumed_count{ 0 };
std::vector<std::atomic<bool>> producer_done(NUM_PRODUCERS);
for (auto &done : producer_done)
done = false;
std::vector<std::thread> producers;

// Producers
for (int p = 0; p < NUM_PRODUCERS; ++p)
{
producers.emplace_back([this, ITEMS_PER_PRODUCER, &produced_count, &producer_done, p]() {
for (int i = 0; i < ITEMS_PER_PRODUCER; ++i)
{
while (!queue.push(i))
{
// Spin if queue is full
std::this_thread::yield();
}
produced_count++;
}
producer_done[p] = true; // Mark this producer as done
});
}

// Multiple Consumers
std::vector<std::thread> consumers;
for (int c = 0; c < NUM_CONSUMERS; ++c)
{
consumers.emplace_back([this, &consumed_count, &producer_done]() {
while (true)
{
bool all_producers_done = true;
for (const auto &done : producer_done)
{
if (!done.load())
{
all_producers_done = false;
break;
}
}

if (queue.pop())
{
consumed_count++;
}
else if (all_producers_done)
{
break; // All producers done and queue is empty
}
else
{
std::this_thread::yield();
}
}
});
}

// Wait for producers
for (auto &t : producers)
{
t.join();
}

// Wait for consumers
for (auto &t : consumers)
{
t.join();
}

// Check if all items were produced
EXPECT_EQ(produced_count.load(), TOTAL_ITEMS);
// Due to race conditions, consumed_count might be less than produced if data is overwritten
std::cout << "Produced: " << produced_count.load() << ", Consumed: " << consumed_count.load() << std::endl;

// This assertion may fail due to race conditions
EXPECT_EQ(consumed_count.load(), TOTAL_ITEMS);
}

// Test all methods of the queue
TEST_F(QUEUE_TESTS, QueueAPIMethodsTest)
{
// Test 1: Basic push/pop operations
EXPECT_TRUE(queue.push(1));
EXPECT_TRUE(queue.push(2));
EXPECT_TRUE(queue.push(3));

EXPECT_EQ(queue.size(), 3);
EXPECT_FALSE(queue.is_empty());

// Test 2: peek method
int peek_value = 0;
EXPECT_TRUE(queue.peek(peek_value));
EXPECT_EQ(peek_value, 1); // Should be first item

// Test 3: pop() without parameter
EXPECT_TRUE(queue.pop());
EXPECT_EQ(queue.size(), 2);

// Test 4: pop(value_type*) method
int popped_value1 = 0;
EXPECT_TRUE(queue.pop(&popped_value1));
EXPECT_EQ(popped_value1, 2);
EXPECT_EQ(queue.size(), 1);

// Test 5: pop(value_type&) method
int popped_value2 = 0;
EXPECT_TRUE(queue.pop(popped_value2));
EXPECT_EQ(popped_value2, 3);
EXPECT_EQ(queue.size(), 0);

// Test 6: Empty queue checks
EXPECT_TRUE(queue.is_empty());
int temp = 0;
EXPECT_FALSE(queue.peek(temp));
EXPECT_FALSE(queue.pop());
EXPECT_FALSE(queue.pop(&temp));
EXPECT_FALSE(queue.pop(temp));

// Test 7: Clear method
EXPECT_TRUE(queue.push(10));
EXPECT_TRUE(queue.push(20));
EXPECT_EQ(queue.size(), 2);

queue.clear();
EXPECT_EQ(queue.size(), 0);
EXPECT_TRUE(queue.is_empty());

// Test 8: Move semantics
int moved_value = 42;
EXPECT_TRUE(queue.push(std::move(moved_value)));
int result = 0;
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(result, 42);
}
154 changes: 148 additions & 6 deletions utility/include/isobus/utility/thread_synchronization.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ namespace isobus
/// @brief A template class for a lock free queue.
/// @tparam T The item type for the queue.
template<typename T>
class LockFreeQueue
class SPSCLockFreeQueue
{
public:
/// @brief Constructor for the lock free queue.
explicit LockFreeQueue(std::size_t size) :
explicit SPSCLockFreeQueue(std::size_t size) :
buffer(size), capacity(size)
{
// Validate the size of the queue, if assertion is disabled, set the size to 1.
Expand Down Expand Up @@ -210,12 +210,52 @@ class UnsafeQueue
public:
using value_type = T;

/// @brief Constructor for the queue.
/// @param size For backward compatibility.
explicit UnsafeQueue(std::size_t size)
{
(void)size;
}

UnsafeQueue() = default;

template<typename U, typename = typename std::enable_if<std::is_convertible<U, value_type>::value>::type>
void push(U &&item)
bool push(U &&item)
{
queue.push(std::forward<U>(item));
return true;
}

/// @brief Peek at the next item in the queue.
/// @param item The item to peek at in the queue.
/// @return True if the item was peeked at in the queue, false if the queue is empty.
bool peek(value_type &item)
{
if (queue.empty())
{
return false;
}

item = queue.front();
return true;
}

/// @brief Pop an item from the queue.
/// @return True if the item was popped from the queue, false if the queue is empty.
bool pop()
{
if (queue.empty())
{
return false;
}

queue.pop();
return true;
}

/// @brief Pop an item from the queue and return it.
/// @param item Pointer to store the popped item.
/// @return True if the item was popped from the queue, false if the queue is empty.
bool pop(value_type *item)
{
if (queue.empty())
Expand All @@ -227,6 +267,42 @@ class UnsafeQueue
return true;
}

/// @brief Pop an item from the queue and return it.
/// @param item Reference to store the popped item.
/// @return True if the item was popped from the queue, false if the queue is empty.
bool pop(value_type &item)
{
if (queue.empty())
{
return false;
}
item = std::move(queue.front());
queue.pop();
return true;
}

/// @brief Check if the queue is full.
/// @return Always returns false, since this version of the queue is not limited in size.
bool is_full() const
{
return false;
}

/// @brief Check if the queue is empty.
/// @return True if the queue is empty, false otherwise.
bool is_empty() const
{
return queue.empty();
}

/// @brief Get the number of items in the queue.
/// @return The number of items in the queue.
std::size_t size() const
{
return queue.size();
}

/// @brief Clear the queue.
void clear()
{
queue = {};
Expand All @@ -251,28 +327,94 @@ class SafeQueue : private UnsafeQueue<T>
public:
using value_type = T;

/// @brief Constructor for the safe queue.
/// @param size For backward compatibility.
explicit SafeQueue(std::size_t size) :
Q(size) {}

SafeQueue() = default;

/// @brief Push an item to the queue
/// @tparam U The type of the item to push (must be convertible to value_type).
/// @param item The item to push to the queue.
/// @return True if the item was pushed to the queue.
template<typename U, typename = typename std::enable_if<std::is_convertible<U, value_type>::value>::type>
void push(U &&item)
bool push(U &&item)
{
std::lock_guard<std::mutex> lock(mtx);
return Q::push(std::forward<U>(item));
}

/// @brief Peek at the next item in the queue.
/// @param item The item to peek at in the queue.
/// @return True if the item was peeked at in the queue, false if the queue is empty.
bool peek(value_type &item)
{
std::lock_guard<std::mutex> lock(mtx);
return Q::peek(item);
}

/// @brief Pop an item from the queue.
/// @return True if the item was popped from the queue, false if the queue is empty.
bool pop()
{
std::lock_guard<std::mutex> lock(mtx);
Q::push(std::forward<U>(item));
return Q::pop();
}

/// @brief Pop an item from the queue and return it.
/// @param item Pointer to store the popped item.
/// @return True if the item was popped from the queue, false if the queue is empty.
bool pop(value_type *item)
{
std::lock_guard<std::mutex> lock(mtx);
return Q::pop(item);
}

/// @brief Pop an item from the queue and return it.
/// @param item Reference to store the popped item.
/// @return True if the item was popped from the queue, false if the queue is empty.
bool pop(value_type &item)
{
std::lock_guard<std::mutex> lock(mtx);
return Q::pop(item);
}

/// @brief Check if the queue is full.
/// @return Always returns false, since this version of the queue is not limited in size.(For backward compatibility.)
bool is_full() const
{
std::lock_guard<std::mutex> lock(mtx);
return Q::is_full();
}

/// @brief Check if the queue is empty.
/// @return True if the queue is empty, false otherwise.
bool is_empty() const
{
std::lock_guard<std::mutex> lock(mtx);
return Q::is_empty();
}

/// @brief Get the number of items in the queue.
/// @return The number of items in the queue.
std::size_t size() const
{
std::lock_guard<std::mutex> lock(mtx);
return Q::size();
}

/// @brief Clear the queue.
void clear()
{
std::lock_guard<std::mutex> lock(mtx);
Q::clear();
}

private:
std::mutex mtx;
mutable std::mutex mtx;
};

template<typename T>
using Queue = SafeQueue<T>;
#endif
Expand Down
Loading