plan9port/src/cmd/9pserve.c
Michael Teichgräber 4fe82be00a Related to issue
http://bitbucket.org/rsc/plan9port/issue/38/

This patch tries to make 9pserve work again with
9P2000 clients, in case the server handled by 9pserve
expects 9P2000.u.

(Since changeset
http://bitbucket.org/rsc/plan9port/changeset/d9843471f0bc/
servers using srv() stop communicating after they
received a 9P2000 msg that has a different .u
definition.)

Tattach, Tauth and Tcreate 9P2000 messages will
now be translated, if necessary, to 9P2000.u by
growing them to the new size and providing the
missing (default) values.

The code of the first two message types has been
tested with drawterm on linux (dialing factotum),
and mounts from within 9vx to plan9port's factotum
and dossrv.

The code for Tcreate couldn't be tested due to lack
of file servers both speaking 9P2000.u and allowing file
creation.

R=rsc_swtch, rsc
CC=codebot
http://codereview.appspot.com/151052
2009-11-30 12:39:50 -08:00

1588 lines
28 KiB
C

#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <errno.h>
/* Rename this file's err() so it does not clash with a system symbol. */
#define err err9pserve /* Darwin x86 */
/* Sizes and limits used throughout this file. */
enum
{
STACK = 32768,	/* stack size passed to proccreate/threadcreate */
NHASH = 31,	/* number of buckets in the per-connection tag and fid tables */
MAXMSG = 64, /* per connection; connthread stalls input at this many outstanding messages */
MAXMSGSIZE = 4*1024*1024	/* read9ppkt refuses packets that claim to be larger */
};
typedef struct Hash Hash;
typedef struct Fid Fid;
typedef struct Msg Msg;
typedef struct Conn Conn;
typedef struct Queue Queue;
/*
 * One entry in a chained hash table (see gethash/puthash/delhash).
 * The bucket is chosen as n%NHASH.
 */
struct Hash
{
Hash *next;	/* next entry in the same bucket */
uint n;	/* key: a client tag or client fid number */
void *v;	/* value: the Msg* or Fid* stored under the key */
};
/*
 * A fid as seen by 9pserve.  Entries are allocated by fidnew and
 * recycled through the freefid list by fidput.
 */
struct Fid
{
int fid;	/* number written into messages sent to the server; index into fidtab */
int ref;	/* reference count; fidput recycles the Fid when it reaches 0 */
int cfid;	/* number the client used; key in Conn.fid (set to -1 when freed) */
int openfd;	/* count of openfdthreads attached to this fid */
int offset;	/* directory read offset advanced by the unstripped Rread count */
int coffset;	/* directory read offset advanced by the stripped Rread count */
int isdir;	/* -1 from fidnew; set from qid.type&QTDIR in connoutthread */
Fid *next;	/* free-list link */
};
/*
 * One request/reply pair in flight.  Entries are allocated by msgnew
 * and recycled through the freemsg list by msgput.
 */
struct Msg
{
Conn *c;	/* connection the message belongs to; cleared by msgclear */
int internal;	/* generated by 9pserve itself; reply is delivered on c->internal */
int sync;	/* marker message: outputthread answers on c->outqdead instead of writing */
int ref;	/* reference count */
int ctag;	/* tag chosen by the client */
int tag;	/* tag used toward the server; index into msgtab */
int isopenfd;	/* request arrived as Topenfd and was rewritten to Topen */
Fcall tx;	/* parsed request */
Fcall rx;	/* parsed (or locally built) reply */
Fid *fid;	/* fid referenced by the request, if any */
Fid *newfid;	/* newfid of a Twalk */
Fid *afid;	/* afid of a Tauth or Tattach */
Msg *oldm;	/* message targeted by a Tflush */
Msg *next;	/* free-list link */
uchar *tpkt;	/* wire form of the request; freed in msgput */
uchar *rpkt;	/* wire form of the reply; freed in msgput */
};
/*
 * State for one client connection.  xopenfd also allocates a reduced
 * Conn (fd, fdmode, fdfid, internal only) for each openfdthread.
 */
struct Conn
{
int fd;	/* listen fd until accepted, then the connection (or pipe) fd */
int fdmode;	/* openfd connections: open mode that selects read or write loop */
Fid *fdfid;	/* openfd connections: fid being read or written */
int nmsg;	/* messages currently charged to this connection */
int nfid;	/* asserted to be 0 at teardown; not modified in this file */
Channel *inc;	/* connoutthread pokes this to resume stalled input */
Channel *internal;	/* replies to internal messages are delivered here */
int inputstalled;	/* set when connthread waits for nmsg to drop below MAXMSG */
char dir[40];	/* directory filled in by listen/accept */
Hash *tag[NHASH];	/* client tag -> Msg */
Hash *fid[NHASH];	/* client fid -> Fid */
Queue *outq;	/* replies waiting to be written to the client */
Queue *inq;	/* allocated in listenthread and freed in connthread; otherwise unused here */
Channel *outqdead;	/* signalled by outputthread (sync) and by connoutthread on exit */
int dotu;	/* client negotiated 9P2000.u */
};
/* Global configuration and state. */
char *xaname;	/* -A: attach name substituted into Tattach */
char *addr;	/* address string printed by ignorepipe */
int afd;	/* fd returned by announce */
char adir[40];	/* directory filled in by announce */
int isunix;	/* incremented by -u; not read elsewhere in this file */
Queue *outq;	/* requests waiting to be written to the server */
Queue *inq;	/* allocated in mainproc; not otherwise used in this file */
int verbose = 0;	/* -v or $verbose9pserve: diagnostic level */
int logging = 0;	/* -l: redirect standard error to a log file */
int msize = 8192;	/* message size offered to the server and clients */
u32int xafid = NOFID;	/* -A: afid substituted into Tattach */
int attached;	/* set by -A */
int versioned;	/* set by -M: skip the Tversion exchange with the server */
int dotu;	/* server speaks 9P2000.u */
int noauth;	/* -n: reject Tauth */
/* Forward declarations. */
void *gethash(Hash**, uint);
int puthash(Hash**, uint, void*);
int delhash(Hash**, uint, void*);
Msg *mread9p(Ioproc*, int, int);
int mwrite9p(Ioproc*, int, uchar*);
uchar *read9ppkt(Ioproc*, int);
int write9ppkt(int, uchar*);
Msg *msgnew(int);
void msgput(Msg*);
void msgclear(Msg*);
Msg *msgget(int);
void msgincref(Msg*);
Fid *fidnew(int);
void fidput(Fid*);
void *emalloc(int);
void *erealloc(void*, int);
Queue *qalloc(void);
int sendq(Queue*, void*);
void *recvq(Queue*);
void connthread(void*);
void connoutthread(void*);
void listenthread(void*);
void outputthread(void*);
void inputthread(void*);
void rewritehdr(Fcall*, uchar*);
void repack(Fcall*, uchar**, int);
int tlisten(char*, char*);
int taccept(int, char*);
int iolisten(Ioproc*, char*, char*);
int ioaccept(Ioproc*, int, char*);
int iorecvfd(Ioproc*, int);
int iosendfd(Ioproc*, int, int);
void mainproc(void*);
int ignorepipe(void*, char*);
int timefmt(Fmt*);
void dorootstat(void);
int stripudirread(Msg*);
int cvtustat(Fcall*, uchar**, int);
void cvtuauthattach(Fcall*, uchar**);
void cvtucreate(Fcall*, uchar**);
/*
 * Print the command synopsis on standard error, one line per
 * fprint call, then terminate every thread with status "usage".
 */
