2017年11月25日 星期六

MQTT sample code using libmosquitto


Sample code for publisher

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>

// Server connection parameters
#define MQTT_HOSTNAME "localhost"
#define MQTT_PORT 1883
#define MQTT_TOPIC "/brook"

/* Publisher-side counters; a pointer to this is given to mosquitto_new() as
   user data and handed back to every callback as `obj`. */
struct brook_obj {
 unsigned long long published; /* incremented by publish_cb() */
 unsigned long long connected; /* incremented by connect_cb() */
 unsigned long long disconnected; /* incremented by disconnect_cb() */
 unsigned long long loged; /* incremented by log_cb() */
};

/*
 * Connect callback: counts the connection and prints the current counters.
 *
 *  mosq  the mosquitto instance making the callback (not used here)
 *  obj   the user data provided in mosquitto_new(), a struct brook_obj
 *  rc    the return code of the connection response
 */
static void connect_cb(struct mosquitto *mosq, void *obj, int rc)
{
    struct brook_obj *stats = obj;

    (void)mosq;
    stats->connected += 1;
    printf("%s(), connect:%llu, published:%llu, loged:%llu, rc:%d\n",
           __func__, stats->connected, stats->published, stats->loged, rc);
}

/*
 * Publish callback: counts the publish and prints the current counters
 * together with the message id passed in by the library.
 */
static void publish_cb(struct mosquitto *mosq, void *obj, int mid)
{
    struct brook_obj *stats = obj;

    (void)mosq;
    stats->published += 1;
    printf("%s(), connect:%llu, published:%llu, loged:%llu, mid:%d\n",
           __func__, stats->connected, stats->published, stats->loged, mid);
}

/*
 * Log callback: counts the log line and prints it, with its level, after the
 * current counters.
 */
static void log_cb(struct mosquitto *mosq, void *obj, int level, const char *str)
{
    struct brook_obj *stats = obj;

    (void)mosq;
    stats->loged += 1;
    printf("%s(), connect:%llu, published:%llu, loged:%llu, level:%d,\n\tstr:%s\n",
           __func__, stats->connected, stats->published, stats->loged, level, str);
}

/*
 * Disconnect callback: counts the disconnect and prints the current counters.
 * The rc argument is not reported.
 */
static void disconnect_cb(struct mosquitto *mosq, void *obj, int rc)
{
    struct brook_obj *stats = obj;

    (void)mosq;
    (void)rc;
    stats->disconnected += 1;
    printf("%s(), connect:%llu, published:%llu, loged:%llu, disconnected:%llu\n",
           __func__, stats->connected, stats->published, stats->loged,
           stats->disconnected);
}

/*
 * Start here
 *
 * Publisher sample: creates a client named "cid-pub-<pid>", configures TLS,
 * connects to MQTT_HOSTNAME:MQTT_PORT and publishes the numbers 0..9 on
 * MQTT_TOPIC with QoS 2. The network loop is pumped after every third
 * message and once more at the end until publish_cb() has been called for
 * every message.
 *
 * Returns 0 when all messages were published, -1 on any failure. The client
 * and the library are released on every path.
 */
