Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
/*
2
 * Archiver.  In charge of sending blocks to Venti.
3
 */
4
 
5
#include "stdinc.h"
6
#include "dat.h"
7
#include "fns.h"
8
#include "error.h"
9
 
10
#include "9.h"	/* for consPrint */
11
 
12
#define DEBUG 0
13
 
14
static void archThread(void*);
15
 
16
struct Arch
17
{
18
	int ref;
19
	uint blockSize;
20
	uint diskSize;
21
	Cache *c;
22
	Fs *fs;
23
	VtSession *z;
24
 
25
	VtLock *lk;
26
	VtRendez *starve;
27
	VtRendez *die;
28
};
29
 
30
Arch *
31
archInit(Cache *c, Disk *disk, Fs *fs, VtSession *z)
32
{
33
	Arch *a;
34
 
35
	a = vtMemAllocZ(sizeof(Arch));
36
 
37
	a->c = c;
38
	a->z = z;
39
	a->fs = fs;
40
	a->blockSize = diskBlockSize(disk);
41
	a->lk = vtLockAlloc();
42
	a->starve = vtRendezAlloc(a->lk);
43
 
44
	a->ref = 2;
45
	vtThread(archThread, a);
46
 
47
	return a;
48
}
49
 
50
void
51
archFree(Arch *a)
52
{
53
	/* kill slave */
54
	vtLock(a->lk);
55
	a->die = vtRendezAlloc(a->lk);
56
	vtWakeup(a->starve);
57
	while(a->ref > 1)
58
		vtSleep(a->die);
59
	vtUnlock(a->lk);
60
	vtRendezFree(a->starve);
61
	vtRendezFree(a->die);
62
	vtLockFree(a->lk);
63
	vtMemFree(a);
64
}
65
 
66
static int
67
ventiSend(Arch *a, Block *b, uchar *data)
68
{
69
	uint n;
70
	uchar score[VtScoreSize];
71
 
72
	if(DEBUG > 1)
73
		fprint(2, "ventiSend: sending %#ux %L to venti\n", b->addr, &b->l);
74
	n = vtZeroTruncate(vtType[b->l.type], data, a->blockSize);
75
	if(DEBUG > 1)
76
		fprint(2, "ventiSend: truncate %d to %d\n", a->blockSize, n);
77
	if(!vtWrite(a->z, score, vtType[b->l.type], data, n)){
78
		fprint(2, "ventiSend: vtWrite block %#ux failed: %R\n", b->addr);
79
		return 0;
80
	}
81
	if(!vtSha1Check(score, data, n)){
82
		uchar score2[VtScoreSize];
83
		vtSha1(score2, data, n);
84
		fprint(2, "ventiSend: vtWrite block %#ux failed vtSha1Check %V %V\n",
85
			b->addr, score, score2);
86
		return 0;
87
	}
88
	if(!vtSync(a->z))
89
		return 0;
90
	return 1;
91
}
92
 
93
/*
94
 * parameters for recursion; there are so many,
95
 * and some only change occasionally.  this is
96
 * easier than spelling things out at each call.
97
 */
98
typedef struct Param Param;
99
struct Param
100
{
101
	/* these never change */
102
	uint snapEpoch;	/* epoch for snapshot being archived */
103
	uint blockSize;
104
	Cache *c;
105
	Arch *a;
106
 
107
	/* changes on every call */
108
	uint depth;
109
 
110
	/* statistics */
111
	uint nfixed;
112
	uint nsend;
113
	uint nvisit;
114
	uint nfailsend;
115
	uint maxdepth;
116
	uint nreclaim;
117
	uint nfake;
118
	uint nreal;
119
 
120
	/* these occasionally change (must save old values and put back) */
121
	uint dsize;
122
	uint psize;
123
 
124
	/* return value; avoids using stack space */
125
	Label l;
126
	uchar score[VtScoreSize];
127
};
128
 
