diff --git a/src/BRepMesh/BRepMesh_FastDiscret.cxx b/src/BRepMesh/BRepMesh_FastDiscret.cxx index 1d453897ff..56e7b5f711 100644 --- a/src/BRepMesh/BRepMesh_FastDiscret.cxx +++ b/src/BRepMesh/BRepMesh_FastDiscret.cxx @@ -120,7 +120,7 @@ void BRepMesh_FastDiscret::Perform(const TopoDS_Shape& theShape) aFaces.push_back(aFace); } - OSD_Parallel::ForEach(aFaces.begin(), aFaces.end(), *this, !myParameters.InParallel); + OSD_Parallel::ForEach(aFaces.begin(), aFaces.end(), *this, !myParameters.InParallel, (Standard_Integer )aFaces.size()); } diff --git a/src/BRepMesh/BRepMesh_IncrementalMesh.cxx b/src/BRepMesh/BRepMesh_IncrementalMesh.cxx index 70278be259..09bd970a61 100644 --- a/src/BRepMesh/BRepMesh_IncrementalMesh.cxx +++ b/src/BRepMesh/BRepMesh_IncrementalMesh.cxx @@ -234,7 +234,7 @@ void BRepMesh_IncrementalMesh::update() update(aFaceIt.Value()); // Mesh faces - OSD_Parallel::ForEach(myFaces.begin(), myFaces.end(), *myMesh, !myParameters.InParallel); + OSD_Parallel::ForEach(myFaces.begin(), myFaces.end(), *myMesh, !myParameters.InParallel, myFaces.Size()); commit(); clear(); diff --git a/src/OSD/FILES b/src/OSD/FILES index 77b940ba57..488da0ccf9 100755 --- a/src/OSD/FILES +++ b/src/OSD/FILES @@ -85,6 +85,8 @@ OSD_SingleProtection.hxx OSD_SysType.hxx OSD_Thread.cxx OSD_Thread.hxx +OSD_ThreadPool.cxx +OSD_ThreadPool.hxx OSD_ThreadFunction.hxx OSD_Timer.cxx OSD_Timer.hxx diff --git a/src/OSD/OSD.hxx b/src/OSD/OSD.hxx index 4418a11766..dd80e028b0 100644 --- a/src/OSD/OSD.hxx +++ b/src/OSD/OSD.hxx @@ -104,7 +104,10 @@ public: //! user's code. Refer to Foundation Classes User's Guide for further details. //! Standard_EXPORT static void SetSignal (const Standard_Boolean theFloatingSignal = Standard_True); - + + //! Return floating signal catching value previously set by SetSignal(). + Standard_EXPORT static Standard_Boolean ToCatchFloatingSignals(); + //! Commands the process to sleep for a number of seconds. 
Standard_EXPORT static void SecSleep (const Standard_Integer aDelay); diff --git a/src/OSD/OSD_Parallel.hxx b/src/OSD/OSD_Parallel.hxx index 65e3087b0c..d39e9d7b47 100644 --- a/src/OSD/OSD_Parallel.hxx +++ b/src/OSD/OSD_Parallel.hxx @@ -254,9 +254,11 @@ private: //! @param theEnd the last index (exclusive) //! @param theFunctor functor providing an interface "void operator(InputIterator theIter){}" //! performing task for the specified iterator position + //! @param theNbItems number of items passed by iterator, -1 if unknown Standard_EXPORT static void forEach (UniversalIterator& theBegin, UniversalIterator& theEnd, - const FunctorInterface& theFunctor); + const FunctorInterface& theFunctor, + Standard_Integer theNbItems); public: //! @name public methods @@ -274,13 +276,15 @@ public: //! @name public methods //! @param theFunctor functor providing an interface "void operator(InputIterator theIter){}" //! performing task for specified iterator position //! @param isForceSingleThreadExecution if true, then no threads will be created + //! @param theNbItems number of items passed by iterator, -1 if unknown template static void ForEach(InputIterator theBegin, InputIterator theEnd, const Functor& theFunctor, - const Standard_Boolean isForceSingleThreadExecution = Standard_False) + const Standard_Boolean isForceSingleThreadExecution = Standard_False, + Standard_Integer theNbItems = -1) { - if (isForceSingleThreadExecution) + if (isForceSingleThreadExecution || theNbItems == 1) { for (InputIterator it(theBegin); it != theEnd; ++it) theFunctor(*it); @@ -290,7 +294,7 @@ public: //! @name public methods UniversalIterator aBegin(new IteratorWrapper(theBegin)); UniversalIterator aEnd (new IteratorWrapper(theEnd)); FunctorWrapperIter aFunctor (theFunctor); - forEach(aBegin, aEnd, aFunctor); + forEach(aBegin, aEnd, aFunctor, theNbItems); } } @@ -311,7 +315,7 @@ public: //! 
@name public methods const Functor& theFunctor, const Standard_Boolean isForceSingleThreadExecution = Standard_False) { - if (isForceSingleThreadExecution) + if (isForceSingleThreadExecution || (theEnd - theBegin) == 1) { for (Standard_Integer it (theBegin); it != theEnd; ++it) theFunctor(it); @@ -321,7 +325,7 @@ public: //! @name public methods UniversalIterator aBegin(new IteratorWrapper(theBegin)); UniversalIterator aEnd (new IteratorWrapper(theEnd)); FunctorWrapperInt aFunctor (theFunctor); - forEach(aBegin, aEnd, aFunctor); + forEach(aBegin, aEnd, aFunctor, theEnd - theBegin); } } diff --git a/src/OSD/OSD_Parallel_TBB.cxx b/src/OSD/OSD_Parallel_TBB.cxx index 19992ba26a..508119f393 100644 --- a/src/OSD/OSD_Parallel_TBB.cxx +++ b/src/OSD/OSD_Parallel_TBB.cxx @@ -31,8 +31,10 @@ void OSD_Parallel::forEach (UniversalIterator& theBegin, UniversalIterator& theEnd, - const FunctorInterface& theFunctor) + const FunctorInterface& theFunctor, + Standard_Integer theNbItems) { + (void )theNbItems; try { tbb::parallel_for_each(theBegin, theEnd, theFunctor); diff --git a/src/OSD/OSD_Parallel_Threads.cxx b/src/OSD/OSD_Parallel_Threads.cxx index 7c5cb47059..0ffb38fa71 100644 --- a/src/OSD/OSD_Parallel_Threads.cxx +++ b/src/OSD/OSD_Parallel_Threads.cxx @@ -19,6 +19,8 @@ #include +#include + #include #include #include @@ -29,7 +31,7 @@ namespace //! using threads (when TBB is not available); //! it is derived from OSD_Parallel to get access to //! Iterator and FunctorInterface nested types. - class OSD_Parallel_Threads : public OSD_Parallel + class OSD_Parallel_Threads : public OSD_ThreadPool, public OSD_Parallel { public: //! Auxiliary class which ensures exclusive @@ -84,7 +86,7 @@ namespace }; //! Auxiliary wrapper class for thread function. - class Task + class Task : public JobInterface { public: //! @name public methods @@ -97,15 +99,12 @@ namespace //! Method is executed in the context of thread, //! so this method defines the main calculations. 
- static Standard_Address Run(Standard_Address theTask) + virtual void Perform (int ) Standard_OVERRIDE { - Task& aTask = *(static_cast(theTask)); - - const Range& aData(aTask.myRange); - for (OSD_Parallel::UniversalIterator i = aData.It(); i != aData.End(); i = aData.It()) - aTask.myPerformer(i); - - return NULL; + for (OSD_Parallel::UniversalIterator anIter = myRange.It(); anIter != myRange.End(); anIter = myRange.It()) + { + myPerformer (anIter); + } } private: //! @name private methods @@ -117,9 +116,27 @@ namespace Task& operator=(const Task& theCopy); private: //! @name private fields + const FunctorInterface& myPerformer; //!< Link on functor + const Range& myRange; //!< Link on processed data block + }; - const OSD_Parallel::FunctorInterface& myPerformer; //!< Link on functor. - const Range& myRange; //!< Link on processed data block. + //! Launcher specialization. + class UniversalLauncher : public Launcher + { + public: + //! Constructor. + UniversalLauncher (OSD_ThreadPool& thePool, int theMaxThreads = -1) + : Launcher (thePool, theMaxThreads) {} + + //! Primitive for parallelization of "for" loops. 
+ void Perform (OSD_Parallel::UniversalIterator& theBegin, + OSD_Parallel::UniversalIterator& theEnd, + const OSD_Parallel::FunctorInterface& theFunctor) + { + Range aData (theBegin, theEnd); + Task aJob (theFunctor, aData); + perform (aJob); + } }; }; } @@ -130,22 +147,13 @@ namespace //======================================================================= void OSD_Parallel::forEach (UniversalIterator& theBegin, UniversalIterator& theEnd, - const FunctorInterface& theFunctor) + const FunctorInterface& theFunctor, + Standard_Integer theNbItems) { - OSD_Parallel_Threads::Range aData(theBegin, theEnd); - OSD_Parallel_Threads::Task aTask(theFunctor, aData); - - const Standard_Integer aNbThreads = OSD_Parallel::NbLogicalProcessors(); - NCollection_Array1 aThreads(0, aNbThreads - 1); - for (Standard_Integer i = 0; i < aNbThreads; ++i) - { - OSD_Thread& aThread = aThreads(i); - aThread.SetFunction(&OSD_Parallel_Threads::Task::Run); - aThread.Run(&aTask); - } - - for (Standard_Integer i = 0; i < aNbThreads; ++i) - aThreads(i).Wait(); + const Handle(OSD_ThreadPool)& aThreadPool = OSD_ThreadPool::DefaultPool(); + const Standard_Integer aNbThreads = theNbItems != -1 ? Min (theNbItems, aThreadPool->NbDefaultThreadsToLaunch()) : -1; + OSD_Parallel_Threads::UniversalLauncher aLauncher (*aThreadPool, aNbThreads); + aLauncher.Perform (theBegin, theEnd, theFunctor); } -#endif /* ! HAVE_TBB */ \ No newline at end of file +#endif /* ! HAVE_TBB */ diff --git a/src/OSD/OSD_ThreadPool.cxx b/src/OSD/OSD_ThreadPool.cxx new file mode 100644 index 0000000000..70ad50b31a --- /dev/null +++ b/src/OSD/OSD_ThreadPool.cxx @@ -0,0 +1,401 @@ +// Created by: Kirill Gavrilov +// Copyright (c) 2017 OPEN CASCADE SAS +// +// This file is part of commercial software by OPEN CASCADE SAS. +// +// This software is furnished in accordance with the terms and conditions +// of the contract and with the inclusion of this copyright notice. 
+// This software or any other copy thereof may not be provided or otherwise +// be made available to any third party. +// No ownership title to the software is transferred hereby. +// +// OPEN CASCADE SAS makes no representation or warranties with respect to the +// performance of this software, and specifically disclaims any responsibility +// for any damages, special or consequential, connected with its use. + +#include + +#include +#include +#include + +IMPLEMENT_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient) + +// ======================================================================= +// function : Lock +// purpose : +// ======================================================================= +bool OSD_ThreadPool::EnumeratedThread::Lock() +{ + return Standard_Atomic_CompareAndSwap (&myUsageCounter, 0, 1); +} + +// ======================================================================= +// function : Free +// purpose : +// ======================================================================= +void OSD_ThreadPool::EnumeratedThread::Free() +{ + Standard_Atomic_CompareAndSwap (&myUsageCounter, 1, 0); +} + +// ======================================================================= +// function : WakeUp +// purpose : +// ======================================================================= +void OSD_ThreadPool::EnumeratedThread::WakeUp (JobInterface* theJob, bool theToCatchFpe) +{ + myJob = theJob; + myToCatchFpe = theToCatchFpe; + if (myIsSelfThread) + { + if (theJob != NULL) + { + OSD_ThreadPool::performJob (myFailure, myJob, myThreadIndex); + } + return; + } + + myWakeEvent.Set(); + if (theJob != NULL && !myIsStarted) + { + myIsStarted = true; + Run (this); + } +} + +// ======================================================================= +// function : WaitIdle +// purpose : +// ======================================================================= +void OSD_ThreadPool::EnumeratedThread::WaitIdle() +{ + if (!myIsSelfThread) + { + myIdleEvent.Wait(); 
+ myIdleEvent.Reset(); + } +} + +// ======================================================================= +// function : DefaultPool +// purpose : +// ======================================================================= +const Handle(OSD_ThreadPool)& OSD_ThreadPool::DefaultPool (int theNbThreads) +{ + static const Handle(OSD_ThreadPool) THE_GLOBAL_POOL = new OSD_ThreadPool (theNbThreads); + return THE_GLOBAL_POOL; +} + +// ======================================================================= +// function : OSD_ThreadPool +// purpose : +// ======================================================================= +OSD_ThreadPool::OSD_ThreadPool (int theNbThreads) +: myNbDefThreads (0), + myShutDown (false) +{ + Init (theNbThreads); + myNbDefThreads = NbThreads(); +} + +// ======================================================================= +// function : IsInUse +// purpose : +// ======================================================================= +bool OSD_ThreadPool::IsInUse() +{ + for (NCollection_Array1::Iterator aThreadIter (myThreads); + aThreadIter.More(); aThreadIter.Next()) + { + EnumeratedThread& aThread = aThreadIter.ChangeValue(); + if (!aThread.Lock()) + { + return true; + } + aThread.Free(); + } + return false; +} + +// ======================================================================= +// function : Init +// purpose : +// ======================================================================= +void OSD_ThreadPool::Init (int theNbThreads) +{ + const int aNbThreads = Max (0, (theNbThreads > 0 ? 
theNbThreads : OSD_Parallel::NbLogicalProcessors()) - 1); + if (myThreads.Size() == aNbThreads) + { + return; + } + + // release old threads + if (!myThreads.IsEmpty()) + { + NCollection_Array1 aLockThreads (myThreads.Lower(), myThreads.Upper()); + aLockThreads.Init (NULL); + int aThreadIndex = myThreads.Lower(); + for (NCollection_Array1::Iterator aThreadIter (myThreads); + aThreadIter.More(); aThreadIter.Next()) + { + EnumeratedThread& aThread = aThreadIter.ChangeValue(); + if (!aThread.Lock()) + { + for (NCollection_Array1::Iterator aLockThreadIter (aLockThreads); + aLockThreadIter.More() && aLockThreadIter.Value() != NULL; aLockThreadIter.Next()) + { + aLockThreadIter.ChangeValue()->Free(); + } + throw Standard_ProgramError ("Error: active ThreadPool is reinitialized"); + } + aLockThreads.SetValue (aThreadIndex++, &aThread); + } + } + release(); + + myShutDown = false; + if (aNbThreads > 0) + { + myThreads.Resize (0, aNbThreads - 1, false); + int aLastThreadIndex = 0; + for (NCollection_Array1::Iterator aThreadIter (myThreads); + aThreadIter.More(); aThreadIter.Next()) + { + EnumeratedThread& aThread = aThreadIter.ChangeValue(); + aThread.myPool = this; + aThread.myThreadIndex = aLastThreadIndex++; + aThread.SetFunction (&OSD_ThreadPool::EnumeratedThread::runThread); + } + } + else + { + NCollection_Array1 anEmpty; + myThreads.Move (anEmpty); + } +} + +// ======================================================================= +// function : ~OSD_ThreadPool +// purpose : +// ======================================================================= +OSD_ThreadPool::~OSD_ThreadPool() +{ + release(); +} + +// ======================================================================= +// function : release +// purpose : +// ======================================================================= +void OSD_ThreadPool::release() +{ + if (myThreads.IsEmpty()) + { + return; + } + + myShutDown = true; + for (NCollection_Array1::Iterator aThreadIter (myThreads); + 
aThreadIter.More(); aThreadIter.Next()) + { + aThreadIter.ChangeValue().WakeUp (NULL, false); + aThreadIter.ChangeValue().Wait(); + } +} + +// ======================================================================= +// function : perform +// purpose : +// ======================================================================= +void OSD_ThreadPool::Launcher::perform (JobInterface& theJob) +{ + run (theJob); + wait(); +} + +// ======================================================================= +// function : run +// purpose : +// ======================================================================= +void OSD_ThreadPool::Launcher::run (JobInterface& theJob) +{ + bool toCatchFpe = OSD::ToCatchFloatingSignals(); + for (NCollection_Array1::Iterator aThreadIter (myThreads); + aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next()) + { + aThreadIter.ChangeValue()->WakeUp (&theJob, toCatchFpe); + } +} + +// ======================================================================= +// function : wait +// purpose : +// ======================================================================= +void OSD_ThreadPool::Launcher::wait() +{ + int aNbFailures = 0; + for (NCollection_Array1::Iterator aThreadIter (myThreads); + aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next()) + { + aThreadIter.ChangeValue()->WaitIdle(); + if (!aThreadIter.Value()->myFailure.IsNull()) + { + ++aNbFailures; + } + } + if (aNbFailures == 0) + { + return; + } + + TCollection_AsciiString aFailures; + for (NCollection_Array1::Iterator aThreadIter (myThreads); + aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next()) + { + if (!aThreadIter.Value()->myFailure.IsNull()) + { + if (aNbFailures == 1) + { + aThreadIter.Value()->myFailure->Reraise(); + } + + if (!aFailures.IsEmpty()) + { + aFailures += "\n"; + } + aFailures += aThreadIter.Value()->myFailure->GetMessageString(); + } + } + + aFailures = TCollection_AsciiString("Multiple exceptions:\n") + aFailures; + 
throw Standard_ProgramError (aFailures.ToCString()); +} + +// ======================================================================= +// function : performJob +// purpose : +// ======================================================================= +void OSD_ThreadPool::performJob (Handle(Standard_Failure)& theFailure, + OSD_ThreadPool::JobInterface* theJob, + int theThreadIndex) +{ + try + { + OCC_CATCH_SIGNALS + theJob->Perform (theThreadIndex); + } + catch (Standard_Failure const& aFailure) + { + TCollection_AsciiString aMsg = TCollection_AsciiString (aFailure.DynamicType()->Name()) + + ": " + aFailure.GetMessageString(); + theFailure = new Standard_ProgramError (aMsg.ToCString()); + } + catch (std::exception& anStdException) + { + TCollection_AsciiString aMsg = TCollection_AsciiString (typeid(anStdException).name()) + + ": " + anStdException.what(); + theFailure = new Standard_ProgramError (aMsg.ToCString()); + } + catch (...) + { + theFailure = new Standard_ProgramError ("Error: Unknown exception"); + } +} + +// ======================================================================= +// function : performThread +// purpose : +// ======================================================================= +void OSD_ThreadPool::EnumeratedThread::performThread() +{ + OSD::SetSignal (false); + for (;;) + { + myWakeEvent.Wait(); + myWakeEvent.Reset(); + if (myPool->myShutDown) + { + return; + } + + myFailure.Nullify(); + if (myJob != NULL) + { + OSD::SetSignal (myToCatchFpe); + OSD_ThreadPool::performJob (myFailure, myJob, myThreadIndex); + myJob = NULL; + } + myIdleEvent.Set(); + } +} + +// ======================================================================= +// function : runThread +// purpose : +// ======================================================================= +Standard_Address OSD_ThreadPool::EnumeratedThread::runThread (Standard_Address theTask) +{ + EnumeratedThread* aThread = static_cast(theTask); + aThread->performThread(); + return NULL; +} + +// 
======================================================================= +// function : Launcher +// purpose : +// ======================================================================= +OSD_ThreadPool::Launcher::Launcher (OSD_ThreadPool& thePool, Standard_Integer theMaxThreads) +: mySelfThread (true), + myNbThreads (0) +{ + const int aNbThreads = theMaxThreads > 0 + ? Min (theMaxThreads, thePool.NbThreads()) + : (theMaxThreads < 0 + ? Max (thePool.NbDefaultThreadsToLaunch(), 1) + : 1); + myThreads.Resize (0, aNbThreads - 1, false); + myThreads.Init (NULL); + if (aNbThreads > 1) + { + for (NCollection_Array1::Iterator aThreadIter (thePool.myThreads); + aThreadIter.More(); aThreadIter.Next()) + { + if (aThreadIter.ChangeValue().Lock()) + { + myThreads.SetValue (myNbThreads, &aThreadIter.ChangeValue()); + // make thread index to fit into myThreads range + aThreadIter.ChangeValue().myThreadIndex = myNbThreads; + if (++myNbThreads == aNbThreads - 1) + { + break; + } + } + } + } + + // self thread should be executed last + myThreads.SetValue (myNbThreads, &mySelfThread); + mySelfThread.myThreadIndex = myNbThreads; + ++myNbThreads; +} + +// ======================================================================= +// function : Release +// purpose : +// ======================================================================= +void OSD_ThreadPool::Launcher::Release() +{ + for (NCollection_Array1::Iterator aThreadIter (myThreads); + aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next()) + { + if (aThreadIter.Value() != &mySelfThread) + { + aThreadIter.Value()->Free(); + } + } + + NCollection_Array1 anEmpty; + myThreads.Move (anEmpty); + myNbThreads = 0; +} diff --git a/src/OSD/OSD_ThreadPool.hxx b/src/OSD/OSD_ThreadPool.hxx new file mode 100644 index 0000000000..8b6dab6ea3 --- /dev/null +++ b/src/OSD/OSD_ThreadPool.hxx @@ -0,0 +1,301 @@ +// Created by: Kirill Gavrilov +// Copyright (c) 2017 OPEN CASCADE SAS +// +// This file is part of commercial software by 
OPEN CASCADE SAS. +// +// This software is furnished in accordance with the terms and conditions +// of the contract and with the inclusion of this copyright notice. +// This software or any other copy thereof may not be provided or otherwise +// be made available to any third party. +// No ownership title to the software is transferred hereby. +// +// OPEN CASCADE SAS makes no representation or warranties with respect to the +// performance of this software, and specifically disclaims any responsibility +// for any damages, special or consequential, connected with its use. + +#ifndef _OSD_ThreadPool_HeaderFile +#define _OSD_ThreadPool_HeaderFile + +#include +#include +#include +#include +#include +#include + +//! Class defining a thread pool for executing algorithms in multi-threaded mode. +//! Thread pool allocates the requested amount of threads and keeps them alive +//! (in sleep mode when unused) during thread pool lifetime. +//! The same pool can be used by multiple consumers, +//! including nested multi-threading algorithms and concurrent threads: +//! - Thread pool can be used by a multi-threaded algorithm by creating OSD_ThreadPool::Launcher. +//! The functor performing a job takes two parameters - Thread Index and Data Index: +//! void operator(int theThreadIndex, int theDataIndex){} +//! Multi-threaded algorithm may rely on Thread Index for allocating thread-local variables in array form, +//! since the Thread Index is guaranteed to be within range OSD_ThreadPool::LowerThreadIndex() and OSD_ThreadPool::UpperThreadIndex(). +//! - Default thread pool (OSD_ThreadPool::DefaultPool()) can be used in general case, +//! but application may prefer creating a dedicated pool for better control. +//! - Default thread pool allocates the amount of threads considering concurrency +//! level of the system (amount of logical processors). +//! This can be overridden during OSD_ThreadPool construction or by calling OSD_ThreadPool::Init() +//! (the pool should not be used!). +//! 
- OSD_ThreadPool::Launcher reserves a specific amount of threads from the pool for executing multi-threaded Job. +//! Normally, a single Launcher instance will occupy all threads available in thread pool, +//! so that nested multi-threaded algorithms (within the same thread) +//! and concurrent threads trying to use the same thread pool will run sequentially. +//! This behavior is affected by OSD_ThreadPool::NbDefaultThreadsToLaunch() parameter +//! and Launcher constructor, so that a single Launcher instance will not occupy all threads +//! in the pool, allowing other threads to be used concurrently. +//! - OSD_ThreadPool::Launcher locks threads one-by-one from thread pool in a thread-safe way. +//! - Each working thread catches exceptions that occurred during job execution, and Launcher will +//! throw Standard_Failure in a caller thread on completed execution. +class OSD_ThreadPool : public Standard_Transient +{ + DEFINE_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient) +public: + + //! Return (or create) a default thread pool. + //! Number of threads argument will be considered only when called first time. + Standard_EXPORT static const Handle(OSD_ThreadPool)& DefaultPool (int theNbThreads = -1); + +public: + + //! Main constructor. + //! Application may consider specifying more threads than actually + //! available (OSD_Parallel::NbLogicalProcessors()) and set up NbDefaultThreadsToLaunch() to a smaller value + //! so that concurrent threads will be able to use a single Thread Pool instance more efficiently. + //! @param theNbThreads threads number to be created by pool + //! (if -1 is specified then OSD_Parallel::NbLogicalProcessors() will be used) + Standard_EXPORT OSD_ThreadPool (int theNbThreads = -1); + + //! Destructor. + Standard_EXPORT virtual ~OSD_ThreadPool(); + + //! Return TRUE if at least 2 threads are available (including self-thread). + bool HasThreads() const { return NbThreads() >= 2; } + + //! Return the lower thread index. 
+ int LowerThreadIndex() const { return 0; } + + //! Return the upper thread index (last index is reserved for self-thread). + int UpperThreadIndex() const { return LowerThreadIndex() + myThreads.Size(); } + + //! Return the number of threads; >= 1. + int NbThreads() const { return myThreads.Size() + 1; } + + //! Return maximum number of threads to be locked by a single Launcher object by default; + //! the entire thread pool size is returned by default. + int NbDefaultThreadsToLaunch() const { return myNbDefThreads; } + + //! Set maximum number of threads to be locked by a single Launcher object by default. + //! Should be set BEFORE first usage. + void SetNbDefaultThreadsToLaunch (int theNbThreads) { myNbDefThreads = theNbThreads; } + + //! Checks if the thread pool has active consumers. + Standard_EXPORT bool IsInUse(); + + //! Reinitialize the thread pool with a different number of threads. + //! Should be called only with no active jobs, or exception Standard_ProgramError will be thrown! + Standard_EXPORT void Init (int theNbThreads); + +protected: + + //! Thread function interface. + class JobInterface + { + public: + virtual void Perform (int theThreadIndex) = 0; + }; + + //! Thread with back reference to thread pool and thread index in it. + class EnumeratedThread : public OSD_Thread + { + friend class OSD_ThreadPool; + public: + EnumeratedThread (bool theIsSelfThread = false) + : myPool (NULL), myJob (NULL), myWakeEvent (false), + myIdleEvent (false), myThreadIndex (0), myUsageCounter(0), + myIsStarted (false), myToCatchFpe (false), + myIsSelfThread (theIsSelfThread) {} + + //! Occupy this thread for thread pool launcher. + //! @return TRUE on success, or FALSE if thread has been already occupied + Standard_EXPORT bool Lock(); + + //! Release this thread for thread pool launcher; should be called only after successful Lock(). + Standard_EXPORT void Free(); + + //! Wake up the thread. 
+ Standard_EXPORT void WakeUp (JobInterface* theJob, bool theToCatchFpe); + + //! Wait for the thread to go into Idle state (job finished). + Standard_EXPORT void WaitIdle(); + + private: + + //! Method is executed in the context of thread. + void performThread(); + + //! Method is executed in the context of thread. + static Standard_Address runThread (Standard_Address theTask); + + private: + OSD_ThreadPool* myPool; + JobInterface* myJob; + Handle(Standard_Failure) myFailure; + Standard_Condition myWakeEvent; + Standard_Condition myIdleEvent; + int myThreadIndex; + volatile int myUsageCounter; + bool myIsStarted; + bool myToCatchFpe; + bool myIsSelfThread; + }; + +public: + + //! Launcher object locking a subset of threads (or all threads) + //! in a thread pool to perform parallel execution of the job. + class Launcher + { + public: + //! Lock specified number of threads from the thread pool. + //! If thread pool is already locked by another user, + //! Launcher will lock as many threads as possible + //! (if none will be locked, then single threaded execution will be done). + //! @param thePool thread pool to lock the threads + //! @param theMaxThreads number of threads to lock; + //! -1 specifies that the default number of threads + //! OSD_ThreadPool::NbDefaultThreadsToLaunch() is to be used + Standard_EXPORT Launcher (OSD_ThreadPool& thePool, int theMaxThreads = -1); + + //! Release threads. + ~Launcher() { Release(); } + + //! Return TRUE if at least 2 threads have been locked for parallel execution (including self-thread); + //! otherwise, the functor will be executed within the caller thread. + bool HasThreads() const { return myNbThreads >= 2; } + + //! Return amount of locked threads; >= 1. + int NbThreads() const { return myNbThreads; } + + //! Return the lower thread index. + int LowerThreadIndex() const { return 0; } + + //! Return the upper thread index (last index is reserved for the self-thread). 
+ int UpperThreadIndex() const { return LowerThreadIndex() + myNbThreads - 1; } + + //! Simple primitive for parallelization of "for" loops, e.g.: + //! @code + //! for (int anIter = theBegin; anIter < theEnd; ++anIter) {} + //! @endcode + //! @param theBegin the first data index (inclusive) + //! @param theEnd the last data index (exclusive) + //! @param theFunctor functor providing an interface + //! "void operator(int theThreadIndex, int theDataIndex){}" performing task for specified index + template + void Perform (int theBegin, int theEnd, const Functor& theFunctor) + { + JobRange aData (theBegin, theEnd); + Job aJob (theFunctor, aData); + perform (aJob); + } + + //! Release threads before Launcher destruction. + Standard_EXPORT void Release(); + + protected: + + //! Execute job. + Standard_EXPORT void perform (JobInterface& theJob); + + //! Initialize job and start threads. + Standard_EXPORT void run (JobInterface& theJob); + + //! Wait threads execution. + Standard_EXPORT void wait(); + + private: + Launcher (const Launcher& theCopy); + Launcher& operator=(const Launcher& theCopy); + + private: + NCollection_Array1 myThreads; //!< array of locked threads (including self-thread) + EnumeratedThread mySelfThread; + int myNbThreads; //!< amount of locked threads + }; + +protected: + + //! Auxiliary class which ensures exclusive access to iterators of processed data pool. + class JobRange + { + public: + + //! Constructor + JobRange (const int& theBegin, const int& theEnd) : myBegin(theBegin), myEnd (theEnd), myIt (theBegin) {} + + //! Returns const link on the first element. + const int& Begin() const { return myBegin; } + + //! Returns const link on the last element. + const int& End() const { return myEnd; } + + //! Returns first non processed element or end. + //! Thread-safe method. 
+ int It() const { return Standard_Atomic_Increment (reinterpret_cast(&myIt)) - 1; } + + private: + JobRange (const JobRange& theCopy); + JobRange& operator=(const JobRange& theCopy); + + private: + const int& myBegin; //!< First element of range + const int& myEnd; //!< Last element of range + mutable int myIt; //!< First non processed element of range + }; + + //! Auxiliary wrapper class for thread function. + template class Job : public JobInterface + { + public: + + //! Constructor. + Job (const FunctorT& thePerformer, JobRange& theRange) + : myPerformer (thePerformer), myRange (theRange) {} + + //! Method is executed in the context of thread. + virtual void Perform (int theThreadIndex) Standard_OVERRIDE + { + for (Standard_Integer anIter = myRange.It(); anIter < myRange.End(); anIter = myRange.It()) + { + myPerformer (theThreadIndex, anIter); + } + } + + private: + Job (const Job& theCopy); + Job& operator=(const Job& theCopy); + + private: //! @name private fields + const FunctorT& myPerformer; //!< Link on functor + const JobRange& myRange; //!< Link on processed data block + }; + + //! Release threads. + void release(); + + //! Perform the job and catch exceptions. 
+ static void performJob (Handle(Standard_Failure)& theFailure, + OSD_ThreadPool::JobInterface* theJob, + int theThreadIndex); + +private: + + NCollection_Array1 myThreads; //!< array of defined threads (excluding self-thread) + int myNbDefThreads; //!< maximum number of threads to be locked by a single Launcher by default + bool myShutDown; //!< flag to shut down (destroy) the thread pool + +}; + +#endif // _OSD_ThreadPool_HeaderFile diff --git a/src/OSD/OSD_signal.cxx b/src/OSD/OSD_signal.cxx index ea595b3e0a..06b6394039 100644 --- a/src/OSD/OSD_signal.cxx +++ b/src/OSD/OSD_signal.cxx @@ -17,6 +17,17 @@ #include #include +static Standard_THREADLOCAL Standard_Boolean fFltExceptions = Standard_False; + +//======================================================================= +//function : ToCatchFloatingSignals +//purpose : +//======================================================================= +Standard_Boolean OSD::ToCatchFloatingSignals() +{ + return fFltExceptions; +} + #ifdef _WIN32 //---------------------------- Windows NT System -------------------------------- @@ -67,7 +78,6 @@ static Standard_Boolean fCtrlBrk; static Standard_Boolean fMsgBox; -static Standard_Boolean fFltExceptions; // used to forbid simultaneous execution of setting / executing handlers static Standard_Mutex THE_SIGNAL_MUTEX; @@ -616,7 +626,6 @@ LONG _osd_debug ( void ) { #ifdef __linux__ #include //#include -static Standard_Boolean fFltExceptions = Standard_False; #endif // variable signalling that Control-C has been pressed (SIGINT signal) diff --git a/src/QABugs/QABugs_19.cxx b/src/QABugs/QABugs_19.cxx index 18c9387016..b2555aa941 100644 --- a/src/QABugs/QABugs_19.cxx +++ b/src/QABugs/QABugs_19.cxx @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -55,9 +56,16 @@ #include #include +#ifdef HAVE_TBB + #include + #include + #include +#endif + #include #include #include +#include #define QCOMPARE(val1, val2) \ di << "Checking " #val1 " == " #val2 << \ 
@@ -2512,19 +2520,25 @@ static Standard_Integer OCC25340 (Draw_Interpretor& /*theDI*/, class ParallelTest_Saxpy { public: - typedef NCollection_Array1 Vector; - //! Constructor - ParallelTest_Saxpy(const Vector& theX, Vector& theY, Standard_Real theScalar) - : myX(theX), - myY(theY), - myScalar(theScalar) + ParallelTest_Saxpy (const NCollection_Array1& theX, + NCollection_Array1& theY, + Standard_Real theScalar) + : myX (theX), myY (theY), myScalar (theScalar) {} + + int Begin() const { return 0; } + int End() const { return myX.Size(); } + + //! Dummy calculation + void operator() (Standard_Integer theIndex) const { + myY(theIndex) = myScalar * myX(theIndex) + myY(theIndex); } //! Dummy calculation - void operator() (const Standard_Integer theIndex) const + void operator() (Standard_Integer theThreadIndex, Standard_Integer theIndex) const { + (void )theThreadIndex; myY(theIndex) = myScalar * myX(theIndex) + myY(theIndex); } @@ -2532,18 +2546,51 @@ private: ParallelTest_Saxpy( const ParallelTest_Saxpy& ); ParallelTest_Saxpy& operator =( ParallelTest_Saxpy& ); -private: - const Vector& myX; - Vector& myY; +protected: + const NCollection_Array1& myX; + NCollection_Array1& myY; const Standard_Real myScalar; }; +class ParallelTest_SaxpyBatch : private ParallelTest_Saxpy +{ +public: + static const Standard_Integer THE_BATCH_SIZE = 10000000; + + ParallelTest_SaxpyBatch (const NCollection_Array1& theX, + NCollection_Array1& theY, + Standard_Real theScalar) + : ParallelTest_Saxpy (theX, theY, theScalar), + myNbBatches ((int )Ceiling ((double )theX.Size() / THE_BATCH_SIZE)) {} + + int Begin() const { return 0; } + int End() const { return myNbBatches; } + + void operator() (int theBatchIndex) const + { + const int aLower = theBatchIndex * THE_BATCH_SIZE; + const int anUpper = Min (aLower + THE_BATCH_SIZE - 1, myX.Upper()); + for (int i = aLower; i <= anUpper; ++i) + { + myY(i) = myScalar * myX(i) + myY(i); + } + } + + void operator() (int theThreadIndex, int theBatchIndex) 
const + { + (void )theThreadIndex; + (*this)(theBatchIndex); + } +private: + int myNbBatches; +}; + //--------------------------------------------------------------------- static Standard_Integer OCC24826(Draw_Interpretor& theDI, - Standard_Integer trheArgc, + Standard_Integer theArgc, const char** theArgv) { - if ( trheArgc != 2 ) + if ( theArgc != 2 ) { theDI << "Usage: " << theArgv[0] @@ -2556,38 +2603,240 @@ static Standard_Integer OCC24826(Draw_Interpretor& theDI, NCollection_Array1 aX (0, aLength - 1); NCollection_Array1 anY(0, aLength - 1); - for ( Standard_Integer i = 0; i < aLength; ++i ) { aX(i) = anY(i) = (Standard_Real) i; } - OSD_Timer aTimer; - - aTimer.Start(); - - //! Serial proccesing - for ( Standard_Integer i = 0; i < aLength; ++i ) + //! Serial processing + NCollection_Array1 anY1 = anY; + Standard_Real aTimeSeq = 0.0; { - anY(i) = 1e-6 * aX(i) + anY(i); + OSD_Timer aTimer; + aTimer.Start(); + const ParallelTest_Saxpy aFunctor (aX, anY1, 1e-6); + for (Standard_Integer i = 0; i < aLength; ++i) + { + aFunctor(i); + } + + aTimer.Stop(); + std::cout << " Processing time (sequential mode): 1x [reference]\n"; + aTimeSeq = aTimer.ElapsedTime(); + aTimer.Show (std::cout); } - aTimer.Stop(); - cout << "Processing time (sequential mode):\n"; - aTimer.Show(); + // Parallel processing + for (Standard_Integer aMode = 0; aMode <= 4; ++aMode) + { + NCollection_Array1 anY2 = anY; + OSD_Timer aTimer; + aTimer.Start(); + const char* aModeDesc = NULL; + const ParallelTest_Saxpy aFunctor1 (aX, anY2, 1e-6); + const ParallelTest_SaxpyBatch aFunctor2 (aX, anY2, 1e-6); + switch (aMode) + { + case 0: + { + aModeDesc = "OSD_Parallel::For()"; + OSD_Parallel::For (aFunctor1.Begin(), aFunctor1.End(), aFunctor1); + break; + } + case 1: + { + aModeDesc = "OSD_ThreadPool::Launcher"; + OSD_ThreadPool::Launcher aLauncher (*OSD_ThreadPool::DefaultPool()); + aLauncher.Perform (aFunctor1.Begin(), aFunctor1.End(), aFunctor1); + break; + } + case 2: + { + aModeDesc = 
"OSD_Parallel::Batched()"; + OSD_Parallel::For (aFunctor2.Begin(), aFunctor2.End(), aFunctor2); + break; + } + case 3: + { + aModeDesc = "OSD_ThreadPool::Launcher, Batched"; + OSD_ThreadPool::Launcher aLauncher (*OSD_ThreadPool::DefaultPool()); + aLauncher.Perform (aFunctor2.Begin(), aFunctor2.End(), aFunctor2); + break; + } + case 4: + { + #ifdef HAVE_TBB + aModeDesc = "tbb::parallel_for"; + tbb::parallel_for (aFunctor1.Begin(), aFunctor1.End(), aFunctor1); + break; + #else + continue; + #endif + } + } + aTimer.Stop(); + std::cout << " " << aModeDesc << ": " + << aTimeSeq / aTimer.ElapsedTime() << "x " << (aTimer.ElapsedTime() < aTimeSeq ? "[boost]" : "[slow-down]") << "\n"; + aTimer.Show (std::cout); - const ParallelTest_Saxpy aFunctor(aX, anY, 1e-6); + for (Standard_Integer i = 0; i < aLength; ++i) + { + if (anY2(i) != anY1(i)) + { + std::cerr << "Error: Parallel algorithm produced invalid result!\n"; + break; + } + } + } + return 0; +} - aTimer.Reset(); - aTimer.Start(); +//! Initializes the given square matrix with values that are generated by the given generator function. +template void initRandMatrix (NCollection_Array2& theMat, GeneratorT& theGen) +{ + for (int i = theMat.LowerRow(); i <= theMat.UpperRow(); ++i) + { + for (int j = theMat.LowerCol(); j <= theMat.UpperCol(); ++j) + { + theMat(i, j) = static_cast(theGen()); + } + } +} + +//! Compute the product of two square matrices in parallel. 
+class ParallelTest_MatMult +{ +public: + ParallelTest_MatMult (const NCollection_Array2& theMat1, + const NCollection_Array2& theMat2, + NCollection_Array2& theResult, int theSize) + : myMat1 (theMat1), myMat2 (theMat2), myResult (theResult), mySize (theSize) {} + + int Begin() const { return 0; } + int End() const { return mySize; } + + void operator() (int theIndex) const + { + for (int j = 0; j < mySize; ++j) + { + double aTmp = 0; + for (int k = 0; k < mySize; ++k) + { + aTmp += myMat1(theIndex, k) * myMat2(k, j); + } + myResult(theIndex, j) = aTmp; + } + } + + void operator() (int theThreadIndex, int theIndex) const + { + (void )theThreadIndex; + (*this)(theIndex); + } + +private: + ParallelTest_MatMult (const ParallelTest_MatMult& ); + ParallelTest_MatMult& operator= (ParallelTest_MatMult& ); + +protected: + const NCollection_Array2& myMat1; + const NCollection_Array2& myMat2; + NCollection_Array2& myResult; + int mySize; +}; + +//--------------------------------------------------------------------- +static Standard_Integer OCC29935(Draw_Interpretor& , + Standard_Integer theArgc, + const char** theArgv) +{ + if (theArgc != 2) + { + std::cout << "Syntax error: wrong number of arguments\n"; + return 1; + } + + // Generate data; + Standard_Integer aSize = Draw::Atoi (theArgv[1]); + + opencascade::std::mt19937 aGen (42); + NCollection_Array2 aMat1 (0, aSize - 1, 0, aSize - 1); + NCollection_Array2 aMat2 (0, aSize - 1, 0, aSize - 1); + NCollection_Array2 aMatResRef(0, aSize - 1, 0, aSize - 1); + NCollection_Array2 aMatRes (0, aSize - 1, 0, aSize - 1); + initRandMatrix (aMat1, aGen); + initRandMatrix (aMat2, aGen); + + //! 
Serial processing + Standard_Real aTimeSeq = 0.0; + { + OSD_Timer aTimer; + aTimer.Start(); + ParallelTest_MatMult aFunctor (aMat1, aMat2, aMatResRef, aSize); + for (int i = aFunctor.Begin(); i < aFunctor.End(); ++i) + { + aFunctor(i); + } + + aTimer.Stop(); + std::cout << " Processing time (sequential mode): 1x [reference]\n"; + aTimeSeq = aTimer.ElapsedTime(); + aTimer.Show (std::cout); + } // Parallel processing - OSD_Parallel::For(0, aLength, aFunctor); + for (Standard_Integer aMode = 0; aMode <= 2; ++aMode) + { + aMatRes.Init (0.0); - aTimer.Stop(); - cout << "Processing time (parallel mode):\n"; - aTimer.Show(); + OSD_Timer aTimer; + aTimer.Start(); + const char* aModeDesc = NULL; + ParallelTest_MatMult aFunctor1 (aMat1, aMat2, aMatRes, aSize); + switch (aMode) + { + case 0: + { + aModeDesc = "OSD_Parallel::For()"; + OSD_Parallel::For (aFunctor1.Begin(), aFunctor1.End(), aFunctor1); + break; + } + case 1: + { + aModeDesc = "OSD_ThreadPool::Launcher"; + OSD_ThreadPool::Launcher aLauncher (*OSD_ThreadPool::DefaultPool()); + aLauncher.Perform (aFunctor1.Begin(), aFunctor1.End(), aFunctor1); + break; + } + case 2: + { + #ifdef HAVE_TBB + aModeDesc = "tbb::parallel_for"; + tbb::parallel_for (aFunctor1.Begin(), aFunctor1.End(), aFunctor1); + break; + #else + continue; + #endif + } + } + aTimer.Stop(); + std::cout << " " << aModeDesc << ": " + << aTimeSeq / aTimer.ElapsedTime() << "x " << (aTimer.ElapsedTime() < aTimeSeq ? 
"[boost]" : "[slow-down]") << "\n"; + aTimer.Show (std::cout); + for (int i = 0; i < aSize; ++i) + { + for (int j = 0; j < aSize; ++j) + { + if (aMatRes(i, j) != aMatResRef(i, j)) + { + std::cerr << "Error: Parallel algorithm produced invalid result!\n"; + i = aSize; + break; + } + } + } + } return 0; } @@ -5160,7 +5409,8 @@ void QABugs::Commands_19(Draw_Interpretor& theCommands) { "\nOCAF persistence without setting environment variables", __FILE__, OCC24925, group); theCommands.Add ("OCC25043", "OCC25043 shape", __FILE__, OCC25043, group); - theCommands.Add ("OCC24826,", "This test performs simple saxpy test.\n Usage: OCC24826 length", __FILE__, OCC24826, group); + theCommands.Add ("OCC24826,", "This test performs simple saxpy test using multiple threads.\n Usage: OCC24826 length", __FILE__, OCC24826, group); + theCommands.Add ("OCC29935,", "This test performs product of two square matrices using multiple threads.\n Usage: OCC29935 size", __FILE__, OCC29935, group); theCommands.Add ("OCC24606", "OCC24606 : Tests ::FitAll for V3d view ('vfit' is for NIS view)", __FILE__, OCC24606, group); theCommands.Add ("OCC25202", "OCC25202 res shape numF1 face1 numF2 face2", __FILE__, OCC25202, group); theCommands.Add ("OCC7570", "OCC7570 shape", __FILE__, OCC7570, group); diff --git a/src/Standard/FILES b/src/Standard/FILES index 4710604718..1d752bb89f 100755 --- a/src/Standard/FILES +++ b/src/Standard/FILES @@ -11,6 +11,8 @@ Standard_Byte.hxx Standard_Character.hxx Standard_CLocaleSentry.cxx Standard_CLocaleSentry.hxx +Standard_Condition.cxx +Standard_Condition.hxx Standard_ConstructionError.hxx Standard_Copy.tcl Standard_CString.cxx diff --git a/src/Standard/Standard_Atomic.hxx b/src/Standard/Standard_Atomic.hxx index a35f0b7a23..678e211dc2 100644 --- a/src/Standard/Standard_Atomic.hxx +++ b/src/Standard/Standard_Atomic.hxx @@ -35,6 +35,14 @@ inline int Standard_Atomic_Increment (volatile int* theValue); //! and returns resulting decremented value. 
inline int Standard_Atomic_Decrement (volatile int* theValue); +//! Perform an atomic compare and swap. +//! That is, if the current value of *theValue is theOldValue, then write theNewValue into *theValue. +//! @param theValue pointer to variable to modify +//! @param theOldValue expected value to perform modification +//! @param theNewValue new value to set in case if *theValue was equal to theOldValue +//! @return TRUE if theNewValue has been set to *theValue +inline bool Standard_Atomic_CompareAndSwap (volatile int* theValue, int theOldValue, int theNewValue); + // Platform-dependent implementation #if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) // gcc explicitly defines the macros __GCC_HAVE_SYNC_COMPARE_AND_SWAP_* @@ -55,16 +63,23 @@ int Standard_Atomic_Decrement (volatile int* theValue) return __sync_sub_and_fetch (theValue, 1); } +bool Standard_Atomic_CompareAndSwap (volatile int* theValue, int theOldValue, int theNewValue) +{ + return __sync_val_compare_and_swap (theValue, theOldValue, theNewValue) == theOldValue; +} + #elif defined(_WIN32) extern "C" { long _InterlockedIncrement (volatile long* lpAddend); long _InterlockedDecrement (volatile long* lpAddend); + long _InterlockedCompareExchange (long volatile* Destination, long Exchange, long Comparand); } #if defined(_MSC_VER) && ! 
defined(__INTEL_COMPILER) // force intrinsic instead of WinAPI calls #pragma intrinsic (_InterlockedIncrement) #pragma intrinsic (_InterlockedDecrement) + #pragma intrinsic (_InterlockedCompareExchange) #endif // WinAPI function or MSVC intrinsic @@ -80,6 +95,11 @@ int Standard_Atomic_Decrement (volatile int* theValue) return _InterlockedDecrement (reinterpret_cast(theValue)); } +bool Standard_Atomic_CompareAndSwap (volatile int* theValue, int theOldValue, int theNewValue) +{ + return _InterlockedCompareExchange (reinterpret_cast(theValue), theNewValue, theOldValue) == theOldValue; +} + #elif defined(__APPLE__) // use atomic operations provided by MacOS @@ -95,6 +115,11 @@ int Standard_Atomic_Decrement (volatile int* theValue) return OSAtomicDecrement32Barrier (theValue); } +bool Standard_Atomic_CompareAndSwap (volatile int* theValue, int theOldValue, int theNewValue) +{ + return OSAtomicCompareAndSwapInt (theOldValue, theNewValue, theValue); +} + #elif defined(__ANDROID__) // Atomic operations that were exported by the C library didn't @@ -114,34 +139,9 @@ int Standard_Atomic_Decrement (volatile int* theValue) return __atomic_dec (theValue) - 1; // analog of __sync_fetch_and_sub } -#elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64)) -// use x86 / x86_64 inline assembly (compatibility with alien compilers / old GCC) - -inline int Standard_Atomic_Add (volatile int* theValue, int theVal) +bool Standard_Atomic_CompareAndSwap (volatile int* theValue, int theOldValue, int theNewValue) { - // C equivalent: - // *theValue += theVal; - // return *theValue; - - int previous; - __asm__ __volatile__ - ( - "lock xadd %0,%1" - : "=q"(previous), "=m"(*theValue) //output - : "0"(theVal), "m"(*theValue) //input - : "memory" //clobbers - ); - return previous + theVal; -} - -int Standard_Atomic_Increment (volatile int* theValue) -{ - return Standard_Atomic_Add (theValue, 1); -} - -int Standard_Atomic_Decrement (volatile int* theValue) -{ - return Standard_Atomic_Add 
(theValue, -1); + return __atomic_cmpxchg (theOldValue, theNewValue, theValue) == 0; } #else @@ -159,6 +159,16 @@ int Standard_Atomic_Decrement (volatile int* theValue) return --(*theValue); } +bool Standard_Atomic_CompareAndSwap (volatile int* theValue, int theOldValue, int theNewValue) +{ + if (*theValue == theOldValue) + { + *theValue = theNewValue; + return true; + } + return false; +} + #endif #endif //_Standard_Atomic_HeaderFile diff --git a/src/Standard/Standard_Condition.cxx b/src/Standard/Standard_Condition.cxx new file mode 100644 index 0000000000..232ae2b2ec --- /dev/null +++ b/src/Standard/Standard_Condition.cxx @@ -0,0 +1,207 @@ +// Created by: Kirill Gavrilov +// Copyright (c) 2018 OPEN CASCADE SAS +// +// This file is part of Open CASCADE Technology software library. +// +// This library is free software; you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License version 2.1 as published +// by the Free Software Foundation, with special exception defined in the file +// OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT +// distribution for complete text of the license and disclaimer of any warranty. +// +// Alternatively, this file may be used under the terms of Open CASCADE +// commercial license or contractual agreement. + +#ifdef _WIN32 + #include +#else + #include + #include + #include + #include +#endif + +#include "Standard_Condition.hxx" + +namespace +{ +#ifndef _WIN32 + //! clock_gettime() wrapper. 
+ static void conditionGetRealTime (struct timespec& theTime) + { + #if defined(__APPLE__) + struct timeval aTime; + gettimeofday (&aTime, NULL); + theTime.tv_sec = aTime.tv_sec; + theTime.tv_nsec = aTime.tv_usec * 1000; + #else + clock_gettime (CLOCK_REALTIME, &theTime); + #endif + } +#endif +} + +// ======================================================================= +// function : Standard_Condition +// purpose : +// ======================================================================= +Standard_Condition::Standard_Condition (bool theIsSet) +#ifdef _WIN32 +: myEvent((void* )::CreateEvent (0, true, theIsSet, NULL)) +#else +: myFlag (theIsSet) +#endif +{ +#ifndef _WIN32 + pthread_mutex_init(&myMutex, 0); + pthread_cond_init (&myCond, 0); +#endif +} + +// ======================================================================= +// function : ~Standard_Condition +// purpose : +// ======================================================================= +Standard_Condition::~Standard_Condition() +{ +#ifdef _WIN32 + ::CloseHandle ((HANDLE )myEvent); +#else + pthread_mutex_destroy(&myMutex); + pthread_cond_destroy (&myCond); +#endif +} + +// ======================================================================= +// function : Set +// purpose : +// ======================================================================= +void Standard_Condition::Set() +{ +#ifdef _WIN32 + ::SetEvent ((HANDLE )myEvent); +#else + pthread_mutex_lock(&myMutex); + myFlag = true; + pthread_cond_broadcast(&myCond); + pthread_mutex_unlock (&myMutex); +#endif +} + +// ======================================================================= +// function : Reset +// purpose : +// ======================================================================= +void Standard_Condition::Reset() +{ +#ifdef _WIN32 + ::ResetEvent ((HANDLE )myEvent); +#else + pthread_mutex_lock (&myMutex); + myFlag = false; + pthread_mutex_unlock (&myMutex); +#endif +} + +// 
======================================================================= +// function : Wait +// purpose : +// ======================================================================= +void Standard_Condition::Wait() +{ +#ifdef _WIN32 + ::WaitForSingleObject ((HANDLE )myEvent, INFINITE); +#else + pthread_mutex_lock (&myMutex); + if (!myFlag) + { + pthread_cond_wait (&myCond, &myMutex); + } + pthread_mutex_unlock (&myMutex); +#endif +} + +// ======================================================================= +// function : Wait +// purpose : +// ======================================================================= +bool Standard_Condition::Wait (int theTimeMilliseconds) +{ +#ifdef _WIN32 + return (::WaitForSingleObject ((HANDLE )myEvent, (DWORD )theTimeMilliseconds) != WAIT_TIMEOUT); +#else + bool isSignalled = true; + pthread_mutex_lock (&myMutex); + if (!myFlag) + { + struct timespec aNow; + struct timespec aTimeout; + conditionGetRealTime (aNow); + aTimeout.tv_sec = (theTimeMilliseconds / 1000); + aTimeout.tv_nsec = (theTimeMilliseconds - aTimeout.tv_sec * 1000) * 1000000; + if (aTimeout.tv_nsec > 1000000000) + { + aTimeout.tv_sec += 1; + aTimeout.tv_nsec -= 1000000000; + } + aTimeout.tv_sec += aNow.tv_sec; + aTimeout.tv_nsec += aNow.tv_nsec; + isSignalled = (pthread_cond_timedwait (&myCond, &myMutex, &aTimeout) != ETIMEDOUT); + } + pthread_mutex_unlock (&myMutex); + return isSignalled; +#endif +} + +// ======================================================================= +// function : Check +// purpose : +// ======================================================================= +bool Standard_Condition::Check() +{ +#ifdef _WIN32 + return (::WaitForSingleObject ((HANDLE )myEvent, (DWORD )0) != WAIT_TIMEOUT); +#else + bool isSignalled = true; + pthread_mutex_lock (&myMutex); + if (!myFlag) + { + struct timespec aNow; + struct timespec aTimeout; + conditionGetRealTime (aNow); + aTimeout.tv_sec = aNow.tv_sec; + aTimeout.tv_nsec = aNow.tv_nsec + 100; + 
isSignalled = (pthread_cond_timedwait (&myCond, &myMutex, &aTimeout) != ETIMEDOUT); + } + pthread_mutex_unlock (&myMutex); + return isSignalled; +#endif +} + +// ======================================================================= +// function : CheckReset +// purpose : +// ======================================================================= +bool Standard_Condition::CheckReset() +{ +#ifdef _WIN32 + const bool wasSignalled = (::WaitForSingleObject ((HANDLE )myEvent, (DWORD )0) != WAIT_TIMEOUT); + ::ResetEvent ((HANDLE )myEvent); + return wasSignalled; +#else + pthread_mutex_lock (&myMutex); + bool wasSignalled = myFlag; + if (!myFlag) + { + struct timespec aNow; + struct timespec aTimeout; + conditionGetRealTime (aNow); + aTimeout.tv_sec = aNow.tv_sec; + aTimeout.tv_nsec = aNow.tv_nsec + 100; + wasSignalled = (pthread_cond_timedwait (&myCond, &myMutex, &aTimeout) != ETIMEDOUT); + } + myFlag = false; + pthread_mutex_unlock (&myMutex); + return wasSignalled; +#endif +} diff --git a/src/Standard/Standard_Condition.hxx b/src/Standard/Standard_Condition.hxx new file mode 100644 index 0000000000..c2f33dcdcd --- /dev/null +++ b/src/Standard/Standard_Condition.hxx @@ -0,0 +1,80 @@ +// Created by: Kirill Gavrilov +// Copyright (c) 2018 OPEN CASCADE SAS +// +// This file is part of Open CASCADE Technology software library. +// +// This library is free software; you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License version 2.1 as published +// by the Free Software Foundation, with special exception defined in the file +// OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT +// distribution for complete text of the license and disclaimer of any warranty. +// +// Alternatively, this file may be used under the terms of Open CASCADE +// commercial license or contractual agreement. 
+ +#ifndef _Standard_Condition_HeaderFile +#define _Standard_Condition_HeaderFile + +#include + +#ifndef _WIN32 + #include +#endif + +//! This is boolean flag intended for communication between threads. +//! One thread sets this flag to TRUE to indicate some event happened +//! and another thread either waits this event or checks periodically its state to perform job. +//! +//! This class provides interface similar to WinAPI Event objects. +class Standard_Condition +{ +public: + + //! Default constructor. + //! @param theIsSet Initial flag state + Standard_EXPORT Standard_Condition (bool theIsSet); + + //! Destructor. + Standard_EXPORT ~Standard_Condition(); + + //! Set event into signaling state. + Standard_EXPORT void Set(); + + //! Reset event (unset signaling state) + Standard_EXPORT void Reset(); + + //! Wait for Event (infinity). + Standard_EXPORT void Wait(); + + //! Wait for signal requested time. + //! @param theTimeMilliseconds wait limit in milliseconds + //! @return true if get event + Standard_EXPORT bool Wait (int theTimeMilliseconds); + + //! Do not wait for signal - just test it state. + //! @return true if get event + Standard_EXPORT bool Check(); + + //! Method perform two steps at-once - reset the event object + //! and returns true if it was in signaling state. + //! @return true if event object was in signaling state. + Standard_EXPORT bool CheckReset(); + +#ifdef _WIN32 + //! Access native HANDLE to Event object. + void* getHandle() const { return myEvent; } +#endif + +private: + +#ifdef _WIN32 + void* myEvent; +#else + pthread_mutex_t myMutex; + pthread_cond_t myCond; + bool myFlag; +#endif + +}; + +#endif // _Standard_Condition_HeaderFile diff --git a/src/Standard/Standard_Failure.cxx b/src/Standard/Standard_Failure.cxx index 5059c54238..b0b1540998 100644 --- a/src/Standard/Standard_Failure.cxx +++ b/src/Standard/Standard_Failure.cxx @@ -58,33 +58,6 @@ static void deallocate_message(Standard_CString aMessage) } } -//! 
@def Standard_THREADLOCAL -//! Define Standard_THREADLOCAL modifier as C++11 thread_local keyword where it is available. -#if defined(__clang__) - // CLang version: standard CLang > 3.3 or XCode >= 8 (but excluding 32-bit ARM) - // Note: this has to be in separate #if to avoid failure of preprocessor on other platforms - #if __has_feature(cxx_thread_local) - #define Standard_THREADLOCAL thread_local - #endif -#elif defined(__INTEL_COMPILER) - #if (defined(_MSC_VER) && _MSC_VER >= 1900 && __INTEL_COMPILER > 1400) - // requires msvcrt vc14+ (Visual Studio 2015+) - #define Standard_THREADLOCAL thread_local - #elif (!defined(_MSC_VER) && __INTEL_COMPILER > 1500) - #define Standard_THREADLOCAL thread_local - #endif -#elif (defined(_MSC_VER) && _MSC_VER >= 1900) - // msvcrt coming with vc14+ (VS2015+) - #define Standard_THREADLOCAL thread_local -#elif (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8))) - // GCC >= 4.8 - #define Standard_THREADLOCAL thread_local -#endif - -#ifndef Standard_THREADLOCAL - #define Standard_THREADLOCAL -#endif - // ****************************************************************** // Standard_Failure * // ****************************************************************** diff --git a/src/Standard/Standard_Macro.hxx b/src/Standard/Standard_Macro.hxx index 1d239b47f8..ebcd0f329e 100644 --- a/src/Standard/Standard_Macro.hxx +++ b/src/Standard/Standard_Macro.hxx @@ -68,6 +68,33 @@ #define Standard_UNUSED #endif +//! @def Standard_THREADLOCAL +//! Define Standard_THREADLOCAL modifier as C++11 thread_local keyword where it is available. 
+#if defined(__clang__) + // CLang version: standard CLang > 3.3 or XCode >= 8 (but excluding 32-bit ARM) + // Note: this has to be in separate #if to avoid failure of preprocessor on other platforms + #if __has_feature(cxx_thread_local) + #define Standard_THREADLOCAL thread_local + #endif +#elif defined(__INTEL_COMPILER) + #if (defined(_MSC_VER) && _MSC_VER >= 1900 && __INTEL_COMPILER > 1400) + // requires msvcrt vc14+ (Visual Studio 2015+) + #define Standard_THREADLOCAL thread_local + #elif (!defined(_MSC_VER) && __INTEL_COMPILER > 1500) + #define Standard_THREADLOCAL thread_local + #endif +#elif (defined(_MSC_VER) && _MSC_VER >= 1900) + // msvcrt coming with vc14+ (VS2015+) + #define Standard_THREADLOCAL thread_local +#elif (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8))) + // GCC >= 4.8 + #define Standard_THREADLOCAL thread_local +#endif + +#ifndef Standard_THREADLOCAL + #define Standard_THREADLOCAL +#endif + //! @def Standard_DEPRECATED("message") //! Can be used in declaration of a method or a class to mark it as deprecated. //! Use of such method or class will cause compiler warning (if supported by