37 #include "asterisk/res_pjsip.h"
38 #include "asterisk/res_pjsip_session.h"
41 static int transport_type_wss;
42 static int transport_type_wss_ipv6;
47 static int ws_obj_name_serial;
53 pjsip_transport transport;
63 static pj_status_t ws_send_msg(pjsip_transport *transport,
65 const pj_sockaddr_t *rem_addr,
68 pjsip_transport_callback callback)
71 uint64_t len = tdata->buf.cur - tdata->buf.start;
85 static pj_status_t ws_destroy(pjsip_transport *transport)
99 static pj_status_t ws_shutdown(pjsip_transport *transport)
102 int fd = ast_websocket_fd(wstransport->ws_session);
106 shutdown(fd, SHUT_RDWR);
112 static void transport_dtor(
void *arg)
116 if (wstransport->ws_session) {
117 ast_websocket_unref(wstransport->ws_session);
120 if (wstransport->transport.ref_cnt) {
121 pj_atomic_destroy(wstransport->transport.ref_cnt);
124 if (wstransport->transport.lock) {
125 pj_lock_destroy(wstransport->transport.lock);
128 if (wstransport->transport.endpt && wstransport->transport.pool) {
129 pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->transport.pool);
132 if (wstransport->rdata.tp_info.pool) {
133 pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->rdata.tp_info.pool);
137 static int transport_shutdown(
void *data)
141 if (!wstransport->transport.is_shutdown && !wstransport->transport.is_destroying) {
142 pjsip_transport_shutdown(&wstransport->transport);
161 static int transport_create(
void *data)
165 pjsip_tp_state_callback state_cb;
167 pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
168 struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt);
178 ast_log(LOG_ERROR,
"Failed to allocate WebSocket transport.\n");
183 snprintf(newtransport->transport.obj_name, PJ_MAX_OBJ_NAME,
"ws%p-%d",
186 newtransport->transport.endpt = endpt;
188 if (!(pool = pjsip_endpt_create_pool(endpt,
"ws", 512, 512))) {
189 ast_log(LOG_ERROR,
"Failed to allocate WebSocket endpoint pool.\n");
193 newtransport->transport.pool =
pool;
194 newtransport->ws_session = create_data->ws_session;
197 ast_websocket_ref(newtransport->ws_session);
199 status = pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt);
200 if (status != PJ_SUCCESS) {
204 status = pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock);
205 if (status != PJ_SUCCESS) {
216 newtransport->transport.type_name = ast_websocket_is_secure(newtransport->ws_session)
220 ast_debug(4,
"Creating websocket transport for %s:%s\n",
221 newtransport->transport.type_name, ws_addr_str);
223 newtransport->transport.info = (
char *) pj_pool_alloc(newtransport->transport.pool,
224 strlen(newtransport->transport.type_name) + strlen(ws_addr_str) +
sizeof(
" to "));
225 sprintf(newtransport->transport.info,
"%s to %s", newtransport->transport.type_name, ws_addr_str);
227 pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ws_addr_str), &newtransport->transport.key.rem_addr);
228 if (newtransport->transport.key.rem_addr.addr.sa_family == pj_AF_INET6()) {
229 newtransport->transport.key.type = transport_type_wss_ipv6;
231 newtransport->transport.key.type = transport_type_wss;
234 newtransport->transport.addr_len = pj_sockaddr_get_len(&newtransport->transport.key.rem_addr);
237 pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ws_addr_str), &newtransport->transport.local_addr);
238 pj_strdup2(pool, &newtransport->transport.local_name.host,
ast_sockaddr_stringify_addr(ast_websocket_local_address(newtransport->ws_session)));
239 newtransport->transport.local_name.port =
ast_sockaddr_port(ast_websocket_local_address(newtransport->ws_session));
240 pj_strdup2(pool, &newtransport->transport.remote_name.host,
ast_sockaddr_stringify_addr(ast_websocket_remote_address(newtransport->ws_session)));
241 newtransport->transport.remote_name.port =
ast_sockaddr_port(ast_websocket_remote_address(newtransport->ws_session));
243 newtransport->transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->transport.key.type);
244 newtransport->transport.dir = PJSIP_TP_DIR_INCOMING;
245 newtransport->transport.tpmgr = tpmgr;
246 newtransport->transport.send_msg = &ws_send_msg;
247 newtransport->transport.do_shutdown = &ws_shutdown;
248 newtransport->transport.destroy = &ws_destroy;
250 status = pjsip_transport_register(newtransport->transport.tpmgr,
251 (pjsip_transport *)newtransport);
252 if (status != PJ_SUCCESS) {
259 newtransport->rdata.tp_info.transport = &newtransport->transport;
260 newtransport->rdata.tp_info.pool = pjsip_endpt_create_pool(endpt,
"rtd%p",
261 PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_INC);
262 if (!newtransport->rdata.tp_info.pool) {
263 ast_log(LOG_ERROR,
"Failed to allocate WebSocket rdata.\n");
264 pjsip_transport_destroy((pjsip_transport *)newtransport);
268 create_data->transport = newtransport;
271 state_cb = pjsip_tpmgr_get_state_cb(newtransport->transport.tpmgr);
273 pjsip_transport_state_info state_info;
275 memset(&state_info, 0,
sizeof(state_info));
276 state_cb(&newtransport->transport, PJSIP_TP_STATE_CONNECTED, &state_info);
282 ao2_cleanup(newtransport);
289 uint64_t payload_len;
295 static int transport_read(
void *data)
298 struct ws_transport *newtransport = read_data->transport;
301 pjsip_rx_data *rdata = &newtransport->rdata;
306 pj_gettimeofday(&rdata->pkt_info.timestamp);
308 pjsip_pkt_len = PJSIP_MAX_PKT_LEN < read_data->payload_len ? PJSIP_MAX_PKT_LEN : read_data->payload_len;
309 pj_memcpy(rdata->pkt_info.packet, read_data->payload, pjsip_pkt_len);
310 rdata->pkt_info.len = pjsip_pkt_len;
311 rdata->pkt_info.zero = 0;
313 pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf,
ast_sockaddr_stringify(ast_websocket_remote_address(session))), &rdata->pkt_info.src_addr);
314 rdata->pkt_info.src_addr_len =
sizeof(rdata->pkt_info.src_addr);
317 rdata->pkt_info.src_port =
ast_sockaddr_port(ast_websocket_remote_address(session));
319 recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
321 pj_pool_reset(rdata->tp_info.pool);
323 return (read_data->payload_len == recvd) ? 0 : -1;
326 static int get_write_timeout(
void)
328 int write_timeout = -1;
331 transport_states = ast_sip_get_transport_states();
333 if (transport_states) {
337 for (; (transport_state = ao2_iterator_next(&it_transport_states)); ao2_cleanup(transport_state)) {
340 if (transport_state->
type != AST_TRANSPORT_WS && transport_state->
type != AST_TRANSPORT_WSS) {
347 ast_debug(5,
"Found %s transport with write timeout: %d\n",
348 transport->
type == AST_TRANSPORT_WS ?
"WS" :
"WSS",
350 write_timeout = MAX(write_timeout, transport->
write_timeout);
353 ao2_cleanup(transport_states);
356 if (write_timeout < 0) {
360 ast_debug(1,
"Write timeout for WS/WSS transports: %d\n", write_timeout);
382 if (ast_websocket_set_nonblock(session)) {
383 ast_websocket_unref(session);
387 if (ast_websocket_set_timeout(session, get_write_timeout())) {
388 ast_websocket_unref(session);
392 serializer = create_websocket_serializer();
394 ast_websocket_unref(session);
398 create_data.ws_session = session;
401 ast_log(LOG_ERROR,
"Could not create WebSocket transport.\n");
403 ast_websocket_unref(session);
407 transport = create_data.transport;
408 read_data.transport = transport;
410 pjsip_transport_add_ref(&transport->transport);
411 while (ast_websocket_wait_for_input(session, -1) > 0) {
415 if (ast_websocket_read(session, &read_data.payload, &read_data.payload_len, &opcode, &fragmented)) {
420 if (read_data.payload_len) {
427 pjsip_transport_dec_ref(&transport->transport);
432 ast_websocket_unref(session);
435 static void save_orig_contact_host(pjsip_rx_data *rdata, pjsip_sip_uri *uri)
437 pjsip_param *x_orig_host;
440 #define MAX_PORT_LEN 5
442 if (rdata->msg_info.msg->type != PJSIP_REQUEST_MSG ||
443 rdata->msg_info.msg->line.req.method.id != PJSIP_REGISTER_METHOD) {
447 ast_debug(1,
"Saving contact '%.*s:%d'\n",
448 (
int)uri->host.slen, uri->host.ptr, uri->port);
450 x_orig_host = PJ_POOL_ALLOC_T(rdata->tp_info.pool, pjsip_param);
451 x_orig_host->name = pj_strdup3(rdata->tp_info.pool,
"x-ast-orig-host");
452 p_value.slen = pj_strlen(&uri->host) + COLON_LEN + MAX_PORT_LEN;
453 p_value.ptr = (
char*)pj_pool_alloc(rdata->tp_info.pool, p_value.slen + 1);
454 p_value.slen = snprintf(p_value.ptr, p_value.slen + 1,
"%.*s:%d", (
int)uri->host.slen, uri->host.ptr, uri->port);
455 pj_strassign(&x_orig_host->value, &p_value);
456 pj_list_insert_before(&uri->other_param, x_orig_host);
464 static pj_bool_t websocket_on_rx_msg(pjsip_rx_data *rdata)
466 static const pj_str_t STR_WS = {
"ws", 2 };
467 pjsip_contact_hdr *contact;
469 long type = rdata->tp_info.transport->key.type;
471 if (type != (
long) transport_type_wss && type != (
long) transport_type_wss_ipv6) {
475 contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL);
478 && (PJSIP_URI_SCHEME_IS_SIP(contact->uri) || PJSIP_URI_SCHEME_IS_SIPS(contact->uri))) {
479 pjsip_sip_uri *uri = pjsip_uri_get_uri(contact->uri);
480 const pj_str_t *txp_str = &STR_WS;
484 save_orig_contact_host(rdata, uri);
486 if (DEBUG_ATLEAST(4)) {
487 char src_addr_buffer[AST_SOCKADDR_BUFLEN];
488 const char *ipv6_s =
"", *ipv6_e =
"";
490 if (pj_strchr(&uri->host,
':')) {
495 ast_log(LOG_DEBUG,
"%s re-writing Contact URI from %s%.*s%s:%d%s%.*s to %s;transport=%s\n",
496 pjsip_rx_data_get_info(rdata),
497 ipv6_s, (
int) pj_strlen(&uri->host), pj_strbuf(&uri->host), ipv6_e, uri->port,
498 pj_strlen(&uri->transport_param) ?
";transport=" :
"",
499 (
int) pj_strlen(&uri->transport_param), pj_strbuf(&uri->transport_param),
500 pj_sockaddr_print(&rdata->pkt_info.src_addr, src_addr_buffer,
sizeof(src_addr_buffer), 3),
504 pj_strdup2(rdata->tp_info.pool, &uri->host, rdata->pkt_info.src_name);
505 uri->port = rdata->pkt_info.src_port;
506 pj_strdup(rdata->tp_info.pool, &uri->transport_param, txp_str);
509 rdata->msg_info.via->rport_param = 0;
514 static pjsip_module websocket_module = {
515 .name = {
"WebSocket Transport Module", 26 },
517 .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER,
518 .on_rx_request = websocket_on_rx_msg,
519 .on_rx_response = websocket_on_rx_msg,
523 static void websocket_outgoing_invite_request(
struct ast_sip_session *session,
struct pjsip_tx_data *tdata)
525 if (session->
inv_session->state == PJSIP_INV_STATE_NULL) {
526 pjsip_dlg_add_usage(session->
inv_session->dlg, &websocket_module, NULL);
533 .priority = AST_SIP_SUPPLEMENT_PRIORITY_FIRST + 1,
534 .outgoing_request = websocket_outgoing_invite_request,
537 static int load_module(
void)
549 pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE | PJSIP_TRANSPORT_SECURE,
"ws", 5060, &transport_type_wss);
550 pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE | PJSIP_TRANSPORT_SECURE | PJSIP_TRANSPORT_IPV6,
"ws", 5060, &transport_type_wss_ipv6);
552 if (ast_sip_register_service(&websocket_module) != PJ_SUCCESS) {
556 ast_sip_session_register_supplement(&websocket_supplement);
558 if (ast_websocket_add_protocol(
"sip", websocket_cb)) {
559 ast_sip_session_unregister_supplement(&websocket_supplement);
560 ast_sip_unregister_service(&websocket_module);
567 static int unload_module(
void)
569 ast_sip_unregister_service(&websocket_module);
570 ast_sip_session_unregister_supplement(&websocket_supplement);
571 ast_websocket_remove_protocol(
"sip", websocket_cb);
576 AST_MODULE_INFO(
ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER,
"PJSIP WebSocket Transport Support",
577 .support_level = AST_MODULE_SUPPORT_CORE,
579 .unload = unload_module,
581 .requires =
"res_pjsip,res_http_websocket",
static char * ast_sockaddr_stringify_addr(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() to return an address only.
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format,...)
Build a taskprocessor name with a sequence number on the end.
Asterisk main include file. File version handling, generic pbx functions.
Structure for variables, used for configurations and for channel variables.
static pj_pool_t * pool
Global memory pool for configuration and timers.
int AST_OPTIONAL_API_NAME() ast_websocket_close(struct ast_websocket *session, uint16_t reason)
Close function for websocket session.
void ao2_iterator_destroy(struct ao2_iterator *iter)
Destroy a container iterator.
#define ao2_t_alloc_options(data_size, destructor_fn, options, debug_msg)
Allocate and initialize an object.
#define AST_TASKPROCESSOR_MAX_NAME
Suggested maximum taskprocessor name length (less null terminator).
struct pjsip_inv_session * inv_session
A structure describing a SIP session.
int ast_atomic_fetchadd_int(volatile int *p, int v)
Atomically add v to *p and return the previous value of *p.
void * ast_sorcery_retrieve_by_id(const struct ast_sorcery *sorcery, const char *type, const char *id)
Retrieve an object using its unique identifier.
#define ast_sockaddr_port(addr)
Get the port number of a socket address.
Structure for SIP transport information.
int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int(*sip_task)(void *), void *task_data)
Push a task to the serializer and wait for it to complete.
#define ao2_ref(o, delta)
Reference/unreference an object and return the old refcount.
#define ast_debug(level,...)
Log a DEBUG message.
Support for WebSocket connections within the Asterisk HTTP server and client WebSocket connections to...
#define AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT
Default websocket write timeout, in ms.
static char * ast_sockaddr_stringify(const struct ast_sockaddr *addr)
Wrapper around ast_sockaddr_stringify_fmt() with default format.
struct ast_taskprocessor * ast_sip_create_serializer(const char *name)
Create a new serializer for SIP tasks.
int AST_OPTIONAL_API_NAME() ast_websocket_write(struct ast_websocket *session, enum ast_websocket_opcode opcode, char *payload, uint64_t payload_size)
Write function for websocket traffic.
Structure definition for session.
Module has failed to load, may be in an inconsistent state.
An API for managing task processing threads that can be shared across modules.
A supplement to SIP message processing.
A ast_taskprocessor structure is a singleton by name.
When we need to walk through a container, we use an ao2_iterator to keep track of the current positio...
void * ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
Unreference the specified taskprocessor and its reference count will decrement.
#define ASTERISK_GPL_KEY
The text the key() function should return.
Asterisk module definitions.
ast_websocket_opcode
WebSocket operation codes.
struct ao2_iterator ao2_iterator_init(struct ao2_container *c, int flags) attribute_warn_unused_result
Create an iterator for a container.
char buf[MAXIMUM_FRAME_SIZE]
Wrapper for pjsip_transport, for storing the WebSocket session.