// @(#)root/tree:$Id: TTreeCacheUnzip.cxx 23878 2008-05-16 14:35:09Z brun $
// Author: Leandro Franco   10/04/2008

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
// Parallel Unzipping                                                   //
//                                                                      //
// TTreeCache has been modified to create an additional thread to unzip //
// the buffers that are sitting on the cache, in that way we could      //
// improve the overall ROOT performance by executing two tasks in       //
// parallel.                                                            //
// The order is important and for the moment we will do it following    //
// the sorting algorithm in TTreeCache since they are usually read by   //
// entry.                                                               //
// By default the unzipping cache will need only 10% of the buffer for  //
// TTreCache so be careful with the memory (in a normal case you need   //
// 10MB for TTreeCache and 1MB for the unizp. buffer), to change it use //
// TTreeCache::SetUnzipBufferSize(Long64_t bufferSize)                  //
// where bufferSize must be passed in bytes.                            //
//                                                                      //
// This is a implemented in a similar way to a consumer-producer model  //
// where the the producer will be TTreCache by transfering the data     //
// and the consumer will be additional Thread trying to unzip it.       //
//////////////////////////////////////////////////////////////////////////

#include "TTreeCacheUnzip.h"
#include "TChain.h"
#include "TBranch.h"
#include "TFile.h"
#include "TEventList.h"
#include "TVirtualMutex.h"
#include "TThread.h"
#include "TCondition.h"
#include "TMath.h"
#include "Bytes.h"

extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);

 TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::fgParallel = TTreeCacheUnzip::kDisable;

// Unzip cache is 10% of TTreeCache.
// if by default fBufferSize = 10MB
// then 0.1=1MB 0.01=100KB 0.001=10KB 0.0001=1KB
Double_t TTreeCacheUnzip::fgRelBuffSize = 0.2;

ClassImp(TTreeCacheUnzip)

//______________________________________________________________________________
TTreeCacheUnzip::TTreeCacheUnzip() : TTreeCache(),
   fUnzipThread(0),
   fActiveThread(kFALSE),
   fNewTransfer(kFALSE),
   fTmpBufferSz(0),
   fTmpBuffer(0),
   fPosWrite(0),
   fUnzipLen(0),
   fUnzipPos(0),
   fNseekMax(0),
   fUnzipBufferSize(0),
   fUnzipBuffer(0),
   fSkipZip(0),
   fUnzipStart(0),
   fUnzipEnd(0),
   fUnzipNext(0),
   fNUnzip(0),
   fNFound(0),
   fNMissed(0)
{
   // Default Constructor.

   Init();
}

//______________________________________________________________________________
TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
   fUnzipThread(0),
   fActiveThread(kFALSE),
   fNewTransfer(kFALSE),
   fTmpBufferSz(10000),
   fTmpBuffer(new char[fTmpBufferSz]),
   fPosWrite(0),
   fUnzipLen(0),
   fUnzipPos(0),
   fNseekMax(0),
   fUnzipBufferSize(0),
   fUnzipBuffer(0),
   fSkipZip(0),
   fUnzipStart(0),
   fUnzipEnd(0),
   fUnzipNext(0),
   fNUnzip(0),
   fNFound(0),
   fNMissed(0)
{
   // Constructor.

   Init();
}

//______________________________________________________________________________
void TTreeCacheUnzip::Init()
{
   // Initialization procedure common to all the constructors

   fMutexCache       = new TMutex(kTRUE);
   fMutexBuffer      = new TMutex();
   fMutexList        = new TMutex();
   fUnzipCondition   = new TCondition();

   if (fgParallel == kDisable) {
      fParallel = kFALSE;
   }
   else if(fgParallel == kEnable || fgParallel == kForce) {
      SysInfo_t info;
      gSystem->GetSysInfo(&info);
      Int_t ncpus = info.fCpus;

      if(ncpus > 1 || fgParallel == kForce) {
         if(gDebug > 0)
            Info("TTreeCacheUnzip", "Enabling Parallel Unzipping, number of cpus:%d", ncpus);

         fParallel = kTRUE;
         StartThreadUnzip();
      }
      else {
         fParallel = kFALSE;
      }
   }
   else {
      Warning("TTreeCacheUnzip", "Parallel Option unknown");
   }
}

