Subversion Repositories planix.SVN

Rev

Rev 2 | Blame | Compare with Previous | Last modification | View Log | RSS feed

/*
 * Archiver.  In charge of sending blocks to Venti.
 */

#include "stdinc.h"
#include "dat.h"
#include "fns.h"
#include "error.h"

#include "9.h"  /* for consPrint */

#define DEBUG 0

static void archThread(void*);

struct Arch
{
        int ref;
        uint blockSize;
        uint diskSize;
        Cache *c;
        Fs *fs;
        VtSession *z;

        VtLock *lk;
        VtRendez *starve;
        VtRendez *die;
};

Arch *
archInit(Cache *c, Disk *disk, Fs *fs, VtSession *z)
{
        Arch *a;

        a = vtMemAllocZ(sizeof(Arch));

        a->c = c;
        a->z = z;
        a->fs = fs;
        a->blockSize = diskBlockSize(disk);
        a->lk = vtLockAlloc();
        a->starve = vtRendezAlloc(a->lk);

        a->ref = 2;
        vtThread(archThread, a);

        return a;
}

void
archFree(Arch *a)
{
        /* kill slave */
        vtLock(a->lk);
        a->die = vtRendezAlloc(a->lk);
        vtWakeup(a->starve);
        while(a->ref > 1)
                vtSleep(a->die);
        vtUnlock(a->lk);
        vtRendezFree(a->starve);
        vtRendezFree(a->die);
        vtLockFree(a->lk);
        vtMemFree(a);
}

static int
ventiSend(Arch *a, Block *b, uchar *data)
{
        uint n;
        uchar score[VtScoreSize];

        if(DEBUG > 1)
                fprint(2, "ventiSend: sending %#ux %L to venti\n", b->addr, &b->l);
        n = vtZeroTruncate(vtType[b->l.type], data, a->blockSize);
        if(DEBUG > 1)
                fprint(2, "ventiSend: truncate %d to %d\n", a->blockSize, n);
        if(!vtWrite(a->z, score, vtType[b->l.type], data, n)){
                fprint(2, "ventiSend: vtWrite block %#ux failed: %R\n", b->addr);
                return 0;
        }
        if(!vtSha1Check(score, data, n)){
                uchar score2[VtScoreSize];
                vtSha1(score2, data, n);
                fprint(2, "ventiSend: vtWrite block %#ux failed vtSha1Check %V %V\n",
                        b->addr, score, score2);
                return 0;
        }
        if(!vtSync(a->z))
                return 0;
        return 1;
}

/*
 * parameters for recursion; there are so many,
 * and some only change occasionally.  this is
 * easier than spelling things out at each call.
 */
typedef struct Param Param;
struct Param
{
        /* these never change */
        uint snapEpoch; /* epoch for snapshot being archived */
        uint blockSize;
        Cache *c;
        Arch *a;

        /* changes on every call */
        uint depth;

        /* statistics */
        uint nfixed;
        uint nsend;
        uint nvisit;
        uint nfailsend;
        uint maxdepth;
        uint nreclaim;
        uint nfake;
        uint nreal;

        /* these occasionally change (must save old values and put back) */
        uint dsize;
        uint psize;

        /* return value; avoids using stack space */
        Label l;
        uchar score[VtScoreSize];
};

static void
shaBlock(uchar score[VtScoreSize], Block *b, uchar *data, uint bsize)
{
        vtSha1(score, data, vtZeroTruncate(vtType[b->l.type], data, bsize));
}

static uint
etype(Entry *e)
{
        uint t;

        if(e->flags&VtEntryDir)
                t = BtDir;
        else
                t = BtData;
        return t+e->depth;
}

static uchar*
copyBlock(Block *b, u32int blockSize)
{
        uchar *data;

        data = vtMemAlloc(blockSize);
        if(data == nil)
                return nil;
        memmove(data, b->data, blockSize);
        return data;
}

/*
 * Walk over the block tree, archiving it to Venti.
 *
 * We don't archive the snapshots. Instead we zero the
 * entries in a temporary copy of the block and archive that.
 *
 * Return value is:
 *
 *      ArchFailure     some error occurred
 *      ArchSuccess     block and all children archived
 *      ArchFaked       success, but block or children got copied
 */