void
usage(void)
{
	static char *lines[] = {
		"usage: 9pserve [-lnv] [-A aname afid] [-c addr] [-M msize] address\n",
		"\treads/writes 9P messages on stdin/stdout\n",
	};
	int i;

	for(i = 0; i < (int)(sizeof lines/sizeof lines[0]); i++)
		fprint(2, lines[i]);
	threadexitsall("usage");
}
/* Scratch buffer mainproc uses for the Tversion/Rversion exchange. */
uchar vbuf[128];
/* Declared for debugging; not referenced elsewhere in this file. */
extern int _threaddebuglevel;
void
threadmain(int argc, char **argv)
{
char *file, *x, *addr;
int fd;
x = getenv("verbose9pserve");
if(x){
verbose = atoi(x);
fprint(2, "verbose9pserve %s => %d\n", x, verbose);
}
ARGBEGIN{
default:
usage();
case 'A':
attached = 1;
xaname = EARGF(usage());
xafid = atoi(EARGF(usage()));
break;
case 'M':
versioned = 1;
msize = atoi(EARGF(usage()));
break;
case 'c':
addr = netmkaddr(EARGF(usage()), "net", "9fs");
if((fd = dial(addr, nil, nil, nil)) < 0)
sysfatal("dial %s: %r", addr);
dup(fd, 0);
dup(fd, 1);
if(fd > 1)
close(fd);
break;
case 'n':
noauth = 1;
break;
case 'v':
verbose++;
break;
case 'u':
isunix++;
break;
case 'l':
logging++;
break;
}ARGEND
if(attached && !versioned){
fprint(2, "-A must be used with -M\n");
usage();
}
if(argc != 1)
usage();
addr = argv[0];
fmtinstall('T', timefmt);
if((afd = announce(addr, adir)) < 0)
sysfatal("announce %s: %r", addr);
if(logging){
if(strncmp(addr, "unix!", 5) == 0)
addr += 5;
file = smprint("%s.log", addr);
if(file == nil)
sysfatal("smprint log: %r");
if((fd = create(file, OWRITE, 0666)) < 0)
sysfatal("create %s: %r", file);
dup(fd, 2);
if(fd > 2)
close(fd);
}
if(verbose) fprint(2, "%T 9pserve running\n");
proccreate(mainproc, nil, STACK);
}
void
mainproc(void *v)
{
int n, nn;
Fcall f;
USED(v);
atnotify(ignorepipe, 1);
fmtinstall('D', dirfmt);
fmtinstall('M', dirmodefmt);
fmtinstall('F', fcallfmt);
fmtinstall('H', encodefmt);
outq = qalloc();
inq = qalloc();
if(!versioned){
f.type = Tversion;
f.version = "9P2000.u";
f.msize = msize;
f.tag = NOTAG;
n = convS2M(&f, vbuf, sizeof vbuf);
if(n <= BIT16SZ)
sysfatal("convS2M conversion error");
if(verbose > 1) fprint(2, "%T * <- %F\n", &f);
nn = write(1, vbuf, n);
if(n != nn)
sysfatal("error writing Tversion: %r\n");
n = read9pmsg(0, vbuf, sizeof vbuf);
if(n < 0)
sysfatal("read9pmsg failure");
if(convM2S(vbuf, n, &f) != n)
sysfatal("convM2S failure");
if(f.msize < msize)
msize = f.msize;
if(verbose > 1) fprint(2, "%T * -> %F\n", &f);
dotu = strncmp(f.version, "9P2000.u", 8) == 0;
}
threadcreate(inputthread, nil, STACK);
threadcreate(outputthread, nil, STACK);
/* if(rootfid) */
/* dorootstat(); */
threadcreate(listenthread, nil, STACK);
threadexits(0);
}
/*
 * Note handler registered with atnotify.  Returns 1 for the three
 * notes listed below; any other note is reported on standard error
 * and 0 is returned.
 */
int
ignorepipe(void *v, char *s)
{
	static char *quiet[] = {
		"sys: write on closed pipe",
		"sys: tstp",
		"sys: window size change",
	};
	int i;

	USED(v);
	for(i = 0; i < (int)(sizeof quiet/sizeof quiet[0]); i++)
		if(strcmp(s, quiet[i]) == 0)
			return 1;
	fprint(2, "9pserve %s: %T note: %s\n", addr, s);
	return 0;
}
/*
 * Accept loop.  For every incoming call on the announced address a
 * fresh Conn is allocated with its channels and queues, and a
 * connthread is started to accept and serve it.  When listen fails the
 * announce fd is closed, the unused Conn and the thread's Ioproc are
 * released, and the thread returns.
 */
void
listenthread(void *arg)
{
	Conn *c;
	Ioproc *io;

	io = ioproc();
	USED(arg);
	threadsetname("listen %s", adir);
	for(;;){
		c = emalloc(sizeof(Conn));
		c->fd = iolisten(io, adir, c->dir);
		if(c->fd < 0){
			if(verbose) fprint(2, "%T listen: %r\n");
			close(afd);
			free(c);
			/* do not leak the io proc when the loop ends */
			closeioproc(io);
			return;
		}
		c->inc = chancreate(sizeof(void*), 0);
		c->internal = chancreate(sizeof(void*), 0);
		c->inq = qalloc();
		c->outq = qalloc();
		c->outqdead = chancreate(sizeof(void*), 0);
		if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
		threadcreate(connthread, c, STACK);
	}
}
/*
 * Marshal m->rx into a freshly allocated m->rpkt, using the client's
 * dialect (m->c->dotu), and queue m on the connection's output queue.
 * A marshalling failure is fatal.
 */
void
send9pmsg(Msg *m)
{
	int want, got;
	Conn *c;

	c = m->c;
	want = sizeS2Mu(&m->rx, c->dotu);
	m->rpkt = emalloc(want);
	got = convS2Mu(&m->rx, m->rpkt, want, c->dotu);
	if(got <= BIT16SZ)
		sysfatal("convS2Mu conversion error");
	if(got != want)
		sysfatal("sizeS2Mu and convS2Mu disagree");
	sendq(c->outq, m);
}
/*
 * Marshal m->tx into a freshly allocated m->tpkt, using m->c->dotu as
 * the dialect, and queue m on the global server-bound queue.
 * A marshalling failure is fatal.
 */
void
sendomsg(Msg *m)
{
	int want, got;
	int udialect;

	udialect = m->c->dotu;
	want = sizeS2Mu(&m->tx, udialect);
	m->tpkt = emalloc(want);
	got = convS2Mu(&m->tx, m->tpkt, want, udialect);
	if(got <= BIT16SZ)
		sysfatal("convS2Mu conversion error");
	if(got != want)
		sysfatal("sizeS2Mu and convS2Mu disagree");
	sendq(outq, m);
}
/*
 * Answer request m locally with an Rerror carrying ename, tagged
 * with the request's tag.  ename is stored by pointer, not copied.
 */
