35 #include <corosync/cpg.h>
36 #include <corosync/cfg.h>
40 #include "asterisk/poll-compat.h"
47 #include "asterisk/stasis_message_router.h"
48 #include "asterisk/stasis_system.h"
51 AST_RWLOCK_DEFINE_STATIC(event_types_lock);
52 AST_RWLOCK_DEFINE_STATIC(init_cpg_lock);
/* Timeout for Corosync's poll process. NOTE(review): presumably milliseconds (10 * 1000 = 10 seconds) - confirm against the poll call that consumes it. */
55 #define COROSYNC_POLL_TIMEOUT (10 * 1000)
/* Corosync IPC buffer size (8192 * 128 = 1048576); used below to enlarge the stack of threads created by this module. */
89 #define COROSYNC_IPC_BUFFER_SIZE (8192 * 128)
/* Version of pthread_create that requests a larger stack: AST_BACKGROUND_STACKSIZE plus three Corosync IPC buffers. Arguments a, b, c, d are forwarded unchanged to ast_pthread_create_stack(); the call site's file, function and line, and the stringified start routine (#c), are passed along as well. */
92 #define corosync_pthread_create_background(a, b, c, d) \
93 ast_pthread_create_stack(a, b, c, d, \
94 (AST_BACKGROUND_STACKSIZE + (3 * COROSYNC_IPC_BUFFER_SIZE)), \
95 __FILE__, __FUNCTION__, __LINE__, #c)
113 static int corosync_node_hash_fn(
const void *obj,
const int flags)
133 static int corosync_node_cmp_fn(
void *obj,
void *arg,
int flags)
145 cmp = (left->
id == *
id);
148 cmp = (left->
id == right->
id);
171 ast_free(payload->
event);
187 if (!payload->
event) {
211 ast_assert(event != NULL);
213 if (!corosync_ping_message_type()) {
228 ao2_t_ref(payload, -1,
"Destroy payload on off nominal");
234 ao2_t_ref(payload, -1,
"Hand ref to stasis");
235 ao2_t_ref(message, -1,
"Hand ref to stasis");
241 unsigned char publish;
242 unsigned char publish_default;
243 unsigned char subscribe;
244 unsigned char subscribe_default;
248 void (* publish_to_stasis)(
struct ast_event *);
261 .publish_default = 1,
262 .subscribe_default = 1,
264 .message_type_fn = corosync_ping_message_type,
267 .publish_default = 1,
268 .subscribe_default = 1,
270 .message_type_fn = ast_cluster_discovery_type,
278 } dispatch_thread = {
279 .id = AST_PTHREADT_NULL,
280 .alert_pipe = { -1, -1 },
283 static cpg_handle_t cpg_handle;
284 static corosync_cfg_handle_t cfg_handle;
286 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
287 static void cfg_state_track_cb(
288 corosync_cfg_state_notification_buffer_t *notification_buffer,
292 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
293 corosync_cfg_shutdown_flags_t flags);
295 static corosync_cfg_callbacks_t cfg_callbacks = {
296 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
297 .corosync_cfg_state_track_callback = cfg_state_track_cb,
299 .corosync_cfg_shutdown_callback = cfg_shutdown_cb,
314 ast_log(AST_LOG_NOTICE,
"Node %u (%s) at %s %s the cluster\n",
318 joined ?
"joined" :
"left");
374 node = corosync_node_alloc(event);
398 unsigned int new_msgs;
399 unsigned int old_msgs;
410 if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
414 if (new_msgs > INT_MAX) {
418 if (old_msgs > INT_MAX) {
423 (
int)old_msgs, NULL, event_eid)) {
426 ast_log(LOG_WARNING,
"Failed to publish MWI message for %s@%s from %s\n",
427 mailbox, context, eid);
436 unsigned int cachable;
446 if (ast_strlen_zero(device)) {
453 ast_log(LOG_WARNING,
"Failed to publish device state message for %s from %s\n",
458 static void cpg_deliver_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
459 uint32_t nodeid, uint32_t pid,
void *msg,
size_t msg_len);
461 static void cpg_confchg_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
462 const struct cpg_address *member_list,
size_t member_list_entries,
463 const struct cpg_address *left_list,
size_t left_list_entries,
464 const struct cpg_address *joined_list,
size_t joined_list_entries);
466 static cpg_callbacks_t cpg_callbacks = {
467 .cpg_deliver_fn = cpg_deliver_cb,
468 .cpg_confchg_fn = cpg_confchg_cb,
471 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
472 static void cfg_state_track_cb(
473 corosync_cfg_state_notification_buffer_t *notification_buffer,
479 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
480 corosync_cfg_shutdown_flags_t flags)
484 static void cpg_deliver_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
485 uint32_t nodeid, uint32_t pid,
void *msg,
size_t msg_len)
488 void (*publish_handler)(
struct ast_event *) = NULL;
493 ast_debug(1,
"Ignoring event that's too small. %u < %u\n",
494 (
unsigned int) msg_len,
511 ast_rwlock_rdlock(&event_types_lock);
513 publish_handler = event_types[event_type].publish_to_stasis;
514 if (!event_types[event_type].subscribe || !publish_handler) {
517 ast_rwlock_unlock(&event_types_lock);
521 ast_rwlock_unlock(&event_types_lock);
528 memcpy(event, msg, msg_len);
536 ast_log(LOG_NOTICE,
"Got event PING from server with EID: '%s'\n", buf);
538 ast_debug(5,
"Publishing event %s (%u) to stasis\n",
540 publish_handler(event);
544 static void publish_event_to_corosync(
struct ast_event *event)
549 iov.iov_base = (
void *)event;
552 ast_debug(5,
"Publishing event %s (%u) to corosync\n",
558 ast_debug(5,
"publish_event_to_corosync rdlock\n");
559 if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
560 ast_log(LOG_WARNING,
"CPG mcast failed (%u) for event %s (%u)\n",
563 ast_rwlock_unlock(&init_cpg_lock);
564 ast_debug(5,
"publish_event_to_corosync unlock\n");
566 ast_log(LOG_WARNING,
"CPG mcast not executed for event %s (%u): initializing CPG.\n",
594 ast_log(LOG_NOTICE,
"Sending event PING from this server with EID: '%s'\n", buf);
597 publish_event_to_corosync(event);
607 publish_to_corosync(message);
610 static int dump_cache_cb(
void *obj,
void *arg,
int flags)
618 publish_to_corosync(message);
623 static int clear_node_cache(
void *obj,
void *arg,
int flags)
647 static void cpg_confchg_cb(cpg_handle_t handle,
const struct cpg_name *group_name,
648 const struct cpg_address *member_list,
size_t member_list_entries,
649 const struct cpg_address *left_list,
size_t left_list_entries,
650 const struct cpg_address *joined_list,
size_t joined_list_entries)
655 for (i = 0; i < left_list_entries; i++) {
656 const struct cpg_address *cpg_node = &left_list[i];
665 for (j = 0; j < ARRAY_LEN(event_types); j++) {
669 ast_rwlock_rdlock(&event_types_lock);
671 if (!event_types[j].subscribe) {
672 ast_rwlock_unlock(&event_types_lock);
677 if (!event_types[j].cache_fn || !event_types[j].message_type_fn) {
678 ast_rwlock_unlock(&event_types_lock);
682 ast_rwlock_unlock(&event_types_lock);
688 ast_log(LOG_NOTICE,
"Clearing %i events of type %s of node %i from stasis cache.\n", messages_count, event_types[j].name, node->
id);
690 ast_log(LOG_NOTICE,
"Cleared events of type %s from stasis cache.\n", event_types[j].name);
692 ao2_t_ref(messages, -1,
"Dispose of flushed cache");
701 if (!joined_list_entries) {
705 for (i = 0; i < ARRAY_LEN(event_types); i++) {
709 ast_rwlock_rdlock(&event_types_lock);
711 if (!event_types[i].publish) {
712 ast_rwlock_unlock(&event_types_lock);
717 if (!event_types[i].cache_fn || !event_types[i].message_type_fn) {
718 ast_rwlock_unlock(&event_types_lock);
722 ast_rwlock_unlock(&event_types_lock);
728 ast_log(LOG_NOTICE,
"Sending %i events of type %s to corosync.\n", messages_count, event_types[i].name);
730 ast_log(LOG_NOTICE,
"Sent events of type %s to corosync.\n", event_types[i].name);
732 ao2_t_ref(messages, -1,
"Dispose of dumped cache");
740 unsigned int node_id;
742 corosync_cfg_node_address_t corosync_addr;
749 if (!ast_rwlock_tryrdlock(&init_cpg_lock)) {
750 ast_debug(5,
"send_cluster_notify rdlock\n");
752 if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
753 ast_log(LOG_WARNING,
"Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
757 if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
758 ast_log(LOG_WARNING,
"Failed to get local Corosync address. Not informing cluster of existance.\n");
762 ast_rwlock_unlock(&init_cpg_lock);
763 ast_debug(5,
"send_cluster_notify unlock\n");
766 sa = (
struct sockaddr *)corosync_addr.address;
767 sa_len = (
size_t)corosync_addr.address_length;
768 if ((res = getnameinfo(sa, sa_len, buf,
sizeof(buf), NULL, 0, NI_NUMERICHOST))) {
769 ast_log(LOG_WARNING,
"Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
770 gai_strerror(res), res);
778 publish_event_to_corosync(event);
782 static void *dispatch_thread_handler(
void *data)
785 struct pollfd pfd[3] = {
786 { .events = POLLIN, },
787 { .events = POLLIN, },
788 { .events = POLLIN, },
791 if (!ast_rwlock_tryrdlock(&init_cpg_lock)) {
792 ast_debug(5,
"dispatch_thread_handler rdlock\n");
793 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
794 ast_log(LOG_ERROR,
"Failed to get CPG fd. This module is now broken.\n");
795 ast_rwlock_unlock(&init_cpg_lock);
796 ast_debug(5,
"dispatch_thread_handler unlock\n");
800 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
801 ast_log(LOG_ERROR,
"Failed to get CFG fd. This module is now broken.\n");
802 ast_rwlock_unlock(&init_cpg_lock);
803 ast_debug(5,
"dispatch_thread_handler unlock\n");
807 pfd[2].fd = dispatch_thread.alert_pipe[0];
808 ast_rwlock_unlock(&init_cpg_lock);
809 ast_debug(5,
"dispatch_thread_handler unlock\n");
811 ast_log(LOG_ERROR,
"Failed to get fd: initializing CPG. This module is now broken.\n");
815 while (!dispatch_thread.stop) {
825 if (res == -1 && errno != EINTR && errno != EAGAIN) {
826 ast_log(LOG_ERROR,
"poll() error: %s (%d)\n", strerror(errno), errno);
827 cs_err = CS_ERR_BAD_HANDLE;
828 }
else if (res == 0) {
829 unsigned int local_nodeid;
831 if (!ast_rwlock_tryrdlock(&init_cpg_lock)) {
832 ast_debug(5,
"dispatch_thread_handler rdlock\n");
833 if ((cs_err = cpg_local_get(cpg_handle, &local_nodeid)) == CS_OK) {
834 struct cpg_name name;
835 struct cpg_address address[CPG_MEMBERS_MAX];
836 int entries = CPG_MEMBERS_MAX;
839 name.length = strlen(name.value);
840 if ((cs_err = cpg_membership_get(cpg_handle, &name, address, &entries)) == CS_OK) {
844 ast_debug(1,
"CPG group has %i node membership\n", entries);
845 for (i = 0; (i < entries) && !found; i++) {
846 if (address[i].nodeid == local_nodeid)
850 ast_log(LOG_WARNING,
"Failed to check CPG node membership\n");
852 cs_err = CS_ERR_BAD_HANDLE;
855 ast_log(LOG_WARNING,
"Failed to get CPG node membership: %u\n", cs_err);
857 cs_err = CS_ERR_BAD_HANDLE;
860 ast_log(LOG_WARNING,
"Failed to get CPG local node id: %u\n", cs_err);
862 cs_err = CS_ERR_BAD_HANDLE;
864 ast_rwlock_unlock(&init_cpg_lock);
865 ast_debug(5,
"dispatch_thread_handler unlock\n");
867 ast_log(LOG_WARNING,
"Failed to check CPG node membership: initializing CPG.\n");
869 cs_err = CS_ERR_BAD_HANDLE;
872 if (!ast_rwlock_tryrdlock(&init_cpg_lock)) {
873 ast_debug(5,
"dispatch_thread_handler rdlock\n");
874 if (pfd[0].revents & POLLIN) {
875 if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
876 ast_log(LOG_WARNING,
"Failed CPG dispatch: %u\n", cs_err);
880 if (pfd[1].revents & POLLIN) {
881 if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
882 ast_log(LOG_WARNING,
"Failed CFG dispatch: %u\n", cs_err);
885 ast_rwlock_unlock(&init_cpg_lock);
886 ast_debug(5,
"dispatch_thread_handler unlock\n");
888 ast_log(LOG_WARNING,
"Failed to dispatch: initializing CPG.\n");
891 if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
895 ast_log(LOG_NOTICE,
"Attempting to recover from corosync failure.\n");
897 if (!ast_rwlock_trywrlock(&init_cpg_lock)) {
898 struct cpg_name name;
899 ast_debug(5,
"dispatch_thread_handler wrlock\n");
902 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
903 ast_log(LOG_ERROR,
"Failed to finalize cpg (%d)\n", (
int) cs_err);
906 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
907 ast_log(LOG_ERROR,
"Failed to finalize cfg (%d)\n", (
int) cs_err);
910 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
911 ast_log(LOG_ERROR,
"Failed to initialize cfg (%d)\n", (
int) cs_err);
912 ast_rwlock_unlock(&init_cpg_lock);
913 ast_debug(5,
"dispatch_thread_handler unlock\n");
/* Store the cpg_initialize() status in cs_err first, then compare it with CS_OK, so the error logged below is the real status code and not the result of the comparison. */
918 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
919 ast_log(LOG_ERROR,
"Failed to initialize cpg (%d)\n", (
int) cs_err);
920 ast_rwlock_unlock(&init_cpg_lock);
921 ast_debug(5,
"dispatch_thread_handler unlock\n");
926 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
927 ast_log(LOG_ERROR,
"Failed to get CPG fd.\n");
928 ast_rwlock_unlock(&init_cpg_lock);
929 ast_debug(5,
"dispatch_thread_handler unlock\n");
934 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
935 ast_log(LOG_ERROR,
"Failed to get CFG fd.\n");
936 ast_rwlock_unlock(&init_cpg_lock);
937 ast_debug(5,
"dispatch_thread_handler unlock\n");
943 name.length = strlen(name.value);
944 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
945 ast_log(LOG_ERROR,
"Failed to join cpg (%d)\n", (
int) cs_err);
946 ast_rwlock_unlock(&init_cpg_lock);
947 ast_debug(5,
"dispatch_thread_handler unlock\n");
952 ast_rwlock_unlock(&init_cpg_lock);
953 ast_debug(5,
"dispatch_thread_handler unlock\n");
954 ast_log(LOG_NOTICE,
"Corosync recovery complete.\n");
957 ast_log(LOG_NOTICE,
"Failed to recover from corosync failure: initializing CPG.\n");
968 cpg_iteration_handle_t cpg_iter;
969 struct cpg_iteration_description_t cpg_desc;
974 e->
command =
"corosync show members";
976 "Usage: corosync show members\n"
977 " Show corosync cluster members\n";
984 if (a->argc != e->
args) {
985 return CLI_SHOWUSAGE;
988 if (!ast_rwlock_tryrdlock(&init_cpg_lock)) {
989 ast_debug(5,
"corosync_show_members rdlock\n");
990 cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
992 if (cs_err != CS_OK) {
993 ast_cli(a->fd,
"Failed to initialize CPG iterator: %u.\n", cs_err);
/* Do not finalize cpg_iter here: initialization failed, so the handle was never set up (cpg_iter has no initializer) and must not be passed to cpg_iteration_finalize(). */
995 ast_rwlock_unlock(&init_cpg_lock);
996 ast_debug(5,
"corosync_show_members unlock\n");
1001 "=============================================================\n"
1002 "=== Cluster members =========================================\n"
1003 "=============================================================\n"
1006 for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
1008 cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
1009 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1010 corosync_cfg_node_address_t addrs[8];
1015 ast_cli(a->fd,
"=== Node %u\n", i);
1016 ast_cli(a->fd,
"=== --> Group: %s\n", cpg_desc.group.value);
1018 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
1024 cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
1025 ARRAY_LEN(addrs), &num_addrs, addrs);
1026 if (cs_err != CS_OK) {
1027 ast_log(LOG_WARNING,
"Failed to get node addresses\n");
1031 for (j = 0; j < num_addrs; j++) {
1032 struct sockaddr *sa = (
struct sockaddr *) addrs[j].address;
1033 size_t sa_len = (size_t) addrs[j].address_length;
1036 getnameinfo(sa, sa_len, buf,
sizeof(buf), NULL, 0, NI_NUMERICHOST);
1038 ast_cli(a->fd,
"=== --> Address %u: %s\n", j + 1, buf);
1041 ast_cli(a->fd,
"=== --> Nodeid: %"PRIu32
"\n", cpg_desc.nodeid);
1045 ast_cli(a->fd,
"===\n"
1046 "=============================================================\n"
1049 cpg_iteration_finalize(cpg_iter);
1050 ast_rwlock_unlock(&init_cpg_lock);
1051 ast_debug(5,
"corosync_show_members unlock\n");
1053 ast_cli(a->fd,
"Failed to initialize CPG iterator: initializing CPG.\n");
1067 "Usage: corosync ping\n"
1068 " Send a test ping to the cluster.\n"
1069 "A NOTICE will be in the log for every ping received\n"
1070 "on a server.\n If you send a ping, you should see a NOTICE\n"
1071 "in the log for every server in the cluster.\n";
1078 if (a->argc != e->
args) {
1079 return CLI_SHOWUSAGE;
1100 e->
command =
"corosync show config";
1102 "Usage: corosync show config\n"
1103 " Show configuration loaded from res_corosync.conf\n";
1110 if (a->argc != e->
args) {
1111 return CLI_SHOWUSAGE;
1115 "=============================================================\n"
1116 "=== res_corosync config =====================================\n"
1117 "=============================================================\n"
1120 ast_rwlock_rdlock(&event_types_lock);
1121 ast_debug(5,
"corosync_show_config rdlock\n");
1122 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1123 if (event_types[i].publish) {
1124 ast_cli(a->fd,
"=== ==> Publishing Event Type: %s\n",
1125 event_types[i].name);
1127 if (event_types[i].subscribe) {
1128 ast_cli(a->fd,
"=== ==> Subscribing to Event Type: %s\n",
1129 event_types[i].name);
1132 ast_rwlock_unlock(&event_types_lock);
1133 ast_debug(5,
"corosync_show_config unlock\n");
1135 ast_cli(a->fd,
"===\n"
1136 "=============================================================\n"
1143 AST_CLI_DEFINE(corosync_show_config,
"Show configuration"),
1144 AST_CLI_DEFINE(corosync_show_members,
"Show cluster members"),
1145 AST_CLI_DEFINE(corosync_ping,
"Send a test ping to the cluster"),
1153 static int set_event(
const char *event_type,
int pubsub)
1157 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1158 if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
1164 event_types[i].publish = 1;
1167 event_types[i].subscribe = 1;
1174 return (i == ARRAY_LEN(event_types)) ? -1 : 0;
1177 static int load_general_config(
struct ast_config *cfg)
1183 ast_rwlock_wrlock(&event_types_lock);
1184 ast_debug(5,
"load_general_config wrlock\n");
1186 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1187 event_types[i].publish = event_types[i].publish_default;
1188 event_types[i].subscribe = event_types[i].subscribe_default;
1191 for (v = ast_variable_browse(cfg,
"general"); v && !res; v = v->
next) {
1192 if (!strcasecmp(v->
name,
"publish_event")) {
1193 res = set_event(v->
value, PUBLISH);
1194 }
else if (!strcasecmp(v->
name,
"subscribe_event")) {
1195 res = set_event(v->
value, SUBSCRIBE);
1197 ast_log(LOG_WARNING,
"Unknown option '%s'\n", v->
name);
1201 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1202 if (event_types[i].publish && !event_types[i].sub) {
1206 event_types[i].message_type_fn(),
1209 }
else if (!event_types[i].publish && event_types[i].sub) {
1210 event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
1212 event_types[i].message_type_fn());
1216 ast_rwlock_unlock(&event_types_lock);
1217 ast_debug(5,
"load_general_config unlock\n");
1222 static int load_config(
unsigned int reload)
1224 static const char filename[] =
"res_corosync.conf";
1226 const char *cat = NULL;
1232 if (cfg == CONFIG_STATUS_FILEMISSING || cfg == CONFIG_STATUS_FILEINVALID) {
1237 if (!strcasecmp(cat,
"general")) {
1238 res = load_general_config(cfg);
1240 ast_log(LOG_WARNING,
"Unknown configuration section '%s'\n", cat);
1249 static void cleanup_module(
void)
1254 if (stasis_router) {
1257 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1260 unsigned char subscribe = 0;
1262 ast_rwlock_wrlock(&event_types_lock);
1263 ast_debug(5,
"cleanup_module wrlock\n");
1264 subscribe = event_types[i].subscribe;
1266 if (event_types[i].sub) {
1267 event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
1270 event_types[i].publish = 0;
1271 event_types[i].subscribe = 0;
1272 ast_rwlock_unlock(&event_types_lock);
1273 ast_debug(5,
"cleanup_module unlock\n");
1275 if (subscribe && event_types[i].cache_fn && event_types[i].message_type_fn) {
1278 ast_log(LOG_NOTICE,
"Clearing %i events of type %s of other nodes from stasis cache.\n", messages_count, event_types[i].name);
1280 ast_log(LOG_NOTICE,
"Cleared events of type %s from stasis cache.\n", event_types[i].name);
1281 ao2_t_ref(messages, -1,
"Dispose of flushed cache");
1286 stasis_router = NULL;
1289 if (corosync_aggregate_topic) {
1290 ao2_t_ref(corosync_aggregate_topic, -1,
"Dispose of topic on cleanup");
1291 corosync_aggregate_topic = NULL;
1296 if (dispatch_thread.id != AST_PTHREADT_NULL) {
1297 char meepmeep =
'x';
1298 dispatch_thread.stop = 1;
1301 ast_log(LOG_ERROR,
"Failed to write to pipe: %s (%d)\n",
1302 strerror(errno), errno);
1304 pthread_join(dispatch_thread.id, NULL);
1307 if (dispatch_thread.alert_pipe[0] != -1) {
1308 close(dispatch_thread.alert_pipe[0]);
1309 dispatch_thread.alert_pipe[0] = -1;
1312 if (dispatch_thread.alert_pipe[1] != -1) {
1313 close(dispatch_thread.alert_pipe[1]);
1314 dispatch_thread.alert_pipe[1] = -1;
1317 if (!ast_rwlock_trywrlock(&init_cpg_lock)) {
1318 ast_debug(5,
"cleanup_module wrlock\n");
1319 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
1320 ast_log(LOG_ERROR,
"Failed to finalize cpg (%d)\n", (
int) cs_err);
1324 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
1325 ast_log(LOG_ERROR,
"Failed to finalize cfg (%d)\n", (
int) cs_err);
1329 ast_rwlock_unlock(&init_cpg_lock);
1330 ast_debug(5,
"cleanup_module unlock\n");
1336 static int load_module(
void)
1339 struct cpg_name name;
1342 ast_log(LOG_ERROR,
"Entity ID is not set.\n");
1347 corosync_node_hash_fn, NULL, corosync_node_cmp_fn);
1353 if (!corosync_aggregate_topic) {
1354 ast_log(AST_LOG_ERROR,
"Failed to create stasis topic for corosync\n");
1358 stasis_router = stasis_message_router_create(corosync_aggregate_topic);
1359 if (!stasis_router) {
1360 ast_log(AST_LOG_ERROR,
"Failed to create message router for corosync topic\n");
1367 ast_log(AST_LOG_ERROR,
"Failed to initialize corosync ping message type\n");
1371 if (load_config(0)) {
1376 if (!ast_rwlock_trywrlock(&init_cpg_lock)) {
1379 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
1380 ast_log(LOG_ERROR,
"Failed to initialize cfg: (%d)\n", (
int) cs_err);
1381 ast_rwlock_unlock(&init_cpg_lock);
1386 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
1387 ast_log(LOG_ERROR,
"Failed to initialize cpg: (%d)\n", (
int) cs_err);
1388 ast_rwlock_unlock(&init_cpg_lock);
1394 name.length = strlen(name.value);
1396 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
1397 ast_log(LOG_ERROR,
"Failed to join: (%d)\n", (
int) cs_err);
1398 ast_rwlock_unlock(&init_cpg_lock);
1403 if (pipe(dispatch_thread.alert_pipe) == -1) {
1404 ast_log(LOG_ERROR,
"Failed to create alert pipe: %s (%d)\n",
1405 strerror(errno), errno);
1406 ast_rwlock_unlock(&init_cpg_lock);
1412 ast_rwlock_unlock(&init_cpg_lock);
1415 dispatch_thread_handler, NULL)) {
1416 ast_log(LOG_ERROR,
"Error starting CPG dispatch thread.\n");
1433 static int unload_module(
void)
static char * ast_sockaddr_stringify_addr(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() to return an address only.
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
struct ast_variable * next
ast_device_state
Device States.
Asterisk main include file. File version handling, generic pbx functions.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
struct stasis_topic * ast_system_topic(void)
A Stasis Message Bus API topic which publishes messages regarding system changes. ...
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
enum ast_event_type ast_event_get_type(const struct ast_event *event)
Get the type for an event.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
int ast_sockaddr_parse(struct ast_sockaddr *addr, const char *str, int flags)
Parse an IPv4 or IPv6 address string.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
#define COROSYNC_POLL_TIMEOUT
Timeout for Corosync's poll process.
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
The arg parameter is a search key, but is not an object.
struct ast_json_payload * ast_json_payload_create(struct ast_json *json)
Create an ao2 object to pass json blobs as data payloads for stasis.
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
int ast_carefulwrite(int fd, char *s, int len, int timeoutms)
Try to write string, but wait no more than ms milliseconds before timing out.
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
static void corosync_ping_payload_dtor(void *obj)
Destructor for the corosync_ping_payload wrapper object.
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
descriptor for a cli entry.
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container, as described below.
static void send_cluster_notify(void)
Informs the cluster of our EID and our IP addresses.
Structure for variables, used for configurations and for channel variables.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Assume that the ao2_container is already locked.
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Number of new messages Used by: AST_EVENT_MWI Payload type: UINT.
Number of old messages Used by: AST_EVENT_MWI Payload type: UINT.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
static void publish_cluster_discovery_to_stasis(struct ast_event *event)
Publish a received cluster discovery ast_event to Stasis Message Bus API.
char * ast_category_browse(struct ast_config *config, const char *prev_name)
Browse categories.
static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
Publish cluster discovery to Stasis Message Bus API.
Socket address structure.
An Entity ID is essentially a MAC address, brief and unique.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed...
Entity ID Used by All events Payload type: RAW This IE indicates which server the event originated fr...
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
int args
This gets set in ast_cli_register()
Configuration File Parser.
struct stasis_message_type * ast_device_state_message_type(void)
Get the Stasis message type for device state messages.
static void publish_corosync_ping_to_stasis(struct ast_event *event)
Publish a Corosync ping to Stasis Message Bus API.
Context IE Used by AST_EVENT_MWI Payload type: str.
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
#define ast_config_load(filename, flags)
Load a config file.
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
static struct ast_event * corosync_ping_to_event(struct stasis_message *message)
Convert a Corosync PING to a ast_event.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
struct stasis_cache * ast_mwi_state_cache(void)
Backend cache for ast_mwi_topic_cached().
#define ast_malloc(len)
A wrapper for malloc()
#define ast_debug(level,...)
Log a DEBUG message.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
static void publish_device_state_to_stasis(struct ast_event *event)
Publish a received device state ast_event to Stasis Message Bus API.
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Event non-cacheability flag Used by: All events Payload type: UINT.
const void * ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a raw payload.
static int corosync_node_joined
Join to corosync.
int ast_publish_mwi_state_full(const char *mailbox, const char *context, int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
Publish a MWI state update via stasis with all parameters.
int ast_publish_device_state_full(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, struct ast_eid *eid)
Publish a device state update with EID.
A payload wrapper around a corosync ping event.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
struct stasis_cache * ast_device_state_cache(void)
Backend cache for ast_device_state_topic_cached()
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_message_type * ast_mwi_state_type(void)
Get the Stasis Message Bus API message type for MWI messages.
#define corosync_pthread_create_background(a, b, c, d)
Version of pthread_create to ensure stack is large enough.
struct stasis_topic * ast_device_state_topic_all(void)
Get the Stasis topic for device state messages.
struct ast_event * stasis_message_to_event(struct stasis_message *msg)
Build the Generic event system representation of the message.
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
size_t ast_event_minimum_length(void)
Get the minimum length of an ast_event.
Support for logging to various files, console and syslog Configuration in file logger.conf.
Module has failed to load, may be in an inconsistent state.
An API for managing task processing threads that can be shared across modules.
Structure used to handle boolean flags.
static void publish_mwi_to_stasis(struct ast_event *event)
Publish a received MWI ast_event to Stasis Message Bus API.
uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has an integer payload.
struct ast_eid ast_eid_default
Global EID.
STASIS_MESSAGE_TYPE_DEFN_LOCAL(cdr_sync_message_type)
A message type used to synchronize with the CDR topic.
The arg parameter is an object of the same type.
void ast_event_destroy(struct ast_event *event)
Destroy an event.
Standard Command Line Interface.
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
static struct stasis_message_router * stasis_router
Our Stasis Message Bus API message router.
Abstract JSON element (object, array, string, int, ...).
size_t ast_event_get_size(const struct ast_event *event)
Get the size of an event.
static struct stasis_topic * corosync_aggregate_topic
The internal topic used for message forwarding and pings.
const char * ast_event_get_type_name(const struct ast_event *event)
Get the string representation of the type of the given event.
int ast_eid_is_empty(const struct ast_eid *eid)
Check if EID is empty.
Search option field mask.
struct stasis_topic * ast_mwi_topic_all(void)
Get the Stasis Message Bus API topic for MWI messages.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
void ast_config_destroy(struct ast_config *cfg)
Destroys a config.
Generic State IE Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: UINT The actual state values dep...
const char * ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type)
Get the value of an information element that has a string payload.
#define ASTERISK_GPL_KEY
The text the key() function should return.
Asterisk module definitions.
Device Name Used by AST_EVENT_DEVICE_STATE_CHANGE Payload type: STR.
Cluster node ID Used by: Corosync Payload type: UINT.
static struct stasis_topic * corosync_topic(void)
Internal accessor for our topic.
static struct ao2_container * nodes
All the nodes that we're aware of.