Warning: Attempt to read property "date" on null in /usr/local/www/websvn.planix.org/blame.php on line 247

Warning: Attempt to read property "msg" on null in /usr/local/www/websvn.planix.org/blame.php on line 247
WebSVN – planix.SVN – Blame – /os/branches/feature_fixcpp/sys/src/cmd/cwfs/net.c – Rev 2

Subversion Repositories planix.SVN

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
/* network i/o */
2
 
3
#include "all.h"
4
#include "io.h"
5
#include <fcall.h>		/* 9p2000 */
6
#include <thread.h>
7
 
8
enum {
	Maxfdata	= 8192,		/* payload bound used when checking 9P message lengths */
	Nqueue		= 200,		/* queue size (tunable) */

	/* values of Netconn.state */
	Netclosed	= 0,
	Netopen		= 1,
};
15
 
16
/*
 * the kernel file server read packets directly from
 * its ethernet(s) and did all the protocol processing.
 * if the incoming packets were 9p (over il/ip), they
 * were queued for the server processes to operate upon.
 *
 * in user mode, we have one process per incoming connection
 * instead, and those processes get just the data, minus
 * tcp and ip headers, so they just see a stream of 9p messages,
 * which they then queue for the server processes.
 *
 * there used to be more queueing (in the kernel), with separate
 * processes for ethernet input, il input, 9p processing, il output
 * and ethernet output, and queues connecting them.  we now let
 * the kernel's network queues, protocol stacks and processes do
 * much of this work.
 *
 * partly as a result of this, we can now process 9p messages
 * transported via tcp, exploit multiple x86 processors, and
 * were able to shed 70% of the file server's source, by line count.
 *
 * the upshot is that Ether (now Network) is no longer a perfect fit for
 * the way network i/o is done now.  the notion of `connection'
 * is being introduced to complement it.
 */
41
 
42
/* forward declarations; the structures are defined below */
typedef struct Conn9p Conn9p;
typedef struct Netconn Netconn;
typedef struct Network Network;
45
 
46
/* a network, not necessarily an ethernet */
47
struct Network {
48
	int	ctlrno;
49
	char	iname[NAMELEN];
50
	char	oname[NAMELEN];
51
 
52
	char	*dialstr;
53
	char	anndir[40];
54
	char	lisdir[40];
55
	int	annfd;			/* fd from announce */
56
};
57
 
58
/* an open tcp (or other transport) connection */
59
struct Netconn {
60
	Queue*	reply;		/* network output */
61
	char*	raddr;		/* remote caller's addr */
62
	Chan*	chan;		/* list of tcp channels */
63
 
64
	int	alloc;		/* flag: allocated */
65
 
66
	int	state;
67
	Conn9p*	conn9p;		/* not reference-counted */
68
 
69
	Lock;
70
};
71
 
72
/*
73
 * incoming 9P network connection from a given machine.
74
 * typically will multiplex 9P sessions for multiple users.
75
 */
76
struct Conn9p {
77
	QLock;
78
	Ref;
79
	int	fd;
80
	char*	dir;
81
	Netconn*netconn;		/* cross-connection */
82
	char*	raddr;
83
};
84
 
85
static Network netif[Maxnets];
86
static struct {
87
	Lock;
88
	Chan*	chan;
89
} netchans;
90
static Queue *netoq;		/* only one network output queue is needed */
91
 
92
char *annstrs[Maxnets] = {
93
	"tcp!*!9fs",
94
};
95
 
