Spark flatMapGroupsWithState API 를 이용한 “이벤트 타임” 세션 윈도우 구현

Jung-taek Lim
7 min readOct 22, 2018

현재 Spark 2.3.x 기준으로 Spark 는 map/flatMapGroupsWithState API 를 이용하여 세션 윈도우를 구현하도록 권장하고 있으며, 이에 대한 예시 구현을 제공하고 있다.

예시 구현은 mapGroupsWithState API 의 아주 단순한 활용 사례이기 때문에 해당 구현을 이해할 수 있어야 제시하는 구현도 이해할 수 있을 것이다. 해당 코드를 처음 본다면 시간을 내어 코드와 map/flatMapGroupsWithState API 문서를 먼저 읽어보기 바란다.

예시 구현은 프로세싱 타임을 기준으로 세션 윈도우를 정의하고 있다. 프로세싱 타임의 경우에는 늦은 이벤트가 들어오지 않기 때문에 특정 시점에 유효한 세션은 그룹 키당 단 하나만 존재하며, 예시 코드처럼 아주 단순한 타임아웃 처리로도 구현이 가능하다.

이벤트 타임의 경우에는 늦은 이벤트가 들어올 수 있기 때문에 더 많은 경우를 고려해야 한다. 예를 들어 단순히 세션의 간격이 10 초인 세션 윈도우를 상정하고 현재 아래와 같은 세션들이 있다고 가정해 보자. 세션 표현은 (세션 시작 시간, 세션 끝 시간) 으로 하자.

(30, 40) (45, 55) (70, 80)

이벤트 타임의 경우 늦게 도착하는 이벤트 유입이 허용되기 때문에 워터마크의 진행에 따라 여러 세션들이 특정 시점에 동시에 유효할 수 있다.

늦게 도착하는 이벤트들이 허용된다고 하면, 새로운 이벤트가 만들어내는 경우의 수는 프로세싱 타임이 만들어내는 2가지 (기존 세션을 확장, 맨 뒤에 새로운 세션 추가) 가 아니라 총 4가지가 된다.

  1. 맨 앞에 새로운 세션 추가
  2. 기존 세션을 확장
  3. 기존 세션을 확장 & 확장된 세션이 다음 세션과 합쳐져 재확장
  4. 맨 뒤에 새로운 세션 추가

하나씩 살펴보자. 1번의 예시는 시간이 15 인 이벤트가 유입되는 것이다. 이벤트가 적용되었을 때 세션들은 아래와 같이 업데이트된다.

(15, 25) (30, 40) (45, 55) (70,80)

2번의 예시는 시간이 32 인 이벤트가 유입되는 것이다. 업데이트된 세션들은 아래와 같다.

(30, 42) (45, 55) (70, 80) // 세션 (30, 40) 와 (32, 42) 가 병합 되어 확장

4번의 예시는 시간이 85 인 이벤트가 유입되는 것이다. 업데이트된 세션들은 아래와 같다.

(30, 40) (45, 55) (70,80) (85, 95)

3번은 1, 2, 4 에 비해서 덜 직관적이고 놓치기 쉽다. 3번의 예시는 시간이 37 인 이벤트가 유입되는 것이다. 업데이트된 세션들은 아래와 같다.

(30, 55) (70, 80) // (30, 40) 과 (37, 47) 이 merge 되어 (30, 47) 로 확장, (30, 47) 과 (45, 55) 가 merge 되어 (30, 55) 로 확장

경우의 수는 살펴보았으니 실제로 state function 을 어떻게 구현해야 할 지 살펴보자. 시작 전, GroupsWithState 클래스 문서 를 먼저 한 번 정독하는 것을 추천한다. (약간 길지만, 주의해야 할 점과 어떻게 동작하는지에 대한 정보가 상세히 적혀 있다.) 문서에 대한 이해가 있으면 왜 이렇게 구현을 했는지에 대한 추측을 쉽게 할 수 있다.

먼저, 세션의 저장용 중간 상태와 출력 상태를 정의해야 한다. 위에서도 언급했지만, 여러 세션들이 동시에 존재할 수 있고 state 에 기록되어야 한다. 즉, 여러 개의 중간 상태들을 저장해야 한다. 최적화된 구현체가 있을 수 있겠지만, 여기서는 단순히 리스트에 기록하기로 하자.

state function 은 크게 두 부분으로 나눌 수 있다. (1) 유입된 데이터를 반영하는 부분, (2) 워터마크가 지나간 유효하지 않은 세션들을 내보내는 부분이다.

코드가 길어 별도 라인 단위 설명은 하지 않고, 적용된 알고리즘과 놓치기 쉬운 부분들에 대한 설명만 하려 한다. 상세한 내용은 링크된 코드를 읽어보기 바란다. (코드와 설명을 같이 읽는 독자들을 위해 코드를 먼저 링크한다. 설명 마지막에 링크가 한 번 더 나온다.)