void
err(Msg *m, char *ename)
{
	Fcall *reply;

	reply = &m->rx;
	reply->tag = m->tx.tag;
	reply->ename = ename;
	reply->type = Rerror;
	send9pmsg(m);
}
/*
 * Return a copy of s in memory obtained from emalloc.
 * The caller frees the result.
 */
char*
estrdup(char *s)
{
	int len;
	char *copy;

	len = strlen(s);
	copy = emalloc(len+1);
	memmove(copy, s, len+1);	/* includes the terminating NUL */
	return copy;
}
/*
 * Per-connection thread.
 *
 * Accepts the call on c->fd, starts connoutthread for the reply
 * direction, then reads T-messages from the client.  Each message is
 * entered in the connection's tag table, its fids are looked up or
 * created, and it is queued on the global outq with the tag and fid
 * numbers replaced by 9pserve's own.  Requests that can be answered
 * locally (Tversion, Tflush of an unknown tag, and all validation
 * errors) are answered through send9pmsg/err and never reach the
 * server.
 *
 * When the client goes away the thread flushes every outstanding
 * request, waits for both output paths to drain, clunks every fid left
 * in the fid table, and frees the Conn.
 */
void
connthread(void *arg)
{
int i, fd;
Conn *c;
Hash *h, *hnext;
Msg *m, *om, *mm, sync;
Fid *f;
Ioproc *io;
c = arg;
threadsetname("conn %s", c->dir);
io = ioproc();
fd = ioaccept(io, c->fd, c->dir);
if(fd < 0){
if(verbose) fprint(2, "%T accept %s: %r\n", c->dir);
goto out;
}
/* replace the listen fd with the accepted connection */
close(c->fd);
c->fd = fd;
threadcreate(connoutthread, c, STACK);
while((m = mread9p(io, c->fd, c->dotu)) != nil){
if(verbose > 1) fprint(2, "%T fd#%d -> %F\n", c->fd, &m->tx);
m->c = c;
m->ctag = m->tx.tag;
c->nmsg++;
if(verbose > 1) fprint(2, "%T fd#%d: new msg %p\n", c->fd, m);
if(puthash(c->tag, m->tx.tag, m) < 0){
err(m, "duplicate tag");
continue;
}
/* the tag table now holds a reference too */
msgincref(m);
switch(m->tx.type){
case Tversion:
/* answered locally: clamp msize and pick the dialect for this client */
m->rx.tag = m->tx.tag;
m->rx.msize = m->tx.msize;
if(m->rx.msize > msize)
m->rx.msize = msize;
m->rx.version = "9P2000";
c->dotu = 0;
if(dotu && strncmp(m->tx.version, "9P2000.u", 8) == 0){
m->rx.version = "9P2000.u";
c->dotu = 1;
}
m->rx.type = Rversion;
send9pmsg(m);
continue;
case Tflush:
/* flushing a tag we do not know about succeeds immediately */
if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
m->rx.tag = m->tx.tag;
m->rx.type = Rflush;
send9pmsg(m);
continue;
}
msgincref(m->oldm);
break;
case Tattach:
m->afid = nil;
if(m->tx.afid != NOFID
&& (m->afid = gethash(c->fid, m->tx.afid)) == nil){
err(m, "unknown fid");
continue;
}
if(m->afid)
m->afid->ref++;
m->fid = fidnew(m->tx.fid);
if(puthash(c->fid, m->tx.fid, m->fid) < 0){
err(m, "duplicate fid");
continue;
}
m->fid->ref++;
/* -A mode: substitute the preauthenticated afid, aname and user */
if(attached && m->afid==nil){
if(m->tx.aname[0] && strcmp(xaname, m->tx.aname) != 0){
err(m, "invalid attach name");
continue;
}
m->tx.afid = xafid;
m->tx.aname = xaname;
m->tx.uname = getuser(); /* what srv.c used */
repack(&m->tx, &m->tpkt, c->dotu);
}
/* 9P2000 client, 9P2000.u server: grow the packet to the .u layout */
if(dotu && !c->dotu)
cvtuauthattach(&m->tx, &m->tpkt);
break;
case Twalk:
if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
err(m, "unknown fid");
continue;
}
m->fid->ref++;
if(m->tx.newfid == m->tx.fid){
m->fid->ref++;
m->newfid = m->fid;
}else{
m->newfid = fidnew(m->tx.newfid);
if(puthash(c->fid, m->tx.newfid, m->newfid) < 0){
err(m, "duplicate fid");
continue;
}
m->newfid->ref++;
}
break;
case Tauth:
if(attached){
err(m, "authentication not required");
continue;
}
if(noauth){
err(m, "authentication rejected");
continue;
}
m->afid = fidnew(m->tx.afid);
if(puthash(c->fid, m->tx.afid, m->afid) < 0){
err(m, "duplicate fid");
continue;
}
m->afid->ref++;
if(dotu && !c->dotu)
cvtuauthattach(&m->tx, &m->tpkt);
break;
case Tcreate:
/* a 9P2000 client cannot supply the extension these file types need */
if(dotu && !c->dotu && (m->tx.perm&(DMSYMLINK|DMDEVICE|DMNAMEDPIPE|DMSOCKET))){
err(m, "unsupported file type");
continue;
}
if(dotu && !c->dotu)
cvtucreate(&m->tx, &m->tpkt);
goto caseTopen;
case Topenfd:
if(m->tx.mode&~(OTRUNC|3)){
err(m, "bad openfd mode");
continue;
}
/* forwarded as a plain Topen; connoutthread turns the reply into Ropenfd */
m->isopenfd = 1;
m->tx.type = Topen;
m->tpkt[4] = Topen;
/* fall through */
caseTopen:
case Topen:
case Tclunk:
case Tread:
case Twrite:
case Tremove:
case Tstat:
case Twstat:
if((m->fid = gethash(c->fid, m->tx.fid)) == nil){
err(m, "unknown fid");
continue;
}
m->fid->ref++;
if(m->tx.type==Twstat && dotu && !c->dotu){
if(cvtustat(&m->tx, &m->tpkt, 1) < 0){
err(m, "cannot convert stat buffer");
continue;
}
}
if(m->tx.type==Tread && m->fid->isdir && dotu && !c->dotu){
/*
 * NOTE(review): the condition below is an assignment, not a
 * comparison.  As written it overwrites tx.offset with coffset and
 * tests that value for non-zero.  Presumably `==` was intended;
 * confirm the intended handling of a client rewinding a directory
 * before changing it.
 */
if(m->tx.offset = m->fid->coffset)
m->tx.offset = m->fid->offset;
else
m->fid->offset = m->fid->coffset;
}
break;
}
/* have everything - translate and send */
m->c = c;
m->ctag = m->tx.tag;
m->tx.tag = m->tag;
if(m->fid)
m->tx.fid = m->fid->fid;
if(m->newfid)
m->tx.newfid = m->newfid->fid;
if(m->afid)
m->tx.afid = m->afid->fid;
if(m->oldm)
m->tx.oldtag = m->oldm->tag;
/* reference passes to outq */
sendq(outq, m);
/* throttle: wait for connoutthread to signal that replies have gone out */
while(c->nmsg >= MAXMSG){
c->inputstalled = 1;
recvp(c->inc);
}
}
if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
/* flush all outstanding messages */
for(i=0; i<NHASH; i++){
while((h = c->tag[i]) != nil){
om = h->v;
msgincref(om); /* for us */
m = msgnew(0);
m->internal = 1;
m->c = c;
c->nmsg++;
m->tx.type = Tflush;
m->tx.tag = m->tag;
m->tx.oldtag = om->tag;
m->oldm = om;
msgincref(om);
msgincref(m); /* for outq */
sendomsg(m);
/* inputthread delivers the reply to an internal message here */
mm = recvp(c->internal);
assert(mm == m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
if(delhash(c->tag, om->ctag, om) == 0)
msgput(om); /* got from hash table */
msgput(om); /* got from msgincref */
}
}
/*
* outputthread has written all its messages
* to the remote connection (because we've gotten all the replies!),
* but it might not have gotten a chance to msgput
* the very last one. sync up to make sure.
*/
memset(&sync, 0, sizeof sync);
sync.sync = 1;
sync.c = c;
sendq(outq, &sync);
recvp(c->outqdead);
/* everything is quiet; can close the local output queue. */
sendq(c->outq, nil);
recvp(c->outqdead);
/* should be no messages left anywhere. */
assert(c->nmsg == 0);
/* clunk all outstanding fids */
for(i=0; i<NHASH; i++){
for(h=c->fid[i]; h; h=hnext){
f = h->v;
m = msgnew(0);
m->internal = 1;
m->c = c;
c->nmsg++;
m->tx.type = Tclunk;
m->tx.tag = m->tag;
m->tx.fid = f->fid;
m->fid = f;
f->ref++;
msgincref(m);
sendomsg(m);
mm = recvp(c->internal);
assert(mm == m);
/* drop the message's hold on c and f now, before the last msgput */
msgclear(m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
fidput(f); /* got from hash table */
hnext = h->next;
free(h);
}
}
out:
closeioproc(io);
assert(c->nmsg == 0);
assert(c->nfid == 0);
close(c->fd);
chanfree(c->internal);
c->internal = 0;
chanfree(c->inc);
c->inc = 0;
free(c->inq);
c->inq = 0;
free(c);
}
/*
 * Thread tending one end of a pipe created by xopenfd.
 *
 * v is a reduced Conn: c->fd is the pipe end, c->fdfid the fid being
 * served, c->fdmode the open mode and c->internal the channel on which
 * replies to this thread's own requests arrive.
 *
 * When the access bits of the mode are OREAD the thread issues Tread
 * requests at increasing offsets and copies each reply to the pipe
 * until an error, a zero-length read, or a failed pipe write.
 * Otherwise it reads from the pipe and issues a Twrite for every
 * chunk until the pipe reports EOF or an error; Rerror replies to
 * Twrite are ignored.
 *
 * Afterwards it closes the pipe end, clunks the fid if this was the
 * last openfd thread using it, drops its fid reference, and frees the
 * Conn.
 *
 * Each request is created with one reference and given a second by
 * msgincref before sendomsg; hence the paired msgput calls.
 */
static void
openfdthread(void *v)
{
	Conn *c;
	Fid *fid;
	Msg *m;
	int n;
	vlong tot;
	Ioproc *io;
	char buf[1024];

	c = v;
	fid = c->fdfid;
	io = ioproc();
	/* name the thread after the fid number; a Fid* is not a string */
	threadsetname("openfd %d", fid->fid);
	tot = 0;
	m = nil;
	/* compare only the access bits so that OTRUNC does not flip the direction */
	if((c->fdmode&3) == OREAD){
		for(;;){
			if(verbose) fprint(2, "%T tread...");
			m = msgnew(0);
			m->internal = 1;
			m->c = c;
			m->tx.type = Tread;
			m->tx.count = msize - IOHDRSZ;
			m->tx.fid = fid->fid;
			m->tx.tag = m->tag;
			m->tx.offset = tot;
			m->fid = fid;
			fid->ref++;
			msgincref(m);
			sendomsg(m);
			recvp(c->internal);
			if(m->rx.type == Rerror){
				/*	fprint(2, "%T read error: %s\n", m->rx.ename); */
				break;
			}
			if(m->rx.count == 0)
				break;
			tot += m->rx.count;
			if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
				/* fprint(2, "%T pipe write error: %r\n"); */
				break;
			}
			msgput(m);
			msgput(m);
			m = nil;
		}
	}else{
		for(;;){
			if(verbose) fprint(2, "%T twrite...");
			n = sizeof buf;
			if(n > msize)
				n = msize;
			if((n=ioread(io, c->fd, buf, n)) <= 0){
				if(n < 0)
					fprint(2, "%T pipe read error: %r\n");
				break;
			}
			m = msgnew(0);
			m->internal = 1;
			m->c = c;
			m->tx.type = Twrite;
			m->tx.fid = fid->fid;
			m->tx.data = buf;
			m->tx.count = n;
			m->tx.tag = m->tag;
			m->tx.offset = tot;
			m->fid = fid;
			fid->ref++;
			msgincref(m);
			sendomsg(m);
			recvp(c->internal);
			if(m->rx.type == Rerror){
				/*	fprint(2, "%T write error: %s\n", m->rx.ename); */
			}
			tot += n;
			msgput(m);
			msgput(m);
			m = nil;
		}
	}
	if(verbose) fprint(2, "%T eof on %d fid %d\n", c->fd, fid->fid);
	close(c->fd);
	closeioproc(io);
	/* a loop that exited via break may still hold its last request */
	if(m){
		msgput(m);
		msgput(m);
	}
	if(verbose) fprint(2, "%T eof on %d fid %d ref %d\n", c->fd, fid->fid, fid->ref);
	if(--fid->openfd == 0){
		/* last user of the fid: clunk it on the server */
		m = msgnew(0);
		m->internal = 1;
		m->c = c;
		m->tx.type = Tclunk;
		m->tx.tag = m->tag;
		m->tx.fid = fid->fid;
		m->fid = fid;
		fid->ref++;
		msgincref(m);
		sendomsg(m);
		recvp(c->internal);
		msgput(m);
		msgput(m);
	}
	fidput(fid);
	c->fdfid = nil;
	chanfree(c->internal);
	c->internal = 0;
	free(c);
}
/*
 * Turn a successful Ropen for a request that arrived as Topenfd into
 * an Ropenfd.  Creates a pipe, starts an openfdthread (two when the
 * mode is ORDWR) on p[0] to move data between the pipe and the fid,
 * removes the fid from the client's fid table, and appends p[1] to
 * the reply packet so mwrite9p can pass it to the client.
 * Always returns 0.
 */
int
xopenfd(Msg *m)
{
char errs[ERRMAX];
int n, p[2];
Conn *nc;
if(pipe(p) < 0){
rerrstr(errs, sizeof errs);
err(m, errs);
/*
 * NOTE(review): execution continues after a failed pipe(), so p[]
 * is used uninitialized below, and err() has already queued m with
 * rx.ename pointing at this function's stack buffer.  A fix needs
 * more than a bare return; see the original XXX.
 */
/* XXX return here? */
}
if(verbose) fprint(2, "%T xopen pipe %d %d...", p[0], p[1]);
/* now we're committed. */
/* a new connection for this fid */
nc = emalloc(sizeof(Conn));
nc->internal = chancreate(sizeof(void*), 0);
/* a ref for us */
nc->fdfid = m->fid;
m->fid->ref++;
nc->fdfid->openfd++;
nc->fdmode = m->tx.mode;
nc->fd = p[0];
/* a thread to tend the pipe */
threadcreate(openfdthread, nc, STACK);
/* if mode is ORDWR, that openfdthread will write; start a reader */
if((m->tx.mode&3) == ORDWR){
nc = emalloc(sizeof(Conn));
nc->internal = chancreate(sizeof(void*), 0);
nc->fdfid = m->fid;
m->fid->ref++;
nc->fdfid->openfd++;
nc->fdmode = OREAD;
nc->fd = dup(p[0], -1);
threadcreate(openfdthread, nc, STACK);
}
/* steal fid from other connection */
if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
fidput(m->fid);
/* rewrite as Ropenfd */
m->rx.type = Ropenfd;
/* grow the packet by four bytes and store the client's pipe end there */
n = GBIT32(m->rpkt);
m->rpkt = erealloc(m->rpkt, n+4);
PBIT32(m->rpkt+n, p[1]);
n += 4;
PBIT32(m->rpkt, n);
m->rpkt[4] = Ropenfd;
m->rx.unixfd = p[1];
return 0;
}
void
connoutthread(void *arg)
{
char *ename;
int err;
Conn *c;
Msg *m, *om;
Ioproc *io;
c = arg;
io = ioproc();
threadsetname("connout %s", c->dir);
while((m = recvq(c->outq)) != nil){
err = m->tx.type+1 != m->rx.type;
if(!err && m->isopenfd)
if(xopenfd(m) < 0)
continue;
switch(m->tx.type){
case Tflush:
om = m->oldm;
if(om)
if(delhash(om->c->tag, om->ctag, om) == 0)
msgput(om);
break;
case Tclunk:
case Tremove:
if(m->fid)
if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
fidput(m->fid);
break;
case Tauth:
if(err && m->afid){
if(verbose) fprint(2, "%T auth error\n");
if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0)
fidput(m->afid);
}
break;
case Tattach:
if(err && m->fid)
if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
fidput(m->fid);
break;
case Twalk:
if(err || m->rx.nwqid < m->tx.nwname)
if(m->tx.fid != m->tx.newfid && m->newfid)
if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0)
fidput(m->newfid);
break;
case Tread:
if(!err && m->fid->isdir && dotu && !m->c->dotu){
m->fid->offset += m->rx.count;
stripudirread(m);
m->fid->coffset += m->rx.count;
}
break;
case Tstat:
if(!err && dotu && !m->c->dotu)
cvtustat(&m->rx, &m->rpkt, 0);
break;
case Topen:
case Tcreate:
m->fid->isdir = (m->rx.qid.type & QTDIR);
break;
}
if(m->rx.type==Rerror && dotu && !c->dotu){
ename = estrdup(m->rx.ename);
m->rx.ename = ename;
repack(&m->rx, &m->rpkt, c->dotu);
free(ename);
m->rx.ename = "XXX";
}
if(delhash(m->c->tag, m->ctag, m) == 0)
msgput(m);
if(verbose > 1) fprint(2, "%T fd#%d <- %F\n", c->fd, &m->rx);
rewritehdr(&m->rx, m->rpkt);
if(mwrite9p(io, c->fd, m->rpkt) < 0)
if(verbose) fprint(2, "%T write error: %r\n");
msgput(m);
if(c->inputstalled && c->nmsg < MAXMSG)
nbsendp(c->inc, 0);
}
closeioproc(io);
free(c->outq);
c->outq = nil;
sendp(c->outqdead, nil);
}
/*
 * Writer for the server connection.  Drains the global outq: sync
 * markers are acknowledged on their connection's outqdead channel,
 * every other message has its header rewritten and its request packet
 * written to fd 1.  A write failure is fatal.  A nil queue entry ends
 * the loop and terminates the program.
 */
