1
0
mirror of https://git.dev.opencascade.org/repos/occt.git synced 2025-08-14 13:30:48 +03:00

0024826: Wrapping of parallelisation algorithms

Simple primitives to parallelize loops of the "for" and "foreach" kind were implemented. The primitives encapsulate the complete logic for creating and managing the parallel context of loops. Moreover, the primitives may act as a wrapper for the corresponding primitives of a 3rd-party library - TBB.

To use them, it is necessary to implement a TBB-like interface which is based on functors. For example:

Class Functor
{
public:
  void operator() ([processing instance]) const
  {
    //...
  }
};

The body of operator () should implement thread-safe logic of computations that can be performed in a parallel context. If the parallelized loop iterates over a collection with direct access by index (such as Vector, Array), it is more efficient to use the primitive ParallelFor (because it has no critical section).

All parts of OCC code which were using TBB have been switched to the new primitives.

0024826: Wrapping of parallelisation algorithms

Small fix.
This commit is contained in:
msv
2015-02-05 15:49:35 +03:00
committed by bugmaster
parent a61133c8c7
commit c7b59798ca
34 changed files with 837 additions and 683 deletions

View File

@@ -0,0 +1 @@
CSF_TBB

View File

@@ -17,5 +17,7 @@ OSD_MAllocHook.cxx
OSD_MAllocHook.hxx
OSD_MemInfo.hxx
OSD_MemInfo.cxx
OSD_Parallel.hxx
OSD_Parallel.cxx
OSD_OpenFile.hxx
OSD_OpenFile.cxx

93
src/OSD/OSD_Parallel.cxx Normal file
View File

@@ -0,0 +1,93 @@
// Created on: 2014-08-19
// Created by: Alexander Zaikin
// Copyright (c) 1996-1999 Matra Datavision
// Copyright (c) 2013-2014 OPEN CASCADE SAS
//
// This file is part of Open CASCADE Technology software library.
//
// This library is free software; you can redistribute it and/or modify it under
// the terms of the GNU Lesser General Public License version 2.1 as published
// by the Free Software Foundation, with special exception defined in the file
// OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT
// distribution for complete text of the license and disclaimer of any warranty.
//
// Alternatively, this file may be used under the terms of Open CASCADE
// commercial license or contractual agreement.
#include <OSD_Parallel.hxx>
#ifdef _WIN32
#include <windows.h>
#include <process.h>
#else
#include <sys/types.h>
#include <unistd.h>
#ifdef __sun
#include <sys/processor.h>
#include <sys/procset.h>
#else
#include <sched.h>
#endif
#endif
#ifdef _WIN32
namespace {
// for a 64-bit app running under 64-bit Windows, this is FALSE
static bool isWow64()
{
typedef BOOL (WINAPI *LPFN_ISWOW64PROCESS) (HANDLE , PBOOL);
BOOL bIsWow64 = FALSE;
HMODULE aKern32Module = GetModuleHandleW(L"kernel32");
LPFN_ISWOW64PROCESS aFunIsWow64 = (aKern32Module == NULL) ? (LPFN_ISWOW64PROCESS )NULL
: (LPFN_ISWOW64PROCESS)GetProcAddress(aKern32Module, "IsWow64Process");
return aFunIsWow64 != NULL &&
aFunIsWow64(GetCurrentProcess(), &bIsWow64) &&
bIsWow64 != FALSE;
}
}
#endif
//=======================================================================
//function : NbLogicalProcessors
//purpose  : Returns number of logical processors (always >= 1).
//=======================================================================
Standard_Integer OSD_Parallel::NbLogicalProcessors()
{
  // The value is computed on the first call and cached afterwards.
  // The cache is a plain static without synchronization; it is assigned
  // exactly once below, with the final (validated) value.
  static Standard_Integer aNumLogicalProcessors = 0;
  if ( aNumLogicalProcessors != 0 )
  {
    return aNumLogicalProcessors;
  }

  Standard_Integer aNbProcessors = 0;
#ifdef _WIN32
  // GetSystemInfo() will return the number of processors in a data field in a SYSTEM_INFO structure.
  SYSTEM_INFO aSysInfo;

  // GetNativeSystemInfo() has no return value, so the pointer type is declared accordingly.
  typedef VOID (WINAPI *LPFN_GSI)(LPSYSTEM_INFO );
  LPFN_GSI aFuncSysInfo = NULL;
  if ( isWow64() )
  {
    // So, they suggest 32-bit apps should call this instead of the other in WOW64.
    // The function is resolved dynamically; the module handle is checked
    // before being passed to GetProcAddress().
    HMODULE aKern32 = GetModuleHandleW(L"kernel32");
    if ( aKern32 != NULL )
    {
      aFuncSysInfo = (LPFN_GSI )GetProcAddress(aKern32, "GetNativeSystemInfo");
    }
  }

  if ( aFuncSysInfo != NULL )
  {
    aFuncSysInfo(&aSysInfo);
  }
  else
  {
    GetSystemInfo(&aSysInfo);
  }
  aNbProcessors = (Standard_Integer )aSysInfo.dwNumberOfProcessors;
#else
  // These are the choices. We'll check number of processors online.
  // _SC_NPROCESSORS_CONF   Number of processors configured
  // _SC_NPROCESSORS_MAX    Max number of processors supported by platform
  // _SC_NPROCESSORS_ONLN   Number of processors online
  aNbProcessors = (Standard_Integer)sysconf(_SC_NPROCESSORS_ONLN);
#endif

  // sysconf() reports failure with -1; never expose (or cache) a value
  // below 1, since callers use the result to size their thread pools.
  if ( aNbProcessors < 1 )
  {
    aNbProcessors = 1;
  }

  aNumLogicalProcessors = aNbProcessors;
  return aNumLogicalProcessors;
}

