39 static void stasis_message_sink_dtor(
void *obj)
48 ast_cond_wait(&sink->
cond, &sink->
lock);
52 ast_mutex_destroy(&sink->
lock);
53 ast_cond_destroy(&sink->
cond);
63 static struct timespec make_deadline(int timeout_millis)
66 struct timeval delta = {
67 .tv_sec = timeout_millis / 1000,
68 .tv_usec = (timeout_millis % 1000) * 1000,
70 struct timeval deadline_tv =
ast_tvadd(start, delta);
71 struct timespec deadline = {
72 .tv_sec = deadline_tv.tv_sec,
73 .tv_nsec = 1000 * deadline_tv.tv_usec,
83 sink = ao2_alloc(
sizeof(*sink), stasis_message_sink_dtor);
87 ast_mutex_init(&sink->
lock);
88 ast_cond_init(&sink->
cond, NULL);
117 ast_cond_signal(&sink->
cond);
130 sizeof(*new_messages) * new_max_messages);
140 ast_cond_signal(&sink->
cond);
150 int num_messages,
int timeout_millis)
152 struct timespec deadline = make_deadline(timeout_millis);
156 int r = ast_cond_timedwait(&sink->
cond, &sink->
lock, &deadline);
158 if (r == ETIMEDOUT) {
162 ast_log(LOG_ERROR,
"Unexpected condition error: %s\n",
171 int num_messages,
int timeout_millis)
173 struct timespec deadline = make_deadline(timeout_millis);
177 int r = ast_cond_timedwait(&sink->
cond, &sink->
lock, &deadline);
179 if (r == ETIMEDOUT) {
183 ast_log(LOG_ERROR,
"Unexpected condition error: %s\n",
192 stasis_wait_cb cmp_cb,
const void *data,
int timeout_millis)
194 struct timespec deadline = make_deadline(timeout_millis);
200 int r = ast_cond_timedwait(&sink->
cond, &sink->
lock, &deadline);
202 if (r == ETIMEDOUT) {
207 ast_log(LOG_ERROR,
"Unexpected condition error: %s\n",
214 while (!cmp_cb(sink->
messages[start], data)) {
218 int r = ast_cond_timedwait(&sink->
cond,
219 &sink->
lock, &deadline);
221 if (r == ETIMEDOUT) {
226 "Unexpected condition error: %s\n",
245 data = ao2_alloc(1, NULL);
253 static int unload_module(
void)
259 static int load_module(
void)
268 AST_MODULE_INFO(
ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER,
"Stasis test utilities",
269 .support_level = AST_MODULE_SUPPORT_CORE,
271 .unload = unload_module,
Structure that collects messages from a topic.
Asterisk main include file. File version handling, generic pbx functions.
#define ast_realloc(p, len)
A wrapper for realloc()
struct stasis_message ** messages
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
struct stasis_message_type * stasis_test_message_type(void)
Gets the type of messages created by stasis_test_message_create().
int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink, int num_messages, int timeout_millis)
Wait for a sink's num_messages field to reach a certain level.
struct stasis_message_sink * stasis_message_sink_create(void)
Create a message sink.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
stasis_subscription_cb stasis_message_sink_cb(void)
Topic callback to receive messages.
int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start, stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
Wait for a message that matches the given criteria.
struct stasis_message * stasis_test_message_create(void)
Creates a test message.
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ast_malloc(len)
A wrapper for malloc()
#define STASIS_MESSAGE_TYPE_DEFN(name,...)
Boiler-plate messaging macro for defining public message types.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
int stasis_message_sink_should_stay(struct stasis_message_sink *sink, int num_messages, int timeout_millis)
Ensures that no new messages are received.
Module has failed to load, may be in an inconsistent state.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
static void message_sink_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Implementation of the stasis_message_sink_cb() callback.
#define ASTERISK_GPL_KEY
The text the key() function should return.
Asterisk module definitions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Test infrastructure for dealing with Stasis.