corosync  3.1.9
pload.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008-2012 Red Hat, Inc.
3  *
4  * All rights reserved.
5  *
6  * Authors: Steven Dake (sdake@redhat.com)
7  * Fabio M. Di Nitto (fdinitto@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <config.h>
37 
38 #include <qb/qblist.h>
39 #include <qb/qbutil.h>
40 #include <qb/qbipc_common.h>
41 
42 #include <corosync/swab.h>
43 #include <corosync/corodefs.h>
44 #include <corosync/coroapi.h>
45 #include <corosync/icmap.h>
46 #include <corosync/logsys.h>
47 
48 #include "service.h"
49 #include "util.h"
50 
/*
 * Presumably registers "PLOAD" as the logsys subsystem name attached to
 * the log_printf() calls made from this file.
 */
51 LOGSYS_DECLARE_SUBSYS ("PLOAD");
52 
53 /*
54  * Service Interfaces required by service_message_handler struct
55  */
/*
 * Corosync API table. Stored by pload_exec_init_fn() and used afterwards
 * for api->totem_mcast() and api->schedwrk_create().
 */
56 static struct corosync_api_v1 *api;
57 
/*
 * Forward declaration: the service engine descriptor references this as
 * its exec_init_fn before the function is defined.
 */
58 static char *pload_exec_init_fn (struct corosync_api_v1 *corosync_api);
59 
60 /*
61  * on wire / network bits
62  */
66 };
67 
69  struct qb_ipc_request_header header;
70  uint32_t msg_count;
71  uint32_t msg_size;
72 };
73 
75  struct qb_ipc_request_header header;
76 };
77 
/*
 * Forward declarations of the on-wire ("exec") handlers. They are needed
 * because pload_exec_engine[] references them before their definitions.
 * Each message kind has a delivery handler and a byte-order conversion hook.
 */
78 static void message_handler_req_exec_pload_start (const void *msg,
79  unsigned int nodeid);
80 static void req_exec_pload_start_endian_convert (void *msg);
81 
82 static void message_handler_req_exec_pload_mcast (const void *msg,
83  unsigned int nodeid);
84 static void req_exec_pload_mcast_endian_convert (void *msg);
85 
86 static struct corosync_exec_handler pload_exec_engine[] =
87 {
88  {
89  .exec_handler_fn = message_handler_req_exec_pload_start,
90  .exec_endian_convert_fn = req_exec_pload_start_endian_convert
91  },
92  {
93  .exec_handler_fn = message_handler_req_exec_pload_mcast,
94  .exec_endian_convert_fn = req_exec_pload_mcast_endian_convert
95  }
96 };
97 
98 /*
99  * internal bits and pieces
100  */
101 
102 /*
103  * really unused buffer but we need to give something to iovec
104  */
105 static char *buffer = NULL;
106 
107 /*
108  * wanted/size come from config
109  * sent/delivered track the runtime status
110  */
111 static uint32_t msgs_wanted = 0;
112 static uint32_t msg_size = 0;
113 static uint32_t msgs_sent = 0;
114 static uint32_t msgs_delivered = 0;
115 
116 /*
117  * bit flip to track if we are running or not and avoid multiple instances
118  */
119 static uint8_t pload_started = 0;
120 
121 /*
122  * handle for scheduler
123  */
124 static hdb_handle_t start_mcasting_handle;
125 
126 /*
127  * timing/profiling
128  */
129 static unsigned long long int tv1;
130 static unsigned long long int tv2;
131 static unsigned long long int tv_elapsed;
132 
133 /*
134  * Service engine hooks
135  */
137  .name = "corosync profile loading service",
138  .id = PLOAD_SERVICE,
139  .priority = 1,
140  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
141  .exec_engine = pload_exec_engine,
142  .exec_engine_count = sizeof (pload_exec_engine) / sizeof (struct corosync_exec_handler),
143  .exec_init_fn = pload_exec_init_fn
144 };
145 
147 {
148  return (&pload_service_engine);
149 }
150 
/*
 * internal use only functions
 */

/*
 * Fallback timersub() for platforms whose <sys/time.h> / <time.h> lack it.
 * It stores a - b (struct timeval operands) in *result and borrows one
 * second whenever the microsecond difference goes negative.
 *
 * NOTE(review): nothing in this file appears to call timersub() any more,
 * since elapsed time is measured with qb_util_nano_current_get(). Consider
 * dropping this fallback together with its #warning.
 */
