Asterisk - The Open Source Telephony Project  21.4.1
Data Structures | Macros | Functions | Variables
test_taskprocessor.c File Reference

taskprocessor unit tests More...

#include "asterisk.h"
#include "asterisk/test.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/module.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer.h"
#include "asterisk/threadpool.h"

Go to the source code of this file.

Data Structures

struct  load_task_data
 Relevant data associated with taskprocessor load test. More...
 
struct  shutdown_data
 
struct  task_data
 userdata associated with baseline taskprocessor test More...
 
struct  test_listener_pvt
 Private data for the test taskprocessor listener. More...
 

Macros

#define HIGH_WATER_MARK   6
 
#define LOW_WATER_MARK   3
 
#define NUM_TASKS   20000
 
#define TEST_DATA_ARRAY_SIZE   10
 

Functions

static void __reg_module (void)
 
static void __unreg_module (void)
 
struct ast_moduleAST_MODULE_SELF_SYM (void)
 
 AST_TEST_DEFINE (default_taskprocessor)
 Baseline test for default taskprocessor. More...
 
 AST_TEST_DEFINE (subsystem_alert)
 Baseline test for subsystem alert.
 
 AST_TEST_DEFINE (default_taskprocessor_load)
 Load test for taskprocessor with default listener. More...
 
 AST_TEST_DEFINE (taskprocessor_listener)
 Test for a taskprocessor with custom listener. More...
 
 AST_TEST_DEFINE (taskprocessor_shutdown)
 
 AST_TEST_DEFINE (taskprocessor_push_local)
 
 AST_TEST_DEFINE (serializer_pool)
 Baseline test for a serializer pool. More...
 
static int check_stats (struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
 helper to ensure that statistics the listener is keeping are what we expect More...
 
static int listener_test_task (void *ignore)
 Queued task for taskprocessor listener test. More...
 
static int load_module (void)
 
static int load_task (void *data)
 a queued task to be used in the taskprocessor load test More...
 
static int local_task_exe (struct ast_taskprocessor_local *local)
 
static struct shutdown_datashutdown_data_create (int dont_wait)
 
static void shutdown_data_dtor (void *data)
 
static int shutdown_has_completed (struct shutdown_data *shutdown_data)
 
static void shutdown_poke (struct shutdown_data *shutdown_data)
 
static int shutdown_task_exec (void *data)
 
static int shutdown_waitfor_completion (struct shutdown_data *shutdown_data)
 
static int shutdown_waitfor_start (struct shutdown_data *shutdown_data)
 
static int task (void *data)
 Queued task for baseline test. More...
 
static struct task_datatask_data_create (void)
 Create a task_data object.
 
static void task_data_dtor (void *obj)
 
static int task_wait (struct task_data *task_data)
 Wait for a task to execute.
 
static void test_emptied (struct ast_taskprocessor_listener *listener)
 test taskprocessor listener's emptied callback.
 
static void * test_listener_pvt_alloc (void)
 test taskprocessor listener's alloc callback
 
static void test_shutdown (struct ast_taskprocessor_listener *listener)
 test taskprocessor listener's shutdown callback.
 
static int test_start (struct ast_taskprocessor_listener *listener)
 test taskprocessor listener's start callback
 
static void test_task_pushed (struct ast_taskprocessor_listener *listener, int was_empty)
 test taskprocessor listener's task_pushed callback More...
 
static void * tps_shutdown_thread (void *data)
 
static int unload_module (void)
 

Variables

static struct ast_module_info __mod_info = { .name = AST_MODULE, .flags = AST_MODFLAG_LOAD_ORDER , .description = "taskprocessor test module" , .key = "This paragraph is copyright (c) 2006 by Digium, Inc. \In order for your module to load, it must return this \key via a function called \"key\". Any code which \includes this paragraph must be licensed under the GNU \General Public License version 2 or later (at your \option). In addition to Digium's general reservations \of rights, Digium expressly reserves the right to \allow other parties to license this paragraph under \different terms. Any use of Digium, Inc. trademarks or \logos (including \"Asterisk\" or \"Digium\") without \express written permission of Digium, Inc. is prohibited.\n" , .buildopt_sum = AST_BUILDOPT_SUM, .load = load_module, .unload = unload_module, .load_pri = AST_MODPRI_DEFAULT, .support_level = AST_MODULE_SUPPORT_CORE, }
 
static const struct ast_module_infoast_module_info = &__mod_info
 
static struct load_task_data load_task_results
 
static const struct ast_taskprocessor_listener_callbacks test_callbacks
 

Detailed Description

taskprocessor unit tests

Author
Mark Michelson mmich.nosp@m.elso.nosp@m.n@dig.nosp@m.ium..nosp@m.com

Definition in file test_taskprocessor.c.

Function Documentation

AST_TEST_DEFINE ( default_taskprocessor  )

Baseline test for default taskprocessor.

This test ensures that when a task is added to a taskprocessor that has been allocated with a default listener that the task gets executed as expected

Definition at line 132 of file test_taskprocessor.c.

References ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), RAII_VAR, task(), task_data_create(), task_wait(), and TPS_REF_DEFAULT.

