Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
#include	"u.h"
2
#include	"../port/lib.h"
3
#include	"mem.h"
4
#include	"dat.h"
5
#include	"fns.h"
6
#include	"../port/error.h"
7
 
8
#include	"netif.h"
9
 
10
typedef struct Pipe	Pipe;
11
struct Pipe
12
{
13
	QLock;
14
	Pipe	*next;
15
	int	ref;
16
	ulong	path;
17
	long	perm;
18
	Queue	*q[2];
19
	int	qref[2];
20
};
21
 
22
struct
23
{
24
	Lock;
25
	ulong	path;
26
} pipealloc;
27
 
28
/*
 *  File types within a pipe directory.  The values double as
 *  indices into pipedir[].
 */
enum
{
	Qdir	= 0,	/* the directory itself */
	Qdata0	= 1,	/* "data" */
	Qdata1	= 2,	/* "data1" */
};
34
 
35
Dirtab pipedir[] =
36
{
37
	".",		{Qdir,0,QTDIR},	0,		DMDIR|0500,
38
	"data",		{Qdata0},	0,		0600,
39
	"data1",	{Qdata1},	0,		0600,
40
};
41
#define NPIPEDIR 3
42
 
43
static void
44
pipeinit(void)
45
{
46
	if(conf.pipeqsize == 0){
47
		if(conf.nmach > 1)
48
			conf.pipeqsize = 256*1024;
49
		else
50
			conf.pipeqsize = 32*1024;
51
	}
52
}
53
 
54
/*
55
 *  create a pipe, no streams are created until an open
56
 */
57
static Chan*
58
pipeattach(char *spec)
59
{
60
	Pipe *p;
61
	Chan *c;
62
 
63
	c = devattach('|', spec);
64
	p = malloc(sizeof(Pipe));
65
	if(p == 0)
66
		exhausted("memory");
67
	p->ref = 1;
68
 
69
	p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
70
	if(p->q[0] == 0){
71
		free(p);
72
		exhausted("memory");
73
	}
74
	p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
75
	if(p->q[1] == 0){
76
		free(p->q[0]);
77
		free(p);
78
		exhausted("memory");
79
	}
80
 
81
	lock(&pipealloc);
82
	p->path = ++pipealloc.path;
83
	unlock(&pipealloc);
84
	p->perm = pipedir[Qdata0].perm;
85
 
86
	mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
87
	c->aux = p;
88
	c->dev = 0;
89
	return c;
90
}
91
 
92
static int
93
pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
94
{
95
	Qid q;
96
	int len;
97
	Pipe *p;
98
 
99
	if(i == DEVDOTDOT){
100
		devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
101
		return 1;
102
	}
103
	i++;	/* skip . */
104
	if(tab==0 || i>=ntab)
105
		return -1;
106
 
107
	tab += i;
108
	p = c->aux;
109
	switch((ulong)tab->qid.path){
110
	case Qdata0:
111
		len = qlen(p->q[0]);
112
		break;
113
	case Qdata1:
114
		len = qlen(p->q[1]);
115
		break;
116
	default:
117
		len = tab->length;
118
		break;
119
	}
120
	mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
121
	devdir(c, q, tab->name, len, eve, p->perm, dp);
122
	return 1;
123
}
124
 
125
 
126
static Walkqid*
127
pipewalk(Chan *c, Chan *nc, char **name, int nname)
128
{
129
	Walkqid *wq;
130
	Pipe *p;
131
 
132
	wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
133
	if(wq != nil && wq->clone != nil && wq->clone != c){
134
		p = c->aux;
135
		qlock(p);
136
		p->ref++;
137
		if(c->flag & COPEN){
138
			print("channel open in pipewalk\n");
139
			switch(NETTYPE(c->qid.path)){
140
			case Qdata0:
141
				p->qref[0]++;
142
				break;
143
			case Qdata1:
144
				p->qref[1]++;
145
				break;
146
			}
147
		}
148
		qunlock(p);
149
	}
150
	return wq;
151
}
152
 
153
static int
154
pipestat(Chan *c, uchar *db, int n)
155
{
156
	Pipe *p;
157
	Dir dir;
158
 
159
	p = c->aux;
160
 
161
	switch(NETTYPE(c->qid.path)){
162
	case Qdir:
163
		devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
164
		break;
165
	case Qdata0:
166
		devdir(c, c->qid, "data", qlen(p->q[0]), eve, p->perm, &dir);
167
		break;
168
	case Qdata1:
169
		devdir(c, c->qid, "data1", qlen(p->q[1]), eve, p->perm, &dir);
170
		break;
171
	default:
172
		panic("pipestat");
173
	}
174
	n = convD2M(&dir, db, n);
175
	if(n < BIT16SZ)
176
		error(Eshortstat);
177
	return n;
178
}
179
 
180
static int
181
pipewstat(Chan* c, uchar* db, int n)
182
{
183
	int m;
184
	Dir *dir;
185
	Pipe *p;
186
 
187
	p = c->aux;
188
	if(strcmp(up->user, eve) != 0)
189
		error(Eperm);
190
	if(NETTYPE(c->qid.path) == Qdir)
191
		error(Eisdir);
192
 
193
	dir = smalloc(sizeof(Dir)+n);
194
	if(waserror()){
195
		free(dir);
196
		nexterror();
197
	}
198
	m = convM2D(db, n, &dir[0], (char*)&dir[1]);
199
	if(m == 0)
200
		error(Eshortstat);
201
	if(!emptystr(dir[0].uid))
202
		error("can't change owner");
203
	if(dir[0].mode != ~0UL)
204
		p->perm = dir[0].mode;
205
	poperror();
206
	free(dir);
207
	return m;
208
}
209
 
