Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert OpenMP parallelization to OneAPI::TBB #6626

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
46e7d63
Switched from using OpenMP for parallelism to using oneAPI::TBB (thou…
dbs4261 Jan 26, 2024
0c36784
Switched usage of std::mutex to tbb::mutex for consistency when used …
dbs4261 Jan 26, 2024
1514fc1
Fixed bug in CPU reduction
dbs4261 Jan 27, 2024
31f1e25
Shift atomic to outside of RW mutex in PointCloudSegmentation.cpp
dbs4261 Jan 27, 2024
d7a82f1
Switch from using a mutex to a concurrent vector for parallelization …
dbs4261 Jan 27, 2024
181a431
Get maximum threads from TBB instead of OpenMP
dbs4261 Jan 27, 2024
2312299
Updates to ProgressBar.h/.cpp Including separating code for writing t…
dbs4261 Jan 27, 2024
bd258e3
Updated ClusterDBSCAN in PointCloudCluster.cpp to bulk update the pro…
dbs4261 Jan 27, 2024
2789817
Applied Open3D style
dbs4261 Jan 27, 2024
6399001
Marked single argument constructors for reductions as explicit to ple…
dbs4261 Feb 12, 2024
11c546d
Explicitly load atomics in calls to utility::Log*(...) because newer …
dbs4261 Feb 12, 2024
f48abcc
style fix
ssheorey Apr 17, 2024
c04077f
Fixed incorrect include in Parallel.h
dbs4261 Sep 11, 2024
7d6c707
Updated ProgressBar.h to use size_t in std namespace for consistency
dbs4261 Sep 11, 2024
bba01ef
Switched std::lock_guard<std::mutex> for DiscreteGenerator to use tbb…
dbs4261 Sep 11, 2024
ec98cae
Switch to using tbb::parallel_reduce on all platforms.
dbs4261 Sep 11, 2024
dbe4133
Fixed progress bar bug that allowed multiple threads writing to the…
dbs4261 Sep 20, 2024
25d8e7f
Fixed issue with CPU reduction. Need to exit early if no work is to b…
dbs4261 Sep 20, 2024
929f010
Updated MemoryManagerStatistic to use a tbb::spin_mutex instead of a …
dbs4261 Sep 20, 2024
31484e5
Updated point cloud segmentation to generate random samples using the…
dbs4261 Sep 20, 2024
81b9bb3
Updated example usage of global mutex and engine access to reflect th…
dbs4261 Sep 23, 2024
23ce717
Merge branch 'main' into omp2tbb
dbs4261 Sep 23, 2024
86a474d
Turn TBB into publicly linked library
dbs4261 Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions 3rdparty/find_dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ if(OPEN3D_USE_ONEAPI_PACKAGES)
PACKAGE TBB
TARGETS TBB::tbb
)
list(APPEND Open3D_3RDPARTY_PRIVATE_TARGETS_FROM_SYSTEM Open3D::3rdparty_tbb)
list(APPEND Open3D_3RDPARTY_PUBLIC_TARGETS_FROM_SYSTEM Open3D::3rdparty_tbb)

else(OPEN3D_USE_ONEAPI_PACKAGES)
# MKL/BLAS
Expand Down Expand Up @@ -1687,7 +1687,7 @@ else(OPEN3D_USE_ONEAPI_PACKAGES)
INCLUDE_DIRS ${STATIC_MKL_INCLUDE_DIR}
LIB_DIR ${STATIC_MKL_LIB_DIR}
LIBRARIES ${STATIC_MKL_LIBRARIES}
DEPENDS ext_tbb ext_mkl_include ext_mkl
DEPENDS Open3D::3rdparty_tbb ext_mkl_include ext_mkl
)
if(UNIX)
target_compile_options(3rdparty_blas INTERFACE "$<$<COMPILE_LANGUAGE:CXX>:-m64>")
Expand All @@ -1709,9 +1709,13 @@ else(OPEN3D_USE_ONEAPI_PACKAGES)
endif()
if(NOT USE_SYSTEM_TBB)
include(${Open3D_3RDPARTY_DIR}/mkl/tbb.cmake)
list(APPEND Open3D_3RDPARTY_PRIVATE_TARGETS_FROM_CUSTOM TBB::tbb)
add_library(3rdparty_tbb INTERFACE)
add_library(Open3D::3rdparty_tbb ALIAS 3rdparty_tbb)
target_link_libraries(3rdparty_tbb INTERFACE TBB::tbb)
install(TARGETS 3rdparty_tbb EXPORT ${PROJECT_NAME}Targets)
list(APPEND Open3D_3RDPARTY_PUBLIC_TARGETS_FROM_CUSTOM Open3D::3rdparty_tbb)
else()
list(APPEND Open3D_3RDPARTY_PRIVATE_TARGETS_FROM_SYSTEM Open3D::3rdparty_tbb)
list(APPEND Open3D_3RDPARTY_PUBLIC_TARGETS_FROM_SYSTEM Open3D::3rdparty_tbb)
endif()

