Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
#include	"u.h"
2
#include	"lib.h"
3
#include	"dat.h"
4
#include	"fns.h"
5
#include	"error.h"
6
 
7
/*
 *  event counters printed by ixsummary()
 */
static ulong	padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt;
static ulong	consumecnt, producecnt, qcopycnt;

/* flipped on every call to ixsummary() */
static int	debugging;

/* prefix for consistency checks; the guarded statement never runs while this is if(0) */
#define QDEBUG	if(0)
18
 
19
/*
20
 *  IO queues
21
 */
22
struct Queue
23
{
24
	Lock lk;
25
 
26
	Block*	bfirst;		/* buffer */
27
	Block*	blast;
28
 
29
	int	len;		/* bytes allocated to queue */
30
	int	dlen;		/* data bytes in queue */
31
	int	limit;		/* max bytes in queue */
32
	int	inilim;		/* initial limit */
33
	int	state;
34
	int	noblock;	/* true if writes return immediately when q full */
35
	int	eof;		/* number of eofs read by user */
36
 
37
	void	(*kick)(void*);	/* restart output */
38
	void	(*bypass)(void*, Block*);	/* bypass queue altogether */
39
	void*	arg;		/* argument to kick */
40
 
41
	QLock	rlock;		/* mutex for reading processes */
42
	Rendez	rr;		/* process waiting to read */
43
	QLock	wlock;		/* mutex for writing processes */
44
	Rendez	wr;		/* process waiting to write */
45
 
46
	char	err[ERRMAX];
47
};
48
 
49
enum
50
{
51
	Maxatomic	= 64*1024,
52
};
53
 
54
uint	qiomaxatomic = Maxatomic;
55
 
56
void
57
ixsummary(void)
58
{
59
	debugging ^= 1;
60
	iallocsummary();
61
	print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
62
		padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
63
	print("consume %lud, produce %lud, qcopy %lud\n",
64
		consumecnt, producecnt, qcopycnt);
65
}
66
 
67
/*
68
 *  free a list of blocks
69
 */
70
void
71
freeblist(Block *b)
72
{
73
	Block *next;
74
 
75
	for(; b != 0; b = next){
76
		next = b->next;
77
		b->next = 0;
78
		freeb(b);
79
	}
80
}
81
 
82
/*
83
 *  pad a block to the front (or the back if size is negative)
84
 */
85
Block*
86
padblock(Block *bp, int size)
87
{
88
	int n;
89
	Block *nbp;
90
 
91
	QDEBUG checkb(bp, "padblock 1");
92
	if(size >= 0){
93
		if(bp->rp - bp->base >= size){
94
			bp->rp -= size;
95
			return bp;
96
		}
97
 
98
		if(bp->next)
99
			panic("padblock 0x%p", getcallerpc(&bp));
100
		n = BLEN(bp);
101
		padblockcnt++;
102
		nbp = allocb(size+n);
103
		nbp->rp += size;
104
		nbp->wp = nbp->rp;
105
		memmove(nbp->wp, bp->rp, n);
106
		nbp->wp += n;
107
		freeb(bp);
108
		nbp->rp -= size;
109
	} else {
110
		size = -size;
111
 
112
		if(bp->next)
113
			panic("padblock 0x%p", getcallerpc(&bp));
114
 
115
		if(bp->lim - bp->wp >= size)
116
			return bp;
117
 
118
		n = BLEN(bp);
119
		padblockcnt++;
120
		nbp = allocb(size+n);
121
		memmove(nbp->wp, bp->rp, n);
122
		nbp->wp += n;
123
		freeb(bp);
124
	}
125
	QDEBUG checkb(nbp, "padblock 1");
126
	return nbp;
127
}
128
 
129
/*
130
 *  return count of bytes in a string of blocks
131
 */
132
int
133
blocklen(Block *bp)
134
{
135
	int len;
136
 
137
	len = 0;
138
	while(bp) {
139
		len += BLEN(bp);
140
		bp = bp->next;
141
	}
142
	return len;
143
}
144
 
145
/*
146
 * return count of space in blocks
147
 */
148
int
149
blockalloclen(Block *bp)
150
{
151
	int len;
152
 
153
	len = 0;
154
	while(bp) {
155
		len += BALLOC(bp);
156
		bp = bp->next;
157
	}
158
	return len;
159
}
160
 
161
/*
162
 *  copy the  string of blocks into
163
 *  a single block and free the string
164
 */
165
Block*
166
concatblock(Block *bp)
167
{
168
	int len;
169
	Block *nb, *f;
170
 
171
	if(bp->next == 0)
172
		return bp;
173
 
174
	nb = allocb(blocklen(bp));
175
	for(f = bp; f; f = f->next) {
176
		len = BLEN(f);
177
		memmove(nb->wp, f->rp, len);
178
		nb->wp += len;
179
	}
180
	concatblockcnt += BLEN(nb);
181
	freeblist(bp);
182
	QDEBUG checkb(nb, "concatblock 1");
183
	return nb;
184
}
185
 
186
/*
187
 *  make sure the first block has at least n bytes
188
 */
