#include <errno.h>
#include "TApplicationRemote.h"
#include "TBrowser.h"
#include "TDirectory.h"
#include "TError.h"
#include "THashList.h"
#include "TMonitor.h"
#include "TRandom.h"
#include "TROOT.h"
#include "TServerSocket.h"
#include "TSystem.h"
#include "TRemoteObject.h"
#ifdef WIN32
#include <io.h>
#include <sys/types.h>
#endif
Bool_t TARInterruptHandler::Notify()
{
Info("Notify","Processing interrupt signal ...");
fApplicationRemote->Interrupt(kRRI_Hard);
return kTRUE;
}
ClassImp(TApplicationRemote)
static const char *gScript = "roots";
static const char *gScriptCmd = "\\\"%s %d localhost:%d/%s -d=%d\\\"";
#ifndef WIN32
static const char *gSshCmd = "ssh %s -f4 %s -R %d:localhost:%d sh -c \
\"'(sh=\\`basename \'\\\\\\$SHELL\'\\`; \
if test xbash = x\'\\\\\\$sh\' -o xsh = x\'\\\\\\$sh\' -o xzsh = x\'\\\\\\$sh\' -o xdash = x\'\\\\\\$sh\'; then \
\'\\\\\\$SHELL\' -l -c %s; \
elif test xcsh = x\'\\\\\\$sh\' -o xtcsh = x\'\\\\\\$sh\' -o xksh = x\'\\\\\\$sh\'; then \
\'\\\\\\$SHELL\' -c %s; \
else \
echo \\\"Unknown shell \'\\\\\\$SHELL\'\\\"; \
fi)'\"";
#else
static const char *gSshCmd = "ssh %s -f4 %s -R %d:localhost:%d sh -c \
\"'(sh=`basename $SHELL`; \
if test xbash = x$sh -o xsh = x$sh -o xzsh = x$sh -o xdash = x$sh; then \
$SHELL -l -c %s; \
elif test xcsh = x$sh -o xtcsh = x$sh -o xksh = x$sh; then \
$SHELL -c %s; \
else \
echo \"Unknown shell $SHELL\"; \
fi)'\"";
#endif
Int_t TApplicationRemote::fgPortAttempts = 100;
Int_t TApplicationRemote::fgPortLower = 49152;
Int_t TApplicationRemote::fgPortUpper = 65535;
TApplicationRemote::TApplicationRemote(const char *url, Int_t debug,
const char *script)
: TApplication(), fUrl(url)
{
fName = fUrl.GetHost();
if (strlen(fUrl.GetOptions()) > 0)
fName += Form("-%s", fUrl.GetOptions());
UserGroup_t *pw = gSystem->GetUserInfo(gSystem->GetEffectiveUid());
TString user = (pw) ? pw->fUser : "";
SafeDelete(pw);
if (strlen(fUrl.GetUser()) > 0 && user != fUrl.GetUser())
fName.Insert(0,Form("%s@", fUrl.GetUser()));
fIntHandler = 0;
fSocket = 0;
fMonitor = 0;
fFileList = 0;
fWorkingDir = 0;
fRootFiles = 0;
fReceivedObject = 0;
ResetBit(kCollecting);
Int_t port = -1;
Int_t na = fgPortAttempts;
Long_t now = gSystem->Now();
UInt_t seed;
memcpy(&seed,&now,sizeof(UInt_t));
gRandom->SetSeed(seed);
TServerSocket *ss = 0;
while (na--) {
port = (Int_t) (gRandom->Rndm() * (fgPortUpper - fgPortLower)) + fgPortLower;
ss = new TServerSocket(port);
if (ss->IsValid())
break;
}
if (!ss || !ss->IsValid()) {
Error("TApplicationRemote","unable to find a free port for connections");
SetBit(kInvalidObject);
return;
}
TMonitor *mon = new TMonitor;
mon->Add(ss);
Int_t rport = (port < fgPortUpper) ? port + 1 : port - 1;
TString sc = gScript;
if (script && *script) {
if (script[1] == '<') {
if (script[2])
sc.Form("source %s; %s", script+2, gScript);
else
Error("TApplicationRemote", "illegal script name <");
} else
sc = script;
}
sc.ReplaceAll("\"","");
TString userhost = fUrl.GetHost();
if (strlen(fUrl.GetUser()) > 0)
userhost.Insert(0, Form("%s@", fUrl.GetUser()));
const char *verb = "";
if (debug > 0)
verb = "-v";
TString scriptCmd;
scriptCmd.Form(gScriptCmd, sc.Data(), kRRemote_Protocol, rport, fUrl.GetFile(), debug);
TString cmd;
cmd.Form(gSshCmd, verb, userhost.Data(), rport, port, scriptCmd.Data(), scriptCmd.Data());
#ifdef WIN32
TApplication::NeedGraphicsLibs();
gApplication->InitializeGraphics();
#endif
if (gDebug > 0)
Info("TApplicationRemote", "executing: %s", cmd.Data());
if (gSystem->Exec(cmd) != 0) {
Info("TApplicationRemote", "an error occured during SSH connection");
mon->DeActivateAll();
delete mon;
delete ss;
SafeDelete(fSocket);
SetBit(kInvalidObject);
return;
}
mon->Select();
fSocket = ss->Accept();
mon->DeActivateAll();
delete mon;
delete ss;
Int_t what;
char buf[512];
if (fSocket->Recv(buf, sizeof(buf), what) <= 0) {
Error("TApplicationRemote", "failed to receive startup message");
SafeDelete(fSocket);
SetBit(kInvalidObject);
return;
}
Printf("%s", buf);
if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
Error("TApplicationRemote", "failed to receive remote server protocol");
SafeDelete(fSocket);
SetBit(kInvalidObject);
return;
}
if (fProtocol != kRRemote_Protocol)
Info("TApplicationRemote","server runs a different protocol version: %d (vs %d)",
fProtocol, kRRemote_Protocol);
TMessage *msg = 0;
if (fSocket->Recv(msg) < 0 || msg->What() != kMESS_ANY) {
Error("TApplicationRemote", "failed to receive server info - protocol error");
SafeDelete(fSocket);
SetBit(kInvalidObject);
return;
}
TString hostname;
(*msg) >> hostname >> fLogFilePath;
fUrl.SetHost(hostname);
fMonitor = new TMonitor;
fMonitor->Add(fSocket);
fIntHandler = new TARInterruptHandler(this);
gROOT->GetListOfSockets()->Remove(fSocket);
gROOT->GetListOfSockets()->Add(this);
fRootFiles = new TList;
fRootFiles->SetName("Files");
Collect();
return;
}
TApplicationRemote::~TApplicationRemote()
{
gROOT->GetListOfSockets()->Remove(this);
Terminate(0);
}
Int_t TApplicationRemote::Broadcast(const TMessage &mess)
{
if (!IsValid()) return -1;
if (fSocket->Send(mess) == -1) {
Error("Broadcast", "could not send message");
return -1;
}
return 0;
}
Int_t TApplicationRemote::Broadcast(const char *str, Int_t kind, Int_t type)
{
TMessage mess(kind);
if (kind == kMESS_ANY)
mess << type;
if (str) mess.WriteString(str);
return Broadcast(mess);
}
Int_t TApplicationRemote::BroadcastObject(const TObject *obj, Int_t kind)
{
TMessage mess(kind);
mess.WriteObject(obj);
return Broadcast(mess);
}
Int_t TApplicationRemote::BroadcastRaw(const void *buffer, Int_t length)
{
if (!IsValid()) return -1;
if (fSocket->SendRaw(buffer, length) == -1) {
Error("Broadcast", "could not send raw buffer");
return -1;
}
return 0;
}
Int_t TApplicationRemote::Collect(Long_t timeout)
{
fMonitor->ActivateAll();
if (!fMonitor->GetActive())
return 0;
Long_t nto = timeout;
if (gDebug > 2)
Info("Collect","active: %d", fMonitor->GetActive());
if (fIntHandler)
fIntHandler->Add();
SetBit(kCollecting);
Int_t rc = 0, cnt = 0;
while (fMonitor->GetActive() && (nto < 0 || nto > 0)) {
TSocket *s = fMonitor->Select(1000);
if (s && s != (TSocket *)(-1)) {
if ((rc = CollectInput()) != 0) {
fMonitor->DeActivate(s);
if (gDebug > 2)
Info("Collect","deactivating %p", s);
}
if (rc >= 0)
cnt++;
} else {
if (!s)
fMonitor->DeActivateAll();
if (s == (TSocket *)(-1) && nto > 0)
nto--;
}
}
ResetBit(kCollecting);
if (nto == 0)
fMonitor->DeActivateAll();
if (fIntHandler)
fIntHandler->Remove();
return cnt;
}
Int_t TApplicationRemote::CollectInput()
{
TMessage *mess;
Int_t rc = 0;
char str[512];
TObject *obj;
Int_t what;
Bool_t delete_mess = kTRUE;
if (fSocket->Recv(mess) < 0) {
SetBit(kInvalidObject);
SafeDelete(fSocket);
return -1;
}
if (!mess) {
SetBit(kInvalidObject);
SafeDelete(fSocket);
return -1;
}
what = mess->What();
if (gDebug > 2)
Info("CollectInput","what %d", what);
switch (what) {
case kMESS_OBJECT:
{
TObject *o = mess->ReadObject(mess->GetClass());
if (TString(o->ClassName()) == "TCanvas")
o->Draw();
else if (TString(o->ClassName()) == "TRemoteObject") {
TRemoteObject *robj = (TRemoteObject *)o;
if (TString(robj->GetClassName()) == "TSystemDirectory") {
if (fWorkingDir == 0) {
fWorkingDir = (TRemoteObject *)o;
}
}
}
else if (TString(o->ClassName()) == "TList") {
TList *list = (TList *)o;
TRemoteObject *robj = (TRemoteObject *)list->First();
if (robj && (TString(robj->GetClassName()) == "TFile")) {
TIter next(list);
while ((robj = (TRemoteObject *)next())) {
if (!fRootFiles->FindObject(robj->GetName()))
fRootFiles->Add(robj);
}
gROOT->RefreshBrowsers();
}
}
fReceivedObject = o;
}
break;
case kMESS_ANY:
{ Int_t type;
(*mess) >> type;
if (gDebug > 2)
Info("CollectInput","type %d", type);
switch (type) {
case kRRT_GetObject:
mess->ReadString(str, sizeof(str));
obj = gDirectory->Get(str);
if (obj) {
fSocket->SendObject(obj);
} else {
Warning("CollectInput",
"server requested an object that we do not have");
fSocket->Send(kMESS_NOTOK);
}
break;
case kRRT_Fatal:
SafeDelete(fSocket);
rc = -1;
break;
case kRRT_LogFile:
{ Int_t size;
(*mess) >> size;
RecvLogFile(size);
}
break;
case kRRT_LogDone:
{ Int_t st;
(*mess) >> st;
if (st < 0) {
SetBit(kInvalidObject);
}
if (gDebug > 1)
Info("CollectInput","kRTT_LogDone: status %d", st);
rc = 1;
}
break;
case kRRT_Message:
{ TString msg;
Bool_t lfeed;
(*mess) >> msg >> lfeed;
if (lfeed)
fprintf(stderr,"%s\n", msg.Data());
else
fprintf(stderr,"%s\r", msg.Data());
}
break;
case kRRT_SendFile:
{ TString fname;
(*mess) >> fname;
TMessage m(kMESS_ANY);
m << (Int_t) kRRT_SendFile;
char *imp = gSystem->Which(TROOT::GetMacroPath(), fname, kReadPermission);
if (!imp) {
Error("CollectInput", "file %s not found in path(s) %s",
fname.Data(), TROOT::GetMacroPath());
m << (Bool_t) kFALSE;
Broadcast(m);
} else {
TString impfile = imp;
delete [] imp;
Int_t dot = impfile.Last('.');
Bool_t hasHeader = kTRUE;
TString headfile = impfile;
if (dot != kNPOS)
headfile.Remove(dot);
headfile += ".h";
if (gSystem->AccessPathName(headfile, kReadPermission)) {
TString h = headfile;
headfile.Remove(dot);
headfile += ".hh";
if (gSystem->AccessPathName(headfile, kReadPermission)) {
hasHeader = kFALSE;
if (gDebug > 0)
Info("CollectInput", "no associated header file"
" found: tried: %s %s",
h.Data(), headfile.Data());
}
}
m << (Bool_t) kTRUE;
Broadcast(m);
if (SendFile(impfile, kForce) == -1) {
Info("CollectInput", "problems sending file %s", impfile.Data());
return 0;
}
if (hasHeader) {
Broadcast(m);
if (SendFile(headfile, kForce) == -1) {
Info("CollectInput", "problems sending file %s", headfile.Data());
return 0;
}
}
}
m.Reset(kMESS_ANY);
m << (Int_t) kRRT_SendFile;
m << (Bool_t) kFALSE;
Broadcast(m);
}
break;
default:
Warning("CollectInput","unknown type received from server: %d", type);
break;
}
}
break;
default:
Error("CollectInput", "unknown command received from server: %d", what);
SetBit(kInvalidObject);
SafeDelete(fSocket);
rc = -1;
break;
}
if (delete_mess)
delete mess;
return rc;
}
void TApplicationRemote::RecvLogFile(Int_t size)
{
const Int_t kMAXBUF = 16384;
char buf[kMAXBUF];
Int_t fdout = fileno(stdout);
if (fdout < 0) {
Warning("RecvLogFile", "file descriptor for outputs undefined (%d):"
" will not log msgs", fdout);
return;
}
lseek(fdout, (off_t) 0, SEEK_END);
Int_t left, rec, r;
Long_t filesize = 0;
while (filesize < size) {
left = Int_t(size - filesize);
if (left > kMAXBUF)
left = kMAXBUF;
rec = fSocket->RecvRaw(&buf, left);
filesize = (rec > 0) ? (filesize + rec) : filesize;
if (rec > 0) {
char *p = buf;
r = rec;
while (r) {
Int_t w;
w = write(fdout, p, r);
if (w < 0) {
SysError("RecvLogFile", "error writing to unit: %d", fdout);
break;
}
r -= w;
p += w;
}
} else if (rec < 0) {
Error("RecvLogFile", "error during receiving log file");
break;
}
}
}
Int_t TApplicationRemote::SendObject(const TObject *obj)
{
if (!IsValid() || !obj) return -1;
TMessage mess(kMESS_OBJECT);
mess.WriteObject(obj);
return Broadcast(mess);
}
Bool_t TApplicationRemote::CheckFile(const char *file, Long_t modtime)
{
Bool_t sendto = kFALSE;
if (!IsValid()) return -1;
TString fn = gSystem->BaseName(file);
TARFileStat *fs = 0;
if (fFileList && (fs = (TARFileStat *) fFileList->FindObject(fn))) {
if (fs->fModtime != modtime) {
TMD5 *md5 = TMD5::FileChecksum(file);
if (md5) {
if ((*md5) != fs->fMD5) {
sendto = kTRUE;
fs->fMD5 = *md5;
fs->fModtime = modtime;
}
delete md5;
} else {
Error("CheckFile", "could not calculate local MD5 check sum - dont send");
return kFALSE;
}
}
} else {
TMD5 *md5 = TMD5::FileChecksum(file);
if (md5) {
fs = new TARFileStat(fn, *md5, modtime);
if (!fFileList)
fFileList = new THashList;
fFileList->Add(fs);
delete md5;
} else {
Error("CheckFile", "could not calculate local MD5 check sum - dont send");
return kFALSE;
}
TMessage mess(kMESS_ANY);
mess << Int_t(kRRT_CheckFile) << TString(gSystem->BaseName(file)) << fs->fMD5;
fSocket->Send(mess);
TMessage *reply;
fSocket->Recv(reply);
if (reply) {
if (reply->What() == kMESS_ANY) {
Int_t type;
Bool_t uptodate;
(*reply) >> type >> uptodate;
if (type != kRRT_CheckFile) {
Warning("CheckFile", "received wrong type:"
" %d (expected %d): protocol error?",
type, (Int_t)kRRT_CheckFile);
}
sendto = uptodate ? kFALSE : kTRUE;
} else {
Error("CheckFile", "received wrong message: %d (expected %d)",
reply->What(), kMESS_ANY);
}
} else {
Error("CheckFile", "received empty message");
}
Collect();
}
return sendto;
}
Int_t TApplicationRemote::SendFile(const char *file, Int_t opt, const char *rfile)
{
if (!IsValid()) return -1;
#ifndef R__WIN32
Int_t fd = open(file, O_RDONLY);
#else
Int_t fd = open(file, O_RDONLY | O_BINARY);
#endif
if (fd < 0) {
SysError("SendFile", "cannot open file %s", file);
return -1;
}
Long64_t size;
Long_t id, flags, modtime;
if (gSystem->GetPathInfo(file, &id, &size, &flags, &modtime) == 1) {
Error("SendFile", "cannot stat file %s", file);
return -1;
}
if (size == 0) {
Error("SendFile", "empty file %s", file);
return -1;
}
Bool_t bin = (opt & kBinary) ? kTRUE : kFALSE;
Bool_t force = (opt & kForce) ? kTRUE : kFALSE;
const Int_t kMAXBUF = 32768;
char buf[kMAXBUF];
const char *fnam = (rfile) ? rfile : gSystem->BaseName(file);
Bool_t sendto = force ? kTRUE : CheckFile(file, modtime);
size = sendto ? size : 0;
if (gDebug > 1 && size > 0)
Info("SendFile", "sending file %s", file);
sprintf(buf, "%s %d %lld", fnam, bin, size);
if (Broadcast(buf, kMESS_ANY, kRRT_File) == -1) {
SafeDelete(fSocket);
return -1;
}
if (sendto) {
lseek(fd, 0, SEEK_SET);
Int_t len;
do {
while ((len = read(fd, buf, kMAXBUF)) < 0 && TSystem::GetErrno() == EINTR)
TSystem::ResetErrno();
if (len < 0) {
SysError("SendFile", "error reading from file %s", file);
Interrupt();
close(fd);
return -1;
}
if (len > 0 && fSocket->SendRaw(buf, len) == -1) {
SysError("SendFile", "error writing to server @ %s:%d (now offline)",
fUrl.GetHost(), fUrl.GetPort());
SafeDelete(fSocket);
break;
}
} while (len > 0);
}
close(fd);
if (!TestBit(kCollecting))
Collect();
return IsValid() ? 0 : -1;
}
void TApplicationRemote::Terminate(Int_t status)
{
TMessage mess(kMESS_ANY);
mess << (Int_t)kRRT_Terminate << status;
Broadcast(mess);
SafeDelete(fRootFiles);
SafeDelete(fMonitor);
SafeDelete(fSocket);
}
void TApplicationRemote::SetPortParam(Int_t lower, Int_t upper, Int_t attempts)
{
if (lower > -1)
fgPortLower = lower;
if (upper > -1)
fgPortUpper = upper;
if (attempts > -1)
fgPortAttempts = attempts;
::Info("TApplicationRemote::SetPortParam","port scan: %d attempts in [%d,%d]",
fgPortAttempts, fgPortLower, fgPortUpper);
}
Long_t TApplicationRemote::ProcessLine(const char *line, Bool_t, Int_t *)
{
if (!line || !*line) return 0;
if (!strncasecmp(line, ".q", 2)) {
gApplication->ProcessLine(".R -close");
return 0;
}
if (!strncmp(line, "?", 1)) {
Help(line);
return 1;
}
fReceivedObject = 0;
InitializeGraphics();
Broadcast(line, kMESS_CINT);
Collect();
return (Long_t)fReceivedObject;
return 1;
}
void TApplicationRemote::Print(Option_t *opt) const
{
TString s(Form("OBJ: TApplicationRemote %s", fName.Data()));
Printf("%s", s.Data());
if (opt && opt[0] == 'F') {
s = " url: ";
if (strlen(fUrl.GetUser()) > 0)
s += Form("%s@", fUrl.GetUser());
s += fUrl.GetHostFQDN();
s += Form(" logfile: %s", fLogFilePath.Data());
Printf("%s", s.Data());
}
}
void TApplicationRemote::Interrupt(Int_t type)
{
if (!IsValid()) return;
fInterrupt = kTRUE;
#if 1
Info("Interrupt", "*** Ctrl-C not yet enabled *** (type= %d)", type);
return;
#else
char oobc = (char) type;
const int kBufSize = 1024;
char waste[kBufSize];
if (fSocket->SendRaw(&oobc, 1, kOob) <= 0) {
Error("Interrupt", "error sending oobc to server");
return;
}
if (type == kRRI_Hard) {
char oob_byte;
int n, nch, nbytes = 0, nloop = 0;
while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
if (n == -2) {
fSocket->GetOption(kBytesToRead, nch);
if (nch == 0) {
gSystem->Sleep(1000);
continue;
}
if (nch > kBufSize) nch = kBufSize;
n = fSocket->RecvRaw(waste, nch);
if (n <= 0) {
Error("Interrupt", "error receiving waste from server");
break;
}
nbytes += n;
} else if (n == -3) {
gSystem->Sleep(100);
if (++nloop > 100) {
Error("Interrupt", "server does not respond");
break;
}
} else {
Error("Interrupt", "error receiving OOB from server");
break;
}
}
while (1) {
int atmark;
fSocket->GetOption(kAtMark, atmark);
if (atmark)
break;
fSocket->GetOption(kBytesToRead, nch);
if (nch == 0) {
gSystem->Sleep(1000);
continue;
}
if (nch > kBufSize) nch = kBufSize;
n = fSocket->RecvRaw(waste, nch);
if (n <= 0) {
Error("Interrupt", "error receiving waste (2) from server");
break;
}
nbytes += n;
}
if (nbytes > 0)
Info("Interrupt", "server synchronized: %d bytes discarded", nbytes);
Collect();
} else if (type == kRRI_Soft) {
Collect();
} else if (type == kRRI_Shutdown) {
;
} else {
Collect();
}
#endif
}
void TApplicationRemote::Browse(TBrowser *b)
{
b->Add(fRootFiles, "ROOT Files");
b->Add(fWorkingDir, fWorkingDir->GetTitle());
gROOT->RefreshBrowsers();
}
Last change: Wed Jun 25 08:34:41 2008
Last generated: 2008-06-25 08:34
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.