Skip to content

Commit ac4133d

Browse files
committed
mqtt subscriber fb: component status protection
1 parent 2284d04 commit ac4133d

2 files changed

Lines changed: 3 additions & 1 deletion

File tree

modules/mqtt_streaming_module/include/mqtt_streaming_module/mqtt_subscriber_fb_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class MqttSubscriberFbImpl final : public FunctionBlock
8686
std::mutex queueMutex;
8787
std::condition_variable queueCv;
8888
mutable std::recursive_mutex processingMutex;
89+
std::mutex componentStatusMutex;
8990

9091
DAQ_MQTT_STREAM_MODULE_API void onSignalsMessage(const mqtt::MqttAsyncClient& subscriber, const mqtt::MqttMessage& msg);
9192

modules/mqtt_streaming_module/src/mqtt_subscriber_fb_impl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ void MqttSubscriberFbImpl::updateStatuses()
101101
if (!statuses->isUpdated())
102102
return;
103103

104+
std::scoped_lock lock(componentStatusMutex);
105+
104106
if (!jsonConfigErr.ok())
105107
{
106108
setComponentStatusWithMessage(ComponentStatus::Error, jsonConfigErr.buildStatusMessage());
@@ -508,7 +510,6 @@ void MqttSubscriberFbImpl::processMessageImpl(const mqtt::MqttMessage& msg, cons
508510
decoderFb->processMessage(jsonObjStr, epochTime);
509511
}
510512
}
511-
updateStatuses();
512513
}
513514

514515
void MqttSubscriberFbImpl::processingLoop()

0 commit comments

Comments
 (0)