-
Pyspark Dynamic Frame에서 JSON을 요소로 가지는 배열 해체해서 컬럼으로 만들기 (make lists of JSON to columns of a dynamic frame)Data Engineering/AWS, Spark 2022. 12. 15. 01:31728x90반응형
데이터 전처리나 ETL을 하다보면 여러가지 형태의 데이터들을 파싱하게 되는데, 이번 포스팅에서는 pyspark dynamic frame에서 한 필드(행/컬럼)의 값이 array[json] 형태로 되어 있을 때 각 열의 array를 해체한 뒤 내부의 json의 키를 해당 dynamic frame의 새로운 필드로 설정하고 해당 키의 값을 필드의 값으로 넣는 과정을 설명하려고 한다.
글로 적으니까 되게 복잡한데 코드는 생각보다 짧다 (relationalize 메서드가 다 알아서 해줌)
1. DynamicFrame 클래스의 relationalize 메서드를 사용해 대상 dynamic frame과 1:N 관계를 갖는 JSON의 각 키를 필드로 가지는 새로운 dynamic frame을 생성한다.
2. 기존 dynamic frame과 새로 생성한 dynamic frame을 join한다.
Prologue. Relationalize 메서드 이해하기
DynamicFrame의 relationalize메서드는 dynamic frame내의 중첩된 필드를 해체하여 관계를 가지는 여러 개의 dynamic frame으로 만드는 메서드이다.
리턴 값은 dynamic frame의 집합인 DynamicCollection이 되며, 각 dynamic frame은 1:N 관계를 가진다.
예를들어, 다음 구조를 가지는 dynamic frame을 relationalize한다고 하자.
temp_df라는 이름의 dynamic frame의 params 필드는 1개 이상의 json을 요소로 가지는 배열을 값으로 가진다. 즉, params 한 행의 값은 다음과 같은 형태이다 (키 값은 위 이미지와 다르게 임의로 작성했다).
1. Realationalize하기
temp_df를 relationalize하면 다음과 같이 두 개의 dynamic frame을 담은 dynamic collection이 생성된다.
relationalize 후 각 dynamic frame의 스키마를 출력해보면 다음과 같다 (relationalize시 root_table_name을 param으로, 리턴되는 dynamic collection의 변수명을 param_collection으로 지정해주었다)
스키마를 보면 param이라는 dynamic frame에에 params가 array에서 long으로 바뀐 것을 볼 수 있고, param_params라는 dynamic frame에 기존 JSON에 들어있던 값들 외에 id가 추가된 것을 볼 수 있다.
값을 뽑아보면 기존 frame에 들어있던 array가 unique한 숫자로 대체되고, 그 값이 새로 생성된 frame의 id로 들어간 것을 확인할 수 있다.
즉, 기존 frame에 첫 번째 열에 [a,b], 두 번째 열에 [c,d]가 들어있었다 할 때, relationalize를 하면 [a,b]는 1, [c,d]는 2로 대체되고 새로 생성된 frame에 (1,a), (1,b), (2,c), (2,d)가 들어가서 이후 기존 frame의 필드와 새로운 frame의 필드를 기준으로 join을 할 수 있게 되는 것이다.
(물론 실제 메서드는 여기서 설명한 것과 같이 값을 기준으로 번호를 매기는 것이아니라 열을 기준으로 번호를 매기는 방식으로 수행될 것이다)
기존 dynamic frame에서 array가 unique한 숫자로 대체된 dynamic frame은 relationalize 메서드에서 "{root_table_name}"으로 지정해 준 키 값(여기 예시에서는 "param")으로, 새로 생성된 dynamic frame은 "{root_table_name}_{array가 담겨있던 field명}"(예시에 따르면 "param_params")으로 접근할 수 있다.
2. Join하기
우리가 최종 원하는 것은 하나의 dynamic frame이므로 저 두 프레임을 합쳐줘야한다. join 메서드는 다음과 같이 사용할 수 있다 (SQL join처럼 DynamicFrame1.기준키 = DynamicFrmae2.기준키로 join한다고 생각하면 된다).
이렇게 처음 array[json]형태에서 각 JSON의 키가 필드가 되고 값이 각 필드의 값으로 들어간 형태로 최종 변환되었다.
여기에서 필요하다면 rename_fields를 사용하여 필드명을 변경하는 작업 등을 수행할 수 있다.
Epliogue: DynamicFrame.Unnest 함수와의 차이
1. 함수의 목적 자체가 다름: Unnest의 경우 정규화가 목적이 아니라 중첩된 필드를 해체하는 것이 목적이다. 따라서 함수의 반환값이 DyanmicCollection이 아닌 하나의 DynamicFrame이다.
2. array는 중첩된 구조가 아닌 배열이기 때문에 unnest로 해체하지 못한다. 따라서 unnest함수의 결과값은 배열 내부의 계층만 파악할 뿐 dynamic frame의 스키마는 변화 없이 그대로 출력한다.
참고문서
728x90반응형'Data Engineering > AWS, Spark' 카테고리의 다른 글