Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
#include	"u.h"
2
#include	"../port/lib.h"
3
#include	"mem.h"
4
#include	"dat.h"
5
#include	"fns.h"
6
#include	"../port/error.h"
7
 
8
static ulong padblockcnt;
9
static ulong concatblockcnt;
10
static ulong pullupblockcnt;
11
static ulong copyblockcnt;
12
static ulong consumecnt;
13
static ulong producecnt;
14
static ulong qcopycnt;
15
 
16
static int debugging;
17
 
18
#define QDEBUG	if(0)
19
 
20
/*
21
 *  IO queues
22
 */
23
typedef struct Queue	Queue;
24
 
25
struct Queue
26
{
27
	Lock;
28
 
29
	Block*	bfirst;		/* buffer */
30
	Block*	blast;
31
 
32
	int	len;		/* bytes allocated to queue */
33
	int	dlen;		/* data bytes in queue */
34
	int	limit;		/* max bytes in queue */
35
	int	inilim;		/* initial limit */
36
	int	state;
37
	int	noblock;	/* true if writes return immediately when q full */
38
	int	eof;		/* number of eofs read by user */
39
 
40
	void	(*kick)(void*);	/* restart output */
41
	void	(*bypass)(void*, Block*);	/* bypass queue altogether */
42
	void*	arg;		/* argument to kick */
43
 
44
	QLock	rlock;		/* mutex for reading processes */
45
	Rendez	rr;		/* process waiting to read */
46
	QLock	wlock;		/* mutex for writing processes */
47
	Rendez	wr;		/* process waiting to write */
48
 
49
	char	err[ERRMAX];
50
};
51
 
52
enum
53
{
54
	Maxatomic	= 64*1024,
55
};
56
 
57
uint	qiomaxatomic = Maxatomic;
58
 
59
void
60
ixsummary(void)
61
{
62
	debugging ^= 1;
63
	iallocsummary();
64
	print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
65
		padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
66
	print("consume %lud, produce %lud, qcopy %lud\n",
67
		consumecnt, producecnt, qcopycnt);
68
}
69
 
70
/*
71
 *  free a list of blocks
72
 */
73
void
74
freeblist(Block *b)
75
{
76
	Block *next;
77
 
78
	for(; b != 0; b = next){
79
		next = b->next;
80
		if(b->ref == 1)
81
			b->next = nil;
82
		freeb(b);
83
	}
84
}
85
 
86
/*
87
 *  pad a block to the front (or the back if size is negative)
88
 */
89
Block*
90
padblock(Block *bp, int size)
91
{
92
	int n;
93
	Block *nbp;
94
 
95
	QDEBUG checkb(bp, "padblock 1");
96
	if(size >= 0){
97
		if(bp->rp - bp->base >= size){
98
			bp->rp -= size;
99
			return bp;
100
		}
101
 
102
		if(bp->next)
103
			panic("padblock %#p", getcallerpc(&bp));
104
		n = BLEN(bp);
105
		padblockcnt++;
106
		nbp = allocb(size+n);
107
		nbp->rp += size;
108
		nbp->wp = nbp->rp;
109
		memmove(nbp->wp, bp->rp, n);
110
		nbp->wp += n;
111
		freeb(bp);
112
		nbp->rp -= size;
113
	} else {
114
		size = -size;
115
 
116
		if(bp->next)
117
			panic("padblock %#p", getcallerpc(&bp));
118
 
119
		if(bp->lim - bp->wp >= size)
120
			return bp;
121
 
122
		n = BLEN(bp);
123
		padblockcnt++;
124
		nbp = allocb(size+n);
125
		memmove(nbp->wp, bp->rp, n);
126
		nbp->wp += n;
127
		freeb(bp);
128
	}
129
	QDEBUG checkb(nbp, "padblock 1");
130
	return nbp;
131
}
132
 
133
/*
134
 *  return count of bytes in a string of blocks
135
 */
136
int
137
blocklen(Block *bp)
138
{
139
	int len;
140
 
141
	len = 0;
142
	while(bp) {
143
		len += BLEN(bp);
144
		bp = bp->next;
145
	}
146
	return len;
147
}
148
 
149
/*
150
 * return count of space in blocks
151
 */
152
int
153
blockalloclen(Block *bp)
154
{
155
	int len;
156
 
157
	len = 0;
158
	while(bp) {
159
		len += BALLOC(bp);
160
		bp = bp->next;
161
	}
162
	return len;
163
}
164
 
165
/*
166
 *  copy the  string of blocks into
167
 *  a single block and free the string
168
 */
169
Block*
170
concatblock(Block *bp)
171
{
172
	int len;
173
	Block *nb, *f;
174
 
175
	if(bp->next == 0)
176
		return bp;
177
 
178
	nb = allocb(blocklen(bp));
179
	for(f = bp; f; f = f->next) {
180
		len = BLEN(f);
181
		memmove(nb->wp, f->rp, len);
182
		nb->wp += len;
183
	}
184
	concatblockcnt += BLEN(nb);
185
	freeblist(bp);
186
	QDEBUG checkb(nb, "concatblock 1");
187
	return nb;
188
}
189
 
190
/*
191
 *  make sure the first block has at least n bytes
192
 */
