
5 posts tagged with "snowflake"


· 3 min read
Yuncheng Yang

This article demonstrates how to use the Snowflake SQL REST API to retrieve data for a web application using TypeScript. In this case we are using key pair authentication with Snowflake.

Overview

Snowflake’s SQL API allows you to access Snowflake objects using SQL via a REST API request, which allows for easy integration with your applications and deployment pipelines. You can use the API to execute most DDL and DML statements.

There are some limitations to be aware of, however; for example, interactions with stages (PUT and GET are not supported via the SQL API) and stored procedure operations (using CALL). You can read more on this here.

Endpoints

There are three endpoints provided:

  • /api/v2/statements/
  • /api/v2/statements/<statementHandle>
  • /api/v2/statements/<statementHandle>/cancel

We will be looking at the first two in this article.

Authentication Methods

There are two authentication methods for the API: OAuth and key pair. For the OAuth method, you can choose to use the X-Snowflake-Authorization-Token-Type header; if this header is not present, Snowflake assumes that the token in the Authorization header is an OAuth token. For the key pair method, the JWT is passed in the Authorization header as Bearer <your token>, with the X-Snowflake-Authorization-Token-Type header set to KEYPAIR_JWT.

Let’s walk through how to generate and use the JWT.

Generating the JWT

Here's what's needed:

Snowflake JWT

The Code
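
The original snippet isn't reproduced here, but a minimal sketch of generating the JWT in TypeScript (using the jsonwebtoken package and Node's crypto module; the account and user values below are placeholders) looks something like this:

import { createHash, createPublicKey } from "crypto";
import { readFileSync } from "fs";
import * as jwt from "jsonwebtoken";

// placeholders: your account identifier and user name (upper-cased),
// and the unencrypted private key generated for the user
const ACCOUNT = "XY12345";
const USER = "MYUSER";
const privateKey = readFileSync("rsa_key.p8", "utf8");

// the public key fingerprint is the SHA-256 hash of the DER-encoded public key
const publicKeyDer = createPublicKey(privateKey).export({ type: "spki", format: "der" });
const fingerprint = "SHA256:" + createHash("sha256").update(publicKeyDer).digest("base64");

const now = Math.floor(Date.now() / 1000);

// sign the JWT with the private key (RS256), valid for one hour
const token = jwt.sign(
  {
    iss: `${ACCOUNT}.${USER}.${fingerprint}`,
    sub: `${ACCOUNT}.${USER}`,
    iat: now,
    exp: now + 60 * 60,
  },
  privateKey,
  { algorithm: "RS256" }
);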

Request Body

Now we need a request body:
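
The original request body isn't shown above; as a sketch, the SQL API expects a JSON payload containing the statement to execute plus optional context such as the warehouse, database, schema, role and a timeout (the object and warehouse names below are placeholders):

const requestBody = {
  statement: "SELECT * FROM MY_DB.PUBLIC.MY_TABLE LIMIT 100",
  timeout: 60, // seconds
  database: "MY_DB",
  schema: "PUBLIC",
  warehouse: "COMPUTE_WH",
  role: "MY_ROLE",
};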

Submitting the Request

We will need to include the region and account identifier in the request URL; for instance, if your account identifier includes a region, the host will look like xy12345.us-east-2.aws.snowflakecomputing.com.
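
A sketch of submitting the statement (assuming the token and requestBody from the previous snippets, an ES module context for top-level await, and a placeholder account hostname):

const baseUrl = "https://xy12345.us-east-2.aws.snowflakecomputing.com";

const response = await fetch(`${baseUrl}/api/v2/statements`, {
  method: "POST",
  headers: {
    Authorization: `Bearer ${token}`,
    "X-Snowflake-Authorization-Token-Type": "KEYPAIR_JWT",
    "Content-Type": "application/json",
    Accept: "application/json",
  },
  body: JSON.stringify(requestBody),
});

if (!response.ok) {
  throw new Error(`SQL API request failed: ${response.status}`);
}

const result = await response.json();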

Response Handling

When making a SELECT query, there are three things worth noting:

  1. rowType fields in the resultSetMetaData represent the columns
  2. data without column names is in the format of string[][]
  3. partitionInfo is an array of objects representing the different partitions

For more information see Handling Responses from the SQL API - Snowflake Documentation.

Parsing data

Here is a TypeScript code snippet demonstrating parsing the returned data:
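
The original snippet isn't reproduced here; a minimal sketch (with the response shape simplified to just the fields used) pairs the rowType column names with each row of data:

// simplified response shape; see the SQL API docs for the full schema
interface SqlApiResponse {
  resultSetMetaData: {
    numRows: number;
    rowType: { name: string; type: string }[];
    partitionInfo: { rowCount: number; uncompressedSize: number }[];
  };
  data: string[][];
}

// zip each row of data with the column names from rowType
function parseRows(response: SqlApiResponse): Record<string, string>[] {
  const columns = response.resultSetMetaData.rowType.map((col) => col.name);
  return response.data.map((row) =>
    Object.fromEntries(row.map((value, i) => [columns[i], value]))
  );
}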

Handling multiple partitions

Large result sets are paginated into partitions; each partition is a set of rows.

note

Note that the pages (referred to as partitions) are NOT based on row count; instead, they are based on the compressed batch size, so they will not be uniform in terms of the number of rows.

To get a partition, send a GET request to https://<accountIdentifier>.snowflakecomputing.com/api/v2/statements/<statementHandle>?partition=<partitionId>.
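
For example, a sketch that walks the remaining partitions (reusing baseUrl and token from the earlier snippets; partition 0 is already included in the initial response):

const statementHandle = result.statementHandle;
const partitionCount = result.resultSetMetaData.partitionInfo.length;

for (let partition = 1; partition < partitionCount; partition++) {
  const partitionResponse = await fetch(
    `${baseUrl}/api/v2/statements/${statementHandle}?partition=${partition}`,
    {
      headers: {
        Authorization: `Bearer ${token}`,
        "X-Snowflake-Authorization-Token-Type": "KEYPAIR_JWT",
        Accept: "application/json",
      },
    }
  );
  const partitionData = await partitionResponse.json();
  // partitionData.data is another string[][] block of rows to parse as above
}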

Thanks!

· 4 min read
Jeffrey Aven

Structured Streaming in Spark provides a powerful framework for stream processing and analysis, supporting streaming transformations, stateful streaming and sliding window operations.

Kafka is a common streaming source and sink for Spark Streaming and Structured Streaming operations. However, there may be situations where a data warehouse (such as Snowflake) is a more appropriate target for streaming operations, especially where there is a reporting or long-term storage requirement on the data derived from the streaming source.

This article will demonstrate just how easy this is to implement using Python.

Design

The following diagram illustrates the ingestion design for this example:

Spark Structured Streaming using Kafka and Snowflake

Snowflake Setup

Some prerequisites for Snowflake:

  1. You will need to create a user (or use an existing user); in either case, the user will need to be identified by a private key. You will need to generate a key pair as follows:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Copy the contents of the rsa_key.pub file, remove the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- lines, then remove the line breaks to form one string. Use this string as the RSA_PUBLIC_KEY in a CREATE USER or ALTER USER statement in Snowflake, like:

ALTER USER youruser SET RSA_PUBLIC_KEY='MIIBI...';
  2. Now set up the target database, schema and table you will use to write out your stream data (the schema for the table must match the schema of the data stream you will emit to Snowflake using the DataStreamWriter); a sample table definition sketch follows below.

The user you will be using (the one you set up key pair authentication for) will need to be assigned a default role which has the appropriate write permissions granted on the target objects in Snowflake. You will also need to designate a virtual warehouse (which your user must have USAGE permissions on).
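
As a sketch (not the author's original DDL), a target table whose columns mirror the projection used later in the Spark code could be created as follows; the database, schema and table names are assumed to match the sfOptions and dbtable values used below:

CREATE OR REPLACE TABLE SPARK_SNOWFLAKE_DEMO.PUBLIC.WEB_LOGS (
  date           STRING,
  time           STRING,
  c_ip           STRING,
  cs_username    STRING,
  s_sitename     STRING,
  s_computername STRING,
  s_ip           STRING,
  s_port         INTEGER,
  cs_method      STRING,
  cs_uri_stem    STRING,
  cs_uri_query   STRING,
  sc_status      INTEGER,
  time_taken     INTEGER,
  cs_version     STRING,
  cs_host        STRING,
  user_agent     STRING,
  referer        STRING
);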

The Code

Now that we have the objects and user setup in Snowflake, we can construct our Spark application.

First, you will need to start your Spark session (either using pyspark or spark-submit) including the packages that Spark will need to connect to Kafka and to Snowflake.

The Snowflake packages include a JDBC driver and the Snowflake Connector for Spark, see Snowflake Connector for Spark.

An example is shown here (package versions may vary depending upon the version of Spark you are using):

pyspark \
--packages \
net.snowflake:snowflake-jdbc:3.13.14,\
net.snowflake:spark-snowflake_2.12:2.10.0-spark_3.1,\
org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1

Now that we have a Spark session with the necessary packages, let's go...

# import any required functions, set the checkpoint directory, and log level (optional)
from pyspark.sql.functions import split
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.streaming.checkpointLocation", "file:///tmp")

set up connection options for Snowflake by creating an sfOptions dictionary:

# sfUrl and private_key are assumed to have been set earlier in the script
# (the Snowflake account URL and the private key created for your user)
sfOptions = {
    "sfURL": sfUrl,
    "sfUser": "avensolutions",
    "pem_private_key": private_key,
    "sfDatabase": "SPARK_SNOWFLAKE_DEMO",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "COMPUTE_WH",
    "streaming_stage": "mystage"
}

set a variable for the Snowflake Spark connector

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

read messages from Kafka:

lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafkabroker:9092") \
.option("subscribe", "weblogs") \
.load()

perform necessary transformations (the fields and data types in the resultant data structure must match the target table you created in Snowflake):

log_recs = lines.select(
split(lines.value.cast("string"), " ").alias("data")
)

log_data = log_recs.selectExpr(
"CAST(data[0] as string) as date",
"CAST(data[1] as string) as time",
"CAST(data[2] as string) as c_ip",
"CAST(data[3] as string) as cs_username",
"CAST(data[4] as string) as s_sitename",
"CAST(data[5] as string) as s_computername",
"CAST(data[6] as string) as s_ip",
"CAST(data[7] as int) as s_port",
"CAST(data[8] as string) as cs_method",
"CAST(data[9] as string) as cs_uri_stem",
"CAST(data[10] as string) as cs_uri_query",
"CAST(data[11] as int) as sc_status",
"CAST(data[12] as int) as time_taken",
"CAST(data[13] as string) as cs_version",
"CAST(data[14] as string) as cs_host",
"CAST(data[15] as string) as User_Agent",
"CAST(data[16] as string) as Referer",
)

write to Snowflake!

query = log_data\
.writeStream\
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "WEB_LOGS") \
.trigger(processingTime='30 seconds') \
.start()

query.awaitTermination()
info

Note that I have included a processingTime trigger of 30 seconds (this is akin to the batchInterval in the DStream API); you should tune this to strike a balance between the batch sizes ingested into Snowflake (which benefit from being larger) and latency.

The Results

Spark Structured Streaming into Snowflake

Enjoy!

· 6 min read
Jeffrey Aven

When you want the SFTP service without the SFTP Server.

In implementing data platforms with external data providers, it is common to use a managed file transfer platform or an SFTP gateway as an entry point for providers to supply data to your system.

Often in past implementations this would involve deploying a server (typically a Linux VM) and provisioning and configuring an SFTP service. If you wanted the data sent by clients to be copied to another storage medium (such as S3 or EFS) you would need to roll your own code or subscribe to a marketplace offering to do so.

I recently trialled the AWS Transfer Family SFTP gateway offering and am sharing my adventures here.

Architecture

In this reference architecture, we are deploying an SFTP service which uses a path in an S3 bucket as a user’s home directory. Objects in the bucket are encrypted with a customer managed KMS key. The SFTP server front end address is mapped to a vanity URL using Route53. The bucket and path are integrated with a STORAGE INTEGRATION, STAGE and PIPE definition in Snowflake. The Snowflake bits are covered in more detail in this blog: Automating Snowflake Role Based Storage Integration for AWS. This article just details the AWS Transfer Family SFTP setup.

AWS Transfer SFTP Architecture

Setup

The steps to set up this pattern are detailed below.

info

This example uses the Jsonnet/CloudFormation pattern described in this article: Simplifying Large CloudFormation Templates using Jsonnet. This is a useful pattern for breaking up a monolithic CloudFormation template at design time to more manageable resource scoped documents, then pre-processing these in a CI routine (GitLab CI, GitHub Actions, etc) to create a complete template.

Setup the Service

To set up the SFTP transfer service, use the AWS::Transfer::Server resource type as shown below:

note

Use the tags shown to display the custom hostname (used as a vanity URL) in the Transfer UI in the AWS console.

Create the S3 Bucket

Create a bucket which will be used to store incoming files sent via SFTP.

note

This example logs to a logging bucket, not shown for brevity.

Create a Customer Managed KMS Key

Create a customer managed KMS key which will be used to encrypt data stored in the S3 bucket created in the previous step.

Create an IAM role to access the bucket

Create an IAM role which will be assumed by the AWS Transfer Service to read and write to the S3 staging bucket.

important

You must assign permissions to use the KMS key created previously; failure to do so will result in errors such as:

remote readdir(): Permission denied

User Directory Mappings

An SFTP user's home directory is mapped to a path in your S3 bucket. It is recommended to use the LOGICAL HomeDirectoryType. This will prevent SFTP users from:

  • seeing or being able to access other users' home directories
  • seeing the bucket name or paths in the bucket above their home directory

There are some trade-offs which can make deployment a little more challenging, but we will cover the steps from here.

Create a Scoped Down Policy

A "scoped down" policy prevents users from seeing or accessing objects in other users home directories. This is a text file that will be sourced as a string into the Policy parameter of each SFTP user you create.

important

When using the LOGICAL HomeDirectoryType you don't have access to variables which represent the bucket, so this needs to be hard-coded in the policy.txt document.

Also if you are using a customer managed KMS key to encrypt the data in the bucket (which you should be), you need to add permissions to the key - which again cannot be represented by a variable.

Failure to do so will result in errors when trying to ls, put, etc. into the user's home directory, such as:

Couldn't read directory: Permission denied
Couldn't close file: Permission denied

Since these properties are unlikely to change for the lifetime of your service this should not be an issue.

Create a user

Users are identified by a username and an SSH key pair, with the public key provided to the server. A sample user is shown here:

tip

As discussed previously, it is recommended to use LOGICAL home directory mappings, which prevents users from seeing information about the bucket or other directories on the SFTP server (including other users' directories).

Create a Route 53 CNAME record

Ideally you want to use a vanity url for users to access your SFTP service, such as sftp.yourcompany.com. This can be accomplished by using a Route 53 CNAME record as shown here:

Create some shared Tags

You would have noticed a shared Tags definition in many of the libsonnet files shown; an example Tags source file is shown here:

Pull it all together!

Now that we have all of the input files, let's pull them all together in a jsonnet file, which will be preprocessed in a CI process to create a template we can deploy with AWS CloudFormation.

Your customers would now connect to your service using the private key which corresponds to the public key they supplied to you in one of the previous steps, for example:

sftp -i mysftpkey jeffrey_aven@sftp.yourdomain.com

Add more users and enjoy!

· 5 min read
Jeffrey Aven

I have used the instructions here to configure Snowpipe for several projects.

Although it is accurate, it is entirely click-ops oriented. I like to automate (and script) everything, so I have created a fully automated implementation using PowerShell and the aws and snowsql CLIs.

The challenge is that you need to go back and forth between AWS and Snowflake, exchanging information from each platform with the other.

Overview

A Role Based Storage Integration in Snowflake allows a user (an AWS user ARN) in your Snowflake account to use a role in your AWS account, which in turn enables access to S3 and KMS resources used by Snowflake for an external stage.

The following diagram explains this (along with the PlantUML code used to create the diagram):

Snowflake S3 Storage Integration

Setup

Some prerequisites (removed for brevity):

  1. Set the following variables in your script:
  • $accountid – your AWS account ID
  • $bucketname – the bucket you are letting Snowflake use as an External Stage
  • $bucketarn – used in policy statements (you could easily derive this from the bucket name)
  • $kmskeyarn – assuming you are using customer managed encryption keys, your Snowflake storage integration will need to use these to decrypt data in the stage
  • $prefix – if you want to set up granular access (on a key/path basis)
  2. Configure Snowflake access credentials using environment variables or the ~/.snowsql/config file (you should definitely use the SNOWSQL_PWD env var for your password, however)
  3. Configure access to AWS using aws configure
note

The actions performed in both AWS and Snowflake require privileged access on both platforms.

The Code

I have broken this into steps; the complete code is included at the end of the article.

Create Policy Documents

You will need to create the policy documents to allow the role you will create to access objects in the target S3 bucket. You will also need an initial “Assume Role” policy document, which will be used to create the role and then updated with information you will get from Snowflake later.

Create Snowflake Access Policy

Use the snowflake_policy_doc.json policy document created in the previous step to create a managed policy; you will need the ARN returned in a subsequent statement.

Create Snowflake IAM Role

Use the initial assume_role_policy_doc.json created previously to create a new Snowflake access role; you will need the ARN for this resource when you configure the Storage Integration in Snowflake.

Attach S3 Access Policy to the Role

Now you will attach the snowflake-access-policy to the snowflake-access-role using the $policyarn captured from the policy creation statement.

Create Storage Integration in Snowflake

Use the snowsql CLI to create a Storage Integration in Snowflake supplying the $rolearn captured from the role creation statement.
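
The snowsql statement isn't reproduced above; a sketch of the storage integration DDL (the integration name, bucket and prefix are placeholders, and the role ARN is the $rolearn captured earlier) would look like:

CREATE OR REPLACE STORAGE INTEGRATION snowflake_storage_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<accountid>:role/snowflake-access-role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucketname>/<prefix>/');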

Get STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID

You will need the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values for the storage integration you created in the previous statement; these will be used to update the assume role policy in your snowflake-access-role.
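
These values can be retrieved by describing the integration (using the placeholder name from the sketch above) and reading the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID properties from the output:

DESC INTEGRATION snowflake_storage_int;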

Update Snowflake Access Policy

Using the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values retrieved in the previous statements, you will update the assume-role-policy for the snowflake-access-role.

Test the Storage Integration

To test the connectivity between your Snowflake account and your AWS external stage using the Storage Integration just created, create a stage as shown here:
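
A sketch of the stage definition (the bucket, prefix and integration name are placeholders consistent with the earlier sketches):

CREATE OR REPLACE STAGE my_stage
  URL = 's3://<bucketname>/<prefix>/'
  STORAGE_INTEGRATION = snowflake_storage_int;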

Now list objects in the stage (assuming there are any):

list @my_stage;

This should just work! You can use your storage integration to create different stages for different paths in your External Stage bucket and use both of these objects to create Snowpipes for automated ingestion. Enjoy!

Complete Code

The complete code for this example is shown here:

· One min read
Jeffrey Aven

Snowflake allows roles to be assigned to other roles, so when a user is assigned to a role, they may inherit the ability to use countless other roles.

Challenge: recursively enumerate all roles for a given user

One solution would be to create a complex query on the "SNOWFLAKE"."ACCOUNT_USAGE"."GRANTS_TO_ROLES" object.

An easier solution is to use a stored procedure to recurse through grants for a given user and return an ARRAY of roles for that user.

This is a good programming exercise in tail call recursion (sort of) in JavaScript. Here is the code:
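
The original procedure isn't reproduced above; as a rough sketch (not the author's code), a caller's rights JavaScript procedure that walks the SHOW GRANTS output recursively might look like this (run it with a suitably privileged role, and pass the user name as stored, typically upper case, since it is identifier-quoted):

CREATE OR REPLACE PROCEDURE get_all_roles_for_user(USER_NAME STRING)
RETURNS ARRAY
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
  var visited = {};
  var allRoles = [];

  // recursively collect roles granted to a role
  // (column names follow the SHOW GRANTS TO ROLE output)
  function collect(roleName) {
    if (visited[roleName]) { return; }
    visited[roleName] = true;
    allRoles.push(roleName);

    var rs = snowflake.createStatement({
      sqlText: 'show grants to role "' + roleName + '"'
    }).execute();

    while (rs.next()) {
      if (rs.getColumnValue("granted_on") === "ROLE" &&
          rs.getColumnValue("privilege") === "USAGE") {
        collect(rs.getColumnValue("name"));
      }
    }
  }

  // roles granted directly to the user
  // (column names follow the SHOW GRANTS TO USER output)
  var users = snowflake.createStatement({
    sqlText: 'show grants to user "' + USER_NAME + '"'
  }).execute();

  while (users.next()) {
    collect(users.getColumnValue("role"));
  }

  return allRoles;
$$;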

To call the stored proc, execute:
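
For example, using the hypothetical procedure name from the sketch above and a placeholder user name:

CALL get_all_roles_for_user('JEFFREY_AVEN');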

One drawback of stored procedures in Snowflake is that they can only have scalar or array return types and cannot be used directly in a SQL query; however, you can use the table(result_scan(last_query_id())) trick to get around this, as shown below, where we pivot the ARRAY into a record set with the array elements as rows:
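
A sketch of that pattern (again assuming the hypothetical procedure above has just been CALLed):

-- pivot the ARRAY returned by the CALL into one row per role
SELECT f.value::STRING AS role_name
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) t,
     LATERAL FLATTEN(input => t.$1) f;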

IMPORTANT

This query must be the next statement run immediately after the CALL statement and cannot be run again until you run another CALL statement.

More adventures with Snowflake soon!