Skip to main content

4 posts tagged with "sql"

View All Tags

· 3 min read
Yuncheng Yang

This article demonstrates how to use the Snowflake REST API to retrieve data for a web application using TypeScript, in this case we are using keypair authentication with Snowflake.

Overview

Snowflake’s SQL API allows you to access snowflake objects using SQL via a REST API request, this allows for easy integration with your applications and deployment pipelines. You can use the API to execute most DDL and DML statements.

There are some limitations you need to be aware of however, for example interactions with stages (using PUT and GET aren’t supported via the Snowflake API) or stored procedure operations (using CALL), you can read more on this here.

Endpoints

There are three endpoints provided:

  • /api/v2/statements/
  • /api/v2/statement/<statementHandle>
  • /api/v2/statements/<statementHandle/cancel

We will be looking at the first two in this article.

Authentication Methods

There are two types of Authentication methods for the API, OAuth and Key Pair. For OAuth method, you can choose to use X-Snowflake-Authorization-Token-Type header, if this header is not present, Snowflake assumes that the token in the Authorization header is an OAuth token. For Key Pair method, the JWT token will be in the Authorization header as Bearer <your token>.

Let’s walk through how to generate and use the JWT.

Generating the JWT

Here's whats needed:

Snowflake JWT

the Code

Request Body

Now we need a request body:

Submitting the Request

We will need to include the region and account identifier, for instance if your account identifier includes a region (e.g. xy12345.us-east2.aws.snowflakecomputing.com).

Response Handling

When making a SELECT query, there are three things worth noting:

  1. rowType fields in the resultSetMetaData represent the columns
  2. data without column names is in the format of string[][]
  3. partitionInfo is an array of object representing different partitions

For more information see Handling Responses from the SQL API - Snowflake Documentation.

Parsing data

Here is a Typescript code snippet demonstrating parsing return data:

Handling multiple partitions

Large result sets are paginated into partitions, each partition is a set of rows.

note

Note that the pages (referred to as partitions) are NOT based on row count, instead they are based on the compressed batch size, so they will not be uniform in terms of the number of rows.

To get a partition, send a GET request with Url https://<accountIdentifier>.snowflakecomputing.com/api/v2/statements/?partition=<partitionId>.

Thanks!

· 4 min read
Jeffrey Aven

Structured Streaming in Spark provides a powerful framework for stream processing an analysis, such as streaming transformations, stateful streaming or sliding window operations.

Kafka is a common streaming source and sink for Spark Streaming and Structured Streaming operations. However, there may be situations where a data warehouse (such as Snowflake) is a more appropriate target for streaming operations, especially where there is a reporting or long-term storage requirement on the data derived from the streaming source.

This article will demonstrate just how easy this is to implement using Python.

Design

The following diagram illustrates the ingestion design for this example:

Spark Structured Streaming using Kafka and Snowflake

Snowflake Setup

Some prerequisites for Snowflake:

  1. You will need to create a user (or use an existing user), in either case the user will need to be identified by a private key. You will need to generate a key pair as follows:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

copy the contents of the rsa_key.pub file, remove the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- strings, then remove the line breaks to form one string, use this string as the RSA_PUBLIC_KEY in a CREATE USER or ALTER USER statement in Snowflake, like:

