__init__.py 11.7 KB
Newer Older
Chris Hines's avatar
Chris Hines committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/python
import os
import sys
import subprocess
import argparse
import getpass


class SlurmJob:
    """Simple record holding a job's id, name and state."""

    # Class-level fallbacks; __init__ overwrites all three on every instance.
    jobid = jobname = jobstate = None

    def __init__(self, jobid, jobname, jobstate):
        """Store the three values unchanged on the instance."""
        self.jobid, self.jobname, self.jobstate = jobid, jobname, jobstate

Chris Hines's avatar
Chris Hines committed
19
20
21
class SmuxConnectionError(Exception):
    """Signals that smux could not attach to a session; the message is shown to the user."""

Chris Hines's avatar
Chris Hines committed
22
class Smux():
    """Command-line helper that runs tmux sessions inside Slurm batch jobs.

    All behaviour lives in classmethods; the class is used as a namespace and
    is not instantiated by the code in this module.
    """

    # Batch script piped to sbatch on stdin.  It starts a detached tmux session
    # named after $SLURM_JOB_NAME, looks up the tmux server's pid in `ps x`,
    # and then sleeps in 5 second steps until /proc/<pid> disappears.
    # The backslash in the sed expression is doubled so the literal contains
    # exactly "\ " without relying on an invalid escape sequence.
    slurm_script=b"""#!/bin/bash
tmux new-session -d -s $SLURM_JOB_NAME bash
# determine the process id of the tmux server
pid=$( /bin/ps x | /bin/grep -i "[t]mux new-session -d -s" | sed 's/^\\ *//' | cut -f 1 -d " " )
ps x
# Sleep until the tmux server exits
while [ -e /proc/$pid ]; do sleep 5; done
"""
    # When True, squeue queries are not restricted to the current user.
    ALLUSERS=False
    # Program name used in hint messages; main() replaces it with the
    # basename of sys.argv[0].
    programname='smux'
    @classmethod
    def get_node(cls, jobid):
        """Return squeue's ``%B`` field for ``jobid`` as a stripped string.

        Raises:
            subprocess.CalledProcessError: if squeue exits with a non-zero status.
        """
        squeue_cmd = ['squeue', '--job', "%s" % jobid, '-o', '%B', '-h']
        raw = subprocess.check_output(squeue_cmd)
        text = raw.decode('utf-8')
        return text.strip()
Chris Hines's avatar
Chris Hines committed
37
38
39
40

    @classmethod
    def get_job_name(cls, jobid):
        """Return squeue's ``%j`` field (the job name) for ``jobid``, stripped.

        Raises:
            subprocess.CalledProcessError: if squeue exits with a non-zero status.
        """
        squeue_cmd = ['squeue', '--job', "%s" % jobid, '-o', '%j', '-h']
        raw = subprocess.check_output(squeue_cmd)
        text = raw.decode('utf-8')
        return text.strip()
