Asterisk - The Open Source Telephony Project  21.4.1
Data Structures | Functions
stasis_cache_pattern.c File Reference

Typical cache pattern for Stasis topics. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis_cache_pattern.h"

Go to the source code of this file.

Data Structures

struct  stasis_cp_all
 
struct  stasis_cp_single
 

Functions

static void all_dtor (void *obj)
 
static void one_dtor (void *obj)
 
struct stasis_cache * stasis_cp_all_cache (struct stasis_cp_all *all)
 Get the cache. More...
 
struct stasis_cp_all * stasis_cp_all_create (const char *name, snapshot_get_id id_fn)
 Create an all instance of the cache pattern. More...
 
struct stasis_topic * stasis_cp_all_topic (struct stasis_cp_all *all)
 Get the aggregate topic. More...
 
struct stasis_topic * stasis_cp_all_topic_cached (struct stasis_cp_all *all)
 Get the caching topic. More...
 
int stasis_cp_single_accept_message_type (struct stasis_cp_single *one, struct stasis_message_type *type)
 Indicate to an instance that we are interested in a message type. More...
 
struct stasis_cp_single * stasis_cp_single_create (struct stasis_cp_all *all, const char *name)
 Create the 'one' side of the cache pattern. More...
 
int stasis_cp_single_set_filter (struct stasis_cp_single *one, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a cache. More...
 
struct stasis_topic * stasis_cp_single_topic (struct stasis_cp_single *one)
 Get the topic for this instance. More...
 
struct stasis_topic * stasis_cp_single_topic_cached (struct stasis_cp_single *one)
 Get the caching topic for this instance. More...
 
void stasis_cp_single_unsubscribe (struct stasis_cp_single *one)
 Stops caching and forwarding messages. More...
 
struct stasis_cp_single * stasis_cp_sink_create (struct stasis_cp_all *all, const char *name)
 Create a sink in the cache pattern. More...
 

Detailed Description

Typical cache pattern for Stasis topics.

Author
David M. Lee, II <dlee@digium.com>

Definition in file stasis_cache_pattern.c.

Function Documentation

struct stasis_cache* stasis_cp_all_cache ( struct stasis_cp_all * all)

Get the cache.

This is the shared cache for all corresponding stasis_cp_single objects.

Parameters
all: All side caching pattern object.
Returns
The cache.
Return values
NULL: if all is NULL

Definition at line 118 of file stasis_cache_pattern.c.

Referenced by ast_endpoint_cache().

119 {
120  if (!all) {
121  return NULL;
122  }
123  return all->cache;
124 }
struct stasis_cp_all* stasis_cp_all_create ( const char *  name,
snapshot_get_id  id_fn 
)

Create an all instance of the cache pattern.

This object is AO2 managed, so dispose of it with ao2_cleanup().

Parameters
name: Base name of the topics.
id_fn: Identity function for the cache.
Returns
All side instance.
Return values
NULL: on error.

Definition at line 65 of file stasis_cache_pattern.c.

References ao2_ref, ast_asprintf, ast_atomic_fetchadd_int(), stasis_cache_create(), stasis_forward_all(), and stasis_topic_create().

Referenced by ast_endpoint_stasis_init().

67 {
68  char *cached_name = NULL;
69  struct stasis_cp_all *all;
70  static int cache_id;
71 
72  all = ao2_t_alloc(sizeof(*all), all_dtor, name);
73  if (!all) {
74  return NULL;
75  }
76 
77  ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name);
78  if (!cached_name) {
79  ao2_ref(all, -1);
80 
81  return NULL;
82  }
83 
84  all->topic = stasis_topic_create(name);
85  all->topic_cached = stasis_topic_create(cached_name);
86  ast_free(cached_name);
87  all->cache = stasis_cache_create(id_fn);
88  all->forward_all_to_cached =
89  stasis_forward_all(all->topic, all->topic_cached);
90 
91  if (!all->topic || !all->topic_cached || !all->cache ||
92  !all->forward_all_to_cached) {
93  ao2_ref(all, -1);
94 
95  return NULL;
96  }
97 
98  return all;
99 }
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
struct stasis_topic* stasis_cp_all_topic ( struct stasis_cp_all * all)

Get the aggregate topic.

This topic aggregates all messages published to corresponding stasis_cp_single_topic() topics.

Parameters
all: All side caching pattern object.
Returns
The aggregate topic.
Return values
NULL: if all is NULL

Definition at line 101 of file stasis_cache_pattern.c.

Referenced by ast_endpoint_topic_all().

102 {
103  if (!all) {
104  return NULL;
105  }
106  return all->topic;
107 }
struct stasis_topic* stasis_cp_all_topic_cached ( struct stasis_cp_all * all)

Get the caching topic.

This topic aggregates all messages from the corresponding stasis_cp_single_topic_cached() topics.

Note that one normally only subscribes to the caching topic, since data is fed to it from its upstream topic.

Parameters
all: All side caching pattern object.
Returns
The aggregate caching topic.
Return values
NULL: if all is NULL

Definition at line 109 of file stasis_cache_pattern.c.

Referenced by ast_endpoint_topic_all_cached().

111 {
112  if (!all) {
113  return NULL;
114  }
115  return all->topic_cached;
116 }
int stasis_cp_single_accept_message_type ( struct stasis_cp_single * one,
struct stasis_message_type * type 
)

Indicate to an instance that we are interested in a message type.

