File indexing completed on 2024-04-06 12:19:12
0001
0002 #include <cassert>
0003 #include <cstring>
0004
0005 #include "ReadRepacker.h"
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021 int ReadRepacker::pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size) {
0022 reset(nbuf);
0023 m_len = len;
0024
0025
0026 char *tmp_buf;
0027 IOSize tmp_size;
0028 if (buffer_size < TEMPORARY_BUFFER_SIZE) {
0029 m_spare_buffer.resize(TEMPORARY_BUFFER_SIZE);
0030 tmp_buf = m_spare_buffer.data();
0031 tmp_size = TEMPORARY_BUFFER_SIZE;
0032 } else {
0033 tmp_buf = buf;
0034 tmp_size = buffer_size;
0035 }
0036
0037 int pack_count = packInternal(pos, len, nbuf, tmp_buf, tmp_size);
0038
0039 if ((nbuf - pack_count > 0) &&
0040 (tmp_buf != m_spare_buffer.data()) &&
0041 ((IOSize)len[pack_count] <
0042 TEMPORARY_BUFFER_SIZE)) {
0043
0044
0045
0046 m_spare_buffer.resize(TEMPORARY_BUFFER_SIZE);
0047
0048
0049
0050
0051 pack_count += packInternal(
0052 &pos[pack_count], &len[pack_count], nbuf - pack_count, m_spare_buffer.data(), TEMPORARY_BUFFER_SIZE);
0053 }
0054
0055 return pack_count;
0056 }
0057
0058 int ReadRepacker::packInternal(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size) {
0059 if (nbuf == 0) {
0060 return 0;
0061 }
0062
0063
0064 int iopb_offset = m_iov.size();
0065
0066
0067 assert(static_cast<IOSize>(len[0]) <= buffer_size);
0068 IOPosBuffer iopb(pos[0], buf, len[0]);
0069 m_idx_to_iopb.push_back(iopb_offset);
0070 m_idx_to_iopb_offset.push_back(0);
0071
0072 IOSize buffer_used = len[0];
0073 int idx;
0074 for (idx = 1; idx < nbuf; idx++) {
0075 if (buffer_used + len[idx] > buffer_size) {
0076
0077 break;
0078 }
0079
0080 edm::storage::IOOffset extra_bytes_signed = (idx == 0) ? 0 : ((pos[idx] - iopb.offset()) - iopb.size());
0081 assert(extra_bytes_signed >= 0);
0082 IOSize extra_bytes = static_cast<IOSize>(extra_bytes_signed);
0083
0084 if (((static_cast<IOSize>(len[idx]) < BIG_READ_SIZE) || (iopb.size() < BIG_READ_SIZE)) &&
0085 (extra_bytes < READ_COALESCE_SIZE) && (buffer_used + len[idx] + extra_bytes <= buffer_size)) {
0086
0087
0088
0089
0090
0091 m_idx_to_iopb.push_back(iopb_offset);
0092 m_idx_to_iopb_offset.push_back(pos[idx] - iopb.offset());
0093 iopb.set_size(pos[idx] + len[idx] - iopb.offset());
0094 buffer_used += (len[idx] + extra_bytes);
0095 m_extra_bytes += extra_bytes;
0096 continue;
0097 }
0098
0099
0100 m_iov.push_back(iopb);
0101
0102
0103 iopb.set_offset(pos[idx]);
0104 iopb.set_data(buf + buffer_used);
0105 iopb.set_size(len[idx]);
0106
0107
0108 iopb_offset++;
0109
0110 m_idx_to_iopb.push_back(iopb_offset);
0111 m_idx_to_iopb_offset.push_back(0);
0112
0113 buffer_used += len[idx];
0114 }
0115 m_iov.push_back(iopb);
0116
0117 m_buffer_used += buffer_used;
0118 return idx;
0119 }
0120
0121
0122
0123
0124
0125 void ReadRepacker::unpack(char *buf) {
0126 char *root_result_ptr = buf;
0127 int nbuf = m_idx_to_iopb.size();
0128 for (int idx = 0; idx < nbuf; idx++) {
0129 int iov_idx = m_idx_to_iopb[idx];
0130 IOPosBuffer &iopb = m_iov[iov_idx];
0131 int iopb_offset = m_idx_to_iopb_offset[idx];
0132 char *io_result_ptr = static_cast<char *>(iopb.data()) + iopb_offset;
0133
0134
0135 memmove(root_result_ptr, io_result_ptr, m_len[idx]);
0136
0137 root_result_ptr += m_len[idx];
0138 }
0139 }
0140
0141 void ReadRepacker::reset(unsigned int nbuf) {
0142 m_extra_bytes = 0;
0143 m_buffer_used = 0;
0144
0145
0146
0147 m_iov.reserve(nbuf / 2);
0148 m_iov.clear();
0149 m_idx_to_iopb.reserve(nbuf);
0150 m_idx_to_iopb.clear();
0151 m_idx_to_iopb_offset.reserve(nbuf);
0152 m_idx_to_iopb_offset.clear();
0153 }