35 #include "asterisk/stasis_channels.h"
37 #include "asterisk/stasis_message_router.h"
42 #define ENDPOINT_CHANNEL_BUCKETS 127
45 #define ENDPOINT_BUCKETS 127
48 #define TECH_ENDPOINT_BUCKETS 11
84 struct ast_endpoint *endpoint = ao2_find(endpoints,
id,
OBJ_KEY);
87 endpoint = ao2_find(tech_endpoints,
id,
OBJ_KEY);
122 static void endpoint_publish_snapshot(
struct ast_endpoint *endpoint)
127 ast_assert(endpoint != NULL);
128 ast_assert(endpoint->
topics != NULL);
145 static void endpoint_dtor(
void *obj)
151 ao2_cleanup(endpoint->
router);
167 ast_assert(chan != NULL);
168 ast_assert(endpoint != NULL);
169 ast_assert(!ast_strlen_zero(endpoint->
resource));
175 ao2_unlock(endpoint);
177 endpoint_publish_snapshot(endpoint);
195 ast_assert(endpoint != NULL);
199 ao2_unlock(endpoint);
200 endpoint_publish_snapshot(endpoint);
203 static void endpoint_subscription_change(
void *data,
207 struct stasis_endpoint *endpoint = data;
210 ao2_cleanup(endpoint);
221 if (!ast_strlen_zero(resource)) {
222 tech_endpoint = ao2_find(tech_endpoints, tech,
OBJ_KEY);
223 if (!tech_endpoint) {
224 tech_endpoint = endpoint_internal_create(tech, NULL);
225 if (!tech_endpoint) {
231 endpoint = ao2_alloc(
sizeof(*endpoint), endpoint_dtor);
236 endpoint->max_channels = -1;
246 !ast_strlen_zero(resource) ?
"/" :
"",
253 if (!endpoint->channel_ids) {
257 if (!ast_strlen_zero(resource)) {
261 ret =
ast_asprintf(&topic_name,
"endpoint:%s", endpoint->id);
268 ast_free(topic_name);
269 if (!endpoint->topics) {
276 if (!endpoint->router) {
292 endpoint_publish_snapshot(endpoint);
298 ret =
ast_asprintf(&topic_name,
"endpoint:%s", endpoint->id);
305 ast_free(topic_name);
306 if (!endpoint->topics) {
321 if (ast_strlen_zero(tech)) {
322 ast_log(LOG_ERROR,
"Endpoint tech cannot be empty\n");
326 if (ast_strlen_zero(resource)) {
327 ast_log(LOG_ERROR,
"Endpoint resource cannot be empty\n");
331 return endpoint_internal_create(tech, resource);
354 if (endpoint == NULL) {
361 clear_msg = create_endpoint_snapshot_message(endpoint);
380 return endpoint->
tech;
404 return endpoint->
state;
410 ast_assert(endpoint != NULL);
411 ast_assert(!ast_strlen_zero(endpoint->
resource));
414 endpoint->
state = state;
415 ao2_unlock(endpoint);
416 endpoint_publish_snapshot(endpoint);
422 ast_assert(endpoint != NULL);
423 ast_assert(!ast_strlen_zero(endpoint->
resource));
427 ao2_unlock(endpoint);
428 endpoint_publish_snapshot(endpoint);
431 static void endpoint_snapshot_dtor(
void *obj)
436 ast_assert(snapshot != NULL);
438 for (channel = 0; channel < snapshot->
num_channels; channel++) {
454 ast_assert(endpoint != NULL);
455 ast_assert(!ast_strlen_zero(endpoint->
resource));
459 snapshot = ao2_alloc_options(
460 sizeof(*snapshot) + channel_count *
sizeof(
char *),
461 endpoint_snapshot_dtor,
465 ao2_cleanup(snapshot);
478 while ((obj = ao2_iterator_next(&i))) {
487 static void endpoint_cleanup(
void)
489 ao2_cleanup(endpoints);
492 ao2_cleanup(tech_endpoints);
493 tech_endpoints = NULL;
501 ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
508 if (!tech_endpoints) {
struct stasis_topic * stasis_cp_single_topic_cached(struct stasis_cp_single *one)
Get the caching topic for this instance.
Main Channel structure associated with a channel.
struct ast_channel_snapshot_base * base
Asterisk main include file. File version handling, generic pbx functions.
AO2_STRING_FIELD_HASH_FN(transport_monitor, key)
Hashing function for struct transport_monitor.
int ao2_container_count(struct ao2_container *c)
Returns the number of elements in a container.
int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data)
Add a route to a message router.
struct ast_endpoint_snapshot * ast_endpoint_snapshot_create(struct ast_endpoint *endpoint)
Create a snapshot of an endpoint.
const char * ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
Gets the resource name of the given endpoint.
struct ao2_container * channel_ids
int ast_channel_forward_endpoint(struct ast_channel *chan, struct ast_endpoint *endpoint)
Forward channel stasis messages to the given endpoint.
Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.
struct stasis_message_type * ast_endpoint_snapshot_type(void)
Message type for ast_endpoint_snapshot.
struct ast_endpoint * ast_endpoint_create(const char *tech, const char *resource)
Create an endpoint struct.
#define ENDPOINT_CHANNEL_BUCKETS
struct stasis_forward * tech_forward
struct ast_endpoint * ast_endpoint_find_by_id(const char *id)
Finds the endpoint with the given tech[/resource] id.
AO2_STRING_FIELD_CMP_FN(transport_monitor, key)
Comparison function for struct transport_monitor.
struct stasis_topic * ast_endpoint_topic(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
Stops caching and forwarding messages.
int ast_endpoint_add_channel(struct ast_endpoint *endpoint, struct ast_channel *chan)
Adds a channel to the given endpoint.
void ast_str_container_remove(struct ao2_container *str_container, const char *remove)
Removes a string from a string container allocated by ast_str_container_alloc.
const ast_string_field uniqueid
struct stasis_message * stasis_cache_clear_create(struct stasis_message *message)
A message which instructs the caching topic to remove an entry from its cache.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define AST_DECLARE_STRING_FIELDS(field_list)
Declare the fields needed in a structure.
const ast_string_field resource
Structure representing a change of snapshot of channel state.
int stasis_message_router_is_done(struct stasis_message_router *router)
Returns whether router has received its final message.
ast_endpoint_state
Valid states for an endpoint.
struct stasis_topic * ast_endpoint_topic_all_cached(void)
Cached topic for all endpoint related messages.
struct stasis_cp_single * stasis_cp_single_create(struct stasis_cp_all *all, const char *name)
Create the 'one' side of the cache pattern.
#define ast_asprintf(ret, fmt,...)
A wrapper for asprintf()
enum ast_endpoint_state ast_endpoint_get_state(const struct ast_endpoint *endpoint)
Gets the state of the given endpoint.
int ast_register_cleanup(void(*func)(void))
Register a function to be executed before Asterisk gracefully exits.
struct ao2_container * ast_str_container_alloc_options(enum ao2_alloc_opts opts, int buckets)
Allocates a hash container for bare strings.
#define ast_string_field_init(x, size)
Initialize a field pool and fields.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define AST_STRING_FIELD(name)
Declare a string field.
const char * ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
Gets the technology of the given endpoint.
#define TECH_ENDPOINT_BUCKETS
A snapshot of an endpoint's state.
struct stasis_topic * ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
Returns the topic for a specific endpoint.
const ast_string_field id
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
#define ao2_unlink(container, obj)
Remove an object from a container.
void * stasis_message_data(const struct stasis_message *msg)
Get the data contained in a message.
struct stasis_message * stasis_message_create(struct stasis_message_type *type, void *data)
Create a new message.
enum ast_endpoint_state state
struct stasis_message_router * router
const ast_string_field tech
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
Determine whether a message is the final message to be received on a subscription.
static struct stasis_rest_handlers endpoints
REST handler for /api-docs/endpoints.json.
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
Publish a message to a topic's subscribers.
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
struct stasis_topic * stasis_cp_single_topic(struct stasis_cp_single *one)
Get the topic for this instance.
struct stasis_topic * ast_endpoint_topic_all(void)
Topic for all endpoint related messages.
Prototypes for public functions only of internal interest.
struct ast_channel_snapshot * new_snapshot
struct stasis_message_type * ast_channel_snapshot_type(void)
Message type for ast_channel_snapshot_update.
#define ast_string_field_build(x, field, fmt, args...)
Set a field to a complex (built) value.
void ast_endpoint_set_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state)
Updates the state of the given endpoint.
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
Unsubscribe the router from the upstream topic.
int stasis_cp_single_set_filter(struct stasis_cp_single *one, enum stasis_subscription_message_filter filter)
Set the message type filtering level on a cache.
int max_channels
Max channels for this endpoint. -1 means unlimited or unknown.
void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint, int max_channels)
Updates the maximum number of channels an endpoint supports.
When we need to walk through a container, we use an ao2_iterator to keep track of the current position...
int ast_endpoint_init(void)
Endpoint support initialization.
#define S_OR(a, b)
Returns the equivalent of logical OR for strings: the first one if not empty, otherwise the second one...
struct stasis_message_type * stasis_subscription_change_type(void)
Gets the message type for subscription change notices.
const char * ast_endpoint_state_to_string(enum ast_endpoint_state state)
Returns a string representation of the given endpoint state.
int stasis_cp_single_accept_message_type(struct stasis_cp_single *one, struct stasis_message_type *type)
Indicate to an instance that we are interested in a message type.
struct stasis_forward * stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
Create a subscription which forwards all messages from one topic to another.
void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
Shuts down an ast_endpoint.
#define RAII_VAR(vartype, varname, initval, dtor)
Declare a variable that will call a destructor function when it goes out of scope.
#define ast_string_field_free_memory(x)
free all memory - to be called before destroying the object
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
enum ast_endpoint_state state
int ast_str_container_add(struct ao2_container *str_container, const char *add)
Adds a string to a string container allocated by ast_str_container_alloc.
static void endpoint_cache_clear(void *data, struct stasis_subscription *sub, struct stasis_message *message)
Handler for channel snapshot update.
struct stasis_cp_single * topics
const char * ast_endpoint_get_id(const struct ast_endpoint *endpoint)
Gets the tech/resource id of the given endpoint.
struct stasis_cp_single * stasis_cp_sink_create(struct stasis_cp_all *all, const char *name)
Create a sink in the cache pattern.
#define ast_string_field_set(x, field, data)
Set a field to a simple string value.
#define ao2_link(container, obj)
Add an object to a container.