193
Block*
194
pullupblock(Block *bp, int n)
195
{
196
	int i;
197
	Block *nbp;
198
 
199
	/*
200
	 *  this should almost always be true, it's
201
	 *  just to avoid every caller checking.
202
	 */
203
	if(BLEN(bp) >= n)
204
		return bp;
205
 
206
	/*
207
	 *  if not enough room in the first block,
208
	 *  add another to the front of the list.
209
	 */
210
	if(bp->lim - bp->rp < n){
211
		nbp = allocb(n);
212
		nbp->next = bp;
213
		bp = nbp;
214
	}
215
 
216
	/*
217
	 *  copy bytes from the trailing blocks into the first
218
	 */
219
	n -= BLEN(bp);
220
	while(nbp = bp->next){
221
		i = BLEN(nbp);
222
		if(i > n) {
223
			memmove(bp->wp, nbp->rp, n);
224
			pullupblockcnt++;
225
			bp->wp += n;
226
			nbp->rp += n;
227
			QDEBUG checkb(bp, "pullupblock 1");
228
			return bp;
229
		} else {
230
			/* shouldn't happen but why crash if it does */
231
			if(i < 0){
232
				print("pullup negative length packet, called from %#p\n",
233
					getcallerpc(&bp));
234
				i = 0;
235
			}
236
			memmove(bp->wp, nbp->rp, i);
237
			pullupblockcnt++;
238
			bp->wp += i;
239
			bp->next = nbp->next;
240
			nbp->next = 0;
241
			freeb(nbp);
242
			n -= i;
243
			if(n == 0){
244
				QDEBUG checkb(bp, "pullupblock 2");
245
				return bp;
246
			}
247
		}
248
	}
249
	freeb(bp);
250
	return 0;
251
}
252
 
253
/*
254
 *  make sure the first block has at least n bytes
255
 */
256
Block*
257
pullupqueue(Queue *q, int n)
258
{
259
	Block *b;
260
 
261
	if(BLEN(q->bfirst) >= n)
262
		return q->bfirst;
263
	q->bfirst = pullupblock(q->bfirst, n);
264
	for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
265
		;
266
	q->blast = b;
267
	return q->bfirst;
268
}
269
 
270
/*
271
 *  trim to len bytes starting at offset
272
 */
273
Block *
274
trimblock(Block *bp, int offset, int len)
275
{
276
	ulong l;
277
	Block *nb, *startb;
278
 
279
	QDEBUG checkb(bp, "trimblock 1");
280
	if(blocklen(bp) < offset+len) {
281
		freeblist(bp);
282
		return nil;
283
	}
284
 
285
	while((l = BLEN(bp)) < offset) {
286
		offset -= l;
287
		nb = bp->next;
288
		bp->next = nil;
289
		freeb(bp);
290
		bp = nb;
291
	}
292
 
293
	startb = bp;
294
	bp->rp += offset;
295
 
296
	while((l = BLEN(bp)) < len) {
297
		len -= l;
298
		bp = bp->next;
299
	}
300
 
301
	bp->wp -= (BLEN(bp) - len);
302
 
303
	if(bp->next) {
304
		freeblist(bp->next);
305
		bp->next = nil;
306
	}
307
 
308
	return startb;
309
}
310
 
311
/*
312
 *  copy 'count' bytes into a new block
313
 */
314
Block*
315
copyblock(Block *bp, int count)
316
{
317
	int l;
318
	Block *nbp;
319
 
320
	QDEBUG checkb(bp, "copyblock 0");
321
	nbp = allocb(count);
322
	for(; count > 0 && bp != 0; bp = bp->next){
323
		l = BLEN(bp);
324
		if(l > count)
325
			l = count;
326
		memmove(nbp->wp, bp->rp, l);
327
		nbp->wp += l;
328
		count -= l;
329
	}
330
	if(count > 0){
331
		memset(nbp->wp, 0, count);
332
		nbp->wp += count;
333
	}
334
	copyblockcnt++;
335
	QDEBUG checkb(nbp, "copyblock 1");
336
 
337
	return nbp;
338
}
339
 
340
Block*
341
adjustblock(Block* bp, int len)
342
{
343
	int n;
344
	Block *nbp;
345
 
346
	if(len < 0){
347
		freeb(bp);
348
		return nil;
349
	}
350
 
351
	if(bp->rp+len > bp->lim){
352
		nbp = copyblock(bp, len);
353
		freeblist(bp);
354
		QDEBUG checkb(nbp, "adjustblock 1");
355
 
356
		return nbp;
357
	}
358
 
359
	n = BLEN(bp);
360
	if(len > n)
361
		memset(bp->wp, 0, len-n);
362
	bp->wp = bp->rp+len;
363
	QDEBUG checkb(bp, "adjustblock 2");
364
 
365
	return bp;
366
}
367
 
368
 
369
/*
370
 *  throw away up to count bytes from a
371
 *  list of blocks.  Return count of bytes
372
 *  thrown away.
373
 */
