diff --git a/include/rosa/support/mqtt/MQTTReader.hpp b/include/rosa/support/mqtt/MQTTReader.hpp index a52f80d..85b5ca9 100644 --- a/include/rosa/support/mqtt/MQTTReader.hpp +++ b/include/rosa/support/mqtt/MQTTReader.hpp @@ -1,326 +1,352 @@ //===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++ //-*-===// // // The RoSA Framework // // Distributed under the terms and conditions of the Boost Software License 1.0. // See accompanying file LICENSE. // // If you did not receive a copy of the license file, see // http://www.boost.org/LICENSE_1_0.txt. // //===----------------------------------------------------------------------===// /// /// \file rosa/support/MQTT/MQTTReader.hpp /// /// \authors Benedikt Tutzer (benedikt.tutzer@tuwien.ac.at), /// Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at), /// David Juhasz (david.juhasz@tuwien.ac.at) /// /// \date 2020 /// /// \brief Facitilities to read from MQTT brokers. /// //===----------------------------------------------------------------------===// #ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP #define ROSA_SUPPORT_MQTT_MQTTREADER_HPP #include "rosa/support/concurrent_queue.hpp" #include "rosa/support/debug.hpp" #include "rosa/support/diagnostics.h" #include "rosa/support/log.h" // NOTE: MSVC warns about some things in the following header. ROSA_DISABLE_WARNING_PUSH ROSA_DISABLE_WARNING_MSVC_C4100 ROSA_DISABLE_WARNING_MSVC_C4201 #include "mqtt/async_client.h" ROSA_DISABLE_WARNING_POP #include #include namespace rosa { namespace mqtt { // @TODO this is a copy of CSVReader.hpp. Move this functionalities to common // file /// Anonymous namespace providing implementation details for /// \c rosa::mqtt::MQTTCallback, consider it private. namespace { /// Provides facility for parsing one value from a string. /// /// \tparam T type of value to parse /// \tparam IsSignedInt if \p T is a signed integral type, always use default /// \tparam IsUnsignedInt if \p T is an unsigned integral type, always use /// default /// \tparam IsFloat if \p T is a floating-point type, always use default /// \tparam IsString if \p T is \c std::string, always use default /// /// \note Specializations of this struct are provided for arithmentic types /// and \c std::string. template ::value && std::is_signed::value), bool IsUnsignedInt = (std::is_integral::value && std::is_unsigned::value), bool IsFloat = std::is_floating_point::value, bool IsString = std::is_same::value> struct ValueParser { /// /// /// \param Cell the \c std::string to parse /// /// \return the parsed value /// /// \note The function silently fails if cannot parse \p Cell for type \p T. static T parse(const std::string &Cell) noexcept; }; template struct ValueParser { STATIC_ASSERT((std::is_integral::value && std::is_signed::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stoll(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_integral::value && std::is_unsigned::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stoull(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_floating_point::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stold(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_same::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return Cell; } }; } // End namespace /** - * Local callback & listener class for use with the client connection. - * This implementation is to receive messages but the ::mqtt::callback interface - * allows further actions to be defined. + * Helper class managing the connection to the MQTT broker, also serves as + * callback & listener for use with the client connection. */ -template class MQTTCallback : public virtual ::mqtt::callback { +template class MQTTHandler : public virtual ::mqtt::callback { /** Callback for when a message arrives. * @param Msg Pointer for the MQTT message **/ void message_arrived(::mqtt::const_message_ptr Msg) override { std::string Message = Msg->to_string(); const auto Value = ValueParser::parse(Message); Buffer->push(Value); LOG_DEBUG_STREAM << "Got value [" << Msg->get_topic() << "] " << Value << std::endl; } /** * This method is called when the connection to the server is lost. * @param Cause Why connection got lost */ void connection_lost(const std::string &Cause) override { LOG_INFO_STREAM << "Connection to MQTT broker lost: " << Cause << std::endl; } +protected: + /** + * This function establishes the connection to the MQTT broker according to + * parameters set in member fields by the constructor. + */ + void establish_connection(void) { + ASSERT(!Client); + + // Connect and register callback. + std::stringstream ss; + ss << "tcp://" << ServerHost << ":" << ServerPort; + const std::string ServerURI = ss.str(); + LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl; + Client = std::make_unique<::mqtt::async_client>(ServerURI, ""); + + LOG_INFO_STREAM << "Connecting to server" << std::endl; + Client->connect()->wait(); + + Client->set_callback(*this); + Client->subscribe(MQTTTopic, {}); + LOG_INFO_STREAM << "Receiving messages from topic '" << MQTTTopic << "'" + << std::endl; + } + private: + const std::string ServerHost; + const uint16_t ServerPort; + const std::string MQTTTopic; const std::shared_ptr> Buffer; + std::unique_ptr<::mqtt::async_client> Client; public: - /* Constructor */ - MQTTCallback(const std::shared_ptr> Buffer) noexcept - : Buffer(Buffer) {} + /// Constructor + /// + /// \param MQTTTopic MQTT topic to subscribe to + /// \param ServerHost Hostname of MQTT broker to connect to + /// \param ServerPort Port number of MQTT broker to connect to + /// \param Buffer Container to put messages into + MQTTHandler(const std::string &MQTTTopic, const std::string &ServerHost, + const uint16_t ServerPort, + const std::shared_ptr> &Buffer) noexcept + : ServerHost(ServerHost), ServerPort(ServerPort), MQTTTopic(MQTTTopic), + Buffer(Buffer), Client(nullptr) { + establish_connection(); + } + + /// Destructor + virtual ~MQTTHandler(void) { + // Can do cleanup here as necessary. + if (Client) { + LOG_INFO_STREAM << "Disconnecting from MQTT broker." << std::endl; + // Would disconnect upon destruction here just to wait for it. + Client->disconnect()->wait(); + } + } }; /// Provides `InputIterator` features for iterating over messages published in /// an MQTT topic. /// /// \todo Make \c rosa::mqtt::MQTTIterator be able to recover connection in case /// of any error. /// /// \tparam T type of values stored in MQTT messages /// /// /// \note The referred value of the iterator is initialized to a default /// initialized instance of \p T upon creation. The first received message is /// read in upon the first dereferencing of the iterator by automatically /// incrementing the iterator before returning the referred value. The /// automatic incrementation upon the first dereferencing happens no matter if /// the iterator has already been incremented explicitly. The automatic /// incrementation may block if the iterator has been created with blocking /// behavior. On the other hand, it may leave the default value of \p T if the /// instance has been created with non-blocking behavior. \see \c Blocking /// for \c rosa:mqtt:MQTTIterator::MQTTIterator(). /// /// \note The iterator expects each MQTT message to match \p T /// /// \note The implementation relies on \c rosa::mqtt::MQTTCallback, which in /// turn relies on \c rosa::mqtt::ValueParser, which is implemented only for /// `arithmetic` types -- signed and unsigned integral types and floating-point /// types -- and for \c std::string. Those are the valid values for \p T template class MQTTIterator { public: /// \defgroup MQTTIteratorTypedefs Typedefs of rosa::mqtt::MQTTIterator /// /// Standard `typedef`s for iterators. /// ///@{ typedef std::input_iterator_tag iterator_category; ///< Category of the iterator. typedef T value_type; ///< Type of values iterated over. typedef std::size_t difference_type; ///< Type to identify distance. typedef T *pointer; ///< Pointer to the type iterated over. typedef T &reference; ///< Reference to the type iterated over. ///@} /// Creates a new instance. /// /// \param MQTTTopic MQTT topic to subscribe to /// \param Blocking Whether to block if next value is not arrived when /// incrementing /// \param ServerHost Hostname of MQTT broker to connect to /// \param ServerPort Port number of MQTT broker to connect to /// /// \note If \p Blocking, incrementing the iterator blocks in case the next /// value has not arrived yet. If not \p Blocking, incrementing the iterator /// has no effect when the next value is not arrived yet. MQTTIterator(const std::string &MQTTTopic, const bool Blocking = true, const std::string ServerHost = "localhost", const uint16_t ServerPort = 1883) noexcept - : Empty(false), Blocking(Blocking), ServerHost(ServerHost), - ServerPort(ServerPort), MQTTTopic(MQTTTopic), + : Empty(false), Blocking(Blocking), FirstValue(std::make_shared()), Current(), Buffer(std::make_shared>()), - Callback(std::make_shared>(Buffer)) { - // Connect and register callback. - std::stringstream ss; - ss << "tcp://" << ServerHost << ":" << ServerPort; - const std::string ServerURI = ss.str(); - LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl; - Client = std::make_shared<::mqtt::async_client>(ServerURI, ""); - // @note async_client is destructed when the shared pointer is destructed. - - LOG_INFO_STREAM << "Connecting to server" << std::endl; - Client->connect()->wait(); - LOG_INFO_STREAM << "Receiving messages from topic '" << MQTTTopic - << "' for a short while..." << std::endl; - - Client->set_callback(*Callback); - Client->subscribe(this->MQTTTopic, {}); - } + Handler(std::make_shared>(MQTTTopic, ServerHost, + ServerPort, Buffer)) {} /// Creates an empty new instance. MQTTIterator(void) noexcept - : Empty(true), Blocking(false), ServerHost(""), ServerPort(0), - MQTTTopic(""), FirstValue(std::make_shared()), - Current(), Buffer(std::make_shared>()), - Callback(nullptr) {} + : Empty(true), Blocking(false), + FirstValue(std::make_shared()), Current(), + Buffer(std::make_shared>()), Handler(nullptr) {} /// Pre-increment operator. /// /// The implementation reads the next value. The operation blocks if \c /// Blocking and the next value has not arrived yet. If not \c Blocking and no /// new value is ready yet, the oeprator has no effect. /// /// \return \p this object after incrementing it. MQTTIterator &operator++() { LOG_DEBUG_STREAM << "Getting next value..." << std::endl; if (Buffer->pop(Current, Blocking) == concurrent_queue::element_popped) { LOG_DEBUG_STREAM << "Got next value." << std::endl; } else { LOG_DEBUG_STREAM << "Next value is not ready yet." << std::endl; } return *this; } /// Post-increment operator. /// /// The implementation uses the pre-increment operator and returns a copy of /// the original state of \p this object. /// /// \return \p this object before incrementing it. MQTTIterator operator++(int) { MQTTIterator Tmp(*this); ++(*this); return Tmp; } /// Returns a constant reference to the current entry. /// /// \note Automatically increments the iterator to read in the first value /// upon the firs dereferencing (i.e, calling either \c operator*() or \c /// operator->() ). /// /// \return constant reference to the current entry. const T &operator*(void) noexcept { std::call_once(*FirstValue, [&](void) { ++(*this); }); return Current; } /// Returns a constant pointer to the current entry. /// /// \note Automatically increments the iterator to read in the first value /// upon the firs dereferencing (i.e, calling either \c operator*() or \c /// operator->() ). /// /// \return constant pointer to the current entry. const T *operator->(void) noexcept { std::call_once(*FirstValue, [&](void) { ++(*this); }); return &Current; } /// Tells if \p this object is equal to another one. /// /// Two \c rosa::mqtt::MQTTIterator instances are equal if and only if they /// are the same or both are empty. /// /// \param RHS other object to compare to /// /// \return whether \p this object is equal with \p RHS bool operator==(const MQTTIterator &RHS) const noexcept { return this == &RHS || (this->Empty && RHS.Empty); } /// Tells if \p this object is not equal to another one. /// /// \see rosa::mqtt::MQTTIterator::operator== /// /// \param RHS other object to compare to /// /// \return whether \p this object is not equal with \p RHS. bool operator!=(const MQTTIterator &RHS) const noexcept { return !((*this) == RHS); } private: const bool Empty; const bool Blocking; - const std::string ServerHost; - const uint16_t ServerPort; - const std::string MQTTTopic; const std::shared_ptr FirstValue; T Current; const std::shared_ptr> Buffer; - const std::shared_ptr> Callback; - std::shared_ptr<::mqtt::async_client> Client; + const std::shared_ptr> Handler; }; } // End namespace mqtt } // End namespace rosa #endif // ROSA_SUPPORT_MQTT_MQTTREADER_HPP