Subversion Repositories planix.SVN

Rev

Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | RSS feed

/* network i/o */

#include "all.h"
#include "io.h"
#include <fcall.h>              /* 9p2000 */
#include <thread.h>

enum {
        Maxfdata        = 8192,
        Nqueue          = 200,          /* queue size (tunable) */

        Netclosed       = 0,            /* Connection state */
        Netopen,
};

/*
 * the kernel file server read packets directly from
 * its ethernet(s) and did all the protocol processing.
 * if the incoming packets were 9p (over il/ip), they
 * were queued for the server processes to operate upon.
 *
 * in user mode, we have one process per incoming connection
 * instead, and those processes get just the data, minus
 * tcp and ip headers, so they just see a stream of 9p messages,
 * which they then queue for the server processes.
 *
 * there used to be more queueing (in the kernel), with separate
 * processes for ethernet input, il input, 9p processing, il output
 * and ethernet output, and queues connecting them.  we now let
 * the kernel's network queues, protocol stacks and processes do
 * much of this work.
 *
 * partly as a result of this, we can now process 9p messages
 * transported via tcp, exploit multiple x86 processors, and
 * were able to shed 70% of the file server's source, by line count.
 *
 * the upshot is that Ether (now Network) is no longer a perfect fit for
 * the way network i/o is done now.  the notion of `connection'
 * is being introduced to complement it.
 */

typedef struct Network Network;
typedef struct Netconn Netconn;
typedef struct Conn9p Conn9p;

/* a network, not necessarily an ethernet */
struct Network {
        int     ctlrno;
        char    iname[NAMELEN];
        char    oname[NAMELEN];

        char    *dialstr;
        char    anndir[40];
        char    lisdir[40];
        int     annfd;                  /* fd from announce */
};

/* an open tcp (or other transport) connection */
struct Netconn {
        Queue*  reply;          /* network output */
        char*   raddr;          /* remote caller's addr */
        Chan*   chan;           /* list of tcp channels */

        int     alloc;          /* flag: allocated */

        int     state;
        Conn9p* conn9p;         /* not reference-counted */

        Lock;
};

/*
 * incoming 9P network connection from a given machine.
 * typically will multiplex 9P sessions for multiple users.
 */
struct Conn9p {
        QLock;
        Ref;
        int     fd;
        char*   dir;
        Netconn*netconn;                /* cross-connection */
        char*   raddr;
};

static Network netif[Maxnets];
static struct {
        Lock;
        Chan*   chan;
} netchans;
static Queue *netoq;            /* only one network output queue is needed */

char *annstrs[Maxnets] = {
        "tcp!*!9fs",
};

/* never returns nil */
static Chan*
getchan(Conn9p *conn9p)
{
        Netconn *netconn;
        Chan *cp, *xcp;

        lock(&netchans);

        /* look for conn9p's Chan */
        xcp = nil;
        for(cp = netchans.chan; cp; cp = netconn->chan) {
                netconn = cp->pdata;
                if(!netconn->alloc)
                        xcp = cp;               /* remember free Chan */
                else if(netconn->raddr != nil &&
                    strcmp(conn9p->raddr, netconn->raddr) == 0) {
                        unlock(&netchans);
                        return cp;              /* found conn9p's Chan */
                }
        }

        /* conn9p's Chan not found; if no free Chan, allocate & fill in one */
        cp = xcp;
        if(cp == nil) {
                cp = fs_chaninit(Devnet, 1, sizeof(Netconn));
                netconn = cp->pdata;
                netconn->chan = netchans.chan;
                netconn->state = Netopen;       /* a guess */
                /* cross-connect netconn and conn9p */
                netconn->conn9p = conn9p;       /* not reference-counted */
                conn9p->netconn = netconn;
                netchans.chan = cp;
        }

        /* fill in Chan's netconn */
        netconn = cp->pdata;
        netconn->raddr = strdup(conn9p->raddr);

        /* fill in Chan */
        cp->send = serveq;
        if (cp->reply == nil)
                cp->reply = netoq;
        netconn->reply = netoq;
        cp->protocol = nil;
        cp->msize = 0;
        cp->whotime = 0;
        strncpy(cp->whochan, conn9p->raddr, sizeof cp->whochan);
//      cp->whoprint = tcpwhoprint;
        netconn->alloc = 1;

        unlock(&netchans);
        return cp;
}

