Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-06-07 02:17:31

0001 #include "Utilities/Testing/interface/CppUnit_testdriver.icpp"
0002 #include "cppunit/extensions/HelperMacros.h"
0003 
0004 #include "DataFormats/Common/interface/DetSetNew.h"
0005 #include "DataFormats/Common/interface/DetSetVectorNew.h"
0006 #include "DataFormats/Common/interface/DetSetAlgorithm.h"
0007 #include "DataFormats/Common/interface/DetSet2RangeMap.h"
0008 
0009 #include "FWCore/Utilities/interface/EDMException.h"
0010 
0011 #include <vector>
0012 #include <algorithm>
0013 
0014 #include <mutex>
// Short aliases for the standard synchronization primitives used by this test.
using Mutex = std::mutex;
// Alternative guard type, kept for reference:
// using Lock = std::lock_guard<std::mutex>;
using Lock = std::unique_lock<std::mutex>;

namespace global {
  // Mutex meant to control access to std::cout.
  Mutex coutLock;
}  // namespace global
0023 
0024 #include <iostream>
0025 #include <atomic>
0026 #include <thread>
0027 
/// Busy-wait until the atomic `lock` holds `val`.
///
/// The atomic is re-read with acquire ordering on every pass; nothing else
/// happens between reads, so the calling thread keeps spinning until the
/// comparison succeeds.
template <typename T>
inline void spinlock(std::atomic<T> const& lock, T val) {
  for (;;) {
    if (lock.load(std::memory_order_acquire) != val) {
      continue;  // not there yet: poll again
    }
    return;
  }
}
0033 
/// Wait until the atomic `lock` holds `val`, giving up the CPU between polls.
///
/// The atomic is read with acquire ordering and compared with `val`; after
/// every failed comparison the calling thread yields so that other runnable
/// threads can make progress.
///
/// std::this_thread::yield() is used for the back-off instead of
/// `nanosleep(0, 0)`: a null request pointer is not a valid argument for
/// nanosleep, and that call also depended on <time.h> being pulled in
/// indirectly.
///
/// @param lock  atomic value to poll
/// @param val   value to wait for
template <typename T>
inline void spinlockSleep(std::atomic<T> const& lock, T val) {
  while (lock.load(std::memory_order_acquire) != val) {
    std::this_thread::yield();
  }
}
0040 
0041 // syncronize all threads in a parallel section (for testing purposes)
0042 void sync(std::atomic<int>& all, int total) {
0043   ++all;
0044   spinlock(all, total);
0045 }
0046 
namespace {
  /// Number of threads the tests run with: the value reported by
  /// std::thread::hardware_concurrency(), or 1 when that value is 0.
  unsigned int number_of_threads() {
    unsigned int const reported = std::thread::hardware_concurrency();
    if (reported == 0) {
      return 1;
    }
    return reported;
  }

  /// Start number_of_threads() threads, call `iFunc(threadIndex, threadCount)`
  /// in each of them, and return once every thread has been joined.
  /// Each thread invokes its own copy of `iFunc`.
  template <typename F>
  void parallel_run(F iFunc) {
    unsigned int const count = number_of_threads();

    std::vector<std::thread> workers;
    for (unsigned int index = 0; index != count; ++index) {
      workers.emplace_back([index, count, iFunc] { iFunc(index, count); });
    }

    std::for_each(workers.begin(), workers.end(), [](std::thread& worker) { worker.join(); });
  }
}  // namespace
0067 
/// Abstract polymorphic base used by the test payload type.
struct B {
  virtual ~B() = default;

