Window Functions in Scala: Time

Window functions are handy little tools that can be used to compute rolling averages, ranking by company or customer, and a host of other nifty things. But, they can be a little hard to comprehend, especially where dates and times are concerned.

In Scala, the easiest way to make time windows that don’t fall neatly on a day or year is using the rangeBetween function. Let’s use an example to illustrate.

The Set Up

First, let’s import the 2 scala packages you’ll need:

//import some built-in packages
import spark.implicits._
import org.apache.spark.sql.expressions.Window

Then, let’s create a simple dataframe. This creates a dataframe with integers representing unix timestamps for a handful of hours on 5-6-2018, along with temperature readings for 2 Northern Wisconsin cities.

//create a test dataframe
val testDF = Seq(
(1525567980,28.9, "Ashland"),
(1525569900,28.4, "Bayfield"),
(1525571580,28.9, "Bayfield"),
(1525575120,28.9, "Ashland"),
(1525578780,28.0, "Ashland"),
(1525582320,28.0, "Bayfield"),
(1525585980,28.0, "Bayfield"),
(1525589580,28.0, "Ashland"),
(1525591440,28.4, "Bayfield"),
(1525593180,28.0, "Ashland"),
(1525596780,28.0, "Bayfield"),
(1525600380,28.9, "Bayfield")
).toDF("uxt", "Temp", "City")

Next, we can make a time stamp from the integer that represents Unix time. This is just for readability.

//get a timestamp from the unix time
val testWithDates = testDF
.withColumn("Date", to_timestamp(from_unixtime($"uxt")))

Now we have a usable dataframe. Feel free to look at what you’ve got by using the show() command:

testWithDates.show()

Table of Initial Data

Using rangeBetween

We’re ready to create a window. For this demonstration, we want a 1 hour window. In Unix time, that’s 3600 milliseconds. We’ll create the window by first ordering the data by the “uxt” column, and then creating a range that goes from 3600 milliseconds before this row’s timestamp to this row. This is a “look behind” sliding window. I’m using Window.currentRow here, but you can also specify the current row by using zero. That’s less intuitive to me, particularly when creating the window on a numeric column, so I prefer to use Window.currentRow. This code, then, creates a window that includes any rows that have a unix timestamp (“uxt” column) that are between the current row’s uxt value and the current row’s uxt value-3600.

//create a window for 1 hour - which is 3600 milliseconds in unix time.
val w = Window.orderBy(col("uxt")).rangeBetween(-3600, Window.currentRow)

Now that we have a window specification, we can use it to create new columns. Here, I’m creating a rolling 1 hour average, a minimum temperature, a maximum temperature and the count of the number of rows that are included in this window. I’m also creating a column that denotes the unix start of the window and a time stamp start of the window. And finally, I’m showing all the data.

val byRange = testWithDates
.withColumn("avgTemp", avg(col("Temp")).over(w)).orderBy(asc("uxt"))
.withColumn("minTemp", min(col("Temp")).over(w)).orderBy(asc("uxt"))
.withColumn("maxTemp", max(col("Temp")).over(w)).orderBy(asc("uxt"))
.withColumn("rowCount", count(col("Temp")).over(w)).orderBy(asc("uxt"))
.withColumn("uStart", col("uxt")-3600)
.withColumn("startDate", to_timestamp($"uStart"))
//show the data
byRange.show(truncate=false);

This results in output like this:
Table of data from the code example

This is a very small data set, and it’s relatively easy to follow along with what’s happening. The second row is less than an hour past the first row, so both the first and the second rows are included in the window. Likewise, the third row is less than an hour past the 1st and 2nd rows, and all three are included in the window. But the fourth row is more than an hour past the first 2 rows, so only the 3rd and 4th rows are included in that window. And so on. Note that the current row is always the end-point of the window, when using a look-behind window. We can visualize this data with windows for the 3rd and fourth rows. (Clicking on the visualization will open the interactive Tableau visualization in a new window.)

Adding a Partition

What if we wanted to look at a 2-hour window, by city. Easy! We can just add a partition to our window, and adjust the number of milliseconds. I’ve ordered the output by city and date, to make it a little easier to see what’s happening.

//create a window for 2 hours, partitioned by city
val wP = Window.partitionBy(col("City")).orderBy(col("uxt")).rangeBetween(-7200, Window.currentRow)
//add the aggregate columns
val byCity = testWithDates
.withColumn("avgTemp", avg(col("Temp")).over(wP)).orderBy(asc("uxt"))
.withColumn("minTemp", min(col("Temp")).over(wP)).orderBy(asc("uxt"))
.withColumn("maxTemp", max(col("Temp")).over(wP)).orderBy(asc("uxt"))
.withColumn("rowCount", count(col("Temp")).over(wP)).orderBy(asc("uxt"))
.withColumn("uStart", col("uxt")-3600)
.withColumn("startDate", to_timestamp($"uStart"))
.orderBy("City", "Date")
//show the data
byCity.show(truncate=false);

This gives you the following output:

Windowing with GroupBy

Using rangeBetween with unix time lets you have millisecond precision on the size of your windows. If you don’t need that kind of precision, you can also use windows with groupBy. Madahukar has written an excellent blog post on using windows with time and groupBy. Be careful, though. Watch what happens when we use groupBy to get the average, minimum and maximum for 1 hour:

//use window with groupby
val groupByWindow = testWithDates
.groupBy(window(col("Date"), "1 hour"))
.agg(avg("Temp").as("avgTemp"), min("Temp").as("minTemp"), max("Temp").as("maxTemp"))
.select("window.start", "window.end", "avgTemp", "minTemp", "maxTemp")
groupByWindow.show(truncate=false)

