Asterisk - The Open Source Telephony Project
21.4.1
|
Stasis Message Bus API. More...
#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/config_options.h"
#include "asterisk/cli.h"
Go to the source code of this file.
Data Structures | |
struct | ast_multi_object_blob |
A multi object blob data structure to carry user event stasis messages. More... | |
struct | stasis_config |
struct | stasis_declined_config |
A structure to hold global configuration-related options. More... | |
struct | stasis_forward |
Forwarding information. More... | |
struct | stasis_subscription |
struct | stasis_threadpool_conf |
Threadpool configuration options. More... | |
struct | stasis_topic |
struct | stasis_topic_pool |
struct | sync_task_data |
struct | topic_pool_entry |
struct | topic_proxy |
Macros | |
#define | FMT_FIELDS "%-64s %-64s\n" |
#define | FMT_HEADERS "%-64s %-64s\n" |
#define | INITIAL_SUBSCRIBERS_MAX 4 |
#define | TOPIC_ALL_BUCKETS 997 |
#define | topic_lock_both(topic1, topic2) |
Lock two topics. | |
#define | TOPIC_POOL_BUCKETS 57 |
Functions | |
struct stasis_subscription * | __stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func) |
Create a subscription. More... | |
struct stasis_subscription * | __stasis_subscribe_pool (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func) |
Create a subscription whose callbacks occur on a thread pool. More... | |
static | AO2_GLOBAL_OBJ_STATIC (globals) |
A global object container that will contain the stasis_config that gets swapped out on reloads. | |
AO2_STRING_FIELD_CASE_SORT_FN (topic_proxy, name) | |
AO2_STRING_FIELD_CMP_FN (topic_proxy, name) | |
AO2_STRING_FIELD_HASH_FN (topic_proxy, name) | |
void | ast_multi_object_blob_add (struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object) |
Add an object (snapshot) to the blob. More... | |
struct ast_multi_object_blob * | ast_multi_object_blob_create (struct ast_json *blob) |
Create a stasis user event multi object blob. More... | |
void | ast_multi_object_blob_single_channel_publish (struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob) |
Publish a single channel user event (for app_userevent compatibility). More... | |
CONFIG_INFO_CORE ("stasis", cfg_info, globals, stasis_config_alloc,.files=ACO_FILES(&stasis_conf),) | |
Register information about the configs being processed by this module. | |
static int | declined_handler (const struct aco_option *opt, struct ast_variable *var, void *obj) |
static int | dispatch_exec_async (struct ast_taskprocessor_local *local) |
static int | dispatch_exec_sync (struct ast_taskprocessor_local *local) |
static unsigned int | dispatch_message (struct stasis_subscription *sub, struct stasis_message *message, int synchronous) |
static void | forward_dtor (void *obj) |
struct stasis_subscription * | internal_stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func) |
Create a subscription. More... | |
static int | link_topic_proxy (struct stasis_topic *topic, const char *name, const char *detail) |
static void | multi_object_blob_dtor (void *obj) |
static struct ast_str * | multi_object_blob_to_ami (void *obj) |
static struct ast_manager_event_blob * | multi_user_event_to_ami (struct stasis_message *message) |
static struct ast_json * | multi_user_event_to_json (struct stasis_message *message, const struct stasis_message_sanitizer *sanitize) |
static void | proxy_dtor (void *weakproxy, void *container) |
static void | publish_msg (struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub) |
static void | send_subscription_subscribe (struct stasis_topic *topic, struct stasis_subscription *sub) |
static void | send_subscription_unsubscribe (struct stasis_topic *topic, struct stasis_subscription *sub) |
static void | stasis_cleanup (void) |
Cleanup function for graceful shutdowns. | |
static void * | stasis_config_alloc (void) |
static void | stasis_config_destructor (void *obj) |
static void | stasis_declined_config_destructor (void *obj) |
struct stasis_forward * | stasis_forward_all (struct stasis_topic *from_topic, struct stasis_topic *to_topic) |
Create a subscription which forwards all messages from one topic to another. More... | |
struct stasis_forward * | stasis_forward_cancel (struct stasis_forward *forward) |
int | stasis_init (void) |
Initialize the Stasis subsystem. More... | |
void | stasis_log_bad_type_access (const char *name) |
int | stasis_message_type_declined (const char *name) |
Check whether a message type is declined. More... | |
STASIS_MESSAGE_TYPE_DEFN (stasis_subscription_change_type) | |
void | stasis_publish (struct stasis_topic *topic, struct stasis_message *message) |
Publish a message to a topic's subscribers. More... | |
void | stasis_publish_sync (struct stasis_subscription *sub, struct stasis_message *message) |
Publish a message to a topic's subscribers, synchronizing on the specified subscriber. More... | |
static char * | stasis_show_topic (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) |
static char * | stasis_show_topics (struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) |
void | stasis_subscription_accept_formatters (struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters) |
Indicate to a subscription that we are interested in messages with one or more formatters. More... | |
int | stasis_subscription_accept_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type) |
Indicate to a subscription that we are interested in a message type. More... | |
void | stasis_subscription_cb_noop (void *data, struct stasis_subscription *sub, struct stasis_message *message) |
Stasis subscription callback function that does nothing. More... | |
int | stasis_subscription_decline_message_type (struct stasis_subscription *subscription, const struct stasis_message_type *type) |
Indicate to a subscription that we are not interested in a message type. More... | |
int | stasis_subscription_final_message (struct stasis_subscription *sub, struct stasis_message *msg) |
Determine whether a message is the final message to be received on a subscription. More... | |
int | stasis_subscription_is_done (struct stasis_subscription *subscription) |
Returns whether subscription has received its final message. More... | |
int | stasis_subscription_is_subscribed (const struct stasis_subscription *sub) |
Returns whether a subscription is currently subscribed. More... | |
void | stasis_subscription_join (struct stasis_subscription *subscription) |
Block until the last message is processed on a subscription. More... | |
int | stasis_subscription_set_congestion_limits (struct stasis_subscription *subscription, long low_water, long high_water) |
Set the high and low alert water marks of the stasis subscription. More... | |
int | stasis_subscription_set_filter (struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter) |
Set the message type filtering level on a subscription. More... | |
const char * | stasis_subscription_uniqueid (const struct stasis_subscription *sub) |
Get the unique ID for the subscription. More... | |
struct stasis_topic * | stasis_topic_create (const char *name) |
Create a new topic. More... | |
struct stasis_topic * | stasis_topic_create_with_detail (const char *name, const char *detail) |
Create a new topic with given detail. More... | |
const char * | stasis_topic_detail (const struct stasis_topic *topic) |
Return the detail of a topic. More... | |
struct stasis_topic * | stasis_topic_get (const char *name) |
Get a topic of the given name. More... | |
const char * | stasis_topic_name (const struct stasis_topic *topic) |
Return the name of a topic. More... | |
struct stasis_topic_pool * | stasis_topic_pool_create (struct stasis_topic *pooled_topic) |
Create a topic pool that routes messages from dynamically generated topics to the given topic. More... | |
void | stasis_topic_pool_delete_topic (struct stasis_topic_pool *pool, const char *topic_name) |
Delete a topic from the topic pool. More... | |
struct stasis_topic * | stasis_topic_pool_get_topic (struct stasis_topic_pool *pool, const char *topic_name) |
Find or create a topic in the pool. More... | |
int | stasis_topic_pool_topic_exists (const struct stasis_topic_pool *pool, const char *topic_name) |
Check if a topic exists in a pool. More... | |
size_t | stasis_topic_subscribers (const struct stasis_topic *topic) |
Return the number of subscribers of a topic. More... | |
struct stasis_subscription * | stasis_unsubscribe (struct stasis_subscription *sub) |
Cancel a subscription. More... | |
struct stasis_subscription * | stasis_unsubscribe_and_join (struct stasis_subscription *subscription) |
Cancel a subscription, blocking until the last message is processed. More... | |
static int | sub_cleanup (void *data) |
static struct stasis_subscription_change * | subscription_change_alloc (struct stasis_topic *topic, const char *uniqueid, const char *description) |
static void | subscription_change_dtor (void *obj) |
static void | subscription_dtor (void *obj) |
static void | subscription_invoke (struct stasis_subscription *sub, struct stasis_message *message) |
Invoke the subscription's callback. More... | |
static int | topic_add_subscription (struct stasis_topic *topic, struct stasis_subscription *sub) |
Add a subscriber to a topic. More... | |
static char * | topic_complete_name (const char *word) |
static void | topic_dtor (void *obj) |
static void | topic_pool_dtor (void *obj) |
static struct topic_pool_entry * | topic_pool_entry_alloc (const char *topic_name) |
static int | topic_pool_entry_cmp (void *obj, void *arg, int flags) |
static void | topic_pool_entry_dtor (void *obj) |
static int | topic_pool_entry_hash (const void *obj, const int flags) |
static int | topic_remove_subscription (struct stasis_topic *topic, struct stasis_subscription *sub) |
static int | userevent_exclusion_cb (const char *key) |
STASIS_MESSAGE_TYPE_DEFN (ast_multi_user_event_type,.to_json=multi_user_event_to_json,.to_ami=multi_user_event_to_ami,) | |
Define multi user event message type(s). | |
Variables | |
static struct ast_cli_entry | cli_stasis [] |
static struct aco_type | declined_option |
An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type. | |
struct aco_type * | declined_options [] = ACO_TYPES(&declined_option) |
struct aco_file | stasis_conf |
static struct ast_threadpool * | threadpool |
static struct aco_type | threadpool_option |
static struct aco_type * | threadpool_options [] = ACO_TYPES(&threadpool_option) |
struct ao2_container * | topic_all |
Stasis Message Bus API.
Definition in file stasis.c.
#define INITIAL_SUBSCRIBERS_MAX 4 |
Initial size of the subscribers list.
Definition at line 301 of file stasis.c.
Referenced by stasis_topic_create_with_detail().
#define TOPIC_POOL_BUCKETS 57 |
The number of buckets to use for topic pools.
Definition at line 304 of file stasis.c.
Referenced by stasis_topic_pool_create().
struct stasis_subscription* __stasis_subscribe | ( | struct stasis_topic * | topic, |
stasis_subscription_cb | callback, | ||
void * | data, | ||
const char * | file, | ||
int | lineno, | ||
const char * | func | ||
) |
Create a subscription.
In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().
The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.
topic | Topic to subscribe to. |
callback | Callback function for subscription messages. |
data | Data to be passed to the callback, in addition to the message. |
file,lineno,func |
NULL | on error. |
Definition at line 942 of file stasis.c.
References internal_stasis_subscribe().
struct stasis_subscription* __stasis_subscribe_pool | ( | struct stasis_topic * | topic, |
stasis_subscription_cb | callback, | ||
void * | data, | ||
const char * | file, | ||
int | lineno, | ||
const char * | func | ||
) |
Create a subscription whose callbacks occur on a thread pool.
In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().
The invocations of the callback are serialized, but will almost certainly not always happen on the same thread. The invocation order of different subscriptions is unspecified.
Unlike stasis_subscribe, this function will explicitly use a threadpool to dispatch items to its callback. This form of subscription should be used when many subscriptions may be made to the specified topic.
topic | Topic to subscribe to. |
callback | Callback function for subscription messages. |
data | Data to be passed to the callback, in addition to the message. |
file,lineno,func |
NULL | on error. |
Definition at line 953 of file stasis.c.
References internal_stasis_subscribe().
struct stasis_subscription* internal_stasis_subscribe | ( | struct stasis_topic * | topic, |
stasis_subscription_cb | callback, | ||
void * | data, | ||
int | needs_mailbox, | ||
int | use_thread_pool, | ||
const char * | file, | ||
int | lineno, | ||
const char * | func | ||
) |
Create a subscription.
In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().
The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.
Note: modules outside of Stasis should use stasis_subscribe.
topic | Topic to subscribe to. |
callback | Callback function for subscription messages. |
data | Data to be passed to the callback, in addition to the message. |
needs_mailbox | Determines whether or not the subscription requires a mailbox. Subscriptions with mailboxes will be delivered on some non-publisher thread; subscriptions without mailboxes will be delivered on the publisher thread. |
use_thread_pool | Use the thread pool for the subscription. This is only relevant if needs_mailbox is non-zero. |
file,lineno,func |
NULL | on error. |
Definition at line 856 of file stasis.c.
References stasis_subscription::accepted_formatters, stasis_subscription::accepted_message_types, ao2_ref, ast_asprintf, ast_atomic_fetchadd_int(), ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_set_local(), AST_VECTOR_INIT, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::filter, stasis_subscription::join_cond, stasis_subscription::mailbox, STASIS_SUBSCRIPTION_FILTER_NONE, stasis_topic_name(), stasis_topic::subscriber_id, stasis_subscription::topic, topic_add_subscription(), TPS_REF_DEFAULT, and stasis_subscription::uniqueid.
Referenced by __stasis_subscribe(), __stasis_subscribe_pool(), and stasis_caching_topic_create().
struct stasis_forward* stasis_forward_all | ( | struct stasis_topic * | from_topic, |
struct stasis_topic * | to_topic | ||
) |
Create a subscription which forwards all messages from one topic to another.
Note that the topic parameter of the invoked callback will be the topic the message was sent to, not the topic the subscriber subscribed to.
from_topic | Topic to forward. |
to_topic | Destination topic of forwarded messages. |
NULL | on error. |
Definition at line 1578 of file stasis.c.
References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_bump, ao2_ref, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_forward::from_topic, stasis_topic::subscribers, stasis_forward::to_topic, topic_add_subscription(), topic_lock_both, and stasis_topic::upstream_topics.
Referenced by ast_ari_bridges_record(), ast_channel_forward_endpoint(), create_subscriptions(), forwards_create_bridge(), forwards_create_channel(), forwards_create_endpoint(), load_module(), manager_bridging_init(), manager_channels_init(), manager_mwi_init(), manager_subscriptions_init(), manager_system_init(), stasis_cp_all_create(), stasis_cp_single_create(), and stasis_topic_pool_get_topic().
int stasis_init | ( | void | ) |
Initialize the Stasis subsystem.
Definition at line 3061 of file stasis.c.
References aco_info_init(), aco_option_register, aco_option_register_custom, aco_process_config(), ACO_PROCESS_ERROR, aco_set_defaults(), AO2_ALLOC_OPT_LOCK_MUTEX, ao2_container_alloc_hash, ao2_global_obj_ref, ao2_global_obj_replace_unref, ao2_ref, ast_cli_register_multiple, ast_multi_user_event_type(), ast_register_cleanup(), AST_VECTOR_INIT, ast_threadpool_options::auto_increment, stasis_config::declined_message_types, FLDSET, ast_threadpool_options::idle_timeout, stasis_threadpool_conf::idle_timeout_sec, ast_threadpool_options::initial_size, stasis_threadpool_conf::initial_size, ast_threadpool_options::max_size, stasis_threadpool_conf::max_size, OPT_INT_T, stasis_cleanup(), STASIS_MESSAGE_TYPE_INIT, stasis_subscription_change_type(), stasis_config::threadpool_options, and ast_threadpool_options::version.
int stasis_message_type_declined | ( | const char * | name | ) |
Check whether a message type is declined.
name | The name of the message type to check |
zero | The message type is not declined |
non-zero | The message type is declined |
Definition at line 2283 of file stasis.c.
References ao2_global_obj_ref, ao2_ref, ast_debug, stasis_declined_config::declined, stasis_config::declined_message_types, and OBJ_SEARCH_KEY.
Referenced by stasis_message_type_create().
void stasis_publish | ( | struct stasis_topic * | topic, |
struct stasis_message * | message | ||
) |
Publish a message to a topic's subscribers.
topic | Topic. |
message | Message to publish. |
This call is asynchronous and will return immediately upon queueing the message for delivery to the topic's subscribers.
Definition at line 1511 of file stasis.c.
Referenced by app_send_end_msg(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cel_publish_event(), ast_channel_publish_blob(), ast_channel_publish_cached_blob(), ast_channel_publish_final_snapshot(), ast_channel_publish_snapshot(), ast_device_state_clear_cache(), ast_endpoint_blob_publish(), ast_endpoint_shutdown(), ast_manager_publish_event(), ast_multi_object_blob_single_channel_publish(), ast_publish_device_state_full(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), local_optimization_finished_cb(), local_optimization_started_cb(), manager_mute_mixmonitor(), publish_cluster_discovery_to_stasis_full(), publish_corosync_ping_to_stasis(), publish_hint_change(), publish_hint_remove(), publish_load_message_type(), publish_parked_call(), publish_parked_call_failure(), report_fax_status(), report_receive_fax_status(), report_send_fax_status(), stasis_app_control_publish(), stasis_app_user_event(), stasis_state_publish(), stasis_state_publish_by_id(), stasis_state_remove_publish_by_id(), and stun_monitor_request().
void stasis_publish_sync | ( | struct stasis_subscription * | sub, |
struct stasis_message * | message | ||
) |
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
sub | Subscription to synchronize on. |
message | Message to publish. |
The caller of stasis_publish_sync will block until the specified subscriber completes handling of the message.
All other subscribers to the topic the stasis_subscription is subscribed to are also delivered the message; this delivery however happens asynchronously.
Definition at line 1516 of file stasis.c.
References stasis_subscription::topic.
Referenced by stasis_message_router_publish_sync().
void stasis_subscription_accept_formatters | ( | struct stasis_subscription * | subscription, |
enum stasis_subscription_message_formatters | formatters | ||
) |
Indicate to a subscription that we are interested in messages with one or more formatters.
subscription | Subscription to alter. |
formatters | A bitmap of stasis_subscription_message_formatters we wish to receive. |
Definition at line 1093 of file stasis.c.
References stasis_subscription::accepted_formatters, and stasis_subscription::topic.
Referenced by stasis_message_router_accept_formatters(), and stasis_message_router_set_formatters_default().
int stasis_subscription_accept_message_type | ( | struct stasis_subscription * | subscription, |
const struct stasis_message_type * | type | ||
) |
Indicate to a subscription that we are interested in a message type.
This will cause the subscription to allow the given message type to be raised to our subscription callback. This enables internal filtering in the stasis message bus to reduce messages.
subscription | Subscription to add message type to. |
type | The message type we wish to receive. |
0 | on success |
-1 | failure |
Definition at line 1023 of file stasis.c.
References stasis_subscription::accepted_message_types, AST_VECTOR_REPLACE, stasis_subscription::filter, stasis_message_type_id(), stasis_message_type_name(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.
Referenced by ast_mwi_subscribe_pool(), devstate_init(), load_module(), load_pbx(), rtp_reload(), stasis_caching_accept_message_type(), stasis_message_router_add(), stasis_message_router_add_cache_update(), and xmpp_init_event_distribution().
void stasis_subscription_cb_noop | ( | void * | data, |
struct stasis_subscription * | sub, | ||
struct stasis_message * | message | ||
) |
Stasis subscription callback function that does nothing.
Definition at line 809 of file stasis.c.
Referenced by build_peer(), and mkintf().
int stasis_subscription_decline_message_type | ( | struct stasis_subscription * | subscription, |
const struct stasis_message_type * | type | ||
) |
Indicate to a subscription that we are not interested in a message type.
subscription | Subscription to remove message type from. |
type | The message type we don't wish to receive. |
0 | on success |
-1 | failure |
Definition at line 1053 of file stasis.c.
References stasis_subscription::accepted_message_types, AST_VECTOR_REPLACE, AST_VECTOR_SIZE, stasis_message_type_id(), stasis_message_type_name(), and stasis_subscription::topic.
int stasis_subscription_final_message | ( | struct stasis_subscription * | sub, |
struct stasis_message * | msg | ||
) |
Determine whether a message is the final message to be received on a subscription.
sub | Subscription on which the message was received. |
msg | Message to check. |
Definition at line 1174 of file stasis.c.
References stasis_subscription_change::description, stasis_message_data(), stasis_message_type(), stasis_subscription_change_type(), stasis_subscription_uniqueid(), and stasis_subscription_change::uniqueid.
Referenced by message_sink_cb(), and subscription_invoke().
int stasis_subscription_is_done | ( | struct stasis_subscription * | subscription | ) |
Returns whether subscription has received its final message.
Note that a subscription is considered done even while the stasis_subscription_final_message() is being processed. This allows cleanup routines to check the status of the subscription.
subscription | Subscription. |
Definition at line 1118 of file stasis.c.
References stasis_subscription::final_message_rxed.
Referenced by stasis_message_router_is_done().
int stasis_subscription_is_subscribed | ( | const struct stasis_subscription * | sub | ) |
Returns whether a subscription is currently subscribed.
Note that there may still be messages queued up to be dispatched to this subscription, but the stasis_subscription_final_message() has been enqueued.
sub | Subscription to check |
Definition at line 1150 of file stasis.c.
References AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_topic::subscribers, and stasis_subscription::topic.
Referenced by stasis_caching_unsubscribe(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().
void stasis_subscription_join | ( | struct stasis_subscription * | subscription | ) |
Block until the last message is processed on a subscription.
This function will not return until the subscription's callback for the stasis_subscription_final_message() completes. This allows cleanup routines to run before unblocking the joining thread.
subscription | Subscription to block on. |
Definition at line 1105 of file stasis.c.
References ao2_object_get_lockaddr(), stasis_subscription::final_message_processed, and stasis_subscription::join_cond.
Referenced by stasis_caching_unsubscribe_and_join(), and stasis_unsubscribe_and_join().
int stasis_subscription_set_congestion_limits | ( | struct stasis_subscription * | subscription, |
long | low_water, | ||
long | high_water | ||
) |
Set the high and low alert water marks of the stasis subscription.
subscription | Pointer to a stasis subscription |
low_water | New queue low water mark. (-1 to set as 90% of high_water) |
high_water | New queue high water mark. |
0 | on success. |
-1 | on error (water marks not changed). |
Definition at line 1011 of file stasis.c.
References ast_taskprocessor_alert_set_levels(), and stasis_subscription::mailbox.
Referenced by stasis_message_router_set_congestion_limits().
int stasis_subscription_set_filter | ( | struct stasis_subscription * | subscription, |
enum stasis_subscription_message_filter | filter | ||
) |
Set the message type filtering level on a subscription.
This will cause the subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.
subscription | Subscription that should receive all messages. |
filter | What filter to use |
0 | on success |
-1 | failure |
Definition at line 1077 of file stasis.c.
References stasis_subscription::filter, filter(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, and stasis_subscription::topic.
Referenced by ast_mwi_subscribe_pool(), devstate_init(), load_module(), load_pbx(), rtp_reload(), stasis_caching_set_filter(), stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_set_formatters_default(), and xmpp_init_event_distribution().
const char* stasis_subscription_uniqueid | ( | const struct stasis_subscription * | sub | ) |
Get the unique ID for the subscription.
sub | Subscription for which to get the unique ID. |
Definition at line 1169 of file stasis.c.
References stasis_subscription::uniqueid.
Referenced by stasis_subscription_final_message(), and topic_add_subscription().
struct stasis_topic* stasis_topic_create | ( | const char * | name | ) |
Create a new topic.
name | Name of the new topic. |
NULL | on error. |
Topic names should be in the form of <subsystem>:<functionality>[/<object>]
Definition at line 617 of file stasis.c.
References stasis_topic_create_with_detail().
Referenced by app_create(), app_init(), ast_parking_stasis_init(), ast_rtp_engine_init(), ast_security_stasis_init(), ast_stasis_channels_init(), ast_stasis_system_init(), ast_test_init(), create_subscriptions(), devstate_init(), stasis_caching_topic_create(), stasis_cp_all_create(), stasis_cp_sink_create(), stasis_state_manager_create(), and stasis_topic_pool_get_topic().
struct stasis_topic* stasis_topic_create_with_detail | ( | const char * | name, |
const char * | detail | ||
) |
Create a new topic with given detail.
name | Name of the new topic. |
detail | Detailed description of the new topic, e.g. "Queue main topic for subscribing every queue event" |
NULL | on error. |
Definition at line 568 of file stasis.c.
References ao2_ref, ast_debug, AST_VECTOR_INIT, INITIAL_SUBSCRIBERS_MAX, stasis_topic::name, stasis_topic_get(), stasis_topic::subscribers, and stasis_topic::upstream_topics.
Referenced by stasis_topic_create().
const char* stasis_topic_detail | ( | const struct stasis_topic * | topic | ) |
Return the detail of a topic.
topic | Topic. |
NULL | if topic is NULL. |
Definition at line 635 of file stasis.c.
References stasis_topic::detail.
struct stasis_topic* stasis_topic_get | ( | const char * | name | ) |
Get a topic of the given name.
name | Topic's name. |
NULL | on error or if the topic does not exist. |
Definition at line 622 of file stasis.c.
References ao2_weakproxy_find, and OBJ_SEARCH_KEY.
Referenced by stasis_topic_create_with_detail().
const char* stasis_topic_name | ( | const struct stasis_topic * | topic | ) |
Return the name of a topic.
topic | Topic. |
NULL | if topic is NULL. |
Definition at line 627 of file stasis.c.
References stasis_topic::name.
Referenced by internal_stasis_subscribe(), stasis_caching_topic_create(), stasis_state_add_publisher(), stasis_state_add_subscriber(), stasis_state_manager_create(), stasis_state_subscribe_pool(), stasis_topic_pool_create(), stasis_topic_pool_delete_topic(), stasis_topic_pool_get_topic(), and topic_add_subscription().
struct stasis_topic_pool* stasis_topic_pool_create | ( | struct stasis_topic * | pooled_topic | ) |
Create a topic pool that routes messages from dynamically generated topics to the given topic.
pooled_topic | Topic to which messages will be routed |
NULL | on failure |
Definition at line 1833 of file stasis.c.
References AO2_ALLOC_OPT_LOCK_MUTEX, AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_container_alloc_hash, ao2_container_register(), ao2_ref, ast_alloca, pool, stasis_topic_name(), and TOPIC_POOL_BUCKETS.
Referenced by app_init(), and devstate_init().
void stasis_topic_pool_delete_topic | ( | struct stasis_topic_pool * | pool, |
const char * | topic_name | ||
) |
Delete a topic from the topic pool.
pool | Pool from which to delete the topic |
topic_name | Name of the topic to delete in the form of [<pool_topic_name>/]<topic_name> |
Definition at line 1864 of file stasis.c.
References OBJ_NODATA, OBJ_SEARCH_KEY, OBJ_UNLINK, and stasis_topic_name().
struct stasis_topic* stasis_topic_pool_get_topic | ( | struct stasis_topic_pool * | pool, |
const char * | topic_name | ||
) |
Find or create a topic in the pool.
pool | Pool for which to get the topic |
topic_name | Name of the topic to get |
NULL | if the topic was not found and could not be allocated |
Definition at line 1884 of file stasis.c.
References ao2_link_flags, ast_asprintf, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, SCOPED_AO2LOCK, stasis_forward_all(), stasis_topic_create(), and stasis_topic_name().
Referenced by ast_device_state_topic(), and ast_queue_topic().
int stasis_topic_pool_topic_exists | ( | const struct stasis_topic_pool * | pool, |
const char * | topic_name | ||
) |
Check if a topic exists in a pool.
pool | Pool to check |
topic_name | Name of the topic to check |
1 | exists |
0 | does not exist |
Definition at line 1927 of file stasis.c.
References ao2_ref, and OBJ_SEARCH_KEY.
Referenced by ast_publish_device_state_full().
size_t stasis_topic_subscribers | ( | const struct stasis_topic * | topic | ) |
Return the number of subscribers of a topic.
topic | Topic. |
Definition at line 643 of file stasis.c.
References AST_VECTOR_SIZE, and stasis_topic::subscribers.
struct stasis_subscription* stasis_unsubscribe | ( | struct stasis_subscription * | subscription | ) |
Cancel a subscription.
Note that in an asynchronous system, there may still be messages queued or in transit to the subscription's callback. These will still be delivered. There will be a final 'SubscriptionCancelled' message, indicating the delivery of the final message.
subscription | Subscription to cancel. |
NULL | for convenience |
Definition at line 971 of file stasis.c.
References ao2_bump, ast_taskprocessor_push(), stasis_subscription::mailbox, and stasis_subscription::topic.
Referenced by stasis_caching_unsubscribe(), stasis_message_router_unsubscribe(), stasis_state_unsubscribe(), stasis_unsubscribe_and_join(), and xmpp_init_event_distribution().
struct stasis_subscription* stasis_unsubscribe_and_join | ( | struct stasis_subscription * | subscription | ) |
Cancel a subscription, blocking until the last message is processed.
While normally it's recommended to stasis_unsubscribe() and wait for stasis_subscription_final_message(), there are times (like during a module unload) where you have to wait for the final message (otherwise you'll call a function in a shared module that no longer exists).
subscription | Subscription to cancel. |
NULL | for convenience |
Definition at line 1134 of file stasis.c.
References ao2_ref, stasis_subscription_join(), and stasis_unsubscribe().
Referenced by ast_xmpp_client_disconnect(), rtp_reload(), stasis_message_router_unsubscribe_and_join(), stasis_state_unsubscribe_and_join(), and unload_module().
|
static |
Invoke the subscription's callback.
sub | Subscription to invoke. |
message | Message to send. |
Definition at line 754 of file stasis.c.
References stasis_subscription::accepted_message_types, ast_tvdiff_ms(), ast_tvnow(), AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::filter, stasis_subscription::final_message_processed, stasis_subscription::final_message_rxed, stasis_subscription::join_cond, stasis_message_type(), stasis_message_type_id(), stasis_subscription_change_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, and stasis_subscription_final_message().
|
static |
Add a subscriber to a topic.
topic | Topic |
sub | Subscriber |
Definition at line 1201 of file stasis.c.
References ast_str_container_add(), AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, stasis_subscription_uniqueid(), stasis_topic_name(), stasis_topic::subscribers, and stasis_topic::upstream_topics.
Referenced by internal_stasis_subscribe(), and stasis_forward_all().
|
static |
struct aco_file stasis_conf |
|
static |