CPUは複数あるので並列化できるが、出力は並列にする意味がない。また、CPUを使った処理は順不同で良いが、出力は順に行いたい。こういう目的のためには、ThreadPoolと、std::condition_variable
を使う。
以下の例では、友人が作ってくれた ThreadPool
を使っている。使いたい場合は彼に問い合わせて欲しい。CheckSum.hpp
phst::base::checksum
も独自である。単にハッシュ値 CRC32を計算して、一致または不一致を確認している。
重い処理として、OpenCV内の関数の、画像のエンコード cv::imencode
を使っている。
std::future
の get()
は実行した時点で std::move
するので、複数回呼ぶことはできない。 const std::vector<uchar>& buf = p.get();
の部分
#include <random> #include <thread> #include <future> #include <opencv2/opencv.hpp> #include <ADAPT/CUF/ThreadPool.h> #include <CheckSum.hpp> using namespace std; TEST(TestStd, TestMyThreadPool) { //4Mピクセルの画像を50枚作って乱数を詰める int width = 2048; int height = 2048; std::uniform_int_distribution<> irand = std::uniform_int_distribution<>(0, 255); std::mt19937_64 mt; std::vector<cv::Mat> vmat; for (int j = 0; j < 50; j++) { cv::Mat mat = cv::Mat::zeros(cv::Size(width, height), CV_8UC1); for (int i = 0; i < width * height; i++) { mat.data[i] = irand(mt); } vmat.push_back(mat); } //シリアル実行 { std::string filepath = "test1.png"; std::ofstream ofs; ofs.exceptions(std::ios::failbit | std::ios::badbit); ofs.open(filepath, std::ios::binary); for (int i = 0; i < vmat.size(); i++) { std::vector<uchar> buf; cv::imencode(".png", vmat[i], buf); ofs.write((char*)buf.data(), buf.size()); //順番を守りたい } ofs.close(); } //ThreadPoolを使ってエンコード部分だけ並列化 { std::string filepath = "test2.png"; std::vector < std::future<std::vector<uchar>>> vfuture; adapt::ThreadPool pool(6); for (int i = 0; i < vmat.size(); i++) { std::future f = pool.AddTask([](const cv::Mat& mat) { std::vector<uchar> buf; cv::imencode(".png", mat, buf); return buf; }, std::reference_wrapper(vmat[i])); vfuture.push_back(std::move(f)); } pool.Join(); std::ofstream ofs; ofs.exceptions(std::ios::failbit | std::ios::badbit); ofs.open(filepath, std::ios::binary); for (auto& p : vfuture) { const std::vector<uchar>& buf = p.get(); ofs.write((char*)buf.data(), buf.size()); //順番を守りたい } ofs.close(); } //ThreadPoolとcondition_variableを使って全体を並列化 { std::string filepath = "test3.png"; adapt::ThreadPool pool(6); std::ofstream ofs; ofs.exceptions(std::ios::failbit | std::ios::badbit); ofs.open(filepath, std::ios::binary); std::mutex mtx; //xの管理用mutex std::condition_variable cv; //処理順序制御用 int x = 0; //処理すべき関数の番号 for (int i = 0; i < vmat.size(); i++) { std::future f = pool.AddTask([](int n, const cv::Mat& mat, std::ofstream& ofs, std::mutex& mtx, std::condition_variable& cv, int& x) { std::vector<uchar> buf; cv::imencode(".png", mat, buf); std::unique_lock<std::mutex> ul(mtx); //mutexをロック while (x != n) { cv.wait(ul); } //他スレッドからnotifyが来るまで待機 ofs.write((char*)buf.data(), buf.size()); //順番を守りたい x++; //次の処理へ cv.notify_all(); //状態の変化を他のスレッドへ通知 }, i, std::reference_wrapper(vmat[i]), std::reference_wrapper(ofs), std::reference_wrapper(mtx), std::reference_wrapper(cv), std::reference_wrapper(x)); } pool.Join(); ofs.close(); } //ThreadPoolの処理は順不同であることの確認 { std::string filepath = "test4.png"; adapt::ThreadPool pool(6); std::ofstream ofs; ofs.exceptions(std::ios::failbit | std::ios::badbit); ofs.open(filepath, std::ios::binary); for (int i = 0; i < vmat.size(); i++) { std::future f = pool.AddTask([](const cv::Mat& mat, std::ofstream& ofs) { std::vector<uchar> buf; cv::imencode(".png", mat, buf); ofs.write((char*)buf.data(), buf.size()); //順番を守りたい }, std::reference_wrapper(vmat[i]), std::reference_wrapper(ofs)); } pool.Join(); ofs.close(); } EXPECT_EQ(phst::base::checksum("test1.png"), phst::base::checksum("test2.png")); EXPECT_EQ(phst::base::checksum("test1.png"), phst::base::checksum("test3.png")); EXPECT_NE(phst::base::checksum("test1.png"), phst::base::checksum("test4.png")); }
スレッドプール