#include "MessageTypes.h"
#include "TEnv.h"
#include "TError.h"
#include "TException.h"
#include "TMonitor.h"
#include "TObjString.h"
#include "TProof.h"
#include "TSlave.h"
#include "TRegexp.h"
#include "TROOT.h"
#include "TUrl.h"
#include "TXHandler.h"
#include "TXSocket.h"
#include "XProofProtocol.h"
#include "XrdProofConn.h"
#include "XrdClient/XrdClientConnMgr.hh"
#include "XrdClient/XrdClientConst.hh"
#include "XrdClient/XrdClientEnv.hh"
#include "XrdClient/XrdClientLogConnection.hh"
#include "XrdClient/XrdClientMessage.hh"
#ifndef WIN32
#include <sys/socket.h>
#else
#include <Winsock2.h>
#endif
#ifdef OLDXRDOUC
# include "XrdSysToOuc.h"
# include "XrdOuc/XrdOucError.hh"
# include "XrdOuc/XrdOucLogger.hh"
#else
# include "XrdSys/XrdSysError.hh"
# include "XrdSys/XrdSysLogger.hh"
#endif
#include "XrdProofdTrace.h"
// Global trace object for the XrdProof client code. It starts out null and is
// created by the TXSocket constructor the first time a socket is built.
XrdOucTrace *XrdProofdTrace = 0;
// File-local logger and error destination (tagged "Proofx"). The TXSocket
// constructor attaches eLogger to eDest and hands eDest to the trace object.
static XrdSysLogger eLogger;
static XrdSysError eDest(0, "Proofx");
#ifdef WIN32
// Definitions of the TSocket global byte counters; provided by this file
// only when WIN32 is defined.
ULong64_t TSocket::fgBytesSent;
ULong64_t TSocket::fgBytesRecv;
#endif
//_____________________________________________________________________________
void TXSocket::DoError(int level, const char *location, const char *fmt, va_list va) const
{
   // Route a diagnostic to the global ROOT error handler, prefixing the
   // location with the class name ("TXSocket::<location>").

   const char *where = Form("TXSocket::%s", location);
   ::ErrorHandler(level, where, fmt, va);
}
//
// File handler that pings a TXSocket whenever it is notified.
// The handler is registered on descriptor 'fd' with mask 1
// (presumably the read-interest bit - confirm against TFileHandler).
//
class TXSocketPingHandler : public TFileHandler {
private:
   TXSocket *fSocket;   // socket to be pinged on notification (not owned)

public:
   TXSocketPingHandler(TXSocket *s, Int_t fd) : TFileHandler(fd, 1), fSocket(s) { }

