관리 메뉴

IT & Life

Spark 튜토리얼 : Apache Spark 시작하기 본문

프로그래밍/Spark

Spark 튜토리얼 : Apache Spark 시작하기

미운앙마 2017. 12. 2. 23:46

https://www.infoworld.com/article/3237984/analytics/spark-tutorial-get-started-with-apache-spark.html?upd=1512224974243

 

 

Spark 튜토리얼 : Apache Spark 시작하기

 

구조화 된 스트리밍을 사용하여 데이터 세트로드, 스키마 적용, 간단한 쿼리 작성 및 실시간 데이터 쿼리 단계별 가이드

 


Apache Spark는 대용량 데이터 집합 쿼리, 미래 학습 추세 예측을위한 기계 학습 모델 교육 또는 스트리밍 데이터 처리와 같은 대규모 데이터 처리를위한 사실상의 표준이되었습니다. 이 기사에서는 Apache Spark를 사용하여 Python과 Spark SQL의 데이터를 분석하는 방법을 설명합니다. 또한 플랫폼 내에서 스트리밍 데이터를 처리하기위한 새로운 최신 기술인 Structured Streaming을 지원하도록 코드를 확장 할 것입니다. 여기서는 Apache Spark 2.2.0을 사용하지만이 자습서의 코드는 Spark 2.1.0 이상에서도 작동합니다.

 


Apache Spark 실행 방법

 

시작하기 전에 Apache Spark 설치가 필요합니다. Spark은 여러 가지 방법으로 실행할 수 있습니다. Hortonworks , Cloudera 또는 MapR 클러스터를 이미 실행중인 경우 Spark가 이미 설치되어 있거나 Ambari , Cloudera Navigator 또는 MapR 사용자 정의 패키지를 통해 쉽게 설치할 수 있습니다 .

 

이러한 클러스터를 손쉽게 사용할 수없는 경우 Amazon EMR 또는 Google Cloud Dataproc 을 사용하면 쉽게 시작할 수 있습니다. 이러한 클라우드 서비스를 사용하면 Apache Spark을 설치하고 바로 사용할 수있는 Hadoop 클러스터를 만들 수 있습니다. 관리 서비스에 대한 추가 비용을 지불하면 컴퓨팅 리소스에 대한 요금이 청구됩니다. 클러스터를 사용하지 않을 때는 클러스터를 종료하는 것을 잊지 마십시오!

 

물론 spark.apache.org 에서 최신 릴리스를 다운로드하여 랩톱에서 실행할 수도 있습니다. Java 8 런타임이 설치되어 있어야합니다 (Java 7이 작동하지만 더 이상 사용되지 않습니다). 클러스터의 계산 능력은 없지만이 튜토리얼에서는 코드 스 니펫을 실행할 수 있습니다.

 

제 3의 클라우드 옵션 인 Databricks Community Edition을 사용할 것 입니다. Databricks Community Edition은 사용하기위한 무료 미니 클러스터, Jupyter 또는 Apache Zeppelin 과 유사한 웹 노트북 인터페이스 및 재생할 소량의 스토리지를 제공합니다. Databricks는 또한 노트북의 공용 공유를 허용합니다. 이 기사 에서 실행 한 모든 코드는 여기 에서 찾을 수 있습니다 .

 

아파치 스파크로 놀고 있다면 로컬 또는 클라우드 환경에 설치하고 싶지는 않을 것입니다. Databricks Community Edition은 실험 할 수있는 좋은 방법입니다. 나중에 Databricks 전체 플랫폼으로 업그레이드하거나 원하는 경우 다른 Apache Spark 플랫폼에서 코드를 실행하여 실행할 수 있습니다.

 

이 튜토리얼에서 파이썬 코드를 작성할 것이므로, Spark를 실행 한 후의 첫 번째 단계는 Spark의 Python 쉘 PySpark를 가져 오는 것입니다.

 

$> PATH_TO_SPARK/bin/pyspark

샘플 스파크 데이터 세트

 