#ifndef timersub
#warning Using internal timersub definition. Check your include header files
#define timersub(a, b, result)						\
	do {								\
		(result)->tv_sec = (a)->tv_sec - (b)->tv_sec;		\
		(result)->tv_usec = (a)->tv_usec - (b)->tv_usec;	\
		if ((result)->tv_usec < 0) {				\
			(result)->tv_sec--;				\
			(result)->tv_usec += 1000000;			\
		}							\
	} while (0)
#endif /* timersub */
171 
172 /*
173  * tell all cluster nodes to start mcasting
174  */
/*
 * Multicasts (TOTEM_AGREED) a start request carrying the requested message
 * count and per-message size. The receiving side is
 * message_handler_req_exec_pload_start(), presumably on every node,
 * including the sender.
 *
 * NOTE(review): the lines shown here assign msg_count/msg_size but not
 * header.size. Confirm the whole request header is initialised before the
 * struct goes on the wire; otherwise uninitialised stack bytes are sent.
 */
175 static void pload_send_start (uint32_t count, uint32_t size)
176 {
177  struct req_exec_pload_start req_exec_pload_start;
178  struct iovec iov;
179 
181  req_exec_pload_start.msg_count = count;
182  req_exec_pload_start.msg_size = size;
183  iov.iov_base = (void *)&req_exec_pload_start;
184  iov.iov_len = sizeof (struct req_exec_pload_start);
185 
/*
 * NOTE(review): the totem_mcast() result is ignored. pload_send_message()
 * treats -1 as "not sent", so a refused start request is silently lost here.
 */
186  api->totem_mcast (&iov, 1, TOTEM_AGREED);
187 }
188 
189 /*
190  * send N empty data messages of size X
191  */
/*
 * Scheduled-work callback that multicasts payload messages until
 * msgs_sent reaches msgs_wanted.
 *
 * Returns 0 once all msgs_wanted messages have been sent. Returns -1 when
 * totem_mcast() refuses a message; the scheduler presumably calls this
 * function again later in that case (confirm against schedwrk).
 * `arg` is not used.
 */
192 static int pload_send_message (const void *arg)
193 {
195  struct iovec iov[2];
196  unsigned int res;
197  unsigned int iov_len = 1;
198 
200  req_exec_pload_mcast.header.size = sizeof (struct req_exec_pload_mcast) + msg_size;
201 
/*
 * NOTE(review): header.size is set to sizeof(header) + msg_size, but the
 * iovecs below carry only max(msg_size, sizeof(header)) bytes in total.
 * The advertised size therefore exceeds the bytes actually sent.
 */
202  iov[0].iov_base = (void *)&req_exec_pload_mcast;
203  iov[0].iov_len = sizeof (struct req_exec_pload_mcast);
204  if (msg_size > sizeof (req_exec_pload_mcast)) {
/*
 * NOTE(review): &buffer is the address of the pointer variable itself, not
 * of the malloc'd payload. Reading iov_len bytes from it runs past that
 * object. Also, `buffer` is only allocated on the node that triggered the
 * run (pload_read_config), yet this function runs on every node that
 * accepts the start request.
 */
205  iov[1].iov_base = &buffer;
206  iov[1].iov_len = msg_size - sizeof (req_exec_pload_mcast);
207  iov_len = 2;
208  }
209 
/*
 * Send as many messages as the transport accepts in this invocation.
 * NOTE(review): the do/while always sends at least once, so msgs_wanted == 0
 * never satisfies the completion test below.
 */
210  do {
211  res = api->totem_mcast (iov, iov_len, TOTEM_AGREED);
212  if (res == -1) {
213  break;
214  } else {
215  msgs_sent++;
216  }
217  } while (msgs_sent < msgs_wanted);
218 
219  if (msgs_sent == msgs_wanted) {
220  return (0);
221  } else {
222  return (-1);
223  }
224 }
225 
226 /*
227  * hook into icmap to read config at runtime
228  * we do NOT start by default, ever!
229  */
/*
 * icmap tracking callback for keys under "pload.". The event, key_name,
 * new_val, old_val and user_data arguments are not used; the callback
 * re-reads the keys itself:
 *   pload.count - number of messages (default 1500000)
 *   pload.size  - message size (default 300, clamped to MESSAGE_SIZE_MAX)
 *   pload.start - a run starts only if this equals the exact
 *                 acknowledgement string below and no run is active.
 * When starting, it allocates `buffer` and multicasts the start request
 * via pload_send_start().
 */