static char *
fd2name(int fd)
{
        char data[128];

        if (fd2path(fd, data, sizeof data) < 0)
                return strdup("/GOK");
        return strdup(data);
}

static void
hangupdfd(int dfd)
{
        int ctlfd;
        char *end, *data;

        data = fd2name(dfd);
        close(dfd);

        end = strstr(data, "/data");
        if (end != nil)
                strcpy(end, "/ctl");
        ctlfd = open(data, OWRITE);
        if (ctlfd >= 0) {
                hangup(ctlfd);
                close(ctlfd);
        }
        free(data);
}

void
closechan(int n)
{
        Chan *cp;

        for(cp = chans; cp; cp = cp->next)
                if(cp->whotime != 0 && cp->chan == n)
                        fileinit(cp);
}

void
nethangup(Chan *cp, char *msg, int dolock)
{
        Netconn *netconn;

        netconn = cp->pdata;
        netconn->state = Netclosed;

        if(msg != nil)
                print("hangup! %s %s\n", msg, netconn->raddr);

        fileinit(cp);
        cp->whotime = 0;
        strcpy(cp->whoname, "<none>");

        if(dolock)
                lock(&netchans);
        netconn->alloc = 0;
        free(netconn->raddr);
        netconn->raddr = nil;
        if(dolock)
                unlock(&netchans);
}

void
chanhangup(Chan *cp, char *msg, int dolock)
{
        Netconn *netconn = cp->pdata;
        Conn9p *conn9p = netconn->conn9p;

        if (conn9p->fd > 0)
                hangupdfd(conn9p->fd);  /* drop it */
        nethangup(cp, msg, dolock);
}

/*
 * returns length of next 9p message (including the length) and
 * leaves it in the first few bytes of abuf.
 */
static long
size9pmsg(int fd, void *abuf, uint n)
{
        int m;
        uchar *buf = abuf;

        if (n < BIT32SZ)
                return -1;      /* caller screwed up */

        /* read count */
        m = readn(fd, buf, BIT32SZ);
        if(m != BIT32SZ){
                if(m < 0)
                        return -1;
                return 0;
        }
        return GBIT32(buf);
}

static int
readalloc9pmsg(int fd, Msgbuf **mbp)
{
        int m, len;
        uchar lenbuf[BIT32SZ];
        Msgbuf *mb;

        *mbp = nil;
        len = size9pmsg(fd, lenbuf, BIT32SZ);
        if (len <= 0)
                return len;
        if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){
                werrstr("bad length in 9P2000 message header");
                return -1;
        }
        if ((mb = mballoc(len, nil, Mbeth1)) == nil)
                panic("readalloc9pmsg: mballoc failed");
        *mbp = mb;
        memmove(mb->data, lenbuf, BIT32SZ);
        len -= BIT32SZ;
        m = readn(fd, mb->data+BIT32SZ, len);
        if(m < len)
                return 0;
        return BIT32SZ+m;
}

static void
connection(void *v)
{
        int n;
        char buf[64];
        Chan *chan9p;
        Conn9p *conn9p = v;
        Msgbuf *mb;
        NetConnInfo *nci;

        incref(conn9p);                 /* count connections */
        nci = getnetconninfo(conn9p->dir, conn9p->fd);
        if (nci == nil)
                panic("connection: getnetconninfo(%s, %d) failed",
                        conn9p->dir, conn9p->fd);
        conn9p->raddr = nci->raddr;

        chan9p = getchan(conn9p);
        print("new connection on %s pid %d from %s\n",
                conn9p->dir, getpid(), conn9p->raddr);

        /*
         * reading from a pipe or a network device
         * will give an error after a few eof reads.
         * however, we cannot tell the difference
         * between a zero-length read and an interrupt
         * on the processes writing to us,
         * so we wait for the error.
         */
        while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) {
                if(n == 0)
                        continue;
                mb->param = (uintptr)conn9p;    /* has fd for replies */
                mb->chan = chan9p;

                assert(mb->magic == Mbmagic);
                incref(conn9p);                 /* & count packets in flight */
                fs_send(serveq, mb);            /* to 9P server processes */
                /* mb will be freed by receiving process */
        }

        rerrstr(buf, sizeof buf);

        qlock(conn9p);
        print("connection hung up from %s\n", conn9p->dir);
        if (conn9p->fd > 0)             /* not poisoned yet? */
                hangupdfd(conn9p->fd);  /* poison the fd */

        nethangup(chan9p, "remote hung up", 1);
        closechan(chan9p->chan);

        conn9p->fd = -1;                /* poison conn9p */
        if (decref(conn9p) == 0) {      /* last conn.? turn the lights off */
                free(conn9p->dir);
                qunlock(conn9p);
                free(conn9p);
        } else
                qunlock(conn9p);

        freenetconninfo(nci);

        if(buf[0] == '\0' || strstr(buf, "hungup") != nil)
                exits("");
        sysfatal("mount read, pid %d", getpid());
}