int main (int argc, char **argv)
{
    char clientid[24];
    char text[24]; /* 20 decimal digits of an unsigned long long + NUL fit */
    struct mosquitto *mosq = NULL;
    unsigned long long i;
    int rc, major, minor, rev;
    int status = -1;
    struct brook_obj brook_obj = {0};

    (void)argc;
    (void)argv;

    /* Must be called before any other mosquitto functions.
       This function is not thread safe. */
    mosquitto_lib_init();

    rc = mosquitto_lib_version(&major, &minor, &rev);
    fprintf(stdout, "rc:%d, major:%d, minor:%d, rev:%d\n",
            rc, major, minor, rev);

    /* snprintf() NUL-terminates within the size it is given. */
    snprintf(clientid, sizeof(clientid), "cid-pub-%d", (int)getpid());
    mosq = mosquitto_new(clientid, true, &brook_obj);
    if (!mosq) {
        fprintf(stderr, "new mosq failed \n");
        goto out_lib;
    }

    /* NOTE(review): "tlsv1" and the insecure flag below look like test-only
       settings, and MQTT_PORT is 1883 - confirm against the broker config. */
    rc = mosquitto_tls_opts_set(mosq, 1, "tlsv1", NULL);
    if (rc) {
        fprintf(stderr, "set tls opt failed %d\n", rc);
        goto out_destroy;
    }

    rc = mosquitto_tls_set(mosq, "ca/client/ca.crt", NULL,
                           "ca/client/client.crt", "ca/client/client.key", NULL);
    if (rc) {
        fprintf(stderr, "set tls failed %d\n", rc);
        goto out_destroy;
    }

    rc = mosquitto_tls_insecure_set(mosq, true);
    if (rc) {
        fprintf(stderr, "set insecure failed %d\n", rc);
        goto out_destroy;
    }

    // Set callback function
    mosquitto_connect_callback_set(mosq, connect_cb);
    mosquitto_publish_callback_set(mosq, publish_cb);
    mosquitto_log_callback_set(mosq, log_cb);
    mosquitto_disconnect_callback_set(mosq, disconnect_cb);

    rc = mosquitto_connect(mosq, MQTT_HOSTNAME, MQTT_PORT, 60);
    if (rc) {
        fprintf(stderr, "Can't connect to Mosquitto server. %d\n", rc);
        goto out_destroy;
    }

    for (i = 0; i < 10; i++) {
        int len;

        snprintf(text, sizeof(text), "%llu", i);
        len = (int)strlen(text);
        printf("send %d, %s\n", len, text);

        // Publish the message to the topic
        rc = mosquitto_publish(mosq, NULL, MQTT_TOPIC, len, text, 2, false);
        if (rc) {
            fprintf(stderr, "Can't publish to Mosquitto server\n");
            goto out_disconnect;
        }

        /* Every third message, pump the loop until the publishes queued
           before this one have been reported by publish_cb(). */
        if (!(i % 3)) {
            do {
                rc = mosquitto_loop(mosq, 3, 1);
                fprintf(stdout, "mosquitto_loop %d, published:%llu, i:%llu\n",
                        rc, brook_obj.published, i);
            } while (rc == 0 && brook_obj.published < i);
        }
    }

    /* Drain: keep pumping until every message has been reported. */
    do {
        rc = mosquitto_loop(mosq, 3, 1);
        fprintf(stdout, "mosquitto_loop %d, published:%llu, i:%llu\n",
                rc, brook_obj.published, i);
    } while (rc == 0 && brook_obj.published != i);

    /* A loop error means not everything was delivered. */
    status = rc ? -1 : 0;

    // Tidy up
out_disconnect:
    mosquitto_disconnect(mosq);
out_destroy:
    mosquitto_destroy(mosq);
out_lib:
    mosquitto_lib_cleanup();

    return status;
}



Sample code for subscriber

#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>

#include <sys/types.h>
#include <unistd.h>

#include <mosquitto.h>

#define MQTT_HOSTNAME "localhost" 
#define MQTT_PORT 1883 
#define MQTT_TOPIC "/brook/#"

/* Subscriber-side counters; a pointer to this is given to mosquitto_new() as
   user data and handed back to every callback as `obj`. */
struct brook_obj {
 unsigned long long msgs; /* incremented by message_cb(); main() stops at 5 */
 unsigned long long connected; /* incremented by connect_cb() */
 unsigned long long disconnected; /* incremented by disconnect_cb() */
 unsigned long long loged; /* incremented by log_cb() */
 unsigned long long subscribed; /* incremented by subscribe_cb() */
 unsigned long long unsubscribed; /* incremented by unsubscribe_cb() */
};

/*
 mosq the mosquitto instance making the callback.
 obj the user data provided in mosquitto_new
 rc the return code of the connection response, one of:
*/
/*
 * Connect callback: counts the connection and prints the current counters.
 * obj is the struct brook_obj passed to mosquitto_new(); mosq is not used.
 */
