00001 #ifndef __XRDFILECACHE_CACHE_HH__
00002 #define __XRDFILECACHE_CACHE_HH__
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
#include <list>
#include <map>
#include <set>
#include <string>
#include <vector>

#include "Xrd/XrdScheduler.hh"
#include "XrdVersion.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdOuc/XrdOucCache2.hh"
#include "XrdOuc/XrdOucCallBack.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdFileCacheFile.hh"
#include "XrdFileCacheDecision.hh"
00032
// Forward declarations, grouped by the scope the names live in.

// Global scope.
class XrdOucStream;
class XrdSysError;
class XrdSysTrace;

// XrdCl client library.
namespace XrdCl
{
   class Log;
}

// This component's own namespace.
namespace XrdFileCache
{
   class File;
   class IO;
}
00044
00045
00046 namespace XrdFileCache
00047 {
00048
00050
//----------------------------------------------------------------------------
//! Bundle of cache settings together with their built-in defaults.
//!
//! The default constructor assigns every field except m_username (which is
//! left as an empty string). Most size/limit fields start at -1; the two
//! predicates that inspect such fields report "true" only for strictly
//! positive values, so a freshly constructed object answers "false" to all
//! three queries.
//----------------------------------------------------------------------------
struct Configuration
{
   // ---- mode switches (both default to false) ------------------------------
   bool m_hdfsmode;
   bool m_allow_xrdpfc_command;

   // ---- names ---------------------------------------------------------------
   std::string m_username;                // not set by the constructor => ""
   std::string m_data_space;              // default "public"
   std::string m_meta_space;              // default "public"

   // ---- disk / file usage limits and purge timing ---------------------------
   // NOTE(review): units are not visible in this header (presumably bytes for
   // the long long fields and seconds for the int fields) -- confirm.
   long long   m_diskTotalSpace;          // default -1
   long long   m_diskUsageLWM;            // default -1
   long long   m_diskUsageHWM;            // default -1
   long long   m_fileUsageBaseline;       // default -1
   long long   m_fileUsageNominal;        // default -1
   long long   m_fileUsageMax;            // default -1, see are_file_usage_limits_set()
   int         m_purgeInterval;           // default 300
   int         m_purgeColdFilesAge;       // default -1, see is_age_based_purge_in_effect()
   int         m_purgeColdFilesPeriod;    // default -1

   // ---- RAM buffers, write queue and prefetching ----------------------------
   long long   m_bufferSize;              // default 1024*1024
   long long   m_RamAbsAvailable;         // default 0
   int         m_NRamBuffers;             // default -1
   int         m_wqueue_blocks;           // default 16
   int         m_wqueue_threads;          // default 4
   int         m_prefetch_max_blocks;     // default 10

   // ---- miscellaneous -------------------------------------------------------
   long long   m_hdfsbsize;               // default 128*1024*1024
   long long   m_flushCnt;                // default 2000

   //! Install the built-in defaults listed next to each field above.
   Configuration()
      : m_hdfsmode(false), m_allow_xrdpfc_command(false),
        m_data_space("public"), m_meta_space("public"),
        m_diskTotalSpace(-1), m_diskUsageLWM(-1), m_diskUsageHWM(-1),
        m_fileUsageBaseline(-1), m_fileUsageNominal(-1), m_fileUsageMax(-1),
        m_purgeInterval(300),
        m_purgeColdFilesAge(-1), m_purgeColdFilesPeriod(-1),
        m_bufferSize(1024 * 1024),
        m_RamAbsAvailable(0),
        m_NRamBuffers(-1),
        m_wqueue_blocks(16), m_wqueue_threads(4),
        m_prefetch_max_blocks(10),
        m_hdfsbsize(128 * 1024 * 1024),
        m_flushCnt(2000)
   {
   }

   //! @return true when m_fileUsageMax holds a strictly positive value.
   bool are_file_usage_limits_set() const
   {
      return m_fileUsageMax > 0;
   }

   //! @return true when m_purgeColdFilesAge holds a strictly positive value.
   bool is_age_based_purge_in_effect() const
   {
      return m_purgeColdFilesAge > 0;
   }

   //! @return always false in this version of the header.
   bool is_purge_plugin_set_up() const
   {
      return false;
   }

   //! Declared only; the definition lives outside this header.
   //! NOTE(review): judging by the signature, 'du' and 'fu' are inputs and
   //! the results are written to 'frac_du' and 'frac_fu' -- confirm against
   //! the implementation.
   void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu);
};
00110
//----------------------------------------------------------------------------
//! String-valued counterparts of some Configuration settings.
//!
//! Only the two disk-usage water marks receive non-empty defaults ("0.90"
//! and "0.95"); every other field starts out as an empty string.
//! NOTE(review): these look like raw configuration values that are later
//! turned into numbers (cf. Cache::cfg2bytes) -- confirm in the
//! implementation file.
//----------------------------------------------------------------------------
struct TmpConfiguration
{
   //! Set the water-mark defaults; all remaining strings are empty.
   TmpConfiguration()
      : m_diskUsageLWM("0.90"),
        m_diskUsageHWM("0.95"),
        m_fileUsageBaseline(),
        m_fileUsageNominal(),
        m_fileUsageMax(),
        m_flushRaw()
   {
   }

