Asterisk - The Open Source Telephony Project  21.4.1
Data Structures | Macros | Functions | Variables
stasis.c File Reference

Stasis Message Bus API. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/config_options.h"
#include "asterisk/cli.h"

Go to the source code of this file.

Data Structures

struct  ast_multi_object_blob
 A multi object blob data structure to carry user event stasis messages. More...
 
struct  stasis_config
 
struct  stasis_declined_config
 A structure to hold global configuration-related options. More...
 
struct  stasis_forward
 Forwarding information. More...
 
struct  stasis_subscription
 
struct  stasis_threadpool_conf
 Threadpool configuration options. More...
 
struct  stasis_topic
 
struct  stasis_topic_pool
 
struct  sync_task_data
 
struct  topic_pool_entry
 
struct  topic_proxy
 

Macros

#define FMT_FIELDS   "%-64s %-64s\n"
 
#define FMT_HEADERS   "%-64s %-64s\n"
 
#define INITIAL_SUBSCRIBERS_MAX   4
 
#define TOPIC_ALL_BUCKETS   997
 
#define topic_lock_both(topic1, topic2)
 Lock two topics.
 
#define TOPIC_POOL_BUCKETS   57
 

Functions

struct stasis_subscription__stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription. More...
 
struct stasis_subscription__stasis_subscribe_pool (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
 Create a subscription whose callbacks occur on a thread pool. More...
 
static AO2_GLOBAL_OBJ_STATIC (globals)
 A global object container that will contain the stasis_config that gets swapped out on reloads.
 
 AO2_STRING_FIELD_CASE_SORT_FN (topic_proxy, name)
 
 AO2_STRING_FIELD_CMP_FN (topic_proxy, name)
 
 AO2_STRING_FIELD_HASH_FN (topic_proxy, name)
 
void ast_multi_object_blob_add (struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
 Add an object (snapshot) to the blob. More...
 
struct ast_multi_object_blobast_multi_object_blob_create (struct ast_json *blob)
 Create a stasis user event multi object blob. More...
 
void ast_multi_object_blob_single_channel_publish (struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
 Publish single channel user event (for app_userevent compatibility) More...
 
 CONFIG_INFO_CORE ("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),)
 Register information about the configs being processed by this module.
 
static int declined_handler (const struct aco_option *opt, struct ast_variable *var, void *obj)
 
static int dispatch_exec_async (struct ast_taskprocessor_local *local)
 
static int dispatch_exec_sync (struct ast_taskprocessor_local *local)
 
static unsigned int dispatch_message (struct stasis_subscription *sub, struct stasis_message *message, int synchronous)
 
static void forward_dtor (void *obj)
 
struct stasis_subscriptioninternal_stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
 Create a subscription. More...
 
static int link_topic_proxy (struct stasis_topic *topic, const char *name, const char *detail)
 
static void multi_object_blob_dtor (void *obj)
 
static struct ast_strmulti_object_blob_to_ami (void *obj)
 
static struct ast_manager_event_blobmulti_user_event_to_ami (struct stasis_message *message)
 
static struct ast_jsonmulti_user_event_to_json (struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
 
static void proxy_dtor (void *weakproxy, void *container)
 
static void publish_msg (struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub)
 
static void send_subscription_subscribe (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static void send_subscription_unsubscribe (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static void stasis_cleanup (void)
 Cleanup function for graceful shutdowns.
 
static void * stasis_config_alloc (void)
 
static void stasis_config_destructor (void *obj)
 
static void stasis_declined_config_destructor (void *obj)
 
struct stasis_forwardstasis_forward_all (struct stasis_topic *from_topic, struct stasis_topic *to_topic)
 Create a subscription which forwards all messages from one topic to another. More...
 
struct stasis_forwardstasis_forward_cancel (struct stasis_forward *forward)
 
int stasis_init (void)
 Initialize the Stasis subsystem. More...
 
void stasis_log_bad_type_access (const char *name)
 
int stasis_message_type_declined (const char *name)
 Check whether a message type is declined. More...
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_subscription_change_type)
 
void stasis_publish (struct stasis_topic *topic, struct stasis_message *message)
 Publish a message to a topic's subscribers. More...
 
void stasis_publish_sync (struct stasis_subscription *sub, struct stasis_message *message)
 Publish a message to a topic's subscribers, synchronizing on the specified subscriber. More...
 
static char * stasis_show_topic (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
static char * stasis_show_topics (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 
void stasis_subscription_accept_formatters (struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
 Indicate to a subscription that we are interested in messages with one or more formatters. More...
 
int stasis_subscription_accept_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are interested in a message type. More...
 
void stasis_subscription_cb_noop (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 Stasis subscription callback function that does nothing. More...
 
int stasis_subscription_decline_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type)
 Indicate to a subscription that we are not interested in a message type. More...
 
int stasis_subscription_final_message (struct stasis_subscription *sub, struct stasis_message *msg)
 Determine whether a message is the final message to be received on a subscription. More...
 
int stasis_subscription_is_done (struct stasis_subscription *subscription)
 Returns whether subscription has received its final message. More...
 
int stasis_subscription_is_subscribed (const struct stasis_subscription *sub)
 Returns whether a subscription is currently subscribed. More...
 
void stasis_subscription_join (struct stasis_subscription *subscription)
 Block until the last message is processed on a subscription. More...
 
int stasis_subscription_set_congestion_limits (struct stasis_subscription *subscription, long low_water, long high_water)
 Set the high and low alert water marks of the stasis subscription. More...
 
int stasis_subscription_set_filter (struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a subscription. More...
 
const char * stasis_subscription_uniqueid (const struct stasis_subscription *sub)
 Get the unique ID for the subscription. More...
 
struct stasis_topicstasis_topic_create (const char *name)
 Create a new topic. More...
 
struct stasis_topicstasis_topic_create_with_detail (const char *name, const char *detail)
 Create a new topic with given detail. More...
 
const char * stasis_topic_detail (const struct stasis_topic *topic)
 Return the detail of a topic. More...
 
struct stasis_topicstasis_topic_get (const char *name)
 Get a topic of the given name. More...
 
const char * stasis_topic_name (const struct stasis_topic *topic)
 Return the name of a topic. More...
 
struct stasis_topic_poolstasis_topic_pool_create (struct stasis_topic *pooled_topic)
 Create a topic pool that routes messages from dynamically generated topics to the given topic. More...
 
void stasis_topic_pool_delete_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Delete a topic from the topic pool. More...
 
struct stasis_topicstasis_topic_pool_get_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Find or create a topic in the pool. More...
 
int stasis_topic_pool_topic_exists (const struct stasis_topic_pool *pool, const char *topic_name)
 Check if a topic exists in a pool. More...
 
size_t stasis_topic_subscribers (const struct stasis_topic *topic)
 Return the number of subscribers of a topic. More...
 
struct stasis_subscriptionstasis_unsubscribe (struct stasis_subscription *sub)
 Cancel a subscription. More...
 
struct stasis_subscriptionstasis_unsubscribe_and_join (struct stasis_subscription *subscription)
 Cancel a subscription, blocking until the last message is processed. More...
 
static int sub_cleanup (void *data)
 
static struct stasis_subscription_changesubscription_change_alloc (struct stasis_topic *topic, const char *uniqueid, const char *description)
 
static void subscription_change_dtor (void *obj)
 
static void subscription_dtor (void *obj)
 
static void subscription_invoke (struct stasis_subscription *sub, struct stasis_message *message)
 Invoke the subscription's callback. More...
 
static int topic_add_subscription (struct stasis_topic *topic, struct stasis_subscription *sub)
 Add a subscriber to a topic. More...
 
static char * topic_complete_name (const char *word)
 
static void topic_dtor (void *obj)
 
static void topic_pool_dtor (void *obj)
 
static struct topic_pool_entrytopic_pool_entry_alloc (const char *topic_name)
 
static int topic_pool_entry_cmp (void *obj, void *arg, int flags)
 
static void topic_pool_entry_dtor (void *obj)
 
static int topic_pool_entry_hash (const void *obj, const int flags)
 
static int topic_remove_subscription (struct stasis_topic *topic, struct stasis_subscription *sub)
 
static int userevent_exclusion_cb (const char *key)
 
 STASIS_MESSAGE_TYPE_DEFN (ast_multi_user_event_type,.to_json=multi_user_event_to_json,.to_ami=multi_user_event_to_ami,)
 Define multi user event message type(s).
 

Variables

static struct ast_cli_entry cli_stasis []
 
static struct aco_type declined_option
 An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type.
 
struct aco_typedeclined_options [] = ACO_TYPES(&declined_option)
 
struct aco_file stasis_conf
 
static struct ast_threadpoolthreadpool
 
static struct aco_type threadpool_option
 
static struct aco_typethreadpool_options [] = ACO_TYPES(&threadpool_option)
 
struct ao2_containertopic_all
 

Detailed Description

Stasis Message Bus API.

Author
David M. Lee, II dlee@.nosp@m.digi.nosp@m.um.co.nosp@m.m

Definition in file stasis.c.

Macro Definition Documentation

#define INITIAL_SUBSCRIBERS_MAX   4

Initial size of the subscribers list.

Definition at line 301 of file stasis.c.

Referenced by stasis_topic_create_with_detail().

#define TOPIC_POOL_BUCKETS   57

The number of buckets to use for topic pools

Definition at line 304 of file stasis.c.

Referenced by stasis_topic_pool_create().

Function Documentation

struct stasis_subscription* __stasis_subscribe ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Parameters
topicTopic to subscribe to.
callbackCallback function for subscription messages.
dataData to be passed to the callback, in addition to the message.
file,lineno,func
Returns
New stasis_subscription object.
Return values
NULLon error.
Since
12
Note
This callback will receive a callback with a message indicating it has been subscribed. This occurs immediately before accepted message types can be set and the callback must expect to receive it.

Definition at line 942 of file stasis.c.

References internal_stasis_subscribe().

949 {
950  return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
951 }
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:856
struct stasis_subscription* __stasis_subscribe_pool ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription whose callbacks occur on a thread pool.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but will almost certainly not always happen on the same thread. The invocation order of different subscriptions is unspecified.

Unlike stasis_subscribe, this function will explicitly use a threadpool to dispatch items to its callback. This form of subscription should be used when many subscriptions may be made to the specified topic.

Parameters
topicTopic to subscribe to.
callbackCallback function for subscription messages.
dataData to be passed to the callback, in addition to the message.
file,lineno,func
Returns
New stasis_subscription object.
Return values
NULLon error.
Since
12.8.0
Note
This callback will receive a callback with a message indicating it has been subscribed. This occurs immediately before accepted message types can be set and the callback must expect to receive it.

Definition at line 953 of file stasis.c.

References internal_stasis_subscribe().

960 {
961  return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
962 }
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:856
struct stasis_subscription* internal_stasis_subscribe ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data,
int  needs_mailbox,
int  use_thread_pool,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Note: modules outside of Stasis should use stasis_subscribe.

Parameters
topicTopic to subscribe to.
callbackCallback function for subscription messages.
dataData to be passed to the callback, in addition to the message.
needs_mailboxDetermines whether or not the subscription requires a mailbox. Subscriptions with mailboxes will be delivered on some non-publisher thread; subscriptions without mailboxes will be delivered on the publisher thread.
use_thread_poolUse the thread pool for the subscription. This is only relevant if needs_mailbox is non-zero.
file,lineno,func
Returns
New stasis_subscription object.
Return values
NULLon error.
Since
12

Definition at line 856 of file stasis.c.

References stasis_subscription::accepted_formatters, stasis_subscription::accepted_message_types, ao2_ref, ast_asprintf, ast_atomic_fetchadd_int(), ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_set_local(), AST_VECTOR_INIT, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::filter, stasis_subscription::join_cond, stasis_subscription::mailbox, STASIS_SUBSCRIPTION_FILTER_NONE, stasis_topic_name(), stasis_topic::subscriber_id, stasis_subscription::topic, topic_add_subscription(), TPS_REF_DEFAULT, and stasis_subscription::uniqueid.

Referenced by __stasis_subscribe(), __stasis_subscribe_pool(), and stasis_caching_topic_create().

865 {
866  struct stasis_subscription *sub;
867  int ret;
868 
869  if (!topic) {
870  return NULL;
871  }
872 
873  /* The ao2 lock is used for join_cond. */
874  sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
875  if (!sub) {
876  return NULL;
877  }
878 
879 #ifdef AST_DEVMODE
880  ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
881  sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
882  if (ret < 0 || !sub->statistics) {
883  ao2_ref(sub, -1);
884  return NULL;
885  }
886 #else
887  ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
888  if (ret < 0) {
889  ao2_ref(sub, -1);
890  return NULL;
891  }
892 #endif
893 
894  if (needs_mailbox) {
895  char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
896 
897  /* Create name with seq number appended. */
898  ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
899  use_thread_pool ? 'p' : 'm',
900  stasis_topic_name(topic));
901 
902  /*
903  * With a small number of subscribers, a thread-per-sub is
904  * acceptable. For a large number of subscribers, a thread
905  * pool should be used.
906  */
907  if (use_thread_pool) {
908  sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
909  } else {
911  }
912  if (!sub->mailbox) {
913  ao2_ref(sub, -1);
914 
915  return NULL;
916  }
918  /* Taskprocessor has a reference */
919  ao2_ref(sub, +1);
920  }
921 
922  ao2_ref(topic, +1);
923  sub->topic = topic;
924  sub->callback = callback;
925  sub->data = data;
926  ast_cond_init(&sub->join_cond, NULL);
929  sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
930 
931  if (topic_add_subscription(topic, sub) != 0) {
932  ao2_ref(sub, -1);
933  ao2_ref(topic, -1);
934 
935  return NULL;
936  }
937  send_subscription_subscribe(topic, sub);
938 
939  return sub;
940 }
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct stasis_subscription::@400 accepted_message_types
struct stasis_topic * topic
Definition: stasis.c:684
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
ast_cond_t join_cond
Definition: stasis.c:693
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
struct ast_taskprocessor * mailbox
Definition: stasis.c:686
int subscriber_id
Definition: stasis.c:383
enum stasis_subscription_message_formatters accepted_formatters
Definition: stasis.c:704
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1201
stasis_subscription_cb callback
Definition: stasis.c:688
enum stasis_subscription_message_filter filter
Definition: stasis.c:706
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
static struct ast_threadpool * threadpool
Definition: stasis.c:307
struct stasis_forward* stasis_forward_all ( struct stasis_topic from_topic,
struct stasis_topic to_topic 
)

Create a subscription which forwards all messages from one topic to another.

Note that the topic parameter of the invoked callback will the be the topic the message was sent to, not the topic the subscriber subscribed to.

Parameters
from_topicTopic to forward.
to_topicDestination topic of forwarded messages.
Returns
New forwarding subscription.
Return values
NULLon error.
Since
12

Definition at line 1578 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_bump, ao2_ref, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_forward::from_topic, stasis_topic::subscribers, stasis_forward::to_topic, topic_add_subscription(), topic_lock_both, and stasis_topic::upstream_topics.

Referenced by ast_ari_bridges_record(), ast_channel_forward_endpoint(), create_subscriptions(), forwards_create_bridge(), forwards_create_channel(), forwards_create_endpoint(), load_module(), manager_bridging_init(), manager_channels_init(), manager_mwi_init(), manager_subscriptions_init(), manager_system_init(), stasis_cp_all_create(), stasis_cp_single_create(), and stasis_topic_pool_get_topic().

1580 {
1581  int res;
1582  size_t idx;
1583  struct stasis_forward *forward;
1584 
1585  if (!from_topic || !to_topic) {
1586  return NULL;
1587  }
1588 
1589  forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1590  if (!forward) {
1591  return NULL;
1592  }
1593 
1594  /* Forwards to ourselves are implicit. */
1595  if (to_topic == from_topic) {
1596  return forward;
1597  }
1598 
1599  forward->from_topic = ao2_bump(from_topic);
1600  forward->to_topic = ao2_bump(to_topic);
1601 
1602  topic_lock_both(to_topic, from_topic);
1603  res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
1604  if (res != 0) {
1605  ao2_unlock(from_topic);
1606  ao2_unlock(to_topic);
1607  ao2_ref(forward, -1);
1608  return NULL;
1609  }
1610 
1611  for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
1612  topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
1613  }
1614  ao2_unlock(from_topic);
1615  ao2_unlock(to_topic);
1616 
1617  return forward;
1618 }
struct stasis_topic::@399 upstream_topics
struct stasis_topic::@398 subscribers
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
#define topic_lock_both(topic1, topic2)
Lock two topics.
Definition: stasis.c:425
struct stasis_topic * from_topic
Definition: stasis.c:1533
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
struct stasis_topic * to_topic
Definition: stasis.c:1535
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1201
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
Forwarding information.
Definition: stasis.c:1531
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
int stasis_init ( void  )

Initialize the Stasis subsystem.

Returns
0 on success.
Non-zero on error.
Since
12

Definition at line 3061 of file stasis.c.

References aco_info_init(), aco_option_register, aco_option_register_custom, aco_process_config(), ACO_PROCESS_ERROR, aco_set_defaults(), AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ao2_global_obj_ref, ao2_global_obj_replace_unref, ao2_ref, ast_cli_register_multiple, ast_multi_user_event_type(), ast_register_cleanup(), AST_VECTOR_INIT, ast_threadpool_options::auto_increment, stasis_config::declined_message_types, FLDSET, ast_threadpool_options::idle_timeout, stasis_threadpool_conf::idle_timeout_sec, ast_threadpool_options::initial_size, stasis_threadpool_conf::initial_size, ast_threadpool_options::max_size, stasis_threadpool_conf::max_size, OPT_INT_T, stasis_cleanup(), STASIS_MESSAGE_TYPE_INIT, stasis_subscription_change_type(), stasis_config::threadpool_options, and ast_threadpool_options::version.

3062 {
3063  struct stasis_config *cfg;
3064  int cache_init;
3065  struct ast_threadpool_options threadpool_opts = { 0, };
3066 #ifdef AST_DEVMODE
3067  struct ao2_container *subscription_stats;
3068  struct ao2_container *topic_stats;
3069 #endif
3070 
3071  /* Be sure the types are cleaned up after the message bus */
3073 
3074  if (aco_info_init(&cfg_info)) {
3075  return -1;
3076  }
3077 
3078  aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
3079  declined_options, "", declined_handler, 0);
3080  aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
3081  threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
3082  FLDSET(struct stasis_threadpool_conf, initial_size), 0,
3083  INT_MAX);
3084  aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
3085  threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
3086  FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
3087  INT_MAX);
3088  aco_option_register(&cfg_info, "max_size", ACO_EXACT,
3089  threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
3090  FLDSET(struct stasis_threadpool_conf, max_size), 0,
3091  INT_MAX);
3092 
3093  if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
3094  struct stasis_config *default_cfg = stasis_config_alloc();
3095 
3096  if (!default_cfg) {
3097  return -1;
3098  }
3099 
3100  if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
3101  ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
3102  ao2_ref(default_cfg, -1);
3103 
3104  return -1;
3105  }
3106 
3107  if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
3108  ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
3109  ao2_ref(default_cfg, -1);
3110 
3111  return -1;
3112  }
3113 
3114  ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
3115  ao2_global_obj_replace_unref(globals, default_cfg);
3116  cfg = default_cfg;
3117  } else {
3118  cfg = ao2_global_obj_ref(globals);
3119  if (!cfg) {
3120  ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
3121 
3122  return -1;
3123  }
3124  }
3125 
3126  threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
3127  threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
3128  threadpool_opts.auto_increment = 1;
3129  threadpool_opts.max_size = cfg->threadpool_options->max_size;
3130  threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
3131  threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
3132  ao2_ref(cfg, -1);
3133  if (!threadpool) {
3134  ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
3135 
3136  return -1;
3137  }
3138 
3139  cache_init = stasis_cache_init();
3140  if (cache_init != 0) {
3141  return -1;
3142  }
3143 
3145  return -1;
3146  }
3148  return -1;
3149  }
3150 
3151  topic_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_ALL_BUCKETS,
3152  topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
3153  if (!topic_all) {
3154  return -1;
3155  }
3156 
3157  if (ast_cli_register_multiple(cli_stasis, ARRAY_LEN(cli_stasis))) {
3158  return -1;
3159  }
3160 
3161 #ifdef AST_DEVMODE
3162  /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
3163  * topic or subscripton.
3164  */
3165  subscription_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
3166  subscription_statistics_hash, 0, subscription_statistics_cmp);
3167  if (!subscription_stats) {
3168  return -1;
3169  }
3170  ao2_global_obj_replace_unref(subscription_statistics, subscription_stats);
3171  ao2_cleanup(subscription_stats);
3172 
3173  topic_stats = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
3174  topic_statistics_hash, 0, topic_statistics_cmp);
3175  if (!topic_stats) {
3176  return -1;
3177  }
3178  ao2_global_obj_replace_unref(topic_statistics, topic_stats);
3179  ao2_cleanup(topic_stats);
3180  if (!topic_stats) {
3181  return -1;
3182  }
3183 
3184  AST_VECTOR_INIT(&message_type_statistics, 0);
3185 
3186  if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
3187  return -1;
3188  }
3189 #endif
3190 
3191  return 0;
3192 }
int auto_increment
Number of threads to increment pool by.
Definition: threadpool.h:90
int idle_timeout
Time limit in seconds for idle threads.
Definition: threadpool.h:79
#define aco_option_register_custom(info, name, matchtype, types, default_val, handler, flags)
Register a config option.
int initial_size
Number of threads the pool will start with.
Definition: threadpool.h:100
static void stasis_cleanup(void)
Cleanup function for graceful shutdowns.
Definition: stasis.c:3042
int max_size
Maximum number of threads a pool may have.
Definition: threadpool.h:110
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
#define aco_option_register(info, name, matchtype, types, default_val, opt_type, flags,...)
Register a config option.
Threadpool configuration options.
Definition: stasis.c:2186
#define ast_cli_register_multiple(e, len)
Register multiple commands.
Definition: cli.h:265
enum aco_process_status aco_process_config(struct aco_info *info, int reload)
Process a config info via the options registered with an aco_info.
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition: astobj2.h:918
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2199
#define FLDSET(type,...)
Convert a struct and list of fields to an argument list of field offsets.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
int aco_info_init(struct aco_info *info)
Initialize an aco_info structure.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
Definition: clicompat.c:19
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
Their was an error and no changes were applied.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
int aco_set_defaults(struct aco_type *type, const char *category, void *obj)
Set all default options of obj.
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2213
struct stasis_message_type * ast_multi_user_event_type(void)
Message type for custom user defined events with multi object blobs.
#define ao2_global_obj_replace_unref(holder, obj)
Replace an ao2 object in the global holder, throwing away any old object.
Definition: astobj2.h:901
struct stasis_threadpool_conf * threadpool_options
Definition: stasis.c:2197
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
Generic container type.
Type for default option handler for signed integers.
static struct ast_threadpool * threadpool
Definition: stasis.c:307
int stasis_message_type_declined ( const char *  name)

