ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Trouble shooting] Glue에서 사용자 정의 함수(udf) 만들어서 사용하기 (+apply, map, applymap 함수의 차이)
    Data Engineering/AWS, Spark 2022. 7. 4. 00:43
    728x90
    반응형

     

    *바로 udf코드를 보고싶다면 아래의 "Glue UDF 적용하기"부터 읽어주세요

     

     Redshift에 json으로 데이터가 쌓이고 있는 테이블이 있어서 이 테이블에서 데이터를 가져와 → 파싱 → 다른 데이터들과 함께 새로운 분석용 테이블을 만드는 ETL 스크립트를 Glue로 작성하고 있던 중이었다.

     사실 기존에 유사한 작업으로 생성되어있는 Job이 있어 거기에서 조금만 고치면 되는데 그 Job은 SQL로 작성이 되어있었고, 2주의 넉넉한 기간이 주어졌고, 그래서 spark도 써 볼겸 pyspark으로 처음부터 작성해봤다.

     처음에는 데이터 불러온 뒤 부터는 pandas처럼 쓰면 될 거라고 아주 가볍게 생각해서 이틀 컷이다 이러면서 spark도 깔아보고 이것 저것 해가면서 진행했는데 정말 생각대로 흘러가지 않았다 (이번 주에 제일 많이 한 말: 어.. 왜?.. 왜 안되냐고...ㅠㅠㅠㅠ..).

     그래도 pandas에 대한 이해가 있어서 그나마 3일 만에 끝났지 아니었음 큰일 날 뻔 했다.

     

    Map, Apply, Applymap 메서드 비교

     들어가기 전에 map, apply, applymap 함수에 대해 간단히 이해하고 넘어가자 (그래야할 것 같다).

     먼저 pandas의 dataframe은 한 개이상의 series로 구성되어있다. 각 컬럼, 즉, 한 열을 하나의 series로 볼 수 있고, series는 한 개이상의 값으로 구성되어있다.

     세 함수의 차이를 가장 간단히 설명하자면 다음 표와 같다.

    Method 적용 대상 Return
    Map Series Series
    Apply Series or Dataframe Series or Dataframe
    Applymap Dataframe Dataframe

     

    Map

     우리가 하나의 값 x에 어떤 함수를 적용할때 func(x)를 사용한다면,  여러 개의 값이 모인 series s에 적용을 하기 위해서 사용하는 것이 map 함수이다 (물론 for문을 돌려도 같은 결과를 만들 수는 있다). 아마 lambda 함수를 사용할 때 많이 봤을 거다 (li_2 = list(map(lambda x: func(x), li)) 이런식으로..)

     map함수는 1차원 배열을 입력으로 받아 임의의 함수를 적용한 후 1차원 배열로 출력을 돌려준다.

    df.컬럼1 = df.컬럼1.map(함수)

     여기서 포인트는 map은 1차원 배열만 입력으로 받는다. Dataframe에서 열은 1차원이지만 여러 열, 또는 행은 2차원이다. 따라서 여러 열, 또는 행 단위에는 apply함수를 사용해야한다.

    (행이 2차원이라는 사실이 이해가 안된다면 Dataframe을 만드는 과정을 생각해보면 쉽다. 데이터프레임을 만들때 pd.DataFrame({'컬럼 1': [값 1, 값 2], '컬럼 2': [값 3, 값 4]}  형식으로 만든다. 따라서 행은 [[값1],[값3]] 의 형태로 저장되는 것이다.)

     

    Apply

     Apply함수는 1차원 이상의 입력을 받아 함수를 적용한 후 1차원 이상의 출력을 리턴한다. 따라서 1차원 입력을 넣는 경우 map 함수와 수행값이 같다. 다음 코드는 map에서는 수행할 수 없지만 apply에서는 수행할 수 있다.

    df[[컬럼1,컬럼2]] = df[[컬럼1,컬럼2]].apply(함수)
    df.apply(함수, axis=0) #행에 적용

     즉, apply는 series와 dataframe 모두에 사용할 수 있는 메서드이다.

     

    Applymap

     마지막으로 applymap은 2차원 이상의 배열만 입력으로 받아 2차원 이상의 배열을 출력으로 리턴하는 메서드이다. 즉, series에는 사용할 수 없고, 두 개 이상의 열이나 행에만 사용할 수 있는 메서드이다.

    df.컬럼1 = 컬럼1.applymap(함수)
    #AttributeError: 'Series' object has no attribute 'applymap' 에러 발생
    
    df = df.applymap(함수) #가능
    df[[컬럼1,컬럼2]] = df[[컬럼1,컬럼2]].applymap(함수) #가능

     

     머리에 꽂는 가장 쉬운 방법은 예제 코드를 만들어서 돌려보는 거다. 근데 둘 다 되는 거를 applymap으로 작명을 하지 왜 네이밍을 저렇게 해놨을까.. 참..

     

    Glue UDF 적용하기

     암튼 본론으로 돌아와서 위의 로직을 되짚어보면 우리가 어떤 함수를 만들고 그걸 glue의 pyspark dataframe의 컬럼에 적용하고 싶은 경우, apply나 map함수를 사용하면 될 것 같다. 근데 이 공식 문서를 보면 "AWS Glue는 사용자 정의 함수인 Lambda 함수를 직접 지원하지 않습니다."라고 되어 있다. 그리고 udf라는 처음보는 함수가 나와 있어서 뭐지? lambda 대신에 udf를 쓰라는건가..? 나는 다른 함수를 외부에 정의해서 쓰고싶은데 어떡해야하지? 라는 혼란에 부딪힐 수 있다. 하지만 언제나 그렇듯 마음을 차분히 가라앉히고(필수 코스) 저 말을 다시 읽어보면 lambda를 "직접"지원하지 않는다 => "udf랑 lambda를 같이 써라"라는 말이다. 따라서 함수를 각 값에 적용해주는 lambda함수를 udf로 한번 더 만들어 주면 된다. 글로 설명하면 조금 복잡하니 아래 코드를 보자.

     저 한 문장 때문에 나는 몇 시간을 날렸는가..

     

    공식문서에 예제 코드가 가독성이 엄청나게 떨어지게 적혀있는데 udf의 사용법은 다음과 같다.

    #함수를 적용할 컬럼이 있는 Dynamic frame의 변수명을 source_dynamic_frame이라고하자
    
    from pyspark.sql.functions import udf
    from pyspark.sql.types import #원하는 컬럼 Type
    
    
    def my_func(val):
        return val
    
    udf_func = udf(lambda x: my_func(x), import한 원하는 컬럼 Type) #udf로 apply와 같은 역할을하는 함수를 만들어준다.
    df = source_dynamic_frame.toDF()
    parsed_df = df.withColumn("함수를 적용한 값이 들어갈 컬럼 이름", udf_func(df["함수를 적용 할 컬럼 이름"]))
    
    parsed_df.show()
    #결과: 함수를 적용한 컬럼, 적용하기 전 컬럼

    추가로 나의 코드는 다음과 같다(일부 컬럼명 등은 *처리하거나 삭제함). 참고로 Type을 명시할 때 함수의 return값이 type과 안맞으면 null값이 들어가니 주의하자.

     

    (컬럼타입에 IntegerType으로 지정해놓고 파싱 결과가 "1002"처럼 string으로 나오게 코딩해놔서 계속 값이 안들어갔는데 왜 안들어갔는지 한참을 디버깅했다는 슬픈 사연 들어보실분^_ㅠ)

    (근데 과연 이 방법이 SQL보다 나은가는 여전히 의문이다)

     

    import json
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType, IntegerType, BooleanType
    
    
    def parse_json(json_val, key:str, separator:str=None, idx:int=None):
        if not json_val:
            return None
        
        dic_val = json.loads(json_val)
        if (isinstance(dic_val, list)):
            dic_val = dic_val[0]
    
        if not separator:
            try:
                if isinstance(dic_val[key], str):
                    return int(dic_val[key])
                return dic_val[key]
            except ValueError:
                return dic_val[key]          
            except KeyError:
                return None
        try:
            return dic_val[key].split(separator)[idx]
        except:
            raise
    
    
    udf_name = udf(lambda x: parse_json(json_val=x, key="email", separator='@', idx=0),StringType())
    udf_cat = udf(lambda x: parse_json(json_val=x, key="***"),IntegerType())
    udf_is = udf(lambda x: parse_json(json_val=x, key="****"),BooleanType())
    udf_id = udf(lambda x: parse_json(json_val=x, key="**"),IntegerType())
    
    df_dlab_ods = source_dlab_ods.toDF()
    parsed_dlab_ods = df_dlab_ods.\
                                 withColumn("name", udf_name(df_dlab_ods["email"])).\
                                 withColumn("category_id", udf_cat(df_dlab_ods["fields"])).\
                                 withColumn("is_", udf_is(df_dlab_ods["fields"])).\
                                 withColumn("id", udf_id(df_dlab_ods["fields"]))
    
    pre_dlab_ods = parsed_dlab_ods.select("name", "category_id", "is_", "id")
    pre_dlab_ods.printSchema()
    728x90
    반응형

    댓글