//______________________________________________________________________________
TTreeCacheUnzip::~TTreeCacheUnzip()
{
   // destructor. (in general called by the TFile destructor
   // destructor. (in general called by the TFile destructor)

   //ResetCache();

   if (IsActiveThread())
      StopThreadUnzip();

   delete fMutexCache;
   delete fMutexBuffer;
   delete fMutexList;

   delete [] fUnzipBuffer;
   delete [] fTmpBuffer;
   delete [] fUnzipLen;
   delete [] fUnzipPos;
   delete fUnzipCondition;
}

//_____________________________________________________________________________
void TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches /*= kFALSE*/)
{
   //add a branch to the list of branches to be stored in the cache
   //this function is called by TBranch::GetBasket
   R__LOCKGUARD(fMutexCache);

   TTreeCache::AddBranch(b, subbranches);
}

//_____________________________________________________________________________
void TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
{
   //add a branch to the list of branches to be stored in the cache
   //this function is called by TBranch::GetBasket
   R__LOCKGUARD(fMutexCache);

   TTreeCache::AddBranch(branch, subbranches);
}

//_____________________________________________________________________________
Bool_t TTreeCacheUnzip::FillBuffer()
{
   // Fill the cache buffer with the branches in the cache.
   R__LOCKGUARD(fMutexCache);

   //return TTreeCache::FillBuffer();

   if (fNbranches <= 0) return kFALSE;
   TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
   Long64_t entry = tree->GetReadEntry();

   if (!fIsManual && entry < fEntryNext) return kFALSE;

   // Triggered by the user, not the learning phase
   if (entry == -1)  entry=0;

   // Estimate number of entries that can fit in the cache compare it
   // to the original value of fBufferSize not to the real one
   if (fZipBytes==0) {
      fEntryNext = entry + tree->GetEntries();
   } else {
      fEntryNext = entry + tree->GetEntries()*fBufferSizeMin/fZipBytes;
   }
   if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
   if (fEntryNext > fEntryMax) fEntryNext = fEntryMax+1;

   //check if owner has a TEventList set. If yes we optimize for this special case
   //reading only the baskets containing entries in the list
   Long64_t chainOffset = 0;
   if (fOwner->GetEventList()) {
      if (fOwner->IsA() == TChain::Class()) {
         TChain *chain = (TChain*)fOwner;
         Int_t t = chain->GetTreeNumber();
         chainOffset = chain->GetTreeOffset()[t];
      }
   }

   fMutexCache->UnLock();
   //clear cache buffer
   ResetCache();
   TFileCacheRead::Prefetch(0,0);
   fMutexCache->Lock();
   //store baskets
   Bool_t mustBreak = kFALSE;
   for (Int_t i=0;i<fNbranches;i++) {
      if (mustBreak) break;
      TBranch *b = (TBranch*)fBranches->UncheckedAt(i);
      Int_t nb = b->GetMaxBaskets();
      Int_t *lbaskets   = b->GetBasketBytes();
      Long64_t *entries = b->GetBasketEntry();
      if (!lbaskets || !entries) continue;
      //we have found the branch. We now register all its baskets
      //from the requested offset to the basket below fEntrymax
      for (Int_t j=0;j<nb;j++) {
         // This basket has already been read, skip it
         if (b->GetListOfBaskets()->UncheckedAt(j)) continue;

         Long64_t pos = b->GetBasketSeek(j);
         Int_t len = lbaskets[j];
         if (pos <= 0 || len <= 0) continue;
         if (entries[j] > fEntryNext) continue;
         if (entries[j] < entry && (j<nb-1 && entries[j+1] < entry)) continue;
         if (fOwner->GetEventList()) {
            Long64_t emax = fEntryMax;
            if (j<nb-1) emax = entries[j+1]-1;
            if (!(fOwner->GetEventList())->ContainsRange(entries[j]+chainOffset,emax+chainOffset)) continue;
         }
         fNReadPref++;
         TFileCacheRead::Prefetch(pos,len);
         //we allow up to twice the default buffer size. When using eventlist in particular
         //it may happen that the evaluation of fEntryNext is bad, hence this protection
         if (fNtot > 2*fBufferSizeMin) {TFileCacheRead::Prefetch(0,0);mustBreak = kTRUE; break;}
      }
      if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n",entry,((TBranch*)fBranches->UncheckedAt(i))->GetName(),fEntryNext,fNseek,fNtot);
   }
   fIsLearning = kFALSE;
   if (mustBreak) return kFALSE;
   return kTRUE;
}

