Asterisk - The Open Source Telephony Project  21.4.1
aeap.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2021, Sangoma Technologies Corporation
5  *
6  * Kevin Harwell <kharwell@sangoma.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
19 #include "asterisk.h"
20 
21 #include <pthread.h>
22 
23 #include "asterisk/astobj2.h"
24 #include "asterisk/strings.h"
25 
26 #include "asterisk/res_aeap.h"
28 
29 #include "logger.h"
30 #include "transaction.h"
31 #include "transport.h"
32 
33 #define AEAP_RECV_SIZE 32768
34 
/*!
 * \brief A registered user data entry
 *
 * Pairs a user supplied object with the identifier it is registered under.
 * The identifier is stored in the trailing \c id member, so an entry must be
 * allocated with room for the identifier string after the structure.
 */
struct aeap_user_data {
	/*! The user data object */
	void *obj;
	/*! A user data identifier */
	char id[0];
};
41 
44 
45 #define USER_DATA_BUCKETS 11
46 
/*!
 * \brief An Asterisk external application object
 *
 * Holds the configuration, the user data and transaction containers, the
 * transport, and the id of the thread that reads from that transport.
 */
struct ast_aeap {
	/*! This object's configuration parameters */
	const struct ast_aeap_params *params;
	/*! Container for registered user data objects */
	struct ao2_container *user_data;
	/*! Transactions container */
	struct ao2_container *transactions;
	/*! Transport layer communicator */
	struct aeap_transport *transport;
	/*! Id of thread that reads data from the transport */
	pthread_t read_thread_id;
};
59 
/*!
 * \brief Container callback that ends the visited transaction with a result of -1
 *
 * \param obj The transaction to end
 * \param arg Unused
 * \param flags Unused
 *
 * \return Always 0
 */
static int tsx_end(void *obj, void *arg, int flags)
{
	struct aeap_transaction *tsx = obj;

	aeap_transaction_end(tsx, -1);
	return 0;
}
66 
/*!
 * \brief Destructor for a struct ast_aeap
 *
 * Disconnects the object, destroys its transport, and then releases the
 * transactions and user data containers, in that order.
 *
 * \param obj The struct ast_aeap being destroyed
 */
67 static void aeap_destructor(void *obj)
68 {
69  struct ast_aeap *aeap = obj;
70 
71  /* Disconnect things first, which keeps transactions from further executing */
72  ast_aeap_disconnect(aeap);
73 
74  aeap_transport_destroy(aeap->transport);
75 
76  /*
77  * Each contained transaction holds a pointer back to this transactions container,
78  * which is removed upon transaction end. Thus by explicitly ending each transaction
79  * here we can ensure all references to the transactions container are removed.
80  */
/* NOTE(review): the start of the call these two arguments belong to is not visible in this listing */
82  tsx_end, NULL);
83  ao2_cleanup(aeap->transactions);
84 
85  ao2_cleanup(aeap->user_data);
86 }
87 
88 struct ast_aeap *ast_aeap_create(const char *transport_type,
89  const struct ast_aeap_params *params)
90 {
91  struct ast_aeap *aeap;
92 
93  aeap = ao2_alloc(sizeof(*aeap), aeap_destructor);
94  if (!aeap) {
95  ast_log(LOG_ERROR, "AEAP: unable to create");
96  return NULL;
97  }
98 
99  aeap->params = params;
100  aeap->read_thread_id = AST_PTHREADT_NULL;
101 
102  aeap->user_data = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, USER_DATA_BUCKETS,
103  aeap_user_data_hash_fn, NULL, aeap_user_data_cmp_fn);
104  if (!aeap->user_data) {
105  aeap_error(aeap, NULL, "unable to create user data container");
106  ao2_ref(aeap, -1);
107  return NULL;
108  }
109 
110  aeap->transactions = aeap_transactions_create();
111  if (!aeap->transactions) {
112  aeap_error(aeap, NULL, "unable to create transactions container");
113  ao2_ref(aeap, -1);
114  return NULL;
115  }
116 
117  aeap->transport = aeap_transport_create(transport_type);
118  if (!aeap->transport) {
119  aeap_error(aeap, NULL, "unable to create transport");
120  ao2_ref(aeap, -1);
121  return NULL;
122  }
123 
124  return aeap;
125 }
126 
/*!
 * \brief Allocate a user data entry that pairs an object with its identifier
 *
 * The identifier is copied into the entry's trailing id member. If the
 * allocation fails and a cleanup callback was supplied, the callback is
 * invoked on obj before returning.
 *
 * NOTE(review): the end of the parameter list and the remaining allocation
 * arguments are not visible in this listing.
 * NOTE(review): cleanup is handed to the allocator as its second argument;
 * confirm which pointer it receives when the entry itself is destroyed.
 *
 * \param id Identifier for the entry (asserted to be non-NULL)
 * \param obj The user object to store
 *
 * \return The new entry, or NULL if allocation failed
 */
