1. Introduction to Window Functions
A Window Function performs a calculation across a set of table rows that are related to the current row and returns a value for each row. Unlike regular aggregate functions such as SUM() or AVG(), window functions do not collapse rows into a single output row. Instead, they compute a value for every row based on a defined window of rows.
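The difference can be illustrated with a plain-Python analogy (not Snowpark code): an aggregate collapses each group to a single value, while a window function produces one value per input row. The department IDs and salaries below are taken from the sample data used later in this article.

```python
# Plain-Python analogy of aggregates vs. window functions.
dept_salaries = {101: [24000, 17000, 9000], 102: [20000, 12000]}

# Aggregate (like GROUP BY + SUM): one output value per group
aggregated = {dept: sum(sals) for dept, sals in dept_salaries.items()}
print(aggregated)  # {101: 50000, 102: 32000}

# Window (like SUM() OVER (PARTITION BY dept)): one value per original row
windowed = {dept: [sum(sals)] * len(sals) for dept, sals in dept_salaries.items()}
print(windowed)  # {101: [50000, 50000, 50000], 102: [32000, 32000]}
```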
Refer to our previous article to learn more about Window Functions in SQL, including examples to help you understand them better.
In this article, let us explore how to implement Window Functions on DataFrames in Snowflake Snowpark.
2. Window Functions in Snowpark
The Window class in Snowpark enables defining a WindowSpec (or Window Specification) that determines which rows are included in a window. A window is a group of rows that are associated with the current row by some relation.
Syntax
The following is the syntax to form a WindowSpec in Snowpark.
Window.<partitionBy_specification>.<orderBy_specification>.<windowFrame_specification>
- PartitionBy Specification
The Partition specification defines which rows are included in a window (partition). If no partition is defined, all the rows are included in a single partition.
- OrderBy Specification
The Ordering specification determines the ordering of the rows within the window. The ordering could be ascending (ASC) or descending (DESC).
- WindowFrame Specification
The Window Frame specification defines the subset of rows within a partition over which the window function operates. It determines the range of rows to consider for each row’s computation within the window partition.
A Window Function is formed by passing the WindowSpec to an aggregate function (such as SUM() or AVG()) through the over() method.
<aggregate_function>(<arguments>).over(<windowSpec>)
For the full list of functions that support windows, refer to the Snowflake documentation.
3. Demonstration of Window Functions in Snowpark
Consider the EMPLOYEES data below for the demonstration of the Window Functions in Snowpark.
# creating a DataFrame with employee data
employee_data = [
[1,'TONY',24000,101],
[2,'STEVE',17000,101],
[3,'BRUCE',9000,101],
[4,'WANDA',20000,102],
[5,'VICTOR',12000,102],
[6,'STEPHEN',10000,103],
[7,'HANK',15000,103],
[8,'THOR',21000,103]
]
employee_schema = ["EMP_ID", "EMP_NAME", "SALARY", "DEPT_ID"]
df_emp = session.create_dataframe(employee_data, schema=employee_schema)
df_emp.show()
------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |
------------------------------------------------
|1 |TONY |24000 |101 |
|2 |STEVE |17000 |101 |
|3 |BRUCE |9000 |101 |
|4 |WANDA |20000 |102 |
|5 |VICTOR |12000 |102 |
|6 |STEPHEN |10000 |103 |
|7 |HANK |15000 |103 |
|8 |THOR |21000 |103 |
------------------------------------------------
3.1. Find the Employees with the Highest Salary in each Department
Follow the steps below to find the details of the employees with the highest salary in each department using Window Functions in Snowpark.
STEP-1: Import all the necessary Snowpark libraries and create a WindowSpec
The following code creates a WindowSpec where a partition is created based on the DEPT_ID field and the rows within each partition are ordered by SALARY in descending order.
# importing Snowpark libraries
from snowflake.snowpark import Window
from snowflake.snowpark.functions import row_number, col, min, sum

# creating a WindowSpec
windowSpec = Window.partitionBy("DEPT_ID").orderBy(col("SALARY").desc())
STEP-2: Create a Window Function that Ranks Employees by Salary within each Department
The following code creates a new field using a Window Function that assigns ranks to employees based on their salaries within each department.
df_emp.withColumn("RANK", row_number().over(windowSpec)).show()
---------------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |"RANK" |
---------------------------------------------------------
|4 |WANDA |20000 |102 |1 |
|5 |VICTOR |12000 |102 |2 |
|1 |TONY |24000 |101 |1 |
|2 |STEVE |17000 |101 |2 |
|3 |BRUCE |9000 |101 |3 |
|8 |THOR |21000 |103 |1 |
|7 |HANK |15000 |103 |2 |
|6 |STEPHEN |10000 |103 |3 |
---------------------------------------------------------
In the above code, we used the DataFrame.withColumn() method, which returns a DataFrame containing all existing columns plus an additional column with the specified name, computed from the given expression.
In this scenario, the method returns all the columns of the DataFrame df_emp along with a new field named RANK computed based on the Window Function passed as an expression.
Alternatively, the DataFrame.select() method can be used to achieve the same output as shown below.
df_emp.select("*", row_number().over(windowSpec).alias("RANK")).show()
STEP-3: Filter the records with the Highest Salary in each Department
The following code filters the records with RANK value 1 and sorts the records based on the DEPT_ID.
df_emp.withColumn("RANK", row_number().over(windowSpec)).filter(col("RANK")==1).sort("DEPT_ID").show()
---------------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |"RANK" |
---------------------------------------------------------
|1 |TONY |24000 |101 |1 |
|4 |WANDA |20000 |102 |1 |
|8 |THOR |21000 |103 |1 |
---------------------------------------------------------
When executed, this code is translated into SQL and run in Snowflake through the Snowpark API. The generated SQL statement is equivalent to the following query.
SELECT * FROM (
SELECT
EMP_ID, EMP_NAME, SALARY, DEPT_ID,
ROW_NUMBER() OVER (PARTITION BY DEPT_ID ORDER BY SALARY DESC) AS RANK
FROM EMPLOYEES
)
WHERE RANK = 1
ORDER BY DEPT_ID ;
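For intuition, the same "highest salary per department" result can be reproduced in plain Python (this is only an illustrative sketch of the logic, not the Snowpark API), using the EMPLOYEES data from above:

```python
# Keep the highest-paid employee per department (mirrors filtering RANK = 1).
employees = [
    (1, "TONY", 24000, 101), (2, "STEVE", 17000, 101), (3, "BRUCE", 9000, 101),
    (4, "WANDA", 20000, 102), (5, "VICTOR", 12000, 102),
    (6, "STEPHEN", 10000, 103), (7, "HANK", 15000, 103), (8, "THOR", 21000, 103),
]

top_per_dept = {}
for emp_id, name, salary, dept_id in employees:
    # Replace the stored row if this employee earns more
    if dept_id not in top_per_dept or salary > top_per_dept[dept_id][2]:
        top_per_dept[dept_id] = (emp_id, name, salary, dept_id)

for dept_id in sorted(top_per_dept):
    print(top_per_dept[dept_id])
# (1, 'TONY', 24000, 101)
# (4, 'WANDA', 20000, 102)
# (8, 'THOR', 21000, 103)
```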
3.2. Calculate the Total Salary for each Department and display it alongside each employee’s record
The following code calculates the total salary for each department and displays it alongside each employee record using Window Functions in Snowpark.
windowSpec = Window.partitionBy("DEPT_ID")
df_emp.withColumn("TOTAL_SAL", sum("SALARY").over(windowSpec)).show()
--------------------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |"TOTAL_SAL" |
--------------------------------------------------------------
|1 |TONY |24000 |101 |50000 |
|2 |STEVE |17000 |101 |50000 |
|3 |BRUCE |9000 |101 |50000 |
|4 |WANDA |20000 |102 |32000 |
|5 |VICTOR |12000 |102 |32000 |
|6 |STEPHEN |10000 |103 |46000 |
|7 |HANK |15000 |103 |46000 |
|8 |THOR |21000 |103 |46000 |
--------------------------------------------------------------
The above Snowpark code is equivalent to the following SQL query.
SELECT
EMP_ID, EMP_NAME, SALARY, DEPT_ID,
SUM(SALARY) OVER (PARTITION BY DEPT_ID) AS TOTAL_SAL
FROM EMPLOYEES
;
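The behavior of SUM(SALARY) OVER (PARTITION BY DEPT_ID) can be sketched in plain Python (an illustration of the logic only, not Snowpark): every row keeps its own columns, plus the department-wide total.

```python
from collections import defaultdict

employees = [
    (1, "TONY", 24000, 101), (2, "STEVE", 17000, 101), (3, "BRUCE", 9000, 101),
    (4, "WANDA", 20000, 102), (5, "VICTOR", 12000, 102),
    (6, "STEPHEN", 10000, 103), (7, "HANK", 15000, 103), (8, "THOR", 21000, 103),
]

# First pass: compute each department's total salary
totals = defaultdict(int)
for _, _, salary, dept_id in employees:
    totals[dept_id] += salary

# Second pass: attach the department total to every row
with_total = [(eid, name, sal, dept, totals[dept])
              for eid, name, sal, dept in employees]
print(with_total[0])  # (1, 'TONY', 24000, 101, 50000)
```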
3.3. Calculate the Cumulative Sum of Salary for each Department
The following code calculates the cumulative sum of salaries within each department, ordered by EMP_ID, and displays it alongside each employee record using Window Functions in Snowpark.
windowSpec = Window.partitionBy("DEPT_ID").orderBy(col("EMP_ID"))
df_emp.withColumn("CUM_SAL", sum("SALARY").over(windowSpec)).show()
------------------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |"CUM_SAL" |
------------------------------------------------------------
|1 |TONY |24000 |101 |24000 |
|2 |STEVE |17000 |101 |41000 |
|3 |BRUCE |9000 |101 |50000 |
|4 |WANDA |20000 |102 |20000 |
|5 |VICTOR |12000 |102 |32000 |
|6 |STEPHEN |10000 |103 |10000 |
|7 |HANK |15000 |103 |25000 |
|8 |THOR |21000 |103 |46000 |
------------------------------------------------------------
The above Snowpark code is equivalent to the following SQL query.
SELECT
EMP_ID, EMP_NAME, SALARY, DEPT_ID,
SUM(SALARY) OVER (PARTITION BY DEPT_ID ORDER BY EMP_ID) AS CUM_SAL
FROM EMPLOYEES
;
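The cumulative sum depends on the ORDER BY in the WindowSpec: each row's value is the sum of all salaries in its partition up to and including that row. A plain-Python sketch of the same logic (illustrative only, not Snowpark):

```python
from collections import defaultdict

employees = [
    (1, "TONY", 24000, 101), (2, "STEVE", 17000, 101), (3, "BRUCE", 9000, 101),
    (4, "WANDA", 20000, 102), (5, "VICTOR", 12000, 102),
    (6, "STEPHEN", 10000, 103), (7, "HANK", 15000, 103), (8, "THOR", 21000, 103),
]

# Keep one running total per department, visiting rows in EMP_ID order
running = defaultdict(int)
cumulative = []
for emp_id, name, salary, dept_id in sorted(employees, key=lambda e: e[0]):
    running[dept_id] += salary
    cumulative.append((emp_id, name, salary, dept_id, running[dept_id]))

print(cumulative[1])  # (2, 'STEVE', 17000, 101, 41000)
```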
3.4. Calculate the Minimum Salary Between the Current Employee and the one Following for each Department
The following code calculates the minimum salary between the current employee and the one following for each department using Window Functions in Snowpark.
windowSpec = Window.partitionBy("DEPT_ID").orderBy(col("EMP_ID")).rows_between(Window.currentRow,1)
df_emp.withColumn("MIN_SAL", min("SALARY").over(windowSpec)).sort("EMP_ID").show()
------------------------------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |"DEPT_ID" |"MIN_SAL" |
------------------------------------------------------------
|1 |TONY |24000 |101 |17000 |
|2 |STEVE |17000 |101 |9000 |
|3 |BRUCE |9000 |101 |9000 |
|4 |WANDA |20000 |102 |12000 |
|5 |VICTOR |12000 |102 |12000 |
|6 |STEPHEN |10000 |103 |10000 |
|7 |HANK |15000 |103 |15000 |
|8 |THOR |21000 |103 |21000 |
------------------------------------------------------------
The above Snowpark code is equivalent to the following SQL query, where each employee's salary is compared with that of the next employee within the same department.
SELECT
EMP_ID, EMP_NAME, SALARY, DEPT_ID,
MIN(SALARY) OVER (PARTITION BY DEPT_ID ORDER BY EMP_ID ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS MIN_SAL
FROM EMPLOYEES
ORDER BY EMP_ID
;
Window Frames require the data within the window to be ordered. So, even though the ORDER BY clause is optional in regular window function syntax, it is mandatory in window frame syntax.
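The frame ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING can also be sketched in plain Python (an illustration of the logic, not the Snowpark API): within each department, ordered by EMP_ID, take the minimum of the current salary and the next one; the last row in a partition has no following row, so its own salary is used.

```python
from collections import defaultdict

employees = [
    (1, "TONY", 24000, 101), (2, "STEVE", 17000, 101), (3, "BRUCE", 9000, 101),
    (4, "WANDA", 20000, 102), (5, "VICTOR", 12000, 102),
    (6, "STEPHEN", 10000, 103), (7, "HANK", 15000, 103), (8, "THOR", 21000, 103),
]

# Group rows by department, keeping them ordered by EMP_ID
by_dept = defaultdict(list)
for emp in sorted(employees, key=lambda e: e[0]):
    by_dept[emp[3]].append(emp)

# min over the frame [current row, 1 following] within each partition
min_sal = {}
for dept_id, rows in by_dept.items():
    for i, (emp_id, _, salary, _) in enumerate(rows):
        following = rows[i + 1][2] if i + 1 < len(rows) else salary
        min_sal[emp_id] = min(salary, following)

print(min_sal[1])  # 17000 (min of TONY's 24000 and STEVE's 17000)
```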
Related Articles:
- Introduction to Snowflake Snowpark for Python
- HOW TO: Create and Read Data from Snowflake Snowpark DataFrames?
- HOW TO: Write data into Snowflake from a Snowpark DataFrame?
- HOW TO: COPY Data from CSV Files INTO Snowflake Table using Snowpark?
- HOW TO: Add a New Column to a Snowpark DataFrame?
- HOW TO: Drop a Column from a Snowpark DataFrame?
- HOW TO: Remove Duplicates in a Snowflake Snowpark DataFrame?
- HOW TO: Update a DataFrame in Snowflake Snowpark?
- HOW TO: Merge two DataFrames in Snowflake Snowpark?
- HOW TO: Execute SQL Statements in Snowflake Snowpark?
- Aggregate Functions in Snowflake Snowpark
- GROUP BY in Snowflake Snowpark
- Joins in Snowflake Snowpark
- IN Operator in Snowflake Snowpark
- Window Functions in Snowflake Snowpark
- CASE Statement in Snowflake Snowpark
- UDFs in Snowflake Snowpark