#include "TVirtualPacketizer.h"
#include "TEnv.h"
#include "TFile.h"
#include "TTree.h"
#include "TKey.h"
#include "TDSet.h"
#include "TError.h"
#include "TEventList.h"
#include "TEntryList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TObjString.h"
#include "TProof.h"
#include "TProofDebug.h"
#include "TProofPlayer.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TMath.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TPerfStats.h"
ClassImp(TVirtualPacketizer)
TVirtualPacketizer::TVirtualPacketizer(TList *input, TProofProgressStatus *st)
{
fProgressStatus = st;
if (!fProgressStatus) {
Error("TVirtualPacketizer", "No progress status");
return;
}
fTotalEntries = 0;
fValid = kTRUE;
fStop = kFALSE;
fFailedPackets = 0;
TTime tnow = gSystem->Now();
fStartTime = Long_t(tnow);
SetBit(TVirtualPacketizer::kIsInitializing);
ResetBit(TVirtualPacketizer::kIsDone);
fInitTime = 0;
fProcTime = 0;
fTimeUpdt = -1.;
fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb");
fCircN = 10;
TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
fCircProg->SetCircular(fCircN);
Long_t period = 500;
TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
fProgress = new TTimer;
fProgress->SetObject(this);
fProgress->Start(period, kFALSE);
TString estopt;
TProof::GetParameter(input, "PROOF_RateEstimation", estopt);
if (estopt.IsNull()) {
estopt = gEnv->GetValue("Proof.RateEstimation", "");
}
fUseEstOpt = kEstOff;
if (estopt == "current")
fUseEstOpt = kEstCurrent;
else if (estopt == "average")
fUseEstOpt = kEstAverage;
}
TVirtualPacketizer::~TVirtualPacketizer()
{
SafeDelete(fCircProg);
SafeDelete(fProgress);
SafeDelete(fFailedPackets);
fProgressStatus = 0;
}
Long64_t TVirtualPacketizer::GetEntries(Bool_t tree, TDSetElement *e)
{
Long64_t entries;
TFile *file = TFile::Open(e->GetFileName());
if ( file->IsZombie() ) {
Error("GetEntries","Cannot open file: %s (%s)",
e->GetFileName(), strerror(file->GetErrno()) );
return -1;
}
TDirectory *dirsave = gDirectory;
if ( ! file->cd(e->GetDirectory()) ) {
Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
delete file;
return -1;
}
TDirectory *dir = gDirectory;
dirsave->cd();
if ( tree ) {
TKey *key = dir->GetKey(e->GetObjName());
if ( key == 0 ) {
Error("GetEntries","Cannot find tree \"%s\" in %s",
e->GetObjName(), e->GetFileName() );
delete file;
return -1;
}
TTree *t = (TTree *) key->ReadObj();
if ( t == 0 ) {
delete file;
return -1;
}
entries = (Long64_t) t->GetEntries();
delete t;
} else {
TList *keys = dir->GetListOfKeys();
entries = keys->GetSize();
}
delete file;
return entries;
}
TDSetElement *TVirtualPacketizer::GetNextPacket(TSlave *, TMessage *)
{
AbstractMethod("GetNextPacket");
return 0;
}
void TVirtualPacketizer::StopProcess(Bool_t )
{
fStop = kTRUE;
}
TDSetElement* TVirtualPacketizer::CreateNewPacket(TDSetElement* base,
Long64_t first, Long64_t num)
{
TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
base->GetDirectory(), first, num);
TList *friends = base->GetListOfFriends();
if (friends) {
TIter nxf(friends);
TPair *p = 0;
while ((p = (TPair *) nxf())) {
TDSetElement *fe = (TDSetElement *) p->Key();
elem->AddFriend(new TDSetElement(fe->GetFileName(), fe->GetObjName(),
fe->GetDirectory(), first, num),
((TObjString *)(p->Value()))->GetName());
}
}
return elem;
}
Bool_t TVirtualPacketizer::HandleTimer(TTimer *)
{
if (fProgress == 0 || TestBit(TVirtualPacketizer::kIsDone))
return kFALSE;
TTime tnow = gSystem->Now();
Float_t now = (Float_t) (Long_t(tnow) - fStartTime) / (Double_t)1000.;
Long64_t estent = GetEntriesProcessed();
Long64_t estmb = GetBytesRead();
Float_t evtrti = -1., mbrti = -1.;
if (TestBit(TVirtualPacketizer::kIsInitializing)) {
fInitTime = now;
} else {
if (fCircProg->GetEntries() <= 0) {
fCircProg->Fill((Double_t)0., 0., 0.);
fInitTime = (now + fInitTime) / 2.;
}
fTimeUpdt = now - fProcTime;
fProcTime = now - fInitTime;
GetEstEntriesProcessed(fProcTime, estent, estmb);
Double_t evts = (Double_t) estent;
Double_t mbs = (estmb > 0) ? estmb / TMath::Power(2.,20.) : 0.;
fCircProg->Fill((Double_t)fProcTime, evts, mbs);
if (fCircProg->GetEntries() > 4) {
Double_t *ar = fCircProg->GetArgs();
fCircProg->GetEntry(0);
Double_t dt = (Double_t)fProcTime - ar[0];
evtrti = (dt > 0) ? (Float_t) (evts - ar[1]) / dt : -1. ;
mbrti = (dt > 0) ? (Float_t) (mbs - ar[2]) / dt : -1. ;
if (gPerfStats != 0)
gPerfStats->RateEvent((Double_t)fProcTime, dt,
(Long64_t) (evts - ar[1]),
(Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)));
}
if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
SetBit(TVirtualPacketizer::kIsDone);
}
if (gProofServ) {
TMessage m(kPROOF_PROGRESS);
if (gProofServ->GetProtocol() > 11) {
m << fTotalEntries << estent << estmb << fInitTime << fProcTime
<< evtrti << mbrti;
} else {
m << fTotalEntries << GetEntriesProcessed();
}
gProofServ->GetSocket()->Send(m);
} else {
if (gProof && gProof->GetPlayer()) {
gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
fInitTime, fProcTime, evtrti, mbrti);
}
}
if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
SetBit(TVirtualPacketizer::kIsDone);
return kFALSE;
}
void TVirtualPacketizer::SetInitTime()
{
if (TestBit(TVirtualPacketizer::kIsInitializing)) {
fInitTime = (Float_t) (Long_t(gSystem->Now()) - fStartTime) / (Double_t)1000.;
ResetBit(TVirtualPacketizer::kIsInitializing);
}
PDB(kPacketizer,2)
Info("SetInitTime","fInitTime: %f s", fInitTime);
}
Last change: Thu Oct 23 08:20:33 2008
Last generated: 2008-10-23 08:20
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.