Asterisk - The Open Source Telephony Project  21.4.1
Functions
stasis_internal.h File Reference

Internal Stasis APIs. More...

#include "asterisk/stasis.h"

Go to the source code of this file.

Functions

struct stasis_subscription * internal_stasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
 Create a subscription. More...
 

Detailed Description

Internal Stasis APIs.

This header file is used to define functions that are shared between the files that make up the Stasis Message Bus API. Functions declared here should not be used by any module outside of Stasis.

If you find yourself needing to call one of these functions directly, something has probably gone horribly wrong.

Author
Matt Jordan <mjordan@digium.com>

Definition in file stasis_internal.h.

Function Documentation

struct stasis_subscription* internal_stasis_subscribe ( struct stasis_topic *  topic,
stasis_subscription_cb  callback,
void *  data,
int  needs_mailbox,
int  use_thread_pool,
const char *  file,
int  lineno,
const char *  func 
)

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Note: modules outside of Stasis should use stasis_subscribe.

Parameters
topic: Topic to subscribe to.
callback: Callback function for subscription messages.
data: Data to be passed to the callback, in addition to the message.
needs_mailbox: Determines whether or not the subscription requires a mailbox. Subscriptions with mailboxes will be delivered on some non-publisher thread; subscriptions without mailboxes will be delivered on the publisher thread.
use_thread_pool: Use the thread pool for the subscription. This is only relevant if needs_mailbox is non-zero.
file, lineno, func: Source location of the caller. In AST_DEVMODE builds, file is included in the subscription's unique ID and all three are passed to the subscription statistics; otherwise they are unused.
Returns
New stasis_subscription object.
Return values
NULL on error.
Since
12

Definition at line 856 of file stasis.c.

References stasis_subscription::accepted_formatters, stasis_subscription::accepted_message_types, ao2_ref, ast_asprintf, ast_atomic_fetchadd_int(), ast_taskprocessor_build_name(), ast_taskprocessor_get(), AST_TASKPROCESSOR_MAX_NAME, ast_taskprocessor_set_local(), AST_VECTOR_INIT, stasis_subscription::callback, stasis_subscription::data, stasis_subscription::filter, stasis_subscription::join_cond, stasis_subscription::mailbox, STASIS_SUBSCRIPTION_FILTER_NONE, stasis_topic_name(), stasis_topic::subscriber_id, stasis_subscription::topic, topic_add_subscription(), TPS_REF_DEFAULT, and stasis_subscription::uniqueid.

Referenced by __stasis_subscribe(), __stasis_subscribe_pool(), and stasis_caching_topic_create().

865 {
866  struct stasis_subscription *sub;
867  int ret;
868 
869  if (!topic) {
870  return NULL;
871  }
872 
873  /* The ao2 lock is used for join_cond. */
874  sub = ao2_t_alloc(sizeof(*sub), subscription_dtor, stasis_topic_name(topic));
875  if (!sub) {
876  return NULL;
877  }
878 
879 #ifdef AST_DEVMODE
880  ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
881  sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
882  if (ret < 0 || !sub->statistics) {
883  ao2_ref(sub, -1);
884  return NULL;
885  }
886 #else
887  ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
888  if (ret < 0) {
889  ao2_ref(sub, -1);
890  return NULL;
891  }
892 #endif
893 
894  if (needs_mailbox) {
895  char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
896 
897  /* Create name with seq number appended. */
898  ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
899  use_thread_pool ? 'p' : 'm',
900  stasis_topic_name(topic));
901 
902  /*
903  * With a small number of subscribers, a thread-per-sub is
904  * acceptable. For a large number of subscribers, a thread
905  * pool should be used.
906  */
907  if (use_thread_pool) {
908  sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
909  } else {
910  sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
911  }
912  if (!sub->mailbox) {
913  ao2_ref(sub, -1);
914 
915  return NULL;
916  }
917  ast_taskprocessor_set_local(sub->mailbox, sub);
918  /* Taskprocessor has a reference */
919  ao2_ref(sub, +1);
920  }
921 
922  ao2_ref(topic, +1);
923  sub->topic = topic;
924  sub->callback = callback;
925  sub->data = data;
926  ast_cond_init(&sub->join_cond, NULL);
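927  sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
928  AST_VECTOR_INIT(&sub->accepted_message_types, 0);
929  sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;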
930 
931  if (topic_add_subscription(topic, sub) != 0) {
932  ao2_ref(sub, -1);
933  ao2_ref(topic, -1);
934 
935  return NULL;
936  }
937  send_subscription_subscribe(topic, sub);
938 
939  return sub;
940 }
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
struct stasis_subscription::@400 accepted_message_types
struct stasis_topic * topic
Definition: stasis.c:684
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
ast_cond_t join_cond
Definition: stasis.c:693
@ TPS_REF_DEFAULT
Return a reference to a taskprocessor; create one if it does not exist.
Definition: taskprocessor.h:76
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
Definition: taskprocessor.h:61
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
struct ast_taskprocessor * mailbox
Definition: stasis.c:686
int subscriber_id
Definition: stasis.c:383
enum stasis_subscription_message_formatters accepted_formatters
Definition: stasis.c:704
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
Add a subscriber to a topic.
Definition: stasis.c:1201
stasis_subscription_cb callback
Definition: stasis.c:688
enum stasis_subscription_message_filter filter
Definition: stasis.c:706
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
static struct ast_threadpool * threadpool
Definition: stasis.c:307