298
src/OSD/OSD_Parallel.hxx Normal file
View File

@@ -0,0 +1,298 @@
// Copyright (c) 2013-2014 OPEN CASCADE SAS
//
// This file is part of Open CASCADE Technology software library.
//
// This library is free software; you can redistribute it and/or modify it under
// the terms of the GNU Lesser General Public License version 2.1 as published
// by the Free Software Foundation, with special exception defined in the file
// OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT
// distribution for complete text of the license and disclaimer of any warranty.
//
// Alternatively, this file may be used under the terms of Open CASCADE
// commercial license or contractual agreement.
#ifndef OSD_Parallel_HeaderFile
#define OSD_Parallel_HeaderFile
#include <OSD_Thread.hxx>
#include <Standard_Mutex.hxx>
#include <Standard_NotImplemented.hxx>
#include <Standard_Atomic.hxx>
#include <NCollection_Array1.hxx>
#ifdef HAVE_TBB
#include <tbb/parallel_for.h>
#include <tbb/parallel_for_each.h>
#include <tbb/blocked_range.h>
#endif
//! @class OSD_Parallel
//! @brief Simplifies code parallelization.
//!
//! The class provides an interface for parallel processing of "for" and "foreach" loops.
//! These primitives encapsulate the complete logic for creating and managing the parallel context of loops.
//! Moreover, the primitives may be a wrapper for the corresponding primitives of a 3rd-party library - TBB
//! (used when HAVE_TBB is defined).
//! To use them, it is necessary to implement a TBB-like interface which is based on functors.
//!
//! @code
//! class Functor
//! {
//! public:
//!   void operator() ([processing instance]) const
//!   {
//!     //...
//!   }
//! };
//! @endcode
//!
//! The body of operator () should implement thread-safe logic of computations that can be performed in a parallel context.
//! If the parallelized loop iterates over a collection with direct access by index (such as Vector, Array),
//! it is more efficient to use the primitive For (because it has no critical section).
class OSD_Parallel
{
  //! Auxiliary class which ensures exclusive
  //! access to iterators of processed data pool.
  template <typename Value>
  class Range
  {
  public: //! @name public methods

    typedef Value Iterator;

    //! Constructor.
    //! @param theBegin first element of the range
    //! @param theEnd   end bound of the range (not processed itself)
    Range(const Value& theBegin, const Value& theEnd)
    : myBegin(theBegin),
      myEnd  (theEnd),
      myIt   (theBegin)
    {
    }

    //! Returns const reference to the first element.
    inline const Value& Begin() const
    {
      return myBegin;
    }

    //! Returns const reference to the end bound of the range
    //! (the value at which iteration stops).
    inline const Value& End() const
    {
      return myEnd;
    }

    //! Returns first non processed element or end.
    //! Thread-safe method: the shared cursor is advanced under the mutex.
    inline Iterator It() const
    {
      Standard_Mutex::Sentry aMutex( myMutex );
      return ( myIt != myEnd ) ? myIt++ : myEnd;
    }

  private: //! @name private methods

    //! Copy constructor is declared private and left undefined: copying is prohibited.
    Range(const Range& theCopy);

    //! Copy operator is declared private and left undefined: assignment is prohibited.
    Range& operator=(const Range& theCopy);

  private: //! @name private fields

    // The bounds are stored by value (not by reference), so that the range
    // never refers to constructor arguments whose lifetime has ended.
    const Value            myBegin; //!< First element of range.
    const Value            myEnd;   //!< End bound of range.
    mutable Value          myIt;    //!< First non processed element of range.
    mutable Standard_Mutex myMutex; //!< Access controller for the first non processed element.
  };

