SW 개발/Kotlin
Apache Spark, ETL 데이터 파이프라인 생성하기
지단로보트
2020. 10. 29. 18:08
개요
Apache Spark
는 분산 컴퓨팅을 위한 인메모리 데이터 프레임워크이다. 어떤 크기의 데이터라도 클러스터로 서로 연결된 n대의 노드에 인메모리 데이터 처리를 통해 분석 및 가공이 가능하다. 현재 관련 기술에서는 적수가 없을 정도로 명성을 얻고 있다.Apache Parquet
은 컬럼 기반의 구조체의 특화된 데이터 파일 형식이다. 적절하게 파티셔닝되고 압축 처리된 Apache Parquet 파일은 대용량에서도 좋은 조회 성능을 보여준다. 이번 글에서는Apache Spark
프레임워크를 이용하여 원격지에 위치한 다양한 데이터 소스로부터 가공과 Apache Parquet으로의 변환, 적재하는 방법을 설명하고자 한다. (잠재적으로 Amazon EMR에서 제출 가능한 작업을 만드는 것을 고려했다.)
Windows 10 운영체제 사전 조건
- Windows 10 운영체제에서 Apache Spark를 기동할 경우 정상적인 실행을 위해
winutils.exe
,hadoop.dll
파일을 필요로 한다. 여기 에서 다운로드 받아 아래 경로에 각각 복사한다. (파일이 없을 경우 java.lang.UnsatisfiedLinkError 오류가 발생한다.) [관련 링크]
C:\Hadoop\bin\winutils.exe
C:\Windows\System32\hadoop.dll
- 환경에 따라 위 작업 후에도 동일한 오류가 발생하는 경우가 있다. 이 경우, 시스템 환경 변수에 아래 내용을 추가하면 해결된다.
Path=C:\Hadoop\bin
build.gradle
- 프로젝트 루트의 /build.gradle 파일에 아래 내용을 추가한다.
dependencies {
implementation("org.elasticsearch:elasticsearch-hadoop:6.8.10")
implementation("org.mongodb.spark:mongo-spark-connector_2.11:2.4.2")
implementation("org.apache.hadoop:hadoop-aws:3.3.0")
implementation("org.apache.hadoop:hadoop-common:3.3.0") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
implementation("commons-io:commons-io:2.8.0")
implementation("org.apache.httpcomponents:httpclient:4.5.13")
implementation("commons-httpclient:commons-httpclient:3.1")
implementation("org.apache.xbean:xbean-asm6-shaded:4.10")
implementation("org.apache.spark:spark-core_2.11:2.4.7") {
exclude(group = "org.apache.xbean", module = "xbean-asm6-shaded")
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
implementation("org.apache.spark:spark-sql_2.11:2.4.7") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "org.codehaus.janino", module = "janino")
exclude(group = "org.codehaus.janino", module = "commons-compiler")
}
implementation("org.codehaus.janino:janino:3.0.8")
implementation("org.codehaus.janino:commons-compiler:3.0.8")
implementation("org.apache.spark:spark-network-common_2.11:2.4.7")
}
xbean-asm6-shaded
아티팩트를 굳이 제외하고, 별도로 4.10 최신 버전을 명시한 이유는 라이브러리 충돌 오류 때문이다. 명시하지 않을 경우 java.lang.IllegalArgumentException: Unsupported class file major version 55 오류가 발생한다. [관련 링크]janino
,commons-complier
아티팩트를 굳이 제외하고, 별도로 3.0.8 구버전을 명시한 이유는 라이브러리 충돌 오류 때문이다. 명시하지 않을 경우, java.lang.ClassNotFoundException: org.codehaus.janino.InternalCompilerException 오류가 발생한다. [관련 링크]
SparkSession 생성
- Apache Spark는
SparkSession
오브젝트의 생성에서 시작된다. Spring Boot 기반 프로젝트일 경우 @Bean으로 생성할 것을 추천한다.
System.setProperty("hadoop.home.dir", "C:\\Hadoop")
val sparkSession = SparkSession
.builder()
.master("local")
.appName("Some Application")
.orCreate
- 위 예제는 편의상 Apache Spark를
local
모드로 실행한 것이다. (local[*]를 입력할 경우 Spark가 CPU 코어 전부를 사용하도록 지정할 수 있다.) 로컬 모드는 모든 실행이 애플리케이션의 생명주기 내에서 시작되고 종료된다. Apache Spark를 고성능으로 사용하려면, 클러스터 모드로 구동하면 된다. (만약, 매니지드 서비스를 이용하고자 한다면Amazon EMR
을 사용하면 된다.) - Windows 운영체제에서 실행할 경우 파일시스템에 대한 네이티브 접근을 위해
winutils.exe
파일을 요구한다. 여기에서 다운로드 받아 적당한 경로에 설치한 후hadoop.home.dir
환경 변수에 해당 경로를 지정해주면 된다. (C:\Hadoop\bin에 설치했다면 경로는 C:\Hadoop이 되는 것에 유의한다. - 만약, Spring Boot 기반의 애플리케이션에 본 기능을 작성했다면, Spring Boot와 Spark간의 로그 라이브러리 충돌 오류가 발생한다. 이 경우, 실행 옵션에
-Dorg.springframework.boot.logging.LoggingSystem=none
옵션을 명시해주면 해결된다. [관련 링크]
MongoDB로부터 DataSet 획득
- 아래는 MongoDB로부터 DataSet 오브젝트를 획득하는 방법이다. [관련 링크]
val someDataSet = sparkSession
.read()
.option("spark.mongodb.input.uri", "mongodb://{username}:{password}@{hostname}:{port}/?authSource=admin")
.option("spark.mongodb.input.database", "{database}")
.option("spark.mongodb.input.collection", "{collection}")
.format("com.mongodb.spark.sql.DefaultSource")
.load()
- 별도로 스키마를 정의하지 않았다면
load()
메써드를 호출하는 시점에 대상 컬렉션의 대해 전체의 5%에 해당하는 샘플 데이터를 무작위로 획득한다. 이렇게 획득된 데이터로 생성될 DataSet 오브젝트의 스키마를 정의하게 된다. 실제 전체 데이터의 획득은 DataSet에 대한 count(), write() 등의 연산을 실행하는 시점에 수행된다. - 필터를 적용하여 조회 결과를 한정하려면 아래와 같이 작성하면 된다.
sparkSession.sparkContext().conf().set("spark.mongodb.input.uri", "mongodb://{username}:{password}@{hostname}:{port}/?authSource=admin")
sparkSession.sparkContext().conf().set("spark.mongodb.input.database", "{database}")
sparkSession.sparkContext().conf().set("spark.mongodb.input.collection", "{collection}")
val someDataSet = MongoSpark
.load(JavaSparkContext(sparkSession.sparkContext()))
.withPipeline(listOf(Document(
mapOf("\$match" to
mapOf("{date_time_field_name}" to
mapOf(
"\$gte" to LocalDate.of(2020, 10, 14),
"\$lt" to LocalDate.of(2020, 10, 15)
)
)
)
)))
.toDF()
- Spring Boot 기반 프로젝트의 경우 메인 클래스 레벨에 아래와 같이 @EnableAutoConfiguration를 명시해주어야 오류가 발생하지 않는다.
@SpringBootApplication
@EnableAutoConfiguration(exclude = [MongoAutoConfiguration::class])
Elasticsearch로부터 DataSet 획득
- 아래는 Elasticsearch로부터 DataSet 오브젝트를 획득하는 방법이다.
val someDataSet = sparkSession
.read()
.option("es.index.auto.create", true)
.option("es.nodes", "{hostname}:{port}")
// SSL 연결일 경우 아래 옵션 추가
.option("es.net.ssl", true)
.option("es.net.ssl.cert.allow.self.signed", true)
.option("es.nodes.discovery", false)
.option("es.nodes.data.only", false)
.option("es.read.field.as.array.include", true)
.option("es.read.field.as.array.include", "streams")
.option("es.query", objectMapper.writeValueAsString(
mapOf("query" to
mapOf("range" to
mapOf("{date_time_field_name}" to
mapOf("gte" to LocalDate.of(2020, 10, 14),
"lt" to LocalDate.of(2020, 10, 15),
"format" to "yyyy-MM-dd"
)
)
)
)
))
.format("org.elasticsearch.spark.sql")
.load("{index_prefix}_*")
- load() 메써드에는 조회할 인덱스명을 전달한다. 접미어로
*
문자를 추가하면 앞의 문자열로 시작하는 모든 인덱스를 대상으로 조회할 수 있다.
DataSet 가공: 저장 방식 정의
- DataSet 오브젝트는 기본적으로 각 워커 노드의 메모리에 파티션 단위로 분산 저장되어 필요한 연산을 수행한다. 처리해야할 DataSet의 크기가 노드의 메모리보다 크면 OOM 오류가 발생하는데 이 경우 메모리 외에 디스크에도 저장하도록 설정할 수 있다. 방법은 아래와 같다.
// 메모리와 디스크에 저장하도록 설정
val newDataSet = dataSet
.persist(StorageLevel.MEMORY_AND_DISK)
DataSet 가공: 특정 컬럼 제거
- DataSet 오브젝트를 만드는 방법은 2가지이다. 첫째는 위 예제와 같이 특정 저장소에서 로드하거나, 둘째는 새롭게 생성하는 것이다. 이렇게 획득된 DataSet 오브젝트의 가공 방법은 아래와 같다.
// 필요 없는 필드를 제거
val newDataSet = dataSet
.drop("foo", "bar")
DataSet 가공: 특정 컬럼만 선택
- DataSet에서 특정 컬럼만 선택하여 새로운 DataSet을 생성할 수 있다.
Column
클래스를 활용하면 별칭을 선언하는as
와 같은 다양한 기능을 이용할 수 있다.
// 특정 컬럼만 선택, 구조체 컬럼의 경우 특정 요소 선택도 가능
val newDataSet = dataSet
.select("foo", "foo.bar", Column("foo").`as`("bar"))
DataSet 가공: 컬럼 이름 변경
- DataSet의 컬럼 이름을 아래와 같이 변경할 수 있다.
// 특정 컬럼명 변경
val newDataSet = dataSet
.withColumnRenamed("{old_column_name}", "{new_column_name}")
DataSet 가공: 새 컬럼 추가
- DataSet에 새로운 컬럼을 추가하고 싶을 경우, 아래와 같이 추가할 수 있다.
// 새로운 컬럼과 값을 추가
val newDataSet = dataSet
.withColumn("{new_column_name}", functions.lit({new_value}))
DataSet 가공: 날짜/시간 문자열 컬럼 TIMESTAMP 컬럼으로 변경
- 날짜/시간을 저장한 특정 문자열 컬럼을 아래와 같이 TIMESTAMP 타입으로 변경할 수 있다.
// 날짜/시간을 저장한 특정 문자열 필드를 timestamp 타입으로 변경
val newDataSet = dataSet
.withColumn(
"{column_name}",
functions.to_timestamp(Column("{column_name}"))
)
DataSet 가공: DATE, TIMESTAMP 컬럼으로부터 년월일 컬럼 추가
- DataSet에 년월일은 파티셔닝의 가장 기본적인 단위로 사용할 수 있다. 이를 이용하여 조회시 조회 대상과 속도를 비약적으로 향샹시킬 수 있다.
// DATE, TIMESTAMP 타입 컬럼으로부터 year, month, day 컬럼을 추가
val newDataSet = dataSet
.withColumn("year", functions.year(Column("{date_or_timestamp_column_name}")))
.withColumn("month", functions.month(Column("{date_or_timestamp_column_name}")))
.withColumn("day", functions.dayofmonth(Column("{date_or_timestamp_column_name}")))
DataSet 가공: 배열 컬럼 빼내기
- 배열 타입의 컬럼을 아래와 같이 바깥으로 빼낼 수 있다. 이 경우 배열의 각 요소가 하나의 로우로 표현된다.
// foos 배열 컬럼의 각 요소를 foo 컬럼으로 빼냄
val newDataSet = dataSet
.withColumn("foo", functions.explode_outer(Column("foos")))
DataSet 가공: 조건 필터링
- 특정 조건으로 필터링 또한 가능하다. 방법은 아래와 같다.
// foo 배열 컬럼의 크기가 0보다 큰 것만 필터링
val newDataSet = dataSet
.filter(functions.size(Column("foo")).`$greater`(0))
DataSet 가공: 구조가 동일한 두 DataSet을 합치기
- 구조가 동일한 두 DataSet을 아래와 같이 새로운 DataSet으로 합칠 수 있다.
// 2개 DataSet을 합침
val newDataSet = fooDataSet
.union(barDataSet)
DataSet 가공: JSON 문자열 컬럼을 새 DataSet으로 가공
- 때로는 특정 컬럼에 구조체가 아니라 JSON 문자열이 그대로 저장된 경우도 있다. 이러한 컬럼을 구조화된 새로운 DataSet으로 가공할 수 있다. 방법은 아래와 같다.
// 특정 JSON 문자열 컬럼으로부터 DataSet<String>을 생성
val newDataSet = dataSet
.select("{json_column_name}")
.`as`(Encoders.STRING())
// 구조화된 새로운 DataSet<Row>을 생성
val newNewDataSet = sparkSession
.read()
.json(newDataSet)
- json() 실행시 아래 오류가 발생하는데 이 경우 파라메터로 DataSet에 대한
toJavaRDD()
변환한 오브젝트를 전달하면 해결된다.
java.io.CharConversionException: Invalid UTF-32 character 0x7374227b (above 0x0010ffff) at char #6, byte #27)
- 위 방법으로 JSON 문자열을 독립된 구조체 DataSet으로 획득할 수 있지만 같은 로우의 다른 컬럼과 같이 획득하는 것이 불가능하다. 아래와 같이
from_json()
을 이용하면 같은 로우의 다른 컬럼과 함께 획득하는 것이 가능하다.
// 앞서 획득한 DataSet의 Schema을 JSON 문자열로 획득
val schemaJson = newNewDataSet.schema().json()
// JSON 문자열을 이용하여 다른 컬럼과 함께 온전한 구조체 컬럼 동시 획득 가능
dataSet.
select(
functions.from_json(
Column("{json_column_name}"),
DataType.fromJson(schemaJson)
)
)
Apache Parquet으로 변환 후 Amazon S3에 업로드
- 아래는 획득한 DataSet 오브젝트를 Apache Parquet으로 변환 후 Amazon S3에 업로드하는 방법이다.
someDataSet
.write()
// 옵션: 년월일로 파티셔닝
.partitionBy("year", "month", "day")
.mode(SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("s3a://{bucket_name}/{folder_name}")
- Apache Parquet 파일을 압축하면 2가지 장점을 가지게 된다. 일단, Amazon S3에 업로드하면 용량을 절약할 수 있다. 또한, Amazon Athena와 연동하여 SQL 질의시 파일 전송 시간도 단축할 수 있다. 위 예제에서는 빠른 속도의 압축/해제에 최적화된
Snappy
라이브러리로 압축하였다. - 만약, Amazon Glue의 크롤러와 연동시 위 {folder_name}이 그대로 데이터베이스 카탈로그의 테이블 이름이 된다.
- 한편, Amazon S3 업로드 기능이 정상 작동하려면, 환경 변수에 AWS_ACCESS_KEY_ID, AWS_SECRET_KEY가 애플리케이션 기동 전에 정의되어 있어야 한다.
S3A
는 Apache Spark가 제공하는 S3 커넥터의 3세대를 이르는 명칭으로, 1세대 S3, 2세대 S3N에 이어 현재 공식적으로 사용을 권장하고 있는 커넥터이다. 최근에는 클라우드 시대가 보편화되면서 각 노드의 HDFS 파일 시스템 보다는 S3에 DataSet의 결과물을 저장하는 추세이다..paritionBy()
옵션으로 파티셔닝을 설정할 수 있다. 파티셔닝을 통해 조회시 전체 데이터를 스캔하지 않고 특정 데이터만 조회할 수 있어 퍼포먼스 향상을 얻을 수 있다. 가장 일반적인 방법은 년월일 컬럼을 이용하여 파티셔닝하는 것으로 S3 버킷은 아래 폴더 구조로 저장된다. [관련 링크]
s3://{bucket_name}/{folder_name}/year=2020/month=11/day=09/*.parquet
- 파티셔닝되어 저장된 데이터는 아래와 같이 조건 필터링을 통해 필요한 데이터만 조회할 수 있다.
val someDataSet = sparkSession
.read()
.parquet("s3a://{bucket_name}/{folder_name}")
.filter("year = '2020' and month = '11'")
Amazon EMR 연동
- 최근에는
Amazon EMR
클러스터를 생성한 후 마스터 인스턴스에 접속하여spark-submit
명령을 사용하여 작업이 정의된 애플리케이션을 실행하는 것이 성능이나 편의성 면에서 대세라고 할 수 있다. 이 경우 아래 요소에 유의해야 한다. - Java 8 기반으로 프로젝트를 빌드해야 한다. 또한 모든 라이브러리 종속성을 포함한 .jar 파일을 빌드해야 한다.
- 작업을 정의한 소스 코드 내에서 SparkSession 정의시 마스터를
yarn
으로 설정해야 한다. (Amazon EMR은 YARN 클러스터로 작동한다.)
트러블슈팅: HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas.
- 파티셔닝시 각 파티션의 타입을 명시하지 않으면 파티션마다 독립적으로 타입을 자동 판단하기 때문에 미일치 오류가 발생할 수 있다. (예를 들면 A 파티션은 double 타입인데, B 파티션은 bitint 타입으로 인식) 이 경우, 아래와 같이 DataSet에서 강제로 타입을 명시해주면 해결된다. [관련 링크]
val newDataSet = dataSet
.select(
Column("some_column").cast("double"),
...
)
- 또한, AWS Glue 크롤러에서는 구성 옵션에서 아래 항목을 체크한 후 크롤러를 재실행하면 해당 이슈가 해결된다.
테이블에서 메타데이터가 있는 새 파티션과 기존 파티션을 모두 업데이트 (체크)