Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
/*
2
 * Write the dirty icache entries to disk.  Random seeks are
3
 * so expensive that it makes sense to wait until we have
4
 * a lot and then just make a sequential pass over the disk.
5
 */
6
#include "stdinc.h"
7
#include "dat.h"
8
#include "fns.h"
9
 
10
static void icachewriteproc(void*);
11
static void icachewritecoord(void*);
12
static IEntry *iesort(IEntry*);
13
 
14
int icachesleeptime = 1000;	/* milliseconds */
15
int minicachesleeptime = 0;
16
 
17
enum
18
{
19
	Bufsize = 8*1024*1024
20
};
21
 
22
typedef struct IWrite IWrite;
23
struct IWrite
24
{
25
	Round round;
26
	AState as;
27
};
28
 
29
static IWrite iwrite;
30
 
31
void
32
initicachewrite(void)
33
{
34
	int i;
35
	Index *ix;
36
 
37
	initround(&iwrite.round, "icache", 120*60*1000);
38
	ix = mainindex;
39
	for(i=0; i<ix->nsects; i++){
40
		ix->sects[i]->writechan = chancreate(sizeof(ulong), 1);
41
		ix->sects[i]->writedonechan = chancreate(sizeof(ulong), 1);
42
		vtproc(icachewriteproc, ix->sects[i]);
43
	}
44
	vtproc(icachewritecoord, nil);
45
	vtproc(delaykickroundproc, &iwrite.round);
46
}
47
 
48
static u64int
49
ie2diskaddr(Index *ix, ISect *is, IEntry *ie)
50
{
51
	u64int bucket, addr;
52
 
53
	bucket = hashbits(ie->score, 32)/ix->div;
54
	addr = is->blockbase + ((bucket - is->start) << is->blocklog);
55
	return addr;
56
}
57
 
58
static IEntry*
59
nextchunk(Index *ix, ISect *is, IEntry **pie, u64int *paddr, uint *pnbuf)
60
{
61
	u64int addr, naddr;
62
	uint nbuf;
63
	int bsize;
64
	IEntry *iefirst, *ie, **l;
65
 
66
	bsize = 1<<is->blocklog;
67
	iefirst = *pie;
68
	addr = ie2diskaddr(ix, is, iefirst);
69
	nbuf = 0;
70
	for(l = &iefirst->nextdirty; (ie = *l) != nil; l = &(*l)->nextdirty){
71
		naddr = ie2diskaddr(ix, is, ie);
72
		if(naddr - addr >= Bufsize)
73
			break;
74
		nbuf = naddr - addr;
75
	}
76
	nbuf += bsize;
77
 
78
	*l = nil;
79
	*pie = ie;
80
	*paddr = addr;
81
	*pnbuf = nbuf;
82
	return iefirst;
83
}
84
 
