/*
 * Rebuild the index from scratch, in place.
 */
#include "stdinc.h"
#include "dat.h"
#include "fns.h"

enum
{
	MinBufSize = 64*1024,
	MaxBufSize = 4*1024*1024,
};

int		dumb;
int		errors;
char		**isect;
int		nisect;
int		bloom;
int		zero;

u32int	isectmem;
u64int	totalbuckets;
u64int	totalclumps;
Channel	*arenadonechan;
Channel	*isectdonechan;
Index	*ix;

u64int	arenaentries;
u64int	skipentries;
u64int	indexentries;

static int	shouldprocess(ISect*);
static void	isectproc(void*);
static void	arenapartproc(void*);

void
usage(void)
{
	fprint(2, "usage: buildindex [-b] [-i isect]... [-M imem] venti.conf\n");
	threadexitsall("usage");
}
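
/*
 * Typical invocation (paths and sizes here are examples, not defaults):
 *	% buildindex -b -M 512m /path/to/venti.conf
 * -b also rebuilds the bloom filter; -M caps the memory used,
 * split evenly across the index sections.
 */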
 
void
threadmain(int argc, char *argv[])
{
	int fd, i, napart, nfinish, maxdisks;
	u32int bcmem, imem;
	Config conf;
	Part *p;

	maxdisks = 100000;
	ventifmtinstall();
	imem = 256*1024*1024;
	ARGBEGIN{
	case 'b':
		bloom = 1;
		break;
	case 'd':	/* debugging - make sure to run all 3 passes */
		dumb = 1;
		break;
	case 'i':
		isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
		isect[nisect++] = EARGF(usage());
		break;
	case 'M':
		imem = unittoull(EARGF(usage()));
		break;
	case 'm':	/* temporary - might go away */
		maxdisks = atoi(EARGF(usage()));
		break;
	default:
		usage();
		break;
	}ARGEND

	if(argc != 1)
		usage();

	if(initventi(argv[0], &conf) < 0)
		sysfatal("can't init venti: %r");
	ix = mainindex;
	if(nisect == 0 && ix->bloom)
		bloom = 1;
	if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
		sysfatal("resetbloom: %r");
	if(bloom && !ix->bloom)
		sysfatal("-b specified but no bloom filter");
	if(!bloom)
		ix->bloom = nil;
	isectmem = imem/ix->nsects;

	/*
	 * safety first - only need read access to arenas
	 */
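	/*
	 * Reopening each arena partition read-only and dup'ing the
	 * new fd onto p->fd replaces the descriptor in place, so all
	 * later I/O on the partition goes through an fd that cannot
	 * write it.
	 */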
	p = nil;
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			if((fd = open(p->filename, OREAD)) < 0)
				sysfatal("cannot reopen %s: %r", p->filename);
			dup(fd, p->fd);
			close(fd);
		}
	}

	/*
	 * need a block for every arena
	 */
	bcmem = maxblocksize * (mainindex->narenas + 16);
	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
	initdcache(bcmem);

	totalclumps = 0;
	for(i=0; i<ix->narenas; i++)
		totalclumps += ix->arenas[i]->diskstats.clumps;

	totalbuckets = 0;
	for(i=0; i<ix->nsects; i++)
		totalbuckets += ix->sects[i]->blocks;
	fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);

	/* start index procs */
	fprint(2, "%T read index\n");
	isectdonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->nsects; i++){
		if(shouldprocess(ix->sects[i])){
			ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0);
			vtproc(isectproc, ix->sects[i]);
		}
	}

	for(i=0; i<nisect; i++)
		if(isect[i])
			fprint(2, "warning: did not find index section %s\n", isect[i]);

	/* start arena procs */
	p = nil;
	napart = 0;
	nfinish = 0;
	arenadonechan = chancreate(sizeof(void*), 0);
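	/*
	 * One reader proc per arena partition.  After maxdisks procs
	 * have been started, wait for one to finish before starting
	 * the next (the -m option above).
	 */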
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			vtproc(arenapartproc, p);
			if(++napart >= maxdisks){
				recvp(arenadonechan);
				nfinish++;
			}
		}
	}

	/* wait for the remaining arena procs to finish */
	for(; nfinish<napart; nfinish++)
		recvp(arenadonechan);

	/* tell index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			send(ix->sects[i]->writechan, nil);

	/* wait for index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			recvp(isectdonechan);

	if(ix->bloom && writebloom(ix->bloom) < 0)
		fprint(2, "writing bloom filter: %r\n");

	fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
		arenaentries, indexentries, skipentries);
	threadexitsall(nil);
}

static int
shouldprocess(ISect *is)
{
	int i;

	if(nisect == 0)
		return 1;

	for(i=0; i<nisect; i++)
		if(isect[i] && strcmp(isect[i], is->name) == 0){
			isect[i] = nil;
			return 1;
		}
	return 0;
}

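/*
 * Locked add: arena and index procs update the shared statistics
 * counters concurrently.
 */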
static void
add(u64int *a, u64int n)
{
	static Lock l;

	lock(&l);
	*a += n;
	unlock(&l);
}

/*
 * Read through an arena partition and send each of its IEntries
 * to the appropriate index section.  When finished, send on
 * arenadonechan.
 */
enum
{
	ClumpChunks = 32*1024,
};
static void
arenapartproc(void *v)
{
	int i, j, n, nskip, x;
	u32int clump;
	u64int addr, tot;
	Arena *a;
	ClumpInfo *ci, *cis;
	IEntry ie;
	Part *p;

	p = v;
	threadsetname("arenaproc %s", p->name);

	nskip = 0;
	tot = 0;
	cis = MKN(ClumpInfo, ClumpChunks);
	for(i=0; i<ix->narenas; i++){
		a = ix->arenas[i];
		if(a->part != p)
			continue;
		if(a->memstats.clumps)
			fprint(2, "%T arena %s: %d entries\n",
				a->name, a->memstats.clumps);
		/*
		 * Running the loop backwards accesses the
		 * clump info blocks forwards, since they are
		 * stored in reverse order at the end of the arena.
		 * This speeds things up slightly.
		 */
		addr = ix->amap[i].start + a->memstats.used;
		for(clump=a->memstats.clumps; clump > 0; clump-=n){
			n = ClumpChunks;
			if(n > clump)
				n = clump;
			if(readclumpinfos(a, clump-n, cis, n) != n){
				fprint(2, "%T arena %s: directory read: %r\n", a->name);
				errors = 1;
				break;
			}
			for(j=n-1; j>=0; j--){
				ci = &cis[j];
				ie.ia.type = ci->type;
				ie.ia.size = ci->uncsize;
				addr -= ci->size + ClumpSize;
				ie.ia.addr = addr;
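				/* disk blocks spanned: clump + header, rounded up to whole ABlock units */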
				ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
				scorecp(ie.score, ci->score);
				if(ci->type == VtCorruptType)
					nskip++;
				else{
					tot++;
					x = indexsect(ix, ie.score);
					assert(0 <= x && x < ix->nsects);
					if(ix->sects[x]->writechan)
						send(ix->sects[x]->writechan, &ie);
					if(ix->bloom)
						markbloomfilter(ix->bloom, ie.score);
				}
			}
		}
		if(addr != ix->amap[i].start)
			fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n", a->name, addr, ix->amap[i].start);
	}
	free(cis);
	add(&arenaentries, tot);
	add(&skipentries, nskip);
	sendp(arenadonechan, p);
}

/*
 * Convert a score into a relative bucket number in isect.
 * Can pass a packed ientry instead of a score - the score comes first.
 */
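/*
 * Example with made-up numbers: if ix->div maps the 32-bit hash
 * space onto 1M buckets overall and this section covers buckets
 * [250000,500000), a score hashing to bucket 300000 yields a
 * relative bucket of 50000.
 */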
static u32int
score2bucket(ISect *is, uchar *score)
{
	u32int b;

	b = hashbits(score, 32)/ix->div;
	if(b < is->start || b >= is->stop){
		fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
			score, ix->div, b, is->start, is->stop);
	}
	assert(is->start <= b && b < is->stop);
	return b - is->start;
}

/*
 * Convert offset in index section to bucket number.
 */
static u32int
offset2bucket(ISect *is, u64int offset)
{
	u32int b;

	assert(is->blockbase <= offset);
	offset -= is->blockbase;
	b = offset/is->blocksize;
	assert(b < is->stop-is->start);
	return b;
}

/*
 * Convert bucket number to offset.
 */
static u64int
bucket2offset(ISect *is, u32int b)
{
	assert(b <= is->stop-is->start);
	return is->blockbase + (u64int)b*is->blocksize;
}
 
/*
 * IEntry buffers to hold initial round of spraying.
 */
typedef struct Buf Buf;
struct Buf
{
	Part *part;			/* partition being written */
	uchar *bp;			/* current block */
	uchar *ep;			/* end of block */
	uchar *wp;			/* write position in block */
	u64int boffset;		/* start offset */
	u64int woffset;		/* next write offset */
	u64int eoffset;		/* end offset */
	u32int nentry;		/* number of entries written */
};

static void
bflush(Buf *buf)
{
	u32int bufsize;

	if(buf->woffset >= buf->eoffset)
		sysfatal("buf index chunk overflow - need bigger index");
	bufsize = buf->ep - buf->bp;
	if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
		fprint(2, "write %s: %r\n", buf->part->name);
		errors = 1;
	}
	buf->woffset += bufsize;
	memset(buf->bp, 0, bufsize);
	buf->wp = buf->bp;
}

static void
bwrite(Buf *buf, IEntry *ie)
{
	if(buf->wp+IEntrySize > buf->ep)
		bflush(buf);
	assert(buf->bp <= buf->wp && buf->wp < buf->ep);
	packientry(ie, buf->wp);
	buf->wp += IEntrySize;
	assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
	buf->nentry++;
}
 
/*
 * Minibuffer.  In-memory data structure holds our place
 * in the buffer but has no block data.  We are writing and
 * reading the minibuffers at the same time.  (Careful!)
 */
typedef struct Minibuf Minibuf;
struct Minibuf
{
	u64int boffset;		/* start offset */
	u64int roffset;		/* read offset */
	u64int woffset;		/* write offset */
	u64int eoffset;		/* end offset */
	u32int nentry;		/* # entries left to read */
	u32int nwentry;		/* # entries written */
};

/*
 * Index entry pool.  Used when trying to shuffle around
 * the entries in a big buffer into the corresponding M minibuffers.
 * Sized to hold M*EntriesPerBlock entries, so that there will always
 * either be room in the pool for another block worth of entries
 * or there will be an entire block worth of sorted entries to
 * write out.
 */
typedef struct IEntryLink IEntryLink;
typedef struct IPool IPool;

struct IEntryLink
{
	uchar ie[IEntrySize];		/* raw IEntry */
	IEntryLink *next;		/* next in chain */
};

struct IPool
{
	ISect *isect;
	u32int buck0;			/* first bucket in pool */
	u32int mbufbuckets;		/* buckets per minibuf */
	IEntryLink *entry;		/* all IEntryLinks */
	u32int nentry;			/* # of IEntryLinks */
	IEntryLink *free;		/* free list */
	u32int nfree;			/* # on free list */
	Minibuf *mbuf;			/* all minibufs */
	u32int nmbuf;			/* # of minibufs */
	IEntryLink **mlist;		/* lists for each minibuf */
	u32int *mcount;			/* # on each mlist[i] */
	u32int bufsize;			/* block buffer size */
	uchar *rbuf;			/* read buffer */
	uchar *wbuf;			/* write buffer */
	u32int epbuf;			/* entries per block buffer */
};

/*
static int
countsokay(IPool *p)
{
	int i;
	u64int n;

	n = 0;
	for(i=0; i<p->nmbuf; i++)
		n += p->mcount[i];
	n += p->nfree;
	if(n != p->nentry){
		print("free %ud:", p->nfree);
		for(i=0; i<p->nmbuf; i++)
			print(" %ud", p->mcount[i]);
		print(" = %lld nentry: %ud\n", n, p->nentry);
	}
	return n == p->nentry;
}
*/
 
static IPool*
mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
	u32int mbufbuckets, u32int bufsize)
{
	u32int i, nentry;
	uchar *data;
	IPool *p;
	IEntryLink *l;

	nentry = (nmbuf+1)*bufsize / IEntrySize;
	p = ezmalloc(sizeof(IPool)
		+nentry*sizeof(IEntry)
		+nmbuf*sizeof(IEntryLink*)
		+nmbuf*sizeof(u32int)
		+3*bufsize);

	p->isect = isect;
	p->mbufbuckets = mbufbuckets;
	p->bufsize = bufsize;
	p->entry = (IEntryLink*)(p+1);
	p->nentry = nentry;
	p->mlist = (IEntryLink**)(p->entry+nentry);
	p->mcount = (u32int*)(p->mlist+nmbuf);
	p->nmbuf = nmbuf;
	p->mbuf = mbuf;
	data = (uchar*)(p->mcount+nmbuf);
	data += bufsize - (uintptr)data%bufsize;
	p->rbuf = data;
	p->wbuf = data+bufsize;
	p->epbuf = bufsize/IEntrySize;

	for(i=0; i<p->nentry; i++){
		l = &p->entry[i];
		l->next = p->free;
		p->free = l;
		p->nfree++;
	}
	return p;
}

/*
 * Add the index entry ie to the pool p.
 * Caller must know there is room.
 */
static void
ipoolinsert(IPool *p, uchar *ie)
{
	u32int buck, x;
	IEntryLink *l;

	assert(p->free != nil);

	buck = score2bucket(p->isect, ie);
	x = (buck-p->buck0) / p->mbufbuckets;
	if(x >= p->nmbuf){
		fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
			buck, p->mbufbuckets, x);
	}
	assert(x < p->nmbuf);

	l = p->free;
	p->free = l->next;
	p->nfree--;
	memmove(l->ie, ie, IEntrySize);
	l->next = p->mlist[x];
	p->mlist[x] = l;
	p->mcount[x]++;
}
 
/*
 * Pull out a block containing as many
 * entries as possible for minibuffer x.
 */
static u32int
ipoolgetbuf(IPool *p, u32int x)
{
	uchar *bp, *ep, *wp;
	IEntryLink *l;
	u32int n;

	bp = p->wbuf;
	ep = p->wbuf + p->bufsize;
	n = 0;
	assert(x < p->nmbuf);
	for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
		l = p->mlist[x];
		p->mlist[x] = l->next;
		p->mcount[x]--;
		memmove(wp, l->ie, IEntrySize);
		l->next = p->free;
		p->free = l;
		p->nfree++;
		n++;
	}
	memset(wp, 0, ep-wp);
	return n;
}

/*
 * Read a block worth of entries from the minibuf
 * into the pool.  Caller must know there is room.
 */
static void
ipoolloadblock(IPool *p, Minibuf *mb)
{
	u32int i, n;

	assert(mb->nentry > 0);
	assert(mb->roffset >= mb->woffset);
	assert(mb->roffset < mb->eoffset);

	n = p->bufsize/IEntrySize;
	if(n > mb->nentry)
		n = mb->nentry;
	if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
		fprint(2, "readpart %s: %r\n", p->isect->part->name);
	else{
		for(i=0; i<n; i++)
			ipoolinsert(p, p->rbuf+i*IEntrySize);
	}
	mb->nentry -= n;
	mb->roffset += p->bufsize;
}

/*
 * Write out a block worth of entries to minibuffer x.
 * If necessary, pick up the data there before overwriting it.
 */
static void
ipoolflush0(IPool *pool, u32int x)
{
	u32int bufsize;
	Minibuf *mb;

	mb = pool->mbuf+x;
	bufsize = pool->bufsize;
	mb->nwentry += ipoolgetbuf(pool, x);
	if(mb->nentry > 0 && mb->roffset == mb->woffset){
		assert(pool->nfree >= pool->bufsize/IEntrySize);
		/*
		 * There will be room in the pool -- we just
		 * removed a block worth.
		 */
		ipoolloadblock(pool, mb);
	}
	if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
		fprint(2, "writepart %s: %r\n", pool->isect->part->name);
	mb->woffset += bufsize;
}

/*
 * Write out some full block of entries.
 * (There must be one -- the pool is almost full!)
 */
static void
ipoolflush1(IPool *pool)
{
	u32int i;

	assert(pool->nfree <= pool->epbuf);

	for(i=0; i<pool->nmbuf; i++){
		if(pool->mcount[i] >= pool->epbuf){
			ipoolflush0(pool, i);
			return;
		}
	}
	/* can't be reached - someone must be full */
	sysfatal("ipoolflush1");
}

/*
 * Flush all the entries in the pool out to disk.
 * Nothing more to read from disk.
 */
static void
ipoolflush(IPool *pool)
{
	u32int i;

	for(i=0; i<pool->nmbuf; i++)
		while(pool->mlist[i])
			ipoolflush0(pool, i);
	assert(pool->nfree == pool->nentry);
}
 
/*
 * Third pass.  Pick up each minibuffer from disk into
 * memory and then write out the buckets.
 */

/*
 * Compare two packed index entries.
 * Usual ordering, except break ties by putting higher
 * index addresses first (assumes duplicates are due to
 * corruption in the lower addresses).
 */
static int
ientrycmpaddr(const void *va, const void *vb)
{
	int i;
	uchar *a, *b;

	a = (uchar*)va;
	b = (uchar*)vb;
	i = ientrycmp(a, b);
	if(i)
		return i;
	return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
}

static void
zerorange(Part *p, u64int o, u64int e)
{
	static uchar zero[MaxIoSize];
	u32int n;

	for(; o<e; o+=n){
		n = sizeof zero;
		if(o+n > e)
			n = e-o;
		if(writepart(p, o, zero, n) < 0)
			fprint(2, "writepart %s: %r\n", p->name);
	}
}

/*
 * Load a minibuffer into memory and write out the
 * corresponding buckets.
 */
static void
sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
{
	uchar *buckdata, *p, *q, *ep;
	u32int b, lastb, memsize, n;
	u64int o;
	IBucket ib;
	Part *part;

	if(mb->nwentry == 0)
		return;

	part = is->part;
	buckdata = emalloc(is->blocksize);

	/*
	 * read entire buffer.
	 */
	assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
	assert(mb->woffset-mb->boffset <= nbuf);
	if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
		fprint(2, "readpart %s: %r\n", part->name);
		errors = 1;
		free(buckdata);
		return;
	}
	assert(*(uint*)buf != 0xa5a5a5a5);

	/*
	 * remove fragmentation due to IEntrySize
	 * not evenly dividing bufsize
	 */
	memsize = (bufsize/IEntrySize)*IEntrySize;
	for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
		memmove(p, q, memsize);
		p += memsize;
		q += bufsize;
	}
	ep = buf + mb->nwentry*IEntrySize;
	assert(ep <= buf+nbuf);

	/*
	 * sort entries
	 */
	qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);

	/*
	 * write buckets out
	 */
	n = 0;
	lastb = offset2bucket(is, mb->boffset);
	for(p=buf; p<ep; p=q){
		b = score2bucket(is, p);
		for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
			;
		if(lastb+1 < b && zero)
			zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
		if(IBucketSize+(q-p) > is->blocksize)
			sysfatal("bucket overflow - make index bigger");
		memmove(buckdata+IBucketSize, p, q-p);
		ib.n = (q-p)/IEntrySize;
		n += ib.n;
		packibucket(&ib, buckdata, is->bucketmagic);
		if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
			fprint(2, "write %s: %r\n", part->name);
		lastb = b;
	}
	if(lastb+1 < is->stop-is->start && zero)
		zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));

	if(n != mb->nwentry)
		fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n",
			n, mb->nwentry, (ep-buf)/IEntrySize);

	free(buckdata);
}
 
static void
isectproc(void *v)
{
	u32int buck, bufbuckets, bufsize, epbuf, i, j;
	u32int mbufbuckets, n, nbucket, nn, space;
	u32int nbuf, nminibuf, xminiclump, prod;
	u64int blocksize, offset, xclump;
	uchar *data, *p;
	Buf *buf;
	IEntry ie;
	IPool *ipool;
	ISect *is;
	Minibuf *mbuf, *mb;

	is = v;
	blocksize = is->blocksize;
	nbucket = is->stop - is->start;

	/*
	 * Three passes:
	 *	pass 1 - write index entries from arenas into
	 *		large sequential sections on index disk.
	 *		requires nbuf * bufsize memory.
	 *
	 *	pass 2 - split each section into minibufs.
	 *		requires nminibuf * bufsize memory.
	 *
	 *	pass 3 - read each minibuf into memory and
	 *		write buckets out.
	 *		requires entries/minibuf * IEntrySize memory.
	 *
	 * The larger we set bufsize, the less seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the less
	 * seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the
	 * more entries we end up with in each minibuf
	 * at the end.
	 *
	 * Shoot for using half our memory to hold each
	 * minibuf.  The chance of a random distribution
	 * getting off by 2x is quite low.
	 *
	 * Once that is decided, figure out the smallest
	 * nminibuf and nsection/biggest bufsize we can use
	 * and still fit in the memory constraints.
	 */
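	/*
	 * A rough example with made-up numbers: if this section expects
	 * xclump = 100M entries and isectmem = 256MB, a minibuf targets
	 * xminiclump = isectmem/2/IEntrySize entries, and we need
	 * prod = ceil(xclump/xminiclump) minibufs in all; below we
	 * either write prod sequential groups directly (one minibuf
	 * each, skipping pass 2) or settle on roughly sqrt(prod)
	 * groups of sqrt(prod) minibufs each.
	 */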
 
	/* expected number of clump index entries we'll see */
	xclump = nbucket * (double)totalclumps/totalbuckets;

	/* number of clumps we want to see in a minibuf */
	xminiclump = isectmem/2/IEntrySize;

	/* total number of minibufs we need */
	prod = (xclump+xminiclump-1) / xminiclump;

	/* if possible, skip second pass */
	if(!dumb && prod*MinBufSize < isectmem){
		nbuf = prod;
		nminibuf = 1;
	}else{
		/* otherwise use nsection = sqrt(nmini) */
		for(nbuf=1; nbuf*nbuf<prod; nbuf++)
			;
		if(nbuf*MinBufSize > isectmem)
			sysfatal("not enough memory");
		nminibuf = nbuf;
	}
	if(nbuf == 0){
		fprint(2, "%s: brand-new index, no work to do\n", argv0);
		threadexitsall(0);
	}

	/* size buffer to use extra memory */
	bufsize = MinBufSize;
	while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
		bufsize *= 2;
	data = emalloc(nbuf*bufsize);
	epbuf = bufsize/IEntrySize;
	fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
		is->part->name, nbucket, nbuf, nminibuf, bufsize);
	/*
	 * Accept index entries from arena procs.
	 */
	buf = MKNZ(Buf, nbuf);
	p = data;
	offset = is->blockbase;
	bufbuckets = (nbucket+nbuf-1)/nbuf;
	for(i=0; i<nbuf; i++){
		buf[i].part = is->part;
		buf[i].bp = p;
		buf[i].wp = p;
		p += bufsize;
		buf[i].ep = p;
		buf[i].boffset = offset;
		buf[i].woffset = offset;
		if(i < nbuf-1){
			offset += bufbuckets*blocksize;
			buf[i].eoffset = offset;
		}else{
			offset = is->blockbase + nbucket*blocksize;
			buf[i].eoffset = offset;
		}
	}
	assert(p == data+nbuf*bufsize);

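	/*
	 * A zeroed entry (ia.addr == 0) marks end of input: threadmain
	 * signals completion with send(writechan, nil), which the
	 * thread library delivers as zero-filled channel data.
	 */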
	n = 0;
	while(recv(is->writechan, &ie) == 1){
		if(ie.ia.addr == 0)
			break;
		buck = score2bucket(is, ie.score);
		i = buck/bufbuckets;
		assert(i < nbuf);
		bwrite(&buf[i], &ie);
		n++;
	}
	add(&indexentries, n);

	nn = 0;
	for(i=0; i<nbuf; i++){
		bflush(&buf[i]);
		buf[i].bp = nil;
		buf[i].ep = nil;
		buf[i].wp = nil;
		nn += buf[i].nentry;
	}
	if(n != nn)
		fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);

	free(data);
 
	fprint(2, "%T %s: reordering\n", is->part->name);

	/*
	 * Rearrange entries into minibuffers and then
	 * split each minibuffer into buckets.
	 * The minibuffer must be sized so that it is
	 * a multiple of blocksize -- ipoolloadblock assumes
	 * that each minibuf starts aligned on a blocksize
	 * boundary.
	 */
	mbuf = MKN(Minibuf, nminibuf);
	mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
	while(mbufbuckets*blocksize % bufsize)
		mbufbuckets++;
	for(i=0; i<nbuf; i++){
		/*
		 * Set up descriptors.
		 */
		n = buf[i].nentry;
		nn = 0;
		offset = buf[i].boffset;
		memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			mb->boffset = offset;
			offset += mbufbuckets*blocksize;
			if(offset > buf[i].eoffset)
				offset = buf[i].eoffset;
			mb->eoffset = offset;
			mb->roffset = mb->boffset;
			mb->woffset = mb->boffset;
			mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
			if(mb->nentry > buf[i].nentry)
				mb->nentry = buf[i].nentry;
			buf[i].nentry -= mb->nentry;
			nn += mb->nentry;
		}
		if(n != nn)
			fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);
		/*
		 * Rearrange.
		 */
		if(!dumb && nminibuf == 1){
			mbuf[0].nwentry = mbuf[0].nentry;
			mbuf[0].woffset = buf[i].woffset;
		}else{
			ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
			ipool->buck0 = bufbuckets*i;
			for(j=0; j<nminibuf; j++){
				mb = &mbuf[j];
				while(mb->nentry > 0){
					if(ipool->nfree < epbuf){
						ipoolflush1(ipool);
						/* ipoolflush1 might change mb->nentry */
						continue;
					}
					assert(ipool->nfree >= epbuf);
					ipoolloadblock(ipool, mb);
				}
			}
			ipoolflush(ipool);
			nn = 0;
			for(j=0; j<nminibuf; j++)
				nn += mbuf[j].nwentry;
			if(n != nn)
				fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
			free(ipool);
		}

		/*
		 * Make buckets.
		 */
		space = 0;
		for(j=0; j<nminibuf; j++)
			if(space < mbuf[j].woffset - mbuf[j].boffset)
				space = mbuf[j].woffset - mbuf[j].boffset;

		data = emalloc(space);
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			sortminibuffer(is, mb, data, space, bufsize);
		}
		free(data);
	}
	free(buf);
	free(mbuf);

	sendp(isectdonechan, is);
}