Asterisk - The Open Source Telephony Project  21.4.1
Enumerations | Functions
res/stasis/app.h File Reference

Internal API for the Stasis application controller. More...

#include "asterisk/channel.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_app.h"

Go to the source code of this file.

Enumerations

enum  stasis_app_subscription_model { STASIS_APP_SUBSCRIBE_MANUAL, STASIS_APP_SUBSCRIBE_ALL }
 

Functions

struct stasis_app * app_create (const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
 Create a res_stasis application. More...
 
void app_deactivate (struct stasis_app *app)
 Deactivates an application. More...
 
char * app_get_replace_channel_app (struct ast_channel *chan)
 Get the app that the replacement channel will be controlled by. More...
 
int app_is_active (struct stasis_app *app)
 Checks whether an app is active. More...
 
int app_is_finished (struct stasis_app *app)
 Checks whether a deactivated app has no channels. More...
 
int app_is_subscribed_bridge_id (struct stasis_app *app, const char *bridge_id)
 Test if an app is subscribed to a bridge. More...
 
int app_is_subscribed_channel_id (struct stasis_app *app, const char *channel_id)
 Test if an app is subscribed to a channel. More...
 
int app_is_subscribed_endpoint_id (struct stasis_app *app, const char *endpoint_id)
 Test if an app is subscribed to an endpoint. More...
 
void app_send (struct stasis_app *app, struct ast_json *message)
 Send a message to an application. More...
 
int app_send_end_msg (struct stasis_app *app, struct ast_channel *chan)
 Send StasisEnd message to the listening app. More...
 
int app_set_replace_channel_app (struct ast_channel *chan, const char *replace_app)
 Set the app that the replacement channel will be controlled by. More...
 
int app_set_replace_channel_snapshot (struct ast_channel *chan, struct ast_channel_snapshot *replace_snapshot)
 Set the snapshot of the channel that this channel will replace. More...
 
void app_shutdown (struct stasis_app *app)
 Tears down an application. More...
 
int app_subscribe_bridge (struct stasis_app *app, struct ast_bridge *bridge)
 Add a bridge subscription to an existing channel subscription. More...
 
int app_subscribe_channel (struct stasis_app *app, struct ast_channel *chan)
 Subscribes an application to a channel. More...
 
int app_subscribe_endpoint (struct stasis_app *app, struct ast_endpoint *endpoint)
 Subscribes an application to an endpoint. More...
 
struct ast_json * app_to_json (const struct stasis_app *app)
 Create a JSON representation of a stasis_app. More...
 
int app_unsubscribe_bridge (struct stasis_app *app, struct ast_bridge *bridge)
 Cancel the bridge subscription for an application. More...
 
int app_unsubscribe_bridge_id (struct stasis_app *app, const char *bridge_id)
 Cancel the subscription an app has for a bridge. More...
 
int app_unsubscribe_channel (struct stasis_app *app, struct ast_channel *chan)
 Cancel the subscription an app has for a channel. More...
 
int app_unsubscribe_channel_id (struct stasis_app *app, const char *channel_id)
 Cancel the subscription an app has for a channel. More...
 
int app_unsubscribe_endpoint_id (struct stasis_app *app, const char *endpoint_id)
 Cancel the subscription an app has for an endpoint. More...
 
void app_update (struct stasis_app *app, stasis_app_cb handler, void *data)
 Update the handler and data for a res_stasis application. More...
 

Detailed Description

Internal API for the Stasis application controller.

Author
David M. Lee, II dlee@digium.com
Since
12

Definition in file res/stasis/app.h.

Enumeration Type Documentation

Enumerator
STASIS_APP_SUBSCRIBE_MANUAL 

An application must manually subscribe to each resource that it cares about. This is the default approach.

STASIS_APP_SUBSCRIBE_ALL 

An application is automatically subscribed to all resources in Asterisk, even if it does not control them.

Definition at line 39 of file res/stasis/app.h.

39  {
40  /*!
41  * \brief An application must manually subscribe to each
42  * resource that it cares about. This is the default approach.
43  */
45  /*!
46  * \brief An application is automatically subscribed to all
47  * resources in Asterisk, even if it does not control them.
48  */
50 };
An application must manually subscribe to each resource that it cares about. This is the default appr...
An application is automatically subscribed to all resources in Asterisk, even if it does not control ...

Function Documentation

struct stasis_app* app_create ( const char *  name,
stasis_app_cb  handler,
void *  data,
enum stasis_app_subscription_model  subscription_model 
)

Create a res_stasis application.

Parameters
name: Name of the application.
handler: Callback for messages sent to the application.
data: Data pointer provided to the callback.
subscription_model
Returns
New res_stasis application.
Return values
NULL: on error.

Definition at line 915 of file res/stasis/app.c.

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_bump, AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT, ao2_container_alloc_rbtree, ao2_ref, ast_add_extension(), ast_asprintf, ast_bridge_topic_all(), ast_channel_snapshot_type(), ast_context_find(), ast_context_find_or_create(), ast_endpoint_snapshot_type(), ast_free_ptr(), ast_strdup, stasis_app::bridge_router, stasis_app::data, stasis_app::forwards, stasis_app::handler, stasis_app::name, RAII_VAR, stasis_app::router, stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_set_formatters_default(), stasis_subscription_change_type(), STASIS_SUBSCRIPTION_FORMATTER_JSON, stasis_topic_create(), stasis_app::subscription_model, and stasis_app::topic.

916 {
917  RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
918  size_t size;
919  int res = 0;
920  size_t context_size = strlen("stasis-") + strlen(name) + 1;
921  char context_name[context_size];
922  char *topic_name;
923  int ret;
924 
925  ast_assert(name != NULL);
926  ast_assert(handler != NULL);
927 
928  ast_verb(1, "Creating Stasis app '%s'\n", name);
929 
930  size = sizeof(*app) + strlen(name) + 1;
931  app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
932  if (!app) {
933  return NULL;
934  }
935  app->subscription_model = subscription_model;
936 
939  forwards_sort, NULL);
940  if (!app->forwards) {
941  return NULL;
942  }
943 
944  ret = ast_asprintf(&topic_name, "ari:application/%s", name);
945  if (ret < 0) {
946  return NULL;
947  }
948 
949  app->topic = stasis_topic_create(topic_name);
950  ast_free(topic_name);
951  if (!app->topic) {
952  return NULL;
953  }
954 
955  app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
956  if (!app->bridge_router) {
957  return NULL;
958  }
959 
960  res |= stasis_message_router_add(app->bridge_router,
961  ast_bridge_merge_message_type(), bridge_merge_handler, app);
962 
963  res |= stasis_message_router_add(app->bridge_router,
964  ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
965 
966  res |= stasis_message_router_add(app->bridge_router,
967  ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
968 
969  res |= stasis_message_router_add(app->bridge_router,
970  stasis_subscription_change_type(), bridge_subscription_change_handler, app);
971 
972  if (res != 0) {
973  return NULL;
974  }
975  /* Bridge router holds a reference */
976  ao2_ref(app, +1);
977 
978  app->router = stasis_message_router_create(app->topic);
979  if (!app->router) {
980  return NULL;
981  }
982 
983  res |= stasis_message_router_add(app->router,
984  ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
985 
986  res |= stasis_message_router_add(app->router,
987  ast_channel_snapshot_type(), sub_channel_update_handler, app);
988 
989  res |= stasis_message_router_add_cache_update(app->router,
990  ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
991 
992  res |= stasis_message_router_add(app->router,
993  stasis_subscription_change_type(), sub_subscription_change_handler, app);
994 
996  sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON);
997 
998  if (res != 0) {
999  return NULL;
1000  }
1001  /* Router holds a reference */
1002  ao2_ref(app, +1);
1003 
1004  strncpy(app->name, name, size - sizeof(*app));
1005  app->handler = handler;
1006  app->data = ao2_bump(data);
1007 
1008  /* Create a context, a match-all extension, and a 'h' extension for this application. Note that
1009  * this should only be done if a context does not already exist. */
1010  strcpy(context_name, "stasis-");
1011  strcat(context_name, name);
1012  if (!ast_context_find(context_name)) {
1013  if (!ast_context_find_or_create(NULL, NULL, context_name, "res_stasis")) {
1014  ast_log(LOG_WARNING, "Could not create context '%s' for Stasis application '%s'\n", context_name, name);
1015  } else {
1016  ast_add_extension(context_name, 0, "_.", 1, NULL, NULL, "Stasis", ast_strdup(name), ast_free_ptr, "res_stasis");
1017  ast_add_extension(context_name, 0, "h", 1, NULL, NULL, "NoOp", NULL, NULL, "res_stasis");
1018  }
1019  } else {
1020  ast_log(LOG_WARNING, "Not creating context '%s' for Stasis application '%s' because it already exists\n",
1021  context_name, name);
1022  }
1023 
1024  ao2_ref(app, +1);
1025  return app;
1026 }
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
struct stasis_message_type * ast_endpoint_snapshot_type(void)
Message type for ast_endpoint_snapshot.
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.
struct stasis_topic * ast_bridge_topic_all(void)
A topic which publishes the events for all bridges.
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
void ast_free_ptr(void *ptr)
free() wrapper
Definition: main/astmm.c:1739
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
struct ast_context * ast_context_find(const char *name)
Find a context.
Definition: extconf.c:4172
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
struct stasis_message_type * ast_channel_snapshot_type(void)
Message type for ast_channel_snapshot_update.
int ast_add_extension(const char *context, int replace, const char *extension, int priority, const char *label, const char *callerid, const char *application, void *data, void(*datad)(void *), const char *registrar)
Add an extension to an extension context.
Definition: pbx.c:6928
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
#define ao2_container_alloc_rbtree(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a red-black tree container.
Definition: astobj2.h:1349
struct ast_context * ast_context_find_or_create(struct ast_context **extcontexts, struct ast_hashtab *exttable, const char *name, const char *registrar)
Register a new context or find an existing one.
Definition: pbx.c:6149
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
Reject duplicate objects in container.
Definition: astobj2.h:1201
void app_deactivate ( struct stasis_app *  app)

Deactivates an application.

Any channels currently in the application remain active (since the app might come back), but new channels are rejected.

Parameters
app: Application to deactivate.

Definition at line 1061 of file res/stasis/app.c.

References stasis_app::data, stasis_app::handler, and stasis_app::name.

Referenced by stasis_app_unregister().

1062 {
1063  ao2_lock(app);
1064 
1065  ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
1066  app->handler = NULL;
1067  ao2_cleanup(app->data);
1068  app->data = NULL;
1069 
1070  ao2_unlock(app);
1071 }
stasis_app_cb handler
char* app_get_replace_channel_app ( struct ast_channel *  chan)

Get the app that the replacement channel will be controlled by.

Parameters
chan: The channel on which this will be set
Return values
NULL: on error
Returns
the name of the controlling app (must be ast_free()d)

Definition at line 970 of file res_stasis.c.

971 {
972  struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
973  char *replace_channel_app;
974 
975  if (!replace) {
976  return NULL;
977  }
978 
979  replace_channel_app = replace->app;
980  replace->app = NULL;
981 
982  return replace_channel_app;
983 }
int app_is_active ( struct stasis_app *  app)

Checks whether an app is active.

Parameters
app: Application to check.
Return values
True: (non-zero) if app is active.
False: (zero) if app has been deactivated.

Definition at line 1089 of file res/stasis/app.c.

References stasis_app::handler.

Referenced by stasis_app_exec().

1090 {
1091  int ret;
1092 
1093  ao2_lock(app);
1094  ret = app->handler != NULL;
1095  ao2_unlock(app);
1096 
1097  return ret;
1098 }
stasis_app_cb handler
int app_is_finished ( struct stasis_app *  app)

Checks whether a deactivated app has no channels.

Parameters
app: Application to check.
Return values
True: (non-zero) if app is deactivated, and has no associated channels.
False: (zero) otherwise.

Definition at line 1100 of file res/stasis/app.c.

References ao2_container_count(), stasis_app::forwards, and stasis_app::handler.

Referenced by app_shutdown().

1101 {
1102  int ret;
1103 
1104  ao2_lock(app);
1105  ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
1106  ao2_unlock(app);
1107 
1108  return ret;
1109 }
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
stasis_app_cb handler
struct ao2_container * forwards
int app_is_subscribed_bridge_id ( struct stasis_app *  app,
const char *  bridge_id 
)

Test if an app is subscribed to a bridge.

Parameters
app: Subscribing application.
bridge_id: Id of bridge to check.
Return values
True: (non-zero) if bridge is subscribed to app.
False: (zero) if bridge is not subscribed.

Definition at line 1499 of file res/stasis/app.c.

References stasis_app::forwards, and OBJ_SEARCH_KEY.

1500 {
1501  struct app_forwards *forwards;
1502 
1503  if (ast_strlen_zero(bridge_id)) {
1504  bridge_id = BRIDGE_ALL;
1505  }
1506 
1507  forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
1508  ao2_cleanup(forwards);
1509 
1510  return forwards != NULL;
1511 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
struct ao2_container * forwards
int app_is_subscribed_channel_id ( struct stasis_app *  app,
const char *  channel_id 
)

Test if an app is subscribed to a channel.

Parameters
app: Subscribing application.
channel_id: Id of channel to check.
Return values
True: (non-zero) if channel is subscribed to app.
False: (zero) if channel is not subscribed.

Definition at line 1397 of file res/stasis/app.c.

References stasis_app::forwards, and OBJ_SEARCH_KEY.

1398 {
1399  struct app_forwards *forwards;
1400 
1401  if (ast_strlen_zero(channel_id)) {
1402  channel_id = CHANNEL_ALL;
1403  }
1404  forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
1405  ao2_cleanup(forwards);
1406 
1407  return forwards != NULL;
1408 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
struct ao2_container * forwards
int app_is_subscribed_endpoint_id ( struct stasis_app *  app,
const char *  endpoint_id 
)

Test if an app is subscribed to an endpoint.

Parameters
app: Subscribing application.
endpoint_id: Id of endpoint to check.
Return values
True: (non-zero) if endpoint is subscribed to app.
False: (zero) if endpoint is not subscribed.

Definition at line 1596 of file res/stasis/app.c.

References stasis_app::forwards, and OBJ_SEARCH_KEY.

1597 {
1598  struct app_forwards *forwards;
1599 
1600  if (ast_strlen_zero(endpoint_id)) {
1601  endpoint_id = ENDPOINT_ALL;
1602  }
1603  forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
1604  ao2_cleanup(forwards);
1605 
1606  return forwards != NULL;
1607 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
struct ao2_container * forwards
void app_send ( struct stasis_app *  app,
struct ast_json *  message 
)

Send a message to an application.

Parameters
app: App to send the message to.
message: Message to send.

Definition at line 1033 of file res/stasis/app.c.

References ao2_bump, ast_eid_default, ast_eid_to_str(), ast_json_object_get(), ast_json_object_set(), ast_json_string_create(), ast_json_string_get(), stasis_app::data, stasis_app::handler, and stasis_app::name.

Referenced by app_update(), stasis_app_exec(), and stasis_app_send().

1034 {
1035  stasis_app_cb handler;
1036  char eid[20];
1037  void *data;
1038 
1039  if (ast_json_object_set(message, "asterisk_id", ast_json_string_create(
1040  ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
1041  ast_log(AST_LOG_WARNING, "Failed to append EID to outgoing event %s\n",
1042  ast_json_string_get(ast_json_object_get(message, "type")));
1043  }
1044 
1045  /* Copy off mutable state with lock held */
1046  ao2_lock(app);
1047  handler = app->handler;
1048  data = ao2_bump(app->data);
1049  ao2_unlock(app);
1050  /* Name is immutable; no need to copy */
1051 
1052  if (handler) {
1053  handler(data, app->name, message);
1054  } else {
1055  ast_verb(3,
1056  "Inactive Stasis app '%s' missed message\n", app->name);
1057  }
1058  ao2_cleanup(data);
1059 }
char * ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
Convert an EID to a string.
Definition: utils.c:2839
stasis_app_cb handler
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
Definition: json.c:414
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
const char * ast_json_string_get(const struct ast_json *string)
Get the value of a JSON string.
Definition: json.c:283
void(* stasis_app_cb)(void *data, const char *app_name, struct ast_json *message)
Callback for Stasis application handler.
Definition: stasis_app.h:67
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
int app_send_end_msg ( struct stasis_app *  app,
struct ast_channel *  chan 
)

Send StasisEnd message to the listening app.

Parameters
app: The app that owns the channel
chan: The channel for which the message is being sent
Return values
zero: on success
Returns
non-zero on failure

Definition at line 1086 of file res_stasis.c.

References app_unsubscribe_channel(), ast_app_get_topic(), ast_channel_blob_create(), ast_json_pack(), ast_json_timeval(), ast_json_unref(), ast_tvnow(), stasis_message_sanitizer::channel, stasis_app_get_sanitizer(), stasis_app_name(), and stasis_publish().

Referenced by stasis_app_exec().

1087 {
1089  struct ast_json *blob;
1090  struct stasis_message *msg;
1091 
1092  if (sanitize && sanitize->channel
1093  && sanitize->channel(chan)) {
1094  return 0;
1095  }
1096 
1097  blob = ast_json_pack("{s: s, s: o}",
1098  "app", stasis_app_name(app),
1099  "timestamp", ast_json_timeval(ast_tvnow(), NULL)
1100  );
1101  if (!blob) {
1102  ast_log(LOG_ERROR, "Error packing JSON for StasisEnd message\n");
1103  return -1;
1104  }
1105 
1106  remove_masquerade_store(chan);
1107  app_unsubscribe_channel(app, chan);
1108  msg = ast_channel_blob_create(chan, end_message_type(), blob);
1109  if (msg) {
1110  stasis_publish(ast_app_get_topic(app), msg);
1111  }
1112  ao2_cleanup(msg);
1113  ast_json_unref(blob);
1114 
1115  return 0;
1116 }
struct stasis_topic * ast_app_get_topic(struct stasis_app *app)
Returns the stasis topic for an app.
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:612
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
Cancel the subscription an app has for a channel.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
struct stasis_message_sanitizer * stasis_app_get_sanitizer(void)
Get the Stasis message sanitizer for app_stasis applications.
Definition: res_stasis.c:2271
Structure containing callbacks for Stasis message sanitization.
Definition: stasis.h:200
const char * stasis_app_name(const struct stasis_app *app)
Retrieve an application's name.
int(* channel)(const struct ast_channel *chan)
Callback which determines whether a channel should be sanitized from a message based on the channel...
Definition: stasis.h:232
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
struct stasis_message * ast_channel_blob_create(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
Creates a ast_channel_blob message.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Definition: stasis.c:1511
Abstract JSON element (object, array, string, int, ...).
int app_set_replace_channel_app ( struct ast_channel *  chan,
const char *  replace_app 
)

Set the app that the replacement channel will be controlled by.

Parameters
chan: The channel on which this will be set
replace_app: The app that will be controlling this channel
Return values
zero: success
non-zero: failure

Definition at line 934 of file res_stasis.c.

References ast_strdup.

935 {
936  struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
937 
938  if (!replace) {
939  return -1;
940  }
941 
942  ast_free(replace->app);
943  replace->app = NULL;
944 
945  if (replace_app) {
946  replace->app = ast_strdup(replace_app);
947  if (!replace->app) {
948  return -1;
949  }
950  }
951 
952  return 0;
953 }
#define ast_strdup(str)
A wrapper for strdup()
Definition: astmm.h:241
int app_set_replace_channel_snapshot ( struct ast_channel *  chan,
struct ast_channel_snapshot *  replace_snapshot 
)

Set the snapshot of the channel that this channel will replace.

Parameters
chan: The channel on which this will be set
replace_snapshot: The snapshot of the channel that is being replaced
Return values
zero: success
non-zero: failure

Definition at line 922 of file res_stasis.c.

References ao2_replace.

923 {
924  struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
925 
926  if (!replace) {
927  return -1;
928  }
929 
930  ao2_replace(replace->snapshot, replace_snapshot);
931  return 0;
932 }
#define ao2_replace(dst, src)
Replace one object reference with another cleaning up the original.
Definition: astobj2.h:501
void app_shutdown ( struct stasis_app *  app)

Tears down an application.

It should be finished before calling this.

Parameters
app: Application to unsubscribe.

Definition at line 1073 of file res/stasis/app.c.

References app_is_finished(), stasis_app::bridge_router, stasis_app::endpoint_router, stasis_app::router, and stasis_message_router_unsubscribe().

1074 {
1075  ao2_lock(app);
1076 
1077  ast_assert(app_is_finished(app));
1078 
1080  app->router = NULL;
1082  app->bridge_router = NULL;
1084  app->endpoint_router = NULL;
1085 
1086  ao2_unlock(app);
1087 }
struct stasis_message_router * endpoint_router
int app_is_finished(struct stasis_app *app)
Checks whether a deactivated app has no channels.
struct stasis_message_router * router
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic.
struct stasis_message_router * bridge_router
int app_subscribe_bridge ( struct stasis_app *  app,
struct ast_bridge *  bridge 
)

Add a bridge subscription to an existing channel subscription.

Parameters
app: Application.
bridge: Bridge to subscribe to.
Return values
0: on success.
Non-zero: on error.

Definition at line 1423 of file res/stasis/app.c.

References ao2_link_flags, ao2_ref, ast_debug, stasis_app::forwards, forwards_create_bridge(), app_forwards::interested, stasis_app::name, OBJ_NOLOCK, OBJ_SEARCH_KEY, and ast_bridge::uniqueid.

Referenced by control_swap_channel_in_bridge(), and stasis_app_exec().

1424 {
1425  struct app_forwards *forwards;
1426 
1427  if (!app) {
1428  return -1;
1429  }
1430 
1431  ao2_lock(app->forwards);
1432  /* If subscribed to all, don't subscribe again */
1433  forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1434  if (forwards) {
1435  ao2_unlock(app->forwards);
1436  ao2_ref(forwards, -1);
1437 
1438  return 0;
1439  }
1440 
1441  forwards = ao2_find(app->forwards,
1442  bridge ? bridge->uniqueid : BRIDGE_ALL,
1444  if (!forwards) {
1445  int res;
1446 
1447  /* Forwards not found, create one */
1448  forwards = forwards_create_bridge(app, bridge);
1449  if (!forwards) {
1450  ao2_unlock(app->forwards);
1451 
1452  return -1;
1453  }
1454 
1455  res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1456  if (!res) {
1457  ao2_unlock(app->forwards);
1458  ao2_ref(forwards, -1);
1459 
1460  return -1;
1461  }
1462  }
1463 
1464  ++forwards->interested;
1465  ast_debug(3, "Bridge '%s' is %d interested in %s\n",
1466  bridge ? bridge->uniqueid : "ALL",
1467  forwards->interested,
1468  app->name);
1469 
1470  ao2_unlock(app->forwards);
1471  ao2_ref(forwards, -1);
1472 
1473  return 0;
1474 }
static struct app_forwards * forwards_create_bridge(struct stasis_app *app, struct ast_bridge *bridge)
const ast_string_field uniqueid
Definition: bridge.h:401
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
struct ao2_container * forwards
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ast_debug(level,...)
Log a DEBUG message.
int app_subscribe_channel ( struct stasis_app *  app,
struct ast_channel *  chan 
)

Subscribes an application to a channel.

Parameters
app: Application.
chan: Channel to subscribe to.
Return values
0: on success.
Non-zero: on error.

Definition at line 1276 of file res/stasis/app.c.

References ao2_link_flags, ao2_ref, ast_debug, stasis_app::forwards, forwards_create_channel(), app_forwards::interested, stasis_app::name, OBJ_NOLOCK, and OBJ_SEARCH_KEY.

Referenced by stasis_app_subscribe_channel().

1277 {
1278  struct app_forwards *forwards;
1279 
1280  if (!app) {
1281  return -1;
1282  }
1283 
1284  ao2_lock(app->forwards);
1285  /* If subscribed to all, don't subscribe again */
1286  forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1287  if (forwards) {
1288  ao2_unlock(app->forwards);
1289  ao2_ref(forwards, -1);
1290 
1291  return 0;
1292  }
1293 
1294  forwards = ao2_find(app->forwards,
1295  chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
1297  if (!forwards) {
1298  int res;
1299 
1300  /* Forwards not found, create one */
1301  forwards = forwards_create_channel(app, chan);
1302  if (!forwards) {
1303  ao2_unlock(app->forwards);
1304 
1305  return -1;
1306  }
1307 
1308  res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1309  if (!res) {
1310  ao2_unlock(app->forwards);
1311  ao2_ref(forwards, -1);
1312 
1313  return -1;
1314  }
1315  }
1316 
1317  ++forwards->interested;
1318  ast_debug(3, "Channel '%s' is %d interested in %s\n",
1319  chan ? ast_channel_uniqueid(chan) : "ALL",
1320  forwards->interested,
1321  app->name);
1322 
1323  ao2_unlock(app->forwards);
1324  ao2_ref(forwards, -1);
1325 
1326  return 0;
1327 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
struct ao2_container * forwards
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ast_debug(level,...)
Log a DEBUG message.
static struct app_forwards * forwards_create_channel(struct stasis_app *app, struct ast_channel *chan)
int app_subscribe_endpoint ( struct stasis_app *  app,
struct ast_endpoint *  endpoint 
)

Subscribes an application to an endpoint.

Parameters
app: Application.
endpoint: Endpoint to subscribe to.
Return values
0: on success.
Non-zero: on error.

Definition at line 1526 of file res/stasis/app.c.

References ao2_link_flags, ao2_ref, ast_debug, ast_endpoint_get_id(), stasis_app::forwards, forwards_create_endpoint(), app_forwards::interested, messaging_app_subscribe_endpoint(), stasis_app::name, OBJ_NOLOCK, and OBJ_SEARCH_KEY.

1527 {
1528  struct app_forwards *forwards;
1529 
1530  if (!app) {
1531  return -1;
1532  }
1533 
1534  ao2_lock(app->forwards);
1535  /* If subscribed to all, don't subscribe again */
1536  forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
1537  if (forwards) {
1538  ao2_unlock(app->forwards);
1539  ao2_ref(forwards, -1);
1540 
1541  return 0;
1542  }
1543 
1544  forwards = ao2_find(app->forwards,
1545  endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
1547  if (!forwards) {
1548  int res;
1549 
1550  /* Forwards not found, create one */
1551  forwards = forwards_create_endpoint(app, endpoint);
1552  if (!forwards) {
1553  ao2_unlock(app->forwards);
1554 
1555  return -1;
1556  }
1557 
1558  res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
1559  if (!res) {
1560  ao2_unlock(app->forwards);
1561  ao2_ref(forwards, -1);
1562 
1563  return -1;
1564  }
1565 
1566  /* Subscribe for messages */
1567  messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
1568  }
1569 
1570  ++forwards->interested;
1571  ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
1572  endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
1573  forwards->interested,
1574  app->name);
1575 
1576  ao2_unlock(app->forwards);
1577  ao2_ref(forwards, -1);
1578 
1579  return 0;
1580 }
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
Assume that the ao2_container is already locked.
Definition: astobj2.h:1063
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
Definition: astobj2.h:1554
int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt)
Subscribe an application to an endpoint for messages.
Definition: messaging.c:493
struct ao2_container * forwards
static struct app_forwards * forwards_create_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ast_debug(level,...)
Log a DEBUG message.
const char * ast_endpoint_get_id(const struct ast_endpoint *endpoint)
Gets the tech/resource id of the given endpoint.
struct ast_json* app_to_json ( const struct stasis_app * app )

Create a JSON representation of a stasis_app.

Parameters
app: The application.
Returns
JSON blob on success
Return values
NULL on error

Definition at line 1223 of file res/stasis/app.c.

References ao2_iterator_destroy(), ao2_iterator_init(), ao2_ref, ast_json_array_append(), ast_json_object_get(), ast_json_pack(), ast_json_string_create(), ast_json_unref(), bridges, endpoints, stasis_app::forwards, app_forwards::id, and stasis_app::name.

Referenced by stasis_app_object_to_json().

1224 {
1225  struct ast_json *json;
1226  struct ast_json *channels;
1227  struct ast_json *bridges;
1228  struct ast_json *endpoints;
1229  struct ao2_iterator i;
1230  struct app_forwards *forwards;
1231 
1232  json = ast_json_pack("{s: s, s: [], s: [], s: []}",
1233  "name", app->name,
1234  "channel_ids", "bridge_ids", "endpoint_ids");
1235  if (!json) {
1236  return NULL;
1237  }
1238  channels = ast_json_object_get(json, "channel_ids");
1239  bridges = ast_json_object_get(json, "bridge_ids");
1240  endpoints = ast_json_object_get(json, "endpoint_ids");
1241 
1242  i = ao2_iterator_init(app->forwards, 0);
1243  while ((forwards = ao2_iterator_next(&i))) {
1244  struct ast_json *array = NULL;
1245  int append_res;
1246 
1247  switch (forwards->forward_type) {
1248  case FORWARD_CHANNEL:
1249  array = channels;
1250  break;
1251  case FORWARD_BRIDGE:
1252  array = bridges;
1253  break;
1254  case FORWARD_ENDPOINT:
1255  array = endpoints;
1256  break;
1257  }
1258 
1259  /* If forward_type value is unexpected this will safely return an error. */
1260  append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
1261  ao2_ref(forwards, -1);
1262 
1263  if (append_res != 0) {
1264  ast_log(LOG_ERROR, "Error building response\n");
1265  ao2_iterator_destroy(&i);
1266  ast_json_unref(json);
1267 
1268  return NULL;
1269  }
1270  }
1271  ao2_iterator_destroy(&i);
1272 
1273  return json;
1274 }
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:612
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
struct ao2_container * forwards
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
Definition: json.c:278
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
int ast_json_array_append(struct ast_json *array, struct ast_json *value)
Append to an array.
Definition: json.c:378
static struct ao2_container * bridges
Definition: bridge.c:123
static struct stasis_rest_handlers endpoints
REST handler for /api-docs/endpoints.json.
struct ao2_iterator: When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
Definition: astobj2.h:1821
struct ast_json * ast_json_object_get(struct ast_json *object, const char *key)
Get a field from a JSON object.
Definition: json.c:407
Abstract JSON element (object, array, string, int, ...).
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
int app_unsubscribe_bridge ( struct stasis_app * app,
struct ast_bridge * bridge
)

Cancel the bridge subscription for an application.

Parameters
app: Subscribing application.
bridge: Bridge to unsubscribe from.
Return values
0 on success.
Non-zero on error.

Definition at line 1481 of file res/stasis/app.c.

References app_unsubscribe_bridge_id(), and ast_bridge::uniqueid.

Referenced by stasis_app_exec().

1482 {
1483  if (!app) {
1484  return -1;
1485  }
1486 
1487  return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
1488 }
const ast_string_field uniqueid
Definition: bridge.h:401
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
Cancel the subscription an app has for a bridge.
int app_unsubscribe_bridge_id ( struct stasis_app * app,
const char *  bridge_id 
)

Cancel the subscription an app has for a bridge.

Parameters
app: Subscribing application.
bridge_id: Id of bridge to unsubscribe from.
Return values
0 on success.
Non-zero on error.

Definition at line 1490 of file res/stasis/app.c.

Referenced by app_unsubscribe_bridge().

1491 {
1492  if (!app) {
1493  return -1;
1494  }
1495 
1496  return unsubscribe(app, "bridge", bridge_id, 0);
1497 }
int app_unsubscribe_channel ( struct stasis_app * app,
struct ast_channel * chan
)

Cancel the subscription an app has for a channel.

Parameters
app: Subscribing application.
chan: Channel to unsubscribe from.
Return values
0 on success.
Non-zero on error.

Definition at line 1379 of file res/stasis/app.c.

References app_unsubscribe_channel_id().

Referenced by app_send_end_msg().

1380 {
1381  if (!app) {
1382  return -1;
1383  }
1384 
1385  return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
1386 }
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
Cancel the subscription an app has for a channel.
int app_unsubscribe_channel_id ( struct stasis_app * app,
const char *  channel_id 
)

Cancel the subscription an app has for a channel.

Parameters
appSubscribing application.
channel_idId of channel to unsubscribe from.
Return values
0on success.
Non-zeroon error.

Definition at line 1388 of file res/stasis/app.c.

Referenced by app_unsubscribe_channel().

1389 {
1390  if (!app) {
1391  return -1;
1392  }
1393 
1394  return unsubscribe(app, "channel", channel_id, 0);
1395 }
int app_unsubscribe_endpoint_id ( struct stasis_app * app,
const char *  endpoint_id 
)

Cancel the subscription an app has for an endpoint.

Parameters
app: Subscribing application.
endpoint_id: Id of endpoint to unsubscribe from.
Return values
0 on success.
Non-zero on error.

Definition at line 1587 of file res/stasis/app.c.

1588 {
1589  if (!app) {
1590  return -1;
1591  }
1592 
1593  return unsubscribe(app, "endpoint", endpoint_id, 0);
1594 }
void app_update ( struct stasis_app * app,
stasis_app_cb  handler,
void *  data 
)

Update the handler and data for a res_stasis application.

If app has been deactivated, this will reactivate it.

Parameters
app: Application to update.
handler: New application callback.
data: New data pointer for the callback.

Definition at line 1111 of file res/stasis/app.c.

References ao2_replace, app_send(), ast_json_pack(), ast_json_timeval(), ast_json_unref(), ast_tvnow(), stasis_app::data, stasis_app::handler, and stasis_app::name.

1112 {
1113  ao2_lock(app);
1114  if (app->handler && app->data) {
1115  struct ast_json *msg;
1116 
1117  ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
1118 
1119  msg = ast_json_pack("{s: s, s: o?, s: s}",
1120  "type", "ApplicationReplaced",
1121  "timestamp", ast_json_timeval(ast_tvnow(), NULL),
1122  "application", app->name);
1123  if (msg) {
1124  /*
1125  * The app must be unlocked before calling 'send' since a handler may
1126  * subsequently attempt to grab the app lock after first obtaining a
1127  * lock for another object, thus causing a deadlock.
1128  */
1129  ao2_unlock(app);
1130  app_send(app, msg);
1131  ao2_lock(app);
1132  ast_json_unref(msg);
1133  if (!app->handler) {
1134  /*
1135  * If the handler disappeared then the app was deactivated. In that
1136  * case don't replace. Re-activation will reset the handler later.
1137  */
1138  ao2_unlock(app);
1139  return;
1140  }
1141  }
1142  } else {
1143  ast_verb(1, "Activating Stasis app '%s'\n", app->name);
1144  }
1145 
1146  app->handler = handler;
1147  ao2_replace(app->data, data);
1148  ao2_unlock(app);
1149 }
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
Definition: json.c:612
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Definition: json.c:73
stasis_app_cb handler
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
struct ast_json * ast_json_timeval(const struct timeval tv, const char *zone)
Construct a timeval as JSON.
Definition: json.c:670
void app_send(struct stasis_app *app, struct ast_json *message)
Send a message to an application.
#define ao2_replace(dst, src)
Replace one object reference with another cleaning up the original.
Definition: astobj2.h:501
Abstract JSON element (object, array, string, int, ...).