189
Block*
190
pullupblock(Block *bp, int n)
191
{
192
	int i;
193
	Block *nbp;
194
 
195
	/*
196
	 *  this should almost always be true, it's
197
	 *  just to avoid every caller checking.
198
	 */
199
	if(BLEN(bp) >= n)
200
		return bp;
201
 
202
	/*
203
	 *  if not enough room in the first block,
204
	 *  add another to the front of the list.
205
	 */
206
	if(bp->lim - bp->rp < n){
207
		nbp = allocb(n);
208
		nbp->next = bp;
209
		bp = nbp;
210
	}
211
 
212
	/*
213
	 *  copy bytes from the trailing blocks into the first
214
	 */
215
	n -= BLEN(bp);
216
	while((nbp = bp->next)){
217
		i = BLEN(nbp);
218
		if(i > n) {
219
			memmove(bp->wp, nbp->rp, n);
220
			pullupblockcnt++;
221
			bp->wp += n;
222
			nbp->rp += n;
223
			QDEBUG checkb(bp, "pullupblock 1");
224
			return bp;
225
		} else {
226
			/* shouldn't happen but why crash if it does */
227
			if(i < 0){
228
				print("pullup negative length packet\n");
229
				i = 0;
230
			}
231
			memmove(bp->wp, nbp->rp, i);
232
			pullupblockcnt++;
233
			bp->wp += i;
234
			bp->next = nbp->next;
235
			nbp->next = 0;
236
			freeb(nbp);
237
			n -= i;
238
			if(n == 0){
239
				QDEBUG checkb(bp, "pullupblock 2");
240
				return bp;
241
			}
242
		}
243
	}
244
	freeb(bp);
245
	return 0;
246
}
247
 
248
/*
249
 *  make sure the first block has at least n bytes
250
 */
251
Block*
252
pullupqueue(Queue *q, int n)
253
{
254
	Block *b;
255
 
256
	if(BLEN(q->bfirst) >= n)
257
		return q->bfirst;
258
	q->bfirst = pullupblock(q->bfirst, n);
259
	for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
260
		;
261
	q->blast = b;
262
	return q->bfirst;
263
}
264
 
265
/*
266
 *  trim to len bytes starting at offset
267
 */
268
Block *
269
trimblock(Block *bp, int offset, int len)
270
{
271
	ulong l;
272
	Block *nb, *startb;
273
 
274
	QDEBUG checkb(bp, "trimblock 1");
275
	if(blocklen(bp) < offset+len) {
276
		freeblist(bp);
277
		return nil;
278
	}
279
 
280
	while((l = BLEN(bp)) < offset) {
281
		offset -= l;
282
		nb = bp->next;
283
		bp->next = nil;
284
		freeb(bp);
285
		bp = nb;
286
	}
287
 
288
	startb = bp;
289
	bp->rp += offset;
290
 
291
	while((l = BLEN(bp)) < len) {
292
		len -= l;
293
		bp = bp->next;
294
	}
295
 
296
	bp->wp -= (BLEN(bp) - len);
297
 
298
	if(bp->next) {
299
		freeblist(bp->next);
300
		bp->next = nil;
301
	}
302
 
303
	return startb;
304
}
305
 
306
/*
307
 *  copy 'count' bytes into a new block
308
 */
309
Block*
310
copyblock(Block *bp, int count)
311
{
312
	int l;
313
	Block *nbp;
314
 
315
	QDEBUG checkb(bp, "copyblock 0");
316
	nbp = allocb(count);
317
	for(; count > 0 && bp != 0; bp = bp->next){
318
		l = BLEN(bp);
319
		if(l > count)
320
			l = count;
321
		memmove(nbp->wp, bp->rp, l);
322
		nbp->wp += l;
323
		count -= l;
324
	}
325
	if(count > 0){
326
		memset(nbp->wp, 0, count);
327
		nbp->wp += count;
328
	}
329
	copyblockcnt++;
330
	QDEBUG checkb(nbp, "copyblock 1");
331
 
332
	return nbp;
333
}
334
 
335
/*
 *  make the data in bp exactly len bytes long, truncating
 *  or zero extending as needed.  if len does not fit in the
 *  block, the data is moved to a new block with copyblock()
 *  and the old list is freed.  a negative len frees bp and
 *  returns nil.
 */
Block*
adjustblock(Block* bp, int len)
{
	int have;
	Block *grown;

	if(len < 0){
		freeb(bp);
		return nil;
	}

	if(bp->rp+len <= bp->lim){
		/* fits in place: zero any new bytes and set the end */
		have = BLEN(bp);
		if(len > have)
			memset(bp->wp, 0, len-have);
		bp->wp = bp->rp+len;
		QDEBUG checkb(bp, "adjustblock 2");

		return bp;
	}

	grown = copyblock(bp, len);
	freeblist(bp);
	QDEBUG checkb(grown, "adjustblock 1");

	return grown;
}
362
 
363
 
364
/*
365
 *  throw away up to count bytes from a
366
 *  list of blocks.  Return count of bytes
367
 *  thrown away.
368
 */
369
int
370
pullblock(Block **bph, int count)
371
{
372
	Block *bp;
373
	int n, bytes;
374
 
375
	bytes = 0;
376
	if(bph == nil)
377
		return 0;
378
 
379
	while(*bph != nil && count != 0) {
380
		bp = *bph;
381
		n = BLEN(bp);
382
		if(count < n)
383
			n = count;
384
		bytes += n;
385
		count -= n;
386
		bp->rp += n;
387
		QDEBUG checkb(bp, "pullblock ");
388
		if(BLEN(bp) == 0) {
389
			*bph = bp->next;
390
			bp->next = nil;
391
			freeb(bp);
392
		}
393
	}
394
	return bytes;
395
}
396
 
