Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
#include <u.h>
2
#include <libc.h>
3
#include <thread.h>
4
#include "threadimpl.h"
5
 
6
/* Value to indicate the channel is closed */
7
enum {
8
	CHANCLOSD = 0xc105ed,
9
};
10
 
11
static char errcl[] = "channel was closed";
12
static Lock chanlock;		/* central channel access lock */
13
 
14
static void enqueue(Alt*, Channel**);
15
static void dequeue(Alt*);
16
static int canexec(Alt*);
17
static int altexec(Alt*, int);
18
 
19
#define Closed	((void*)CHANCLOSD)
20
#define Intred	((void*)~0)		/* interrupted */
21
 
22
static void
23
_chanfree(Channel *c)
24
{
25
	int i, inuse;
26
 
27
	if(c->closed == 1)			/* chanclose is ongoing */
28
		inuse = 1;
29
	else{
30
		inuse = 0;
31
		for(i = 0; i < c->nentry; i++)	/* alt ongoing */
32
			if(c->qentry[i])
33
				inuse = 1;
34
	}
35
	if(inuse)
36
		c->freed = 1;
37
	else{
38
		if(c->qentry)
39
			free(c->qentry);
40
		free(c);
41
	}
42
}
43
 
44
void
45
chanfree(Channel *c)
46
{
47
	lock(&chanlock);
48
	_chanfree(c);
49
	unlock(&chanlock);
50
}
51
 
52
int
53
chaninit(Channel *c, int elemsize, int elemcnt)
54
{
55
	if(elemcnt < 0 || elemsize <= 0 || c == nil)
56
		return -1;
57
	c->f = 0;
58
	c->n = 0;
59
	c->closed = 0;
60
	c->freed = 0;
61
	c->e = elemsize;
62
	c->s = elemcnt;
63
	_threaddebug(DBGCHAN, "chaninit %p", c);
64
	return 1;
65
}
66
 
67
Channel*
68
chancreate(int elemsize, int elemcnt)
69
{
70
	Channel *c;
71
 
72
	if(elemcnt < 0 || elemsize <= 0)
73
		return nil;
74
	c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
75
	c->e = elemsize;
76
	c->s = elemcnt;
77
	_threaddebug(DBGCHAN, "chancreate %p", c);
78
	return c;
79
}
80
 
81
static int
82
isopenfor(Channel *c, int op)
83
{
84
	return c->closed == 0 || (op == CHANRCV && c->n > 0);
85
}
86
 
87
int
88
alt(Alt *alts)
89
{
90
	Alt *a, *xa, *ca;
91
	Channel volatile *c;
92
	int n, s, waiting, allreadycl;
93
	void* r;
94
	Thread *t;
95
 
96
	/*
97
	 * The point of going splhi here is that note handlers
98
	 * might reasonably want to use channel operations,
99
	 * but that will hang if the note comes while we hold the
100
	 * chanlock.  Instead, we delay the note until we've dropped
101
	 * the lock.
102
	 */
103
	t = _threadgetproc()->thread;
104
	if(t->moribund || _threadexitsallstatus)
105
		yield();	/* won't return */
106
	s = _procsplhi();
107
	lock(&chanlock);
108
	t->alt = alts;
109
	t->chan = Chanalt;
110
 
111
	/* test whether any channels can proceed */
112
	n = 0;
113
	a = nil;
114
 
115
	for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
116
		xa->entryno = -1;
117
		if(xa->op == CHANNOP)
118
			continue;
119
 
120
		c = xa->c;
121
		if(c==nil){
122
			unlock(&chanlock);
123
			_procsplx(s);
124
			t->chan = Channone;
125
			return -1;
126
		}
127
 
128
		if(isopenfor(c, xa->op) && canexec(xa))
129
			if(nrand(++n) == 0)
130
				a = xa;
131
	}
132
 
133
 
