Working with Spark Datasets has been quite interesting and, most of the time, rewarding in our current project. The API is simple yet powerful, and it spares us from hand-coding complex transformations and computations. To be honest, our use case is also fairly straightforward: a few domain entities and even fewer transformations, all based on simple joins.
However, a few things have also been counterproductive for us, and I am going to focus on one of them: the lack of type safety in some operations, particularly joins:
dataSetA.join(dataSetB, "columnA")
The above code will fail at runtime if dataSetA or dataSetB (or both) lacks a column named “columnA”. This wastes resources at multiple levels, from precious CPU cycles to developers’ time. In the remainder of this post, we will add compile-time safety to join operations and learn a lot in the process.
Before we proceed, a disclaimer: this is not an unsolved problem. Frameless does a fantastic job of providing type safety for Datasets. However, it is a very evolved and complete framework that provides a newer abstraction, TypedDataset, and we did not want to add an external dependency when all we wanted was type safety in a select few Dataset methods. The solution we are going to formulate is what Frameless does, which in turn leverages generic programming using the awesome Shapeless.
For any Dataset of type T (a case class, i.e. a Product type), we need to know all the properties of T along with their types. In other words, we want to move from a specialized T to a generalized list of properties with their types, and this, explained in a very simplified way, is what Shapeless provides: a conversion from a case class to a heterogeneous list (HList) and back, plus a bouquet of functions to apply to that list. The best material to read about Shapeless is …, and I strongly suggest giving it a thorough read.
For now, it is enough to know that Shapeless provides LabelledGeneric, which converts a case class into a labelled HList (a record whose keys are the field names) and back. For example:
import shapeless._
case class Person(name: String, age: Int, isEmployee: Boolean)
//defined class Person
val generic = LabelledGeneric[Person]
//generic: shapeless.LabelledGeneric[Person]{type Repr = shapeless.::[String with shapeless.labelled.KeyTag[Symbol with shapeless.tag.Tagged[String("name")],String],shapeless.::[Int with shapeless.labelled.KeyTag[Symbol with shapeless.tag.Tagged[String("age")],Int],shapeless.::[Boolean with shapeless.labelled.KeyTag[Symbol with shapeless.tag.Tagged[String("isEmployee")],Boolean],shapeless.HNil]]]}
//usage:
val person = Person("John Doe", 32, true)
val hlist = generic.to(person)
//hlist: generic.Repr = John Doe :: 32 :: true :: HNil
generic.from(hlist)
//res0: Person = Person(John Doe,32,true)
We now need a type class, PropertyExists[T, PName, PType], that captures the following: given a type T, if there exists a property named PName of type PType, then the conditions are satisfied.
(Embedded gist: //medium.com/media/1a1b7bfc9b765cc9cef244fc438d206c/href)
Let’s break down the gist line by line:
We define an apply method that accepts a Witness and implicitly requires an instance of PropertyExists for some PType. Witness is one of Shapeless’s utility abstractions: given a Symbol, it returns a handle to its singleton type and its value.
selector: Selector.Aux[H, PName, PType]
Selector is one of Shapeless’s simpler abstractions: it provides PType, provided it finds the property named PName in the record H.
In simpler terms, the implicitProvider says the following:
For a given type T, if you can derive an HList H from LabelledGeneric[T], and you can then select from H a property named PName with type PType, go ahead and provide a PropertyExists instance for T, PName and PType.
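As a rough sketch of the idea described above, assuming Shapeless 2.3.x on Scala 2.12 (symbol literals such as 'age are deprecated but still accepted on 2.13), a PropertyExists type class built from LabelledGeneric and Selector could look like this. ColumnsOf is a hypothetical, Spark-free stand-in for the RichDataset apply method: it returns the column name as a String, where the real code would presumably hand that name to Spark’s Dataset.col.

```scala
import scala.annotation.implicitNotFound
import shapeless.{HList, LabelledGeneric, Witness}
import shapeless.ops.record.Selector

// Evidence that case class T has a field named PName (a singleton Symbol type)
// whose value type is PType. Custom compile error: "<PName> not found in <T>".
@implicitNotFound("${PName} not found in ${T}")
trait PropertyExists[T, PName, PType]

object PropertyExists {
  // Derivable only when T has a labelled generic representation H (a record)
  // and Selector can find the key PName in H with value type PType.
  implicit def implicitProvider[T, H <: HList, PName, PType](implicit
      generic: LabelledGeneric.Aux[T, H],
      selector: Selector.Aux[H, PName, PType]
  ): PropertyExists[T, PName, PType] = new PropertyExists[T, PName, PType] {}
}

// Hypothetical stand-in for RichDataset[T]: apply only compiles when the
// Symbol names a real field of T; PType is inferred by the implicit search.
final class ColumnsOf[T] {
  def apply[PType](column: Witness.Lt[Symbol])(implicit
      exists: PropertyExists[T, column.T, PType]
  ): String = column.value.name
}

case class Person(name: String, age: Int, isEmployee: Boolean)
```

With this, (new ColumnsOf[Person])('age) compiles and returns "age", while a typo such as 'namesss is rejected during compilation because no PropertyExists instance can be derived for it.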
Using it looks like this:
val personDs = Seq(person).toDS().enriched
val ageColumn: Column = personDs('age) //compiles
val nameColumn: Column = personDs('namesss) //does not compile:
//Error:(36, 56) Symbol with shapeless.tag.Tagged[String("namesss")] not found in Person
And that is our first milestone!
PS: we need to expose enriched explicitly, as otherwise the compiler will pick the apply method of Dataset rather than that of RichDataset.
Next up: joins. The DSL we are aiming for looks like this:
//for a left join
//natural join single key reference
datasetA.leftJoin(datasetB).withKey('key)
//natural join multiple keys
datasetA.leftJoin(datasetB).on('key1, 'key2)
//for non-natural joins
datasetA.leftJoin(datasetB) where {
datasetA('keyA) === datasetB('keyB)
}
Seems pretty OK. Let’s dive in!
(Embedded gist: //medium.com/media/dfc60e490507fa5ff7974e83defa5b2b/href)
We introduce a JoinDataSet, which provides the syntactic sugar for the actual join operations. JoinDataSet also provides the terminal join methods decided on in the DSL: withKey, on and where.
For withKey, which takes a single Symbol column, we check that PropertyExists holds for both Dataset[L] and Dataset[R], with the same property type K in both. This enforces not only that the column names match, but also that their types do.
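A minimal sketch of the withKey check, assuming Shapeless 2.3.x on Scala 2.12 and leaving Spark out. PropertyExists is redefined compactly so the block compiles on its own; KeyCheck, Order, Customer and Visit are hypothetical names. The key point is that both implicit parameters share the type parameter K: the first implicit fixes K from the left type, and the second must then find the same key with that same K on the right.

```scala
import shapeless.{HList, LabelledGeneric, Witness}
import shapeless.ops.record.Selector

// Minimal PropertyExists so this block compiles on its own.
trait PropertyExists[T, PName, PType]
object PropertyExists {
  implicit def implicitProvider[T, H <: HList, PName, PType](implicit
      generic: LabelledGeneric.Aux[T, H],
      selector: Selector.Aux[H, PName, PType]
  ): PropertyExists[T, PName, PType] = new PropertyExists[T, PName, PType] {}
}

// Hypothetical stand-in for the withKey step of JoinDataSet[L, R].
final class KeyCheck[L, R] {
  // `left` infers K from L; `right` must then find the same key with the
  // same K in R, so a name match with a type mismatch does not compile.
  def withKey[K](key: Witness.Lt[Symbol])(implicit
      left: PropertyExists[L, key.T, K],
      right: PropertyExists[R, key.T, K]
  ): String = key.value.name
}

case class Order(id: Long, customerId: Int)
case class Customer(customerId: Int, name: String)
case class Visit(customerId: String)
```

Here the checked name is simply returned; in a Spark-backed version it would presumably be passed on to something like Dataset.join(right, Seq(name), "left_outer").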
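.where is even simpler. It takes a nullary function that returns a Column and relies on the way we already express conditions on Columns. To build each Column we use the apply method we created earlier, so the column references inside the condition are checked at compile time as well.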
As one can observe, .on is not a function at all! If we think about this and about our definition of the on method in the DSL, what we need to work with is varargs of Symbol, with a PropertyExists instance created for each symbol. Unfortunately, there is no direct way to convert varargs to an HList: varargs are a Seq, a Seq is not a Product (the family of case classes and tuples), and its element type records only Symbol rather than each individual name. For this, Shapeless provides a sugar abstraction, SingletonProductArgs, which uses Scala’s Dynamic trait together with a macro to turn the argument list into an HList of singleton types. The applyProduct method is really the apply method of the on object, and it allows us to achieve our syntax.
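To see what SingletonProductArgs does in isolation, here is a minimal sketch assuming Shapeless 2.3.x on Scala 2.12. The object below is a hypothetical, Spark-free version of on: a call like on('key1, 'key2) is rewritten into on.applyProduct(...) with an HList whose element types are the singleton Symbol types, and ToTraversable turns that HList back into a plain List[Symbol]. It only collects the key names; the existence checks are left out.

```scala
import shapeless.{HList, SingletonProductArgs}
import shapeless.ops.hlist.ToTraversable

// Hypothetical `on`: SingletonProductArgs extends scala.Dynamic, and its macro
// forwards on(args...) to applyProduct with the arguments packed into an HList
// that keeps each Symbol's singleton type.
object on extends SingletonProductArgs {
  def applyProduct[V <: HList](keys: V)(implicit
      toList: ToTraversable.Aux[V, List, Symbol]
  ): List[String] = toList(keys).map(_.name)
}
```

Because the key names now live in the type V, the same V can also be fed to further implicit checks, which is where per-Dataset existence evidence would come in.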
Here’s the complete code:
(Embedded gist: //medium.com/media/adf2bc599111e677b9da8a85f86cae27/href)
Read as a rule, it says: given varargs dynamically applied to a method named “apply”, which yields an HList V, if we can generate an implicit instance that produces a List of Symbols from V, and PropertiesExists holds with type K over the HList V for both Datasets, then do the join.
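PropertiesExists is the multi-property counterpart of PropertyExists, taking an HList of property names:
trait PropertiesExists[T, PName <: HList, PType]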
Now, like a List, an HList has three basic building blocks: a Head, a Tail and Nil (here, HNil), where:
HList = Head :: Tail, with Tail itself an HList and the innermost Tail being HNil
So all we need now is to define implicit providers for HNil and for Head :: Tail. Since the head is essentially a single property, PropertyExists fits just fine! For the tail, we recursively derive an implicit instance, just as we would recurse over any List.
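A minimal sketch of the two providers, assuming Shapeless 2.3.x on Scala 2.12 and assuming that PType is itself an HList of property types (the original gist may encode it differently). PropertyExists is redefined compactly so the block compiles on its own; Customer, Sale and Keys are hypothetical names.

```scala
import shapeless.{::, HList, HNil, LabelledGeneric, Witness}
import shapeless.ops.record.Selector

// Minimal single-property evidence so this block compiles on its own.
trait PropertyExists[T, PName, PType]
object PropertyExists {
  implicit def implicitProvider[T, H <: HList, PName, PType](implicit
      generic: LabelledGeneric.Aux[T, H],
      selector: Selector.Aux[H, PName, PType]
  ): PropertyExists[T, PName, PType] = new PropertyExists[T, PName, PType] {}
}

// Evidence that every name in the HList PName exists in T; PType collects
// the matching property types as an HList (an assumption of this sketch).
trait PropertiesExists[T, PName <: HList, PType]
object PropertiesExists {
  // Base case: an empty list of names always exists.
  implicit def hnilProvider[T]: PropertiesExists[T, HNil, HNil] =
    new PropertiesExists[T, HNil, HNil] {}

  // Inductive case: the head is a single property (PropertyExists) and the
  // tail is handled recursively by this same type class.
  implicit def hconsProvider[T, Head, Tail <: HList, HeadType, TailTypes <: HList](implicit
      head: PropertyExists[T, Head, HeadType],
      tail: PropertiesExists[T, Tail, TailTypes]
  ): PropertiesExists[T, Head :: Tail, HeadType :: TailTypes] =
    new PropertiesExists[T, Head :: Tail, HeadType :: TailTypes] {}
}

case class Customer(customerId: Int, region: String, name: String)
case class Sale(customerId: Int, region: String, amount: Double)

// Singleton Symbol types for the key names, built with Witness.
object Keys {
  val customerId = Witness('customerId)
  val region = Witness('region)
  type CustomerAndRegion = customerId.T :: region.T :: HNil
}
```

Both Customer and Sale satisfy PropertiesExists for the same keys with the same types, Int :: String :: HNil, which is the shape of check a join needs on both sides; asking for Int :: Int :: HNil does not compile because region is a String.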
(Embedded gist: //medium.com/media/17a5c19eca9d2c0565461a09aa8b7195/href)
We can now complete our RichDataSet as below:
(Embedded gist: //medium.com/media/a3a077534c4e0ca0a42013b506605cc9/href)