enum
{
        ArchFailure,
        ArchSuccess,
        ArchFaked,
};
static int
archWalk(Param *p, u32int addr, uchar type, u32int tag)
{
        int ret, i, x, psize, dsize;
        uchar *data, score[VtScoreSize];
        Block *b;
        Label l;
        Entry *e;
        WalkPtr w;

        p->nvisit++;

        b = cacheLocalData(p->c, addr, type, tag, OReadWrite,0);
        if(b == nil){
                fprint(2, "archive(%ud, %#ux): cannot find block: %R\n", p->snapEpoch, addr);
                if(strcmp(vtGetError(), ELabelMismatch) == 0){
                        /* might as well plod on so we write _something_ to Venti */
                        memmove(p->score, vtZeroScore, VtScoreSize);
                        return ArchFaked;
                }
                return ArchFailure;
        }

        if(DEBUG) fprint(2, "%*sarchive(%ud, %#ux): block label %L\n",
                p->depth*2, "",  p->snapEpoch, b->addr, &b->l);
        p->depth++;
        if(p->depth > p->maxdepth)
                p->maxdepth = p->depth;

        data = b->data;
        if((b->l.state&BsVenti) == 0){
                initWalk(&w, b, b->l.type==BtDir ? p->dsize : p->psize);
                for(i=0; nextWalk(&w, score, &type, &tag, &e); i++){
                        if(e){
                                if(!(e->flags&VtEntryActive))
                                        continue;
                                if((e->snap && !e->archive)
                                || (e->flags&VtEntryNoArchive)){
                                        if(0) fprint(2, "snap; faking %#ux\n", b->addr);
                                        if(data == b->data){
                                                data = copyBlock(b, p->blockSize);
                                                if(data == nil){
                                                        ret = ArchFailure;
                                                        goto Out;
                                                }
                                                w.data = data;
                                        }
                                        memmove(e->score, vtZeroScore, VtScoreSize);
                                        e->depth = 0;
                                        e->size = 0;
                                        e->tag = 0;
                                        e->flags &= ~VtEntryLocal;
                                        entryPack(e, data, w.n-1);
                                        continue;
                                }
                        }
                        addr = globalToLocal(score);
                        if(addr == NilBlock)
                                continue;
                        dsize = p->dsize;
                        psize = p->psize;
                        if(e){
                                p->dsize= e->dsize;
                                p->psize = e->psize;
                        }
                        vtUnlock(b->lk);
                        x = archWalk(p, addr, type, tag);
                        vtLock(b->lk);
                        if(e){
                                p->dsize = dsize;
                                p->psize = psize;
                        }
                        while(b->iostate != BioClean && b->iostate != BioDirty)
                                vtSleep(b->ioready);
                        switch(x){
                        case ArchFailure:
                                fprint(2, "archWalk %#ux failed; ptr is in %#ux offset %d\n",
                                        addr, b->addr, i);
                                ret = ArchFailure;
                                goto Out;
                        case ArchFaked:
                                /*
                                 * When we're writing the entry for an archive directory
                                 * (like /archive/2003/1215) then even if we've faked
                                 * any data, record the score unconditionally.
                                 * This way, we will always record the Venti score here.
                                 * Otherwise, temporary data or corrupted file system
                                 * would cause us to keep holding onto the on-disk
                                 * copy of the archive.
                                 */
                                if(e==nil || !e->archive)
                                if(data == b->data){
if(0) fprint(2, "faked %#ux, faking %#ux (%V)\n", addr, b->addr, p->score);
                                        data = copyBlock(b, p->blockSize);
                                        if(data == nil){
                                                ret = ArchFailure;
                                                goto Out;
                                        }
                                        w.data = data;
                                }
                                /* fall through */
if(0) fprint(2, "falling\n");
                        case ArchSuccess:
                                if(e){
                                        memmove(e->score, p->score, VtScoreSize);
                                        e->flags &= ~VtEntryLocal;
                                        entryPack(e, data, w.n-1);
                                }else
                                        memmove(data+(w.n-1)*VtScoreSize, p->score, VtScoreSize);
                                if(data == b->data){
                                        blockDirty(b);
                                        /*
                                         * If b is in the active tree, then we need to note that we've
                                         * just removed addr from the active tree (replacing it with the 
                                         * copy we just stored to Venti).  If addr is in other snapshots,
                                         * this will close addr but not free it, since it has a non-empty
                                         * epoch range.
                                         *
                                         * If b is in the active tree but has been copied (this can happen
                                         * if we get killed at just the right moment), then we will
                                         * mistakenly leak its kids.  
                                         *
                                         * The children of an archive directory (e.g., /archive/2004/0604)
                                         * are not treated as in the active tree.
                                         */
                                        if((b->l.state&BsCopied)==0 && (e==nil || e->snap==0))
                                                blockRemoveLink(b, addr, p->l.type, p->l.tag, 0);
                                }
                                break;
                        }
                }

                if(!ventiSend(p->a, b, data)){
                        p->nfailsend++;
                        ret = ArchFailure;
                        goto Out;
                }
                p->nsend++;
                if(data != b->data)
                        p->nfake++;
                if(data == b->data){    /* not faking it, so update state */
                        p->nreal++;
                        l = b->l;
                        l.state |= BsVenti;
                        if(!blockSetLabel(b, &l, 0)){
                                ret = ArchFailure;
                                goto Out;
                        }
                }
        }

        shaBlock(p->score, b, data, p->blockSize);
if(0) fprint(2, "ventisend %V %p %p %p\n", p->score, data, b->data, w.data);
        ret = data!=b->data ? ArchFaked : ArchSuccess;
        p->l = b->l;
Out:
        if(data != b->data)
                vtMemFree(data);
        p->depth--;
        blockPut(b);
        return ret;
}

