Subversion Repositories planix.SVN

Rev

Blame | Last modification | View Log | RSS feed

#include <sys/param.h>
#include <sys/sockio.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/kernel.h>
#include <sys/sysctl.h>
#include <sys/malloc.h>
#include <sys/mount.h>
#include <sys/mbuf.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/protosw.h>
#include <sys/syslog.h>

#include <netinet/in.h>
#include <netinet/tcp.h>

#include <vm/vm.h>
#include <vm/vm_extern.h>
#include <vm/vm_zone.h>

#include <net/if.h>
#include <net/route.h>
#include <netinet/in.h>

#include <9fs/bitstring.h>
#include <9fs/9p.h>
#include <9fs/9auth.h>
#include <9fs/9fs.h>

/*
 * Forward declarations for the static transport helpers defined below.
 * The sndlock/rcvlock pairs serialize access to the (stream) socket so
 * that one request/reply travels the wire at a time.
 */
static int u9fs_reply __P((struct u9fsreq * req));
static int u9fs_send __P((struct socket * so, struct mbuf * mreq, struct u9fsreq * req));
static int u9fs_receive __P((struct socket * so, struct mbuf **mrep, struct u9fsreq * req));

static int u9fs_sndlock __P((int *flagp, int *statep, struct u9fsreq *rep));
static void u9fs_sndunlock __P((int *flagp, int *statep));
static int u9fs_rcvlock __P((struct u9fsreq *req));
static void u9fs_rcvunlock __P((int *flagp, int *statep));

/*
 * Create a socket and connect it to saddr, then wait (with a bounded
 * sleep) for the connection to complete.  On success *sop holds the
 * connected socket; on failure *sop may be NULL or hold a socket the
 * caller must dispose of.
 */
int
u9fs_connect(struct socket ** sop, struct sockaddr * saddr, int sotype, int soproto, struct proc * p)
{
  struct socket * so;
  int error;
  int spl;

  *sop = 0;
  error = socreate(saddr->sa_family, sop, sotype, soproto, p);
  if (error)
    return (error);
  so = *sop;
  error = soconnect(so, saddr, p);
  if (error)
    return (error);

  /*
   * Wait for the connection to complete. Cribbed from the
   * connect system call but with the wait timing out so
   * that interruptible mounts don't hang here for a long time.
   */
  spl = splnet();
  for (;;) {
    if ((so->so_state & SS_ISCONNECTING) == 0 || so->so_error != 0)
      break;
    (void) tsleep((caddr_t)&so->so_timeo, PSOCK, "u9fscon", 2 * hz);
  }

  /* Pick up any asynchronous connect error and clear it. */
  error = so->so_error;
  if (error)
    so->so_error = 0;
  splx(spl);

  return (error);
}

int u9fs_connect_9auth(struct u9fsmount * nmp, struct u9fs_args * argp, struct socket ** sop)
{
  int error;
  struct proc * p = & proc0;
  struct sockaddr *nam;

  error = getsockaddr(&nam, (caddr_t)argp->authaddr, argp->authaddrlen);
  if( error )
    return error;
  error = u9fs_connect(sop, nam, argp->authsotype, 
                       argp->authsoproto, p);
  if( error == 0 )
    return 0;

  u9fs_disconnect(*sop);
  *sop = 0;
  return error;
}

/*
 * Initialize sockets and congestion for a new U9FS connection.
 * We do not free the sockaddr if error.
 */
