Page MenuHomePhorge

MQTTReader.hpp
No OneTemporary

Size
11 KB
Referenced Files
None
Subscribers
None

MQTTReader.hpp

//===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++
//-*-===//
//
// The RoSA Framework
//
// Distributed under the terms and conditions of the Boost Software License 1.0.
// See accompanying file LICENSE.
//
// If you did not receive a copy of the license file, see
// http://www.boost.org/LICENSE_1_0.txt.
//
//===----------------------------------------------------------------------===//
///
/// \file rosa/support/MQTT/MQTTReader.hpp
///
/// \authors Benedikt Tutzer (benedikt.tutzer@tuwien.ac.at),
/// Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at)
///
/// \date 2020
///
/// \brief Facitilities to read from MQTT brokers.
///
//===----------------------------------------------------------------------===//
#ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#define ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#include "mqtt/async_client.h"
#include "rosa/support/debug.hpp"
#include "rosa/support/sequence.hpp"
#include "rosa/support/terminal_colors.h"
#include <algorithm>
#include <iostream>
#include <map>
#include <set>
#include <sstream>
#include <thread>
#include <vector>
#include <queue>
namespace rosa {
namespace MQTT {
// @TODO this is a copy of CSVReader.hpp. Move this functionalities to common
// file
/// Anonymous namespace providing implementation details for
/// \c rosa::csv::CSVIterator, consider it private.
namespace {
/// Helper function to print an error message in red color to the terminal and
/// exit from the application.
///
/// \note The function never returns as it calles `exit()`.
///
/// \param Error error message
/// \param ExitCode exit code to return from the application
void logErrorAndExit(const std::string &Error, const int ExitCode) {
LOG_ERROR_STREAM << rosa::terminal::Color::Red << Error
<< rosa::terminal::Color::Default << std::endl;
exit(ExitCode);
}
/// Provides facility for parsing one value from a string.
///
/// \tparam T type of value to parse
/// \tparam IsSignedInt if \p T is a signed integral type, always use default
/// \tparam IsUnsignedInt if \p T is an unsigned integral type, always use
/// default
/// \tparam IsFloat if \p T is a floating-point type, always use default
/// \tparam IsString if \p T is \c std::string, always use default
///
/// \note Specializations of this struct are provided for arithmentic types
/// and \c std::string.
template <typename T,
bool IsSignedInt =
(std::is_integral<T>::value && std::is_signed<T>::value),
bool IsUnsignedInt =
(std::is_integral<T>::value && std::is_unsigned<T>::value),
bool IsFloat = std::is_floating_point<T>::value,
bool IsString = std::is_same<T, std::string>::value>
struct ValueParser {
///
///
/// \param Cell the \c std::string to parse
///
/// \return the parsed value
///
/// \note The function silently fails if cannot parse \p Cell for type \p T.
static T parse(const std::string &Cell) noexcept;
};
template <typename T> struct ValueParser<T, true, false, false, false> {
STATIC_ASSERT((std::is_integral<T>::value && std::is_signed<T>::value),
"wrong type"); // Sanity check.
static T parse(const std::string &Cell) noexcept {
return static_cast<T>(std::stoll(Cell));
}
};
template <typename T> struct ValueParser<T, false, true, false, false> {
STATIC_ASSERT((std::is_integral<T>::value && std::is_unsigned<T>::value),
"wrong type"); // Sanity check.
static T parse(const std::string &Cell) noexcept {
return static_cast<T>(std::stoull(Cell));
}
};
template <typename T> struct ValueParser<T, false, false, true, false> {
STATIC_ASSERT((std::is_floating_point<T>::value),
"wrong type"); // Sanity check.
static T parse(const std::string &Cell) noexcept {
return static_cast<T>(std::stold(Cell));
}
};
template <typename T> struct ValueParser<T, false, false, false, true> {
STATIC_ASSERT((std::is_same<T, std::string>::value),
"wrong type"); // Sanity check.
static T parse(const std::string &Cell) noexcept { return Cell; }
};
} // End namespace
/**
* Local callback & listener class for use with the client connection.
* This implementation is to receive messages but the mqtt::callback interface
* allows further actions to be defined.
*/
template <typename T> class MQTTCallback : public virtual mqtt::callback {
/** Callback for when a message arrives.
* @param Msg Pointer for the MQTT message
**/
void message_arrived(mqtt::const_message_ptr Msg) override {
std::cout << "New value" << std::endl;
std::string Topic = Msg->get_topic();
std::string Message = Msg->to_string();
// LOG_INFO_STREAM << "[Message @ " << Topic << "] " << Message <<
// std::endl;
std::cout << "Got value " << parser.parse(Message) << std::endl;
buffer->push(parser.parse(Message));
*newValue = true;
}
private:
std::shared_ptr<std::queue<T>> buffer;
std::shared_ptr<bool> newValue;
ValueParser<T> parser;
public:
/* Constructor */
MQTTCallback(std::shared_ptr<std::queue<T>> buffer,
std::shared_ptr<bool> newValue) {
this->buffer = buffer;
this->newValue = newValue;
}
};
/// Provides `InputIterator` features for iterating over a MQTT file.
///
/// The iterator parses rows into `std::tuple` values and iterates over the
/// file row by row.
///
/// \tparam T type of values stored in one row of the MQTT file
///
/// \note The iterator expects each row to consists of fields matching \p Ts.
///
/// \note The implementation relies on \c rosa::MQTT::MQTTRow, which in turn
/// relies on \c rosa::MQTT::MQTTRowParser, which is implemented only for
/// `arithmetic` types -- signed and unsigned integral types and floating-point
/// types -- and for \c std::string. Those are the valid values for \p Ts
template <typename T> class MQTTIterator : public virtual mqtt::callback {
public:
/// \defgroup MQTTIteratorTypedefs Typedefs of rosa::MQTT::MQTTIterator
///
/// Standard `typedef`s for iterators.
///
///@{
typedef std::input_iterator_tag
iterator_category; ///< Category of the iterator.
typedef T value_type; ///< Type of values iterated over.
typedef std::size_t difference_type; ///< Type to identify distance.
typedef T *pointer; ///< Pointer to the type iterated over.
typedef T &reference; ///< Reference to the type iterated over.
///@}
/// Creates a new instance.
///
/// \param [in,out] S input stream to iterate over
/// \param SkipRows the number of rows you want to skip only once at
/// the beginning of the file.
/// If you have an header in the file, it is supposed to be
/// the first row, and it will be always read out. But after
/// this header the next number of Rows will be skipped.
/// \param HeaderInfo is used to know wheter the file contains an
/// header row or not.
/// The header has to be in the first row.
/// \param Delimiter is the separator between the differnt values of
/// the MQTT file.
MQTTIterator(std::string &MQTTTopic, std::string ServerHost = "localhost",
uint16_t ServerPort = 1883) {
this->MQTTTopic = MQTTTopic;
this->ServerHost = ServerHost;
this->ServerPort = ServerPort;
// connect and register callback
try {
std::stringstream ss;
ss << "tcp://" << ServerHost << ":" << ServerPort;
const std::string ServerURI = ss.str();
LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl;
Client = std::make_shared<mqtt::async_client>(ServerURI, "");
LOG_INFO_STREAM << "Connecting to server" << std::endl;
Client->connect()->wait();
LOG_INFO_STREAM << "Receiving messages from topic '" << MQTTTopic
<< "' for a short while..." << std::endl;
Callback =
std::make_shared<MQTTCallback<T>>(this->buffer, this->newValue);
Client->set_callback(*Callback);
Client->subscribe(this->MQTTTopic, {});
//@todo move to destructor
// Client.disconnect()->wait();
} catch (const mqtt::exception &e) {
std::cout << "EXCEPTION!" << std::endl;
logErrorAndExit(e.what(), 1);
}
}
/// Creates an empty new instance.
MQTTIterator(void) noexcept {}
const void waitForNewValue() const noexcept {
std::cout << "waitForNewValue" << std::endl;
while (!newValue) {
std::cout << "+";
}
std::cout << std::endl << "got new value" << std::endl;
*newValue = false;
}
const void waitForAnyValue() const noexcept {
std::cout << "waitForAnyValue" << std::endl;
while (buffer->empty()) {
std::cout << "-";
}
}
/// Pre-increment operator.
///
/// The implementation reads the next row. If the end of the input stream is
/// reached, the operator becomes empty and has no further effect.
///
/// \return \p this object after incrementing it.
MQTTIterator &operator++() {
waitForNewValue();
buffer->pop();
return *this;
}
/// Post-increment operator.
///
/// The implementation uses the pre-increment operator and returns a copy of
/// the original state of \p this object.
///
/// \return \p this object before incrementing it.
MQTTIterator operator++(int) {
waitForNewValue();
MQTTIterator Tmp(*this);
++(*this);
return Tmp;
}
/// Returns a constant reference to the current entry.
///
/// \note Should not dereference the iterator when it is empty.
///
/// \return constant reference to the current entry.
const T &operator*(void)const noexcept {
waitForAnyValue();
return buffer->front();
}
/// Returns a constant pointer to the current entry.
///
/// \note Should not dereference the iterator when it is empty.
///
/// \return constant pointer to the current entry.
const T *operator->(void)const noexcept {
waitForAnyValue();
return &(buffer->front());
}
/// Tells if \p this object is equal to another one.
///
/// Two \c rosa::MQTT::MQTTIterator instances are equal if and only if they
/// are the same or both are empty.
///
/// \param RHS other object to compare to
///
/// \return whether \p this object is equal with \p RHS
bool operator==(const MQTTIterator &RHS) const noexcept {
std::cout << "equal operator" << std::endl;
return (this ==
&RHS); // ||
//((this->buffer->empty()) && (RHS.buffer->empty())));
}
/// Tells if \p this object is not equal to another one.
///
/// \see rosa::MQTT::MQTTIterator::operator==
///
/// \param RHS other object to compare to
///
/// \return whether \p this object is not equal with \p RHS.
bool operator!=(const MQTTIterator &RHS) const noexcept {
return !((*this) == RHS);
}
private:
std::string ServerHost;
uint16_t ServerPort;
std::string MQTTTopic;
std::shared_ptr<std::queue<T>> buffer = std::make_shared<std::queue<T>>();
std::shared_ptr<bool> newValue = std::make_shared<bool>(false);
std::shared_ptr<mqtt::async_client> Client;
std::shared_ptr<MQTTCallback<T>> Callback;
};
} // End namespace MQTT
} // End namespace rosa
#endif // ROSA_SUPPORT_MQTT_MQTTREADER_HPP

File Metadata

Mime Type
text/x-c++
Expires
Sun, May 31, 5:05 PM (1 d, 4 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
328833
Default Alt Text
MQTTReader.hpp (11 KB)

Event Timeline