DE/Study

[Hadoop_기초] Udemy course - 섹션 4.Spark

Hazel_song 2022. 6. 19. 21:33
728x90

udemy - hadoop 기초강의 내용을 정리하면서 추가적으로 공부한 내용들을 정리했다.

 

네 번째 섹션에서는 Apache Spark에 대한 대략적인 설명으로 이루어졌다. 


28강. Why Spark

What is Spark?

Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.

즉 spark는 대규모 데이터 처리에 사용되는 신속하고 보편적인 엔진이다.

MapReduce의 단점을 개선하기 위해 등장.

✅ MapReduce의 단점은 데이터 처리 단계 사이의 중간 데이터가 디스크에 잔류하는 것

✅ 그러나 Spark는 중간 데이터를 메모리에 둘 수 있어서, 데이터 재사용이 가능하고 이에 따라 반복형/대화형 테스크에 강점을 가진다. 

 

Why Spark?

  • java, scala, python 같은 실제 프로그래밍 언어를 사용해서 스크립트를 작성하여 데이터를 처리할 수 있다.
    즉 데이터 처리에 있어서 사용자에게 높은 유연성을 제공한다.
  • 유저 기준으로 사용하기 편하면서도, 동시에 복잡한 데이터를 조작,변형,분석할 수 있다.
  • 간단한 데이터 처리 뿐 아니라, spark 생태계 하나에서 머신러닝, 데이터 스트리밍, 데이터 분석이 가능하다. 
    • apache pig도 유저에게 (비교적) 익숙한 방식으로 데이터 처리가 가능한데, spark의 이러한 특징이 pig같은 기술과의 차이점이다.
  • scalable 하다.

✅ 기본적으로 분산환경을 사용하는 가장 큰 이유가 "확장성"이다. 

분산환경 VS MultiProcessing
- 데이터가 그렇게 크지 않다면, 단일 서버에서 multiprocessing으로 돌리는 것도 고려해볼 만하다. 왜냐하면 분산환경에서 실행계획, 리소스 할당 등의 과정에서 단일서버보다 성능이 저하되는 경우도 있기 때문이다.
- 그러나 이후 확장성이 큰 경우에는 당연히 분산환경을 고려하는 것이 좋다.

그렇다면 확장성을 고려해서 분산환경을 고려한다면 어떠한 분산 데이터 처리가 적절할까?

Spark VS MR VS Hive
- 통계같은 단순한 데이터 처리/조회는 Hive가 적절. SQL로 작업이 가능하기 때문
- 기계학습과 같은 반복적인 데이터 플로우/대화형 작업에는 Spark가 적절. 메모리에 의존하는 만큼 리소스 할당관련해서 오류가 발생하기 쉽다.
-  MR은  강한 failure tolerance(결함 감내)를 가진다. 즉 안정적이다.
  • 속도가 빠르다. 
    • DAG(방향성 비사이클 그래프) 엔진기반으로  workflow를 최적화
    • 인메모리 기반
    • lazy evaluation
      • spark의 연산 방식. 다음 강의에서 RDD와 함께 설명
  • hot하다

 

Spark의 구조

- spakr의 장점인 확장성을 이야기할 때 잠깐 언급된 spark의 구조

  • spark는 master-slave 구조
  • 작업을 관장하는 driver program / 실제 작업이 동작하는 executor 
    • executor 프로세스는 일종의 캐시(cache)를 갖고, 작업(task)을 진행
    • 이 캐시가 spark 성능의 핵심이다.
    • task가 이 캐시를 공유하여 작업의 속도를 높일 수 있는 것.
  • cluster manager는 애플리케이션 간의 컴퓨팅 자원을 관리해주는 역할
    • cluster manager에는  yarn, mesos 등

 

 Spark의 구성요소

  • spark streaming :  실시간 데이터 처리 가능
  • spark SQL :  SQL인터페이스에 spark 데이터 세트의 최적화 작업이 진행되고 있다. 
