今回は PySpark で Amazon S3 の JSON を DataFrame で読み込む Tips です。環境は macOS 10.13.5, Apache Spark 2.3.0 です。
S3 の JSON を DataFrame で読み込む
Amazon S3 に置いてある以下のような JSON を DataFrame として読み込みたい。
$ ls *.json
alice.json bob.json eve.json
$ cat *.json
{"name": "Alice", "age": 30, "gender": "F"}
{"name": "Bob", "age": 25,"gender": "M"}
{"name": "Eve", "age": 20, "gender": "F"}
S3 への connector は以下があるが, S3AFileSystem がデファクトスタンダードで残りの2つは deprecated となっている。
- S3FileSystem (url prefix: “s3”)
- S3 Native FileSystem (url prefix: “s3n”)
- S3AFileSystem (url prefix: “s3a”)
まず, 以下のような s3.properties を書く。
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.access.key [ACCESS_KEY]
spark.hadoop.fs.s3a.secret.key [SECRET_KEY]
hadoop-aws を使うが, 上記の設定で spark.driver.extraClassPath に jar の path を追加するとコマンド実行時に DL しないで済む。
今回は pyspark の packages オプションに org.apache.hadoop:hadoop-aws:2.7.3, properties-file オプションに s3.properties を指定する。
spark.read.json() で s3a://$(bucket-name)/$(object-name) で読みこめ DataFrame が得られる。
$ export PYSPARK_DRIVER_PYTHON=ipython
$ pyspark --packages org.apache.hadoop:hadoop-aws:2.7.3 --properties-file s3.properties
Python 2.7.11 |Anaconda custom (x86_64)| (default, Jun 15 2016, 16:09:16)
Type "copyright", "credits" or "license" for more information.
IPython 4.2.0 -- An enhanced Interactive Python.
...
2018-06-24 16:18:38 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.0
/_/
Using Python version 2.7.11 (default, Jun 15 2016 16:09:16)
SparkSession available as 'spark'.
In [1]: from pyspark.sql.types import *
In [2]: fields = [StructField('name', StringType(), True),
StructField('age', IntegerType(), True),
StructField('gender', StringType(), True)]
In [3]: df = spark.read.json("s3a://t2sy.example/*.json", schema=StructType(fields))
In [4]: df.describe
Out[4]: <bound method DataFrame.describe of DataFrame[name: string, age: int, gender: string]>
In [5]: df.show()
+-----+---+------+
| name|age|gender|
+-----+---+------+
|Alice| 30| F|
| Eve| 20| F|
| Bob| 25| M|
+-----+---+------+
AWS_ACCESS_KEY_ID と AWS_SECRET_ACCESS_KEY を設定ファイルからでなく, 実行時に sc._jsc.hadoopConfiguration().set() で読み込むこともできる。
$ pyspark --packages org.apache.hadoop:hadoop-aws:2.7.3
Python 2.7.11 |Anaconda custom (x86_64)| (default, Jun 15 2016, 16:09:16)
Type "copyright", "credits" or "license" for more information.
IPython 4.2.0 -- An enhanced Interactive Python.
2018-06-24 16:48:19 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.0
/_/
Using Python version 2.7.11 (default, Jun 15 2016 16:09:16)
SparkSession available as 'spark'.
In [1]: from pyspark.sql.types import *
In [2]: sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", [ACCESS_KEY])
In [3]: sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", [SECRET_KEY])
In [4]: fields = [StructField('name', StringType(), True),
StructField('age', IntegerType(), True),
StructField('gender', StringType(), True)]
In [5]: df = spark.read.json("s3a://t2sy.example/*.json", schema=StructType(fields))
2018-06-24 16:50:27 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
In [6]: df.show()
+-----+---+------+
| name|age|gender|
+-----+---+------+
|Alice| 30| F|
| Eve| 20| F|
| Bob| 25| M|
+-----+---+------+
Boto 3 を使う方法も試したが hadoop-aws の方が簡単だった。もちろん Scala (spark-shell) でも同様の方法で読みこめる。
S3 に RDD を書き込む
DataFrame を RDD に変換して saveAsTextFile() で S3 に書き込む。
In [7]: df_age = df.select("age").summary()
In [8]: df_age.show()
+-------+----+
|summary| age|
+-------+----+
| count| 3|
| mean|25.0|
| stddev| 5.0|
| min| 20|
| 25%| 20|
| 50%| 25|
| 75%| 30|
| max| 30|
+-------+----+
In [9]: df_age.rdd.saveAsTextFile("s3a://t2sy.example/df_age")
[1] Manipulating files from S3 with Apache Spark
[2] Difference in boto3 between resource, client, and session?