00001 #ifndef __XRDFILECACHE_PREFETCH_HH__
00002 #define __XRDFILECACHE_PREFETCH_HH__
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <string>
00022 #include <queue>
00023
00024 #include "XrdCl/XrdClDefaultEnv.hh"
00025
00026 #include "XrdFileCacheInfo.hh"
00027 #include "XrdFileCacheStats.hh"
00028
00029 class XrdJob;
00030 class XrdOucIOVec;
00031
00032 namespace XrdCl
00033 {
00034 class Log;
00035 }
00036
00037 namespace XrdFileCache
00038 {
00039
00041
00042 class Prefetch
00043 {
00044 friend class IOEntireFile;
00045 friend class IOFileBlock;
00046 enum ReadRamState_t { kReadWait, kReadSuccess, kReadFailed};
00047
00048 struct Task;
00049 public:
00050
00052
00053 Prefetch(XrdOucCacheIO& inputFile, std::string& path,
00054 long long offset, long long fileSize);
00055
00056
00058
00059 ~Prefetch();
00060
00061
00063
00064 void Run();
00065
00066
00068
00069 Stats& GetStats() { return m_stats; }
00070
00071
00073
00074 void WriteBlockToDisk(int ramIdx, size_t size);
00075
00076
00078
00079 void DecRamBlockRefCount(int ramIdx);
00080
00081
00084
00085 bool InitiateClose();
00086
00087
00089
00090 void Sync();
00091
00092
00093 protected:
00095 ssize_t Read(char * buff, off_t offset, size_t size);
00096
00098 int ReadV (const XrdOucIOVec *readV, int n);
00099
00101 void AppendIOStatToFileInfo();
00102
00103 private:
00104
00106
00107 struct Task
00108 {
00109 int ramBlockIdx;
00110 XrdSysCondVar *condVar;
00111
00112 Task(): ramBlockIdx(-1), condVar(0) {}
00113 Task(int r, XrdSysCondVar *cv):
00114 ramBlockIdx(r), condVar(cv) {}
00115 ~Task() {}
00116 };
00117
00118 struct RAMBlock {
00119 int fileBlockIdx;
00120 int refCount;
00121 bool fromRead;
00122 ReadRamState_t status;
00123 int readErrno;
00124
00125 RAMBlock():fileBlockIdx(-1), refCount(0), fromRead(false), status(kReadWait) {}
00126 };
00127
00128 struct RAM
00129 {
00130 int m_numBlocks;
00131 char* m_buffer;
00132 RAMBlock* m_blockStates;
00133 XrdSysCondVar m_writeMutex;
00134
00135 RAM();
00136 ~RAM();
00137 };
00138
00140 void CloseCleanly();
00141
00143 Task* GetNextTask();
00144
00146 bool Open();
00147
00149 XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); }
00150
00152 ssize_t ReadInBlocks( char* buff, off_t offset, size_t size);
00153
00155 Task* CreateTaskForFirstUndownloadedBlock();
00156
00158 bool ReadFromTask(int bIdx, char* buff, long long off, size_t size);
00159
00161 void DoTask(Task* task);
00162
00164 const char* lPath() const;
00165
00166 RAM m_ram;
00167
00168 XrdOssDF *m_output;
00169 XrdOssDF *m_infoFile;
00170 Info m_cfi;
00171 XrdOucCacheIO &m_input;
00172
00173 std::string m_temp_filename;
00174
00175 long long m_offset;
00176 long long m_fileSize;
00177
00178 bool m_started;
00179 bool m_failed;
00180 bool m_stopping;
00181 bool m_stopped;
00182 XrdSysCondVar m_stateCond;
00183
00184 XrdSysMutex m_downloadStatusMutex;
00185
00186 std::deque<Task*> m_tasks_queue;
00187 XrdSysCondVar m_queueCond;
00188
00189 Stats m_stats;
00190
00191
00192 XrdSysMutex m_syncStatusMutex;
00193 XrdJob *m_syncer;
00194 std::vector<int> m_writes_during_sync;
00195 int m_non_flushed_cnt;
00196 bool m_in_sync;
00197 };
00198 }
00199 #endif