🤔
이걸 통해 DAG 이상의 최적화를 달성합니다??
-> Spark SQL 인터페이스에 ‘데이터 셋’이라는 최적화 작업이 진행 중이고, 이를 통해 DAG 그 이상의 최적화를 만들어낸다. → 실행중인 쿼리에도 SQL 최적화를 할 수 있기 때문.
  • MLLib : spark의 데이터 세트에 실행하는 머신러닝을 위한 라이브러리
  • GraphX : 그래프이론에서 사용. 그래프의 속성을 분석하고 관계성을 파악하는 것을 spark로 가능

 

spark 활용하기

해당 강의에서는 위와 같은 이유로 python을 사용했다. 그러나 실제 활용 시에는 여러 이유로 scala를 고려하게 될 것이다.

  • 안정적인 성능, 빠른 속도, 적은 리소스 사용 등...

 

29강. RDD(회복성 분산 데이터 : Resilient Distributed Datasets)

RDD란?

spark내부에서 일어나는 여러 작업들을 추상적으로 표현한 것.

작업을 클러스터에 고르게 '분산'하고 실패가 생겨도 '회복'할 수 있으며 사용자에게 '데이터세트'처럼 보인다.

 

즉 RDD는 "회복력을 가진 분산 데이터 집합"인 것. 키-값 정보를 저장하는 데이터 세트

-> 여기서 회복력은 데이터를 처리하는 과정에서 일부 문제가 발생하더라도 스스로 복구할 수 있다는 의미

-> 단 복구의 의미는 spark가 정상적으로 동작하고 있는 상황을 가정

-> RDD의 작업 히스토리를 이용한 재시도를 수행함으로써 복구를 수행한다는 뜻

 

spark는 기본적으로 데이터의 일부가 유실되면, 백업한 데이터를 불러오는 것이 아니라 데이터를 다시 만들어내는 방식으로 복구를 수행한다. RDD는 불변성을 띄는데, 어떤 경우에도 같은 방법으로 만든 RDD는 항상 같은 데이터를 갖는다. 

따라서  RDD를 만드는 방법만 기억하면 언제든 똑같은 데이터를 만들 수 있다.

 

RDD 생성하기

SparkContext

spark에서 작업을 관장하는 driver program이  SparkContext를 만든다.

SparkContext는 RDD를 운영할 환경이면서  RDD를 만들어준다.

  • 독립적인 스크립트를 사용해서 생성
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

// SparkContext 객체 초기화 
// 클러스터 매니저의 타입 지정
val conf = new SparkConf().setAppName("sample").setMaster("yarn")
val sc = new SparkContext(conf)
  • spark shell을 이용하여 sc생성
$ spark-shell --master yarn --queue queue_name
Spark context Web UI available at http://127.0.0.1:4040
Spark context available as 'sc' (master = yarn, app id = application_1520227878653_37974).
Spark session available as 'spark'.
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3c910acd

 

Creating RDD

  • parallelized() 메소드 이용 - 내부데이터를 이용하는 방법
 nums = parallelize([1,2,3,4])

-> 현실적으로는 유용하지않는 방법

  • SparkContext의 textfile() 메소드 이용 -  HDFS, S3 등 다양한 외부 파일시스템의 데이터를 이용
sc.textFile("file:///c:/users/frank/gobs-o-text.txt")
  • HiveContext 이용 - hive에 저장된 데이터 이용
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name,age FROM users")

이외에도 JDBC, Cassandra, HBase, Elasticsearch 등과 연결된 데이터베이스나 JSON, CSV, 압축 양식 등 hadoop이 지원하는 실제 구조화된 데이터 파일을 사용해서 RDD를 만들 수 있다. 

즉 대부분의 데이터 소스의 빅데이터를 RDD로 만들 수 있다. 

🤔 이후 DataFrame 강의 부분과 헷갈림
-  sparkcontext뿐 아니라, hivecontext를 통해서도 RDD를 생성할 수 있다고 함. 이와 더불어 다양한 database와도 통신이 가능하다고 함
- DataFrame에 대한 강의내용에서 DataFrame을 생성하는 방법으로 hivecontext가 나옴.

-> 실질적으로 RDD, DataFrame은 크게 다르지 않다. 생성된 후 ,  RDD / DataFrame 방식으로 오감. 기본적으로 DataFrame이 전혀 새로운 것이 아니라 RDD위에 뭔가가 추가된것

 

 

 RDD 연산 