85
static int
86
icachewritesect(Index *ix, ISect *is, u8int *buf)
87
{
88
	int err, i, werr, h, bsize, t;
89
	u32int lo, hi;
90
	u64int addr, naddr;
91
	uint nbuf, off;
92
	DBlock *b;
93
	IBucket ib;
94
	IEntry *ie, *iedirty, **l, *chunk;
95
 
96
	lo = is->start * ix->div;
97
	if(TWID32/ix->div < is->stop)
98
		hi = TWID32;
99
	else
100
		hi = is->stop * ix->div - 1;
101
 
102
	trace(TraceProc, "icachewritesect enter %ud %ud %llud",
103
		lo, hi, iwrite.as.aa);
104
 
105
	iedirty = icachedirty(lo, hi, iwrite.as.aa);
106
	iedirty = iesort(iedirty);
107
	bsize = 1 << is->blocklog;
108
	err = 0;
109
 
110
	while(iedirty){
111
		disksched();
112
		while((t = icachesleeptime) == SleepForever){
113
			sleep(1000);
114
			disksched();
115
		}
116
		if(t < minicachesleeptime)
117
			t = minicachesleeptime;
118
		if(t > 0)
119
			sleep(t);
120
		trace(TraceProc, "icachewritesect nextchunk");
121
		chunk = nextchunk(ix, is, &iedirty, &addr, &nbuf);
122
 
123
		trace(TraceProc, "icachewritesect readpart 0x%llux+0x%ux",
124
			addr, nbuf);
125
		if(readpart(is->part, addr, buf, nbuf) < 0){
126
			fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
127
				"readpart: %r\n", argv0, is->part->name, addr);
128
			err  = -1;
129
			continue;
130
		}
131
		trace(TraceProc, "icachewritesect updatebuf");
132
		addstat(StatIsectReadBytes, nbuf);
133
		addstat(StatIsectRead, 1);
134
 
135
		for(l=&chunk; (ie=*l)!=nil; l=&ie->nextdirty){
136
again:
137
			naddr = ie2diskaddr(ix, is, ie);
138
			off = naddr - addr;
139
			if(off+bsize > nbuf){
140
				fprint(2, "%s: whoops! addr=0x%llux nbuf=%ud "
141
					"addr+nbuf=0x%llux naddr=0x%llux\n",
142
					argv0, addr, nbuf, addr+nbuf, naddr);
143
				assert(off+bsize <= nbuf);
144
			}
145
			unpackibucket(&ib, buf+off, is->bucketmagic);
146
			if(okibucket(&ib, is) < 0){
147
				fprint(2, "%s: bad bucket XXX\n", argv0);
148
				goto skipit;
149
			}
150
			trace(TraceProc, "icachewritesect add %V at 0x%llux",
151
				ie->score, naddr);
152
			h = bucklook(ie->score, ie->ia.type, ib.data, ib.n);
153
			if(h & 1){
154
				h ^= 1;
155
				packientry(ie, &ib.data[h]);
156
			}else if(ib.n < is->buckmax){
157
				memmove(&ib.data[h + IEntrySize], &ib.data[h],
158
					ib.n*IEntrySize - h);
159
				ib.n++;
160
				packientry(ie, &ib.data[h]);
161
			}else{
162
				fprint(2, "%s: bucket overflow XXX\n", argv0);
163
skipit:
164
				err = -1;
165
				*l = ie->nextdirty;
166
				ie = *l;
167
				if(ie)
168
					goto again;
169
				else
170
					break;
171
			}
172
			packibucket(&ib, buf+off, is->bucketmagic);
173
		}
174
 
175
		diskaccess(1);
176
 
177
		trace(TraceProc, "icachewritesect writepart", addr, nbuf);
178
		werr = 0;
179
		if(writepart(is->part, addr, buf, nbuf) < 0 || flushpart(is->part) < 0)
180
			werr = -1;
181
 
182
		for(i=0; i<nbuf; i+=bsize){
183
			if((b = _getdblock(is->part, addr+i, ORDWR, 0)) != nil){
184
				memmove(b->data, buf+i, bsize);
185
				putdblock(b);
186
			}
187
		}
188
 
189
		if(werr < 0){
190
			fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
191
				"writepart: %r\n", argv0, is->part->name, addr);
192
			err = -1;
193
			continue;
194
		}
195
 
196
		addstat(StatIsectWriteBytes, nbuf);
197
		addstat(StatIsectWrite, 1);
198
		icacheclean(chunk);
199
	}
200
 
201
	trace(TraceProc, "icachewritesect done");
202
	return err;
203
}
204
 
205
static void
206
icachewriteproc(void *v)
207
{
208
	int ret;
209
	uint bsize;
210
	ISect *is;
211
	Index *ix;
212
	u8int *buf;
213
 
214
	ix = mainindex;
215
	is = v;
216
	threadsetname("icachewriteproc:%s", is->part->name);
217
 
218
	bsize = 1<<is->blocklog;
219
	buf = emalloc(Bufsize+bsize);
220
	buf = (u8int*)(((uintptr)buf+bsize-1)&~(uintptr)(bsize-1));
221
 
222
	for(;;){
223
		trace(TraceProc, "icachewriteproc recv");
224
		recv(is->writechan, 0);
225
		trace(TraceWork, "start");
226
		ret = icachewritesect(ix, is, buf);
227
		trace(TraceProc, "icachewriteproc send");
228
		trace(TraceWork, "finish");
229
		sendul(is->writedonechan, ret);
230
	}
231
}
232
 
