// @(#)root/proof:$Id: TProof.h 26951 2008-12-16 12:04:38Z ganis $
// Author: Fons Rademakers   13/02/97

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT_TProof
#define ROOT_TProof


//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TProof                                                               //
//                                                                      //
// This class controls a Parallel ROOT Facility, PROOF, cluster.        //
// It fires the worker servers, it keeps track of how many workers are  //
// running, it keeps track of the workers running status, it broadcasts //
// messages to all workers, it collects results, etc.                   //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#ifndef ROOT_TProof
#include "TProof.h"
#endif
#ifndef ROOT_TProofMgr
#include "TProofMgr.h"
#endif
#ifndef ROOT_TProofDebug
#include "TProofDebug.h"
#endif
#ifndef ROOT_TString
#include "TString.h"
#endif
#ifndef ROOT_MessageTypes
#include "MessageTypes.h"
#endif
#ifndef ROOT_TMD5
#include "TMD5.h"
#endif
#ifndef ROOT_TSysEvtHandler
#include "TSysEvtHandler.h"
#endif
#ifndef ROOT_TThread
#include "TThread.h"
#endif
#ifndef ROOT_TUrl
#include "TUrl.h"
#endif

#include <map>

#ifdef R__GLOBALSTL
namespace std { using ::map; }
#endif

#define CANNOTUSE(x) Info(x,"Not manager: cannot use this method")

class TChain;
class TCondor;
class TCondorSlave;
class TDrawFeedback;
class TDSet;
class TEventList;
class THashList;
class TList;
class TCollection;
class TMessage;
class TMonitor;
class TPluginHandler;
class TProof;
class TProofInputHandler;
class TProofInterruptHandler;
class TProofLockPath;
class TVirtualProofPlayer;
class TProofPlayer;
class TProofPlayerRemote;
class TProofProgressDialog;
class TProofServ;
class TQueryResult;
class TSignalHandler;
class TSlave;
class TSemaphore;
class TSocket;
class TTree;
class TVirtualMutex;
class TFileCollection;
class TMap;
class TProofDataSetManager;

// protocol changes:
// 1 -> 2: new arguments for Process() command, option added
// 2 -> 3: package manager enabling protocol changed
// 3 -> 4: introduction of multi-level-master support
// 4 -> 5: added friends support
// 5 -> 6: drop TFTP, support for asynchronous queries
// 6 -> 7: support for multisessions, archieve, retrieve, ...
// 7 -> 8: return number of entries in GetNextPacket
// 8 -> 9: support for stateless connection via xproofd
// 9 -> 10: new features requested, tested at CAF
// 10 -> 11: new merging strategy
// 11 -> 12: new progress message
// 12 -> 13: exchange version/architecture/compiler info
// 13 -> 14: new proofserv environment setting
// 14 -> 15: add support for entry lists; new version of TFileInfo
// 15 -> 16: add support for generic non-data based processing
// 16 -> 17: new dataset handling system; support for TFileCollection processing 
// 17 -> 18: support for reconnection on daemon restarts
// 18 -> 19: TProofProgressStatus used in kPROOF_PROGRESS, kPROOF_STOPPROCESS
//           and kPROOF_GETNEXTPACKET messages in Master - worker communication
// 19 -> 20: Fix the asynchronous mode (required changes in some messages)

// PROOF magic constants
const Int_t       kPROOF_Protocol        = 20;            // protocol version number
const Int_t       kPROOF_Port            = 1093;          // IANA registered PROOF port
const char* const kPROOF_ConfFile        = "proof.conf";  // default config file
const char* const kPROOF_ConfDir         = "/usr/local/root";  // default config dir
const char* const kPROOF_WorkDir         = ".proof";      // default working directory
const char* const kPROOF_CacheDir        = "cache";       // file cache dir, under WorkDir
const char* const kPROOF_PackDir         = "packages";    // package dir, under WorkDir
const char* const kPROOF_QueryDir        = "queries";     // query dir, under WorkDir
const char* const kPROOF_DataSetDir      = "datasets";    // dataset dir, under WorkDir
const char* const kPROOF_CacheLockFile   = "proof-cache-lock-";   // cache lock file
const char* const kPROOF_PackageLockFile = "proof-package-lock-"; // package lock file
const char* const kPROOF_QueryLockFile   = "proof-query-lock-";   // query lock file
const char* const kPROOF_TerminateWorker = "+++ terminating +++"; // signal worker termination in MarkBad
const char* const kPROOF_InputDataFile   = "inputdata.root";      // Default input data file name

