Commit 0e20caa0 authored by Gary Ruben (Monash University)'s avatar Gary Ruben (Monash University)
Browse files

Changed to use commandline instead of hardcoded options

parent 9c8b9a60
"""
A script to transfer a tree of data files from a remote/source server to a
local/destination computer. This runs on the local Linux machine, on which the
tape archive system is mounted; in our case, this is a machine at Monash. Prior
to running this an ssh key pair must be shared between the systems. See
local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
which the tape archive system is mounted; in our case, this is a machine at Monash.
Prior to running this an ssh key pair must be shared between the systems. See
https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
for details on how to do this between a Monash Linux machine and ASCI
(Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
......@@ -11,25 +11,25 @@ and uses the fabric module.
Authors:
gary.ruben@monash.edu
michelle.croughan@monash.edu
linda.croton@monash.edu
Note that current version creates two files in the same directory as this script
1. A .log file named based on the start-time timestamp which is a capture of all
stdout activity.
2. A Python pickle file named tree_state.pickle that contains the transfer state
from which failed transfers can be restarted by setting the READ_PICKLE_FILE
from which failed transfers can be restarted by setting the read_pickle_file
file to True.
Known issues
------------
Note: The current version of fabric generates harmless warnings. This issue is
discussed
here: https://github.com/paramiko/paramiko/issues/1369
discussed here: https://github.com/paramiko/paramiko/issues/1369
Notes
-----
This is a possible option for checksumming:
https://stackoverflow.com/q/45819356/
KERNEL_CHECKSUM=$(cpio --to-stdout -i kernel.fat16 < archive.cpio | sha256sum | awk '{print $1}')
KERNEL_CHECKSUM=$(cpio --to-stdout -i kernel.fat16 < archive.cpio | sha256sum | awk "{print $1}")
We used the following command to check whether a transfer was successful
immediately prior to a failure of the ASCI filesystem.
......@@ -42,6 +42,7 @@ http://docs.pyinvoke.org/en/1.2/api/runners.html#invoke.runners.Runner.run
"""
import os
import re
import sys
import warnings
from dataclasses import dataclass
......@@ -50,22 +51,11 @@ import subprocess
import pickle
import pprint
import time
import click
import textwrap
from fabric import Connection
READ_PICKLE_FILE = False
EXPERIMENT_NAME = "13660a"
PICKLE_FILENAME = os.path.join(os.path.dirname(__file__), "tree_state.pickle")
timestamp = time.strftime("%Y-%m-%d-%H%M%S")
LOG_FILENAME = os.path.join(
os.path.dirname(__file__),
f"{EXPERIMENT_NAME}-{timestamp}.log"
)
REMOTE_LOGIN = "gary.ruben@monash.edu@sftp1.synchrotron.org.au"
SRC_PATH = f"/data/{EXPERIMENT_NAME}/asci/output"
DEST_PATH = "/home/grub0002/bapcxi/vault/IMBL_2018_Oct_McGillick/output"
@dataclass
class Node:
"""A directory tree node"""
......@@ -76,9 +66,9 @@ class Node:
class Logger(object):
def __init__(self):
def __init__(self, log_filename):
self.terminal = sys.stdout
self.log = open(LOG_FILENAME, "a")
self.log = open(log_filename, "a")
def write(self, message):
self.terminal.write(message)
......@@ -96,7 +86,7 @@ def send_directory(node):
Different methods are used for single versus multiple files. For single
files, scp is used. For multiple files cpio is used to tar the files into a
single tarball. The destination tarball is named after the directories
trailing SRC_PATH. Permissions are set to r_x for group and owner.
trailing src_path. Permissions are set to r_x for group and owner.
Args:
node: Node object
......@@ -109,7 +99,7 @@ def send_directory(node):
"""
# Check if there are any files in the node.
with Connection(REMOTE_LOGIN) as c:
with Connection(remote_login) as c:
files = c.run(
rf"cd {node.src}; find -maxdepth 1 -type f -printf '%f\n'",
echo=True
......@@ -117,53 +107,141 @@ def send_directory(node):
files = files.stdout.strip()
node.count = len(files.splitlines())
print(f'Node:{node.src}, file count:{node.count}')
print(f"Node:{node.src}, file count:{node.count}")
if node.count == 0:
# No files at this node, just return
print('No files to transfer')
print("No files to transfer")
elif node.count == 1:
# Only one file. Just copy unchanged.
output = subprocess.run(
f"scp -q {REMOTE_LOGIN}:{node.src}/{files} {node.dest}",
f"scp -q {remote_login}:{node.src}/{files} {node.dest}",
shell=True,
check=True
)
print('stdout:', output.stdout)
print('stderr:', output.stderr)
os.chmod(f'{node.dest}/{files}', 0o550)
print(f'Transferred single file {node.src} -> {node.dest}')
print("stdout:", output.stdout)
print("stderr:", output.stderr)
os.chmod(f"{node.dest}/{files}", 0o550)
print(f"Transferred single file {node.src} -> {node.dest}")
else:
# More than one file. Transfer all files to a tarball.
if node.src == SRC_PATH:
if node.src == src_path:
filename = os.path.basename(node.src)
else:
filename = node.src.replace(SRC_PATH+'/', '').replace('/', '_')
filename = node.src.replace(src_path+"/", "").replace("/", "_")
output = subprocess.run(
f"ssh {REMOTE_LOGIN} 'cd {node.src};"
f"ssh {remote_login} 'cd {node.src};"
f"find -maxdepth 1 -type f -print0 |"
f"cpio -o -H ustar -0' | cat > {node.dest}/{filename}.tar",
shell=True,
check=True
)
print('stdout:', output.stdout)
print('stderr:', output.stderr)
os.chmod(f'{node.dest}/{filename}.tar', 0o550)
print(f'Transferred {node.count} files {node.src} -> {node.dest}')
print("stdout:", output.stdout)
print("stderr:", output.stderr)
os.chmod(f"{node.dest}/{filename}.tar", 0o550)
print(f"Transferred {node.count} files {node.src} -> {node.dest}")
node.processed = True
if __name__ == "__main__":
sys.stdout = Logger() # Log all stdout to a log file
@click.command()
@click.argument("remote_login")
@click.argument("experiment_name")
@click.argument("src_path", type=click.Path())
@click.argument("dest_path", type=click.Path())
@click.option("-p","pickle_filename", help="Pickle filename, e.g. 'foo.pickle' (default = experiment_name.pickle")
@click.option("-r","read_pickle_file",is_flag=True, help="If True, continue from current pickle state")
def main(
remote_login,
experiment_name,
src_path,
dest_path,
pickle_filename,
read_pickle_file
):
"""
\b
Example
-------
$ python asci_to_vault.py gary.ruben@monash.edu@sftp1.synchrotron.org.au 15223 /data/15223/asci/input /home/gruben/vault/vault/IMBL/IMBL_2019_Nov_Croton/input
A script to transfer a tree of data files from a remote/source server to a
local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
which the tape archive system is mounted; in our case, this is a machine at Monash.
Prior to running this an ssh key pair must be shared between the systems. See
https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
for details on how to do this between a Monash Linux machine and ASCI
(Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
and uses the fabric module.
Note that current version creates two files in the same directory as this script
1. A .log file named based on the start-time timestamp which is a capture of all
stdout activity.
2. A Python pickle file named tree_state.pickle that contains the transfer state
from which failed transfers can be restarted by setting the read_pickle_file
file to True.
"""
assert 5 <= len(experiment_name) <= 6
if pickle_filename is None:
pickle_filename = experiment_name+".pickle"
path, base = os.path.split(pickle_filename)
if path == "":
pickle_filename = os.path.join(os.path.dirname(__file__), pickle_filename)
timestamp = time.strftime("%Y-%m-%d-%H%M%S")
log_filename = os.path.join(
os.path.dirname(__file__),
f"{experiment_name}-{timestamp}.log"
)
if re.fullmatch(r"[a-zA-z0-9_\-\.@]+@[a-zA-Z0-9_\-\.]+", remote_login) is None:
raise Exception("Invalid form for login address")
# A hacky way to restart an interrupted transfer is to set
# READ_PICKLE_FILE = True above so that the transfer state is retrieved.
if READ_PICKLE_FILE:
"""
Possible file name formats:
/data/<experiment number>/asci/input
/data/<experiment number>/asci/output
input
output
output/username/working/
output/username/working
"""
src_file_path = src_path.split("/")[:5]
if src_file_path[0] == "":
# Assume full path specified
assert "/".join(src_file_path[:4]) == f"/data/{experiment_name}/asci"
assert src_file_path[4] == "input" or src_file_path[4] == "output"
else:
assert src_file_path[0] == "input" or src_file_path[0] == "output"
src_path = os.path.join(f"/data/{experiment_name}/asci/", *src_file_path)
# remote_login = "gary.ruben@monash.edu@sftp1.synchrotron.org.au"
# src_path = f"/data/{experiment_name}/asci/output"
# dest_path = "/home/grub0002/bapcxi/vault/IMBL_2018_Oct_McGillick/output"
sys.stdout = Logger(log_filename) # Log all stdout to a log file
print(textwrap.dedent(f"""
remote_login = {remote_login}
experiment_name = {experiment_name}
src_path = {src_path}
dest_path = {dest_path}
pickle_filename = {pickle_filename}
read_pickle_file = {read_pickle_file}
"""))
# If the read_pickle_file flag is set, resume the transfer.
if read_pickle_file:
# Read the saved transfer state from the locally pickled tree object.
with open(PICKLE_FILENAME, 'rb') as f:
with open(pickle_filename, "rb") as f:
tree = pickle.load(f)
print('tree:')
print("tree:")
pprint.pprint(tree)
# Reset nodes at the end of the list with count==0 to unprocessed
......@@ -176,26 +254,32 @@ if __name__ == "__main__":
break
else:
# Get the directory tree from the remote server as a list.
with Connection(REMOTE_LOGIN) as c:
result = c.run(f'find {SRC_PATH} -type d')
with Connection(remote_login) as c:
result = c.run(f"find {src_path} -type d")
remote_dirs = result.stdout.strip().splitlines()
# Create a tree data structure that represents both source and
# destination tree paths.
tree = []
for src in remote_dirs:
dest = src.replace(SRC_PATH, DEST_PATH)
dest = src.replace(src_path, dest_path)
tree.append(Node(src, dest))
# Transfer all directory tree nodes.
for i, node in enumerate(tree):
if not node.processed:
pathlib.Path(node.dest).mkdir(parents=True, exist_ok=True)
print(pathlib.Path(node.dest))
1/0
pathlib.Path(node.dest).mkdir(mode=0o770, parents=True, exist_ok=True)
os.chmod(node.dest, 0o770)
send_directory(node)
# pickle the tree to keep a record of the processed state.
with open(PICKLE_FILENAME, 'wb') as f:
with open(pickle_filename, "wb") as f:
pickle.dump(tree, f)
print(f"Processed {i + 1} of {len(tree)} directory tree nodes")
if __name__ == "__main__":
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment