#include "TXSlave.h"
#include "TProof.h"
#include "TProofServ.h"
#include "TSystem.h"
#include "TEnv.h"
#include "TROOT.h"
#include "TUrl.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TError.h"
#include "TSysEvtHandler.h"
#include "TVirtualMutex.h"
#include "TThread.h"
#include "TXSocket.h"
#include "TXSocketHandler.h"
#include "Varargs.h"
// ROOT class-implementation macro for TXSlave.
ClassImp(TXSlave)
//______________________________________________________________________________
// Factory function registered with TSlave (see XSlaveInit): allocates a
// TXSlave with the given arguments and returns it as a TSlave pointer.
TSlave *GetTXSlave(const char *url, const char *ord, Int_t perf,
                   const char *image, TProof *proof, Int_t stype,
                   const char *workdir, const char *msd)
{
   TXSlave *xslave = new TXSlave(url, ord, perf, image,
                                 proof, stype, workdir, msd);
   return (TSlave *)xslave;
}
// Helper whose constructor installs GetTXSlave as the TXSlave creation hook
// in TSlave. The file-static instance below triggers the registration when
// the statics of this translation unit are initialized.
class XSlaveInit {
public:
   XSlaveInit()
   {
      TSlave::SetTXSlaveHook(&GetTXSlave);
   }
};

static XSlaveInit xslave_init;
//______________________________________________________________________________
void TXSlave::DoError(int level, const char *location, const char *fmt, va_list va) const
{
   // Forward the message to the global error handler, prefixing the
   // location with the class name.

   const char *where = Form("TXSlave::%s", location);
   ::ErrorHandler(level, where, fmt, va);
}
//______________________________________________________________________________
// Signal handler for kSigInterrupt which forwards the interrupt to the
// TXSocket it was constructed with (see Notify()).
class TXSlaveInterruptHandler : public TSignalHandler {

private:
   TXSocket *fSocket;   // socket to be flagged on interrupt; may be 0

public:
   TXSlaveInterruptHandler(TXSocket *s = 0)
      : TSignalHandler(kSigInterrupt, kFALSE),
        fSocket(s)
   {
   }