133 {
135  RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
136  int res;
137 
138  switch (cmd) {
139  case TEST_INIT:
140  info->name = "default_taskprocessor";
141  info->category = "/main/taskprocessor/";
142  info->summary = "Test of default taskprocessor";
143  info->description =
144  "Ensures that a queued task gets executed.";
145  return AST_TEST_NOT_RUN;
146  case TEST_EXECUTE:
147  break;
148  }
149 
150  tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
151 
152  if (!tps) {
153  ast_test_status_update(test, "Unable to create test taskprocessor\n");
154  return AST_TEST_FAIL;
155  }
156 
158  if (!task_data) {
159  ast_test_status_update(test, "Unable to create task_data\n");
160  return AST_TEST_FAIL;
161  }
162 
164  ast_test_status_update(test, "Failed to queue task\n");
165  return AST_TEST_FAIL;
166  }
167 
168  res = task_wait(task_data);
169  if (res != 0) {
170  ast_test_status_update(test, "Queued task did not execute!\n");
171  return AST_TEST_FAIL;
172  }
173 
174  return AST_TEST_PASS;
175 }
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
static int task(void *data)
Queued task for baseline test.
static struct task_data * task_data_create(void)
Create a task_data object.
userdata associated with baseline taskprocessor test
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
AST_TEST_DEFINE ( default_taskprocessor_load  )

Load test for taskprocessor with default listener.

This test queues a large number of tasks, each with random data associated. The test ensures that all of the tasks are run and that the tasks are executed in the same order that they were queued

Definition at line 352 of file test_taskprocessor.c.

References ast_taskprocessor_get(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), ast_tvnow(), load_task_data::cond, load_task(), load_task_data::lock, load_task_data::task_rand, load_task_data::tasks_completed, and TPS_REF_DEFAULT.