static void connect_cb(struct mosquitto *mosq, void *obj, int rc)
{
    struct brook_obj *stats = obj;

    (void)mosq;
    stats->connected += 1;
    printf("%s(), connect:%llu, msgs:%llu, loged:%llu, rc:%d\n",
           __func__, stats->connected, stats->msgs, stats->loged, rc);
}

/*
 * Log callback: counts the log line and prints it, with its level, after the
 * current counters.
 */
static void log_cb(struct mosquitto *mosq, void *obj, int level, const char *str)
{
    struct brook_obj *stats = obj;

    (void)mosq;
    stats->loged += 1;
    printf("%s(), connect:%llu, msgs:%llu, loged:%llu, level:%d,\n\tstr:%s\n",
           __func__, stats->connected, stats->msgs, stats->loged, level, str);
}

/*
 * Disconnect callback: counts the disconnect and prints the current counters.
 * The rc argument is not reported.
 */
static void disconnect_cb(struct mosquitto *mosq, void *obj, int rc)
{
    struct brook_obj *stats = obj;

    (void)mosq;
    (void)rc;
    stats->disconnected += 1;
    printf("%s(), connect:%llu, msgs:%llu, loged:%llu, disconnected:%llu\n",
           __func__, stats->connected, stats->msgs, stats->loged,
           stats->disconnected);
}

/*
 * Message callback: counts the message, prints the counters and the message
 * (mid, payload, topic), then reports whether the topic matches MQTT_TOPIC.
 *
 *  mosq     the mosquitto instance making the callback (not used here)
 *  obj      the user data provided in mosquitto_new(), a struct brook_obj
 *  message  the received message
 */
static void message_cb(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
    struct brook_obj *bobj = obj;
    /* NOTE(review): payload is presumably NULL for a zero-length message;
       never hand a NULL pointer to %s, even with a precision of 0. */
    const char *payload = message->payload ? (const char *)message->payload : "";
    bool match = false;
    int rc;

    (void)mosq;
    bobj->msgs++;

    printf("%s(), connect:%llu, msgs:%llu, loged:%llu, disconnected:%llu\n",
           __func__, bobj->connected, bobj->msgs, bobj->loged, bobj->disconnected);
    printf("%s(), got message mid:%d, '%.*s' for topic '%s'\n",
           __func__, message->mid, message->payloadlen, payload, message->topic);

    rc = mosquitto_topic_matches_sub(MQTT_TOPIC, message->topic, &match);
    if (rc) {
        fprintf(stderr, "mosquitto_topic_matches_sub failed %d\n", rc);
        return;
    }
    if (match) {
        printf("got matched topic to %s\n", MQTT_TOPIC);
    }
}

/*
 * Unsubscribe callback: counts the unsubscribe and prints the current
 * counters. The mid argument is not reported.
 */
static void unsubscribe_cb(struct mosquitto *mosq, void *obj, int mid)
{
    struct brook_obj *stats = obj;

    (void)mosq;
    (void)mid;
    stats->unsubscribed += 1;
    printf("%s(), connect:%llu, msgs:%llu, loged:%llu, disconnected:%llu, unsubscribed:%llu\n",
           __func__, stats->connected, stats->msgs, stats->loged,
           stats->disconnected, stats->unsubscribed);
}

/*
 * Subscribe callback: counts the subscription, prints the message id and
 * each of the qos_count entries of granted_qos, then the current counters.
 */
static void subscribe_cb(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
    struct brook_obj *stats = obj;
    int idx = 0;

    (void)mosq;
    stats->subscribed += 1;

    printf("%s(), mid:%d, qos_count:%d\n", __func__, mid, qos_count);
    while (idx < qos_count) {
        printf("\tQ[%d]:%d\n", idx, granted_qos[idx]);
        idx++;
    }

    printf("%s(), connect:%llu, msgs:%llu, loged:%llu, disconnected:%llu, subscribed:%llu\n",
           __func__, stats->connected, stats->msgs, stats->loged,
           stats->disconnected, stats->subscribed);
}

