Subversion Repositories planix.SVN

Rev

Rev 2 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 - 1
/*
2
 *  Reliable User Datagram Protocol, currently only for IPv4.
3
 *  This protocol is compatible with UDP's packet format.
4
 *  It could be done over UDP if need be.
5
 */
6
#include	"u.h"
7
#include	"../port/lib.h"
8
#include	"mem.h"
9
#include	"dat.h"
10
#include	"fns.h"
11
#include	"../port/error.h"
12
 
13
#include	"ip.h"
14
 
15
#define DEBUG	0
/*
 *  Debug print, active only when DEBUG is non-zero.  Written as
 *  if/else so that an `else' following a DPRINT statement cannot
 *  attach itself to the macro's own `if'.
 */
#define DPRINT if(!DEBUG){}else print

/*
 *  Sequence number arithmetic.  Sequence number 0 is reserved to mean
 *  "no packet", so NEXTSEQ steps from the all-ones value to 1 and the
 *  usable sequence space holds 0xffffffff values.
 *
 *  SEQDIFF(a,b)	distance from b forward to a, allowing for wrap
 *  INSEQ(a,s,e)	true if a lies in the half-open window (s, e],
 *			allowing for a window that wraps
 *  UNACKED(r)	number of messages sent to r but not yet acked
 *  NEXTSEQ(a)	sequence number following a, skipping 0
 *
 *  All of these evaluate their arguments more than once; do not pass
 *  expressions with side effects.
 */
#define SEQDIFF(a,b) ( (a)>=(b)?\
			(a)-(b):\
			0xffffffffUL-((b)-(a)) )
#define INSEQ(a,start,end) ( (start)<=(end)?\
				((a)>(start)&&(a)<=(end)):\
				((a)>(start)||(a)<=(end)) )
#define UNACKED(r) SEQDIFF((r)->sndseq, (r)->ackrcvd)
#define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
26
 
27
enum
{
	/* header sizes in bytes; see Udphdr and Rudphdr for the layout */
	UDP_PHDRSIZE	= 12,	/* pseudo header */
	UDP_RHDRSIZE	= 36,	/* pseudo header + udp header + rudp header */
	UDP_IPHDR	= 8,	/* ip header bytes in front of the pseudo header */
	IP_UDPPROTO	= 254,	/* value stored in the protocol field and in Proto.ipproto */
	UDP_USEAD7	= 52,	/* size of new ipv6 headers struct */

	/* retransmission and flow control parameters */
	Rudprxms	= 200,	/* retransmit once timeout exceeds this times xmits */
	Rudptickms	= 50,	/* sleep interval of the ack/retransmit kproc */
	Rudpmaxxmit	= 10,	/* hang up once xmits has passed this */
	Maxunacked	= 100,	/* a writer sleeps while more than this are unacked */
};

#define Hangupgen	0xffffffff	/* used only in hangup messages */
43
 
44
typedef struct Udphdr Udphdr;
45
struct Udphdr
46
{
47
	/* ip header */
48
	uchar	vihl;		/* Version and header length */
49
	uchar	tos;		/* Type of service */
50
	uchar	length[2];	/* packet length */
51
	uchar	id[2];		/* Identification */
52
	uchar	frag[2];	/* Fragment information */
53
 
54
	/* pseudo header starts here */
55
	uchar	Unused;
56
	uchar	udpproto;	/* Protocol */
57
	uchar	udpplen[2];	/* Header plus data length */
58
	uchar	udpsrc[4];	/* Ip source */
59
	uchar	udpdst[4];	/* Ip destination */
60
 
61
	/* udp header */
62
	uchar	udpsport[2];	/* Source port */
63
	uchar	udpdport[2];	/* Destination port */
64
	uchar	udplen[2];	/* data length */
65
	uchar	udpcksum[2];	/* Checksum */
66
};
67
 
