Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
#define  _BSDTIME_EXTENSION
2
#define _LOCK_EXTENSION
3
#include "lib.h"
4
#include <stdlib.h>
5
#include <errno.h>
6
#include <unistd.h>
7
#include <signal.h>
8
#include <string.h>
9
#include <stdio.h>
10
#include <lock.h>
11
#include <sys/time.h>
12
#include <sys/select.h>
13
#include <unistd.h>
14
#include "sys9.h"
15
 
16
typedef struct Muxseg {
17
	Lock	lock;			/* for mutual exclusion access to buffer variables */
18
	int	curfds;			/* number of fds currently buffered */
19
	int	selwait;		/* true if selecting process is waiting */
20
	int	waittime;		/* time for timer process to wait */
21
	fd_set	rwant;			/* fd's that select wants to read */
22
	fd_set	ewant;			/* fd's that select wants to know eof info on */
23
	Muxbuf	bufs[INITBUFS];		/* can grow, via segbrk() */
24
} Muxseg;
25
 
26
#define MUXADDR ((void*)0x6000000)
27
static Muxseg *mux = 0;			/* shared memory segment */
28
 
29
/* _muxsid and _killmuxsid are known in libbsd's listen.c */
30
int _muxsid = -1;			/* group id of copy processes */
31
static int _mainpid = -1;
32
static int timerpid = -1;		/* pid of a timer process */
33
 
34
void _killmuxsid(void);
35
static void _copyproc(int, Muxbuf*);
36
static void _timerproc(void);
37
static void _resettimer(void);
38
 
39
static int copynotehandler(void *, char *);
40
 
41
/* assume FD_SETSIZE is 96 */
42
#define FD_ANYSET(p)	((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2])
43
 
44
/*
45
 * Start making fd read-buffered: make the shared segment, if necessary,
46
 * allocate a slot (index into mux->bufs), and fork a child to read the fd
47
 * and write into the slot-indexed buffer.
48
 * Return -1 if we can't do it.
49
 */
50
int
51
_startbuf(int fd)
52
{
53
	long i, slot;
54
	int pid;
55
	Fdinfo *f;
56
	Muxbuf *b;
57
 
58
	if(mux == 0){
59
		_RFORK(RFREND);
60
		mux = (Muxseg*)_SEGATTACH(0, "shared", MUXADDR, sizeof(Muxseg));
61
		if((long)mux == -1){
62
			_syserrno();
63
			return -1;
64
		}
65
		/* segattach has returned zeroed memory */
66
		atexit(_killmuxsid);
67
	}
68
 
69
	if(fd == -1)
70
		return 0;
71
 
72
	lock(&mux->lock);
73
	slot = mux->curfds++;
74
	if(mux->curfds > INITBUFS) {
75
		if(_SEGBRK(mux, mux->bufs+mux->curfds) < 0){
76
			_syserrno();
77
			unlock(&mux->lock);
78
			return -1;
79
		}
80
	}
81
 
82
	f = &_fdinfo[fd];
83
	b = &mux->bufs[slot];
84
	b->n = 0;
85
	b->putnext = b->data;
86
	b->getnext = b->data;
87
	b->eof = 0;
88
	b->fd = fd;
89
	if(_mainpid == -1)
90
		_mainpid = getpid();
91
	if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
92
		/* copy process ... */
93
		if(_muxsid == -1) {
94
			_RFORK(RFNOTEG);
95
			_muxsid = getpgrp();
96
		} else
97
			setpgid(getpid(), _muxsid);
98
		_NOTIFY(copynotehandler);
99
		for(i=0; i<OPEN_MAX; i++)
100
			if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN))
101
				_CLOSE(i);
102
		_RENDEZVOUS(0, _muxsid);
103
		_copyproc(fd, b);
104
	}
105
 
106
	/* parent process continues ... */
107
	b->copypid = pid;
108
	f->buf = b;
109
	f->flags |= FD_BUFFERED;
110
	unlock(&mux->lock);
111
	_muxsid = _RENDEZVOUS(0, 0);
112
	/* leave fd open in parent so system doesn't reuse it */
113
	return 0;
114
}
115
 
116
/*
117
 * The given buffered fd is being closed.
118
 * Set the fd field in the shared buffer to -1 to tell copyproc
119
 * to exit, and kill the copyproc.
120
 */
121
void
122
_closebuf(int fd)
123
{
124
	Muxbuf *b;
125
 
126
	b = _fdinfo[fd].buf;
127
	if(!b)
128
		return;
129
	lock(&mux->lock);
130
	b->fd = -1;
131
	unlock(&mux->lock);
132
	kill(b->copypid, SIGKILL);
133
}
134
 
