Asterisk - The Open Source Telephony Project  21.4.1
Data Structures | Macros | Functions
stasis_cache.c File Reference

Stasis Message API. More...

#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/hashtab.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/utils.h"
#include "asterisk/vector.h"

Go to the source code of this file.

Data Structures

struct  cache_dump_data
 
struct  cache_entry_key
 The key for an entry in the cache. More...
 
struct  cache_put_snapshots
 
struct  stasis_cache
 
struct  stasis_cache_entry
 
struct  stasis_caching_topic
 

Macros

#define NUM_CACHE_BUCKETS   563
 

Functions

static void cache_dtor (void *obj)
 
static int cache_dump_all_cb (void *obj, void *arg, int flags)
 
static int cache_dump_by_eid_cb (void *obj, void *arg, int flags)
 
static struct stasis_messagecache_entry_by_eid (const struct stasis_cache_entry *entry, const struct ast_eid *eid)
 
static int cache_entry_cmp (void *obj, void *arg, int flags)
 
static void cache_entry_compute_hash (struct cache_entry_key *key)
 
static struct stasis_cache_entrycache_entry_create (struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
 
static void cache_entry_dtor (void *obj)
 
static int cache_entry_dump (struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
 
static int cache_entry_hash (const void *obj, int flags)
 
static struct stasis_cache_entrycache_find (struct ao2_container *entries, struct stasis_message_type *type, const char *id)
 
static struct cache_put_snapshots cache_put (struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid, struct stasis_message *new_snapshot)
 
static struct stasis_messagecache_remove (struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
 
static struct stasis_messagecache_update (struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
 
static void caching_topic_exec (void *data, struct stasis_subscription *sub, struct stasis_message *message)
 
static void print_cache_entry (void *v_obj, void *where, ao2_prnt_fn *prnt)
 
static void stasis_cache_cleanup (void)
 
struct stasis_messagestasis_cache_clear_create (struct stasis_message *id_message)
 A message which instructs the caching topic to remove an entry from its cache. More...
 
struct stasis_cachestasis_cache_create (snapshot_get_id id_fn)
 Create a cache. More...
 
struct stasis_cachestasis_cache_create_full (snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
 Create a cache. More...
 
struct ao2_containerstasis_cache_dump (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump cached items to a subscription for the ast_eid_default entity. More...
 
struct ao2_containerstasis_cache_dump_all (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump all entity items from the cache to a subscription. More...
 
struct ao2_containerstasis_cache_dump_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
 Dump cached items to a subscription for a specific entity. More...
 
struct stasis_messagestasis_cache_entry_get_aggregate (struct stasis_cache_entry *entry)
 Get the aggregate cache entry snapshot. More...
 
struct stasis_messagestasis_cache_entry_get_local (struct stasis_cache_entry *entry)
 Get the local entity's cache entry snapshot. More...
 
struct stasis_messagestasis_cache_entry_get_remote (struct stasis_cache_entry *entry, int idx)
 Get a remote entity's cache entry snapshot by index. More...
 
struct stasis_messagestasis_cache_get (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve an item from the cache for the ast_eid_default entity. More...
 
struct ao2_containerstasis_cache_get_all (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve all matching entity items from the cache. More...
 
struct stasis_messagestasis_cache_get_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
 Retrieve an item from the cache for a specific entity. More...
 
int stasis_cache_init (void)
 
static void stasis_cache_update_dtor (void *obj)
 
int stasis_caching_accept_message_type (struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
 Indicate to a caching topic that we are interested in a message type. More...
 
struct stasis_topicstasis_caching_get_topic (struct stasis_caching_topic *caching_topic)
 Returns the topic of cached events from a caching topics. More...
 
int stasis_caching_set_filter (struct stasis_caching_topic *caching_topic, enum stasis_subscription_message_filter filter)
 Set the message type filtering level on a cache. More...
 
struct stasis_caching_topicstasis_caching_topic_create (struct stasis_topic *original_topic, struct stasis_cache *cache)
 Create a topic which monitors and caches messages from another topic. More...
 
static void stasis_caching_topic_dtor (void *obj)
 
struct stasis_caching_topicstasis_caching_unsubscribe (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic. More...
 
struct stasis_caching_topicstasis_caching_unsubscribe_and_join (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded. More...
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_cache_clear_type)
 
 STASIS_MESSAGE_TYPE_DEFN (stasis_cache_update_type)
 
static struct stasis_messageupdate_create (struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
 

Detailed Description

Stasis Message API.

Author
David M. Lee, II dlee@.nosp@m.digi.nosp@m.um.co.nosp@m.m

Definition in file stasis_cache.c.

Function Documentation

struct stasis_message* stasis_cache_clear_create ( struct stasis_message message)

A message which instructs the caching topic to remove an entry from its cache.

Parameters
messageMessage representative of the cache entry that should be cleared. This will become the data held in the stasis_cache_clear message.
Returns
Message which, when sent to a stasis_caching_topic, will clear the item from the cache.
Return values
NULLon error.
Since
12

Definition at line 778 of file stasis_cache.c.

References stasis_cache_clear_type(), and stasis_message_create().

Referenced by ast_delete_mwi_state_full(), ast_device_state_clear_cache(), and ast_endpoint_shutdown().

779 {
780  return stasis_message_create(stasis_cache_clear_type(), id_message);
781 }
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
struct stasis_cache* stasis_cache_create ( snapshot_get_id  id_fn)

Create a cache.

This is the backend store for a stasis_caching_topic. The cache is thread safe, allowing concurrent reads and writes.

The returned object is AO2 managed, so ao2_cleanup() when you're done.

Parameters
id_fnCallback to extract the id from a snapshot message.
Returns
New cache indexed by id_fn.
Return values
NULLon error
Since
12

Definition at line 360 of file stasis_cache.c.

References stasis_cache_create_full().

Referenced by mwi_init(), and stasis_cp_all_create().

361 {
362  return stasis_cache_create_full(id_fn, NULL, NULL);
363 }
struct stasis_cache * stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
Create a cache.
Definition: stasis_cache.c:334
struct stasis_cache* stasis_cache_create_full ( snapshot_get_id  id_fn,
cache_aggregate_calc_fn  aggregate_calc_fn,
cache_aggregate_publish_fn  aggregate_publish_fn 
)

Create a cache.

This is the backend store for a stasis_caching_topic. The cache is thread safe, allowing concurrent reads and writes.

The returned object is AO2 managed, so ao2_cleanup() when you're done.

Parameters
id_fnCallback to extract the id from a snapshot message.
aggregate_calc_fnCallback to calculate the aggregate cache entry.
aggregate_publish_fnCallback to publish the aggregate cache entry.
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Returns
New cache indexed by id_fn.
Return values
NULLon error
Since
12.2.0

Definition at line 334 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, AO2_ALLOC_OPT_LOCK_RWLOCK, ao2_container_alloc_hash, and cache.

Referenced by devstate_init(), and stasis_cache_create().

337 {
338  struct stasis_cache *cache;
339 
340  cache = ao2_alloc_options(sizeof(*cache), cache_dtor,
342  if (!cache) {
343  return NULL;
344  }
345 
347  NUM_CACHE_BUCKETS, cache_entry_hash, NULL, cache_entry_cmp);
348  if (!cache->entries) {
349  ao2_cleanup(cache);
350  return NULL;
351  }
352 
353  cache->id_fn = id_fn;
354  cache->aggregate_calc_fn = aggregate_calc_fn;
355  cache->aggregate_publish_fn = aggregate_publish_fn;
356 
357  return cache;
358 }
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
static struct sorcery_test_caching cache
Global scope caching structure for testing.
Definition: test_sorcery.c:178
struct ao2_container* stasis_cache_dump ( struct stasis_cache cache,
struct stasis_message_type type 
)

Dump cached items to a subscription for the ast_eid_default entity.

Parameters
cacheThe cache to query.
typeType of message to dump (any type if NULL).
Returns
ao2_container containing all matches (must be unreffed by caller)
Return values
NULLon allocation error
Since
12

Definition at line 736 of file stasis_cache.c.

References ast_eid_default, and stasis_cache_dump_by_eid().

Referenced by ast_ari_endpoints_list(), ast_ari_endpoints_list_by_tech(), and xmpp_init_event_distribution().

737 {
738  return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
739 }
struct ao2_container * stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
Dump cached items to a subscription for a specific entity.
Definition: stasis_cache.c:718
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
struct ao2_container* stasis_cache_dump_all ( struct stasis_cache cache,
struct stasis_message_type type 
)

Dump all entity items from the cache to a subscription.

Since
12.2.0
Parameters
cacheThe cache to query.
typeType of message to dump (any type if NULL).
Returns
ao2_container containing all matches (must be unreffed by caller)
Return values
NULLon allocation error

Definition at line 757 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_callback, ao2_container_alloc_list, OBJ_MULTIPLE, and OBJ_NODATA.

758 {
759  struct cache_dump_data cache_dump;
760 
761  ast_assert(cache != NULL);
762  ast_assert(cache->entries != NULL);
763 
764  cache_dump.eid = NULL;
765  cache_dump.type = type;
766  cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
767  if (!cache_dump.container) {
768  return NULL;
769  }
770 
771  ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_all_cb, &cache_dump);
772  return cache_dump.container;
773 }
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container, as described below.
Definition: astobj2.h:1693
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
struct ao2_container* stasis_cache_dump_by_eid ( struct stasis_cache cache,
struct stasis_message_type type,
const struct ast_eid eid 
)

Dump cached items to a subscription for a specific entity.

Since
12.2.0
Parameters
cacheThe cache to query.
typeType of message to dump (any type if NULL).
eidSpecific entity id to retrieve. NULL for aggregate.
Returns
ao2_container containing all matches (must be unreffed by caller)
Return values
NULLon allocation error

Definition at line 718 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_callback, ao2_container_alloc_list, OBJ_MULTIPLE, and OBJ_NODATA.

Referenced by stasis_cache_dump().

719 {
720  struct cache_dump_data cache_dump;
721 
722  ast_assert(cache != NULL);
723  ast_assert(cache->entries != NULL);
724 
725  cache_dump.eid = eid;
726  cache_dump.type = type;
727  cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
728  if (!cache_dump.container) {
729  return NULL;
730  }
731 
732  ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_by_eid_cb, &cache_dump);
733  return cache_dump.container;
734 }
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container, as described below.
Definition: astobj2.h:1693
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
struct stasis_message* stasis_cache_entry_get_aggregate ( struct stasis_cache_entry entry)

Get the aggregate cache entry snapshot.

Since
12.2.0
Parameters
entryCache entry to get the aggregate snapshot.
Note
A reference is not given to the returned pointer so don't unref it.
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values
Aggregate-snapshotin cache.
NULLif not present.

Definition at line 365 of file stasis_cache.c.

References stasis_cache_entry::aggregate.

366 {
367  return entry->aggregate;
368 }
struct stasis_message * aggregate
Definition: stasis_cache.c:176
struct stasis_message* stasis_cache_entry_get_local ( struct stasis_cache_entry entry)

Get the local entity's cache entry snapshot.

Since
12.2.0
Parameters
entryCache entry to get the local entity's snapshot.
Note
A reference is not given to the returned pointer so don't unref it.
Return values
Internal-snapshotin cache.
NULLif not present.

Definition at line 370 of file stasis_cache.c.

References stasis_cache_entry::local.

371 {
372  return entry->local;
373 }
struct stasis_message * local
Definition: stasis_cache.c:178
struct stasis_message* stasis_cache_entry_get_remote ( struct stasis_cache_entry entry,
int  idx 
)

Get a remote entity's cache entry snapshot by index.

Since
12.2.0
Parameters
entryCache entry to get a remote entity's snapshot.
idxWhich remote entity's snapshot to get.
Note
A reference is not given to the returned pointer so don't unref it.
Return values
Remote-entity-snapshotin cache.
NULLif not present.

Definition at line 375 of file stasis_cache.c.

References AST_VECTOR_GET, AST_VECTOR_SIZE, and stasis_cache_entry::remote.

376 {
377  if (idx < AST_VECTOR_SIZE(&entry->remote)) {
378  return AST_VECTOR_GET(&entry->remote, idx);
379  }
380  return NULL;
381 }
struct stasis_cache_entry::@402 remote
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
Definition: vector.h:680
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
Definition: vector.h:609
struct stasis_message* stasis_cache_get ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id 
)

Retrieve an item from the cache for the ast_eid_default entity.

The returned item is AO2 managed, so ao2_cleanup() when you're done with it.

Parameters
cacheThe cache to query.
typeType of message to retrieve.
idIdentity of the snapshot to retrieve.
Returns
Message from the cache.
Return values
NULLif message is not found.
Since
12

Definition at line 686 of file stasis_cache.c.

References ast_eid_default, and stasis_cache_get_by_eid().

Referenced by ast_endpoint_latest_snapshot(), and update_registry().

687 {
688  return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
689 }
struct stasis_message * stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
Retrieve an item from the cache for a specific entity.
Definition: stasis_cache.c:659
struct ast_eid ast_eid_default
Global EID.
Definition: options.c:93
struct ao2_container* stasis_cache_get_all ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id 
)

Retrieve all matching entity items from the cache.

Since
12.2.0
Parameters
cacheThe cache to query.
typeType of message to retrieve.
idIdentity of the snapshot to retrieve.
Returns
Container of matching items found.
Return values
NULLif error.

Definition at line 587 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, and ao2_container_alloc_list.

588 {
589  struct stasis_cache_entry *cached_entry;
590  struct ao2_container *found;
591 
592  ast_assert(cache != NULL);
593  ast_assert(cache->entries != NULL);
594  ast_assert(id != NULL);
595 
596  if (!type) {
597  return NULL;
598  }
599 
600  found = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
601  if (!found) {
602  return NULL;
603  }
604 
605  ao2_rdlock(cache->entries);
606 
607  cached_entry = cache_find(cache->entries, type, id);
608  if (cached_entry && cache_entry_dump(found, cached_entry)) {
609  ao2_cleanup(found);
610  found = NULL;
611  }
612 
613  ao2_unlock(cache->entries);
614 
615  ao2_cleanup(cached_entry);
616  return found;
617 }
#define ao2_container_alloc_list(ao2_options, container_options, sort_fn, cmp_fn)
Allocate and initialize a list container.
Definition: astobj2.h:1327
Generic container type.
Definition: stasis_cache.c:173
struct stasis_message* stasis_cache_get_by_eid ( struct stasis_cache cache,
struct stasis_message_type type,
const char *  id,
const struct ast_eid eid 
)

Retrieve an item from the cache for a specific entity.

The returned item is AO2 managed, so ao2_cleanup() when you're done with it.

Parameters
cacheThe cache to query.
typeType of message to retrieve.
idIdentity of the snapshot to retrieve.
eidSpecific entity id to retrieve. NULL for aggregate.
Note
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Returns
Message from the cache.
Return values
NULLif message is not found.
Since
12.2.0

Definition at line 659 of file stasis_cache.c.

References ao2_bump.

Referenced by ast_delete_mwi_state_full(), ast_device_state_clear_cache(), and stasis_cache_get().

660 {
661  struct stasis_cache_entry *cached_entry;
662  struct stasis_message *snapshot = NULL;
663 
664  ast_assert(cache != NULL);
665  ast_assert(cache->entries != NULL);
666  ast_assert(id != NULL);
667 
668  if (!type) {
669  return NULL;
670  }
671 
672  ao2_rdlock(cache->entries);
673 
674  cached_entry = cache_find(cache->entries, type, id);
675  if (cached_entry) {
676  snapshot = cache_entry_by_eid(cached_entry, eid);
677  ao2_bump(snapshot);
678  }
679 
680  ao2_unlock(cache->entries);
681 
682  ao2_cleanup(cached_entry);
683  return snapshot;
684 }
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
Definition: stasis_cache.c:173
int stasis_caching_accept_message_type ( struct stasis_caching_topic caching_topic,
struct stasis_message_type type 
)

Indicate to a caching topic that we are interested in a message type.

This will cause the caching topic to receive messages of the given message type. This enables internal filtering in the stasis message bus to reduce messages.

Parameters
caching_topicThe caching topic.
typeThe message type we wish to receive.
Return values
0on success
-1failure
Since
17.0.0

Definition at line 90 of file stasis_cache.c.

References stasis_cache_clear_type(), stasis_subscription_accept_message_type(), and stasis_subscription_change_type().

Referenced by devstate_init(), and stasis_cp_single_accept_message_type().

92 {
93  int res;
94 
95  if (!caching_topic) {
96  return -1;
97  }
98 
99  /* We wait to accept the stasis specific message types until now so that by default everything
100  * will flow to us.
101  */
104  res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
105 
106  return res;
107 }
struct stasis_message_type * stasis_cache_clear_type(void)
Message type for clearing a message from a stasis cache.
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, const struct stasis_message_type *type)
Indicate to a subscription that we are interested in a message type.
Definition: stasis.c:1023
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
struct stasis_topic* stasis_caching_get_topic ( struct stasis_caching_topic caching_topic)

Returns the topic of cached events from a caching topics.

Parameters
caching_topicThe caching topic.
Returns
The topic that publishes cache update events, along with passthrough events from the underlying topic.
Return values
NULLif caching_topic is NULL.
Since
12

Definition at line 85 of file stasis_cache.c.

Referenced by ast_device_state_topic_cached(), ast_mwi_topic_cached(), ast_presence_state_topic_cached(), stasis_cp_single_create(), and stasis_cp_single_topic_cached().

86 {
87  return caching_topic->topic;
88 }
int stasis_caching_set_filter ( struct stasis_caching_topic caching_topic,
enum stasis_subscription_message_filter  filter 
)

Set the message type filtering level on a cache.

This will cause the underlying subscription to filter messages according to the provided filter level. For example if selective is used then only messages matching those provided to stasis_subscription_accept_message_type will be raised to the subscription callback.

Parameters
caching_topicThe caching topic.
filterWhat filter to use
Return values
0on success
-1failure
Since
17.0.0

Definition at line 109 of file stasis_cache.c.

References stasis_subscription_set_filter().

Referenced by devstate_init(), and stasis_cp_single_set_filter().

111 {
112  if (!caching_topic) {
113  return -1;
114  }
115  return stasis_subscription_set_filter(caching_topic->sub, filter);
116 }
int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a subscription.
Definition: stasis.c:1077
static int filter(struct ast_channel *chan, const char *cmd, char *parse, char *buf, size_t len)
Definition: func_strings.c:807
struct stasis_caching_topic* stasis_caching_topic_create ( struct stasis_topic original_topic,
struct stasis_cache cache 
)

Create a topic which monitors and caches messages from another topic.

The idea is that some topics publish 'snapshots' of some other object's state that should be cached. When these snapshot messages are received, the cache is updated, and a stasis_cache_update() message is forwarded, which has both the original snapshot message and the new message.

The returned object is AO2 managed, so ao2_cleanup() when done with it.

Parameters
original_topicTopic publishing snapshot messages.
cacheBackend cache in which to keep snapshots.
Returns
New topic which changes snapshot messages to stasis_cache_update() messages, and forwards all other messages from the original topic.
Return values
NULLon error
Since
12

Definition at line 948 of file stasis_cache.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_container_register(), ao2_ref, ast_asprintf, ast_atomic_fetchadd_int(), cache, internal_stasis_subscribe(), stasis_topic_create(), and stasis_topic_name().

Referenced by devstate_init(), mwi_init(), and stasis_cp_sink_create().

949 {
950  struct stasis_caching_topic *caching_topic;
951  static int caching_id;
952  char *new_name;
953  int ret;
954 
955  ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
956  if (ret < 0) {
957  return NULL;
958  }
959 
960  caching_topic = ao2_alloc_options(sizeof(*caching_topic),
961  stasis_caching_topic_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
962  if (caching_topic == NULL) {
963  ast_free(new_name);
964 
965  return NULL;
966  }
967 
968  caching_topic->topic = stasis_topic_create(new_name);
969  if (caching_topic->topic == NULL) {
970  ao2_ref(caching_topic, -1);
971  ast_free(new_name);
972 
973  return NULL;
974  }
975 
976  ao2_ref(cache, +1);
977  caching_topic->cache = cache;
978  if (!cache->registered) {
979  if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
980  ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
981  cache->entries, new_name);
982  } else {
983  cache->registered = 1;
984  }
985  }
986  ast_free(new_name);
987 
988  caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
989  if (caching_topic->sub == NULL) {
990  ao2_ref(caching_topic, -1);
991 
992  return NULL;
993  }
994 
995  ao2_ref(original_topic, +1);
996  caching_topic->original_topic = original_topic;
997 
998  /* The subscription holds the reference, so no additional ref bump. */
999  return caching_topic;
1000 }
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
Definition: lock.h:757
struct stasis_subscription * internal_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool, const char *file, int lineno, const char *func)
Create a subscription.
Definition: stasis.c:856
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
Definition: astmm.h:267
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
Definition: stasis.c:617
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
Definition: stasis.c:627
static struct sorcery_test_caching cache
Global scope caching structure for testing.
Definition: test_sorcery.c:178
struct stasis_caching_topic* stasis_caching_unsubscribe ( struct stasis_caching_topic caching_topic)

Unsubscribes a caching topic from its upstream topic.

This function returns immediately, so be sure to cleanup when stasis_subscription_final_message() is received.

Parameters
caching_topicCaching topic to unsubscribe
Return values
NULLfor convenience
Since
12

Definition at line 119 of file stasis_cache.c.

References ao2_ref, stasis_subscription_is_subscribed(), and stasis_unsubscribe().

Referenced by stasis_caching_unsubscribe_and_join(), and stasis_cp_single_unsubscribe().

120 {
121  if (!caching_topic) {
122  return NULL;
123  }
124 
125  /*
126  * The subscription may hold the last reference to this caching
127  * topic, but we want to make sure the unsubscribe finishes
128  * before kicking of the caching topic's dtor.
129  */
130  ao2_ref(caching_topic, +1);
131 
132  if (stasis_subscription_is_subscribed(caching_topic->sub)) {
133  /*
134  * Increment the reference to hold on to it past the
135  * unsubscribe. Will be cleaned up in dtor.
136  */
137  ao2_ref(caching_topic->sub, +1);
138  stasis_unsubscribe(caching_topic->sub);
139  } else {
140  ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
141  }
142  ao2_cleanup(caching_topic);
143  return NULL;
144 }
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
Returns whether a subscription is currently subscribed.
Definition: stasis.c:1150
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
Definition: stasis.c:971
struct stasis_caching_topic* stasis_caching_unsubscribe_and_join ( struct stasis_caching_topic caching_topic)

Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded.

See stasis_unsubscribe_and_join() for more info on when to use this as opposed to stasis_caching_unsubscribe().

Parameters
caching_topicCaching topic to unsubscribe
Return values
NULLfor convenience
Since
12

Definition at line 146 of file stasis_cache.c.

References ao2_ref, stasis_caching_unsubscribe(), and stasis_subscription_join().

147 {
148  if (!caching_topic) {
149  return NULL;
150  }
151 
152  /* Hold a ref past the unsubscribe */
153  ao2_ref(caching_topic, +1);
154  stasis_caching_unsubscribe(caching_topic);
155  stasis_subscription_join(caching_topic->sub);
156  ao2_cleanup(caching_topic);
157  return NULL;
158 }
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
void stasis_subscription_join(struct stasis_subscription *subscription)
Block until the last message is processed on a subscription.
Definition: stasis.c:1105
struct stasis_caching_topic * stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
Unsubscribes a caching topic from its upstream topic.
Definition: stasis_cache.c:119