397
/*
398
 *  get next block from a queue, return null if nothing there
399
 */
400
Block*
401
qget(Queue *q)
402
{
403
	int dowakeup;
404
	Block *b;
405
 
406
	/* sync with qwrite */
407
	ilock(&q->lk);
408
 
409
	b = q->bfirst;
410
	if(b == nil){
411
		q->state |= Qstarve;
412
		iunlock(&q->lk);
413
		return nil;
414
	}
415
	q->bfirst = b->next;
416
	b->next = 0;
417
	q->len -= BALLOC(b);
418
	q->dlen -= BLEN(b);
419
	QDEBUG checkb(b, "qget");
420
 
421
	/* if writer flow controlled, restart */
422
	if((q->state & Qflow) && q->len < q->limit/2){
423
		q->state &= ~Qflow;
424
		dowakeup = 1;
425
	} else
426
		dowakeup = 0;
427
 
428
	iunlock(&q->lk);
429
 
430
	if(dowakeup)
431
		wakeup(&q->wr);
432
 
433
	return b;
434
}
435
 
436
/*
437
 *  throw away the next 'len' bytes in the queue
438
 */
439
int
440
qdiscard(Queue *q, int len)
441
{
442
	Block *b;
443
	int dowakeup, n, sofar;
444
 
445
	ilock(&q->lk);
446
	for(sofar = 0; sofar < len; sofar += n){
447
		b = q->bfirst;
448
		if(b == nil)
449
			break;
450
		QDEBUG checkb(b, "qdiscard");
451
		n = BLEN(b);
452
		if(n <= len - sofar){
453
			q->bfirst = b->next;
454
			b->next = 0;
455
			q->len -= BALLOC(b);
456
			q->dlen -= BLEN(b);
457
			freeb(b);
458
		} else {
459
			n = len - sofar;
460
			b->rp += n;
461
			q->dlen -= n;
462
		}
463
	}
464
 
465
	/*
466
	 *  if writer flow controlled, restart
467
	 *
468
	 *  This used to be
469
	 *	q->len < q->limit/2
470
	 *  but it slows down tcp too much for certain write sizes.
471
	 *  I really don't understand it completely.  It may be
472
	 *  due to the queue draining so fast that the transmission
473
	 *  stalls waiting for the app to produce more data.  - presotto
474
	 */
475
	if((q->state & Qflow) && q->len < q->limit){
476
		q->state &= ~Qflow;
477
		dowakeup = 1;
478
	} else
479
		dowakeup = 0;
480
 
481
	iunlock(&q->lk);
482
 
483
	if(dowakeup)
484
		wakeup(&q->wr);
485
 
486
	return sofar;
487
}
488
 
489
/*
490
 *  Interrupt level copy out of a queue, return # bytes copied.
491
 */
492
int
493
qconsume(Queue *q, void *vp, int len)
494
{
495
	Block *b;
496
	int n, dowakeup;
497
	uchar *p = vp;
498
	Block *tofree = nil;
499
 
500
	/* sync with qwrite */
501
	ilock(&q->lk);
502
 
503
	for(;;) {
504
		b = q->bfirst;
505
		if(b == 0){
506
			q->state |= Qstarve;
507
			iunlock(&q->lk);
508
			return -1;
509
		}
510
		QDEBUG checkb(b, "qconsume 1");
511
 
512
		n = BLEN(b);
513
		if(n > 0)
514
			break;
515
		q->bfirst = b->next;
516
		q->len -= BALLOC(b);
517
 
518
		/* remember to free this */
519
		b->next = tofree;
520
		tofree = b;
521
	};
522
 
523
	if(n < len)
524
		len = n;
525
	memmove(p, b->rp, len);
526
	consumecnt += n;
527
	b->rp += len;
528
	q->dlen -= len;
529
 
530
	/* discard the block if we're done with it */
531
	if((q->state & Qmsg) || len == n){
532
		q->bfirst = b->next;
533
		b->next = 0;
534
		q->len -= BALLOC(b);
535
		q->dlen -= BLEN(b);
536
 
537
		/* remember to free this */
538
		b->next = tofree;
539
		tofree = b;
540
	}
541
 
542
	/* if writer flow controlled, restart */
543
	if((q->state & Qflow) && q->len < q->limit/2){
544
		q->state &= ~Qflow;
545
		dowakeup = 1;
546
	} else
547
		dowakeup = 0;
548
 
549
	iunlock(&q->lk);
550
 
551
	if(dowakeup)
552
		wakeup(&q->wr);
553
 
554
	if(tofree != nil)
555
		freeblist(tofree);
556
 
557
	return len;
558
}
559
 
