Asterisk - The Open Source Telephony Project  21.4.1
Data Structures | Macros | Functions
stasis_message_router.c File Reference

Stasis message router implementation. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/vector.h"

Go to the source code of this file.

Data Structures

struct  route_table
 
struct  stasis_message_route
 
struct  stasis_message_router
 

Macros

#define ROUTE_TABLE_ELEM_CLEANUP(elem)   ao2_cleanup((elem).message_type)
 route_table vector element cleanup. More...
 
#define ROUTE_TABLE_ELEM_CMP(elem, value)   ((elem).message_type == (value))
 route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED() More...
 

Functions

struct stasis_message_router * __stasis_message_router_create (struct stasis_topic *topic, const char *file, int lineno, const char *func)
 
struct stasis_message_router * __stasis_message_router_create_pool (struct stasis_topic *topic, const char *file, int lineno, const char *func)
 
static int find_route (struct stasis_message_router *router, struct stasis_message *message, struct stasis_message_route *route_out)
 
static int route_table_add (struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
 
static void route_table_dtor (struct route_table *table)
 
static struct stasis_message_route * route_table_find (struct route_table *table, struct stasis_message_type *message_type)
 
static int route_table_remove (struct route_table *table, struct stasis_message_type *message_type)
 
static void router_dispatch (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 
static void router_dtor (void *obj)
 
void stasis_message_router_accept_formatters (struct stasis_message_router *router, enum stasis_subscription_message_formatters formatters)
 Indicate to a message router that we are interested in messages with one or more formatters. More...
 
int stasis_message_router_add (struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
 Add a route to a message router. More...
 
int stasis_message_router_add_cache_update (struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
 Add a route for stasis_cache_update messages to a message router. More...
 
static struct stasis_message_router * stasis_message_router_create_internal (struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno, const char *func)
 
int stasis_message_router_is_done (struct stasis_message_router *router)
 Returns whether router has received its final message. More...
 
void stasis_message_router_publish_sync (struct stasis_message_router *router, struct stasis_message *message)
 Publish a message to a message router's subscription synchronously. More...
 
void stasis_message_router_remove (struct stasis_message_router *router, struct stasis_message_type *message_type)
 Remove a route from a message router. More...
 
void stasis_message_router_remove_cache_update (struct stasis_message_router *router, struct stasis_message_type *message_type)
 Remove a cache route from a message router. More...
 
int stasis_message_router_set_congestion_limits (struct stasis_message_router *router, long low_water, long high_water)
 Set the high and low alert water marks of the stasis message router. More...
 
int stasis_message_router_set_default (struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
 Sets the default route of a router. More...
 
void stasis_message_router_set_formatters_default (struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
 Sets the default route of a router with formatters. More...
 
void stasis_message_router_unsubscribe (struct stasis_message_router *router)
 Unsubscribe the router from the upstream topic. More...
 
void stasis_message_router_unsubscribe_and_join (struct stasis_message_router *router)
 Unsubscribe the router from the upstream topic, blocking until the final message has been processed. More...
 

Detailed Description

Stasis message router implementation.

Author
David M. Lee, II <dlee@digium.com>

Definition in file stasis_message_router.c.

Macro Definition Documentation

#define ROUTE_TABLE_ELEM_CLEANUP (   elem)    ao2_cleanup((elem).message_type)

route_table vector element cleanup.

Parameters
elem: Element to clean up.

Definition at line 86 of file stasis_message_router.c.

#define ROUTE_TABLE_ELEM_CMP (   elem,
  value 
)    ((elem).message_type == (value))

route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()

Parameters
elem: Element to compare against.
value: Value to compare with the vector element.
Returns
0 if element does not match.
Non-zero if element matches.

Definition at line 79 of file stasis_message_router.c.

Function Documentation

void stasis_message_router_accept_formatters ( struct stasis_message_router * router,
enum stasis_subscription_message_formatters  formatters 
)

Indicate to a message router that we are interested in messages with one or more formatters.

The formatters are passed on to the underlying subscription.

Warning
With direct subscriptions, adding a formatter filter is an OR operation with any message type filters. In the current implementation of message router, however, it's an AND operation. Even when setting a default route, the callback will only get messages that have the formatters provided in this call.
Parameters
router: Router to set the formatters of.
formatters: A bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.25.0
16.2.0

Definition at line 418 of file stasis_message_router.c.

References stasis_subscription_accept_formatters(), and stasis_message_router::subscription.

420 {
421  ast_assert(router != NULL);
422 
424 
425  return;
426 }
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1093
struct stasis_subscription * subscription
int stasis_message_router_add ( struct stasis_message_router * router,
struct stasis_message_type * message_type,
stasis_subscription_cb  callback,
void *  data 
)

Add a route to a message router.

A particular message_type may have at most one route per router. If you route stasis_cache_update messages, the callback will only receive updates for types not handled by routes added with stasis_message_router_add_cache_update().

Adding multiple routes for the same message type results in undefined behavior.

Parameters
router: Router to add the route to.
message_type: Type of message to route.
callback: Callback to forward messages of message_type to.
data: Data pointer to pass to callback.
Return values
0: on success
-1: on failure
Since
12

Definition at line 308 of file stasis_message_router.c.

References stasis_message_router::routes, stasis_subscription_accept_message_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), create_routes(), forwards_create_endpoint(), load_module(), manager_bridging_init(), manager_channels_init(), manager_confbridge_init(), manager_endpoints_init(), manager_mwi_init(), manager_subscriptions_init(), and pjsip_outbound_registration_metrics_init().

311 {
312  int res;
313 
314  ast_assert(router != NULL);
315 
316  if (!message_type) {
317  /* Cannot route to NULL type. */
318  return -1;
319  }
320  ao2_lock(router);
321  res = route_table_add(&router->routes, message_type, callback, data);
322  if (!res) {
324  /* Until a specific message type was added we would already drop the message, so being
325  * selective now doesn't harm us. If we have a default route then we are already forced
326  * to filter nothing and messages will come in regardless.
327  */
329  }
330  ao2_unlock(router);
331  return res;
332 }
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1077
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1023
struct stasis_subscription * subscription
int stasis_message_router_add_cache_update ( struct stasis_message_router * router,
struct stasis_message_type * message_type,
stasis_subscription_cb  callback,
void *  data 
)

Add a route for stasis_cache_update messages to a message router.

A particular message_type may have at most one cache route per router. These are distinct from regular routes, so one could have both a regular route and a cache route for the same message_type.

Adding multiple routes for the same message type results in undefined behavior.

Parameters
router: Router to add the route to.
message_type: Subtype of cache update to route.
callback: Callback to forward messages of message_type to.
data: Data pointer to pass to callback.
Return values
0: on success
-1: on failure
Since
12

Definition at line 334 of file stasis_message_router.c.

References stasis_message_router::cache_routes, stasis_cache_update_type(), stasis_subscription_accept_message_type(), STASIS_SUBSCRIPTION_FILTER_SELECTIVE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create().

337 {
338  int res;
339 
340  ast_assert(router != NULL);
341 
342  if (!message_type) {
343  /* Cannot cache a route to NULL type. */
344  return -1;
345  }
346  ao2_lock(router);
347  res = route_table_add(&router->cache_routes, message_type, callback, data);
348  if (!res) {
351  }
352  ao2_unlock(router);
353  return res;
354 }
struct route_table cache_routes
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1077
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1023
struct stasis_subscription * subscription
int stasis_message_router_is_done ( struct stasis_message_router * router)

Returns whether router has received its final message.

Parameters
router: Router.
Return values
True (non-zero) if stasis_subscription_final_message() has been received.
False (zero) if waiting for the end.

Definition at line 276 of file stasis_message_router.c.

References stasis_subscription_is_done(), and stasis_message_router::subscription.

277 {
278  if (!router) {
279  /* Null router is about as done as you can get */
280  return 1;
281  }
282 
284 }
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
Definition: stasis.c:1118
struct stasis_subscription * subscription
void stasis_message_router_publish_sync ( struct stasis_message_router * router,
struct stasis_message * message 
)

Publish a message to a message router's subscription synchronously.

Parameters
router: Router
message: The Stasis Message Bus API message

This should be used when a message needs to be published synchronously to the underlying subscription created by a message router. This is analogous to stasis_publish_sync.

Note that the caller will be blocked until the thread servicing the message on the message router's subscription completes handling of the message.

Since
12.1.0

Definition at line 286 of file stasis_message_router.c.

References ao2_bump, stasis_publish_sync(), and stasis_message_router::subscription.

Referenced by ast_cdr_engine_term().

288 {
289  ast_assert(router != NULL);
290 
291  ao2_bump(router);
292  stasis_publish_sync(router->subscription, message);
293  ao2_cleanup(router);
294 }
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
Definition: stasis.c:1516
struct stasis_subscription * subscription
void stasis_message_router_remove ( struct stasis_message_router * router,
struct stasis_message_type * message_type 
)

Remove a route from a message router.

If a route is removed from another thread, there is no notification that all messages using this route have been processed. This typically means that the associated data pointer for this route must be kept until the route itself is disposed of.

Parameters
router: Router to remove the route from.
message_type: Type of message to route.
Since
12

Definition at line 356 of file stasis_message_router.c.

References stasis_message_router::routes.

358 {
359  ast_assert(router != NULL);
360 
361  if (!message_type) {
362  /* Cannot remove a NULL type. */
363  return;
364  }
365  ao2_lock(router);
366  route_table_remove(&router->routes, message_type);
367  ao2_unlock(router);
368 }
void stasis_message_router_remove_cache_update ( struct stasis_message_router * router,
struct stasis_message_type * message_type 
)

Remove a cache route from a message router.

If a route is removed from another thread, there is no notification that all messages using this route have been processed. This typically means that the associated data pointer for this route must be kept until the route itself is disposed of.

Parameters
router: Router to remove the route from.
message_type: Type of message to route.
Since
12

Definition at line 370 of file stasis_message_router.c.

References stasis_message_router::cache_routes.

373 {
374  ast_assert(router != NULL);
375 
376  if (!message_type) {
377  /* Cannot remove a NULL type. */
378  return;
379  }
380  ao2_lock(router);
381  route_table_remove(&router->cache_routes, message_type);
382  ao2_unlock(router);
383 }
struct route_table cache_routes
int stasis_message_router_set_congestion_limits ( struct stasis_message_router * router,
long  low_water,
long  high_water 
)

Set the high and low alert water marks of the stasis message router.

Since
13.10.0
Parameters
router: Pointer to a stasis message router
low_water: New queue low water mark. (-1 to set as 90% of high_water)
high_water: New queue high water mark.
Return values
0: on success.
-1: on error (water marks not changed).

Definition at line 296 of file stasis_message_router.c.

References stasis_subscription_set_congestion_limits(), and stasis_message_router::subscription.

Referenced by create_routes(), and manager_subscriptions_init().

298 {
299  int res = -1;
300 
301  if (router) {
303  low_water, high_water);
304  }
305  return res;
306 }
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
Definition: stasis.c:1011
struct stasis_subscription * subscription
int stasis_message_router_set_default ( struct stasis_message_router * router,
stasis_subscription_cb  callback,
void *  data 
)

Sets the default route of a router.

Parameters
router: Router to set the default route of.
callback: Callback to forward messages which otherwise have no home.
data: Data pointer to pass to callback.
Return values
0: on success
-1: on failure
Since
12
Note
Setting a default callback will automatically cause the underlying subscription to receive all messages and not be filtered. If filtering is desired then a specific route for each message type should be provided.

Definition at line 385 of file stasis_message_router.c.

References stasis_message_router_set_formatters_default().

388 {
389  stasis_message_router_set_formatters_default(router, callback, data, STASIS_SUBSCRIPTION_FORMATTER_NONE);
390 
391  /* While this implementation can never fail, it used to be able to */
392  return 0;
393 }
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.
void stasis_message_router_set_formatters_default ( struct stasis_message_router * router,
stasis_subscription_cb  callback,
void *  data,
enum stasis_subscription_message_formatters  formatters 
)

Sets the default route of a router with formatters.

Parameters
router: Router to set the default route of.
callback: Callback to forward messages which otherwise have no home.
data: Data pointer to pass to callback.
formatters: A bitmap of stasis_subscription_message_formatters we wish to receive.
Since
13.26.0
16.3.0
Note
If formatters are specified then the message router will remain in a selective filtering state. Any explicit routes will receive messages of their message type and the default callback will only receive messages that have one of the given formatters. Explicit routes will not be filtered according to the given formatters.

Definition at line 395 of file stasis_message_router.c.

References stasis_message_route::callback, stasis_message_route::data, stasis_message_router::default_route, stasis_subscription_accept_formatters(), STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, stasis_subscription_set_filter(), and stasis_message_router::subscription.

Referenced by app_create(), manager_subscriptions_init(), and stasis_message_router_set_default().

399 {
400  ast_assert(router != NULL);
401  ast_assert(callback != NULL);
402 
404 
405  ao2_lock(router);
406  router->default_route.callback = callback;
407  router->default_route.data = data;
408  ao2_unlock(router);
409 
410  if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
411  /* Formatters govern what messages the default callback get, so it is only if none is
412  * specified that we accept all messages regardless.
413  */
415  }
416 }
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1077
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
Definition: stasis.c:1093
struct stasis_message_route default_route
stasis_subscription_cb callback
struct stasis_subscription * subscription
void stasis_message_router_unsubscribe ( struct stasis_message_router * router)

Unsubscribe the router from the upstream topic.

Parameters
router: Router to unsubscribe.
Since
12

Definition at line 256 of file stasis_message_router.c.

References stasis_unsubscribe(), and stasis_message_router::subscription.

Referenced by app_shutdown(), ast_endpoint_shutdown(), and manager_confbridge_shutdown().

257 {
258  if (!router) {
259  return;
260  }
261 
262  ao2_lock(router);
263  router->subscription = stasis_unsubscribe(router->subscription);
264  ao2_unlock(router);
265 }
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:971
struct stasis_subscription * subscription
void stasis_message_router_unsubscribe_and_join ( struct stasis_message_router * router)

Unsubscribe the router from the upstream topic, blocking until the final message has been processed.

See stasis_unsubscribe_and_join() for info on when to use this vs. stasis_message_router_unsubscribe().

Parameters
router: Router to unsubscribe.
Since
12

Definition at line 267 of file stasis_message_router.c.

References stasis_unsubscribe_and_join(), and stasis_message_router::subscription.

269 {
270  if (!router) {
271  return;
272  }
274 }
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Definition: stasis.c:1134
struct stasis_subscription * subscription