1. Introduction
The Table.merge() method in Snowpark allows merging the contents of a Snowpark table object with a DataFrame source based on a specified join condition. It supports INSERT, UPDATE, and DELETE actions on the table object’s contents using matched or not-matched clauses.
The method returns a MergeResult tuple, representing the number of rows inserted, updated, and deleted by the merge action.
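As a rough illustration of how the returned counts can be consumed, the sketch below uses a stand-in named tuple with the three fields described above. The stand-in class and the summarize() helper are assumptions made for illustration only; in a real session the value comes from Table.merge() itself.

```python
from typing import NamedTuple


class MergeResult(NamedTuple):
    """Stand-in for the value returned by Table.merge() (illustration only)."""
    rows_inserted: int
    rows_updated: int
    rows_deleted: int


def summarize(result: MergeResult) -> str:
    """Build a one-line log message from the merge counts (hypothetical helper)."""
    total = sum(result)
    return (f"{total} rows affected: {result.rows_inserted} inserted, "
            f"{result.rows_updated} updated, {result.rows_deleted} deleted")


# Counts taken from the demonstration in section 3.4 of this article.
result = MergeResult(rows_inserted=1, rows_updated=2, rows_deleted=0)

# A named tuple supports both attribute access and positional unpacking.
inserted, updated, deleted = result
print(summarize(result))
```

Reading the counts by name rather than by position keeps logging or validation code readable when the result is passed around.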
Syntax
The following is the syntax for merging data into a table using the Table.merge() method in Snowpark.
- The source is a DataFrame to join with the target table. It can also be another table.
- The join condition represents the expression on which the table and source are joined.
- A list of matched and not-matched clauses specifies the actions (Insert, Update, or Delete) to take when the values from this table and the source either match or do not match on the join condition.
target_table.merge(
    source_df,  # Source
    (  # Join condition
        (target_table["join_column1"] == source_df["join_column1"]) &
        (target_table["join_column2"] == source_df["join_column2"])
    ),
    [  # Insert, update, and delete operations using matched and not-matched clauses
        # A matched clause with a condition must come before the unconditional
        # matched clause; otherwise it can never be reached.
        # "source_column3" and "delete_value" are placeholders.
        when_matched(source_df["source_column3"] == "delete_value").delete(),
        when_matched().update({
            "target_column1": source_df["source_column1"],
            "target_column2": source_df["source_column2"]
        }),
        when_not_matched().insert({
            "target_column1": source_df["source_column1"],
            "target_column2": source_df["source_column2"]
        })
    ]
)
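To make the matched and not-matched logic concrete, here is a minimal pure-Python model of it that runs without a Snowflake connection. It is only a sketch of the semantics, not how Snowpark implements the merge: simulate_merge(), MergeCounts, and the delete_if parameter are hypothetical names, and the model assumes the key values are unique in both inputs.

```python
from typing import Callable, Dict, List, NamedTuple, Optional

Row = Dict[str, object]


class MergeCounts(NamedTuple):
    rows_inserted: int
    rows_updated: int
    rows_deleted: int


def simulate_merge(
    target: List[Row],
    source: List[Row],
    key: str,
    delete_if: Optional[Callable[[Row], bool]] = None,
) -> MergeCounts:
    """Apply an upsert (and optional conditional delete) to `target` in place."""
    index = {row[key]: row for row in target}
    inserted = updated = deleted = 0
    for src in source:
        match = index.get(src[key])
        if match is None:
            # Not matched: insert a copy of the source row.
            target.append(dict(src))
            inserted += 1
        elif delete_if is not None and delete_if(src):
            # Matched and the delete condition holds: remove the target row.
            target.remove(match)
            deleted += 1
        else:
            # Matched: overwrite target columns with the source values.
            match.update(src)
            updated += 1
    return MergeCounts(inserted, updated, deleted)


target = [
    {"EMP_ID": 1, "SALARY": 24000},
    {"EMP_ID": 2, "SALARY": 17000},
]
source = [
    {"EMP_ID": 1, "SALARY": 25000},  # key exists in target -> update
    {"EMP_ID": 3, "SALARY": 9000},   # key missing in target -> insert
]
counts = simulate_merge(target, source, key="EMP_ID")
print(counts)
print(target)
```

Each source row triggers at most one action, and target rows with no counterpart in the source are left untouched.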
2. Steps to Merge two DataFrames in Snowpark
Follow the steps below to merge data into a table using the Table.merge() method in Snowpark.
- Create two DataFrames: one for the source data and one for the target data to be merged, using Session.createDataFrame(). The DataFrames can be built from an existing table, data read from a CSV file, or data created within the code.
- Create a temporary table in Snowflake with the contents of the target DataFrame using the save_as_table() method.
- Then, create a Snowpark Table object to read the contents of the temporary table using the Session.table() method.
- Use the Table.merge() method to update the contents of the target table by merging with the source DataFrame.
- Finally, display the contents of the Table object using the show() method to verify that records have been inserted, updated, or deleted as expected.
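The steps above could be wrapped in a small helper along the following lines. This is a sketch under assumptions, not a definitive implementation: merge_dataframes(), build_assignments(), and the default temporary table name are hypothetical, and the merge itself needs the snowflake-snowpark-python package and an open session. The import is placed inside the function so that the pure-Python part can be tried without Snowpark installed.

```python
from typing import Any, Dict, Sequence


def build_assignments(source: Any, columns: Sequence[str]) -> Dict[str, Any]:
    """Map each target column name to the same-named column of `source`."""
    return {name: source[name] for name in columns}


def merge_dataframes(session, target_df, source_df, key, columns,
                     temp_name="tmp_merge_target"):
    """Upsert source_df into a temp-table copy of target_df (needs a live session)."""
    # Imported lazily so this module loads even where Snowpark is not installed.
    from snowflake.snowpark.functions import when_matched, when_not_matched

    # Save the target DataFrame as a temporary table.
    target_df.write.mode("overwrite").save_as_table(temp_name, table_type="temp")
    # Read it back as a Table object, which is what exposes merge().
    target = session.table(temp_name)
    # Update every non-key column on a match; insert all columns otherwise.
    non_key = [c for c in columns if c != key]
    result = target.merge(
        source_df,
        target[key] == source_df[key],
        [
            when_matched().update(build_assignments(source_df, non_key)),
            when_not_matched().insert(build_assignments(source_df, columns)),
        ],
    )
    return target, result


# build_assignments only needs `source[name]`, so a plain dict can stand in
# for a DataFrame when trying it out locally.
stand_in = {"EMP_ID": "<EMP_ID column>", "SALARY": "<SALARY column>"}
print(build_assignments(stand_in, ["SALARY"]))
```

With a real session, calling show() on the returned Table object would cover the final verification step.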
3. Demonstration of Merging two DataFrames in Snowflake
3.1. Create Source and Target DataFrames
The following code creates source and target DataFrames to demonstrate the table merge operation.
# Target DataFrame
employee_data = [
    [1, 'TONY', 24000],
    [2, 'STEVE', 17000],
    [3, 'BRUCE', 9000],
    [4, 'WANDA', 20000]
]
employee_schema = ["EMP_ID", "EMP_NAME", "SALARY"]
df_emp = session.createDataFrame(employee_data, schema=employee_schema)
df_emp.show()
Output:
------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |
------------------------------------
|1 |TONY |24000 |
|2 |STEVE |17000 |
|3 |BRUCE |9000 |
|4 |WANDA |20000 |
------------------------------------
# Source DataFrame
employee_raw_data = [
    [1, 'TONY', 25000],
    [3, 'BRUCE', 10000],
    [5, 'PETER', 15000]
]
df_emp_raw = session.createDataFrame(employee_raw_data, schema=employee_schema)
df_emp_raw.show()
Output:
------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |
------------------------------------
|1 |TONY |25000 |
|3 |BRUCE |10000 |
|5 |PETER |15000 |
------------------------------------
3.2. Create a Temporary Table
The following code creates a temporary table named tmp_emp in the Snowflake database using the contents of the target DataFrame df_emp.
# Create temporary table for Target DataFrame
df_emp.write.mode("overwrite").save_as_table("tmp_emp", table_type="temp")
3.3. Create a Snowpark Table Object
The following code creates a Snowpark Table object, df_tmp_emp, which reads the contents of the temporary table tmp_emp using the session.table() method.
# Read the temp table as Snowpark Table object
df_tmp_emp = session.table("tmp_emp")
3.4. Perform the Merge
The following code merges the contents of the df_emp_raw DataFrame into the tmp_emp temporary table through the df_tmp_emp Table object.
- It matches rows on the EMP_ID column and updates EMP_NAME and SALARY in tmp_emp when a match is found.
- If no match is found, it inserts new rows from df_emp_raw into tmp_emp with EMP_ID, EMP_NAME, and SALARY values.
# Merge contents of source DataFrame into temporary table
from snowflake.snowpark.functions import when_matched, when_not_matched

df_tmp_emp.merge(
    df_emp_raw,
    (df_tmp_emp["EMP_ID"] == df_emp_raw["EMP_ID"]),
    [
        when_matched().update({
            "EMP_NAME": df_emp_raw["EMP_NAME"],
            "SALARY": df_emp_raw["SALARY"]
        }),
        when_not_matched().insert({
            "EMP_ID": df_emp_raw["EMP_ID"],
            "EMP_NAME": df_emp_raw["EMP_NAME"],
            "SALARY": df_emp_raw["SALARY"]
        })
    ]
)
# output: MergeResult(rows_inserted=1, rows_updated=2, rows_deleted=0)
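The reported counts can be cross-checked by hand from the EMP_ID values in the two inputs. The short sketch below does this with plain Python sets; it only mirrors the demonstration data and does not call Snowpark.

```python
# EMP_ID values taken from the demonstration data above.
target_ids = {1, 2, 3, 4}  # rows in tmp_emp before the merge
source_ids = {1, 3, 5}     # rows in df_emp_raw

matched = target_ids & source_ids      # handled by when_matched().update(...)
not_matched = source_ids - target_ids  # handled by when_not_matched().insert(...)
untouched = target_ids - source_ids    # target rows that no clause applies to

expected = {
    "rows_inserted": len(not_matched),
    "rows_updated": len(matched),
    "rows_deleted": 0,  # the merge above has no delete clause
}
print(expected)
```

Employees 1 and 3 are updated, employee 5 is inserted, and employees 2 and 4 stay as they were, which agrees with the MergeResult shown above.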
3.5. Verify Results
The following code displays the contents of the updated table. The row order in the output may vary because no sort order is specified.
df_tmp_emp.show()
Output:
------------------------------------
|"EMP_ID" |"EMP_NAME" |"SALARY" |
------------------------------------
|5 |PETER |15000 |
|2 |STEVE |17000 |
|1 |TONY |25000 |
|3 |BRUCE |10000 |
|4 |WANDA |20000 |
------------------------------------
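Because the rows can come back in any order, a check that ignores ordering is more robust than comparing against the printed output. The sketch below derives the expected table contents from the demonstration data in plain Python; same_rows() is a hypothetical helper, and the idea of feeding it the rows fetched from the table (for example via collect()) is an assumption about how it might be used.

```python
target_rows = [
    [1, "TONY", 24000],
    [2, "STEVE", 17000],
    [3, "BRUCE", 9000],
    [4, "WANDA", 20000],
]
source_rows = [
    [1, "TONY", 25000],
    [3, "BRUCE", 10000],
    [5, "PETER", 15000],
]

# Key both inputs by EMP_ID; source rows override target rows with the same key.
by_id = {row[0]: row for row in target_rows}
by_id.update({row[0]: row for row in source_rows})

# Expected table contents after the upsert, sorted by EMP_ID.
expected = [tuple(by_id[emp_id]) for emp_id in sorted(by_id)]
for row in expected:
    print(row)


def same_rows(actual, wanted):
    """Compare two row collections while ignoring row order."""
    return sorted(map(tuple, actual)) == sorted(map(tuple, wanted))
```

Sorting both sides before comparing means the check passes for the output shown above even though PETER is listed first there.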
4. Merging two Snowflake Tables using Snowpark
When both the source and target are Snowflake tables, it is possible to merge data directly between them using the Table.merge() method in Snowpark. This approach simplifies the merging process by eliminating the need for intermediate temporary tables.
The following code merges the employees_raw table into the employees table using the Table.merge() method.
from snowflake.snowpark.functions import when_matched, when_not_matched

# Read the source and target tables
emp = session.table("employees")
emp_raw = session.table("employees_raw")

# Merge the source table into the target table
emp.merge(
    emp_raw,
    (emp["EMP_ID"] == emp_raw["EMP_ID"]),
    [
        when_matched().update({
            "EMP_NAME": emp_raw["EMP_NAME"],
            "SALARY": emp_raw["SALARY"]
        }),
        when_not_matched().insert({
            "EMP_ID": emp_raw["EMP_ID"],
            "EMP_NAME": emp_raw["EMP_NAME"],
            "SALARY": emp_raw["SALARY"]
        })
    ]
)
5. Summary
The Table.merge() method in Snowpark allows you to perform efficient data merging operations between:
- Two Snowpark DataFrames (with the target first saved as a table)
- Two Snowflake tables
- A Snowflake table and a DataFrame
By defining matched and not-matched clauses, you control how data is synchronized between the source and the target table, which lets you handle scenarios such as upserts and deletes in a single step.