Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F10704180
MQTTReader.hpp
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Size
12 KB
Referenced Files
None
Subscribers
None
MQTTReader.hpp
View Options
//===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++
//-*-===//
//
// The RoSA Framework
//
// Distributed under the terms and conditions of the Boost Software License 1.0.
// See accompanying file LICENSE.
//
// If you did not receive a copy of the license file, see
// http://www.boost.org/LICENSE_1_0.txt.
//
//===----------------------------------------------------------------------===//
///
/// \file rosa/support/MQTT/MQTTReader.hpp
///
/// \authors Benedikt Tutzer (benedikt.tutzer@tuwien.ac.at),
/// Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at),
/// David Juhasz (david.juhasz@tuwien.ac.at)
///
/// \date 2020
///
/// \brief Facitilities to read from MQTT brokers.
///
//===----------------------------------------------------------------------===//
#ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#define ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#include
"rosa/support/concurrent_queue.hpp"
#include
"rosa/support/debug.hpp"
#include
"rosa/support/diagnostics.h"
#include
"rosa/support/log.h"
// NOTE: MSVC warns about some things in the following header.
ROSA_DISABLE_WARNING_PUSH
ROSA_DISABLE_WARNING_MSVC_C4100
ROSA_DISABLE_WARNING_MSVC_C4201
#include
"mqtt/async_client.h"
ROSA_DISABLE_WARNING_POP
#include
<iostream>
#include
<sstream>
namespace
rosa
{
namespace
mqtt
{
// @TODO this is a copy of CSVReader.hpp. Move this functionalities to common
// file
/// Anonymous namespace providing implementation details for
/// \c rosa::mqtt::MQTTCallback, consider it private.
namespace
{
/// Provides facility for parsing one value from a string.
///
/// \tparam T type of value to parse
/// \tparam IsSignedInt if \p T is a signed integral type, always use default
/// \tparam IsUnsignedInt if \p T is an unsigned integral type, always use
/// default
/// \tparam IsFloat if \p T is a floating-point type, always use default
/// \tparam IsString if \p T is \c std::string, always use default
///
/// \note Specializations of this struct are provided for arithmentic types
/// and \c std::string.
template
<
typename
T
,
bool
IsSignedInt
=
(
std
::
is_integral
<
T
>::
value
&&
std
::
is_signed
<
T
>::
value
),
bool
IsUnsignedInt
=
(
std
::
is_integral
<
T
>::
value
&&
std
::
is_unsigned
<
T
>::
value
),
bool
IsFloat
=
std
::
is_floating_point
<
T
>::
value
,
bool
IsString
=
std
::
is_same
<
T
,
std
::
string
>::
value
>
struct
ValueParser
{
///
///
/// \param Cell the \c std::string to parse
///
/// \return the parsed value
///
/// \note The function silently fails if cannot parse \p Cell for type \p T.
static
T
parse
(
const
std
::
string
&
Cell
)
noexcept
;
};
template
<
typename
T
>
struct
ValueParser
<
T
,
true
,
false
,
false
,
false
>
{
STATIC_ASSERT
((
std
::
is_integral
<
T
>::
value
&&
std
::
is_signed
<
T
>::
value
),
"wrong type"
);
// Sanity check.
static
T
parse
(
const
std
::
string
&
Cell
)
noexcept
{
return
static_cast
<
T
>
(
std
::
stoll
(
Cell
));
}
};
template
<
typename
T
>
struct
ValueParser
<
T
,
false
,
true
,
false
,
false
>
{
STATIC_ASSERT
((
std
::
is_integral
<
T
>::
value
&&
std
::
is_unsigned
<
T
>::
value
),
"wrong type"
);
// Sanity check.
static
T
parse
(
const
std
::
string
&
Cell
)
noexcept
{
return
static_cast
<
T
>
(
std
::
stoull
(
Cell
));
}
};
template
<
typename
T
>
struct
ValueParser
<
T
,
false
,
false
,
true
,
false
>
{
STATIC_ASSERT
((
std
::
is_floating_point
<
T
>::
value
),
"wrong type"
);
// Sanity check.
static
T
parse
(
const
std
::
string
&
Cell
)
noexcept
{
return
static_cast
<
T
>
(
std
::
stold
(
Cell
));
}
};
template
<
typename
T
>
struct
ValueParser
<
T
,
false
,
false
,
false
,
true
>
{
STATIC_ASSERT
((
std
::
is_same
<
T
,
std
::
string
>::
value
),
"wrong type"
);
// Sanity check.
static
T
parse
(
const
std
::
string
&
Cell
)
noexcept
{
return
Cell
;
}
};
}
// End namespace
/**
* Helper class managing the connection to the MQTT broker, also serves as
* callback & listener for use with the client connection.
*/
template
<
typename
T
>
class
MQTTHandler
:
public
virtual
::
mqtt
::
callback
{
/** Callback for when a message arrives.
* @param Msg Pointer for the MQTT message
**/
void
message_arrived
(
::
mqtt
::
const_message_ptr
Msg
)
override
{
std
::
string
Message
=
Msg
->
to_string
();
const
auto
Value
=
ValueParser
<
T
>::
parse
(
Message
);
Buffer
->
push
(
Value
);
LOG_DEBUG_STREAM
<<
"Got value ["
<<
Msg
->
get_topic
()
<<
"] "
<<
Value
<<
std
::
endl
;
}
/**
* This method is called when the connection to the server is lost.
* @param Cause Why connection got lost
*/
void
connection_lost
(
const
std
::
string
&
Cause
)
override
{
LOG_INFO_STREAM
<<
"Connection to MQTT broker lost: "
<<
Cause
<<
std
::
endl
;
}
protected
:
/**
* This function establishes the connection to the MQTT broker according to
* parameters set in member fields by the constructor.
*/
void
establish_connection
(
void
)
{
ASSERT
(
!
Client
);
// Connect and register callback.
std
::
stringstream
ss
;
ss
<<
"tcp://"
<<
ServerHost
<<
":"
<<
ServerPort
;
const
std
::
string
ServerURI
=
ss
.
str
();
LOG_INFO_STREAM
<<
"Initializing for "
<<
ServerURI
<<
std
::
endl
;
Client
=
std
::
make_unique
<::
mqtt
::
async_client
>
(
ServerURI
,
""
);
LOG_INFO_STREAM
<<
"Connecting to server"
<<
std
::
endl
;
Client
->
connect
()
->
wait
();
Client
->
set_callback
(
*
this
);
Client
->
subscribe
(
MQTTTopic
,
{});
LOG_INFO_STREAM
<<
"Receiving messages from topic '"
<<
MQTTTopic
<<
"'"
<<
std
::
endl
;
}
private
:
const
std
::
string
ServerHost
;
const
uint16_t
ServerPort
;
const
std
::
string
MQTTTopic
;
const
std
::
shared_ptr
<
concurrent_queue
<
T
>>
Buffer
;
std
::
unique_ptr
<::
mqtt
::
async_client
>
Client
;
public
:
/// Constructor
///
/// \param MQTTTopic MQTT topic to subscribe to
/// \param ServerHost Hostname of MQTT broker to connect to
/// \param ServerPort Port number of MQTT broker to connect to
/// \param Buffer Container to put messages into
MQTTHandler
(
const
std
::
string
&
MQTTTopic
,
const
std
::
string
&
ServerHost
,
const
uint16_t
ServerPort
,
const
std
::
shared_ptr
<
concurrent_queue
<
T
>>
&
Buffer
)
noexcept
:
ServerHost
(
ServerHost
),
ServerPort
(
ServerPort
),
MQTTTopic
(
MQTTTopic
),
Buffer
(
Buffer
),
Client
(
nullptr
)
{
establish_connection
();
}
/// Destructor
virtual
~
MQTTHandler
(
void
)
{
// Can do cleanup here as necessary.
if
(
Client
)
{
LOG_INFO_STREAM
<<
"Disconnecting from MQTT broker."
<<
std
::
endl
;
// Would disconnect upon destruction here just to wait for it.
Client
->
disconnect
()
->
wait
();
}
}
};
/// Provides `InputIterator` features for iterating over messages published in
/// an MQTT topic.
///
/// \todo Make \c rosa::mqtt::MQTTIterator be able to recover connection in case
/// of any error.
///
/// \tparam T type of values stored in MQTT messages
///
///
/// \note The referred value of the iterator is initialized to a default
/// initialized instance of \p T upon creation. The first received message is
/// read in upon the first dereferencing of the iterator by automatically
/// incrementing the iterator before returning the referred value. The
/// automatic incrementation upon the first dereferencing happens no matter if
/// the iterator has already been incremented explicitly. The automatic
/// incrementation may block if the iterator has been created with blocking
/// behavior. On the other hand, it may leave the default value of \p T if the
/// instance has been created with non-blocking behavior. \see \c Blocking
/// for \c rosa:mqtt:MQTTIterator::MQTTIterator().
///
/// \note The iterator expects each MQTT message to match \p T
///
/// \note The implementation relies on \c rosa::mqtt::MQTTCallback, which in
/// turn relies on \c rosa::mqtt::ValueParser, which is implemented only for
/// `arithmetic` types -- signed and unsigned integral types and floating-point
/// types -- and for \c std::string. Those are the valid values for \p T
template
<
typename
T
>
class
MQTTIterator
{
public
:
/// \defgroup MQTTIteratorTypedefs Typedefs of rosa::mqtt::MQTTIterator
///
/// Standard `typedef`s for iterators.
///
///@{
typedef
std
::
input_iterator_tag
iterator_category
;
///< Category of the iterator.
typedef
T
value_type
;
///< Type of values iterated over.
typedef
std
::
size_t
difference_type
;
///< Type to identify distance.
typedef
T
*
pointer
;
///< Pointer to the type iterated over.
typedef
T
&
reference
;
///< Reference to the type iterated over.
///@}
/// Creates a new instance.
///
/// \param MQTTTopic MQTT topic to subscribe to
/// \param Blocking Whether to block if next value is not arrived when
/// incrementing
/// \param ServerHost Hostname of MQTT broker to connect to
/// \param ServerPort Port number of MQTT broker to connect to
///
/// \note If \p Blocking, incrementing the iterator blocks in case the next
/// value has not arrived yet. If not \p Blocking, incrementing the iterator
/// has no effect when the next value is not arrived yet.
MQTTIterator
(
const
std
::
string
&
MQTTTopic
,
const
bool
Blocking
=
true
,
const
std
::
string
ServerHost
=
"localhost"
,
const
uint16_t
ServerPort
=
1883
)
noexcept
:
Empty
(
false
),
Blocking
(
Blocking
),
FirstValue
(
std
::
make_shared
<
std
::
once_flag
>
()),
Current
(),
Buffer
(
std
::
make_shared
<
concurrent_queue
<
T
>>
()),
Handler
(
std
::
make_shared
<
MQTTHandler
<
T
>>
(
MQTTTopic
,
ServerHost
,
ServerPort
,
Buffer
))
{}
/// Creates an empty new instance.
MQTTIterator
(
void
)
noexcept
:
Empty
(
true
),
Blocking
(
false
),
FirstValue
(
std
::
make_shared
<
std
::
once_flag
>
()),
Current
(),
Buffer
(
std
::
make_shared
<
concurrent_queue
<
T
>>
()),
Handler
(
nullptr
)
{}
/// Pre-increment operator.
///
/// The implementation reads the next value. The operation blocks if \c
/// Blocking and the next value has not arrived yet. If not \c Blocking and no
/// new value is ready yet, the oeprator has no effect.
///
/// \return \p this object after incrementing it.
MQTTIterator
&
operator
++
()
{
LOG_DEBUG_STREAM
<<
"Getting next value..."
<<
std
::
endl
;
if
(
Buffer
->
pop
(
Current
,
Blocking
)
==
concurrent_queue
<
T
>::
element_popped
)
{
LOG_DEBUG_STREAM
<<
"Got next value."
<<
std
::
endl
;
}
else
{
LOG_DEBUG_STREAM
<<
"Next value is not ready yet."
<<
std
::
endl
;
}
return
*
this
;
}
/// Post-increment operator.
///
/// The implementation uses the pre-increment operator and returns a copy of
/// the original state of \p this object.
///
/// \return \p this object before incrementing it.
MQTTIterator
operator
++
(
int
)
{
MQTTIterator
Tmp
(
*
this
);
++
(
*
this
);
return
Tmp
;
}
/// Returns a constant reference to the current entry.
///
/// \note Automatically increments the iterator to read in the first value
/// upon the firs dereferencing (i.e, calling either \c operator*() or \c
/// operator->() ).
///
/// \return constant reference to the current entry.
const
T
&
operator
*
(
void
)
noexcept
{
std
::
call_once
(
*
FirstValue
,
[
&
](
void
)
{
++
(
*
this
);
});
return
Current
;
}
/// Returns a constant pointer to the current entry.
///
/// \note Automatically increments the iterator to read in the first value
/// upon the firs dereferencing (i.e, calling either \c operator*() or \c
/// operator->() ).
///
/// \return constant pointer to the current entry.
const
T
*
operator
->
(
void
)
noexcept
{
std
::
call_once
(
*
FirstValue
,
[
&
](
void
)
{
++
(
*
this
);
});
return
&
Current
;
}
/// Tells if \p this object is equal to another one.
///
/// Two \c rosa::mqtt::MQTTIterator instances are equal if and only if they
/// are the same or both are empty.
///
/// \param RHS other object to compare to
///
/// \return whether \p this object is equal with \p RHS
bool
operator
==
(
const
MQTTIterator
&
RHS
)
const
noexcept
{
return
this
==
&
RHS
||
(
this
->
Empty
&&
RHS
.
Empty
);
}
/// Tells if \p this object is not equal to another one.
///
/// \see rosa::mqtt::MQTTIterator::operator==
///
/// \param RHS other object to compare to
///
/// \return whether \p this object is not equal with \p RHS.
bool
operator
!=
(
const
MQTTIterator
&
RHS
)
const
noexcept
{
return
!
((
*
this
)
==
RHS
);
}
private
:
const
bool
Empty
;
const
bool
Blocking
;
const
std
::
shared_ptr
<
std
::
once_flag
>
FirstValue
;
T
Current
;
const
std
::
shared_ptr
<
concurrent_queue
<
T
>>
Buffer
;
const
std
::
shared_ptr
<
MQTTHandler
<
T
>>
Handler
;
};
}
// End namespace mqtt
}
// End namespace rosa
#endif
// ROSA_SUPPORT_MQTT_MQTTREADER_HPP
File Metadata
Details
Attached
Mime Type
text/x-c++
Expires
Sun, May 31, 4:31 PM (1 d, 16 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
328604
Default Alt Text
MQTTReader.hpp (12 KB)
Attached To
Mode
R20 SoC_Rosa_repo
Attached
Detach File
Event Timeline
Log In to Comment