asci_to_vault.py
"""
2
3
4
5
A script to transfer a tree of data files from a remote/source server to a
local/destination computer. This runs on the local Linux machine, on which the
tape archive system is mounted; in our case, this is a machine at Monash. Prior
to running this an ssh key pair must be shared between the systems. See
Gary Ruben's avatar
Gary Ruben committed
6
https://confluence.apps.monash.edu/display/XI/Australian+Synchrotron
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
for details on how to do this between a Monash Linux machine and ASCI
(Australian Synchrotron Compute Infrastructure). Requires Python 3.7 or higher
and uses the fabric module.

Authors:
gary.ruben@monash.edu
michelle.croughan@monash.edu

Note that current version creates two files in the same directory as this script
1. A .log file named based on the start-time timestamp which is a capture of all
   stdout activity.
2. A Python pickle file named tree_state.pickle that contains the transfer state
   from which failed transfers can be restarted by setting the READ_PICKLE_FILE
   file to True.

Known issues
------------
24
25
26
Note: The current version of fabric generates harmless warnings. This issue is
      discussed
 here: https://github.com/paramiko/paramiko/issues/1369
27
28
29

Notes
-----
30
This is a possible option for checksumming:
31
32
https://stackoverflow.com/q/45819356/
KERNEL_CHECKSUM=$(cpio --to-stdout -i kernel.fat16 < archive.cpio  | sha256sum | awk '{print $1}')
Gary Ruben's avatar
Gary Ruben committed
33

34
We used the following command to check whether a transfer was successful
35
36
37
38
39
40
41
42
immediately prior to a failure of the ASCI filesystem.
The command to count the number of files in a tarball
$ tar -tf Lamb_Lung_Microfil_CT_18011B_right_CT.tar | wc -l
75920

Keyword arguments for fabric's Connection.run() are documented here:
http://docs.pyinvoke.org/en/1.2/api/runners.html#invoke.runners.Runner.run

Gary Ruben's avatar
Gary Ruben committed
43
44
"""
import os
import sys
import warnings
from dataclasses import dataclass
import pathlib
import subprocess
import pickle
import pprint
import time
from fabric import Connection

READ_PICKLE_FILE = False
EXPERIMENT_NAME = "13660b"
PICKLE_FILENAME = os.path.join(os.path.dirname(__file__), "tree_state.pickle")
timestamp = time.strftime("%Y-%m-%d-%H%M%S")
LOG_FILENAME = os.path.join(
    os.path.dirname(__file__),
    f"{EXPERIMENT_NAME}-{timestamp}.log"
)
REMOTE_LOGIN = "gary.ruben@monash.edu@sftp1.synchrotron.org.au"
SRC_PATH = f"/data/{EXPERIMENT_NAME}/asci/output"
DEST_PATH = "/home/grub0002/bapcxi/vault/IMBL_2018_Oct_McGillick"


@dataclass
class Node:
    """A directory tree node"""
    src: str                    # source tree node path
    dest: str                   # destination tree node path
    count: int = None           # number of files at the node
    processed: bool = False     # True iff a node transfer completes
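
    # Illustrative example only (paths borrowed from the tar_and_send_directory
    # docstring below), showing the source -> destination pairing a Node holds:
    #   Node(src='/data/13660a/asci/input',
    #        dest='/home/grub0002/bapcxi/vault/imbl2018')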


class Logger(object):
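    """Tee writes to stdout so that output goes to both the terminal and the
    log file LOG_FILENAME."""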
    def __init__(self):
        self.terminal = sys.stdout
        self.log = open(LOG_FILENAME, "a")

    def write(self, message):
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        self.terminal.flush()
        self.log.flush()


def tar_and_send_directory(node):
    """Sends all files in the node.src directory to the node.dest directory
    across an ssh connection using the cpio command to tar the files into a
    single tarball. The destination tarball is named after the directories
    trailing the SRC_PATH. Permissions are set to r-x for group and owner.

    Args:
        node: Node object
            Contains source and destination directories as follows:
            src: full path to a remote node
                 e.g. /data/13660a/asci/input
            dest: full path to a destination node
                  e.g. /home/grub0002/bapcxi/vault/imbl2018

    """
    # Check if there are any files in the node
    # TODO: add a check to make sure the connection is working
    with Connection(REMOTE_LOGIN) as c:
        files = c.run(
            rf"cd {node.src}; find -maxdepth 1 -type f -printf '%f\n'",
            echo=True
        )
        files = files.stdout.strip()
        # if not c.is_connected():
        #     print(f'Connection failed on {node.src}')
        #     node.error = "Connection failure on initial file find"
        #     return
    # Count the regular files found at this node (zero for an empty directory)
    node.count = len(files.splitlines())

    print(f'Node:{node.src}, file count:{node.count}')
    if node.count == 0:
        # No files at this node, just return
        print('No files to transfer')
    elif node.count == 1:
        # Only one file. No need to tar. Just copy unchanged.
        output = subprocess.run(
            f"ssh {REMOTE_LOGIN} 'cd {node.src};"
            f"find -maxdepth 1 -type f | cpio -o' |"
            f"cat > {node.dest}/{files}",
            shell=True,
            check=True
        )
        print('stdout:', output.stdout)
        print('stderr:', output.stderr)
        os.chmod(f'{node.dest}/{files}', 0o550)
        print(f'Transferred single file {node.src} -> {node.dest}')
    else:
        # More than one file. Transfer all files to a tarball.
        if node.src == SRC_PATH:
            filename = os.path.basename(node.src)
        else:
            filename = node.src.replace(SRC_PATH+'/', '').replace('/', '_')

        output = subprocess.run(
            f"ssh {REMOTE_LOGIN} 'cd {node.src};"
            f"find -maxdepth 1 -type f -print0 |"
            f"cpio -o -H ustar -0' | cat > {node.dest}/{filename}.tar",
            # f"ssh {REMOTE_LOGIN} 'cd {node.src};"
            # rf"find -maxdepth 1 -type f -printf '%f\n' -print0 | tee >(md5sum >)"
            # f"cpio -o -H ustar -0' | cat > {node.dest}/{filename}.tar",
            shell=True,
            check=True
        )
        print('stdout:', output.stdout)
        print('stderr:', output.stderr)
        os.chmod(f'{node.dest}/{filename}.tar', 0o550)
        print(f'Transferred {node.count} files {node.src} -> {node.dest}')

    node.processed = True
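

# --- Optional verification sketch (not part of the original transfer code) ---
# The module docstring mentions two manual checks: counting the files in a
# transferred tarball (tar -tf <tarball> | wc -l) and checksumming the archive.
# The helper below is a minimal Python sketch of the same idea, assuming the
# tarball was produced by the `cpio -o -H ustar` pipeline above (ustar archives
# are readable by the tarfile module). It is illustrative only and is not
# called anywhere in this script.
def verify_tarball(node, tar_path):
    """Report the member count and sha256 of tar_path; return True if the
    member count matches node.count."""
    import hashlib
    import tarfile

    with tarfile.open(tar_path) as tar:
        member_count = len(tar.getnames())

    sha256 = hashlib.sha256()
    with open(tar_path, 'rb') as f:
        for block in iter(lambda: f.read(1 << 20), b''):
            sha256.update(block)

    print(f'{tar_path}: {member_count} members, sha256 {sha256.hexdigest()}')
    return member_count == node.count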


if __name__ == "__main__":
    sys.stdout = Logger()       # Log all stdout to a log file

    # A hacky way to restart an interrupted transfer is to set
    # READ_PICKLE_FILE = True above so that the transfer state is retrieved. By
    # default, the tree is built from scratch from the remote file system.
    if READ_PICKLE_FILE:
        # Read the saved transfer state from the locally pickled tree object.
        with open(PICKLE_FILENAME, 'rb') as f:
            tree = pickle.load(f)
        print('tree:')
        pprint.pprint(tree)

        # Reset nodes at the end of the list with count==0 to unprocessed.
        # This is done because we observed a failure that mistakenly reported
        # source tree nodes to have 0 files, so force a recheck of those.
        for node in reversed(tree):
            if node.count == 0:
                node.processed = False
            else:
                break
    else:
        # Get the directory tree from the remote server as a list
        with Connection(REMOTE_LOGIN) as c:
            result = c.run(f'find {SRC_PATH} -type d')
        remote_dirs = result.stdout.strip().split('\n')

        # Create a tree data structure that represents both source and
        # destination tree paths.
        tree = []
        for src in remote_dirs:
            dest = src.replace(SRC_PATH, DEST_PATH)
            tree.append(Node(src, dest))
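        # e.g. (illustrative) with the constants above, a remote directory such
        # as /data/13660b/asci/output/sample1 becomes a Node whose dest is
        # /home/grub0002/bapcxi/vault/IMBL_2018_Oct_McGillick/sample1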

    # Transfer all directory tree nodes
    for i, node in enumerate(tree):
        if not node.processed:
            pathlib.Path(node.dest).mkdir(parents=True, exist_ok=True)
            os.chmod(node.dest, 0o770)
            tar_and_send_directory(node)

        # pickle the tree to keep a record of the processed state
        with open(PICKLE_FILENAME, 'wb') as f:
            pickle.dump(tree, f)

        print(f"Processed {i + 1} of {len(tree)} directory tree nodes")