22 #include "asterisk/threadpool.h"
28 #define THREAD_BUCKETS 89
163 static int worker_thread_hash(
const void *obj,
int flags);
164 static int worker_thread_cmp(
void *obj,
void *arg,
int flags);
165 static void worker_thread_destroy(
void *obj);
167 static void *worker_start(
void *arg);
169 static int worker_thread_start(
struct worker_thread *worker);
235 static int queued_active_thread_idle(
void *data)
242 threadpool_send_state_changed(pair->
pool);
244 thread_worker_pair_free(pair);
256 static void threadpool_active_thread_idle(
struct ast_threadpool *pool,
266 pair = thread_worker_pair_alloc(pool, worker);
272 thread_worker_pair_free(pair);
284 static int queued_zombie_thread_dead(
void *data)
289 threadpool_send_state_changed(pair->
pool);
291 thread_worker_pair_free(pair);
301 static void threadpool_zombie_thread_dead(
struct ast_threadpool *pool,
311 pair = thread_worker_pair_alloc(pool, worker);
317 thread_worker_pair_free(pair);
321 static int queued_idle_thread_dead(
void *data)
326 threadpool_send_state_changed(pair->
pool);
328 thread_worker_pair_free(pair);
332 static void threadpool_idle_thread_dead(
struct ast_threadpool *pool,
342 pair = thread_worker_pair_alloc(pool, worker);
348 thread_worker_pair_free(pair);
386 static void threadpool_destructor(
void *obj)
407 struct ast_str *control_tps_name;
409 pool = ao2_alloc(
sizeof(*pool), threadpool_destructor);
411 if (!pool || !control_tps_name) {
412 ast_free(control_tps_name);
416 ast_str_set(&control_tps_name, 0,
"%s/pool-control", name);
419 ast_free(control_tps_name);
424 THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
429 THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
434 THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
492 static int activate_thread(
void *obj,
void *arg,
int flags)
501 ast_log(LOG_WARNING,
"Failed to activate thread %d. Remaining idle\n",
506 if (worker_set_state(worker, ALIVE)) {
507 ast_debug(1,
"Failed to activate thread %d. It is dead\n",
536 ast_debug(3,
"Increasing threadpool %s's size by %d\n",
539 for (i = 0; i < delta; ++i) {
545 if (worker_thread_start(worker)) {
546 ast_log(LOG_ERROR,
"Unable to start worker thread %d. Destroying.\n", worker->
id);
550 ast_log(LOG_WARNING,
"Failed to activate worker thread %d. Destroying.\n", worker->
id);
565 static int queued_task_pushed(
void *data)
570 unsigned int existing_active;
585 activate_thread, pool);
595 activate_thread, pool);
598 threadpool_send_state_changed(pool);
622 tpd = task_pushed_data_alloc(pool, was_empty);
639 static int queued_emptied(
void *data)
697 .
start = threadpool_tps_start,
698 .task_pushed = threadpool_tps_task_pushed,
699 .emptied = threadpool_tps_emptied,
700 .shutdown = threadpool_tps_shutdown,
715 static int kill_threads(
void *obj,
void *arg,
int flags)
717 int *num_to_kill = arg;
719 if (*num_to_kill > 0) {
746 static int zombify_threads(
void *obj,
void *arg,
void *data,
int flags)
750 int *num_to_zombify = data;
752 if ((*num_to_zombify)-- > 0) {
754 ast_log(LOG_WARNING,
"Failed to zombify active thread %d. Thread will remain active\n", worker->
id);
757 worker_set_state(worker, ZOMBIE);
784 int idle_threads_to_kill = MIN(delta, idle_threads);
785 int active_threads_to_zombify = delta - idle_threads_to_kill;
787 ast_debug(3,
"Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
791 kill_threads, &idle_threads_to_kill);
793 ast_debug(3,
"Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
797 zombify_threads, pool, &active_threads_to_zombify);
839 static int queued_set_size(
void *data)
843 unsigned int num_threads = ssd->
size;
851 if (current_size == num_threads) {
852 ast_debug(3,
"Not changing threadpool size since new size %u is the same as current %u\n",
853 num_threads, current_size);
857 if (current_size < num_threads) {
859 activate_thread, pool);
864 grow(pool, num_threads - current_size);
866 activate_thread, pool);
868 shrink(pool, current_size - num_threads);
871 threadpool_send_state_changed(pool);
875 void ast_threadpool_set_size(
struct ast_threadpool *pool,
unsigned int size)
884 ssd = set_size_data_alloc(pool, size);
925 pool = threadpool_alloc(name, options);
935 if (options->
version != AST_THREADPOOL_OPTIONS_VERSION) {
936 ast_log(LOG_WARNING,
"Incompatible version of threadpool options in use.\n");
940 fullname =
ast_alloca(strlen(name) + strlen(
"/pool") + 1);
941 sprintf(fullname,
"%s/pool", name);
957 int ast_threadpool_push(
struct ast_threadpool *pool,
int (*
task)(
void *data),
void *data)
985 static int worker_id_counter;
987 static int worker_thread_hash(
const void *obj,
int flags)
994 static int worker_thread_cmp(
void *obj,
void *arg,
int flags)
1012 worker_set_state(worker, DEAD);
1013 if (worker->
thread != AST_PTHREADT_NULL) {
1014 pthread_join(worker->
thread, NULL);
1015 worker->
thread = AST_PTHREADT_NULL;
1026 static void worker_thread_destroy(
void *obj)
1029 ast_debug(3,
"Destroying worker thread %d\n", worker->
id);
1030 worker_shutdown(worker);
1031 ast_mutex_destroy(&worker->
lock);
1032 ast_cond_destroy(&worker->
cond);
1044 static void *worker_start(
void *arg)
1047 enum worker_state saved_state;
1053 ast_mutex_lock(&worker->
lock);
1054 while (worker_idle(worker)) {
1055 ast_mutex_unlock(&worker->
lock);
1056 worker_active(worker);
1057 ast_mutex_lock(&worker->
lock);
1058 if (worker->
state != ALIVE) {
1061 threadpool_active_thread_idle(worker->
pool, worker);
1063 saved_state = worker->
state;
1064 ast_mutex_unlock(&worker->
lock);
1074 if (saved_state == ZOMBIE) {
1075 threadpool_zombie_thread_dead(worker->
pool, worker);
1095 struct worker_thread *worker = ao2_alloc(
sizeof(*worker), worker_thread_destroy);
1100 ast_mutex_init(&worker->
lock);
1101 ast_cond_init(&worker->
cond, NULL);
1103 worker->
thread = AST_PTHREADT_NULL;
1104 worker->
state = ALIVE;
1109 static int worker_thread_start(
struct worker_thread *worker)
1111 return ast_pthread_create(&worker->
thread, NULL, worker_start, worker);
1137 alive = threadpool_execute(worker->
pool);
1156 struct timespec end = {
1158 .tv_nsec = start.tv_usec * 1000,
1162 ast_cond_wait(&worker->
cond, &worker->
lock);
1163 }
else if (ast_cond_timedwait(&worker->
cond, &worker->
lock, &end) == ETIMEDOUT) {
1169 ast_debug(1,
"Worker thread idle timeout reached. Dying.\n");
1170 threadpool_idle_thread_dead(worker->
pool, worker);
1171 worker->
state = DEAD;
1174 return worker->
state == ALIVE;
1195 if (worker->
state == DEAD) {
1198 ast_assert(worker->
state != ZOMBIE);
1203 ast_assert(worker->
state != DEAD);
1207 worker->
state = state;
1209 ast_cond_signal(&worker->
cond);
1222 static void serializer_shutdown_group_dtor(
void *vdoomed)
1226 ast_cond_destroy(&doomed->
cond);
1233 shutdown_group = ao2_alloc(
sizeof(*shutdown_group), serializer_shutdown_group_dtor);
1234 if (!shutdown_group) {
1237 ast_cond_init(&shutdown_group->
cond, NULL);
1238 return shutdown_group;
1246 if (!shutdown_group) {
1251 ast_assert(lock != NULL);
1253 ao2_lock(shutdown_group);
1255 struct timeval start;
1256 struct timespec end;
1259 end.tv_sec = start.tv_sec + timeout;
1260 end.tv_nsec = start.tv_usec * 1000;
1261 while (shutdown_group->
count) {
1262 if (ast_cond_timedwait(&shutdown_group->
cond, lock, &end)) {
1268 while (shutdown_group->
count) {
1269 if (ast_cond_wait(&shutdown_group->
cond, lock)) {
1275 remaining = shutdown_group->
count;
1276 ao2_unlock(shutdown_group);
1289 ao2_lock(shutdown_group);
1290 ++shutdown_group->
count;
1291 ao2_unlock(shutdown_group);
1303 ao2_lock(shutdown_group);
1304 --shutdown_group->
count;
1305 if (!shutdown_group->
count) {
1306 ast_cond_signal(&shutdown_group->
cond);
1308 ao2_unlock(shutdown_group);
1318 static void serializer_dtor(
void *obj)
1322 ao2_cleanup(ser->
pool);
1345 static int execute_tasks(
void *data)
1365 if (ast_threadpool_push(ser->
pool, execute_tasks, tps)) {
1389 .start = serializer_start,
1390 .shutdown = serializer_shutdown,
1405 ser = serializer_create(pool, shutdown_group);
1420 }
else if (shutdown_group) {
1421 serializer_shutdown_group_inc(shutdown_group);
1430 return ast_threadpool_serializer_group(name, pool, NULL);
A listener for taskprocessors.
int auto_increment
Number of threads to increment pool by.
struct ast_serializer_shutdown_group * shutdown_group
struct ast_threadpool_options options
Asterisk main include file. File version handling, generic pbx functions.
void(* state_changed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int active_threads, int idle_threads)
Indicates that the state of threads in the pool has changed.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
int idle_timeout
Time limit in seconds for idle threads.
long ast_taskprocessor_size(struct ast_taskprocessor *tps)
Return the current size of the taskprocessor queue.
int initial_size
Number of threads the pool will start with.
struct ast_taskprocessor * ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
Get a reference to the listener's taskprocessor.
int max_size
Maximum number of threads a pool may have.
Struct used for queued operations involving worker state changes.
char * ast_str_buffer(const struct ast_str *buf)
Returns the string buffer within the ast_str buf.
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container, as described below.
struct ast_taskprocessor * ast_taskprocessor_get(const char *name, enum ast_tps_options create)
Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary...
struct ast_threadpool_options options
struct ao2_container * idle_threads
The container of idle threads. Idle threads are those that are currently waiting to run tasks...
static pj_pool_t * pool
Global memory pool for configuration and timers.
return a reference to a taskprocessor, create one if it does not exist
AST_THREADSTORAGE_RAW(in_intercept_routine)
Assume that the ao2_container is already locked.
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
const char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
Return the name of the taskprocessor singleton.
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
Pop a task off the taskprocessor and execute it.
void(* shutdown)(struct ast_threadpool_listener *listener)
The threadpool is shutting down.
const struct ast_threadpool_listener_callbacks * callbacks
Helper struct used for queued operations that change the size of the threadpool.
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
static int task(void *data)
Queued task for baseline test.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
void(* emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener)
Indicates the threadpool's taskprocessor has become empty.
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
void * ao2_object_get_lockaddr(void *obj)
Return the mutex lock address of an object.
#define SCOPED_MUTEX(varname, lock)
scoped lock specialization for mutexes
int ast_threadstorage_set_ptr(struct ast_threadstorage *ts, void *ptr)
Set a raw pointer from threadstorage.
struct ast_threadpool * pool
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
struct ast_threadpool * pool
void(* thread_start)(void)
Function to call when a thread starts.
#define ast_malloc(len)
A wrapper for malloc()
struct worker_thread * worker
#define ast_debug(level,...)
Log a DEBUG message.
void(* thread_end)(void)
Function to call when a thread ends.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
struct ast_threadpool_listener * listener
struct ast_threadpool * pool
struct ast_threadpool * pool
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Support for dynamic strings.
#define ao2_unlink(container, obj)
Remove an object from a container.
struct ast_taskprocessor * tps
The main taskprocessor.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
struct ao2_container * zombie_threads
The container of zombie threads. Zombie threads may be running tasks, but they are scheduled to die s...
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
Allocate a taskprocessor listener.
void * ast_threadstorage_get_ptr(struct ast_threadstorage *ts)
Retrieve a raw pointer from threadstorage.
An API for managing task processing threads that can be shared across modules.
void * ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
Get the user data from the listener.
listener for a threadpool
struct ao2_container * active_threads
The container of active threads. Active threads are those that are currently running tasks...
A ast_taskprocessor structure is a singleton by name.
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int(*task_exe)(void *datap), void *datap) attribute_warn_unused_result
Push a task into the specified taskprocessor queue and signal the taskprocessor thread.
void(* task_pushed)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener, int was_empty)
Indicates that a task was pushed to the threadpool.
An opaque threadpool structure.
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
struct ast_taskprocessor * ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
Create a taskprocessor with a custom listener.
void(* task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty)
Indicates a task was pushed to the processor.
helper used for queued task when tasks are pushed
struct ast_threadpool * pool
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
int(* start)(struct ast_taskprocessor_listener *listener)
The taskprocessor has started completely.
Structure for mutex and tracking information.
#define ast_str_create(init_len)
Create a malloc'ed dynamic length string.
struct ast_taskprocessor * control_tps
The control taskprocessor.
#define ao2_link(container, obj)
Add an object to a container.