51 static void all_dtor(
void *obj)
55 ao2_cleanup(all->topic);
57 ao2_cleanup(all->topic_cached);
58 all->topic_cached = NULL;
59 ao2_cleanup(all->cache);
61 stasis_forward_cancel(all->forward_all_to_cached);
62 all->forward_all_to_cached = NULL;
68 char *cached_name = NULL;
72 all = ao2_t_alloc(
sizeof(*all), all_dtor, name);
86 ast_free(cached_name);
88 all->forward_all_to_cached =
91 if (!all->topic || !all->topic_cached || !all->cache ||
92 !all->forward_all_to_cached) {
115 return all->topic_cached;
126 static void one_dtor(
void *obj)
131 ast_assert(one->topic_cached == NULL);
132 ast_assert(one->forward_topic_to_all == NULL);
133 ast_assert(one->forward_cached_to_all == NULL);
135 ao2_cleanup(one->topic);
153 if (!one->forward_topic_to_all || !one->forward_cached_to_all) {
167 one = ao2_t_alloc(
sizeof(*one), one_dtor, name);
180 if (!one->topic_cached) {
195 stasis_forward_cancel(one->forward_topic_to_all);
196 one->forward_topic_to_all = NULL;
197 stasis_forward_cancel(one->forward_cached_to_all);
198 one->forward_cached_to_all = NULL;
200 one->topic_cached = NULL;
struct stasis_cp_single * stasis_cp_sink_create(struct stasis_cp_all *all, const char *name)
Create a sink in the cache pattern.
Caching pattern for Stasis Message Bus API topics.
Asterisk main include file. File version handling, generic pbx functions.
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
struct stasis_cp_single * stasis_cp_single_create(struct stasis_cp_all *all, const char *name)
Create the 'one' side of the cache pattern.
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
struct stasis_topic * stasis_cp_all_topic(struct stasis_cp_all *all)
Get the aggregate topic.
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
struct stasis_cp_all * stasis_cp_all_create(const char *name, snapshot_get_id id_fn)
Create an all instance of the cache pattern.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
struct stasis_cache * stasis_cp_all_cache(struct stasis_cp_all *all)
Get the cache.
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
int stasis_cp_single_accept_message_type(struct stasis_cp_single *one, struct stasis_message_type *type)
Indicate to an instance that we are interested in a message type.
struct stasis_topic * stasis_cp_single_topic(struct stasis_cp_single *one)
Get the topic for this instance.
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
stasis_subscription_message_filter
Stasis subscription message filters.
struct stasis_topic * stasis_cp_single_topic_cached(struct stasis_cp_single *one)
Get the caching topic for this instance.
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
Stops caching and forwarding messages.
struct stasis_topic * stasis_cp_all_topic_cached(struct stasis_cp_all *all)
Get the caching topic.
static struct sorcery_test_caching cache
Global scope caching structure for testing.
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topic.
int stasis_cp_single_set_filter(struct stasis_cp_single *one, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback to extract a unique identity from a snapshot message.