/*
 * Subscriber sample: creates a client named "cid-sub-<pid>", configures TLS,
 * connects to MQTT_HOSTNAME:MQTT_PORT, subscribes to MQTT_TOPIC with QoS 1
 * and pumps the network loop until five messages have arrived or the loop
 * reports an error.
 *
 * Returns the last mosquitto_loop() result (0 on success), or -1 when setup
 * fails. The client and the library are released on every path.
 */
int main(int argc, char *argv[])
{
    char clientid[24];
    struct mosquitto *mosq;
    int rc = 0;
    int status = -1;
    struct brook_obj brook_obj = {0};

    (void)argc;
    (void)argv;

    mosquitto_lib_init();

    /* snprintf() NUL-terminates within the size it is given. */
    snprintf(clientid, sizeof(clientid), "cid-sub-%d", (int)getpid());
    mosq = mosquitto_new(clientid, true, &brook_obj);
    if (!mosq) {
        fprintf(stderr, "new mosq failed\n");
        goto out_lib;
    }

    /* NOTE(review): "tlsv1" and the insecure flag below look like test-only
       settings, and MQTT_PORT is 1883 - confirm against the broker config. */
    rc = mosquitto_tls_opts_set(mosq, 1, "tlsv1", NULL);
    if (rc) {
        fprintf(stderr, "set tls opt failed %d\n", rc);
        goto out_destroy;
    }

    rc = mosquitto_tls_insecure_set(mosq, true);
    if (rc) {
        fprintf(stderr, "set insecure failed %d\n", rc);
        goto out_destroy;
    }

    rc = mosquitto_tls_set(mosq, "ca/client/ca.crt", NULL,
                           "ca/client/client.crt", "ca/client/client.key", NULL);
    if (rc) {
        fprintf(stderr, "set tls failed %d\n", rc);
        goto out_destroy;
    }

    mosquitto_connect_callback_set(mosq, connect_cb);
    mosquitto_log_callback_set(mosq, log_cb);
    mosquitto_subscribe_callback_set(mosq, subscribe_cb);
    mosquitto_unsubscribe_callback_set(mosq, unsubscribe_cb);
    mosquitto_message_callback_set(mosq, message_cb);
    mosquitto_disconnect_callback_set(mosq, disconnect_cb);

    rc = mosquitto_connect(mosq, MQTT_HOSTNAME, MQTT_PORT, 60);
    if (rc) {
        fprintf(stderr, "mosquitto_connect failed. %d\n", rc);
        goto out_destroy;
    }

    /* Without a subscription the loop below would wait for nothing. */
    rc = mosquitto_subscribe(mosq, NULL, MQTT_TOPIC, 1);
    if (rc) {
        fprintf(stderr, "mosquitto_subscribe failed. %d\n", rc);
        goto out_disconnect;
    }

    do {
        rc = mosquitto_loop(mosq, 3000, 1);
        fprintf(stdout, "mosquitto_loop %d, msgs%llu\n",
                rc, brook_obj.msgs);
    } while (rc == 0 && brook_obj.msgs < 5);

    status = rc;

out_disconnect:
    mosquitto_disconnect(mosq);
out_destroy:
    mosquitto_destroy(mosq);
out_lib:
    mosquitto_lib_cleanup();

    return status;
}

2017年11月5日 星期日

MQTT


MQ Telemetry Transport (MQTT)網路文章已經很多,也很完善,大家可以參考本文章後面的reference,這裡快速帶過幾個重點:
MQTT是一個輕量級的基於broker的Publish/Subscribe messaging protocol,旨在實現開放,簡單,輕量級和易於實現。
協議的特點包括:
  • 提供一對多的訊息發布
  • 三種QoS
    "At most once" - message丟失或重複可能發生。
    "At least once" - 確保message到達,但可能會發生重複。
    "Exactly once" - 確保message只送達一次。

MQTT Architecture(Publish/Subscribe with broker)

MQTT有三種主要的組成元件,分別為Publisher、Subscriber以及Broker。 Publisher為訊息的來源,它會將訊息(Topic)發送給Broker,而Subscriber向Broker註冊,表示他們想要接收某Topic訊息;因此當有某個Publisher對Broker發送Topic訊息時,只要是有對此Topic註冊的Subscriber,都會收到此則訊息。

