"""
A script to transfer a tree of data files from a remote/source server to a
local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
which the tape archive system is mounted; in our case, this is a machine at Monash.
Prior to running this an ssh key pair must be shared between the systems. See
https://confluence.apps.monash.edu/display/XI/ssh+between+MASSIVE+filesystem+and+ASCI
for details on how to do this between a Monash Linux machine and ASCI
(Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
and uses the fabric module.

Authors:
gary.ruben@monash.edu
michelle.croughan@monash.edu
linda.croton@monash.edu

Note that the current version creates two files in the same directory as this script:
1. A log file named <experiment_name>-<start-time timestamp>.log, which is a capture
   of all stdout activity.
2. A Python pickle file, named <experiment_name>.pickle unless another name is given
   with the -p option, that contains the transfer state from which failed transfers
   can be restarted by passing the -r (resume) flag.

Known issues
------------
Note: Some versions of fabric generate a harmless warning, which can be ignored. This
      issue is discussed here: https://github.com/paramiko/paramiko/issues/1369

Notes
-----
This is a possible option for checksumming:
https://stackoverflow.com/q/45819356/
KERNEL_CHECKSUM=$(cpio --to-stdout -i kernel.fat16 < archive.cpio  | sha256sum | awk "{print $1}")

We used the following command to check whether a transfer was successful
immediately prior to a failure of the ASCI filesystem.
The command to count the number of files in a tarball:
$ tar -tf Lamb_Lung_Microfil_CT_18011B_right_CT.tar | wc -l
75920

"""
import os
42
import re
43
import sys
Gary Ruben's avatar
Gary Ruben committed
44
45
46
import warnings
from dataclasses import dataclass
import pathlib
47
import subprocess
48
49
import pickle
import pprint
50
import time
51
52
import click
import textwrap
53
from fabric import Connection
54

Gary Ruben's avatar
Gary Ruben committed
55

56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def escape_parens(path):
    """Return path with every parenthesis prefixed by a backslash.

    This is required to work around a bug in Fabric's Invoke module. See my
    question on Stackoverflow:
    https://stackoverflow.com/q/63225018/607587
    The recommended workaround, until Fabric fixes the bug, is to just "manually
    escape the parentheses".

    Args:
        path: str
            path that may contain "(" or ")" characters

    Returns:
        str: a copy of path in which each "(" and ")" is preceded by a single
        backslash; every other character is left untouched.

    """
    # The backslashes are written doubled because a lone backslash followed by
    # a parenthesis is an invalid string escape that newer Pythons warn about.
    return path.translate(str.maketrans({"(": "\\(", ")": "\\)"}))


def escape_path(path):
    """Return path with parentheses AND spaces prefixed by a backslash.

    Characters that arrive already escaped are not escaped a second time, so
    passing an already escaped path returns it unchanged.

    Args:
        path: str
            path that may contain spaces or parentheses, escaped or not

    Returns:
        str: a copy of path in which each space, "(" and ")" is preceded by
        exactly one backslash.

    """
    special = "() "
    # kludge; first (un)escape any already escaped characters. This must work
    # on the two-character sequences: a character-by-character lookup can
    # never match them, which would leave escaped input to be escaped twice.
    for char in special:
        path = path.replace("\\" + char, char)
    # Now escape every space and parenthesis exactly once.
    return path.translate(str.maketrans({char: "\\" + char for char in special}))


Gary Ruben's avatar
Gary Ruben committed
85
86
@dataclass
class Node:
    """A directory tree node pairing a source directory with its destination.

    A list of these objects is pickled by main() to record the transfer state,
    so the field names and their order are kept unchanged.
    """
    src: str                    # source tree node path
    dest: str                   # destination tree node path
    count: int = None           # number of files at the node; None until counted
    processed: bool = False     # True iff a node transfer completes
Gary Ruben's avatar
Gary Ruben committed
92
93


94
class Logger(object):
    """File-like object that copies everything written to it to a log file.

    Intended to be assigned to sys.stdout so that printed output still reaches
    the stream that was sys.stdout when the Logger was created, while also
    being appended to the log file.
    """

    def __init__(self, log_filename):
        """Remember the current sys.stdout and open log_filename for appending."""
        self.terminal = sys.stdout
        self.log = open(log_filename, "a")

    def write(self, message):
        """Write message to both the original stream and the log file."""
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        """Flush both the original stream and the log file."""
        self.terminal.flush()
        self.log.flush()

    def close(self):
        """Close the log file; the original stream is left open."""
        self.log.close()


108
def send_directory(node, remote_login, src_path):
    """Sends all files in the node.src directory to the node.dest directory
    across an ssh connection.

    The files directly inside node.src (subdirectories are not descended into)
    are streamed as a tar archive over ssh and written to a single tarball in
    node.dest. The destination tarball is named after the directories trailing
    src_path, joined with underscores; for the top-level node it is named after
    the last component of src_path. Permissions of the tarball are not changed.

    On return node.count holds the number of files found and node.processed is
    True. If the transfer fails an exception is raised and node.processed is
    left unchanged.

    Args:
        node: Node object
            Contains source and destination directory information as follows:
            src: full path to a remote node
                 e.g. /data/13660a/asci/input
            dest: full path to a destination node
                  e.g. /home/grub0002/bapcxi/vault/imbl2018
            count: number of files at the remote node
        remote_login: str
            remote login username@url
        src_path: str
            asci src top-level directory

    Raises:
        subprocess.CalledProcessError: if the ssh transfer command exits with
            a non-zero status.

    """
    # Check if there are any files in the node.
    # NOTE(review): escape_parens rather than escape_path is kept here as in
    # the original; presumably Fabric's cd deals with spaces itself - confirm.
    with Connection(remote_login) as c:
        with c.cd(escape_parens(node.src)):
            result = c.run(r"nice find -maxdepth 1 -type f -printf '%f\n'", echo=True)

    files = result.stdout.strip()
    node.count = len(files.splitlines())

    print(f"Node:{node.src}, file count:{node.count}")
    if node.count == 0:
        # No files at this node, just return
        print("No files to transfer")
    else:
        # At least one file. Transfer all files to a tarball.
        filename = _tarball_stem(node.src, src_path)
        tarball_path = os.path.join(node.dest, f"{filename}.tar")

        # "&&" stops the remote command if the cd fails, so files from some
        # other directory are never archived by mistake.
        # NOTE(review): xargs may split a very long file list over several tar
        # invocations, which would concatenate archives - confirm this cannot
        # happen for the directory sizes in use.
        remote_cmd = (
            f"cd {escape_path(node.src)} && "
            "nice find -maxdepth 1 -type f -printf '%f\\0' | "
            "xargs -0 tar -cf -"
        )

        # ssh writes straight into the tarball without a local shell pipeline,
        # so the exit status checked below is that of ssh (and hence of the
        # remote command) and the local path needs no shell escaping.
        with open(tarball_path, "wb") as tarball:
            output = subprocess.run(
                ["ssh", remote_login, remote_cmd],
                stdout=tarball,
                stderr=subprocess.PIPE,
            )

        # stdout went into the tarball; stderr is captured so that it is
        # printed (and therefore logged) before any failure is raised.
        print("stderr:", output.stderr.decode(errors="replace"))
        output.check_returncode()

        print(f"Transferred {node.count} files {node.src} -> {node.dest}")

    node.processed = True


def _tarball_stem(node_src, src_path):
    """Return the tarball name (without ".tar") for the node at node_src."""
    # Trailing slashes are ignored so that "/a/b/" and "/a/b" name tarballs
    # identically instead of producing an empty or over-long name.
    top = src_path.rstrip("/")
    if node_src.rstrip("/") == top:
        return os.path.basename(top)
    return node_src.replace(top + "/", "", 1).replace("/", "_")
Gary Ruben's avatar
Gary Ruben committed
167

168

169
170
171
172
173
174
@click.command()
@click.argument("remote_login")
@click.argument("experiment_name")
@click.argument("src_path", type=click.Path())
@click.argument("dest_path", type=click.Path())
@click.option("-p","pickle_filename", help="Pickle filename, e.g. 'foo.pickle' (default = experiment_name.pickle)")
@click.option("-r","resume",is_flag=True, help="If True, continue from current pickle state")
@click.option("-d","display_pickle_file",is_flag=True, help="If True, just show the pickle file state")
def main(
    remote_login,
    experiment_name,
    src_path,
    dest_path,
    pickle_filename,
    resume,
    display_pickle_file
):
    """
    \b
    Example
    -------
    $ python asci_to_vault.py gary.ruben@monash.edu@sftp1.synchrotron.org.au 15223 /data/15223/asci/input /home/gruben/vault/vault/IMBL/IMBL_2019_Nov_Croton/input

    \b
    Arguments
    ---------
    REMOTE_LOGIN     remote login in the form username@host
    EXPERIMENT_NAME  experiment number, 4 to 6 characters long
    SRC_PATH         remote directory; absolute, or relative to
                     /data/<EXPERIMENT_NAME>/asci/
    DEST_PATH        local directory that receives the tarballs

    A script to transfer a tree of data files from a remote/source server to a
    local/destination computer. This runs on a local Linux machine or the eResearch dtn, on
    which the tape archive system is mounted; in our case, this is a machine at Monash.
    Prior to running this an ssh key pair must be shared between the systems. See
    https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
    for details on how to do this between a Monash Linux machine and ASCI
    (Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
    and uses the fabric module.

    Note that the current version creates two files in the same directory as this script
    1. A .log file named from the experiment name and the start-time timestamp,
    which is a capture of all stdout activity.
    2. A Python pickle file (experiment_name.pickle unless -p is given) that contains
    the transfer state from which failed transfers can be restarted by passing
    the -r flag.

    """
    # Validate with exceptions rather than assert, which is stripped under -O.
    if not 4 <= len(experiment_name) <= 6:
        raise click.BadParameter(
            "experiment name must be 4 to 6 characters long",
            param_hint="experiment_name",
        )

    if re.fullmatch(r"[a-zA-Z0-9_\-\.@]+@[a-zA-Z0-9_\-\.]+", remote_login) is None:
        raise click.BadParameter(
            "Invalid form for login address", param_hint="remote_login"
        )

    script_dir = os.path.dirname(__file__)

    if pickle_filename is None:
        pickle_filename = experiment_name+".pickle"

    # A bare filename is placed next to this script.
    if os.path.dirname(pickle_filename) == "":
        pickle_filename = os.path.join(script_dir, pickle_filename)

    timestamp = time.strftime("%Y-%m-%d-%H%M%S")
    log_filename = os.path.join(script_dir, f"{experiment_name}-{timestamp}.log")

    # Possible file name formats:
    # /data/<experiment number>/asci/input
    # /data/<experiment number>/asci/output
    # input
    # output
    # output/username/working/
    # output/username/working
    #
    # NOTE(review): only the first five components of a relative path are
    # kept - confirm that this limit is intended.
    src_file_path = src_path.split("/")[:5]

    if src_file_path[0] != "":
        # Relative path: anchor it below the experiment's asci directory.
        src_path = os.path.join(f"/data/{experiment_name}/asci/", *src_file_path)

    # Drop trailing slashes so that the prefix handling below and in
    # send_directory sees one consistent form of the top-level path.
    stripped_src_path = src_path.rstrip("/")
    if stripped_src_path:
        src_path = stripped_src_path

    sys.stdout = Logger(log_filename)       # Log all stdout to a log file

    print(textwrap.dedent(f"""
        remote_login = {remote_login}
        experiment_name = {experiment_name}
        src_path = {src_path}
        dest_path = {dest_path}
        pickle_filename = {pickle_filename}
        resume = {resume}
        display_pickle_file = {display_pickle_file}
    """))

    # If the resume flag is set, resume the transfer.
    if resume or display_pickle_file:
        # Read the saved transfer state from the locally pickled tree object.
        # NOTE: unpickling can execute arbitrary code; only load pickle files
        # that were written by this script.
        with open(pickle_filename, "rb") as f:
            tree = pickle.load(f)
        print("tree:")
        pprint.pprint(tree)

        if display_pickle_file:
            sys.exit()

        # Reset nodes at the end of the list with count==0 to unprocessed
        # This is done because we observed a failure that mistakenly reported
        # source tree nodes to have 0 files, so force a recheck of those.
        # Nodes the interrupted run never completed are skipped first;
        # otherwise they would end the scan before any reset could happen.
        for node in reversed(tree):
            if not node.processed:
                continue
            if node.count != 0:
                break
            node.processed = False
    else:
        # Get the directory tree from the remote server as a list.
        with Connection(remote_login) as c:
            result = c.run(f"find {escape_path(src_path)} -type d")
        remote_dirs = result.stdout.strip().splitlines()

        # Create a tree data structure that represents both source and
        # destination tree paths. Only the leading src_path is swapped for
        # dest_path, even if the same text recurs deeper in the path.
        tree = [
            Node(src, src.replace(src_path, dest_path, 1))
            for src in remote_dirs
        ]

    # Transfer all directory tree nodes.
    for i, node in enumerate(tree):
        if not node.processed:
            pathlib.Path(node.dest).mkdir(mode=0o770, parents=True, exist_ok=True)
            send_directory(node, remote_login, src_path)

        # pickle the tree to keep a record of the processed state.
        with open(pickle_filename, "wb") as f:
            pickle.dump(tree, f)

        print(f"Processed {i + 1} of {len(tree)} directory tree nodes")
300
301
302
303


# Run the click command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()