【PySpark】Amazon S3 の JSON を DataFrame で読み込む

今回は PySpark で Amazon S3 の JSON を DataFrame で読み込む Tips です。環境は macOS 10.13.5, Apache Spark 2.3.0 です。

S3 の JSON を DataFrame で読み込む

Amazon S3 に置いてある以下のような JSON を DataFrame として読み込みたい。

$ ls *.json
alice.json  bob.json    eve.json
$ cat *.json
{"name": "Alice", "age": 30, "gender": "F"}
{"name": "Bob", "age": 25,"gender": "M"}
{"name": "Eve", "age": 20, "gender": "F"}

S3 への connector は以下があるが,  S3AFileSystem がデファクトスタンダードで残りの2つは deprecated となっている。

  • S3FileSystem (url prefix: “s3”)
  • S3 Native FileSystem (url prefix: “s3n”)
  • S3AFileSystem (url prefix: “s3a”)

まず, 以下のような s3.properties を書く。

spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key  [ACCESS_KEY]
spark.hadoop.fs.s3a.secret.key  [SECRET_KEY]

hadoop-aws を使うが, 上記の設定で spark.driver.extraClassPath に jar の path を追加するとコマンド実行時に DL しないで済む。

今回は pyspark の packages オプションに org.apache.hadoop:hadoop-aws:2.7.3, properties-file オプションに s3.properties を指定する。

spark.read.json() で s3a://$(bucket-name)/$(object-name) で読みこめ DataFrame が得られる。

$ export PYSPARK_DRIVER_PYTHON=ipython
$ pyspark --packages org.apache.hadoop:hadoop-aws:2.7.3 --properties-file s3.properties
Python 2.7.11 |Anaconda custom (x86_64)| (default, Jun 15 2016, 16:09:16)
Type "copyright", "credits" or "license" for more information.

IPython 4.2.0 -- An enhanced Interactive Python.

...

2018-06-24 16:18:38 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Python version 2.7.11 (default, Jun 15 2016 16:09:16)
SparkSession available as 'spark'.

In [1]: from pyspark.sql.types import *

In [2]: fields = [StructField('name', StringType(), True),
          StructField('age', IntegerType(), True),
          StructField('gender', StringType(), True)]

In [3]: df = spark.read.json("s3a://t2sy.example/*.json", schema=StructType(fields))

In [4]: df.describe
Out[4]: <bound method DataFrame.describe of DataFrame[name: string, age: int, gender: string]>

In [5]: df.show()
+-----+---+------+
| name|age|gender|
+-----+---+------+
|Alice| 30|     F|
|  Eve| 20|     F|
|  Bob| 25|     M|
+-----+---+------+

AWS_ACCESS_KEY_ID と AWS_SECRET_ACCESS_KEY を設定ファイルからでなく, 実行時に sc._jsc.hadoopConfiguration().set() で読み込むこともできる。

$ pyspark --packages org.apache.hadoop:hadoop-aws:2.7.3
Python 2.7.11 |Anaconda custom (x86_64)| (default, Jun 15 2016, 16:09:16)
Type "copyright", "credits" or "license" for more information.

IPython 4.2.0 -- An enhanced Interactive Python.

2018-06-24 16:48:19 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Python version 2.7.11 (default, Jun 15 2016 16:09:16)
SparkSession available as 'spark'.

In [1]: from pyspark.sql.types import *

In [2]: sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", [ACCESS_KEY])

In [3]: sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", [SECRET_KEY])

In [4]: fields = [StructField('name', StringType(), True),
          StructField('age', IntegerType(), True),
          StructField('gender', StringType(), True)]

In [5]: df = spark.read.json("s3a://t2sy.example/*.json", schema=StructType(fields))
2018-06-24 16:50:27 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException

In [6]: df.show()
+-----+---+------+
| name|age|gender|
+-----+---+------+
|Alice| 30|     F|
|  Eve| 20|     F|
|  Bob| 25|     M|
+-----+---+------+

Boto 3 を使う方法も試したが hadoop-aws の方が簡単だった。もちろん Scala (spark-shell) でも同様の方法で読みこめる。

S3 に RDD を書き込む

DataFrame を RDD に変換して saveAsTextFile() で S3 に書き込む。

In [7]: df_age = df.select("age").summary()

In [8]: df_age.show()
+-------+----+
|summary| age|
+-------+----+
|  count|   3|
|   mean|25.0|
| stddev| 5.0|
|    min|  20|
|    25%|  20|
|    50%|  25|
|    75%|  30|
|    max|  30|
+-------+----+

In [9]: df_age.rdd.saveAsTextFile("s3a://t2sy.example/df_age")

[1] Manipulating files from S3 with Apache Spark
[2] Difference in boto3 between resource, client, and session?