68
typedef struct Rudphdr Rudphdr;
69
struct Rudphdr
70
{
71
	/* ip header */
72
	uchar	vihl;		/* Version and header length */
73
	uchar	tos;		/* Type of service */
74
	uchar	length[2];	/* packet length */
75
	uchar	id[2];		/* Identification */
76
	uchar	frag[2];	/* Fragment information */
77
 
78
	/* pseudo header starts here */
79
	uchar	Unused;
80
	uchar	udpproto;	/* Protocol */
81
	uchar	udpplen[2];	/* Header plus data length */
82
	uchar	udpsrc[4];	/* Ip source */
83
	uchar	udpdst[4];	/* Ip destination */
84
 
85
	/* udp header */
86
	uchar	udpsport[2];	/* Source port */
87
	uchar	udpdport[2];	/* Destination port */
88
	uchar	udplen[2];	/* data length (includes rudp header) */
89
	uchar	udpcksum[2];	/* Checksum */
90
 
91
	/* rudp header */
92
	uchar	relseq[4];	/* id of this packet (or 0) */
93
	uchar	relsgen[4];	/* generation/time stamp */
94
	uchar	relack[4];	/* packet being acked (or 0) */
95
	uchar	relagen[4];	/* generation/time stamp */
96
};
97
 
98
 
99
/*
100
 *  one state structure per destination
101
 */
102
typedef struct Reliable Reliable;
103
struct Reliable
104
{
105
	Ref;
106
 
107
	Reliable *next;
108
 
109
	uchar	addr[IPaddrlen];	/* always V6 when put here */
110
	ushort	port;
111
 
112
	Block	*unacked;	/* unacked msg list */
113
	Block	*unackedtail;	/*  and its tail */
114
 
115
	int	timeout;	/* time since first unacked msg sent */
116
	int	xmits;		/* number of times first unacked msg sent */
117
 
118
	ulong	sndseq;		/* next packet to be sent */
119
	ulong	sndgen;		/*  and its generation */
120
 
121
	ulong	rcvseq;		/* last packet received */
122
	ulong	rcvgen;		/*  and its generation */
123
 
124
	ulong	acksent;	/* last ack sent */
125
	ulong	ackrcvd;	/* last msg for which ack was rcvd */
126
 
127
	/* flow control */
128
	QLock	lock;
129
	Rendez	vous;
130
	int	blocked;
131
};
132
 
133
 
134
 
135
/* MIB II counters */
136
typedef struct Rudpstats Rudpstats;
137
struct Rudpstats
138
{
139
	ulong	rudpInDatagrams;
140
	ulong	rudpNoPorts;
141
	ulong	rudpInErrors;
142
	ulong	rudpOutDatagrams;
143
};
144
 
145
typedef struct Rudppriv Rudppriv;
146
struct Rudppriv
147
{
148
	Ipht	ht;
149
 
150
	/* MIB counters */
151
	Rudpstats	ustats;
152
 
153
	/* non-MIB stats */
154
	ulong	csumerr;		/* checksum errors */
155
	ulong	lenerr;			/* short packet */
156
	ulong	rxmits;			/* # of retransmissions */
157
	ulong	orders;			/* # of out of order pkts */
158
 
159
	/* keeping track of the ack kproc */
160
	int	ackprocstarted;
161
	QLock	apl;
162
};
163
 
164
 
165
static ulong generation = 0;
166
static Rendez rend;
167
 
168
/*
169
 *  protocol specific part of Conv
170
 */
171
typedef struct Rudpcb Rudpcb;
172
struct Rudpcb
173
{
174
	QLock;
175
	uchar	headers;
176
	uchar	randdrop;
177
	Reliable *r;
178
};
179
 
180
/*
181
 * local functions
182
 */
183
void	relsendack(Conv*, Reliable*, int);
184
int	reliput(Conv*, Block*, uchar*, ushort);
185
Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
186
void	relput(Reliable*);
187
void	relforget(Conv *, uchar*, int, int);
188
void	relackproc(void *);
189
void	relackq(Reliable *, Block*);
190
void	relhangup(Conv *, Reliable*);
191
void	relrexmit(Conv *, Reliable*);
192
void	relput(Reliable*);
193
void	rudpkick(void *x);
194
 
195
static void
196
rudpstartackproc(Proto *rudp)
197
{
198
	Rudppriv *rpriv;
199
	char kpname[KNAMELEN];
200
 
201
	rpriv = rudp->priv;
202
	if(rpriv->ackprocstarted == 0){
203
		qlock(&rpriv->apl);
204
		if(rpriv->ackprocstarted == 0){
205
			snprint(kpname, sizeof kpname, "#I%drudpack",
206
				rudp->f->dev);
207
			kproc(kpname, relackproc, rudp);
208
			rpriv->ackprocstarted = 1;
209
		}
210
		qunlock(&rpriv->apl);
211
	}
212
}
213
 