233
static void
234
icachewritecoord(void *v)
235
{
236
	int i, err;
237
	Index *ix;
238
	AState as;
239
 
240
	USED(v);
241
 
242
	threadsetname("icachewritecoord");
243
 
244
	ix = mainindex;
245
	iwrite.as = icachestate();
246
 
247
	for(;;){
248
		trace(TraceProc, "icachewritecoord sleep");
249
		waitforkick(&iwrite.round);
250
		trace(TraceWork, "start");
251
		as = icachestate();
252
		if(as.arena==iwrite.as.arena && as.aa==iwrite.as.aa){
253
			/* will not be able to do anything more than last flush - kick disk */
254
			trace(TraceProc, "icachewritecoord kick dcache");
255
			kickdcache();
256
			trace(TraceProc, "icachewritecoord kicked dcache");
257
			goto SkipWork;	/* won't do anything; don't bother rewriting bloom filter */
258
		}
259
		iwrite.as = as;
260
 
261
		trace(TraceProc, "icachewritecoord start flush");
262
		if(iwrite.as.arena){
263
			for(i=0; i<ix->nsects; i++)
264
				send(ix->sects[i]->writechan, 0);
265
			if(ix->bloom)
266
				send(ix->bloom->writechan, 0);
267
 
268
			err = 0;
269
			for(i=0; i<ix->nsects; i++)
270
				err |= recvul(ix->sects[i]->writedonechan);
271
			if(ix->bloom)
272
				err |= recvul(ix->bloom->writedonechan);
273
 
274
			trace(TraceProc, "icachewritecoord donewrite err=%d", err);
275
			if(err == 0){
276
				setatailstate(&iwrite.as);
277
			}
278
		}
279
	SkipWork:
280
		icacheclean(nil);	/* wake up anyone waiting */
281
		trace(TraceWork, "finish");
282
		addstat(StatIcacheFlush, 1);
283
	}
284
}
285
 
286
void
287
flushicache(void)
288
{
289
	trace(TraceProc, "flushicache enter");
290
	kickround(&iwrite.round, 1);
291
	trace(TraceProc, "flushicache exit");
292
}
293
 
294
void
295
kickicache(void)
296
{
297
	kickround(&iwrite.round, 0);
298
}
299
 
300
void
301
delaykickicache(void)
302
{
303
	delaykickround(&iwrite.round);
304
}
305
 
306
/*
 * Sort the nextdirty-linked list ie by score (scorecmp order) and return
 * the new head.  It is a recursive merge sort that relinks nodes in
 * place and allocates nothing.  It is stable: entries with equal scores
 * keep their relative order.
 */
static IEntry*
iesort(IEntry *ie)
{
	IEntry *slow, *fast, *back, *head, **tail;

	if(ie == nil || ie->nextdirty == nil)
		return ie;

	/* locate the split: fast advances two links per link of slow */
	slow = ie;
	fast = ie->nextdirty->nextdirty;
	while(fast != nil){
		slow = slow->nextdirty;
		fast = fast->nextdirty;
		if(fast != nil)
			fast = fast->nextdirty;
	}
	back = slow->nextdirty;
	slow->nextdirty = nil;

	ie = iesort(ie);
	back = iesort(back);

	/* merge; ties go to the front half, which keeps the sort stable */
	head = nil;
	tail = &head;
	while(ie != nil && back != nil){
		if(scorecmp(ie->score, back->score) > 0){
			*tail = back;
			back = back->nextdirty;
		}else{
			*tail = ie;
			ie = ie->nextdirty;
		}
		tail = &(*tail)->nextdirty;
	}
	/* one side is exhausted; the other is already a sorted, nil-terminated run */
	*tail = ie != nil ? ie : back;
	return head;
}
358