Snowflake Streams & Tasks — Script
Data Pipelining and Change Data Capture:
In databases, change data capture (CDC) is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data.
CDC is an approach to data integration that is based on the identification, capture and delivery of the changes made to enterprise data sources.
CDC occurs often in data-warehouse environments since capturing and preserving the state of data across time is one of the core functions of a data warehouse, but CDC can be utilized in any database or data repository system.
Snowflake provides two features that let you build data engineering pipelines around a change data capture process: streams and tasks. A stream is an object created on a Snowflake table that lets you identify rows that have changed since a point in time called the offset.
A task is an atomic unit of work that acts on data by running a query or calling a stored procedure.
Below we go over what streams and tasks are, and finish with a script that you can run in Snowflake.
What are Streams:
A stream object records data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change, so that actions can be taken using the changed data. This process is referred to as change data capture (CDC). An individual table stream tracks the changes made to rows in a source table. A table stream (also referred to as simply a “stream”) makes a “change table” available of what changed, at the row level, between two transactional points of time in a table. This allows querying and consuming a sequence of change records in a transactional fashion.
Streams can be created to query change data on the following objects:
- Standard tables, including shared tables
- Views, including secure views
- Directory tables
- External tables
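As a rough sketch, creating a stream on each of these object types might look like the statements below. Every object name here (ORDERS, ORDERS_VIEW, ORDERS_EXT, ORDERS_STAGE, and the stream names) is hypothetical and not part of the script later in this post. The sketch assumes that change tracking can be enabled on the tables underlying the view and that the stage has a directory table enabled; streams on external tables are insert-only.

```sql
// Hypothetical objects, for illustration only.
CREATE OR REPLACE STREAM ORDERS_STREAM ON TABLE ORDERS;

// Streams on views rely on change tracking for the underlying tables.
CREATE OR REPLACE STREAM ORDERS_VIEW_STREAM ON VIEW ORDERS_VIEW;

// External tables only support insert-only streams.
CREATE OR REPLACE STREAM ORDERS_EXT_STREAM ON EXTERNAL TABLE ORDERS_EXT INSERT_ONLY = TRUE;

// A directory table is addressed through its stage.
CREATE OR REPLACE STREAM ORDERS_FILES_STREAM ON STAGE ORDERS_STAGE;
```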
The Offset:
When created, a stream logically takes an initial snapshot of every row in the source object (e.g. table, external table, or the underlying tables for a view) by initializing a point in time (called an offset) as the current transactional version of the object. The change tracking system utilized by the stream then records information about the DML changes after this snapshot was taken. Change records provide the state of a row before and after the change. Change information mirrors the column structure of the tracked source object and includes additional metadata columns that describe each change event.
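To make the offset and the metadata columns concrete, here is a minimal sketch. The MEMBERS and MEMBERS_COPY tables and the MEMBERS_STREAM stream are hypothetical names used only for illustration. The sketch relies on two behaviors: querying a stream with a plain SELECT leaves the offset where it is, while using the stream as the source of a committed DML statement advances it.

```sql
// Hypothetical table and stream, for illustration only.
CREATE OR REPLACE TABLE MEMBERS (ID NUMBER, NAME STRING);
CREATE OR REPLACE TABLE MEMBERS_COPY (ID NUMBER, NAME STRING);

// The offset is set to the current version of MEMBERS.
CREATE OR REPLACE STREAM MEMBERS_STREAM ON TABLE MEMBERS;

// Changes made after the offset become visible through the stream.
INSERT INTO MEMBERS VALUES (1, 'Ada'), (2, 'Grace');

// Source columns plus the metadata columns that describe each change.
SELECT ID, NAME, METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
FROM MEMBERS_STREAM;

// Consuming the stream in a DML statement advances the offset.
INSERT INTO MEMBERS_COPY (ID, NAME)
SELECT ID, NAME FROM MEMBERS_STREAM WHERE METADATA$ACTION = 'INSERT';

// The stream should now report no pending changes.
SELECT SYSTEM$STREAM_HAS_DATA('MEMBERS_STREAM');
```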
What are Tasks:
A task can execute any one of the following types of SQL code:
- Single SQL statement
- Call to a stored procedure
- Procedural logic using Snowflake Scripting
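The three kinds of task body listed above could be sketched as follows. The warehouse MY_WH, the table REPORT_LOG, the procedure REFRESH_REPORT, and the task names are all hypothetical. The Snowflake Scripting block is wrapped in EXECUTE IMMEDIATE with $$ delimiters, which is one way to pass a block to a task; treat the thresholds in it as placeholders.

```sql
// All object names below are hypothetical.

// 1. Single SQL statement
CREATE OR REPLACE TASK TASK_SINGLE_STATEMENT
  WAREHOUSE = MY_WH
  SCHEDULE = '60 MINUTE'
AS
  INSERT INTO REPORT_LOG (RUN_AT) SELECT CURRENT_TIMESTAMP();

// 2. Call to a stored procedure
CREATE OR REPLACE TASK TASK_PROCEDURE_CALL
  WAREHOUSE = MY_WH
  SCHEDULE = '60 MINUTE'
AS
  CALL REFRESH_REPORT();

// 3. Procedural logic using Snowflake Scripting
CREATE OR REPLACE TASK TASK_SCRIPTING
  WAREHOUSE = MY_WH
  SCHEDULE = '60 MINUTE'
AS
  EXECUTE IMMEDIATE
  $$
  DECLARE
    LOG_ROWS NUMBER;
  BEGIN
    SELECT COUNT(*) INTO :LOG_ROWS FROM REPORT_LOG;
    // Trim old entries only once the log has grown large.
    IF (LOG_ROWS > 1000) THEN
      DELETE FROM REPORT_LOG WHERE RUN_AT < DATEADD(DAY, -30, CURRENT_TIMESTAMP());
    END IF;
  END;
  $$;
```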
Tasks can be combined with table streams to build continuous ELT workflows that process recently changed table rows. Streams ensure exactly-once semantics for new or changed data in a table.
Tasks can also be used independently, for example to generate periodic reports by inserting or merging rows into a report table, or to perform other periodic work.
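As an illustration of a task used on its own, the sketch below refreshes a report table once a day with a MERGE. The warehouse MY_WH and the tables SALES and DAILY_SALES_REPORT are hypothetical, as are their columns.

```sql
// Hypothetical names: MY_WH, SALES, DAILY_SALES_REPORT.
CREATE OR REPLACE TASK DAILY_SALES_REPORT_TASK
  WAREHOUSE = MY_WH
  SCHEDULE = 'USING CRON 0 6 * * * UTC' // every day at 06:00 UTC
AS
  MERGE INTO DAILY_SALES_REPORT AS R
  USING (
    SELECT SALE_DATE, SUM(AMOUNT) AS TOTAL_AMOUNT
    FROM SALES
    GROUP BY SALE_DATE
  ) AS S
  ON R.SALE_DATE = S.SALE_DATE
  WHEN MATCHED THEN
    UPDATE SET R.TOTAL_AMOUNT = S.TOTAL_AMOUNT
  WHEN NOT MATCHED THEN
    INSERT (SALE_DATE, TOTAL_AMOUNT) VALUES (S.SALE_DATE, S.TOTAL_AMOUNT);

// Tasks are created suspended, so resume the task to activate the schedule.
ALTER TASK DAILY_SALES_REPORT_TASK RESUME;
```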
Compute Resources
Tasks require compute resources to execute SQL code. Either of the following compute models can be chosen for individual tasks:
- Snowflake-managed (i.e. serverless compute model)
- User-managed (i.e. virtual warehouse)
Serverless Tasks
The serverless compute model for tasks enables you to rely on compute resources managed by Snowflake instead of user-managed virtual warehouses. The compute resources are automatically resized and scaled up or down by Snowflake as required for each workload. Snowflake determines the ideal size of the compute resources for a given run based on a dynamic analysis of statistics for the most recent previous runs of the same task. Multiple workloads in your account share a common set of compute resources.
The serverless compute model must be chosen when the task is created. The CREATE TASK syntax is nearly identical to that for tasks that rely on user-managed virtual warehouses: omit the WAREHOUSE parameter to let Snowflake manage the compute resources for the task. Note that the role that executes the CREATE TASK command must have the global EXECUTE MANAGED TASK privilege. For more information about the access control requirements for tasks, see the Task Security section of the Snowflake documentation.
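A serverless task could be set up along these lines. The role PIPELINE_ROLE, the table HEARTBEAT_LOG, and the task name are hypothetical, and the sketch assumes the role already has the schema-level privileges it needs to create tasks. USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE is optional and only hints at the compute size for the first runs.

```sql
// Hypothetical role, table, and task names.
USE ROLE ACCOUNTADMIN;
GRANT EXECUTE MANAGED TASK ON ACCOUNT TO ROLE PIPELINE_ROLE;

USE ROLE PIPELINE_ROLE;

// No WAREHOUSE parameter: Snowflake manages the compute resources.
CREATE OR REPLACE TASK SERVERLESS_HEARTBEAT
  USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
  SCHEDULE = '5 MINUTE'
AS
  INSERT INTO HEARTBEAT_LOG (RUN_AT) SELECT CURRENT_TIMESTAMP();
```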
//=============================================================================
// Set context
//=============================================================================
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE ADHOC;
USE DATABASE _JT_DEMOS;
CREATE SCHEMA IF NOT EXISTS NETFLIX_STREAMS_AND_TASKS;
USE SCHEMA NETFLIX_STREAMS_AND_TASKS;
//=============================================================================
//=============================================================================
// Create tables and add show data
//=============================================================================
CREATE OR REPLACE TABLE
NETFLIX_RATING_EVENTS(
RAW_DATA VARIANT
);
CREATE OR REPLACE TABLE
NETFLIX_RATINGS(
SHOW_ID NUMBER,
SHOW_TITLE STRING,
RATING NUMBER
);
CREATE OR REPLACE TABLE
NETFLIX_SHOWS(
ID NUMBER,
TITLE STRING
);
INSERT INTO
NETFLIX_SHOWS // Shows added in no particular order
VALUES
(0, 'BoJack Horseman'), // <-- This is the best show on Netflix
(1, 'Ozark'),
(2, 'Master of None'),
(3, 'Mindhunter'),
(4, 'The Haunting of Hill House'),
(5, 'Tiger King'),
(6, 'Stranger Things');
//=============================================================================
//=============================================================================
// Create 2 streams on the NETFLIX_RATING_EVENTS table
//=============================================================================
CREATE OR REPLACE STREAM STREAM_A ON TABLE NETFLIX_RATING_EVENTS;
CREATE OR REPLACE STREAM STREAM_B ON TABLE NETFLIX_RATING_EVENTS;
SHOW STREAMS;
//=============================================================================
//=============================================================================
// Modify NETFLIX_RATING_EVENTS and examine streams
//=============================================================================
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON('{ "show_id": 0, "rating": 10 }');
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
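// Sketch (not part of the original script): besides the source columns, a stream
// exposes metadata columns that describe each change. A plain SELECT like this one
// reads the change records without advancing the stream's offset.
SELECT RAW_DATA, METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID FROM STREAM_A;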
// Consuming STREAM_A in a DML statement advances its offset.
// The VARIANT fields are cast explicitly to NUMBER.
INSERT INTO NETFLIX_RATINGS(SHOW_ID, SHOW_TITLE, RATING) (
SELECT
NETFLIX_SHOWS.ID AS SHOW_ID,
NETFLIX_SHOWS.TITLE AS SHOW_TITLE,
STREAM_A.RAW_DATA:"rating"::NUMBER AS RATING
FROM
NETFLIX_SHOWS RIGHT OUTER JOIN STREAM_A
ON NETFLIX_SHOWS.ID = STREAM_A.RAW_DATA:"show_id"::NUMBER
WHERE
STREAM_A.RAW_DATA:"rating" IS NOT NULL
);
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
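// Sketch (not part of the original script): the INSERT above used STREAM_A as the
// source of a DML statement, so STREAM_A should now be empty, while STREAM_B was
// only queried and should still hold the change record. This check makes the
// difference explicit.
SELECT
SYSTEM$STREAM_HAS_DATA('STREAM_A') AS STREAM_A_HAS_DATA,
SYSTEM$STREAM_HAS_DATA('STREAM_B') AS STREAM_B_HAS_DATA;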
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON('{ "show_id": 1, "rating": 9 }');
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON('{ "show_id": 2, "rating": 8 }');
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
INSERT INTO NETFLIX_RATINGS(SHOW_ID, SHOW_TITLE, RATING) (
SELECT
NETFLIX_SHOWS.ID AS SHOW_ID,
NETFLIX_SHOWS.TITLE AS SHOW_TITLE,
STREAM_A.RAW_DATA:"rating"::NUMBER AS RATING
FROM
NETFLIX_SHOWS RIGHT OUTER JOIN STREAM_A
ON NETFLIX_SHOWS.ID = STREAM_A.RAW_DATA:"show_id"::NUMBER
WHERE
STREAM_A.RAW_DATA:"rating" IS NOT NULL
);
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
SELECT * FROM NETFLIX_RATINGS;
DELETE FROM NETFLIX_RATING_EVENTS WHERE RAW_DATA:"rating" IS NOT NULL;
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
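// Sketch (not part of the original script): summarize the change types in each
// stream. STREAM_A had already consumed the inserts, so it should now report the
// deleted rows as DELETE records. STREAM_B was never consumed, and a standard
// stream returns the net change since its offset, so rows that were inserted and
// then deleted should cancel out.
SELECT METADATA$ACTION, COUNT(*) AS CHANGES FROM STREAM_A GROUP BY METADATA$ACTION;
SELECT METADATA$ACTION, COUNT(*) AS CHANGES FROM STREAM_B GROUP BY METADATA$ACTION;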
//=============================================================================
//=============================================================================
// Automate stream ingestion with a task
//=============================================================================
// STREAM_A still holds DELETE records from the DELETE statement above, so the task
// loads only INSERT change records; otherwise the deleted ratings would be
// inserted into NETFLIX_RATINGS a second time.
CREATE OR REPLACE TASK NETFLIX_RATINGS_EVENT_PROCESSOR
WAREHOUSE = ADHOC // Add your warehouse here
SCHEDULE = 'USING CRON * * * * * America/Chicago' // process new records every minute
WHEN
SYSTEM$STREAM_HAS_DATA('STREAM_A')
AS
INSERT INTO NETFLIX_RATINGS(SHOW_ID, SHOW_TITLE, RATING) (
SELECT
NETFLIX_SHOWS.ID AS SHOW_ID,
NETFLIX_SHOWS.TITLE AS SHOW_TITLE,
STREAM_A.RAW_DATA:"rating"::NUMBER AS RATING
FROM
NETFLIX_SHOWS RIGHT OUTER JOIN STREAM_A
ON NETFLIX_SHOWS.ID = STREAM_A.RAW_DATA:"show_id"::NUMBER
WHERE
STREAM_A.RAW_DATA:"rating" IS NOT NULL
AND STREAM_A.METADATA$ACTION = 'INSERT'
);
// Tasks are suspended by default. Resume the task so it will run on schedule
ALTER TASK NETFLIX_RATINGS_EVENT_PROCESSOR RESUME;
SELECT * FROM NETFLIX_RATING_EVENTS;
// Add new events
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON('{ "show_id": 3, "rating": 8 }');
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON('{ "show_id": 4, "rating": 8 }');
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON('{ "show_id": 5, "rating": 7 }');
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON('{ "show_id": 6, "rating": 9 }');
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON('{ "platform": 9.75 }');
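// Sketch (not part of the original script): the task runs on a one-minute
// schedule, so the new events are not loaded immediately. One way to check
// whether a run has completed is to look at the task history; runs whose WHEN
// condition was false are expected to show up with a SKIPPED state.
SELECT NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME, ERROR_MESSAGE
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'NETFLIX_RATINGS_EVENT_PROCESSOR'))
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;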
// Give the task up to a minute to run, then find the best show on Netflix
SELECT * FROM NETFLIX_RATINGS;
SELECT
SHOW_TITLE,
AVG(RATING) AS AVG_RATING
FROM
NETFLIX_RATINGS
GROUP BY
SHOW_TITLE
ORDER BY
AVG_RATING DESC
LIMIT 1;
//=============================================================================
//=============================================================================
// Cleanup
//=============================================================================
DROP TASK IF EXISTS NETFLIX_RATINGS_EVENT_PROCESSOR;
DROP TABLE IF EXISTS NETFLIX_SHOWS;
DROP TABLE IF EXISTS NETFLIX_RATING_EVENTS;
DROP TABLE IF EXISTS NETFLIX_RATINGS;
DROP STREAM IF EXISTS STREAM_A;
DROP STREAM IF EXISTS STREAM_B;
DROP SCHEMA IF EXISTS NETFLIX_STREAMS_AND_TASKS;
//=============================================================================