Skip to content
Snippets Groups Projects
asci_to_vault.py 10.6 KiB
Newer Older
Gary Ruben's avatar
Gary Ruben committed
"""
A script to transfer a tree of data files from a remote/source server to a
local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
which the tape archive system is mounted; in our case, this is a machine at Monash.
Prior to running this an ssh key pair must be shared between the systems. See
https://confluence.apps.monash.edu/display/XI/ssh+between+MASSIVE+filesystem+and+ASCI
for details on how to do this between a Monash Linux machine and ASCI
(Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
and uses the fabric module.

Authors:
gary.ruben@monash.edu
michelle.croughan@monash.edu

Note that current version creates two files in the same directory as this script
1. A .log file named based on the start-time timestamp which is a capture of all
   stdout activity.
2. A Python pickle file named tree_state.pickle that contains the transfer state
   from which failed transfers can be restarted by setting the resume
   file to True.

Known issues
------------
Note: Some versions of fabric generate a harmless warning, which can be ignored. This
      issue is discussed here: https://github.com/paramiko/paramiko/issues/1369
This is a possible option for checksumming:
https://stackoverflow.com/q/45819356/
KERNEL_CHECKSUM=$(cpio --to-stdout -i kernel.fat16 < archive.cpio  | sha256sum | awk "{print $1}")
Gary Ruben's avatar
Gary Ruben committed

We used the following command to check whether a transfer was successful
immediately prior to a failure of the ASCI filesystem.
The command to count the number of files in a tarball
$ tar -tf Lamb_Lung_Microfil_CT_18011B_right_CT.tar | wc -l
75920

Gary Ruben's avatar
Gary Ruben committed
"""
import os
Gary Ruben's avatar
Gary Ruben committed
import warnings
from dataclasses import dataclass
import pathlib
import time
from fabric import Connection
Gary Ruben's avatar
Gary Ruben committed

def escape_parens(path):
    """ Explicitly escape parentheses. This is required to work around a bug in Fabric's
    Invoke module. See my question on Stackoverflow:
    https://stackoverflow.com/q/63225018/607587
    The recommended workaround, until Fabric fixes the bug, is to just "manually escape
    the parentheses"

    I used this method: https://stackoverflow.com/a/23563806/607587

    """
    replacements = {"(":"\(", ")":"\)"}
    escaped_path = "".join([replacements.get(c, c) for c in path])
    return escaped_path


def escape_path(path):
    """ Explicitly escape parentheses AND spaces.
    I used this method: https://stackoverflow.com/a/23563806/607587

    """
    # kludge; first (un)escape any already escaped characters
    unreplacements = {"\(":"(", "\)":")", "\ ":" "}
    unescaped_path = "".join([unreplacements.get(c, c) for c in path])
    # Now escape unescaped spaces, plus any unescaped parens
    replacements = {"(":"\(", ")":"\)", " ":"\ "}
    escaped_path = "".join([replacements.get(c, c) for c in unescaped_path])
    return escaped_path


Gary Ruben's avatar
Gary Ruben committed
@dataclass
class Node:
    """A directory tree node"""
    src: str                    # source tree node path
    dest: str                   # destination tree node path
    count: int = None           # number of files at the node
    processed: bool = False     # True iff a node transfer completes
Gary Ruben's avatar
Gary Ruben committed


class Logger(object):
        self.terminal = sys.stdout

    def write(self, message):
        self.terminal.write(message)
        self.log.write(message)  

    def flush(self):
        self.terminal.flush()
        self.log.flush()


def send_directory(node, remote_login, src_path):
Gary Ruben's avatar
Gary Ruben committed
    """Sends all files in the node.src directory to the node.dest directory
Gary Ruben's avatar
Gary Ruben committed
    across an ssh connection.

    Different methods are used for single versus multiple files. For single
    files, scp is used. For multiple files cpio is used to tar the files into a
Gary Ruben's avatar
Gary Ruben committed
    single tarball. The destination tarball is named after the directories
    trailing src_path. Permissions are set to r_x for group and owner.
Gary Ruben's avatar
Gary Ruben committed

    Args:
        node: Node object
Gary Ruben's avatar
Gary Ruben committed
            Contains source and destination directory information as follows:
Gary Ruben's avatar
Gary Ruben committed
            src: full path to a remote node
                 e.g. /data/13660a/asci/input
            dest: full path to a destination node
                  e.g. /home/grub0002/bapcxi/vault/imbl2018
Gary Ruben's avatar
Gary Ruben committed
            count: number of files at the remote node
        remote_login: str
            remote login username@url
        src_path: str
            asci src top-level directory
Gary Ruben's avatar
Gary Ruben committed

    """
Gary Ruben's avatar
Gary Ruben committed
    # Check if there are any files in the node.
        with c.cd(escape_parens(node.src)):
            result = c.run(r"nice find -maxdepth 1 -type f -printf '%f\n'", echo=True)

    files = result.stdout.strip()
Gary Ruben's avatar
Gary Ruben committed
    node.count = len(files.splitlines())
    print(f"Node:{node.src}, file count:{node.count}")
        # No files at this node, just return
