"""
A script to generate and transfer md5 checksum files from a remote/source server to a
local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
which the tape archive system is mounted; in our case, this is a machine at Monash.

Prior to running this an ssh key pair must be shared between the systems. See
https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
for details on how to do this between a Monash Linux machine and ASCI
(Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
and uses the fabric module.

Authors:
    gary.ruben@monash.edu
    michelle.croughan@monash.edu

Running this creates two files in the same directory as this script file:
1. A .log file named based on the start-time timestamp which is a capture of all
   stdout activity.
2. A Python pickle file named md5_state.pickle that contains the transfer state
   from which failed transfers can be restarted by including the resume file (-r)
   flag.

Known issues
------------
Note: The current version of fabric generates harmless warnings. This issue is
discussed here: https://github.com/paramiko/paramiko/issues/1369
"""
import os
import pathlib
import pickle
import pprint
import re
import subprocess
import sys
import textwrap
import time
import warnings
from dataclasses import dataclass

import click
from fabric import Connection
@dataclass
class Node:
    """A directory tree node pairing a remote source path with its local destination."""

    src: str                 # source tree node path, e.g. /data/13660a/asci/input
    dest: str                # destination tree node path, e.g. /home/grub0002/bapcxi/vault/imbl2018
    count: int = None        # number of files at the node (None until counted)
    processed: bool = False  # True iff a node transfer completes
class Logger:
    """A stdout tee: echoes everything written to it to both the original
    terminal stream and a log file.

    Assign an instance to ``sys.stdout`` to capture all subsequent print
    output in ``log_filename`` while still showing it on the terminal.
    """

    def __init__(self, log_filename):
        # Capture the current stdout so echoing still works after
        # sys.stdout is rebound to this Logger instance.
        self.terminal = sys.stdout
        # Append mode so a restarted run extends the existing log;
        # explicit encoding avoids platform-dependent defaults.
        self.log = open(log_filename, "a", encoding="utf-8")

    def write(self, message):
        """Write *message* to both the terminal and the log file."""
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        """Flush both streams (required for file-like objects)."""
        self.terminal.flush()
        self.log.flush()
def send_checksum(node, remote_login, src_path):
    """Checksums all files in the node.src directory and sends these to the
    node.dest directory across an ssh connection. The checksum file is named
    after the directories trailing the SRC_PATH. Permissions are set to r_x for
    group and owner.

    Args:
        node: Node object
            Contains source and destination directories as follows:
            src: full path to a remote node
                e.g. /data/13660a/asci/input
            dest: full path to a destination node
                e.g. /home/grub0002/bapcxi/vault/imbl2018
        remote_login: str
            remote login username@url
        src_path: str
            asci src top-level directory

    Side effects:
        Sets node.count to the number of files found at the node and marks
        node.processed True once the transfer completes.
    """
    # Check if there are any files in the node (-maxdepth 1: this directory
    # only, not subdirectories — those are separate tree nodes).
    with Connection(remote_login) as c:
        files = c.run(
            rf"cd {node.src}; find -maxdepth 1 -type f -printf '%f\n'",
            echo=True
        )
        files = files.stdout.strip()
    # BUGFIX: node.count was read below but never assigned; derive it from
    # the remote `find` output (one filename per line; empty output => 0).
    node.count = len(files.splitlines()) if files else 0

    print(f"Node:{node.src}, file count:{node.count}")

    if node.count == 0:
        # No files at this node, just return
        print("No files at node")
    else:
        # Checksum files. The .md5 file is named after the node path relative
        # to src_path, with '/' replaced by '_' (or the basename at the root).
        if node.src == src_path:
            filename = os.path.basename(node.src)
        else:
            filename = node.src.replace(src_path + '/', '').replace('/', '_')

        # NOTE(review): node.src/node.dest/remote_login are interpolated into
        # a shell command string; paths containing spaces or shell
        # metacharacters would break this (or worse). Paths here come from a
        # trusted remote `find`, but treat with care.
        output = subprocess.run(
            f"ssh {remote_login} 'cd {node.src}; md5sum *'"
            # BUGFIX: write the checksum file under the computed `filename`
            # (previously a corrupted literal, leaving `filename` unused).
            f"| cat > {node.dest}/{filename}.md5",
            shell=True,
            check=True
        )
        # stdout/stderr are None unless capture is enabled; printed for parity
        # with the original behaviour.
        print("stdout:", output.stdout)
        print("stderr:", output.stderr)
        # os.chmod(f"{node.dest}/{filename}.md5", 0o550)
        print(f"Checksummed {node.count} files {node.src} -> {node.dest}")

    node.processed = True
@click.command()
@click.argument("remote_login")
@click.argument("experiment_name")
@click.argument("src_path", type=click.Path())
@click.argument("dest_path", type=click.Path())
@click.option("-p", "pickle_filename",
              help="Pickle filename, e.g. 'foo.pickle' (default = experiment_name.pickle)")
@click.option("-r", "resume", is_flag=True,
              help="If True, continue from current pickle state")
@click.option("-d", "display_pickle_file", is_flag=True,
              help="If True, just show the pickle file state")