int
u9fs_connect_9fs(nmp)
     register struct u9fsmount *nmp;
{
  register struct socket *so;
  int error, rcvreserve, sndreserve;
  struct proc *p = &proc0; /* only used for socreate and sobind */

  error = u9fs_connect(&nmp->nm_so, nmp->nm_nam, nmp->nm_sotype, 
                       nmp->nm_soproto, p);
  if (error)
    goto bad;
  so = nmp->nm_so;
  nmp->nm_soflags = so->so_proto->pr_flags;

  /*
   * Soft/interruptible mounts get bounded socket-buffer waits so a
   * dead server cannot hang a process forever; hard mounts wait
   * indefinitely.
   */
  if (nmp->nm_flag & (U9FSMNT_SOFT | U9FSMNT_INT)) {
    so->so_rcv.sb_timeo = (5 * hz);
    so->so_snd.sb_timeo = (5 * hz);
  } else {
    so->so_rcv.sb_timeo = 0;
    so->so_snd.sb_timeo = 0;
  }

  /* XXX: i dont understand this, only one outstanding request? */
  if (nmp->nm_sotype == SOCK_SEQPACKET) {
    /* Room for two max-size messages in each direction. */
    sndreserve = (nmp->nm_wsize) * 2;
    rcvreserve = (max(nmp->nm_rsize, nmp->nm_readdirsize)) * 2;
  } else {
    if (nmp->nm_sotype != SOCK_STREAM)
      panic("u9fscon sotype");
    if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
      struct sockopt sopt;
      int val;
      
      /* SO_KEEPALIVE: detect a silently dead server connection. */
      bzero(&sopt, sizeof sopt);
      sopt.sopt_level = SOL_SOCKET;
      sopt.sopt_name = SO_KEEPALIVE;
      sopt.sopt_val = &val;
      sopt.sopt_valsize = sizeof val;
      val = 1;
      sosetopt(so, &sopt);
    }
    if (so->so_proto->pr_protocol == IPPROTO_TCP) {
      struct sockopt sopt;
      int val;
      
      /* TCP_NODELAY: don't batch small RPCs behind Nagle. */
      bzero(&sopt, sizeof sopt);
      sopt.sopt_level = IPPROTO_TCP;
      sopt.sopt_name = TCP_NODELAY;
      sopt.sopt_val = &val;
      sopt.sopt_valsize = sizeof val;
      val = 1;
      sosetopt(so, &sopt);
    }
    sndreserve = (nmp->nm_wsize) * 2;
    rcvreserve = (nmp->nm_rsize) * 2;
  }
  error = soreserve(so, sndreserve, rcvreserve);
  if (error)
    goto bad;
  /* Socket-buffer waits must not be interrupted mid-record. */
  so->so_rcv.sb_flags |= SB_NOINTR;
  so->so_snd.sb_flags |= SB_NOINTR;

  /* Initialize other non-zero congestion variables */
  nmp->nm_sent = 0;
  return (0);

bad:
  u9fs_disconnect(nmp->nm_so);
  nmp->nm_so = 0;
  return (error);
}

/*
 * U9FS disconnect. Clean up and unlink.
 */
void
u9fs_disconnect(struct socket * so)
{
    soshutdown(so, 2);
    soclose(so);
}

/*
 * Lock a socket against others.
 * Necessary for STREAM sockets to ensure you get an entire rpc request/reply
 * and also to avoid race conditions between the processes with u9fs requests
 * in progress when a reconnect is necessary.
 */
/*
 * Acquire the per-mount send lock, sleeping (optionally interruptibly
 * for U9FSMNT_INT mounts) while another request holds it.
 * rep may be NULL when called on the server side; in that case there
 * is no process to deliver signals to and the sleep is uninterruptible.
 * Returns 0 with the lock held, or EINTR if a signal terminated the wait.
 */
static int
u9fs_sndlock(flagp, statep, rep)
        register int *flagp;
        register int *statep;
        struct u9fsreq *rep;
{
        struct proc *p;
        int slpflag = 0, slptimeo = 0;

        if (rep) {
                p = rep->r_procp;
                if (rep->r_nmp->nm_flag & U9FSMNT_INT)
                        slpflag = PCATCH;
        } else
                p = (struct proc *)0;
        while (*statep & U9FSSTA_SNDLOCK) {
                /*
                 * rep may be NULL (see above); the original code
                 * dereferenced rep->r_nmp unconditionally here and
                 * would fault on a contended server-side lock.
                 */
                if (rep && u9fs_sigintr(rep->r_nmp, p))
                        return (EINTR);
                *statep |= U9FSSTA_WANTSND;
                (void) tsleep((caddr_t)flagp, slpflag | (PZERO - 1),
                        "u9fsndlck", slptimeo);
                if (slpflag == PCATCH) {
                        /* After one interruptible try, poll with a timeout. */
                        slpflag = 0;
                        slptimeo = 2 * hz;
                }
        }
        *statep |= U9FSSTA_SNDLOCK;
        return (0);
}


/*
 * Unlock the stream socket for others.
 */
/*
 * Release the send lock and wake any waiter that registered interest
 * via U9FSSTA_WANTSND.  Panics if called without the lock held.
 */
static void
u9fs_sndunlock(flagp, statep)
        register int *flagp;
        register int *statep;
{
        int state = *statep;

        if (!(state & U9FSSTA_SNDLOCK))
                panic("u9fs sndunlock");
        state &= ~U9FSSTA_SNDLOCK;
        if (state & U9FSSTA_WANTSND) {
                state &= ~U9FSSTA_WANTSND;
                *statep = state;
                wakeup((caddr_t)flagp);
        } else {
                *statep = state;
        }
}

/*
 * Test for a termination condition pending on the process.
 * This is used for U9FSMNT_INT mounts.
 */
/*
 * Test for a termination condition pending on the process.
 * This is used for U9FSMNT_INT mounts: returns EINTR when a signal in
 * U9FSINT_SIGMASK is deliverable (pending, not masked, not ignored),
 * 0 otherwise.
 */
