""" A script to generate and transfer md5 checksum files from a remote/source server to a local/destination computer. This runs on a local Linux machine or the eResearch dtn, on which the tape archive system is mounted; in our case, this is a machine at Monash. Prior to running this an ssh key pair must be shared between the systems. See https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron for details on how to do this between a Monash Linux machine and ASCI (Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher and uses the fabric module. Authors: gary.ruben@monash.edu michelle.croughan@monash.edu Running this creates two files in the same directory as this script file: 1. A .log file named based on the start-time timestamp which is a capture of all stdout activity. 2. A Python pickle file named md5_state.pickle that contains the transfer state from which failed transfers can be restarted by including the resume file (-r) flag. Known issues ------------ Note: The current version of fabric generates harmless warnings. This issue is discussed here: https://github.com/paramiko/paramiko/issues/1369 """ import os import re import sys import warnings from dataclasses import dataclass import pathlib import subprocess import pickle import pprint import time import click import textwrap from fabric import Connection @dataclass class Node: """A directory tree node""" src: str # source tree node path dest: str # destination tree node path count: int = None # number of files at the node processed: bool = False # True iff a node transfer completes class Logger(object): def __init__(self, log_filename): self.terminal = sys.stdout self.log = open(log_filename, "a") def write(self, message): self.terminal.write(message) self.log.write(message) def flush(self): self.terminal.flush() self.log.flush() def send_checksum(node, remote_login, src_path): """Checksums all files in the node.src directory and sends these to the node.dest directory across an ssh connection. The checksum file is named after the directories trailing the SRC_PATH. Permissions are set to r_x for group and owner. Args: node: Node object Contains source and destination directories as follows: src: full path to a remote node e.g. /data/13660a/asci/input dest: full path to a destination node e.g. /home/grub0002/bapcxi/vault/imbl2018 remote_login: str remote login username@url src_path: str asci src top-level directory """ # Check if there are any files in the node with Connection(remote_login) as c: files = c.run( rf"cd {node.src}; find -maxdepth 1 -type f -printf '%f\n'", echo=True ) files = files.stdout.strip() node.count = len(files.splitlines()) print(f"Node:{node.src}, file count:{node.count}") if node.count == 0: # No files at this node, just return print("No files at node") else: # Checksum files. 
        if node.src == src_path:
            filename = os.path.basename(node.src)
        else:
            filename = node.src.replace(src_path+'/', '').replace('/', '_')
        output = subprocess.run(
            f"ssh {remote_login} 'cd {node.src}; md5sum *'"
            f"| cat > {node.dest}/{filename}.md5",
            shell=True,
            check=True,
            capture_output=True,
            text=True
        )
        print("stdout:", output.stdout)
        print("stderr:", output.stderr)
        # os.chmod(f"{node.dest}/{filename}.md5", 0o550)
        print(f"Checksummed {node.count} files {node.src} -> {node.dest}")
    node.processed = True


@click.command()
@click.argument("remote_login")
@click.argument("experiment_name")
@click.argument("src_path", type=click.Path())
@click.argument("dest_path", type=click.Path())
@click.option("-p", "pickle_filename",
              help="Pickle filename, e.g. 'foo.pickle' "
                   "(default: <experiment_name>md5_state.pickle)")
@click.option("-r", "resume", is_flag=True,
              help="Resume the transfer from the saved pickle state")
@click.option("-d", "display_pickle_file", is_flag=True,
              help="Just display the pickle file state and exit")
def main(
        remote_login,
        experiment_name,
        src_path,
        dest_path,
        pickle_filename,
        resume,
        display_pickle_file
    ):
    """
    \b
    Example
    -------
    $ python md5_to_vault.py gary.ruben@monash.edu@sftp1.synchrotron.org.au 15223 /data/15223/asci/input /home/gruben/vault/vault/IMBL/IMBL_2019_Nov_Croton/input

    A script to generate and transfer md5 checksum files from a remote/source
    server to a local/destination computer. This runs on a local Linux machine
    or the eResearch dtn, on which the tape archive system is mounted; in our
    case, this is a machine at Monash. Prior to running this an ssh key pair
    must be shared between the systems. See
    https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
    for details on how to do this between a Monash Linux machine and ASCI
    (Australian Synchrotron Compute Infrastructure).

    Requires Python 3.7 or higher and uses the fabric module.

    Running this creates two files in the same directory as this script file:
    1. A .log file named with the start-time timestamp, which is a capture of
       all stdout activity.
    2. A Python pickle file named md5_state.pickle that contains the transfer
       state, from which failed transfers can be restarted by including the
       resume (-r) flag.
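
    \b
    To resume an interrupted transfer from its saved pickle state, rerun the
    same command with the -r flag, e.g.
    $ python md5_to_vault.py -r gary.ruben@monash.edu@sftp1.synchrotron.org.au 15223 /data/15223/asci/input /home/gruben/vault/vault/IMBL/IMBL_2019_Nov_Croton/input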
""" assert 5 <= len(experiment_name) <= 6 if pickle_filename is None: pickle_filename = experiment_name + "md5_state.pickle" path, base = os.path.split(pickle_filename) if path == "": pickle_filename = os.path.join(os.path.dirname(__file__), pickle_filename) timestamp = time.strftime("%Y-%m-%d-%H%M%S") log_filename = os.path.join( os.path.dirname(__file__), f"md5-{experiment_name}-{timestamp}.log" ) if re.fullmatch(r"[a-zA-z0-9_\-\.@]+@[a-zA-Z0-9_\-\.]+", remote_login) is None: raise Exception("Invalid form for login address") """ Possible file name formats: /data/<experiment number>/asci/input /data/<experiment number>/asci/output input output output/username/working/ output/username/working """ src_file_path = src_path.split("/")[:5] if src_file_path[0] == "": # Assume full path specified assert "/".join(src_file_path[:4]) == f"/data/{experiment_name}/asci" assert src_file_path[4] == "input" or src_file_path[4] == "output" else: assert src_file_path[0] == "input" or src_file_path[0] == "output" src_path = os.path.join(f"/data/{experiment_name}/asci/", *src_file_path) sys.stdout = Logger(log_filename) # Log all stdout to a log file print(textwrap.dedent(f""" remote_login = {remote_login} experiment_name = {experiment_name} src_path = {src_path} dest_path = {dest_path} pickle_filename = {pickle_filename} resume = {resume} display_pickle_file = {display_pickle_file} """)) # If the resume flag is set, resume the transfer. if resume or display_pickle_file: # Read the saved transfer state from the locally pickled tree object. with open(pickle_filename, "rb") as f: tree = pickle.load(f) print("tree:") pprint.pprint(tree) if display_pickle_file: sys.exit() if resume: # Reset nodes at the end of the list with count==0 to unprocessed # This is done because we observed a failure that mistakenly reported # source tree nodes to have 0 files, so force a recheck of those. for node in reversed(tree): if node.count == 0: node.processed = False else: break else: # Get the directory tree from the remote server as a list. with Connection(remote_login) as c: result = c.run(f"find {src_path} -type d") remote_dirs = result.stdout.strip().splitlines() # Create a tree data structure that represents both source and # destination tree paths. tree = [] for src in remote_dirs: dest = src.replace(src_path, dest_path) tree.append(Node(src, dest)) # Transfer all directory tree nodes. for i, node in enumerate(tree): if not node.processed: pathlib.Path(node.dest).mkdir(mode=0o770, parents=True, exist_ok=True) # os.chmod(node.dest, 0o770) send_checksum(node, remote_login, src_path) # pickle the tree to keep a record of the processed state with open(pickle_filename, "wb") as f: pickle.dump(tree, f) print(f"Processed {i + 1} of {len(tree)} directory tree nodes") if __name__ == "__main__": main()