#ifndef R__WIN32
const char* const kCP     = "/bin/cp -fp";
const char* const kRM     = "/bin/rm -rf";
const char* const kLS     = "/bin/ls -l";
const char* const kUNTAR  = "%s -c %s/%s | (cd %s; tar xf -)";
const char* const kUNTAR2 = "%s -c %s | (cd %s; tar xf -)";
const char* const kUNTAR3 = "%s -c %s | (tar xf -)";
const char* const kGUNZIP = "gunzip";
#else
const char* const kCP     = "copy";
const char* const kRM     = "delete";
const char* const kLS     = "dir";
const char* const kUNTAR  = "...";
const char* const kUNTAR2 = "...";
const char* const kUNTAR3 = "...";
const char* const kGUNZIP = "gunzip";
#endif

R__EXTERN TVirtualMutex *gProofMutex;

typedef void (*PrintProgress_t)(Long64_t tot, Long64_t proc, Float_t proctime);

// Helper classes used for parallel startup
class TProofThreadArg {
public:
   TUrl         *fUrl;
   TString       fOrd;
   Int_t         fPerf;
   TString       fImage;
   TString       fWorkdir;
   TString       fMsd;
   TList        *fSlaves;
   TProof       *fProof;
   TCondorSlave *fCslave;
   TList        *fClaims;
   Int_t         fType;

   TProofThreadArg(const char *h, Int_t po, const char *o, Int_t pe,
                   const char *i, const char *w,
                   TList *s, TProof *prf);

   TProofThreadArg(TCondorSlave *csl, TList *clist,
                   TList *s, TProof *prf);

   TProofThreadArg(const char *h, Int_t po, const char *o,
                   const char *i, const char *w, const char *m,
                   TList *s, TProof *prf);

   virtual ~TProofThreadArg() { if (fUrl) delete fUrl; }

private:

   TProofThreadArg(const TProofThreadArg&); // Not implemented
   TProofThreadArg& operator=(const TProofThreadArg&); // Not implemented

};

// PROOF Thread class for parallel startup
class TProofThread {
public:
   TThread         *fThread;
   TProofThreadArg *fArgs;

   TProofThread(TThread *t, TProofThreadArg *a): fThread(t), fArgs(a) {}
   virtual ~TProofThread() { SafeDelete(fThread); SafeDelete(fArgs); }
private:

   TProofThread(const TProofThread&); // Not implemented
   TProofThread& operator=(const TProofThread&); // Not implemented

};

// PROOF Interrupt signal handler
class TProofInterruptHandler : public TSignalHandler {
private:

   TProofInterruptHandler(const TProofInterruptHandler&); // Not implemented
   TProofInterruptHandler& operator=(const TProofInterruptHandler&); // Not implemented

   TProof *fProof;
public:
   TProofInterruptHandler(TProof *p)
      : TSignalHandler(kSigInterrupt, kFALSE), fProof(p) { }
   Bool_t Notify();
};

// Input handler for messages from TProofServ
class TProofInputHandler : public TFileHandler {
private:

   TProofInputHandler(const TProofInputHandler&); // Not implemented
   TProofInputHandler& operator=(const TProofInputHandler&); // Not implemented

   TSocket *fSocket;
   TProof  *fProof;
public:
   TProofInputHandler(TProof *p, TSocket *s);
   Bool_t Notify();
   Bool_t ReadNotify() { return Notify(); }
};

// Slaves info class
class TSlaveInfo : public TObject {
public:
   enum ESlaveStatus { kActive, kNotActive, kBad };

   TString      fOrdinal;      //slave ordinal
   TString      fHostName;     //hostname this slave is running on
   TString      fMsd;          //mass storage domain slave is in
   Int_t        fPerfIndex;    //relative performance of this slave
   ESlaveStatus fStatus;       //slave status

   TSlaveInfo(const char *ordinal = "", const char *host = "", Int_t perfidx = 0,
              const char *msd = "") :
              fOrdinal(ordinal), fHostName(host), fMsd(msd),
              fPerfIndex(perfidx), fStatus(kNotActive) { }

   const char *GetMsd() const { return fMsd; }
   const char *GetName() const { return fHostName; }
   const char *GetOrdinal() const { return fOrdinal; }
   void        SetStatus(ESlaveStatus stat) { fStatus = stat; }

   Int_t  Compare(const TObject *obj) const;
   Bool_t IsSortable() const { return kTRUE; }
   void   Print(Option_t *option="") const;

   ClassDef(TSlaveInfo,2) //basic info on slave
};

