Snowpipe is Snowflake’s continuous data ingestion service which enables loading data from files into database tables as soon as they are available in a stage.
In our previous articles, we discussed setting up Snowpipe on AWS and Snowpipe on Azure for automatic data ingestion. In this article, let us focus on how to load files from external storage using Snowpipe and how to monitor the load status of these files in Snowflake.
For the demonstration, we will load data from files in an AWS S3 location into Snowflake using
- A Snowpipe named MY_SNOWPIPE.
- A database table named EMPLOYEES into which the data will be loaded.
- Event Notifications configured on the S3 location to trigger automatic data loads through Snowpipe.
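Putting the setup together, the pipe definition might look like the following. This is a minimal sketch, assuming an external stage named MY_S3_STAGE pointing to the S3 bucket and CSV files with a header row; the stage and file-format details will differ in your environment.

```sql
-- Hypothetical Snowpipe definition for the demo setup.
-- AUTO_INGEST = TRUE enables loading via S3 event notifications.
CREATE OR REPLACE PIPE MY_SNOWPIPE
  AUTO_INGEST = TRUE
AS
COPY INTO EMPLOYEES
FROM @MY_S3_STAGE/Inbox
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);
```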
The below image shows the source files present in the AWS S3 location to be loaded into Snowflake.
2. Loading Historic files older than 7 days
When a Snowpipe is created on top of an external stage, it does not process files that already exist in the location before the creation of the Snowpipe. This is because historic data files do not trigger event notifications, and hence they are not picked up for ingestion by Snowpipe.
Files placed in the external storage location more than 7 days prior to the creation of the Snowpipe can be loaded manually by executing a COPY INTO <table> statement. This COPY INTO statement can be extracted from the DDL of the Snowpipe definition.
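For example, the pipe's DDL, including its COPY INTO statement, can be retrieved with the GET_DDL function:

```sql
-- Returns the CREATE PIPE statement, which contains the COPY INTO definition
SELECT GET_DDL('PIPE', 'MY_SNOWPIPE');
```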
The below COPY INTO command loads the data from CSV files present in the Inbox folder of external stage MY_S3_STAGE into EMPLOYEES table.
COPY INTO EMPLOYEES
FROM @MY_S3_STAGE/Inbox
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);
The below image shows the output of the COPY INTO statement, which provides the load status of the files processed into the table.
The below image shows that a total of 1 million records are loaded into the EMPLOYEES table.
3. Loading Historic files staged within the previous 7 days
Files placed in the external storage location within the 7 days prior to the creation of the Snowpipe can still be loaded through the Snowpipe by executing the ALTER PIPE … REFRESH statement.
The below SQL statement copies files staged within the previous 7 days to the Snowpipe ingest queue for loading into the target table.
ALTER PIPE MY_SNOWPIPE REFRESH;
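After a refresh, the pipe's execution state and the number of files pending in the ingest queue can be checked with SYSTEM$PIPE_STATUS:

```sql
-- Returns a JSON object with executionState, pendingFileCount, etc.
SELECT SYSTEM$PIPE_STATUS('MY_SNOWPIPE');
```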
4. Loading Incremental files from External Stage using Snowpipe
For the Snowpipe to automatically pick up files placed after its creation, an Event Notification should be configured on the external location to notify Snowpipe when new data is available to load.
Once new files are placed in the external location, a configured event notification service informs Snowpipe that files are ready to load. Snowpipe copies the files into a queue and loads data from the queued files into the target table.
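For an AWS S3 stage, the event notification must target the Snowflake-managed SQS queue associated with the pipe. The ARN of this queue is available in the notification_channel column of the SHOW PIPES output:

```sql
-- The notification_channel column holds the ARN of the SQS queue
-- to configure in the S3 bucket's event notification settings
SHOW PIPES LIKE 'MY_SNOWPIPE';
```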
Let us place a new file in the same external location discussed earlier and verify how its load status can be tracked.
The below image shows that the total records in the table increased from 1 million to 1.2 million after the new file is processed.
5. Monitoring Load status of Snowpipe
There are several ways through which the files processed through Snowpipe can be tracked, as listed below.
- PIPE_USAGE_HISTORY table function
- COPY_HISTORY table function
- LOAD_HISTORY view
- Snowsight Copy History Page
5.1. PIPE_USAGE_HISTORY
PIPE_USAGE_HISTORY is a table function that can be used to query the history of data loaded into Snowflake tables using Snowpipe within a specified date range. The function returns pipe activity within the last 14 days.
The following optional parameters can be passed to the function to narrow the search of the load history.
- DATE_RANGE_START (optional)
- DATE_RANGE_END (optional)
- PIPE_NAME (optional)
The below SQL query returns the data load history of your account through Snowpipe for a 30-minute range using the PIPE_USAGE_HISTORY table function.
SELECT *
FROM TABLE(INFORMATION_SCHEMA.PIPE_USAGE_HISTORY(
  DATE_RANGE_START => TO_TIMESTAMP_TZ('2023-05-14 16:00:00.000 +0530'),
  DATE_RANGE_END   => TO_TIMESTAMP_TZ('2023-05-14 16:30:00.000 +0530'),
  PIPE_NAME        => 'MY_SNOWPIPE'
));
The below SQL query returns the data load history of your account through Snowpipe for the last 1 hour using the PIPE_USAGE_HISTORY table function.
SELECT *
FROM TABLE(INFORMATION_SCHEMA.PIPE_USAGE_HISTORY(
  DATE_RANGE_START => DATEADD('HOUR', -1, CURRENT_TIMESTAMP()),
  PIPE_NAME        => 'MY_SNOWPIPE'
));
The below image shows the output of the PIPE_USAGE_HISTORY queries provided above.
5.2. COPY_HISTORY
COPY_HISTORY is a table function that can be used to query the history of data loaded into Snowflake tables through both bulk data loading using COPY INTO statements and continuous data loading using Snowpipe within a specified date range. The function returns copy activity within the last 14 days.
The following parameters can be passed to the function to narrow the search of the copy activity.
- TABLE_NAME (required)
- START_TIME (required)
- END_TIME (optional)
The below SQL query returns the data load history of the last 1 hour for the EMPLOYEES table using the COPY_HISTORY table function.
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'EMPLOYEES',
  START_TIME => DATEADD(HOURS, -1, CURRENT_TIMESTAMP())
));
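The COPY_HISTORY output can also be filtered to surface only problematic loads. The sketch below selects files whose load did not fully succeed, using the STATUS and FIRST_ERROR_MESSAGE columns of the function's output; the 24-hour window is an illustrative choice.

```sql
-- List files whose load did not fully succeed in the last 24 hours
SELECT FILE_NAME, STATUS, FIRST_ERROR_MESSAGE
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'EMPLOYEES',
  START_TIME => DATEADD(HOURS, -24, CURRENT_TIMESTAMP())
))
WHERE STATUS != 'Loaded';
```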
5.3. LOAD_HISTORY
LOAD_HISTORY is an Information Schema view that can be used to query the history of data loaded into Snowflake tables using COPY INTO statements. The view returns copy activity within the last 14 days. Note that this view does not return the history of data loaded using Snowpipe.
The below SQL query returns the history of data loaded into the EMPLOYEES table using LOAD_HISTORY view.
SELECT *
FROM INFORMATION_SCHEMA.LOAD_HISTORY
WHERE TABLE_NAME = 'EMPLOYEES'
ORDER BY LAST_LOAD_TIME DESC;
Note that the historical data for COPY INTO commands is removed from the view when a table is dropped.
5.4. Snowsight Copy History Page
The Snowsight Copy History page provides details of both bulk data loading activity using COPY INTO statements and continuous data loading using Snowpipe. It provides a detailed table of data loading activity that has occurred over the last 365 days for all tables in your account.
To access data load history information in Snowsight,
- Log in to Snowsight > Activity > Copy History
The below image shows the data load information from the Copy History page in Snowsight.
6. Summary
To summarize everything related to loading files using Snowpipe:
- Historical files older than 7 days cannot be processed by Snowpipe. They should be loaded manually using a COPY INTO statement.
- Historical files placed within the previous 7 days can be processed by executing the ALTER PIPE … REFRESH statement.
- Incremental files are processed by Snowpipe automatically by setting up an Event Notification service which notifies Snowpipe when new files arrive.
The below table summarizes the details related to monitoring the load status of Snowpipe.

|Monitoring Method|Type|Captures load status of COPY INTO statement|Captures load status of Snowpipe|No. of days of history captured|
|---|---|---|---|---|
|PIPE_USAGE_HISTORY|Table function|No|Yes|14|
|COPY_HISTORY|Table function|Yes|Yes|14|
|LOAD_HISTORY|Information Schema view|Yes|No|14|
|Copy History Page|Snowsight UI feature|Yes|Yes|365|