1
0
mirror of https://git.dev.opencascade.org/repos/occt.git synced 2025-08-04 13:13:25 +03:00

0029935: Foundation Classes - introduce OSD_ThreadPool class defining a thread pool

New class OSD_ThreadPool has been introduced to define a Thread Pool for multi-threading algorithm.
Thread Pool assigns a serial number for each thread allowing Multi-Threading algorithm to allocate thread-local storage variables as an array whose size is the same as the number of threads.

OSD_ThreadPool also redirects exceptions to a thread calling parallel execution and consistently initializes FPE exception handling.

New class Standard_Condition provides a platform-independent  tool similar to Event in WinAPI.

A new auxiliary function Standard_Atomic_CompareAndSwap() has been introduced
for performing atomic compare and swap of integer number.
Standard_Atomic_Increment/Standard_Atomic_Decrement fallback implementation
using ASM code for x86 processors for GCC has been dropped;
instead, it is expected that GCC should be properly configured targeting modern x86 architectures.

OSD_Signal now declares fFltExceptions as thread_local variable accessible through OSD::ToCatchFloatingSignals() property.
Standard_THREADLOCAL macro (wrapping thread_local attribute) has been moved to public header Standard_Macro.hxx.

OSD_Parallel::ForEach() has been extended with new optional parameter theNbItems and uses OSD_ThreadPool::DefaultPool().
This commit is contained in:
kgv
2018-07-07 02:27:51 +03:00
committed by bugmaster
parent be3d8cbc02
commit 6f498847fa
17 changed files with 1404 additions and 125 deletions

View File

@@ -85,6 +85,8 @@ OSD_SingleProtection.hxx
OSD_SysType.hxx
OSD_Thread.cxx
OSD_Thread.hxx
OSD_ThreadPool.cxx
OSD_ThreadPool.hxx
OSD_ThreadFunction.hxx
OSD_Timer.cxx
OSD_Timer.hxx

View File

@@ -104,7 +104,10 @@ public:
//! user's code. Refer to Foundation Classes User's Guide for further details.
//!
Standard_EXPORT static void SetSignal (const Standard_Boolean theFloatingSignal = Standard_True);
//! Return floating signal catching value previously set by SetSignal().
Standard_EXPORT static Standard_Boolean ToCatchFloatingSignals();
//! Commands the process to sleep for a number of seconds.
Standard_EXPORT static void SecSleep (const Standard_Integer aDelay);

View File

@@ -254,9 +254,11 @@ private:
//! @param theEnd the last index (exclusive)
//! @param theFunctor functor providing an interface "void operator(InputIterator theIter){}"
//! performing task for the specified iterator position
//! @param theNbItems number of items passed by iterator, -1 if unknown
Standard_EXPORT static void forEach (UniversalIterator& theBegin,
UniversalIterator& theEnd,
const FunctorInterface& theFunctor);
const FunctorInterface& theFunctor,
Standard_Integer theNbItems);
public: //! @name public methods
@@ -274,13 +276,15 @@ public: //! @name public methods
//! @param theFunctor functor providing an interface "void operator(InputIterator theIter){}"
//! performing task for specified iterator position
//! @param isForceSingleThreadExecution if true, then no threads will be created
//! @param theNbItems number of items passed by iterator, -1 if unknown
template <typename InputIterator, typename Functor>
static void ForEach(InputIterator theBegin,
InputIterator theEnd,
const Functor& theFunctor,
const Standard_Boolean isForceSingleThreadExecution = Standard_False)
const Standard_Boolean isForceSingleThreadExecution = Standard_False,
Standard_Integer theNbItems = -1)
{
if (isForceSingleThreadExecution)
if (isForceSingleThreadExecution || theNbItems == 1)
{
for (InputIterator it(theBegin); it != theEnd; ++it)
theFunctor(*it);
@@ -290,7 +294,7 @@ public: //! @name public methods
UniversalIterator aBegin(new IteratorWrapper<InputIterator>(theBegin));
UniversalIterator aEnd (new IteratorWrapper<InputIterator>(theEnd));
FunctorWrapperIter<InputIterator,Functor> aFunctor (theFunctor);
forEach(aBegin, aEnd, aFunctor);
forEach(aBegin, aEnd, aFunctor, theNbItems);
}
}
@@ -311,7 +315,7 @@ public: //! @name public methods
const Functor& theFunctor,
const Standard_Boolean isForceSingleThreadExecution = Standard_False)
{
if (isForceSingleThreadExecution)
if (isForceSingleThreadExecution || (theEnd - theBegin) == 1)
{
for (Standard_Integer it (theBegin); it != theEnd; ++it)
theFunctor(it);
@@ -321,7 +325,7 @@ public: //! @name public methods
UniversalIterator aBegin(new IteratorWrapper<Standard_Integer>(theBegin));
UniversalIterator aEnd (new IteratorWrapper<Standard_Integer>(theEnd));
FunctorWrapperInt<Functor> aFunctor (theFunctor);
forEach(aBegin, aEnd, aFunctor);
forEach(aBegin, aEnd, aFunctor, theEnd - theBegin);
}
}

