# Spark on Kubernetes

To prepare the notebook environment, see https://towardsdatascience.com/make-kubeflow-into-your-own-data-science-workspace-cc8162969e29

## Run Spark inside Notebook

For a quick introduction to PySpark in Jupyter, see https://www.sicara.ai/blog/2017-05-02-get-started-pyspark-jupyter-notebook-3-minutes

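```python
import findspark
findspark.init()  # locate the Spark installation and put pyspark on sys.path

import pyspark
import random

# No master is set, so this runs in local mode inside the notebook pod
sc = pyspark.SparkContext(appName="Pi")
sc.setLogLevel("INFO")
num_samples = 100000

def inside(p):
    # Sample a point in the unit square; keep it if it falls inside the quarter circle
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, num_samples)).filter(inside).count()

pi = 4 * count / num_samples
print(pi)

sc.stop()
```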
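The job above is a Monte Carlo estimate: points drawn uniformly from the unit square land inside the quarter circle with probability π/4, so 4 × (hits / samples) approximates π. The sketch below repeats the same computation in plain Python, without Spark, which can be handy for sanity-checking the Spark result. The helper names (`estimate_pi`, `standard_error`) are illustrative, and the error formula is the usual binomial standard error, added here as an assumption about how much run-to-run variation to expect.

```python
import math
import random
from typing import Optional


def inside_unit_circle(rng: random.Random) -> bool:
    """Draw one point uniformly from the unit square and test x^2 + y^2 < 1."""
    x, y = rng.random(), rng.random()
    return x * x + y * y < 1


def estimate_pi(num_samples: int, seed: Optional[int] = None) -> float:
    """Estimate pi locally, mirroring the parallelize/filter/count job above."""
    rng = random.Random(seed)
    count = sum(1 for _ in range(num_samples) if inside_unit_circle(rng))
    return 4 * count / num_samples


def standard_error(num_samples: int) -> float:
    """Approximate one-sigma error of the estimate: 4 * sqrt(p * (1 - p) / n), p = pi / 4."""
    p = math.pi / 4
    return 4 * math.sqrt(p * (1 - p) / num_samples)


if __name__ == "__main__":
    n = 100_000
    print(estimate_pi(n, seed=42), "+/-", round(standard_error(n), 4))
```

With 100000 samples the standard error is roughly 0.005, so results that differ from 3.14159 in the third decimal place are expected.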

## Setup service account permissions

See https://github.com/kubeflow/kubeflow/issues/4306 for background on the issue with launching spark-operator from a Jupyter notebook.

Run the following commands in your shell (not in the notebook):

```shell
export NAMESPACE=  # fill in the namespace Spark will run in (the examples below use "spark")
kubectl create serviceaccount spark -n ${NAMESPACE}
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=${NAMESPACE}:spark --namespace=${NAMESPACE}
```
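To confirm the binding took effect, `kubectl auth can-i` with `--as` impersonation can ask whether the `spark` service account may create pods, which the driver needs in order to launch executors. A minimal Python sketch, assuming `kubectl` is on your PATH and your own user is allowed to impersonate service accounts; `can_i_command` and `service_account_can` are hypothetical helpers:

```python
import shutil
import subprocess
from typing import List


def can_i_command(namespace: str, service_account: str = "spark",
                  verb: str = "create", resource: str = "pods") -> List[str]:
    """Build a `kubectl auth can-i` check that impersonates the service account."""
    user = f"system:serviceaccount:{namespace}:{service_account}"
    return ["kubectl", "auth", "can-i", verb, resource, f"--as={user}", "-n", namespace]


def service_account_can(namespace: str, **kwargs) -> bool:
    """Run the check; `kubectl auth can-i` exits with status 0 when the answer is yes."""
    if shutil.which("kubectl") is None:
        raise RuntimeError("kubectl not found on PATH")
    result = subprocess.run(can_i_command(namespace, **kwargs),
                            capture_output=True, text=True)
    return result.returncode == 0
```

For example, `service_account_can("spark")` should return `True` once the binding above exists in the `spark` namespace.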

## Python version

> Note: Make sure the Python versions of the driver and the executors match.
> Otherwise, you will see an error message like the one below:

```
Exception: Python in worker has different version 3.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
```

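Check the driver's Python version:

```python
import sys
print(sys.version)  # compare the major.minor version with the Python in the executor image
```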
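The error above is raised when the major.minor versions differ, so that is the part worth comparing. A small sketch, assuming you obtain the executor's version separately (for example from `python3 --version` run inside the executor image); `major_minor`, `driver_python_report`, and `compatible` are illustrative names, not PySpark APIs:

```python
import os
import sys
from typing import Dict, Optional


def major_minor(version: str) -> str:
    """Reduce '3.7.6', '3.7.6 (default, ...)' or 'Python 3.7.6' to '3.7'."""
    token = next((t for t in version.split() if t[:1].isdigit()), None)
    if token is None:
        raise ValueError(f"no version number found in {version!r}")
    return ".".join(token.split(".")[:2])


def driver_python_report() -> Dict[str, Optional[str]]:
    """Collect the driver-side facts that the PySpark version check depends on."""
    return {
        "driver_version": f"{sys.version_info.major}.{sys.version_info.minor}",
        "executable": sys.executable,
        "PYSPARK_PYTHON": os.environ.get("PYSPARK_PYTHON"),
        "PYSPARK_DRIVER_PYTHON": os.environ.get("PYSPARK_DRIVER_PYTHON"),
    }


def compatible(driver_version: str, worker_version: str) -> bool:
    """PySpark refuses to run when the major.minor versions differ."""
    return major_minor(driver_version) == major_minor(worker_version)
```

For instance, `compatible(driver_python_report()["driver_version"], "Python 3.7.3")` is `False` on a 3.6 driver, which is exactly the mismatch in the error message above.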

## Client Mode

```python
import socket

import findspark
findspark.init()  # must run before pyspark is imported

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# In client mode the driver runs in this notebook pod, so executors must be able
# to reach it at this pod's IP address and the driver port configured below.
local_ip_address = socket.gethostbyname(socket.gethostname())

conf = SparkConf().setAppName('sparktest1')
conf.setMaster('k8s://https://kubernetes.default.svc:443')
conf.set("spark.submit.deployMode", "client")
conf.set("spark.executor.instances", "2")
conf.set("spark.driver.host", local_ip_address)
conf.set("spark.driver.port", "7778")
conf.set("spark.kubernetes.namespace", "spark")
conf.set("spark.kubernetes.container.image", "seedjeffwan/spark-py:v3.0.0")
conf.set("spark.kubernetes.pyspark.pythonVersion", "3")
conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
conf.set("spark.kubernetes.executor.annotation.sidecar.istio.io/inject", "false")
```
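Because `SparkConf.set` overwrites earlier values, setting the same key twice (which can easily happen with a key such as `spark.kubernetes.namespace`) leaves only the last value in effect. One way to avoid that is to keep the client-mode settings in a single dict and apply them with `SparkConf.setAll`. This is a sketch; `client_mode_settings` and `build_conf` are hypothetical helpers whose defaults mirror the cell above:

```python
from typing import Dict


def client_mode_settings(namespace: str, image: str, driver_host: str,
                         driver_port: int = 7778, executors: int = 2) -> Dict[str, str]:
    """Collect client-mode settings in one dict so each key is set exactly once."""
    return {
        "spark.submit.deployMode": "client",
        "spark.executor.instances": str(executors),
        # Executors connect back to the driver (the notebook pod) on host:port
        "spark.driver.host": driver_host,
        "spark.driver.port": str(driver_port),
        "spark.kubernetes.namespace": namespace,
        "spark.kubernetes.container.image": image,
        "spark.kubernetes.pyspark.pythonVersion": "3",
        "spark.kubernetes.authenticate.driver.serviceAccountName": "spark",
        "spark.kubernetes.executor.annotation.sidecar.istio.io/inject": "false",
    }


def build_conf(settings: Dict[str, str], app_name: str = "sparktest1"):
    """Apply the settings to a SparkConf (requires pyspark to be importable)."""
    from pyspark import SparkConf
    conf = SparkConf().setAppName(app_name).setMaster("k8s://https://kubernetes.default.svc:443")
    return conf.setAll(settings.items())
```

Usage: `conf = build_conf(client_mode_settings("spark", "seedjeffwan/spark-py:v3.0.0", socket.gethostbyname(socket.gethostname())))`.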

```python
sc = SparkContext.getOrCreate(conf=conf)

# Alternatively, create a SparkSession from the same configuration:
# spark = SparkSession.builder.config(conf=conf).getOrCreate()
```

```python
import random

num_samples = 100000

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, num_samples)).filter(inside).count()
print(4 * count / num_samples)  # estimate of pi
```

```python
sc.stop()
```

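## Cluster Mode

In cluster mode, `spark-submit` creates the driver as a pod in the cluster (named by `spark.kubernetes.driver.pod.name`), which then launches the executor pods; the notebook only submits the job.

### Java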

Run in a notebook `%%bash` cell:

```shell
# The jar path must match the Spark/Scala build in the image: Spark 3.0.0 ships spark-examples_2.12-3.0.0.jar
/opt/spark/bin/spark-submit --master "k8s://https://kubernetes.default.svc:443" \
--deploy-mode cluster \
--name spark-pi-java \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.driver.annotation.sidecar.istio.io/inject=false \
--conf spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false \
--conf spark.kubernetes.container.image=seedjeffwan/spark:v3.0.0 \
--conf spark.kubernetes.driver.pod.name=spark-pi-java \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar 1000
```
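The same submission can also be scripted from Python. Building the arguments as a list and passing it to `subprocess.run` avoids shell quoting and line-continuation mistakes. A minimal sketch, assuming `spark-submit` lives at `/opt/spark/bin/spark-submit` as in the cell above; `spark_submit_command` and `submit` are hypothetical helpers:

```python
import subprocess
from typing import Dict, List, Optional, Sequence


def spark_submit_command(app: str, name: str, confs: Dict[str, str],
                         main_class: Optional[str] = None,
                         app_args: Sequence[str] = (),
                         spark_submit: str = "/opt/spark/bin/spark-submit",
                         master: str = "k8s://https://kubernetes.default.svc:443") -> List[str]:
    """Assemble a cluster-mode spark-submit argument list."""
    cmd = [spark_submit, "--master", master, "--deploy-mode", "cluster", "--name", name]
    if main_class:
        cmd += ["--class", main_class]
    for key, value in confs.items():
        cmd += ["--conf", f"{key}={value}"]
    return cmd + [app, *app_args]


def submit(cmd: List[str]) -> None:
    """Run spark-submit; raises CalledProcessError on a non-zero exit code."""
    subprocess.run(cmd, check=True)
```

For example, `spark_submit_command("local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar", "spark-pi-java", {"spark.kubernetes.namespace": "spark"}, main_class="org.apache.spark.examples.SparkPi", app_args=["1000"])` reproduces the start and end of the command above; add the remaining `--conf` keys to the dict.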

### Python

Run in a notebook `%%bash` cell:

```shell
# local:// refers to the file inside the container image, as in the Java example
/opt/spark/bin/spark-submit --master "k8s://https://kubernetes.default.svc:443" \
--deploy-mode cluster \
--name spark-pi \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=seedjeffwan/spark-py:v3.0.0 \
--conf spark.kubernetes.driver.pod.name=spark-pi \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.driver.annotation.sidecar.istio.io/inject=false \
--conf spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false \
--conf spark.kubernetes.pyspark.pythonVersion=3 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
local:///opt/spark/examples/src/main/python/pi.py 1000
```
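In cluster mode the result is not printed in the notebook: both example programs print a line starting with `Pi is roughly` to the driver pod's log, and the driver pod is named by `spark.kubernetes.driver.pod.name` (`spark-pi` or `spark-pi-java` above). A sketch for fetching and parsing that line, assuming `kubectl` is available and allowed to read pod logs in the `spark` namespace; `driver_log` and `parse_pi` are hypothetical helpers:

```python
import re
import subprocess
from typing import Optional

# Both SparkPi (Scala) and pi.py print "Pi is roughly <value>"
_PI_LINE = re.compile(r"Pi is roughly ([0-9.]+)")


def driver_log(pod: str, namespace: str = "spark") -> str:
    """Fetch the driver pod's log with `kubectl logs` (needs kubectl and read access)."""
    return subprocess.run(["kubectl", "logs", pod, "-n", namespace],
                          capture_output=True, text=True, check=True).stdout


def parse_pi(log_text: str) -> Optional[float]:
    """Return the value from the last 'Pi is roughly ...' line, or None if absent."""
    matches = _PI_LINE.findall(log_text)
    return float(matches[-1]) if matches else None
```

Usage: `parse_pi(driver_log("spark-pi"))` once the driver pod has completed.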