Asterisk - The Open Source Telephony Project  21.4.1
transport_websocket.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2021, Sangoma Technologies Corporation
5  *
6  * Kevin Harwell <kharwell@sangoma.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 
#include "asterisk.h"

#include "asterisk/http_websocket.h"
#include "asterisk/utils.h"

#include "logger.h"
#include "transport.h"
#include "transport_websocket.h"
27 
/*! Report an error through aeap_error(), always tagged with the name "websocket" */
#define log_error(transport_obj, format, ...) \
	aeap_error(transport_obj, "websocket", format, ##__VA_ARGS__)
29 
31  /*! Derive from base transport (must be first attribute) */
33  /*! The underlying websocket */
34  struct ast_websocket *ws;
35 };
36 
37 static int websocket_connect(struct aeap_transport *self, const char *url,
38  const char *protocol, int timeout)
39 {
40  struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
41  enum ast_websocket_result ws_result;
42  struct ast_websocket_client_options ws_options = {
43  .uri = url,
44  .protocols = protocol,
45  .timeout = timeout,
46  .tls_cfg = NULL,
47  };
48 
49  transport->ws = ast_websocket_client_create_with_options(&ws_options, &ws_result);
50  if (ws_result != WS_OK) {
51  log_error(self, "connect failure (%d)", (int)ws_result);
52  return -1;
53  }
54 
55  return 0;
56 }
57 
58 static int websocket_disconnect(struct aeap_transport *self)
59 {
60  struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
61 
62  if (transport->ws) {
63  ast_websocket_unref(transport->ws);
64  transport->ws = NULL;
65  }
66 
67  return 0;
68 }
69 
/*!
 * \brief Destroy hook for the websocket transport (intentionally empty)
 *
 * \param self The transport being destroyed
 */