int
u9fs_sigintr(nmp, p)
        struct u9fsmount *nmp;
        struct proc * p;
{
        /* Only interruptible mounts honor signals at all. */
        if ((nmp->nm_flag & U9FSMNT_INT) == 0)
                return (0);
        if (p == (struct proc *)0 || p->p_siglist == 0)
                return (0);
        if ((p->p_siglist & ~p->p_sigmask & ~p->p_sigignore) & U9FSINT_SIGMASK)
                return (EINTR);
        return (0);
}

/*
 * This is the u9fs send routine. For connection based socket types, it
 * must be called with an u9fs_sndlock() on the socket.
 * "rep == NULL" indicates that it has been called from a server.
 * For the client side:
 * - return EINTR if the RPC is terminated, 0 otherwise
 * - set R_MUSTRESEND if the send fails for any reason
 * - do any cleanup required by recoverable socket errors (?)
 * For the server side:
 * - return EINTR or ERESTART if interrupted by a signal
 * - return EPIPE if a connection is lost for connection based sockets (TCP...)
 * - do any cleanup required by recoverable socket errors (?)
 */
static int
u9fs_send(so, top, req)
        register struct socket *so;
        register struct mbuf *top;
        struct u9fsreq *req;
{
  int error, soflags, flags;
  
  soflags = so->so_proto->pr_flags;
  if (so->so_type == SOCK_SEQPACKET)
    flags = MSG_EOR;
  else
    flags = 0;
  
  error = so->so_proto->pr_usrreqs->pru_sosend(so, 0, 0, top, 0,
                                               flags, req->r_procp);
  if (error)
    log(LOG_INFO, "u9fs send error %d for server %s\n",error,
          req->r_nmp->nm_mountp->mnt_stat.f_mntfromname);

  return (error);
}

/*
 * Receive one 9P message from the socket into *mrep.
 * Only SOCK_SEQPACKET transports are actually handled here: one
 * soreceive() returns one complete record.
 * NOTE(review): for SOCK_STREAM (or any other sotype) this function
 * falls straight through and returns 0 with *mrep == NULL, despite the
 * comment below describing record-mark handling -- stream support
 * appears unimplemented; confirm callers never reach here with a
 * stream socket.
 */
static int
u9fs_receive(so, mrep, req)     
     register struct socket * so;
     struct mbuf **mrep;
     struct u9fsreq * req;
{
  struct uio auio;
  u_int32_t len;
  int error = 0, sotype, rcvflg;
  
  /*
   * Set up arguments for soreceive()
   */
  *mrep = (struct mbuf *)0;
  sotype = req->r_nmp->nm_sotype;
  
  /*
   * For reliable protocols, lock against other senders/receivers
   * in case a reconnect is necessary.
   * For SOCK_STREAM, first get the Record Mark to find out how much
   * more there is to get.
   * We must lock the socket against other receivers
   * until we have an entire rpc request/reply.
   */
  if (sotype == SOCK_SEQPACKET ) {
    if( (so->so_state & SS_ISCONNECTED) == 0 )
      return (EACCES);
                /* Oversized resid: accept whatever one record contains. */
                auio.uio_resid = len = 1000000;
                auio.uio_procp = req->r_procp;
                do {
                        rcvflg = 0;
                        error =  so->so_proto->pr_usrreqs->pru_soreceive
                                (so, 0, &auio, mrep,
                                (struct mbuf **)0, &rcvflg);
                } while (error == EWOULDBLOCK);
                /* len now holds the record size (currently unused). */
                len -= auio.uio_resid;    
  }
  if (error) {
    m_freem(*mrep);
    *mrep = (struct mbuf *)0;
  }
  return (error);  
}

/*
 * Acquire the per-mount receive lock, sleeping while another process
 * holds it.  Returns 0 with the lock held, EINTR if a signal terminated
 * the wait, or EALREADY (without taking the lock) if our reply arrived
 * while we slept.
 */
static int
u9fs_rcvlock(req)
        register struct u9fsreq *req;
{
        register int *flagp = &req->r_nmp->nm_flag;
        register int *statep = &req->r_nmp->nm_state;
        int slpflag, slptimeo = 0;

        /* Interruptible mounts let a signal break the first sleep. */
        if (*flagp & U9FSMNT_INT)
                slpflag = PCATCH;
        else
                slpflag = 0;
        while (*statep & U9FSSTA_RCVLOCK) {
                if (u9fs_sigintr(req->r_nmp, req->r_procp))
                        return (EINTR);
                *statep |= U9FSSTA_WANTRCV;
                (void) tsleep((caddr_t)flagp, slpflag | (PZERO - 1), "u9fsrcvlk",
                        slptimeo);
                /*
                 * If our reply was recieved while we were sleeping,
                 * then just return without taking the lock to avoid a
                 * situation where a single iod could 'capture' the
                 * recieve lock.
                 */
                if (req->r_mrep != NULL)
                        return (EALREADY);
                if (slpflag == PCATCH) {
                        /* After one interruptible try, poll with a timeout. */
                        slpflag = 0;
                        slptimeo = 2 * hz;
                }
        }
        *statep |= U9FSSTA_RCVLOCK;
        return (0);
}

