40 #include "asterisk/threadpool.h"
59 ast_mutex_init(&tld->lock);
60 ast_cond_init(&tld->cond, NULL);
71 tld->num_active = active_threads;
72 tld->num_idle = idle_threads;
73 ast_log(LOG_NOTICE,
"Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
74 ast_cond_signal(&tld->cond);
85 tld->was_empty = was_empty;
86 ast_cond_signal(&tld->cond);
94 tld->empty_notice = 1;
95 ast_cond_signal(&tld->cond);
101 ast_cond_destroy(&tld->cond);
102 ast_mutex_destroy(&tld->lock);
125 ast_mutex_init(&std->lock);
126 ast_cond_init(&std->cond, NULL);
136 ast_mutex_destroy(&std->lock);
137 ast_cond_destroy(&std->cond);
142 static int simple_task(
void *data)
146 std->task_executed = 1;
147 ast_cond_signal(&std->cond);
151 static enum ast_test_result_state wait_until_thread_state(
struct ast_test *
test,
struct test_listener_data *tld,
int num_active,
int num_idle)
154 struct timespec end = {
155 .tv_sec = start.tv_sec + 5,
156 .tv_nsec = start.tv_usec * 1000
158 enum ast_test_result_state res = AST_TEST_PASS;
161 while (!(tld->num_active == num_active && tld->num_idle == num_idle)) {
162 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
167 if (tld->num_active != num_active && tld->num_idle != num_idle) {
168 ast_test_status_update(test,
"Number of active threads and idle threads not what was expected.\n");
169 ast_test_status_update(test,
"Expected %d active threads but got %d\n", num_active, tld->num_active);
170 ast_test_status_update(test,
"Expected %d idle threads but got %d\n", num_idle, tld->num_idle);
181 struct timespec end = {
182 .tv_sec = start.tv_sec + 5,
183 .tv_nsec = start.tv_usec * 1000
187 while (!tld->task_pushed) {
188 if (ast_cond_timedwait(&tld->cond,
lock, &end) == ETIMEDOUT) {
194 static enum ast_test_result_state wait_for_completion(
struct ast_test *test,
struct simple_task_data *std)
197 struct timespec end = {
198 .tv_sec = start.tv_sec + 5,
199 .tv_nsec = start.tv_usec * 1000
201 enum ast_test_result_state res = AST_TEST_PASS;
204 while (!std->task_executed) {
205 if (ast_cond_timedwait(&std->cond,
lock, &end) == ETIMEDOUT) {
210 if (!std->task_executed) {
211 ast_test_status_update(test,
"Task execution did not occur\n");
217 static enum ast_test_result_state wait_for_empty_notice(
struct ast_test *test,
struct test_listener_data *tld)
220 struct timespec end = {
221 .tv_sec = start.tv_sec + 5,
222 .tv_nsec = start.tv_usec * 1000
224 enum ast_test_result_state res = AST_TEST_PASS;
227 while (!tld->empty_notice) {
228 if (ast_cond_timedwait(&tld->cond,
lock, &end) == ETIMEDOUT) {
233 if (!tld->empty_notice) {
234 ast_test_status_update(test,
"Test listener not notified that threadpool is empty\n");
241 static enum ast_test_result_state listener_check(
242 struct ast_test *test,
252 enum ast_test_result_state res = AST_TEST_PASS;
254 if (tld->task_pushed != task_pushed) {
255 ast_test_status_update(test,
"Expected task %sto be pushed, but it was%s\n",
256 task_pushed ?
"" :
"not ", tld->task_pushed ?
"" :
" not");
259 if (tld->was_empty != was_empty) {
260 ast_test_status_update(test,
"Expected %sto be empty, but it was%s\n",
261 was_empty ?
"" :
"not ", tld->was_empty ?
"" :
" not");
264 if (tld->num_tasks!= num_tasks) {
265 ast_test_status_update(test,
"Expected %d tasks to be pushed, but got %d\n",
266 num_tasks, tld->num_tasks);
269 if (tld->num_active != num_active) {
270 ast_test_status_update(test,
"Expected %d active threads, but got %d\n",
271 num_active, tld->num_active);
274 if (tld->num_idle != num_idle) {
275 ast_test_status_update(test,
"Expected %d idle threads, but got %d\n",
276 num_idle, tld->num_idle);
279 if (tld->empty_notice != empty_notice) {
280 ast_test_status_update(test,
"Expected %s empty notice, but got %s\n",
281 was_empty ?
"an" :
"no", tld->task_pushed ?
"one" :
"none");
294 enum ast_test_result_state res = AST_TEST_FAIL;
296 .
version = AST_THREADPOOL_OPTIONS_VERSION,
306 info->category =
"/main/threadpool/";
307 info->summary =
"Test task";
309 "Basic threadpool test";
310 return AST_TEST_NOT_RUN;
316 return AST_TEST_FAIL;
319 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
324 pool = ast_threadpool_create(info->name, listener, &options);
329 std = simple_task_data_alloc();
334 if (ast_threadpool_push(pool, simple_task, std)) {
338 wait_for_task_pushed(listener);
340 res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
343 ast_threadpool_shutdown(pool);
344 ao2_cleanup(listener);
345 simple_task_data_free(std);
354 enum ast_test_result_state res = AST_TEST_FAIL;
357 .
version = AST_THREADPOOL_OPTIONS_VERSION,
366 info->name =
"initial_threads";
367 info->category =
"/main/threadpool/";
368 info->summary =
"Test threadpool initialization state";
370 "Ensure that a threadpool created with a specific size contains the\n"
371 "proper number of idle threads.";
372 return AST_TEST_NOT_RUN;
379 return AST_TEST_FAIL;
382 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
387 pool = ast_threadpool_create(info->name, listener, &options);
392 res = wait_until_thread_state(test, tld, 0, 3);
395 ast_threadpool_shutdown(pool);
396 ao2_cleanup(listener);
406 enum ast_test_result_state res = AST_TEST_FAIL;
409 .
version = AST_THREADPOOL_OPTIONS_VERSION,
418 info->name =
"thread_creation";
419 info->category =
"/main/threadpool/";
420 info->summary =
"Test threadpool thread creation";
422 "Ensure that threads can be added to a threadpool";
423 return AST_TEST_NOT_RUN;
430 return AST_TEST_FAIL;
433 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
438 pool = ast_threadpool_create(info->name, listener, &options);
446 ast_threadpool_set_size(pool, 1);
448 res = wait_until_thread_state(test, tld, 0, 1);
451 ast_threadpool_shutdown(pool);
452 ao2_cleanup(listener);
461 enum ast_test_result_state res = AST_TEST_FAIL;
464 .
version = AST_THREADPOOL_OPTIONS_VERSION,
473 info->name =
"thread_destruction";
474 info->category =
"/main/threadpool/";
475 info->summary =
"Test threadpool thread destruction";
477 "Ensure that threads are properly destroyed in a threadpool";
478 return AST_TEST_NOT_RUN;
485 return AST_TEST_FAIL;
488 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
493 pool = ast_threadpool_create(info->name, listener, &options);
498 ast_threadpool_set_size(pool, 3);
500 res = wait_until_thread_state(test, tld, 0, 3);
501 if (res == AST_TEST_FAIL) {
505 res = listener_check(test, listener, 0, 0, 0, 0, 3, 0);
506 if (res == AST_TEST_FAIL) {
510 ast_threadpool_set_size(pool, 2);
512 res = wait_until_thread_state(test, tld, 0, 2);
515 ast_threadpool_shutdown(pool);
516 ao2_cleanup(listener);
525 enum ast_test_result_state res = AST_TEST_FAIL;
528 .
version = AST_THREADPOOL_OPTIONS_VERSION,
537 info->name =
"thread_timeout";
538 info->category =
"/main/threadpool/";
539 info->summary =
"Test threadpool thread timeout";
541 "Ensure that a thread with a two second timeout dies as expected.";
542 return AST_TEST_NOT_RUN;
549 return AST_TEST_FAIL;
552 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
557 pool = ast_threadpool_create(info->name, listener, &options);
562 ast_threadpool_set_size(pool, 1);
564 res = wait_until_thread_state(test, tld, 0, 1);
565 if (res == AST_TEST_FAIL) {
569 res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
570 if (res == AST_TEST_FAIL) {
574 res = wait_until_thread_state(test, tld, 0, 0);
575 if (res == AST_TEST_FAIL) {
579 res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
582 ast_threadpool_shutdown(pool);
583 ao2_cleanup(listener);
592 enum ast_test_result_state res = AST_TEST_FAIL;
595 .
version = AST_THREADPOOL_OPTIONS_VERSION,
605 info->name =
"thread_timeout_thrash";
606 info->category =
"/main/threadpool/";
607 info->summary =
"Thrash threadpool thread timeout";
609 "Repeatedly queue a task when a threadpool thread should timeout.";
610 return AST_TEST_NOT_RUN;
617 return AST_TEST_FAIL;
620 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
625 pool = ast_threadpool_create(info->name, listener, &options);
630 ast_threadpool_set_size(pool, 1);
632 for (iteration = 0; iteration < 30; ++iteration) {
635 struct timespec end = {
637 .tv_nsec = start.tv_usec * 1000
640 std = simple_task_data_alloc();
646 ast_mutex_lock(&tld->lock);
647 while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
650 ast_mutex_unlock(&tld->lock);
652 if (ast_threadpool_push(pool, simple_task, std)) {
655 res = wait_for_completion(test, std);
658 simple_task_data_free(std);
660 if (res == AST_TEST_FAIL) {
665 res = wait_until_thread_state(test, tld, 0, 0);
666 if (res == AST_TEST_FAIL) {
670 res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
673 ast_threadpool_shutdown(pool);
674 ao2_cleanup(listener);
684 enum ast_test_result_state res = AST_TEST_FAIL;
687 .
version = AST_THREADPOOL_OPTIONS_VERSION,
696 info->name =
"one_task_one_thread";
697 info->category =
"/main/threadpool/";
698 info->summary =
"Test a single task with a single thread";
700 "Push a task into an empty threadpool, then add a thread to the pool.";
701 return AST_TEST_NOT_RUN;
708 return AST_TEST_FAIL;
711 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
716 pool = ast_threadpool_create(info->name, listener, &options);
721 std = simple_task_data_alloc();
726 if (ast_threadpool_push(pool, simple_task, std)) {
730 ast_threadpool_set_size(pool, 1);
736 res = wait_for_completion(test, std);
737 if (res == AST_TEST_FAIL) {
741 res = wait_for_empty_notice(test, tld);
742 if (res == AST_TEST_FAIL) {
747 res = wait_until_thread_state(test, tld, 0, 1);
748 if (res == AST_TEST_FAIL) {
752 res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
755 ast_threadpool_shutdown(pool);
756 ao2_cleanup(listener);
757 simple_task_data_free(std);
768 enum ast_test_result_state res = AST_TEST_FAIL;
771 .
version = AST_THREADPOOL_OPTIONS_VERSION,
780 info->name =
"one_thread_one_task";
781 info->category =
"/main/threadpool/";
782 info->summary =
"Test a single thread with a single task";
784 "Add a thread to the pool and then push a task to it.";
785 return AST_TEST_NOT_RUN;
792 return AST_TEST_FAIL;
795 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
800 pool = ast_threadpool_create(info->name, listener, &options);
805 std = simple_task_data_alloc();
810 ast_threadpool_set_size(pool, 1);
812 res = wait_until_thread_state(test, tld, 0, 1);
813 if (res == AST_TEST_FAIL) {
817 if (ast_threadpool_push(pool, simple_task, std)) {
822 res = wait_for_completion(test, std);
823 if (res == AST_TEST_FAIL) {
827 res = wait_for_empty_notice(test, tld);
828 if (res == AST_TEST_FAIL) {
833 res = wait_until_thread_state(test, tld, 0, 1);
834 if (res == AST_TEST_FAIL) {
838 res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
841 ast_threadpool_shutdown(pool);
842 ao2_cleanup(listener);
843 simple_task_data_free(std);
855 enum ast_test_result_state res = AST_TEST_FAIL;
858 .
version = AST_THREADPOOL_OPTIONS_VERSION,
867 info->name =
"one_thread_multiple_tasks";
868 info->category =
"/main/threadpool/";
869 info->summary =
"Test a single thread with multiple tasks";
871 "Add a thread to the pool and then push three tasks to it.";
872 return AST_TEST_NOT_RUN;
879 return AST_TEST_FAIL;
882 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
887 pool = ast_threadpool_create(info->name, listener, &options);
892 std1 = simple_task_data_alloc();
893 std2 = simple_task_data_alloc();
894 std3 = simple_task_data_alloc();
895 if (!std1 || !std2 || !std3) {
899 ast_threadpool_set_size(pool, 1);
901 res = wait_until_thread_state(test, tld, 0, 1);
902 if (res == AST_TEST_FAIL) {
907 if (ast_threadpool_push(pool, simple_task, std1)) {
911 if (ast_threadpool_push(pool, simple_task, std2)) {
915 if (ast_threadpool_push(pool, simple_task, std3)) {
919 res = wait_for_completion(test, std1);
920 if (res == AST_TEST_FAIL) {
923 res = wait_for_completion(test, std2);
924 if (res == AST_TEST_FAIL) {
927 res = wait_for_completion(test, std3);
928 if (res == AST_TEST_FAIL) {
932 res = wait_for_empty_notice(test, tld);
933 if (res == AST_TEST_FAIL) {
937 res = wait_until_thread_state(test, tld, 0, 1);
938 if (res == AST_TEST_FAIL) {
942 res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
945 ast_threadpool_shutdown(pool);
946 ao2_cleanup(listener);
947 simple_task_data_free(std1);
948 simple_task_data_free(std2);
949 simple_task_data_free(std3);
954 static enum ast_test_result_state wait_until_thread_state_task_pushed(
struct ast_test *test,
957 enum ast_test_result_state res = AST_TEST_PASS;
958 struct timeval start;
961 res = wait_until_thread_state(test, tld, num_active, num_idle);
962 if (res == AST_TEST_FAIL) {
967 end.tv_sec = start.tv_sec + 5;
968 end.tv_nsec = start.tv_usec * 1000;
970 ast_mutex_lock(&tld->lock);
972 while (tld->num_tasks != num_tasks) {
973 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
978 if (tld->num_tasks != num_tasks) {
979 ast_test_status_update(test,
"Number of tasks pushed %d does not match expected %d\n",
980 tld->num_tasks, num_tasks);
984 ast_mutex_unlock(&tld->lock);
997 enum ast_test_result_state res = AST_TEST_FAIL;
1000 .
version = AST_THREADPOOL_OPTIONS_VERSION,
1002 .auto_increment = 3,
1009 info->name =
"auto_increment";
1010 info->category =
"/main/threadpool/";
1011 info->summary =
"Test that the threadpool grows as tasks are added";
1013 "Create an empty threadpool and push a task to it. Once the task is\n"
1014 "pushed, the threadpool should add three threads and be able to\n"
1015 "handle the task. The threads should then go idle";
1016 return AST_TEST_NOT_RUN;
1023 return AST_TEST_FAIL;
1026 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1031 pool = ast_threadpool_create(info->name, listener, &options);
1036 std1 = simple_task_data_alloc();
1037 std2 = simple_task_data_alloc();
1038 std3 = simple_task_data_alloc();
1039 std4 = simple_task_data_alloc();
1040 if (!std1 || !std2 || !std3 || !std4) {
1044 if (ast_threadpool_push(pool, simple_task, std1)) {
1051 res = wait_for_completion(test, std1);
1052 if (res == AST_TEST_FAIL) {
1056 res = wait_for_empty_notice(test, tld);
1057 if (res == AST_TEST_FAIL) {
1061 res = wait_until_thread_state(test, tld, 0, 3);
1062 if (res == AST_TEST_FAIL) {
1069 res = AST_TEST_FAIL;
1071 if (ast_threadpool_push(pool, simple_task, std2)) {
1075 if (ast_threadpool_push(pool, simple_task, std3)) {
1079 if (ast_threadpool_push(pool, simple_task, std4)) {
1083 res = wait_for_completion(test, std2);
1084 if (res == AST_TEST_FAIL) {
1087 res = wait_for_completion(test, std3);
1088 if (res == AST_TEST_FAIL) {
1091 res = wait_for_completion(test, std4);
1092 if (res == AST_TEST_FAIL) {
1096 res = wait_for_empty_notice(test, tld);
1097 if (res == AST_TEST_FAIL) {
1101 res = wait_until_thread_state_task_pushed(test, tld, 0, 3, 4);
1102 if (res == AST_TEST_FAIL) {
1107 ast_threadpool_shutdown(pool);
1108 ao2_cleanup(listener);
1109 simple_task_data_free(std1);
1110 simple_task_data_free(std2);
1111 simple_task_data_free(std3);
1112 simple_task_data_free(std4);
1122 enum ast_test_result_state res = AST_TEST_FAIL;
1125 .
version = AST_THREADPOOL_OPTIONS_VERSION,
1127 .auto_increment = 3,
1134 info->name =
"max_size";
1135 info->category =
"/main/threadpool/";
1136 info->summary =
"Test that the threadpool does not exceed its maximum size restriction";
1138 "Create an empty threadpool and push a task to it. Once the task is\n"
1139 "pushed, the threadpool should attempt to grow by three threads, but the\n"
1140 "pool's restrictions should only allow two threads to be added.";
1141 return AST_TEST_NOT_RUN;
1148 return AST_TEST_FAIL;
1151 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1156 pool = ast_threadpool_create(info->name, listener, &options);
1161 std = simple_task_data_alloc();
1166 if (ast_threadpool_push(pool, simple_task, std)) {
1170 res = wait_for_completion(test, std);
1171 if (res == AST_TEST_FAIL) {
1175 res = wait_until_thread_state(test, tld, 0, 2);
1176 if (res == AST_TEST_FAIL) {
1180 res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
1182 ast_threadpool_shutdown(pool);
1183 ao2_cleanup(listener);
1184 simple_task_data_free(std);
1195 enum ast_test_result_state res = AST_TEST_FAIL;
1198 .
version = AST_THREADPOOL_OPTIONS_VERSION,
1200 .auto_increment = 0,
1207 info->name =
"reactivation";
1208 info->category =
"/main/threadpool/";
1209 info->summary =
"Test that a threadpool reactivates when work is added";
1211 "Push a task into a threadpool. Make sure the task executes and the\n"
1212 "thread goes idle. Then push a second task and ensure that the thread\n"
1213 "awakens and executes the second task.";
1214 return AST_TEST_NOT_RUN;
1221 return AST_TEST_FAIL;
1224 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1229 pool = ast_threadpool_create(info->name, listener, &options);
1234 std1 = simple_task_data_alloc();
1235 std2 = simple_task_data_alloc();
1236 if (!std1 || !std2) {
1240 if (ast_threadpool_push(pool, simple_task, std1)) {
1244 ast_threadpool_set_size(pool, 1);
1246 res = wait_for_completion(test, std1);
1247 if (res == AST_TEST_FAIL) {
1251 res = wait_for_empty_notice(test, tld);
1252 if (res == AST_TEST_FAIL) {
1256 res = wait_until_thread_state(test, tld, 0, 1);
1257 if (res == AST_TEST_FAIL) {
1261 res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
1262 if (res == AST_TEST_FAIL) {
1267 if (ast_threadpool_push(pool, simple_task, std2)) {
1268 res = AST_TEST_FAIL;
1272 res = wait_for_completion(test, std2);
1273 if (res == AST_TEST_FAIL) {
1277 res = wait_for_empty_notice(test, tld);
1278 if (res == AST_TEST_FAIL) {
1282 res = wait_until_thread_state(test, tld, 0, 1);
1283 if (res == AST_TEST_FAIL) {
1287 res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
1290 ast_threadpool_shutdown(pool);
1291 ao2_cleanup(listener);
1292 simple_task_data_free(std1);
1293 simple_task_data_free(std2);
1304 ast_cond_t stall_cond;
1305 ast_cond_t notify_cond;
1315 ast_mutex_init(&ctd->lock);
1316 ast_cond_init(&ctd->stall_cond, NULL);
1317 ast_cond_init(&ctd->notify_cond, NULL);
1327 ast_mutex_destroy(&ctd->lock);
1328 ast_cond_destroy(&ctd->stall_cond);
1329 ast_cond_destroy(&ctd->notify_cond);
1334 static int complex_task(
void *data)
1339 ctd->task_started = 1;
1340 ast_cond_signal(&ctd->notify_cond);
1341 while (!ctd->continue_task) {
1342 ast_cond_wait(&ctd->stall_cond,
lock);
1345 ctd->task_executed = 1;
1346 ast_cond_signal(&ctd->notify_cond);
1353 ctd->continue_task = 1;
1354 ast_cond_signal(&ctd->stall_cond);
1360 struct timespec end = {
1361 .tv_sec = start.tv_sec + 5,
1362 .tv_nsec = start.tv_usec * 1000
1366 while (!ctd->task_started) {
1367 if (ast_cond_timedwait(&ctd->notify_cond,
lock, &end) == ETIMEDOUT) {
1372 return ctd->task_started;
1378 struct timespec end = {
1379 .tv_sec = start.tv_sec + 1,
1380 .tv_nsec = start.tv_usec * 1000
1384 while (!ctd->task_started) {
1385 if (ast_cond_timedwait(&ctd->notify_cond,
lock, &end) == ETIMEDOUT) {
1390 return ctd->task_started;
1393 static enum ast_test_result_state wait_for_complex_completion(
struct complex_task_data *ctd)
1396 struct timespec end = {
1397 .tv_sec = start.tv_sec + 5,
1398 .tv_nsec = start.tv_usec * 1000
1400 enum ast_test_result_state res = AST_TEST_PASS;
1403 while (!ctd->task_executed) {
1404 if (ast_cond_timedwait(&ctd->notify_cond,
lock, &end) == ETIMEDOUT) {
1409 if (!ctd->task_executed) {
1410 res = AST_TEST_FAIL;
1421 enum ast_test_result_state res = AST_TEST_FAIL;
1424 .
version = AST_THREADPOOL_OPTIONS_VERSION,
1426 .auto_increment = 0,
1433 info->name =
"task_distribution";
1434 info->category =
"/main/threadpool/";
1435 info->summary =
"Test that tasks are evenly distributed to threads";
1437 "Push two tasks into a threadpool. Ensure that each is handled by\n"
1438 "a separate thread";
1439 return AST_TEST_NOT_RUN;
1446 return AST_TEST_FAIL;
1449 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1454 pool = ast_threadpool_create(info->name, listener, &options);
1459 ctd1 = complex_task_data_alloc();
1460 ctd2 = complex_task_data_alloc();
1461 if (!ctd1 || !ctd2) {
1465 if (ast_threadpool_push(pool, complex_task, ctd1)) {
1469 if (ast_threadpool_push(pool, complex_task, ctd2)) {
1473 ast_threadpool_set_size(pool, 2);
1475 res = wait_until_thread_state(test, tld, 2, 0);
1476 if (res == AST_TEST_FAIL) {
1480 res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
1481 if (res == AST_TEST_FAIL) {
1489 res = wait_for_complex_completion(ctd1);
1490 if (res == AST_TEST_FAIL) {
1493 res = wait_for_complex_completion(ctd2);
1494 if (res == AST_TEST_FAIL) {
1498 res = wait_until_thread_state(test, tld, 0, 2);
1499 if (res == AST_TEST_FAIL) {
1503 res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
1506 ast_threadpool_shutdown(pool);
1507 ao2_cleanup(listener);
1508 complex_task_data_free(ctd1);
1509 complex_task_data_free(ctd2);
1520 enum ast_test_result_state res = AST_TEST_FAIL;
1523 .
version = AST_THREADPOOL_OPTIONS_VERSION,
1525 .auto_increment = 0,
1532 info->name =
"more_destruction";
1533 info->category =
"/main/threadpool/";
1534 info->summary =
"Test that threads are destroyed as expected";
1536 "Push two tasks into a threadpool. Set the threadpool size to 4\n"
1537 "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
1538 "threadpool down to 1 thread. Ensure that the thread leftover is active\n"
1539 "and ensure that both tasks complete.";
1540 return AST_TEST_NOT_RUN;
1547 return AST_TEST_FAIL;
1550 listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1555 pool = ast_threadpool_create(info->name, listener, &options);
1560 ctd1 = complex_task_data_alloc();
1561 ctd2 = complex_task_data_alloc();
1562 if (!ctd1 || !ctd2) {
1566 if (ast_threadpool_push(pool, complex_task, ctd1)) {
1570 if (ast_threadpool_push(pool, complex_task, ctd2)) {
1574 ast_threadpool_set_size(pool, 4);
1576 res = wait_until_thread_state(test, tld, 2, 2);
1577 if (res == AST_TEST_FAIL) {
1581 res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
1582 if (res == AST_TEST_FAIL) {
1586 ast_threadpool_set_size(pool, 1);
1591 res = wait_until_thread_state(test, tld, 1, 0);
1592 if (res == AST_TEST_FAIL) {
1596 res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
1597 if (res == AST_TEST_FAIL) {
1605 res = wait_for_complex_completion(ctd1);
1606 if (res == AST_TEST_FAIL) {
1609 res = wait_for_complex_completion(ctd2);
1610 if (res == AST_TEST_FAIL) {
1614 res = wait_until_thread_state(test, tld, 0, 1);
1615 if (res == AST_TEST_FAIL) {
1619 res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
1622 ast_threadpool_shutdown(pool);
1623 ao2_cleanup(listener);
1624 complex_task_data_free(ctd1);
1625 complex_task_data_free(ctd2);
1634 enum ast_test_result_state res = AST_TEST_FAIL;
1641 .
version = AST_THREADPOOL_OPTIONS_VERSION,
1643 .auto_increment = 0,
1650 info->name =
"threadpool_serializer";
1651 info->category =
"/main/threadpool/";
1652 info->summary =
"Test that serializers";
1654 "Ensures that tasks enqueued to a serialize execute in sequence.";
1655 return AST_TEST_NOT_RUN;
1660 pool = ast_threadpool_create(
"threadpool_serializer", NULL, &options);
1662 ast_test_status_update(test,
"Could not create threadpool\n");
1665 uut = ast_threadpool_serializer(
"ser1", pool);
1666 data1 = complex_task_data_alloc();
1667 data2 = complex_task_data_alloc();
1668 data3 = complex_task_data_alloc();
1669 if (!uut || !data1 || !data2 || !data3) {
1670 ast_test_status_update(test,
"Allocation failed\n");
1676 ast_test_status_update(test,
"Failed to enqueue data1\n");
1679 started = wait_for_complex_start(data1);
1681 ast_test_status_update(test,
"Failed to start data1\n");
1687 ast_test_status_update(test,
"Failed to enqueue data2\n");
1690 started = has_complex_started(data2);
1692 ast_test_status_update(test,
"data2 started out of order\n");
1697 if (ast_threadpool_push(pool, complex_task, data3)) {
1698 ast_test_status_update(test,
"Failed to enqueue data3\n");
1700 started = wait_for_complex_start(data3);
1702 ast_test_status_update(test,
"Failed to start data3\n");
1708 finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
1710 ast_test_status_update(test,
"data1 couldn't finish\n");
1713 started = wait_for_complex_start(data2);
1715 ast_test_status_update(test,
"Failed to start data2\n");
1721 finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
1723 ast_test_status_update(test,
"data2 couldn't finish\n");
1727 finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
1729 ast_test_status_update(test,
"data3 couldn't finish\n");
1733 res = AST_TEST_PASS;
1740 ast_threadpool_shutdown(pool);
1741 complex_task_data_free(data1);
1742 complex_task_data_free(data2);
1743 complex_task_data_free(data3);
1749 enum ast_test_result_state res = AST_TEST_FAIL;
1754 .
version = AST_THREADPOOL_OPTIONS_VERSION,
1756 .auto_increment = 0,
1763 info->name =
"threadpool_serializer_dupe";
1764 info->category =
"/main/threadpool/";
1765 info->summary =
"Test that serializers are uniquely named";
1767 "Creating two serializers with the same name should\n"
1769 return AST_TEST_NOT_RUN;
1774 pool = ast_threadpool_create(
"threadpool_serializer", NULL, &options);
1776 ast_test_status_update(test,
"Could not create threadpool\n");
1780 uut = ast_threadpool_serializer(
"highlander", pool);
1782 ast_test_status_update(test,
"Allocation failed\n");
1786 there_can_be_only_one = ast_threadpool_serializer(
"highlander", pool);
1787 if (there_can_be_only_one) {
1789 ast_test_status_update(test,
"Duplicate name error\n");
1793 res = AST_TEST_PASS;
1797 ast_threadpool_shutdown(pool);
1801 static int unload_module(
void)
1803 ast_test_unregister(threadpool_push);
1804 ast_test_unregister(threadpool_initial_threads);
1805 ast_test_unregister(threadpool_thread_creation);
1806 ast_test_unregister(threadpool_thread_destruction);
1807 ast_test_unregister(threadpool_thread_timeout);
1808 ast_test_unregister(threadpool_thread_timeout_thrash);
1809 ast_test_unregister(threadpool_one_task_one_thread);
1810 ast_test_unregister(threadpool_one_thread_one_task);
1811 ast_test_unregister(threadpool_one_thread_multiple_tasks);
1812 ast_test_unregister(threadpool_auto_increment);
1813 ast_test_unregister(threadpool_max_size);
1814 ast_test_unregister(threadpool_reactivation);
1815 ast_test_unregister(threadpool_task_distribution);
1816 ast_test_unregister(threadpool_more_destruction);
1817 ast_test_unregister(threadpool_serializer);
1818 ast_test_unregister(threadpool_serializer_dupe);
1822 static int load_module(
void)
1824 ast_test_register(threadpool_push);
1825 ast_test_register(threadpool_initial_threads);
1826 ast_test_register(threadpool_thread_creation);
1827 ast_test_register(threadpool_thread_destruction);
1828 ast_test_register(threadpool_thread_timeout);
1829 ast_test_register(threadpool_thread_timeout_thrash);
1830 ast_test_register(threadpool_one_task_one_thread);
1831 ast_test_register(threadpool_one_thread_one_task);
1832 ast_test_register(threadpool_one_thread_multiple_tasks);
1833 ast_test_register(threadpool_auto_increment);
1834 ast_test_register(threadpool_max_size);
1835 ast_test_register(threadpool_reactivation);
1836 ast_test_register(threadpool_task_distribution);
1837 ast_test_register(threadpool_more_destruction);
1838 ast_test_register(threadpool_serializer);
1839 ast_test_register(threadpool_serializer_dupe);
Asterisk locking-related definitions:
Asterisk main include file. File version handling, generic pbx functions.
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
int idle_timeout
Time limit in seconds for idle threads.
static void test_emptied(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's emptied callback.
static void test_shutdown(struct ast_taskprocessor_listener *listener)
test taskprocessor listener's shutdown callback.
static pj_pool_t * pool
Global memory pool for configuration and timers.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
test taskprocessor listener's task_pushed callback
#define ast_calloc(num, len)
A wrapper for calloc()
Support for logging to various files, console and syslog. Configuration is in the file logger.conf.
An API for managing task processing threads that can be shared across modules.
listener for a threadpool
An ast_taskprocessor structure is a singleton by name.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
An opaque threadpool structure.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor, decrementing its reference count.
#define AST_TEST_DEFINE(hdr)
#define ASTERISK_GPL_KEY
The text the key() function should return.
Asterisk module definitions.
Structure for mutex and tracking information.