134
	if(a==nil){
135
		/* nothing can proceed */
136
		if(xa->op == CHANNOBLK){
137
			unlock(&chanlock);
138
			_procsplx(s);
139
			t->chan = Channone;
140
			if(xa->op == CHANNOBLK)
141
				return xa - alts;
142
		}
143
 
144
		/* enqueue on all channels open for us. */
145
		c = nil;
146
		ca = nil;
147
		waiting = 0;
148
		allreadycl = 0;
149
		for(xa=alts; xa->op!=CHANEND; xa++)
150
			if(xa->op==CHANNOP)
151
				continue;
152
			else if(isopenfor(xa->c, xa->op)){
153
				waiting = 1;
154
				enqueue(xa, &c);
155
			} else if(xa->err != errcl)
156
				ca = xa;
157
			else
158
				allreadycl = 1;
159
 
160
		if(waiting == 0)
161
			if(ca != nil){
162
				/* everything was closed, select last channel */
163
				ca->err = errcl;
164
				unlock(&chanlock);
165
				_procsplx(s);
166
				t->chan = Channone;
167
				return ca - alts;
168
			} else if(allreadycl){
169
				/* everything was already closed */
170
				unlock(&chanlock);
171
				_procsplx(s);
172
				t->chan = Channone;
173
				return -1;
174
			}
175
		/*
176
		 * wait for successful rendezvous.
177
		 * we can't just give up if the rendezvous
178
		 * is interrupted -- someone else might come
179
		 * along and try to rendezvous with us, so
180
		 * we need to be here.
181
		 * if the channel was closed, the op is done
182
		 * and we flag an error for the entry.
183
		 */
184
	    Again:
185
		unlock(&chanlock);
186
		_procsplx(s);
187
		r = _threadrendezvous(&c, 0);
188
		s = _procsplhi();
189
		lock(&chanlock);
190
 
191
		if(r==Intred){		/* interrupted */
192
			if(c!=nil)	/* someone will meet us; go back */
193
				goto Again;
194
			c = (Channel*)~0;	/* so no one tries to meet us */
195
		}
196
 
197
		/* dequeue from channels, find selected one */
198
		a = nil;
199
		for(xa=alts; xa->op!=CHANEND; xa++){
200
			if(xa->op==CHANNOP)
201
				continue;
202
			if(xa->c == c){
203
				a = xa;
204
				a->err = nil;
205
				if(r == Closed)
206
					a->err = errcl;
207
			}
208
			dequeue(xa);
209
		}
210
		unlock(&chanlock);
211
		_procsplx(s);
212
		if(a == nil){	/* we were interrupted */
213
			assert(c==(Channel*)~0);
214
			return -1;
215
		}
216
	}else
217
		altexec(a, s);	/* unlocks chanlock, does splx */
218
	_sched();
219
	t->chan = Channone;
220
	return a - alts;
221
}
222
 
223
int
224
chanclose(Channel *c)
225
{
226
	Alt *a;
227
	int i, s;
228
 
229
	s = _procsplhi();	/* note handlers; see :/^alt */
230
	lock(&chanlock);
231
	if(c->closed){
232
		/* Already close; we fail but it's ok. don't print */
233
		unlock(&chanlock);
234
		_procsplx(s);
235
		return -1;
236
	}
237
	c->closed = 1;		/* Being closed */
238
	/*
239
	 * Locate entries that will fail due to close
240
	 * (send, and receive if nothing buffered) and wake them up.
241
	 * the situation cannot change because all queries
242
	 * should be committed by now and new ones will find the channel
243
	 * closed.  We still need to take the lock during the iteration
244
	 * because we can wake threads on qentrys we have not seen yet
245
	 * as in alt and there would be a race in the access to *a.
246
	 */
247
	for(i = 0; i < c->nentry; i++){
248
		if((a = c->qentry[i]) == nil || *a->tag != nil)
249
			continue;
250
 
251
		if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0))
252
			continue;
253
		*a->tag = c;
254
		unlock(&chanlock);
255
		_procsplx(s);
256
		while(_threadrendezvous(a->tag, Closed) == Intred)
257
			;
258
		s = _procsplhi();
259
		lock(&chanlock);
260
	}
261
 
262
	c->closed = 2;		/* Fully closed */
263
	if(c->freed)
264
		_chanfree(c);
265
	unlock(&chanlock);
266
	_procsplx(s);
267
	return 0;
268
}
269
 
270
int
271
chanclosing(Channel *c)
272
{
273
	int n, s;
274
 
275
	s = _procsplhi();	/* note handlers; see :/^alt */
276
	lock(&chanlock);
277
	if(c->closed == 0)
278
		n = -1;
279
	else
280
		n = c->n;
281
	unlock(&chanlock);
282
	_procsplx(s);
283
	return n;
284
}
285
 