129
static void
130
shaBlock(uchar score[VtScoreSize], Block *b, uchar *data, uint bsize)
131
{
132
	vtSha1(score, data, vtZeroTruncate(vtType[b->l.type], data, bsize));
133
}
134
 
135
static uint
136
etype(Entry *e)
137
{
138
	uint t;
139
 
140
	if(e->flags&VtEntryDir)
141
		t = BtDir;
142
	else
143
		t = BtData;
144
	return t+e->depth;
145
}
146
 
147
static uchar*
148
copyBlock(Block *b, u32int blockSize)
149
{
150
	uchar *data;
151
 
152
	data = vtMemAlloc(blockSize);
153
	if(data == nil)
154
		return nil;
155
	memmove(data, b->data, blockSize);
156
	return data;
157
}
158
 
159
/*
160
 * Walk over the block tree, archiving it to Venti.
161
 *
162
 * We don't archive the snapshots. Instead we zero the
163
 * entries in a temporary copy of the block and archive that.
164
 *
165
 * Return value is:
166
 *
167
 *	ArchFailure	some error occurred
168
 *	ArchSuccess	block and all children archived
169
 * 	ArchFaked	success, but block or children got copied
170
 */
171
enum
172
{
173
	ArchFailure,
174
	ArchSuccess,
175
	ArchFaked,
176
};
177
static int
178
archWalk(Param *p, u32int addr, uchar type, u32int tag)
179
{
180
	int ret, i, x, psize, dsize;
181
	uchar *data, score[VtScoreSize];
182
	Block *b;
183
	Label l;
184
	Entry *e;
185
	WalkPtr w;
186
 
187
	p->nvisit++;
188
 
189
	b = cacheLocalData(p->c, addr, type, tag, OReadWrite,0);
190
	if(b == nil){
191
		fprint(2, "archive(%ud, %#ux): cannot find block: %R\n", p->snapEpoch, addr);
192
		if(strcmp(vtGetError(), ELabelMismatch) == 0){
193
			/* might as well plod on so we write _something_ to Venti */
194
			memmove(p->score, vtZeroScore, VtScoreSize);
195
			return ArchFaked;
196
		}
197
		return ArchFailure;
198
	}
199
 
200
	if(DEBUG) fprint(2, "%*sarchive(%ud, %#ux): block label %L\n",
201
		p->depth*2, "",  p->snapEpoch, b->addr, &b->l);
202
	p->depth++;
203
	if(p->depth > p->maxdepth)
204
		p->maxdepth = p->depth;
205
 
206
	data = b->data;
207
	if((b->l.state&BsVenti) == 0){
208
		initWalk(&w, b, b->l.type==BtDir ? p->dsize : p->psize);
209
		for(i=0; nextWalk(&w, score, &type, &tag, &e); i++){
210
			if(e){
211
				if(!(e->flags&VtEntryActive))
212
					continue;
213
				if((e->snap && !e->archive)
214
				|| (e->flags&VtEntryNoArchive)){
215
					if(0) fprint(2, "snap; faking %#ux\n", b->addr);
216
					if(data == b->data){
217
						data = copyBlock(b, p->blockSize);
218
						if(data == nil){
219
							ret = ArchFailure;
220
							goto Out;
221
						}
222
						w.data = data;
223
					}
224
					memmove(e->score, vtZeroScore, VtScoreSize);
225
					e->depth = 0;
226
					e->size = 0;
227
					e->tag = 0;
228
					e->flags &= ~VtEntryLocal;
229
					entryPack(e, data, w.n-1);
230
					continue;
231
				}
232
			}
233
			addr = globalToLocal(score);
234
			if(addr == NilBlock)
235
				continue;
236
			dsize = p->dsize;
237
			psize = p->psize;
238
			if(e){
239
				p->dsize= e->dsize;
240
				p->psize = e->psize;
241
			}
242
			vtUnlock(b->lk);
243
			x = archWalk(p, addr, type, tag);
244
			vtLock(b->lk);
245
			if(e){
246
				p->dsize = dsize;
247
				p->psize = psize;
248
			}
249
			while(b->iostate != BioClean && b->iostate != BioDirty)
250
				vtSleep(b->ioready);
251
			switch(x){
252
			case ArchFailure:
253
				fprint(2, "archWalk %#ux failed; ptr is in %#ux offset %d\n",
254
					addr, b->addr, i);
255
				ret = ArchFailure;
256
				goto Out;
257
			case ArchFaked:
258
				/*
259
				 * When we're writing the entry for an archive directory
260
				 * (like /archive/2003/1215) then even if we've faked
261
				 * any data, record the score unconditionally.
262
				 * This way, we will always record the Venti score here.
263
				 * Otherwise, temporary data or corrupted file system
264
				 * would cause us to keep holding onto the on-disk
265
				 * copy of the archive.
266
				 */
267
				if(e==nil || !e->archive)
268
				if(data == b->data){
269
if(0) fprint(2, "faked %#ux, faking %#ux (%V)\n", addr, b->addr, p->score);
270
					data = copyBlock(b, p->blockSize);
271
					if(data == nil){
272
						ret = ArchFailure;
273
						goto Out;
274
					}
275
					w.data = data;
276
				}
277
				/* fall through */
278
if(0) fprint(2, "falling\n");
279
			case ArchSuccess:
280
				if(e){
281
					memmove(e->score, p->score, VtScoreSize);
282
					e->flags &= ~VtEntryLocal;
283
					entryPack(e, data, w.n-1);
284
				}else
285
					memmove(data+(w.n-1)*VtScoreSize, p->score, VtScoreSize);
286
				if(data == b->data){
287
					blockDirty(b);
288
					/*
289
					 * If b is in the active tree, then we need to note that we've
290
					 * just removed addr from the active tree (replacing it with the 
291
					 * copy we just stored to Venti).  If addr is in other snapshots,
292
					 * this will close addr but not free it, since it has a non-empty
293
					 * epoch range.
294
					 *
295
					 * If b is in the active tree but has been copied (this can happen
296
					 * if we get killed at just the right moment), then we will
297
					 * mistakenly leak its kids.  
298
					 *
299
					 * The children of an archive directory (e.g., /archive/2004/0604)
300
					 * are not treated as in the active tree.
301
					 */
302
					if((b->l.state&BsCopied)==0 && (e==nil || e->snap==0))
303
						blockRemoveLink(b, addr, p->l.type, p->l.tag, 0);
304
				}
305
				break;
306
			}
307
		}
308
 
309
		if(!ventiSend(p->a, b, data)){
310
			p->nfailsend++;
311
			ret = ArchFailure;
312
			goto Out;
313
		}
314
		p->nsend++;
315
		if(data != b->data)
316
			p->nfake++;
317
		if(data == b->data){	/* not faking it, so update state */
318
			p->nreal++;
319
			l = b->l;
320
			l.state |= BsVenti;
321
			if(!blockSetLabel(b, &l, 0)){
322
				ret = ArchFailure;
323
				goto Out;
324
			}
325
		}
326
	}
327
 
328
	shaBlock(p->score, b, data, p->blockSize);
329
if(0) fprint(2, "ventisend %V %p %p %p\n", p->score, data, b->data, w.data);
330
	ret = data!=b->data ? ArchFaked : ArchSuccess;
331
	p->l = b->l;
332
Out:
333
	if(data != b->data)
334
		vtMemFree(data);
335
	p->depth--;
336
	blockPut(b);
337
	return ret;
338
}
339
 
340
static void
341
archThread(void *v)
342
{
343
	Arch *a = v;
344
	Block *b;
345
	Param p;
346
	Super super;
347
	int ret;
348
	u32int addr;
349
	uchar rbuf[VtRootSize];
350
	VtRoot root;
351
 
352
	vtThreadSetName("arch");
353
 
354
	for(;;){
355
		/* look for work */
356
		vtLock(a->fs->elk);
357
		b = superGet(a->c, &super);
358
		if(b == nil){
359
			vtUnlock(a->fs->elk);
360
			fprint(2, "archThread: superGet: %R\n");
361
			sleep(60*1000);
362
			continue;
363
		}
364
		addr = super.next;
365
		if(addr != NilBlock && super.current == NilBlock){
366
			super.current = addr;
367
			super.next = NilBlock;
368
			superPack(&super, b->data);
369
			blockDirty(b);
370
		}else
371
			addr = super.current;
372
		blockPut(b);
373
		vtUnlock(a->fs->elk);
374
 
375
		if(addr == NilBlock){
376
			/* wait for work */
377
			vtLock(a->lk);
378
			vtSleep(a->starve);
379
			if(a->die != nil)
380
				goto Done;
381
			vtUnlock(a->lk);
382
			continue;
383
		}
384
 
385
sleep(10*1000);	/* window of opportunity to provoke races */
386
 
387
		/* do work */
388
		memset(&p, 0, sizeof p);
389
		p.blockSize = a->blockSize;
390
		p.dsize = 3*VtEntrySize;	/* root has three Entries */
391
		p.c = a->c;
392
		p.a = a;
393
 
394
		ret = archWalk(&p, addr, BtDir, RootTag);
395
		switch(ret){
396
		default:
397
			abort();
398
		case ArchFailure:
399
			fprint(2, "archiveBlock %#ux: %R\n", addr);
400
			sleep(60*1000);
401
			continue;
402
		case ArchSuccess:
403
		case ArchFaked:
404
			break;
405
		}
406
 
407
		if(0) fprint(2, "archiveSnapshot 0x%#ux: maxdepth %ud nfixed %ud"
408
			" send %ud nfailsend %ud nvisit %ud"
409
			" nreclaim %ud nfake %ud nreal %ud\n",
410
			addr, p.maxdepth, p.nfixed,
411
			p.nsend, p.nfailsend, p.nvisit,
412
			p.nreclaim, p.nfake, p.nreal);
413
		if(0) fprint(2, "archiveBlock %V (%ud)\n", p.score, p.blockSize);
414
 
415
		/* tie up vac root */
416
		memset(&root, 0, sizeof root);
417
		root.version = VtRootVersion;
418
		strecpy(root.type, root.type+sizeof root.type, "vac");
419
		strecpy(root.name, root.name+sizeof root.name, "fossil");
420
		memmove(root.score, p.score, VtScoreSize);
421
		memmove(root.prev, super.last, VtScoreSize);
422
		root.blockSize = a->blockSize;
423
		vtRootPack(&root, rbuf);
424
		if(!vtWrite(a->z, p.score, VtRootType, rbuf, VtRootSize)
425
		|| !vtSha1Check(p.score, rbuf, VtRootSize)){
426
			fprint(2, "vtWriteBlock %#ux: %R\n", addr);
427
			sleep(60*1000);
428
			continue;
429
		}
430
 
431
		/* record success */
432
		vtLock(a->fs->elk);
433
		b = superGet(a->c, &super);
434
		if(b == nil){
435
			vtUnlock(a->fs->elk);
436
			fprint(2, "archThread: superGet: %R\n");
437
			sleep(60*1000);
438
			continue;
439
		}
440
		super.current = NilBlock;
441
		memmove(super.last, p.score, VtScoreSize);
442
		superPack(&super, b->data);
443
		blockDirty(b);
444
		blockPut(b);
445
		vtUnlock(a->fs->elk);
446
 
447
		consPrint("archive vac:%V\n", p.score);
448
	}
449
 
450
Done:
451
	a->ref--;
452
	vtWakeup(a->die);
453
	vtUnlock(a->lk);
454
}
455
 
456
void
457
archKick(Arch *a)
458
{
459
	if(a == nil){
460
		fprint(2, "warning: archKick nil\n");
461
		return;
462
	}
463
	vtLock(a->lk);
464
	vtWakeup(a->starve);
465
	vtUnlock(a->lk);
466
}