353 {
354  struct ast_taskprocessor *tps;
355  struct timeval start;
356  struct timespec ts;
357  enum ast_test_result_state res = AST_TEST_PASS;
358  int timedwait_res;
359  int i;
360  int rand_data[NUM_TASKS];
361 
362  switch (cmd) {
363  case TEST_INIT:
364  info->name = "default_taskprocessor_load";
365  info->category = "/main/taskprocessor/";
366  info->summary = "Load test of default taskprocessor";
367  info->description =
368  "Ensure that a large number of queued tasks are executed in the proper order.";
369  return AST_TEST_NOT_RUN;
370  case TEST_EXECUTE:
371  break;
372  }
373 
374  tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
375 
376  if (!tps) {
377  ast_test_status_update(test, "Unable to create test taskprocessor\n");
378  return AST_TEST_FAIL;
379  }
380 
381  start = ast_tvnow();
382 
383  ts.tv_sec = start.tv_sec + 60;
384  ts.tv_nsec = start.tv_usec * 1000;
385 
386  ast_cond_init(&load_task_results.cond, NULL);
387  ast_mutex_init(&load_task_results.lock);
388  load_task_results.tasks_completed = 0;
389 
390  for (i = 0; i < NUM_TASKS; ++i) {
391  rand_data[i] = ast_random();
392  if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
393  ast_test_status_update(test, "Failed to queue task\n");
394  res = AST_TEST_FAIL;
395  goto test_end;
396  }
397  }
398 
399  ast_mutex_lock(&load_task_results.lock);
400  while (load_task_results.tasks_completed < NUM_TASKS) {
401  timedwait_res = ast_cond_timedwait(&load_task_results.cond, &load_task_results.lock, &ts);
402  if (timedwait_res == ETIMEDOUT) {
403  break;
404  }
405  }
406  ast_mutex_unlock(&load_task_results.lock);
407 
408  if (load_task_results.tasks_completed != NUM_TASKS) {
409  ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
410  NUM_TASKS, load_task_results.tasks_completed);
411  res = AST_TEST_FAIL;
412  goto test_end;
413  }
414 
415  for (i = 0; i < NUM_TASKS; ++i) {
416  if (rand_data[i] != load_task_results.task_rand[i]) {
417  ast_test_status_update(test, "Queued tasks did not execute in order\n");
418  res = AST_TEST_FAIL;
419  goto test_end;
420  }
421  }
422 
423 test_end:
425  ast_mutex_destroy(&load_task_results.lock);
426  ast_cond_destroy(&load_task_results.cond);
427  return res;
428 }
static int load_task(void *data)
a queued task to be used in the taskprocessor load test
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
return a reference to a taskprocessor, create one if it does not exist
Definition: taskprocessor.h:76
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
int task_rand[NUM_TASKS]
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
AST_TEST_DEFINE ( taskprocessor_listener  )

Test for a taskprocessor with custom listener.

This test pushes tasks to a taskprocessor with a custom listener, executes the tasks, and destroys the taskprocessor.

The test ensures that the listener's callbacks are called when expected and that the data being passed in is accurate.

Definition at line 555 of file test_taskprocessor.c.

References ast_taskprocessor_create_with_listener(), ast_taskprocessor_execute(), ast_taskprocessor_listener_alloc(), ast_taskprocessor_push(), ast_taskprocessor_unreference(), check_stats(), listener_test_task(), and test_listener_pvt_alloc().