374
int
375
pullblock(Block **bph, int count)
376
{
377
	Block *bp;
378
	int n, bytes;
379
 
380
	bytes = 0;
381
	if(bph == nil)
382
		return 0;
383
 
384
	while(*bph != nil && count != 0) {
385
		bp = *bph;
386
		n = BLEN(bp);
387
		if(count < n)
388
			n = count;
389
		bytes += n;
390
		count -= n;
391
		bp->rp += n;
392
		QDEBUG checkb(bp, "pullblock ");
393
		if(BLEN(bp) == 0) {
394
			*bph = bp->next;
395
			bp->next = nil;
396
			freeb(bp);
397
		}
398
	}
399
	return bytes;
400
}
401
 
402
/*
403
 *  get next block from a queue, return null if nothing there
404
 */
405
Block*
406
qget(Queue *q)
407
{
408
	int dowakeup;
409
	Block *b;
410
 
411
	/* sync with qwrite */
412
	ilock(q);
413
 
414
	b = q->bfirst;
415
	if(b == nil){
416
		q->state |= Qstarve;
417
		iunlock(q);
418
		return nil;
419
	}
420
	q->bfirst = b->next;
421
	b->next = 0;
422
	q->len -= BALLOC(b);
423
	q->dlen -= BLEN(b);
424
	QDEBUG checkb(b, "qget");
425
 
426
	/* if writer flow controlled, restart */
427
	if((q->state & Qflow) && q->len < q->limit/2){
428
		q->state &= ~Qflow;
429
		dowakeup = 1;
430
	} else
431
		dowakeup = 0;
432
 
433
	iunlock(q);
434
 
435
	if(dowakeup)
436
		wakeup(&q->wr);
437
 
438
	return b;
439
}
440
 
441
/*
442
 *  throw away the next 'len' bytes in the queue
443
 */
444
int
445
qdiscard(Queue *q, int len)
446
{
447
	Block *b;
448
	int dowakeup, n, sofar;
449
 
450
	ilock(q);
451
	for(sofar = 0; sofar < len; sofar += n){
452
		b = q->bfirst;
453
		if(b == nil)
454
			break;
455
		QDEBUG checkb(b, "qdiscard");
456
		n = BLEN(b);
457
		if(n <= len - sofar){
458
			q->bfirst = b->next;
459
			b->next = 0;
460
			q->len -= BALLOC(b);
461
			q->dlen -= BLEN(b);
462
			freeb(b);
463
		} else {
464
			n = len - sofar;
465
			b->rp += n;
466
			q->dlen -= n;
467
		}
468
	}
469
 
470
	/*
471
	 *  if writer flow controlled, restart
472
	 *
473
	 *  This used to be
474
	 *	q->len < q->limit/2
475
	 *  but it slows down tcp too much for certain write sizes.
476
	 *  I really don't understand it completely.  It may be
477
	 *  due to the queue draining so fast that the transmission
478
	 *  stalls waiting for the app to produce more data.  - presotto
479
	 */
480
	if((q->state & Qflow) && q->len < q->limit){
481
		q->state &= ~Qflow;
482
		dowakeup = 1;
483
	} else
484
		dowakeup = 0;
485
 
486
	iunlock(q);
487
 
488
	if(dowakeup)
489
		wakeup(&q->wr);
490
 
491
	return sofar;
492
}
493
 
494
/*
495
 *  Interrupt level copy out of a queue, return # bytes copied.
496
 */
497
int
498
qconsume(Queue *q, void *vp, int len)
499
{
500
	Block *b;
501
	int n, dowakeup;
502
	uchar *p = vp;
503
	Block *tofree = nil;
504
 
505
	/* sync with qwrite */
506
	ilock(q);
507
 
508
	for(;;) {
509
		b = q->bfirst;
510
		if(b == 0){
511
			q->state |= Qstarve;
512
			iunlock(q);
513
			return -1;
514
		}
515
		QDEBUG checkb(b, "qconsume 1");
516
 
517
		n = BLEN(b);
518
		if(n > 0)
519
			break;
520
		q->bfirst = b->next;
521
		q->len -= BALLOC(b);
522
 
523
		/* remember to free this */
524
		b->next = tofree;
525
		tofree = b;
526
	};
527
 
528
	if(n < len)
529
		len = n;
530
	memmove(p, b->rp, len);
531
	consumecnt += n;
532
	b->rp += len;
533
	q->dlen -= len;
534
 
535
	/* discard the block if we're done with it */
536
	if((q->state & Qmsg) || len == n){
537
		q->bfirst = b->next;
538
		b->next = 0;
539
		q->len -= BALLOC(b);
540
		q->dlen -= BLEN(b);
541
 
542
		/* remember to free this */
543
		b->next = tofree;
544
		tofree = b;
545
	}
546
 
547
	/* if writer flow controlled, restart */
548
	if((q->state & Qflow) && q->len < q->limit/2){
549
		q->state &= ~Qflow;
550
		dowakeup = 1;
551
	} else
552
		dowakeup = 0;
553
 
554
	iunlock(q);
555
 
556
	if(dowakeup)
557
		wakeup(&q->wr);
558
 
559
	if(tofree != nil)
560
		freeblist(tofree);
561
 
562
	return len;
563
}
564
 
