36 #include "asterisk/threadpool.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/stasis_bridges.h"
301 #define INITIAL_SUBSCRIBERS_MAX 4
304 #define TOPIC_POOL_BUCKETS 57
311 #if defined(LOW_MEMORY)
313 #define TOPIC_ALL_BUCKETS 257
317 #define TOPIC_ALL_BUCKETS 997
324 #define TOPIC_STATISTICS_BUCKETS 57
327 #define SUBSCRIPTION_STATISTICS_BUCKETS 57
336 struct stasis_message_type_statistics {
346 AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
349 static AST_VECTOR(,
struct stasis_message_type_statistics) message_type_statistics;
352 struct stasis_topic_statistics {
354 long highest_time_dispatched;
356 long lowest_time_dispatched;
358 int messages_not_dispatched;
360 int messages_dispatched;
379 struct stasis_topic_statistics *statistics;
403 struct timeval creationtime;
412 static void proxy_dtor(
void *weakproxy,
void *
container)
415 ao2_cleanup(container);
425 #define topic_lock_both(topic1, topic2) \
428 while (ao2_trylock(topic2)) { \
429 AO2_DEADLOCK_AVOIDANCE(topic1); \
433 static void topic_dtor(
void *obj)
440 ast_debug(2,
"Destroying topic. name: %s, detail: %s\n",
449 ast_debug(1,
"Topic '%s': %p destroyed\n", topic->
name, topic);
452 if (topic->statistics) {
458 ao2_ref(topic->statistics, -1);
464 static void topic_statistics_destroy(
void *obj)
466 struct stasis_topic_statistics *statistics = obj;
468 ao2_cleanup(statistics->subscribers);
471 static struct stasis_topic_statistics *stasis_topic_statistics_create(
struct stasis_topic *topic)
473 struct stasis_topic_statistics *statistics;
480 statistics = ao2_alloc(
sizeof(*statistics) + strlen(topic->
name) + 1, topic_statistics_destroy);
486 if (!statistics->subscribers) {
492 statistics->topic = topic;
493 strcpy(statistics->name, topic->
name);
500 static int link_topic_proxy(
struct stasis_topic *topic,
const char *name,
const char *detail)
506 if (!topic || !name || !strlen(name) || !detail) {
510 ao2_wrlock(topic_all);
514 ast_log(LOG_ERROR,
"The same topic is already exist. name: %s\n", name);
516 ao2_unlock(topic_all);
521 detail_len = strlen(detail) + 1;
523 proxy = ao2_t_weakproxy_alloc(
524 sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
526 ao2_unlock(topic_all);
532 proxy->name = proxy->buf;
533 proxy->detail = proxy->name + strlen(name) + 1;
535 strcpy(proxy->name, name);
540 if (ao2_t_weakproxy_set_object(proxy, topic,
OBJ_NOLOCK,
"weakproxy link")) {
542 ao2_unlock(topic_all);
549 ao2_unlock(topic_all);
550 ao2_cleanup(topic_all);
556 topic->
name = proxy->name;
557 topic->
detail = proxy->detail;
563 ao2_unlock(topic_all);
569 const char *name,
const char* detail
575 if (!name|| !strlen(name) || !detail) {
578 ast_debug(2,
"Creating topic. name: %s, detail: %s\n", name, detail);
582 ast_debug(2,
"Topic is already exist. name: %s, detail: %s\n",
587 topic = ao2_t_alloc(
sizeof(*topic), topic_dtor, name);
600 if (link_topic_proxy(topic, name, detail)) {
606 topic->statistics = stasis_topic_statistics_create(topic);
607 if (!topic->statistics) {
612 ast_debug(1,
"Topic '%s': %p created\n", topic->
name, topic);
649 struct stasis_subscription_statistics {
659 long highest_time_invoked;
661 long lowest_time_invoked;
663 int messages_dropped;
710 struct stasis_subscription_statistics *statistics;
714 static void subscription_dtor(
void *obj)
729 ao2_cleanup(sub->
topic);
738 if (sub->statistics) {
740 if (subscription_stats) {
741 ao2_unlink(subscription_stats, sub->statistics);
742 ao2_ref(subscription_stats, -1);
760 struct timeval start;
794 if (elapsed > sub->statistics->highest_time_invoked) {
795 sub->statistics->highest_time_invoked = elapsed;
796 ao2_lock(sub->statistics);
798 ao2_unlock(sub->statistics);
800 if (elapsed < sub->statistics->lowest_time_invoked) {
801 sub->statistics->lowest_time_invoked = elapsed;
814 static void subscription_statistics_destroy(
void *obj)
816 struct stasis_subscription_statistics *statistics = obj;
818 ao2_cleanup(statistics->topics);
821 static struct stasis_subscription_statistics *stasis_subscription_statistics_create(
struct stasis_subscription *sub,
822 int needs_mailbox,
int use_thread_pool,
const char *file,
int lineno,
825 struct stasis_subscription_statistics *statistics;
828 if (!subscription_stats) {
832 statistics = ao2_alloc(
sizeof(*statistics) + strlen(sub->
uniqueid) + 1, subscription_statistics_destroy);
838 if (!statistics->topics) {
843 statistics->file = file;
844 statistics->lineno = lineno;
845 statistics->func = func;
846 statistics->uses_mailbox = needs_mailbox;
847 statistics->uses_threadpool = use_thread_pool;
848 strcpy(statistics->uniqueid, sub->
uniqueid);
849 statistics->sub = sub;
850 ao2_link(subscription_stats, statistics);
881 sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
882 if (ret < 0 || !sub->statistics) {
899 use_thread_pool ?
'p' :
'm',
907 if (use_thread_pool) {
908 sub->
mailbox = ast_threadpool_serializer(tps_name, threadpool);
937 send_subscription_subscribe(topic, sub);
964 static int sub_cleanup(
void *data)
985 if (topic_remove_subscription(sub->
topic, sub) != 0) {
987 "Internal error: subscription has invalid topic\n");
994 send_subscription_unsubscribe(topic, sub);
1012 long low_water,
long high_water)
1018 low_water, high_water);
1026 if (!subscription) {
1030 ast_assert(type != NULL);
1041 ao2_lock(subscription->
topic);
1048 ao2_unlock(subscription->
topic);
1056 if (!subscription) {
1060 ast_assert(type != NULL);
1067 ao2_lock(subscription->
topic);
1072 ao2_unlock(subscription->
topic);
1080 if (!subscription) {
1084 ao2_lock(subscription->
topic);
1088 ao2_unlock(subscription->
topic);
1096 ast_assert(subscription != NULL);
1098 ao2_lock(subscription->
topic);
1100 ao2_unlock(subscription->
topic);
1108 ao2_lock(subscription);
1114 ao2_unlock(subscription);
1123 ao2_lock(subscription);
1125 ao2_unlock(subscription);
1137 if (!subscription) {
1146 ao2_cleanup(subscription);
1236 topic_remove_subscription(
1265 ao2_cleanup(message);
1293 ao2_cleanup(message);
1295 ast_mutex_lock(&std->lock);
1297 ast_cond_signal(&std->cond);
1298 ast_mutex_unlock(&std->lock);
1327 int type_filter_specified = 0;
1328 int formatter_filter_specified = 0;
1329 int type_filter_passed = 0;
1330 int formatter_filter_passed = 0;
1338 formatter_filter_specified = sub->
accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1341 if (!type_filter_specified && !formatter_filter_specified) {
1345 type_filter_passed = type_filter_specified
1353 if (type_filter_passed) {
1357 formatter_filter_passed = formatter_filter_specified
1360 if (formatter_filter_passed) {
1389 ast_log(LOG_ERROR,
"Dropping async dispatch\n");
1390 ao2_cleanup(message);
1396 ast_mutex_init(&std.lock);
1397 ast_cond_init(&std.cond, NULL);
1399 std.task_data = message;
1403 ast_log(LOG_ERROR,
"Dropping sync dispatch\n");
1404 ao2_cleanup(message);
1405 ast_mutex_destroy(&std.lock);
1406 ast_cond_destroy(&std.cond);
1410 ast_mutex_lock(&std.lock);
1411 while (!std.complete) {
1412 ast_cond_wait(&std.cond, &std.lock);
1414 ast_mutex_unlock(&std.lock);
1416 ast_mutex_destroy(&std.lock);
1417 ast_cond_destroy(&std.cond);
1435 unsigned int dispatched = 0;
1437 struct stasis_message_type_statistics *statistics;
1438 struct timeval start;
1442 ast_assert(topic != NULL);
1443 ast_assert(message != NULL);
1446 ast_mutex_lock(&message_type_statistics_lock);
1448 struct stasis_message_type_statistics new_statistics = {
1452 ast_mutex_unlock(&message_type_statistics_lock);
1458 ast_mutex_unlock(&message_type_statistics_lock);
1484 ast_assert(sub != NULL);
1488 dispatch_message(sub, message, (sub == sync_sub));
1494 if (elapsed > topic->statistics->highest_time_dispatched) {
1495 topic->statistics->highest_time_dispatched = elapsed;
1497 if (elapsed < topic->statistics->lowest_time_dispatched) {
1498 topic->statistics->lowest_time_dispatched = elapsed;
1513 publish_msg(topic, message, NULL);
1518 ast_assert(sub != NULL);
1520 publish_msg(sub->
topic, message, sub);
1538 static void forward_dtor(
void *obj)
1573 ao2_cleanup(forward);
1585 if (!from_topic || !to_topic) {
1595 if (to_topic == from_topic) {
1605 ao2_unlock(from_topic);
1606 ao2_unlock(to_topic);
1614 ao2_unlock(from_topic);
1615 ao2_unlock(to_topic);
1620 static void subscription_change_dtor(
void *obj)
1624 ao2_cleanup(change->
topic);
1629 size_t description_len = strlen(description) + 1;
1630 size_t uniqueid_len = strlen(uniqueid) + 1;
1633 change = ao2_alloc_options(
sizeof(*change) + description_len + uniqueid_len,
1660 change = subscription_change_alloc(topic, sub->
uniqueid,
"Subscribe");
1667 ao2_cleanup(change);
1673 ao2_cleanup(change);
1676 static void send_subscription_unsubscribe(
struct stasis_topic *topic,
1689 change = subscription_change_alloc(topic, sub->
uniqueid,
"Unsubscribe");
1696 ao2_cleanup(change);
1703 dispatch_message(sub, msg, 0);
1706 ao2_cleanup(change);
1715 static void topic_pool_entry_dtor(
void *obj)
1719 entry->forward = stasis_forward_cancel(entry->forward);
1720 ao2_cleanup(entry->topic);
1721 entry->topic = NULL;
1724 static struct topic_pool_entry *topic_pool_entry_alloc(
const char *topic_name)
1728 topic_pool_entry = ao2_alloc_options(
sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1730 if (!topic_pool_entry) {
1734 strcpy(topic_pool_entry->name, topic_name);
1736 return topic_pool_entry;
1744 static void topic_pool_dtor(
void *obj)
1750 char *container_name =
1757 ao2_cleanup(pool->pool_container);
1758 pool->pool_container = NULL;
1759 ao2_cleanup(pool->pool_topic);
1760 pool->pool_topic = NULL;
1763 static int topic_pool_entry_hash(
const void *obj,
const int flags)
1765 const struct topic_pool_entry *object;
1784 static int topic_pool_entry_cmp(
void *obj,
void *arg,
int flags)
1786 const struct topic_pool_entry *object_left = obj;
1787 const struct topic_pool_entry *object_right = arg;
1788 const char *right_key = arg;
1793 right_key = object_right->name;
1796 cmp = strcasecmp(object_left->name, right_key);
1822 static void topic_pool_prnt_obj(
void *v_obj,
void *where,
ao2_prnt_fn *prnt)
1824 struct topic_pool_entry *
entry = v_obj;
1844 if (!pool->pool_container) {
1851 char *container_name =
1859 pool->pool_topic = pooled_topic;
1872 int pool_topic_name_len = strlen(pool_topic_name);
1873 const char *search_topic_name;
1875 if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1876 search_topic_name = topic_name + pool_topic_name_len + 1;
1878 search_topic_name = topic_name;
1886 RAII_VAR(
struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1888 char *new_topic_name;
1892 if (topic_pool_entry) {
1893 return topic_pool_entry->topic;
1896 topic_pool_entry = topic_pool_entry_alloc(topic_name);
1897 if (!topic_pool_entry) {
1910 ast_free(new_topic_name);
1911 if (!topic_pool_entry->topic) {
1915 topic_pool_entry->forward =
stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1916 if (!topic_pool_entry->forward) {
1924 return topic_pool_entry->topic;
1929 struct topic_pool_entry *topic_pool_entry;
1931 topic_pool_entry = ao2_find(pool->pool_container, topic_name,
OBJ_SEARCH_KEY);
1932 if (!topic_pool_entry) {
1936 ao2_ref(topic_pool_entry, -1);
1940 void stasis_log_bad_type_access(
const char *name)
1944 ast_log(LOG_ERROR,
"Use of %s() before init/after destruction\n", name);
1959 static void multi_object_blob_dtor(
void *obj)
1980 ast_assert(blob != NULL);
1982 multi = ao2_alloc(
sizeof(*multi), multi_object_blob_dtor);
2005 ao2_cleanup(
object);
2027 if (!channel_snapshot) {
2045 static struct ast_json *multi_user_event_to_json(
2051 struct ast_json *blob = multi->blob;
2068 struct ast_json *json_object = NULL;
2096 static struct ast_str *multi_object_blob_to_ami(
void *obj)
2116 ami_snapshot = NULL;
2137 ast_free(ami_snapshot);
2147 static int userevent_exclusion_cb(
const char *key)
2149 if (!strcmp(
"eventname", key)) {
2161 const char *eventname;
2165 object_string = multi_object_blob_to_ami(multi);
2166 if (!object_string || !body) {
2202 static struct aco_type threadpool_option = {
2204 .name =
"threadpool",
2205 .item_offset = offsetof(
struct stasis_config, threadpool_options),
2206 .category =
"threadpool",
2215 .name =
"declined_message_types",
2216 .item_offset = offsetof(
struct stasis_config, declined_message_types),
2218 .category =
"declined_message_types",
2225 .types =
ACO_TYPES(&declined_option, &threadpool_option),
2231 static void *stasis_config_alloc(
void);
2235 .files = ACO_FILES(&stasis_conf),
2238 static void stasis_declined_config_destructor(
void *obj)
2245 static void stasis_config_destructor(
void *obj)
2253 static void *stasis_config_alloc(
void)
2257 if (!(cfg = ao2_alloc(
sizeof(*cfg), stasis_config_destructor))) {
2268 stasis_declined_config_destructor);
2286 char *name_in_declined;
2295 res = name_in_declined ? 1 : 0;
2296 ao2_cleanup(name_in_declined);
2299 ast_debug(4,
"Declining to allocate Stasis message type '%s' due to configuration\n", name);
2308 if (ast_strlen_zero(var->
value)) {
2324 .to_json = multi_user_event_to_json,
2325 .to_ami = multi_user_event_to_ami,
2340 #define FMT_HEADERS "%-64s %-64s\n"
2341 #define FMT_FIELDS "%-64s %-64s\n"
2345 e->
command =
"stasis show topics";
2347 "Usage: stasis show topics\n"
2348 " Shows a list of topics\n";
2354 if (a->argc != e->
args) {
2355 return CLI_SHOWUSAGE;
2358 ast_cli(a->fd,
"\n" FMT_HEADERS,
"Name",
"Detail");
2361 topic_proxy_sort_fn, NULL);
2364 ao2_cleanup(tmp_container);
2371 while ((topic = ao2_iterator_next(&iter))) {
2372 ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2377 ao2_cleanup(tmp_container);
2379 ast_cli(a->fd,
"\n%d Total topics\n\n", count);
2391 static char *topic_complete_name(
const char *word)
2395 int wordlen = strlen(word);
2399 while ((topic = ao2_iterator_next(&it))) {
2400 if (!strncasecmp(word, topic->name, wordlen)) {
2420 char print_time[32];
2425 e->
command =
"stasis show topic";
2427 "Usage: stasis show topic <name>\n"
2428 " Show stasis topic detail info.\n";
2432 return topic_complete_name(a->word);
2439 return CLI_SHOWUSAGE;
2444 ast_cli(a->fd,
"Specified topic '%s' does not exist\n", a->argv[3]);
2448 ast_cli(a->fd,
"Name: %s\n", topic->
name);
2449 ast_cli(a->fd,
"Detail: %s\n", topic->
detail);
2453 ast_cli(a->fd,
"Duration time: %s\n", print_time);
2456 ast_cli(a->fd,
"\nSubscribers:\n");
2459 ast_cli(a->fd,
" UniqueID: %s, Topic: %s, Detail: %s\n",
2463 ast_cli(a->fd,
"\nForwarded topics:\n");
2466 ast_cli(a->fd,
" Topic: %s, Detail: %s\n", topic_tmp->
name, topic_tmp->
detail);
2477 AST_CLI_DEFINE(stasis_show_topics,
"Show all topics"),
2478 AST_CLI_DEFINE(stasis_show_topic,
"Show topic"),
2495 struct stasis_subscription_statistics *statistics;
2499 #define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2500 #define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2501 #define FMT_FIELDS2 "%-64s %10d %10d\n"
2505 e->
command =
"stasis statistics show subscriptions";
2507 "Usage: stasis statistics show subscriptions\n"
2508 " Shows a list of subscriptions and their general statistics\n";
2514 if (a->argc != e->
args) {
2515 return CLI_SHOWUSAGE;
2519 if (!subscription_stats) {
2520 ast_cli(a->fd,
"Could not fetch subscription_statistics container\n");
2525 stasis_subscription_statistics_sort_fn, NULL);
2526 if (!sorted_subscriptions) {
2527 ao2_ref(subscription_stats, -1);
2528 ast_cli(a->fd,
"Could not create container for sorting subscription statistics\n");
2533 ao2_ref(sorted_subscriptions, -1);
2534 ao2_ref(subscription_stats, -1);
2535 ast_cli(a->fd,
"Could not sort subscription statistics\n");
2539 ao2_ref(subscription_stats, -1);
2541 ast_cli(a->fd,
"\n" FMT_HEADERS,
"Subscription",
"Dropped",
"Passed",
"Lowest Invoke",
"Highest Invoke");
2544 while ((statistics = ao2_iterator_next(&iter))) {
2545 ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2546 statistics->lowest_time_invoked, statistics->highest_time_invoked);
2547 dropped += statistics->messages_dropped;
2548 passed += statistics->messages_passed;
2554 ao2_ref(sorted_subscriptions, -1);
2556 ast_cli(a->fd, FMT_FIELDS2,
"Total", dropped, passed);
2557 ast_cli(a->fd,
"\n%d subscriptions\n\n", count);
2570 static char *subscription_statistics_complete_name(
const char *word,
int state)
2572 struct stasis_subscription_statistics *statistics;
2575 int wordlen = strlen(word);
2577 char *result = NULL;
2580 if (!subscription_stats) {
2585 while ((statistics = ao2_iterator_next(&it_statistics))) {
2586 if (!strncasecmp(word, statistics->uniqueid, wordlen)
2587 && ++which > state) {
2596 ao2_ref(subscription_stats, -1);
2606 struct stasis_subscription_statistics *statistics;
2613 e->
command =
"stasis statistics show subscription";
2615 "Usage: stasis statistics show subscription <uniqueid>\n"
2616 " Show stasis subscription statistics.\n";
2620 return subscription_statistics_complete_name(a->word, a->n);
2627 return CLI_SHOWUSAGE;
2631 if (!subscription_stats) {
2632 ast_cli(a->fd,
"Could not fetch subcription_statistics container\n");
2636 statistics = ao2_find(subscription_stats, a->argv[4],
OBJ_SEARCH_KEY);
2638 ao2_ref(subscription_stats, -1);
2639 ast_cli(a->fd,
"Specified subscription '%s' does not exist\n", a->argv[4]);
2643 ao2_ref(subscription_stats, -1);
2645 ast_cli(a->fd,
"Subscription: %s\n", statistics->uniqueid);
2646 ast_cli(a->fd,
"Pointer Address: %p\n", statistics->sub);
2647 ast_cli(a->fd,
"Source filename: %s\n",
S_OR(statistics->file,
"<unavailable>"));
2648 ast_cli(a->fd,
"Source line number: %d\n", statistics->lineno);
2649 ast_cli(a->fd,
"Source function: %s\n",
S_OR(statistics->func,
"<unavailable>"));
2650 ast_cli(a->fd,
"Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2651 ast_cli(a->fd,
"Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2652 ast_cli(a->fd,
"Using mailbox to queue messages: %s\n", statistics->uses_mailbox ?
"Yes" :
"No");
2653 ast_cli(a->fd,
"Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ?
"Yes" :
"No");
2654 ast_cli(a->fd,
"Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2655 ast_cli(a->fd,
"Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2657 ao2_lock(statistics);
2658 if (statistics->highest_time_message_type) {
2659 ast_cli(a->fd,
"Offender message type for highest invoking time: %s\n",
stasis_message_type_name(statistics->highest_time_message_type));
2661 ao2_unlock(statistics);
2665 ast_cli(a->fd,
"Subscribed topics:\n");
2667 while ((name = ao2_iterator_next(&i))) {
2668 ast_cli(a->fd,
"\t%s\n", name);
2689 struct stasis_topic_statistics *statistics;
2691 int not_dispatched = 0;
2693 #define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2694 #define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2695 #define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2699 e->
command =
"stasis statistics show topics";
2701 "Usage: stasis statistics show topics\n"
2702 " Shows a list of topics and their general statistics\n";
2708 if (a->argc != e->
args) {
2709 return CLI_SHOWUSAGE;
2714 ast_cli(a->fd,
"Could not fetch topic_statistics container\n");
2719 stasis_topic_statistics_sort_fn, NULL);
2720 if (!sorted_topics) {
2722 ast_cli(a->fd,
"Could not create container for sorting topic statistics\n");
2729 ast_cli(a->fd,
"Could not sort topic statistics\n");
2735 ast_cli(a->fd,
"\n" FMT_HEADERS,
"Topic",
"Subscribers",
"Dropped",
"Dispatched",
"Lowest Dispatch",
"Highest Dispatch");
2738 while ((statistics = ao2_iterator_next(&iter))) {
2739 ast_cli(a->fd, FMT_FIELDS, statistics->name,
ao2_container_count(statistics->subscribers),
2740 statistics->messages_not_dispatched, statistics->messages_dispatched,
2741 statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2742 not_dispatched += statistics->messages_not_dispatched;
2743 dispatched += statistics->messages_dispatched;
2751 ast_cli(a->fd, FMT_FIELDS2,
"Total",
"", not_dispatched, dispatched);
2752 ast_cli(a->fd,
"\n%d topics\n\n", count);
2765 static char *topic_statistics_complete_name(
const char *word,
int state)
2767 struct stasis_topic_statistics *statistics;
2770 int wordlen = strlen(word);
2772 char *result = NULL;
2780 while ((statistics = ao2_iterator_next(&it_statistics))) {
2781 if (!strncasecmp(word, statistics->name, wordlen)
2782 && ++which > state) {
2801 struct stasis_topic_statistics *statistics;
2808 e->
command =
"stasis statistics show topic";
2810 "Usage: stasis statistics show topic <name>\n"
2811 " Show stasis topic statistics.\n";
2815 return topic_statistics_complete_name(a->word, a->n);
2822 return CLI_SHOWUSAGE;
2827 ast_cli(a->fd,
"Could not fetch topic_statistics container\n");
2834 ast_cli(a->fd,
"Specified topic '%s' does not exist\n", a->argv[4]);
2840 ast_cli(a->fd,
"Topic: %s\n", statistics->name);
2841 ast_cli(a->fd,
"Pointer Address: %p\n", statistics->topic);
2842 ast_cli(a->fd,
"Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2843 ast_cli(a->fd,
"Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2844 ast_cli(a->fd,
"Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2845 ast_cli(a->fd,
"Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2846 ast_cli(a->fd,
"Number of subscribers: %d\n",
ao2_container_count(statistics->subscribers));
2848 ast_cli(a->fd,
"Subscribers:\n");
2850 while ((uniqueid = ao2_iterator_next(&i))) {
2851 ast_cli(a->fd,
"\t%s\n", uniqueid);
2871 #define FMT_HEADERS "%-64s %10s %10s\n"
2872 #define FMT_FIELDS "%-64s %10d %10d\n"
2876 e->
command =
"stasis statistics show messages";
2878 "Usage: stasis statistics show messages\n"
2879 " Shows a list of message types and their general statistics\n";
2885 if (a->argc != e->
args) {
2886 return CLI_SHOWUSAGE;
2889 ast_cli(a->fd,
"\n" FMT_HEADERS,
"Message Type",
"Published",
"Unused");
2891 ast_mutex_lock(&message_type_statistics_lock);
2893 struct stasis_message_type_statistics *statistics =
AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2895 if (!statistics->message_type) {
2900 statistics->unused);
2901 published += statistics->published;
2902 unused += statistics->unused;
2905 ast_mutex_unlock(&message_type_statistics_lock);
2907 ast_cli(a->fd, FMT_FIELDS,
"Total", published, unused);
2908 ast_cli(a->fd,
"\n%d seen message types\n\n", count);
2917 AST_CLI_DEFINE(statistics_show_subscriptions,
"Show subscriptions with general statistics"),
2918 AST_CLI_DEFINE(statistics_show_subscription,
"Show subscription statistics"),
2919 AST_CLI_DEFINE(statistics_show_topics,
"Show topics with general statistics"),
2920 AST_CLI_DEFINE(statistics_show_topic,
"Show topic statistics"),
2921 AST_CLI_DEFINE(statistics_show_messages,
"Show message types with general statistics"),
2924 static int subscription_statistics_hash(
const void *obj,
const int flags)
2926 const struct stasis_subscription_statistics *object;
2929 switch (flags & OBJ_SEARCH_MASK) {
2935 key =
object->uniqueid;
2945 static int subscription_statistics_cmp(
void *obj,
void *arg,
int flags)
2947 const struct stasis_subscription_statistics *object_left = obj;
2948 const struct stasis_subscription_statistics *object_right = arg;
2949 const char *right_key = arg;
2952 switch (flags & OBJ_SEARCH_MASK) {
2954 right_key = object_right->uniqueid;
2957 cmp = strcasecmp(object_left->uniqueid, right_key);
2982 static int topic_statistics_hash(
const void *obj,
const int flags)
2984 const struct stasis_topic_statistics *object;
2987 switch (flags & OBJ_SEARCH_MASK) {
3003 static int topic_statistics_cmp(
void *obj,
void *arg,
int flags)
3005 const struct stasis_topic_statistics *object_left = obj;
3006 const struct stasis_topic_statistics *object_right = arg;
3007 const char *right_key = arg;
3010 switch (flags & OBJ_SEARCH_MASK) {
3012 right_key = object_right->name;
3015 cmp = strcasecmp(object_left->name, right_key);
3051 ao2_cleanup(topic_all);
3053 ast_threadpool_shutdown(threadpool);
3079 declined_options,
"", declined_handler, 0);
3081 threadpool_options,
"5",
OPT_INT_T, PARSE_IN_RANGE,
3085 threadpool_options,
"20",
OPT_INT_T, PARSE_IN_RANGE,
3089 threadpool_options,
"50",
OPT_INT_T, PARSE_IN_RANGE,
3101 ast_log(LOG_ERROR,
"Failed to initialize defaults on Stasis configuration object\n");
3108 ast_log(LOG_ERROR,
"Failed to load stasis.conf and failed to initialize defaults.\n");
3114 ast_log(LOG_NOTICE,
"Could not load Stasis configuration; using defaults\n");
3120 ast_log(LOG_ERROR,
"Failed to obtain Stasis configuration object\n");
3126 threadpool_opts.
version = AST_THREADPOOL_OPTIONS_VERSION;
3131 threadpool = ast_threadpool_create(
"stasis", NULL, &threadpool_opts);
3134 ast_log(LOG_ERROR,
"Failed to create 'stasis-core' threadpool\n");
3139 cache_init = stasis_cache_init();
3140 if (cache_init != 0) {
3152 topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3166 subscription_statistics_hash, 0, subscription_statistics_cmp);
3167 if (!subscription_stats) {
3171 ao2_cleanup(subscription_stats);
3174 topic_statistics_hash, 0, topic_statistics_cmp);
3179 ao2_cleanup(topic_stats);
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,.to_json=multi_user_event_to_json,.to_ami=multi_user_event_to_ami,)
Define multi user event message type(s).
Struct containing info for an AMI event to send out.
int auto_increment
Number of threads to increment pool by.
Main Channel structure associated with a channel.
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct timeval * creationtime
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads...
void( ao2_prnt_fn)(void *where, const char *fmt,...)
Print output.
struct stasis_subscription::@400 accepted_message_types
Asterisk main include file. File version handling, generic pbx functions.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
AO2_STRING_FIELD_HASH_FN(transport_monitor, key)
Hashing function for struct transport_monitor.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
int idle_timeout
Time limit in seconds for idle threads.
struct stasis_topic * topic
#define STASIS_UMOS_MAX
Number of snapshot types.
#define TOPIC_POOL_BUCKETS
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
The arg parameter is a search key, but is not an object.
int initial_size
Number of threads the pool will start with.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
int max_size
Maximum number of threads a pool may have.
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
struct stasis_topic::@399 upstream_topics
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
Descriptor for a CLI entry.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
struct stasis_topic::@398 subscribers
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Threadpool configuration options.
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Structure for variables, used for configurations and for channel variables.
static pj_pool_t * pool
Global memory pool for configuration and timers.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Structure representing a snapshot of channel state.
Universally unique identifier support.
Return a reference to a taskprocessor, creating one if it does not exist.
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Assume that the ao2_container is already locked.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
AO2_STRING_FIELD_CMP_FN(transport_monitor, key)
Comparison function for struct transport_monitor.
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
#define ast_cli_register_multiple(e, len)
Register multiple commands.
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
#define ast_strdup(str)
A wrapper for strdup()
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
The representation of a single configuration file to be processed.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
#define topic_lock_both(topic1, topic2)
Lock two topics.
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct an ast_manager_event_blob.
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Structure containing callbacks for Stasis message sanitization.
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate...
struct stasis_topic * from_topic
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
int final_message_processed
int args
This gets set in ast_cli_register()
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
struct ao2_container * declined
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
struct stasis_declined_config * declined_message_types
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
A multi object blob data structure to carry user event stasis messages.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
struct stasis_topic * to_topic
struct ao2_container * container
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_VECTOR(name, type)
Define a vector structure.
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
There was an error and no changes were applied.
struct stasis_topic * topic
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
Configuration option-handling.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Support for dynamic strings.
struct ast_taskprocessor * mailbox
#define ao2_unlink(container, obj)
Remove an object from a container.
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
userdata associated with baseline taskprocessor test
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
enum stasis_subscription_message_formatters accepted_formatters
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object...
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
#define ast_calloc(num, len)
A wrapper for calloc()
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Holds details about changes to subscriptions for the specified topic.
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Vector container support.
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
An API for managing task processing threads that can be shared across modules.
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
stasis_subscription_cb callback
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
enum stasis_subscription_message_filter filter
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
The arg parameter is an object of the same type.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
An ast_taskprocessor structure is a singleton by name.
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
stasis_subscription_message_filter
Stasis subscription message filters.
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Standard Command Line Interface.
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Type information about a category-level configurable object.
An opaque threadpool structure.
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
#define INITIAL_SUBSCRIBERS_MAX
#define S_OR(a, b)
Returns the equivalent of logical OR for strings: the first one if not empty, otherwise the second one...
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
A structure to hold global configuration-related options.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
struct stasis_threadpool_conf * threadpool_options
Abstract JSON element (object, array, string, int, ...).
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_bridge_snapshot.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
AO2_STRING_FIELD_SORT_FN(transport_monitor, key)
Sort function for struct transport_monitor.
Search option field mask.
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_channel_snapshot.
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Type for default option handler for signed integers.
static struct ast_threadpool * threadpool
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
stasis_subscription_message_formatters
Stasis subscription formatter filters.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from an ast_endpoint_snapshot.
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic...
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Structure for mutex and tracking information.
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
#define ao2_link(container, obj)
Add an object to a container.
int stasis_init(void)
Initialize the Stasis subsystem.