HOW TO: Write data into Snowflake from a Snowpark DataFrame?


Introduction

Snowpark is a developer framework from Snowflake that allows developers to interact with Snowflake directly and build complex data pipelines using Python. In our previous articles, we have discussed what Snowpark DataFrames are and how to create and read data using Snowflake Snowpark DataFrames.

In this article, we will explore how to write data into Snowflake tables using Snowpark DataFrames.

How to Write Data into a Snowflake Table from a Snowpark DataFrame?

The DataFrameWriter class in Snowpark provides methods for writing data from a DataFrame to a destination within the Snowflake ecosystem.

To write data from DataFrame into a table:

  1. Create a DataFrame containing the data to be written into a Snowflake table.
  2. Create a DataFrameWriter object by calling the DataFrame.write property on the DataFrame.
  3. Specify the write mode by calling the mode() method on the DataFrameWriter object. This returns a DataFrameWriter object configured with the specified mode.
  4. Call the save_as_table method on the DataFrameWriter object to save the contents of the DataFrame to a specified table.
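The steps above can be sketched as a small helper that performs them one at a time. This is only an illustration: the function name write_to_table and its parameters are hypothetical, and df is assumed to be a Snowpark DataFrame that has already been created (step 1).

```python
def write_to_table(df, table_name, save_mode="append"):
    """Write a Snowpark DataFrame to a table, one step at a time.

    `df` is assumed to be an existing Snowpark DataFrame (step 1).
    The function name and parameters are illustrative only.
    """
    writer = df.write                      # step 2: get the DataFrameWriter
    configured = writer.mode(save_mode)    # step 3: set the save mode
    configured.save_as_table(table_name)   # step 4: write to the table
```

In practice the three calls are usually chained on one line, as shown in the syntax that follows.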

Syntax:

DataFrame.write.mode(save_mode).save_as_table(table_name)

Methods:

mode(save_mode)

Sets the save mode of the DataFrameWriter. The supported values of the save_mode are:

  • “append”: Appends data from the DataFrame to the existing table. If the table does not exist, it creates a new one.
  • “overwrite”: Overwrites the existing table with the data from the DataFrame.
  • “errorifexists”: Throws an exception if the table already exists. This is the default mode.
  • “ignore”: Ignores the write operation if the table already exists.
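Because the save mode is passed as a plain string, a typo only surfaces when the write is attempted. A minimal sketch of a client-side check against the four values listed above follows. The names SUPPORTED_SAVE_MODES and normalize_save_mode are hypothetical and not part of Snowpark, which performs its own validation and may accept further modes depending on the version.

```python
# The four save modes described in this article (hypothetical helper, not a Snowpark API).
SUPPORTED_SAVE_MODES = ("append", "overwrite", "errorifexists", "ignore")


def normalize_save_mode(save_mode):
    """Return the trimmed, lower-cased mode, or raise ValueError if it is not listed."""
    normalized = save_mode.strip().lower()
    if normalized not in SUPPORTED_SAVE_MODES:
        raise ValueError(
            f"Unsupported save mode {save_mode!r}; expected one of {SUPPORTED_SAVE_MODES}"
        )
    return normalized
```

The returned value can then be passed to mode(), for example df.write.mode(normalize_save_mode("Overwrite")).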

save_as_table(table_name)

Writes the data to the specified table in a Snowflake database.

Demonstration

For demonstration purposes, let’s create a DataFrame by reading data from an existing Snowflake table. We will then transform the data retrieved from Snowflake and, ultimately, save the transformed data as a new table.

Refer to our previous article to learn how to establish a connection with a Snowflake database using Snowpark before starting to create the DataFrames.

In the example below, a DataFrame is created that reads data from a Snowflake table (“SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER”), filters rows where “C_NATIONKEY” is ‘15’, and selects only the “C_CUSTKEY” and “C_NAME” columns.

from snowflake.snowpark.functions import col

df_customer = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER")
df_customer_filter = df_customer.filter(col("C_NATIONKEY") == '15')
df_customer_select = df_customer_filter.select(col("C_CUSTKEY"), col("C_NAME"))

Overwrite Data

The following code writes the contents of the df_customer_select DataFrame to the specified Snowflake table, replacing the table and its data if it already exists.

customer_wrt = df_customer_select.write.mode("overwrite").save_as_table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER")

When executed, this code is translated and executed as SQL in Snowflake through the Snowpark API. The resulting SQL statement is as follows:

--creates a new table overwriting the existing one if already exists
CREATE OR REPLACE TABLE SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER (
    "C_CUSTKEY" BIGINT NOT NULL,
    "C_NAME" STRING(25) NOT NULL
) AS SELECT * FROM (
    SELECT "C_CUSTKEY", "C_NAME" FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER WHERE ("C_NATIONKEY" = '15')
);

The following code confirms that the table is created and displays the count of records loaded into the CUSTOMER table.

session.table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER").count()

+----+
|5921|
+----+
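Right after an overwrite, the target table should hold exactly as many rows as the source DataFrame. The check can be sketched as below. The helper name row_counts_match is hypothetical, and session and df are assumed to be the Snowpark session and DataFrame used earlier in this article.

```python
def row_counts_match(session, df, table_name):
    """Return True when table_name has as many rows as the source DataFrame.

    Only meaningful directly after an "overwrite" write, when the table
    is expected to contain nothing but the DataFrame's rows.
    """
    source_rows = df.count()                          # rows produced by the DataFrame
    target_rows = session.table(table_name).count()   # rows now stored in the table
    return source_rows == target_rows
```

A call such as row_counts_match(session, df_customer_select, "SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER") runs two count queries in Snowflake, so it is better suited to demos and tests than to very large tables.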

Append Data

The following code appends the contents of the df_customer_select DataFrame to the specified Snowflake table, adding new records to the existing ones.

customer_wrt = df_customer_select.write.mode("append").save_as_table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER")

When executed, this code is translated and executed as SQL in Snowflake through the Snowpark API. The resulting SQL statement is as follows:

--verifies if specified table already exists
show tables like 'CUSTOMER' in schema SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA

--inserts data because the table already exists; otherwise creates the table and inserts data
INSERT INTO SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER
SELECT "C_CUSTKEY", "C_NAME"
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
WHERE ("C_NATIONKEY" = '15')

The following code displays the count of records in the CUSTOMER table. Since the table was already created and loaded with 5921 records in the preceding step, the doubled record count shows that the data was appended.

session.table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER").count()

+------+
|11842 |
+------+
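The before and after counts can also be captured in one place to confirm how many rows an append added. The sketch below assumes the target table already exists, since counting a missing table would fail. The helper name append_and_count is hypothetical.

```python
def append_and_count(session, df, table_name):
    """Append df to an existing table and return (rows_before, rows_after)."""
    rows_before = session.table(table_name).count()     # count before the write
    df.write.mode("append").save_as_table(table_name)   # append the DataFrame rows
    rows_after = session.table(table_name).count()      # count after the write
    return rows_before, rows_after
```

With the data used in this article, the difference rows_after - rows_before would be expected to equal the 5921 rows in df_customer_select.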

Ignore Data

The following code ignores the write operation if the specified table already exists.

customer_wrt = df_customer_select.write.mode("ignore").save_as_table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER")

When executed, this code is translated and executed as SQL in Snowflake through the Snowpark API. The resulting SQL statement is as follows:

--creates table only if not already existing
CREATE TABLE IF NOT EXISTS SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER (
    "C_CUSTKEY" BIGINT NOT NULL,
    "C_NAME" STRING(25) NOT NULL
) AS
SELECT * FROM (
    SELECT "C_CUSTKEY", "C_NAME"
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
    WHERE "C_NATIONKEY" = '15'
);

The following code displays the count of records in the CUSTOMER table. Since we have already created and inserted data in the preceding steps, the record count indicates that no data got written into the table.

session.table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER").count()

+------+
|11842 |
+------+

Throw Error

The following code throws an exception if the specified table already exists.

customer_wrt = df_customer_select.write.mode("errorifexists").save_as_table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER")

# SQL compilation error: Object 'SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER' already exists.

When executed, this code is translated and executed as SQL in Snowflake through the Snowpark API and throws an error. The resulting SQL statement and the result are as follows:

--"create table" is used instead of "create or replace"
CREATE TABLE SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER (
    "C_CUSTKEY" BIGINT NOT NULL,
    "C_NAME" STRING(25) NOT NULL
) AS SELECT * FROM (
    SELECT "C_CUSTKEY", "C_NAME" FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER WHERE ("C_NATIONKEY" = '15')
);

--SQL compilation error: Object 'SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.CUSTOMER' already exists.
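In a pipeline, this error can be caught instead of stopping the run. The sketch below assumes the failure surfaces as SnowparkSQLException from snowflake.snowpark.exceptions. The helper name create_table_or_report is hypothetical, and the import fallback exists only so that the snippet can be read and run where Snowpark is not installed.

```python
try:
    from snowflake.snowpark.exceptions import SnowparkSQLException
except ImportError:
    # Fallback for environments without Snowpark (assumption for illustration only).
    SnowparkSQLException = Exception


def create_table_or_report(df, table_name):
    """Try to create table_name; return True on success, False if the write raised."""
    try:
        df.write.mode("errorifexists").save_as_table(table_name)
        return True
    except SnowparkSQLException as exc:
        # Any SQL error lands here, including "already exists".
        print(f"Write failed: {exc}")
        return False
```

Note that this catches every SQL error raised by the write, not only the "already exists" case, so the printed message should be inspected before deciding how to proceed.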

How to Specify Table Type while Writing data into Snowflake from a Snowpark DataFrame?

By default, the save_as_table method creates a permanent table. To create tables of the temporary or transient type, include an additional parameter table_type along with the table_name.

The supported values of table_type are: temp, temporary, and transient.

Syntax:

DataFrame.write.mode(save_mode).save_as_table(table_name, table_type="<temp | temporary | transient>")

The following code writes the contents of the df_customer_select DataFrame to a temporary table named TEMP_CUSTOMER.

customer_wrt = df_customer_select.write.mode("overwrite").save_as_table("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.TEMP_CUSTOMER", table_type="temp")
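The same pattern applies to transient tables. A minimal sketch follows. The helper name save_as_transient is hypothetical, and so is any table name passed to it.

```python
def save_as_transient(df, table_name, save_mode="overwrite"):
    """Write a Snowpark DataFrame to a transient table.

    table_type is passed as a keyword argument, as in the syntax above.
    """
    df.write.mode(save_mode).save_as_table(table_name, table_type="transient")
```

For example, save_as_transient(df_customer_select, "SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.TRANSIENT_CUSTOMER") would write the demo DataFrame to a transient table with that illustrative name.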

How to Create View from a Snowpark DataFrame?

To create a view from a DataFrame, call the create_or_replace_view method, which creates a new view immediately.

Syntax:

DataFrame.create_or_replace_view("view_name")

The following code creates a view named VW_CUSTOMER using the computation expressed by the df_customer_select DataFrame.

customer_view = df_customer_select.create_or_replace_view("SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.VW_CUSTOMER")

When executed, this code is translated and executed as SQL in Snowflake through the Snowpark API. The resulting SQL statement is as follows:

--creating view based on DataFrame expression
CREATE OR REPLACE VIEW SNOWPARK_DEMO_DB.SNOWPARK_DEMO_SCHEMA.VW_CUSTOMER AS
SELECT * FROM (
    SELECT "C_CUSTKEY", "C_NAME"
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
    WHERE "C_NATIONKEY" = '15'
);

Alternatively, the create_or_replace_temp_view method can be used to create a temporary view. The temporary view is only available in the session in which it is created.
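A temporary view can be created and read back within the same session, as sketched below. The helper name create_and_read_temp_view is hypothetical. session and df are assumed to be the Snowpark session and DataFrame used earlier, and the sketch relies on session.table being able to read a view by name.

```python
def create_and_read_temp_view(session, df, view_name):
    """Create a session-scoped view from df and return a DataFrame that reads it."""
    df.create_or_replace_temp_view(view_name)   # view disappears when the session ends
    return session.table(view_name)             # lazily reads from the new view
```

For example, create_and_read_temp_view(session, df_customer_select, "VW_TEMP_CUSTOMER").count() would count the rows exposed by a view with that illustrative name.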