Check whether a message type is declined.

Parameters
nameThe name of the message type to check
Return values
zeroThe message type is not declined
non-zeroThe message type is declined

Definition at line 2283 of file stasis.c.

References ao2_global_obj_ref, ao2_ref, ast_debug, stasis_declined_config::declined, stasis_config::declined_message_types, and OBJ_SEARCH_KEY.

Referenced by stasis_message_type_create().

2284 {
2285  struct stasis_config *cfg = ao2_global_obj_ref(globals);
2286  char *name_in_declined;
2287  int res;
2288 
2289  if (!cfg || !cfg->declined_message_types) {
2290  ao2_cleanup(cfg);
2291  return 0;
2292  }
2293 
2294  name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
2295  res = name_in_declined ? 1 : 0;
2296  ao2_cleanup(name_in_declined);
2297  ao2_ref(cfg, -1);
2298  if (res) {
2299  ast_debug(4, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
2300  }
2301  return res;
2302 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_global_obj_ref(holder)
Get a reference to the object stored in the global holder.
Definition: astobj2.h:918
struct ao2_container * declined
Definition: stasis.c:2182
struct stasis_declined_config * declined_message_types
Definition: stasis.c:2199
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ast_debug(level,...)
Log a DEBUG message.
void stasis_publish ( struct stasis_topic topic,
struct stasis_message message 
)

Publish a message to a topic's subscribers.

Parameters
topicTopic.
messageMessage to publish.

This call is asynchronous and will return immediately upon queueing the message for delivery to the topic's subscribers.

Since
12

Definition at line 1511 of file stasis.c.

Referenced by app_send_end_msg(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cel_publish_event(), ast_channel_publish_blob(), ast_channel_publish_cached_blob(), ast_channel_publish_final_snapshot(), ast_channel_publish_snapshot(), ast_device_state_clear_cache(), ast_endpoint_blob_publish(), ast_endpoint_shutdown(), ast_manager_publish_event(), ast_multi_object_blob_single_channel_publish(), ast_publish_device_state_full(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), local_optimization_finished_cb(), local_optimization_started_cb(), manager_mute_mixmonitor(), publish_cluster_discovery_to_stasis_full(), publish_corosync_ping_to_stasis(), publish_hint_change(), publish_hint_remove(), publish_load_message_type(), publish_parked_call(), publish_parked_call_failure(), report_fax_status(), report_receive_fax_status(), report_send_fax_status(), stasis_app_control_publish(), stasis_app_user_event(), stasis_state_publish(), stasis_state_publish_by_id(), stasis_state_remove_publish_by_id(), and stun_monitor_request().

1512 {
1513  publish_msg(topic, message, NULL);
1514 }
void stasis_publish_sync ( struct stasis_subscription sub,
struct stasis_message message 
)

Publish a message to a topic's subscribers, synchronizing on the specified subscriber.

Parameters
subSubscription to synchronize on.
messageMessage to publish.

The caller of stasis_publish_sync will block until the specified subscriber completes handling of the message.

All other subscribers to the topic the stasis_subscription is subscribed to are also delivered the message; this delivery however happens asynchronously.

Since
12.1.0

Definition at line 1516 of file stasis.c.

References stasis_subscription::topic.

Referenced by stasis_message_router_publish_sync().

1517 {
1518  ast_assert(sub != NULL);
1519 
1520  publish_msg(sub->topic, message, sub);
1521 }
struct stasis_topic * topic
Definition: stasis.c:684
void stasis_subscription_accept_formatters ( struct stasis_subscription subscription,
enum stasis_subscription_message_formatters  formatters 
)

Indicate to a subscription that we are interested in messages with one or more formatters.

Parameters
subscriptionSubscription to alter.
formattersA bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.25.0
16.2.0

Definition at line 1093 of file stasis.c.

References stasis_subscription::accepted_formatters, and stasis_subscription::topic.

Referenced by stasis_message_router_accept_formatters(), and stasis_message_router_set_formatters_default().

1095 {
1096  ast_assert(subscription != NULL);
1097 
1098  ao2_lock(subscription->topic);
1099  subscription->accepted_formatters = formatters;
1100  ao2_unlock(subscription->topic);
1101 
1102  return;
1103 }
struct stasis_topic * topic
Definition: stasis.c:684
enum stasis_subscription_message_formatters accepted_formatters
Definition: stasis.c:704
int stasis_subscription_accept_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are interested in a message type.

This will cause the subscription to allow the given message type to be raised to our subscription callback. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
subscriptionSubscription to add message type to.
typeThe message type we wish to receive.
Return values
0on success
-1failure
Since
17.0.0
Note
If you are wanting to use stasis_final_message you will need to accept stasis_subscription_change_type as a message type.
Until the subscription is set to selective filtering it is possible for it to receive messages of message types that would not normally be accepted.

Definition at line 1023 of file stasis.c.

References stasis_subscription::accepted_message_types, AST_VECTOR_REPLACE, stasis_subscription::filter, stasis_message_type_id(), stasis_message_type_name(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.

Referenced by ast_mwi_subscribe_pool(), devstate_init(), load_module(), load_pbx(), rtp_reload(), stasis_caching_accept_message_type(), stasis_message_router_add(), stasis_message_router_add_cache_update(), and xmpp_init_event_distribution().

1025 {
1026  if (!subscription) {
1027  return -1;
1028  }
1029 
1030  ast_assert(type != NULL);
1031  ast_assert(stasis_message_type_name(type) != NULL);
1032 
1033  if (!type || !stasis_message_type_name(type)) {
1034  /* Filtering is unreliable as this message type is not yet initialized
1035  * so force all messages through.
1036  */
1038  return 0;
1039  }
1040 
1041  ao2_lock(subscription->topic);
1042  if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
1043  /* We do this for the same reason as above. The subscription can still operate, so allow
1044  * it to do so by forcing all messages through.
1045  */
1047  }
1048  ao2_unlock(subscription->topic);
1049 
1050  return 0;
1051 }
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_subscription::@400 accepted_message_types
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
struct stasis_topic * topic
Definition: stasis.c:684
enum stasis_subscription_message_filter filter
Definition: stasis.c:706
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
void stasis_subscription_cb_noop ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
)

Stasis subscription callback function that does nothing.

Note
This callback should be used for events are not directly processed, but need to be generated so data can be retrieved from cache later. Subscriptions with this callback can be released with stasis_unsubscribe, even during module unload.
Since
13.5

Definition at line 809 of file stasis.c.

Referenced by build_peer(), and mkintf().

810 {
811 }
int stasis_subscription_decline_message_type ( struct stasis_subscription subscription,
const struct stasis_message_type type 
)

Indicate to a subscription that we are not interested in a message type.

Parameters
subscriptionSubscription to remove message type from.
typeThe message type we don't wish to receive.
Return values
0on success
-1failure
Since
17.0.0

Definition at line 1053 of file stasis.c.

References stasis_subscription::accepted_message_types, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, stasis_message_type_id(), stasis_message_type_name(), and stasis_subscription::topic.

1055 {
1056  if (!subscription) {
1057  return -1;
1058  }
1059 
1060  ast_assert(type != NULL);
1061  ast_assert(stasis_message_type_name(type) != NULL);
1062 
1063  if (!type || !stasis_message_type_name(type)) {
1064  return 0;
1065  }
1066 
1067  ao2_lock(subscription->topic);
1068  if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
1069  /* The memory is already allocated so this can't fail */
1071  }
1072  ao2_unlock(subscription->topic);
1073 
1074  return 0;
1075 }
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
struct stasis_subscription::@400 accepted_message_types
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
struct stasis_topic * topic
Definition: stasis.c:684
#define AST_VECTOR_REPLACE(vec, idx, elem)
Replace an element at a specific position in a vector, growing the vector if needed.
Definition: vector.h:284
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
int stasis_subscription_final_message ( struct stasis_subscription sub,
struct stasis_message msg 
)

Determine whether a message is the final message to be received on a subscription.

Parameters
subSubscription on which the message was received.
msgMessage to check.
Returns
zero if the provided message is not the final message.
non-zero if the provided message is the final message.
Since
12

Definition at line 1174 of file stasis.c.

References stasis_subscription_change::description, stasis_message_data(), stasis_message_type(), stasis_subscription_change_type(), stasis_subscription_uniqueid(), and stasis_subscription_change::uniqueid.

Referenced by message_sink_cb(), and subscription_invoke().

1175 {
1176  struct stasis_subscription_change *change;
1177 
1179  return 0;
1180  }
1181 
1182  change = stasis_message_data(msg);
1183  if (strcmp("Unsubscribe", change->description)) {
1184  return 0;
1185  }
1186 
1187  if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
1188  return 0;
1189  }
1190 
1191  return 1;
1192 }
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1169
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
Holds details about changes to subscriptions for the specified topic.
Definition: stasis.h:890
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
int stasis_subscription_is_done ( struct stasis_subscription subscription)