//_____________________________________________________________________________
void TTreeCacheUnzip::SetEntryRange(Long64_t emin, Long64_t emax)
{
   // Set the minimum and maximum entry number to be processed
   // this information helps to optimize the number of baskets to read
   // when prefetching the branch buffers.
   R__LOCKGUARD(fMutexCache);

   TTreeCache::SetEntryRange(emin, emax);
}

//_____________________________________________________________________________
void TTreeCacheUnzip::StopLearningPhase()
{
   // It's the same as TTreeCache::StopLearningPhase but we guarantee that
   // we start the unzipping just after getting the buffers

   TTreeCache::StopLearningPhase();
   this->SendSignal();
}

//_____________________________________________________________________________
void TTreeCacheUnzip::UpdateBranches(TTree *tree, Bool_t owner)
{
   //update pointer to current Tree and recompute pointers to the branches in the cache
   R__LOCKGUARD(fMutexCache);

   TTreeCache::UpdateBranches(tree, owner);
}

///////////////////////////////////////////////////////////////////////////////
//                                                                           //
// From now on we have the method concerning the threading part of the cache //
//                                                                           //
///////////////////////////////////////////////////////////////////////////////

//_____________________________________________________________________________
TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::GetParallelUnzip()
{
   // Static function that returns the parallel option
   // (to indicate an additional thread)

   return fgParallel;
}

//_____________________________________________________________________________
Bool_t TTreeCacheUnzip::IsParallelUnzip()
{
   // Static function that tells wether the multithreading unzipping
   // is activated

   if (fgParallel == kEnable || fgParallel == kForce)
      return kTRUE;

   return kFALSE;
}

//_____________________________________________________________________________
Bool_t TTreeCacheUnzip::IsActiveThread()
{
   // This indicates if the thread is active in this moment...
   // this variable is very important because if we change it from true to
   // false the thread will stop... ( see StopThreadTreeCacheUnzip() )

   return fActiveThread;
}

//_____________________________________________________________________________
Bool_t TTreeCacheUnzip::IsQueueEmpty()
{
   // It says if the queue is empty... useful to see if we have to process
   // it.
   R__LOCKGUARD(fMutexCache);

   if ( fIsLearning )
      return kTRUE;

   return kFALSE;
}

//_____________________________________________________________________________
Int_t TTreeCacheUnzip::ProcessQueue()
{
   // This will traverse the queue and read all the buffers to put them
   // in the cache... after reading each buffer it will send a signal and
   // if someone is waiting for it, it will wake up and will check the queue
   // again to see if the buffer it was waiting for was the buffer that was
   // just processed.
   // It works that way because I can not have one signal per-buffer...
   // instead there is only one signal and all the methods waiting for buffer
   // will know that a "new" buffer has been processed (even if that particular
   // method is not the one waiting for the buffer)

   if ( IsQueueEmpty() )
      return 0;

   if (gDebug > 0) Info("ProcessQueue", " Calling UnzipCache() ");

   return UnzipCache();
}

//_____________________________________________________________________________
void TTreeCacheUnzip::SendSignal()
{
   // This will send the signal corresponfing to the queue... normally used
   // when we want to start processing the list of buffers.

   if (gDebug > 0) Info("SendSignal", " fUnzipCondition->Signal()");

   fUnzipCondition->Signal();
}

//_____________________________________________________________________________
Int_t TTreeCacheUnzip::SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option)
{
   // Static function that(de)activates multithreading unzipping
   // The possible options are:
   // kEnable _Enable_ it, which causes an automatic detection and launches the
   // additional thread if the number of cores in the machine is greater than one
   // kDisable _Disable_ will not activate the additional thread.
   // kForce _Force_ will start the additional thread even if there is only one core.
   // the default will be taken as kEnable.
   // returns 0 if there was an error, 1 otherwise.

   if(fgParallel == kEnable || fgParallel == kForce || fgParallel == kDisable) {
      fgParallel = option;
      return 1;
   }
   return 0;
}

