32 #define test_category "/stasis/core/state/"
34 #define TOPIC_COUNT 500
36 #define MANAGER_TOPIC "foo"
55 static size_t sum_total;
58 static size_t running_total;
61 static int expect_null;
63 static int validate_data(
const char *
id,
struct foo_data *foo)
69 ast_log(LOG_ERROR,
"Unable to convert the state's id '%s' to numeric\n",
id);
81 ast_log(LOG_ERROR,
"Expected state data for '%s'\n",
id);
86 ast_log(LOG_ERROR,
"Expected NULL state data for '%s'\n",
id);
90 if (foo->bar != num) {
91 ast_log(LOG_ERROR,
"Unexpected state data for '%s'\n",
id);
101 validate_data(
id, foo);
107 .on_unsubscribe = handle_validate
117 running_total = expect_null = 0;
124 if (running_total != sum_total) {
125 ast_log(LOG_ERROR,
"Failed to destroy all subscriptions: running=%zu, sum=%zu\n",
126 running_total, sum_total);
143 sum_total = running_total = 0;
146 for (i = 0; i < TOPIC_COUNT; ++i) {
150 if (snprintf(
id, 10,
"%zu", i) == -1) {
151 ast_log(LOG_ERROR,
"Unable to convert subscriber id to string\n");
157 ast_log(LOG_ERROR,
"Failed to create a state subscriber for id '%s'\n",
id);
163 ast_log(LOG_ERROR,
"Failed to add to foo_sub to vector for id '%s'\n",
id);
171 if (i != TOPIC_COUNT || running_total != sum_total) {
172 ast_log(LOG_ERROR,
"Failed to create all subscriptions: running=%zu, sum=%zu\n",
173 running_total, sum_total);
174 subscriptions_destroy(manager, subs);
192 for (i = 0; i < TOPIC_COUNT; ++i) {
196 if (snprintf(
id, 10,
"%zu", i) == -1) {
197 ast_log(LOG_ERROR,
"Unable to convert publisher id to string\n");
216 for (i = 0; i < TOPIC_COUNT; ++i) {
220 if (snprintf(
id, 10,
"%zu", i) == -1) {
221 ast_log(LOG_ERROR,
"Unable to convert publisher id to string\n");
228 ast_log(LOG_ERROR,
"Failed to create a state publisher for id '%s'\n",
id);
233 ast_log(LOG_ERROR,
"Failed to add to publisher to vector for id '%s'\n",
id);
239 if (i != TOPIC_COUNT) {
240 ast_log(LOG_ERROR,
"Failed to create all publishers: count=%zu\n", i);
241 publishers_destroy(manager, pubs);
248 static struct stasis_message *create_foo_type_message(
const char *
id)
254 foo = ao2_alloc(
sizeof(*foo), NULL);
256 ast_log(LOG_ERROR,
"Failed to allocate foo data for '%s'\n",
id);
261 ast_log(LOG_ERROR,
"Unable to convert the state's id '%s' to numeric\n",
id);
265 foo->bar = (size_t) tmp;
269 ast_log(LOG_ERROR,
"Failed to create stasis message for '%s'\n",
id);
276 static int implicit_publish_cb(
const char *
id,
struct stasis_message *msg,
void *user_data)
281 if (validate_data(
id, foo)) {
285 msg = create_foo_type_message(
id);
297 static int explicit_publish_cb(
const char *
id,
struct stasis_message *msg,
void *user_data)
305 if (validate_data(
id, foo)) {
309 msg = create_foo_type_message(
id);
322 ast_log(LOG_ERROR,
"Unable to locate publisher for id '%s'\n",
id);
341 if (running_total != sum_total) {
342 ast_log(LOG_ERROR,
"Failed manager_callback (1): running=%zu, sum=%zu\n",
343 running_total, sum_total);
348 running_total = expect_null = 0;
351 if (running_total != sum_total) {
352 ast_log(LOG_ERROR,
"Failed manager_callback (2): running=%zu, sum=%zu\n",
353 running_total, sum_total);
364 int rc = AST_TEST_PASS;
368 info->name = __func__;
369 info->category = test_category;
370 info->summary =
"Test implicit publishing of stasis state";
371 info->description = info->summary;
372 return AST_TEST_NOT_RUN;
378 ast_test_validate(
test, manager != NULL);
380 ast_test_validate(
test, !subscriptions_create(manager, &subs));
382 ast_test_validate_cleanup(
test, !publish(manager, implicit_publish_cb, manager),
386 if (subscriptions_destroy(manager, &subs) || publishers_destroy(manager, NULL)) {
387 return AST_TEST_FAIL;
395 if (
ao2_ref(manager, 0) != 1) {
396 ast_log(LOG_ERROR,
"Memory leak - Too many references held on manager\n");
397 return AST_TEST_FAIL;
408 int rc = AST_TEST_PASS;
412 info->name = __func__;
413 info->category = test_category;
414 info->summary =
"Test explicit publishing of stasis state";
415 info->description = info->summary;
416 return AST_TEST_NOT_RUN;
422 ast_test_validate(
test, manager != NULL);
424 ast_test_validate(
test, !subscriptions_create(manager, &subs));
425 ast_test_validate_cleanup(
test, !publishers_create(manager, &pubs), rc,
cleanup);
427 ast_test_validate_cleanup(
test, !publish(manager, explicit_publish_cb, &pubs),
431 if (subscriptions_destroy(manager, &subs) || publishers_destroy(manager, &pubs)) {
432 return AST_TEST_FAIL;
440 if (
ao2_ref(manager, 0) != 1) {
441 ast_log(LOG_ERROR,
"Memory leak - Too many references held on manager\n");
442 return AST_TEST_FAIL;
448 static int unload_module(
void)
450 AST_TEST_UNREGISTER(implicit_publish);
451 AST_TEST_UNREGISTER(explicit_publish);
458 static int load_module(
void)
464 AST_TEST_REGISTER(implicit_publish);
465 AST_TEST_REGISTER(explicit_publish);
Managed stasis state event interface.
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
Asterisk main include file. File version handling, generic pbx functions.
struct stasis_state_subscriber * stasis_state_subscribe_pool(struct stasis_state_manager *manager, const char *id, stasis_subscription_cb callback, void *data)
Add a subscriber, and subscribe to its underlying stasis topic.
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and remove an implicit publisher.
void stasis_state_remove_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Remove an observer (will no longer receive managed state related events).
int ast_str_to_umax(const char *str, uintmax_t *res)
Convert the given string to an unsigned max size integer.
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
void(* on_subscribe)(const char *id, struct stasis_state_subscriber *sub)
Raised when any managed state is being subscribed.
const char * stasis_state_publisher_id(const struct stasis_state_publisher *pub)
Retrieve the publisher's underlying state's unique id.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
void * stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
Retrieve the last known state stasis message payload for the subscriber.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
int(* on_stasis_state)(const char *id, struct stasis_message *msg, void *user_data)
The delegate called for each managed state.
void * stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic, block until the final message is received, and then unsubscribe fr...
struct stasis_state_manager * stasis_state_manager_create(const char *topic_name)
Create a stasis state manager.
int stasis_state_add_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Add an observer to receive managed state related events.
static void cleanup(void)
Clean up any old apps that we don't need any more.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed state call the given handler.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
struct stasis_state_publisher * stasis_state_add_publisher(struct stasis_state_manager *manager, const char *id)
Add a publisher to the managed state for the given id.
Conversion utility functions.
#define AST_VECTOR(name, type)
Define a vector structure.
#define STASIS_MESSAGE_TYPE_DEFN(name,...)
Boiler-plate messaging macro for defining public message types.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and add an implicit subscriber.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
#define AST_TEST_DEFINE(hdr)
#define ASTERISK_GPL_KEY
The text the key() function should return.
struct stasis_message * stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
Create a new message for an entity.
Asterisk module definitions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
Publish to a managed state (topic) using a publisher.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
#define AST_VECTOR_CALLBACK_VOID(vec, callback,...)
Execute a callback on every element in a vector disregarding callback return.