Asterisk - The Open Source Telephony Project  21.4.1
test_stasis_endpoints.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*!
20  * \file
21  * \brief Test endpoints.
22  *
23  * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
24  *
25  * \ingroup tests
26  */
27 
28 /*** MODULEINFO
29  <depend>TEST_FRAMEWORK</depend>
30  <depend>res_stasis_test</depend>
31  <support_level>core</support_level>
32  ***/
33 
34 #include "asterisk.h"
35 
36 #include "asterisk/astobj2.h"
37 #include "asterisk/channel.h"
38 #include "asterisk/endpoints.h"
39 #include "asterisk/module.h"
40 #include "asterisk/stasis_channels.h"
42 #include "asterisk/stasis_test.h"
43 #include "asterisk/test.h"
44 
45 static const char *test_category = "/stasis/endpoints/";
46 
47 /*! \brief Message matcher looking for cache update messages */
48 static int cache_update(struct stasis_message *msg, const void *data) {
49  struct stasis_cache_update *update;
50  struct ast_endpoint_snapshot *snapshot;
51  const char *name = data;
52 
54  return 0;
55  }
56 
57  update = stasis_message_data(msg);
58  if (ast_endpoint_snapshot_type() != update->type) {
59  return 0;
60  }
61 
62  snapshot = stasis_message_data(update->old_snapshot);
63  if (!snapshot) {
64  snapshot = stasis_message_data(update->new_snapshot);
65  }
66 
67  return 0 == strcmp(name, snapshot->resource);
68 }
69 
71 {
72  RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
73  RAII_VAR(struct ast_channel *, chan, NULL, ast_hangup);
74  RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
75  RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
76  struct stasis_message *msg;
77  struct stasis_message_type *type;
78  struct ast_endpoint_snapshot *actual_snapshot;
79  int actual_count;
80 
81  switch (cmd) {
82  case TEST_INIT:
83  info->name = __func__;
84  info->category = test_category;
85  info->summary = "Test endpoint updates as its state changes";
86  info->description =
87  "Test endpoint updates as its state changes";
88  return AST_TEST_NOT_RUN;
89  case TEST_EXECUTE:
90  break;
91  }
92 
93  uut = ast_endpoint_create("TEST", __func__);
94  ast_test_validate(test, NULL != uut);
95 
97  ast_test_validate(test, NULL != sink);
98 
99  sub = stasis_subscribe(ast_endpoint_topic(uut),
100  stasis_message_sink_cb(), sink);
101  ast_test_validate(test, NULL != sub);
102 
104  actual_count = stasis_message_sink_wait_for_count(sink, 1,
105  STASIS_SINK_DEFAULT_WAIT);
106  ast_test_validate(test, 1 == actual_count);
107  msg = sink->messages[0];
108  type = stasis_message_type(msg);
109  ast_test_validate(test, ast_endpoint_snapshot_type() == type);
110  actual_snapshot = stasis_message_data(msg);
111  ast_test_validate(test, AST_ENDPOINT_OFFLINE == actual_snapshot->state);
112 
113  ast_endpoint_set_max_channels(uut, 8675309);
114  actual_count = stasis_message_sink_wait_for_count(sink, 2,
115  STASIS_SINK_DEFAULT_WAIT);
116  ast_test_validate(test, 2 == actual_count);
117  msg = sink->messages[1];
118  type = stasis_message_type(msg);
119  ast_test_validate(test, ast_endpoint_snapshot_type() == type);
120  actual_snapshot = stasis_message_data(msg);
121  ast_test_validate(test, 8675309 == actual_snapshot->max_channels);
122 
123  return AST_TEST_PASS;
124 }
125 
126 AST_TEST_DEFINE(cache_clear)
127 {
128  RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
129  RAII_VAR(struct ast_channel *, chan, NULL, ast_hangup);
130  RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
131  RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
132  struct stasis_message *msg;
133  struct stasis_message_type *type;
134  struct ast_endpoint_snapshot *actual_snapshot;
135  struct stasis_cache_update *update;
136  int message_index;
137 
138  switch (cmd) {
139  case TEST_INIT:
140  info->name = __func__;
141  info->category = test_category;
142  info->summary = "Test endpoint state change messages";
143  info->description = "Test endpoint state change messages";
144  return AST_TEST_NOT_RUN;
145  case TEST_EXECUTE:
146  break;
147  }
148 
149  /* Subscribe to the cache topic */
151  ast_test_validate(test, NULL != sink);
152 
153  sub = stasis_subscribe(
155  stasis_message_sink_cb(), sink);
156  ast_test_validate(test, NULL != sub);
157 
158  uut = ast_endpoint_create("TEST", __func__);
159  ast_test_validate(test, NULL != uut);
160 
161  /* Since the cache topic is a singleton (ew), it may have messages from
162  * elsewheres that it's processing, or maybe even some final messages
163  * from the prior test. We've got to wait_for our specific message,
164  * instead of wait_for_count.
165  */
166  message_index = stasis_message_sink_wait_for(sink, 0,
167  cache_update, __func__, STASIS_SINK_DEFAULT_WAIT);
168  ast_test_validate(test, 0 <= message_index);
169 
170  /* First message should be a cache creation entry for our endpoint */
171  msg = sink->messages[message_index];
172  type = stasis_message_type(msg);
173  ast_test_validate(test, stasis_cache_update_type() == type);
174  update = stasis_message_data(msg);
175  ast_test_validate(test, ast_endpoint_snapshot_type() == update->type);
176  ast_test_validate(test, NULL == update->old_snapshot);
177  actual_snapshot = stasis_message_data(update->new_snapshot);
178  ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech));
179  ast_test_validate(test,
180  0 == strcmp(__func__, actual_snapshot->resource));
181 
183  uut = NULL;
184 
185  /* Note: there's a few messages between the creation and the clear.
186  * Wait for all of them... */
187  message_index = stasis_message_sink_wait_for(sink, message_index + 2,
188  cache_update, __func__, STASIS_SINK_DEFAULT_WAIT);
189  ast_test_validate(test, 0 <= message_index);
190 
191  /* Now we should have a cache removal entry */
192  msg = sink->messages[message_index];
193  type = stasis_message_type(msg);
194  ast_test_validate(test, stasis_cache_update_type() == type);
195  update = stasis_message_data(msg);
196  ast_test_validate(test, ast_endpoint_snapshot_type() == update->type);
197  actual_snapshot = stasis_message_data(update->old_snapshot);
198  ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech));
199  ast_test_validate(test,
200  0 == strcmp(__func__, actual_snapshot->resource));
201  ast_test_validate(test, NULL == update->new_snapshot);
202 
203  return AST_TEST_PASS;
204 }
205 
206 AST_TEST_DEFINE(channel_messages)
207 {
208  RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
209  RAII_VAR(struct ast_channel *, chan, NULL, ast_hangup);
210  RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
211  RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
212  struct stasis_message *msg;
213  struct stasis_message_type *type;
214  struct ast_endpoint_snapshot *actual_snapshot;
215  int expected_count;
216  int actual_count;
217  int i;
218  int channel_index = -1;
219  int endpoint_index = -1;
220 
221  switch (cmd) {
222  case TEST_INIT:
223  info->name = __func__;
224  info->category = test_category;
225  info->summary = "Test channel messages on an endpoint topic";
226  info->description =
227  "Test channel messages on an endpoint topic";
228  return AST_TEST_NOT_RUN;
229  case TEST_EXECUTE:
230  break;
231  }
232 
233  uut = ast_endpoint_create("TEST", __func__);
234  ast_test_validate(test, NULL != uut);
235 
237  ast_test_validate(test, NULL != sink);
238 
239  sub = stasis_subscribe(ast_endpoint_topic(uut),
240  stasis_message_sink_cb(), sink);
241  ast_test_validate(test, NULL != sub);
242 
243  chan = ast_channel_alloc(0, AST_STATE_DOWN, "100", __func__, "100",
244  "100", "default", NULL, NULL, 0, "TEST/test_res");
245  ast_test_validate(test, NULL != chan);
246 
247  ast_endpoint_add_channel(uut, chan);
248 
249  actual_count = stasis_message_sink_wait_for_count(sink, 1,
250  STASIS_SINK_DEFAULT_WAIT);
251  ast_test_validate(test, 1 == actual_count);
252 
253  msg = sink->messages[0];
254  type = stasis_message_type(msg);
255  ast_test_validate(test, ast_endpoint_snapshot_type() == type);
256  actual_snapshot = stasis_message_data(msg);
257  ast_test_validate(test, 1 == actual_snapshot->num_channels);
258 
259  ast_hangup(chan);
260  chan = NULL;
261 
262  expected_count = 3;
263  actual_count = stasis_message_sink_wait_for_count(sink, expected_count,
264  STASIS_SINK_DEFAULT_WAIT);
265  ast_test_validate(test, expected_count == actual_count);
266 
267  for (i = 0; i < expected_count; i++) {
268  msg = sink->messages[i];
269  type = stasis_message_type(msg);
270  if (type == ast_channel_snapshot_type()) {
271  channel_index = i;
272  }
273  if (type == ast_endpoint_snapshot_type()) {
274  endpoint_index = i;
275  }
276  }
277  ast_test_validate(test, channel_index >= 0 && endpoint_index >= 0);
278  actual_snapshot = stasis_message_data(sink->messages[endpoint_index]);
279  ast_test_validate(test, 0 == actual_snapshot->num_channels);
280 
281  return AST_TEST_PASS;
282 }
283 
284 static int unload_module(void)
285 {
286  AST_TEST_UNREGISTER(state_changes);
287  AST_TEST_UNREGISTER(cache_clear);
288  AST_TEST_UNREGISTER(channel_messages);
289  return 0;
290 }
291 
292 static int load_module(void)
293 {
294  AST_TEST_REGISTER(state_changes);
295  AST_TEST_REGISTER(cache_clear);
296  AST_TEST_REGISTER(channel_messages);
298 }
299 
300 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_DEFAULT, "Endpoint stasis-related testing",
301  .support_level = AST_MODULE_SUPPORT_CORE,
302  .load = load_module,
303  .unload = unload_module,
304  .requires = "res_stasis_test",
305 );
Structure that collects messages from a topic.
Definition: stasis_test.h:44
Main Channel structure associated with a channel.
Asterisk main include file. File version handling, generic pbx functions.
struct stasis_message * old_snapshot
Old value from the cache.
Definition: stasis.h:969
int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start, stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
Wait for a message that matches the given criteria.
struct ast_endpoint * ast_endpoint_create(const char *tech, const char *resource)
Create an endpoint struct.
struct stasis_message_type * ast_endpoint_snapshot_type(void)
Message type for ast_endpoint_snapshot.
Test Framework API.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct stasis_topic * ast_endpoint_topic(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
void ast_endpoint_set_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state)
Updates the state of the given endpoint.
int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink, int num_messages, int timeout_millis)
Wait for a sink's num_messages field to reach a certain level.
Endpoint abstractions.
struct stasis_message_sink * stasis_message_sink_create(void)
Create a message sink.
struct stasis_topic * ast_endpoint_topic_all_cached(void)
Cached topic for all endpoint related messages.
General Asterisk PBX channel definitions.
const ast_string_field resource
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
stasis_subscription_cb stasis_message_sink_cb(void)
Topic callback to receive messages.
Cache update message.
Definition: stasis.h:965
int ast_endpoint_add_channel(struct ast_endpoint *endpoint, struct ast_channel *chan)
Adds a channel to the given endpoint.
A snapshot of an endpoint's state.
static int cache_update(struct stasis_message *msg, const void *data)
Message matcher looking for cache update messages.
struct stasis_message * new_snapshot
New value.
Definition: stasis.h:971
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
const ast_string_field tech
struct stasis_message_type * type
Convenience reference to snapshot type.
Definition: stasis.h:967
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:971
void ast_hangup(struct ast_channel *chan)
Hang up a channel.
Definition: channel.c:2541
struct stasis_message_type * ast_channel_snapshot_type(void)
Message type for ast_channel_snapshot_update.
void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint, int max_channels)
Updates the maximum number of channels an endpoint supports.
#define AST_TEST_DEFINE(hdr)
Definition: test.h:126
The state change queue. State changes are queued for processing by a separate thread.
Definition: devicestate.c:208
void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
Shutsdown an ast_endpoint.
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
#define ast_channel_alloc(needqueue, state, cid_num, cid_name, acctcode, exten, context, assignedids, requestor, amaflag,...)
Create a channel structure.
Definition: channel.h:1258
Asterisk module definitions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
Endpoint abstractions.
enum ast_endpoint_state state
Test infrastructure for dealing with Stasis.