//_____________________________________________________________________________
Int_t TTreeCacheUnzip::StartThreadUnzip()
{
   // The Thread is only a part of the TTreeCache but it is the part that
   // waits for info in the queue and process it... unfortunatly, a Thread is
   // not an object an we have to deal with it in the old C-Style way
   // Returns 0 if the thread was initialized or 1 if it was already running

   if(!fUnzipThread) {
      fActiveThread=kTRUE;
      fUnzipThread= new TThread("UnzipLoop", UnzipLoop, (void*) this);
      fUnzipThread->Run();
      return 0;
   }
   return 1;
}

//_____________________________________________________________________________
Int_t TTreeCacheUnzip::StopThreadUnzip()
{
   // To stop the thread we only need to change the value of the variable
   // fActiveThread to false and the loop will stop (of course, we will have)
   // to do the cleaning after that.
   // Note: The syncronization part is important here or we will try to delete
   //       teh object while it's still processing the queue

   if(fUnzipThread){
      fActiveThread = kFALSE;
      SendSignal();
      if (fUnzipThread->Exists()) {
         fUnzipThread->Join();
      }
      return 0;
   }
   return 1;
}

//_____________________________________________________________________________
void TTreeCacheUnzip::WaitForSignal()
{
   // This is the counter part of SendSignal() and is used to wait for a buffer
   // that is in the queue and will be processed soon (instead of making a new
   // call)

   fUnzipCondition->Wait();
}

//_____________________________________________________________________________
void* TTreeCacheUnzip::UnzipLoop(void *arg)
{
   // This is a static function.
   // This is the call that will be executed in the Thread generated by
   // StartThreadTreeCacheUnzip... what we want to do is to inflate the next
   // series of buffers leaving them in the second cache.
   // Returns 0 when it finishes

   TTreeCacheUnzip *unzipMng = (TTreeCacheUnzip *)arg;
   TThread::SetCancelOn();
   TThread::SetCancelDeferred();

   while( unzipMng->IsActiveThread() ) {
      unzipMng->ProcessQueue();
      if(!unzipMng->IsActiveThread()) break;
      unzipMng->WaitForSignal();
   }
   return (void *)0;
}

///////////////////////////////////////////////////////////////////////////////
//                                                                           //
// From now on we have the method concerning the nuzipping part of the cache //
//                                                                           //
///////////////////////////////////////////////////////////////////////////////

//_____________________________________________________________________________
Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
{
   // Read the logical record header from the buffer buf.
   // That must be the pointer tho the header part not the object by itself and
   // must contain data of at least maxbytes
   // Returns nread;
   // In output arguments:
   //    nbytes : number of bytes in record
   //             if negative, this is a deleted record
   //             if 0, cannot read record, wrong value of argument first
   //    objlen : uncompressed object size
   //    keylen : length of logical record header
   // Note that the arguments objlen and keylen are returned only
   // if maxbytes >=16
   // Note: This was adapted from TFile... so some things dont apply

   Version_t versionkey;
   Short_t klen;
   UInt_t datime;
   Int_t nb = 0,olen;
   Int_t nread = maxbytes;
   frombuf(buf,&nb);
   nbytes = nb;
   if (nb < 0) return nread;
   //   const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
   const Int_t headerSize = 16;
   if (nread < headerSize) return nread;
   frombuf(buf, &versionkey);
   frombuf(buf, &olen);
   frombuf(buf, &datime);
   frombuf(buf, &klen);
   if (!olen) olen = nbytes-klen;
   objlen = olen;
   keylen = klen;
   return nread;
}

//_____________________________________________________________________________
void TTreeCacheUnzip::ResetCache()
{
   // This will delete the list of buffers that are in the unzipping cache
   // and will reset certain values in the cache.
   // This name is ambiguos because the method doesn't reset the whole cache,
   // only the part related to the unzipping
   // Note: This method is completely different from TTreeCache::ResetCache(),
   // in that method we were cleaning the prefetching buffer while here we
   // delete the information about the unzipping buffers
   R__LOCKGUARD(fMutexList);

   if (gDebug > 0)
      Info("GetUnzipBuffer", "Thread: %d -- Reseting the cache", TThread::SelfId());

   fUnzipStart  = fUnzipEnd = fUnzipNext = 0;
   fPosWrite    = 0;
   fNewTransfer = kTRUE;
}