Returns whether subscription has received its final message.

Note that a subscription is considered done even while the stasis_subscription_final_message() is being processed. This allows cleanup routines to check the status of the subscription.

Parameters
subscriptionSubscription.
Returns
True (non-zero) if stasis_subscription_final_message() has been received.
False (zero) if waiting for the end.

Definition at line 1118 of file stasis.c.

References stasis_subscription::final_message_rxed.

Referenced by stasis_message_router_is_done().

1119 {
1120  if (subscription) {
1121  int ret;
1122 
1123  ao2_lock(subscription);
1124  ret = subscription->final_message_rxed;
1125  ao2_unlock(subscription);
1126 
1127  return ret;
1128  }
1129 
1130  /* Null subscription is about as done as you can get */
1131  return 1;
1132 }
int final_message_rxed
Definition: stasis.c:696
int stasis_subscription_is_subscribed ( const struct stasis_subscription sub)

Returns whether a subscription is currently subscribed.

Note that there may still be messages queued up to be dispatched to this subscription, but the stasis_subscription_final_message() has been enqueued.

Parameters
subSubscription to check
Returns
False (zero) if subscription is not subscribed.
True (non-zero) if still subscribed.

Definition at line 1150 of file stasis.c.

References AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_topic::subscribers, and stasis_subscription::topic.

