diff --git a/apps/ccam/ccam.cpp b/apps/ccam/ccam.cpp index 01234b2..3ff011f 100644 --- a/apps/ccam/ccam.cpp +++ b/apps/ccam/ccam.cpp @@ -1,569 +1,563 @@ //===-- apps/ccam/ccam.cpp --------------------------------------*- C++ -*-===// // // The RoSA Framework -- Application CCAM // // Distributed under the terms and conditions of the Boost Software /// License 1.0. // See accompanying file LICENSE. // // If you did not receive a copy of the license file, see // http://www.boost.org/LICENSE_1_0.txt. // //===----------------------------------------------------------------------===// /// /// \file apps/ccam/ccam.cpp /// /// \author Maximilian Goetzinger (maximilian.goetzinger@tuwien.ac.at) /// \author Benedikt Tutzer (benedikt.tutzer@tuwien.ac.at) /// /// \date 2019 /// /// \brief The application CCAM implements the case study from the paper: /// M. Goetzinger, N. TaheriNejad, H. A. Kholerdi, A. Jantsch, E. Willegger, /// T. Glatzl, A.M. Rahmani, T.Sauter, P. Liljeberg: Model - Free Condition /// Monitoring with Confidence /// /// \todo Clean up source files of this app: add standard RoSA header comment /// for own files and do something with 3rd party files... //===----------------------------------------------------------------------===// #include "rosa/agent/Abstraction.hpp" #include "rosa/agent/Confidence.hpp" #include "rosa/agent/FunctionAbstractions.hpp" #include #include "rosa/config/version.h" #include "rosa/agent/SignalStateDetector.hpp" #include "rosa/agent/SystemStateDetector.hpp" #include "rosa/app/Application.hpp" #include "rosa/support/csv/CSVReader.hpp" #include "rosa/support/csv/CSVWriter.hpp" #include "rosa/support/mqtt/MQTTReader.hpp" #include "rosa/app/AppTuple.hpp" #include #include #include #include #include "configuration.h" #include "statehandlerutils.h" using namespace rosa; using namespace rosa::agent; using namespace rosa::app; using namespace rosa::terminal; using namespace rosa::mqtt; const std::string AppName = "CCAM"; int main(int argc, char **argv) { LOG_INFO_STREAM << '\n' << library_string() << " -- " << Color::Red << AppName << "app" << Color::Default << '\n'; // // Read the filepath of the config file of the observed system. The filepath // is in the first argument passed to the application. Fuzzy functions etc. // are described in this file. // if (argc < 2) { LOG_ERROR("Specify config File!\nUsage:\n\tccam config.json"); return 1; } std::string ConfigPath = argv[1]; // // Load config file and read in all parameters. Fuzzy functions etc. are // described in this file. // if (!readConfigFile(ConfigPath)) { LOG_ERROR_STREAM << "Could not read config from \"" << ConfigPath << "\"\n"; return 2; } // // Create a CCAM context. // LOG_INFO("Creating Context"); std::unique_ptr AppCCAM = Application::create(AppName); // // Create following function which shall give information if the time gap // between changed input(s) and changed output(s) shows already a malfunction // of the system. // // ____________ // / // / // __________/ // std::shared_ptr> BrokenDelayFunction( new PartialFunction( {{{0, AppConfig.BrokenCounter}, std::make_shared>( 0, 0.f, AppConfig.BrokenCounter, 1.f)}, {{AppConfig.BrokenCounter, std::numeric_limits::max()}, std::make_shared>(1.f, 0.f)}}, 0.f)); // // Create following function which shall give information if the time gap // between changed input(s) and changed output(s) still shows a // well-functioning system. // // ____________ // \ // \ // \__________ // std::shared_ptr> OkDelayFunction( new PartialFunction( {{{0, AppConfig.BrokenCounter}, std::make_shared>( 0, 1.f, AppConfig.BrokenCounter, 0.f)}, {{AppConfig.BrokenCounter, std::numeric_limits::max()}, std::make_shared>(0.f, 0.f)}}, 1.f)); // // Create a AppAgent with SystemStateDetector functionality. // LOG_INFO("Create SystemStateDetector agent."); AgentHandle SystemStateDetectorAgent = createSystemStateDetectorAgent( AppCCAM, "SystemStateDetector", AppConfig.SignalConfigurations.size(), BrokenDelayFunction, OkDelayFunction); // // Set policy of SystemStateDetectorAgent that it wait for all // SignalStateDetectorAgents // std::set pos; for (size_t i = 0; i < AppConfig.SignalConfigurations.size(); ++i) pos.insert(pos.end(), i); AppCCAM->setExecutionPolicy(SystemStateDetectorAgent, AppExecutionPolicy::awaitAll(pos)); // // Create Vectors for all sensors, all signal related fuzzy functions, all // signal state detectors, all signal state agents, and all input data files. // LOG_INFO("Creating sensors, SignalStateDetector functionalities and their " "Abstractions."); std::vector Sensors; std::vector>> SampleMatchesFunctions; std::vector>> SampleMismatchesFunctions; std::vector>> SignalIsStableFunctions; std::vector>> SignalIsDriftingFunctions; std::vector>> NumOfSamplesMatchFunctions; std::vector>> NumOfSamplesMismatchFunctions; std::vector>> SampleValidFunctions; std::vector>> SampleInvalidFunctions; std::vector>> NumOfSamplesValidFunctions; std::vector>> NumOfSamplesInvalidFunctions; std::vector>> SignalStateDetectors; std::vector SignalStateDetectorAgents; std::vector DataFiles; // // Go through all signal state configurations (number of signals), and create // functionalities for SignalStateDetector. // for (auto SignalConfiguration : AppConfig.SignalConfigurations) { // // Create application sensors. // Sensors.emplace_back( AppCCAM->createSensor(SignalConfiguration.Name + "_Sensor")); // // Create following function(s) which shall give information whether one // sample matches another one (based on the relative distance between them). // // ____________ // / \ // / \ // __________/ \__________ // // SampleMatchesFunctions.emplace_back(new PartialFunction( { {{-SignalConfiguration.OuterBound, -SignalConfiguration.InnerBound}, std::make_shared>( -SignalConfiguration.OuterBound, 0.f, -SignalConfiguration.InnerBound, 1.f)}, {{-SignalConfiguration.InnerBound, SignalConfiguration.InnerBound}, std::make_shared>(1.f, 0.f)}, {{SignalConfiguration.InnerBound, SignalConfiguration.OuterBound}, std::make_shared>( SignalConfiguration.InnerBound, 1.f, SignalConfiguration.OuterBound, 0.f)}, }, 0)); // // Create following function(s) which shall give information whether one // sample mismatches another one (based on the relative distance between // them). // // ____________ ____________ // \ / // \ / // \__________/ // // SampleMismatchesFunctions.emplace_back(new PartialFunction( { {{-SignalConfiguration.OuterBound, -SignalConfiguration.InnerBound}, std::make_shared>( -SignalConfiguration.OuterBound, 1.f, -SignalConfiguration.InnerBound, 0.f)}, {{-SignalConfiguration.InnerBound, SignalConfiguration.InnerBound}, std::make_shared>(0.f, 0.f)}, {{SignalConfiguration.InnerBound, SignalConfiguration.OuterBound}, std::make_shared>( SignalConfiguration.InnerBound, 0.f, SignalConfiguration.OuterBound, 1.f)}, }, 1)); // // Create following function(s) which shall give information whether a // signal is stable. // // ____________ // / \ // / \ // __________/ \__________ // // SignalIsStableFunctions.emplace_back(new PartialFunction( { {{-SignalConfiguration.OuterBoundDrift, -SignalConfiguration.InnerBoundDrift}, std::make_shared>( -SignalConfiguration.OuterBoundDrift, 0.f, -SignalConfiguration.InnerBoundDrift, 1.f)}, {{-SignalConfiguration.InnerBoundDrift, SignalConfiguration.InnerBoundDrift}, std::make_shared>(1.f, 0.f)}, {{SignalConfiguration.InnerBoundDrift, SignalConfiguration.OuterBoundDrift}, std::make_shared>( SignalConfiguration.InnerBoundDrift, 1.f, SignalConfiguration.OuterBoundDrift, 0.f)}, }, 0)); // // Create following function(s) which shall give information whether a // signal is drifting. // // ____________ ____________ // \ / // \ / // \__________/ // // SignalIsDriftingFunctions.emplace_back(new PartialFunction( { {{-SignalConfiguration.OuterBoundDrift, -SignalConfiguration.InnerBoundDrift}, std::make_shared>( -SignalConfiguration.OuterBoundDrift, 1.f, -SignalConfiguration.InnerBoundDrift, 0.f)}, {{-SignalConfiguration.InnerBoundDrift, SignalConfiguration.InnerBoundDrift}, std::make_shared>(0.f, 0.f)}, {{SignalConfiguration.InnerBoundDrift, SignalConfiguration.OuterBoundDrift}, std::make_shared>( SignalConfiguration.InnerBoundDrift, 0.f, SignalConfiguration.OuterBoundDrift, 1.f)}, }, 1)); // // Create following function(s) which shall give information how many // history samples match another sample. // // ____________ // / // / // __________/ // NumOfSamplesMatchFunctions.emplace_back(new StepFunction( 1.0f / SignalConfiguration.SampleHistorySize, StepDirection::StepUp)); // // Create following function(s) which shall give information how many // history samples mismatch another sample. // // ____________ // \ // \ // \__________ // NumOfSamplesMismatchFunctions.emplace_back(new StepFunction( 1.0f / SignalConfiguration.SampleHistorySize, StepDirection::StepDown)); // // Create following function(s) which shall give information how good all // samples in a state match each other. // // ____________ // / \ // / \ // __________/ \__________ // // SampleValidFunctions.emplace_back(new PartialFunction( { {{-SignalConfiguration.OuterBound, -SignalConfiguration.InnerBound}, std::make_shared>( -SignalConfiguration.OuterBound, 0.f, -SignalConfiguration.InnerBound, 1.f)}, {{-SignalConfiguration.InnerBound, SignalConfiguration.InnerBound}, std::make_shared>(1.f, 0.f)}, {{SignalConfiguration.InnerBound, SignalConfiguration.OuterBound}, std::make_shared>( SignalConfiguration.InnerBound, 1.f, SignalConfiguration.OuterBound, 0.f)}, }, 0)); // // Create following function(s) which shall give information how good all // samples in a state mismatch each other. // // ____________ ____________ // \ / // \ / // \__________/ // // SampleInvalidFunctions.emplace_back(new PartialFunction( { {{-SignalConfiguration.OuterBound, -SignalConfiguration.InnerBound}, std::make_shared>( -SignalConfiguration.OuterBound, 1.f, -SignalConfiguration.InnerBound, 0.f)}, {{-SignalConfiguration.InnerBound, SignalConfiguration.InnerBound}, std::make_shared>(0.f, 0.f)}, {{SignalConfiguration.InnerBound, SignalConfiguration.OuterBound}, std::make_shared>( SignalConfiguration.InnerBound, 0.f, SignalConfiguration.OuterBound, 1.f)}, }, 1)); // // Create following function(s) which shall give information how many // history samples match each other. // // ____________ // / // / // __________/ // NumOfSamplesValidFunctions.emplace_back(new StepFunction( 1.0f / SignalConfiguration.SampleHistorySize, StepDirection::StepUp)); // // Create following function(s) which shall give information how many // history samples mismatch each other. // // ____________ // \ // \ // \__________ // NumOfSamplesInvalidFunctions.emplace_back(new StepFunction( 1.0f / SignalConfiguration.SampleHistorySize, StepDirection::StepDown)); // // Create SignalStateDetector functionality // SignalStateDetectors.emplace_back( new SignalStateDetector( SignalConfiguration.Output ? SignalProperties::OUTPUT : SignalProperties::INPUT, std::numeric_limits::max(), SampleMatchesFunctions.back(), SampleMismatchesFunctions.back(), NumOfSamplesMatchFunctions.back(), NumOfSamplesMismatchFunctions.back(), SampleValidFunctions.back(), SampleInvalidFunctions.back(), NumOfSamplesValidFunctions.back(), NumOfSamplesInvalidFunctions.back(), SignalIsDriftingFunctions.back(), SignalIsStableFunctions.back(), SignalConfiguration.SampleHistorySize, SignalConfiguration.DABSize, SignalConfiguration.DABHistorySize)); // // Create low-level application agents // SignalStateDetectorAgents.push_back(createSignalStateDetectorAgent( AppCCAM, SignalConfiguration.Name, SignalStateDetectors.back())); AppCCAM->setExecutionPolicy( SignalStateDetectorAgents.back(), AppExecutionPolicy::decimation(AppConfig.DownsamplingRate)); // // Connect sensors to low-level agents. // LOG_INFO("Connect sensors to their corresponding low-level agents."); AppCCAM->connectSensor(SignalStateDetectorAgents.back(), 0, Sensors.back(), SignalConfiguration.Name + "_Sensor ->" + SignalConfiguration.Name + "_SignalStateDetector_Agent-Channel"); AppCCAM->connectAgents( SystemStateDetectorAgent, SignalStateDetectors.size() - 1, SignalStateDetectorAgents.back(), SignalConfiguration.Name + "_SignalStateDetector_Agent->SystemStateDetector_Agent_Channel"); } // // For simulation output, create a logger agent writing the output of the // high-level agent into a CSV file. // LOG_INFO("Create a logger agent."); // Create CSV writer. std::ofstream OutputCSV(AppConfig.OutputFilePath); for (auto SignalConfiguration : AppConfig.SignalConfigurations) { OutputCSV << SignalConfiguration.Name + ","; } OutputCSV << "StateID,"; OutputCSV << "Confidence State Valid,"; OutputCSV << "Confidence State Invalid,"; OutputCSV << "Confidence Inputs Matching,"; OutputCSV << "Confidence Outputs Matching,"; OutputCSV << "Confidence Inputs Mismatching,"; OutputCSV << "Confidence Outputs Mismatching,"; OutputCSV << "State Condition,"; OutputCSV << "Confidence System Functioning,"; OutputCSV << "Confidence System Malfunctioning,"; OutputCSV << "Overall Confidence,"; OutputCSV << "\n"; // The agent writes each new input value into a CSV file and produces // nothing. using Input = std::pair; using Result = Optional>; using Handler = std::function; std::string Name = "Logger Agent"; AgentHandle LoggerAgent = AppCCAM->createAgent( "Logger Agent", Handler([&OutputCSV](Input I) -> Result { const SystemStateTuple &T = I.first; OutputCSV << std::get<0>( static_cast &>(T)) << std::endl; return Result(); })); // // Connect the high-level agent to the logger agent. // LOG_INFO("Connect the high-level agent to the logger agent."); AppCCAM->connectAgents(LoggerAgent, 0, SystemStateDetectorAgent, "SystemStateDetector Channel"); // // Only log if the SystemStateDetector actually ran // AppCCAM->setExecutionPolicy(LoggerAgent, AppExecutionPolicy::awaitAll({0})); // // Do simulation. // LOG_INFO("Setting up and performing simulation."); // // Initialize application for simulation. // AppCCAM->initializeSimulation(); // // Open CSV files and register them for their corresponding sensors. // // Make sure DataFiles will not change capacity while adding elements to it. // Changing capacity moves elements away, which invalidates references // captured by CSVIterator. DataFiles.reserve(AppConfig.SignalConfigurations.size()); uint32_t i = 0; for (auto SignalConfiguration : AppConfig.SignalConfigurations) { switch (SignalConfiguration.DataInterfaceType) { case DataInterfaceTypes::CSV: DataFiles.emplace_back(SignalConfiguration.InputPath); if (!DataFiles.at(i)) { LOG_ERROR_STREAM << "Cannot open Input File \"" << SignalConfiguration.InputPath << "\" for Signal \"" << SignalConfiguration.Name << "\"" << std::endl; return 3; } AppCCAM->registerSensorValues(Sensors.at(i), csv::CSVIterator(DataFiles.at(i)), csv::CSVIterator()); std::cout << "Sensor " << SignalConfiguration.Name << " is fed by csv file " << SignalConfiguration.InputPath << std::endl; break; case DataInterfaceTypes::MQTT: { // Cannot tolerate the first default-constructed dummy value of the // iterator, need to wait for the first incoming message here. auto it = MQTTIterator(SignalConfiguration.MQTTTopic); - auto &out = LOG_DEBUG_STREAM; - out << "Waiting for the first message "; - while (!it.nextReady()) { - out << '+'; - } - ++it; // Read in the first value - out << '.' << std::endl; + ++it; // Read in the first value blocking AppCCAM->registerSensorValues(Sensors.at(i), std::move(it), MQTTIterator()); std::cout << "Sensor " << SignalConfiguration.Name << " is fed by MQTT topic " << SignalConfiguration.MQTTTopic << std::endl; break; } default: LOG_ERROR_STREAM << "No data source for " << SignalConfiguration.Name << std::endl; break; } i++; std::cout << "blub1" << std::endl; } // // Start simulation. // AppCCAM->simulate(AppConfig.NumberOfSimulationCycles); return 0; } diff --git a/include/rosa/support/concurrent_queue.hpp b/include/rosa/support/concurrent_queue.hpp new file mode 100644 index 0000000..335e1c8 --- /dev/null +++ b/include/rosa/support/concurrent_queue.hpp @@ -0,0 +1,102 @@ +//===-- rosa/support/concurrent_queue.hpp -----------------------*- C++ -*-===// +// +// The RoSA Framework +// +// Distributed under the terms and conditions of the Boost Software License 1.0. +// See accompanying file LICENSE. +// +// If you did not receive a copy of the license file, see +// http://www.boost.org/LICENSE_1_0.txt. +// +//===----------------------------------------------------------------------===// +/// +/// \file rosa/support/concurrent_queue.hpp +/// +/// \author David Juhasz (david.juhasz@tuwien.ac.at) +/// +/// \date 2020 +/// +/// \brief Thread-safe queue. +/// +//===----------------------------------------------------------------------===// + +#ifndef ROSA_SUPPORT_CONCURRENT_QUEUE_HPP +#define ROSA_SUPPORT_CONCURRENT_QUEUE_HPP + +#include +#include +#include + +namespace rosa { + +/// A thread-safe queue implementation for producer-consumer scenarios. +/// +/// \tparam T element type +template class concurrent_queue { +public: + /// Enumeration type with values indicating outcome of popping from the queue. + /// + /// \see rosa::concurrent_queue::pop() + enum pop_result { + queue_empty, ///< Could not pop a value because the queue is empty + element_popped ///< Popped a value from the queue + }; + + /// Pops an element from the queue. + /// + /// \param Elem [out] variable to store the popped element + /// \param Blocking whether to block until an element is available + /// + /// \note If \p Blocking is true, the function blocks until an element is + /// available in the queue and returns only after having an element popped + /// into \p Elem. Otherwise, the function returns with \c queue_empty + /// immediately if there is no available element in the queue. + /// + /// \return whether an element was popped into \p Elem + pop_result pop(T &Elem, const bool Blocking = true) { + std::unique_lock Lock(Mutex); + if (Queue.empty()) { + if (!Blocking) { + return queue_empty; + } + Cond.wait(Lock, [&](void) { return !Queue.empty(); }); + } + Elem = Queue.front(); + Queue.pop(); + return element_popped; + } + + /// Puts an element into the queue. + /// + /// \param Elem element to put into the queue + void push(const T &Elem) { + std::unique_lock Lock(Mutex); + Queue.push(Elem); + Lock.unlock(); + Cond.notify_one(); + } + + /// Puts an element into the queue. + /// + /// \param Elem element to put into the queue + void push(T &&Elem) { + std::unique_lock Lock(Mutex); + Queue.push(std::move(Elem)); + Lock.unlock(); + Cond.notify_one(); + } + +private: + /// Queue holding elements. + std::queue Queue; + + /// Mutex for providing mutual exclusion of accesses. + std::mutex Mutex; + + /// Used by consumers to lock until data is available. + std::condition_variable Cond; +}; + +} // End namespace rosa + +#endif // ROSA_SUPPORT_CONCURRENT_QUEUE_HPP diff --git a/include/rosa/support/mqtt/MQTTReader.hpp b/include/rosa/support/mqtt/MQTTReader.hpp index 246917b..b2ab80d 100644 --- a/include/rosa/support/mqtt/MQTTReader.hpp +++ b/include/rosa/support/mqtt/MQTTReader.hpp @@ -1,324 +1,307 @@ //===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++ //-*-===// // // The RoSA Framework // // Distributed under the terms and conditions of the Boost Software License 1.0. // See accompanying file LICENSE. // // If you did not receive a copy of the license file, see // http://www.boost.org/LICENSE_1_0.txt. // //===----------------------------------------------------------------------===// /// /// \file rosa/support/MQTT/MQTTReader.hpp /// /// \authors Benedikt Tutzer (benedikt.tutzer@tuwien.ac.at), /// Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at), /// David Juhasz (david.juhasz@tuwien.ac.at) /// /// \date 2020 /// /// \brief Facitilities to read from MQTT brokers. /// //===----------------------------------------------------------------------===// #ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP #define ROSA_SUPPORT_MQTT_MQTTREADER_HPP -#include "rosa/support/diagnostics.h" +#include "rosa/support/concurrent_queue.hpp" #include "rosa/support/debug.hpp" +#include "rosa/support/diagnostics.h" #include "rosa/support/log.h" // NOTE: MSVC warns about some things in the following header. ROSA_DISABLE_WARNING_PUSH ROSA_DISABLE_WARNING_MSVC_C4100 ROSA_DISABLE_WARNING_MSVC_C4201 #include "mqtt/async_client.h" ROSA_DISABLE_WARNING_POP #include -#include #include namespace rosa { namespace mqtt { // @TODO this is a copy of CSVReader.hpp. Move this functionalities to common // file /// Anonymous namespace providing implementation details for /// \c rosa::mqtt::MQTTCallback, consider it private. namespace { /// Provides facility for parsing one value from a string. /// /// \tparam T type of value to parse /// \tparam IsSignedInt if \p T is a signed integral type, always use default /// \tparam IsUnsignedInt if \p T is an unsigned integral type, always use /// default /// \tparam IsFloat if \p T is a floating-point type, always use default /// \tparam IsString if \p T is \c std::string, always use default /// /// \note Specializations of this struct are provided for arithmentic types /// and \c std::string. template ::value && std::is_signed::value), bool IsUnsignedInt = (std::is_integral::value && std::is_unsigned::value), bool IsFloat = std::is_floating_point::value, bool IsString = std::is_same::value> struct ValueParser { /// /// /// \param Cell the \c std::string to parse /// /// \return the parsed value /// /// \note The function silently fails if cannot parse \p Cell for type \p T. static T parse(const std::string &Cell) noexcept; }; template struct ValueParser { STATIC_ASSERT((std::is_integral::value && std::is_signed::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stoll(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_integral::value && std::is_unsigned::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stoull(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_floating_point::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stold(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_same::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return Cell; } }; } // End namespace /** * Local callback & listener class for use with the client connection. * This implementation is to receive messages but the ::mqtt::callback interface * allows further actions to be defined. */ template class MQTTCallback : public virtual ::mqtt::callback { /** Callback for when a message arrives. * @param Msg Pointer for the MQTT message **/ void message_arrived(::mqtt::const_message_ptr Msg) override { LOG_DEBUG_STREAM << "New value" << std::endl; std::string Message = Msg->to_string(); const auto Value = ValueParser::parse(Message); LOG_DEBUG_STREAM << "Got value " << Value << std::endl; Buffer->push(Value); } /** * This method is called when the connection to the server is lost. * @param Cause Why connection got lost */ void connection_lost(const std::string &Cause) override { LOG_INFO_STREAM << "Connection to MQTT broker lost: " << Cause << std::endl; } private: - const std::shared_ptr> Buffer; + const std::shared_ptr> Buffer; public: /* Constructor */ - MQTTCallback(const std::shared_ptr> Buffer) noexcept + MQTTCallback(const std::shared_ptr> Buffer) noexcept : Buffer(Buffer) {} }; /// Provides `InputIterator` features for iterating over messages published in /// an MQTT topic. /// -/// \todo Make \c rosa::mqtt::MQTTITerator::Buffer thread-safe. /// \todo Make \c rosa::mqtt::MQTTIterator be able to recover connection in case /// of any error. /// /// \tparam T type of values stored in MQTT messages /// /// /// \note The referred value of the iterator is initialized to a default /// initialized instance of \p T upon creation. The first received message may /// be read in by incrementing the iterator first. Also \see \c /// rosa::mqtt::MQTTIterator /// /// \note The iterator expects each MQTT message to match \p T /// /// \note The implementation relies on \c rosa::mqtt::MQTTCallback, which in /// turn relies on \c rosa::mqtt::ValueParser, which is implemented only for /// `arithmetic` types -- signed and unsigned integral types and floating-point /// types -- and for \c std::string. Those are the valid values for \p T template class MQTTIterator { public: /// \defgroup MQTTIteratorTypedefs Typedefs of rosa::mqtt::MQTTIterator /// /// Standard `typedef`s for iterators. /// ///@{ typedef std::input_iterator_tag iterator_category; ///< Category of the iterator. typedef T value_type; ///< Type of values iterated over. typedef std::size_t difference_type; ///< Type to identify distance. typedef T *pointer; ///< Pointer to the type iterated over. typedef T &reference; ///< Reference to the type iterated over. ///@} /// Creates a new instance. /// /// \param MQTTTopic MQTT topic to subscribe to /// \param Blocking Whether to block if next value is not arrived when /// incrementing /// \param ServerHost Hostname of MQTT broker to connect to /// \param ServerPort Port number of MQTT broker to connect to /// /// \note If \p Blocking, incrementing the iterator blocks in case the next /// value has not arrived yet. If not \p Blocking, incrementing the iterator - /// has no effect when the next value is not arrived yet; \c nextReady() can - /// be used to check whether incrementing can move to the next value. + /// has no effect when the next value is not arrived yet. MQTTIterator(const std::string &MQTTTopic, const bool Blocking = true, const std::string ServerHost = "localhost", const uint16_t ServerPort = 1883) noexcept : Empty(false), Blocking(Blocking), ServerHost(ServerHost), ServerPort(ServerPort), MQTTTopic(MQTTTopic), Current(), - Buffer(std::make_shared>()), + Buffer(std::make_shared>()), Callback(std::make_shared>(Buffer)) { // Connect and register callback. std::stringstream ss; ss << "tcp://" << ServerHost << ":" << ServerPort; const std::string ServerURI = ss.str(); LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl; Client = std::make_shared<::mqtt::async_client>(ServerURI, ""); // @note async_client is destructed when the shared pointer is destructed. LOG_INFO_STREAM << "Connecting to server" << std::endl; Client->connect()->wait(); LOG_INFO_STREAM << "Receiving messages from topic '" << MQTTTopic << "' for a short while..." << std::endl; Client->set_callback(*Callback); Client->subscribe(this->MQTTTopic, {}); } /// Creates an empty new instance. MQTTIterator(void) noexcept : Empty(true), Blocking(false), ServerHost(""), ServerPort(0), - MQTTTopic(""), Current(), Buffer(std::make_shared>()), - Callback(nullptr) {} - - /// Tells whether incrementing \p this can move to the next value. - /// - /// \return if the next value has already been received - inline bool nextReady(void) const noexcept { return !Buffer->empty(); } + MQTTTopic(""), Current(), + Buffer(std::make_shared>()), Callback(nullptr) {} /// Pre-increment operator. /// - /// The implementation reads the next value. If If the end of the input stream is - /// reached, the operator becomes empty and has no further effect. + /// The implementation reads the next value. The operation blocks if \c + /// Blocking and the next value has not arrived yet. If not \c Blocking and no + /// new value is ready yet, the oeprator has no effect. /// /// \return \p this object after incrementing it. MQTTIterator &operator++() { - // Wait if Blocking and next value is not ready yet. - auto &out = LOG_DEBUG_STREAM; - out << "Getting next value "; - while (Blocking && !nextReady()) { - out << '+'; - } - out << '.' << std::endl; - - // Next value is ready, or not Blocking and so we leave the last one - if (nextReady()) { - LOG_DEBUG_STREAM << "Moving MQTTIterator to next value." << std::endl; - Current = Buffer->front(); - Buffer->pop(); + LOG_DEBUG_STREAM << "Getting next value..." << std::endl; + if (Buffer->pop(Current, Blocking) == concurrent_queue::element_popped) { + LOG_DEBUG_STREAM << "Got next value." << std::endl; } else { - LOG_DEBUG_STREAM << "Trying to move MQTTIterator without next value." - << std::endl; + LOG_DEBUG_STREAM << "Next value is not ready yet." << std::endl; } return *this; } /// Post-increment operator. /// /// The implementation uses the pre-increment operator and returns a copy of /// the original state of \p this object. /// /// \return \p this object before incrementing it. MQTTIterator operator++(int) { MQTTIterator Tmp(*this); ++(*this); return Tmp; } /// Returns a constant reference to the current entry. /// /// \note Should not dereference the iterator when it is empty. /// /// \return constant reference to the current entry. const T &operator*(void)const noexcept { return Current; } /// Returns a constant pointer to the current entry. /// /// \note Should not dereference the iterator when it is empty. /// /// \return constant pointer to the current entry. const T *operator->(void)const noexcept { return &Current; } /// Tells if \p this object is equal to another one. /// /// Two \c rosa::mqtt::MQTTIterator instances are equal if and only if they /// are the same or both are empty. /// /// \param RHS other object to compare to /// /// \return whether \p this object is equal with \p RHS bool operator==(const MQTTIterator &RHS) const noexcept { return this == &RHS || (this->Empty && RHS.Empty); } /// Tells if \p this object is not equal to another one. /// /// \see rosa::mqtt::MQTTIterator::operator== /// /// \param RHS other object to compare to /// /// \return whether \p this object is not equal with \p RHS. bool operator!=(const MQTTIterator &RHS) const noexcept { return !((*this) == RHS); } private: const bool Empty; const bool Blocking; const std::string ServerHost; const uint16_t ServerPort; const std::string MQTTTopic; T Current; - const std::shared_ptr> Buffer; + const std::shared_ptr> Buffer; const std::shared_ptr> Callback; std::shared_ptr<::mqtt::async_client> Client; }; } // End namespace mqtt } // End namespace rosa #endif // ROSA_SUPPORT_MQTT_MQTTREADER_HPP diff --git a/lib/support/CMakeLists.txt b/lib/support/CMakeLists.txt index 076bc5c..cb978b5 100644 --- a/lib/support/CMakeLists.txt +++ b/lib/support/CMakeLists.txt @@ -1,53 +1,55 @@ set(LIB_INCLUDE_DIR ${ROSA_MAIN_INCLUDE_DIR}/rosa/support) add_library(ROSASupport ${LIB_INCLUDE_DIR}/debug.hpp debug.cpp ${LIB_INCLUDE_DIR}/terminal_colors.h terminal_colors.cpp ${LIB_INCLUDE_DIR}/log.h log.cpp ${LIB_INCLUDE_DIR}/math.hpp math.cpp ${LIB_INCLUDE_DIR}/type_helper.hpp type_helper.cpp ${LIB_INCLUDE_DIR}/types.hpp types.cpp ${LIB_INCLUDE_DIR}/atom.hpp atom.cpp ${LIB_INCLUDE_DIR}/type_pair.hpp type_pair.cpp ${LIB_INCLUDE_DIR}/type_list.hpp type_list.cpp ${LIB_INCLUDE_DIR}/squashed_int.hpp squashed_int.cpp ${LIB_INCLUDE_DIR}/type_numbers.hpp type_numbers.cpp ${LIB_INCLUDE_DIR}/type_token.hpp type_token.cpp ${LIB_INCLUDE_DIR}/tokenized_storages.hpp tokenized_storages.cpp ${LIB_INCLUDE_DIR}/sequence.hpp sequence.cpp + ${LIB_INCLUDE_DIR}/concurrent_queue.hpp + concurrent_queue.cpp ${LIB_INCLUDE_DIR}/csv/namespace.h csv/namespace.cpp ${LIB_INCLUDE_DIR}/csv/CSVReader.hpp csv/CSVReader.cpp ${LIB_INCLUDE_DIR}/csv/CSVWriter.hpp csv/CSVWriter.cpp ${LIB_INCLUDE_DIR}/mqtt/namespace.h mqtt/namespace.cpp ${LIB_INCLUDE_DIR}/mqtt/MQTTReader.hpp mqtt/MQTTReader.cpp ${LIB_INCLUDE_DIR}/iterator/namespace.h iterator/namespace.cpp ${LIB_INCLUDE_DIR}/iterator/split_tuple_iterator.hpp iterator/split_tuple_iterator.cpp ${LIB_INCLUDE_DIR}/writer/namespace.h writer/namespace.cpp ${LIB_INCLUDE_DIR}/writer/split_tuple_writer.hpp writer/split_tuple_writer.cpp ) ROSA_add_library_dependencies(ROSASupport paho-mqttpp3) ROSA_add_library_dependencies(ROSASupport paho-mqttc3::MQTTAsync) diff --git a/lib/support/concurrent_queue.cpp b/lib/support/concurrent_queue.cpp new file mode 100644 index 0000000..5091ed4 --- /dev/null +++ b/lib/support/concurrent_queue.cpp @@ -0,0 +1,25 @@ +//===-- rosa/support/concurrent_queue.hpp -----------------------*- C++ -*-===// +// +// The RoSA Framework +// +// Distributed under the terms and conditions of the Boost Software License 1.0. +// See accompanying file LICENSE. +// +// If you did not receive a copy of the license file, see +// http://www.boost.org/LICENSE_1_0.txt. +// +//===----------------------------------------------------------------------===// +/// +/// \file rosa/support/concurrent_queue.hpp +/// +/// \author David Juhasz (david.juhasz@tuwien.ac.at) +/// +/// \date 2020 +/// +/// \brief Implementation for rosa/support/concurrent_queue.hpp. +/// +/// \note Empty implementation, source file here to have a compile database +/// entry for rosa/support/concurrent_queue.hpp. +//===----------------------------------------------------------------------===// + +#include "rosa/support/concurrent_queue.hpp"