565
int
566
qpass(Queue *q, Block *b)
567
{
568
	int dlen, len, dowakeup;
569
 
570
	/* sync with qread */
571
	dowakeup = 0;
572
	ilock(q);
573
	if(q->len >= q->limit){
574
		freeblist(b);
575
		iunlock(q);
576
		return -1;
577
	}
578
	if(q->state & Qclosed){
579
		len = BALLOC(b);
580
		freeblist(b);
581
		iunlock(q);
582
		return len;
583
	}
584
 
585
	/* add buffer to queue */
586
	if(q->bfirst)
587
		q->blast->next = b;
588
	else
589
		q->bfirst = b;
590
	len = BALLOC(b);
591
	dlen = BLEN(b);
592
	QDEBUG checkb(b, "qpass");
593
	while(b->next){
594
		b = b->next;
595
		QDEBUG checkb(b, "qpass");
596
		len += BALLOC(b);
597
		dlen += BLEN(b);
598
	}
599
	q->blast = b;
600
	q->len += len;
601
	q->dlen += dlen;
602
 
603
	if(q->len >= q->limit/2)
604
		q->state |= Qflow;
605
 
606
	if(q->state & Qstarve){
607
		q->state &= ~Qstarve;
608
		dowakeup = 1;
609
	}
610
	iunlock(q);
611
 
612
	if(dowakeup)
613
		wakeup(&q->rr);
614
 
615
	return len;
616
}
617
 
618
int
619
qpassnolim(Queue *q, Block *b)
620
{
621
	int dlen, len, dowakeup;
622
 
623
	/* sync with qread */
624
	dowakeup = 0;
625
	ilock(q);
626
 
627
	if(q->state & Qclosed){
628
		freeblist(b);
629
		iunlock(q);
630
		return BALLOC(b);
631
	}
632
 
633
	/* add buffer to queue */
634
	if(q->bfirst)
635
		q->blast->next = b;
636
	else
637
		q->bfirst = b;
638
	len = BALLOC(b);
639
	dlen = BLEN(b);
640
	QDEBUG checkb(b, "qpass");
641
	while(b->next){
642
		b = b->next;
643
		QDEBUG checkb(b, "qpass");
644
		len += BALLOC(b);
645
		dlen += BLEN(b);
646
	}
647
	q->blast = b;
648
	q->len += len;
649
	q->dlen += dlen;
650
 
651
	if(q->len >= q->limit/2)
652
		q->state |= Qflow;
653
 
654
	if(q->state & Qstarve){
655
		q->state &= ~Qstarve;
656
		dowakeup = 1;
657
	}
658
	iunlock(q);
659
 
660
	if(dowakeup)
661
		wakeup(&q->rr);
662
 
663
	return len;
664
}
665
 
666
/*
667
 *  if the allocated space is way out of line with the used
668
 *  space, reallocate to a smaller block
669
 */
670
Block*
671
packblock(Block *bp)
672
{
673
	Block **l, *nbp;
674
	int n;
675
 
676
	for(l = &bp; *l; l = &(*l)->next){
677
		nbp = *l;
678
		n = BLEN(nbp);
679
		if((n<<2) < BALLOC(nbp)){
680
			*l = allocb(n);
681
			memmove((*l)->wp, nbp->rp, n);
682
			(*l)->wp += n;
683
			(*l)->next = nbp->next;
684
			freeb(nbp);
685
		}
686
	}
687
 
688
	return bp;
689
}
690
 
691
int
692
qproduce(Queue *q, void *vp, int len)
693
{
694
	Block *b;
695
	int dowakeup;
696
	uchar *p = vp;
697
 
698
	/* sync with qread */
699
	dowakeup = 0;
700
	ilock(q);
701
 
702
	/* no waiting receivers, room in buffer? */
703
	if(q->len >= q->limit){
704
		q->state |= Qflow;
705
		iunlock(q);
706
		return -1;
707
	}
708
 
709
	/* save in buffer */
710
	b = iallocb(len);
711
	if(b == 0){
712
		iunlock(q);
713
		return 0;
714
	}
715
	memmove(b->wp, p, len);
716
	producecnt += len;
717
	b->wp += len;
718
	if(q->bfirst)
719
		q->blast->next = b;
720
	else
721
		q->bfirst = b;
722
	q->blast = b;
723
	/* b->next = 0; done by iallocb() */
724
	q->len += BALLOC(b);
725
	q->dlen += BLEN(b);
726
	QDEBUG checkb(b, "qproduce");
727
 
728
	if(q->state & Qstarve){
729
		q->state &= ~Qstarve;
730
		dowakeup = 1;
731
	}
732
 
733
	if(q->len >= q->limit)
734
		q->state |= Qflow;
735
	iunlock(q);
736
 
737
	if(dowakeup)
738
		wakeup(&q->rr);
739
 
740
	return len;
741
}
742
 
743
/*
744
 *  copy from offset in the queue
745
 */
746
Block*
747
qcopy(Queue *q, int len, ulong offset)
748
{
749
	int sofar;
750
	int n;
751
	Block *b, *nb;
752
	uchar *p;
753
 
754
	nb = allocb(len);
755
 
756
	ilock(q);
757
 
758
	/* go to offset */
759
	b = q->bfirst;
760
	for(sofar = 0; ; sofar += n){
761
		if(b == nil){
762
			iunlock(q);
763
			return nb;
764
		}
765
		n = BLEN(b);
766
		if(sofar + n > offset){
767
			p = b->rp + offset - sofar;
768
			n -= offset - sofar;
769
			break;
770
		}
771
		QDEBUG checkb(b, "qcopy");
772
		b = b->next;
773
	}
774
 
775
	/* copy bytes from there */
776
	for(sofar = 0; sofar < len;){
777
		if(n > len - sofar)
778
			n = len - sofar;
779
		memmove(nb->wp, p, n);
780
		qcopycnt += n;
781
		sofar += n;
782
		nb->wp += n;
783
		b = b->next;
784
		if(b == nil)
785
			break;
786
		n = BLEN(b);
787
		p = b->rp;
788
	}
789
	iunlock(q);
790
 
791
	return nb;
792
}
793
 
