Kubernetes 에서 Spark 배치 쿼리로 HDFS 없이 S3 에 기록하기
개발 환경 셋업의 마무리로 k8s 에서 Structured Streaming 쿼리로 결과를 파일 (S3), Kafka 등에 기록하는 것을 테스트했었다. 많은 TODO 를 남겼지만 일단 정상 동작하는 것을 확인해서 Spark 은 얼추 실행 테스트를 마쳤구나 생각했는데, 배치 쿼리는 파일 싱크에 기록할 때 기록 방법이 조금 다르다는 것을 나중에 깨달았다.
Structured Streaming 파일 싱크는 아웃풋 디렉토리 아래에 메타데이터 디렉토리를 만들고 거기에 각 마이크로 배치 별로 기록이 정상적으로 완료된 파일들의 리스트를 파일에 기입한다. 개별 task 가 아웃풋 파일들을 쓰고, task 가 정상적으로 완료되면 driver 로 파일 리스트를 넘기고 driver 가 모든 task 에 대해 정상 완료를 수신하면 전체 파일 리스트를 기록하는 방식으로 작동한다.
다른 쿼리에서 해당 아웃풋 디렉토리를 읽을 때, 메타데이터 디렉토리에 있는 파일들을 읽어서 실제 아웃풋 파일 리스트를 뽑고, 실제 파일들을 읽게 되어 있다. End-to-end exactly-once 보장을 위한 방법인데, Spark 만 메타데이터 디렉토리를 인지할 수 있기 때문에 Spark 으로 읽어야 기록하다 실패한 파일이라던지 여타 문제 있는 파일들을 제외하고 읽을 수 있다. (메타데이터 디렉토리를 무시하고 아웃풋 디렉토리를 읽으면 at-least-once 이고 깨진 파일들도 존재할 수 있어 엔진이 오류를 무시할 수도 있어야 한다.)
배치의 파일 싱크 기록은 일반적으로 지정된 임시 디렉토리에 개별 task 가 아웃풋 파일들을 쓰고 모든 task 가 정상 완료되면 driver 가 그 임시 디렉토리를 최종 디렉토리로 옮기는 방식으로 진행된다. (file output committer algorithm version 1) version 2 는 driver 가 개입하지 않고 개별 task 가 완료될 때 자신이 작성한 아웃풋 파일들을 최종 디렉토리로 옮기는 것으로 알려져 있다. 다른 말로 하면, 몇몇 task 들이 먼저 성공하고 특정 task 가 실패하면 최종 디렉토리에 “일부” 아웃풋 파일들이 노출되고 전체 쿼리는 실패한다는 뜻이다. 아웃풋 디렉토리를 바로 읽어들이는 쿼리가 동작 중이라면 이 “불완전한” 아웃풋을 읽어 간다는 이야기. 그래서 Spark 은 Hadoop 이 version 2 를 기본 설정으로 함에도 별도 기본값을 version 1 로 유지하고 사용자가 덮어쓸 수 있게 하고 있다. (3.0.2/3.1.0 에 포함 예정이고 이전 버전들은 Hadoop 기본 설정을 따라가므로 주의!)
이건 HDFS 호환 파일 시스템일 때의 이야기이다. version 1 은 디렉토리 rename 이 atomic 인 파일 시스템에서만 문제 없이 작동한다. 현재 셋업은 HDFS 가 구성되어 있지 않고 구성할 생각도 없다. S3 에 기록하는 게 목표이다. (정확히는 MinIO 겠지만…) 전용 committer 가 있을까?
있다. S3A committer.
이것도 크게 보면 세 종류 (file, staging, magic) 인데 file committer 는 위의 그 file output committer 같으니 무시하고, staging 은 클러스터 파일시스템을 필요로 한다. (예를 들어 HDFS) 얼추 대충 보니 클러스터 파일시스템에 먼저 기록하고 모든 기록이 성공하면 S3 로 업로드 하는 것으로 보인다.
HDFS 없이 S3 에 기록하는 게 목표라면, magic committer 외에는 선택의 여지가 없다. magic committer 는 파일시스템이 strong consistent 해야 정상 동작하고 그래서 S3 Guard 를 필수로 사용해야 한다고 명시되어 있는데, 아마존의 올해 발표로 S3 Guard 는 사실상 관짝에 들어가게 되었다. S3 호환 파일 시스템들이 strong consistent 한지는 몰라도 S3 자체는 magic committer 를 사용할 수 있는 것으로 보인다.
(이름이 magic 인 만큼 어떻게 동작하는 지 문서에도 딱히 설명된 게 없다… 디테일은 나중에 한 번 찾아보기로 하고…)
magic committer 를 사용하도록 설정해서 아래의 아주 단순한 배치 쿼리를 실행해 보자.
import sys
from pyspark.sql import SparkSessiondef main(output_path):
spark.range(100000).repartition(100).write.json(output_path)
df = spark.read.json(output_path)
df.show()if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Python Spark SQL test") \
.getOrCreate()path = sys.argv[1]
print("path: %s" % path)main(path)
S3A committer 를 사용하려면 spark-hadoop-cloud 모듈에 있는 클래스를 사용해야 하는데 내가 아는 게 맞다면 해당 모듈은 (바뀔지 모르겠지만) 기본 프로파일로는 포함되지 않는다. 다시 말해서, 현재 시점에서는 직접 빌드를 해야 된다는 뜻이다.
어차피 직접 빌드해야 되는 마당이니 3.1 브랜치를 빌드하자. (k8s 지원은 Spark 3.1 에서 GA 가 되기 때문에 아직 릴리즈 되지 않았어도 3.1 로 테스트하는 게 사서 고생을 덜할 수도 있다.)
git clone https://github.com/apache/spark.git
git checkout branch-3.1
mvn clean install -DskipTests -Pkubernetes -Phadoop-3.2 -Phadoop-cloud
dev/make-distribution.sh -Pkubernetes -Phadoop-3.2 -Phadoop-cloud
dist 디렉토리를 적당히 복사하고, 해당 디렉토리로 이동한다.
그리고 docker image 를 만들고 private docker registry 에 밀어넣자.
sudo ./bin/docker-image-tool.sh -r 10.38.12.80:5000 -t spark-3.1.0-SNAPSHOT-bin-hadoop3.2-with-hadoop-cloud-20201223-v1 -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile buildsudo ./bin/docker-image-tool.sh -r 10.38.12.80:5000 -t spark-3.1.0-SNAPSHOT-bin-hadoop3.2-with-hadoop-cloud-20201223-v1 -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile push
hadoop-cloud 프로파일은 hadoop-aws 와 transitive dependencies (amazon-sdk 등등) 들을 포함하므로 더 이상 packages 란에 넣어 주지 않아도 된다. (처음부터 이렇게 했었어야 했나보다…)
bounty castle 관련 라이브러리가 없다는 에러는 여전히 발생한다. submit 할 때에만 있어도 되는 것으로 보인다. 예전 문서에서 언급한 것 처럼 다운로드 받아서 넣어준다. Spark docker image 에는 포함되지 않아도 되는 것 같다.
실행은 아래와 같이 한다. (test_batch.py 는 위의 배치 쿼리 코드가 기록된 파일)
./bin/spark-submit \
--master k8s://https://10.38.12.80:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=10.38.12.80:5000/spark-py:spark-3.1.0-SNAPSHOT-bin-hadoop3.2-with-hadoop-cloud-20201223-v1 \
--conf spark.kubernetes.pyspark.pythonVersion=3 \
--conf spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory \
--conf spark.hadoop.fs.s3a.committer.name=magic \
--conf spark.hadoop.fs.s3a.committer.magic.enabled=true \
--conf spark.hadoop.fs.s3a.buffer.dir=/opt/spark/work-dir/s3a-buffer \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.file.upload.path=s3a://spark-upload \
--conf spark.hadoop.fs.s3a.access.key="heartsavior" \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.fast.upload=true \
--conf spark.hadoop.fs.s3a.secret.key="heartsavior" \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.endpoint=http://10.38.12.80:9000 \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol \
--conf spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter \
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp/ivy-cache -Divy.home=/tmp/ivy" \
./test_batch.py s3a://batch-output/query-output-1
/opt/spark/work-dir 은 Spark docker image 가 생성하는 디렉토리이다. 일단 이 공간을 사용하되, 나중에는 NFS 에서 동적으로 할당받아서 쓰려고 한다. (k8s 노드에 디스크 공간 할애를 최대한 적게 하고, NFS 를 통해 호스트 서버의 디스크를 무작위로 사용하게 하려고 함.)
magic committer 사용 관련 변경이 있는 설정들만 다시 모아 보면…
--conf spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory \
--conf spark.hadoop.fs.s3a.committer.name=magic \
--conf spark.hadoop.fs.s3a.committer.magic.enabled=true \
--conf spark.hadoop.fs.s3a.buffer.dir=/opt/spark/work-dir/s3a-buffer \
--conf spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol \
--conf spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter \
이 정도이다. 하둡 설정 관련은 아래 문서를 참조하고…
Spark 설정 관련은 아래 문서를 참조했다.
실제로 실행해 보면, 아웃풋 디렉토리에 아래와 같이 파일들이 생성된다.
mc ls minio-k3s/batch-output/query-output-8/
[2020-12-23 17:53:03 KST] 12KiB _SUCCESS
[2020-12-23 17:52:54 KST] 13KiB part-00000-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
[2020-12-23 17:52:54 KST] 13KiB part-00001-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
[2020-12-23 17:52:55 KST] 13KiB part-00002-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
[2020-12-23 17:52:55 KST] 13KiB part-00003-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
[2020-12-23 17:52:55 KST] 13KiB part-00004-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
[2020-12-23 17:52:55 KST] 13KiB part-00005-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
...
[2020-12-23 17:53:03 KST] 13KiB part-00095-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
[2020-12-23 17:53:03 KST] 13KiB part-00096-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
[2020-12-23 17:53:03 KST] 13KiB part-00097-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
[2020-12-23 17:53:03 KST] 13KiB part-00098-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
[2020-12-23 17:53:03 KST] 13KiB part-00099-0d07120a-af31-46d8-8649-bdada9dd4246-c000.json
_SUCCESS 파일에 아웃풋에 대한 메타데이터가 기록되어 있다. 한 번 직접 확인해 보자.
mc cat minio-k3s/batch-output/query-output-8/_SUCCESS
이로써 magic committer 를 사용하면 k8s 노드들의 로컬 파일 시스템만 활용해서 Spark 를 통해 S3 에 파일 기록이 가능함을 확인할 수 있었다.