Window Functions in Snowflake Snowpark

1. Introduction to Window Functions

A Window Function performs a calculation across a set of table rows that are related to the current row and returns one value for every input row. Unlike regular aggregate functions such as SUM() or AVG() used with GROUP BY, window functions do not collapse rows into a single output row. Instead, they compute a value for each row based on a specific window of rows.
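The difference is easier to see on a tiny data set. The following plain-Python sketch (no Snowflake involved; the rows and variable names are made up for illustration) contrasts a grouped sum, which returns one row per group, with a window-style sum, which keeps every input row.

```python
from collections import defaultdict

# Hypothetical sample rows: (name, dept, salary)
rows = [("A", 10, 100), ("B", 10, 300), ("C", 20, 50)]

# Sum of salary per department
totals = defaultdict(int)
for _, dept, salary in rows:
    totals[dept] += salary

# Regular aggregate: rows collapse to one output row per department
grouped = sorted(totals.items())

# Window-style aggregate: every input row is kept and annotated with its group total
windowed = [(name, dept, salary, totals[dept]) for name, dept, salary in rows]

print(grouped)   # [(10, 400), (20, 50)]
print(windowed)  # [('A', 10, 100, 400), ('B', 10, 300, 400), ('C', 20, 50, 50)]
```

The grouped result has two rows, while the windowed result still has three, one per input row.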

Refer to our previous article to learn more about Window Functions in SQL, including examples to help you understand them better.

In this article, let us explore how to implement Window Functions on DataFrames in Snowflake Snowpark.

2. Window Functions in Snowpark

The Window class in Snowpark enables defining a WindowSpec (or Window Specification) that determines which rows are included in a window. A window is a group of rows that are associated with the current row by some relation.

Syntax

The following is the syntax to form a WindowSpec in Snowpark.

Window.<partitionBy_specification>.<orderBy_specification>.<windowFrame_specification>
  • PartitionBy Specification
    The Partition specification defines which rows are included in a window (partition). If no partition is defined, all the rows are included in a single partition.
  • OrderBy Specification
    The Ordering specification determines the ordering of the rows within the window. The ordering could be ascending (ASC) or descending (DESC).
  • WindowFrame Specification
    The Window Frame specification defines the subset of rows within a partition over which the window function operates. It determines the range of rows to consider for each row’s computation within the window partition.
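To make the three specifications concrete, here is a minimal plain-Python sketch of what they do to the rows. It is only an emulation for illustration, not how Snowpark or Snowflake evaluate windows; the helper apply_window, its parameters, and the sample rows are all hypothetical.

```python
from itertools import groupby

def apply_window(rows, partition_key, order_key, frame, func, value_key):
    """Emulate func(value) OVER (PARTITION BY .. ORDER BY .. ROWS BETWEEN ..).

    frame is (start, end) as row offsets relative to the current row,
    e.g. (0, 1) means CURRENT ROW AND 1 FOLLOWING; None means unbounded.
    """
    result = []
    # 1. PartitionBy: group rows that share the partition key
    by_partition = sorted(rows, key=lambda r: r[partition_key])
    for _, group in groupby(by_partition, key=lambda r: r[partition_key]):
        # 2. OrderBy: order the rows inside each partition
        part = sorted(group, key=lambda r: r[order_key])
        start, end = frame
        for i, row in enumerate(part):
            # 3. WindowFrame: select the rows around position i
            lo = 0 if start is None else max(0, i + start)
            hi = len(part) if end is None else min(len(part), max(0, i + end + 1))
            values = [r[value_key] for r in part[lo:hi]]
            result.append({**row, "RESULT": func(values)})
    return result

# Hypothetical sample rows
rows = [
    {"ID": 1, "GRP": "x", "VAL": 5},
    {"ID": 2, "GRP": "x", "VAL": 3},
    {"ID": 3, "GRP": "y", "VAL": 7},
]

# Running total per group: UNBOUNDED PRECEDING to CURRENT ROW
running = apply_window(rows, "GRP", "ID", (None, 0), sum, "VAL")
print([r["RESULT"] for r in running])  # [5, 8, 7]
```

Changing the frame to (0, 1) with min as the function would instead compare each row with the one that follows it in the same partition.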

A Window Function is formed by calling the over() method on a function (such as sum() or avg()) and passing the WindowSpec to it. This corresponds to the OVER clause in SQL.

<aggregate_function>(<arguments>).over(<windowSpec>)

For the full list of functions that can be used as window functions, refer to the Snowflake documentation.

3. Demonstration of Window Functions in Snowpark

Consider the EMPLOYEES data below for the demonstration of the Window Functions in Snowpark.

#// creating dataframe with employee data

employee_data = [
    [1,'TONY',24000,101],
    [2,'STEVE',17000,101],
    [3,'BRUCE',9000,101],
    [4,'WANDA',20000,102],
    [5,'VICTOR',12000,102],
    [6,'STEPHEN',10000,103],
    [7,'HANK',15000,103],
    [8,'THOR',21000,103]
]

employee_schema = ["EMP_ID", "EMP_NAME", "SALARY", "DEPT_ID"]

df_emp = session.createDataFrame(employee_data, schema=employee_schema)
df_emp.show()

------------------------------------------------
|"EMP_ID"  |"EMP_NAME"  |"SALARY"  |"DEPT_ID"  |
------------------------------------------------
|1         |TONY        |24000     |101        |
|2         |STEVE       |17000     |101        |
|3         |BRUCE       |9000      |101        |
|4         |WANDA       |20000     |102        |
|5         |VICTOR      |12000     |102        |
|6         |STEPHEN     |10000     |103        |
|7         |HANK        |15000     |103        |
|8         |THOR        |21000     |103        |
------------------------------------------------

3.1. Find the Employees with the Highest Salary in each Department

Follow the steps below to find the details of the Employees with the highest salary in each Department using Window Functions in Snowpark.

STEP-1: Import all the necessary Snowpark libraries and create a WindowSpec

The following code creates a WindowSpec where a partition is created based on the DEPT_ID field and the rows within each partition are ordered by SALARY in descending order.

#// Importing Snowpark Libraries
from snowflake.snowpark import Window
#// Note: min and sum imported here shadow Python's built-in functions of the same name
from snowflake.snowpark.functions import row_number, desc, col, min, sum

#// creating a WindowSpec
windowSpec = Window.partitionBy("DEPT_ID").orderBy(col("SALARY").desc())

STEP-2: Create a Window Function that Ranks Employees by Salary within each Department

The following code creates a new field using a Window Function that assigns a sequential number (1, 2, 3, ...) to employees in descending order of salary within each department.

df_emp.withColumn("RANK", row_number().over(windowSpec)).show()

---------------------------------------------------------
|"EMP_ID"  |"EMP_NAME"  |"SALARY"  |"DEPT_ID"  |"RANK"  |
---------------------------------------------------------
|4         |WANDA       |20000     |102        |1       |
|5         |VICTOR      |12000     |102        |2       |
|1         |TONY        |24000     |101        |1       |
|2         |STEVE       |17000     |101        |2       |
|3         |BRUCE       |9000      |101        |3       |
|8         |THOR        |21000     |103        |1       |
|7         |HANK        |15000     |103        |2       |
|6         |STEPHEN     |10000     |103        |3       |
---------------------------------------------------------

In the above code, the DataFrame.withColumn() method returns a new DataFrame with one additional column. The column name and the expression that computes it are passed as arguments.

In this scenario, the method returns all the columns of the DataFrame df_emp along with a new field named RANK computed based on the Window Function passed as an expression.

Alternatively, the DataFrame.select() method can be used to achieve the same output as shown below.

df_emp.select("*", row_number().over(windowSpec).alias("RANK")).show()

STEP-3: Filter the records with the Highest Salary in each Department

The following code filters the records with RANK value 1 and sorts the records based on the DEPT_ID.

df_emp.withColumn("RANK", row_number().over(windowSpec)).filter(col("RANK")==1).sort("DEPT_ID").show()

---------------------------------------------------------
|"EMP_ID"  |"EMP_NAME"  |"SALARY"  |"DEPT_ID"  |"RANK"  |
---------------------------------------------------------
|1         |TONY        |24000     |101        |1       |
|4         |WANDA       |20000     |102        |1       |
|8         |THOR        |21000     |103        |1       |
---------------------------------------------------------
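One caveat: row_number() numbers rows 1, 2, 3, ... even when salaries tie, so if two employees share the top salary in a department, only one of them gets the value 1. Snowpark also provides rank() and dense_rank() in snowflake.snowpark.functions, which give tied rows the same number. The plain-Python sketch below illustrates the three numbering schemes; the numbering helper and the tied salaries are hypothetical and are not part of the EMPLOYEES data.

```python
def numbering(salaries_desc):
    """Return (row_number, rank, dense_rank) lists for salaries sorted descending."""
    row_number, rank, dense_rank = [], [], []
    for i, salary in enumerate(salaries_desc):
        # row_number: always the 1-based position
        row_number.append(i + 1)
        if i > 0 and salary == salaries_desc[i - 1]:
            # Tie with the previous row: rank and dense_rank repeat
            rank.append(rank[-1])
            dense_rank.append(dense_rank[-1])
        else:
            # rank leaves gaps after ties, dense_rank does not
            rank.append(i + 1)
            dense_rank.append(dense_rank[-1] + 1 if dense_rank else 1)
    return row_number, rank, dense_rank

# Hypothetical department where two employees share the top salary
rn, rk, drk = numbering([24000, 24000, 17000])
print(rn)   # [1, 2, 3]
print(rk)   # [1, 1, 3]
print(drk)  # [1, 1, 2]
```

Filtering on the value 1 would keep a single row with row_number(), but both tied rows with rank() or dense_rank().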

When executed, the Snowpark API translates this code into SQL and runs it in Snowflake. Assuming the same data were stored in a table named EMPLOYEES, the generated SQL would be equivalent to the following query.

SELECT * FROM(
  SELECT
    EMP_ID, EMP_NAME, SALARY, DEPT_ID,
    ROW_NUMBER() OVER (PARTITION BY DEPT_ID ORDER BY SALARY DESC) AS RANK
  FROM EMPLOYEES
)
WHERE RANK = 1 
ORDER BY DEPT_ID ;
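The query above uses only standard window function syntax, so it can be tried without a Snowflake account. The sketch below runs it with Python's built-in sqlite3 module, assuming the bundled SQLite is version 3.25 or newer (the first release with window functions). SQLite is not Snowflake, so this only checks the logic of the query, not the SQL that Snowpark actually generates.

```python
import sqlite3

employee_data = [
    [1, 'TONY', 24000, 101], [2, 'STEVE', 17000, 101], [3, 'BRUCE', 9000, 101],
    [4, 'WANDA', 20000, 102], [5, 'VICTOR', 12000, 102],
    [6, 'STEPHEN', 10000, 103], [7, 'HANK', 15000, 103], [8, 'THOR', 21000, 103],
]

# In-memory database standing in for the EMPLOYEES table
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE EMPLOYEES "
    "(EMP_ID INTEGER, EMP_NAME TEXT, SALARY INTEGER, DEPT_ID INTEGER)"
)
conn.executemany("INSERT INTO EMPLOYEES VALUES (?, ?, ?, ?)", employee_data)

query = """
SELECT * FROM (
  SELECT
    EMP_ID, EMP_NAME, SALARY, DEPT_ID,
    ROW_NUMBER() OVER (PARTITION BY DEPT_ID ORDER BY SALARY DESC) AS RANK
  FROM EMPLOYEES
)
WHERE RANK = 1
ORDER BY DEPT_ID
"""
top_earners = conn.execute(query).fetchall()
conn.close()

for row in top_earners:
    print(row)
# (1, 'TONY', 24000, 101, 1)
# (4, 'WANDA', 20000, 102, 1)
# (8, 'THOR', 21000, 103, 1)
```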

3.2. Calculate Total Sum of Salary for each Department and display it alongside each employee’s record

The following code calculates the total Sum of the Salary for each Department and displays it alongside each employee record using Window Functions in Snowpark.

windowSpec = Window.partitionBy("DEPT_ID")
df_emp.withColumn("TOTAL_SAL", sum("SALARY").over(windowSpec)).show()

--------------------------------------------------------------
|"EMP_ID"  |"EMP_NAME"  |"SALARY"  |"DEPT_ID"  |"TOTAL_SAL"  |
--------------------------------------------------------------
|1         |TONY        |24000     |101        |50000        |
|2         |STEVE       |17000     |101        |50000        |
|3         |BRUCE       |9000      |101        |50000        |
|4         |WANDA       |20000     |102        |32000        |
|5         |VICTOR      |12000     |102        |32000        |
|6         |STEPHEN     |10000     |103        |46000        |
|7         |HANK        |15000     |103        |46000        |
|8         |THOR        |21000     |103        |46000        |
--------------------------------------------------------------

The above Snowpark code is equivalent to the following SQL query.

SELECT
   EMP_ID, EMP_NAME, SALARY, DEPT_ID,
   SUM(SALARY) OVER (PARTITION BY DEPT_ID) AS TOTAL_SAL
FROM EMPLOYEES
;

3.3. Calculate the Cumulative Sum of Salary for each Department

The following code calculates the cumulative (running) sum of the Salary within each Department, ordered by EMP_ID, and displays it alongside each employee record using Window Functions in Snowpark.

windowSpec = Window.partitionBy("DEPT_ID").orderBy(col("EMP_ID"))
df_emp.withColumn("CUM_SAL", sum("SALARY").over(windowSpec)).show()

------------------------------------------------------------
|"EMP_ID"  |"EMP_NAME"  |"SALARY"  |"DEPT_ID"  |"CUM_SAL"  |
------------------------------------------------------------
|1         |TONY        |24000     |101        |24000      |
|2         |STEVE       |17000     |101        |41000      |
|3         |BRUCE       |9000      |101        |50000      |
|4         |WANDA       |20000     |102        |20000      |
|5         |VICTOR      |12000     |102        |32000      |
|6         |STEPHEN     |10000     |103        |10000      |
|7         |HANK        |15000     |103        |25000      |
|8         |THOR        |21000     |103        |46000      |
------------------------------------------------------------

The above Snowpark code is equivalent to the following SQL query.

SELECT
   EMP_ID, EMP_NAME, SALARY, DEPT_ID,
   SUM(SALARY) OVER (PARTITION BY DEPT_ID ORDER BY EMP_ID) AS CUM_SAL
FROM EMPLOYEES
;
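The only difference from the previous example is the ordering: once the rows in a partition are ordered and no frame is given, the sum runs from the first row of the partition up to the current row, as the CUM_SAL output shows. A minimal plain-Python sketch of that running sum, using itertools.accumulate on the same salaries (the tuple layout and variable names are chosen here for illustration):

```python
from itertools import accumulate, groupby

# (EMP_ID, SALARY, DEPT_ID) taken from the EMPLOYEES data
employees = [
    (1, 24000, 101), (2, 17000, 101), (3, 9000, 101),
    (4, 20000, 102), (5, 12000, 102),
    (6, 10000, 103), (7, 15000, 103), (8, 21000, 103),
]

cum_sal = {}
# PARTITION BY DEPT_ID
by_dept = sorted(employees, key=lambda e: e[2])
for dept_id, group in groupby(by_dept, key=lambda e: e[2]):
    # ORDER BY EMP_ID
    part = sorted(group, key=lambda e: e[0])
    # Running sum from the first row of the partition to the current row
    running = accumulate(salary for _, salary, _ in part)
    for (emp_id, _, _), total in zip(part, running):
        cum_sal[emp_id] = total

print(cum_sal)
# {1: 24000, 2: 41000, 3: 50000, 4: 20000, 5: 32000, 6: 10000, 7: 25000, 8: 46000}
```

The last value in each department equals the department total from section 3.2.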

3.4. Calculate the Minimum Salary Between the Current Employee and the one Following for each Department

The following code calculates, for each employee, the minimum of that employee's salary and the salary of the next employee in the same department (ordered by EMP_ID) using Window Functions in Snowpark.

windowSpec = Window.partitionBy("DEPT_ID").orderBy(col("EMP_ID")).rows_between(Window.currentRow, 1)
df_emp.withColumn("MIN_SAL", min("SALARY").over(windowSpec)).sort("EMP_ID").show()

------------------------------------------------------------
|"EMP_ID"  |"EMP_NAME"  |"SALARY"  |"DEPT_ID"  |"MIN_SAL"  |
------------------------------------------------------------
|1         |TONY        |24000     |101        |17000      |
|2         |STEVE       |17000     |101        |9000       |
|3         |BRUCE       |9000      |101        |9000       |
|4         |WANDA       |20000     |102        |12000      |
|5         |VICTOR      |12000     |102        |12000      |
|6         |STEPHEN     |10000     |103        |10000      |
|7         |HANK        |15000     |103        |15000      |
|8         |THOR        |21000     |103        |21000      |
------------------------------------------------------------

The above Snowpark code is equivalent to the following SQL query where the salary of the current employee record is compared with the next employee record.

SELECT
   EMP_ID, EMP_NAME, SALARY, DEPT_ID,
   MIN(SALARY) OVER (PARTITION BY DEPT_ID ORDER BY EMP_ID ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS MIN_SAL
FROM EMPLOYEES
ORDER BY EMP_ID
;
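As a local sanity check of the frame logic, the same query can be run with Python's built-in sqlite3 module. This sketch assumes the bundled SQLite is version 3.25 or newer and only verifies the ROWS BETWEEN semantics; it does not involve Snowflake or Snowpark, and the EMP_NAME and DEPT_ID columns are left out of the SELECT list to keep the output short.

```python
import sqlite3

employee_data = [
    [1, 'TONY', 24000, 101], [2, 'STEVE', 17000, 101], [3, 'BRUCE', 9000, 101],
    [4, 'WANDA', 20000, 102], [5, 'VICTOR', 12000, 102],
    [6, 'STEPHEN', 10000, 103], [7, 'HANK', 15000, 103], [8, 'THOR', 21000, 103],
]

# In-memory database standing in for the EMPLOYEES table
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE EMPLOYEES "
    "(EMP_ID INTEGER, EMP_NAME TEXT, SALARY INTEGER, DEPT_ID INTEGER)"
)
conn.executemany("INSERT INTO EMPLOYEES VALUES (?, ?, ?, ?)", employee_data)

query = """
SELECT
   EMP_ID, SALARY,
   MIN(SALARY) OVER (
     PARTITION BY DEPT_ID ORDER BY EMP_ID
     ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING
   ) AS MIN_SAL
FROM EMPLOYEES
ORDER BY EMP_ID
"""
min_sal = conn.execute(query).fetchall()
conn.close()

for row in min_sal:
    print(row)
# (1, 24000, 17000)
# (2, 17000, 9000)
# (3, 9000, 9000)
# ... the last employee of each department keeps its own salary
```

For the last employee in each department there is no following row, so the frame contains only the current row and MIN_SAL equals SALARY.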

Window Frames require the rows within the window to be ordered. So, although the ORDER BY clause is optional in general window function syntax, it is required whenever a window frame is specified.