214
static char*
215
rudpconnect(Conv *c, char **argv, int argc)
216
{
217
	char *e;
218
	Rudppriv *upriv;
219
 
220
	upriv = c->p->priv;
221
	rudpstartackproc(c->p);
222
	e = Fsstdconnect(c, argv, argc);
223
	Fsconnected(c, e);
224
	iphtadd(&upriv->ht, c);
225
 
226
	return e;
227
}
228
 
229
 
230
static int
231
rudpstate(Conv *c, char *state, int n)
232
{
233
	Rudpcb *ucb;
234
	Reliable *r;
235
	int m;
236
 
237
	m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
238
	ucb = (Rudpcb*)c->ptcl;
239
	qlock(ucb);
240
	for(r = ucb->r; r; r = r->next)
241
		m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
242
	m += snprint(state+m, n-m, "\n");
243
	qunlock(ucb);
244
	return m;
245
}
246
 
247
static char*
248
rudpannounce(Conv *c, char** argv, int argc)
249
{
250
	char *e;
251
	Rudppriv *upriv;
252
 
253
	upriv = c->p->priv;
254
	rudpstartackproc(c->p);
255
	e = Fsstdannounce(c, argv, argc);
256
	if(e != nil)
257
		return e;
258
	Fsconnected(c, nil);
259
	iphtadd(&upriv->ht, c);
260
 
261
	return nil;
262
}
263
 
264
static void
265
rudpcreate(Conv *c)
266
{
267
	c->rq = qopen(64*1024, Qmsg, 0, 0);
268
	c->wq = qopen(64*1024, Qkick, rudpkick, c);
269
}
270
 
271
static void
272
rudpclose(Conv *c)
273
{
274
	Rudpcb *ucb;
275
	Reliable *r, *nr;
276
	Rudppriv *upriv;
277
 
278
	upriv = c->p->priv;
279
	iphtrem(&upriv->ht, c);
280
 
281
	/* force out any delayed acks */
282
	ucb = (Rudpcb*)c->ptcl;
283
	qlock(ucb);
284
	for(r = ucb->r; r; r = r->next){
285
		if(r->acksent != r->rcvseq)
286
			relsendack(c, r, 0);
287
	}
288
	qunlock(ucb);
289
 
290
	qclose(c->rq);
291
	qclose(c->wq);
292
	qclose(c->eq);
293
	ipmove(c->laddr, IPnoaddr);
294
	ipmove(c->raddr, IPnoaddr);
295
	c->lport = 0;
296
	c->rport = 0;
297
 
298
	ucb->headers = 0;
299
	ucb->randdrop = 0;
300
	qlock(ucb);
301
	for(r = ucb->r; r; r = nr){
302
		if(r->acksent != r->rcvseq)
303
			relsendack(c, r, 0);
304
		nr = r->next;
305
		relhangup(c, r);
306
		relput(r);
307
	}
308
	ucb->r = 0;
309
 
310
	qunlock(ucb);
311
}
312
 
313
/*
314
 *  randomly don't send packets
315
 */
316
static void
317
doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
318
{
319
	Rudpcb *ucb;
320
 
321
	ucb = (Rudpcb*)c->ptcl;
322
	if(ucb->randdrop && nrand(100) < ucb->randdrop)
323
		freeblist(bp);
324
	else
325
		ipoput4(f, bp, x, ttl, tos, nil);
326
}
327
 
328
int
329
flow(void *v)
330
{
331
	Reliable *r = v;
332
 
333
	return UNACKED(r) <= Maxunacked;
334
}
335
 
