00001
00010 #include <memory>
00011 #include <vector>
00012
00013 #include <cstring>
00014
00015 struct stat;
00016
00017 class XrdSfsFile;
00018 class XrdSysError;
00019
00020 namespace TPC {
00021 class Stream {
00022 public:
00023 Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
00024 : m_open_for_write(false),
00025 m_avail_count(max_blocks),
00026 m_fh(std::move(fh)),
00027 m_offset(0),
00028 m_log(log)
00029 {
00030 m_buffers.reserve(max_blocks);
00031 for (size_t idx=0; idx < max_blocks; idx++) {
00032 m_buffers.push_back(new Entry(buffer_size));
00033 }
00034 m_open_for_write = true;
00035 }
00036
00037 ~Stream();
00038
00039 int Stat(struct stat *);
00040
00041 int Read(off_t offset, char *buffer, size_t size);
00042
00043 int Write(off_t offset, const char *buffer, size_t size);
00044
00045 size_t AvailableBuffers() const {return m_avail_count;}
00046
00047 void DumpBuffers() const;
00048
00049
00050
00051
00052
00053
00054
00055
00056 bool Finalize();
00057
00058 private:
00059
00060 class Entry {
00061 public:
00062 Entry(size_t capacity) :
00063 m_offset(-1),
00064 m_capacity(capacity),
00065 m_size(0)
00066 {}
00067
00068 bool Available() const {return m_offset == -1;}
00069
00070 int Write(Stream &stream) {
00071 if (Available() || !CanWrite(stream)) {return 0;}
00072
00073 int size_desired = m_size;
00074 int retval = stream.Write(m_offset, &m_buffer[0], size_desired);
00075 m_size = 0;
00076 m_offset = -1;
00077 if (retval != size_desired) {
00078 return -1;
00079 }
00080 return retval;
00081 }
00082
00083 bool Accept(off_t offset, const char *buf, size_t size) {
00084
00085 if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
00086 return false;
00087 }
00088 if (size > m_capacity - m_size) {
00089 return false;
00090 }
00091
00092
00093 ssize_t new_bytes_needed = (m_size + size) - m_buffer.capacity();
00094 if (new_bytes_needed > 0) {
00095 m_buffer.reserve(m_capacity);
00096 }
00097
00098
00099 memcpy(&m_buffer[0] + m_size, buf, size);
00100 m_size += size;
00101 if (m_offset == -1) {
00102 m_offset = offset;
00103 }
00104 return true;
00105 }
00106
00107 void ShrinkIfUnused() {
00108 if (!Available()) {return;}
00109 #if __cplusplus > 199711L
00110 m_buffer.shrink_to_fit();
00111 #endif
00112 }
00113
00114 void Move(Entry &other) {
00115 m_buffer.swap(other.m_buffer);
00116 m_offset = other.m_offset;
00117 m_size = other.m_size;
00118 }
00119
00120 off_t GetOffset() const {return m_offset;}
00121 size_t GetCapacity() const {return m_capacity;}
00122 size_t GetSize() const {return m_size;}
00123
00124 private:
00125
00126 Entry(const Entry&) = delete;
00127
00128 bool CanWrite(Stream &stream) const {
00129 return (m_size > 0) && (m_offset == stream.m_offset);
00130 }
00131
00132 off_t m_offset;
00133 size_t m_capacity;
00134 size_t m_size;
00135 std::vector<char> m_buffer;
00136 };
00137
00138 bool m_open_for_write;
00139 size_t m_avail_count;
00140 std::unique_ptr<XrdSfsFile> m_fh;
00141 off_t m_offset;
00142 std::vector<Entry*> m_buffers;
00143 XrdSysError &m_log;
00144 };
00145 }