
· 6 min read
Chris Ottinger

The out-of-the-box dbt snapshots provide change data capture (CDC) capability for tracking changes to data in your data lake or data warehouse. The dbt snapshot metadata columns enable a view of changes to data - which records have been updated and when. However, the dbt snapshot metadata doesn't provide a view of the processing audit - which process or job was responsible for the changes. The ability to audit at the processing level requires additional operational metadata.

The out-of-the-box dbt snapshot strategies (rules for detecting changes) likely provide the desired logic for detecting and managing data change. No change to the snapshot strategies or snapshot pipeline processing is desired, but additional operational metadata fields must be set and carried through with the data.

note

The full source code for this article is available at github.com/datwiz/dbt-snapshot-metadata.

Objectives

Both operational and governance requirements can drive the need for greater fidelity of operational metadata. Example considerations could include:

  • use of the out-of-the-box dbt snapshot logic and strategies for Change Data Capture (CDC)
  • addition of operational metadata fields to snapshot tables with processing details for ops support and audit
    • when new records are inserted, add operational processing metadata information to each record
    • when existing records are closed or end-dated, update operational metadata fields with processing metadata

standard snapshot table

enhanced snapshot table

Aside from including a new process_id value in records, these enhancements don't add further information to the table. Instead, they are a materialization of operational data that is easier to access. The same information could be derived from the standard dbt metadata fields but would require a more complex SQL statement that includes a left outer self-join. As with any materialization decision, there is a trade-off between ease of access and additional storage requirements.

NULL vs High-End Date/Timestamp

In addition to the ops support and audit requirements, there can also be a legacy migration complication related to how open records (the most current version of a record) are represented in snapshots. dbt snapshots represent open records using NULL values in the dbt_valid_to field. In legacy data lakes or data warehouses, open records are often identified using a well-known high value for the effective end date/timestamp, such as 9999-12-31 or 9999-12-31 23:59:59. Adding additional snapshot metadata columns enables a legacy view of record changes without having to alter the dbt snapshot strategy or processing logic.

tip

Transitioning to NULL values for the valid_to end date/timestamp value for open records is highly recommended, especially when porting to a new database platform or cloud-based service. On-premises legacy database platforms often use TIMESTAMP values without timezones or timezone offsets, relying on a system-wide default timezone setting. Databases may also differ in the sub-second precision they support for TIMESTAMP columns. Precision and timezone treatment can cause unexpected issues when migrating to a new database platform.

For example, in BigQuery

datetime('9999-12-31 23:59:59.999999', 'Australia/Melbourne')

will generate an invalid value error, while

timestamp('9999-12-31 23:59:59.999999', 'Australia/Melbourne')

will silently convert the localised timestamp to UTC 9999-12-31 23:59:59.999999+00

The use of NULL values for open records/valid_to fields avoids this risk of subtle breakage.

Enhancing the default Snapshot

Modify the default dbt snapshot behavior by overriding the dbt snapshot materialization macros. dbt enables macros to be overridden using the following resolution (search) order:

  1. locally defined macros in the project's ./macros directory
  2. macros defined in additional dbt packages included in the project packages.yml file
  3. dbt adaptor-specific macros
  4. dbt provided default macros

To inject additional snapshot metadata fields into snapshot tables, override the following two default macros:

  • default__build_snapshot_table() creates the snapshot table on the first run
  • default__snapshot_staging_table() stages the inserts and updates to be applied to the snapshot table

To update fields on snapshot update, override the following default macro:

  • default__snapshot_merge_sql() performs the MERGE/UPSERT

Note that if the dbt database adaptor implements adaptor-specific versions of these macros, then update the adaptor-specific macro accordingly. For example, the dbt-spark adaptor overrides the dbt default__snapshot_merge_sql() as spark__snapshot_merge_sql().

build_snapshot_table()

The default__build_snapshot_table() macro is called on the first dbt snapshot invocation. This macro defines the content to include in the CREATE TABLE statement. The following example adds process IDs using the dbt invocation_id, along with additional timestamp fields, including use of the well-known high timestamp value for open records. This value is defined as the variable default_high_dttm in the dbt_project.yml file. The dbt snapshot strategy processing uses the unmodified standard dbt columns, so no modification to the change detection logic is required.
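A sketch of what this override could look like is shown below. It follows the shape of the dbt default macro; the operational column names (ops_insert_process_id, ops_valid_to_dttm, etc.) are illustrative rather than the exact names used in the linked repository.

```sql
{# Illustrative sketch of the override -- not the exact code from the linked repo #}
{% macro default__build_snapshot_table(strategy, sql) %}

    select *,
        {{ strategy.scd_id }} as dbt_scd_id,
        {{ strategy.updated_at }} as dbt_updated_at,
        {{ strategy.updated_at }} as dbt_valid_from,
        nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
        {# additional operational metadata columns #}
        '{{ invocation_id }}' as ops_insert_process_id,
        '{{ invocation_id }}' as ops_update_process_id,
        {{ current_timestamp() }} as ops_inserted_dttm,
        {{ current_timestamp() }} as ops_updated_dttm,
        cast('{{ var("default_high_dttm") }}' as timestamp) as ops_valid_to_dttm
    from (
        {{ sql }}
    ) sbq

{% endmacro %}
```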

snapshot_staging_table()

The default__snapshot_staging_table() macro is called on subsequent dbt snapshot invocations. This macro defines the content in the MERGE statement for inserts and updates. The following example adds the additional operational metadata fields to the insertions common table expression (CTE) and the updates (CTE). The dbt invocation_id is used again as the process_id for inserts on new records and updates that close existing records.
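An abridged sketch of the override is shown below. Only the insertions and updates CTEs are shown with the added columns; the remainder of the macro body (the source data CTEs, the deletes CTE, and the final select) is assumed to be carried over unchanged from the dbt default implementation. Column names match the illustrative names used above.

```sql
{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}

    {# ... CTEs preceding 'insertions' unchanged from the dbt default ... #}

    insertions as (

        select
            'insert' as dbt_change_type,
            source_data.*,
            {# operational metadata for newly inserted (open) records #}
            '{{ invocation_id }}' as ops_insert_process_id,
            '{{ invocation_id }}' as ops_update_process_id,
            {{ current_timestamp() }} as ops_inserted_dttm,
            {{ current_timestamp() }} as ops_updated_dttm,
            cast('{{ var("default_high_dttm") }}' as timestamp) as ops_valid_to_dttm

        from insertions_source_data as source_data
        left outer join snapshotted_data
               on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_unique_key is null
           or ({{ strategy.row_changed }})

    ),

    updates as (

        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.dbt_scd_id,
            {# same added column list as insertions so the CTEs union cleanly; #}
            {# only the ops_update_* values are applied by the merge below     #}
            '{{ invocation_id }}' as ops_insert_process_id,
            '{{ invocation_id }}' as ops_update_process_id,
            {{ current_timestamp() }} as ops_inserted_dttm,
            {{ current_timestamp() }} as ops_updated_dttm,
            cast('{{ var("default_high_dttm") }}' as timestamp) as ops_valid_to_dttm

        from updates_source_data as source_data
        join snapshotted_data
          on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where ({{ strategy.row_changed }})

    )

    {# ... deletes CTE and final select unchanged from the dbt default ... #}

{%- endmacro %}
```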

Note that the deletes CTE has not been updated with the additional fields. In scenarios that use the hard deletes feature, the deletes CTE would need to be modified as well.

snapshot_merge_sql()

The default__snapshot_merge_sql() macro is called to perform the MERGE/UPSERT into the target snapshot table. This macro defines how fields in the records being closed should be updated. The update set section of the MERGE statement defines the updated columns and values.
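A sketch of the override, again following the shape of the dbt default merge macro, with the update set extended to carry the closing process's metadata (illustrative column names as above):

```sql
{% macro default__snapshot_merge_sql(target, source, insert_cols) -%}
    {%- set insert_cols_csv = insert_cols | join(', ') -%}

    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE
    on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id

    when matched
     and DBT_INTERNAL_DEST.dbt_valid_to is null
     and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
        then update set
            dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to,
            {# record which process closed the record, and when #}
            ops_update_process_id = DBT_INTERNAL_SOURCE.ops_update_process_id,
            ops_updated_dttm = DBT_INTERNAL_SOURCE.ops_updated_dttm,
            ops_valid_to_dttm = DBT_INTERNAL_SOURCE.dbt_valid_to

    when not matched
     and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
        then insert ({{ insert_cols_csv }})
        values ({{ insert_cols_csv }})

{% endmacro %}
```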

Conclusion

Overriding the default dbt snapshot macros enables the injection and updating of additional operational metadata in snapshot tables. Fields can be added such that the provided dbt logic and snapshot strategy processing are still applied, while the resulting snapshot tables contain the columns required for the data lake or data warehouse.

The sample dbt project in datwiz/dbt-snapshot-metadata/tree/main/dbt_snapshot_ops_metadata contains an implementation of the snapshot customization.

· 5 min read
Jeffrey Aven

Yoast is a well-known SEO plugin for WordPress which automagically generates structured data for every page (amongst other things). This helps render rich results for search as well as improve general on-site SEO.

We use Docusaurus, a React-based Static Site Generator from Facebook, for all of our docs and blog properties. Docusaurus does have some native structured data capabilities through Microdata. We were after:

  • Structured data implemented using JSON-LD - which is the recommended approach by Google; and
  • Support multi-level structured data (like Yoast does), including Organization, WebSite, WebPage, Article, and Breadcrumb level data

The solution was to create a Docusaurus plugin, docusaurus-plugin-structured-data.

info

Google allows you to combine structured data in Microdata format with data in JSON-LD format. You can see the union of the two markup approaches using the Rich Results Test.

How it works

Organization and Website level structured data are defined in the plugin configuration (see Configuration). WebPage, Article and Breadcrumb level data are derived for each page from metadata sourced from the postBuild lifecycle API and then injected into the <head> of each page using JSON-LD format as follows:

<head>
...
<script type="application/ld+json">
{"@context":"https://schema.org","@graph":[...]}
</script>
...
</head>

Docusaurus allows you to create hierarchical document structures using categories and folders defined at build time; although this is useful for organization and context, to search engines it can appear too complex (with leaf-level documents seemingly multiple clicks from the home page). In actuality, this is not the case, as the sidebar in Docusaurus makes any page one click away from the home page.

As a solution (to keep the hierarchy), the plugin takes each level in the route, maps it to a friendly term or token (using the breadCrumbLabelMap configuration property), and creates a concatenated string, so for a route such as:

/docs/language-spec/functions/aggregate/group_concat

The resultant Breadcrumb structured data looks like this...

{
  "@type": "BreadcrumbList",
  "@id": "https://stackql.io/docs/language-spec/functions/aggregate/group_concat/#breadcrumb",
  "itemListElement": [
    {
      "@type": "ListItem",
      "position": 1,
      "item": "https://stackql.io",
      "name": "Home"
    },
    {
      "@type": "ListItem",
      "position": 2,
      "item": "https://stackql.io/docs",
      "name": "Documentation"
    },
    {
      "@type": "ListItem",
      "position": 3,
      "name": "Language Specification - Functions - Aggregate - GROUP_CONCAT"
    }
  ]
},
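The labels themselves come from the breadcrumb label map in the plugin configuration (see Configuration below); the entries that would yield the names above might look something like this (illustrative values only):

```js
// illustrative only -- maps route segments to friendly breadcrumb labels
breadcrumbLabelMap: {
  'docs': 'Documentation',
  'language-spec': 'Language Specification',
  'functions': 'Functions',
  'aggregate': 'Aggregate',
  'group_concat': 'GROUP_CONCAT',
}
```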

Blog Posts

The docusaurus-plugin-structured-data plugin detects blog posts and injects Article structured data accordingly, including the following properties:

  • author
  • headline
  • datePublished
  • dateModified
  • wordCount
  • keywords
  • and more...

An example is shown here:

{
  "@type": "Article",
  "@id": "https://stackql.io/blog/sumologic-provider-for-stackql-now-available/#article",
  "isPartOf": {
    "@type": "WebPage",
    "@id": "https://stackql.io/blog/sumologic-provider-for-stackql-now-available/#webpage"
  },
  "author": {
    "name": "Jeffrey Aven",
    "@id": "https://stackql.io/#/schema/person/1"
  },
  "headline": "Sumologic Provider for StackQL Now Available",
  "datePublished": "2023-01-03T00:00:00.000Z",
  "dateModified": "2023-01-03T00:00:00.000Z",
  "mainEntityOfPage": {
    "@id": "https://stackql.io/blog/sumologic-provider-for-stackql-now-available/#webpage"
  },
  "wordCount": 201,
  "publisher": {
    "@id": "https://stackql.io/#organization"
  },
  "image": {
    "@id": "https://stackql.io/blog/sumologic-provider-for-stackql-now-available/#primaryimage"
  },
  "thumbnailUrl": "https://stackql.io/img/blog/stackql-sumologic-provider-featured-image.png",
  "keywords": ["stackql", "sumologic", "multicloud", "monitoring", "logging"],
  "articleSection": ["Blog"],
  "inLanguage": "en-US"
}

Installation

The docusaurus-plugin-structured-data is available via NPMJS at @stackql/docusaurus-plugin-structured-data.

To install via NPM use:

npm i @stackql/docusaurus-plugin-structured-data

To install using Yarn use:

yarn add @stackql/docusaurus-plugin-structured-data

Configuration

Add the docusaurus-plugin-structured-data plugin to the plugins section in docusaurus.config.js:

{
  plugins: [
    '@stackql/docusaurus-plugin-structured-data',
    ...
  ]
}

Update themeConfig in the docusaurus.config.js file; the following shows the mandatory properties:

{
  ...,
  themeConfig: {
    structuredData: {
      excludedRoutes: [], // array of routes to exclude from structured data generation, include custom redirects here
      verbose: boolean, // print verbose output to console (default: false)
      featuredImageDimensions: {
        width: 1200,
        height: 630,
      },
      authors: {
        author_name: {
          authorId: string, // unique id for the author - used as an identifier in structured data
          url: string, // MUST be the same as the `url` property in the `authors.yml` file in the `blog` directory
          imageUrl: string, // gravatar url
          sameAs: [] // synonymous entity links, e.g. github, linkedin, twitter, etc.
        },
      },
      organization: {}, // Organization properties can be added to this object
      website: {}, // WebSite properties can be added to this object
      webpage: {
        datePublished: string, // default is the current date
        inLanguage: string, // default: en-US
      },
      breadcrumbLabelMap: {} // used to map the breadcrumb labels to a custom value
    }
  },
  ...
}

Resultant Structured Data Example

Below is an example of the data created and injected into the <head> of each page in the generated site (this is formatted for readability - the actual structured data generated is minified for performance).

Docusaurus Structured Data Example
<script type="application/ld+json">
{
  "@context": "https://schema.org",
  "@graph": [
    {
      "@type": "WebPage",
      "isPartOf": {
        "@id": "https://stackql.io/#website"
      },
      "inLanguage": "en-US",
      "datePublished": "2021-07-01",
      "@id": "https://stackql.io/docs/language-spec/functions/json/json_extract/#webpage",
      "url": "https://stackql.io/docs/language-spec/functions/json/json_extract",
      "name": "JSON_EXTRACT",
      "description": "Query and Deploy Cloud Infrastructure and Resources using SQL",
      "dateModified": "2023-01-23T23:56:08.545Z",
      "breadcrumb": {
        "@id": "https://stackql.io/docs/language-spec/functions/json/json_extract/#breadcrumb"
      },
      "potentialAction": [
        {
          "@type": "ReadAction",
          "target": [
            "https://stackql.io/docs/language-spec/functions/json/json_extract"
          ]
        }
      ]
    },
    {
      "@type": "BreadcrumbList",
      "@id": "https://stackql.io/docs/language-spec/functions/json/json_extract/#breadcrumb",
      "itemListElement": [
        {
          "@type": "ListItem",
          "position": 1,
          "item": "https://stackql.io",
          "name": "Home"
        },
        {
          "@type": "ListItem",
          "position": 2,
          "item": "https://stackql.io/docs",
          "name": "Documentation"
        },
        {
          "@type": "ListItem",
          "position": 3,
          "name": "Language Specification - Functions - JSON - JSON_EXTRACT"
        }
      ]
    },
    {
      "@type": "WebSite",
      "@id": "https://stackql.io/#website",
      "name": "StackQL",
      "url": "https://stackql.io",
      "description": "Provision and Query Cloud and SaaS Resources using SQL",
      "publisher": {
        "@id": "https://stackql.io/#organization"
      },
      "potentialAction": [
        {
          "@type": "SearchAction",
          "target": {
            "@type": "EntryPoint",
            "urlTemplate": "https://stackql.io/search?q={searchTerms}"
          },
          "query-input": "required name=searchTerms"
        }
      ],
      "inLanguage": "en-US"
    },
    {
      "@type": "Organization",
      "@id": "https://stackql.io/#organization",
      "name": "StackQL",
      "url": "https://stackql.io",
      "sameAs": [
        "https://twitter.com/stackql",
        "https://www.linkedin.com/company/stackql",
        "https://github.com/stackql",
        "https://www.youtube.com/@stackql",
        "https://hub.docker.com/u/stackql"
      ],
      "contactPoint": {
        "@type": "ContactPoint",
        "email": "info@stackql.io"
      },
      "logo": {
        "@type": "ImageObject",
        "inLanguage": "en-US",
        "@id": "https://stackql.io/#logo",
        "url": "https://stackql.io/img/stackql-cover.png",
        "contentUrl": "https://stackql.io/img/stackql-cover.png",
        "width": 1440,
        "height": 900,
        "caption": "StackQL - your cloud using SQL"
      },
      "image": {
        "@id": "https://stackql.io/#logo"
      },
      "address": {
        "@type": "PostalAddress",
        "addressCountry": "AU",
        "postalCode": "3001",
        "streetAddress": "Level 24, 570 Bourke Street, Melbourne, Victoria"
      },
      "duns": "750469226",
      "taxID": "ABN 65 656 147 054"
    }
  ]
}
</script>

Testing

Once you have built and deployed your site (using yarn build), you can use the Schema Validator Tool or the Google Rich Results Test to inspect URLs from your site.

Pull requests or issues are welcome. Please feel free to contribute. Thanks!

· 2 min read
Jeffrey Aven

With our StackQL Provider Registry, we had an interesting challenge:

  1. Maintain different versions for one or more different documents in the same repo (which were decoupled from releases)
  2. Provide dynamic versioning (with no user input required and not dictated by tags)
  3. Maintain some traceability to the source repo (pull requests, commit SHAs, etc.)

SemVer required users to make arbitrary decisions on major, minor, and build numbers.

Although CalVer required less user discretion for the major and minor components, the micro-component was still an arbitrary number. This was not ideal for our use case.

As our document versioning was not related to tags, and we have implemented GitFlow (specifically based upon PRs to dev or main) as our release path, we created a new variant scheme... GitVer.

This is completely different from GitVersion, which is a tool to determine the version of a project based on Git history.

This scheme is implemented using GitHub as the remote but could easily be adapted to GitLab, Bitbucket, etc.

How it works

Each pull request is assigned a version based on the date the PR was raised or merged, and the PR number. This version (the GitVer) can then be used to version artifacts (which could be pushed to releases if desired).

Workflow Example Code

This is an example using GitHub Actions. The version is determined automatically within the workflow.

main.yml example:
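The actual workflow is in the repo; a cut-down sketch of the idea (script paths, job names, and output names are placeholders, not the project's actual values) looks like this:

```yaml
# Minimal sketch of a workflow wiring the two scripts together
name: build
on:
  pull_request:
    branches: [dev, main]
  push:
    branches: [dev, main]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Get PR info
        id: setup-job
        run: node ./ci-scripts/setup-job.js

      - name: Get GitVer
        id: get-version
        run: node ./ci-scripts/get-version.js
        env:
          PR_NUMBER: ${{ steps.setup-job.outputs.pr-number }}

      - name: Show the version
        run: echo "GitVer is ${{ steps.get-version.outputs.version }}"
```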

The code used to get the relevant PR info is here (setup-job.js). The tricky bit is that the PR number presents differently for a pull request open or sync (pushing changes to an open PR) than for a merge commit (which is simply a push to a protected branch). See the code below:
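A minimal sketch of the approach (not the actual setup-job.js), using only the environment variables and event payload that GitHub Actions provides:

```javascript
// Sketch only: derive the PR number for both pull_request events and
// merge commits pushed to a protected branch.
const fs = require('fs');

const eventName = process.env.GITHUB_EVENT_NAME;
const event = JSON.parse(fs.readFileSync(process.env.GITHUB_EVENT_PATH, 'utf8'));

let prNumber = null;
if (eventName === 'pull_request') {
  // PR opened or synchronized: the number is in the event payload.
  prNumber = event.pull_request.number;
} else if (eventName === 'push') {
  // Merge commit to a protected branch: recover the number from the default
  // merge commit message, e.g. "Merge pull request #123 from ...".
  const match = /#(\d+)/.exec(event.head_commit.message);
  prNumber = match ? Number(match[1]) : null;
}

// Expose the value to later steps as a step output.
fs.appendFileSync(process.env.GITHUB_OUTPUT, `pr-number=${prNumber}\n`);
// Other metadata (commit SHA, source/target branch, PR action, etc.) could be
// exported here in the same way.
```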

tip

You can export some other metadata while you are here, like the commit SHA, source and target branches, (PR) action, etc.

The code to generate the GitVer for the PR is here (get-version.js):
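Again as a sketch rather than the actual get-version.js; the exact version string format used here (YYYY.MM.DD.<PR number>) is an assumption for illustration:

```javascript
// Sketch only: build a GitVer string from the current UTC date and the
// PR number, e.g. 2023.01.03.123 for PR #123.
const fs = require('fs');

const prNumber = process.env.PR_NUMBER; // exported by the setup step
const now = new Date();

const version = [
  now.getUTCFullYear(),
  String(now.getUTCMonth() + 1).padStart(2, '0'),
  String(now.getUTCDate()).padStart(2, '0'),
  prNumber,
].join('.');

fs.appendFileSync(process.env.GITHUB_OUTPUT, `version=${version}\n`);
```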

You can see it at work in stackql/stackql-provider-registry, which builds and deploys providers for StackQL.

Thoughts?

· One min read
Jeffrey Aven

In the stackql project we needed an API to serve configuration file packages (stackql providers) to the stackql application at runtime.

Traditional artifact repositories or package managers were unsuitable as they were mainly designed for container images, JavaScript modules, Python packages etc. The artifacts, in this case, are signed tarball sets of OpenAPI specification documents (text files).

We have recently moved our provider registry (stackql-provider-registry) to use Deno Deploy as the serving layer (the API).

The code

The code is reasonably straightforward as shown here:
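The actual code lives in the registry repo; as a rough sketch of the pattern (file paths, project layout, and the std version pin are assumptions), serving the registry assets from Deno Deploy can be as simple as:

```typescript
// Rough sketch only -- not the actual registry code.
// Serves the signed provider tarballs / OpenAPI documents as static files.
import { serveDir } from "https://deno.land/std@0.224.0/http/file_server.ts";

Deno.serve((req: Request) => {
  const url = new URL(req.url);

  // Simple health check endpoint.
  if (url.pathname === "/healthz") {
    return new Response("ok");
  }

  // Everything else is served from the checked-in assets,
  // e.g. /providers/aws/v1/provider.tar.gz
  return serveDir(req, { fsRoot: ".", quiet: true });
});
```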

The deployment

We are using GitHub Actions to push assets and code to Deno Deploy; this was straightforward as well, as you can see here:
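A rough sketch of such a workflow using the deployctl GitHub Action (the project name and entrypoint are placeholders, not the actual values):

```yaml
name: deploy
on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    permissions:
      id-token: write # required for Deno Deploy OIDC auth
      contents: read
    steps:
      - uses: actions/checkout@v3

      - name: Deploy to Deno Deploy
        uses: denoland/deployctl@v1
        with:
          project: my-registry # placeholder project name
          entrypoint: main.ts  # placeholder entrypoint
```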

· 5 min read
Jeffrey Aven

Apache Beam is an open-source project which provides a unified programming model for Batch and Streaming data pipelines.

B(atch) + str(EAM) => BEAM

Beam SDK and Execution Framework

Beam SDKs allow you to define Pipelines (in languages such as Java or Python). A pipeline is essentially a graph (a DAG - Directed Acyclic Graph) of nodes that represent transformation steps.

Apache Beam Pipeline

Pipelines can then be executed on a backend service (such as your local machine, Apache Spark, or Google Cloud Dataflow) using Runners. For instance, to run a pipeline locally you would use the DirectRunner; to run a pipeline on Google Cloud Dataflow you would use the DataflowRunner.
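As a minimal sketch, a trivial Python pipeline run locally with the DirectRunner could look like this; switching to Dataflow is largely a matter of changing the runner and supplying project, region, and staging options:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Run locally with the DirectRunner; for Dataflow you would set
# runner="DataflowRunner" and supply project, region and temp_location options.
options = PipelineOptions(runner="DirectRunner")

with beam.Pipeline(options=options) as p:
    (
        p
        | beam.Create(["hello", "beam"])
        | beam.Map(str.upper)
        | beam.Map(print)
    )
```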

Beam Programming Model

The PCollection is the most atomic data unit in the Beam programming model, akin to the RDD in the Apache Spark core API; it is a representation of an immutable collection of items that is physically broken down into bundles (subsets of elements for parallelization).

PCollections can be bounded (which is a batch processing pattern) or unbounded (which is a stream processing pattern).

A PTransform is an operator that takes a PCollection as an input and outputs a new PCollection with transforms applied. This is the same coarse-grained transformation pattern employed by Spark.

Beam DSL

The Beam DSL is a set of higher-order functions that can be used to construct pipelines. These functions build the graph of nodes that represents the pipeline.

Map, FlatMap and Filter

The basic Map, FlatMap and Filter functions in the Beam API work similarly to their namesakes in the Spark Core API. The Map and FlatMap functions are higher-order functions (that is, functions that have arguments of other functions) that operate on each element in a collection, emitting an output element for each input element. The Filter function can be used to only emit elements from an input PCollection that satisfy a given expression.
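A short illustrative example (Python SDK) combining the three:

```python
import apache_beam as beam

# Map, FlatMap and Filter applied to a small bounded PCollection.
with beam.Pipeline() as p:
    (
        p
        | beam.Create(["the quick brown fox", "jumps over the lazy dog"])
        | beam.FlatMap(lambda line: line.split())   # one output element per word
        | beam.Map(lambda word: (word, len(word)))  # 1:1 transformation
        | beam.Filter(lambda kv: kv[1] > 3)         # keep words longer than 3 characters
        | beam.Map(print)
    )
```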

ParDo and DoFn

ParDo is a wrapper function for parallel execution of a user-defined function called a DoFn ("do function"). ParDos and DoFns are used when the basic Map and FlatMap operators are not enough. DoFns are executed in parallel on a PCollection and can be used for computational transformations or transformations other than 1:1 between inputs and outputs.

Think of these as user-defined functions to operate on a PCollection in parallel.
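For example (Python SDK), a simple DoFn that emits a variable number of outputs per input element:

```python
import apache_beam as beam

class SplitWordsFn(beam.DoFn):
    """A user-defined DoFn: may emit zero, one, or many outputs per input."""
    def process(self, element):
        for word in element.split():
            yield word.lower()

with beam.Pipeline() as p:
    (
        p
        | beam.Create(["Apache Beam", "Batch and Stream"])
        | beam.ParDo(SplitWordsFn())
        | beam.Map(print)
    )
```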

GroupByKey, CombineByKey

GroupByKey and CombineByKey are operators that group data (key-value pairs) by the key for each element. This is typically a precursor to some aggregate operation (such as a count or sum operation).
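For example (Python SDK, where the combining transform is named CombinePerKey), a per-key sum over key-value pairs:

```python
import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.Create([("cat", 1), ("dog", 1), ("cat", 1)])
        | beam.CombinePerKey(sum)  # -> ("cat", 2), ("dog", 1)
        | beam.Map(print)
    )
```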

CoGroupByKey and Flatten

CoGroupByKey is akin to a JOIN operation in SQL (by the key for each element in two PCollections). Flatten is akin to a UNION in SQL.

Side Inputs

Side Inputs can be used with ParDo and DoFn to provide additional data, which can be used to enrich your output PCollection, or utilized within the logic of your DoFn.

Sources, Sinks and Connectors

Sources represent where data is read into an Apache Beam pipeline; sinks represent destinations where data is written out from pipelines. A Beam pipeline will contain one or more sources and sinks.

Sources can be bounded (for batch processing) or unbounded for stream processing.

Connectors can be source connectors or sink connectors to read from or write to the various sources and targets used in a Beam pipeline. Examples include FileIO and TextIO for working with files or text data, BigQueryIO for reading or writing into BigQuery, PubSubIO for reading and writing messages into Google PubSub, and much more.

Streaming and Unbounded PCollections

Streaming data sources are represented by Unbounded PCollections. Unbounded PCollections support windowing operations using Fixed Windows, Sliding Windows, or Session Windows. Watermarks are used to allow for late-arriving data to be processed within its associated time window, and Triggers can be used to control the processing of windowed batches of data.
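A bounded toy example (Python SDK) illustrating fixed windows; the integer timestamps here simply stand in for event times:

```python
import apache_beam as beam

# Assign event timestamps, window into 60-second fixed windows, then count.
with beam.Pipeline() as p:
    (
        p
        | beam.Create([("user1", 10), ("user2", 125), ("user1", 130)])
        | "AddTimestamps" >> beam.Map(
            lambda kv: beam.window.TimestampedValue(kv[0], kv[1]))
        | "FixedWindows" >> beam.WindowInto(beam.window.FixedWindows(60))
        | "CountPerUser" >> beam.combiners.Count.PerElement()
        | beam.Map(print)
    )
```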

Templates

Beam templates enable the reusability of pipelines, converting compile-time pipeline parameters to run-time arguments. Jobs (invocations of pipelines) can be launched from templates.

Templates include classic templates, where the graph for the pipeline is built (at compile time) with the template, and flex templates, where the pipeline graph is created when the template is launched (at runtime).

In addition, Google provides several templates with Cloud Dataflow (Google-provided templates), allowing you to launch routine jobs without writing any code.

Google-provided templates are available for batch, streaming, and utility pipelines, for example:

  • Kafka to BigQuery
  • Pub/Sub Topic to BigQuery
  • Text Files on Cloud Storage to BigQuery
  • Text Files on Cloud Storage to Cloud Spanner
  • Bulk Compress or Decompress Files on Cloud Storage
  • and more

5 minutes is up! I hope you enjoyed this quick introduction to Apache Beam. If you want to learn more, check out the Apache Beam documentation.