/*
 * Unlock the stream socket for others.
 */
/*
 * Release the receive lock and wake any waiter that registered interest
 * via U9FSSTA_WANTRCV.  Panics if called without the lock held.
 */
static void
u9fs_rcvunlock(flagp, statep)
        register int *flagp;
        register int *statep;
{
        int state = *statep;

        if (!(state & U9FSSTA_RCVLOCK))
                panic("u9fs rcvunlock");
        state &= ~U9FSSTA_RCVLOCK;
        if (state & U9FSSTA_WANTRCV) {
                state &= ~U9FSSTA_WANTRCV;
                *statep = state;
                wakeup((caddr_t)flagp);
        } else {
                *statep = state;
        }
}

/*
 * Implement receipt of reply on a socket.
 * We must search through the list of received datagrams matching them
 * with outstanding requests using the xid, until ours is found.
 */
/* ARGSUSED */
/* ARGSUSED */
static 
int u9fs_reply(struct u9fsreq * req)
{
  int error;
  struct mbuf * mrep;
  register struct u9fsmount *nmp = req->r_nmp;
  u_short tag;
  struct u9fsreq * qp;

  /*
   * Loop around until we get our own reply
   */
  for (;;) {
    /*
     * Lock against other receivers so that I don't get stuck in
     * sbwait() after someone else has received my reply for me.
     * Also necessary for connection based protocols to avoid
     * race conditions during a reconnect.
     * If u9fs_rcvlock() returns EALREADY, that means that
     * the reply has already been recieved by another
     * process and we can return immediately.  In this
     * case, the lock is not taken to avoid races with
     * other processes.
     */
    error = u9fs_rcvlock(req);
    if (error == EALREADY)
      return (0);
    if (error)
      return (error);
    /*
     * Get the next Rpc reply off the socket
     */
    error = u9fs_receive(nmp->nm_so, &mrep, req);
    u9fs_rcvunlock(&nmp->nm_flag, &nmp->nm_state);
    if (error)
      return (error);

    /* extract the tag */
    tag = u9p_m_tag(&mrep);

    /*
     * Loop through the request list to match up the reply
     * Iff no match, just drop the datagram
     */
    for (qp = nmp->nm_reqq.tqh_first; qp != 0; qp = qp->r_chain.tqe_next) {
      if ( qp->r_mrep == 0 && qp->r_tag == tag )
        break;
    }
    if( qp == 0 ) {
      /* No outstanding request wants this tag; drop it and keep reading. */
      m_freem(mrep);
      continue;
    }

    /* Unmarshal into the waiter's reply struct; on failure keep reading. */
    if( u9p_m_m2s(&mrep, qp->r_rep) ) { /* freed by m2s */
      continue;
    }

    qp->r_mrep = mrep;  /* should not be freed until the reply is read */

    /* If the reply we matched was ours, we're done; otherwise loop. */
    if( qp == req )
      return 0;
  }
}

int u9fs_request(struct u9fsreq * req, struct u9fsreq * rep, int relm)
{
  struct mbuf * mreq;
  int error,s;
  struct u9fsmount * nmp;  

  req->r_rep = rep;
  req->r_mrep = 0;
  nmp = req->r_nmp;
  req->r_tag = u9fs_id_new(nmp->nm_tags);

  mreq = u9p_m_s2m(req);

  /*
   * Chain request into list of outstanding requests. Be sure
   * to put it LAST so timer finds oldest requests first.
   */
  s = splsoftclock();
  TAILQ_INSERT_TAIL(&nmp->nm_reqq, req, r_chain);
  splx(s);

  error = u9fs_send(nmp->nm_so, mreq, req);

  if( !error )
    error = u9fs_reply(req);

  /*
   * RPC done, unlink the request.
   */
  s = splsoftclock();
  TAILQ_REMOVE(&nmp->nm_reqq, req, r_chain);
  splx(s);

  u9fs_id_free(nmp->nm_tags, req->r_tag);

  if( !error && relm ) {
        m_freem(req->r_mrep);
        req->r_mrep = 0;
  }      
  if( rep->r_type == Rerror )
      error = EACCES;

  return error;
}