Asterisk - The Open Source Telephony Project  21.4.1
test_stasis_state.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2019, Sangoma Technologies Corporation
5  *
6  * Kevin Harwell <kharwell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 /*** MODULEINFO
20  <depend>TEST_FRAMEWORK</depend>
21  <support_level>core</support_level>
22  ***/
23 
24 #include "asterisk.h"
25 
26 #include "asterisk/astobj2.h"
27 #include "asterisk/conversions.h"
28 #include "asterisk/module.h"
29 #include "asterisk/stasis_state.h"
30 #include "asterisk/test.h"
31 
/*! Category under which the tests in this file are registered (info->category) */
#define test_category "/stasis/core/state/"

/*! Number of numeric state ids (0 .. TOPIC_COUNT - 1) each test subscribes to and publishes on */
#define TOPIC_COUNT 500

/*! Topic name handed to stasis_state_manager_create() by each test */
#define MANAGER_TOPIC "foo"
37 
/*!
 * Prototype for the foo message type accessor used when creating messages.
 * NOTE(review): presumably declared up front so the definition generated by
 * STASIS_MESSAGE_TYPE_DEFN below has a prior prototype - confirm.
 */
struct stasis_message_type *foo_type(void);

/*! foo stasis message type */
STASIS_MESSAGE_TYPE_DEFN(foo_type);
42 
43 /*! foo_type data */
44 struct foo_data {
45  size_t bar;
46 };
47 
50 
/*!
 * For testing purposes each subscribed state's id is a number. This value is
 * the summation of all ids, and is the reference the running total is
 * compared against.
 */
static size_t sum_total;

/*!
 * Test variable that tracks the running total of state ids. It is incremented
 * by validate_data() for every id it sees and reset before each pass.
 */
static size_t running_total;

/*!
 * When non-zero, validate_data() requires the state data to be NULL (nothing
 * has been published yet); when zero, it requires state data to be present.
 */
static int expect_null;
62 
63 static int validate_data(const char *id, struct foo_data *foo)
64 {
65  size_t num;
66  uintmax_t tmp;
67 
68  if (ast_str_to_umax(id, &tmp)) {
69  ast_log(LOG_ERROR, "Unable to convert the state's id '%s' to numeric\n", id);
70  return -1;
71  }
72  num = (size_t) tmp;
73 
74  running_total += num;
75 
76  if (!foo) {
77  if (expect_null) {
78  return 0;
79  }
80 
81  ast_log(LOG_ERROR, "Expected state data for '%s'\n", id);
82  return -1;
83  }
84 
85  if (expect_null) {
86  ast_log(LOG_ERROR, "Expected NULL state data for '%s'\n", id);
87  return -1;
88  }
89 
90  if (foo->bar != num) {
91  ast_log(LOG_ERROR, "Unexpected state data for '%s'\n", id);
92  return -1;
93  }
94 
95  return 0;
96 }
97 
/*!
 * \brief Observer callback that validates a subscriber's current state data.
 *
 * Installed for both the subscribe and unsubscribe events of \c foo_observer.
 * The validation result is intentionally ignored; the callback matters for
 * its side effect on \c running_total.
 *
 * \param id The id of the state being subscribed to or unsubscribed from
 * \param sub The state subscriber
 */
static void handle_validate(const char *id, struct stasis_state_subscriber *sub)
{
	struct foo_data *state_data;

	state_data = stasis_state_subscriber_data(sub);
	(void) validate_data(id, state_data);

	/* NOTE(review): presumably the getter returns a reference we own - confirm */
	ao2_cleanup(state_data);
}
104 
105 struct stasis_state_observer foo_observer = {
106  .on_subscribe = handle_validate,
107  .on_unsubscribe = handle_validate
108 };
109 
/*!
 * \brief Stasis subscription callback handed to stasis_state_subscribe_pool().
 *
 * Deliberately does nothing: these tests exercise managed state, not stasis
 * topic message delivery.
 */