View File

@@ -31,8 +31,10 @@
void OSD_Parallel::forEach (UniversalIterator& theBegin,
UniversalIterator& theEnd,
const FunctorInterface& theFunctor)
const FunctorInterface& theFunctor,
Standard_Integer theNbItems)
{
(void )theNbItems;
try
{
tbb::parallel_for_each(theBegin, theEnd, theFunctor);

View File

@@ -19,6 +19,8 @@
#include <OSD_Parallel.hxx>
#include <OSD_ThreadPool.hxx>
#include <NCollection_Array1.hxx>
#include <Standard_Mutex.hxx>
#include <OSD_Thread.hxx>
@@ -29,7 +31,7 @@ namespace
//! using threads (when TBB is not available);
//! it is derived from OSD_Parallel to get access to
//! Iterator and FunctorInterface nested types.
class OSD_Parallel_Threads : public OSD_Parallel
class OSD_Parallel_Threads : public OSD_ThreadPool, public OSD_Parallel
{
public:
//! Auxiliary class which ensures exclusive
@@ -84,7 +86,7 @@ namespace
};
//! Auxiliary wrapper class for thread function.
class Task
class Task : public JobInterface
{
public: //! @name public methods
@@ -97,15 +99,12 @@ namespace
//! Method is executed in the context of thread,
//! so this method defines the main calculations.
static Standard_Address Run(Standard_Address theTask)
virtual void Perform (int ) Standard_OVERRIDE
{
Task& aTask = *(static_cast<Task*>(theTask));
const Range& aData(aTask.myRange);
for (OSD_Parallel::UniversalIterator i = aData.It(); i != aData.End(); i = aData.It())
aTask.myPerformer(i);
return NULL;
for (OSD_Parallel::UniversalIterator anIter = myRange.It(); anIter != myRange.End(); anIter = myRange.It())
{
myPerformer (anIter);
}
}
private: //! @name private methods
@@ -117,9 +116,27 @@ namespace
Task& operator=(const Task& theCopy);
private: //! @name private fields
const FunctorInterface& myPerformer; //!< Link on functor
const Range& myRange; //!< Link on processed data block
};
const OSD_Parallel::FunctorInterface& myPerformer; //!< Link on functor.
const Range& myRange; //!< Link on processed data block.
//! Launcher specialization.
class UniversalLauncher : public Launcher
{
public:
//! Constructor.
UniversalLauncher (OSD_ThreadPool& thePool, int theMaxThreads = -1)
: Launcher (thePool, theMaxThreads) {}
//! Primitive for parallelization of "for" loops.
void Perform (OSD_Parallel::UniversalIterator& theBegin,
OSD_Parallel::UniversalIterator& theEnd,
const OSD_Parallel::FunctorInterface& theFunctor)
{
Range aData (theBegin, theEnd);
Task aJob (theFunctor, aData);
perform (aJob);
}
};
};
}
@@ -130,22 +147,13 @@ namespace
//=======================================================================
void OSD_Parallel::forEach (UniversalIterator& theBegin,
UniversalIterator& theEnd,
const FunctorInterface& theFunctor)
const FunctorInterface& theFunctor,
Standard_Integer theNbItems)
{
OSD_Parallel_Threads::Range aData(theBegin, theEnd);
OSD_Parallel_Threads::Task aTask(theFunctor, aData);
const Standard_Integer aNbThreads = OSD_Parallel::NbLogicalProcessors();
NCollection_Array1<OSD_Thread> aThreads(0, aNbThreads - 1);
for (Standard_Integer i = 0; i < aNbThreads; ++i)
{
OSD_Thread& aThread = aThreads(i);
aThread.SetFunction(&OSD_Parallel_Threads::Task::Run);
aThread.Run(&aTask);
}
for (Standard_Integer i = 0; i < aNbThreads; ++i)
aThreads(i).Wait();
const Handle(OSD_ThreadPool)& aThreadPool = OSD_ThreadPool::DefaultPool();
const Standard_Integer aNbThreads = theNbItems != -1 ? Min (theNbItems, aThreadPool->NbDefaultThreadsToLaunch()) : -1;
OSD_Parallel_Threads::UniversalLauncher aLauncher (*aThreadPool, aNbThreads);
aLauncher.Perform (theBegin, theEnd, theFunctor);
}
#endif /* ! HAVE_TBB */
#endif /* ! HAVE_TBB */

