Spark 3.0 에 포함될 Structured Streaming 관련 변화들

Jung-taek Lim
13 min readFeb 3, 2020

지난 주말, Spark 3.0.0 릴리즈를 위한 브랜치가 생성되었다. Spark 3.0 릴리즈를 위한 “feature freeze” 상태로 전환된 것이다. (예외 사항은 있을 수 있지만) 새로 제안될 신규 기능은 3.0.x 에는 추가되기 어려울 것 같고, 다르게 보면 Spark 3.0 에 적용될 새로운 변화들을 짚어보기에 적당한 시점이라고 할 수 있겠다.

나의 주된 관심사 및 공헌 분야는 Structured Streaming 이라서 이 분야에 도입될 변화들만 간단하게 정리해 보려고 한다. 다른 분야들은 3.0 출시 전후로 많은 글이 올라올 것 같기도 하고 잘 모르기도 하니… JIRA 기준으로 정리했기 때문에 SQL 로 분류되었지만 Structured Streaming 에도 영향을 끼치는 변화들도 제외되었다.

(DISCLAIMER: 공헌을 직접 하다 보니 아무래도 본인의 작업들이 다수 포함되어 있다. 덜 객관적으로 소개하거나 중요한 기능들이 소개에 누락되어 있을 수 있다. JIRA 기준으로 단순 버그패치나 단순 기능개선 외에는 최대한 담아보려고 했다.)

Kafka 관련 변화가 많으니 Kafka 부터 짚어 보자.

  • Kafka version upgrade

Kafka client version 이 현재 기준 2.4.0 으로 업그레이드 되었다. 하위 호환성을 잘 지원하고 있는 Kafka client 이기 때문에 하위 버전 Kafka 클러스터와도 문제가 없을 것으로 추측은 되지만 혹시 모르니 알아두면 좋을 것 같다.

  • Kafka delegation token

Kafka delegation token 을 지원한다. Source/Sink 둘 다 적용되며, 한 쿼리에 여러 Kafka cluster 를 연결하는 경우 또한 지원한다. Proxy user 는 Kafka 자체에서 아직 지원하지 않는 관계로 지원하지 않는다.

(외 다수)

  • Header 지원

레코드의 Header 정보를 얻거나 Kafka 로 기록할 수 있게 되었다.

Source 의 경우 옵션으로 켜고 끌 수 있으며 기본적으로는 꺼져 있다. (스키마가 변경되면서 기존 stateful query 들이 영향을 받을 수 있기 때문)

Sink 의 경우 column 이 추가되어 해당 column 에 값을 채워넣으면 레코드의 Header 로 기록된다.

  • Kafka Source 에서 timestamp offset 지정 가능

Source 에서 읽을 범위를 지정할 때 offset 값 및 earliest/latest 외에도 timestamp 를 입력할 수 있게 되었다. 내부적으로 Kafka 의 offset by timestamp 를 호출하므로 같은 동작을 기대하면 된다. 시작 및 끝 범위 지정에 선택적으로 지정 가능하다. (끝 범위는 스트리밍 쿼리에서는 기본적으로 무시되니 주의.)

  • Partition column 지원

선택적으로 Kafka 의 partitioner 를 사용하지 않고 row 별로 파티션 “번호" 를 직접 지정하여 Kafka 에 레코드를 기록할 수 있게 되었다.

  • consumer group ID 의 prefix/전체 지정 가능

옵션을 통해 consumer group ID 의 prefix 혹은 전체를 지정 가능할 수 있게 되었다. 기존에는 Spark 에서 지정한 prefix 에 UUID 를 적용해 랜덤하게 생성했다. 보안 팀에서 요구하는 경우가 많다고 한다 (group ID 로 제어하거나 하는 듯).

단, 전체를 지정하는 것은 굉장히 위험하므로 주의해서 사용해야 한다. (Spark 쿼리는 commit offset 관리를 직접 하는데, 토픽 내 모든 레코드를 읽는 것을 보증하기 위해서이다. 이 때 consumer group ID 가 공유되어 실행이 된다면… offset 관리도 꼬이고 레코드도 나누어 읽게 되어 부정확한 결과를 초래하게 된다.)

  • Consumer/Producer pool 변화

Consumer pool 의 경우 Apache Commons Pool 이 도입되어 pool 관련 메트릭을 JMX 를 통해 제공하고 기존 pool 구현이 구현 한계로 인해 정상적으로 다루지 못했던 케이스들을 다룰 수 있게 되었다. (예시: self-join)

Producer pool 의 경우 eviction 기준을 재정리하면서 producer 가 idle timeout 보다 길게 사용할 경우 cache 가 오동작하는 경우를 해결했다. (내부적으로는 Guava Cache 를 걷어내고 별도 구현체를 사용했다.)

다음으로 File Source/Sink 를 짚어 보면…

  • 처리가 완료된 소스 파일들을 삭제 혹은 보관 가능

Structured Streaming 쿼리에서 파일 데이터를 처리하는 경우 처리 단위가 파일이기 때문에 필연적으로 수많은 파일들이 처리를 위해 입력 디렉토리에 쌓이게 된다. 기존에는 이미 처리한 파일을 삭제하는 것이 사용자에게 맡겨져 있었고 처리한 파일을 확인하는 작업이 쉽지 않았다. (checkpoint 내 commit log 를 직접 분석해야 하는데 사용자가 파일 포맷까지 분석해서 이걸 해야 될까?)

