Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
/*
2
 * Sun RPC client.
3
 */
4
#include <u.h>
5
#include <libc.h>
6
#include <thread.h>
7
#include <sunrpc.h>
8
 
9
typedef struct Out Out;
10
struct Out
11
{
12
	char err[ERRMAX];	/* error string */
13
	Channel *creply;	/* send to finish rpc */
14
	uchar *p;			/* pending request packet */
15
	int n;				/* size of request */
16
	ulong tag;			/* flush tag of pending request */
17
	ulong xid;			/* xid of pending request */
18
	ulong st;			/* first send time */
19
	ulong t;			/* resend time */
20
	int nresend;		/* number of resends */
21
	SunRpc rpc;		/* response rpc */
22
};
23
 
24
static void
25
udpThread(void *v)
26
{
27
	uchar *p, *buf;
28
	Ioproc *io;
29
	int n;
30
	SunClient *cli;
31
	enum { BufSize = 65536 };
32
 
33
	cli = v;
34
	buf = emalloc(BufSize);
35
	io = ioproc();
36
	p = nil;
37
	for(;;){
38
		n = ioread(io, cli->fd, buf, BufSize);
39
		if(n <= 0)
40
			break;
41
		p = emalloc(4+n);
42
		memmove(p+4, buf, n);
43
		p[0] = n>>24;
44
		p[1] = n>>16;
45
		p[2] = n>>8;
46
		p[3] = n;
47
		if(sendp(cli->readchan, p) == 0)
48
			break;
49
		p = nil;
50
	}
51
	free(p);
52
	closeioproc(io);
53
	while(send(cli->dying, nil) == -1)
54
		;
55
}
56
 
57
static void
58
netThread(void *v)
59
{
60
	uchar *p, buf[4];
61
	Ioproc *io;
62
	uint n, tot;
63
	int done;
64
	SunClient *cli;
65
 
66
	cli = v;
67
	io = ioproc();
68
	tot = 0;
69
	p = nil;
70
	for(;;){
71
		n = ioreadn(io, cli->fd, buf, 4);
72
		if(n != 4)
73
			break;
74
		n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
75
		if(cli->chatty)
76
			fprint(2, "%.8ux...", n);
77
		done = n&0x80000000;
78
		n &= ~0x80000000;
79
		if(tot == 0){
80
			p = emalloc(4+n);
81
			tot = 4;
82
		}else
83
			p = erealloc(p, tot+n);
84
		if(ioreadn(io, cli->fd, p+tot, n) != n)
85
			break;
86
		tot += n;
87
		if(done){
88
			p[0] = tot>>24;
89
			p[1] = tot>>16;
90
			p[2] = tot>>8;
91
			p[3] = tot;
92
			if(sendp(cli->readchan, p) == 0)
93
				break;
94
			p = nil;
95
			tot = 0;
96
		}
97
	}
98
	free(p);
99
	closeioproc(io);
100
	while(send(cli->dying, 0) == -1)
101
		;
102
}
103
 
104
static void
105
timerThread(void *v)
106
{
107
	Ioproc *io;
108
	SunClient *cli;
109
 
110
	cli = v;
111
	io = ioproc();
112
	for(;;){
113
		if(iosleep(io, 200) < 0)
114
			break;
115
		if(sendul(cli->timerchan, 0) == 0)
116
			break;
117
	}
118
	closeioproc(io);
119
	while(send(cli->dying, 0) == -1)
120
		;
121
}
122
 
123
static ulong
124
msec(void)
125
{
126
	return nsec()/1000000;
127
}
128
 
129
static ulong
130
twait(ulong rtt, int nresend)
131
{
132
	ulong t;
133
 
134
	t = rtt;
135
	if(nresend <= 1)
136
		{}
137
	else if(nresend <= 3)
138
		t *= 2;
139
	else if(nresend <= 18)
140
		t <<= nresend-2;
141
	else
142
		t = 60*1000;
143
	if(t > 60*1000)
144
		t = 60*1000;
145
 
146
	return t;
147
}
148
 
149
static void
150
rpcMuxThread(void *v)
151
{
152
	uchar *buf, *p, *ep;
153
	int i, n, nout, mout;
154
	ulong t, xidgen, tag;
155
	Alt a[5];
156
	Out *o, **out;
157
	SunRpc rpc;
158
	SunClient *cli;
159
 
160
	cli = v;
161
	mout = 16;
162
	nout = 0;
163
	out = emalloc(mout*sizeof(out[0]));
164
	xidgen = truerand();
165
 
166
	a[0].op = CHANRCV;
167
	a[0].c = cli->rpcchan;
168
	a[0].v = &o;
169
	a[1].op = CHANNOP;
170
	a[1].c = cli->timerchan;
171
	a[1].v = nil;
172
	a[2].op = CHANRCV;
173
	a[2].c = cli->flushchan;
174
	a[2].v = &tag;
175
	a[3].op = CHANRCV;
176
	a[3].c = cli->readchan;
177
	a[3].v = &buf;
178
	a[4].op = CHANEND;
179
 
180
	for(;;){
181
		switch(alt(a)){
182
		case 0:	/* o = <-rpcchan */
183
			if(o == nil)
184
				goto Done;
185
			cli->nsend++;
186
			/* set xid */
187
			o->xid = ++xidgen;
188
			if(cli->needcount)
189
				p = o->p+4;
190
			else
191
				p = o->p;
192
			p[0] = xidgen>>24;
193
			p[1] = xidgen>>16;
194
			p[2] = xidgen>>8;
195
			p[3] = xidgen;
196
			if(write(cli->fd, o->p, o->n) != o->n){
197
				free(o->p);
198
				o->p = nil;
199
				snprint(o->err, sizeof o->err, "write: %r");
200
				sendp(o->creply, 0);
201
				break;
202
			}
203
			if(nout >= mout){
204
				mout *= 2;
205
				out = erealloc(out, mout*sizeof(out[0]));
206
			}
207
			o->st = msec();
208
			o->nresend = 0;
209
			o->t = o->st + twait(cli->rtt.avg, 0);
210
if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
211
			out[nout++] = o;
212
			a[1].op = CHANRCV;
213
			break;
214
 
215
		case 1:	/* <-timerchan */
216
			t = msec();
217
			for(i=0; i<nout; i++){
218
				o = out[i];
219
				if((int)(t - o->t) > 0){
220
if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
221
					if(cli->maxwait && t - o->st >= cli->maxwait){
222
						free(o->p);
223
						o->p = nil;
224
						strcpy(o->err, "timeout");
225
						sendp(o->creply, 0);
226
						out[i--] = out[--nout];
227
						continue;
228
					}
229
					cli->nresend++;
230
					o->nresend++;
231
					o->t = t + twait(cli->rtt.avg, o->nresend);
232
					if(write(cli->fd, o->p, o->n) != o->n){
233
						free(o->p);
234
						o->p = nil;
235
						snprint(o->err, sizeof o->err, "rewrite: %r");
236
						sendp(o->creply, 0);
237
						out[i--] = out[--nout];
238
						continue;
239
					}
240
				}
241
			}
242
			/* stop ticking if no work; rpcchan will turn it back on */
243
			if(nout == 0)
244
				a[1].op = CHANNOP;
245
			break;
246
 
247
		case 2:	/* tag = <-flushchan */
248
			for(i=0; i<nout; i++){
249
				o = out[i];
250
				if(o->tag == tag){
251
					out[i--] = out[--nout];
252
					strcpy(o->err, "flushed");
253
					free(o->p);
254
					o->p = nil;
255
					sendp(o->creply, 0);
256
				}
257
			}
258
			break;
259
 
260
		case 3:	/* buf = <-readchan */
261
			p = buf;
262
			n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
263
			p += 4;
264
			ep = p+n;
265
			if(sunRpcUnpack(p, ep, &p, &rpc) < 0){
266
				fprint(2, "in: %.*H unpack failed\n", n, buf+4);
267
				free(buf);
268
				break;
269
			}
270
			if(cli->chatty)
271
				fprint(2, "in: %B\n", &rpc);
272
			if(rpc.iscall){
273
				fprint(2, "did not get reply\n");
274
				free(buf);
275
				break;
276
			}
277
			o = nil;
278
			for(i=0; i<nout; i++){
279
				o = out[i];
280
				if(o->xid == rpc.xid)
281
					break;
282
			}
283
			if(i==nout){
284
				if(cli->chatty) fprint(2, "did not find waiting request\n");
285
				free(buf);
286
				break;
287
			}
288
			out[i] = out[--nout];
289
			free(o->p);
290
			o->p = nil;
291
			if(rpc.status == SunSuccess){
292
				o->p = buf;
293
				o->rpc = rpc;
294
			}else{
295
				o->p = nil;
296
				free(buf);
297
				sunErrstr(rpc.status);
298
				rerrstr(o->err, sizeof o->err);
299
			}
300
			sendp(o->creply, 0);
301
			break;
302
		}
303
	}
304
Done:
305
	free(out);
306
	sendp(cli->dying, 0);
307
}
308
 
