이제 k3s 클러스터에 MinIO 도 설치되었고 private docker registry 도 설정되었다. 이제 k3s 클러스터에 Spark 어플리케이션을 실행해 보자. (나중에는 remote shuffle service 도 설정해보고 할 예정인데 아직은 k8s 자체도 잘 모르니 쉬운 것부터…)

최신 버전인 Spark 3.0.1 의 바이너리를 다운로드받고 (spark-3.0.1-hadoop-3.2 로 사용함) <SPARK_HOME> 이라고 하자.

실행해 보니 bounty castle 관련 라이브러리가 없다는 에러가 나타나서 다운로드 받아서 넣어주는 것으로 했다. Spark docker image 를 다시 만들때에도 포함이 된다.

cd <SPARK_HOME>/jarswget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-jdk15on/1.67/bcpkix-jdk15on-1.67.jar
wget https://repo1.maven.org/maven2/org/bouncycastle/bcprov-jdk15on/1.67/bcprov-jdk15on-1.67.jar

이제 image 를 빌드하고 push 해 보자. (pyspark 을 사용할 예정이라 해당 Dockerfile 을 사용하였다. Spark k8s 가이드 문서에 명시되어 있다.)

이전 문서에서 설정한 대로, http://10.38.12.80:5000 이 private docker registry 의 endpoint 라고 하자.

sudo ./bin/docker-image-tool.sh -r 10.38.12.80:5000 -t spark-3.0.1-bin-hadoop3.2-with-bounty-castle-20201221-v1 -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile buildsudo ./bin/docker-image-tool.sh -r 10.38.12.80:5000 -t spark-3.0.1-bin-hadoop3.2-with-bounty-castle-20201221-v1 -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile push

private docker registry 에 image 가 push 되었다.

실행하기 전에, service account 를 만들고 cluster role binding 을 해당 account 에 설정해 Spark driver pod 이 executor pod 을 띄울 수 있게 해 준다.

kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default

그리고 private docker registry 에 인증할 수 있게 secret 을 만들어 준다.

kubectl create secret docker-registry regcred --docker-server=http://10.38.12.80:5000 --docker-username="<docker registry username>" --docker-password="<docker registry password>" --docker-email="<email address>"

여기서 만든 regcred 는 imagePullSecrets 에 제시되어야 한다. Spark 은 해당 설정을 노출하고 있어서 옵션으로 넣어주면 된다.

아래는 SparkPi 를 실행하는 명령이다. 실행 전에 mc mb minio-k3s/spark-upload 로 필요한 bucket 을 생성하는 것도 잊지 말자.

<SPARK_HOME>/bin/spark-submit \
--master k8s://https://10.38.12.80:6443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.container.image=10.38.12.80:5000/spark:spark-3.0.1-bin-hadoop3.2-with-bounty-castle-20201221-v1 \
--conf spark.kubernetes.container.image.pullSecrets=regcred \
--conf spark.kubernetes.local.dirs.tmpfs=true \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--packages org.apache.hadoop:hadoop-aws:3.2.0 \
--repositories https://repo1.maven.org/maven2/ \
--conf spark.kubernetes.file.upload.path=s3a://spark-upload \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.access.key="heartsavior" \
--conf spark.hadoop.fs.s3a.secret.key="heartsavior" \
--conf spark.hadoop.fs.s3a.fast.upload=true \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.endpoint=http://10.38.12.116:9000 \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
file://<SPARK_HOME>/examples/jars/spark-examples_2.12-3.0.1.jar

실행하면 Yarn 에 Spark application 을 제출할 때 처럼 pending 이 지속되다가 running 혹은 error 가 발생하고 application 이 끝날 때까지 멈추지 않는다. 다른 콘솔 창을 띄워서 k9s 를 실행하고 pods 를 지켜보면 driver 가 실행되고, 이어 executor 가 실행된다. driver pod 을 kill 하면 executor pod 은 자연 종료된다. Spark PI 는 계산이 끝나면 종료되고, 상태는 completed 로 남는다. pod 이 자동으로 사라지지 않아 로그를 확인하거나 설정 등을 차후에 확인할 수 있지만, 실행 후에는 지워주는 게 좋겠다.

이제 기본적인 Spark application 실행이 가능해졌다. jar 로 실행하는 어플리케이션은 그냥 이 틀에 맞춰 실행해도 될 것이다. 최종 목표는 Structured Streaming query 를 실행하는 거였고 내가 갖고 있는 쿼리들은 PySpark 으로 구현되어 있으므로 거기에 맞게 명령을 조금 바꾸기로 한다.

그 전에, 기왕에 Structured Streaming query 를 실행하는 거니까 Kafka 도 활용해 보려고 한다. 그래봐야 데탑 한 대에 전부 띄우는 거고 적당히 동작하는지 확인하는 용도니까 브로커 1개짜리 최소 설치로 진행한다. 그래도 k8s 클러스터가 있으니 Kafka 도 k8s 에 올려보기로 하자.

역시 쉽고 편하게 간다. strimzi 의 kafka operator 를 사용한다.

https://strimzi.io/quickstarts/

kubectl create namespace kafka
kubectl apply -f '<https://strimzi.io/install/latest?namespace=kafka>' -n kafka
kubectl apply -f <https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml> -n kafka

이대로 설치하면 ClusterIP 로 bootstrap address 가 노출된다.

kubectl get service -w my-cluster-kafka-bootstrap -n kafka
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-kafka-bootstrap ClusterIP 10.43.152.51 <none> 9091/TCP,9092/TCP,9093/TCP 5d

