Asterisk - The Open Source Telephony Project  21.4.1
Functions | Variables
res_stasis_test.c File Reference

Test infrastructure for dealing with Stasis. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/module.h"
#include "asterisk/stasis_test.h"

Go to the source code of this file.

Functions

static void __reg_module (void)
 
static void __unreg_module (void)
 
struct ast_module * AST_MODULE_SELF_SYM (void)
 
static int load_module (void)
 
static struct timespec make_deadline (int timeout_millis)
 
static void message_sink_cb (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 Implementation of the stasis_message_sink_cb() callback. More...
 
stasis_subscription_cb stasis_message_sink_cb (void)
 Topic callback to receive messages. More...
 
struct stasis_message_sink * stasis_message_sink_create (void)
 Create a message sink. More...
 
static void stasis_message_sink_dtor (void *obj)
 
int stasis_message_sink_should_stay (struct stasis_message_sink *sink, int num_messages, int timeout_millis)
 Ensures that no new messages are received. More...
 
int stasis_message_sink_wait_for (struct stasis_message_sink *sink, int start, stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
 Wait for a message that matches the given criteria. More...
 
int stasis_message_sink_wait_for_count (struct stasis_message_sink *sink, int num_messages, int timeout_millis)
 Wait for a sink's num_messages field to reach a certain level. More...
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_test_message_type)
 
struct stasis_message * stasis_test_message_create (void)
 Creates a test message.
 
static int unload_module (void)
 

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER , .description = "Stasis test utilities" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .support_level = AST_MODULE_SUPPORT_CORE, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_APP_DEPEND, }
 
static const struct ast_module_info * ast_module_info = &__mod_info
 

Detailed Description

Test infrastructure for dealing with Stasis.

Author
David M. Lee, II dlee@.nosp@m.digi.nosp@m.um.co.nosp@m.m

Definition in file res_stasis_test.c.

Function Documentation

static void message_sink_cb ( void *  data,
struct stasis_subscription *  sub,
struct stasis_message *  message 
)
static

Implementation of the stasis_message_sink_cb() callback.

Why the roundabout way of exposing this via stasis_message_sink_cb()? Well, it has to do with how we previously loaded modules, using RTLD_LAZY.

The stasis_message_sink_cb() function gave us a layer of indirection so that the initial lazy binding would still work as expected.

Definition at line 108 of file res_stasis_test.c.

References ao2_ref, ast_realloc, stasis_message_sink::cond, stasis_message_sink::is_done, stasis_message_sink::lock, lock, stasis_message_sink::max_messages, stasis_message_sink::messages, stasis_message_sink::num_messages, SCOPED_MUTEX, stasis_message_type(), stasis_subscription_change_type(), and stasis_subscription_final_message().

Referenced by stasis_message_sink_cb().

110 {
111  struct stasis_message_sink *sink = data;
112 
113  SCOPED_MUTEX(lock, &sink->lock);
114 
115  if (stasis_subscription_final_message(sub, message)) {
116  sink->is_done = 1;
117  ast_cond_signal(&sink->cond);
118  return;
119  }
120 
121  if (stasis_subscription_change_type() == stasis_message_type(message)) {
122  /* Ignore subscription changes */
123  return;
124  }
125 
126  if (sink->num_messages == sink->max_messages) {
127  size_t new_max_messages = sink->max_messages * 2;
128  struct stasis_message **new_messages = ast_realloc(
129  sink->messages,
130  sizeof(*new_messages) * new_max_messages);
131  if (!new_messages) {
132  return;
133  }
134  sink->max_messages = new_max_messages;
135  sink->messages = new_messages;
136  }
137 
138  ao2_ref(message, +1);
139  sink->messages[sink->num_messages++] = message;
140  ast_cond_signal(&sink->cond);
141 }
Structure that collects messages from a topic.
Definition: stasis_test.h:44
#define ast_realloc(p, len)
A wrapper for realloc()
Definition: astmm.h:226
struct stasis_message ** messages
Definition: stasis_test.h:57
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
ast_mutex_t lock
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
Definition: stasis.c:1174
ast_mutex_t lock
Definition: stasis_test.h:46
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
stasis_subscription_cb stasis_message_sink_cb ( void  )

Topic callback to receive messages.

We return a function pointer instead of simply exposing the function because of the vagaries of dlopen(), RTLD_LAZY, and function pointers. See the comment on the implementation for details why.

Returns
Function pointer to stasis_message_sink's message handling function

Definition at line 143 of file res_stasis_test.c.

References message_sink_cb().

144 {
145  return message_sink_cb;
146 }
static void message_sink_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Implementation of the stasis_message_sink_cb() callback.
struct stasis_message_sink* stasis_message_sink_create ( void  )

Create a message sink.

This is an AO2 managed object, which you ao2_cleanup() when done. The destructor waits for an unsubscribe message to be received, to ensure the object isn't disposed of before the topic is finished.

Definition at line 79 of file res_stasis_test.c.

