#include "TPacketizerAdaptive.h"
#include "Riostream.h"
#include "TDSet.h"
#include "TError.h"
#include "TEnv.h"
#include "TEntryList.h"
#include "TEventList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TObject.h"
#include "TParameter.h"
#include "TPerfStats.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TClass.h"
#include "TRandom.h"
#include "TMath.h"
#include "TObjString.h"
#include "TList.h"
class TPacketizerAdaptive::TFileStat : public TObject {
private:
Bool_t fIsDone;
TFileNode *fNode;
TDSetElement *fElement;
Long64_t fNextEntry;
public:
TFileStat(TFileNode *node, TDSetElement *elem);
Bool_t IsDone() const {return fIsDone;}
void SetDone() {fIsDone = kTRUE;}
TFileNode *GetNode() const {return fNode;}
TDSetElement *GetElement() const {return fElement;}
Long64_t GetNextEntry() const {return fNextEntry;}
void MoveNextEntry(Long64_t step) {fNextEntry += step;}
};
TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
: fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
{
}
class TPacketizerAdaptive::TFileNode : public TObject {
private:
TString fNodeName;
TList *fFiles;
TObject *fUnAllocFileNext;
TList *fActFiles;
TObject *fActFileNext;
Int_t fMySlaveCnt;
Int_t fExtSlaveCnt;
Int_t fRunSlaveCnt;
Long64_t fProcessed;
Long64_t fEvents;
public:
TFileNode(const char *name);
~TFileNode() { delete fFiles; delete fActFiles; }
void IncMySlaveCnt() { fMySlaveCnt++; }
Int_t GetMySlaveCnt() const { return fMySlaveCnt; }
void IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; }
void DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
Int_t GetSlaveCnt() const { return fMySlaveCnt + fExtSlaveCnt; }
void IncRunSlaveCnt() { fRunSlaveCnt++; }
void DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
Int_t GetRunSlaveCnt() const { return fRunSlaveCnt; }
Int_t GetExtSlaveCnt() const { return fExtSlaveCnt; }
Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
Bool_t IsSortable() const { return kTRUE; }
Int_t GetNumberOfFiles() { return fFiles->GetSize(); }
void IncProcessed(Long64_t nEvents)
{ fProcessed += nEvents; }
Long64_t GetProcessed() const { return fProcessed; }
void DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; }
Long64_t GetEventsLeftPerSlave() const
{ return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
void IncEvents(Long64_t nEvents) { fEvents += nEvents; }
const char *GetName() const { return fNodeName.Data(); }
Long64_t GetNEvents() const { return fEvents; }
void Add(TDSetElement *elem)
{
TFileStat *f = new TFileStat(this, elem);
fFiles->Add(f);
if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
}
TFileStat *GetNextUnAlloc()
{
TObject *next = fUnAllocFileNext;
if (next != 0) {
fActFiles->Add(next);
if (fActFileNext == 0) fActFileNext = fActFiles->First();
fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
}
return (TFileStat *) next;
}
TFileStat *GetNextActive()
{
TObject *next = fActFileNext;
if (fActFileNext != 0) {
fActFileNext = fActFiles->After(fActFileNext);
if (fActFileNext == 0) fActFileNext = fActFiles->First();
}
return (TFileStat *) next;
}
void RemoveActive(TFileStat *file)
{
if (fActFileNext == file) fActFileNext = fActFiles->After(file);
fActFiles->Remove(file);
if (fActFileNext == 0) fActFileNext = fActFiles->First();
}
Int_t Compare(const TObject *other) const
{
const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
R__ASSERT(obj != 0);
if (TPacketizerAdaptive::fgStrategy == 1) {
Int_t myVal = GetRunSlaveCnt();
Int_t otherVal = obj->GetRunSlaveCnt();
if (myVal < otherVal) {
return -1;
} else if (myVal > otherVal) {
return 1;
} else {
if ((fEvents - fProcessed) >
(obj->GetNEvents() - obj->GetProcessed())) {
return -1;
} else {
return 1;
}
}
} else {
Int_t myVal = GetSlaveCnt();
Int_t otherVal = obj->GetSlaveCnt();
if (myVal < otherVal) {
return -1;
} else if (myVal > otherVal) {
return 1;
} else {
return 0;
}
}
}
void Print(Option_t *) const
{
cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
<< "\tMySlaveCount " << fMySlaveCnt
<< "\tSlaveCount " << fExtSlaveCnt << endl;
}
void Reset()
{
fUnAllocFileNext = fFiles->First();
fActFiles->Clear();
fActFileNext = 0;
fExtSlaveCnt = 0;
fMySlaveCnt = 0;
fRunSlaveCnt = 0;
}
};
TPacketizerAdaptive::TFileNode::TFileNode(const char *name)
: fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
fExtSlaveCnt(0), fProcessed(0), fEvents(0)
{
fFiles->SetOwner();
fActFiles->SetOwner(kFALSE);
}
class TPacketizerAdaptive::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
friend class TPacketizerAdaptive;
private:
TFileNode *fFileNode;
TFileStat *fCurFile;
TDSetElement *fCurElem;
Long64_t fCurProcessed;
Float_t fCurProcTime;
TList *fDSubSet;
public:
TSlaveStat(TSlave *slave);
~TSlaveStat();
TFileNode *GetFileNode() const { return fFileNode; }
const char *GetName() const { return fSlave->GetName(); }
Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
TFileStat *GetCurFile() { return fCurFile; }
void SetFileNode(TFileNode *node) { fFileNode = node; }
void UpdateRates(TProofProgressStatus *st);
Float_t GetAvgRate() { return fStatus->GetRate(); }
Float_t GetCurRate() {
return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
Int_t GetLocalEventsLeft() {
return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
TList *GetProcessedSubSet() { return fDSubSet; }
TProofProgressStatus *GetProgressStatus() { return fStatus; }
TProofProgressStatus *AddProcessed(TProofProgressStatus *st = 0);
};
TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave)
: fFileNode(0), fCurFile(0), fCurElem(0),
fCurProcessed(0), fCurProcTime(0)
{
fDSubSet = new TList();
fDSubSet->SetOwner();
fSlave = slave;
fStatus = new TProofProgressStatus();
}
TPacketizerAdaptive::TSlaveStat::~TSlaveStat()
{
SafeDelete(fDSubSet);
SafeDelete(fStatus);
}
void TPacketizerAdaptive::TSlaveStat::UpdateRates(TProofProgressStatus *st)
{
if (!st) {
Error("UpdateRates", "no status object!");
return;
}
if (fCurFile->IsDone()) {
fCurProcTime = 0;
fCurProcessed = 0;
} else {
fCurProcTime += st->GetProcTime() - GetProcTime();
fCurProcessed += st->GetEntries() - GetEntriesProcessed();
}
fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed());
SafeDelete(fStatus);
fStatus = st;
}
TProofProgressStatus *TPacketizerAdaptive::TSlaveStat::AddProcessed(TProofProgressStatus *st)
{
if (fDSubSet && fCurElem) {
if (st)
if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed())
fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed());
fDSubSet->Add(fCurElem);
TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
return diff;
} else {
Error("AddProcessed", "Processed subset of current elem undefined");
return 0;
}
}
ClassImp(TPacketizerAdaptive)
Long_t TPacketizerAdaptive::fgMaxSlaveCnt = 2;
Int_t TPacketizerAdaptive::fgPacketAsAFraction = 4;
Double_t TPacketizerAdaptive::fgMinPacketTime = 3;
Int_t TPacketizerAdaptive::fgStrategy = 1;
TPacketizerAdaptive::TPacketizerAdaptive(TDSet *dset, TList *slaves,
Long64_t first, Long64_t num,
TList *input, TProofProgressStatus *st)
: TVirtualPacketizer(input, st)
{
PDB(kPacketizer,1) Info("TPacketizerAdaptive",
"enter (first %lld, num %lld)", first, num);
fSlaveStats = 0;
fUnAllocated = 0;
fActive = 0;
fFileNodes = 0;
fMaxPerfIdx = 1;
if (!fProgressStatus) {
Error("TPacketizerAdaptive", "No progress status");
return;
}
Int_t strategy = -1;
if (TProof::GetParameter(input, "PROOF_PacketizerStrategy", strategy) != 0) {
strategy = gEnv->GetValue("Packetizer.Strategy", 1);
}
if (strategy == 0) {
fgStrategy = 0;
fgMaxSlaveCnt = 4;
Info("TPacketizerAdaptive", "using the basic strategy of TPacketizer");
} else if (strategy != 1) {
Warning("TPacketizerAdaptive", "unsupported strategy index (%d): ignore", strategy);
}
Long_t maxSlaveCnt = 0;
if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
if (maxSlaveCnt < 1) {
Info("TPacketizerAdaptive",
"The value of PROOF_MaxSlavesPerNode must be grater than 0");
maxSlaveCnt = 0;
}
} else {
Int_t mxslcnt = -1;
if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
if (mxslcnt < 1) {
Info("TPacketizerAdaptive",
"The value of PROOF_MaxSlavesPerNode must be grater than 0");
mxslcnt = 0;
}
maxSlaveCnt = (Long_t) mxslcnt;
}
}
if (!maxSlaveCnt)
maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 0);
if (maxSlaveCnt > 0) {
fgMaxSlaveCnt = maxSlaveCnt;
Info("TPacketizerAdaptive", "Setting max number of workers per node to %ld",
fgMaxSlaveCnt);
} else {
SysInfo_t si;
gSystem->GetSysInfo(&si);
if (si.fCpus > 2)
fgMaxSlaveCnt = si.fCpus;
}
fForceLocal = kFALSE;
Int_t forceLocal = 0;
if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) {
if (forceLocal == 1)
fForceLocal = kTRUE;
else
Info("TPacketizerAdaptive",
"The only accepted value of PROOF_ForceLocal parameter is 1 !");
}
Int_t packetAsAFraction = 0;
if (TProof::GetParameter(input, "PROOF_PacketAsAFraction",
packetAsAFraction) == 0) {
if (packetAsAFraction > 0) {
fgPacketAsAFraction = packetAsAFraction;
Info("TPacketizerAdaptive",
"using alternate fraction of query time as a packet size: %ld",
packetAsAFraction);
} else
Info("TPacketizerAdaptive", "packetAsAFraction parameter must be higher than 0");
}
Double_t minPacketTime = 0;
if (TProof::GetParameter(input, "PROOF_MinPacketTime",
minPacketTime) == 0) {
Info("TPacketizerAdaptive", "using alternate minimum time of a packet: %f",
minPacketTime);
fgMinPacketTime = (Int_t) minPacketTime;
}
Double_t baseLocalPreference = 1.2;
TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference);
fBaseLocalPreference = (Float_t)baseLocalPreference;
fFileNodes = new TList;
fFileNodes->SetOwner();
fUnAllocated = new TList;
fUnAllocated->SetOwner(kFALSE);
fActive = new TList;
fActive->SetOwner(kFALSE);
fValid = kTRUE;
dset->Reset();
TDSetElement *e;
while ((e = (TDSetElement*)dset->Next())) {
if (e->GetValid()) continue;
TUrl url = e->GetFileName();
TString host;
if ( !url.IsValid() ||
(strncmp(url.GetProtocol(),"root", 4) &&
strncmp(url.GetProtocol(),"rfio", 4)) ) {
host = "no-host";
} else {
host = url.GetHost();
}
TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
if (node == 0) {
node = new TFileNode(host);
fFileNodes->Add(node);
}
node->Add( e );
}
fSlaveStats = new TMap;
fSlaveStats->SetOwner(kFALSE);
TSlave *slave;
TIter si(slaves);
while ((slave = (TSlave*) si.Next())) {
fSlaveStats->Add( slave, new TSlaveStat(slave) );
fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
slave->GetPerfIdx() : fMaxPerfIdx;
}
Reset();
ValidateFiles(dset, slaves);
if (!fValid) return;
Int_t files = 0;
fTotalEntries = 0;
fUnAllocated->Clear();
fActive->Clear();
fFileNodes->Clear();
PDB(kPacketizer,2)
Info("TPacketizerAdaptive",
"processing Range: First %lld, Num %lld", first, num);
dset->Reset();
Long64_t cur = 0;
while (( e = (TDSetElement*)dset->Next())) {
if (!e->GetValid()) continue;
TUrl url = e->GetFileName();
Long64_t eFirst = e->GetFirst();
Long64_t eNum = e->GetNum();
PDB(kPacketizer,2)
Info("TPacketizerAdaptive",
"processing element: First %lld, Num %lld (cur %lld)", eFirst, eNum, cur);
if (!e->GetEntryList()) {
if (cur + eNum < first) {
cur += eNum;
PDB(kPacketizer,2)
Info("TPacketizerAdaptive",
"processing element: skip element cur %lld", cur);
continue;
}
if (num != -1 && (first+num <= cur)) {
cur += eNum;
PDB(kPacketizer,2)
Info("TPacketizerAdaptive",
"processing element: drop element cur %lld", cur);
continue;
}
if (num != -1 && (first+num < cur+eNum)) {
e->SetNum( first + num - cur );
PDB(kPacketizer,2)
Info("TPacketizerAdaptive",
"processing element: Adjust end %lld", first + num - cur);
}
if (cur < first) {
e->SetFirst( eFirst + (first - cur) );
e->SetNum( e->GetNum() - (first - cur) );
PDB(kPacketizer,2)
Info("TPacketizerAdaptive",
"processing element: Adjust start %lld and end %lld",
eFirst + (first - cur), first + num - cur);
}
cur += eNum;
} else {
Long64_t n = 0;
TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
if (enl) {
n = enl->GetN();
} else {
TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
n = evl ? evl->GetN() : n;
}
if (!n)
continue;
}
PDB(kPacketizer,2)
Info("TPacketizerAdaptive",
"processing element: next cur %lld", cur);
TString host;
if ( !url.IsValid() ||
(strncmp(url.GetProtocol(),"root", 4) &&
strncmp(url.GetProtocol(),"rfio", 4)) ) {
host = "no-host";
} else {
host = url.GetHost();
}
TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
if ( node == 0 ) {
node = new TFileNode( host );
fFileNodes->Add( node );
}
++files;
fTotalEntries += e->GetNum();
node->Add( e );
node->IncEvents(e->GetNum());
PDB(kPacketizer,2) e->Print("a");
}
TEntryList *enl = dynamic_cast<TEntryList *>(dset->GetEntryList());
if (enl) {
fTotalEntries = enl->GetN();
} else {
TEventList *evl = dynamic_cast<TEventList *>(dset->GetEntryList());
fTotalEntries = evl ? evl->GetN() : fTotalEntries;
}
PDB(kGlobal,1)
Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
fTotalEntries, files, fFileNodes->GetSize());
Reset();
InitStats();
if (!fValid)
SafeDelete(fProgress);
PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
}
TPacketizerAdaptive::~TPacketizerAdaptive()
{
if (fSlaveStats) {
fSlaveStats->DeleteValues();
}
SafeDelete(fSlaveStats);
SafeDelete(fUnAllocated);
SafeDelete(fActive);
SafeDelete(fFileNodes);
}
void TPacketizerAdaptive::InitStats()
{
Int_t noRemoteFiles = 0;
fNEventsOnRemLoc = 0;
Int_t totalNumberOfFiles = 0;
TIter next(fFileNodes);
while (TFileNode *fn = (TFileNode*)next()) {
totalNumberOfFiles += fn->GetNumberOfFiles();
if (fn->GetMySlaveCnt() == 0) {
noRemoteFiles += fn->GetNumberOfFiles();
fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
}
}
if (totalNumberOfFiles == 0) {
Info("TPacketizerAdaptive", "no valid or non-empty file found: setting invalid");
fValid = kFALSE;
return;
}
fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
Info("TPacketizerAdaptive",
"fraction of remote files %f", fFractionOfRemoteFiles);
if (!fValid)
SafeDelete(fProgress);
PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
}
TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node)
{
TFileStat *file = 0;
if (node != 0) {
file = node->GetNextUnAlloc();
if (file == 0) RemoveUnAllocNode(node);
} else {
while (file == 0 && ((node = NextNode()) != 0)) {
file = node->GetNextUnAlloc();
if (file == 0) RemoveUnAllocNode(node);
}
}
if (file != 0) {
if (fActive->FindObject(node) == 0) {
fActive->Add(node);
}
}
return file;
}
TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
{
fUnAllocated->Sort();
PDB(kPacketizer,2) {
fUnAllocated->Print();
}
TFileNode *fn = (TFileNode*) fUnAllocated->First();
if (fn != 0 && fn->GetExtSlaveCnt() >= fgMaxSlaveCnt) {
PDB(kPacketizer,1) Info("NextNode",
"Reached Slaves per Node Limit (%ld)", fgMaxSlaveCnt);
fn = 0;
}
return fn;
}
void TPacketizerAdaptive::RemoveUnAllocNode(TFileNode * node)
{
fUnAllocated->Remove(node);
}
TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
{
TFileNode *node;
TFileStat *file = 0;
while (file == 0 && ((node = NextActiveNode()) != 0)) {
file = node->GetNextActive();
if (file == 0) RemoveActiveNode(node);
}
return file;
}
TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
{
fActive->Sort();
PDB(kPacketizer,2) {
Info("NextActiveNode", "enter");
fActive->Print();
}
TFileNode *fn = (TFileNode*) fActive->First();
if (fn != 0 && fn->GetExtSlaveCnt() >= fgMaxSlaveCnt) {
PDB(kPacketizer,1)
Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fgMaxSlaveCnt);
fn = 0;
}
return fn;
}
void TPacketizerAdaptive::RemoveActive(TFileStat *file)
{
TFileNode *node = file->GetNode();
node->RemoveActive(file);
if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
}
void TPacketizerAdaptive::RemoveActiveNode(TFileNode *node)
{
fActive->Remove(node);
}
void TPacketizerAdaptive::Reset()
{
fUnAllocated->Clear();
fUnAllocated->AddAll(fFileNodes);
fActive->Clear();
TIter files(fFileNodes);
TFileNode *fn;
while ((fn = (TFileNode*) files.Next()) != 0) {
fn->Reset();
}
TIter slaves(fSlaveStats);
TObject *key;
while ((key = slaves.Next()) != 0) {
TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
if (fn != 0 ) {
slstat->SetFileNode(fn);
fn->IncMySlaveCnt();
}
slstat->fCurFile = 0;
}
}
void TPacketizerAdaptive::ValidateFiles(TDSet *dset, TList *slaves)
{
TMap slaves_by_sock;
TMonitor mon;
TList workers;
workers.AddAll(slaves);
TIter si(slaves);
TSlave *slm;
while ((slm = (TSlave*)si.Next()) != 0) {
PDB(kPacketizer,3) Info("ValidateFiles","socket added to monitor: %p (%s)",
slm->GetSocket(), slm->GetName());
mon.Add(slm->GetSocket());
slaves_by_sock.Add(slm->GetSocket(), slm);
}
mon.DeActivateAll();
((TProof*)gProof)->DeActivateAsyncInput();
((TProof*)gProof)->fCurrentMonitor = &mon;
TString msg("Validating files");
UInt_t n = 0;
UInt_t tot = dset->GetListOfElements()->GetSize();
Bool_t st = kTRUE;
while (kTRUE) {
while (TSlave *s = (TSlave *)workers.First()) {
workers.Remove(s);
TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
TFileNode *node = 0;
TFileStat *file = 0;
if ((node = slstat->GetFileNode()) != 0) {
file = GetNextUnAlloc(node);
if (file == 0)
slstat->SetFileNode(0);
}
if (file == 0)
file = GetNextUnAlloc();
if (file != 0) {
RemoveActive(file);
slstat->fCurFile = file;
TDSetElement *elem = file->GetElement();
Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
file->GetNode()->IncExtSlaveCnt(slstat->GetName());
TMessage m(kPROOF_GETENTRIES);
m << dset->IsTree()
<< TString(elem->GetFileName())
<< TString(elem->GetDirectory())
<< TString(elem->GetObjName());
s->GetSocket()->Send( m );
mon.Activate(s->GetSocket());
PDB(kPacketizer,2)
Info("ValidateFiles",
"sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
s->GetOrdinal(), s->GetName(), s->GetSocket(),
dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
elem->GetDirectory(), elem->GetObjName());
} else {
elem->SetTDSetOffset(entries);
if (entries > 0) {
if (!elem->GetEntryList()) {
if (elem->GetFirst() > entries) {
Error("ValidateFiles",
"first (%d) higher then number of entries (%d) in %d",
elem->GetFirst(), entries, elem->GetFileName() );
slstat->fCurFile->SetDone();
elem->Invalidate();
dset->SetBit(TDSet::kSomeInvalid);
}
if (elem->GetNum() == -1) {
elem->SetNum(entries - elem->GetFirst());
} else if (elem->GetFirst() + elem->GetNum() > entries) {
Warning("ValidateFiles", "Num (%lld) + First (%lld) larger then number of"
" keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
entries, elem->GetFileName());
elem->SetNum(entries - elem->GetFirst());
}
PDB(kPacketizer,2)
Info("ValidateFiles",
"found elem '%s' with %lld entries", elem->GetFileName(), entries);
elem->SetValid();
}
}
n++;
gProof->SendDataSetStatus(msg, n, tot, st);
workers.Add(s);
}
}
}
if (mon.GetActive() == 0) break;
PDB(kPacketizer,3) {
Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
TList *act = mon.GetListOfActives();
TIter next(act);
while (TSocket *s = (TSocket*) next()) {
TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
if (sl)
Info("ValidateFiles", " worker-%s (%s)",
sl->GetOrdinal(), sl->GetName());
}
delete act;
}
TSocket *sock = mon.Select();
if (!sock) {
Error("ValidateFiles", "selection has been interrupted - STOP");
mon.DeActivateAll();
fValid = kFALSE;
break;
}
mon.DeActivate(sock);
PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
if (!sock->IsValid()) {
Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
slave->GetOrdinal(), slave->GetName());
((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation");
fValid = kFALSE;
break;
}
TMessage *reply;
if (sock->Recv(reply) <= 0) {
((TProof*)gProof)->MarkBad(slave, "receive failed during validation");
fValid = kFALSE;
Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
slave->GetOrdinal(), slave->GetName());
continue;
}
if (reply->What() != kPROOF_GETENTRIES) {
Int_t what = reply->What();
((TProof*)gProof)->HandleInputMessage(slave, reply);
if (what == kPROOF_FATAL) {
Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
slave->GetOrdinal(), slave->GetName());
fValid = kFALSE;
} else {
mon.Activate(sock);
}
continue;
}
TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
TDSetElement *e = slavestat->fCurFile->GetElement();
slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
Long64_t entries;
(*reply) >> entries;
if ((reply->BufferSize() > reply->Length())) {
TString objname;
(*reply) >> objname;
e->SetTitle(objname);
}
e->SetTDSetOffset(entries);
if (entries > 0) {
if (!e->GetEntryList()) {
if (e->GetFirst() > entries) {
Error("ValidateFiles",
"first (%d) higher then number of entries (%d) in %d",
e->GetFirst(), entries, e->GetFileName() );
slavestat->fCurFile->SetDone();
e->Invalidate();
dset->SetBit(TDSet::kSomeInvalid);
}
if (e->GetNum() == -1) {
e->SetNum(entries - e->GetFirst());
} else if (e->GetFirst() + e->GetNum() > entries) {
Error("ValidateFiles",
"Num (%d) + First (%d) larger then number of keys/entries (%d) in %s",
e->GetNum(), e->GetFirst(), entries, e->GetFileName() );
e->SetNum(entries - e->GetFirst());
}
e->SetValid();
}
n++;
gProof->SendDataSetStatus(msg, n, tot, st);
} else {
Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );
if (gProofServ) {
TMessage m(kPROOF_MESSAGE);
m << TString(Form("Cannot get entries for file: %s - skipping",
e->GetFileName()));
gProofServ->GetSocket()->Send(m);
}
e->Invalidate();
dset->SetBit(TDSet::kSomeInvalid);
}
workers.Add(slave);
}
((TProof*)gProof)->ActivateAsyncInput();
((TProof*)gProof)->fCurrentMonitor = 0;
if (!fValid)
return;
Long64_t offset = 0;
Long64_t newOffset = 0;
TIter next(dset->GetListOfElements());
TDSetElement *el;
while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
if (el->GetValid()) {
newOffset = offset + el->GetTDSetOffset();
el->SetTDSetOffset(offset);
offset = newOffset;
}
}
}
Int_t TPacketizerAdaptive::CalculatePacketSize(TObject *slStatPtr)
{
Long64_t num;
if (fgStrategy == 0) {
Int_t nslaves = fSlaveStats->GetSize();
if (nslaves > 0) {
num = fTotalEntries / (fgPacketAsAFraction * nslaves);
} else {
num = 1;
}
} else {
TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
Float_t rate = slstat->GetCurRate();
if (!rate)
rate = slstat->GetAvgRate();
if (rate) {
Float_t avgProcRate = (GetEntriesProcessed()/(GetCumProcTime() / fSlaveStats->GetSize()));
Float_t packetTime;
packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fgPacketAsAFraction;
if (packetTime < fgMinPacketTime)
packetTime = fgMinPacketTime;
if (rate < 0.25 * slstat->GetAvgRate())
rate = (rate + slstat->GetAvgRate()) / 2;
num = (Long64_t)(rate * packetTime);
} else {
Int_t packetSize = (fTotalEntries - GetEntriesProcessed())
/ (6 * fgPacketAsAFraction * fSlaveStats->GetSize());
num = Long64_t(packetSize *
((Float_t)slstat->fSlave->GetPerfIdx() / fMaxPerfIdx));
}
}
if (num < 1) num = 1;
return num;
}
Int_t TPacketizerAdaptive::AddProcessed(TSlave *sl,
TProofProgressStatus *status,
Double_t latency,
TList **listOfMissingFiles)
{
TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
if (!slstat) {
Error("AddProcessed", "TSlaveStat instance for worker %s not found!",
(sl ? sl->GetName() : "**undef**"));
return -1;
}
if ( slstat->fCurElem != 0 ) {
Long64_t expectedNumEv = slstat->fCurElem->GetNum();
Long64_t numev;
if (status && status->GetEntries() > 0)
numev = status->GetEntries() - slstat->GetEntriesProcessed();
else
numev = 0;
TProofProgressStatus *progress;
if (numev > 0) {
progress = slstat->AddProcessed(status);
if (progress) {
(*fProgressStatus) += *progress;
slstat->UpdateRates(status);
}
} else
progress = new TProofProgressStatus();
PDB(kPacketizer,2)
Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency,
progress->GetProcTime(),
progress->GetCPUTime(),
progress->GetBytesRead());
if (gPerfStats != 0) {
gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
slstat->fCurElem->GetFileName(),
progress->GetEntries(),
latency,
progress->GetProcTime(),
progress->GetCPUTime(),
progress->GetBytesRead());
}
delete progress;
if (numev != expectedNumEv) {
TDSetElement *newPacket = new TDSetElement(*(slstat->fCurElem));
if (newPacket && numev < newPacket->GetNum()) {
Long64_t first = newPacket->GetFirst();
newPacket->SetFirst(first + numev);
if (listOfMissingFiles && *listOfMissingFiles)
ReassignPacket(newPacket, listOfMissingFiles);
else
Error("AddProcessed", "No list for missing files!");
} else
Error("AddProcessed", "Processed too much?");
}
slstat->fCurElem = 0;
return (expectedNumEv - numev);
} else {
return -1;
}
}
TDSetElement *TPacketizerAdaptive::GetNextPacket(TSlave *sl, TMessage *r)
{
if ( !fValid ) {
return 0;
}
TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
if (!slstat) {
Error("GetNextPacket", "TSlaveStat instance for worker %s not found!",
(sl ? sl->GetName() : "**undef**"));
return 0;
}
if ( slstat->fCurElem != 0 ) {
Double_t latency, proctime, proccpu;
TProofProgressStatus *status = 0;
if (sl->GetProtocol() > 18) {
(*r) >> latency;
(*r) >> status;
} else {
Long64_t bytesRead = -1;
Long64_t totalEntries = -1;
(*r) >> latency >> proctime >> proccpu;
if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
Long64_t totev = 0;
if (r->BufferSize() > r->Length()) (*r) >> totev;
status = new TProofProgressStatus(totev, bytesRead, proctime, proccpu);
}
if (AddProcessed(sl, status, latency))
Error("GetNextPacket", "The worker processed diff. no. entries");
if ( fProgressStatus->GetEntries() >= fTotalEntries ) {
if (fProgressStatus->GetEntries() > fTotalEntries)
Error("GetNextPacket", "Processed too many entries!");
HandleTimer(0);
delete fProgress; fProgress = 0;
}
}
if ( fStop ) {
HandleTimer(0);
return 0;
}
TFileStat *file = slstat->fCurFile;
if ( file != 0 && file->IsDone() ) {
file->GetNode()->DecExtSlaveCnt(slstat->GetName());
file->GetNode()->DecRunSlaveCnt();
if (gPerfStats != 0) {
gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
file->GetElement()->GetFileName(), kFALSE);
}
file = 0;
}
Long64_t avgEventsLeftPerSlave =
(fTotalEntries - fProgressStatus->GetEntries()) / fSlaveStats->GetSize();
if (fTotalEntries == fProgressStatus->GetEntries())
return 0;
if ( file == 0) {
Bool_t openLocal;
Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
(0.4 *(fTotalEntries - fProgressStatus->GetEntries())));
if ( slstat->GetFileNode() != 0 ) {
fUnAllocated->Sort();
TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
Bool_t nonLocalNodePossible;
if (fForceLocal)
nonLocalNodePossible = 0;
else
nonLocalNodePossible = firstNonLocalNode?
(firstNonLocalNode->GetExtSlaveCnt() < fgMaxSlaveCnt):0;
openLocal = !nonLocalNodePossible;
Float_t slaveRate = slstat->GetAvgRate();
if ( nonLocalNodePossible && fgStrategy == 1) {
if ( slstat->GetFileNode()->GetRunSlaveCnt() >
slstat->GetFileNode()->GetMySlaveCnt() - 1 )
openLocal = kTRUE;
else if ( slaveRate == 0 ) {
if ( slstat->GetLocalEventsLeft() * localPreference
> (avgEventsLeftPerSlave))
openLocal = kTRUE;
else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
< slstat->GetLocalEventsLeft() * localPreference )
openLocal = kTRUE;
else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
openLocal = kTRUE;
else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
openLocal = kTRUE;
} else {
Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
Float_t avgTime = avgEventsLeftPerSlave
/(fProgressStatus->GetEntries()/GetCumProcTime());
if (slaveTime * localPreference > avgTime)
openLocal = kTRUE;
else if ((firstNonLocalNode->GetEventsLeftPerSlave())
< slstat->GetLocalEventsLeft() * localPreference)
openLocal = kTRUE;
}
}
if (openLocal || fgStrategy == 0) {
file = slstat->GetFileNode()->GetNextUnAlloc();
if (!file)
file = slstat->GetFileNode()->GetNextActive();
if ( file == 0 ) {
slstat->SetFileNode(0);
}
}
}
if(file == 0 && !fForceLocal) {
file = GetNextUnAlloc();
}
if(file == 0 && !fForceLocal) {
file = GetNextActive();
}
if ( file == 0 ) return 0;
slstat->fCurFile = file;
if (file->GetNode()->GetMySlaveCnt() == 0 &&
file->GetElement()->GetFirst() == file->GetNextEntry()) {
fNEventsOnRemLoc -= file->GetElement()->GetNum();
if (fNEventsOnRemLoc < 0) {
Error("GetNextPacket",
"inconsistent value for fNEventsOnRemLoc (%d): stop delivering packets!",
fNEventsOnRemLoc);
return 0;
}
}
file->GetNode()->IncExtSlaveCnt(slstat->GetName());
file->GetNode()->IncRunSlaveCnt();
if (gPerfStats != 0) {
gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
file->GetNode()->GetName(),
file->GetElement()->GetFileName(), kTRUE);
}
}
Long64_t num = CalculatePacketSize(slstat);
TDSetElement *base = file->GetElement();
Long64_t first = file->GetNextEntry();
Long64_t last = base->GetFirst() + base->GetNum();
if ( first + num * 1.5 >= last ) {
num = last - first;
file->SetDone();
RemoveActive(file);
} else {
file->MoveNextEntry(num);
}
slstat->fCurElem = CreateNewPacket(base, first, num);
if (base->GetEntryList())
slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
return slstat->fCurElem;
}
Int_t TPacketizerAdaptive::GetEstEntriesProcessed(Float_t t, Long64_t &ent,
Long64_t &bytes)
{
ent = GetEntriesProcessed();
bytes = GetBytesRead();
if (fUseEstOpt == kEstOff)
return 0;
Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE;
Float_t trate = 0.;
if (fSlaveStats && fSlaveStats->GetSize() > 0) {
ent = 0;
TIter nxw(fSlaveStats);
TObject *key;
while ((key = nxw()) != 0) {
TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
if (slstat) {
Long64_t e = slstat->GetEntriesProcessed();
Float_t dt = (t > slstat->GetProcTime()) ? t - slstat->GetProcTime() : 0;
Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
: slstat->GetAvgRate();
trate += rate;
e += (Long64_t) (dt * rate);
ent += e;
PDB(kPacketizer,3)
Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
slstat->fSlave->GetOrdinal(),
slstat->GetEntriesProcessed(), rate, dt, e);
}
}
}
PDB(kPacketizer,2)
Info("GetEstEntriesProcessed",
"estimated entries: %lld, bytes read: %lld rate: %f", ent, bytes, trate);
ent = (ent > 0) ? ent : fProgressStatus->GetEntries();
ent = (ent <= fTotalEntries) ? ent : fTotalEntries;
bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead();
return 0;
}
void TPacketizerAdaptive::MarkBad(TSlave *s, TProofProgressStatus *status,
TList **listOfMissingFiles)
{
TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s));
if (!slaveStat) {
Error("MarkBad", "Worker does not exist");
return;
}
if (!status) {
TList *subSet = slaveStat->GetProcessedSubSet();
if (slaveStat->fCurElem) {
subSet->Add(slaveStat->fCurElem);
}
if (subSet)
SplitPerHost(subSet, listOfMissingFiles);
(*fProgressStatus) -= *(slaveStat->GetProgressStatus());
subSet->SetOwner(0);
}
fSlaveStats->Remove(s);
delete slaveStat;
InitStats();
}
Int_t TPacketizerAdaptive::ReassignPacket(TDSetElement *e,
TList **listOfMissingFiles)
{
if (!e) {
Error("ReassignPacket", "Empty packet!");
return -1;
}
TUrl url = e->GetFileName();
TString host;
if ( !url.IsValid() ||
(strncmp(url.GetProtocol(),"root", 4) &&
strncmp(url.GetProtocol(),"rfio", 4))) {
host = "no-host";
} else {
host = url.GetHost();
}
TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
if (node) {
node->DecreaseProcessed(e->GetNum());
node->Add( e );
if (!fUnAllocated->FindObject(node))
fUnAllocated->Add(node);
return 0;
} else {
TFileInfo *fi = e->GetFileInfo();
if (listOfMissingFiles)
(*listOfMissingFiles)->Add((TObject *)fi);
return -1;
}
}
void TPacketizerAdaptive::SplitPerHost(TList *elements,
TList **listOfMissingFiles)
{
if (!elements) {
Error("SplitPerHost", "Empty list of packets!");
return;
}
if (elements->GetSize() <= 0) {
Error("SplitPerHost", "The input list contains no elements");
return;
}
TIter subSetIter(elements);
TDSetElement *e;
while ((e = (TDSetElement*) subSetIter.Next())) {
if (ReassignPacket(e, listOfMissingFiles) == -1) {
if (elements->Remove(e))
Error("ReassignPacket", "Error removing a missing file");
delete e;
}
}
}
Last change: Mon Dec 15 13:03:16 2008
Last generated: 2008-12-15 13:03
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.