286
/*
287
 * superseded by chanclosing
288
int
289
chanisclosed(Channel *c)
290
{
291
	return chanisclosing(c) >= 0;
292
}
293
 */
294
 
295
static int
296
runop(int op, Channel *c, void *v, int nb)
297
{
298
	int r;
299
	Alt a[2];
300
 
301
	/*
302
	 * we could do this without calling alt,
303
	 * but the only reason would be performance,
304
	 * and i'm not convinced it matters.
305
	 */
306
	a[0].op = op;
307
	a[0].c = c;
308
	a[0].v = v;
309
	a[0].err = nil;
310
	a[1].op = CHANEND;
311
	if(nb)
312
		a[1].op = CHANNOBLK;
313
	switch(r=alt(a)){
314
	case -1:	/* interrupted */
315
		return -1;
316
	case 1:	/* nonblocking, didn't accomplish anything */
317
		assert(nb);
318
		return 0;
319
	case 0:
320
		/*
321
		 * Okay, but return -1 if the op is done because of a close.
322
		 */
323
		if(a[0].err != nil)
324
			return -1;
325
		return 1;
326
	default:
327
		fprint(2, "ERROR: channel alt returned %d\n", r);
328
		abort();
329
		return -1;
330
	}
331
}
332
 
333
int
334
recv(Channel *c, void *v)
335
{
336
	return runop(CHANRCV, c, v, 0);
337
}
338
 
339
int
340
nbrecv(Channel *c, void *v)
341
{
342
	return runop(CHANRCV, c, v, 1);
343
}
344
 
345
int
346
send(Channel *c, void *v)
347
{
348
	return runop(CHANSND, c, v, 0);
349
}
350
 
351
int
352
nbsend(Channel *c, void *v)
353
{
354
	return runop(CHANSND, c, v, 1);
355
}
356
 
357
static void
358
channelsize(Channel *c, int sz)
359
{
360
	if(c->e != sz){
361
		fprint(2, "expected channel with elements of size %d, got size %d\n",
362
			sz, c->e);
363
		abort();
364
	}
365
}
366
 
367
int
368
sendul(Channel *c, ulong v)
369
{
370
	channelsize(c, sizeof(ulong));
371
	return send(c, &v);
372
}
373
 
374
ulong
375
recvul(Channel *c)
376
{
377
	ulong v;
378
 
379
	channelsize(c, sizeof(ulong));
380
	if(recv(c, &v) < 0)
381
		return ~0;
382
	return v;
383
}
384
 
385
int
386
sendp(Channel *c, void *v)
387
{
388
	channelsize(c, sizeof(void*));
389
	return send(c, &v);
390
}
391
 
392
void*
393
recvp(Channel *c)
394
{
395
	void *v;
396
 
397
	channelsize(c, sizeof(void*));
398
	if(recv(c, &v) < 0)
399
		return nil;
400
	return v;
401
}
402
 
403
int
404
nbsendul(Channel *c, ulong v)
405
{
406
	channelsize(c, sizeof(ulong));
407
	return nbsend(c, &v);
408
}
409
 
410
ulong
411
nbrecvul(Channel *c)
412
{
413
	ulong v;
414
 
415
	channelsize(c, sizeof(ulong));
416
	if(nbrecv(c, &v) == 0)
417
		return 0;
418
	return v;
419
}
420
 
421
int
422
nbsendp(Channel *c, void *v)
423
{
424
	channelsize(c, sizeof(void*));
425
	return nbsend(c, &v);
426
}
427
 
428
void*
429
nbrecvp(Channel *c)
430
{
431
	void *v;
432
 
433
	channelsize(c, sizeof(void*));
434
	if(nbrecv(c, &v) == 0)
435
		return nil;
436
	return v;
437
}
438
 
439
static int
440
emptyentry(Channel *c)
441
{
442
	int i, extra;
443
 
444
	assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
445
 
446
	for(i=0; i<c->nentry; i++)
447
		if(c->qentry[i]==nil)
448
			return i;
449
 
450
	extra = 16;
451
	c->nentry += extra;
452
	c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
453
	if(c->qentry == nil)
454
		sysfatal("realloc channel entries: %r");
455
	memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
456
	return i;
457
}
458
 
