Subversion Repositories planix.SVN

Rev

Rev 2 | Blame | Compare with Previous | Last modification | View Log | RSS feed

#include "stdinc.h"
#include "dat.h"
#include "fns.h"

/* short names for the write-queue types defined below */
typedef struct LumpQueue        LumpQueue;
typedef struct WLump            WLump;

/*
 * Size of each queue's ring of pending writes.
 * It must be a power of 2 because ring indices wrap with
 * & (MaxLumpQ - 1).  queuewrite treats the ring as full when
 * advancing w would make it equal r, so at most MaxLumpQ - 1
 * entries are queued at any one time.
 */
enum
{
        MaxLumpQ        = 1 << 3        /* slots in a single write queue's ring, must be pow 2 */
};

/*
 * One pending write: the arguments given to queuewrite, saved
 * until queueproc hands them to writeqlump.
 */
struct WLump
{
        Lump    *u;             /* lump to write; queueproc calls putlump on it afterwards */
        Packet  *p;             /* packet data passed to writeqlump along with u */
        int     creator;        /* passed through unchanged to writeqlump */
        int     gen;            /* value of the global gen when the entry was queued */
        uint    ms;             /* passed through unchanged to writeqlump */
};

/*
 * A ring of pending writes served by one queueproc.
 * The ring is empty when w == r.  All three Rendez structures
 * are tied to lock by initlumpqueues.
 */
struct LumpQueue
{
        QLock   lock;           /* held while q, w and r are examined or changed */
        Rendez  flush;          /* flushqueue sleeps here; queueproc wakes all after each dequeue */
        Rendez  full;           /* queuewrite sleeps here when the ring is full; queueproc wakes it */
        Rendez  empty;          /* queueproc sleeps here when the ring is empty; queuewrite wakes it */
        WLump   q[MaxLumpQ];    /* the ring itself */
        int     w;              /* index of the next slot queuewrite fills */
        int     r;              /* index of the next slot queueproc takes */
};

static LumpQueue        *lumpqs;        /* array of nqs queues, allocated by initlumpqueues */
static int              nqs;            /* number of queues in lumpqs */

static QLock            glk;            /* held by flushqueue while it increments gen */
static int              gen;            /* flush generation; copied into each WLump by queuewrite */

/* body of the proc started for each queue by initlumpqueues */
static void     queueproc(void *vq);

/*
 * Allocate nq write queues and start queueproc on each of them
 * through vtproc.  Every Rendez of a queue is bound to that
 * queue's own lock before its proc is started.
 *
 * Returns 0 on success.  If vtproc fails, seterr is called and
 * -1 is returned.
 */
int
initlumpqueues(int nq)
{
        LumpQueue *q;
        int i;

        nqs = nq;
        lumpqs = MKNZ(LumpQueue, nq);

        for(i = 0; i < nq; i++){
                q = lumpqs + i;

                q->flush.l = &q->lock;
                q->empty.l = &q->lock;
                q->full.l = &q->lock;

                if(vtproc(queueproc, q) >= 0)
                        continue;

                seterr(EOk, "can't start write queue slave: %r");
                return -1;
        }

        return 0;
}

/*
 * Queue a lump and its packet data for writing.
 *
 * The queue is chosen by indexsect(mainindex, u->score).  While
 * the ring is full the caller sleeps on the queue's full Rendez.
 * Once a slot is free the arguments are stored in it together
 * with the current value of gen, and the empty Rendez is woken.
 *
 * Returns 0 after the entry is queued, or -1 (after seterr) when
 * the index section is outside 0..nqs-1.
 */
int
queuewrite(Lump *u, Packet *p, int creator, uint ms)
{
        LumpQueue *q;
        WLump *slot;
        int sect, next;

        trace(TraceProc, "queuewrite");
        sect = indexsect(mainindex, u->score);
        if(sect < 0 || sect >= nqs){
                seterr(EBug, "internal error: illegal index section in queuewrite");
                return -1;
        }
        q = lumpqs + sect;

        qlock(&q->lock);

        /* full means advancing w would land on r; recompute after every sleep */
        for(;;){
                next = (q->w + 1) & (MaxLumpQ - 1);
                if(next != q->r)
                        break;
                trace(TraceProc, "queuewrite sleep");
                rsleep(&q->full);
        }

        slot = &q->q[q->w];
        slot->u = u;
        slot->p = p;
        slot->creator = creator;
        slot->ms = ms;
        slot->gen = gen;
        q->w = next;

        trace(TraceProc, "queuewrite wakeup");
        rwakeup(&q->empty);

        qunlock(&q->lock);

        return 0;
}

void
flushqueue(void)
{
        int i;
        LumpQueue *q;

        if(!lumpqs)
                return;

        trace(TraceProc, "flushqueue");

        qlock(&glk);
        gen++;
        qunlock(&glk);

        for(i=0; i<mainindex->nsects; i++){
                q = &lumpqs[i];
                qlock(&q->lock);
                while(q->w != q->r && gen - q->q[q->r].gen > 0){
                        trace(TraceProc, "flushqueue sleep q%d", i);
                        rsleep(&q->flush);
                }
                qunlock(&q->lock);
        }
}
        
/*
 * Service loop for one write queue; vq is the LumpQueue to serve.
 * Never returns.
 *
 * Each iteration sleeps on the empty Rendez until the ring has an
 * entry, copies that entry out, advances the read index, wakes
 * everyone sleeping on flush and one sleeper on full, and then
 * drops the lock before calling writeqlump so that the queue is
 * not held locked during the write.  The lump is released with
 * putlump whether or not the write succeeded.
 */
static void
queueproc(void *vq)
{
        LumpQueue *q;
        Lump *u;
        Packet *p;
        int creator;
        uint ms;

        threadsetname("queueproc");

        q = vq;
        for(;;){
                qlock(&q->lock);
                while(q->w == q->r){
                        trace(TraceProc, "queueproc sleep empty");
                        rsleep(&q->empty);
                }

                /* copy the entry out; its slot may be reused once r advances */
                u = q->q[q->r].u;
                p = q->q[q->r].p;
                creator = q->q[q->r].creator;
                ms = q->q[q->r].ms;

                q->r = (q->r + 1) & (MaxLumpQ - 1);
                trace(TraceProc, "queueproc wakeup flush");
                rwakeupall(&q->flush);

                trace(TraceProc, "queueproc wakeup full");
                rwakeup(&q->full);

                qunlock(&q->lock);

                trace(TraceProc, "queueproc writelump %V", u->score);
                /* terminate the diagnostic so later stderr output starts on its own line */
                if(writeqlump(u, p, creator, ms) < 0)
                        fprint(2, "failed to write lump for %V: %r\n", u->score);
                trace(TraceProc, "queueproc wrotelump %V", u->score);

                putlump(u);
        }
}