
7 posts tagged with "snowflake"


· 6 min read
Jeffrey Aven

DBT (or Data Build Tool) is a modern data transformation tool born in the cloud/DevOps era. It's a great project which much has been written about; I will try to give as brief an overview as possible.

ETL vs ELT Refresher

A quick refresher on ELT vs ETL before we discuss DBT. I have created an infographic for this...

ETL vs ELT

Summary

DBT is an open-source command line tool written in Python from DBT Labs (formerly Fishtown Analytics).

DBT is designed to manage data transformations while applying software engineering best practices (including version control, automated testing, reviews, approvals, etc). Its modern software engineering and cloud-first design goals separate it from its old-school ETL/ELT predecessors.

DBT is an ELT tool focusing on the T(ransform) only, the E(xtract) and L(oad) are up to you (there are plenty of tools that specialize in this).

At its core DBT is a templating engine using Jinja (Python templating engine); it generates templates that represent SQL commands to create, replace or update objects in your database (the “T” in ELT), then oversees the execution of the templated commands. The work is "pushed down" to the underlying database containing the source and target objects and data.

Models

The concept most integral to DBT is the Model. A Model is simply a representation of a transform (or set of transforms) to a dataset, resulting in a target object (which could be a table or tables in a datamart). A model is expressed as a SELECT statement stored in a .sql file in your dbt project (we'll get to that in a minute).

Suppose we want to create a denormalized fact table for commits to store in a datamart in BigQuery. This is what a model file might look like (using the BigQuery SQL dialect and referencing objects that should exist and be accessible at runtime when we execute the model).

{{ config(materialized='table') }}

with commits as (
    SELECT
        SUBSTR(commit, 0, 13) as commit_short_sha,
        committer.name as committer_name,
        committer.date as commit_date,
        message,
        repo_name
    FROM `bigquery-public-data.github_repos.sample_commits` c
)

select * from commits

Models are created as views by default, but you can materialize these as tables where needed.
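Conceptually, the materialization setting decides which DDL statement wraps the model's SELECT. The sketch below illustrates that idea in plain Python. It is not how dbt is implemented (dbt uses Jinja and adapter-specific macros); Python's string.Template is swapped in as a stand-in, and the compile_model function and the analytics.fct_commits relation name are hypothetical.

```python
from string import Template

# Stand-in templates for two materializations (dbt itself uses Jinja macros).
MATERIALIZATIONS = {
    "view": Template("CREATE OR REPLACE VIEW $relation AS (\n$select_sql\n);"),
    "table": Template("CREATE OR REPLACE TABLE $relation AS (\n$select_sql\n);"),
}


def compile_model(relation: str, select_sql: str, materialized: str = "view") -> str:
    """Wrap a model's SELECT statement in the DDL for the chosen materialization."""
    template = MATERIALIZATIONS[materialized]
    return template.substitute(relation=relation, select_sql=select_sql.strip())


if __name__ == "__main__":
    # Hypothetical relation name; the default materialization is a view.
    print(compile_model("analytics.fct_commits", "SELECT 1 AS id", "table"))
```

The point of the sketch is only that the model file holds the SELECT, while the tool decides how to turn it into an object in the database.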

You configure connectivity to the target database using adapters (software libraries provided by dbt in the case of most mainstream databases) and profiles (which contain details around authentication, databases/datasets, schemas etc).

DBT Project

A DBT project is simply a folder containing your models and some other configuration data. You can initialize this by running dbt init from your desired project directory. In its most basic form, the structure looks like this:

models/
├─ your_model.sql
├─ schema.yml
dbt_project.yml

Your models can be created under subfolders for organization. schema.yml is an optional file that contains tests for columns and can also include descriptions for documentation. The dbt_project.yml file is the main entry point for the dbt program; it contains the configuration for the project, including which profile to use. Profiles (stored in a file called profiles.yml) store all of the necessary connectivity information for your target database platform. By default, dbt init creates this file in a .dbt folder under your home directory.

info

You could store this with your project (be careful not to commit secrets like database credentials to source control). If you store it in any other directory than the default, you will need to tell dbt where it can find this file using the --profiles-dir argument of any dbt command, see here for more information.

To confirm your project is ship shape, run dbt parse; if there are no errors, you are good to proceed running and testing your models.

Running DBT Models

To run your models, simply run dbt run from the directory containing your dbt_project.yml file (typically the root of your project folder).

Model deployed! Let's test it by running dbt test.

This will run all of the tests associated with your model(s) - in this case, not null and unique tests defined in the schema.yml file. That's it, deployed and tested.

Other Stuff

There is some other stuff in DBT you should be aware of, like seeds, snapshots, analyses, macros, and more but our five minutes is up 😃. We can discuss these next time; you are up and running with the basics of DBT now, get transforming!

if you have enjoyed this post, please consider buying me a coffee ☕ to help me keep writing!

· 7 min read
Jeffrey Aven

Loading Parquet format files into BigQuery is straightforward; you just need to specify the file location (local, Google Cloud Storage, Drive, Amazon S3 or Azure Blob storage) and that's pretty much it. BigQuery works the rest out from there.

bq load \
--location=australia-southeast2 \
--project_id=parquet-demo \
--source_format=PARQUET \
parquet_test.dim_calendar \
.\Calendar.gzip

In Snowflake, however, it is not as simple; I'll share my approach to automating this here.

info

Parquet is a self-describing, column-oriented storage format commonly used in distributed systems for input and output. Data in Parquet files is serialised for optimised consumption from Parquet client libraries and packages such as pandas, pyarrow, fastparquet, dask, and pyspark.

Background

When Snowflake reads a staged Parquet file, each row of the dataset is exposed as a single column. If you were to ingest this into Snowflake without knowing the schema you could do something like this...

CREATE OR REPLACE TABLE PARQUET_TEST.PUBLIC.DIM_CALENDAR (
    Data variant
);

COPY INTO PARQUET_TEST.PUBLIC.DIM_CALENDAR
(
    Data
) FROM (
    SELECT
        *
    FROM
        @PARQUET_TEST.PUBLIC.DIM_CALENDAR_STAGE)
file_format = (TYPE = parquet);

You would end up with something like...

Row    Data
1      {"CalMonthOfYearNo": 6, "CalYear": 2020, ... }
2      {"CalMonthOfYearNo": 6, "CalYear": 2020, ... }
...    ...

You could then have a second stage of processing to convert this into a normal relational structure.

Or you could do this in one step, with a little prep work ahead of time. In my scenario I was given several parquet files from a client for a one-off load into Snowflake, several files for a fact table and multiple single files representing different dimension tables.

Streamlined Ingestion for Parquet Files into Snowflake

To collapse the formatting and uploading of Parquet files into a materialized table into one step, we need to do a couple of things:

  1. Create the target table with the correct schema (column names and data types); and
  2. perform a projection in our COPY command from the single column containing all of the data (represented by $1 in Snowflake) into columns defined in step 1

Since this is technically a transformation and only named stages are supported for COPY transformations, we need to create a stage for the copy. In my case there is a pre-existing Storage Integration in place that can be used by the stage.

Generate Table DDL

To automate the generation of the DDL to create the table and stage and the COPY command, I used Python and Spark (which has first class support for Parquet files). Parquet datatypes are largely the same as Snowflake, but if we needed to, we could create a map and modify the target types during the DDL generation.
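As a rough illustration of the type map mentioned above, the sketch below translates Spark type names (as returned by DataFrame.dtypes) into Snowflake column types before they are written into the DDL. The TYPE_MAP contents and the helper names are assumptions chosen for illustration; any type not listed is passed through unchanged, which is what the script in this post effectively does for every type.

```python
# Hypothetical Spark-to-Snowflake overrides; unlisted types pass through unchanged.
TYPE_MAP = {
    "bigint": "NUMBER(38,0)",
    "int": "NUMBER(38,0)",
    "double": "FLOAT",
    "string": "VARCHAR",
    "binary": "BINARY",
}


def to_snowflake_type(spark_type: str) -> str:
    """Return the mapped Snowflake type, or the Spark type name if no override exists."""
    return TYPE_MAP.get(spark_type.lower(), spark_type)


def column_ddl(data_types) -> str:
    """Build the column list for CREATE TABLE from (name, spark_type) pairs."""
    return ",\n".join(
        " %s %s" % (name, to_snowflake_type(dtype)) for name, dtype in data_types
    )


if __name__ == "__main__":
    print(column_ddl([("Time_ID", "bigint"), ("CalYear", "bigint")]))
```

The same lookup could be applied to the casts in the COPY projection so that the table definition and the projected types stay in step.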

First copy specimen Parquet formatted files to a local directory; the script we are creating can then iterate through the Parquet files and generate all of the commands we will need, saved to a .sql file.

With some setup information provided (not shown for brevity), we will first go through each file in the directory, capture metadata along with the schema (column name and data type) as shown here:

for file in files:
    tableMap = {}
    table = file.stem
    spark = launch_spark_session()
    parquetFile = spark.read.parquet("%s/%s" %(BASE_DIR, file))
    data_types = parquetFile.dtypes
    stop_spark_session(spark)
    tableMap['name'] = table
    tableMap['file'] = file
    tableMap['data_types'] = data_types
    allTables.append(tableMap)

The allTables list looks something like this...

[{'name': 'Calendar', 'file': PosixPath('data/dim/Calendar.gzip'), 'data_types': [('Time_ID', 'bigint'), ('CalYear', 'bigint'), ('CalMonthOfYearNo', 'bigint'), ('FinYear', 'bigint'), ('FinWeekOfYearNo', 'bigint')]}, ... ]

Next we generate the CREATE TABLE statement using the allTables list:

# create output file for all sql
with open('all_tables.sql', 'w') as f:
    for table in allTables:
        print("processing %s..." % table['name'])
        f.write("/*** Create %s Table***/" % table['name'].upper())
        sql = """
CREATE OR REPLACE TABLE %s.%s.%s (
""" % (database, schema, table['name'].upper())
        for column in table['data_types']:
            sql += " %s %s,\n" % (column[0], column[1])
        sql = sql[:-2] + "\n);"
        f.write(sql)
        f.write("\n\n")

Generate Named Stage DDL

Then we generate the stage in S3 from which the files will be loaded:

        f.write("/*** Create %s Stage***/" % table['name'].upper())
        sql = """
CREATE OR REPLACE STAGE %s.%s.%s_STAGE
url='%s/%s'
storage_integration = %s
encryption=(type='AWS_SSE_KMS' kms_key_id = '%s');
""" % (database, schema, table['name'].upper(), s3_prefix, table['file'], storage_int, kms_key_id)
        f.write(sql)
        f.write("\n\n")

Generate COPY commands

Then we generate the COPY commands...

        f.write("/*** Copying Data into %s ***/" % table['name'].upper())
        sql = """
COPY INTO %s.%s.%s
(\n""" % (database, schema, table['name'].upper())
        for column in table['data_types']:
            sql += " %s,\n" % column[0]
        sql = sql[:-2] + "\n)"
        sql += " FROM (\nSELECT\n"
        for column in table['data_types']:
            sql += " $1:%s::%s,\n" % (column[0], column[1])
        sql = sql[:-2] + "\nFROM\n"
        sql += "@%s.%s.%s_STAGE)\n" % (database, schema, table['name'].upper())
        sql += " file_format = (TYPE = parquet);"
        f.write(sql)
        f.write("\n\n")

Since this is a one-off load, we will go ahead and drop the stage we created as it is no longer needed (this step is optional):

        f.write("/*** Dropping stage for %s ***/" % table['name'].upper())
        sql = """
DROP STAGE %s.%s.%s_STAGE;
""" % (database, schema, table['name'].upper())
        f.write(sql)
        f.write("\n\n")

The resultant file looks like this:

/*** Create DIM_CALENDAR Table***/
CREATE OR REPLACE TABLE PARQUET_TEST.PUBLIC.DIM_CALENDAR (
Time_ID bigint,
CalYear bigint,
CalMonthOfYearNo bigint,
FinYear bigint,
FinWeekOfYearNo bigint
);

/*** Create DIM_CALENDAR Stage***/
CREATE OR REPLACE STAGE PARQUET_TEST.PUBLIC.DIM_CALENDAR_STAGE
url='s3://my-bucket/data/dim/Calendar.gzip'
storage_integration = my_storage_int
encryption=(type='AWS_SSE_KMS' kms_key_id = '4f715ec9-ee8e-44ab-b35d-8daf36c05f19');

/*** Copying Data into DIM_CALENDAR ***/
COPY INTO PARQUET_TEST.PUBLIC.DIM_CALENDAR
(
Time_ID,
CalYear,
CalMonthOfYearNo,
FinYear,
FinWeekOfYearNo
) FROM (
SELECT
$1:Time_ID::bigint,
$1:CalYear::bigint,
$1:CalMonthOfYearNo::bigint,
$1:FinYear::bigint,
$1:FinWeekOfYearNo::bigint
FROM
@PARQUET_TEST.PUBLIC.DIM_CALENDAR_STAGE)
file_format = (TYPE = parquet);

/*** Dropping stage for DIM_CALENDAR ***/
DROP STAGE PARQUET_TEST.PUBLIC.DIM_CALENDAR_STAGE;

Load your data

You can then run this along with all of the other dimension and fact table DDL and COPY commands generated to perform the one-off load from parquet files. You can find the complete code below, enjoy!

Complete Code
from pathlib import Path
from pyspark.sql import SparkSession


def launch_spark_session():
    return SparkSession \
        .builder \
        .appName("Parquet DDL Generation") \
        .getOrCreate()


def stop_spark_session(spark):
    spark.stop()


allTables = []
database = "PARQUET_TEST"
schema = "PUBLIC"
s3_prefix = 's3://my-bucket'
storage_int = 'my_storage_int'
kms_key_id = '4f715ec9-ee8e-44ab-b35d-8daf36c05f19'

BASE_DIR = Path(__file__).resolve().parent
directory = 'data/dim'
files = Path(directory).glob('*.gzip')

# capture the name, path and schema of each parquet file
for file in files:
    tableMap = {}
    table = file.stem
    spark = launch_spark_session()
    parquetFile = spark.read.parquet("%s/%s" %(BASE_DIR, file))
    data_types = parquetFile.dtypes
    stop_spark_session(spark)
    tableMap['name'] = table
    tableMap['file'] = file
    tableMap['data_types'] = data_types
    allTables.append(tableMap)

# create output file for all sql
with open('all_tables.sql', 'w') as f:
    for table in allTables:
        print("processing %s..." % table['name'])
        f.write("/*** Create %s Table***/" % table['name'].upper())
        sql = """
CREATE OR REPLACE TABLE %s.%s.%s (
""" % (database, schema, table['name'].upper())
        for column in table['data_types']:
            sql += " %s %s,\n" % (column[0], column[1])
        sql = sql[:-2] + "\n);"
        f.write(sql)
        f.write("\n\n")

        f.write("/*** Create %s Stage***/" % table['name'].upper())
        sql = """
CREATE OR REPLACE STAGE %s.%s.%s_STAGE
url='%s/%s'
storage_integration = %s
encryption=(type='AWS_SSE_KMS' kms_key_id = '%s');
""" % (database, schema, table['name'].upper(), s3_prefix, table['file'], storage_int, kms_key_id)
        f.write(sql)
        f.write("\n\n")

        f.write("/*** Copying Data into %s ***/" % table['name'].upper())
        sql = """
COPY INTO %s.%s.%s
(\n""" % (database, schema, table['name'].upper())
        for column in table['data_types']:
            sql += " %s,\n" % column[0]
        sql = sql[:-2] + "\n)"
        sql += " FROM (\nSELECT\n"
        for column in table['data_types']:
            sql += " $1:%s::%s,\n" % (column[0], column[1])
        sql = sql[:-2] + "\nFROM\n"
        sql += "@%s.%s.%s_STAGE)\n" % (database, schema, table['name'].upper())
        sql += " file_format = (TYPE = parquet);"
        f.write(sql)
        f.write("\n\n")

        f.write("/*** Dropping stage for %s ***/" % table['name'].upper())
        sql = """
DROP STAGE %s.%s.%s_STAGE;
""" % (database, schema, table['name'].upper())
        f.write(sql)
        f.write("\n\n")


· 3 min read
Yuncheng Yang

This article demonstrates how to use the Snowflake REST API to retrieve data for a web application using TypeScript. In this case we are using key pair authentication with Snowflake.

Overview

Snowflake’s SQL API allows you to access Snowflake objects using SQL via a REST API request, which allows for easy integration with your applications and deployment pipelines. You can use the API to execute most DDL and DML statements.

There are some limitations you need to be aware of, however. For example, interactions with stages (using PUT and GET) and stored procedure operations (using CALL) aren’t supported via the Snowflake API; you can read more on this here.

Endpoints

There are three endpoints provided:

  • /api/v2/statements/
  • /api/v2/statements/<statementHandle>
  • /api/v2/statements/<statementHandle>/cancel

We will be looking at the first two in this article.

Authentication Methods

There are two authentication methods for the API: OAuth and key pair. For OAuth, the X-Snowflake-Authorization-Token-Type header is optional; if this header is not present, Snowflake assumes that the token in the Authorization header is an OAuth token. For key pair authentication, the JWT is sent in the Authorization header as Bearer <your token>.

Let’s walk through how to generate and use the JWT.

Generating the JWT

Here's what's needed:

Snowflake JWT

the Code
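As a minimal sketch of what goes into the token, written in Python using only the standard library, the snippet below builds the JWT claims for key pair authentication. It assumes the claim layout described in Snowflake's key pair documentation (an issuer made of the upper-cased account and user plus the SHA-256 fingerprint of the public key, and a lifetime of under an hour). The function names are hypothetical, and signing the claims with RS256 is deliberately left out because it needs a third-party library (PyJWT is one option).

```python
import base64
import hashlib
import time


def public_key_fingerprint(public_key_pem: str) -> str:
    """Return 'SHA256:<base64 digest>' of the DER-encoded public key."""
    lines = [line.strip() for line in public_key_pem.strip().splitlines()]
    body = "".join(line for line in lines if line and not line.startswith("-----"))
    der = base64.b64decode(body)
    digest = hashlib.sha256(der).digest()
    return "SHA256:" + base64.b64encode(digest).decode("ascii")


def build_jwt_claims(account, user, public_key_pem, now=None, lifetime_seconds=3540):
    """Build the claims dictionary; the caller signs it with the private key (RS256)."""
    now = int(time.time()) if now is None else int(now)
    # Assumption: account is the account identifier without any region suffix.
    qualified_user = "%s.%s" % (account.upper(), user.upper())
    return {
        "iss": "%s.%s" % (qualified_user, public_key_fingerprint(public_key_pem)),
        "sub": qualified_user,
        "iat": now,
        "exp": now + lifetime_seconds,
    }
```

The fingerprint in the issuer must match the public key registered against the user in Snowflake, otherwise the token is rejected.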

Request Body

Now we need a request body:
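A request body along these lines can be sketched in Python as follows. The statement, timeout, database, schema, warehouse and role fields are part of the SQL API request format; the helper name and the example values are hypothetical.

```python
import json


def build_statement_body(statement, database=None, schema=None,
                         warehouse=None, role=None, timeout=60):
    """Serialise a SQL API request body, leaving out any context that is not set."""
    body = {"statement": statement, "timeout": timeout}
    optional = {
        "database": database,
        "schema": schema,
        "warehouse": warehouse,
        "role": role,
    }
    body.update({key: value for key, value in optional.items() if value is not None})
    return json.dumps(body)


if __name__ == "__main__":
    # Hypothetical object names, for illustration only.
    print(build_statement_body("SELECT * FROM MY_TABLE LIMIT 10",
                               database="DEMO_DB", schema="PUBLIC",
                               warehouse="COMPUTE_WH"))
```

Omitted context fields fall back to the defaults configured for the user in Snowflake.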

Submitting the Request

The request URL must include your account identifier, along with the region where that forms part of the identifier (e.g. xy12345.us-east2.aws.snowflakecomputing.com).
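As a sketch of the submission step in Python (standard library only), the snippet below assembles the URL and headers and posts the body. The X-Snowflake-Authorization-Token-Type value KEYPAIR_JWT is the one Snowflake documents for key pair tokens; the function names are hypothetical, and submit_statement is shown for shape only since it needs a real account and token to run.

```python
import json
import urllib.request


def build_request(account_identifier, jwt_token):
    """Return the statements endpoint URL and the headers for key pair authentication."""
    url = "https://%s.snowflakecomputing.com/api/v2/statements" % account_identifier
    headers = {
        "Authorization": "Bearer %s" % jwt_token,
        "X-Snowflake-Authorization-Token-Type": "KEYPAIR_JWT",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    return url, headers


def submit_statement(account_identifier, jwt_token, body_json):
    """POST a JSON request body and return the decoded response."""
    url, headers = build_request(account_identifier, jwt_token)
    request = urllib.request.Request(
        url, data=body_json.encode("utf-8"), headers=headers, method="POST"
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())
```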

Response Handling

When making a SELECT query, there are three things worth noting:

  1. rowType fields in the resultSetMetaData represent the columns
  2. data without column names is in the format of string[][]
  3. partitionInfo is an array of objects representing different partitions

For more information see Handling Responses from the SQL API - Snowflake Documentation.

Parsing data

Here is a TypeScript code snippet demonstrating parsing return data:
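The parsing logic itself is small. Sketched here in Python rather than TypeScript, and assuming the response shape described above (column metadata under resultSetMetaData.rowType and rows as lists of strings under data), it pairs each value with its column name. The function name and the sample response are hypothetical.

```python
def rows_to_dicts(response):
    """Pair each row's values with the column names from the result set metadata."""
    columns = [column["name"] for column in response["resultSetMetaData"]["rowType"]]
    return [dict(zip(columns, row)) for row in response["data"]]


if __name__ == "__main__":
    # Hypothetical response fragment, trimmed to the fields used here.
    sample = {
        "resultSetMetaData": {"rowType": [{"name": "ID"}, {"name": "NAME"}]},
        "data": [["1", "alpha"], ["2", "beta"]],
    }
    print(rows_to_dicts(sample))
```

Values arrive as strings, so any numeric or date conversion would be a further step driven by the type information in rowType.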

Handling multiple partitions

Large result sets are paginated into partitions; each partition is a set of rows.

note

Note that the pages (referred to as partitions) are NOT based on row count. Instead, they are based on the compressed batch size, so they will not be uniform in terms of the number of rows.

To get a partition, send a GET request to the URL https://<accountIdentifier>.snowflakecomputing.com/api/v2/statements/<statementHandle>?partition=<partitionId>.
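As a sketch in Python, the remaining partitions can be enumerated from the partitionInfo array of the first response. This assumes partition 0 is the one already returned with the initial response, so only partitions 1 and up need fetching; the function name and sample values are hypothetical.

```python
def partition_urls(account_identifier, statement_handle, response):
    """Build the GET URLs for every partition after the first one."""
    base = "https://%s.snowflakecomputing.com/api/v2/statements/%s" % (
        account_identifier,
        statement_handle,
    )
    partition_count = len(response["resultSetMetaData"]["partitionInfo"])
    # Partition 0 arrived with the initial response, so start from 1.
    return ["%s?partition=%d" % (base, index) for index in range(1, partition_count)]


if __name__ == "__main__":
    sample = {"resultSetMetaData": {"partitionInfo": [{}, {}, {}]}}
    for url in partition_urls("xy12345", "hypothetical-handle", sample):
        print(url)
```

Each URL is requested with the same Authorization headers as the original statement.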

Thanks!

· 4 min read
Jeffrey Aven

Structured Streaming in Spark provides a powerful framework for stream processing and analysis, such as streaming transformations, stateful streaming or sliding window operations.

Kafka is a common streaming source and sink for Spark Streaming and Structured Streaming operations. However, there may be situations where a data warehouse (such as Snowflake) is a more appropriate target for streaming operations, especially where there is a reporting or long-term storage requirement on the data derived from the streaming source.

This article will demonstrate just how easy this is to implement using Python.

Design

The following diagram illustrates the ingestion design for this example:

Spark Structured Streaming using Kafka and Snowflake

Snowflake Setup

Some prerequisites for Snowflake:

  1. You will need to create a user (or use an existing user); in either case, the user will need to be identified by a private key. Generate a key pair as follows:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Copy the contents of the rsa_key.pub file, remove the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- strings, then remove the line breaks to form one string. Use this string as the RSA_PUBLIC_KEY in a CREATE USER or ALTER USER statement in Snowflake, like:

ALTER USER youruser SET RSA_PUBLIC_KEY='MIIBI...';
  2. Now set up the target database, schema and table you will use to write out your stream data (the schema for the table must match the schema for the Data Stream you will use the DataStreamWriter to emit records to Snowflake).

The user you will be using (that you set up the key pair authentication for) will need to be assigned a default role to which the appropriate write permissions are granted on the target objects in Snowflake. You will also need to designate a virtual warehouse (which your user must have USAGE permissions to).

The Code

Now that we have the objects and user setup in Snowflake, we can construct our Spark application.

First, you will need to start your Spark session (either using pyspark or spark-submit) including the packages that Spark will need to connect to Kafka and to Snowflake.

The Snowflake packages include a JDBC driver and the Snowflake Connector for Spark, see Snowflake Connector for Spark.

An example is shown here (package versions may vary depending upon the version of Spark you are using):

pyspark \
--packages \
net.snowflake:snowflake-jdbc:3.13.14,\
net.snowflake:spark-snowflake_2.12:2.10.0-spark_3.1,\
org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1

Now that we have a Spark session with the necessary packages, let's go...

# import any required functions, set the checkpoint directory, and log level (optional)
from pyspark.sql.functions import split
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.streaming.checkpointLocation", "file:///tmp")

setup connection options for Snowflake by creating an sfOptions dictionary

sfOptions = {
"sfURL" : sfUrl,
"sfUser" : "avensolutions",
"pem_private_key": private_key,
"sfDatabase" : "SPARK_SNOWFLAKE_DEMO",
"sfSchema" : "PUBLIC",
"sfWarehouse" : "COMPUTE_WH",
"streaming_stage" : "mystage"
}
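The sfOptions dictionary references two variables, sfUrl and private_key, that are not defined in the snippet. One way to populate them is sketched below, assuming the unencrypted PKCS#8 key generated earlier (rsa_key.p8) and assuming the connector expects the key body as a single string without the BEGIN/END lines. The pem_body helper and the placeholder account identifier are hypothetical.

```python
import re


def pem_body(pem_text: str) -> str:
    """Strip the BEGIN/END marker lines and all whitespace from a PEM document."""
    without_markers = re.sub(r"-----(BEGIN|END) [A-Z ]+-----", "", pem_text)
    return "".join(without_markers.split())


if __name__ == "__main__":
    # Assumes rsa_key.p8 is in the working directory and is not passphrase protected.
    with open("rsa_key.p8") as key_file:
        private_key = pem_body(key_file.read())
    # Placeholder: substitute your own account identifier.
    sfUrl = "<account_identifier>.snowflakecomputing.com"
```

Reading the key from a file (or a secrets manager) keeps it out of the application source.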

set a variable for the Snowflake Spark connector

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

read messages from Kafka:

lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafkabroker:9092") \
.option("subscribe", "weblogs") \
.load()

perform necessary transformations (the fields and data types in the resultant data structure must match the target table you created in Snowflake):

log_recs = lines.select(
split(lines.value.cast("string"), " ").alias("data")
)

log_data = log_recs.selectExpr(
"CAST(data[0] as string) as date",
"CAST(data[1] as string) as time",
"CAST(data[2] as string) as c_ip",
"CAST(data[3] as string) as cs_username",
"CAST(data[4] as string) as s_sitename",
"CAST(data[5] as string) as s_computername",
"CAST(data[6] as string) as s_ip",
"CAST(data[7] as int) as s_port",
"CAST(data[8] as string) as cs_method",
"CAST(data[9] as string) as cs_uri_stem",
"CAST(data[10] as string) as cs_uri_query",
"CAST(data[11] as int) as sc_status",
"CAST(data[12] as int) as time_taken",
"CAST(data[13] as string) as cs_version",
"CAST(data[14] as string) as cs_host",
"CAST(data[15] as string) as User_Agent",
"CAST(data[16] as string) as Referer",
)
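To see what this projection does to a single record, here is a plain Python sketch (no Spark required) that applies the same split and casts to one log line. The sample line is made up, and returning None for a missing or unparseable field is meant to mirror Spark's NULL behaviour with ANSI mode off, which is an assumption about your Spark configuration.

```python
# Field names and target types, in the same order as the selectExpr above.
FIELDS = [
    ("date", str), ("time", str), ("c_ip", str), ("cs_username", str),
    ("s_sitename", str), ("s_computername", str), ("s_ip", str), ("s_port", int),
    ("cs_method", str), ("cs_uri_stem", str), ("cs_uri_query", str),
    ("sc_status", int), ("time_taken", int), ("cs_version", str),
    ("cs_host", str), ("User_Agent", str), ("Referer", str),
]


def parse_log_line(line: str) -> dict:
    """Split a space-delimited log line and cast each field to its target type."""
    parts = line.split(" ")
    record = {}
    for index, (name, cast) in enumerate(FIELDS):
        raw = parts[index] if index < len(parts) else None
        try:
            record[name] = cast(raw) if raw is not None else None
        except ValueError:
            record[name] = None  # unparseable value becomes NULL
    return record


if __name__ == "__main__":
    sample = ("2022-01-01 00:00:01 10.0.0.1 - W3SVC1 WEB01 10.0.0.2 80 GET "
              "/index.html - 200 15 HTTP/1.1 example.com Mozilla/5.0 -")
    print(parse_log_line(sample))
```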

write to Snowflake!

query = log_data\
.writeStream\
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "WEB_LOGS") \
.trigger(processingTime='30 seconds') \
.start()

query.awaitTermination()
info

Note that I have included the processingTime trigger of 30 seconds (this is akin to the batchInterval in the DStream API). You should tune this to balance the size of the batches ingested into Snowflake (which will benefit from larger batches) against latency.

The Results

Spark Structured Streaming into Snowflake

Enjoy!


· 7 min read
Jeffrey Aven

When you want the SFTP service without the SFTP Server.

In implementing data platforms with external data providers, it is common to use a managed file transfer platform or an SFTP gateway as an entry point for providers to supply data to your system.

Often in past implementations this would involve deploying a server (typically a Linux VM) and provisioning and configuring an SFTP service. If you wanted the data sent by clients to be copied to another storage medium (such as S3 or EFS) you would need to roll your own code or subscribe to a marketplace offering to do so.

I recently trialled the AWS Transfer Family SFTP gateway offering from AWS and am sharing my adventures here.

Architecture

In this reference architecture, we are deploying an SFTP service which uses a path in an S3 bucket as a user’s home directory. Objects in the bucket are encrypted with a customer managed KMS key. The SFTP server front end address is mapped to a vanity URL using Route53. The bucket and path are integrated with a STORAGE INTEGRATION, STAGE and PIPE definition in Snowflake. The Snowflake bits are covered in more detail in this blog: Automating Snowflake Role Based Storage Integration for AWS. This article just details the AWS Transfer Family SFTP setup.

AWS Transfer SFTP Architecture

Setup

The steps to set up this pattern are detailed below.

info

This example uses the Jsonnet/CloudFormation pattern described in this article: Simplifying Large CloudFormation Templates using Jsonnet. This is a useful pattern for breaking up a monolithic CloudFormation template at design time into more manageable resource scoped documents, then pre-processing these in a CI routine (GitLab CI, GitHub Actions, etc) to create a complete template.

Set up the Service

To set up the SFTP transfer service use the AWS::Transfer::Server resource type as shown below:

note

Use the tags shown to display the custom hostname (used as a vanity url) in the Transfer UI in the AWS console.

Create the S3 Bucket

Create a bucket which will be used to store incoming files sent via SFTP.

note

This example logs to a logging bucket, not shown for brevity.

Create a Customer Managed KMS Key

Create a customer managed KMS key which will be used to encrypt data stored in the S3 bucket created in the previous step.

Create an IAM role to access the bucket

Create an IAM role which will be assumed by the AWS Transfer Service to read and write to the S3 staging bucket.

info

You must assign permissions to use the KMS key created previously; failure to do so will result in errors such as:

remote readdir(): Permission denied

User Directory Mappings

An SFTP user's home directory is mapped to a path in your S3 bucket. It is recommended to use the LOGICAL HomeDirectoryType. This will prevent SFTP users from:

  • seeing or being able to access other users' home directories
  • seeing the bucket name or paths in the bucket above their home directory

There are some trade-offs for this which can make deployment a little more challenging, but we will cover off the steps from here.

Create a Scoped Down Policy

A "scoped down" policy prevents users from seeing or accessing objects in other users' home directories. This is a text file that will be sourced as a string into the Policy parameter of each SFTP user you create.

info

Using the LOGICAL HomeDirectoryType you don't have access to variables which represent the bucket, so this needs to be hard coded in the policy.txt document.

Also if you are using a customer managed KMS key to encrypt the data in the bucket (which you should be), you need to add permissions to the key - which again cannot be represented by a variable.

Failure to do so will result in errors when trying to ls, put, etc into the user's home directory such as:

Couldn't read directory: Permission denied
Couldn't close file: Permission denied

Since these properties are unlikely to change for the lifetime of your service this should not be an issue.
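As a rough sketch of what such a policy document might contain, the Python snippet below generates one with the bucket and KMS key hard coded, as described above. It assumes each user's files live under a prefix named after the user, and that the ${transfer:UserName} policy variable remains available with LOGICAL mappings; the function name, statement IDs and the example ARNs are hypothetical, and the action list should be trimmed to what your users actually need.

```python
import json


def scoped_down_policy(bucket_name: str, kms_key_arn: str) -> str:
    """Return a scoped down policy as JSON, with the bucket and key hard coded."""
    bucket_arn = "arn:aws:s3:::%s" % bucket_name
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowListingOfUserFolder",
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": bucket_arn,
                "Condition": {
                    "StringLike": {
                        "s3:prefix": ["${transfer:UserName}/*", "${transfer:UserName}"]
                    }
                },
            },
            {
                "Sid": "HomeDirObjectAccess",
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
                "Resource": "%s/${transfer:UserName}/*" % bucket_arn,
            },
            {
                "Sid": "UseKmsKey",
                "Effect": "Allow",
                "Action": ["kms:Decrypt", "kms:Encrypt", "kms:GenerateDataKey"],
                "Resource": kms_key_arn,
            },
        ],
    }
    return json.dumps(policy, indent=2)


if __name__ == "__main__":
    # Hypothetical bucket name and key ARN.
    print(scoped_down_policy(
        "my-sftp-bucket",
        "arn:aws:kms:ap-southeast-2:111111111111:key/example-key-id",
    ))
```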

Create a user

Users are identified by a username and an SSH key, providing the public key to the server. A sample user is shown here:

tip

As discussed previously, it is recommended to use LOGICAL home directory mappings, which prevents users from seeing information about the bucket or other directories on the SFTP server (including other users' directories).
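A user definition with a LOGICAL mapping can be sketched as a CloudFormation resource built in Python (the post itself uses Jsonnet for this). The property names are those of the AWS::Transfer::User resource type; the helper name, the one-folder-per-user layout and the example values are assumptions.

```python
import json


def transfer_user_resource(server_id, user_name, role_arn, bucket_name,
                           ssh_public_key, policy_json):
    """Build an AWS::Transfer::User resource that maps '/' to the user's S3 prefix."""
    return {
        "Type": "AWS::Transfer::User",
        "Properties": {
            "ServerId": server_id,
            "UserName": user_name,
            "Role": role_arn,
            "HomeDirectoryType": "LOGICAL",
            # The user sees "/" while the service reads and writes /<bucket>/<user>.
            "HomeDirectoryMappings": [
                {"Entry": "/", "Target": "/%s/%s" % (bucket_name, user_name)}
            ],
            "Policy": policy_json,
            "SshPublicKeys": [ssh_public_key],
        },
    }


if __name__ == "__main__":
    # Hypothetical identifiers, for illustration only.
    resource = transfer_user_resource(
        "s-0123456789abcdef0", "jeffrey_aven",
        "arn:aws:iam::111111111111:role/sftp-access-role",
        "my-sftp-bucket", "ssh-rsa AAAA... user@host", "{}",
    )
    print(json.dumps(resource, indent=2))
```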

Create a Route 53 CNAME record

Ideally you want to use a vanity url for users to access your SFTP service, such as sftp.yourcompany.com. This can be accomplished by using a Route 53 CNAME record as shown here:

Create some shared Tags

You would have noticed a shared Tags definition in many of the libsonnet files shown, an example Tags source file is shown here:

Pull it all together!

Now that we have all of the input files, let's pull them all together in a jsonnet file, which will be preprocessed in a CI process to create a template we can deploy with AWS CloudFormation.

Your customers would now connect to your service using their private key, which corresponds to the public key they supplied to you in one of the previous steps, for example:

sftp -i mysftpkey jeffrey_aven@sftp.yourdomain.com

Add more users and enjoy!
