Asterisk - The Open Source Telephony Project  21.4.1
sched.c
Go to the documentation of this file.
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 1999 - 2010, Digium, Inc.
5  *
6  * Mark Spencer <markster@digium.com>
7  * Russell Bryant <russell@digium.com>
8  *
9  * See http://www.asterisk.org for more information about
10  * the Asterisk project. Please do not directly contact
11  * any of the maintainers of this project for assistance;
12  * the project provides a web site, mailing lists and IRC
13  * channels for your use.
14  *
15  * This program is free software, distributed under the terms of
16  * the GNU General Public License Version 2. See the LICENSE file
17  * at the top of the source tree.
18  */
19 
20 /*! \file
21  *
22  * \brief Scheduler Routines (from cheops-NG)
23  *
24  * \author Mark Spencer <markster@digium.com>
25  */
26 
27 /*** MODULEINFO
28  <support_level>core</support_level>
29  ***/
30 
31 #include "asterisk.h"
32 
/*
 * DEBUG(a) expands to its argument when DEBUG_SCHEDULER is defined and
 * to nothing otherwise, so trace statements vanish from normal builds.
 */
#ifndef DEBUG_SCHEDULER
#define DEBUG(a)
#else
#define DEBUG(a) a
#endif
38 
39 #include <sys/time.h>
40 
41 #include "asterisk/sched.h"
42 #include "asterisk/channel.h"
43 #include "asterisk/lock.h"
44 #include "asterisk/utils.h"
45 #include "asterisk/heap.h"
46 #include "asterisk/threadstorage.h"
47 
48 /*!
49  * \brief Max num of schedule structs
50  *
51  * \note The max number of schedule structs to keep around
52  * for use. Undefine to disable schedule structure
53  * caching. (Only disable this on very low memory
54  * machines)
55  */
56 #define SCHED_MAX_CACHE 128
57 
58 AST_THREADSTORAGE(last_del_id);
59 
60 /*!
61  * \brief Scheduler ID holder
62  *
63  * These form a queue on a scheduler context. When a new
64  * scheduled item is created, a sched_id is popped off the
65  * queue and its id is assigned to the new scheduled item.
66  * When the scheduled task is complete, the sched_id on that
67  * task is then pushed to the back of the queue to be re-used
68  * on some future scheduled item.
69  */
70 struct sched_id {
71  /*! Immutable ID number that is copied onto the scheduled task */
72  int id;
74 };
75 
76 struct sched {
77  AST_LIST_ENTRY(sched) list;
78  /*! The ID that has been popped off the scheduler context's queue */
79  struct sched_id *sched_id;
80  struct timeval when; /*!< Absolute time event should take place */
81  /*!
82  * \brief Tie breaker in case the when is the same for multiple entries.
83  *
84  * \note The oldest expiring entry in the scheduler heap goes first.
85  * This is possible when multiple events are scheduled to expire at
86  * the same time by internal coding.
87  */
88  unsigned int tie_breaker;
89  int resched; /*!< When to reschedule */
90  int variable; /*!< Use return value from callback to reschedule */
91  const void *data; /*!< Data */
92  ast_sched_cb callback; /*!< Callback */
93  ssize_t __heap_index;
94  /*!
95  * Used to synchronize between thread running a task and thread
96  * attempting to delete a task
97  */
98  ast_cond_t cond;
99  /*! Indication that a running task was deleted. */
100  unsigned int deleted:1;
101  /*! Indication that a running task was rescheduled. */
102  unsigned int rescheduled:1;
103 };
104 
105 struct sched_thread {
106  pthread_t thread;
107  ast_cond_t cond;
108  unsigned int stop:1;
109 };
110 
113  unsigned int eventcnt; /*!< Number of events processed */
114  unsigned int highwater; /*!< highest count so far */
115  /*! Next tie breaker in case events expire at the same time. */
116  unsigned int tie_breaker;
117  struct ast_heap *sched_heap;
118  struct sched_thread *sched_thread;
119  /*! The scheduled task that is currently executing */
121  /*! Valid while currently_executing is not NULL */
123 
124 #ifdef SCHED_MAX_CACHE
125  AST_LIST_HEAD_NOLOCK(, sched) schedc; /*!< Cache of unused schedule structures and how many */
126  unsigned int schedccnt;
127 #endif
128  /*! Queue of scheduler task IDs to assign */
129  AST_LIST_HEAD_NOLOCK(, sched_id) id_queue;
130  /*! The number of IDs in the id_queue */
131  int id_queue_size;
132 };
133 
134 static void *sched_run(void *data)
135 {
136  struct ast_sched_context *con = data;
137 
138  while (!con->sched_thread->stop) {
139  int ms;
140  struct timespec ts = {
141  .tv_sec = 0,
142  };
143 
144  ast_mutex_lock(&con->lock);
145 
146  if (con->sched_thread->stop) {
147  ast_mutex_unlock(&con->lock);
148  return NULL;
149  }
150 
151  ms = ast_sched_wait(con);
152 
153  if (ms == -1) {
154  ast_cond_wait(&con->sched_thread->cond, &con->lock);
155  } else {
156  struct timeval tv;
157  tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
158  ts.tv_sec = tv.tv_sec;
159  ts.tv_nsec = tv.tv_usec * 1000;
160  ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
161  }
162 
163  ast_mutex_unlock(&con->lock);
164 
165  if (con->sched_thread->stop) {
166  return NULL;
167  }
168 
169  ast_sched_runq(con);
170  }
171 
172  return NULL;
173 }
174 
175 static void sched_thread_destroy(struct ast_sched_context *con)
176 {
177  if (!con->sched_thread) {
178  return;
179  }
180 
181  if (con->sched_thread->thread != AST_PTHREADT_NULL) {
182  ast_mutex_lock(&con->lock);
183  con->sched_thread->stop = 1;
184  ast_cond_signal(&con->sched_thread->cond);
185  ast_mutex_unlock(&con->lock);
186  pthread_join(con->sched_thread->thread, NULL);
187  con->sched_thread->thread = AST_PTHREADT_NULL;
188  }
189 
190  ast_cond_destroy(&con->sched_thread->cond);
191 
192  ast_free(con->sched_thread);
193 
194  con->sched_thread = NULL;
195 }
196 
198 {
199  struct sched_thread *st;
200 
201  if (con->sched_thread) {
202  ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
203  return -1;
204  }
205 
206  if (!(st = ast_calloc(1, sizeof(*st)))) {
207  return -1;
208  }
209 
210  ast_cond_init(&st->cond, NULL);
211 
212  st->thread = AST_PTHREADT_NULL;
213 
214  con->sched_thread = st;
215 
216  if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
217  ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
218  sched_thread_destroy(con);
219  return -1;
220  }
221 
222  return 0;
223 }
224 
225 static int sched_time_cmp(void *va, void *vb)
226 {
227  struct sched *a = va;
228  struct sched *b = vb;
229  int cmp;
230 
231  cmp = ast_tvcmp(b->when, a->when);
232  if (!cmp) {
233  cmp = b->tie_breaker - a->tie_breaker;
234  }
235  return cmp;
236 }
237 
239 {
240  struct ast_sched_context *tmp;
241 
242  if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
243  return NULL;
244  }
245 
246  ast_mutex_init(&tmp->lock);
247  tmp->eventcnt = 1;
248 
250 
251  if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
252  offsetof(struct sched, __heap_index)))) {
254  return NULL;
255  }
256 
257  return tmp;
258 }
259 
260 static void sched_free(struct sched *task)
261 {
262  /* task->sched_id will be NULL most of the time, but when the
263  * scheduler context shuts down, it will free all scheduled
264  * tasks, and in that case, the task->sched_id will be non-NULL
265  */
266  ast_free(task->sched_id);
267  ast_cond_destroy(&task->cond);
268  ast_free(task);
269 }
270 
272 {
273  struct sched *s;
274  struct sched_id *sid;
275 
276  sched_thread_destroy(con);
277  con->sched_thread = NULL;
278 
279  ast_mutex_lock(&con->lock);
280 
281 #ifdef SCHED_MAX_CACHE
282  while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
283  sched_free(s);
284  }
285 #endif
286 
287  if (con->sched_heap) {
288  while ((s = ast_heap_pop(con->sched_heap))) {
289  sched_free(s);
290  }
291  ast_heap_destroy(con->sched_heap);
292  con->sched_heap = NULL;
293  }
294 
295  while ((sid = AST_LIST_REMOVE_HEAD(&con->id_queue, list))) {
296  ast_free(sid);
297  }
298 
299  ast_mutex_unlock(&con->lock);
300  ast_mutex_destroy(&con->lock);
301 
302  ast_free(con);
303 }
304 
305 #define ID_QUEUE_INCREMENT 16
306 
307 /*!
308  * \brief Add new scheduler IDs to the queue.
309  *
310  * \retval The number of IDs added to the queue
311  */
312 static int add_ids(struct ast_sched_context *con)
313 {
314  int new_size;
315  int original_size;
316  int i;
317 
318  original_size = con->id_queue_size;
319  /* So we don't go overboard with the mallocs here, we'll just up
320  * the size of the list by a fixed amount each time instead of
321  * multiplying the size by any particular factor
322  */
323  new_size = original_size + ID_QUEUE_INCREMENT;
324  if (new_size < 0) {
325  /* Overflow. Cap it at INT_MAX. */
326  new_size = INT_MAX;
327  }
328  for (i = original_size; i < new_size; ++i) {
329  struct sched_id *new_id;
330 
331  new_id = ast_calloc(1, sizeof(*new_id));
332  if (!new_id) {
333  break;
334  }
335 
336  /*
337  * According to the API doxygen a sched ID of 0 is valid.
338  * Unfortunately, 0 was never returned historically and
339  * several users incorrectly coded usage of the returned
340  * sched ID assuming that 0 was invalid.
341  */
342  new_id->id = ++con->id_queue_size;
343 
344  AST_LIST_INSERT_TAIL(&con->id_queue, new_id, list);
345  }
346 
347  return con->id_queue_size - original_size;
348 }
349 
350 static int set_sched_id(struct ast_sched_context *con, struct sched *new_sched)
351 {
352  if (AST_LIST_EMPTY(&con->id_queue) && (add_ids(con) == 0)) {
353  return -1;
354  }
355 
356  new_sched->sched_id = AST_LIST_REMOVE_HEAD(&con->id_queue, list);
357  return 0;
358 }
359 
360 static void sched_release(struct ast_sched_context *con, struct sched *tmp)
361 {
362  if (tmp->sched_id) {
363  AST_LIST_INSERT_TAIL(&con->id_queue, tmp->sched_id, list);
364  tmp->sched_id = NULL;
365  }
366 
367  /*
368  * Add to the cache, or just free() if we
369  * already have too many cache entries
370  */
371 #ifdef SCHED_MAX_CACHE
372  if (con->schedccnt < SCHED_MAX_CACHE) {
373  AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
374  con->schedccnt++;
375  } else
376 #endif
377  sched_free(tmp);
378 }
379 
380 static struct sched *sched_alloc(struct ast_sched_context *con)
381 {
382  struct sched *tmp;
383 
384  /*
385  * We keep a small cache of schedule entries
386  * to minimize the number of necessary malloc()'s
387  */
388 #ifdef SCHED_MAX_CACHE
389  if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
390  con->schedccnt--;
391  } else
392 #endif
393  {
394  tmp = ast_calloc(1, sizeof(*tmp));
395  if (!tmp) {
396  return NULL;
397  }
398  ast_cond_init(&tmp->cond, NULL);
399  }
400 
401  if (set_sched_id(con, tmp)) {
402  sched_release(con, tmp);
403  return NULL;
404  }
405 
406  return tmp;
407 }
408 
410 {
411  int i = 1;
412  struct sched *current;
413 
414  ast_mutex_lock(&con->lock);
415  while ((current = ast_heap_peek(con->sched_heap, i))) {
416  if (current->callback != match) {
417  i++;
418  continue;
419  }
420 
421  ast_heap_remove(con->sched_heap, current);
422 
423  cleanup_cb(current->data);
424  sched_release(con, current);
425  }
426  ast_mutex_unlock(&con->lock);
427 }
428 
429 /*! \brief
430  * Return the number of milliseconds
431  * until the next scheduled event
432  */
434 {
435  int ms;
436  struct sched *s;
437 
438  DEBUG(ast_debug(1, "ast_sched_wait()\n"));
439 
440  ast_mutex_lock(&con->lock);
441  if ((s = ast_heap_peek(con->sched_heap, 1))) {
442  ms = ast_tvdiff_ms(s->when, ast_tvnow());
443  if (ms < 0) {
444  ms = 0;
445  }
446  } else {
447  ms = -1;
448  }
449  ast_mutex_unlock(&con->lock);
450 
451  return ms;
452 }
453 
454 
455 /*! \brief
456  * Take a sched structure and put it in the
457  * queue, such that the soonest event is
458  * first in the list.
459  */
460 static void schedule(struct ast_sched_context *con, struct sched *s)
461 {
462  size_t size;
463 
464  size = ast_heap_size(con->sched_heap);
465 
466  /* Record the largest the scheduler heap became for reporting purposes. */
467  if (con->highwater <= size) {
468  con->highwater = size + 1;
469  }
470 
471  /* Determine the tie breaker value for the new entry. */
472  if (size) {
473  ++con->tie_breaker;
474  } else {
475  /*
476  * Restart the sequence for the first entry to make integer
477  * roll over more unlikely.
478  */
479  con->tie_breaker = 0;
480  }
481  s->tie_breaker = con->tie_breaker;
482 
483  ast_heap_push(con->sched_heap, s);
484 }
485 
486 /*! \brief
487  * given the last event *tv and the offset in milliseconds 'when',
488  * computes the next value,
489  */
490 static void sched_settime(struct timeval *t, int when)
491 {
492  struct timeval now = ast_tvnow();
493 
494  if (when < 0) {
495  /*
496  * A negative when value is likely a bug as it
497  * represents a VERY large timeout time.
498  */
499  ast_log(LOG_WARNING,
500  "Bug likely: Negative time interval %d (interpreted as %u ms) requested!\n",
501  when, (unsigned int) when);
502  ast_assert(0);
503  }
504 
505  /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
506  if (ast_tvzero(*t)) /* not supplied, default to now */
507  *t = now;
508  *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
509  if (ast_tvcmp(*t, now) < 0) {
510  *t = now;
511  }
512 }
513 
514 int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
515 {
516  /* 0 means the schedule item is new; do not delete */
517  if (old_id > 0) {
518  AST_SCHED_DEL(con, old_id);
519  }
520  return ast_sched_add_variable(con, when, callback, data, variable);
521 }
522 
523 /*! \brief
524  * Schedule callback(data) to happen when ms into the future
525  */
526 int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
527 {
528  struct sched *tmp;
529  int res = -1;
530 
531  DEBUG(ast_debug(1, "ast_sched_add()\n"));
532 
533  ast_mutex_lock(&con->lock);
534  if ((tmp = sched_alloc(con))) {
535  con->eventcnt++;
536  tmp->callback = callback;
537  tmp->data = data;
538  tmp->resched = when;
539  tmp->variable = variable;
540  tmp->when = ast_tv(0, 0);
541  tmp->deleted = 0;
542 
543  sched_settime(&tmp->when, when);
544  schedule(con, tmp);
545  res = tmp->sched_id->id;
546  }
547 #ifdef DUMP_SCHEDULER
548  /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
549  ast_sched_dump(con);
550 #endif
551  if (con->sched_thread) {
552  ast_cond_signal(&con->sched_thread->cond);
553  }
554  ast_mutex_unlock(&con->lock);
555 
556  return res;
557 }
558 
559 int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
560 {
561  if (old_id > -1) {
562  AST_SCHED_DEL(con, old_id);
563  }
564  return ast_sched_add(con, when, callback, data);
565 }
566 
567 int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
568 {
569  return ast_sched_add_variable(con, when, callback, data, 0);
570 }
571 
572 static struct sched *sched_find(struct ast_sched_context *con, int id)
573 {
574  int x;
575  size_t heap_size;
576 
577  heap_size = ast_heap_size(con->sched_heap);
578  for (x = 1; x <= heap_size; x++) {
579  struct sched *cur = ast_heap_peek(con->sched_heap, x);
580 
581  if (cur->sched_id->id == id) {
582  return cur;
583  }
584  }
585 
586  return NULL;
587 }
588 
589 const void *ast_sched_find_data(struct ast_sched_context *con, int id)
590 {
591  struct sched *s;
592  const void *data = NULL;
593 
594  ast_mutex_lock(&con->lock);
595 
596  s = sched_find(con, id);
597  if (s) {
598  data = s->data;
599  }
600 
601  ast_mutex_unlock(&con->lock);
602 
603  return data;
604 }
605 
/*! \brief
 * Delete the schedule entry with number "id".
 *
 * Deprecated in favor of ast_sched_del_nonrunning(), which also
 * reports the status of a running event.  This wrapper folds every
 * non-zero result of that function into -1.
 *
 * \retval 0 ast_sched_del_nonrunning() returned 0.
 * \retval -1 ast_sched_del_nonrunning() returned anything else.
 */