210
/*
211
 *  if the stream doesn't exist, create it
212
 */
213
static Chan*
214
pipeopen(Chan *c, int omode)
215
{
216
	Pipe *p;
217
 
218
	if(c->qid.type & QTDIR){
219
		if(omode != OREAD)
220
			error(Ebadarg);
221
		c->mode = omode;
222
		c->flag |= COPEN;
223
		c->offset = 0;
224
		return c;
225
	}
226
 
227
	p = c->aux;
228
	qlock(p);
229
	switch(NETTYPE(c->qid.path)){
230
	case Qdata0:
231
		p->qref[0]++;
232
		break;
233
	case Qdata1:
234
		p->qref[1]++;
235
		break;
236
	}
237
	qunlock(p);
238
 
239
	c->mode = openmode(omode);
240
	c->flag |= COPEN;
241
	c->offset = 0;
242
	c->iounit = qiomaxatomic;
243
	return c;
244
}
245
 
246
static void
247
pipeclose(Chan *c)
248
{
249
	Pipe *p;
250
 
251
	p = c->aux;
252
	qlock(p);
253
 
254
	if(c->flag & COPEN){
255
		/*
256
		 *  closing either side hangs up the stream
257
		 */
258
		switch(NETTYPE(c->qid.path)){
259
		case Qdata0:
260
			p->qref[0]--;
261
			if(p->qref[0] == 0){
262
				qhangup(p->q[1], 0);
263
				qclose(p->q[0]);
264
			}
265
			break;
266
		case Qdata1:
267
			p->qref[1]--;
268
			if(p->qref[1] == 0){
269
				qhangup(p->q[0], 0);
270
				qclose(p->q[1]);
271
			}
272
			break;
273
		}
274
	}
275
 
276
 
277
	/*
278
	 *  if both sides are closed, they are reusable
279
	 */
280
	if(p->qref[0] == 0 && p->qref[1] == 0){
281
		qreopen(p->q[0]);
282
		qreopen(p->q[1]);
283
	}
284
 
285
	/*
286
	 *  free the structure on last close
287
	 */
288
	p->ref--;
289
	if(p->ref == 0){
290
		qunlock(p);
291
		free(p->q[0]);
292
		free(p->q[1]);
293
		free(p);
294
	} else
295
		qunlock(p);
296
}
297
 
298
static long
299
piperead(Chan *c, void *va, long n, vlong)
300
{
301
	Pipe *p;
302
 
303
	p = c->aux;
304
 
305
	switch(NETTYPE(c->qid.path)){
306
	case Qdir:
307
		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
308
	case Qdata0:
309
		return qread(p->q[0], va, n);
310
	case Qdata1:
311
		return qread(p->q[1], va, n);
312
	default:
313
		panic("piperead");
314
	}
315
	return -1;	/* not reached */
316
}
317
 
318
static Block*
319
pipebread(Chan *c, long n, ulong offset)
320
{
321
	Pipe *p;
322
 
323
	p = c->aux;
324
 
325
	switch(NETTYPE(c->qid.path)){
326
	case Qdata0:
327
		return qbread(p->q[0], n);
328
	case Qdata1:
329
		return qbread(p->q[1], n);
330
	}
331
 
332
	return devbread(c, n, offset);
333
}
334
 
335
/*
336
 *  a write to a closed pipe causes a note to be sent to
337
 *  the process.
338
 */
339
static long
340
pipewrite(Chan *c, void *va, long n, vlong)
341
{
342
	Pipe *p;
343
 
344
	if(!islo())
345
		print("pipewrite hi %#p\n", getcallerpc(&c));
346
	if(waserror()) {
347
		/* avoid notes when pipe is a mounted queue */
348
		if((c->flag & CMSG) == 0)
349
			postnote(up, 1, "sys: write on closed pipe", NUser);
350
		nexterror();
351
	}
352
 
353
	p = c->aux;
354
 
355
	switch(NETTYPE(c->qid.path)){
356
	case Qdata0:
357
		n = qwrite(p->q[1], va, n);
358
		break;
359
 
360
	case Qdata1:
361
		n = qwrite(p->q[0], va, n);
362
		break;
363
 
364
	default:
365
		panic("pipewrite");
366
	}
367
 
368
	poperror();
369
	return n;
370
}
371
 
372
static long
373
pipebwrite(Chan *c, Block *bp, ulong)
374
{
375
	long n;
376
	Pipe *p;
377
 
378
	if(waserror()) {
379
		/* avoid notes when pipe is a mounted queue */
380
		if((c->flag & CMSG) == 0)
381
			postnote(up, 1, "sys: write on closed pipe", NUser);
382
		nexterror();
383
	}
384
 
385
	p = c->aux;
386
	switch(NETTYPE(c->qid.path)){
387
	case Qdata0:
388
		n = qbwrite(p->q[1], bp);
389
		break;
390
 
391
	case Qdata1:
392
		n = qbwrite(p->q[0], bp);
393
		break;
394
 
395
	default:
396
		n = 0;
397
		panic("pipebwrite");
398
	}
399
 
400
	poperror();
401
	return n;
402
}
403
 
404
Dev pipedevtab = {
405
	'|',
406
	"pipe",
407
 
408
	devreset,
409
	pipeinit,
410
	devshutdown,
411
	pipeattach,
412
	pipewalk,
413
	pipestat,
414
	pipeopen,
415
	devcreate,
416
	pipeclose,
417
	piperead,
418
	pipebread,
419
	pipewrite,
420
	pipebwrite,
421
	devremove,
422
	pipewstat,
423
};