Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

Parallel/vtkMultiProcessController.h

Go to the documentation of this file.
00001 /*=========================================================================
00002 
00003   Program:   Visualization Toolkit
00004   Module:    $RCSfile: vtkMultiProcessController.h,v $
00005 
00006   Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
00007   All rights reserved.
00008   See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
00009 
00010      This software is distributed WITHOUT ANY WARRANTY; without even
00011      the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
00012      PURPOSE.  See the above copyright notice for more information.
00013 
00014 =========================================================================*/
00045 #ifndef __vtkMultiProcessController_h
00046 #define __vtkMultiProcessController_h
00047 
00048 #include "vtkObject.h"
00049 
00050 #include "vtkCommunicator.h" // Needed for direct access to communicator
00051 
00052 class vtkDataSet;
00053 class vtkImageData;
00054 class vtkCollection;
00055 class vtkOutputWindow;
00056 class vtkDataObject;
00057 class vtkMultiProcessController;
00058 
00059 //BTX
00060 // The type of function that gets called when new processes are initiated.
00061 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller, 
00062                                        void *userData);
00063 
00064 // The type of function that gets called when an RMI is triggered.
00065 typedef void (*vtkRMIFunctionType)(void *localArg, 
00066                                    void *remoteArg, int remoteArgLength, 
00067                                    int remoteProcessId);
00068 //ETX
00069 
00070 
00071 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00072 {
00073 public:
00074   static vtkMultiProcessController *New();
00075   vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject);
00076   void PrintSelf(ostream& os, vtkIndent indent);
00077 
00081   virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0;
00082 
00084 
00087   virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv),
00088                           int initializedExternally)=0;
00090 
00093   virtual void Finalize()=0;
00094 
00098   virtual void Finalize(int finalizedExternally)=0;
00099 
00101 
00104   virtual void SetNumberOfProcesses(int num);
00105   vtkGetMacro( NumberOfProcesses, int );
00107 
00108   //BTX
00110 
00113   void SetSingleMethod(vtkProcessFunctionType, void *data);
00114   //ETX
00116 
00120   virtual void SingleMethodExecute() = 0;
00121   
00122   //BTX
00124 
00128   void SetMultipleMethod(int index, vtkProcessFunctionType, void *data); 
00129   //ETX
00131 
00135   virtual void MultipleMethodExecute() = 0;
00136 
00138   virtual int GetLocalProcessId() { return this->LocalProcessId; }
00139 
00144   static vtkMultiProcessController *GetGlobalController();
00145 
00148   virtual void CreateOutputWindow() = 0;
00149   
00151 
00156   vtkSetMacro(ForceDeepCopy, int);
00157   vtkGetMacro(ForceDeepCopy, int);
00158   vtkBooleanMacro(ForceDeepCopy, int);
00160 
00161   
00162   //------------------ RMIs --------------------
00163   //BTX
00169   void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00170   
00172 
00173   void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00174     {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00175   //ETX
00177   
00179   void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00180 
00183   void TriggerBreakRMIs();
00184 
00186 
00187   void TriggerRMI(int remoteProcessId, char *arg, int tag) 
00188     { this->TriggerRMI(remoteProcessId, (void*)arg, 
00189                        static_cast<int>(strlen(arg))+1, tag); }
00191 
00193 
00194   void TriggerRMI(int remoteProcessId, int tag)
00195     { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00197 
00200   void ProcessRMIs();
00201 
00203 
00206   vtkSetMacro(BreakFlag, int);
00207   vtkGetMacro(BreakFlag, int);
00209 
00211   vtkGetObjectMacro(Communicator, vtkCommunicator);
00213   
00214 //BTX
00215 
00216   enum Consts {
00217     MAX_PROCESSES=8192,
00218     ANY_SOURCE=-1,
00219     INVALID_SOURCE=-2,
00220     RMI_TAG=315167,
00221     RMI_ARG_TAG=315168,
00222     BREAK_RMI_TAG=239954
00223   };
00224 
00225 //ETX
00226 
00228   virtual void Barrier() = 0;
00229 
00230   static void SetGlobalController(vtkMultiProcessController *controller);
00231 
00232   //------------------ Communication --------------------
00233   
00235 
00237   int Send(int* data, int length, int remoteProcessId, int tag);
00238   int Send(unsigned long* data, int length, int remoteProcessId, 
00239            int tag);
00240   int Send(char* data, int length, int remoteProcessId, int tag);
00241   int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00242   int Send(float* data, int length, int remoteProcessId, int tag);
00243   int Send(double* data, int length, int remoteProcessId, int tag);
00244 #ifdef VTK_USE_64BIT_IDS
00245   int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00247 #endif
00248   int Send(vtkDataObject *data, int remoteId, int tag);
00249   int Send(vtkDataArray *data, int remoteId, int tag);
00250 
00252 
00255   int Receive(int* data, int length, int remoteProcessId, int tag);
00256   int Receive(unsigned long* data, int length, int remoteProcessId, 
00257               int tag);
00258   int Receive(char* data, int length, int remoteProcessId, int tag);
00259   int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00260   int Receive(float* data, int length, int remoteProcessId, int tag);
00261   int Receive(double* data, int length, int remoteProcessId, int tag);
00262 #ifdef VTK_USE_64BIT_IDS
00263   int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00265 #endif
00266   int Receive(vtkDataObject* data, int remoteId, int tag);
00267   int Receive(vtkDataArray* data, int remoteId, int tag);
00268 
00269 // Internally implemented RMI to break the process loop.
00270 
00271 protected:
00272   vtkMultiProcessController();
00273   ~vtkMultiProcessController();
00274   
00275   int MaximumNumberOfProcesses;
00276   int NumberOfProcesses;
00277 
00278   int LocalProcessId;
00279   
00280   vtkProcessFunctionType      SingleMethod;
00281   void                       *SingleData;
00282   vtkProcessFunctionType      MultipleMethod[MAX_PROCESSES];
00283   void                       *MultipleData[MAX_PROCESSES];  
00284   
00285   vtkCollection *RMIs;
00286   
00287   // This is a flag that can be used by the ports to break
00288   // their update loop. (same as ProcessRMIs)
00289   int BreakFlag;
00290 
00291   void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00292 
00293   // This method implements "GetGlobalController".  
00294   // It needs to be virtual and static.
00295   virtual vtkMultiProcessController *GetLocalController();
00296 
00297   
00298   // This flag can force deep copies during send.
00299   int ForceDeepCopy;
00300 
00301   vtkOutputWindow* OutputWindow;
00302 
00303   // Note that since the communicators can be created differently
00304   // depending on the type of controller, the subclasses are
00305   // responsible of deleting them.
00306   vtkCommunicator* Communicator;
00307 
00308   // Communicator which is a copy of the current user
00309   // level communicator except the context; i.e. even if the tags 
00310   // are the same, the RMI messages will not interfere with user 
00311   // level messages. (This only works with MPI. When using threads,
00312   // the tags have to be unique.)
00313   // Note that since the communicators can be created differently
00314   // depending on the type of controller, the subclasses are
00315   // responsible of deleting them.
00316   vtkCommunicator* RMICommunicator;
00317 
00318 private:
00319   vtkMultiProcessController(const vtkMultiProcessController&);  // Not implemented.
00320   void operator=(const vtkMultiProcessController&);  // Not implemented.
00321 };
00322 
00323 
00324 inline int vtkMultiProcessController::Send(vtkDataObject *data, 
00325                                            int remoteThreadId, int tag)
00326 {
00327   if (this->Communicator)
00328     {
00329     return this->Communicator->Send(data, remoteThreadId, tag);
00330     }
00331   else
00332     {
00333     return 0;
00334     }
00335 }
00336 
00337 inline int vtkMultiProcessController::Send(vtkDataArray *data, 
00338                                            int remoteThreadId, int tag)
00339 {
00340   if (this->Communicator)
00341     {
00342     return this->Communicator->Send(data, remoteThreadId, tag);
00343     }
00344   else
00345     {
00346     return 0;
00347     }
00348 }
00349 
00350 inline int vtkMultiProcessController::Send(int* data, int length, 
00351                                            int remoteThreadId, int tag)
00352 {
00353   if (this->Communicator)
00354     {
00355     return this->Communicator->Send(data, length, remoteThreadId, tag);
00356     }
00357   else
00358     {
00359     return 0;
00360     }
00361 }
00362 
00363 inline int vtkMultiProcessController::Send(unsigned long* data, 
00364                                            int length, int remoteThreadId, 
00365                                            int tag)
00366 {
00367   if (this->Communicator)
00368     {
00369     return this->Communicator->Send(data, length, remoteThreadId, tag);
00370     }
00371   else
00372     {
00373     return 0;
00374     }
00375 }
00376 
00377 inline int vtkMultiProcessController::Send(char* data, int length, 
00378                                            int remoteThreadId, int tag)
00379 {
00380   if (this->Communicator)
00381     {
00382     return this->Communicator->Send(data, length, remoteThreadId, tag);
00383     }
00384   else
00385     {
00386     return 0;
00387     }
00388 }
00389 
00390 inline int vtkMultiProcessController::Send(unsigned char* data, int length, 
00391                                            int remoteThreadId, int tag)
00392 {
00393   if (this->Communicator)
00394     {
00395     return this->Communicator->Send(data, length, remoteThreadId, tag);
00396     }
00397   else
00398     {
00399     return 0;
00400     }
00401 }
00402 
00403 inline int vtkMultiProcessController::Send(float* data, int length, 
00404                                            int remoteThreadId, int tag)
00405 {
00406   if (this->Communicator)
00407     {
00408     return this->Communicator->Send(data, length, remoteThreadId, tag);
00409     }
00410   else
00411     {
00412     return 0;
00413     }
00414 }
00415 
00416 inline int vtkMultiProcessController::Send(double* data, int length, 
00417                                            int remoteThreadId, int tag)
00418 {
00419   if (this->Communicator)
00420     {
00421     return this->Communicator->Send(data, length, remoteThreadId, tag);
00422     }
00423   else
00424     {
00425     return 0;
00426     }
00427 }
00428 
00429 #ifdef VTK_USE_64BIT_IDS
00430 inline int vtkMultiProcessController::Send(vtkIdType* data, int length, 
00431                                            int remoteThreadId, int tag)
00432 {
00433   if (this->Communicator)
00434     {
00435     return this->Communicator->Send(data, length, remoteThreadId, tag);
00436     }
00437   else
00438     {
00439     return 0;
00440     }
00441 }
00442 #endif
00443 
00444 inline int vtkMultiProcessController::Receive(vtkDataObject* data, 
00445                                               int remoteThreadId, int tag)
00446 {
00447   if (this->Communicator)
00448     {
00449     return this->Communicator->Receive(data, remoteThreadId, tag);
00450     }
00451   else
00452     {
00453     return 0;
00454     }
00455 }
00456 
00457 inline int vtkMultiProcessController::Receive(vtkDataArray* data, 
00458                                               int remoteThreadId, int tag)
00459 {
00460   if (this->Communicator)
00461     {
00462     return this->Communicator->Receive(data, remoteThreadId, tag);
00463     }
00464   else
00465     {
00466     return 0;
00467     }
00468 }
00469 
00470 inline int vtkMultiProcessController::Receive(int* data, int length, 
00471                                               int remoteThreadId, int tag)
00472 {
00473   if (this->Communicator)
00474     {
00475     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00476     }
00477   else
00478     {
00479     return 0;
00480     }
00481 }
00482 
00483 inline int vtkMultiProcessController::Receive(unsigned long* data, 
00484                                               int length,int remoteThreadId, 
00485                                               int tag)
00486 {
00487   if (this->Communicator)
00488     {
00489     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00490     }
00491   else
00492     {
00493     return 0;
00494     }
00495 }
00496 
00497 inline int vtkMultiProcessController::Receive(char* data, int length, 
00498                                               int remoteThreadId, int tag)
00499 {
00500   if (this->Communicator)
00501     {
00502     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00503     }
00504   else
00505     {
00506     return 0;
00507     }
00508 }
00509 
00510 inline int vtkMultiProcessController::Receive(unsigned char* data, int length, 
00511                                               int remoteThreadId, int tag)
00512 {
00513   if (this->Communicator)
00514     {
00515     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00516     }
00517   else
00518     {
00519     return 0;
00520     }
00521 }
00522 
00523 inline int vtkMultiProcessController::Receive(float* data, int length, 
00524                                               int remoteThreadId, int tag)
00525 {
00526   if (this->Communicator)
00527     {
00528     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00529     }
00530   else
00531     {
00532     return 0;
00533     }
00534 }
00535 
00536 inline int vtkMultiProcessController::Receive(double* data, int length, 
00537                                               int remoteThreadId, int tag)
00538 {
00539   if (this->Communicator)
00540     {
00541     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00542     }
00543   else
00544     {
00545     return 0;
00546     }
00547 }
00548 
00549 #ifdef VTK_USE_64BIT_IDS
00550 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length, 
00551                                               int remoteThreadId, int tag)
00552 {
00553   if (this->Communicator)
00554     {
00555     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00556     }
00557   else
00558     {
00559     return 0;
00560     }
00561 }
00562 #endif
00563 
00564 #endif