diff --git a/include/rosa/support/mqtt/MQTTReader.hpp b/include/rosa/support/mqtt/MQTTReader.hpp index 9b5a21f..5c19d81 100644 --- a/include/rosa/support/mqtt/MQTTReader.hpp +++ b/include/rosa/support/mqtt/MQTTReader.hpp @@ -1,272 +1,295 @@ //===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++ //-*-===// // // The RoSA Framework // // Distributed under the terms and conditions of the Boost Software License 1.0. // See accompanying file LICENSE. // // If you did not receive a copy of the license file, see // http://www.boost.org/LICENSE_1_0.txt. // //===----------------------------------------------------------------------===// /// /// \file rosa/support/MQTT/MQTTReader.hpp /// /// \authors Benedikt Tutzer (benedikt.tutzer@tuwien.ac.at), /// Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at) /// /// \date 2020 /// /// \brief Facitilities to read from MQTT brokers. /// //===----------------------------------------------------------------------===// #ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP #define ROSA_SUPPORT_MQTT_MQTTREADER_HPP #include "mqtt/async_client.h" #include "rosa/support/debug.hpp" #include "rosa/support/sequence.hpp" #include #include #include #include #include #include #include #include namespace rosa { namespace MQTT { // @TODO this is a copy of CSVReader.hpp. Move this functionalities to common // file /// Anonymous namespace providing implementation details for /// \c rosa::csv::CSVIterator, consider it private. namespace { /// Provides facility for parsing one value from a string. /// /// \tparam T type of value to parse /// \tparam IsSignedInt if \p T is a signed integral type, always use default /// \tparam IsUnsignedInt if \p T is an unsigned integral type, always use /// default /// \tparam IsFloat if \p T is a floating-point type, always use default /// \tparam IsString if \p T is \c std::string, always use default /// /// \note Specializations of this struct are provided for arithmentic types /// and \c std::string. template ::value && std::is_signed::value), bool IsUnsignedInt = (std::is_integral::value && std::is_unsigned::value), bool IsFloat = std::is_floating_point::value, bool IsString = std::is_same::value> struct ValueParser { /// /// /// \param Cell the \c std::string to parse /// /// \return the parsed value /// /// \note The function silently fails if cannot parse \p Cell for type \p T. static T parse(const std::string &Cell) noexcept; }; template struct ValueParser { STATIC_ASSERT((std::is_integral::value && std::is_signed::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stoll(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_integral::value && std::is_unsigned::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stoull(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_floating_point::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return static_cast(std::stold(Cell)); } }; template struct ValueParser { STATIC_ASSERT((std::is_same::value), "wrong type"); // Sanity check. static T parse(const std::string &Cell) noexcept { return Cell; } }; } // End namespace +//@Benedikt: should this class not in another file? +/** + * Local callback & listener class for use with the client connection. + * This implementation is to receive messages but the mqtt::callback interface + * allows further actions to be defined. + */ +class MQTTCallback : public virtual mqtt::callback { + + /** Callback for when a message arrives. + * @param Msg Pointer for the MQTT message + **/ + void message_arrived(mqtt::const_message_ptr Msg) override { + std::string Topic = Msg->get_topic(); + std::string Message = Msg->to_string(); + LOG_INFO_STREAM << "[Message @ " << Topic << "] " << Message << std::endl; + } + +public: + /* Constructor */ + MQTTCallback(void) noexcept = default; +}; + /// Provides `InputIterator` features for iterating over a MQTT file. /// /// The iterator parses rows into `std::tuple` values and iterates over the /// file row by row. /// /// \tparam T type of values stored in one row of the MQTT file /// /// \note The iterator expects each row to consists of fields matching \p Ts. /// /// \note The implementation relies on \c rosa::MQTT::MQTTRow, which in turn /// relies on \c rosa::MQTT::MQTTRowParser, which is implemented only for /// `arithmetic` types -- signed and unsigned integral types and floating-point /// types -- and for \c std::string. Those are the valid values for \p Ts template class MQTTIterator : public virtual mqtt::callback { public: /// \defgroup MQTTIteratorTypedefs Typedefs of rosa::MQTT::MQTTIterator /// /// Standard `typedef`s for iterators. /// ///@{ typedef std::input_iterator_tag iterator_category; ///< Category of the iterator. typedef T value_type; ///< Type of values iterated over. typedef std::size_t difference_type; ///< Type to identify distance. typedef T *pointer; ///< Pointer to the type iterated over. typedef T &reference; ///< Reference to the type iterated over. ///@} /// Creates a new instance. /// /// \param [in,out] S input stream to iterate over /// \param SkipRows the number of rows you want to skip only once at /// the beginning of the file. /// If you have an header in the file, it is supposed to be /// the first row, and it will be always read out. But after /// this header the next number of Rows will be skipped. /// \param HeaderInfo is used to know wheter the file contains an /// header row or not. /// The header has to be in the first row. /// \param Delimiter is the separator between the differnt values of /// the MQTT file. MQTTIterator(std::string &MQTTTopic, std::string ServerHost = "localhost", uint16_t ServerPort = 1883, size_t TimeoutSecs = 60) { this->MQTTTopic = MQTTTopic; this->ServerHost = ServerHost; this->ServerPort = ServerPort; this->TimeoutSecs = TimeoutSecs; // connect and register callback try { std::stringstream ss; ss << "tcp://" << ServerHost << ":" << ServerPort; const std::string ServerURI = ss.str(); LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl; mqtt::async_client Client(ServerURI, ""); LOG_INFO_STREAM << "Connecting to server" << std::endl; Client.connect()->wait(); LOG_INFO_STREAM << "Receiving messages from topic '" << MQTTTopic << "' for a short while..." << std::endl; - Client.set_callback(this); + MQTTCallback Callback; + Client.set_callback(Callback); Client.subscribe(this->MQTTTopic, {}); //@todo move to destructor // Client.disconnect()->wait(); } catch (const mqtt::exception &e) { //@Benedikt: Because of a syntax error, I had to change this to a notmal // cout (only for now) std::cout << "EXCEPTION!" << std::endl; // logErrorAndExit(e.what(), 1); } } /// Creates an empty new instance. MQTTIterator(void) noexcept {} /// Pre-increment operator. /// /// The implementation reads the next row. If the end of the input stream is /// reached, the operator becomes empty and has no further effect. /// /// \return \p this object after incrementing it. MQTTIterator &operator++() { - buffer.pop_front(); + buffer.pop(); return *this; } /// Post-increment operator. /// /// The implementation uses the pre-increment operator and returns a copy of /// the original state of \p this object. /// /// \return \p this object before incrementing it. MQTTIterator operator++(int) { MQTTIterator Tmp(*this); ++(*this); return Tmp; } /// Returns a constant reference to the current entry. /// /// \note Should not dereference the iterator when it is empty. /// /// \return constant reference to the current entry. - const T &operator*(void)const noexcept { return buffer.front() } + const T &operator*(void)const noexcept { return buffer.front(); } /// Returns a constant pointer to the current entry. /// /// \note Should not dereference the iterator when it is empty. /// /// \return constant pointer to the current entry. const T *operator->(void)const noexcept { return &(buffer.front()); } /// Tells if \p this object is equal to another one. /// /// Two \c rosa::MQTT::MQTTIterator instances are equal if and only if they /// are the same or both are empty. /// /// \param RHS other object to compare to /// /// \return whether \p this object is equal with \p RHS bool operator==(const MQTTIterator &RHS) const noexcept { return ((this == &RHS) || ((this->buffer.empty()) && (RHS.buffer.empty()))); } /// Tells if \p this object is not equal to another one. /// /// \see rosa::MQTT::MQTTIterator::operator== /// /// \param RHS other object to compare to /// /// \return whether \p this object is not equal with \p RHS. bool operator!=(const MQTTIterator &RHS) const noexcept { return !((*this) == RHS); } /** Callback for when a message arrives. * @param Msg Pointer for the MQTT message **/ void message_arrived(mqtt::const_message_ptr Msg) override { std::string Topic = Msg->get_topic(); std::string Message = Msg->to_string(); LOG_INFO_STREAM << "[Message @ " << Topic << "] " << Message << std::endl; //@todo convert and enqueue value buffer.push(parser.parse(Message)); } private: std::string ServerHost; uint16_t ServerPort; size_t TimeoutSecs; std::string MQTTTopic; std::queue buffer; ValueParser parser; }; } // End namespace MQTT } // End namespace rosa #endif // ROSA_SUPPORT_MQTT_MQTTREADER_HPP