17#define dprintf(...) printf(__VA_ARGS__)
19#define dprintf(...) (void(0))
27 const char *m_traceID =
"ResourceMonitor";
48 m_dir_scan_mutex.
Lock();
49 if (m_dir_scan_in_progress) {
50 bool dir_checked =
false;
51 m_dir_scan_open_requests.push_back({lfn, cond, dir_checked});
54 while ( ! dir_checked)
62void ResourceMonitor::process_inter_dir_scan_open_requests(
FsTraversal &fst)
64 m_dir_scan_mutex.
Lock();
65 while ( ! m_dir_scan_open_requests.empty())
67 LfnCondRecord &lcr = m_dir_scan_open_requests.front();
70 cross_check_or_process_oob_lfn(lcr.f_lfn, fst);
76 m_dir_scan_mutex.
Lock();
77 m_dir_scan_open_requests.pop_front();
82void ResourceMonitor::cross_check_or_process_oob_lfn(
const std::string &lfn,
FsTraversal &fst)
86 static const char *trc_pfx =
"cross_check_or_process_oob_lfn() ";
91 DirState *last_existing_ds =
nullptr;
96 size_t pos = lfn.find_last_of(
"/");
97 std::string dir = (pos == std::string::npos) ?
"" : lfn.substr(0, pos);
102 fst.slurp_dir_ll(*dhp, ds->m_depth, dir.c_str(), trc_pfx);
105 DirUsage &here = ds->m_here_usage;
106 for (auto it = fst.m_current_files.begin(); it != fst.m_current_files.end(); ++it)
108 if (it->second.has_data && it->second.has_cinfo) {
109 here.m_StBlocks += it->second.stat_data.st_blocks;
120 dprintf(
"In scan_dir_and_recurse for '%s', size of dir_vec = %d, file_stat_map = %d\n",
131 dprintf(
"would be doing something with %s ... has_data=%d, has_cinfo=%d\n",
132 it->first.c_str(), it->second.has_data, it->second.has_cinfo);
140 if (it->second.has_data && it->second.has_cinfo) {
141 here.
m_StBlocks += it->second.stat_data.st_blocks;
149 std::vector<std::string> dirs;
152 if (++m_dir_scan_check_counter >= 100)
154 process_inter_dir_scan_open_requests(fst);
155 m_dir_scan_check_counter = 0;
160 for (
auto &dname : dirs)
197 m_dir_scan_in_progress =
false;
198 m_dir_scan_check_counter = 0;
201 while ( ! m_dir_scan_open_requests.empty())
203 LfnCondRecord &lcr = m_dir_scan_open_requests.front();
205 lcr.f_checked =
true;
209 m_dir_scan_open_requests.pop_front();
227 static const char *trc_pfx =
"process_queues() ";
238 n_records += m_file_open_q.swap_queues();
239 n_records += m_file_update_stats_q.swap_queues();
240 n_records += m_file_close_q.swap_queues();
241 n_records += m_file_purge_q1.swap_queues();
242 n_records += m_file_purge_q2.swap_queues();
243 n_records += m_file_purge_q3.swap_queues();
247 for (
auto &i : m_file_open_q.read_queue())
250 AccessToken &at =
token(i.id);
251 dprintf(
"process file open for token %d, time %ld -- %s\n",
252 i.id, i.record.m_open_time, at.m_filename.c_str());
257 DirState *last_existing_ds =
nullptr;
263 if ( ! i.record.m_existing_file) {
266 while (pp != last_existing_ds) {
275 for (
auto &i : m_file_update_stats_q.read_queue())
278 AccessToken &at =
token(i.id);
281 dprintf(
"process file update for token %d, %p -- %s\n",
282 i.id, ds, at.m_filename.c_str());
285 m_current_usage_in_st_blocks += i.record.m_StBlocksAdded;
288 for (
auto &i : m_file_close_q.read_queue())
291 AccessToken &at =
token(i.id);
292 dprintf(
"process file close for token %d, time %ld -- %s\n",
293 i.id, i.record.m_close_time, at.m_filename.c_str());
303 for (
auto &i : m_file_close_q.read_queue())
304 m_access_tokens_free_slots.push_back(i.id);
307 for (
auto &i : m_file_purge_q1.read_queue())
313 m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
315 for (
auto &i : m_file_purge_q2.read_queue())
320 TRACE(
Error, trc_pfx <<
"DirState not found for directory path '" << i.id <<
"'.");
326 m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
328 for (
auto &i : m_file_purge_q3.read_queue())
333 TRACE(
Error, trc_pfx <<
"DirState not found for LFN path '" << i.id <<
"'.");
338 m_current_usage_in_st_blocks -= i.record;
353 static const char *tpfx =
"heart_beat() ";
357 const int s_queue_proc_interval = 10;
359 const int s_purge_check_interval = 60;
365 time_t now = time(0);
366 time_t next_queue_proc_time = now + s_queue_proc_interval;
367 time_t next_sshot_report_time = (now / 60) * 60 + 60;
368 time_t next_purge_check_time = now + s_purge_check_interval;
369 time_t next_purge_report_time = now + s_purge_report_interval;
370 time_t next_purge_cold_files_time = now + s_purge_cold_files_interval;
374 time_t start = time(0);
375 time_t next_event = std::min({ next_queue_proc_time, next_sshot_report_time,
376 next_purge_check_time, next_purge_report_time, next_purge_cold_files_time });
378 if (next_event > start)
380 unsigned int t_sleep = next_event - start;
381 TRACE(
Debug, tpfx <<
"sleeping for " << t_sleep <<
" seconds until the next beat.");
396 time_t queue_swap_time = time(0);
400 next_queue_proc_time = queue_swap_time + s_queue_proc_interval;
401 TRACE(
Debug, tpfx <<
"process_queues -- n_records=" << n_processed);
410 bool do_sshot_report = next_sshot_report_time <= now;
411 bool do_purge_check = next_purge_check_time <= now;
412 bool do_purge_report = next_purge_report_time <= now;
413 bool do_purge_cold_files = next_purge_cold_files_time <= now;
416 if (do_sshot_report || do_purge_check || do_purge_report || do_purge_cold_files)
418 unlink_func unlink_foo = [&](
const std::string &dp)->
int {
419 int ret = m_oss.
Unlink(dp.c_str());
421 TRACE(
Info, tpfx <<
"Empty dir unlink error: " << ret <<
" at " << dp);
423 TRACE(
Debug, tpfx <<
"Empty dir unlink success: " << dp);
451 next_sshot_report_time = ((now + 1) / s_sshot_report_interval) * s_sshot_report_interval + s_sshot_report_interval;
467 const char* dumpfile =
"/pfc-stats/DirStat.json";
472 if (do_purge_check || do_purge_report || do_purge_cold_files)
476 next_purge_check_time = now + s_purge_check_interval;
477 if (do_purge_report) next_purge_report_time = now + s_purge_report_interval;
478 if (do_purge_cold_files) next_purge_cold_files_time = now + s_purge_cold_files_interval;
490 std::vector<DirStateElement> &vec,
493 int pos = vec.size();
494 int n_children = parent_ds.
m_subdirs.size();
500 if (n_children == 0)
return;
507 if (parent_ds.
m_depth < max_depth)
519 std::vector<DirPurgeElement> &vec,
522 int pos = vec.size();
523 int n_children = parent_ds.
m_subdirs.size();
529 if (n_children == 0)
return;
536 if (parent_ds.
m_depth < max_depth)
552 static const char *trc_pfx =
"update_vs_and_file_usage_info() ";
558 if (m_oss.
StatVS(&vsi, conf.m_data_space.c_str(), 1) < 0) {
559 TRACE(
Error, trc_pfx <<
"can't get StatVS for oss space '" << conf.m_data_space <<
"'. This is a fatal error.");
564 m_fs_state.
m_file_usage = 512ll * m_current_usage_in_st_blocks;
565 if (m_oss.
StatVS(&vsi, conf.m_meta_space.c_str(), 1) < 0) {
566 TRACE(
Error, trc_pfx <<
"can't get StatVS for oss space '" << conf.m_meta_space <<
"'. This is a fatal error.");
573long long ResourceMonitor::get_file_usage_bytes_to_remove(
const DataFsPurgeshot &ps,
long long write_estimate,
int tl)
589 long long delta = write_estimate;
590 TRACE_INT(tl,
"file usage increased since the previous purge interval in bytes: " << delta );
592 long long bytes_to_remove = 0;
595 auto clamp = [&x, &bytes_to_remove](
long long lowval,
long long highval)
598 long long newval = val - bytes_to_remove;
607 if (newval > highval)
609 return val - highval;
612 return bytes_to_remove;
623 float frac_u =
static_cast<float>(u - w2) / (T - w2);
624 float frac_x =
static_cast<float>(x - f0) / (f1 - f0);
628 bytes_to_remove = u -w1;
635 bytes_to_remove = (frac_x - frac_u) * (f1 - f0);
636 bytes_to_remove += delta;
637 bytes_to_remove = clamp(f0, f1);
642 bytes_to_remove = clamp(f0, f2);
644 return bytes_to_remove;
650 if (u > w1 && x > f1)
652 float frac_u =
static_cast<float>(u - w1) / (w2 - w1);
653 float frac_x =
static_cast<float>(x - f1) / (f2 - f1);
656 TRACE_INT(tl,
"Disproportional file quota usage comapared to disc usage (frac_x/frac_u) = " << frac_x <<
"/"<< frac_u);
657 bytes_to_remove = (frac_x - frac_u) * (f2 - f1);
658 bytes_to_remove += delta;
662 bytes_to_remove = clamp(f0, f2);
663 return bytes_to_remove;
673 TRACE_INT(tl,
"File usage exceeds maxim file usage. Total disk usage is under lowWatermark. Clearing to low file usage.");
674 long long f2delta = std::max(f2 - delta, f0);
675 bytes_to_remove = clamp(f0, f2delta);
676 return bytes_to_remove;
679 return bytes_to_remove;
684 static const char *trc_pfx =
"perform_purge_check() ";
687 std::unique_ptr<DataFsPurgeshot> psp(
new DataFsPurgeshot(m_fs_state) );
695 TRACE_INT(tl, trc_pfx <<
"Purge check:");
729 TRACE(
Info, trc_pfx <<
"purge not required.");
734 TRACE(Warning, trc_pfx <<
"purge required but previous purge task is still active!");
738 TRACE(
Info, trc_pfx <<
"scheduling purge task.");
754 dprintf(
"purge dir count recursive=%d vs from_usage=%d\n", n_pshot_dirs, n_calc_dirs);
762 struct PurgeDriverJob :
public XrdJob
767 XrdJob(
"XrdPfc::ResourceMonitor::PurgeDriver"),
768 m_purge_shot_ptr(psp)
776 delete m_purge_shot_ptr;
830 m_dir_scan_in_progress =
true;
835 const char *tpfx =
"main_thread_function ";
837 time_t is_start = time(0);
839 TRACE(
Info, tpfx <<
"Stating initial directory scan.");
842 TRACE(
Error, tpfx <<
"Initial directory scan has failed. This is a terminal error, aborting.")
847 time_t is_duration = time(0) - is_start;
848 TRACE(
Info, tpfx <<
"Initial directory scan complete, duration=" << is_duration <<
"s");
852 TRACE(
Info, tpfx <<
"First process_queues finished, n_records=" << n_proc_is);
855 if (is_duration > 30 || n_proc_is > 3000)
857 m_file_open_q.shrink_read_queue();
858 m_file_update_stats_q.shrink_read_queue();
859 m_file_close_q.shrink_read_queue();
860 m_file_purge_q1.shrink_read_queue();
861 m_file_purge_q2.shrink_read_queue();
862 m_file_purge_q3.shrink_read_queue();
908 time_t heartbeat_start = time(0);
952 int heartbeat_duration = time(0) - heartbeat_start;
957 int sleep_time = 60 - heartbeat_duration;
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
void Proto_ResourceMonitorHeartBeat()
#define TRACE_INT(act, x)
virtual int Opendir(const char *path, XrdOucEnv &env)
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
virtual XrdOssDF * newDir(const char *tident)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
struct XrdOucCacheStats::CacheStats X
XrdOucCacheStats Statistics
static const Configuration & Conf()
static ResourceMonitor & ResMon()
void ClearPurgeProtectedSet()
static Cache & GetInstance()
Singleton access.
static XrdScheduler * schedP
long long WritesSinceLastCall()
PurgePin * GetPurgePin() const
void AddUp(const DirStats &s)
int m_NDirectoriesCreated
long long m_StBlocksRemoved
std::vector< std::string > m_current_dirs
std::string m_current_path
bool begin_traversal(DirState *root, const char *root_path)
std::set< std::string > m_protected_top_dirs
bool cd_down(const std::string &dir_name)
XrdOucEnv & default_env()
std::map< std::string, FilePairStat > m_current_files
Status of cached file. Can be read from and written into a binary file.
virtual bool CallPeriodically()
bool m_purge_task_complete
AccessToken & token(int i)
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
void perform_purge_check(bool purge_cold_files, int tl)
void fill_sshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)
ResourceMonitor(XrdOss &oss)
void perform_purge_task(DataFsPurgeshot &ps)
void scan_dir_and_recurse(FsTraversal &fst)
void fill_pshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)
XrdSysCondVar m_purge_task_cond
void perform_purge_task_cleanup()
void update_vs_and_file_usage_info()
bool perform_initial_scan()
time_t m_purge_task_start
void main_thread_function()
void Schedule(XrdJob *jp)
std::function< int(const std::string &)> unlink_func
void OldStylePurgeDriver(DataFsPurgeshot &ps)
Contains parameters configurable from the xrootd config file.
long long m_RamAbsAvailable
available from configuration
long long m_diskTotalSpace
total disk space on configured partition or oss space
long long m_fileUsageMax
cache purge - files usage maximum
long long m_fileUsageBaseline
cache purge - files usage baseline
int m_dirStatsStoreDepth
depth to which statistics should be collected
long long m_diskUsageHWM
cache purge - disk usage high water mark
bool are_file_usage_limits_set() const
long long m_fileUsageNominal
cache purge - files usage nominal
int m_purgeAgeBasedPeriod
peform cold file / uvkeep purge every this many purge cycles
long long m_diskUsageLWM
cache purge - disk usage low water mark
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
bool is_age_based_purge_in_effect() const
int m_purgeInterval
sleep interval between cache purges
bool is_dir_stat_reporting_on() const
std::vector< DirPurgeElement > m_dir_vec
long long m_bytes_to_remove
long long m_estimated_writes_from_writeq
void write_json_file(const std::string &fname, XrdOss &oss, bool include_preamble)
std::vector< DirStateElement > m_dir_states
void reset_stats(time_t last_update)
void dump_recursively(int max_depth) const
void init_stat_reset_times(time_t t)
DirState * find_dirstate_for_lfn(const std::string &lfn, DirState **last_existing_dir=nullptr)
time_t m_sshot_stats_reset_time
void update_stats_and_usages(time_t last_update, bool purge_empty_dirs, unlink_func unlink_foo)
void reset_sshot_stats(time_t last_update)
DirUsage m_recursive_subdir_usage
int count_dirs_to_level(int max_depth) const
DirState * find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs, DirState **last_existing_dir=nullptr)
void upward_propagate_initial_scan_usages()