135
/* child copy procs execute this until eof */
136
static void
137
_copyproc(int fd, Muxbuf *b)
138
{
139
	unsigned char *e;
140
	int n;
141
	int nzeros;
142
 
143
	e = &b->data[PERFDMAX];
144
	for(;;) {
145
		/* make sure there's room */
146
		lock(&mux->lock);
147
		if(e - b->putnext < READMAX) {
148
			if(b->getnext == b->putnext) {
149
				b->getnext = b->putnext = b->data;
150
				unlock(&mux->lock);
151
			} else {
152
				/* sleep until there's room */
153
				b->roomwait = 1;
154
				unlock(&mux->lock);
155
				_RENDEZVOUS((unsigned long)&b->roomwait, 0);
156
			}
157
		} else
158
			unlock(&mux->lock);
159
		/*
160
		 * A Zero-length _READ might mean a zero-length write
161
		 * happened, or it might mean eof; try several times to
162
		 * disambiguate (posix read() discards 0-length messages)
163
		 */
164
		nzeros = 0;
165
		do {
166
			n = _READ(fd, b->putnext, READMAX);
167
			if(b->fd == -1) {
168
				_exit(0);		/* we've been closed */
169
			}
170
		} while(n == 0 && ++nzeros < 3);
171
		lock(&mux->lock);
172
		if(n <= 0) {
173
			b->eof = 1;
174
			if(mux->selwait && FD_ISSET(fd, &mux->ewant)) {
175
				mux->selwait = 0;
176
				unlock(&mux->lock);
177
				_RENDEZVOUS((unsigned long)&mux->selwait, fd);
178
			} else if(b->datawait) {
179
				b->datawait = 0;
180
				unlock(&mux->lock);
181
				_RENDEZVOUS((unsigned long)&b->datawait, 0);
182
			} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
183
				mux->selwait = 0;
184
				unlock(&mux->lock);
185
				_RENDEZVOUS((unsigned long)&mux->selwait, fd);
186
			} else
187
				unlock(&mux->lock);
188
			_exit(0);
189
		} else {
190
			b->putnext += n;
191
			b->n += n;
192
			if(b->n > 0) {
193
				/* parent process cannot be both in datawait and selwait */
194
				if(b->datawait) {
195
					b->datawait = 0;
196
					unlock(&mux->lock);
197
					/* wake up _bufreading process */
198
					_RENDEZVOUS((unsigned long)&b->datawait, 0);
199
				} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
200
					mux->selwait = 0;
201
					unlock(&mux->lock);
202
					/* wake up selecting process */
203
					_RENDEZVOUS((unsigned long)&mux->selwait, fd);
204
				} else
205
					unlock(&mux->lock);
206
			} else
207
				unlock(&mux->lock);
208
		}
209
	}
210
}
211
 
212
/* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */
213
int
214
_readbuf(int fd, void *addr, int nwant, int noblock)
215
{
216
	Muxbuf *b;
217
	int ngot;
218
 
219
	b = _fdinfo[fd].buf;
220
	if(b->eof && b->n == 0) {
221
goteof:
222
		return 0;
223
	}
224
	if(b->n == 0 && noblock) {
225
		errno = EAGAIN;
226
		return -1;
227
	}
228
	/* make sure there's data */
229
	lock(&mux->lock);
230
	ngot = b->putnext - b->getnext;
231
	if(ngot == 0) {
232
		/* maybe EOF just happened */
233
		if(b->eof) {
234
			unlock(&mux->lock);
235
			goto goteof;
236
		}
237
		/* sleep until there's data */
238
		b->datawait = 1;
239
		unlock(&mux->lock);
240
		_RENDEZVOUS((unsigned long)&b->datawait, 0);
241
		lock(&mux->lock);
242
		ngot = b->putnext - b->getnext;
243
	}
244
	if(ngot == 0) {
245
		unlock(&mux->lock);
246
		goto goteof;
247
	}
248
	if(ngot > nwant)
249
		ngot = nwant;
250
	memcpy(addr, b->getnext, ngot);
251
	b->getnext += ngot;
252
	b->n -= ngot;
253
	if(b->getnext == b->putnext && b->roomwait) {
254
		b->getnext = b->putnext = b->data;
255
		b->roomwait = 0;
256
		unlock(&mux->lock);
257
		/* wake up copy process */
258
		_RENDEZVOUS((unsigned long)&b->roomwait, 0);
259
	} else
260
		unlock(&mux->lock);
261
	return ngot;
262
}
263
 
264
int
265
select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout)
266
{
267
	int n, i, t, slots, fd, err;
268
	Fdinfo *f;
269
	Muxbuf *b;
270
 
271
	if(timeout)
272
		t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000;
273
	else
274
		t = -1;
275
	if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds))
276
			|| (efds && FD_ANYSET(efds)))) {
277
		/* no requested fds */
278
		if(t > 0)
279
			_SLEEP(t);
280
		return 0;
281
	}
282
 
283
	_startbuf(-1);
284
 
285
	/* make sure all requested rfds and efds are buffered */
286
	if(nfds >= OPEN_MAX)
287
		nfds = OPEN_MAX;
288
	for(i = 0; i < nfds; i++)
