#ifdef WIN32
# include <io.h>
#endif
#include "TQueryResultManager.h"
#include "TFile.h"
#include "THashList.h"
#include "TKey.h"
#include "TProofQueryResult.h"
#include "TObjString.h"
#include "TParameter.h"
#include "TProof.h"
#include "TProofServ.h"
#include "TRegexp.h"
#include "TSortedList.h"
#include "TSystem.h"
#include "TVirtualProofPlayer.h"
TQueryResultManager::TQueryResultManager(const char *qdir, const char *stag,
                                         const char *sdir,
                                         TProofLockPath *lck, FILE *logfile)
{
   // Constructor.
   // Stores the query directory 'qdir', the session tag 'stag' and the
   // session directory 'sdir', keeps the pointer to the locker 'lck' and
   // creates the (empty) list of queries. If 'logfile' is null, stdout is
   // used as log file.

   // Where we work and who we are
   fSessionTag = stag;
   fSessionDir = sdir;
   fQueryDir = qdir;

   // Nothing processed or kept yet
   fSeqNum = fDrawQueries = fKeptQueries = 0;

   // The list of previous queries is created on demand
   fPreviousQueries = 0;
   fQueries = new TList;

   // External helpers
   fLock = lck;
   fLogFile = logfile;
   if (!fLogFile)
      fLogFile = stdout;
}
TQueryResultManager::~TQueryResultManager()
{
   // Destructor.
   // Deletes the two list objects held by the manager; no explicit Delete()
   // of the list contents is issued here.

   if (fQueries) {
      delete fQueries;
      fQueries = 0;
   }
   if (fPreviousQueries) {
      delete fPreviousQueries;
      fPreviousQueries = 0;
   }
}
void TQueryResultManager::AddLogFile(TProofQueryResult *pq)
{
   // Add to 'pq' the lines of the log file, read from the offset stored in
   // pq->fStartLog (when > -1) up to the end of the file. The position of
   // the log file descriptor is saved before reading and restored afterwards.
   // Nothing is done if 'pq' is null.

   if (!pq)
      return;

   // Push out buffered output and remember the current position
   fflush(fLogFile);
   off_t lnow = lseek(fileno(fLogFile), (off_t) 0, SEEK_CUR);

   // Move to the beginning of the part relevant for this query
   Int_t start = pq->fStartLog;
   if (start > -1)
      lseek(fileno(fLogFile), (off_t) start, SEEK_SET);

   // Read the lines and add them to the query result
   const Int_t kMAXBUF = 4096;
   char line[kMAXBUF];
   while (fgets(line, sizeof(line), fLogFile)) {
      // Strip the trailing newline, if any. The length is checked first:
      // a line starting with a NUL byte has zero length and line[len-1]
      // would index before the start of the buffer.
      size_t len = strlen(line);
      if (len > 0 && line[len-1] == '\n')
         line[len-1] = 0;
      pq->AddLogLine((const char *)line);
   }

   // Restore the initial position, if it could be determined
   if (lnow != (off_t) -1)
      lseek(fileno(fLogFile), lnow, SEEK_SET);
}
Int_t TQueryResultManager::CleanupQueriesDir()
{
   // Remove the directories of all the sessions other than the current one
   // found under the top queries directory, i.e. the part of fQueryDir up
   // to and including kPROOF_QueryDir. The list of previous queries is
   // cleared first.
   // Returns the number of session directories for which removal was issued.

   Int_t nd = 0;

   // Cleanup previous stuff
   if (fPreviousQueries) {
      fPreviousQueries->Delete();
      SafeDelete(fPreviousQueries);
   }

   // Locate the top queries directory. If the marker is missing, Index()
   // gives kNPOS and truncating at kNPOS + strlen(...) would produce an
   // unrelated path to run the remove command on: give up instead.
   TString queriesdir = fQueryDir;
   Int_t iq = queriesdir.Index(kPROOF_QueryDir);
   if (iq == kNPOS) {
      Warning("CleanupQueriesDir", "'%s' not found in '%s': nothing removed",
                                   kPROOF_QueryDir, fQueryDir.Data());
      return nd;
   }
   queriesdir.Remove(iq + strlen(kPROOF_QueryDir));

   void *dirs = gSystem->OpenDirectory(queriesdir);
   if (!dirs) {
      Warning("CleanupQueriesDir", "cannot open queries directory: %s",
                                   queriesdir.Data());
      return nd;
   }

   // Loop over the session directories
   char *sess = 0;
   while ((sess = (char *) gSystem->GetDirEntry(dirs))) {

      // We are interested only in "session..." entries
      if (strlen(sess) < 7 || strncmp(sess,"session",7))
         continue;

      // Skip the current session
      if (strstr(sess, fSessionTag))
         continue;

      // Remove the directory
      TString qdir = Form("%s/%s", queriesdir.Data(), sess);
      Info("RemoveQuery", "removing directory: %s", qdir.Data());
      gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
      nd++;
   }

   // Release the directory handle (it was never closed before)
   gSystem->FreeDirectory(dirs);

   return nd;
}
void TQueryResultManager::ScanPreviousQueries(const char *dir)
{
if (fPreviousQueries) {
fPreviousQueries->Delete();
SafeDelete(fPreviousQueries);
}
void *dirs = gSystem->OpenDirectory(dir);
char *sess = 0;
while ((sess = (char *) gSystem->GetDirEntry(dirs))) {
if (strlen(sess) < 7 || strncmp(sess,"session",7))
continue;
if (strstr(sess, fSessionTag))
continue;
void *dirq = gSystem->OpenDirectory(Form("%s/%s", dir, sess));
char *qry = 0;
while ((qry = (char *) gSystem->GetDirEntry(dirq))) {
if (qry[0] == '.')
continue;
TString fn = Form("%s/%s/%s/query-result.root", dir, sess, qry);
TFile *f = TFile::Open(fn);
if (f) {
f->ReadKeys();
TIter nxk(f->GetListOfKeys());
TKey *k = 0;
TProofQueryResult *pqr = 0;
while ((k = (TKey *)nxk())) {
if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
pqr = (TProofQueryResult *) f->Get(k->GetName());
if (pqr) {
TQueryResult *qr = pqr->CloneInfo();
if (!fPreviousQueries)
fPreviousQueries = new TList;
if (qr->GetStatus() > TQueryResult::kRunning) {
fPreviousQueries->Add(qr);
} else {
TProofLockPath *lck = 0;
if (LockSession(qr->GetTitle(), &lck) == 0) {
RemoveQuery(qr);
SafeDelete(lck);
}
}
}
}
}
f->Close();
delete f;
}
}
gSystem->FreeDirectory(dirq);
}
gSystem->FreeDirectory(dirs);
}
Int_t TQueryResultManager::ApplyMaxQueries(Int_t mxq)
{
   // Scan the query directories of the sessions other than the current one
   // and keep only the 'mxq' result files with the most recent modification
   // time; the other "query-result.root" files are unlinked. Session
   // directories left without result files are removed.
   // Nothing is done if mxq < 0. Always returns 0.

   if (mxq < 0)
      return 0;

   // Result files sorted by modification time.
   // Name: zero-padded mtime, so that the lexical order used for sorting
   // matches the numerical one; title: full path of the result file.
   // Keeping the path in the sorted entry itself makes files with identical
   // mtimes independent entries (a lookup keyed on the mtime alone would
   // always return the first of them).
   TSortedList *sl = new TSortedList;
   sl->SetOwner();
   // Number of result files per session. Name: session directory path.
   TList *dl = new TList;
   dl->SetOwner();

   // Top directory: the query directory up to the "session-" part
   TString dir = fQueryDir;
   Int_t idx = dir.Index("session-");
   if (idx != kNPOS)
      dir.Remove(idx);

   // Loop over the session directories
   void *dirs = gSystem->OpenDirectory(dir);
   char *sess = 0;
   while (dirs && (sess = (char *) gSystem->GetDirEntry(dirs))) {

      // We are interested only in "session..." entries
      if (strlen(sess) < 7 || strncmp(sess,"session",7))
         continue;

      // Skip the current session
      if (strstr(sess, fSessionTag))
         continue;

      // Same path string is used below as key in 'dl'
      TString sdir = Form("%s/%s", dir.Data(), sess);

      // Loop over the query directories of this session
      Int_t nq = 0;
      void *dirq = gSystem->OpenDirectory(sdir);
      char *qry = 0;
      while (dirq && (qry = (char *) gSystem->GetDirEntry(dirq))) {

         // Skip '.', '..' and hidden entries
         if (qry[0] == '.')
            continue;

         TString fn = Form("%s/%s/query-result.root", sdir.Data(), qry);
         FileStat_t st;
         if (gSystem->GetPathInfo(fn, st)) {
            Info("ApplyMaxQueries","file '%s' cannot be stated: remove it", fn.Data());
            gSystem->Unlink(gSystem->DirName(fn));
            continue;
         }

         // Register the file; the cast makes the argument match "%ld"
         TString key = Form("%012ld", (long) st.fMtime);
         sl->Add(new TNamed(key.Data(), fn.Data()));
         nq++;
      }
      if (dirq)
         gSystem->FreeDirectory(dirq);

      if (nq > 0)
         dl->Add(new TParameter<Int_t>(sdir.Data(), nq));
      else
         // Nothing to keep for this session
         gSystem->Exec(Form("%s -fr %s", kRM, sdir.Data()));
   }
   if (dirs)
      gSystem->FreeDirectory(dirs);

   // Go through the files, most recent first: keep the first 'mxq',
   // remove the others
   TIter nxq(sl, kIterBackward);
   Int_t nqkept = 0;
   TNamed *nm = 0;
   while ((nm = (TNamed *) nxq())) {
      if (nqkept < mxq) {
         nqkept++;
         continue;
      }

      // Remove the result file
      gSystem->Unlink(nm->GetTitle());

      // Session directory: two levels above the result file
      TString tdir(gSystem->DirName(nm->GetTitle()));
      tdir = gSystem->DirName(tdir.Data());

      // Update the counter and remove the session directory when it
      // holds no more result files
      TParameter<Int_t> *nq = dynamic_cast<TParameter<Int_t>*>(dl->FindObject(tdir));
      if (nq) {
         Int_t val = nq->GetVal();
         nq->SetVal(--val);
         if (nq->GetVal() <= 0)
            gSystem->Exec(Form("%s -fr %s", kRM, tdir.Data()));
      }
   }

   // Cleanup
   delete sl;
   delete dl;

   return 0;
}
Int_t TQueryResultManager::LockSession(const char *sessiontag, TProofLockPath **lck)
{
   // Try to lock the query area of the session tagged 'sessiontag'.
   // Returns 0 immediately if 'sessiontag' contains the tag of the current
   // session. Otherwise the tag must match "session-.*-.*-.*-.*".
   // If the lock file derived from the current lock file name (current
   // session tag replaced by the requested one) is accessible, a
   // TProofLockPath is created on it, locked and returned in '*lck'
   // (the caller is responsible for deleting it); '*lck' is 0 otherwise.
   // Returns 0 on success, -1 if the inputs are invalid, the session
   // directory of the parent is still accessible or locking fails.

   // Leave the output in a defined state on every return path
   if (lck)
      *lck = 0;

   if (!sessiontag) {
      Info("LockSession","session tag undefined");
      return -1;
   }

   // Nothing to do for the current session
   if (strstr(sessiontag, fSessionTag))
      return 0;

   // We need a place where to store the locker
   if (!lck) {
      Info("LockSession","locker space undefined");
      return -1;
   }

   // Check the format
   TString stag = sessiontag;
   TRegexp re("session-.*-.*-.*-.*");
   Int_t i1 = stag.Index(re);
   if (i1 == kNPOS) {
      Info("LockSession","bad format: %s", sessiontag);
      return -1;
   }

   // Reduce to the bare tag: drop "session-" and any ":q..." part
   stag.ReplaceAll("session-","");
   Int_t i2 = stag.Index(":q");
   if (i2 != kNPOS)
      stag.Remove(i2);

   // If the session directory of the parent is still there, do nothing
   TString parlog = fSessionDir;
   parlog = parlog.Remove(parlog.Index("master-")+strlen("master-"));
   parlog += stag;
   if (!gSystem->AccessPathName(parlog)) {
      Info("LockSession","parent still running: do nothing");
      return -1;
   }

   // Lock the query lock file, if it exists
   if (fLock) {
      TString qlock = fLock->GetName();
      qlock.ReplaceAll(fSessionTag, stag);

      if (!gSystem->AccessPathName(qlock)) {
         *lck = new TProofLockPath(qlock);
         if (((*lck)->Lock()) < 0) {
            Info("LockSession","problems locking query lock file");
            SafeDelete(*lck);
            return -1;
         }
      }
   }

   return 0;
}
Int_t TQueryResultManager::CleanupSession(const char *sessiontag)
{
   // Remove the query area of the session tagged 'sessiontag'.
   // The area is removed only if the session can be locked via LockSession;
   // the lock file, if any, is unlinked afterwards.
   // Returns 0 on success, -1 if the tag is undefined, the directory is not
   // accessible or the session could not be locked.

   if (!sessiontag) {
      Info("CleanupSession","session tag undefined");
      return -1;
   }

   // Path of the area: our own "session-<tag>" is replaced by the requested
   // tag and anything from ":q" on is dropped
   TString qdir = fQueryDir;
   qdir.ReplaceAll(Form("session-%s", fSessionTag.Data()), sessiontag);
   const Int_t iq = qdir.Index(":q");
   if (iq != kNPOS)
      qdir.Remove(iq);

   // The directory must be there
   if (gSystem->AccessPathName(qdir)) {
      Info("CleanupSession","query dir %s does not exist", qdir.Data());
      return -1;
   }

   // We need the lock to proceed
   TProofLockPath *locker = 0;
   if (LockSession(sessiontag, &locker) != 0) {
      Info("CleanupSession", "could not lock session %s", sessiontag);
      return -1;
   }

   // Remove the area
   gSystem->Exec(Form("%s %s", kRM, qdir.Data()));

   // Remove the lock file and release the locker
   if (locker) {
      gSystem->Unlink(locker->GetName());
      SafeDelete(locker);
   }

   return 0;
}
void TQueryResultManager::SaveQuery(TProofQueryResult *qr, const char *fout)
{
   // Write the query result 'qr' to file. The file is 'fout' if given,
   // "<fQueryDir>/<seqnum>/query-result.root" otherwise; the query
   // directory "<fQueryDir>/<seqnum>" is created if not accessible.
   // Nothing is done for null or draw queries.

   if (!qr)
      return;
   if (qr->IsDraw())
      return;

   // Make sure the query directory is there
   TString querydir = Form("%s/%d",fQueryDir.Data(), qr->GetSeqNum());
   if (gSystem->AccessPathName(querydir))
      gSystem->MakeDirectory(querydir);

   // Output file name
   TString ofn;
   if (fout)
      ofn = fout;
   else
      ofn = Form("%s/query-result.root", querydir.Data());

   // (Re)create the file
   TFile *f = TFile::Open(ofn, "RECREATE");
   if (!f)
      return;
   f->cd();

   // Non-archived queries record where their result goes
   if (!(qr->IsArchived()))
      qr->SetResultFile(ofn);

   qr->Write();
   f->Close();
   delete f;
}
void TQueryResultManager::RemoveQuery(const char *queryref, TList *otherlist)
{
   // Remove everything about the query referenced by 'queryref'.
   // The query is searched for with LocateQuery; if found, it is removed
   // from the list it belongs to (and from 'otherlist', if given and the
   // query is a local one) and deleted. The query directory returned by
   // LocateQuery is then removed.

   PDB(kGlobal, 1)
      Info("RemoveQuery", "Enter");

   // Find out if the query is in memory and which directory it uses
   Int_t qry = -1;
   TString qdir;
   TProofQueryResult *pqr = LocateQuery(queryref, qry, qdir);

   // Remove the instance in memory, if any
   if (pqr) {
      if (qry > -1) {
         fQueries->Remove(pqr);
         if (otherlist) otherlist->Remove(pqr);
      } else
         fPreviousQueries->Remove(pqr);
      delete pqr;
      pqr = 0;
   }

   // LocateQuery leaves the directory empty when a local query is not
   // found: do not run the remove command without an operand
   if (qdir.Length() <= 0) {
      Info("RemoveQuery", "query '%s' not found: nothing to remove",
                          queryref ? queryref : "");
      return;
   }

   // Remove the directory
   Info("RemoveQuery", "removing directory: %s", qdir.Data());
   gSystem->Exec(Form("%s %s", kRM, qdir.Data()));

   return;
}
void TQueryResultManager::RemoveQuery(TQueryResult *qr, Bool_t soft)
{
   // Remove the directory of query 'qr', take 'qr' out of the list of
   // queries and delete it. If 'soft' is true and 'qr' is in the list, the
   // object returned by qr->CloneInfo() takes its position in the list.

   PDB(kGlobal, 1)
      Info("RemoveQuery", "Enter");

   if (!qr)
      return;

   // Build the path: <top queries dir>/<title>/<seqnum>
   TString topdir = fQueryDir;
   topdir.Remove(topdir.Index(kPROOF_QueryDir)+strlen(kPROOF_QueryDir));
   TString qdir = Form("%s/%s/%d", topdir.Data(), qr->GetTitle(), qr->GetSeqNum());

   // Remove the directory
   PDB(kGlobal, 1)
      Info("RemoveQuery", "removing directory: %s", qdir.Data());
   gSystem->Exec(Form("%s %s", kRM, qdir.Data()));

   // In soft mode leave a summary at the same position in the list
   if (soft) {
      TQueryResult *info = qr->CloneInfo();
      const Int_t pos = fQueries->IndexOf(qr);
      if (pos < 0) {
         // Not in the list: the summary is of no use
         delete info;
      } else {
         fQueries->AddAt(info, pos);
      }
   }

   // Remove the full instance
   fQueries->Remove(qr);
   delete qr;
}
TProofQueryResult *TQueryResultManager::LocateQuery(TString queryref, Int_t &qry, TString &qdir)
{
   // Locate the query referenced by 'queryref'.
   // A reference made of digits only, or containing the current session tag
   // followed by ":q<n>", denotes a local query: 'qry' is set to its number
   // and the list of queries is searched for a matching sequential number;
   // 'qdir' is set to "<fQueryDir>/<n>" if found, left empty otherwise.
   // Any other reference is searched for in the list of previous queries
   // ('qry' is -1) and 'qdir' is built from the top queries directory and
   // the reference with ":q" replaced by "/".
   // Returns the instance found, 0 if none.

   qry = -1;
   qdir = "";

   // Extract the local query number, if any
   if (queryref.IsDigit()) {
      qry = queryref.Atoi();
   } else if (queryref.Contains(fSessionTag)) {
      const Int_t iq = queryref.Index(":q");
      if (iq != kNPOS) {
         queryref.Remove(0,iq+2);
         qry = queryref.Atoi();
      }
   }

   TProofQueryResult *found = 0;

   if (qry > -1) {
      // Query of the current session
      PDB(kGlobal, 1)
         Info("LocateQuery", "local query: %d", qry);

      if (!fQueries)
         return 0;

      TIter next(fQueries);
      while ((found = (TProofQueryResult *) next())) {
         if (found->GetSeqNum() != qry)
            continue;
         qdir = Form("%s/%d", fQueryDir.Data(), qry);
         break;
      }
      return found;
   }

   // Query of a previous session
   PDB(kGlobal, 1)
      Info("LocateQuery", "previously processed query: %s", queryref.Data());

   if (fPreviousQueries) {
      TIter next(fPreviousQueries);
      while ((found = (TProofQueryResult *) next())) {
         // Both title and name must appear in the reference
         if (!queryref.Contains(found->GetTitle()))
            continue;
         if (!queryref.Contains(found->GetName()))
            continue;
         break;
      }
   }

   // The directory is defined in any case
   queryref.ReplaceAll(":q","/");
   TString topdir = fQueryDir;
   topdir.Remove(topdir.Index(kPROOF_QueryDir)+strlen(kPROOF_QueryDir));
   qdir = Form("%s/%s", topdir.Data(), queryref.Data());

   return found;
}
Bool_t TQueryResultManager::FinalizeQuery(TProofQueryResult *pq,
                                          TProof *proof, TVirtualProofPlayer *player)
{
   // Final steps after processing of query 'pq': the number of events
   // processed, the CPU time, the final status and the output list are
   // recorded in 'pq' and the relevant part of the log file is attached.
   // The final status is derived from the exit status of 'player'; it stays
   // kAborted for aborted queries and for unknown exit statuses.
   // Returns kFALSE if any of the inputs is null or the query was aborted,
   // kTRUE otherwise.

   if (!pq || !proof || !player) {
      Warning("FinalizeQuery", "bad inputs: query = %p, proof = %p, player: %p ",
              (void *) pq, (void *) proof, (void *) player);
      return kFALSE;
   }

   Int_t qn = pq->GetSeqNum();
   Long64_t np = player->GetEventsProcessed();
   TVirtualProofPlayer::EExitStatus est = player->GetExitStatus();
   TList *out = player->GetOutputList();

   Float_t cpu = proof->GetCpuTime();
   Long64_t bytes = proof->GetBytesRead();

   TQueryResult::EQueryStatus st = TQueryResult::kAborted;

   PDB(kGlobal, 2) Info("FinalizeQuery","query #%d", qn);

   PDB(kGlobal, 1)
      Info("FinalizeQuery","%.1f %lld", cpu, bytes);

   // 'np' is a Long64_t: it must be printed with %lld
   Bool_t save = kTRUE;
   switch (est) {
   case TVirtualProofPlayer::kAborted:
      PDB(kGlobal, 1)
         Info("FinalizeQuery", "query %d has been ABORTED <====", qn);
      // No output for aborted queries
      out = 0;
      save = kFALSE;
      break;
   case TVirtualProofPlayer::kStopped:
      PDB(kGlobal, 1)
         Info("FinalizeQuery",
              "query %d has been STOPPED: %lld events processed", qn, np);
      st = TQueryResult::kStopped;
      break;
   case TVirtualProofPlayer::kFinished:
      PDB(kGlobal, 1)
         Info("FinalizeQuery",
              "query %d has been completed: %lld events processed", qn, np);
      st = TQueryResult::kCompleted;
      break;
   default:
      Warning("FinalizeQuery",
              "query %d: unknown exit status (%d)", qn, (Int_t) est);
      break;
   }

   // Fill some variables; in the CPU time we do not include anything
   // already recorded in the query
   PDB(kGlobal, 1)
      Info("FinalizeQuery", "cpu: %.4f, saved: %.4f, master: %.4f",
                            cpu, pq->GetUsedCPU() ,GetCpuTime());

   pq->SetProcessInfo(np, cpu - pq->GetUsedCPU() + GetCpuTime());
   pq->RecordEnd(st, out);

   // Save the logs into the query result instance
   AddLogFile(pq);

   return save;
}
void TQueryResultManager::SaveQuery(TProofQueryResult *pq, Int_t mxq)
{
   // Save the query result 'pq', keeping at most 'mxq' queries (no limit
   // if mxq < 0). When the limit is reached, room is made by removing
   // older entries from the list of queries: archived queries still holding
   // an output list are removed softly (see RemoveQuery) first, then
   // non-archived queries with status beyond kRunning. If no room can be
   // made, the query is not saved and a message is issued.
   // Nothing is done if 'pq' is null.

   if (!pq)
      return;

   // No limit: just save
   if (mxq <= -1) {
      SaveQuery(pq);
      fKeptQueries++;
      return;
   }

   // Make room, one query per pass. RemoveQuery deletes its argument, so
   // the candidates and the iterator must be refreshed at every pass.
   if (fQueries) {
      while (fKeptQueries >= mxq) {
         TQueryResult *fcom = 0;
         TQueryResult *farc = 0;
         TIter nxq(fQueries);
         TQueryResult *qr = 0;
         while ((qr = (TQueryResult *) nxq())) {
            // The query being saved is used below: never evict it
            if (qr == pq)
               continue;
            if (qr->IsArchived()) {
               if (qr->GetOutputList() && !farc)
                  farc = qr;
            } else if (qr->GetStatus() > TQueryResult::kRunning && !fcom) {
               fcom = qr;
            }
            if (farc && fcom)
               break;
         }
         if (farc) {
            // Archived: only the summary is kept in the list
            RemoveQuery(farc, kTRUE);
            fKeptQueries--;
         } else if (fcom) {
            RemoveQuery(fcom);
            fKeptQueries--;
         } else {
            // Nothing can be removed
            break;
         }
      }
   }

   if (fKeptQueries < mxq) {
      // Save it
      SaveQuery(pq);
      fKeptQueries++;
   } else {
      // No room: notify
      if (gProofServ) {
         gProofServ->SendAsynMessage(Form("Too many saved queries (%d):"
                                          " cannot save %s:%s",
                                          fKeptQueries, pq->GetTitle(),
                                          pq->GetName()));
      } else {
         Info("SaveQuery", "Too many saved queries (%d): cannot save %s:%s",
                           fKeptQueries, pq->GetTitle(), pq->GetName());
      }
   }
}
// Last change: Wed Oct 22 12:04:03 2008
// Last generated: 2008-10-22 12:04
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.