频道栏目
首页 > 网络 > 云计算 > 正文
rabbit-mqtt+pmqtt协议+paho库尝鲜(ubuntu16.04)
2017-08-05 14:48:14      个评论    来源:anribras的专栏  
收藏   我要投稿

rabbit-mqtt+pmqtt协议+paho库尝鲜(ubuntu16.04)。

步骤

1 下载安装rabbit-mq mqtt-server
就用deb包
另需要
sudo apt-get install erlang

2 启动
sudo /etc/init.d/rabbitmq-server restart

3 mqtt-3.1协议支持
rabbitmq-plugins enable rabbitmq_mqtt

4 开启web管理
rabbitmq-plugins enable rabbitmq_management

5 下载编译paho c库(paho.mqtt.c,提供MQTTClient.h和libpaho-mqtt3c)

6 运行mqtt PUB/SUB的例子,用c++封装成pahoWapper, 结合protobuf

编译:

g++ main.cpp -o main -lpaho-mqtt3c -std=c++11

代码

用c++封装了下paho的c调用库-hapoWapper.h

main.cpp

#include <stdio.h>
#include <string.h>

#include <iostream>
#include <memory>

#include "hapoWapper.h"

using std::cout;
using std::endl;
using std::shared_ptr;

// Server address handed to clientParams as the server URI in main().
#define ADDRESS     "127.0.0.1:1883"
// Client identifier handed to clientParams in main().
#define CLIENTID     "58daf13e258951000ab2b38f"
// Topics subscribed to in main(); fmsgarrvd() additionally special-cases TOPIC1.
#define TOPIC1        "Good man"
#define TOPIC2        "Normal man"
// Not referenced by any code visible in this listing.
// NOTE(review): looks like an access token/credential -- confirm it is a dummy value
// before publishing or committing it.
const char* TOKEN  =  "YBljNeKVdOVYPFJo7EjztFVoya94XV4GBuvB9U85KJY=";

/**
 * Delivery callback registered with the MQTT client through cbGroups.
 *
 * Reports the delivery token of a confirmed message on stdout.
 *
 * @param context Unused.
 * @param dt      Delivery token of the confirmed message (printed with %d).
 */
void fdelivered(void *context, MQTTClient_deliveryToken dt)
{
    (void)context;  // not needed by this handler
    fprintf(stdout, "Message with token value %d delivery confirmed\n", dt);
}
/**
 * Message-arrived callback: dumps the topic and payload of an incoming
 * message to stdout, then releases both the message and the topic string.
 *
 * @param context   Unused.
 * @param topicName Topic the message arrived on; freed here with MQTTClient_free().
 * @param topicLen  Unused.
 *                  NOTE(review): topicName is treated as a NUL-terminated string;
 *                  confirm against the Paho docs whether topics with embedded
 *                  NULs (topicLen != 0) must be supported.
 * @param message   Received message; freed here with MQTTClient_freeMessage().
 * @return Always 1 (presumably tells the library the message was handled --
 *         confirm against the MQTTClient_messageArrived documentation).
 */
int fmsgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    (void)context;
    (void)topicLen;

    printf("Message arrived\n");
    printf("     topic: %s\n", topicName);
    printf("   message: ");

    const char* payload = static_cast<const char*>(message->payload);

    if (!strcmp(topicName, TOPIC1)) {
        // The payload is a length-delimited byte buffer, not a C string, so the
        // print is bounded by payloadlen instead of relying on a terminating NUL.
        printf("Got token size = %d, %.*s\n",
               message->payloadlen, message->payloadlen, payload);
    }

    // Echo the raw payload byte by byte (it may legitimately contain NUL bytes).
    for (int i = 0; i < message->payloadlen; i++)
    {
        putchar(payload[i]);
    }
    putchar('\n');

    // Ownership of both buffers was handed to this callback; give them back.
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}
/**
 * Connection-lost callback: reports the loss and its cause on stdout.
 *
 * @param context Unused.
 * @param cause   Human-readable reason for the disconnect; may be null, in
 *                which case "(unknown)" is printed instead.
 */
void fconnlost(void *context, char *cause)
{
    (void)context;
    printf("\nConnection lost\n");
    // Passing a null pointer to %s is undefined behaviour, so substitute a
    // placeholder when the library does not supply a reason.
    printf("     cause: %s\n", cause ? cause : "(unknown)");
}


/**
 * Demo entry point: connects to the server at ADDRESS, subscribes to TOPIC1
 * and TOPIC2, then blocks until the user types 'Q'/'q' or stdin reaches EOF,
 * and finally disconnects the client.
 *
 * @return 0 on normal shutdown (a failed connect terminates the process inside
 *         pahoWapper::connect()).
 */