static void websocket_destroy(struct aeap_transport *self)
{
	/*
	 * Nothing to do here: the websocket itself is cleaned up by disconnect,
	 * and the base/dispatch interface has already called disconnect before
	 * this function is invoked.
	 */
}
78 
79 static intmax_t websocket_read(struct aeap_transport *self, void *buf, intmax_t size,
80  enum AST_AEAP_DATA_TYPE *rtype)
81 {
82  struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
83 
84  char *payload;
85  uint64_t bytes_read = 0;
86  uint64_t total_bytes_read = 0;
87  enum ast_websocket_opcode opcode;
88  int fragmented = 0;
89 
90  *rtype = AST_AEAP_DATA_TYPE_NONE;
91 
92  if (ast_websocket_fd(transport->ws) < 0) {
93  log_error(self, "unavailable for reading");
94  /* Ensure this transport is in a disconnected state */
95  aeap_transport_disconnect(self);
96  return -1;
97  }
98 
99  /*
100  * This function is called with the read_lock locked. However, the lock needs to be
101  * unlocked while waiting for input otherwise a deadlock can occur during disconnect
102  * (disconnect attempts to grab the lock but can't because read holds it here). So
103  * unlock it prior to waiting.
104  */
105  ast_mutex_unlock(&transport->base.read_lock);
106  while (ast_websocket_wait_for_input(transport->ws, -1) <= 0) {
107  /* If this was poll getting interrupted just go back to waiting */
108  if (errno == EINTR || errno == EAGAIN) {
109  continue;
110  }
111 
112  ast_mutex_lock(&transport->base.read_lock);
113  log_error(self, "poll failure: %s", strerror(errno));
114  /* Ensure this transport is in a disconnected state */
115  aeap_transport_disconnect(self);
116  return -1;
117  }
118  ast_mutex_lock(&transport->base.read_lock);
119 
120  if (!transport->ws) {
121  /*
122  * It's possible the transport was told to disconnect while waiting for input.
123  * If so then the websocket will be NULL, so we don't want to continue.
124  */
125  return 0;
126  }
127 
128  do {
129  if (ast_websocket_read(transport->ws, &payload, &bytes_read, &opcode,
130  &fragmented) != 0) {
131  log_error(self, "read failure (%d): %s", opcode, strerror(errno));
132  return -1;
133  }
134 
135  if (!bytes_read) {
136  continue;
137  }
138 
139  if (total_bytes_read + bytes_read > size) {
140  log_error(self, "attempted to read too many bytes into (%jd) sized buffer", size);
141  return -1;
142  }
143 
144  memcpy(buf + total_bytes_read, payload, bytes_read);
145  total_bytes_read += bytes_read;
146 
147  } while (opcode == AST_WEBSOCKET_OPCODE_CONTINUATION);
148 
149  switch (opcode) {
151  log_error(self, "closed");
152  return -1;
154  *rtype = AST_AEAP_DATA_TYPE_BINARY;
155  break;
157  *rtype = AST_AEAP_DATA_TYPE_STRING;
158 
159  /* Append terminator, but check for overflow first */
160  if (total_bytes_read == size) {
161  log_error(self, "unable to write string terminator");
162  return -1;
163  }
164 
165  *((char *)(buf + total_bytes_read)) = '\0';
166  break;
167  default:
168  /* Ignore all other message types */
169  return 0;
170  }
171 
172  return total_bytes_read;
173 }
174 
175 static intmax_t websocket_write(struct aeap_transport *self, const void *buf, intmax_t size,
176  enum AST_AEAP_DATA_TYPE wtype)
177 {
178  struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
179  intmax_t res = 0;
180 
181  switch (wtype) {
182  case AST_AEAP_DATA_TYPE_BINARY:
184  (char *)buf, size);
185  break;
186  case AST_AEAP_DATA_TYPE_STRING:
188  (char *)buf, size);
189  break;
190  default:
191  break;
192  }
193 
194  if (res < 0) {
195  log_error(self, "problem writing to websocket (closed)");
196 
197  /*
198  * If the underlying socket is closed then ensure the
199  * transport is in a disconnected state as well.
200  */
201  aeap_transport_disconnect(self);
202 
203  return res;
204  }
205 
206  return size;
207 }
208 
209 static struct aeap_transport_vtable *transport_websocket_vtable(void)
210 {
211  static struct aeap_transport_vtable websocket_vtable = {
212  .connect = websocket_connect,
213  .disconnect = websocket_disconnect,
214  .destroy = websocket_destroy,
215  .read = websocket_read,
216  .write = websocket_write,
217  };
218 
219  return &websocket_vtable;
220 }
221 
222 /*!
223  * \brief Initialize a transport websocket object, and set its virtual table
224  *
225  * \param transport The transport to initialize
226  *
227  * \returns 0 on success, -1 on error
228  */
229 static int transport_websocket_init(struct aeap_transport_websocket *transport)
230 {
231  transport->ws = NULL;
232 
233  ((struct aeap_transport *)transport)->vtable = transport_websocket_vtable();
234 
235  return 0;
236 }
237 
238 struct aeap_transport_websocket *aeap_transport_websocket_create(void)
239 {
240  struct aeap_transport_websocket *transport;
241 
242  transport = ast_calloc(1, sizeof(*transport));
243  if (!transport) {
244  ast_log(LOG_ERROR, "AEAP websocket: unable to create transport websocket");
245  return NULL;
246  }
247 
248  if (transport_websocket_init(transport)) {
249  ast_free(transport);
250  return NULL;
251  }
252 
253  return transport;
254 }
Asterisk main include file. File version handling, generic pbx functions.
struct ast_websocket * ws
ast_websocket_result
Result code for a websocket client.
ast_mutex_t read_lock
Definition: transport.h:104
Utility functions.
Options used for a websocket client.
struct aeap_transport base
Support for WebSocket connections within the Asterisk HTTP server and client WebSocket connections to...
Asterisk external application transport virtual table.
Definition: transport.h:33
int AST_OPTIONAL_API_NAME() ast_websocket_write(struct ast_websocket *session, enum ast_websocket_opcode opcode, char *payload, uint64_t payload_size)
Write function for websocket traffic.
Structure definition for session.
#define ast_calloc(num, len)
A wrapper for calloc()
Definition: astmm.h:202
int(* connect)(struct aeap_transport *self, const char *url, const char *protocol, int timeout)
Connect a transport.
Definition: transport.h:44
AST_AEAP_DATA_TYPE
Supported Asterisk external application data types.
Definition: res_aeap.h:135
Asterisk external application transport structure to be "derived" by specific transport implementatio...
Definition: transport.h:98
ast_websocket_opcode
WebSocket operation codes.