Apache Spark를로드 한 후에는 작업 할 데이터가 필요합니다. 저는 가상의 비디오 스트리밍 서비스에서 데이터를 탐색 할 수있는 작은 데이터 세트를 만들었습니다. 고객은 앱에서 프로그램을 시청하고 앱은 이벤트 데이터를 JSON 형식으로 Google에 전송하여 이벤트 시간 (기원 형식), 고객 ID, 쇼 ID 및 이벤트 상태를 알려줍니다. 네 가지 유형 중 하나 일 수 있습니다.

 

 ”open”—the customer starts watching a new show
 ”heartbeat”—the customer is still watching this show
 ”finish_completed”—the customer finishes the entire show
 ”finish_incomplete”—the customer closed the show without finishing it


예제 이벤트는 다음과 같습니다.

 

{”customer_id”:88,”show_id”:216,”state”:”open”,”timestamp”:1515502045}

샘플 JSON 데이터 세트를 여기서 다운로드하십시오 : https://images.idgesg.net/assets/2017/11/videodata.json.zip

 


Spark에서 데이터로드하기

 

Apache Spark에서 데이터로드는 특히 JSON 이벤트와 같은 구조화 된 정보를 처리 할 때 매우 쉽습니다. PySpark 셸에 라인을 입력하여 새로운 데이터 프레임을 생성하면됩니다 :

 

eventsDF = spark.read.json(“/FileStore/tables/videodata.json”)

S3에 데이터를 저장하는 경우 URL 대신 버킷을 사용합니다.

 

