Subversion Repositories planix.SVN

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
#include <u.h>
2
#include <libc.h>
3
#include <fcall.h>
4
#include <thread.h>
5
#include <9p.h>
6
#include "dat.h"
7
 
8
int nclient;
9
Client **client;
10
#define Zmsg ((Msg*)~0)
11
char nocmd[] = "";
12
 
13
static void readthread(void*);
14
static void writethread(void*);
15
static void kickwriter(Client*);
16
 
17
int
18
newclient(void)
19
{
20
	int i;
21
	Client *c;
22
 
23
	for(i=0; i<nclient; i++)
24
		if(client[i]->ref==0 && !client[i]->moribund)
25
			return i;
26
 
27
	c = emalloc(sizeof(Client));
28
	c->writerkick = chancreate(sizeof(void*), 1);
29
	c->execpid = chancreate(sizeof(ulong), 0);
30
	c->cmd = nocmd;
31
 
32
	c->readerproc = ioproc();
33
	c->writerproc = ioproc();
34
	c->num = nclient;
35
	if(nclient%16 == 0)
36
		client = erealloc(client, (nclient+16)*sizeof(client[0]));
37
	client[nclient++] = c;
38
	return nclient-1;
39
}
40
 
41
void
42
die(Client *c)
43
{
44
	Msg *m, *next;
45
	Req *r, *rnext;
46
 
47
	c->moribund = 1;
48
	kickwriter(c);
49
	iointerrupt(c->readerproc);
50
	iointerrupt(c->writerproc);
51
	if(--c->activethread == 0){
52
		if(c->cmd != nocmd){
53
			free(c->cmd);
54
			c->cmd = nocmd;
55
		}
56
		c->pid = 0;
57
		c->moribund = 0;
58
		c->status = Closed;
59
		for(m=c->mq; m && m != Zmsg; m=next){
60
			next = m->link;
61
			free(m);
62
		}
63
		c->mq = nil;
64
		if(c->rq != nil){
65
			for(r=c->rq; r; r=rnext){
66
				rnext = r->aux;
67
				respond(r, "hangup");
68
			}
69
			c->rq = nil;
70
		}
71
		if(c->wq != nil){
72
			for(r=c->wq; r; r=rnext){
73
				rnext = r->aux;
74
				respond(r, "hangup");
75
			}
76
			c->wq = nil;
77
		}
78
		c->rq = nil;
79
		c->wq = nil;
80
		c->emq = nil;
81
		c->erq = nil;
82
		c->ewq = nil;
83
	}
84
}
85
 
86
void
87
closeclient(Client *c)
88
{
89
	if(--c->ref == 0){
90
		if(c->pid > 0)
91
			postnote(PNPROC, c->pid, "kill");
92
		c->status = Hangup;
93
		close(c->fd[0]);
94
		c->fd[0] = c->fd[1] = -1;
95
		c->moribund = 1;
96
		kickwriter(c);
97
		iointerrupt(c->readerproc);
98
		iointerrupt(c->writerproc);		
99
		c->activethread++;
100
		die(c);
101
	}
102
}
103
 
104
void
105
queuerdreq(Client *c, Req *r)
106
{
107
	if(c->rq==nil)
108
		c->erq = &c->rq;
109
	*c->erq = r;
110
	r->aux = nil;
111
	c->erq = (Req**)&r->aux;
112
}
113
 
114
void
115
queuewrreq(Client *c, Req *r)
116
{
117
	if(c->wq==nil)
118
		c->ewq = &c->wq;
119
	*c->ewq = r;
120
	r->aux = nil;
121
	c->ewq = (Req**)&r->aux;
122
}
123
 
124
void
125
queuemsg(Client *c, Msg *m)
126
{
127
	if(c->mq==nil)
128
		c->emq = &c->mq;
129
	*c->emq = m;
130
	if(m != Zmsg){
131
		m->link = nil;
132
		c->emq = (Msg**)&m->link;
133
	}else
134
		c->emq = nil;
135
}
136
 