Topic wildcards

Topic為UTF-8編碼字串,最長長度為32,767字元,Topic支援階層式命名方式,如:"住家/客廳/溫度",階層的分隔符號為"/",所以前面這個Topic有三層架構,Topic可以透過萬用字元一次訂閱多個主題。其中"#"只能放在最後一層,"+"則可以放在任何一層。
"#"為Multi-level wildcard,可以包含零個以上的階層,如有人送出"a/b/c/d",那以下訂閱都可以被match:"a/b/c/d","#","a/#","a/b/#","a/b/c/#","+/b/c/#"。
"+"為Single-level wildcard,只能包含該層的Topic,如有人送出"a/b/c/d",那以下訂閱都可以被match:"a/b/c/d","+/b/c/d","a/+/c/d","a/+/+/d","+/+/+/+",而以下這些是不能match的:"a/b/c","b/+/c/d","+/+/+"。

Demo the MQTT - mosquitto

請用apt-get install mosquitto mosquitto-clients安裝mosquitto套件
run the broker
brook@vista:~$ mosquitto -v
1511094611: mosquitto version 1.4.8 (build date Mon, 26 Jun 2017 09:31:02 +0100) starting
1511094611: Using default config.
1511094611: Opening ipv4 listen socket on port 1883.
1511094611: Opening ipv6 listen socket on port 1883.
1511094635: New connection from 127.0.0.1 on port 1883.
1511094635: New client connected from 127.0.0.1 as mosqsub/16408-jpr-Verit (c1, k60).    ← 有人連上就會顯示
1511094635: Sending CONNACK to mosqsub/16408-jpr-Verit (0, 0)
1511094635: Received SUBSCRIBE from mosqsub/16408-jpr-Verit
1511094635:     /brook/L1 (QoS 0)
1511094635: mosqsub/16408-jpr-Verit 0 /brook/L1
1511094635: Sending SUBACK to mosqsub/16408-jpr-Verit
1511094659: New connection from 127.0.0.1 on port 1883.
1511094659: New client connected from 127.0.0.1 as mosqpub/16687-jpr-Verit (c1, k60).
1511094659: Sending CONNACK to mosqpub/16687-jpr-Verit (0, 0)
1511094659: Received PUBLISH from mosqpub/16687-jpr-Verit (d0, q0, r0, m0, '/brook/L1', ... (11 bytes))
1511094659: Sending PUBLISH to mosqsub/16408-jpr-Verit (d0, q0, r0, m0, '/brook/L1', ... (11 bytes))
1511094659: Received DISCONNECT from mosqpub/16687-jpr-Verit
1511094659: Client mosqpub/16687-jpr-Verit disconnected.

向Broker註冊topic "/brook/L1"
brook@vista:~$ mosquitto_sub -h localhost -t '/brook/L1'
test for L1    ← 有人向Broker推送/brook/L1訊息就會顯示

向Broker推送topic "/brook/L1"的訊息
brook@vista:~$ mosquitto_pub -h localhost -t '/brook/L1' -m 'test for L1'

mosquitto - Broker log notes

brook@vista:~$ mosquitto -v
1511094635: New client connected from 127.0.0.1 as mosqsub/16408-jpr-Verit (c1, k60). 

mosquitto-1.4.8/src/read_handle_server.c
521 if(context->username){
522   _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, 
                           "New client connected from %s as %s (c%d, k%d, u'%s').",
                            context->address, client_id, clean_session, 
                            context->keepalive, context->username);
523 }else{
524   _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE,
                           "New client connected from %s as %s (c%d, k%d).", 
                           context->address, client_id, clean_session,
                           context->keepalive);
525 }

1511094635: Sending CONNACK to mosqsub/16408-jpr-Verit (0, 0)
1511094635: Received SUBSCRIBE from mosqsub/16408-jpr-Verit
1511094635:     /brook/L1 (QoS 0)

