Subversion Repositories planix.SVN

Rev

Rev 2 | Blame | Compare with Previous | Last modification | View Log | RSS feed

/*
 * Multiplexed Venti client.  It would be nice if we 
 * could turn this into a generic library routine rather
 * than keep it Venti specific.  A user-level 9P client
 * could use something like this too.
 * 
 * (Actually it does - this should be replaced with libmux,
 * which should be renamed librpcmux.)
 *
 * This is a little more complicated than it might be
 * because we want it to work well within and without libthread.
 *
 * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
 */

#include <u.h>
#include <libc.h>
#include <venti.h>

/*
 * Per-call wait record.  _vtrpc allocates one for each outstanding
 * request and stores it in z->wait[tag]; muxrpc fills it in when the
 * reply carrying that tag arrives.
 */
typedef struct Rwait Rwait;
struct Rwait
{
        Rendez r;       /* the caller rsleeps here; r.l is set to &z->lk by _vtrpc */
        Packet *p;      /* reply packet, stored by muxrpc */
        int done;       /* set to 1 by muxrpc once p is valid */
        int sleeping;   /* 1 while the caller is in its rsleep loop in _vtrpc */
};

/* file-local helpers, defined after _vtrpc */
static int gettag(VtConn*, Rwait*);
static void puttag(VtConn*, Rwait*, int);
static void muxrpc(VtConn*, Packet*);

/*
 * Wake the first waiter that is asleep and has not yet received its
 * reply, so that it can take over as muxer.  Does nothing when no one
 * is sleeping.  The caller must hold z->lk.
 */
static void
wakenextmuxer(VtConn *z)
{
        int i;
        Rwait *rr;

        if(z->nsleep == 0)
                return;
        for(i=0; i<256; i++){
                rr = z->wait[i];
                if(rr && rr->sleeping && !rr->done){
                        rwakeup(&rr->r);
                        break;
                }
        }
}

/*
 * Perform one RPC on connection z: stamp a free tag into the second
 * byte of request packet p, send it, and return the reply packet that
 * comes back with the same tag.
 *
 * If tx is non-nil its tag field is set to the chosen tag and, when
 * chattyventi is set, the call is printed to fd 2.
 *
 * Returns nil on failure.  p is freed here when z is nil or when the
 * tag cannot be written into it; after that p is handed to vtsend
 * (NOTE(review): vtsend is assumed to consume p even on error, as the
 * original code never freed it on that path -- confirm).
 *
 * Only one caller at a time reads from the connection (the "muxer");
 * everyone else sleeps on their own Rwait until the muxer delivers
 * their reply or steps down.
 *
 * Every error return releases the tag and drops z->lk, so a failed call
 * never leaves a stale pointer in z->wait[] or the connection locked.
 */