호스트 서버에서도 사용할 수 있게 external IP 를 따서 쓰고 싶지만 external IP 를 이용해 bootstrap address 를 지정해도 결국 broker advertised host/port 를 사용하게 되어 접근이 불가능해진다. Kafka 를 제대로 k8s 에서 돌리는 게 목적은 아니니까 이 정도에서 그치고 넘어가자.

확인한 걸로는 Spark 3.0.1 의 PySpark 은 cluster mode 에서 driver 의 PYTHONPATH 가 py-files 를 반영하지 못하는 것으로 보였다. branch-3.1 을 직접 빌드해서 테스트해 보니 잘 되는 것 같다. PySpark 부분은 직접 빌드한 3.1.0-SNAPSHOT 으로 진행하자.

git clone https://github.com/apache/spark.git
git checkout branch-3.1
mvn clean install -DskipTests -Pkubernetes -Phadoop-3.2
dev/make-distribution.sh -Pkubernetes -Phadoop-3.2

여기서 dist 디렉토리가 생성되는데 이 디렉토리를 적당한 위치로 옮기자. (향후에는 SPARK_3.1.0_SNAPSHOT_HOME 으로 지칭)

위에서 했던 것과 마찬가지로 bounty castle 의 jar 들을 받아서 jars 디렉토리에 넣어 주고, image 를 build 하고 push 해 주자. (tag 는 조금 바꿔주는 게 좋겠다. 아래에서는 spark-3.1.0-bin-hadoop3.2-SNAPSHOT-with-bounty-castle-20201221-v1 로 설정한 것으로 간주)

내가 링크한 Structured Streaming 프로젝트는 clone 받아서 make clean -> make all 하면 dist 디렉토리에 파일들이 생성된다. Python 3.6 에서 테스트했다.

프로젝트 디렉토리에서 아래의 명령을 실행하자. (checkpoint 도 S3 에 넣도록 설정할 예정인데 bucket 을 따로 파서 사용하고자 한다. MinIO 에 ‘spark-checkpoints’ bucket 을 만들어 주자.)

<SPARK_3.1.0_SNAPSHOT_HOME>/bin/spark-submit \
--master k8s://https://10.38.12.80:6443 \
--deploy-mode cluster \
--py-files ./dist/jobs.zip,./dist/libs.zip \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.container.image=10.38.12.80:5000/spark-py:spark-3.1.0-bin-hadoop3.2-SNAPSHOT-with-bounty-castle-20201221-v1 \
--conf spark.kubernetes.local.dirs.tmpfs=true \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--jars /home/heartsavior/.m2/repository/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.0-SNAPSHOT/spark-sql-kafka-0-10_2.12-3.1.0-SNAPSHOT.jar,/home/heartsavior/.m2/repository/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.1.0-SNAPSHOT/spark-token-provider-kafka-0-10_2.12-3.1.0-SNAPSHOT.jar,/home/heartsavior/.m2/repository/org/apache/kafka/kafka_2.12/2.6.0/kafka_2.12-2.6.0.jar,/home/heartsavior/.m2/repository/org/apache/kafka/kafka-clients/2.6.0/kafka-clients-2.6.0.jar,/home/heartsavior/.m2/repository/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar \
--packages org.apache.hadoop:hadoop-aws:3.2.0 \
--repositories https://repo1.maven.org/maven2/ \
--conf spark.kubernetes.file.upload.path=s3a://spark-upload \
--conf spark.hadoop.fs.s3a.access.key="heartsavior" \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.fast.upload=true \
--conf spark.hadoop.fs.s3a.secret.key="heartsavior" \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.endpoint=http://10.38.12.116:9000 \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
--conf spark.kubernetes.pyspark.pythonVersion=3 \
file://$PWD/dist/main.py \
simple_query --source-format=rate --sink-format=kafka \
--source-option-rows-per-second=100 \
--sink-option-bootstrap-servers=10.43.152.51:9092 \
--sink-option-topic="ss-output" \
--checkpoint-location=s3a://spark-checkpoints/checkpoint-query-1

SNAPSHOT 빌드를 사용하므로 packages 로 spark-sql-kafka 를 지정할 수 없어서 jars 에 transitive dependencies 를 모두 넘겨 주어야 한다. mvn install 를 먼저 실행했으므로 jar 파일들은 .m2 에 이미 마련되어 있다. 찾아서 jars 옵션에 넣어 준다.

잘 실행이 됐다면 Kafka 에 ss-output topic 이 만들어지고 데이터가 유입되고 있을 것이다.

여기까지가 몇 주 간의 삽질 일기이다. 대충 넘긴 게 많은 만큼 아직 많은 TODO 가 남아 있다. 스토리지 쪽은 MinIO 에 완전히 미뤄놓았지만 막상 MinIO 는 기본 스토리지 설정을 사용하고 있는데 이게 pod 이 실행된 노드에 500G 를 요구하는 것 같다. (없다고 실행이 안되는 것도 아니지만, 그래도 그냥 찜찜…) private docker registry 는 기본적으로 TLS 설정을 하는 만큼 결과적으로는 TLS 설정을 하고 workaround 들을 치워내는 게 좋겠고, 이왕 한 김에 MinIO 도 TLS 설정을 하면 좋겠다. Kafka 도 broker 의 advertised host/port 를 설정해서 호스트 서버에서 접근 가능하게 하면 좋겠고… Spark 도 규모가 있는 쿼리는 spark.kubernetes.local.dirs.tmpfs=true 를 치우고 PV 를 붙여줘야 될 것 같고…

TODO 가 해결되면 해결되는 대로 문서를 업데이트하거나 추가 문서를 또 올릴 생각이다. 그 전에 k8s 자체에 대한 학습을 먼저 할 예정이지만…

--

--