794
/*
795
 *  called by non-interrupt code
796
 */
797
Queue*
798
qopen(int limit, int msg, void (*kick)(void*), void *arg)
799
{
800
	Queue *q;
801
 
802
	q = malloc(sizeof(Queue));
803
	if(q == 0)
804
		return 0;
805
 
806
	q->limit = q->inilim = limit;
807
	q->kick = kick;
808
	q->arg = arg;
809
	q->state = msg;
810
 
811
	q->state |= Qstarve;
812
	q->eof = 0;
813
	q->noblock = 0;
814
 
815
	return q;
816
}
817
 
818
/* open a queue to be bypassed */
819
Queue*
820
qbypass(void (*bypass)(void*, Block*), void *arg)
821
{
822
	Queue *q;
823
 
824
	q = malloc(sizeof(Queue));
825
	if(q == 0)
826
		return 0;
827
 
828
	q->limit = 0;
829
	q->arg = arg;
830
	q->bypass = bypass;
831
	q->state = 0;
832
 
833
	return q;
834
}
835
 
836
static int
837
notempty(void *a)
838
{
839
	Queue *q = a;
840
 
841
	return (q->state & Qclosed) || q->bfirst != 0;
842
}
843
 
844
/*
845
 *  wait for the queue to be non-empty or closed.
846
 *  called with q ilocked.
847
 */
848
static int
849
qwait(Queue *q)
850
{
851
	/* wait for data */
852
	for(;;){
853
		if(q->bfirst != nil)
854
			break;
855
 
856
		if(q->state & Qclosed){
857
			if(++q->eof > 3)
858
				return -1;
859
			if(*q->err && strcmp(q->err, Ehungup) != 0)
860
				return -1;
861
			return 0;
862
		}
863
 
864
		q->state |= Qstarve;	/* flag requesting producer to wake me */
865
		iunlock(q);
866
		sleep(&q->rr, notempty, q);
867
		ilock(q);
868
	}
869
	return 1;
870
}
871
 
872
/*
873
 * add a block list to a queue
874
 */
875
void
876
qaddlist(Queue *q, Block *b)
877
{
878
	/* queue the block */
879
	if(q->bfirst)
880
		q->blast->next = b;
881
	else
882
		q->bfirst = b;
883
	q->len += blockalloclen(b);
884
	q->dlen += blocklen(b);
885
	while(b->next)
886
		b = b->next;
887
	q->blast = b;
888
}
889
 
890
/*
891
 *  called with q ilocked
892
 */
893
Block*
894
qremove(Queue *q)
895
{
896
	Block *b;
897
 
898
	b = q->bfirst;
899
	if(b == nil)
900
		return nil;
901
	q->bfirst = b->next;
902
	b->next = nil;
903
	q->dlen -= BLEN(b);
904
	q->len -= BALLOC(b);
905
	QDEBUG checkb(b, "qremove");
906
	return b;
907
}
908
 
909
/*
910
 *  copy the contents of a string of blocks into
911
 *  memory.  emptied blocks are freed.  return
912
 *  pointer to first unconsumed block.
913
 */
914
Block*
915
bl2mem(uchar *p, Block *b, int n)
916
{
917
	int i;
918
	Block *next;
919
 
920
	for(; b != nil; b = next){
921
		i = BLEN(b);
922
		if(i > n){
923
			memmove(p, b->rp, n);
924
			b->rp += n;
925
			return b;
926
		}
927
		memmove(p, b->rp, i);
928
		n -= i;
929
		p += i;
930
		b->rp += i;
931
		next = b->next;
932
		freeb(b);
933
	}
934
	return nil;
935
}
936
 
937
/*
938
 *  copy the contents of memory into a string of blocks.
939
 *  return nil on error.
940
 */
941
Block*
942
mem2bl(uchar *p, int len)
943
{
944
	int n;
945
	Block *b, *first, **l;
946
 
947
	first = nil;
948
	l = &first;
949
	if(waserror()){
950
		freeblist(first);
951
		nexterror();
952
	}
953
	do {
954
		n = len;
955
		if(n > Maxatomic)
956
			n = Maxatomic;
957
 
958
		*l = b = allocb(n);
959
		setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
960
		memmove(b->wp, p, n);
961
		b->wp += n;
962
		p += n;
963
		len -= n;
964
		l = &b->next;
965
	} while(len > 0);
966
	poperror();
967
 
968
	return first;
969
}
970
 
971
/*
972
 *  put a block back to the front of the queue
973
 *  called with q ilocked
974
 */
975
void
976
qputback(Queue *q, Block *b)
977
{
978
	b->next = q->bfirst;
979
	if(q->bfirst == nil)
980
		q->blast = b;
981
	q->bfirst = b;
982
	q->len += BALLOC(b);
983
	q->dlen += BLEN(b);
984
}
985
 