   std::string m_diskUsageLWM;        // default "0.90"
   std::string m_diskUsageHWM;        // default "0.95"
   std::string m_fileUsageBaseline;   // default ""
   std::string m_fileUsageNominal;    // default ""
   std::string m_fileUsageMax;        // default ""
   std::string m_flushRaw;            // default ""
};
00125
00126
00128
00129 class Cache : public XrdOucCache2
00130 {
00131 public:
00132
00134
00135 Cache(XrdSysLogger *logger);
00136
00137
00139
00140 using XrdOucCache2::Attach;
00141
00142 virtual XrdOucCacheIO2 *Attach(XrdOucCacheIO2 *, int Options = 0);
00143
00144
00146
00147 virtual int isAttached();
00148
00149
00150
00151 virtual void EnvInfo(XrdOucEnv &theEnv);
00152
00153
00154
00155
00156 virtual int LocalFilePath(const char *url, char *buff=0, int blen=0,
00157 LFP_Reason why=ForAccess);
00158
00159
00160
00161 virtual int Prepare(const char *url, int oflags, mode_t mode);
00162
00163
00164 virtual int Stat(const char *url, struct stat &sbuff);
00165
00166
00167 virtual int Unlink(const char *url);
00168
00169
00175
00176 bool Decide(XrdOucCacheIO*);
00177
00178
00180
00181 const Configuration& RefConfiguration() const { return m_configuration; }
00182
00183
00190
00191 bool Config(const char *config_filename, const char *parameters);
00192
00193
00195
00196 static Cache &CreateInstance(XrdSysLogger *logger);
00197
00198
00200
00201 static Cache &GetInstance();
00202
00203
00205
00206 static bool VCheck(XrdVersionInfo &urVersion) { return true; }
00207
00208
00210
00211 void Purge();
00212
00213
00215
00216 int UnlinkUnlessOpen(const std::string& f_name);
00217
00218
00220
00221 void AddWriteTask(Block* b, bool from_read);
00222
00223
00226
00227 void RemoveWriteQEntriesFor(File *f);
00228
00229
00231
00232 void ProcessWriteTasks();
00233
00234 bool RequestRAMBlock();
00235
00236 void RAMBlockReleased();
00237
00238 void RegisterPrefetchFile(File*);
00239 void DeRegisterPrefetchFile(File*);
00240
00241 File* GetNextFileToPrefetch();
00242
00243 void Prefetch();
00244
00245 XrdOss* GetOss() const { return m_output_fs; }
00246
00247 bool IsFileActiveOrPurgeProtected(const std::string&);
00248
00249 File* GetFile(const std::string&, IO*, long long off = 0, long long filesize = 0);
00250
00251 void ReleaseFile(File*, IO*);
00252
00253 void ScheduleFileSync(File* f) { schedule_file_sync(f, false, false); }
00254
00255 void FileSyncDone(File*, bool high_debug);
00256
00257 XrdSysError* GetLog() { return &m_log; }
00258 XrdSysTrace* GetTrace() { return m_trace; }
00259
00260 void ExecuteCommandUrl(const std::string& command_url);
00261
00262 private:
00263 bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc);
00264 bool ConfigXeq(char *, XrdOucStream &);
00265 bool xdlib(XrdOucStream &);
00266 bool xtrace(XrdOucStream &);
00267
00268 bool cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name);
00269
00270 int UnlinkCommon(const std::string& f_name, bool fail_if_open);
00271
00272 static Cache *m_factory;
00273 static XrdScheduler *schedP;
00274
00275 XrdSysError m_log;
00276 XrdSysTrace *m_trace;
00277 const char *m_traceID;
00278
00279 XrdOucCacheStats m_stats;
00280 XrdOss *m_output_fs;
00281
00282 std::vector<XrdFileCache::Decision*> m_decisionpoints;
00283
00284 std::map<std::string, long long> m_filesInQueue;
00285
00286 Configuration m_configuration;
00287
00288 XrdSysCondVar m_prefetch_condVar;
00289 bool m_prefetch_enabled;
00290
00291 XrdSysMutex m_RAMblock_mutex;
00292 int m_RAMblocks_used;
00293 bool m_isClient;
00294
00295 struct WriteQ
00296 {
00297 WriteQ() : condVar(0), writes_between_purges(0), size(0) {}
00298
00299 XrdSysCondVar condVar;
00300 std::list<Block*> queue;
00301 long long writes_between_purges;
00302 int size;
00303 };
00304
00305 WriteQ m_writeQ;
00306
00307
00308 typedef std::map<std::string, File*> ActiveMap_t;
00309 typedef ActiveMap_t::iterator ActiveMap_i;
00310 typedef std::set<std::string> FNameSet_t;
00311
00312 ActiveMap_t m_active;
00313 FNameSet_t m_purge_delay_set;
00314 bool m_in_purge;
00315 XrdSysCondVar m_active_cond;
00316
00317 void inc_ref_cnt(File*, bool lock, bool high_debug);
00318 void dec_ref_cnt(File*, bool high_debug);
00319
00320 void schedule_file_sync(File*, bool ref_cnt_already_set, bool high_debug);
00321
00322
00323 typedef std::vector<File*> PrefetchList;
00324 PrefetchList m_prefetchList;
00325 };
00326
00327 }
00328
00329 #endif