Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:19:12

0001 
0002 #include <cassert>
0003 #include <cstring>
0004 
0005 #include "ReadRepacker.h"
0006 
0007 /**
0008    Given a list of offsets and positions, pack them into a vector of IOPosBuffer (an "IO Vector").
0009    This function will coalesce reads that are within READ_COALESCE_SIZE into a IOPosBuffer.
0010    This function will not create an IO vector whose summed buffer size is larger than TEMPORARY_BUFFER_SIZE. 
0011    The IOPosBuffer in iov all point to a location inside buf.
0012     
0013    @param pos: An array of file offsets, nbuf long.
0014    @param len: An array of offset length, nbuf long.
0015    @param nbuf: Number of buffers to pack.
0016    @param buf: Location of temporary buffer for the results of the storage request.
0017    @param buffer_size: Size of the temporary buffer.
0018 
0019    Returns the number of entries of the original array packed into iov.
0020  */
0021 int ReadRepacker::pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size) {
0022   reset(nbuf);
0023   m_len = len;  // Record the len array so we can later unpack.
0024 
0025   // Determine the buffer to use for the initial packing.
0026   char *tmp_buf;
0027   IOSize tmp_size;
0028   if (buffer_size < TEMPORARY_BUFFER_SIZE) {
0029     m_spare_buffer.resize(TEMPORARY_BUFFER_SIZE);
0030     tmp_buf = m_spare_buffer.data();
0031     tmp_size = TEMPORARY_BUFFER_SIZE;
0032   } else {
0033     tmp_buf = buf;
0034     tmp_size = buffer_size;
0035   }
0036 
0037   int pack_count = packInternal(pos, len, nbuf, tmp_buf, tmp_size);
0038 
0039   if ((nbuf - pack_count > 0) &&             // If there is remaining work..
0040       (tmp_buf != m_spare_buffer.data()) &&  // and the spare buffer isn't already used
0041       ((IOSize)len[pack_count] <
0042        TEMPORARY_BUFFER_SIZE)) {  // And the spare buffer is big enough to hold at least one read.
0043 
0044     // Verify the spare is allocated.
0045     // If tmp_buf != &m_spare_buffer[0] before, it certainly won't after.
0046     m_spare_buffer.resize(TEMPORARY_BUFFER_SIZE);
0047 
0048     // If there are remaining chunks and we aren't already using the spare
0049     // buffer, try using that too.
0050     // This clutters up the code badly, but could save a network round-trip.
0051     pack_count += packInternal(
0052         &pos[pack_count], &len[pack_count], nbuf - pack_count, m_spare_buffer.data(), TEMPORARY_BUFFER_SIZE);
0053   }
0054 
0055   return pack_count;
0056 }
0057 
0058 int ReadRepacker::packInternal(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size) {
0059   if (nbuf == 0) {
0060     return 0;
0061   }
0062 
0063   // Handle case 1 separately to make the for-loop cleaner.
0064   int iopb_offset = m_iov.size();
0065   // Because we re-use the buffer from ROOT, we are guarantee this iopb will
0066   // fit.
0067   assert(static_cast<IOSize>(len[0]) <= buffer_size);
0068   IOPosBuffer iopb(pos[0], buf, len[0]);
0069   m_idx_to_iopb.push_back(iopb_offset);
0070   m_idx_to_iopb_offset.push_back(0);
0071 
0072   IOSize buffer_used = len[0];
0073   int idx;
0074   for (idx = 1; idx < nbuf; idx++) {
0075     if (buffer_used + len[idx] > buffer_size) {
0076       // No way we can include this chunk in the read buffer
0077       break;
0078     }
0079 
0080     edm::storage::IOOffset extra_bytes_signed = (idx == 0) ? 0 : ((pos[idx] - iopb.offset()) - iopb.size());
0081     assert(extra_bytes_signed >= 0);
0082     IOSize extra_bytes = static_cast<IOSize>(extra_bytes_signed);
0083 
0084     if (((static_cast<IOSize>(len[idx]) < BIG_READ_SIZE) || (iopb.size() < BIG_READ_SIZE)) &&
0085         (extra_bytes < READ_COALESCE_SIZE) && (buffer_used + len[idx] + extra_bytes <= buffer_size)) {
0086       // The space between the two reads is small enough we can coalesce.
0087 
0088       // We enforce that the current read or the current iopb must be small.
0089       // This is so we can "perfectly pack" buffers consisting of only big
0090       // reads - in such a case, read coalescing doesn't help much.
0091       m_idx_to_iopb.push_back(iopb_offset);
0092       m_idx_to_iopb_offset.push_back(pos[idx] - iopb.offset());
0093       iopb.set_size(pos[idx] + len[idx] - iopb.offset());
0094       buffer_used += (len[idx] + extra_bytes);
0095       m_extra_bytes += extra_bytes;
0096       continue;
0097     }
0098     // There is a big jump, but still space left in the temporary buffer.
0099     // Record our current iopb:
0100     m_iov.push_back(iopb);
0101 
0102     // Reset iopb
0103     iopb.set_offset(pos[idx]);
0104     iopb.set_data(buf + buffer_used);
0105     iopb.set_size(len[idx]);
0106 
0107     // Record location of this chunk.
0108     iopb_offset++;
0109 
0110     m_idx_to_iopb.push_back(iopb_offset);
0111     m_idx_to_iopb_offset.push_back(0);
0112 
0113     buffer_used += len[idx];
0114   }
0115   m_iov.push_back(iopb);
0116 
0117   m_buffer_used += buffer_used;
0118   return idx;
0119 }
0120 
0121 /**
0122  * Unpack the optimized set of reads from the storage system and copy the
0123  * results in the order ROOT requested.
0124  */
0125 void ReadRepacker::unpack(char *buf) {
0126   char *root_result_ptr = buf;
0127   int nbuf = m_idx_to_iopb.size();
0128   for (int idx = 0; idx < nbuf; idx++) {
0129     int iov_idx = m_idx_to_iopb[idx];
0130     IOPosBuffer &iopb = m_iov[iov_idx];
0131     int iopb_offset = m_idx_to_iopb_offset[idx];
0132     char *io_result_ptr = static_cast<char *>(iopb.data()) + iopb_offset;
0133     // Note that we use the input buffer as a temporary where possible.
0134     // Hence, the source and destination can overlap; use memmove instead of memcpy.
0135     memmove(root_result_ptr, io_result_ptr, m_len[idx]);
0136 
0137     root_result_ptr += m_len[idx];
0138   }
0139 }
0140 
0141 void ReadRepacker::reset(unsigned int nbuf) {
0142   m_extra_bytes = 0;
0143   m_buffer_used = 0;
0144 
0145   // Number of buffers to storage typically decreases, but nbuf/2 is just an
0146   // somewhat-informed guess.
0147   m_iov.reserve(nbuf / 2);
0148   m_iov.clear();
0149   m_idx_to_iopb.reserve(nbuf);
0150   m_idx_to_iopb.clear();
0151   m_idx_to_iopb_offset.reserve(nbuf);
0152   m_idx_to_iopb_offset.clear();
0153 }