  //! Auxiliary wrapper class for thread function.
  template <typename Functor, typename InputIterator>
  class Task
  {
  public: //! @name public methods

    //! Constructor.
    //! Both arguments are kept by reference and must outlive the task.
    Task(const Functor& thePerformer, Range<InputIterator>& theRange)
    : myPerformer(thePerformer),
      myRange    (theRange)
    {
    }

    //! Method is executed in the context of thread,
    //! so this method defines the main calculations.
    //! Repeatedly takes the next non processed iterator from the shared range
    //! and applies the functor to the element it points to, until the end is reached.
    //! @param theTask pointer to the Task instance to be run
    //! @return always NULL
    static Standard_Address RunWithIterator(Standard_Address theTask)
    {
      Task<Functor, InputIterator>& aTask =
        *( static_cast< Task<Functor, InputIterator>* >(theTask) );

      const Range<InputIterator>& aData( aTask.myRange );
      typename Range<InputIterator>::Iterator i = aData.It();

      for ( ; i != aData.End(); i = aData.It() )
      {
        aTask.myPerformer(*i);
      }

      return NULL;
    }

    //! Method is executed in the context of thread,
    //! so this method defines the main calculations.
    //! Repeatedly takes the next non processed index from the shared range
    //! and passes it to the functor, while the index is below the end bound.
    //! Intended for tasks whose InputIterator is Standard_Integer.
    //! @param theTask pointer to the Task instance to be run
    //! @return always NULL
    static Standard_Address RunWithIndex(Standard_Address theTask)
    {
      Task<Functor, InputIterator>& aTask =
        *( static_cast< Task<Functor, Standard_Integer>* >(theTask) );

      const Range<Standard_Integer>& aData( aTask.myRange );
      Standard_Integer i = aData.It();

      // "<" (rather than "!=") is used because the index returned by It()
      // may go beyond the end bound.
      for ( ; i < aData.End(); i = aData.It())
      {
        aTask.myPerformer(i);
      }

      return NULL;
    }

  private: //! @name private methods

    //! Copy constructor is declared private and left undefined: copying is prohibited.
    Task(const Task& theCopy);

    //! Copy operator is declared private and left undefined: assignment is prohibited.
    Task& operator=(const Task& theCopy);

  private: //! @name private fields

    const Functor&              myPerformer; //!< Reference to the functor.
    const Range<InputIterator>& myRange;     //!< Reference to the processed data block.
  };

public: //! @name public methods

  //! Returns number of logical processors.
  Standard_EXPORT static Standard_Integer NbLogicalProcessors();

  //! Simple primitive for parallelization of "foreach" loops.
  //! @param theBegin   iterator to the first element to be processed
  //! @param theEnd     iterator at which processing stops
  //! @param theFunctor functor invoked with each dereferenced iterator
  //! @param isForceSingleThreadExecution if true, the loop is executed
  //!        sequentially in the calling thread
  template <typename InputIterator, typename Functor>
  static void ForEach( InputIterator          theBegin,
                       InputIterator          theEnd,
                       const Functor&         theFunctor,
                       const Standard_Boolean isForceSingleThreadExecution
                         = Standard_False );

