Window Functions in Scala: Time

Window functions are handy little tools for computing rolling averages, rankings by company or customer, and a host of other nifty things. But they can be a little hard to comprehend, especially where dates and times are concerned.

In Spark with Scala, the easiest way to make time windows that don’t fall neatly on a day or year is to use the rangeBetween function. Let’s use an example to illustrate.

The Set Up

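First, let’s import the Spark packages you’ll need:

//import some built-in packages
//spark.implicits._ needs a SparkSession named spark in scope (Zeppelin and spark-shell provide one)
import spark.implicits._
import org.apache.spark.sql.expressions.Window
//col, avg, min, max, count, asc, window, to_timestamp and from_unixtime live here
import org.apache.spark.sql.functions._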

Then, let’s create a simple dataframe. It holds integers representing Unix timestamps (in seconds) for a handful of hours on May 6, 2018 (UTC), along with temperature readings for 2 northern Wisconsin cities.

//create a test dataframe
val testDF = Seq(
  (1525567980, 28.9, "Ashland"),
  (1525569900, 28.4, "Bayfield"),
  (1525571580, 28.9, "Bayfield"),
  (1525575120, 28.9, "Ashland"),
  (1525578780, 28.0, "Ashland"),
  (1525582320, 28.0, "Bayfield"),
  (1525585980, 28.0, "Bayfield"),
  (1525589580, 28.0, "Ashland"),
  (1525591440, 28.4, "Bayfield"),
  (1525593180, 28.0, "Ashland"),
  (1525596780, 28.0, "Bayfield"),
  (1525600380, 28.9, "Bayfield")
).toDF("uxt", "Temp", "City")

Next, we can make a timestamp from the integer that represents Unix time. This is just for readability; the window itself is built on the integer column.

//get a timestamp from the unix time
val testWithDates = testDF
  .withColumn("Date", to_timestamp(from_unixtime($"uxt")))
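As a quick sanity check on the units, here is a minimal sketch in plain Scala (no Spark) that treats the integers as Unix seconds. The object name `UnixSecondsDemo` and its helper are hypothetical names introduced for illustration; the sketch uses only `java.time` from the JDK.

```scala
import java.time.{Duration, Instant}

object UnixSecondsDemo {
  // Interpret a Unix timestamp given in seconds as a UTC instant
  def toInstant(uxt: Long): Instant = Instant.ofEpochSecond(uxt)

  // Seconds between two readings, useful for reasoning about window sizes
  def secondsBetween(earlier: Long, later: Long): Long =
    Duration.between(toInstant(earlier), toInstant(later)).getSeconds

  def main(args: Array[String]): Unit = {
    // First reading in the test dataframe
    println(toInstant(1525567980L))
    // One hour expressed in the same unit as the uxt column
    println(Duration.ofHours(1).getSeconds)
    // Gap between the first and third readings
    println(secondsBetween(1525567980L, 1525571580L))
  }
}
```

Because the values are seconds, any offset added to or subtracted from the uxt column is also in seconds.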

Now we have a usable dataframe. Feel free to look at what you’ve got by using the show() command:

Table of Initial Data

Using rangeBetween

We’re ready to create a window. For this demonstration, we want a 1 hour window. In Unix time, that’s 3600 seconds. We’ll create the window by first ordering the data by the “uxt” column, and then creating a range that goes from 3600 seconds before this row’s timestamp to this row. This is a “look behind” sliding window. I’m using Window.currentRow here, but you can also specify the current row by using zero. That’s less intuitive to me, particularly when creating the window on a numeric column, so I prefer to use Window.currentRow. This code, then, creates a window that includes any rows whose unix timestamp (“uxt” column) falls between the current row’s uxt value minus 3600 and the current row’s uxt value, with both ends included.

//create a window for 1 hour - which is 3600 seconds in unix time.
val w = Window.orderBy(col("uxt")).rangeBetween(-3600, Window.currentRow)

Now that we have a window specification, we can use it to create new columns. Here, I’m creating a rolling 1 hour average, a minimum temperature, a maximum temperature and a count of the rows that are included in this window. I’m also creating one column that holds the Unix-time start of the window and another that holds it as a timestamp. And finally, I’m showing all the data.

val byRange = testWithDates
  .withColumn("avgTemp", avg(col("Temp")).over(w))
  .withColumn("minTemp", min(col("Temp")).over(w))
  .withColumn("maxTemp", max(col("Temp")).over(w))
  .withColumn("rowCount", count(col("Temp")).over(w))
  //start of the window: 3600 seconds before the current row
  .withColumn("uStart", col("uxt") - 3600)
  .withColumn("startDate", to_timestamp(from_unixtime($"uStart")))
  //one sort at the end is enough
  .orderBy(asc("uxt"))
//show the data
byRange.show()

This results in output like this:
Table of data from the code example

This is a very small data set, so it’s relatively easy to follow what’s happening. The second row is less than an hour past the first row, so both the first and second rows are included in its window. The third row is exactly one hour (3600 seconds) past the first row and, because the range boundaries are inclusive, all three rows are included in its window. But the fourth row is more than an hour past the first two rows, so only the third and fourth rows are included in that window. And so on. Note that with a look-behind window, the current row is always the end point of the window. We can visualize this data with windows for the third and fourth rows.
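The row-by-row reasoning above can be checked with a small plain-Scala model of the look-behind range. This is only a sketch of the semantics, not Spark code: `RangeWindowSketch`, `Reading`, `windowFor` and `rowCounts` are hypothetical names, and the model assumes both ends of the range are inclusive.

```scala
object RangeWindowSketch {
  // One temperature reading; uxt is Unix time in seconds
  final case class Reading(uxt: Long, temp: Double, city: String)

  // Same values as the test dataframe in this post
  val readings: Seq[Reading] = Seq(
    Reading(1525567980L, 28.9, "Ashland"),
    Reading(1525569900L, 28.4, "Bayfield"),
    Reading(1525571580L, 28.9, "Bayfield"),
    Reading(1525575120L, 28.9, "Ashland"),
    Reading(1525578780L, 28.0, "Ashland"),
    Reading(1525582320L, 28.0, "Bayfield"),
    Reading(1525585980L, 28.0, "Bayfield"),
    Reading(1525589580L, 28.0, "Ashland"),
    Reading(1525591440L, 28.4, "Bayfield"),
    Reading(1525593180L, 28.0, "Ashland"),
    Reading(1525596780L, 28.0, "Bayfield"),
    Reading(1525600380L, 28.9, "Bayfield")
  )

  // Rows whose uxt lies in [current.uxt - lookBehind, current.uxt], both ends inclusive
  def windowFor(current: Reading, rows: Seq[Reading], lookBehind: Long): Seq[Reading] =
    rows.filter(r => r.uxt >= current.uxt - lookBehind && r.uxt <= current.uxt)

  // Number of rows in each row's window, listed in uxt order
  def rowCounts(rows: Seq[Reading], lookBehind: Long): Seq[Int] =
    rows.sortBy(_.uxt).map(cur => windowFor(cur, rows, lookBehind).size)

  def main(args: Array[String]): Unit =
    rowCounts(readings, 3600L).zipWithIndex.foreach { case (n, i) =>
      println(s"row ${i + 1}: $n row(s) in window")
    }
}
```

In this model the first four rows get window sizes of 1, 2, 3 and 2, which matches the walk-through: the third row reaches back to the first, and the fourth reaches back only to the third.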

Adding a Partition

What if we wanted to look at a 2-hour window, by city? Easy! We can just add a partition to our window, and adjust the number of seconds. I’ve ordered the output by city and date, to make it a little easier to see what’s happening.

//create a window for 2 hours (7200 seconds), partitioned by city
val wP = Window.partitionBy(col("City")).orderBy(col("uxt")).rangeBetween(-7200, Window.currentRow)
//add the aggregate columns
val byCity = testWithDates
  .withColumn("avgTemp", avg(col("Temp")).over(wP))
  .withColumn("minTemp", min(col("Temp")).over(wP))
  .withColumn("maxTemp", max(col("Temp")).over(wP))
  .withColumn("rowCount", count(col("Temp")).over(wP))
  //start of the window: 7200 seconds before the current row
  .withColumn("uStart", col("uxt") - 7200)
  .withColumn("startDate", to_timestamp(from_unixtime($"uStart")))
  .orderBy("City", "Date")
//show the data
byCity.show()

This gives you the following output:
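As a cross-check on the row counts, the partitioned window can be modeled in plain Scala: group the readings by city first, then apply the same inclusive look-behind range within each group. This is a sketch of the semantics only, and `PartitionedWindowSketch` and `countsByCity` are hypothetical names.

```scala
object PartitionedWindowSketch {
  // (uxt in Unix seconds, temperature, city), same values as the test dataframe
  val readings: Seq[(Long, Double, String)] = Seq(
    (1525567980L, 28.9, "Ashland"),
    (1525569900L, 28.4, "Bayfield"),
    (1525571580L, 28.9, "Bayfield"),
    (1525575120L, 28.9, "Ashland"),
    (1525578780L, 28.0, "Ashland"),
    (1525582320L, 28.0, "Bayfield"),
    (1525585980L, 28.0, "Bayfield"),
    (1525589580L, 28.0, "Ashland"),
    (1525591440L, 28.4, "Bayfield"),
    (1525593180L, 28.0, "Ashland"),
    (1525596780L, 28.0, "Bayfield"),
    (1525600380L, 28.9, "Bayfield")
  )

  // For each city, the number of that city's rows inside each row's
  // [uxt - lookBehind, uxt] range, listed in uxt order
  def countsByCity(rows: Seq[(Long, Double, String)], lookBehind: Long): Map[String, Seq[Int]] =
    rows.groupBy(_._3).map { case (city, cityRows) =>
      val times = cityRows.map(_._1).sorted
      city -> times.map(cur => times.count(t => t >= cur - lookBehind && t <= cur))
    }

  def main(args: Array[String]): Unit =
    countsByCity(readings, 7200L).toSeq.sortBy(_._1).foreach { case (city, counts) =>
      println(s"$city: ${counts.mkString(", ")}")
    }
}
```

Under this model Ashland's five rows get counts of 1, 2, 2, 1, 2. A row never sees readings from the other city, even when they fall inside the same two hours.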

Windowing with GroupBy

Using rangeBetween with Unix time lets you size your windows down to the second. If you don’t need that kind of precision, you can also use windows with groupBy. Madahukar has written an excellent blog post on using windows with time and groupBy. Be careful, though. Watch what happens when we use groupBy to get the average, minimum and maximum for 1 hour:

//use window with groupby
val groupByWindow = testWithDates
  .groupBy(window(col("Date"), "1 hour"))
  .agg(avg("Temp").as("avgTemp"), min("Temp").as("minTemp"), max("Temp").as("maxTemp"))
  .select("window.start", "window.end", "avgTemp", "minTemp", "maxTemp")

Using window with groupBy “rolls up” each row into a fixed bucket that starts on a whole unit of whatever you are grouping by, here the top of the hour. This is great if you want one aggregate for each hour of the day (4:00 a.m. to 5:00 a.m.). It is not so great if you’re looking for more free-form windows (4:15 a.m. to 5:15 a.m.).
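The difference can be sketched in plain Scala by computing which fixed one-hour bucket each reading lands in. This is a simplified model that assumes buckets are aligned to the Unix epoch with no start offset; `TumblingBucketSketch` and `bucketStart` are hypothetical names.

```scala
object TumblingBucketSketch {
  // Start (in Unix seconds) of the fixed-width bucket containing uxt,
  // assuming buckets are aligned to the epoch
  def bucketStart(uxt: Long, widthSeconds: Long = 3600L): Long =
    Math.floorDiv(uxt, widthSeconds) * widthSeconds

  def main(args: Array[String]): Unit = {
    // First three readings from the test dataframe
    val firstThree = Seq(1525567980L, 1525569900L, 1525571580L)
    firstThree.foreach { uxt =>
      val start = bucketStart(uxt)
      println(s"$uxt falls in bucket [$start, ${start + 3600L})")
    }
  }
}
```

In this model the first reading sits alone in its bucket while the second and third share the next one, even though the look-behind window for the third row covered all three readings.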

Download the Code

If you’d like to run the example code in Zeppelin, you can download the .json file.