336
void
337
rudpkick(void *x)
338
{
339
	Conv *c = x;
340
	Udphdr *uh;
341
	ushort rport;
342
	uchar laddr[IPaddrlen], raddr[IPaddrlen];
343
	Block *bp;
344
	Rudpcb *ucb;
345
	Rudphdr *rh;
346
	Reliable *r;
347
	int dlen, ptcllen;
348
	Rudppriv *upriv;
349
	Fs *f;
350
 
351
	upriv = c->p->priv;
352
	f = c->p->f;
353
 
354
	netlog(c->p->f, Logrudp, "rudp: kick\n");
355
	bp = qget(c->wq);
356
	if(bp == nil)
357
		return;
358
 
359
	ucb = (Rudpcb*)c->ptcl;
360
	switch(ucb->headers) {
361
	case 7:
362
		/* get user specified addresses */
363
		bp = pullupblock(bp, UDP_USEAD7);
364
		if(bp == nil)
365
			return;
366
		ipmove(raddr, bp->rp);
367
		bp->rp += IPaddrlen;
368
		ipmove(laddr, bp->rp);
369
		bp->rp += IPaddrlen;
370
		/* pick interface closest to dest */
371
		if(ipforme(f, laddr) != Runi)
372
			findlocalip(f, laddr, raddr);
373
		bp->rp += IPaddrlen;		/* Ignore ifc address */
374
		rport = nhgets(bp->rp);
375
		bp->rp += 2+2;			/* Ignore local port */
376
		break;
377
	default:
378
		ipmove(raddr, c->raddr);
379
		ipmove(laddr, c->laddr);
380
		rport = c->rport;
381
		break;
382
	}
383
 
384
	dlen = blocklen(bp);
385
 
386
	/* Make space to fit rudp & ip header */
387
	bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
388
	if(bp == nil)
389
		return;
390
 
391
	uh = (Udphdr *)(bp->rp);
392
	uh->vihl = IP_VER4;
393
 
394
	rh = (Rudphdr*)uh;
395
 
396
	ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
397
	uh->Unused = 0;
398
	uh->udpproto = IP_UDPPROTO;
399
	uh->frag[0] = 0;
400
	uh->frag[1] = 0;
401
	hnputs(uh->udpplen, ptcllen);
402
	switch(ucb->headers){
403
	case 7:
404
		v6tov4(uh->udpdst, raddr);
405
		hnputs(uh->udpdport, rport);
406
		v6tov4(uh->udpsrc, laddr);
407
		break;
408
	default:
409
		v6tov4(uh->udpdst, c->raddr);
410
		hnputs(uh->udpdport, c->rport);
411
		if(ipcmp(c->laddr, IPnoaddr) == 0)
412
			findlocalip(f, c->laddr, c->raddr);
413
		v6tov4(uh->udpsrc, c->laddr);
414
		break;
415
	}
416
	hnputs(uh->udpsport, c->lport);
417
	hnputs(uh->udplen, ptcllen);
418
	uh->udpcksum[0] = 0;
419
	uh->udpcksum[1] = 0;
420
 
421
	qlock(ucb);
422
	r = relstate(ucb, raddr, rport, "kick");
423
	r->sndseq = NEXTSEQ(r->sndseq);
424
	hnputl(rh->relseq, r->sndseq);
425
	hnputl(rh->relsgen, r->sndgen);
426
 
427
	hnputl(rh->relack, r->rcvseq);  /* ACK last rcvd packet */
428
	hnputl(rh->relagen, r->rcvgen);
429
 
430
	if(r->rcvseq != r->acksent)
431
		r->acksent = r->rcvseq;
432
 
433
	hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
434
 
435
	relackq(r, bp);
436
	qunlock(ucb);
437
 
438
	upriv->ustats.rudpOutDatagrams++;
439
 
440
	DPRINT("sent: %lud/%lud, %lud/%lud\n",
441
		r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
442
 
443
	doipoput(c, f, bp, 0, c->ttl, c->tos);
444
 
445
	if(waserror()) {
446
		relput(r);
447
		qunlock(&r->lock);
448
		nexterror();
449
	}
450
 
451
	/* flow control of sorts */
452
	qlock(&r->lock);
453
	if(UNACKED(r) > Maxunacked){
454
		r->blocked = 1;
455
		sleep(&r->vous, flow, r);
456
		r->blocked = 0;
457
	}
458
 
459
	qunlock(&r->lock);
460
	relput(r);
461
	poperror();
462
}
463
 
464
void
465
rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
466
{
467
	int len, olen, ottl;
468
	Udphdr *uh;
469
	Conv *c;
470
	Rudpcb *ucb;
471
	uchar raddr[IPaddrlen], laddr[IPaddrlen];
472
	ushort rport, lport;
473
	Rudppriv *upriv;
474
	Fs *f;
475
	uchar *p;
476
 
477
	upriv = rudp->priv;
478
	f = rudp->f;
479
 
480
	upriv->ustats.rudpInDatagrams++;
481
 
482
	uh = (Udphdr*)(bp->rp);
483
 
484
	/* Put back pseudo header for checksum
485
	 * (remember old values for icmpnoconv())
486
	 */
487
	ottl = uh->Unused;
488
	uh->Unused = 0;
489
	len = nhgets(uh->udplen);
490
	olen = nhgets(uh->udpplen);
491
	hnputs(uh->udpplen, len);
492
 
493
	v4tov6(raddr, uh->udpsrc);
494
	v4tov6(laddr, uh->udpdst);
495
	lport = nhgets(uh->udpdport);
496
	rport = nhgets(uh->udpsport);
497
 
498
	if(nhgets(uh->udpcksum)) {
499
		if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
500
			upriv->ustats.rudpInErrors++;
501
			upriv->csumerr++;
502
			netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
503
			DPRINT("rudp: checksum error %I\n", raddr);
504
			freeblist(bp);
505
			return;
506
		}
507
	}
508
 
509
	qlock(rudp);
510
 
511
	c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
512
	if(c == nil){
513
		/* no conversation found */
514
		upriv->ustats.rudpNoPorts++;
515
		qunlock(rudp);
516
		netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
517
			laddr, lport);
518
		uh->Unused = ottl;
519
		hnputs(uh->udpplen, olen);
520
		icmpnoconv(f, bp);
521
		freeblist(bp);
522
		return;
523
	}
