00001 #ifndef __XRDFILECACHE_FILE_HH__
00002 #define __XRDFILECACHE_FILE_HH__
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "XrdCl/XrdClXRootDResponses.hh"
00022 #include "XrdCl/XrdClDefaultEnv.hh"
00023
00024 #include "XrdOuc/XrdOucCache2.hh"
00025 #include "XrdOuc/XrdOucIOVec.hh"
00026
00027 #include "XrdFileCacheInfo.hh"
00028 #include "XrdFileCacheStats.hh"
00029
00030 #include <string>
00031 #include <map>
00032
00033 class XrdJob;
00034 class XrdOucIOVec;
00035
00036 namespace XrdCl
00037 {
00038 class Log;
00039 }
00040
00041 namespace XrdFileCache
00042 {
00043 class BlockResponseHandler;
00044 class DirectResponseHandler;
00045 class IO;
00046
00047 struct ReadVBlockListRAM;
00048 struct ReadVChunkListRAM;
00049 struct ReadVBlockListDisk;
00050 struct ReadVChunkListDisk;
00051 }
00052
00053
00054 namespace XrdFileCache
00055 {
00056
00057 class File;
00058
//------------------------------------------------------------------------------
//! A byte buffer together with the bookkeeping attached to it: the offset,
//! owning File and IO it was created with, a reference count, an error code
//! and the downloaded / prefetch flags.
//!
//! All members are public and the class performs no locking of its own.
//------------------------------------------------------------------------------
class Block
{
public:
   std::vector<char>  m_buff;        //!< payload; resized to 'size' by the constructor
   long long          m_offset;      //!< the 'off' constructor argument
   // Elaborated type specifiers: only pointers are stored, so the complete
   // types are not needed and the class does not rely on declaration order.
   class File        *m_file;        //!< the 'f' constructor argument (not owned here)
   class IO          *m_io;          //!< the 'io' constructor argument; replaceable via reset_error_and_set_io()

   int                m_refcnt;      //!< starts at 0; not modified by this class
   int                m_errno;       //!< 0 means "no error recorded"
   bool               m_downloaded;  //!< set by set_downloaded()
   bool               m_prefetch;    //!< the 'prefetch' constructor argument

   //---------------------------------------------------------------------------
   //! Build a block with a zero-filled buffer of 'size' bytes, no error,
   //! not downloaded and a reference count of zero.
   //!
   //! @param f        file pointer stored in m_file
   //! @param io       IO pointer stored in m_io
   //! @param off      value stored in m_offset
   //! @param size     buffer size in bytes
   //! @param prefetch value stored in m_prefetch
   //---------------------------------------------------------------------------
   Block(File *f, IO *io, long long off, int size, bool prefetch) :
      m_offset(off), m_file(f), m_io(io), m_refcnt(0),
      m_errno(0), m_downloaded(false), m_prefetch(prefetch)
   {
      m_buff.resize(size);
   }

   //---------------------------------------------------------------------------
   //! Pointer to byte 'pos' of the buffer.
   //!
   //! Returns a null pointer when the buffer is empty. For a non-empty buffer
   //! the address is formed by pointer arithmetic, so asking for the
   //! one-past-the-end position (pos == get_size()) is well defined.
   //---------------------------------------------------------------------------
   char* get_buff(long long pos = 0)
   {
      if (m_buff.empty()) return 0;
      return &m_buff[0] + pos;
   }

   //! Buffer size in bytes.
   int       get_size()   const { return static_cast<int>(m_buff.size()); }
   //! Offset the block was created with.
   long long get_offset() const { return m_offset; }

   //! IO object currently associated with the block.
   IO* get_io() const { return m_io; }

   //! True once the block is either downloaded or has an error recorded.
   bool is_finished() const { return m_downloaded || m_errno != 0; }
   //! True when the block has been marked as downloaded.
   bool is_ok()       const { return m_downloaded; }
   //! True when a non-zero error code has been recorded.
   bool is_failed()   const { return m_errno != 0; }

   //! Mark the block as downloaded.
   void set_downloaded()   { m_downloaded = true; }
   //! Record an error code; a non-zero value makes is_failed() return true.
   void set_error(int err) { m_errno = err; }

   //! Clear the recorded error and associate the block with another IO.
   void reset_error_and_set_io(IO *io)
   {
      m_errno = 0;
      m_io    = io;
   }
};
00098
00099
00100
00101 class BlockResponseHandler : public XrdOucCacheIOCB
00102 {
00103 public:
00104 Block *m_block;
00105 bool m_for_prefetch;
00106
00107 BlockResponseHandler(Block *b, bool prefetch) :
00108 m_block(b), m_for_prefetch(prefetch) {}
00109
00110 virtual void Done(int result);
00111 };
00112
00113
00114
00115 class DirectResponseHandler : public XrdOucCacheIOCB
00116 {
00117 public:
00118 XrdSysCondVar m_cond;
00119 int m_to_wait;
00120 int m_errno;
00121
00122 DirectResponseHandler(int to_wait) : m_cond(0), m_to_wait(to_wait), m_errno(0) {}
00123
00124 bool is_finished() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0; }
00125 bool is_ok() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0 && m_errno == 0; }
00126 bool is_failed() { XrdSysCondVarHelper _lck(m_cond); return m_errno != 0; }
00127
00128 virtual void Done(int result);
00129 };
00130
00131
00132
00133 class File
00134 {
00135 public:
00136
00138
00139 File(const std::string &path, long long offset, long long fileSize);
00140
00141
00143
00144 static File* FileOpen(const std::string &path, long long offset, long long fileSize);
00145
00146
00148
00149 ~File();
00150
00152 void BlockRemovedFromWriteQ(Block*);
00153
00155 void BlocksRemovedFromWriteQ(std::list<Block*>&);
00156
00158 bool Open();
00159
00161 int ReadV(IO *io, const XrdOucIOVec *readV, int n);
00162
00164 int Read (IO *io, char* buff, long long offset, int size);
00165
00166
00168
00169 bool isOpen() const { return m_is_open; }
00170
00171
00174
00175 bool ioActive(IO *io);
00176
00177
00180
00181 void RequestSyncOfDetachStats();
00182
00183
00186
00187 bool FinalizeSyncBeforeExit();
00188
00189
00191
00192 void Sync();
00193
00194
00196
00197 Stats& GetStats() { return m_stats; }
00198
00199 void ProcessBlockResponse(BlockResponseHandler* brh, int res);
00200 void WriteBlockToDisk(Block* b);
00201
00202 void Prefetch();
00203
00204 float GetPrefetchScore() const;
00205
00207 const char* lPath() const;
00208
00209 std::string& GetLocalPath() { return m_filename; }
00210
00211 XrdSysError* GetLog();
00212 XrdSysTrace* GetTrace();
00213
00214 long long GetFileSize() { return m_fileSize; }
00215
00216 void AddIO(IO *io);
00217 int GetPrefetchCountOnIO(IO *io);
00218 void StopPrefetchingOnIO(IO *io);
00219 void RemoveIO(IO *io);
00220
00221
00222
00223
00224
00225 int get_ref_cnt() { return m_ref_cnt; }
00226 int inc_ref_cnt() { return ++m_ref_cnt; }
00227 int dec_ref_cnt() { return --m_ref_cnt; }
00228
00229 void initiate_emergency_shutdown();
00230 bool is_in_emergency_shutdown() { return m_in_shutdown; }
00231
00232 private:
00233 enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
00234
00235 int m_ref_cnt;
00236
00237 bool m_is_open;
00238 bool m_in_shutdown;
00239
00240 XrdOssDF *m_output;
00241 XrdOssDF *m_infoFile;
00242 Info m_cfi;
00243
00244 std::string m_filename;
00245 long long m_offset;
00246 long long m_fileSize;
00247
00248
00249
00250 struct IODetails
00251 {
00252 int m_active_prefetches;
00253 bool m_allow_prefetching;
00254 bool m_ioactive_false_reported;
00255
00256 IODetails() : m_active_prefetches(0), m_allow_prefetching(true), m_ioactive_false_reported(false) {}
00257 };
00258
00259 typedef std::map<IO*, IODetails> IoMap_t;
00260 typedef IoMap_t::iterator IoMap_i;
00261
00262 IoMap_t m_io_map;
00263 IoMap_i m_current_io;
00264 int m_ios_in_detach;
00265
00266
00267 std::vector<int> m_writes_during_sync;
00268 int m_non_flushed_cnt;
00269 bool m_in_sync;
00270
00271 typedef std::list<int> IntList_t;
00272 typedef IntList_t::iterator IntList_i;
00273
00274 typedef std::list<Block*> BlockList_t;
00275 typedef BlockList_t::iterator BlockList_i;
00276
00277 typedef std::map<int, Block*> BlockMap_t;
00278 typedef BlockMap_t::iterator BlockMap_i;
00279
00280
00281 BlockMap_t m_block_map;
00282
00283 XrdSysCondVar m_downloadCond;
00284
00285 Stats m_stats;
00286
00287 PrefetchState_e m_prefetchState;
00288
00289 int m_prefetchReadCnt;
00290 int m_prefetchHitCnt;
00291 float m_prefetchScore;
00292
00293 bool m_detachTimeIsLogged;
00294
00295 static const char *m_traceID;
00296 bool overlap(int blk,
00297 long long blk_size,
00298 long long req_off,
00299 int req_size,
00300
00301 long long &off,
00302 long long &blk_off,
00303 long long &size);
00304
00305
00306 Block* PrepareBlockRequest(int i, IO *io, bool prefetch);
00307
00308 void ProcessBlockRequest (Block *b, bool prefetch);
00309 void ProcessBlockRequests(BlockList_t& blks, bool prefetch);
00310
00311 int RequestBlocksDirect(IO *io, DirectResponseHandler *handler, IntList_t& blocks,
00312 char* buff, long long req_off, long long req_size);
00313
00314 int ReadBlocksFromDisk(IntList_t& blocks,
00315 char* req_buf, long long req_off, long long req_size);
00316
00317
00318 bool VReadValidate (const XrdOucIOVec *readV, int n);
00319 void VReadPreProcess (IO *io, const XrdOucIOVec *readV, int n,
00320 BlockList_t& blks_to_request,
00321 ReadVBlockListRAM& blks_to_process,
00322 ReadVBlockListDisk& blks_on_disk,
00323 std::vector<XrdOucIOVec>& chunkVec);
00324 int VReadFromDisk (const XrdOucIOVec *readV, int n,
00325 ReadVBlockListDisk& blks_on_disk);
00326 int VReadProcessBlocks(IO *io, const XrdOucIOVec *readV, int n,
00327 std::vector<ReadVChunkListRAM>& blks_to_process,
00328 std::vector<ReadVChunkListRAM>& blks_rocessed);
00329
00330 long long BufferSize();
00331
00332 void inc_ref_count(Block*);
00333 void dec_ref_count(Block*);
00334 void free_block(Block*);
00335
00336 bool select_current_io_or_disable_prefetching(bool skip_current);
00337
00338 int offsetIdx(int idx);
00339 };
00340
00341 }
00342
00343 #endif