void
outputthread(void *arg)
{
	Msg *m;
	Ioproc *io;

	USED(arg);
	io = ioproc();
	threadsetname("output");
	for(;;){
		m = recvq(outq);
		if(m == nil)
			break;
		if(!m->sync){
			if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
			rewritehdr(&m->tx, m->tpkt);
			if(mwrite9p(io, 1, m->tpkt) < 0)
				sysfatal("output error: %r");
			msgput(m);
		}else
			sendp(m->c->outqdead, nil);
	}
	closeioproc(io);
	fprint(2, "%T output eof\n");
	threadexitsall(0);
}
/*
 * Reader for the server connection.  Each packet read from fd 0 is
 * matched to its request by tag, parsed into the request's rx, and
 * handed on: internal requests get the reply on their connection's
 * internal channel, client requests on the connection's outq (or are
 * dropped if that queue is already gone).  Short packets, unknown
 * tags and unparsable packets are reported and discarded.  When
 * reading fails the program terminates.
 */
void
inputthread(void *arg)
{
	uchar *pkt;
	int len, parsed, stag;
	Msg *m;
	Ioproc *io;

	threadsetname("input");
	if(verbose) fprint(2, "%T input thread\n");
	io = ioproc();
	USED(arg);
	for(;;){
		pkt = read9ppkt(io, 0);
		if(pkt == nil)
			break;
		len = GBIT32(pkt);
		if(len < 7){
			fprint(2, "%T short 9P packet from server\n");
			free(pkt);
			continue;
		}
		if(verbose > 2) fprint(2, "%T read %.*H\n", len, pkt);
		stag = GBIT16(pkt+5);
		m = msgget(stag);
		if(m == nil){
			fprint(2, "%T unexpected 9P response tag %d\n", stag);
			free(pkt);
			continue;
		}
		parsed = convM2Su(pkt, len, &m->rx, dotu);
		if(parsed != len){
			fprint(2, "%T bad packet - convM2S %d but %d\n", parsed, len);
			free(pkt);
			msgput(m);
			continue;
		}
		if(verbose > 1) fprint(2, "%T * -> %F%s\n", &m->rx,
			m->internal ? " (internal)" : "");
		m->rpkt = pkt;
		/* give the reply the tag the client chose */
		m->rx.tag = m->ctag;
		if(m->internal){
			sendp(m->c->internal, m);
			continue;
		}
		if(m->c->outq)
			sendq(m->c->outq, m);
		else
			msgput(m);
	}
	closeioproc(io);
	/*fprint(2, "%T input eof\n"); */
	threadexitsall(0);
}
/*
 * Look up key n in table ht.  Returns the stored value,
 * or nil when no entry has that key.
 */
void*
gethash(Hash **ht, uint n)
{
	Hash *e;

	e = ht[n%NHASH];
	while(e != nil){
		if(e->n == n)
			return e->v;
		e = e->next;
	}
	return nil;
}
/*
 * Remove the entry for key n from ht, but only if it currently maps
 * to v.  Returns 0 when the entry was removed and -1 when the key is
 * absent or maps to a different value.
 */
int
delhash(Hash **ht, uint n, void *v)
{
	Hash *e, **link;

	link = &ht[n%NHASH];
	while((e = *link) != nil){
		if(e->n != n){
			link = &e->next;
			continue;
		}
		if(e->v != v){
			if(verbose) fprint(2, "%T delhash %d got %p want %p\n", n, e->v, v);
			return -1;
		}
		*link = e->next;
		free(e);
		return 0;
	}
	return -1;
}
/*
 * Insert v under key n at the head of its bucket.  Returns -1 without
 * inserting if gethash already finds a non-nil value for n, else 0.
 */
int
puthash(Hash **ht, uint n, void *v)
{
	Hash *e, **bucket;

	if(gethash(ht, n) != nil)
		return -1;
	bucket = &ht[n%NHASH];
	e = emalloc(sizeof(Hash));
	e->n = n;
	e->v = v;
	e->next = *bucket;
	*bucket = e;
	return 0;
}
/* Fid allocator state, used by fidnew and fidput. */
Fid **fidtab;	/* every Fid ever allocated, indexed by Fid.fid */
int nfidtab;	/* number of slots in fidtab */
Fid *freefid;	/* list of Fids available for reuse */
Fid*
fidnew(int cfid)
{
Fid *f;
if(freefid == nil){
fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
if(nfidtab == xafid){
fidtab[nfidtab++] = nil;
fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0]));
}
fidtab[nfidtab] = emalloc(sizeof(Fid));
freefid = fidtab[nfidtab];
freefid->fid = nfidtab++;
}
f = freefid;
freefid = f->next;
f->cfid = cfid;
f->ref = 1;
f->offset = 0;
f->coffset = 0;
f->isdir = -1;
return f;
}
/*
 * Drop one reference to f (nil is ignored).  When the count reaches
 * zero the Fid goes back on the free list with cfid set to -1.
 */
void
fidput(Fid *f)
{
	if(f == nil)
		return;
	assert(f->ref > 0);
	f->ref--;
	if(f->ref <= 0){
		f->next = freefid;
		f->cfid = -1;
		freefid = f;
	}
}
/* Msg allocator state, used by msgnew, msgput and msgget. */
Msg **msgtab;	/* every Msg ever allocated, indexed by Msg.tag */
int nmsgtab;	/* number of slots in msgtab */
int nmsg;	/* Msgs currently in use (incremented in msgnew, decremented in msgput) */
Msg *freemsg;	/* list of Msgs available for reuse */
/* Add one reference to m, tracing the caller when verbose > 1. */
void
msgincref(Msg *m)
{
	if(verbose > 1)
		fprint(2, "%T msgincref @0x%lux %p tag %d/%d ref %d=>%d\n",
			getcallerpc(&m), m, m->tag, m->ctag, m->ref, m->ref+1);
	m->ref += 1;
}
/*
 * Return a Msg with a reference count of one, reusing a free one when
 * available and otherwise growing msgtab by one entry.  A Msg's tag is
 * its index in msgtab.  The argument is only used to find the
 * caller's pc for tracing.
 */
Msg*
msgnew(int x)
{
	Msg *m;

	if(freemsg == nil){
		msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0]));
		m = emalloc(sizeof(Msg));
		msgtab[nmsgtab] = m;
		m->tag = nmsgtab++;
		freemsg = m;
	}
	m = freemsg;
	freemsg = m->next;
	m->ref = 1;
	if(verbose > 1)
		fprint(2, "%T msgnew @0x%lux %p tag %d ref %d\n",
			getcallerpc(&x), m, m->tag, m->ref);
	nmsg++;
	return m;
}
/*
 * Detach m from everything that belongs to its connection: the
 * connection's message count, the flushed message, and the fid,
 * afid and newfid references.  Also closes the descriptor carried by
 * an Ropenfd reply.  The packets (tpkt, rpkt) are deliberately left
 * alone; msgput frees them together with the Msg, because a writer
 * may still hold a reference after the connection is done with it.
 */
void
msgclear(Msg *m)
{
	Conn *c;

	c = m->c;
	if(c != nil){
		c->nmsg--;
		m->c = nil;
	}

	/* msgput and fidput ignore nil, so no guards are needed */
	msgput(m->oldm);
	m->oldm = nil;
	fidput(m->fid);
	m->fid = nil;
	fidput(m->afid);
	m->afid = nil;
	fidput(m->newfid);
	m->newfid = nil;

	if(m->rx.type == Ropenfd && m->rx.unixfd >= 0){
		close(m->rx.unixfd);
		m->rx.unixfd = -1;
	}
}
/*
 * Drop one reference to m (nil is ignored).  On the last reference
 * the message is cleared with msgclear, both packets are freed, the
 * isopenfd and internal flags are reset, and the Msg goes back on
 * the free list.
 */
void
msgput(Msg *m)
{
	if(m == nil)
		return;
	if(verbose > 1)
		fprint(2, "%T msgput 0x%lux %p tag %d/%d ref %d\n",
			getcallerpc(&m), m, m->tag, m->ctag, m->ref);
	assert(m->ref > 0);
	m->ref--;
	if(m->ref > 0)
		return;

	nmsg--;
	msgclear(m);
	/* free(nil) is harmless */
	free(m->tpkt);
	m->tpkt = nil;
	free(m->rpkt);
	m->rpkt = nil;
	m->isopenfd = 0;
	m->internal = 0;
	m->next = freemsg;
	freemsg = m;
}
/*
 * Find the in-use Msg whose server-side tag is n and return it with
 * an extra reference.  Returns nil when n is out of range or the Msg
 * is not in use (reference count zero).
 */
Msg*
msgget(int n)
{
	Msg *m;

	if(0 <= n && n < nmsgtab){
		m = msgtab[n];
		if(m->ref != 0){
			if(verbose) fprint(2, "%T msgget %d = %p\n", n, m);
			msgincref(m);
			return m;
		}
	}
	return nil;
}
/*
 * Allocate n zeroed bytes.  Allocation failure calls abort();
 * the sysfatal after it is only reached if abort returns.
 */
void*
emalloc(int n)
{
	void *v;

	if((v = mallocz(n, 1)) != nil)
		return v;
	abort();
	sysfatal("out of memory allocating %d", n);
	return v;
}
/*
 * Resize v to n bytes with realloc.  A nil result calls abort();
 * the sysfatal after it is only reached if abort returns.
 */
void*
erealloc(void *v, int n)
{
	void *nv;

	nv = realloc(v, n);
	if(nv != nil)
		return nv;
	abort();
	sysfatal("out of memory reallocating %d", n);
	return nv;
}
typedef struct Qel Qel;
/* One element of a Queue's singly linked list. */
struct Qel
{
Qel *next;	/* following element, nil at the tail */
void *p;	/* the queued pointer */
};
/*
 * Unbounded FIFO of pointers (see qalloc, sendq, recvq).
 * recvq sleeps on r while the list is empty; sendq wakes it.
 */
struct Queue
{
QLock lk;	/* held while head/tail are examined or changed */
Rendez r;	/* qalloc points r.l at lk */
Qel *head;	/* oldest element, nil when empty */
Qel *tail;	/* newest element; only meaningful while head != nil */
};
/*
 * Allocate an empty Queue whose rendezvous is tied to its own lock.
 * Allocation goes through emalloc, which zeroes the memory and does
 * not return on failure, so the result is never nil; no caller in
 * this file checks for nil.
 */
Queue*
qalloc(void)
{
	Queue *q;

	q = emalloc(sizeof(Queue));
	q->r.l = &q->lk;
	return q;
}
/*
 * Append p to q and wake one thread sleeping in recvq.
 * Never blocks on queue length.  Always returns 0.
 */
