#include "TTreeCacheUnzip.h"
#include "TChain.h"
#include "TBranch.h"
#include "TFile.h"
#include "TEventList.h"
#include "TVirtualMutex.h"
#include "TThread.h"
#include "TCondition.h"
#include "TMath.h"
#include "Bytes.h"
extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::fgParallel = TTreeCacheUnzip::kDisable;
Double_t TTreeCacheUnzip::fgRelBuffSize = 0.2;
ClassImp(TTreeCacheUnzip)
TTreeCacheUnzip::TTreeCacheUnzip() : TTreeCache(),
fUnzipThread(0),
fActiveThread(kFALSE),
fNewTransfer(kFALSE),
fTmpBufferSz(0),
fTmpBuffer(0),
fPosWrite(0),
fUnzipLen(0),
fUnzipPos(0),
fNseekMax(0),
fUnzipBufferSize(0),
fUnzipBuffer(0),
fSkipZip(0),
fUnzipStart(0),
fUnzipEnd(0),
fUnzipNext(0),
fNUnzip(0),
fNFound(0),
fNMissed(0)
{
Init();
}
TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
fUnzipThread(0),
fActiveThread(kFALSE),
fNewTransfer(kFALSE),
fTmpBufferSz(10000),
fTmpBuffer(new char[fTmpBufferSz]),
fPosWrite(0),
fUnzipLen(0),
fUnzipPos(0),
fNseekMax(0),
fUnzipBufferSize(0),
fUnzipBuffer(0),
fSkipZip(0),
fUnzipStart(0),
fUnzipEnd(0),
fUnzipNext(0),
fNUnzip(0),
fNFound(0),
fNMissed(0)
{
Init();
}
void TTreeCacheUnzip::Init()
{
fMutexCache = new TMutex(kTRUE);
fMutexBuffer = new TMutex();
fMutexList = new TMutex();
fUnzipCondition = new TCondition();
if (fgParallel == kDisable) {
fParallel = kFALSE;
}
else if(fgParallel == kEnable || fgParallel == kForce) {
SysInfo_t info;
gSystem->GetSysInfo(&info);
Int_t ncpus = info.fCpus;
if(ncpus > 1 || fgParallel == kForce) {
if(gDebug > 0)
Info("TTreeCacheUnzip", "Enabling Parallel Unzipping, number of cpus:%d", ncpus);
fParallel = kTRUE;
StartThreadUnzip();
}
else {
fParallel = kFALSE;
}
}
else {
Warning("TTreeCacheUnzip", "Parallel Option unknown");
}
}
TTreeCacheUnzip::~TTreeCacheUnzip()
{
if (IsActiveThread())
StopThreadUnzip();
delete fMutexCache;
delete fMutexBuffer;
delete fMutexList;
delete [] fUnzipBuffer;
delete [] fTmpBuffer;
delete [] fUnzipLen;
delete [] fUnzipPos;
delete fUnzipCondition;
}
void TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches )
{
R__LOCKGUARD(fMutexCache);
TTreeCache::AddBranch(b, subbranches);
}
void TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches )
{
R__LOCKGUARD(fMutexCache);
TTreeCache::AddBranch(branch, subbranches);
}
Bool_t TTreeCacheUnzip::FillBuffer()
{
R__LOCKGUARD(fMutexCache);
if (fNbranches <= 0) return kFALSE;
TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
Long64_t entry = tree->GetReadEntry();
if (!fIsManual && entry < fEntryNext) return kFALSE;
if (entry == -1) entry=0;
if (fZipBytes==0) {
fEntryNext = entry + tree->GetEntries();
} else {
fEntryNext = entry + tree->GetEntries()*fBufferSizeMin/fZipBytes;
}
if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
if (fEntryNext > fEntryMax) fEntryNext = fEntryMax+1;
Long64_t chainOffset = 0;
if (fOwner->GetEventList()) {
if (fOwner->IsA() == TChain::Class()) {
TChain *chain = (TChain*)fOwner;
Int_t t = chain->GetTreeNumber();
chainOffset = chain->GetTreeOffset()[t];
}
}
fMutexCache->UnLock();
ResetCache();
TFileCacheRead::Prefetch(0,0);
fMutexCache->Lock();
Bool_t mustBreak = kFALSE;
for (Int_t i=0;i<fNbranches;i++) {
if (mustBreak) break;
TBranch *b = (TBranch*)fBranches->UncheckedAt(i);
Int_t nb = b->GetMaxBaskets();
Int_t *lbaskets = b->GetBasketBytes();
Long64_t *entries = b->GetBasketEntry();
if (!lbaskets || !entries) continue;
for (Int_t j=0;j<nb;j++) {
if (b->GetListOfBaskets()->UncheckedAt(j)) continue;
Long64_t pos = b->GetBasketSeek(j);
Int_t len = lbaskets[j];
if (pos <= 0 || len <= 0) continue;
if (entries[j] > fEntryNext) continue;
if (entries[j] < entry && (j<nb-1 && entries[j+1] < entry)) continue;
if (fOwner->GetEventList()) {
Long64_t emax = fEntryMax;
if (j<nb-1) emax = entries[j+1]-1;
if (!(fOwner->GetEventList())->ContainsRange(entries[j]+chainOffset,emax+chainOffset)) continue;
}
fNReadPref++;
TFileCacheRead::Prefetch(pos,len);
if (fNtot > 2*fBufferSizeMin) {TFileCacheRead::Prefetch(0,0);mustBreak = kTRUE; break;}
}
if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n",entry,((TBranch*)fBranches->UncheckedAt(i))->GetName(),fEntryNext,fNseek,fNtot);
}
fIsLearning = kFALSE;
if (mustBreak) return kFALSE;
return kTRUE;
}
void TTreeCacheUnzip::SetEntryRange(Long64_t emin, Long64_t emax)
{
R__LOCKGUARD(fMutexCache);
TTreeCache::SetEntryRange(emin, emax);
}
void TTreeCacheUnzip::StopLearningPhase()
{
TTreeCache::StopLearningPhase();
this->SendSignal();
}
void TTreeCacheUnzip::UpdateBranches(TTree *tree, Bool_t owner)
{
R__LOCKGUARD(fMutexCache);
TTreeCache::UpdateBranches(tree, owner);
}
TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::GetParallelUnzip()
{
return fgParallel;
}
Bool_t TTreeCacheUnzip::IsParallelUnzip()
{
if (fgParallel == kEnable || fgParallel == kForce)
return kTRUE;
return kFALSE;
}
Bool_t TTreeCacheUnzip::IsActiveThread()
{
return fActiveThread;
}
Bool_t TTreeCacheUnzip::IsQueueEmpty()
{
R__LOCKGUARD(fMutexCache);
if ( fIsLearning )
return kTRUE;
return kFALSE;
}
Int_t TTreeCacheUnzip::ProcessQueue()
{
if ( IsQueueEmpty() )
return 0;
if (gDebug > 0) Info("ProcessQueue", " Calling UnzipCache() ");
return UnzipCache();
}
void TTreeCacheUnzip::SendSignal()
{
if (gDebug > 0) Info("SendSignal", " fUnzipCondition->Signal()");
fUnzipCondition->Signal();
}
Int_t TTreeCacheUnzip::SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option)
{
if(fgParallel == kEnable || fgParallel == kForce || fgParallel == kDisable) {
fgParallel = option;
return 1;
}
return 0;
}
Int_t TTreeCacheUnzip::StartThreadUnzip()
{
if(!fUnzipThread) {
fActiveThread=kTRUE;
fUnzipThread= new TThread("UnzipLoop", UnzipLoop, (void*) this);
fUnzipThread->Run();
return 0;
}
return 1;
}
Int_t TTreeCacheUnzip::StopThreadUnzip()
{
if(fUnzipThread){
fActiveThread = kFALSE;
SendSignal();
if (fUnzipThread->Exists()) {
fUnzipThread->Join();
}
return 0;
}
return 1;
}
void TTreeCacheUnzip::WaitForSignal()
{
fUnzipCondition->Wait();
}
void* TTreeCacheUnzip::UnzipLoop(void *arg)
{
TTreeCacheUnzip *unzipMng = (TTreeCacheUnzip *)arg;
TThread::SetCancelOn();
TThread::SetCancelDeferred();
while( unzipMng->IsActiveThread() ) {
unzipMng->ProcessQueue();
if(!unzipMng->IsActiveThread()) break;
unzipMng->WaitForSignal();
}
return (void *)0;
}
Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
{
Version_t versionkey;
Short_t klen;
UInt_t datime;
Int_t nb = 0,olen;
Int_t nread = maxbytes;
frombuf(buf,&nb);
nbytes = nb;
if (nb < 0) return nread;
const Int_t headerSize = 16;
if (nread < headerSize) return nread;
frombuf(buf, &versionkey);
frombuf(buf, &olen);
frombuf(buf, &datime);
frombuf(buf, &klen);
if (!olen) olen = nbytes-klen;
objlen = olen;
keylen = klen;
return nread;
}
void TTreeCacheUnzip::ResetCache()
{
R__LOCKGUARD(fMutexList);
if (gDebug > 0)
Info("GetUnzipBuffer", "Thread: %d -- Reseting the cache", TThread::SelfId());
fUnzipStart = fUnzipEnd = fUnzipNext = 0;
fPosWrite = 0;
fNewTransfer = kTRUE;
}
Int_t TTreeCacheUnzip::GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
{
if (fParallel){
if ( fIsLearning ) {
ResetCache();
TFileCacheRead::Prefetch(0,0);
}
R__LOCKGUARD(fMutexCache);
if (fNseek > 0 && !fIsSorted) {
Sort();
if (fFile->ReadBuffers(fBuffer,fPos,fLen,fNb)) return -1;
fIsTransferred = kTRUE;
this->SendSignal();
}
Int_t loc = (Int_t)TMath::BinarySearch(fNseek,fSeekSort,pos);
if (loc >= 0 && (pos == fSeekSort[loc]) ) {
fMutexList->Lock();
if( (loc >= fUnzipStart) && (loc < fUnzipEnd )) {
Long64_t locPos = fUnzipPos[loc];
Int_t locLen = fUnzipLen[loc];
fMutexBuffer->Lock();
if(!(*buf)) {
*buf = new char[locLen+1];
*free = kTRUE;
}
memcpy(*buf,&fUnzipBuffer[locPos], locLen);
fMutexBuffer->UnLock();
fNFound++;
if((loc == fUnzipEnd - 1) && (loc == fUnzipNext - 1))
this->SendSignal();
fMutexList->UnLock();
return locLen;
}
if (gDebug > 0)
Info("GetUnzipBuffer", "NOT found in the unzip cahe fUnzipStart:%d, fUnzipEnd:%d, fUnzipNext: %d, pos:%lld, len:%d",
fUnzipStart, fUnzipEnd, fUnzipNext, pos, len);
if (loc > fUnzipNext)
this->SendSignal();
fMutexList->UnLock();
}
}
char *comp = new char[len];
Bool_t found = kFALSE;
if (fNseek > 0 && !fIsSorted) {
if (gDebug > 0)
Info("GetUnzipBuffer", "This is a new transfer... must clean things up fNSeek:%d", fNseek);
ResetCache();
}
if (TFileCacheRead::ReadBuffer(comp,pos,len) == 1){
found = kTRUE;
}
if(!found) {
Bool_t bufferFilled = FillBuffer();
if (bufferFilled) {
if (TFileCacheRead::ReadBuffer(comp,pos,len) == 1){
found = kTRUE;
}
}
}
if (!found) {
fFile->Seek(pos);
if(fFile->ReadBuffer(comp, len)){
Error("GetUnzipBuffer", "Thread: %d -- Error reading from TFile ... must go out", TThread::SelfId());
delete [] comp;
return -1;
}
}
Int_t res = UnzipBuffer(buf, comp);
*free = kTRUE;
if (!fIsLearning) fNMissed++;
if (comp) delete [] comp;
return res;
}
void TTreeCacheUnzip::SetUnzipBufferSize(Long64_t bufferSize)
{
fUnzipBufferSize = bufferSize;
}
Int_t TTreeCacheUnzip::UnzipBuffer(char **dest, char *src)
{
Int_t uzlen = 0;
Bool_t alloc = kFALSE;
const Int_t hlen=128;
Int_t nbytes=0, objlen=0, keylen=0;
GetRecordHeader(src, hlen, nbytes, objlen, keylen);
if (gDebug > 1)
Info("UnzipBuffer", "nbytes:%d, objlen:%d, keylen:%d ", nbytes, objlen, keylen);
if (!(*dest)) {
*dest = new char[keylen+objlen];
alloc = kTRUE;
}
Bool_t oldCase = objlen==nbytes-keylen
&& ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel()!=0
&& fFile->GetVersion()<=30401;
if (objlen > nbytes-keylen || oldCase) {
if (fSkipZip) {
memcpy(*dest, src, keylen);
uzlen += keylen;
memcpy(*dest, src + keylen, objlen);
uzlen += objlen;
return nbytes;
}
if (gDebug > 2)
Info("UnzipBuffer", "Copy the key keylen:%d from src:%p to *dest:%p", keylen, src, *dest);
memcpy(*dest, src, keylen);
uzlen += keylen;
char *objbuf = *dest + keylen;
UChar_t *bufcur = (UChar_t *) (src + keylen);
Int_t nin, nout, nbuf;
Int_t noutot = 0;
while (1) {
nin = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16));
nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16);
if (gDebug > 2)
Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
if (oldCase && (nin > objlen || nbuf > objlen)) {
if (gDebug > 2)
Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
memcpy( *dest + keylen, src + keylen, objlen);
uzlen += objlen;
return uzlen;
}
R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
if (gDebug > 2)
Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
nin, bufcur, nbuf, objbuf, nout);
if (!nout) break;
noutot += nout;
if (noutot >= objlen) break;
bufcur += nin;
objbuf += nout;
}
if (noutot != objlen) {
Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
nbytes,keylen,objlen, noutot,nout,nin,nbuf);
uzlen = -1;
if(alloc) delete [] *dest;
return uzlen;
}
uzlen += objlen;
} else {
memcpy(*dest, src, keylen);
uzlen += keylen;
memcpy(*dest + keylen, src + keylen, objlen);
uzlen += objlen;
}
return uzlen;
}
Int_t TTreeCacheUnzip::UnzipCache()
{
if(!fIsTransferred) {
if (gDebug > 0)
Info("UnzipCache", "It is still in the learning phase");
return 0;
}
if (fUnzipBufferSize == 0) {
SetUnzipBufferSize((Long64_t)(fgRelBuffSize*fBufferSize));
fMutexBuffer->Lock();
fUnzipBuffer = new char[fUnzipBufferSize];
fMutexBuffer->UnLock();
if (gDebug > 0)
Info("UnzipCache", "Creating a buffer of %lld bytes ", fUnzipBufferSize);
}
if(fNseekMax < fNseek){
if (gDebug > 0)
Info("UnzipCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
fMutexList->Lock();
Int_t *aUnzipPos = new Int_t[fNseek];
Int_t *aUnzipLen = new Int_t[fNseek];
for (Int_t i=0;i<fNseekMax;i++) {
aUnzipPos[i] = fUnzipPos[i];
aUnzipLen[i] = fUnzipLen[i];
}
if (fUnzipPos) delete [] fUnzipPos;
if (fUnzipLen) delete [] fUnzipLen;
fUnzipPos = aUnzipPos;
fUnzipLen = aUnzipLen;
fNseekMax = fNseek;
fMutexList->UnLock();
}
fMutexList->Lock();
Long64_t locPos = 0;
Int_t locLen = 0;
fNewTransfer = kFALSE;
fUnzipStart = fUnzipEnd;
Long64_t reqbuffer = 0;
Int_t reqmax = fUnzipEnd;
Int_t unzipend = fUnzipEnd;
for (Int_t reqi=unzipend;reqi<fNseek; reqi++) {
const Int_t hlen=128;
Int_t objlen=0, keylen=0;
Int_t nbytes=0;
GetRecordHeader(&fBuffer[fSeekPos[reqi]], hlen, nbytes, objlen, keylen);
Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
if ((reqi == unzipend) && (len > fUnzipBufferSize)) {
if (gDebug > 0)
Info("UnzipCache", "One buffer is too big resizing from:%d to len*2:%d", fUnzipBufferSize, len*2);
fMutexBuffer->Lock();
char *newBuffer = new char[len*2];
memcpy(newBuffer, fUnzipBuffer, fPosWrite);
delete [] fUnzipBuffer;
SetUnzipBufferSize((Long64_t)(len*2));
fUnzipBuffer = newBuffer;
fMutexBuffer->UnLock();
}
if( (reqbuffer + len) > fUnzipBufferSize ){
if (gDebug > 0)
Info("UnzipCache", "Cache found the rigth size: %lld ... breaking ", reqbuffer );
break;
}
reqmax++;
reqbuffer += len;
}
fUnzipNext = reqmax;
if (gDebug > 0)
Info("UnzipCache", "Cache found the rigth size fUnzipStart:%d, fUnzipEnd:%d, fUnzipNext: %d, fNseek: %d",
fUnzipStart, fUnzipEnd, fUnzipNext, fNseek);
fMutexList->UnLock();
for (Int_t reqi = unzipend; reqi<fNseek; reqi++) {
if (!IsActiveThread() || !fNseek || fIsLearning || fNewTransfer || !fIsTransferred) {
if (gDebug > 0)
Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d, fNewTransfer:%d",
IsActiveThread(), fNseek, fIsLearning, fNewTransfer);
return 0;
}
const Int_t hlen=128;
Int_t objlen=0, keylen=0;
Int_t nbytes=0;
GetRecordHeader(&fBuffer[fSeekPos[reqi]], hlen, nbytes, objlen, keylen);
Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
if (reqi > unzipend) {
fMutexList->Lock();
locPos = fUnzipPos[reqi-1] + fUnzipLen[reqi-1];
locLen = 0;
fMutexList->UnLock();
}
if( (locPos + len) > fUnzipBufferSize ){
if (gDebug > 0)
Info("UnzipCache", "Cache is full.. breaking i:%d, fUnzipBufferSize: %lld, locPos: %lld",
reqi, fUnzipBufferSize, locPos);
break;
}
if(fTmpBufferSz < fSeekSortLen[reqi]) {
delete [] fTmpBuffer;
fTmpBufferSz = fSeekSortLen[reqi]*2;
fTmpBuffer = new char[fTmpBufferSz];
}
fMutexList->Lock();
memcpy(fTmpBuffer, &fBuffer[fSeekPos[reqi]], fSeekSortLen[reqi]);
fMutexList->UnLock();
char *ptr = &fUnzipBuffer[locPos];
locLen = UnzipBuffer(&ptr, fTmpBuffer);
R__LOCKGUARD(fMutexList);
if (!IsActiveThread() || !fNseek || fIsLearning || fNewTransfer || !fIsTransferred)
return 0;
fPosWrite = locPos + locLen;
fUnzipPos[reqi] = locPos;
fUnzipLen[reqi] = locLen;
if (gDebug > 0)
Info("UnzipCache", "reqi:%d, fSeekPos[reqi]:%d, fSeekSortLen[reqi]: %d, fUnzipPos[reqi]:%d, fUnzipLen[reqi]:%d",
reqi, fSeekPos[reqi], fSeekSortLen[reqi], fUnzipPos[reqi], fUnzipLen[reqi]);
fUnzipEnd++;
fNUnzip++;
}
return 0;
}
Last change: Wed Jun 25 08:54:19 2008
Last generated: 2008-06-25 08:54
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.