401
src/OSD/OSD_ThreadPool.cxx Normal file
View File

@@ -0,0 +1,401 @@
// Created by: Kirill Gavrilov
// Copyright (c) 2017 OPEN CASCADE SAS
//
// This file is part of commercial software by OPEN CASCADE SAS.
//
// This software is furnished in accordance with the terms and conditions
// of the contract and with the inclusion of this copyright notice.
// This software or any other copy thereof may not be provided or otherwise
// be made available to any third party.
// No ownership title to the software is transferred hereby.
//
// OPEN CASCADE SAS makes no representation or warranties with respect to the
// performance of this software, and specifically disclaims any responsibility
// for any damages, special or consequential, connected with its use.
#include <OSD_ThreadPool.hxx>
#include <OSD.hxx>
#include <Standard_Atomic.hxx>
#include <TCollection_AsciiString.hxx>
IMPLEMENT_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient)
// =======================================================================
// function : Lock
// purpose :
// =======================================================================
bool OSD_ThreadPool::EnumeratedThread::Lock()
{
return Standard_Atomic_CompareAndSwap (&myUsageCounter, 0, 1);
}
// =======================================================================
// function : Free
// purpose :
// =======================================================================
void OSD_ThreadPool::EnumeratedThread::Free()
{
Standard_Atomic_CompareAndSwap (&myUsageCounter, 1, 0);
}
// =======================================================================
// function : WakeUp
// purpose :
// =======================================================================
void OSD_ThreadPool::EnumeratedThread::WakeUp (JobInterface* theJob, bool theToCatchFpe)
{
myJob = theJob;
myToCatchFpe = theToCatchFpe;
if (myIsSelfThread)
{
if (theJob != NULL)
{
OSD_ThreadPool::performJob (myFailure, myJob, myThreadIndex);
}
return;
}
myWakeEvent.Set();
if (theJob != NULL && !myIsStarted)
{
myIsStarted = true;
Run (this);
}
}
// =======================================================================
// function : WaitIdle
// purpose :
// =======================================================================
void OSD_ThreadPool::EnumeratedThread::WaitIdle()
{
if (!myIsSelfThread)
{
myIdleEvent.Wait();
myIdleEvent.Reset();
}
}
// =======================================================================
// function : DefaultPool
// purpose :
// =======================================================================
const Handle(OSD_ThreadPool)& OSD_ThreadPool::DefaultPool (int theNbThreads)
{
static const Handle(OSD_ThreadPool) THE_GLOBAL_POOL = new OSD_ThreadPool (theNbThreads);
return THE_GLOBAL_POOL;
}
// =======================================================================
// function : OSD_ThreadPool
// purpose :
// =======================================================================
OSD_ThreadPool::OSD_ThreadPool (int theNbThreads)
: myNbDefThreads (0),
myShutDown (false)
{
Init (theNbThreads);
myNbDefThreads = NbThreads();
}
// =======================================================================
// function : IsInUse
// purpose :
// =======================================================================
bool OSD_ThreadPool::IsInUse()
{
for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
aThreadIter.More(); aThreadIter.Next())
{
EnumeratedThread& aThread = aThreadIter.ChangeValue();
if (!aThread.Lock())
{
return true;
}
aThread.Free();
}
return false;
}
// =======================================================================
// function : Init
// purpose :
// =======================================================================
void OSD_ThreadPool::Init (int theNbThreads)
{
const int aNbThreads = Max (0, (theNbThreads > 0 ? theNbThreads : OSD_Parallel::NbLogicalProcessors()) - 1);
if (myThreads.Size() == aNbThreads)
{
return;
}
// release old threads
if (!myThreads.IsEmpty())
{
NCollection_Array1<EnumeratedThread*> aLockThreads (myThreads.Lower(), myThreads.Upper());
aLockThreads.Init (NULL);
int aThreadIndex = myThreads.Lower();
for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
aThreadIter.More(); aThreadIter.Next())
{
EnumeratedThread& aThread = aThreadIter.ChangeValue();
if (!aThread.Lock())
{
for (NCollection_Array1<EnumeratedThread*>::Iterator aLockThreadIter (aLockThreads);
aLockThreadIter.More() && aLockThreadIter.Value() != NULL; aLockThreadIter.Next())
{
aLockThreadIter.ChangeValue()->Free();
}
throw Standard_ProgramError ("Error: active ThreadPool is reinitialized");
}
aLockThreads.SetValue (aThreadIndex++, &aThread);
}
}
release();
myShutDown = false;
if (aNbThreads > 0)
{
myThreads.Resize (0, aNbThreads - 1, false);
int aLastThreadIndex = 0;
for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
aThreadIter.More(); aThreadIter.Next())
{
EnumeratedThread& aThread = aThreadIter.ChangeValue();
aThread.myPool = this;
aThread.myThreadIndex = aLastThreadIndex++;
aThread.SetFunction (&OSD_ThreadPool::EnumeratedThread::runThread);
}
}
else
{
NCollection_Array1<EnumeratedThread> anEmpty;
myThreads.Move (anEmpty);
}
}
// =======================================================================
// function : ~OSD_ThreadPool
// purpose :
// =======================================================================
OSD_ThreadPool::~OSD_ThreadPool()
{
release();
}
// =======================================================================
// function : release
// purpose :
// =======================================================================
void OSD_ThreadPool::release()
{
if (myThreads.IsEmpty())
{
return;
}
myShutDown = true;
for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (myThreads);
aThreadIter.More(); aThreadIter.Next())
{
aThreadIter.ChangeValue().WakeUp (NULL, false);
aThreadIter.ChangeValue().Wait();
}
}
// =======================================================================
// function : perform
// purpose :
// =======================================================================
void OSD_ThreadPool::Launcher::perform (JobInterface& theJob)
{
run (theJob);
wait();
}
// =======================================================================
// function : run
// purpose :
// =======================================================================
void OSD_ThreadPool::Launcher::run (JobInterface& theJob)
{
bool toCatchFpe = OSD::ToCatchFloatingSignals();
for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
{
aThreadIter.ChangeValue()->WakeUp (&theJob, toCatchFpe);
}
}
// =======================================================================
// function : wait
// purpose :
// =======================================================================
void OSD_ThreadPool::Launcher::wait()
{
int aNbFailures = 0;
for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
{
aThreadIter.ChangeValue()->WaitIdle();
if (!aThreadIter.Value()->myFailure.IsNull())
{
++aNbFailures;
}
}
if (aNbFailures == 0)
{
return;
}
TCollection_AsciiString aFailures;
for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
{
if (!aThreadIter.Value()->myFailure.IsNull())
{
if (aNbFailures == 1)
{
aThreadIter.Value()->myFailure->Reraise();
}
if (!aFailures.IsEmpty())
{
aFailures += "\n";
}
aFailures += aThreadIter.Value()->myFailure->GetMessageString();
}
}
aFailures = TCollection_AsciiString("Multiple exceptions:\n") + aFailures;
throw Standard_ProgramError (aFailures.ToCString());
}
// =======================================================================
// function : performJob
// purpose :
// =======================================================================
void OSD_ThreadPool::performJob (Handle(Standard_Failure)& theFailure,
OSD_ThreadPool::JobInterface* theJob,
int theThreadIndex)
{
try
{
OCC_CATCH_SIGNALS
theJob->Perform (theThreadIndex);
}
catch (Standard_Failure const& aFailure)
{
TCollection_AsciiString aMsg = TCollection_AsciiString (aFailure.DynamicType()->Name())
+ ": " + aFailure.GetMessageString();
theFailure = new Standard_ProgramError (aMsg.ToCString());
}
catch (std::exception& anStdException)
{
TCollection_AsciiString aMsg = TCollection_AsciiString (typeid(anStdException).name())
+ ": " + anStdException.what();
theFailure = new Standard_ProgramError (aMsg.ToCString());
}
catch (...)
{
theFailure = new Standard_ProgramError ("Error: Unknown exception");
}
}
// =======================================================================
// function : performThread
// purpose :
// =======================================================================
void OSD_ThreadPool::EnumeratedThread::performThread()
{
OSD::SetSignal (false);
for (;;)
{
myWakeEvent.Wait();
myWakeEvent.Reset();
if (myPool->myShutDown)
{
return;
}
myFailure.Nullify();
if (myJob != NULL)
{
OSD::SetSignal (myToCatchFpe);
OSD_ThreadPool::performJob (myFailure, myJob, myThreadIndex);
myJob = NULL;
}
myIdleEvent.Set();
}
}
// =======================================================================
// function : runThread
// purpose :
// =======================================================================
Standard_Address OSD_ThreadPool::EnumeratedThread::runThread (Standard_Address theTask)
{
EnumeratedThread* aThread = static_cast<EnumeratedThread*>(theTask);
aThread->performThread();
return NULL;
}
// =======================================================================
// function : Launcher
// purpose :
// =======================================================================
OSD_ThreadPool::Launcher::Launcher (OSD_ThreadPool& thePool, Standard_Integer theMaxThreads)
: mySelfThread (true),
myNbThreads (0)
{
const int aNbThreads = theMaxThreads > 0
? Min (theMaxThreads, thePool.NbThreads())
: (theMaxThreads < 0
? Max (thePool.NbDefaultThreadsToLaunch(), 1)
: 1);
myThreads.Resize (0, aNbThreads - 1, false);
myThreads.Init (NULL);
if (aNbThreads > 1)
{
for (NCollection_Array1<EnumeratedThread>::Iterator aThreadIter (thePool.myThreads);
aThreadIter.More(); aThreadIter.Next())
{
if (aThreadIter.ChangeValue().Lock())
{
myThreads.SetValue (myNbThreads, &aThreadIter.ChangeValue());
// make thread index to fit into myThreads range
aThreadIter.ChangeValue().myThreadIndex = myNbThreads;
if (++myNbThreads == aNbThreads - 1)
{
break;
}
}
}
}
// self thread should be executed last
myThreads.SetValue (myNbThreads, &mySelfThread);
mySelfThread.myThreadIndex = myNbThreads;
++myNbThreads;
}
// =======================================================================
// function : Release
// purpose :
// =======================================================================
void OSD_ThreadPool::Launcher::Release()
{
for (NCollection_Array1<EnumeratedThread*>::Iterator aThreadIter (myThreads);
aThreadIter.More() && aThreadIter.Value() != NULL; aThreadIter.Next())
{
if (aThreadIter.Value() != &mySelfThread)
{
aThreadIter.Value()->Free();
}
}
NCollection_Array1<EnumeratedThread*> anEmpty;
myThreads.Move (anEmpty);
myNbThreads = 0;
}

