Skip to content
Snippets Groups Projects
md5_to_vault.py 8.92 KiB
Newer Older
"""
A script to generate and transfer md5 checksum files from a remote/source server to a 
local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
which the tape archive system is mounted; in our case, this is a machine at Monash.
Prior to running this an ssh key pair must be shared between the systems. See
https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
for details on how to do this between a Monash Linux machine and ASCI
(Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
and uses the fabric module.

Authors:
gary.ruben@monash.edu
michelle.croughan@monash.edu

Running this creates two files in the same directory as this script file:
1. A .log file named based on the start-time timestamp which is a capture of all
   stdout activity.
2. A Python pickle file named md5_state.pickle that contains the transfer state
   from which failed transfers can be restarted by including the resume file (-r)
   flag.

Known issues
------------
Note: The current version of fabric generates harmless warnings. This issue is
      discussed here: https://github.com/paramiko/paramiko/issues/1369

"""
import os
import pathlib
import pickle
import pprint
import re
import subprocess
import sys
import textwrap
import time
import warnings
from dataclasses import dataclass
from typing import Optional

import click
from fabric import Connection


@dataclass
class Node:
    """A directory tree node pairing a remote source directory with its
    local destination directory, plus per-node transfer bookkeeping."""
    src: str                    # source tree node path
    dest: str                   # destination tree node path
    count: Optional[int] = None # number of files at the node; None until counted
    processed: bool = False     # True iff a node transfer completes


class Logger(object):
    """Tee-style logger: everything written is echoed to the real stdout
    and appended to a log file.

    Intended use is ``sys.stdout = Logger(log_filename)`` so that all
    subsequent ``print`` output is captured in the log.
    """
    def __init__(self, log_filename):
        # Bug fix: self.terminal was never assigned, so write()/flush()
        # raised AttributeError. Capture the real stdout *before* the
        # caller rebinds sys.stdout to this Logger instance.
        self.terminal = sys.stdout
        self.log = open(log_filename, "a")

    def write(self, message):
        """Write message to both the console and the log file."""
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        """Flush both streams (needed because print() may call flush)."""
        self.terminal.flush()
        self.log.flush()


def send_checksum(node, remote_login, src_path):
    """Checksums all files in the node.src directory and sends these to the
    node.dest directory across an ssh connection. The checksum file is named
    after the directories trailing the SRC_PATH. Permissions are set to r_x for
    group and owner.

    Args:
        node: Node object
            Contains source and destination directories as follows:
            src: full path to a remote node
                 e.g. /data/13660a/asci/input
            dest: full path to a destination node
                  e.g. /home/grub0002/bapcxi/vault/imbl2018
        remote_login: str
            remote login username@url
        src_path: str
            asci src top-level directory
    """
    # Check if there are any files in the node
    with Connection(remote_login) as c:
        files = c.run(
            rf"cd {node.src}; find -maxdepth 1 -type f -printf '%f\n'",
            echo=True
        )
        files = files.stdout.strip()

    node.count = len(files.splitlines())
    print(f"Node:{node.src}, file count:{node.count}")
    if node.count == 0:
        # No files at this node, just return
        print(f"No files at node {node.src}")
    else:
        # Name the .md5 file after the path components trailing src_path,
        # e.g. <src_path>/foo/bar -> foo_bar.md5; the top-level node itself
        # is named after its basename.
        if node.src == src_path:
            filename = os.path.basename(node.src)
        else:
            filename = node.src.replace(src_path+'/', '').replace('/', '_')
        # Run md5sum remotely over ssh and redirect its stdout into the
        # local vault destination. NOTE(review): shell=True with
        # interpolated paths is shell-injection-prone; paths here come from
        # a trusted remote `find`, but confirm before wider use.
        output = subprocess.run(
            f"ssh {remote_login} 'cd {node.src}; md5sum *'"
            f"| cat > {node.dest}/{filename}.md5",
            shell=True,
            check=True
        )
        print("stdout:", output.stdout)
        print("stderr:", output.stderr)
        # NOTE(review): chmod left disabled as in the original, although the
        # docstring says r_x permissions are set — confirm intent.
        # os.chmod(f"{node.dest}/{filename}.md5", 0o550)
        print(f"Checksummed {node.count} files {node.src} -> {node.dest}")
