Warning: Attempt to read property "date" on null in /usr/local/www/websvn.planix.org/blame.php on line 247

Warning: Attempt to read property "msg" on null in /usr/local/www/websvn.planix.org/blame.php on line 247
WebSVN – planix.SVN – Blame – /os/branches/feature_fixcpp/sys/src/cmd/upas/q/runq.c – Rev 2

Subversion Repositories planix.SVN

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
#include "common.h"
2
#include <ctype.h>
3
 
4
void	doalldirs(void);
5
void	dodir(char*);
6
void	dofile(Dir*);
7
void	rundir(char*);
8
char*	file(char*, char);
9
void	warning(char*, void*);
10
void	error(char*, void*);
11
int	returnmail(char**, char*, char*);
12
void	logit(char*, char*, char**);
13
void	doload(int);
14
 
15
#define HUNK 32
16
char	*cmd;
17
char	*root;
18
int	debug;
19
int	giveup = 2*24*60*60;
20
int	load;
21
int	limit;
22
 
23
/* the current directory */
24
Dir	*dirbuf;
25
long	ndirbuf = 0;
26
int	nfiles;
27
char	*curdir;
28
 
29
char *runqlog = "runq";
30
 
31
int	*pidlist;
32
char	**badsys;		/* array of recalcitrant systems */
33
int	nbad;
34
int	npid = 50;
35
int	sflag;			/* single thread per directory */
36
int	aflag;			/* all directories */
37
int	Eflag;			/* ignore E.xxxxxx dates */
38
int	Rflag;			/* no giving up, ever */
39
 
40
/*
 *  print the command synopsis and exit with a failure status.
 *  (exits("") would report success, so a non-empty status is used.)
 */