   Bool_t Notify();
   // Read notifications are handled exactly like generic ones
   Bool_t ReadNotify() { return Notify(); }
};
//_____________________________________________________________________________
Bool_t TXSocketPingHandler::Notify()
{
   // Ping the associated socket, identifying the request as coming from
   // the "ping handler". The outcome of the ping is not propagated:
   // kTRUE is always returned.

   (void) fSocket->Ping("ping handler");
   return kTRUE;
}
// ---- Static members shared by all TXSocket instances ----------------------
// Checked by the constructor: InitEnvs() is called while this is kFALSE.
Bool_t TXSocket::fgInitDone = kFALSE;
// Global pipe on which sockets post / clean entries as messages are queued
// and picked up (see ProcessUnsolicitedMsg, PostMsg, PickUpReady).
TXSockPipe TXSocket::fgPipe;
// Location label, set via SetLocation(); "undef" until then.
TString TXSocket::fgLoc = "undef";
// Mutex protecting the queue of spare buffers below.
TMutex TXSocket::fgSMtx;
// Spare buffers available for reuse (see PopUpSpare / PushBackSpare).
std::list<TXSockBuf *> TXSocket::fgSQue;
// Buffer-memory accounting for TXSockBuf: current usage counter and limit.
// 10485760 = 10 * 1024 * 1024 bytes. NOTE(review): presumably these back
// TXSockBuf::BuffMem() and TXSockBuf::GetMemMax() - confirm in TXSockBuf.
Long64_t TXSockBuf::fgBuffMem = 0;
Long64_t TXSockBuf::fgMemMax = 10485760;
//_____________________________________________________________________________
TXSocket::TXSocket(const char *url, Char_t m, Int_t psid, Char_t capver,
                   const char *logbuf, Int_t loglevel, TXHandler *handler)
         : TSocket(), fMode(m), fLogLevel(loglevel),
           fBuffer(logbuf), fASem(0),
           fDontTimeout(kFALSE), fRDInterrupt(kFALSE), fXrdProofdVersion(-1)
{
   // Open a connection to the server at 'url'.
   //   m        connection mode; 'i' enables internal sending options, 'C'
   //            starts without a session ID; for 'm', 's', 'M' and 'A' a
   //            session is created or attached via Create()
   //   psid     session ID (ignored for mode 'C')
   //   capver   passed through to the underlying XrdProofConn
   //   logbuf   initial content of the internal buffer sent with the request
   //   loglevel log level forwarded in the create/attach request
   //   handler  object notified of asynchronous input and errors (not owned)
   // On failure the constructor returns early; callers are expected to
   // check IsValid() afterwards.

   // Reset owned pointers and plain state before anything can fail, so that
   // the destructor (which calls Close() and deletes the mutexes) is safe
   // whichever early-return path is taken below.
   fConn = 0;
   fAMtx = 0;
   fIMtx = 0;
   fILev = -1;
   fIForward = kFALSE;
   fByteLeft = 0;
   fByteCur = 0;
   fBufCur = 0;
   fServType = kPROOFD;
   fTcpWindowSize = -1;
   fRemoteProtocol = -1;
   fSendOpt = (fMode == 'i') ? (kXPD_internal | kXPD_async) : kXPD_async;
   fSessionID = (fMode == 'C') ? -1 : psid;
   fSocket = -1;
   fReference = 0;
   fHandler = handler;

   fUrl = url;

   // Enable tracing in the XrdProof client, if not done already
   eDest.logger(&eLogger);
   if (!XrdProofdTrace)
      XrdProofdTrace = new XrdOucTrace(&eDest);

   // Init envs the first time
   if (!fgInitDone)
      InitEnvs();

   // Async queue related stuff
   if (!(fAMtx = new TMutex(kTRUE))) {
      Error("TXSocket", "problems initializing mutex for async queue");
      return;
   }
   fAQue.clear();

   // Interrupts related stuff
   if (!(fIMtx = new TMutex(kTRUE))) {
      Error("TXSocket", "problems initializing mutex for interrupts");
      return;
   }

   // The global pipe must be usable
   if (!fgPipe.IsValid()) {
      Error("TXSocket", "internal pipe is invalid");
      return;
   }

   // Some initial values for the address; port defaults to 1093
   TUrl u(url);
   fAddress = gSystem->GetHostByName(u.GetHost());
   u.SetProtocol("proof", kTRUE);
   fAddress.fPort = (u.GetPort() > 0) ? u.GetPort() : 1093;

   if (url) {

      // Modes 'A' and 'C' are mapped to 'M' for the low-level connection
      char md = (fMode !='A' && fMode !='C') ? fMode : 'M';
      fConn = new XrdProofConn(url, md, psid, capver, this, fBuffer.Data());
      if (!fConn || !(fConn->IsValid())) {
         // The connection object (if any) is kept so that the caller can
         // still query it, e.g. through IsServProofd()
         if (fConn && fConn->GetServType() != XrdProofConn::kSTProofd) {
            if (gDebug > 0)
               Error("TXSocket", "fatal error occurred while opening a connection"
                                 " to server [%s]: %s", url, fConn->GetLastErr());
         }
         return;
      }

      // Create or attach to a session, if requested by the mode
      if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
         if (!Create()) {
            Error("TXSocket", "create or attach failed (%s)",
                  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
            Close();
            return;
         }
      }

      // Fill in the information from the established connection
      fUser = fConn->fUser.c_str();
      fHost = fConn->fHost.c_str();
      fPort = fConn->fPort;

      // In 'C' mode no session was created: take the versions from the connection
      if (fMode == 'C') {
         fXrdProofdVersion = fConn->fRemoteProtocol;
         fRemoteProtocol = fConn->fRemoteProtocol;
      }

      // Also in the base class
      fUrl = fConn->fUrl.GetUrl().c_str();
      fAddress = gSystem->GetHostByName(fConn->fUrl.Host.c_str());
      fAddress.fPort = fPort;

      // This is needed for the reader thread to signal an interrupt
      fPid = gSystem->GetPid();
   }
}
//_____________________________________________________________________________
TXSocket::TXSocket(const TXSocket &s)
         : TSocket(s),
           XrdClientAbsUnsolMsgHandler(s)
{
   // Copy constructor: only the two base classes are copy-constructed;
   // nothing else is done in the body.
}
//_____________________________________________________________________________
TXSocket& TXSocket::operator=(const TXSocket&)
{
   // Assignment operator: deliberately a no-op, the right-hand side is
   // ignored and the object is returned unchanged.

   TXSocket &self = *this;
   return self;
}
//_____________________________________________________________________________
TXSocket::~TXSocket()
{
   // Destructor: close the connection first, then release the two mutexes
   // (async queue and interrupts).

   Close();

   SafeDelete(fAMtx);
   SafeDelete(fIMtx);
}
//_____________________________________________________________________________
void TXSocket::SetLocation(const char *loc)
{
   // Set the location label, both for the sockets and for the global pipe.
   // A null 'loc' resets the label to the empty string.

   const char *label = loc ? loc : "";
   fgLoc = label;
   fgPipe.SetLoc(label);
}
//_____________________________________________________________________________
void TXSocket::SetSessionID(Int_t id)
{
   // Set the session ID to 'id'. When the ID is negative and a connection
   // exists, asynchronous handling is switched off on the connection first.

   const Bool_t invalidating = (id < 0);
   if (invalidating && fConn)
      fConn->SetAsync(0);

   fSessionID = id;
}
//_____________________________________________________________________________
void TXSocket::DisconnectSession(Int_t id, Option_t *opt)
{
   // Disconnect from remote session 'id'.
   // Recognized characters in 'opt' (either case):
   //    'S'   send a destroy request instead of a detach request
   //    'A'   send the request even if 'id' is negative
   // Nothing is sent if the socket is not valid, or if 'id' is negative
   // and 'A' is not specified.

   if (!IsValid()) {
      if (gDebug > 0)
         Info("DisconnectSession","not connected: nothing to do");
      return;
   }

   // Decode the options
   Bool_t shutdown = kFALSE;
   Bool_t all = kFALSE;
   if (opt) {
      shutdown = (strchr(opt,'S') || strchr(opt,'s'));
      all = (strchr(opt,'A') || strchr(opt,'a'));
   }

   // Nothing to do for an undefined session unless 'all' is requested
   if (id <= -1 && !all)
      return;

   // Prepare the request
   XPClientRequest Request;
   memset(&Request, 0, sizeof(Request) );
   fConn->SetSID(Request.header.streamid);
   Request.proof.requestid = shutdown ? kXP_destroy : kXP_detach;
   Request.proof.sid = id;

   // Send it over and report the error, if any
   XrdClientMessage *xrsp =
      fConn->SendReq(&Request, (const void *)0, 0, "DisconnectSession");
   if (!xrsp) {
      if (fConn->GetLastErr())
         Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
   }

   // Cleanup
   SafeDelete(xrsp);
}
//_____________________________________________________________________________
void TXSocket::Close(Option_t *opt)
{
   // Close the connection and delete the connection object.
   // If 'opt' contains a number enclosed between two '#' characters
   // (i.e. "...#<digits>#..."), that number is used as session ID instead
   // of the current one. A non-negative session ID triggers
   // DisconnectSession(id, opt); otherwise the low-level connection is
   // closed with 'opt'.

   // Remove any reference to this socket from the global pipe
   TXSocket::fgPipe.Flush(this);

   // Nothing else to do without a connection
   if (!fConn) {
      if (gDebug > 0)
         Info("Close","no connection: nothing to do");
      return;
   }

   // Stop asynchronous processing
   fConn->SetAsync(0);

   if (IsValid()) {

      // Look for an explicit session ID of the form '#<id>#'
      Int_t sessID = fSessionID;
      TString o(opt);
      const Int_t first = o.Index("#");
      if (first != kNPOS) {
         o.Remove(0, first + 1);
         const Int_t second = o.Index("#");
         if (second != kNPOS) {
            o.Remove(second);
            if (o.IsDigit())
               sessID = o.Atoi();
         }
      }

      if (sessID > -1) {
         // Warn the remote session, if any (after destroy the session is gone)
         DisconnectSession(sessID, opt);
      } else {
         // We are the manager: close underlying connection
         fConn->Close(opt);
      }
   }

   // Delete the connection module
   SafeDelete(fConn);
}
UnsolRespProcResult TXSocket::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *,
XrdClientMessage *m)
{
UnsolRespProcResult rc = kUNSOL_KEEP;
if (!m) {
if (gDebug > 2)
Info("ProcessUnsolicitedMsg", "%p: got empty message: skipping", this);
return kUNSOL_CONTINUE;
} else {
if (gDebug > 2)
Info("ProcessUnsolicitedMsg", "%p: got message with status: %d, len: %d bytes (ID: %d)",
this, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
}
if (m->IsError()) {
if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_timeout) {
if (gDebug > 0)
Info("ProcessUnsolicitedMsg","%p: got error from underlying connection", this);
XHandleErr_t herr = {1, 0};
if (!fHandler || fHandler->HandleError((const void *)&herr)) {
if (gDebug > 0)
Info("ProcessUnsolicitedMsg","%p: handler undefined or recovery failed", this);
fSessionID = -1;
} else {
Touch();
}
} else {
if (gDebug > 2)
Info("ProcessUnsolicitedMsg", "%p: underlying connection timed out", this);
}
return kUNSOL_CONTINUE;
}
if (!fConn || !m->MatchStreamid(fConn->fStreamid)) {
if (gDebug > 1)
Info("ProcessUnsolicitedMsg", "%p: IDs do not match: {%d, %d}", this, fConn->fStreamid, m->HeaderSID());
return kUNSOL_CONTINUE;
}
if (!m) {
Error("ProcessUnsolicitedMsg", "undefined message - disabling");
PostMsg(kPROOF_STOP);
return rc;
}
Int_t len = 0;
if ((len = m->DataLen()) < (int)sizeof(kXR_int32)) {
Error("ProcessUnsolicitedMsg", "empty or bad-formed message - disabling");
PostMsg(kPROOF_STOP);
return rc;
}
Touch();
kXR_int32 acod = 0;
memcpy(&acod, m->GetData(), sizeof(kXR_int32));
if (acod > 10000)
Info("ProcessUnsolicitedMsg", "%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
this, acod, acod, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
void *pdata = (void *)((char *)(m->GetData()) + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg", "%p: got action: %d (%d bytes) (ID: %d)",
this, acod, len, m->HeaderSID());
if (gDebug > 3)
fgPipe.DumpReadySock();
kXR_int32 ilev = -1;
const char *lab = 0;
switch (acod) {
case kXPD_ping:
ilev = TProof::kPing;
lab = "kXPD_ping";
case kXPD_interrupt:
lab = !lab ? "kXPD_interrupt" : lab;
{ R__LOCKGUARD(fIMtx);
if (acod == kXPD_interrupt) {
memcpy(&ilev, pdata, sizeof(kXR_int32));
ilev = net2host(ilev);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
kXR_int32 ifw = 0;
if (len > 0) {
memcpy(&ifw, pdata, sizeof(kXR_int32));
ifw = net2host(ifw);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","%s: forwarding option: %d", lab, ifw);
}
fILev = ilev;
fIForward = (ifw == 1) ? kTRUE : kFALSE;
XHandleIn_t hin = {acod, 0, 0, 0};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_timer:
{
kXR_int32 opt = 1;
kXR_int32 delay = 0;
if (len > 0) {
memcpy(&opt, pdata, sizeof(kXR_int32));
opt = net2host(opt);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_timer: found opt: %d", opt);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
if (len > 0) {
memcpy(&delay, pdata, sizeof(kXR_int32));
delay = net2host(delay);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_timer: found delay: %d", delay);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
XHandleIn_t hin = {acod, opt, delay, 0};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_inflate:
{
kXR_int32 inflate = 1000;
if (len > 0) {
memcpy(&inflate, pdata, sizeof(kXR_int32));
inflate = net2host(inflate);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_inflate: factor: %d", inflate);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
XHandleIn_t hin = {acod, inflate, 0, 0};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_priority:
{
kXR_int32 priority = -1;
if (len > 0) {
memcpy(&priority, pdata, sizeof(kXR_int32));
priority = net2host(priority);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_priority: priority: %d", priority);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
XHandleIn_t hin = {acod, priority, 0, 0};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_flush:
{
XHandleIn_t hin = {acod, 0, 0, 0};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_urgent:
{
kXR_int32 type = -1;
if (len > 0) {
memcpy(&type, pdata, sizeof(kXR_int32));
type = net2host(type);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_urgent: found type: %d", type);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
kXR_int32 int1 = -1;
if (len > 0) {
memcpy(&int1, pdata, sizeof(kXR_int32));
int1 = net2host(int1);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_urgent: found int1: %d", int1);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
kXR_int32 int2 = -1;
if (len > 0) {
memcpy(&int2, pdata, sizeof(kXR_int32));
int2 = net2host(int2);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","kXPD_urgent: found int2: %d", int2);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
}
XHandleIn_t hin = {acod, type, int1, int2};
if (fHandler)
fHandler->HandleInput((const void *)&hin);
else
Error("ProcessUnsolicitedMsg","handler undefined");
}
break;
case kXPD_msg:
{ R__LOCKGUARD(fAMtx);
TXSockBuf *b = PopUpSpare(len);
if (!b) {
Error("ProcessUnsolicitedMsg","could allocate spare buffer");
return rc;
}
memcpy(b->fBuf, pdata, len);
b->fLen = len;
fBytesRecv += len;
fAQue.push_back(b);
fgPipe.Post(this);
if (gDebug > 2)
Info("ProcessUnsolicitedMsg","%p: posting semaphore: %p (%d bytes)",
this,&fASem,len);
fASem.Post();
}
break;
case kXPD_feedback:
Info("ProcessUnsolicitedMsg",
"kXPD_feedback treatment not yet implemented");
break;
case kXPD_srvmsg:
{
kXR_int32 opt = 0;
memcpy(&opt, pdata, sizeof(kXR_int32));
opt = net2host(opt);
if (opt == 0 || opt == 1) {
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
} else {
opt = 1;
}
if (opt == 0) {
Printf("| %.*s", len, (char *)pdata);
} else {
Printf(" ");
Printf("| Message from server:");
Printf("| %.*s", len, (char *)pdata);
}
}
break;
case kXPD_errmsg:
Printf(" ");
Printf("| Error condition occured: message from server:");
Printf("| %.*s", len, (char *)pdata);
if (fHandler)
fHandler->HandleError();
else
Error("ProcessUnsolicitedMsg","handler undefined");
break;
case kXPD_msgsid:
{ R__LOCKGUARD(fAMtx);
kXR_int32 cid = 0;
memcpy(&cid, pdata, sizeof(kXR_int32));
cid = net2host(cid);
if (gDebug > 1)
Info("ProcessUnsolicitedMsg","found cid: %d", cid);
pdata = (void *)((char *)pdata + sizeof(kXR_int32));
len -= sizeof(kXR_int32);
TXSockBuf *b = PopUpSpare(len);
if (!b) {
Error("ProcessUnsolicitedMsg","could allocate spare buffer");
return rc;
}
memcpy(b->fBuf, pdata, len);
b->fLen = len;
b->fCid = cid;
fBytesRecv += len;
fAQue.push_back(b);
fgPipe.Post(this);
if (gDebug > 2)
Info("ProcessUnsolicitedMsg","%p: cid: %d, posting semaphore: %p (%d bytes)",
this, cid, &fASem, len);
fASem.Post();
}
break;
case kXPD_wrkmortem:
Printf(" ");
Printf("| %.*s", len, (char *)pdata);
if (fHandler)
fHandler->HandleError();
else
Error("ProcessUnsolicitedMsg","handler undefined");
break;
case kXPD_touch:
PostMsg(kPROOF_TOUCH);
break;
default:
Error("ProcessUnsolicitedMsg","%p: unknown action code: %d received from '%s' - disabling",
this, acod, GetTitle());
PostMsg(kPROOF_STOP);
break;
}
return rc;
}
//_____________________________________________________________________________
void TXSocket::PostMsg(Int_t type)
{
   // Build an empty message of the given 'type' and queue it locally on the
   // asynchronous queue, exactly as if it had been received from the
   // server: the global pipe and the semaphore are posted so that a reader
   // in PickUpReady() is woken up.

   // Create the message
   TMessage m(type);

   // Write the length in the buffer and pick the buffer to be queued
   // (the compressed one, if any)
   m.SetLength();
   char *mbuf = m.Buffer();
   Int_t mlen = m.Length();
   if (m.CompBuffer()) {
      mbuf = m.CompBuffer();
      mlen = m.CompLength();
   }

   R__LOCKGUARD(fAMtx);

   // Get a spare buffer
   TXSockBuf *b = PopUpSpare(mlen);
   if (!b) {
      // Allocation failed: nothing can be queued
      Error("PostMsg", "could not allocate spare buffer");
      return;
   }

   // Fill the buffer
   memcpy(b->fBuf, mbuf, mlen);
   b->fLen = mlen;

   // Update counters
   fBytesRecv += mlen;

   // Produce the message
   fAQue.push_back(b);

   // Post the global pipe
   fgPipe.Post(this);

   // Signal it
   if (gDebug > 0)
      Info("PostMsg", "%p: posting type %d to semaphore: %p (%d bytes)",
           this, type, &fASem, mlen);
   fASem.Post();

   return;
}
//_____________________________________________________________________________
Bool_t TXSocket::IsServProofd()
{
   // Return kTRUE if a connection object exists and reports the remote
   // server type as XrdProofConn::kSTProofd; kFALSE otherwise.

   if (!fConn)
      return kFALSE;

   return (fConn->GetServType() == XrdProofConn::kSTProofd) ? kTRUE : kFALSE;
}
//_____________________________________________________________________________
Int_t TXSocket::GetInterrupt(Bool_t &forward)
{
   // Retrieve the last interrupt recorded by ProcessUnsolicitedMsg().
   // Returns the interrupt level and fills 'forward' with the forwarding
   // flag; the stored values are reset (-1 / kFALSE) afterwards.
   // An error is reported if no interrupt was pending (level == -1).

   // Both '%p' need their own argument: the socket and the mutex
   if (gDebug > 2)
      Info("GetInterrupt","%p: waiting to lock mutex %p", this, fIMtx);

   R__LOCKGUARD(fIMtx);

   // Reset values
   Int_t ilev = -1;
   forward = kFALSE;

   // Check if filled
   if (fILev == -1)
      Error("GetInterrupt","value is unset (%d) - protocol error",fILev);

   // Fill output
   ilev = fILev;
   forward = fIForward;

   // Reset values (we process it only once)
   fILev = -1;
   fIForward = kFALSE;

   // Return what we got
   return ilev;
}
//_____________________________________________________________________________
Int_t TXSocket::Flush()
{
   // Drop all the messages pending on the asynchronous queue, moving their
   // buffers to the shared queue of spare buffers.
   // Returns the number of bytes flushed.

   Int_t nf = 0;
   std::list<TXSockBuf *> splist;
   std::list<TXSockBuf *>::iterator i;

   {  R__LOCKGUARD(fAMtx);

      // Must have something to flush
      if (fAQue.size() > 0) {

         // Collect the buffers and count the bytes. Entries are NOT erased
         // while iterating (that would invalidate the iterator): the queue
         // is emptied in one go below.
         Int_t sz = fAQue.size();
         for (i = fAQue.begin(); i != fAQue.end(); ++i) {
            if (*i) {
               splist.push_back(*i);
               nf += (*i)->fLen;
            }
         }

         // Reset the asynchronous queue: one semaphore count per entry
         while (sz--)
            fASem.TryWait();
         fAQue.clear();
      }
   }

   // Hand the buffers over to the spare queue
   if (splist.size() > 0) {
      R__LOCKGUARD(&fgSMtx);
      fgSQue.splice(fgSQue.end(), splist);
   }

   // We are done
   return nf;
}
//_____________________________________________________________________________
Bool_t TXSocket::Create(Bool_t attach)
{
   // Ask the server to create a new session or, in mode 'A' or when
   // 'attach' is true, to attach to the existing session fSessionID.
   // The request is attempted up to "XProof.CreationRetries" times
   // (default 4). On success the session ID, the remote protocol versions
   // and any trailing string in the answer (stored in fBuffer) are
   // recorded and kTRUE is returned. kFALSE is returned if not connected,
   // if the server reports too many sessions, or once all the attempts
   // have failed.

   // Make sure we are connected
   if (!IsValid()) {
      if (gDebug > 0)
         Info("Create","not connected: nothing to do");
      return kFALSE;
   }

   Int_t retriesleft = gEnv->GetValue("XProof.CreationRetries", 4);

   while (retriesleft--) {

      // Prepare the request header
      XPClientRequest req;
      memset(&req, 0, sizeof(req));
      fConn->SetSID(req.header.streamid);

      const Bool_t attaching = (fMode == 'A' || attach);
      if (attaching) {
         req.header.requestid = kXP_attach;
         req.proof.sid = fSessionID;
      } else {
         req.header.requestid = kXP_create;
      }

      // Send log level
      req.proof.int1 = fLogLevel;

      // The current buffer content travels with the request.
      // NOTE(review): fBuffer is cleared after the first attempt, so any
      // retry sends an empty payload - confirm this is intended.
      const void *payload = (const void *)(fBuffer.Data());
      req.header.dlen = fBuffer.Length();
      if (gDebug > 1) {
         Info("Create", "sending %d bytes to server", req.header.dlen);
         Info("Create", "creating session of server %s", fUrl.Data());
      }

      // Server response
      char *answData = 0;
      XrdClientMessage *xrsp = fConn->SendReq(&req, payload,
                                              &answData, "TXSocket::Create", 0);

      // From here on the buffer is used for what comes back in the answer
      fBuffer = "";

      if (!xrsp) {
         // Failure: give up at once if the server is full ...
         if (fConn->GetOpenError() == kXP_TooManySess) {
            // Avoid to contact the server any more
            fSessionID = -1;
            return kFALSE;
         }
         // ... otherwise report and, if possible, try again
         if ((retriesleft <= 0 || gDebug > 0) && fConn->GetLastErr())
            Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
         if (gDebug > 0)
            Info("Create", "creation/attachment attempt failed: %d attempts left", retriesleft);
         if (retriesleft <= 0)
            Error("Create", "%d creation/attachment attempts failed: no attempts left",
                            gEnv->GetValue("XProof.CreationRetries", 4));
         continue;
      }

      // Success: walk through the answer
      char *cursor = (char *)(xrsp->GetData());
      Int_t left = xrsp->DataLen();

      // The first 4 bytes contain the session ID
      if (left >= (Int_t)sizeof(kXR_int32)) {
         kXR_int32 psid = 0;
         memcpy(&psid, cursor, sizeof(kXR_int32));
         fSessionID = net2host(psid);
         cursor += sizeof(kXR_int32);
         left -= sizeof(kXR_int32);
      } else {
         Error("Create","session ID is undefined!");
      }

      // The next 2 bytes contain the remote PROOF protocol version
      if (left >= (Int_t)sizeof(kXR_int16)) {
         kXR_int16 dver = 0;
         memcpy(&dver, cursor, sizeof(kXR_int16));
         fRemoteProtocol = net2host(dver);
         cursor += sizeof(kXR_int16);
         left -= sizeof(kXR_int16);
      } else {
         Warning("Create","protocol version of the remote PROOF undefined!");
      }

      if (fRemoteProtocol == 0) {
         // A zero 16-bit version: the version is re-read as a 32-bit value.
         // NOTE(review): the byte count is rewound by 2 but the read
         // pointer is not, and the remaining length is not checked -
         // verify against the format sent by older servers.
         left += sizeof(kXR_int16);
         kXR_int32 dver = 0;
         memcpy(&dver, cursor, sizeof(kXR_int32));
         fRemoteProtocol = net2host(dver);
         cursor += sizeof(kXR_int32);
         left -= sizeof(kXR_int32);
      } else if (left >= (Int_t)sizeof(kXR_int16)) {
         // The next 2 bytes contain the remote XrdProofdProtocol version
         kXR_int16 dver = 0;
         memcpy(&dver, cursor, sizeof(kXR_int16));
         fXrdProofdVersion = net2host(dver);
         cursor += sizeof(kXR_int16);
         left -= sizeof(kXR_int16);
      } else {
         Warning("Create","version of the remote XrdProofdProtocol undefined!");
      }

      // Whatever is left is kept, as a null-terminated string, in fBuffer
      if (left > 0) {
         char *tail = new char[left+1];
         memcpy(tail, cursor, left);
         tail[left] = 0;
         fBuffer = tail;
         delete[] tail;
      }

      // Cleanup
      SafeDelete(xrsp);
      if (answData)
         free(answData);

      // Notify
      return kTRUE;
   }

   // Notify failure
   Error("Create:",
         "problems creating or attaching to a remote server (%s)",
         ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
   return kFALSE;
}
//_____________________________________________________________________________
Int_t TXSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
{
   // Send 'length' bytes from 'buffer' to the remote session.
   // With opt == kDontBlock the asynchronous flag is added to the sending
   // options, otherwise it is removed.
   // Returns the number of bytes sent or -1 in case of error (including
   // the case of a socket which is not connected).

   TSystem::ResetErrno();

   // Make sure we are connected: fConn is dereferenced below
   if (!IsValid()) {
      Error("SendRaw","not connected: nothing to do");
      return -1;
   }

   // Options and request ID
   fSendOpt = (opt == kDontBlock) ? (kXPD_async | fSendOpt)
                                  : (~kXPD_async & fSendOpt) ;

   // Prepare request
   XPClientRequest Request;
   memset( &Request, 0, sizeof(Request) );
   fConn->SetSID(Request.header.streamid);
   Request.sendrcv.requestid = kXP_sendmsg;
   Request.sendrcv.sid = fSessionID;
   Request.sendrcv.opt = fSendOpt;
   Request.sendrcv.cid = GetClientID();
   Request.sendrcv.dlen = length;
   if (gDebug >= 2)
      Info("SendRaw", "sending %d bytes to server", Request.sendrcv.dlen);

   // Send request
   XrdClientMessage *xrsp = fConn->SendReq(&Request, buffer, 0, "SendRaw");

   if (xrsp) {
      // Prepare return info
      Int_t nsent = length;

      // Update counters
      fBytesSent += length;

      // Cleanup
      SafeDelete(xrsp);

      // Success: update usage timestamp
      Touch();

      // ok
      return nsent;
   } else {
      // Print error message, if any
      if (fConn->GetLastErr())
         Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
      else
         Printf("%s: error occured but no message from server", fHost.Data());
   }

   // Failure notification (avoid using the handler: we may be exiting)
   Error("SendRaw", "%s: problems sending %d bytes to server",
                    fHost.Data(), length);
   return -1;
}
//_____________________________________________________________________________
Bool_t TXSocket::Ping(const char *ord)
{
   // Ping the remote session; 'ord' is only used to label the messages
   // ("int" when null).
   // In mode 'i' the request is just written to the connection and the
   // result tells whether the write succeeded; in the other modes the
   // answer is waited for and kTRUE is returned only if it carries the
   // value 1.

   TSystem::ResetErrno();

   if (gDebug > 0)
      Info("Ping","%p: %s: sid: %d", this, ord ? ord : "int", fSessionID);

   // Make sure we are connected
   if (!IsValid()) {
      Error("Ping","not connected: nothing to do");
      return kFALSE;
   }

   // Options
   kXR_int32 options = (fMode == 'i') ? kXPD_internal : 0;

   // Prepare request
   XPClientRequest Request;
   memset( &Request, 0, sizeof(Request) );
   fConn->SetSID(Request.header.streamid);
   Request.sendrcv.requestid = kXP_ping;
   Request.sendrcv.sid = fSessionID;
   Request.sendrcv.opt = options;
   Request.sendrcv.dlen = 0;

   // Send request
   Bool_t res = kFALSE;
   if (fMode != 'i') {
      char *pans = 0;
      XrdClientMessage *xrsp =
         fConn->SendReq(&Request, (const void *)0, &pans, "Ping");

      // Get the result
      if (xrsp && xrsp->HeaderStatus() == kXR_ok) {
         // The answer is decoded only if it is there and large enough;
         // otherwise the ping is reported as failed
         if (pans && xrsp->DataLen() >= (int)sizeof(kXR_int32)) {
            kXR_int32 pres = 0;
            memcpy(&pres, pans, sizeof(kXR_int32));
            res = (net2host(pres) == 1) ? kTRUE : kFALSE;
         }
         // The server answered: update usage timestamp
         Touch();
      } else {
         // Print error msg, if any
         if (fConn->GetLastErr())
            Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
      }

      // Cleanup: the answer buffer is owned by the caller, as for the
      // other requests returning data in this class
      SafeDelete(xrsp);
      if (pans)
         free(pans);
   } else {
      // Internal mode: just write the request, no answer is expected
      if (XPD::clientMarshall(&Request) == 0) {
         XReqErrorType e = fConn->LowWrite(&Request, 0, 0);
         res = (e == kOK) ? kTRUE : kFALSE;
      } else {
         Error("Ping", "%p: int: problems marshalling request", this);
      }
   }

   // Failure notification (avoid using the handler: we may be exiting)
   if (!res) {
      Error("Ping", "%p: %s: problems sending ping to server", this, ord ? ord : "int");
   } else if (gDebug > 0) {
      Info("Ping","%p: %s: sid: %d OK", this, ord ? ord : "int", fSessionID);
   }

   return res;
}
void TXSocket::RemoteTouch()
{
TSystem::ResetErrno();
if (gDebug > 0)
Info("RemoteTouch","%p: sending touch request to %s", this, GetName());
if (!IsValid()) {
Error("RemoteTouch","not connected: nothing to do");
return;
}
XPClientRequest Request;
memset( &Request, 0, sizeof(Request) );
fConn->SetSID(Request.header.streamid);
Request.sendrcv.requestid = kXP_touch;
Request.sendrcv.sid = fSessionID;
Request.sendrcv.opt = 0;
Request.sendrcv.dlen = 0;
if (XPD::clientMarshall(&Request) != 0) {
Error("Touch", "%p: problems marshalling request ", this);
return;
}
if (fConn->LowWrite(&Request, 0, 0) != kOK)
Error("Touch", "%p: %s: problems sending touch request to server", this);
return;
}
//_____________________________________________________________________________
Int_t TXSocket::PickUpReady()
{
   // Wait for a message on the asynchronous queue and make it the current
   // read buffer.
   // Unless fDontTimeout is set, the wait is done in slices of 2 s up to
   // "XProof.ReadTimeout" seconds in total (default 300; read once) and
   // can be broken by setting fRDInterrupt.
   // Returns 0 on success, -1 on timeout, interrupt or error.

   fBufCur = 0;
   fByteLeft = 0;
   fByteCur = 0;

   if (gDebug > 2)
      Info("PickUpReady","%p: going to sleep", this);

   // User can choose whether to wait forever or for a fixed amount of time
   if (!fDontTimeout) {
      static Int_t timeout = gEnv->GetValue("XProof.ReadTimeout", 300) * 1000;
      static Int_t dt = 2000;
      Int_t to = timeout;
      while (to && !fRDInterrupt) {
         if (fASem.Wait(dt) != 0) {
            to -= dt;
            if (to <= 0) {
               Error("PickUpReady","error waiting at semaphore");
               return -1;
            } else {
               if (gDebug > 0)
                  Info("PickUpReady","%p: got timeout: retring (%d secs)", this, to/1000);
            }
         } else
            break;
      }
      // We wait forever
      if (fRDInterrupt) {
         Error("PickUpReady","interrupted");
         fRDInterrupt = kFALSE;
         return -1;
      }
   } else {
      // We wait forever
      if (fASem.Wait() != 0) {
         Error("PickUpReady","error waiting at semaphore");
         return -1;
      }
   }
   if (gDebug > 2)
      Info("PickUpReady","%p: waken up", this);

   R__LOCKGUARD(fAMtx);

   // Get message, if any
   if (fAQue.size() <= 0) {
      Error("PickUpReady","queue is empty - protocol error ?");
      return -1;
   }
   fBufCur = fAQue.front();
   fAQue.pop_front();

   // An undefined entry cannot be read: report it instead of
   // dereferencing it below
   if (!fBufCur) {
      // Keep one Clean() per dequeued entry, as in the normal path
      fgPipe.Clean(this);
      Error("PickUpReady","undefined buffer found in the queue - protocol error ?");
      return -1;
   }

   // Set number of available bytes
   fByteLeft = fBufCur->fLen;

   if (gDebug > 2)
      Info("PickUpReady","%p: got message (%d bytes)", this, (Int_t)(fBufCur->fLen));

   // Update counters
   fBytesRecv += fBufCur->fLen;

   // Set the client ID, if the buffer carries one different from ours
   if (fBufCur->fCid > -1 && fBufCur->fCid != GetClientID())
      SetClientID(fBufCur->fCid);

   // Clean entry in the underlying pipe
   fgPipe.Clean(this);

   // We are done
   return 0;
}
//_____________________________________________________________________________
TXSockBuf *TXSocket::PopUpSpare(Int_t size)
{
   // Get a buffer able to hold at least 'size' bytes.
   // The queue of spare buffers is searched first: the first buffer large
   // enough is reused; if there is none, the first spare buffer is resized.
   // A new buffer is allocated only when the spare queue is empty.
   // Returns 0 if the allocation fails.

   TXSockBuf *buf = 0;
   static Int_t nBuf = 0;

   R__LOCKGUARD(&fgSMtx);

   Int_t maxsz = 0;
   if (fgSQue.size() > 0) {
      std::list<TXSockBuf *>::iterator i = fgSQue.begin();
      while (i != fgSQue.end()) {
         // Undefined entries are of no use: drop them before looking
         // at their size
         if (!(*i)) {
            i = fgSQue.erase(i);
            continue;
         }
         maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
         if ((*i)->fSiz >= size) {
            buf = *i;
            if (gDebug > 2)
               Info("PopUpSpare","asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
                                 size, (Int_t) fgSQue.size(), nBuf, buf, buf->fSiz);
            // Drop from this list
            fgSQue.erase(i);
            return buf;
         }
         ++i;
      }
      // None large enough found: resize the first one, if any is left
      if (fgSQue.size() > 0) {
         buf = fgSQue.front();
         buf->Resize(size);
         if (gDebug > 2)
            Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
                              size, (Int_t) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
         // Drop from this list
         fgSQue.pop_front();
         return buf;
      }
   }

   // Create a new buffer; on failure the caller gets 0 and reports it
   char *b = (char *)malloc(size);
   if (!b)
      return 0;
   buf = new TXSockBuf(b, size);
   nBuf++;

   if (gDebug > 2)
      Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
                        size, (Int_t) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);

   // We are done
   return buf;
}
void TXSocket::PushBackSpare()
{
R__LOCKGUARD(&fgSMtx);
if (gDebug > 2)
Info("PushBackSpare","release buf %p, sz: %d (BuffMem: %lld)",
fBufCur, fBufCur->fSiz, TXSockBuf::BuffMem());
if (TXSockBuf::BuffMem() < TXSockBuf::GetMemMax()) {
fgSQue.push_back(fBufCur);
} else {
delete fBufCur;
}
fBufCur = 0;
fByteCur = 0;
fByteLeft = 0;
}
//_____________________________________________________________________________
Int_t TXSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions)
{
   // Copy exactly 'length' bytes of received data into 'buffer', picking
   // up as many queued messages as needed.
   // Returns 'length' on success, -1 on invalid arguments or if no
   // message could be picked up.

   // Inputs must make sense
   if (!buffer || (length <= 0))
      return -1;

   // Wait and pick-up a read buffer if we do not have one
   if (!fBufCur) {
      if (PickUpReady() != 0)
         return -1;
   }

   Char_t *dst = (Char_t *)buffer;

   // Simple case: the current buffer holds everything we need
   if (fByteLeft >= length) {
      memcpy(dst, fBufCur->fBuf + fByteCur, length);
      fByteCur += length;
      fByteLeft -= length;
      if (fByteLeft <= 0)
         // All used: give back
         PushBackSpare();

      // Success: update usage timestamp
      Touch();
      return length;
   }

   // Take what is left in the current buffer ...
   memcpy(dst, fBufCur->fBuf + fByteCur, fByteLeft);
   Int_t offset = fByteLeft;
   Int_t missing = length - fByteLeft;
   PushBackSpare();

   // ... and get the rest from the next buffers
   while (missing > 0) {
      if (PickUpReady() != 0)
         return -1;

      Int_t ncpy = fByteLeft;
      if (ncpy > missing)
         ncpy = missing;

      memcpy((void *)(dst + offset), fBufCur->fBuf, ncpy);
      fByteCur = ncpy;
      fByteLeft -= ncpy;
      if (fByteLeft <= 0)
         // All used: give back
         PushBackSpare();

      missing -= ncpy;
      offset += ncpy;
   }

   // Update counters
   fBytesRecv += length;
   fgBytesRecv += length;

   // Success: update usage timestamp
   Touch();

   return length;
}
//_____________________________________________________________________________
Int_t TXSocket::SendInterrupt(Int_t type)
{
   // Send an interrupt of the given 'type' to the remote session.
   // A type equal to TProof::kShutdownInterrupt is sent as a destroy
   // request.
   // Returns 0 on success, -1 on error (including the case of a socket
   // which is not connected).

   TSystem::ResetErrno();

   // Make sure we are connected: fConn is dereferenced below
   if (!IsValid()) {
      Error("SendInterrupt","not connected: nothing to do");
      return -1;
   }

   // Prepare request
   XPClientRequest Request;
   memset(&Request, 0, sizeof(Request) );
   fConn->SetSID(Request.header.streamid);
   if (type == (Int_t) TProof::kShutdownInterrupt)
      Request.interrupt.requestid = kXP_destroy;
   else
      Request.interrupt.requestid = kXP_interrupt;
   Request.interrupt.sid = fSessionID;
   Request.interrupt.type = type;    // type of interrupt (see TProof::EUrgent)
   Request.interrupt.dlen = 0;

   // Send request
   XrdClientMessage *xrsp =
      fConn->SendReq(&Request, (const void *)0, 0, "SendInterrupt");
   if (xrsp) {
      // Success: update usage timestamp
      Touch();
      // Cleanup
      SafeDelete(xrsp);
      // ok
      return 0;
   } else {
      // Print error msg, if any
      if (fConn->GetLastErr())
         Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
   }

   // Failure notification (avoid using the handler: we may be exiting)
   Error("SendInterrupt", "problems sending interrupt to server");
   return -1;
}
//_____________________________________________________________________________
Int_t TXSocket::Send(const TMessage &mess)
{
   // Send a TMessage to the remote session.
   // Streamer infos and process IDs are sent first, the message is
   // compressed if requested, and the sending options are temporarily
   // extended according to the message type.
   // Returns -1 if 'mess' is a reading message; otherwise the value of
   // SendRaw() if that is <= 0, else the number of bytes sent minus the
   // size of the length word.

   TSystem::ResetErrno();

   if (mess.IsReading()) {
      Error("Send", "cannot send a message used for reading");
      return -1;
   }

   // Send streamer infos and process IDs in case the schema evolved
   SendStreamerInfos(mess);
   SendProcessIDs(mess);

   // Write the length in the buffer
   mess.SetLength();

   // Apply the socket compression level if the message has none of its own
   if (fCompress > 0 && mess.GetCompressionLevel() == 0)
      const_cast<TMessage&>(mess).SetCompressionLevel(fCompress);
   if (mess.GetCompressionLevel() > 0)
      const_cast<TMessage&>(mess).Compress();

   // Pick the buffer to be sent: the compressed one, if available
   const Bool_t compressed = (mess.CompBuffer() != 0);
   char *mbuf = compressed ? mess.CompBuffer() : mess.Buffer();
   Int_t mlen = compressed ? mess.CompLength() : mess.Length();

   // Save the current options and add those specific to the message type
   const kXR_int32 savedOpt = fSendOpt;
   switch (mess.What()) {
      case kPROOF_PROCESS:
         fSendOpt |= kXPD_process;
         break;
      case kPROOF_PROGRESS:
      case kPROOF_FEEDBACK:
      case kPROOF_STOPPROCESS:
         fSendOpt |= kXPD_fb_prog;
         break;
      case kPROOF_QUERYSUBMITTED:
         fSendOpt |= (kXPD_querynum | kXPD_fb_prog);
         break;
      case kPROOF_STARTPROCESS:
         fSendOpt |= (kXPD_startprocess | kXPD_fb_prog);
         break;
      case kPROOF_SETIDLE:
         fSendOpt |= (kXPD_setidle | kXPD_fb_prog);
         break;
      case kPROOF_LOGFILE:
      case kPROOF_LOGDONE:
         if (GetClientIDSize() <= 1)
            fSendOpt |= kXPD_logmsg;
         break;
      default:
         break;
   }

   if (gDebug > 2)
      Info("Send", "sending type %d (%d bytes) to '%s'", mess.What(), mlen, GetTitle());

   const Int_t nsent = SendRaw(mbuf, mlen);

   // Restore the default options
   fSendOpt = savedOpt;

   if (nsent <= 0)
      return nsent;

   // Update counters
   fBytesSent += nsent;
   fgBytesSent += nsent;

   // Length word excluded
   return nsent - sizeof(UInt_t);
}
//_____________________________________________________________________________
Int_t TXSocket::Recv(TMessage *&mess)
{
   // Receive a TMessage object: a length word followed by that many bytes.
   // Messages consumed by RecvStreamerInfos() or RecvProcessIDs() are
   // skipped and the next one is read.
   // Returns the length of the message body in bytes. If the socket is not
   // valid, 'mess' is set to 0 and -5 is returned; if a read fails, 'mess'
   // is set to 0 and the value returned by RecvRaw() (<= 0) is passed on.
   // The caller owns the returned message.

   TSystem::ResetErrno();

   if (!IsValid()) {
      mess = 0;
      return -5;
   }

   // Loop until a message for the caller shows up
   for (;;) {

      // Length of the message first ...
      UInt_t len;
      Int_t n = RecvRaw(&len, sizeof(UInt_t));
      if (n <= 0) {
         mess = 0;
         return n;
      }
      len = net2host(len);

      // ... then the body, leaving room for the length word in front
      char *buf = new char[len+sizeof(UInt_t)];
      n = RecvRaw(buf+sizeof(UInt_t), len);
      if (n <= 0) {
         delete [] buf;
         mess = 0;
         return n;
      }

      // Update counters
      fBytesRecv += n + sizeof(UInt_t);
      fgBytesRecv += n + sizeof(UInt_t);

      mess = new TMessage(buf, len+sizeof(UInt_t));

      // Internal messages are consumed here: read the next one
      if (RecvStreamerInfos(mess))
         continue;
      if (RecvProcessIDs(mess))
         continue;

      return n;
   }
}
//_____________________________________________________________________________
TObjString *TXSocket::SendCoordinator(Int_t kind, const char *msg, Int_t int2,
                                      Long64_t l64, Int_t int3, const char *)
{
   // Send an administrative request of the given 'kind' to the coordinator.
   // The meaning of 'msg', 'int2', 'l64' and 'int3' depends on 'kind'; for
   // kReadBuffer they are the file path, the length, the offset and an
   // additional option (requiring a server version >= 1003 when > 0).
   // Returns a new TObjString with the answer, when the request produces
   // one (to be deleted by the caller), or 0 otherwise: unknown kind,
   // invalid arguments, no connection or failure.

   TObjString *sout = 0;

   // Make sure we are connected: fConn is dereferenced below
   if (!IsValid()) {
      Error("SendCoordinator","not connected: nothing to do");
      return sout;
   }

   // We fill the header struct containing the request
   XPClientRequest reqhdr;
   const void *buf = 0;
   char *bout = 0;
   char **vout = 0;
   memset(&reqhdr, 0, sizeof(reqhdr));
   fConn->SetSID(reqhdr.header.streamid);
   reqhdr.header.requestid = kXP_admin;
   reqhdr.proof.int1 = kind;
   reqhdr.proof.int2 = int2;
   switch (kind) {
      case kQueryROOTVersions:
      case kQuerySessions:
      case kQueryWorkers:
         // No session, no payload; an answer is expected
         reqhdr.proof.sid = 0;
         reqhdr.header.dlen = 0;
         vout = (char **)&bout;
         break;
      case kCleanupSessions:
         reqhdr.proof.int2 = (int2 == 1) ? (kXR_int32) kXPD_AnyServer
                                         : (kXR_int32) kXPD_TopMaster;
         reqhdr.proof.int3 = int2;
         reqhdr.proof.sid = fSessionID;
         reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
         buf = (msg) ? (const void *)msg : buf;
         break;
      case kQueryLogPaths:
         // As the following ones, but an answer is expected
         vout = (char **)&bout;
         // intentional fall-through
      case kReleaseWorker:
      case kSendMsgToUser:
      case kGroupProperties:
      case kSessionTag:
      case kSessionAlias:
         reqhdr.proof.sid = fSessionID;
         reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
         buf = (msg) ? (const void *)msg : buf;
         break;
      case kROOTVersion:
         reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
         buf = (msg) ? (const void *)msg : buf;
         break;
      case kGetWorkers:
         reqhdr.proof.sid = fSessionID;
         reqhdr.header.dlen = 0;
         vout = (char **)&bout;
         break;
      case kReadBuffer:
         reqhdr.header.requestid = kXP_readbuf;
         reqhdr.readbuf.ofs = l64;
         reqhdr.readbuf.len = int2;
         if (int3 > 0 && fXrdProofdVersion < 1003) {
            Info("SendCoordinator", "kReadBuffer: old server (ver %d < 1003):"
                 " grep functionality not supported", fXrdProofdVersion);
            return sout;
         }
         reqhdr.readbuf.int1 = int3;
         if (!msg || strlen(msg) <= 0) {
            Info("SendCoordinator", "kReadBuffer: file path undefined");
            return sout;
         }
         reqhdr.header.dlen = strlen(msg);
         buf = (const void *)msg;
         vout = (char **)&bout;
         break;
      default:
         Info("SendCoordinator", "unknown message kind: %d", kind);
         return sout;
   }

   // Send over
   XrdClientMessage *xrsp =
      fConn->SendReq(&reqhdr, buf, vout, "TXSocket::SendCoordinator");

   // If positive answer
   if (xrsp) {
      // Wrap the answer, if any, and release the answer buffer
      if (bout && (xrsp->DataLen() > 0))
         sout = new TObjString(TString(bout,xrsp->DataLen()));
      if (bout)
         free(bout);
      // Success: update usage timestamp
      Touch();
      SafeDelete(xrsp);
   } else {
      // Print error msg, if any
      if (fConn->GetLastErr())
         Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
   }

   // Done
   return sout;
}
//_____________________________________________________________________________
void TXSocket::SendUrgent(Int_t type, Int_t int1, Int_t int2)
{
   // Send an urgent request of the given 'type', with the two integer
   // arguments 'int1' and 'int2', to the remote session.
   // Errors are reported but not propagated; nothing is sent if the socket
   // is not connected.

   TSystem::ResetErrno();

   // Make sure we are connected: fConn is dereferenced below
   if (!IsValid()) {
      Error("SendUrgent","not connected: nothing to do");
      return;
   }

   // Prepare request
   XPClientRequest Request;
   memset(&Request, 0, sizeof(Request) );
   fConn->SetSID(Request.header.streamid);
   Request.proof.requestid = kXP_urgent;
   Request.proof.sid = fSessionID;
   Request.proof.int1 = type;    // type of urgent msg (see TXSocket::EUrgentMsgType)
   Request.proof.int2 = int1;    // 4-byte container info 1
   Request.proof.int3 = int2;    // 4-byte container info 2
   Request.proof.dlen = 0;

   // Send request
   XrdClientMessage *xrsp =
      fConn->SendReq(&Request, (const void *)0, 0, "SendUrgent");
   if (xrsp) {
      // Success: update usage timestamp
      Touch();
      // Cleanup
      SafeDelete(xrsp);
   } else {
      // Print error msg, if any
      if (fConn->GetLastErr())
         Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
   }

   // Done
   return;
}
void TXSocket::InitEnvs()
{
Int_t deb = gEnv->GetValue("XProof.Debug", -1);
EnvPutInt(NAME_DEBUG, deb);
if (deb > 0) {
XrdProofdTrace->What |= TRACE_REQ;
if (deb > 1) {
XrdProofdTrace->What |= TRACE_DBG;
if (deb > 2)
XrdProofdTrace->What |= TRACE_ALL;
}
}
TString allowCO = gEnv->GetValue("XProof.ConnectDomainAllowRE", "");
if (allowCO.Length() > 0)
EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.Data());
TString denyCO = gEnv->GetValue("XProof.ConnectDomainDenyRE", "");
if (denyCO.Length() > 0)
EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.Data());
XrdProofConn::SetRetryParam(-1, -1);
Int_t maxRetries = gEnv->GetValue("XProof.FirstConnectMaxCnt",5);
EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
Int_t connTO = gEnv->GetValue("XProof.ConnectTimeout", 2);
EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
Int_t recoTO = gEnv->GetValue("XProof.ReconnectTimeout",
DFLT_RECONNECTTIMEOUT);
EnvPutInt(NAME_RECONNECTTIMEOUT, recoTO);
Int_t requTO = gEnv->GetValue("XProof.RequestTimeout", 150);
EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
Int_t garbCollTh = gEnv->GetValue("XProof.StartGarbageCollectorThread",
DFLT_STARTGARBAGECOLLECTORTHREAD);
EnvPutInt(NAME_STARTGARBAGECOLLECTORTHREAD, garbCollTh);
EnvPutInt(NAME_KEEPSOCKOPENIFNOTXRD, 0);
TString socks4Host = gEnv->GetValue("XNet.SOCKS4Host","");
Int_t socks4Port = gEnv->GetValue("XNet.SOCKS4Port",-1);
if (socks4Port > 0) {
if (socks4Host.IsNull())
socks4Host = "127.0.0.1";
EnvPutString(NAME_SOCKS4HOST, socks4Host.Data());
EnvPutInt(NAME_SOCKS4PORT, socks4Port);
}
TString autolog = gEnv->GetValue("XSec.Pwd.AutoLogin","1");
if (autolog.Length() > 0)
gSystem->Setenv("XrdSecPWDAUTOLOG",autolog.Data());
TString netrc;
netrc.Form("%s/.rootnetrc",gSystem->HomeDirectory());
gSystem->Setenv("XrdSecNETRC", netrc.Data());
TString alogfile = gEnv->GetValue("XSec.Pwd.ALogFile","");
if (alogfile.Length() > 0)
gSystem->Setenv("XrdSecPWDALOGFILE",alogfile.Data());
TString verisrv = gEnv->GetValue("XSec.Pwd.VerifySrv","1");
if (verisrv.Length() > 0)
gSystem->Setenv("XrdSecPWDVERIFYSRV",verisrv.Data());
TString srvpuk = gEnv->GetValue("XSec.Pwd.ServerPuk","");
if (srvpuk.Length() > 0)
gSystem->Setenv("XrdSecPWDSRVPUK",srvpuk.Data());
TString cadir = gEnv->GetValue("XSec.GSI.CAdir","");
if (cadir.Length() > 0)
gSystem->Setenv("XrdSecGSICADIR",cadir.Data());
TString crldir = gEnv->GetValue("XSec.GSI.CRLdir","");
if (crldir.Length() > 0)
gSystem->Setenv("XrdSecGSICRLDIR",crldir.Data());
TString crlext = gEnv->GetValue("XSec.GSI.CRLextension","");
if (crlext.Length() > 0)
gSystem->Setenv("XrdSecGSICRLEXT",crlext.Data());
TString ucert = gEnv->GetValue("XSec.GSI.UserCert","");
if (ucert.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERCERT",ucert.Data());
TString ukey = gEnv->GetValue("XSec.GSI.UserKey","");
if (ukey.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERKEY",ukey.Data());
TString upxy = gEnv->GetValue("XSec.GSI.UserProxy","");
if (upxy.Length() > 0)
gSystem->Setenv("XrdSecGSIUSERPROXY",upxy.Data());
TString valid = gEnv->GetValue("XSec.GSI.ProxyValid","");
if (valid.Length() > 0)
gSystem->Setenv("XrdSecGSIPROXYVALID",valid.Data());
TString deplen = gEnv->GetValue("XSec.GSI.ProxyForward","0");
if (deplen.Length() > 0)
gSystem->Setenv("XrdSecGSIPROXYDEPLEN",deplen.Data());
TString pxybits = gEnv->GetValue("XSec.GSI.ProxyKeyBits","");
if (pxybits.Length() > 0)
gSystem->Setenv("XrdSecGSIPROXYKEYBITS",pxybits.Data());
TString crlcheck = gEnv->GetValue("XSec.GSI.CheckCRL","1");
if (crlcheck.Length() > 0)
gSystem->Setenv("XrdSecGSICRLCHECK",crlcheck.Data());
TString delegpxy = gEnv->GetValue("XSec.GSI.DelegProxy","0");
if (delegpxy.Length() > 0)
gSystem->Setenv("XrdSecGSIDELEGPROXY",delegpxy.Data());
TString signpxy = gEnv->GetValue("XSec.GSI.SignProxy","1");
if (signpxy.Length() > 0)
gSystem->Setenv("XrdSecGSISIGNPROXY",signpxy.Data());
if (gEnv->GetValue("XNet.PrintTAG",0) == 1)
::Info("TXSocket","(C) 2005 CERN TXSocket (XPROOF client) %s",
gROOT->GetVersion());
fgInitDone = kTRUE;
}
Int_t TXSocket::Reconnect()
{
   // Try to reconnect to the server.
   // Returns 0 when a valid connection is available on exit, -1 otherwise
   // (including when the server protocol version is below 1005, in which
   // case no attempt is made).
   // For modes 'm', 's', 'M' and 'A' the session is re-created / re-attached
   // through Create(kTRUE) after a successful reconnection; if that fails the
   // socket is closed and -1 is returned.

   if (gDebug > 0) {
      // fConn may be null here: guard every dereference, including the log id
      Info("Reconnect", "%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
                        this, fConn, (fConn ? fConn->IsValid() : 0),
                        fUrl.Data(), (fConn ? fConn->GetLogConnID() : -1));
   }

   if (fXrdProofdVersion < 1005) {
      Info("Reconnect","%p: server does not support reconnections (protocol: %d < 1005)",
                       this, fXrdProofdVersion);
      return -1;
   }

   if (fConn) {
      if (gDebug > 0)
         Info("Reconnect", "%p: locking phyconn: %p", this, fConn->fPhyConn);
      fConn->ReConnect();
      if (fConn->IsValid()) {
         // Create new proofserv if not client manager or administrator or internal mode
         if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
            if (!Create(kTRUE)) {
               // Failure
               Error("TXSocket", "create or attach failed (%s)",
                     ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
               Close();
               return -1;
            }
         }
      }
   }

   if (gDebug > 0) {
      // Same guard as above: report -1 as log id when there is no connection
      Info("Reconnect", "%p (c:%p): attempt %s (logid: %d)", this, fConn,
                        ((fConn && fConn->IsValid()) ? "succeeded!" : "failed"),
                        (fConn ? fConn->GetLogConnID() : -1));
   }

   // Done
   return ((fConn && fConn->IsValid()) ? 0 : -1);
}
TXSockBuf::TXSockBuf(Char_t *bp, Int_t sz, Bool_t own)
{
   // Wrap the memory block 'bp' of 'sz' bytes. 'own' is recorded in fOwn,
   // which the destructor consults before freeing the block.
   // The global buffer-memory counter is increased by 'sz'.

   // Both the working pointer and the base pointer start at the block
   fMem = bp;
   fBuf = bp;

   // Capacity and current length are both the full size
   fLen = sz;
   fSiz = sz;

   fOwn = own;
   fCid = -1;

   fgBuffMem += sz;
}
TXSockBuf::~TXSockBuf()
{
   // Release the memory block if this buffer owns it and give the buffer's
   // size back to the global buffer-memory counter.

   if (fOwn && fMem)
      free(fMem);

   // The constructor adds the size to fgBuffMem unconditionally, so it must
   // be subtracted unconditionally as well; otherwise every non-owning (or
   // null) buffer would inflate the counter permanently.
   fgBuffMem -= fSiz;
}
void TXSockBuf::Resize(Int_t sz)
{
   // Grow the buffer to 'sz' bytes. Nothing happens when 'sz' does not exceed
   // the current size. On success the global buffer-memory counter is
   // increased by the difference, fBuf is reset to the start of the block and
   // the current length is set to 0. If the reallocation fails the buffer is
   // left exactly as it was.

   if (sz <= fSiz)
      return;

   // Keep the result in a temporary: on failure realloc returns 0 but leaves
   // the old block allocated, so fMem must not be overwritten or the block
   // would be leaked and fBuf would refer to memory nobody frees.
   Char_t *grown = (Char_t *)realloc(fMem, sz);
   if (!grown)
      return;

   fgBuffMem += (sz - fSiz);
   fMem = grown;
   fBuf = fMem;
   fSiz = sz;
   fLen = 0;
}
Long64_t TXSockBuf::BuffMem()
{
   // Current value of the global buffer-memory counter (fgBuffMem).

   const Long64_t inUse = fgBuffMem;
   return inUse;
}
Long64_t TXSockBuf::GetMemMax()
{
   // Current value of the buffer-memory limit (fgMemMax).

   const Long64_t limit = fgMemMax;
   return limit;
}
void TXSockBuf::SetMemMax(Long64_t memmax)
{
   // Set the buffer-memory limit (fgMemMax) to 'memmax'.
   // Non-positive values are ignored and the current limit is kept.

   if (memmax > 0)
      fgMemMax = memmax;
}
TXSockPipe::TXSockPipe(const char *loc) : fMutex(kTRUE), fLoc(loc)
{
   // Create the pipe used to signal socket inputs; 'loc' is the label used
   // in the messages printed by this object.
   // If the pipe cannot be created both descriptors are set to -1.

   if (pipe(fPipe) == 0)
      return;

   // Creation failed: notify and mark both ends as unusable
   Printf("TXSockPipe: problem initializing pipe for socket inputs");
   fPipe[0] = -1;
   fPipe[1] = -1;
}
TXSockPipe::~TXSockPipe()
{
   // Close both ends of the pipe, skipping descriptors marked invalid (< 0).

   for (Int_t end = 0; end < 2; end++) {
      if (fPipe[end] >= 0)
         close(fPipe[end]);
   }
}
Int_t TXSockPipe::Post(TSocket *s)
{
   // Register socket 's' in the list of ready sockets and write one byte to
   // the pipe to signal it.
   // Returns 0 on success, -1 if the pipe is not valid, 's' is null or the
   // byte could not be written (in which case 's' is not registered).

   if (!IsValid() || !s) return -1;

   // List update and pipe write are done under the same lock
   Int_t sz = 0;
   {  R__LOCKGUARD(&fMutex);

      // Notify first and register only on success: registering before a
      // failed write would leave 's' in the ready list without a matching
      // byte in the pipe, so list and pipe would disagree from then on.
      Char_t c = 1;
      if (write(fPipe[1],(const void *)&c, sizeof(Char_t)) < 1) {
         Printf("TXSockPipe::Post: %s: can't notify pipe", fLoc.Data());
         return -1;
      }
      fReadySock.Add(s);

      if (gDebug > 2) sz = fReadySock.GetSize();
   }

   if (gDebug > 2)
      Printf("TXSockPipe::Post: %s: %p: pipe posted (pending %d)",
             fLoc.Data(), s, sz);

   // We are done
   return 0;
}
Int_t TXSockPipe::Clean(TSocket *s)
{
   // Read one byte from the pipe and remove socket 's' from the list of
   // ready sockets.
   // Returns 0 on success, -1 if the pipe is not valid, 's' is null or the
   // byte could not be read (in which case the list is left untouched).

   if (!IsValid() || !s) return -1;

   Int_t pending = 0;
   {  R__LOCKGUARD(&fMutex);

      // Consume exactly one byte
      Char_t token = 0;
      const Bool_t gotByte =
         (read(fPipe[0],(void *)&token, sizeof(Char_t)) >= 1);
      if (!gotByte) {
         Printf("TXSockPipe::Clean: %s: can't read from pipe", fLoc.Data());
         return -1;
      }

      // Drop the matching entry
      fReadySock.Remove(s);

      if (gDebug > 2) pending = fReadySock.GetSize();
   }

   if (gDebug > 2)
      Printf("TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d)",
             fLoc.Data(), s, pending);

   // We are done
   return 0;
}
Int_t TXSockPipe::Flush(TSocket *s)
{
   // Remove every entry for socket 's' from the list of ready sockets,
   // reading one byte from the pipe per removed entry, then call Flush() on
   // the socket itself.
   // Returns -1 if the pipe is not valid or 's' is null, 0 otherwise; a
   // failed pipe read is reported but does not stop the procedure.

   if (!IsValid() || !s) return -1;

   {  R__LOCKGUARD(&fMutex);

      // One list entry and one pipe byte per iteration
      while (fReadySock.FindObject(s)) {
         fReadySock.Remove(s);
         Char_t token = 0;
         if (read(fPipe[0],(void *)&token, sizeof(Char_t)) < 1)
            Printf("TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data());
      }
   }

   // Flush also the socket
   static_cast<TXSocket *>(s)->Flush();

   if (gDebug > 0)
      Printf("TXSockPipe::Flush: %s: %p: pipe flushed", fLoc.Data(), s);

   // We are done
   return 0;
}
void TXSockPipe::DumpReadySock()
{
R__LOCKGUARD(&fMutex);
TString buf = Form("%d |", fReadySock.GetSize());
TIter nxs(&fReadySock);
TObject *o = 0;
while ((o = nxs()))
buf += Form(" %p",o);
Printf("TXSockPipe::DumpReadySock: %s: list content: %s", fLoc.Data(), buf.Data());
}
TXSocket *TXSockPipe::GetLastReady()
{
   // Return the last entry of the list of ready sockets, as given by
   // fReadySock.Last(), cast to TXSocket.

   R__LOCKGUARD(&fMutex);

   TObject *lastEntry = fReadySock.Last();
   return static_cast<TXSocket *>(lastEntry);
}
Last change: Thu Dec 18 09:31:41 2008
Last generated: 2008-12-18 09:31
This page was generated automatically. If you have comments or suggestions about the page layout, send an email to ROOT support; for questions or problems regarding ROOT itself, contact the developers.