556 {
557  struct ast_taskprocessor *tps = NULL;
558  struct ast_taskprocessor_listener *listener = NULL;
559  struct test_listener_pvt *pvt = NULL;
560  enum ast_test_result_state res = AST_TEST_PASS;
561 
562  switch (cmd) {
563  case TEST_INIT:
564  info->name = "taskprocessor_listener";
565  info->category = "/main/taskprocessor/";
566  info->summary = "Test of taskprocessor listeners";
567  info->description =
568  "Ensures that listener callbacks are called when expected.";
569  return AST_TEST_NOT_RUN;
570  case TEST_EXECUTE:
571  break;
572  }
573 
574  pvt = test_listener_pvt_alloc();
575  if (!pvt) {
576  ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
577  return AST_TEST_FAIL;
578  }
579 
580  listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
581  if (!listener) {
582  ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
583  res = AST_TEST_FAIL;
584  goto test_exit;
585  }
586 
587  tps = ast_taskprocessor_create_with_listener("test_listener", listener);
588  if (!tps) {
589  ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
590  res = AST_TEST_FAIL;
591  goto test_exit;
592  }
593 
594  if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
595  ast_test_status_update(test, "Failed to queue task\n");
596  res = AST_TEST_FAIL;
597  goto test_exit;
598  }
599 
600  if (check_stats(test, pvt, 1, 0, 1) < 0) {
601  res = AST_TEST_FAIL;
602  goto test_exit;
603  }
604 
605  if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
606  ast_test_status_update(test, "Failed to queue task\n");
607  res = AST_TEST_FAIL;
608  goto test_exit;
609  }
610 
611  if (check_stats(test, pvt, 2, 0, 1) < 0) {
612  res = AST_TEST_FAIL;
613  goto test_exit;
614  }
615 
617 
618  if (check_stats(test, pvt, 2, 0, 1) < 0) {
619  res = AST_TEST_FAIL;
620  goto test_exit;
621  }
622 
624 
625  if (check_stats(test, pvt, 2, 1, 1) < 0) {
626  res = AST_TEST_FAIL;
627  goto test_exit;
628  }
629 
631 
632  if (!pvt->shutdown) {
633  res = AST_TEST_FAIL;
634  goto test_exit;
635  }
636 
637 test_exit:
638  ao2_cleanup(listener);
639  /* This is safe even if tps is NULL */
641  ast_free(pvt);
642  return res;
643 }
A listener for taskprocessors.
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
helper to ensure that statistics the listener is keeping are what we expect
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
static int listener_test_task(void *ignore)
Queued task for taskprocessor listener test.
static void * test_listener_pvt_alloc(void)
test taskprocessor listener's alloc callback
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
A ast_taskprocessor structure is a singleton by name.
Definition: taskprocessor.c:69
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
Private data for the test taskprocessor listener.
AST_TEST_DEFINE ( serializer_pool  )

Baseline test for a serializer pool.

This test ensures that when a task is added to a taskprocessor that has been allocated with a default listener that the task gets executed as expected

Definition at line 901 of file test_taskprocessor.c.

References ast_taskprocessor_push(), RAII_VAR, task(), task_data_create(), task_wait(), threadpool, ast_threadpool_options::version, and task_data::wait_time.

902 {
903  RAII_VAR(struct ast_threadpool *, threadpool, NULL, ast_threadpool_shutdown);
904  RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy);
905  RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
906  struct ast_threadpool_options options = {
907  .version = AST_THREADPOOL_OPTIONS_VERSION,
908  .idle_timeout = 0,
909  .auto_increment = 0,
910  .initial_size = 1,
911  .max_size = 0,
912  };
913  /* struct ast_taskprocessor *tps; */
914 
915  switch (cmd) {
916  case TEST_INIT:
917  info->name = "serializer_pool";
918  info->category = "/main/taskprocessor/";
919  info->summary = "Test using a serializer pool";
920  info->description =
921  "Ensures that a queued task gets executed.";
922  return AST_TEST_NOT_RUN;
923  case TEST_EXECUTE:
924  break;
925  }
926 
927  ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
928  ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
929  "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
930  ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
931  ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
932  ast_test_validate(test, task_data = task_data_create());
933 
934  task_data->wait_time = 4000; /* task takes 4 seconds */
935  ast_test_validate(test, !ast_taskprocessor_push(
936  ast_serializer_pool_get(serializer_pool), task, task_data));
937 
938  if (!ast_serializer_pool_destroy(serializer_pool)) {
939  ast_test_status_update(test, "Unexpected pool destruction!\n");
940  /*
941  * The pool should have timed out, so if it destruction reports success
942  * we need to fail.
943  */
944  serializer_pool = NULL;
945  return AST_TEST_FAIL;
946  }
947 
948  ast_test_validate(test, !task_wait(task_data));
949 
950  /* The first attempt should have failed. Second try should destroy successfully */
951  if (ast_serializer_pool_destroy(serializer_pool)) {
952  ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
953  /*
954  * If this fails we'll try again on return to hopefully avoid a memory leak.
955  * If it again times out a third time, well not much we can do.
956  */
957  return AST_TEST_FAIL;
958  }
959 
960  /* Test passed, so set pool to NULL to avoid "re-running" destroy */
961  serializer_pool = NULL;
962 
963  return AST_TEST_PASS;
964 }
unsigned long wait_time
static int task(void *data)
Queued task for baseline test.
static struct ast_threadpool * threadpool
Thread pool for observers.
Definition: sorcery.c:86
static struct task_data * task_data_create(void)
Create a task_data object.
userdata associated with baseline taskprocessor test
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
An opaque threadpool structure.
Definition: threadpool.c:36
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
Definition: utils.h:941
static int check_stats ( struct ast_test *  test,
const struct test_listener_pvt pvt,
int  num_pushed,
int  num_emptied,
int  num_was_empty 
)
static

