Skip to main content

· 2 min read
Jeffrey Aven

I had a scenario where I needed to find values for a key in a complex JavaScript object which could be nested n levels deep.

I found numerous approaches to doing this, most were overly complicated, so I thought I would share the most straightforward, concise process.

the Code

You can do this in a straightforward function implementing the "tail call recursion" pattern to search for a key (key) from the root of an object (obj), excluding any keys in excludeKeys.

This will return a list of values for the given key, searching all levels in all branches of the object.

function getAllValuesForKey(obj, key, excludeKeys=[], values=[]) {
for (let k in obj) {
if (typeof obj[k] === "object") {
getAllValuesForKey(obj[k], key, excludeKeys, values)
} else {
if (k === key){
return values;


In parsing an OpenAPI or Swagger specification, I am looking for all of the schema refs in a successful response body, for example:

'200': '...'

however these refs can present in various different ways depending upon the response type, such as:

$ref: '#/components/responses/actions_runner_labels'


$ref: '#/components/schemas/runner'


- $ref: '#/components/schemas/interaction-limit-response'


type: object
- total_count
- runners
type: integer
type: array
$ref: '#/components/schemas/runner'

To find all of the schema refs without knowing the response type or structure I used the above function as follows (excluding refs for examples):

function getRespSchemaName(op){
for(let respCode in op.responses){
return getAllValuesForKey(op.responses[respCode], "$ref", ['examples']);

You can find this implementation in openapi-doc-util and @stackql/openapi-doc-util.


· 8 min read
Chris Ottinger

Container images provide an ideal software packaging solution for DataOps and python based data pipeline workloads. Containers enable Data Scientists and Data Engineers to incorporate the latest packages and libraries without the issues associated with introducing breaking changes into shared environments. A Data Engineer or Data Scienctist can quickly release new functionality with the best tools available.

Container images provide safer developer environments but as the number of container images used for production workloads grow, a maintenance challenge can emerge. Whether using pip or poetry to manage python packages and dependencies, updating a container definition requires edits to the explicit package versions as well as to the pinned or locked versions of the package dependencies. This process can be error prone without automation and a repeatable CICD workflow.

A workflow pattern based on docker buildkit / moby buildkit multi-stage builds provides an approach that maintains all the build specifications in a single Dockerfile, while build tools like make provide a simple and consistent interface into the container build stages. The data pipeline challenges addresses with a multi-stage build pattern include:

  • automating lifecycle management of the Python packages used by data pipelines
  • integrating smoke testing of container images to weed out compatibility issues early
  • simplifying the developer experience with tools like make that can be used both locally and in CI/CD pipelines

The Dockerfile contains the definitions of the different target build stages and order of execution from one stage to the next. The Makefile wraps the Dockerfile build targets into a standard set of workflow activities, following a similar to $ config && make && make install

The DataOps Container Lifecycle Workflow

A typical dataops/gitops style workflow for maintaining container images includes actions in the local environment to define the required packages and produce the pinned dependency poetry.lock file or requirements.txt packages list containing the full set of pinned dependent packages.

Given and existing project in a remote git repository with a CI/CD pipeline defined, the following workflow would be used to update package versions and dependencies:

Multi-stage build workflow

The image maintainer selects the packages to update or refresh using a local development environment, working from a feature branch. This includes performing an image smoke-test to validate the changes within the container image.

Once refreshed image has been validated, the lock file or full pinned package list is commited back to the repository and pushed to the remote repository. The CI/CD pipeline performs a trial build and conducts smoke testing. On merge into the main branch, the target image is built, re-validated, and pushed to the container image registry.

The multi-stage build pattern can support both defining both the declared packages for an environment as well as the dependent packages, but poetry splits the two into distinct files, a pyproject.toml file containing the declated packages and a poetry.lock file that contains the full set of declared and dependent packages, including pinned versions. pip supports loading packages from different files, but requires a convention for which requirements file contains the declared packages and while contains the full set of pinned package versions produced by pip freeze. The example code repo contains examples using both pip and poetry.

The following example uses poetry in a python:3.8 base image to illustrate managing the dependencies and version pinning of python packages.

Multi-stage Dockerfile

The Dockerfile defines the build stages used for both local refresh and by the CICD pipelines to build the target image.

Dockerfile Stages

The Dockerfile makes use of the docker build arguments feature to pass in whether the build should refresh package versions or build the image from pinned packages.

Build Stage: base-pre-pkg

Any image setup and pre-python package installation steps. For poetry, this includes setting the config option to skip the creation of a virtual environment as the container already provides the required isolation.

FROM python:3.8 as base-pre-pkg

RUN install -d /src && \
pip install --no-cache-dir poetry==1.1.13 && \
poetry config virtualenvs.create false

Build Stage: python-pkg-refresh

The steps to generate a poetry.lock file containing the pinned package versions.

FROM base-pre-pkg as python-pkg-refresh
COPY pyproject.toml poetry.lock /src/
RUN poetry update && \
poetry install

Build Stage: python-pkg-pinned

The steps to install packages using the pinned package versions.

FROM base-pre-pkg as python-pkg-pinned
COPY pyproject.toml poetry.lock /src/
RUN poetry install

Build Stage: base-post-pkg

A consolidation build target that can refer to either the python-pkg-refresh or the python-pkg-pinned stages, depending on the docker build argument and includes any post-package installation steps.

FROM python-pkg-${PYTHON_PKG_VERSIONS} as base-post-pkg

Build Stage: smoke-test

Simple smoke tests and validation commands to validate the built image.

FROM base-post-pkg as smoke-test
COPY tests/ ./tests
RUN poetry --version && \
python ./tests/

Build Stage: target-image

The final build target container image. Listing the target-image as the last stage in the Dockerfile has the effect of also making this the default build target.

FROM base-post-pkg as target-image

Multi-stage Makefile

The Makefile provides a workflow oriented wrapper over the Dockerfile build stage targets. The Makefile targets can be executed both in a local development environment as well as via a CICD pipeline. The Makefile includes several variables that can either be run using default values, or overridden by the CI/CD pipeline.

Makefile targets

Make Target: style-check

Linting and style checking of source code. Can include both application code as well as the Dockerfile itself using tools such as hadolint.

hadolint ./Dockerfile

Make Target: python-pkg-refresh

The python-pkg-refresh target builds a version of the target image with refreshed package versions. A temporary container instance is created from the target image and the poetry.lock file is copied into the local file system. The smoke-test docker build target is used to ensure image validation is also performed. The temporary container as well as the package refresh image are removed after the build.

@echo ">> Update python packages in container image"
docker build ${DOCKER_BUILD_ARGS} \
--target smoke-test \
--build-arg PYTHON_PKG_VERSIONS=refresh \
--tag ${TARGET_IMAGE_NAME}:$@ .
@echo ">> Copy the new poetry.lock file with updated package versions"
docker create --name ${TARGET_IMAGE_NAME}-$@ ${TARGET_IMAGE_NAME}:$@
docker cp ${TARGET_IMAGE_NAME}-$@:/src/poetry.lock .
@echo ">> Clean working container and refresh image"
docker rm ${TARGET_IMAGE_NAME}-$@
docker rmi ${TARGET_IMAGE_NAME}:$@

Make Target: build

The standard build target using pinned python package versions.

docker build ${DOCKER_BUILD_ARGS} \
--target target-image \

Make Target: smoke-test

Builds an image and peforms smoke testing. The smoke-testing image is removed after the build.

docker build ${DOCKER_BUILD_ARGS} \
--target smoke-test \
--tag ${TARGET_IMAGE_NAME}:$@ .
docker rmi ${TARGET_IMAGE_NAME}:$@


The toolchain combination of multi-stage container image builds with make provides a codified method for the lifecycle management of the containers used in data science and data engineering workloads.

The maintainer:

git checkout -b my-refresh-feature
make python-pkg-refresh
make smoke-test
git add pyproject.toml poetry.lock
git commit -m "python package versions updated"
git push

The CICD pipeline:

make build
make smoke-test
docker push <target-image>:<build-tag>

You can find the complete source code for this article at

· 3 min read
Yuncheng Yang

This article demonstrates how to use the Snowflake REST API to retrieve data for a web application using TypeScript, in this case we are using keypair authentication with Snowflake.


Snowflake’s SQL API allows you to access snowflake objects using SQL via a REST API request, this allows for easy integration with your applications and deployment pipelines. You can use the API to execute most DDL and DML statements.

There are some limitations you need to be aware of however, for example interactions with stages (using PUT and GET aren’t supported via the Snowflake API) or stored procedure operations (using CALL), you can read more on this here.


There are three endpoints provided:

  • /api/v2/statements/
  • /api/v2/statement/<statementHandle>
  • /api/v2/statements/<statementHandle/cancel

We will be looking at the first two in this article.

Authentication Methods

There are two types of Authentication methods for the API, OAuth and Key Pair. For OAuth method, you can choose to use X-Snowflake-Authorization-Token-Type header, if this header is not present, Snowflake assumes that the token in the Authorization header is an OAuth token. For Key Pair method, the JWT token will be in the Authorization header as Bearer <your token>.

Let’s walk through how to generate and use the JWT.

Generating the JWT

Here's whats needed:

Snowflake JWT

the Code

Request Body

Now we need a request body:

Submitting the Request

We will need to include the region and account identifier, for instance if your account identifier includes a region (e.g.

Response Handling

When making a SELECT query, there are three things worth noting:

  1. rowType fields in the resultSetMetaData represent the columns
  2. data without column names is in the format of string[][]
  3. partitionInfo is an array of object representing different partitions

For more information see Handling Responses from the SQL API - Snowflake Documentation.

Parsing data

Here is a Typescript code snippet demonstrating parsing return data:

Handling multiple partitions

Large result sets are paginated into partitions, each partition is a set of rows.


Note that the pages (referred to as partitions) are NOT based on row count, instead they are based on the compressed batch size, so they will not be uniform in terms of the number of rows.

To get a partition, send a GET request with Url https://<accountIdentifier><partitionId>.


· 2 min read
Jeffrey Aven

Open API specifications can get quite large, especially for providers with upwards of 500 routes or operations.

The challenge is to create standalone documents scoped by a service or path within the parent API specification and include only the components (schemas, responses, etc.) that pertain to operations included in the child document.

When I went looking for library or utility to do this, I couldn’t find one... so I have developed one myself.

It's a simple command (nodejs based but can be run in a bash terminal or from the Windows command line) which requires a few options, including:

  • the provider name (e.g. github)
  • a provider version which is a version you set - allowing you to make minor modifications to the output documents (e.g. v0.1.0)
  • a service discriminator which is a JSONPath expression to identify a service name within each route in the parent file, this is used to assign operations to services in separate documents (e.g. '$["x-github"].category')
  • an output directory (e.g. ./dev)

and of course, the openapi spec document you are splitting up.

an example is shown here:

openapi-doc-util split \
-n github \
-v v0.1.0 \
-s '$["x-github"].category' \
-o ./dev \

Help for the command is available using openapi-doc-util split.

The net result is 59 self-contained, service scoped documents, containing only the components referenced by routes in the service document.

You can access this utility via NPMJS or via GitHub.

Splitting up a large open API spec document, is the first stage in developing a StackQL provider which we will discuss next time!

· 4 min read
Jeffrey Aven

Structured Streaming in Spark provides a powerful framework for stream processing an analysis, such as streaming transformations, stateful streaming or sliding window operations.

Kafka is a common streaming source and sink for Spark Streaming and Structured Streaming operations. However, there may be situations where a data warehouse (such as Snowflake) is a more appropriate target for streaming operations, especially where there is a reporting or long-term storage requirement on the data derived from the streaming source.

This article will demonstrate just how easy this is to implement using Python.


The following diagram illustrates the ingestion design for this example:

Spark Structured Streaming using Kafka and Snowflake

Snowflake Setup

Some prerequisites for Snowflake:

  1. You will need to create a user (or use an existing user), in either case the user will need to be identified by a private key. You will need to generate a key pair as follows:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out

copy the contents of the file, remove the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- strings, then remove the line breaks to form one string, use this string as the RSA_PUBLIC_KEY in a CREATE USER or ALTER USER statement in Snowflake, like:

  1. Now setup the target database, schema and table you will use to write out your stream data (the schema for the table must match the schema for the Data Stream you will use the DataStreamWriter to emit records to Snowflake

The user you will be using (that you setup the key pair authentication for) will need to be assigned a default role to which the appropriate write permissions are granted to the target objects in Snowflake. You will also need to designate a virtual warehouse (which your user must have USAGE permissions to.

The Code

Now that we have the objects and user setup in Snowflake, we can construct our Spark application.

First, you will need to start your Spark session (either using pyspark or spark-submit) including the packages that Spark will need to connect to Kafka and to Snowflake.

The Snowflake packages include a JDBC driver and the Snowflake Connector for Spark, see Snowflake Connector for Spark.

An example is shown here (package versions may vary depending upon the version of Spark you are using):

pyspark \
--packages \

Now that we have a spark session with the necessary packages, lets go...

# import any required functions, set the checkpoint directory, and log level (optional)
from pyspark.sql.functions import split
spark.conf.set("spark.sql.streaming.checkpointLocation", "file:///tmp")

setup connection options for Snowflake by creating an sfOptions dictionary

sfOptions = {
"sfURL" : sfUrl,
"sfUser" : "avensolutions",
"pem_private_key": private_key,
"sfDatabase" : "SPARK_SNOWFLAKE_DEMO",
"sfSchema" : "PUBLIC",
"sfWarehouse" : "COMPUTE_WH",
"streaming_stage" : "mystage"

set a variable for the Snowflake Spark connector

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

read messages from Kafka:

lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafkabroker:9092") \
.option("subscribe", "weblogs") \

perform necessary transformations (the fields and data types in the resultant data structure must match the target table you created in Snowflake:

log_recs =
split(lines.value.cast("string"), " ").alias("data")

log_data = log_recs.selectExpr(
"CAST(data[0] as string) as date",
"CAST(data[1] as string) as time",
"CAST(data[2] as string) as c_ip",
"CAST(data[3] as string) as cs_username",
"CAST(data[4] as string) as s_sitename",
"CAST(data[5] as string) as s_computername",
"CAST(data[6] as string) as s_ip",
"CAST(data[7] as int) as s_port",
"CAST(data[8] as string) as cs_method",
"CAST(data[9] as string) as cs_uri_stem",
"CAST(data[10] as string) as cs_uri_query",
"CAST(data[11] as int) as sc_status",
"CAST(data[12] as int) as time_taken",
"CAST(data[13] as string) as cs_version",
"CAST(data[14] as string) as cs_host",
"CAST(data[15] as string) as User_Agent",
"CAST(data[16] as string) as Referer",

write to Snowflake!

query = log_data\
.options(**sfOptions) \
.option("dbtable", "WEB_LOGS") \
.trigger(processingTime='30 seconds') \


Note that I have included the processingTime trigger of 30 seconds (this is akin to the batchInterval in the DStream API), you should tune this to get a balance between batch sizes to ingest into Snowflake (which will benefit from larger batches) and latency.

The Results

Spark Structured Streaming into Snowflake