void
usage(void)
{
	fprint(2, "usage: runq [-adsER] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
	exits("usage");
}
46
 
47
void
48
main(int argc, char **argv)
49
{
50
	char *qdir, *x;
51
 
52
	qdir = 0;
53
 
54
	ARGBEGIN{
55
	case 'l':
56
		x = ARGF();
57
		if(x == 0)
58
			usage();
59
		load = atoi(x);
60
		if(load < 0)
61
			load = 0;
62
		break;
63
	case 'E':
64
		Eflag++;
65
		break;
66
	case 'R':	/* no giving up -- just leave stuff in the queue */
67
		Rflag++;
68
		break;
69
	case 'a':
70
		aflag++;
71
		break;
72
	case 'd':
73
		debug++;
74
		break;
75
	case 'r':
76
		limit = atoi(ARGF());
77
		break;
78
	case 's':
79
		sflag++;
80
		break;
81
	case 't':
82
		giveup = 60*60*atoi(ARGF());
83
		break;
84
	case 'q':
85
		qdir = ARGF();
86
		if(qdir == 0)
87
			usage();
88
		break;
89
	case 'n':
90
		npid = atoi(ARGF());
91
		if(npid == 0)
92
			usage();
93
		break;
94
	}ARGEND;
95
 
96
	if(argc != 2)
97
		usage();
98
 
99
	pidlist = malloc(npid*sizeof(*pidlist));
100
	if(pidlist == 0)
101
		error("can't malloc", 0);
102
 
103
	if(aflag == 0 && qdir == 0) {
104
		qdir = getuser();
105
		if(qdir == 0)
106
			error("unknown user", 0);
107
	}
108
	root = argv[0];
109
	cmd = argv[1];
110
 
111
	if(chdir(root) < 0)
112
		error("can't cd to %s", root);
113
 
114
	doload(1);
115
	if(aflag)
116
		doalldirs();
117
	else
118
		dodir(qdir);
119
	doload(0);
120
	exits(0);
121
}
122
 
123
int
124
emptydir(char *name)
125
{
126
	int fd;
127
	long n;
128
	char buf[2048];
129
 
130
	fd = open(name, OREAD);
131
	if(fd < 0)
132
		return 1;
133
	n = read(fd, buf, sizeof(buf));
134
	close(fd);
135
	if(n <= 0) {
136
		if(debug)
137
			fprint(2, "removing directory %s\n", name);
138
		syslog(0, runqlog, "rmdir %s", name);
139
		sysremove(name);
140
		return 1;
141
	}
142
	return 0;
143
}
144
 
145
int
146
forkltd(void)
147
{
148
	int i;
149
	int pid;
150
 
151
	for(i = 0; i < npid; i++){
152
		if(pidlist[i] <= 0)
153
			break;
154
	}
155
 
156
	while(i >= npid){
157
		pid = waitpid();
158
		if(pid < 0){
159
			syslog(0, runqlog, "forkltd confused");
160
			exits(0);
161
		}
162
 
163
		for(i = 0; i < npid; i++)
164
			if(pidlist[i] == pid)
165
				break;
166
	}
167
	pidlist[i] = fork();
168
	return pidlist[i];
169
}
170
 
171
/*
172
 *  run all user directories, must be bootes (or root on unix) to do this
173
 */
174
void
175
doalldirs(void)
176
{
177
	Dir *db;
178
	int fd;
179
	long i, n;
180
 
181
 
182
	fd = open(".", OREAD);
183
	if(fd == -1){
184
		warning("reading %s", root);
185
		return;
186
	}
187
	n = sysdirreadall(fd, &db);
188
	if(n > 0){
189
		for(i=0; i<n; i++){
190
			if(db[i].qid.type & QTDIR){
191
				if(emptydir(db[i].name))
192
					continue;
193
				switch(forkltd()){
194
				case -1:
195
					syslog(0, runqlog, "out of procs");
196
					doload(0);
197
					exits(0);
198
				case 0:
199
					if(sysdetach() < 0)
200
						error("%r", 0);
201
					dodir(db[i].name);
202
					exits(0);
203
				default:
204
					break;
205
				}
206
			}
207
		}
208
		free(db);
209
	}
210
	close(fd);
211
}
212
 
213
/*
214
 *  cd to a user directory and run it
215
 */
216
void
217
dodir(char *name)
218
{
219
	curdir = name;
220
 
221
	if(chdir(name) < 0){
222
		warning("cd to %s", name);
223
		return;
224
	}
225
	if(debug)
226
		fprint(2, "running %s\n", name);
227
	rundir(name);
228
	chdir("..");
229
}
230
 
231
/*
232
 *  run the current directory
233
 */
234
void
235
rundir(char *name)
236
{
237
	int fd;
238
	long i;
239
 
240
	if(aflag && sflag)
241
		fd = sysopenlocked(".", OREAD);
242
	else
243
		fd = open(".", OREAD);
244
	if(fd == -1){
245
		warning("reading %s", name);
246
		return;
247
	}
248
	nfiles = sysdirreadall(fd, &dirbuf);
249
	if(nfiles > 0){
250
		for(i=0; i<nfiles; i++){
251
			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
252
				continue;
253
			dofile(&dirbuf[i]);
254
		}
255
		free(dirbuf);
256
	}
257
	if(aflag && sflag)
258
		sysunlockfile(fd);
259
	else
260
		close(fd);
261
}
262
 
263
/*
264
 *  free files matching name in the current directory
265
 */
266
void
267
remmatch(char *name)
268
{
269
	long i;
270
 
271
	syslog(0, runqlog, "removing %s/%s", curdir, name);
272
 
273
	for(i=0; i<nfiles; i++){
274
		if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
275
			sysremove(dirbuf[i].name);
276
	}
277
 
278
	/* error file (may have) appeared after we read the directory */
279
	/* stomp on data file in case of phase error */
280
	sysremove(file(name, 'D'));
281
	sysremove(file(name, 'E'));
282
}
283
 
284
/*
285
 *  like trylock, but we've already got the lock on fd,
286
 *  and don't want an L. lock file.
287
 */
288
static Mlock *
289
keeplockalive(char *path, int fd)
290
{
291
	char buf[1];
292
	Mlock *l;
293
 
294
	l = malloc(sizeof(Mlock));
295
	if(l == 0)
296
		return 0;
297
	l->fd = fd;
298
	l->name = s_new();
299
	s_append(l->name, path);
300
 
301
	/* fork process to keep lock alive until sysunlock(l) */
302
	switch(l->pid = rfork(RFPROC)){
303
	default:
304
		break;
305
	case 0:
306
		fd = l->fd;
307
		for(;;){
308
			sleep(1000*60);
309
			if(pread(fd, buf, 1, 0) < 0)
310
				break;
311
		}
312
		_exits(0);
313
	}
314
	return l;
315
}
316
 
317
/*
318
 *  try a message
319
 */
320
void
321
dofile(Dir *dp)
322
{
323
	Dir *d;
324
	int dfd, ac, dtime, efd, pid, i, etime;
325
	char *buf, *cp, **av;
326
	Waitmsg *wm;
327
	Biobuf *b;
328
	Mlock *l = nil;
329
 
330
	if(debug)
331
		fprint(2, "dofile %s\n", dp->name);
332
	/*
333
	 *  if no data file or empty control or data file, just clean up
334
	 *  the empty control file must be 15 minutes old, to minimize the
335
	 *  chance of a race.
336
	 */
337
	d = dirstat(file(dp->name, 'D'));
338
	if(d == nil){
339
		syslog(0, runqlog, "no data file for %s", dp->name);
340
		remmatch(dp->name);
341
		return;
342
	}
343
	if(dp->length == 0){
344
		if(time(0)-dp->mtime > 15*60){
345
			syslog(0, runqlog, "empty ctl file for %s", dp->name);
346
			remmatch(dp->name);
347
		}
348
		return;
349
	}
350
	dtime = d->mtime;
351
	free(d);
352
 
353
	/*
354
	 *  retry times depend on the age of the errors file
355
	 */
356
	if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
357
		etime = d->mtime;
358
		free(d);
359
		if(etime - dtime < 15*60){
360
			/* up to the first 15 minutes, every 30 seconds */
361
			if(time(0) - etime < 30)
362
				return;
363
		} else if(etime - dtime < 60*60){
364
			/* up to the first hour, try every 15 minutes */
365
			if(time(0) - etime < 15*60)
366
				return;
367
		} else {
368
			/* after the first hour, try once an hour */
369
			if(time(0) - etime < 60*60)
370
				return;
371
		}
372
 
373
	}
374
 
375
	/*
376
	 *  open control and data
377
	 */
378
	b = sysopen(file(dp->name, 'C'), "rl", 0660);
379
	if(b == 0) {
380
		if(debug)
381
			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
382
		return;
383
	}
384
	dfd = open(file(dp->name, 'D'), OREAD);
385
	if(dfd < 0){
386
		if(debug)
387
			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
388
		Bterm(b);
389
		sysunlockfile(Bfildes(b));
390
		return;
391
	}
392
 
393
	/*
394
	 *  make arg list
395
	 *	- read args into (malloc'd) buffer
396
	 *	- malloc a vector and copy pointers to args into it
397
	 */
398
	buf = malloc(dp->length+1);
399
	if(buf == 0){
400
		warning("buffer allocation", 0);
401
		Bterm(b);
402
		sysunlockfile(Bfildes(b));
403
		close(dfd);
404
		return;
405
	}
406
	if(Bread(b, buf, dp->length) != dp->length){
407
		warning("reading control file %s\n", dp->name);
408
		Bterm(b);
409
		sysunlockfile(Bfildes(b));
410
		close(dfd);
411
		free(buf);
412
		return;
413
	}
414
	buf[dp->length] = 0;
415
	av = malloc(2*sizeof(char*));
416
	if(av == 0){
417
		warning("argv allocation", 0);
418
		close(dfd);
419
		free(buf);
420
		Bterm(b);
421
		sysunlockfile(Bfildes(b));
422
		return;
423
	}
424
	for(ac = 1, cp = buf; *cp; ac++){
425
		while(isspace(*cp))
426
			*cp++ = 0;
427
		if(*cp == 0)
428
			break;
429
 
430
		av = realloc(av, (ac+2)*sizeof(char*));
431
		if(av == 0){
432
			warning("argv allocation", 0);
433
			close(dfd);
434
			free(buf);
435
			Bterm(b);
436
			sysunlockfile(Bfildes(b));
437
			return;
438
		}
439
		av[ac] = cp;
440
		while(*cp && !isspace(*cp)){
441
			if(*cp++ == '"'){
442
				while(*cp && *cp != '"')
443
					cp++;
444
				if(*cp)
445
					cp++;
446
			}
447
		}
448
	}
449
	av[0] = cmd;
450
	av[ac] = 0;
451
 
452
	if(!Eflag &&time(0) - dtime > giveup){
453
		if(returnmail(av, dp->name, "Giveup") != 0)
454
			logit("returnmail failed", dp->name, av);
455
		remmatch(dp->name);
456
		goto done;
457
	}
458
 
459
	for(i = 0; i < nbad; i++){
460
		if(strcmp(av[3], badsys[i]) == 0)
461
			goto done;
462
	}
463
 
464
	/*
465
	 * Ken's fs, for example, gives us 5 minutes of inactivity before
466
	 * the lock goes stale, so we have to keep reading it.
467
 	 */
468
	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
469
 
470
	/*
471
	 *  transfer
472
	 */
473
	pid = fork();
474
	switch(pid){
475
	case -1:
476
		sysunlock(l);
477
		sysunlockfile(Bfildes(b));
478
		syslog(0, runqlog, "out of procs");
479
		exits(0);
480
	case 0:
481
		if(debug) {
482
			fprint(2, "Starting %s", cmd);
483
			for(ac = 0; av[ac]; ac++)
484
				fprint(2, " %s", av[ac]);
485
			fprint(2, "\n");
486
		}
487
		logit("execing", dp->name, av);
488
		close(0);
489
		dup(dfd, 0);
490
		close(dfd);
491
		close(2);
492
		efd = open(file(dp->name, 'E'), OWRITE);
493
		if(efd < 0){
494
			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
495
			efd = create(file(dp->name, 'E'), OWRITE, 0666);
496
			if(efd < 0){
497
				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
498
				exits("could not open error file - Retry");
499
			}
500
		}
501
		seek(efd, 0, 2);
502
		exec(cmd, av);
503
		error("can't exec %s", cmd);
504
		break;
505
	default:
506
		for(;;){
507
			wm = wait();
508
			if(wm == nil)
509
				error("wait failed: %r", "");
510
			if(wm->pid == pid)
511
				break;
512
			free(wm);
513
		}
514
		if(debug)
515
			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
516
 
517
		if(wm->msg[0]){
518
			if(debug)
519
				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
520
			syslog(0, runqlog, "message: %s\n", wm->msg);
521
			if(strstr(wm->msg, "Ignore") != nil){
522
				/* fix for fish/chips, leave message alone */
523
				logit("ignoring", dp->name, av);
524
			}else if(!Rflag && strstr(wm->msg, "Retry")==0){
525
				/* return the message and remove it */
526
				if(returnmail(av, dp->name, wm->msg) != 0)
527
					logit("returnmail failed", dp->name, av);
528
				remmatch(dp->name);
529
			} else {
530
				/* add sys to bad list and try again later */
531
				nbad++;
532
				badsys = realloc(badsys, nbad*sizeof(char*));
533
				badsys[nbad-1] = strdup(av[3]);
534
			}
535
		} else {
536
			/* it worked remove the message */
537
			remmatch(dp->name);
538
		}
539
		free(wm);
540
 
541
	}
542
done:
543
	if (l)
544
		sysunlock(l);
545
	Bterm(b);
546
	sysunlockfile(Bfildes(b));
547
	free(buf);
548
	free(av);
549
	close(dfd);
550
}
551
 
