Subversion Repositories planix.SVN

Rev

Blame | Last modification | View Log | RSS feed

#include <u.h>
#include <libc.h>
#include <venti.h>
#include "queue.h"

/*
 * Running traffic counters, updated by _vtsend and _vtrecv in this file.
 * The send byte count includes the 2-byte length prefix of each packet;
 * the receive byte count covers only the packet body (the decoded length).
 * They are updated without any lock of their own.
 */
long ventisendbytes, ventisendpackets;
long ventirecvbytes, ventirecvpackets;

/*
 * Frame packet p with a 2-byte big-endian length prefix and write it,
 * fragment by fragment, to z->outfd.
 *
 * p is consumed on every return path (success or failure); callers must
 * not touch or free it afterwards.
 *
 * Returns 1 on success, -1 on failure.  Failures: the connection is not
 * in VtStateConnected, the packet is 64K or larger (the length would not
 * fit the 16-bit prefix), or a write fails or is short.
 */
static int
_vtsend(VtConn *z, Packet *p)
{
        IOchunk ioc;
        int n, tot;
        long nw;
        uchar buf[2];

        if(z->state != VtStateConnected) {
                /*
                 * Neither vtsend nor vtsendproc frees p after a failed
                 * _vtsend, so it has to be released here like on the
                 * other error paths.  Free first so errstr stays intact.
                 */
                packetfree(p);
                werrstr("session not connected");
                return -1;
        }

        /* add framing */
        n = packetsize(p);
        if(n >= (1<<16)) {
                werrstr("packet too large");
                packetfree(p);
                return -1;
        }
        buf[0] = n>>8;
        buf[1] = n;
        packetprefix(p, buf, 2);
        ventisendbytes += n+2;
        ventisendpackets++;

        tot = 0;
        for(;;){
                /* fetch the next contiguous fragment; 0 means p is drained */
                n = packetfragments(p, &ioc, 1, 0);
                if(n == 0)
                        break;
                nw = write(z->outfd, ioc.addr, ioc.len);
                /*
                 * Test for a negative return separately: IOchunk is
                 * declared outside this file, and if ioc.len is an
                 * unsigned type a bare "nw < ioc.len" would convert -1
                 * to a huge value and miss the error.
                 */
                if(nw < 0 || nw < ioc.len){
                        vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
                        packetfree(p);
                        return -1;
                }
                packetconsume(p, nil, ioc.len);
                tot += ioc.len;
        }
        vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
        packetfree(p);
        return 1;
}

/*
 * Report whether the current error string contains the word
 * "interrupted".  Returns 1 if so, 0 otherwise.
 */
static int
interrupted(void)
{
        char errbuf[ERRMAX];

        rerrstr(errbuf, sizeof errbuf);
        if(strstr(errbuf, "interrupted") == nil)
                return 0;
        return 1;
}


/*
 * Read one framed packet from z->infd.
 *
 * Bytes are accumulated in z->part.  First enough is read to hold the
 * 2-byte big-endian length prefix, then reading continues until len
 * body bytes are buffered.  The result of packetsplit(z->part, len) is
 * returned.
 *
 * Returns nil if the connection is not in VtStateConnected, on end of
 * file, or on a read error that is not an interrupt.  Interrupted reads
 * are retried.  z->part is not freed on error.
 */
static Packet*
_vtrecv(VtConn *z)
{
        uchar buf[2], *b;
        int n;
        Packet *p;
        int size, len;

        if(z->state != VtStateConnected) {
                werrstr("session not connected");
                return nil;
        }

        p = z->part;
        /* get enough for head size */
        size = packetsize(p);
        while(size < 2) {
                /* append 2 bytes of room, read into it, then trim to what arrived */
                b = packettrailer(p, 2);
                assert(b != nil);
                if(0) fprint(2, "%d read hdr\n", getpid());
                n = read(z->infd, b, 2);
                if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
                /*
                 * Only a positive count advances size; an interrupted
                 * read returns a negative n and must not shrink it.
                 * Trim before bailing out so the unused trailer never
                 * stays in z->part (same order as the body loop below).
                 */
                if(n > 0)
                        size += n;
                packettrim(p, 0, size);
                if(n==0 || (n<0 && !interrupted()))
                        goto Err;
        }

        if(packetconsume(p, buf, 2) < 0)
                goto Err;
        len = (buf[0] << 8) | buf[1];
        size -= 2;

        while(size < len) {
                /* read the remainder of the body, at most MaxFragSize at a time */
                n = len - size;
                if(n > MaxFragSize)
                        n = MaxFragSize;
                b = packettrailer(p, n);
                if(0) fprint(2, "%d read body %d\n", getpid(), n);
                n = read(z->infd, b, n);
                if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
                if(n > 0)
                        size += n;
                packettrim(p, 0, size);
                if(n==0 || (n<0 && !interrupted()))
                        goto Err;
        }
        ventirecvbytes += len;
        ventirecvpackets++;
        p = packetsplit(p, len);
        vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
        return p;
Err:
        vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
        return nil;
}