230 static void pload_read_config(
231  int32_t event,
232  const char *key_name,
233  struct icmap_notify_value new_val,
234  struct icmap_notify_value old_val,
235  void *user_data)
236 {
237  uint32_t pload_count = 1500000;
238  uint32_t pload_size = 300;
239  char *pload_start = NULL;
240 
/*
 * Return values are ignored; presumably a missing key leaves the default
 * above untouched (confirm against icmap_get_uint32).
 */
241  icmap_get_uint32("pload.count", &pload_count);
242  icmap_get_uint32("pload.size", &pload_size);
243 
244  if (pload_size > MESSAGE_SIZE_MAX) {
245  pload_size = MESSAGE_SIZE_MAX;
246  log_printf(LOGSYS_LEVEL_WARNING, "pload size limited to %u", pload_size);
247  }
248 
249  if ((!pload_started) &&
250  (icmap_get_string("pload.start", &pload_start) == CS_OK)) {
251  if (!strcmp(pload_start,
252  "i_totally_understand_pload_will_crash_my_cluster_and_kill_corosync_on_exit")) {
/*
 * NOTE(review): pload_started is only set when the start request is
 * delivered. If this callback fires again before that, `buffer` is
 * overwritten (leaking the previous allocation) and a second start
 * request is sent.
 */
253  buffer = malloc(pload_size);
254  if (buffer) {
255  log_printf(LOGSYS_LEVEL_WARNING, "Starting pload!");
256  pload_send_start(pload_count, pload_size);
257  } else {
259  "Unable to allocate pload buffer!");
260  }
261  }
262  free(pload_start);
263  }
264 }
265 
266 /*
267  * exec functions
268  */
/*
 * Service initialisation hook. Stores the corosync API table in `api` and
 * registers pload_read_config() as an icmap tracker on the "pload." key
 * prefix, so that a run can only be started on demand.
 *
 * Returns NULL on success, or a static error message if tracking could not
 * be registered.
 */
269 static char *pload_exec_init_fn (struct corosync_api_v1 *corosync_api)
270 {
/*
 * NOTE(review): the tracking handle lives only in this local, so the
 * tracker can never be removed later.
 */
271  icmap_track_t pload_track = NULL;
272 
273  api = corosync_api;
274 
275  /*
276  * track changes to pload config and start only on demand
277  */
278  if (icmap_track_add("pload.",
280  pload_read_config,
281  NULL,
282  &pload_track) != CS_OK) {
283  return (char *)"Unable to setup pload config tracking!\n";
284  }
285 
286  return NULL;
287 }
288 
289 /*
290  * network messages/onwire handlers
291  */
292 
/*
 * Byte-order hook for the start request. It swaps the 32-bit msg_count and
 * msg_size fields in place and leaves the request header alone. Presumably
 * it is invoked for messages from a node of opposite endianness (confirm in
 * the core delivery path).
 */
293 static void req_exec_pload_start_endian_convert (void *msg)
294 {
296 
297  req_exec_pload_start->msg_count = swab32(req_exec_pload_start->msg_count);
298  req_exec_pload_start->msg_size = swab32(req_exec_pload_start->msg_size);
299 }
300 
301 static void message_handler_req_exec_pload_start (
302  const void *msg,
303  unsigned int nodeid)
304 {
305  const struct req_exec_pload_start *req_exec_pload_start = msg;
306 
307  /*
308  * don't start multiple instances
309  */
310  if (pload_started) {
311  return;
312  }
313 
314  pload_started = 1;
315 
316  msgs_wanted = req_exec_pload_start->msg_count;
317  msg_size = req_exec_pload_start->msg_size;
318 
319  api->schedwrk_create (
320  &start_mcasting_handle,
321  pload_send_message,
322  &start_mcasting_handle);
323 }
324 
/*
 * Byte-order hook for pload multicast messages. The message struct holds
 * only the common request header, so there are no pload-specific fields to
 * convert and this hook intentionally does nothing.
 */