137
void
138
matchmsgs(Client *c)
139
{
140
	Req *r;
141
	Msg *m;
142
	int n, rm;
143
 
144
	while(c->rq && c->mq){
145
		r = c->rq;
146
		c->rq = r->aux;
147
 
148
		rm = 0;
149
		m = c->mq;
150
		if(m == Zmsg){
151
			respond(r, "execnet: no more data");
152
			break;
153
		}
154
		n = r->ifcall.count;
155
		if(n >= m->ep - m->rp){
156
			n = m->ep - m->rp;
157
			c->mq = m->link;
158
			rm = 1;
159
		}
160
		if(n)
161
			memmove(r->ofcall.data, m->rp, n);
162
		if(rm)
163
			free(m);
164
		else
165
			m->rp += n;
166
		r->ofcall.count = n;
167
		respond(r, nil);
168
	}
169
}
170
 
171
void
172
findrdreq(Client *c, Req *r)
173
{
174
	Req **l;
175
 
176
	for(l=&c->rq; *l; l=(Req**)&(*l)->aux){
177
		if(*l == r){
178
			*l = r->aux;
179
			if(*l == nil)
180
				c->erq = l;
181
			respond(r, "flushed");
182
			break;
183
		}
184
	}
185
}
186
 
187
void
188
findwrreq(Client *c, Req *r)
189
{
190
	Req **l;
191
 
192
	for(l=&c->wq; *l; l=(Req**)&(*l)->aux){
193
		if(*l == r){
194
			*l = r->aux;
195
			if(*l == nil)
196
				c->ewq = l;
197
			respond(r, "flushed");
198
			return;
199
		}
200
	}
201
}
202
 
203
void
204
dataread(Req *r, Client *c)
205
{
206
	queuerdreq(c, r);
207
	matchmsgs(c);
208
}
209
 
210
static void
211
readthread(void *a)
212
{
213
	uchar *buf;
214
	int n;
215
	Client *c;
216
	Ioproc *io;
217
	Msg *m;
218
	char tmp[32];
219
 
220
	c = a;
221
	snprint(tmp, sizeof tmp, "read%d", c->num);
222
	threadsetname(tmp);
223
 
224
	buf = emalloc(8192);
225
	io = c->readerproc;
226
	while((n = ioread(io, c->fd[0], buf, 8192)) >= 0){
227
		m = emalloc(sizeof(Msg)+n);
228
		m->rp = (uchar*)&m[1];
229
		m->ep = m->rp + n;
230
		if(n)
231
			memmove(m->rp, buf, n);
232
		queuemsg(c, m);
233
		matchmsgs(c);
234
	}
235
	queuemsg(c, Zmsg);
236
	free(buf);
237
	die(c);
238
}
239
 
240
static void
241
kickwriter(Client *c)
242
{
243
	nbsendp(c->writerkick, nil);
244
}
245
 
246
void
247
clientflush(Req *or, Client *c)
248
{
249
	if(or->ifcall.type == Tread)
250
		findrdreq(c, or);
251
	else{
252
		if(c->execreq == or){
253
			c->execreq = nil;
254
			iointerrupt(c->writerproc);
255
		}
256
		findwrreq(c, or);
257
		if(c->curw == or){
258
			c->curw = nil;
259
			iointerrupt(c->writerproc);
260
			kickwriter(c);
261
		}
262
	}
263
}
264
 
265
void
266
datawrite(Req *r, Client *c)
267
{
268
	queuewrreq(c, r);
269
	kickwriter(c);
270
}
271
 
272
static void
273
writethread(void *a)
274
{
275
	char e[ERRMAX];
276
	uchar *buf;
277
	int n;
278
	Ioproc *io;
279
	Req *r;
280
	Client *c;
281
	char tmp[32];
282
 
283
	c = a;
284
	snprint(tmp, sizeof tmp, "write%d", c->num);
285
	threadsetname(tmp);
286
 
287
	buf = emalloc(8192);
288
	io = c->writerproc;
289
	for(;;){
290
		while(c->wq == nil){
291
			if(c->moribund)
292
				goto Out;
293
			recvp(c->writerkick);
294
			if(c->moribund)
295
				goto Out;
296
		}
297
		r = c->wq;
298
		c->wq = r->aux;
299
		c->curw = r;
300
		n = iowrite(io, c->fd[1], r->ifcall.data, r->ifcall.count);
301
		if(chatty9p)
302
			fprint(2, "io->write returns %d\n", n);
303
		if(n >= 0){
304
			r->ofcall.count = n;
305
			respond(r, nil);
306
		}else{
307
			rerrstr(e, sizeof e);
308
			respond(r, e);
309
		}
310
	}
311
Out:
312
	free(buf);
313
	die(c);
314
}
315
 