//_____________________________________________________________________________
Int_t TTreeCacheUnzip::GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
{
   // We try to read a buffer that has already been unzipped
   // Returns -1 in case of read failure, 0 in case it's not in the
   // cache and n>0 in case read from cache (number of bytes copied).
   // pos and len are the original values as were passed to ReadBuffer
   // but instead we will return the inflated buffer.
   // Note!! : If *buf == 0 we will allocate the buffer and it will be the
   // responsability of the caller to free it... it is useful for example
   // to pass it to the creator of TBuffer

   if (fParallel){
      if ( fIsLearning ) {
         // We need to reset it for new transferences...
         ResetCache();
         TFileCacheRead::Prefetch(0,0);
      }
      // The modify the cache if it's in de middle of something (unzipping for example)
      R__LOCKGUARD(fMutexCache);

      // If this is the first time we get here since the last FillBuffer
      // it's probable that the information about the prefetched buffers is there
      // but it hasn't actually been transfered... Is this the best place to put it??
      if (fNseek > 0 && !fIsSorted) {
         Sort();

         // Then we use the vectored read to read everything now
         if (fFile->ReadBuffers(fBuffer,fPos,fLen,fNb)) return -1;
         fIsTransferred = kTRUE;

         // Try to start the unzipping thread since we just transfered the data
         this->SendSignal();
      }

      // be careful.. can be -1
      Int_t loc = (Int_t)TMath::BinarySearch(fNseek,fSeekSort,pos);

      if (loc >= 0 && (pos == fSeekSort[loc]) ) {
	 // We shouldn't have to "wait" so a long Lock is not be very bad for the performance
         fMutexList->Lock();

         if( (loc >= fUnzipStart) && (loc < fUnzipEnd )) {
            Long64_t locPos = fUnzipPos[loc]; // Gives the pos in the buffer
            Int_t    locLen = fUnzipLen[loc]; // Gives the size in the buffer

            fMutexBuffer->Lock();
            if(!(*buf)) {
               *buf = new char[locLen+1];
               *free = kTRUE;
            }
            memcpy(*buf,&fUnzipBuffer[locPos], locLen);

	    // We could pass the pointer to the buffer instead of allocating
            // the space and copying it. I tried that and the management needed
            // to keep track of the buffer is just too much, it's just cheaper to
            // do the copy.
	    //*buf = &fUnzipBuffer[locPos];
            //*free = kFALSE;
            fMutexBuffer->UnLock();
            fNFound++;

            // If this is the last buffer from the unzipping list,
            // send the signal to start unzipping the rest
            if((loc == fUnzipEnd - 1) && (loc == fUnzipNext - 1))
               this->SendSignal();

            // We found it and copied to the buffer... we are done
            fMutexList->UnLock();
            return locLen;
         }

         if (gDebug > 0)
            Info("GetUnzipBuffer", "NOT found in the unzip cahe fUnzipStart:%d, fUnzipEnd:%d, fUnzipNext: %d, pos:%lld, len:%d",
		 fUnzipStart, fUnzipEnd, fUnzipNext, pos, len);

         // If we are in a new batch of transfer... inform the thread in case it's sleeping
         if (loc > fUnzipNext)
            this->SendSignal();

         fMutexList->UnLock();
      }
   }

   char *comp = new char[len];
   Bool_t found = kFALSE;

   if (fNseek > 0 && !fIsSorted) {
      if (gDebug > 0)
         Info("GetUnzipBuffer", "This is a new transfer... must clean things up fNSeek:%d", fNseek);
      ResetCache();
   }

   if (TFileCacheRead::ReadBuffer(comp,pos,len) == 1){
      found = kTRUE;
   }
   if(!found) {
      //not found in cache. Do we need to fill the cache?
      Bool_t bufferFilled = FillBuffer();
      if (bufferFilled) {
         if (TFileCacheRead::ReadBuffer(comp,pos,len) == 1){
            found = kTRUE;
         }
      }
   }

   if (!found) {
      fFile->Seek(pos);
      if(fFile->ReadBuffer(comp, len)){
         Error("GetUnzipBuffer", "Thread: %d --  Error reading from TFile ... must go out", TThread::SelfId());
         delete [] comp;
         return -1;
      }
   }

   Int_t res = UnzipBuffer(buf, comp);
   *free = kTRUE;
   if (!fIsLearning) fNMissed++;

   if (comp) delete [] comp;
   return res;
}