static void
neti(void *v)
{
        int lisfd, accfd;
        Network *net;
        Conn9p *conn9p;

        net = v;
        print("net%di\n", net->ctlrno);
        for(;;) {
                lisfd = listen(net->anndir, net->lisdir);
                if (lisfd < 0) {
                        print("listen %s failed: %r\n", net->anndir);
                        continue;
                }

                /* got new call on lisfd */
                accfd = accept(lisfd, net->lisdir);
                if (accfd < 0) {
                        print("accept %d (from %s) failed: %r\n",
                                lisfd, net->lisdir);
                        continue;
                }

                /* accepted that call */
                conn9p = malloc(sizeof *conn9p);
                conn9p->dir = strdup(net->lisdir);
                conn9p->fd = accfd;
                newproc(connection, conn9p, smprint("9P read %s", conn9p->dir));
                close(lisfd);
        }
}

/* only need one of these for all network connections, thus all interfaces */
static void
neto(void *)
{
        int len, datafd;
        Msgbuf *mb;
        Conn9p *conn9p;

        print("neto\n");
        for(;;) {
                /* receive 9P answer from 9P server processes */
                while((mb = fs_recv(netoq, 0)) == nil)
                        continue;

                if(mb->data == nil) {
                        print("neto: pkt nil cat=%d free=%d\n",
                                mb->category, mb->flags&FREE);
                        if(!(mb->flags & FREE))
                                mbfree(mb);
                        continue;
                }

                /* send answer back over the network connection in the reply */
                len = mb->count;
                conn9p = (Conn9p *)mb->param;
                assert(conn9p);

                qlock(conn9p);
                datafd = conn9p->fd;
                assert(len >= 0);
                /* datafd < 0 probably indicates poisoning by the read side */
                if (datafd < 0 || write(datafd, mb->data, len) != len) {
                        print( "network write error (%r);");
                        print(" closing connection for %s\n", conn9p->dir);
                        nethangup(getchan(conn9p), "network write error", 1);
                        if (datafd > 0)
                                hangupdfd(datafd);      /* drop it */
                        conn9p->fd = -1;                /* poison conn9p */
                }
                mbfree(mb);
                if (decref(conn9p) == 0)
                        panic("neto: zero ref count");
                qunlock(conn9p);
        }
}

void
netstart(void)
{
        int netorun = 0;
        Network *net;

        if(netoq == nil)
                netoq = newqueue(Nqueue, "network reply");
        for(net = &netif[0]; net < &netif[Maxnets]; net++){
                if(net->dialstr == nil)
                        continue;
                sprint(net->oname, "neto");
                if (netorun++ == 0)
                        newproc(neto, nil, net->oname);
                sprint(net->iname, "net%di", net->ctlrno);
                newproc(neti, net, net->iname);
        }
}

void
netinit(void)
{
        Network *net;

        for (net = netif; net < netif + Maxnets; net++) {
                net->dialstr = annstrs[net - netif];
                if (net->dialstr == nil)
                        continue;
                net->annfd = announce(net->dialstr, net->anndir);
                /* /bin/service/tcp564 may already have grabbed the port */
                if (net->annfd < 0)
                        sysfatal("can't announce %s: %r", net->dialstr);
                print("netinit: announced on %s\n", net->dialstr);
        }
}