Page MenuHomePhorge

No OneTemporary

Size
12 KB
Referenced Files
None
Subscribers
None
diff --git a/include/rosa/support/mqtt/MQTTReader.hpp b/include/rosa/support/mqtt/MQTTReader.hpp
index 9af3d8c..97fc08e 100644
--- a/include/rosa/support/mqtt/MQTTReader.hpp
+++ b/include/rosa/support/mqtt/MQTTReader.hpp
@@ -1,328 +1,316 @@
//===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++
//-*-===//
//
// The RoSA Framework
//
// Distributed under the terms and conditions of the Boost Software License 1.0.
// See accompanying file LICENSE.
//
// If you did not receive a copy of the license file, see
// http://www.boost.org/LICENSE_1_0.txt.
//
//===----------------------------------------------------------------------===//
///
/// \file rosa/support/MQTT/MQTTReader.hpp
///
/// \authors Benedikt Tutzer (benedikt.tutzer@tuwien.ac.at),
/// Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at),
/// David Juhasz (david.juhasz@tuwien.ac.at)
///
/// \date 2020
///
/// \brief Facilities to read from MQTT brokers.
///
//===----------------------------------------------------------------------===//
#ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#define ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#include "mqtt/async_client.h"
#include "rosa/support/debug.hpp"
#include <exception>
#include <iostream>
#include <queue>
#include <sstream>
namespace rosa {
namespace MQTT {
// @TODO this is a copy of CSVReader.hpp. Move this functionality to a common
// file.
/// Anonymous namespace providing implementation details for
/// \c rosa::MQTT::MQTTCallback, consider it private.
namespace {

/// Provides facility for parsing one value from a string.
///
/// \tparam T type of value to parse
/// \tparam IsSignedInt if \p T is a signed integral type, always use default
/// \tparam IsUnsignedInt if \p T is an unsigned integral type, always use
/// default
/// \tparam IsFloat if \p T is a floating-point type, always use default
/// \tparam IsString if \p T is \c std::string, always use default
///
/// \note Specializations of this struct are provided for arithmetic types
/// and \c std::string.
template <typename T,
          bool IsSignedInt =
              (std::is_integral<T>::value && std::is_signed<T>::value),
          bool IsUnsignedInt =
              (std::is_integral<T>::value && std::is_unsigned<T>::value),
          bool IsFloat = std::is_floating_point<T>::value,
          bool IsString = std::is_same<T, std::string>::value>
struct ValueParser {
  /// Parses \p Cell as a value of type \p T.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value
  ///
  /// \note The function silently fails if cannot parse \p Cell for type \p T:
  /// the specializations return a value-initialized \p T in that case.
  static T parse(const std::string &Cell) noexcept;
};

/// Specialization for signed integral types, parsing via \c std::stoll.
template <typename T> struct ValueParser<T, true, false, false, false> {
  STATIC_ASSERT((std::is_integral<T>::value && std::is_signed<T>::value),
                "wrong type"); // Sanity check.

  /// Parses \p Cell as a signed integer and converts the result to \p T.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value, or \c T() if \p Cell cannot be parsed
  static T parse(const std::string &Cell) noexcept {
    try {
      return static_cast<T>(std::stoll(Cell));
    } catch (const std::exception &) {
      // std::stoll reports malformed or out-of-range input by throwing; the
      // function is noexcept, so the exception must not escape (that would
      // terminate the program). Fail silently as documented.
      return T();
    }
  }
};

/// Specialization for unsigned integral types, parsing via \c std::stoull.
template <typename T> struct ValueParser<T, false, true, false, false> {
  STATIC_ASSERT((std::is_integral<T>::value && std::is_unsigned<T>::value),
                "wrong type"); // Sanity check.

  /// Parses \p Cell as an unsigned integer and converts the result to \p T.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value, or \c T() if \p Cell cannot be parsed
  static T parse(const std::string &Cell) noexcept {
    try {
      return static_cast<T>(std::stoull(Cell));
    } catch (const std::exception &) {
      // See the signed specialization: never let the exception escape the
      // noexcept function, fail silently instead.
      return T();
    }
  }
};

/// Specialization for floating-point types, parsing via \c std::stold.
template <typename T> struct ValueParser<T, false, false, true, false> {
  STATIC_ASSERT((std::is_floating_point<T>::value),
                "wrong type"); // Sanity check.

  /// Parses \p Cell as a floating-point number and converts the result to
  /// \p T.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value, or \c T() if \p Cell cannot be parsed
  static T parse(const std::string &Cell) noexcept {
    try {
      return static_cast<T>(std::stold(Cell));
    } catch (const std::exception &) {
      // See the signed specialization: never let the exception escape the
      // noexcept function, fail silently instead.
      return T();
    }
  }
};

/// Specialization for \c std::string, which needs no parsing.
template <typename T> struct ValueParser<T, false, false, false, true> {
  STATIC_ASSERT((std::is_same<T, std::string>::value),
                "wrong type"); // Sanity check.

  /// Returns a copy of \p Cell.
  ///
  /// \param Cell the \c std::string to return
  ///
  /// \return copy of \p Cell
  static T parse(const std::string &Cell) noexcept { return Cell; }
};

} // End namespace
/**
 * Callback handler registered with the MQTT client connection.
 * It turns every arriving message into a value of type \p T and appends that
 * value to a queue shared with the creator of the callback. The
 * mqtt::callback interface allows further actions to be defined.
 */
template <typename T> class MQTTCallback : public virtual mqtt::callback {
public:
  /**
   * Creates a callback that stores the received values in \p Buffer.
   * @param Buffer Shared queue to push the parsed values into
   */
  MQTTCallback(const std::shared_ptr<std::queue<T>> Buffer) noexcept
      : Buffer(Buffer) {}

private:
  /** Shared queue collecting the values parsed from arriving messages. */
  const std::shared_ptr<std::queue<T>> Buffer;

  /**
   * Invoked for an arriving message: parses the message text as \p T and
   * pushes the result to the end of the shared queue.
   * @param Msg Pointer for the MQTT message
   */
  void message_arrived(mqtt::const_message_ptr Msg) override {
    LOG_DEBUG_STREAM << "New value" << std::endl;
    const std::string Payload = Msg->to_string();
    const auto Parsed = ValueParser<T>::parse(Payload);
    LOG_DEBUG_STREAM << "Got value " << Parsed << std::endl;
    Buffer->push(Parsed);
  }

  /**
   * Invoked when the connection to the server is lost; only logs the event.
   * @param Cause Why connection got lost
   */
  void connection_lost(const std::string &Cause) override {
    LOG_INFO_STREAM << "Connection to MQTT broker lost: " << Cause << std::endl;
  }
};
/// Provides `InputIterator` features for iterating over messages published in
/// an MQTT topic.
///
/// \todo Make \c rosa::MQTT::MQTTIterator::Buffer thread-safe.
/// \todo Make \c rosa::MQTT::MQTTIterator be able to recover connection in case
/// of any error.
///
/// \tparam T type of values stored in MQTT messages
///
///
/// \note The referred value of the iterator is initialized to a default
/// initialized instance of \p T upon creation. The first received message may
/// be read in by incrementing the iterator first. Also \see \c
/// rosa::MQTT::MQTTIterator
///
/// \note The iterator expects each MQTT message to match \p T
///
/// \note The implementation relies on \c rosa::MQTT::MQTTCallback, which in
/// turn relies on \c rosa::MQTT::ValueParser, which is implemented only for
/// `arithmetic` types -- signed and unsigned integral types and floating-point
/// types -- and for \c std::string. Those are the valid values for \p T
template <typename T> class MQTTIterator {
public:
/// \defgroup MQTTIteratorTypedefs Typedefs of rosa::MQTT::MQTTIterator
///
/// Standard `typedef`s for iterators.
///
///@{
typedef std::input_iterator_tag
iterator_category; ///< Category of the iterator.
typedef T value_type; ///< Type of values iterated over.
typedef std::size_t difference_type; ///< Type to identify distance.
typedef T *pointer; ///< Pointer to the type iterated over.
typedef T &reference; ///< Reference to the type iterated over.
///@}
/// Creates a new instance.
///
/// The constructor builds the server URI `tcp://<ServerHost>:<ServerPort>`,
/// creates the client, connects to the server, registers the callback, and
/// finally subscribes to \p MQTTTopic.
///
/// \param MQTTTopic MQTT topic to subscribe to
/// \param Blocking Whether to block if next value is not arrived when
/// incrementing
/// \param ServerHost Hostname of MQTT broker to connect to
/// \param ServerPort Port number of MQTT broker to connect to
///
/// \note If \p Blocking, incrementing the iterator blocks in case the next
/// value has not arrived yet. If not \p Blocking, incrementing the iterator
/// has no effect when the next value is not arrived yet; \c nextReady() can
/// be used to check whether incrementing can move to the next value.
///
/// \note NOTE(review): the constructor is declared `noexcept` although it
/// connects to the server; confirm that none of the client calls made here
/// can throw, because an exception leaving a `noexcept` function terminates
/// the program.
MQTTIterator(const std::string &MQTTTopic, const bool Blocking = true,
const std::string ServerHost = "localhost",
const uint16_t ServerPort = 1883) noexcept
: Empty(false), Blocking(Blocking), ServerHost(ServerHost),
- ServerPort(ServerPort), MQTTTopic(MQTTTopic),
+ ServerPort(ServerPort), MQTTTopic(MQTTTopic), Current(),
Buffer(std::make_shared<std::queue<T>>()),
Callback(std::make_shared<MQTTCallback<T>>(Buffer)) {
- // Initialize the Buffer with a default value.
- Buffer->push(T());
-
// Connect and register callback.
std::stringstream ss;
ss << "tcp://" << ServerHost << ":" << ServerPort;
const std::string ServerURI = ss.str();
LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl;
// The client is created with an empty client identifier.
Client = std::make_shared<mqtt::async_client>(ServerURI, "");
// @note async_client is destructed when the shared pointer is destructed.
LOG_INFO_STREAM << "Connecting to server" << std::endl;
// Wait on the token returned by connect() before going on.
Client->connect()->wait();
LOG_INFO_STREAM << "Receiving messages from topic '" << MQTTTopic
<< "' for a short while..." << std::endl;
// The callback is registered only after connecting, right before
// subscribing to the topic.
Client->set_callback(*Callback);
Client->subscribe(this->MQTTTopic, {});
}
/// Creates an empty new instance.
///
/// The empty instance has no callback and is not blocking.
MQTTIterator(void) noexcept
: Empty(true), Blocking(false), ServerHost(""), ServerPort(0),
- MQTTTopic(""), Buffer(std::make_shared<std::queue<T>>()),
- Callback(nullptr) {
- // Put an element into Buffer and will keep it forever.
- Buffer->push(T());
- }
+ MQTTTopic(""), Current(), Buffer(std::make_shared<std::queue<T>>()),
+ Callback(nullptr) {}
/// Tells whether incrementing \p this can move to the next value.
///
/// \note In a connected iterator, \c Buffer is also handed to \c Callback,
/// which pushes received values into it; no locking is done here.
///
/// \return if the next value has already been received
- inline bool nextReady(void) const noexcept { return Buffer->size() > 1; }
+ inline bool nextReady(void) const noexcept { return !Buffer->empty(); }
/// Pre-increment operator.
///
/// The implementation reads the next value. If the end of the input stream is
/// reached, the operator becomes empty and has no further effect.
///
/// If \p this is blocking, the operator first loops until \c nextReady()
/// reports a received value. If \p this is not blocking and no value is
/// ready, the operator only logs the attempt.
///
/// \note NOTE(review): \c Empty is a `const` member set only by the
/// constructors, so nothing in this class turns a connected iterator into an
/// empty one; confirm the intended end-of-stream behavior.
///
/// \return \p this object after incrementing it.
MQTTIterator &operator++() {
- ASSERT(!Buffer->empty());
-
// Wait if Blocking and next value is not ready yet.
auto &out = LOG_DEBUG_STREAM;
out << "Getting next value ";
// Busy-wait: each pass of the loop only appends a '+' to the debug log.
while (Blocking && !nextReady()) {
out << '+';
}
out << '.' << std::endl;
// Next value is ready, or not Blocking and so we leave the last one
if (nextReady()) {
LOG_DEBUG_STREAM << "Moving MQTTIterator to next value." << std::endl;
+ Current = Buffer->front();
Buffer->pop();
} else {
LOG_DEBUG_STREAM << "Trying to move MQTTIterator without next value."
<< std::endl;
}
return *this;
}
/// Post-increment operator.
///
/// The implementation uses the pre-increment operator and returns a copy of
/// the original state of \p this object.
///
/// \return \p this object before incrementing it.
MQTTIterator operator++(int) {
MQTTIterator Tmp(*this);
++(*this);
return Tmp;
}
/// Returns a constant reference to the current entry.
///
/// \note Should not dereference the iterator when it is empty.
///
/// \return constant reference to the current entry.
- const T &operator*(void)const noexcept {
- ASSERT(!Buffer->empty());
- return Buffer->front();
- }
+ const T &operator*(void)const noexcept { return Current; }
/// Returns a constant pointer to the current entry.
///
/// \note Should not dereference the iterator when it is empty.
///
/// \return constant pointer to the current entry.
- const T *operator->(void)const noexcept {
- ASSERT(!Buffer->empty());
- return &(Buffer->front());
- }
+ const T *operator->(void)const noexcept { return &Current; }
/// Tells if \p this object is equal to another one.
///
/// Two \c rosa::MQTT::MQTTIterator instances are equal if and only if they
/// are the same or both are empty.
///
/// \param RHS other object to compare to
///
/// \return whether \p this object is equal with \p RHS
bool operator==(const MQTTIterator &RHS) const noexcept {
return this == &RHS || (this->Empty && RHS.Empty);
}
/// Tells if \p this object is not equal to another one.
///
/// \see rosa::MQTT::MQTTIterator::operator==
///
/// \param RHS other object to compare to
///
/// \return whether \p this object is not equal with \p RHS.
bool operator!=(const MQTTIterator &RHS) const noexcept {
return !((*this) == RHS);
}
private:
const bool Empty; ///< Whether \p this is an empty instance.
const bool Blocking; ///< Whether incrementing waits for the next value.
const std::string ServerHost; ///< Hostname of the MQTT broker.
const uint16_t ServerPort; ///< Port number of the MQTT broker.
const std::string MQTTTopic; ///< MQTT topic subscribed to.
+ T Current;
const std::shared_ptr<std::queue<T>> Buffer; ///< Queue of received values.
const std::shared_ptr<MQTTCallback<T>> Callback; ///< Fills \c Buffer.
std::shared_ptr<mqtt::async_client> Client; ///< Client of the connection.
};
} // End namespace MQTT
} // End namespace rosa
#endif // ROSA_SUPPORT_MQTT_MQTTREADER_HPP

File Metadata

Mime Type
text/x-diff
Expires
Mon, Dec 1, 12:44 AM (1 d, 4 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
242596
Default Alt Text
(12 KB)

Event Timeline