#include "TPacketizerUnit.h"
#include "Riostream.h"
#include "TDSet.h"
#include "TError.h"
#include "TEventList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TObject.h"
#include "TParameter.h"
#include "TPerfStats.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofPlayer.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TStopwatch.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TClass.h"
#include "TMath.h"
#include "TObjString.h"
using namespace TMath;
class TPacketizerUnit::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
friend class TPacketizerUnit;
private:
Long64_t fLastProcessed;
Double_t fSpeed;
Double_t fTimeInstant;
TNtupleD *fCircNtp;
Long_t fCircLvl;
public:
TSlaveStat(TSlave *sl, TList *input);
~TSlaveStat();
void GetCurrentTime();
const char *GetName() const { return fSlave->GetName(); }
void UpdatePerformance(Double_t time);
TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
};
TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
: fLastProcessed(0),
fSpeed(0), fTimeInstant(0), fCircLvl(5)
{
fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
fCircNtp->SetCircular(fCircLvl);
fSlave = slave;
fStatus = new TProofProgressStatus();
}
TPacketizerUnit::TSlaveStat::~TSlaveStat()
{
SafeDelete(fCircNtp);
}
void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
{
Double_t ttot = time;
Double_t *ar = fCircNtp->GetArgs();
Int_t ne = fCircNtp->GetEntries();
if (ne <= 0) {
fCircNtp->Fill(0., 0);
fSpeed = 0.;
return;
}
fCircNtp->GetEntry(ne-1);
ttot = ar[0] + time;
fCircNtp->Fill(ttot, GetEntriesProcessed());
fCircNtp->GetEntry(0);
Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
fSpeed = nevts / dtime;
PDB(kPacketizer,2)
Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
time, dtime, nevts, fSpeed);
}
TProofProgressStatus *TPacketizerUnit::TSlaveStat::AddProcessed(TProofProgressStatus *st)
{
if (st) {
TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
*fStatus += *diff;
return diff;
} else {
Error("AddProcessed", "status arg undefined");
return 0;
}
}
ClassImp(TPacketizerUnit)
TPacketizerUnit::TPacketizerUnit(TList *slaves, Long64_t num, TList *input,
TProofProgressStatus *st)
: TVirtualPacketizer(input, st)
{
PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
fSlaveStats = 0;
fPackets = 0;
fTimeLimit = 1;
TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", fTimeLimit);
PDB(kPacketizer,1)
Info("TPacketizerUnit", "time limit is %lf", fTimeLimit);
fProcessing = 0;
fAssigned = 0;
fStopwatch = new TStopwatch();
fPackets = new TList;
fPackets->SetOwner();
fSlaveStats = new TMap;
fSlaveStats->SetOwner(kFALSE);
TSlave *slave;
TIter si(slaves);
while ((slave = (TSlave*) si.Next()))
fSlaveStats->Add(slave, new TSlaveStat(slave, input));
fTotalEntries = num;
fStopwatch->Start();
PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
}
TPacketizerUnit::~TPacketizerUnit()
{
if (fSlaveStats)
fSlaveStats->DeleteValues();
SafeDelete(fSlaveStats);
SafeDelete(fPackets);
SafeDelete(fStopwatch);
}
Double_t TPacketizerUnit::GetCurrentTime()
{
Double_t retValue = fStopwatch->RealTime();
fStopwatch->Continue();
return retValue;
}
TDSetElement *TPacketizerUnit::GetNextPacket(TSlave *sl, TMessage *r)
{
if (!fValid)
return 0;
TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(sl);
R__ASSERT(slstat != 0);
PDB(kPacketizer,2)
Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned);
Double_t latency, proctime, proccpu;
Long64_t bytesRead = -1;
Long64_t totalEntries = -1;
Long64_t totev = 0;
Long64_t numev = -1;
TProofProgressStatus *status = 0;
if (sl->GetProtocol() > 18) {
(*r) >> latency;
(*r) >> status;
TProofProgressStatus *progress = 0;
if (status) {
numev = status->GetEntries() - slstat->GetEntriesProcessed();
progress = slstat->AddProcessed(status);
if (progress) {
proctime = progress->GetProcTime();
proccpu = progress->GetCPUTime();
totev = status->GetEntries();
bytesRead = progress->GetBytesRead();
delete progress;
}
delete status;
} else
Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
} else {
(*r) >> latency >> proctime >> proccpu;
if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
if (r->BufferSize() > r->Length()) (*r) >> totev;
numev = totev - slstat->GetEntriesProcessed();
slstat->GetProgressStatus()->IncEntries(numev);
}
fProgressStatus->IncEntries(numev);
fProcessing = 0;
PDB(kPacketizer,2)
Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
sl->GetOrdinal(), sl->GetName(),
numev, latency, proctime, proccpu, bytesRead);
if (gPerfStats != 0) {
gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
latency, proctime, proccpu, bytesRead);
}
if (fAssigned == fTotalEntries) {
HandleTimer(0);
return 0;
}
if (fStop) {
HandleTimer(0);
return 0;
}
Long64_t num;
Double_t cTime = GetCurrentTime();
if (slstat->fCircNtp->GetEntries() <= 0) {
PDB(kPacketizer,2) Info("GetNextPacket"," calibration: "
"total entries %lld, workers %d",
fTotalEntries, fSlaveStats->GetSize());
Long64_t avg = fTotalEntries/(fSlaveStats->GetSize());
num = (avg > 5) ? 5 : avg;
slstat->UpdatePerformance(0.);
} else {
slstat->UpdatePerformance(proctime);
TMapIter *iter = (TMapIter *)fSlaveStats->MakeIterator();
TSlave * tmpSlave;
TSlaveStat * tmpStat;
Double_t sumSpeed = 0;
Double_t sumBusy = 0;
while ((tmpSlave = (TSlave *)iter->Next())) {
tmpStat = (TSlaveStat *)fSlaveStats->GetValue(tmpSlave);
if ((cTime - tmpStat->fTimeInstant) > 4*fTimeLimit)
tmpStat->fSpeed = 0;
PDB(kPacketizer,2)
Info("GetNextPacket",
"worker-%s: speed %lf", tmpSlave->GetOrdinal(), tmpStat->fSpeed);
if (tmpStat->fSpeed > 0) {
sumSpeed += tmpStat->fSpeed;
if ((tmpStat->fTimeInstant) && (cTime - tmpStat->fTimeInstant > 0)) {
Double_t busyspeed =
((tmpStat->fTimeInstant - cTime)*tmpStat->fSpeed + tmpStat->fLastProcessed);
if (busyspeed > 0)
sumBusy += busyspeed;
}
}
}
PDB(kPacketizer,2)
Info("GetNextPacket", "worker-%s: sum speed: %lf, sum busy: %f",
sl->GetOrdinal(), sumSpeed, sumBusy);
if (slstat->fSpeed > 0 &&
(fTotalEntries - fAssigned)/(slstat->fSpeed) < fTimeLimit) {
num = (fTotalEntries - fAssigned);
} else {
if (slstat->fSpeed > 0) {
Double_t optTime = (fTotalEntries - fAssigned + sumBusy)/sumSpeed;
num = (optTime > fTimeLimit) ? Nint(fTimeLimit*(slstat->fSpeed))
: Nint(optTime*(slstat->fSpeed));
PDB(kPacketizer,2)
Info("GetNextPacket", "opTime %lf num %lld speed %lf", optTime, num, slstat->fSpeed);
} else {
Long64_t avg = fTotalEntries/(fSlaveStats->GetSize());
num = (avg > 5) ? 5 : avg;
}
}
}
num = (num > 1) ? num : 1;
fProcessing = (num < (fTotalEntries - fAssigned)) ? num
: (fTotalEntries - fAssigned);
slstat->fLastProcessed = fProcessing;
slstat->fTimeInstant = cTime;
PDB(kPacketizer,2)
Info("GetNextPacket", "worker-%s: num %lld, processing %lld, remaining %lld",sl->GetOrdinal(),
num, fProcessing, (fTotalEntries - fAssigned - fProcessing));
TDSetElement *elem = new TDSetElement("", "", "", 0, fProcessing);
elem->SetBit(TDSetElement::kEmpty);
fAssigned += slstat->fLastProcessed;
return elem;
}
Last change: Mon Dec 15 13:03:17 2008
Last generated: 2008-12-15 13:03
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.