Gary Ruben's avatar
Gary Ruben committed
    else:
            filename = os.path.basename(node.src)
        else:
            filename = node.src.replace(src_path + "/", "").replace("/", "_")
        cmd_src = escape_path(node.src)
        cmd_dest = escape_path(node.dest)
        cmd_filename = escape_path(filename)
            f'ssh {remote_login} "cd {cmd_src};'
            f'nice find -maxdepth 1 -type f -printf \'%f\\0\' |'
            f'xargs -0 tar -cf - " | cat > {cmd_dest}/{cmd_filename}.tar',
        print("stdout:", output.stdout)
        print("stderr:", output.stderr)
        # os.chmod(f"{node.dest}/{filename}.tar", 0o550)
        print(f"Transferred {node.count} files {node.src} -> {node.dest}")
Gary Ruben's avatar
Gary Ruben committed

Gary Ruben's avatar
Gary Ruben committed

@click.command()
@click.argument("remote_login")
@click.argument("experiment_name")
@click.argument("src_path", type=click.Path())
@click.argument("dest_path", type=click.Path())
@click.option("-p","pickle_filename", help="Pickle filename, e.g. 'foo.pickle' (default = experiment_name.pickle")
@click.option("-r","resume",is_flag=True, help="If True, continue from current pickle state")
@click.option("-d","display_pickle_file",is_flag=True, help="If True, just show the pickle file state")
def main(
    remote_login,
    experiment_name,
    src_path,
    dest_path,
    pickle_filename,
    resume,
    display_pickle_file
):
    """
    \b
    Example
    -------
    $ python asci_to_vault.py gary.ruben@monash.edu@sftp1.synchrotron.org.au 15223 /data/15223/asci/input /home/gruben/vault/vault/IMBL/IMBL_2019_Nov_Croton/input

    A script to transfer a tree of data files from a remote/source server to a
    local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
    which the tape archive system is mounted; in our case, this is a machine at Monash.
    Prior to running this an ssh key pair must be shared between the systems. See
    https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
    for details on how to do this between a Monash Linux machine and ASCI
    (Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
    and uses the fabric module.

    Note that current version creates two files in the same directory as this script
    1. A .log file named based on the start-time timestamp which is a capture of all
    stdout activity.
    2. A Python pickle file named tree_state.pickle that contains the transfer state
    from which failed transfers can be restarted by setting the resume
    if pickle_filename is None:
        pickle_filename = experiment_name+".pickle"

    path, base = os.path.split(pickle_filename)

    if path == "":
        pickle_filename = os.path.join(os.path.dirname(__file__), pickle_filename)

    timestamp = time.strftime("%Y-%m-%d-%H%M%S")

    log_filename = os.path.join(
        os.path.dirname(__file__),
        f"{experiment_name}-{timestamp}.log"
    )

    if re.fullmatch(r"[a-zA-z0-9_\-\.@]+@[a-zA-Z0-9_\-\.]+", remote_login) is None:
        raise Exception("Invalid form for login address")
    """
    Possible file name formats:
    /data/<experiment number>/asci/input
    /data/<experiment number>/asci/output
    input
    output
    output/username/working/
    output/username/working
    """

    src_file_path = src_path.split("/")[:5]

        src_path = os.path.join(f"/data/{experiment_name}/asci/", *src_file_path)

    sys.stdout = Logger(log_filename)       # Log all stdout to a log file

    print(textwrap.dedent(f"""
        remote_login = {remote_login}
        experiment_name = {experiment_name}
        src_path = {src_path}
        dest_path = {dest_path}
        pickle_filename = {pickle_filename}
        resume = {resume}
        display_pickle_file = {display_pickle_file}
    # If the resume flag is set, resume the transfer.
    if resume or display_pickle_file:
        # Read the saved transfer state from the locally pickled tree object.
        with open(pickle_filename, "rb") as f: 
        if display_pickle_file:
            sys.exit()

        if resume:
            # Reset nodes at the end of the list with count==0 to unprocessed
            # This is done because we observed a failure that mistakenly reported
            # source tree nodes to have 0 files, so force a recheck of those.
            for node in reversed(tree):
                if node.count == 0:
                    node.processed = False
                else:
                    break
Gary Ruben's avatar
Gary Ruben committed
        # Get the directory tree from the remote server as a list.
        with Connection(remote_login) as c:
            result = c.run(f"find {src_path} -type d")
        remote_dirs = result.stdout.strip().splitlines()
        # Create a tree data structure that represents both source and
        # destination tree paths.
            dest = src.replace(src_path, dest_path)
Gary Ruben's avatar
Gary Ruben committed

Gary Ruben's avatar
Gary Ruben committed
    # Transfer all directory tree nodes.
    for i, node in enumerate(tree):
            pathlib.Path(node.dest).mkdir(mode=0o770, parents=True, exist_ok=True)
            # os.chmod(node.dest, 0o770)
            send_directory(node, remote_login, src_path)
Gary Ruben's avatar
Gary Ruben committed
        # pickle the tree to keep a record of the processed state.
        with open(pickle_filename, "wb") as f:

        print(f"Processed {i + 1} of {len(tree)} directory tree nodes")