Subversion Repositories planix.SVN

Rev

Blame | Last modification | View Log | RSS feed

#define  _BSDTIME_EXTENSION
#define _LOCK_EXTENSION
#include "lib.h"
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <stdio.h>
#include <lock.h>
#include <sys/time.h>
#include <sys/select.h>
#include <unistd.h>
#include "sys9.h"

typedef struct Muxseg {
        Lock    lock;                   /* for mutual exclusion access to buffer variables */
        int     curfds;                 /* number of fds currently buffered */
        int     selwait;                /* true if selecting process is waiting */
        int     waittime;               /* time for timer process to wait */
        fd_set  rwant;                  /* fd's that select wants to read */
        fd_set  ewant;                  /* fd's that select wants to know eof info on */
        Muxbuf  bufs[INITBUFS];         /* can grow, via segbrk() */
} Muxseg;

#define MUXADDR ((void*)0x6000000)
static Muxseg *mux = 0;                 /* shared memory segment */

/* _muxsid and _killmuxsid are known in libbsd's listen.c */
int _muxsid = -1;                       /* group id of copy processes */
static int _mainpid = -1;
static int timerpid = -1;               /* pid of a timer process */

void _killmuxsid(void);
static void _copyproc(int, Muxbuf*);
static void _timerproc(void);
static void _resettimer(void);

static int copynotehandler(void *, char *);

/* assume FD_SETSIZE is 96 */
#define FD_ANYSET(p)    ((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2])

/*
 * Start making fd read-buffered: make the shared segment, if necessary,
 * allocate a slot (index into mux->bufs), and fork a child to read the fd
 * and write into the slot-indexed buffer.
 * Return -1 if we can't do it.
 */
int
_startbuf(int fd)
{
        long i, slot;
        int pid;
        Fdinfo *f;
        Muxbuf *b;

        if(mux == 0){
                _RFORK(RFREND);
                mux = (Muxseg*)_SEGATTACH(0, "shared", MUXADDR, sizeof(Muxseg));
                if((long)mux == -1){
                        _syserrno();
                        return -1;
                }
                /* segattach has returned zeroed memory */
                atexit(_killmuxsid);
        }

        if(fd == -1)
                return 0;

        lock(&mux->lock);
        slot = mux->curfds++;
        if(mux->curfds > INITBUFS) {
                if(_SEGBRK(mux, mux->bufs+mux->curfds) < 0){
                        _syserrno();
                        unlock(&mux->lock);
                        return -1;
                }
        }

        f = &_fdinfo[fd];
        b = &mux->bufs[slot];
        b->n = 0;
        b->putnext = b->data;
        b->getnext = b->data;
        b->eof = 0;
        b->fd = fd;
        if(_mainpid == -1)
                _mainpid = getpid();
        if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
                /* copy process ... */
                if(_muxsid == -1) {
                        _RFORK(RFNOTEG);
                        _muxsid = getpgrp();
                } else
                        setpgid(getpid(), _muxsid);
                _NOTIFY(copynotehandler);
                for(i=0; i<OPEN_MAX; i++)
                        if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN))
                                _CLOSE(i);
                _RENDEZVOUS(0, _muxsid);
                _copyproc(fd, b);
        }

        /* parent process continues ... */
        b->copypid = pid;
        f->buf = b;
        f->flags |= FD_BUFFERED;
        unlock(&mux->lock);
        _muxsid = _RENDEZVOUS(0, 0);
        /* leave fd open in parent so system doesn't reuse it */
        return 0;
}

/*
 * The given buffered fd is being closed.
 * Set the fd field in the shared buffer to -1 to tell copyproc
 * to exit, and kill the copyproc.
 */
void
_closebuf(int fd)
{
        Muxbuf *b;

        b = _fdinfo[fd].buf;
        if(!b)
                return;
        lock(&mux->lock);
        b->fd = -1;
        unlock(&mux->lock);
        kill(b->copypid, SIGKILL);
}