eventsDF = spark.read.json(“s3a://bucket-name/videodata.json”)

Google Cloud Storage에도 동일하게 적용됩니다.

 

eventsDF = spark.read.json(“gs://bucket-name/videodata.json”)

Spark을 로컬에서 실행중인 경우 file:///URI를 사용 하여 데이터 파일을 가리킬 수 있습니다.

 

Apache Spark 런타임은 저장소에서 JSON 파일을 읽고 파일의 내용을 기반으로 스키마를 추론합니다. 이것은 훌륭하지만 알고 있어야하는 몇 가지 문제를 제기합니다. 첫째,이 프로세스와 관련된 런타임 비용이 있습니다. 여기에 많은 영향을 미치지는 않겠지 만 대규모 데이터 세트에서는 성능 저하가 발생할 수 있습니다. 둘째, 필드가 선택적 일 수있는 데이터 스키마를 추론하면 전체 또는 필드를 설명하지 않는 스키마로 끝날 수 있습니다. 마지막으로 입력 필드에 모든 필드가 나타나더라도 자동 생성 스키마에서 원하는 필드를 정확히 얻을 수 없습니다.

 

JSON 파일에서 생성 된 데이터 프레임을 살펴보면이 데이터 프레임이 가깝지만 적합하지 않음을 알 수 있습니다.

 

eventsDF:pyspark.sql.dataframe.DataFrame customer_id:long show_id:long state:string timestamp:long

customer_id, show_id및 state유형은 OK, 그러나 스파크가 결정 반면 timestamp긴을, 그것은 우리가 시대의 시간을 처리 할 필요가 없습니다 실제 타임 스탬프를 사용하는 것이 더 의미가있다. 스키마를 자동으로 추론하는 대신 우리는 스스로 정의하고 Spark에로드 된대로 데이터에 적용하도록 지시 할 수 있습니다. 또한 스키마가 정의하지 않은 필드가 나타나면 오류가 발생하는 이점이 있습니다. 스키마는 각 필드에 대한 객체 StructType배열이있는 StructField객체입니다.

jsonSchema = StructType ([ StructField ( " CUSTOMER_ID " LongType (), 참 ), StructField ( " show_id " LongType (), 참 ), StructField ( " 상태 ", StringType (), 참 ), StructField ( " 타임 스탬프 " TimestampType (), True ) ])           
이 True값은 필드가 선택적인지 여부를 결정합니다. 그런 다음 입력 데이터를 열 때 스키마가 적용됩니다.

eventsSchemaDF = 스파크 . 읽기 . 스키마 ( jsonSchema ). json ( "/ FileStore / tables / videodata . json ")
head()첫 번째 항목을 얻기 위해이 데이터 프레임을 수행 하면 에포크 시간이 datetime객체 로 바뀌 었음을 알 수 있습니다 .

eventsSchemaDF . 헤드 () => 행 ( CUSTOMER_ID = 24 , show_id = 308 , 상태 = U ' 오픈 ', 소인 = 날짜 . 날짜 ( 2017 , 11 , 12 , 16 , 58 , 5 ))  

 

    
Spark에서 데이터 탐색

 

데이터 프레임을 데이터 프레임으로로드 했으므로 쿼리를 통해 정보를 얻을 수 있습니다. 간단하게 시작하겠습니다. 데이터 프레임에는 몇 개의 이벤트가 있습니까? 이 count()방법을 사용하여 다음 을 찾을 수 있습니다 .

 

eventsSchemaDF.count() => Out[]: 1166450

이제 "열린"사건의 수를 세어 봅시다. 우리는이 where()방법을 사용할 수 있습니다 :

 

eventsSchemaDF.where(eventsSchemaDF.state =="open").count() => Out[61]: 666258

이제 데이터 프레임을 테이블로 등록하면 Spark SQL 쿼리를 실행할 수 있습니다.

 

eventsSchemaDF.createOrReplaceTempView(“video_events”)
spark.sql(“SELECT count(*) FROM video_events WHERE state = open’”).show()

표준 SQL 구문을 사용하여보다 복잡한 쿼리를 작성할 수 있습니다. 아래에서는 각 쇼에 대해 등록 된 공개 이벤트 수를 확인하고 내림차순으로 결과를 표시합니다 ( show()쿼리 끝에 있는 메소드는 데이터 프레임의 처음 20 행을 콘솔에 인쇄합니다).

 

spark.sql(“SELECT show_id, count(*) AS open_events FROM video_events 
WHERE state = open GROUP BY show_id ORDER BY open_events DESC”).show()

아파치 스파크 쇼 오픈 이벤트 IDG

 


데이터 프레임 구문에서이 모양은 다음과 같습니다.

 

eventsSchemaDF.select(eventsSchemaDF.show_id)
.where(eventsSchemaDF.state=="open")
.groupBy(eventsSchemaDF.show_id)
.count()
.orderBy(desc(“count”))
.show()

우리는 데이터에 시간 창을 만들어 일정 기간 동안 쇼에서 어떤 일이 일어나고 있는지 확인할 수도 있습니다. 다음 쿼리는 5 분짜리 창을 생성하고 각 창 내의 각 프로그램에 대한 상태를 계산합니다.

 

windowDF = spark.sql(“SELECT show_id, state, window(timestamp, 5 minutes’) AS window,count(*) AS count 
FROM video_events GROUP BY state,show_id, window ORDER BY window, count,show_id”)


시간 창을 만들었으므로이 결과를 JSON (또는 Apache Spark이 연결된 하이브 테이블과 같은 다른 모든 상점)의 write()메소드를 사용하여 다시 유지할 수 있습니다 .

 

windowDF.write.json(“/FileStore/tables/windowdata.json”)


Spark 웹 UI

 

Apache Spark에는 웹 인터페이스가있어 클러스터의 상태를 검사 할 수 있습니다. 기본적으로 Spark 웹 UI는 Spark 드라이버 프로세스 (PySpark를 시작한 곳)의 포트 8080과 모든 작업자 노드의 포트 8081에 있습니다. 대부분의 경우 드라이버 페이지를 보게 될 것입니다. 드라이버 페이지는 다음과 같습니다.

 


Spark 웹 UI를 사용하면 현재 실행중인 작업을 추적하고 이전에 실행 한 작업을 검사 할 수 있습니다. 활성 작업을 클릭하면 클러스터에서 활성 작업, 완료된 작업 및 아직 예약하지 않은 작업이 표시됩니다. Spark이 데이터를 처리하기 위해 만든 그래프의 시각화를 볼 수도 있습니다. 또한 웹 UI를 사용하면 드라이버 및 작업자의 로그, 가비지 수집에 걸리는 시간, 작업을 엉망으로 처리하는 것과 같은 중요한 JVM 메트릭 및 전체 작업 프로세스의 타임 라인을 검사하여 디버깅을 지원할 수 있습니다.

 

 

스트럭처 스트리밍

 

위의 예제는 정적 데이터에서 작동합니다. 그들은 우리가 조사하고 싶은 모든 사건들을 담은 파일을 가지고 있다고 가정합니다. 그러나 비디오 플레이어 응용 프로그램이 데이터를 집으로 보내는 경우에는 그렇지 않을 수 있습니다. 전 세계에서 항상 데이터가 도착할 것입니다. 우리는 어떻게 대응할 수 있습니까? 도착한 데이터를 저장하고 야간 일괄 처리 작업을 실행하여 지난 24 시간 동안의 추세를 볼 수는 있지만 충분하지 않은 경우에는 어떻게해야합니까? Apache Spark를 사용하면 스트리밍 지원을 통해 실시간으로 데이터를 처리하고 해당 데이터로부터 통찰력을 얻을 수 있습니다.

 

Apache Spark의 스트리밍 데이터 처리는 이전에 Spark Streaming에서 처리했습니다 . Spark Streaming은 작은 배치 ( "microbatches"라고 함)로 스트림을 처리하고 데이터를 조작하기위한 기능적 스타일의 API를 제공했습니다. 이 API는 Apache Spark 2.x에서 계속 사용할 수 있지만 구조화 스트리밍 이라는 새로운 접근 방식으로 대체되었습니다 . 이는 이미 본 데이터 프레임이나 Spark SQL API를 제공하지만 라이브 데이터에 대한 쿼리를 실행할 수 있습니다.

 

본격적인 분석 플랫폼을 구축하는 경우 들어오는 이벤트를 Apache Kafka 또는 Apache Pulsar 에 배치 한 다음 Spark를 연결하여 해당 로그 시스템에서 읽을 수 있습니다. 움직이는 부분의 수를 줄이기 위해 구조화 된 스트리밍이 파일 시스템에서 읽을 수있는 기능을 이용합니다. 새 파일은 제공된 경로에 추가 될 때 처리됩니다.

 

스트리밍 dataframe를 구축하는 것은 간단합니다 : 우리는 단순히 교체 read()와 함께 readStream(). 또한 여러 JSON 파일에 대해 와일드 카드를 사용할 수 있도록 입력 경로를 조정합니다. 지금은 하나의 파일 만 있지만 곧 추가 할 것입니다.

 

eventsStreamingDF = spark.readStream
                         .schema(jsonSchema)
                         .json(/FileStore/tables/video*.json)

이제이 데이터 프레임을 인 메모리 테이블로 스트리밍하여 계속 진행되는 쿼리를 실행하고 Apache Spark에 스트림 처리를 시작하도록 지시합니다 start().

 

query = eventsStreamingDF.writeStream
                         .format(“memory”)
                         .queryName(“streaming_events”)
                         .start()

여기서도 실제 응용 프로그램을 만드는 경우 메모리에 테이블을 저장하는 대신 Hive 테이블이나 Parquet 파일 등으로 업데이트를 직렬화 할 가능성이 큽니다.

 

결과 테이블은 streaming_events입니다. Spark SQL의 다른 테이블과 마찬가지로 쿼리 할 수 ​​있습니다.

 

spark.sql(“SELECT count(*) FROM streaming_events WHERE state = open’”)

위의 배치 작업에서 얻은 답변과 동일한 대답을 제공해야합니다. 당신이 입력 파일의 복사본을 만들 경우, 같은 위치에 업로드를 videodata2.json 이름 videodata.json ,이 쿼리를 다시 실행, 당신은 구조적 스트리밍 과정을 새 파일을 볼에 데이터를 추가해야 메모리 내 테이블 이제 카운트가 두 배가됩니다.

마지막으로 stop()쿼리 핸들 에서 메서드를 호출하여 스트리밍을 중지 할 수 있습니다.

 

query.stop()

 

Spark 예제에서 빌드

 

지금까지 살펴본 내용을 토대로 한 유용한 연습은 고객이 다른 사용자와 계정을 공유하고 있는지 여부를 감지 할 수있는 쿼리를 구현하는 것입니다. 우리는 비디오 응용 프로그램에서 일부 공유에 관대 할 수도 있지만 고객이 선상에 오르 길 원치는 않습니다. 이 자습서에서 이미 표시된 코드를 확장하여 임의의 고객 ID에 대해 주어진 5 분 시간 창에서 다섯 개 이상의 열린 이벤트를 확인하는 쿼리를 작성할 수 있습니다. 일괄 처리 및 스트럭처 스트리밍 모드에서 쿼리를 실행하십시오.

Comments