38 #include "asterisk/stasis_message_router.h"
41 #define test_category "/stasis/core/"
62 "Message: %s\r\n", text);
83 info->name = __func__;
84 info->category = test_category;
85 info->summary =
"Test basic message_type functions";
86 info->description =
"Test basic message_type functions";
87 return AST_TEST_NOT_RUN;
104 RAII_VAR(
char *, data, NULL, ao2_cleanup);
105 char *expected =
"SomeData";
106 struct timeval expected_timestamp;
107 struct timeval time_diff;
112 info->name = __func__;
113 info->category = test_category;
114 info->summary =
"Test basic message functions";
115 info->description =
"Test basic message functions";
116 return AST_TEST_NOT_RUN;
122 memset(&foreign_eid, 0xFF,
sizeof(foreign_eid));
129 data = ao2_alloc(strlen(expected) + 1, NULL);
130 strcpy(data, expected);
135 ast_test_validate(
test, NULL != uut1);
136 ast_test_validate(
test, NULL != uut2);
149 ast_test_validate(
test, time_diff.tv_sec == 0);
150 ast_test_validate(
test, time_diff.tv_usec < 10000);
159 return AST_TEST_PASS;
165 size_t messages_rxed_len;
166 int ignore_subscriptions;
170 static void consumer_dtor(
void *obj)
174 ast_cond_destroy(&consumer->out);
176 while (consumer->messages_rxed_len > 0) {
177 ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
179 ast_free(consumer->messages_rxed);
180 consumer->messages_rxed = NULL;
183 static struct consumer *consumer_create(
int ignore_subscriptions)
185 struct consumer *consumer;
187 consumer = ao2_alloc(
sizeof(*consumer), consumer_dtor);
192 consumer->ignore_subscriptions = ignore_subscriptions;
193 consumer->messages_rxed =
ast_malloc(
sizeof(*consumer->messages_rxed));
194 if (!consumer->messages_rxed) {
195 ao2_cleanup(consumer);
199 ast_cond_init(&consumer->out, NULL);
206 struct consumer *consumer = data;
207 RAII_VAR(
struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
211 ++consumer->messages_rxed_len;
212 consumer->messages_rxed =
ast_realloc(consumer->messages_rxed,
sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
213 ast_assert(consumer->messages_rxed != NULL);
214 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
219 consumer->complete = 1;
220 consumer_needs_cleanup = consumer;
223 ast_cond_signal(&consumer->out);
228 struct consumer *consumer = data;
229 RAII_VAR(
struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
233 ++consumer->messages_rxed_len;
234 consumer->messages_rxed =
ast_realloc(consumer->messages_rxed,
sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
235 ast_assert(consumer->messages_rxed != NULL);
236 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
241 consumer->complete = 1;
242 consumer_needs_cleanup = consumer;
246 static int consumer_wait_for(
struct consumer *consumer,
size_t expected_len)
249 struct timespec end = {
250 .tv_sec = start.tv_sec + 30,
251 .tv_nsec = start.tv_usec * 1000
256 while (consumer->messages_rxed_len < expected_len) {
259 if (r == ETIMEDOUT) {
264 return consumer->messages_rxed_len;
267 static int consumer_wait_for_completion(
struct consumer *consumer)
270 struct timespec end = {
271 .tv_sec = start.tv_sec + 3,
272 .tv_nsec = start.tv_usec * 1000
277 while (!consumer->complete) {
280 if (r == ETIMEDOUT) {
285 return consumer->complete;
288 static int consumer_should_stay(
struct consumer *consumer,
size_t expected_len)
291 struct timeval diff = {
295 struct timeval end_tv =
ast_tvadd(start, diff);
296 struct timespec end = {
297 .tv_sec = end_tv.tv_sec,
298 .tv_nsec = end_tv.tv_usec * 1000
303 while (consumer->messages_rxed_len == expected_len) {
306 if (r == ETIMEDOUT) {
311 return consumer->messages_rxed_len;
321 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
322 RAII_VAR(
char *, expected_uniqueid, NULL, ast_free);
328 info->name = __func__;
329 info->category = test_category;
330 info->summary =
"Test subscribe/unsubscribe messages";
331 info->description =
"Test subscribe/unsubscribe messages";
332 return AST_TEST_NOT_RUN;
340 consumer = consumer_create(0);
341 ast_test_validate(
test, NULL != consumer);
343 uut = stasis_subscribe(
topic, consumer_exec, consumer);
344 ast_test_validate(
test, NULL != uut);
349 complete = consumer_wait_for_completion(consumer);
350 ast_test_validate(
test, 1 == complete);
352 ast_test_validate(
test, 2 == consumer->messages_rxed_len);
358 ast_test_validate(
test, 0 == strcmp(
"Subscribe", change->
description));
359 ast_test_validate(
test, 0 == strcmp(expected_uniqueid, change->
uniqueid));
363 ast_test_validate(
test, 0 == strcmp(
"Unsubscribe", change->
description));
364 ast_test_validate(
test, 0 == strcmp(expected_uniqueid, change->
uniqueid));
366 return AST_TEST_PASS;
376 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
377 RAII_VAR(
char *, expected_uniqueid, NULL, ast_free);
383 info->name = __func__;
384 info->category = test_category;
385 info->summary =
"Test subscribe/unsubscribe messages using a threadpool subscription";
386 info->description =
"Test subscribe/unsubscribe messages using a threadpool subscription";
387 return AST_TEST_NOT_RUN;
395 consumer = consumer_create(0);
396 ast_test_validate(
test, NULL != consumer);
398 uut = stasis_subscribe_pool(
topic, consumer_exec, consumer);
399 ast_test_validate(
test, NULL != uut);
404 complete = consumer_wait_for_completion(consumer);
405 ast_test_validate(
test, 1 == complete);
407 ast_test_validate(
test, 2 == consumer->messages_rxed_len);
413 ast_test_validate(
test, 0 == strcmp(
"Subscribe", change->
description));
414 ast_test_validate(
test, 0 == strcmp(expected_uniqueid, change->
uniqueid));
418 ast_test_validate(
test, 0 == strcmp(
"Unsubscribe", change->
description));
419 ast_test_validate(
test, 0 == strcmp(expected_uniqueid, change->
uniqueid));
421 return AST_TEST_PASS;
431 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
437 info->name = __func__;
438 info->category = test_category;
439 info->summary =
"Test publishing";
440 info->description =
"Test publishing";
441 return AST_TEST_NOT_RUN;
449 consumer = consumer_create(1);
450 ast_test_validate(
test, NULL != consumer);
452 uut = stasis_subscribe(
topic, consumer_exec, consumer);
453 ast_test_validate(
test, NULL != uut);
463 actual_len = consumer_wait_for(consumer, 1);
464 ast_test_validate(
test, 1 == actual_len);
468 return AST_TEST_PASS;
478 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
484 info->name = __func__;
485 info->category = test_category;
486 info->summary =
"Test synchronous publishing";
487 info->description =
"Test synchronous publishing";
488 return AST_TEST_NOT_RUN;
496 consumer = consumer_create(1);
497 ast_test_validate(
test, NULL != consumer);
499 uut = stasis_subscribe(
topic, consumer_exec_sync, consumer);
500 ast_test_validate(
test, NULL != uut);
510 actual_len = consumer->messages_rxed_len;
511 ast_test_validate(
test, 1 == actual_len);
515 return AST_TEST_PASS;
525 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
531 info->name = __func__;
532 info->category = test_category;
533 info->summary =
"Test publishing with a threadpool";
534 info->description =
"Test publishing to a subscriber whose\n"
535 "subscription dictates messages are received through a\n"
537 return AST_TEST_NOT_RUN;
545 consumer = consumer_create(1);
546 ast_test_validate(
test, NULL != consumer);
548 uut = stasis_subscribe_pool(
topic, consumer_exec, consumer);
549 ast_test_validate(
test, NULL != uut);
559 actual_len = consumer_wait_for(consumer, 1);
560 ast_test_validate(
test, 1 == actual_len);
564 return AST_TEST_PASS;
570 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
579 info->name = __func__;
580 info->category = test_category;
581 info->summary =
"Test simple subscriptions";
582 info->description =
"Test simple subscriptions";
583 return AST_TEST_NOT_RUN;
591 consumer = consumer_create(1);
592 ast_test_validate(
test, NULL != consumer);
594 uut = stasis_subscribe(
topic, consumer_exec, consumer);
595 ast_test_validate(
test, NULL != uut);
607 actual_len = consumer_should_stay(consumer, 0);
608 ast_test_validate(
test, 0 == actual_len);
610 return AST_TEST_PASS;
618 RAII_VAR(
struct consumer *, parent_consumer, NULL, ao2_cleanup);
619 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
632 info->name = __func__;
633 info->category = test_category;
634 info->summary =
"Test sending events to a parent topic";
635 info->description =
"Test sending events to a parent topic.\n"
636 "This test creates three topics (one parent, two children)\n"
637 "and publishes a message to one child, and verifies it's\n"
638 "only seen by that child and the parent";
639 return AST_TEST_NOT_RUN;
645 ast_test_validate(
test, NULL != parent_topic);
650 ast_test_validate(
test, NULL != forward_sub);
652 parent_consumer = consumer_create(1);
653 ast_test_validate(
test, NULL != parent_consumer);
654 consumer = consumer_create(1);
655 ast_test_validate(
test, NULL != consumer);
657 parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
658 ast_test_validate(
test, NULL != parent_sub);
660 sub = stasis_subscribe(
topic, consumer_exec, consumer);
661 ast_test_validate(
test, NULL != sub);
671 actual_len = consumer_wait_for(consumer, 1);
672 ast_test_validate(
test, 1 == actual_len);
673 actual_len = consumer_wait_for(parent_consumer, 1);
674 ast_test_validate(
test, 1 == actual_len);
676 return AST_TEST_PASS;
697 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
703 info->name = __func__;
704 info->category = test_category;
705 info->summary =
"Test sending interleaved events to a parent topic";
706 info->description =
"Test sending events to a parent topic.\n"
707 "This test creates three topics (one parent, two children)\n"
708 "and publishes messages alternately between the children.\n"
709 "It verifies that the messages are received in the expected\n"
711 return AST_TEST_NOT_RUN;
717 ast_test_validate(
test, NULL != test_message_type);
723 ast_test_validate(
test, NULL != test_message1);
725 ast_test_validate(
test, NULL != test_message2);
727 ast_test_validate(
test, NULL != test_message3);
730 ast_test_validate(
test, NULL != parent_topic);
732 ast_test_validate(
test, NULL != topic1);
734 ast_test_validate(
test, NULL != topic2);
737 ast_test_validate(
test, NULL != forward_sub1);
739 ast_test_validate(
test, NULL != forward_sub2);
741 consumer = consumer_create(1);
742 ast_test_validate(
test, NULL != consumer);
744 sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
745 ast_test_validate(
test, NULL != sub);
752 actual_len = consumer_wait_for(consumer, 3);
753 ast_test_validate(
test, 3 == actual_len);
755 ast_test_validate(
test, test_message1 == consumer->messages_rxed[0]);
756 ast_test_validate(
test, test_message2 == consumer->messages_rxed[1]);
757 ast_test_validate(
test, test_message3 == consumer->messages_rxed[2]);
759 return AST_TEST_PASS;
781 RAII_VAR(
struct consumer *, consumer1, NULL, ao2_cleanup);
782 RAII_VAR(
struct consumer *, consumer2, NULL, ao2_cleanup);
788 info->name = __func__;
789 info->category = test_category;
790 info->summary =
"Test sending interleaved events to a parent topic with different subscribers";
791 info->description =
"Test sending events to a parent topic.\n"
792 "This test creates three topics (one parent, two children)\n"
793 "and publishes messages alternately between the children.\n"
794 "It verifies that the messages are received in the expected\n"
795 "order, for different subscription types: one with a dedicated\n"
796 "thread, the other on the Stasis threadpool.";
797 return AST_TEST_NOT_RUN;
803 ast_test_validate(
test, NULL != test_message_type);
809 ast_test_validate(
test, NULL != test_message1);
811 ast_test_validate(
test, NULL != test_message2);
813 ast_test_validate(
test, NULL != test_message3);
816 ast_test_validate(
test, NULL != parent_topic);
818 ast_test_validate(
test, NULL != topic1);
820 ast_test_validate(
test, NULL != topic2);
823 ast_test_validate(
test, NULL != forward_sub1);
825 ast_test_validate(
test, NULL != forward_sub2);
827 consumer1 = consumer_create(1);
828 ast_test_validate(
test, NULL != consumer1);
830 consumer2 = consumer_create(1);
831 ast_test_validate(
test, NULL != consumer2);
833 sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
834 ast_test_validate(
test, NULL != sub1);
837 sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
838 ast_test_validate(
test, NULL != sub2);
845 actual_len = consumer_wait_for(consumer1, 3);
846 ast_test_validate(
test, 3 == actual_len);
848 actual_len = consumer_wait_for(consumer2, 3);
849 ast_test_validate(
test, 3 == actual_len);
851 ast_test_validate(
test, test_message1 == consumer1->messages_rxed[0]);
852 ast_test_validate(
test, test_message2 == consumer1->messages_rxed[1]);
853 ast_test_validate(
test, test_message3 == consumer1->messages_rxed[2]);
855 ast_test_validate(
test, test_message1 == consumer2->messages_rxed[0]);
856 ast_test_validate(
test, test_message2 == consumer2->messages_rxed[1]);
857 ast_test_validate(
test, test_message3 == consumer2->messages_rxed[2]);
859 return AST_TEST_PASS;
867 static void cache_test_data_dtor(
void *obj)
872 ast_free(data->value);
879 data = ao2_alloc(
sizeof(*data), cache_test_data_dtor);
884 ast_assert(name != NULL);
885 ast_assert(value != NULL);
889 if (!data->id || !data->value) {
898 return cache_test_message_create_full(type, name, value, &
ast_eid_default);
901 static const char *cache_test_data_id(
struct stasis_message *message)
919 char aggregate_str[30];
926 accumulated += atoi(test_data->value);
928 for (idx = 0; ; ++idx) {
936 accumulated += atoi(test_data->value);
948 if (accumulated == atoi(test_data->value)) {
954 snprintf(aggregate_str,
sizeof(aggregate_str),
"%d", accumulated);
955 aggregate_snapshot = cache_test_message_create_full(type, test_data->id, aggregate_str, NULL);
956 if (!aggregate_snapshot) {
958 ast_log(LOG_ERROR,
"Could not create aggregate snapshot.\n");
962 return aggregate_snapshot;
983 return value && !strcmp(value, test_data->value);
992 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
999 info->name = __func__;
1000 info->category = test_category;
1001 info->summary =
"Test caching topics only forward cache_update messages.";
1002 info->description =
"Test caching topics only forward cache_update messages.";
1003 return AST_TEST_NOT_RUN;
1009 ast_test_validate(
test, NULL != non_cache_type);
1011 ast_test_validate(
test, NULL != topic);
1013 ast_test_validate(
test, NULL != cache);
1015 ast_test_validate(
test, NULL != caching_topic);
1016 consumer = consumer_create(1);
1017 ast_test_validate(
test, NULL != consumer);
1019 ast_test_validate(
test, NULL != sub);
1022 test_message = cache_test_message_create(non_cache_type,
"1",
"1");
1023 ast_test_validate(
test, NULL != test_message);
1027 actual_len = consumer_should_stay(consumer, 0);
1028 ast_test_validate(
test, 0 == actual_len);
1030 return AST_TEST_PASS;
1039 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
1050 info->name = __func__;
1051 info->category = test_category;
1052 info->summary =
"Test passing messages through cache topic unscathed.";
1053 info->description =
"Test passing messages through cache topic unscathed.";
1054 return AST_TEST_NOT_RUN;
1060 ast_test_validate(
test, NULL != cache_type);
1062 ast_test_validate(
test, NULL != topic);
1064 ast_test_validate(
test, NULL != cache);
1066 ast_test_validate(
test, NULL != caching_topic);
1067 consumer = consumer_create(1);
1068 ast_test_validate(
test, NULL != consumer);
1070 ast_test_validate(
test, NULL != sub);
1073 test_message1_1 = cache_test_message_create(cache_type,
"1",
"1");
1074 ast_test_validate(
test, NULL != test_message1_1);
1075 test_message2_1 = cache_test_message_create(cache_type,
"2",
"1");
1076 ast_test_validate(
test, NULL != test_message2_1);
1081 actual_len = consumer_wait_for(consumer, 2);
1082 ast_test_validate(
test, 2 == actual_len);
1102 test_message2_2 = cache_test_message_create(cache_type,
"2",
"2");
1103 ast_test_validate(
test, NULL != test_message2_2);
1106 actual_len = consumer_wait_for(consumer, 3);
1107 ast_test_validate(
test, 3 == actual_len);
1118 ast_test_validate(
test, NULL != test_message1_clear);
1121 actual_len = consumer_wait_for(consumer, 4);
1122 ast_test_validate(
test, 4 == actual_len);
1129 return AST_TEST_PASS;
1138 RAII_VAR(
struct consumer *, consumer, NULL, ao2_cleanup);
1151 info->name = __func__;
1152 info->category = test_category;
1153 info->summary =
"Test cache dump routines.";
1154 info->description =
"Test cache dump routines.";
1155 return AST_TEST_NOT_RUN;
1161 ast_test_validate(
test, NULL != cache_type);
1163 ast_test_validate(
test, NULL != topic);
1165 ast_test_validate(
test, NULL != cache);
1167 ast_test_validate(
test, NULL != caching_topic);
1168 consumer = consumer_create(1);
1169 ast_test_validate(
test, NULL != consumer);
1171 ast_test_validate(
test, NULL != sub);
1174 test_message1_1 = cache_test_message_create(cache_type,
"1",
"1");
1175 ast_test_validate(
test, NULL != test_message1_1);
1176 test_message2_1 = cache_test_message_create(cache_type,
"2",
"1");
1177 ast_test_validate(
test, NULL != test_message2_1);
1182 actual_len = consumer_wait_for(consumer, 2);
1183 ast_test_validate(
test, 2 == actual_len);
1186 ao2_cleanup(cache_dump);
1188 ast_test_validate(
test, NULL != cache_dump);
1191 while ((obj = ao2_iterator_next(&i))) {
1193 ast_test_validate(
test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
1198 test_message2_2 = cache_test_message_create(cache_type,
"2",
"2");
1199 ast_test_validate(
test, NULL != test_message2_2);
1202 actual_len = consumer_wait_for(consumer, 3);
1203 ast_test_validate(
test, 3 == actual_len);
1206 ao2_cleanup(cache_dump);
1208 ast_test_validate(
test, NULL != cache_dump);
1211 while ((obj = ao2_iterator_next(&i))) {
1213 ast_test_validate(
test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
1219 ast_test_validate(
test, NULL != test_message1_clear);
1222 actual_len = consumer_wait_for(consumer, 4);
1223 ast_test_validate(
test, 4 == actual_len);
1226 ao2_cleanup(cache_dump);
1228 ast_test_validate(
test, NULL != cache_dump);
1231 while ((obj = ao2_iterator_next(&i))) {
1233 ast_test_validate(
test, actual_cache_entry == test_message2_2);
1238 ao2_cleanup(cache_dump);
1242 return AST_TEST_PASS;
1251 RAII_VAR(
struct consumer *, cache_consumer, NULL, ao2_cleanup);
1252 RAII_VAR(
struct consumer *, topic_consumer, NULL, ao2_cleanup);
1271 info->name = __func__;
1272 info->category = test_category;
1273 info->summary =
"Test cache eid and aggregate support.";
1274 info->description =
"Test cache eid and aggregate support.";
1275 return AST_TEST_NOT_RUN;
1280 memset(&foreign_eid1, 0xAA,
sizeof(foreign_eid1));
1281 memset(&foreign_eid2, 0xBB,
sizeof(foreign_eid2));
1284 ast_test_validate(
test, NULL != cache_type);
1287 ast_test_validate(
test, NULL != topic);
1290 topic_consumer = consumer_create(1);
1291 ast_test_validate(
test, NULL != topic_consumer);
1293 topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
1294 ast_test_validate(
test, NULL != topic_sub);
1298 cache_test_aggregate_calc_fn, cache_test_aggregate_publish_fn);
1299 ast_test_validate(
test, NULL != cache);
1302 ast_test_validate(
test, NULL != caching_topic);
1305 cache_consumer = consumer_create(1);
1306 ast_test_validate(
test, NULL != cache_consumer);
1309 ast_test_validate(
test, NULL != cache_sub);
1313 test_message1_1 = cache_test_message_create_full(cache_type,
"1",
"1", &
ast_eid_default);
1314 ast_test_validate(
test, NULL != test_message1_1);
1315 test_message2_1 = cache_test_message_create_full(cache_type,
"2",
"1", &
ast_eid_default);
1316 ast_test_validate(
test, NULL != test_message2_1);
1317 test_message2_2 = cache_test_message_create_full(cache_type,
"2",
"2", &foreign_eid1);
1318 ast_test_validate(
test, NULL != test_message2_2);
1319 test_message2_3 = cache_test_message_create_full(cache_type,
"2",
"3", &foreign_eid2);
1320 ast_test_validate(
test, NULL != test_message2_3);
1321 test_message2_4 = cache_test_message_create_full(cache_type,
"2",
"4", &foreign_eid2);
1322 ast_test_validate(
test, NULL != test_message2_4);
1326 ast_test_validate(
test, check_cache_aggregate(cache, cache_type,
"1",
"1"));
1328 ast_test_validate(
test, check_cache_aggregate(cache, cache_type,
"2",
"1"));
1330 ast_test_validate(
test, check_cache_aggregate(cache, cache_type,
"2",
"3"));
1332 actual_len = consumer_wait_for(cache_consumer, 6);
1333 ast_test_validate(
test, 6 == actual_len);
1334 actual_len = consumer_wait_for(topic_consumer, 6);
1335 ast_test_validate(
test, 6 == actual_len);
1338 ao2_cleanup(cache_dump);
1340 ast_test_validate(
test, NULL != cache_dump);
1343 while ((obj = ao2_iterator_next(&i))) {
1346 ast_test_validate(
test,
1347 actual_cache_entry == test_message1_1
1348 || actual_cache_entry == test_message2_1
1349 || actual_cache_entry == test_message2_2);
1354 ao2_cleanup(cache_dump);
1356 ast_test_validate(
test, NULL != cache_dump);
1359 while ((obj = ao2_iterator_next(&i))) {
1362 ast_test_validate(
test,
1363 actual_cache_entry == test_message1_1
1364 || actual_cache_entry == test_message2_1);
1370 ast_test_validate(
test, check_cache_aggregate(cache, cache_type,
"2",
"6"));
1372 actual_len = consumer_wait_for(cache_consumer, 8);
1373 ast_test_validate(
test, 8 == actual_len);
1374 actual_len = consumer_wait_for(topic_consumer, 8);
1375 ast_test_validate(
test, 8 == actual_len);
1378 ao2_cleanup(cache_dump);
1380 ast_test_validate(
test, NULL != cache_dump);
1383 while ((obj = ao2_iterator_next(&i))) {
1386 ast_test_validate(
test,
1387 actual_cache_entry == test_message1_1
1388 || actual_cache_entry == test_message2_1
1389 || actual_cache_entry == test_message2_2
1390 || actual_cache_entry == test_message2_3);
1395 ao2_cleanup(cache_dump);
1397 ast_test_validate(
test, NULL != cache_dump);
1400 while ((obj = ao2_iterator_next(&i))) {
1403 ast_test_validate(
test, actual_cache_entry == test_message2_2);
1409 ast_test_validate(
test, check_cache_aggregate(cache, cache_type,
"2",
"7"));
1411 actual_len = consumer_wait_for(cache_consumer, 10);
1412 ast_test_validate(
test, 10 == actual_len);
1413 actual_len = consumer_wait_for(topic_consumer, 10);
1414 ast_test_validate(
test, 10 == actual_len);
1417 ao2_cleanup(cache_dump);
1419 ast_test_validate(
test, NULL != cache_dump);
1422 while ((obj = ao2_iterator_next(&i))) {
1425 ast_test_validate(
test,
1426 actual_cache_entry == test_message1_1
1427 || actual_cache_entry == test_message2_1
1428 || actual_cache_entry == test_message2_2
1429 || actual_cache_entry == test_message2_4);
1434 ao2_cleanup(cache_dump);
1436 ast_test_validate(
test, NULL != cache_dump);
1439 while ((obj = ao2_iterator_next(&i))) {
1442 ast_test_validate(
test,
1443 actual_cache_entry == test_message2_1
1444 || actual_cache_entry == test_message2_2
1445 || actual_cache_entry == test_message2_4);
1451 ast_test_validate(
test, NULL != test_message1_clear);
1453 ast_test_validate(
test, check_cache_aggregate(cache, cache_type,
"1", NULL));
1455 actual_len = consumer_wait_for(cache_consumer, 12);
1456 ast_test_validate(
test, 12 == actual_len);
1457 actual_len = consumer_wait_for(topic_consumer, 11);
1458 ast_test_validate(
test, 11 == actual_len);
1461 ao2_cleanup(cache_dump);
1463 ast_test_validate(
test, NULL != cache_dump);
1466 while ((obj = ao2_iterator_next(&i))) {
1469 ast_test_validate(
test,
1470 actual_cache_entry == test_message2_1
1471 || actual_cache_entry == test_message2_2
1472 || actual_cache_entry == test_message2_4);
1478 ast_test_validate(
test, NULL != test_message2_clear);
1480 ast_test_validate(
test, check_cache_aggregate(cache, cache_type,
"2",
"5"));
1482 actual_len = consumer_wait_for(cache_consumer, 14);
1483 ast_test_validate(
test, 14 == actual_len);
1484 actual_len = consumer_wait_for(topic_consumer, 13);
1485 ast_test_validate(
test, 13 == actual_len);
1488 ao2_cleanup(cache_dump);
1490 ast_test_validate(
test, NULL != cache_dump);
1493 while ((obj = ao2_iterator_next(&i))) {
1496 ast_test_validate(
test,
1497 actual_cache_entry == test_message2_1
1498 || actual_cache_entry == test_message2_4);
1502 return AST_TEST_PASS;
1509 RAII_VAR(
char *, test_data, NULL, ao2_cleanup);
1513 RAII_VAR(
struct consumer *, consumer1, NULL, ao2_cleanup);
1514 RAII_VAR(
struct consumer *, consumer2, NULL, ao2_cleanup);
1515 RAII_VAR(
struct consumer *, consumer3, NULL, ao2_cleanup);
1519 int actual_len, ret;
1524 info->name = __func__;
1525 info->category = test_category;
1526 info->summary =
"Test simple message routing";
1527 info->description =
"Test simple message routing";
1528 return AST_TEST_NOT_RUN;
1534 ast_test_validate(
test, NULL != topic);
1536 consumer1 = consumer_create(1);
1537 ast_test_validate(
test, NULL != consumer1);
1538 consumer2 = consumer_create(1);
1539 ast_test_validate(
test, NULL != consumer2);
1540 consumer3 = consumer_create(1);
1541 ast_test_validate(
test, NULL != consumer3);
1544 ast_test_validate(
test, NULL != test_message_type1);
1546 ast_test_validate(
test, NULL != test_message_type2);
1548 ast_test_validate(
test, NULL != test_message_type3);
1550 uut = stasis_message_router_create(topic);
1551 ast_test_validate(
test, NULL != uut);
1554 uut, test_message_type1, consumer_exec, consumer1);
1555 ast_test_validate(
test, 0 == ret);
1558 uut, test_message_type2, consumer_exec, consumer2);
1559 ast_test_validate(
test, 0 == ret);
1562 ast_test_validate(
test, 0 == ret);
1565 test_data = ao2_alloc(1, NULL);
1566 ast_test_validate(
test, NULL != test_data);
1568 ast_test_validate(
test, NULL != test_message1);
1570 ast_test_validate(
test, NULL != test_message2);
1572 ast_test_validate(
test, NULL != test_message3);
1578 actual_len = consumer_wait_for(consumer1, 1);
1579 ast_test_validate(
test, 1 == actual_len);
1580 actual_len = consumer_wait_for(consumer2, 1);
1581 ast_test_validate(
test, 1 == actual_len);
1582 actual_len = consumer_wait_for(consumer3, 1);
1583 ast_test_validate(
test, 1 == actual_len);
1585 actual = consumer1->messages_rxed[0];
1586 ast_test_validate(
test, test_message1 == actual);
1588 actual = consumer2->messages_rxed[0];
1589 ast_test_validate(
test, test_message2 == actual);
1591 actual = consumer3->messages_rxed[0];
1592 ast_test_validate(
test, test_message3 == actual);
1595 ao2_cleanup(consumer1);
1596 ao2_cleanup(consumer2);
1598 return AST_TEST_PASS;
1605 RAII_VAR(
char *, test_data, NULL, ao2_cleanup);
1609 RAII_VAR(
struct consumer *, consumer1, NULL, ao2_cleanup);
1610 RAII_VAR(
struct consumer *, consumer2, NULL, ao2_cleanup);
1611 RAII_VAR(
struct consumer *, consumer3, NULL, ao2_cleanup);
1615 int actual_len, ret;
1620 info->name = __func__;
1621 info->category = test_category;
1622 info->summary =
"Test message routing via threadpool";
1623 info->description =
"Test simple message routing when\n"
1624 "the subscriptions dictate usage of the Stasis\n"
1626 return AST_TEST_NOT_RUN;
1632 ast_test_validate(
test, NULL != topic);
1634 consumer1 = consumer_create(1);
1635 ast_test_validate(
test, NULL != consumer1);
1636 consumer2 = consumer_create(1);
1637 ast_test_validate(
test, NULL != consumer2);
1638 consumer3 = consumer_create(1);
1639 ast_test_validate(
test, NULL != consumer3);
1642 ast_test_validate(
test, NULL != test_message_type1);
1644 ast_test_validate(
test, NULL != test_message_type2);
1646 ast_test_validate(
test, NULL != test_message_type3);
1648 uut = stasis_message_router_create_pool(topic);
1649 ast_test_validate(
test, NULL != uut);
1652 uut, test_message_type1, consumer_exec, consumer1);
1653 ast_test_validate(
test, 0 == ret);
1656 uut, test_message_type2, consumer_exec, consumer2);
1657 ast_test_validate(
test, 0 == ret);
1660 ast_test_validate(
test, 0 == ret);
1663 test_data = ao2_alloc(1, NULL);
1664 ast_test_validate(
test, NULL != test_data);
1666 ast_test_validate(
test, NULL != test_message1);
1668 ast_test_validate(
test, NULL != test_message2);
1670 ast_test_validate(
test, NULL != test_message3);
1676 actual_len = consumer_wait_for(consumer1, 1);
1677 ast_test_validate(
test, 1 == actual_len);
1678 actual_len = consumer_wait_for(consumer2, 1);
1679 ast_test_validate(
test, 1 == actual_len);
1680 actual_len = consumer_wait_for(consumer3, 1);
1681 ast_test_validate(
test, 1 == actual_len);
1683 actual = consumer1->messages_rxed[0];
1684 ast_test_validate(
test, test_message1 == actual);
1686 actual = consumer2->messages_rxed[0];
1687 ast_test_validate(
test, test_message2 == actual);
1689 actual = consumer3->messages_rxed[0];
1690 ast_test_validate(
test, test_message3 == actual);
1693 ao2_cleanup(consumer1);
1694 ao2_cleanup(consumer2);
1696 return AST_TEST_PASS;
1701 const char *type_name =
1719 RAII_VAR(
char *, test_data, NULL, ao2_cleanup);
1723 RAII_VAR(
struct consumer *, consumer1, NULL, ao2_cleanup);
1724 RAII_VAR(
struct consumer *, consumer2, NULL, ao2_cleanup);
1725 RAII_VAR(
struct consumer *, consumer3, NULL, ao2_cleanup);
1729 int actual_len, ret;
1734 info->name = __func__;
1735 info->category = test_category;
1736 info->summary =
"Test special handling cache_update messages";
1737 info->description =
"Test special handling cache_update messages";
1738 return AST_TEST_NOT_RUN;
1744 ast_test_validate(
test, NULL != topic);
1747 ast_test_validate(
test, NULL != cache);
1749 ast_test_validate(
test, NULL != caching_topic);
1751 consumer1 = consumer_create(1);
1752 ast_test_validate(
test, NULL != consumer1);
1753 consumer2 = consumer_create(1);
1754 ast_test_validate(
test, NULL != consumer2);
1755 consumer3 = consumer_create(1);
1756 ast_test_validate(
test, NULL != consumer3);
1759 ast_test_validate(
test, NULL != test_message_type1);
1761 ast_test_validate(
test, NULL != test_message_type2);
1763 ast_test_validate(
test, NULL != test_message_type3);
1765 uut = stasis_message_router_create(
1767 ast_test_validate(
test, NULL != uut);
1770 uut, test_message_type1, consumer_exec, consumer1);
1771 ast_test_validate(
test, 0 == ret);
1775 ast_test_validate(
test, 0 == ret);
1778 ast_test_validate(
test, 0 == ret);
1781 test_data = ao2_alloc(1, NULL);
1782 ast_test_validate(
test, NULL != test_data);
1784 ast_test_validate(
test, NULL != test_message1);
1786 ast_test_validate(
test, NULL != test_message2);
1788 ast_test_validate(
test, NULL != test_message3);
1794 actual_len = consumer_wait_for(consumer1, 1);
1795 ast_test_validate(
test, 1 == actual_len);
1796 actual_len = consumer_wait_for(consumer2, 1);
1797 ast_test_validate(
test, 1 == actual_len);
1799 actual_len = consumer_should_stay(consumer3, 0);
1800 ast_test_validate(
test, 0 == actual_len);
1802 actual = consumer1->messages_rxed[0];
1805 ast_test_validate(
test, test_message_type1 == update->
type);
1808 actual = consumer2->messages_rxed[0];
1811 ast_test_validate(
test, test_message_type2 == update->
type);
1815 ao2_cleanup(consumer1);
1816 ao2_cleanup(consumer2);
1818 return AST_TEST_PASS;
1825 RAII_VAR(
char *, data, NULL, ao2_cleanup);
1827 char *expected =
"SomeData";
1831 info->name = __func__;
1832 info->category = test_category;
1833 info->summary =
"Test message to_json function";
1834 info->description =
"Test message to_json function";
1835 return AST_TEST_NOT_RUN;
1842 ast_test_validate(
test, NULL == actual);
1847 data = ao2_alloc(strlen(expected) + 1, NULL);
1848 strcpy(data, expected);
1850 ast_test_validate(
test, NULL != uut);
1853 ast_test_validate(
test, NULL == actual);
1855 return AST_TEST_PASS;
1862 RAII_VAR(
char *, data, NULL, ao2_cleanup);
1864 const char *expected_text =
"SomeData";
1869 info->name = __func__;
1870 info->category = test_category;
1871 info->summary =
"Test message to_json function when NULL";
1872 info->description =
"Test message to_json function when NULL";
1873 return AST_TEST_NOT_RUN;
1880 data = ao2_alloc(strlen(expected_text) + 1, NULL);
1881 strcpy(data, expected_text);
1883 ast_test_validate(
test, NULL != uut);
1889 return AST_TEST_PASS;
1896 RAII_VAR(
char *, data, NULL, ao2_cleanup);
1898 char *expected =
"SomeData";
1902 info->name = __func__;
1903 info->category = test_category;
1904 info->summary =
"Test message to_ami function when NULL";
1905 info->description =
"Test message to_ami function when NULL";
1906 return AST_TEST_NOT_RUN;
1913 ast_test_validate(
test, NULL == actual);
1918 data = ao2_alloc(strlen(expected) + 1, NULL);
1919 strcpy(data, expected);
1921 ast_test_validate(
test, NULL != uut);
1924 ast_test_validate(
test, NULL == actual);
1926 return AST_TEST_PASS;
1933 RAII_VAR(
char *, data, NULL, ao2_cleanup);
1935 const char *expected_text =
"SomeData";
1936 const char *expected =
"Message: SomeData\r\n";
1940 info->name = __func__;
1941 info->category = test_category;
1942 info->summary =
"Test message to_ami function";
1943 info->description =
"Test message to_ami function";
1944 return AST_TEST_NOT_RUN;
1951 data = ao2_alloc(strlen(expected_text) + 1, NULL);
1952 strcpy(data, expected_text);
1954 ast_test_validate(
test, NULL != uut);
1957 ast_test_validate(
test, strcmp(expected, actual->extra_fields) == 0);
1959 return AST_TEST_PASS;
1975 info->name = __func__;
1976 info->category = test_category;
1977 info->summary =
"Test that destruction order doesn't bomb stuff";
1978 info->description =
"Test that destruction order doesn't bomb stuff";
1979 return AST_TEST_NOT_RUN;
1985 ast_test_validate(
test, NULL != topic);
1987 sub = stasis_subscribe(topic, noop, NULL);
1988 ast_test_validate(
test, NULL != sub);
1998 return AST_TEST_PASS;
2016 info->name = __func__;
2017 info->category = test_category;
2018 info->summary =
"Test that destruction order doesn't bomb stuff";
2019 info->description =
"Test that destruction order doesn't bomb stuff";
2020 return AST_TEST_NOT_RUN;
2026 ast_test_validate(
test, NULL != cache);
2029 ast_test_validate(
test, NULL != topic);
2032 ast_test_validate(
test, NULL != caching_topic);
2036 ast_test_validate(
test, NULL != sub);
2047 caching_topic = NULL;
2050 return AST_TEST_PASS;
2065 static void destroy_message_types(
void *obj)
2069 ao2_cleanup(types->none);
2070 ao2_cleanup(types->ami);
2071 ao2_cleanup(types->json);
2072 ao2_cleanup(types->event);
2073 ao2_cleanup(types->amievent);
2074 ao2_cleanup(types->type1);
2075 ao2_cleanup(types->type2);
2076 ao2_cleanup(types->type3);
2084 enum ast_test_result_state __attribute__ ((unused)) rc;
2086 types = ao2_alloc(sizeof(*types), destroy_message_types);
2091 ast_test_validate_cleanup(test,
2095 vtable.
to_ami = fake_ami;
2096 ast_test_validate_cleanup(test,
2102 ast_test_validate_cleanup(test,
2109 ast_test_validate_cleanup(test,
2113 vtable.
to_ami = fake_ami;
2114 ast_test_validate_cleanup(test,
2118 ast_test_validate_cleanup(test,
2122 ast_test_validate_cleanup(test,
2126 ast_test_validate_cleanup(test,
2140 struct consumer *consumer;
2145 static void destroy_cts(
void *obj)
2147 struct cts *c = obj;
2150 ao2_cleanup(c->topic);
2151 ao2_cleanup(c->consumer);
2154 static struct cts *create_cts(
struct ast_test *test)
2156 struct cts *
cts = ao2_alloc(
sizeof(*cts), destroy_cts);
2157 enum ast_test_result_state __attribute__ ((unused)) rc;
2159 ast_test_validate_cleanup(test, cts, rc,
cleanup);
2162 ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup);
2164 cts->consumer = consumer_create(0);
2165 ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup);
2168 cts->sub = stasis_subscribe(cts->topic, consumer_exec, cts->consumer);
2169 ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup);
2187 return (strcmp(data, msg_data->
description) == 0);
2193 static void dump_consumer(
struct ast_test *test,
struct cts *cts)
2198 ast_test_status_update(test,
"Messages received: %zu Final? %s\n", cts->consumer->messages_rxed_len,
2199 cts->consumer->complete ?
"yes" :
"no");
2200 for (i = 0; i < cts->consumer->messages_rxed_len; i++) {
2202 ast_test_status_update(test,
"Message type received: %s %s\n",
2208 static int send_msg(
struct ast_test *test,
struct cts *cts,
struct stasis_message_type *msg_type,
2213 ao2_alloc(
sizeof(*test_data) + (data ? strlen(data) : strlen(
"no data")) + 1, NULL);
2223 ast_test_status_update(test,
"Unable to create %s message\n",
2236 RAII_VAR(
struct cts *, cts, NULL, ao2_cleanup);
2242 info->name = __func__;
2243 info->category = test_category
"filtering/";
2244 info->summary =
"Test message filtering by type";
2245 info->description =
"Test message filtering by type";
2246 return AST_TEST_NOT_RUN;
2251 types = create_message_types(test);
2252 ast_test_validate(test, NULL != types);
2254 cts = create_cts(test);
2255 ast_test_validate(test, NULL != cts);
2263 ast_test_validate(test, send_msg(test, cts, types->type1,
"Pass"));
2264 ast_test_validate(test, send_msg(test, cts, types->type2,
"Pass"));
2266 ast_test_validate(test, send_msg(test, cts, types->type3,
"FAIL"));
2269 consumer_wait_for(cts->consumer, 3);
2275 ast_test_validate(test, send_msg(test, cts, types->type1,
"FAIL"));
2277 ast_test_validate(test, send_msg(test, cts, types->type2,
"Pass2"));
2279 ast_test_validate(test, send_msg(test, cts, types->type3,
"FAIL"));
2282 consumer_wait_for(cts->consumer, 4);
2286 consumer_wait_for_completion(cts->consumer);
2288 dump_consumer(test, cts);
2290 ast_test_validate(test, 1 == cts->consumer->complete);
2291 ast_test_validate(test, 5 == cts->consumer->messages_rxed_len);
2292 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change,
"Subscribe"));
2293 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1,
"Pass"));
2294 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2,
"Pass"));
2295 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2,
"Pass2"));
2296 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change,
"Unsubscribe"));
2298 return AST_TEST_PASS;
2303 RAII_VAR(
struct cts *, cts, NULL, ao2_cleanup);
2309 info->name = __func__;
2310 info->category = test_category
"filtering/";
2311 info->summary =
"Test message filtering by formatter";
2312 info->description =
"Test message filtering by formatter";
2313 return AST_TEST_NOT_RUN;
2318 types = create_message_types(test);
2319 ast_test_validate(test, NULL != types);
2321 cts = create_cts(test);
2322 ast_test_validate(test, NULL != cts);
2328 ast_test_validate(test, send_msg(test, cts, types->ami,
"Pass"));
2329 ast_test_validate(test, send_msg(test, cts, types->json,
"Pass"));
2330 ast_test_validate(test, send_msg(test, cts, types->amievent,
"Pass"));
2333 ast_test_validate(test, send_msg(test, cts, types->none,
"FAIL"));
2334 ast_test_validate(test, send_msg(test, cts, types->event,
"FAIL"));
2335 ast_test_validate(test, send_msg(test, cts, types->type1,
"FAIL"));
2338 consumer_wait_for(cts->consumer, 4);
2344 ast_test_validate(test, send_msg(test, cts, types->ami,
"FAIL"));
2345 ast_test_validate(test, send_msg(test, cts, types->json,
"FAIL"));
2347 ast_test_validate(test, send_msg(test, cts, types->amievent,
"Pass2"));
2349 ast_test_validate(test, send_msg(test, cts, types->event,
"Pass"));
2352 consumer_wait_for(cts->consumer, 6);
2356 consumer_wait_for_completion(cts->consumer);
2358 dump_consumer(test, cts);
2360 ast_test_validate(test, 1 == cts->consumer->complete);
2361 ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
2362 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change,
"Subscribe"));
2363 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami,
"Pass"));
2364 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json,
"Pass"));
2365 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent,
"Pass"));
2366 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent,
"Pass2"));
2367 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event,
"Pass"));
2368 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change,
"Unsubscribe"));
2370 return AST_TEST_PASS;
2375 RAII_VAR(
struct cts *, cts, NULL, ao2_cleanup);
2381 info->name = __func__;
2382 info->category = test_category
"filtering/";
2383 info->summary =
"Test message filtering by type and formatter";
2384 info->description =
"Test message filtering by type and formatter";
2385 return AST_TEST_NOT_RUN;
2390 types = create_message_types(test);
2391 ast_test_validate(test, NULL != types);
2393 cts = create_cts(test);
2394 ast_test_validate(test, NULL != cts);
2404 ast_test_validate(test, send_msg(test, cts, types->type1,
"Pass"));
2405 ast_test_validate(test, send_msg(test, cts, types->type2,
"Pass"));
2406 ast_test_validate(test, send_msg(test, cts, types->ami,
"Pass"));
2407 ast_test_validate(test, send_msg(test, cts, types->amievent,
"Pass"));
2408 ast_test_validate(test, send_msg(test, cts, types->json,
"Pass"));
2411 ast_test_validate(test, send_msg(test, cts, types->type3,
"FAIL"));
2412 ast_test_validate(test, send_msg(test, cts, types->event,
"FAIL"));
2415 consumer_wait_for(cts->consumer, 6);
2419 consumer_wait_for_completion(cts->consumer);
2421 dump_consumer(test, cts);
2423 ast_test_validate(test, 1 == cts->consumer->complete);
2424 ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
2425 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change,
"Subscribe"));
2426 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1,
"Pass"));
2427 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2,
"Pass"));
2428 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami,
"Pass"));
2429 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent,
"Pass"));
2430 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json,
"Pass"));
2431 ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change,
"Unsubscribe"));
2433 return AST_TEST_PASS;
2436 static int unload_module(
void)
2438 AST_TEST_UNREGISTER(message_type);
2439 AST_TEST_UNREGISTER(message);
2440 AST_TEST_UNREGISTER(subscription_messages);
2441 AST_TEST_UNREGISTER(subscription_pool_messages);
2442 AST_TEST_UNREGISTER(publish);
2443 AST_TEST_UNREGISTER(publish_sync);
2444 AST_TEST_UNREGISTER(publish_pool);
2445 AST_TEST_UNREGISTER(unsubscribe_stops_messages);
2446 AST_TEST_UNREGISTER(forward);
2447 AST_TEST_UNREGISTER(cache_filter);
2448 AST_TEST_UNREGISTER(cache);
2449 AST_TEST_UNREGISTER(cache_dump);
2450 AST_TEST_UNREGISTER(cache_eid_aggregate);
2451 AST_TEST_UNREGISTER(router);
2452 AST_TEST_UNREGISTER(router_pool);
2453 AST_TEST_UNREGISTER(router_cache_updates);
2454 AST_TEST_UNREGISTER(interleaving);
2455 AST_TEST_UNREGISTER(subscription_interleaving);
2456 AST_TEST_UNREGISTER(no_to_json);
2457 AST_TEST_UNREGISTER(to_json);
2458 AST_TEST_UNREGISTER(no_to_ami);
2459 AST_TEST_UNREGISTER(to_ami);
2460 AST_TEST_UNREGISTER(dtor_order);
2461 AST_TEST_UNREGISTER(caching_dtor_order);
2462 AST_TEST_UNREGISTER(type_filters);
2463 AST_TEST_UNREGISTER(formatter_filters);
2464 AST_TEST_UNREGISTER(combo_filters);
2468 static int load_module(
void)
2470 AST_TEST_REGISTER(message_type);
2471 AST_TEST_REGISTER(message);
2472 AST_TEST_REGISTER(subscription_messages);
2473 AST_TEST_REGISTER(subscription_pool_messages);
2474 AST_TEST_REGISTER(publish);
2475 AST_TEST_REGISTER(publish_sync);
2476 AST_TEST_REGISTER(publish_pool);
2477 AST_TEST_REGISTER(unsubscribe_stops_messages);
2478 AST_TEST_REGISTER(forward);
2479 AST_TEST_REGISTER(cache_filter);
2480 AST_TEST_REGISTER(cache);
2481 AST_TEST_REGISTER(cache_dump);
2482 AST_TEST_REGISTER(cache_eid_aggregate);
2483 AST_TEST_REGISTER(router);
2484 AST_TEST_REGISTER(router_pool);
2485 AST_TEST_REGISTER(router_cache_updates);
2486 AST_TEST_REGISTER(interleaving);
2487 AST_TEST_REGISTER(subscription_interleaving);
2488 AST_TEST_REGISTER(no_to_json);
2489 AST_TEST_REGISTER(to_json);
2490 AST_TEST_REGISTER(no_to_ami);
2491 AST_TEST_REGISTER(to_ami);
2492 AST_TEST_REGISTER(dtor_order);
2493 AST_TEST_REGISTER(caching_dtor_order);
2494 AST_TEST_REGISTER(type_filters);
2495 AST_TEST_REGISTER(formatter_filters);
2496 AST_TEST_REGISTER(combo_filters);
2501 .support_level = AST_MODULE_SUPPORT_CORE,
2502 .load = load_module,
2503 .unload = unload_module
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
Struct containing info for an AMI event to send out.
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
Asterisk main include file. File version handling, generic pbx functions.
#define ast_realloc(p, len)
A wrapper for realloc()
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
struct stasis_message * old_snapshot
Old value from the cache.
Virtual table providing methods for messages.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
void ast_json_unref(struct ast_json *value)
Decrease refcount on value. If refcount reaches zero, value is freed.
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
Description. Used by: AST_EVENT_SUB, AST_EVENT_UNSUB. Payload type: STR.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are not interested in a message type.
enum stasis_message_type_result stasis_message_type_create(const char *name, struct stasis_message_vtable *vtable, struct stasis_message_type **result)
Create a new message type.
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
struct stasis_message * stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
Get the aggregate cache entry snapshot.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
#define ast_strdup(str)
A wrapper for strdup()
int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route for stasis_cache_update messages to a message router.
struct ao2_container * stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
Dump cached items to a subscription for the ast_eid_default entity.
struct ast_event *(* to_event)(struct stasis_message *message)
Build the ast_event representation of the message.
An Entity ID is essentially a MAC address, brief and unique.
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded...
struct ast_manager_event_blob * ast_manager_event_blob_create(int event_flags, const char *manager_event, const char *extra_fields_fmt,...)
Construct an ast_manager_event_blob.
void stasis_message_router_unsubscribe_and_join(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic, blocking until the final message has been processed...
Structure containing callbacks for Stasis message sanitization.
struct ast_json * stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
struct ast_manager_event_blob * stasis_message_to_ami(struct stasis_message *msg)
Build the AMI representation of the message.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
struct stasis_message_type * type
struct ast_json *(* to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
Build the JSON representation of the message.
static void cleanup(void)
Clean up any old apps that we don't need any more.
struct ast_json * ast_json_string_create(const char *value)
Construct a JSON string from value.
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
const struct timeval * stasis_message_timestamp(const struct stasis_message *msg)
Get the time when a message was created.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
#define ast_malloc(len)
A wrapper for malloc()
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, enum stasis_subscription_message_formatters formatters)
Indicate to a subscription that we are interested in messages with one or more formatters.
struct stasis_topic * topic
struct stasis_message * new_snapshot
New value.
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_message_type * type
Convenience reference to snapshot type.
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Sorcery object created based on backend data.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
struct ast_manager_event_blob *(* to_ami)(struct stasis_message *message)
Build the AMI representation of the message.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Holds details about changes to subscriptions for the specified topic.
struct ast_eid ast_eid_default
Global EID.
void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
When we need to walk through a container, we use an ao2_iterator to keep track of the current position.
struct ast_event * ast_event_new(enum ast_event_type event_type,...)
Create a new event.
int ast_json_equal(const struct ast_json *lhs, const struct ast_json *rhs)
Compare two JSON objects.
#define S_OR(a, b)
Returns the equivalent of a logical OR for strings: the first one if not empty, otherwise the second one.
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
#define AST_TEST_DEFINE(hdr)
int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data)
Sets the default route of a router.
Abstract JSON element (object, array, string, int, ...).
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
static struct sorcery_test_caching cache
Global scope caching structure for testing.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
#define ASTERISK_GPL_KEY
The text the key() function should return.
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topic.
Asterisk module definitions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
const char * stasis_subscription_uniqueid(const struct stasis_subscription *sub)
Get the unique ID for the subscription.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.