  /// Implemented by derived types; returns a pointer to a B (or derived) object.
  virtual B* clone() const = 0;
};
0072 
0073 struct T : public B {
0074   T(int iv = 0) : v(iv) {}
0075   int v;
0076   bool operator==(T t) const { return v == t.v; }
0077 
0078   virtual T* clone() const { return new T(*this); }
0079 };
0080 
0081 bool operator==(T const& t, B const& b) {
0082   T const* p = dynamic_cast<T const*>(&b);
0083   return p && p->v == t.v;
0084 }
0085 
0086 bool operator==(B const& b, T const& t) { return t == b; }
0087 
0088 typedef edmNew::DetSetVector<T> DSTV;
0089 typedef edmNew::DetSet<T> DST;
0090 typedef edmNew::det_id_type det_id_type;
0091 typedef DSTV::FastFiller FF;
0092 typedef DSTV::TSFastFiller TSFF;
0093 
0094 class TestDetSet : public CppUnit::TestFixture {
0095   CPPUNIT_TEST_SUITE(TestDetSet);
0096   CPPUNIT_TEST(infrastructure);
0097   CPPUNIT_TEST(fillSeq);
0098   CPPUNIT_TEST(fillPar);
0099 
0100   CPPUNIT_TEST_SUITE_END();
0101 
0102 public:
0103   TestDetSet();
0104   ~TestDetSet() {}
0105   void setUp() {}
0106   void tearDown() {}
0107 
0108   void infrastructure();
0109   void fillSeq();
0110   void fillPar();
0111 
0112 public:
0113   int nth = 1;
0114   std::vector<DSTV::data_type> sv;
0115 };
0116 
0117 CPPUNIT_TEST_SUITE_REGISTRATION(TestDetSet);
0118 
0119 TestDetSet::TestDetSet() : sv(10) {
0120   DSTV::data_type v[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
0121   std::copy(v, v + 10, sv.begin());
0122   nth = number_of_threads();
0123 }
0124 
0125 void read(DSTV const& detsets, bool all = false) {
0126   for (auto di = detsets.begin(false); di != detsets.end(false); ++di) {
0127     auto ds = *di;
0128     auto id = ds.id();
0129     std::cout << id << ' ';
0130     if (ds.isValid()) {
0131       CPPUNIT_ASSERT(ds[0] == 100 * (id - 20) + 3);
0132       CPPUNIT_ASSERT(ds[1] == -(100 * (id - 20) + 3));
0133     }
0134   }
0135   std::cout << std::endl;
0136 }
0137 
0138 void TestDetSet::infrastructure() {
0139   std::cout << std::endl;
0140   for (int i = 0; i < 10; i++) {
0141     int a = 0;
0142     std::atomic<int> b(0);
0143     std::atomic<int> lock(0);
0144     std::atomic<int> nt(number_of_threads());
0145     parallel_run([&a, &b, &lock, &nt](unsigned int, unsigned int) {
0146       sync(lock, nt);
0147       a++;
0148       b.fetch_add(1, std::memory_order_acq_rel);
0149     });
0150 
0151     if (i == 5)
0152       std::cout << "threads " << lock << " " << a << ' ' << b << std::endl;
0153     CPPUNIT_ASSERT(b == nt);
0154     a = 0;
0155     b = 0;
0156 
0157     parallel_run([&a, &b](unsigned int, unsigned int) {
0158       a++;
0159       b.fetch_add(1, std::memory_order_acq_rel);
0160     });
0161     if (i == 5)
0162       std::cout << "threads " << lock << " " << a << ' ' << b << std::endl;
0163 
0164     nth = nt;
0165   }
0166 }
0167 
0168 void TestDetSet::fillSeq() {
0169   std::cout << std::endl;
0170 
0171   DSTV detsets(2);
0172   // unsigned int ntot=0;
0173 
0174   std::atomic<int> lock(0);
0175   std::atomic<int> idet(0);
0176   std::atomic<int> trial(0);
0177   int maxDet = 100 * nth;
0178   parallel_run([&lock, &idet, &trial, &detsets, maxDet](unsigned int, unsigned int numberOfThreads) {
0179     sync(lock, numberOfThreads);
0180     while (true) {
0181       int ldet = idet;
0182       if (!(ldet < maxDet))
0183         break;
0184       while (!idet.compare_exchange_weak(ldet, ldet + 1))
0185         ;
0186       if (ldet >= maxDet)
0187         break;
0188       unsigned int id = 20 + ldet;
0189       bool done = false;
0190       while (!done) {
0191         try {
0192           {
0193             FF ff(detsets, id);  // serialize
0194             ff.push_back(100 * ldet + 3);
0195             CPPUNIT_ASSERT(detsets.m_data.back().v == (100 * ldet + 3));
0196             ff.push_back(-(100 * ldet + 3));
0197             CPPUNIT_ASSERT(detsets.m_data.back().v == -(100 * ldet + 3));
0198           }
0199           // read(detsets);  // cannot read in parallel while filling in this case
0200           done = true;
0201         } catch (edm::Exception const&) {
0202           trial++;
0203           //read(detsets);
0204         }
0205       }
0206     }
0207     // read(detsets);
0208   });
0209 
0210   std::cout << idet << ' ' << detsets.size() << std::endl;
0211   read(detsets, true);
0212   CPPUNIT_ASSERT(int(detsets.size()) == maxDet);
0213   std::cout << "trials " << trial << std::endl;
0214 }
0215 
0216 struct Getter final : public DSTV::Getter {
0217   Getter(TestDetSet* itest) : ntot(0), test(*itest) {}
0218 
0219   void fill(TSFF& ff) const override {
0220     int n = ff.id() - 20;
0221     CPPUNIT_ASSERT(n >= 0);
0222     CPPUNIT_ASSERT(ff.size() == 0);
0223     ff.push_back((100 * n + 3));
0224     CPPUNIT_ASSERT(ff.size() == 1);
0225     CPPUNIT_ASSERT(ff[0] == 100 * n + 3);
0226     ff.push_back(-(100 * n + 3));
0227     CPPUNIT_ASSERT(ff.size() == 2);
0228     CPPUNIT_ASSERT(ff[1] == -(100 * n + 3));
0229     ntot.fetch_add(1, std::memory_order_acq_rel);
0230   }
0231 
0232   mutable std::atomic<unsigned int> ntot;
0233   TestDetSet& test;
0234 };
0235 
0236 void TestDetSet::fillPar() {
0237   std::cout << std::endl;
0238   auto pg = std::make_shared<Getter>(this);
0239   Getter& g = *pg;
0240   int maxDet = 100 * nth;
0241   std::vector<unsigned int> v(maxDet);
0242   int k = 20;
0243   for (auto& i : v)
0244     i = k++;
0245   DSTV detsets(pg, v, 2);
0246   detsets.reserve(maxDet, 100 * maxDet);
0247   CPPUNIT_ASSERT(g.ntot == 0);
0248   CPPUNIT_ASSERT(detsets.onDemand());
0249   CPPUNIT_ASSERT(maxDet == int(detsets.size()));
0250 
0251   std::atomic<int> lock(0);
0252   std::atomic<int> idet(0);
0253 
0254   std::atomic<int> count(0);
0255 
0256   DST df31 = detsets[31];
0257 
0258   std::cout << "start parallel section" << std::endl;
0259   parallel_run([&lock, &detsets, &idet, maxDet, &g](unsigned int threadNumber, unsigned int numberOfThreads) {
0260     sync(lock, numberOfThreads);
0261     if (threadNumber % 2 == 0) {
0262       DST df = detsets[25];  // everybody!
0263       CPPUNIT_ASSERT(df.id() == 25);
0264       CPPUNIT_ASSERT(df.size() == 2);
0265       CPPUNIT_ASSERT(df[0] == 100 * (25 - 20) + 3);
0266       CPPUNIT_ASSERT(df[1] == -(100 * (25 - 20) + 3));
0267     }
0268     while (true) {
0269       if (threadNumber == 0)
0270         read(detsets);
0271       int ldet = idet.load(std::memory_order_acquire);
0272       if (!(ldet < maxDet))
0273         break;
0274       while (!idet.compare_exchange_weak(ldet, ldet + 1, std::memory_order_acq_rel))
0275         ;
0276       if (ldet >= maxDet)
0277         break;
0278       unsigned int id = 20 + ldet;
0279       {
0280         DST df = *detsets.find(id, true);
0281         CPPUNIT_ASSERT(int(g.ntot) > 0);
0282         assert(df.id() == id);
0283         assert(df.size() == 2);
0284         assert(df[0] == 100 * (id - 20) + 3);
0285         assert(df[1] == -(100 * (id - 20) + 3));
0286       }
0287       if (threadNumber == 1)
0288         read(detsets);
0289     }
0290   });
0291   std::cout << "end parallel section" << std::endl;
0292 
0293   CPPUNIT_ASSERT(df31.id() == 31);
0294   CPPUNIT_ASSERT(df31.size() == 2);
0295   CPPUNIT_ASSERT(df31[0] == 100 * (31 - 20) + 3);
0296   CPPUNIT_ASSERT(df31[1] == -(100 * (31 - 20) + 3));
0297 
0298   std::cout << "summary " << idet << ' ' << detsets.size() << ' ' << g.ntot << ' ' << count << std::endl;
0299   read(detsets, true);
0300   CPPUNIT_ASSERT(int(g.ntot) == maxDet);
0301   CPPUNIT_ASSERT(int(detsets.size()) == maxDet);
0302 }