Commit 795ab35e authored by Gary Ruben (Monash University)'s avatar Gary Ruben (Monash University)
Browse files

changed from embedded paths to commandline commands

parent 0a64bf40
"""
A script to generate and transfer md5 checksum files from a remote/source server to a
local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
which the tape archive system is mounted; in our case, this is a machine at Monash.
Prior to running this an ssh key pair must be shared between the systems. See
https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
for details on how to do this between a Monash Linux machine and ASCI
Running this creates two files in the same directory as this script file:
1. A .log file named based on the start-time timestamp which is a capture of all
stdout activity.
2. A Python pickle file named md5_state.pickle that contains the transfer state
from which failed transfers can be restarted by including the resume file (-r)
flag.
Known issues
------------
Note: The current version of fabric generates harmless warnings. This issue is
"""
import os
import pathlib
import pickle
import pprint
import re
import subprocess
import sys
import textwrap
import time
import warnings
from dataclasses import dataclass

import click
from fabric import Connection
# NOTE(review): these module-level constants are the pre-commandline
# configuration that this commit replaces with click arguments parsed in
# main() (a commented-out copy also appears above send_checksum).
READ_PICKLE_FILE = False  # when True, resume from the pickled transfer state
EXPERIMENT_NAME = "13660a"  # synchrotron experiment number
# Transfer state is pickled next to this script so a failed run can resume.
PICKLE_FILENAME = os.path.join(os.path.dirname(__file__), "md5_state.pickle")
timestamp = time.strftime("%Y-%m-%d-%H%M%S")  # start-time stamp used in the log filename
LOG_FILENAME = os.path.join(
    os.path.dirname(__file__),
    f"md5-{EXPERIMENT_NAME}-{timestamp}.log"
)
REMOTE_LOGIN = "gary.ruben@monash.edu@sftp2.synchrotron.org.au"  # user@host for ssh
SRC_PATH = f"/data/{EXPERIMENT_NAME}/asci/output"  # remote source tree root
DEST_PATH = "/home/grub0002/bapcxi/vault/IMBL_2018_Oct_McGillick/output"  # local destination root
@dataclass
class Node:
    """A directory tree node pairing a remote source directory with its
    local destination directory.

    NOTE(review): the field declarations were folded out of the diff this
    file was recovered from; the fields below are inferred from usage in
    this script (``Node(src, dest)``, ``node.count``, ``node.processed``)
    — confirm against the original source.
    """
    src: str    # full path to the source directory on the remote host
    dest: str   # full path to the matching local destination directory
    count: int = 0  # number of files found at this node (set by send_checksum)
    processed: bool = False  # True once the node's checksum file has been sent
class Logger(object):
    """Tee-style stdout logger: everything written is sent both to the
    real terminal and appended to a log file.

    Installed via ``sys.stdout = Logger(log_filename)`` so that all
    ``print`` output is captured in the run's log file.
    """

    def __init__(self, log_filename):
        # Keep a handle on the real stdout so output still reaches the terminal.
        self.terminal = sys.stdout
        # Append so an interrupted and resumed run produces one continuous log.
        self.log = open(log_filename, "a")

    def write(self, message):
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        # NOTE(review): part of this class was folded out of the recovered
        # diff; flushing both streams is the conventional completion of the
        # visible `self.log.flush()` line — confirm against the original.
        self.terminal.flush()
        self.log.flush()
def send_checksum(node, remote_login, src_path):
    """Checksum all files directly inside node.src on the remote host and
    write the resulting .md5 file into node.dest over ssh.

    The checksum file is named after the directory path trailing src_path,
    with '/' replaced by '_' (or after the directory's own basename when
    node.src is the top level).

    Args:
        node: Node
            node.src: full path to a source node,
                e.g. /data/13660a/asci/input
            node.dest: full path to a destination node,
                e.g. /home/grub0002/bapcxi/vault/imbl2018
        remote_login: str
            remote login username@url
        src_path: str
            asci src top-level directory

    Side effects:
        Sets node.count to the number of files found at the node and
        node.processed to True once the checksum file has been written.
    """
    # Check if there are any files in the node. maxdepth 1: this directory
    # only — each subdirectory is handled as its own tree node.
    with Connection(remote_login) as c:
        files = c.run(
            rf"cd {node.src}; find -maxdepth 1 -type f -printf '%f\n'",
            echo=True
        )
    files = files.stdout.strip()
    node.count = len(files.splitlines())
    print(f"Node:{node.src}, file count:{node.count}")

    if node.count == 0:
        # No files at this node, just return
        print("No files at node")
    else:
        # Checksum files. Name the .md5 after the path below src_path.
        if node.src == src_path:
            filename = os.path.basename(node.src)
        else:
            filename = node.src.replace(src_path + '/', '').replace('/', '_')
        # md5sum runs on the remote host; its output streams back over ssh
        # and is written locally. `cut -c3-` strips find's leading './'.
        # NOTE(review): the recovered source showed a garbled "(unknown)"
        # placeholder here; `{filename}` is the otherwise-unused local it
        # clearly stood for — confirm against the original commit.
        output = subprocess.run(
            f"ssh {remote_login} 'cd {node.src};"
            "md5sum $(find -maxdepth 1 -type f | cut -c3-)'"
            f"| cat > {node.dest}/{filename}.md5",
            shell=True,
            check=True
        )
        print("stdout:", output.stdout)
        print("stderr:", output.stderr)
        # os.chmod(f"{node.dest}/{filename}.md5", 0o550)
        print(f"Checksummed {node.count} files {node.src} -> {node.dest}")
        node.processed = True
