diff --git a/include/rosa/support/mqtt/MQTTReader.hpp b/include/rosa/support/mqtt/MQTTReader.hpp index 9af3d8c..97fc08e 100644 --- a/include/rosa/support/mqtt/MQTTReader.hpp +++ b/include/rosa/support/mqtt/MQTTReader.hpp @@ -1,328 +1,316 @@ //===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++ //-*-===// // // The RoSA Framework // // Distributed under the terms and conditions of the Boost Software License 1.0. // See accompanying file LICENSE. // // If you did not receive a copy of the license file, see // http://www.boost.org/LICENSE_1_0.txt. // //===----------------------------------------------------------------------===// /// /// \file rosa/support/MQTT/MQTTReader.hpp /// /// \authors Benedikt Tutzer (benedikt.tutzer@tuwien.ac.at), /// Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at), /// David Juhasz (david.juhasz@tuwien.ac.at) /// /// \date 2020 /// /// \brief Facitilities to read from MQTT brokers. /// //===----------------------------------------------------------------------===// #ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP #define ROSA_SUPPORT_MQTT_MQTTREADER_HPP #include "mqtt/async_client.h" #include "rosa/support/debug.hpp" #include #include #include namespace rosa { namespace MQTT { // @TODO this is a copy of CSVReader.hpp. Move this functionalities to common // file /// Anonymous namespace providing implementation details for /// \c rosa::MQTT::MQTTCallback, consider it private. namespace { /// Provides facility for parsing one value from a string. /// /// \tparam T type of value to parse /// \tparam IsSignedInt if \p T is a signed integral type, always use default /// \tparam IsUnsignedInt if \p T is an unsigned integral type, always use /// default /// \tparam IsFloat if \p T is a floating-point type, always use default /// \tparam IsString if \p T is \c std::string, always use default /// /// \note Specializations of this struct are provided for arithmentic types /// and \c std::string. template ::value && std::is_signed::value), bool IsUnsignedInt = (std::is_integral::value && std::is_unsigned::value), bool IsFloat = std::is_floating_point::value, bool IsString = std::is_same::value> struct ValueParser { /// /// /// \param Cell the \c std::string to parse /// /// \return the parsed value /// /// \note The function silently fails if cannot parse \p Cell for type \p T. static T parse(const std::string &Cell) noexcept; }; template struct ValueParser { STATIC_ASSERT((std::is_integral::value && std::is_signed::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stoll(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_integral::value && std::is_unsigned::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stoull(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_floating_point::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stold(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_same::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return Cell; } }; } // End namespace /** * Local callback & listener class for use with the client connection. * This implementation is to receive messages but the mqtt::callback interface * allows further actions to be defined. */ template class MQTTCallback : public virtual mqtt::callback { /** Callback for when a message arrives. * @param Msg Pointer for the MQTT message **/ void message_arrived(mqtt::const_message_ptr Msg) override { LOG_DEBUG_STREAM << "New value" << std::endl; std::string Message = Msg->to_string(); const auto Value = ValueParser::parse(Message); LOG_DEBUG_STREAM << "Got value " << Value << std::endl; Buffer->push(Value); } /** * This method is called when the connection to the server is lost. * @param Cause Why connection got lost */ void connection_lost(const std::string &Cause) override { LOG_INFO_STREAM << "Connection to MQTT broker lost: " << Cause << std::endl; } private: const std::shared_ptr> Buffer; public: /* Constructor */ MQTTCallback(const std::shared_ptr> Buffer) noexcept : Buffer(Buffer) {} }; /// Provides `InputIterator` features for iterating over messages published in /// an MQTT topic. /// /// \todo Make \c rosa::MQTT::MQTTITerator::Buffer thread-safe. /// \todo Make \c rosa::MQTT::MQTTIterator be able to recover connection in case /// of any error. /// /// \tparam T type of values stored in MQTT messages /// /// /// \note The referred value of the iterator is initialized to a default /// initialized instance of \p T upon creation. The first received message may /// be read in by incrementing the iterator first. Also \see \c /// rosa::MQTT::MQTTIterator /// /// \note The iterator expects each MQTT message to match \p T /// /// \note The implementation relies on \c rosa::MQTT::MQTTCallback, which in /// turn relies on \c rosa::MQTT::ValueParser, which is implemented only for /// `arithmetic` types -- signed and unsigned integral types and floating-point /// types -- and for \c std::string. Those are the valid values for \p T template class MQTTIterator { public: /// \defgroup MQTTIteratorTypedefs Typedefs of rosa::MQTT::MQTTIterator /// /// Standard `typedef`s for iterators. /// ///@{ typedef std::input_iterator_tag iterator_category; ///< Category of the iterator. typedef T value_type; ///< Type of values iterated over. typedef std::size_t difference_type; ///< Type to identify distance. typedef T *pointer; ///< Pointer to the type iterated over. typedef T &reference; ///< Reference to the type iterated over. ///@} /// Creates a new instance. /// /// \param MQTTTopic MQTT topic to subscribe to /// \param Blocking Whether to block if next value is not arrived when /// incrementing /// \param ServerHost Hostname of MQTT broker to connect to /// \param ServerPort Port number of MQTT broker to connect to /// /// \note If \p Blocking, incrementing the iterator blocks in case the next /// value has not arrived yet. If not \p Blocking, incrementing the iterator /// has no effect when the next value is not arrived yet; \c nextReady() can /// be used to check whether incrementing can move to the next value. MQTTIterator(const std::string &MQTTTopic, const bool Blocking = true, const std::string ServerHost = "localhost", const uint16_t ServerPort = 1883) noexcept : Empty(false), Blocking(Blocking), ServerHost(ServerHost), - ServerPort(ServerPort), MQTTTopic(MQTTTopic), + ServerPort(ServerPort), MQTTTopic(MQTTTopic), Current(), Buffer(std::make_shared>()), Callback(std::make_shared>(Buffer)) { - // Initialize the Buffer with a default value. - Buffer->push(T()); - // Connect and register callback. std::stringstream ss; ss << "tcp://" << ServerHost << ":" << ServerPort; const std::string ServerURI = ss.str(); LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl; Client = std::make_shared(ServerURI, ""); // @note async_client is destructed when the shared pointer is destructed. LOG_INFO_STREAM << "Connecting to server" << std::endl; Client->connect()->wait(); LOG_INFO_STREAM << "Receiving messages from topic '" << MQTTTopic << "' for a short while..." << std::endl; Client->set_callback(*Callback); Client->subscribe(this->MQTTTopic, {}); } /// Creates an empty new instance. MQTTIterator(void) noexcept : Empty(true), Blocking(false), ServerHost(""), ServerPort(0), - MQTTTopic(""), Buffer(std::make_shared>()), - Callback(nullptr) { - // Put an element into Buffer and will keep it forever. - Buffer->push(T()); - } + MQTTTopic(""), Current(), Buffer(std::make_shared>()), + Callback(nullptr) {} /// Tells whether incrementing \p this can move to the next value. /// /// \return if the next value has already been received - inline bool nextReady(void) const noexcept { return Buffer->size() > 1; } + inline bool nextReady(void) const noexcept { return !Buffer->empty(); } /// Pre-increment operator. /// /// The implementation reads the next value. If If the end of the input stream is /// reached, the operator becomes empty and has no further effect. /// /// \return \p this object after incrementing it. MQTTIterator &operator++() { - ASSERT(!Buffer->empty()); - // Wait if Blocking and next value is not ready yet. auto &out = LOG_DEBUG_STREAM; out << "Getting next value "; while (Blocking && !nextReady()) { out << '+'; } out << '.' << std::endl; // Next value is ready, or not Blocking and so we leave the last one if (nextReady()) { LOG_DEBUG_STREAM << "Moving MQTTIterator to next value." << std::endl; + Current = Buffer->front(); Buffer->pop(); } else { LOG_DEBUG_STREAM << "Trying to move MQTTIterator without next value." << std::endl; } return *this; } /// Post-increment operator. /// /// The implementation uses the pre-increment operator and returns a copy of /// the original state of \p this object. /// /// \return \p this object before incrementing it. MQTTIterator operator++(int) { MQTTIterator Tmp(*this); ++(*this); return Tmp; } /// Returns a constant reference to the current entry. /// /// \note Should not dereference the iterator when it is empty. /// /// \return constant reference to the current entry. - const T &operator*(void)const noexcept { - ASSERT(!Buffer->empty()); - return Buffer->front(); - } + const T &operator*(void)const noexcept { return Current; } /// Returns a constant pointer to the current entry. /// /// \note Should not dereference the iterator when it is empty. /// /// \return constant pointer to the current entry. - const T *operator->(void)const noexcept { - ASSERT(!Buffer->empty()); - return &(Buffer->front()); - } + const T *operator->(void)const noexcept { return &Current; } /// Tells if \p this object is equal to another one. /// /// Two \c rosa::MQTT::MQTTIterator instances are equal if and only if they /// are the same or both are empty. /// /// \param RHS other object to compare to /// /// \return whether \p this object is equal with \p RHS bool operator==(const MQTTIterator &RHS) const noexcept { return this == &RHS || (this->Empty && RHS.Empty); } /// Tells if \p this object is not equal to another one. /// /// \see rosa::MQTT::MQTTIterator::operator== /// /// \param RHS other object to compare to /// /// \return whether \p this object is not equal with \p RHS. bool operator!=(const MQTTIterator &RHS) const noexcept { return !((*this) == RHS); } private: const bool Empty; const bool Blocking; const std::string ServerHost; const uint16_t ServerPort; const std::string MQTTTopic; + T Current; const std::shared_ptr> Buffer; const std::shared_ptr> Callback; std::shared_ptr Client; }; } // End namespace MQTT } // End namespace rosa #endif // ROSA_SUPPORT_MQTT_MQTTREADER_HPP