HOW TO: Merge two DataFrames in Snowflake Snowpark?


1. Introduction

The Table.merge() method in Snowpark allows merging the contents of a Snowpark table object with a DataFrame source based on a specified join condition. It supports INSERT, UPDATE, and DELETE actions on the table object’s contents using matched or not-matched clauses.

The method returns a MergeResult tuple, representing the number of rows inserted, updated, and deleted by the merge action.

Syntax

The following is the syntax for merging the contents of a table using the Table.merge() method in Snowpark.

  • The source is a DataFrame to join with the target table. It can also be another table.
  • The join condition represents the expression on which the table and source are joined.
  • A list of matched and not-matched clauses specifies the actions (insert, update, or delete) to take when the rows from this table and the source either match or do not match on the join condition.

from snowflake.snowpark.functions import when_matched, when_not_matched

target_table.merge(
    source_df,  # Source
    (  # Join condition
        (target_table["join_column1"] == source_df["join_column1"]) &
        (target_table["join_column2"] == source_df["join_column2"])
    ),
    [  # Insert, update, and delete actions using matched and not-matched clauses
        # A conditional matched clause has to come before the unconditional one;
        # the condition below is a placeholder, like the column names
        when_matched(source_df["source_column3"] == "D").delete(),
        when_matched().update({
            "target_column1": source_df["source_column1"],
            "target_column2": source_df["source_column2"]
        }),
        when_not_matched().insert({
            "target_column1": source_df["source_column1"],
            "target_column2": source_df["source_column2"]
        })
    ]
)

2. Steps to Merge two DataFrames in Snowpark

Follow the steps below to merge data into a table using the Table.merge() method in Snowpark.

  1. Create two DataFrames: one for the source data and one for the target data to be merged.
    The DataFrames can be built with Session.createDataFrame() from data created within the code, from an existing table, or from data read from a CSV file.
  2. Create a temporary table in Snowflake with the contents of the target DataFrame using the save_as_table() method.
  3. Then, create a Snowpark Table object to read the contents of the temporary table using the Session.table() method.
  4. Use the Table.merge() method to update the contents of the target table by merging with the source DataFrame.
  5. Finally, display the contents of the Table object using the show() method to verify that records have been inserted, updated, or deleted as expected.

3. Demonstration of Merging two DataFrames in Snowflake

3.1. Create Source and Target DataFrames

The following code creates source and target DataFrames to demonstrate the table merge operation.

# Target DataFrame
employee_data = [
    [1,'TONY',24000],
    [2,'STEVE',17000],
    [3,'BRUCE',9000],
    [4,'WANDA',20000]
]

employee_schema = ["EMP_ID", "EMP_NAME", "SALARY"]

df_emp = session.createDataFrame(employee_data, schema=employee_schema)
df_emp.show()

Output:

------------------------------------
|"EMP_ID"  |"EMP_NAME"  |"SALARY"  |
------------------------------------
|1         |TONY        |24000     |
|2         |STEVE       |17000     |
|3         |BRUCE       |9000      |
|4         |WANDA       |20000     |
------------------------------------

# Source DataFrame
employee_raw_data = [
    [1,'TONY',25000],
    [3,'BRUCE',10000],
    [5,'PETER',15000]
]

df_emp_raw = session.createDataFrame(employee_raw_data, schema=employee_schema)
df_emp_raw.show()

Output:

------------------------------------
|"EMP_ID"  |"EMP_NAME"  |"SALARY"  |
------------------------------------
|1         |TONY        |25000     |
|3         |BRUCE       |10000     |
|5         |PETER       |15000     |
------------------------------------

3.2. Create a Temporary Table

The following code creates a temporary table named tmp_emp in the Snowflake database using the contents of the target DataFrame df_emp.

# Create temporary table for Target DataFrame
df_emp.write.mode("overwrite").save_as_table("tmp_emp", table_type="temp")

3.3. Create a Snowpark Table Object

The following code creates a Snowpark Table object df_tmp_emp, which reads the contents of the temporary table tmp_emp using the session.table() method. A Table is a DataFrame that also supports merge().

# Read the temp table as Snowpark Table object
df_tmp_emp = session.table("tmp_emp")

3.4. Perform the Merge

The following code merges the contents of the df_emp_raw DataFrame into the temporary table tmp_emp through the df_tmp_emp Table object.

  • It matches rows based on the EMP_ID column and updates EMP_NAME and SALARY in tmp_emp when a match is found.
  • If no match is found, it inserts new rows from df_emp_raw into tmp_emp with EMP_ID, EMP_NAME, and SALARY values.

# Merge contents of source DataFrame into temporary table
from snowflake.snowpark.functions import when_matched, when_not_matched

df_tmp_emp.merge(
    df_emp_raw, 
    (df_tmp_emp["EMP_ID"] == df_emp_raw["EMP_ID"]),
    [
        when_matched().update({
            "EMP_NAME": df_emp_raw["EMP_NAME"], 
            "SALARY": df_emp_raw["SALARY"]
        }), 
        when_not_matched().insert({
            "EMP_ID": df_emp_raw["EMP_ID"], 
            "EMP_NAME": df_emp_raw["EMP_NAME"],
            "SALARY": df_emp_raw["SALARY"]
        })
    ]
)

# output: MergeResult(rows_inserted=1, rows_updated=2, rows_deleted=0)
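
To see where those counts come from, the same update-on-match, insert-on-no-match logic can be modelled in plain Python. This is only a sketch of the semantics, not how Snowflake executes a merge: the simulate_upsert function and the MergeCounts tuple are hypothetical names introduced here for illustration, and the sketch assumes EMP_ID is unique in both inputs.

```python
from typing import NamedTuple


class MergeCounts(NamedTuple):
    """Plain-Python stand-in (hypothetical) mirroring the MergeResult fields."""
    rows_inserted: int
    rows_updated: int
    rows_deleted: int


def simulate_upsert(target_rows, source_rows, key="EMP_ID"):
    """Model an update-on-match / insert-on-no-match merge on lists of dicts.

    Assumes `key` is unique in both inputs. Returns (merged_rows, MergeCounts).
    """
    merged = {row[key]: dict(row) for row in target_rows}
    inserted = updated = 0
    for src in source_rows:
        if src[key] in merged:
            # Matched on the key: overwrite the non-key columns (the update clause)
            merged[src[key]].update({c: v for c, v in src.items() if c != key})
            updated += 1
        else:
            # No match on the key: add the source row (the insert clause)
            merged[src[key]] = dict(src)
            inserted += 1
    # This model has no delete clause, so rows_deleted is always 0
    return list(merged.values()), MergeCounts(inserted, updated, 0)


columns = ["EMP_ID", "EMP_NAME", "SALARY"]
target = [dict(zip(columns, r)) for r in [
    (1, "TONY", 24000), (2, "STEVE", 17000), (3, "BRUCE", 9000), (4, "WANDA", 20000)]]
source = [dict(zip(columns, r)) for r in [
    (1, "TONY", 25000), (3, "BRUCE", 10000), (5, "PETER", 15000)]]

merged_rows, counts = simulate_upsert(target, source)
print(counts)  # MergeCounts(rows_inserted=1, rows_updated=2, rows_deleted=0)
```

EMP_ID 1 and 3 exist in both inputs, so they count as updates, while EMP_ID 5 exists only in the source and counts as the single insert.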

3.5. Verify Results

The following code displays the contents of the updated table.

df_tmp_emp.show()

Output:

------------------------------------
|"EMP_ID"  |"EMP_NAME"  |"SALARY"  |
------------------------------------
|5         |PETER       |15000     |
|2         |STEVE       |17000     |
|1         |TONY        |25000     |
|3         |BRUCE       |10000     |
|4         |WANDA       |20000     |
------------------------------------
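
The rows above come back in no particular order, because a query without an explicit sort does not guarantee one. When checking a merge by eye or in a test, it may help to sort first. The helper below is a minimal sketch: rows_sorted_by is a hypothetical name, and it assumes only that the object passed in offers the sort() and collect() methods that Snowpark DataFrames and Tables provide.

```python
def rows_sorted_by(table, column="EMP_ID"):
    """Return the rows of a Snowpark Table or DataFrame ordered by `column`.

    Only the sort() and collect() methods of `table` are used.
    """
    return table.sort(column).collect()
```

With the objects from this demonstration, rows_sorted_by(df_tmp_emp) would return the five employee rows ordered by EMP_ID; df_tmp_emp.sort("EMP_ID").show() prints them in the same order.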

4. Merging two Snowflake Tables using Snowpark

When both the source and target are Snowflake tables, it is possible to merge data directly between them using the Table.merge() method in Snowpark. This approach simplifies the merging process by eliminating the need for intermediate temporary tables.

The following code merges the employees_raw table with the employees table using the Table.merge() method.

from snowflake.snowpark.functions import when_matched, when_not_matched

# Read the source and target tables
emp = session.table("employees")
emp_raw = session.table("employees_raw")

# Perform the merge between the tables
emp.merge(
    emp_raw, 
    (emp["EMP_ID"] == emp_raw["EMP_ID"]),
    [
        when_matched().update({
            "EMP_NAME": emp_raw["EMP_NAME"], 
            "SALARY": emp_raw["SALARY"]
        }), 
        when_not_matched().insert({
            "EMP_ID": emp_raw["EMP_ID"], 
            "EMP_NAME": emp_raw["EMP_NAME"],
            "SALARY": emp_raw["SALARY"]
        })
    ]
)
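
The introduction mentions DELETE actions, which the examples so far do not exercise. The sketch below adds one to the table-to-table merge. It rests on an assumption that is not part of the original example: a NULL SALARY in employees_raw is treated as a marker that the employee should be removed. The function name merge_with_delete is also hypothetical.

```python
def merge_with_delete(session, target_name="employees", source_name="employees_raw"):
    """Upsert source rows into the target and delete rows flagged for removal.

    Assumption for illustration only: a NULL SALARY in the source marks
    the employee for deletion.
    """
    # Imported inside the function so the sketch can be loaded without
    # a Snowpark installation or an open session.
    from snowflake.snowpark.functions import when_matched, when_not_matched

    target = session.table(target_name)
    source = session.table(source_name)

    return target.merge(
        source,
        target["EMP_ID"] == source["EMP_ID"],
        [
            # Conditional clause first: matched rows flagged for removal
            when_matched(source["SALARY"].is_null()).delete(),
            # Remaining matched rows are updated
            when_matched().update({
                "EMP_NAME": source["EMP_NAME"],
                "SALARY": source["SALARY"],
            }),
            # Unmatched rows are inserted unless they carry the removal marker
            when_not_matched(source["SALARY"].is_not_null()).insert({
                "EMP_ID": source["EMP_ID"],
                "EMP_NAME": source["EMP_NAME"],
                "SALARY": source["SALARY"],
            }),
        ],
    )
```

The conditional delete is listed before the unconditional update because matched clauses are evaluated in order, and an unconditional clause placed first would leave the later one unreachable. Calling merge_with_delete(session) would return a MergeResult whose rows_deleted field reports how many rows were removed.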

5. Summary

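The Table.merge() method in Snowpark merges a source into a target table. The target is always a Snowpark Table object, so the method covers merges between:

  • Two Snowpark DataFrames, once the target DataFrame has been saved as a table
  • Two Snowflake tables
  • A Snowflake table and a DataFrame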

By defining matched and not-matched clauses, you control how data is synchronized between the source and the target table, which lets you handle scenarios such as upserts and deletes in a single step.