class TProof : public TNamed, public TQObject {

friend class TPacketizer;
friend class TPacketizerDev;
friend class TPacketizerAdaptive;
friend class TProofLite;
friend class TProofDataSetManager;
friend class TProofServ;
friend class TProofInputHandler;
friend class TProofInterruptHandler;
friend class TProofPlayer;
friend class TProofPlayerLite;
friend class TProofPlayerRemote;
friend class TProofProgressDialog;
friend class TSlave;
friend class TSlaveLite;
friend class TVirtualPacketizer;
friend class TXSlave;
friend class TXSocket;        // to access kPing
friend class TXSocketHandler; // to access fCurrentMonitor and CollectInputFrom
friend class TXProofMgr;      // to access EUrgent
friend class TXProofServ;     // to access EUrgent

public:
   // PROOF status bits
   enum EStatusBits {
      kUsingSessionGui     = BIT(14),
      kNewInputData        = BIT(15),
      kIsClient            = BIT(16),
      kIsMaster            = BIT(17)
   };
   enum EQueryMode {
      kSync                = 0,
      kAsync               = 1
   };
   enum EUploadOpt {
      kAppend              = 0x1,
      kOverwriteDataSet    = 0x2,
      kNoOverwriteDataSet  = 0x4,
      kOverwriteAllFiles   = 0x8,
      kOverwriteNoFiles    = 0x10,
      kAskUser             = 0x0
   };
   enum ERegisterOpt {
      kFailIfExists        = 0,
      kOverwriteIfExists   = 1,
      kMergeIfExists       = 2
   };
   enum EUploadDataSetAnswer {
      kError               = -1,
      kDataSetExists       = -2,
      kFail		   = -3
   };
   enum EUploadPackageOpt {
      kUntar               = 0x0,  //Untar over existing dir [default]
      kRemoveOld           = 0x1   //Remove existing dir with same name
   };
   enum ERunStatus {
      kRunning             = 0,    // Normal status
      kStopped             = 1,    // After the stop button has been pressed
      kAborted             = 2     // After the abort button has been pressed
   };

private:
   enum EUrgent {
      kLocalInterrupt      = -1,
      kPing                = 0,
      kHardInterrupt       = 1,
      kSoftInterrupt,
      kShutdownInterrupt
   };
   enum EProofCacheCommands {
      kShowCache           = 1,
      kClearCache          = 2,
      kShowPackages        = 3,
      kClearPackages       = 4,
      kClearPackage        = 5,
      kBuildPackage        = 6,
      kLoadPackage         = 7,
      kShowEnabledPackages = 8,
      kShowSubCache        = 9,
      kClearSubCache       = 10,
      kShowSubPackages     = 11,
      kDisableSubPackages  = 12,
      kDisableSubPackage   = 13,
      kBuildSubPackage     = 14,
      kUnloadPackage       = 15,
      kDisablePackage      = 16,
      kUnloadPackages      = 17,
      kDisablePackages     = 18,
      kListPackages        = 19,
      kListEnabledPackages = 20,
      kLoadMacro           = 21
   };
   enum EProofDataSetCommands {
      kUploadDataSet       = 1,  //Upload a dataset
      kCheckDataSetName    = 2,  //Check wheter dataset of this name exists
      kGetDataSets         = 3,  //List datasets saved on  the master node
      kRegisterDataSet     = 4,  //Save a TList object as a dataset
      kGetDataSet          = 5,  //Get a TFileCollection of TFileInfo objects
      kVerifyDataSet       = 6,  //Try open all files from a dataset and report results
      kRemoveDataSet       = 7,  //Remove a dataset but leave files belonging to it
      kMergeDataSet        = 8,  //Add new files to an existing dataset
      kShowDataSets        = 9,  //Shows datasets, returns formatted output
      kGetQuota            = 10, //Get quota info per group
      kShowQuota           = 11  //Show quotas
   };
   enum ESendFileOpt {
      kAscii               = 0x0,
      kBinary              = 0x1,
      kForce               = 0x2,
      kForward             = 0x4,
      kCpBin               = 0x8,
      kCp                  = 0x10
   };
   enum EProofWrkListAction {
      kActivateWorker      = 1,
      kDeactivateWorker    = 2
   };
   enum EBuildPackageOpt {
      kDontBuildOnClient   = -2,
      kBuildOnSlavesNoWait = -1,
      kBuildAll            = 0,
      kCollectBuildResults = 1
   };
   enum EProofShowQuotaOpt {
      kPerGroup = 0x1,
      kPerUser = 0x2
   };