ALTER USER youruser SET RSA_PUBLIC_KEY='MIIBI...';
  1. Now setup the target database, schema and table you will use to write out your stream data (the schema for the table must match the schema for the Data Stream you will use the DataStreamWriter to emit records to Snowflake

The user you will be using (that you setup the key pair authentication for) will need to be assigned a default role to which the appropriate write permissions are granted to the target objects in Snowflake. You will also need to designate a virtual warehouse (which your user must have USAGE permissions to.

The Code

Now that we have the objects and user setup in Snowflake, we can construct our Spark application.

First, you will need to start your Spark session (either using pyspark or spark-submit) including the packages that Spark will need to connect to Kafka and to Snowflake.

The Snowflake packages include a JDBC driver and the Snowflake Connector for Spark, see Snowflake Connector for Spark.

An example is shown here (package versions may vary depending upon the version of Spark you are using):

pyspark \
--packages \
net.snowflake:snowflake-jdbc:3.13.14,\
net.snowflake:spark-snowflake_2.12:2.10.0-spark_3.1,\
org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1

Now that we have a spark session with the necessary packages, lets go...

# import any required functions, set the checkpoint directory, and log level (optional)
from pyspark.sql.functions import split
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.streaming.checkpointLocation", "file:///tmp")

setup connection options for Snowflake by creating an sfOptions dictionary

sfOptions = {
"sfURL" : sfUrl,
"sfUser" : "avensolutions",
"pem_private_key": private_key,
"sfDatabase" : "SPARK_SNOWFLAKE_DEMO",
"sfSchema" : "PUBLIC",
"sfWarehouse" : "COMPUTE_WH",
"streaming_stage" : "mystage"
}

set a variable for the Snowflake Spark connector

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

read messages from Kafka:

lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafkabroker:9092") \
.option("subscribe", "weblogs") \
.load()

perform necessary transformations (the fields and data types in the resultant data structure must match the target table you created in Snowflake:

log_recs = lines.select(
split(lines.value.cast("string"), " ").alias("data")
)

log_data = log_recs.selectExpr(
"CAST(data[0] as string) as date",
"CAST(data[1] as string) as time",
"CAST(data[2] as string) as c_ip",
"CAST(data[3] as string) as cs_username",
"CAST(data[4] as string) as s_sitename",
"CAST(data[5] as string) as s_computername",
"CAST(data[6] as string) as s_ip",
"CAST(data[7] as int) as s_port",
"CAST(data[8] as string) as cs_method",
"CAST(data[9] as string) as cs_uri_stem",
"CAST(data[10] as string) as cs_uri_query",
"CAST(data[11] as int) as sc_status",
"CAST(data[12] as int) as time_taken",
"CAST(data[13] as string) as cs_version",
"CAST(data[14] as string) as cs_host",
"CAST(data[15] as string) as User_Agent",
"CAST(data[16] as string) as Referer",
)

write to Snowflake!

query = log_data\
.writeStream\
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "WEB_LOGS") \
.trigger(processingTime='30 seconds') \
.start()

query.awaitTermination()
info

Note that I have included the processingTime trigger of 30 seconds (this is akin to the batchInterval in the DStream API), you should tune this to get a balance between batch sizes to ingest into Snowflake (which will benefit from larger batches) and latency.

The Results

Spark Structured Streaming into Snowflake

Enjoy!

· One min read
Jeffrey Aven

Snowflake allows roles to be assigned to other roles, so when a user is assigned to a role, they may inherit the ability to use countless other roles.

Challenge: recursively enumerate all roles for a given user

One solution would be to create a complex query on the “SNOWFLAKE"."ACCOUNT_USAGE"."GRANTS_TO_ROLES" object.

An easier solution is to use a stored procedure to recurse through grants for a given user and return an ARRAY of roles for that user.

This is a good programming exercise in tail call recursion (sort of) in JavaScript. Here is the code:

To call the stored proc, execute:

One drawback of stored procedures in Snowflake is that they can only have scalar or array return types and cannot be used directly in a SQL query, however you can use the table(result_scan(last_query_id())) trick to get around this, as shown below where we will pivot the ARRAY into a record set with the array elements as rows:

IMPORTANT

This query must be the next statement run immediately after the CALL statement and cannot be run again until you run another CALL statement.

More adventures with Snowflake soon!

· 3 min read
Jeffrey Aven

Most traditional data warehouse or datamart ETL routines consist of multi stage SQL transformations, often a series of CTAS (CREATE TABLE AS SELECT) statements usually creating transient or temporary tables – such as volatile tables in Teradata or Common Table Expressions (CTE’s).

The initial challenge when moving from a SQL/MPP based ETL framework platformed on Oracle, Teradata, SQL Server, etc to a Spark based ETL framework is what to do with this…

Multi Stage SQL Based ETL

One approach is to use the lightweight, configuration driven, multi stage Spark SQL based ETL framework described in this post.

This framework is driven from a YAML configuration document. YAML was preferred over JSON as a document format as it allows for multi-line statements (SQL statements), as well as comments - which are very useful as SQL can sometimes be undecipherable even for the person that wrote it.

The YAML config document has three main sections: sources, transforms and targets.

Sources

The sources section is used to configure the input data source(s) including optional column and row filters. In this case the data sources are tables available in the Spark catalog (for instance the AWS Glue Catalog or a Hive Metastore), this could easily be extended to read from other datasources using the Spark DataFrameReader API.

Transforms

The transforms section contains the multiple SQL statements to be run in sequence where each statement creates a temporary view using objects created by preceding statements.

Targets

Finally the targets section writes out the final object or objects to a specified destination (S3, HDFS, etc).

Process SQL Statements

The process_sql_statements.py script that is used to execute the framework is very simple (30 lines of code not including comments, etc). It loads the sources into Spark Dataframes and then creates temporary views to reference these datasets in the transforms section, then sequentially executes the SQL statements in the list of transforms. Lastly the script writes out the final view or views to the desired destination – in this case parquet files stored in S3 were used as the target.

You could implement an object naming convention such as prefixing object names with sv_, iv_, fv_ (for source view, intermediate view and final view respectively) if this helps you differentiate between the different objects.

To use this framework you would simply use spark-submit as follows:

spark-submit process_sql_statements.py config.yml

Full source code can be found at: https://github.com/avensolutions/spark-sql-etl-framework