96
/* never returns nil */
97
static Chan*
98
getchan(Conn9p *conn9p)
99
{
100
	Netconn *netconn;
101
	Chan *cp, *xcp;
102
 
103
	lock(&netchans);
104
 
105
	/* look for conn9p's Chan */
106
	xcp = nil;
107
	for(cp = netchans.chan; cp; cp = netconn->chan) {
108
		netconn = cp->pdata;
109
		if(!netconn->alloc)
110
			xcp = cp;		/* remember free Chan */
111
		else if(netconn->raddr != nil &&
112
		    strcmp(conn9p->raddr, netconn->raddr) == 0) {
113
			unlock(&netchans);
114
			return cp;		/* found conn9p's Chan */
115
		}
116
	}
117
 
118
	/* conn9p's Chan not found; if no free Chan, allocate & fill in one */
119
	cp = xcp;
120
	if(cp == nil) {
121
		cp = fs_chaninit(Devnet, 1, sizeof(Netconn));
122
		netconn = cp->pdata;
123
		netconn->chan = netchans.chan;
124
		netconn->state = Netopen;	/* a guess */
125
		/* cross-connect netconn and conn9p */
126
		netconn->conn9p = conn9p;	/* not reference-counted */
127
		conn9p->netconn = netconn;
128
		netchans.chan = cp;
129
	}
130
 
131
	/* fill in Chan's netconn */
132
	netconn = cp->pdata;
133
	netconn->raddr = strdup(conn9p->raddr);
134
 
135
	/* fill in Chan */
136
	cp->send = serveq;
137
	if (cp->reply == nil)
138
		cp->reply = netoq;
139
	netconn->reply = netoq;
140
	cp->protocol = nil;
141
	cp->msize = 0;
142
	cp->whotime = 0;
143
	strncpy(cp->whochan, conn9p->raddr, sizeof cp->whochan);
144
//	cp->whoprint = tcpwhoprint;
145
	netconn->alloc = 1;
146
 
147
	unlock(&netchans);
148
	return cp;
149
}
150
 
151
/*
 * return a malloced copy of the path name of fd, or of "/GOK"
 * if the name can't be determined.  the caller frees the result.
 */
static char *
fd2name(int fd)
{
	char path[128];
	char *name;

	name = "/GOK";
	if (fd2path(fd, path, sizeof path) >= 0)
		name = path;
	return strdup(name);
}
160
 
161
static void
162
hangupdfd(int dfd)
163
{
164
	int ctlfd;
165
	char *end, *data;
166
 
167
	data = fd2name(dfd);
168
	close(dfd);
169
 
170
	end = strstr(data, "/data");
171
	if (end != nil)
172
		strcpy(end, "/ctl");
173
	ctlfd = open(data, OWRITE);
174
	if (ctlfd >= 0) {
175
		hangup(ctlfd);
176
		close(ctlfd);
177
	}
178
	free(data);
179
}
180
 
181
void
182
closechan(int n)
183
{
184
	Chan *cp;
185
 
186
	for(cp = chans; cp; cp = cp->next)
187
		if(cp->whotime != 0 && cp->chan == n)
188
			fileinit(cp);
189
}
190
 
191
void
192
nethangup(Chan *cp, char *msg, int dolock)
193
{
194
	Netconn *netconn;
195
 
196
	netconn = cp->pdata;
197
	netconn->state = Netclosed;
198
 
199
	if(msg != nil)
200
		print("hangup! %s %s\n", msg, netconn->raddr);
201
 
202
	fileinit(cp);
203
	cp->whotime = 0;
204
	strcpy(cp->whoname, "<none>");
205
 
206
	if(dolock)
207
		lock(&netchans);
208
	netconn->alloc = 0;
209
	free(netconn->raddr);
210
	netconn->raddr = nil;
211
	if(dolock)
212
		unlock(&netchans);
213
}
214
 
215
void
216
chanhangup(Chan *cp, char *msg, int dolock)
217
{
218
	Netconn *netconn = cp->pdata;
219
	Conn9p *conn9p = netconn->conn9p;
220
 
221
	if (conn9p->fd > 0)
222
		hangupdfd(conn9p->fd);	/* drop it */
223
	nethangup(cp, msg, dolock);
224
}
225
 
226
/*
227
 * returns length of next 9p message (including the length) and
228
 * leaves it in the first few bytes of abuf.
229
 */
