Asterisk - The Open Source Telephony Project  21.4.1
stasis_cache_pattern.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*! \file
20  *
21  * \brief Typical cache pattern for Stasis topics.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25 
26 /*** MODULEINFO
27  <support_level>core</support_level>
28  ***/
29 
30 #include "asterisk.h"
31 
32 #include "asterisk/astobj2.h"
34 
/*!
 * \brief The 'all' (aggregate) side of the cache pattern.
 */
struct stasis_cp_all {
	/*! Topic created under the caller-supplied name. */
	struct stasis_topic *topic;
	/*! Topic created under the generated "cache_pattern:<id>/<name>" name. */
	struct stasis_topic *topic_cached;
	/*! Cache created from the caller-supplied id function. */
	struct stasis_cache *cache;

	/*! Forward from \c topic to \c topic_cached. */
	struct stasis_forward *forward_all_to_cached;
};
42 
/*!
 * \brief The 'one' (per-instance) side of the cache pattern.
 */
struct stasis_cp_single {
	/*! Topic created under the instance name. */
	struct stasis_topic *topic;
	/*! Caching topic built over \c topic and the aggregate's cache. */
	struct stasis_caching_topic *topic_cached;

	/*!
	 * Forward from \c topic to the aggregate's topic.
	 * Only set by stasis_cp_single_create(); not touched by
	 * stasis_cp_sink_create().
	 */
	struct stasis_forward *forward_topic_to_all;
	/*!
	 * Forward from the caching topic's topic to the aggregate's cached
	 * topic. Only set by stasis_cp_single_create(); not touched by
	 * stasis_cp_sink_create().
	 */
	struct stasis_forward *forward_cached_to_all;
};
50 
51 static void all_dtor(void *obj)
52 {
53  struct stasis_cp_all *all = obj;
54 
55  ao2_cleanup(all->topic);
56  all->topic = NULL;
57  ao2_cleanup(all->topic_cached);
58  all->topic_cached = NULL;
59  ao2_cleanup(all->cache);
60  all->cache = NULL;
61  stasis_forward_cancel(all->forward_all_to_cached);
62  all->forward_all_to_cached = NULL;
63 }
64 
65 struct stasis_cp_all *stasis_cp_all_create(const char *name,
66  snapshot_get_id id_fn)
67 {
68  char *cached_name = NULL;
69  struct stasis_cp_all *all;
70  static int cache_id;
71 
72  all = ao2_t_alloc(sizeof(*all), all_dtor, name);
73  if (!all) {
74  return NULL;
75  }
76 
77  ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name);
78  if (!cached_name) {
79  ao2_ref(all, -1);
80 
81  return NULL;
82  }
83 
84  all->topic = stasis_topic_create(name);
85  all->topic_cached = stasis_topic_create(cached_name);
86  ast_free(cached_name);
87  all->cache = stasis_cache_create(id_fn);
88  all->forward_all_to_cached =
89  stasis_forward_all(all->topic, all->topic_cached);
90 
91  if (!all->topic || !all->topic_cached || !all->cache ||
92  !all->forward_all_to_cached) {
93  ao2_ref(all, -1);
94 
95  return NULL;
96  }
97 
98  return all;
99 }
100 
102 {
103  if (!all) {
104  return NULL;
105  }
106  return all->topic;
107 }
108 
110  struct stasis_cp_all *all)
111 {
112  if (!all) {
113  return NULL;
114  }
115  return all->topic_cached;
116 }
117 
119 {
120  if (!all) {
121  return NULL;
122  }
123  return all->cache;
124 }
125 
126 static void one_dtor(void *obj)
127 {
128  struct stasis_cp_single *one = obj;
129 
130  /* Should already be unsubscribed */
131  ast_assert(one->topic_cached == NULL);
132  ast_assert(one->forward_topic_to_all == NULL);
133  ast_assert(one->forward_cached_to_all == NULL);
134 
135  ao2_cleanup(one->topic);
136  one->topic = NULL;
137 }
138 
140  const char *name)
141 {
142  struct stasis_cp_single *one;
143 
144  one = stasis_cp_sink_create(all, name);
145  if (!one) {
146  return NULL;
147  }
148 
149  one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
150  one->forward_cached_to_all = stasis_forward_all(
151  stasis_caching_get_topic(one->topic_cached), all->topic_cached);
152 
153  if (!one->forward_topic_to_all || !one->forward_cached_to_all) {
154  ao2_ref(one, -1);
155 
156  return NULL;
157  }
158 
159  return one;
160 }
161 
163  const char *name)
164 {
165  struct stasis_cp_single *one;
166 
167  one = ao2_t_alloc(sizeof(*one), one_dtor, name);
168  if (!one) {
169  return NULL;
170  }
171 
172  one->topic = stasis_topic_create(name);
173  if (!one->topic) {
174  ao2_ref(one, -1);
175 
176  return NULL;
177  }
178 
179  one->topic_cached = stasis_caching_topic_create(one->topic, all->cache);
180  if (!one->topic_cached) {
181  ao2_ref(one, -1);
182 
183  return NULL;
184  }
185 
186  return one;
187 }
188 
190 {
191  if (!one) {
192  return;
193  }
194 
195  stasis_forward_cancel(one->forward_topic_to_all);
196  one->forward_topic_to_all = NULL;
197  stasis_forward_cancel(one->forward_cached_to_all);
198  one->forward_cached_to_all = NULL;
199  stasis_caching_unsubscribe(one->topic_cached);
200  one->topic_cached = NULL;
201 
202  ao2_cleanup(one);
203 }
204 
206 {
207  if (!one) {
208  return NULL;
209  }
210  return one->topic;
211 }
212 
214  struct stasis_cp_single *one)
215 {
216  if (!one) {
217  return NULL;
218  }
219  return stasis_caching_get_topic(one->topic_cached);
220 }
221 
223  struct stasis_message_type *type)
224 {
225  if (!one) {
226  return -1;
227  }
228  return stasis_caching_accept_message_type(one->topic_cached, type);
229 }
230 
233 {
234  if (!one) {
235  return -1;
236  }
237  return stasis_caching_set_filter(one->topic_cached, filter);
238 }
struct stasis_cp_single * stasis_cp_sink_create(struct stasis_cp_all *all, const char *name)
Create a sink in the cache pattern.
Caching pattern for Stasis Message Bus API topics.
Asterisk main include file. File version handling, generic pbx functions.
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
Definition: stasis_cache.c:90
struct stasis_cp_single * stasis_cp_single_create(struct stasis_cp_all *all, const char *name)
Create the 'one' side of the cache pattern.
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Definition: stasis_cache.c:948
struct stasis_topic * stasis_cp_all_topic(struct stasis_cp_all *all)
Get the aggregate topic.
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119
struct stasis_cp_all * stasis_cp_all_create(const char *name, snapshot_get_id id_fn)
Create an all instance of the cache pattern.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
struct stasis_cache * stasis_cp_all_cache(struct stasis_cp_all *all)
Get the cache.
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
Definition: stasis_cache.c:109
int stasis_cp_single_accept_message_type(struct stasis_cp_single *one, struct stasis_message_type *type)
Indicate to an instance that we are interested in a message type.
struct stasis_topic * stasis_cp_single_topic(struct stasis_cp_single *one)
Get the topic for this instance.
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
Definition: stasis_cache.c:360
stasis_subscription_message_filter
Stasis subscription message filters.
Definition: stasis.h:294
struct stasis_topic * stasis_cp_single_topic_cached(struct stasis_cp_single *one)
Get the caching topic for this instance.
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
Stops caching and forwarding messages.
struct stasis_topic * stasis_cp_all_topic_cached(struct stasis_cp_all *all)
Get the caching topic.
Forwarding information.
Definition: stasis.c:1531
static struct sorcery_test_caching cache
Global scope caching structure for testing.
Definition: test_sorcery.c:178
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
Definition: stasis.c:1578
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topic.
Definition: stasis_cache.c:85
int stasis_cp_single_set_filter(struct stasis_cp_single *one, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback to extract a unique identity from a snapshot message.
Definition: stasis.h:1009