def main(
    remote_login,
    experiment_name,
    src_path,
    dest_path,
    pickle_filename,
    resume,
    display_pickle_file
):
    """
    \b
    Example
    -------
    $ python md5_to_vault.py gary.ruben@monash.edu@sftp1.synchrotron.org.au 15223 /data/15223/asci/input /home/gruben/vault/vault/IMBL/IMBL_2019_Nov_Croton/input

    A script to generate and transfer md5 checksum files from a remote/source server to a
    local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
    which the tape archive system is mounted; in our case, this is a machine at Monash.
    Prior to running this an ssh key pair must be shared between the systems. See
    https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
    for details on how to do this between a Monash Linux machine and ASCI
    (Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
    and uses the fabric module.
    Running this creates two files in the same directory as this script file:
    1. A .log file named based on the start-time timestamp which is a capture of all
    stdout activity.
    2. A Python pickle file named md5_state.pickle that contains the transfer state
    from which failed transfers can be restarted by including the resume file (-r)
    flag.
    """
    # Validate inputs with explicit exceptions; `assert` is stripped when
    # Python runs with -O, so it must not guard user input.
    if not (5 <= len(experiment_name) <= 6):
        raise ValueError("experiment_name must be 5 or 6 characters long")

    if pickle_filename is None:
        # NOTE(review): the default is '<experiment_name>md5_state.pickle'
        # (no separator), which differs slightly from the -p help text.
        pickle_filename = experiment_name + "md5_state.pickle"
    path, _ = os.path.split(pickle_filename)
    if path == "":
        # Bare filename given: keep the pickle next to this script.
        pickle_filename = os.path.join(os.path.dirname(__file__), pickle_filename)

    timestamp = time.strftime("%Y-%m-%d-%H%M%S")
    log_filename = os.path.join(
        os.path.dirname(__file__),
        f"md5-{experiment_name}-{timestamp}.log"
    )

    # BUGFIX: the pattern previously used [a-zA-z...]; the A-z range also
    # matches the punctuation characters between 'Z' and 'a' ([\]^_`).
    if re.fullmatch(r"[a-zA-Z0-9_\-\.@]+@[a-zA-Z0-9_\-\.]+", remote_login) is None:
        raise ValueError("Invalid form for login address")

    # Possible src_path formats:
    #   /data/<experiment number>/asci/input
    #   /data/<experiment number>/asci/output
    #   input
    #   output
    #   output/username/working/
    #   output/username/working
    src_file_path = src_path.split("/")[:5]
    if src_file_path[0] == "":
        # Absolute path: must live under /data/<experiment_name>/asci/.
        if "/".join(src_file_path[:4]) != f"/data/{experiment_name}/asci":
            raise ValueError(
                f"src_path must start with /data/{experiment_name}/asci")
        if src_file_path[4] not in ("input", "output"):
            raise ValueError("src_path must refer to the input or output tree")
    else:
        # Relative path: anchor it under the experiment's asci directory.
        if src_file_path[0] not in ("input", "output"):
            raise ValueError("src_path must start with 'input' or 'output'")
        src_path = os.path.join(f"/data/{experiment_name}/asci/", *src_file_path)

    sys.stdout = Logger(log_filename)  # Log all stdout to a log file
    print(textwrap.dedent(f"""
        remote_login = {remote_login}
        experiment_name = {experiment_name}
        src_path = {src_path}
        dest_path = {dest_path}
        pickle_filename = {pickle_filename}
        resume = {resume}
        display_pickle_file = {display_pickle_file}
    """))

    # If the resume flag is set, resume the transfer.
    if resume or display_pickle_file:
        # Read the saved transfer state from the locally pickled tree object.
        with open(pickle_filename, "rb") as f:
            tree = pickle.load(f)
        print("tree:")
        pprint.pprint(tree)
        if display_pickle_file:
            sys.exit()
        if resume:
            # Reset nodes at the end of the list with count==0 to unprocessed.
            # This is done because we observed a failure that mistakenly
            # reported source tree nodes to have 0 files, so force a recheck
            # of those.
            for node in reversed(tree):
                if node.count == 0:
                    node.processed = False
                else:
                    break
    else:
        # Get the directory tree from the remote server as a list.
        with Connection(remote_login) as c:
            result = c.run(f"find {src_path} -type d")
        remote_dirs = result.stdout.strip().splitlines()

        # Create a tree data structure that represents both source and
        # destination tree paths.
        tree = [Node(src, src.replace(src_path, dest_path))
                for src in remote_dirs]

    # Transfer all directory tree nodes.
    for i, node in enumerate(tree):
        if not node.processed:
            pathlib.Path(node.dest).mkdir(mode=0o770, parents=True, exist_ok=True)
            # os.chmod(node.dest, 0o770)
            send_checksum(node, remote_login, src_path)

            # Pickle the tree after every node so a failed run can resume
            # from the last fully processed node.
            with open(pickle_filename, "wb") as f:
                pickle.dump(tree, f)
            print(f"Processed {i + 1} of {len(tree)} directory tree nodes")


if __name__ == "__main__":
    main()