{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Initialize the Cluster Manager\n", "The cluster manager works in a local session (not spark) so it can interact with the cluster manager service to manage clusters." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "from aws.finspace.cluster import FinSpaceClusterManager\n", "\n", "# if this was already run, no need to run again\n", "if 'finspace_clusters' not in globals():\n", " finspace_clusters = FinSpaceClusterManager()\n", " finspace_clusters.auto_connect()\n", "else:\n", " print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "Current session configs: {'files': [], 'jars': [], 'conf': {'spark.pyspark.python': 'python3', 'spark.pyspark.virtualenv.enabled': 'true', 'spark.pyspark.virtualenv.type': 'native', 'spark.pyspark.virtualenv.bin.path': '/usr/bin/virtualenv', 'spark.pyspark.virtualenv.packages': '', 'spark.jars.packages': '', 'spark.jars.repositories': ''}, 'kind': 'pyspark'}
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "\n", "
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
10application_1623322609824_0011pysparkbusyLinkLink
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%%info" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Basic Cluster Management\n", "\n", "- list clusters\n", "- update (resize) them\n", "- stop (terminate) them" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "import pandas as pd\n", "\n", "# list all clusters, running and recently terminated\n", "pd.DataFrame.from_dict( finspace_clusters.list()['clusters'] )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "\n", "# from above return of list, enter the running cluster's clusterId below and your target size for the cluster\n", "\n", "cid = '' # paste your own id here\n", "\n", "finspace_clusters.update(cid, 'Medium') # Choices: Small, Medium, Large" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "import time\n", "\n", "def wait_for_status( client, clusterId:str, status:str, sleep_sec = 30 ):\n", " while True:\n", " resp = client.list()\n", "\n", " this_cluster = None\n", "\n", " # is this the cluster?\n", " for c in resp['clusters']:\n", " if cid == c['clusterId']:\n", " this_cluster = c\n", "\n", " if this_cluster is None:\n", " print(f\"clusterId:{cid} not found\")\n", " return( None )\n", "\n", " this_status = this_cluster['clusterStatus']['state']\n", "\n", " if this_status.upper() != status.upper():\n", " print(f\"Cluster status is {this_status}, waiting {sleep_sec} sec ...\")\n", " time.sleep(sleep_sec)\n", " continue\n", " else:\n", " return( this_cluster )\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "wait_for_status(finspace_clusters, cid, status='RUNNING')" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "%local\n", "\n", "### THIS IS SET TO NON_RUNNABLE TO PREVENT MISTAKES\n", "### To terminate a cluster, use the below command\n", "\n", "# terminate the cluster\n", "finspace_clusters.terminate('')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Livy Session\n", "Use livy and sparkmagic to monitor the spark sessions on the cluster" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%info" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### This will delete a specific session\n", "Commented out so its not accidently run" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#%%delete -f -s << livy session id >>" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### This will cleanup/delete all livy sessions on the cluster\n", "Commented out so its not accidently run" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#%%cleanup -f" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "import datetime\n", "print( f\"Last Run: {datetime.datetime.now()}\" )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "FinSpace PySpark (finspace-sparkmagic-5567a/latest)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:656007506553:image/finspace-sparkmagic-5567a" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }