41 #define TECH_WILDCARD "__AST_ALL_TECH"
46 #define ENDPOINTS_NUM_BUCKETS 127
81 static
void application_tuple_dtor(
void *obj)
85 ao2_cleanup(tuple->
pvt);
92 size_t size =
sizeof(*tuple) + strlen(app_name) + 1;
94 ast_assert(callback != NULL);
109 static void message_subscription_dtor(
void *obj)
126 size_t size =
sizeof(*sub) + strlen(token) + 1;
132 strcpy(sub->
token, token);
164 const char *right_key = arg;
169 right_key = object_right->
token;
172 cmp = strcmp(object_left->
token, right_key);
179 cmp = strncmp(object_left->
token, right_key, strlen(right_key));
200 static void msg_to_endpoint(
const struct ast_msg *msg,
char *buf,
size_t len)
205 ast_strlen_zero(endpoint) ?
"" :
"/",
213 static int has_destination_cb(
const struct ast_msg *msg)
219 msg_to_endpoint(msg, buf,
sizeof(buf));
230 || !strncasecmp(sub->
token, buf, strlen(sub->
token))
231 || !strncasecmp(sub->
token, buf, strlen(sub->
token))) {
245 ast_debug(1,
"No subscription found for %s\n", buf);
287 "variables", json_vars);
296 ast_debug(3,
"Dispatching message to subscription %s for endpoint %s\n",
302 tuple->
callback(endpoint_name, json_msg, tuple->
pvt);
306 static int handle_msg_cb(
struct ast_msg *msg)
314 const char *endpoint_name;
317 msg_to_endpoint(msg, buf,
sizeof(buf));
319 json_msg = msg_to_json(msg);
335 || !strncasecmp(sub->
token, buf, strlen(sub->
token))) {
337 matching_subscriptions[j++] = sub;
345 matching_subscriptions[j++] = sub;
349 for (i = 0; i < j; i++) {
350 sub = matching_subscriptions[i];
352 dispatch_message(sub, endpoint_name, json_msg);
364 .handle_msg = handle_msg_cb,
365 .has_destination = has_destination_cb,
370 return !strcmp(sub->
token, key) ? 1 : 0;
373 static int application_tuple_cmp(
struct application_tuple *item,
const char *key)
375 return !strcmp(item->
app_name, key) ? 1 : 0;
386 if (tuple && !strcmp(tuple->
app_name, app_name)) {
429 sub = get_subscription(endpoint);
435 if (!is_app_subscribed(sub, app_name)) {
454 ast_debug(3,
"App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ?
ast_endpoint_get_id(endpoint) :
"-- ALL --");
478 ao2_link(endpoint_subscriptions, sub);
498 sub = get_or_create_subscription(endpoint);
504 if (is_app_subscribed(sub, app_name)) {
509 tuple = application_tuple_alloc(app_name, callback, pvt);
532 ao2_ref(endpoint_subscriptions, -1);
544 if (!endpoint_subscriptions) {
549 ao2_ref(endpoint_subscriptions, -1);
554 ao2_ref(endpoint_subscriptions, -1);
560 ao2_ref(endpoint_subscriptions, -1);
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
int ast_msg_handler_unregister(const struct ast_msg_handler *handler)
Unregister a ast_msg_handler.
Asterisk locking-related definitions:
Asterisk main include file. File version handling, generic pbx functions.
const char * ast_msg_get_tech(const struct ast_msg *msg)
Retrieve the technology associated with this message.
struct ast_json * ast_json_pack(char const *format,...)
Helper for creating complex JSON values.
const char * ast_msg_get_endpoint(const struct ast_msg *msg)
Retrieve the endpoint associated with this message.
#define AST_VECTOR_REMOVE_CMP_UNORDERED(vec, value, cmp, cleanup)
Remove an element from a vector that matches the given comparison.
The arg parameter is a search key, but is not an object.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
An external processor of received messages.
static int message_subscription_compare_cb(void *obj, void *arg, int flags)
const char * ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
Gets the technology of the given endpoint.
int ast_msg_handler_register(const struct ast_msg_handler *handler)
Register a ast_msg_handler.
message_received_cb callback
#define ast_rwlock_init(rwlock)
wrapper for rwlock with tracking enabled
int messaging_cleanup(void)
Tidy up the messaging layer.
#define TECH_WILDCARD
Subscription to all technologies.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Storage object for an application.
static ast_rwlock_t tech_subscriptions_lock
RWLock for tech_subscriptions.
static struct @498 tech_subscriptions
The subscriptions to technologies.
struct ast_endpoint * ast_endpoint_find_by_id(const char *id)
Finds the endpoint with the given tech[/resource] id.
Out-of-call text message support.
const char * ast_msg_get_body(const struct ast_msg *msg)
Get the body of a message.
int ast_msg_var_iterator_next_received(const struct ast_msg *msg, struct ast_msg_var_iterator *iter, const char **name, const char **value)
Get the next variable name and value that was set on a received message.
int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt)
Subscribe an application to an endpoint for messages.
void ast_msg_var_iterator_destroy(struct ast_msg_var_iterator *iter)
Destroy a message variable iterator.
const char * ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
Gets the resource name of the given endpoint.
int ast_json_object_set(struct ast_json *object, const char *key, struct ast_json *value)
Set a field in a JSON object.
const char * ast_msg_get_to(const struct ast_msg *msg)
Retrieve the destination of this message.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
static struct ao2_container * endpoint_subscriptions
The subscriptions to endpoints.
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
void ast_msg_var_unref_current(struct ast_msg_var_iterator *iter)
Unref a message var from inside an iterator loop.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
int(* message_received_cb)(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
Callback handler for when a message is received from the core.
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_VECTOR(name, type)
Define a vector structure.
int messaging_init(void)
Initialize the messaging layer.
#define ast_test_suite_event_notify(s, f,...)
static int message_subscription_hash_cb(const void *obj, const int flags)
#define ao2_unlink(container, obj)
Remove an object from a container.
const char * ast_msg_get_from(const struct ast_msg *msg)
Retrieve the source of this message.
const char * app_name(struct ast_app *app)
struct message_subscription::@499 applications
A subscription to some endpoint or technology.
Vector container support.
const char * name
Name of the message handler.
struct ast_json * ast_json_object_create(void)
Create a new JSON object.
The arg parameter is an object of the same type.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Structure for rwlock and tracking information.
#define S_OR(a, b)
returns the equivalent of logic or for strings: first one if not empty, otherwise second one...
void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id)
Subscribe for messages from a particular endpoint.
Stasis out-of-call text message support.
Abstract JSON element (object, array, string, int, ...).
#define ENDPOINTS_NUM_BUCKETS
Number of buckets for the endpoint_subscriptions container.
Search option field mask.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
const char * ast_endpoint_get_id(const struct ast_endpoint *endpoint)
Gets the tech/resource id of the given endpoint.
static force_inline int attribute_pure ast_str_hash(const char *str)
Compute a hash value on a string.
#define ao2_link(container, obj)
Add an object to a container.
struct ast_msg_var_iterator * ast_msg_var_iterator_init(const struct ast_msg *msg)
Create a new message variable iterator.