multi-thread programming設計很常見, 寫法上與功能上也都很相似, 最基本的幾個操作就是, create/start/stop/periodical run/destroy等等. 最好還能monitor這些thread狀況, 比如被執行幾次, 執行時間多久, 是否有dead lock等等, 於是就寫了這個pattern, 提供這類功能.
service-reg.h
#ifndef SERVICE_REG_H #define SERVICE_REG_H #include <sys/time.h> typedef void *(*srv_fp) (void *); char * srv_dump(char *buf, int sz); int _srv_reg(char *srv_name, srv_fp fp, char *fp_name, void *srv_data); #define srv_reg(srv_name, fp, srv_data) _srv_reg(srv_name, fp, #fp, srv_data) int srv_start(char *srv_name); // run forever without any delay int srv_stop(char *srv_name); int srv_unreg(char *srv_name); int srv_start_periodical(char *srv_name, unsigned long sec, unsigned long nsec); // run forever with delay #define SRV_NO_MEM -1 #endif
service-reg.c
#include <pthread.h> #include <string.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <ctype.h> #include <time.h> #include <sys/time.h> #include <sys/timerfd.h> #include <stdint.h> #include <unistd.h> #include "service-reg.h" #include "lookup_table.h" #define SEC2NSEC 1000000000.0 struct srv_info { void *srv_data; char *srv_name; srv_fp fp; int in_fp; char *fp_name; unsigned run_cnt; pthread_t thread_id; /* ID returned by pthread_create() */ pthread_cond_t cond; pthread_mutex_t mutex; struct srv_info *next; volatile int running; /* 1 for running, 0 for stop */ struct timespec active_time; struct timespec last_enter; struct timespec last_exit; int tm_fd; /* used to delay */ }; struct srv_tab { pthread_mutex_t mutex; struct srv_info *list; unsigned int num_srv; } srv_tab = { .mutex = PTHREAD_MUTEX_INITIALIZER, .list = NULL, .num_srv = 0, }; static void _srv_update_active_time(struct srv_info *s) { clock_gettime(CLOCK_MONOTONIC, &(s->last_exit)); s->active_time.tv_sec += s->last_exit.tv_sec - s->last_enter.tv_sec; if (s->last_exit.tv_nsec > s->last_enter.tv_nsec) { s->active_time.tv_nsec += s->last_exit.tv_nsec - s->last_enter.tv_nsec; } else { s->active_time.tv_sec--; s->active_time.tv_nsec += SEC2NSEC + s->last_exit.tv_nsec - s->last_enter.tv_nsec; } if (s->active_time.tv_nsec >= SEC2NSEC) { s->active_time.tv_nsec -= SEC2NSEC; s->active_time.tv_sec++; } } static void *_srv_wrap(void *v) { struct srv_info *s = (struct srv_info *) v; while (1) { recheck: if (s->running == 0) { stoprun: pthread_cond_wait(&(s->cond), &(s->mutex)); goto recheck; } /* if timer is enabled, than wait */ again: if (s->tm_fd != -1) { uint64_t exp; ssize_t sz; sz = read(s->tm_fd, &exp, sizeof(exp)); if (sz != sizeof(exp)) { printf("timerfd_settime failed: s->tm_fd:%d, errno:%d/%s\n", s->tm_fd, errno, strerror(errno)); if (errno == EINTR) { goto again; } } if (s->running == 0) { goto stoprun; } } clock_gettime(CLOCK_MONOTONIC, &(s->last_enter)); s->in_fp = 1; s->fp(s->srv_data); s->in_fp = 0; _srv_update_active_time(s); s->run_cnt++; } return NULL; } int srv_init(void) { return 0; } char * srv_dump(char *buf, int sz) { struct srv_info *s; char *p = buf; int i = 0; struct timespec cur_clk; struct itimerspec cur_timer; clock_gettime(CLOCK_MONOTONIC, &cur_clk); if (buf == NULL) { return NULL; } memset(buf, 0, sz); pthread_mutex_lock(&(srv_tab.mutex)); s = srv_tab.list; p += snprintf(p, sz - (buf - p), "cur-clk: %f\n", cur_clk.tv_sec + cur_clk.tv_nsec/SEC2NSEC); p += snprintf(p, sz - (buf - p), "id srv_name running/in_fp\t\tfp/fname/dp\n"); p += snprintf(p, sz - (buf - p), "\t\t\trun-cnt/run-time\tenter-clk/leave-clk\n"); p += snprintf(p, sz - (buf - p), "\t\t\ttm_fd/next_expir/perodic\n"); while (s != NULL) { p += snprintf(p, sz - (buf - p), "%-8d%-16s%2d/%-2d\t\t%16p/%s/%-16p\t\t\t\n", i++, s->srv_name, s->running, s->in_fp, s->fp, s->fp_name, s->srv_data); p += snprintf(p, sz - (buf - p), "\t\t %10d/%-12f\t%12f/%-12f\n", s->run_cnt, s->active_time.tv_sec + s->active_time.tv_nsec/SEC2NSEC, s->last_enter.tv_sec + s->last_enter.tv_nsec/SEC2NSEC, s->last_exit.tv_sec + s->last_exit.tv_nsec/SEC2NSEC); if (s->tm_fd != -1) { if (timerfd_gettime(s->tm_fd, &cur_timer) < 0) { printf("timerfd_get failed: errno:%d/%s\n", errno, strerror(errno)); p += snprintf(p, sz - (buf - p), "\t\t %6d\n", s->tm_fd); } else { p += snprintf(p, sz - (buf - p), "\t\t %6d/%f/%-16f\n", s->tm_fd, cur_timer.it_value.tv_sec + cur_timer.it_value.tv_nsec/SEC2NSEC, cur_timer.it_interval.tv_sec + cur_timer.it_interval.tv_nsec/SEC2NSEC); } } s = s->next; } pthread_mutex_unlock(&(srv_tab.mutex)); return buf; } int _srv_reg(char *srv_name, srv_fp fp, char *fp_name, void *srv_data) { struct srv_info *s = (struct srv_info *) malloc(sizeof(struct srv_info)); if (s == NULL) { return SRV_NO_MEM; } memset(s, 0, sizeof(struct srv_info)); s->fp = fp; s->srv_data = srv_data; s->srv_name = strdup(srv_name); s->fp_name = strdup(fp_name); pthread_cond_init(&s->cond, NULL); pthread_mutex_init(&s->mutex, NULL); s->tm_fd = -1; if (!s->srv_name) { free(s); return SRV_NO_MEM; } /* insert into tab */ pthread_mutex_lock(&(srv_tab.mutex)); s->next = srv_tab.list; srv_tab.list = s; srv_tab.num_srv++; pthread_mutex_unlock(&(srv_tab.mutex)); /* create the thread */ pthread_create(&s->thread_id, NULL , _srv_wrap, s); return 0; } static int _srv_update_timer(struct srv_info *s, unsigned long sec, unsigned long nsec) { struct timespec now_tm; struct itimerspec new_tm; if (s->tm_fd < 0) { s->tm_fd = timerfd_create(CLOCK_MONOTONIC, 0); if (s->tm_fd < 0) { printf("timerfd_create failed: errno:%d/%s\n", errno, strerror(errno)); return -1; } } if (clock_gettime(CLOCK_MONOTONIC, &now_tm) == -1) { printf("timerfd_create failed: errno:%d/%s\n", errno, strerror(errno)); return -1; } new_tm.it_value.tv_sec = now_tm.tv_sec + sec; new_tm.it_value.tv_nsec = now_tm.tv_nsec + nsec; new_tm.it_interval.tv_sec = sec; new_tm.it_interval.tv_nsec = nsec; if (timerfd_settime(s->tm_fd, TFD_TIMER_ABSTIME, &new_tm, NULL) < 0) { printf("timerfd_settime failed: errno:%d/%s\n", errno, strerror(errno)); return -1; } return 0; } inline static int _srv_name_equal(struct srv_info *s, char *srv_name) { return strlen(s->srv_name) == strlen(srv_name) && !strcmp(s->srv_name, srv_name); } static int _srv_set_running(char *srv_name, int running, unsigned long sec, unsigned long nsec) { struct srv_info *s; int ret = 0; pthread_mutex_lock(&(srv_tab.mutex)); s = srv_tab.list; while (s != NULL) { if (_srv_name_equal(s, srv_name)) { s->running = running; if (sec || nsec) { ret = _srv_update_timer(s, sec, nsec); } else { /* reset timer to zero */ _srv_update_timer(s, 0, 0); /* clear timer */ close(s->tm_fd); s->tm_fd = -1; } pthread_cond_signal(&s->cond); pthread_mutex_unlock(&(srv_tab.mutex)); return ret; } s = s->next; } pthread_mutex_unlock(&(srv_tab.mutex)); return -1; } int srv_start(char *srv_name) { return _srv_set_running(srv_name, 1 /* running */, 0 /* delay.sec */, 0 /* delay.nsec */); } int srv_stop(char *srv_name) { return _srv_set_running(srv_name, 0 /* running */, 0 /* delay.sec */, 0 /* delay.nsec */); } static int _srv_free(struct srv_info *s) { int ret; s->running = 0; /* cancel the thread */ ret = pthread_cancel(s->thread_id); if (ret != 0) { printf("pthread_cancel failed: %d/%d\n", ret, errno); /* FIXME */ } ret = pthread_join(s->thread_id, NULL); if (ret != 0) { printf("pthread_join failed: %d/%d\n", ret, errno); /* FIXME */ } /* free resouce */ pthread_cond_destroy(&(s->cond)); pthread_mutex_destroy(&(s->mutex)); free(s->srv_name); free(s->fp_name); if (s->tm_fd != -1) { close(s->tm_fd); } free(s); return ret; } int srv_unreg(char *srv_name) { struct srv_info **s, *sn = NULL; int ret; s = &(srv_tab.list); pthread_mutex_lock(&(srv_tab.mutex)); while ((*s) != NULL) { if (_srv_name_equal(*s, srv_name)) { sn = *s; *s = (*s)->next; srv_tab.num_srv--; break; } s = &((*s)->next); } pthread_mutex_unlock(&(srv_tab.mutex)); if (sn) { ret = _srv_free(sn); } return 0; } int srv_start_periodical(char *srv_name, unsigned long sec, unsigned long nsec) { return _srv_set_running(srv_name, 1 /* running */, sec /* delay.sec */, nsec /* delay.nsec */); }
main.c (test program)
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <time.h> #include <stdarg.h> #include "service-reg.h" int pr(const char *fmt, ...) { va_list ap; int ret; struct timespec cur_clk; clock_gettime(CLOCK_MONOTONIC, &cur_clk); printf("***CLK: %f\n", cur_clk.tv_sec + cur_clk.tv_nsec/1000000000.0); va_start(ap, fmt); ret = vprintf(fmt, ap); va_end(ap); return ret; } void *sms_service(void *v) { int *i = (int *) v; pr(" ->hello, %p/%d, srv_start_periodical %d\n", v, (*i)++, 2); pr(" ->%d\n", srv_start_periodical("brook", 2, 0)); } int main(int argc, char *argv[]) { int i = 0, data = 0, ret; char name[] = "brook", buf[1024]; pr("srv_reg %s\n", name); srv_reg(name, sms_service, (void*)&data); pr("srv_start %s\n", name); srv_start(name); pr("sleep 3\n"); sleep(3); sleep(3); pr("srv_stop %s\n", name); srv_stop(name); pr("srv_start_periodical %s with %dsec\n", name, 1); srv_start_periodical(name, 1, 0); pr("srv_dump %s\n", name); pr("%s\n", srv_dump(buf, sizeof(buf))); pr("sleep 2\n"); sleep(2); pr("srv_unreg %s\n", name); srv_unreg(name); pr("over\n"); return 0; }
執行結果
brook@vista:~$ ./service-reg ***CLK: 7914669.498393 srv_reg brook ***CLK: 7914669.498469 srv_start brook ***CLK: 7914669.498481 sleep 3 ***CLK: 7914669.498531 ->hello, 0x7ffc24e80df8/0, srv_start_periodical 2 ***CLK: 7914669.498541 ->0 ***CLK: 7914671.498595 ->hello, 0x7ffc24e80df8/1, srv_start_periodical 2 ***CLK: 7914671.498647 ->0 ***CLK: 7914672.498576 srv_stop brook ***CLK: 7914672.498599 srv_start_periodical brook with 1sec ***CLK: 7914672.498606 srv_dump brook ***CLK: 7914672.498621 ->hello, 0x7ffc24e80df8/2, srv_start_periodical 2 ***CLK: 7914672.498662 ->0 ***CLK: 7914672.498625 cur-clk: 7914672.498611 id srv_name running/in_fp fp/fname/dp run-cnt/run-time enter-clk/leave-clk tm_fd/next_expir/perodic 0 brook 1/0 0x40106f/sms_service/0x7ffc24e80df8 2/0.000082 7914671.498594/7914671.498656 3/0.999982/1.000000 ***CLK: 7914672.498680 sleep 2 ***CLK: 7914674.498698 ->hello, 0x7ffc24e80df8/3, srv_start_periodical 2 ***CLK: 7914674.498730 ->0 ***CLK: 7914674.498742 srv_unreg brook ***CLK: 7914674.498997 over