524
	ucb = (Rudpcb*)c->ptcl;
525
	qlock(ucb);
526
	qunlock(rudp);
527
 
528
	if(reliput(c, bp, raddr, rport) < 0){
529
		qunlock(ucb);
530
		freeb(bp);
531
		return;
532
	}
533
 
534
	/*
535
	 * Trim the packet down to data size
536
	 */
537
 
538
	len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
539
	bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
540
	if(bp == nil) {
541
		netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
542
			raddr, rport, laddr, lport);
543
		DPRINT("rudp: len err %I.%d -> %I.%d\n",
544
			raddr, rport, laddr, lport);
545
		upriv->lenerr++;
546
		return;
547
	}
548
 
549
	netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
550
		raddr, rport, laddr, lport, len);
551
 
552
	switch(ucb->headers){
553
	case 7:
554
		/* pass the src address */
555
		bp = padblock(bp, UDP_USEAD7);
556
		p = bp->rp;
557
		ipmove(p, raddr); p += IPaddrlen;
558
		ipmove(p, laddr); p += IPaddrlen;
559
		ipmove(p, ifc->lifc->local); p += IPaddrlen;
560
		hnputs(p, rport); p += 2;
561
		hnputs(p, lport);
562
		break;
563
	default:
564
		/* connection oriented rudp */
565
		if(ipcmp(c->raddr, IPnoaddr) == 0){
566
			/* save the src address in the conversation */
567
		 	ipmove(c->raddr, raddr);
568
			c->rport = rport;
569
 
570
			/* reply with the same ip address (if not broadcast) */
571
			if(ipforme(f, laddr) == Runi)
572
				ipmove(c->laddr, laddr);
573
			else
574
				v4tov6(c->laddr, ifc->lifc->local);
575
		}
576
		break;
577
	}
578
	if(bp->next)
579
		bp = concatblock(bp);
580
 
581
	if(qfull(c->rq)) {
582
		netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
583
			laddr, lport);
584
		freeblist(bp);
585
	}
586
	else
587
		qpass(c->rq, bp);
588
 
589
	qunlock(ucb);
590
}
591
 
592
static char *rudpunknown = "unknown rudp ctl request";
593
 
594
char*
595
rudpctl(Conv *c, char **f, int n)
596
{
597
	Rudpcb *ucb;
598
	uchar ip[IPaddrlen];
599
	int x;
600
 
601
	ucb = (Rudpcb*)c->ptcl;
602
	if(n < 1)
603
		return rudpunknown;
604
 
605
	if(strcmp(f[0], "headers") == 0){
606
		ucb->headers = 7;		/* new headers format */
607
		return nil;
608
	} else if(strcmp(f[0], "hangup") == 0){
609
		if(n < 3)
610
			return "bad syntax";
611
		if (parseip(ip, f[1]) == -1)
612
			return Ebadip;
613
		x = atoi(f[2]);
614
		qlock(ucb);
615
		relforget(c, ip, x, 1);
616
		qunlock(ucb);
617
		return nil;
618
	} else if(strcmp(f[0], "randdrop") == 0){
619
		x = 10;			/* default is 10% */
620
		if(n > 1)
621
			x = atoi(f[1]);
622
		if(x > 100 || x < 0)
623
			return "illegal rudp drop rate";
624
		ucb->randdrop = x;
625
		return nil;
626
	}
627
	return rudpunknown;
628
}
629
 