References ao2_ref, ast_malloc, stasis_message_sink::cond, stasis_message_sink::lock, stasis_message_sink::max_messages, stasis_message_sink::messages, and RAII_VAR.

80 {
81  RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
82 
83  sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
84  if (!sink) {
85  return NULL;
86  }
87  ast_mutex_init(&sink->lock);
88  ast_cond_init(&sink->cond, NULL);
89  sink->max_messages = 4;
90  sink->messages =
91  ast_malloc(sizeof(*sink->messages) * sink->max_messages);
92  if (!sink->messages) {
93  return NULL;
94  }
95  ao2_ref(sink, +1);
96  return sink;
97 }
Structure that collects messages from a topic.
Definition: stasis_test.h:44
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ast_malloc(len)
A wrapper for malloc()
Definition: astmm.h:191
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
int stasis_message_sink_should_stay ( struct stasis_message_sink *  sink,
int  num_messages,
int  timeout_millis 
)

Ensures that no new messages are received.

The optional timeout prevents complete deadlock in a test.

Parameters
sink - Sink to wait on.
num_messages - Expected sink->num_messages.
timeout_millis - Number of milliseconds to wait for.
Returns
Actual sink->num_messages value at return. If this still equals num_messages, no new messages arrived before the wait ended; any other value means the message count changed.

Definition at line 170 of file res_stasis_test.c.

References stasis_message_sink::cond, stasis_message_sink::lock, lock, stasis_message_sink::num_messages, and SCOPED_MUTEX.

172 {
173  struct timespec deadline = make_deadline(timeout_millis);
174 
175  SCOPED_MUTEX(lock, &sink->lock);
176  while (sink->num_messages == num_messages) {
177  int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
178 
179  if (r == ETIMEDOUT) {
180  break;
181  }
182  if (r != 0) {
183  ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
184  strerror(r));
185  break;
186  }
187  }
188  return sink->num_messages;
189 }
ast_mutex_t lock
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
ast_mutex_t lock
Definition: stasis_test.h:46
int stasis_message_sink_wait_for ( struct stasis_message_sink *  sink,
int  start,
stasis_wait_cb  cmp_cb,
const void *  data,
int  timeout_millis 
)

Wait for a message that matches the given criteria.

Parameters
sink - Sink to wait on.
start - Index of message to start with.
cmp_cb - Comparison function. This returns true (non-zero) on match and false (zero) on no match.
data - Opaque data passed to cmp_cb along with each message.
timeout_millis - Number of milliseconds to wait.
Returns
Index of the matching message.
Negative for no match.

Definition at line 191 of file res_stasis_test.c.

References stasis_message_sink::cond, stasis_message_sink::lock, lock, stasis_message_sink::messages, stasis_message_sink::num_messages, and SCOPED_MUTEX.

193 {
194  struct timespec deadline = make_deadline(timeout_millis);
195 
196  SCOPED_MUTEX(lock, &sink->lock);
197 
198  /* wait for the start */
199  while (sink->num_messages < start + 1) {
200  int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
201 
202  if (r == ETIMEDOUT) {
203  /* Timed out waiting for the start */
204  return -1;
205  }
206  if (r != 0) {
207  ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
208  strerror(r));
209  return -2;
210  }
211  }
212 
213 
214  while (!cmp_cb(sink->messages[start], data)) {
215  ++start;
216 
217  while (sink->num_messages < start + 1) {
218  int r = ast_cond_timedwait(&sink->cond,
219  &sink->lock, &deadline);
220 
221  if (r == ETIMEDOUT) {
222  return -1;
223  }
224  if (r != 0) {
225  ast_log(LOG_ERROR,
226  "Unexpected condition error: %s\n",
227  strerror(r));
228  return -2;
229  }
230  }
231  }
232 
233  return start;
234 }
struct stasis_message ** messages
Definition: stasis_test.h:57
ast_mutex_t lock
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
ast_mutex_t lock
Definition: stasis_test.h:46
int stasis_message_sink_wait_for_count ( struct stasis_message_sink *  sink,
int  num_messages,
int  timeout_millis 
)

Wait for a sink's num_messages field to reach a certain level.

The optional timeout prevents complete deadlock in a test.

Parameters
sink - Sink to wait on.
num_messages - sink->num_messages value to wait for.
timeout_millis - Number of milliseconds to wait. -1 to wait forever.
Returns
Actual sink->num_messages value at return. If this is < num_messages, then the timeout expired.

Definition at line 149 of file res_stasis_test.c.

References stasis_message_sink::cond, stasis_message_sink::lock, lock, stasis_message_sink::num_messages, and SCOPED_MUTEX.

151 {
152  struct timespec deadline = make_deadline(timeout_millis);
153 
154  SCOPED_MUTEX(lock, &sink->lock);
155  while (sink->num_messages < num_messages) {
156  int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
157 
158  if (r == ETIMEDOUT) {
159  break;
160  }
161  if (r != 0) {
162  ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
163  strerror(r));
164  break;
165  }
166  }
167  return sink->num_messages;
168 }
ast_mutex_t lock
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
ast_mutex_t lock
Definition: stasis_test.h:46