Spark Dataset Casting with `as` Method

Silent errors when casting a Spark Dataset using the `as` method

Apache Spark has a lot of convenience methods for managing Datasets. In fact, users are encouraged to use Datasets instead of DataFrames, as they benefit not only from the Catalyst optimizer, but also from performant in-memory encoding with Tungsten, as well as from compile-time type safety. But casting Datasets with the as method can cause some painful trouble.
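To make the type-safety point concrete, here is a minimal sketch (the Ship class and the function names are illustrative, not part of the example that follows): with a typed Dataset, a misspelled field is caught at compile time, while the equivalent DataFrame query only fails at runtime.

import org.apache.spark.sql.{DataFrame, Dataset}

final case class Ship(id: Long, model: String)

// Typed access: a misspelled field name is a compile-time error.
def typedModels(ships: Dataset[Ship]): Dataset[String] = {
  import ships.sparkSession.implicits._
  ships.map(_.model) // ships.map(_.modle) would not compile
}

// Untyped access: the same typo ("modle") would only surface at runtime,
// as an AnalysisException.
def untypedModels(ships: DataFrame): DataFrame =
  ships.select("model")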

Let’s say we want to process data about starfighters. For that, we will start with two case classes: one for detailed information and one for a short description:

final case class StarfighterDetails(id: Long, model: String, clazz: String, manufacturer: String)
final case class StarfighterShortDesc(id: Long, model: String)

Now let’s create a Dataset populated with some starfighters’ details and see what it looks like:

import org.apache.spark.sql.Dataset
import spark.implicits._ // assumes `spark` is the active SparkSession; needed for toDS()

val details: Dataset[StarfighterDetails] = Seq(
    StarfighterDetails(1, "T-65 X-wing", "Assault starfighter", "Incom Corporation"),
    StarfighterDetails(2, "Kuat RZ-1 A-wing", "Interception starfighter", "Kuat Systems Engineering"),
    StarfighterDetails(3, "Koensayr BTL Y-wing", "Bomber", "Koensayr Manufacturing")
).toDS()

details.show(false)
+---+-------------------+------------------------+------------------------+
|id |model              |clazz                   |manufacturer            |
+---+-------------------+------------------------+------------------------+
|1  |T-65 X-wing        |Assault starfighter     |Incom Corporation       |
|2  |Kuat RZ-1 A-wing   |Interception starfighter|Kuat Systems Engineering|
|3  |Koensayr BTL Y-wing|Bomber                  |Koensayr Manufacturing  |
+---+-------------------+------------------------+------------------------+

So far so good. Now let’s try to drop the superfluous information by casting the Dataset to Dataset[StarfighterShortDesc]. We expect to get a 2-column Dataset, with only the id and model columns preserved. According to the documentation of the Dataset.as[U] method:

Returns a new Dataset where each record has been mapped on to the specified type. The method used to map columns depend on the type of U:

  • When U is a class, fields for the class will be mapped to columns of the same name (…)

Seems perfect, here we go…

val shortDesc = details.as[StarfighterShortDesc]
shortDesc.show(false)
+---+-------------------+------------------------+------------------------+
|id |model              |clazz                   |manufacturer            |
+---+-------------------+------------------------+------------------------+
|1  |T-65 X-wing        |Assault starfighter     |Incom Corporation       |
|2  |Kuat RZ-1 A-wing   |Interception starfighter|Kuat Systems Engineering|
|3  |Koensayr BTL Y-wing|Bomber                  |Koensayr Manufacturing  |
+---+-------------------+------------------------+------------------------+

We get the same result, although the Dataset’s type parameter has changed!

How’s that possible? Well, it turns out that the cast is performed lazily: only the view of the data is changed, without any actual transformation. Had we RTFM’d to the end, we would have found this:

Note that as[] only changes the view of the data that is passed into typed operations, such as map(), and does not eagerly project away any columns that are not present in the specified class.
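We can verify this in our session: printing the schema of the cast Dataset still shows all four columns, because no projection has happened yet:

shortDesc.printSchema()
root
 |-- id: long (nullable = false)
 |-- model: string (nullable = true)
 |-- clazz: string (nullable = true)
 |-- manufacturer: string (nullable = true)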

Interestingly, all we need to do to have the Dataset populated with the proper objects is to map it with the identity function, like this:

val mapped = shortDesc.map(s => s)
// or equivalently:
val mapped = shortDesc.map(identity)
mapped.show(false)
+---+-------------------+
|id |model              |
+---+-------------------+
|1  |T-65 X-wing        |
|2  |Kuat RZ-1 A-wing   |
|3  |Koensayr BTL Y-wing|
+---+-------------------+
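Note that map(identity) forces a full deserialize/serialize round trip through the JVM objects. If that feels wasteful, an untyped select achieves the same projection without it (a sketch using the same classes):

// Project the columns explicitly, then re-apply the typed view:
val projected = shortDesc.select("id", "model").as[StarfighterShortDesc]
projected.show(false) // same 2-column result as above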

But there’s more fun: not every operation on the Dataset will update its internal structure. E.g. filtering will eliminate some rows according to the predicate, but it won’t touch the data structure:

val filtered = shortDesc.filter(s => s.id < 3)
filtered.show(false)
+---+----------------+------------------------+------------------------+
|id |model           |clazz                   |manufacturer            |
+---+----------------+------------------------+------------------------+
|1  |T-65 X-wing     |Assault starfighter     |Incom Corporation       |
|2  |Kuat RZ-1 A-wing|Interception starfighter|Kuat Systems Engineering|
+---+----------------+------------------------+------------------------+
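The difference is visible in the query plans: map forces a serialization step that emits only the fields of StarfighterShortDesc, while filter merely wraps the existing output. A quick way to inspect this in the same session:

mapped.explain()   // the physical plan contains a SerializeFromObject step producing only id and model
filtered.explain() // the plan keeps all four columns in its output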

The same goes for saving the Dataset to a file:

shortDesc.coalesce(1).write.parquet("shortDesc")

When we look into the output Parquet file, we can see the schema stored in its footer metadata. It’s the one for StarfighterDetails, not for StarfighterShortDesc:

{
  "type": "struct",
  "fields": [
    {
      "name": "id",
      "type": "long",
      "nullable": false,
      "metadata": {}
    },
    {
      "name": "model",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "clazz",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "manufacturer",
      "type": "string",
      "nullable": true,
      "metadata": {}
    }
  ]
}
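Reading the files back confirms it:

spark.read.parquet("shortDesc").printSchema() // prints all four StarfighterDetails columns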

A silent failure case

Let’s now analyze a simple case where we take the data that has been cast and save it to a Hive table. Will that reveal any errors?

First, we create a schema of type StructType from the StarfighterShortDesc case class, since that is what we intend to save. Then we create a Hive table named starfighters using that schema, choosing Avro as the file format. Just to check that everything is in place, we can look at the description of the table.

import org.apache.spark.sql.Encoder // the implicit Encoder instance itself comes from spark.implicits._

val schema = implicitly[Encoder[StarfighterShortDesc]].schema
spark.catalog.createTable("starfighters", "avro", schema, Map("path" -> "starfighters"))
spark.sql("describe formatted starfighters").show(false)
+----------------------------+---------------------------------------------------------------+-------+
|col_name                    |data_type                                                      |comment|
+----------------------------+---------------------------------------------------------------+-------+
|id                          |bigint                                                         |null   |
|model                       |string                                                         |null   |
|                            |                                                               |       |
|# Detailed Table Information|                                                               |       |
|Database                    |default                                                        |       |
|Table                       |starfighters                                                   |       |
|Owner                       |dawid                                                          |       |
|Created Time                |Sat May 16 17:16:50 CEST 2020                                  |       |
|Last Access                 |Thu Jan 01 01:00:00 CET 1970                                   |       |
|Created By                  |Spark 2.4.5                                                    |       |
|Type                        |EXTERNAL                                                       |       |
|Provider                    |avro                                                           |       |
|Table Properties            |[transient_lastDdlTime=1589642210]                             |       |
|Location                    |file:/home/dawid/src/blog-related/spark-playground/starfighters|       |
|Serde Library               |org.apache.hadoop.hive.serde2.avro.AvroSerDe                   |       |
|InputFormat                 |org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat     |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat    |       |
|Storage Properties          |[serialization.format=1]                                       |       |
+----------------------------+---------------------------------------------------------------+-------+
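A side note: in Spark 2.4.x, Avro support lives in the external spark-avro module, so the "avro" provider used above assumes that module is on the classpath, for example (coordinates for Scala 2.11 and Spark 2.4.5):

spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5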

NOTE: if you experience problems like this:

access denied org.apache.derby.security.SystemPermission("engine", "usederbyinternals")

try adding to your code:

System.setSecurityManager(null)

Please see this for more details.


Seems perfect! Now let’s save something to the table. We take the shortDesc Dataset, which was previously cast using the as method, and save it to the table’s location. Then we analyze what’s in the table.

import org.apache.spark.sql.SaveMode

val shortDesc = details.as[StarfighterShortDesc]
shortDesc.write.mode(SaveMode.Overwrite).format("avro").save("starfighters")
spark.table("starfighters").show(false)
+---+-------------------+
|id |model              |
+---+-------------------+
|2  |Kuat RZ-1 A-wing   |
|1  |T-65 X-wing        |
|3  |Koensayr BTL Y-wing|
+---+-------------------+

So far so good! But what has really been saved in the Avro files? Let’s inspect one of them. We can use avro-tools for that.

java -jar avro-tools-1.9.2.jar tojson part-00000-80afdf8a-5a0a-44d8-b93b-d3aa965d2d4d-c000.avro
{
  "id": 1,
  "model": {
    "string": "T-65 X-wing"
  },
  "clazz": {
    "string": "Assault starfighter"
  },
  "manufacturer": {
    "string": "Incom Corporation"
  }
}

We have all the unwanted data there, which Hive just ignores!
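The remedy is the same as before: force an actual projection before writing, for example by mapping with identity. After that, the Avro files contain only the id and model fields:

shortDesc.map(identity).write.mode(SaveMode.Overwrite).format("avro").save("starfighters")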

This way we can silently burden our cluster with unnecessary, unused data, which can turn out to be many times bigger than the data actually visible through Hive. This kind of silent error may result in significant costs, both in storage and in performance, since processing much bigger files has its penalty.