static void
archThread(void *v)
{
        Arch *a = v;
        Block *b;
        Param p;
        Super super;
        int ret;
        u32int addr;
        uchar rbuf[VtRootSize];
        VtRoot root;

        vtThreadSetName("arch");

        for(;;){
                /* look for work */
                vtLock(a->fs->elk);
                b = superGet(a->c, &super);
                if(b == nil){
                        vtUnlock(a->fs->elk);
                        fprint(2, "archThread: superGet: %R\n");
                        sleep(60*1000);
                        continue;
                }
                addr = super.next;
                if(addr != NilBlock && super.current == NilBlock){
                        super.current = addr;
                        super.next = NilBlock;
                        superPack(&super, b->data);
                        blockDirty(b);
                }else
                        addr = super.current;
                blockPut(b);
                vtUnlock(a->fs->elk);

                if(addr == NilBlock){
                        /* wait for work */
                        vtLock(a->lk);
                        vtSleep(a->starve);
                        if(a->die != nil)
                                goto Done;
                        vtUnlock(a->lk);
                        continue;
                }

sleep(10*1000); /* window of opportunity to provoke races */

                /* do work */
                memset(&p, 0, sizeof p);
                p.blockSize = a->blockSize;
                p.dsize = 3*VtEntrySize;        /* root has three Entries */
                p.c = a->c;
                p.a = a;

                ret = archWalk(&p, addr, BtDir, RootTag);
                switch(ret){
                default:
                        abort();
                case ArchFailure:
                        fprint(2, "archiveBlock %#ux: %R\n", addr);
                        sleep(60*1000);
                        continue;
                case ArchSuccess:
                case ArchFaked:
                        break;
                }

                if(0) fprint(2, "archiveSnapshot 0x%#ux: maxdepth %ud nfixed %ud"
                        " send %ud nfailsend %ud nvisit %ud"
                        " nreclaim %ud nfake %ud nreal %ud\n",
                        addr, p.maxdepth, p.nfixed,
                        p.nsend, p.nfailsend, p.nvisit,
                        p.nreclaim, p.nfake, p.nreal);
                if(0) fprint(2, "archiveBlock %V (%ud)\n", p.score, p.blockSize);

                /* tie up vac root */
                memset(&root, 0, sizeof root);
                root.version = VtRootVersion;
                strecpy(root.type, root.type+sizeof root.type, "vac");
                strecpy(root.name, root.name+sizeof root.name, "fossil");
                memmove(root.score, p.score, VtScoreSize);
                memmove(root.prev, super.last, VtScoreSize);
                root.blockSize = a->blockSize;
                vtRootPack(&root, rbuf);
                if(!vtWrite(a->z, p.score, VtRootType, rbuf, VtRootSize)
                || !vtSha1Check(p.score, rbuf, VtRootSize)){
                        fprint(2, "vtWriteBlock %#ux: %R\n", addr);
                        sleep(60*1000);
                        continue;
                }

                /* record success */
                vtLock(a->fs->elk);
                b = superGet(a->c, &super);
                if(b == nil){
                        vtUnlock(a->fs->elk);
                        fprint(2, "archThread: superGet: %R\n");
                        sleep(60*1000);
                        continue;
                }
                super.current = NilBlock;
                memmove(super.last, p.score, VtScoreSize);
                superPack(&super, b->data);
                blockDirty(b);
                blockPut(b);
                vtUnlock(a->fs->elk);

                consPrint("archive vac:%V\n", p.score);
        }

Done:
        a->ref--;
        vtWakeup(a->die);
        vtUnlock(a->lk);
}

void
archKick(Arch *a)
{
        if(a == nil){
                fprint(2, "warning: archKick nil\n");
                return;
        }
        vtLock(a->lk);
        vtWakeup(a->starve);
        vtUnlock(a->lk);
}