Referenced by stasis_caching_unsubscribe(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

1151 {
1152  if (sub) {
1153  size_t i;
1154  struct stasis_topic *topic = sub->topic;
1155 
1156  ao2_lock(topic);
1157  for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
1158  if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
1159  ao2_unlock(topic);
1160  return 1;
1161  }
1162  }
1163  ao2_unlock(topic);
1164  }
1165 
1166  return 0;
1167 }
struct stasis_topic * topic
Definition: stasis.c:684
struct stasis_topic::@398 subscribers
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
void stasis_subscription_join ( struct stasis_subscription subscription)

Block until the last message is processed on a subscription.

This function will not return until the subscription's callback for the stasis_subscription_final_message() completes. This allows cleanup routines to run before unblocking the joining thread.

Parameters
subscriptionSubscription to block on.
Since
12

Definition at line 1105 of file stasis.c.

References ao2_object_get_lockaddr(), stasis_subscription::final_message_processed, and stasis_subscription::join_cond.

Referenced by stasis_caching_unsubscribe_and_join(), and stasis_unsubscribe_and_join().

1106 {
1107  if (subscription) {
1108  ao2_lock(subscription);
1109  /* Wait until the processed flag has been set */
1110  while (!subscription->final_message_processed) {
1111  ast_cond_wait(&subscription->join_cond,
1112  ao2_object_get_lockaddr(subscription));
1113  }
1114  ao2_unlock(subscription);
1115  }
1116 }
ast_cond_t join_cond
Definition: stasis.c:693
int final_message_processed
Definition: stasis.c:699
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
Definition: astobj2.c:476
int stasis_subscription_set_congestion_limits ( struct stasis_subscription subscription,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the stasis subscription.

Since
13.10.0
Parameters
subscriptionPointer to a stasis subscription
low_waterNew queue low water mark. (-1 to set as 90% of high_water)
high_waterNew queue high water mark.
Return values
0on success.
-1on error (water marks not changed).

Definition at line 1011 of file stasis.c.

References ast_taskprocessor_alert_set_levels(), and stasis_subscription::mailbox.

Referenced by stasis_message_router_set_congestion_limits().

1013 {
1014  int res = -1;
1015 
1016  if (subscription) {
1017  res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
1018  low_water, high_water);
1019  }
1020  return res;
1021 }
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
struct ast_taskprocessor * mailbox
Definition: stasis.c:686
int stasis_subscription_set_filter ( struct stasis_subscription subscription,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a subscription.

This will cause the subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
subscriptionSubscription that should receive all messages.
filterWhat filter to use
Return values
0on success
-1failure
Since
17.0.0

Definition at line 1077 of file stasis.c.

References stasis_subscription::filter, filter(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.

Referenced by ast_mwi_subscribe_pool(), devstate_init(), load_module(), load_pbx(), rtp_reload(), stasis_caching_set_filter(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_set_formatters_default(), and xmpp_init_event_distribution().

1079 {
1080  if (!subscription) {
1081  return -1;
1082  }
1083 
1084  ao2_lock(subscription->topic);
1085  if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
1086  subscription->filter = filter;
1087  }
1088  ao2_unlock(subscription->topic);
1089 
1090  return 0;
1091 }
struct stasis_topic * topic
Definition: stasis.c:684
enum stasis_subscription_message_filter filter
Definition: stasis.c:706
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
const char* stasis_subscription_uniqueid ( const struct stasis_subscription sub)

Get the unique ID for the subscription.

Parameters
subSubscription for which to get the unique ID.
Returns
Unique ID for the subscription.
Since
12

Definition at line 1169 of file stasis.c.

References stasis_subscription::uniqueid.

Referenced by stasis_subscription_final_message(), and topic_add_subscription().

1170 {
1171  return sub->uniqueid;
1172 }
struct stasis_topic* stasis_topic_create ( const char *  name)

Create a new topic.

Parameters
nameName of the new topic.
Returns
New topic instance.
Return values
NULLon error.
Since
12
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.
Topic names should be in the form of
<subsystem>:<functionality>[/<object>] 

Definition at line 617 of file stasis.c.

References stasis_topic_create_with_detail().

Referenced by app_create(), app_init(), ast_parking_stasis_init(), ast_rtp_engine_init(), ast_security_stasis_init(), ast_stasis_channels_init(), ast_stasis_system_init(), ast_test_init(), create_subscriptions(), devstate_init(), stasis_caching_topic_create(), stasis_cp_all_create(), stasis_cp_sink_create(), stasis_state_manager_create(), and stasis_topic_pool_get_topic().

618 {
620 }
struct stasis_topic * stasis_topic_create_with_detail(const char *name, const char *detail)
Create a new topic with given detail.
Definition: stasis.c:568
char * name
Definition: stasis.c:386
struct stasis_topic* stasis_topic_create_with_detail ( const char *  name,
const char *  detail 
)

Create a new topic with given detail.

Parameters
nameName of the new topic.
detailDetail description of the new topic. i.e. "Queue main topic for subscribing every queue event"
Returns
New topic instance.
Return values
NULLon error.
Note
There is no explicit ability to unsubscribe all subscribers from a topic and destroy it. As a result the topic can persist until the last subscriber unsubscribes itself even if there is no publisher.

Definition at line 568 of file stasis.c.

References ao2_ref, ast_debug, AST_VECTOR_INIT, INITIAL_SUBSCRIBERS_MAX, stasis_topic::name, stasis_topic_get(), stasis_topic::subscribers, and stasis_topic::upstream_topics.

Referenced by stasis_topic_create().

571 {
572  struct stasis_topic *topic;
573  int res = 0;
574 
575  if (!name|| !strlen(name) || !detail) {
576  return NULL;
577  }
578  ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
579 
580  topic = stasis_topic_get(name);
581  if (topic) {
582  ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
583  name, detail);
584  return topic;
585  }
586 
587  topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
588  if (!topic) {
589  return NULL;
590  }
591 
593  res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
594  if (res) {
595  ao2_ref(topic, -1);
596  return NULL;
597  }
598 
599  /* link to the proxy */
600  if (link_topic_proxy(topic, name, detail)) {
601  ao2_ref(topic, -1);
602  return NULL;
603  }
604 
605 #ifdef AST_DEVMODE
606  topic->statistics = stasis_topic_statistics_create(topic);
607  if (!topic->statistics) {
608  ao2_ref(topic, -1);
609  return NULL;
610  }
611 #endif
612  ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
613 
614  return topic;
615 }
char * detail
Definition: stasis.c:389
struct stasis_topic::@399 upstream_topics
struct stasis_topic::@398 subscribers
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ast_debug(level,...)
Log a DEBUG message.
#define INITIAL_SUBSCRIBERS_MAX
Definition: stasis.c:301
struct stasis_topic * stasis_topic_get(const char *name)
Get a topic of the given name.
Definition: stasis.c:622
char * name
Definition: stasis.c:386
const char* stasis_topic_detail ( const struct stasis_topic topic)

Return the detail of a topic.

Parameters
topicTopic.
Returns
Detail of the topic.
Return values
NULLif topic is NULL.
Since
12

Definition at line 635 of file stasis.c.

References stasis_topic::detail.

636 {
637  if (!topic) {
638  return NULL;
639  }
640  return topic->detail;
641 }
char * detail
Definition: stasis.c:389
struct stasis_topic* stasis_topic_get ( const char *  name)

Get a topic of the given name.

Parameters
nameTopic's name.
Returns
Name of the topic.
Return values
NULLon error or not exist.
Note
This SHOULD NOT be used in normal operation for publishing messages.

Definition at line 622 of file stasis.c.

References ao2_weakproxy_find, and OBJ_SEARCH_KEY.

Referenced by stasis_topic_create_with_detail().

623 {
624  return ao2_weakproxy_find(topic_all, name, OBJ_SEARCH_KEY, "");
625 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object...
Definition: astobj2.h:1748
char * name
Definition: stasis.c:386
const char* stasis_topic_name ( const struct stasis_topic topic)

Return the name of a topic.

Parameters
topicTopic.
Returns
Name of the topic.
Return values
NULLif topic is NULL.

Definition at line 627 of file stasis.c.

References stasis_topic::name.

Referenced by internal_stasis_subscribe(), stasis_caching_topic_create(), stasis_state_add_publisher(), stasis_state_add_subscriber(), stasis_state_manager_create(), stasis_state_subscribe_pool(), stasis_topic_pool_create(), stasis_topic_pool_delete_topic(), stasis_topic_pool_get_topic(), and topic_add_subscription().

628 {
629  if (!topic) {
630  return NULL;
631  }
632  return topic->name;
633 }
char * name
Definition: stasis.c:386
struct stasis_topic_pool* stasis_topic_pool_create ( struct stasis_topic pooled_topic)

Create a topic pool that routes messages from dynamically generated topics to the given topic.

Parameters
pooled_topicTopic to which messages will be routed
Returns
the new stasis_topic_pool
Return values
NULLon failure

Definition at line 1833 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_MUTEX, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_container_alloc_hash, ao2_container_register(), ao2_ref, ast_alloca, pool, stasis_topic_name(), and TOPIC_POOL_BUCKETS.

Referenced by app_init(), and devstate_init().

1834 {
1835  struct stasis_topic_pool *pool;
1836 
1837  pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
1838  if (!pool) {
1839  return NULL;
1840  }
1841 
1842  pool->pool_container = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
1843  TOPIC_POOL_BUCKETS, topic_pool_entry_hash, NULL, topic_pool_entry_cmp);
1844  if (!pool->pool_container) {
1845  ao2_cleanup(pool);
1846  return NULL;
1847  }
1848 
1849 #ifdef AO2_DEBUG
1850  {
1851  char *container_name =
1852  ast_alloca(strlen(stasis_topic_name(pooled_topic)) + strlen("-pool") + 1);
1853  sprintf(container_name, "%s-pool", stasis_topic_name(pooled_topic));
1854  ao2_container_register(container_name, pool->pool_container, topic_pool_prnt_obj);
1855  }
1856 #endif
1857 
1858  ao2_ref(pooled_topic, +1);
1859  pool->pool_topic = pooled_topic;
1860 
1861  return pool;
1862 }
#define TOPIC_POOL_BUCKETS
Definition: stasis.c:304
static pj_pool_t * pool
Global memory pool for configuration and timers.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
Definition: astmm.h:288
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
void stasis_topic_pool_delete_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Delete a topic from the topic pool.

Parameters
poolPool from which to delete the topic
topic_nameName of the topic to delete in the form of
[<pool_topic_name>/]<topic_name> 
Since
13.24
15.6
16.1

Definition at line 1864 of file stasis.c.

References OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, and stasis_topic_name().

1865 {
1866  /*
1867  * The topic_name passed in could be a fully-qualified name like <pool_topic_name>/<topic_name>
1868  * or just <topic_name>. If it's fully qualified, we need to skip past <pool_topic_name>
1869  * name and search only on <topic_name>.
1870  */
1871  const char *pool_topic_name = stasis_topic_name(pool->pool_topic);
1872  int pool_topic_name_len = strlen(pool_topic_name);
1873  const char *search_topic_name;
1874 
1875  if (strncmp(pool_topic_name, topic_name, pool_topic_name_len) == 0) {
1876  search_topic_name = topic_name + pool_topic_name_len + 1;
1877  } else {
1878  search_topic_name = topic_name;
1879  }
1880 
1881  ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
1882 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
struct stasis_topic* stasis_topic_pool_get_topic ( struct stasis_topic_pool pool,
const char *  topic_name 
)

Find or create a topic in the pool.

Parameters
poolPool for which to get the topic
topic_nameName of the topic to get
Returns
The already stored or newly allocated topic
Return values
NULLif the topic was not found and could not be allocated

Definition at line 1884 of file stasis.c.

References ao2_link_flags, ast_asprintf, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, SCOPED_AO2LOCK, stasis_forward_all(), stasis_topic_create(), and stasis_topic_name().

Referenced by ast_device_state_topic(), and ast_queue_topic().

1885 {
1886  RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
1887  SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
1888  char *new_topic_name;
1889  int ret;
1890 
1891  topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1892  if (topic_pool_entry) {
1893  return topic_pool_entry->topic;
1894  }
1895 
1896  topic_pool_entry = topic_pool_entry_alloc(topic_name);
1897  if (!topic_pool_entry) {
1898  return NULL;
1899  }
1900 
1901  /* To provide further detail and to ensure that the topic is unique within the scope of the
1902  * system we prefix it with the pooling topic name, which should itself already be unique.
1903  */
1904  ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
1905  if (ret < 0) {
1906  return NULL;
1907  }
1908 
1909  topic_pool_entry->topic = stasis_topic_create(new_topic_name);
1910  ast_free(new_topic_name);
1911  if (!topic_pool_entry->topic) {
1912  return NULL;
1913  }
1914 
1915  topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
1916  if (!topic_pool_entry->forward) {
1917  return NULL;
1918  }
1919 
1920  if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
1921  return NULL;
1922  }
1923 
1924  return topic_pool_entry->topic;
1925 }
Definition: stasis.c:1709
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
int stasis_topic_pool_topic_exists ( const struct stasis_topic_pool pool,
const char *  topic_name 
)

Check if a topic exists in a pool.

Parameters
poolPool to check
topic_nameName of the topic to check
Return values
1exists
0does not exist
Since
13.23.0

Definition at line 1927 of file stasis.c.

References ao2_ref, and OBJ_SEARCH_KEY.

Referenced by ast_publish_device_state_full().

1928 {
1930 
1931  topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY);
1932  if (!topic_pool_entry) {
1933  return 0;
1934  }
1935 
1936  ao2_ref(topic_pool_entry, -1);
1937  return 1;
1938 }
Definition: stasis.c:1709
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
size_t stasis_topic_subscribers ( const struct stasis_topic topic)