630
void
631
rudpadvise(Proto *rudp, Block *bp, char *msg)
632
{
633
	Udphdr *h;
634
	uchar source[IPaddrlen], dest[IPaddrlen];
635
	ushort psource, pdest;
636
	Conv *s, **p;
637
 
638
	h = (Udphdr*)(bp->rp);
639
 
640
	v4tov6(dest, h->udpdst);
641
	v4tov6(source, h->udpsrc);
642
	psource = nhgets(h->udpsport);
643
	pdest = nhgets(h->udpdport);
644
 
645
	/* Look for a connection */
646
	for(p = rudp->conv; *p; p++) {
647
		s = *p;
648
		if(s->rport == pdest)
649
		if(s->lport == psource)
650
		if(ipcmp(s->raddr, dest) == 0)
651
		if(ipcmp(s->laddr, source) == 0){
652
			qhangup(s->rq, msg);
653
			qhangup(s->wq, msg);
654
			break;
655
		}
656
	}
657
	freeblist(bp);
658
}
659
 
660
int
661
rudpstats(Proto *rudp, char *buf, int len)
662
{
663
	Rudppriv *upriv;
664
 
665
	upriv = rudp->priv;
666
	return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
667
		upriv->ustats.rudpInDatagrams,
668
		upriv->ustats.rudpNoPorts,
669
		upriv->ustats.rudpInErrors,
670
		upriv->ustats.rudpOutDatagrams,
671
		upriv->rxmits,
672
		upriv->orders);
673
}
674
 
675
void
676
rudpinit(Fs *fs)
677
{
678
 
679
	Proto *rudp;
680
 
681
	rudp = smalloc(sizeof(Proto));
682
	rudp->priv = smalloc(sizeof(Rudppriv));
683
	rudp->name = "rudp";
684
	rudp->connect = rudpconnect;
685
	rudp->announce = rudpannounce;
686
	rudp->ctl = rudpctl;
687
	rudp->state = rudpstate;
688
	rudp->create = rudpcreate;
689
	rudp->close = rudpclose;
690
	rudp->rcv = rudpiput;
691
	rudp->advise = rudpadvise;
692
	rudp->stats = rudpstats;
693
	rudp->ipproto = IP_UDPPROTO;
694
	rudp->nc = 32;
695
	rudp->ptclsize = sizeof(Rudpcb);
696
 
697
	Fsproto(fs, rudp);
698
}
699
 
700
/*********************************************/
701
/* Here starts the reliable helper functions */
702
/*********************************************/
703
/*
704
 *  Enqueue a copy of an unacked block for possible retransmissions
705
 */
706
void
707
relackq(Reliable *r, Block *bp)
708
{
709
	Block *np;
710
 
711
	np = copyblock(bp, blocklen(bp));
712
	if(r->unacked)
713
		r->unackedtail->list = np;
714
	else {
715
		/* restart timer */
716
		r->timeout = 0;
717
		r->xmits = 1;
718
		r->unacked = np;
719
	}
720
	r->unackedtail = np;
721
	np->list = nil;
722
}
723
 
724
/*
725
 *  retransmit unacked blocks
726
 */
727
void
728
relackproc(void *a)
729
{
730
	Rudpcb *ucb;
731
	Proto *rudp;
732
	Reliable *r;
733
	Conv **s, *c;
734
 
735
	rudp = (Proto *)a;
736
 
737
loop:
738
	tsleep(&up->sleep, return0, 0, Rudptickms);
739
 
740
	for(s = rudp->conv; *s; s++) {
741
		c = *s;
742
		ucb = (Rudpcb*)c->ptcl;
743
		qlock(ucb);
744
 
745
		for(r = ucb->r; r; r = r->next) {
746
			if(r->unacked != nil){
747
				r->timeout += Rudptickms;
748
				if(r->timeout > Rudprxms*r->xmits)
749
					relrexmit(c, r);
750
			}
751
			if(r->acksent != r->rcvseq)
752
				relsendack(c, r, 0);
753
		}
754
		qunlock(ucb);
755
	}
756
	goto loop;
757
}
758
 
759
/*
760
 *  get the state record for a conversation
761
 */
762
Reliable*
763
relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
764
{
765
	Reliable *r, **l;
766
 
767
	l = &ucb->r;
768
	for(r = *l; r; r = *l){
769
		if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
770
		    port == r->port)
771
			break;
772
		l = &r->next;
773
	}
774
 