이제 처리가 완료된 소스 파일들을 Spark 에서 안전하게 삭제하거나 지정한 디렉토리로 이동(보관) 할 수 있게 된다. 별도 쓰레드로 처리가 되어 쿼리 실행 속도에 영향을 크게 주지 않는다.

(외 다수)

File Source/Sink 는 DataSource V2 적용으로 재구현되었는데 기존과 다른 변화는 분석해 보지 않아서 잘 모르겠다. Spark+AI summit 2019 에서 다루어졌으니 궁금한 분들은 동영상을 참고하는 것도 방법이겠다.

마지막으로 Source/Sink 외 Structured Streaming 자체 변화를 짚어 보자.

  • Structured Streaming UI

DStream 에 한해 제공되던 Streaming 탭이 Structured Streaming 으로 포팅되었다. 기본 UI 를 DStream 에서 가져와서 대부분의 기능이 DStream 에서 제공되던 UI 와 비슷한 것으로 보인다. 마이크로 배치 처리 관련 UI 가 제공되지 않아 streaming query listener 를 통해 정보를 받아서 직접 분석해야 하는 어려움이 있었는데 사용자들에게 큰 도움이 될 것으로 기대가 된다.

  • Stream-Stream left outer join 관련 correctness 이슈 수정

Stream-Stream left outer join 사용 시 양 측에서 매칭되어 아웃풋으로 나간 데이터가 특정 조건에서 버그로 인해 한 번도 매칭되지 않은 것으로 오판단되어 null 과 조인되어 한 번 더 아웃풋으로 나가는 버그가 발견되었고, 해당 버그가 수정되었다. 수정을 위해 state 구조의 변화가 필요했고, 그 결과 하위 호환성을 지원하지 못하게 되었다. 기존 2.x 버전의 Stream-Stream join 쿼리를 실행한 checkpoint 를 3.0 에서 읽어들이는 경우 에러가 발생하고 아쉽게도 사용자는 checkpoint 를 버리고 데이터 재처리를 통해 처음부터 다시 실행해야 한다. (Stream-Stream inner join 은 영향을 받지 않는다.)

  • UNION 의 오른쪽 입력으로 stream-stream join 사용 시 crash 이슈 수정

UNION 의 오른쪽 입력으로 stream-stream join 이 사용될 경우, state 의 partition index 가 왼쪽 입력의 partition 수에 영향을 받는 버그가 발견되었다. partition 수는 고정되지 않고 배치 사이에 변경될 수 있는 것이라 정상 동작하던 쿼리가 갑자기 crash 될 가능성이 열려 있는 것이었다. 해당 버그 또한 수정되었는데, 마이그레이션 로직은 아직 제공되지 않았다. 현재로써는 위와 마찬가지로 checkpoint 를 버리고 재처리해야 하는데, 조건 자체가 까다롭기 때문에 아주 많은 사용자가 영향을 받진 않을 것으로 보인다. (실제로 커뮤니티에서 JIRA 이슈가 단 한 번 등록되었고 다른 유저의 댓글도 vote 도 없다)

  • Trigger.Once() 실행 시 Data Source 의 limit option 무시

Trigger.Once 는 batch 쿼리를 연속성 있게 실행할 수 있게 하는 특수한 트리거로 기획되었다. 예를 들면 하루에 한 번씩 쿼리를 실행하되 (batch) 데이터를 읽을 위치, 실행 결과 및 상태가 쿼리 실행 간에 연속성 있게 (streaming) 하는 것이 이 트리거의 목적이라고 할 수 있는데, 이런 목적을 제대로 달성하기 위해선 Data Source 의 limit option 이 적용되지 않아야 한다. (일반적으로 Data Source 가 제공하는 limit option 은 개별 micro-batch 가 적당한 양의 데이터를 입력으로 취해 짧게 동작하게 할 수 있게 하는 장치인데 Trigger.Once 는 처리 주기 중간에 누적된 데이터를 모두 처리해야 목적에 부합하게 된다.) 이 부분이 수정되었다.

그 외에도 이벤트 로그 관련 메이저 기능 추가가 있었는데, 주로 스트리밍 쿼리에 도움이 될 변화라 잠깐 소개하자면…

이벤트 로그 파일을 사용자가 지정한 크기로 잘라서 저장할 수 있게 된다. (driver 영역) 그리고 실행 종료된 Job 이나 executor, SQL execution 에 대한 이벤트를 제외하고 다시 기록하는 방식으로 오래된 이벤트 로그 파일들을 축약해서 저장할 수 있게 된다. (History Server 영역)

특히 축약 기능은 스트리밍 어플리케이션 실행 시 꽤 도움이 될 수 있을 것으로 기대하고 있다. 스트리밍 어플리케이션의 특성상 수많은 Job 들이 실행 종료된 상태로 남기 때문에 축약 기능이 로그가 차지하는 공간을 엄청나게 줄일 수 있다.

--

--