static void foo_type_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
	(void) data;
	(void) sub;
	(void) message;
}
114 
115 static int subscriptions_destroy(struct stasis_state_manager *manager, struct subscriptions *subs)
116 {
117  running_total = expect_null = 0;
118 
120  AST_VECTOR_FREE(subs);
121 
122  stasis_state_remove_observer(manager, &foo_observer);
123 
124  if (running_total != sum_total) {
125  ast_log(LOG_ERROR, "Failed to destroy all subscriptions: running=%zu, sum=%zu\n",
126  running_total, sum_total);
127  return -1;
128  }
129 
130  return 0;
131 }
132 
133 static int subscriptions_create(struct stasis_state_manager *manager,
134  struct subscriptions *subs)
135 {
136  size_t i;
137 
138  if (stasis_state_add_observer(manager, &foo_observer) ||
139  AST_VECTOR_INIT(subs, TOPIC_COUNT)) {
140  return -1;
141  }
142 
143  sum_total = running_total = 0;
144  expect_null = 1;
145 
146  for (i = 0; i < TOPIC_COUNT; ++i) {
147  struct stasis_state_subscriber *sub;
148  char id[32];
149 
150  if (snprintf(id, 10, "%zu", i) == -1) {
151  ast_log(LOG_ERROR, "Unable to convert subscriber id to string\n");
152  break;
153  }
154 
155  sub = stasis_state_subscribe_pool(manager, id, foo_type_cb, NULL);
156  if (!sub) {
157  ast_log(LOG_ERROR, "Failed to create a state subscriber for id '%s'\n", id);
158  ao2_ref(sub, -1);
159  break;
160  }
161 
162  if (AST_VECTOR_APPEND(subs, sub)) {
163  ast_log(LOG_ERROR, "Failed to add to foo_sub to vector for id '%s'\n", id);
164  ao2_ref(sub, -1);
165  break;
166  }
167 
168  sum_total += i;
169  }
170 
171  if (i != TOPIC_COUNT || running_total != sum_total) {
172  ast_log(LOG_ERROR, "Failed to create all subscriptions: running=%zu, sum=%zu\n",
173  running_total, sum_total);
174  subscriptions_destroy(manager, subs);
175  return -1;
176  }
177 
178  return 0;
179 }
180 
181 static int publishers_destroy(struct stasis_state_manager *manager, struct publishers *pubs)
182 {
183  size_t i;
184 
185  if (pubs) {
186  /* Remove explicit publishers */
187  AST_VECTOR_CALLBACK_VOID(pubs, ao2_cleanup);
188  AST_VECTOR_FREE(pubs);
189  return 0;
190  }
191 
192  for (i = 0; i < TOPIC_COUNT; ++i) {
193  char id[32];
194 
195  /* Remove implicit publishers */
196  if (snprintf(id, 10, "%zu", i) == -1) {
197  ast_log(LOG_ERROR, "Unable to convert publisher id to string\n");
198  return -1;
199  }
200 
201  stasis_state_remove_publish_by_id(manager, id, NULL, NULL);
202  }
203 
204  return 0;
205 }
206 
207 static int publishers_create(struct stasis_state_manager *manager,
208  struct publishers *pubs)
209 {
210  size_t i;
211 
212  if (AST_VECTOR_INIT(pubs, TOPIC_COUNT)) {
213  return -1;
214  }
215 
216  for (i = 0; i < TOPIC_COUNT; ++i) {
217  struct stasis_state_publisher *pub;
218  char id[32];
219 
220  if (snprintf(id, 10, "%zu", i) == -1) {
221  ast_log(LOG_ERROR, "Unable to convert publisher id to string\n");
222  break;
223  }
224 
225  /* Create the state publisher */
226  pub = stasis_state_add_publisher(manager, id);
227  if (!pub) {
228  ast_log(LOG_ERROR, "Failed to create a state publisher for id '%s'\n", id);
229  break;
230  }
231 
232  if (AST_VECTOR_APPEND(pubs, pub)) {
233  ast_log(LOG_ERROR, "Failed to add to publisher to vector for id '%s'\n", id);
234  ao2_ref(pub, -1);
235  break;
236  }
237  }
238 
239  if (i != TOPIC_COUNT) {
240  ast_log(LOG_ERROR, "Failed to create all publishers: count=%zu\n", i);
241  publishers_destroy(manager, pubs);
242  return -1;
243  }
244 
245  return 0;
246 }
247 
248 static struct stasis_message *create_foo_type_message(const char *id)
249 {
250  struct stasis_message *msg;
251  struct foo_data *foo;
252  uintmax_t tmp;
253 
254  foo = ao2_alloc(sizeof(*foo), NULL);
255  if (!foo) {
256  ast_log(LOG_ERROR, "Failed to allocate foo data for '%s'\n", id);
257  return NULL;
258  }
259 
260  if (ast_str_to_umax(id, &tmp)) {
261  ast_log(LOG_ERROR, "Unable to convert the state's id '%s' to numeric\n", id);
262  ao2_ref(foo, -1);
263  return NULL;
264  }
265  foo->bar = (size_t) tmp;
266 
267  msg = stasis_message_create_full(foo_type(), foo, NULL);
268  if (!msg) {
269  ast_log(LOG_ERROR, "Failed to create stasis message for '%s'\n", id);
270  }
271 
272  ao2_ref(foo, -1);
273  return msg;
274 }
275 
276 static int implicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
277 {
278  /* For each state object create and publish new state data */
279  struct foo_data *foo = stasis_message_data(msg);
280 
281  if (validate_data(id, foo)) {
282  return CMP_STOP;
283  }
284 
285  msg = create_foo_type_message(id);
286  if (!msg) {
287  return CMP_STOP;
288  }
289 
290  /* Now publish it on the managed state object */
291  stasis_state_publish_by_id(user_data, id, NULL, msg);
292  ao2_ref(msg, -1);
293 
294  return 0;
295 }
296 
297 static int explicit_publish_cb(const char *id, struct stasis_message *msg, void *user_data)
298 {
299  /* For each state object create and publish new state data */
300  struct publishers *pubs = user_data;
301  struct stasis_state_publisher *pub = NULL;
302  struct foo_data *foo = stasis_message_data(msg);
303  size_t i;
304 
305  if (validate_data(id, foo)) {
306  return CMP_STOP;
307  }
308 
309  msg = create_foo_type_message(id);
310  if (!msg) {
311  return CMP_STOP;
312  }
313 
314  for (i = 0; i < AST_VECTOR_SIZE(pubs); ++i) {
315  if (!strcmp(stasis_state_publisher_id(AST_VECTOR_GET(pubs, i)), id)) {
316  pub = AST_VECTOR_GET(pubs, i);
317  break;
318  }
319  }
320 
321  if (!pub) {
322  ast_log(LOG_ERROR, "Unable to locate publisher for id '%s'\n", id);
323  return CMP_STOP;
324  }
325 
326  stasis_state_publish(pub, msg);
327  ao2_ref(msg, -1);
328 
329  return 0;
330 }
331 
332 static int publish(struct stasis_state_manager *manager, on_stasis_state cb,
333  void *user_data)
334 {
335  /* First time there is no state data */
336  expect_null = 1;
337 
338  running_total = 0;
339  stasis_state_callback_all(manager, cb, user_data);
340 
341  if (running_total != sum_total) {
342  ast_log(LOG_ERROR, "Failed manager_callback (1): running=%zu, sum=%zu\n",
343  running_total, sum_total);
344  return -1;
345  }
346 
347  /* Second time check valid state data exists */
348  running_total = expect_null = 0;
349  stasis_state_callback_all(manager, cb, user_data);
350 
351  if (running_total != sum_total) {
352  ast_log(LOG_ERROR, "Failed manager_callback (2): running=%zu, sum=%zu\n",
353  running_total, sum_total);
354  return -1;
355  }
356 
357  return 0;
358 }
359 
360 AST_TEST_DEFINE(implicit_publish)
361 {
362  RAII_VAR(struct stasis_state_manager *, manager, NULL, ao2_cleanup);
363  struct subscriptions subs;
364  int rc = AST_TEST_PASS;
365 
366  switch (cmd) {
367  case TEST_INIT:
368  info->name = __func__;
369  info->category = test_category;
370  info->summary = "Test implicit publishing of stasis state";
371  info->description = info->summary;
372  return AST_TEST_NOT_RUN;
373  case TEST_EXECUTE:
374  break;
375  }
376 
377  manager = stasis_state_manager_create(MANAGER_TOPIC);
378  ast_test_validate(test, manager != NULL);
379 
380  ast_test_validate(test, !subscriptions_create(manager, &subs));
381 
382  ast_test_validate_cleanup(test, !publish(manager, implicit_publish_cb, manager),
383  rc, cleanup);
384 
385 cleanup:
386  if (subscriptions_destroy(manager, &subs) || publishers_destroy(manager, NULL)) {
387  return AST_TEST_FAIL;
388  }
389 
390  /*
391  * State subscriptions add a ref a state. The state in turn adds a ref
392  * to the manager. So if more than one ref is held on the manager before
393  * exiting, there is a ref leak some place.
394  */
395  if (ao2_ref(manager, 0) != 1) {
396  ast_log(LOG_ERROR, "Memory leak - Too many references held on manager\n");
397  return AST_TEST_FAIL;
398  }
399 
400  return rc;
401 }
402 
403 AST_TEST_DEFINE(explicit_publish)
404 {
405  RAII_VAR(struct stasis_state_manager *, manager, NULL, ao2_cleanup);
406  struct subscriptions subs;
407  struct publishers pubs;
408  int rc = AST_TEST_PASS;
409 
410  switch (cmd) {
411  case TEST_INIT:
412  info->name = __func__;
413  info->category = test_category;
414  info->summary = "Test explicit publishing of stasis state";
415  info->description = info->summary;
416  return AST_TEST_NOT_RUN;
417  case TEST_EXECUTE:
418  break;
419  }
420 
421  manager = stasis_state_manager_create(MANAGER_TOPIC);
422  ast_test_validate(test, manager != NULL);
423 
424  ast_test_validate(test, !subscriptions_create(manager, &subs));
425  ast_test_validate_cleanup(test, !publishers_create(manager, &pubs), rc, cleanup);
426 
427  ast_test_validate_cleanup(test, !publish(manager, explicit_publish_cb, &pubs),
428  rc, cleanup);
429 
430 cleanup:
431  if (subscriptions_destroy(manager, &subs) || publishers_destroy(manager, &pubs)) {
432  return AST_TEST_FAIL;
433  }
434 
435  /*
436  * State subscriptions add a ref a state. The state in turn adds a ref
437  * to the manager. So if more than one ref is held on the manager before
438  * exiting, there is a ref leak some place.
439  */
440  if (ao2_ref(manager, 0) != 1) {
441  ast_log(LOG_ERROR, "Memory leak - Too many references held on manager\n");
442  return AST_TEST_FAIL;
443  }
444 
445  return rc;
446 }
447 
/*!
 * Module unload hook: unregister both tests, then clean up the foo message
 * type that load_module() initialized.
 *
 * Always returns 0.
 */
static int unload_module(void)
{
	AST_TEST_UNREGISTER(implicit_publish);
	AST_TEST_UNREGISTER(explicit_publish);

	/* Release the message type only after the tests that use it are gone */
	STASIS_MESSAGE_TYPE_CLEANUP(foo_type);

	return 0;
}
457 
458 static int load_module(void)
459 {
460  if (STASIS_MESSAGE_TYPE_INIT(foo_type) != 0) {
461  return -1;
462  }
463 
464  AST_TEST_REGISTER(implicit_publish);
465  AST_TEST_REGISTER(explicit_publish);
466 
468 }
469 
/* Module registration: GPL license key and a human readable description */
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Stasis state testing");
Managed stasis state event interface.
Definition: stasis_state.h:463
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Definition: vector.h:174
Asterisk main include file. File version handling, generic pbx functions.
struct stasis_state_subscriber * stasis_state_subscribe_pool(struct stasis_state_manager *manager, const char *id, stasis_subscription_cb callback, void *data)
Add a subscriber, and subscribe to its underlying stasis topic.
Definition: stasis_state.c:447
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to the managed topic named by id, and remove an implicit publisher.
Definition: stasis_state.c:659
void stasis_state_remove_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Remove an observer (will no longer receive managed state related events).
Definition: stasis_state.c:701
int ast_str_to_umax(const char *str, uintmax_t *res)
Convert the given string to an unsigned max size integer.
Definition: conversions.c:119
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
Definition: stasis.h:1493
void(* on_subscribe)(const char *id, struct stasis_state_subscriber *sub)
Raised when any managed state is being subscribed.
Definition: stasis_state.h:470
const char * stasis_state_publisher_id(const struct stasis_state_publisher *pub)
Retrieve the publisher's underlying state's unique id.
Definition: stasis_state.c:553
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
Definition: vector.h:256
void * stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
Retrieve the last known state stasis message payload for the subscriber.
Definition: stasis_state.c:498
Test Framework API.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
Definition: stasis.h:1515
int(* on_stasis_state)(const char *id, struct stasis_message *msg, void *user_data)
The delegate called for each managed state.
Definition: stasis_state.h:521
void * stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic, block until the final message is received, and then unsubscribe from the managed state.
Definition: stasis_state.c:478
struct stasis_state_manager * stasis_state_manager_create(const char *topic_name)
Create a stasis state manager.
Definition: stasis_state.c:325
int stasis_state_add_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Add an observer to receive managed state related events.
Definition: stasis_state.c:689
static void cleanup(void)
Clean up any old apps that we don't need any more.
Definition: res_stasis.c:327
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
Definition: vector.h:113
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed state call the given handler.
Definition: stasis_state.c:741
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
struct stasis_state_publisher * stasis_state_add_publisher(struct stasis_state_manager *manager, const char *id)
Add a publisher to the managed state for the given id.
Definition: stasis_state.c:532
Conversion utility functions.
#define AST_VECTOR(name, type)
Define a vector structure.
Definition: vector.h:44
Stasis State API.
#define STASIS_MESSAGE_TYPE_DEFN(name,...)
Boiler-plate messaging macro for defining public message types.
Definition: stasis.h:1440
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to the managed topic named by id, and add an implicit subscriber.
Definition: stasis_state.c:639
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_TEST_DEFINE(hdr)
Definition: test.h:126
#define ASTERISK_GPL_KEY
The text the key() function should return.
Definition: module.h:46
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
Asterisk module definitions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
Publish to a managed state (topic) using a publisher.
Definition: stasis_state.c:563
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.
Definition: vector.h:862