#include "TProofLite.h"
#ifdef WIN32
# include <io.h>
#endif
#include "TDSet.h"
#include "TEnv.h"
#include "TError.h"
#include "TFile.h"
#include "TFileCollection.h"
#include "TFileInfo.h"
#include "THashList.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TObjString.h"
#include "TPluginManager.h"
#include "TProofDataSetManager.h"
#include "TProofQueryResult.h"
#include "TProofServ.h"
#include "TQueryResultManager.h"
#include "TROOT.h"
#include "TServerSocket.h"
#include "TSlave.h"
#include "TSortedList.h"
#include "TTree.h"
#include "TVirtualProofPlayer.h"
#include "TH3F.h"
// ROOT ClassImp macro invocation for the TProofLite class.
ClassImp(TProofLite)
/// Construct a PROOF-Lite session in which this process is flagged as both
/// client and master.
///
/// @param url       session URL; also passed to GetNumberOfWorkers()
/// @param conffile  configuration file name, forwarded to Init()
/// @param confdir   configuration directory, forwarded to Init()
/// @param loglevel  log level, forwarded to Init()
/// @param alias     session alias, forwarded to Init()
/// @param mgr       manager stored in fManager
TProofLite::TProofLite(const char *url, const char *conffile, const char *confdir,
                       Int_t loglevel, const char *alias, TProofMgr *mgr)
{
   // Normalise the URL: protocol and host are fixed for a lite session
   fUrl.SetUrl(url);
   fUrl.SetProtocol("proof");
   fUrl.SetHost("__lite__");

   // Session type and roles
   fManager = mgr;
   fServType = TProofMgr::kProofLite;
   fQueryMode = kSync;
   fMasterServ = kTRUE;
   SetBit(TProof::kIsClient);
   SetBit(TProof::kIsMaster);

   // When the URL carries no user name, take it from the system user info
   if (strlen(fUrl.GetUser()) == 0) {
      UserGroup_t *userInfo = gSystem->GetUserInfo();
      if (userInfo) {
         fUrl.SetUser(userInfo->fUser);
         delete userInfo;
      }
   }

   fMaster = gSystem->HostName();

   // Decide how many workers to start and announce it
   fNWorkers = GetNumberOfWorkers(url);
   Printf(" +++ Starting PROOF-Lite with %d workers +++", fNWorkers);

   // Start the session; the outcome is recorded in fValid by Init()
   Init(url, conffile, confdir, loglevel, alias);

   // This becomes the current global session
   gProof = this;
}
/// Initialise the session: create the sandbox, log file, locks, query manager,
/// dataset manager, player, worker bookkeeping lists and monitors, then start
/// the workers.
///
/// @param conffile  configuration file; on a master only the default is
///                  applied, and only when this is null or empty
/// @param confdir   configuration directory; same rule as conffile
/// @param loglevel  value stored in fLogLevel
/// @return the number of active workers, or 0 if the sandbox could not be
///         created or the workers could not be set up
Int_t TProofLite::Init(const char *, const char *conffile,
                       const char *confdir, Int_t loglevel, const char *)
{
   R__ASSERT(gSystem);

   fValid = kFALSE;

   if (TestBit(TProof::kIsMaster)) {
      // Fill in the defaults when nothing was specified
      if (!conffile || strlen(conffile) == 0)
         fConfFile = kPROOF_ConfFile;
      if (!confdir || strlen(confdir) == 0)
         fConfDir = kPROOF_ConfDir;
   } else {
      fConfDir = confdir;
      fConfFile = conffile;
   }

   // The sandbox also defines the session name and working directory
   if (CreateSandbox() != 0) {
      Error("Init", "could not create/assert sandbox for this session");
      return 0;
   }

   // Path of the socket used for communication with the workers
   fSockPath = Form("%s/prooflite-sockpath-%s", gSystem->TempDirectory(), GetName());

   fLogLevel = loglevel;
   fProtocol = kPROOF_Protocol;
   fSendGroupView = kTRUE;
   fImage = "<local>";
   fIntHandler = 0;
   fStatus = 0;
   fRecvMessages = new TList;
   fRecvMessages->SetOwner(kTRUE);
   fSlaveInfo = 0;
   fChains = new TList;
   fAvailablePackages = 0;
   fEnabledPackages = 0;
   fEndMaster = TestBit(TProof::kIsMaster) ? kTRUE : kFALSE;
   fInputData = 0;
   ResetBit(TProof::kNewInputData);

   fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);

   fProgressDialog = 0;
   fProgressDialogStarted = kFALSE;

   // Session log file: one handle for writing and one for reading
   fRedirLog = kFALSE;
   if (TestBit(TProof::kIsClient)) {
      fLogFileName = Form("%s/session-%s.log", fWorkDir.Data(), GetName());
      if ((fLogFileW = fopen(fLogFileName.Data(), "w")) == 0)
         Error("Init", "could not create temporary logfile %s", fLogFileName.Data());
      if ((fLogFileR = fopen(fLogFileName.Data(), "r")) == 0)
         Error("Init", "could not open logfile %s for reading", fLogFileName.Data());
   }
   fLogToWindowOnly = kFALSE;

   // Lock files for the cache and the query directories
   fCacheLock = new TProofLockPath(Form("%s/%s%s", gSystem->TempDirectory(),
                                   kPROOF_CacheLockFile,
                                   TString(fCacheDir).ReplaceAll("/","%").Data()));
   fQueryLock = new TProofLockPath(Form("%s/%s%s-%s", gSystem->TempDirectory(),
                                   kPROOF_QueryLockFile, GetName(),
                                   TString(fQueryDir).ReplaceAll("/","%").Data()));
   fQueryLock->Lock();

   // Query-result manager
   fQMgr = new TQueryResultManager(fQueryDir, GetName(), fWorkDir,
                                   fQueryLock, fLogFileW);
   if (fQMgr && fQMgr->ApplyMaxQueries(10) != 0)
      Warning("Init", "problems applying fMaxQueries");

   if (InitDataSetManager() != 0)
      Warning("Init", "problems initializing the dataset manager");

   fNotIdle = 0;
   fSync = kTRUE;

   fQueries = 0;
   fOtherQueries = 0;
   fDrawQueries = 0;
   fMaxDrawQueries = 1;
   fSeqNum = 0;

   fSessionID = -1;
   fWaitingSlaves = 0;

   // Player and feedback list
   fPlayer = 0;
   MakePlayer("lite");

   fFeedback = new TList;
   fFeedback->SetOwner();
   fFeedback->SetName("FeedbackList");
   AddInput(fFeedback);

   // Worker lists and monitors
   fSlaves = new TSortedList(kSortDescending);
   fActiveSlaves = new TList;
   fInactiveSlaves = new TList;
   fUniqueSlaves = new TList;
   fAllUniqueSlaves = new TList;
   fNonUniqueMasters = new TList;
   fBadSlaves = new TList;
   fAllMonitor = new TMonitor;
   fActiveMonitor = new TMonitor;
   fUniqueMonitor = new TMonitor;
   fAllUniqueMonitor = new TMonitor;
   fCurrentMonitor = 0;
   fServSock = 0;

   // Fork-based startup can be requested via the environment (not on Windows)
   fForkStartup = kFALSE;
   if (gEnv->GetValue("ProofLite.ForkStartup", 0) != 0) {
#ifndef WIN32
      fForkStartup = kTRUE;
#else
      Warning("Init", "fork-based workers startup is not available on Windows - ignoring");
#endif
   }

   fPackageLock = 0;
   fEnabledPackagesOnClient = 0;
   fLoadedMacros = 0;
   fGlobalPackageDirList = 0;
   if (TestBit(TProof::kIsClient)) {

      // Colon-separated list of directories holding global packages
      TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
      if (globpack.Length() > 0) {
         Int_t ng = 0;
         Int_t from = 0;
         TString ldir;
         while (globpack.Tokenize(ldir, from, ":")) {
            if (gSystem->AccessPathName(ldir, kReadPermission)) {
               Warning("Init", "directory for global packages %s does not"
                               " exist or is not readable", ldir.Data());
            } else {
               // Register the directory under the key "G<n>"
               TString key = Form("G%d", ng++);
               if (!fGlobalPackageDirList) {
                  fGlobalPackageDirList = new THashList();
                  fGlobalPackageDirList->SetOwner();
               }
               fGlobalPackageDirList->Add(new TNamed(key,ldir));
            }
         }
      }

      // Package lock file, tagged with the user name. GetUserInfo() can return
      // a null pointer (the constructor guards against it as well); in that
      // case fall back to the user name stored in the session URL.
      TString lockUser(fUrl.GetUser());
      UserGroup_t *ug = gSystem->GetUserInfo();
      if (ug) {
         lockUser = ug->fUser;
         delete ug;
      } else {
         Warning("Init", "could not get user info: using '%s' for the package lock",
                         lockUser.Data());
      }
      fPackageLock = new TProofLockPath(Form("%s%s", kPROOF_PackageLockFile, lockUser.Data()));

      fEnabledPackagesOnClient = new TList;
      fEnabledPackagesOnClient->SetOwner();
   }

   // Start the workers
   if (SetupWorkers(0) != 0) {
      Error("Init", "problems setting up workers");
      return 0;
   }

   // From here on the session is usable
   fValid = kTRUE;

   fAllMonitor->DeActivateAll();

   GoParallel(9999, kFALSE);

   SendInitialState();

   SetActive(kFALSE);

   if (IsValid()) {
      ActivateAsyncInput();
      SetRunStatus(TProof::kRunning);
   }

   return fActiveSlaves->GetSize();
}
/// Destructor: removes the workers, deletes the query directory when no
/// queries are registered, removes the query lock file and the
/// communication socket.
TProofLite::~TProofLite()
{
   // Stop the workers first
   RemoveWorkers(0);

   // Remove the query directory only when the manager holds no queries
   const Bool_t hasQueries = (fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize());
   if (!hasQueries) {
      gSystem->MakeDirectory(fQueryDir+"/.delete");
      gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
   }

   // Drop the lock file, then release the lock
   if (fQueryLock) {
      gSystem->Unlink(fQueryLock->GetName());
      fQueryLock->Unlock();
   }

   // Close the server socket and remove its path
   SafeDelete(fServSock);
   gSystem->Unlink(fSockPath);
}
/// Determine how many workers to start.
///
/// The sources are tried in this order:
///  1. a "workers=<n>" setting inside `url`;
///  2. the "ProofLite.Workers" environment setting;
///  3. the number of CPUs reported by the system when larger than 2,
///     otherwise 2.
///
/// @param url  session URL; may be null or empty
/// @return a positive number of workers
Int_t TProofLite::GetNumberOfWorkers(const char *url)
{
   Int_t nWorkers = -1;
   if (url && strlen(url)) {
      TString o(url);
      Int_t in = o.Index("workers=");
      if (in != kNPOS) {
         o.Remove(0, in + strlen("workers="));
         // Strip trailing characters until only a number is left. The loop
         // must also stop on an empty string: IsDigit() is false for it, so
         // without the IsNull() test a value like "workers=abc" would loop
         // forever calling Remove(-1).
         while (!o.IsNull() && !o.IsDigit())
            o.Remove(o.Length()-1);
         if (!o.IsNull())
            nWorkers = o.Atoi();
      }
   }
   if (nWorkers <= 0) {
      // Nothing usable in the URL: try the environment
      nWorkers = gEnv->GetValue("ProofLite.Workers", -1);
      if (nWorkers <= 0) {
         // Fall back to the CPU count, never going below 2
         SysInfo_t si;
         if (gSystem->GetSysInfo(&si) == 0 && si.fCpus > 2) {
            nWorkers = si.fCpus;
         } else {
            nWorkers = 2;
         }
      }
   }
   return nWorkers;
}
/// Start workers and wait for them to call back on the server socket.
///
/// @param opt             0: regular startup (a single worker when fork
///                        startup is enabled, fNWorkers otherwise);
///                        otherwise: create the remaining fNWorkers-1 workers
///                        and broadcast a kPROOF_FORK request (only allowed
///                        when fork startup is enabled)
/// @param startedWorkers  optional list receiving the workers that were set
///                        up successfully
/// @return 0 on success, -1 if the server socket is unusable or if `opt`
///         is non-zero while fork startup is disabled
Int_t TProofLite::SetupWorkers(Int_t opt, TList *startedWorkers)
{
   // Server socket on which the workers call back
   if (!fServSock) fServSock = new TServerSocket(fSockPath);
   if (!fServSock || !fServSock->IsValid()) {
      Error("SetupWorkers",
            "unable to create server socket for internal communications");
      SetBit(kInvalidObject);
      return -1;
   }

   // Additional workers can only be requested in fork-startup mode. This is
   // checked before the monitor is allocated so that nothing is leaked on the
   // early return.
   if (opt != 0 && !fForkStartup) {
      Warning("SetupWorkers", "standard startup: workers already started");
      return -1;
   }

   // Monitor used to wait for incoming connections
   TMonitor *mon = new TMonitor;
   mon->Add(fServSock);

   TList started;
   TSlave *wrk = 0;
   Int_t nWrksDone = 0, nWrksTot = -1;
   TString fullord;

   if (opt == 0) {
      nWrksTot = fForkStartup ? 1 : fNWorkers;
      Int_t ord = 0;
      for (; ord < nWrksTot; ord++) {
         // Ordinal of this worker
         fullord = Form("0.%d", ord);
         // Write the rc and env files read by the worker
         SetProofServEnv(fullord);
         if ((wrk = CreateSlave("lite", fullord, 100, fImage, fWorkDir)))
            started.Add(wrk);
         NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
      }
   } else {
      nWrksTot = fNWorkers - 1;
      // Space-separated list of the ordinals to be cloned
      TString clones;
      Int_t ord = 0;
      for (; ord < nWrksTot; ord++) {
         fullord.Form("0.%d", ord + 1);
         if (!clones.IsNull()) clones += " ";
         clones += fullord;
         if ((wrk = CreateSlave("lite", fullord, -1, fImage, fWorkDir)))
            started.Add(wrk);
         NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
      }
      // Ask the active workers to fork the clones
      TMessage m(kPROOF_FORK);
      m << clones;
      Broadcast(m, kActive);
   }

   // Wait for the call-backs: at most one select per started worker
   nWrksDone = 0;
   nWrksTot = started.GetSize();
   Int_t nSelects = 0;
   Int_t to = gEnv->GetValue("ProofLite.StartupTimeOut", 5) * 1000;
   while (started.GetSize() > 0 && nSelects < nWrksTot) {

      TSocket *xs = mon->Select(to);
      nSelects++;

      // Nothing arrived within the timeout
      if (xs == (TSocket *) -1) continue;

      TSocket *s = fServSock->Accept();
      if (s && s->IsValid()) {
         // The first message carries the ordinal of the calling worker
         TMessage *msg = 0;
         s->Recv(msg);
         if (msg) {
            TString ord;
            *msg >> ord;
            // The message is allocated by Recv() and owned by the caller;
            // the ordinal has been copied out, so release it now.
            delete msg;
            msg = 0;
            if ((wrk = (TSlave *) started.FindObject(ord))) {
               started.Remove(wrk);

               // Attach the socket to the worker
               wrk->SetSocket(s);
               // The socket is now handled via the worker: take it off the
               // global list
               {  R__LOCKGUARD2(gROOTMutex);
                  gROOT->GetListOfSockets()->Remove(s);
               }
               if (wrk->IsValid()) {
                  wrk->SetInputHandler(new TProofInputHandler(this, wrk->GetSocket()));
                  wrk->fParallel = 1;
                  wrk->SetupServ(TSlave::kSlave, 0);
               }

               // SetupServ may have invalidated the worker: check again
               if (wrk->IsValid()) {
                  fSlaves->Add(wrk);
                  if (opt == 1) fActiveSlaves->Add(wrk);
                  fAllMonitor->Add(wrk->GetSocket());
                  if (startedWorkers) startedWorkers->Add(wrk);
                  NotifyStartUp("Setting up worker servers", ++nWrksDone, nWrksTot);
               } else {
                  fBadSlaves->Add(wrk);
               }
            }
         }
      }
   }

   mon->DeActivateAll();
   delete mon;

   // In interactive mode look up the progress dialog plug-in once
   if (!gROOT->IsBatch() && !fProgressDialog) {
      if ((fProgressDialog =
         gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
         if (fProgressDialog->LoadPlugin() == -1)
            fProgressDialog = 0;
   }

   if (opt == 1) {
      // Collect replies, update the group view and go parallel
      Collect(kActive);
      SendGroupView();
      SetParallel(9999, 0);
   }

   return 0;
}
/// Print a one-line startup progress notification on stderr.
///
/// While in progress the line ends with a carriage return; once the
/// percentage reaches 100 an "OK" line terminated by a newline is printed.
///
/// @param action  description of the step; a null pointer is printed as ""
/// @param done    number of items completed
/// @param tot     total number of items; values <= 0 are reported as
///                complete instead of being used as a divisor
void TProofLite::NotifyStartUp(const char *action, Int_t done, Int_t tot)
{
   if (!action) action = "";

   // Integer percentage; guard against a zero (or negative) total, which
   // previously caused an integer division by zero
   Int_t frac = (tot > 0) ? ((Int_t) (done*100.)/tot) : 100;

   // Write straight to stderr: no fixed-size intermediate buffer that a long
   // action string could overflow
   if (frac >= 100) {
      fprintf(stderr, "%s: OK (%d workers) \n",
              action, tot);
   } else {
      fprintf(stderr, "%s: %d out of %d (%d %%)\r",
              action, done, tot, frac);
   }
}
/// Write the two files describing a worker's setup in the session working
/// directory: "worker-<ord>.rootrc" (resource settings) and
/// "worker-<ord>.env" (NAME=VALUE environment lines).
///
/// @param ord  ordinal string of the worker; must be non-empty
/// @return 0 on success, -1 if `ord` is undefined or a file cannot be opened
Int_t TProofLite::SetProofServEnv(const char *ord)
{
   if (!ord || strlen(ord) <= 0) {
      Error("SetProofServEnv", "ordinal string undefined");
      return -1;
   }

   // --- Resource file ---
   TString rcfile(Form("%s/worker-%s.rootrc", fWorkDir.Data(), ord));
   FILE *frc = fopen(rcfile.Data(), "w");
   if (!frc) {
      Error("SetProofServEnv", "cannot open rc file %s", rcfile.Data());
      return -1;
   }

   fprintf(frc,"# The session working dir\n");
   fprintf(frc,"ProofServ.SessionDir: %s/worker-%s\n", fWorkDir.Data(), ord);

   fprintf(frc,"# Proof Log/Debug level\n");
   fprintf(frc,"Proof.DebugLevel: %d\n", gDebug);

   fprintf(frc,"# Ordinal number\n");
   fprintf(frc,"ProofServ.Ordinal: %s\n", ord);

   fprintf(frc,"# ROOT Version tag\n");
   fprintf(frc,"ProofServ.RootVersionTag: %s\n", gROOT->GetVersion());

   // NOTE(review): this reads "ProofLite.Sandbox" with a default under the
   // current working directory, whereas CreateSandbox() reads "Proof.Sandbox"
   // with a default under the home directory - confirm this is intended.
   TString sandbox = gEnv->GetValue("ProofLite.Sandbox", Form("%s/%s",
                           gSystem->WorkingDirectory(), kPROOF_WorkDir));
   fprintf(frc,"# Users sandbox\n");
   fprintf(frc, "ProofServ.Sandbox: %s\n", sandbox.Data());

   fprintf(frc,"# Users cache\n");
   fprintf(frc, "ProofServ.CacheDir: %s\n", fCacheDir.Data());

   fprintf(frc,"# Users packages\n");
   fprintf(frc, "ProofServ.PackageDir: %s\n", fPackageDir.Data());

   fprintf(frc,"# Server image\n");
   fprintf(frc, "ProofServ.Image: %s\n", fImage.Data());

   fprintf(frc,"# Open socket\n");
   fprintf(frc, "ProofServ.OpenSock: %s\n", fSockPath.Data());

   fprintf(frc,"# Client Protocol\n");
   fprintf(frc, "ProofServ.ClientVersion: %d\n", kPROOF_Protocol);

   fclose(frc);

   // --- Environment file ---
   TString envfile(Form("%s/worker-%s.env", fWorkDir.Data(), ord));
   FILE *fenv = fopen(envfile.Data(), "w");
   if (!fenv) {
      Error("SetProofServEnv", "cannot open env file %s", envfile.Data());
      return -1;
   }

   // Getenv() returns a null pointer for an unset variable and passing that
   // to "%s" is undefined behaviour: only export ROOTSYS when it is defined.
   const char *rootsys = gSystem->Getenv("ROOTSYS");
   if (rootsys) {
      fprintf(fenv, "ROOTSYS=%s\n", rootsys);
      fprintf(fenv, "ROOTCONFDIR=%s\n", rootsys);
   } else {
      Warning("SetProofServEnv", "ROOTSYS undefined: not exported to the worker environment");
   }
   fprintf(fenv, "TMPDIR=%s\n", gSystem->TempDirectory());
   TString logfile(Form("%s/worker-%s.log", fWorkDir.Data(), ord));
   fprintf(fenv, "ROOTPROOFLOGFILE=%s\n", logfile.Data());
   fprintf(fenv, "ROOTRCFILE=%s\n", rcfile.Data());
   fprintf(fenv, "ROOTVERSIONTAG=%s\n", gROOT->GetVersion());

   // User-defined variables, followed by the comma-separated list of their names
   if (fgProofEnvList) {
      TString namelist;
      TIter nxenv(fgProofEnvList);
      TNamed *env = 0;
      while ((env = (TNamed *)nxenv())) {
         fprintf(fenv, "%s=%s\n", env->GetName(), env->GetTitle());
         if (namelist.Length() > 0)
            namelist += ',';
         namelist += env->GetName();
      }
      fprintf(fenv, "PROOF_ALLVARS=%s\n", namelist.Data());
   }

   fclose(fenv);

   return 0;
}
/// Create (or assert) the sandbox directories for this session and set the
/// related members: fPackageDir, fCacheDir, fDataSetDir, the session name,
/// fWorkDir and fQueryDir. A "last-lite-session" symlink to the working
/// directory is refreshed and CleanupSandbox() is invoked at the end.
///
/// @return 0 on success, -1 as soon as one of the paths cannot be asserted
Int_t TProofLite::CreateSandbox()
{
   // Sandbox root: from the environment, or "~/<workdir>" by default
   TString sbRoot = gEnv->GetValue("Proof.Sandbox", "");
   if (sbRoot.IsNull())
      sbRoot.Form("~/%s", kPROOF_WorkDir);
   gSystem->ExpandPathName(sbRoot);
   if (AssertPath(sbRoot, kTRUE) != 0)
      return -1;

   // Package directory
   fPackageDir = gEnv->GetValue("Proof.PackageDir", "");
   if (fPackageDir.IsNull())
      fPackageDir.Form("%s/%s", sbRoot.Data(), kPROOF_PackDir);
   if (AssertPath(fPackageDir, kTRUE) != 0)
      return -1;

   // Cache directory
   fCacheDir = gEnv->GetValue("Proof.CacheDir", "");
   if (fCacheDir.IsNull())
      fCacheDir.Form("%s/%s", sbRoot.Data(), kPROOF_CacheDir);
   if (AssertPath(fCacheDir, kTRUE) != 0)
      return -1;

   // Dataset directory
   fDataSetDir = gEnv->GetValue("Proof.DataSetDir", "");
   if (fDataSetDir.IsNull())
      fDataSetDir.Form("%s/%s", sbRoot.Data(), kPROOF_DataSetDir);
   if (AssertPath(fDataSetDir, kTRUE) != 0)
      return -1;

   // Session tag "<host>-<time>-<pid>", also used as the object name
   TString sessTag;
   sessTag.Form("%s-%d-%d", gSystem->HostName(), (int)time(0), gSystem->GetPid());
   SetName(sessTag.Data());

   // Directory under the sandbox derived from the current working directory:
   // the home prefix is dropped and the remaining '/' become '-'
   TString cwdDir(gSystem->WorkingDirectory());
   cwdDir.ReplaceAll(gSystem->HomeDirectory(),"");
   cwdDir.ReplaceAll("/","-");
   cwdDir.Replace(0,1,"/",1);
   cwdDir.Insert(0, sbRoot.Data());

   // Session working directory
   fWorkDir.Form("%s/session-%s", cwdDir.Data(), sessTag.Data());
   if (AssertPath(fWorkDir, kTRUE) != 0)
      return -1;

   // Refresh the link pointing at the most recent session
   TString lastLink;
   lastLink.Form("%s/last-lite-session", cwdDir.Data());
   gSystem->Unlink(lastLink);
   gSystem->Symlink(fWorkDir, lastLink);

   // Query directory
   fQueryDir = gEnv->GetValue("Proof.QueryDir", "");
   if (fQueryDir.IsNull())
      fQueryDir.Form("%s/%s", cwdDir.Data(), kPROOF_QueryDir);
   if (AssertPath(fQueryDir, kTRUE) != 0)
      return -1;

   // Remove old session directories exceeding the configured maximum
   CleanupSandbox();

   return 0;
}
/// Print a summary of the session on standard output.
///
/// @param option  when it contains 'a' (case-insensitive) and there is at
///                least one worker, the valid workers are printed as well,
///                with the same option
void TProofLite::Print(Option_t *option) const
{
   // Header line depends on the running mode
   if (!IsParallel())
      Printf("*** PROOF-Lite cluster (sequential mode)");
   else
      Printf("*** PROOF-Lite cluster (parallel mode, %d workers):", GetParallel());

   Printf("Host name: %s", gSystem->HostName());
   Printf("User: %s", GetUser());

   // Version string: "<version>[|r<svn revision>][|<version tag>]"
   TString version(gROOT->GetVersion());
   if (gROOT->GetSvnRevision() > 0)
      version += Form("|r%d", gROOT->GetSvnRevision());
   if (gSystem->Getenv("ROOTVERSIONTAG"))
      version += Form("|%s", gSystem->Getenv("ROOTVERSIONTAG"));
   Printf("ROOT version|rev|tag: %s", version.Data());

   Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
                                          gSystem->GetBuildCompilerVersion());
   Printf("Protocol version: %d", GetClientProtocol());
   Printf("Working directory: %s", gSystem->WorkingDirectory());
   Printf("Communication path: %s", fSockPath.Data());
   Printf("Log level: %d", GetLogLevel());
   Printf("Number of workers: %d", GetNumberOfSlaves());
   Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
   Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
   Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
   Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
   Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
   Printf("Total real time used (s): %.3f", GetRealTime());
   Printf("Total CPU time used (s): %.3f", GetCpuTime());

   // Optional per-worker listing
   const Bool_t listWorkers = TString(option).Contains("a", TString::kIgnoreCase) &&
                              GetNumberOfSlaves();
   if (listWorkers) {
      Printf("List of workers:");
      TIter nextslave(fSlaves);
      TSlave *worker = 0;
      // Stops at the end of the list or at the first entry that is not a TSlave
      while ((worker = dynamic_cast<TSlave*>(nextslave())) != 0) {
         if (!worker->IsValid())
            continue;
         worker->Print(option);
      }
   }
}
/// Create a TProofQueryResult describing a new query.
///
/// The sequential number is obtained by incrementing the one held by the
/// query manager; when no manager is defined, -1 is used instead of
/// dereferencing a null pointer. The title of the result is set to the
/// session name.
///
/// @param nent   number of entries
/// @param opt    processing options
/// @param fst    first entry
/// @param dset   dataset (may be null)
/// @param selec  selector
/// @return the newly allocated query result
TProofQueryResult *TProofLite::MakeQueryResult(Long64_t nent, const char *opt,
                                               Long64_t fst, TDSet *dset,
                                               const char *selec)
{
   // Increment and read the sequential number only if the manager exists
   Int_t seqnum = -1;
   if (fQMgr) {
      fQMgr->IncrementSeqNum();
      seqnum = fQMgr->SeqNum();
   }

   TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt,
                                                  fPlayer->GetInputList(), nent,
                                                  fst, dset, selec,
                                                  (dset ? dset->GetEntryList() : 0));
   // The title carries the session tag
   pqr->SetTitle(GetName());

   return pqr;
}
/// Mark a query as running: record the current end position of the session
/// log file, the list of packages enabled on the client and the current
/// processing counters.
///
/// @param pq  query to update; nothing is done (apart from an error message)
///            if it is null
void TProofLite::SetQueryRunning(TProofQueryResult *pq)
{
   if (!pq) {
      Error("SetQueryRunning", "query result undefined");
      return;
   }

   // Current end of the log file. The write handle is null when the log file
   // could not be created (Init() only reports that as an error), and
   // fileno() must not be called on a null FILE*: record offset 0 then.
   Int_t startlog = 0;
   if (fLogFileW) {
      fflush(fLogFileW);
      startlog = lseek(fileno(fLogFileW), (off_t) 0, SEEK_END);
   }

   Printf(" ");
   Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());

   // Semicolon-separated list of the packages enabled on the client
   TString parlist = "";
   TIter nxp(fEnabledPackagesOnClient);
   TObjString *os= 0;
   while ((os = (TObjString *)nxp())) {
      if (parlist.Length() <= 0)
         parlist = os->GetName();
      else
         parlist += Form(";%s",os->GetName());
   }

   pq->SetRunning(startlog, parlist);

   // Snapshot of the counters at start
   pq->SetProcessInfo(pq->GetEntries(), GetCpuTime(), GetBytesRead());
}
/// Read the list named "fileList" from the ROOT file associated with a
/// dataset name.
///
/// Accepted forms, as stated by the usage message below:
/// "[[~user/]public/]dataset".
///  - "~user/public/dataset" is looked up under
///    "<workdir>/../user/<datasetdir>/public/";
///  - any other name is looked up as "<fDataSetDir>/<name>.root".
///
/// @param name  dataset name
/// @return the list read from the file (owned by the caller), or 0 if the
///         name is malformed, or the file does not exist or cannot be opened
TList *TProofLite::GetDataSet(const char *name)
{
   if (!name || strlen(name) <= 0) {
      Error("GetDataSet", "dataset name undefined");
      return 0;
   }

   TString fileListPath;
   if (name[0] == '~') {
      // Split "user/public/dataset" without strtok(): no raw buffer to leak
      // on the early return and no null token handed to strcmp()
      TString spec(name + 1), userName, scope, dsName;
      Ssiz_t from = 0;
      if (!spec.Tokenize(userName, from, "/") ||
          !spec.Tokenize(scope, from, "/") || scope != "public")
         return 0;
      fileListPath = fWorkDir + "/../" + userName + "/"
                   + kPROOF_DataSetDir + "/public/";
      // Complete the path with the dataset file, as in the non-'~' branch;
      // previously the path stopped at the directory and could never be
      // opened as a ROOT file
      if (spec.Tokenize(dsName, from, "/"))
         fileListPath += dsName + ".root";
   } else if (strchr(name, '/') && strstr(name, "public") != name) {
      Printf("Dataset name should be of form [[~user/]public/]dataset");
      return 0;
   } else
      fileListPath = fDataSetDir + "/" + name + ".root";

   TList *fileList = 0;
   // AccessPathName() returns kFALSE when the path exists
   if (gSystem->AccessPathName(fileListPath.Data(), kFileExists) == kFALSE) {
      TFile *f = TFile::Open(fileListPath);
      // Open() may fail even if the path exists
      if (f) {
         f->cd();
         fileList = (TList *) f->Get("fileList");
         f->Close();
         delete f;
      }
      // fileListPath is a TString which releases its own storage; it must
      // not be passed to delete[]
   }

   return fileList;
}
/// Run a draw query on a dataset by forwarding it to Process() as a
/// "draw|<varexp>|<selection>" selector string.
///
/// @param dset       dataset to process
/// @param varexp     expression to draw
/// @param selection  selection expression
/// @param option     options; the first "ASYN" (case-insensitive) is removed
/// @param nentries   number of entries to process
/// @param first      first entry to process
/// @return the result of Process(), or -1 if the session is invalid or busy
Long64_t TProofLite::DrawSelect(TDSet *dset, const char *varexp,
                                const char *selection, Option_t *option,
                                Long64_t nentries, Long64_t first)
{
   if (!IsValid())
      return -1;

   // A query is already being processed
   if (!IsIdle()) {
      Info("DrawSelect","not idle, asynchronous Draw not supported");
      return -1;
   }

   // Strip the asynchronous-mode request from the options
   TString drawOpt(option);
   const Int_t asynPos = drawOpt.Index("ASYN", 0, TString::kIgnoreCase);
   if (asynPos != kNPOS)
      drawOpt.Replace(asynPos,4,"");

   // Encode the draw request in the selector string
   TString drawQuery;
   drawQuery.Form("draw|%s|%s", varexp, selection);

   return Process(dset, drawQuery.Data(), drawOpt, nentries, first);
}
/// Process a dataset with a selector, or run a draw query when the selector
/// has the form "draw|<varexp>|<selection>". Only synchronous processing is
/// supported.
///
/// @param dset      dataset to process; must not be null
/// @param selector  selector, or encoded draw request
/// @param option    processing options
/// @param nentries  number of entries to process
/// @param first     first entry to process
/// @return the value returned by the player, or -1 if the query could not be
///         started (asynchronous mode requested, session busy or invalid,
///         undefined dataset, no files, malformed draw request)
Long64_t TProofLite::Process(TDSet *dset, const char *selector, Option_t *option,
                             Long64_t nentries, Long64_t first)
{
   // Asynchronous mode is refused
   fSync = (GetQueryMode(option) == kSync);
   if (!fSync) {
      Info("Process","asynchronous mode not yet supported in PROOF-Lite");
      return -1;
   }

   if (!IsIdle()) {
      Info("Process", "not idle: cannot accept queries");
      return -1;
   }

   // Delete datasets still registered as running
   if (IsIdle() && fRunningDSets && fRunningDSets->GetSize() > 0) {
      fRunningDSets->SetOwner(kTRUE);
      fRunningDSets->Delete();
   }

   if (!IsValid() || !fQMgr) {
      Error("Process", "invalid sesion or query-result manager undefined!");
      return -1;
   }

   // The dataset is dereferenced unconditionally below: refuse a null pointer
   // here instead of crashing
   if (!dset) {
      Error("Process", "dataset undefined!");
      return -1;
   }

   // Default for the maximum number of workers per node, unless already set
   if (!fPlayer->GetInputList()->FindObject("PROOF_MaxSlavesPerNode"))
      SetParameter("PROOF_MaxSlavesPerNode", (Long_t)fNWorkers);

   // A dataset without elements (and not flagged as empty) must be resolved
   Bool_t hasNoData = (dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;
   if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
      TString emsg;
      if (TProof::AssertDataSet(dset, fPlayer->GetInputList(), fDataSetManager, emsg) != 0) {
         Error("Process", "from AssertDataSet: %s", emsg.Data());
         return -1;
      }
      if (dset->GetListOfElements()->GetSize() == 0) {
         Error("Process", "no files to process!");
         return -1;
      }
   }

   // Decode a draw request, if any
   TString selec(selector), varexp, selection, objname;
   if (selec.BeginsWith("draw|")) {
      TString ss(selector), s;
      Ssiz_t from = 0;
      // Skip the "draw" token
      ss.Tokenize(s, from, "|");
      if (!ss.Tokenize(varexp, from, "|")) {
         Error("Process", "draw query: badly formed expression: %s", selector);
         return -1;
      }
      // The selection may be empty
      ss.Tokenize(selection, from, "|");
      if (fPlayer->GetDrawArgs(varexp, selection, option, selec, objname) != 0) {
         Error("Process", "draw query: error parsing arguments %s, %s, %s",
                          varexp.Data(), selection.Data(), option);
         return -1;
      }
   }

   // Create the query result; the dataset is attached to it after processing
   TProofQueryResult *pq = MakeQueryResult(nentries, option, first, 0, selec);

   // Draw queries are not registered nor saved
   if (!(pq->IsDraw())) {
      if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
      fQMgr->SaveQuery(pq);
   }

   fSeqNum = pq->GetSeqNum();

   SetQueryRunning(pq);

   if (!(pq->IsDraw()))
      fQMgr->SaveQuery(pq);
   else
      fQMgr->IncrementDrawQueries();

   // Start or reset the progress dialog in interactive mode
   if (!gROOT->IsBatch()) {
      Int_t dsz = dset->GetListOfElements()->GetSize();
      if (fProgressDialog && !TestBit(kUsingSessionGui)) {
         if (!fProgressDialogStarted) {
            fProgressDialog->ExecPlugin(5, this, selec.Data(), dsz,
                                           first, nentries);
            fProgressDialogStarted = kTRUE;
         } else {
            ResetProgressDialog(selec.Data(), dsz, first, nentries);
         }
      }
      ResetBit(kUsingSessionGui);
   }

   if (!(pq->IsDraw()))
      fPlayer->AddQueryResult(pq);

   fPlayer->SetCurrentQuery(pq);

   // Publish the query tag "<title>:<name>" in the input list
   TNamed *qtag = (TNamed *) fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
   if (qtag) {
      qtag->SetTitle(Form("%s:%s",pq->GetTitle(),pq->GetName()));
   } else {
      fPlayer->AddInput(new TNamed("PROOF_QueryTag",
                                   Form("%s:%s",pq->GetTitle(),pq->GetName())));
   }

   SetRunStatus(TProof::kRunning);

   // While processing synchronously the application's signal handler is
   // removed; it is re-installed afterwards
   TSignalHandler *sh = 0;
   if (fSync) {
      if (gApplication)
         sh = gSystem->RemoveSignalHandler(gApplication->GetSignalHandler());
   }

   // In fork-startup mode the additional workers are started per query
   TList *startedWorkers = 0;
   if (fForkStartup) {
      startedWorkers = new TList;
      startedWorkers->SetOwner(kFALSE);
      SetupWorkers(1, startedWorkers);
   }

   Long64_t rv = 0;
   if (!(pq->IsDraw())) {
      rv = fPlayer->Process(dset, selec, option, nentries, first);
   } else {
      rv = fPlayer->DrawSelect(dset, varexp, selection, option, nentries, first);
   }

   if (fSync) {

      // Remove the workers started for this query
      if (fForkStartup && startedWorkers) {
         RemoveWorkers(startedWorkers);
         SafeDelete(startedWorkers);
      }

      // Re-install the signal handler
      if (sh)
         gSystem->AddSignalHandler(sh);

      // Notify when the query did not finish normally
      if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
         Bool_t abort = (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted)
                      ? kTRUE : kFALSE;
         if (abort) fPlayer->StopProcess(kTRUE);
         Emit("StopProcess(Bool_t)", abort);
      }

      // The output list is attached without transferring ownership
      pq->SetOutputList(fPlayer->GetOutputList(), kFALSE);
      QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));

      UpdateDialog();

      // Attach the dataset (and its entry list) to the query inputs
      if (dset && pq->GetInputList()) {
         pq->GetInputList()->Add(dset);
         if (dset->GetEntryList())
            pq->GetInputList()->Add(dset->GetEntryList());
      }

      AskStatistics();

      if (!(pq->IsDraw())) {
         if (fQMgr->FinalizeQuery(pq, this, fPlayer)) {
            // Saving is optional
            if (!strcmp(gEnv->GetValue("ProofLite.AutoSaveQueries", "off"), "on"))
               fQMgr->SaveQuery(pq, -1);
         }
      }

      if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
         // Aborted queries are dropped
         if (fQMgr) fQMgr->RemoveQuery(pq);
      } else {
         QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
         // Keep only an info clone in the manager's list
         if (!(pq->IsDraw())) {
            if (fQMgr->Queries()) {
               TQueryResult *pqr = pq->CloneInfo();
               if (pqr)
                  fQMgr->Queries()->Add(pqr);
               fQMgr->Queries()->Remove(pq);
            }
         }
      }
   }

   return rv;
}
/// Create, in the working directory of each active worker, a symbolic link
/// to every file in the list. An existing entry with the link name is
/// unlinked first.
///
/// @param files  list of TObjString holding the file paths
/// @return the number of links that could not be created (0 when `files`
///         is undefined, in which case a warning is issued)
Int_t TProofLite::CreateSymLinks(TList *files)
{
   if (!files) {
      Warning("CreateSymLinks", "files list is undefined");
      return 0;
   }

   Int_t nFailed = 0;
   TIter nextFile(files);
   TObjString *entry = 0;
   while ((entry = (TObjString *) nextFile())) {
      // Link target: the expanded file path
      TString target(entry->GetName());
      gSystem->ExpandPathName(target);

      // One link per active worker, named after the file's base name
      TIter nextWorker(fActiveSlaves);
      TSlave *worker = 0;
      while ((worker = (TSlave *) nextWorker())) {
         TString link = Form("%s/%s", worker->GetWorkDir(), gSystem->BaseName(entry->GetName()));
         gSystem->Unlink(link);
         if (gSystem->Symlink(target, link) == 0)
            continue;
         nFailed++;
         Warning("CreateSymLinks", "problems creating sym link: %s", link.Data());
      }
   }
   return nFailed;
}
/// Create the dataset manager through the plug-in manager.
///
/// If "Proof.DataSetManager" is set, the corresponding plug-in is tried
/// first; if that yields no valid manager, a manager is created with the
/// "dir:<dir> opt:<opts>" settings, where <dir> comes from
/// "ProofServ.DataSetDir" or, if unset, from fDataSetDir.
///
/// @return 0 if a dataset manager is available, -1 otherwise
Int_t TProofLite::InitDataSetManager()
{
   fDataSetManager = 0;

   // User and group passed to the plug-in
   TString user("???"), group("default");
   UserGroup_t *pw = gSystem->GetUserInfo();
   if (pw) {
      user = pw->fUser;
      delete pw;
   }

   // First choice: the manager requested via the environment
   TPluginHandler *h = 0;
   TString dsm = gEnv->GetValue("Proof.DataSetManager", "");
   if (!dsm.IsNull()) {
      if (gROOT->GetPluginManager()) {
         h = gROOT->GetPluginManager()->FindHandler("TProofDataSetManager", dsm);
         if (h && h->LoadPlugin() != -1) {
            fDataSetManager =
               reinterpret_cast<TProofDataSetManager*>(h->ExecPlugin(3, group.Data(),
                                                       user.Data(), dsm.Data()));
         }
      }
   }
   if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
      Warning("InitDataSetManager", "dataset manager plug-in initialization failed");
      SafeDelete(fDataSetManager);
   }

   // Fallback: default settings
   if (!fDataSetManager) {
      TString opts("As:");
      TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
      if (dsetdir.IsNull()) {
         // Use the directory in the sandbox
         dsetdir = fDataSetDir;
         opts += "Sb:";
      }
      // Look up the "file" handler unless a handler was already found above.
      // The plug-in manager is checked here as well, consistently with the
      // first lookup.
      if (!h && gROOT->GetPluginManager()) {
         h = gROOT->GetPluginManager()->FindHandler("TProofDataSetManager", "file");
         if (h && h->LoadPlugin() == -1) h = 0;
      }
      if (h) {
         fDataSetManager = reinterpret_cast<TProofDataSetManager*>(h->ExecPlugin(3,
                           group.Data(), user.Data(),
                           Form("dir:%s opt:%s", dsetdir.Data(), opts.Data())));
      }
      if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
         Warning("InitDataSetManager", "default dataset manager plug-in initialization failed");
         SafeDelete(fDataSetManager);
      }
   }

   // In debug mode report the manager's option bits
   if (gDebug > 0 && fDataSetManager) {
      Info("InitDataSetManager", "datasetmgr Cq: %d, Ar: %d, Av: %d, As: %d, Sb: %d",
                       fDataSetManager->TestBit(TProofDataSetManager::kCheckQuota),
                       fDataSetManager->TestBit(TProofDataSetManager::kAllowRegister),
                       fDataSetManager->TestBit(TProofDataSetManager::kAllowVerify),
                       fDataSetManager->TestBit(TProofDataSetManager::kAllowStaging),
                       fDataSetManager->TestBit(TProofDataSetManager::kIsSandbox));
   }

   return (fDataSetManager ? 0 : -1);
}
/// List the content of the local file cache directory by running the
/// listing command (kLS) on it. Does nothing if the session is not valid;
/// the Bool_t argument is unused.
void TProofLite::ShowCache(Bool_t)
{
   if (IsValid()) {
      Printf("*** Local file cache %s ***", fCacheDir.Data());
      // Build the listing command, then run it
      TString listCmd;
      listCmd.Form("%s %s", kLS, fCacheDir.Data());
      gSystem->Exec(listCmd);
   }
}
void TProofLite::ClearCache(const char *file)
{
if (!IsValid()) return;
fCacheLock->Lock();
if (!file || strlen(file) <= 0) {
gSystem->Exec(Form("%s %s/*", kRM, fCacheDir.Data()));
} else {
gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), file));
}
fCacheLock->Unlock();
}
/// Remove old session directories found next to the current working
/// directory, so that at most "Proof.MaxOldSessions" (default 10) of them
/// remain. A negative setting disables the cleanup. Directories whose name
/// contains the current session name are never considered.
///
/// @return always 0
Int_t TProofLite::CleanupSandbox()
{
   const Int_t maxKept = gEnv->GetValue("Proof.MaxOldSessions", 10);
   if (maxKept < 0) return 0;

   // Sorted collection of the candidate directories
   TSortedList *sessions = new TSortedList(kFALSE);

   // Scan the directory containing the session working directory
   TString parentDir = gSystem->DirName(fWorkDir.Data());
   if (void *dir = gSystem->OpenDirectory(parentDir)) {
      const char *entry = 0;
      while ((entry = gSystem->GetDirEntry(dir))) {
         // Only other "session-*" entries are of interest
         if (strncmp(entry, "session-", 8) != 0 || strstr(entry, GetName()))
            continue;
         // Sort key: the field between the last two '-' in the entry name
         TString key(entry);
         Int_t dash = key.Last('-');
         if (dash != kNPOS) key.Remove(dash);
         dash = key.Last('-');
         if (dash != kNPOS) key.Remove(0,dash+1);
         TString fullPath = Form("%s/%s", parentDir.Data(), entry);
         sessions->Add(new TNamed(key, fullPath));
      }
      gSystem->FreeDirectory(dir);
   }

   // Remove entries from the end of the sorted list until few enough remain
   Bool_t firstPass = kTRUE;
   while (sessions->GetSize() > maxKept) {
      if (firstPass && gDebug > 0)
         Printf("Cleaning sandbox at: %s", parentDir.Data());
      firstPass = kFALSE;
      TNamed *last = (TNamed *) sessions->Last();
      if (last) {
         gSystem->Exec(Form("%s %s", kRM, last->GetTitle()));
         sessions->Remove(last);
         delete last;
      }
   }

   // Delete the list together with the remaining entries
   sessions->SetOwner();
   delete sessions;

   return 0;
}
/// Build the list of known queries and store it in fQueries, replacing (and
/// deleting the content of) any previous list.
///
/// Queries of the current session are added as info clones. With option 'A'
/// or 'a' the queries found by scanning previous sessions are added first.
/// Entries are numbered sequentially in the order they are added.
///
/// @param opt  options; may be null (treated as empty)
/// @return the new list, also stored in fQueries
TList *TProofLite::GetListOfQueries(Option_t *opt)
{
   // A null option string means "current session only"
   Bool_t all = (opt && (strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;

   TList *ql = new TList;
   Int_t ntot = 0, npre = 0, ndraw= 0;
   if (fQMgr) {
      if (all) {
         // Scan from the directory above the "session-" component
         TString qdir = fQueryDir;
         Int_t idx = qdir.Index("session-");
         if (idx != kNPOS)
            qdir.Remove(idx);
         fQMgr->ScanPreviousQueries(qdir);
         // Queries from previous sessions
         if (fQMgr->PreviousQueries()) {
            TIter nxq(fQMgr->PreviousQueries());
            TProofQueryResult *pqr = 0;
            while ((pqr = (TProofQueryResult *)nxq())) {
               ntot++;
               pqr->fSeqNum = ntot;
               ql->Add(pqr);
            }
         }
      }

      npre = ntot;
      // Queries from this session
      if (fQMgr->Queries()) {
         TIter nxq(fQMgr->Queries());
         TProofQueryResult *pqr = 0;
         TQueryResult *pqm = 0;
         while ((pqr = (TProofQueryResult *)nxq())) {
            // CloneInfo() may fail (Process() checks its result as well):
            // skip the entry rather than dereferencing a null pointer
            pqm = pqr->CloneInfo();
            if (!pqm) continue;
            ntot++;
            pqm->fSeqNum = ntot;
            ql->Add(pqm);
         }
      }
      ndraw = fQMgr->DrawQueries();
   }

   fOtherQueries = npre;
   fDrawQueries = ndraw;

   // Replace the previous list
   if (fQueries) {
      fQueries->Delete();
      delete fQueries;
      fQueries = 0;
   }
   fQueries = ql;

   return fQueries;
}
/// Register a dataset under the given name via the dataset manager.
///
/// @param uri      dataset name; mandatory
/// @param dataSet  file collection to register; must contain at least one file
/// @param optStr   options forwarded to the dataset manager
/// @return kTRUE if the dataset was registered; kFALSE if the manager is not
///         available, the name is missing, registration is not allowed, the
///         collection is empty, or the manager reported a failure
Bool_t TProofLite::RegisterDataSet(const char *uri,
                                   TFileCollection *dataSet, const char* optStr)
{
   if (!fDataSetManager) {
      Info("RegisterDataSet", "dataset manager not available");
      return kFALSE;
   }

   if (!uri || strlen(uri) <= 0) {
      Info("RegisterDataSet", "specifying a dataset name is mandatory");
      return kFALSE;
   }

   Bool_t result = kTRUE;
   if (fDataSetManager->TestBit(TProofDataSetManager::kAllowRegister)) {
      if (!dataSet || !dataSet->GetList() || dataSet->GetList()->GetSize() == 0) {
         // Nothing to save: do NOT go on to the manager. Previously the
         // failure was overwritten by the registration result and a null
         // collection could be passed along.
         Error("RegisterDataSet", "can not save an empty list.");
         result = kFALSE;
      } else {
         result = (fDataSetManager->RegisterDataSet(uri, dataSet, optStr) == 0)
                ? kTRUE : kFALSE;
      }
   } else {
      Info("RegisterDataSet", "dataset registration not allowed");
      result = kFALSE;
   }

   if (!result)
      Error("RegisterDataSet", "dataset was not saved");

   return result;
}
/// Query the dataset manager for the datasets matching `uri`, using the
/// kExport option.
///
/// @param uri  dataset selection, forwarded to the manager
/// @return the manager's result, or 0 if no manager is available
TMap *TProofLite::GetDataSets(const char *uri, const char *)
{
   if (fDataSetManager) {
      // Ask for the result to be exported
      const UInt_t exportOpt = (UInt_t)TProofDataSetManager::kExport;
      return fDataSetManager->GetDataSets(uri, exportOpt);
   }

   Info("GetDataSets", "dataset manager not available");
   return (TMap *)0;
}
/// Ask the dataset manager to print the datasets matching `uri` (kPrint
/// option).
///
/// @param uri  dataset selection, forwarded to the manager
void TProofLite::ShowDataSets(const char *uri, const char *)
{
   if (!fDataSetManager) {
      // Tag the message with this method's own name (it used to say "GetDataSet")
      Info("ShowDataSets", "dataset manager not available");
      return;
   }

   UInt_t opt = (UInt_t)TProofDataSetManager::kPrint;
   fDataSetManager->GetDataSets(uri, opt);
}
/// Retrieve a dataset from the dataset manager.
///
/// @param uri  dataset name; mandatory
/// @return the manager's result, or 0 if no manager is available or the
///         name is missing
TFileCollection *TProofLite::GetDataSet(const char *uri, const char *)
{
   if (!fDataSetManager) {
      Info("GetDataSet", "dataset manager not available");
      return (TFileCollection *)0;
   }

   // A name is required
   const Bool_t hasName = (uri && strlen(uri) > 0);
   if (!hasName) {
      Info("GetDataSet", "specifying a dataset name is mandatory");
      return 0;
   }

   return fDataSetManager->GetDataSet(uri);
}
/// Remove a dataset via the dataset manager. Removal requires the manager's
/// kAllowRegister bit.
///
/// @param uri  dataset name, forwarded to the manager
/// @return 0 on success; -1 if no manager is available, removal is not
///         allowed, or the manager reports a failure
Int_t TProofLite::RemoveDataSet(const char *uri, const char *)
{
   if (!fDataSetManager) {
      Info("RemoveDataSet", "dataset manager not available");
      return -1;
   }

   // Removal is subject to the same permission as registration
   if (!fDataSetManager->TestBit(TProofDataSetManager::kAllowRegister)) {
      Info("RemoveDataSet", "dataset creation / removal not allowed");
      return -1;
   }

   return fDataSetManager->RemoveDataSet(uri) ? 0 : -1;
}
/// Verify a dataset by asking the dataset manager to scan it. Requires the
/// manager's kAllowVerify bit.
///
/// @param uri  dataset name, forwarded to the manager
/// @return the manager's ScanDataSet() result; -1 if no manager is available
///         or verification is not allowed
Int_t TProofLite::VerifyDataSet(const char *uri, const char *)
{
   if (!fDataSetManager) {
      Info("VerifyDataSet", "dataset manager not available");
      return -1;
   }

   if (!fDataSetManager->TestBit(TProofDataSetManager::kAllowVerify)) {
      Info("VerifyDataSet", "dataset verification not allowed");
      return -1;
   }

   const Int_t scanResult = fDataSetManager->ScanDataSet(uri);
   return scanResult;
}
void TProofLite::SendInputDataFile()
{
TString dataFile;
PrepareInputDataFile(dataFile);
if (dataFile.Length() > 0) {
if (!dataFile.BeginsWith(fCacheDir)) {
TString dst;
dst.Form("%s/%s", fCacheDir.Data(), gSystem->BaseName(dataFile));
if (!gSystem->AccessPathName(dst))
gSystem->Unlink(dst);
gSystem->CopyFile(dataFile, dst);
}
AddInput(new TNamed("PROOF_InputDataFile", Form("%s", gSystem->BaseName(dataFile))));
}
}
/// Remove a query, or clean up the queries directory when `ref` is
/// "cleanupdir".
///
/// @param ref  query reference, or the keyword "cleanupdir"
/// @param all  when true, the query result is also removed from the player
/// @return 0 on success (always for "cleanupdir"); -1 if the session could
///         not be locked or the query-result manager is undefined
Int_t TProofLite::Remove(const char *ref, Bool_t all)
{
   PDB(kGlobal, 1)
      Info("Remove", "Enter: %s, %d", ref, all);

   // Drop the result held by the player, if requested
   if (all && fPlayer)
      fPlayer->RemoveQueryResult(ref);

   TString queryref(ref);

   // Special keyword: clean up the queries directory
   if (queryref == "cleanupdir") {
      Int_t nRemoved = -1;
      if (fQMgr)
         nRemoved = fQMgr->CleanupQueriesDir();
      Info("Remove", "%d directories removed", nRemoved);
      return 0;
   }

   if (!fQMgr) {
      Warning("Remove", "query result manager undefined!");
   } else {
      // Lock the session owning the query, remove the query, then discard
      // the lock file
      TProofLockPath *sessLock = 0;
      if (fQMgr->LockSession(queryref, &sessLock) == 0) {
         fQMgr->RemoveQuery(queryref, 0);
         if (sessLock) {
            gSystem->Unlink(sessLock->GetName());
            SafeDelete(sessLock);
         }
         return 0;
      }
   }

   Info("Remove",
        "query %s could not be removed (unable to lock session)", queryref.Data());
   return -1;
}
/// Return the tree found in the first element of the dataset, with its
/// maximum entry loop set to the total number of entries summed over all
/// elements of the dataset.
///
/// The file of the first element is left open on success, since the
/// returned tree is read from it; the files of the other elements are
/// closed after their entries have been counted.
///
/// @param dset  dataset to inspect
/// @return the tree, or 0 if the dataset is undefined or empty, the first
///         file cannot be opened, or the tree is not found in it
TTree *TProofLite::GetTreeHeader(TDSet *dset)
{
   TTree *t = 0;
   if (!dset) {
      Error("GetTreeHeader", "undefined TDSet");
      return t;
   }

   // Start from the first element
   dset->Reset();
   TDSetElement *e = dset->Next();
   if (!e) {
      PDB(kGlobal, 1) Info("GetTreeHeader", "empty TDSet");
      return t;
   }

   TFile *f = TFile::Open(e->GetFileName());
   if (!f)
      return t;

   t = (TTree*) f->Get(e->GetObjName());
   if (!t) {
      // No tree is handed back, so nothing refers to the file any longer:
      // close it instead of leaking it
      delete f;
      return t;
   }

   t->SetMaxVirtualSize(0);
   t->DropBaskets();
   Long64_t entries = t->GetEntries();

   // Add up the entries of the remaining elements
   while ((e = dset->Next()) != 0) {
      TFile *f1 = TFile::Open(e->GetFileName());
      if (f1) {
         TTree *t1 = (TTree*) f1->Get(e->GetObjName());
         if (t1) {
            entries += t1->GetEntries();
            delete t1;
         }
         delete f1;
      }
   }
   t->SetMaxEntryLoop(entries);

   return t;
}
// Last change: Mon Nov 24 08:19:57 2008
// Last generated: 2008-11-24 08:19
// This page has been automatically generated. If you have any comments or suggestions about the page layout, please send an e-mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.