//_____________________________________________________________________________
void TTreeCacheUnzip::SetUnzipBufferSize(Long64_t bufferSize)
{
   // Sets the size for the unzipping cache... by default it should be
   // two times the size of the prefetching cache

   fUnzipBufferSize = bufferSize;
}

//_____________________________________________________________________________
Int_t TTreeCacheUnzip::UnzipBuffer(char **dest, char *src)
{
   // UNzips a ROOT specific buffer... by reading the header at the beginning.
   // returns the size of the inflated buffer or -1 if error
   // Note!! : If *dest == 0 we will allocate the buffer and it will be the
   // responsability of the caller to free it... it is useful for example
   // to pass it to the creator of TBuffer
   // src is the original buffer with the record (header+compressed data)
   // *dest is the inflated buffer (including the header)
   Int_t  uzlen = 0;
   Bool_t alloc = kFALSE;

   // Here we read the header of the buffer
   const Int_t hlen=128;
   Int_t nbytes=0, objlen=0, keylen=0;
   GetRecordHeader(src, hlen, nbytes, objlen, keylen);

   if (gDebug > 1)
      Info("UnzipBuffer", "nbytes:%d, objlen:%d, keylen:%d  ", nbytes, objlen, keylen);

   if (!(*dest)) {
      *dest = new char[keylen+objlen];
      alloc = kTRUE;
   }
   // Must unzip the buffer
   // fSeekPos[ind]; adress of zipped buffer
   // fSeekLen[ind]; len of the zipped buffer
   // &fBuffer[fSeekPos[ind]]; memory address

   // This is similar to TBasket::ReadBasketBuffers
   Bool_t oldCase = objlen==nbytes-keylen
      && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel()!=0
      && fFile->GetVersion()<=30401;

   if (objlen > nbytes-keylen || oldCase) {
      // /**/ Question? ... do we have to ask this
      //      for every buffer or the global fSkipZip is enough?
      //      can somebody else set it up?

      if (fSkipZip) {
         // Copy without unzipping
         memcpy(*dest, src, keylen);
         uzlen += keylen;

         memcpy(*dest, src + keylen, objlen);
         uzlen += objlen;

         return nbytes;
      }

      // Copy the key
      if (gDebug > 2)
         Info("UnzipBuffer", "Copy the key keylen:%d from src:%p to *dest:%p", keylen, src, *dest);

      memcpy(*dest, src, keylen);
      uzlen += keylen;

      char *objbuf = *dest + keylen;
      UChar_t *bufcur = (UChar_t *) (src + keylen);
      Int_t nin, nout, nbuf;
      Int_t noutot = 0;

      while (1) {
         nin  = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16));
         nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16);

         if (gDebug > 2)
            Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
                 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);

         if (oldCase && (nin > objlen || nbuf > objlen)) {
            if (gDebug > 2)
               Info("UnzipBuffer", "oldcase objlen :%d ", objlen);

            //buffer was very likely not compressed in an old version
            memcpy( *dest + keylen, src + keylen, objlen);
            uzlen += objlen;
            return uzlen;
         }

         R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
         if (gDebug > 2)
            Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
                 nin, bufcur, nbuf, objbuf, nout);

         if (!nout) break;
         noutot += nout;
         if (noutot >= objlen) break;
         bufcur += nin;
         objbuf += nout;
      }

      if (noutot != objlen) {
         Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
               nbytes,keylen,objlen, noutot,nout,nin,nbuf);
         uzlen = -1;
         if(alloc) delete [] *dest;
         return uzlen;
      }
      uzlen += objlen;
   } else {
      memcpy(*dest, src, keylen);
      uzlen += keylen;
      memcpy(*dest + keylen, src + keylen, objlen);
      uzlen += objlen;
   }
   return uzlen;
}