int
sendq(Queue *q, void *p)
{
	Qel *el;

	el = emalloc(sizeof(Qel));
	/* el is private until it is linked in under the lock */
	el->p = p;
	el->next = nil;

	qlock(&q->lk);
	if(q->head != nil)
		q->tail->next = el;
	else
		q->head = el;
	q->tail = el;
	rwakeup(&q->r);
	qunlock(&q->lk);
	return 0;
}
/*
 * Remove and return the oldest pointer in q, sleeping while the
 * queue is empty.
 */
void*
recvq(Queue *q)
{
	void *val;
	Qel *first;

	qlock(&q->lk);
	while(q->head == nil)
		rsleep(&q->r);
	first = q->head;
	q->head = first->next;
	qunlock(&q->lk);

	val = first->p;
	free(first);
	return val;
}
/*
 * Read one 9P packet from fd.
 *
 * Reads the four-byte size, then the rest of the packet, into a
 * buffer from emalloc that begins with the size field.  The caller
 * owns the buffer and frees it.
 *
 * Returns nil on EOF or read error, and for a size field that cannot
 * be valid: smaller than the size field itself (which would make the
 * buffer too small for the size to be stored, and the remaining read
 * length negative) or larger than MAXMSGSIZE.
 */
uchar*
read9ppkt(Ioproc *io, int fd)
{
	uchar buf[4], *pkt;
	int n, nn;

	n = ioreadn(io, fd, buf, 4);
	if(n != 4)
		return nil;
	n = GBIT32(buf);
	/* a size with the top bit set converts to a negative int and is caught here as well */
	if(n < (int)sizeof buf || n > MAXMSGSIZE)
		return nil;
	pkt = emalloc(n);
	PBIT32(pkt, n);
	nn = ioreadn(io, fd, pkt+4, n-4);
	if(nn != n-4){
		free(pkt);
		return nil;
	}
	/*
	 * An Ropenfd would need its descriptor received with iorecvfd
	 * here; that is not done because such packets are only
	 * generated by this program, never read by it.
	 */
	return pkt;
}
/*
 * Read one request from fd and parse it in the given dialect.
 *
 * Returns a new Msg (one reference) whose tpkt holds the packet and
 * whose tx holds the parsed request.  Returns nil on EOF, on read
 * error, and when the packet does not parse; in the last case the Msg
 * is released again, which also frees the packet, so nothing leaks.
 */
Msg*
mread9p(Ioproc *io, int fd, int dotu)
{
	int n, nn;
	uchar *pkt;
	Msg *m;

	if((pkt = read9ppkt(io, fd)) == nil)
		return nil;

	m = msgnew(0);
	m->tpkt = pkt;
	n = GBIT32(pkt);
	nn = convM2Su(pkt, n, &m->tx, dotu);
	if(nn != n){
		fprint(2, "%T read bad packet from %d\n", fd);
		msgput(m);	/* frees m->tpkt and recycles m */
		return nil;
	}
	return m;
}
/*
 * Write the packet pkt (whose length is its own first four bytes) to
 * fd.  For an Ropenfd the descriptor stored in the packet's last four
 * bytes is then passed over fd with iosendfd.
 * Returns 0 on success, -1 after reporting a write or send failure.
 */
int
mwrite9p(Ioproc *io, int fd, uchar *pkt)
{
	int len, passfd;

	len = GBIT32(pkt);
	if(verbose > 2) fprint(2, "%T write %d %d %.*H\n", fd, len, len, pkt);
	if(verbose > 1) fprint(2, "%T before iowrite\n");
	if(iowrite(io, fd, pkt, len) != len){
		fprint(2, "%T write error: %r\n");
		return -1;
	}
	if(verbose > 1) fprint(2, "%T after iowrite\n");
	if(pkt[4] != Ropenfd)
		return 0;
	passfd = GBIT32(pkt+len-4);
	if(iosendfd(io, fd, passfd) < 0){
		fprint(2, "%T send fd error: %r\n");
		return -1;
	}
	return 0;
}
/*
 * If s lies inside the packet pkt[0..pn), turn it back into a
 * length-prefixed string in place: shift the characters one byte
 * toward higher addresses and store the 16-bit length in the two
 * bytes starting at s-1.  Strings outside the packet are left alone.
 */
void
restring(uchar *pkt, int pn, char *s)
{
	int len;
	char *lo, *hi;

	lo = (char*)pkt;
	hi = (char*)pkt+pn;
	if(s >= lo && s < hi){
		len = strlen(s);
		memmove(s+1, s, len);
		PBIT16((uchar*)s-1, len);
	}
}
/*
 * Re-marshal f into *ppkt in the given dialect.  The existing buffer
 * is reused when the new encoding is no larger than the size recorded
 * in it; otherwise it is freed and replaced, and *ppkt is updated.
 * Marshalling failures are fatal.
 */
void
repack(Fcall *f, uchar **ppkt, int dotu)
{
	uint have, need, wrote;
	uchar *buf;

	buf = *ppkt;
	have = GBIT32(buf);
	need = sizeS2Mu(f, dotu);
	if(need > have){
		free(buf);
		buf = emalloc(need);
		*ppkt = buf;
	}
	wrote = convS2Mu(f, buf, need, dotu);
	if(wrote <= BIT16SZ)
		sysfatal("convS2M conversion error");
	if(wrote != need)
		sysfatal("convS2Mu and sizeS2Mu disagree");
}
/*
 * Patch an already-marshalled packet so that it agrees with f.
 * The tag is always rewritten.  Depending on the message type the
 * fid, newfid, afid, oldtag and offset fields are rewritten at their
 * fixed positions, and every string of f that lies within the packet
 * is restored to wire form with restring.
 */
void
rewritehdr(Fcall *f, uchar *pkt)
{
	int k, len;
	uchar *body;

	len = GBIT32(pkt);
	PBIT16(pkt+5, f->tag);
	body = pkt+7;	/* first byte after size[4] type[1] tag[2] */
	switch(f->type){
	case Tversion:
	case Rversion:
		restring(pkt, len, f->version);
		break;
	case Tauth:
		PBIT32(body, f->afid);
		restring(pkt, len, f->uname);
		restring(pkt, len, f->aname);
		break;
	case Tflush:
		PBIT16(body, f->oldtag);
		break;
	case Tattach:
		restring(pkt, len, f->uname);
		restring(pkt, len, f->aname);
		PBIT32(body, f->fid);
		PBIT32(body+4, f->afid);
		break;
	case Twalk:
		PBIT32(body, f->fid);
		PBIT32(body+4, f->newfid);
		k = 0;
		while(k < f->nwname){
			restring(pkt, len, f->wname[k]);
			k++;
		}
		break;
	case Tcreate:
		restring(pkt, len, f->name);
		/* fall through */
	case Topen:
	case Tclunk:
	case Tremove:
	case Tstat:
	case Twstat:
	case Twrite:
		PBIT32(body, f->fid);
		break;
	case Tread:
		PBIT32(body, f->fid);
		PBIT64(body+4, f->offset);
		break;
	case Rerror:
		restring(pkt, len, f->ename);
		break;
	}
}
/* iocall worker: unpack two char* arguments and call listen. */
static long
_iolisten(va_list *arg)
{
	char *dir, *newdir;

	dir = va_arg(*arg, char*);
	newdir = va_arg(*arg, char*);
	return listen(dir, newdir);
}
/* Run listen(a, b) through the io proc and return its result. */
int
iolisten(Ioproc *io, char *a, char *b)
{
	long r;

	r = iocall(io, _iolisten, a, b);
	return r;
}
/* iocall worker: unpack an int and a char* and call accept. */
static long
_ioaccept(va_list *arg)
{
	int lfd;
	char *ldir;

	lfd = va_arg(*arg, int);
	ldir = va_arg(*arg, char*);
	return accept(lfd, ldir);
}
/* Run accept(fd, dir) through the io proc and return its result. */
int
ioaccept(Ioproc *io, int fd, char *dir)
{
	long r;

	r = iocall(io, _ioaccept, fd, dir);
	return r;
}
/*
 * Print verb installed as %T: formats the current local time as
 * "Mon dd hh:mm:ss.mmm".  It consumes no argument from the format's
 * argument list.
 */