459
static void
460
enqueue(Alt *a, Channel **c)
461
{
462
	int i;
463
 
464
	_threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
465
	a->tag = c;
466
	i = emptyentry(a->c);
467
	a->c->qentry[i] = a;
468
}
469
 
470
static void
471
dequeue(Alt *a)
472
{
473
	int i;
474
	Channel *c;
475
 
476
	c = a->c;
477
	for(i=0; i<c->nentry; i++)
478
		if(c->qentry[i]==a){
479
			_threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
480
			c->qentry[i] = nil;
481
			/* release if freed and not closing */
482
			if(c->freed && c->closed != 1)
483
				_chanfree(c);
484
			return;
485
		}
486
}
487
 
488
static int
489
canexec(Alt *a)
490
{
491
	int i, otherop;
492
	Channel *c;
493
 
494
	c = a->c;
495
	/* are there senders or receivers blocked? */
496
	otherop = (CHANSND+CHANRCV) - a->op;
497
	for(i=0; i<c->nentry; i++)
498
		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
499
			_threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
500
			return 1;
501
		}
502
 
503
	/* is there room in the channel? */
504
	if((a->op==CHANSND && c->n < c->s)
505
	|| (a->op==CHANRCV && c->n > 0)){
506
		_threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
507
		return 1;
508
	}
509
 
510
	return 0;
511
}
512
 
513
static void*
514
altexecbuffered(Alt *a, int willreplace)
515
{
516
	uchar *v;
517
	Channel *c;
518
 
519
	c = a->c;
520
	/* use buffered channel queue */
521
	if(a->op==CHANRCV && c->n > 0){
522
		_threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
523
		v = c->v + c->e*(c->f%c->s);
524
		if(!willreplace)
525
			c->n--;
526
		c->f++;
527
		return v;
528
	}
529
	if(a->op==CHANSND && c->n < c->s){
530
		_threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
531
		v = c->v + c->e*((c->f+c->n)%c->s);
532
		if(!willreplace)
533
			c->n++;
534
		return v;
535
	}
536
	abort();
537
	return nil;
538
}
539
 
540
static void
541
altcopy(void *dst, void *src, int sz)
542
{
543
	if(dst){
544
		if(src)
545
			memmove(dst, src, sz);
546
		else
547
			memset(dst, 0, sz);
548
	}
549
}
550
 
551
static int
552
altexec(Alt *a, int spl)
553
{
554
	volatile Alt *b;
555
	int i, n, otherop;
556
	Channel *c;
557
	void *me, *waiter, *buf;
558
 
559
	c = a->c;
560
 
561
	/* rendezvous with others */
562
	otherop = (CHANSND+CHANRCV) - a->op;
563
	n = 0;
564
	b = nil;
565
	me = a->v;
566
	for(i=0; i<c->nentry; i++)
567
		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
568
			if(nrand(++n) == 0)
569
				b = c->qentry[i];
570
	if(b != nil){
571
		_threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
572
		waiter = b->v;
573
		if(c->s && c->n){
574
			/*
575
			 * if buffer is full and there are waiters
576
			 * and we're meeting a waiter,
577
			 * we must be receiving.
578
			 *
579
			 * we use the value in the channel buffer,
580
			 * copy the waiter's value into the channel buffer
581
			 * on behalf of the waiter, and then wake the waiter.
582
			 */
583
			if(a->op!=CHANRCV)
584
				abort();
585
			buf = altexecbuffered(a, 1);
586
			altcopy(me, buf, c->e);
587
			altcopy(buf, waiter, c->e);
588
		}else{
589
			if(a->op==CHANRCV)
590
				altcopy(me, waiter, c->e);
591
			else
592
				altcopy(waiter, me, c->e);
593
		}
594
		*b->tag = c;	/* commits us to rendezvous */
595
		_threaddebug(DBGCHAN, "unlocking the chanlock");
596
		unlock(&chanlock);
597
		_procsplx(spl);
598
		_threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
599
		while(_threadrendezvous(b->tag, 0) == Intred)
600
			;
601
		return 1;
602
	}
603
 
604
	buf = altexecbuffered(a, 0);
605
	if(a->op==CHANRCV)
606
		altcopy(me, buf, c->e);
607
	else
608
		altcopy(buf, me, c->e);
609
 
610
	unlock(&chanlock);
611
	_procsplx(spl);
612
	return 1;
613
}