Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
/*
2
 * Mirror one arena partition onto another.  
3
 * Be careful to copy only new data.
4
 */
5
 
6
#include "stdinc.h"
7
#include "dat.h"
8
#include "fns.h"
9
 
10
Channel *writechan;
11
 
12
typedef struct Write Write;
13
struct Write
14
{
15
	uchar *p;
16
	int n;
17
	uvlong o;
18
	int error;
19
};
20
 
21
Part *src;
22
Part *dst;
23
int force;
24
int verbose;
25
int dosha1 = 1;
26
char *status;
27
uvlong astart, aend;
28
 
29
/*
 * Print the command synopsis on file descriptor 2 and
 * terminate all procs with exit string "usage".
 */
void
usage(void)
{
	fprint(2, "usage: mirrorarenas [-sv] src dst [ranges]\n");
	threadexitsall("usage");
}
35
 
36
char *tagged;
37
 
38
void
39
tag(char *fmt, ...)
40
{
41
	va_list arg;
42
 
43
	if(tagged){
44
		free(tagged);
45
		tagged = nil;
46
	}
47
	va_start(arg, fmt);
48
	tagged = vsmprint(fmt, arg);
49
	va_end(arg);
50
}
51
 
52
void
53
chat(char *fmt, ...)
54
{
55
	va_list arg;
56
 
57
	if(tagged){
58
		write(1, tagged, strlen(tagged));
59
		free(tagged);
60
		tagged = nil;
61
	}
62
	va_start(arg, fmt);
63
	vfprint(1, fmt, arg);
64
	va_end(arg);
65
}
66
 
67
#pragma varargck argpos tag 1
68
#pragma varargck argpos chat 1
69
 
70
 