This will cause the caching topic to receive messages of the given message type. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
one: One side of the cache pattern.
type: The message type we wish to receive.
Return values
0: on success
-1: on failure
Since
17.0.0

Definition at line 222 of file stasis_cache_pattern.c.

References stasis_caching_accept_message_type().

224 {
225  if (!one) {
226  return -1;
227  }
228  return stasis_caching_accept_message_type(one->topic_cached, type);
229 }
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
Definition: stasis_cache.c:90
struct stasis_cp_single* stasis_cp_single_create ( struct stasis_cp_all * all,
const char *  name 
)

Create the 'one' side of the cache pattern.

Create the 'one' and forward to all's topic and topic_cached.

Dispose of using stasis_cp_single_unsubscribe().

Parameters
all: Corresponding all side.
name: Base name for the topics.
Returns
One side instance

Definition at line 139 of file stasis_cache_pattern.c.

References ao2_ref, stasis_caching_get_topic(), stasis_cp_sink_create(), and stasis_forward_all().

141 {
142  struct stasis_cp_single *one;
143 
144  one = stasis_cp_sink_create(all, name);
145  if (!one) {
146  return NULL;
147  }
148 
149  one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
150  one->forward_cached_to_all = stasis_forward_all(
151  stasis_caching_get_topic(one->topic_cached), all->topic_cached);
152 
153  if (!one->forward_topic_to_all || !one->forward_cached_to_all) {
154  ao2_ref(one, -1);
155 
156  return NULL;
157  }
158 
159  return one;
160 }
struct stasis_cp_single * stasis_cp_sink_create(struct stasis_cp_all *all, const char *name)
Create a sink in the cache pattern.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topic.
Definition: stasis_cache.c:85
int stasis_cp_single_set_filter ( struct stasis_cp_single * one,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a cache.

This will cause the underlying subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
one: One side of the cache pattern.
filter: What filter to use.
Return values
0: on success
-1: on failure
Since
17.0.0

Definition at line 231 of file stasis_cache_pattern.c.

References stasis_caching_set_filter().

233 {
234  if (!one) {
235  return -1;
236  }
237  return stasis_caching_set_filter(one->topic_cached, filter);
238 }
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
Definition: stasis_cache.c:109
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
struct stasis_topic* stasis_cp_single_topic ( struct stasis_cp_single * one)

Get the topic for this instance.

This is the topic to which one would post instance-specific messages, or subscribe for single-instance, uncached messages.

Parameters
one: One side of the cache pattern.
Returns
The main topic.
Return values
NULL: if one is NULL

Definition at line 205 of file stasis_cache_pattern.c.

Referenced by ast_endpoint_topic().

206 {
207  if (!one) {
208  return NULL;
209  }
210  return one->topic;
211 }
struct stasis_topic* stasis_cp_single_topic_cached ( struct stasis_cp_single * one)

Get the caching topic for this instance.

Note that one normally only subscribes to the caching topic, since data is fed to it from its upstream topic.

Parameters
one: One side of the cache pattern.
Returns
The caching topic.
Return values
NULL: if one is NULL

Definition at line 213 of file stasis_cache_pattern.c.

References stasis_caching_get_topic().

Referenced by ast_endpoint_topic_cached().

215 {
216  if (!one) {
217  return NULL;
218  }
219  return stasis_caching_get_topic(one->topic_cached);
220 }
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topic.
Definition: stasis_cache.c:85
void stasis_cp_single_unsubscribe ( struct stasis_cp_single * one)

Stops caching and forwarding messages.

Parameters
one: One side of the cache pattern.

Definition at line 189 of file stasis_cache_pattern.c.

References stasis_caching_unsubscribe().

190 {
191  if (!one) {
192  return;
193  }
194 
195  stasis_forward_cancel(one->forward_topic_to_all);
196  one->forward_topic_to_all = NULL;
197  stasis_forward_cancel(one->forward_cached_to_all);
198  one->forward_cached_to_all = NULL;
199  stasis_caching_unsubscribe(one->topic_cached);
200  one->topic_cached = NULL;
201 
202  ao2_cleanup(one);
203 }
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119
struct stasis_cp_single* stasis_cp_sink_create ( struct stasis_cp_all * all,
const char *  name 
)

Create a sink in the cache pattern.

Create the 'one' but do not automatically forward to the all's topic. This is useful when aggregating other topics' messages created with stasis_cp_single_create() in another caching topic without replicating those messages in the all's topics.

Dispose of using stasis_cp_single_unsubscribe().

Parameters
all: Corresponding all side.
name: Base name for the topics.
Returns
One side instance

Definition at line 162 of file stasis_cache_pattern.c.

References ao2_ref, stasis_caching_topic_create(), and stasis_topic_create().

Referenced by stasis_cp_single_create().

164 {
165  struct stasis_cp_single *one;
166 
167  one = ao2_t_alloc(sizeof(*one), one_dtor, name);
168  if (!one) {
169  return NULL;
170  }
171 
172  one->topic = stasis_topic_create(name);
173  if (!one->topic) {
174  ao2_ref(one, -1);
175 
176  return NULL;
177  }
178 
179  one->topic_cached = stasis_caching_topic_create(one->topic, all->cache);
180  if (!one->topic_cached) {
181  ao2_ref(one, -1);
182 
183  return NULL;
184  }
185 
186  return one;
187 }
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Definition: stasis_cache.c:948
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617