#include "TProofPlayerLite.h"
#include "MessageTypes.h"
#include "TDSet.h"
#include "TDSetProxy.h"
#include "TEntryList.h"
#include "TEventList.h"
#include "TList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TObjString.h"
#include "TPerfStats.h"
#include "TProofLite.h"
#include "TProofDebug.h"
#include "TProofServ.h"
#include "TROOT.h"
#include "TSelector.h"
#include "TVirtualPacketizer.h"
Int_t TProofPlayerLite::MakeSelector(const char *selfile)
{
   // Create the selector object described by 'selfile' and keep the
   // PROOF-Lite cache directory in sync with it.
   //
   // If the base name of 'selfile' contains no '.', it is passed straight to
   // TSelector::GetSelector() and no file handling is done. Otherwise
   // 'selfile' is treated as an implementation file (optionally followed by
   // ACLiC mode characters) with a companion header (.h or .hh):
   //   1. if the cached copies of both files have the same MD5 as the local
   //      ones and the cached version stamp matches the running ROOT, the
   //      cached binaries are copied to the working directory;
   //   2. the selector is created via TSelector::GetSelector();
   //   3. new/updated binaries and, if needed, the sources are copied back
   //      to the cache and symlinks are created for the cached files.
   //
   // Returns 0 on success, -1 on failure.

   fSelectorClass = 0;
   SafeDelete(fSelector);
   if (!selfile || strlen(selfile) <= 0) {
      Error("MakeSelector", "input file path or name undefined");
      return -1;
   }

   // No extension: nothing to check on disk, hand the name over as it is
   if (!strchr(gSystem->BaseName(selfile), '.')) {
      if (gDebug > 1)
         Info("MakeSelector", "selector name '%s' does not contain a '.':"
              " no file to check, it will be loaded from a library", selfile);
      if (!(fSelector = TSelector::GetSelector(selfile))) {
         Error("MakeSelector", "could not create a %s selector", selfile);
         return -1;
      }
      return 0;
   }

   // Cache location and the lock protecting it
   TString cacheDir = ((TProofLite *)fProof)->fCacheDir;
   gSystem->ExpandPathName(cacheDir);
   TProofLockPath *cacheLock = ((TProofLite *)fProof)->fCacheLock;

   // Strip the ACLiC mode (and arguments / redirections) from the file name
   TString name = selfile;
   TString acmode, args, io;
   name = gSystem->SplitAclicMode(name, acmode, args, io);

   PDB(kGlobal,1)
      Info("MakeSelector", "enter: names: %s, %s", selfile, name.Data());

   // The implementation file must be readable
   if (gSystem->AccessPathName(name, kReadPermission)) {
      Error("MakeSelector", "implementation file %s not found or not readable", name.Data());
      return -1;
   }

   // Look for the header next to the implementation file, trying each known
   // extension in turn; the empty string terminates the list. The index must
   // advance on every pass, otherwise a missing '.h' would loop forever.
   Int_t dot = name.Last('.');
   const char *hext[] = { ".h", ".hh", "" };
   TString hname, checkedext;
   for (Int_t i = 0; strlen(hext[i]) > 0; i++) {
      hname = name(0, dot);
      hname += hext[i];
      if (!gSystem->AccessPathName(hname, kReadPermission))
         break;
      if (!checkedext.IsNull()) checkedext += ",";
      checkedext += hext[i];
      hname = "";
   }
   if (hname.IsNull()) {
      Error("MakeSelector", "header file for %s not found or not readable "
            "(checked extensions: %s)", name.Data(), checkedext.Data());
      return -1;
   }

   // From here until Unlock() we read from the cache
   cacheLock->Lock();

   // The cached binaries are usable only if both cached sources are
   // identical (same MD5) to the local ones
   Bool_t useCacheBinaries = kFALSE;
   TString cachedname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(name));
   TString cachedhname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(hname));
   if (!gSystem->AccessPathName(cachedname, kReadPermission) &&
       !gSystem->AccessPathName(cachedhname, kReadPermission)) {
      TMD5 *md5 = TMD5::FileChecksum(name);
      TMD5 *md5cache = TMD5::FileChecksum(cachedname);
      TMD5 *md5h = TMD5::FileChecksum(hname);
      TMD5 *md5hcache = TMD5::FileChecksum(cachedhname);
      if (md5 && md5cache && md5h && md5hcache &&
          (*md5 == *md5cache) && (*md5h == *md5hcache))
         useCacheBinaries = kTRUE;
      // The checksum objects are heap allocated for us: release them
      SafeDelete(md5);
      SafeDelete(md5cache);
      SafeDelete(md5h);
      SafeDelete(md5hcache);
   }

   // Name of the file recording the ROOT version used to build the binaries:
   // ".<name without extension>.binversion"
   TString vername(Form(".%s", name.Data()));
   dot = vername.Last('.');
   if (dot != kNPOS)
      vername.Remove(dot);
   vername += ".binversion";
   Bool_t savever = kFALSE;

   // Binaries built with a different ROOT version / revision cannot be reused
   if (useCacheBinaries) {
      TString v;
      Int_t rev = -1;
      FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "r");
      if (f) {
         TString r;
         v.Gets(f);
         r.Gets(f);
         rev = (!r.IsNull() && r.IsDigit()) ? r.Atoi() : -1;
         fclose(f);
      }
      if (!f || v != gROOT->GetVersion() ||
          (gROOT->GetSvnRevision() > 0 && rev != gROOT->GetSvnRevision()))
         useCacheBinaries = kFALSE;
   }

   // Prefix shared by all the binaries of this selector: "<base>_<ext>."
   TString binname = gSystem->BaseName(name);
   dot = binname.Last('.');
   if (dot != kNPOS)
      binname.Replace(dot,1,"_");
   binname += ".";

   FileStat_t stlocal, stcache;
   void *dirp = 0;
   if (useCacheBinaries) {
      // Bring over the cached binaries that are missing locally or newer
      // than the local copy
      dirp = gSystem->OpenDirectory(cacheDir);
      if (dirp) {
         const char *e = 0;
         while ((e = gSystem->GetDirEntry(dirp))) {
            if (!strncmp(e, binname.Data(), binname.Length())) {
               TString fncache = Form("%s/%s", cacheDir.Data(), e);
               Bool_t docp = kTRUE;
               if (!gSystem->GetPathInfo(fncache, stcache)) {
                  Int_t rc = gSystem->GetPathInfo(e, stlocal);
                  // Local copy exists and is not older: keep it
                  if (rc == 0 && (stlocal.fMtime >= stcache.fMtime))
                     docp = kFALSE;
                  if (docp) {
                     gSystem->Exec(Form("%s %s", kRM, e));
                     PDB(kGlobal,2)
                        Info("MakeSelector",
                             "retrieving %s from cache", fncache.Data());
                     gSystem->Exec(Form("%s %s %s", kCP, fncache.Data(), e));
                  }
               }
            }
         }
         gSystem->FreeDirectory(dirp);
      }
   }
   cacheLock->Unlock();

   // Create the selector (done without holding the cache lock)
   if (!(fSelector = TSelector::GetSelector(selfile))) {
      Error("MakeSelector", "could not create a selector from %s", selfile);
      return -1;
   }

   // List of cached files for which symlinks will be created
   TList *cachedFiles = new TList;

   // From here until Unlock() we write to the cache
   cacheLock->Lock();

   // Save to the cache the local binaries that are missing there or newer
   // than the cached copy
   dirp = gSystem->OpenDirectory(".");
   if (dirp) {
      const char *e = 0;
      while ((e = gSystem->GetDirEntry(dirp))) {
         if (!strncmp(e, binname.Data(), binname.Length())) {
            Bool_t docp = kTRUE;
            if (!gSystem->GetPathInfo(e, stlocal)) {
               TString fncache = Form("%s/%s", cacheDir.Data(), e);
               Int_t rc = gSystem->GetPathInfo(fncache, stcache);
               // Cached copy exists and is not older: keep it
               if (rc == 0 && (stlocal.fMtime <= stcache.fMtime))
                  docp = kFALSE;
               if (docp) {
                  gSystem->Exec(Form("%s %s", kRM, fncache.Data()));
                  PDB(kGlobal,2)
                     Info("MakeSelector","caching %s ...", e);
                  gSystem->Exec(Form("%s %s %s", kCP, e, fncache.Data()));
                  // New binaries in the cache: the version stamp must follow
                  savever = kTRUE;
               }
               cachedFiles->Add(new TObjString(fncache.Data()));
            }
         }
      }
      gSystem->FreeDirectory(dirp);
   }

   // Record the ROOT version and revision the cached binaries belong to
   if (savever) {
      FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "w");
      if (f) {
         fputs(gROOT->GetVersion(), f);
         fputs(Form("\n%d",gROOT->GetSvnRevision()), f);
         fclose(f);
      }
   }

   // The cached sources were missing, different or stale: refresh them
   if (!useCacheBinaries) {
      gSystem->Exec(Form("%s %s", kRM, cachedname.Data()));
      PDB(kGlobal,2)
         Info("MakeSelector","caching %s ...", name.Data());
      gSystem->Exec(Form("%s %s %s", kCP, name.Data(), cachedname.Data()));
      gSystem->Exec(Form("%s %s", kRM, cachedhname.Data()));
      PDB(kGlobal,2)
         Info("MakeSelector","caching %s ...", hname.Data());
      gSystem->Exec(Form("%s %s %s", kCP, hname.Data(), cachedhname.Data()));
   }
   cachedFiles->Add(new TObjString(cachedname.Data()));
   cachedFiles->Add(new TObjString(cachedhname.Data()));

   cacheLock->Unlock();

   // Create the symlinks for the cached files
   ((TProofLite *)fProof)->CreateSymLinks(cachedFiles);

   // The list owns the TObjString's created above
   cachedFiles->SetOwner();
   delete cachedFiles;

   return 0;
}
Long64_t TProofPlayerLite::Process(TDSet *dset, const char *selector_file,
                                   Option_t *option, Long64_t nentries,
                                   Long64_t first)
{
   // Process the data set 'dset' with the selector 'selector_file'.
   //
   // The selector is created via MakeSelector(), Begin(0) is called on it,
   // the packetizer is initialized and a kPROOF_PROCESS message is broadcast.
   // 'option' determines, through TProof::GetQueryMode(), whether the query
   // is run synchronously or asynchronously.
   //
   // Returns -1 on error or if the query was aborted. For asynchronous
   // queries the sequential number fProof->fSeqNum is returned right after
   // the broadcast; for synchronous queries the result of Finalize().

   PDB(kGlobal,1) Info("Process","Enter");
   fDSet = dset;
   fExitStatus = kFinished;

   if (!fProgressStatus) {
      Error("Process", "No progress status");
      return -1;
   }
   fProgressStatus->Reset();

   // Start from an empty output list
   if (!fOutput)
      fOutput = new TList;
   else
      fOutput->Clear();

   TPerfStats::Setup(fInput);
   TPerfStats::Start(fInput, fOutput);

   TMessage mesg(kPROOF_PROCESS);
   TString fn(gSystem->BaseName(selector_file));

   Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);

   // Drop the output lists of a previous query
   if (fOutputLists) {
      fOutputLists->Delete();
      delete fOutputLists;
      fOutputLists = 0;
   }

   // In asynchronous mode the setup messages go to the log file
   if (!sync) {
      gSystem->RedirectOutput(fProof->fLogFileName);
      Printf(" ");
      Info("Process","starting new query");
   }

   if (MakeSelector(selector_file) != 0) {
      if (!sync)
         gSystem->RedirectOutput(0);
      return -1;
   }

   fSelectorClass = fSelector->IsA();
   fSelector->SetInputList(fInput);
   fSelector->SetOption(option);

   PDB(kLoop,1) Info("Process","Call Begin(0)");
   fSelector->Begin(0);

   // The proxy carries the description of the data set in the message
   PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
   TDSet *set = new TDSetProxy(dset->GetType(), dset->GetObjName(),
                               dset->GetDirectory());
   if (dset->TestBit(TDSet::kEmpty))
      set->SetBit(TDSet::kEmpty);

   if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
      Error("Process", "cannot init the packetizer");
      fExitStatus = kAborted;
      // The proxy has not been handed to anybody yet: do not leak it
      delete set;
      // Undo the redirection, as done when MakeSelector() fails
      if (!sync)
         gSystem->RedirectOutput(0);
      return -1;
   }
   // 'first' has been passed to InitPacketizer(); 0 is what goes in the message
   first = 0;

   // Set a default memory logging frequency unless one was given
   if (!fProof->GetParameter("PROOF_MemLogFreq")){
      // Guard the division: no active workers must not crash the client
      Int_t nwrks = fProof->GetParallel();
      Long64_t memlogfreq = (nwrks > 0) ? fPacketizer->GetTotalEntries()/(nwrks*100) : 0;
      memlogfreq = (memlogfreq > 0) ? memlogfreq : 1;
      fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
   }

   if (!sync)
      gSystem->RedirectOutput(0);

   TCleanup clean(this);
   SetupFeedback();

   TString opt = option;

   // When running in parallel the range is not sent along with the message
   Long64_t num = (fProof->IsParallel()) ? -1 : nentries;
   Long64_t fst = (fProof->IsParallel()) ? -1 : first;

   // Entry- or event-list attached to the proxy, if any (not on a master)
   TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
                                           : (TEntryList *)0;
   TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
                                                   : (TEventList *)0;

   PDB(kGlobal,1) Info("Process","Calling Broadcast");
   mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
   fProof->Broadcast(mesg);

   fProof->fRedirLog = kTRUE;

   if (!sync) {
      // Asynchronous query: just activate the collection of the results
      PDB(kGlobal,1) Info("Process","Asynchronous processing:"
                                    " activating CollectInputFrom");
      fProof->Activate();

      return fProof->fSeqNum;
   }

   // Synchronous query: wait for the results, logging to file meanwhile
   PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
   gSystem->RedirectOutput(fProof->fLogFileName);
   fProof->Collect();
   gSystem->RedirectOutput(0);
   fProof->fRedirLog = kFALSE;

   // One last round of feedback
   HandleTimer(0);

   if (fPacketizer && fQuery)
      fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
                                    fPacketizer->GetInitTime(),
                                    fPacketizer->GetProcTime());
   StopFeedback();

   if (GetExitStatus() != TProofPlayer::kAborted)
      return Finalize(kFALSE, sync);

   return -1;
}
Long64_t TProofPlayerLite::Finalize(Bool_t force, Bool_t sync)
{
   // Finalize the current query.
   //
   // If there are no output lists and 'force' is set for a defined query,
   // the call is delegated to fProof->Finalize() for that query.
   // Otherwise, unless the query was aborted, the objects in fOutput are
   // moved to the selector output list, Terminate() is called and the
   // resulting list is attached to the query result. For asynchronous
   // queries ('sync' false) the selector is first re-created from the query.
   //
   // Returns the selector status after Terminate(), 0 if the query was
   // aborted, or -1 if the selector could not be re-initialized.

   if (fOutputLists == 0) {
      if (force && fQuery)
         return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
                                               fQuery->GetName()), force);
   }

   Long64_t rv = 0;

   TPerfStats::Stop();

   MergeOutputFiles();

   if (fExitStatus != kAborted) {

      if (!sync) {
         // Asynchronous query: the selector has to be created again
         if (ReinitSelector(fQuery) == -1) {
            // fQuery is treated as possibly undefined elsewhere in this
            // method: do not dereference it blindly to build the message
            Info("Finalize", "problems reinitializing selector \"%s\"",
                 (fQuery && fQuery->GetSelecImp()) ? fQuery->GetSelecImp()->GetName()
                                                   : "<undefined>");
            return -1;
         }
      }

      fSelector->SetInputList(fInput);

      // Move the merged objects to the selector output list; when not
      // running in parallel only those for which DrawCanvas() returns 1
      TIter next(fOutput);
      TList *output = fSelector->GetOutputList();
      while(TObject* obj = next()) {
         if (fProof->IsParallel() || DrawCanvas(obj) == 1)
            output->Add(obj);
      }

      PDB(kLoop,1) Info("Finalize","Call Terminate()");
      // The objects now sit in the selector list: empty fOutput without
      // deleting them
      fOutput->Clear("nodelete");
      fSelector->Terminate();

      rv = fSelector->GetStatus();

      // Take back whatever is in the selector output list after Terminate()
      TIter it(output);
      while(TObject* o = it()) {
         fOutput->Add(o);
      }

      // Attach the output to the query result
      if (fQuery) {
         fQuery->SetOutputList(fOutput);
         fQuery->SetFinalized();
      } else {
         Warning("Finalize","current TQueryResult object is undefined!");
      }

      // Neither list may delete the objects when it goes away
      output->SetOwner(kFALSE);
      SafeDelete(fSelector);

      fOutput->SetOwner(kFALSE);
      SafeDelete(fOutput);
   } else {
      // Aborted: let fOutput own its content and drop the selector
      fOutput->SetOwner();
      SafeDelete(fSelector);
   }
   PDB(kGlobal,1) Info("Finalize","exit");
   return rv;
}
Bool_t TProofPlayerLite::HandleTimer(TTimer *)
{
   // Feedback timer callback: collect clones of the requested feedback
   // objects found in fOutput, store them, and, if feedback lists exist,
   // merge them and pass the result to Feedback(). The timer is re-armed
   // before returning. Always returns kFALSE.

   PDB(kFeedback,2)
      Info("HandleTimer","Entry: %p", fFeedbackTimer);

   // Nothing to do when the timer is not there
   if (!fFeedbackTimer)
      return kFALSE;

   // Clone every requested object currently available in the output list
   TList *snapshot = new TList;
   snapshot->SetOwner();

   TIter nextName(fFeedback);
   for (TObjString *req = (TObjString*) nextName(); req;
        req = (TObjString*) nextName()) {
      TObject *found = fOutput->FindObject(req->GetName());
      if (found)
         snapshot->Add(found->Clone());
   }

   // StoreFeedback() deletes the list it is given; an empty one is
   // dropped right here
   if (snapshot->GetSize() <= 0)
      delete snapshot;
   else
      StoreFeedback(this, snapshot);

   // Merge and publish only when something has been stored so far
   if (fFeedbackLists != 0) {
      TList *merged = MergeFeedback();
      Feedback(merged);
      merged->SetOwner();
      delete merged;
   }

   // Re-arm for the next round
   fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
   return kFALSE;
}
void TProofPlayerLite::SetupFeedback()
{
   // Look up "FeedbackList" in the input list and, if it is there and not
   // empty, (re)create and start the feedback timer. The period defaults to
   // 2000 and can be overridden by the "PROOF_FeedbackPeriod" parameter in
   // the input list.

   fFeedback = (TList*) fInput->FindObject("FeedbackList");

   if (fFeedback == 0) {
      PDB(kFeedback,1)
         Info("SetupFeedback","\"FeedbackList\" NOT found");
      // No list, no feedback
      return;
   }

   PDB(kFeedback,1)
      Info("SetupFeedback","\"FeedbackList\" found: %d objects", fFeedback->GetSize());

   // An empty list means no feedback either
   if (fFeedback->GetSize() == 0)
      return;

   // Replace any existing timer with a fresh one
   SafeDelete(fFeedbackTimer);

   fFeedbackPeriod = 2000;
   TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);

   fFeedbackTimer = new TTimer;
   fFeedbackTimer->SetObject(this);
   fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
}
void TProofPlayerLite::StoreFeedback(TObject *slave, TList *out)
{
   // Store the feedback objects in 'out' on behalf of 'slave'.
   //
   // fFeedbackLists holds one TMap per object name; in each map the key is
   // the sender ('slave') and the value its latest object with that name.
   // A previous value from the same sender is deleted and replaced.
   // The list 'out' is deleted here, the objects in it are kept in the maps.
   // Nothing is done if 'out' is null.

   PDB(kFeedback,1)
      Info("StoreFeedback","Enter (%p,%p,%d)", fFeedbackLists, out, (out ? out->GetSize() : -1));

   if ( out == 0 ) {
      PDB(kFeedback,1)
         Info("StoreFeedback","Leave (empty)");
      return;
   }

   // Create the container on first use
   if (fFeedbackLists == 0) {
      PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");

      fFeedbackLists = new TList;
      fFeedbackLists->SetOwner();
   }

   TIter next(out);
   // The objects are handed over to the maps: 'out' must not delete them
   out->SetOwner(kFALSE);

   TObject *obj;
   while( (obj = next()) ) {
      PDB(kFeedback,2)
         Info("StoreFeedback","Find '%s'", obj->GetName() );

      TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
      if ( map == 0 ) {
         // The format has no conversion specifier, so no argument is passed
         // (the object name is reported by the "Find" message just above)
         PDB(kFeedback,2)
            Info("StoreFeedback","Map not Found (creating)");
         map = new TMap;
         map->SetName(obj->GetName());
         fFeedbackLists->Add(map);
      } else {
         PDB(kFeedback,2)
            Info("StoreFeedback","removing previous value");
         // Single lookup; deleting a null pointer is a no-op
         TObject *old = map->GetValue(slave);
         delete old;
         map->Remove(slave);
      }
      map->Add(slave, obj);
   }

   delete out;
   PDB(kFeedback,1)
      Info("StoreFeedback","Leave");
}
// Last change: Thu Oct 23 08:20:21 2008
// Last generated: 2008-10-23 08:20
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.