Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
/*
 * Multiplexed Venti client.  It would be nice if we
 * could turn this into a generic library routine rather
 * than keep it Venti specific.  A user-level 9P client
 * could use something like this too.
 *
 * (Actually it does - this should be replaced with libmux,
 * which should be renamed librpcmux.)
 *
 * This is a little more complicated than it might be
 * because we want it to work well within and without libthread.
 *
 * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
 */
15
 
16
#include <u.h>
17
#include <libc.h>
18
#include <venti.h>
19
 
20
typedef struct Rwait Rwait;
21
struct Rwait
22
{
23
	Rendez r;
24
	Packet *p;
25
	int done;
26
	int sleeping;
27
};
28
 
29
static int gettag(VtConn*, Rwait*);
30
static void puttag(VtConn*, Rwait*, int);
31
static void muxrpc(VtConn*, Packet*);
32
 
33
Packet*
34
_vtrpc(VtConn *z, Packet *p, VtFcall *tx)
35
{
36
	int i;
37
	uchar tag, buf[2], *top;
38
	Rwait *r, *rr;
39
 
40
	if(z == nil){
41
		werrstr("not connected");
42
		packetfree(p);
43
		return nil;
44
	}
45
 
46
	/* must malloc because stack could be private */
47
	r = vtmallocz(sizeof(Rwait));
48
 
49
	qlock(&z->lk);
50
	r->r.l = &z->lk;
51
	tag = gettag(z, r);
52
	if(tx){
53
		/* vtfcallrpc can't print packet because it doesn't have tag */
54
		tx->tag = tag;
55
		if(chattyventi)
56
			fprint(2, "%s -> %F\n", argv0, tx);
57
	}
58
 
59
	/* slam tag into packet */
60
	top = packetpeek(p, buf, 0, 2);
61
	if(top == nil){
62
		packetfree(p);
63
		return nil;
64
	}
65
	if(top == buf){
66
		werrstr("first two bytes must be in same packet fragment");
67
		packetfree(p);
68
		vtfree(r);
69
		return nil;
70
	}
71
	top[1] = tag;
72
	qunlock(&z->lk);
73
	if(vtsend(z, p) < 0){
74
		vtfree(r);
75
		return nil;
76
	}
77
 
78
	qlock(&z->lk);
79
	/* wait for the muxer to give us our packet */
80
	r->sleeping = 1;
81
	z->nsleep++;
82
	while(z->muxer && !r->done)
83
		rsleep(&r->r);
84
	z->nsleep--;
85
	r->sleeping = 0;
86
 
87
	/* if not done, there's no muxer: start muxing */
88
	if(!r->done){
89
		if(z->muxer)
90
			abort();
91
		z->muxer = 1;
92
		while(!r->done){
93
			qunlock(&z->lk);
94
			if((p = vtrecv(z)) == nil){
95
				werrstr("unexpected eof on venti connection");
96
				z->muxer = 0;
97
				vtfree(r);
98
				return nil;
99
			}
100
			qlock(&z->lk);
101
			muxrpc(z, p);
102
		}
103
		z->muxer = 0;
104
		/* if there is anyone else sleeping, wake first unfinished to mux */
105
		if(z->nsleep)
106
		for(i=0; i<256; i++){
107
			rr = z->wait[i];
108
			if(rr && rr->sleeping && !rr->done){
109
				rwakeup(&rr->r);
110
				break;
111
			}
112
		}
113
	}
114
 
115
	p = r->p;
116
	puttag(z, r, tag);
117
	vtfree(r);
118
	qunlock(&z->lk);
119
	return p;
120
}
121
 
122
Packet*
123
vtrpc(VtConn *z, Packet *p)
124
{
125
	return _vtrpc(z, p, nil);
126
}
127
 
128
static int 
129
gettag(VtConn *z, Rwait *r)
130
{
131
	int i;
132
 
133
Again:
134
	while(z->ntag == 256)
135
		rsleep(&z->tagrend);
136
	for(i=0; i<256; i++)
137
		if(z->wait[i] == 0){
138
			z->ntag++;
139
			z->wait[i] = r;
140
			return i;
141
		}
142
	fprint(2, "libventi: ntag botch\n");
143
	goto Again;
144
}
145
 
146
static void
147
puttag(VtConn *z, Rwait *r, int tag)
148
{
149
	assert(z->wait[tag] == r);
150
	z->wait[tag] = nil;
151
	z->ntag--;
152
	rwakeup(&z->tagrend);
153
}
154
 
155
static void
156
muxrpc(VtConn *z, Packet *p)
157
{
158
	uchar tag, buf[2], *top;
159
	Rwait *r;
160
 
161
	if((top = packetpeek(p, buf, 0, 2)) == nil){
162
		fprint(2, "libventi: short packet in vtrpc\n");
163
		packetfree(p);
164
		return;
165
	}
166
 
167
	tag = top[1];
168
	if((r = z->wait[tag]) == nil){
169
		fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
170
abort();
171
		packetfree(p);
172
		return;
173
	}
174
 
175
	r->p = p;
176
	r->done = 1;
177
	rwakeup(&r->r);
178
}
179