Newer
Older

Gary Ruben
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""
A script to generate and transfer md5 checksum files from a remote/source server
to a local/destination computer. This runs on the local Linux machine, on which
the tape archive system is mounted; in our case, this is a machine at Monash.
Prior to running this an ssh key pair must be shared between the systems. See
https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
for details on how to do this between a Monash Linux machine and ASCI
(Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
and uses the fabric module.
Authors:
gary.ruben@monash.edu
michelle.croughan@monash.edu
Running this creates two files in the same directory as this script file:
1. A .log file, named after the experiment name and the start-time timestamp,
that captures all stdout activity.
2. A Python pickle file named md5_state.pickle that contains the transfer state
from which failed transfers can be restarted by setting the READ_PICKLE_FILE
flag to True.
Known issues
------------
Note: The current version of fabric generates harmless warnings. This issue is
discussed here: https://github.com/paramiko/paramiko/issues/1369
"""
import os
import sys
import warnings
from dataclasses import dataclass
import pathlib
import subprocess
import pickle
import pprint
import time
from fabric import Connection
# --- Run configuration -------------------------------------------------------

# When True, the main script loads the node list pickled by an earlier run
# instead of rebuilding it from the remote file system.
READ_PICKLE_FILE = False

# Experiment identifier; part of the remote source path and the log file name.
EXPERIMENT_NAME = "13660a"

# ssh login of the remote (source) server.
REMOTE_LOGIN = "gary.ruben@monash.edu@sftp2.synchrotron.org.au"

# Root of the remote directory tree whose files are checksummed.
SRC_PATH = f"/data/{EXPERIMENT_NAME}/asci/input"

# Local root under which the remote tree is mirrored and the .md5 files land.
DEST_PATH = "/home/grub0002/bapcxi/vault/IMBL_2018_Oct_McGillick"

# --- Files written next to this script ---------------------------------------

# Start time of this run, embedded in the log file name.
timestamp = time.strftime("%Y-%m-%d-%H%M%S")

# Pickled list of nodes recording which ones have been processed.
PICKLE_FILENAME = os.path.join(os.path.dirname(__file__), "md5_state.pickle")

# Log file that receives a copy of everything written to stdout.
LOG_FILENAME = os.path.join(
    os.path.dirname(__file__), f"md5-{EXPERIMENT_NAME}-{timestamp}.log"
)
@dataclass
class Node:
    """One directory of the tree, paired across source and destination.

    Attributes:
        src: path of the directory in the source tree.
        dest: path of the matching directory in the destination tree.
        count: number of files at the node; None until it has been counted.
        processed: True iff a node transfer completes.
    """

    src: str
    dest: str
    count: int = None
    processed: bool = False
class Logger(object):
    """Tee-style stand-in for sys.stdout.

    Every message written is forwarded both to the stream that was sys.stdout
    when the Logger was created and to the log file named by LOG_FILENAME,
    which is opened in append mode.
    """

    def __init__(self):
        """Capture the current sys.stdout and open the log file for appending."""
        self.terminal = sys.stdout
        # Explicit UTF-8 so the log encoding does not depend on the locale of
        # the machine running the script.
        self.log = open(LOG_FILENAME, "a", encoding="utf-8")

    def write(self, message):
        """Write message to the terminal stream and to the log file."""
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        """Flush both the terminal stream and the log file."""
        self.terminal.flush()
        self.log.flush()

    def close(self):
        """Flush and close the log file.

        The terminal stream is left open because the Logger did not create it.
        """
        self.log.flush()
        self.log.close()
def send_checksum(node):
    """Checksum the files of one remote directory and store the result locally.

    Lists the regular files directly inside node.src on the remote host, runs
    md5sum over them through ssh and writes the output to a .md5 file inside
    node.dest. The checksum file is named after the directories trailing the
    SRC_PATH (joined with underscores), or after the last component of SRC_PATH
    for the root node. Permissions are set to r_x for group and owner.

    On return node.count holds the number of files found and node.processed is
    True. A node without files produces no checksum file but is still marked
    as processed.

    Args:
        node: Node object
            Contains source and destination directories as follows:
            src: full path to a remote node
                e.g. /data/13660a/asci/input
            dest: full path to a destination node
                e.g. /home/grub0002/bapcxi/vault/imbl2018

    Raises:
        subprocess.CalledProcessError: if the ssh checksum command exits with
            a non-zero status; node.processed is then left unchanged.
    """
    import shlex

    # Quote the remote path so that directory names containing spaces or shell
    # metacharacters survive the remote shell.
    remote_dir = shlex.quote(node.src)

    # Check if there are any files in the node. '&&' stops the listing from
    # running in the wrong directory when the cd fails.
    with Connection(REMOTE_LOGIN) as c:
        listing = c.run(
            rf"cd {remote_dir} && find -maxdepth 1 -type f -printf '%f\n'",
            echo=True
        )

    # find prints one name per line, so count lines. Counting the '\n'
    # separators of the stripped output reports one file too few and makes a
    # directory holding a single file look empty.
    node.count = len(listing.stdout.strip().splitlines())
    print(f'Node:{node.src}, file count:{node.count}')

    if node.count == 0:
        # No files at this node, nothing to checksum
        print('No files at node')
    else:
        # Checksum files.
        if node.src == SRC_PATH:
            filename = os.path.basename(node.src)
        else:
            filename = node.src.replace(SRC_PATH+'/', '').replace('/', '_')
        md5_path = f'{node.dest}/{filename}.md5'

        # NOTE: the unquoted $(...) is word-split by the remote shell, so file
        # names containing whitespace are not handled by this command.
        remote_cmd = (
            f"cd {remote_dir} && "
            "md5sum $(find -maxdepth 1 -type f | cut -c3-)"
        )

        # Run ssh directly with its stdout redirected into the checksum file.
        # With a local "ssh ... | cat > file" pipeline the exit status is that
        # of cat, so check=True could never notice a failed ssh or md5sum.
        with open(md5_path, 'wb') as md5_file:
            subprocess.run(
                ["ssh", REMOTE_LOGIN, remote_cmd],
                stdout=md5_file,
                check=True
            )

        os.chmod(md5_path, 0o550)
        print(f'Checksummed {node.count} files {node.src} -> {node.dest}')

    node.processed = True
if __name__ == "__main__":
    # From here on everything printed is mirrored into the log file.
    sys.stdout = Logger()

    # By default the tree is built from scratch from the remote file system.
    # Setting READ_PICKLE_FILE = True above is a hacky way to restart an
    # interrupted transfer from the state saved by the previous run.
    if not READ_PICKLE_FILE:
        # Ask the remote server for every directory below SRC_PATH.
        with Connection(REMOTE_LOGIN) as conn:
            listing = conn.run(f'find {SRC_PATH} -type d')

        # One Node per remote directory, holding the source path and the
        # destination path obtained by swapping SRC_PATH for DEST_PATH.
        tree = [
            Node(remote_dir, remote_dir.replace(SRC_PATH, DEST_PATH))
            for remote_dir in listing.stdout.strip().split('\n')
        ]
    else:
        # Restore the transfer state from the locally pickled tree object.
        with open(PICKLE_FILENAME, 'rb') as state_file:
            tree = pickle.load(state_file)
        print('tree:')
        pprint.pprint(tree)

        # A failure was observed that mistakenly reported source tree nodes
        # as having 0 files, so walk back from the end of the list and mark
        # the trailing run of count==0 nodes as unprocessed to force a
        # recheck of those.
        for node in reversed(tree):
            if node.count != 0:
                break
            node.processed = False

    # Work through all directory tree nodes in order.
    for i, node in enumerate(tree):
        if not node.processed:
            pathlib.Path(node.dest).mkdir(parents=True, exist_ok=True)
            os.chmod(node.dest, 0o770)
            send_checksum(node)

        # Save the tree after each node so the processed state is on record.
        with open(PICKLE_FILENAME, 'wb') as state_file:
            pickle.dump(tree, state_file)

        print(f"Processed {i + 1} of {len(tree)} directory tree nodes")