   Bool_t          fValid;           //is this a valid proof object
   TString         fMaster;          //master server ("" if a master); used in the browser
   TString         fWorkDir;         //current work directory on remote servers
   Int_t           fLogLevel;        //server debug logging level
   Int_t           fStatus;          //remote return status (part of kPROOF_LOGDONE)
   Int_t           fCheckFileStatus; //remote return status after kPROOF_CHECKFILE
   TList          *fRecvMessages;    //Messages received during collect not yet processed
   TList          *fSlaveInfo;       //!list returned by kPROOF_GETSLAVEINFO
   Bool_t          fMasterServ;      //true if we are a master server
   Bool_t          fSendGroupView;   //if true send new group view
   TList          *fActiveSlaves;    //list of active slaves (subset of all slaves)
   TList          *fInactiveSlaves;  //list of inactive slaves (good but not used for processing)
   TList          *fUniqueSlaves;    //list of all active slaves with unique file systems
   TList          *fAllUniqueSlaves;  //list of all active slaves with unique file systems, including all submasters
   TList          *fNonUniqueMasters; //list of all active masters with a nonunique file system
   TMonitor       *fActiveMonitor;   //monitor activity on all active slave sockets
   TMonitor       *fUniqueMonitor;   //monitor activity on all unique slave sockets
   TMonitor       *fAllUniqueMonitor; //monitor activity on all unique slave sockets, including all submasters
   TMonitor       *fCurrentMonitor;  //currently active monitor
   Long64_t        fBytesRead;       //bytes read by all slaves during the session
   Float_t         fRealTime;        //realtime spent by all slaves during the session
   Float_t         fCpuTime;         //CPU time spent by all slaves during the session
   TSignalHandler *fIntHandler;      //interrupt signal handler (ctrl-c)
   TPluginHandler *fProgressDialog;  //progress dialog plugin
   Bool_t          fProgressDialogStarted; //indicates if the progress dialog is up
   TVirtualProofPlayer *fPlayer;     //current player
   TList          *fFeedback;        //list of names to be returned as feedback
   TList          *fChains;          //chains with this proof set
   struct MD5Mod_t {
      TMD5   fMD5;                   //file's md5
      Long_t fModtime;               //file's modification time
   };
   typedef std::map<TString, MD5Mod_t> FileMap_t;
   FileMap_t       fFileMap;         //map keeping track of a file's md5 and mod time
   TDSet          *fDSet;            //current TDSet being validated

   Int_t           fNotIdle;         //Number of non-idle sub-nodes
   Bool_t          fSync;            //true if type of currently processed query is sync
   ERunStatus      fRunStatus;       //run status

   Bool_t          fRedirLog;        //redirect received log info
   TString         fLogFileName;     //name of the temp file for redirected logs
   FILE           *fLogFileW;        //temp file to redirect logs
   FILE           *fLogFileR;        //temp file to read redirected logs
   Bool_t          fLogToWindowOnly; //send log to window only

   TList          *fWaitingSlaves;   //stores a TPair of the slaves's TSocket and TMessage
   TList          *fQueries;         //list of TProofQuery objects
   Int_t           fOtherQueries;    //number of queries in list from previous sessions
   Int_t           fDrawQueries;     //number of draw queries during this sessions
   Int_t           fMaxDrawQueries;  //max number of draw queries kept
   Int_t           fSeqNum;          //Remote sequential # of the last query submitted

   Int_t           fSessionID;       //remote ID of the session

   Bool_t          fEndMaster;       //true for a master in direct contact only with workers

   TString         fPackageDir;      //package directory (used on client)
   THashList      *fGlobalPackageDirList;//list of directories containing global packages libs
   TProofLockPath *fPackageLock;     //package lock
   TList          *fEnabledPackagesOnClient; //list of packages enabled on client

   TList          *fInputData;       //Input data objects sent over via file
   TString         fInputDataFile;   //File with input data objects

   PrintProgress_t fPrintProgress;   //Function function to display progress info in batch mode

   TVirtualMutex  *fCloseMutex;      // Avoid crashes in MarkBad or alike while closing

   TList          *fLoadedMacros;    // List of loaded macros (just file names)
   static TList   *fgProofEnvList;   // List of TNameds defining environment
                                     // variables to pass to proofserv
protected:
   enum ESlaves { kAll, kActive, kUnique, kAllUnique };

   TUrl            fUrl;            //Url of the master
   TString         fConfFile;       //file containing config information
   TString         fConfDir;        //directory containing cluster config information
   TString         fImage;          //master's image name
   Int_t           fProtocol;       //remote PROOF server protocol version number
   TList          *fSlaves;         //list of all slave servers as in config file
   TList          *fBadSlaves;      //dead slaves (subset of all slaves)
   TMonitor       *fAllMonitor;     //monitor activity on all valid slave sockets
   Bool_t          fDataReady;      //true if data is ready to be analyzed
   Long64_t        fBytesReady;     //number of bytes staged
   Long64_t        fTotalBytes;     //number of bytes to be analyzed
   TList          *fAvailablePackages; //list of available packages
   TList          *fEnabledPackages;   //list of enabled packages
   TList          *fRunningDSets;   // Temporary datasets used for async running

   Int_t           fCollectTimeout; // Timeout for (some) collect actions

   TString         fDataPoolUrl;    // default data pool entry point URL
   TProofMgr::EServType fServType;  // type of server: proofd, XrdProofd
   TProofMgr      *fManager;        // manager to which this session belongs (if any)
   EQueryMode      fQueryMode;      // default query mode
   Bool_t          fDynamicStartup; // are the workers started dynamically?

   static TSemaphore *fgSemaphore;   //semaphore to control no of parallel startup threads

private:
   TProof(const TProof &);           // not implemented
   void operator=(const TProof &);   // idem

   void     CleanGDirectory(TList *ol);

