I've just been experimenting with QAMQP; so far it's been a very easy introduction to AMQP in C++. The project I'm using AMQP for is a crude SCADA system for energy management. So we've got energy meters being polled (Modbus or BACnet) with data being collected and reported via AMQP messaging.
For testing purposes, I've written a dummy driver. It uses the `rand()` function from `stdlib.h` to generate dummy readings. Message content uses YAML format.
I have some existing code using Python (and the Pika library) to act as a client, and I'm implementing the server in C++ for speed. One thing I observe is that sometimes QAMQP seems to get messages mixed up, so one message's content gets appended to another unrelated message sent to the same queue. (In this case, both are for an energy meter named "B79_04_RT"; so sent to a fan-out exchange named "entity.rq.B79_04_RT".)
1427720769 DEBUG AMQPQueueHandler.amq.gen-b20F7zyH9m-osJS8uSLQuA : New message received.
1427720769 DEBUG MessageParser : Content-type is text/x-yaml; charset=utf-8
1427720769 DEBUG MessageParser : Message payload:
_debug_: {created: ! '2015-03-30 13:06:07.446333+00:00', sent: ! '2015-03-30 13:06:08.062488+00:00'}
attributes: {FREQ: ! '2015-03-30 14:06:07.439992+00:00', I_AVG_TOT: ! '2015-03-30
14:06:07.445290+00:00', I_PH4: ! '2015-03-30 14:06:07.441783+00:00', I_PHA: &id001 ! '2015-03-30
14:06:07.428174+00:00', I_PHB: ! '2015-03-30 14:06:07.429421+00:00', I_PHC: ! '2015-03-30
14:06:07.428822+00:00', KVAH_EXP: ! '2015-03-30 14:06:07.434671+00:00', KVAH_IMP: ! '2015-03-30
14:06:07.431752+00:00', KVARH_CAP_EXP: ! '2015-03-30 14:06:07.444093+00:00', KVARH_CAP_IMP: ! '2015-03-30
14:06:07.441202+00:00', KVARH_IND_EXP: ! '2015-03-30 14:06:07.432938+00:00', KVARH_IND_IMP: ! '2015-03-30
14:06:07.437034+00:00', KVAR_PHA: ! '2015-03-30 14:06:07.439386+00:00', KVAR_PHB: ! '2015-03-30
14:06:07.438808+00:00', KVAR_PHC: ! '2015-03-30 14:06:07.438191+00:00', KVA_TOT: ! '2015-03-30
14:06:07.442939+00:00', KWH_EXP: ! '2015-03-30 14:06:07.442358+00:00', KWH_IMP: ! '2015-03-30
14:06:07.435252+00:00', KW_PHA: ! '2015-03-30 14:06:07.443510+00:00', KW_PHB: ! '2015-03-30
14:06:07.444674+00:00', KW_PHC: ! '2015-03-30 14:06:07.431175+00:00', KW_TOT: ! '2015-03-30
14:06:07.440578+00:00', MAX_DEMAND: ! '2015-03-30 14:06:07.430599+00:00', PF_PHA: ! '2015-03-30
14:06:07.432328+00:00', PF_PHB: ! '2015-03-30 14:06:07.434090+00:00', PF_PHC: ! '2015-03-30
14:06:07.433517+00:00', PF_TOT: ! '2015-03-30 14:06:07.430015+00:00', V_PHA: ! '2015-03-30
14:06:07.437617+00:00', V_PHB: ! '2015-03-30 14:06:07.435830+00:00', V_PHC: ! '2015-03-30
14:06:07.436406+00:00'}
deadline: *id001
entity: B79_04_RT
priority: 127
type: demand_add
_debug_: {created: ! '2015-03-30 13:06:09.075516+00:00', sent: ! '2015-03-30 13:06:09.493511+00:00'}
attributes: [FREQ, I_AVG_TOT, I_PH4, I_PHA, I_PHB, I_PHC, KVAH_EXP, KVAH_IMP, KVARH_CAP_EXP,
KVARH_CAP_IMP, KVARH_IND_EXP, KVARH_IND_IMP, KVAR_PHA, KVAR_PHB, KVAR_PHC, KVA_TOT,
KWH_EXP, KWH_IMP, KW_PHA, KW_PHB, KW_PHC, KW_TOT, MAX_DEMAND, PF_PHA, PF_PHB, PF_PHC,
PF_TOT, V_PHA, V_PHB, V_PHC]
deadline: 2015-03-30 13:07:06.700407+00:00
entity: B79_04_RT
priority: 127
type: rt_read
1427720769 DEBUG MessageParser RtReadRequest: Bad incoming message: yaml-cpp: error at line 0, column 0: bad conversion
1427720769 DEBUG MessageParser : Message at 0x8e1d730 has type UNDEFINED???
1427720769 DEBUG AMQPQueueHandler.amq.gen-b20F7zyH9m-osJS8uSLQuA : Bad message received. Dropped.
void AMQPQueueHandler::messageReceived()
{
QAmqpMessage raw_msg(this->queue->dequeue());
log4cpp::Category& log(this->getLog());
log.debugStream()
<< "New message received.";
try {
BaseMessage* msg = BaseMessage::fromMessage(raw_msg);
if (msg != NULL) {
emit messageReceived(
this->ref(),
MessageRef(msg));
} else {
log.debugStream()
<< "Bad message received. Dropped.";
}
} catch (std::exception& ex) {
log.errorStream()
<< "Bad message received: "
<< ex.what();
}
}
/* ... */
/*
 * Content types recognised for message bodies.
 *
 * Note, the charset is given here as normally the default is ISO8859-1
 * for text/... types. We use x-yaml because at time of writing YAML was
 * not an official MIME type.
 */
static const QString CONTENT_TYPE("text/x-yaml; charset=utf-8");
/*
 * Second content type that BaseMessage::fromMessage() also accepts; the
 * comparison there is an exact string match against each constant.
 */
static const QString ALT_CONTENT_TYPE("application/x-yaml");
/*
 * Decode an incoming AMQP message into the matching BaseMessage subclass.
 *
 * The message is accepted only when:
 *  - it carries correlation-id, message-id and content-type properties;
 *  - its content type equals CONTENT_TYPE or ALT_CONTENT_TYPE;
 *  - its payload parses as a YAML mapping;
 *  - the mapping's "type" key converts to a MessageType handled below;
 *  - the constructed message does not report MSG_TYPE_INVALID.
 *
 * Returns an object allocated with new (this function keeps no reference
 * to it), or NULL when any of the checks above fails. The reason for a
 * rejection is written to the "MessageParser" log at debug level.
 *
 * YAML::Exception is caught here and turned into a NULL result; any other
 * exception propagates to the caller.
 */
BaseMessage* BaseMessage::fromMessage(const QAmqpMessage& amqp_msg)
{
    log4cpp::Category& log = log4cpp::Category::getInstance(
            "MessageParser");
    try {
        /* Verify all the required fields are present. */
        if (!amqp_msg.hasProperty(QAmqpMessage::CorrelationId)) {
            log.debugStream()
                << "No correlation ID";
            return NULL;
        }
        if (!amqp_msg.hasProperty(QAmqpMessage::MessageId)) {
            log.debugStream()
                << "No message ID";
            return NULL;
        }
        if (!amqp_msg.hasProperty(QAmqpMessage::ContentType)) {
            log.debugStream()
                << "No content-type";
            return NULL;
        }

        /* Verify that the content type is correct */
        const QString content_type(amqp_msg.property(
                QAmqpMessage::ContentType).toString());
        log.debugStream() << "Content-type is " << content_type;
        if ((content_type != CONTENT_TYPE) &&
                (content_type != ALT_CONTENT_TYPE)) {
            log.debugStream()
                << "Unsupported content-type";
            return NULL;
        }

        /*
         * Decode the message body.
         *
         * The payload is fetched once and handed to the YAML parser with
         * its explicit byte count, so the parser sees exactly the bytes
         * of the AMQP body rather than whatever precedes the first NUL.
         */
        const QByteArray payload(amqp_msg.payload());
        log.debugStream() << "Message payload:\n---\n"
            << payload.constData()
            << "\n---";
        const std::string yaml_text(
                payload.constData(),
                static_cast<std::string::size_type>(payload.size()));
        YAML::Node body = YAML::Load(yaml_text);
        if (!body.IsMap()) {
            log.debugStream()
                << "Message body is not a YAML mapping";
            return NULL;
        }

        /* What message type do we have? */
        MessageType type = body["type"].as<MessageType>();
        BaseMessage* msg = NULL;
        switch(type) {
        /* Diagnostics */
        case MSG_TYPE_PING:
            msg = new PingRequest(amqp_msg, body);
            break;
        case MSG_TYPE_PING_RES:
            msg = new PingResponse(amqp_msg, body);
            break;
        /* Namespace queries */
        case MSG_TYPE_NS_SEARCH:
            msg = new NamespaceSearchRequest(amqp_msg, body);
            break;
        case MSG_TYPE_NS_SEARCH_RES:
            msg = new NamespaceSearchResponse(amqp_msg, body);
            break;
        /* Configuration */
        case MSG_TYPE_CFG_INFO:
            msg = new CfgInfoRequest(amqp_msg, body);
            break;
        case MSG_TYPE_CFG_COC:
            msg = new CfgChangeOfConfigMessage(amqp_msg, body);
            break;
        case MSG_TYPE_CFG_INFO_RES:
            msg = new CfgInfoResponse(amqp_msg, body);
            break;
        /* Demand management */
        case MSG_TYPE_DEMAND_ADD:
            msg = new DemandAddRequest(amqp_msg, body);
            break;
        case MSG_TYPE_DEMAND_RM:
            msg = new DemandRmRequest(amqp_msg, body);
            break;
        case MSG_TYPE_DEMAND_RES:
            msg = new DemandResponse(amqp_msg, body);
            break;
        /* Real-time */
        case MSG_TYPE_RT_READ:
            msg = new RtReadRequest(amqp_msg, body);
            break;
        case MSG_TYPE_RT_READ_RES:
            msg = new RtReadResponse(amqp_msg, body);
            break;
        case MSG_TYPE_RT_COS:
            msg = new RtChangeOfStateMessage(amqp_msg, body);
            break;
        case MSG_TYPE_RT_WRITE:
            msg = new RtWriteRequest(amqp_msg, body);
            break;
        case MSG_TYPE_RT_WRITE_RES:
            msg = new RtWriteResponse(amqp_msg, body);
            break;
        /* Historical: TODO */
        /* Events: TODO */
        default:
            log.debugStream()
                << "Unhandled message type: "
                << body["type"].as<QString>();
            return NULL;
        }

        log.debugStream()
            << "Message at "
            << static_cast<void*>(msg)
            << " has type "
            << msg->type;

        /*
         * A message whose type field ends up as MSG_TYPE_INVALID after
         * construction is discarded rather than handed to the caller.
         */
        if (msg->type == MSG_TYPE_INVALID) {
            delete msg;
            msg = NULL;
        }
        return msg;
    } catch (const YAML::Exception& ex) {
        /* Conversion fails */
        log.debugStream()
            << "YAML error: "
            << ex.what();
        return NULL;
    }
}
Somehow, a real-time read request got mixed up with a demand-add request. It could be a timing issue: my application is single-threaded, but RabbitMQ is running on a dual-core industrial computer. The Python code inter-operates with other Python scripts without issues; so far this problem has been unique to QAMQP.