Skip to main content

· 4 min read
Jeffrey Aven

Structured Streaming in Spark provides a powerful framework for stream processing an analysis, such as streaming transformations, stateful streaming or sliding window operations.

Kafka is a common streaming source and sink for Spark Streaming and Structured Streaming operations. However, there may be situations where a data warehouse (such as Snowflake) is a more appropriate target for streaming operations, especially where there is a reporting or long-term storage requirement on the data derived from the streaming source.

This article will demonstrate just how easy this is to implement using Python.

Design

The following diagram illustrates the ingestion design for this example:

Spark Structured Streaming using Kafka and Snowflake

Snowflake Setup

Some prerequisites for Snowflake:

  1. You will need to create a user (or use an existing user), in either case the user will need to be identified by a private key. You will need to generate a key pair as follows:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

copy the contents of the rsa_key.pub file, remove the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- strings, then remove the line breaks to form one string, use this string as the RSA_PUBLIC_KEY in a CREATE USER or ALTER USER statement in Snowflake, like:

ALTER USER youruser SET RSA_PUBLIC_KEY='MIIBI...';
  1. Now setup the target database, schema and table you will use to write out your stream data (the schema for the table must match the schema for the Data Stream you will use the DataStreamWriter to emit records to Snowflake

The user you will be using (that you setup the key pair authentication for) will need to be assigned a default role to which the appropriate write permissions are granted to the target objects in Snowflake. You will also need to designate a virtual warehouse (which your user must have USAGE permissions to.

The Code

Now that we have the objects and user setup in Snowflake, we can construct our Spark application.

First, you will need to start your Spark session (either using pyspark or spark-submit) including the packages that Spark will need to connect to Kafka and to Snowflake.

The Snowflake packages include a JDBC driver and the Snowflake Connector for Spark, see Snowflake Connector for Spark.

An example is shown here (package versions may vary depending upon the version of Spark you are using):

pyspark \
--packages \
net.snowflake:snowflake-jdbc:3.13.14,\
net.snowflake:spark-snowflake_2.12:2.10.0-spark_3.1,\
org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1

Now that we have a spark session with the necessary packages, lets go...

# import any required functions, set the checkpoint directory, and log level (optional)
from pyspark.sql.functions import split
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.streaming.checkpointLocation", "file:///tmp")

setup connection options for Snowflake by creating an sfOptions dictionary

sfOptions = {
"sfURL" : sfUrl,
"sfUser" : "avensolutions",
"pem_private_key": private_key,
"sfDatabase" : "SPARK_SNOWFLAKE_DEMO",
"sfSchema" : "PUBLIC",
"sfWarehouse" : "COMPUTE_WH",
"streaming_stage" : "mystage"
}

set a variable for the Snowflake Spark connector

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

read messages from Kafka:

lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafkabroker:9092") \
.option("subscribe", "weblogs") \
.load()

perform necessary transformations (the fields and data types in the resultant data structure must match the target table you created in Snowflake:

log_recs = lines.select(
split(lines.value.cast("string"), " ").alias("data")
)

log_data = log_recs.selectExpr(
"CAST(data[0] as string) as date",
"CAST(data[1] as string) as time",
"CAST(data[2] as string) as c_ip",
"CAST(data[3] as string) as cs_username",
"CAST(data[4] as string) as s_sitename",
"CAST(data[5] as string) as s_computername",
"CAST(data[6] as string) as s_ip",
"CAST(data[7] as int) as s_port",
"CAST(data[8] as string) as cs_method",
"CAST(data[9] as string) as cs_uri_stem",
"CAST(data[10] as string) as cs_uri_query",
"CAST(data[11] as int) as sc_status",
"CAST(data[12] as int) as time_taken",
"CAST(data[13] as string) as cs_version",
"CAST(data[14] as string) as cs_host",
"CAST(data[15] as string) as User_Agent",
"CAST(data[16] as string) as Referer",
)

write to Snowflake!

query = log_data\
.writeStream\
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "WEB_LOGS") \
.trigger(processingTime='30 seconds') \
.start()

query.awaitTermination()
info

Note that I have included the processingTime trigger of 30 seconds (this is akin to the batchInterval in the DStream API), you should tune this to get a balance between batch sizes to ingest into Snowflake (which will benefit from larger batches) and latency.

The Results

Spark Structured Streaming into Snowflake

Enjoy!

· 3 min read
Jeffrey Aven

This article demonstrates a simple command line utility to login to an authorization server (Okta in this case) using a PKCE (Proof Key for Code Exchange) flow. This is the preferred flow for public clients (such as Single Page Applications).

The code for this article is available on GitHub

Example

Okta PKCE cli login example

Overview

This application can be used to illustrate the authorization/authentication flow discussed in Simple SSO with an external IdP using Active Directory and Okta. A flow which is pictured here:

PKCE Authorization t Okta using an AD IdP

Steps

The steps involved in the implementation of a PKCE login flow are as follows:

Generate a code_challenge

To implement a PKCE flow, you first need to generate a Code Verifier (which is a random value you create), the Code Verifier is then hashed using a SHA256 algorithm. The hash is then used as the Code Challenge. An example function to generate a code challenge is shown below:

For more information see Use PKCE to Make Your Apps More Secure.

Build the authorize url

The authorize url is used to initiate the authorization flow with the authorization server. An example function to construct the authorize url is shown below:

Get the authorization code via redirect uri

The redirecturi parameter supplied in the authorize url is used to retrieve the authorization code from the authorization server. In order to get this code using a front end flow, you need to define a handler that will get the authorization code, call the token endpoint, and close the HTTP server, as shown here:

Exchange the code for an access token

The access token is what you ultimatly want, as this is the token that will be used to access protected resources. An example function to exchange the authorization code for an access token is shown below:

(Optional) Get the user profile

The access token can be used to get the user profile, this is done by calling the userinfo endpoint using the token. An example function to get the user profile is shown below:

with inspiration from...

· 7 min read
Chris Ottinger

Prefect.io is a python based Data Engineering toolbox for building and operating Data Pipelines. Out of the box, Prefect provides an initial workflow for managing data pipelines that results in a container image per data pipeline job.

The one-to-one relationship between data pipeline jobs and container images enables data engineers to craft pipelines that are loosely coupled and don't require a shared runtime environment configuration. However, as the number of data pipeline jobs grow the default container per job approach starts to introduce workflow bottlenecks and lifecycle management overheads. For example, in order to update software components used by flows, such as upgrading the version of Prefect, all the data pipeline job images have to be rebuilt and redeployed. Additionally the container image per job workflow introduces a wait time for data engineers to re-build data pipeline container images and test flows centrally on Prefect Server or Prefect Cloud environment.

Fortunately, Prefect comes to its own rescue with the ability to open up the box, exposing the flexibility in the underlying framework.

Out of the box - Prefect DockerStorage

Out of the box, Prefect provides a simple workflow for defining and deploying data pipelines as container images. After getting a first data pipeline running in a local environment, the attention turns to scaling up development and deploying flows into a managed environment, using either the Prefect Cloud service or a Prefect Server.

Combining Prefect Cloud or Prefect Server with Kubernetes provides a flexible and scalable platform solution for moving data pipelines into production. There are a number of options for packaging data pipeline flow code for execution on kubernetes clusters. The Docker Storage option provides the workflow for bundling the data pipeline job code into container images, enabling a common controlled execution environment and well understood distribution mechanism. The data pipeline runs as a pod using the flow container image.

Prefect Docker Storage workflow steps for building and deploying data pipeline flows include:

Workflow Steps

  • packaging a flow (python code) as a serialised/pickled object into a container image
  • registering the flow using the container image name
  • pushing the container image to a container repository accessible from the kubernetes cluster
  • running the flow by running an instance of the named container image as a kubernetes pod

This is relatively simple immutable workflow. Each data pipeline flow version is effectively a unique and self contained 'point-in-time' container image. This initial workflow can also be extended to package multiple related flows into a single container image, reducing the number of resulting container images. But, as the number of data pipeline jobs grow, there issues of container image explosion and data engineering productivity remain.

Using Prefect GitStorage for flows addresses both container image proliferation as well as development bottlenecks.

Prefect Git Storage

Prefect Git Storage provides a workflow for developing and deploying data pipelines directly from git repositories, such as Gitlab or Github. The data pipeline code (python) is pulled from the git repository on each invocation with the ability to reference git branches and git tags. This approach enables:

  • reducing the number of container images to the number of different runtime configurations to be supported.
  • improving the data engineering development cycle time by removing the need to build and push container images on each code change.
  • when combined with kubernetes Prefect Run Configs and Job templates, enables selection of specific runtime environment images

Note that the GitStorage option does required access from the runtime kubernetes cluster to the central git storage service, e.g. gitlab, github, etc.

Prefect Git Storage workflow steps for 'building' and deploying data pipeline flows include:

Workflow Steps

  • pushing the committed code to the central git service
  • registering the flow using the git repository url and branch or tag reference
  • running the flow by pulling the reference code from the git service in a kubernetes pod

The container image build and push steps are removed from the developer feedback cycle time. Depending on network bandwidth and image build times, this can save remove 5 to 10 minutes from each deployment iteration.

Pushing the flow code

Once a set of changes to the data pipeline code has been committed, push to the central git service.

$ git commit
$ git push

Registering the flow

The flow can be registered with Prefect using either a branch (HEAD or latest) or tag reference. Assuming a workflow with feature branches:

  • feature branches: register the flow code using the feature branch. This enables the latest version (HEAD) of the pushed flow code to be used for execution. It also enables skipping re-registration of the flow on new changes as the HEAD of the branch is pulled on each flow run
  • main line branches: register pinned versions of the flow using git tags. This enables the use of a specific version of the flow code to be pulled on each flow run, regardless of future changes.

Determining the which reference to use:

# using gitpython module to work with repo info
from git import Repo

# presidence for identifing where to find the flow code
# BUILD_TAG => GIT_BRANCH => active_branch
build_tag = branch_name = None
build_tag = os.getenv("BUILD_TAG", "")
if build_tag == "":
branch_name = os.getenv("GIT_BRANCH", "")
if branch_name == "":
branch_name = str(Repo(os.getcwd()).active_branch)

Configuring Prefect Git storage:

from prefect.storage import Git
import my_flows.hello_flow as flow # assuming flow is defined in ./my_flows/flow.py

# example using Gitlab
# either branch_name or tag must be empty string "" or None
storage = Git(
repo_host=git_hostname,
repo=repo_path,
flow_path=f"{flow.__name__.replace('.','/')}.py",
flow_name=flow.flow.name,
branch_name=branch_name,
tag=build_tag,
git_token_secret_name=git_token_secret_name,
git_token_username=git_token_username
)

storage.add_flow(flow.flow)
flow.flow.storage = storage

flow.flow.regsiter(build=False)

Once registered, the flow storage details can be viewed in the Prefect Server or Prefect Cloud UI. In this example, Prefect will use the HEAD version of the main branch on each flow run.

hello flow storage details

Next Steps - Run Config

With Prefect Git Storage the runtime configuration and environment management is decoupled from the data pipeline development workflow. Unlike with Docker Storage, with Git Storage, the runtime execution environment and data pipeline development workflows are defined and managed separately. As an added benefit, the developer feedback loop cycle time is also reduced.

With the data engineering workflow addressed, the next step in scaling out the Prefect solution turns to configuration and lifecycle management of the runtime environment for data pipelines. Prefect Run Configs and Job templates provide the tools retaining the flexibility on container image based runtime environments with improved manageability.

· 6 min read
Jeffrey Aven

When you want the SFTP service without the SFTP Server.

In implementing data platforms with external data providers, it is common to use a managed file transfer platform or an SFTP gateway as an entry point for providers to supply data to your system.

Often in past implementations this would involve deploying a sever (typically a Linux VM) and provisioning and configuring an SFTP service. If you wanted the data sent by clients to be copied to another storage medium (such as S3 or EFS) you would need to roll your own code or subscribe to a marketplace offering to do so.

I recently trialled the AWS Transfer Family SFTP gateway offering from AWS and sharing my adventures here.

Architecture

In this reference architecture, we are deploying an SFTP service which uses a path in an S3 bucket as a user’s home directory. Objects in the bucket are encrypted with a customer managed KMS key. The SFTP server front end address is mapped to a vanity URL using Route53. The bucket and path are integrated with a STORAGE INTEGRATION, STAGE and PIPE definition in Snowflake. The Snowflake bits are covered in more detail in this blog: Automating Snowflake Role Based Storage Integration for AWS. This article just details the AWS Transfer Family SFTP setup.

AWS Transfer SFTP Architecture

Setup

The steps to set up this pattern are detailed below.

info

This example uses the Jsonnet/CloudFormation pattern described in this article: Simplifying Large CloudFormation Templates using Jsonnet. This is a useful pattern for breaking up a monolithic CloudFormation template at design time to more manageable resource scoped documents, then pre-processing these in a CI routine (GitLab CI, GitHub Actions, etc) to create a complete template.

Setup the Service

To setup the SFTP transfer service use the AWS::Transfer::Server resource type as shown below:

note

Use the tags shown to display the custom hostname (used as a vanity url) in the Transfer UI in the AWS console.

Create the S3 Bucket

Create a bucket which will be used to store incoming files sent via SFTP.

note

This example logs to a logging bucket, not shown for brevity.

Create a Customer Managed KMS Key

Create a customer managed KMS key which will be used to encrypt data stored in the S3 bucket created in the previous step.

Create an IAM role to access the bucket

Create an IAM role which will be assumed by the AWS Transfer Service to read and write to the S3 staging bucket.

important

You must assign permissions to use the KMS key created previously, failure to do so will result in errors such as:

remote readdir(): Permission denied

User Directory Mappings

An SFTP users home directory is mapped to a path in your S3 bucket. It is recommended to use the LOGICAL HomeDirectoryType. This will prevent SFTP users from:

  • seeing or being able to access other users home directories
  • seeing the bucket name or paths in the bucket above their home directory

There are some trade offs for this which can make deployment a little more challenging but we will cover off the steps from here.

Create a Scoped Down Policy

A "scoped down" policy prevents users from seeing or accessing objects in other users home directories. This is a text file that will be sourced as a string into the Policy parameter of each SFTP user you create.

important

Using the LOGICAL HomeDirectoryType you don't have access to variables which represent the bucket, so this needs to be hard coded in the policy.txt document.

Also if you are using a customer managed KMS key to encrypt the data in the bucket (which you should be), you need to add permissions to the key - which again cannot be represented by a variable.

Failure to do so will result in errors when trying to ls, put, etc into the user's home directory such as:

Couldn't read directory: Permission denied
Couldn't close file: Permission denied

Since these properties are unlikely to change for the lifetime of your service this should not be an issue.

Create a user

Users are identified by a username and an SSH key, providing the public key to the server. A sample user is shown here:

tip

As discussed previously, it is recommended to use LOGICAL home directory mappings, which prevents users from seeing information about the bucket or other directories on the SFTP server (including other users directories).

Create a Route 53 CNAME record

Ideally you want to use a vanity url for users to access your SFTP service, such as sftp.yourcompany.com. This can be accomplished by using a Route 53 CNAME record as shown here:

Create some shared Tags

You would have noticed a shared Tags definition in many of the libsonnet files shown, an example Tags source file is shown here:

Pull it all together!

Now that we have all of the input files, lets pull them all together in a jsonnet file, which will be preprocessed in a CI process to create a template we can deploy with AWS CloudFormation.

Your customers would now connect to your service using they private key which corresponds to the public key they supplied to you in one of the previous steps, for example:

sftp -i mysftpkey jeffrey_aven@sftp.yourdomain.com

Add more users and enjoy!