560
int
561
qpass(Queue *q, Block *b)
562
{
563
	int dlen, len, dowakeup;
564
 
565
	/* sync with qread */
566
	dowakeup = 0;
567
	ilock(&q->lk);
568
	if(q->len >= q->limit){
569
		freeblist(b);
570
		iunlock(&q->lk);
571
		return -1;
572
	}
573
	if(q->state & Qclosed){
574
		freeblist(b);
575
		iunlock(&q->lk);
576
		return BALLOC(b);
577
	}
578
 
579
	/* add buffer to queue */
580
	if(q->bfirst)
581
		q->blast->next = b;
582
	else
583
		q->bfirst = b;
584
	len = BALLOC(b);
585
	dlen = BLEN(b);
586
	QDEBUG checkb(b, "qpass");
587
	while(b->next){
588
		b = b->next;
589
		QDEBUG checkb(b, "qpass");
590
		len += BALLOC(b);
591
		dlen += BLEN(b);
592
	}
593
	q->blast = b;
594
	q->len += len;
595
	q->dlen += dlen;
596
 
597
	if(q->len >= q->limit/2)
598
		q->state |= Qflow;
599
 
600
	if(q->state & Qstarve){
601
		q->state &= ~Qstarve;
602
		dowakeup = 1;
603
	}
604
	iunlock(&q->lk);
605
 
606
	if(dowakeup)
607
		wakeup(&q->rr);
608
 
609
	return len;
610
}
611
 
612
int
613
qpassnolim(Queue *q, Block *b)
614
{
615
	int dlen, len, dowakeup;
616
 
617
	/* sync with qread */
618
	dowakeup = 0;
619
	ilock(&q->lk);
620
 
621
	if(q->state & Qclosed){
622
		freeblist(b);
623
		iunlock(&q->lk);
624
		return BALLOC(b);
625
	}
626
 
627
	/* add buffer to queue */
628
	if(q->bfirst)
629
		q->blast->next = b;
630
	else
631
		q->bfirst = b;
632
	len = BALLOC(b);
633
	dlen = BLEN(b);
634
	QDEBUG checkb(b, "qpass");
635
	while(b->next){
636
		b = b->next;
637
		QDEBUG checkb(b, "qpass");
638
		len += BALLOC(b);
639
		dlen += BLEN(b);
640
	}
641
	q->blast = b;
642
	q->len += len;
643
	q->dlen += dlen;
644
 
645
	if(q->len >= q->limit/2)
646
		q->state |= Qflow;
647
 
648
	if(q->state & Qstarve){
649
		q->state &= ~Qstarve;
650
		dowakeup = 1;
651
	}
652
	iunlock(&q->lk);
653
 
654
	if(dowakeup)
655
		wakeup(&q->rr);
656
 
657
	return len;
658
}
659
 
660
/*
661
 *  if the allocated space is way out of line with the used
662
 *  space, reallocate to a smaller block
663
 */
664
Block*
665
packblock(Block *bp)
666
{
667
	Block **l, *nbp;
668
	int n;
669
 
670
	for(l = &bp; *l; l = &(*l)->next){
671
		nbp = *l;
672
		n = BLEN(nbp);
673
		if((n<<2) < BALLOC(nbp)){
674
			*l = allocb(n);
675
			memmove((*l)->wp, nbp->rp, n);
676
			(*l)->wp += n;
677
			(*l)->next = nbp->next;
678
			freeb(nbp);
679
		}
680
	}
681
 
682
	return bp;
683
}
684
 
685
int
686
qproduce(Queue *q, void *vp, int len)
687
{
688
	Block *b;
689
	int dowakeup;
690
	uchar *p = vp;
691
 
692
	/* sync with qread */
693
	dowakeup = 0;
694
	ilock(&q->lk);
695
 
696
	/* no waiting receivers, room in buffer? */
697
	if(q->len >= q->limit){
698
		q->state |= Qflow;
699
		iunlock(&q->lk);
700
		return -1;
701
	}
702
 
703
	/* save in buffer */
704
	b = iallocb(len);
705
	if(b == 0){
706
		iunlock(&q->lk);
707
		return 0;
708
	}
709
	memmove(b->wp, p, len);
710
	producecnt += len;
711
	b->wp += len;
712
	if(q->bfirst)
713
		q->blast->next = b;
714
	else
715
		q->bfirst = b;
716
	q->blast = b;
717
	/* b->next = 0; done by iallocb() */
718
	q->len += BALLOC(b);
719
	q->dlen += BLEN(b);
720
	QDEBUG checkb(b, "qproduce");
721
 
722
	if(q->state & Qstarve){
723
		q->state &= ~Qstarve;
724
		dowakeup = 1;
725
	}
726
 
727
	if(q->len >= q->limit)
728
		q->state |= Qflow;
729
	iunlock(&q->lk);
730
 
731
	if(dowakeup)
732
		wakeup(&q->rr);
733
 
734
	return len;
735
}
736
 
737
/*
738
 *  copy from offset in the queue
739
 */