309
SunClient*
310
sunDial(char *address)
311
{
312
	int fd;
313
	SunClient *cli;
314
 
315
	if((fd = dial(address, 0, 0, 0)) < 0)
316
		return nil;
317
 
318
	cli = emalloc(sizeof(SunClient));
319
	cli->fd = fd;
320
	cli->maxwait = 15000;
321
	cli->rtt.avg = 1000;
322
	cli->dying = chancreate(sizeof(void*), 0);
323
	cli->rpcchan = chancreate(sizeof(Out*), 0);
324
	cli->timerchan = chancreate(sizeof(ulong), 0);
325
	cli->flushchan = chancreate(sizeof(ulong), 0);
326
	cli->readchan = chancreate(sizeof(uchar*), 0);
327
	if(strstr(address, "udp!")){
328
		cli->needcount = 0;
329
		cli->nettid = threadcreate(udpThread, cli, SunStackSize);
330
		cli->timertid = threadcreate(timerThread, cli, SunStackSize);
331
	}else{
332
		cli->needcount = 1;
333
		cli->nettid = threadcreate(netThread, cli, SunStackSize);
334
		/* assume reliable: don't need timer */
335
		/* BUG: netThread should know how to redial */
336
	}
337
	threadcreate(rpcMuxThread, cli, SunStackSize);
338
 
339
	return cli;
340
}
341
 
342
void
343
sunClientClose(SunClient *cli)
344
{
345
	int n;
346
 
347
	/*
348
	 * Threadints get you out of any stuck system calls
349
	 * or thread rendezvouses, but do nothing if the thread
350
	 * is in the ready state.  Keep interrupting until it takes.
351
	 */
352
	n = 0;
353
	if(!cli->timertid)
354
		n++;
355
	while(n < 2){
356
		threadint(cli->nettid);
357
		if(cli->timertid)
358
			threadint(cli->timertid);
359
		yield();
360
		while(nbrecv(cli->dying, nil) == 1)
361
			n++;
362
	}
363
 
364
	sendp(cli->rpcchan, 0);
365
	recvp(cli->dying);
366
 
367
	/* everyone's gone: clean up */
368
	close(cli->fd);
369
	chanfree(cli->flushchan);
370
	chanfree(cli->readchan);
371
	chanfree(cli->timerchan);
372
	free(cli);
373
}
374
 
375
void
376
sunClientFlushRpc(SunClient *cli, ulong tag)
377
{
378
	sendul(cli->flushchan, tag);
379
}
380
 
381
void
382
sunClientProg(SunClient *cli, SunProg *p)
383
{
384
	if(cli->nprog%16 == 0)
385
		cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
386
	cli->prog[cli->nprog++] = p;
387
}
388
 
389
int
390
sunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
391
{
392
	uchar *bp, *p, *ep;
393
	int i, n1, n2, n, nn;
394
	Out o;
395
	SunProg *prog;
396
	SunStatus ok;
397
 
398
	for(i=0; i<cli->nprog; i++)
399
		if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
400
			break;
401
	if(i==cli->nprog){
402
		werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
403
		return -1;
404
	}
405
	prog = cli->prog[i];
406
 
407
	if(cli->chatty){
408
		fprint(2, "out: %B\n", &tx->rpc);
409
		fprint(2, "\t%C\n", tx);
410
	}
411
 
412
	n1 = sunRpcSize(&tx->rpc);
413
	n2 = sunCallSize(prog, tx);
414
 
415
	n = n1+n2;
416
	if(cli->needcount)
417
		n += 4;
418
 
419
	bp = emalloc(n);
420
	ep = bp+n;
421
	p = bp;
422
	if(cli->needcount){
423
		nn = n-4;
424
		p[0] = (nn>>24)|0x80;
425
		p[1] = nn>>16;
426
		p[2] = nn>>8;
427
		p[3] = nn;
428
		p += 4;
429
	}
430
	if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess
431
	|| (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){
432
		sunErrstr(ok);
433
		free(bp);
434
		return -1;
435
	}
436
	if(p != ep){
437
		werrstr("rpc: packet size mismatch");
438
		free(bp);
439
		return -1;
440
	}
441
 
442
	memset(&o, 0, sizeof o);
443
	o.creply = chancreate(sizeof(void*), 0);
444
	o.tag = tag;
445
	o.p = bp;
446
	o.n = n;
447
 
448
	sendp(cli->rpcchan, &o);
449
	recvp(o.creply);
450
	chanfree(o.creply);
451
 
452
	if(o.p == nil){
453
		werrstr("%s", o.err);
454
		return -1;
455
	}
456
 
457
	p = o.rpc.data;
458
	ep = p+o.rpc.ndata;
459
	rx->rpc = o.rpc;
460
	rx->rpc.proc = tx->rpc.proc;
461
	rx->rpc.prog = tx->rpc.prog;
462
	rx->rpc.vers = tx->rpc.vers;
463
	rx->type = (rx->rpc.proc<<1)|1;
464
	if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){
465
		sunErrstr(ok);
466
		werrstr("unpack: %r");
467
		free(o.p);
468
		return -1;
469
	}
470
 
471
	if(cli->chatty){
472
		fprint(2, "in: %B\n", &rx->rpc);
473
		fprint(2, "in:\t%C\n", rx);
474
	}
475
 
476
	if(tofree)
477
		*tofree = o.p;
478
	else
479
		free(o.p);
480
 
481
	return 0;
482
}