Sample code for publisher
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>
#define MQTT_HOSTNAME "localhost"
#define MQTT_PORT 1883
#define MQTT_TOPIC "/brook"
struct brook_obj {
unsigned long long published;
unsigned long long connected;
unsigned long long disconnected;
unsigned long long loged;
};
static void connect_cb(struct mosquitto *mosq, void *obj, int rc)
{
struct brook_obj *bobj = (struct brook_obj*) obj;
bobj->connected++;
printf("%s(), connect:%llu, published:%llu, loged:%llu, rc:%d\n",
__FUNCTION__, bobj->connected, bobj->published, bobj->loged, rc);
}
static void publish_cb(struct mosquitto *mosq, void *obj, int mid)
{
struct brook_obj *bobj = (struct brook_obj*) obj;
bobj->published++;
printf("%s(), connect:%llu, published:%llu, loged:%llu, mid:%d\n",
__FUNCTION__, bobj->connected, bobj->published, bobj->loged, mid);
}
static void log_cb(struct mosquitto *mosq, void *obj, int level, const char *str)
{
struct brook_obj *bobj = (struct brook_obj*) obj;
bobj->loged++;
printf("%s(), connect:%llu, published:%llu, loged:%llu, level:%d,\n\tstr:%s\n",
__FUNCTION__, bobj->connected, bobj->published, bobj->loged, level, str);
}
static void disconnect_cb(struct mosquitto *mosq, void *obj, int rc)
{
struct brook_obj *bobj = (struct brook_obj*) obj;
bobj->disconnected++;
printf("%s(), connect:%llu, published:%llu, loged:%llu, disconnected:%llu\n",
__FUNCTION__, bobj->connected, bobj->published, bobj->loged, bobj->disconnected);
}
int main (int argc, char **argv)
{
char clientid[24], text[20];
struct mosquitto *mosq = NULL;
unsigned long long i;
int rc, major, minor, rev;
struct brook_obj brook_obj = {};
mosquitto_lib_init();
rc = mosquitto_lib_version(&major, &minor, &rev);
fprintf(stdout, "rc:%d, major:%d, minor:%d, rev:%d\n",
rc, major, minor, rev);
memset(clientid, 0, sizeof(clientid));
snprintf(clientid, sizeof(clientid) - 1, "cid-pub-%d", getpid());
mosq = mosquitto_new(clientid, true, &brook_obj);
if (!mosq) {
fprintf (stderr, "new mosq failed \n");
exit(-1);
}
rc = mosquitto_tls_opts_set(mosq, 1, "tlsv1", NULL);
if (rc) {
fprintf (stderr, "set tls failed %d\n", rc);
exit (-1);
}
rc = mosquitto_tls_set(mosq, "ca/client/ca.crt", NULL, "ca/client/client.crt", "ca/client/client.key", NULL);
if (rc) {
fprintf (stderr, "set tls failed %d\n", rc);
exit (-1);
}
rc = mosquitto_tls_insecure_set(mosq, true);
if (rc) {
fprintf (stderr, "set insecure failed %d\n", rc);
exit (-1);
}
mosquitto_connect_callback_set(mosq, connect_cb);
mosquitto_publish_callback_set(mosq, publish_cb);
mosquitto_log_callback_set(mosq, log_cb);
mosquitto_disconnect_callback_set(mosq, disconnect_cb);
rc = mosquitto_connect(mosq, MQTT_HOSTNAME, MQTT_PORT, 60);
if (rc) {
fprintf (stderr, "Can't connect to Mosquitto server. %d\n", rc);
exit (-1);
}
for (i = 0; i < 10 ;i++) {
sprintf(text, "%llu", i);
printf("send %d, %s\n", (int)strlen(text), text);
rc = mosquitto_publish(mosq, NULL, MQTT_TOPIC, strlen(text), text, 2, false);
if (rc) {
fprintf (stderr, "Can't publish to Mosquitto server\n");
exit (-1);
}
if (!(i % 3)) {
do {
rc = mosquitto_loop(mosq, 3, 1);
fprintf (stdout, "mosquitto_loop %d, published:%llu, i:%llu\n",
rc, brook_obj.published, i);
} while(rc == 0 && brook_obj.published < i);
}
}
do {
rc = mosquitto_loop(mosq, 3, 1);
fprintf (stdout, "mosquitto_loop %d, published:%llu, i:%llu\n",
rc, brook_obj.published, i);
} while(rc == 0 && brook_obj.published != i);
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
Sample code subscriber
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <mosquitto.h>
#define MQTT_HOSTNAME "localhost"
#define MQTT_PORT 1883
#define MQTT_TOPIC "/brook/#"
struct brook_obj {
unsigned long long msgs;
unsigned long long connected;
unsigned long long disconnected;
unsigned long long loged;
unsigned long long subscribed;
unsigned long long unsubscribed;
};
static void connect_cb(struct mosquitto *mosq, void *obj, int rc)
{
struct brook_obj *bobj = (struct brook_obj*) obj;
bobj->connected++;
printf("%s(), connect:%llu, msgs:%llu, loged:%llu, rc:%d\n",
__FUNCTION__, bobj->connected, bobj->msgs, bobj->loged, rc);
}
static void log_cb(struct mosquitto *mosq, void *obj, int level, const char *str)
{
struct brook_obj *bobj = (struct brook_obj*) obj;
bobj->loged++;
printf("%s(), connect:%llu, msgs:%llu, loged:%llu, level:%d,\n\tstr:%s\n",
__FUNCTION__, bobj->connected, bobj->msgs, bobj->loged, level, str);
}
static void disconnect_cb(struct mosquitto *mosq, void *obj, int rc)
{
struct brook_obj *bobj = (struct brook_obj*) obj;
bobj->disconnected++;
printf("%s(), connect:%llu, msgs:%llu, loged:%llu, disconnected:%llu\n",
__FUNCTION__, bobj->connected, bobj->msgs, bobj->loged, bobj->disconnected);
}
static void message_cb(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
struct brook_obj *bobj = (struct brook_obj*) obj;
bobj->msgs++;
bool match = 0;
static unsigned long long i = 0, j = 0;
printf("%s(), connect:%llu, msgs:%llu, loged:%llu, disconnected:%llu\n",
__FUNCTION__, bobj->connected, bobj->msgs, bobj->loged, bobj->disconnected);
printf("%s(), got message mid:%d, '%.*s' for topic '%s'\n",
__FUNCTION__, message->mid, message->payloadlen, (char*) message->payload, message->topic);
mosquitto_topic_matches_sub(MQTT_TOPIC, message->topic, &match);
if (match) {
printf("got matched topic to %s\n", MQTT_TOPIC);
}
}
static void unsubscribe_cb(struct mosquitto *mosq, void *obj, int mid)
{
struct brook_obj *bobj = (struct brook_obj*) obj;
bobj->unsubscribed++;
printf("%s(), connect:%llu, msgs:%llu, loged:%llu, disconnected:%llu, unsubscribed:%llu\n",
__FUNCTION__, bobj->connected, bobj->msgs, bobj->loged, bobj->disconnected,
bobj->unsubscribed);
}
static void subscribe_cb(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
int i;
struct brook_obj *bobj = (struct brook_obj*) obj;
bobj->subscribed++;
printf("%s(), mid:%d, qos_count:%d\n",
__FUNCTION__, mid, qos_count);
for (i = 0; i < qos_count; i++) {
printf("\tQ[%d]:%d\n", i, granted_qos[i]);
}
printf("%s(), connect:%llu, msgs:%llu, loged:%llu, disconnected:%llu, subscribed:%llu\n",
__FUNCTION__, bobj->connected, bobj->msgs, bobj->loged, bobj->disconnected,
bobj->subscribed);
}
int main(int argc, char *argv[])
{
char clientid[24];
struct mosquitto *mosq;
int rc = 0;
struct brook_obj brook_obj = {};
mosquitto_lib_init();
memset(clientid, 0, sizeof(clientid));
snprintf(clientid, sizeof(clientid) - 1, "cid-sub-%d", getpid());
mosq = mosquitto_new(clientid, true, &brook_obj);
if (!mosq) {
fprintf (stderr, "new mosq failed\n");
exit(-1);
}
rc = mosquitto_tls_opts_set(mosq, 1, "tlsv1", NULL);
if (rc) {
fprintf (stderr, "set tls opt failed %d\n", rc);
exit(-1);
}
rc = mosquitto_tls_insecure_set(mosq, true);
if (rc) {
fprintf (stderr, "set insecure failed %d\n", rc);
exit(-1);
}
rc = mosquitto_tls_set(mosq, "ca/client/ca.crt", NULL, "ca/client/client.crt", "ca/client/client.key", NULL);
if (rc) {
fprintf (stderr, "set tls failed %d\n", rc);
exit(-1);
}
mosquitto_connect_callback_set(mosq, connect_cb);
mosquitto_log_callback_set(mosq, log_cb);
mosquitto_subscribe_callback_set(mosq, subscribe_cb);
mosquitto_unsubscribe_callback_set(mosq, unsubscribe_cb);
mosquitto_message_callback_set(mosq, message_cb);
mosquitto_disconnect_callback_set(mosq, disconnect_cb);
rc = mosquitto_connect(mosq, MQTT_HOSTNAME, MQTT_PORT, 60);
if (rc) {
fprintf(stderr, "mosquitto_connect failed. %d\n", rc);
exit(-1);
}
mosquitto_subscribe(mosq, NULL, MQTT_TOPIC, 1);
do {
rc = mosquitto_loop(mosq, 3000, 1);
fprintf(stdout, "mosquitto_loop %d, msgs%llu\n",
rc, brook_obj.msgs);
} while(rc == 0 && brook_obj.msgs < 5);
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return rc;
}
沒有留言:
張貼留言