Page MenuHomePhorge

MQTTReader.hpp
No OneTemporary

Size
12 KB
Referenced Files
None
Subscribers
None

MQTTReader.hpp

//===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++
//-*-===//
//
// The RoSA Framework
//
// Distributed under the terms and conditions of the Boost Software License 1.0.
// See accompanying file LICENSE.
//
// If you did not receive a copy of the license file, see
// http://www.boost.org/LICENSE_1_0.txt.
//
//===----------------------------------------------------------------------===//
///
/// \file rosa/support/MQTT/MQTTReader.hpp
///
/// \authors Benedikt Tutzer (benedikt.tutzer@tuwien.ac.at),
/// Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at),
/// David Juhasz (david.juhasz@tuwien.ac.at)
///
/// \date 2020
///
/// \brief Facilities to read from MQTT brokers.
///
//===----------------------------------------------------------------------===//
#ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#define ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#include "rosa/support/concurrent_queue.hpp"
#include "rosa/support/debug.hpp"
#include "rosa/support/diagnostics.h"
#include "rosa/support/log.h"
// NOTE: MSVC warns about some things in the following header.
ROSA_DISABLE_WARNING_PUSH
ROSA_DISABLE_WARNING_MSVC_C4100
ROSA_DISABLE_WARNING_MSVC_C4201
#include "mqtt/async_client.h"
ROSA_DISABLE_WARNING_POP
#include <cstddef>
#include <exception>
#include <iostream>
#include <sstream>
namespace rosa {
namespace mqtt {
// @TODO this is a copy of CSVReader.hpp. Move this functionalities to common
// file
/// Anonymous namespace providing implementation details for
/// \c rosa::mqtt::MQTTHandler, consider it private.
namespace {
/// Provides facility for parsing one value from a string.
///
/// \tparam T type of value to parse
/// \tparam IsSignedInt if \p T is a signed integral type, always use default
/// \tparam IsUnsignedInt if \p T is an unsigned integral type, always use
/// default
/// \tparam IsFloat if \p T is a floating-point type, always use default
/// \tparam IsString if \p T is \c std::string, always use default
///
/// \note Specializations of this struct are provided for arithmetic types
/// and \c std::string.
template <typename T,
          bool IsSignedInt =
              (std::is_signed<T>::value && std::is_integral<T>::value),
          bool IsUnsignedInt =
              (std::is_unsigned<T>::value && std::is_integral<T>::value),
          bool IsFloat = std::is_floating_point<T>::value,
          bool IsString = std::is_same<T, std::string>::value>
struct ValueParser {
  /// Converts the textual content of \p Cell into a value of type \p T.
  ///
  /// Only declared here; the definitions live in the partial specializations
  /// selected by the defaulted boolean template arguments.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value
  ///
  /// \note The function silently fails if cannot parse \p Cell for type \p T.
  static T parse(const std::string &Cell) noexcept;
};
/// Specialization of \c ValueParser for signed integral types.
///
/// \tparam T signed integral type to parse
template <typename T> struct ValueParser<T, true, false, false, false> {
  STATIC_ASSERT((std::is_integral<T>::value && std::is_signed<T>::value),
                "wrong type"); // Sanity check.

  /// Parses \p Cell as a signed integer by means of \c std::stoll.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value converted to \p T, or a value-initialized \p T
  /// (i.e., zero) if \p Cell cannot be parsed
  ///
  /// \note \c std::stoll reports malformed or out-of-range input by throwing.
  /// Letting that exception leave this \c noexcept function would terminate
  /// the program, so it is caught here and the function fails silently.
  static T parse(const std::string &Cell) noexcept {
    try {
      return static_cast<T>(std::stoll(Cell));
    } catch (const std::exception &) {
      return T();
    }
  }
};
/// Specialization of \c ValueParser for unsigned integral types.
///
/// \tparam T unsigned integral type to parse
template <typename T> struct ValueParser<T, false, true, false, false> {
  STATIC_ASSERT((std::is_integral<T>::value && std::is_unsigned<T>::value),
                "wrong type"); // Sanity check.

  /// Parses \p Cell as an unsigned integer by means of \c std::stoull.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value converted to \p T, or a value-initialized \p T
  /// (i.e., zero) if \p Cell cannot be parsed
  ///
  /// \note \c std::stoull reports malformed or out-of-range input by
  /// throwing. Letting that exception leave this \c noexcept function would
  /// terminate the program, so it is caught here and the function fails
  /// silently.
  static T parse(const std::string &Cell) noexcept {
    try {
      return static_cast<T>(std::stoull(Cell));
    } catch (const std::exception &) {
      return T();
    }
  }
};
/// Specialization of \c ValueParser for floating-point types.
///
/// \tparam T floating-point type to parse
template <typename T> struct ValueParser<T, false, false, true, false> {
  STATIC_ASSERT((std::is_floating_point<T>::value),
                "wrong type"); // Sanity check.

  /// Parses \p Cell as a floating-point number by means of \c std::stold.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value converted to \p T, or a value-initialized \p T
  /// (i.e., zero) if \p Cell cannot be parsed
  ///
  /// \note \c std::stold reports malformed or out-of-range input by throwing.
  /// Letting that exception leave this \c noexcept function would terminate
  /// the program, so it is caught here and the function fails silently.
  static T parse(const std::string &Cell) noexcept {
    try {
      return static_cast<T>(std::stold(Cell));
    } catch (const std::exception &) {
      return T();
    }
  }
};
/// Specialization of \c ValueParser for \c std::string.
///
/// \tparam T must be \c std::string
template <typename T> struct ValueParser<T, false, false, false, true> {
  STATIC_ASSERT((std::is_same<T, std::string>::value),
                "wrong type"); // Sanity check.

  /// Hands back the text unchanged, as no conversion is needed for strings.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return a copy of \p Cell
  static T parse(const std::string &Cell) noexcept {
    return T(Cell);
  }
};
} // End namespace
/**
* Helper class managing the connection to the MQTT broker, also serves as
* callback & listener for use with the client connection.
*/
template <typename T> class MQTTHandler : public virtual ::mqtt::callback {
  /// Callback for when a message arrives.
  ///
  /// The payload is parsed as a value of type \p T and pushed into \c Buffer.
  ///
  /// \param Msg Pointer for the MQTT message
  void message_arrived(::mqtt::const_message_ptr Msg) override {
    const std::string Message = Msg->to_string();
    const auto Value = ValueParser<T>::parse(Message);
    Buffer->push(Value);
    LOG_DEBUG_STREAM << "Got value [" << Msg->get_topic() << "] " << Value
                     << std::endl;
  }

  /// Callback for when the connection to the server is lost.
  ///
  /// The event is only logged; no reconnection is attempted.
  ///
  /// \param Cause Why connection got lost
  void connection_lost(const std::string &Cause) override {
    LOG_INFO_STREAM << "Connection to MQTT broker lost: " << Cause << std::endl;
  }

protected:
  /// Establishes the connection to the MQTT broker according to parameters set
  /// in member fields by the constructor, and subscribes to \c MQTTTopic.
  ///
  /// \pre No client has been created yet.
  void establish_connection(void) {
    ASSERT(!Client);

    std::stringstream ss;
    ss << "tcp://" << ServerHost << ":" << ServerPort;
    const std::string ServerURI = ss.str();
    LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl;
    Client = std::make_unique<::mqtt::async_client>(ServerURI, "");

    // Register the callback while the client is still disconnected, so that
    // no event occurring right after connecting can be missed.
    Client->set_callback(*this);

    LOG_INFO_STREAM << "Connecting to server" << std::endl;
    Client->connect()->wait();

    Client->subscribe(MQTTTopic, {});
    LOG_INFO_STREAM << "Receiving messages from topic '" << MQTTTopic << "'"
                    << std::endl;
  }

private:
  const std::string ServerHost; ///< Hostname of the MQTT broker.
  const uint16_t ServerPort;    ///< Port number of the MQTT broker.
  const std::string MQTTTopic;  ///< MQTT topic subscribed to.
  /// Container receiving the parsed messages.
  const std::shared_ptr<concurrent_queue<T>> Buffer;
  /// Client connection, created by \c establish_connection().
  std::unique_ptr<::mqtt::async_client> Client;

public:
  /// Constructor, which also connects to the broker and subscribes to the
  /// topic.
  ///
  /// \param MQTTTopic MQTT topic to subscribe to
  /// \param ServerHost Hostname of MQTT broker to connect to
  /// \param ServerPort Port number of MQTT broker to connect to
  /// \param Buffer Container to put messages into
  ///
  /// \note NOTE(review): the constructor is \c noexcept, so any exception
  /// raised while establishing the connection terminates the program --
  /// confirm that this is intended.
  MQTTHandler(const std::string &MQTTTopic, const std::string &ServerHost,
              const uint16_t ServerPort,
              const std::shared_ptr<concurrent_queue<T>> &Buffer) noexcept
      : ServerHost(ServerHost), ServerPort(ServerPort), MQTTTopic(MQTTTopic),
        Buffer(Buffer), Client(nullptr) {
    establish_connection();
  }

  /// The client refers to \p this object as its callback, so instances must
  /// not be copied.
  MQTTHandler(const MQTTHandler &) = delete;
  MQTTHandler &operator=(const MQTTHandler &) = delete;

  /// Destructor, disconnects from the broker if still connected.
  ///
  /// \note Disconnecting may fail (e.g., when the connection has already been
  /// lost). Such failure is logged and otherwise ignored, because an exception
  /// leaving a destructor would terminate the program.
  virtual ~MQTTHandler(void) {
    if (!Client) {
      return;
    }
    try {
      if (Client->is_connected()) {
        LOG_INFO_STREAM << "Disconnecting from MQTT broker." << std::endl;
        // Wait for completion so that no callback runs on a destroyed object.
        Client->disconnect()->wait();
      }
    } catch (const std::exception &E) {
      LOG_INFO_STREAM << "Failed to disconnect from MQTT broker: " << E.what()
                      << std::endl;
    }
  }
};
/// Provides `InputIterator` features for iterating over messages published in
/// an MQTT topic.
///
/// \todo Make \c rosa::mqtt::MQTTIterator be able to recover connection in case
/// of any error.
///
/// \tparam T type of values stored in MQTT messages
///
///
/// \note The referred value of the iterator is initialized to a default
/// initialized instance of \p T upon creation. The first received message is
/// read in upon the first dereferencing of the iterator by automatically
/// incrementing the iterator before returning the referred value. The
/// automatic incrementation upon the first dereferencing happens no matter if
/// the iterator has already been incremented explicitly. The automatic
/// incrementation may block if the iterator has been created with blocking
/// behavior. On the other hand, it may leave the default value of \p T if the
/// instance has been created with non-blocking behavior. \see \c Blocking
/// for \c rosa::mqtt::MQTTIterator::MQTTIterator().
///
/// \note The iterator expects each MQTT message to match \p T
///
/// \note The implementation relies on \c rosa::mqtt::MQTTHandler, which in
/// turn relies on \c rosa::mqtt::ValueParser, which is implemented only for
/// `arithmetic` types -- signed and unsigned integral types and floating-point
/// types -- and for \c std::string. Those are the valid values for \p T
template <typename T> class MQTTIterator {
public:
  /// \defgroup MQTTIteratorTypedefs Typedefs of rosa::mqtt::MQTTIterator
  ///
  /// Standard type aliases for iterators.
  ///
  /// \note \c difference_type is signed as required for iterators, and
  /// \c pointer and \c reference match what \c operator->() and \c operator*()
  /// actually return.
  ///
  ///@{
  using iterator_category =
      std::input_iterator_tag;            ///< Category of the iterator.
  using value_type = T;                   ///< Type of values iterated over.
  using difference_type = std::ptrdiff_t; ///< Type to identify distance.
  using pointer = const T *;   ///< Pointer to the type iterated over.
  using reference = const T &; ///< Reference to the type iterated over.
  ///@}

  /// Creates a new instance.
  ///
  /// \param MQTTTopic MQTT topic to subscribe to
  /// \param Blocking Whether to block if next value is not arrived when
  /// incrementing
  /// \param ServerHost Hostname of MQTT broker to connect to
  /// \param ServerPort Port number of MQTT broker to connect to
  ///
  /// \note If \p Blocking, incrementing the iterator blocks in case the next
  /// value has not arrived yet. If not \p Blocking, incrementing the iterator
  /// has no effect when the next value is not arrived yet.
  MQTTIterator(const std::string &MQTTTopic, const bool Blocking = true,
               const std::string &ServerHost = "localhost",
               const uint16_t ServerPort = 1883) noexcept
      : Empty(false), Blocking(Blocking),
        FirstValue(std::make_shared<std::once_flag>()), Current(),
        Buffer(std::make_shared<concurrent_queue<T>>()),
        Handler(std::make_shared<MQTTHandler<T>>(MQTTTopic, ServerHost,
                                                 ServerPort, Buffer)) {}

  /// Creates an empty new instance, which is not connected to any broker.
  MQTTIterator(void) noexcept
      : Empty(true), Blocking(false),
        FirstValue(std::make_shared<std::once_flag>()), Current(),
        Buffer(std::make_shared<concurrent_queue<T>>()), Handler(nullptr) {}

  /// Pre-increment operator.
  ///
  /// The implementation reads the next value. The operation blocks if \c
  /// Blocking and the next value has not arrived yet. If not \c Blocking and no
  /// new value is ready yet, the operator has no effect.
  ///
  /// \return \p this object after incrementing it.
  MQTTIterator &operator++() {
    LOG_DEBUG_STREAM << "Getting next value..." << std::endl;
    if (Buffer->pop(Current, Blocking) == concurrent_queue<T>::element_popped) {
      LOG_DEBUG_STREAM << "Got next value." << std::endl;
    } else {
      LOG_DEBUG_STREAM << "Next value is not ready yet." << std::endl;
    }
    return *this;
  }

  /// Post-increment operator.
  ///
  /// The implementation uses the pre-increment operator and returns a copy of
  /// the original state of \p this object.
  ///
  /// \return \p this object before incrementing it.
  MQTTIterator operator++(int) {
    MQTTIterator Tmp(*this);
    ++(*this);
    return Tmp;
  }

  /// Returns a constant reference to the current entry.
  ///
  /// \note Automatically increments the iterator to read in the first value
  /// upon the first dereferencing (i.e, calling either \c operator*() or \c
  /// operator->() ).
  ///
  /// \return constant reference to the current entry.
  const T &operator*(void) noexcept {
    std::call_once(*FirstValue, [this] { ++(*this); });
    return Current;
  }

  /// Returns a constant pointer to the current entry.
  ///
  /// \note Automatically increments the iterator to read in the first value
  /// upon the first dereferencing (i.e, calling either \c operator*() or \c
  /// operator->() ).
  ///
  /// \return constant pointer to the current entry.
  const T *operator->(void) noexcept {
    std::call_once(*FirstValue, [this] { ++(*this); });
    return &Current;
  }

  /// Tells if \p this object is equal to another one.
  ///
  /// Two \c rosa::mqtt::MQTTIterator instances are equal if and only if they
  /// are the same or both are empty.
  ///
  /// \param RHS other object to compare to
  ///
  /// \return whether \p this object is equal with \p RHS
  bool operator==(const MQTTIterator &RHS) const noexcept {
    return this == &RHS || (this->Empty && RHS.Empty);
  }

  /// Tells if \p this object is not equal to another one.
  ///
  /// \see rosa::mqtt::MQTTIterator::operator==
  ///
  /// \param RHS other object to compare to
  ///
  /// \return whether \p this object is not equal with \p RHS.
  bool operator!=(const MQTTIterator &RHS) const noexcept {
    return !((*this) == RHS);
  }

private:
  const bool Empty;    ///< Whether the instance was created as empty.
  const bool Blocking; ///< Whether incrementing blocks for the next value.
  /// Flag ensuring the automatic first increment happens once; the flag is
  /// shared with copies of the instance.
  const std::shared_ptr<std::once_flag> FirstValue;
  T Current; ///< The value the iterator currently refers to.
  /// Queue of received values, shared with \c Handler and with copies of the
  /// instance.
  const std::shared_ptr<concurrent_queue<T>> Buffer;
  /// Connection handler filling \c Buffer; \c nullptr for empty instances.
  const std::shared_ptr<MQTTHandler<T>> Handler;
};
} // End namespace mqtt
} // End namespace rosa
#endif // ROSA_SUPPORT_MQTT_MQTTREADER_HPP

File Metadata

Mime Type
text/x-c++
Expires
Sun, May 31, 4:31 PM (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
328604
Default Alt Text
MQTTReader.hpp (12 KB)

Event Timeline