int main(int argc, char* argv[])
{
    (void)argc;
    (void)argv;

    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    clientParams paras(ADDRESS,CLIENTID,conn_opts);

    cbGroups cbs(fconnlost,fdelivered,fmsgarrvd);
    pahoWapper wap(paras);

    wap.connect(cbs);

    printf("Subcribe topic:%s %s\n",TOPIC1,TOPIC2);
    wap.subscribe(TOPIC2,1);
    wap.subscribe(TOPIC1,1);

    //printf("Publish with topic:%s\n",TOPIC2);
    //auto words = "coooooooool";
    //wap.publish(TOPIC2,words,strlen(words),1);

    // getchar() returns int so that EOF is distinguishable from every valid
    // character; storing it in a char made the loop spin forever once stdin
    // was closed.
    int ch;
    do {
        ch = getchar();
    } while (ch != EOF && ch != 'Q' && ch != 'q');

    // Release the connection and the client handle before exiting.
    wap.disconnect();

    return 0;
}

hapoWapper.h

#ifndef HAPO_WAPPER_H
#define HAPO_WAPPER_H


#include "MQTTClient.h"

#include <stdio.h>
#include <stdlib.h>

// Function-pointer signatures of the three callbacks that pahoWapper::connect()
// forwards to MQTTClient_setCallbacks().

/// Handler invoked when the connection is lost.
using connlost = void (*)(void*, char*);

/// Handler invoked when delivery of a published message is confirmed.
using delivered = void (*)(void* context, MQTTClient_deliveryToken dt);

/// Handler invoked when a message arrives on a subscribed topic.
using msgarrvd = int (*)(void* context, char* topicName, int topicLen,
                         MQTTClient_message* message);

class clientParams {
    public:
        clientParams(){};
        ~clientParams(){};
        const char* _server_uri; 
        const char* _client_id;
        MQTTClient_connectOptions _conn_opts;
        clientParams(const char*a, const char* b, const MQTTClient_connectOptions & o):
            _server_uri(a),_client_id(b),_conn_opts(o){};
        clientParams & operator=(const clientParams & cp) {
            if(&cp == this) 
                return *this;
            _server_uri = cp._server_uri;
            _client_id = cp._client_id;
            _conn_opts = cp._conn_opts;
            return *this;
        };
};

/**
 * Plain bundle of the three callbacks that pahoWapper::connect() registers
 * with the MQTT client. The pointers are stored as given; nothing is invoked
 * or validated here.
 */
class cbGroups {
    public:
    /**
     * @param c Connection-lost handler (stored in connect_cb).
     * @param d Delivery-confirmed handler (stored in delivered_cb).
     * @param m Message-arrived handler (stored in msgarrvd_cb).
     */
    cbGroups(connlost c, delivered d, msgarrvd m)
        : connect_cb(c)
        , delivered_cb(d)
        , msgarrvd_cb(m)
    {
    }

    connlost connect_cb;     // despite the name, this is the connection-LOST handler
    delivered delivered_cb;  // delivery-confirmed handler
    msgarrvd msgarrvd_cb;    // message-arrived handler
};

class pahoWapper {
    public:
        pahoWapper(){};
        ~pahoWapper(){};
        pahoWapper(const clientParams & paras) {
            _paras = paras;

            printf("client %s access %s\n",_paras._client_id, _paras._server_uri);
            MQTTClient_create(&_client, _paras._server_uri, _paras._client_id,
                MQTTCLIENT_PERSISTENCE_NONE, NULL);

        };
        int connect(const cbGroups & cbs) {
            int rc;
            MQTTClient_setCallbacks(_client, NULL, cbs.connect_cb, cbs.msgarrvd_cb, cbs.delivered_cb);
            if ((rc = MQTTClient_connect(_client, &(_paras._conn_opts))) != MQTTCLIENT_SUCCESS) {
                printf("Failed to connect, return code %d\n", rc);
                exit(-1);
            }
            return rc;
        };
        void disconnect() {
            MQTTClient_disconnect(_client, 10000);
            MQTTClient_destroy(&_client);
        };

        void publish(const char* topic, const void* payload, int len , int qos_lvl) {

            MQTTClient_message pubmsg = MQTTClient_message_initializer;
            MQTTClient_deliveryToken token;
            pubmsg.payload = (void*)payload;
            pubmsg.payloadlen = len;
            pubmsg.qos = qos_lvl;
            pubmsg.retained = 0;
            MQTTClient_publishMessage(_client, topic, &pubmsg, &token);
            printf("Waiting for publication of %s\n" "on topic %s for client with ClientID: %s\n",
                    (char*)payload, topic, _paras._client_id);

        };

        void subscribe(const char* topic,int qos_lvl) {
            MQTTClient_subscribe(_client, topic, qos_lvl);
        };

    private:
        MQTTClient _client;
        clientParams _paras;
};


#endif
点击复制链接 与好友分享!回本站首页
上一篇:Flume直接到SparkStreaming的两种方式
下一篇:Spark转换(transform)与动作(action)一览
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站