77 #define STATE_BUCKETS 57
116 ast_assert(
id != NULL && *(
id + 1) !=
'\0');
121 static void state_dtor(
void *obj)
126 ao2_cleanup(state->
topic);
128 ao2_cleanup(state->
msg);
136 static void state_proxy_dtor(
void *obj) {
137 struct stasis_state_proxy *proxy = obj;
142 static void state_proxy_sub_cb(
void *obj,
void *data)
144 struct stasis_state_proxy *proxy = obj;
170 const char *file,
int line,
const char *func)
172 struct stasis_state_proxy *proxy = NULL;
177 ast_assert(state_topic != NULL);
180 id = state_id_by_topic(manager->
all_topic, state_topic);
213 state->
topic = state_topic;
216 proxy = ao2_t_weakproxy_alloc(
sizeof(*proxy) + strlen(
id) + 1, state_proxy_dtor,
id);
221 strcpy(proxy->
id,
id);
223 state->
id = proxy->
id;
236 if (ao2_t_weakproxy_set_object(proxy, state,
OBJ_NOLOCK,
"weakproxy link")) {
253 ast_log(LOG_ERROR,
"Unable to allocate state '%s' in manager '%s'\n",
271 #define state_find_or_add(manager, state_topic, id) __state_find_or_add(manager, state_topic, id, __FILE__, __LINE__, __PRETTY_FUNCTION__)
274 const char *file,
int line,
const char *func)
278 ao2_lock(manager->
states);
279 if (ast_strlen_zero(
id)) {
280 id = state_id_by_topic(manager->
all_topic, state_topic);
285 state = state_alloc(manager, state_topic,
id, file, line, func);
288 ao2_unlock(manager->
states);
293 static void state_manager_dtor(
void *obj)
299 char *container_name =
306 ao2_cleanup(manager->
states);
314 static void state_prnt_obj(
void *v_obj,
void *where,
ao2_prnt_fn *prnt)
329 manager = ao2_alloc_options(
sizeof(*manager), state_manager_dtor,
336 STATE_BUCKETS, stasis_state_proxy_hash_fn, NULL, stasis_state_proxy_cmp_fn);
355 char *container_name =
375 state = state_find_or_add(manager, NULL,
id);
380 topic = state->
topic;
392 static void subscriber_dtor(
void *obj)
406 ao2_lock(sub->
state);
408 ao2_unlock(sub->
state);
421 ast_log(LOG_ERROR,
"Unable to create subscriber to %s/%s\n",
426 sub->
state = state_find_or_add(manager, NULL,
id);
432 ao2_lock(sub->
state);
434 ao2_unlock(sub->
state);
458 ast_debug(3,
"Creating stasis state subscription to id '%s'. Topic: '%s':%p %d\n",
461 sub->
stasis_sub = stasis_subscribe_pool(topic, callback, data);
507 ao2_lock(sub->
state);
509 ao2_unlock(sub->
state);
525 static void publisher_dtor(
void *obj)
539 ast_log(LOG_ERROR,
"Unable to create publisher to %s/%s\n",
544 pub->
state = state_find_or_add(manager, NULL,
id);
565 ao2_lock(pub->
state);
567 ao2_unlock(pub->
state);
621 static void state_find_and_remove_eid(
struct stasis_state *state,
const struct ast_eid *eid)
644 state = state_find_or_add(manager, NULL,
id);
650 state_find_or_add_eid(state, eid);
674 ast_debug(5,
"Attempted to remove state for id '%s', but state not found\n",
id);
683 state_find_and_remove_eid(state, eid);
722 res = handler(state->
id, msg, data);
727 static int handle_stasis_state_proxy(
void *obj,
void *arg,
void *
data,
int flags)
733 res = handle_stasis_state(state, arg, data);
744 ast_assert(handler != NULL);
747 handle_stasis_state_proxy, handler, data);
750 static int handle_stasis_state_subscribed(
void *obj,
void *arg,
void *data,
int flags)
756 res = handle_stasis_state(state, arg, data);
767 ast_assert(handler != NULL);
770 handle_stasis_state_subscribed, handler, data);
Managed stasis state event interface.
struct stasis_forward * forward
#define AST_VECTOR_FREE(vec)
Deallocates this vector.
struct stasis_state * state
#define AST_VECTOR_RW_INIT(vec, size)
Initialize a vector with a read/write lock.
void( ao2_prnt_fn)(void *where, const char *fmt,...)
Print output.
Asterisk main include file. File version handling, generic pbx functions.
#define AST_VECTOR_REMOVE_ELEM_UNORDERED(vec, elem, cleanup)
Remove an element from a vector.
struct stasis_topic * topic
AO2_STRING_FIELD_HASH_FN(transport_monitor, key)
Hashing function for struct transport_monitor.
struct stasis_state_subscriber * stasis_state_subscribe_pool(struct stasis_state_manager *manager, const char *id, stasis_subscription_cb callback, void *data)
Add a subscriber, and subscribe to its underlying stasis topic.
struct stasis_state::@405 eids
struct stasis_state_subscriber * stasis_state_add_subscriber(struct stasis_state_manager *manager, const char *id)
Add a subscriber to the managed stasis state for the given id.
void * stasis_state_unsubscribe(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic and stasis state.
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and remove an implicit publisher.
void stasis_state_remove_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Remove an observer (will no longer receive managed state related events).
struct stasis_topic * stasis_state_subscriber_topic(struct stasis_state_subscriber *sub)
Retrieve the subscriber's topic.
The arg parameter is a search key, but is not an object.
struct ao2_container * observers
Registered global observers.
#define AST_VECTOR_RW_UNLOCK(vec)
Unlock vector.
const char * stasis_state_publisher_id(const struct stasis_state_publisher *pub)
Retrieve the publisher's underlying state's unique id.
struct stasis_state_manager * manager
The manager that owns and handles this state.
#define AST_VECTOR_APPEND(vec, elem)
Append an element to a vector, growing the vector if needed.
void * stasis_state_subscriber_data(struct stasis_state_subscriber *sub)
Retrieve the last known state stasis message payload for the subscriber.
Assume that the ao2_container is already locked.
AO2_STRING_FIELD_CMP_FN(transport_monitor, key)
Comparison function for struct transport_monitor.
#define AST_VECTOR_REMOVE_UNORDERED(vec, idx)
Remove an element from an unordered vector by index.
#define AST_VECTOR_RW_RDLOCK(vec)
Obtain read lock on vector.
int(* on_stasis_state)(const char *id, struct stasis_message *msg, void *user_data)
The delegate called for each managed state.
struct stasis_message * msg
const char * stasis_state_subscriber_id(const struct stasis_state_subscriber *sub)
Retrieve the underlying subscribed to state's unique id.
#define ao2_link_flags(container, obj, flags)
Add an object to a container.
int ao2_weakproxy_subscribe(void *weakproxy, ao2_weakproxy_notification_cb cb, void *data, int flags)
Request notification when weakproxy points to NULL.
static struct stasis_topic * manager_topic
A stasis_topic that all topics AMI cares about will be forwarded to.
void * stasis_state_unsubscribe_and_join(struct stasis_state_subscriber *sub)
Unsubscribe from the stasis topic, block until the final message is received, and then unsubscribe fr...
struct stasis_state_manager * stasis_state_manager_create(const char *topic_name)
Create a stasis state manager.
int stasis_state_add_observer(struct stasis_state_manager *manager, struct stasis_state_observer *observer)
Add an observer to receive managed state related events.
An Entity ID is essentially a MAC address, brief and unique.
int ast_eid_cmp(const struct ast_eid *eid1, const struct ast_eid *eid2)
Compare two EIDs.
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
struct stasis_topic * all_topic
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
void ao2_container_unregister(const char *name)
Unregister a container for CLI stats and integrity check.
int ao2_container_register(const char *name, struct ao2_container *self, ao2_prnt_obj_fn *prnt_obj)
Register a container for CLI stats and integrity check.
#define AST_VECTOR_INIT(vec, size)
Initialize a vector.
#define AST_VECTOR_GET_ADDR(vec, idx)
Get an address of element in a vector.
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed state call the given handler.
#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem)
Vector element cleanup that does nothing.
#define AST_VECTOR_RW_FREE(vec)
Deallocates this locked vector.
#define ao2_weakproxy_get_object(weakproxy, flags)
Get the object associated with weakproxy.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
struct stasis_state_publisher * stasis_state_add_publisher(struct stasis_state_manager *manager, const char *id)
Add a publisher to the managed state for the given id.
struct stasis_state_manager * manager
#define ast_debug(level,...)
Log a DEBUG message.
#define AST_VECTOR(name, type)
Define a vector structure.
struct stasis_topic * stasis_topic_create(const char *name)
Create a new topic.
#define ast_alloca(size)
call __builtin_alloca to ensure we get gcc builtin semantics
void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler, void *data)
For each managed, and explicitly subscribed state call the given handler.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
#define ao2_unlink(container, obj)
Remove an object from a container.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg)
Publish to a managed named by id topic, and add an implicit subscriber.
const char * stasis_topic_name(const struct stasis_topic *topic)
Return the name of a topic.
struct stasis_subscription * stasis_sub
struct stasis_subscription * stasis_state_subscriber_subscription(struct stasis_state_subscriber *sub)
Retrieve the stasis topic subscription if available.
struct stasis_subscription * stasis_unsubscribe(struct stasis_subscription *subscription)
Cancel a subscription.
static struct sorcery_test_observer observer
Global scope observer structure for testing.
void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Callback function type for Stasis subscriptions.
#define ao2_weakproxy_find(c, arg, flags, tag)
Perform an ao2_find on a container with ao2_weakproxy objects, returning the real object...
AST_VECTOR_RW(ast_sorcery_object_wizards, struct ast_sorcery_object_wizard *)
Interface for a sorcery object type wizards.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
#define AST_VECTOR_RW_WRLOCK(vec)
Obtain write lock on vector.
struct stasis_subscription * stasis_unsubscribe_and_join(struct stasis_subscription *subscription)
Cancel a subscription, blocking until the last message is processed.
struct stasis_topic * stasis_state_all_topic(struct stasis_state_manager *manager)
Retrieve the manager's topic (the topic that all state topics get forwarded to)
struct ast_eid ast_eid_default
Global EID.
#define AST_VECTOR_GET(vec, idx)
Get an element from a vector.
#define ao2_replace(dst, src)
Replace one object reference with another cleaning up the original.
struct stasis_state * state
static int force_inline attribute_pure ast_begins_with(const char *str, const char *prefix)
Checks whether a string begins with another.
unsigned int num_subscribers
struct ao2_container * states
struct stasis_topic * stasis_state_publisher_topic(struct stasis_state_publisher *pub)
Retrieve the publisher's topic.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
void stasis_state_publish(struct stasis_state_publisher *pub, struct stasis_message *msg)
Publish to a managed state (topic) using a publisher.
#define AST_VECTOR_SIZE(vec)
Get the number of elements in a vector.
struct stasis_topic * stasis_state_topic(struct stasis_state_manager *manager, const char *id)
Retrieve a managed topic creating one if not currently managed.