986
/*
987
 *  flow control, get producer going again
988
 *  called with q ilocked
989
 */
990
static void
991
qwakeup_iunlock(Queue *q)
992
{
993
	int dowakeup = 0;
994
 
995
	/* if writer flow controlled, restart */
996
	if((q->state & Qflow) && q->len < q->limit/2){
997
		q->state &= ~Qflow;
998
		dowakeup = 1;
999
	}
1000
 
1001
	iunlock(q);
1002
 
1003
	/* wakeup flow controlled writers */
1004
	if(dowakeup){
1005
		if(q->kick)
1006
			q->kick(q->arg);
1007
		wakeup(&q->wr);
1008
	}
1009
}
1010
 
1011
/*
1012
 *  get next block from a queue (up to a limit)
1013
 */
1014
Block*
1015
qbread(Queue *q, int len)
1016
{
1017
	Block *b, *nb;
1018
	int n;
1019
 
1020
	qlock(&q->rlock);
1021
	if(waserror()){
1022
		qunlock(&q->rlock);
1023
		nexterror();
1024
	}
1025
 
1026
	ilock(q);
1027
	switch(qwait(q)){
1028
	case 0:
1029
		/* queue closed */
1030
		iunlock(q);
1031
		qunlock(&q->rlock);
1032
		poperror();
1033
		return nil;
1034
	case -1:
1035
		/* multiple reads on a closed queue */
1036
		iunlock(q);
1037
		error(q->err);
1038
	}
1039
 
1040
	/* if we get here, there's at least one block in the queue */
1041
	b = qremove(q);
1042
	n = BLEN(b);
1043
 
1044
	/* split block if it's too big and this is not a message queue */
1045
	nb = b;
1046
	if(n > len){
1047
		if((q->state&Qmsg) == 0){
1048
			n -= len;
1049
			b = allocb(n);
1050
			memmove(b->wp, nb->rp+len, n);
1051
			b->wp += n;
1052
			qputback(q, b);
1053
		}
1054
		nb->wp = nb->rp + len;
1055
	}
1056
 
1057
	/* restart producer */
1058
	qwakeup_iunlock(q);
1059
 
1060
	poperror();
1061
	qunlock(&q->rlock);
1062
	return nb;
1063
}
1064
 
1065
/*
1066
 *  read a queue.  if no data is queued, post a Block
1067
 *  and wait on its Rendez.
1068
 */
1069
long
1070
qread(Queue *q, void *vp, int len)
1071
{
1072
	Block *b, *first, **l;
1073
	int m, n;
1074
 
1075
	qlock(&q->rlock);
1076
	if(waserror()){
1077
		qunlock(&q->rlock);
1078
		nexterror();
1079
	}
1080
 
1081
	ilock(q);
1082
again:
1083
	switch(qwait(q)){
1084
	case 0:
1085
		/* queue closed */
1086
		iunlock(q);
1087
		qunlock(&q->rlock);
1088
		poperror();
1089
		return 0;
1090
	case -1:
1091
		/* multiple reads on a closed queue */
1092
		iunlock(q);
1093
		error(q->err);
1094
	}
1095
 
1096
	/* if we get here, there's at least one block in the queue */
1097
	if(q->state & Qcoalesce){
1098
		/* when coalescing, 0 length blocks just go away */
1099
		b = q->bfirst;
1100
		if(BLEN(b) <= 0){
1101
			freeb(qremove(q));
1102
			goto again;
1103
		}
1104
 
1105
		/*  grab the first block plus as many
1106
		 *  following blocks as will completely
1107
		 *  fit in the read.
1108
		 */
1109
		n = 0;
1110
		l = &first;
1111
		m = BLEN(b);
1112
		for(;;) {
1113
			*l = qremove(q);
1114
			l = &b->next;
1115
			n += m;
1116
 
1117
			b = q->bfirst;
1118
			if(b == nil)
1119
				break;
1120
			m = BLEN(b);
1121
			if(n+m > len)
1122
				break;
1123
		}
1124
	} else {
1125
		first = qremove(q);
1126
		n = BLEN(first);
1127
	}
1128
 
1129
	/* copy to user space outside of the ilock */
1130
	iunlock(q);
1131
	b = bl2mem(vp, first, len);
1132
	ilock(q);
1133
 
1134
	/* take care of any left over partial block */
1135
	if(b != nil){
1136
		n -= BLEN(b);
1137
		if(q->state & Qmsg)
1138
			freeb(b);
1139
		else
1140
			qputback(q, b);
1141
	}
1142
 
1143
	/* restart producer */
1144
	qwakeup_iunlock(q);
1145
 
1146
	poperror();
1147
	qunlock(&q->rlock);
1148
	return n;
1149
}
1150
 
1151
static int
1152
qnotfull(void *a)
1153
{
1154
	Queue *q = a;
1155
 
1156
	return q->len < q->limit || (q->state & Qclosed);
1157
}
1158
 
1159
ulong noblockcnt;
1160
 
1161
/*
1162
 *  add a block to a queue obeying flow control
1163
 */