552
 
553
/*
554
 *  return a name starting with the given character
555
 */
556
char*
557
file(char *name, char type)
558
{
559
	static char nname[Elemlen+1];
560
 
561
	strncpy(nname, name, Elemlen);
562
	nname[Elemlen] = 0;
563
	nname[0] = type;
564
	return nname;
565
}
566
 
567
/*
568
 *  send back the mail with an error message
569
 *
570
 *  return 0 if successful
571
 */
572
int
573
returnmail(char **av, char *name, char *msg)
574
{
575
	int pfd[2];
576
	Waitmsg *wm;
577
	int fd;
578
	char buf[256];
579
	char attachment[256];
580
	int i;
581
	long n;
582
	String *s;
583
	char *sender;
584
 
585
	if(av[1] == 0 || av[2] == 0){
586
		logit("runq - dumping bad file", name, av);
587
		return 0;
588
	}
589
 
590
	s = unescapespecial(s_copy(av[2]));
591
	sender = s_to_c(s);
592
 
593
	if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
594
		logit("runq - dumping p to p mail", name, av);
595
		return 0;
596
	}
597
 
598
	if(pipe(pfd) < 0){
599
		logit("runq - pipe failed", name, av);
600
		return -1;
601
	}
602
 
603
	switch(rfork(RFFDG|RFPROC|RFENVG)){
604
	case -1:
605
		logit("runq - fork failed", name, av);
606
		return -1;
607
	case 0:
608
		logit("returning", name, av);
609
		close(pfd[1]);
610
		close(0);
611
		dup(pfd[0], 0);
612
		close(pfd[0]);
613
		putenv("upasname", "/dev/null");
614
		snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
615
		snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
616
		execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil);
