38 #include "asterisk/serializer.h"
39 #include "asterisk/threadpool.h"
55 static void task_data_dtor(
void *obj)
59 ast_mutex_destroy(&task_data->lock);
60 ast_cond_destroy(&task_data->cond);
67 ao2_alloc(
sizeof(*task_data), task_data_dtor);
73 ast_cond_init(&task_data->cond, NULL);
74 ast_mutex_init(&task_data->lock);
88 static int task(
void *data)
97 ast_cond_signal(&task_data->cond);
110 end.tv_sec = start.tv_sec + 30;
111 end.tv_nsec = start.tv_usec * 1000;
115 res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
117 if (res == ETIMEDOUT) {
140 info->name =
"default_taskprocessor";
141 info->category =
"/main/taskprocessor/";
142 info->summary =
"Test of default taskprocessor";
144 "Ensures that a queued task gets executed.";
145 return AST_TEST_NOT_RUN;
153 ast_test_status_update(
test,
"Unable to create test taskprocessor\n");
154 return AST_TEST_FAIL;
159 ast_test_status_update(
test,
"Unable to create task_data\n");
160 return AST_TEST_FAIL;
164 ast_test_status_update(
test,
"Failed to queue task\n");
165 return AST_TEST_FAIL;
170 ast_test_status_update(
test,
"Queued task did not execute!\n");
171 return AST_TEST_FAIL;
174 return AST_TEST_PASS;
183 #define TEST_DATA_ARRAY_SIZE 10
184 #define LOW_WATER_MARK 3
185 #define HIGH_WATER_MARK 6
190 unsigned int alert_level;
191 unsigned int subsystem_alert_level;
195 info->name =
"subsystem_alert";
196 info->category =
"/main/taskprocessor/";
197 info->summary =
"Test of subsystem alerts";
199 "Ensures alerts are generated properly.";
200 return AST_TEST_NOT_RUN;
208 ast_test_status_update(
test,
"Unable to create test taskprocessor\n");
209 return AST_TEST_FAIL;
215 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
218 ast_test_status_update(
test,
"Unable to create task_data\n");
224 ast_test_status_update(
test,
"Pushing task %d\n", i);
226 ast_test_status_update(
test,
"Failed to queue task\n");
235 if (queue_count == HIGH_WATER_MARK) {
236 if (subsystem_alert_level) {
237 ast_test_status_update(
test,
"Subsystem alert triggered correctly at %ld\n", queue_count);
240 ast_test_status_update(
test,
"Global alert triggered correctly at %ld\n", queue_count);
242 }
else if (queue_count < HIGH_WATER_MARK) {
243 if (subsystem_alert_level > 0) {
244 ast_test_status_update(
test,
"Subsystem alert triggered unexpectedly at %ld\n", queue_count);
247 if (alert_level > 0) {
248 ast_test_status_update(
test,
"Global alert triggered unexpectedly at %ld\n", queue_count);
252 if (subsystem_alert_level == 0) {
253 ast_test_status_update(
test,
"Subsystem alert failed to trigger at %ld\n", queue_count);
256 if (alert_level == 0) {
257 ast_test_status_update(
test,
"Global alert failed to trigger at %ld\n", queue_count);
265 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
266 ast_test_status_update(
test,
"Waiting on task %d\n", i);
268 ast_test_status_update(
test,
"Queued task '%d' did not execute!\n", i);
277 if (queue_count == LOW_WATER_MARK) {
278 if (!subsystem_alert_level) {
279 ast_test_status_update(
test,
"Subsystem alert cleared correctly at %ld\n", queue_count);
282 ast_test_status_update(
test,
"Global alert cleared correctly at %ld\n", queue_count);
284 }
else if (queue_count > LOW_WATER_MARK) {
285 if (subsystem_alert_level == 0) {
286 ast_test_status_update(
test,
"Subsystem alert cleared unexpectedly at %ld\n", queue_count);
289 if (alert_level == 0) {
290 ast_test_status_update(
test,
"Global alert cleared unexpectedly at %ld\n", queue_count);
294 if (subsystem_alert_level > 0) {
295 ast_test_status_update(
test,
"Subsystem alert failed to clear at %ld\n", queue_count);
298 if (alert_level > 0) {
299 ast_test_status_update(
test,
"Global alert failed to clear at %ld\n", queue_count);
307 for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
311 return res ? AST_TEST_FAIL : AST_TEST_PASS;
314 #define NUM_TASKS 20000
338 int *randdata = data;
341 ast_cond_signal(&load_task_results.
cond);
355 struct timeval start;
357 enum ast_test_result_state res = AST_TEST_PASS;
360 int rand_data[NUM_TASKS];
364 info->name =
"default_taskprocessor_load";
365 info->category =
"/main/taskprocessor/";
366 info->summary =
"Load test of default taskprocessor";
368 "Ensure that a large number of queued tasks are executed in the proper order.";
369 return AST_TEST_NOT_RUN;
377 ast_test_status_update(
test,
"Unable to create test taskprocessor\n");
378 return AST_TEST_FAIL;
383 ts.tv_sec = start.tv_sec + 60;
384 ts.tv_nsec = start.tv_usec * 1000;
386 ast_cond_init(&load_task_results.
cond, NULL);
387 ast_mutex_init(&load_task_results.
lock);
390 for (i = 0; i < NUM_TASKS; ++i) {
391 rand_data[i] = ast_random();
393 ast_test_status_update(
test,
"Failed to queue task\n");
399 ast_mutex_lock(&load_task_results.
lock);
401 timedwait_res = ast_cond_timedwait(&load_task_results.
cond, &load_task_results.
lock, &ts);
402 if (timedwait_res == ETIMEDOUT) {
406 ast_mutex_unlock(&load_task_results.
lock);
409 ast_test_status_update(
test,
"Unexpected number of tasks executed. Expected %d but got %d\n",
415 for (i = 0; i < NUM_TASKS; ++i) {
416 if (rand_data[i] != load_task_results.
task_rand[i]) {
417 ast_test_status_update(
test,
"Queued tasks did not execute in order\n");
425 ast_mutex_destroy(&load_task_results.
lock);
426 ast_cond_destroy(&load_task_results.
cond);
473 ++pvt->num_was_empty;
525 if (pvt->num_pushed != num_pushed) {
526 ast_test_status_update(test,
"Unexpected number of tasks pushed. Expected %d but got %d\n",
527 num_pushed, pvt->num_pushed);
531 if (pvt->num_emptied != num_emptied) {
532 ast_test_status_update(test,
"Unexpected number of empties. Expected %d but got %d\n",
533 num_emptied, pvt->num_emptied);
537 if (pvt->num_was_empty != num_was_empty) {
538 ast_test_status_update(test,
"Unexpected number of empties. Expected %d but got %d\n",
539 num_was_empty, pvt->num_emptied);
560 enum ast_test_result_state res = AST_TEST_PASS;
564 info->name =
"taskprocessor_listener";
565 info->category =
"/main/taskprocessor/";
566 info->summary =
"Test of taskprocessor listeners";
568 "Ensures that listener callbacks are called when expected.";
569 return AST_TEST_NOT_RUN;
576 ast_test_status_update(
test,
"Unable to allocate test taskprocessor listener user data\n");
577 return AST_TEST_FAIL;
582 ast_test_status_update(
test,
"Unable to allocate test taskprocessor listener\n");
589 ast_test_status_update(
test,
"Unable to allocate test taskprocessor\n");
595 ast_test_status_update(
test,
"Failed to queue task\n");
606 ast_test_status_update(
test,
"Failed to queue task\n");
632 if (!pvt->shutdown) {
638 ao2_cleanup(listener);
651 int task_stop_waiting;
654 static void shutdown_data_dtor(
void *data)
657 ast_mutex_destroy(&shutdown_data->lock);
658 ast_cond_destroy(&shutdown_data->in);
659 ast_cond_destroy(&shutdown_data->out);
662 static struct shutdown_data *shutdown_data_create(
int dont_wait)
679 static int shutdown_task_exec(
void *data)
683 shutdown_data->task_started = 1;
684 ast_cond_signal(&shutdown_data->out);
685 while (!shutdown_data->task_stop_waiting) {
686 ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
688 shutdown_data->task_complete = 1;
689 ast_cond_signal(&shutdown_data->out);
693 static int shutdown_waitfor_completion(
struct shutdown_data *shutdown_data)
696 struct timespec end = {
697 .tv_sec = start.tv_sec + 5,
698 .tv_nsec = start.tv_usec * 1000
702 while (!shutdown_data->task_complete) {
703 if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
708 return shutdown_data->task_complete;
711 static int shutdown_has_completed(
struct shutdown_data *shutdown_data)
714 return shutdown_data->task_complete;
717 static int shutdown_waitfor_start(
struct shutdown_data *shutdown_data)
720 struct timespec end = {
721 .tv_sec = start.tv_sec + 5,
722 .tv_nsec = start.tv_usec * 1000
726 while (!shutdown_data->task_started) {
727 if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
732 return shutdown_data->task_started;
735 static void shutdown_poke(
struct shutdown_data *shutdown_data)
738 shutdown_data->task_stop_waiting = 1;
739 ast_cond_signal(&shutdown_data->in);
742 static void *tps_shutdown_thread(
void *data)
752 RAII_VAR(
struct shutdown_data *, task1, NULL, ao2_cleanup);
753 RAII_VAR(
struct shutdown_data *, task2, NULL, ao2_cleanup);
757 pthread_t shutdown_thread;
761 info->name =
"taskprocessor_shutdown";
762 info->category =
"/main/taskprocessor/";
763 info->summary =
"Test of taskprocessor shutdown sequence";
765 "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
766 return AST_TEST_NOT_RUN;
772 task1 = shutdown_data_create(0);
773 task2 = shutdown_data_create(1);
775 if (!tps || !task1 || !task2) {
776 ast_test_status_update(
test,
"Allocation error\n");
777 return AST_TEST_FAIL;
782 ast_test_status_update(
test,
"Could not push task1\n");
783 return AST_TEST_FAIL;
788 ast_test_status_update(
test,
"Could not push task2\n");
789 return AST_TEST_FAIL;
792 wait_res = shutdown_waitfor_start(task1);
794 ast_test_status_update(
test,
"Task1 didn't start\n");
795 return AST_TEST_FAIL;
798 pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
799 if (pthread_res != 0) {
800 ast_test_status_update(
test,
"Failed to create shutdown thread\n");
801 return AST_TEST_FAIL;
806 shutdown_poke(task1);
807 wait_res = shutdown_waitfor_completion(task1);
809 ast_test_status_update(
test,
"Task1 didn't complete\n");
810 return AST_TEST_FAIL;
814 pthread_join(shutdown_thread, NULL);
817 wait_res = shutdown_has_completed(task2);
819 ast_test_status_update(
test,
"Task2 didn't finish\n");
820 return AST_TEST_FAIL;
823 return AST_TEST_PASS;
847 info->name = __func__;
848 info->category =
"/main/taskprocessor/";
849 info->summary =
"Test of pushing local data";
851 "Ensures that local data is passed along.";
852 return AST_TEST_NOT_RUN;
860 ast_test_status_update(
test,
"Unable to create test taskprocessor\n");
861 return AST_TEST_FAIL;
867 ast_test_status_update(
test,
"Unable to create task_data\n");
868 return AST_TEST_FAIL;
875 ast_test_status_update(
test,
"Failed to queue task\n");
876 return AST_TEST_FAIL;
881 ast_test_status_update(
test,
"Queued task did not execute!\n");
882 return AST_TEST_FAIL;
885 if (local_data != 1) {
886 ast_test_status_update(
test,
887 "Queued task did not set local_data!\n");
888 return AST_TEST_FAIL;
891 return AST_TEST_PASS;
907 .
version = AST_THREADPOOL_OPTIONS_VERSION,
917 info->name =
"serializer_pool";
918 info->category =
"/main/taskprocessor/";
919 info->summary =
"Test using a serializer pool";
921 "Ensures that a queued task gets executed.";
922 return AST_TEST_NOT_RUN;
927 ast_test_validate(
test,
threadpool = ast_threadpool_create(
"test", NULL, &options));
928 ast_test_validate(
test, serializer_pool = ast_serializer_pool_create(
930 ast_test_validate(
test, !strcmp(ast_serializer_pool_name(serializer_pool),
"test/test"));
931 ast_test_validate(
test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
938 if (!ast_serializer_pool_destroy(serializer_pool)) {
939 ast_test_status_update(
test,
"Unexpected pool destruction!\n");
944 serializer_pool = NULL;
945 return AST_TEST_FAIL;
951 if (ast_serializer_pool_destroy(serializer_pool)) {
952 ast_test_status_update(
test,
"Unable to destroy serializer pool in allotted time!\n");
957 return AST_TEST_FAIL;
961 serializer_pool = NULL;
963 return AST_TEST_PASS;
966 static int unload_module(
void)
968 ast_test_unregister(default_taskprocessor);
969 ast_test_unregister(default_taskprocessor_load);
971 ast_test_unregister(taskprocessor_listener);
972 ast_test_unregister(taskprocessor_shutdown);
973 ast_test_unregister(taskprocessor_push_local);
974 ast_test_unregister(serializer_pool);
978 static int load_module(
void)
980 ast_test_register(default_taskprocessor);
981 ast_test_register(default_taskprocessor_load);
983 ast_test_register(taskprocessor_listener);
984 ast_test_register(taskprocessor_shutdown);
985 ast_test_register(taskprocessor_push_local);
986 ast_test_register(serializer_pool);
A listener for taskprocessors.
static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
Helper to ensure that the statistics the listener is keeping are what we expect.
Asterisk main include file. File version handling, generic pbx functions.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
static int load_task(void *data)
a queued task to be used in the taskprocessor load test
static void test_emptied(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's emptied callback.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
static void test_shutdown(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's shutdown callback.
Return a reference to a taskprocessor, creating one if it does not exist.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
Set the high and low alert water marks of the given taskprocessor queue.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
Relevant data associated with taskprocessor load test.
static int task(void *data)
Queued task for baseline test.
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is suspended.
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
Indicate the taskprocessor is unsuspended.
static struct ast_threadpool * threadpool
Thread pool for observers.
static struct task_data * task_data_create(void)
Create a task_data object.
static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
test taskprocessor listener's task_pushed callback
static int listener_test_task(void *ignore)
Queued task for taskprocessor listener test.
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int(*task_exe)(struct ast_taskprocessor_local *local), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
Get the current taskprocessor high water alert count by subsystem.
User data associated with the baseline taskprocessor test.
static int task_wait(struct task_data *task_data)
Wait for a task to execute.
#define ast_calloc(num, len)
A wrapper for calloc()
static void * test_listener_pvt_alloc(void)
test taskprocessor listener's alloc callback
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
An API for managing task processing threads that can be shared across modules.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
Sets the local data associated with a taskprocessor.
An ast_taskprocessor structure is a singleton by name.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
An opaque threadpool structure.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
unsigned int ast_taskprocessor_alert_get(void)
Get the current taskprocessor high water alert count.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
Private data for the test taskprocessor listener.
#define ASTERISK_GPL_KEY
The text the key() function should return.
Asterisk module definitions.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Structure for mutex and tracking information.
static int test_start(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's start callback
AST_TEST_DEFINE(default_taskprocessor)
Baseline test for default taskprocessor.