Asterisk - The Open Source Telephony Project  21.4.1
stasis.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25 
26 /*** MODULEINFO
27  <support_level>core</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 #include "asterisk/astobj2.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/utils.h"
38 #include "asterisk/uuid.h"
39 #include "asterisk/vector.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/stasis_bridges.h"
44 #include "asterisk/cli.h"
45 
46 /*** DOCUMENTATION
47  <managerEvent language="en_US" name="UserEvent">
48  <managerEventInstance class="EVENT_FLAG_USER">
49  <synopsis>A user defined event raised from the dialplan.</synopsis>
50  <syntax>
51  <channel_snapshot/>
52  <parameter name="UserEvent">
53  <para>The event name, as specified in the dialplan.</para>
54  </parameter>
55  </syntax>
56  <description>
57  <para>Event may contain additional arbitrary parameters in addition to optional bridge and endpoint snapshots. Multiple snapshots of the same type are prefixed with a numeric value.</para>
58  </description>
59  <see-also>
60  <ref type="application">UserEvent</ref>
61  <ref type="managerEvent">UserEvent</ref>
62  </see-also>
63  </managerEventInstance>
64  </managerEvent>
65  <configInfo name="stasis" language="en_US">
66  <configFile name="stasis.conf">
67  <configObject name="threadpool">
68  <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
69  <configOption name="initial_size" default="5">
70  <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
71  </configOption>
72  <configOption name="idle_timeout_sec" default="20">
73  <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
74  </configOption>
75  <configOption name="max_size" default="50">
76  <synopsis>Maximum number of threads in the threadpool.</synopsis>
77  </configOption>
78  </configObject>
79  <configObject name="declined_message_types">
80  <synopsis>Stasis message types for which to decline creation.</synopsis>
81  <configOption name="decline">
82  <synopsis>The message type to decline.</synopsis>
83  <description>
84  <para>This configuration option defines the name of the Stasis
85  message type that Asterisk is forbidden from creating and can be
86  specified as many times as necessary to achieve the desired result.</para>
87  <enumlist>
88  <enum name="stasis_app_recording_snapshot_type" />
89  <enum name="stasis_app_playback_snapshot_type" />
90  <enum name="stasis_test_message_type" />
91  <enum name="confbridge_start_type" />
92  <enum name="confbridge_end_type" />
93  <enum name="confbridge_join_type" />
94  <enum name="confbridge_leave_type" />
95  <enum name="confbridge_start_record_type" />
96  <enum name="confbridge_stop_record_type" />
97  <enum name="confbridge_mute_type" />
98  <enum name="confbridge_unmute_type" />
99  <enum name="confbridge_talking_type" />
100  <enum name="cel_generic_type" />
101  <enum name="ast_bridge_snapshot_type" />
102  <enum name="ast_bridge_merge_message_type" />
103  <enum name="ast_channel_entered_bridge_type" />
104  <enum name="ast_channel_left_bridge_type" />
105  <enum name="ast_blind_transfer_type" />
106  <enum name="ast_attended_transfer_type" />
107  <enum name="ast_endpoint_snapshot_type" />
108  <enum name="ast_endpoint_state_type" />
109  <enum name="ast_device_state_message_type" />
110  <enum name="ast_test_suite_message_type" />
111  <enum name="ast_mwi_state_type" />
112  <enum name="ast_mwi_vm_app_type" />
113  <enum name="ast_format_register_type" />
114  <enum name="ast_format_unregister_type" />
115  <enum name="ast_manager_get_generic_type" />
116  <enum name="ast_parked_call_type" />
117  <enum name="ast_channel_snapshot_type" />
118  <enum name="ast_channel_dial_type" />
119  <enum name="ast_channel_varset_type" />
120  <enum name="ast_channel_hangup_request_type" />
121  <enum name="ast_channel_dtmf_begin_type" />
122  <enum name="ast_channel_dtmf_end_type" />
123  <enum name="ast_channel_flash_type" />
124  <enum name="ast_channel_wink_type" />
125  <enum name="ast_channel_hold_type" />
126  <enum name="ast_channel_unhold_type" />
127  <enum name="ast_channel_chanspy_start_type" />
128  <enum name="ast_channel_chanspy_stop_type" />
129  <enum name="ast_channel_fax_type" />
130  <enum name="ast_channel_hangup_handler_type" />
131  <enum name="ast_channel_moh_start_type" />
132  <enum name="ast_channel_moh_stop_type" />
133  <enum name="ast_channel_mixmonitor_start_type" />
134  <enum name="ast_channel_mixmonitor_stop_type" />
135  <enum name="ast_channel_mixmonitor_mute_type" />
136  <enum name="ast_channel_agent_login_type" />
137  <enum name="ast_channel_agent_logoff_type" />
138  <enum name="ast_channel_talking_start" />
139  <enum name="ast_channel_talking_stop" />
140  <enum name="ast_security_event_type" />
141  <enum name="ast_named_acl_change_type" />
142  <enum name="ast_local_bridge_type" />
143  <enum name="ast_local_optimization_begin_type" />
144  <enum name="ast_local_optimization_end_type" />
145  <enum name="stasis_subscription_change_type" />
146  <enum name="ast_multi_user_event_type" />
147  <enum name="stasis_cache_clear_type" />
148  <enum name="stasis_cache_update_type" />
149  <enum name="ast_network_change_type" />
150  <enum name="ast_system_registry_type" />
151  <enum name="ast_cc_available_type" />
152  <enum name="ast_cc_offertimerstart_type" />
153  <enum name="ast_cc_requested_type" />
154  <enum name="ast_cc_requestacknowledged_type" />
155  <enum name="ast_cc_callerstopmonitoring_type" />
156  <enum name="ast_cc_callerstartmonitoring_type" />
157  <enum name="ast_cc_callerrecalling_type" />
158  <enum name="ast_cc_recallcomplete_type" />
159  <enum name="ast_cc_failure_type" />
160  <enum name="ast_cc_monitorfailed_type" />
161  <enum name="ast_presence_state_message_type" />
162  <enum name="ast_rtp_rtcp_sent_type" />
163  <enum name="ast_rtp_rtcp_received_type" />
164  <enum name="ast_call_pickup_type" />
165  <enum name="aoc_s_type" />
166  <enum name="aoc_d_type" />
167  <enum name="aoc_e_type" />
168  <enum name="dahdichannel_type" />
169  <enum name="mcid_type" />
170  <enum name="session_timeout_type" />
171  <enum name="cdr_read_message_type" />
172  <enum name="cdr_write_message_type" />
173  <enum name="cdr_prop_write_message_type" />
174  <enum name="corosync_ping_message_type" />
175  <enum name="agi_exec_start_type" />
176  <enum name="agi_exec_end_type" />
177  <enum name="agi_async_start_type" />
178  <enum name="agi_async_exec_type" />
179  <enum name="agi_async_end_type" />
180  <enum name="queue_caller_join_type" />
181  <enum name="queue_caller_leave_type" />
182  <enum name="queue_caller_abandon_type" />
183  <enum name="queue_member_status_type" />
184  <enum name="queue_member_added_type" />
185  <enum name="queue_member_removed_type" />
186  <enum name="queue_member_pause_type" />
187  <enum name="queue_member_penalty_type" />
188  <enum name="queue_member_ringinuse_type" />
189  <enum name="queue_agent_called_type" />
190  <enum name="queue_agent_connect_type" />
191  <enum name="queue_agent_complete_type" />
192  <enum name="queue_agent_dump_type" />
193  <enum name="queue_agent_ringnoanswer_type" />
194  <enum name="meetme_join_type" />
195  <enum name="meetme_leave_type" />
196  <enum name="meetme_end_type" />
197  <enum name="meetme_mute_type" />
198  <enum name="meetme_talking_type" />
199  <enum name="meetme_talk_request_type" />
200  <enum name="appcdr_message_type" />
201  <enum name="forkcdr_message_type" />
202  <enum name="cdr_sync_message_type" />
203  </enumlist>
204  </description>
205  </configOption>
206  </configObject>
207  </configFile>
208  </configInfo>
209 ***/
210 
211 /*!
212  * \page stasis-impl Stasis Implementation Notes
213  *
214  * \par Reference counting
215  *
216  * Stasis introduces a number of objects, which are tightly related to one
217  * another. Because we rely on ref-counting for memory management, understanding
218  * these relationships is important to understanding this code.
219  *
220  * \code{.txt}
221  *
222  * stasis_topic <----> stasis_subscription
223  * ^ ^
224  * \ /
225  * \ /
226  * dispatch
227  * |
228  * |
229  * v
230  * stasis_message
231  * |
232  * |
233  * v
234  * stasis_message_type
235  *
236  * \endcode
237  *
238  * The most troubling thing in this chart is the cyclic reference between
239  * stasis_topic and stasis_subscription. This is both unfortunate, and
240  * necessary. Topics need the subscription in order to dispatch messages;
241  * subscriptions need the topics to unsubscribe and check subscription status.
242  *
243  * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
244  * topic's reference to a subscription. When the subcription is destroyed, it
245  * will remove its reference to the topic.
246  *
247  * This means that until a subscription has be explicitly unsubscribed, it will
248  * not be destroyed. Neither will a topic be destroyed while it has subscribers.
249  * The destructors of both have assertions regarding this to catch ref-counting
250  * problems where a subscription or topic has had an extra ao2_cleanup().
251  *
252  * The \ref dispatch_exec_sync object is a transient object, which is posted to
253  * a subscription's taskprocessor to send a message to the subscriber. They have
254  * short life cycles, allocated on one thread, destroyed on another.
255  *
256  * During shutdown, or the deletion of a domain object, there are a flurry of
257  * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
258  * are processed. Any one of these cleanups could be the one to actually destroy
259  * a given object, so care must be taken to ensure that an object isn't
260  * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
261  * that might happen when a RAII_VAR() goes out of scope.
262  *
263  * \par Typical life cycles
264  *
265  * \li stasis_topic - There are several topics which live for the duration of
266  * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
267  * are actually fed by shorter-lived topics whose lifetime is associated
268  * with some domain object (like ast_channel_topic() for a given
269  * ast_channel).
270  *
271  * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
272  * topics, for similar reasons.
273  *
274  * \li dispatch - Very short lived; just long enough to post a message to a
275  * subscriber.
276  *
277  * \li stasis_message - Short to intermediate lifetimes, but that is mostly
278  * irrelevant. Messages are strictly data and have no behavior associated
279  * with them, so it doesn't really matter if/when they are destroyed. By
280  * design, a component could hold a ref to a message forever without any
281  * ill consequences (aside from consuming more memory).
282  *
283  * \li stasis_message_type - Long life cycles, typically only destroyed on
284  * module unloading or _clean_ process exit.
285  *
286  * \par Subscriber shutdown sequencing
287  *
288  * Subscribers are sensitive to shutdown sequencing, specifically in how the
289  * reference message types. This is fully detailed in the documentation at
290  * https://docs.asterisk.org/Development/Roadmap/Asterisk-12-Projects/Asterisk-12-API-Improvements/Stasis-Message-Bus/Using-the-Stasis-Message-Bus/Stasis-Subscriber-Shutdown-Problem/.
291  *
292  * In short, the lifetime of the \a data (and \a callback, if in a module) must
293  * be held until the stasis_subscription_final_message() has been received.
294  * Depending on the structure of the subscriber code, this can be handled by
295  * using stasis_subscription_final_message() to free resources on the final
296  * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
297  * block until the unsubscribe has completed.
298  */
299 
300 /*! Initial size of the subscribers list. */
301 #define INITIAL_SUBSCRIBERS_MAX 4
302 
303 /*! The number of buckets to use for topic pools */
304 #define TOPIC_POOL_BUCKETS 57
305 
306 /*! Thread pool for topics that don't want a dedicated taskprocessor */
307 static struct ast_threadpool *threadpool;
308 
310 
311 #if defined(LOW_MEMORY)
312 
313 #define TOPIC_ALL_BUCKETS 257
314 
315 #else
316 
317 #define TOPIC_ALL_BUCKETS 997
318 
319 #endif
320 
321 #ifdef AST_DEVMODE
322 
323 /*! The number of buckets to use for topic statistics */
324 #define TOPIC_STATISTICS_BUCKETS 57
325 
326 /*! The number of buckets to use for subscription statistics */
327 #define SUBSCRIPTION_STATISTICS_BUCKETS 57
328 
329 /*! Global container which stores statistics for topics */
330 static AO2_GLOBAL_OBJ_STATIC(topic_statistics);
331 
332 /*! Global container which stores statistics for subscriptions */
333 static AO2_GLOBAL_OBJ_STATIC(subscription_statistics);
334 
335 /*! \internal */
336 struct stasis_message_type_statistics {
337  /*! \brief The number of messages of this published */
338  int published;
339  /*! \brief The number of messages of this that did not reach a subscriber */
340  int unused;
341  /*! \brief The stasis message type */
342  struct stasis_message_type *message_type;
343 };
344 
345 /*! Lock to protect the message types vector */
346 AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
347 
348 /*! Vector containing message type information */
349 static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
350 
351 /*! \internal */
352 struct stasis_topic_statistics {
353  /*! \brief Highest time spent dispatching messages to subscribers */
354  long highest_time_dispatched;
355  /*! \brief Lowest time spent dispatching messages to subscribers */
356  long lowest_time_dispatched;
357  /*! \brief The number of messages that were not dispatched to any subscriber */
358  int messages_not_dispatched;
359  /*! \brief The number of messages that were dispatched to at least 1 subscriber */
360  int messages_dispatched;
361  /*! \brief The ids of the subscribers to this topic */
362  struct ao2_container *subscribers;
363  /*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
364  struct stasis_topic *topic;
365  /*! \brief Name of the topic */
366  char name[0];
367 };
368 #endif
369 
370 /*! \internal */
371 struct stasis_topic {
372  /*! Variable length array of the subscribers */
374 
375  /*! Topics forwarding into this topic */
377 
378 #ifdef AST_DEVMODE
379  struct stasis_topic_statistics *statistics;
380 #endif
381 
382  /*! Unique incrementing integer for subscriber ids */
384 
385  /*! Name of the topic */
386  char *name;
387 
388  /*! Detail of the topic */
389  char *detail;
390 
391  /*! Creation time */
392  struct timeval *creationtime;
393 };
394 
395 struct ao2_container *topic_all;
396 
397 struct topic_proxy {
398  AO2_WEAKPROXY();
399 
400  char *name;
401  char *detail;
402 
403  struct timeval creationtime;
404 
405  char buf[0];
406 };
407 
410 AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name);
411 
412 static void proxy_dtor(void *weakproxy, void *container)
413 {
414  ao2_unlink(container, weakproxy);
415  ao2_cleanup(container);
416 }
417 
418 /* Forward declarations for the tightly-coupled subscription object */
419 static int topic_add_subscription(struct stasis_topic *topic,
420  struct stasis_subscription *sub);
421 
422 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
423 
424 /*! \brief Lock two topics. */
425 #define topic_lock_both(topic1, topic2) \
426  do { \
427  ao2_lock(topic1); \
428  while (ao2_trylock(topic2)) { \
429  AO2_DEADLOCK_AVOIDANCE(topic1); \
430  } \
431  } while (0)
432 
433 static void topic_dtor(void *obj)
434 {
435  struct stasis_topic *topic = obj;
436 #ifdef AST_DEVMODE
437  struct ao2_container *topic_stats;
438 #endif
439 
440  ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
441  topic->name, topic->detail);
442 
443  /* Subscribers hold a reference to topics, so they should all be
444  * unsubscribed before we get here. */
445  ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
446 
447  AST_VECTOR_FREE(&topic->subscribers);
449  ast_debug(1, "Topic '%s': %p destroyed\n", topic->name, topic);
450 
451 #ifdef AST_DEVMODE
452  if (topic->statistics) {
453  topic_stats = ao2_global_obj_ref(topic_statistics);
454  if (topic_stats) {
455  ao2_unlink(topic_stats, topic->statistics);
456  ao2_ref(topic_stats, -1);
457  }
458  ao2_ref(topic->statistics, -1);
459  }
460 #endif
461 }
462 
463 #ifdef AST_DEVMODE
464 static void topic_statistics_destroy(void *obj)
465 {
466  struct stasis_topic_statistics *statistics = obj;
467 
468  ao2_cleanup(statistics->subscribers);
469 }
470 
471 static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
472 {
473  struct stasis_topic_statistics *statistics;
474  RAII_VAR(struct ao2_container *, topic_stats, ao2_global_obj_ref(topic_statistics), ao2_cleanup);
475 
476  if (!topic_stats) {
477  return NULL;
478  }
479 
480  statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
481  if (!statistics) {
482  return NULL;
483  }
484 
485  statistics->subscribers = ast_str_container_alloc(1);
486  if (!statistics->subscribers) {
487  ao2_ref(statistics, -1);
488  return NULL;
489  }
490 
491  /* This is strictly used for the pointer address when showing the topic */
492  statistics->topic = topic;
493  strcpy(statistics->name, topic->name); /* SAFE */
494  ao2_link(topic_stats, statistics);
495 
496  return statistics;
497 }
498 #endif
499 
500 static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
501 {
502  struct topic_proxy *proxy;
503  struct stasis_topic* topic_tmp;
504  size_t detail_len;
505 
506  if (!topic || !name || !strlen(name) || !detail) {
507  return -1;
508  }
509 
510  ao2_wrlock(topic_all);
511 
512  topic_tmp = stasis_topic_get(name);
513  if (topic_tmp) {
514  ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
515  ao2_ref(topic_tmp, -1);
516  ao2_unlock(topic_all);
517 
518  return -1;
519  }
520 
521  detail_len = strlen(detail) + 1;
522 
523  proxy = ao2_t_weakproxy_alloc(
524  sizeof(*proxy) + strlen(name) + 1 + detail_len, NULL, name);
525  if (!proxy) {
526  ao2_unlock(topic_all);
527 
528  return -1;
529  }
530 
531  /* set the proxy info */
532  proxy->name = proxy->buf;
533  proxy->detail = proxy->name + strlen(name) + 1;
534 
535  strcpy(proxy->name, name); /* SAFE */
536  ast_copy_string(proxy->detail, detail, detail_len); /* SAFE */
537  proxy->creationtime = ast_tvnow();
538 
539  /* We have exclusive access to proxy, no need for locking here. */
540  if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
541  ao2_cleanup(proxy);
542  ao2_unlock(topic_all);
543 
544  return -1;
545  }
546 
547  if (ao2_weakproxy_subscribe(proxy, proxy_dtor, ao2_bump(topic_all), OBJ_NOLOCK)) {
548  ao2_cleanup(proxy);
549  ao2_unlock(topic_all);
550  ao2_cleanup(topic_all);
551 
552  return -1;
553  }
554 
555  /* setting the topic point to the proxy */
556  topic->name = proxy->name;
557  topic->detail = proxy->detail;
558  topic->creationtime = &(proxy->creationtime);
559 
560  ao2_link_flags(topic_all, proxy, OBJ_NOLOCK);
561  ao2_ref(proxy, -1);
562 
563  ao2_unlock(topic_all);
564 
565  return 0;
566 }
567 
569  const char *name, const char* detail
570  )
571 {
572  struct stasis_topic *topic;
573  int res = 0;
574 
575  if (!name|| !strlen(name) || !detail) {
576  return NULL;
577  }
578  ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
579 
580  topic = stasis_topic_get(name);
581  if (topic) {
582  ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
583  name, detail);
584  return topic;
585  }
586 
587  topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
588  if (!topic) {
589  return NULL;
590  }
591 
593  res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
594  if (res) {
595  ao2_ref(topic, -1);
596  return NULL;
597  }
598 
599  /* link to the proxy */
600  if (link_topic_proxy(topic, name, detail)) {
601  ao2_ref(topic, -1);
602  return NULL;
603  }
604 
605 #ifdef AST_DEVMODE
606  topic->statistics = stasis_topic_statistics_create(topic);
607  if (!topic->statistics) {
608  ao2_ref(topic, -1);
609  return NULL;
610  }
611 #endif
612  ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
613 
614  return topic;
615 }
616 
617 struct stasis_topic *stasis_topic_create(const char *name)
618 {
619  return stasis_topic_create_with_detail(name, "");
620 }
621 
622 struct stasis_topic *stasis_topic_get(const char *name)
623 {
624  return ao2_weakproxy_find(topic_all, name, OBJ_SEARCH_KEY, "");
625 }
626 
627 const char *stasis_topic_name(const struct stasis_topic *topic)
628 {
629  if (!topic) {
630  return NULL;
631  }
632  return topic->name;
633 }
634 
635 const char *stasis_topic_detail(const struct stasis_topic *topic)
636 {
637  if (!topic) {
638  return NULL;
639  }
640  return topic->detail;
641 }
642 
643 size_t stasis_topic_subscribers(const struct stasis_topic *topic)
644 {
645  return AST_VECTOR_SIZE(&topic->subscribers);
646 }
647 
648 #ifdef AST_DEVMODE
649 struct stasis_subscription_statistics {
650  /*! \brief The filename where the subscription originates */
651  const char *file;
652  /*! \brief The function where the subscription originates */
653  const char *func;
654  /*! \brief Names of the topics we are subscribed to */
655  struct ao2_container *topics;
656  /*! \brief The message type that currently took the longest to process */
657  struct stasis_message_type *highest_time_message_type;
658  /*! \brief Highest time spent invoking a message */
659  long highest_time_invoked;
660  /*! \brief Lowest time spent invoking a message */
661  long lowest_time_invoked;
662  /*! \brief The number of messages that were filtered out */
663  int messages_dropped;
664  /*! \brief The number of messages that passed filtering */
665  int messages_passed;
666  /*! \brief Using a mailbox to queue messages */
667  int uses_mailbox;
668  /*! \brief Using stasis threadpool for handling messages */
669  int uses_threadpool;
670  /*! \brief The line number where the subscription originates */
671  int lineno;
672  /*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
673  struct stasis_subscription *sub;
674  /*! \brief Unique ID of the subscription */
675  char uniqueid[0];
676 };
677 #endif
678 
679 /*! \internal */
681  /*! Unique ID for this subscription */
682  char *uniqueid;
683  /*! Topic subscribed to. */
685  /*! Mailbox for processing incoming messages. */
687  /*! Callback function for incoming message processing. */
689  /*! Data pointer to be handed to the callback. */
690  void *data;
691 
692  /*! Condition for joining with subscription. */
693  ast_cond_t join_cond;
694  /*! Flag set when final message for sub has been received.
695  * Be sure join_lock is held before reading/setting. */
697  /*! Flag set when final message for sub has been processed.
698  * Be sure join_lock is held before reading/setting. */
700 
701  /*! The message types this subscription is accepting */
703  /*! The message formatters this subscription is accepting */
705  /*! The message filter currently in use */
707 
708 #ifdef AST_DEVMODE
709  /*! Statistics information */
710  struct stasis_subscription_statistics *statistics;
711 #endif
712 };
713 
714 static void subscription_dtor(void *obj)
715 {
716  struct stasis_subscription *sub = obj;
717 #ifdef AST_DEVMODE
718  struct ao2_container *subscription_stats;
719 #endif
720 
721  /* Subscriptions need to be manually unsubscribed before destruction
722  * b/c there's a cyclic reference between topics and subscriptions */
723  ast_assert(!stasis_subscription_is_subscribed(sub));
724  /* If there are any messages in flight to this subscription; that would
725  * be bad. */
726  ast_assert(stasis_subscription_is_done(sub));
727 
728  ast_free(sub->uniqueid);
729  ao2_cleanup(sub->topic);
730  sub->topic = NULL;
732  sub->mailbox = NULL;
733  ast_cond_destroy(&sub->join_cond);
734 
736 
737 #ifdef AST_DEVMODE
738  if (sub->statistics) {
739  subscription_stats = ao2_global_obj_ref(subscription_statistics);
740  if (subscription_stats) {
741  ao2_unlink(subscription_stats, sub->statistics);
742  ao2_ref(subscription_stats, -1);
743  }
744  ao2_ref(sub->statistics, -1);
745  }
746 #endif
747 }
748 
749 /*!
750  * \brief Invoke the subscription's callback.
751  * \param sub Subscription to invoke.
752  * \param message Message to send.
753  */
754 static void subscription_invoke(struct stasis_subscription *sub,
755  struct stasis_message *message)
756 {
757  unsigned int final = stasis_subscription_final_message(sub, message);
759 #ifdef AST_DEVMODE
760  struct timeval start;
761  long elapsed;
762 
763  start = ast_tvnow();
764 #endif
765 
766  /* Notify that the final message has been received */
767  if (final) {
768  ao2_lock(sub);
769  sub->final_message_rxed = 1;
770  ast_cond_signal(&sub->join_cond);
771  ao2_unlock(sub);
772  }
773 
774  /*
775  * If filtering is turned on and this is a 'final' message, we only invoke the callback
776  * if the subscriber accepts subscription_change message types.
777  */
778  if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
779  (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
780  /* Since sub is mostly immutable, no need to lock sub */
781  sub->callback(sub->data, sub, message);
782  }
783 
784  /* Notify that the final message has been processed */
785  if (final) {
786  ao2_lock(sub);
787  sub->final_message_processed = 1;
788  ast_cond_signal(&sub->join_cond);
789  ao2_unlock(sub);
790  }
791 
792 #ifdef AST_DEVMODE
793  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
794  if (elapsed > sub->statistics->highest_time_invoked) {
795  sub->statistics->highest_time_invoked = elapsed;
796  ao2_lock(sub->statistics);
797  sub->statistics->highest_time_message_type = stasis_message_type(message);
798  ao2_unlock(sub->statistics);
799  }
800  if (elapsed < sub->statistics->lowest_time_invoked) {
801  sub->statistics->lowest_time_invoked = elapsed;
802  }
803 #endif
804 }
805 
806 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
807 static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
808 
810 {
811 }
812 
813 #ifdef AST_DEVMODE
814 static void subscription_statistics_destroy(void *obj)
815 {
816  struct stasis_subscription_statistics *statistics = obj;
817 
818  ao2_cleanup(statistics->topics);
819 }
820 
821 static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
822  int needs_mailbox, int use_thread_pool, const char *file, int lineno,
823  const char *func)
824 {
825  struct stasis_subscription_statistics *statistics;
826  RAII_VAR(struct ao2_container *, subscription_stats, ao2_global_obj_ref(subscription_statistics), ao2_cleanup);
827 
828  if (!subscription_stats) {
829  return NULL;
830  }
831 
832  statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
833  if (!statistics) {
834  return NULL;
835  }
836 
837  statistics->topics = ast_str_container_alloc(1);
838  if (!statistics->topics) {
839  ao2_ref(statistics, -1);
840  return NULL;
841  }
842 
843  statistics->file = file;
844  statistics->lineno = lineno;
845  statistics->func = func;
846  statistics->uses_mailbox = needs_mailbox;
847  statistics->uses_threadpool = use_thread_pool;
848  strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
849  statistics->sub = sub;
850  ao2_link(subscription_stats, statistics);
851 
852  return statistics;
853 }
854 #endif
855 
857  struct stasis_topic *topic,
858  stasis_subscription_cb callback,
859  void *data,
860  int needs_mailbox,
861  int use_thread_pool,
862  const char *file,
863  int lineno,
864  const char *func)
865 {
866  struct stasis_subscription *sub;
867  int ret;
868 
869  if (!topic) {
870  return NULL;
871  }
872 
873  /* The ao2 lock is used for join_cond. */
874  sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
875  if (!sub) {
876  return NULL;
877  }
878 
879 #ifdef AST_DEVMODE
880  ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
881  sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
882  if (ret < 0 || !sub->statistics) {
883  ao2_ref(sub, -1);
884  return NULL;
885  }
886 #else
887  ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
888  if (ret < 0) {
889  ao2_ref(sub, -1);
890  return NULL;
891  }
892 #endif
893 
894  if (needs_mailbox) {
895  char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
896 
897  /* Create name with seq number appended. */
898  ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
899  use_thread_pool ? 'p' : 'm',
900  stasis_topic_name(topic));
901 
902  /*
903  * With a small number of subscribers, a thread-per-sub is
904  * acceptable. For a large number of subscribers, a thread
905  * pool should be used.
906  */
907  if (use_thread_pool) {
908  sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
909  } else {
911  }
912  if (!sub->mailbox) {
913  ao2_ref(sub, -1);
914 
915  return NULL;
916  }
918  /* Taskprocessor has a reference */
919  ao2_ref(sub, +1);
920  }
921 
922  ao2_ref(topic, +1);
923  sub->topic = topic;
924  sub->callback = callback;
925  sub->data = data;
926  ast_cond_init(&sub->join_cond, NULL);
929  sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
930 
931  if (topic_add_subscription(topic, sub) != 0) {
932  ao2_ref(sub, -1);
933  ao2_ref(topic, -1);
934 
935  return NULL;
936  }
937  send_subscription_subscribe(topic, sub);
938 
939  return sub;
940 }
941 
943  struct stasis_topic *topic,
944  stasis_subscription_cb callback,
945  void *data,
946  const char *file,
947  int lineno,
948  const char *func)
949 {
950  return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
951 }
952 
954  struct stasis_topic *topic,
955  stasis_subscription_cb callback,
956  void *data,
957  const char *file,
958  int lineno,
959  const char *func)
960 {
961  return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
962 }
963 
964 static int sub_cleanup(void *data)
965 {
966  struct stasis_subscription *sub = data;
967  ao2_cleanup(sub);
968  return 0;
969 }
970 
972 {
973  /* The subscription may be the last ref to this topic. Hold
974  * the topic ref open until after the unlock. */
975  struct stasis_topic *topic;
976 
977  if (!sub) {
978  return NULL;
979  }
980 
981  topic = ao2_bump(sub->topic);
982 
983  /* We have to remove the subscription first, to ensure the unsubscribe
984  * is the final message */
985  if (topic_remove_subscription(sub->topic, sub) != 0) {
986  ast_log(LOG_ERROR,
987  "Internal error: subscription has invalid topic\n");
988  ao2_cleanup(topic);
989 
990  return NULL;
991  }
992 
993  /* Now let everyone know about the unsubscribe */
994  send_subscription_unsubscribe(topic, sub);
995 
996  /* When all that's done, remove the ref the mailbox has on the sub */
997  if (sub->mailbox) {
998  if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
999  /* Nothing we can do here, the conditional is just to keep
1000  * the compiler happy that we're not ignoring the result. */
1001  }
1002  }
1003 
1004  /* Unsubscribing unrefs the subscription */
1005  ao2_cleanup(sub);
1006  ao2_cleanup(topic);
1007 
1008  return NULL;
1009 }
1010 
1012  long low_water, long high_water)
1013 {
1014  int res = -1;
1015 
1016  if (subscription) {
1017  res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1018  low_water, high_water);
1019  }
1020  return res;
1021 }
1022 
1024  const struct stasis_message_type *type)
1025 {
1026  if (!subscription) {
1027  return -1;
1028  }
1029 
1030  ast_assert(type != NULL);
1031  ast_assert(stasis_message_type_name(type) != NULL);
1032 
1033  if (!type || !stasis_message_type_name(type)) {
1034  /* Filtering is unreliable as this message type is not yet initialized
1035  * so force all messages through.
1036  */
1038  return 0;
1039  }
1040 
1041  ao2_lock(subscription->topic);
1042  if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
1043  /* We do this for the same reason as above. The subscription can still operate, so allow
1044  * it to do so by forcing all messages through.
1045  */
1047  }
1048  ao2_unlock(subscription->topic);
1049 
1050  return 0;
1051 }
1052 
1054  const struct stasis_message_type *type)
1055 {
1056  if (!subscription) {
1057  return -1;
1058  }
1059 
1060  ast_assert(type != NULL);
1061  ast_assert(stasis_message_type_name(type) != NULL);
1062 
1063  if (!type || !stasis_message_type_name(type)) {
1064  return 0;
1065  }
1066 
1067  ao2_lock(subscription->topic);
1068  if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
1069  /* The memory is already allocated so this can't fail */
1071  }
1072  ao2_unlock(subscription->topic);
1073 
1074  return 0;
1075 }
1076 
1079 {
1080  if (!subscription) {
1081  return -1;
1082  }
1083 
1084  ao2_lock(subscription->topic);
1085  if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1086  subscription->filter = filter;
1087  }
1088  ao2_unlock(subscription->topic);
1089 
1090  return 0;
1091 }
1092 
1095 {
1096  ast_assert(subscription != NULL);
1097 
1098  ao2_lock(subscription->topic);
1099  subscription->accepted_formatters = formatters;
1100  ao2_unlock(subscription->topic);
1101 
1102  return;
1103 }
1104 
1106 {
1107  if (subscription) {
1108  ao2_lock(subscription);
1109  /* Wait until the processed flag has been set */
1110  while (!subscription->final_message_processed) {
1111  ast_cond_wait(&subscription->join_cond,
1112  ao2_object_get_lockaddr(subscription));
1113  }
1114  ao2_unlock(subscription);
1115  }
1116 }
1117 
1119 {
1120  if (subscription) {
1121  int ret;
1122 
1123  ao2_lock(subscription);
1124  ret = subscription->final_message_rxed;
1125  ao2_unlock(subscription);
1126 
1127  return ret;
1128  }
1129 
1130  /* Null subscription is about as done as you can get */
1131  return 1;
1132 }
1133 
1135  struct stasis_subscription *subscription)
1136 {
1137  if (!subscription) {
1138  return NULL;
1139  }
1140 
1141  /* Bump refcount to hold it past the unsubscribe */
1142  ao2_ref(subscription, +1);
1143  stasis_unsubscribe(subscription);
1144  stasis_subscription_join(subscription);
1145  /* Now decrement the refcount back */
1146  ao2_cleanup(subscription);
1147  return NULL;
1148 }
1149 
1151 {
1152  if (sub) {
1153  size_t i;
1154  struct stasis_topic *topic = sub->topic;
1155 
1156  ao2_lock(topic);
1157  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1158  if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1159  ao2_unlock(topic);
1160  return 1;
1161  }
1162  }
1163  ao2_unlock(topic);
1164  }
1165 
1166  return 0;
1167 }
1168 
1170 {
1171  return sub->uniqueid;
1172 }
1173 
1175 {
1176  struct stasis_subscription_change *change;
1177 
1179  return 0;
1180  }
1181 
1182  change = stasis_message_data(msg);
1183  if (strcmp("Unsubscribe", change->description)) {
1184  return 0;
1185  }
1186 
1187  if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1188  return 0;
1189  }
1190 
1191  return 1;
1192 }
1193 
1194 /*!
1195  * \brief Add a subscriber to a topic.
1196  * \param topic Topic
1197  * \param sub Subscriber
1198  * \return 0 on success
1199  * \return Non-zero on error
1200  */
1201 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
1202 {
1203  size_t idx;
1204 
1205  ao2_lock(topic);
1206  /* The reference from the topic to the subscription is shared with
1207  * the owner of the subscription, which will explicitly unsubscribe
1208  * to release it.
1209  *
1210  * If we bumped the refcount here, the owner would have to unsubscribe
1211  * and cleanup, which is a bit awkward. */
1212  AST_VECTOR_APPEND(&topic->subscribers, sub);
1213 
1214  for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1216  AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1217  }
1218 
1219 #ifdef AST_DEVMODE
1221  ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1222 #endif
1223 
1224  ao2_unlock(topic);
1225 
1226  return 0;
1227 }
1228 
1229 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
1230 {
1231  size_t idx;
1232  int res;
1233 
1234  ao2_lock(topic);
1235  for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1236  topic_remove_subscription(
1237  AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1238  }
1239  res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
1241 
1242 #ifdef AST_DEVMODE
1243  if (!res) {
1245  ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
1246  }
1247 #endif
1248 
1249  ao2_unlock(topic);
1250 
1251  return res;
1252 }
1253 
1254 /*!
1255  * \internal \brief Dispatch a message to a subscriber asynchronously
1256  * \param local \ref ast_taskprocessor_local object
1257  * \return 0
1258  */
1259 static int dispatch_exec_async(struct ast_taskprocessor_local *local)
1260 {
1261  struct stasis_subscription *sub = local->local_data;
1262  struct stasis_message *message = local->data;
1263 
1264  subscription_invoke(sub, message);
1265  ao2_cleanup(message);
1266 
1267  return 0;
1268 }
1269 
1270 /*!
1271  * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
1272  * a published message to a subscriber
1273  */
1275  ast_mutex_t lock;
1276  ast_cond_t cond;
1277  int complete;
1278  void *task_data;
1279 };
1280 
1281 /*!
1282  * \internal \brief Dispatch a message to a subscriber synchronously
1283  * \param local \ref ast_taskprocessor_local object
1284  * \return 0
1285  */
1286 static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
1287 {
1288  struct stasis_subscription *sub = local->local_data;
1289  struct sync_task_data *std = local->data;
1290  struct stasis_message *message = std->task_data;
1291 
1292  subscription_invoke(sub, message);
1293  ao2_cleanup(message);
1294 
1295  ast_mutex_lock(&std->lock);
1296  std->complete = 1;
1297  ast_cond_signal(&std->cond);
1298  ast_mutex_unlock(&std->lock);
1299 
1300  return 0;
1301 }
1302 
1303 /*!
1304  * \internal \brief Dispatch a message to a subscriber
1305  * \param sub The subscriber to dispatch to
1306  * \param message The message to send
1307  * \param synchronous If non-zero, synchronize on the subscriber receiving
1308  * the message
1309  * \retval 0 if message was not dispatched
1310  * \retval 1 if message was dispatched
1311  */
1312 static unsigned int dispatch_message(struct stasis_subscription *sub,
1313  struct stasis_message *message,
1314  int synchronous)
1315 {
1316  int is_final = stasis_subscription_final_message(sub, message);
1317 
1318  /*
1319  * The 'do while' gives us an easy way to skip remaining logic once
1320  * we determine the message should be accepted.
1321  * The code looks more verbose than it needs to be but it optimizes
1322  * down very nicely. It's just easier to understand and debug this way.
1323  */
1324  do {
1325  struct stasis_message_type *message_type = stasis_message_type(message);
1326  int type_id = stasis_message_type_id(message_type);
1327  int type_filter_specified = 0;
1328  int formatter_filter_specified = 0;
1329  int type_filter_passed = 0;
1330  int formatter_filter_passed = 0;
1331 
1332  /* We always accept final messages so only run the filter logic if not final */
1333  if (is_final) {
1334  break;
1335  }
1336 
1337  type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
1338  formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
1339 
1340  /* Accept if no filters of either type were specified */
1341  if (!type_filter_specified && !formatter_filter_specified) {
1342  break;
1343  }
1344 
1345  type_filter_passed = type_filter_specified
1346  && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
1347  && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
1348 
1349  /*
1350  * Since the type and formatter filters are OR'd, we can skip
1351  * the formatter check if the type check passes.
1352  */
1353  if (type_filter_passed) {
1354  break;
1355  }
1356 
1357  formatter_filter_passed = formatter_filter_specified
1359 
1360  if (formatter_filter_passed) {
1361  break;
1362  }
1363 
1364 #ifdef AST_DEVMODE
1365  ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
1366 #endif
1367 
1368  return 0;
1369 
1370  } while (0);
1371 
1372 #ifdef AST_DEVMODE
1373  ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
1374 #endif
1375 
1376  if (!sub->mailbox) {
1377  /* Dispatch directly */
1378  subscription_invoke(sub, message);
1379  return 1;
1380  }
1381 
1382  /* Bump the message for the taskprocessor push. This will get de-ref'd
1383  * by the task processor callback.
1384  */
1385  ao2_bump(message);
1386  if (!synchronous) {
1387  if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
1388  /* Push failed; ugh. */
1389  ast_log(LOG_ERROR, "Dropping async dispatch\n");
1390  ao2_cleanup(message);
1391  return 0;
1392  }
1393  } else {
1394  struct sync_task_data std;
1395 
1396  ast_mutex_init(&std.lock);
1397  ast_cond_init(&std.cond, NULL);
1398  std.complete = 0;
1399  std.task_data = message;
1400 
1401  if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
1402  /* Push failed; ugh. */
1403  ast_log(LOG_ERROR, "Dropping sync dispatch\n");
1404  ao2_cleanup(message);
1405  ast_mutex_destroy(&std.lock);
1406  ast_cond_destroy(&std.cond);
1407  return 0;
1408  }
1409 
1410  ast_mutex_lock(&std.lock);
1411  while (!std.complete) {
1412  ast_cond_wait(&std.cond, &std.lock);
1413  }
1414  ast_mutex_unlock(&std.lock);
1415 
1416  ast_mutex_destroy(&std.lock);
1417  ast_cond_destroy(&std.cond);
1418  }
1419 
1420  return 1;
1421 }
1422 
1423 /*!
1424  * \internal \brief Publish a message to a topic's subscribers
1425  * \brief topic The topic to publish to
1426  * \brief message The message to publish
1427  * \brief sync_sub An optional subscriber of the topic to publish synchronously
1428  * to
1429  */
1430 static void publish_msg(struct stasis_topic *topic,
1431  struct stasis_message *message, struct stasis_subscription *sync_sub)
1432 {
1433  size_t i;
1434 #ifdef AST_DEVMODE
1435  unsigned int dispatched = 0;
1436  int message_type_id = stasis_message_type_id(stasis_message_type(message));
1437  struct stasis_message_type_statistics *statistics;
1438  struct timeval start;
1439  long elapsed;
1440 #endif
1441 
1442  ast_assert(topic != NULL);
1443  ast_assert(message != NULL);
1444 
1445 #ifdef AST_DEVMODE
1446  ast_mutex_lock(&message_type_statistics_lock);
1447  if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
1448  struct stasis_message_type_statistics new_statistics = {
1449  .published = 0,
1450  };
1451  if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
1452  ast_mutex_unlock(&message_type_statistics_lock);
1453  return;
1454  }
1455  }
1456  statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
1457  statistics->message_type = stasis_message_type(message);
1458  ast_mutex_unlock(&message_type_statistics_lock);
1459 
1460  ast_atomic_fetchadd_int(&statistics->published, +1);
1461 #endif
1462 
1463  /* If there are no subscribers don't bother */
1464  if (!stasis_topic_subscribers(topic)) {
1465 #ifdef AST_DEVMODE
1466  ast_atomic_fetchadd_int(&statistics->unused, +1);
1467  ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1468 #endif
1469  return;
1470  }
1471 
1472  /*
1473  * The topic may be unref'ed by the subscription invocation.
1474  * Make sure we hold onto a reference while dispatching.
1475  */
1476  ao2_ref(topic, +1);
1477 #ifdef AST_DEVMODE
1478  start = ast_tvnow();
1479 #endif
1480  ao2_lock(topic);
1481  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1482  struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
1483 
1484  ast_assert(sub != NULL);
1485 #ifdef AST_DEVMODE
1486  dispatched +=
1487 #endif
1488  dispatch_message(sub, message, (sub == sync_sub));
1489  }
1490  ao2_unlock(topic);
1491 
1492 #ifdef AST_DEVMODE
1493  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
1494  if (elapsed > topic->statistics->highest_time_dispatched) {
1495  topic->statistics->highest_time_dispatched = elapsed;
1496  }
1497  if (elapsed < topic->statistics->lowest_time_dispatched) {
1498  topic->statistics->lowest_time_dispatched = elapsed;
1499  }
1500  if (dispatched) {
1501  ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
1502  } else {
1503  ast_atomic_fetchadd_int(&statistics->unused, +1);
1504  ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
1505  }
1506 #endif
1507 
1508  ao2_ref(topic, -1);
1509 }
1510 
1511 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
1512 {
1513  publish_msg(topic, message, NULL);
1514 }
1515 
1516 void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
1517 {
1518  ast_assert(sub != NULL);
1519 
1520  publish_msg(sub->topic, message, sub);
1521 }
1522 
1523 /*!
1524  * \brief Forwarding information
1525  *
1526  * Any message posted to \a from_topic is forwarded to \a to_topic.
1527  *
1528  * In cases where both the \a from_topic and \a to_topic need to be locked,
1529  * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
1530  */
1532  /*! Originating topic */
1534  /*! Destination topic */
1536 };
1537 
1538 static void forward_dtor(void *obj)
1539 {
1540  struct stasis_forward *forward = obj;
1541 
1542  ao2_cleanup(forward->from_topic);
1543  forward->from_topic = NULL;
1544  ao2_cleanup(forward->to_topic);
1545  forward->to_topic = NULL;
1546 }
1547 
1548 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
1549 {
1550  int idx;
1551  struct stasis_topic *from;
1552  struct stasis_topic *to;
1553 
1554  if (!forward) {
1555  return NULL;
1556  }
1557 
1558  from = forward->from_topic;
1559  to = forward->to_topic;
1560 
1561  if (from && to) {
1562  topic_lock_both(to, from);
1565 
1566  for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
1567  topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
1568  }
1569  ao2_unlock(from);
1570  ao2_unlock(to);
1571  }
1572 
1573  ao2_cleanup(forward);
1574 
1575  return NULL;
1576 }
1577 
1579  struct stasis_topic *to_topic)
1580 {
1581  int res;
1582  size_t idx;
1583  struct stasis_forward *forward;
1584 
1585  if (!from_topic || !to_topic) {
1586  return NULL;
1587  }
1588 
1589  forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1590  if (!forward) {
1591  return NULL;
1592  }
1593 
1594  /* Forwards to ourselves are implicit. */
1595  if (to_topic == from_topic) {
1596  return forward;
1597  }
1598 
1599  forward->from_topic = ao2_bump(from_topic);
1600  forward->to_topic = ao2_bump(to_topic);
1601 
1602  topic_lock_both(to_topic, from_topic);
1603  res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
1604  if (res != 0) {
1605  ao2_unlock(from_topic);
1606  ao2_unlock(to_topic);
1607  ao2_ref(forward, -1);
1608  return NULL;
1609  }
1610 
1611  for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1612  topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
1613  }
1614  ao2_unlock(from_topic);
1615  ao2_unlock(to_topic);
1616 
1617  return forward;
1618 }
1619 
1620 static void subscription_change_dtor(void *obj)
1621 {
1622  struct stasis_subscription_change *change = obj;
1623 
1624  ao2_cleanup(change->topic);
1625 }
1626 
1627 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
1628 {
1629  size_t description_len = strlen(description) + 1;
1630  size_t uniqueid_len = strlen(uniqueid) + 1;
1631  struct stasis_subscription_change *change;
1632 
1633  change = ao2_alloc_options(sizeof(*change) + description_len + uniqueid_len,
1634  subscription_change_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1635  if (!change) {
1636  return NULL;
1637  }
1638 
1639  strcpy(change->description, description); /* SAFE */
1640  change->uniqueid = change->description + description_len;
1641  ast_copy_string(change->uniqueid, uniqueid, uniqueid_len); /* SAFE */
1642  ao2_ref(topic, +1);
1643  change->topic = topic;
1644 
1645  return change;
1646 }
1647 
1648 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
1649 {
1650  struct stasis_subscription_change *change;
1651  struct stasis_message *msg;
1652 
1653  /* This assumes that we have already unsubscribed */
1654  ast_assert(stasis_subscription_is_subscribed(sub));
1655 
1657  return;
1658  }
1659 
1660  change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
1661  if (!change) {
1662  return;
1663  }
1664 
1666  if (!msg) {
1667  ao2_cleanup(change);
1668  return;
1669  }
1670 
1671  stasis_publish(topic, msg);
1672  ao2_cleanup(msg);
1673  ao2_cleanup(change);
1674 }
1675 
1676 static void send_subscription_unsubscribe(struct stasis_topic *topic,
1677  struct stasis_subscription *sub)
1678 {
1679  struct stasis_subscription_change *change;
1680  struct stasis_message *msg;
1681 
1682  /* This assumes that we have already unsubscribed */
1683  ast_assert(!stasis_subscription_is_subscribed(sub));
1684 
1686  return;
1687  }
1688 
1689  change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
1690  if (!change) {
1691  return;
1692  }
1693 
1695  if (!msg) {
1696  ao2_cleanup(change);
1697  return;
1698  }
1699 
1700  stasis_publish(topic, msg);
1701 
1702  /* Now we have to dispatch to the subscription itself */
1703  dispatch_message(sub, msg, 0);
1704 
1705  ao2_cleanup(msg);
1706  ao2_cleanup(change);
1707 }
1708 
1710  struct stasis_forward *forward;
1711  struct stasis_topic *topic;
1712  char name[0];
1713 };
1714 
1715 static void topic_pool_entry_dtor(void *obj)
1716 {
1717  struct topic_pool_entry *entry = obj;
1718 
1719  entry->forward = stasis_forward_cancel(entry->forward);
1720  ao2_cleanup(entry->topic);
1721  entry->topic = NULL;
1722 }
1723 
1724 static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
1725 {
1727 
1728  topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
1729  topic_pool_entry_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1730  if (!topic_pool_entry) {
1731  return NULL;
1732  }
1733 
1734  strcpy(topic_pool_entry->name, topic_name); /* Safe */
1735 
1736  return topic_pool_entry;
1737 }
1738 
1740  struct ao2_container *pool_container;
1741  struct stasis_topic *pool_topic;
1742 };
1743 
1744 static void topic_pool_dtor(void *obj)
1745 {
1746  struct stasis_topic_pool *pool = obj;
1747 
1748 #ifdef AO2_DEBUG
1749  {
1750  char *container_name =
1751  ast_alloca(strlen(stasis_topic_name(pool->pool_topic)) + strlen("-pool") + 1);
1752  sprintf(container_name, "%s-pool", stasis_topic_name(pool->pool_topic));
1753  ao2_container_unregister(container_name);
1754  }
1755 #endif
1756 
1757  ao2_cleanup(pool->pool_container);
1758  pool->pool_container = NULL;
1759  ao2_cleanup(pool->pool_topic);
1760  pool->pool_topic = NULL;
1761 }
1762 
1763 static int topic_pool_entry_hash(const void *obj, const int flags)
1764 {
1765  const struct topic_pool_entry *object;
1766  const char *key;
1767 
1768  switch (flags & OBJ_SEARCH_MASK) {
1769  case OBJ_SEARCH_KEY:
1770  key = obj;
1771  break;
1772  case OBJ_SEARCH_OBJECT:
1773  object = obj;
1774  key = object->name;
1775  break;
1776  default:
1777  /* Hash can only work on something with a full key. */
1778  ast_assert(0);
1779  return 0;
1780  }
1781  return ast_str_case_hash(key);
1782 }
1783 
1784 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
1785 {
1786  const struct topic_pool_entry *object_left = obj;
1787  const struct topic_pool_entry *object_right = arg;
1788  const char *right_key = arg;
1789  int cmp;
1790 
1791  switch (flags & OBJ_SEARCH_MASK) {
1792  case OBJ_SEARCH_OBJECT:
1793  right_key = object_right->name;
1794  /* Fall through */
1795  case OBJ_SEARCH_KEY:
1796  cmp = strcasecmp(object_left->name, right_key);
1797  break;
1799  /* Not supported by container */
1800  ast_assert(0);
1801  cmp = -1;
1802  break;
1803  default:
1804  /*
1805  * What arg points to is specific to this traversal callback
1806  * and has no special meaning to astobj2.
1807  */
1808  cmp = 0;
1809  break;
1810  }
1811  if (cmp) {
1812  return 0;
1813  }
1814  /*
1815  * At this point the traversal callback is identical to a sorted
1816  * container.
1817  */
1818  return CMP_MATCH;
1819 }
1820 
1821 #ifdef AO2_DEBUG
1822 static void topic_pool_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
1823 {
1824  struct topic_pool_entry *entry = v_obj;
1825 
1826  if (!entry) {
1827  return;
1828  }
1829  prnt(where, "%s", stasis_topic_name(entry->topic));
1830 }
1831 #endif
1832 
1834 {
1835  struct stasis_topic_pool *pool;
1836 
1837  pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1838  if (!pool) {
1839  return NULL;
1840  }
1841 
1842  pool->pool_container = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
1843  TOPIC_POOL_BUCKETS, topic_pool_entry_hash, NULL, topic_pool_entry_cmp);
1844  if (!pool->pool_container) {
1845  ao2_cleanup(pool);
1846  return NULL;
1847  }
1848 
1849 #ifdef AO2_DEBUG
1850  {
1851  char *container_name =
1852  ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1853  sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1854  ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1855  }
1856 #endif
1857 
1858  ao2_ref(pooled_topic, +1);
1859  pool->pool_topic = pooled_topic;
1860 
1861  return pool;
1862 }
1863 
1864 void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
1865 {
1866  /*
1867  * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1868  * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1869  * name and search only on <topic_name>.
1870  */
1871  const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1872  int pool_topic_name_len = strlen(pool_topic_name);
1873  const char *search_topic_name;
1874 
1875  if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1876  search_topic_name = topic_name + pool_topic_name_len + 1;
1877  } else {
1878  search_topic_name = topic_name;
1879  }
1880 
1881  ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1882 }
1883 
1884 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
1885 {
1886  RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1887  SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1888  char *new_topic_name;
1889  int ret;
1890 
1891  topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1892  if (topic_pool_entry) {
1893  return topic_pool_entry->topic;
1894  }
1895 
1896  topic_pool_entry = topic_pool_entry_alloc(topic_name);
1897  if (!topic_pool_entry) {
1898  return NULL;
1899  }
1900 
1901  /* To provide further detail and to ensure that the topic is unique within the scope of the
1902  * system we prefix it with the pooling topic name, which should itself already be unique.
1903  */
1904  ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1905  if (ret < 0) {
1906  return NULL;
1907  }
1908 
1909  topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1910  ast_free(new_topic_name);
1911  if (!topic_pool_entry->topic) {
1912  return NULL;
1913  }
1914 
1915  topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1916  if (!topic_pool_entry->forward) {
1917  return NULL;
1918  }
1919 
1920  if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1921  return NULL;
1922  }
1923 
1924  return topic_pool_entry->topic;
1925 }
1926 
1927 int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
1928 {
1929  struct topic_pool_entry *topic_pool_entry;
1930 
1931  topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1932  if (!topic_pool_entry) {
1933  return 0;
1934  }
1935 
1936  ao2_ref(topic_pool_entry, -1);
1937  return 1;
1938 }
1939 
1940 void stasis_log_bad_type_access(const char *name)
1941 {
1942 #ifdef AST_DEVMODE
1943  if (!stasis_message_type_declined(name)) {
1944  ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
1945  }
1946 #endif
1947 }
1948 
1949 /*! \brief A multi object blob data structure to carry user event stasis messages */
1951  struct ast_json *blob; /*< A blob of JSON data */
1952  AST_VECTOR(, void *) snapshots[STASIS_UMOS_MAX]; /*< Vector of snapshots for each type */
1953 };
1954 
1955 /*!
1956  * \internal
1957  * \brief Destructor for \ref ast_multi_object_blob objects
1958  */
1959 static void multi_object_blob_dtor(void *obj)
1960 {
1961  struct ast_multi_object_blob *multi = obj;
1962  int type;
1963  int i;
1964 
1965  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1966  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
1967  ao2_cleanup(AST_VECTOR_GET(&multi->snapshots[type], i));
1968  }
1969  AST_VECTOR_FREE(&multi->snapshots[type]);
1970  }
1971  ast_json_unref(multi->blob);
1972 }
1973 
1974 /*! \brief Create a stasis user event multi object blob */
1976 {
1977  int type;
1978  struct ast_multi_object_blob *multi;
1979 
1980  ast_assert(blob != NULL);
1981 
1982  multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
1983  if (!multi) {
1984  return NULL;
1985  }
1986 
1987  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
1988  if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
1989  ao2_ref(multi, -1);
1990 
1991  return NULL;
1992  }
1993  }
1994 
1995  multi->blob = ast_json_ref(blob);
1996 
1997  return multi;
1998 }
1999 
2000 /*! \brief Add an object (snapshot) to the blob */
2002  enum stasis_user_multi_object_snapshot_type type, void *object)
2003 {
2004  if (!multi || !object || AST_VECTOR_APPEND(&multi->snapshots[type], object)) {
2005  ao2_cleanup(object);
2006  }
2007 }
2008 
2009 /*! \brief Publish single channel user event (for app_userevent compatibility) */
2011  struct stasis_message_type *type, struct ast_json *blob)
2012 {
2013  struct stasis_message *message;
2014  struct ast_channel_snapshot *channel_snapshot;
2015  struct ast_multi_object_blob *multi;
2016 
2017  if (!type) {
2018  return;
2019  }
2020 
2021  multi = ast_multi_object_blob_create(blob);
2022  if (!multi) {
2023  return;
2024  }
2025 
2026  channel_snapshot = ast_channel_snapshot_create(chan);
2027  if (!channel_snapshot) {
2028  ao2_ref(multi, -1);
2029  return;
2030  }
2031 
2032  /* this call steals the channel_snapshot reference */
2033  ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
2034 
2035  message = stasis_message_create(type, multi);
2036  ao2_ref(multi, -1);
2037  if (message) {
2038  /* app_userevent still publishes to channel */
2039  stasis_publish(ast_channel_topic(chan), message);
2040  ao2_ref(message, -1);
2041  }
2042 }
2043 
2044 /*! \internal \brief convert multi object blob to ari json */
2045 static struct ast_json *multi_user_event_to_json(
2046  struct stasis_message *message,
2047  const struct stasis_message_sanitizer *sanitize)
2048 {
2049  struct ast_json *out;
2050  struct ast_multi_object_blob *multi = stasis_message_data(message);
2051  struct ast_json *blob = multi->blob;
2052  const struct timeval *tv = stasis_message_timestamp(message);
2054  int i;
2055 
2056  out = ast_json_object_create();
2057  if (!out) {
2058  return NULL;
2059  }
2060 
2061  ast_json_object_set(out, "type", ast_json_string_create("ChannelUserevent"));
2062  ast_json_object_set(out, "timestamp", ast_json_timeval(*tv, NULL));
2063  ast_json_object_set(out, "eventname", ast_json_ref(ast_json_object_get(blob, "eventname")));
2064  ast_json_object_set(out, "userevent", ast_json_ref(blob));
2065 
2066  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2067  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2068  struct ast_json *json_object = NULL;
2069  char *name = NULL;
2070  void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2071 
2072  switch (type) {
2073  case STASIS_UMOS_CHANNEL:
2074  json_object = ast_channel_snapshot_to_json(snapshot, sanitize);
2075  name = "channel";
2076  break;
2077  case STASIS_UMOS_BRIDGE:
2078  json_object = ast_bridge_snapshot_to_json(snapshot, sanitize);
2079  name = "bridge";
2080  break;
2081  case STASIS_UMOS_ENDPOINT:
2082  json_object = ast_endpoint_snapshot_to_json(snapshot, sanitize);
2083  name = "endpoint";
2084  break;
2085  }
2086  if (json_object) {
2087  ast_json_object_set(out, name, json_object);
2088  }
2089  }
2090  }
2091 
2092  return out;
2093 }
2094 
2095 /*! \internal \brief convert multi object blob to ami string */
2096 static struct ast_str *multi_object_blob_to_ami(void *obj)
2097 {
2098  struct ast_str *ami_str=ast_str_create(1024);
2099  struct ast_str *ami_snapshot;
2100  const struct ast_multi_object_blob *multi = obj;
2102  int i;
2103 
2104  if (!ami_str) {
2105  return NULL;
2106  }
2107  if (!multi) {
2108  ast_free(ami_str);
2109  return NULL;
2110  }
2111 
2112  for (type = 0; type < STASIS_UMOS_MAX; ++type) {
2113  for (i = 0; i < AST_VECTOR_SIZE(&multi->snapshots[type]); ++i) {
2114  char *name = NULL;
2115  void *snapshot = AST_VECTOR_GET(&multi->snapshots[type], i);
2116  ami_snapshot = NULL;
2117 
2118  if (i > 0) {
2119  ast_asprintf(&name, "%d", i + 1);
2120  }
2121 
2122  switch (type) {
2123  case STASIS_UMOS_CHANNEL:
2124  ami_snapshot = ast_manager_build_channel_state_string_prefix(snapshot, name ?: "");
2125  break;
2126 
2127  case STASIS_UMOS_BRIDGE:
2128  ami_snapshot = ast_manager_build_bridge_state_string_prefix(snapshot, name ?: "");
2129  break;
2130 
2131  case STASIS_UMOS_ENDPOINT:
2132  /* currently not sending endpoint snapshots to AMI */
2133  break;
2134  }
2135  if (ami_snapshot) {
2136  ast_str_append(&ami_str, 0, "%s", ast_str_buffer(ami_snapshot));
2137  ast_free(ami_snapshot);
2138  }
2139  ast_free(name);
2140  }
2141  }
2142 
2143  return ami_str;
2144 }
2145 
2146 /*! \internal \brief Callback to pass only user defined parameters from blob */
2147 static int userevent_exclusion_cb(const char *key)
2148 {
2149  if (!strcmp("eventname", key)) {
2150  return 1;
2151  }
2152  return 0;
2153 }
2154 
2155 static struct ast_manager_event_blob *multi_user_event_to_ami(
2156  struct stasis_message *message)
2157 {
2158  RAII_VAR(struct ast_str *, object_string, NULL, ast_free);
2159  RAII_VAR(struct ast_str *, body, NULL, ast_free);
2160  struct ast_multi_object_blob *multi = stasis_message_data(message);
2161  const char *eventname;
2162 
2163  eventname = ast_json_string_get(ast_json_object_get(multi->blob, "eventname"));
2164  body = ast_manager_str_from_json_object(multi->blob, userevent_exclusion_cb);
2165  object_string = multi_object_blob_to_ami(multi);
2166  if (!object_string || !body) {
2167  return NULL;
2168  }
2169 
2170  return ast_manager_event_blob_create(EVENT_FLAG_USER, "UserEvent",
2171  "%s"
2172  "UserEvent: %s\r\n"
2173  "%s",
2174  ast_str_buffer(object_string),
2175  eventname,
2176  ast_str_buffer(body));
2177 }
2178 
2179 /*! \brief A structure to hold global configuration-related options */
2181  /*! The list of message types to decline */
2183 };
2184 
2185 /*! \brief Threadpool configuration options */
2187  /*! Initial size of the thread pool */
2189  /*! Time, in seconds, before we expire a thread */
2191  /*! Maximum number of thread to allow */
2193 };
2194 
2196  /*! Thread pool configuration options */
2198  /*! Declined message types */
2200 };
2201 
2202 static struct aco_type threadpool_option = {
2203  .type = ACO_GLOBAL,
2204  .name = "threadpool",
2205  .item_offset = offsetof(struct stasis_config, threadpool_options),
2206  .category = "threadpool",
2207  .category_match = ACO_WHITELIST_EXACT,
2208 };
2209 
2210 static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
2211 
2212 /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
2213 static struct aco_type declined_option = {
2214  .type = ACO_GLOBAL,
2215  .name = "declined_message_types",
2216  .item_offset = offsetof(struct stasis_config, declined_message_types),
2217  .category_match = ACO_WHITELIST_EXACT,
2218  .category = "declined_message_types",
2219 };
2220 
2221 struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
2222 
2223 struct aco_file stasis_conf = {
2224  .filename = "stasis.conf",
2225  .types = ACO_TYPES(&declined_option, &threadpool_option),
2226 };
2227 
2228 /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
2229 static AO2_GLOBAL_OBJ_STATIC(globals);
2230 
2231 static void *stasis_config_alloc(void);
2232 
2233 /*! \brief Register information about the configs being processed by this module */
2234 CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
2235  .files = ACO_FILES(&stasis_conf),
2236 );
2237 
2238 static void stasis_declined_config_destructor(void *obj)
2239 {
2240  struct stasis_declined_config *declined = obj;
2241 
2242  ao2_cleanup(declined->declined);
2243 }
2244 
2245 static void stasis_config_destructor(void *obj)
2246 {
2247  struct stasis_config *cfg = obj;
2248 
2249  ao2_cleanup(cfg->declined_message_types);
2250  ast_free(cfg->threadpool_options);
2251 }
2252 
2253 static void *stasis_config_alloc(void)
2254 {
2255  struct stasis_config *cfg;
2256 
2257  if (!(cfg = ao2_alloc(sizeof(*cfg), stasis_config_destructor))) {
2258  return NULL;
2259  }
2260 
2261  cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
2262  if (!cfg->threadpool_options) {
2263  ao2_ref(cfg, -1);
2264  return NULL;
2265  }
2266 
2267  cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
2268  stasis_declined_config_destructor);
2269  if (!cfg->declined_message_types) {
2270  ao2_ref(cfg, -1);
2271  return NULL;
2272  }
2273 
2275  if (!cfg->declined_message_types->declined) {
2276  ao2_ref(cfg, -1);
2277  return NULL;
2278  }
2279 
2280  return cfg;
2281 }
2282 
2283 int stasis_message_type_declined(const char *name)
2284 {
2285  struct stasis_config *cfg = ao2_global_obj_ref(globals);
2286  char *name_in_declined;
2287  int res;
2288 
2289  if (!cfg || !cfg->declined_message_types) {
2290  ao2_cleanup(cfg);
2291  return 0;
2292  }
2293 
2294  name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2295  res = name_in_declined ? 1 : 0;
2296  ao2_cleanup(name_in_declined);
2297  ao2_ref(cfg, -1);
2298  if (res) {
2299  ast_debug(4, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2300  }
2301  return res;
2302 }
2303 
2304 static int declined_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2305 {
2306  struct stasis_declined_config *declined = obj;
2307 
2308  if (ast_strlen_zero(var->value)) {
2309  return 0;
2310  }
2311 
2312  if (ast_str_container_add(declined->declined, var->value)) {
2313  return -1;
2314  }
2315 
2316  return 0;
2317 }
2318 
2319 /*!
2320  * @{ \brief Define multi user event message type(s).
2321  */
2322 
2324  .to_json = multi_user_event_to_json,
2325  .to_ami = multi_user_event_to_ami,
2326  );
2327 
2328 /*! @} */
2329 
2330 /*!
2331  * \internal
2332  * \brief CLI command implementation for 'stasis show topics'
2333  */
2334 static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2335 {
2336  struct ao2_iterator iter;
2337  struct topic_proxy *topic;
2338  struct ao2_container *tmp_container;
2339  int count = 0;
2340 #define FMT_HEADERS "%-64s %-64s\n"
2341 #define FMT_FIELDS "%-64s %-64s\n"
2342 
2343  switch (cmd) {
2344  case CLI_INIT:
2345  e->command = "stasis show topics";
2346  e->usage =
2347  "Usage: stasis show topics\n"
2348  " Shows a list of topics\n";
2349  return NULL;
2350  case CLI_GENERATE:
2351  return NULL;
2352  }
2353 
2354  if (a->argc != e->args) {
2355  return CLI_SHOWUSAGE;
2356  }
2357 
2358  ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
2359 
2361  topic_proxy_sort_fn, NULL);
2362 
2363  if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
2364  ao2_cleanup(tmp_container);
2365 
2366  return NULL;
2367  }
2368 
2369  /* getting all topic in order */
2370  iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
2371  while ((topic = ao2_iterator_next(&iter))) {
2372  ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
2373  ao2_ref(topic, -1);
2374  ++count;
2375  }
2376  ao2_iterator_destroy(&iter);
2377  ao2_cleanup(tmp_container);
2378 
2379  ast_cli(a->fd, "\n%d Total topics\n\n", count);
2380 
2381 #undef FMT_HEADERS
2382 #undef FMT_FIELDS
2383 
2384  return CLI_SUCCESS;
2385 }
2386 
2387 /*!
2388  * \internal
2389  * \brief CLI tab completion for topic names
2390  */
2391 static char *topic_complete_name(const char *word)
2392 {
2393  struct topic_proxy *topic;
2394  struct ao2_iterator it;
2395  int wordlen = strlen(word);
2396  int ret;
2397 
2398  it = ao2_iterator_init(topic_all, 0);
2399  while ((topic = ao2_iterator_next(&it))) {
2400  if (!strncasecmp(word, topic->name, wordlen)) {
2401  ret = ast_cli_completion_add(ast_strdup(topic->name));
2402  if (ret) {
2403  ao2_ref(topic, -1);
2404  break;
2405  }
2406  }
2407  ao2_ref(topic, -1);
2408  }
2409  ao2_iterator_destroy(&it);
2410  return NULL;
2411 }
2412 
2413 /*!
2414  * \internal
2415  * \brief CLI command implementation for 'stasis show topic'
2416  */
2417 static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2418 {
2419  struct stasis_topic *topic;
2420  char print_time[32];
2421  int i;
2422 
2423  switch (cmd) {
2424  case CLI_INIT:
2425  e->command = "stasis show topic";
2426  e->usage =
2427  "Usage: stasis show topic <name>\n"
2428  " Show stasis topic detail info.\n";
2429  return NULL;
2430  case CLI_GENERATE:
2431  if (a->pos == 3) {
2432  return topic_complete_name(a->word);
2433  } else {
2434  return NULL;
2435  }
2436  }
2437 
2438  if (a->argc != 4) {
2439  return CLI_SHOWUSAGE;
2440  }
2441 
2442  topic = stasis_topic_get(a->argv[3]);
2443  if (!topic) {
2444  ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
2445  return CLI_FAILURE;
2446  }
2447 
2448  ast_cli(a->fd, "Name: %s\n", topic->name);
2449  ast_cli(a->fd, "Detail: %s\n", topic->detail);
2450  ast_cli(a->fd, "Subscribers count: %zu\n", AST_VECTOR_SIZE(&topic->subscribers));
2451  ast_cli(a->fd, "Forwarding topic count: %zu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
2452  ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
2453  ast_cli(a->fd, "Duration time: %s\n", print_time);
2454 
2455  ao2_lock(topic);
2456  ast_cli(a->fd, "\nSubscribers:\n");
2457  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); i++) {
2458  struct stasis_subscription *subscription_tmp = AST_VECTOR_GET(&topic->subscribers, i);
2459  ast_cli(a->fd, " UniqueID: %s, Topic: %s, Detail: %s\n",
2460  subscription_tmp->uniqueid, subscription_tmp->topic->name, subscription_tmp->topic->detail);
2461  }
2462 
2463  ast_cli(a->fd, "\nForwarded topics:\n");
2464  for (i = 0; i < AST_VECTOR_SIZE(&topic->upstream_topics); i++) {
2465  struct stasis_topic *topic_tmp = AST_VECTOR_GET(&topic->upstream_topics, i);
2466  ast_cli(a->fd, " Topic: %s, Detail: %s\n", topic_tmp->name, topic_tmp->detail);
2467  }
2468  ao2_unlock(topic);
2469 
2470  ao2_ref(topic, -1);
2471 
2472  return CLI_SUCCESS;
2473 }
2474 
2475 
2476 static struct ast_cli_entry cli_stasis[] = {
2477  AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
2478  AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
2479 };
2480 
2481 
2482 #ifdef AST_DEVMODE
2483 
2484 AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
2485 
2486 /*!
2487  * \internal
2488  * \brief CLI command implementation for 'stasis statistics show subscriptions'
2489  */
2490 static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2491 {
2492  struct ao2_container *sorted_subscriptions;
2493  struct ao2_container *subscription_stats;
2494  struct ao2_iterator iter;
2495  struct stasis_subscription_statistics *statistics;
2496  int count = 0;
2497  int dropped = 0;
2498  int passed = 0;
2499 #define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
2500 #define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
2501 #define FMT_FIELDS2 "%-64s %10d %10d\n"
2502 
2503  switch (cmd) {
2504  case CLI_INIT:
2505  e->command = "stasis statistics show subscriptions";
2506  e->usage =
2507  "Usage: stasis statistics show subscriptions\n"
2508  " Shows a list of subscriptions and their general statistics\n";
2509  return NULL;
2510  case CLI_GENERATE:
2511  return NULL;
2512  }
2513 
2514  if (a->argc != e->args) {
2515  return CLI_SHOWUSAGE;
2516  }
2517 
2518  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2519  if (!subscription_stats) {
2520  ast_cli(a->fd, "Could not fetch subscription_statistics container\n");
2521  return CLI_FAILURE;
2522  }
2523 
2524  sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
2525  stasis_subscription_statistics_sort_fn, NULL);
2526  if (!sorted_subscriptions) {
2527  ao2_ref(subscription_stats, -1);
2528  ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
2529  return CLI_SUCCESS;
2530  }
2531 
2532  if (ao2_container_dup(sorted_subscriptions, subscription_stats, 0)) {
2533  ao2_ref(sorted_subscriptions, -1);
2534  ao2_ref(subscription_stats, -1);
2535  ast_cli(a->fd, "Could not sort subscription statistics\n");
2536  return CLI_SUCCESS;
2537  }
2538 
2539  ao2_ref(subscription_stats, -1);
2540 
2541  ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
2542 
2543  iter = ao2_iterator_init(sorted_subscriptions, 0);
2544  while ((statistics = ao2_iterator_next(&iter))) {
2545  ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
2546  statistics->lowest_time_invoked, statistics->highest_time_invoked);
2547  dropped += statistics->messages_dropped;
2548  passed += statistics->messages_passed;
2549  ao2_ref(statistics, -1);
2550  ++count;
2551  }
2552  ao2_iterator_destroy(&iter);
2553 
2554  ao2_ref(sorted_subscriptions, -1);
2555 
2556  ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
2557  ast_cli(a->fd, "\n%d subscriptions\n\n", count);
2558 
2559 #undef FMT_HEADERS
2560 #undef FMT_FIELDS
2561 #undef FMT_FIELDS2
2562 
2563  return CLI_SUCCESS;
2564 }
2565 
2566 /*!
2567  * \internal
2568  * \brief CLI tab completion for subscription statistics names
2569  */
2570 static char *subscription_statistics_complete_name(const char *word, int state)
2571 {
2572  struct stasis_subscription_statistics *statistics;
2573  struct ao2_container *subscription_stats;
2574  struct ao2_iterator it_statistics;
2575  int wordlen = strlen(word);
2576  int which = 0;
2577  char *result = NULL;
2578 
2579  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2580  if (!subscription_stats) {
2581  return result;
2582  }
2583 
2584  it_statistics = ao2_iterator_init(subscription_stats, 0);
2585  while ((statistics = ao2_iterator_next(&it_statistics))) {
2586  if (!strncasecmp(word, statistics->uniqueid, wordlen)
2587  && ++which > state) {
2588  result = ast_strdup(statistics->uniqueid);
2589  }
2590  ao2_ref(statistics, -1);
2591  if (result) {
2592  break;
2593  }
2594  }
2595  ao2_iterator_destroy(&it_statistics);
2596  ao2_ref(subscription_stats, -1);
2597  return result;
2598 }
2599 
2600 /*!
2601  * \internal
2602  * \brief CLI command implementation for 'stasis statistics show subscription'
2603  */
2604 static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2605 {
2606  struct stasis_subscription_statistics *statistics;
2607  struct ao2_container *subscription_stats;
2608  struct ao2_iterator i;
2609  char *name;
2610 
2611  switch (cmd) {
2612  case CLI_INIT:
2613  e->command = "stasis statistics show subscription";
2614  e->usage =
2615  "Usage: stasis statistics show subscription <uniqueid>\n"
2616  " Show stasis subscription statistics.\n";
2617  return NULL;
2618  case CLI_GENERATE:
2619  if (a->pos == 4) {
2620  return subscription_statistics_complete_name(a->word, a->n);
2621  } else {
2622  return NULL;
2623  }
2624  }
2625 
2626  if (a->argc != 5) {
2627  return CLI_SHOWUSAGE;
2628  }
2629 
2630  subscription_stats = ao2_global_obj_ref(subscription_statistics);
2631  if (!subscription_stats) {
2632  ast_cli(a->fd, "Could not fetch subcription_statistics container\n");
2633  return CLI_FAILURE;
2634  }
2635 
2636  statistics = ao2_find(subscription_stats, a->argv[4], OBJ_SEARCH_KEY);
2637  if (!statistics) {
2638  ao2_ref(subscription_stats, -1);
2639  ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
2640  return CLI_FAILURE;
2641  }
2642 
2643  ao2_ref(subscription_stats, -1);
2644 
2645  ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
2646  ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
2647  ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
2648  ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
2649  ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
2650  ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
2651  ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
2652  ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
2653  ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
2654  ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
2655  ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
2656 
2657  ao2_lock(statistics);
2658  if (statistics->highest_time_message_type) {
2659  ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
2660  }
2661  ao2_unlock(statistics);
2662 
2663  ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
2664 
2665  ast_cli(a->fd, "Subscribed topics:\n");
2666  i = ao2_iterator_init(statistics->topics, 0);
2667  while ((name = ao2_iterator_next(&i))) {
2668  ast_cli(a->fd, "\t%s\n", name);
2669  ao2_ref(name, -1);
2670  }
2672 
2673  ao2_ref(statistics, -1);
2674 
2675  return CLI_SUCCESS;
2676 }
2677 
2678 AO2_STRING_FIELD_SORT_FN(stasis_topic_statistics, name);
2679 
2680 /*!
2681  * \internal
2682  * \brief CLI command implementation for 'stasis statistics show topics'
2683  */
2684 static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2685 {
2686  struct ao2_container *sorted_topics;
2687  struct ao2_container *topic_stats;
2688  struct ao2_iterator iter;
2689  struct stasis_topic_statistics *statistics;
2690  int count = 0;
2691  int not_dispatched = 0;
2692  int dispatched = 0;
2693 #define FMT_HEADERS "%-64s %10s %10s %10s %16s %16s\n"
2694 #define FMT_FIELDS "%-64s %10d %10d %10d %16ld %16ld\n"
2695 #define FMT_FIELDS2 "%-64s %10s %10d %10d\n"
2696 
2697  switch (cmd) {
2698  case CLI_INIT:
2699  e->command = "stasis statistics show topics";
2700  e->usage =
2701  "Usage: stasis statistics show topics\n"
2702  " Shows a list of topics and their general statistics\n";
2703  return NULL;
2704  case CLI_GENERATE:
2705  return NULL;
2706  }
2707 
2708  if (a->argc != e->args) {
2709  return CLI_SHOWUSAGE;
2710  }
2711 
2712  topic_stats = ao2_global_obj_ref(topic_statistics);
2713  if (!topic_stats) {
2714  ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2715  return CLI_FAILURE;
2716  }
2717 
2719  stasis_topic_statistics_sort_fn, NULL);
2720  if (!sorted_topics) {
2721  ao2_ref(topic_stats, -1);
2722  ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
2723  return CLI_SUCCESS;
2724  }
2725 
2726  if (ao2_container_dup(sorted_topics, topic_stats, 0)) {
2727  ao2_ref(sorted_topics, -1);
2728  ao2_ref(topic_stats, -1);
2729  ast_cli(a->fd, "Could not sort topic statistics\n");
2730  return CLI_SUCCESS;
2731  }
2732 
2733  ao2_ref(topic_stats, -1);
2734 
2735  ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
2736 
2737  iter = ao2_iterator_init(sorted_topics, 0);
2738  while ((statistics = ao2_iterator_next(&iter))) {
2739  ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
2740  statistics->messages_not_dispatched, statistics->messages_dispatched,
2741  statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
2742  not_dispatched += statistics->messages_not_dispatched;
2743  dispatched += statistics->messages_dispatched;
2744  ao2_ref(statistics, -1);
2745  ++count;
2746  }
2747  ao2_iterator_destroy(&iter);
2748 
2749  ao2_ref(sorted_topics, -1);
2750 
2751  ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
2752  ast_cli(a->fd, "\n%d topics\n\n", count);
2753 
2754 #undef FMT_HEADERS
2755 #undef FMT_FIELDS
2756 #undef FMT_FIELDS2
2757 
2758  return CLI_SUCCESS;
2759 }
2760 
2761 /*!
2762  * \internal
2763  * \brief CLI tab completion for topic statistics names
2764  */
2765 static char *topic_statistics_complete_name(const char *word, int state)
2766 {
2767  struct stasis_topic_statistics *statistics;
2768  struct ao2_container *topic_stats;
2769  struct ao2_iterator it_statistics;
2770  int wordlen = strlen(word);
2771  int which = 0;
2772  char *result = NULL;
2773 
2774  topic_stats = ao2_global_obj_ref(topic_statistics);
2775  if (!topic_stats) {
2776  return result;
2777  }
2778 
2779  it_statistics = ao2_iterator_init(topic_stats, 0);
2780  while ((statistics = ao2_iterator_next(&it_statistics))) {
2781  if (!strncasecmp(word, statistics->name, wordlen)
2782  && ++which > state) {
2783  result = ast_strdup(statistics->name);
2784  }
2785  ao2_ref(statistics, -1);
2786  if (result) {
2787  break;
2788  }
2789  }
2790  ao2_iterator_destroy(&it_statistics);
2791  ao2_ref(topic_stats, -1);
2792  return result;
2793 }
2794 
2795 /*!
2796  * \internal
2797  * \brief CLI command implementation for 'stasis statistics show topic'
2798  */
2799 static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2800 {
2801  struct stasis_topic_statistics *statistics;
2802  struct ao2_container *topic_stats;
2803  struct ao2_iterator i;
2804  char *uniqueid;
2805 
2806  switch (cmd) {
2807  case CLI_INIT:
2808  e->command = "stasis statistics show topic";
2809  e->usage =
2810  "Usage: stasis statistics show topic <name>\n"
2811  " Show stasis topic statistics.\n";
2812  return NULL;
2813  case CLI_GENERATE:
2814  if (a->pos == 4) {
2815  return topic_statistics_complete_name(a->word, a->n);
2816  } else {
2817  return NULL;
2818  }
2819  }
2820 
2821  if (a->argc != 5) {
2822  return CLI_SHOWUSAGE;
2823  }
2824 
2825  topic_stats = ao2_global_obj_ref(topic_statistics);
2826  if (!topic_stats) {
2827  ast_cli(a->fd, "Could not fetch topic_statistics container\n");
2828  return CLI_FAILURE;
2829  }
2830 
2831  statistics = ao2_find(topic_stats, a->argv[4], OBJ_SEARCH_KEY);
2832  if (!statistics) {
2833  ao2_ref(topic_stats, -1);
2834  ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
2835  return CLI_FAILURE;
2836  }
2837 
2838  ao2_ref(topic_stats, -1);
2839 
2840  ast_cli(a->fd, "Topic: %s\n", statistics->name);
2841  ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
2842  ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
2843  ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
2844  ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
2845  ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched);
2846  ast_cli(a->fd, "Number of subscribers: %d\n", ao2_container_count(statistics->subscribers));
2847 
2848  ast_cli(a->fd, "Subscribers:\n");
2849  i = ao2_iterator_init(statistics->subscribers, 0);
2850  while ((uniqueid = ao2_iterator_next(&i))) {
2851  ast_cli(a->fd, "\t%s\n", uniqueid);
2852  ao2_ref(uniqueid, -1);
2853  }
2855 
2856  ao2_ref(statistics, -1);
2857 
2858  return CLI_SUCCESS;
2859 }
2860 
2861 /*!
2862  * \internal
2863  * \brief CLI command implementation for 'stasis statistics show messages'
2864  */
2865 static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
2866 {
2867  int i;
2868  int count = 0;
2869  int published = 0;
2870  int unused = 0;
2871 #define FMT_HEADERS "%-64s %10s %10s\n"
2872 #define FMT_FIELDS "%-64s %10d %10d\n"
2873 
2874  switch (cmd) {
2875  case CLI_INIT:
2876  e->command = "stasis statistics show messages";
2877  e->usage =
2878  "Usage: stasis statistics show messages\n"
2879  " Shows a list of message types and their general statistics\n";
2880  return NULL;
2881  case CLI_GENERATE:
2882  return NULL;
2883  }
2884 
2885  if (a->argc != e->args) {
2886  return CLI_SHOWUSAGE;
2887  }
2888 
2889  ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
2890 
2891  ast_mutex_lock(&message_type_statistics_lock);
2892  for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
2893  struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
2894 
2895  if (!statistics->message_type) {
2896  continue;
2897  }
2898 
2899  ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
2900  statistics->unused);
2901  published += statistics->published;
2902  unused += statistics->unused;
2903  ++count;
2904  }
2905  ast_mutex_unlock(&message_type_statistics_lock);
2906 
2907  ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
2908  ast_cli(a->fd, "\n%d seen message types\n\n", count);
2909 
2910 #undef FMT_HEADERS
2911 #undef FMT_FIELDS
2912 
2913  return CLI_SUCCESS;
2914 }
2915 
2916 static struct ast_cli_entry cli_stasis_statistics[] = {
2917  AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
2918  AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
2919  AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
2920  AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
2921  AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
2922 };
2923 
2924 static int subscription_statistics_hash(const void *obj, const int flags)
2925 {
2926  const struct stasis_subscription_statistics *object;
2927  const char *key;
2928 
2929  switch (flags & OBJ_SEARCH_MASK) {
2930  case OBJ_SEARCH_KEY:
2931  key = obj;
2932  break;
2933  case OBJ_SEARCH_OBJECT:
2934  object = obj;
2935  key = object->uniqueid;
2936  break;
2937  default:
2938  /* Hash can only work on something with a full key. */
2939  ast_assert(0);
2940  return 0;
2941  }
2942  return ast_str_case_hash(key);
2943 }
2944 
2945 static int subscription_statistics_cmp(void *obj, void *arg, int flags)
2946 {
2947  const struct stasis_subscription_statistics *object_left = obj;
2948  const struct stasis_subscription_statistics *object_right = arg;
2949  const char *right_key = arg;
2950  int cmp;
2951 
2952  switch (flags & OBJ_SEARCH_MASK) {
2953  case OBJ_SEARCH_OBJECT:
2954  right_key = object_right->uniqueid;
2955  /* Fall through */
2956  case OBJ_SEARCH_KEY:
2957  cmp = strcasecmp(object_left->uniqueid, right_key);
2958  break;
2960  /* Not supported by container */
2961  ast_assert(0);
2962  cmp = -1;
2963  break;
2964  default:
2965  /*
2966  * What arg points to is specific to this traversal callback
2967  * and has no special meaning to astobj2.
2968  */
2969  cmp = 0;
2970  break;
2971  }
2972  if (cmp) {
2973  return 0;
2974  }
2975  /*
2976  * At this point the traversal callback is identical to a sorted
2977  * container.
2978  */
2979  return CMP_MATCH;
2980 }
2981 
2982 static int topic_statistics_hash(const void *obj, const int flags)
2983 {
2984  const struct stasis_topic_statistics *object;
2985  const char *key;
2986 
2987  switch (flags & OBJ_SEARCH_MASK) {
2988  case OBJ_SEARCH_KEY:
2989  key = obj;
2990  break;
2991  case OBJ_SEARCH_OBJECT:
2992  object = obj;
2993  key = object->name;
2994  break;
2995  default:
2996  /* Hash can only work on something with a full key. */
2997  ast_assert(0);
2998  return 0;
2999  }
3000  return ast_str_case_hash(key);
3001 }
3002 
3003 static int topic_statistics_cmp(void *obj, void *arg, int flags)
3004 {
3005  const struct stasis_topic_statistics *object_left = obj;
3006  const struct stasis_topic_statistics *object_right = arg;
3007  const char *right_key = arg;
3008  int cmp;
3009 
3010  switch (flags & OBJ_SEARCH_MASK) {
3011  case OBJ_SEARCH_OBJECT:
3012  right_key = object_right->name;
3013  /* Fall through */
3014  case OBJ_SEARCH_KEY:
3015  cmp = strcasecmp(object_left->name, right_key);
3016  break;
3018  /* Not supported by container */
3019  ast_assert(0);
3020  cmp = -1;
3021  break;
3022  default:
3023  /*
3024  * What arg points to is specific to this traversal callback
3025  * and has no special meaning to astobj2.
3026  */
3027  cmp = 0;
3028  break;
3029  }
3030  if (cmp) {
3031  return 0;
3032  }
3033  /*
3034  * At this point the traversal callback is identical to a sorted
3035  * container.
3036  */
3037  return CMP_MATCH;
3038 }
3039 #endif
3040 
3041 /*! \brief Cleanup function for graceful shutdowns */
3042 static void stasis_cleanup(void)
3043 {
3044 #ifdef AST_DEVMODE
3045  ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
3046  AST_VECTOR_FREE(&message_type_statistics);
3047  ao2_global_obj_release(subscription_statistics);
3048  ao2_global_obj_release(topic_statistics);
3049 #endif
3050  ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis));
3051  ao2_cleanup(topic_all);
3052  topic_all = NULL;
3053  ast_threadpool_shutdown(threadpool);
3054  threadpool = NULL;
3057  aco_info_destroy(&cfg_info);
3058  ao2_global_obj_release(globals);
3059 }
3060 
3061 int stasis_init(void)
3062 {
3063  struct stasis_config *cfg;
3064  int cache_init;
3065  struct ast_threadpool_options threadpool_opts = { 0, };
3066 #ifdef AST_DEVMODE
3067  struct ao2_container *subscription_stats;
3068  struct ao2_container *topic_stats;
3069 #endif
3070 
3071  /* Be sure the types are cleaned up after the message bus */
3073 
3074  if (aco_info_init(&cfg_info)) {
3075  return -1;
3076  }
3077 
3078  aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3079  declined_options, "", declined_handler, 0);
3080  aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3081  threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
3082  FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3083  INT_MAX);
3084  aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3085  threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
3086  FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3087  INT_MAX);
3088  aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3089  threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
3090  FLDSET(struct stasis_threadpool_conf, max_size), 0,
3091  INT_MAX);
3092 
3093  if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3094  struct stasis_config *default_cfg = stasis_config_alloc();
3095 
3096  if (!default_cfg) {
3097  return -1;
3098  }
3099 
3100  if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3101  ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3102  ao2_ref(default_cfg, -1);
3103 
3104  return -1;
3105  }
3106 
3107  if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3108  ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3109  ao2_ref(default_cfg, -1);
3110 
3111  return -1;
3112  }
3113 
3114  ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3115  ao2_global_obj_replace_unref(globals, default_cfg);
3116  cfg = default_cfg;
3117  } else {
3118  cfg = ao2_global_obj_ref(globals);
3119  if (!cfg) {
3120  ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3121 
3122  return -1;
3123  }
3124  }
3125 
3126  threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3127  threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3128  threadpool_opts.auto_increment = 1;
3129  threadpool_opts.max_size = cfg->threadpool_options->max_size;
3130  threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3131  threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3132  ao2_ref(cfg, -1);
3133  if (!threadpool) {
3134  ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3135 
3136  return -1;
3137  }
3138 
3139  cache_init = stasis_cache_init();
3140  if (cache_init != 0) {
3141  return -1;
3142  }
3143 
3145  return -1;
3146  }
3148  return -1;
3149  }
3150 
3151  topic_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_ALL_BUCKETS,
3152  topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3153  if (!topic_all) {
3154  return -1;
3155  }
3156 
3157  if (ast_cli_register_multiple(cli_stasis, ARRAY_LEN(cli_stasis))) {
3158  return -1;
3159  }
3160 
3161 #ifdef AST_DEVMODE
3162  /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3163  * topic or subscripton.
3164  */
3165  subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3166  subscription_statistics_hash, 0, subscription_statistics_cmp);
3167  if (!subscription_stats) {
3168  return -1;
3169  }
3170  ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3171  ao2_cleanup(subscription_stats);
3172 
3173  topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3174  topic_statistics_hash, 0, topic_statistics_cmp);
3175  if (!topic_stats) {
3176  return -1;
3177  }
3178  ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3179  ao2_cleanup(topic_stats);
3180  if (!topic_stats) {
3181  return -1;
3182  }
3183 
3184  AST_VECTOR_INIT(&message_type_statistics, 0);
3185 
3186  if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3187  return -1;
3188  }
3189 #endif
3190 
3191  return 0;
3192 }
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,.to_json=multi_user_event_to_json,.to_ami=multi_user_event_to_ami,)
Define multi user event message type(s).
Struct containing info for an AMI event to send out.
Definition: manager.h:502
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
char * detail
Definition: stasis.c:389
Main Channel structure associated with a channel.
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct timeval * creationtime
Definition: stasis.c:392
struct ast_json * ast_json_ref(struct ast_json *value)
Increase refcount on value.
Definition: json.c:67
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
static AO2_GLOBAL_OBJ_STATIC(globals)
A global object container that will contain the stasis_config that gets swapped out on reloads...
void( ao2_prnt_fn)(void *where, const char *fmt,...)
Print output.
Definition: astobj2.h:1435
struct stasis_subscription::@400 accepted_message_types
Internal Stasis APIs.
Asterisk main include file. File version handling, generic pbx functions.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
Definition: vector.h:583
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
AO2_STRING_FIELD_HASH_FN(transport_monitor, key)
Hashing function for struct transport_monitor.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
struct ast_multi_object_blob * ast_multi_object_blob_create(struct ast_json *blob)
Create a stasis user event multi object blob.
Definition: stasis.c:1975
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
struct stasis_topic * topic
Definition: stasis.c:684
#define STASIS_UMOS_MAX
Number of snapshot types.
Definition: stasis.h:1360
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:304
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Definition: stasis.c:643
int ast_cli_unregister_multiple(struct ast_cli_entry *e, int len)
Unregister multiple commands.
Definition: clicompat.c:30
Definition: stasis.c:1709
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3042
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
struct stasis_topic::@399 upstream_topics
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1169
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
descriptor for a cli entry.
Definition: cli.h:171
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
Definition: strings.h:761
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
ast_cond_t join_cond
Definition: stasis.c:693
struct stasis_topic::@398 subscribers
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
Definition: stasis.c:1053
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
Threadpool configuration options.
Definition: stasis.c:2186
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1118
Structure for variables, used for configurations and for channel variables.
static pj_pool_t * pool
Global memory pool for configuration and timers.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
Structure representing a snapshot of channel state.
Universally unique identifier support.
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
#define ast_str_container_alloc(buckets)
Allocates a hash container for bare strings.
Definition: strings.h:1365
struct ast_str * ast_manager_build_channel_state_string_prefix(const struct ast_channel_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a channel snapshot.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
AO2_STRING_FIELD_CMP_FN(transport_monitor, key)
Comparison function for struct transport_monitor.
struct stasis_topic * stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
Find or create a topic in the pool.
Definition: stasis.c:1884
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
Definition: strings.c:221
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1139
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition: astobj2.h:918
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
Register information about the configs being processed by this module.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
Definition: astobj2.c:934
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:107
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
The representation of a single configuration file to be processed.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
enum aco_type_t type
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)
Check if a topic exists in a pool.
Definition: stasis.c:1927
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:425
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct a ast_manager_event_blob.
Definition: manager.c:10563
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
struct ast_channel_snapshot * ast_channel_snapshot_create(struct ast_channel *chan)
Generate a snapshot of the channel state. This is an ao2 object, so ao2_cleanup() to deallocate...
struct stasis_topic * from_topic
Definition: stasis.c:1533
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
Utility functions.
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
struct ast_str * ast_manager_build_bridge_state_string_prefix(const struct ast_bridge_snapshot *snapshot, const char *prefix)
Generate the AMI message body from a bridge snapshot.
int final_message_processed
Definition: stasis.c:699
int args
This gets set in ast_cli_register()
Definition: cli.h:185
struct ast_str * ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
Convert a JSON object into an AMI compatible string.
Definition: manager.c:1981
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:942
struct ao2_container * declined
Definition: stasis.c:2182
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1011
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2199
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
Definition: astobj2.h:1116
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1023
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
A multi object blob data structure to carry user event stasis messages.
Definition: stasis.c:1950
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1174
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
Definition: vector.h:668
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
Definition: vector.h:571
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
int stasis_message_type_declined(const char *name)
Check whether a message type is declined.
Definition: stasis.c:2283
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
stasis_user_multi_object_snapshot_type
Object type code for multi user object snapshots.
Definition: stasis.h:1353
struct stasis_topic * to_topic
Definition: stasis.c:1535
struct ao2_container * container
Definition: res_fax.c:501
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
int final_message_rxed
Definition: stasis.c:696
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516
Their was an error and no changes were applied.
struct stasis_topic * topic
Definition: stasis.h:891
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
struct stasis_topic * ast_channel_topic(struct ast_channel *chan)
A topic which publishes the events for a particular channel.
Configuration option-handling.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
Support for dynamic strings.
Definition: strings.h:623
struct ast_taskprocessor * mailbox
Definition: stasis.c:686
#define ao2_unlink(container, obj)
Remove an object from a container.
Definition: astobj2.h:1578
int ao2_container_dup(struct ao2_container *dest, struct ao2_container *src, enum search_flags flags)
Copy all object references in the src container into the dest container.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
void aco_info_destroy(struct aco_info *info)
Destroy an initialized aco_info struct.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
#define ao2_global_obj_release(holder)
Release the ao2 object held in the global holder.
Definition: astobj2.h:859
int subscriber_id
Definition: stasis.c:383
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
Local data parameter.
userdata associated with baseline taskprocessor test
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
Definition: stasis.h:611
enum stasis_subscription_message_formatters accepted_formatters
Definition: stasis.c:704
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1134
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object...
Definition: astobj2.h:1748
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:971
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:568
char * command
Definition: cli.h:186
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name)
Delete a topic from the topic pool.
Definition: stasis.c:1864
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
Definition: stasis.c:953
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2213
Vector container support.
void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Stasis subscription callback function that does nothing.
Definition: stasis.c:809
An API for managing task processing threads that can be shared across modules.
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1201
void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
Add an object (snapshot) to the blob.
Definition: stasis.c:2001
const char * usage
Definition: cli.h:177
stasis_subscription_cb callback
Definition: stasis.c:688
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1093
enum stasis_subscription_message_filter filter
Definition: stasis.c:706
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
Definition: json.c:399
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition: astobj2.h:901
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Publish single channel user event (for app_userevent compatibility)
Definition: stasis.c:2010
enum stasis_subscription_message_formatters stasis_message_type_available_formatters(const struct stasis_message_type *message_type)
Get a bitmap of available formatters for a message type.
The arg parameter is an object of the same type.
Definition: astobj2.h:1087
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
const char * stasis_topic_detail(const struct stasis_topic *topic)
Return the detail of a topic.
Definition: stasis.c:635
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:294
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:856
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
Definition: astobj2.h:1821
Standard Command Line Interface.
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1105
Type information about a category-level configurable object.
An opaque threadpool structure.
Definition: threadpool.c:36
void ast_copy_string(char *dst, const char *src, size_t size)
Size-limited null-terminating string copy.
Definition: strings.h:425
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:301
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one...
Definition: strings.h:80
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
const char * filename
A structure to hold global configuration-related options.
Definition: stasis.c:2180
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1077
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message)
Invoke the subscription's callback.
Definition: stasis.c:754
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2197
Abstract JSON element (object, array, string, int, ...).
struct ast_json * ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_bridge_snapshot.
Definition: search.h:40
Forwarding information.
Definition: stasis.c:1531
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:622
Generic container type.
AO2_STRING_FIELD_SORT_FN(transport_monitor, key)
Sort function for struct transport_monitor.
Search option field mask.
Definition: astobj2.h:1072
void ast_format_duration_hh_mm_ss(int duration, char *buf, size_t length)
Formats a duration into HH:MM:SS.
Definition: utils.c:2297
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
struct ast_json * ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_channel_snapshot.
int ast_cli_completion_add(char *value)
Add a result to a request for completion options.
Definition: main/cli.c:2761
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
Type for default option handler for signed integers.
static struct ast_threadpool * threadpool
Definition: stasis.c:307
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
stasis_subscription_message_formatters
Stasis subscription formatter filters.
Definition: stasis.h:308
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
Endpoint abstractions.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
struct ast_json * ast_endpoint_snapshot_to_json(const struct ast_endpoint_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize)
Build a JSON object from a ast_endpoint_snapshot.
struct stasis_topic_pool * stasis_topic_pool_create(struct stasis_topic *pooled_topic)
Create a topic pool that routes messages from dynamically generated topics to the given topic...
Definition: stasis.c:1833
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:205
Structure for mutex and tracking information.
Definition: lock.h:135
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1150
static force_inline int attribute_pure ast_str_case_hash(const char *str)
Compute a hash value on a case-insensitive string.
Definition: strings.h:1303
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
Definition: strings.h:659
char * name
Definition: stasis.c:386
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532
int stasis_init(void)
Initialize the Stasis subsystem.
Definition: stasis.c:3061