Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F10709352
MQTTReader.hpp
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Size
11 KB
Referenced Files
None
Subscribers
None
MQTTReader.hpp
View Options
//===-- rosa/support/MQTT/MQTTReader.hpp --------------------------*- C++
//-*-===//
//
// The RoSA Framework
//
// Distributed under the terms and conditions of the Boost Software License 1.0.
// See accompanying file LICENSE.
//
// If you did not receive a copy of the license file, see
// http://www.boost.org/LICENSE_1_0.txt.
//
//===----------------------------------------------------------------------===//
///
/// \file rosa/support/MQTT/MQTTReader.hpp
///
/// \authors Benedikt Tutzer (benedikt.tutzer@tuwien.ac.at),
/// Maximilian Götzinger (maximilian.goetzinger@tuwien.ac.at)
///
/// \date 2020
///
/// \brief Facilities to read from MQTT brokers.
///
//===----------------------------------------------------------------------===//
#ifndef ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#define ROSA_SUPPORT_MQTT_MQTTREADER_HPP
#include "mqtt/async_client.h"
#include "rosa/support/debug.hpp"
#include "rosa/support/sequence.hpp"
#include "rosa/support/terminal_colors.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <iterator>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
namespace
rosa
{
namespace
MQTT
{
// @TODO The helpers below are a copy of those in CSVReader.hpp. Move these
// functionalities to a common file.
/// Anonymous namespace providing implementation details for
/// \c rosa::MQTT::MQTTIterator, consider it private.
namespace
{
/// Helper function to print an error message in red color to the terminal and
/// exit from the application.
///
/// \note The function never returns as it calles `exit()`.
///
/// \param Error error message
/// \param ExitCode exit code to return from the application
void
logErrorAndExit
(
const
std
::
string
&
Error
,
const
int
ExitCode
)
{
LOG_ERROR_STREAM
<<
rosa
::
terminal
::
Color
::
Red
<<
Error
<<
rosa
::
terminal
::
Color
::
Default
<<
std
::
endl
;
exit
(
ExitCode
);
}
/// Facility for turning the textual form of one value into a value of type
/// \p T.
///
/// The boolean template parameters classify \p T and are computed from it by
/// their default arguments; they select the partial specialization that
/// actually implements the parsing. Spell only \p T when using the struct.
///
/// \tparam T type of value to parse
/// \tparam IsSignedInt whether \p T is a signed integral type, always use
/// default
/// \tparam IsUnsignedInt whether \p T is an unsigned integral type, always use
/// default
/// \tparam IsFloat whether \p T is a floating-point type, always use default
/// \tparam IsString whether \p T is \c std::string, always use default
///
/// \note Specializations of this struct are provided for arithmetic types
/// and \c std::string.
template <typename T,
          bool IsSignedInt =
              (std::is_integral<T>::value && std::is_signed<T>::value),
          bool IsUnsignedInt =
              (std::is_integral<T>::value && std::is_unsigned<T>::value),
          bool IsFloat = std::is_floating_point<T>::value,
          bool IsString = std::is_same<T, std::string>::value>
struct ValueParser {
  /// Parses \p Cell into a value of type \p T.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value
  ///
  /// \note The primary template only declares the function; no definition is
  /// given here.
  ///
  /// \note The function is \c noexcept, so a failure to parse \p Cell for type
  /// \p T cannot be reported to the caller through an exception.
  static T parse(const std::string &Cell) noexcept;
};
/// Specialization of \c ValueParser for signed integral types.
///
/// \tparam T signed integral type to parse
template <typename T> struct ValueParser<T, true, false, false, false> {
  STATIC_ASSERT((std::is_integral<T>::value && std::is_signed<T>::value),
                "wrong type"); // Sanity check.

  /// Parses \p Cell as a base-10 signed integer.
  ///
  /// The conversion uses \c std::strtoll instead of \c std::stoll: the latter
  /// throws on malformed or out-of-range input, which would call
  /// \c std::terminate from inside this \c noexcept function.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value converted to \p T; \c 0 if \p Cell does not
  /// start with a number, and the saturated result of \c std::strtoll if the
  /// number does not fit into `long long`
  static T parse(const std::string &Cell) noexcept {
    return static_cast<T>(std::strtoll(Cell.c_str(), nullptr, 10));
  }
};
/// Specialization of \c ValueParser for unsigned integral types.
///
/// \tparam T unsigned integral type to parse
template <typename T> struct ValueParser<T, false, true, false, false> {
  STATIC_ASSERT((std::is_integral<T>::value && std::is_unsigned<T>::value),
                "wrong type"); // Sanity check.

  /// Parses \p Cell as a base-10 unsigned integer.
  ///
  /// The conversion uses \c std::strtoull instead of \c std::stoull: the
  /// latter throws on malformed or out-of-range input, which would call
  /// \c std::terminate from inside this \c noexcept function.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value converted to \p T; \c 0 if \p Cell does not
  /// start with a number, and the saturated result of \c std::strtoull if the
  /// number does not fit into `unsigned long long`
  static T parse(const std::string &Cell) noexcept {
    return static_cast<T>(std::strtoull(Cell.c_str(), nullptr, 10));
  }
};
/// Specialization of \c ValueParser for floating-point types.
///
/// \tparam T floating-point type to parse
template <typename T> struct ValueParser<T, false, false, true, false> {
  STATIC_ASSERT((std::is_floating_point<T>::value),
                "wrong type"); // Sanity check.

  /// Parses \p Cell as a floating-point number.
  ///
  /// The conversion uses \c std::strtold instead of \c std::stold: the latter
  /// throws on malformed or out-of-range input, which would call
  /// \c std::terminate from inside this \c noexcept function.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return the parsed value converted to \p T; \c 0 if \p Cell does not
  /// start with a number, and the result \c std::strtold reports for values
  /// outside the range of `long double`
  static T parse(const std::string &Cell) noexcept {
    return static_cast<T>(std::strtold(Cell.c_str(), nullptr));
  }
};
/// Specialization of \c ValueParser for \c std::string.
///
/// \tparam T must be \c std::string
template <typename T> struct ValueParser<T, false, false, false, true> {
  STATIC_ASSERT((std::is_same<T, std::string>::value),
                "wrong type"); // Sanity check.

  /// Hands back the text unchanged, as no conversion is needed for strings.
  ///
  /// \param Cell the \c std::string to parse
  ///
  /// \return a copy of \p Cell
  static T parse(const std::string &Cell) noexcept { return T(Cell); }
};
}
// End namespace
/**
 * Local callback & listener class for use with the client connection.
 *
 * This implementation only handles arriving messages: each payload is parsed
 * into a value of type T, appended to a queue and signalled through a flag.
 * The mqtt::callback interface allows further actions to be defined.
 *
 * @tparam T type of the values parsed from the message payloads
 *
 * @note NOTE(review): the queue and the flag are plain objects written here
 * without any synchronization. If the MQTT client invokes the callback on a
 * different thread than the one reading them, this is a data race -- confirm
 * and guard accordingly.
 */
template <typename T> class MQTTCallback : public virtual mqtt::callback {
public:
  /** Constructor.
   * @param buffer queue that receives every parsed value
   * @param newValue flag that is set to true after a value was queued
   **/
  MQTTCallback(std::shared_ptr<std::queue<T>> buffer,
               std::shared_ptr<bool> newValue)
      : buffer(std::move(buffer)), newValue(std::move(newValue)) {}

private:
  /** Callback for when a message arrives.
   *
   * Parses the payload exactly once, prints it, queues it and raises the
   * new-value flag.
   *
   * @param Msg Pointer for the MQTT message
   **/
  void message_arrived(mqtt::const_message_ptr Msg) override {
    std::cout << "New value" << std::endl;
    const std::string Message = Msg->to_string();
    // To trace the raw traffic, log Msg->get_topic() and Message through
    // LOG_INFO_STREAM here.
    T Value = parser.parse(Message);
    std::cout << "Got value " << Value << std::endl;
    buffer->push(std::move(Value));
    // Raise the flag only after the value is in the queue.
    *newValue = true;
  }

  /** Queue shared with the reader of the values. */
  std::shared_ptr<std::queue<T>> buffer;
  /** Flag shared with the reader, set whenever a value was queued. */
  std::shared_ptr<bool> newValue;
  /** Parser used to convert message payloads into values of type T. */
  ValueParser<T> parser;
};
/// Provides `InputIterator`-style access to values received from an MQTT
/// topic.
///
/// An instance created with a topic connects to a broker, subscribes to the
/// topic, and registers a \c rosa::MQTT::MQTTCallback that parses every
/// incoming message into a value of type \p T and appends it to a queue.
/// Dereferencing the iterator yields the front of that queue; incrementing it
/// discards the front.
///
/// \tparam T type of the values parsed from the messages
///
/// \note Parsing relies on \c ValueParser, which is implemented only for
/// `arithmetic` types -- signed and unsigned integral types and floating-point
/// types -- and for \c std::string. Those are the valid values for \p T.
///
/// \note Copies of an iterator share the queue, the flag, the client, and the
/// callback with the original because all of them are held through
/// \c std::shared_ptr.
///
/// \note NOTE(review): the queue and the flag are plain objects accessed
/// without synchronization. If the MQTT client runs the callback on another
/// thread, this is a data race -- confirm and guard accordingly.
template <typename T> class MQTTIterator : public virtual mqtt::callback {
public:
  /// \defgroup MQTTIteratorTypedefs Typedefs of rosa::MQTT::MQTTIterator
  ///
  /// Standard `typedef`s for iterators.
  ///
  ///@{
  typedef std::input_iterator_tag
      iterator_category;               ///< Category of the iterator.
  typedef T value_type;                ///< Type of values iterated over.
  typedef std::size_t difference_type; ///< Type to identify distance.
  typedef T *pointer;                  ///< Pointer to the type iterated over.
  typedef T &reference;                ///< Reference to the type iterated over.
  ///@}

  /// Creates a new instance that is connected to a broker and subscribed to a
  /// topic.
  ///
  /// The broker is addressed as `tcp://<ServerHost>:<ServerPort>`.
  ///
  /// \param MQTTTopic topic to subscribe to
  /// \param ServerHost host of the MQTT broker
  /// \param ServerPort port of the MQTT broker
  ///
  /// \note If an \c mqtt::exception is thrown while connecting or
  /// subscribing, the error is logged and the application exits with code 1.
  MQTTIterator(const std::string &MQTTTopic,
               std::string ServerHost = "localhost", uint16_t ServerPort = 1883)
      : ServerHost(ServerHost), ServerPort(ServerPort), MQTTTopic(MQTTTopic) {
    // connect and register callback
    try {
      std::stringstream ss;
      ss << "tcp://" << this->ServerHost << ":" << this->ServerPort;
      const std::string ServerURI = ss.str();
      LOG_INFO_STREAM << "Initializing for " << ServerURI << std::endl;
      Client = std::make_shared<mqtt::async_client>(ServerURI, "");
      LOG_INFO_STREAM << "Connecting to server" << std::endl;
      Client->connect()->wait();
      LOG_INFO_STREAM << "Receiving messages from topic '" << this->MQTTTopic
                      << "' for a short while..." << std::endl;
      Callback =
          std::make_shared<MQTTCallback<T>>(this->buffer, this->newValue);
      Client->set_callback(*Callback);
      Client->subscribe(this->MQTTTopic, {});
      //@todo move to destructor
      // Client.disconnect()->wait();
    } catch (const mqtt::exception &e) {
      std::cout << "EXCEPTION!" << std::endl;
      logErrorAndExit(e.what(), 1);
    }
  }

  /// Creates an empty new instance, which is not connected to any broker.
  MQTTIterator(void) noexcept {}

  /// Blocks until the new-value flag is set, then clears it.
  ///
  /// \note The flag itself is tested, not the \c std::shared_ptr holding it;
  /// the pointer is never null, so testing it would never wait.
  ///
  /// \note This is a busy-wait that prints a `+` on every iteration.
  void waitForNewValue() const noexcept {
    std::cout << "waitForNewValue" << std::endl;
    while (!*newValue) {
      std::cout << "+";
    }
    std::cout << std::endl << "got new value" << std::endl;
    *newValue = false;
  }

  /// Blocks until the queue holds at least one value.
  ///
  /// \note This is a busy-wait that prints a `-` on every iteration.
  void waitForAnyValue() const noexcept {
    std::cout << "waitForAnyValue" << std::endl;
    while (buffer->empty()) {
      std::cout << "-";
    }
  }

  /// Pre-increment operator.
  ///
  /// The implementation waits until a new value has been signalled and then
  /// discards the front of the queue.
  ///
  /// \return \p this object after incrementing it.
  MQTTIterator &operator++() {
    waitForNewValue();
    buffer->pop();
    return *this;
  }

  /// Post-increment operator.
  ///
  /// The implementation copies \p this object and then uses the pre-increment
  /// operator, which waits for exactly one new value.
  ///
  /// \note The returned copy shares the queue with \p this object, so
  /// dereferencing it yields the front of the queue after the increment.
  ///
  /// \return a copy of \p this object taken before incrementing it.
  MQTTIterator operator++(int) {
    MQTTIterator Tmp(*this);
    ++(*this);
    return Tmp;
  }

  /// Returns a constant reference to the current entry.
  ///
  /// \note The call blocks until the queue holds a value.
  ///
  /// \return constant reference to the front of the queue.
  const T &operator*(void) const noexcept {
    waitForAnyValue();
    return buffer->front();
  }

  /// Returns a constant pointer to the current entry.
  ///
  /// \note The call blocks until the queue holds a value.
  ///
  /// \return constant pointer to the front of the queue.
  const T *operator->(void) const noexcept {
    waitForAnyValue();
    return &(buffer->front());
  }

  /// Tells if \p this object is equal to another one.
  ///
  /// Two \c rosa::MQTT::MQTTIterator instances are equal if and only if they
  /// are the very same object.
  ///
  /// \param RHS other object to compare to
  ///
  /// \return whether \p this object is equal with \p RHS
  bool operator==(const MQTTIterator &RHS) const noexcept {
    std::cout << "equal operator" << std::endl;
    return (this == &RHS);
    // ||
    //((this->buffer->empty()) && (RHS.buffer->empty())));
  }

  /// Tells if \p this object is not equal to another one.
  ///
  /// \see rosa::MQTT::MQTTIterator::operator==
  ///
  /// \param RHS other object to compare to
  ///
  /// \return whether \p this object is not equal with \p RHS.
  bool operator!=(const MQTTIterator &RHS) const noexcept {
    return !((*this) == RHS);
  }

private:
  /// Host of the MQTT broker.
  std::string ServerHost;

  /// Port of the MQTT broker; \c 0 for an empty instance.
  uint16_t ServerPort = 0;

  /// Topic the instance is subscribed to.
  std::string MQTTTopic;

  /// Queue of the values parsed from the received messages.
  std::shared_ptr<std::queue<T>> buffer = std::make_shared<std::queue<T>>();

  /// Flag set by the callback whenever a value was appended to the queue.
  std::shared_ptr<bool> newValue = std::make_shared<bool>(false);

  /// Client connection to the broker; null for an empty instance.
  std::shared_ptr<mqtt::async_client> Client;

  /// Callback registered with \c Client; null for an empty instance.
  std::shared_ptr<MQTTCallback<T>> Callback;
};
}
// End namespace MQTT
}
// End namespace rosa
#endif
// ROSA_SUPPORT_MQTT_MQTTREADER_HPP
File Metadata
Details
Attached
Mime Type
text/x-c++
Expires
Sun, May 31, 5:05 PM (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
328833
Default Alt Text
MQTTReader.hpp (11 KB)
Attached To
Mode
R20 SoC_Rosa_repo
Attached
Detach File
Event Timeline
Log In to Comment