Return the number of subscribers of a topic.

Parameters
topicTopic.
Returns
Number of subscribers of the topic.
Since
17.0.0

Definition at line 643 of file stasis.c.

References AST_VECTOR_SIZE, and stasis_topic::subscribers.

644 {
645  return AST_VECTOR_SIZE(&topic->subscribers);
646 }
struct stasis_topic::@398 subscribers
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
struct stasis_subscription* stasis_unsubscribe ( struct stasis_subscription subscription)

Cancel a subscription.

Note that in an asynchronous system, there may still be messages queued or in transit to the subscription's callback. These will still be delivered. There will be a final 'SubscriptionCancelled' message, indicating the delivery of the final message.

Parameters
subscriptionSubscription to cancel.
Return values
NULLfor convenience
Since
12

Definition at line 971 of file stasis.c.

References ao2_bump, ast_taskprocessor_push(), stasis_subscription::mailbox, and stasis_subscription::topic.

Referenced by stasis_caching_unsubscribe(), stasis_message_router_unsubscribe(), stasis_state_unsubscribe(), stasis_unsubscribe_and_join(), and xmpp_init_event_distribution().

972 {
973  /* The subscription may be the last ref to this topic. Hold
974  * the topic ref open until after the unlock. */
975  struct stasis_topic *topic;
976 
977  if (!sub) {
978  return NULL;
979  }
980 
981  topic = ao2_bump(sub->topic);
982 
983  /* We have to remove the subscription first, to ensure the unsubscribe
984  * is the final message */
985  if (topic_remove_subscription(sub->topic, sub) != 0) {
986  ast_log(LOG_ERROR,
987  "Internal error: subscription has invalid topic\n");
988  ao2_cleanup(topic);
989 
990  return NULL;
991  }
992 
993  /* Now let everyone know about the unsubscribe */
994  send_subscription_unsubscribe(topic, sub);
995 
996  /* When all that's done, remove the ref the mailbox has on the sub */
997  if (sub->mailbox) {
998  if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
999  /* Nothing we can do here, the conditional is just to keep
1000  * the compiler happy that we're not ignoring the result. */
1001  }
1002  }
1003 
1004  /* Unsubscribing unrefs the subscription */
1005  ao2_cleanup(sub);
1006  ao2_cleanup(topic);
1007 
1008  return NULL;
1009 }
struct stasis_topic * topic
Definition: stasis.c:684
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
struct ast_taskprocessor * mailbox
Definition: stasis.c:686
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
struct stasis_subscription* stasis_unsubscribe_and_join ( struct stasis_subscription subscription)

