Subversion Repositories planix.SVN

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
#include <u.h>
2
#include <libc.h>
3
#include <venti.h>
4
#include "queue.h"
5
 
6
long ventisendbytes, ventisendpackets;
7
long ventirecvbytes, ventirecvpackets;
8
 
9
static int
10
_vtsend(VtConn *z, Packet *p)
11
{
12
	IOchunk ioc;
13
	int n, tot;
14
	uchar buf[2];
15
 
16
	if(z->state != VtStateConnected) {
17
		werrstr("session not connected");
18
		return -1;
19
	}
20
 
21
	/* add framing */
22
	n = packetsize(p);
23
	if(n >= (1<<16)) {
24
		werrstr("packet too large");
25
		packetfree(p);
26
		return -1;
27
	}
28
	buf[0] = n>>8;
29
	buf[1] = n;
30
	packetprefix(p, buf, 2);
31
	ventisendbytes += n+2;
32
	ventisendpackets++;
33
 
34
	tot = 0;
35
	for(;;){
36
		n = packetfragments(p, &ioc, 1, 0);
37
		if(n == 0)
38
			break;
39
		if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
40
			vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
41
			packetfree(p);
42
			return -1;
43
		}
44
		packetconsume(p, nil, ioc.len);
45
		tot += ioc.len;
46
	}
47
	vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
48
	packetfree(p);
49
	return 1;
50
}
51
 
52
static int
53
interrupted(void)
54
{
55
	char e[ERRMAX];
56
 
57
	rerrstr(e, sizeof e);
58
	return strstr(e, "interrupted") != nil;
59
}
60
 
61
 
62
static Packet*
63
_vtrecv(VtConn *z)
64
{
65
	uchar buf[10], *b;
66
	int n;
67
	Packet *p;
68
	int size, len;
69
 
70
	if(z->state != VtStateConnected) {
71
		werrstr("session not connected");
72
		return nil;
73
	}
74
 
75
	p = z->part;
76
	/* get enough for head size */
77
	size = packetsize(p);
78
	while(size < 2) {
79
		b = packettrailer(p, 2);
80
		assert(b != nil);
81
		if(0) fprint(2, "%d read hdr\n", getpid());
82
		n = read(z->infd, b, 2);
83
		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
84
		if(n==0 || (n<0 && !interrupted()))
85
			goto Err;
86
		size += n;
87
		packettrim(p, 0, size);
88
	}
89
 
90
	if(packetconsume(p, buf, 2) < 0)
91
		goto Err;
92
	len = (buf[0] << 8) | buf[1];
93
	size -= 2;
94
 
95
	while(size < len) {
96
		n = len - size;
97
		if(n > MaxFragSize)
98
			n = MaxFragSize;
99
		b = packettrailer(p, n);
100
		if(0) fprint(2, "%d read body %d\n", getpid(), n);
101
		n = read(z->infd, b, n);
102
		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
103
		if(n > 0)
104
			size += n;
105
		packettrim(p, 0, size);
106
		if(n==0 || (n<0 && !interrupted()))
107
			goto Err;
108
	}
109
	ventirecvbytes += len;
110
	ventirecvpackets++;
111
	p = packetsplit(p, len);
112
	vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
113
	return p;
114
Err:	
115
	vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
116
	return nil;	
117
}
118
 
119
/*
120
 * If you fork off two procs running vtrecvproc and vtsendproc,
121
 * then vtrecv/vtsend (and thus vtrpc) will never block except on 
122
 * rendevouses, which is nice when it's running in one thread of many.
123
 */
124
void
125
vtrecvproc(void *v)
126
{
127
	Packet *p;
128
	VtConn *z;
129
	Queue *q;
130
 
131
	z = v;
132
	q = _vtqalloc();
133
 
134
	qlock(&z->lk);
135
	z->readq = q;
136
	qlock(&z->inlk);
137
	rwakeup(&z->rpcfork);
138
	qunlock(&z->lk);
139
 
140
	while((p = _vtrecv(z)) != nil)
141
		if(_vtqsend(q, p) < 0){
142
			packetfree(p);
143
			break;
144
		}
145
	qunlock(&z->inlk);
146
	qlock(&z->lk);
147
	_vtqhangup(q);
148
	while((p = _vtnbqrecv(q)) != nil)
149
		packetfree(p);
150
	_vtqdecref(q);
151
	z->readq = nil;
152
	rwakeup(&z->rpcfork);
153
	qunlock(&z->lk);
154
	vthangup(z);
155
}
156
 
157
void
158
vtsendproc(void *v)
159
{
160
	Queue *q;
161
	Packet *p;
162
	VtConn *z;
163
 
164
	z = v;
165
	q = _vtqalloc();
166
 
167
	qlock(&z->lk);
168
	z->writeq = q;
169
	qlock(&z->outlk);
170
	rwakeup(&z->rpcfork);
171
	qunlock(&z->lk);
172
 
173
	while((p = _vtqrecv(q)) != nil)
174
		if(_vtsend(z, p) < 0)
175
			break;
176
	qunlock(&z->outlk);
177
	qlock(&z->lk);
178
	_vtqhangup(q);
179
	while((p = _vtnbqrecv(q)) != nil)
180
		packetfree(p);
181
	_vtqdecref(q);
182
	z->writeq = nil;
183
	rwakeup(&z->rpcfork);
184
	qunlock(&z->lk);
185
	return;
186
}
187
 
188
Packet*
189
vtrecv(VtConn *z)
190
{
191
	Packet *p;
192
	Queue *q;
193
 
194
	qlock(&z->lk);
195
	if(z->state != VtStateConnected){
196
		werrstr("not connected");
197
		qunlock(&z->lk);
198
		return nil;
199
	}
200
	if(z->readq){
201
		q = _vtqincref(z->readq);
202
		qunlock(&z->lk);
203
		p = _vtqrecv(q);
204
		_vtqdecref(q);
205
		return p;
206
	}
207
 
208
	qlock(&z->inlk);
209
	qunlock(&z->lk);
210
	p = _vtrecv(z);
211
	qunlock(&z->inlk);
212
	if(!p)
213
		vthangup(z);
214
	return p;
215
}
216
 
217
int
218
vtsend(VtConn *z, Packet *p)
219
{
220
	Queue *q;
221
 
222
	qlock(&z->lk);
223
	if(z->state != VtStateConnected){
224
		packetfree(p);
225
		werrstr("not connected");
226
		qunlock(&z->lk);
227
		return -1;
228
	}
229
	if(z->writeq){
230
		q = _vtqincref(z->writeq);
231
		qunlock(&z->lk);
232
		if(_vtqsend(q, p) < 0){
233
			_vtqdecref(q);
234
			packetfree(p);
235
			return -1;
236
		}
237
		_vtqdecref(q);
238
		return 0;
239
	}
240
 
241
	qlock(&z->outlk);
242
	qunlock(&z->lk);
243
	if(_vtsend(z, p) < 0){
244
		qunlock(&z->outlk);
245
		vthangup(z);
246
		return -1;	
247
	}
248
	qunlock(&z->outlk);
249
	return 0;
250
}
251