int
timefmt(Fmt *fmt)
{
	static char *mon[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
		"Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
	vlong ns;
	int ms;
	Tm tm;

	ns = nsec();
	tm = *localtime(time(0));
	/* sub-second part of the nanosecond clock, in milliseconds */
	ms = (int)(ns%1000000000);
	ms /= 1000000;
	return fmtprint(fmt, "%s %2d %02d:%02d:%02d.%03d",
		mon[tm.mon], tm.mday, tm.hour, tm.min, tm.sec,
		ms);
}
/*
 * Convert the stat buffer carried by f between the two dialects and
 * re-marshal the message into *fpkt.  The buffer is parsed with
 * dialect flag !tounix and re-encoded with dialect flag tounix.
 * Returns 0 on success and -1 if the stat buffer does not parse.
 * On success f->stat is left nil; the converted bytes live only in
 * the packet.
 */
int
cvtustat(Fcall *f, uchar **fpkt, int tounix)
{
	int len;
	uchar *enc;
	char *strs;
	Dir d;

	strs = emalloc(f->nstat);
	len = convM2Du(f->stat, f->nstat, &d, strs, !tounix);
	if(len <= BIT16SZ){
		free(strs);
		return -1;
	}

	len = sizeD2Mu(&d, tounix);
	enc = emalloc(len);
	if(convD2Mu(&d, enc, len, tounix) != len)
		sysfatal("convD2Mu conversion error");

	f->stat = enc;
	f->nstat = len;
	repack(f, fpkt, dotu);

	free(enc);
	f->stat = nil;	/* is this okay ??? */
	free(strs);
	return 0;
}
/*
 * Rewrite the directory entries in an Rread from 9P2000.u form to
 * 9P2000 form and re-marshal the reply into msg->rpkt.
 *
 * A first pass validates every entry and finds the largest one; a
 * second pass converts each entry into a scratch buffer.  On success
 * rx->count is the stripped length, rx->data is nil (the bytes live
 * only in the packet) and 0 is returned.  Returns -1, leaving the
 * reply untouched, if any entry is malformed.
 *
 * The entry lengths come from the server, so each one is checked
 * against the bytes that remain before it is used, and the length
 * bytes are read as unsigned: rx->data is a char*, and where char is
 * signed a length byte of 0x80 or more would otherwise sign-extend.
 */
int
stripudirread(Msg* msg)
{
	char *str;
	int i, m, n, nn;
	uchar *buf, *src;
	Dir d;
	Fcall* rx;

	rx = &msg->rx;
	src = (uchar*)rx->data;
	nn = 0;
	for(i = 0; i < rx->count; i += m){
		if(rx->count - i < BIT16SZ)
			return -1;
		m = BIT16SZ + GBIT16(src+i);
		if(m > rx->count - i)
			return -1;
		if(statchecku(src+i, m, 1) < 0)
			return -1;
		if(nn < m)
			nn = m;
	}

	/* str holds one entry's strings; the stripped data cannot exceed count */
	str = emalloc(nn);
	buf = emalloc(rx->count);

	nn = 0;
	for(i = 0; i < rx->count; i += m){
		m = BIT16SZ + GBIT16(src+i);
		if(convM2Du(src+i, m, &d, str, 1) != m){
			free(buf);
			free(str);
			return -1;
		}
		n = convD2M(&d, &buf[nn], rx->count - nn);
		if(n <= BIT16SZ){
			free(buf);
			free(str);
			return -1;
		}
		nn += n;
	}

	rx->count = nn;
	rx->data = (char*)buf;
	repack(&msg->rx, &msg->rpkt, 0);
	free(str);
	free(buf);
	rx->data = nil;	/* is this okay ??? */
	return 0;
}
/*
 * Translate p, which pointed into a buffer starting at p0old, to
 * the same offset in the buffer starting at p0.
 */
void*
updateptr(uchar *p0, uchar *p0old, void *p)
{
	uchar *old;

	old = p;
	return p0 + (old - p0old);
}
/*
 * Extend the packet *ppkt by sz zeroed bytes, updating its size
 * field and *ppkt (the buffer may move).  Returns a pointer to the
 * first added byte.
 */
uchar*
growpkt(uchar **ppkt, int sz)
{
	int olen;
	uchar *pkt, *tail;

	olen = GBIT32(*ppkt);
	pkt = erealloc(*ppkt, olen+sz);
	PBIT32(pkt, olen+sz);
	tail = pkt+olen;
	memset(tail, 0, sz);
	*ppkt = pkt;
	return tail;
}
/*
 * Convert a marshalled 9P2000 Tauth or Tattach to 9P2000.u by
 * appending a four-byte numeric uid set to NOUID.
 *
 * Growing the packet may move it.  f->uname and f->aname are
 * relocated only if they pointed into the old packet: after the -A
 * rewrite in connthread they point at strings outside the packet and
 * must be left untouched.  The offsets are taken before the packet is
 * grown so that the old buffer's address is never used after it has
 * been released.
 */
void
cvtuauthattach(Fcall *f, uchar **ppkt)
{
	uchar *opkt, *ap, *u, *a;
	int osz;
	long uoff, aoff;

	opkt = *ppkt;
	osz = GBIT32(opkt);
	u = (uchar*)f->uname;
	a = (uchar*)f->aname;
	/* -1 marks a string that is not stored in the packet */
	uoff = -1;
	aoff = -1;
	if(u >= opkt && u < opkt+osz)
		uoff = u - opkt;
	if(a >= opkt && a < opkt+osz)
		aoff = a - opkt;

	ap = growpkt(ppkt, BIT32SZ);
	PBIT32(ap, NOUID);

	if(uoff >= 0)
		f->uname = (char*)(*ppkt + uoff);
	if(aoff >= 0)
		f->aname = (char*)(*ppkt + aoff);
}
/*
 * Convert a marshalled 9P2000 Tcreate to 9P2000.u by appending two
 * zero bytes, i.e. an empty extension string.  If the packet moved,
 * f->name is relocated into the new buffer.
 */
void
cvtucreate(Fcall *f, uchar **ppkt)
{
	uchar *before;

	before = *ppkt;
	growpkt(ppkt, BIT16SZ);	/* add an empty `extension' */
	if(*ppkt == before)
		return;
	f->name = updateptr(*ppkt, before, f->name);
}