   Int_t    Exec(const char *cmd, ESlaves list, Bool_t plusMaster);
   Int_t    SendCommand(const char *cmd, ESlaves list = kActive);
   Int_t    SendCurrentState(ESlaves list = kActive);
   Bool_t   CheckFile(const char *file, TSlave *sl, Long_t modtime, Int_t cpopt = (kCp | kCpBin));
   Int_t    SendFile(const char *file, Int_t opt = (kBinary | kForward | kCp | kCpBin),
                     const char *rfile = 0, TSlave *sl = 0);
   Int_t    SendObject(const TObject *obj, ESlaves list = kActive);
   Int_t    SendGroupView();
   Int_t    SendInitialState();
   Int_t    SendPrint(Option_t *option="");
   Int_t    Ping(ESlaves list);
   void     Interrupt(EUrgent type, ESlaves list = kActive);
   void     AskStatistics();
   void     AskParallel();
   Int_t    GoParallel(Int_t nodes, Bool_t accept = kFALSE, Bool_t random = kFALSE);
   Int_t    SetParallelSilent(Int_t nodes, Bool_t random = kFALSE);
   void     RecvLogFile(TSocket *s, Int_t size);
   void     NotifyLogMsg(const char *msg, const char *sfx = "\n");
   Int_t    BuildPackage(const char *package, EBuildPackageOpt opt = kBuildAll);
   Int_t    BuildPackageOnClient(const TString &package);
   Int_t    LoadPackage(const char *package, Bool_t notOnClient = kFALSE);
   Int_t    LoadPackageOnClient(const TString &package);
   Int_t    UnloadPackage(const char *package);
   Int_t    UnloadPackageOnClient(const char *package);
   Int_t    UnloadPackages();
   Int_t    UploadPackageOnClient(const TString &package, EUploadPackageOpt opt, TMD5 *md5);
   Int_t    DisablePackage(const char *package);
   Int_t    DisablePackageOnClient(const char *package);
   Int_t    DisablePackages();

   void     Activate(TList *slaves = 0);
   Int_t    Broadcast(const TMessage &mess, TList *slaves);
   Int_t    Broadcast(const TMessage &mess, ESlaves list = kActive);
   Int_t    Broadcast(const char *mess, Int_t kind, TList *slaves);
   Int_t    Broadcast(const char *mess, Int_t kind = kMESS_STRING, ESlaves list = kActive);
   Int_t    Broadcast(Int_t kind, TList *slaves) { return Broadcast(0, kind, slaves); }
   Int_t    Broadcast(Int_t kind, ESlaves list = kActive) { return Broadcast(0, kind, list); }
   Int_t    BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks);
   Int_t    BroadcastFile(const char *file, Int_t opt, const char *rfile = 0, ESlaves list = kAllUnique);
   Int_t    BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list = kAllUnique);
   Int_t    BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers);
   Int_t    BroadcastObject(const TObject *obj, Int_t kind, TList *slaves);
   Int_t    BroadcastObject(const TObject *obj, Int_t kind = kMESS_OBJECT, ESlaves list = kActive);
   Int_t    BroadcastRaw(const void *buffer, Int_t length, TList *slaves);
   Int_t    BroadcastRaw(const void *buffer, Int_t length, ESlaves list = kActive);
   Int_t    Collect(const TSlave *sl, Long_t timeout = -1, Int_t endtype = -1);
   Int_t    Collect(TMonitor *mon, Long_t timeout = -1, Int_t endtype = -1);
   Int_t    CollectInputFrom(TSocket *s, Int_t endtype = -1);
   Int_t    HandleInputMessage(TSlave *wrk, TMessage *m);
   void     SetMonitor(TMonitor *mon = 0, Bool_t on = kTRUE);

   void     ReleaseMonitor(TMonitor *mon);

   void     FindUniqueSlaves();
   TSlave  *FindSlave(TSocket *s) const;
   TList   *GetListOfSlaves() const { return fSlaves; }
   TList   *GetListOfInactiveSlaves() const { return fInactiveSlaves; }
   TList   *GetListOfUniqueSlaves() const { return fUniqueSlaves; }
   TList   *GetListOfBadSlaves() const { return fBadSlaves; }
   Int_t    GetNumberOfSlaves() const;
   Int_t    GetNumberOfActiveSlaves() const;
   Int_t    GetNumberOfInactiveSlaves() const;
   Int_t    GetNumberOfUniqueSlaves() const;
   Int_t    GetNumberOfBadSlaves() const;

   Bool_t   IsEndMaster() const { return fEndMaster; }
   void     ModifyWorkerLists(const char *ord, Bool_t add);

   Bool_t   IsSync() const { return fSync; }
   void     InterruptCurrentMonitor();

   void     SetRunStatus(ERunStatus rst) { fRunStatus = rst; }

   void     MarkBad(TSlave *wrk, const char *reason = 0);
   void     MarkBad(TSocket *s, const char *reason = 0);
   void     TerminateWorker(TSlave *wrk);
   void     TerminateWorker(const char *ord);

   void     ActivateAsyncInput();
   void     DeActivateAsyncInput();

   Int_t    GetQueryReference(Int_t qry, TString &ref);

   void     PrintProgress(Long64_t total, Long64_t processed, Float_t procTime = -1.);