127 static struct aeap_user_data *aeap_user_data_create(const char *id, void *obj,
129 {
130  struct aeap_user_data *data;
131 
132  ast_assert(id != NULL);
133 
134  data = ao2_t_alloc_options(sizeof(*data) + strlen(id) + 1, cleanup,
136  if (!data) {
137  if (cleanup) {
138  cleanup(obj);
139  }
140 
141  return NULL;
142  }
143 
144  strcpy(data->id, id); /* safe: the allocation size requested strlen(id) + 1 extra bytes */
145  data->obj = obj;
146 
147  return data;
148 }
149 
150 int ast_aeap_user_data_register(struct ast_aeap *aeap, const char *id, void *obj,
152 {
153  struct aeap_user_data *data;
154 
155  data = aeap_user_data_create(id, obj, cleanup);
156  if (!data) {
157  return -1;
158  }
159 
160  if (!ao2_link(aeap->user_data, data)) {
161  ao2_ref(data, -1);
162  return -1;
163  }
164 
165  ao2_ref(data, -1);
166  return 0;
167 }
168 
169 void ast_aeap_user_data_unregister(struct ast_aeap *aeap, const char *id)
170 {
171  ao2_find(aeap->user_data, id, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
172 }
173 
174 void *ast_aeap_user_data_object_by_id(struct ast_aeap *aeap, const char *id)
175 {
176  struct aeap_user_data *data;
177  void *obj;
178 
179  data = ao2_find(aeap->user_data, id, OBJ_SEARCH_KEY);
180  if (!data) {
181  return NULL;
182  }
183 
184  obj = data->obj;
185  ao2_ref(data, -1);
186 
187  /*
188  * Returned object's lifetime is based on how it was registered.
189  * See public function docs for more info
190  */
191  return obj;
192 }
193 
194 static int raise_msg_handler(struct ast_aeap *aeap, const struct ast_aeap_message_handler *handlers,
195  size_t size, struct ast_aeap_message *msg, void *data)
196 {
197  ast_aeap_on_message on_message = NULL;
198  size_t i;
199 
200  if (!aeap->params->emit_error) {
201  const char *error_msg = ast_aeap_message_error_msg(msg);
202 
203  if (error_msg) {
204  aeap_error(aeap, NULL, "%s", error_msg);
205  return -1;
206  }
207 
208  /* If no error_msg then it's assumed this is not an error message */
209  }
210 
211  for (i = 0; i < size; ++i) {
212  if (ast_strlen_zero(handlers[i].name)) {
213  /* A default handler is specified. Use it if no other match is found */
214  on_message = handlers[i].on_message;
215  continue;
216  }
217 
218  if (ast_aeap_message_is_named(msg, handlers[i].name)) {
219  on_message = handlers[i].on_message;
220  break;
221  }
222  }
223 
224  if (on_message) {
225  return on_message(aeap, msg, data);
226  }
227 
228  /* Respond with un-handled error */
231  "Unsupported and/or un-handled message"));
232 
233  return 0;
234 }
235 
236 static void raise_msg(struct ast_aeap *aeap, const void *buf, intmax_t size,
237  enum AST_AEAP_DATA_TYPE serial_type)
238 {
239  struct ast_aeap_message *msg;
240  struct aeap_transaction *tsx;
241  int res = 0;
242 
243  if (!aeap->params || !aeap->params->msg_type ||
244  ast_aeap_message_serial_type(aeap->params->msg_type) != serial_type ||
245  !(msg = ast_aeap_message_deserialize(aeap->params->msg_type, buf, size))) {
246  return;
247  }
248 
249  /* See if this msg is involved in a transaction */
250  tsx = aeap_transaction_get(aeap->transactions, ast_aeap_message_id(msg));
251 
252  /* If so go ahead and cancel the timeout timer */
253  aeap_transaction_cancel_timer(tsx);
254 
256  res = raise_msg_handler(aeap, aeap->params->request_handlers, aeap->params->request_handlers_size,
257  msg, tsx ? aeap_transaction_user_obj(tsx) : NULL);
258  } else if (aeap->params->response_handlers && ast_aeap_message_is_response(msg)) {
259  res = raise_msg_handler(aeap, aeap->params->response_handlers, aeap->params->response_handlers_size,
260  msg, tsx ? aeap_transaction_user_obj(tsx) : NULL);
261  }
262 
263  /* Complete transaction (Note, removes tsx ref) */
264  aeap_transaction_end(tsx, res);
265 
266  ao2_ref(msg, -1);
267 }
268 
269 static void *aeap_receive(void *data)
270 {
271  struct ast_aeap *aeap = data;
272  void *buf;
273 
274  buf = ast_calloc(1, AEAP_RECV_SIZE);
275  if (!buf) {
276  aeap_error(aeap, NULL, "unable to create read buffer");
277  goto aeap_receive_error;
278  }
279 
280  while (aeap_transport_is_connected(aeap->transport)) {
281  enum AST_AEAP_DATA_TYPE rtype;
282  intmax_t size;
283 
284  size = aeap_transport_read(aeap->transport, buf, AEAP_RECV_SIZE, &rtype);
285  if (size < 0) {
286  goto aeap_receive_error;
287  }
288 
289  if (!size) {
290  continue;
291  }
292 
293  switch (rtype) {
294  case AST_AEAP_DATA_TYPE_BINARY:
295  if (aeap->params && aeap->params->on_binary) {
296  aeap->params->on_binary(aeap, buf, size);
297  }
298  break;
299  case AST_AEAP_DATA_TYPE_STRING:
300  ast_debug(3, "AEAP: received message: %s\n", (char *)buf);
301  if (aeap->params && aeap->params->on_string) {
302  aeap->params->on_string(aeap, (const char *)buf, size - 1);
303  }
304  break;
305  default:
306  break;
307  }
308 
309  raise_msg(aeap, buf, size, rtype);
310  };
311 
312  ast_free(buf);
313  return NULL;
314 
315 aeap_receive_error:
316  /*
317  * An unrecoverable error occurred so ensure the aeap and transport reset
318  * to a disconnected state. We don't want this thread to "join" itself so set
319  * its id to NULL prior to disconnecting.
320  */
321  aeap_error(aeap, NULL, "unrecoverable read error, disconnecting");
322 
323  ao2_lock(aeap);
324  aeap->read_thread_id = AST_PTHREADT_NULL;
325  ao2_unlock(aeap);
326 
327  ast_aeap_disconnect(aeap);
328 
329  ast_free(buf);
330 
331  if (aeap->params && aeap->params->on_error) {
332  aeap->params->on_error(aeap);
333  }
334 
335  return NULL;
336 }
337 
338 int ast_aeap_connect(struct ast_aeap *aeap, const char *url, const char *protocol, int timeout)
339 {
340  SCOPED_AO2LOCK(lock, aeap);
341 
342  if (aeap_transport_is_connected(aeap->transport)) {
343  /* Should already be connected, so nothing to do */
344  return 0;
345  }
346 
347  if (aeap_transport_connect(aeap->transport, url, protocol, timeout)) {
348  aeap_error(aeap, NULL, "unable to connect transport");
349  return -1;
350  }
351 
352  if (ast_pthread_create_background(&aeap->read_thread_id, NULL,
353  aeap_receive, aeap)) {
354  aeap_error(aeap, NULL, "unable to start read thread: %s",
355  strerror(errno));
356  ast_aeap_disconnect(aeap);
357  return -1;
358  }
359 
360  return 0;
361 }
362 
/*!
 * \brief Create and connect to an Asterisk external application
 *
 * \param type The type of transport to create
 * \param params Callbacks and other parameters for the new object
 * \param url The url to connect to
 * \param protocol The protocol to use for the connection
 * \param timeout The connect timeout
 *
 * \return A connected object, or NULL if either creating or connecting failed
 */
struct ast_aeap *ast_aeap_create_and_connect(const char *type,
	const struct ast_aeap_params *params, const char *url, const char *protocol, int timeout)
{
	struct ast_aeap *aeap = ast_aeap_create(type, params);

	if (aeap && ast_aeap_connect(aeap, url, protocol, timeout)) {
		/* Could not connect, so give up the only reference */
		ao2_ref(aeap, -1);
		aeap = NULL;
	}

	return aeap;
}
380 
381 int ast_aeap_disconnect(struct ast_aeap *aeap)
382 {
383  ao2_lock(aeap);
384 
385  aeap_transport_disconnect(aeap->transport);
386 
387  if (aeap->read_thread_id != AST_PTHREADT_NULL) {
388  /*
389  * The read thread calls disconnect if an error occurs, so
390  * unlock the aeap before "joining" to avoid a deadlock.
391  */
392  ao2_unlock(aeap);
393  pthread_join(aeap->read_thread_id, NULL);
394  ao2_lock(aeap);
395 
396  aeap->read_thread_id = AST_PTHREADT_NULL;
397  }
398 
399  ao2_unlock(aeap);
400 
401  return 0;
402 }
403 
404 static int aeap_send(struct ast_aeap *aeap, const void *buf, uintmax_t size,
405  enum AST_AEAP_DATA_TYPE type)
406 {
407  intmax_t num;
408 
409  num = aeap_transport_write(aeap->transport, buf, size, type);
410 
411  if (num == 0) {
412  /* Nothing written, could be disconnected */
413  return 0;
414  }
415 
416  if (num < 0) {
417  aeap_error(aeap, NULL, "error sending data");
418  return -1;
419  }
420 
421  if (num < size) {
422  aeap_error(aeap, NULL, "not all data sent");
423  return -1;
424  }
425 
426  if (num > size) {
427  aeap_error(aeap, NULL, "sent data truncated");
428  return -1;
429  }
430 
431  return 0;
432 }
433 
434 int ast_aeap_send_binary(struct ast_aeap *aeap, const void *buf, uintmax_t size)
435 {
436  return aeap_send(aeap, buf, size, AST_AEAP_DATA_TYPE_BINARY);
437 }
438 
439 int ast_aeap_send_msg(struct ast_aeap *aeap, struct ast_aeap_message *msg)
440 {
441  void *buf;
442  intmax_t size;
443  int res;
444 
445  if (!msg) {
446  aeap_error(aeap, NULL, "no message to send");
447  return -1;
448  }
449 
450  if (ast_aeap_message_serialize(msg, &buf, &size)) {
451  aeap_error(aeap, NULL, "unable to serialize outgoing message");
452  ao2_ref(msg, -1);
453  return -1;
454  }
455 
456  res = aeap_send(aeap, buf, size, msg->type->serial_type);
457 
458  ast_free(buf);
459  ao2_ref(msg, -1);
460 
461  return res;
462 }
463 
465 {
466  struct aeap_transaction *tsx = NULL;
467  int res = 0;
468 
469  if (!params) {
470  return -1;
471  }
472 
473  if (!params->msg) {
474  aeap_transaction_params_cleanup(params);
475  aeap_error(aeap, NULL, "no message to send");
476  return -1;
477  }
478 
479  /* The transaction will take over params cleanup, which includes the msg reference */
480  tsx = aeap_transaction_create_and_add(aeap->transactions,
481  ast_aeap_message_id(params->msg), params, aeap);
482  if (!tsx) {
483  return -1;
484  }
485 
486  if (ast_aeap_send_msg(aeap, ao2_bump(params->msg))) {
487  aeap_transaction_end(tsx, -1); /* Removes container, and tsx ref */
488  return -1;
489  }
490 
491  if (aeap_transaction_start(tsx)) {
492  aeap_transaction_end(tsx, -1); /* Removes container, and tsx ref */
493  return -1;
494  }
495 
496  res = aeap_transaction_result(tsx);
497 
498  ao2_ref(tsx, -1);
499 
500  return res;
501 }
enum AST_AEAP_DATA_TYPE ast_aeap_message_serial_type(const struct ast_aeap_message_type *type)
Retrieve the serial type of a message type.
Asterisk External Application Protocol API.
void(* on_binary)(struct ast_aeap *aeap, const void *buf, intmax_t size)
Raised when binary data is received.
Definition: res_aeap.h:171
Callbacks and other parameters used by an Asterisk external application object.
Definition: res_aeap.h:144
void ast_aeap_user_data_unregister(struct ast_aeap *aeap, const char *id)
Un-register a user data object.
Definition: aeap.c:169
Asterisk main include file. File version handling, generic pbx functions.
AO2_STRING_FIELD_HASH_FN(transport_monitor, key)
Hashing function for struct transport_monitor.
void(* on_string)(struct ast_aeap *aeap, const char *buf, intmax_t size)
Raised when string data is received.
Definition: res_aeap.h:180
String manipulation functions.
Asterisk External Application Protocol Message API.
The arg parameter is a search key, but is not an object.
Definition: astobj2.h:1101
unsigned int emit_error
Definition: res_aeap.h:149
struct ao2_container * transactions
Definition: aeap.c:53
const struct ast_aeap_message_handler * request_handlers
Definition: res_aeap.h:160
#define ao2_callback(c, flags, cb_fn, arg)
ao2_callback() is a generic function that applies cb_fn() to all objects in a container, as described below.
Definition: astobj2.h:1693
int ast_aeap_message_is_named(const struct ast_aeap_message *message, const char *name)
Check whether or not a message's name matches the given one.
void(* ast_aeap_user_obj_cleanup)(void *obj)
Callback to cleanup a user object.
Definition: res_aeap.h:130
struct aeap_transport * transport
Definition: aeap.c:55
int(* ast_aeap_on_message)(struct ast_aeap *aeap, struct ast_aeap_message *message, void *obj)
Event raised when a message is received.
Definition: res_aeap.h:101
AO2_STRING_FIELD_CMP_FN(transport_monitor, key)
Comparison function for struct transport_monitor.
uintmax_t request_handlers_size
Definition: res_aeap.h:162
const struct ast_aeap_message_type * msg_type
Definition: res_aeap.h:152
#define ao2_t_alloc_options(data_size, destructor_fn, options, debug_msg)
Allocate and initialize an object.
Definition: astobj2.h:402
int ast_aeap_send_msg_tsx(struct ast_aeap *aeap, struct ast_aeap_tsx_params *params)
Send a transaction based message to an external application using the given parameters.
Definition: aeap.c:464
void * ast_aeap_user_data_object_by_id(struct ast_aeap *aeap, const char *id)
Retrieve a registered user data object by its id.
Definition: aeap.c:174
Parameters to be used when sending a transaction based message.
Definition: res_aeap.h:331
struct ast_aeap * ast_aeap_create(const char *type, const struct ast_aeap_params *params)
Create an Asterisk external application object.
Definition: aeap.c:88
struct ao2_container * user_data
Definition: aeap.c:51
const char * ast_aeap_message_id(const struct ast_aeap_message *message)
Retrieve a message id.
#define ao2_bump(obj)
Bump refcount on an AO2 object by one, returning the object.
Definition: astobj2.h:480
static void cleanup(void)
Clean up any old apps that we don't need any more.
Definition: res_stasis.c:327
ast_mutex_t lock
int ast_aeap_send_msg(struct ast_aeap *aeap, struct ast_aeap_message *msg)
Send a message to an external application.
Definition: aeap.c:439
int ast_aeap_connect(struct ast_aeap *aeap, const char *url, const char *protocol, int timeout)
Connect to an external application.
Definition: aeap.c:338
int ast_aeap_disconnect(struct ast_aeap *aeap)
Disconnect an Asterisk external application object.
Definition: aeap.c:381
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
Definition: astobj2.h:459
#define ast_debug(level,...)
Log a DEBUG message.
const char * ast_aeap_message_name(const struct ast_aeap_message *message)
Retrieve a message name.
int ast_aeap_message_is_request(const struct ast_aeap_message *message)
Retrieve whether or not this is a request message.
const struct ast_aeap_params * params
Definition: aeap.c:49
struct ast_aeap * ast_aeap_create_and_connect(const char *type, const struct ast_aeap_params *params, const char *url, const char *protocol, int timeout)
Create and connect to an Asterisk external application.
Definition: aeap.c:363
int ast_aeap_message_serialize(const struct ast_aeap_message *message, void **buf, intmax_t *size)
Serialize the given message object into a byte/char buffer.
#define ao2_container_alloc_hash(ao2_options, container_options, n_buckets, hash_fn, sort_fn, cmp_fn)
Allocate and initialize a hash container with the desired number of buckets.
Definition: astobj2.h:1303
char id[0]
Definition: aeap.c:39
int ast_aeap_user_data_register(struct ast_aeap *aeap, const char *id, void *obj, ast_aeap_user_obj_cleanup cleanup)
Register a user data object.
Definition: aeap.c:150
void(* on_error)(struct ast_aeap *aeap)
Raised when an error occurs during reading.
Definition: res_aeap.h:192
struct ast_aeap_message * ast_aeap_message_deserialize(const struct ast_aeap_message_type *type, const void *buf, intmax_t size)
Deserialize the given buffer into an Asterisk external application message object.
Asterisk external application base message.
enum AST_AEAP_DATA_TYPE serial_type
An Asterisk external application message handler.
Definition: res_aeap.h:108
#define SCOPED_AO2LOCK(varname, obj)
scoped lock specialization for ao2 mutexes.
Definition: lock.h:604
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
AST_AEAP_DATA_TYPE
Supported Asterisk external application data types.
Definition: res_aeap.h:135
Asterisk external application transport structure to be "derived" by specific transport implementatio...
Definition: transport.h:98
struct ast_aeap_message * ast_aeap_message_create_error(const struct ast_aeap_message_type *type, const char *name, const char *id, const char *error_msg)
Create an Asterisk external application error response object.
void * obj
Definition: aeap.c:37
struct ast_aeap_message * msg
Definition: res_aeap.h:333
const struct ast_aeap_message_handler * response_handlers
Definition: res_aeap.h:155
Generic container type.
const struct ast_aeap_message_type * type
uintmax_t response_handlers_size
Definition: res_aeap.h:157
const char * ast_aeap_message_error_msg(const struct ast_aeap_message *message)
Retrieve the error message if it has one.
int ast_aeap_message_is_response(const struct ast_aeap_message *message)
Retrieve whether or not this is a response message.
int ast_aeap_send_binary(struct ast_aeap *aeap, const void *buf, uintmax_t size)
Send binary data to an external application.
Definition: aeap.c:434
pthread_t read_thread_id
Definition: aeap.c:57
Definition: aeap.c:47
ast_aeap_on_message on_message
Definition: res_aeap.h:112
#define ao2_link(container, obj)
Add an object to a container.
Definition: astobj2.h:1532