775
	/* no state for this addr/port, create some */
776
	if(r == nil){
777
		while(generation == 0)
778
			generation = rand();
779
 
780
		DPRINT("from %s new state %lud for %I!%ud\n",
781
		        from, generation, addr, port);
782
 
783
		r = smalloc(sizeof(Reliable));
784
		memmove(r->addr, addr, IPaddrlen);
785
		r->port = port;
786
		r->unacked = 0;
787
		if(generation == Hangupgen)
788
			generation++;
789
		r->sndgen = generation++;
790
		r->sndseq = 0;
791
		r->ackrcvd = 0;
792
		r->rcvgen = 0;
793
		r->rcvseq = 0;
794
		r->acksent = 0;
795
		r->xmits = 0;
796
		r->timeout = 0;
797
		r->ref = 0;
798
		incref(r);	/* one reference for being in the list */
799
 
800
		*l = r;
801
	}
802
 
803
	incref(r);
804
	return r;
805
}
806
 
807
void
808
relput(Reliable *r)
809
{
810
	if(decref(r) == 0)
811
		free(r);
812
}
813
 
814
/*
815
 *  forget a Reliable state
816
 */
817
void
818
relforget(Conv *c, uchar *ip, int port, int originator)
819
{
820
	Rudpcb *ucb;
821
	Reliable *r, **l;
822
 
823
	ucb = (Rudpcb*)c->ptcl;
824
 
825
	l = &ucb->r;
826
	for(r = *l; r; r = *l){
827
		if(ipcmp(ip, r->addr) == 0 && port == r->port){
828
			*l = r->next;
829
			if(originator)
830
				relsendack(c, r, 1);
831
			relhangup(c, r);
832
			relput(r);	/* remove from the list */
833
			break;
834
		}
835
		l = &r->next;
836
	}
837
}
838
 
839
/*
840
 *  process a rcvd reliable packet. return -1 if not to be passed to user process,
841
 *  0 otherwise.
842
 *
843
 *  called with ucb locked.
844
 */
845
int
846
reliput(Conv *c, Block *bp, uchar *addr, ushort port)
847
{
848
	Block *nbp;
849
	Rudpcb *ucb;
850
	Rudppriv *upriv;
851
	Udphdr *uh;
852
	Reliable *r;
853
	Rudphdr *rh;
854
	ulong seq, ack, sgen, agen, ackreal;
855
	int rv = -1;
856
 
857
	/* get fields */
858
	uh = (Udphdr*)(bp->rp);
859
	rh = (Rudphdr*)uh;
860
	seq = nhgetl(rh->relseq);
861
	sgen = nhgetl(rh->relsgen);
862
	ack = nhgetl(rh->relack);
863
	agen = nhgetl(rh->relagen);
864
 
865
	upriv = c->p->priv;
866
	ucb = (Rudpcb*)c->ptcl;
867
	r = relstate(ucb, addr, port, "input");
868
 
869
	DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
870
		seq, sgen, ack, agen, r->sndgen);
871
 
872
	/* if acking an incorrect generation, ignore */
873
	if(ack && agen != r->sndgen)
874
		goto out;
875
 
876
	/* Look for a hangup */
877
	if(sgen == Hangupgen) {
878
		if(agen == r->sndgen)
879
			relforget(c, addr, port, 0);
880
		goto out;
881
	}
882
 
883
	/* make sure we're not talking to a new remote side */
884
	if(r->rcvgen != sgen){
885
		if(seq != 0 && seq != 1)
886
			goto out;
887
 
888
		/* new connection */
889
		if(r->rcvgen != 0){
890
			DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
891
			relhangup(c, r);
892
		}
893
		r->rcvgen = sgen;
894
	}
895
 
896
	/* dequeue acked packets */
897
	if(ack && agen == r->sndgen){
898
		ackreal = 0;
899
		while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
900
			nbp = r->unacked;
901
			r->unacked = nbp->list;
902
			DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
903
			       ack, agen, r->sndgen);
904
			freeb(nbp);
905
			r->ackrcvd = NEXTSEQ(r->ackrcvd);
906
			ackreal = 1;
907
		}
908
 
909
		/* flow control */
910
		if(UNACKED(r) < Maxunacked/8 && r->blocked)
911
			wakeup(&r->vous);
912
 
913
		/*
914
		 *  retransmit next packet if the acked packet
915
		 *  was transmitted more than once
916
		 */
