Source code for cybergis_compute_client.CyberGISCompute

"""
This module exposes CyberGISCompute class which creates a CyberGISCompute
object that serves as an entry point to the CyberGISX environment from a Python/Jupyter notebook.
All interactions with the High Performance Computing (HPC) backend are performed using this object.

Example:
        cybergis = CyberGISCompute(url='localhost', port='3030', protocol='HTTP', isJupyter=False)
"""

from .Client import Client  # noqa
from .Job import Job  # noqa
from .UI import UI  # noqa
from .MarkdownTable import MarkdownTable  # noqa
import json
import base64
import os
import getpass
from IPython.display import display, Markdown, Javascript


class ParamAccumulator:
    def __init__(self,
                 params={}):
        self.params = params


[docs]class CyberGISCompute: """CyberGISCompute class An inteface that handles all interactions with the HPC backend Args: url (str): url that needs to be accessed port (str): port of the Jupyter or Python interface protocol (str): Typically HTTP or HTTPS suffix (str): specify version. For e.g v2 isJupyter(bool): set to True if you are using Jupyter environment Attributes: client (Client object): Initialized using url(str), protocol(str), port(str) and suffix(str) jupyterhubApiToken (string): jupyterhub's REST API token that can be used to authenticate the user username (string): username isJupyter (bool): set to True if you are working in a jupyter environment else set it to False ui (UI): Serves as entry point to UI functionality job (Job): Serves as entry point to access job interactions recentDownloadPath (str): Gets the most recent download path from globus jupyterhubHost (str): static variable that stores the path to jupyterhubHost """ # static variable jupyterhubHost = None job = None def __init__(self, url="cgjobsup.cigi.illinois.edu", port=443, protocol='HTTPS', suffix="v2", isJupyter=True): """ Initializes instance CyberGISCompute using inputs from the client Args: url (str): url that needs to be accessed port (str): port of the Jupyter or Python interface protocol (str): Typically HTTP or HTTPS suffix (str): specify version. For e.g v2 isJupyter(bool): set to True if you are using Jupyter environment Returns: CyberGISCompute: this CyberGISCompute """ self.client = Client(url=url, protocol=protocol, port=port, suffix=suffix) self.jupyterhubApiToken = None self.username = None self.isJupyter = isJupyter self.ui = UI(self) if isJupyter: self.enable_jupyter() # job self.job = None self.recentDownloadPath = None self.simple = False
[docs] def encrypt_token(self, token): """ Encrypts the token using host variable. Args: token (str): User/Environment provided token. """ self.jupyterhubApiToken = base64.b64encode( (self.jupyterhubHost + '@' + token).encode('ascii')).decode('utf-8')
[docs] def get_jupyterhubHost(self): """ Gets the jupyterhub host(str) from the user. """ if (self.jupyterhubHost is None): print( "Please copy the JupyterHub url along with port. E.g http://127.0.0.1:8081") self.jupyterhubHost = input('Enter your jupyterhubHost here: ')
def cancel_job(self): if self.job is not None: res = self.client.request('PUT', '/job/' + self.job.id + '/cancel', {"jupyterhubApiToken": self.jupyterhubApiToken, "jobId": self.job.id}) display(Markdown(str(res)))
[docs] def set_username(self): """ Authenticates the token(str) and saves the username(str). """ res = self.client.request( 'GET', '/user', {"jupyterhubApiToken": self.jupyterhubApiToken}) self.username = res['username']
[docs] def save_token(self): """ Writes token(str) to json file. """ with open('./cybergis_compute_user.json', 'w') as json_file: json.dump({"token": self.jupyterhubApiToken}, json_file)
[docs] def login_token(self): """ Saves username(str) and token(str). """ try: self.set_username() self.save_token() return self.login() except: print('❌ Failed to login via system token')
[docs] def host_token_login(self, token): """ Gets the host(str), encrypts the token(str) and calls login. Args: token (str): User/Environment provided token. """ self.get_jupyterhubHost() self.encrypt_token(token) return self.login_token()
[docs] def login_manual(self): """ Asks for token and host from user and calls login_token function. """ if self.isJupyter: print('📢 Please go to Control Panel -> Token, request a new API token') token = getpass.getpass('Enter your API token here') try: return self.host_token_login(token) except: print('❌ Failed to login via user input') else: print('❌ Enable Jupyter using .enable_jupyter() before you login')
[docs] def login_json(self): """ Checks for json file and calls login_token function. """ try: with open(os.path.abspath('cybergis_compute_user.json')) as f: user = json.load(f) token = user['token'] print('📃 Found "cybergis_compute_user.json! NOTE: if you want to login as another user, please remove this file') self.jupyterhubApiToken = token self.set_username() self.save_token() return self.login() except: # print('❌ Failed to login via token JSON file, trying environment variable...') envToken = os.getenv('JUPYTERHUB_API_TOKEN') if envToken is not None: return self.host_token_login(envToken)
[docs] def login(self, manualLogin=False, manualHost=None, verbose=True): """ Authenticates the client's jupyterhubApiToken and gives them access to CyberGISCompute features Args: manualLogin (bool): set to True if env variable and file login modes are not available Todo: Document exceptions/errors raised. """ if manualHost is not None: self.jupyterhubHost = manualHost # login via env variable if self.jupyterhubApiToken is not None: if self.username is None: self.set_username() if verbose: print('🎯 Logged in as ' + self.username) return # manual login if manualLogin: return self.login_manual() # login via json file elif os.path.exists('./cybergis_compute_user.json'): return self.login_json() else: envToken = os.getenv('JUPYTERHUB_API_TOKEN') if envToken is not None: return self.host_token_login(envToken) print('❌ Not logged in. To enable more features, use .login()')
[docs] def create_job(self, maintainer='community_contribution', hpc=None, hpcUsername=None, hpcPassword=None, verbose=True): """ Creates a job object Initializes instance CyberGISCompute using inputs from the client Args: maintainer (str): Pre-packaged programs which can be configured and controlled remotely and behave as a bridge between user and HPC backends hpc(str): HPC backend that is being accessed. For e.g 'keeling_community' hpcUsername (str): username for HPC backend hpcPassword (str): password for HPC backend printJob (str): prints the Job infortmation if set to True Returns: Job: The new job instance that was initialized """ self.login() return Job(maintainer=maintainer, hpc=hpc, id=None, hpcUsername=hpcUsername, hpcPassword=hpcPassword, client=self.client, isJupyter=self.isJupyter, jupyterhubApiToken=self.jupyterhubApiToken, printJob=verbose)
def run_job_using_params(self, input_params=[], maintainer='community_contribution', hpc="keeling_community", hpcUsername=None, hpcPassword=None, localExecutableFolder={"type": "git", "gitId": "hello_world"}, localDataFolder=None, localResultFolder=None, env=None, slurm=None, verbose=True): for params in input_params: param_acc = ParamAccumulator(params) job = self.create_job(maintainer, hpc, hpcUsername, hpcPassword) job.set(localExecutableFolder, localDataFolder, localResultFolder, param_acc.params, env, slurm) job.submit()
[docs] def get_job_by_id(self, id=None, verbose=True): """ Returns Job object with the specified id Args: id (int): Job id Returns: Job: Job object with the specified id otherwise None """ self.login(verbose=False) return Job(client=self.client, id=id, isJupyter=self.isJupyter, jupyterhubApiToken=self.jupyterhubApiToken, printJob=verbose)
[docs] def get_slurm_usage(self, raw=False): """ Prints slurm usage Args: raw(bool): set to True if you want the raw output Returns: JSON: Raw output if raw=True otherwise its printed or displayed directly into the interface """ self.login() usage = self.client.request('GET', '/user/slurm-usage?format={}'.format( not raw), {"jupyterhubApiToken": self.jupyterhubApiToken}) if raw: return usage display( Markdown( "Nodes: {}<br>Allocated CPUs: {}<br>Total CPU Time: {}<br>Memory Utilized: {}<br>Total Allocated Memory: {}<br>Total Walltime: {}".format(usage['nodes'], usage['cpus'], usage['cpuTime'], usage['memory'], usage['memoryUsage'], usage['walltime'])))
[docs] def list_job(self, raw=False): """ Prints a list of jobs that were submitted Args: raw (bool): set to True if you want the raw output Returns: JSON: Raw output if raw=True otherwise its printed or displayed into the interface """ self.login() if self.jupyterhubApiToken is None: print('❌ please login') jobs = self.client.request( 'GET', '/user/job', { "jupyterhubApiToken": self.jupyterhubApiToken}) if raw: return jobs headers = ['id', 'hpc', 'remoteExecutableFolder', 'remoteDataFolder', 'remoteResultFolder', 'param', 'slurm', 'userId', 'maintainer', 'createdAt'] data = [] for job in jobs['job']: to_append = [ job['id'], job['hpc'], job['remoteExecutableFolder']["id"] if ( job['remoteExecutableFolder'] is not None and "id" in job['remoteExecutableFolder']) else None, job['remoteDataFolder']["id"] if ( job['remoteDataFolder'] is not None and "id" in job['remoteDataFolder']) else None, job['remoteResultFolder']["id"] if ( job['remoteResultFolder'] is not None and "id" in job['remoteResultFolder']) else None, json.dumps(job['param']), json.dumps(job['slurm']), job['userId'], job['maintainer'], job['createdAt'] ] data.append(to_append) if self.isJupyter: if len(data) == 0: print('empty') return display(Markdown(MarkdownTable.render(data, headers))) else: print(MarkdownTable.render(data, headers))
[docs] def list_hpc(self, raw=False): """ Prints a list of hpc resources that the server supports Args: raw (bool): set to True if you want the raw output Returns: JSON: Raw output if raw=True otherwise its printed or displayed directly into the interface """ hpc = self.client.request('GET', '/hpc')['hpc'] if raw: return hpc headers = ['hpc', 'ip', 'port', 'is_community_account'] data = [] for i in hpc: data.append([ i, hpc[i]['ip'], hpc[i]['port'], hpc[i]['is_community_account'] ]) if self.isJupyter: if len(data) == 0: print('empty') return display(Markdown(MarkdownTable.render(data, headers))) else: print(MarkdownTable.render(data, headers))
[docs] def list_container(self, raw=False): """ Prints a list of containers that the server supports Args: raw (bool): set to True if you want the raw output Returns: JSON: Raw output if raw=True otherwise its printed or displayed directly into the interface """ container = self.client.request('GET', '/container')['container'] if raw: return container headers = ['container name', 'dockerfile', 'dockerhub'] data = [] for i in container: data.append([ i, container[i]['dockerfile'], container[i]['dockerhub'] ]) if self.isJupyter: if len(data) == 0: print('empty') return display(Markdown(MarkdownTable.render(data, headers))) else: print(MarkdownTable.render(data, headers))
[docs] def list_jupyter_host(self, raw=False): """ Prints a list of jupyter hosts that the server supports Args: raw (bool): set to True if you want the raw output Returns: JSON: Raw output if raw=True otherwise its printed or displayed directly into the interface """ try: hosts = self.client.request('GET', '/whitelist')['whitelist'] if raw: return hosts headers = ['jupyter_host', 'description'] data = [] for i in hosts: data.append([ i, hosts[i], ]) if self.isJupyter: if len(data) == 0: print('empty') return display(Markdown(MarkdownTable.render(data, headers))) else: print(MarkdownTable.render(data, headers)) except: print("The server " + self.client.url + " doesn't have this route")
[docs] def list_git(self, raw=False): """ Prints a list of Git projects that the server supports Args: raw (bool): set to True if you want the raw output Returns: JSON: Raw output if raw=True otherwise its printed or displayed directly into the interface """ git = self.client.request('GET', '/git')['git'] if raw: return git headers = ['link', 'name', 'container', 'repository', 'commit'] data = [] for i in git: data.append([ 'git://' + i, git[i]['name'], git[i]['container'], git[i]['repository'], git[i]['commit'] if 'commit' in git[i] else 'NONE', ]) if self.isJupyter: if len(data) == 0: print('empty') return display(Markdown(MarkdownTable.render(data, headers))) else: print(MarkdownTable.render(data, headers))
[docs] def list_maintainer(self, raw=False): """ prints a list of maintainers that the server supports Args: raw (bool): set to True if you want the raw output Returns: JSON: Raw output if raw=True otherwise its printed or displayed directly into the interface """ maintainers = self.client.request('GET', '/maintainer')['maintainer'] if raw: return maintainers headers = [ 'maintainer', 'hpc', 'default_hpc', 'job_pool_capacity', 'executable_folder->from_user', 'executable_folder->must_have'] data = [] for i in maintainers: maintainer = maintainers[i] from_user = 'not specified' if 'executable_folder' in maintainer: from_user = maintainer['executable_folder']['from_user'] must_have = 'not specified' if 'executable_folder' in maintainer: if 'file_config' in maintainer['executable_folder']: if 'must_have' in maintainer[ 'executable_folder'][ 'file_config']: must_have = maintainer[ 'executable_folder'][ 'file_config'][ 'must_have'] data.append([ i, maintainer['hpc'], maintainer['default_hpc'], maintainer['job_pool_capacity'], from_user, must_have ]) if self.isJupyter: if len(data) == 0: print('empty') return display(Markdown(MarkdownTable.render(data, headers))) else: print(MarkdownTable.render(data, headers))
# Integrated functions
[docs] def list_info(self, list_maintainer=False, list_container=False): """ Calls :meth:`cybergis_compute_client.CyberGISCompute.CyberGISCompute.list_git`, :meth:`cybergis_compute_client.CyberGISCompute.CyberGISCompute.list_hpc`, :meth:`cybergis_compute_client.CyberGISCompute.CyberGISCompute.list_job` with options to call :meth:`cybergis_compute_client.CyberGISCompute.CyberGISCompute.list_maintainer` and :meth:`cybergis_compute_client.CyberGISCompute.CyberGISCompute.list_container`. Args: list_maintainer (bool): set to True if you want to call list_maintainer list_container (bool): set to True of you want to call list """ print('📦 Git repositories:') self.list_git() print('🖥 HPC endpoints:') self.list_hpc() if self.is_login(): print('📮 Submitted jobs:') self.list_job() if list_container: print('🗳 Containers:') self.list_container() if list_maintainer: print('🤖 Maintainers:') self.list_maintainer()
[docs] def create_job_by_ui( self, input_params=None, defaultJob="hello_world", defaultDataFolder="./", defaultRemoteResultFolder=None): """ Displays the job submission UI. Args: defaultJob (str): Stores the default job that shows up on the UI defaultDataFolder (str): Stores the default input folder that shows up on the UI defaultRemoteResultFolder (str): Stores the default output folder that shows up on the UI """ self.show_ui(input_params, defaultJob, defaultDataFolder, defaultRemoteResultFolder)
[docs] def show_ui(self, simple=False, input_params=None, defaultJob="hello_world", defaultDataFolder="./", defaultRemoteResultFolder=None, jupyterhubApiToken=None): """ Displays the job submission UI Args: defaultJob (str): Stores the default job that shows up on the UI defaultDataFolder (str): Stores the default input folder that shows up on the UI defaultRemoteResultFolder (str): Stores the default output folder that shows up on the UI Returns: None """ self.simple = simple if (jupyterhubApiToken is not None): self.jupyterhubApiToken = jupyterhubApiToken self.ui.defaultJobName = defaultJob self.ui.defaultDataFolder = defaultDataFolder df = defaultRemoteResultFolder self.ui.input_params = input_params if df is not None: self.ui.defaultRemoteResultFolder = df if df[0] == '/' else '/' + df self.ui.render()
[docs] def get_latest_created_job(self): """ Return the current job instance Returns: Job: Latest Job object instance """ return self.job
# helper functions
[docs] def enable_jupyter(self): """ Sets up jupyter environment in jupyterhubHost """ self.isJupyter = True # get jupyter variable url = os.getenv('JUPYTER_INSTANCE_URL') if url is not None: CyberGISCompute.jupyterhubHost = url.replace( 'https://', '').replace( 'http://', '') else: display(Javascript( 'IPython.notebook.kernel.execute(''`CyberGISCompute.jupyterhubHost = "${window.location.host}"`);'))
[docs] def get_user_jupyter_globus(self): """ Return the current job instance Returns: Job: Latest Job object instance """ return self.client.request( 'GET', '/user/jupyter-globus', { "jupyterhubApiToken": self.jupyterhubApiToken})
[docs] def is_login(self): """ Checks whether jupyterhubApi token exists or not Returns: bool: jupyterhubAPI existence check """ return self.jupyterhubApiToken is not None