230
static long
231
size9pmsg(int fd, void *abuf, uint n)
232
{
233
	int m;
234
	uchar *buf = abuf;
235
 
236
	if (n < BIT32SZ)
237
		return -1;	/* caller screwed up */
238
 
239
	/* read count */
240
	m = readn(fd, buf, BIT32SZ);
241
	if(m != BIT32SZ){
242
		if(m < 0)
243
			return -1;
244
		return 0;
245
	}
246
	return GBIT32(buf);
247
}
248
 
249
static int
250
readalloc9pmsg(int fd, Msgbuf **mbp)
251
{
252
	int m, len;
253
	uchar lenbuf[BIT32SZ];
254
	Msgbuf *mb;
255
 
256
	*mbp = nil;
257
	len = size9pmsg(fd, lenbuf, BIT32SZ);
258
	if (len <= 0)
259
		return len;
260
	if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){
261
		werrstr("bad length in 9P2000 message header");
262
		return -1;
263
	}
264
	if ((mb = mballoc(len, nil, Mbeth1)) == nil)
265
		panic("readalloc9pmsg: mballoc failed");
266
	*mbp = mb;
267
	memmove(mb->data, lenbuf, BIT32SZ);
268
	len -= BIT32SZ;
269
	m = readn(fd, mb->data+BIT32SZ, len);
270
	if(m < len)
271
		return 0;
272
	return BIT32SZ+m;
273
}
274
 
275
static void
276
connection(void *v)
277
{
278
	int n;
279
	char buf[64];
280
	Chan *chan9p;
281
	Conn9p *conn9p = v;
282
	Msgbuf *mb;
283
	NetConnInfo *nci;
284
 
285
	incref(conn9p);			/* count connections */
286
	nci = getnetconninfo(conn9p->dir, conn9p->fd);
287
	if (nci == nil)
288
		panic("connection: getnetconninfo(%s, %d) failed",
289
			conn9p->dir, conn9p->fd);
290
	conn9p->raddr = nci->raddr;
291
 
292
	chan9p = getchan(conn9p);
293
	print("new connection on %s pid %d from %s\n",
294
		conn9p->dir, getpid(), conn9p->raddr);
295
 
296
	/*
297
	 * reading from a pipe or a network device
298
	 * will give an error after a few eof reads.
299
	 * however, we cannot tell the difference
300
	 * between a zero-length read and an interrupt
301
	 * on the processes writing to us,
302
	 * so we wait for the error.
303
	 */
304
	while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) {
305
		if(n == 0)
306
			continue;
307
		mb->param = (uintptr)conn9p;	/* has fd for replies */
308
		mb->chan = chan9p;
309
 
310
		assert(mb->magic == Mbmagic);
311
		incref(conn9p);			/* & count packets in flight */
312
		fs_send(serveq, mb);		/* to 9P server processes */
313
		/* mb will be freed by receiving process */
314
	}
315
 
316
	rerrstr(buf, sizeof buf);
317
 
318
	qlock(conn9p);
319
	print("connection hung up from %s\n", conn9p->dir);
320
	if (conn9p->fd > 0)		/* not poisoned yet? */
321
		hangupdfd(conn9p->fd);	/* poison the fd */
322
 
323
	nethangup(chan9p, "remote hung up", 1);
324
	closechan(chan9p->chan);
325
 
326
	conn9p->fd = -1;		/* poison conn9p */
327
	if (decref(conn9p) == 0) {	/* last conn.? turn the lights off */
328
		free(conn9p->dir);
329
		qunlock(conn9p);
330
		free(conn9p);
331
	} else
332
		qunlock(conn9p);
333
 
334
	freenetconninfo(nci);
335
 
336
	if(buf[0] == '\0' || strstr(buf, "hungup") != nil)
337
		exits("");
338
	sysfatal("mount read, pid %d", getpid());