917
		if(ackreal && r->unacked != nil){
918
			r->timeout = 0;
919
			if(r->xmits > 1){
920
				r->xmits = 1;
921
				relrexmit(c, r);
922
			}
923
		}
924
 
925
	}
926
 
927
	/* no message or input queue full */
928
	if(seq == 0 || qfull(c->rq))
929
		goto out;
930
 
931
	/* refuse out of order delivery */
932
	if(seq != NEXTSEQ(r->rcvseq)){
933
		relsendack(c, r, 0);	/* tell him we got it already */
934
		upriv->orders++;
935
		DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
936
		goto out;
937
	}
938
	r->rcvseq = seq;
939
 
940
	rv = 0;
941
out:
942
	relput(r);
943
	return rv;
944
}
945
 
946
void
947
relsendack(Conv *c, Reliable *r, int hangup)
948
{
949
	Udphdr *uh;
950
	Block *bp;
951
	Rudphdr *rh;
952
	int ptcllen;
953
	Fs *f;
954
 
955
	bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
956
	if(bp == nil)
957
		return;
958
	bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
959
	f = c->p->f;
960
	uh = (Udphdr *)(bp->rp);
961
	uh->vihl = IP_VER4;
962
	rh = (Rudphdr*)uh;
963
 
964
	ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
965
	uh->Unused = 0;
966
	uh->udpproto = IP_UDPPROTO;
967
	uh->frag[0] = 0;
968
	uh->frag[1] = 0;
969
	hnputs(uh->udpplen, ptcllen);
970
 
971
	v6tov4(uh->udpdst, r->addr);
972
	hnputs(uh->udpdport, r->port);
973
	hnputs(uh->udpsport, c->lport);
974
	if(ipcmp(c->laddr, IPnoaddr) == 0)
975
		findlocalip(f, c->laddr, c->raddr);
976
	v6tov4(uh->udpsrc, c->laddr);
977
	hnputs(uh->udplen, ptcllen);
978
 
979
	if(hangup)
980
		hnputl(rh->relsgen, Hangupgen);
981
	else
982
		hnputl(rh->relsgen, r->sndgen);
983
	hnputl(rh->relseq, 0);
984
	hnputl(rh->relagen, r->rcvgen);
985
	hnputl(rh->relack, r->rcvseq);
986
 
987
	if(r->acksent < r->rcvseq)
988
		r->acksent = r->rcvseq;
989
 
990
	uh->udpcksum[0] = 0;
991
	uh->udpcksum[1] = 0;
992
	hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
993
 
994
	DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
995
	doipoput(c, f, bp, 0, c->ttl, c->tos);
996
}
997
 
998
 
999
/*
1000
 *  called with ucb locked (and c locked if user initiated close)
1001
 */
1002
void
1003
relhangup(Conv *c, Reliable *r)
1004
{
1005
	int n;
1006
	Block *bp;
1007
	char hup[ERRMAX];
1008
 
1009
	n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
1010
	qproduce(c->eq, hup, n);
1011
 
1012
	/*
1013
	 *  dump any unacked outgoing messages
1014
	 */
1015
	for(bp = r->unacked; bp != nil; bp = r->unacked){
1016
		r->unacked = bp->list;
1017
		bp->list = nil;
1018
		freeb(bp);
1019
	}
1020
 
1021
	r->rcvgen = 0;
1022
	r->rcvseq = 0;
1023
	r->acksent = 0;
1024
	if(generation == Hangupgen)
1025
		generation++;
1026
	r->sndgen = generation++;
1027
	r->sndseq = 0;
1028
	r->ackrcvd = 0;
1029
	r->xmits = 0;
1030
	r->timeout = 0;
1031
	wakeup(&r->vous);
1032
}
1033
 
1034
/*
1035
 *  called with ucb locked
1036
 */
1037
void
1038
relrexmit(Conv *c, Reliable *r)
1039
{
1040
	Rudppriv *upriv;
1041
	Block *np;
1042
	Fs *f;
1043
 
1044
	upriv = c->p->priv;
1045
	f = c->p->f;
1046
	r->timeout = 0;
1047
	if(r->xmits++ > Rudpmaxxmit){
1048
		relhangup(c, r);
1049
		return;
1050
	}
1051
 
1052
	upriv->rxmits++;
1053
	np = copyblock(r->unacked, blocklen(r->unacked));
1054
	DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
1055
	doipoput(c, f, np, 0, c->ttl, c->tos);
1056
}