plan9port/src/libmux/mux.c
rsc 3a19470202 In non-blocking recv functions in libmux and libdraw,
distinguish between "cannot receive without blocking"
and "EOF on connection".

In libmux, do not elect async guys muxers, so that
synchronous RPC calls run in the main event loop
(e.g., in eresized) do not get stuck.

Fixes problem reported by Lu Xuxiao, namely that
jpg etc. would spin at 100% cpu usage.
2006-11-04 18:46:00 +00:00

286 lines
5.0 KiB
C

/* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */
/* See COPYRIGHT */
/*
* Generic RPC packet multiplexor. Inspired by but not derived from
* Plan 9 kernel. Originally developed as part of Tra, later used in
* libnventi, and then finally split out into a generic library.
*/
#include <u.h>
#include <libc.h>
#include <mux.h>
static int gettag(Mux*, Muxrpc*);
static void puttag(Mux*, Muxrpc*);
static void enqueue(Mux*, Muxrpc*);
static void dequeue(Mux*, Muxrpc*);
void
muxinit(Mux *mux)
{
memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
mux->tagrend.l = &mux->lk;
mux->rpcfork.l = &mux->lk;
mux->sleep.next = &mux->sleep;
mux->sleep.prev = &mux->sleep;
}
static Muxrpc*
allocmuxrpc(Mux *mux)
{
Muxrpc *r;
/* must malloc because stack could be private */
r = mallocz(sizeof(Muxrpc), 1);
if(r == nil){
werrstr("mallocz: %r");
return nil;
}
r->mux = mux;
r->r.l = &mux->lk;
r->waiting = 1;
return r;
}
static int
tagmuxrpc(Muxrpc *r, void *tx)
{
int tag;
Mux *mux;
mux = r->mux;
/* assign the tag, add selves to response queue */
qlock(&mux->lk);
tag = gettag(mux, r);
/*print("gettag %p %d\n", r, tag); */
enqueue(mux, r);
qunlock(&mux->lk);
/* actually send the packet */
if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
werrstr("settag/send tag %d: %r", tag);
fprint(2, "%r\n");
qlock(&mux->lk);
dequeue(mux, r);
puttag(mux, r);
qunlock(&mux->lk);
return -1;
}
return 0;
}
void
muxmsgandqlock(Mux *mux, void *p)
{
int tag;
Muxrpc *r2;
tag = mux->gettag(mux, p) - mux->mintag;
/*print("mux tag %d\n", tag); */
qlock(&mux->lk);
/* hand packet to correct sleeper */
if(tag < 0 || tag >= mux->mwait){
fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
/* must leak packet! don't know how to free it! */
return;
}
r2 = mux->wait[tag];
if(r2 == nil || r2->prev == nil){
fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
/* must leak packet! don't know how to free it! */
return;
}
r2->p = p;
dequeue(mux, r2);
rwakeup(&r2->r);
}
void
electmuxer(Mux *mux)
{
Muxrpc *rpc;
/* if there is anyone else sleeping, wake them to mux */
for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){
if(!rpc->async){
mux->muxer = rpc;
rwakeup(&rpc->r);
return;
}
}
mux->muxer = nil;
}
void*
muxrpc(Mux *mux, void *tx)
{
int tag;
Muxrpc *r;
void *p;
if((r = allocmuxrpc(mux)) == nil)
return nil;
if((tag = tagmuxrpc(r, tx)) < 0)
return nil;
qlock(&mux->lk);
/* wait for our packet */
while(mux->muxer && mux->muxer != r && !r->p)
rsleep(&r->r);
/* if not done, there's no muxer: start muxing */
if(!r->p){
if(mux->muxer != nil && mux->muxer != r)
abort();
mux->muxer = r;
while(!r->p){
qunlock(&mux->lk);
_muxrecv(mux, 1, &p);
if(p == nil){
/* eof -- just give up and pass the buck */
qlock(&mux->lk);
dequeue(mux, r);
break;
}
muxmsgandqlock(mux, p);
}
electmuxer(mux);
}
p = r->p;
puttag(mux, r);
qunlock(&mux->lk);
if(p == nil)
werrstr("unexpected eof");
return p;
}
Muxrpc*
muxrpcstart(Mux *mux, void *tx)
{
int tag;
Muxrpc *r;
if((r = allocmuxrpc(mux)) == nil)
return nil;
r->async = 1;
if((tag = tagmuxrpc(r, tx)) < 0)
return nil;
return r;
}
int
muxrpccanfinish(Muxrpc *r, void **vp)
{
void *p;
Mux *mux;
int ret;
mux = r->mux;
qlock(&mux->lk);
ret = 1;
if(!r->p && !mux->muxer){
mux->muxer = r;
while(!r->p){
qunlock(&mux->lk);
p = nil;
if(!_muxrecv(mux, 0, &p))
ret = 0;
if(p == nil){
qlock(&mux->lk);
break;
}
muxmsgandqlock(mux, p);
}
electmuxer(mux);
}
p = r->p;
if(p)
puttag(mux, r);
qunlock(&mux->lk);
*vp = p;
return ret;
}
static void
enqueue(Mux *mux, Muxrpc *r)
{
r->next = mux->sleep.next;
r->prev = &mux->sleep;
r->next->prev = r;
r->prev->next = r;
}
static void
dequeue(Mux *mux, Muxrpc *r)
{
r->next->prev = r->prev;
r->prev->next = r->next;
r->prev = nil;
r->next = nil;
}
static int
gettag(Mux *mux, Muxrpc *r)
{
int i, mw;
Muxrpc **w;
for(;;){
/* wait for a free tag */
while(mux->nwait == mux->mwait){
if(mux->mwait < mux->maxtag-mux->mintag){
mw = mux->mwait;
if(mw == 0)
mw = 1;
else
mw <<= 1;
w = realloc(mux->wait, mw*sizeof(w[0]));
if(w == nil)
return -1;
memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
mux->wait = w;
mux->freetag = mux->mwait;
mux->mwait = mw;
break;
}
rsleep(&mux->tagrend);
}
i=mux->freetag;
if(mux->wait[i] == 0)
goto Found;
for(; i<mux->mwait; i++)
if(mux->wait[i] == 0)
goto Found;
for(i=0; i<mux->freetag; i++)
if(mux->wait[i] == 0)
goto Found;
/* should not fall out of while without free tag */
fprint(2, "libfs: nwait botch\n");
abort();
}
Found:
mux->nwait++;
mux->wait[i] = r;
r->tag = i+mux->mintag;
return r->tag;
}
static void
puttag(Mux *mux, Muxrpc *r)
{
int i;
i = r->tag - mux->mintag;
assert(mux->wait[i] == r);
mux->wait[i] = nil;
mux->nwait--;
mux->freetag = i;
rwakeup(&mux->tagrend);
free(r);
}