/* child copy procs execute this until eof */
static void
_copyproc(int fd, Muxbuf *b)
{
        unsigned char *e;
        int n;
        int nzeros;

        e = &b->data[PERFDMAX];
        for(;;) {
                /* make sure there's room */
                lock(&mux->lock);
                if(e - b->putnext < READMAX) {
                        if(b->getnext == b->putnext) {
                                b->getnext = b->putnext = b->data;
                                unlock(&mux->lock);
                        } else {
                                /* sleep until there's room */
                                b->roomwait = 1;
                                unlock(&mux->lock);
                                _RENDEZVOUS((unsigned long)&b->roomwait, 0);
                        }
                } else
                        unlock(&mux->lock);
                /*
                 * A Zero-length _READ might mean a zero-length write
                 * happened, or it might mean eof; try several times to
                 * disambiguate (posix read() discards 0-length messages)
                 */
                nzeros = 0;
                do {
                        n = _READ(fd, b->putnext, READMAX);
                        if(b->fd == -1) {
                                _exit(0);               /* we've been closed */
                        }
                } while(n == 0 && ++nzeros < 3);
                lock(&mux->lock);
                if(n <= 0) {
                        b->eof = 1;
                        if(mux->selwait && FD_ISSET(fd, &mux->ewant)) {
                                mux->selwait = 0;
                                unlock(&mux->lock);
                                _RENDEZVOUS((unsigned long)&mux->selwait, fd);
                        } else if(b->datawait) {
                                b->datawait = 0;
                                unlock(&mux->lock);
                                _RENDEZVOUS((unsigned long)&b->datawait, 0);
                        } else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
                                mux->selwait = 0;
                                unlock(&mux->lock);
                                _RENDEZVOUS((unsigned long)&mux->selwait, fd);
                        } else
                                unlock(&mux->lock);
                        _exit(0);
                } else {
                        b->putnext += n;
                        b->n += n;
                        if(b->n > 0) {
                                /* parent process cannot be both in datawait and selwait */
                                if(b->datawait) {
                                        b->datawait = 0;
                                        unlock(&mux->lock);
                                        /* wake up _bufreading process */
                                        _RENDEZVOUS((unsigned long)&b->datawait, 0);
                                } else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
                                        mux->selwait = 0;
                                        unlock(&mux->lock);
                                        /* wake up selecting process */
                                        _RENDEZVOUS((unsigned long)&mux->selwait, fd);
                                } else
                                        unlock(&mux->lock);
                        } else
                                unlock(&mux->lock);
                }
        }
}

/* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */
int
_readbuf(int fd, void *addr, int nwant, int noblock)
{
        Muxbuf *b;
        int ngot;

        b = _fdinfo[fd].buf;
        if(b->eof && b->n == 0) {
goteof:
                return 0;
        }
        if(b->n == 0 && noblock) {
                errno = EAGAIN;
                return -1;
        }
        /* make sure there's data */
        lock(&mux->lock);
        ngot = b->putnext - b->getnext;
        if(ngot == 0) {
                /* maybe EOF just happened */
                if(b->eof) {
                        unlock(&mux->lock);
                        goto goteof;
                }
                /* sleep until there's data */
                b->datawait = 1;
                unlock(&mux->lock);
                _RENDEZVOUS((unsigned long)&b->datawait, 0);
                lock(&mux->lock);
                ngot = b->putnext - b->getnext;
        }
        if(ngot == 0) {
                unlock(&mux->lock);
                goto goteof;
        }
        if(ngot > nwant)
                ngot = nwant;
        memcpy(addr, b->getnext, ngot);
        b->getnext += ngot;
        b->n -= ngot;
        if(b->getnext == b->putnext && b->roomwait) {
                b->getnext = b->putnext = b->data;
                b->roomwait = 0;
                unlock(&mux->lock);
                /* wake up copy process */
                _RENDEZVOUS((unsigned long)&b->roomwait, 0);
        } else
                unlock(&mux->lock);
        return ngot;
}