617
		error("can't exec", 0);
618
		break;
619
	default:
620
		break;
621
	}
622
 
623
	close(pfd[0]);
624
	fprint(pfd[1], "\n");	/* get out of headers */
625
	if(av[1]){
626
		fprint(pfd[1], "Your request ``%.20s ", av[1]);
627
		for(n = 3; av[n]; n++)
628
			fprint(pfd[1], "%s ", av[n]);
629
	}
630
	fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
631
	fd = open(file(name, 'E'), OREAD);
632
	if(fd >= 0){
633
		for(;;){
634
			n = read(fd, buf, sizeof(buf));
635
			if(n <= 0)
636
				break;
637
			if(write(pfd[1], buf, n) != n){
638
				close(fd);
639
				goto out;
640
			}
641
		}
642
		close(fd);
643
	}
644
	close(pfd[1]);
645
out:
646
	wm = wait();
647
	if(wm == nil){
648
		syslog(0, "runq", "wait: %r");
649
		logit("wait failed", name, av);
650
		return -1;
651
	}
652
	i = 0;
653
	if(wm->msg[0]){
654
		i = -1;
655
		syslog(0, "runq", "returnmail child: %s", wm->msg);
656
		logit("returnmail child failed", name, av);
657
	}
658
	free(wm);
659
	return i;
