Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-02-14 12:53:05

0001 #include "Utilities/Testing/interface/CppUnit_testdriver.icpp"
0002 #include "cppunit/extensions/HelperMacros.h"
0003 
0004 #include "DataFormats/Common/interface/DetSetNew.h"
0005 #include "DataFormats/Common/interface/DetSetVectorNew.h"
0006 #include "DataFormats/Common/interface/DetSetAlgorithm.h"
0007 #include "DataFormats/Common/interface/DetSet2RangeMap.h"
0008 
0009 #include "FWCore/Utilities/interface/EDMException.h"
0010 
0011 #include <vector>
0012 #include <algorithm>
0013 
0014 #include <mutex>
// Mutex / lock aliases available to the tests in this file.
using Mutex = std::mutex;
// using Lock = std::lock_guard<std::mutex>;
using Lock = std::unique_lock<std::mutex>;

namespace global {
  // Mutex meant to control access to std::cout.
  Mutex coutLock;
}  // namespace global
0023 
#include <atomic>
#include <exception>
#include <iostream>
#include <thread>
0027 
// Busy-wait (no yielding, no sleeping) until the atomic `lock` is observed
// to hold `val`. Each probe is an acquire load.
template <typename T>
inline void spinlock(std::atomic<T> const& lock, T val) {
  for (;;) {
    if (!(lock.load(std::memory_order_acquire) != val)) {
      return;
    }
  }
}
0033 
// Wait until the atomic `lock` is observed to hold `val`, giving up the CPU
// between probes instead of spinning flat out. Each probe is an acquire load.
//
// The previous body called nanosleep(0, 0): that passes a null request
// pointer (an invalid call that fails immediately) and relied on <time.h>
// being pulled in indirectly. std::this_thread::yield() from <thread>
// expresses the intended "let somebody else run" portably.
template <typename T>
inline void spinlockSleep(std::atomic<T> const& lock, T val) {
  while (lock.load(std::memory_order_acquire) != val) {
    std::this_thread::yield();
  }
}
0040 
0041 // syncronize all threads in a parallel section (for testing purposes)
0042 void sync(std::atomic<int>& all, int total) {
0043   ++all;
0044   spinlock(all, total);
0045 }
0046 
namespace {
  // Number of worker threads used by the tests: what the standard library
  // reports as hardware concurrency, or 1 when it reports 0 (unknown).
  unsigned int number_of_threads() {
    auto nThreads = std::thread::hardware_concurrency();
    return nThreads == 0 ? 1 : nThreads;
  }

  // Run iFunc(threadIndex, threadCount) once on each of number_of_threads()
  // std::threads and wait for all of them.
  //
  // Each thread works on its own copy of iFunc. An exception escaping iFunc
  // (for instance a failing CPPUNIT_ASSERT) is captured in that worker's
  // slot instead of leaving the thread entry point, which would call
  // std::terminate. After every thread has been joined, the exception of
  // the lowest-numbered failing worker is rethrown on the calling thread.
  template <typename F>
  void parallel_run(F iFunc) {
    auto const nThreads = number_of_threads();

    // One slot per worker: no locking needed, each thread writes only its own.
    std::vector<std::exception_ptr> failures(nThreads);

    std::vector<std::thread> threads;
    threads.reserve(nThreads);
    for (unsigned int i = 0; i < nThreads; ++i) {
      threads.emplace_back([i, nThreads, iFunc, &failures] {
        try {
          iFunc(i, nThreads);
        } catch (...) {
          failures[i] = std::current_exception();
        }
      });
    }

    for (auto& thread : threads) {
      thread.join();
    }

    for (auto const& failure : failures) {
      if (failure) {
        std::rethrow_exception(failure);
      }
    }
  }
}  // namespace
0067 
// Abstract, clonable base class of the test payload.
struct B {
  virtual ~B() = default;
  virtual B* clone() const = 0;
};

// Concrete payload: a single int. The constructor is deliberately implicit,
// so plain ints can be compared with, and converted to, T.
struct T : public B {
  int v;

  T(int iv = 0) : v{iv} {}

  // Returns a heap-allocated copy; the caller owns it.
  T* clone() const override { return new T(*this); }

  bool operator==(T t) const { return t.v == v; }
};

// Mixed comparison: equal only when `b` really is a T carrying the same value.
bool operator==(T const& t, B const& b) {
  if (auto const* other = dynamic_cast<T const*>(&b)) {
    return other->v == t.v;
  }
  return false;
}