/*
 * If you fork off two procs running vtrecvproc and vtsendproc,
 * then vtrecv/vtsend (and thus vtrpc) will never block except on
 * rendezvous, which is nice when it's running in one thread of many.
 */
void
vtrecvproc(void *v)
{
        VtConn *conn;
        Queue *rq;
        Packet *pkt;

        /*
         * Proc body: reads packets from the connection passed in v and
         * forwards them to a queue installed as conn->readq.
         */
        conn = v;
        rq = _vtqalloc();

        /* install the queue and take inlk while holding lk, then signal rpcfork */
        qlock(&conn->lk);
        conn->readq = rq;
        qlock(&conn->inlk);
        rwakeup(&conn->rpcfork);
        qunlock(&conn->lk);

        /* pump packets until a read fails or the queue refuses one */
        for(;;){
                pkt = _vtrecv(conn);
                if(pkt == nil)
                        break;
                if(_vtqsend(rq, pkt) < 0){
                        packetfree(pkt);
                        break;
                }
        }
        qunlock(&conn->inlk);

        /* tear down: hang up the queue, free whatever is still queued, unpublish it */
        qlock(&conn->lk);
        _vtqhangup(rq);
        for(pkt = _vtnbqrecv(rq); pkt != nil; pkt = _vtnbqrecv(rq))
                packetfree(pkt);
        _vtqdecref(rq);
        conn->readq = nil;
        rwakeup(&conn->rpcfork);
        qunlock(&conn->lk);
        vthangup(conn);
}

/*
 * Proc body: takes packets from a queue installed as conn->writeq and
 * writes them to the connection passed in v with _vtsend.
 */
void
vtsendproc(void *v)
{
        VtConn *conn;
        Queue *wq;
        Packet *pkt;

        conn = v;
        wq = _vtqalloc();

        /* install the queue and take outlk while holding lk, then signal rpcfork */
        qlock(&conn->lk);
        conn->writeq = wq;
        qlock(&conn->outlk);
        rwakeup(&conn->rpcfork);
        qunlock(&conn->lk);

        /* drain the queue until it yields nil or a send fails */
        for(;;){
                pkt = _vtqrecv(wq);
                if(pkt == nil)
                        break;
                if(_vtsend(conn, pkt) < 0)
                        break;
        }
        qunlock(&conn->outlk);

        /* tear down: hang up the queue, free whatever is still queued, unpublish it */
        qlock(&conn->lk);
        _vtqhangup(wq);
        for(pkt = _vtnbqrecv(wq); pkt != nil; pkt = _vtnbqrecv(wq))
                packetfree(pkt);
        _vtqdecref(wq);
        conn->writeq = nil;
        rwakeup(&conn->rpcfork);
        qunlock(&conn->lk);
}

/*
 * Receive the next packet on connection z.
 *
 * If a read queue is installed (z->readq is set), the packet is taken
 * from that queue.  Otherwise it is read directly with _vtrecv while
 * holding z->inlk, and the connection is hung up if that read fails.
 *
 * Returns the packet, or nil on failure; nil is also returned, with
 * errstr "not connected", when z is not in VtStateConnected.
 */
Packet*
vtrecv(VtConn *z)
{
        Packet *pkt;
        Queue *rq;

        qlock(&z->lk);
        if(z->state != VtStateConnected){
                werrstr("not connected");
                qunlock(&z->lk);
                return nil;
        }

        if(z->readq == nil){
                /* no read queue: read inline, taking inlk before dropping lk */
                qlock(&z->inlk);
                qunlock(&z->lk);
                pkt = _vtrecv(z);
                qunlock(&z->inlk);
                if(pkt == nil)
                        vthangup(z);
                return pkt;
        }

        /* hold a reference on the queue while waiting on it without lk */
        rq = _vtqincref(z->readq);
        qunlock(&z->lk);
        pkt = _vtqrecv(rq);
        _vtqdecref(rq);
        return pkt;
}

/*
 * Send packet p on connection z.
 *
 * If a write queue is installed (z->writeq is set), p is handed to that
 * queue.  Otherwise it is written directly with _vtsend while holding
 * z->outlk, and the connection is hung up if that send fails.
 *
 * p is freed here when z is not connected or when queueing fails; in
 * the other cases it has been passed on to the queue or to _vtsend.
 *
 * Returns 0 on success, -1 on failure.
 */
int
vtsend(VtConn *z, Packet *p)
{
        Queue *wq;
        int rv;

        qlock(&z->lk);
        if(z->state != VtStateConnected){
                packetfree(p);
                werrstr("not connected");
                qunlock(&z->lk);
                return -1;
        }

        if(z->writeq != nil){
                /* hold a reference on the queue while using it without lk */
                wq = _vtqincref(z->writeq);
                qunlock(&z->lk);
                rv = _vtqsend(wq, p);
                _vtqdecref(wq);
                if(rv < 0){
                        packetfree(p);
                        return -1;
                }
                return 0;
        }

        /* no write queue: send inline, taking outlk before dropping lk */
        qlock(&z->outlk);
        qunlock(&z->lk);
        rv = _vtsend(z, p);
        qunlock(&z->outlk);
        if(rv < 0){
                vthangup(z);
                return -1;
        }
        return 0;
}