UDFs in Snowflake Snowpark

Spread the love

1. Introduction to UDFs

A Snowflake User-Defined Function (UDF) is a reusable component defined by the user to perform a specific task which can be called from a SQL statement. Similar to built-in functions, the user-defined functions can be called from a SQL repeatedly from multiple places in a code.

In our previous article, we discussed User Defined Functions(UDFs) in general within Snowflake. In this article, let’s explore how to create Python UDFs using Snowpark.

2. UDFs in Snowpark

Using Snowpark, you have the capability to generate User-Defined functions (UDFs) tailored to your custom functions. These UDFs can then be invoked to handle data processing within your DataFrame.

The Snowpark udf function in snowflake.snowpark.functions module registers a Python function as a Snowflake Python UDF in Snowflake and returns the UDF.

Syntax

Snowpark.functions.udf(
    func = <Python function used for creating UDF>
  , return_type = <Snowpark DataType object representing returned value of UDF>
  , input_types = [<List of DataType objects representing input parameters of UDF>]
  , is_permanent = <Create a Permanent UDF? True or False>
  , name = '<UDF name>'
  , replace = <Replace an existing UDF? True or False>
  , stage_location = '@<UDF stage name>'
)

3. Creating an Anonymous Temporary UDF in Snowpark

UDFs can be created as anonymous UDFs in Snowpark and assigned to a variable. As long as this variable is in scope, you can use this variable to call the UDF.

The following is an example of creating an anonymous UDF in Snowpark.

from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import IntegerType

def addone(a:int) -> int:
    return a+1

add_one = udf(func=addone, return_type=IntegerType(), input_types=[IntegerType()])

Calling an anonymous UDF.

from snowflake.snowpark.functions import col

df = session.createDataFrame([1,2,3], schema=["a"])
df.show()
-------
|"A"  |
-------
|1    |
|2    |
|3    |
-------

df.select(col("a"), add_one(col("a")).alias("A_PLUS_ONE")).show()
----------------------
|"A"  |"A_PLUS_ONE"  |
----------------------
|1    |2             |
|2    |3             |
|3    |4             |
----------------------

4. Creating a Named Temporary UDF in Snowpark

Named Temporary UDFs can be created in Snowpark that are accessible in the same session. Instead of calling UDF as function, it can be also used as a decorator.

The following is an example of creating UDF with type hints and @udf decorator.

@udf
def udf_addone(a:int) -> int:
    return a+1

df = session.createDataFrame([1,2,3], schema=["a"])
df.select(col("a"), udf_addone(col("a")).alias("A_PLUS_ONE")).show()
----------------------
|"A"  |"A_PLUS_ONE"  |
----------------------
|1    |2             |
|2    |3             |
|3    |4             |
----------------------

When type hints are provided and are complete for a function, return_type and input_types parameters are optional in udf function and will be ignored.

5. Creating a Permanent UDF in Snowpark

A Permanent UDF registers functions as UDFs in the Snowflake database. When you create a permanent UDF, it is mandatory to set the is_permanent argument to True and the stage_location argument to the stage location where the Python file for the UDF and its dependencies are uploaded.

A Permanent UDF can be created in Snowpark using any of the below methods.

  • The udf function, in the snowflake.snowpark.functions module, with the name argument.
  • The register method, in the UDFRegistration class, with the name argument.

5.1. Creating a Permanent UDF using udf function in Snowpark

The following is an example of creating a Permanent UDF using udf function.

# Create a stage if it doesn't already exist
session.sql("create or replace stage udf_stage")

# Register UDF in Snowflake
@udf(name="udf_minusone", return_type=IntegerType(), input_types=[IntegerType()], replace=True, is_permanent=True, stage_location='@udf_stage')
def udf_minusone(a:int) -> int:
    return a-1

# Call UDF in a SQL statement
session.sql("select udf_minusone(100)").show()
-----------------------
|"UDF_MINUSONE(100)"  |
-----------------------
|99                   |
-----------------------

5.2. Creating a Permanent UDF using the register method in Snowpark

The following is an example of creating a Permanent UDF using the register function.

# Create a Python Function
def A_PLUS_B(a:int,b:int) -> int:
    return a+b