기본적으로 MR에서의 mapping과 reducing을 할 수 있다.

Transformation - MR의 mapping에 비유

새로운 RDD를 만들어내는 연산.

return 타입은 다른 RDD타입으로 바뀌며 한번 만든 RDD는 영속성을 지닌다.

action을 하기 전까지는 transformation은 일어나지 않는다.

  •  map
    • 각 입력 행에 어떤 함수를 적용해서 전환된 데이터를 가지고 새로운 RDD를 만든다.
    • 기존 RDD 입력 행이 세로운 출력행과 1대1 관계를 가지고 있을때 사용

 -> lambda함수는 함수형 프로그래밍의 패러다임이다.

  •  flatmap
    • 각 입력행마다 다른 수의 출력이 필요할때
    • 예를 들어 각 줄을 여러행으로 분할하거나 무효한 입력 줄을 폐기할때
  • filter
  • distinct
  • sample
  • union, intersection, subtract, cartesian 등...

Action - MR의 Reducing에 비유

transformation을 실행. 즉 action 함수를 호출하기 전까지 드라이버 스크립트에서 아무것도 실행되지 않는다.

-> 이 것이 바로 spark의 lazy evaluation이다,

각 고유키와 관련된 값을들 집계하는 함수를정의

return 타입은 RDD에서 다른 타입으로 나온다.

  • collect
    •  RDD데이터를 가져와 드라이버 스크립트를 사용해 출력하거나 텍스트 파일로 저장하는 작업
  •  count
  • countByValue
    •  디버깅에 유용
  • take 등...

Lazy Evaluation

지난 강의에서 spark를 사용해야 하는 이유 중, 빠른 속도에 대해서 이야기 하다가 아래와 같은 말이 언급되었다.

Spark도 최종 결과에서 시작해 거꾸로 돌아오며 작업 흐름을 최적화하고 최종 결과에 다다르는 가장 빠른 방법을 계산합니다.
이것도 Spark의 인상적인 속도와 성능의 비결의 일부입니다.

이는 spark의 연산방식이 lazy evaluation으로 수행되기 때문이다.

spark의 연산은 action이 시작되는 시점에서 transformation끼리의 연계를 파악해 실행 계획의 최적화가 이루어진다.

즉 사용자가 입력한 변환 연산들을 즉시 수행하지 않고 모아뒀다가 가장 최적의 수행방법을 찾아 처리하는 장점을 가지는 것.

 

action을 실행하는 시점에서 spark에게  RDD에서 원하는 결과물이 무엇인지 말하고, 

이때 spark는 해당 결과물을 만들기 위해 거꾸로 일하며 그 결과물에 다다를 수 있는 최단 경로를 찾아낸다. 

 

이로 인해 디버깅할 때 조금 헷갈릴 수도 있고, 실행하기 전까지는 아무 일도 일어나지 않을 것이다.

 

30강.RDD 실습

- RDD로 평균 평점이 가장 낮은 영화 찾기

spark1.0기반의 예시

from pyspark import SparkConf, SparkContext

# This function just creates a Python "dictionary" we can later
# use to convert movie ID's to movie names while printing out
# the final results.
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
            # fields[0]: 영화ID / fields[1]: 영화 제목 -> 즉 영화ID에 영화 제목을 매핑
    return movieNames

# Take each line of u.data and convert it to (movieID, (rating, 1.0))
# This way we can then add up all the ratings for each movie, and
# the total number of ratings for each movie (which lets us compute the average)
def parseInput(line):
    fields = line.split()
    return (int(fields[1]), (float(fields[2]), 1.0))

if __name__ == "__main__":
    # The main script - create our SparkContext
    conf = SparkConf().setAppName("WorstMovies")
    sc = SparkContext(conf = conf)

    # Load up our movie ID -> movie name lookup table
    movieNames = loadMovieNames()

    # Load up the raw u.data file
    lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")

    # Convert to (movieID, (rating, 1.0))
    movieRatings = lines.map(parseInput)

    # Reduce to (movieID, (sumOfRatings, totalRatings))
    ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: ( movie1[0] + movie2[0], movie1[1] + movie2[1] ) )

    # Map to (movieID, averageRating)
    averageRatings = ratingTotalsAndCount.mapValues(lambda totalAndCount : totalAndCount[0] / totalAndCount[1])

    # Sort by average rating
    sortedMovies = averageRatings.sortBy(lambda x: x[1])

    # Take the top 10 results
    results = sortedMovies.take(10)

    # Print them out:
    for result in results:
        print(movieNames[result[0]], result[1])

 

