//===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++
//-*-===//
//
//                                 The RoSA Framework
//
// Distributed under the terms and conditions of the Boost Software License 1.0.
// See accompanying file LICENSE.
//
// If you did not receive a copy of the license file, see
// http://www.boost.org/LICENSE_1_0.txt.
//
//===----------------------------------------------------------------------===//
///
/// \file rosa/support/MQTT/MQTTReader.hpp
///
/// \authors Benedikt Tutzer    (benedikt.tutzer@tuwien.ac.at),
///          Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at),
///          David Juhasz (david.juhasz@tuwien.ac.at)
///
/// \date 2020
///
/// \brief Facitilities to read from MQTT brokers.
///
//===----------------------------------------------------------------------===//

#ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#define ROSA_SUPPORT_MQTT_MQTTREADER_HPP

#include "rosa/support/concurrent_queue.hpp"
#include "rosa/support/debug.hpp"
#include "rosa/support/diagnostics.h"
#include "rosa/support/log.h"

// NOTE: MSVC warns about some things in the following header.
ROSA_DISABLE_WARNING_PUSH
ROSA_DISABLE_WARNING_MSVC_C4100
ROSA_DISABLE_WARNING_MSVC_C4201
#include "mqtt/async_client.h"
ROSA_DISABLE_WARNING_POP

#include <iostream>
#include <sstream>

namespace rosa {
namespace mqtt {

// @TODO this is a copy of CSVReader.hpp. Move this functionalities to common
// file
/// Anonymous namespace providing implementation details for
/// \c rosa::mqtt::MQTTCallback, consider it private.
namespace {

/// Provides facility for parsing one value from a string.
///
/// \tparam T type of value to parse
/// \tparam IsSignedInt if \p T is a signed integral type, always use default
/// \tparam IsUnsignedInt if \p T is an unsigned integral type, always use
/// default
/// \tparam IsFloat if \p T is a floating-point type, always use default
/// \tparam IsString if \p T is \c std::string, always use default
///
/// \note Specializations of this struct are provided for arithmentic types
/// and \c std::string.
template <typename T,
          bool IsSignedInt =
              (std::is_integral<T>::value && std::is_signed<T>::value),
          bool IsUnsignedInt =
              (std::is_integral<T>::value && std::is_unsigned<T>::value),
          bool IsFloat = std::is_floating_point<T>::value,
          bool IsString = std::is_same<T, std::string>::value>
struct ValueParser {

  ///
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value
  ///
  /// \note The function silently fails if cannot parse \p Cell for type \p T.
  static T parse(const std::string &Cell) noexcept;
};

template <typename T> struct ValueParser<T, true, false, false, false> {
  STATIC_ASSERT((std::is_integral<T>::value && std::is_signed<T>::value),
                "wrong type"); // Sanity check.
  static T parse(const std::string &Cell) noexcept {
    return static_cast<T>(std::stoll(Cell));
  }
};

template <typename T> struct ValueParser<T, false, true, false, false> {
  STATIC_ASSERT((std::is_integral<T>::value && std::is_unsigned<T>::value),
                "wrong type"); // Sanity check.
  static T parse(const std::string &Cell) noexcept {
    return static_cast<T>(std::stoull(Cell));
  }
};

template <typename T> struct ValueParser<T, false, false, true, false> {
  STATIC_ASSERT((std::is_floating_point<T>::value),
                "wrong type"); // Sanity check.
  static T parse(const std::string &Cell) noexcept {
    return static_cast<T>(std::stold(Cell));
  }
};

template <typename T> struct ValueParser<T, false, false, false, true> {
  STATIC_ASSERT((std::is_same<T, std::string>::value),
                "wrong type"); // Sanity check.
  static T parse(const std::string &Cell) noexcept { return Cell; }
};

} // End namespace

/**
 * Helper class managing the connection to the MQTT broker, also serves as
 * callback & listener for use with the client connection.
 */
template <typename T> class MQTTHandler : public virtual ::mqtt::callback {

  /** Callback for when a message arrives.
   * @param Msg Pointer for the MQTT message
   **/
  void message_arrived(::mqtt::const_message_ptr Msg) override {
    std::string Message = Msg->to_string();
    const auto Value = ValueParser<T>::parse(Message);
    Buffer->push(Value);
    LOG_DEBUG_STREAM << "Got value [" << Msg->get_topic() << "] " << Value
                     << std::endl;
  }

  /**
   * This method is called when the connection to the server is lost.
   * @param Cause Why connection got lost
   */
  void connection_lost(const std::string &Cause) override {
    LOG_INFO_STREAM << "Connection to MQTT broker lost: " << Cause << std::endl;
  }

protected:
  /**
   * This function establishes the connection to the MQTT broker according to
   * parameters set in member fields by the constructor.
   */
  void establish_connection(void) {
    ASSERT(!Client);

    // Connect and register callback.
    std::stringstream ss;
    ss << "tcp://" << ServerHost << ":" << ServerPort;
    const std::string ServerURI = ss.str();
    LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl;
    Client = std::make_unique<::mqtt::async_client>(ServerURI, "");

    LOG_INFO_STREAM << "Connecting to server" << std::endl;
    Client->connect()->wait();

    Client->set_callback(*this);
    Client->subscribe(MQTTTopic, {});
    LOG_INFO_STREAM << "Receiving messages from topic '" << MQTTTopic << "'"
                    << std::endl;
  }

private:
  const std::string ServerHost;
  const uint16_t ServerPort;
  const std::string MQTTTopic;
  const std::shared_ptr<concurrent_queue<T>> Buffer;
  std::unique_ptr<::mqtt::async_client> Client;

public:
  /// Constructor
  ///
  /// \param MQTTTopic MQTT topic to subscribe to
  /// \param ServerHost Hostname of MQTT broker to connect to
  /// \param ServerPort Port number of MQTT broker to connect to
  /// \param Buffer Container to put messages into
  MQTTHandler(const std::string &MQTTTopic, const std::string &ServerHost,
              const uint16_t ServerPort,
              const std::shared_ptr<concurrent_queue<T>> &Buffer) noexcept
      : ServerHost(ServerHost), ServerPort(ServerPort), MQTTTopic(MQTTTopic),
        Buffer(Buffer), Client(nullptr) {
    establish_connection();
  }

  /// Destructor
  virtual ~MQTTHandler(void) {
    // Can do cleanup here as necessary.
    if (Client) {
      LOG_INFO_STREAM << "Disconnecting from MQTT broker." << std::endl;
      // Would disconnect upon destruction here just to wait for it.
      Client->disconnect()->wait();
    }
  }
};

/// Provides `InputIterator` features for iterating over messages published in
/// an MQTT topic.
///
/// \todo Make \c rosa::mqtt::MQTTIterator be able to recover connection in case
/// of any error.
///
/// \tparam T type of values stored in MQTT messages
///
///
/// \note The referred value of the iterator is initialized to a default
/// initialized instance of \p T upon creation. The first received message is
/// read in upon the first dereferencing of the iterator by automatically
/// incrementing the iterator before returning the referred value. The
/// automatic incrementation upon the first dereferencing happens no matter if
/// the iterator has already been incremented explicitly. The automatic
/// incrementation may block if the iterator has been created with blocking
/// behavior. On the other hand, it may leave the default value of \p T if the
/// instance has been created with non-blocking behavior. \see \c Blocking
/// for \c rosa:mqtt:MQTTIterator::MQTTIterator().
///
/// \note The iterator expects each MQTT message to match \p T
///
/// \note The implementation relies on \c rosa::mqtt::MQTTCallback, which in
/// turn relies on \c rosa::mqtt::ValueParser, which is implemented only for
/// `arithmetic` types -- signed and unsigned integral types and floating-point
/// types -- and for \c std::string. Those are the valid values for \p T
template <typename T> class MQTTIterator {
public:
  /// \defgroup MQTTIteratorTypedefs Typedefs of rosa::mqtt::MQTTIterator
  ///
  /// Standard `typedef`s for iterators.
  ///
  ///@{
  typedef std::input_iterator_tag
      iterator_category;               ///< Category of the iterator.
  typedef T value_type;                ///< Type of values iterated over.
  typedef std::size_t difference_type; ///< Type to identify distance.
  typedef T *pointer;                  ///< Pointer to the type iterated over.
  typedef T &reference;                ///< Reference to the type iterated over.
  ///@}

  /// Creates a new instance.
  ///
  /// \param MQTTTopic MQTT topic to subscribe to
  /// \param Blocking Whether to block if next value is not arrived when
  /// incrementing
  /// \param ServerHost Hostname of MQTT broker to connect to
  /// \param ServerPort Port number of MQTT broker to connect to
  ///
  /// \note If \p Blocking, incrementing the iterator blocks in case the next
  /// value has not arrived yet. If not \p Blocking, incrementing the iterator
  /// has no effect when the next value is not arrived yet.
  MQTTIterator(const std::string &MQTTTopic, const bool Blocking = true,
               const std::string ServerHost = "localhost",
               const uint16_t ServerPort = 1883) noexcept
      : Empty(false), Blocking(Blocking),
        FirstValue(std::make_shared<std::once_flag>()), Current(),
        Buffer(std::make_shared<concurrent_queue<T>>()),
        Handler(std::make_shared<MQTTHandler<T>>(MQTTTopic, ServerHost,
                                                 ServerPort, Buffer)) {}

  /// Creates an empty new instance.
  MQTTIterator(void) noexcept
      : Empty(true), Blocking(false),
        FirstValue(std::make_shared<std::once_flag>()), Current(),
        Buffer(std::make_shared<concurrent_queue<T>>()), Handler(nullptr) {}

  /// Pre-increment operator.
  ///
  /// The implementation reads the next value. The operation blocks if \c
  /// Blocking and the next value has not arrived yet. If not \c Blocking and no
  /// new value is ready yet, the oeprator has no effect.
  ///
  /// \return \p this object after incrementing it.
  MQTTIterator &operator++() {
    LOG_DEBUG_STREAM << "Getting next value..." << std::endl;
    if (Buffer->pop(Current, Blocking) == concurrent_queue<T>::element_popped) {
      LOG_DEBUG_STREAM << "Got next value." << std::endl;
    } else {
      LOG_DEBUG_STREAM << "Next value is not ready yet." << std::endl;
    }
    return *this;
  }

  /// Post-increment operator.
  ///
  /// The implementation uses the pre-increment operator and returns a copy of
  /// the original state of \p this object.
  ///
  /// \return \p this object before incrementing it.
  MQTTIterator operator++(int) {
    MQTTIterator Tmp(*this);
    ++(*this);
    return Tmp;
  }

  /// Returns a constant reference to the current entry.
  ///
  /// \note Automatically increments the iterator to read in the first value
  /// upon the firs dereferencing (i.e, calling either \c operator*() or \c
  /// operator->() ).
  ///
  /// \return constant reference to the current entry.
  const T &operator*(void) noexcept {
    std::call_once(*FirstValue, [&](void) { ++(*this); });
    return Current;
  }

  /// Returns a constant pointer to the current entry.
  ///
  /// \note Automatically increments the iterator to read in the first value
  /// upon the firs dereferencing (i.e, calling either \c operator*() or \c
  /// operator->() ).
  ///
  /// \return constant pointer to the current entry.
  const T *operator->(void) noexcept {
    std::call_once(*FirstValue, [&](void) { ++(*this); });
    return &Current;
  }

  /// Tells if \p this object is equal to another one.
  ///
  /// Two \c rosa::mqtt::MQTTIterator instances are equal if and only if they
  /// are the same or both are empty.
  ///
  /// \param RHS other object to compare to
  ///
  /// \return whether \p this object is equal with \p RHS
  bool operator==(const MQTTIterator &RHS) const noexcept {
    return this == &RHS || (this->Empty && RHS.Empty);
  }

  /// Tells if \p this object is not equal to another one.
  ///
  /// \see rosa::mqtt::MQTTIterator::operator==
  ///
  /// \param RHS other object to compare to
  ///
  /// \return whether \p this object is not equal with \p RHS.
  bool operator!=(const MQTTIterator &RHS) const noexcept {
    return !((*this) == RHS);
  }

private:
  const bool Empty;
  const bool Blocking;
  const std::shared_ptr<std::once_flag> FirstValue;
  T Current;
  const std::shared_ptr<concurrent_queue<T>> Buffer;
  const std::shared_ptr<MQTTHandler<T>> Handler;
};

} // End namespace mqtt
} // End namespace rosa

#endif // ROSA_SUPPORT_MQTT_MQTTREADER_HPP