man mqtt
QUALITY OF SERVICE
  MQTT defines three levels of Quality of Service (QoS). The QoS defines how hard the 
  broker/client will try to ensure that a message is received. Messages may be sent 
  at any QoS level, and clients may attempt to subscribe to topics at any QoS level.
  This means that the client chooses the maximum QoS it will receive. For example, 
  if a message is published at QoS 2 and a client is subscribed with QoS 0, the 
  message will be delivered to that client with QoS 0. If a second client is also 
  subscribed to the same topic, but with QoS 2, then it will receive the same message 
  but with QoS 2. For a second example, if a client is subscribed with QoS 2 and a 
  message is published on QoS 0, the client will receive it on QoS 0.

  Higher levels of QoS are more reliable, but involve higher latency and have higher bandwidth requirements.
    o 0: The broker/client will deliver the message once, with no confirmation.
    o 1: The broker/client will deliver the message at least once, with confirmation required.
    o 2: The broker/client will deliver the message exactly once by using a four step handshake.

1511094635: mosqsub/16408-jpr-Verit 0 /brook/L1
1511094635: Sending SUBACK to mosqsub/16408-jpr-Verit
1511094659: New connection from 127.0.0.1 on port 1883.
1511094659: New client connected from 127.0.0.1 as mosqpub/16687-jpr-Verit (c1, k60).
1511094659: Sending CONNACK to mosqpub/16687-jpr-Verit (0, 0)
1511094659: Received PUBLISH from mosqpub/16687-jpr-Verit (d0, q0, r0, m0, '/brook/L1', ... (11 bytes))

mosquitto-1.4.8/src/read_handle.c
217 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, 
                          "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
                          context->id, dup, qos, retain, mid, topic, (long)payloadlen);

1511094659: Sending PUBLISH to mosqsub/16408-jpr-Verit (d0, q0, r0, m0, '/brook/L1', ... (11 bytes))

mosquitto-1.4.8/lib/send_mosq.c
156 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, 
                          "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
                          mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen);

1511094659: Received DISCONNECT from mosqpub/16687-jpr-Verit
1511094659: Client mosqpub/16687-jpr-Verit disconnected.



2017年7月29日 星期六

A pattern for state machine


State machine是很常見的應用/Pattern,這章節會根據下面的圖來實作State machine pattern


/* States of the sample machine. The values are used as indices into
   state_table[], so they must stay small and contiguous. */
enum state {
 STATE_INITIAL = 0, /* starting state, handled by in_init() */
 STATE_1, /* handled by in_state_1() */
 STATE_2, /* handled by in_state_2() */
 STATE_3, /* handled by in_state_3() */
 STATE_4, /* handled by in_state_4() */
};

/* Events fed to the state handlers. Numbering starts at 1 so that main() can
   map the typed digits '1'..'4' straight onto E1..E4. */
enum event {
 E1 = 1,
 E2,
 E3,
 E4,
};
先定義State Machine狀態與Event



/* Input handed to a state handler on each step. */
struct instance_data {
 enum event evt; /* event to process; the handlers switch on this */
 int data; /* payload slot; not read by any handler in this sample */
};

/* One running state machine: its current state plus the pending input. */
struct instance {
 enum state cur_state; /* index into state_table[]; updated by run_state() */
 struct instance_data data; /* passed by address to the current handler */
};
定義一個struct來儲存"現在狀態",Event與data



/* Signature shared by every state handler: consume the pending event/data
   and return the state the machine should be in afterwards. */
typedef enum state state_func_t(struct instance_data *data);

/* The handlers and the entry actions are defined elsewhere in this file.
   Declare them up front so they can be referenced before their definitions
   (implicit function declarations are not valid C since C99). */
state_func_t in_init, in_state_1, in_state_2, in_state_3, in_state_4;
enum state do_s1(void);
enum state do_s2(void);
enum state do_s3(void);
enum state do_s4(void);

/* Dispatch table: one handler per enum state value. */
state_func_t* const state_table[] = {
    [STATE_INITIAL] = in_init,
    [STATE_1] = in_state_1,
    [STATE_2] = in_state_2,
    [STATE_3] = in_state_3,
    [STATE_4] = in_state_4,
};