# Register UDF in Snowflake
session.udf.register(
      func=A_PLUS_B
    , name="udf_a_plus_b"
    , return_type=IntegerType()
    , input_types=[IntegerType(), IntegerType()]
    , replace=True
    , is_permanent=True
    , stage_location='@udf_stage')

# Call UDF in a SQL statement
session.sql("select udf_a_plus_b(100,10)").show()
--------------------------
|"UDF_A_PLUS_B(100,10)"  |
--------------------------
|110                     |
--------------------------

6. Creating a UDF from a Python source file

The register_from_file method, in the UDFRegistration class, registers a Python function from a Python or zip file as a Snowflake Python UDF. Apart from file_path and func_name, the input arguments of this method are the same as the register method.

Consider below are the contents of the Python file even_odd.py in your local environment.

def even_odd(a:int) -> str:
    if a % 2 == 0:
        return "EVEN"
    else:
        return "ODD"

The following code creates a UDF from the file even_odd.py using the even_odd function present inside it.

udf_even_odd = session.udf.register_from_file(
    file_path="D:\Snowflake\Snowpark\Resources\even_odd.py",
    func_name="even_odd"
)

df = session.createDataFrame([1,2,3], schema=["a"])
df.select(col("a"), udf_even_odd(col("a")).alias("NUM_TYPE")).show()
--------------------
|"A"  |"NUM_TYPE"  |
--------------------
|1    |ODD         |
|2    |EVEN        |
|3    |ODD         |
--------------------

7. Creating a UDF from a Python file on an Internal stage

A Permanent UDF can be registered in Snowflake using a Python file from your local development environment. Initially, the file should be imported to an Internal Stage, and then, using the register_from_file method, the function defined in the Python file can be registered as Snowflake Python UDF.

The following code uploads a Python file from the local environment and registers a UDF in Snowflake.

from snowflake.snowpark.types import IntegerType, StringType

# Placing Python file from local environment into Snowflake Internal Stage
session.file.put("D:\Snowflake\Snowpark\Resources\even_odd.py", "@udf_stage", auto_compress=False)
[PutResult(source='even_odd.py', target='even_odd.py', source_size=103, target_size=112, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

# Registering a UDF using the Imported Python file
udf_evenodd = session.udf.register_from_file(
    file_path="@udf_stage/even_odd.py",
    func_name="even_odd",
    return_type=StringType(),
    input_types=[IntegerType()]
)

# Call UDF in a Select 
df.select(col("a"), udf_evenodd(col("a")).alias("NUM_TYPE")).show()
--------------------
|"A"  |"NUM_TYPE"  |
--------------------
|1    |ODD         |
|2    |EVEN        |
|3    |ODD         |
--------------------

8. Creating a UDF using Third-Party Packages

Third-party Python packages that are not part of the standard inbuilt set can be used while creating UDFs using Snowpark. The Session.add_packages() method can be used to add packages for UDFs in Snowpark.

Snowflake has partnered with Anaconda when developing Snowpark for Python functionality. The advantage of this is that Anaconda-provided Python packages can be accessed out of the box inside Snowflake virtual warehouses. To use Anaconda-provided packages, you must enable the packages by accepting to the terms in the Admin dashboard.

The following code imports dateparser library to parse the date values available in different formats.

# Import the required modules
import dateparser

# Define main function that parses date values
def date_parser(a:str):
    return dateparser.parse(a)

# Add packages and data types
from snowflake.snowpark.types import StringType
session.add_packages('dateparser')

# Register UDF in Snowflake
session.udf.register(
	  func=date_parser
	, name="udf_dateparser"
	, return_type=StringType()
	, input_types=[StringType()]
	, replace=True
	, is_permanent=True
	, stage_location='@udf_stage')

# Call UDF in a SQL statement
session.sql("select udf_dateparser('12/12/12')").show()
--------------------------------
|"UDF_DATEPARSER('12/12/12')"  |
--------------------------------
|2012-12-12 00:00:00           |
--------------------------------

session.sql("select udf_dateparser('Wed, 13 Mar 2024 10:55:50')").show()
-------------------------------------------------
|"UDF_DATEPARSER('WED, 13 MAR 2024 10:55:50')"  |
-------------------------------------------------
|2024-03-13 10:55:50                            |
-------------------------------------------------

Subscribe to our Newsletter !!

Related Articles:

Leave a Comment

Related Posts