物理の駅 Physics station by 現役研究者

テクノロジーは共有されてこそ栄える

ThreadPoolの引数を取得する方法と、処理順序を制御できるcondition_variableの使い方

CPUは複数あるので並列化できるが、出力は並列にする意味がない。また、CPUを使った処理は順不同で良いが、出力は順に行いたい。こういう目的のためには、ThreadPoolと、std::condition_variableを使う。

以下の例では、友人が作ってくれた ThreadPool を使っている。使いたい場合は彼に問い合わせて欲しい。CheckSum.hpp phst::base::checksum も独自である。単にハッシュ値 CRC32を計算して、一致または不一致を確認している。

重い処理として、OpenCV内の関数の、画像のエンコード cv::imencodeを使っている。

std::futureget() は実行した時点で std::move するので、複数回呼ぶことはできない。 const std::vector<uchar>& buf = p.get(); の部分

#include <random>

#include <thread>
#include <future>
#include <opencv2/opencv.hpp>
#include <ADAPT/CUF/ThreadPool.h>
#include <CheckSum.hpp>

using namespace std;

TEST(TestStd, TestMyThreadPool) {

    //4Mピクセルの画像を50枚作って乱数を詰める
    int width = 2048;
    int height = 2048;
    std::uniform_int_distribution<> irand = std::uniform_int_distribution<>(0, 255);
    std::mt19937_64 mt;

    std::vector<cv::Mat> vmat;
    for (int j = 0; j < 50; j++) {
        cv::Mat mat = cv::Mat::zeros(cv::Size(width, height), CV_8UC1);
        for (int i = 0; i < width * height; i++) {
            mat.data[i] = irand(mt);
        }
        vmat.push_back(mat);
    }

    //シリアル実行
    {
        std::string filepath = "test1.png";
        std::ofstream ofs;
        ofs.exceptions(std::ios::failbit | std::ios::badbit);
        ofs.open(filepath, std::ios::binary);

        for (int i = 0; i < vmat.size(); i++) {
            std::vector<uchar> buf;
            cv::imencode(".png", vmat[i], buf);
            ofs.write((char*)buf.data(), buf.size()); //順番を守りたい
        }
        ofs.close();
    }

    //ThreadPoolを使ってエンコード部分だけ並列化
    {
        std::string filepath = "test2.png";
        std::vector < std::future<std::vector<uchar>>> vfuture;

        adapt::ThreadPool pool(6);

        for (int i = 0; i < vmat.size(); i++) {

            std::future f = pool.AddTask([](const cv::Mat& mat) {
                std::vector<uchar> buf;
                cv::imencode(".png", mat, buf);
                return buf;
                }, std::reference_wrapper(vmat[i]));
            vfuture.push_back(std::move(f));
        }
        pool.Join();
        std::ofstream ofs;
        ofs.exceptions(std::ios::failbit | std::ios::badbit);
        ofs.open(filepath, std::ios::binary);

        for (auto& p : vfuture) {
            const std::vector<uchar>& buf = p.get();
            ofs.write((char*)buf.data(), buf.size()); //順番を守りたい
        }

        ofs.close();
    }

    //ThreadPoolとcondition_variableを使って全体を並列化
    {
        std::string filepath = "test3.png";

        adapt::ThreadPool pool(6);

        std::ofstream ofs;
        ofs.exceptions(std::ios::failbit | std::ios::badbit);
        ofs.open(filepath, std::ios::binary);

        std::mutex mtx;             //xの管理用mutex
        std::condition_variable cv; //処理順序制御用
        int x = 0;                    //処理すべき関数の番号

        for (int i = 0; i < vmat.size(); i++) {

            std::future f = pool.AddTask([](int n, 
                    const cv::Mat& mat, std::ofstream& ofs, 
                    std::mutex& mtx, std::condition_variable& cv, int& x) {
                std::vector<uchar> buf;
                cv::imencode(".png", mat, buf);

                std::unique_lock<std::mutex> ul(mtx); //mutexをロック
                while (x != n) { cv.wait(ul); }       //他スレッドからnotifyが来るまで待機
                ofs.write((char*)buf.data(), buf.size());  //順番を守りたい
                x++;                                //次の処理へ
                cv.notify_all();                        //状態の変化を他のスレッドへ通知
                },
                i, std::reference_wrapper(vmat[i]), std::reference_wrapper(ofs),
                    std::reference_wrapper(mtx), std::reference_wrapper(cv), std::reference_wrapper(x));
        }
        pool.Join();

        ofs.close();
    }

    //ThreadPoolの処理は順不同であることの確認
    {
        std::string filepath = "test4.png";

        adapt::ThreadPool pool(6);

        std::ofstream ofs;
        ofs.exceptions(std::ios::failbit | std::ios::badbit);
        ofs.open(filepath, std::ios::binary);

        for (int i = 0; i < vmat.size(); i++) {

            std::future f = pool.AddTask([](const cv::Mat& mat, std::ofstream& ofs) {
                std::vector<uchar> buf;
                cv::imencode(".png", mat, buf);
                ofs.write((char*)buf.data(), buf.size());  //順番を守りたい
                },
                std::reference_wrapper(vmat[i]), std::reference_wrapper(ofs));
        }
        pool.Join();

        ofs.close();
    }

    EXPECT_EQ(phst::base::checksum("test1.png"), phst::base::checksum("test2.png"));
    EXPECT_EQ(phst::base::checksum("test1.png"), phst::base::checksum("test3.png"));
    EXPECT_NE(phst::base::checksum("test1.png"), phst::base::checksum("test4.png"));

}

スレッドプール