distinguish between "cannot receive without blocking" and "EOF on connection". In libmux, do not elect async guys muxers, so that synchronous RPC calls run in the main event loop (e.g., in eresized) do not get stuck. Fixes problem reported by Lu Xuxiao, namely that jpg etc. would spin at 100% cpu usage.
286 lines
5.0 KiB
C
286 lines
5.0 KiB
C
/* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */
|
|
/* See COPYRIGHT */
|
|
|
|
/*
|
|
* Generic RPC packet multiplexor. Inspired by but not derived from
|
|
* Plan 9 kernel. Originally developed as part of Tra, later used in
|
|
* libnventi, and then finally split out into a generic library.
|
|
*/
|
|
|
|
#include <u.h>
|
|
#include <libc.h>
|
|
#include <mux.h>
|
|
|
|
static int gettag(Mux*, Muxrpc*);
|
|
static void puttag(Mux*, Muxrpc*);
|
|
static void enqueue(Mux*, Muxrpc*);
|
|
static void dequeue(Mux*, Muxrpc*);
|
|
|
|
void
|
|
muxinit(Mux *mux)
|
|
{
|
|
memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
|
|
mux->tagrend.l = &mux->lk;
|
|
mux->rpcfork.l = &mux->lk;
|
|
mux->sleep.next = &mux->sleep;
|
|
mux->sleep.prev = &mux->sleep;
|
|
}
|
|
|
|
static Muxrpc*
|
|
allocmuxrpc(Mux *mux)
|
|
{
|
|
Muxrpc *r;
|
|
|
|
/* must malloc because stack could be private */
|
|
r = mallocz(sizeof(Muxrpc), 1);
|
|
if(r == nil){
|
|
werrstr("mallocz: %r");
|
|
return nil;
|
|
}
|
|
r->mux = mux;
|
|
r->r.l = &mux->lk;
|
|
r->waiting = 1;
|
|
|
|
return r;
|
|
}
|
|
|
|
static int
|
|
tagmuxrpc(Muxrpc *r, void *tx)
|
|
{
|
|
int tag;
|
|
Mux *mux;
|
|
|
|
mux = r->mux;
|
|
/* assign the tag, add selves to response queue */
|
|
qlock(&mux->lk);
|
|
tag = gettag(mux, r);
|
|
/*print("gettag %p %d\n", r, tag); */
|
|
enqueue(mux, r);
|
|
qunlock(&mux->lk);
|
|
|
|
/* actually send the packet */
|
|
if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
|
|
werrstr("settag/send tag %d: %r", tag);
|
|
fprint(2, "%r\n");
|
|
qlock(&mux->lk);
|
|
dequeue(mux, r);
|
|
puttag(mux, r);
|
|
qunlock(&mux->lk);
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
muxmsgandqlock(Mux *mux, void *p)
|
|
{
|
|
int tag;
|
|
Muxrpc *r2;
|
|
|
|
tag = mux->gettag(mux, p) - mux->mintag;
|
|
/*print("mux tag %d\n", tag); */
|
|
qlock(&mux->lk);
|
|
/* hand packet to correct sleeper */
|
|
if(tag < 0 || tag >= mux->mwait){
|
|
fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
|
|
/* must leak packet! don't know how to free it! */
|
|
return;
|
|
}
|
|
r2 = mux->wait[tag];
|
|
if(r2 == nil || r2->prev == nil){
|
|
fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
|
|
/* must leak packet! don't know how to free it! */
|
|
return;
|
|
}
|
|
r2->p = p;
|
|
dequeue(mux, r2);
|
|
rwakeup(&r2->r);
|
|
}
|
|
|
|
void
|
|
electmuxer(Mux *mux)
|
|
{
|
|
Muxrpc *rpc;
|
|
|
|
/* if there is anyone else sleeping, wake them to mux */
|
|
for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){
|
|
if(!rpc->async){
|
|
mux->muxer = rpc;
|
|
rwakeup(&rpc->r);
|
|
return;
|
|
}
|
|
}
|
|
mux->muxer = nil;
|
|
}
|
|
|
|
void*
|
|
muxrpc(Mux *mux, void *tx)
|
|
{
|
|
int tag;
|
|
Muxrpc *r;
|
|
void *p;
|
|
|
|
if((r = allocmuxrpc(mux)) == nil)
|
|
return nil;
|
|
|
|
if((tag = tagmuxrpc(r, tx)) < 0)
|
|
return nil;
|
|
|
|
qlock(&mux->lk);
|
|
/* wait for our packet */
|
|
while(mux->muxer && mux->muxer != r && !r->p)
|
|
rsleep(&r->r);
|
|
|
|
/* if not done, there's no muxer: start muxing */
|
|
if(!r->p){
|
|
if(mux->muxer != nil && mux->muxer != r)
|
|
abort();
|
|
mux->muxer = r;
|
|
while(!r->p){
|
|
qunlock(&mux->lk);
|
|
_muxrecv(mux, 1, &p);
|
|
if(p == nil){
|
|
/* eof -- just give up and pass the buck */
|
|
qlock(&mux->lk);
|
|
dequeue(mux, r);
|
|
break;
|
|
}
|
|
muxmsgandqlock(mux, p);
|
|
}
|
|
electmuxer(mux);
|
|
}
|
|
p = r->p;
|
|
puttag(mux, r);
|
|
qunlock(&mux->lk);
|
|
if(p == nil)
|
|
werrstr("unexpected eof");
|
|
return p;
|
|
}
|
|
|
|
Muxrpc*
|
|
muxrpcstart(Mux *mux, void *tx)
|
|
{
|
|
int tag;
|
|
Muxrpc *r;
|
|
|
|
if((r = allocmuxrpc(mux)) == nil)
|
|
return nil;
|
|
r->async = 1;
|
|
if((tag = tagmuxrpc(r, tx)) < 0)
|
|
return nil;
|
|
return r;
|
|
}
|
|
|
|
int
|
|
muxrpccanfinish(Muxrpc *r, void **vp)
|
|
{
|
|
void *p;
|
|
Mux *mux;
|
|
int ret;
|
|
|
|
mux = r->mux;
|
|
qlock(&mux->lk);
|
|
ret = 1;
|
|
if(!r->p && !mux->muxer){
|
|
mux->muxer = r;
|
|
while(!r->p){
|
|
qunlock(&mux->lk);
|
|
p = nil;
|
|
if(!_muxrecv(mux, 0, &p))
|
|
ret = 0;
|
|
if(p == nil){
|
|
qlock(&mux->lk);
|
|
break;
|
|
}
|
|
muxmsgandqlock(mux, p);
|
|
}
|
|
electmuxer(mux);
|
|
}
|
|
p = r->p;
|
|
if(p)
|
|
puttag(mux, r);
|
|
qunlock(&mux->lk);
|
|
*vp = p;
|
|
return ret;
|
|
}
|
|
|
|
static void
|
|
enqueue(Mux *mux, Muxrpc *r)
|
|
{
|
|
r->next = mux->sleep.next;
|
|
r->prev = &mux->sleep;
|
|
r->next->prev = r;
|
|
r->prev->next = r;
|
|
}
|
|
|
|
static void
|
|
dequeue(Mux *mux, Muxrpc *r)
|
|
{
|
|
r->next->prev = r->prev;
|
|
r->prev->next = r->next;
|
|
r->prev = nil;
|
|
r->next = nil;
|
|
}
|
|
|
|
static int
|
|
gettag(Mux *mux, Muxrpc *r)
|
|
{
|
|
int i, mw;
|
|
Muxrpc **w;
|
|
|
|
for(;;){
|
|
/* wait for a free tag */
|
|
while(mux->nwait == mux->mwait){
|
|
if(mux->mwait < mux->maxtag-mux->mintag){
|
|
mw = mux->mwait;
|
|
if(mw == 0)
|
|
mw = 1;
|
|
else
|
|
mw <<= 1;
|
|
w = realloc(mux->wait, mw*sizeof(w[0]));
|
|
if(w == nil)
|
|
return -1;
|
|
memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
|
|
mux->wait = w;
|
|
mux->freetag = mux->mwait;
|
|
mux->mwait = mw;
|
|
break;
|
|
}
|
|
rsleep(&mux->tagrend);
|
|
}
|
|
|
|
i=mux->freetag;
|
|
if(mux->wait[i] == 0)
|
|
goto Found;
|
|
for(; i<mux->mwait; i++)
|
|
if(mux->wait[i] == 0)
|
|
goto Found;
|
|
for(i=0; i<mux->freetag; i++)
|
|
if(mux->wait[i] == 0)
|
|
goto Found;
|
|
/* should not fall out of while without free tag */
|
|
fprint(2, "libfs: nwait botch\n");
|
|
abort();
|
|
}
|
|
|
|
Found:
|
|
mux->nwait++;
|
|
mux->wait[i] = r;
|
|
r->tag = i+mux->mintag;
|
|
return r->tag;
|
|
}
|
|
|
|
static void
|
|
puttag(Mux *mux, Muxrpc *r)
|
|
{
|
|
int i;
|
|
|
|
i = r->tag - mux->mintag;
|
|
assert(mux->wait[i] == r);
|
|
mux->wait[i] = nil;
|
|
mux->nwait--;
|
|
mux->freetag = i;
|
|
rwakeup(&mux->tagrend);
|
|
free(r);
|
|
}
|