Hello everybody!
I am using the pangeo configuration for my JupyterHub, but decided to post this issue here as I think it is not pangeo-specific. As some of you may know, the current version of Spark (2.4) introduces PySpark support for Spark's new Kubernetes functionality. I tried to get it running on my cluster by adapting this tutorial. I know this is primarily a PySpark issue, but I thought it might be worth discussing here, since I imagine it is interesting for other JupyterHub users too. Here is what I did:
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
namespace: default
name: spark-role
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "watch", "list", "edit", "create", "delete"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: spark
namespace: default
subjects:
- kind: User
name: system:serviceaccount:pangeo:daskkubernetes
apiGroup: ""
roleRef:
kind: Role
name: spark-role
apiGroup: ""
This extends the rights of the daskkubernetes service account, which is necessary for pangeo to interact with dask properly.
Creating a user pod from the current Jupyter PySpark Docker image, which supports Spark 2.4
Getting my master IP with kubectl cluster-info
Creating a new SparkContext in the following way:
sc = SparkContext(master='k8s://https://<Master-IP>')
This is the output of the context:
(The output was attached as an image in the original issue and is not reproduced here.)
When I do kubectl get po --all-namespaces I cannot see a new Spark pod running.
sqlc = SQLContext(sc)
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlc.createDataFrame(people)
The last line sadly hangs, and when I interrupt it, this is the output:
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-6-f2a7a0f2bef0> in <module>
3 rdd = sc.parallelize(l)
4 people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
----> 5 schemaPeople = sqlc.createDataFrame(people)
/usr/local/spark/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
305 Py4JJavaError: ...
306 """
--> 307 return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
308
309 @since(1.3)
/usr/local/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
744
745 if isinstance(data, RDD):
--> 746 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
747 else:
748 rdd, schema = self._createFromLocal(map(prepare, data), schema)
/usr/local/spark/python/pyspark/sql/session.py in _createFromRDD(self, rdd, schema, samplingRatio)
388 """
389 if schema is None or isinstance(schema, (list, tuple)):
--> 390 struct = self._inferSchema(rdd, samplingRatio, names=schema)
391 converter = _create_converter(struct)
392 rdd = rdd.map(converter)
/usr/local/spark/python/pyspark/sql/session.py in _inferSchema(self, rdd, samplingRatio, names)
359 :return: :class:`pyspark.sql.types.StructType`
360 """
--> 361 first = rdd.first()
362 if not first:
363 raise ValueError("The first row in RDD is empty, "
/usr/local/spark/python/pyspark/rdd.py in first(self)
1376 ValueError: RDD is empty
1377 """
-> 1378 rs = self.take(1)
1379 if rs:
1380 return rs[0]
/usr/local/spark/python/pyspark/rdd.py in take(self, num)
1358
1359 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1360 res = self.context.runJob(self, takeUpToNumLeft, p)
1361
1362 items += res
/usr/local/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
1049 # SparkContext#runJob.
1050 mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1051 sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
1052 return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
1053
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1253 proto.END_COMMAND_PART
1254
-> 1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
1257 answer, self.gateway_client, self.target_id, self.name)
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command, retry, binary)
983 connection = self._get_connection()
984 try:
--> 985 response = connection.send_command(command)
986 if binary:
987 return response, self._create_connection_guard(connection)
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command)
1150
1151 try:
-> 1152 answer = smart_decode(self.stream.readline()[:-1])
1153 logger.debug("Answer received: {0}".format(answer))
1154 if answer.startswith(proto.RETURN_MESSAGE):
/opt/conda/lib/python3.6/socket.py in readinto(self, b)
584 while True:
585 try:
--> 586 return self._sock.recv_into(b)
587 except timeout:
588 self._timeout_occurred = True
Referring to the tutorial, I think the SparkContext needs more information to run correctly. Sadly it does not throw any errors when created. Is there anybody with more Spark/Kubernetes knowledge interested in trying it and sharing insights?
Edit:
The main problem seems to be that, according to the PySpark API, there is no way to provide the necessary information to the SparkContext.
Thank you very much!
Okay, I just figured out that it is necessary to build the executor Docker images from the executing environment (in this case the Jupyter PySpark Docker image) and to provide them to the SparkContext, probably via the SparkConf object. I'll keep you posted.
Thanks for documenting what you learn @h4gen!
So, some progress over here. I did the following to build the images and provide the necessary information to the SparkContext:
# Mounting the host's Docker socket is important to expose the host's Docker daemon
docker run -i --rm -e GRANT_SUDO=yes \
    -v /var/run/docker.sock:/var/run/docker.sock \
    jupyter/pyspark-notebook:5b2160dfd919  # tag with Spark 2.4
docker exec -it -u root <CONTAINER-ID> bash
Install Docker in the container, following this example
Build the executor images from this environment, following the Spark on Kubernetes example
cd $SPARK_HOME
./bin/docker-image-tool.sh -r <repo> -t my-tag build
./bin/docker-image-tool.sh -r <repo> -t my-tag push
Feel free to skip these steps and use my pre-built images from Docker Hub to test it out yourself (assuming I made no mistakes so far):
idalab/spark
idalab/spark-r
idalab/spark-py
SparkConf on the user pod:
conf.setMaster('k8s://https://<MASTER-IP>')
conf.set('spark.kubernetes.container.image', 'idalab/spark-py:spark')
conf.set('spark.submit.deployMode', 'cluster')
conf.set('spark.executor.instances', 2)
conf.setAppName('spark-k8s-test')
conf is provided to the SparkContext:
sc = SparkContext(conf=conf)
Now I get the following error:
Exception Traceback (most recent call last)
<ipython-input-9-66f9c693822e> in <module>
----> 1 sc = SparkContext(conf=conf)
/usr/local/spark/python/pyspark/context.py in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
113 """
114 self._callsite = first_spark_call() or CallSite(None, None, None)
--> 115 SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
116 try:
117 self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
/usr/local/spark/python/pyspark/context.py in _ensure_initialized(cls, instance, gateway, conf)
296 with SparkContext._lock:
297 if not SparkContext._gateway:
--> 298 SparkContext._gateway = gateway or launch_gateway(conf)
299 SparkContext._jvm = SparkContext._gateway.jvm
300
/usr/local/spark/python/pyspark/java_gateway.py in launch_gateway(conf)
92
93 if not os.path.isfile(conn_info_file):
---> 94 raise Exception("Java gateway process exited before sending its port number")
95
96 with open(conn_info_file, "rb") as info:
Exception: Java gateway process exited before sending its port number
A short Google search suggests that this is a sudo problem in the user pod. I will investigate further over the next days.
Cheers!
Hello everyone,
I further investigated the error and came up with a running configuration.
I am running the following code on my user pod to get spark running:
from pyspark import SparkConf, SparkContext
import os
# Config Spark
conf = SparkConf()
conf.setMaster('k8s://https://<MASTER_IP>:443')  # The port is important. Otherwise it won't run.
conf.set('spark.kubernetes.container.image', 'idalab/spark-py:spark')  # Provide the image for the executor pods
conf.set('spark.submit.deployMode', 'client')  # Only client mode is possible
conf.set('spark.executor.instances', '2')  # Set the number of executor pods
conf.setAppName('pyspark-shell')
conf.set('spark.driver.host', '<USER_POD_IP>')  # This is the IP of the user pod in the K8s cluster
os.environ['PYSPARK_PYTHON'] = 'python3'  # Must be set explicitly as env; otherwise workers run Python 2.7
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'  # Same
# Create context
sc = SparkContext(conf=conf)
So far everything I tried worked pretty nicely. I will update this if I encounter any further problems.
Cheers!
Wieee, thank you so much @h4gen! This may help me out personally a lot in the future!
A few further thoughts on this.
Right now the cluster admin has to provide the user with the <MASTER_IP> of the cluster so that they can connect and create the executor pods. Is there a more convenient way for the user to discover this IP? As far as I understand, the pods are assigned a DNS name like jupyter-username. Is there something similar for the K8s master? Or could the hub expose the <MASTER_IP> as an environment variable in the user pod?
Currently the UI is not accessible. As far as I can see, pangeo uses nbserverproxy to expose the dask dashboard to the user. I think this would also be easy to do for the Jupyter Spark image. The question is whether the dashboard link in the SparkContext can be manipulated by configuration to make it more easily accessible. Right now users would have to type in the correct link themselves (which is annoying to explain to a lot of users).
Some loose thoughts, no clear answer (writing from mobile):
hub.extraConfig can expand the dictionary found in c.KubeSpawner.environment; this will influence the env of spawned user pods if you need to set it programmatically.
But you can also use the chart's singleuser.extraEnv to set it directly from the chart config.
If you have a k8s service pointing to some pod, you can reach it at the URI mysvc.mynamespace.svc.cluster.local, by the way.
Note that jupyter-username is a pod name, and you cannot access it as a network identifier like google.se. If you needed that, you would have to create a service for each user pod, pointing to a pod with certain labels.
I think the master may always be reachable at a fixed IP on GKE and other managed k8s offerings provided by the cloud providers, by the way.
Kubernetes creates the environment variable KUBERNETES_SERVICE_HOST, which contains the master IP (the IP for the kubernetes service, actually).
See: https://kubernetes.io/docs/concepts/services-networking/service/#discovering-services
To get the pod IP, it is probably most convenient to use Kubernetes' downward API: https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/#use-pod-fields-as-values-for-environment-variables
env:
- name: MY_POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
(You could also just call hostname -i, or write the equivalent Python code).
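Putting the two suggestions above together, here is a small sketch (the helper names are hypothetical) of how a notebook could discover both the master URL and its own pod IP at runtime, using only the environment variables Kubernetes injects and the standard library:

```python
import os
import socket

def k8s_master_url():
    # Kubernetes injects KUBERNETES_SERVICE_HOST/PORT into every pod;
    # they point at the in-cluster API server (the "master").
    host = os.environ["KUBERNETES_SERVICE_HOST"]
    port = os.environ.get("KUBERNETES_SERVICE_PORT", "443")
    return "k8s://https://{}:{}".format(host, port)

def pod_ip():
    # Rough Python equivalent of `hostname -i`: resolve the pod's
    # own hostname to its cluster IP.
    return socket.gethostbyname(socket.gethostname())
```

These could then feed conf.setMaster(k8s_master_url()) and conf.set('spark.driver.host', pod_ip()) instead of hard-coding <MASTER_IP> and <USER_POD_IP>.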
Slam dunk @dsludwig ! :D
This is a cool write up with lots of useful bits of information. Could you be persuaded to write it up once more now that you know what "the answer" is and then post it on https://discourse.jupyter.org/c/jupyterhub/z2jh-k8s? Our thinking is that the discourse forum is a lot more discoverable (and better indexed by google?) than GitHub issues. Something to ponder.
@dsludwig Thank you for the information. That was exactly what I was looking for. @betatim Sure thing! Will do it the next days.
Before I do the documentation, I would like to ask again whether somebody can give me a hint regarding the Spark UI problem. I assumed it would work with nbserverproxy, but it does not :( Here is all the information I have:
The Spark UI is served on <POD_IP>:4040. Proxying it via .../user/proxy/4040 does not seem to work.
Output of netstat -pl:
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 localhost:51084 0.0.0.0:* LISTEN 23/python
tcp 0 0 localhost:42415 0.0.0.0:* LISTEN 26/python
tcp 0 0 localhost:58607 0.0.0.0:* LISTEN 26/python
tcp 0 0 localhost:39601 0.0.0.0:* LISTEN 23/python
tcp 0 0 localhost:34996 0.0.0.0:* LISTEN 27/python
tcp 0 0 localhost:41208 0.0.0.0:* LISTEN 27/python
tcp 0 0 0.0.0.0:8888 0.0.0.0:* LISTEN 7/python
tcp 0 0 localhost:58553 0.0.0.0:* LISTEN 23/python
tcp 0 0 jupyter-hagen:45243 0.0.0.0:* LISTEN 74/java
tcp 0 0 localhost:38241 0.0.0.0:* LISTEN 23/python
tcp 0 0 localhost:35746 0.0.0.0:* LISTEN 27/python
tcp 0 0 localhost:38050 0.0.0.0:* LISTEN 26/python
tcp 0 0 localhost:52964 0.0.0.0:* LISTEN 26/python
tcp 0 0 localhost:60869 0.0.0.0:* LISTEN 26/python
tcp 0 0 localhost:59910 0.0.0.0:* LISTEN 27/python
tcp 0 0 jupyter-hagen:42343 0.0.0.0:* LISTEN 74/java
tcp 0 0 localhost:47911 0.0.0.0:* LISTEN 26/python
tcp 0 0 0.0.0.0:4040 0.0.0.0:* LISTEN 74/java
tcp 0 0 localhost:35305 0.0.0.0:* LISTEN 27/python
tcp 0 0 localhost:40810 0.0.0.0:* LISTEN 23/python
tcp 0 0 localhost:36362 0.0.0.0:* LISTEN 23/python
tcp 0 0 localhost:43627 0.0.0.0:* LISTEN 74/java
tcp 0 0 localhost:45067 0.0.0.0:* LISTEN 27/python
tcp 0 0 localhost:57547 0.0.0.0:* LISTEN 26/python
Active UNIX domain sockets (only servers)
Proto RefCnt Flags Type State I-Node PID/Program name Path
One can see that the exposed local address has a different format from the other ones, which are accessible.
I also set '_JAVA_OPTIONS' to "-Djava.net.preferIPv4Stack=true", as I thought it might be an IPv6 problem (IPv6 seems to be the default for Java), but it did not resolve the issue.
Any ideas on this? I'm a bit lost. Thank you!
@h4gen Can you submit this as an issue to nbserverproxy so we can discuss it there? A couple of simple things to try:
Not entirely sure if pangeo is sufficiently similar for this to be useful, but I'll throw it up here for the record. I've got Spark 2.4 on Kubernetes running nicely with the standard Toree kernel and version 0.7 of the JupyterHub helm chart:
Create a config.yaml:
singleuser:
serviceAccountName: spark
image:
name: jupyter/all-spark-notebook
tag: 14fdfbf9cfc1
extraEnv:
SPARK_OPTS: >-
--deploy-mode=client
--master k8s://https://kubernetes.default.svc
--conf spark.kubernetes.namespace=`cat /var/run/secrets/kubernetes.io/serviceaccount/namespace`
--conf spark.driver.pod.name=${HOSTNAME}
--conf spark.driver.host=`hostname -i`
--conf spark.driver.port=19998
--conf spark.kubernetes.container.image=docker.io/7thsense/spark:spark24
--conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token
--conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Then install.
helm upgrade jhub --install jupyterhub/jupyterhub --namespace jhub --version 0.7.0 --values config.yaml
I've elided the spark RBAC setup but it's the usual stuff from the spark-on-kubernetes docs.
At least for me, figuring out how to use the service account credentials (and that they were needed at all) and getting the pod context (namespace, pod name, IP) into the environment variable were the tricky parts, since most of the examples I've seen were using the downward API, which doesn't seem to "fit" into extraEnv.
@easel This is super helpful! A few questions regarding your setup:
...SPARK_OPTS at container start? I think this is pretty elegant.
Hi @h4gen, answers per above:
I just built the images per the "Spark on Kubernetes" docs at https://spark.apache.org/docs/latest/running-on-kubernetes.html#docker-images on my Mac -- the URLs above are in a public repo though, and you could just use them.
Spark ui works but you have to forward the port into the single-user pod, something like kubectl port-forward pod/jupyter-erik 4040:4040. It would be cool to get it wired back into the hub or an ingress somehow but I haven't done anything like that.
I may not fully understand your question. It just happens "magically", the extraVars pushes environment variables into the kernel and Toree picks it up. I guess maybe I got the idea from https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#in-an-apache-toree-scala-notebook ?
Thanks for the answers @easel. Few more questions:
Do you have to forward every user pod by hand, or is there a way to forward all of them automatically? Maybe jupyterhub/nbserverproxy#57 is also interesting for you.
I see now how the variables are set. But weirdly it does not work for me, because the shell commands are not evaluated but set as strings. Example:
I set:
SPARK_OPTS: >-
--deploy-mode=client
--master=k8s://https://kubernetes.default.svc
--conf spark.driver.host=$(hostname -i)
--conf spark.kubernetes.container.image=idalab/spark-py:spark
--conf spark.ui.proxyBase=${JUPYTERHUB_SERVICE_PREFIX}proxy/4040
...
but on the pod this results in an environment variable like:
'SPARK_OPTS': '--deploy-mode=client --master=k8s://https://kubernetes.default.svc --conf spark.driver.host=$(hostname -i) --conf spark.kubernetes.container.image=idalab/spark-py:spark --conf spark.ui.proxyBase=${JUPYTERHUB_SERVICE_PREFIX}proxy/4040'
Does anybody know why this is the case?
Thank you!
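For reference: Kubernetes only expands `$(NAME)` references to other environment variables declared in the same container spec; it never runs a shell, so command substitutions like `$(hostname -i)` and `${NAME}`-style references pass through as literal text. A rough sketch of that expansion rule (the helper name is hypothetical, and escaping via `$$` is omitted):

```python
import re

def k8s_env_expand(value, env):
    # Kubernetes substitutes only $(NAME) where NAME is a plain
    # variable name defined earlier in the spec; anything else,
    # including shell command substitutions and ${NAME}, is left as-is.
    def repl(match):
        return env.get(match.group(1), match.group(0))
    return re.sub(r"\$\(([A-Za-z_][A-Za-z0-9_]*)\)", repl, value)
```

This is why `spark.driver.host=$(hostname -i)` arrives in the pod as a literal string: `hostname -i` is not a variable name, so nothing is expanded.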
@betatim Posted consolidated results here.
Having the same issue with spark.driver.host. How can I set this dynamic value before Docker's ENTRYPOINT executes? I am using KubeSpawner, if that is relevant.
@metrofun The best way I found to set this dynamically is to write to a spark-defaults.conf file when the container starts. In your JupyterHub configuration:
jupyterhub:
singleuser:
lifecycleHooks:
postStart:
exec:
command:
- "/bin/sh"
- "-c"
- |
echo "spark.driver.host=$(hostname -i)" >> $SPARK_HOME/conf/spark-defaults.conf
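Spark loads spark-defaults.conf through java.util.Properties, so both `key=value` (as written by the hook above) and whitespace-separated `key value` lines work. A rough Python sketch of that parsing rule, handy for sanity-checking what the hook appended (the helper name is hypothetical, and Properties escaping rules are omitted):

```python
import re

def parse_spark_defaults(text):
    # Approximation of java.util.Properties: the key ends at the first
    # '=', ':' or whitespace character; the rest of the line is the value.
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        match = re.match(r"([^\s=:]+)[=:\s]\s*(.*)$", line)
        if match:
            props[match.group(1)] = match.group(2)
    return props
```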
@h4gen were you able to find a way to automatically set SPARK_PUBLIC_DNS? I am running into the same issue where
jupyterhub:
singleuser:
extraEnv:
SPARK_PUBLIC_DNS: my.url.com${JUPYTERHUB_SERVICE_PREFIX}proxy/4040/jobs/
does not evaluate correctly, and setting environment variables in the postStart lifecycleHooks for the container doesn't seem to work either (it will result in an empty environment variable):
jupyterhub:
singleuser:
lifecycleHooks:
postStart:
exec:
command:
- "/bin/sh"
- "-c"
- |
export SPARK_PUBLIC_DNS="my.url.com${JUPYTERHUB_SERVICE_PREFIX}proxy/4040/jobs/"
I was thinking of opening a separate issue about setting environment variables that require other environment variables, but wanted to see if you had a solution first.
Edit: Of course the second solution won't work as that will only set the environment variable in the postStart shell. My workaround is to set SPARK_PUBLIC_DNS in the single user's Entrypoint script. I've opened an issue about this to see if this functionality is possible with extraEnv: https://github.com/jupyterhub/zero-to-jupyterhub-k8s/issues/1255
@h4gen have you updated this at all using updates over the past year or so?
Hi, @TheJaySmith . Sorry no updates from my side. We switched to kubeflow.
@h4gen thank you so much for writing this up publicly, too much work is repeated when we work in isolation! :heart:
At this point, I'll go ahead and close this issue as it's becoming stale and doesn't involve an action point - it is still very findable with search engines though.
Lots of love from Sweden!