33 #include "asterisk/stasis_message_router.h"
79 #define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
86 #define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type)
88 static int route_table_remove(
struct route_table *table,
95 static int route_table_add(
struct route_table *table,
102 ast_assert(callback != NULL);
103 ast_assert(route_table_find(table, message_type) == NULL);
116 static void route_table_dtor(
struct route_table *table)
140 static void router_dtor(
void *obj)
149 route_table_dtor(&router->
routes);
153 static int find_route(
162 ast_assert(route_out != NULL);
173 route = route_table_find(&router->
routes, type);
189 static void router_dispatch(
void *data,
196 if (find_route(router, message, &route) == 0) {
206 struct stasis_topic *topic,
int use_thread_pool,
const char *file,
int lineno,
226 if (use_thread_pool) {
245 struct stasis_topic *topic,
const char *file,
int lineno,
const char *func)
247 return stasis_message_router_create_internal(topic, 0, file, lineno, func);
251 struct stasis_topic *topic,
const char *file,
int lineno,
const char *func)
253 return stasis_message_router_create_internal(topic, 1, file, lineno, func);
289 ast_assert(router != NULL);
297 long low_water,
long high_water)
303 low_water, high_water);
314 ast_assert(router != NULL);
321 res = route_table_add(&router->
routes, message_type, callback, data);
340 ast_assert(router != NULL);
347 res = route_table_add(&router->
cache_routes, message_type, callback, data);
359 ast_assert(router != NULL);
366 route_table_remove(&router->
routes, message_type);
374 ast_assert(router != NULL);
381 route_table_remove(&router->
cache_routes, message_type);
400 ast_assert(router != NULL);
401 ast_assert(callback != NULL);
410 if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
421 ast_assert(router != NULL);
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Asterisk main include file. File version handling, generic pbx functions.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
#define AST_VECTOR_REMOVE_CMP_UNORDERED(vec, value, cmp, cleanup)
Remove an element from a vector that matches the given comparison.
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, long low_water, long high_water)
Set the high and low alert water marks of the stasis message router.
struct route_table cache_routes
#define ROUTE_TABLE_ELEM_CMP(elem, value)
route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
void stasis_message_router_accept_formatters(struct stasis_message_router *router, enum stasis_subscription_message_formatters formatters)
Indicate to a message router that we are interested in messages with one or more formatters.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
void stasis_message_router_set_formatters_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data, enum stasis_subscription_message_formatters formatters)
Sets the default route of a router with formatters.
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
int stasis_message_router_is_done(struct stasis_message_router *router)
Returns whether router has received its final message.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed...
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, long low_water, long high_water)
Set the high and low alert water marks of the stasis subscription.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
#define AST_VECTOR(name, type)
Define a vector structure.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
struct route_table routes
#define ROUTE_TABLE_ELEM_CLEANUP(elem)
route_table vector element cleanup.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message_route default_route
struct stasis_message_type * type
Convenience reference to snapshot type.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a route from a message router.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
Vector container support.
struct stasis_subscription * __stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription whose callbacks occur on a thread pool.
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic.
void stasis_message_router_remove_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type)
Remove a cache route from a message router.
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
void stasis_message_router_publish_sync(struct stasis_message_router *router, struct stasis_message *message)
Publish a message to a message router's subscription synchronously.
struct stasis_subscription * __stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func)
Create a subscription.
stasis_subscription_cb callback
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
struct stasis_subscription * subscription
int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
Sets the default route of a router.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_message_type * message_type
stasis_subscription_message_formatters
Stasis subscription formatter filters.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.