static void req_exec_pload_mcast_endian_convert (void *msg)
{
	(void)msg;
}
328 
329 static void message_handler_req_exec_pload_mcast (
330  const void *msg,
331  unsigned int nodeid)
332 {
333  char log_buffer[1024];
334 
335  if (msgs_delivered == 0) {
336  tv1 = qb_util_nano_current_get ();
337  }
338  msgs_delivered += 1;
339  if (msgs_delivered == msgs_wanted) {
340  tv2 = qb_util_nano_current_get ();
341  tv_elapsed = tv2 - tv1;
342  sprintf (log_buffer, "%5d Writes %d bytes per write %7.3f seconds runtime, %9.3f TP/S, %9.3f MB/S.",
343  msgs_delivered,
344  msg_size,
345  (tv_elapsed / 1000000000.0),
346  ((float)msgs_delivered) / (tv_elapsed / 1000000000.0),
347  (((float)msgs_delivered) * ((float)msg_size) /
348  (tv_elapsed / 1000000000.0)) / (1024.0 * 1024.0));
349  log_printf (LOGSYS_LEVEL_NOTICE, "%s", log_buffer);
350  log_printf (LOGSYS_LEVEL_WARNING, "Stopping corosync the hard way");
351  if (buffer) {
352  free(buffer);
353  buffer = NULL;
354  }
355  exit(COROSYNC_DONE_PLOAD);
356  }
357 }
#define TOTEM_AGREED
Definition: coroapi.h:102
const char * name
Definition: coroapi.h:491
pload_exec_message_req_types
Definition: pload.c:63
LOGSYS_DECLARE_SUBSYS("PLOAD")
struct qb_ipc_request_header header
Definition: pload.c:75
The corosync_service_engine struct.
Definition: coroapi.h:490
struct corosync_service_engine pload_service_engine
Definition: pload.c:136
The corosync_exec_handler struct.
Definition: coroapi.h:475
int(* totem_mcast)(const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee)
Definition: coroapi.h:279
#define log_printf(level, format, args...)
Definition: logsys.h:332
void(* exec_handler_fn)(const void *msg, unsigned int nodeid)
Definition: coroapi.h:476
#define SERVICE_ID_MAKE(a, b)
Definition: coroapi.h:458
#define ICMAP_TRACK_DELETE
Definition: icmap.h:77
#define LOGSYS_LEVEL_WARNING
Definition: logsys.h:73
#define ICMAP_TRACK_MODIFY
Definition: icmap.h:78
void * user_data
Definition: sam.c:127
#define ICMAP_TRACK_ADD
Definition: icmap.h:76
The corosync_api_v1 struct.
Definition: coroapi.h:225
cs_error_t icmap_get_uint32(const char *key_name, uint32_t *u32)
Definition: icmap.c:896
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
struct qb_ipc_request_header header
Definition: pload.c:69
int(* schedwrk_create)(hdb_handle_t *handle, int(schedwrk_fn)(const void *), const void *context)
Definition: coroapi.h:372
qb_handle_t hdb_handle_t
Definition: hdb.h:52
uint32_t msg_count
Definition: pload.c:70
cs_error_t icmap_get_string(const char *key_name, char **str)
Shortcut for icmap_get for string type.
Definition: icmap.c:860
#define LOGSYS_LEVEL_NOTICE
Definition: logsys.h:74
#define MESSAGE_SIZE_MAX
Definition: coroapi.h:97
unsigned int nodeid
Definition: coroapi.h:75
struct corosync_service_engine * pload_get_service_engine_ver0(void)
Definition: pload.c:146
uint32_t msg_size
Definition: pload.c:71
Structure passed as new_value and old_value in change callback.
Definition: icmap.h:91
cs_error_t icmap_track_add(const char *key_name, int32_t track_type, icmap_notify_fn_t notify_fn, void *user_data, icmap_track_t *icmap_track)
Add tracking function for given key_name.
Definition: icmap.c:1163
#define ICMAP_TRACK_PREFIX
Whole prefix is tracked, instead of key only (so "totem." tracking means that "totem.nodeid", "totem.version", ...
Definition: icmap.h:85