일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 플레이웨어즈
- 공개
- AWS
- 엔비디아
- LEAGUE OF LEGENDS
- 닌텐도 스위치
- 비트코인
- 리뷰
- 이벤트
- 출시
- 이원경기자
- 롤
- 아이폰X
- 업데이트
- 문태환기자
- Review
- 인텔
- 브레인박스
- 애플
- 삼성
- 리그오브레전드
- 오버워치
- 마이크로소프트
- Apple
- 배틀그라운드
- 카카오게임즈
- 구글
- Microsoft
- Bitcoin
- Today
- Total
IT & Life
Spark 튜토리얼 : Apache Spark 시작하기 본문
Spark 튜토리얼 : Apache Spark 시작하기
구조화 된 스트리밍을 사용하여 데이터 세트로드, 스키마 적용, 간단한 쿼리 작성 및 실시간 데이터 쿼리 단계별 가이드
Apache Spark는 대용량 데이터 집합 쿼리, 미래 학습 추세 예측을위한 기계 학습 모델 교육 또는 스트리밍 데이터 처리와 같은 대규모 데이터 처리를위한 사실상의 표준이되었습니다. 이 기사에서는 Apache Spark를 사용하여 Python과 Spark SQL의 데이터를 분석하는 방법을 설명합니다. 또한 플랫폼 내에서 스트리밍 데이터를 처리하기위한 새로운 최신 기술인 Structured Streaming을 지원하도록 코드를 확장 할 것입니다. 여기서는 Apache Spark 2.2.0을 사용하지만이 자습서의 코드는 Spark 2.1.0 이상에서도 작동합니다.
Apache Spark 실행 방법
시작하기 전에 Apache Spark 설치가 필요합니다. Spark은 여러 가지 방법으로 실행할 수 있습니다. Hortonworks , Cloudera 또는 MapR 클러스터를 이미 실행중인 경우 Spark가 이미 설치되어 있거나 Ambari , Cloudera Navigator 또는 MapR 사용자 정의 패키지를 통해 쉽게 설치할 수 있습니다 .
이러한 클러스터를 손쉽게 사용할 수없는 경우 Amazon EMR 또는 Google Cloud Dataproc 을 사용하면 쉽게 시작할 수 있습니다. 이러한 클라우드 서비스를 사용하면 Apache Spark을 설치하고 바로 사용할 수있는 Hadoop 클러스터를 만들 수 있습니다. 관리 서비스에 대한 추가 비용을 지불하면 컴퓨팅 리소스에 대한 요금이 청구됩니다. 클러스터를 사용하지 않을 때는 클러스터를 종료하는 것을 잊지 마십시오!
물론 spark.apache.org 에서 최신 릴리스를 다운로드하여 랩톱에서 실행할 수도 있습니다. Java 8 런타임이 설치되어 있어야합니다 (Java 7이 작동하지만 더 이상 사용되지 않습니다). 클러스터의 계산 능력은 없지만이 튜토리얼에서는 코드 스 니펫을 실행할 수 있습니다.
제 3의 클라우드 옵션 인 Databricks Community Edition을 사용할 것 입니다. Databricks Community Edition은 사용하기위한 무료 미니 클러스터, Jupyter 또는 Apache Zeppelin 과 유사한 웹 노트북 인터페이스 및 재생할 소량의 스토리지를 제공합니다. Databricks는 또한 노트북의 공용 공유를 허용합니다. 이 기사 에서 실행 한 모든 코드는 여기 에서 찾을 수 있습니다 .
아파치 스파크로 놀고 있다면 로컬 또는 클라우드 환경에 설치하고 싶지는 않을 것입니다. Databricks Community Edition은 실험 할 수있는 좋은 방법입니다. 나중에 Databricks 전체 플랫폼으로 업그레이드하거나 원하는 경우 다른 Apache Spark 플랫폼에서 코드를 실행하여 실행할 수 있습니다.
이 튜토리얼에서 파이썬 코드를 작성할 것이므로, Spark를 실행 한 후의 첫 번째 단계는 Spark의 Python 쉘 PySpark를 가져 오는 것입니다.
샘플 스파크 데이터 세트
Apache Spark를로드 한 후에는 작업 할 데이터가 필요합니다. 저는 가상의 비디오 스트리밍 서비스에서 데이터를 탐색 할 수있는 작은 데이터 세트를 만들었습니다. 고객은 앱에서 프로그램을 시청하고 앱은 이벤트 데이터를 JSON 형식으로 Google에 전송하여 이벤트 시간 (기원 형식), 고객 ID, 쇼 ID 및 이벤트 상태를 알려줍니다. 네 가지 유형 중 하나 일 수 있습니다.
”open”—the customer starts watching a new show
”heartbeat”—the customer is still watching this show
”finish_completed”—the customer finishes the entire show
”finish_incomplete”—the customer closed the show without finishing it
예제 이벤트는 다음과 같습니다.
샘플 JSON 데이터 세트를 여기서 다운로드하십시오 : https://images.idgesg.net/assets/2017/11/videodata.json.zip
Spark에서 데이터로드하기
Apache Spark에서 데이터로드는 특히 JSON 이벤트와 같은 구조화 된 정보를 처리 할 때 매우 쉽습니다. PySpark 셸에 라인을 입력하여 새로운 데이터 프레임을 생성하면됩니다 :
S3에 데이터를 저장하는 경우 URL 대신 버킷을 사용합니다.
Google Cloud Storage에도 동일하게 적용됩니다.
Spark을 로컬에서 실행중인 경우 file:///URI를 사용 하여 데이터 파일을 가리킬 수 있습니다.
Apache Spark 런타임은 저장소에서 JSON 파일을 읽고 파일의 내용을 기반으로 스키마를 추론합니다. 이것은 훌륭하지만 알고 있어야하는 몇 가지 문제를 제기합니다. 첫째,이 프로세스와 관련된 런타임 비용이 있습니다. 여기에 많은 영향을 미치지는 않겠지 만 대규모 데이터 세트에서는 성능 저하가 발생할 수 있습니다. 둘째, 필드가 선택적 일 수있는 데이터 스키마를 추론하면 전체 또는 필드를 설명하지 않는 스키마로 끝날 수 있습니다. 마지막으로 입력 필드에 모든 필드가 나타나더라도 자동 생성 스키마에서 원하는 필드를 정확히 얻을 수 없습니다.
JSON 파일에서 생성 된 데이터 프레임을 살펴보면이 데이터 프레임이 가깝지만 적합하지 않음을 알 수 있습니다.
customer_id, show_id및 state유형은 OK, 그러나 스파크가 결정 반면 timestamp긴을, 그것은 우리가 시대의 시간을 처리 할 필요가 없습니다 실제 타임 스탬프를 사용하는 것이 더 의미가있다. 스키마를 자동으로 추론하는 대신 우리는 스스로 정의하고 Spark에로드 된대로 데이터에 적용하도록 지시 할 수 있습니다. 또한 스키마가 정의하지 않은 필드가 나타나면 오류가 발생하는 이점이 있습니다. 스키마는 각 필드에 대한 객체 StructType배열이있는 StructField객체입니다.
jsonSchema = StructType ([ StructField ( " CUSTOMER_ID " LongType (), 참 ), StructField ( " show_id " LongType (), 참 ), StructField ( " 상태 ", StringType (), 참 ), StructField ( " 타임 스탬프 " TimestampType (), True ) ])
이 True값은 필드가 선택적인지 여부를 결정합니다. 그런 다음 입력 데이터를 열 때 스키마가 적용됩니다.
eventsSchemaDF = 스파크 . 읽기 . 스키마 ( jsonSchema ). json ( "/ FileStore / tables / videodata . json ")
head()첫 번째 항목을 얻기 위해이 데이터 프레임을 수행 하면 에포크 시간이 datetime객체 로 바뀌 었음을 알 수 있습니다 .
eventsSchemaDF . 헤드 () => 행 ( CUSTOMER_ID = 24 , show_id = 308 , 상태 = U ' 오픈 ', 소인 = 날짜 . 날짜 ( 2017 , 11 , 12 , 16 , 58 , 5 ))
Spark에서 데이터 탐색
데이터 프레임을 데이터 프레임으로로드 했으므로 쿼리를 통해 정보를 얻을 수 있습니다. 간단하게 시작하겠습니다. 데이터 프레임에는 몇 개의 이벤트가 있습니까? 이 count()방법을 사용하여 다음 을 찾을 수 있습니다 .
이제 "열린"사건의 수를 세어 봅시다. 우리는이 where()방법을 사용할 수 있습니다 :
이제 데이터 프레임을 테이블로 등록하면 Spark SQL 쿼리를 실행할 수 있습니다.
표준 SQL 구문을 사용하여보다 복잡한 쿼리를 작성할 수 있습니다. 아래에서는 각 쇼에 대해 등록 된 공개 이벤트 수를 확인하고 내림차순으로 결과를 표시합니다 ( show()쿼리 끝에 있는 메소드는 데이터 프레임의 처음 20 행을 콘솔에 인쇄합니다).
아파치 스파크 쇼 오픈 이벤트 IDG
데이터 프레임 구문에서이 모양은 다음과 같습니다.
우리는 데이터에 시간 창을 만들어 일정 기간 동안 쇼에서 어떤 일이 일어나고 있는지 확인할 수도 있습니다. 다음 쿼리는 5 분짜리 창을 생성하고 각 창 내의 각 프로그램에 대한 상태를 계산합니다.
시간 창을 만들었으므로이 결과를 JSON (또는 Apache Spark이 연결된 하이브 테이블과 같은 다른 모든 상점)의 write()메소드를 사용하여 다시 유지할 수 있습니다 .
Spark 웹 UI
Apache Spark에는 웹 인터페이스가있어 클러스터의 상태를 검사 할 수 있습니다. 기본적으로 Spark 웹 UI는 Spark 드라이버 프로세스 (PySpark를 시작한 곳)의 포트 8080과 모든 작업자 노드의 포트 8081에 있습니다. 대부분의 경우 드라이버 페이지를 보게 될 것입니다. 드라이버 페이지는 다음과 같습니다.
Spark 웹 UI를 사용하면 현재 실행중인 작업을 추적하고 이전에 실행 한 작업을 검사 할 수 있습니다. 활성 작업을 클릭하면 클러스터에서 활성 작업, 완료된 작업 및 아직 예약하지 않은 작업이 표시됩니다. Spark이 데이터를 처리하기 위해 만든 그래프의 시각화를 볼 수도 있습니다. 또한 웹 UI를 사용하면 드라이버 및 작업자의 로그, 가비지 수집에 걸리는 시간, 작업을 엉망으로 처리하는 것과 같은 중요한 JVM 메트릭 및 전체 작업 프로세스의 타임 라인을 검사하여 디버깅을 지원할 수 있습니다.
스트럭처 스트리밍
위의 예제는 정적 데이터에서 작동합니다. 그들은 우리가 조사하고 싶은 모든 사건들을 담은 파일을 가지고 있다고 가정합니다. 그러나 비디오 플레이어 응용 프로그램이 데이터를 집으로 보내는 경우에는 그렇지 않을 수 있습니다. 전 세계에서 항상 데이터가 도착할 것입니다. 우리는 어떻게 대응할 수 있습니까? 도착한 데이터를 저장하고 야간 일괄 처리 작업을 실행하여 지난 24 시간 동안의 추세를 볼 수는 있지만 충분하지 않은 경우에는 어떻게해야합니까? Apache Spark를 사용하면 스트리밍 지원을 통해 실시간으로 데이터를 처리하고 해당 데이터로부터 통찰력을 얻을 수 있습니다.
Apache Spark의 스트리밍 데이터 처리는 이전에 Spark Streaming에서 처리했습니다 . Spark Streaming은 작은 배치 ( "microbatches"라고 함)로 스트림을 처리하고 데이터를 조작하기위한 기능적 스타일의 API를 제공했습니다. 이 API는 Apache Spark 2.x에서 계속 사용할 수 있지만 구조화 스트리밍 이라는 새로운 접근 방식으로 대체되었습니다 . 이는 이미 본 데이터 프레임이나 Spark SQL API를 제공하지만 라이브 데이터에 대한 쿼리를 실행할 수 있습니다.
본격적인 분석 플랫폼을 구축하는 경우 들어오는 이벤트를 Apache Kafka 또는 Apache Pulsar 에 배치 한 다음 Spark를 연결하여 해당 로그 시스템에서 읽을 수 있습니다. 움직이는 부분의 수를 줄이기 위해 구조화 된 스트리밍이 파일 시스템에서 읽을 수있는 기능을 이용합니다. 새 파일은 제공된 경로에 추가 될 때 처리됩니다.
스트리밍 dataframe를 구축하는 것은 간단합니다 : 우리는 단순히 교체 read()와 함께 readStream(). 또한 여러 JSON 파일에 대해 와일드 카드를 사용할 수 있도록 입력 경로를 조정합니다. 지금은 하나의 파일 만 있지만 곧 추가 할 것입니다.
이제이 데이터 프레임을 인 메모리 테이블로 스트리밍하여 계속 진행되는 쿼리를 실행하고 Apache Spark에 스트림 처리를 시작하도록 지시합니다 start().
여기서도 실제 응용 프로그램을 만드는 경우 메모리에 테이블을 저장하는 대신 Hive 테이블이나 Parquet 파일 등으로 업데이트를 직렬화 할 가능성이 큽니다.
결과 테이블은 streaming_events입니다. Spark SQL의 다른 테이블과 마찬가지로 쿼리 할 수 있습니다.
위의 배치 작업에서 얻은 답변과 동일한 대답을 제공해야합니다. 당신이 입력 파일의 복사본을 만들 경우, 같은 위치에 업로드를 videodata2.json 이름 videodata.json ,이 쿼리를 다시 실행, 당신은 구조적 스트리밍 과정을 새 파일을 볼에 데이터를 추가해야 메모리 내 테이블 이제 카운트가 두 배가됩니다.
마지막으로 stop()쿼리 핸들 에서 메서드를 호출하여 스트리밍을 중지 할 수 있습니다.
Spark 예제에서 빌드
지금까지 살펴본 내용을 토대로 한 유용한 연습은 고객이 다른 사용자와 계정을 공유하고 있는지 여부를 감지 할 수있는 쿼리를 구현하는 것입니다. 우리는 비디오 응용 프로그램에서 일부 공유에 관대 할 수도 있지만 고객이 선상에 오르 길 원치는 않습니다. 이 자습서에서 이미 표시된 코드를 확장하여 임의의 고객 ID에 대해 주어진 5 분 시간 창에서 다섯 개 이상의 열린 이벤트를 확인하는 쿼리를 작성할 수 있습니다. 일괄 처리 및 스트럭처 스트리밍 모드에서 쿼리를 실행하십시오.