71
int
72
ereadpart(Part *p, u64int offset, u8int *buf, u32int count)
73
{
74
	if(readpart(p, offset, buf, count) != count){
75
		chat("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count);
76
		return -1;
77
	}
78
	return 0;
79
}
80
 
81
int
82
ewritepart(Part *p, u64int offset, u8int *buf, u32int count)
83
{
84
	if(writepart(p, offset, buf, count) != count || flushpart(p) < 0){
85
		chat("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count);
86
		return -1;
87
	}
88
	return 0;
89
}
90
 
91
/*
92
 * Extra proc to do writes to dst, so that we can overlap reading
93
 * src with writing dst during copy.  This is an easy factor of two
94
 * (almost) in performance.
95
 */
96
static Write wsync;
97
static void
98
writeproc(void *v)
99
{
100
	Write *w;
101
 
102
	USED(v);
103
	while((w = recvp(writechan)) != nil){
104
		if(w == &wsync)
105
			continue;
106
		if(ewritepart(dst, w->o, w->p, w->n) < 0)
107
			w->error = 1;
108
	}
109
}
110
 
111
int
112
copy(uvlong start, uvlong end, char *what, DigestState *ds)
113
{
114
	int i, n;
115
	uvlong o;
116
	static uchar tmp[2][1024*1024];
117
	Write w[2];
118
 
119
	assert(start <= end);
120
	assert(astart <= start && start < aend);
121
	assert(astart <= end && end <= aend);
122
 
123
	if(verbose && start != end)
124
		chat("%T   copy %,llud-%,llud %s\n", start, end, what);
125
 
126
	i = 0;
127
	memset(w, 0, sizeof w);
128
	for(o=start; o<end; o+=n){
129
		if(w[i].error)
130
			goto error;
131
		n = sizeof tmp[i];
132
		if(o+n > end)
133
			n = end - o;
134
		if(ereadpart(src, o, tmp[i], n) < 0)
135
			goto error;
136
		w[i].p = tmp[i];
137
		w[i].o = o;
138
		w[i].n = n;
139
		w[i].error = 0;
140
		sendp(writechan, &w[i]);
141
		if(ds)
142
			sha1(tmp[i], n, nil, ds);
143
		i = 1-i;
144
	}
145
	if(w[i].error)
146
		goto error;
147
 
148
	/*
149
	 * wait for queued write to finish
150
	 */
151
	sendp(writechan, &wsync);
152
	i = 1-i;
153
	if(w[i].error)
154
		return -1;
155
	return 0;
156
 
157
error:
158
	/*
159
	 * sync with write proc
160
	 */
161
	w[i].p = nil;
162
	w[i].o = 0;
163
	w[i].n = 0;
164
	w[i].error = 0;
165
	sendp(writechan, &w[i]);
166
	return -1;
167
}
168
 
169
/* single-threaded, for reference */
170
int
171
copy1(uvlong start, uvlong end, char *what, DigestState *ds)
172
{
173
	int n;
174
	uvlong o;
175
	static uchar tmp[1024*1024];
176
 
177
	assert(start <= end);
178
	assert(astart <= start && start < aend);
179
	assert(astart <= end && end <= aend);
180
 
181
	if(verbose && start != end)
182
		chat("%T   copy %,llud-%,llud %s\n", start, end, what);
183
 
184
	for(o=start; o<end; o+=n){
185
		n = sizeof tmp;
186
		if(o+n > end)
187
			n = end - o;
188
		if(ereadpart(src, o, tmp, n) < 0)
189
			return -1;
190
		if(ds)
191
			sha1(tmp, n, nil, ds);
192
		if(ewritepart(dst, o, tmp, n) < 0)
193
			return -1;
194
	}
195
	return 0;
196
}
197
 
198
int
199
asha1(Part *p, uvlong start, uvlong end, DigestState *ds)
200
{
201
	int n;
202
	uvlong o;
203
	static uchar tmp[1024*1024];
204
 
205
	if(start == end)
206
		return 0;
207
	assert(start < end);
208
 
209
	if(verbose)
210
		chat("%T   sha1 %,llud-%,llud\n", start, end);
211
 
212
	for(o=start; o<end; o+=n){
213
		n = sizeof tmp;
214
		if(o+n > end)
215
			n = end - o;
216
		if(ereadpart(p, o, tmp, n) < 0)
217
			return -1;
218
		sha1(tmp, n, nil, ds);
219
	}
220
	return 0;
221
}
222
 
223
/*
 * Round a down to a multiple of b.
 */
uvlong
rdown(uvlong a, int b)
{
	uvlong rem;

	rem = a%b;
	return a-rem;
}
228
 
229
/*
 * Round a up to a multiple of b; a value that is already a
 * multiple is returned unchanged.
 */
uvlong
rup(uvlong a, int b)
{
	uvlong rem;

	rem = a%b;
	if(rem != 0)
		return a+b-rem;
	return a;
}
236
 
237
void
238
mirror(Arena *sa, Arena *da)
239
{
240
	vlong v, si, di, end;
241
	int clumpmax, blocksize, sealed;
242
	static uchar buf[MaxIoSize];
243
	ArenaHead h;
244
	DigestState xds, *ds;
245
	vlong shaoff, base;
246
 
247
	base = sa->base;
248
	blocksize = sa->blocksize;
249
	end = sa->base + sa->size;
250
 
251
	astart = base - blocksize;
252
	aend = end + blocksize;
253
 
254
	tag("%T %s (%,llud-%,llud)\n", sa->name, astart, aend);
255
 
256
	if(force){
257
		copy(astart, aend, "all", nil);
258
		return;
259
	}
260
 
261
	if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
262
		if(scorecmp(sa->score, da->score) == 0){
263
			if(verbose)
264
				chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
265
			return;
266
		}
267
		chat("%T %s: warning: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score);
268
		/* Keep executing; will correct seal if possible. */
269
	}
270
	if(!sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
271
		chat("%T %s: dst is sealed, src is not\n", sa->name);
272
		status = "errors";
273
		return;
274
	}
275
	if(sa->diskstats.used < da->diskstats.used){
276
		chat("%T %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used);
277
		status = "errors";
278
		return;
279
	}
280
 
281
	if(da->clumpmagic != sa->clumpmagic){
282
		/*
283
		 * Write this now to reduce the window in which
284
		 * the head and tail disagree about clumpmagic.
285
		 */
286
		da->clumpmagic = sa->clumpmagic;
287
		memset(buf, 0, sizeof buf);
288
		packarena(da, buf);
289
		if(ewritepart(dst, end, buf, blocksize) < 0)
290
			return;
291
	}
292
 
293
	memset(&h, 0, sizeof h);
294
	h.version = da->version;
295
	strcpy(h.name, da->name);
296
	h.blocksize = da->blocksize;
297
	h.size = da->size + 2*da->blocksize;
298
	h.clumpmagic = da->clumpmagic;
299
	memset(buf, 0, sizeof buf);
300
	packarenahead(&h, buf);
301
	if(ewritepart(dst, base - blocksize, buf, blocksize) < 0)
302
		return;
303
 
304
	shaoff = 0;
305
	ds = nil;
306
	sealed = sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0;
307
	if(sealed && dosha1){
308
		/* start sha1 state with header */
309
		memset(&xds, 0, sizeof xds);
310
		ds = &xds;
311
		sha1(buf, blocksize, nil, ds);
312
		shaoff = base;
313
	}
314
 
315
	if(sa->diskstats.used != da->diskstats.used){
316
		di = base+rdown(da->diskstats.used, blocksize);
317
		si = base+rup(sa->diskstats.used, blocksize);
318
		if(ds && asha1(dst, shaoff, di, ds) < 0)
319
			return;
320
		if(copy(di, si, "data", ds) < 0)
321
			return;
322
		shaoff = si;
323
	}
324
 
325
	clumpmax = sa->clumpmax;
326
	di = end - da->diskstats.clumps/clumpmax * blocksize;
327
	si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize;
328
 
329
	if(sa->diskstats.sealed){
330
		/*
331
		 * might be a small hole between the end of the 
332
		 * data and the beginning of the directory.
333
		 */
334
		v = base+rup(sa->diskstats.used, blocksize);
335
		if(ds && asha1(dst, shaoff, v, ds) < 0)
336
			return;
337
		if(copy(v, si, "hole", ds) < 0)
338
			return;
339
		shaoff = si;
340
	}
341
 
342
	if(da->diskstats.clumps != sa->diskstats.clumps){
343
		if(ds && asha1(dst, shaoff, si, ds) < 0)
344
			return;
345
		if(copy(si, di, "directory", ds) < 0)	/* si < di  because clumpinfo blocks grow down */
346
			return;
347
		shaoff = di;
348
	}
349
 
350
	da->ctime = sa->ctime;
351
	da->wtime = sa->wtime;
352
	da->diskstats = sa->diskstats;
353
	da->diskstats.sealed = 0;
354
 
355
	/*
356
	 * Repack the arena tail information
357
	 * and save it for next time...
358
	 */
359
	memset(buf, 0, sizeof buf);
360
	packarena(da, buf);
361
	if(ewritepart(dst, end, buf, blocksize) < 0)
362
		return;
363
 
364
	if(sealed){
365
		/*
366
		 * ... but on the final pass, copy the encoding
367
		 * of the tail information from the source
368
		 * arena itself.  There are multiple possible
369
		 * ways to write the tail info out (the exact
370
		 * details have changed as venti went through
371
		 * revisions), and to keep the SHA1 hash the
372
		 * same, we have to use what the disk uses.
373
		 */
374
		if(asha1(dst, shaoff, end, ds) < 0
375
		|| copy(end, end+blocksize-VtScoreSize, "tail", ds) < 0)
376
			return;
377
		if(dosha1){
378
			memset(buf, 0, VtScoreSize);
379
			sha1(buf, VtScoreSize, da->score, ds);
380
			if(scorecmp(sa->score, da->score) == 0){
381
				if(verbose)
382
					chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
383
				if(ewritepart(dst, end+blocksize-VtScoreSize, da->score, VtScoreSize) < 0)
384
					return;
385
			}else{
386
				chat("%T %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score);
387
				memset(&xds, 0, sizeof xds);
388
				asha1(dst, base-blocksize, end+blocksize-VtScoreSize, &xds);
389
				sha1(buf, VtScoreSize, 0, &xds);
390
				chat("%T   reseal: %V\n", da->score);
391
				status = "errors";
392
			}
393
		}else{
394
			if(verbose)
395
				chat("%T %s: %V mirrored\n", sa->name, sa->score);
396
			if(ewritepart(dst, end+blocksize-VtScoreSize, sa->score, VtScoreSize) < 0)
397
				return;
398
		}
399
	}else{
400
		chat("%T %s: %,lld used mirrored\n",
401
			sa->name, sa->diskstats.used);
402
	}
403
}
404
 
405
void
406
mirrormany(ArenaPart *sp, ArenaPart *dp, char *range)
407
{
408
	int i, lo, hi;
409
	char *s, *t;
410
	Arena *sa, *da;
411
 
412
	if(range == nil){
413
		for(i=0; i<sp->narenas; i++){
414
			sa = sp->arenas[i];
415
			da = dp->arenas[i];
416
			mirror(sa, da);
417
		}
418
		return;
419
	}
420
	if(strcmp(range, "none") == 0)
421
		return;
422
 
423
	for(s=range; *s; s=t){
424
		t = strchr(s, ',');
425
		if(t)
426
			*t++ = 0;
427
		else
428
			t = s+strlen(s);
429
		if(*s == '-')
430
			lo = 0;
431
		else
432
			lo = strtol(s, &s, 0);
433
		hi = lo;
434
		if(*s == '-'){
435
			s++;
436
			if(*s == 0)
437
				hi = sp->narenas-1;
438
			else
439
				hi = strtol(s, &s, 0);
440
		}
441
		if(*s != 0){
442
			chat("%T bad arena range: %s\n", s);
443
			continue;
444
		}
445
		for(i=lo; i<=hi; i++){
446
			sa = sp->arenas[i];
447
			da = dp->arenas[i];
448
			mirror(sa, da);
449
		}
450
	}	
451
}
452
 
453
 
454
void
455
threadmain(int argc, char **argv)
456
{
457
	int i;
458
	Arena *sa, *da;
459
	ArenaPart *s, *d;
460
	char *ranges;
461
 
462
	ventifmtinstall();
463
 
464
	ARGBEGIN{
465
	case 'F':
466
		force = 1;
467
		break;
468
	case 'v':
469
		verbose++;
470
		break;
471
	case 's':
472
		dosha1 = 0;
473
		break;
474
	default:
475
		usage();
476
	}ARGEND
477
 
478
	if(argc != 2 && argc != 3)
479
		usage();
480
	ranges = nil;
481
	if(argc == 3)
482
		ranges = argv[2];
483
 
484
	if((src = initpart(argv[0], OREAD)) == nil)
485
		sysfatal("initpart %s: %r", argv[0]);
486
	if((dst = initpart(argv[1], ORDWR)) == nil)
487
		sysfatal("initpart %s: %r", argv[1]);
488
	if((s = initarenapart(src)) == nil)
489
		sysfatal("initarenapart %s: %r", argv[0]);
490
	for(i=0; i<s->narenas; i++)
491
		delarena(s->arenas[i]);
492
	if((d = initarenapart(dst)) == nil)
493
		sysfatal("loadarenapart %s: %r", argv[1]);
494
	for(i=0; i<d->narenas; i++)
495
		delarena(d->arenas[i]);
496
 
497
	/*
498
	 * The arena geometries must match or all bets are off.
499
	 */
500
	if(s->narenas != d->narenas)
501
		sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas);
502
	for(i=0; i<s->narenas; i++){
503
		sa = s->arenas[i];
504
		da = d->arenas[i];
505
		if(sa->version != da->version)
506
			sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version);
507
		if(sa->blocksize != da->blocksize)
508
			sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize);
509
		if(sa->size != da->size)
510
			sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size);
511
		if(strcmp(sa->name, da->name) != 0)
512
			sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name);
513
	}
514
 
515
	/*
516
	 * Mirror one arena at a time.
517
	 */
518
	writechan = chancreate(sizeof(void*), 0);
519
	vtproc(writeproc, nil);
520
	mirrormany(s, d, ranges);
521
	sendp(writechan, nil);
522
	threadexitsall(status);
523
}