//===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++
//-*-===//
//
//                                 The RoSA Framework
//
// Distributed under the terms and conditions of the Boost Software License 1.0.
// See accompanying file LICENSE.
//
// If you did not receive a copy of the license file, see
// http://www.boost.org/LICENSE_1_0.txt.
//
//===----------------------------------------------------------------------===//
///
/// \file rosa/support/MQTT/MQTTReader.hpp
///
/// \authors Benedikt Tutzer    (benedikt.tutzer@tuwien.ac.at),
///          Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at),
///          David Juhasz (david.juhasz@tuwien.ac.at)
///
/// \date 2020
///
/// \brief Facitilities to read from MQTT brokers.
///
//===----------------------------------------------------------------------===//

#ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#define ROSA_SUPPORT_MQTT_MQTTREADER_HPP

#include "mqtt/async_client.h"
#include "rosa/support/debug.hpp"

#include <iostream>
#include <queue>
#include <sstream>

namespace rosa {
namespace MQTT {

// @TODO this is a copy of CSVReader.hpp. Move this functionalities to common
// file
/// Anonymous namespace providing implementation details for
/// \c rosa::MQTT::MQTTCallback, consider it private.
namespace {

/// Provides facility for parsing one value from a string.
///
/// \tparam T type of value to parse
/// \tparam IsSignedInt if \p T is a signed integral type, always use default
/// \tparam IsUnsignedInt if \p T is an unsigned integral type, always use
/// default
/// \tparam IsFloat if \p T is a floating-point type, always use default
/// \tparam IsString if \p T is \c std::string, always use default
///
/// \note Specializations of this struct are provided for arithmentic types
/// and \c std::string.
template <typename T,
          bool IsSignedInt =
              (std::is_integral<T>::value && std::is_signed<T>::value),
          bool IsUnsignedInt =
              (std::is_integral<T>::value && std::is_unsigned<T>::value),
          bool IsFloat = std::is_floating_point<T>::value,
          bool IsString = std::is_same<T, std::string>::value>
struct ValueParser {