helper to ensure that statistics the listener is keeping are what we expect

Parameters
testThe currently-running test
pvtThe private data for the taskprocessor listener
num_pushedThe expected current number of tasks pushed to the processor
num_emptiedThe expected current number of times the taskprocessor has become empty
num_was_emptyThe expected current number of times that tasks were pushed to an empty taskprocessor
Return values
-1Stats were not as expected
0Stats were as expected

Definition at line 523 of file test_taskprocessor.c.

Referenced by AST_TEST_DEFINE().

524 {
525  if (pvt->num_pushed != num_pushed) {
526  ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
527  num_pushed, pvt->num_pushed);
528  return -1;
529  }
530 
531  if (pvt->num_emptied != num_emptied) {
532  ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
533  num_emptied, pvt->num_emptied);
534  return -1;
535  }
536 
537  if (pvt->num_was_empty != num_was_empty) {
538  ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
539  num_was_empty, pvt->num_emptied);
540  return -1;
541  }
542 
543  return 0;
544 }
static int listener_test_task ( void *  ignore)
static

Queued task for taskprocessor listener test.

Does nothing.

Definition at line 507 of file test_taskprocessor.c.

Referenced by AST_TEST_DEFINE().

508 {
509  return 0;
510 }
static int load_task ( void *  data)
static

a queued task to be used in the taskprocessor load test

The task increments the number of tasks executed and puts the passed-in data into the next slot in the array of random data.

Definition at line 336 of file test_taskprocessor.c.

References load_task_data::cond, lock, load_task_data::lock, SCOPED_MUTEX, load_task_data::task_rand, and load_task_data::tasks_completed.

Referenced by AST_TEST_DEFINE().

337 {
338  int *randdata = data;
339  SCOPED_MUTEX(lock, &load_task_results.lock);
340  load_task_results.task_rand[load_task_results.tasks_completed++] = *randdata;
341  ast_cond_signal(&load_task_results.cond);
342  return 0;
343 }
ast_mutex_t lock
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
int task_rand[NUM_TASKS]
static int task ( void *  data)
static

Queued task for baseline test.

The task simply sets a boolean to indicate the task has been run and then signals a condition saying it's complete

Definition at line 88 of file test_taskprocessor.c.

References lock, SCOPED_MUTEX, task_data::task_complete, and task_data::wait_time.

Referenced by AST_TEST_DEFINE().

89 {
90  struct task_data *task_data = data;
91 
92  SCOPED_MUTEX(lock, &task_data->lock);
93  if (task_data->wait_time > 0) {
94  usleep(task_data->wait_time * 1000);
95  }
96  task_data->task_complete = 1;
97  ast_cond_signal(&task_data->cond);
98  return 0;
99 }
unsigned long wait_time
ast_mutex_t lock
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
Definition: lock.h:589
userdata associated with baseline taskprocessor test
static void test_task_pushed ( struct ast_taskprocessor_listener listener,
int  was_empty 
)
static

test taskprocessor listener's task_pushed callback

Adjusts private data's stats as indicated by the parameters.

Definition at line 468 of file test_taskprocessor.c.

References ast_taskprocessor_listener_get_user_data().

469 {
471  ++pvt->num_pushed;
472  if (was_empty) {
473  ++pvt->num_was_empty;
474  }
475 }
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
Private data for the test taskprocessor listener.