protected:
   TProof(); // For derived classes to use
   Int_t           Init(const char *masterurl, const char *conffile,
                        const char *confdir, Int_t loglevel,
                        const char *alias = 0);
   virtual Bool_t  StartSlaves(Bool_t parallel, Bool_t attach = kFALSE);
   Int_t AddWorkers(TList *wrks);
   Int_t RemoveWorkers(TList *wrks);

   void                         SetPlayer(TVirtualProofPlayer *player);
   TVirtualProofPlayer         *GetPlayer() const { return fPlayer; }
   virtual TVirtualProofPlayer *MakePlayer(const char *player = 0, TSocket *s = 0);

   void    UpdateDialog();

   void    HandleLibIncPath(const char *what, Bool_t add, const char *dirs);

   TList  *GetListOfActiveSlaves() const { return fActiveSlaves; }
   TSlave *CreateSlave(const char *url, const char *ord,
                       Int_t perf, const char *image, const char *workdir);
   TSlave *CreateSubmaster(const char *url, const char *ord,
                           const char *image, const char *msd);

   virtual void SaveWorkerInfo();

   Int_t    Collect(ESlaves list = kActive, Long_t timeout = -1, Int_t endtype = -1);
   Int_t    Collect(TList *slaves, Long_t timeout = -1, Int_t endtype = -1);

   void         SetDSet(TDSet *dset) { fDSet = dset; }
   virtual void ValidateDSet(TDSet *dset);

   TPluginHandler *GetProgressDialog() const { return fProgressDialog; }

   Int_t AssertPath(const char *path, Bool_t writable);

   void PrepareInputDataFile(TString &dataFile);
   virtual void SendInputDataFile();

   static void *SlaveStartupThread(void *arg);

   static Int_t AssertDataSet(TDSet *dset, TList *input,
                              TProofDataSetManager *mgr, TString &emsg);
   // Input data handling
   static Int_t GetInputData(TList *input, const char *cachedir, TString &emsg);
   static Int_t SaveInputData(TQueryResult *qr, const char *cachedir, TString &emsg);
   static Int_t SendInputData(TQueryResult *qr, TProof *p, TString &emsg);