@click.command()
@click.argument("remote_login")
@click.argument("experiment_name")
@click.argument("src_path", type=click.Path())
@click.argument("dest_path", type=click.Path())
@click.option("-p","pickle_filename", help="Pickle filename, e.g. 'foo.pickle' (default = experiment_name.pickle")
@click.option("-r","resume",is_flag=True, help="If True, continue from current pickle state")
@click.option("-d","display_pickle_file",is_flag=True, help="If True, just show the pickle file state")
def main(
    remote_login,
    experiment_name,
    src_path,
    dest_path,
    pickle_filename,
    resume,
    display_pickle_file
):
    """
    \b
    Example
    -------
    $ python md5_to_vault.py gary.ruben@monash.edu@sftp1.synchrotron.org.au 15223 /data/15223/asci/input /home/gruben/vault/vault/IMBL/IMBL_2019_Nov_Croton/input

    A script to generate and transfer md5 checksum files from a remote/source server to a 
    local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
    which the tape archive system is mounted; in our case, this is a machine at Monash.
    Prior to running this an ssh key pair must be shared between the systems. See
    https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
    for details on how to do this between a Monash Linux machine and ASCI
    (Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
    and uses the fabric module.

    Running this creates two files in the same directory as this script file:
    1. A .log file named based on the start-time timestamp which is a capture of all
    stdout activity.
    2. A Python pickle file named md5_state.pickle that contains the transfer state
    from which failed transfers can be restarted by including the resume file (-r)
    flag.

    """
    # Experiment names/numbers are expected to be 5-6 characters long.
    assert 5 <= len(experiment_name) <= 6
    if pickle_filename is None:
        pickle_filename = experiment_name + "md5_state.pickle"

    path, base = os.path.split(pickle_filename)

    # A bare filename (no directory part) is placed alongside this script.
    if path == "":
        pickle_filename = os.path.join(os.path.dirname(__file__), pickle_filename)

    timestamp = time.strftime("%Y-%m-%d-%H%M%S")
    log_filename = os.path.join(
        os.path.dirname(__file__),
        f"md5-{experiment_name}-{timestamp}.log"
    )

    # Sanity-check the user@host login form. Bug fix: [a-zA-z] (lowercase z)
    # accidentally matched punctuation in the ASCII range Z..a; now [a-zA-Z].
    if re.fullmatch(r"[a-zA-Z0-9_\-\.@]+@[a-zA-Z0-9_\-\.]+", remote_login) is None:
        raise Exception("Invalid form for login address")

    """
    Possible file name formats:
    /data/<experiment number>/asci/input
    /data/<experiment number>/asci/output
    input
    output
    output/username/working/
    output/username/working
    """

    src_file_path = src_path.split("/")[:5]

    if src_file_path[0] == "":
        # Assume full path specified
        assert "/".join(src_file_path[:4]) == f"/data/{experiment_name}/asci"
        assert src_file_path[4] == "input" or src_file_path[4] == "output"
    else:
        # Relative path: prepend the standard ASCI experiment prefix.
        assert src_file_path[0] == "input" or src_file_path[0] == "output"
        src_path = os.path.join(f"/data/{experiment_name}/asci/", *src_file_path)

    sys.stdout = Logger(log_filename)       # Log all stdout to a log file

    print(textwrap.dedent(f"""
        remote_login = {remote_login}
        experiment_name = {experiment_name}
        src_path = {src_path}
        dest_path = {dest_path}
        pickle_filename = {pickle_filename}
        resume = {resume}
        display_pickle_file = {display_pickle_file}
    """))

    # If the resume flag is set, resume the transfer.
    if resume or display_pickle_file:
        # Read the saved transfer state from the locally pickled tree object.
        with open(pickle_filename, "rb") as f:
            tree = pickle.load(f)
        pprint.pprint(tree)
        if display_pickle_file:
            sys.exit()

        if resume:
            # Reset nodes at the end of the list with count==0 to unprocessed
            # This is done because we observed a failure that mistakenly reported
            # source tree nodes to have 0 files, so force a recheck of those.
            for node in reversed(tree):
                if node.count == 0:
                    node.processed = False
                else:
                    break
    else:
        # Get the directory tree from the remote server as a list.
        with Connection(remote_login) as c:
            result = c.run(f"find {src_path} -type d")
        remote_dirs = result.stdout.strip().splitlines()

        # Create a tree data structure that represents both source and
        # destination tree paths.
        tree = []
        for src in remote_dirs:
            dest = src.replace(src_path, dest_path)
            tree.append(Node(src, dest))

    # Transfer all directory tree nodes.
    for i, node in enumerate(tree):
        if not node.processed:
            pathlib.Path(node.dest).mkdir(mode=0o770, parents=True, exist_ok=True)
            # os.chmod(node.dest, 0o770)
            send_checksum(node, remote_login, src_path)
            # Mark the node done so a resumed run skips it; without this the
            # pickled state never records progress and -r would redo everything.
            node.processed = True

        # pickle the tree to keep a record of the processed state
        with open(pickle_filename, "wb") as f:
            pickle.dump(tree, f)

        print(f"Processed {i + 1} of {len(tree)} directory tree nodes")