Cancel a subscription, blocking until the last message is processed.

While normally it's recommended to stasis_unsubscribe() and wait for stasis_subscription_final_message(), there are times (like during a module unload) where you have to wait for the final message (otherwise you'll call a function in a shared module that no longer exists).

Parameters
subscriptionSubscription to cancel.
Return values
NULLfor convenience
Since
12

Definition at line 1134 of file stasis.c.

References ao2_ref, stasis_subscription_join(), and stasis_unsubscribe().

Referenced by ast_xmpp_client_disconnect(), rtp_reload(), stasis_message_router_unsubscribe_and_join(), stasis_state_unsubscribe_and_join(), and unload_module().

1136 {
1137  if (!subscription) {
1138  return NULL;
1139  }
1140 
1141  /* Bump refcount to hold it past the unsubscribe */
1142  ao2_ref(subscription, +1);
1143  stasis_unsubscribe(subscription);
1144  stasis_subscription_join(subscription);
1145  /* Now decrement the refcount back */
1146  ao2_cleanup(subscription);
1147  return NULL;
1148 }
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *sub)
Cancel a subscription.
Definition: stasis.c:971
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1105
static void subscription_invoke ( struct stasis_subscription sub,
struct stasis_message message 
)
static

Invoke the subscription's callback.

Parameters
subSubscription to invoke.
messageMessage to send.

