import { Injectable, EventEmitter } from '@angular/core'; import { HttpClientModule, HttpClient, HttpHeaders, HttpErrorResponse } from '@angular/common/http'; import { Observable, Subject, BehaviorSubject, of, from } from 'rxjs'; import { fromEvent, throwError, Subscription, merge } from 'rxjs'; import { catchError, map, tap, filter } from 'rxjs/operators'; import { Job } from './job'; import { AppAction, Strudelapp, StrudelappInstance } from './strudelapp'; import { Computesite, Health } from './computesite'; import { APIServer } from './apiserver'; import { Identity, AuthToken, KeyCert, SshAuthzServer } from './identity'; import { BatchInterface } from './batchinterface'; import { ComputesitesService } from './computesites.service'; import { StrudelappsService } from './strudelapps.service'; import { timer } from 'rxjs/observable/timer'; import { repeat, takeUntil } from 'rxjs/operators'; import { LocationStrategy, Location } from '@angular/common'; import { ActivatedRoute, Router } from '@angular/router'; import { ModaldialogComponent } from './modaldialog/modaldialog.component'; import { MatDialog, MatDialogRef, MAT_DIALOG_DATA, MatDialogModule } from '@angular/material'; import { AuthorisationService } from './authorisation.service'; import { environment } from '../environments/environment'; import { BackendSelectionService } from './backend-selection.service'; /** The TES service contains ways to start Tunnels, and Execute programs Its also responsible for querying a compute site for running jobs */ @Injectable({ providedIn: 'root', }) export class TesService { public Base: string; //public Base=environment.tesurl; //private twsproxy = environment.twsurl; private twsproxy: string; // public Base='http://localhost:5000'; public statusMsg: BehaviorSubject; public jobs: any[]; // public joblist: BehaviorSubject<{ [id: string ]: Job[]}>; //public joblist: BehaviorSubject; //public userhealth: BehaviorSubject; private timerSubscription: any; private appwindow: any; public apiserver: BehaviorSubject; public apiservers: BehaviorSubject; private updateJobSub: Subscription; private updateUserHealthSub: Subscription; private cachetincidents: BehaviorSubject; private nextUpdate: Subscription; private cancelRequests$: Subject; public identitySubject: BehaviorSubject; public appSubject: BehaviorSubject; private openapps: any[]; // public batchinterface: {[id: string] : BatchInterface}; constructor(private http: HttpClient, public dialog: MatDialog, private computesitesService: ComputesitesService, private authorisationService: AuthorisationService, private strudelappsService: StrudelappsService, private backendSelectionService: BackendSelectionService, private location: Location ) { // this.joblist = new BehaviorSubject<{[id: string]: Job[]}>({}); //this.joblist = new BehaviorSubject([]); //this.cachetincidents = new BehaviorSubject([]); //this.userhealth = new BehaviorSubject([{'stat':'ok','msg':''}]) this.apiserver = new BehaviorSubject(null); this.apiservers = new BehaviorSubject([]); this.cancelRequests$ = new Subject(); this.openapps = []; //this.identitySubject = new BehaviorSubject(null); //this.appSubject = new BehaviorSubject(null); this.backendSelectionService.apiserver.subscribe( (value) => { if (value != null) {this.twsproxy = value.tws ; this.Base = value.tes }}); timer(500).pipe(repeat()).subscribe(() => this.checkWindows()); // this.batchinterface = {}; // this.computesitesService.identities.subscribe(identities => this.startPolling(identities)); } public checkWindows() { var app: any; this.openapps.forEach( (app,index) => { if (app.window.closed) { if (app.job.state == 'RUNNING') { let dialogRef = this.dialog.open(ModaldialogComponent, { width: '600px', data: app.job, }); //dialogRef.afterClosed().subscribe((job) => {if (job !== null) { console.log('terminate'); console.log(job);}}) dialogRef.afterClosed().subscribe((job) => {if (job !== null) { this.cancel(job)}}); } this.openapps.splice(index,1); } }) } public setStatusMsg(statusMsg: BehaviorSubject) { this.statusMsg = statusMsg; } private buildParams(app: Strudelapp, identity: Identity, batchinterface: BatchInterface, appinst?: any): string { let params = new URLSearchParams(); let id = identity.copy_skip_catalog(); id.site.appCatalog = null; if (appinst!== null) { params.set('appinstance',JSON.stringify(appinst)); } params.set('app',JSON.stringify(app)); params.set('interface',JSON.stringify(batchinterface)); params.set('identity',JSON.stringify(id)); return params.toString(); } updateJoblist(resp, identity: Identity) { // resp contains a javascript represnetiation of a list of jobs // We want to update the joblist BUT we don't want to create new Job objects // instead we want to reuse existing job objects removing any which are no longer valid // and adding any new ones. We also want the list sorted from largest jobid to smallest (oldest job) // The sort is lexographic since sometimes jobids are a string rather than a number var joblist: Job[] = [] var jobquery: Job[] = resp; var lastjoblist: Job[] = identity.joblist.value; var qjobids: any[] = []; var jobids: any[] = []; var j: Job; var newjob: Job; var idx: number; for (j of jobquery) { qjobids.push(j.jobid); } for (j of lastjoblist) { if (qjobids.indexOf(j.jobid) != -1) { if (jobids.indexOf(j.jobid) == -1) { idx = qjobids.indexOf(j.jobid) newjob = jobquery[idx] // These values in the job may change, but we need to keep using the old object j.state = newjob.state; j.endtime = newjob.endtime; j.batch_host = newjob.batch_host; joblist.push(j); jobids.push(j.jobid); } } } for (j of jobquery) { if (jobids.indexOf(j.jobid) == -1) { joblist.push(j); jobids.push(j.jobid); } } for (j of joblist) { if (j.app === undefined || j.app == null) { j.app = this.strudelappsService.getApp(j.appname,identity.site.appCatalog.value); } if (j.identity == undefined) { j.identity = identity; } if (j.connectionState == undefined) { j.connectionState = 0; } } joblist = joblist.sort((a,b) => (a.jobid < b.jobid)? 1:-1); identity.joblist.next(joblist); //this.statusMsg.next(null); } private getBatchInterfaceError(error: any) { console.error(error); } private getCachetIncidentsError(error: any, identity: Identity) { return this.getJobsError(error, identity) } public cancelHealthRequests() { this.cancelRequests$.next(true); } private getUserHealthError(error: any, identity: Identity) { identity.accountalerts.next([]); this.statusMsg.next("There was an error checking your user account"); //return this.getJobsError(error,identity) } private getJobsError(error: any, identity: Identity) { identity.joblist.next([]); if (error.status == 0) { this.statusMsg.next("A network error occurred. Please try again later"); return } console.error(error); if (error.status == 401) { this.statusMsg.next("Login expired. Please log in again."); this.authorisationService.updateAgentContents(); return } if (error.status == 400) { if (error.error !== undefined && error.error.message !== undefined) { this.statusMsg.next(error.error.message); } return } } getJobs(identity: Identity) { let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; // remove from the job list any jobs for identities that we don't know about let oldjobs = identity.joblist.value; let bi = new BatchInterface(); bi.cancelcmd = identity.site.cancelcmd; bi.statcmd = identity.site.statcmd; let params = new URLSearchParams(); params.set('statcmd',JSON.stringify(identity.site.statcmd)); params.set('host',JSON.stringify(identity.site.host)); params.set('username',JSON.stringify(identity.username)); this.updateJobSub = this.http.get(this.Base+'/stat'+'?'+params.toString(),options) .subscribe(resp => this.updateJoblist(resp, identity), error => this.getJobsError(error, identity)); } getUserHealth(identity: Identity) { let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; // remove from the job list any jobs for identities that we don't know about let params = new URLSearchParams(); if (identity === undefined || identity === null) { return } if (identity.site.userhealth === undefined) { identity.accountalerts.next([]); return } params.set('statcmd',JSON.stringify(identity.site.userhealth)); params.set('host',JSON.stringify(identity.site.host)); params.set('username',JSON.stringify(identity.username)); this.updateUserHealthSub = this.http.get(this.Base+'/stat'+'?'+params.toString(),options) .pipe(takeUntil(this.cancelRequests$)) .subscribe(resp => this.addUserHealth(identity,resp), error => this.getUserHealthError(error,identity)); } getHealthAlerts(identity: Identity) { //identity.accountalerts.next(null); //identity.systemalerts.next([]); this.getCachetIncidents(identity); this.getUserHealth(identity); } getCachetIncidents(identity: Identity) { let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: false}; // remove from the job list any jobs for identities that we don't know about let params = new URLSearchParams(); if (identity.site.cacheturis === undefined || identity.site.cacheturis.length == 0) { identity.systemalerts.next([]); return } for (let uri of identity.site.cacheturis) { this.http.get(uri,options) .pipe(takeUntil(this.cancelRequests$)) .subscribe(resp => this.addCachetIncidents(identity,resp), error => this.getCachetIncidentsError(error,identity)); } } addCachetIncidents(identity,resp) { console.log('got an sa response'); /*let ci = identity.systemalerts.value; if (ci == null) { ci = [] }*/ let ci = []; for (let i of resp.data) { if (i.status == 3 || i.status == 4) { continue; } let h = new Health(); h.stat = 'error'; h.msg = i.message; ci.push(h); } identity.systemalerts.next(ci); console.log('update system alerts for ',identity.displayName()); console.log(identity.systemalerts.value); console.log(identity.systemalerts); } addUserHealth(identity,resp) { /*let ci = identity.accountalerts.value; if (ci == null) { ci = [] }*/ let ci = [] for (let i of resp) { let h = new Health(); h.stat = i.stat; h.msg = i.message; if (i.title != undefined) { h.title = i.title; } if (i.type != undefined) { h.type = i.type; h.data = i.data; } ci.push(h); } console.log('update account alerts for ',identity.displayName()); identity.accountalerts.next(ci); console.log(identity.accountalerts.value); console.log(identity.accountalerts); } public getconfig(app: Strudelapp, identity: Identity): Observable { let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; return this.http.get(identity.site.url+'getconfig/'+app.name,options) // .pipe(catchError(this.handleError)) } submissionError(error: any) { if (error.status != 0) { if ('error' in error && 'message' in error.error) { this.statusMsg.next(error.error.message); } else { this.statusMsg.next('Job submission failed'); } } } cancelError(error: any) { console.log('error canceling job',error); if (error.status != 0) { if ('error' in error && 'message' in error.error) { this.statusMsg.next(error.error.message); } else { this.statusMsg.next('Job submission failed'); } } } buildBody(app: Strudelapp, appparams?: string) { return JSON.stringify({'app': app, 'appparams': appparams}); } submit(app: Strudelapp, identity: Identity, batchinterface: BatchInterface, appparams?: any) { let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; this.statusMsg.next('Submitting job'); let paramstr = this.buildParams(app,identity,batchinterface); // let body = this.buildBody(app,appparams) let keys = JSON.stringify(this.authorisationService.getKeys()); let loggedin = JSON.stringify(this.authorisationService.loggedInAuthZ.value); let ids = []; for (let id of this.computesitesService.ftidentities.value) { ids.push(id.copy_skip_catalog()) } let body = {'app': app, 'appparams': appparams, 'keys': keys, 'ids': JSON.stringify(JSON.stringify(ids))} this.http.post(this.Base+'/submit'+'?'+paramstr, body, options) .subscribe(resp => { this.statusMsg.next(null) }, error => this.submissionError(error)); } submitted(resp: any, identity: Identity ) { this.getJobs(identity); } cancel(job: Job) { let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; // this.statusMsg.next(null); let data = {}; let bi = new BatchInterface(); bi.statcmd = job.identity.site.statcmd; bi.cancelcmd = job.identity.site.cancelcmd; let paramstr = this.buildParams(job.app,job.identity,bi); this.http.delete(this.Base+'/cancel/'+job.jobid+'?'+paramstr, options) .subscribe(resp => this.submitted(resp,job.identity), error => this.cancelError(error)); } public watchAppwindow(appwindow, dialogRef) { if (appwindow.closed) { dialogRef.close(); } } public getAppInstance(job: Job, action: AppAction) { let username = job.identity.username; let loginhost = job.identity.site.host; let batchhost = job.batch_host; let jobid = job.jobid; let params = new URLSearchParams; let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; params.set('cmd',JSON.stringify(action.paramscmd)); let paramstr = params.toString(); job.connectionState = 1; this.http.get(this.Base+'/appinstance/'+username+'/'+loginhost+'/'+batchhost+'/'+jobid+'?'+paramstr, options) // .pipe(catchError(this.handleError)) .subscribe(resp => { job.appinst = resp; if (action.client != null) { this.createTunnel(job, action) } else { job.connectionState = 0} }, error => { this.handleAppInstanceError(job,error) }) // let paramstr = this.buildParams(job.app,job.identity,this.batchinterface[job.identity.repr()]); // let headers = new HttpHeaders(); // let options = { headers: headers, withCredentials: true}; // this.http.get(this.Base+'/getappinst/'+paramstr,options) // .pipe(catchError(this.handleError)) // .subscribe(resp => { job.appinst = resp; this.openAppWindow(job);}); } public createTunnel(job: Job, action: AppAction) { let username = job.identity.username; let loginhost = job.identity.site.host; let batchhost = job.batch_host; let params = new URLSearchParams; let headers = new HttpHeaders(); job.connectionState = 2; // let sleepDuration = 20; //var now = new Date().getTime(); //while(new Date().getTime() < now + sleepDuration){ /* do nothing */ } let options = { headers: headers, withCredentials: true}; //params.set('cmd',JSON.stringify(action.paramscmd)); let paramstr = params.toString(); this.http.post(this.Base+'/createtunnel/'+username+'/'+loginhost+'/'+batchhost+'?'+paramstr, job.appinst, options) .subscribe(() => { this.getAppUrl(job, action) } ) } public getAppUrl(job: Job, action: AppAction) { let params = new URLSearchParams; let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; let pseudoapp = {'client':{'redir':action.client.redir}}; params.set('app',JSON.stringify(pseudoapp)); params.set('appinst',JSON.stringify(job.appinst)); let paramstr = params.toString(); job.connectionState = 3; this.http.get(this.Base+'/appurl?'+paramstr,options) .pipe(catchError(this.handleError)) .subscribe(resp => { job.connectionState = 0; this.openAppWindow(resp,job)}); } public getInterface(job: Job, action: AppAction) { this.getAppInstance(job, action); } public openAppWindow(url: any, job: Job) { var re = /^https:\/\/([a-z0-9\.-]+)\/?/; let twshost = this.twsproxy.replace(re,"$1"); let windowloc = url.replace(/\{twsproxy\}/g,this.twsproxy).replace(/twshost/g,twshost); let appwindow = window.open(windowloc); if (appwindow == null) { this.statusMsg.next('It looks like a window failed to open. Please check your popup blocker settings (Strudel 2 needs to be able to open a window to your application'); return; } if (appwindow.closed) { return } this.openapps.push({'window':appwindow,'job':job}) } public connect(job: Job, action: AppAction, appinst?: any) { let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; this.getInterface(job,action); // getInterface will subsequently called getAppInstance, which will call createTunnel, which will openAppWindow } public postAgentData(keyCert: KeyCert) { let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; this.statusMsg.next("Authorising ...") let data = {'key': keyCert.key, 'cert': keyCert.cert}; return this.http.post(this.Base+'/sshagent',data,options) .pipe(catchError(this.handleError)) } public postMkDir(id: Identity, path: string, name: string ) { let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; let params = new URLSearchParams; params.set('identity',JSON.stringify(id)); params.set('path',JSON.stringify(path)); return this.http.post(this.Base+'/mkdir?'+params.toString(),name, options); } public getSftpData(id: Identity, path: string, cd: string ) { let headers = new HttpHeaders(); let options = { headers: headers, withCredentials: true}; let params = new URLSearchParams; params.set('identity',JSON.stringify(id)); params.set('path',JSON.stringify(path)); params.set('cd',JSON.stringify(cd)); return this.http.get(this.Base+'/ls?'+params.toString(),options) // .pipe(catchError(this.handleError)) } public getstatusMsgSubject(): BehaviorSubject { return this.statusMsg; } private httperror(errorstr: string) { var re = /login expired/gi; let searchresult = errorstr.search(re); if (searchresult != -1) { this.statusMsg.next("Some authentication tokens have expired, you may need to log in again"); console.error(errorstr); this.authorisationService.updateAgentContents(); } console.error(errorstr); } // private networkError(error: HttpErrorResponse) { // if (error.error instanceof ErrorEvent) { // console.log('network error contacting TES backend'); // } else { // return error; // } // } private handleAppInstanceError(job: Job, error: any) { console.log(error); job.connectionState=0; if (error.status == 0) { this.statusMsg.next('It appears we had a timeout learning about your '+job.name+'. Its possible '+job.identity.site.name+' is experiencing issues'); return; } if (error.error !== undefined && error.error.message !== undefined) { this.statusMsg.next(error.error.message); } else if (error.error !== undefined && error.error.error !== undefined) { this.statusMsg.next(error.error.error.msg); } else { this.statusMsg.next(error); } } private handleError(error: HttpErrorResponse) { console.error('in handleError'); console.error(error); if (error.error instanceof ErrorEvent) { // A client-side or network error occurred. Handle it accordingly. return throwError("Hmm, that didn't work. If you're using a local connection, please make sure Strudel-TES is running."); } else { // Not sure if this code works correctly. It should update identities in case the error is // that the user isn't allowed to run the job var re = /identity/gi; if (error.error.message != undefined) { let searchresult = error.error.message.search(re); if (searchresult != -1) { // this.getIdentities(); return throwError('login expired, refreshing'); } return throwError(error.error.message) } else { return throwError('An error occured but I\'m not sure exactly what. Please try again latter or contact the sys admins'); } // this.statusMsg.next("There was an error submitting that job. The backend gave me the message: " + error.error.message); } } }