Packet*
_vtrpc(VtConn *z, Packet *p, VtFcall *tx)
{
        uchar tag, buf[2], *top;
        Rwait *r;

        if(z == nil){
                werrstr("not connected");
                packetfree(p);
                return nil;
        }

        /* must malloc because stack could be private */
        r = vtmallocz(sizeof(Rwait));

        qlock(&z->lk);
        r->r.l = &z->lk;
        tag = gettag(z, r);
        if(tx){
                /* vtfcallrpc can't print packet because it doesn't have tag */
                tx->tag = tag;
                if(chattyventi)
                        fprint(2, "%s -> %F\n", argv0, tx);
        }

        /* slam tag into packet */
        top = packetpeek(p, buf, 0, 2);
        if(top == nil || top == buf){
                /*
                 * top == buf means packetpeek copied the bytes out, so
                 * writing through top would not modify the packet.
                 */
                if(top == buf)
                        werrstr("first two bytes must be in same packet fragment");
                puttag(z, r, tag);
                qunlock(&z->lk);
                packetfree(p);
                vtfree(r);
                return nil;
        }
        top[1] = tag;
        qunlock(&z->lk);
        if(vtsend(z, p) < 0){
                /* give the tag back before freeing r so z->wait[tag] cannot dangle */
                qlock(&z->lk);
                puttag(z, r, tag);
                qunlock(&z->lk);
                vtfree(r);
                return nil;
        }

        qlock(&z->lk);
        /* wait for the muxer to give us our packet */
        r->sleeping = 1;
        z->nsleep++;
        while(z->muxer && !r->done)
                rsleep(&r->r);
        z->nsleep--;
        r->sleeping = 0;

        /* if not done, there's no muxer: start muxing */
        if(!r->done){
                if(z->muxer)
                        abort();
                z->muxer = 1;
                while(!r->done){
                        qunlock(&z->lk);
                        if((p = vtrecv(z)) == nil){
                                /*
                                 * Step down under the lock, release our
                                 * tag, and hand muxing to another sleeper
                                 * so it can see the failure for itself
                                 * instead of sleeping forever.
                                 */
                                qlock(&z->lk);
                                z->muxer = 0;
                                puttag(z, r, tag);
                                wakenextmuxer(z);
                                qunlock(&z->lk);
                                vtfree(r);
                                werrstr("unexpected eof on venti connection");
                                return nil;
                        }
                        qlock(&z->lk);
                        muxrpc(z, p);
                }
                z->muxer = 0;
                /* if there is anyone else sleeping, wake first unfinished to mux */
                wakenextmuxer(z);
        }

        p = r->p;
        puttag(z, r, tag);
        vtfree(r);
        qunlock(&z->lk);
        return p;
}

/*
 * Public RPC entry point: identical to _vtrpc with no VtFcall to
 * stamp or print.  Returns whatever _vtrpc returns.
 */
Packet*
vtrpc(VtConn *z, Packet *p)
{
        Packet *reply;

        reply = _vtrpc(z, p, nil);
        return reply;
}

/*
 * Claim an unused tag on z for waiter r and return it.
 * While all 256 tags are taken, sleeps on z->tagrend until
 * one is released.  Called by _vtrpc with z->lk held.
 */
static int 
gettag(VtConn *z, Rwait *r)
{
        int tag;

        for(;;){
                while(z->ntag == 256)
                        rsleep(&z->tagrend);

                /* take the lowest-numbered empty slot */
                for(tag=0; tag<256; tag++){
                        if(z->wait[tag])
                                continue;
                        z->ntag++;
                        z->wait[tag] = r;
                        return tag;
                }

                /* ntag said a slot was free but none was: complain and retry */
                fprint(2, "libventi: ntag botch\n");
        }
}

/*
 * Release tag, which must currently be registered to r, and
 * wake one thread sleeping on z->tagrend (see gettag).
 */
static void
puttag(VtConn *z, Rwait *r, int tag)
{
        /* the slot must still belong to the caller's wait record */
        assert(z->wait[tag] == r);

        z->ntag--;
        z->wait[tag] = nil;

        rwakeup(&z->tagrend);
}

/*
 * Deliver a received packet p to the caller waiting on its tag.
 * The tag is the second byte of the packet.  On a match the
 * waiter's Rwait receives p, is marked done, and is woken.
 * A packet too short to hold a tag is reported and freed; a tag
 * with no registered waiter is reported and then abort() is called.
 */
static void
muxrpc(VtConn *z, Packet *p)
{
        uchar tag, hdr[2], *bytes;
        Rwait *waiter;

        bytes = packetpeek(p, hdr, 0, 2);
        if(bytes == nil){
                fprint(2, "libventi: short packet in vtrpc\n");
                packetfree(p);
                return;
        }

        tag = bytes[1];
        waiter = z->wait[tag];
        if(waiter != nil){
                waiter->p = p;
                waiter->done = 1;
                rwakeup(&waiter->r);
                return;
        }

        fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
        abort();
        /* reached only if abort() returns */
        packetfree(p);
        return;
}