  ///
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value
  ///
  /// \note The function silently fails if cannot parse \p Cell for type \p T.
  static T parse(const std::string &Cell) noexcept;
};

template <typename T> struct ValueParser<T, true, false, false, false> {
  STATIC_ASSERT((std::is_integral<T>::value && std::is_signed<T>::value),
                "wrong type"); // Sanity check.
  static T parse(const std::string &Cell) noexcept {
    return static_cast<T>(std::stoll(Cell));
  }
};

template <typename T> struct ValueParser<T, false, true, false, false> {
  STATIC_ASSERT((std::is_integral<T>::value && std::is_unsigned<T>::value),
                "wrong type"); // Sanity check.
  static T parse(const std::string &Cell) noexcept {
    return static_cast<T>(std::stoull(Cell));
  }
};

template <typename T> struct ValueParser<T, false, false, true, false> {
  STATIC_ASSERT((std::is_floating_point<T>::value),
                "wrong type"); // Sanity check.
  static T parse(const std::string &Cell) noexcept {
    return static_cast<T>(std::stold(Cell));
  }
};

template <typename T> struct ValueParser<T, false, false, false, true> {
  STATIC_ASSERT((std::is_same<T, std::string>::value),
                "wrong type"); // Sanity check.
  static T parse(const std::string &Cell) noexcept { return Cell; }
};

} // End namespace

/**
 * Local callback & listener class for use with the client connection.
 * This implementation is to receive messages but the mqtt::callback interface
 * allows further actions to be defined.
 */
template <typename T> class MQTTCallback : public virtual mqtt::callback {

  /** Callback for when a message arrives.
   * @param Msg Pointer for the MQTT message
   **/
  void message_arrived(mqtt::const_message_ptr Msg) override {
    LOG_DEBUG_STREAM << "New value" << std::endl;
    std::string Message = Msg->to_string();
    const auto Value = ValueParser<T>::parse(Message);
    LOG_DEBUG_STREAM << "Got value " << Value << std::endl;
    Buffer->push(Value);
  }

  /**
   * This method is called when the connection to the server is lost.
   * @param Cause Why connection got lost
   */
  void connection_lost(const std::string &Cause) override {
    LOG_INFO_STREAM << "Connection to MQTT broker lost: " << Cause << std::endl;
  }

private:
  const std::shared_ptr<std::queue<T>> Buffer;

public:
  /* Constructor */
  MQTTCallback(const std::shared_ptr<std::queue<T>> Buffer) noexcept
      : Buffer(Buffer) {}
};

/// Provides `InputIterator` features for iterating over messages published in
/// an MQTT topic.
///
/// \todo Make \c rosa::MQTT::MQTTITerator::Buffer thread-safe.
/// \todo Make \c rosa::MQTT::MQTTIterator be able to recover connection in case
/// of any error.
///
/// \tparam T type of values stored in MQTT messages
///
///
/// \note The referred value of the iterator is initialized to a default
/// initialized instance of \p T upon creation. The first received message may
/// be read in by incrementing the iterator first. Also \see \c
/// rosa::MQTT::MQTTIterator
///
/// \note The iterator expects each MQTT message to match \p T
///
/// \note The implementation relies on \c rosa::MQTT::MQTTCallback, which in
/// turn relies on \c rosa::MQTT::ValueParser, which is implemented only for
/// `arithmetic` types -- signed and unsigned integral types and floating-point
/// types -- and for \c std::string. Those are the valid values for \p T
template <typename T> class MQTTIterator {
public:
  /// \defgroup MQTTIteratorTypedefs Typedefs of rosa::MQTT::MQTTIterator
  ///
  /// Standard `typedef`s for iterators.
  ///
  ///@{
  typedef std::input_iterator_tag
      iterator_category;               ///< Category of the iterator.
  typedef T value_type;                ///< Type of values iterated over.
  typedef std::size_t difference_type; ///< Type to identify distance.
  typedef T *pointer;                  ///< Pointer to the type iterated over.
  typedef T &reference;                ///< Reference to the type iterated over.
  ///@}

  /// Creates a new instance.
  ///
  /// \param MQTTTopic MQTT topic to subscribe to
  /// \param Blocking Whether to block if next value is not arrived when
  /// incrementing
  /// \param ServerHost Hostname of MQTT broker to connect to
  /// \param ServerPort Port number of MQTT broker to connect to
  ///
  /// \note If \p Blocking, incrementing the iterator blocks in case the next
  /// value has not arrived yet. If not \p Blocking, incrementing the iterator
  /// has no effect when the next value is not arrived yet; \c nextReady() can
  /// be used to check whether incrementing can move to the next value.
  MQTTIterator(const std::string &MQTTTopic, const bool Blocking = true,
               const std::string ServerHost = "localhost",
               const uint16_t ServerPort = 1883) noexcept
      : Empty(false), Blocking(Blocking), ServerHost(ServerHost),
        ServerPort(ServerPort), MQTTTopic(MQTTTopic), Current(),
        Buffer(std::make_shared<std::queue<T>>()),
        Callback(std::make_shared<MQTTCallback<T>>(Buffer)) {
    // Connect and register callback.
    std::stringstream ss;
    ss << "tcp://" << ServerHost << ":" << ServerPort;
    const std::string ServerURI = ss.str();
    LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl;
    Client = std::make_shared<mqtt::async_client>(ServerURI, "");
    // @note async_client is destructed when the shared pointer is destructed.

    LOG_INFO_STREAM << "Connecting to server" << std::endl;
    Client->connect()->wait();
    LOG_INFO_STREAM << "Receiving messages from topic '" << MQTTTopic
                    << "' for a short while..." << std::endl;

    Client->set_callback(*Callback);
    Client->subscribe(this->MQTTTopic, {});
  }

  /// Creates an empty new instance.
  MQTTIterator(void) noexcept
      : Empty(true), Blocking(false), ServerHost(""), ServerPort(0),
        MQTTTopic(""), Current(), Buffer(std::make_shared<std::queue<T>>()),
        Callback(nullptr) {}

  /// Tells whether incrementing \p this can move to the next value.
  ///
  /// \return if the next value has already been received
  inline bool nextReady(void) const noexcept { return !Buffer->empty(); }

  /// Pre-increment operator.
  ///
  /// The implementation reads the next value. If If the end of the input stream is
  /// reached, the operator becomes empty and has no further effect.
  ///
  /// \return \p this object after incrementing it.
  MQTTIterator &operator++() {
    // Wait if Blocking and next value is not ready yet.
    auto &out = LOG_DEBUG_STREAM;
    out << "Getting next value ";
    while (Blocking && !nextReady()) {
      out << '+';
    }
    out << '.' << std::endl;

    // Next value is ready, or not Blocking and so we leave the last one
    if (nextReady()) {
      LOG_DEBUG_STREAM << "Moving MQTTIterator to next value." << std::endl;
      Current = Buffer->front();
      Buffer->pop();
    } else {
      LOG_DEBUG_STREAM << "Trying to move MQTTIterator without next value."
                       << std::endl;
    }
    return *this;
  }

  /// Post-increment operator.
  ///
  /// The implementation uses the pre-increment operator and returns a copy of
  /// the original state of \p this object.
  ///
  /// \return \p this object before incrementing it.
  MQTTIterator operator++(int) {
    MQTTIterator Tmp(*this);
    ++(*this);
    return Tmp;
  }

  /// Returns a constant reference to the current entry.
  ///
  /// \note Should not dereference the iterator when it is empty.
  ///
  /// \return constant reference to the current entry.
  const T &operator*(void)const noexcept { return Current; }

  /// Returns a constant pointer to the current entry.
  ///
  /// \note Should not dereference the iterator when it is empty.
  ///
  /// \return constant pointer to the current entry.
  const T *operator->(void)const noexcept { return &Current; }

  /// Tells if \p this object is equal to another one.
  ///
  /// Two \c rosa::MQTT::MQTTIterator instances are equal if and only if they
  /// are the same or both are empty.
  ///
  /// \param RHS other object to compare to
  ///
  /// \return whether \p this object is equal with \p RHS
  bool operator==(const MQTTIterator &RHS) const noexcept {
    return this == &RHS || (this->Empty && RHS.Empty);
  }

  /// Tells if \p this object is not equal to another one.
  ///
  /// \see rosa::MQTT::MQTTIterator::operator==
  ///
  /// \param RHS other object to compare to
  ///
  /// \return whether \p this object is not equal with \p RHS.
  bool operator!=(const MQTTIterator &RHS) const noexcept {
    return !((*this) == RHS);
  }

private:
  const bool Empty;
  const bool Blocking;
  const std::string ServerHost;
  const uint16_t ServerPort;
  const std::string MQTTTopic;
  T Current;
  const std::shared_ptr<std::queue<T>> Buffer;
  const std::shared_ptr<MQTTCallback<T>> Callback;
  std::shared_ptr<mqtt::async_client> Client;
};

} // End namespace MQTT
} // End namespace rosa

#endif // ROSA_SUPPORT_MQTT_MQTTREADER_HPP