int ast_sched_del(struct ast_sched_context *con, int id)
{
	if (ast_sched_del_nonrunning(con, id) != 0) {
		return -1;
	}

	return 0;
}
618 
619 /*! \brief
620  * Delete the schedule entry with number "id".
621  * If running, wait for the task to complete,
622  * check to see if it is rescheduled then
623  * schedule the release.
624  * It's nearly impossible that there would be
625  * two or more in the list with that id.
626  */
628 {
629  struct sched *s = NULL;
630  int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
631  int res = 0;
632 
633  DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
634 
635  if (id < 0) {
636  return 0;
637  }
638 
639  ast_mutex_lock(&con->lock);
640 
641  s = sched_find(con, id);
642  if (s) {
643  if (!ast_heap_remove(con->sched_heap, s)) {
644  ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->sched_id->id);
645  }
646  sched_release(con, s);
647  } else if (con->currently_executing && (id == con->currently_executing->sched_id->id)) {
648  if (con->executing_thread_id == pthread_self()) {
649  /* The scheduled callback is trying to delete itself.
650  * Not good as that is a deadlock. */
651  ast_log(LOG_ERROR,
652  "BUG! Trying to delete sched %d from within the callback %p. "
653  "Ignoring so we don't deadlock\n",
654  id, con->currently_executing->callback);
656  /* We'll return -1 below because s is NULL.
657  * The caller will rightly assume that the unscheduling failed. */
658  } else {
659  s = con->currently_executing;
660  s->deleted = 1;
661  /* Wait for executing task to complete so that the caller of
662  * ast_sched_del() does not free memory out from under the task. */
663  while (con->currently_executing && (id == con->currently_executing->sched_id->id)) {
664  ast_cond_wait(&s->cond, &con->lock);
665  }
666  /* This is not rescheduled so the caller of ast_sched_del_nonrunning needs to know
667  * that it was still deleted
668  */
669  if (!s->rescheduled) {
670  res = -2;
671  }
672  /* ast_sched_runq knows we are waiting on this item and is passing responsibility for
673  * its destruction to us
674  */
675  sched_release(con, s);
676  s = NULL;
677  }
678  }
679 
680 #ifdef DUMP_SCHEDULER
681  /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
682  ast_sched_dump(con);
683 #endif
684  if (con->sched_thread) {
685  ast_cond_signal(&con->sched_thread->cond);
686  }
687  ast_mutex_unlock(&con->lock);
688 
689  if(res == -2){
690  return res;
691  }
692  else if (!s && *last_id != id) {
693  ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
694  /* Removing nonexistent schedule entry shouldn't trigger assert (it was enabled in DEV_MODE);
695  * because in many places entries is deleted without having valid id. */
696  *last_id = id;
697  return -1;
698  } else if (!s) {
699  return -1;
700  }
701 
702  return res;
703 }
704 
705 void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
706 {
707  int i, x;
708  struct sched *cur;
709  int countlist[cbnames->numassocs + 1];
710  size_t heap_size;
711 
712  memset(countlist, 0, sizeof(countlist));
713  ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %zu\n", con->highwater, ast_heap_size(con->sched_heap));
714 
715  ast_mutex_lock(&con->lock);
716 
717  heap_size = ast_heap_size(con->sched_heap);
718  for (x = 1; x <= heap_size; x++) {
719  cur = ast_heap_peek(con->sched_heap, x);
720  /* match the callback to the cblist */
721  for (i = 0; i < cbnames->numassocs; i++) {
722  if (cur->callback == cbnames->cblist[i]) {
723  break;
724  }
725  }
726  if (i < cbnames->numassocs) {
727  countlist[i]++;
728  } else {
729  countlist[cbnames->numassocs]++;
730  }
731  }
732 
733  ast_mutex_unlock(&con->lock);
734 
735  for (i = 0; i < cbnames->numassocs; i++) {
736  ast_str_append(buf, 0, " %s : %d\n", cbnames->list[i], countlist[i]);
737  }
738 
739  ast_str_append(buf, 0, " <unknown> : %d\n", countlist[cbnames->numassocs]);
740 }
741 
742 /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
744 {
745  struct sched *q;
746  struct timeval when;
747  int x;
748  size_t heap_size;
749 
750  if (!DEBUG_ATLEAST(1)) {
751  return;
752  }
753 
754  when = ast_tvnow();
755 #ifdef SCHED_MAX_CACHE
756  ast_log(LOG_DEBUG, "Asterisk Schedule Dump (%zu in Q, %u Total, %u Cache, %u high-water)\n",
757  ast_heap_size(con->sched_heap), con->eventcnt - 1, con->schedccnt, con->highwater);
758 #else
759  ast_log(LOG_DEBUG, "Asterisk Schedule Dump (%zu in Q, %u Total, %u high-water)\n",
760  ast_heap_size(con->sched_heap), con->eventcnt - 1, con->highwater);
761 #endif
762 
763  ast_log(LOG_DEBUG, "=============================================================\n");
764  ast_log(LOG_DEBUG, "|ID Callback Data Time (sec:ms) |\n");
765  ast_log(LOG_DEBUG, "+-----+-----------------+-----------------+-----------------+\n");
766  ast_mutex_lock(&con->lock);
767  heap_size = ast_heap_size(con->sched_heap);
768  for (x = 1; x <= heap_size; x++) {
769  struct timeval delta;
770  q = ast_heap_peek(con->sched_heap, x);
771  delta = ast_tvsub(q->when, when);
772  ast_log(LOG_DEBUG, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
773  q->sched_id->id,
774  q->callback,
775  q->data,
776  (long)delta.tv_sec,
777  (long int)delta.tv_usec);
778  }
779  ast_mutex_unlock(&con->lock);
780  ast_log(LOG_DEBUG, "=============================================================\n");
781 }
782 
783 /*! \brief
784  * Launch all events which need to be run at this time.
785  */
787 {
788  struct sched *current;
789  struct timeval when;
790  int numevents;
791  int res;
792 
793  DEBUG(ast_debug(1, "ast_sched_runq()\n"));
794 
795  ast_mutex_lock(&con->lock);
796 
797  when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
798  for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
799  /* schedule all events which are going to expire within 1ms.
800  * We only care about millisecond accuracy anyway, so this will
801  * help us get more than one event at one time if they are very
802  * close together.
803  */
804  if (ast_tvcmp(current->when, when) != -1) {
805  break;
806  }
807 
808  current = ast_heap_pop(con->sched_heap);
809 
810  /*
811  * At this point, the schedule queue is still intact. We
812  * have removed the first event and the rest is still there,
813  * so it's permissible for the callback to add new events, but
814  * trying to delete itself won't work because it isn't in
815  * the schedule queue. If that's what it wants to do, it
816  * should return 0.
817  */
818 
819  con->currently_executing = current;
820  con->executing_thread_id = pthread_self();
821  ast_mutex_unlock(&con->lock);
822  res = current->callback(current->data);
823  ast_mutex_lock(&con->lock);
824  con->currently_executing = NULL;
825  ast_cond_signal(&current->cond);
826 
827  if (current->deleted) {
828  /*
829  * Another thread is waiting on this scheduled item. That thread
830  * will be responsible for it's destruction
831  */
832  current->rescheduled = res ? 1 : 0;
833  } else if (res) {
834  /*
835  * If they return non-zero, we should schedule them to be
836  * run again.
837  */
838  sched_settime(&current->when, current->variable ? res : current->resched);
839  schedule(con, current);
840  } else {
841  /* No longer needed, so release it */
842  sched_release(con, current);
843  }
844  }
845 
846  ast_mutex_unlock(&con->lock);
847 
848  return numevents;
849 }
850 
851 long ast_sched_when(struct ast_sched_context *con,int id)
852 {
853  struct sched *s;
854  long secs = -1;
855  DEBUG(ast_debug(1, "ast_sched_when()\n"));
856 
857  ast_mutex_lock(&con->lock);
858 
859  s = sched_find(con, id);
860  if (s) {
861  struct timeval now = ast_tvnow();
862  secs = s->when.tv_sec - now.tv_sec;
863  }
864 
865  ast_mutex_unlock(&con->lock);
866 
867  return secs;
868 }
#define AST_THREADSTORAGE(name)
Define a thread storage variable.
Definition: threadstorage.h:86
unsigned int deleted
Definition: sched.c:100
Asterisk locking-related definitions:
Asterisk main include file. File version handling, generic pbx functions.
struct sched_id * sched_id
Definition: sched.c:79
void * ast_threadstorage_get(struct ast_threadstorage *ts, size_t init_size)
Retrieve thread storage.
const void * ast_sched_find_data(struct ast_sched_context *con, int id)
Find a sched structure and return the data field associated with it.
Definition: sched.c:589
struct ast_sched_context * ast_sched_context_create(void)
Create a scheduler context.
Definition: sched.c:238
static void sched_settime(struct timeval *t, int when)
given the last event *tv and the offset in milliseconds 'when', computes the next value...
Definition: sched.c:490
int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
Schedule callback(data) to happen when ms into the future.
Definition: sched.c:526
static int add_ids(struct ast_sched_context *con)
Add new scheduler IDs to the queue.
Definition: sched.c:312
pthread_t thread
Definition: app_sla.c:329
int ast_tvzero(const struct timeval t)
Returns true if the argument is 0,0.
Definition: time.h:117
unsigned int tie_breaker
Definition: sched.c:116
Definition: heap.c:36
Definition: sched.c:76
int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
Adds a scheduled event.
Definition: sched.c:567
struct ast_heap * ast_heap_destroy(struct ast_heap *h)
Destroy a max heap.
Definition: heap.c:146
void ast_sched_context_destroy(struct ast_sched_context *con)
destroys a schedule context
Definition: sched.c:271
Scheduler ID holder.
Definition: sched.c:70
int ast_str_append(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Append to a thread local dynamic string.
Definition: strings.h:1139
void ast_sched_clean_by_callback(struct ast_sched_context *con, ast_sched_cb match, ast_sched_cb cleanup_cb)
Clean all scheduled events with matching callback.
Definition: sched.c:409
struct timeval ast_tvnow(void)
Returns current timeval. Meant to replace calls to gettimeofday().
Definition: time.h:159
int id_queue_size
Definition: sched.c:131
int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
replace a scheduler entry
Definition: sched.c:559
#define AST_LIST_EMPTY(head)
Checks whether the specified list contains any entries.
Definition: linkedlists.h:450
int64_t ast_tvdiff_ms(struct timeval end, struct timeval start)
Computes the difference (in milliseconds) between two struct timeval instances.
Definition: time.h:107
Definitions to aid in the use of thread local storage.
void * ast_heap_pop(struct ast_heap *h)
Pop the max element off of the heap.
Definition: heap.c:262
#define SCHED_MAX_CACHE
Max num of schedule structs.
Definition: sched.c:56
struct ast_sched_context::@394 id_queue
int resched
Definition: sched.c:89
void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
Show statics on what it is in the schedule queue.
Definition: sched.c:705
void ast_log_backtrace(void)
Log a backtrace of the current thread's execution stack to the Asterisk log.
Definition: logger.c:2510
static int task(void *data)
Queued task for baseline test.
Utility functions.
int ast_str_set(struct ast_str **buf, ssize_t max_len, const char *fmt,...)
Set a dynamic string using variable arguments.
Definition: strings.h:1113
pthread_t executing_thread_id
Definition: sched.c:122
Max Heap data structure.
int ast_sched_start_thread(struct ast_sched_context *con)
Start a thread for processing scheduler entries.
Definition: sched.c:197
#define ast_heap_push(h, elm)
Push an element on to a heap.
Definition: heap.h:125
ast_mutex_t lock
General Asterisk PBX channel definitions.
static void schedule(struct ast_sched_context *con, struct sched *s)
Take a sched structure and put it in the queue, such that the soonest event is first in the list...
Definition: sched.c:460
#define AST_SCHED_DEL(sched, id)
Remove a scheduler entry.
Definition: sched.h:46
Scheduler Routines (derived from cheops)
struct timeval ast_samp2tv(unsigned int _nsamp, unsigned int _rate)
Returns a timeval corresponding to the duration of n samples at rate r. Useful to convert samples to ...
Definition: time.h:282
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_LIST_REMOVE_HEAD(head, field)
Removes and returns the head entry from a list.
Definition: linkedlists.h:833
int ast_sched_runq(struct ast_sched_context *con)
Launch all events which need to be run at this time.
Definition: sched.c:786
struct timeval when
Definition: sched.c:80
int ast_sched_wait(struct ast_sched_context *con)
Return the number of milliseconds until the next scheduled event.
Definition: sched.c:433
#define AST_LIST_HEAD_NOLOCK(name, type)
Defines a structure to be used to hold a list of specified type (with no lock).
Definition: linkedlists.h:225
int ast_tvcmp(struct timeval _a, struct timeval _b)
Compress two struct timeval instances returning -1, 0, 1 if the first arg is smaller, equal or greater to the second.
Definition: time.h:137
#define AST_LIST_INSERT_TAIL(head, elm, field)
Appends a list entry to the tail of a list.
Definition: linkedlists.h:731
Support for dynamic strings.
Definition: strings.h:623
long ast_sched_when(struct ast_sched_context *con, int id)
Returns the number of seconds before an event takes place.
Definition: sched.c:851
int ast_sched_del(struct ast_sched_context *con, int id)
Delete the schedule entry with number "id". It's nearly impossible that there would be two or more in...
Definition: sched.c:614
struct timeval ast_tvadd(struct timeval a, struct timeval b)
Returns the sum of two timevals a + b.
Definition: extconf.c:2282
int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
replace a scheduler entry
Definition: sched.c:514
#define AST_LIST_ENTRY(type)
Declare a forward link structure inside a list entry.
Definition: linkedlists.h:410
#define AST_LIST_INSERT_HEAD(head, elm, field)
Inserts a list entry at the head of a list.
Definition: linkedlists.h:711
unsigned int stop
Definition: res_smdi.c:217
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
void ast_sched_dump(struct ast_sched_context *con)
Dump the contents of the scheduler to LOG_DEBUG.
Definition: sched.c:743
const void * data
Definition: sched.c:91
int(* ast_sched_cb)(const void *data)
scheduler callback
Definition: sched.h:178
struct sched * currently_executing
Definition: sched.c:120
int ast_sched_del_nonrunning(struct ast_sched_context *con, int id)
Delete the schedule entry with number "id". If running, wait for the task to complete, check to see if it is rescheduled then schedule the release. It's nearly impossible that there would be two or more in the list with that id.
Definition: sched.c:627
#define ast_heap_create(init_height, cmp_fn, index_offset)
Create a max heap.
Definition: heap.h:100
unsigned int rescheduled
Definition: sched.c:102
#define AST_LIST_HEAD_INIT_NOLOCK(head)
Initializes a list head structure.
Definition: linkedlists.h:681
void * ast_heap_remove(struct ast_heap *h, void *elm)
Remove a specific element from a heap.
Definition: heap.c:251
unsigned int eventcnt
Definition: sched.c:113
ast_cond_t cond
Definition: sched.c:98
struct timeval ast_tv(ast_time_t sec, ast_suseconds_t usec)
Returns a timeval from sec, usec.
Definition: time.h:235
size_t ast_heap_size(struct ast_heap *h)
Get the current size of a heap.
Definition: heap.c:276
void * ast_heap_peek(struct ast_heap *h, unsigned int index)
Peek at an element on a heap.
Definition: heap.c:267
struct ast_sched_context::@393 schedc
int variable
Definition: sched.c:90
struct timeval ast_tvsub(struct timeval a, struct timeval b)
Returns the difference of two timevals a - b.
Definition: extconf.c:2297
unsigned int highwater
Definition: sched.c:114
unsigned int tie_breaker
Tie breaker in case the when is the same for multiple entries.
Definition: sched.c:88
ast_sched_cb callback
Definition: sched.c:92
int id
Definition: sched.c:72
Structure for mutex and tracking information.
Definition: lock.h:135