740
Block*
741
qcopy(Queue *q, int len, ulong offset)
742
{
743
	int sofar;
744
	int n;
745
	Block *b, *nb;
746
	uchar *p;
747
 
748
	nb = allocb(len);
749
 
750
	ilock(&q->lk);
751
 
752
	/* go to offset */
753
	b = q->bfirst;
754
	for(sofar = 0; ; sofar += n){
755
		if(b == nil){
756
			iunlock(&q->lk);
757
			return nb;
758
		}
759
		n = BLEN(b);
760
		if(sofar + n > offset){
761
			p = b->rp + offset - sofar;
762
			n -= offset - sofar;
763
			break;
764
		}
765
		QDEBUG checkb(b, "qcopy");
766
		b = b->next;
767
	}
768
 
769
	/* copy bytes from there */
770
	for(sofar = 0; sofar < len;){
771
		if(n > len - sofar)
772
			n = len - sofar;
773
		memmove(nb->wp, p, n);
774
		qcopycnt += n;
775
		sofar += n;
776
		nb->wp += n;
777
		b = b->next;
778
		if(b == nil)
779
			break;
780
		n = BLEN(b);
781
		p = b->rp;
782
	}
783
	iunlock(&q->lk);
784
 
785
	return nb;
786
}
787
 
788
/*
789
 *  called by non-interrupt code
790
 */
791
Queue*
792
qopen(int limit, int msg, void (*kick)(void*), void *arg)
793
{
794
	Queue *q;
795
 
796
	q = malloc(sizeof(Queue));
797
	if(q == 0)
798
		return 0;
799
 
800
	q->limit = q->inilim = limit;
801
	q->kick = kick;
802
	q->arg = arg;
803
	q->state = msg;
804
 
805
	q->state |= Qstarve;
806
	q->eof = 0;
807
	q->noblock = 0;
808
 
809
	return q;
810
}
811
 
812
/* open a queue to be bypassed */
813
Queue*
814
qbypass(void (*bypass)(void*, Block*), void *arg)
815
{
816
	Queue *q;
817
 
818
	q = malloc(sizeof(Queue));
819
	if(q == 0)
820
		return 0;
821
 
822
	q->limit = 0;
823
	q->arg = arg;
824
	q->bypass = bypass;
825
	q->state = 0;
826
 
827
	return q;
828
}
829
 
830
static int
831
notempty(void *a)
832
{
833
	Queue *q = a;
834
 
835
	return (q->state & Qclosed) || q->bfirst != 0;
836
}
837
 
838
/*
839
 *  wait for the queue to be non-empty or closed.
840
 *  called with q ilocked.
841
 */
842
static int
843
qwait(Queue *q)
844
{
845
	/* wait for data */
846
	for(;;){
847
		if(q->bfirst != nil)
848
			break;
849
 
850
		if(q->state & Qclosed){
851
			if(++q->eof > 3)
852
				return -1;
853
			if(*q->err && strcmp(q->err, Ehungup) != 0)
854
				return -1;
855
			return 0;
856
		}
857
 
858
		q->state |= Qstarve;	/* flag requesting producer to wake me */
859
		iunlock(&q->lk);
860
		sleep(&q->rr, notempty, q);
861
		ilock(&q->lk);
862
	}
863
	return 1;
864
}
865
 
866
/*
867
 * add a block list to a queue
868
 */
869
void
870
qaddlist(Queue *q, Block *b)
871
{
872
	/* queue the block */
873
	if(q->bfirst)
874
		q->blast->next = b;
875
	else
876
		q->bfirst = b;
877
	q->len += blockalloclen(b);
878
	q->dlen += blocklen(b);
879
	while(b->next)
880
		b = b->next;
881
	q->blast = b;
882
}
883
 
884
/*
885
 *  called with q ilocked
886
 */
887
Block*
888
qremove(Queue *q)
889
{
890
	Block *b;
891
 
892
	b = q->bfirst;
893
	if(b == nil)
894
		return nil;
895
	q->bfirst = b->next;
896
	b->next = nil;
897
	q->dlen -= BLEN(b);
898
	q->len -= BALLOC(b);
899
	QDEBUG checkb(b, "qremove");
900
	return b;
901
}
902
 
903
/*
904
 *  copy the contents of a string of blocks into
905
 *  memory.  emptied blocks are freed.  return
906
 *  pointer to first unconsumed block.
907
 */
908
Block*
909
bl2mem(uchar *p, Block *b, int n)
910
{
911
	int i;
912
	Block *next;
913
 
914
	for(; b != nil; b = next){
915
		i = BLEN(b);
916
		if(i > n){
917
			memmove(p, b->rp, n);
918
			b->rp += n;
919
			return b;
920
		}
921
		memmove(p, b->rp, i);
922
		n -= i;
923
		p += i;
924
		b->rp += i;
925
		next = b->next;
926
		freeb(b);
927
	}
928
	return nil;
929
}
930
 
931
/*
932
 *  copy the contents of memory into a string of blocks.
933
 *  return nil on error.
934
 */