339
}
340
 
341
static void
342
neti(void *v)
343
{
344
	int lisfd, accfd;
345
	Network *net;
346
	Conn9p *conn9p;
347
 
348
	net = v;
349
	print("net%di\n", net->ctlrno);
350
	for(;;) {
351
		lisfd = listen(net->anndir, net->lisdir);
352
		if (lisfd < 0) {
353
			print("listen %s failed: %r\n", net->anndir);
354
			continue;
355
		}
356
 
357
		/* got new call on lisfd */
358
		accfd = accept(lisfd, net->lisdir);
359
		if (accfd < 0) {
360
			print("accept %d (from %s) failed: %r\n",
361
				lisfd, net->lisdir);
362
			continue;
363
		}
364
 
365
		/* accepted that call */
366
		conn9p = malloc(sizeof *conn9p);
367
		conn9p->dir = strdup(net->lisdir);
368
		conn9p->fd = accfd;
369
		newproc(connection, conn9p, smprint("9P read %s", conn9p->dir));
370
		close(lisfd);
371
	}
372
}
373
 
374
/* only need one of these for all network connections, thus all interfaces */
375
static void
376
neto(void *)
377
{
378
	int len, datafd;
379
	Msgbuf *mb;
380
	Conn9p *conn9p;
381
 
382
	print("neto\n");
383
	for(;;) {
384
		/* receive 9P answer from 9P server processes */
385
		while((mb = fs_recv(netoq, 0)) == nil)
386
			continue;
387
 
388
		if(mb->data == nil) {
389
			print("neto: pkt nil cat=%d free=%d\n",
390
				mb->category, mb->flags&FREE);
391
			if(!(mb->flags & FREE))
392
				mbfree(mb);
393
			continue;
394
		}
395
 
396
		/* send answer back over the network connection in the reply */
397
		len = mb->count;
398
		conn9p = (Conn9p *)mb->param;
399
		assert(conn9p);
400
 
401
		qlock(conn9p);
402
		datafd = conn9p->fd;
403
		assert(len >= 0);
404
		/* datafd < 0 probably indicates poisoning by the read side */
405
		if (datafd < 0 || write(datafd, mb->data, len) != len) {
406
			print( "network write error (%r);");
407
			print(" closing connection for %s\n", conn9p->dir);
408
			nethangup(getchan(conn9p), "network write error", 1);
409
			if (datafd > 0)
410
				hangupdfd(datafd);	/* drop it */
411
			conn9p->fd = -1;		/* poison conn9p */
412
		}
413
		mbfree(mb);
414
		if (decref(conn9p) == 0)
415
			panic("neto: zero ref count");
416
		qunlock(conn9p);
417
	}
418
}
419
 
420
void
421
netstart(void)
422
{
423
	int netorun = 0;
424
	Network *net;
425
 
426
	if(netoq == nil)
427
		netoq = newqueue(Nqueue, "network reply");
428
	for(net = &netif[0]; net < &netif[Maxnets]; net++){
429
		if(net->dialstr == nil)
430
			continue;
431
		sprint(net->oname, "neto");
432
		if (netorun++ == 0)
433
			newproc(neto, nil, net->oname);
434
		sprint(net->iname, "net%di", net->ctlrno);
435
		newproc(neti, net, net->iname);
436
	}
437
}
438
 
439
void
440
netinit(void)
441
{
442
	Network *net;
443
 
444
	for (net = netif; net < netif + Maxnets; net++) {
445
		net->dialstr = annstrs[net - netif];
446
		if (net->dialstr == nil)
447
			continue;
448
		net->annfd = announce(net->dialstr, net->anndir);
449
		/* /bin/service/tcp564 may already have grabbed the port */
450
		if (net->annfd < 0)
451
			sysfatal("can't announce %s: %r", net->dialstr);
452
		print("netinit: announced on %s\n", net->dialstr);
453
	}
454
}