#include "RConfigure.h"
#include "RConfig.h"
#include "Riostream.h"
#ifdef WIN32
   #include <process.h>
   #include <io.h>
   typedef long off_t;
#endif
#include <errno.h>
#include <time.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#ifndef WIN32
#include <sys/wait.h>
#endif
#include <cstdlib>
#if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
    (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
     (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
#include <sys/file.h>
#define lockf(fd, op, sz)   flock((fd), (op))
#ifndef F_LOCK
#define F_LOCK             (LOCK_EX | LOCK_NB)
#endif
#ifndef F_ULOCK
#define F_ULOCK             LOCK_UN
#endif
#endif
#include "TProofServ.h"
#include "TDSetProxy.h"
#include "TEnv.h"
#include "TError.h"
#include "TEventList.h"
#include "TEntryList.h"
#include "TException.h"
#include "TFile.h"
#include "THashList.h"
#include "TInterpreter.h"
#include "TKey.h"
#include "TMessage.h"
#include "TVirtualPerfStats.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TVirtualProofPlayer.h"
#include "TProofQueryResult.h"
#include "TQueryResultManager.h"
#include "TRegexp.h"
#include "TROOT.h"
#include "TSocket.h"
#include "TStopwatch.h"
#include "TSystem.h"
#include "TTimeStamp.h"
#include "TUrl.h"
#include "TPluginManager.h"
#include "TObjString.h"
#include "compiledata.h"
#include "TProofResourcesStatic.h"
#include "TProofNodeInfo.h"
#include "TFileInfo.h"
#include "TMutex.h"
#include "TClass.h"
#include "TSQLServer.h"
#include "TSQLResult.h"
#include "TSQLRow.h"
#include "TPRegexp.h"
#include "TParameter.h"
#include "TMap.h"
#include "TSortedList.h"
#include "TParameter.h"
#include "TFileCollection.h"
#include "TLockFile.h"
#include "TProofDataSetManagerFile.h"
#include "TProofProgressStatus.h"
// Global pointer to the proof server object; assigned to 'this' in the
// TProofServ constructor.
TProofServ *gProofServ = 0;
// Flag polled by the busy-wait loops that are enabled through the
// 'Proof.GdbHook' setting; the loops spin for as long as it is non-zero.
// NOTE(review): presumably meant to be cleared from an attached debugger,
// hence 'volatile' -- confirm.
static volatile Int_t gProofServDebug = 1;
// Static members of TProofServ, zero-initialised here.
FILE *TProofServ::fgErrorHandlerFile = 0;
// Nesting depth of TProofServ::HandleSocketInput() calls (incremented on
// entry, decremented on exit).
Int_t TProofServ::fgRecursive = 0;
// Signal handler registered for kSigTermination: forwards the event to the
// owning TProofServ.
class TProofServTerminationHandler : public TSignalHandler {
private:
   TProofServ  *fServ;   // server to be notified

public:
   TProofServTerminationHandler(TProofServ *s)
      : TSignalHandler(kSigTermination, kFALSE), fServ(s) { }
   Bool_t  Notify();
};

Bool_t TProofServTerminationHandler::Notify()
{
   // Announce the termination request and let the server handle it.
   // Always returns kTRUE.

   Printf("Received SIGTERM: terminating");
   fServ->HandleTermination();
   return kTRUE;
}
// Signal handler registered for kSigUrgent: forwards the event to the
// owning TProofServ.
class TProofServInterruptHandler : public TSignalHandler {
private:
   TProofServ  *fServ;   // server to be notified

public:
   TProofServInterruptHandler(TProofServ *s)
      : TSignalHandler(kSigUrgent, kFALSE), fServ(s) { }
   Bool_t  Notify();
};

Bool_t TProofServInterruptHandler::Notify()
{
   // Let the server process the urgent data first; then, only if ROOT is
   // initialized, call Throw() with the signal number.
   // Always returns kTRUE.

   fServ->HandleUrgentData();
   if (!TROOT::Initialized())
      return kTRUE;
   Throw(GetSignal());
   return kTRUE;
}
// Signal handler registered for kSigPipe: forwards the event to the owning
// TProofServ.
class TProofServSigPipeHandler : public TSignalHandler {
private:
   TProofServ  *fServ;   // server to be notified

public:
   TProofServSigPipeHandler(TProofServ *s)
      : TSignalHandler(kSigPipe, kFALSE), fServ(s) { }
   Bool_t  Notify();
};

Bool_t TProofServSigPipeHandler::Notify()
{
   // Delegate the handling of the signal to the server.
   // Always returns kTRUE.

   fServ->HandleSigPipe();
   return kTRUE;
}
// File handler attached to the descriptor of the input socket: any read
// notification triggers TProofServ::HandleSocketInput().
class TProofServInputHandler : public TFileHandler {
private:
   TProofServ  *fServ;   // server to be notified

public:
   TProofServInputHandler(TProofServ *s, Int_t fd)
      : TFileHandler(fd, 1), fServ(s) { }
   Bool_t Notify();
   Bool_t ReadNotify() { return Notify(); }
};

Bool_t TProofServInputHandler::Notify()
{
   // Ask the server to process what is pending on the input socket.
   // Always returns kTRUE.

   fServ->HandleSocketInput();
   return kTRUE;
}
// Default prefix for forwarded log lines; Notify() falls back to it when the
// per-instance prefix (fPfx) is empty. Changed via SetDefaultPrefix().
TString TProofServLogHandler::fgPfx = ""; 
TProofServLogHandler::TProofServLogHandler(const char *cmd,
                                             TSocket *s, const char *pfx)
                     : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
{
   // Run 'cmd' in a pipe opened for reading and forward its output to
   // socket 's', each line prefixed by 'pfx' (see Notify()).
   // On failure fFile is left null; when the pipe cannot be opened fSocket
   // is reset as well.

   ResetBit(kFileIsPipe);
   fFile = 0;

   // Both the command and the socket are mandatory
   if (!s || !cmd) {
      Error("TProofServLogHandler",
            "undefined command (%p) or socket (%p)", (int *)cmd, s);
      return;
   }

   fFile = gSystem->OpenPipe(cmd, "r");
   if (!fFile) {
      fSocket = 0;
      Error("TProofServLogHandler", "executing command in pipe");
      return;
   }

   // Attach to the pipe descriptor and flush what is already available;
   // the pipe bit is raised only afterwards
   SetFd(fileno(fFile));
   Notify();
   SetBit(kFileIsPipe);
}
TProofServLogHandler::TProofServLogHandler(FILE *f, TSocket *s, const char *pfx)
                     : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
{
   // Forward to socket 's' what can be read from the already open stream
   // 'f', each line prefixed by 'pfx' (see Notify()). The stream is not
   // flagged as a pipe, so the destructor does not close it.

   ResetBit(kFileIsPipe);
   fFile = 0;

   // Both the stream and the socket are mandatory
   if (!s || !f) {
      Error("TProofServLogHandler", "undefined file (%p) or socket (%p)", f, s);
      return;
   }

   // Attach to the stream descriptor and flush what is already available
   fFile = f;
   SetFd(fileno(fFile));
   Notify();
}
TProofServLogHandler::~TProofServLogHandler()
{
   // Close the stream only if it is a pipe opened by this handler; a stream
   // passed in by the caller is left untouched. All references are reset.

   if (fFile && TestBit(kFileIsPipe))
      gSystem->ClosePipe(fFile);

   fSocket = 0;
   fFile = 0;
   ResetBit(kFileIsPipe);
}
Bool_t TProofServLogHandler::Notify()
{
   // Read the stream line by line and send every line to the socket as a
   // kPROOF_MESSAGE. The instance prefix is used if set, otherwise the
   // default one; with no prefix at all the line is sent as it is.
   // Nothing is done if the handler is not valid. Always returns kTRUE.

   if (!IsValid())
      return kTRUE;

   TMessage m(kPROOF_MESSAGE);
   char buf[4096];
   while (fgets(buf, sizeof(buf), fFile)) {
      // Strip the line terminator, if any
      char *eol = strchr(buf, '\n');
      if (eol)
         *eol = 0;

      // Instance prefix takes precedence over the default one
      const TString &pfx = (fPfx.Length() > 0) ? fPfx : fgPfx;
      TString log;
      if (pfx.Length() > 0)
         log.Form("%s: %s", pfx.Data(), buf);
      else
         log = buf;

      // Ship the line
      m.Reset(kPROOF_MESSAGE);
      m << log;
      fSocket->Send(m);
   }
   return kTRUE;
}
void TProofServLogHandler::SetDefaultPrefix(const char *pfx)
{
   // Static method to set the default prefix (fgPfx), i.e. the one used by
   // Notify() for handlers created with an empty per-instance prefix.
   fgPfx = pfx;
}
TProofServLogHandlerGuard::TProofServLogHandlerGuard(const char *cmd, TSocket *s,
                                                     const char *pfx, Bool_t on)
{
   // Create a log handler running 'cmd' and, if it is valid, register it
   // with the system for the lifetime of the guard.
   // With 'on' false the guard is inert and no handler is created.

   fExecHandler = 0;
   if (!on)
      return;

   if (!cmd) {
      Error("TProofServLogHandlerGuard","undefined command");
      return;
   }

   fExecHandler = new TProofServLogHandler(cmd, s, pfx);
   if (fExecHandler->IsValid())
      gSystem->AddFileHandler(fExecHandler);
   else
      Error("TProofServLogHandlerGuard","invalid handler");
}
TProofServLogHandlerGuard::TProofServLogHandlerGuard(FILE *f, TSocket *s,
                                                     const char *pfx, Bool_t on)
{
   // Create a log handler reading from the open stream 'f' and, if it is
   // valid, register it with the system for the lifetime of the guard.
   // With 'on' false the guard is inert and no handler is created.

   fExecHandler = 0;
   if (!on)
      return;

   if (!f) {
      Error("TProofServLogHandlerGuard","undefined file");
      return;
   }

   fExecHandler = new TProofServLogHandler(f, s, pfx);
   if (fExecHandler->IsValid())
      gSystem->AddFileHandler(fExecHandler);
   else
      Error("TProofServLogHandlerGuard","invalid handler");
}
TProofServLogHandlerGuard::~TProofServLogHandlerGuard()
{
   // Unregister and destroy the handler created by the constructor.
   // The handler is released whether or not it is (still) valid: the
   // constructors keep the object even when it turned out to be invalid, so
   // deleting it only when valid would leak it. The removal is attempted
   // unconditionally so that the system never keeps a pointer to the deleted
   // object; the constructors only register valid handlers, so for one that
   // was never registered the request has nothing to remove.

   if (fExecHandler) {
      gSystem->RemoveFileHandler(fExecHandler);
      SafeDelete(fExecHandler);
   }
}
Bool_t TShutdownTimer::Notify()
{
   // Check the time elapsed since the input socket of the server was last
   // used; if it exceeds the configured timeout (in minutes, default 20) the
   // process is aborted. Otherwise the timer is restarted.
   // Always returns kTRUE.

   if (gDebug > 0)
      Info ("Notify","checking activity on the input socket");

   // Check activity on the socket
   TSocket *xs = 0;
   if (fProofServ && (xs = fProofServ->GetSocket())) {
      TTimeStamp now;
      TTimeStamp ts = xs->GetLastUsage();
      // Elapsed time in milliseconds
      Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
                  (Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
      // Timeout in minutes. The correctly spelled key takes precedence; the
      // historical misspelled one is still honoured for existing setups.
      Int_t to = gEnv->GetValue("ProofServ.ShutdownTimeout",
                                gEnv->GetValue("ProofServ.ShutdonwTimeout", 20));
      // Compare in Long_t to avoid overflowing Int_t for large timeouts
      if (dt > (Long_t)to * 60000) {
         Printf("TShutdownTimer::Notify: input socket: %p: did not show any activity"
                         " during the last %d mins: aborting", xs, to);
         // At this point we lost our controller: we need to abort to avoid
         // hanging forever
         gSystem->Abort();
      } else {
         // dt is in milliseconds: convert to the seconds announced in the text
         if (gDebug > 0)
            Info("Notify", "input socket: %p: show activity"
                           " %ld secs ago", xs, dt / 1000);
      }
   }
   // Restart the timer
   Start(-1, kFALSE);
   return kTRUE;
}
TReaperTimer::~TReaperTimer()
{
   // Release the list of children together with the entries it holds.

   if (!fChildren)
      return;

   // Make the list own its entries so that they are freed with it
   fChildren->SetOwner(kTRUE);
   delete fChildren;
   fChildren = 0;
}
void TReaperTimer::AddPid(Int_t pid)
{
   // Register the child process 'pid' for reaping and switch the timer on.
   // Non-positive values are ignored.

   if (pid <= 0)
      return;

   // The list is created on first use
   if (!fChildren)
      fChildren = new TList;

   // Entries are named after the decimal representation of the pid
   TString spid;
   spid.Form("%d", pid);
   fChildren->Add(new TParameter<Int_t>(spid.Data(), pid));

   TurnOn();
}
Bool_t TReaperTimer::Notify()
{
   // Check whether any of the registered children has terminated and drop
   // the reaped ones from the list. The timer is stopped when nothing is
   // left to wait for (including the case in which no child was ever
   // registered) and re-armed otherwise. Always returns kTRUE.

   if (fChildren) {
      TIter nxp(fChildren);
      TParameter<Int_t> *p = 0;
      while ((p = (TParameter<Int_t> *)nxp())) {
         int status;
#ifndef WIN32
         // Non-blocking check, retried if interrupted by a signal
         pid_t pid;
         do {
            pid = waitpid(p->GetVal(), &status, WNOHANG);
         } while (pid < 0 && errno == EINTR);
#else
         intptr_t pid;
         pid = _cwait(&status, (intptr_t)p->GetVal(), 0);
#endif
         if (pid > 0 && pid == p->GetVal()) {
            // Child has been reaped: remove it from the list
            fChildren->Remove(p);
            delete p;
         }
      }
   }

   // The list is created lazily by AddPid(), so it may still be undefined
   if (!fChildren || fChildren->GetSize() <= 0) {
      // Nothing (left) to reap
      Stop();
   } else {
      // Needed for the next shot
      Reset();
   }
   return kTRUE;
}
ClassImp(TProofServ)

// Factory with C linkage: creates a TProofServ from the given arguments and
// returns it through its TApplication base. The caller owns the object.
extern "C" TApplication *GetTProofServ(Int_t *argc, char **argv, FILE *flog)
{
   return new TProofServ(argc, argv, flog);
}
TProofServ::TProofServ(Int_t *argc, char **argv, FILE *flog)
       : TApplication("proofserv", argc, argv, 0, -1)
{
   // Constructor: reads the session configuration, initialises all data
   // members to their defaults and parses the command line via GetOptions().
   // 'flog' is stored as the log file stream (may be null; CreateServer()
   // then redirects the output itself). The process exits if fewer than two
   // arguments are given. The connection to the client is set up later, in
   // CreateServer().

   // Read session specific rootrc file: the name is taken from the
   // environment variable ROOTRCFILE, defaulting to "session.rootrc"; it is
   // loaded only if readable
   TString rcfile = gSystem->Getenv("ROOTRCFILE") ? gSystem->Getenv("ROOTRCFILE")
                                                  : "session.rootrc";
   if (!gSystem->AccessPathName(rcfile, kReadPermission))
      gEnv->ReadFile(rcfile, kEnvChange);
   // Virtual memory limits (-1 means not set); the values come from the
   // environment and are multiplied by 1024
   // NOTE(review): presumably given in kB and stored in bytes -- confirm
   fVirtMemHWM = -1;
   fVirtMemMax = -1;
   if (gSystem->Getenv("ROOTPROOFASSOFT")) {
      Long_t hwm = strtol(gSystem->Getenv("ROOTPROOFASSOFT"), 0, 10);
      if (hwm < kMaxLong && hwm > 0)
         fVirtMemHWM = hwm * 1024;
   }
   if (gSystem->Getenv("ROOTPROOFASHARD")) {
      Long_t mmx = strtol(gSystem->Getenv("ROOTPROOFASHARD"), 0, 10);
      if (mmx < kMaxLong && mmx > 0)
         fVirtMemMax = mmx * 1024;
   }
   // Debug hook: spin until gProofServDebug is cleared when Proof.GdbHook is
   // 3 (normal mode) or 4 (test mode, i.e. argv[3] == "test")
   Bool_t test = (*argc >= 4 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
   if ((gEnv->GetValue("Proof.GdbHook",0) == 3 && !test) ||
       (gEnv->GetValue("Proof.GdbHook",0) == 4 && test)) {
      while (gProofServDebug)
         ;
   }
   // Test instance
   // NOTE(review): GetOptions(), called below, assigns argv[1] to fService,
   // so this value does not appear to survive -- confirm intent
   if (*argc >= 4)
      if (!strcmp(argv[3], "test"))
         fService = "prooftest";
   // Crude check on the number of arguments
   if (*argc < 2) {
      Error("TProofServ", "Must have at least 1 arguments (see  proofd).");
      exit(1);
   }
   // Make this instance globally reachable
   gProofServ = this;
   // Logging options
   fLogToSysLog     = (gEnv->GetValue("ProofServ.LogToSysLog", 0) != 0) ? kTRUE : kFALSE;
   fSendLogToMaster = kFALSE;
   // Abort level set one above kSysError; errors are routed to stderr through
   // the TProofServ error handler
   gErrorAbortLevel = kSysError + 1;
   SetErrorHandlerFile(stderr);
   SetErrorHandler(ErrorHandler);
   // Default values of the data members
   fNcmd            = 0;
   fGroupPriority   = 100;
   fInterrupt       = kFALSE;
   fProtocol        = 0;
   fOrdinal         = gEnv->GetValue("ProofServ.Ordinal", "-1");
   fGroupId         = -1;
   fGroupSize       = 0;
   fRealTime        = 0.0;
   fCpuTime         = 0.0;
   fProof           = 0;
   fPlayer          = 0;
   fSocket          = 0;
   fEnabledPackages = new TList;
   fEnabledPackages->SetOwner();
   fGlobalPackageDirList = 0;
   fLogFile         = flog;
   fLogFileDes      = -1;
   fArchivePath     = "";
   // Locks, query manager and message queues
   fPackageLock     = 0;
   fCacheLock       = 0;
   fQueryLock       = 0;
   fQMgr            = 0;
   fWaitingQueries  = new TList;
   fIdle            = kTRUE;
   fQueuedMsg       = new TList;
   fRealTimeLog     = kFALSE;
   fShutdownTimer   = 0;
   fReaperTimer     = 0;
   fInflateFactor   = 1000;
   fDataSetManager  = 0; 
   fInputHandler    = 0;
   // Quotas (-1 means not set)
   fMaxQueries      = -1;
   fMaxBoxSize      = -1;
   fHWMBoxSize      = -1;
   // Debug level and mask from the configuration
   gProofDebugLevel = gEnv->GetValue("Proof.DebugLevel",0);
   fLogLevel = gProofDebugLevel;
   gProofDebugMask = (TProofDebug::EProofDebugMask) gEnv->GetValue("Proof.DebugMask",~0);
   if (gProofDebugLevel > 0)
      Info("TProofServ", "DebugLevel %d Mask 0x%x", gProofDebugLevel, gProofDebugMask);
   // Parse options (sets the master/worker role, the service name and the
   // configuration directory)
   GetOptions(argc, argv);
   // Default prefix for forwarded log lines: "Mst-" or "Wrk-" followed by
   // the ordinal, when one is defined
   fPrefix = (IsMaster() ? "Mst-" : "Wrk-");
   if (fOrdinal != "-1")
      fPrefix += fOrdinal;
   TProofServLogHandler::SetDefaultPrefix(fPrefix);
}
Int_t TProofServ::CreateServer()
{
   // Finalize the server setup: attach to the already open socket, run
   // Setup(), make sure a log file is available, load the startup macros,
   // install signal and input handlers and, on a master, start the TProof
   // session through the plugin manager.
   // Returns 0 on success and -1 on failure; on most failure paths the log
   // file is sent back and Terminate(0) is called before returning.

   // Get the descriptor of the open socket from the environment variable
   // ROOTOPENSOCK or, failing that, from the setting ProofServ.OpenSock
   TString opensock = gSystem->Getenv("ROOTOPENSOCK");
   if (opensock.Length() <= 0)
      opensock = gEnv->GetValue("ProofServ.OpenSock", "-1");
   Int_t sock = opensock.Atoi();
   if (sock <= 0) {
      Fatal("CreateServer", "Invalid socket descriptor number (%d)", sock);
      return -1;
   }
   fSocket = new TSocket(sock);
   // Debug hooks: spin until gProofServDebug is cleared
   if (IsMaster()) {
      // wait (loop) in master to allow debugger to connect
      if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
         while (gProofServDebug)
            ;
      }
   } else {
      // wait (loop) in worker to allow debugger to connect
      if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
         while (gProofServDebug)
            ;
      }
   }
   if (gProofDebugLevel > 0)
      Info("CreateServer", "Service %s ConfDir %s IsMaster %d\n",
           GetService(), GetConfDir(), (Int_t)fMasterServ);
   if (Setup() != 0) {
      // Setup failure: notify and terminate
      LogToMaster();
      SendLogFile();
      Terminate(0);
      return -1;
   }
   // Set the default prefix again, now built from GetOrdinal()
   // NOTE(review): presumably because the ordinal may be updated by Setup()
   // -- confirm
   TString pfx = (IsMaster() ? "Mst-" : "Wrk-");
   pfx += GetOrdinal();
   TProofServLogHandler::SetDefaultPrefix(pfx);
   if (!fLogFile) {
      RedirectOutput();
      // If still no log file or no valid descriptor for it, send back the
      // log with status -98 and terminate
      if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
         LogToMaster();
         SendLogFile(-98);
         Terminate(0);
         return -1;
      }
   } else {
      // Log file provided at construction: just get its descriptor
      if ((fLogFileDes = fileno(fLogFile)) < 0) {
         LogToMaster();
         SendLogFile(-98);
         Terminate(0);
         return -1;
      }
   }
   // Message of the day (master only); CatMotd() returning -1 stops the
   // startup with status -99
   if (IsMaster()) {
      if (CatMotd() == -1) {
         LogToMaster();
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }
   }
   // Everybody expects iostream to be available, so load it...
   ProcessLine("#include <iostream>", kTRUE);
   ProcessLine("#include <_string>",kTRUE); 
   // Header providing the basic types to the interpreter
   ProcessLine("#include <RtypesCint.h>", kTRUE);
   // Pre-define the include guards of a few headers
   // NOTE(review): presumably to keep the interpreter from parsing them
   // -- confirm
   ProcessLine("#define ROOT_Rtypes 0", kTRUE);
   ProcessLine("#define ROOT_TError 0", kTRUE);
   ProcessLine("#define ROOT_TGenericClassInfo 0", kTRUE);
   // The macro named by Proof.Load is loaded with ".L", provided it is found
   // readable in the macro path
   // NOTE(review): the command uses the configured name, not the resolved
   // path 'mac' -- confirm this is intended
   const char *logon;
   logon = gEnv->GetValue("Proof.Load", (char *)0);
   if (logon) {
      char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
      if (mac)
         ProcessLine(Form(".L %s", logon), kTRUE);
      delete [] mac;
   }
   // The macro named by Proof.Logon is executed under the same condition,
   // unless NoLogOpt() is set
   logon = gEnv->GetValue("Proof.Logon", (char *)0);
   if (logon && !NoLogOpt()) {
      char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
      if (mac)
         ProcessFile(logon);
      delete [] mac;
   }
   // Save current interpreter context
   gInterpreter->SaveContext();
   gInterpreter->SaveGlobalsContext();
   // Install the termination and urgent-data signal handlers and the handler
   // watching the input socket
   gSystem->AddSignalHandler(new TProofServTerminationHandler(this));
   gSystem->AddSignalHandler(new TProofServInterruptHandler(this));
   fInputHandler = new TProofServInputHandler(this, sock);
   gSystem->AddFileHandler(fInputHandler);
   // On a master, start the TProof session
   if (IsMaster()) {
      TString master = "proof://__master__";
      TInetAddress a = gSystem->GetSockName(sock);
      if (a.IsValid()) {
         master += ":";
         master += a.GetPort();
      }
      // Get plugin manager to load appropriate TProof from
      TPluginManager *pm = gROOT->GetPluginManager();
      if (!pm) {
         Error("CreateServer", "no plugin manager found");
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }
      // Find the appropriate handler for the configuration file
      TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
      if (!h) {
         Error("CreateServer", "no plugin found for TProof with a"
                             " config file of '%s'", fConfFile.Data());
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }
      // Load the plugin
      if (h->LoadPlugin() == -1) {
         Error("CreateServer", "plugin for TProof could not be loaded");
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }
      // Make the instance
      fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
                                                          fConfFile.Data(),
                                                          GetConfDir(),
                                                          fLogLevel, 0));
      if (!fProof || !fProof->IsValid()) {
         Error("CreateServer", "plugin for TProof could not be executed");
         SafeDelete(fProof);
         SendLogFile(-99);
         Terminate(0);
         return -1;
      }
      // Record whether this master is an end master and send back the log
      fEndMaster = fProof->IsEndMaster();
      SendLogFile();
   }
   // Start the shutdown timer, which checks the activity on the input socket
   if (!fShutdownTimer) {
      // Period of 300000 passed to the timer
      // NOTE(review): presumably milliseconds, i.e. 5 minutes -- confirm
      fShutdownTimer = new TShutdownTimer(this, 300000);
      fShutdownTimer->Start(-1, kFALSE);
   }
   // Done
   return 0;
}
TProofServ::~TProofServ()
{
   // Cleanup: release the lists, the socket and the locks owned by the
   // server and close the descriptor of the log file.

   SafeDelete(fWaitingQueries);
   SafeDelete(fEnabledPackages);
   SafeDelete(fSocket);
   SafeDelete(fPackageLock);
   SafeDelete(fCacheLock);
   SafeDelete(fQueryLock);
   SafeDelete(fGlobalPackageDirList);

   // The descriptor is still -1 if CreateServer() never got to the log file:
   // do not hand an invalid descriptor to close()
   if (fLogFileDes >= 0)
      close(fLogFileDes);
}
Int_t TProofServ::CatMotd()
{
   
   
   
   
   
   TString lastname;
   FILE   *motd;
   Bool_t  show = kFALSE;
   
   TString motdname(GetConfDir());
   
   
   if (gSystem->Getenv("PROOFNOPROOF")) {
      motdname = gSystem->Getenv("PROOFNOPROOF");
   } else {
      motdname += "/etc/proof/noproof";
   }
   if ((motd = fopen(motdname, "r"))) {
      Int_t c;
      printf("\n");
      while ((c = getc(motd)) != EOF)
         putchar(c);
      fclose(motd);
      printf("\n");
      return -1;
   }
   
   lastname = TString(GetWorkDir()) + "/.prooflast";
   char *last = gSystem->ExpandPathName(lastname.Data());
   Long64_t size;
   Long_t id, flags, modtime, lasttime;
   if (gSystem->GetPathInfo(last, &id, &size, &flags, &lasttime) == 1)
      lasttime = 0;
   
   if (time(0) - lasttime > (time_t)86400)
      show = kTRUE;
   
   
   if (gSystem->Getenv("PROOFMOTD")) {
      motdname = gSystem->Getenv("PROOFMOTD");
   } else {
      motdname = GetConfDir();
      motdname += "/etc/proof/motd";
   }
   if (gSystem->GetPathInfo(motdname, &id, &size, &flags, &modtime) == 0) {
      if (modtime > lasttime || show) {
         if ((motd = fopen(motdname, "r"))) {
            Int_t c;
            printf("\n");
            while ((c = getc(motd)) != EOF)
               putchar(c);
            fclose(motd);
            printf("\n");
         }
      }
   }
   if (lasttime)
      gSystem->Unlink(last);
   Int_t fd = creat(last, 0600);
   close(fd);
   delete [] last;
   return 0;
}
TObject *TProofServ::Get(const char *namecycle)
{
   // Request the object identified by 'namecycle' from the peer on the
   // socket (kPROOF_GETOBJECT) and wait for the answer. Messages of other
   // types arriving in the meantime are passed to HandleSocketInput() in
   // restricted mode. Returns the object read from the kMESS_OBJECT answer,
   // or 0 if the connection fails or is closed.

   fSocket->Send(namecycle, kPROOF_GETOBJECT);

   TObject *idcur = 0;
   Bool_t notdone = kTRUE;
   while (notdone) {
      TMessage *mess = 0;
      // A return value of 0 means that the other side closed the connection;
      // no message is available in that case, so it must be treated like an
      // error instead of being dereferenced
      if (fSocket->Recv(mess) <= 0 || !mess)
         return 0;
      Int_t what = mess->What();
      if (what == kMESS_OBJECT) {
         idcur = mess->ReadObject(mess->GetClass());
         notdone = kFALSE;
      } else {
         Int_t xrc = HandleSocketInput(mess, kFALSE);
         if (xrc == -1) {
            Error("Get", "command %d cannot be executed while processing", what);
         } else if (xrc == -2) {
            Error("Get", "unknown command %d ! Protocol error?", what);
         }
      }
      delete mess;
   }
   return idcur;
}
TDSetElement *TProofServ::GetNextPacket(Long64_t totalEntries)
{
   // Send a kPROOF_GETPACKET request, carrying the timing and progress
   // information collected so far, to the peer on fSocket and wait for the
   // next packet. Messages of other types received while waiting are passed
   // to HandleSocketInput() in restricted mode.
   // Returns the received element (0 when a null element is received), or 0
   // on send/receive failure or, for protocol > 18, when no player is
   // defined.

   // Stop the compute stopwatch, if it was started, and collect the timings
   Long64_t bytesRead = 0;
   if (gPerfStats != 0) bytesRead = gPerfStats->GetBytesRead();
   if (fCompute.Counter() > 0)
      fCompute.Stop();
   TMessage req(kPROOF_GETPACKET);
   Double_t cputime = fCompute.CpuTime();
   Double_t realtime = fCompute.RealTime();
   // An inflate factor above 1000 slows the processing down artificially
   PDB(kLoop, 2)
      Info("GetNextPacket", "inflate factor: %d"
                            " (realtime: %f, cputime: %f, entries: %lld)",
                            fInflateFactor, realtime, cputime, totalEntries);
   if (fInflateFactor > 1000) {
      // Sleep for cputime * (factor - 1000) milliseconds with the signals
      // kSigBus..kSigUser2 ignored, and account for it in the real time
      UInt_t sleeptime = (UInt_t) (cputime * (fInflateFactor - 1000)) ;
      Int_t i = 0;
      for (i = kSigBus ; i <= kSigUser2 ; i++)
         gSystem->IgnoreSignal((ESignals)i, kTRUE);
      gSystem->Sleep(sleeptime);
      for (i = kSigBus ; i <= kSigUser2 ; i++)
         gSystem->IgnoreSignal((ESignals)i, kFALSE);
      realtime += sleeptime / 1000.;
      PDB(kLoop, 2)
         Info("GetNextPacket","slept %d millisec", sleeptime);
   }
   // The content of the request depends on the protocol version
   if (fProtocol > 18) {
      req << fLatency.RealTime();
      TProofProgressStatus *status = 0;
      if (fPlayer)
         status = fPlayer->GetProgressStatus();
      else {
         Error("GetNextPacket", "No progress status object");
         return 0;
      }
      // Add the timings of the last packet to the player's progress status
      // and stream the status object
      status->IncProcTime(realtime);
      status->IncCPUTime(cputime);
      req << status;
      status = 0; 
   } else {
      // Older protocol: individual values
      req << fLatency.RealTime() << realtime << cputime
          << bytesRead << totalEntries;
      if (fPlayer)
         req << fPlayer->GetEventsProcessed();
   }
   // Start measuring the latency of the request
   fLatency.Start();
   Int_t rc = fSocket->Send(req);
   if (rc <= 0) {
      Error("GetNextPacket","Send() failed, returned %d", rc);
      return 0;
   }
   // Wait for the answer
   TDSetElement  *e = 0;
   Bool_t notdone = kTRUE;
   while (notdone) {
      TMessage *mess;
      if ((rc = fSocket->Recv(mess)) <= 0) {
         fLatency.Stop();
         Error("GetNextPacket","Recv() failed, returned %d", rc);
         return 0;
      }
      Int_t xrc = 0;
      TString file, dir, obj;
      Int_t what = mess->What();
      switch (what) {
         case kPROOF_GETPACKET:
            // The answer: read the element; the compute stopwatch is
            // restarted only if there is something to process
            fLatency.Stop();
            (*mess) >> e;
            if (e != 0) {
               fCompute.Start();
               PDB(kLoop, 2) Info("GetNextPacket", "'%s' '%s' '%s' %lld %lld",
                                 e->GetFileName(), e->GetDirectory(),
                                 e->GetObjName(), e->GetFirst(),e->GetNum());
            } else {
               PDB(kLoop, 2) Info("GetNextPacket", "Done");
            }
            notdone = kFALSE;
            break;
         case kPROOF_STOPPROCESS:
            // A stop request received at this point only stops the latency
            // stopwatch; 'notdone' is left set, so the loop keeps waiting
            // for the packet answer
            fLatency.Stop();
            PDB(kLoop, 2) Info("GetNextPacket:kPROOF_STOPPROCESS","received");
            break;
         default:
            // Anything else goes through the generic handler in restricted
            // mode
            xrc = HandleSocketInput(mess, kFALSE);
            if (xrc == -1) {
               Error("GetNextPacket", "command %d cannot be executed while processing", what);
            } else if (xrc == -2) {
               Error("GetNextPacket", "unknown command %d ! Protocol error?", what);
            }
            break;
      }
      delete mess;
   }
   // Done
   return e;
}
void TProofServ::GetOptions(Int_t *argc, char **argv)
{
   // Derive the role of this server from argv[1] ("proofserv" for a master,
   // "proofslave" for a worker), record it as the service name and read the
   // configuration directory from the environment variable ROOTCONFDIR.
   // Any missing or unexpected input is fatal.

   if (*argc < 2) {
      Fatal("GetOptions", "Must be started from proofd with arguments");
      exit(1);
   }

   // Only the two known roles are accepted
   const char *role = argv[1];
   const Bool_t isMaster = (strcmp(role, "proofserv") == 0) ? kTRUE : kFALSE;
   if (!isMaster && strcmp(role, "proofslave") != 0) {
      Fatal("GetOptions", "Must be started as 'proofserv' or 'proofslave'");
      exit(1);
   }
   fMasterServ = isMaster;
   fEndMaster = isMaster;
   fService = role;

   // The configuration directory is mandatory
   const char *confdir = gSystem->Getenv("ROOTCONFDIR");
   if (!confdir) {
      Fatal("GetOptions", "ROOTCONFDIR shell variable not set");
      exit(1);
   }
   fConfDir = confdir;
}
void TProofServ::HandleSocketInput()
{
   
   Bool_t all = (fgRecursive > 0) ? kFALSE : kTRUE;
   fgRecursive++;
   
   TMessage *mess;
   if (fSocket->Recv(mess) <= 0 || !mess) {
      
      
      Error("HandleSocketInput", "retrieving message from input socket");
      Terminate(0);
      return;
   }
   Int_t what = mess->What();
   PDB(kGlobal, 1)
      Info("HandleSocketInput", "got type %d from '%s'", what, fSocket->GetTitle());
   
   Bool_t parallel = IsParallel();
   fNcmd++;
   if (fProof) fProof->SetActive();
   Bool_t doit = kTRUE;
   Int_t rc = 0;
   while (doit) {
      
      rc = HandleSocketInput(mess, all);
      if (rc < 0) {
         TString emsg;
         if (rc == -1) {
            emsg.Form("HandleSocketInput: command %d cannot be executed while processing", what);
         } else if (rc == -3) {
            emsg.Form("HandleSocketInput: message undefined ! Protocol error?", what);
         } else {
            emsg.Form("HandleSocketInput: unknown command %d ! Protocol error?", what);
         }
         SendAsynMessage(emsg.Data());
      } else if (rc == 2) {
         
         fQueuedMsg->Add(mess);
         PDB(kGlobal, 1)
            Info("HandleSocketInput", "message of type %d enqueued; sz: %d",
                                       mess->What(), fQueuedMsg->GetSize());
         mess = 0;
      }
      
      doit = 0;
      if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) {
         
         PDB(kGlobal, 1)
            Info("HandleSocketInput", "processing enqueued message of type %d; left: %d",
                                      mess->What(), fQueuedMsg->GetSize());
         all = 1;
         SafeDelete(mess);
         mess = (TMessage *) fQueuedMsg->First();
         fQueuedMsg->Remove(mess);
         doit = 1;
      }
   }
   fgRecursive--;
   if (fProof) {
      
      
      if (rc == 0 && parallel != IsParallel()) {
         SendAsynMessage(" *** No workers left: cannot continue! Terminating ... *** ");
         Terminate(0);
      }
      fProof->SetActive(kFALSE);
      
      fProof->SetRunStatus(TProof::kRunning);
   }
   
   SafeDelete(mess);
}
Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all)
{
   
   
   
   
   
   static TStopwatch timer;
   char str[2048];
   Bool_t aborted = kFALSE;
   if (!mess) return -3;
   Int_t what = mess->What();
   PDB(kGlobal, 1)
      Info("HandleSocketInput", "processing message type %d from '%s'",
                                what, fSocket->GetTitle());
   timer.Start();
   Int_t rc = 0;
   switch (what) {
      case kMESS_CINT:
         if (all) {
            mess->ReadString(str, sizeof(str));
            if (IsParallel()) {
               fProof->SendCommand(str);
            } else {
               PDB(kGlobal, 1)
                  Info("HandleSocketInput:kMESS_CINT", "processing: %s...", str);
               ProcessLine(str);
            }
            LogToMaster();
         } else {
            rc = -1;
         }
         SendLogFile();
         break;
      case kMESS_STRING:
         if (all) {
            mess->ReadString(str, sizeof(str));
         } else {
            rc = -1;
         }
         break;
      case kMESS_OBJECT:
         if (all) {
            mess->ReadObject(mess->GetClass());
         } else {
            rc = -1;
         }
         break;
      case kPROOF_GROUPVIEW:
         if (all) {
            mess->ReadString(str, sizeof(str));
            sscanf(str, "%d %d", &fGroupId, &fGroupSize);
         } else {
            rc = -1;
         }
         break;
      case kPROOF_LOGLEVEL:
         {  UInt_t mask;
            mess->ReadString(str, sizeof(str));
            sscanf(str, "%d %u", &fLogLevel, &mask);
            gProofDebugLevel = fLogLevel;
            gProofDebugMask  = (TProofDebug::EProofDebugMask) mask;
            if (IsMaster())
               fProof->SetLogLevel(fLogLevel, mask);
         }
         break;
      case kPROOF_PING:
         {  if (IsMaster())
               fProof->Ping();
            
         }
         break;
      case kPROOF_PRINT:
         mess->ReadString(str, sizeof(str));
         Print(str);
         LogToMaster();
         SendLogFile();
         break;
      case kPROOF_RESET:
         if (all) {
            mess->ReadString(str, sizeof(str));
            Reset(str);
         } else {
            rc = -1;
         }
         break;
      case kPROOF_STATUS:
         Warning("HandleSocketInput:kPROOF_STATUS",
               "kPROOF_STATUS message is obsolete");
         fSocket->Send(fProof->GetParallel(), kPROOF_STATUS);
         break;
      case kPROOF_GETSTATS:
         SendStatistics();
         break;
      case kPROOF_GETPARALLEL:
         SendParallel();
         break;
      case kPROOF_STOP:
         if (all) {
            if (IsMaster()) {
               TString ord;
               *mess >> ord;
               PDB(kGlobal, 1)
                  Info("HandleSocketInput:kPROOF_STOP", "request for worker %s", ord.Data());
               if (fProof) fProof->TerminateWorker(ord);
            } else {
               PDB(kGlobal, 1)
                  Info("HandleSocketInput:kPROOF_STOP", "got request to terminate");
               Terminate(0);
            }
         } else {
            rc = -1;
         }
         break;
      case kPROOF_STOPPROCESS:
         if (all) {
            
            
            
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_STOPPROCESS","enter");
         } else {
            Long_t timeout = -1;
            (*mess) >> aborted;
            if (fProtocol > 9)
               (*mess) >> timeout;
            PDB(kGlobal, 1)
               Info("HandleSocketInput:kPROOF_STOPPROCESS",
                    "recursive mode: enter %d, %d", aborted, timeout);
            if (fProof)
               
               fProof->StopProcess(aborted, timeout);
            else
               
               if (fPlayer)
                  fPlayer->StopProcess(aborted, timeout);
         }
         break;
      case kPROOF_PROCESS:
         {
            TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_PROCESS","enter");
            HandleProcess(mess);
            
            SendLogFile();
         }
         break;
      case kPROOF_QUERYLIST:
         {
            HandleQueryList(mess);
            
            SendLogFile();
         }
         break;
      case kPROOF_REMOVE:
         {
            HandleRemove(mess);
            
            SendLogFile();
         }
         break;
      case kPROOF_RETRIEVE:
         {
            HandleRetrieve(mess);
            
            SendLogFile();
         }
         break;
      case kPROOF_ARCHIVE:
         {
            HandleArchive(mess);
            
            SendLogFile();
         }
         break;
      case kPROOF_MAXQUERIES:
         {  PDB(kGlobal, 1)
               Info("HandleSocketInput:kPROOF_MAXQUERIES", "Enter");
            TMessage m(kPROOF_MAXQUERIES);
            m << fMaxQueries;
            fSocket->Send(m);
            
            SendLogFile();
         }
         break;
      case kPROOF_CLEANUPSESSION:
         if (all) {
            PDB(kGlobal, 1)
               Info("HandleSocketInput:kPROOF_CLEANUPSESSION", "Enter");
            TString stag;
            (*mess) >> stag;
            if (fQMgr && fQMgr->CleanupSession(stag) == 0) {
               Printf("Session %s cleaned up", stag.Data());
            } else {
               Printf("Could not cleanup session %s", stag.Data());
            }
         } else {
            rc = -1;
         }
         
         SendLogFile();
         break;
      case kPROOF_GETENTRIES:
         {  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Enter");
            Bool_t         isTree;
            TString        filename;
            TString        dir;
            TString        objname("undef");
            Long64_t       entries = -1;
            if (all) {
               (*mess) >> isTree >> filename >> dir >> objname;
               PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
                                    "Report size of object %s (%s) in dir %s in file %s",
                                    objname.Data(), isTree ? "T" : "O",
                                    dir.Data(), filename.Data());
               entries = TDSet::GetEntries(isTree, filename, dir, objname);
               PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
                                    "Found %lld %s", entries, isTree ? "entries" : "objects");
            } else {
               rc = -1;
            }
            TMessage answ(kPROOF_GETENTRIES);
            answ << entries << objname;
            SendLogFile(); 
            fSocket->Send(answ);
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Done");
         }
         break;
      case kPROOF_CHECKFILE:
         if (!all && fProtocol <= 19) {
            
            rc = 2;
         } else {
            
            HandleCheckFile(mess);
         }
         break;
      case kPROOF_SENDFILE:
         if (!all && fProtocol <= 19) {
            
            rc = 2;
         } else {
            mess->ReadString(str, sizeof(str));
            Long_t size;
            Int_t  bin, fw = 1;
            char   name[1024];
            if (fProtocol > 5)
               sscanf(str, "%s %d %ld %d", name, &bin, &size, &fw);
            else
               sscanf(str, "%s %d %ld", name, &bin, &size);
            TString fnam(name);
            Bool_t copytocache = kTRUE;
            if (fnam.BeginsWith("cache:")) {
               fnam.ReplaceAll("cache:", Form("%s/", fCacheDir.Data()));
               copytocache = kFALSE;
            }
            ReceiveFile(fnam, bin ? kTRUE : kFALSE, size);
            
            if (copytocache && size > 0 &&
                strncmp(fPackageDir, name, fPackageDir.Length()))
               CopyToCache(name, 0);
            if (IsMaster() && fw == 1) {
               Int_t opt = TProof::kForward | TProof::kCp;
               if (bin)
                  opt |= TProof::kBinary;
               
               
               fProof->SendFile(fnam, opt, (copytocache ? "cache" : ""));
            }
            if (fProtocol > 19) fSocket->Send(kPROOF_SENDFILE);
         }
         break;
      case kPROOF_LOGFILE:
         {
            Int_t start, end;
            (*mess) >> start >> end;
            PDB(kGlobal, 1)
               Info("HandleSocketInput:kPROOF_LOGFILE",
                    "Logfile request - byte range: %d - %d", start, end);
            LogToMaster();
            SendLogFile(0, start, end);
         }
         break;
      case kPROOF_PARALLEL:
         if (all) {
            if (IsMaster()) {
               Int_t nodes;
               Bool_t random = kFALSE;
               (*mess) >> nodes;
               if ((mess->BufferSize() > mess->Length()))
                  (*mess) >> random;
               if (fProof) fProof->SetParallel(nodes, random);
               rc = 1;
            }
         } else {
            rc = -1;
         }
         
         SendLogFile();
         break;
      case kPROOF_CACHE:
         if (!all && fProtocol <= 19) {
            
            rc = 2;
         } else {
            TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter");
            Int_t status = HandleCache(mess);
            
            SendLogFile(status);
         }
         break;
      case kPROOF_WORKERLISTS:
         if (all) {
            if (IsMaster())
               HandleWorkerLists(mess);
            else
               Warning("HandleSocketInput:kPROOF_WORKERLISTS",
                       "Action meaning-less on worker nodes: protocol error?");
         } else {
            rc = -1;
         }
         
         SendLogFile();
         break;
      case kPROOF_GETSLAVEINFO:
         if (all) {
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Enter");
            if (IsMaster()) {
               TList *info = fProof->GetListOfSlaveInfos();
               TMessage answ(kPROOF_GETSLAVEINFO);
               answ << info;
               fSocket->Send(answ);
            } else {
               TMessage answ(kPROOF_GETSLAVEINFO);
               answ << (TList *)0;
               fSocket->Send(answ);
            }
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Done");
         } else {
            TMessage answ(kPROOF_GETSLAVEINFO);
            answ << (TList *)0;
            fSocket->Send(answ);
            rc = -1;
         }
         break;
      case kPROOF_GETTREEHEADER:
         if (all) {
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Enter");
            TVirtualProofPlayer *p = TVirtualProofPlayer::Create("slave", 0, fSocket);
            p->HandleGetTreeHeader(mess);
            delete p;
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Done");
         } else {
            TMessage answ(kPROOF_GETTREEHEADER);
            answ << TString("Failed") << (TObject *)0;
            fSocket->Send(answ);
            rc = -1;
         }
         break;
      case kPROOF_GETOUTPUTLIST:
         {  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Enter");
            TList* outputList = 0;
            if (IsMaster()) {
               outputList = fProof->GetOutputList();
               if (!outputList)
                  outputList = new TList();
            } else {
               outputList = new TList();
               if (fProof->GetPlayer()) {
                  TList *olist = fProof->GetPlayer()->GetOutputList();
                  TIter next(olist);
                  TObject *o;
                  while ( (o = next()) ) {
                     outputList->Add(new TNamed(o->GetName(), ""));
                  }
               }
            }
            outputList->SetOwner();
            TMessage answ(kPROOF_GETOUTPUTLIST);
            answ << outputList;
            fSocket->Send(answ);
            delete outputList;
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Done");
         }
         break;
      case kPROOF_VALIDATE_DSET:
         if (all) {
            PDB(kGlobal, 1)
               Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Enter");
            TDSet* dset = 0;
            (*mess) >> dset;
            if (IsMaster()) fProof->ValidateDSet(dset);
            else dset->Validate();
            TMessage answ(kPROOF_VALIDATE_DSET);
            answ << dset;
            fSocket->Send(answ);
            delete dset;
            PDB(kGlobal, 1)
               Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Done");
         } else {
            rc = -1;
         }
         
         SendLogFile();
         break;
      case kPROOF_DATA_READY:
         if (all) {
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Enter");
            TMessage answ(kPROOF_DATA_READY);
            if (IsMaster()) {
               Long64_t totalbytes = 0, bytesready = 0;
               Bool_t dataready = fProof->IsDataReady(totalbytes, bytesready);
               answ << dataready << totalbytes << bytesready;
            } else {
               Error("HandleSocketInput:kPROOF_DATA_READY",
                     "This message should not be sent to slaves");
               answ << kFALSE << Long64_t(0) << Long64_t(0);
            }
            fSocket->Send(answ);
            PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Done");
         } else {
            TMessage answ(kPROOF_DATA_READY);
            answ << kFALSE << Long64_t(0) << Long64_t(0);
            fSocket->Send(answ);
            rc = -1;
         }
         
         SendLogFile();
         break;
      case kPROOF_DATASETS:
         {  Int_t xrc = -1;
            if (fProtocol > 16) {
               xrc = HandleDataSets(mess);
            } else {
               Error("HandleProcess", "old client: no or incompatible dataset support");
            }
            SendLogFile(xrc);
         }
         break;
      case kPROOF_LIB_INC_PATH:
         if (all) {
            HandleLibIncPath(mess);
         } else {
            rc = -1;
         }
         
         SendLogFile();
         break;
      case kPROOF_REALTIMELOG:
         {  Bool_t on;
            (*mess) >> on;
            PDB(kGlobal, 1)
               Info("HandleSocketInput:kPROOF_REALTIMELOG",
                    "setting real-time logging %s", (on ? "ON" : "OFF"));
            fRealTimeLog = on;
            
            if (IsMaster())
               fProof->SetRealTimeLog(on);
         }
         break;
      case kPROOF_FORK:
         if (all) {
            HandleFork(mess);
            LogToMaster();
         } else {
            rc = -1;
         }
         SendLogFile();
         break;
      default:
         Error("HandleSocketInput", "unknown command %d", what);
         rc = -2;
         break;
   }
   fRealTime += (Float_t)timer.RealTime();
   fCpuTime  += (Float_t)timer.CpuTime();
   
   return rc;
}
void TProofServ::HandleUrgentData()
{
   // Handle Out-Of-Band (urgent) data arriving on the control socket.
   // One OOB byte is read from fSocket and interpreted as one of the
   // TProof interrupt levels (hard, soft, shutdown). While the byte is
   // being processed fProof (when set) is flagged active.

   // Initialised so that no code path can ever read an indeterminate value.
   char  oob_byte = 0;
   Int_t n, nch, wasted = 0;

   const Int_t kBufSize = 1024;
   char waste[kBufSize];

   // Log handler guard, alive for the whole duration of this call
   TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);

   PDB(kGlobal, 5)
      Info("HandleUrgentData", "handling oob...");

   // Receive the OOB byte
   while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
      if (n == -2) {
         // NOTE(review): -2 presumably means the OOB byte is not yet
         // deliverable (would block) -- confirm against TSocket::RecvRaw.
         // Drain whatever normal data is pending and retry; if nothing is
         // pending, wait a second before retrying.
         fSocket->GetOption(kBytesToRead, nch);
         if (nch == 0) {
            gSystem->Sleep(1000);
            continue;
         }

         if (nch > kBufSize) nch = kBufSize;
         n = fSocket->RecvRaw(waste, nch);
         if (n <= 0) {
            // The OOB byte was never received: bail out instead of falling
            // through and dispatching on a value that was never set.
            Error("HandleUrgentData", "error receiving waste");
            return;
         }
         wasted = 1;
      } else {
         Error("HandleUrgentData", "error receiving OOB");
         return;
      }
   }

   PDB(kGlobal, 5)
      Info("HandleUrgentData", "got OOB byte: %d\n", oob_byte);

   if (fProof) fProof->SetActive();

   switch (oob_byte) {

      case TProof::kHardInterrupt:
         Info("HandleUrgentData", "*** Hard Interrupt");

         // If master server, propagate interrupt to the workers
         if (IsMaster() && fProof)
            fProof->Interrupt(TProof::kHardInterrupt);

         // Flush the input socket up to the OOB mark
         while (1) {
            Int_t atmark;

            fSocket->GetOption(kAtMark, atmark);

            if (atmark) {
               // At the mark: echo the OOB byte back to the peer and stop
               n = fSocket->SendRaw(&oob_byte, 1, kOob);
               if (n <= 0)
                  Error("HandleUrgentData", "error sending OOB");
               break;
            }

            // Not yet at the mark: discard pending input
            fSocket->GetOption(kBytesToRead, nch);
            if (nch == 0) {
               gSystem->Sleep(1000);
               continue;
            }

            if (nch > kBufSize) nch = kBufSize;
            n = fSocket->RecvRaw(waste, nch);
            if (n <= 0) {
               Error("HandleUrgentData", "error receiving waste (2)");
               break;
            }
         }

         SendLogFile();

         break;

      case TProof::kSoftInterrupt:
         Info("HandleUrgentData", "Soft Interrupt");

         // If master server, propagate interrupt to the workers
         if (IsMaster() && fProof)
            fProof->Interrupt(TProof::kSoftInterrupt);

         // Input had to be discarded to get at the OOB byte: report it and
         // skip the local interrupt.
         if (wasted) {
            Error("HandleUrgentData", "soft interrupt flushed stream");
            break;
         }

         Interrupt();

         SendLogFile();

         break;

      case TProof::kShutdownInterrupt:
         Info("HandleUrgentData", "Shutdown Interrupt");

         // If master server, propagate interrupt to the workers
         if (IsMaster() && fProof)
            fProof->Interrupt(TProof::kShutdownInterrupt);

         Terminate(0);

         break;

      default:
         Error("HandleUrgentData", "unexpected OOB byte");
         break;
   }

   if (fProof) fProof->SetActive(kFALSE);
}
void TProofServ::HandleSigPipe()
{
   
   
   
   TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
   if (IsMaster()) {
      
      
      if (fSocket->Send(kPROOF_PING | kMESS_ACK) < 0) {
         Info("HandleSigPipe", "keepAlive probe failed");
         
         fProof->SetActive();
         fProof->Interrupt(TProof::kShutdownInterrupt);
         fProof->SetActive(kFALSE);
         Terminate(0);
      }
   } else {
      Info("HandleSigPipe", "keepAlive probe failed");
      Terminate(0);  
   }
}
Bool_t TProofServ::IsParallel() const
{
   // Tell whether this server runs in parallel mode. Only a master with a
   // valid fProof can answer kTRUE (it forwards the question to fProof);
   // in every other case the answer is kFALSE.

   if (!IsMaster() || !fProof)
      return kFALSE;

   return fProof->IsParallel();
}
void TProofServ::Print(Option_t *option) const
{
   // Print status information. A master with a valid fProof delegates to
   // fProof->Print(option); otherwise a one-line message with the host name
   // is printed.

   const Bool_t viaProof = IsMaster() && fProof;
   if (!viaProof) {
      Printf("This is worker %s", gSystem->HostName());
      return;
   }

   fProof->Print(option);
}
void TProofServ::RedirectOutput(const char *dir, const char *mode)
{
   
   
   char logfile[512];
   TString sdir = (dir && strlen(dir) > 0) ? dir : fSessionDir.Data();
   if (IsMaster()) {
      sprintf(logfile, "%s/master-%s.log", sdir.Data(), fOrdinal.Data());
   } else {
      sprintf(logfile, "%s/worker-%s.log", sdir.Data(), fOrdinal.Data());
   }
   if ((freopen(logfile, mode, stdout)) == 0)
      SysError("RedirectOutput", "could not freopen stdout (%s)", logfile);
   if ((dup2(fileno(stdout), fileno(stderr))) < 0)
      SysError("RedirectOutput", "could not redirect stderr");
   if ((fLogFile = fopen(logfile, "r")) == 0)
      SysError("RedirectOutput", "could not open logfile '%s'", logfile);
   
   if (fProtocol < 4 && fWorkDir != Form("~/%s", kPROOF_WorkDir)) {
      Warning("RedirectOutput", "no way to tell master (or client) where"
              " to upload packages");
   }
}
void TProofServ::Reset(const char *dir)
{
   
   
   gDirectory->cd(dir);
   
   gROOT->Reset();
   
   
   if (gDirectory != gROOT) {
      gDirectory->Delete();
   }
   if (IsMaster()) fProof->SendCurrentState();
}
Int_t TProofServ::ReceiveFile(const char *file, Bool_t bin, Long64_t size)
{
   // Receive 'size' bytes from fSocket and store them in 'file'.
   //   file  path of the file to create (truncated if it exists)
   //   bin   kTRUE: data is written unchanged;
   //         kFALSE: carriage returns ('\r') are dropped before writing
   //   size  number of bytes the sender will transmit; nothing is done
   //         (and 0 is returned) when size <= 0
   // Returns 0 on success, -1 on any open, receive or write failure.
   // The file is created with mode 0600 and switched to 0644 once complete.

   if (size <= 0) return 0;

   // Open the file
   Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
   if (fd < 0) {
      SysError("ReceiveFile", "error opening file %s", file);
      return -1;
   }

   const Int_t kMAXBUF = 16384;
   char buf[kMAXBUF], cpy[kMAXBUF];

   Long64_t filesize = 0;
   while (filesize < size) {
      // Clamp in 64 bits first: casting the remainder straight to Int_t
      // would truncate (possibly to a negative value) for very large files.
      const Long64_t remaining = size - filesize;
      const Int_t left = (remaining > kMAXBUF) ? kMAXBUF : Int_t(remaining);

      Int_t r = fSocket->RecvRaw(buf, left);
      if (r <= 0) {
         // r == 0 means no more data will arrive; without this check the
         // loop would spin forever because filesize would never advance.
         Error("ReceiveFile", "error during receiving file %s", file);
         close(fd);
         return -1;
      }

      // The transfer size counts the bytes as sent, CRs included
      filesize += r;

      const char *p = buf;
      if (!bin) {
         // Text mode: copy everything except carriage returns. Only the
         // r bytes actually received are ever inspected.
         Int_t j = 0;
         for (Int_t i = 0; i < r; i++) {
            if (buf[i] != '\r')
               cpy[j++] = buf[i];
         }
         p = cpy;
         r = j;
      }

      // write() may store fewer bytes than requested: loop until done
      while (r > 0) {
         Int_t w = write(fd, p, r);
         if (w < 0) {
            SysError("ReceiveFile", "error writing to file %s", file);
            close(fd);
            return -1;
         }
         r -= w;
         p += w;
      }
   }

   close(fd);

   chmod(file, 0644);

   return 0;
}
void TProofServ::Run(Bool_t retrn)
{
   // Create the server and, if that succeeded (CreateServer() returned 0),
   // enter the application event loop via TApplication::Run(retrn).

   const Int_t rc = CreateServer();
   if (rc != 0)
      return;

   TApplication::Run(retrn);
}
void TProofServ::SendLogFile(Int_t status, Int_t start, Int_t end)
{
   // Send (part of) the log file over fSocket, followed by a kPROOF_LOGDONE
   // message carrying 'status' and a parallel-node count.
   //   status  value forwarded in the kPROOF_LOGDONE message
   //   start   when > -1, first byte offset to send ("ad hoc" range request)
   //   end     last byte offset of the ad hoc range; clamped to the log size
   // Without an ad hoc range, everything between the current read position
   // of fLogFileDes and the end of the log is sent.
   fflush(stdout);
   
   // On non-master nodes either flush the log file or, if log forwarding to
   // the master is enabled, call LogToMaster(kFALSE)
   if (!IsMaster()) {
      if (!fSendLogToMaster) {
         FlushLogFile();
      } else {
         
         LogToMaster(kFALSE);
      }
   }
   // ltot: end offset of the file behind stdout; lnow: current read offset
   off_t ltot=0, lnow=0;
   Int_t left = -1;
   Bool_t adhoc = kFALSE;
   if (fLogFileDes > -1) {
      ltot = lseek(fileno(stdout),   (off_t) 0, SEEK_END);
      lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);
      if (start > -1) {
         // Ad hoc range: position at 'start' and work out how much to send
         lseek(fLogFileDes, (off_t) start, SEEK_SET);
         if (end <= start || end > ltot)
            end = ltot;
         left = (Int_t)(end - start);
         if (end < ltot)
            left++;
         adhoc = kTRUE;
      } else {
         // Default: everything not yet read
         left = (Int_t)(ltot - lnow);
      }
   }
   if (left > 0) {
      // Announce the number of bytes, then stream them in chunks
      fSocket->Send(left, kPROOF_LOGFILE);
      const Int_t kMAXBUF = 32768;  // maximum chunk size in bytes
      char buf[kMAXBUF];
      Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
      Int_t len;
      do {
         // Retry reads interrupted by a signal
         while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
                TSystem::GetErrno() == EINTR)
            TSystem::ResetErrno();
         if (len < 0) {
            SysError("SendLogFile", "error reading log file");
            break;
         }
         // Force a newline as last byte of a fully read chunk when the range
         // ends at the end of the log.
         // NOTE(review): this fires for every full chunk, not only the last
         // one, so intermediate chunks of a large range also lose their final
         // byte -- confirm this is intended.
         if (end == ltot && len == wanted)
            buf[len-1] = '\n';
         if (fSocket->SendRaw(buf, len) < 0) {
            SysError("SendLogFile", "error sending log file");
            break;
         }
         
         // Update what is left and the size of the next chunk
         left -= len;
         wanted = (left > kMAXBUF) ? kMAXBUF : left;
      } while (len > 0 && left > 0);
   }
   
   // After an ad hoc request put the read position back where it was
   if (adhoc)
      lseek(fLogFileDes, lnow, SEEK_SET);
   // Closing message: status plus the parallel count (fProof's value, or 0
   // without fProof, on a master; always 1 otherwise)
   TMessage mess(kPROOF_LOGDONE);
   if (IsMaster())
      mess << status << (fProof ? fProof->GetParallel() : 0);
   else
      mess << status << (Int_t) 1;
   fSocket->Send(mess);
}
void TProofServ::SendStatistics()
{
   
   Long64_t bytesread = 0;
   if (IsMaster())
      bytesread = fProof->GetBytesRead();
   else
      bytesread = TFile::GetFileBytesRead();
   TMessage mess(kPROOF_GETSTATS);
   TString workdir = gSystem->WorkingDirectory();  
   mess << bytesread << fRealTime << fCpuTime << workdir;
   if (fProtocol >= 4) mess << TString(gProofServ->GetWorkDir());
   mess << TString(gProofServ->GetImage());
   fSocket->Send(mess);
}
void TProofServ::SendParallel(Bool_t async)
{
   // Send a kPROOF_GETPARALLEL message over fSocket containing the number of
   // parallel nodes followed by the 'async' flag. A master refreshes the
   // number through fProof first; any other node always reports 1.

   Int_t nodes = 1;
   if (IsMaster()) {
      fProof->AskParallel();
      nodes = fProof->GetParallel();
   }

   TMessage reply(kPROOF_GETPARALLEL);
   reply << nodes;
   reply << async;
   fSocket->Send(reply);
}
Int_t TProofServ::UnloadPackage(const char *package)
{
   // Unload the named package: drop its " -I<package>" entry from the ACLiC
   // include path, remove it from fEnabledPackages and unlink the path
   // called 'package' if it is accessible.
   //   package  name of the package to unload
   // Always returns 0; a failing unlink only produces a warning.

   // Work on a private copy of the name. 'package' may point into the very
   // TObjString that is deleted below (this is the case when the caller
   // passes the string of a list element), and the name is still needed
   // after that deletion.
   const TString pkgname(package);

   TObjString *pack = (TObjString *) fEnabledPackages->FindObject(pkgname);
   if (pack) {

      // Remove the package from the include path
      TString aclicincpath = gSystem->GetIncludePath();
      TString cintincpath = gInterpreter->GetIncludePath();
      // Strip the trailing interpreter part (and one separator character)
      // NOTE(review): assumes the system include path ends with the
      // interpreter include path -- confirm.
      aclicincpath.Remove(aclicincpath.Length() - cintincpath.Length() - 1);
      // Drop this package's include directive
      aclicincpath.ReplaceAll(TString(" -I") + pkgname, "");
      gSystem->SetIncludePath(aclicincpath);

      // Remove the entry from the list of enabled packages and destroy it
      delete fEnabledPackages->Remove(pack);
      PDB(kPackage, 1)
         Info("UnloadPackage",
              "package %s successfully unloaded", pkgname.Data());
   }

   // Remove the link from the working directory, if it exists
   if (!gSystem->AccessPathName(pkgname))
      if (gSystem->Unlink(pkgname) != 0)
         Warning("UnloadPackage", "unable to remove symlink to %s", pkgname.Data());

   return 0;
}
Int_t TProofServ::UnloadPackages()
{
   // Unload every package listed in fEnabledPackages.
   // Returns 0 on success, -1 as soon as one UnloadPackage() call fails.

   // UnloadPackage() removes and deletes entries of fEnabledPackages, so the
   // list must not be modified while it is being iterated. Take a snapshot
   // of the names first; the copies also stay valid after the original
   // entries have been destroyed. As before, the scan stops at the first
   // entry that is not a TObjString.
   TList names;
   names.SetOwner();
   TIter nextpackage(fEnabledPackages);
   while (TObjString *objstr = dynamic_cast<TObjString*>(nextpackage()))
      names.Add(new TObjString(objstr->String()));

   // Now unload them one by one
   TIter nextname(&names);
   while (TObjString *nm = dynamic_cast<TObjString*>(nextname()))
      if (UnloadPackage(nm->String()) != 0)
         return -1;

   PDB(kPackage, 1)
      Info("UnloadPackages",
           "packages successfully unloaded");

   return 0;
}
Int_t TProofServ::Setup()
{
   // Set up the server after the connection has been established:
   // exchange the greeting and protocol numbers with the peer, receive user,
   // ordinal and configuration/working-directory information, derive the
   // working directory, session tag and session directory, run SetupCommon()
   // and finally set a few options on the socket.
   // Returns 0 on success, -1 on failure.

   char str[512];

   if (IsMaster()) {
      sprintf(str, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
   } else {
      sprintf(str, "**** PROOF slave server @ %s started ****", gSystem->HostName());
   }

   // Send() is expected to report the string length plus one
   if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
      Error("Setup", "failed to send proof server startup message");
      return -1;
   }

   // Exchange protocol levels: receive the remote one, then send ours
   Int_t what;
   if (fSocket->Recv(fProtocol, what) != static_cast<Int_t>(2*sizeof(Int_t))) {
      Error("Setup", "failed to receive remote proof protocol");
      return -1;
   }
   if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != static_cast<Int_t>(2*sizeof(Int_t))) {
      Error("Setup", "failed to send local proof protocol");
      return -1;
   }

   // Old peers (protocol < 5) go through the old authentication setup,
   // which also delivers the configuration / working directory string
   if (fProtocol < 5) {
      TString wconf;
      if (OldAuthSetup(wconf) != 0) {
         Error("Setup", "OldAuthSetup: failed to setup authentication");
         return -1;
      }
      if (IsMaster()) {
         fConfFile = wconf;
         fWorkDir.Form("~/%s", kPROOF_WorkDir);
      } else {
         if (fProtocol < 4) {
            fWorkDir.Form("~/%s", kPROOF_WorkDir);
         } else {
            fWorkDir = wconf;
            if (fWorkDir.IsNull()) fWorkDir.Form("~/%s", kPROOF_WorkDir);
         }
      }
   } else {

      // Receive user, ordinal and config file (master) or work dir (worker)
      TMessage *mess = 0;
      if ((fSocket->Recv(mess) <= 0) || !mess) {
         Error("Setup", "failed to receive ordinal and config info");
         return -1;
      }
      if (IsMaster()) {
         (*mess) >> fUser >> fOrdinal >> fConfFile;
         fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));
      } else {
         (*mess) >> fUser >> fOrdinal >> fWorkDir;
         if (fWorkDir.IsNull())
            fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));
      }
      // Append the ordinal to the log prefix unless it is "-1"
      if (fOrdinal != "-1")
         fPrefix += fOrdinal;
      TProofServLogHandler::SetDefaultPrefix(fPrefix);
      delete mess;
   }

   if (IsMaster()) {

      // Strip everything up to and including the first ':' from the
      // configuration file specification
      TString conffile = fConfFile;
      conffile.Remove(0, 1 + conffile.Index(":"));

      // A valid static resource description may override the working dir
      TProofResourcesStatic resources(fConfDir, conffile);
      if (resources.IsValid()) {
         if (resources.GetMaster()) {
            TString tmpWorkDir = resources.GetMaster()->GetWorkDir();
            if (tmpWorkDir != "")
               fWorkDir = tmpWorkDir;
         }
      } else {
         Info("Setup", "invalid config file %s (missing or unreadable)",
                        resources.GetFileName().Data());
      }
   }

   // Export HOME so that it matches what gSystem reports
   gSystem->Setenv("HOME", gSystem->HomeDirectory());

   // An absolute working directory outside the home directory gets the
   // user name appended
   if (fWorkDir.BeginsWith("/") &&
      !fWorkDir.BeginsWith(gSystem->HomeDirectory())) {
      if (!fWorkDir.EndsWith("/"))
         fWorkDir += "/";
      UserGroup_t *u = gSystem->GetUserInfo();
      if (u) {
         fWorkDir += u->fUser;
         delete u;
      }
   }

   // Expand the working directory path
   char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
   fWorkDir = workdir;
   delete [] workdir;
   if (gProofDebugLevel > 0)
      Info("Setup", "working directory set to %s", fWorkDir.Data());

   // Host name without the domain part
   TString host = gSystem->HostName();
   if (host.Index(".") != kNPOS)
      host.Remove(host.Index("."));

   // Session tag: <ordinal>-<host>-<seconds>-<pid>. The seconds value is
   // cast to Long_t and printed with %ld so that the argument always matches
   // the conversion, whatever integer type GetSec() returns.
   fSessionTag.Form("%s-%s-%ld-%d", fOrdinal.Data(), host.Data(),
                    (Long_t)TTimeStamp().GetSec(), gSystem->GetPid());
   fTopSessionTag = fSessionTag;

   // Session directory
   fSessionDir = fWorkDir;
   if (IsMaster())
      fSessionDir += "/master-";
   else
      fSessionDir += "/slave-";
   fSessionDir += fSessionTag;

   // Part of the setup shared with other entry points
   if (SetupCommon() != 0) {
      Error("Setup", "common setup failed");
      return -1;
   }

   // Socket options: process group, no-delay and keep-alive
   fSocket->SetOption(kProcessGroup, gSystem->GetPid());

   fSocket->SetOption(kNoDelay, 1);

   fSocket->SetOption(kKeepAlive, 1);

   return 0;
}
Int_t TProofServ::SetupCommon()
{
   // Common part of the setup: adjust umask and PATH, create and enter the
   // working and session directories, set up cache and package directories
   // with their locks, and, on a master, the query directory, query manager
   // and dataset manager. Then parse the user quotas, send the version
   // string and register the PROOF_ALLVARS environment variables.
   // Returns 0 on success, -1 if a required directory cannot be entered.

   // Deny write access for group and world
   gSystem->Umask(022);

#ifdef R__UNIX
   // Prepend ROOT's bin dir, the compiler's dir and a few standard
   // locations to PATH
   TString bindir;
# ifdef ROOTBINDIR
   bindir = ROOTBINDIR;
# else
   bindir = gSystem->Getenv("ROOTSYS");
   if (!bindir.IsNull()) bindir += "/bin";
# endif
# ifdef COMPILER
   TString compiler = COMPILER;
   if (compiler.Index("is ") != kNPOS)
      compiler.Remove(0, compiler.Index("is ") + 3);
   compiler = gSystem->DirName(compiler);
   if (!bindir.IsNull()) bindir += ":";
   bindir += compiler;
#endif
   if (!bindir.IsNull()) bindir += ":";
   bindir += "/bin:/usr/bin:/usr/local/bin";
   // Add bindir in front of the current PATH
   TString path(gSystem->Getenv("PATH"));
   if (!path.IsNull()) path.Insert(0, ":");
   path.Insert(0, bindir);
   gSystem->Setenv("PATH", path);
#endif

   // Make sure the working directory exists and move into it
   if (gSystem->AccessPathName(fWorkDir)) {
      gSystem->mkdir(fWorkDir, kTRUE);
      if (!gSystem->ChangeDirectory(fWorkDir)) {
         Error("SetupCommon", "can not change to PROOF directory %s",
               fWorkDir.Data());
         return -1;
      }
   } else {
      if (!gSystem->ChangeDirectory(fWorkDir)) {
         // The path exists but cannot be entered: replace it and retry
         gSystem->Unlink(fWorkDir);
         gSystem->mkdir(fWorkDir, kTRUE);
         if (!gSystem->ChangeDirectory(fWorkDir)) {
            Error("SetupCommon", "can not change to PROOF directory %s",
                     fWorkDir.Data());
            return -1;
         }
      }
   }

   // Cache directory and its lock file
   fCacheDir = gEnv->GetValue("ProofServ.CacheDir",
                               Form("%s/%s", fWorkDir.Data(), kPROOF_CacheDir));
   if (gSystem->AccessPathName(fCacheDir))
      gSystem->MakeDirectory(fCacheDir);
   if (gProofDebugLevel > 0)
      Info("SetupCommon", "cache directory set to %s", fCacheDir.Data());
   fCacheLock =
      new TProofLockPath(Form("%s/%s%s",
                         gSystem->TempDirectory(), kPROOF_CacheLockFile,
                         TString(fCacheDir).ReplaceAll("/","%").Data()));

   // Package directory and its lock file
   fPackageDir = gEnv->GetValue("ProofServ.PackageDir",
                                 Form("%s/%s", fWorkDir.Data(), kPROOF_PackDir));
   if (gSystem->AccessPathName(fPackageDir))
      gSystem->MakeDirectory(fPackageDir);
   if (gProofDebugLevel > 0)
      Info("SetupCommon", "package directory set to %s", fPackageDir.Data());
   fPackageLock =
      new TProofLockPath(Form("%s/%s%s",
                         gSystem->TempDirectory(), kPROOF_PackageLockFile,
                         TString(fPackageDir).ReplaceAll("/","%").Data()));

   // Colon-separated list of global package directories
   TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
   if (globpack.Length() > 0) {
      Int_t ng = 0;
      Int_t from = 0;
      TString ldir;
      while (globpack.Tokenize(ldir, from, ":")) {
         if (gSystem->AccessPathName(ldir, kReadPermission)) {
            Warning("SetupCommon", "directory for global packages %s does not"
                             " exist or is not readable", ldir.Data());
         } else {
            // Register the directory under the key "G<n>"
            TString key;
            key.Form("G%d", ng++);
            if (!fGlobalPackageDirList) {
               fGlobalPackageDirList = new THashList();
               fGlobalPackageDirList->SetOwner();
            }
            fGlobalPackageDirList->Add(new TNamed(key,ldir));
            Info("SetupCommon", "directory for global packages %s added to the list",
                          ldir.Data());
            FlushLogFile();
         }
      }
   }

   // Create the session directory if needed and move into it
   if (fSessionDir != gSystem->WorkingDirectory()) {
      if (gSystem->AccessPathName(fSessionDir))
         gSystem->MakeDirectory(fSessionDir);
      if (!gSystem->ChangeDirectory(fSessionDir)) {
         Error("SetupCommon", "can not change to working directory %s",
                              fSessionDir.Data());
         return -1;
      }
   }
   gSystem->Setenv("PROOF_SANDBOX", fSessionDir);
   if (gProofDebugLevel > 0)
      Info("SetupCommon", "session dir is %s", fSessionDir.Data());

   // On masters: query directory, its lock and the query result manager
   if (IsMaster()) {

      // Queries are kept under <workdir>/<querydir>/session-<tag>
      fQueryDir = fWorkDir;
      fQueryDir += TString("/") + kPROOF_QueryDir;
      if (gSystem->AccessPathName(fQueryDir))
         gSystem->MakeDirectory(fQueryDir);
      fQueryDir += TString("/session-") + fTopSessionTag;
      if (gSystem->AccessPathName(fQueryDir))
         gSystem->MakeDirectory(fQueryDir);
      if (gProofDebugLevel > 0)
         Info("SetupCommon", "queries dir is %s", fQueryDir.Data());

      // Create the lock on the query directory and take it
      fQueryLock = new TProofLockPath(Form("%s/%s%s-%s",
                       gSystem->TempDirectory(),
                       kPROOF_QueryLockFile, fTopSessionTag.Data(),
                       TString(fQueryDir).ReplaceAll("/","%").Data()));
      fQueryLock->Lock();

      // Create the query result manager
      fQMgr = new TQueryResultManager(fQueryDir, fSessionTag, fSessionDir,
                                      fQueryLock, 0);

      // Communicate the session tag to the peer
      TMessage m(kPROOF_SESSIONTAG);
      m << fTopSessionTag;
      fSocket->Send(m);
   }

   // Image
   fImage = gEnv->GetValue("ProofServ.Image", "");

   // Group
   fGroup = gEnv->GetValue("ProofServ.ProofGroup", "");
   if (IsMaster()) {

      // Group priority
      fGroupPriority = GetPriority();

      // Dataset manager: first try the plug-in named in the configuration
      TPluginHandler *h = 0;
      TString dsm = gEnv->GetValue("Proof.DataSetManager", "");
      if (!dsm.IsNull()) {

         if (gROOT->GetPluginManager()) {

            h = gROOT->GetPluginManager()->FindHandler("TProofDataSetManager", dsm);
            if (h && h->LoadPlugin() != -1) {

               fDataSetManager =
                  reinterpret_cast<TProofDataSetManager*>(h->ExecPlugin(3, fGroup.Data(),
                                                          fUser.Data(), dsm.Data()));
            }
         }
      }
      if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
         Warning("SetupCommon", "dataset manager plug-in initialization failed");
         SafeDelete(fDataSetManager);
      }

      // Fall back to the default manager
      if (!fDataSetManager) {
         TString opts("As:");
         TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
         if (dsetdir.IsNull()) {
            // Use the default location below the working directory
            dsetdir.Form("%s/%s", fWorkDir.Data(), kPROOF_DataSetDir);
            // NOTE(review): the path tested/created here is the member
            // fDataSetDir, not the local dsetdir that was just built and is
            // handed to the manager below -- confirm which one is meant.
            if (gSystem->AccessPathName(fDataSetDir))
               gSystem->MakeDirectory(fDataSetDir);
            opts += "Sb:";
         }

         // Find (and load) the "file" plug-in unless a handler is already
         // available; the plug-in manager is checked like in the first lookup
         if (!h && gROOT->GetPluginManager()) {
            h = gROOT->GetPluginManager()->FindHandler("TProofDataSetManager", "file");
            if (h && h->LoadPlugin() == -1) h = 0;
         }
         if (h) {

            fDataSetManager = reinterpret_cast<TProofDataSetManager*>(h->ExecPlugin(3,
                              fGroup.Data(), fUser.Data(),
                              Form("dir:%s opt:%s", dsetdir.Data(), opts.Data())));
         }
         if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
            Warning("SetupCommon", "default dataset manager plug-in initialization failed");
            SafeDelete(fDataSetManager);
         }
      }
   }

   // Quotas: per user first, then per group, then the global setting
   TString quotas = gEnv->GetValue(Form("ProofServ.UserQuotas.%s", fUser.Data()),"");
   if (quotas.IsNull())
      quotas = gEnv->GetValue(Form("ProofServ.UserQuotasByGroup.%s", fGroup.Data()),"");
   if (quotas.IsNull())
      quotas = gEnv->GetValue("ProofServ.UserQuotas", "");
   if (!quotas.IsNull()) {
      // Space-separated "key=value" tokens
      TString tok;
      Ssiz_t from = 0;
      while (quotas.Tokenize(tok, from, " ")) {
         // Maximum number of queries kept
         if (tok.BeginsWith("maxquerykept=")) {
            tok.ReplaceAll("maxquerykept=","");
            if (tok.IsDigit())
               fMaxQueries = tok.Atoi();
            else
               Info("SetupCommon",
                    "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
         }
         // Size limits: a number optionally followed by k, m or g
         const char *ksz[2] = {"hwmsz=", "maxsz="};
         for (Int_t j = 0; j < 2; j++) {
            if (tok.BeginsWith(ksz[j])) {
               tok.ReplaceAll(ksz[j],"");
               Long64_t fact = -1;
               if (!tok.IsDigit()) {
                  // Look for a unit suffix. The loop is bounded by the
                  // number of known suffixes and advances through them;
                  // the multiplier grows by 1024 at each step.
                  tok.ToLower();
                  const char *s[3] = {"k", "m", "g"};
                  Int_t mult = 1024;
                  for (Int_t i = 0; i < 3 && fact < 0; i++) {
                     if (tok.EndsWith(s[i]))
                        fact = mult;
                     else
                        mult *= 1024;
                  }
                  // Strip the suffix only if one was recognised, so that an
                  // unknown suffix is reported instead of silently dropped
                  if (fact > 0)
                     tok.Remove(tok.Length()-1);
               }
               if (tok.IsDigit()) {
                  if (j == 0)
                     fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
                  else
                     fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
               } else
                  // "%.*s" requires an int precision: print the key
                  // without its trailing '='
                  Info("SetupCommon", "parsing '%.*s' : ignoring token %s",
                                      (Int_t)(strlen(ksz[j])-1), ksz[j], tok.Data());
            }
         }
      }
   }

   // Apply the limit on the number of kept queries
   if (IsMaster() && fQMgr)
      if (fQMgr->ApplyMaxQueries(fMaxQueries) != 0)
         Warning("SetupCommon", "problems applying fMaxQueries");

   // Send "version[:r<rev>][:<tag>]|<arch>-<compiler>" to recent peers
   if (fProtocol > 12) {
      TString vac = gROOT->GetVersion();
      if (gROOT->GetSvnRevision() > 0)
         vac += Form(":r%d", gROOT->GetSvnRevision());
      TString rtag = gEnv->GetValue("ProofServ.RootVersionTag", "");
      if (rtag.Length() > 0)
         vac += Form(":%s", rtag.Data());
      vac += Form("|%s-%s",gSystem->GetBuildArch(), gSystem->GetBuildCompilerVersion());
      TMessage m(kPROOF_VERSARCHCOMP);
      m << vac;
      fSocket->Send(m);
   }

   // Register with TProof every variable named in the comma-separated
   // PROOF_ALLVARS, together with its current value
   TString all_vars(gSystem->Getenv("PROOF_ALLVARS"));
   TString name;
   Int_t from = 0;
   while (all_vars.Tokenize(name, from, ",")) {
      if (!name.IsNull()) {
         TString value = gSystem->Getenv(name);
         TProof::AddEnvVar(name, value);
      }
   }

   if (gProofDebugLevel > 0)
      Info("SetupCommon", "successfully completed");

   return 0;
}
void TProofServ::Terminate(Int_t status)
{
   // Terminate the server: log the memory footprint, remove the session
   // directory (only when 'status' is 0), remove the query directory and
   // release the query lock on masters, detach the input handlers and
   // leave the system event loop.

   // Report the memory usage of this process, if the information is available
   ProcInfo_t procInfo;
   const Bool_t gotInfo = (gSystem->GetProcInfo(&procInfo) == 0);
   if (gotInfo) {
      Info("Terminate", "process memory footprint: %ld kB virtual, %ld kB resident ",
                        procInfo.fMemVirtual, procInfo.fMemResident);
      const Bool_t haveLimits = (fVirtMemHWM > 0) || (fVirtMemMax > 0);
      if (haveLimits)
         Info("Terminate", "process virtual memory limits: %ld kB HWM, %ld kB max ",
                           fVirtMemHWM, fVirtMemMax);
   }

   // Clean exit: get rid of the session directory
   if (status == 0) {
      // Step out of the directory tree we are about to remove
      gSystem->ChangeDirectory("/");
      // Flag the directory for deletion, then run the kRM command on it
      gSystem->MakeDirectory(fSessionDir+"/.delete");
      TString rmSession;
      rmSession.Form("%s %s", kRM, fSessionDir.Data());
      gSystem->Exec(rmSession);
   }

   // Master-only: queries area and query lock
   if (IsMaster()) {
      Bool_t hasQueries = kFALSE;
      if (fQMgr && fQMgr->Queries())
         hasQueries = (fQMgr->Queries()->GetSize() != 0) ? kTRUE : kFALSE;

      if (!hasQueries) {
         // No query to keep: remove the query directory ...
         gSystem->ChangeDirectory("/");
         gSystem->MakeDirectory(fQueryDir+"/.delete");
         TString rmQueries;
         rmQueries.Form("%s %s", kRM, fQueryDir.Data());
         gSystem->Exec(rmQueries);
         // ... and the lock file
         if (fQueryLock)
            gSystem->Unlink(fQueryLock->GetName());
      }

      // Release the lock in any case
      if (fQueryLock)
         fQueryLock->Unlock();
   }

   // Detach every TProofServInputHandler registered with the system
   TIter nextHandler(gSystem->GetListOfFileHandlers());
   for (TObject *obj = nextHandler(); obj; obj = nextHandler()) {
      TProofServInputHandler *inputHandler = dynamic_cast<TProofServInputHandler *>(obj);
      if (inputHandler)
         gSystem->RemoveFileHandler(inputHandler);
   }

   // Leave the event loop
   gSystem->ExitLoop();
}
Bool_t TProofServ::IsActive()
{
   // Tell whether the global gProofServ pointer is set: kTRUE if it is
   // non-null, kFALSE otherwise.

   if (gProofServ)
      return kTRUE;
   return kFALSE;
}
TProofServ *TProofServ::This()
{
   // Return the current value of the global gProofServ pointer.
   // The pointer is returned as-is, so the result is null whenever
   // gProofServ is null; callers must check before dereferencing.
   
   return gProofServ;
}
Int_t TProofServ::OldAuthSetup(TString &conf)
{
   // Run the authentication setup provided by the 'OldProofServAuthSetup'
   // function of libRootAuth. The library is located, loaded and the symbol
   // resolved on each call.
   //
   // 'conf' is passed through to the hook together with the socket, the
   // master flag, the protocol version, the user and the ordinal.
   //
   // Returns the value returned by the hook, or -1 if the library cannot be
   // located or loaded, or if the symbol cannot be resolved.

   TString authlib = "libRootAuth";

   // Locate the library first; the returned path is only needed as a check
   char *p = gSystem->DynamicPathName(authlib, kTRUE);
   if (!p) {
      Error("OldAuthSetup", "can't locate %s",authlib.Data());
      return -1;
   }
   delete[] p;

   // Load it. A failure here must be reported as -1 like every other
   // failure of this function (0 would not signal an error to the caller).
   if (gSystem->Load(authlib) == -1) {
      Error("OldAuthSetup", "can't load %s",authlib.Data());
      return -1;
   }

   // Resolve the setup function
   Func_t f = gSystem->DynFindSymbol(authlib,"OldProofServAuthSetup");
   if (!f) {
      Error("OldAuthSetup", "can't find OldProofServAuthSetup");
      return -1;
   }
   OldProofServAuthSetup_t oldAuthSetupHook = (OldProofServAuthSetup_t)(f);

   // Delegate to the hook
   return (*oldAuthSetupHook)(fSocket, IsMaster(), fProtocol,
                              fUser, fOrdinal, conf);
}
TProofQueryResult *TProofServ::MakeQueryResult(Long64_t nent,
                                               const char *opt,
                                               TList *inlist, Long64_t fst,
                                               TDSet *dset, const char *selec,
                                               TObject *elist)
{
   // Create a TProofQueryResult instance for a new query.
   //
   // The sequential number is taken from the query manager after having
   // incremented it; if no query manager is defined the number is -1.
   // All the arguments are forwarded to the TProofQueryResult constructor.
   // The title of the result is set to the base name of fQueryDir.
   // The returned object is allocated with new.

   // Increment and fetch the sequential number (guarded: fQMgr may be null)
   Int_t seqnum = -1;
   if (fQMgr) {
      fQMgr->IncrementSeqNum();
      seqnum = fQMgr->SeqNum();
   }

   // Temporarily switch off the kWriteV3 bit while the query result is built
   Bool_t olds = (dset && dset->TestBit(TDSet::kWriteV3)) ? kTRUE : kFALSE;
   if (olds)
      dset->SetWriteV3(kFALSE);

   // Create the instance
   TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt, inlist, nent,
                                                  fst, dset, selec, elist);

   // Title is the session identifier
   pqr->SetTitle(gSystem->BaseName(fQueryDir));

   // Restore the kWriteV3 bit if it was set
   if (olds)
      dset->SetWriteV3(kTRUE);

   return pqr;
}
void TProofServ::SetQueryRunning(TProofQueryResult *pq)
{
   // Flag query 'pq' as running: record the current end of stdout as start
   // of its log, attach the ';'-separated list of enabled packages and
   // initialize its processing information from fProof.

   // Current end of the log file: this is where the query log starts
   fflush(stdout);
   Int_t logStart = lseek(fileno(stdout), (off_t) 0, SEEK_END);

   // Announce the start in the log
   Printf(" ");
   Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());

   // Assemble the list of enabled packages, separated by ';'
   TString packages;
   TIter nextPack(fEnabledPackages);
   for (TObject *obj = nextPack(); obj; obj = nextPack()) {
      if (packages.Length() > 0)
         packages += ";";
      packages += obj->GetName();
   }

   // Mark as running
   pq->SetRunning(logStart, packages);

   // Starting point for the processing information
   pq->SetProcessInfo(pq->GetEntries(),
                      fProof->GetCpuTime(), fProof->GetBytesRead());
}
void TProofServ::HandleArchive(TMessage *mess)
{
   // Handle an archive request.
   //
   // The message carries a query reference and a path. If the reference is
   // "Default" the path is just stored as default archive path. Otherwise
   // the query result is located (in memory via the query manager, or in the
   // 'query-result.root' file of the query directory) and written to the
   // archive file, which is created if missing and updated otherwise.
   // When no path is given, one is built from the default archive path.

   PDB(kGlobal, 1)
      Info("HandleArchive", "Enter");

   TString queryref;
   TString path;
   (*mess) >> queryref >> path;

   // If this is a set-default action just save the default
   if (queryref == "Default") {
      fArchivePath = path;
      Info("HandleArchive",
           "default path set to %s", fArchivePath.Data());
      return;
   }

   // Locate the query
   Int_t qry = -1;
   TString qdir;
   TProofQueryResult *pqr = fQMgr ? fQMgr->LocateQuery(queryref, qry, qdir) : 0;
   // Keep track of the instance returned by the manager (may be null)
   TProofQueryResult *pqm = pqr;

   // Build the archive path if none was given
   if (path.Length() <= 0) {
      if (fArchivePath.Length() <= 0) {
         Info("HandleArchive",
              "archive paths are not defined - do nothing");
         return;
      }
      if (qry > 0) {
         path.Form("%s/session-%s-%d.root",
                   fArchivePath.Data(), fTopSessionTag.Data(), qry);
      } else {
         path = queryref;
         path.ReplaceAll(":q","-");
         path.Insert(0, Form("%s/",fArchivePath.Data()));
         path += ".root";
      }
   }

   // Read the query result from file if needed
   if (!pqr || qry < 0) {
      TString fout = qdir;
      fout += "/query-result.root";

      TFile *f = TFile::Open(fout,"READ");
      pqr = 0;
      if (f) {
         f->ReadKeys();
         TIter nxk(f->GetListOfKeys());
         TKey *k =  0;
         // Take the first TProofQueryResult that can be read
         while ((k = (TKey *)nxk())) {
            if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
               pqr = (TProofQueryResult *) f->Get(k->GetName());
               if (pqr)
                  break;
            }
         }
         f->Close();
         delete f;
      } else {
         Info("HandleArchive",
              "file cannot be open (%s)",fout.Data());
         return;
      }
   }

   if (pqr) {

      PDB(kGlobal, 1) Info("HandleArchive",
                           "archive path for query #%d: %s",
                           qry, path.Data());

      // Create the archive file if it does not exist, update it otherwise
      // (AccessPathName returns true when the path is NOT accessible)
      TFile *farc = 0;
      if (gSystem->AccessPathName(path))
         farc = TFile::Open(path,"NEW");
      else
         farc = TFile::Open(path,"UPDATE");
      if (!farc || !(farc->IsOpen())) {
         Info("HandleArchive",
              "archive file cannot be open (%s)",path.Data());
         // Do not leak a file object which could not be opened
         delete farc;
         return;
      }
      farc->cd();

      // Update query instances
      pqr->SetArchived(path);
      if (pqm)
         pqm->SetArchived(path);

      // Write to file
      pqr->Write();

      // Close the archive: this flushes the content to disk and releases
      // the file instead of keeping it open for the rest of the session
      farc->Close();
      delete farc;

      // Update temporary files too
      if (qry > -1 && fQMgr)
         fQMgr->SaveQuery(pqr);

      // Notify
      Info("HandleArchive",
           "results of query %s archived to file %s",
           queryref.Data(), path.Data());
   }

   // Done
   return;
}
void TProofServ::HandleProcess(TMessage *mess)
{
   // Handle a processing request.
   //
   // The message carries the data set, the selector file name, the input
   // list, the options, the number of entries, the first entry, an optional
   // event list, the sync flag and (protocol > 14) an optional entry list.
   //
   // On the top master the request is turned into a TProofQueryResult and
   // queued; if the server is idle all the waiting queries are processed in
   // turn and the results sent back. Otherwise the request is processed
   // directly and the output objects are sent back on the socket.

   PDB(kGlobal, 1)
      Info("HandleProcess", "Enter");

   // Nothing to do if we are busy, unless we are the top master (which queues)
   if (!IsTopMaster() && !fIdle)
      return;

   TDSet *dset;
   TString filename, opt;
   TList *input;
   Long64_t nentries, first;
   TEventList *evl = 0;
   TEntryList *enl = 0;
   Bool_t sync;

   (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;

   // An entry list may follow if there is more in the buffer
   if ((mess->BufferSize() > mess->Length()) && fProtocol > 14)
      (*mess) >> enl;
   Bool_t hasNoData = (dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;

   // The entry list has priority on the event list
   TObject *elist = (enl) ? (TObject *)enl : (TObject *)evl;
   if (enl && evl)
      // Cannot specify both at the same time
      SafeDelete(evl);
   if ((!hasNoData) && elist)
      dset->SetEntryList(elist);

   if (IsTopMaster()) {

      // Make sure the dataset contains the information needed
      if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
         TString emsg;
         if (TProof::AssertDataSet(dset, input, fDataSetManager, emsg) != 0) {
            SendAsynMessage(Form("AssertDataSet on %s: %s",
                                 fPrefix.Data(), emsg.Data()));
            Error("HandleProcess", "AssertDataSet: %s", emsg.Data());
            return;
         }
      }

      TProofQueryResult *pq = 0;

      // Create the query instance; the data set and the entry list are
      // transported via the input list
      pq = MakeQueryResult(nentries, opt, 0, first, 0, filename, 0);

      // Add dataset and entry list to the input list and hand it over
      if (dset) input->Add(dset);
      if (elist) input->Add(elist);
      pq->SetInputList(input, kTRUE);

      // Drop the received list without deleting its content and use the
      // one held by the query from now on
      input->Clear("nodelete");
      SafeDelete(input);
      input = pq->GetInputList();

      // Save input data, if any
      TString emsg;
      if (TProof::SaveInputData(pq, fCacheDir.Data(), emsg) != 0)
         Warning("HandleProcess", "could not save input data: %s", emsg.Data());

      // If not a draw action, register and save the query
      if (!(pq->IsDraw())) {
         if (fQMgr) {
            if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
            // Also save it to the queries dir
            fQMgr->SaveQuery(pq);
         }
      }

      // Add to the waiting list
      fWaitingQueries->Add(pq);

      // In asynchronous mode notify the client of the submission: the
      // sequential number of the query is sent back
      TMessage m(kPROOF_QUERYSUBMITTED);
      if (!sync) {
         m << pq->GetSeqNum() << kFALSE;
         fSocket->Send(m);
      }

      // If we are already processing, the query just stays in the queue
      if (!fIdle) {
         // Notify submission
         Info("HandleProcess",
              "query \"%s:%s\" submitted", pq->GetTitle(), pq->GetName());
         return;
      }

      // Process the queries in the waiting list
      while (fWaitingQueries->GetSize() > 0) {

         // We are busy from now on
         fIdle = kFALSE;

         // Get the setup of the first query in the queue
         pq = (TProofQueryResult *)(fWaitingQueries->First());
         if (pq) {
            if (IsMaster() && fProof->UseDynamicStartup()) {
               // Get the workers for this query and start them
               TList* workerList = new TList();
               Int_t pc = 0;
               if (GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
                  Error("HandleProcess", "getting list of worker nodes");
                  return;
               }
               // Keep the assignment separate from the test: written as
               // 'if (Int_t ret = f() < 0)' the variable would hold the
               // result of the comparison, not the return code
               Int_t ret = fProof->AddWorkers(workerList);
               if (ret < 0) {
                  Error("HandleProcess", "Adding a list of worker nodes returned: %d",
                        ret);
                  return;
               }
            }

            opt      = pq->GetOptions();
            input    = pq->GetInputList();
            nentries = pq->GetEntries();
            first    = pq->GetFirst();
            filename = pq->GetSelecImp()->GetName();
            // Append whatever follows the last '#' in the options
            Ssiz_t id = opt.Last('#');
            if (id != kNPOS && id < opt.Length() - 1)
               filename += opt(id + 1, opt.Length());

            // Data set and entry (or event) list come from the input list
            TObject *o = 0;
            if ((o = pq->GetInputObject("TDSet")))
               dset = (TDSet *) o;
            elist = 0;
            if ((o = pq->GetInputObject("TEntryList")))
               elist = o;
            else if ((o = pq->GetInputObject("TEventList")))
               elist = o;

            // Recreate the selector implementation and header files from
            // the versions stored in the query
            if (pq->GetSelecImp()) {
               gSystem->Exec(Form("%s %s", kRM, pq->GetSelecImp()->GetName()));
               pq->GetSelecImp()->SaveSource(pq->GetSelecImp()->GetName());
            }
            if (pq->GetSelecHdr() &&
                !strstr(pq->GetSelecHdr()->GetName(), "TProofDrawHist")) {
               gSystem->Exec(Form("%s %s", kRM, pq->GetSelecHdr()->GetName()));
               pq->GetSelecHdr()->SaveSource(pq->GetSelecHdr()->GetName());
            }

            // Remove the query from the waiting list
            fWaitingQueries->Remove(pq);

         } else {
            // Should never get here
            Error("HandleProcess", "empty query in queue!");
            continue;
         }

         // Set it in running state
         SetQueryRunning(pq);

         // Save to the queries dir (or count it, if a draw query) and
         // reset the timer; everything needing the manager stays guarded
         if (fQMgr) {
            if (!(pq->IsDraw()))
               fQMgr->SaveQuery(pq);
            else
               fQMgr->IncrementDrawQueries();
            fQMgr->ResetTime();
         }

         // Signal the client that we are starting a new query
         m.Reset(kPROOF_STARTPROCESS);
         m << TString(pq->GetSelecImp()->GetName())
           << dset->GetListOfElements()->GetSize()
           << pq->GetFirst() << pq->GetEntries();
         fSocket->Send(m);

         // Create the player
         MakePlayer();

         // Add the query result to the player list
         fPlayer->AddQueryResult(pq);

         // Set it as current query
         fPlayer->SetCurrentQuery(pq);

         // Setup the data set proxy, if relevant
         if (dset->IsA() == TDSetProxy::Class())
            ((TDSetProxy*)dset)->SetProofServ(this);

         // Add the unique query tag to the input list
         input->Add(new TNamed("PROOF_QueryTag",Form("%s:%s",pq->GetTitle(),pq->GetName())));

         // Pass the input objects to the player
         TIter next(input);
         TObject *o = 0;
         while ((o = next())) {
            PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
            fPlayer->AddInput(o);
         }

         // The 'MissingFiles' object, if any, is removed from the list
         if ((o = input->FindObject("MissingFiles"))) input->Remove(o);

         // Process
         PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
         fPlayer->Process(dset, filename, opt, nentries, first);

         // Notify the client if the processing did not finish regularly
         if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
            Bool_t abort =
              (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) ? kTRUE : kFALSE;
            m.Reset(kPROOF_STOPPROCESS);
            // The content of the message depends on the protocol version
            if (fProtocol > 18) {
               TProofProgressStatus* status = fPlayer->GetProgressStatus();
               m << status << abort;
               status = 0; 
            } else if (fProtocol > 8) {
               m << fPlayer->GetEventsProcessed() << abort;
            } else {
               m << fPlayer->GetEventsProcessed();
            }
            fSocket->Send(m);
         }

         // Finalize the query and save the result
         if (fQMgr) {
            fProof->AskStatistics();
            if (fQMgr->FinalizeQuery(pq, fProof, fPlayer))
               fQMgr->SaveQuery(pq, fMaxQueries);
         }

         // Send back the results
         TQueryResult *pqr = pq->CloneInfo();
         if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
            PDB(kGlobal, 2) Info("HandleProcess","Sending results");
            if (fProtocol > 10) {
               // Send the objects one by one to limit the size of the messages
               TMessage mbuf(kPROOF_OUTPUTOBJECT);
               // Objects in the output list
               Int_t olsz = fPlayer->GetOutputList()->GetSize();
               // Message for the client
               SendAsynMessage(Form("%s: sending output: %d objs",fPrefix.Data(), olsz), kFALSE);
               // The query result info goes first, flagged with type 0
               mbuf << (Int_t) 0;
               mbuf.WriteObject(pqr);
               fSocket->Send(mbuf);

               Int_t ns = 0;
               Int_t totsz = 0;
               TIter nxo(fPlayer->GetOutputList());
               o = 0;
               while ((o = nxo())) {
                  ns++;
                  mbuf.Reset();
                  // Type 2 flags the last object, type 1 all the others
                  Int_t type = (Int_t) ((ns >= olsz) ? 2 : 1);
                  mbuf << type;
                  mbuf.WriteObject(o);
                  totsz += mbuf.Length();
                  SendAsynMessage(Form("%s: sending obj %d/%d (%d bytes)",fPrefix.Data(),
                                       ns, olsz, mbuf.Length()), kFALSE);
                  fSocket->Send(mbuf);
               }
               // Total size
               SendAsynMessage(Form("%s: grand total: sent %d objects, size: %d bytes",
                                    fPrefix.Data(), olsz, totsz));
            } else if (fProtocol > 6) {
               // Send the whole query result in one message
               TMessage mbuf(kPROOF_OUTPUTLIST);
               mbuf.WriteObject(pq);
               // Sizes to be reported
               Int_t blen = mbuf.Length();
               Int_t olsz = fPlayer->GetOutputList()->GetSize();
               // Message for the client
               SendAsynMessage(Form("%s: sending output: %d objs, %d bytes",
                                     fPrefix.Data(), olsz, blen));
               fSocket->Send(mbuf);
            } else {
               // Oldest protocols: send the output list only
               PDB(kGlobal, 2) Info("HandleProcess","Sending output list");
               fSocket->SendObject(fPlayer->GetOutputList(), kPROOF_OUTPUTLIST);
            }
         } else {
            if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
               Warning("HandleProcess","The output list is empty!");
            fSocket->SendObject(0, kPROOF_OUTPUTLIST);
         }

         // Remove aborted queries from the list
         if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
            if (fQMgr) fQMgr->RemoveQuery(pq);
         } else {
            // Keep in memory only the light info about the query
            if (!(pq->IsDraw())) {
               if (fQMgr && fQMgr->Queries()) {
                  if (pqr)
                     fQMgr->Queries()->Add(pqr);
                  // Remove the full query from the list
                  fQMgr->Queries()->Remove(pq);
               }
               // Remove the query from the player list too
               fPlayer->RemoveQueryResult(Form("%s:%s",
                                          pq->GetTitle(), pq->GetName()));
            }
         }

         DeletePlayer();
         if (IsMaster() && fProof->UseDynamicStartup())
            // Stop the workers started for this query
            fProof->RemoveWorkers(0);
      } 

      // Signal the client that we are idle
      fSocket->Send(kPROOF_SETIDLE);

   } else {

      // We are busy from now on
      fIdle = kFALSE;

      MakePlayer();

      // Setup the data set proxy, if relevant
      if (dset->IsA() == TDSetProxy::Class())
         ((TDSetProxy*)dset)->SetProofServ(this);

      // Get input data, if any
      TString emsg;
      if (TProof::GetInputData(input, fCacheDir.Data(), emsg) != 0)
         Warning("HandleProcess", "could not get input data: %s", emsg.Data());

      // Pass the input objects to the player
      TIter next(input);
      TObject *o = 0;
      while ((o = next())) {
         PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
         fPlayer->AddInput(o);
      }

      // Signal that we are about to start
      fSocket->Send(kPROOF_STARTPROCESS);

      // Process
      PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
      fPlayer->Process(dset, filename, opt, nentries, first);

      // Signal that we have stopped
      TMessage m(kPROOF_STOPPROCESS);
      Bool_t abort = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted) ? kFALSE : kTRUE;
      if (fProtocol > 18) {
         TProofProgressStatus* status =
            new TProofProgressStatus(fPlayer->GetEventsProcessed(),
                                    gPerfStats?gPerfStats->GetBytesRead():0);
         if (status)
            m << status << abort;
         // The object has been streamed into the message: release it
         SafeDelete(status);
      } else {
         m << fPlayer->GetEventsProcessed() << abort;
      }
      fSocket->Send(m);

      // Send back the results
      if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
         if (fProtocol > 10) {
            // Send the objects one by one to limit the size of the messages
            TMessage mbuf(kPROOF_OUTPUTOBJECT);

            Int_t ns = 0;
            Int_t olsz = fPlayer->GetOutputList()->GetSize();
            TIter nxo(fPlayer->GetOutputList());
            o = 0;
            while ((o = nxo())) {
               ns++;
               mbuf.Reset();
               // Type 2 flags the last object, type 1 all the others
               mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
               mbuf.WriteObject(o);
               fSocket->Send(mbuf);
            }
         } else {
            PDB(kGlobal, 2) Info("HandleProcess","Sending output list");
            fSocket->SendObject(fPlayer->GetOutputList(), kPROOF_OUTPUTLIST);
         }
      } else {
         fSocket->SendObject(0, kPROOF_OUTPUTLIST);
      }

      // Cleanup
      SafeDelete(dset);
      SafeDelete(enl);
      SafeDelete(evl);

      // The input objects are deleted via the received list, not by the player
      fPlayer->GetInputList()->SetOwner(0);
      input->SetOwner();
      SafeDelete(input);

      // Signal that we are idle
      fSocket->Send(kPROOF_SETIDLE);

      DeletePlayer();
   }

   // We are idle again
   fIdle = kTRUE;

   PDB(kGlobal, 1) Info("HandleProcess", "Done");

   // Done
   return;
}
void TProofServ::HandleQueryList(TMessage *mess)
{
   // Handle a request for the list of queries.
   //
   // The message carries a flag: if true the queries of previous sessions
   // are scanned and included. The reply (kPROOF_QUERYLIST) contains the
   // number of queries from previous sessions, the number of draw queries
   // and the list of query results, renumbered sequentially.

   PDB(kGlobal, 1)
      Info("HandleQueryList", "Enter");

   Bool_t all;
   (*mess) >> all;

   // 'ql' is the list which is sent; it does not own its content because
   // the results of previous sessions belong to the query manager
   TList *ql = new TList;
   // The copies created here via CloneInfo() are owned by this list, so
   // that they can be released once the message has been sent
   TList clones;
   clones.SetOwner(kTRUE);

   Int_t ntot = 0, npre = 0, ndraw= 0;
   if (fQMgr) {
      if (all) {
         // Rescan the area of previous sessions: it is the query
         // directory stripped of whatever starts at "session-"
         TString qdir = fQueryDir;
         Int_t idx = qdir.Index("session-");
         if (idx != kNPOS)
            qdir.Remove(idx);
         fQMgr->ScanPreviousQueries(qdir);
         // Add them to the list
         if (fQMgr->PreviousQueries()) {
            TIter nxq(fQMgr->PreviousQueries());
            TProofQueryResult *pqr = 0;
            while ((pqr = (TProofQueryResult *)nxq())) {
               ntot++;
               pqr->fSeqNum = ntot;
               ql->Add(pqr);
            }
         }
      }

      npre = ntot;
      if (fQMgr->Queries()) {
         // Add a light copy of the queries of this session
         TIter nxq(fQMgr->Queries());
         TProofQueryResult *pqr = 0;
         TQueryResult *pqm = 0;
         while ((pqr = (TProofQueryResult *)nxq())) {
            ntot++;
            pqm = pqr->CloneInfo();
            pqm->fSeqNum = ntot;
            ql->Add(pqm);
            clones.Add(pqm);
         }
      }
      // Number of draw queries
      ndraw = fQMgr->DrawQueries();
   }

   TMessage m(kPROOF_QUERYLIST);
   m << npre << ndraw << ql;
   fSocket->Send(m);
   delete ql;
   // Release the copies explicitly
   clones.Delete();

   // Done
   return;
}
void TProofServ::HandleRemove(TMessage *mess)
{
   // Handle a remove request. The message carries a reference which is
   // either one of the special actions "cleanupqueue" (empty the list of
   // waiting queries) and "cleanupdir" (cleanup the queries directory), or
   // the reference of the query to be removed.

   PDB(kGlobal, 1)
      Info("HandleRemove", "Enter");

   TString queryref;
   (*mess) >> queryref;

   const Bool_t cleanQueue = (queryref == "cleanupqueue");
   if (cleanQueue) {
      // Count the waiting queries before deleting them
      Int_t nWaiting = fWaitingQueries->GetSize();
      fWaitingQueries->Delete();
      Info("HandleRemove", "%d queries removed from the waiting list", nWaiting);
      return;
   }

   const Bool_t cleanDir = (queryref == "cleanupdir");
   if (cleanDir) {
      // -1 is reported if there is no query manager
      Int_t nRemoved = -1;
      if (fQMgr)
         nRemoved = fQMgr->CleanupQueriesDir();
      Info("HandleRemove", "%d directories removed", nRemoved);
      return;
   }

   if (!fQMgr) {
      Warning("HandleRemove", "query result manager undefined!");
   } else {
      // The session must be locked before touching the query
      TProofLockPath *sessionLock = 0;
      const Bool_t locked = (fQMgr->LockSession(queryref, &sessionLock) == 0);
      if (locked) {
         // Remove the query
         fQMgr->RemoveQuery(queryref, fWaitingQueries);
         // Remove the lock file and release the lock object, if any
         if (sessionLock) {
            gSystem->Unlink(sessionLock->GetName());
            SafeDelete(sessionLock);
         }
         return;
      }
   }

   // Failure: notify
   Info("HandleRemove",
        "query %s could not be removed (unable to lock session)", queryref.Data());
   return;
}
void TProofServ::HandleRetrieve(TMessage *mess)
{
   // Handle a retrieve request.
   //
   // The message carries the reference of the query. The query result is
   // read from the 'query-result.root' file in the query directory and sent
   // back with a kPROOF_RETRIEVE message; a null object is sent if the file
   // cannot be opened or the object cannot be read.

   PDB(kGlobal, 1)
      Info("HandleRetrieve", "Enter");

   TString queryref;
   (*mess) >> queryref;

   // Locate the query directory
   Int_t qry = -1;
   TString qdir;
   if (fQMgr) fQMgr->LocateQuery(queryref, qry, qdir);

   TString fout = qdir;
   fout += "/query-result.root";

   TFile *f = TFile::Open(fout,"READ");
   TProofQueryResult *pqr = 0;
   if (f) {
      f->ReadKeys();
      TIter nxk(f->GetListOfKeys());
      TKey *k =  0;
      // Only the first key of class TProofQueryResult is considered
      while ((k = (TKey *)nxk())) {
         if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
            pqr = (TProofQueryResult *) f->Get(k->GetName());
            // For protocol versions before 13 flag the data set, if any,
            // with SetWriteV3. Both the query result and the data set may
            // be missing, so check them before use.
            if (pqr && fProtocol < 13) {
               TDSet *d = 0;
               TObject *o = 0;
               TIter nxi(pqr->GetInputList());
               while ((o = nxi()))
                  if ((d = dynamic_cast<TDSet *>(o)))
                     break;
               if (d)
                  d->SetWriteV3(kTRUE);
            }
            if (pqr) {
               // Report the file size, scaled in steps of 1000
               Float_t qsz = (Float_t) f->GetSize();
               Int_t ilb = 0;
               static const char *clb[4] = { "bytes", "KB", "MB", "GB" };
               while (qsz > 1000. && ilb < 3) {
                  qsz /= 1000.;
                  ilb++;
               }
               SendAsynMessage(Form("%s: sending result of %s:%s (%'.1f %s)",
                                    fPrefix.Data(), pqr->GetTitle(), pqr->GetName(),
                                    qsz, clb[ilb]));
               fSocket->SendObject(pqr, kPROOF_RETRIEVE);
            } else {
               Info("HandleRetrieve",
                    "query not found in file %s",fout.Data());
               // Notify the failure
               fSocket->SendObject(0, kPROOF_RETRIEVE);
            }
            break;
         }
      }
      f->Close();
      delete f;
   } else {
      Info("HandleRetrieve",
           "file cannot be open (%s)",fout.Data());
      // Notify the failure
      fSocket->SendObject(0, kPROOF_RETRIEVE);
      return;
   }

   // Done
   return;
}
void TProofServ::HandleLibIncPath(TMessage *mess)
{
   // Handle a request to change the library or include paths.
   //
   // The message carries the type ("lib" or "inc"), a flag telling whether
   // the paths have to be added (true) or removed (false) and the paths
   // themselves, separated by commas or blanks. On masters the request is
   // also forwarded via fProof.

   TString type;
   Bool_t add;
   TString path;
   (*mess) >> type >> add >> path;

   // Check the type of action
   if ((type != "lib") && (type != "inc")) {
      Error("HandleLibIncPath","unknown action type: %s", type.Data());
      return;
   }

   // Separators can be either commas or blanks
   path.ReplaceAll(","," ");

   // Decompose the path. The array returned by Tokenize belongs to us and
   // is released at the end; it stays null for an empty path or "-".
   TObjArray *op = 0;
   if (path.Length() > 0 && path != "-") {
      if (!(op = path.Tokenize(" "))) {
         Error("HandleLibIncPath","decomposing path %s", path.Data());
         return;
      }
   }

   if (add) {

      if (type == "lib") {

         // Go backward, since each path is inserted in front
         TIter nxl(op, kIterBackward);
         TObjString *lib = 0;
         while ((lib = (TObjString *) nxl())) {
            // Expand the path
            TString xlib = lib->GetName();
            gSystem->ExpandPathName(xlib);
            // Add it to the dynamic path if it can be read
            // (AccessPathName returns false when the path IS accessible)
            if (!gSystem->AccessPathName(xlib, kReadPermission)) {
               TString newlibpath = gSystem->GetDynamicPath();
               // Keep a leading ".:" in first position
               Int_t pos = 0;
               if (newlibpath.BeginsWith(".:"))
                  pos = 2;
               if (newlibpath.Index(xlib) == kNPOS) {
                  newlibpath.Insert(pos,Form("%s:", xlib.Data()));
                  gSystem->SetDynamicPath(newlibpath);
               }
            } else {
               Info("HandleLibIncPath",
                    "libpath %s does not exist or cannot be read - not added", xlib.Data());
            }
         }

         // Forward the request, if required
         if (IsMaster())
            fProof->AddDynamicPath(path);

      } else {

         TIter nxi(op);
         TObjString *inc = 0;
         while ((inc = (TObjString *) nxi())) {
            // Expand the path
            TString xinc = inc->GetName();
            gSystem->ExpandPathName(xinc);
            // Add it to the include path if it can be read and is not there yet
            if (!gSystem->AccessPathName(xinc, kReadPermission)) {
               TString curincpath = gSystem->GetIncludePath();
               if (curincpath.Index(xinc) == kNPOS)
                  gSystem->AddIncludePath(Form("-I%s", xinc.Data()));
            } else
               Info("HandleLibIncPath",
                    "incpath %s does not exist or cannot be read - not added", xinc.Data());
         }

         // Forward the request, if required
         if (IsMaster())
            fProof->AddIncludePath(path);
      }

   } else {

      if (type == "lib") {

         TIter nxl(op);
         TObjString *lib = 0;
         while ((lib = (TObjString *) nxl())) {
            // Expand the path
            TString xlib = lib->GetName();
            gSystem->ExpandPathName(xlib);
            // Remove it from the dynamic path
            TString newlibpath = gSystem->GetDynamicPath();
            newlibpath.ReplaceAll(Form("%s:", xlib.Data()),"");
            gSystem->SetDynamicPath(newlibpath);
         }

         // Forward the request, if required
         if (IsMaster())
            fProof->RemoveDynamicPath(path);

      } else {

         TIter nxi(op);
         TObjString *inc = 0;
         while ((inc = (TObjString *) nxi())) {
            TString newincpath = gSystem->GetIncludePath();
            newincpath.ReplaceAll(Form("-I%s", inc->GetName()),"");
            // Strip the interpreter include path as well before setting
            newincpath.ReplaceAll(gInterpreter->GetIncludePath(),"");
            gSystem->SetIncludePath(newincpath);
         }

         // Forward the request, if required
         if (IsMaster())
            fProof->RemoveIncludePath(path);
      }
   }

   // Release the tokens (no-op if the path was empty)
   delete op;
}
void TProofServ::HandleCheckFile(TMessage *mess)
{
   // Handle a file-check request.
   // The message carries a file name and an MD5 checksum, optionally followed
   // by an option word (read only when more data is available and the
   // protocol is > 8). The first character of the name selects the action:
   //    '-'        verify an uploaded package file against the checksum,
   //               unpack it and record the checksum
   //    '+' / '='  compare the checksum with the one recorded for the
   //               installed package
   //    other      compare the checksum with the one of the file in the cache
   // A kPROOF_CHECKFILE reply carrying 1 (match) or 0 (mismatch / failure) is
   // sent on fSocket; for protocol <= 19 a mismatch resets the reply to
   // kPROOF_FATAL.

   TString fname;
   TMD5    md5in;
   UInt_t  opt = TProof::kUntar;

   TMessage reply(kPROOF_CHECKFILE);

   // Decode the request
   (*mess) >> fname >> md5in;
   if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8))
      (*mess) >> opt;

   if (fname.BeginsWith("-")) {
      // Package file: verify, unpack and register it
      fname = fname.Strip(TString::kLeading, '-');
      TString pkg = fname;
      // Drop the last 4 characters (presumably the ".par" extension)
      pkg.Remove(pkg.Length() - 4);

      Int_t  rc     = 0;
      Bool_t failed = kFALSE;

      // All operations on the package directory are done under the package lock
      fPackageLock->Lock();
      TMD5 *md5loc = TMD5::FileChecksum(fPackageDir + "/" + fname);
      const Bool_t match = (md5loc && md5in == (*md5loc));
      if (!match) {
         // File missing or checksum mismatch
         reply << (Int_t)0;
         if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
         failed = kTRUE;
         PDB(kPackage, 1)
            Info("HandleCheckFile",
                 "package %s not yet on node", fname.Data());
      } else {
         if ((opt & TProof::kRemoveOld)) {
            // Remove any previously unpacked version
            rc = gSystem->Exec(Form("%s %s/%s", kRM, fPackageDir.Data(),
                               pkg.Data()));
            if (rc)
               Error("HandleCheckFile", "failure executing: %s %s/%s",
                     kRM, fPackageDir.Data(), pkg.Data());
         }
         // Locate the gunzip executable and unpack
         char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
                                       kExecutePermission);
         if (!gunzip) {
            Error("HandleCheckFile", "%s not found", kGUNZIP);
         } else {
            TString untar;
            untar.Form(kUNTAR, gunzip, fPackageDir.Data(),
                       fname.Data(), fPackageDir.Data());
            rc = gSystem->Exec(untar);
            if (rc)
               Error("HandleCheckFile", "failure executing: %s", untar.Data());
            delete [] gunzip;
         }
         // The unpacked directory must exist and be writable
         // (AccessPathName returns kFALSE when the path IS accessible)
         if (!gSystem->AccessPathName(fPackageDir + "/" + pkg, kWritePermission)) {
            // Record the checksum inside the unpacked package
            TString md5f = fPackageDir + "/" + pkg + "/PROOF-INF/md5.txt";
            TMD5::WriteChecksum(md5f, md5loc);
            // Notify success
            reply << (Int_t)1;
            PDB(kPackage, 1)
               Info("HandleCheckFile",
                    "package %s installed on node", fname.Data());
         } else {
            // Unpacking did not produce the expected directory
            reply << (Int_t)0;
            if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
            failed = kTRUE;
            Error("HandleCheckFile", "package %s did not unpack into %s",
                                     fname.Data(), pkg.Data());
         }
      }

      // On failure remove the package file while still holding the lock
      if (failed)
         gSystem->Exec(Form("%s %s/%s", kRM, fPackageDir.Data(),
                       fname.Data()));
      fPackageLock->Unlock();

      // On success a master forwards the package; this is done after the
      // lock has been released
      if (!failed && IsMaster())
         fProof->UploadPackage(fPackageDir + "/" + fname, (TProof::EUploadPackageOpt)opt);

      delete md5loc;
      fSocket->Send(reply);

   } else if (fname.BeginsWith("+") || fname.BeginsWith("=")) {
      // Installed package: compare against the recorded checksum.
      // Both prefixes are handled in exactly the same way.
      const char tag = fname.BeginsWith("+") ? '+' : '=';
      fname = fname.Strip(TString::kLeading, tag);
      TString pkg = fname;
      // Drop the last 4 characters (presumably the ".par" extension)
      pkg.Remove(pkg.Length() - 4);
      TString md5f = fPackageDir + "/" + pkg + "/PROOF-INF/md5.txt";

      fPackageLock->Lock();
      TMD5 *md5loc = TMD5::ReadChecksum(md5f);
      fPackageLock->Unlock();

      const Bool_t match = (md5loc && md5in == (*md5loc));
      if (!match) {
         reply << (Int_t)0;
         if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
         PDB(kPackage, 1)
            Info("HandleCheckFile",
                 "package %s not yet on node", fname.Data());
      } else {
         // Package is up-to-date
         reply << (Int_t)1;
         PDB(kPackage, 1)
            Info("HandleCheckFile",
                 "package %s already on node", fname.Data());
         if (IsMaster())
            fProof->UploadPackage(fPackageDir + "/" + fname);
      }
      delete md5loc;
      fSocket->Send(reply);

   } else {
      // Plain file: look it up in the cache directory
      TString cached = fCacheDir + "/" + fname;

      fCacheLock->Lock();
      TMD5 *md5loc = TMD5::FileChecksum(cached);
      const Bool_t match = (md5loc && md5in == (*md5loc));
      if (!match) {
         reply << (Int_t)0;
         if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
         PDB(kCache, 1)
            Info("HandleCheckFile", "file %s not yet on node", fname.Data());
      } else {
         // Copy to the working area if requested, or always for old protocols
         if ((opt & TProof::kCp) || (fProtocol <= 19))
            CopyFromCache(fname, (opt & TProof::kCpBin) ? kTRUE : kFALSE);
         reply << (Int_t)1;
         PDB(kCache, 1)
            Info("HandleCheckFile", "file %s already on node", fname.Data());
      }
      delete md5loc;
      // The reply is sent before the cache lock is released
      fSocket->Send(reply);
      fCacheLock->Unlock();
   }
}
Int_t TProofServ::HandleCache(TMessage *mess)
{
   // Handle a cache / package management request.
   // The request type is read from 'mess' and selects one of the
   // TProof::k... actions below (show / clear the file cache, show / clear /
   // build / load / unload / disable packages, list packages, load a macro).
   // On a master most actions are also forwarded to fProof.
   // Returns the status of the operation: 0 unless one of the steps below
   // sets it to a non-zero value.

   PDB(kGlobal, 1)
      Info("HandleCache", "Enter");

   Int_t status = 0;
   Int_t type = 0;
   Bool_t all = kFALSE;
   TMessage msg;
   Bool_t fromglobal = kFALSE;

   // Prefix used in asynchronous notifications: "Mst-<ord>" or "Wrk-<ord>"
   TString noth;
   const char *k = (IsMaster()) ? "Mst" : "Wrk";
   noth.Form("%s-%s", k, fOrdinal.Data());

   TString package, pdir, ocwd, file;
   (*mess) >> type;
   switch (type) {
      case TProof::kShowCache:
         (*mess) >> all;
         printf("*** File cache %s:%s ***\n", gSystem->HostName(),
                fCacheDir.Data());
         fflush(stdout);
         // In cache-debug mode the listing is run with the '-a' switch
         PDB(kCache, 1) {
            gSystem->Exec(Form("%s -a %s", kLS, fCacheDir.Data()));
         } else {
            gSystem->Exec(Form("%s %s", kLS, fCacheDir.Data()));
         }
         if (IsMaster() && all)
            fProof->ShowCache(all);
         LogToMaster();
         break;
      case TProof::kClearCache:
         // The file name is optional: empty or "*" means everything
         file = "";
         if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
         fCacheLock->Lock();
         if (file.IsNull() || file == "*") {
            gSystem->Exec(Form("%s %s/* %s/.*.binversion", kRM, fCacheDir.Data(), fCacheDir.Data()));
         } else {
            gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), file.Data()));
         }
         fCacheLock->Unlock();
         if (IsMaster())
            fProof->ClearCache(file);
         break;
      case TProof::kShowPackages:
         (*mess) >> all;
         if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
            // Scan the list of global packages dirs
            TIter nxd(fGlobalPackageDirList);
            TNamed *nm = 0;
            while ((nm = (TNamed *)nxd())) {
               printf("*** Global Package cache %s %s:%s ***\n",
                      nm->GetName(), gSystem->HostName(), nm->GetTitle());
               fflush(stdout);
               gSystem->Exec(Form("%s %s", kLS, nm->GetTitle()));
               printf("\n");
               fflush(stdout);
            }
         }
         printf("*** Package cache %s:%s ***\n", gSystem->HostName(),
                fPackageDir.Data());
         fflush(stdout);
         gSystem->Exec(Form("%s %s", kLS, fPackageDir.Data()));
         if (IsMaster() && all)
            fProof->ShowPackages(all);
         LogToMaster();
         break;
      case TProof::kClearPackages:
         // Unload first; remove the files only if that succeeded
         status = UnloadPackages();
         if (status == 0) {
            fPackageLock->Lock();
            gSystem->Exec(Form("%s %s/*", kRM, fPackageDir.Data()));
            fPackageLock->Unlock();
            if (IsMaster())
               status = fProof->ClearPackages();
         }
         break;
      case TProof::kClearPackage:
         (*mess) >> package;
         status = UnloadPackage(package);
         if (status == 0) {
            fPackageLock->Lock();
            // Remove the package directory; on a master also the .par file
            gSystem->Exec(Form("%s %s/%s", kRM, fPackageDir.Data(),
                          package.Data()));
            if (IsMaster())
               gSystem->Exec(Form("%s %s/%s.par", kRM, fPackageDir.Data(),
                             package.Data()));
            fPackageLock->Unlock();
            if (IsMaster())
               status = fProof->ClearPackage(package);
         }
         break;
      case TProof::kBuildPackage:
         (*mess) >> package;

         // Look for the package in the local package directory first
         pdir = fPackageDir + "/" + package;

         fromglobal = kFALSE;
         if (gSystem->AccessPathName(pdir, kReadPermission) ||
             gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
            // Not found locally: is there a global package with this name?
            if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
               // Scan the list of global packages dirs
               TIter nxd(fGlobalPackageDirList);
               TNamed *nm = 0;
               while ((nm = (TNamed *)nxd())) {
                  pdir.Form("%s/%s", nm->GetTitle(), package.Data());
                  if (!gSystem->AccessPathName(pdir, kReadPermission) &&
                      !gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
                     // Package found, stop searching
                     fromglobal = kTRUE;
                     break;
                  }
                  pdir = "";
               }
               if (pdir.Length() <= 0) {
                  // Package not found
                  SendAsynMessage(Form("%s: kBuildPackage: failure locating %s ...",
                                       noth.Data(), package.Data()));
                  break;
               } else {
                  // Package found in a global directory: nothing is built here
                  break;
               }
            }
         }

         if (IsMaster()) {
            // Make sure the package is available on the nodes
            fProof->UploadPackage(pdir + ".par");
         }
         fPackageLock->Lock();

         if (!status) {

            PDB(kPackage, 1)
               Info("HandleCache",
                    "kBuildPackage: package %s exists and has PROOF-INF directory", package.Data());

            ocwd = gSystem->WorkingDirectory();
            gSystem->ChangeDirectory(pdir);

            // Start the build on the nodes without waiting for completion
            if (IsMaster())
               fProof->BuildPackage(package, TProof::kBuildOnSlavesNoWait);

            // Check for BUILD.sh and execute it
            if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
               // Notify the upper level
               SendAsynMessage(Form("%s: building %s ...", noth.Data(), package.Data()));

               // Read the ROOT version and revision the package was last
               // built with: a mismatch forces a clean re-install
               Bool_t savever = kFALSE;
               TString v;
               Int_t rev = -1;
               FILE *f = fopen("PROOF-INF/proofvers.txt", "r");
               if (f) {
                  TString r;
                  v.Gets(f);
                  r.Gets(f);
                  rev = (!r.IsNull() && r.IsDigit()) ? r.Atoi() : -1;
                  fclose(f);
               }
               if (!f || v != gROOT->GetVersion() ||
                  (gROOT->GetSvnRevision() > 0 && rev != gROOT->GetSvnRevision())) {
                  if (!fromglobal) {
                     savever = kTRUE;
                     SendAsynMessage(Form("%s: %s: version change (current: %s:%d,"
                                          " build: %s:%d): cleaning ... ",
                                          noth.Data(), package.Data(), gROOT->GetVersion(),
                                          gROOT->GetSvnRevision(), v.Data(), rev));
                     // Hard cleanup: go up the dir tree
                     gSystem->ChangeDirectory(fPackageDir);
                     // remove package directory
                     gSystem->Exec(Form("%s %s", kRM, pdir.Data()));
                     // find gunzip...
                     char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
                                                   kExecutePermission);
                     if (gunzip) {
                        TString par;
                        par.Form("%s.par", pdir.Data());
                        // ... and unpack the package again
                        TString cmd;
                        cmd.Form(kUNTAR3, gunzip, par.Data());
                        status = gSystem->Exec(cmd);
                        if (status) {
                           Error("HandleCache", "kBuildPackage: failure executing: %s", cmd.Data());
                        } else {
                           // Store md5 in package/PROOF-INF/md5.txt. The
                           // checksum object is heap-allocated: check it
                           // before use and release it afterwards.
                           TMD5 *md5local = TMD5::FileChecksum(par);
                           if (md5local) {
                              TString md5f = fPackageDir + "/" + package + "/PROOF-INF/md5.txt";
                              TMD5::WriteChecksum(md5f, md5local);
                              delete md5local;
                           } else {
                              Warning("HandleCache",
                                      "kBuildPackage: could not compute checksum of %s", par.Data());
                           }
                           // Go down to the package directory
                           gSystem->ChangeDirectory(pdir);
                        }
                        delete [] gunzip;
                     } else {
                        Error("HandleCache", "kBuildPackage: %s not found", kGUNZIP);
                        // The package directory has been removed and cannot be
                        // re-created: flag the failure so that BUILD.sh is not
                        // run from the wrong directory
                        status = -1;
                     }
                  } else {
                     SendAsynMessage(Form("%s: %s: ROOT version inconsistency (current: %s, build: %s):"
                                          " global package: cannot re-build!!! ",
                                          noth.Data(), package.Data(), gROOT->GetVersion(), v.Data()));
                  }
               }

               if (!status) {
                  // Run BUILD.sh with ROOTINCLUDEPATH exported; double quotes
                  // are stripped from the include path because the value is
                  // itself wrapped in double quotes on the command line
                  TString ipath(gSystem->GetIncludePath());
                  ipath.ReplaceAll("\"","");
                  TString cmd;
                  cmd.Form("export ROOTINCLUDEPATH=\"%s\" ; PROOF-INF/BUILD.sh", ipath.Data());
                  {
                     // The command is handed to the log-handler guard together
                     // with the socket; the guard lives only in this scope
                     TProofServLogHandlerGuard hg(cmd, fSocket);
                  }

                  if (savever) {
                     // Record version and revision for the next check
                     f = fopen("PROOF-INF/proofvers.txt", "w");
                     if (f) {
                        fputs(gROOT->GetVersion(), f);
                        fputs(Form("\n%d",gROOT->GetSvnRevision()), f);
                        fclose(f);
                     }
                  }
               }
            }
            gSystem->ChangeDirectory(ocwd);
         }

         fPackageLock->Unlock();

         if (status) {
            // Notify the upper level
            SendAsynMessage(Form("%s: failure building %s ...", noth.Data(), package.Data()));
         } else {
            // Collect the results of the builds started on the nodes
            if (IsMaster())
               fProof->BuildPackage(package, TProof::kCollectBuildResults);
            PDB(kPackage, 1)
               Info("HandleCache", "package %s successfully built", package.Data());
         }
         break;
      case TProof::kLoadPackage:
         (*mess) >> package;

         // If already loaded don't do it again
         if (fEnabledPackages->FindObject(package)) {
            Info("HandleCache",
                 "package %s already loaded", package.Data());
            break;
         }

         // Look for the package in the local package directory first
         pdir = fPackageDir + "/" + package;

         if (gSystem->AccessPathName(pdir, kReadPermission)) {
            // Not found locally: is there a global package with this name?
            if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
               // Scan the list of global packages dirs
               TIter nxd(fGlobalPackageDirList);
               TNamed *nm = 0;
               while ((nm = (TNamed *)nxd())) {
                  pdir.Form("%s/%s", nm->GetTitle(), package.Data());
                  if (!gSystem->AccessPathName(pdir, kReadPermission)) {
                     // Package found, stop searching
                     break;
                  }
                  pdir = "";
               }
               if (pdir.Length() <= 0) {
                  // Package not found
                  SendAsynMessage(Form("%s: kLoadPackage: failure locating %s ...",
                                       noth.Data(), package.Data()));
                  break;
               }
            }
         }

         ocwd = gSystem->WorkingDirectory();
         gSystem->ChangeDirectory(pdir);

         // Check for SETUP.C and execute it
         if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
            Int_t err = 0;
            Int_t errm = gROOT->Macro("PROOF-INF/SETUP.C", &err);
            if (errm < 0)
               status = -1;
            if (err > TInterpreter::kNoError && err <= TInterpreter::kFatal)
               status = -1;
         }

         gSystem->ChangeDirectory(ocwd);

         if (status) {
            // Notify the upper level
            SendAsynMessage(Form("%s: failure loading %s ...", noth.Data(), package.Data()));
         } else {
            // Create a link to the package in the working directory
            gSystem->Symlink(pdir, package);

            // Add the package to the include path of the system ...
            gSystem->AddIncludePath(TString("-I") + package);

            // ... and of the interpreter
            gROOT->ProcessLine(TString(".include ") + package);

            // Register the package as enabled
            fEnabledPackages->Add(new TObjString(package));
            if (IsMaster())
               fProof->LoadPackage(package);
            PDB(kPackage, 1)
               Info("HandleCache", "package %s successfully loaded", package.Data());
         }
         break;
      case TProof::kShowEnabledPackages:
         (*mess) >> all;
         if (IsMaster()) {
            if (all)
               printf("*** Enabled packages on master %s on %s\n",
                      fOrdinal.Data(), gSystem->HostName());
            else
               printf("*** Enabled packages ***\n");
         } else {
            printf("*** Enabled packages on slave %s on %s\n",
                   fOrdinal.Data(), gSystem->HostName());
         }
         {
            TIter next(fEnabledPackages);
            while (TObjString *str = (TObjString*) next())
               printf("%s\n", str->GetName());
         }
         if (IsMaster() && all)
            fProof->ShowEnabledPackages(all);
         LogToMaster();
         break;
      case TProof::kShowSubCache:
         // The "Sub" variants only forward the request to fProof
         (*mess) >> all;
         if (IsMaster() && all)
            fProof->ShowCache(all);
         LogToMaster();
         break;
      case TProof::kClearSubCache:
         file = "";
         if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
         if (IsMaster())
            fProof->ClearCache(file);
         break;
      case TProof::kShowSubPackages:
         (*mess) >> all;
         if (IsMaster() && all)
            fProof->ShowPackages(all);
         LogToMaster();
         break;
      case TProof::kDisableSubPackages:
         if (IsMaster())
            fProof->DisablePackages();
         break;
      case TProof::kDisableSubPackage:
         (*mess) >> package;
         if (IsMaster())
            fProof->DisablePackage(package);
         break;
      case TProof::kBuildSubPackage:
         (*mess) >> package;
         if (IsMaster())
            fProof->BuildPackage(package);
         break;
      case TProof::kUnloadPackage:
         (*mess) >> package;
         status = UnloadPackage(package);
         if (IsMaster() && status == 0)
            status = fProof->UnloadPackage(package);
         break;
      case TProof::kDisablePackage:
         (*mess) >> package;
         fPackageLock->Lock();
         // Remove both the package directory and the .par file
         gSystem->Exec(Form("%s %s/%s", kRM, fPackageDir.Data(),
                       package.Data()));
         gSystem->Exec(Form("%s %s/%s.par", kRM, fPackageDir.Data(),
                       package.Data()));
         fPackageLock->Unlock();
         if (IsMaster())
            fProof->DisablePackage(package);
         break;
      case TProof::kUnloadPackages:
         status = UnloadPackages();
         if (IsMaster() && status == 0)
            status = fProof->UnloadPackages();
         break;
      case TProof::kDisablePackages:
         fPackageLock->Lock();
         gSystem->Exec(Form("%s %s/*", kRM, fPackageDir.Data()));
         fPackageLock->Unlock();
         if (IsMaster())
            fProof->DisablePackages();
         break;
      case TProof::kListEnabledPackages:
         msg.Reset(kPROOF_PACKAGE_LIST);
         msg << type << fEnabledPackages;
         fSocket->Send(msg);
         break;
      case TProof::kListPackages:
         {
            // Collect the names of the .par files found in the package dir
            TList *pack = new TList;
            void *dir = gSystem->OpenDirectory(fPackageDir);
            if (dir) {
               TString pac(gSystem->GetDirEntry(dir));
               while (pac.Length() > 0) {
                  if (pac.EndsWith(".par")) {
                     // Strip only the trailing ".par": other occurrences of
                     // the string inside the name must be preserved
                     pac.Remove(pac.Length() - 4);
                     pack->Add(new TObjString(pac.Data()));
                  }
                  pac = gSystem->GetDirEntry(dir);
               }
               gSystem->FreeDirectory(dir);
            }
            msg.Reset(kPROOF_PACKAGE_LIST);
            msg << type << pack;
            fSocket->Send(msg);
            // The list has been streamed into the message: release the
            // entries and the list itself
            pack->Delete();
            delete pack;
         }
         break;
      case TProof::kLoadMacro:

         (*mess) >> package;

         // On a master, first forward the request to the nodes
         // (see TProof::Load for the meaning of the two flags)
         if (IsMaster())
            fProof->Load(package, kFALSE, kTRUE);

         // Atomic action
         fCacheLock->Lock();

         // Retrieve the macro (and binaries) from the cache into the
         // working directory
         CopyFromCache(package, kTRUE);

         // Load the macro
         Info("HandleCache", "loading macro %s ...", package.Data());
         gROOT->ProcessLine(Form(".L %s", package.Data()));

         // Save the results back into the cache
         CopyToCache(package, 1);

         // Release the lock
         fCacheLock->Unlock();

         // On a master, second call to fProof->Load with the last flag off
         if (IsMaster())
            fProof->Load(package, kFALSE, kFALSE);

         // Notify the upper level
         LogToMaster();

         break;
      default:
         Error("HandleCache", "unknown type %d", type);
         break;
   }

   // We are done
   return status;
}
void TProofServ::HandleWorkerLists(TMessage *mess)
{
   // Handle a request to activate or deactivate workers.
   // The message carries the action type followed by the ordinal of the
   // worker concerned; the ordinal "*" addresses all workers. The outcome is
   // derived by comparing the size of the active-worker list before and
   // after the call and is only reported via Info / Warning.

   PDB(kGlobal, 1)
      Info("HandleWorkerLists", "Enter");

   Int_t type = 0;
   TString ord;

   (*mess) >> type;

   switch (type) {
      case TProof::kActivateWorker:
         {
            (*mess) >> ord;
            if (!fProof) {
               Warning("HandleWorkerList","undefined PROOF session: protocol error?");
               break;
            }
            // Workers that can be active: all known ones minus the bad ones
            const Int_t nBefore = fProof->GetListOfActiveSlaves()->GetSize();
            const Int_t nUsable = fProof->GetListOfSlaves()->GetSize() -
                                  fProof->GetListOfBadSlaves()->GetSize();
            if (nBefore >= nUsable) {
               Info("HandleWorkerList","all workers are already active");
               break;
            }
            fProof->ActivateWorker(ord);
            const Int_t nAfter = fProof->GetListOfActiveSlaves()->GetSize();
            if (ord == "*") {
               if (nAfter == nUsable)
                  Info("HandleWorkerList","all workers (re-)activated");
               else
                  Info("HandleWorkerList","%d workers could not be (re-)activated", nUsable - nAfter);
            } else {
               // Exactly one more active worker is expected
               if (nAfter == (nBefore + 1))
                  Info("HandleWorkerList","worker %s (re-)activated", ord.Data());
               else
                  Info("HandleWorkerList","worker %s could not be (re-)activated:"
                                          " check the ordinal number", ord.Data());
            }
         }
         break;
      case TProof::kDeactivateWorker:
         {
            (*mess) >> ord;
            if (!fProof) {
               Warning("HandleWorkerList","undefined PROOF session: protocol error?");
               break;
            }
            const Int_t nBefore = fProof->GetListOfActiveSlaves()->GetSize();
            if (nBefore <= 0) {
               Info("HandleWorkerList","all workers are already inactive");
               break;
            }
            fProof->DeactivateWorker(ord);
            const Int_t nAfter = fProof->GetListOfActiveSlaves()->GetSize();
            if (ord == "*") {
               if (nAfter == 0)
                  Info("HandleWorkerList","all workers deactivated");
               else
                  Info("HandleWorkerList","%d workers could not be deactivated", nAfter);
            } else {
               // Exactly one less active worker is expected
               if (nAfter == (nBefore - 1))
                  Info("HandleWorkerList","worker %s deactivated", ord.Data());
               else
                  Info("HandleWorkerList","worker %s could not be deactivated:"
                                          " check the ordinal number", ord.Data());
            }
         }
         break;
      default:
         Warning("HandleWorkerList","unknown action type (%d)", type);
   }
}
TProofServ::EQueryAction TProofServ::GetWorkers(TList *workers,
                                                Int_t & )
{
   // Fill 'workers' with copies of the node descriptions found in the PROOF
   // configuration file (located via fConfDir / fConfFile).
   // The submaster entries are used when present, the worker entries
   // otherwise. fConfFile is updated with the file name reported by the
   // parser, and fImage is set from the master entry if still empty.
   // The second argument is not used.
   // Returns kQueryStop if 'workers' is null or no master entry is found,
   // kQueryOK otherwise.

   if (!workers) {
      Error("GetWorkers", "output list undefined");
      return kQueryStop;
   }

   // Parse the configuration. The parser is an automatic object so that it is
   // released on every return path; everything taken from it below is copied.
   TProofResourcesStatic resources(fConfDir, fConfFile);
   fConfFile = resources.GetFileName();
   PDB(kGlobal,1)
         Info("GetWorkers", "using PROOF config file: %s", fConfFile.Data());

   // Get the master
   TProofNodeInfo *master = resources.GetMaster();
   if (!master) {
      PDB(kAll,1)
         Info("GetWorkers",
              "no appropriate master line found in %s", fConfFile.Data());
      return kQueryStop;
   } else {
      // Set image if not yet done and available
      if (fImage.IsNull() && strlen(master->GetImage()) > 0)
         fImage = master->GetImage();
   }

   // Fill submaster or worker list
   if (resources.GetSubmasters() && resources.GetSubmasters()->GetSize() > 0) {
      PDB(kAll,1)
         resources.GetSubmasters()->Print();
      TProofNodeInfo *ni = 0;
      TIter nw(resources.GetSubmasters());
      while ((ni = (TProofNodeInfo *) nw()))
         workers->Add(new TProofNodeInfo(*ni));
   } else if (resources.GetWorkers() && resources.GetWorkers()->GetSize() > 0) {
      PDB(kAll,1)
         resources.GetWorkers()->Print();
      TProofNodeInfo *ni = 0;
      TIter nw(resources.GetWorkers());
      while ((ni = (TProofNodeInfo *) nw()))
         workers->Add(new TProofNodeInfo(*ni));
   }

   // We are done
   return kQueryOK;
}
FILE *TProofServ::SetErrorHandlerFile(FILE *ferr)
{
   // Set the stream stored in fgErrorHandlerFile to 'ferr'; a null 'ferr'
   // selects stderr.
   // Returns the previously stored stream (which may be null).

   FILE *previous = fgErrorHandlerFile;
   if (ferr)
      fgErrorHandlerFile = ferr;
   else
      fgErrorHandlerFile = stderr;
   return previous;
}
void TProofServ::ErrorHandler(Int_t level, Bool_t abort, const char *location,
                              const char *msg)
{
   // Error handler for the PROOF server.
   // Formats the message as "<time> <pid> <prefix> | <type>[ in <location>]: <msg>"
   // and writes it to the error handler file (stderr if none has been set);
   // if logging to syslog is enabled on gProofServ the message is also sent
   // there. Messages below gErrorIgnoreLevel are dropped. If 'abort' is true
   // kPROOF_FATAL is sent on the server socket (when available), a stack
   // trace is produced and the process is aborted.
   //    level     severity of the message (kPrint ... kFatal)
   //    abort     terminate the process after reporting
   //    location  origin of the message, may be null; for levels >= kInfo
   //              only the part before the last '|', if any, is printed
   //    msg       the message text

   // First call: initialize the ignore level from "Root.ErrorIgnoreLevel"
   if (gErrorIgnoreLevel == kUnset) {
      gErrorIgnoreLevel = 0;
      if (gEnv) {
         TString lvl = gEnv->GetValue("Root.ErrorIgnoreLevel", "Print");
         if (!lvl.CompareTo("Print", TString::kIgnoreCase))
            gErrorIgnoreLevel = kPrint;
         else if (!lvl.CompareTo("Info", TString::kIgnoreCase))
            gErrorIgnoreLevel = kInfo;
         else if (!lvl.CompareTo("Warning", TString::kIgnoreCase))
            gErrorIgnoreLevel = kWarning;
         else if (!lvl.CompareTo("Error", TString::kIgnoreCase))
            gErrorIgnoreLevel = kError;
         else if (!lvl.CompareTo("Break", TString::kIgnoreCase))
            gErrorIgnoreLevel = kBreak;
         else if (!lvl.CompareTo("SysError", TString::kIgnoreCase))
            gErrorIgnoreLevel = kSysError;
         else if (!lvl.CompareTo("Fatal", TString::kIgnoreCase))
            gErrorIgnoreLevel = kFatal;
      }
   }

   if (level < gErrorIgnoreLevel)
      return;

   // Always communicate errors via SendLogFile
   if (level >= kError && gProofServ)
      gProofServ->LogToMaster();

   // Output stream: fgErrorHandlerFile starts out null, fall back to stderr
   // instead of handing a null stream to fprintf / fflush
   FILE *errf = fgErrorHandlerFile ? fgErrorHandlerFile : stderr;

   // (Re-)open syslog whenever the service name is not yet set or has changed
   static TString syslogService;
   Bool_t tosyslog = (gProofServ && gProofServ->LogToSysLog()) ? kTRUE : kFALSE;

   if (tosyslog) {
      if (syslogService.IsNull()) {
         syslogService = gProofServ != 0 ? gProofServ->GetService() : "proof";
         gSystem->Openlog(syslogService, kLogPid | kLogCons, kLogLocal5);
      } else if (gProofServ != 0 && syslogService != gProofServ->GetService()) {
         // Service name changed: re-open with the new one
         syslogService = gProofServ->GetService();
         gSystem->Openlog(syslogService, kLogPid | kLogCons, kLogLocal5);
      }
   }

   // Map the level to a label and a syslog level. The checks are cumulative:
   // the last one that matches wins.
   const char *type   = 0;
   ELogLevel loglevel = kLogInfo;

   // Number of characters of 'location' to print
   Int_t ipos = (location) ? strlen(location) : 0;

   if (level >= kPrint) {
      loglevel = kLogInfo;
      type = "Print";
   }
   if (level >= kInfo) {
      loglevel = kLogInfo;
      // A '|' in the location marks a service message; 'location' may be
      // null, in which case there is nothing to search
      const char *ps = (location) ? strrchr(location, '|') : 0;
      if (ps) {
         ipos = (int)(ps - location);
         type = "SvcMsg";
      } else {
         type = "Info";
      }
   }
   if (level >= kWarning) {
      loglevel = kLogWarning;
      type = "Warning";
   }
   if (level >= kError) {
      loglevel = kLogErr;
      type = "Error";
   }
   if (level >= kBreak) {
      loglevel = kLogErr;
      type = "*** Break ***";
   }
   if (level >= kSysError) {
      loglevel = kLogErr;
      type = "SysError";
   }
   if (level >= kFatal) {
      loglevel = kLogErr;
      type = "Fatal";
   }

   TString buf;

   // Time stamp: 8 characters starting at offset 11 of the formatted string
   // are printed
   TTimeStamp ts;
   TString st(ts.AsString("lc"),19);

   if (!location || ipos == 0 ||
       (level >= kPrint && level < kInfo) ||
       (level >= kBreak && level < kSysError)) {
      // No location to report (or not wanted for this level)
      fprintf(errf, "%s %5d %s | %s: %s\n", st(11,8).Data(),
                                  gSystem->GetPid(),
                                 (gProofServ ? gProofServ->GetPrefix() : "proof"),
                                  type, msg);
      if (tosyslog)
         buf.Form("%s:%s:%s:%s", (gProofServ ? gProofServ->GetUser() : "unknown"),
                                 (gProofServ ? gProofServ->GetPrefix() : "proof"),
                                 type, msg);
   } else {
      fprintf(errf, "%s %5d %s | %s in <%.*s>: %s\n", st(11,8).Data(),
                                  gSystem->GetPid(),
                                 (gProofServ ? gProofServ->GetPrefix() : "proof"),
                                  type, ipos, location, msg);
      if (tosyslog)
         buf.Form("%s:%s:%s:<%.*s>:%s", (gProofServ ? gProofServ->GetUser() : "unknown"),
                                        (gProofServ ? gProofServ->GetPrefix() : "proof"),
                                        type, ipos, location, msg);
   }
   fflush(errf);

   if (tosyslog)
      gSystem->Syslog(loglevel, buf);

   if (abort) {

      // The static flag prevents re-entering the send if it triggers this
      // handler again
      static Bool_t recursive = kFALSE;

      if (gProofServ != 0 && !recursive) {
         recursive = kTRUE;
         if (gProofServ->GetSocket())
            gProofServ->GetSocket()->Send(kPROOF_FATAL);
         recursive = kFALSE;
      }

      fprintf(errf, "aborting\n");
      fflush(errf);
      gSystem->StackTrace();
      gSystem->Abort();
   }
}
Int_t TProofServ::CopyFromCache(const char *macro, Bool_t cpbin)
{
   // Retrieve the files related to 'macro' from the cache directory into the
   // current directory.
   // The source files (same base name, any extension) are always copied. If
   // 'cpbin' is true the binaries (files named <base>_<ext>.*) are copied as
   // well, provided that the ROOT version / revision recorded in
   // .<base>.binversion match the running ones and the cached copy is newer
   // than the local one; on a version mismatch the cached binaries and the
   // version file are removed instead.
   // The cache lock is taken here unless it is already held.
   // Returns -1 if 'macro' is null or empty, 0 otherwise.

   if (!macro || strlen(macro) <= 0)
      // Invalid inputs
      return -1;

   // Strip the ACLiC mode, arguments and I/O redirection from the name
   TString base = macro;
   TString aclicMode, aclicArgs, aclicIo;
   base = gSystem->SplitAclicMode(base, aclicMode, aclicArgs, aclicIo);

   PDB(kGlobal,1)
      Info("CopyFromCache","enter: names: %s, %s", macro, base.Data());

   // Take the lock only if the caller does not hold it already, and release
   // it on exit only in that case
   const Bool_t heldOnEntry = (fCacheLock->IsLocked()) ? kTRUE : kFALSE;
   if (!heldOnEntry) fCacheLock->Lock();

   // Source files: replace the extension with a wildcard
   TString srcPattern = base;
   Int_t idot = srcPattern.Last('.');
   if (idot != kNPOS) {
      srcPattern.Remove(idot);
      srcPattern += ".*";
   }
   PDB(kCache,1)
      Info("CopyFromCache",
           "retrieving %s/%s from cache", fCacheDir.Data(), srcPattern.Data());
   gSystem->Exec(Form("%s %s/%s .", kCP, fCacheDir.Data(), srcPattern.Data()));

   // Are binaries requested?
   if (!cpbin) {
      // End of atomicity
      if (!heldOnEntry) fCacheLock->Unlock();
      return 0;
   }

   // Binary prefix: "<base>_<ext>."
   TString binPrefix = base;
   idot = binPrefix.Last('.');
   if (idot == kNPOS) {
      PDB(kCache,1)
         Info("CopyFromCache",
              "non-standard name structure: %s ('.' missing)", base.Data());
      // Done
      if (!heldOnEntry) fCacheLock->Unlock();
      return 0;
   }
   binPrefix.Replace(idot,1,"_");
   binPrefix += ".";

   // Version file: ".<base without extension>.binversion"
   TString verFile;
   verFile.Form(".%s", base.Data());
   Int_t idotv = verFile.Last('.');
   if (idotv != kNPOS)
      verFile.Remove(idotv);
   verFile += ".binversion";

   // Read the version and revision the binaries were built with
   TString v;
   Int_t rev = -1;
   FILE *f = fopen(Form("%s/%s", fCacheDir.Data(), verFile.Data()), "r");
   if (f) {
      TString r;
      v.Gets(f);
      r.Gets(f);
      rev = (!r.IsNull() && r.IsDigit()) ? r.Atoi() : -1;
      fclose(f);
   }

   // The revision is checked only when the running one is known (> 0)
   Bool_t okver = (v != gROOT->GetVersion()) ? kFALSE : kTRUE;
   Bool_t okrev = (gROOT->GetSvnRevision() > 0 && rev != gROOT->GetSvnRevision()) ? kFALSE : kTRUE;
   if (!f || !okver || !okrev) {
      PDB(kCache,1)
         Info("CopyFromCache",
              "removing binaries: 'f': %p, 'ROOT version': %s, 'ROOT revision': %s",
              f, (okver ? "OK" : "not OK"), (okrev ? "OK" : "not OK") );
      // Remove all existing binaries ...
      binPrefix += "*";
      gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), binPrefix.Data()));
      // ... and the binary version file
      gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), verFile.Data()));
      // Done
      if (!heldOnEntry) fCacheLock->Unlock();
      return 0;
   }

   // Scan the cache and retrieve the binaries which are missing locally or
   // older than the cached copy
   void *dirp = gSystem->OpenDirectory(fCacheDir);
   if (dirp) {
      const char *entry = 0;
      while ((entry = gSystem->GetDirEntry(dirp))) {
         // Only entries starting with the binary prefix are of interest
         if (strncmp(entry, binPrefix.Data(), binPrefix.Length()) != 0)
            continue;
         TString cached;
         cached.Form("%s/%s", fCacheDir.Data(), entry);
         // Skip entries whose cached file cannot be stat'ed
         FileStat_t stCache;
         if (gSystem->GetPathInfo(cached, stCache) != 0)
            continue;
         // Nothing to do if the local copy exists and is not older
         FileStat_t stLocal;
         const Bool_t upToDate = (gSystem->GetPathInfo(entry, stLocal) == 0) &&
                                 (stLocal.fMtime >= stCache.fMtime);
         if (upToDate)
            continue;
         // Replace the local copy with the cached one
         gSystem->Exec(Form("%s %s", kRM, entry));
         PDB(kCache,1)
            Info("CopyFromCache",
                 "retrieving %s from cache", cached.Data());
         gSystem->Exec(Form("%s %s %s", kCP, cached.Data(), entry));
      }
      gSystem->FreeDirectory(dirp);
   }

   // End of atomicity
   if (!heldOnEntry) fCacheLock->Unlock();

   // Done
   return 0;
}
Int_t TProofServ::CopyToCache(const char *macro, Int_t opt)
{
   // Copy files related to 'macro' from the current directory to the cache
   // directory.
   //   opt = 0   copy 'macro' itself to the cache and remove from the cache
   //             the binaries ('<base>_<ext>.*') and the version file
   //             associated with it
   //   opt = 1   copy to the cache the binaries found in the current
   //             directory which are missing in the cache or newer than the
   //             cached copy; if anything was copied, (re)write the version
   //             file with the ROOT version and SVN revision
   // Binaries and version file are handled only if the macro name contains
   // a '.', as done in CopyFromCache.
   // The cache lock is taken for the duration of the call unless it is
   // already held on entry.
   // Returns -1 on invalid arguments (null/empty 'macro', 'opt' not 0 or 1),
   // 0 otherwise.

   if (!macro || strlen(macro) <= 0 || opt < 0 || opt > 1)
      return -1;

   // Strip any ACLiC mode / arguments from the name
   TString name = macro;
   TString acmode, args, io;
   name = gSystem->SplitAclicMode(name, acmode, args, io);
   PDB(kGlobal,1)
      Info("CopyToCache","enter: opt: %d, names: %s, %s", opt, macro, name.Data());

   // Base name of the binaries: '<base>_<ext>'
   TString binname = name;
   Int_t dot = binname.Last('.');
   if (dot != kNPOS)
      binname.Replace(dot,1,"_");

   // Name of the hidden file recording the ROOT version of the binaries.
   // A dedicated index is used here: 'vername' always starts with '.', so
   // reusing 'dot' would make every 'dot != kNPOS' test below always true.
   TString vername;
   vername.Form(".%s", name.Data());
   Int_t dotv = vername.Last('.');
   if (dotv != kNPOS)
      vername.Remove(dotv);
   vername += ".binversion";
   Bool_t savever = kFALSE;

   // Take the lock only if nobody upstream holds it already
   Bool_t locked = (fCacheLock->IsLocked()) ? kTRUE : kFALSE;
   if (!locked) fCacheLock->Lock();

   if (opt == 0) {
      // Save the macro source in the cache
      PDB(kCache,1)
         Info("CopyToCache",
              "caching %s/%s ...", fCacheDir.Data(), name.Data());
      gSystem->Exec(Form("%s %s %s", kCP, name.Data(), fCacheDir.Data()));
      // The cached binaries, if any, are now stale: remove them
      if (dot != kNPOS) {
         binname += ".*";
         PDB(kCache,1)
            Info("CopyToCache", "opt = 0: removing binaries '%s'", binname.Data());
         gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), binname.Data()));
         gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), vername.Data()));
      }
   } else if (opt == 1) {
      // Save the binaries found in the current directory
      if (dot != kNPOS) {
         binname += ".";
         void *dirp = gSystem->OpenDirectory(".");
         if (dirp) {
            const char *e = 0;
            while ((e = gSystem->GetDirEntry(dirp))) {
               if (!strncmp(e, binname.Data(), binname.Length())) {
                  Bool_t docp = kTRUE;
                  FileStat_t stlocal, stcache;
                  if (!gSystem->GetPathInfo(e, stlocal)) {
                     TString fncache;
                     fncache.Form("%s/%s", fCacheDir.Data(), e);
                     // Skip the copy if the cached file exists and is at
                     // least as recent as the local one
                     Int_t rc = gSystem->GetPathInfo(fncache, stcache);
                     if (rc == 0 && (stlocal.fMtime <= stcache.fMtime))
                        docp = kFALSE;
                     if (docp) {
                        gSystem->Exec(Form("%s %s", kRM, fncache.Data()));
                        PDB(kCache,1)
                           Info("CopyToCache","caching %s ...", e);
                        gSystem->Exec(Form("%s %s %s", kCP, e, fncache.Data()));
                        savever = kTRUE;
                     }
                  }
               }
            }
            gSystem->FreeDirectory(dirp);
         }
         // Record the ROOT version / revision the cached binaries belong to
         if (savever) {
            PDB(kCache,1)
               Info("CopyToCache","updating version file %s ...", vername.Data());
            FILE *f = fopen(Form("%s/%s", fCacheDir.Data(), vername.Data()), "w");
            if (f) {
               fputs(gROOT->GetVersion(), f);
               fputs(Form("\n%d",gROOT->GetSvnRevision()), f);
               fclose(f);
            }
         }
      }
   }

   // Release the lock if we took it
   if (!locked) fCacheLock->Unlock();

   return 0;
}
void TProofServ::MakePlayer()
{
   
   TVirtualProofPlayer *p = 0;
   if (IsParallel()) {
      
      p = fProof->MakePlayer();
   } else {
      
      p = TVirtualProofPlayer::Create("slave", 0, fSocket);
      if (IsMaster())
         fProof->SetPlayer(p);
   }
   
   fPlayer = p;
}
void TProofServ::DeletePlayer()
{
   // Dispose of the current player and reset fPlayer.
   // On a master the player is detached via fProof->SetPlayer(0) (when fProof
   // is set) and not deleted here; otherwise fPlayer is deleted directly.

   if (!IsMaster()) {
      delete fPlayer;
   } else if (fProof) {
      fProof->SetPlayer(0);
   }
   fPlayer = 0;
}
Int_t TProofServ::GetPriority()
{
   // Get the priority of the group this session belongs to from the
   // 'proofpriority' table of the SQL database defined by the rootrc
   // variables
   //    ProofServ.QueryLogDB       connection string of the database
   //    ProofServ.QueryLogUser     user name
   //    ProofServ.QueryLogPasswd   password
   // The table is expected to have (at least) the columns 'group' and
   // 'priority'.
   // Returns the priority read from the first row of the result; the default
   // value 100 is returned if no database is configured, the connection or
   // the query fails, or no usable row is found.
   // NOTE(review): 'group' is a reserved word in several SQL dialects and may
   // need quoting depending on the server in use - to be confirmed.
   // NOTE(review): fGroup is inserted verbatim in the statement; if it can
   // come from an untrusted source it must be escaped.

   TString sqlserv = gEnv->GetValue("ProofServ.QueryLogDB","");
   TString sqluser = gEnv->GetValue("ProofServ.QueryLogUser","");
   TString sqlpass = gEnv->GetValue("ProofServ.QueryLogPasswd","");

   Int_t priority = 100;

   // Nothing to query
   if (sqlserv == "")
      return priority;

   // FROM must precede WHERE for the statement to be valid SQL
   TString sql;
   sql.Form("SELECT priority FROM proofpriority WHERE group='%s'", fGroup.Data());

   // Open connection to the SQL server
   TSQLServer *db =  TSQLServer::Connect(sqlserv, sqluser, sqlpass);

   if (!db || db->IsZombie()) {
      // The password is deliberately not included in the message
      Error("GetPriority", "failed to connect to SQL server %s as %s",
            sqlserv.Data(), sqluser.Data());
      printf("%s\n", sql.Data());
   } else {
      TSQLResult *res = db->Query(sql);

      if (!res) {
         Error("GetPriority", "query into proofpriority failed");
         printf("%s\n", sql.Data());
      } else {
         // Only the first row is considered; an empty result or a NULL field
         // leaves the default priority untouched
         TSQLRow *row = res->Next();
         if (row) {
            const char *field = row->GetField(0);
            if (field)
               priority = atoi(field);
            delete row;
         }
      }
      delete res;
   }
   delete db;

   return priority;
}
Int_t TProofServ::SendAsynMessage(const char *msg, Bool_t lf)
{
   // Send the text 'msg', followed by the flag 'lf', to the peer on fSocket
   // as a kPROOF_MESSAGE message.
   // Returns the result of TSocket::Send, or -1 if there is no socket or
   // 'msg' is null.

   // Message buffer reused across calls
   static TMessage asynMsg(kPROOF_MESSAGE);

   PDB(kAsyn,1)
      Info("SendAsynMessage","%s", (msg ? msg : "(null)"));

   // Nothing can be sent without a socket or a text
   if (!fSocket || !msg)
      return -1;

   asynMsg.Reset(kPROOF_MESSAGE);
   asynMsg << TString(msg) << lf;
   return fSocket->Send(asynMsg);
}
void TProofServ::FlushLogFile()
{
   // Move the file position of the log file descriptor (fLogFileDes) to the
   // offset corresponding to the current end of stdout.

   // Offset of the end of stdout ...
   off_t endOfStdout = lseek(fileno(stdout), (off_t)0, SEEK_END);
   // ... applied as absolute position on the log file descriptor
   lseek(fLogFileDes, endOfStdout, SEEK_SET);
}
void TProofServ::HandleException(Int_t sig)
{
   // Exception handler: log the signal number 'sig' which triggered the
   // exception and exit the process using 'sig' as exit code.

   Error("HandleException", "exception triggered by signal: %d", sig);

   gSystem->Exit(sig);
}
Int_t TProofServ::HandleDataSets(TMessage *mess)
{
   // Handle a dataset-related request. The request type is read first from
   // 'mess', followed by the type-specific arguments (URI, options, file
   // collection); the action is delegated to fDataSetManager and, for the
   // 'get' requests, the result is sent back on fSocket.
   // Returns 0 on success and -1 on failure (no manager, unknown type,
   // action not allowed or failed); for registration and verification the
   // value returned by the dataset manager is passed on.

   if (gDebug > 0)
      Info("HandleDataSets", "enter");

   // We need a dataset manager
   if (!fDataSetManager) {
      Error("HandleDataSets", "data manager instance undefined! - Protocol error?");
      return -1;
   }

   // Used in most cases
   TString dsUser, dsGroup, dsName, uri, opt;
   Int_t rc = 0;

   // Request type
   Int_t type = 0;
   (*mess) >> type;

   switch (type) {
      case TProof::kCheckDataSetName:
         // Check whether the name is still available: fails (-1) if a
         // dataset with this URI exists already
         {
            (*mess) >> uri;
            if (fDataSetManager->ExistsDataSet(uri))
               return -1;
         }
         break;
      case TProof::kRegisterDataSet:
         // Register the file collection shipped with the message
         {
            if (fDataSetManager->TestBit(TProofDataSetManager::kAllowRegister)) {
               (*mess) >> uri;
               (*mess) >> opt;
               // The collection read from the message is owned by us
               TFileCollection *dataSet =
                  dynamic_cast<TFileCollection*> ((mess->ReadObject(TFileCollection::Class())));
               if (!dataSet || dataSet->GetList()->GetSize() == 0) {
                  Error("HandleDataSets", "can not save an empty list.");
                  // Do not leak an empty collection (no-op if null)
                  delete dataSet;
                  return -1;
               }
               // Register and clean up
               rc = fDataSetManager->RegisterDataSet(uri, dataSet, opt);
               delete dataSet;
               return rc;
            } else {
               Info("HandleDataSets", "dataset registration not allowed");
               return -1;
            }
         }
         break;
      case TProof::kShowDataSets:
         {
            (*mess) >> uri;
            // The manager prints the matching datasets (kPrint)
            fDataSetManager->GetDataSets(uri, (UInt_t)TProofDataSetManager::kPrint);
         }
         break;
      case TProof::kGetDataSets:
         {
            (*mess) >> uri;
            // Get the map of the matching datasets (kExport)
            TMap *returnMap = fDataSetManager->GetDataSets(uri, (UInt_t)TProofDataSetManager::kExport);
            if (returnMap) {
               // Send them back and release the map content
               fSocket->SendObject(returnMap, kMESS_OK);
               returnMap->DeleteAll();
            } else {
               // Failure
               return -1;
            }
         }
         break;
      case TProof::kGetDataSet:
         {
            (*mess) >> uri;
            // Get the file collection for this URI
            TFileCollection *fileList = fDataSetManager->GetDataSet(uri);
            if (fileList) {
               fSocket->SendObject(fileList, kMESS_OK);
               delete fileList;
            } else {
               // Failure
               return -1;
            }
         }
         break;
      case TProof::kRemoveDataSet:
         {
            // Removal is subject to the same permission as registration
            if (fDataSetManager->TestBit(TProofDataSetManager::kAllowRegister)) {
               (*mess) >> uri;
               if (!fDataSetManager->RemoveDataSet(uri)) {
                  // Failure
                  return -1;
               }
            } else {
               Info("HandleDataSets", "dataset creation / removal not allowed");
               return -1;
            }
         }
         break;
      case TProof::kVerifyDataSet:
         {
            if (fDataSetManager->TestBit(TProofDataSetManager::kAllowVerify)) {
               (*mess) >> uri;
               // The guard is alive for the duration of the scan only
               TProofServLogHandlerGuard hg(fLogFile,  fSocket);
               rc = fDataSetManager->ScanDataSet(uri);
            } else {
               Info("HandleDataSets", "dataset verification not allowed");
               return -1;
            }
         }
         break;
      case TProof::kGetQuota:
         {
            if (fDataSetManager->TestBit(TProofDataSetManager::kCheckQuota)) {
               TMap *groupQuotaMap = fDataSetManager->GetGroupQuotaMap();
               if (groupQuotaMap) {
                  // Send the map back; it is not deleted here
                  fSocket->SendObject(groupQuotaMap, kMESS_OK);
               } else {
                  return -1;
               }
            } else {
               Info("HandleDataSets", "quota control disabled");
               return -1;
            }
         }
         break;
      case TProof::kShowQuota:
         {
            if (fDataSetManager->TestBit(TProofDataSetManager::kCheckQuota)) {
               (*mess) >> opt;
               // Let the manager display the quota information
               fDataSetManager->ShowQuota(opt);
            } else {
               // Not treated as a failure: rc stays 0
               Info("HandleDataSets", "quota control disabled");
            }
         }
         break;
      default:
         rc = -1;
         Error("HandleDataSets", "unknown type %d", type);
         break;
   }

   // We are done
   return rc;
}
void TProofServ::HandleFork(TMessage *)
{
   // Handler for fork-cloning requests. The message is ignored: this
   // implementation only notifies that the feature is not available.

   Info("HandleFork", "fork cloning not implemented");
}
Int_t TProofServ::Fork()
{
   // Fork a child process.
   // In the parent the child pid is handed to the reaper timer (created and
   // started on first use) and then returned; in the child 0 is returned.
   // A negative value is returned if the fork fails or, under windows, where
   // the functionality is not provided.

#ifndef WIN32
   const pid_t child = fork();

   // Failure
   if (child < 0) {
      Error("Fork", "failed to fork");
      return child;
   }

   // Nothing else to do in the child
   if (child == 0)
      return child;

   // Parent: make sure the reaper timer exists and is running ...
   if (!fReaperTimer) {
      fReaperTimer = new TReaperTimer(1000);
      fReaperTimer->Start(-1);
   }

   // ... and let it keep track of the new child
   fReaperTimer->AddPid(child);

   return child;
#else
   Warning("Fork", "Functionality not provided under windows");
   return -1;
#endif
}
Int_t TProofLockPath::Lock()
{
   // Open the lock file named after this object and lock it; the descriptor
   // is kept in fLockId. The file is opened with O_CREAT (mode 0644) when
   // TSystem::AccessPathName returns true for the path, with plain O_RDWR
   // otherwise. On the windows builds the file is opened but lockf is not
   // called.
   // Returns 0 on success, -1 if the file cannot be opened or locked (in the
   // latter case the descriptor is closed and fLockId reset to -1).

   const char *path = GetName();

   fLockId = gSystem->AccessPathName(path) ? open(path, O_CREAT|O_RDWR, 0644)
                                           : open(path, O_RDWR);
   if (fLockId == -1) {
      SysError("Lock", "cannot open lock file %s", path);
      return -1;
   }

   PDB(kPackage, 2)
      Info("Lock", "%d: locking file %s ...", gSystem->GetPid(), path);

   // Lock the first byte of the file
#if !defined(R__WIN32) && !defined(R__WINGCC)
   if (lockf(fLockId, F_LOCK, (off_t) 1) == -1) {
      SysError("Lock", "error locking %s", path);
      close(fLockId);
      fLockId = -1;
      return -1;
   }
#endif

   PDB(kPackage, 2)
      Info("Lock", "%d: file %s locked", gSystem->GetPid(), path);

   return 0;
}
Int_t TProofLockPath::Unlock()
{
   // Release the lock on the file and close its descriptor (fLockId), which
   // is reset to -1 in all cases. On the windows builds lockf is not called
   // and the descriptor is just closed.
   // Returns 0 on success or if the path is not locked, -1 if unlocking
   // fails.

   if (!IsLocked())
      return 0;

   PDB(kPackage, 2)
      Info("Unlock", "%d: unlocking file %s ...", gSystem->GetPid(), GetName());

   // lockf acts on the section starting at the current file offset: rewind
   // so that the byte locked in Lock() is the one being released
   lseek(fLockId, 0, SEEK_SET);
#if !defined(R__WIN32) && !defined(R__WINGCC)
   if (lockf(fLockId, F_ULOCK, (off_t)1) == -1) {
      SysError("Unlock", "error unlocking %s", GetName());
      close(fLockId);
      fLockId = -1;
      return -1;
   }
#endif

   PDB(kPackage, 2)
      Info("Unlock", "%d: file %s unlocked", gSystem->GetPid(), GetName());

   close(fLockId);
   fLockId = -1;

   return 0;
}
Last change: Thu Dec 18 09:31:33 2008
Last generated: 2008-12-18 09:31
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.