  //! Simple primitive for parallelization of "for" loops.
  //! @param theBegin   first index to be processed
  //! @param theEnd     index at which processing stops (not processed itself)
  //! @param theFunctor functor invoked with each index
  //! @param isForceSingleThreadExecution if true, the loop is executed
  //!        sequentially in the calling thread
  template <typename Functor>
  static void For( const Standard_Integer theBegin,
                   const Standard_Integer theEnd,
                   const Functor&         theFunctor,
                   const Standard_Boolean isForceSingleThreadExecution
                     = Standard_False );
};
//=======================================================================
//function : OSD_Parallel::Range::It
//purpose  : Specialization for integer indices: the shared cursor is
//           advanced with an atomic increment.
//=======================================================================
template<> inline Standard_Integer OSD_Parallel::Range<Standard_Integer>::It() const
{
  // View the cursor as the pointer type expected by the atomic primitive.
  volatile int* aCursor = reinterpret_cast<volatile int*>(&myIt);

  // The increment yields the advanced value; the index handed out
  // to the caller is the one preceding it.
  const Standard_Integer anAdvanced = Standard_Atomic_Increment( aCursor );
  return anAdvanced - 1;
}
//=======================================================================
//function : ForEach
//purpose  : Applies theFunctor to every element of [theBegin, theEnd).
//           Unless single-thread execution is forced, the elements are
//           processed in parallel: by TBB when HAVE_TBB is defined,
//           otherwise by a set of OSD_Thread workers sharing one range.
//=======================================================================
template <typename InputIterator, typename Functor>
void OSD_Parallel::ForEach( InputIterator          theBegin,
                            InputIterator          theEnd,
                            const Functor&         theFunctor,
                            const Standard_Boolean isForceSingleThreadExecution )
{
  // Nothing to process: do not set up any parallel context.
  if ( !(theBegin != theEnd) )
  {
    return;
  }

  if ( isForceSingleThreadExecution )
  {
    for ( InputIterator it(theBegin); it != theEnd; ++it )
      theFunctor(*it);

    return;
  }
  #ifdef HAVE_TBB
  {
    try
    {
      tbb::parallel_for_each(theBegin, theEnd, theFunctor);
    }
    catch ( const tbb::captured_exception& anException )
    {
      // The exception captured by TBB is re-raised with its message.
      Standard_NotImplemented::Raise(anException.what());
    }
  }
  #else
  {
    const Standard_Integer aNbThreads = OSD_Parallel::NbLogicalProcessors();
    if ( aNbThreads < 2 )
    {
      // With less than two processors reported (including a non-positive
      // count) worker threads are pointless, and the array of threads below
      // could not even be given valid bounds: process in the calling thread.
      for ( InputIterator it(theBegin); it != theEnd; ++it )
        theFunctor(*it);

      return;
    }

    // All workers share the same range and task; each of them repeatedly
    // takes the next non processed element until the range is exhausted.
    Range<InputIterator> aData(theBegin, theEnd);
    Task<Functor, InputIterator> aTask(theFunctor, aData);

    NCollection_Array1<OSD_Thread> aThreads(0, aNbThreads - 1);

    for ( Standard_Integer i = 0; i < aNbThreads; ++i )
    {
      OSD_Thread& aThread = aThreads(i);
      aThread.SetFunction(&Task<Functor, InputIterator>::RunWithIterator);
      aThread.Run(&aTask);
    }

    // aData and aTask live on this stack frame, so all workers
    // are waited for before returning.
    for ( Standard_Integer i = 0; i < aNbThreads; ++i )
      aThreads(i).Wait();
  }
  #endif
}
//=======================================================================
//function : For
//purpose  : Calls theFunctor for every index of [theBegin, theEnd).
//           Unless single-thread execution is forced, the indices are
//           processed in parallel: by TBB when HAVE_TBB is defined,
//           otherwise by a set of OSD_Thread workers sharing one range.
//=======================================================================
template <typename Functor>
void OSD_Parallel::For( const Standard_Integer theBegin,
                        const Standard_Integer theEnd,
                        const Functor&         theFunctor,
                        const Standard_Boolean isForceSingleThreadExecution )
{
  // Nothing to process: do not set up any parallel context.
  if ( theBegin >= theEnd )
  {
    return;
  }

  if ( isForceSingleThreadExecution )
  {
    for ( Standard_Integer i = theBegin; i < theEnd; ++i )
      theFunctor(i);

    return;
  }
  #ifdef HAVE_TBB
  {
    try
    {
      tbb::parallel_for( theBegin, theEnd, theFunctor );
    }
    catch ( const tbb::captured_exception& anException )
    {
      // The exception captured by TBB is re-raised with its message.
      Standard_NotImplemented::Raise(anException.what());
    }
  }
  #else
  {
    const Standard_Integer aNbThreads = OSD_Parallel::NbLogicalProcessors();
    if ( aNbThreads < 2 )
    {
      // With less than two processors reported (including a non-positive
      // count) worker threads are pointless, and the array of threads below
      // could not even be given valid bounds: process in the calling thread.
      for ( Standard_Integer i = theBegin; i < theEnd; ++i )
        theFunctor(i);

      return;
    }

    // All workers share the same range and task; each of them repeatedly
    // takes the next non processed index until the range is exhausted.
    Range<Standard_Integer> aData(theBegin, theEnd);
    Task<Functor, Standard_Integer> aTask(theFunctor, aData);

    NCollection_Array1<OSD_Thread> aThreads(0, aNbThreads - 1);

    for ( Standard_Integer i = 0; i < aNbThreads; ++i )
    {
      OSD_Thread& aThread = aThreads(i);
      aThread.SetFunction(&Task<Functor, Standard_Integer>::RunWithIndex);
      aThread.Run(&aTask);
    }

    // aData and aTask live on this stack frame, so all workers
    // are waited for before returning.
    for ( Standard_Integer i = 0; i < aNbThreads; ++i )
      aThreads(i).Wait();
  }
  #endif
}
#endif