935
Block*
936
mem2bl(uchar *p, int len)
937
{
938
	int n;
939
	Block *b, *first, **l;
940
 
941
	first = nil;
942
	l = &first;
943
	if(waserror()){
944
		freeblist(first);
945
		nexterror();
946
	}
947
	do {
948
		n = len;
949
		if(n > Maxatomic)
950
			n = Maxatomic;
951
 
952
		*l = b = allocb(n);
953
	/*	setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
954
		memmove(b->wp, p, n);
955
		b->wp += n;
956
		p += n;
957
		len -= n;
958
		l = &b->next;
959
	} while(len > 0);
960
	poperror();
961
 
962
	return first;
963
}
964
 
965
/*
966
 *  put a block back to the front of the queue
967
 *  called with q ilocked
968
 */
969
void
970
qputback(Queue *q, Block *b)
971
{
972
	b->next = q->bfirst;
973
	if(q->bfirst == nil)
974
		q->blast = b;
975
	q->bfirst = b;
976
	q->len += BALLOC(b);
977
	q->dlen += BLEN(b);
978
}
979
 
980
/*
981
 *  flow control, get producer going again
982
 *  called with q ilocked
983
 */
984
static void
985
qwakeup_iunlock(Queue *q)
986
{
987
	int dowakeup = 0;
988
 
989
	/* if writer flow controlled, restart */
990
	if((q->state & Qflow) && q->len < q->limit/2){
991
		q->state &= ~Qflow;
992
		dowakeup = 1;
993
	}
994
 
995
	iunlock(&q->lk);
996
 
997
	/* wakeup flow controlled writers */
998
	if(dowakeup){
999
		if(q->kick)
1000
			q->kick(q->arg);
1001
		wakeup(&q->wr);
1002
	}
1003
}
1004
 
1005
/*
1006
 *  get next block from a queue (up to a limit)
1007
 */
1008
Block*
1009
qbread(Queue *q, int len)
1010
{
1011
	Block *b, *nb;
1012
	int n;
1013
 
1014
	qlock(&q->rlock);
1015
	if(waserror()){
1016
		qunlock(&q->rlock);
1017
		nexterror();
1018
	}
1019
 
1020
	ilock(&q->lk);
1021
	switch(qwait(q)){
1022
	case 0:
1023
		/* queue closed */
1024
		iunlock(&q->lk);
1025
		qunlock(&q->rlock);
1026
		poperror();
1027
		return nil;
1028
	case -1:
1029
		/* multiple reads on a closed queue */
1030
		iunlock(&q->lk);
1031
		error(q->err);
1032
	}
1033
 
1034
	/* if we get here, there's at least one block in the queue */
1035
	b = qremove(q);
1036
	n = BLEN(b);
1037
 
1038
	/* split block if it's too big and this is not a message queue */
1039
	nb = b;
1040
	if(n > len){
1041
		if((q->state&Qmsg) == 0){
1042
			n -= len;
1043
			b = allocb(n);
1044
			memmove(b->wp, nb->rp+len, n);
1045
			b->wp += n;
1046
			qputback(q, b);
1047
		}
1048
		nb->wp = nb->rp + len;
1049
	}
1050
 
1051
	/* restart producer */
1052
	qwakeup_iunlock(q);
1053
 
1054
	poperror();
1055
	qunlock(&q->rlock);
1056
	return nb;
1057
}
1058
 
1059
/*
1060
 *  read a queue.  if no data is queued, post a Block
1061
 *  and wait on its Rendez.
1062
 */
1063
long
1064
qread(Queue *q, void *vp, int len)
1065
{
1066
	Block *b, *first, **l;
1067
	int m, n;
1068
 
1069
	qlock(&q->rlock);
1070
	if(waserror()){
1071
		qunlock(&q->rlock);
1072
		nexterror();
1073
	}
1074
 
1075
	ilock(&q->lk);
1076
again:
1077
	switch(qwait(q)){
1078
	case 0:
1079
		/* queue closed */
1080
		iunlock(&q->lk);
1081
		qunlock(&q->rlock);
1082
		poperror();
1083
		return 0;
1084
	case -1:
1085
		/* multiple reads on a closed queue */
1086
		iunlock(&q->lk);
1087
		error(q->err);
1088
	}
1089
 
1090
	/* if we get here, there's at least one block in the queue */
1091
	if(q->state & Qcoalesce){
1092
		/* when coalescing, 0 length blocks just go away */
1093
		b = q->bfirst;
1094
		if(BLEN(b) <= 0){
1095
			freeb(qremove(q));
1096
			goto again;
1097
		}
1098
 
1099
		/*  grab the first block plus as many
1100
		 *  following blocks as will completely
1101
		 *  fit in the read.
1102
		 */
1103
		n = 0;
1104
		l = &first;
1105
		m = BLEN(b);
1106
		for(;;) {
1107
			*l = qremove(q);
1108
			l = &b->next;
1109
			n += m;
1110
 
1111
			b = q->bfirst;
1112
			if(b == nil)
1113
				break;
1114
			m = BLEN(b);
1115
			if(n+m > len)
1116
				break;
1117
		}
1118
	} else {
1119
		first = qremove(q);
1120
		n = BLEN(first);
1121
	}
1122
 
1123
	/* copy to user space outside of the ilock */
1124
	iunlock(&q->lk);
1125
	b = bl2mem(vp, first, len);
1126
	ilock(&q->lk);
1127
 
1128
	/* take care of any left over partial block */
1129
	if(b != nil){
1130
		n -= BLEN(b);
1131
		if(q->state & Qmsg)
1132
			freeb(b);
1133
		else
1134
			qputback(q, b);
1135
	}
1136
 
1137
	/* restart producer */
1138
	qwakeup_iunlock(q);
1139
 
1140
	poperror();
1141
	qunlock(&q->rlock);
1142
	return n;
1143
}
1144
 
1145
static int
1146
qnotfull(void *a)
1147
{
1148
	Queue *q = a;
1149
 
1150
	return q->len < q->limit || (q->state & Qclosed);
1151
}
1152
 
1153
ulong noblockcnt;
1154
 
1155
/*
1156
 *  add a block to a queue obeying flow control
1157
 */
1158
long
1159
qbwrite(Queue *q, Block *b)
1160
{
1161
	int n, dowakeup;
1162
 
1163
	n = BLEN(b);
1164
 
1165
	if(q->bypass){
1166
		(*q->bypass)(q->arg, b);
1167
		return n;
1168
	}
1169
 
1170
	dowakeup = 0;
1171
	qlock(&q->wlock);
1172
	if(waserror()){
1173
		if(b != nil)
1174
			freeb(b);
1175
		qunlock(&q->wlock);
1176
		nexterror();
1177
	}
1178
 
1179
	ilock(&q->lk);
1180
 
1181
	/* give up if the queue is closed */
1182
	if(q->state & Qclosed){
1183
		iunlock(&q->lk);
1184
		error(q->err);
1185
	}
1186
 
1187
	/* if nonblocking, don't queue over the limit */
1188
	if(q->len >= q->limit){
1189
		if(q->noblock){
1190
			iunlock(&q->lk);
1191
			freeb(b);
1192
			noblockcnt += n;
1193
			qunlock(&q->wlock);
1194
			poperror();
1195
			return n;
1196
		}
1197
	}
1198
 
1199
	/* queue the block */
1200
	if(q->bfirst)
1201
		q->blast->next = b;
1202
	else
1203
		q->bfirst = b;
1204
	q->blast = b;
1205
	b->next = 0;
1206
	q->len += BALLOC(b);
1207
	q->dlen += n;
1208
	QDEBUG checkb(b, "qbwrite");
1209
	b = nil;
1210
 
1211
	/* make sure other end gets awakened */
1212
	if(q->state & Qstarve){
1213
		q->state &= ~Qstarve;
1214
		dowakeup = 1;
1215
	}
1216
	iunlock(&q->lk);
1217
 
1218
	/*  get output going again */
1219
	if(q->kick && (dowakeup || (q->state&Qkick)))
1220
		q->kick(q->arg);
1221
 
1222
	/* wakeup anyone consuming at the other end */
1223
	if(dowakeup){
1224
		wakeup(&q->rr);
1225
 
1226
		/* if we just wokeup a higher priority process, let it run */
1227
	/*
1228
		p = wakeup(&q->rr);
1229
		if(p != nil && p->priority > up->priority)
1230
			sched();
1231
	 */
1232
	}
1233
 
1234
	/*
1235
	 *  flow control, wait for queue to get below the limit
1236
	 *  before allowing the process to continue and queue
1237
	 *  more.  We do this here so that postnote can only
1238
	 *  interrupt us after the data has been queued.  This
1239
	 *  means that things like 9p flushes and ssl messages
1240
	 *  will not be disrupted by software interrupts.
1241
	 *
1242
	 *  Note - this is moderately dangerous since a process
1243
	 *  that keeps getting interrupted and rewriting will
1244
	 *  queue infinite crud.
1245
	 */
1246
	for(;;){
1247
		if(q->noblock || qnotfull(q))
1248
			break;
1249
 
1250
		ilock(&q->lk);
1251
		q->state |= Qflow;
1252
		iunlock(&q->lk);
1253
		sleep(&q->wr, qnotfull, q);
1254
	}
1255
	USED(b);
1256
 
1257
	qunlock(&q->wlock);
1258
	poperror();
1259
	return n;
1260
}
1261
 
1262
/*
1263
 *  write to a queue.  only Maxatomic bytes at a time is atomic.
1264
 */
1265
int
1266
qwrite(Queue *q, void *vp, int len)
1267
{
1268
	int n, sofar;
1269
	Block *b;
1270
	uchar *p = vp;
1271
 
1272
	QDEBUG if(!islo())
1273
		print("qwrite hi %p\n", getcallerpc(&q));
1274
 
1275
	sofar = 0;
1276
	do {
1277
		n = len-sofar;
1278
		if(n > Maxatomic)
1279
			n = Maxatomic;
1280
 
1281
		b = allocb(n);
1282
	/*	setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
1283
		if(waserror()){
1284
			freeb(b);
1285
			nexterror();
1286
		}
1287
		memmove(b->wp, p+sofar, n);
1288
		poperror();
1289
		b->wp += n;
1290
 
1291
		qbwrite(q, b);
1292
 
1293
		sofar += n;
1294
	} while(sofar < len && (q->state & Qmsg) == 0);
1295
 
1296
	return len;
1297
}
1298
 
1299
/*
1300
 *  used by print() to write to a queue.  Since we may be splhi or not in
1301
 *  a process, don't qlock.
1302
 */
1303
int
1304
qiwrite(Queue *q, void *vp, int len)
1305
{
1306
	int n, sofar, dowakeup;
1307
	Block *b;
1308
	uchar *p = vp;
1309
 
1310
	dowakeup = 0;
1311
 
1312
	sofar = 0;
1313
	do {
1314
		n = len-sofar;
1315
		if(n > Maxatomic)
1316
			n = Maxatomic;
1317
 
1318
		b = iallocb(n);
1319
		if(b == nil)
1320
			break;
1321
		memmove(b->wp, p+sofar, n);
1322
		b->wp += n;
1323
 
1324
		ilock(&q->lk);
1325
 
1326
		QDEBUG checkb(b, "qiwrite");
1327
		if(q->bfirst)
1328
			q->blast->next = b;
1329
		else
1330
			q->bfirst = b;
1331
		q->blast = b;
1332
		q->len += BALLOC(b);
1333
		q->dlen += n;
1334
 
1335
		if(q->state & Qstarve){
1336
			q->state &= ~Qstarve;
1337
			dowakeup = 1;
1338
		}
1339
 
1340
		iunlock(&q->lk);
1341
 
1342
		if(dowakeup){
1343
			if(q->kick)
1344
				q->kick(q->arg);
1345
			wakeup(&q->rr);
1346
		}
1347
 
1348
		sofar += n;
1349
	} while(sofar < len && (q->state & Qmsg) == 0);
1350
 
1351
	return sofar;
1352
}
1353
 
1354
/*
1355
 *  be extremely careful when calling this,
1356
 *  as there is no reference accounting
1357
 */
1358
void
1359
qfree(Queue *q)
1360
{
1361
	qclose(q);
1362
	free(q);
1363
}
1364
 
1365
/*
1366
 *  Mark a queue as closed.  No further IO is permitted.
1367
 *  All blocks are released.
1368
 */
1369
void
1370
qclose(Queue *q)
1371
{
1372
	Block *bfirst;
1373
 
1374
	if(q == nil)
1375
		return;
1376
 
1377
	/* mark it */
1378
	ilock(&q->lk);
1379
	q->state |= Qclosed;
1380
	q->state &= ~(Qflow|Qstarve);
1381
	strcpy(q->err, Ehungup);
1382
	bfirst = q->bfirst;
1383
	q->bfirst = 0;
1384
	q->len = 0;
1385
	q->dlen = 0;
1386
	q->noblock = 0;
1387
	iunlock(&q->lk);
1388
 
1389
	/* free queued blocks */
1390
	freeblist(bfirst);
1391
 
1392
	/* wake up readers/writers */
1393
	wakeup(&q->rr);
1394
	wakeup(&q->wr);
1395
}
1396
 
1397
/*
1398
 *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1399
 *  blocks.
1400
 */
1401
void
1402
qhangup(Queue *q, char *msg)
1403
{
1404
	/* mark it */
1405
	ilock(&q->lk);
1406
	q->state |= Qclosed;
1407
	if(msg == 0 || *msg == 0)
1408
		strcpy(q->err, Ehungup);
1409
	else
1410
		strncpy(q->err, msg, ERRMAX-1);
1411
	iunlock(&q->lk);
1412
 
1413
	/* wake up readers/writers */
1414
	wakeup(&q->rr);
1415
	wakeup(&q->wr);
1416
}
1417
 
1418
/*
1419
 *  return non-zero if the q is hungup
1420
 */
1421
int
1422
qisclosed(Queue *q)
1423
{
1424
	return q->state & Qclosed;
1425
}
1426
 
1427
/*
1428
 *  mark a queue as no longer hung up
1429
 */
1430
void
1431
qreopen(Queue *q)
1432
{
1433
	ilock(&q->lk);
1434
	q->state &= ~Qclosed;
1435
	q->state |= Qstarve;
1436
	q->eof = 0;
1437
	q->limit = q->inilim;
1438
	iunlock(&q->lk);
1439
}
1440
 
1441
/*
1442
 *  return bytes queued
1443
 */
1444
int
1445
qlen(Queue *q)
1446
{
1447
	return q->dlen;
1448
}
1449
 
1450
/*
1451
 * return space remaining before flow control
1452
 */
1453
int
1454
qwindow(Queue *q)
1455
{
1456
	int l;
1457
 
1458
	l = q->limit - q->len;
1459
	if(l < 0)
1460
		l = 0;
1461
	return l;
1462
}
1463
 
1464
/*
1465
 *  return true if we can read without blocking
1466
 */
1467
int
1468
qcanread(Queue *q)
1469
{
1470
	return q->bfirst!=0;
1471
}
1472
 
1473
/*
1474
 *  change queue limit
1475
 */
1476
void
1477
qsetlimit(Queue *q, int limit)
1478
{
1479
	q->limit = limit;
1480
}
1481
 
1482
/*
1483
 *  set blocking/nonblocking
1484
 */
1485
void
1486
qnoblock(Queue *q, int onoff)
1487
{
1488
	q->noblock = onoff;
1489
}
1490
 
1491
/*
1492
 *  flush the output queue
1493
 */
1494
void
1495
qflush(Queue *q)
1496
{
1497
	Block *bfirst;
1498
 
1499
	/* mark it */
1500
	ilock(&q->lk);
1501
	bfirst = q->bfirst;
1502
	q->bfirst = 0;
1503
	q->len = 0;
1504
	q->dlen = 0;
1505
	iunlock(&q->lk);
1506
 
1507
	/* free queued blocks */
1508
	freeblist(bfirst);
1509
 
1510
	/* wake up readers/writers */
1511
	wakeup(&q->wr);
1512
}
1513
 
1514
int
1515
qfull(Queue *q)
1516
{
1517
	return q->state & Qflow;
1518
}
1519
 
1520
int
1521
qstate(Queue *q)
1522
{
1523
	return q->state;
1524
}