spark-data

こんにちはmikotoです。

Apache Sparkの紹介記事に引き続きまして、今回はデータ形式の部分について、深掘りしてみたいと思います。

Sparkは基本的な分析・データ加工・バッチ処理を行う場合、以下の2つのデータ形式と、それらの基本的な処理を理解していれば利用できます。

  • RDD
  • DataFrame

ということで、今回はRDDとDataFrameのご紹介と、そのオペレーション方法についてご紹介したいと思います。

RDD: Resilient Distributed Dataset

RDDの基本コンセプトは、フォールトトレランスで並列処理可能なデータ形式です。
RDDクラスに実装された mapreduce メソッドを利用することで、自動的に並列処理されます。

RDD形式のデータを作成する方法

RDD形式のデータを作成するには、以下2つの方法があります。
scSparkContext

1. Collectionデータを並列化する

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

2. 外部データソースからロードする

ローカルファイル

val distFile = sc.textFile("data.txt")

S3

val distFile = sc.textFile("s3n://your-bucket/data.txt")

RDDの基本操作

map & reduce だけ抑えておけば基本的なことはできるかと思います。
もう少し凝ったことを行いたい場合は、リファレンスを参照してください。

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

DataFrame

DataFrameは、Spark SQLで利用されるデータ形式です。
RDDとは異なり、型などの情報を与えることで、より最適化された計算を実現します。
また、基本的なSQL操作を供えており、一般的なDataFrameを利用したプログラミングと同じような形で記述することは当然ながら、SQL形式でも記述することができます。

DataFrame形式のデータを作成する方法

DataFrameは、構造化データファイル、HiveやRDBなどの外部DB、RDDから構築することができます。
今回はRDDからDataFrameを構築してみます。

RDDからDataFrameへの変換

RDDからDataFrameに変換するために、createDataFrameメソッドを利用する方法があります。
createDataFrameStructTypeを引数に取ることで、型情報を与えることができます。

引数違いで同名のメソッドが幾つか用意されていますが、今回は下記を利用します。

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

また、Rowという新しい型が登場しますが、その名の通り1行のデータを格納するために利用します。
Rowは可変長の引数を取るため、Row(data1, data2)のように利用できます。
下記では、テキストファイルをロードしたRDDに対し、mapにより各行を文字列から,で分割したリストに変換した後、0番目と1番目の要素からRowを構築しています。

import org.apache.spark.sql._
import org.apache.spark.sql.types._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

val people =
  sc.textFile("examples/src/main/resources/people.txt").map(
    _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val df = sqlContext.createDataFrame(people, schema)

Case Classを利用したRDDからDataFrameへの変換

上記では、型情報を与えるために、StructTypeを構築する必要がありました。
しかし、要素がCase Classである場合は、型情報が明確であるため、暗黙的な型変換を利用することで、改めてStructTypeを利用して型定義を行う必要はありません。

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

case class Person(name:String, age:Int)

// define data
val peopleData = List(
    Person("taro", 21),
    Person("hanako", 20)
)

// to RDD
val people = sc.parallelize(peopleData)

// to DF
val df = people.toDF()

DataFrameの基本操作

DataFrameの基本操作は、基本的なSQLに加え、map,reduceなどの並列処理可能な操作があります。

SQL

// Creates a temporary view using the DataFrame
df.createOrReplaceTempView("mytable")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT * FROM mytable")

// show results
results.show()

一般的なDataFrame処理

// This import is needed to use the $-notation
import spark.implicits._

// Print the schema in a tree format
df.printSchema()

// Select only the "name" column
df.select("name")

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1)

// Select people older than 21
df.filter($"age" > 21)

// Count people by age
df.groupBy("age").count()

終わりに

今回は既に手元にまとまったデータがある場合に活用できるSparkの基本的なテクニックを紹介しました。
もちろん、同じことはHadoopでも行うことができますが、Spark Shellを利用してインタラクティブに計算結果を確認できることがSparkの強みのひとつです。
まずは簡単なデータ分析やデータ加工に利用してみるとSparkのメリットを感じられるかもしれません。