@click.command()
@click.argument("remote_login")
@click.argument("experiment_name")
@click.argument("src_path", type=click.Path())
@click.argument("dest_path", type=click.Path())
@click.option("-p", "pickle_filename",
              help="Pickle filename, e.g. 'foo.pickle' (default = experiment_name.pickle")
@click.option("-r", "resume", is_flag=True,
              help="If True, continue from current pickle state")
@click.option("-d", "display_pickle_file", is_flag=True,
              help="If True, just show the pickle file state")
def main(
        remote_login,
        experiment_name,
        src_path,
        dest_path,
        pickle_filename,
        resume,
        display_pickle_file
):
    """
    \b
    Example
    -------
    $ python md5_to_vault.py gary.ruben@monash.edu@sftp1.synchrotron.org.au 15223 /data/15223/asci/input /home/gruben/vault/vault/IMBL/IMBL_2019_Nov_Croton/input

    A script to generate and transfer md5 checksum files from a remote/source server to a
    local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
    which the tape archive system is mounted; in our case, this is a machine at Monash.
    Prior to running this an ssh key pair must be shared between the systems. See
    https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
    for details on how to do this between a Monash Linux machine and ASCI
    (Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
    and uses the fabric module.

    Running this creates two files in the same directory as this script file:
    1. A .log file named based on the start-time timestamp which is a capture of all
    stdout activity.
    2. A Python pickle file named md5_state.pickle that contains the transfer state
    from which failed transfers can be restarted by including the resume file (-r)
    flag.
    """
    # NOTE(review): asserts are stripped under `python -O`; these input
    # validations would be more robust as explicit raises.
    assert 5 <= len(experiment_name) <= 6
    if pickle_filename is None:
        # NOTE(review): no separator between the experiment name and the
        # suffix, e.g. "15223md5_state.pickle" — presumably intentional.
        pickle_filename = experiment_name + "md5_state.pickle"
    path, base = os.path.split(pickle_filename)
    if path == "":
        # A bare filename is placed alongside this script.
        pickle_filename = os.path.join(os.path.dirname(__file__), pickle_filename)

    timestamp = time.strftime("%Y-%m-%d-%H%M%S")
    log_filename = os.path.join(
        os.path.dirname(__file__),
        f"md5-{experiment_name}-{timestamp}.log"
    )

    # Bug fix: the original class `[a-zA-z0-9_\-\.@]` used `A-z`, which also
    # matches the punctuation between 'Z' and 'a' ([ \ ] ^ _ `); corrected
    # to `A-Z`.
    if re.fullmatch(r"[a-zA-Z0-9_\-\.@]+@[a-zA-Z0-9_\-\.]+", remote_login) is None:
        raise Exception("Invalid form for login address")

    # Possible src_path formats:
    #   /data/<experiment number>/asci/input
    #   /data/<experiment number>/asci/output
    #   input
    #   output
    #   output/username/working[/]
    src_file_path = src_path.split("/")[:5]
    if src_file_path[0] == "":
        # Assume full path specified; it must sit under /data/<exp>/asci.
        assert "/".join(src_file_path[:4]) == f"/data/{experiment_name}/asci"
        assert src_file_path[4] == "input" or src_file_path[4] == "output"
    else:
        # Relative form; prepend the standard asci prefix.
        assert src_file_path[0] == "input" or src_file_path[0] == "output"
        src_path = os.path.join(f"/data/{experiment_name}/asci/", *src_file_path)

    sys.stdout = Logger(log_filename)  # Log all stdout to a log file
    print(textwrap.dedent(f"""
        remote_login = {remote_login}
        experiment_name = {experiment_name}
        src_path = {src_path}
        dest_path = {dest_path}
        pickle_filename = {pickle_filename}
        resume = {resume}
        display_pickle_file = {display_pickle_file}
    """))

    # If the resume flag is set, resume the transfer.
    if resume or display_pickle_file:
        # Read the saved transfer state from the locally pickled tree object.
        with open(pickle_filename, "rb") as f:
            tree = pickle.load(f)
        print("tree:")
        pprint.pprint(tree)
        if display_pickle_file:
            sys.exit()
        if resume:
            # Reset nodes at the end of the list with count==0 to unprocessed.
            # This is done because we observed a failure that mistakenly
            # reported source tree nodes to have 0 files, so force a recheck
            # of those.
            for node in reversed(tree):
                if node.count == 0:
                    node.processed = False
                else:
                    break
    else:
        # Get the directory tree from the remote server as a list.
        with Connection(remote_login) as c:
            result = c.run(f"find {src_path} -type d")
        remote_dirs = result.stdout.strip().splitlines()

        # Create a tree data structure that represents both source and
        # destination tree paths.
        tree = []
        for src in remote_dirs:
            dest = src.replace(src_path, dest_path)
            tree.append(Node(src, dest))

    # Transfer all directory tree nodes.
    for i, node in enumerate(tree):
        if not node.processed:
            pathlib.Path(node.dest).mkdir(mode=0o770, parents=True, exist_ok=True)
            send_checksum(node, remote_login, src_path)
            # Pickle the tree to keep a record of the processed state.
            with open(pickle_filename, "wb") as f:
                pickle.dump(tree, f)
            print(f"Processed {i + 1} of {len(tree)} directory tree nodes")


if __name__ == "__main__":
    main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment