Skip to main content

2 posts tagged with "kafka"

View All Tags

· 4 min read
Jeffrey Aven

Structured Streaming in Spark provides a powerful framework for stream processing an analysis, such as streaming transformations, stateful streaming or sliding window operations.

Kafka is a common streaming source and sink for Spark Streaming and Structured Streaming operations. However, there may be situations where a data warehouse (such as Snowflake) is a more appropriate target for streaming operations, especially where there is a reporting or long-term storage requirement on the data derived from the streaming source.

This article will demonstrate just how easy this is to implement using Python.

Design

The following diagram illustrates the ingestion design for this example:

Spark Structured Streaming using Kafka and Snowflake

Snowflake Setup

Some prerequisites for Snowflake:

  1. You will need to create a user (or use an existing user), in either case the user will need to be identified by a private key. You will need to generate a key pair as follows:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

copy the contents of the rsa_key.pub file, remove the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- strings, then remove the line breaks to form one string, use this string as the RSA_PUBLIC_KEY in a CREATE USER or ALTER USER statement in Snowflake, like:

ALTER USER youruser SET RSA_PUBLIC_KEY='MIIBI...';
  1. Now setup the target database, schema and table you will use to write out your stream data (the schema for the table must match the schema for the Data Stream you will use the DataStreamWriter to emit records to Snowflake

The user you will be using (that you setup the key pair authentication for) will need to be assigned a default role to which the appropriate write permissions are granted to the target objects in Snowflake. You will also need to designate a virtual warehouse (which your user must have USAGE permissions to.

The Code

Now that we have the objects and user setup in Snowflake, we can construct our Spark application.

First, you will need to start your Spark session (either using pyspark or spark-submit) including the packages that Spark will need to connect to Kafka and to Snowflake.

The Snowflake packages include a JDBC driver and the Snowflake Connector for Spark, see Snowflake Connector for Spark.

An example is shown here (package versions may vary depending upon the version of Spark you are using):

pyspark \
--packages \
net.snowflake:snowflake-jdbc:3.13.14,\
net.snowflake:spark-snowflake_2.12:2.10.0-spark_3.1,\
org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1

Now that we have a spark session with the necessary packages, lets go...

# import any required functions, set the checkpoint directory, and log level (optional)
from pyspark.sql.functions import split
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.streaming.checkpointLocation", "file:///tmp")

setup connection options for Snowflake by creating an sfOptions dictionary

sfOptions = {
"sfURL" : sfUrl,
"sfUser" : "avensolutions",
"pem_private_key": private_key,
"sfDatabase" : "SPARK_SNOWFLAKE_DEMO",
"sfSchema" : "PUBLIC",
"sfWarehouse" : "COMPUTE_WH",
"streaming_stage" : "mystage"
}

set a variable for the Snowflake Spark connector

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

read messages from Kafka:

lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafkabroker:9092") \
.option("subscribe", "weblogs") \
.load()

perform necessary transformations (the fields and data types in the resultant data structure must match the target table you created in Snowflake:

log_recs = lines.select(
split(lines.value.cast("string"), " ").alias("data")
)

log_data = log_recs.selectExpr(
"CAST(data[0] as string) as date",
"CAST(data[1] as string) as time",
"CAST(data[2] as string) as c_ip",
"CAST(data[3] as string) as cs_username",
"CAST(data[4] as string) as s_sitename",
"CAST(data[5] as string) as s_computername",
"CAST(data[6] as string) as s_ip",
"CAST(data[7] as int) as s_port",
"CAST(data[8] as string) as cs_method",
"CAST(data[9] as string) as cs_uri_stem",
"CAST(data[10] as string) as cs_uri_query",
"CAST(data[11] as int) as sc_status",
"CAST(data[12] as int) as time_taken",
"CAST(data[13] as string) as cs_version",
"CAST(data[14] as string) as cs_host",
"CAST(data[15] as string) as User_Agent",
"CAST(data[16] as string) as Referer",
)

write to Snowflake!

query = log_data\
.writeStream\
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "WEB_LOGS") \
.trigger(processingTime='30 seconds') \
.start()

query.awaitTermination()
info

Note that I have included the processingTime trigger of 30 seconds (this is akin to the batchInterval in the DStream API), you should tune this to get a balance between batch sizes to ingest into Snowflake (which will benefit from larger batches) and latency.

The Results

Spark Structured Streaming into Snowflake

Enjoy!

if you have enjoyed this post, please consider buying me a coffee ☕ to help me keep writing!

· 4 min read
Jeffrey Aven

Kappa Architecture and Data Warehousing re-imagined

Streaming Data Warehouse

The aspiration to extend data analysis (predictive, descriptive or otherwise) to streaming event data has been common across every enterprise scale program I have been involved with. Often, however, this aspiration goes unrealised as it tends to slide down the priority scale as we still grapple with legacy batch oriented integration patterns and processes.

Event processing is not a new concept, real time event and transaction processing has been a standard feature for security, digital and operations functions for some time, however in the Data Warehousing, BI and Advanced Analytics worlds it is often spoken about but rarely implemented, except for tech companies of course. In many cases personalization is still a batch oriented process, e.g. train a model from a feature set built from historical data, generate recommendations in batch, serve these recommendations upon the next visit - wash, rinse, and repeat.

Lambda has existed for several years now as a data-processing architecture pattern designed to incorporate both batch and stream-processing capabilities. Moreover, messaging platforms have existed for decades, from point-to-point messaging systems, to message-oriented-middleware systems, to distributed pub-sub messaging systems such as Apache Kafka.

Additionally, open source streaming data processing frameworks and tools have proliferated in recent years with projects such as Storm, Samza, Flink and Spark Streaming becoming established solutions.

Kafka in particular, with its focus on durability, resiliency, availability and consistency, has graduated into fully fledged data platform not simply a transient messaging system. In many cases Kafka is serving as a back end for operational processes, such as applications implementing the CQRS (Command Query Responsibility Segregation) design pattern.

In other words, it is not the technology that holds us back, it's our lack of imagination.

Enter Kappa Architecture where we no longer have to attempt to integrate streaming data with batch processes…everything is a stream. The ultimate embodiment of Kappa Architecture is the Streaming Data Warehouse.

In the Streaming Data Warehouse, tables are represented by topics. Topics represent either:

  • unbounded event or change streams; or
  • stateful representations of data (such as master, reference or summary data sets).

This approach makes possible the enrichment and/or summarisation of transaction or event data with master or reference data. Furthermore many of the patterns used in data warehousing and master data management are inherent in Kafka as you can represent the current state of an object as well as the complete change history of that object (in other words change data capture and associated slowly changing dimensions from one inbound stream).

Data is acquired from source systems either in real time or as a scheduled extract process, in either case the data is presented to Kafka as a stream.

The Kafka Avro Schema Registry provides a systematic contract with source systems which also serves as a data dictionary for consumers supporting schema evolution with backward and forward compatibility. Data is retained on the Kafka platform for a designated period of time (days or weeks) where it is available for applications and processes to consume - these processes can include data summarisation or sliding window operations for reporting or notification, or data integration or datamart building processes which sink data to other systems - these could include relational or non-relational data stores.

Real time applications can be built using the KStreams API and emerging tools such as KSQL can be used to provide a well-known interface for sampling streaming data or performing windowed processing operations on streams. Structured Streaming in Spark or Spark Streaming in its original RDD/DStream implementation can be used to prepare and enrich data for machine learning operations using Spark ML or Spark MLlib.

In addition, data sinks can operate concurrently to sink datasets to S3 or Google Cloud Storage or both (multi cloud - like real time analytics - is something which is talked about more than it’s implemented…).

In the Streaming Data Warehouse architecture Kafka is much more than a messaging platform it is a distributed data platform, which could easily replace major components of a legacy (or even a modern) data architecture.

It just takes a little imagination…

if you have enjoyed this post, please consider buying me a coffee ☕ to help me keep writing!