44 #include <qb/qbdefs.h>
45 #include <qb/qblist.h>
46 #include <qb/qbutil.h>
47 #include <qb/qbloop.h>
48 #include <qb/qbipcs.h>
67 static int32_t ipc_not_enough_fds_left = 0;
68 static int32_t ipc_fc_is_quorate;
69 static int32_t ipc_fc_totem_queue_level;
70 static int32_t ipc_fc_sync_in_process;
71 static int32_t ipc_allow_connections = 0;
73 #define CS_IPCS_MAPPER_SERV_NAME 256
89 static int32_t cs_ipcs_job_add(
enum qb_loop_priority p,
void *data, qb_loop_job_dispatch_fn fn);
90 static int32_t cs_ipcs_dispatch_add(
enum qb_loop_priority p, int32_t fd, int32_t events,
91 void *data, qb_ipcs_dispatch_fn_t fn);
92 static int32_t cs_ipcs_dispatch_mod(
enum qb_loop_priority p, int32_t fd, int32_t events,
93 void *data, qb_ipcs_dispatch_fn_t fn);
94 static int32_t cs_ipcs_dispatch_del(int32_t fd);
95 static void outq_flush (
void *data);
98 static struct qb_ipcs_poll_handlers corosync_poll_funcs = {
99 .job_add = cs_ipcs_job_add,
100 .dispatch_add = cs_ipcs_dispatch_add,
101 .dispatch_mod = cs_ipcs_dispatch_mod,
102 .dispatch_del = cs_ipcs_dispatch_del,
105 static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid);
106 static void cs_ipcs_connection_created(qb_ipcs_connection_t *c);
107 static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
108 void *data,
size_t size);
109 static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c);
110 static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c);
112 static struct qb_ipcs_service_handlers corosync_service_funcs = {
113 .connection_accept = cs_ipcs_connection_accept,
114 .connection_created = cs_ipcs_connection_created,
115 .msg_process = cs_ipcs_msg_process,
116 .connection_closed = cs_ipcs_connection_closed,
117 .connection_destroyed = cs_ipcs_connection_destroyed,
120 static const char* cs_ipcs_serv_short_name(int32_t service_id)
123 switch (service_id) {
157 ipc_allow_connections = allow;
162 if (ipcs_mapper[service_id].inst) {
163 qb_ipcs_destroy(ipcs_mapper[service_id].inst);
164 ipcs_mapper[service_id].
inst = NULL;
169 static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid)
171 int32_t service = qb_ipcs_service_id_get(c);
175 if (!ipc_allow_connections) {
181 ipcs_mapper[service].inst == NULL) {
185 if (ipc_not_enough_fds_left) {
189 if (euid == 0 || egid == 0) {
214 static char * pid_to_name (pid_t pid,
char *out_name,
size_t name_len)
222 snprintf (fname, 32,
"/proc/%d/stat", pid);
223 fp = fopen (fname,
"r");
228 if (fgets (buf,
sizeof (buf), fp) == NULL) {
234 name = strrchr (buf,
'(');
242 rest = strrchr (buf,
')');
244 if (rest == NULL || rest[1] !=
' ') {
253 strncpy (out_name, name, name_len - 1);
254 out_name[name_len - 1] =
'\0';
269 static void cs_ipcs_connection_created(qb_ipcs_connection_t *c)
274 struct qb_ipcs_connection_stats stats;
277 int set_client_pid = 0;
278 int set_proc_name = 0;
282 service = qb_ipcs_service_id_get(c);
285 context = calloc(1, size);
286 if (context == NULL) {
287 qb_ipcs_disconnect(c);
296 qb_ipcs_context_set(c, context);
299 log_printf(LOG_ERR,
"lib_init_fn failed, disconnecting");
300 qb_ipcs_disconnect(c);
305 qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
307 if (stats.client_pid > 0) {
308 if (pid_to_name (stats.client_pid, proc_name,
sizeof(proc_name))) {
310 proc_name, stats.client_pid, c);
314 stats.client_pid, c);
325 qb_ipcs_disconnect(c);
335 if (set_client_pid) {
377 qb_ipcs_connection_ref(conn);
382 qb_ipcs_connection_unref(conn);
388 cnx = qb_ipcs_context_get(conn);
389 return &cnx->
data[0];
392 static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c)
400 context = qb_ipcs_context_get(c);
403 list != &context->
outq_head; list = list_next) {
405 list_next = list->
next;
406 outq_item =
list_entry (list,
struct outq_item, list);
409 free (outq_item->
msg);
416 static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c)
419 int32_t service = qb_ipcs_service_id_get(c);
422 const char *key_name;
433 cnx = qb_ipcs_context_get(c);
450 const struct iovec *iov,
451 unsigned int iov_len)
453 int32_t rc = qb_ipcs_response_sendv(conn, iov, iov_len);
462 int32_t rc = qb_ipcs_response_send(conn, msg, mlen);
469 static void outq_flush (
void *
data)
471 qb_ipcs_connection_t *conn =
data;
473 struct outq_item *outq_item;
478 list != &context->
outq_head; list = list_next) {
480 list_next = list->
next;
481 outq_item =
list_entry (list,
struct outq_item, list);
483 rc = qb_ipcs_event_send(conn, outq_item->
msg, outq_item->
mlen);
484 if (rc < 0 && rc != -EAGAIN) {
486 qb_perror(LOG_ERR,
"qb_ipcs_event_send");
488 }
else if (rc == -EAGAIN) {
491 assert(rc == outq_item->
mlen);
496 free (outq_item->
msg);
510 static void msg_send_or_queue(qb_ipcs_connection_t *conn,
const struct iovec *iov, uint32_t iov_len)
514 int32_t bytes_msg = 0;
515 struct outq_item *outq_item;
519 for (i = 0; i < iov_len; i++) {
520 bytes_msg += iov[i].iov_len;
524 assert(list_empty (&context->
outq_head));
525 rc = qb_ipcs_event_sendv(conn, iov, iov_len);
526 if (rc == bytes_msg) {
540 outq_item = malloc (
sizeof (
struct outq_item));
541 if (outq_item == NULL) {
542 qb_ipcs_disconnect(conn);
545 outq_item->
msg = malloc (bytes_msg);
546 if (outq_item->
msg == NULL) {
548 qb_ipcs_disconnect(conn);
552 write_buf = outq_item->
msg;
553 for (i = 0; i < iov_len; i++) {
554 memcpy (write_buf, iov[i].iov_base, iov[i].iov_len);
555 write_buf += iov[i].iov_len;
557 outq_item->
mlen = bytes_msg;
558 list_init (&outq_item->
list);
566 iov.iov_base = (
void *)msg;
568 msg_send_or_queue (conn, &iov, 1);
573 const struct iovec *iov,
574 unsigned int iov_len)
576 msg_send_or_queue(conn, iov, iov_len);
580 static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
581 void *data,
size_t size)
583 struct qb_ipc_response_header response;
584 struct qb_ipc_request_header *request_pt = (
struct qb_ipc_request_header *)data;
585 int32_t service = qb_ipcs_service_id_get(c);
587 int32_t is_async_call = QB_FALSE;
589 int sending_allowed_private_data;
595 &sending_allowed_private_data);
597 is_async_call = (service ==
CPG_SERVICE && request_pt->id == 2);
603 if (send_ok == -EINVAL) {
604 response.size =
sizeof (response);
608 cnx = qb_ipcs_context_get(c);
615 __func__, response.size, response.error);
617 qb_ipcs_response_send (c,
622 }
else if (send_ok < 0) {
623 cnx = qb_ipcs_context_get(c);
627 if (!is_async_call) {
631 response.size =
sizeof (response);
634 qb_ipcs_response_send (c,
639 "*** %s() (%d:%d - %d) %s!",
640 __func__, service, request_pt->id,
641 is_async_call, strerror(-send_ok));
655 static int32_t cs_ipcs_job_add(
enum qb_loop_priority p,
void *data, qb_loop_job_dispatch_fn fn)
660 static int32_t cs_ipcs_dispatch_add(
enum qb_loop_priority p, int32_t fd, int32_t events,
661 void *data, qb_ipcs_dispatch_fn_t fn)
666 static int32_t cs_ipcs_dispatch_mod(
enum qb_loop_priority p, int32_t fd, int32_t events,
667 void *data, qb_ipcs_dispatch_fn_t fn)
672 static int32_t cs_ipcs_dispatch_del(int32_t fd)
677 static void cs_ipcs_low_fds_event(int32_t not_enough, int32_t fds_available)
679 ipc_not_enough_fds_left = not_enough;
692 return ipc_fc_totem_queue_level;
695 static qb_loop_timer_handle ipcs_check_for_flow_control_timer;
696 static void cs_ipcs_check_for_flow_control(
void)
705 fc_enabled = QB_IPCS_RATE_OFF;
706 if (ipc_fc_is_quorate == 1 ||
713 ipc_fc_sync_in_process == 0) {
714 fc_enabled = QB_FALSE;
721 fc_enabled = QB_FALSE;
723 fc_enabled = QB_IPCS_RATE_OFF_2;
727 qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, fc_enabled);
732 qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_FAST);
734 qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_NORMAL);
736 qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_SLOW);
741 static void cs_ipcs_fc_quorum_changed(
int quorate,
void *context)
744 cs_ipcs_check_for_flow_control();
747 static void cs_ipcs_totem_queue_level_changed(
enum totem_q_level level)
749 ipc_fc_totem_queue_level = level;
750 cs_ipcs_check_for_flow_control();
755 ipc_fc_sync_in_process = sync_in_process;
756 cs_ipcs_check_for_flow_control();
762 struct qb_ipcs_stats srv_stats;
763 struct qb_ipcs_connection_stats stats;
764 qb_ipcs_connection_t *c, *prev;
772 qb_ipcs_stats_get(ipcs_mapper[i].inst, &srv_stats, QB_FALSE);
774 for (c = qb_ipcs_connection_first_get(ipcs_mapper[i].inst);
776 prev = c, c = qb_ipcs_connection_next_get(ipcs_mapper[i].inst, prev), qb_ipcs_connection_unref(prev)) {
778 cnx = qb_ipcs_context_get(c);
779 if (cnx == NULL)
continue;
781 qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
819 static enum qb_ipc_type cs_get_ipc_type (
void)
823 enum qb_ipc_type ret = QB_IPC_NATIVE;
827 return QB_IPC_NATIVE;
830 if (strcmp(str,
"native") == 0) {
835 if (strcmp(str,
"shm") == 0) {
840 if (strcmp(str,
"socket") == 0) {
858 const char *serv_short_name;
860 serv_short_name = cs_ipcs_serv_short_name(service->
id);
864 "NOT Initializing IPC on %s [%d]",
872 return "qb_ipcs_run error";
875 ipcs_mapper[service->
id].
id = service->
id;
876 strcpy(ipcs_mapper[service->
id].
name, serv_short_name);
878 "Initializing IPC on %s [%d]",
879 ipcs_mapper[service->
id].
name,
880 ipcs_mapper[service->
id].
id);
881 ipcs_mapper[service->
id].
inst = qb_ipcs_create(ipcs_mapper[service->
id].
name,
882 ipcs_mapper[service->
id].
id,
884 &corosync_service_funcs);
885 assert(ipcs_mapper[service->
id].
inst);
886 qb_ipcs_poll_handlers_set(ipcs_mapper[service->
id].
inst,
887 &corosync_poll_funcs);
888 if (qb_ipcs_run(ipcs_mapper[service->
id].
inst) != 0) {
890 return "qb_ipcs_run error";
void cs_ipc_refcnt_dec(void *conn)
int32_t cs_ipcs_q_level_get(void)
const char * cs_ipcs_service_init(struct corosync_service_engine *service)
#define CS_IPCS_MAPPER_SERV_NAME
const char * icmap_iter_next(icmap_iter_t iter, size_t *value_len, icmap_value_types_t *type)
Return next item in iterator iter.
#define LOGSYS_LEVEL_INFO
void cs_ipc_refcnt_inc(void *conn)
struct list_head outq_head
The corosync_service_engine struct.
Totem Single Ring Protocol.
void icmap_iter_finalize(icmap_iter_t iter)
Finalize iterator.
qb_loop_t * cs_poll_handle_get(void)
void corosync_recheck_the_q_level(void *data)
int(* quorum_register_callback)(quorum_callback_fn_t callback_fn, void *context)
cs_error_t icmap_set_string(const char *key_name, const char *value)
int cs_ipcs_dispatch_iov_send(void *conn, const struct iovec *iov, unsigned int iov_len)
struct corosync_service_engine * corosync_service[SERVICES_COUNT_MAX]
cs_error_t icmap_inc(const char *key_name)
Increase stored value by one.
#define log_printf(level, format, args...)
void cs_ipcs_sync_state_changed(int32_t sync_in_process)
int corosync_sending_allowed(unsigned int service, unsigned int id, const void *msg, void *sending_allowed_private_data)
int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen)
void cs_ipc_allow_connections(int32_t allow)
int(* lib_exit_fn)(void *conn)
#define ICMAP_KEYNAME_MAXLEN
Maximum length of key in icmap.
cs_error_t icmap_get_uint8(const char *key_name, uint8_t *u8)
#define LOGSYS_LEVEL_WARNING
struct corosync_lib_handler * lib_engine
cs_error_t icmap_set_uint32(const char *key_name, uint32_t value)
#define LOGSYS_LEVEL_ERROR
cs_error_t icmap_delete(const char *key_name)
Delete key from map.
int cs_ipcs_response_iov_send(void *conn, const struct iovec *iov, unsigned int iov_len)
#define LOGSYS_LEVEL_DEBUG
LOGSYS_DECLARE_SUBSYS("MAIN")
The corosync_api_v1 struct.
cs_error_t icmap_dec(const char *key_name)
Decrease stored value by one.
cs_error_t icmap_set_uint64(const char *key_name, uint64_t value)
struct corosync_api_v1 * apidef_get(void)
void corosync_sending_allowed_release(void *sending_allowed_private_data)
void cs_ipcs_stats_update(void)
#define SERVICES_COUNT_MAX
int32_t cs_ipcs_service_destroy(int32_t service_id)
void totempg_queue_level_register_callback(totem_queue_level_changed_fn)
cs_error_t icmap_get_string(const char *key_name, char **str)
Shortcut for icmap_get for string type.
#define list_entry(ptr, type, member)
char name[CS_IPCS_MAPPER_SERV_NAME]
void * cs_ipcs_private_data_get(void *conn)
int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen)
#define LOGSYS_LEVEL_NOTICE
void(* lib_handler_fn)(void *conn, const void *msg)
void icmap_convert_name_to_valid_name(char *key_name)
Converts given key_name to valid key name (replacing all prohibited characters by _) ...
unsigned int private_data_size
icmap_iter_t icmap_iter_init(const char *prefix)
Initialize iterator with given prefix.
qb_map_iter_t * icmap_iter_t
Itterator type.