660
}
661
 
662
/*
663
 *  print a warning and continue
664
 */
665
void
666
warning(char *f, void *a)
667
{
668
	char err[65];
669
	char buf[256];
670
 
671
	rerrstr(err, sizeof(err));
672
	snprint(buf, sizeof(buf), f, a);
673
	fprint(2, "runq: %s: %s\n", buf, err);
674
}
675
 
676
/*
677
 *  print an error and die
678
 */
679
void
680
error(char *f, void *a)
681
{
682
	char err[Errlen];
683
	char buf[256];
684
 
685
	rerrstr(err, sizeof(err));
686
	snprint(buf, sizeof(buf), f, a);
687
	fprint(2, "runq: %s: %s\n", buf, err);
688
	exits(buf);
689
}
690
 
691
void
692
logit(char *msg, char *file, char **av)
693
{
694
	int n, m;
695
	char buf[256];
696
 
697
	n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
698
	for(; *av; av++){
699
		m = strlen(*av);
700
		if(n + m + 4 > sizeof(buf))
701
			break;
702
		sprint(buf + n, " '%s'", *av);
703
		n += m + 3;
704
	}
705
	syslog(0, runqlog, "%s", buf);
706
}
707
 
708
char *loadfile = ".runqload";
709
 
710
/*
711
 *  load balancing
712
 */
713
void
714
doload(int start)
715
{
716
	int fd;
717
	char buf[32];
718
	int i, n;
719
	Mlock *l;
720
	Dir *d;
721
 
722
	if(load <= 0)
723
		return;
724
 
725
	if(chdir(root) < 0){
726
		load = 0;
727
		return;
728
	}
729
 
730
	l = syslock(loadfile);
731
	fd = open(loadfile, ORDWR);
732
	if(fd < 0){
733
		fd = create(loadfile, 0666, ORDWR);
734
		if(fd < 0){
735
			load = 0;
736
			sysunlock(l);
737
			return;
738
		}
739
	}
740
 
741
	/* get current load */
742
	i = 0;
743
	n = read(fd, buf, sizeof(buf)-1);
744
	if(n >= 0){
745
		buf[n] = 0;
746
		i = atoi(buf);
747
	}
748
	if(i < 0)
749
		i = 0;
750
 
751
	/* ignore load if file hasn't been changed in 30 minutes */
752
	d = dirfstat(fd);
753
	if(d != nil){
754
		if(d->mtime + 30*60 < time(0))
755
			i = 0;
756
		free(d);
757
	}
758
 
759
	/* if load already too high, give up */
760
	if(start && i >= load){
761
		sysunlock(l);
762
		exits(0);
763
	}
764
 
765
	/* increment/decrement load */
766
	if(start)
767
		i++;
768
	else
769
		i--;
770
	seek(fd, 0, 0);
771
	fprint(fd, "%d\n", i);
772
	sysunlock(l);
773
	close(fd);
774
}