Chris Hines's avatar
Chris Hines committed
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
        
    @classmethod
    def whyAreWeWaiting(cls, args):
        """Placeholder handler for the ``why-are-we-waiting`` subcommand.

        ``args`` is accepted but ignored; the method only prints an apology.
        """
        apology = "Sorry, I haven't written this function yet"
        print(apology)


    @classmethod
    def get_job_list(cls):
        """Return the jobs squeue currently reports, as a list of SlurmJob.

        The query is limited to the current user unless ``cls.ALLUSERS`` is
        true.  squeue is asked for ``<id>,<name>,<state>`` per line; its
        stderr is captured and discarded, and its exit status is not checked,
        so a failing squeue yields an empty list.

        Returns:
            list of SlurmJob whose three fields are all strings.
        """
        command = ['squeue', '--noheader', '-o', '%A,%j,%t']
        if not cls.ALLUSERS:
            # Keep the historical argument order: squeue -u <user> --noheader ...
            command[1:1] = ['-u', '{}'.format(getpass.getuser())]
        p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, _stderr = p.communicate()

        jobs = []
        for raw in stdout.splitlines():
            line = raw.decode("utf-8")
            if not line.strip():
                continue
            # A job name may itself contain commas, so take the id from the
            # left and the state from the right; whatever remains is the name.
            jobid, first_sep, rest = line.partition(',')
            jobname, last_sep, jobstate = rest.rpartition(',')
            if not first_sep or not last_sep:
                # Fewer than three fields: not a job line, skip it instead of
                # failing with an IndexError.
                continue
            jobs.append(SlurmJob(jobid, jobname, jobstate))
        return jobs

    @classmethod
    def listJobs(cls, user, args):
        """Print the jobs from get_job_list() in two tables, then usage hints.

        Jobs whose state is ``'R'`` are listed first as running; every other
        job is listed as not yet started.  ``user`` and ``args`` are accepted
        but not read by this method.
        """
        sessions = cls.get_job_list()
        started = [s for s in sessions if s.jobstate == 'R']
        pending = [s for s in sessions if s.jobstate != 'R']

        tables = (
            ("You have the following running jobs:", started),
            ("You have the following not yet started jobs:", pending),
        )
        for heading, rows in tables:
            print("")
            print(heading)
            print("Job ID\tJob Name")
            for row in rows:
                print("{}\t{}".format(row.jobid, row.jobname))

        prog = cls.programname
        print("")
        print("Use the command {} attach-session <jobid> or {} attach-session <jobname> to connect".format(prog, prog))
        print("Or use the command {} new-session to start a new interactive session".format(prog))
        print("Or use the command {} why-are-we-waiting <jobid> to find out why a session hasn't started yet".format(prog))

    @classmethod
    def newJob(cls,args):
        """Submit the tmux batch script with sbatch and try to attach to it.

        ``args`` is the argparse namespace of the ``new-session`` subcommand.
        Every option is read as a one-element list (``nargs=1``); optional
        resources whose value is ``None`` are not passed to sbatch.

        After submitting, the job list is polled: if exactly one job is listed
        and it reaches state ``'R'`` within roughly 20 seconds, the process is
        handed over to connect_job().  Otherwise an explanatory message is
        printed and the method returns.
        """
        import time

        poll_interval = 2   # seconds between squeue polls
        max_polls = 10      # 10 polls x 2 s gives the 20 second budget

        submit_failed_help = (
            "Your job failed to submit for some reason.",
            "Please look above for any error messages from sbatch",
            "One possibility is you asked for an invalid combination of resources",
            "Another option is that you made a typo in the command line",
            "Either way plese try options one at a time.",
            "If all else fails submit a help request to help@massive.org.au",
        )

        command = ['sbatch',
                "--ntasks","{}".format(args.ntasks[0]),
                "-J","{}".format(args.jobname[0])
                ]
        # Optional resources, in the order they have always been appended.
        optional = (
            ("--account", args.account),
            ("--partition", args.partition),
            ("--reservation", args.reservation),
            ("--cpus-per-task", args.cpuspertask),
            ("--nodes", args.nodes),
            ("--mem", args.mem),
            ("--gres", args.gres),
            ("--qos", args.qos),
            ("--time", args.time),
        )
        for flag, value in optional:
            if value[0] is not None:
                command.append("{}={}".format(flag, value[0]))

        # --output and --error always have a value (the parser supplies defaults).
        command.append("--output={}".format(args.output[0]))
        command.append("--error={}".format(args.error[0]))

        # The batch script is fed to sbatch on stdin.
        p = subprocess.Popen(command, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        (_stdout,stderr) = p.communicate(cls.slurm_script)
        print("Requesting an interactive session")
        if stderr:
            print(stderr.decode())

        if p.returncode != 0:
            # sbatch itself reported failure: do not go on and attach to some
            # unrelated job that happens to be in the queue already.
            for line in submit_failed_help:
                print(line)
            return

        jobs = cls.get_job_list()
        if len(jobs) == 0:
            # Give the scheduler a moment to list the new job, then look again.
            time.sleep(1)
            jobs = cls.get_job_list()

        if len(jobs) == 1:
            print("Waiting to see if your interactive session starts",end='')
            sys.stdout.flush()
            for _ in range(max_polls):
                time.sleep(poll_interval)
                jobs = cls.get_job_list()
                # Check the length on every poll: the list can become empty
                # (or grow) between polls, and jobs[0] must not be assumed.
                if len(jobs) == 1 and jobs[0].jobstate == 'R':
                    # connect_job() replaces this process when it succeeds.
                    cls.connect_job(jobs[0].jobid)
                    return
                print('.',end='')
                sys.stdout.flush()
            print("")
            print("I can't connect you straight to your session because it hasn't started yet")
            print("use smux list-sessions to determine when it starts and")
            print("smux attach-session <jobid> to connect once it has started")
        elif len(jobs) == 0:
            for line in submit_failed_help:
                print(line)
        else:
            print("I can't connect you straight to your session because you have more than one session running")
            print("use smux list-sessions to list your sessions")
            print("smux attach-session <jobid> to connect to the correct session")


    @classmethod
    def connect_job(cls,jobid):
        """Replace this process with ``srun ... tmux attach-session`` for ``jobid``.

        The tmux session name is the job's name as reported by squeue.  On
        success this call does not return, because os.execv replaces the
        running process.

        Raises:
            SmuxConnectionError: if no ``srun`` executable is found on PATH.
            subprocess.CalledProcessError: if the squeue lookup of the job
                name fails.
        """
        import shutil
        import time

        name=cls.get_job_name(jobid)
        srunpath = shutil.which('srun')
        if srunpath is None:
            # os.execv(None, ...) would raise a TypeError with no useful hint.
            raise SmuxConnectionError("I couldn't find the srun command on your PATH")
        # NOTE(review): the one second pause is kept from the previous
        # implementation; its purpose is not documented -- confirm.
        time.sleep(1)
        # exec does not flush Python's buffered streams, so anything still
        # buffered would be lost when the process image is replaced.
        sys.stdout.flush()
        sys.stderr.flush()
        os.execv(srunpath,["srun","--pty","-O","--ntasks","1","--jobid","{}".format(jobid),"tmux","attach-session","-t","{}".format(name)])
Chris Hines's avatar
Chris Hines committed
168
169
170

    @classmethod
    def connectJob(cls,args):
        """Handler for ``attach-session``: pick a job and attach to it.

        ``args.jobid`` may be a bare value, a one-element sequence (the parser
        default is ``[None]``), or absent.  When no job id is given and
        get_job_list() reports exactly one job, that job is used.

        Raises:
            SmuxConnectionError: if the selected job is not in state ``'R'``,
                or if no job could be selected.
        """
        jobs = cls.get_job_list()

        # Unwrap the value explicitly rather than probing with bare excepts.
        jobid = getattr(args, 'jobid', None)
        if isinstance(jobid, (list, tuple)):
            jobid = jobid[0] if jobid else None

        if jobid is not None:
            wanted = str(jobid)
            for j in jobs:
                # Compare whole ids: a substring test would let job 12 be
                # rejected because an unrelated job 123 is still pending.
                if j.jobid == wanted and j.jobstate != 'R':
                    raise SmuxConnectionError("Your session hasn't started yet")
        elif len(jobs) == 1:
            if jobs[0].jobstate != 'R':
                raise SmuxConnectionError("Your session hasn't started yet")
            jobid=jobs[0].jobid

        if jobid is None:
            raise SmuxConnectionError("I couldn't figure out what you were trying to connect to, try specifying a jobid")
        cls.connect_job(jobid)

    @classmethod
    def jobid(cls,user,string):
        """Resolve a job id or job name typed by the user to an integer job id.

        Used as an argparse ``type=`` callable.  squeue is asked for
        ``<id> <name>`` lines, limited to ``user`` unless ``cls.ALLUSERS`` is
        true.  A job whose id or name equals ``string`` exactly is preferred;
        if there is none, the first line containing ``string`` as a substring
        is used, which is the lookup this method has always offered.

        Raises:
            argparse.ArgumentTypeError: if no listed job matches ``string``.
            subprocess.CalledProcessError: if squeue exits with a non-zero status.
        """
        command = ['squeue', '-h', '-o', '%i %j']
        if not cls.ALLUSERS:
            # Keep the historical argument order: squeue -u <user> -h -o ...
            command[1:1] = ['-u', user]

        entries = []
        for raw in subprocess.check_output(command).splitlines():
            line = raw.decode('utf-8')
            # The id is everything before the first space; the name is the rest.
            ident, _sep, name = line.partition(' ')
            entries.append((ident, name, line))

        # Exact matches first, so "12" selects job 12 even when a job 312
        # appears earlier in the listing.
        for ident, name, _line in entries:
            if string == ident or string == name:
                return int(ident)
        # Fall back to the original substring lookup for partial ids/names.
        for ident, _name, line in entries:
            if string in line:
                return int(ident)

        msg="%s is not a job id or a job name"%string
        raise argparse.ArgumentTypeError(msg)

    @classmethod
    def main(cls):
        """Command-line entry point: build the parser, parse argv and dispatch.

        Sets ``cls.programname`` to the basename of ``sys.argv[0]`` and resets
        ``cls.ALLUSERS`` to False before parsing.  Without a subcommand the
        help text is printed.  A SmuxConnectionError raised by a handler is
        printed as a plain message; any other exception is reported on stderr
        and followed by the help text.
        """
        import textwrap

        cls.programname=os.path.basename(sys.argv[0])
        cls.ALLUSERS=False
        user=getpass.getuser()
        parser=argparse.ArgumentParser(prog=cls.programname,formatter_class=argparse.RawDescriptionHelpFormatter,
                description=textwrap.dedent('''\
                A tool to created and reconnect to interactive sessions

                Use "%(prog)s new-session" to create a new session
                Use "%(prog)s list-sessions" to list existing sessions
                Use "%(prog)s attach-session -t <ID>" to connect to an existing session

                When in a session, use the keys control+b then press d to dettach from the session

                Short forms (n, l and a) are also accepted. <ID> is optional if you only have one job.

                For more detailed help on each subcommand you can %(prog)s <subcommand> --help, 
                for example %(prog)s n --help will display additional options for starting a new session
                '''))
        subparser = parser.add_subparsers()

        # attach-session: -t is a bare flag, so in "attach-session -t <ID>" the
        # id is still consumed by the optional positional argument.
        connect=subparser.add_parser('attach-session',aliases=['a'])
        connect.add_argument('jobid',metavar="<jobid>",default=[None], type=lambda x: cls.jobid(user,x),nargs='?',help="A job ID or job name")
        connect.add_argument('-t','--target',action='store_true')
        connect.set_defaults(func=cls.connectJob)

        # new-session: every option uses nargs=1, so handlers read value[0].
        new=subparser.add_parser('new-session',aliases=['n'])
        new.add_argument('--ntasks',type=int, default=[1], metavar="<n>",nargs=1,help="The number of tasks you will launch")
        new.add_argument('--nodes',type=int, default=[None], metavar="<n>",nargs=1,help="The number of nodes you need")
        new.add_argument('--mem', default=[None], metavar="<n>",nargs=1,help="The amount of memory you need")
        new.add_argument('--cpuspertask',type=int, default=[None], metavar="<n>",nargs=1,help="The number of cpus needed for each task")
        new.add_argument('--qos', default=[None], metavar="<n>",nargs=1,help="The QoS (Quality of Service) used for the task (certain QoS are only valid on some partitiotns)")
        new.add_argument('-J','--jobname', default=["interactive_session"], metavar="<n>",nargs=1,help="The name of your job")
        new.add_argument('-A','--account', default=[None], metavar="<n>",nargs=1,help="Specify your account")
        new.add_argument('-p','--partition',default=[None],nargs=1,help="The partition to execute on")
        new.add_argument('-r','--reservation',default=[None],nargs=1,help="The reservation to use")
        new.add_argument('-t','--time',default=[None],nargs=1,help="The amount of time to run for")
        new.add_argument('--gres',default=[None], metavar="<n>",nargs=1,help="The type and number of gpus needed for each task")
        new.add_argument('-o','--output',default=["smux-%j.out"], metavar="<n>",nargs=1,help="Standard output file name")
        new.add_argument('-e','--error', default=["smux-%j.err"], metavar="<n>",nargs=1,help="Error output file name")
        new.set_defaults(func=cls.newJob)

        listjobs=subparser.add_parser('list-sessions',aliases=['l'])
        listjobs.set_defaults(func=lambda x: cls.listJobs(user,x))

        waiting=subparser.add_parser('why-are-we-waiting')
        waiting.add_argument('jobid',metavar="<jobid>", type=lambda x: cls.jobid(user,x),nargs=1,help="A job ID or job name")
        waiting.set_defaults(func=cls.whyAreWeWaiting)

        args=parser.parse_args()

        # Subcommands are optional, so "func" is missing when none was given.
        func = getattr(args, 'func', None)
        if func is None:
            parser.print_help()
            return

        try:
            func(args)
        except SmuxConnectionError as e:
            print(e)
        except Exception as e:
            # Still fall back to the help text, but say what went wrong
            # instead of hiding the failure behind it.
            print("{}: error: {}".format(cls.programname, e), file=sys.stderr)
            parser.print_help()