289
		if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){
290
			f = &_fdinfo[i];
291
			if(!(f->flags&FD_BUFFERED))
292
				if(_startbuf(i) != 0)
293
					return -1;
294
		}
295
 
296
	/* check wfds;  for now, we'll say they are all ready */
297
	n = 0;
298
	if(wfds && FD_ANYSET(wfds)){
299
		for(i = 0; i<nfds; i++)
300
			if(FD_ISSET(i, wfds)) {
301
				n++;
302
			}
303
	}
304
 
305
	lock(&mux->lock);
306
 
307
	slots = mux->curfds;
308
	FD_ZERO(&mux->rwant);
309
	FD_ZERO(&mux->ewant);
310
 
311
	for(i = 0; i<slots; i++) {
312
		b = &mux->bufs[i];
313
		fd = b->fd;
314
		if(fd == -1)
315
			continue;
316
		err = 0;
317
		if(efds && FD_ISSET(fd, efds)) {
318
			if(b->eof && b->n == 0){
319
				err = 1;
320
				n++;
321
			}else{
322
				FD_CLR(fd, efds);
323
				FD_SET(fd, &mux->ewant);
324
			}
325
		}
326
		if(rfds && FD_ISSET(fd, rfds)) {
327
			if(!err && (b->n > 0 || b->eof))
328
				n++;
329
			else{
330
				FD_CLR(fd, rfds);
331
				FD_SET(fd, &mux->rwant);
332
			}
333
		}
334
	}
335
	if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) {
336
		FD_ZERO(&mux->rwant);
337
		FD_ZERO(&mux->ewant);
338
		unlock(&mux->lock);
339
		return n;
340
	}
341
 
342
	if(timeout) {
343
		mux->waittime = t;
344
		if(timerpid == -1)
345
			_timerproc();
346
		else
347
			_resettimer();
348
	}
349
	mux->selwait = 1;
350
	unlock(&mux->lock);
351
	fd = _RENDEZVOUS((unsigned long)&mux->selwait, 0);
352
	if(fd >= 0) {
353
		b = _fdinfo[fd].buf;
354
		if(FD_ISSET(fd, &mux->rwant)) {
355
			FD_SET(fd, rfds);
356
			n = 1;
357
		} else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) {
358
			FD_SET(fd, efds);
359
			n = 1;
360
		}
361
	}
362
	FD_ZERO(&mux->rwant);
363
	FD_ZERO(&mux->ewant);
364
	return n;
365
}
366
 
367
static int timerreset;
368
static int timerpid;
369
 
370
static void
371
alarmed(int)
372
{
373
	timerreset = 1;
374
}
375
 
376
/* a little over an hour */
377
#define LONGWAIT 4000001
378
 
379
static void
380
_killtimerproc(void)
381
{
382
	if(timerpid > 0)
383
		kill(timerpid, SIGKILL);
384
}
385
 
386
static void
387
_timerproc(void)
388
{
389
	int i;
390
 
391
	if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
392
		/* timer process */
393
		setpgid(getpid(), _muxsid);
394
		signal(SIGALRM, alarmed);
395
		for(i=0; i<OPEN_MAX; i++)
396
				_CLOSE(i);
397
		_RENDEZVOUS(1, 0);
398
		for(;;) {
399
			_SLEEP(mux->waittime);
400
			if(timerreset) {
401
				timerreset = 0;
402
			} else {
403
				lock(&mux->lock);
404
				if(mux->selwait && mux->waittime != LONGWAIT) {
405
					mux->selwait = 0;
406
					mux->waittime = LONGWAIT;
407
					unlock(&mux->lock);
408
					_RENDEZVOUS((unsigned long)&mux->selwait, -2);
409
				} else {
410
					mux->waittime = LONGWAIT;
411
					unlock(&mux->lock);
412
				}
413
			}
414
		}
415
	}
416
	atexit(_killtimerproc);
417
	/* parent process continues */
418
	_RENDEZVOUS(1, 0);
419
}
420
 
421
static void
422
_resettimer(void)
423
{
424
	kill(timerpid, SIGALRM);
425
}
426
 
427
void
428
_killmuxsid(void)
429
{
430
	if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1))
431
		kill(-_muxsid,SIGTERM);
432
}
433
 
434
/* call this on fork(), because reading a BUFFERED fd won't work in child */
435
void
436
_detachbuf(void)
437
{
438
	int i;
439
	Fdinfo *f;
440
 
441
	if(mux == 0)
442
		return;
443
	_SEGDETACH(mux);
444
	for(i = 0; i < OPEN_MAX; i++){
445
		f = &_fdinfo[i];
446
		if(f->flags&FD_BUFFERED)
447
			f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX;
448
				/* mark 'poisoned' */
449
	}
450
	mux = 0;
451
	_muxsid = -1;
452
	_mainpid = -1;
453
	timerpid = -1;
454
}
455
 
456
static int
457
copynotehandler(void *, char *)
458
{
459
	if(_finishing)
460
		_finish(0, 0);
461
	_NOTED(1);
462
	return 0;
463
}