// NOTE: the lines that preceded this file ("Macros", "Line Code", and a run of
// line numbers 1-118) were listing residue rather than source text; they are
// recorded here as a comment so that the file parses as C++.
//
//  SerialTaskQueue_test.cpp
//  DispatchProcessingDemo
//
//  Created by Chris Jones on 9/27/11.
//

#include <iostream>
#define CATCH_CONFIG_MAIN

#include <catch.hpp>
#include <chrono>
#include <memory>
#include <atomic>
#include <thread>
#include "oneapi/tbb/task_arena.h"
#include "FWCore/Concurrency/interface/WaitingTask.h"
#include "FWCore/Concurrency/interface/FinalWaitingTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Concurrency/interface/SerialTaskQueue.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"

using namespace std::chrono_literals;

// Exercises edm::SerialTaskQueue in three sections:
//   - "push":        three pushed tasks observe the counter values 0, 1, 2 in push order.
//   - "pause":       a task pushed while the queue is paused has not run after a sleep,
//                    and runs after resume(); a task may also pause the queue itself.
//   - "stress test": two task_group tasks push into the same queue at the same time and
//                    the total number of executed tasks is checked.
// Every section uses the same pattern: a FinalWaitingTask (`lastTask`) is created, a
// WaitingTaskHolder referring to it is copied into each pushed lambda, and the test
// thread then calls lastTask.wait() before checking the final count.
// NOTE(review): presumably lastTask.wait() returns only once every WaitingTaskHolder
// copy has been destroyed — confirm against WaitingTaskHolder/FinalWaitingTask.
// NOTE(review): the REQUIREs inside the pushed lambdas execute wherever the queue runs
// the task; confirm that Catch assertions are safe off the test thread in this setup.
TEST_CASE("SerialTaskQueue", "[SerialTaskQueue]") {
  SECTION("push") {
    std::atomic<unsigned int> count{0};
    edm::SerialTaskQueue queue;
    {
      oneapi::tbb::task_group group;
      edm::FinalWaitingTask lastTask(group);
      {
        // This holder lives only inside this inner scope; each lambda below keeps its
        // own copy (captured by value).
        edm::WaitingTaskHolder waitingTask(group, &lastTask);
        // Each task checks the value the counter had when it ran (post-increment), so
        // the three REQUIREs together require the tasks to run in push order.
        queue.push(group, [&count, waitingTask] {
          REQUIRE(count++ == 0u);
          std::this_thread::sleep_for(10us);
        });
        queue.push(group, [&count, waitingTask] {
          REQUIRE(count++ == 1u);
          std::this_thread::sleep_for(10us);
        });
        queue.push(group, [&count, waitingTask] {
          REQUIRE(count++ == 2u);
          std::this_thread::sleep_for(10us);
        });
      }
      lastTask.wait();
      // All three tasks must have incremented the counter by now.
      REQUIRE(count == 3u);
    }
  }

  SECTION("pause") {
    std::atomic<unsigned int> count{0};
    edm::SerialTaskQueue queue;
    {
      // Pause before anything is pushed.
      queue.pause();
      {
        oneapi::tbb::task_group group;
        edm::FinalWaitingTask lastTask(group);

        // The holder is created directly in the lambda's init-capture, so the lambda
        // owns the only copy made here.
        queue.push(group, [&count, waitingTask = edm::WaitingTaskHolder(group, &lastTask)] { REQUIRE(count++ == 0u); });
        // Give the task time to run if pausing did not hold it back; the counter is
        // expected to be unchanged afterwards.
        std::this_thread::sleep_for(1000us);
        REQUIRE(count == 0u);
        queue.resume();
        lastTask.wait();
        // After resume() and wait() the single task has run.
        REQUIRE(count == 1u);
      }
      {
        // Second part: the same queue and counter are reused (count is 1 on entry).
        oneapi::tbb::task_group group;
        edm::FinalWaitingTask lastTask(group);
        {
          edm::WaitingTaskHolder waitingTask(group, &lastTask);
          // The first task pauses the queue from inside the task before incrementing.
          queue.push(group, [&count, &queue, waitingTask] {
            queue.pause();
            REQUIRE(count++ == 1u);
          });
          queue.push(group, [&count, waitingTask] { REQUIRE(count++ == 2u); });
          queue.push(group, [&count, waitingTask] { REQUIRE(count++ == 3u); });
        }
        std::this_thread::sleep_for(100us);
        // Before resume() the check tolerates a count of at most 2.
        REQUIRE(2u >= count);
        // This resume() pairs with the pause() issued by the first task above.
        queue.resume();
        lastTask.wait();
        // 1 from the first part plus the three tasks of this part.
        REQUIRE(count == 4u);
      }
    }
  }

  SECTION("stress test") {
    // The group and the queue are shared by all iterations of the loop below.
    oneapi::tbb::task_group group;
    edm::SerialTaskQueue queue;
    unsigned int index = 100;
    const unsigned int nTasks = 1000;
    // Pre-decrement before the comparison: starting from 100 the body runs 99 times.
    while (0 != --index) {
      // Fresh final task, counter and start flag for every iteration.
      edm::FinalWaitingTask lastTask(group);
      std::atomic<unsigned int> count{0};
      std::atomic<bool> waitToStart{true};
      {
        edm::WaitingTaskHolder waitingTask(group, &lastTask);
        // First producer: spins until the second producer clears waitToStart, then
        // pushes nTasks increments into the queue.
        // NOTE(review): the spin loop neither yields nor sleeps, and only the second
        // producer clears the flag, so the two producers apparently need to run on
        // different threads — confirm the test runs with at least two TBB threads.
        group.run([&queue, &waitToStart, waitingTask, &count, &group] {
          while (waitToStart.load()) {
          }
          for (unsigned int i = 0; i < nTasks; ++i) {
            queue.push(group, [&count, waitingTask] { ++count; });
          }
        });
        // Second producer: releases the first one and pushes its own nTasks increments.
        group.run([&queue, &waitToStart, waitingTask, &count, &group] {
          waitToStart = false;
          for (unsigned int i = 0; i < nTasks; ++i) {
            queue.push(group, [&count, waitingTask] { ++count; });
          }
        });
      }
      lastTask.wait();
      // Both producers pushed nTasks tasks and each task incremented the counter once.
      REQUIRE(2 * nTasks == count);
    }
  }
}