316
static void
317
execproc(void *a)
318
{
319
	int i, fd;
320
	Client *c;
321
	char tmp[32];
322
 
323
	c = a;
324
	snprint(tmp, sizeof tmp, "execproc%d", c->num);
325
	threadsetname(tmp);
326
	if(pipe(c->fd) < 0){
327
		rerrstr(c->err, sizeof c->err);
328
		sendul(c->execpid, -1);
329
		return;
330
	}
331
	rfork(RFFDG);
332
	fd = c->fd[1];
333
	close(c->fd[0]);
334
	dup(fd, 0);
335
	dup(fd, 1);
336
	for(i=3; i<100; i++)	/* should do better */
337
		close(i);
338
	strcpy(c->err, "exec failed");
339
	procexecl(c->execpid, "/bin/rc", "rc", "-c", c->cmd, nil);
340
}
341
 
342
static void
343
execthread(void *a)
344
{
345
	Client *c;
346
	int p;
347
	char tmp[32];
348
 
349
	c = a;
350
	snprint(tmp, sizeof tmp, "exec%d", c->num);
351
	threadsetname(tmp);
352
	c->execpid = chancreate(sizeof(ulong), 0);
353
	proccreate(execproc, c, STACK);
354
	p = recvul(c->execpid);
355
	chanfree(c->execpid);
356
	c->execpid = nil;
357
	close(c->fd[1]);
358
	c->fd[1] = c->fd[0];
359
	if(p != -1){
360
		c->pid = p;
361
		c->activethread = 2;
362
		threadcreate(readthread, c, STACK);
363
		threadcreate(writethread, c, STACK);
364
		if(c->execreq)
365
			respond(c->execreq, nil);
366
	}else{
367
		if(c->execreq)
368
			respond(c->execreq, c->err);
369
	}
370
}
371
 
372
void
373
ctlwrite(Req *r, Client *c)
374
{
375
	char *f[3], *s, *p;
376
	int nf;
377
 
378
	s = emalloc(r->ifcall.count+1);
379
	memmove(s, r->ifcall.data, r->ifcall.count);
380
	s[r->ifcall.count] = '\0';
381
 
382
	f[0] = s;
383
	p = strchr(s, ' ');
384
	if(p == nil)
385
		nf = 1;
386
	else{
387
		*p++ = '\0';
388
		f[1] = p;
389
		nf = 2;
390
	}
391
 
392
	if(f[0][0] == '\0'){
393
		free(s);
394
		respond(r, nil);
395
		return;
396
	}
397
 
398
	r->ofcall.count = r->ifcall.count;
399
	if(strcmp(f[0], "hangup") == 0){
400
		if(c->pid == 0){
401
			respond(r, "connection already hung up");
402
			goto Out;
403
		}
404
		postnote(PNPROC, c->pid, "kill");
405
		respond(r, nil);
406
		goto Out;
407
	}
408
 
409
	if(strcmp(f[0], "connect") == 0){
410
		if(c->cmd != nocmd){
411
			respond(r, "already have connection");
412
			goto Out;
413
		}
414
		if(nf == 1){
415
			respond(r, "need argument to connect");
416
			goto Out;
417
		}
418
		c->status = Exec;
419
		if(p = strrchr(f[1], '!'))
420
			*p = '\0';
421
		c->cmd = emalloc(4+1+strlen(f[1])+1);
422
		strcpy(c->cmd, "exec ");
423
		strcat(c->cmd, f[1]);
424
		c->execreq = r;
425
		threadcreate(execthread, c, STACK);
426
		goto Out;
427
	}
428
 
429
	respond(r, "bad or inappropriate control message");
430
Out:
431
	free(s);
432
}