Processing Large JSON Datasets with Spark SQL
Spark SQL offers built-in support for a number of data formats, such as JSON, Parquet, XML, CSV, and text files.
Each new Spark release contains various API improvements that make working with JSON data more convenient and efficient.
JavaScript Object Notation (JSON) is a very simple, human-readable format that is convenient to use.
But its simplicity can lead to problems, since it is schema-less.
This is particularly true when you have to deal with unreliable third-party data sources. Such services can return insane JSON responses containing integer numbers as strings, or encode nulls in different ways, such as null, "", or even "null".
The examples below assume you have access to a Spark tool such as spark-shell to execute the commands.
Loading data
You can read JSON directly from a file path, such as an HDFS or S3 bucket path, into a DataFrame:
Example for S3
val df = spark.read.json("s3a://datastore/files/json/some-file.json")
Example for HDFS
val df = spark.read.json("/datastore/files/json/some-file.json")
Note that Spark assumes each line is a separate JSON object, so it will fail if you try to load a pretty-printed JSON file.
In that case, you can use the multiline option to avoid the corrupt record problem while reading JSON files.
val df = spark.read.option("multiline", "true").json("s3a://datastore/files/json/some-file.json")
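To see why a pretty-printed file trips up a line-oriented reader, here is a small plain-Python sketch using only the standard library. It is not Spark's parser, and the helper name parse_json_lines is hypothetical. It treats every line as a separate JSON document, as the default reader does, and collects the lines that fail to parse.

```python
import json


def parse_json_lines(text):
    """Parse each non-empty line as its own JSON document.

    Returns (records, corrupt_lines). Illustrative only; this is not
    how Spark is implemented.
    """
    records, corrupt = [], []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            corrupt.append(line)
    return records, corrupt


# One JSON object per line: every line parses on its own.
compact = '{"action":"create"}\n{"action":"delete"}'

# The same kind of object, pretty-printed across three lines.
pretty = '{\n  "action": "create"\n}'

print(parse_json_lines(compact))
print(parse_json_lines(pretty))

# Parsing the whole text as one document works, which is roughly
# the idea behind the multiline option.
print(json.loads(pretty))
```

In the pretty-printed case no single line is valid JSON on its own, so the line-by-line pass yields no records, while parsing the text as a whole succeeds.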
You can also read JSON data from an RDD[String] object:
Example 1
// construct RDD[String]
val events = sc.parallelize("""{"action":"create","timestamp":"2020-10-13T00:01:17Z"}""" :: Nil)
// read it
val df = spark.read.json(events)
Example 2
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val peopleDataset = spark.createDataset(
"""{"name":"Vijay","address":{"city":"Bengaluru","state":"Karnataka"}}"""
:: Nil)
val peopleDf = spark.read.json(peopleDataset)
Schema Inference and Explicit Schema definition
Calling spark.read.json(events) will not load the data, since DataFrames are evaluated lazily.
It will, however, trigger schema inference: Spark goes over the RDD to determine a schema that fits the data.
In the spark-shell you can print the schema using the printSchema method:
scala> df.printSchema()
root
|-- action: string (nullable = true)
|-- timestamp: string (nullable = true)
As you saw in the last example, Spark inferred the type of both columns as strings by default. It infers struct and array types from the data in the same way.
It is possible to provide the schema explicitly to avoid that extra scan, which helps improve read performance for larger files:
import org.apache.spark.sql.types._ // StructType, StringType, TimestampType, LongType
val schema = (new StructType).add("action", StringType).add("timestamp", TimestampType)
val df = spark.read.schema(schema).json(events)
df.show(false)
// +------+--------------------+
// |action| timestamp|
// +------+--------------------+
// |create|2020-10-13 00:01:...|
// +------+--------------------+
// (the displayed hour depends on the session time zone; UTC is assumed here)
As you might have noticed, the type of the timestamp column is explicitly forced to be a TimestampType.
It’s important to understand that this type coercion is performed in the JSON parser, and it has nothing to do with the DataFrame’s type casting functionality.
The type coercions implemented in the parser are somewhat limited and in some cases not obvious.
The following example demonstrates this:
val events = sc.parallelize(
"""{"action":"create","timestamp":1452121277}""" ::
"""{"action":"create","timestamp":"1452121277"}""" ::
"""{"action":"create","timestamp":""}""" ::
"""{"action":"create","timestamp":null}""" ::
"""{"action":"create","timestamp":"null"}""" ::
Nil
)
val schema = (new StructType).add("action", StringType).add("timestamp", LongType)
spark.read.schema(schema).json(events).show(false)
// +------+----------+
// |action| timestamp|
// +------+----------+
// |create|1452121277|
// | null| null|
// |create| null|
// |create| null|
// | null| null|
// +------+----------+
Frankly, that is not the result one would expect.
Look at the 2nd row in the result set: as you can see, there is no conversion from string to integer.
But there is one more big problem: if you set a type for which the parser has no conversion, it won’t simply discard the value and set that field to null. Instead, it will consider the entire row incorrect and set all fields to null.
The good news is that you can read all values as strings.
It is generally safer to read the JSON file as-is, with untrusted columns as StringType first, and then apply type casting. This avoids the data loss caused by the parser’s direct conversion shown above.
Example of Type Casting
If you can’t be sure of the quality of your data, the best option is to explicitly provide a schema that forces StringType for all untrusted fields, which also avoids the extra RDD scan, and then cast those columns to the desired type:
val schema = (new StructType).add("action", StringType).add("timestamp", StringType)
spark.read.schema(schema).json(events).select($"action", $"timestamp".cast(LongType)).show(false)
// +------+----------+
// |action| timestamp|
// +------+----------+
// |create|1452121277|
// |create|1452121277|
// |create| null|
// |create| null|
// |create| null|
// +------+----------+
Now that’s more like a sane result.
Spark’s Catalyst optimizer has very powerful type casting functionality. Let’s see how we can parse UNIX timestamps from the previous example:
val schema = (new StructType).add("action", StringType).add("timestamp", StringType)
spark.read.schema(schema).json(events).select($"action", $"timestamp".cast(LongType).cast(TimestampType)).show(false)
// +------+--------------------+
// |action| timestamp|
// +------+--------------------+
// |create|2016-01-07 00:01:...|
// |create|2016-01-07 00:01:...|
// |create| null|
// |create| null|
// |create| null|
// +------+--------------------+
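The read-as-string-then-cast idea above can be mimicked in plain Python to make the per-value behaviour explicit. This is only a sketch under stated assumptions: to_long and to_utc_timestamp are hypothetical helpers, not Spark functions, and they only approximate what a cast does for the five sample records used in this section.

```python
import json
from datetime import datetime, timezone

# The same five records as in the Spark example above.
RAW_EVENTS = [
    '{"action":"create","timestamp":1452121277}',
    '{"action":"create","timestamp":"1452121277"}',
    '{"action":"create","timestamp":""}',
    '{"action":"create","timestamp":null}',
    '{"action":"create","timestamp":"null"}',
]


def to_long(value):
    """Return an int, or None when the value cannot be read as an integer."""
    if value is None or isinstance(value, bool):
        return None
    try:
        return int(str(value).strip())
    except ValueError:
        return None


def to_utc_timestamp(seconds):
    """Interpret epoch seconds as a UTC datetime; None stays None."""
    if seconds is None:
        return None
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


# A bad value only nulls out its own field; the rest of the record survives.
longs = [to_long(json.loads(line)["timestamp"]) for line in RAW_EVENTS]
print(longs)  # [1452121277, 1452121277, None, None, None]
print([to_utc_timestamp(v) for v in longs])
```

In UTC, 1452121277 corresponds to 2016-01-06 23:01:17. The table above shows 2016-01-07 00:01, which would be consistent with a session time zone one hour ahead of UTC.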
Handling nested objects
Often in API responses, useful data might be wrapped in several layers of nested objects:
{
"payload": {
"event": {
"action": "create",
"timestamp": 1452121277
}
}
}
Star (*) expansion makes it easier to unnest such objects, for example:
val vals = spark.createDataset(
"""{"payload":{"event":{"action":"create","timestamp":1452121277}}}"""
:: Nil)
val schema = (new StructType)
.add("payload", (new StructType)
.add("event", (new StructType)
.add("action", StringType)
.add("timestamp", LongType)
)
)
spark.read.schema(schema).json(vals).select($"payload.event.*").show(false)
// +------+----------+
// |action| timestamp|
// +------+----------+
// |create|1452121277|
// +------+----------+
If you need more control over column names, you can always use the as method to rename columns, e.g.:
spark.read.schema(schema).json(vals)
.select(
$"payload.event.action".as("event_action"),
$"payload.event.timestamp".as("event_timestamp")
).show(false)
// +------------+---------------+
// |event_action|event_timestamp|
// +------------+---------------+
// | create| 1452121277|
// +------------+---------------+
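For readers who want to see what the two selections above do to a single record, here is a plain-Python sketch. The helpers select_path and expand_star are hypothetical names for illustration; they operate on ordinary dicts and do not reflect how Spark resolves columns internally.

```python
import json


def select_path(record, path):
    """Follow a dotted path such as "payload.event" into nested dicts."""
    current = record
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def expand_star(record, path, prefix=""):
    """Mimic selecting "<path>.*": one flat column per field of the struct."""
    struct = select_path(record, path)
    if not isinstance(struct, dict):
        return {}
    return {prefix + name: value for name, value in struct.items()}


raw = '{"payload":{"event":{"action":"create","timestamp":1452121277}}}'
record = json.loads(raw)

# Equivalent in spirit to select($"payload.event.*")
print(expand_star(record, "payload.event"))

# Equivalent in spirit to renaming each column with a prefix
print(expand_star(record, "payload.event", prefix="event_"))
```

The first call yields the columns action and timestamp, the second yields event_action and event_timestamp, matching the column names in the two tables above.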
Converting Large JSON files to Parquet
Source - https://garrens.com/blog/2017/10/09/spark-file-format-showdown-csv-vs-json-vs-parquet/
Source - https://garrens.com/blog/2017/06/26/real-time-big-data-analytics-parquet-and-spark-bonus/
Parquet is optimized for the Write Once Read Many (WORM) paradigm.
It’s slow to write, but incredibly fast to read, especially when you’re only accessing a subset of the total columns.
It is a compressible binary columnar data format used in the Hadoop ecosystem.
- Binary means Parquet files cannot be opened natively by typical text editors (Sublime Text, vim, etc.).
- Columnar means the data is stored as columns instead of rows, unlike most traditional databases (MySQL, PostgreSQL, etc.) and file formats (CSV, JSON, etc.).
- Compressed means the file footprint on disk (HDFS, S3, or local filesystem) is smaller than a typical raw uncompressed file. Parquet handles compression differently than traditional compression of a CSV file, for example, but in a similar vein to Avro.
- Use snappy compression if storage space is not a concern, since it is splittable. For much better compression at what should be a relatively small performance cost, use gzip.
For use cases that require operating on entire rows of data, a format like CSV, JSON, or even Avro should be used.
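The difference between row and columnar layout can be pictured with a few lines of plain Python. This is a toy model with made-up sample records, not Parquet's on-disk format; to_columns is a hypothetical helper that pivots rows into per-column lists.

```python
# Hypothetical sample records; the field names are illustrative.
rows = [
    {"guid": "g-1", "name": "alpha", "epoch_date": 1452121277},
    {"guid": "g-2", "name": "beta", "epoch_date": 1452121300},
    {"guid": "g-3", "name": "gamma", "epoch_date": 1452121250},
]


def to_columns(rows):
    """Pivot a list of row dicts into a dict of column lists.

    Assumes every row has the same fields.
    """
    columns = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


columns = to_columns(rows)

# Row layout: every full record is visited just to read one field.
latest_from_rows = max(row["epoch_date"] for row in rows)

# Columnar layout: only the one column that is needed is visited.
latest_from_columns = max(columns["epoch_date"])

print(latest_from_rows, latest_from_columns)
```

Both computations give the same answer. The point is that the columnar version never touches guid or name, which is why reading a subset of columns is cheap in a columnar format.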
JSON
Named columns | Inferred types | Eagerly evaluated
val df = spark.read.json("data.json")
df.printSchema
root
|-- alphanum: string (nullable = true)
|-- epoch_date: long (nullable = true)
|-- guid: string (nullable = true)
|-- name: string (nullable = true)
As with CSV, reading JSON files is eagerly evaluated for schema inference.
Parquet
Named columns | Defined types | Lazily evaluated
val df = spark.read.parquet("data.parquet")
df.printSchema
root
|-- alphanum: string (nullable = true)
|-- date: long (nullable = true)
|-- guid: string (nullable = true)
|-- name: string (nullable = true)
Unlike CSV and JSON, Parquet files are binary files that contain metadata about their contents.
Without needing to read and parse the content of the file(s), Spark can rely on the metadata stored in each Parquet file to determine column names and data types.
Use Apache Parquet instead of CSV or JSON whenever possible, because it is faster to read and carries its own schema.
How does Parquet help in processing large JSON files?
Reading row-oriented JSON files directly and processing them through DataFrames has many limitations and drawbacks.
JSON is slow to parse because it requires reading all of the rows in the entire file and parsing each line’s columns.
If those limitations had you cringing, I’ve made my case well :). There is an alternative that utilizes SParquet:
- Process the JSON files into Parquet files (snappy or gzip-compressed)
- Use Spark with those Parquet files to drive a powerful and scalable analytics solution
Parquet outperforms JSON for reading, filtering, and aggregating (group by).
Use the right compression for files
The files we deal with can be divided into two types:
- Splittable (e.g. lzo, bzip2, snappy)
- Non-splittable (e.g. gzip, zip, lz4)
For discussion purposes, “splittable” means that a file can be processed in parallel in a distributed manner, whereas a non-splittable file has to be processed on a single machine.
Key Takeaways
The larger the dataset, the longer you wait. Even if you need only the first record from a file, Spark (by default) reads its whole content to create a valid schema consisting of the superset of fields used and their types. Here are a few simple hints for improving the process.
- Read large JSON and save it in Parquet file format with snappy compression for faster execution, data validation, and quick metrics.
- For larger JSON inputs, read with samplingRatio = 0.1 so that schema inference scans only a sample of the records. If you cannot afford to miss any field, or the schema is not static, infer from the full dataset instead. In PySpark:
# JSON
spark.read.options(samplingRatio=0.1).json("sample/json/")
- If the schema is known and static, it is better to read large JSON files with an explicit schema. That way the parser won’t spend time analyzing dynamic content to assign data types while creating DataFrames.
- It is better to store the static schema as JSON in an external file and read the schema into a variable before reading large JSON files (PySpark):
import json
from pyspark.sql.types import StructType
schema_json = spark.read.text("/.../sample.schema").first()[0]
schema = StructType.fromJson(json.loads(schema_json))
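The sampling trade-off in the takeaways above can be illustrated without Spark. The sketch below is plain Python with a hypothetical infer_schema helper and made-up records. It builds a "schema" as the union of field names seen, optionally over a random sample, which is the general idea behind a sampling ratio but not Spark's actual inference code.

```python
import json
import random


def infer_schema(lines, sampling_ratio=1.0, seed=0):
    """Map each field name to the set of Python type names seen.

    Only records that fall into the random sample are inspected.
    """
    rng = random.Random(seed)
    schema = {}
    for line in lines:
        if sampling_ratio < 1.0 and rng.random() >= sampling_ratio:
            continue  # record is not part of the sample
        for name, value in json.loads(line).items():
            schema.setdefault(name, set()).add(type(value).__name__)
    return schema


# 1000 made-up records; exactly one of them carries an extra field.
lines = ['{"id": %d, "name": "user%d"}' % (i, i) for i in range(1000)]
lines[500] = '{"id": 500, "name": "user500", "referrer": "rare"}'

full = infer_schema(lines)
sampled = infer_schema(lines, sampling_ratio=0.1)

print(sorted(full))     # full scan always sees the rare field
print(sorted(sampled))  # a 10% sample may or may not see it
```

A full scan is guaranteed to find the referrer field, while a 10% sample only finds it if that one record happens to be sampled. That is the reason to fall back to the full dataset, or to an explicit schema, when no field may be missed.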