31강.DataSets 및 Spark 2.0

spark v.1에서 RDD로 spark 어플리케이션을 구현.

spark v.2에서 RDD의 단점으로 개선하여 발표한 것이 DataFrame과 Dataset을 이용.

-> RDD위에 데이터 세트를 생성할 수 있게 된 것.

RDD의 한계

  • RDD는 정보를 담은 행(row). 그러나 담긴 정보가 어떤 유형(type)인지 구체적으로 명시하고 있지 않음
  • 메모리나 디스크에 저장공간이 충분치 않으면 동작하지 않음.
  • 스키마(데이터베이스 구조)개념이 별도로 없음
    • 구조화 /  비구조화 데이터를 구분하지 않고 함께 저장하여 효율성이 떨어짐.
    • Spark가 구조화된 데이터를 다루는 방향으로 진화하면서 RDD한계점이 드러남
  • 인메모리 데이터 처리로 속도를 높일 수 있었으나, 테이블 조인 효율화 같은 처리를 사용자가 직접 제어해야 했기 때문에 최적화에 한계
    • 내장된 최적화 엔진이 없음.
// RDD 예제 
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.map(x => if(x >= 3) x else 0).reduce((x, y) => x + y)

 

DataFrame의 등장 - 별칭은 SchemaRDD

  • 구조화된 데이터 구조
    • 데이터 유형과 이름이 주어진 실제 열(column)을 가진 DataFrame에 구조화된 정보를 갖는다.
    • 이를 통해 SQL쿼리를 실행할 수 있게 됨.
  • 데이터의 스키마를 추상화하고 실제 데이터 유형 정보를 담고 있음으로 인해 데이터를 더 효율적으로 저장하고 전송할 수 있다.
    • SQL쿼리를 최적화하여, Spark를 더 빠르고 효율적으로 만들 수 있다.
    • JSON 이나 parquet 파일등의 구조화 데이터 양식을 읽거나 작성할 수 있다.
    • Tableau같은 시스템이나 JDBC를 사용해서 다른 데이터베이스와 소통 
  • RDD를 DataFrame 객체로 확장 
    • DataFrame은 실제로 RDD위에 구축되었으므로 RDD가 내재되어있다.

// 데이터프레임 예제 
val df = spark.read.json("examples/src/main/resources/people.json")
df.select($"name", $"age").filter($"age" > 20).show()
df.groupBy("age").count().show()

 

DataFrame 활용하기

실제로 DataFrame객체를 생성하는 방법에는 여러가지가 있다.

  • HiveContext
  • spark.read.json을 통해, JSON파일과 그 안에 내재된 구조에서 열의 이름과 열의 데이터 유형을 가져와서 DataFrame 생성
  • 이를 통해 만들어진 DataFrame에 대해 createOrReplaceTempView를 호출해서 데이터베이스의 table 같은 것을 만들 수 있다.
    • 이렇게 구조화된 데이터에 실제로 SQL쿼리를 실행할 수 있다.
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name,age FROM users")

 

혹은 SQL쿼리가 아니라 코드로 구조화된 데이터를 다룰수도 있다.

 

🤔
 DataFrame은 RDD위에 구축되었으므로 내재된 RDD를 추출해서 RDD수준의 작업을 할 수도있다.

'rdd'를 사용해 열 객체가 들어있는 'DataFrame'의 RDD를 되찾아 그 열 객체에 함수를 적용하고 RDD 수준의 작업이 가능합니다??

-> rdd는 말그대로 위에 언급된 하나의 함수 이름.  DataFrame을 rdd로 변환시켜 준다는 의미

 

DataSets

DataFrame은 열 객체의 DataSet이다.

DataSet이 더 포괄적인 용어로, DataFrame에서 사용하는 열객체를 포함해 어떤 유형의 정보도 포함할 수 있다.