/*
 * Run one step of the machine: call the handler registered for the current
 * state with the instance's pending event/data and store the state it
 * returns.
 *
 * If cur_state does not name an entry of state_table[] (out of range or an
 * empty slot) the instance is left untouched instead of calling through a
 * bad pointer.
 */
void run_state(struct instance *inst)
{
    const unsigned long n_states = sizeof(state_table) / sizeof(state_table[0]);
    /* A negative enum value wraps to a huge index and is rejected below. */
    const unsigned long idx = (unsigned long)inst->cur_state;

    if (idx >= n_states || !state_table[idx]) {
        return;
    }

    inst->cur_state = state_table[idx](&inst->data);
}

建立一個Table,將每一個state對應的function填入,run_state()會根據目前的State處理該event/data,並回傳下一個狀態



/* Handler for STATE_1: E1 moves to S2, E2 moves to S3, anything else stays. */
enum state in_state_1(struct instance_data *data)
{
    printf("%s(#%d): got EVT:%d\n", __func__, __LINE__, data->evt);

    if (data->evt == E1) {
        printf("change to S2\n");
        return do_s2();
    }
    if (data->evt == E2) {
        printf("change to S3\n");
        return do_s3();
    }

    printf("keep the same STATE && do nothing\n");
    return STATE_1;
}

/* Handler for STATE_2: E3 moves to S3; any other event re-runs the S2 entry
   action and stays in S2. */
enum state in_state_2(struct instance_data *data)
{
    printf("%s(#%d): got EVT:%d\n", __func__, __LINE__, data->evt);

    if (data->evt == E3) {
        printf("change to S3\n");
        return do_s3();
    }

    printf("keep the same STATE && do s2 again\n");
    return do_s2();
}

/* Handler for STATE_3: E2 moves to S4, anything else stays without action. */
enum state in_state_3(struct instance_data *data)
{
    printf("%s(#%d): got EVT:%d\n", __func__, __LINE__, data->evt);

    if (data->evt == E2) {
        printf("change to S4\n");
        return do_s4();
    }

    printf("keep the same STATE && do nothing\n");
    return STATE_3;
}

/* Handler for STATE_4: E1 moves to S2, E3 moves to S1; any other event
   re-runs the S4 entry action and stays in S4. */
enum state in_state_4(struct instance_data *data)
{
    printf("%s(#%d): got EVT:%d\n", __func__, __LINE__, data->evt);

    if (data->evt == E1) {
        printf("change to S2\n");
        return do_s2();
    }
    if (data->evt == E3) {
        printf("change to S1\n");
        return do_s1();
    }

    printf("keep the same STATE && do again\n");
    return do_s4();
}
定義每一個狀態中的行為



enum state do_s1(void)
{
 printf("%s(#%d)\n", __FUNCTION__, __LINE__);
 return STATE_1;
}

enum state do_s2(void)
{
 printf("%s(#%d)\n", __FUNCTION__, __LINE__);
 return STATE_2;
}

enum state do_s3(void)
{
 printf("%s(#%d)\n", __FUNCTION__, __LINE__);
 return STATE_3;
}

enum state do_s4(void)
{
 printf("%s(#%d)\n", __FUNCTION__, __LINE__);
 return STATE_4;
}

enum state in_init(struct instance_data *data)
{
 printf("%s(#%d): do some init. E:%d\n", __FUNCTION__, __LINE__, data->evt);
 printf("change to S1\n");
 return STATE_1;
}
定義進入每一個狀態要執行的動作



int main( void ) {
 int ch;
 struct instance inst = {STATE_INITIAL, 0};
 while ( 1 ) {
  run_state(&inst);
  // do other program logic, run other state machines, etc
  printf("MENU: E1/E2/E3/E4\n");
  while(((ch = getc(stdin)) == '\n') || (ch <= '0') || (ch > '4'));
  inst.data.evt = ch - '0';
 }
}
main function模擬State Machine收到不同event(1~4)


以下是收到event 1, 2, 3, 4的執行結果





熱門文章