libfuse
fuse_loop_mt.c
1 /*
2  FUSE: Filesystem in Userspace
3  Copyright (C) 2001-2007 Miklos Szeredi <miklos@szeredi.hu>
4 
5  Implementation of the multi-threaded FUSE session loop.
6 
7  This program can be distributed under the terms of the GNU LGPLv2.
8  See the file COPYING.LIB.
9 */
10 
11 #include "config.h"
12 #include "fuse_lowlevel.h"
13 #include "fuse_misc.h"
14 #include "fuse_kernel.h"
15 #include "fuse_i.h"
16 
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <unistd.h>
21 #include <signal.h>
22 #include <semaphore.h>
23 #include <errno.h>
24 #include <sys/time.h>
25 #include <sys/ioctl.h>
26 #include <assert.h>
27 #include <limits.h>
28 
29 /* Environment var controlling the thread stack size */
30 #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
31 
32 #define FUSE_LOOP_MT_V2_IDENTIFIER INT_MAX - 2
33 #define FUSE_LOOP_MT_DEF_CLONE_FD 0
34 #define FUSE_LOOP_MT_DEF_MAX_THREADS 10
35 #define FUSE_LOOP_MT_DEF_IDLE_THREADS -1 /* thread destruction is disabled
36  * by default */
37 
38 struct fuse_worker {
39  struct fuse_worker *prev;
40  struct fuse_worker *next;
41  pthread_t thread_id;
42 
43  // We need to include fuse_buf so that we can properly free
44  // it when a thread is terminated by pthread_cancel().
45  struct fuse_buf fbuf;
46  struct fuse_chan *ch;
47  struct fuse_mt *mt;
48 };
49 
50 struct fuse_mt {
51  pthread_mutex_t lock;
52  int numworker;
53  int numavail;
54  struct fuse_session *se;
55  struct fuse_worker main;
56  sem_t finish;
57  int exit;
58  int error;
59  int clone_fd;
60  int max_idle;
61  int max_threads;
62 };
63 
64 static struct fuse_chan *fuse_chan_new(int fd)
65 {
66  struct fuse_chan *ch = (struct fuse_chan *) malloc(sizeof(*ch));
67  if (ch == NULL) {
68  fuse_log(FUSE_LOG_ERR, "fuse: failed to allocate channel\n");
69  return NULL;
70  }
71 
72  memset(ch, 0, sizeof(*ch));
73  ch->fd = fd;
74  ch->ctr = 1;
75  pthread_mutex_init(&ch->lock, NULL);
76 
77  return ch;
78 }
79 
80 struct fuse_chan *fuse_chan_get(struct fuse_chan *ch)
81 {
82  assert(ch->ctr > 0);
83  pthread_mutex_lock(&ch->lock);
84  ch->ctr++;
85  pthread_mutex_unlock(&ch->lock);
86 
87  return ch;
88 }
89 
90 void fuse_chan_put(struct fuse_chan *ch)
91 {
92  if (ch == NULL)
93  return;
94  pthread_mutex_lock(&ch->lock);
95  ch->ctr--;
96  if (!ch->ctr) {
97  pthread_mutex_unlock(&ch->lock);
98  close(ch->fd);
99  pthread_mutex_destroy(&ch->lock);
100  free(ch);
101  } else
102  pthread_mutex_unlock(&ch->lock);
103 }
104 
105 static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next)
106 {
107  struct fuse_worker *prev = next->prev;
108  w->next = next;
109  w->prev = prev;
110  prev->next = w;
111  next->prev = w;
112 }
113 
114 static void list_del_worker(struct fuse_worker *w)
115 {
116  struct fuse_worker *prev = w->prev;
117  struct fuse_worker *next = w->next;
118  prev->next = next;
119  next->prev = prev;
120 }
121 
122 static int fuse_loop_start_thread(struct fuse_mt *mt);
123 
124 static void *fuse_do_work(void *data)
125 {
126  struct fuse_worker *w = (struct fuse_worker *) data;
127  struct fuse_mt *mt = w->mt;
128 
129  while (!fuse_session_exited(mt->se)) {
130  int isforget = 0;
131  int res;
132 
133  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
134  res = fuse_session_receive_buf_int(mt->se, &w->fbuf, w->ch);
135  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
136  if (res == -EINTR)
137  continue;
138  if (res <= 0) {
139  if (res < 0) {
140  fuse_session_exit(mt->se);
141  mt->error = res;
142  }
143  break;
144  }
145 
146  pthread_mutex_lock(&mt->lock);
147  if (mt->exit) {
148  pthread_mutex_unlock(&mt->lock);
149  return NULL;
150  }
151 
152  /*
153  * This disgusting hack is needed so that zillions of threads
154  * are not created on a burst of FORGET messages
155  */
156  if (!(w->fbuf.flags & FUSE_BUF_IS_FD)) {
157  struct fuse_in_header *in = w->fbuf.mem;
158 
159  if (in->opcode == FUSE_FORGET ||
160  in->opcode == FUSE_BATCH_FORGET)
161  isforget = 1;
162  }
163 
164  if (!isforget)
165  mt->numavail--;
166  if (mt->numavail == 0 && mt->numworker < mt->max_threads)
167  fuse_loop_start_thread(mt);
168  pthread_mutex_unlock(&mt->lock);
169 
170  fuse_session_process_buf_int(mt->se, &w->fbuf, w->ch);
171 
172  pthread_mutex_lock(&mt->lock);
173  if (!isforget)
174  mt->numavail++;
175 
176  /* creating and destroying threads is rather expensive - and there is
177  * not much gain from destroying existing threads. It is therefore
178  * discouraged to set max_idle to anything else than -1. If there
179  * is indeed a good reason to destruct threads it should be done
180  * delayed, a moving average might be useful for that.
181  */
182  if (mt->max_idle != -1 && mt->numavail > mt->max_idle) {
183  if (mt->exit) {
184  pthread_mutex_unlock(&mt->lock);
185  return NULL;
186  }
187  list_del_worker(w);
188  mt->numavail--;
189  mt->numworker--;
190  pthread_mutex_unlock(&mt->lock);
191 
192  pthread_detach(w->thread_id);
193  free(w->fbuf.mem);
194  fuse_chan_put(w->ch);
195  free(w);
196  return NULL;
197  }
198  pthread_mutex_unlock(&mt->lock);
199  }
200 
201  sem_post(&mt->finish);
202 
203  return NULL;
204 }
205 
206 int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
207 {
208  sigset_t oldset;
209  sigset_t newset;
210  int res;
211  pthread_attr_t attr;
212  char *stack_size;
213 
214  /* Override default stack size
215  * XXX: This should ideally be a parameter option. It is rather
216  * well hidden here.
217  */
218  pthread_attr_init(&attr);
219  stack_size = getenv(ENVNAME_THREAD_STACK);
220  if (stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
221  fuse_log(FUSE_LOG_ERR, "fuse: invalid stack size: %s\n", stack_size);
222 
223  /* Disallow signal reception in worker threads */
224  sigemptyset(&newset);
225  sigaddset(&newset, SIGTERM);
226  sigaddset(&newset, SIGINT);
227  sigaddset(&newset, SIGHUP);
228  sigaddset(&newset, SIGQUIT);
229  pthread_sigmask(SIG_BLOCK, &newset, &oldset);
230  res = pthread_create(thread_id, &attr, func, arg);
231  pthread_sigmask(SIG_SETMASK, &oldset, NULL);
232  pthread_attr_destroy(&attr);
233  if (res != 0) {
234  fuse_log(FUSE_LOG_ERR, "fuse: error creating thread: %s\n",
235  strerror(res));
236  return -1;
237  }
238 
239  return 0;
240 }
241 
242 static struct fuse_chan *fuse_clone_chan(struct fuse_mt *mt)
243 {
244  int res;
245  int clonefd;
246  uint32_t masterfd;
247  struct fuse_chan *newch;
248  const char *devname = "/dev/fuse";
249 
250 #ifndef O_CLOEXEC
251 #define O_CLOEXEC 0
252 #endif
253  clonefd = open(devname, O_RDWR | O_CLOEXEC);
254  if (clonefd == -1) {
255  fuse_log(FUSE_LOG_ERR, "fuse: failed to open %s: %s\n", devname,
256  strerror(errno));
257  return NULL;
258  }
259  fcntl(clonefd, F_SETFD, FD_CLOEXEC);
260 
261  masterfd = mt->se->fd;
262  res = ioctl(clonefd, FUSE_DEV_IOC_CLONE, &masterfd);
263  if (res == -1) {
264  fuse_log(FUSE_LOG_ERR, "fuse: failed to clone device fd: %s\n",
265  strerror(errno));
266  close(clonefd);
267  return NULL;
268  }
269  newch = fuse_chan_new(clonefd);
270  if (newch == NULL)
271  close(clonefd);
272 
273  return newch;
274 }
275 
276 static int fuse_loop_start_thread(struct fuse_mt *mt)
277 {
278  int res;
279 
280  struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
281  if (!w) {
282  fuse_log(FUSE_LOG_ERR, "fuse: failed to allocate worker structure\n");
283  return -1;
284  }
285  memset(w, 0, sizeof(struct fuse_worker));
286  w->fbuf.mem = NULL;
287  w->mt = mt;
288 
289  w->ch = NULL;
290  if (mt->clone_fd) {
291  w->ch = fuse_clone_chan(mt);
292  if(!w->ch) {
293  /* Don't attempt this again */
294  fuse_log(FUSE_LOG_ERR, "fuse: trying to continue "
295  "without -o clone_fd.\n");
296  mt->clone_fd = 0;
297  }
298  }
299 
300  res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
301  if (res == -1) {
302  fuse_chan_put(w->ch);
303  free(w);
304  return -1;
305  }
306  list_add_worker(w, &mt->main);
307  mt->numavail ++;
308  mt->numworker ++;
309 
310  return 0;
311 }
312 
313 static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
314 {
315  pthread_join(w->thread_id, NULL);
316  pthread_mutex_lock(&mt->lock);
317  list_del_worker(w);
318  pthread_mutex_unlock(&mt->lock);
319  free(w->fbuf.mem);
320  fuse_chan_put(w->ch);
321  free(w);
322 }
323 
324 int fuse_session_loop_mt_312(struct fuse_session *se, struct fuse_loop_config *config);
325 FUSE_SYMVER("fuse_session_loop_mt_312", "fuse_session_loop_mt@@FUSE_3.12")
326 int fuse_session_loop_mt_312(struct fuse_session *se, struct fuse_loop_config *config)
327 {
328 int err;
329  struct fuse_mt mt;
330  struct fuse_worker *w;
331  int created_config = 0;
332 
333  if (config) {
334  err = fuse_loop_cfg_verify(config);
335  if (err)
336  return err;
337  } else {
338  /* The caller does not care about parameters - use the default */
339  config = fuse_loop_cfg_create();
340  created_config = 1;
341  }
342 
343 
344  memset(&mt, 0, sizeof(struct fuse_mt));
345  mt.se = se;
346  mt.clone_fd = config->clone_fd;
347  mt.error = 0;
348  mt.numworker = 0;
349  mt.numavail = 0;
350  mt.max_idle = config->max_idle_threads;
351  mt.max_threads = config->max_threads;
352  mt.main.thread_id = pthread_self();
353  mt.main.prev = mt.main.next = &mt.main;
354  sem_init(&mt.finish, 0, 0);
355  pthread_mutex_init(&mt.lock, NULL);
356 
357  pthread_mutex_lock(&mt.lock);
358  err = fuse_loop_start_thread(&mt);
359  pthread_mutex_unlock(&mt.lock);
360  if (!err) {
361  /* sem_wait() is interruptible */
362  while (!fuse_session_exited(se))
363  sem_wait(&mt.finish);
364 
365  pthread_mutex_lock(&mt.lock);
366  for (w = mt.main.next; w != &mt.main; w = w->next)
367  pthread_cancel(w->thread_id);
368  mt.exit = 1;
369  pthread_mutex_unlock(&mt.lock);
370 
371  while (mt.main.next != &mt.main)
372  fuse_join_worker(&mt, mt.main.next);
373 
374  err = mt.error;
375  }
376 
377  pthread_mutex_destroy(&mt.lock);
378  sem_destroy(&mt.finish);
379  if(se->error != 0)
380  err = se->error;
381  fuse_session_reset(se);
382 
383  if (created_config) {
384  fuse_loop_cfg_destroy(config);
385  config = NULL;
386  }
387 
388  return err;
389 }
390 
391 int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config_v1 *config_v1);
392 FUSE_SYMVER("fuse_session_loop_mt_32", "fuse_session_loop_mt@FUSE_3.2")
393 int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config_v1 *config_v1)
394 {
395  int err;
396  struct fuse_loop_config *config = NULL;
397 
398  if (config_v1 != NULL) {
399  /* convert the given v1 config */
400  config = fuse_loop_cfg_create();
401  if (config == NULL)
402  return ENOMEM;
403 
404  fuse_loop_cfg_convert(config, config_v1);
405  }
406 
407  err = fuse_session_loop_mt_312(se, config);
408 
409  fuse_loop_cfg_destroy(config);
410 
411  return err;
412 }
413 
414 
415 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd);
416 FUSE_SYMVER("fuse_session_loop_mt_31", "fuse_session_loop_mt@FUSE_3.0")
417 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd)
418 {
419  struct fuse_loop_config *config = fuse_loop_cfg_create();
420  if (clone_fd > 0)
422  return fuse_session_loop_mt_312(se, config);
423 }
424 
426 {
427  struct fuse_loop_config *config = calloc(1, sizeof(*config));
428  if (config == NULL)
429  return NULL;
430 
431  config->version_id = FUSE_LOOP_MT_V2_IDENTIFIER;
432  config->max_idle_threads = FUSE_LOOP_MT_DEF_IDLE_THREADS;
433  config->max_threads = FUSE_LOOP_MT_DEF_MAX_THREADS;
434  config->clone_fd = FUSE_LOOP_MT_DEF_CLONE_FD;
435 
436  return config;
437 }
438 
439 void fuse_loop_cfg_destroy(struct fuse_loop_config *config)
440 {
441  free(config);
442 }
443 
444 int fuse_loop_cfg_verify(struct fuse_loop_config *config)
445 {
446  if (config->version_id != FUSE_LOOP_MT_V2_IDENTIFIER)
447  return -EINVAL;
448 
449  return 0;
450 }
451 
452 void fuse_loop_cfg_convert(struct fuse_loop_config *config,
453  struct fuse_loop_config_v1 *v1_conf)
454 {
456 
457  fuse_loop_cfg_set_clone_fd(config, v1_conf->clone_fd);
458 }
459 
461  unsigned int value)
462 {
463  config->max_idle_threads = value;
464 }
465 
467  unsigned int value)
468 {
469  config->max_threads = value;
470 }
471 
472 void fuse_loop_cfg_set_clone_fd(struct fuse_loop_config *config,
473  unsigned int value)
474 {
475  config->clone_fd = value;
476 }
477 
@ FUSE_BUF_IS_FD
Definition: fuse_common.h:633
void fuse_log(enum fuse_log_level level, const char *fmt,...)
Definition: fuse_log.c:33
void fuse_session_exit(struct fuse_session *se)
int fuse_session_exited(struct fuse_session *se)
void fuse_session_reset(struct fuse_session *se)
void fuse_loop_cfg_convert(struct fuse_loop_config *config, struct fuse_loop_config_v1 *v1_conf)
Definition: fuse_loop_mt.c:451
void fuse_loop_cfg_set_idle_threads(struct fuse_loop_config *config, unsigned int value)
Definition: fuse_loop_mt.c:459
struct fuse_loop_config * fuse_loop_cfg_create(void)
Definition: fuse_loop_mt.c:424
void fuse_loop_cfg_set_clone_fd(struct fuse_loop_config *config, unsigned int value)
Definition: fuse_loop_mt.c:471
void fuse_loop_cfg_destroy(struct fuse_loop_config *config)
Definition: fuse_loop_mt.c:438
void fuse_loop_cfg_set_max_threads(struct fuse_loop_config *config, unsigned int value)
Definition: fuse_loop_mt.c:465
unsigned int max_threads
Definition: fuse_i.h:137
unsigned int max_idle_threads
Definition: fuse_common.h:124