python을 사용하지 않고 spark를 쓴다면 dataframe과 dataset을 유의해서 사용해야한다.

 

 

DataSets의 특징

  •  컴파일 과정에서 에러를 잡아낼 수 있다.
Spark에서 DataFrame이 나오면서 Structured한 프로그래밍을 사용할 수 있게 되었다. 여기서 Structured가 무엇인지 명확한 정의를 내리기 어려우나 구조가 잡힘으로서 필드마다 data type을 지정할 수 있게 되어 내부적인 효율성을 늘리게 되었다 (메모리 구조라던지, 실행 계획이라던지)
Dataset에서는 한발 더 나아가 typed API를 사용하여 컴파일 타임에 에러를 잡을 수도 있게 되었다.

  • SQL쿼리를 사용해 추가적인 최적화가 가능하다.
    • spark SQL을 사용해서 데이터베이스 서버를 시작해 연결할 수 있다.
    • spark에 내재된 thriftwerver를 시작하면, JDBC, ODBC를 통해 연결하여 입력된 테이블에 SQL명령어를 실행할 수 있다.

  • UDF - User Defined Functions
    •  SQL에 플러그인해서 사용할 수 있는 사용자 정의 함수

-> 위의 예시처럼 square라는 제곱하는 함수를 쿼리에 사용할 수 있다.

 

DataSet을 사용해야하는 경우

  • DataFrame 기능만으로 수행할 연산을 표현하는데 한계가 있을때
    • 복잡한 비즈니스 로직을 SQL이나 DataFrame 대신 단일함수로 인코디앻야할때
  • 성능저하를 감수하면서 타입안정성을 가진 데이터 타입을 사용하고 싶을때
    • 데이터타입이 유효하지 않은 작업을 수행하지 못하도록 방어적 코드, 정확도 높은 프로그램 개발을 위해서

 

RDD는 스파크 컨텍스트(SparkContext)를 이용하고,  데이터셋, 데이터프레임은 스파크 세션(SparkSession) 객체를 이용한다.

SQL과 데이터셋, 데이터프레임을 이용한 처리는 동일한 엔진을 이용하기 때문에 사용자에게 편리한 API를 이용하면 된다.

 

-> Spark 2.0의 머신 러닝 라이브러리(MLLib)나 스트리밍 라이브러리 등에서 'DataSet'를 기반으로 한 API를 사용한다.

이렇듯 Spark 2.0의 'DataSet'를 사용함으로써 성능적 이득을 얻을 뿐 아니라 Spark 위에 구축된 다양한 기술들을 더 쉽게 사용하고 이들을 다양한 방식으로 섞어서도 사용할 수 있는 것이다.

 

정리하면,
RDD를 사용하다가, 
1. 사용의 최적화에 대한 한계 2. 구조화된 데이터를 다루는 것(SQL이나, 코드를 통해 더 고도화된 분석 및 집계)에 대한 한계
이러한 이유로 DataFrame이 등장햇고,
1. 컴파일 에러에 대한 필요성 2. 수행할 연산의 한계
이러한 이유로 DataSets이 등장하여, DataFrame + DataSet이 결합된 형태로 활용하는 방법이 등장 

 

 

32강.DataFrame실습

- 평균 평점이 가장 낮은 영화 찾기

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2]))

if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

    # Load up our movie ID -> name dictionary
    movieNames = loadMovieNames()

    # Get the raw data
    lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
    # Convert it to a RDD of Row objects with (movieID, rating)
    movies = lines.map(parseInput)
    # Convert that to a DataFrame
    movieDataset = spark.createDataFrame(movies)

    # Compute average rating for each movieID
    averageRatings = movieDataset.groupBy("movieID").avg("rating")

    # Compute count of ratings for each movieID
    counts = movieDataset.groupBy("movieID").count()

    # Join the two together (We now have movieID, avg(rating), and count columns)
    averagesAndCounts = counts.join(averageRatings, "movieID")

    # Pull the top 10 results
    topTen = averagesAndCounts.orderBy("avg(rating)").take(10)

    # Print them out, converting movie ID's to names as we go.
    for movie in topTen:
        print (movieNames[movie[0]], movie[1], movie[2])

    # Stop the session
    spark.stop()
  • SparkSession의 특징
    • 작업에 실패가 발생할 경우, 새로운 SparkSession을 생성하거나 지난 시간에 제대로 멈추지 않았다면 그 세션에 저장된 지점부터 시작할 수 있게 한다.