int
select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout)
{
        int n, i, t, slots, fd, err;
        Fdinfo *f;
        Muxbuf *b;

        if(timeout)
                t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000;
        else
                t = -1;
        if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds))
                        || (efds && FD_ANYSET(efds)))) {
                /* no requested fds */
                if(t > 0)
                        _SLEEP(t);
                return 0;
        }

        _startbuf(-1);

        /* make sure all requested rfds and efds are buffered */
        if(nfds >= OPEN_MAX)
                nfds = OPEN_MAX;
        for(i = 0; i < nfds; i++)
                if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){
                        f = &_fdinfo[i];
                        if(!(f->flags&FD_BUFFERED))
                                if(_startbuf(i) != 0)
                                        return -1;
                }

        /* check wfds;  for now, we'll say they are all ready */
        n = 0;
        if(wfds && FD_ANYSET(wfds)){
                for(i = 0; i<nfds; i++)
                        if(FD_ISSET(i, wfds)) {
                                n++;
                        }
        }

        lock(&mux->lock);

        slots = mux->curfds;
        FD_ZERO(&mux->rwant);
        FD_ZERO(&mux->ewant);

        for(i = 0; i<slots; i++) {
                b = &mux->bufs[i];
                fd = b->fd;
                if(fd == -1)
                        continue;
                err = 0;
                if(efds && FD_ISSET(fd, efds)) {
                        if(b->eof && b->n == 0){
                                err = 1;
                                n++;
                        }else{
                                FD_CLR(fd, efds);
                                FD_SET(fd, &mux->ewant);
                        }
                }
                if(rfds && FD_ISSET(fd, rfds)) {
                        if(!err && (b->n > 0 || b->eof))
                                n++;
                        else{
                                FD_CLR(fd, rfds);
                                FD_SET(fd, &mux->rwant);
                        }
                }
        }
        if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) {
                FD_ZERO(&mux->rwant);
                FD_ZERO(&mux->ewant);
                unlock(&mux->lock);
                return n;
        }

        if(timeout) {
                mux->waittime = t;
                if(timerpid == -1)
                        _timerproc();
                else
                        _resettimer();
        }
        mux->selwait = 1;
        unlock(&mux->lock);
        fd = _RENDEZVOUS((unsigned long)&mux->selwait, 0);
        if(fd >= 0) {
                b = _fdinfo[fd].buf;
                if(FD_ISSET(fd, &mux->rwant)) {
                        FD_SET(fd, rfds);
                        n = 1;
                } else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) {
                        FD_SET(fd, efds);
                        n = 1;
                }
        }
        FD_ZERO(&mux->rwant);
        FD_ZERO(&mux->ewant);
        return n;
}

static int timerreset;
static int timerpid;

static void
alarmed(int)
{
        timerreset = 1;
}

/* a little over an hour */
#define LONGWAIT 4000001

static void
_killtimerproc(void)
{
        if(timerpid > 0)
                kill(timerpid, SIGKILL);
}

static void
_timerproc(void)
{
        int i;

        if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
                /* timer process */
                setpgid(getpid(), _muxsid);
                signal(SIGALRM, alarmed);
                for(i=0; i<OPEN_MAX; i++)
                                _CLOSE(i);
                _RENDEZVOUS(1, 0);
                for(;;) {
                        _SLEEP(mux->waittime);
                        if(timerreset) {
                                timerreset = 0;
                        } else {
                                lock(&mux->lock);
                                if(mux->selwait && mux->waittime != LONGWAIT) {
                                        mux->selwait = 0;
                                        mux->waittime = LONGWAIT;
                                        unlock(&mux->lock);
                                        _RENDEZVOUS((unsigned long)&mux->selwait, -2);
                                } else {
                                        mux->waittime = LONGWAIT;
                                        unlock(&mux->lock);
                                }
                        }
                }
        }
        atexit(_killtimerproc);
        /* parent process continues */
        _RENDEZVOUS(1, 0);
}

static void
_resettimer(void)
{
        kill(timerpid, SIGALRM);
}

void
_killmuxsid(void)
{
        if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1))
                kill(-_muxsid,SIGTERM);
}

/* call this on fork(), because reading a BUFFERED fd won't work in child */
void
_detachbuf(void)
{
        int i;
        Fdinfo *f;

        if(mux == 0)
                return;
        _SEGDETACH(mux);
        for(i = 0; i < OPEN_MAX; i++){
                f = &_fdinfo[i];
                if(f->flags&FD_BUFFERED)
                        f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX;
                                /* mark 'poisoned' */
        }
        mux = 0;
        _muxsid = -1;
        _mainpid = -1;
        timerpid = -1;
}

static int
copynotehandler(void *, char *)
{
        if(_finishing)
                _finish(0, 0);
        _NOTED(1);
        return 0;
}