Using the groupBy function “rolls up” to the nearest whole value of whatever you are grouping by. This is great if you want an aggregate for each hour of the day (4 AM). It is not so great if you’re looking for more free-form windows (4:15 a.m to 5:15 a.m.).

Download the Code

If you’d like to run the example code in zeppelin, you can download the .json file.

Advertisement

Understanding a Maximal Margin Separator

Maximal Margin Separators

A Maximal Margin Separator (in a 2-dimensional space) is a hyperplane (in this case a line) that completely separates 2 classes of observations, while giving the most space between the line and the nearest observation. These nearest observations are the support vectors. In the plot below, the support vectors are the circled points. In this instance, the support vectors are evenly spaced, which means that maximal margin separator would be a line that falls halfway between each pair of support vectors and matches their slope. (In an instance where there are 3 support vectors, the line will parallel the slope of the side that has 2 support vectors.)

Scatterplot showing 2 classes of points.

Finding the line

Since we have a relatively simple plot, and we know what our support vectors are, we can find find the equation for the hyperplane by first finding the equation for the line.

x2 = m*x1 + b

Compare blue cross point (2, 2) to red circle point (2, 3). Notice that a point halfway (vertically) between them would be (2, 2.5). This is point 1 on our maximal margin separator. Compare blue cross observation (4, 6) to red circle observation (4, 7). Note that a point halfway between those two points would be (4, 6.5). This is point 2 on our maximal margin separator.

We can now compute the slope by dividing x22 – x21 by x12 – x11. That works out to (6.5-2.5)/(4-2) = 2. That’s our slope and we can sub that in for m in the equation:

x2 = 2 * x1 + b

We know what our points are. We can sub in either one to find our intercept (b). Subbing in the point at (4, 6.5), we get:

6.5 = 2 * 4 + b

or

6.5 = 8 + b

We can subtract 8 from both sides to get b:

6.5 – 8 = b – 8

-1.5 = b

So now we know that our line equation is:

x2 = 2 * x1 + -1.5

Equation for a hyperplane

A hyperplane equation looks like this:

beta0 + (beta1 * x1) + (beta2 * x2) = 0

with the caveat that (beta1^2 + beta2^2) = 1

Notice that the hyperplane equation has to equal zero. That’s so that all points above the hyper plane end up being postive, and all points below it end up being negative. We can then classify our points according to each class by whether they are positive or negative.

Let’s take this in steps:

First let’s fill in what we know from our point on the hyperplane and our linear equation. beta0 is our intercept, so fill that in.

-1.5 + beta1 x1 + beta2 x2 = 0

beta1 is our slope, so fill that in.

-1.5 + 2 x1 + beta2 x2 = 0

We know a point on our hyperplane is (4, 6.5), so we can fill in x1 and x2

-1.5 + 2 * 4 + beta2 * 6.5 = 0

Now we have to use algebra to solve for beta2. If you struggle with algebra, meet MathPapa’s Algebra Calculator. It will be your new best friend.

In this case, beta2 = -1.

Our hyperplane equation, discounting the caveat is:

-1.5 + 2(x1) + -1(x2) = 0

Dealing with the caveat

Now we have to deal with the caveat. We can use a normalization process called the l2 norm to find a scaling factor for all of our betas. We’ll only use beta1 and beta2 to find our normalization factor. But, then we’ll apply it to all three coefficients so that our entire equation continues to equal zero. You can prove to yourself that multiplying all the coefficents by the same factor works by running the following code (in R).

#set up our points
x1 = 4
x2 = 6.5

#set up w - this is a weight. 
#We're going to scale our equation by the weight
w = 1

#set up the initial equation
eq = -1.5/w + (2/w)*x1 + (-1/w)*x2 

#look at the inital value
eq


#loop from 1 to 10. 
#We'll divide our betas by the weight. 
#Eq will still equal zero (when rounded):

for (w in 1:10){
  eq = -1.5/w + (2/w)*x1 + (-1/w)*x2 
  cat("Loop ", w, " eq = ", eq, "\n")
  
}

The formula for the li norm is:

||y|| = sqrt(beta1^2 + beta2^2)

In our case that is:

||y|| = sqrt(2^2 + -1^2)

||y|| = 1.73205080757

You can prove to yourself that the caveat is met by checking your work:

w = 1.73205080757
2/w
-1/w

(1.154701^2) +(-0.5773503^2)

The final hyperplane equation

Whew! You almost at the end. Now, we have our weight and we need to scale all three betas by our weight.

w = 1.73205080757

-1.5/w
2/w
-1/w

Our final hyperplane equation is:

-0.8660254 + 1.154701(x1) + -0.5773503(x2) = 0

If we fill in our known point (4,6.5) again, we can check our work:

#verify that this equals 0
round(-0.8660254 + 1.154701*x1 + -0.5773503*x2)

#verify that this equals 1
round((1.154701^2) + (-0.5773503^2))

How big is the margin

We have to do one final step to determine how big the margin is. Easy. We know what the points on the margin are. These are our support vectors above. Choose one. Let’s choose the the red circle at (4,7).

All we have to do is plug in this point on the margin and run our hyperplane equation again. Since this is a point above the hyperplane, we multiple the result by -1 to get the positive margin.

x1 = 4
x2 = 7

-1*(-0.8660254 + 1.154701*x1 + -0.5773503*x2)
## [1] 0.2886735