// Same comparison with the operands swapped.
bool operator==(B const& b, T const& t) {
  return t == b;
}
0087 
0088 typedef edmNew::DetSetVector<T> DSTV;
0089 typedef edmNew::DetSet<T> DST;
0090 typedef edmNew::det_id_type det_id_type;
0091 typedef DSTV::FastFiller FF;
0092 typedef DSTV::TSFastFiller TSFF;
0093 
0094 class TestDetSet : public CppUnit::TestFixture {
0095   CPPUNIT_TEST_SUITE(TestDetSet);
0096   CPPUNIT_TEST(infrastructure);
0097   CPPUNIT_TEST(fillSeq);
0098   CPPUNIT_TEST(fillPar);
0099 
0100   CPPUNIT_TEST_SUITE_END();
0101 
0102 public:
0103   TestDetSet();
0104   ~TestDetSet() {}
0105   void setUp() {}
0106   void tearDown() {}
0107 
0108   void infrastructure();
0109   void fillSeq();
0110   void fillPar();
0111 
0112 public:
0113   int nth = 1;
0114   std::vector<DSTV::data_type> sv;
0115 };
0116 
0117 CPPUNIT_TEST_SUITE_REGISTRATION(TestDetSet);
0118 
0119 TestDetSet::TestDetSet() : sv(10) {
0120   DSTV::data_type v[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
0121   std::copy(v, v + 10, sv.begin());
0122   nth = number_of_threads();
0123 }
0124 
0125 void read(DSTV const& detsets, bool all = false) {
0126   int i = 0;
0127   for (auto di = detsets.begin(false); di != detsets.end(false); ++di) {
0128     auto ds = *di;
0129     auto id = ds.id();
0130     std::cout << id << ' ';
0131     // if (all) CPPUNIT_ASSERT(int(id)==20+i);
0132     if (ds.isValid()) {
0133       CPPUNIT_ASSERT(ds[0] == 100 * (id - 20) + 3);
0134       CPPUNIT_ASSERT(ds[1] == -(100 * (id - 20) + 3));
0135     }
0136     ++i;
0137   }
0138   std::cout << std::endl;
0139 }
0140 
0141 void TestDetSet::infrastructure() {
0142   std::cout << std::endl;
0143   for (int i = 0; i < 10; i++) {
0144     int a = 0;
0145     std::atomic<int> b(0);
0146     std::atomic<int> lock(0);
0147     std::atomic<int> nt(number_of_threads());
0148     parallel_run([&a, &b, &lock, &nt](unsigned int, unsigned int) {
0149       sync(lock, nt);
0150       a++;
0151       b.fetch_add(1, std::memory_order_acq_rel);
0152     });
0153 
0154     if (i == 5)
0155       std::cout << "threads " << lock << " " << a << ' ' << b << std::endl;
0156     CPPUNIT_ASSERT(b == nt);
0157     a = 0;
0158     b = 0;
0159 
0160     parallel_run([&a, &b](unsigned int, unsigned int) {
0161       a++;
0162       b.fetch_add(1, std::memory_order_acq_rel);
0163     });
0164     if (i == 5)
0165       std::cout << "threads " << lock << " " << a << ' ' << b << std::endl;
0166 
0167     nth = nt;
0168   }
0169 }
0170 
0171 void TestDetSet::fillSeq() {
0172   std::cout << std::endl;
0173 
0174   DSTV detsets(2);
0175   // unsigned int ntot=0;
0176 
0177   std::atomic<int> lock(0);
0178   std::atomic<int> idet(0);
0179   std::atomic<int> trial(0);
0180   int maxDet = 100 * nth;
0181   parallel_run([&lock, &idet, &trial, &detsets, maxDet](unsigned int, unsigned int numberOfThreads) {
0182     sync(lock, numberOfThreads);
0183     while (true) {
0184       int ldet = idet;
0185       if (!(ldet < maxDet))
0186         break;
0187       while (!idet.compare_exchange_weak(ldet, ldet + 1))
0188         ;
0189       if (ldet >= maxDet)
0190         break;
0191       unsigned int id = 20 + ldet;
0192       bool done = false;
0193       while (!done) {
0194         try {
0195           {
0196             FF ff(detsets, id);  // serialize
0197             ff.push_back(100 * ldet + 3);
0198             CPPUNIT_ASSERT(detsets.m_data.back().v == (100 * ldet + 3));
0199             ff.push_back(-(100 * ldet + 3));
0200             CPPUNIT_ASSERT(detsets.m_data.back().v == -(100 * ldet + 3));
0201           }
0202           // read(detsets);  // cannot read in parallel while filling in this case
0203           done = true;
0204         } catch (edm::Exception const&) {
0205           trial++;
0206           //read(detsets);
0207         }
0208       }
0209     }
0210     // read(detsets);
0211   });
0212 
0213   std::cout << idet << ' ' << detsets.size() << std::endl;
0214   read(detsets, true);
0215   CPPUNIT_ASSERT(int(detsets.size()) == maxDet);
0216   std::cout << "trials " << trial << std::endl;
0217 }
0218 
0219 struct Getter final : public DSTV::Getter {
0220   Getter(TestDetSet* itest) : ntot(0), test(*itest) {}
0221 
0222   void fill(TSFF& ff) override {
0223     int n = ff.id() - 20;
0224     CPPUNIT_ASSERT(n >= 0);
0225     CPPUNIT_ASSERT(ff.size() == 0);
0226     ff.push_back((100 * n + 3));
0227     CPPUNIT_ASSERT(ff.size() == 1);
0228     CPPUNIT_ASSERT(ff[0] == 100 * n + 3);
0229     ff.push_back(-(100 * n + 3));
0230     CPPUNIT_ASSERT(ff.size() == 2);
0231     CPPUNIT_ASSERT(ff[1] == -(100 * n + 3));
0232     ntot.fetch_add(1, std::memory_order_acq_rel);
0233   }
0234 
0235   std::atomic<unsigned int> ntot;
0236   TestDetSet& test;
0237 };
0238 
0239 void TestDetSet::fillPar() {
0240   std::cout << std::endl;
0241   auto pg = std::make_shared<Getter>(this);
0242   Getter& g = *pg;
0243   int maxDet = 100 * nth;
0244   std::vector<unsigned int> v(maxDet);
0245   int k = 20;
0246   for (auto& i : v)
0247     i = k++;
0248   DSTV detsets(pg, v, 2);
0249   detsets.reserve(maxDet, 100 * maxDet);
0250   CPPUNIT_ASSERT(g.ntot == 0);
0251   CPPUNIT_ASSERT(detsets.onDemand());
0252   CPPUNIT_ASSERT(maxDet == int(detsets.size()));
0253 
0254   std::atomic<int> lock(0);
0255   std::atomic<int> idet(0);
0256 
0257   std::atomic<int> count(0);
0258 
0259   DST df31 = detsets[31];
0260 
0261   std::cout << "start parallel section" << std::endl;
0262   parallel_run([&lock, &detsets, &idet, maxDet, &g](unsigned int threadNumber, unsigned int numberOfThreads) {
0263     sync(lock, numberOfThreads);
0264     if (threadNumber % 2 == 0) {
0265       DST df = detsets[25];  // everybody!
0266       CPPUNIT_ASSERT(df.id() == 25);
0267       CPPUNIT_ASSERT(df.size() == 2);
0268       CPPUNIT_ASSERT(df[0] == 100 * (25 - 20) + 3);
0269       CPPUNIT_ASSERT(df[1] == -(100 * (25 - 20) + 3));
0270     }
0271     while (true) {
0272       if (threadNumber == 0)
0273         read(detsets);
0274       int ldet = idet.load(std::memory_order_acquire);
0275       if (!(ldet < maxDet))
0276         break;
0277       while (!idet.compare_exchange_weak(ldet, ldet + 1, std::memory_order_acq_rel))
0278         ;
0279       if (ldet >= maxDet)
0280         break;
0281       unsigned int id = 20 + ldet;
0282       {
0283         DST df = *detsets.find(id, true);
0284         CPPUNIT_ASSERT(int(g.ntot) > 0);
0285         assert(df.id() == id);
0286         assert(df.size() == 2);
0287         assert(df[0] == 100 * (id - 20) + 3);
0288         assert(df[1] == -(100 * (id - 20) + 3));
0289       }
0290       if (threadNumber == 1)
0291         read(detsets);
0292     }
0293   });
0294   std::cout << "end parallel section" << std::endl;
0295 
0296   CPPUNIT_ASSERT(df31.id() == 31);
0297   CPPUNIT_ASSERT(df31.size() == 2);
0298   CPPUNIT_ASSERT(df31[0] == 100 * (31 - 20) + 3);
0299   CPPUNIT_ASSERT(df31[1] == -(100 * (31 - 20) + 3));
0300 
0301   std::cout << "summary " << idet << ' ' << detsets.size() << ' ' << g.ntot << ' ' << count << std::endl;
0302   read(detsets, true);
0303   CPPUNIT_ASSERT(int(g.ntot) == maxDet);
0304   CPPUNIT_ASSERT(int(detsets.size()) == maxDet);
0305 }