//_____________________________________________________________________________
Int_t TTreeCacheUnzip::UnzipCache()
{
   // This inflates all the buffers in the cache.. passing the data to a new
   // buffer that will only wait there to be read...
   // We can not inflate all the buffers in the cache so we will try to do
   // it by parts... there is a member calle fUnzipBufferSize which will
   // tell us the size we can allocate for this cache so we will divide
   // the prefeteched cche in chunks of this size and we will try to unzip then
   // note that we will  unzip in the order they were put into the cache not
   // the order of the transference so it has to be read in that order or the
   // pre-unzipping will be useless.
   // pos and len are used to see where we have to start unzipping...
   // and for how long..
   // returns 0 in normal conditions or -1 if error

   if(!fIsTransferred) {
      if (gDebug > 0)
         Info("UnzipCache", "It is still in the learning phase");
      return 0;
   }

   // This is the first time we unzip the cache
   if (fUnzipBufferSize == 0) {
      // creating a new buffer
      SetUnzipBufferSize((Long64_t)(fgRelBuffSize*fBufferSize));

      fMutexBuffer->Lock();   //*** fMutexBuffer Lock
      fUnzipBuffer = new char[fUnzipBufferSize];
      fMutexBuffer->UnLock(); //*** fMutexBuffer Lock
      if (gDebug > 0)
         Info("UnzipCache", "Creating a buffer of %lld bytes ", fUnzipBufferSize);
   }
   if(fNseekMax < fNseek){
      if (gDebug > 0)
         Info("UnzipCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);

      fMutexList->Lock();  //*** fMutexList Lock
      Int_t *aUnzipPos = new Int_t[fNseek];
      Int_t *aUnzipLen = new Int_t[fNseek];

      for (Int_t i=0;i<fNseekMax;i++) {
         aUnzipPos[i] = fUnzipPos[i];
         aUnzipLen[i] = fUnzipLen[i];
      }

      if (fUnzipPos) delete [] fUnzipPos;
      if (fUnzipLen) delete [] fUnzipLen;

      fUnzipPos  = aUnzipPos;
      fUnzipLen  = aUnzipLen;
      fNseekMax  = fNseek;
      fMutexList->UnLock(); //*** fMutexList UnLock
   }
   fMutexList->Lock();  //*** fMutexList Lock
   Long64_t locPos = 0; // Local values for each buffer... change them together to ease sync.
   Int_t    locLen = 0;
   fNewTransfer    = kFALSE;  // abort unzipping if TFileCacheRead starts a new transfer

   // We will unzip all the buffers starting with fUnzipStart
   fUnzipStart = fUnzipEnd;

   Long64_t reqbuffer = 0;         // How much space do we need for this transfer list?
   Int_t    reqmax    = fUnzipEnd; // How many buffers?
   Int_t    unzipend  = fUnzipEnd; // starting fUnzipEnd since it will be changed later on
   for (Int_t reqi=unzipend;reqi<fNseek; reqi++) {
      // This for is inefficent since the onlz thing we want is to know how many
      // requests can fit our buffer and how big it's going to be... it migth be
      // better to just have a rough estimate but since that number is use in the first thread
      // I'm not sure about how to do it.

      const Int_t hlen=128;
      Int_t objlen=0, keylen=0;
      Int_t nbytes=0;
      GetRecordHeader(&fBuffer[fSeekPos[reqi]], hlen, nbytes, objlen, keylen);
      Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;

      // We need a protection here in case a buffer is bigger than
      // the whole unzipping cache... do it only at the first iteration
      // to guarantee that it indeed doesnt fit the cache...
      if ((reqi == unzipend) && (len > fUnzipBufferSize)) {
         if (gDebug > 0)
            Info("UnzipCache", "One buffer is too big resizing from:%d to len*2:%d", fUnzipBufferSize, len*2);

         fMutexBuffer->Lock();  //*** fMutexBuffer Lock
         char *newBuffer = new char[len*2];
         memcpy(newBuffer, fUnzipBuffer, fPosWrite);
         delete [] fUnzipBuffer;

         SetUnzipBufferSize((Long64_t)(len*2));
         fUnzipBuffer = newBuffer;
         fMutexBuffer->UnLock(); //*** fMutexBuffer UnLock
      }

      // if it is going to exceed the buffer size then stop
      if( (reqbuffer + len) > fUnzipBufferSize ){
         if (gDebug > 0)
            Info("UnzipCache", "Cache found the rigth size: %lld ... breaking ", reqbuffer );
         break;
      }
      reqmax++;
      reqbuffer += len;
   }
   fUnzipNext = reqmax;  // Our new goal

   if (gDebug > 0)
      Info("UnzipCache", "Cache found the rigth size fUnzipStart:%d, fUnzipEnd:%d, fUnzipNext: %d, fNseek: %d",
           fUnzipStart, fUnzipEnd, fUnzipNext, fNseek);

   fMutexList->UnLock();  //*** fMutexList UnLock

   for (Int_t reqi = unzipend; reqi<fNseek; reqi++) {
      if (!IsActiveThread() || !fNseek || fIsLearning || fNewTransfer || !fIsTransferred) {
         if (gDebug > 0)
            Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d, fNewTransfer:%d",
		 IsActiveThread(), fNseek, fIsLearning, fNewTransfer);
         return 0;
      }

      // This must have this lock because UnzipBuffer can access GetRecordHeader also
      const Int_t hlen=128;
      Int_t objlen=0, keylen=0;
      Int_t nbytes=0;
      GetRecordHeader(&fBuffer[fSeekPos[reqi]], hlen, nbytes, objlen, keylen);
      Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;

      // Don't do it for the first buffer since we should had done in the last cycle
      if (reqi > unzipend) {
         fMutexList->Lock();   //*** fMutexList Lock
         locPos = fUnzipPos[reqi-1] + fUnzipLen[reqi-1];
         locLen = 0; // just in case
         fMutexList->UnLock(); //*** fMutexList UnLock
      }

      // if it is going to exceed the buffer size then stop
      // this is not needed since now we calculate how many buffers fit the cache
      // keep it here just in case
      if( (locPos + len) > fUnzipBufferSize ){
         if (gDebug > 0)
            Info("UnzipCache", "Cache is full.. breaking i:%d, fUnzipBufferSize: %lld, locPos: %lld",
                 reqi, fUnzipBufferSize, locPos);
         break;
      }

      // Initially we had fBuffer with all the buffers to be unzipped... we took
      // one of then, we unzipped it and we put it in fUnzipBuffer. But the first
      // thread can start a transference at the same moment changing the content of fBuffer,
      // putting a lock on it is inneficient because we have to wait for the unzip to
      // start the transfer and I have seen that is better to just copy the data to a
      // "local variable"... but doing _new_ here wouldn't be very good also so
      // I use a preallocated buffer.
      if(fTmpBufferSz < fSeekSortLen[reqi]) {
         delete [] fTmpBuffer;
         fTmpBufferSz = fSeekSortLen[reqi]*2;
         fTmpBuffer = new char[fTmpBufferSz];
      }

      fMutexList->Lock();   // fMutexList Lock
      memcpy(fTmpBuffer, &fBuffer[fSeekPos[reqi]], fSeekSortLen[reqi]);
      fMutexList->UnLock(); // fMutexList UnLock

      char *ptr = &fUnzipBuffer[locPos];
      locLen = UnzipBuffer(&ptr, fTmpBuffer);

      R__LOCKGUARD(fMutexList); // fMutexList LOCK until going out of scope
      if (!IsActiveThread() || !fNseek || fIsLearning || fNewTransfer || !fIsTransferred)
         return 0;

      fPosWrite       = locPos + locLen;  // keep a pointer to the end of the buffer
      fUnzipPos[reqi] = locPos;
      fUnzipLen[reqi] = locLen;

      if (gDebug > 0)
         Info("UnzipCache", "reqi:%d, fSeekPos[reqi]:%d, fSeekSortLen[reqi]: %d, fUnzipPos[reqi]:%d, fUnzipLen[reqi]:%d",
	      reqi, fSeekPos[reqi], fSeekSortLen[reqi], fUnzipPos[reqi], fUnzipLen[reqi]);

      fUnzipEnd++;
      fNUnzip++;
   }
   return 0;
}


Last change: Wed Jun 25 08:54:19 2008
Last generated: 2008-06-25 08:54

This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.