1164
long
1165
qbwrite(Queue *q, Block *b)
1166
{
1167
	int n, dowakeup;
1168
	Proc *p;
1169
 
1170
	n = BLEN(b);
1171
 
1172
	if(q->bypass){
1173
		(*q->bypass)(q->arg, b);
1174
		return n;
1175
	}
1176
 
1177
	dowakeup = 0;
1178
	qlock(&q->wlock);
1179
	if(waserror()){
1180
		if(b != nil)
1181
			freeb(b);
1182
		qunlock(&q->wlock);
1183
		nexterror();
1184
	}
1185
 
1186
	ilock(q);
1187
 
1188
	/* give up if the queue is closed */
1189
	if(q->state & Qclosed){
1190
		iunlock(q);
1191
		error(q->err);
1192
	}
1193
 
1194
	/* if nonblocking, don't queue over the limit */
1195
	if(q->len >= q->limit){
1196
		if(q->noblock){
1197
			iunlock(q);
1198
			freeb(b);
1199
			noblockcnt += n;
1200
			qunlock(&q->wlock);
1201
			poperror();
1202
			return n;
1203
		}
1204
	}
1205
 
1206
	/* queue the block */
1207
	if(q->bfirst)
1208
		q->blast->next = b;
1209
	else
1210
		q->bfirst = b;
1211
	q->blast = b;
1212
	b->next = 0;
1213
	q->len += BALLOC(b);
1214
	q->dlen += n;
1215
	QDEBUG checkb(b, "qbwrite");
1216
	b = nil;
1217
 
1218
	/* make sure other end gets awakened */
1219
	if(q->state & Qstarve){
1220
		q->state &= ~Qstarve;
1221
		dowakeup = 1;
1222
	}
1223
	iunlock(q);
1224
 
1225
	/*  get output going again */
1226
	if(q->kick && (dowakeup || (q->state&Qkick)))
1227
		q->kick(q->arg);
1228
 
1229
	/* wakeup anyone consuming at the other end */
1230
	if(dowakeup){
1231
		p = wakeup(&q->rr);
1232
 
1233
		/* if we just wokeup a higher priority process, let it run */
1234
		if(p != nil && p->priority > up->priority)
1235
			sched();
1236
	}
1237
 
1238
	/*
1239
	 *  flow control, wait for queue to get below the limit
1240
	 *  before allowing the process to continue and queue
1241
	 *  more.  We do this here so that postnote can only
1242
	 *  interrupt us after the data has been queued.  This
1243
	 *  means that things like 9p flushes and ssl messages
1244
	 *  will not be disrupted by software interrupts.
1245
	 *
1246
	 *  Note - this is moderately dangerous since a process
1247
	 *  that keeps getting interrupted and rewriting will
1248
	 *  queue infinite crud.
1249
	 */
1250
	for(;;){
1251
		if(q->noblock || qnotfull(q))
1252
			break;
1253
 
1254
		ilock(q);
1255
		q->state |= Qflow;
1256
		iunlock(q);
1257
		sleep(&q->wr, qnotfull, q);
1258
	}
1259
	USED(b);
1260
 
1261
	qunlock(&q->wlock);
1262
	poperror();
1263
	return n;
1264
}
1265
 
1266
/*
1267
 *  write to a queue.  only Maxatomic bytes at a time is atomic.
1268
 */
1269
int
1270
qwrite(Queue *q, void *vp, int len)
1271
{
1272
	int n, sofar;
1273
	Block *b;
1274
	uchar *p = vp;
1275
 
1276
	QDEBUG if(!islo())
1277
		print("qwrite hi %#p\n", getcallerpc(&q));
1278
 
1279
	sofar = 0;
1280
	do {
1281
		n = len-sofar;
1282
		if(n > Maxatomic)
1283
			n = Maxatomic;
1284
 
1285
		b = allocb(n);
1286
		setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
1287
		if(waserror()){
1288
			freeb(b);
1289
			nexterror();
1290
		}
1291
		memmove(b->wp, p+sofar, n);
1292
		poperror();
1293
		b->wp += n;
1294
 
1295
		qbwrite(q, b);
1296
 
1297
		sofar += n;
1298
	} while(sofar < len && (q->state & Qmsg) == 0);
1299
 
1300
	return len;
1301
}
1302
 
1303
/*
1304
 *  used by print() to write to a queue.  Since we may be splhi or not in
1305
 *  a process, don't qlock.
1306
 *
1307
 *  this routine merges adjacent blocks if block n+1 will fit into
1308
 *  the free space of block n.
1309
 */
