Warning: Attempt to read property "date" on null in /usr/local/www/websvn.planix.org/blame.php on line 247

Warning: Attempt to read property "msg" on null in /usr/local/www/websvn.planix.org/blame.php on line 247
WebSVN – planix.SVN – Blame – /os/branches/feature_tlsv12/sys/src/cmd/venti/srv/lumpqueue.c – Rev 2

Subversion Repositories planix.SVN

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
#include "stdinc.h"
2
#include "dat.h"
3
#include "fns.h"
4
 
5
/* type names for a per-section write queue and for one entry on it */
typedef struct LumpQueue	LumpQueue;
typedef struct WLump		WLump;
7
 
8
enum
{
	/*
	 * Number of slots in each write queue's ring.  Must be a power of
	 * two, because the read and write indices wrap with
	 * "& (MaxLumpQ - 1)".  One slot is always left unused so that a
	 * full ring (r == w+1) can be told apart from an empty one (r == w);
	 * at most MaxLumpQ-1 lumps are therefore pending on a queue.
	 */
	MaxLumpQ	= 1 << 3
};
12
 
13
struct WLump
14
{
15
	Lump	*u;
16
	Packet	*p;
17
	int	creator;
18
	int	gen;
19
	uint	ms;
20
};
21
 
22
struct LumpQueue
23
{
24
	QLock	lock;
25
	Rendez 	flush;
26
	Rendez	full;
27
	Rendez	empty;
28
	WLump	q[MaxLumpQ];
29
	int	w;
30
	int	r;
31
};
32
 
33
static LumpQueue	*lumpqs;
34
static int		nqs;
35
 
36
static QLock		glk;
37
static int		gen;
38
 
39
static void	queueproc(void *vq);
40
 
41
int
42
initlumpqueues(int nq)
43
{
44
	LumpQueue *q;
45
 
46
	int i;
47
	nqs = nq;
48
 
49
	lumpqs = MKNZ(LumpQueue, nq);
50
 
51
	for(i = 0; i < nq; i++){
52
		q = &lumpqs[i];
53
		q->full.l = &q->lock;
54
		q->empty.l = &q->lock;
55
		q->flush.l = &q->lock;
56
 
57
		if(vtproc(queueproc, q) < 0){
58
			seterr(EOk, "can't start write queue slave: %r");
59
			return -1;
60
		}
61
	}
62
 
63
	return 0;
64
}
65
 
66
/*
67
 * queue a lump & it's packet data for writing
68
 */
69
int
70
queuewrite(Lump *u, Packet *p, int creator, uint ms)
71
{
72
	LumpQueue *q;
73
	int i;
74
 
75
	trace(TraceProc, "queuewrite");
76
	i = indexsect(mainindex, u->score);
77
	if(i < 0 || i >= nqs){
78
		seterr(EBug, "internal error: illegal index section in queuewrite");
79
		return -1;
80
	}
81
 
82
	q = &lumpqs[i];
83
 
84
	qlock(&q->lock);
85
	while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
86
		trace(TraceProc, "queuewrite sleep");
87
		rsleep(&q->full);
88
	}
89
 
90
	q->q[q->w].u = u;
91
	q->q[q->w].p = p;
92
	q->q[q->w].creator = creator;
93
	q->q[q->w].ms = ms;
94
	q->q[q->w].gen = gen;
95
	q->w = (q->w + 1) & (MaxLumpQ - 1);
96
 
97
	trace(TraceProc, "queuewrite wakeup");
98
	rwakeup(&q->empty);
99
 
100
	qunlock(&q->lock);
101
 
102
	return 0;
103
}
104
 
105
void
106
flushqueue(void)
107
{
108
	int i;
109
	LumpQueue *q;
110
 
111
	if(!lumpqs)
112
		return;
113
 
114
	trace(TraceProc, "flushqueue");
115
 
116
	qlock(&glk);
117
	gen++;
118
	qunlock(&glk);
119
 
120
	for(i=0; i<mainindex->nsects; i++){
121
		q = &lumpqs[i];
122
		qlock(&q->lock);
123
		while(q->w != q->r && gen - q->q[q->r].gen > 0){
124
			trace(TraceProc, "flushqueue sleep q%d", i);
125
			rsleep(&q->flush);
126
		}
127
		qunlock(&q->lock);
128
	}
129
}
130
 
131
static void
132
queueproc(void *vq)
133
{
134
	LumpQueue *q;
135
	Lump *u;
136
	Packet *p;
137
	int creator;
138
	uint ms;
139
 
140
	threadsetname("queueproc");
141
 
142
	q = vq;
143
	for(;;){
144
		qlock(&q->lock);
145
		while(q->w == q->r){
146
			trace(TraceProc, "queueproc sleep empty");
147
			rsleep(&q->empty);
148
		}
149
 
150
		u = q->q[q->r].u;
151
		p = q->q[q->r].p;
152
		creator = q->q[q->r].creator;
153
		ms = q->q[q->r].ms;
154
 
155
		q->r = (q->r + 1) & (MaxLumpQ - 1);
156
		trace(TraceProc, "queueproc wakeup flush");
157
		rwakeupall(&q->flush);
158
 
159
		trace(TraceProc, "queueproc wakeup full");
160
		rwakeup(&q->full);
161
 
162
		qunlock(&q->lock);
163
 
164
		trace(TraceProc, "queueproc writelump %V", u->score);
165
		if(writeqlump(u, p, creator, ms) < 0)
166
			fprint(2, "failed to write lump for %V: %r", u->score);
167
		trace(TraceProc, "queueproc wrotelump %V", u->score);
168
 
169
		putlump(u);
170
	}
171
}