00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00045 #ifndef __vtkMultiProcessController_h
00046 #define __vtkMultiProcessController_h
00047
00048 #include "vtkObject.h"
00049
00050 #include "vtkCommunicator.h"
00051
// Forward declarations of the classes this header only refers to by pointer.
class vtkDataSet;
class vtkImageData;
class vtkCollection;
class vtkOutputWindow;
class vtkDataObject;
// vtkDataArray appears in the Send/Receive signatures of the controller;
// declaring it here keeps this header from relying on some other header
// having declared it first.
class vtkDataArray;
class vtkMultiProcessController;

// Callback type taken by SetSingleMethod() and SetMultipleMethod().
// The callback is handed a controller pointer and the opaque user data
// pointer that was registered together with it.
typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller,
                                       void *userData);

// Callback type taken by AddRMI().
// localArg is the pointer supplied at registration time; remoteArg and
// remoteArgLength describe the argument buffer that came with the trigger,
// and remoteProcessId identifies the process that issued it.
typedef void (*vtkRMIFunctionType)(void *localArg,
                                   void *remoteArg, int remoteArgLength,
                                   int remoteProcessId);
00068
00069
00070
00071 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00072 {
00073 public:
00074 static vtkMultiProcessController *New();
00075 vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject);
00076 void PrintSelf(ostream& os, vtkIndent indent);
00077
00081 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0;
00082
00084
00087 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv),
00088 int initializedExternally)=0;
00090
00093 virtual void Finalize()=0;
00094
00098 virtual void Finalize(int finalizedExternally)=0;
00099
00101
00104 virtual void SetNumberOfProcesses(int num);
00105 vtkGetMacro( NumberOfProcesses, int );
00107
00108
00110
00113 void SetSingleMethod(vtkProcessFunctionType, void *data);
00114
00116
00120 virtual void SingleMethodExecute() = 0;
00121
00122
00124
00128 void SetMultipleMethod(int index, vtkProcessFunctionType, void *data);
00129
00131
00135 virtual void MultipleMethodExecute() = 0;
00136
00138 virtual int GetLocalProcessId() { return this->LocalProcessId; }
00139
00144 static vtkMultiProcessController *GetGlobalController();
00145
00148 virtual void CreateOutputWindow() = 0;
00149
00151
00156 vtkSetMacro(ForceDeepCopy, int);
00157 vtkGetMacro(ForceDeepCopy, int);
00158 vtkBooleanMacro(ForceDeepCopy, int);
00160
00161
00162
00163
00169 void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00170
00172
00173 void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00174 {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00175
00177
00179 void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00180
00183 void TriggerBreakRMIs();
00184
00186
00187 void TriggerRMI(int remoteProcessId, char *arg, int tag)
00188 { this->TriggerRMI(remoteProcessId, (void*)arg,
00189 static_cast<int>(strlen(arg))+1, tag); }
00191
00193
00194 void TriggerRMI(int remoteProcessId, int tag)
00195 { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00197
00200 void ProcessRMIs();
00201
00203
00206 vtkSetMacro(BreakFlag, int);
00207 vtkGetMacro(BreakFlag, int);
00209
00211 vtkGetObjectMacro(Communicator, vtkCommunicator);
00213
00214
00215
00216 enum Consts {
00217 MAX_PROCESSES=8192,
00218 ANY_SOURCE=-1,
00219 INVALID_SOURCE=-2,
00220 RMI_TAG=315167,
00221 RMI_ARG_TAG=315168,
00222 BREAK_RMI_TAG=239954
00223 };
00224
00225
00226
00228 virtual void Barrier() = 0;
00229
00230 static void SetGlobalController(vtkMultiProcessController *controller);
00231
00232
00233
00235
00237 int Send(int* data, int length, int remoteProcessId, int tag);
00238 int Send(unsigned long* data, int length, int remoteProcessId,
00239 int tag);
00240 int Send(char* data, int length, int remoteProcessId, int tag);
00241 int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00242 int Send(float* data, int length, int remoteProcessId, int tag);
00243 int Send(double* data, int length, int remoteProcessId, int tag);
00244 #ifdef VTK_USE_64BIT_IDS
00245 int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00247 #endif
00248 int Send(vtkDataObject *data, int remoteId, int tag);
00249 int Send(vtkDataArray *data, int remoteId, int tag);
00250
00252
00255 int Receive(int* data, int length, int remoteProcessId, int tag);
00256 int Receive(unsigned long* data, int length, int remoteProcessId,
00257 int tag);
00258 int Receive(char* data, int length, int remoteProcessId, int tag);
00259 int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00260 int Receive(float* data, int length, int remoteProcessId, int tag);
00261 int Receive(double* data, int length, int remoteProcessId, int tag);
00262 #ifdef VTK_USE_64BIT_IDS
00263 int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00265 #endif
00266 int Receive(vtkDataObject* data, int remoteId, int tag);
00267 int Receive(vtkDataArray* data, int remoteId, int tag);
00268
00269
00270
00271 protected:
00272 vtkMultiProcessController();
00273 ~vtkMultiProcessController();
00274
00275 int MaximumNumberOfProcesses;
00276 int NumberOfProcesses;
00277
00278 int LocalProcessId;
00279
00280 vtkProcessFunctionType SingleMethod;
00281 void *SingleData;
00282 vtkProcessFunctionType MultipleMethod[MAX_PROCESSES];
00283 void *MultipleData[MAX_PROCESSES];
00284
00285 vtkCollection *RMIs;
00286
00287
00288
00289 int BreakFlag;
00290
00291 void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00292
00293
00294
00295 virtual vtkMultiProcessController *GetLocalController();
00296
00297
00298
00299 int ForceDeepCopy;
00300
00301 vtkOutputWindow* OutputWindow;
00302
00303
00304
00305
00306 vtkCommunicator* Communicator;
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316 vtkCommunicator* RMICommunicator;
00317
00318 private:
00319 vtkMultiProcessController(const vtkMultiProcessController&);
00320 void operator=(const vtkMultiProcessController&);
00321 };
00322
00323
00324 inline int vtkMultiProcessController::Send(vtkDataObject *data,
00325 int remoteThreadId, int tag)
00326 {
00327 if (this->Communicator)
00328 {
00329 return this->Communicator->Send(data, remoteThreadId, tag);
00330 }
00331 else
00332 {
00333 return 0;
00334 }
00335 }
00336
00337 inline int vtkMultiProcessController::Send(vtkDataArray *data,
00338 int remoteThreadId, int tag)
00339 {
00340 if (this->Communicator)
00341 {
00342 return this->Communicator->Send(data, remoteThreadId, tag);
00343 }
00344 else
00345 {
00346 return 0;
00347 }
00348 }
00349
00350 inline int vtkMultiProcessController::Send(int* data, int length,
00351 int remoteThreadId, int tag)
00352 {
00353 if (this->Communicator)
00354 {
00355 return this->Communicator->Send(data, length, remoteThreadId, tag);
00356 }
00357 else
00358 {
00359 return 0;
00360 }
00361 }
00362
00363 inline int vtkMultiProcessController::Send(unsigned long* data,
00364 int length, int remoteThreadId,
00365 int tag)
00366 {
00367 if (this->Communicator)
00368 {
00369 return this->Communicator->Send(data, length, remoteThreadId, tag);
00370 }
00371 else
00372 {
00373 return 0;
00374 }
00375 }
00376
00377 inline int vtkMultiProcessController::Send(char* data, int length,
00378 int remoteThreadId, int tag)
00379 {
00380 if (this->Communicator)
00381 {
00382 return this->Communicator->Send(data, length, remoteThreadId, tag);
00383 }
00384 else
00385 {
00386 return 0;
00387 }
00388 }
00389
00390 inline int vtkMultiProcessController::Send(unsigned char* data, int length,
00391 int remoteThreadId, int tag)
00392 {
00393 if (this->Communicator)
00394 {
00395 return this->Communicator->Send(data, length, remoteThreadId, tag);
00396 }
00397 else
00398 {
00399 return 0;
00400 }
00401 }
00402
00403 inline int vtkMultiProcessController::Send(float* data, int length,
00404 int remoteThreadId, int tag)
00405 {
00406 if (this->Communicator)
00407 {
00408 return this->Communicator->Send(data, length, remoteThreadId, tag);
00409 }
00410 else
00411 {
00412 return 0;
00413 }
00414 }
00415
00416 inline int vtkMultiProcessController::Send(double* data, int length,
00417 int remoteThreadId, int tag)
00418 {
00419 if (this->Communicator)
00420 {
00421 return this->Communicator->Send(data, length, remoteThreadId, tag);
00422 }
00423 else
00424 {
00425 return 0;
00426 }
00427 }
00428
00429 #ifdef VTK_USE_64BIT_IDS
00430 inline int vtkMultiProcessController::Send(vtkIdType* data, int length,
00431 int remoteThreadId, int tag)
00432 {
00433 if (this->Communicator)
00434 {
00435 return this->Communicator->Send(data, length, remoteThreadId, tag);
00436 }
00437 else
00438 {
00439 return 0;
00440 }
00441 }
00442 #endif
00443
00444 inline int vtkMultiProcessController::Receive(vtkDataObject* data,
00445 int remoteThreadId, int tag)
00446 {
00447 if (this->Communicator)
00448 {
00449 return this->Communicator->Receive(data, remoteThreadId, tag);
00450 }
00451 else
00452 {
00453 return 0;
00454 }
00455 }
00456
00457 inline int vtkMultiProcessController::Receive(vtkDataArray* data,
00458 int remoteThreadId, int tag)
00459 {
00460 if (this->Communicator)
00461 {
00462 return this->Communicator->Receive(data, remoteThreadId, tag);
00463 }
00464 else
00465 {
00466 return 0;
00467 }
00468 }
00469
00470 inline int vtkMultiProcessController::Receive(int* data, int length,
00471 int remoteThreadId, int tag)
00472 {
00473 if (this->Communicator)
00474 {
00475 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00476 }
00477 else
00478 {
00479 return 0;
00480 }
00481 }
00482
00483 inline int vtkMultiProcessController::Receive(unsigned long* data,
00484 int length,int remoteThreadId,
00485 int tag)
00486 {
00487 if (this->Communicator)
00488 {
00489 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00490 }
00491 else
00492 {
00493 return 0;
00494 }
00495 }
00496
00497 inline int vtkMultiProcessController::Receive(char* data, int length,
00498 int remoteThreadId, int tag)
00499 {
00500 if (this->Communicator)
00501 {
00502 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00503 }
00504 else
00505 {
00506 return 0;
00507 }
00508 }
00509
00510 inline int vtkMultiProcessController::Receive(unsigned char* data, int length,
00511 int remoteThreadId, int tag)
00512 {
00513 if (this->Communicator)
00514 {
00515 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00516 }
00517 else
00518 {
00519 return 0;
00520 }
00521 }
00522
00523 inline int vtkMultiProcessController::Receive(float* data, int length,
00524 int remoteThreadId, int tag)
00525 {
00526 if (this->Communicator)
00527 {
00528 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00529 }
00530 else
00531 {
00532 return 0;
00533 }
00534 }
00535
00536 inline int vtkMultiProcessController::Receive(double* data, int length,
00537 int remoteThreadId, int tag)
00538 {
00539 if (this->Communicator)
00540 {
00541 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00542 }
00543 else
00544 {
00545 return 0;
00546 }
00547 }
00548
00549 #ifdef VTK_USE_64BIT_IDS
00550 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length,
00551 int remoteThreadId, int tag)
00552 {
00553 if (this->Communicator)
00554 {
00555 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00556 }
00557 else
00558 {
00559 return 0;
00560 }
00561 }
00562 #endif
00563
00564 #endif