1310
int
1311
qiwrite(Queue *q, void *vp, int len)
1312
{
1313
	int n, sofar, dowakeup;
1314
	Block *b;
1315
	uchar *p = vp;
1316
 
1317
	dowakeup = 0;
1318
 
1319
	sofar = 0;
1320
	do {
1321
		n = len-sofar;
1322
		if(n > Maxatomic)
1323
			n = Maxatomic;
1324
 
1325
		b = iallocb(n);
1326
		if(b == nil)
1327
			break;
1328
		memmove(b->wp, p+sofar, n);
1329
		b->wp += n;
1330
 
1331
		ilock(q);
1332
 
1333
		/* we use an artificially high limit for kernel prints since anything
1334
		 * over the limit gets dropped
1335
		 */
1336
		if(q->dlen >= 16*1024){
1337
			iunlock(q);
1338
			freeb(b);
1339
			break;
1340
		}
1341
 
1342
		QDEBUG checkb(b, "qiwrite");
1343
		if(q->bfirst)
1344
			q->blast->next = b;
1345
		else
1346
			q->bfirst = b;
1347
		q->blast = b;
1348
		q->len += BALLOC(b);
1349
		q->dlen += n;
1350
 
1351
		if(q->state & Qstarve){
1352
			q->state &= ~Qstarve;
1353
			dowakeup = 1;
1354
		}
1355
 
1356
		iunlock(q);
1357
 
1358
		if(dowakeup){
1359
			if(q->kick)
1360
				q->kick(q->arg);
1361
			wakeup(&q->rr);
1362
		}
1363
 
1364
		sofar += n;
1365
	} while(sofar < len && (q->state & Qmsg) == 0);
1366
 
1367
	return sofar;
1368
}
1369
 
1370
/*
1371
 *  be extremely careful when calling this,
1372
 *  as there is no reference accounting
1373
 */
1374
void
1375
qfree(Queue *q)
1376
{
1377
	qclose(q);
1378
	free(q);
1379
}
1380
 
1381
/*
1382
 *  Mark a queue as closed.  No further IO is permitted.
1383
 *  All blocks are released.
1384
 */
1385
void
1386
qclose(Queue *q)
1387
{
1388
	Block *bfirst;
1389
 
1390
	if(q == nil)
1391
		return;
1392
 
1393
	/* mark it */
1394
	ilock(q);
1395
	q->state |= Qclosed;
1396
	q->state &= ~(Qflow|Qstarve);
1397
	strcpy(q->err, Ehungup);
1398
	bfirst = q->bfirst;
1399
	q->bfirst = 0;
1400
	q->len = 0;
1401
	q->dlen = 0;
1402
	q->noblock = 0;
1403
	iunlock(q);
1404
 
1405
	/* free queued blocks */
1406
	freeblist(bfirst);
1407
 
1408
	/* wake up readers/writers */
1409
	wakeup(&q->rr);
1410
	wakeup(&q->wr);
1411
}
1412
 
1413
/*
1414
 *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1415
 *  blocks.
1416
 */
1417
void
1418
qhangup(Queue *q, char *msg)
1419
{
1420
	/* mark it */
1421
	ilock(q);
1422
	q->state |= Qclosed;
1423
	if(msg == 0 || *msg == 0)
1424
		strcpy(q->err, Ehungup);
1425
	else
1426
		strncpy(q->err, msg, ERRMAX-1);
1427
	iunlock(q);
1428
 
1429
	/* wake up readers/writers */
1430
	wakeup(&q->rr);
1431
	wakeup(&q->wr);
1432
}
1433
 
1434
/*
1435
 *  return non-zero if the q is hungup
1436
 */
1437
int
1438
qisclosed(Queue *q)
1439
{
1440
	return q->state & Qclosed;
1441
}
1442
 
1443
/*
1444
 *  mark a queue as no longer hung up
1445
 */
1446
void
1447
qreopen(Queue *q)
1448
{
1449
	ilock(q);
1450
	q->state &= ~Qclosed;
1451
	q->state |= Qstarve;
1452
	q->eof = 0;
1453
	q->limit = q->inilim;
1454
	iunlock(q);
1455
}
1456
 
1457
/*
1458
 *  return bytes queued
1459
 */
1460
int
1461
qlen(Queue *q)
1462
{
1463
	return q->dlen;
1464
}
1465
 
1466
/*
1467
 * return space remaining before flow control
1468
 */
1469
int
1470
qwindow(Queue *q)
1471
{
1472
	int l;
1473
 
1474
	l = q->limit - q->len;
1475
	if(l < 0)
1476
		l = 0;
1477
	return l;
1478
}
1479
 
1480
/*
1481
 *  return true if we can read without blocking
1482
 */
1483
int
1484
qcanread(Queue *q)
1485
{
1486
	return q->bfirst!=0;
1487
}
1488
 
1489
/*
1490
 *  change queue limit
1491
 */
1492
void
1493
qsetlimit(Queue *q, int limit)
1494
{
1495
	q->limit = limit;
1496
}
1497
 
1498
/*
1499
 *  set blocking/nonblocking
1500
 */
1501
void
1502
qnoblock(Queue *q, int onoff)
1503
{
1504
	q->noblock = onoff;
1505
}
1506
 
1507
/*
1508
 *  flush the output queue
1509
 */
1510
void
1511
qflush(Queue *q)
1512
{
1513
	Block *bfirst;
1514
 
1515
	/* mark it */
1516
	ilock(q);
1517
	bfirst = q->bfirst;
1518
	q->bfirst = 0;
1519
	q->len = 0;
1520
	q->dlen = 0;
1521
	iunlock(q);
1522
 
1523
	/* free queued blocks */
1524
	freeblist(bfirst);
1525
 
1526
	/* wake up readers/writers */
1527
	wakeup(&q->wr);
1528
}
1529
 
1530
int
1531
qfull(Queue *q)
1532
{
1533
	return q->state & Qflow;
1534
}
1535
 
1536
int
1537
qstate(Queue *q)
1538
{
1539
	return q->state;
1540
}