   Bool_t Notify();
};
//______________________________________________________________________________
Bool_t TXSlaveInterruptHandler::Notify()
{
   // Called when the interrupt signal is delivered: log it and, if a socket
   // is attached, set its interrupt flag. Always returns kTRUE.

   Info("Notify","Processing interrupt signal ...");

   if (fSocket != 0) {
      fSocket->SetInterrupt();
   }

   return kTRUE;
}
//______________________________________________________________________________
TXSlave::TXSlave(const char *url, const char *ord, Int_t perf,
                 const char *image, TProof *proof, Int_t stype,
                 const char *workdir, const char *msd)
   : TSlave()
{
   // Build a slave bound to 'proof': store the identification and
   // configuration data, register the shared TXSocket input handler with
   // the system, tag the TXSocket location and open the connection via Init().
   // The object starts as invalid; Init() flips fValid on success.

   // Identification and configuration
   fProof        = proof;
   fOrdinal      = ord;
   fPerfIdx      = perf;
   fImage        = image;
   fMsd          = msd;
   fSlaveType    = (ESlaveType)stype;
   fProofWorkDir = workdir;
   fWorkDir      = workdir;

   // Initial state
   fIntHandler = 0;
   fValid      = kFALSE;

   // Make sure the socket input handler is known to the system
   gSystem->AddFileHandler(TXSocketHandler::GetSocketHandler());

   // Tell TXSocket on which side we are running
   const char *location = "client";
   if (fProof->IsMaster())
      location = "master";
   TXSocket::SetLocation(location);

   // Open the connection
   Init(url, stype);
}
//______________________________________________________________________________
void TXSlave::Init(const char *host, Int_t stype)
{
   // Open the connection described by 'host' and initialize this slave.
   //
   //   host   URL of the remote server; a purely numeric option field is
   //          taken as the id of an existing session to attach to
   //   stype  requested slave type (kSlave or kMaster)
   //
   // On success fSocket holds the open TXSocket and fValid is kTRUE.
   // On failure fSocket is deleted (0) and fValid keeps its previous value.

   // Parse the URL, forcing the protocol used by the parent session
   TUrl url(host);
   url.SetProtocol(fProof->fUrl.GetProtocol());

   // If the port equals the one TUrl gives to a URL with no explicit port
   // ("a"), resolve the 'proofd' service instead
   if (url.GetPort() == TUrl("a").GetPort()) {
      Int_t port = gSystem->GetServiceByName("proofd");
      if (port < 0) {
         if (gDebug > 0)
            Info("Init","service 'proofd' not found by GetServiceByName"
                        ": using default IANA assigned tcp port 1093");
         // Must agree with the message above: 1093 is the port assigned to proofd
         port = 1093;
      } else {
         if (gDebug > 1)
            Info("Init","port from GetServiceByName: %d", port);
      }
      url.SetPort(port);
   }

   // Fill members from the (possibly completed) URL; the password field of
   // the URL is stored as the group
   fName  = url.GetHostFQDN();
   fPort  = url.GetPort();
   fGroup = url.GetPasswd();

   // A numeric option string requests attaching to that session id;
   // otherwise the PROOF protocol number is passed in its place
   TString opts(url.GetOptions());
   Bool_t attach = (opts.Length() > 0 && opts.IsDigit()) ? kTRUE : kFALSE;
   Int_t psid = (attach) ? opts.Atoi() : kPROOF_Protocol;

   // Work out who we are, the connection mode and the session alias
   TString iam;
   Char_t mode = 's';
   TString alias = fProof->GetTitle();
   if (fProof->IsMaster() && stype == kSlave) {
      iam = "Master";
      mode = 's';
      alias = Form("session-%s|ord:%s", fProof->GetName(), fOrdinal.Data());
   } else if (fProof->IsMaster() && stype == kMaster) {
      iam = "Master";
      mode = 'm';
      alias = Form("session-%s|ord:%s", fProof->GetName(), fOrdinal.Data());
   } else if (!fProof->IsMaster() && stype == kMaster) {
      iam = "Local Client";
      mode = (attach) ? 'A' : 'M';
   } else {
      Error("Init","Impossible PROOF <-> SlaveType Configuration Requested");
      R__ASSERT(0);
   }

   // Append the configuration file name, if any
   if (fProof->fConfFile.Length() > 0)
      alias += Form("|cf:%s",fProof->fConfFile.Data());

   // Collect the user environment settings as "name=value,..." unless a
   // manager is attached and reports a remote protocol <= 1001
   TString envlist;
   if (!fProof->GetManager() ||
        fProof->GetManager()->GetRemoteProtocol() > 1001) {
      const TList *envs = TProof::GetEnvVars();
      if (envs != 0 ) {
         TIter next(envs);
         for (TObject *o = next(); o != 0; o = next()) {
            TNamed *env = dynamic_cast<TNamed*>(o);
            if (env != 0) {
               if (!envlist.IsNull())
                  envlist += ",";
               envlist += Form("%s=%s", env->GetName(), env->GetTitle());
            }
         }
      }
   } else {
      if (fProof->GetManager() && TProof::GetEnvVars())
         Info("Init", "** NOT ** sending user envs - RemoteProtocol : %d",
                      fProof->GetManager()->GetRemoteProtocol());
   }

   // Append the environment settings to the alias
   if (!envlist.IsNull())
      alias += Form("|envs:%s", envlist.Data());

   // Open the connection, passing this object as handler
   if (!(fSocket = new TXSocket(url.GetUrl(kTRUE), mode, psid,
                                -1, alias, fProof->GetLogLevel(), this))) {
      Error("Init", "while opening the connection to %s - exit", url.GetUrl(kTRUE));
      return;
   }

   // The socket must be valid; the failure is reported only in debug mode
   if (!(fSocket->IsValid())) {
      if (gDebug > 0)
         Error("Init", "some severe error occurred while opening "
                       "the connection at %s - exit", url.GetUrl(kTRUE));
      SafeDelete(fSocket);
      return;
   }

   // Tag the socket with our ordinal
   fSocket->SetTitle(fOrdinal);

   // With no manager the envs were sent unconditionally: tell the user if
   // the remote side reports a version that does not support them
   if (!fProof->GetManager() && !envlist.IsNull() &&
      ((TXSocket *)fSocket)->GetXrdProofdVersion() <= 1001) {
      Info("Init","user envs setting sent but unsupported remotely - RemoteProtocol : %d",
                     ((TXSocket *)fSocket)->GetXrdProofdVersion());
   }

   // Cross-link socket and session, and import the remote settings
   ((TXSocket *)fSocket)->fReference = fProof;
   fProtocol = fSocket->GetRemoteProtocol();
   fProof->fServType = TProofMgr::kXProofd;
   fProof->fSessionID = ((TXSocket *)fSocket)->GetSessionID();

   // The socket buffer, if not empty, is used as the data pool URL
   TString dpu(((TXSocket *)fSocket)->fBuffer);
   if (dpu.Length() > 0)
      fProof->SetDataPoolUrl(dpu);

   // Take the socket out of the global list of sockets
   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(fSocket);
   }

   R__LOCKGUARD2(gProofMutex);

   // Import the user name from the socket
   fUser = ((TXSocket *)fSocket)->fUser;
   PDB(kGlobal,3) {
      Info("Init","%s: fUser is .... %s", iam.Data(), fUser.Data());
   }

   // Initialization completed
   fValid = kTRUE;
}
//______________________________________________________________________________
Int_t TXSlave::SetupServ(Int_t, const char *)
{
   // Complete the setup of the remote server: wait for its startup message
   // and check that the remote protocol is compatible. Both arguments are
   // ignored. Returns 0 on success; on failure the socket is deleted, the
   // slave is flagged invalid and -1 is returned.

   char startupMsg[512];
   Int_t msgKind;
   Bool_t failed = kFALSE;

   if (fSocket->Recv(startupMsg, sizeof(startupMsg), msgKind) <= 0) {
      // Nothing received: close the connection with option "S"
      Error("SetupServ", "failed to receive slave startup message");
      Close("S");
      failed = kTRUE;
   } else if (msgKind == kMESS_NOTOK) {
      // The remote side refused; no message is printed here
      failed = kTRUE;
   } else if (fProtocol < 4) {
      // Protocols less than 4 are rejected
      Error("SetupServ", "incompatible PROOF versions (remote version "
                         "must be >= 4, is %d)", fProtocol);
      failed = kTRUE;
   }

   if (failed) {
      SafeDelete(fSocket);
      fValid = kFALSE;
      return -1;
   }

   // Record the protocol in the session and set the socket options
   fProof->fProtocol = fProtocol;
   fSocket->SetOption(kNoDelay, 1);

   return 0;
}
//______________________________________________________________________________
TXSlave::~TXSlave()
{
   // Destroy the slave: drop the interrupt handler and close the connection.

   // The handler created in SetInterruptHandler() is owned by this object
   // and keeps a raw pointer to fSocket, which Close() deletes. Unregister
   // and free it first so that it is neither leaked nor left pointing to a
   // deleted socket.
   if (fIntHandler)
      fIntHandler->Remove();
   SafeDelete(fIntHandler);

   Close();
}
//______________________________________________________________________________
void TXSlave::Close(Option_t *opt)
{
   // Close the socket (forwarding 'opt' to it), then delete the input
   // handler and the socket. Safe to call when the socket is already gone.

   if (fSocket != 0) {
      fSocket->Close(opt);
   }

   SafeDelete(fInput);
   SafeDelete(fSocket);
}
//______________________________________________________________________________
Int_t TXSlave::Ping()
{
   // Ping the remote server, identifying ourselves by ordinal.
   // Returns 0 if the ping succeeded, -1 if it failed or the slave is invalid.

   if (!IsValid())
      return -1;

   TXSocket *xsock = (TXSocket *)fSocket;
   if (xsock->Ping(GetOrdinal()))
      return 0;
   return -1;
}
void TXSlave::Touch()
{
if (!IsValid()) return;
((TXSocket *)fSocket)->RemoteTouch();
return;
}
//______________________________________________________________________________
void TXSlave::Interrupt(Int_t type)
{
   // Interrupt the slave. No-op if the slave is invalid.
   //
   // For type == TProof::kLocalInterrupt nothing is sent to the remote side:
   // the socket is deactivated in the current monitor (if active there) and
   // the socket semaphore is posted. Any other type is sent to the remote
   // server as an interrupt.

   if (!IsValid()) return;

   if (type == TProof::kLocalInterrupt) {

      if (fProof) {
         // Deactivate our socket in the current monitor, if it is active.
         // IsActive() is used instead of GetListOfActives()->FindObject():
         // the latter returns a list which the caller must delete, so the
         // former code leaked one list per call.
         TMonitor *mon = fProof->fCurrentMonitor;
         if (mon && fSocket && mon->IsActive(fSocket)) {
            if (gDebug > 2)
               Info("Interrupt", "%p: deactivating from monitor %p", this, mon);
            mon->DeActivate(fSocket);
         }
      } else {
         Warning("Interrupt", "%p: reference to PROOF missing", this);
      }

      // Post the socket semaphore until a TryWait() returns 1
      if (fSocket) {
         R__LOCKGUARD(((TXSocket *)fSocket)->fAMtx);
         TSemaphore *sem = &(((TXSocket *)fSocket)->fASem);
         while (sem->TryWait() != 1)
            sem->Post();
      }

      return;
   }

   // Remote interrupt
   ((TXSocket *)fSocket)->SendInterrupt(type);
   Info("Interrupt","Interrupt of type %d sent", type);
}
//______________________________________________________________________________
void TXSlave::StopProcess(Bool_t abort, Int_t timeout)
{
   // Send an urgent kStopProcess request carrying the abort flag and the
   // timeout. No-op for an invalid slave.

   if (!IsValid())
      return;

   TXSocket *xsock = (TXSocket *)fSocket;
   xsock->SendUrgent(TXSocket::kStopProcess, (Int_t)abort, timeout);

   if (gDebug > 0) {
      Info("StopProcess", "Request of type %d sent over", abort);
   }
}
//______________________________________________________________________________
Int_t TXSlave::GetProofdProtocol(TSocket *s)
{
   // Exchange protocol numbers with the server behind socket 's'.
   // The client protocol is sent as the first sizeof(UInt_t) bytes of the
   // string " <protocol>"; the answer is read as two network-order integers.
   // Returns the remote protocol, or -1 on send/receive failure or when the
   // answer does not carry a kROOTD_PROTOCOL tag.

   Int_t remoteProto = -1;

   // Send our protocol
   UInt_t clientProto = 0;
   Int_t nbytes = sizeof(clientProto);
   memcpy((char *)&clientProto,
          Form(" %d", TSocket::GetClientProtocol()), nbytes);
   Int_t nsent = s->SendRaw(&clientProto, nbytes);
   if (nsent != nbytes) {
      ::Error("TXSlave::GetProofdProtocol",
              "sending %d bytes to proofd server [%s:%d]",
              nbytes, (s->GetInetAddress()).GetHostName(), s->GetPort());
      return -1;
   }

   // Read the answer
   Int_t answer[2] = {0};
   nbytes = sizeof(answer);
   Int_t nread = s->RecvRaw(answer, nbytes);
   if (nread != nbytes) {
      ::Error("TXSlave::GetProofdProtocol",
              "reading %d bytes from proofd server [%s:%d]",
              nbytes, (s->GetInetAddress()).GetHostName(), s->GetPort());
      return -1;
   }

   // The tag may come in the first or in the second word
   Int_t tag = net2host(answer[0]);
   if (tag == kROOTD_PROTOCOL) {
      // Tag first: the protocol is the second word
      remoteProto = net2host(answer[1]);
   } else {
      tag = net2host(answer[1]);
      if (tag == kROOTD_PROTOCOL) {
         // Tag second: the protocol follows in an additional word
         nbytes = sizeof(remoteProto);
         nread = s->RecvRaw(&remoteProto, nbytes);
         if (nread != nbytes) {
            ::Error("TXSlave::GetProofdProtocol",
                    "reading %d bytes from proofd server [%s:%d]",
                    nbytes, (s->GetInetAddress()).GetHostName(), s->GetPort());
            return -1;
         }
         remoteProto = net2host(remoteProto);
      }
   }

   if (gDebug > 2) {
      ::Info("TXSlave::GetProofdProtocol",
             "remote proofd: buf1: %d, buf2: %d rproto: %d",
             net2host(answer[0]),net2host(answer[1]),remoteProto);
   }

   return remoteProto;
}
//______________________________________________________________________________
TObjString *TXSlave::SendCoordinator(Int_t kind, const char *msg, Int_t int2)
{
   // Forward a request of the given kind, with its string and integer
   // arguments, to the coordinator through the socket and return the socket's
   // answer. Returns 0 if the slave is not valid, consistently with the
   // guard used by SetAlias() and SendGroupPriority(); previously fSocket
   // was dereferenced unconditionally.

   if (!IsValid()) return 0;

   return ((TXSocket *)fSocket)->SendCoordinator(kind, msg, int2);
}
void TXSlave::SetAlias(const char *alias)
{
if (!IsValid()) return;
((TXSocket *)fSocket)->SendCoordinator(TXSocket::kSessionAlias, alias);
return;
}
//______________________________________________________________________________
Int_t TXSlave::SendGroupPriority(const char *grp, Int_t priority)
{
   // Send a kGroupProperties request with the group name and priority to
   // the coordinator. Returns -1 if the slave is invalid, 0 otherwise; the
   // coordinator's answer is not inspected.

   if (!IsValid())
      return -1;

   TXSocket *xsock = (TXSocket *)fSocket;
   xsock->SendCoordinator(TXSocket::kGroupProperties, grp, priority);

   return 0;
}
//______________________________________________________________________________
Bool_t TXSlave::HandleError(const void *in)
{
   // Handle an error notified for the underlying connection.
   //
   //   in   optional pointer to an XHandleErr_t; when its fOpt is 1 a
   //        reconnection is attempted first
   //
   // Returns kFALSE if the connection was successfully re-established,
   // kTRUE after the error has been processed.

   XHandleErr_t *herr = in ? (XHandleErr_t *)in : 0;

   // Try reconnection
   if (fSocket && herr && (herr->fOpt == 1)) {

      ((TXSocket *)fSocket)->Reconnect();
      if (fSocket && fSocket->IsValid()) {
         if (gDebug > 0) {
            // Ordinal "0" is reported as the master
            if (!strcmp(GetOrdinal(), "0")) {
               Printf("Proof: connection to master at %s:%d re-established",
                     GetName(), GetPort());
            } else {
               Printf("Proof: connection to node '%s' at %s:%d re-established",
                     GetOrdinal(), GetName(), GetPort());
            }
         }
         return kFALSE;
      }
   }

   // No recovery: notify
   Info("HandleError", "%p:%s:%s got called ... fProof: %p, fSocket: %p (valid: %d)",
                       this, fName.Data(), fOrdinal.Data(), fProof, fSocket,
                       (fSocket ? (Int_t)fSocket->IsValid() : -1));

   // Remove our interrupt handler
   SetInterruptHandler(kFALSE);

   if (fProof) {

      // Remove the interrupt handler of the session
      if (fProof->fIntHandler)
         fProof->fIntHandler->Remove();

      Info("HandleError", "%p: proof: %p", this, fProof);

      if (fSocket) {
         // Reset the session id and post a fatal message on the socket
         ((TXSocket *)fSocket)->SetSessionID(-1);
         ((TXSocket *)fSocket)->PostMsg(kPROOF_FATAL);
      }

      // On masters, send a message about the removed worker upstream
      if (fProof->IsMaster()) {
         TString msg(Form("Worker '%s-%s' has been removed from the active list",
                          fName.Data(), fOrdinal.Data()));
         TMessage m(kPROOF_MESSAGE);
         m << msg;
         if (gProofServ)
            gProofServ->GetSocket()->Send(m);
         else
            // The format has a %p conversion: 'this' must be passed
            // (it was missing, which is undefined behaviour)
            Warning("HandleError", "%p: global reference to TProofServ missing", this);
      }
   } else {
      Warning("HandleError", "%p: reference to PROOF missing", this);
   }

   Printf("TXSlave::HandleError: %p: DONE ... ", this);

   return kTRUE;
}
//______________________________________________________________________________
Bool_t TXSlave::HandleInput(const void *)
{
   // Handle input available on the socket (the argument is ignored).
   // If the socket is active in the session's current monitor it is marked
   // ready there; otherwise the input is collected via
   // TProof::CollectInputFrom() and, if that fails, the socket is flushed.
   // Returns kFALSE only when the reference to the session is missing.

   if (!fProof) {
      Warning("HandleInput", "%p: %s: reference to PROOF missing", this, GetOrdinal());
      return kFALSE;
   }

   TMonitor *mon = fProof->fCurrentMonitor;

   if (gDebug > 2)
      Info("HandleInput", "%p: %s: proof: %p, mon: %p",
                          this, GetOrdinal(), fProof, mon);

   // Socket monitored: just flag it as ready
   if (mon && mon->IsActive(fSocket)) {
      if (gDebug > 2)
         Info("HandleInput","%p: %s: posting monitor %p", this, GetOrdinal(), mon);
      mon->SetReady(fSocket);
      return kTRUE;
   }

   // Socket not monitored: collect the input directly
   if (gDebug > 2) {
      if (mon) {
         Info("HandleInput", "%p: %s: not active in current monitor"
                             " - calling TProof::CollectInputFrom",
                             this, GetOrdinal());
      } else {
         Info("HandleInput", "%p: %s: calling TProof::CollectInputFrom",
                             this, GetOrdinal());
      }
   }
   if (fProof->CollectInputFrom(fSocket) < 0)
      FlushSocket();

   return kTRUE;
}
//______________________________________________________________________________
void TXSlave::SetInterruptHandler(Bool_t on)
{
   // Switch the interrupt handler on or off. When switching on, the handler
   // is created on first use (bound to the current socket) and added; when
   // switching off, an existing handler is removed but not deleted.

   if (gDebug > 1)
      Info("SetInterruptHandler", "enter: %d", on);

   if (!on) {
      if (fIntHandler)
         fIntHandler->Remove();
      return;
   }

   if (!fIntHandler)
      fIntHandler = new TXSlaveInterruptHandler((TXSocket *)fSocket);
   fIntHandler->Add();
}
//______________________________________________________________________________
void TXSlave::FlushSocket()
{
   // Flush from the TXSocket global pipe whatever refers to our socket.
   // Nothing is done when there is no socket.

   if (gDebug > 1) {
      Info("FlushSocket", "enter: %p", fSocket);
   }

   if (fSocket != 0) {
      TXSocket::fgPipe.Flush(fSocket);
   }
}
// Last change: Tue Dec 9 09:01:14 2008
// Last generated: 2008-12-09 09:01
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.