(1) 부터 살펴보자.

세션 윈도우를 가장 직관적으로 처리하는 방법은 기존 세션 및 이벤트를 정렬한 다음 세션 병합을 적용하는 것이다. 하지만 클래스 문서에는 “이벤트에 대한 정렬은 보장되어 있지 않다" 고 명시되어 있다. 즉, 기존 세션 리스트가 정렬되어 있어도 병합 정렬은 사용할 수 없다. 그러므로 차선으로 기존 세션 리스트는 정렬 상태를 유지하고 삽입 정렬 형태로 이벤트를 기존 세션 리스트에 반영하도록 한다.

반영시 주의할 점이 있다면, 경우의 수 2 번처럼 이벤트가 기존 세션 리스트에 포함되어 세션이 확장되는 경우, 경우의 수 3 번을 다루기 위해 확장된 세션이 앞/뒤 세션과 겹치는지 다시 한 번 확인하고 겹치는 경우 세션을 병합해 줘야 한다는 것이다. 병합 시에 하나의 기존 세션이 삭제되므로 이 부분에 대한 처리도 해 주어야 한다.

모든 이벤트를 반영한 후, 유효하지 않은 세션을 내보내기 위한 타임아웃을 설정해야 한다. 클래스 문서에서도 알 수 있듯이 타임아웃을 설정하지 않으면, watermark 진행으로 인해 유효하지 않은 세션이 발생해도 해당 key 에 대한 이벤트가 유입되지 않으면 세션을 내보낼 수 없다.

여기서 주의해야 할 사항은 state 에 대한 타임아웃은 하나만 설정 가능하다는 것이다. 모든 세션들의 세션 끝 시간에 타임아웃을 설정하는 것이 불가능하다. 우리는 이 기능을 최소한의 트리거 정도로 활용하고, 타임아웃이 트리거되었을 때 유효하지 않은 세션을 직접 찾아서 모두 내보내는 것으로 대응하도록 한다. 타임아웃을 맨 처음 세션의 세션 끝 시간으로 설정한다. 여기서 하나 더 염두에 둘 사항은 watermark 가 타임아웃 시간을 ‘지나가야' 트리거된다는 것이다 .

이제 결과만 정의하면 된다. Append mode / Update mode 에 맞게 적당히 결과를 반환하도록 한다. 여기서는 Update mode 인 경우에만 결과가 발생한다. 필자는 Update mode 에서 실제로 업데이트가 일어난 세션들만 반환하기 위해 로직이 약간 복잡해졌는데, 단순히 특정 키에 업데이트가 일어난 경우 모든 세션을 반환하는 것으로 정의하면 로직이 단순해진다. (의미론 상 세션이 그룹 키에 포함되는 것으로 보아 “그룹키 + 세션” 을 기준으로 변경 사항에 대해 처리했다.)

이제 (2) 에 대해 살펴보자. 위에서도 언급했듯이, 우리는 타임아웃이 트리거되었을 때 유효하지 않은 세션을 직접 찾아서 모두 내보낼 것이다. 세션들이 정렬되어 있다는 점을 활용하면 (유효하지 않은 세션 목록, 유효한 세션 목록) 으로 쉽게 두 부류로 가를 수 있다. 유효한 세션 목록이 존재하지 않는다면, state 를 삭제할 수 있다. 유효한 세션 목록이 존재한다면, state 를 업데이트하고, 타임아웃을 같은 방법으로 다시 적용한다.

그리고 (1) 과 같은 방법으로 결과를 정의한다. 여기서는 Append mode 인 경우에만 결과가 발생한다.

구현된 코드는 아래와 같다. 위의 설명과 비교해 보면서 코드를 읽으면 더 쉽게 이해할 수 있을 것이다.

https://gist.github.com/HeartSaVioR/9a3aeeef0f1d8ee97516743308b14cd6#file-eventtimesessionwindowimplementationviaflatmapgroupswithstate-scala-L32-L189

위의 코드를 이해하면 단순 시간 간격으로 정의된 세션 윈도우 외에도 “세션 끝” 이벤트 등을 반영하거나 이벤트 별로 시간 간격이 다르게 정의되는 세션 윈도우 등을 큰 틀에서 비슷하게 구현할 수 있다.

… 마치며 …

ps. 단순 시간 간격 세션 윈도우의 경우 현재의 타임 윈도우와 동일한 방법으로 사용 가능하도록 하는 패치가 제안되어 있습니다. 위에 링크된 코드에도 SPARK-10816 으로 예시가 포함되어 있습니다. 해당 기능(또는 구현) 에 관심이 있으면 이슈 페이지 SPARK-10816 를 방문해 주시고 VOTE 도 해 주시면 감사하겠습니다. (이상 광고였습니다…!?)

--

--