46 #include <sys/types.h>
47 #include <sys/socket.h>
51 #include <qb/qbipcc.h>
/*
 * State strings used for this process's cmap entry.
 * NOTE(review): meaning inferred from the macro names; confirm at the
 * cmap state-setting call sites. Note that SAM_CMAP_S_REGISTERED maps to
 * the string "stopped".
 */
#define SAM_CMAP_S_FAILED "failed"
#define SAM_CMAP_S_REGISTERED "stopped"
#define SAM_CMAP_S_STARTED "running"
#define SAM_CMAP_S_Q_WAIT "waiting for quorum"

/*
 * Recovery-policy masks: clear the QUORUM and/or CMAP modifier bits from a
 * sam_recovery_policy_t value, leaving the remaining policy bits (used, for
 * example, to index a per-policy table with the base policy).
 *
 * The argument is parenthesized so that a compound expression such as
 * SAM_RP_MASK (a | b) is masked as a whole. Without the parentheses '&'
 * binds tighter than '|' and only the last operand would be masked.
 * Each macro evaluates its argument exactly once.
 */
#define SAM_RP_MASK_Q(pol) ((pol) & ~SAM_RECOVERY_POLICY_QUORUM)
#define SAM_RP_MASK_C(pol) ((pol) & ~SAM_RECOVERY_POLICY_CMAP)
#define SAM_RP_MASK(pol) ((pol) & ~(SAM_RECOVERY_POLICY_QUORUM | SAM_RECOVERY_POLICY_CMAP))
146 uint64_t hc_period, last_hc;
152 svalue = ssvalue[
SAM_RP_MASK (sam_internal_data.recovery_policy)];
161 hc_period = sam_internal_data.time_interval;
170 last_hc = cs_timestamp_get();
194 static cs_error_t sam_cmap_destroy_pid_path (
void)
200 err =
cmap_iter_init(sam_internal_data.cmap_handle, sam_internal_data.cmap_pid_path, &iter);
205 while ((err =
cmap_iter_next(sam_internal_data.cmap_handle, iter, key_name, NULL, NULL)) ==
CS_OK) {
206 cmap_delete(sam_internal_data.cmap_handle, key_name);
224 snprintf(sam_internal_data.cmap_pid_path,
CMAP_KEYNAME_MAXLEN,
"resources.process.%d.", getpid());
229 goto destroy_finalize_error;
233 goto destroy_finalize_error;
238 destroy_finalize_error:
239 sam_cmap_destroy_pid_path ();
244 static void quorum_notification_fn (
248 uint32_t view_list_entries,
251 sam_internal_data.quorate =
quorate;
259 uint32_t quorum_type;
276 if ((err =
quorum_initialize (&sam_internal_data.quorum_handle, &quorum_callbacks, &quorum_type)) !=
CS_OK) {
281 goto exit_error_quorum;
284 if ((err =
quorum_fd_get (sam_internal_data.quorum_handle, &sam_internal_data.quorum_fd)) !=
CS_OK) {
285 goto exit_error_quorum;
292 goto exit_error_quorum;
301 sam_internal_data.warn_signal = SIGTERM;
303 sam_internal_data.am_i_child = 0;
305 sam_internal_data.user_data = NULL;
306 sam_internal_data.user_data_size = 0;
307 sam_internal_data.user_data_allocated = 0;
309 pthread_mutex_init (&sam_internal_data.lock, NULL);
322 static size_t sam_safe_write (
328 ssize_t tmp_bytes_write;
333 tmp_bytes_write = write (d, (
const char *)buf + bytes_write,
334 (nbyte - bytes_write > SSIZE_MAX) ? SSIZE_MAX : nbyte - bytes_write);
336 if (tmp_bytes_write == -1) {
337 if (!(errno == EAGAIN || errno == EINTR))
340 bytes_write += tmp_bytes_write;
342 }
while (bytes_write != nbyte);
344 return (bytes_write);
350 static size_t sam_safe_read (
356 ssize_t tmp_bytes_read;
361 tmp_bytes_read = read (d, (
char *)buf + bytes_read,
362 (nbyte - bytes_read > SSIZE_MAX) ? SSIZE_MAX : nbyte - bytes_read);
364 if (tmp_bytes_read == -1) {
365 if (!(errno == EAGAIN || errno == EINTR))
368 bytes_read += tmp_bytes_read;
371 }
while (bytes_read != nbyte && tmp_bytes_read != 0);
382 if (sam_safe_read (sam_internal_data.child_fd_in, &reply, sizeof (reply)) !=
sizeof (reply)) {
391 if (sam_safe_read (sam_internal_data.child_fd_in, &err, sizeof (err)) !=
sizeof (err)) {
423 pthread_mutex_lock (&sam_internal_data.lock);
425 *size = sam_internal_data.user_data_size;
427 pthread_mutex_unlock (&sam_internal_data.lock);
451 pthread_mutex_lock (&sam_internal_data.lock);
453 if (sam_internal_data.user_data_size == 0) {
459 if (size < sam_internal_data.user_data_size) {
465 memcpy (data, sam_internal_data.user_data, sam_internal_data.user_data_size);
467 pthread_mutex_unlock (&sam_internal_data.lock);
472 pthread_mutex_unlock (&sam_internal_data.lock);
497 pthread_mutex_lock (&sam_internal_data.lock);
499 if (sam_internal_data.am_i_child) {
504 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command)) {
510 if (sam_safe_write (sam_internal_data.child_fd_out, &size, sizeof (size)) !=
sizeof (size)) {
516 if (data != NULL && sam_safe_write (sam_internal_data.child_fd_out, data, size) != size) {
525 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) !=
CS_OK) {
534 free (sam_internal_data.user_data);
535 sam_internal_data.user_data = NULL;
536 sam_internal_data.user_data_allocated = 0;
537 sam_internal_data.user_data_size = 0;
539 if (sam_internal_data.user_data_allocated < size) {
540 if ((new_data = realloc (sam_internal_data.user_data, size)) == NULL) {
546 sam_internal_data.user_data_allocated = size;
548 new_data = sam_internal_data.user_data;
550 sam_internal_data.user_data = new_data;
551 sam_internal_data.user_data_size = size;
553 memcpy (sam_internal_data.user_data, data, size);
556 pthread_mutex_unlock (&sam_internal_data.lock);
561 pthread_mutex_unlock (&sam_internal_data.lock);
576 recpol = sam_internal_data.recovery_policy;
579 pthread_mutex_lock (&sam_internal_data.lock);
584 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command)) {
586 pthread_mutex_unlock (&sam_internal_data.lock);
596 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) !=
CS_OK) {
597 pthread_mutex_unlock (&sam_internal_data.lock);
602 pthread_mutex_unlock (&sam_internal_data.lock);
605 if (sam_internal_data.hc_callback)
606 if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) !=
sizeof (command))
626 pthread_mutex_lock (&sam_internal_data.lock);
629 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command)) {
631 pthread_mutex_unlock (&sam_internal_data.lock);
641 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) !=
CS_OK) {
642 pthread_mutex_unlock (&sam_internal_data.lock);
647 pthread_mutex_unlock (&sam_internal_data.lock);
650 if (sam_internal_data.hc_callback)
651 if (sam_safe_write (sam_internal_data.cb_wpipe_fd, &command, sizeof (command)) !=
sizeof (command))
669 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command))
693 free (sam_internal_data.user_data);
714 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command))
731 pthread_mutex_lock (&sam_internal_data.lock);
733 if (sam_internal_data.am_i_child) {
738 if (sam_safe_write (sam_internal_data.child_fd_out, &command, sizeof (command)) !=
sizeof (command)) {
744 if (sam_safe_write (sam_internal_data.child_fd_out, &warn_signal, sizeof (warn_signal)) !=
745 sizeof (warn_signal)) {
754 if ((err = sam_read_reply (sam_internal_data.child_fd_in)) !=
CS_OK) {
764 pthread_mutex_unlock (&sam_internal_data.lock);
769 pthread_mutex_unlock (&sam_internal_data.lock);
784 if (sam_safe_write (parent_fd_out, &reply,
sizeof (reply)) !=
sizeof (reply)) {
794 if (sam_safe_write (parent_fd_out, &reply,
sizeof (reply)) !=
sizeof (reply)) {
797 if (sam_safe_write (parent_fd_out, &err,
sizeof (err)) !=
sizeof (err)) {
805 static cs_error_t sam_parent_warn_signal_set (
814 if (sam_safe_read (parent_fd_in, &warn_signal,
sizeof (warn_signal)) !=
sizeof (warn_signal)) {
825 return (sam_parent_reply_send (
CS_OK, parent_fd_in, parent_fd_out));
828 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
831 static cs_error_t sam_parent_wait_for_quorum (
836 struct pollfd pfds[2];
855 while (!sam_internal_data.quorate) {
856 pfds[0].fd = parent_fd_in;
860 pfds[1].fd = sam_internal_data.quorum_fd;
861 pfds[1].events = POLLIN;
864 poll_err = poll (pfds, 2, -1);
866 if (poll_err == -1) {
871 if (errno != EINTR) {
877 if (pfds[0].revents != 0) {
878 if (pfds[0].revents == POLLERR || pfds[0].revents == POLLHUP ||pfds[0].revents == POLLNVAL) {
886 if (pfds[1].revents != 0) {
899 return (sam_parent_reply_send (
CS_OK, parent_fd_in, parent_fd_out));
906 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
927 return (sam_parent_reply_send (
CS_OK, parent_fd_in, parent_fd_out));
930 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
940 if (!sam_internal_data.term_send) {
944 kill (child_pid, sam_internal_data.warn_signal);
946 sam_internal_data.term_send = 1;
951 kill (child_pid, SIGKILL);
958 static cs_error_t sam_parent_mark_child_failed (
964 recpol = sam_internal_data.recovery_policy;
966 sam_internal_data.term_send = 1;
971 return (sam_parent_kill_child (action, child_pid));
985 if (sam_safe_read (parent_fd_in, &size,
sizeof (size)) !=
sizeof (size)) {
991 user_data = malloc (size);
992 if (user_data == NULL) {
997 if (sam_safe_read (parent_fd_in, user_data, size) != size) {
999 goto free_error_reply;
1005 goto free_error_reply;
1010 return (sam_parent_reply_send (
CS_OK, parent_fd_in, parent_fd_out));
1015 return (sam_parent_reply_send (err, parent_fd_in, parent_fd_out));
1029 struct pollfd pfds[2];
1037 recpol = sam_internal_data.recovery_policy;
1040 pfds[0].fd = parent_fd_in;
1041 pfds[0].events = POLLIN;
1042 pfds[0].revents = 0;
1045 if (status == 1 && sam_internal_data.time_interval != 0) {
1046 time_interval = sam_internal_data.time_interval;
1052 pfds[nfds].fd = sam_internal_data.quorum_fd;
1053 pfds[nfds].events = POLLIN;
1054 pfds[nfds].revents = 0;
1058 poll_error = poll (pfds, nfds, time_interval);
1060 if (poll_error == -1) {
1065 if (errno != EINTR) {
1070 if (poll_error == 0) {
1077 sam_parent_kill_child (&action, child_pid);
1081 if (poll_error > 0) {
1082 if (pfds[0].revents != 0) {
1086 bytes_read = sam_safe_read (parent_fd_in, &command, 1);
1088 if (bytes_read == 0) {
1100 if (bytes_read == -1) {
1118 if (recpol & SAM_RECOVERY_POLICY_QUORUM) {
1119 if (sam_parent_wait_for_quorum (parent_fd_in,
1120 parent_fd_out) !=
CS_OK) {
1125 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1126 if (sam_parent_cmap_state_set (parent_fd_in,
1127 parent_fd_out, 1) !=
CS_OK) {
1140 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1141 if (sam_parent_cmap_state_set (parent_fd_in,
1142 parent_fd_out, 0) !=
CS_OK) {
1151 sam_parent_data_store (parent_fd_in, parent_fd_out);
1154 sam_parent_warn_signal_set (parent_fd_in, parent_fd_out);
1158 sam_parent_mark_child_failed (&action, child_pid);
1163 if ((sam_internal_data.recovery_policy & SAM_RECOVERY_POLICY_QUORUM) &&
1164 pfds[1].revents != 0) {
1172 sam_parent_kill_child (&action, child_pid);
1188 int pipe_fd_out[2], pipe_fd_in[2];
1197 recpol = sam_internal_data.recovery_policy;
1199 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1203 if ((error = sam_cmap_register ()) !=
CS_OK) {
1211 if ((pipe_error = pipe (pipe_fd_out)) != 0) {
1216 if ((pipe_error = pipe (pipe_fd_in)) != 0) {
1217 close (pipe_fd_out[0]);
1218 close (pipe_fd_out[1]);
1224 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1230 sam_internal_data.instance_id++;
1232 sam_internal_data.term_send = 0;
1240 sam_internal_data.instance_id--;
1250 close (pipe_fd_out[0]);
1251 close (pipe_fd_in[1]);
1253 sam_internal_data.child_fd_out = pipe_fd_out[1];
1254 sam_internal_data.child_fd_in = pipe_fd_in[0];
1257 *instance_id = sam_internal_data.instance_id;
1259 sam_internal_data.am_i_child = 1;
1262 pthread_mutex_init (&sam_internal_data.lock, NULL);
1269 close (pipe_fd_out[1]);
1270 close (pipe_fd_in[0]);
1272 action = sam_parent_handler (pipe_fd_out[0], pipe_fd_in[1], pid);
1274 close (pipe_fd_out[0]);
1275 close (pipe_fd_in[1]);
1285 while (waitpid (pid, &child_status, 0) == -1 && errno == EINTR)
1288 old_action = action;
1297 if (recpol & SAM_RECOVERY_POLICY_QUORUM) {
1301 if (recpol & SAM_RECOVERY_POLICY_CMAP) {
1308 sam_cmap_destroy_pid_path ();
1312 exit (WEXITSTATUS (child_status));
1323 static void *hc_callback_thread (
void *unused_param)
1327 ssize_t bytes_readed;
1336 time_interval = sam_internal_data.time_interval >> 2;
1339 pfds.fd = sam_internal_data.cb_rpipe_fd;
1340 pfds.events = POLLIN;
1346 tmp_time_interval = -1;
1349 poll_error = poll (&pfds, 1, tmp_time_interval);
1351 if (poll_error == 0) {
1357 if (sam_internal_data.hc_callback () != 0) {
1365 if (poll_error > 0) {
1366 bytes_readed = sam_safe_read (sam_internal_data.cb_rpipe_fd, &command, 1);
1368 if (bytes_readed > 0) {
1382 return (unused_param);
1388 pthread_attr_t thread_attr;
1396 if (sam_internal_data.time_interval == 0) {
1400 if (sam_internal_data.cb_registered) {
1401 sam_internal_data.hc_callback = cb;
1414 pipe_error = pipe (pipe_fd);
1416 if (pipe_error != 0) {
1424 sam_internal_data.cb_rpipe_fd = pipe_fd[0];
1425 sam_internal_data.cb_wpipe_fd = pipe_fd[1];
1430 error = pthread_attr_init (&thread_attr);
1433 goto error_close_fd_exit;
1437 pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED);
1438 pthread_attr_setstacksize (&thread_attr, 32768);
1443 error = pthread_create (&sam_internal_data.cb_thread, &thread_attr, hc_callback_thread, NULL);
1447 goto error_attr_destroy_exit;
1453 pthread_attr_destroy(&thread_attr);
1455 sam_internal_data.cb_registered = 1;
1456 sam_internal_data.hc_callback = cb;
1460 error_attr_destroy_exit:
1461 pthread_attr_destroy(&thread_attr);
1462 error_close_fd_exit:
1463 sam_internal_data.cb_rpipe_fd = sam_internal_data.cb_wpipe_fd = 0;
cs_error_t cmap_set_uint64(cmap_handle_t handle, const char *key_name, uint64_t value)
enum sam_internal_status_t internal_status
#define SAM_RP_MASK_Q(pol)
cs_error_t sam_hc_callback_register(sam_hc_callback_t cb)
Register healthcheck callback.
sam_hc_callback_t hc_callback
#define SAM_CMAP_S_STARTED
cs_error_t quorum_dispatch(quorum_handle_t handle, cs_dispatch_flags_t dispatch_types)
Dispatch messages and configuration changes.
cs_error_t cmap_iter_next(cmap_handle_t handle, cmap_iter_handle_t iter_handle, char key_name[], size_t *value_len, cmap_value_types_t *type)
Return next item in iterator iter.
cmap_handle_t cmap_handle
cs_error_t cmap_initialize(cmap_handle_t *handle)
Create a new cmap connection.
The quorum_callbacks_t struct.
cs_error_t sam_initialize(int time_interval, sam_recovery_policy_t recovery_policy)
Create a new SAM connection.
quorum_handle_t quorum_handle
#define CMAP_KEYNAME_MAXLEN
cs_error_t cmap_iter_init(cmap_handle_t handle, const char *prefix, cmap_iter_handle_t *cmap_iter_handle)
Initialize iterator with given prefix.
int(* sam_hc_callback_t)(void)
Callback definition for event driven checking.
#define SAM_CMAP_S_REGISTERED
cs_error_t sam_finalize(void)
Close the SAM handle.
sam_recovery_policy_t recovery_policy
cs_error_t cmap_iter_finalize(cmap_handle_t handle, cmap_iter_handle_t iter_handle)
Finalize iterator.
cs_error_t sam_register(unsigned int *instance_id)
Register application.
cs_error_t sam_hc_send(void)
Send healthcheck confirmation.
#define SAM_CMAP_S_Q_WAIT
size_t user_data_allocated
cs_error_t sam_mark_failed(void)
Marks child as failed.
char cmap_pid_path[CMAP_KEYNAME_MAXLEN]
cs_error_t
The cs_error_t enum.
cs_error_t sam_data_getsize(size_t *size)
Return size of stored data.
cs_error_t cmap_set_string(cmap_handle_t handle, const char *key_name, const char *value)
cs_error_t sam_stop(void)
Stop healthchecking.
uint64_t quorum_handle_t
quorum_handle_t
cs_error_t sam_data_store(const void *data, size_t size)
Store user data.
cs_error_t sam_warn_signal_set(int warn_signal)
Set warning signal to be sent.
cs_error_t cmap_delete(cmap_handle_t handle, const char *key_name)
Deletes key from cmap database.
cs_error_t sam_data_restore(void *data, size_t size)
Return stored data.
uint64_t cmap_iter_handle_t
cs_error_t quorum_fd_get(quorum_handle_t handle, int *fd)
Get a file descriptor on which to poll.
quorum_notification_fn_t quorum_notify_fn
sam_recovery_policy_t
sam_recovery_policy_t enum
cs_error_t quorum_trackstart(quorum_handle_t handle, unsigned int flags)
Track node and quorum changes.
cs_error_t cmap_finalize(cmap_handle_t handle)
Close the cmap handle.
struct memb_ring_id ring_id
cs_error_t quorum_finalize(quorum_handle_t handle)
Close the quorum handle.
#define SAM_CMAP_S_FAILED
cs_error_t sam_start(void)
Start healthchecking.
#define SAM_RP_MASK_C(pol)
cs_error_t quorum_initialize(quorum_handle_t *handle, quorum_callbacks_t *callbacks, uint32_t *quorum_type)
Create a new quorum connection.