40 #define NUM_CACHE_BUCKETS 17
42 #define NUM_CACHE_BUCKETS 563
62 static void stasis_caching_topic_dtor(
void *obj)
75 ao2_cleanup(caching_topic->sub);
76 caching_topic->sub = NULL;
77 ao2_cleanup(caching_topic->cache);
78 caching_topic->cache = NULL;
79 ao2_cleanup(caching_topic->topic);
80 caching_topic->topic = NULL;
81 ao2_cleanup(caching_topic->original_topic);
82 caching_topic->original_topic = NULL;
87 return caching_topic->topic;
112 if (!caching_topic) {
121 if (!caching_topic) {
137 ao2_ref(caching_topic->sub, +1);
140 ast_log(LOG_ERROR,
"stasis_caching_topic unsubscribed multiple times\n");
142 ao2_cleanup(caching_topic);
148 if (!caching_topic) {
156 ao2_cleanup(caching_topic);
183 static
void cache_entry_dtor(
void *obj)
188 entry->key.
type = NULL;
189 ast_free((
char *) entry->key.
id);
190 entry->key.
id = NULL;
194 ao2_cleanup(entry->
local);
217 ast_assert(
id != NULL);
218 ast_assert(snapshot != NULL);
224 entry = ao2_alloc_options(
sizeof(*entry), cache_entry_dtor,
231 if (!entry->key.
id) {
244 entry->key.
type = type;
245 cache_entry_compute_hash(&entry->key);
259 entry->
local = snapshot;
266 static int cache_entry_hash(
const void *obj,
int flags)
285 return (
int)key->
hash;
288 static int cache_entry_cmp(
void *obj,
void *arg,
int flags)
295 switch (flags & OBJ_SEARCH_MASK) {
297 right_key = &object_right->key;
300 cmp = object_left->key.
type != right_key->
type
301 || strcmp(object_left->key.
id, right_key->
id);
326 static void cache_dtor(
void *obj)
330 ao2_cleanup(cache->entries);
331 cache->entries = NULL;
340 cache = ao2_alloc_options(
sizeof(*cache), cache_dtor,
347 NUM_CACHE_BUCKETS, cache_entry_hash, NULL, cache_entry_cmp);
348 if (!cache->entries) {
353 cache->id_fn = id_fn;
354 cache->aggregate_calc_fn = aggregate_calc_fn;
355 cache->aggregate_publish_fn = aggregate_publish_fn;
401 search_key.type = type;
403 cache_entry_compute_hash(&search_key);
432 old_snapshot = cached_entry->
local;
433 cached_entry->
local = NULL;
474 old_snapshot = cached_entry->
local;
512 ast_assert(cache->entries != NULL);
513 ast_assert(eid != NULL);
514 ast_assert(new_snapshot == NULL ||
517 memset(&snapshots, 0,
sizeof(snapshots));
519 ao2_wrlock(cache->entries);
521 cached_entry = cache_find(cache->entries, type,
id);
527 snapshots.
old = cache_remove(cache->entries, cached_entry, eid);
529 }
else if (cached_entry) {
534 cached_entry = cache_entry_create(type,
id, new_snapshot);
541 if (cache->aggregate_calc_fn && cached_entry) {
542 snapshots.
aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
547 ao2_unlock(cache->entries);
549 ao2_cleanup(cached_entry);
568 ast_assert(snapshots != NULL);
569 ast_assert(entry != NULL);
581 err |= !
ao2_link(snapshots, snapshot);
592 ast_assert(cache != NULL);
593 ast_assert(cache->entries != NULL);
594 ast_assert(
id != NULL);
605 ao2_rdlock(cache->entries);
607 cached_entry = cache_find(cache->entries, type,
id);
608 if (cached_entry && cache_entry_dump(found, cached_entry)) {
613 ao2_unlock(cache->entries);
615 ao2_cleanup(cached_entry);
664 ast_assert(cache != NULL);
665 ast_assert(cache->entries != NULL);
666 ast_assert(
id != NULL);
672 ao2_rdlock(cache->entries);
674 cached_entry = cache_find(cache->entries, type,
id);
676 snapshot = cache_entry_by_eid(cached_entry, eid);
680 ao2_unlock(cache->entries);
682 ao2_cleanup(cached_entry);
697 static int cache_dump_by_eid_cb(
void *obj,
void *arg,
int flags)
702 if (!cache_dump->type || entry->key.
type == cache_dump->type) {
705 snapshot = cache_entry_by_eid(entry, cache_dump->eid);
707 if (!
ao2_link(cache_dump->container, snapshot)) {
708 ao2_cleanup(cache_dump->container);
709 cache_dump->container = NULL;
722 ast_assert(cache != NULL);
723 ast_assert(cache->entries != NULL);
725 cache_dump.eid = eid;
726 cache_dump.type = type;
728 if (!cache_dump.container) {
733 return cache_dump.container;
741 static int cache_dump_all_cb(
void *obj,
void *arg,
int flags)
746 if (!cache_dump->type || entry->key.
type == cache_dump->type) {
747 if (cache_entry_dump(cache_dump->container, entry)) {
748 ao2_cleanup(cache_dump->container);
749 cache_dump->container = NULL;
761 ast_assert(cache != NULL);
762 ast_assert(cache->entries != NULL);
764 cache_dump.eid = NULL;
765 cache_dump.type = type;
767 if (!cache_dump.container) {
772 return cache_dump.container;
783 static void stasis_cache_update_dtor(
void *obj)
791 ao2_cleanup(update->
type);
800 ast_assert(old_snapshot != NULL || new_snapshot != NULL);
806 update = ao2_alloc_options(
sizeof(*update), stasis_cache_update_dtor,
844 ast_assert(caching_topic != NULL);
845 ast_assert(caching_topic->topic != NULL);
846 ast_assert(caching_topic->cache != NULL);
847 ast_assert(caching_topic->cache->id_fn != NULL);
850 caching_topic_needs_unref = caching_topic;
852 caching_topic_needs_unref = NULL;
865 if (strcmp(change->
description,
"Unsubscribe") == 0) {
868 ao2_wrlock(caching_topic->cache->entries);
871 ao2_cleanup(cache_remove(caching_topic->cache->entries, cached_sub,
stasis_message_eid(message)));
872 ao2_cleanup(cached_sub);
874 ao2_unlock(caching_topic->cache->entries);
875 ao2_cleanup(caching_topic_needs_unref);
890 ast_assert(msg_type != NULL);
893 msg_id = caching_topic->cache->id_fn(msg);
894 if (msg_id && msg_eid) {
899 snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
900 if (snapshots.old || msg_put) {
902 update = update_create(snapshots.old, msg_put);
910 "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
915 if (snapshots.aggregate_old != snapshots.aggregate_new) {
916 if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
917 caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
918 snapshots.aggregate_new);
921 update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
929 ao2_cleanup(snapshots.old);
930 ao2_cleanup(snapshots.aggregate_old);
931 ao2_cleanup(snapshots.aggregate_new);
934 ao2_cleanup(caching_topic_needs_unref);
937 static void print_cache_entry(
void *v_obj,
void *where,
ao2_prnt_fn *prnt)
945 entry->key.
id, entry->key.
hash);
951 static int caching_id;
960 caching_topic = ao2_alloc_options(
sizeof(*caching_topic),
962 if (caching_topic == NULL) {
969 if (caching_topic->topic == NULL) {
977 caching_topic->cache =
cache;
978 if (!cache->registered) {
980 ast_log(LOG_ERROR,
"Stasis cache container '%p' for '%s' did not register\n",
981 cache->entries, new_name);
983 cache->registered = 1;
988 caching_topic->sub =
internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
989 if (caching_topic->sub == NULL) {
996 caching_topic->original_topic = original_topic;
999 return caching_topic;
1002 static void stasis_cache_cleanup(
void)
1008 int stasis_cache_init(
void)
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
struct stasis_message * stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
Get a remote entity's cache entry snapshot by index.
const char * stasis_message_type_name(const struct stasis_message_type *type)
Gets the name of a given message type.
void( ao2_prnt_fn)(void *where, const char *fmt,...)
Print output.
Asterisk main include file. File version handling, generic pbx functions.
struct stasis_message * old_snapshot
Old value from the cache.
struct stasis_message * stasis_cache_clear_create(struct stasis_message *id_message)
A message which instructs the caching topic to remove an entry from its cache.
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
The arg parameter is a search key, but is not an object.
#define STASIS_MESSAGE_TYPE_INIT(name)
Boiler-plate messaging macro for initializing message types.
struct stasis_cache * stasis_cache_create(snapshot_get_id id_fn)
Create a cache.
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
Return the number of subscribers of a topic.
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
void(* cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate)
Callback to publish the aggregate cache entry message.
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container, as described below.
struct ao2_container * stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
Dump all entity items from the cache to a subscription.
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Generic (perhaps overly so) hashtable implementation Hash Table support in Asterisk.
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
Boiler-plate messaging macro for cleaning up message types.
struct stasis_message * stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve an item from the cache for the ast_eid_default entity.
Assume that the ao2_container is already locked.
struct stasis_message_type * stasis_message_type(const struct stasis_message *msg)
Get the message type for a stasis_message.
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
struct stasis_topic * stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
Returns the topic of cached events from a caching topics.
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
#define ast_strdup(str)
A wrapper for strdup()
struct stasis_message * old
struct ao2_container * stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
Retrieve all matching entity items from the cache.
unsigned int stasis_message_type_hash(const struct stasis_message_type *type)
Gets the hash of a given message type.
An Entity ID is essentially a MAC address, brief and unique.
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
struct stasis_message *(* cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
Callback to calculate the aggregate cache entry.
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
struct stasis_message * stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
Get the aggregate cache entry snapshot.
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
The arg parameter is a partial search key similar to OBJ_SEARCH_KEY.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
const struct ast_eid * stasis_message_eid(const struct stasis_message *msg)
Get the entity id for a stasis_message.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
struct stasis_message_type * stasis_cache_update_type(void)
Message type for cache update messages.
struct stasis_message * aggregate_new
struct stasis_message * stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
Get the local entity's cache entry snapshot.
struct ao2_container * container
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_VECTOR(name, type)
Define a vector structure.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
static int cache_update(struct stasis_message *msg, const void *data)
Message matcher looking for cache update messages.
struct stasis_message * aggregate_old
struct stasis_cache_entry::@402 remote
#define STASIS_MESSAGE_TYPE_DEFN(name,...)
Boiler-plate messaging macro for defining public message types.
struct stasis_message * new_snapshot
New value.
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_message_type * type
Convenience reference to snapshot type.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
The key for an entry in the cache.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
Holds details about changes to subscriptions for the specified topic.
Vector container support.
unsigned int ast_hashtab_hash_string(const void *obj)
Hashes a string to a number.
struct ast_eid ast_eid_default
Global EID.
The arg parameter is an object of the same type.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
int stasis_subscription_is_done(struct stasis_subscription *subscription)
Returns whether subscription has received its final message.
stasis_subscription_message_filter
Stasis subscription message filters.
int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
Indicate to a caching topic that we are interested in a message type.
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
struct stasis_message * local
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
#define ao2_unlink_flags(container, obj, flags)
Remove an object from a container.
struct stasis_caching_topic * stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Create a topic which monitors and caches messages from another topic.
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_caching_topic * stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded...
static struct sorcery_test_caching cache
Global scope caching structure for testing.
Search option field mask.
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
struct stasis_message_type * type
struct ao2_container * stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
Dump cached items to a subscription for the ast_eid_default entity.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
struct stasis_message * aggregate
const char *(* snapshot_get_id)(struct stasis_message *message)
Callback extract a unique identity from a snapshot message.
#define ao2_link(container, obj)
Add an object to a container.