Definition at line 754 of file stasis.c.

References stasis_subscription::accepted_message_types, ast_tvdiff_ms(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::filter, stasis_subscription::final_message_processed, stasis_subscription::final_message_rxed, stasis_subscription::join_cond, stasis_message_type(), stasis_message_type_id(), stasis_subscription_change_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, and stasis_subscription_final_message().

756 {
757  unsigned int final = stasis_subscription_final_message(sub, message);
759 #ifdef AST_DEVMODE
760  struct timeval start;
761  long elapsed;
762 
763  start = ast_tvnow();
764 #endif
765 
766  /* Notify that the final message has been received */
767  if (final) {
768  ao2_lock(sub);
769  sub->final_message_rxed = 1;
770  ast_cond_signal(&sub->join_cond);
771  ao2_unlock(sub);
772  }
773 
774  /*
775  * If filtering is turned on and this is a 'final' message, we only invoke the callback
776  * if the subscriber accepts subscription_change message types.
777  */
778  if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
779  (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
780  /* Since sub is mostly immutable, no need to lock sub */
781  sub->callback(sub->data, sub, message);
782  }
783 
784  /* Notify that the final message has been processed */
785  if (final) {
786  ao2_lock(sub);
787  sub->final_message_processed = 1;
788  ast_cond_signal(&sub->join_cond);
789  ao2_unlock(sub);
790  }
791 
792 #ifdef AST_DEVMODE
793  elapsed = ast_tvdiff_ms(ast_tvnow(), start);
794  if (elapsed > sub->statistics->highest_time_invoked) {
795  sub->statistics->highest_time_invoked = elapsed;
796  ao2_lock(sub->statistics);
797  sub->statistics->highest_time_message_type = stasis_message_type(message);
798  ao2_unlock(sub->statistics);
799  }
800  if (elapsed < sub->statistics->lowest_time_invoked) {
801  sub->statistics->lowest_time_invoked = elapsed;
802  }
803 #endif
804 }
struct stasis_subscription::@400 accepted_message_types
int stasis_message_type_id(const struct stasis_message_type *type)
Gets the id of a given message type.
ast_cond_t join_cond
Definition: stasis.c:693
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:107
int final_message_processed
Definition: stasis.c:699
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1174
int final_message_rxed
Definition: stasis.c:696
stasis_subscription_cb callback
Definition: stasis.c:688
enum stasis_subscription_message_filter filter
Definition: stasis.c:706
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
static int topic_add_subscription ( struct stasis_topic topic,
struct stasis_subscription sub 
)
static

Add a subscriber to a topic.

Parameters
topicTopic
subSubscriber
Returns
0 on success
Non-zero on error

Definition at line 1201 of file stasis.c.

References ast_str_container_add(), AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription_uniqueid(), stasis_topic_name(), stasis_topic::subscribers, and stasis_topic::upstream_topics.

Referenced by internal_stasis_subscribe(), and stasis_forward_all().

1202 {
1203  size_t idx;
1204 
1205  ao2_lock(topic);
1206  /* The reference from the topic to the subscription is shared with
1207  * the owner of the subscription, which will explicitly unsubscribe
1208  * to release it.
1209  *
1210  * If we bumped the refcount here, the owner would have to unsubscribe
1211  * and cleanup, which is a bit awkward. */
1212  AST_VECTOR_APPEND(&topic->subscribers, sub);
1213 
1214  for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
1216  AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
1217  }
1218 
1219 #ifdef AST_DEVMODE
1221  ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
1222 #endif
1223 
1224  ao2_unlock(topic);
1225 
1226  return 0;
1227 }
struct stasis_topic::@399 upstream_topics
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
Definition: stasis.c:1169
struct stasis_topic::@398 subscribers
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1201
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
Definition: strings.c:205

Variable Documentation

struct ast_cli_entry cli_stasis[]
static
Initial value:
= {
{ .handler = stasis_show_topics , .summary = "Show all topics" ,},
{ .handler = stasis_show_topic , .summary = "Show topic" ,},
}

Definition at line 2476 of file stasis.c.

struct aco_file stasis_conf
Initial value:
= {
.filename = "stasis.conf",
.types = ACO_TYPES(&declined_option, &threadpool_option),
}
#define ACO_TYPES(...)
A helper macro to ensure that aco_info types always have a sentinel.
static struct aco_type declined_option
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config typ...
Definition: stasis.c:2213

Definition at line 2223 of file stasis.c.

struct ast_threadpool* threadpool
static

Thread pool for topics that don't want a dedicated taskprocessor

Definition at line 307 of file stasis.c.