-> DataSet를 활용함으로써  SQL명령어를 더 쉽게 사용하고 명시적으로 코딩함수를 사용함으로써 데이터를 더 잘 다룰 수 있게 됨.

 

33강.DataSets-MLLibs 실습

- 영화 추천하기

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

# Load up movie ID -> movie name dictionary
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1].decode('ascii', 'ignore')
    return movieNames

# Convert u.data lines into (userID, movieID, rating) rows
def parseInput(line):
    fields = line.value.split()
    return Row(userID = int(fields[0]), movieID = int(fields[1]), rating = float(fields[2]))


if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.builder.appName("MovieRecs").getOrCreate()

    spark.conf.set("spark.sql.crossJoin.enabled", "true")

    # Load up our movie ID -> name dictionary
    movieNames = loadMovieNames()

    # Get the raw data
    lines = spark.read.text("hdfs:///user/maria_dev/ml-100k/u.data").rdd

    # Convert it to a RDD of Row objects with (userID, movieID, rating)
    ratingsRDD = lines.map(parseInput)

    # Convert to a DataFrame and cache it
    ratings = spark.createDataFrame(ratingsRDD).cache()

    # Create an ALS collaborative filtering model from the complete data set
    als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating")
    model = als.fit(ratings)

    # Print out ratings from user 0:
    print("\nRatings for user ID 0:")
    userRatings = ratings.filter("userID = 0")
    for rating in userRatings.collect():
        print movieNames[rating['movieID']], rating['rating']

    print("\nTop 20 recommendations:")
    # Find movies rated more than 100 times
    ratingCounts = ratings.groupBy("movieID").count().filter("count > 100")
    # Construct a "test" dataframe for user 0 with every movie rated more than 100 times
    popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(0))

    # Run our model on that list of popular movies for user ID 0
    recommendations = model.transform(popularMovies)

    # Get the top 20 movies with the highest predicted rating for this user
    topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)

    for recommendation in topRecommendations:
        print (movieNames[recommendation['movieID']], recommendation['prediction'])

    spark.stop()
  • spark.read.text()
    • return DataSet[row] or DataFrame
  •  sc(spark.SparkContext).textfile()
    • return RDD
🤔
RDD로 출력한 후 -> 행객체로 전환(row) -> DataFrame으로 전환해주는 이유는?
spark.read.text()로 하면 바로 DataFrame으로 출력해주는데 이를 바로 사용안하고 굳이 RDD로 전환한후에,
RDD의 map 연산을 거친후에 다시 DataFrame으로 전환해주는 이유는? 

-> 사용목적에 따라서 변환해주며 사용한다.  DataFrame 연산에 한계가 있는 경우 UDF를 사용하기 조금 그럴때, RDD로 변환해서 연산 사용

 

34강-35강.개인 실습

-평균 평점이 가장 낮은 영화를 평점의 개수로 걸러내기

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2]))

if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

    # Load up our movie ID -> name dictionary
    movieNames = loadMovieNames()

    # Get the raw data
    lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
    # Convert it to a RDD of Row objects with (movieID, rating)
    movies = lines.map(parseInput)
    # Convert that to a DataFrame
    movieDataset = spark.createDataFrame(movies)

    # Compute average rating for each movieID
    averageRatings = movieDataset.groupBy("movieID").avg("rating")

    # Compute count of ratings for each movieID
    counts = movieDataset.groupBy("movieID").count()

    # Join the two together (We now have movieID, avg(rating), and count columns)
    averagesAndCounts = counts.join(averageRatings, "movieID")

    # Filter movies rated 10 or fewer times
    popularAveragesAndCounts = averagesAndCounts.filter("count > 10")

    # Pull the top 10 results
    topTen = popularAveragesAndCounts.orderBy("avg(rating)").take(10)

    # Print them out, converting movie ID's to names as we go.
    for movie in topTen:
        print (movieNames[movie[0]], movie[1], movie[2])

    # Stop the session
    spark.stop()

 


참고자료

728x90