#!/usr/bin/env python
#
# Find and deal with DPM lost and dark files
#
# usage:
#   python dpm-lost-and-dark-chk.py --help
# examples:
#   python dpm-lost-and-dark-chk.py --verbose --processes 10 --stat-types=dark --fix-dark --all --include-fs-rdonly &> dpm-lost-and-dark-chk.out
#
'''Consistency checks for DPM to detect candidates for dark and lost files
(differences between DPNS and stored files).'''

__author__ = 'Petr Vokac'
__date__ = 'March 2018'
__version__ = 0.1

import os
import sys
import time
import random
import signal
import select
import multiprocessing
import logging, logging.handlers

try:
    import MySQLdb
    import MySQLdb.cursors
except ImportError:
    sys.exit("Could not import MySQLdb module. Please install the MySQL Python module.")
try:
    import paramiko
except ImportError:
    sys.exit("Could not import ssh library module. Please install the paramiko rpm.")
try:
    import dpm
except ImportError:
    sys.exit("Could not import dpm library module. Please install the DPM python rpm.")

_log = logging.getLogger('DPMDL')


#########################################################################
# modules applied on normal/dark/lost files
#########################################################################

class BaseModule(object):

    def __init__(self, diskserver, diskserverfs, run=[]):
        self._status = 'init'
        self._diskserver = diskserver
        self._diskserverfs = diskserverfs
        self.run_normal = 'normal' in run
        self.run_dark = 'dark' in run
        self.run_lost = 'lost' in run
        self._ssh = None

    def start(self, ssh):
        if self._status not in ['init']:
            raise Exception("invalid state transition (%s -> started)" % self._status)
        self._status = 'started'
        self._ssh = ssh

    def normal(self, filename, data):
        pass

    def dark(self, filename, data):
        pass

    def lost(self, filename, data):
        pass

    def finish(self):
        if self._status not in ['started']:
            raise Exception("invalid state transition (%s -> finished)" % self._status)
        self._status = 'finished'
        self._ssh = None
        return {}


class TestModule(BaseModule):
    """Example of a module used to deal with consistency check output."""

    def __init__(self, diskserver, diskserverfs, run=['normal', 'dark', 'lost']):
        super(TestModule, self).__init__(diskserver, diskserverfs, run)
        print "%s.__init__(%s, %s, %s)" % (self.__class__.__name__, diskserver, diskserverfs, run)

    def start(self, ssh):
        super(TestModule, self).start(ssh)
        print "%s.start(%s)" % (self.__class__.__name__, ssh)

    def normal(self, filename, data):
        #if not self.run_normal: return
        super(TestModule, self).normal(filename, data)
        print "%s.normal(%s, %s)" % (self.__class__.__name__, filename, data)

    def dark(self, filename, data):
        #if not self.run_dark: return
        super(TestModule, self).dark(filename, data)
        print "%s.dark(%s, %s)" % (self.__class__.__name__, filename, data)

    def lost(self, filename, data):
        #if not self.run_lost: return
        super(TestModule, self).lost(filename, data)
        print "%s.lost(%s, %s)" % (self.__class__.__name__, filename, data)

    def finish(self):
        ret = super(TestModule, self).finish()
        print "%s.finish()" % (self.__class__.__name__, )
        return ret


class StatModule(BaseModule):
    """Get stat details about diskpool files."""

    def __init__(self, diskserver, diskserverfs, run=['dark']):
        super(StatModule, self).__init__(diskserver, diskserverfs, run)

    def start(self, ssh):
        super(StatModule, self).start(ssh)
        self._cnt = 0
        self._done = 0
        self._time = 0

    def finish(self):
        ret = super(StatModule, self).finish()
        ret['cnt'] = getattr(self, '_cnt', 0)
        ret['done'] = getattr(self, '_done', 0)
        ret['time'] = getattr(self, '_time', 0)
        _log.info("%s:%s stat time %0.1fs (cnt: %i, done: %i)" % (self._diskserver, self._diskserverfs, ret['time'], ret['cnt'], ret['done']))
        return ret

    def normal(self, filename, data):
        self._cnt += 1
        start = time.time()
        stat = self._stat(filename)
        self._time += time.time() - start
        if stat != None:
            self._done += 1
            data['stat'] = stat

    def dark(self, filename, data):
        self._cnt += 1
        start = time.time()
        stat = self._stat(filename)
        self._time += time.time() - start
        if stat != None:
            self._done += 1
            data['stat'] = stat

    def _stat(self, filename):
        ret = None
        stat_filename = filename.replace(r"'", r"'\''")
        stat_cmd = "stat --format='%%i;%%h;%%f;%%s;%%U;%%G;%%X;%%Y;%%Z' '%s'" % stat_filename
        try:
            exit_code, stdout_lines, stderr_lines = run_ssh_command(self._ssh, stat_cmd)
            if exit_code != 0:
                raise Exception("non-zero exit code %i" % exit_code)
            if len(stdout_lines) == 0:
                raise Exception("no output")
            statout = stdout_lines[0].strip().split(';')
            if len(statout) != 9:
                raise Exception("invalid output %s" % str(statout))
            inode, links, mode, size, user, group, atime, mtime, ctime = statout
            ret = (int(inode), int(links), mode, int(size), user, group, int(atime), int(mtime), int(ctime))
        except Exception, e:
            _log.error("%s:%s stat '%s' failed: %s" % (self._diskserver, self._diskserverfs, stat_filename, str(e)))
        return ret
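
# Note: StatModule stores the parsed stat output in data['stat'] as a 9-tuple
#   (inode, links, mode, size, user, group, atime, mtime, ctime);
# FileModule below picks out the size field (index 3) for its output record.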

class ChksumModule(BaseModule):
    """Calculate checksum of the diskpool files."""

    def __init__(self, diskserver, diskserverfs, run=['normal']):
        super(ChksumModule, self).__init__(diskserver, diskserverfs, run)

    def start(self, ssh):
        super(ChksumModule, self).start(ssh)
        self._cnt = 0
        self._done = 0
        self._time = 0

    def finish(self):
        ret = super(ChksumModule, self).finish()
        ret['cnt'] = getattr(self, '_cnt', 0)
        ret['done'] = getattr(self, '_done', 0)
        ret['time'] = getattr(self, '_time', 0)
        _log.info("%s:%s chksum time %0.1fs (cnt: %i, done: %i)" % (self._diskserver, self._diskserverfs, ret['time'], ret['cnt'], ret['done']))
        return ret

    def normal(self, filename, data):
        self._cnt += 1
        start = time.time()
        chksum = self._chksum(filename)
        self._time += time.time() - start
        if chksum != None:
            self._done += 1
            data['chksum'] = chksum

    def dark(self, filename, data):
        self._cnt += 1
        start = time.time()
        chksum = self._chksum(filename)
        self._time += time.time() - start
        if chksum != None:
            self._done += 1
            data['chksum'] = chksum

    def _chksum(self, filename):
        ret = None
        chksum_filename = filename.replace(r"'", r"'\''")
        chksum_cmd = "xrdadler32 '%s'" % chksum_filename
        try:
            exit_code, stdout_lines, stderr_lines = run_ssh_command(self._ssh, chksum_cmd)
            if exit_code != 0:
                raise Exception("non-zero exit code %i" % exit_code)
            if len(stdout_lines) == 0:
                raise Exception("no output")
            chksumout = stdout_lines[0].strip()
            if len(chksumout) != 8:
                raise Exception("invalid output %s" % str(chksumout))
            ret = chksumout
        except Exception, e:
            _log.error("%s:%s chksum '%s' failed: %s" % (self._diskserver, self._diskserverfs, chksum_filename, str(e)))
        return ret
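
# Note: data['chksum'] holds the 8 character adler32 checksum reported
# by xrdadler32 on the disk server.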

class FileModule(BaseModule):
    """Write consistency check details in the file."""

    def __init__(self, diskserver, diskserverfs, run=['normal', 'dark', 'lost'], filename=None):
        if filename == None:
            raise Exception("FileModule filename argument can't be None")
        super(FileModule, self).__init__(diskserver, diskserverfs, run)
        self._filename = filename
        self._fh = None
        self._summary = {}

    def start(self, ssh):
        super(FileModule, self).start(ssh)
        # special filenames select a standard stream instead of a regular
        # file ('-' for stdout follows the --log-file convention; '+' for
        # stderr is an assumed sentinel)
        if self._filename == '-':
            self._fh = sys.stdout
        elif self._filename == '+':
            self._fh = sys.stderr
        else:
            self._fh = open(self._filename, "w")

    def finish(self):
        data = super(FileModule, self).finish()
        for k, v in sorted(self._summary.items()):
            _log.info("FileModule summary %s: %s" % (k, v))
        if self._fh == None:
            return data
        if self._fh not in [sys.stdout, sys.stderr]:
            self._fh.close()
        self._fh = None
        return data

    def normal(self, filename, data):
        self._write('FILE', self._diskserver, filename, data)

    def dark(self, filename, data):
        self._write('DARK', self._diskserver, filename, data)

    def lost(self, filename, data):
        if data['parent_dir_exists']:
            self._write('LOST', self._diskserver, filename, data)
        else:
            self._write('LOSTNODIR', self._diskserver, filename, data)

    def _write(self, name, diskserver, filename, data):
        fileid = data.get('db_fileid', '')
        rrowid = data.get('db_rrowid', '')
        status = data.get('db_status', '')
        mrowid = data.get('db_mrowid', '')
        filesize = data.get('db_filesize', '')
        size = data.get('stat', ('', '', '', '', '', '', '', '', ''))[3]
        chksum = data.get('chksum', '')
        # one tab-separated record per file:
        # type, diskserver, filename, fileid, rrowid, mrowid, status, filesize, size, chksum
        self._fh.write("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n" % (name, diskserver, filename, fileid, rrowid, mrowid, status, filesize, size, chksum))
        self._summary["nfiles %s" % name] = self._summary.get("nfiles %s" % name, 0) + 1
        try:
            self._summary["size %s" % name] = self._summary.get("size %s" % name, 0) + int(filesize)
        except ValueError, e:
            _log.debug("invalid file size: %s ... %s" % (filename, filesize))


class FixModule(BaseModule):
    """Fix problems found by consistency checks."""

    def __init__(self, diskserver, diskserverfs, run=['dark', 'lost']):
        super(FixModule, self).__init__(diskserver, diskserverfs, run)

    def dark(self, filename, data):
        rm_cmd = "rm '%s'" % filename.replace(r"'", r"'\''")
        try:
            exit_code, stdout_lines, stderr_lines = run_ssh_command(self._ssh, rm_cmd)
            if exit_code != 0:
                raise Exception("non-zero exit code %i" % exit_code)
        except Exception, e:
            _log.error("%s:%s remove dark '%s' failed: %s" % (self._diskserver, self._diskserverfs, filename, str(e)))

    def lost(self, filename, data):
        status = data.get('db_status', '')
        if status != '-':
            _log.info("skipping replica %s:%s with status %s" % (self._diskserver, filename, status))
            return
        if data['parent_dir_exists']:
            dpm.dpm_delreplica("%s:%s" % (self._diskserver, filename))
        else:
            _log.warn("replica directory missing, skipping delreplica %s:%s" % (self._diskserver, filename))


#########################################################################
#########################################################################

def guess_config_files():
    possible_nsconfigs = ['/opt/lcg/etc/NSCONFIG', '/usr/etc/NSCONFIG']
    if os.environ.has_key('LCG_LOCATION'):
        possible_nsconfigs.append(os.environ['LCG_LOCATION'].rstrip('/') + '/etc/NSCONFIG')
    guess_nsconfig = possible_nsconfigs[0]
    for f in possible_nsconfigs:
        if os.path.exists(f):
            guess_nsconfig = f
    return guess_nsconfig


def get_conn_data(nsconfig, default_ns_db='cns_db'):
    """Returns connection data from NSCONFIG."""
    _log.debug("getting connection info from %s" % nsconfig)
    nsconfig_line = open(nsconfig).readline().strip()
    splitlist = [x.split('/') for x in nsconfig_line.split('@')]
    retval = {}
    retval['user'] = splitlist[0][0]
    retval['pass'] = splitlist[0][1]
    retval['host'] = splitlist[1][0]
    if len(splitlist[1]) == 2:
        retval['db'] = splitlist[1][1]
    else:
        retval['db'] = default_ns_db
    _log.debug("connection info %s" % str(retval))
    return retval
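
# The first line of NSCONFIG is expected in the usual DPM format
# user/password@host[/db], e.g. (with made-up credentials):
#   dpmmgr/secret@dpmhead.example.com/cns_db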

def get_all_dpm_fs():
    result, pools = dpm.dpm_getpools()
    if result != 0:
        raise Exception("Unable to get DPM pools (error %i)" % result)
    ret = []
    for pool in pools:
        #print "POOL %s, CAPACITY %i, FREE %i" % (pool.poolname, pool.capacity, pool.free)
        result, poolfss = dpm.dpm_getpoolfs(pool.poolname)
        if result != 0:
            raise Exception("Unable to list DPM pool %s (error %i)" % (pool.poolname, result))
        for poolfs in poolfss:
            #print "  POOL %s, SERVER %s, FS %s, WEIGHT %i, STATUS %i, CAPACITY %i, FREE %i" % (poolfs.poolname, poolfs.server, poolfs.fs, poolfs.weight, poolfs.status, poolfs.capacity, poolfs.free)
            ret.append(poolfs)
    return ret


def get_dpm_fs_details(diskserver, diskserverfs):
    result, pools = dpm.dpm_getpools()
    if result != 0:
        raise Exception("Unable to get DPM pools (error %i)" % result)
    for pool in pools:
        result, poolfss = dpm.dpm_getpoolfs(pool.poolname)
        if result != 0:
            raise Exception("Unable to list DPM pool %s (error %i)" % (pool.poolname, result))
        for poolfs in poolfss:
            if poolfs.server != diskserver:
                continue
            if poolfs.fs != diskserverfs:
                continue
            _log.debug("%s:%s details - POOL %s, SERVER %s, FS %s, WEIGHT %i, STATUS %i, CAPACITY %i, FREE %i" % (diskserver, diskserverfs, poolfs.poolname, poolfs.server, poolfs.fs, poolfs.weight, poolfs.status, poolfs.capacity, poolfs.free))
            return poolfs
    return None


# execute command on remote machine and deal with buffering
# for commands that produce huge stdout/stderr
def run_ssh_command(client, cmd, timeout=0):
    _log.debug("%s running command `%s` (timeout=%s)" % (str(client.get_transport().getpeername()), cmd, timeout))
    channel = client.get_transport().open_session()
    channel.exec_command(cmd)
    channel.shutdown_write()
    stdout_chunks = []
    stderr_chunks = []
    # keep draining both streams until EOF, otherwise a full channel
    # buffer could block the remote command
    while not channel.eof_received or channel.recv_ready() or channel.recv_stderr_ready():
        readq, _, _ = select.select([channel], [], [], timeout)
        for c in readq:
            if c.recv_ready():
                stdout_chunks.append(c.recv(len(c.in_buffer)))
            if c.recv_stderr_ready():
                stderr_chunks.append(c.recv_stderr(len(c.in_stderr_buffer)))
    exit_status = channel.recv_exit_status()
    channel.shutdown_read()
    channel.close()
    stdout_lines = []
    stderr_lines = []
    if len(stdout_chunks) > 0:
        stdout_lines = ''.join(stdout_chunks).rstrip('\n').split('\n')
    if len(stderr_chunks) > 0:
        stderr_lines = ''.join(stderr_chunks).rstrip('\n').split('\n')
    _log.debug("%s finished command `%s`: exit_status %i, stdout lines %i, stderr lines %i" % (str(client.get_transport().getpeername()), cmd, exit_status, len(stdout_lines), len(stderr_lines)))
    return (exit_status, stdout_lines, stderr_lines)
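
# Example use (assuming `ssh` is an already connected paramiko.SSHClient):
#   rc, stdout_lines, stderr_lines = run_ssh_command(ssh, "uname -r", timeout=60)
#   if rc == 0 and stdout_lines:
#       print stdout_lines[0]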

def diskserverfs_check(diskserver, diskserverfs, dbcfg, modules=[]):
    _log.debug("diskserverfs_check(%s, %s, dbcfg, %s)" % (diskserver, diskserverfs, modules))

    # login to diskserver
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(diskserver, username='root', allow_agent=True, look_for_keys=True)
    except Exception, e:
        _log.error("could not ssh to %s for file listing; passwordless ssh from the account running this script to the target disk server is currently required: %s" % (diskserver, str(e)))
        return 'SSH connection failed'

    for module in modules:
        module.start(ssh)

    # obtain list of all directories on given filesystem
    _log.debug("%s:%s find directories" % (diskserver, diskserverfs))
    try:
        cmd = "find '%s' -type d" % diskserverfs.replace(r"'", r"'\''")
        exit_code, stdout_lines, stderr_lines = run_ssh_command(ssh, cmd)
        if exit_code != 0:
            raise Exception("find failed with exit code %i" % exit_code)
        if len(stderr_lines) > 0:
            raise Exception("find returned non-empty stderr with %i lines (first error): %s" % (len(stderr_lines), stderr_lines[0]))
        diskdirs = set([x.rstrip('\n') for x in stdout_lines])
    except Exception, e:
        _log.error("remote directory find over SSH failed on %s:%s: %s" % (diskserver, diskserverfs, str(e)))
        return 'SSH read dirlist failed'

    # obtain list of all files on given filesystem
    _log.debug("%s:%s find files" % (diskserver, diskserverfs))
    try:
        cmd = "find '%s' -type f" % diskserverfs.replace(r"'", r"'\''")
        exit_code, stdout_lines, stderr_lines = run_ssh_command(ssh, cmd)
        if exit_code != 0:
            raise Exception("find failed with exit code %i" % exit_code)
        if len(stderr_lines) > 0:
            raise Exception("find returned non-empty stderr with %i lines (first error): %s" % (len(stderr_lines), stderr_lines[0]))
        diskfiles = set([x.rstrip('\n') for x in stdout_lines])
    except Exception, e:
        _log.error("remote file find over SSH failed on %s:%s: %s" % (diskserver, diskserverfs, str(e)))
        return 'SSH read filelist failed'

    _log.info("%s:%s found %i directories and %i files" % (diskserver, diskserverfs, len(diskdirs), len(diskfiles)))

    # read also all files for diskserver:diskserverfs from DPM database
    _log.debug("%s:%s read DPNS replicas" % (diskserver, diskserverfs))
    try:
        dpnsfiles = {}
        conn = MySQLdb.connect(dbcfg['host'], dbcfg['user'], dbcfg['pass'], dbcfg['db'])
        cursor = conn.cursor(cursorclass=MySQLdb.cursors.SSCursor)
        #cursor.execute("select sfn, rowid, fileid, status from Cns_file_replica where host = %s and fs = %s", (diskserver, diskserverfs))
        # MySQL doesn't support FULL OUTER JOIN, use UNION ALL of two separate LEFT+RIGHT exclusive joins
        sql = """
SELECT Cns_file_replica.sfn, Cns_file_replica.fileid, Cns_file_replica.rowid, Cns_file_replica.status,
       Cns_file_metadata.rowid, Cns_file_metadata.filesize, Cns_file_metadata.csumtype, Cns_file_metadata.csumvalue
FROM Cns_file_replica
LEFT JOIN Cns_file_metadata ON Cns_file_replica.fileid = Cns_file_metadata.fileid
WHERE host = %s AND fs = %s
UNION ALL
SELECT Cns_file_replica.sfn, Cns_file_replica.fileid, Cns_file_replica.rowid, Cns_file_replica.status,
       Cns_file_metadata.rowid, Cns_file_metadata.filesize, Cns_file_metadata.csumtype, Cns_file_metadata.csumvalue
FROM Cns_file_replica
RIGHT JOIN Cns_file_metadata ON Cns_file_replica.fileid = Cns_file_metadata.fileid
WHERE Cns_file_replica.rowid IS NULL AND host = %s AND fs = %s
"""
        cursor.execute(sql, (diskserver, diskserverfs, diskserver, diskserverfs))
        for row in cursor.fetchall():
            if row[0] == None:
                _log.warn("no replica for metadata: %s" % str(row))
                continue
            if row[5] == None:
                _log.warn("no metadata for replica: %s" % str(row))
            filename = row[0].split(':', 1)[1]
            dpnsfiles[filename] = (row[1], row[2], row[3], row[4], row[5], row[6], row[7])
        cursor.close()
        conn.close()
    except Exception, e:
        _log.error("unable to get data from DPNS for %s:%s: %s" % (diskserver, diskserverfs, str(e)))
        return 'DPNS read failed'

    _log.info("%s:%s dpns has %i entries" % (diskserver, diskserverfs, len(dpnsfiles)))
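
    # compare both listings: a DPNS replica with no matching file on disk
    # is "lost", a file on disk with no DPNS replica record is "dark"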
    # find lost files (files in DPM database with missing file on diskserver)
    for sfn_local in dpnsfiles:
        try:
            fileid, rrowid, status, mrowid, filesize, csumtype, csumvalue = dpnsfiles[sfn_local]
            data = {
                'db_fileid': fileid,
                'db_rrowid': rrowid,
                'db_status': status,
                'db_mrowid': mrowid,
                'db_filesize': filesize,
                'db_csumtype': csumtype,
                'db_csumvalue': csumvalue,
            }
            if sfn_local in diskfiles:
                for module in modules:
                    if not module.run_normal: continue
                    module.normal(sfn_local, data)
            else:
                data['parent_dir_exists'] = os.path.dirname(sfn_local) in diskdirs
                for module in modules:
                    if not module.run_lost: continue
                    module.lost(sfn_local, data)
        except Exception, e:
            _log.error("%s:%s file -> db comparison failed for %s: %s" % (diskserver, diskserverfs, sfn_local, str(e)))
    _log.info("%s:%s finished file -> db check" % (diskserver, diskserverfs))

    # find dark files (files on diskserver that have no record in DPM database)
    for sfn_local in diskfiles:
        try:
            if sfn_local in dpnsfiles: continue
            data = {}
            for module in modules:
                if not module.run_dark: continue
                module.dark(sfn_local, data)
        except Exception, e:
            _log.error("%s:%s db -> file comparison failed for %s: %s" % (diskserver, diskserverfs, sfn_local, str(e)))
    _log.info("%s:%s finished db -> file check" % (diskserver, diskserverfs))

    ssh.close()

    for module in modules:
        module.finish()

    return 'DONE'


def process_diskserverfs(diskserver, diskserverfs, dbcfg, modules):
    _log.debug("process_diskserverfs(%s, %s, %s)" % (diskserver, diskserverfs, modules))

    poolfs = None
    try:
        poolfs = get_dpm_fs_details(diskserver, diskserverfs)
    except Exception, e:
        _log.error("unable to get FS details %s:%s: %s" % (diskserver, diskserverfs, str(e)))
        return (diskserver, diskserverfs, 'FS details failed')
    if poolfs == None:
        _log.error("missing FS %s:%s" % (diskserver, diskserverfs))
        return (diskserver, diskserverfs, 'FS details unavailable')

    online = poolfs.status == 0
    if online:
        # set DPM filesystem read-only before checking for DARK/LOST data,
        # so that DPM can't create/delete data files in the diskpool and
        # in the database during our attempt to find inconsistencies
        try:
            _log.debug("setting FS read-only on %s:%s" % (diskserver, diskserverfs))
            result = dpm.dpm_modifyfs(diskserver, diskserverfs, dpm.FS_RDONLY, -1)
            if result != 0:
                raise Exception("dpm_modifyfs returned %i" % result)
        except Exception, e:
            _log.error("unable to set FS read-only on %s:%s: %s" % (diskserver, diskserverfs, str(e)))
            return (diskserver, diskserverfs, 'FS RDONLY failed')

    retval = 'EMPTY'
    try:
        retval = diskserverfs_check(diskserver, diskserverfs, dbcfg, modules)
    except Exception, e:
        _log.error("unexpected exception %s:%s: %s" % (diskserver, diskserverfs, str(e)))
        retval = "Unexpected error %s" % str(e)
    finally:
        if online:
            # set DPM filesystem back online in case it was set read-only by this script
            try:
                _log.debug("setting FS online on %s:%s" % (diskserver, diskserverfs))
                result = dpm.dpm_modifyfs(diskserver, diskserverfs, 0, -1)
                if result != 0:
                    raise Exception("dpm_modifyfs returned %i" % result)
            except Exception, e:
                _log.error("unable to set FS back online on %s:%s: %s" % (diskserver, diskserverfs, str(e)))
                return (diskserver, diskserverfs, 'FS ONLINE failed')

    return (diskserver, diskserverfs, retval)
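
# Example of checking a single filesystem directly (hypothetical server
# name and mount point):
#   dbcfg = get_conn_data('/usr/etc/NSCONFIG')
#   modules = [FileModule('dpmdisk1.example.com', '/data01', filename='check.out')]
#   process_diskserverfs('dpmdisk1.example.com', '/data01', dbcfg, modules)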

def main():
    import optparse
    import getpass
    import inspect

    # basic logging configuration
    streamHandler = logging.StreamHandler(sys.stderr)
    streamHandler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s](%(module)s:%(lineno)d) %(message)s", "%d %b %H:%M:%S"))
    _log.addHandler(streamHandler)
    _log.setLevel(logging.WARN)

    # parse options from command line
    def opt_set_loglevel(option, opt, value, parser):
        loglevel = option.default
        if value != None:
            loglevel = int({
                'CRITICAL': logging.CRITICAL,
                'DEBUG': logging.DEBUG,
                'ERROR': logging.ERROR,
                'FATAL': logging.FATAL,
                'INFO': logging.INFO,
                'NOTSET': logging.NOTSET,
                'WARN': logging.WARN,
                'WARNING': logging.WARNING,
            }.get(value, value))
        _log.setLevel(loglevel)
        setattr(parser.values, option.dest, loglevel)

    guess_nsconfig = guess_config_files()

    parser = optparse.OptionParser(usage="usage: %prog [options] [--all | --pool poolname] [server1:fs1 [server2:fs2 [...]]]", version="%prog")
    parser.add_option("-v", "--verbose", dest="loglevel", action="callback", callback=opt_set_loglevel, default=logging.DEBUG, help="set log level to DEBUG")
    parser.add_option("-q", "--quiet", dest="loglevel", action="callback", callback=opt_set_loglevel, default=logging.ERROR, help="set log level to ERROR")
    parser.add_option("--log-level", dest="loglevel", action="callback", callback=opt_set_loglevel, type="string", help="set log level (default: %default)")
    parser.add_option("--log-file", dest="logfile", metavar="FILE", help="set log file (default: %default)")
    parser.add_option("--log-size", dest="logsize", type="int", default=10*1024*1024, help="maximum size of log file (default: %default)")
    parser.add_option("--log-backup", dest="logbackup", type="int", default=2, help="number of log backup files (default: %default)")
    parser.add_option("-a", "--all", dest='all', action="store_true", default=False, help='check all pools (default: %default)')
    parser.add_option("-p", "--pool", dest='pools', action='append', default=[], help='list of pools to check (default: %default)')
    parser.add_option("-c", "--nsconfig", dest="nsconfig", action="store", default=guess_nsconfig, help="path to NSCONFIG with sql connection info (default: %s)" % guess_nsconfig)
    parser.add_option("-n", "--processes", dest="processes", type="int", default=1, help="number of parallel check processes (default: %default)")
    parser.add_option("-t", "--timeout", dest="timeout", type="int", default=12*60*60, help="global consistency check timeout in seconds (default: %default)")
    parser.add_option("-r", "--include-fs-rdonly", dest="fsrdonly", action="store_true", default=False, help="include filesystems with RDONLY status (default: %default)")
    parser.add_option("-b", "--include-fs-disabled", dest="fsdisabled", action="store_true", default=False, help="include filesystems with DISABLED status (default: %default)")
    parser.add_option("-k", "--fix-dark", dest="fixdark", action="store_true", default=False, help="remove dark data - files for which there is no corresponding DPNS entry (default: %default)")
    parser.add_option("-e", "--fix-lost", dest="fixlost", action="store_true", default=False, help="remove lost files - remove DPNS replica entries with no corresponding file (default: %default)")
    parser.add_option("--stat-types", dest="stattypes", default="", help="stat diskpool normal or dark files (default: %default)")
    parser.add_option("--chksum-types", dest="chksumtypes", default="", help="checksum diskpool normal or dark files (default: %default)")
    parser.add_option("-o", "--output", dest="output", default="dpm-lad-%(DISKSERVER)s-%(DISKSERVERFS)s-%(POOL)s-%(TIMESTAMP)s", help="output filename template (default: %default)")
    parser.add_option("--output-types", dest="outputtypes", default="normal,dark,lost", help="include information about normal/dark/lost in the output file (default: %default)")

    (options, args) = parser.parse_args()
    if options.logfile == '-':
        _log.removeHandler(streamHandler)
        streamHandler = logging.StreamHandler(sys.stdout)
        streamHandler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s](%(module)s:%(lineno)d) %(message)s", "%d %b %H:%M:%S"))
        _log.addHandler(streamHandler)
    elif options.logfile != None and options.logfile != '':
        #fileHandler = logging.handlers.TimedRotatingFileHandler(options.logfile, 'midnight', 1, 4)
        fileHandler = logging.handlers.RotatingFileHandler(options.logfile, maxBytes=options.logsize, backupCount=options.logbackup)
        fileHandler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s](%(module)s:%(lineno)d) %(message)s", "%d %b %H:%M:%S"))
        _log.addHandler(fileHandler)
        _log.removeHandler(streamHandler)

    _log.info("command: %s" % " ".join(sys.argv))
    _log.info("script: %s" % os.path.abspath(inspect.getfile(inspect.currentframe())))
    _log.info("user: %s" % getpass.getuser())

    if len(args) == 0 and not options.all and len(options.pools) == 0:
        _log.error("wrong number of args, use `%s -h` for basic help" % sys.argv[0])
        sys.exit(1)

    dbcfg = get_conn_data(options.nsconfig)

    disk2pool = {}
    disk2online = {}
    for poolfs in get_all_dpm_fs():
        disk2pool["%s:%s" % (poolfs.server, poolfs.fs)] = poolfs.poolname
        disk2online["%s:%s" % (poolfs.server, poolfs.fs)] = poolfs.status == 0

    tasks = []
    if options.all or len(options.pools) > 0:
        fs_online = 0
        for poolfs in get_all_dpm_fs():
            if not options.all and poolfs.poolname not in options.pools: continue
            if not options.fsdisabled and poolfs.status == dpm.FS_DISABLED: continue
            if not options.fsrdonly and poolfs.status == dpm.FS_RDONLY: continue
            if poolfs.status == 0:
                fs_online += 1
            tasks.append((poolfs.server, poolfs.fs))
        if fs_online <= options.processes:
            # TODO: this is not perfect, because it doesn't prevent setting
            # all filesystems in one storage pool read-only at the same time
            _log.warn("%d processes could set all %d online filesystems read-only during check" % (options.processes, fs_online))
    for diskfs in args:
        diskserver, diskserverfs = diskfs.split(':', 1)
        if (diskserver, diskserverfs) in tasks: continue
        tasks.append((diskserver, diskserverfs))

    starttime = time.time()
    _log.info("%d filesystems checked with %d processes" % (len(tasks), options.processes))

    # shuffle the filesystems into random order, so that one disk node
    # is not stressed by up to options.processes checks at the same time
    random.shuffle(tasks)

    # disable SIGINT and SIGTERM in child processes
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    #original_sigterm_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
    pool = multiprocessing.Pool(processes=options.processes)
    #signal.signal(signal.SIGTERM, original_sigterm_handler)
    signal.signal(signal.SIGINT, original_sigint_handler)

    futures = []
    for diskserver, diskserverfs in tasks:
        #future = pool.apply_async(process_diskserverfs, (diskserver, diskserverfs, options, sys.stdout))
        #fh = open("TEST_%s_%s" % (diskserver, diskserverfs.replace('/', '_')), "w")
        fname = options.output % {
            'POOL': disk2pool.get("%s:%s" % (diskserver, diskserverfs), 'unknown'),
            'DISKSERVER': diskserver,
            'DISKSERVERFS': diskserverfs.lstrip('/').replace('/', '_'),
            'TIMESTAMP': time.strftime("%Y%m%d%H%M%S", time.localtime(starttime)),
        }

        # initialize modules
        modules = []
        #modules.append(TestModule(diskserver, diskserverfs))
        if options.stattypes != '':
            modules.append(StatModule(diskserver, diskserverfs, run=options.stattypes.split(',')))
        if options.chksumtypes != '':
            modules.append(ChksumModule(diskserver, diskserverfs, run=options.chksumtypes.split(',')))
        if options.output != '' and options.outputtypes != '':
            modules.append(FileModule(diskserver, diskserverfs, filename=fname, run=options.outputtypes.split(',')))
        if options.fixdark or options.fixlost:
            run = []
            if options.fixdark: run.append('dark')
            if options.fixlost: run.append('lost')
            modules.append(FixModule(diskserver, diskserverfs, run=run))

        # run consistency check asynchronously in separate processes;
        # bind the loop variables as default arguments so the callback
        # logs the filesystem it was created for
        future = pool.apply_async(process_diskserverfs, (diskserver, diskserverfs, dbcfg, modules),
                                  callback=lambda x, ds=diskserver, dsfs=diskserverfs: _log.info("finished %s:%s check: %s" % (ds, dsfs, x)))
        futures.append(future)

    try:
        timed_out = False
        for future in futures:
            time_remaining = options.timeout + 10 - (time.time() - starttime)
            if time_remaining > 0:
                future.wait(time_remaining)
            else:
                timed_out = True
                break
        if timed_out:
            _log.error("child processes terminated, reached global script run timeout %i" % options.timeout)
            pool.terminate()
        else:
            pool.close()
    except KeyboardInterrupt, e:
        _log.info("process terminated: %s" % str(e))
        pool.terminate()
    pool.join()

    for poolfs in get_all_dpm_fs():
        if poolfs.status == 0: continue
        if not disk2online.has_key("%s:%s" % (poolfs.server, poolfs.fs)):
            _log.error("unknown %s:%s online status?!?!" % (poolfs.server, poolfs.fs))
            continue
        if not disk2online["%s:%s" % (poolfs.server, poolfs.fs)]: continue
        diskserver = poolfs.server
        diskserverfs = poolfs.fs
        # set DPM filesystem back online in case it was set read-only by this script
        try:
            _log.debug("setting FS online on %s:%s" % (diskserver, diskserverfs))
            result = dpm.dpm_modifyfs(diskserver, diskserverfs, 0, -1)
            if result != 0:
                raise Exception("dpm_modifyfs returned %i" % result)
        except Exception, e:
            _log.error("unable to set FS back online on %s:%s: %s" % (diskserver, diskserverfs, str(e)))

    _log.info("total runtime: %.2fs" % (time.time() - starttime, ))


if __name__ == '__main__':
    main()