#include <errno.h>
#ifdef WIN32
#include <io.h>
#endif
#include "TList.h"
#include "TObjArray.h"
#include "TObjString.h"
#include "TProof.h"
#include "TProofLog.h"
#include "TXProofMgr.h"
#include "TXSocket.h"
#include "TROOT.h"
// ROOT ClassImp macro invocation for the TXProofMgr class.
ClassImp(TXProofMgr)
// Factory function: allocate a TXProofMgr for 'url' (second argument 'l' and
// alias 'al' are forwarded to the constructor) and return it through the
// generic TProofMgr pointer type. The caller receives a heap-allocated object.
TProofMgr *GetTXProofMgr(const char *url, Int_t l, const char *al)
{
   TXProofMgr *mgr = new TXProofMgr(url, l, al);
   return (TProofMgr *) mgr;
}
// Helper class whose constructor registers GetTXProofMgr with TProofMgr
// through TProofMgr::SetTXProofMgrHook.
class TXProofMgrInit {
public:
   TXProofMgrInit()
   {
      TProofMgr::SetTXProofMgrHook(GetTXProofMgr);
   }
};

// File-scope instance: its construction performs the registration above.
static TXProofMgrInit gxproofmgr_init;
// Constructor: forwards all arguments to TProofMgr, marks the server type as
// kXProofd and runs Init(); if Init() reports a failure (non-zero) the socket
// is deleted and reset to null.
TXProofMgr::TXProofMgr(const char *url, Int_t dbg, const char *alias)
          : TProofMgr(url, dbg, alias)
{
   fServType = kXProofd;

   const Int_t rc = Init(dbg);
   if (rc != 0) {
      SafeDelete(fSocket);
   }
}
// Open the connection to the server described by fUrl.
// On success the remote protocol is stored in fRemoteProtocol and the socket
// is removed from gROOT's list of sockets (under gROOTMutex).
// Returns 0 on success, -1 when the socket could not be created or is not
// valid. In the failure case fServType is switched to TProofMgr::kProofd if
// the socket reports IsServProofd(); otherwise an error is printed when
// gDebug > 0.
Int_t TXProofMgr::Init(Int_t)
{
   TString u = fUrl.GetUrl(kTRUE);

   fSocket = 0;
   fSocket = new TXSocket(u, 'C', kPROOF_Protocol,
                          kXPROOF_Protocol, 0, -1, this);

   const Bool_t connected = (fSocket && fSocket->IsValid());
   if (!connected) {
      if (fSocket && fSocket->IsServProofd()) {
         fServType = TProofMgr::kProofd;
      } else if (gDebug > 0) {
         Error("Init", "while opening the connection to %s - exit (error: %d)",
                       u.Data(), (fSocket ? fSocket->GetOpenError() : -1));
      }
      return -1;
   }

   fRemoteProtocol = fSocket->GetRemoteProtocol();

   // The lock guard only needs to cover the update of the global socket list
   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(fSocket);
   }

   return 0;
}
// Destructor: all the cleanup is delegated to SetInvalid().
TXProofMgr::~TXProofMgr()
{
   this->SetInvalid();
}
// Invalidate this manager: close the socket with option "P" and delete it
// (if any), then remove this object from gROOT's list of sockets while
// holding gROOTMutex.
void TXProofMgr::SetInvalid()
{
   if (fSocket) {
      fSocket->Close("P");
      SafeDelete(fSocket);
   }

   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(this);
   }
}
// Attach to the session described by 'd'.
// Returns 0 if this manager is invalid or 'd' is null. If the description
// already holds a TProof, that instance is returned. Otherwise a new TProof
// is created for "<url>/?<remote id>" (with "GUI" appended when 'gui' is
// true); when it is valid the description is updated with it. The new TProof
// is returned in any case, i.e. also when it is not valid (an error is
// printed in that case).
TProof *TXProofMgr::AttachSession(TProofDesc *d, Bool_t gui)
{
   if (!IsValid()) {
      Warning("AttachSession","invalid TXProofMgr - do nothing");
      return 0;
   }
   if (!d) {
      Warning("AttachSession","invalid description object - do nothing");
      return 0;
   }

   // Already attached: nothing more to do
   TProof *attached = d->GetProof();
   if (attached)
      return attached;

   // Build the URL addressing the remote session
   TString u(Form("%s/?%d", fUrl.GetUrl(kTRUE), d->GetRemoteId()));
   if (gui) {
      u += "GUI";
   }

   TProof *p = new TProof(u, 0, 0, gDebug, 0, this);
   if (!p || !p->IsValid()) {
      Error("AttachSession", "attaching to PROOF session");
      return p;
   }

   // Record the new session in the description
   p->SetManager(this);
   Int_t st = TProofDesc::kRunning;
   if (p->IsIdle())
      st = TProofDesc::kIdle;
   d->SetStatus(st);
   d->SetProof(p);
   p->SetName(d->GetName());

   return p;
}
// Detach from sessions.
//   id > 0  : detach from the session with local id 'id': disconnect it on
//             the socket, delete its TProof and remove/delete its description.
//   id == 0 : disconnect all sessions (option 'opt' with "A" appended), delete
//             all the TProof instances and all the descriptions.
//   id < 0  : nothing is done.
// Nothing is done either if this manager is not valid.
void TXProofMgr::DetachSession(Int_t id, Option_t *opt)
{
   if (!IsValid()) {
      Warning("DetachSession","invalid TXProofMgr - do nothing");
      return;
   }

   if (id > 0) {
      // Single session
      TProofDesc *desc = GetProofDesc(id);
      if (!desc)
         return;

      if (fSocket) {
         fSocket->DisconnectSession(desc->GetRemoteId(), opt);
      }
      TProof *proof = desc->GetProof();
      SafeDelete(proof);
      fSessions->Remove(desc);
      delete desc;
      return;
   }

   if (id != 0)
      return;

   // All sessions
   if (fSocket) {
      TString o(Form("%sA",opt));
      fSocket->DisconnectSession(-1, o);
   }
   if (fSessions) {
      TIter next(fSessions);
      TProofDesc *desc = 0;
      while ((desc = (TProofDesc *)next())) {
         TProof *proof = desc->GetProof();
         SafeDelete(proof);
      }
      fSessions->Delete();
   }
}
// Check whether 'url' addresses the same server as this manager.
// A protocol or port equal to what TUrl("a") reports (i.e. the values TUrl
// assigns when the URL string does not specify them) is replaced by "proof"
// and by the "proofd" service port (1093 if the service lookup fails).
// Returns kTRUE when the fully qualified host matches fUrl's host, the port
// matches either fUrl's or the socket's port, and the user is either empty or
// equal to fUrl's user. Returns kFALSE otherwise, or if the manager is
// invalid.
Bool_t TXProofMgr::MatchUrl(const char *url)
{
   if (!IsValid()) {
      Warning("MatchUrl","invalid TXProofMgr - do nothing");
      return 0;
   }

   TUrl u(url);
   TUrl dflt("a");

   // Fill in the protocol when it was not specified
   if (strcmp(u.GetProtocol(), dflt.GetProtocol()) == 0) {
      u.SetProtocol("proof");
   }

   // Fill in the port when it was not specified
   if (u.GetPort() == dflt.GetPort()) {
      Int_t port = gSystem->GetServiceByName("proofd");
      if (port < 0) {
         port = 1093;
      }
      u.SetPort(port);
   }

   // Host, port and user must all match (evaluated in this order)
   const Bool_t matches =
      (strcmp(u.GetHostFQDN(), fUrl.GetHost()) == 0) &&
      (u.GetPort() == fUrl.GetPort() || u.GetPort() == fSocket->GetPort()) &&
      (strlen(u.GetUser()) == 0 || strcmp(u.GetUser(), fUrl.GetUser()) == 0);

   return matches ? kTRUE : kFALSE;
}
void TXProofMgr::ShowWorkers()
{
if (!IsValid()) {
Warning("ShowWorkers","invalid TXProofMgr - do nothing");
return;
}
TObjString *os = fSocket->SendCoordinator(TXSocket::kQueryWorkers);
if (os) {
TObjArray *oa = TString(os->GetName()).Tokenize(TString("&"));
if (oa) {
TIter nxos(oa);
TObjString *to = 0;
while ((to = (TObjString *) nxos()))
Printf("+ %s", to->GetName());
}
}
}
// Get the list of sessions known to the coordinator.
// If 'opt' starts with "L" (case insensitive) the current local list is
// returned as is, without contacting the server. Otherwise the coordinator is
// queried, fSessions is updated (new descriptions added, existing ones
// refreshed, descriptions of sessions not reported anymore removed and
// deleted) and returned. If 'opt' starts with "S" the surviving descriptions
// are also printed.
// Returns 0 if this manager is not valid. The returned list is owned by this
// object.
TList *TXProofMgr::QuerySessions(Option_t *opt)
{
   if (opt && !strncasecmp(opt,"L",1))
      // Just return the existing list
      return fSessions;

   if (!IsValid()) {
      Warning("QuerySessions","invalid TXProofMgr - do nothing");
      return 0;
   }

   // Create the list if not existing
   if (!fSessions) {
      fSessions = new TList();
      fSessions->SetOwner();
   }

   // Tags of the sessions reported by the coordinator in this call. The list
   // lives on the stack and is emptied explicitly before returning, so that
   // neither the list nor its TObjString entries are leaked.
   TList ocl;

   TObjString *os = fSocket->SendCoordinator(TXSocket::kQuerySessions);
   if (os) {
      TObjArray *oa = TString(os->GetName()).Tokenize(TString("|"));
      if (oa) {
         TProofDesc *d = 0;
         TIter nxos(oa);
         // The first token is skipped on purpose
         // NOTE(review): presumably a header field and not a session entry;
         // confirm against the server reply format.
         TObjString *to = (TObjString *) nxos();
         while ((to = (TObjString *) nxos())) {
            // Each entry is parsed as "<id> <tag> <alias> <status> <nc>"
            char al[256] = {0};
            char tg[256] = {0};
            Int_t id = -1, st = -1, nc = 0;
            // Field widths are bounded to the buffer sizes (255 chars + '\0')
            Int_t nf = sscanf(to->GetName(),"%d %255s %255s %d %d",
                              &id, tg, al, &st, &nc);
            if (nf < 2) {
               // No tag could be extracted: nothing usable in this entry
               Warning("QuerySessions",
                       "could not parse session entry '%s' - ignoring", to->GetName());
               continue;
            }
            if (!(d = (TProofDesc *) fSessions->FindObject(tg))) {
               // New session: local ids are assigned sequentially
               Int_t locid = fSessions->GetSize() + 1;
               d = new TProofDesc(tg, al, GetUrl(), locid, id, st, 0);
               fSessions->Add(d);
            } else {
               // Known session: refresh the information
               d->SetStatus(st);
               d->SetRemoteId(id);
               d->SetTitle(al);
            }
            // Remember that this session is still alive
            ocl.Add(new TObjString(tg));
         }
         SafeDelete(oa);
      }
      SafeDelete(os);
   }

   // Drop the descriptions of the sessions which were not reported
   if (fSessions->GetSize() > 0) {
      TIter nxd(fSessions);
      TProofDesc *d = 0;
      while ((d = (TProofDesc *)nxd())) {
         if (ocl.FindObject(d->GetName())) {
            if (opt && !strncasecmp(opt,"S",1))
               d->Print("");
         } else {
            fSessions->Remove(d);
            SafeDelete(d);
         }
      }
   }

   // Release the temporary tag objects
   ocl.Delete();

   return fSessions;
}
// Handle asynchronous input on the socket.
// A message is received from the socket; kPROOF_TOUCH triggers a
// RemoteTouch() on the socket, any other type is reported with a warning.
// The received message is deleted before returning. A warning is printed if
// the socket is missing or invalid.
// Always returns kTRUE.
Bool_t TXProofMgr::HandleInput(const void *)
{
   if (fSocket && fSocket->IsValid()) {
      TMessage *mess = 0;
      // Recv() may succeed without delivering a message: check the pointer
      // before dereferencing it
      if (fSocket->Recv(mess) >= 0 && mess) {
         Int_t what = mess->What();
         if (gDebug > 0)
            Info("HandleInput", "%p: got message type: %d", this, what);
         switch (what) {
            case kPROOF_TOUCH:
               fSocket->RemoteTouch();
               break;
            default:
               // 'this' must be passed explicitly to match the "%p" specifier
               Warning("HandleInput", "%p: got unknown message type: %d", this, what);
               break;
         }
         // The received message belongs to the caller of Recv()
         delete mess;
      }
   } else {
      Warning("HandleInput", "%p: got message but socket is invalid!", this);
   }

   return kTRUE;
}
// Handle an error notification on the socket.
// 'in' is interpreted as a pointer to XHandleErr_t. When its fOpt field is 1
// a reconnection is attempted; if the socket is valid afterwards kFALSE is
// returned. In all other cases the current monitor of every attached TProof
// in fSessions is interrupted and kTRUE is returned.
Bool_t TXProofMgr::HandleError(const void *in)
{
   XHandleErr_t *herr = (XHandleErr_t *)in;

   // Try a reconnection, if requested
   const Bool_t tryReconnect = (fSocket && herr && (herr->fOpt == 1));
   if (tryReconnect) {
      fSocket->Reconnect();
      if (fSocket && fSocket->IsValid()) {
         if (gDebug > 0) {
            Printf("ProofMgr: connection to coordinator at %s re-established",
                   fUrl.GetUrl());
         }
         return kFALSE;
      }
   }
   Printf("TXProofMgr::HandleError: %p: got called ...", this);

   // Interrupt any monitor currently running on the attached sessions
   if (fSessions && fSessions->GetSize() > 0) {
      TIter next(fSessions);
      TProofDesc *desc = 0;
      while ((desc = (TProofDesc *)next())) {
         if (TProof *proof = (TProof *) desc->GetProof()) {
            proof->InterruptCurrentMonitor();
         }
      }
   }

   if (gDebug > 0) {
      Printf("TXProofMgr::HandleError: %p: DONE ... ", this);
   }

   return kTRUE;
}
// Send a kCleanupSessions request to the coordinator for user 'usr'; 'hard'
// is forwarded as 1 (true) or 0 (false).
// Returns 0 once the request has been sent, -1 if this manager is not valid.
Int_t TXProofMgr::Reset(Bool_t hard, const char *usr)
{
   if (!IsValid()) {
      Warning("Reset","invalid TXProofMgr - do nothing");
      return -1;
   }

   Int_t h = 0;
   if (hard) {
      h = 1;
   }
   fSocket->SendCoordinator(TXSocket::kCleanupSessions, usr, h);

   return 0;
}
// Get the logs (or log excerpts) of a session.
// 'isess' selects the session relative to 'stag' (a positive value is turned
// into its negative before being sent); 'pattern', when non-empty, restricts
// the retrieval to the lines matching it (grep-like), otherwise the default
// retrieval is used.
// The reply is expected in the form
//     <session tag>|<pool url>|<ord> <url>|<ord> <url>|...
// Returns a new TProofLog (to be deleted by the caller), or 0 if the manager
// is invalid, the coordinator did not answer, or the reply is malformed.
TProofLog *TXProofMgr::GetSessionLogs(Int_t isess,
                                      const char *stag, const char *pattern)
{
   if (!IsValid()) {
      Warning("GetSessionLogs","invalid TXProofMgr - do nothing");
      return 0;
   }

   // The session index is always sent as a non-positive number
   isess = (isess > 0) ? -isess : isess;

   TObjString *os = fSocket->SendCoordinator(TXSocket::kQueryLogPaths, stag, isess);
   if (!os)
      return (TProofLog *)0;

   TString rs(os->GetName());
   Ssiz_t from = 0;

   // The session tag comes first
   TString tag;
   if (!rs.Tokenize(tag, from, "|")) {
      Warning("GetSessionLogs", "Session tag undefined: corruption?\n"
                                " (received string: %s)", os->GetName());
      // Do not leak the reply on the error path
      SafeDelete(os);
      return (TProofLog *)0;
   }

   // Then the pool URL
   TString purl;
   if (!rs.Tokenize(purl, from, "|")) {
      Warning("GetSessionLogs", "Pool URL undefined: corruption?\n"
                                " (received string: %s)", os->GetName());
      SafeDelete(os);
      return (TProofLog *)0;
   }

   // 'rs' holds its own copy of the reply: the reply object is not needed
   // anymore
   SafeDelete(os);

   TProofLog *pl = new TProofLog(tag, purl, this);

   // The remaining tokens are "<ord> <url>" pairs
   TString to;
   while (rs.Tokenize(to, from, "|")) {
      if (to.IsNull())
         continue;
      // TString::Strip does not modify the string, it returns the stripped
      // sub-string: build 'ord' from the result so that leading blanks are
      // really removed before looking for the separator
      TString ord(to.Strip(TString::kLeading, ' '));
      if (ord.IsNull())
         continue;
      TString url(ord);
      Ssiz_t ii = kNPOS;
      // Ordinal: everything before the first blank
      if ((ii = ord.Index(" ")) != kNPOS)
         ord.Remove(ii);
      // URL: everything after the first blank
      if ((ii = url.Index(" ")) != kNPOS)
         url.Remove(0, ii + 1);
      pl->Add(ord, url);
      if (gDebug > 1)
         Info("GetSessionLogs", "ord: %s, url: %s", ord.Data(), url.Data());
   }

   // Retrieve the content
   if (pattern && strlen(pattern) > 0)
      pl->Retrieve("*", TProofLog::kGrep, 0, pattern);
   else
      pl->Retrieve();

   return pl;
}
// Send a kReadBuffer request for file 'fin', forwarding 'len' and 'ofs'
// (with a trailing 0 as last argument), and return the coordinator reply.
// Returns 0 if this manager is not valid.
TObjString *TXProofMgr::ReadBuffer(const char *fin, Long64_t ofs, Int_t len)
{
   if (IsValid()) {
      return fSocket->SendCoordinator(TXSocket::kReadBuffer, fin, len, ofs, 0);
   }

   Warning("ReadBuffer","invalid TXProofMgr - do nothing");
   return (TObjString *)0;
}
// Send a kReadBuffer request for the lines of file 'fin' matching 'pattern'.
// The request string is 'fin' immediately followed by 'pattern'; the length
// of the pattern is sent along so that the two can be separated again, and
// the last argument (1) distinguishes this request from the offset/length
// variant.
// Returns the coordinator reply, or 0 if this manager is not valid or one of
// the arguments is null.
TObjString *TXProofMgr::ReadBuffer(const char *fin, const char *pattern)
{
   if (!IsValid()) {
      Warning("ReadBuffer","invalid TXProofMgr - do nothing");
      return (TObjString *)0;
   }

   // strlen() on a null pointer would crash
   if (!fin || !pattern) {
      Warning("ReadBuffer","undefined file path or pattern - do nothing");
      return (TObjString *)0;
   }

   const Int_t plen = strlen(pattern);

   // A TString releases its storage automatically, so the request buffer is
   // not leaked
   TString buf(fin);
   buf += pattern;

   return fSocket->SendCoordinator(TXSocket::kReadBuffer, buf.Data(), plen, 0, 1);
}
// Print the reply of the coordinator to a kQueryROOTVersions request,
// framed by separator lines. Nothing is printed if there is no reply;
// nothing is done if this manager is not valid.
void TXProofMgr::ShowROOTVersions()
{
   if (!IsValid()) {
      Warning("ShowROOTVersions","invalid TXProofMgr - do nothing");
      return;
   }

   TObjString *os = fSocket->SendCoordinator(TXSocket::kQueryROOTVersions);
   if (!os)
      return;

   Printf("----------------------------------------------------------\n");
   Printf("Available versions (tag ROOT-vers remote-path PROOF-version):\n");
   Printf("%s", os->GetName());
   Printf("----------------------------------------------------------");

   // The reply object is owned by us
   SafeDelete(os);
}
// Send a kROOTVersion request carrying 'tag' to the coordinator.
// Nothing is done if this manager is not valid.
void TXProofMgr::SetROOTVersion(const char *tag)
{
   if (IsValid()) {
      fSocket->SendCoordinator(TXSocket::kROOTVersion, tag);
   } else {
      Warning("SetROOTVersion","invalid TXProofMgr - do nothing");
   }
}
// Send a message to connected users through the coordinator.
// 'msg' is either the text to be sent or the path of an existing file whose
// content is to be sent. 'usr' restricts the delivery to one user: when it is
// non-empty and different from "*" the text is prefixed by "u:<usr> ".
// The whole request (prefix + text) is limited to kMAXBUF-1 bytes; longer
// texts are truncated with a warning.
// Returns 0 after the request has been sent, -1 if there is nothing to send,
// the manager is not valid, the user name does not fit in the buffer or the
// file cannot be accessed. A read error in the middle of the file is
// reported, and what has been read so far is sent.
Int_t TXProofMgr::SendMsgToUsers(const char *msg, const char *usr)
{
   Int_t rc = 0;

   // Check input
   if (!msg || strlen(msg) <= 0) {
      Error("SendMsgToUsers","no message to send - do nothing");
      return -1;
   }

   // The socket is used below: it must be there and usable
   if (!IsValid()) {
      Warning("SendMsgToUsers","invalid TXProofMgr - do nothing");
      return -1;
   }

   // Buffer holding the full request
   const Int_t kMAXBUF = 32768;
   char buf[kMAXBUF] = {0};
   char *p = &buf[0];
   Int_t space = kMAXBUF - 1;   // bytes still available, terminator excluded
   Int_t len = 0;               // total number of message bytes in the buffer
   Int_t lusr = 0;              // length of the "u:<usr> " prefix

   // A specific user?
   if (usr && strlen(usr) > 0 && (strlen(usr) != 1 || usr[0] != '*')) {
      const size_t ulen = strlen(usr);
      // The prefix must fit and leave room for at least one message byte
      if (ulen + 3 >= (size_t) space) {
         Error("SendMsgToUsers","user name too long (%d bytes)", (Int_t) ulen);
         return -1;
      }
      lusr = (Int_t) (ulen + 3);
      memcpy(p, "u:", 2);
      memcpy(p + 2, usr, ulen);
      p[2 + ulen] = ' ';
      p += lusr;
      space -= lusr;
   }

   if (!gSystem->AccessPathName(msg, kFileExists)) {
      // 'msg' is the path of an existing file: send its content
      if (gSystem->AccessPathName(msg, kReadPermission)) {
         Error("SendMsgToUsers","request to read message from unreadable file '%s'", msg);
         return -1;
      }
      FILE *f = 0;
      if (!(f = fopen(msg, "r"))) {
         Error("SendMsgToUsers", "file '%s' cannot be open", msg);
         return -1;
      }
      // Determine the file size
      Int_t left = (Int_t) lseek(fileno(f), (off_t) 0, SEEK_END);
      lseek(fileno(f), (off_t) 0, SEEK_SET);
      if (left < 0) {
         SysError("SendMsgToUsers", "error reading file");
         fclose(f);
         return -1;
      }
      // Never read more than what fits in the buffer
      if (left > space) {
         Warning("SendMsgToUsers",
                 "requested to send %d bytes: max size is %d bytes: truncating", left, space);
         left = space;
      }
      // Read the content, coping with interrupted and partial reads
      while (left > 0) {
         Int_t nr = 0;
         while ((nr = read(fileno(f), p, left)) < 0 &&
                TSystem::GetErrno() == EINTR)
            TSystem::ResetErrno();
         if (nr < 0) {
            SysError("SendMsgToUsers", "error reading file");
            break;
         }
         if (nr == 0)
            // End of file reached earlier than expected
            break;
         left -= nr;
         p += nr;
         len += nr;
      }
      // Done with the file
      fclose(f);
   } else {
      // 'msg' is the text itself
      len = strlen(msg);
      if (len > space) {
         Warning("SendMsgToUsers",
                 "requested to send %d bytes: max size is %d bytes: truncating", len, space);
         len = space;
      }
      memcpy(p, msg, len);
   }

   // Null-terminate after the last byte actually written
   buf[len + lusr] = 0;

   // Send the request
   fSocket->SendCoordinator(TXSocket::kSendMsgToUser, buf);

   return rc;
}
// Last change: Tue Jul 15 16:23:15 2008
// Last generated: 2008-07-15 16:23
// This page has been automatically generated. If you have any comments or
// suggestions about the page layout, please send an e-mail to ROOT support, or
// contact the developers with any questions or problems regarding ROOT.