301
src/OSD/OSD_ThreadPool.hxx Normal file
View File

@@ -0,0 +1,301 @@
// Created by: Kirill Gavrilov
// Copyright (c) 2017 OPEN CASCADE SAS
//
// This file is part of commercial software by OPEN CASCADE SAS.
//
// This software is furnished in accordance with the terms and conditions
// of the contract and with the inclusion of this copyright notice.
// This software or any other copy thereof may not be provided or otherwise
// be made available to any third party.
// No ownership title to the software is transferred hereby.
//
// OPEN CASCADE SAS makes no representation or warranties with respect to the
// performance of this software, and specifically disclaims any responsibility
// for any damages, special or consequential, connected with its use.
#ifndef _OSD_ThreadPool_HeaderFile
#define _OSD_ThreadPool_HeaderFile
#include <NCollection_Array1.hxx>
#include <OSD_Thread.hxx>
#include <OSD_Parallel.hxx>
#include <Standard_Atomic.hxx>
#include <Standard_Condition.hxx>
#include <Standard_Mutex.hxx>
//! Class defining a thread pool for executing algorithms in multi-threaded mode.
//! Thread pool allocates requested amount of threads and keep them alive
//! (in sleep mode when unused) during thread pool lifetime.
//! The same pool can be used by multiple consumers,
//! including nested multi-threading algorithms and concurrent threads:
//! - Thread pool can be used either by multi-threaded algorithm by creating OSD_ThreadPool::Launcher.
//! The functor performing a job takes two parameters - Thread Index and Data Index:
//! void operator(int theThreadIndex, int theDataIndex){}
//! Multi-threaded algorithm may rely on Thread Index for allocating thread-local variables in array form,
//! since the Thread Index is guaranteed to be within range OSD_ThreadPool::Lower() and OSD_ThreadPool::Upper().
//! - Default thread pool (OSD_ThreadPool::DefaultPool()) can be used in general case,
//! but application may prefer creating a dedicated pool for better control.
//! - Default thread pool allocates the amount of threads considering concurrency
//! level of the system (amount of logical processors).
//! This can be overridden during OSD_ThreadPool construction or by calling OSD_ThreadPool::Init()
//! (the pool should not be used!).
//! - OSD_ThreadPool::Launcher reserves specific amount of threads from the pool for executing multi-threaded Job.
//! Normally, single Launcher instance will occupy all threads available in thread pool,
//! so that nested multi-threaded algorithms (within the same thread)
//! and concurrent threads trying to use the same thread pool will run sequentially.
//! This behavior is affected by OSD_ThreadPool::NbDefaultThreadsToLaunch() parameter
//! and Launcher constructor, so that single Launcher instance will occupy not all threads
//! in the pool allowing other threads to be used concurrently.
//! - OSD_ThreadPool::Launcher locks thread one-by-one from thread pool in a thread-safe way.
//! - Each working thread catches exceptions occurred during job execution, and Launcher will
//! throw Standard_Failure in a caller thread on completed execution.
class OSD_ThreadPool : public Standard_Transient
{
DEFINE_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient)
public:
//! Return (or create) a default thread pool.
//! Number of threads argument will be considered only when called first time.
Standard_EXPORT static const Handle(OSD_ThreadPool)& DefaultPool (int theNbThreads = -1);
public:
//! Main constructor.
//! Application may consider specifying more threads than actually
//! available (OSD_Parallel::NbLogicalProcessors()) and set up NbDefaultThreadsToLaunch() to a smaller value
//! so that concurrent threads will be able using single Thread Pool instance more efficiently.
//! @param theNbThreads threads number to be created by pool
//! (if -1 is specified then OSD_Parallel::NbLogicalProcessors() will be used)
Standard_EXPORT OSD_ThreadPool (int theNbThreads = -1);
//! Destructor.
Standard_EXPORT virtual ~OSD_ThreadPool();
//! Return TRUE if at least 2 threads are available (including self-thread).
bool HasThreads() const { return NbThreads() >= 2; }
//! Return the lower thread index.
int LowerThreadIndex() const { return 0; }
//! Return the upper thread index (last index is reserved for self-thread).
int UpperThreadIndex() const { return LowerThreadIndex() + myThreads.Size(); }
//! Return the number of threads; >= 1.
int NbThreads() const { return myThreads.Size() + 1; }
//! Return maximum number of threads to be locked by a single Launcher object by default;
//! the entire thread pool size is returned by default.
int NbDefaultThreadsToLaunch() const { return myNbDefThreads; }
//! Set maximum number of threads to be locked by a single Launcher object by default.
//! Should be set BEFORE first usage.
void SetNbDefaultThreadsToLaunch (int theNbThreads) { myNbDefThreads = theNbThreads; }
//! Checks if thread pools has active consumers.
Standard_EXPORT bool IsInUse();
//! Reinitialize the thread pool with a different number of threads.
//! Should be called only with no active jobs, or exception Standard_ProgramError will be thrown!
Standard_EXPORT void Init (int theNbThreads);
protected:
//! Thread function interface.
class JobInterface
{
public:
virtual void Perform (int theThreadIndex) = 0;
};
//! Thread with back reference to thread pool and thread index in it.
class EnumeratedThread : public OSD_Thread
{
friend class OSD_ThreadPool;
public:
EnumeratedThread (bool theIsSelfThread = false)
: myPool (NULL), myJob (NULL), myWakeEvent (false),
myIdleEvent (false), myThreadIndex (0), myUsageCounter(0),
myIsStarted (false), myToCatchFpe (false),
myIsSelfThread (theIsSelfThread) {}
//! Occupy this thread for thread pool launcher.
//! @return TRUE on success, or FALSE if thread has been already occupied
Standard_EXPORT bool Lock();
//! Release this thread for thread pool launcher; should be called only after successful OccupyThread().
Standard_EXPORT void Free();
//! Wake up the thread.
Standard_EXPORT void WakeUp (JobInterface* theJob, bool theToCatchFpe);
//! Wait the thread going into Idle state (finished jobs).
Standard_EXPORT void WaitIdle();
private:
//! Method is executed in the context of thread.
void performThread();
//! Method is executed in the context of thread.
static Standard_Address runThread (Standard_Address theTask);
private:
OSD_ThreadPool* myPool;
JobInterface* myJob;
Handle(Standard_Failure) myFailure;
Standard_Condition myWakeEvent;
Standard_Condition myIdleEvent;
int myThreadIndex;
volatile int myUsageCounter;
bool myIsStarted;
bool myToCatchFpe;
bool myIsSelfThread;
};
public:
//! Launcher object locking a subset of threads (or all threads)
//! in a thread pool to perform parallel execution of the job.
class Launcher
{
public:
//! Lock specified number of threads from the thread pool.
//! If thread pool is already locked by another user,
//! Launcher will lock as many threads as possible
//! (if none will be locked, then single threaded execution will be done).
//! @param thePool thread pool to lock the threads
//! @param theMaxThreads number of threads to lock;
//! -1 specifies that default number of threads
//! to be used OSD_ThreadPool::NbDefaultThreadsToLaunch()
Standard_EXPORT Launcher (OSD_ThreadPool& thePool, int theMaxThreads = -1);
//! Release threads.
~Launcher() { Release(); }
//! Return TRUE if at least 2 threads have been locked for parallel execution (including self-thread);
//! otherwise, the functor will be executed within the caller thread.
bool HasThreads() const { return myNbThreads >= 2; }
//! Return amount of locked threads; >= 1.
int NbThreads() const { return myNbThreads; }
//! Return the lower thread index.
int LowerThreadIndex() const { return 0; }
//! Return the upper thread index (last index is reserved for the self-thread).
int UpperThreadIndex() const { return LowerThreadIndex() + myNbThreads - 1; }
//! Simple primitive for parallelization of "for" loops, e.g.:
//! @code
//! for (int anIter = theBegin; anIter < theEnd; ++anIter) {}
//! @endcode
//! @param theBegin the first data index (inclusive)
//! @param theEnd the last data index (exclusive)
//! @param theFunctor functor providing an interface
//! "void operator(int theThreadIndex, int theDataIndex){}" performing task for specified index
template<typename Functor>
void Perform (int theBegin, int theEnd, const Functor& theFunctor)
{
JobRange aData (theBegin, theEnd);
Job<Functor> aJob (theFunctor, aData);
perform (aJob);
}
//! Release threads before Launcher destruction.
Standard_EXPORT void Release();
protected:
//! Execute job.
Standard_EXPORT void perform (JobInterface& theJob);
//! Initialize job and start threads.
Standard_EXPORT void run (JobInterface& theJob);
//! Wait threads execution.
Standard_EXPORT void wait();
private:
Launcher (const Launcher& theCopy);
Launcher& operator=(const Launcher& theCopy);
private:
NCollection_Array1<EnumeratedThread*> myThreads; //!< array of locked threads (including self-thread)
EnumeratedThread mySelfThread;
int myNbThreads; //!< amount of locked threads
};
protected:
//! Auxiliary class which ensures exclusive access to iterators of processed data pool.
class JobRange
{
public:
//! Constructor
JobRange (const int& theBegin, const int& theEnd) : myBegin(theBegin), myEnd (theEnd), myIt (theBegin) {}
//! Returns const link on the first element.
const int& Begin() const { return myBegin; }
//! Returns const link on the last element.
const int& End() const { return myEnd; }
//! Returns first non processed element or end.
//! Thread-safe method.
int It() const { return Standard_Atomic_Increment (reinterpret_cast<volatile int*>(&myIt)) - 1; }
private:
JobRange (const JobRange& theCopy);
JobRange& operator=(const JobRange& theCopy);
private:
const int& myBegin; //!< First element of range
const int& myEnd; //!< Last element of range
mutable int myIt; //!< First non processed element of range
};
//! Auxiliary wrapper class for thread function.
template<typename FunctorT> class Job : public JobInterface
{
public:
//! Constructor.
Job (const FunctorT& thePerformer, JobRange& theRange)
: myPerformer (thePerformer), myRange (theRange) {}
//! Method is executed in the context of thread.
virtual void Perform (int theThreadIndex) Standard_OVERRIDE
{
for (Standard_Integer anIter = myRange.It(); anIter < myRange.End(); anIter = myRange.It())
{
myPerformer (theThreadIndex, anIter);
}
}
private:
Job (const Job& theCopy);
Job& operator=(const Job& theCopy);
private: //! @name private fields
const FunctorT& myPerformer; //!< Link on functor
const JobRange& myRange; //!< Link on processed data block
};
//! Release threads.
void release();
//! Perform the job and catch exceptions.
static void performJob (Handle(Standard_Failure)& theFailure,
OSD_ThreadPool::JobInterface* theJob,
int theThreadIndex);
private:
NCollection_Array1<EnumeratedThread> myThreads; //!< array of defined threads (excluding self-thread)
int myNbDefThreads; //!< maximum number of threads to be locked by a single Launcher by default
bool myShutDown; //!< flag to shut down (destroy) the thread pool
};
#endif // _OSD_ThreadPool_HeaderFile

View File

@@ -17,6 +17,17 @@
#include <Standard_DivideByZero.hxx>
#include <Standard_Overflow.hxx>
static Standard_THREADLOCAL Standard_Boolean fFltExceptions = Standard_False;
//=======================================================================
//function : ToCatchFloatingSignals
//purpose :
//=======================================================================
Standard_Boolean OSD::ToCatchFloatingSignals()
{
return fFltExceptions;
}
#ifdef _WIN32
//---------------------------- Windows NT System --------------------------------
@@ -67,7 +78,6 @@
static Standard_Boolean fCtrlBrk;
static Standard_Boolean fMsgBox;
static Standard_Boolean fFltExceptions;
// used to forbid simultaneous execution of setting / executing handlers
static Standard_Mutex THE_SIGNAL_MUTEX;
@@ -616,7 +626,6 @@ LONG _osd_debug ( void ) {
#ifdef __linux__
#include <cfenv>
//#include <fenv.h>
static Standard_Boolean fFltExceptions = Standard_False;
#endif
// variable signalling that Control-C has been pressed (SIGINT signal)