XRootD
Loading...
Searching...
No Matches
XrdHttpTpcStream.hh
Go to the documentation of this file.
1
11
12#include <memory>
13#include <vector>
14#include <string>
15
16#include <cstring>
17
18struct stat;
19
20class XrdSysError;
21
22namespace TPC {
23class Stream {
24public:
25 Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
26 : m_open_for_write(false),
27 m_avail_count(max_blocks),
28 m_fh(std::move(fh)),
29 m_offset(0),
30 m_log(log)
31 {
32 m_buffers.reserve(max_blocks);
33 for (size_t idx=0; idx < max_blocks; idx++) {
34 m_buffers.push_back(new Entry(buffer_size));
35 }
36 m_open_for_write = true;
37 }
38
39 ~Stream();
40
41 int Stat(struct stat *);
42
43 int Read(off_t offset, char *buffer, size_t size);
44
45 // Writes a buffer of a given size to an offset.
46 // This will often keep the buffer in memory in to present the underlying
47 // filesystem with a single stream of data (required for HDFS); further,
48 // it will also buffer to align the writes on a 1MB boundary (required
49 // for some RADOS configurations). When force is set to true, it will
50 // skip the buffering and always write (this should only be done at the
51 // end of a stream!).
52 //
53 // Returns the number of bytes written; on error, returns -1 and sets
54 // the error code and error message for the stream
55 ssize_t Write(off_t offset, const char *buffer, size_t size, bool force);
56
57 size_t AvailableBuffers() const {return m_avail_count;}
58
59 void DumpBuffers() const;
60
61 // Flush and finalize the stream. If all data has been sent to the underlying
62 // file handle, close() will be invoked on the file handle.
63 //
64 // Further write operations on this stream will result in an error.
65 // If any memory buffers remain, an error occurs.
66 //
67 // Returns true on success; false otherwise.
68 bool Finalize();
69
70 std::string GetErrorMessage() const {return m_error_buf;}
71
72private:
73
74 class Entry {
75 public:
76 Entry(size_t capacity) :
77 m_offset(-1),
78 m_capacity(capacity),
79 m_size(0)
80 {}
81
82 bool Available() const {return m_offset == -1;}
83
84 int Write(Stream &stream, bool force) {
85 if (Available() || !CanWrite(stream)) {return 0;}
86 // Only full buffer writes are accepted unless the stream forces a flush
87 // (i.e., we are at EOF) because the multistream code uses buffer occupancy
88 // to determine how many streams are currently in-flight. If we do an early
89 // write, then the buffer will be empty and the multistream code may decide
90 // to start another request (which we don't have the capacity to serve!).
91 if (!force && (m_size != m_capacity)) {
92 return 0;
93 }
94 ssize_t retval = stream.WriteImpl(m_offset, &m_buffer[0], m_size);
95 // Currently the only valid negative value is SFS_ERROR (-1); checking for
96 // all negative values to future-proof the code.
97 if ((retval < 0) || (static_cast<size_t>(retval) != m_size)) {
98 return -1;
99 }
100 m_offset = -1;
101 m_size = 0;
102 m_buffer.clear();
103 return retval;
104 }
105
106 size_t Accept(off_t offset, const char *buf, size_t size) {
107 // Validate acceptance criteria.
108 if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
109 return 0;
110 }
111 size_t to_accept = m_capacity - m_size;
112 if (to_accept == 0) {return 0;}
113 if (size > to_accept) {
114 size = to_accept;
115 }
116
117 // Inflate the underlying buffer if needed.
118 ssize_t new_bytes_needed = (m_size + size) - m_buffer.size();
119 if (new_bytes_needed > 0) {
120 m_buffer.resize(m_capacity);
121 }
122
123 // Finally, do the copy.
124 memcpy(&m_buffer[0] + m_size, buf, size);
125 m_size += size;
126 if (m_offset == -1) {
127 m_offset = offset;
128 }
129 return size;
130 }
131
132 void ShrinkIfUnused() {
133 if (!Available()) {return;}
134#if __cplusplus > 199711L
135 m_buffer.shrink_to_fit();
136#endif
137 }
138
139 void Move(Entry &other) {
140 m_buffer.swap(other.m_buffer);
141 m_offset = other.m_offset;
142 m_size = other.m_size;
143 }
144
145 off_t GetOffset() const {return m_offset;}
146 size_t GetCapacity() const {return m_capacity;}
147 size_t GetSize() const {return m_size;}
148
149 private:
150
151 Entry(const Entry&) = delete;
152
153 bool CanWrite(Stream &stream) const {
154 return (m_size > 0) && (m_offset == stream.m_offset);
155 }
156
157 off_t m_offset; // Offset within file that m_buffer[0] represents.
158 size_t m_capacity;
159 size_t m_size; // Number of bytes held in buffer.
160 std::vector<char> m_buffer;
161 };
162
163 ssize_t WriteImpl(off_t offset, const char *buffer, size_t size);
164
165 bool m_open_for_write;
166 size_t m_avail_count;
167 std::unique_ptr<XrdSfsFile> m_fh;
168 off_t m_offset;
169 std::vector<Entry*> m_buffers;
170 XrdSysError &m_log;
171 std::string m_error_buf;
172};
173}
struct stat Stat
Definition XrdCks.cc:49
#define stat(a, b)
Definition XrdPosix.hh:101
int Read(off_t offset, char *buffer, size_t size)
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
Stream(std::unique_ptr< XrdSfsFile > fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
void DumpBuffers() const
std::string GetErrorMessage() const
size_t AvailableBuffers() const