public:
   TProof(const char *masterurl, const char *conffile = kPROOF_ConfFile,
          const char *confdir = kPROOF_ConfDir, Int_t loglevel = 0,
          const char *alias = 0, TProofMgr *mgr = 0);
   virtual ~TProof();

   void        cd(Int_t id = -1);

   Int_t       Ping();
   void        Touch();
   Int_t       Exec(const char *cmd, Bool_t plusMaster = kFALSE);

   virtual Long64_t Process(TDSet *dset, const char *selector,
                            Option_t *option = "", Long64_t nentries = -1,
                            Long64_t firstentry = 0);
   virtual Long64_t Process(TFileCollection *fc, const char *selector,
                            Option_t *option = "", Long64_t nentries = -1,
                            Long64_t firstentry = 0);
   virtual Long64_t Process(const char *dsetname, const char *selector,
                            Option_t *option = "", Long64_t nentries = -1,
                            Long64_t firstentry = 0, TObject *enl = 0);
   virtual Long64_t Process(const char *selector, Long64_t nentries,
                            Option_t *option = "");

   virtual Long64_t DrawSelect(TDSet *dset, const char *varexp,
                          const char *selection = "",
                          Option_t *option = "", Long64_t nentries = -1,
                          Long64_t firstentry = 0);
   Long64_t    DrawSelect(const char *dsetname, const char *varexp,
                          const char *selection = "",
                          Option_t *option = "", Long64_t nentries = -1,
                          Long64_t firstentry = 0, TObject *enl = 0);
   Int_t       Archive(Int_t query, const char *url);
   Int_t       Archive(const char *queryref, const char *url = 0);
   Int_t       CleanupSession(const char *sessiontag);
   Long64_t    Finalize(Int_t query = -1, Bool_t force = kFALSE);
   Long64_t    Finalize(const char *queryref, Bool_t force = kFALSE);
   Int_t       Remove(Int_t query, Bool_t all = kFALSE);
   Int_t       Remove(const char *queryref, Bool_t all = kFALSE);
   Int_t       Retrieve(Int_t query, const char *path = 0);
   Int_t       Retrieve(const char *queryref, const char *path = 0);

   void        StopProcess(Bool_t abort, Int_t timeout = -1);
   void        Browse(TBrowser *b);

   Int_t       SetParallel(Int_t nodes = 9999, Bool_t random = kFALSE);
   void        SetLogLevel(Int_t level, UInt_t mask = TProofDebug::kAll);

   void        Close(Option_t *option="");
   virtual void Print(Option_t *option="") const;

   //-- cache and package management
   virtual void ShowCache(Bool_t all = kFALSE);
   virtual void ClearCache(const char *file = 0);
   TList      *GetListOfPackages();
   TList      *GetListOfEnabledPackages();
   void        ShowPackages(Bool_t all = kFALSE);
   void        ShowEnabledPackages(Bool_t all = kFALSE);
   Int_t       ClearPackages();
   Int_t       ClearPackage(const char *package);
   Int_t       EnablePackage(const char *package, Bool_t notOnClient = kFALSE);
   Int_t       UploadPackage(const char *par, EUploadPackageOpt opt = kUntar);
   Int_t       Load(const char *macro, Bool_t notOnClient = kFALSE, Bool_t uniqueOnly = kTRUE);

   Int_t       AddDynamicPath(const char *libpath, Bool_t onClient = kFALSE);
   Int_t       AddIncludePath(const char *incpath, Bool_t onClient = kFALSE);
   Int_t       RemoveDynamicPath(const char *libpath, Bool_t onClient = kFALSE);
   Int_t       RemoveIncludePath(const char *incpath, Bool_t onClient = kFALSE);

   //-- dataset management
   Int_t       UploadDataSet(const char *dataset,
                             TList *files,
                             const char *dest = 0,
                             Int_t opt = kAskUser,
                             TList *skippedFiles = 0);
   Int_t       UploadDataSet(const char *dataset,
                             const char *files,
                             const char *dest = 0,
                             Int_t opt = kAskUser,
                             TList *skippedFiles = 0);
   Int_t       UploadDataSetFromFile(const char *dataset,
                                     const char *file,
                                     const char *dest = 0,
                                     Int_t opt = kAskUser,
                                     TList *skippedFiles = 0);
   virtual Bool_t  RegisterDataSet(const char *name,
                               TFileCollection *dataset, const char* optStr = "");
   virtual TMap *GetDataSets(const char *uri = "", const char* optStr = "");
   virtual void  ShowDataSets(const char *uri = "", const char* optStr = "");

   TMap       *GetDataSetQuota(const char* optStr = "");
   void        ShowDataSetQuota(Option_t* opt = 0);

   void        ShowDataSet(const char *dataset = "", const char* opt = "M");
   virtual Int_t RemoveDataSet(const char *dataset, const char* optStr = "");
   virtual Int_t VerifyDataSet(const char *dataset, const char* optStr = "");
   virtual TFileCollection *GetDataSet(const char *dataset, const char* optStr = "");
   TList       *FindDataSets(const char *searchString, const char* optStr = "");

   const char *GetMaster() const { return fMaster; }
   const char *GetConfDir() const { return fConfDir; }
   const char *GetConfFile() const { return fConfFile; }
   const char *GetUser() const { return fUrl.GetUser(); }
   const char *GetWorkDir() const { return fWorkDir; }
   const char *GetSessionTag() const { return GetName(); }
   const char *GetImage() const { return fImage; }
   const char *GetUrl() { return fUrl.GetUrl(); }
   Int_t       GetPort() const { return fUrl.GetPort(); }
   Int_t       GetRemoteProtocol() const { return fProtocol; }
   Int_t       GetClientProtocol() const { return kPROOF_Protocol; }
   Int_t       GetStatus() const { return fStatus; }
   Int_t       GetLogLevel() const { return fLogLevel; }
   Int_t       GetParallel() const;
   Int_t       GetSessionID() const { return fSessionID; }
   TList      *GetListOfSlaveInfos();
   Bool_t      UseDynamicStartup() const { return fDynamicStartup; }

   EQueryMode  GetQueryMode(Option_t *mode = 0) const;
   void        SetQueryMode(EQueryMode mode);

   void        SetRealTimeLog(Bool_t on = kTRUE);

   Long64_t    GetBytesRead() const { return fBytesRead; }
   Float_t     GetRealTime() const { return fRealTime; }
   Float_t     GetCpuTime() const { return fCpuTime; }

   Bool_t      IsLite() const { return (fServType == TProofMgr::kProofLite); }
   Bool_t      IsProofd() const { return (fServType == TProofMgr::kProofd); }
   Bool_t      IsFolder() const { return kTRUE; }
   Bool_t      IsMaster() const { return fMasterServ; }
   Bool_t      IsValid() const { return fValid; }
   Bool_t      IsParallel() const { return GetParallel() > 0 ? kTRUE : kFALSE; }
   Bool_t      IsIdle() const { return (fNotIdle <= 0) ? kTRUE : kFALSE; }

   ERunStatus  GetRunStatus() const { return fRunStatus; }
   TList      *GetLoadedMacros() const { return fLoadedMacros; }

   //-- input list parameter handling
   void        SetParameter(const char *par, const char *value);
   void        SetParameter(const char *par, Int_t value);
   void        SetParameter(const char *par, Long_t value);
   void        SetParameter(const char *par, Long64_t value);
   void        SetParameter(const char *par, Double_t value);
   TObject    *GetParameter(const char *par) const;
   void        DeleteParameters(const char *wildcard);
   void        ShowParameters(const char *wildcard = "PROOF_*") const;

   void        AddInput(TObject *obj);
   void        ClearInput();
   TList      *GetInputList();
   TObject    *GetOutput(const char *name);
   TList      *GetOutputList();

   void        AddInputData(TObject *obj, Bool_t push = kFALSE);
   void        SetInputDataFile(const char *datafile);
   void        ClearInputData(TObject *obj = 0);
   void        ClearInputData(const char *name);

   void        AddFeedback(const char *name);
   void        RemoveFeedback(const char *name);
   void        ClearFeedback();
   void        ShowFeedback() const;
   TList      *GetFeedbackList() const;

   virtual TList *GetListOfQueries(Option_t *opt = "");
   Int_t       GetNumberOfQueries();
   Int_t       GetNumberOfDrawQueries() { return fDrawQueries; }
   TList      *GetQueryResults();
   TQueryResult *GetQueryResult(const char *ref = 0);
   void        GetMaxQueries();
   void        SetMaxDrawQueries(Int_t max);
   void        ShowQueries(Option_t *opt = "");

   Bool_t      IsDataReady(Long64_t &totalbytes, Long64_t &bytesready);

   void        SetActive(Bool_t /*active*/ = kTRUE) { }

   void        LogMessage(const char *msg, Bool_t all); //*SIGNAL*
   void        Progress(Long64_t total, Long64_t processed); //*SIGNAL*
   void        Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
                        Float_t initTime, Float_t procTime,
                        Float_t evtrti, Float_t mbrti); // *SIGNAL*
   void        Feedback(TList *objs); //*SIGNAL*
   void        QueryResultReady(const char *ref); //*SIGNAL*
   void        CloseProgressDialog(); //*SIGNAL*
   void        ResetProgressDialog(const char *sel, Int_t sz,
                                   Long64_t fst, Long64_t ent); //*SIGNAL*
   void        StartupMessage(const char *msg, Bool_t status, Int_t done,
                              Int_t total); //*SIGNAL*
   void        DataSetStatus(const char *msg, Bool_t status,
                             Int_t done, Int_t total); //*SIGNAL*

   void        SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st);

   void        GetLog(Int_t start = -1, Int_t end = -1);
   void        PutLog(TQueryResult *qr);
   void        ShowLog(Int_t qry = -1);
   void        ShowLog(const char *queryref);
   Bool_t      SendingLogToWindow() const { return fLogToWindowOnly; }
   void        SendLogToWindow(Bool_t mode) { fLogToWindowOnly = mode; }

   void        ResetProgressDialogStatus() { fProgressDialogStarted = kFALSE; }

   virtual TTree *GetTreeHeader(TDSet *tdset);
   TList      *GetOutputNames();

   void        AddChain(TChain *chain);
   void        RemoveChain(TChain *chain);

   TDrawFeedback *CreateDrawFeedback();
   void           SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt);
   void           DeleteDrawFeedback(TDrawFeedback *f);

   void        Detach(Option_t *opt = "");

   virtual void SetAlias(const char *alias="");

   TProofMgr  *GetManager() { return fManager; }
   void        SetManager(TProofMgr *mgr);

   void        ActivateWorker(const char *ord);
   void        DeactivateWorker(const char *ord);

   const char *GetDataPoolUrl() const { return fDataPoolUrl; }
   void        SetDataPoolUrl(const char *url) { fDataPoolUrl = url; }

   void        SetPrintProgress(PrintProgress_t pp) { fPrintProgress = pp; }

   // Opening and managing PROOF connections
   static TProof       *Open(const char *url = 0, const char *conffile = 0,
                             const char *confdir = 0, Int_t loglevel = 0);
   static TProofMgr    *Mgr(const char *url);
   static void          Reset(const char *url, Bool_t hard = kFALSE);

   static void          AddEnvVar(const char *name, const char *value);
   static void          DelEnvVar(const char *name);
   static const TList  *GetEnvVars();
   static void          ResetEnvVars();

   // Input/output list utilities
   static Int_t         GetParameter(TCollection *c, const char *par, TString &value);
   static Int_t         GetParameter(TCollection *c, const char *par, Int_t &value);
   static Int_t         GetParameter(TCollection *c, const char *par, Long_t &value);
   static Int_t         GetParameter(TCollection *c, const char *par, Long64_t &value);
   static Int_t         GetParameter(TCollection *c, const char *par, Double_t &value);

   ClassDef(TProof,0)  //PROOF control class
};

// Global object with default PROOF session
R__EXTERN TProof *gProof;

#endif

Last change: Thu Dec 18 09:31:35 2008
Last generated: 2008-12-18 09:31

This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.