endif(OPEN3D_USE_ONEAPI_PACKAGES)
Expand Down
125 changes: 88 additions & 37 deletions cpp/apps/OfflineReconstruction/LegacyReconstructionUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#pragma once

#include <json/json.h>
#include <tbb/parallel_for.h>
#include <tbb/parallel_reduce.h>

#include "DebugUtil.h"
#include "FileSystemUtil.h"
Expand Down Expand Up @@ -547,12 +549,17 @@ class ReconstructionPipeline {

const size_t num_pairs = fragment_matching_results.size();
if (config_["multi_threading"].asBool()) {
#pragma omp parallel for num_threads(utility::EstimateMaxThreads())
for (int i = 0; i < (int)num_pairs; i++) {
RegisterFragmentPair(ply_files, fragment_matching_results[i].s_,
fragment_matching_results[i].t_,
fragment_matching_results[i]);
}
tbb::parallel_for(
tbb::blocked_range<std::size_t>(0, num_pairs),
[&](const tbb::blocked_range<std::size_t>& range) {
for (std::size_t i = range.begin(); i < range.end();
++i) {
RegisterFragmentPair(
ply_files, fragment_matching_results[i].s_,
fragment_matching_results[i].t_,
fragment_matching_results[i]);
}
});
} else {
for (size_t i = 0; i < num_pairs; i++) {
RegisterFragmentPair(ply_files, fragment_matching_results[i].s_,
Expand Down Expand Up @@ -635,13 +642,18 @@ class ReconstructionPipeline {
}

if (config_["multi_threading"].asBool()) {
#pragma omp parallel for num_threads(utility::EstimateMaxThreads())
for (int i = 0; i < (int)fragment_matching_results.size(); i++) {
const int s = fragment_matching_results[i].s_;
const int t = fragment_matching_results[i].t_;
RefineFragmentPair(ply_files, s, t,
fragment_matching_results[i]);
}
tbb::parallel_for(
tbb::blocked_range<std::size_t>(
0, fragment_matching_results.size()),
[&](const tbb::blocked_range<std::size_t>& range) {
for (std::size_t i = range.begin(); i < range.end();
++i) {
const int s = fragment_matching_results[i].s_;
const int t = fragment_matching_results[i].t_;
RefineFragmentPair(ply_files, s, t,
fragment_matching_results[i]);
}
});
} else {
for (size_t i = 0; i < fragment_matching_results.size(); i++) {
const int s = fragment_matching_results[i].s_;
Expand Down Expand Up @@ -722,37 +734,76 @@ class ReconstructionPipeline {
matched_result.information_ = std::get<1>(result);
}

struct PointCloudIntegrator {
    // Reduction body for tbb::parallel_reduce: each body instance integrates
    // the RGBD frames of its sub-range into its own `fragment` point cloud,
    // and join() merges the partial fragments of two instances.

    // Globals (shared, read-only across all body instances)
    ReconstructionPipeline& pipeline;
    const int fragment_id;
    const std::vector<std::string>& color_files;
    const std::vector<std::string>& depth_files;
    const camera::PinholeCameraIntrinsic& intrinsic;
    using PoseGraphT = pipelines::registration::PoseGraph;
    const PoseGraphT& pose_graph;
    // Locals (per-instance accumulator)
    geometry::PointCloud fragment;

    PointCloudIntegrator(ReconstructionPipeline& pipeline_,
                         const int fragment_id_,
                         const std::vector<std::string>& color_files_,
                         const std::vector<std::string>& depth_files_,
                         const camera::PinholeCameraIntrinsic& intrinsic_,
                         const PoseGraphT& pose_graph_)
        : pipeline(pipeline_),
          fragment_id(fragment_id_),
          color_files(color_files_),
          depth_files(depth_files_),
          intrinsic(intrinsic_),
          pose_graph(pose_graph_) {}

    // Splitting constructor required by tbb::parallel_reduce; the new body
    // starts with an empty `fragment` accumulator.
    PointCloudIntegrator(PointCloudIntegrator& o, tbb::split)
        : pipeline(o.pipeline),
          fragment_id(o.fragment_id),
          color_files(o.color_files),
          depth_files(o.depth_files),
          intrinsic(o.intrinsic),
          pose_graph(o.pose_graph) {}

    void operator()(const tbb::blocked_range<std::size_t>& range) {
        for (std::size_t i = range.begin(); i < range.end(); ++i) {
            // Absolute frame index within the whole RGBD sequence.
            const int i_abs =
                    fragment_id * pipeline.config_["n_frames_per_fragment"]
                                          .asInt() +
                    i;
            utility::LogInfo(
                    "Fragment {:03d} / {:03d} :: "
                    "Integrate rgbd frame {:d} ({:d} of {:d}).",
                    fragment_id, pipeline.n_fragments_ - 1, i_abs, i + 1,
                    pose_graph.nodes_.size());
            const geometry::RGBDImage rgbd = pipeline.ReadRGBDImage(
                    color_files[i_abs], depth_files[i_abs], false);
            auto pcd = geometry::PointCloud::CreateFromRGBDImage(
                    rgbd, intrinsic, Eigen::Matrix4d::Identity(), true);
            pcd->Transform(pose_graph.nodes_[i].pose_);
            // BUGFIX: accumulate the transformed frame into this body's
            // partial result. Previously `pcd` was discarded, so the
            // reduction always produced an empty fragment (the OpenMP
            // version it replaces did `fragment += *pcd;` in a critical
            // section).
            fragment += *pcd;
        }
    }

    void join(PointCloudIntegrator& rhs) { fragment += rhs.fragment; }
};

void IntegrateFragmentRGBD(
int fragment_id,
const std::vector<std::string>& color_files,
const std::vector<std::string>& depth_files,
const camera::PinholeCameraIntrinsic& intrinsic,
const pipelines::registration::PoseGraph& pose_graph) {
geometry::PointCloud fragment;
const size_t graph_num = pose_graph.nodes_.size();

#pragma omp parallel for schedule(static) \
num_threads(utility::EstimateMaxThreads())
for (int i = 0; i < int(graph_num); ++i) {
const int i_abs =
fragment_id * config_["n_frames_per_fragment"].asInt() + i;
utility::LogInfo(
"Fragment {:03d} / {:03d} :: Integrate rgbd frame {:d} "
"({:d} "
"of "
"{:d}).",
fragment_id, n_fragments_ - 1, i_abs, i + 1, graph_num);
const geometry::RGBDImage rgbd = ReadRGBDImage(
color_files[i_abs], depth_files[i_abs], false);
auto pcd = geometry::PointCloud::CreateFromRGBDImage(
rgbd, intrinsic, Eigen::Matrix4d::Identity(), true);
pcd->Transform(pose_graph.nodes_[i].pose_);
#pragma omp critical
{ fragment += *pcd; }
}

const geometry::PointCloud fragment_down = *fragment.VoxelDownSample(
config_["tsdf_cubic_size"].asDouble() / 512.0);
PointCloudIntegrator reducer(*this, fragment_id, color_files,
depth_files, intrinsic, pose_graph);
tbb::parallel_reduce(tbb::blocked_range<std::size_t>(0, graph_num),
reducer);

const geometry::PointCloud fragment_down =
*reducer.fragment.VoxelDownSample(
config_["tsdf_cubic_size"].asDouble() / 512.0);
io::WritePointCloud(
utility::filesystem::JoinPath(
config_["path_dataset"].asString(),
Expand Down
6 changes: 3 additions & 3 deletions cpp/open3d/core/MemoryManagerStatistic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ bool MemoryManagerStatistic::HasLeaks() const {
void MemoryManagerStatistic::CountMalloc(void* ptr,
size_t byte_size,
const Device& device) {
std::lock_guard<std::mutex> lock(statistics_mutex_);
tbb::spin_mutex::scoped_lock lock(statistics_mutex_);

// Filter nullptr. Empty allocations are not tracked.
if (ptr == nullptr && byte_size == 0) {
Expand All @@ -141,7 +141,7 @@ void MemoryManagerStatistic::CountMalloc(void* ptr,
}

void MemoryManagerStatistic::CountFree(void* ptr, const Device& device) {
std::lock_guard<std::mutex> lock(statistics_mutex_);
tbb::spin_mutex::scoped_lock lock(statistics_mutex_);

// Filter nullptr. Empty allocations are not tracked.
if (ptr == nullptr) {
Expand Down Expand Up @@ -170,7 +170,7 @@ void MemoryManagerStatistic::CountFree(void* ptr, const Device& device) {
}

void MemoryManagerStatistic::Reset() {
std::lock_guard<std::mutex> lock(statistics_mutex_);
tbb::spin_mutex::scoped_lock lock(statistics_mutex_);
statistics_.clear();
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/open3d/core/MemoryManagerStatistic.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cstddef>
#include <map>
#include <mutex>
#include <tbb/spin_mutex.h>
#include <unordered_map>

#include "open3d/core/Device.h"
Expand Down Expand Up @@ -86,7 +87,7 @@ class MemoryManagerStatistic {
/// Print at each malloc and free, disabled by default.
bool print_at_malloc_free_ = false;

std::mutex statistics_mutex_;
tbb::spin_mutex statistics_mutex_;
std::map<Device, MemoryStatistics> statistics_;
};

Expand Down
13 changes: 9 additions & 4 deletions cpp/open3d/core/ParallelFor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <cuda_runtime.h>

#include "open3d/core/CUDAUtils.h"
#else
#include <tbb/parallel_for.h>
#endif

namespace open3d {
Expand Down Expand Up @@ -79,10 +81,13 @@ void ParallelForCPU_(const Device& device, int64_t n, const func_t& func) {
return;
}

#pragma omp parallel for num_threads(utility::EstimateMaxThreads())
for (int64_t i = 0; i < n; ++i) {
func(i);
}
tbb::parallel_for(tbb::blocked_range<int64_t>(0, n, 32),
[&func](const tbb::blocked_range<int64_t>& range) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many threads will be used here? Currently, it's estimated with utility::EstimateMaxThreads() which gives us one thread per core (excluding hyperthreading).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, avoid using "magic numbers" (32). I think you have a GetDefaultChunkSize() function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will use up to the number of threads in task arena that called it. As for the chunk size, see my other comment.

for (int64_t i = range.begin(); i < range.end();
++i) {
func(i);
}
});
}

#endif
Expand Down
14 changes: 10 additions & 4 deletions cpp/open3d/core/hashmap/CPU/CPUHashBackendBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
// SPDX-License-Identifier: MIT
// ----------------------------------------------------------------------------

#include <tbb/parallel_for.h>

#include "open3d/core/hashmap/HashBackendBuffer.h"
#include "open3d/utility/Parallel.h"

Expand All @@ -14,10 +16,14 @@ void CPUResetHeap(Tensor& heap) {
uint32_t* heap_ptr = heap.GetDataPtr<uint32_t>();
int64_t capacity = heap.GetLength();

#pragma omp parallel for num_threads(utility::EstimateMaxThreads())
for (int64_t i = 0; i < capacity; ++i) {
heap_ptr[i] = i;
}
tbb::parallel_for(tbb::blocked_range<int64_t>(
0, capacity, utility::DefaultGrainSizeTBB()),
[&](const tbb::blocked_range<int64_t>& range) {
for (int64_t i = range.begin(); i < range.end();
++i) {
heap_ptr[i] = i;
}
});
};
} // namespace core
} // namespace open3d
Loading
Loading