
5 posts tagged with "bigquery"


· 6 min read
Jeffrey Aven

DBT (or Data Build Tool) is a modern data transformation tool born in the cloud/DevOps era. It's a great project which much has been written about; I will try to give as brief an overview as possible.

ETL vs ELT Refresher

A quick refresher on ELT vs ETL before we discuss DBT. I have created an infographic for this...

ETL vs ELT

Summary

DBT is an open-source command line tool written in Python from DBT Labs (formerly Fishtown Analytics).

DBT is designed to manage data transformations while applying software engineering best practices (including version control, automated testing, reviews, approvals, etc). Its modern software engineering and cloud-first design goals separate it from its old-school ETL/ELT predecessors.

DBT is an ELT tool focusing on the T(ransform) only, the E(xtract) and L(oad) are up to you (there are plenty of tools that specialize in this).

At its core DBT is a templating engine using Jinja (Python templating engine); it generates templates that represent SQL commands to create, replace or update objects in your database (the “T” in ELT), then oversees the execution of the templated commands. The work is "pushed down" to the underlying database containing the source and target objects and data.

Models

The concept most integral to DBT is the Model. A Model is simply a representation of a transform (or set of transforms) to a dataset, resulting in a target object (which could be a table or tables in a datamart). A model is expressed as a SELECT statement stored in a .sql file in your dbt project (we'll get to that in a minute).

Suppose we want to create a denormalized fact table for commits to store in a datamart in BigQuery. This is what a model file might look like (using the BigQuery SQL dialect and referencing objects that should exist and be accessible at runtime when we execute the model).

{{ config(materialized='table') }}

with commits as (
    SELECT
        SUBSTR(commit, 0, 13) as commit_short_sha,
        committer.name as committer_name,
        committer.date as commit_date,
        message,
        repo_name
    FROM `bigquery-public-data.github_repos.sample_commits` c
)

select * from commits

Models are created as views by default, but you can materialize these as tables where needed.
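As a rough mental model only (this is not dbt's actual implementation), the materialization setting decides which DDL statement the model's SELECT gets wrapped in. The sketch below uses a hypothetical helper, wrap_model, and a hypothetical relation name; the statements dbt really emits vary by adapter.

```python
def wrap_model(relation: str, select_sql: str, materialized: str = "view") -> str:
    """Wrap a model's SELECT in DDL, loosely mimicking a view or table materialization."""
    kinds = {"view": "VIEW", "table": "TABLE"}
    if materialized not in kinds:
        raise ValueError(f"unsupported materialization: {materialized}")
    return f"CREATE OR REPLACE {kinds[materialized]} {relation} AS (\n{select_sql.strip()}\n);"


model_sql = "SELECT repo_name, message FROM `bigquery-public-data.github_repos.sample_commits`"

# Default behaviour: the model becomes a view.
print(wrap_model("my_dataset.commits", model_sql))
# Equivalent in spirit to config(materialized='table').
print(wrap_model("my_dataset.commits", model_sql, "table"))
```

The point is that the model file only ever contains the SELECT; the surrounding CREATE statement is generated for you.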

You configure connectivity to the target database using adapters (software libraries provided by dbt in the case of most mainstream databases) and profiles (which contain details around authentication, databases/datasets, schemas etc).

DBT Project

A DBT project is simply a folder containing your models and some other configuration data. You can initialize this by running dbt init from your desired project directory. In its most basic form, the structure looks like this:

models/
├─ your_model.sql
├─ schema.yml
dbt_project.yml

Your models can be created under subfolders for organization. schema.yml is an optional file that contains tests for columns and can also include descriptions for documentation. The dbt_project.yml file is the main entry point for the dbt program; it contains the configuration for the project, including which profile to use. Profiles (stored in a file called profiles.yml) hold all of the necessary connectivity information for your target database platform. By default, dbt init creates this file in a .dbt folder under your home directory.

info

You could store this with your project (be careful not to commit secrets like database credentials to source control). If you store it in any other directory than the default, you will need to tell dbt where it can find this file using the --profiles-dir argument of any dbt command, see here for more information.

To confirm your project is ship shape, run dbt parse; if there are no errors, you are good to proceed running and testing your models.

Running DBT Models

To run your models, simply run dbt run from the directory containing your dbt_project.yml file (typically the root of your project folder).

Model deployed! Let's test it by running dbt test.

This will run all of the tests associated with your model(s) - in this case, not null and unique tests defined in the schema.yml file. That's it, deployed and tested.

Other Stuff

There is some other stuff in DBT you should be aware of, like seeds, snapshots, analyses, macros, and more but our five minutes is up 😃. We can discuss these next time; you are up and running with the basics of DBT now, get transforming!

· 7 min read
Jeffrey Aven

Loading Parquet format files into BigQuery is straightforward: you just need to specify the file location (local, Google Cloud Storage, Drive, Amazon S3 or Azure Blob storage) and that's pretty much it. BigQuery works the rest out from there.

bq load \
--location=australia-southeast2 \
--project_id=parquet-demo \
--source_format=PARQUET \
parquet_test.dim_calendar \
.\Calendar.gzip

In Snowflake, however, it is not as simple. I'll share my approach to automating this here.

info

Parquet is a self-describing, column-oriented storage format commonly used in distributed systems for input and output. Data in Parquet files is serialised for optimised consumption from Parquet client libraries and packages such as pandas, pyarrow, fastparquet, dask, and pyspark.

Background

When Snowflake reads a Parquet file, each row of the dataset is presented as a single column holding the whole record. If you were to ingest this into Snowflake without knowing the schema you could do something like this...

CREATE OR REPLACE TABLE PARQUET_TEST.PUBLIC.DIM_CALENDAR (
Data variant
);

COPY INTO PARQUET_TEST.PUBLIC.DIM_CALENDAR
(
Data
) FROM (
SELECT
*
FROM
@PARQUET_TEST.PUBLIC.DIM_CALENDAR_STAGE)
file_format = (TYPE = parquet);

You would end up with something like...

Row    Data
1      {"CalMonthOfYearNo": 6, "CalYear": 2020, ... }
2      {"CalMonthOfYearNo": 6, "CalYear": 2020, ... }
...    ...

You could then have a second stage of processing to convert this into a normal relational structure.

Or you could do this in one step, with a little prep work ahead of time. In my scenario I was given several parquet files from a client for a one-off load into Snowflake: several files for a fact table and multiple single files representing different dimension tables.

Streamlined Ingestion for Parquet Files into Snowflake

To collapse the formatting and uploading of Parquet files into a materialized table into one step, we need to do a couple of things:

  1. Create the target table with the correct schema (column names and data types); and
  2. Perform a projection in our COPY command from the single column containing all of the data (represented by $1 in Snowflake) into the columns defined in step 1.

Since this is technically a transformation and only named stages are supported for COPY transformations, we need to create a stage for the copy. In my case there is a pre-existing Storage Integration in place that can be used by the stage.

Generate Table DDL

To automate the generation of the DDL to create the table and stage and the COPY command, I used Python and Spark (which has first class support for Parquet files). Parquet datatypes are largely the same as Snowflake, but if we needed to, we could create a map and modify the target types during the DDL generation.
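If a type map were needed, a minimal sketch might look like the following. The mapping and the to_snowflake_type helper are assumptions for illustration, not part of the original script, and the Amount column in the usage example is hypothetical; unknown types are passed through unchanged, which matches the behaviour of the script as written.

```python
import re

# Hypothetical mapping from Spark SQL type names to Snowflake column types.
SPARK_TO_SNOWFLAKE = {
    "bigint": "BIGINT",
    "int": "INTEGER",
    "string": "VARCHAR",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
    "timestamp": "TIMESTAMP_NTZ",
    "date": "DATE",
}


def to_snowflake_type(spark_type: str) -> str:
    """Translate a Spark dtype string; unknown types pass through unchanged."""
    decimal = re.fullmatch(r"decimal\((\d+),\s*(\d+)\)", spark_type)
    if decimal:
        # Spark reports decimals as e.g. "decimal(18,2)"
        return "NUMBER(%s,%s)" % decimal.groups()
    return SPARK_TO_SNOWFLAKE.get(spark_type, spark_type)


data_types = [("Time_ID", "bigint"), ("CalYear", "bigint"), ("Amount", "decimal(18,2)")]
print([(name, to_snowflake_type(dtype)) for name, dtype in data_types])
```

The helper could then be applied to column[1] wherever the DDL and COPY statements are assembled.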

First, copy specimen Parquet formatted files to a local directory. The script we are creating can then iterate through the parquet files and generate all of the commands we will need, saved to a .sql file.

With some setup information provided (not shown for brevity), we will first go through each file in the directory, capture metadata along with the schema (column name and data type) as shown here:

for file in files:
    tableMap = {}
    table = file.stem
    spark = launch_spark_session()
    parquetFile = spark.read.parquet("%s/%s" % (BASE_DIR, file))
    data_types = parquetFile.dtypes
    stop_spark_session(spark)
    tableMap['name'] = table
    tableMap['file'] = file
    tableMap['data_types'] = data_types
    allTables.append(tableMap)

The allTables list looks something like this...

[{'name': 'Calendar', 'file': PosixPath('data/dim/Calendar.gzip'), 'data_types': [('Time_ID', 'bigint'), ('CalYear', 'bigint'), ('CalMonthOfYearNo', 'bigint'), ('FinYear', 'bigint'), ('FinWeekOfYearNo', 'bigint')]}, ... ]

Next we generate the CREATE TABLE statement using the allTables list:

# create output file for all sql
with open('all_tables.sql', 'w') as f:
    for table in allTables:
        print("processing %s..." % table['name'])
        f.write("/*** Create %s Table***/" % table['name'].upper())
        sql = """
CREATE OR REPLACE TABLE %s.%s.%s (
""" % (database, schema, table['name'].upper())
        for column in table['data_types']:
            sql += " %s %s,\n" % (column[0], column[1])
        sql = sql[:-2] + "\n);"
        f.write(sql)
        f.write("\n\n")

Generate Named Stage DDL

Then we generate the stage in S3 from which the files will be loaded:

        f.write("/*** Create %s Stage***/" % table['name'].upper())
        sql = """
CREATE OR REPLACE STAGE %s.%s.%s_STAGE
url='%s/%s'
storage_integration = %s
encryption=(type='AWS_SSE_KMS' kms_key_id = '%s');
""" % (database, schema, table['name'].upper(), s3_prefix, table['file'], storage_int, kms_key_id)
        f.write(sql)
        f.write("\n\n")

Generate COPY commands

Then we generate the COPY commands...

        f.write("/*** Copying Data into %s ***/" % table['name'].upper())
        sql = """
COPY INTO %s.%s.%s
(\n""" % (database, schema, table['name'].upper())
        for column in table['data_types']:
            sql += " %s,\n" % column[0]
        sql = sql[:-2] + "\n)"
        sql += " FROM (\nSELECT\n"
        for column in table['data_types']:
            sql += " $1:%s::%s,\n" % (column[0], column[1])
        sql = sql[:-2] + "\nFROM\n"
        sql += "@%s.%s.%s_STAGE)\n" % (database, schema, table['name'].upper())
        sql += " file_format = (TYPE = parquet);"
        f.write(sql)
        f.write("\n\n")

Since this is a one-off load, we will go ahead and drop the stage we created as it is no longer needed (this step is optional):

        f.write("/*** Dropping stage for %s ***/" % table['name'].upper())
        sql = """
DROP STAGE %s.%s.%s_STAGE;
""" % (database, schema, table['name'].upper())
        f.write(sql)
        f.write("\n\n")

The resultant file looks like this...

/*** Create CALENDAR Table***/
CREATE OR REPLACE TABLE PARQUET_TEST.PUBLIC.DIM_CALENDAR (
Time_ID bigint,
CalYear bigint,
CalMonthOfYearNo bigint,
FinYear bigint,
FinWeekOfYearNo bigint
);

/*** Create DIM_CALENDAR Stage***/
CREATE OR REPLACE STAGE PARQUET_TEST.PUBLIC.DIM_CALENDAR_STAGE
url='s3://my-bucket/data/dim/Calendar.gzip'
storage_integration = my_storage_int
encryption=(type='AWS_SSE_KMS' kms_key_id = '4f715ec9-ee8e-44ab-b35d-8daf36c05f19');

/*** Copying Data into DIM_CALENDAR ***/
COPY INTO PARQUET_TEST.PUBLIC.DIM_CALENDAR
(
Time_ID,
CalYear,
CalMonthOfYearNo,
FinYear,
FinWeekOfYearNo
) FROM (
SELECT
$1:Time_ID::bigint,
$1:CalYear::bigint,
$1:CalMonthOfYearNo::bigint,
$1:FinYear::bigint,
$1:FinWeekOfYearNo::bigint
FROM
@PARQUET_TEST.PUBLIC.DIM_CALENDAR_STAGE)
file_format = (TYPE = parquet);

/*** Dropping stage for DIM_CALENDAR ***/
DROP STAGE PARQUET_TEST.PUBLIC.DIM_CALENDAR_STAGE;

Load your data

You can then run this along with all of the other dimension and fact table DDL and COPY commands generated to perform the one-off load from parquet files. You can find the complete code below, enjoy!

Complete Code
from pathlib import Path
from pyspark.sql import SparkSession


def launch_spark_session():
    return SparkSession \
        .builder \
        .appName("Parquet DDL Generation") \
        .getOrCreate()


def stop_spark_session(spark):
    spark.stop()


allTables = []
database = "PARQUET_TEST"
schema = "PUBLIC"
s3_prefix = 's3://my-bucket'
storage_int = 'my_storage_int'
kms_key_id = '4f715ec9-ee8e-44ab-b35d-8daf36c05f19'

BASE_DIR = Path(__file__).resolve().parent
directory = 'data/dim'
files = Path(directory).glob('*.gzip')

# capture the schema (column names and data types) of each parquet file
for file in files:
    tableMap = {}
    table = file.stem
    spark = launch_spark_session()
    parquetFile = spark.read.parquet("%s/%s" % (BASE_DIR, file))
    data_types = parquetFile.dtypes
    stop_spark_session(spark)
    tableMap['name'] = table
    tableMap['file'] = file
    tableMap['data_types'] = data_types
    allTables.append(tableMap)

# create output file for all sql
with open('all_tables.sql', 'w') as f:
    for table in allTables:
        print("processing %s..." % table['name'])
        f.write("/*** Create %s Table***/" % table['name'].upper())
        sql = """
CREATE OR REPLACE TABLE %s.%s.%s (
""" % (database, schema, table['name'].upper())
        for column in table['data_types']:
            sql += " %s %s,\n" % (column[0], column[1])
        sql = sql[:-2] + "\n);"
        f.write(sql)
        f.write("\n\n")

        f.write("/*** Create %s Stage***/" % table['name'].upper())
        sql = """
CREATE OR REPLACE STAGE %s.%s.%s_STAGE
url='%s/%s'
storage_integration = %s
encryption=(type='AWS_SSE_KMS' kms_key_id = '%s');
""" % (database, schema, table['name'].upper(), s3_prefix, table['file'], storage_int, kms_key_id)
        f.write(sql)
        f.write("\n\n")

        f.write("/*** Copying Data into %s ***/" % table['name'].upper())
        sql = """
COPY INTO %s.%s.%s
(\n""" % (database, schema, table['name'].upper())
        for column in table['data_types']:
            sql += " %s,\n" % column[0]
        sql = sql[:-2] + "\n)"
        sql += " FROM (\nSELECT\n"
        for column in table['data_types']:
            sql += " $1:%s::%s,\n" % (column[0], column[1])
        sql = sql[:-2] + "\nFROM\n"
        sql += "@%s.%s.%s_STAGE)\n" % (database, schema, table['name'].upper())
        sql += " file_format = (TYPE = parquet);"
        f.write(sql)
        f.write("\n\n")

        f.write("/*** Dropping stage for %s ***/" % table['name'].upper())
        sql = """
DROP STAGE %s.%s.%s_STAGE;
""" % (database, schema, table['name'].upper())
        f.write(sql)
        f.write("\n\n")

· 3 min read
Yuncheng Yang

It is common to have a remote and dispersed team these days. With face-to-face meetings less common, and often not possible for geographically dispersed development teams, it is challenging to have a clear picture of where your team is.

GitHub provides useful data to help us understand your development team's workload and progress. StackQL has an official GitHub provider which allows you to access this data using SQL.

info

StackQL is an open source project which enables you to query, analyze and interact with cloud and SaaS provider resources using SQL, see stackql.io

In this example we will use the pystackql Python package (a Python wrapper for StackQL) along with a Jupyter Notebook to retrieve data from GitHub using SQL, then sink the data into a cloud native data warehouse for long term storage and analytics at scale. In this example we have used BigQuery.

Step by Step Guide

This guide will walk you through the steps involved in capturing and analyzing developer data using StackQL, Python, Jupyter and BigQuery.

1. Create GitHub Personal Access Token

You will need to create a Personal Access Token in GitHub for a user which has access to the org or orgs in GitHub you will be analyzing. Follow this guide to create your GitHub token and store it somewhere safe.

2. Setup your Jupyter Notebook

You need to set up your Jupyter environment. You can either use Docker (see stackql/stackql-jupyter-demo) or:

  1. Create your Jupyter project
  2. Download and install StackQL
  3. Clone the pystackql repo

3. Setup StackQL Authentication to GitHub

You can find instructions on how to use your personal access token to authenticate to GitHub here. The following example shows how to do this in a Jupyter notebook cell using pystackql.

4. Retrieve data

Next, we will use StackQL SQL queries to get commits, pull requests and pull request reviews, then we will aggregate by usernames of contributors. You can use JOIN semantics in StackQL to do this as well.

Get Contributors, Commits, Pull Requests and Reviews

In the following cell we will query data from GitHub using StackQL:

Aggregate Data By Username

Now we will aggregate the data by each contributor, see the following example:
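The notebook cell itself is not reproduced here. As a minimal sketch of the idea, assuming each query result is a list of dicts carrying a hypothetical login field, the per-user counts could be built with plain Python:

```python
from collections import defaultdict

# Hypothetical rows standing in for StackQL query results; the field names are assumptions.
commits = [{"login": "alice"}, {"login": "bob"}, {"login": "alice"}]
pull_requests = [{"login": "alice"}]
reviews = [{"login": "bob"}, {"login": "bob"}]


def aggregate_by_username(**activity):
    """Count rows per login for each activity type passed as a keyword argument."""
    totals = defaultdict(lambda: {kind: 0 for kind in activity})
    for kind, rows in activity.items():
        for row in rows:
            totals[row["login"]][kind] += 1
    return dict(totals)


summary = aggregate_by_username(commits=commits, pull_requests=pull_requests, reviews=reviews)
print(summary)
```

Every contributor ends up with a count for every activity type, including zeros, which keeps the records uniform for loading later.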

5. Store the Data in BigQuery

After transforming the data, we will upload it to BigQuery. First, we will store the data as a newline-delimited JSON file, which makes the upload much easier and handles the nested schema better, as shown in the following cell:
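For reference, newline-delimited JSON is simply one JSON object per line. The sketch below is an illustration with hypothetical records and a hypothetical to_ndjson helper, not the notebook's own cell:

```python
import json


def to_ndjson(records):
    """Serialise records as newline-delimited JSON: one JSON object per line."""
    return "\n".join(json.dumps(record) for record in records) + "\n"


# Hypothetical nested records; the field names are assumptions for illustration.
records = [
    {"username": "alice", "activity": {"commits": 2, "pull_requests": 1}},
    {"username": "bob", "activity": {"commits": 1, "pull_requests": 0}},
]

ndjson = to_ndjson(records)
print(ndjson)
```

The resulting text can be written to a file and loaded into BigQuery using the NEWLINE_DELIMITED_JSON source format, which keeps nested fields such as activity intact.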

Now we can see the table on BigQuery as shown here:

BigQuery User Activity Table

From here you can use the same process to append data to the table and use BigQuery to perform analytics at scale on the data.

info

The complete notebook for this article can be accessed at FabioYyc/stackql-github-notebook-bq

· 4 min read
Tom Klimovski

So you're using BigQuery (BQ). It's all set up and humming perfectly. Maybe now, you want to run an ELT job whenever a new table partition is created, or maybe you want to retrain your ML model whenever new rows are inserted into the BQ table.

In my previous article on EventArc, we went through how Logging can help us create eventing-type functionality in your application. Let's take it a step further and walk through how we can couple BigQuery and Cloud Run.

In this article you will learn how to

  • Tie together BigQuery and Cloud Run
  • Use BigQuery's audit log to trigger Cloud Run
  • With those triggers, run your required code

Let's go!

Let's create a temporary dataset within BigQuery named tmp_bq_to_cr.

In that same dataset, let's create a table in which we will insert some rows to test our BQ audit log. Let's grab some rows from a BQ public dataset to create this table:

CREATE OR REPLACE TABLE tmp_bq_to_cr.cloud_run_trigger AS
SELECT
date, country_name, new_persons_vaccinated, population
from `bigquery-public-data.covid19_open_data.covid19_open_data`
where country_name='Australia'
AND
date > '2021-05-31'
LIMIT 100

Following this, let's run an insert query that will help us build our mock database trigger:

INSERT INTO tmp_bq_to_cr.cloud_run_trigger
VALUES('2021-06-18', 'Australia', 3, 1000)

Now, in another browser tab let's navigate to BQ Audit Events and look for our INSERT INTO event:

BQ-insert-event

There will be several audit logs for any given BQ action. Only after a query is parsed does BQ know which table we want to interact with, so the initial log will not, for example, have the table name.

We don't want any old audit log, so we need to ensure we look for a unique set of attributes that clearly identify our action, such as in the diagram above.

In the case of inserting rows, the attributes are a combination of

  • The method is google.cloud.bigquery.v2.JobService.InsertJob
  • The name of the table being inserted to is the protoPayload.resourceName
  • The dataset id is available as resource.labels.dataset_id
  • The number of inserted rows is protoPayload.metadata.tableDataChange.insertedRowsCount

Time for some code

Now that we've identified the payload that we're looking for, we can write the action for Cloud Run. We've picked Python and Flask to help us in this instance. (full code is on GitHub).

First, let's filter out the noise and find the event we want to process

from flask import Flask, request

app = Flask(__name__)


@app.route('/', methods=['POST'])
def index():
    # Gets the payload data from the audit log
    content = request.json
    try:
        ds = content['resource']['labels']['dataset_id']
        proj = content['resource']['labels']['project_id']
        tbl = content['protoPayload']['resourceName']
        rows = int(content['protoPayload']['metadata']
                   ['tableDataChange']['insertedRowsCount'])
        # only act on rows inserted into the trigger table in our dataset
        if ds == 'tmp_bq_to_cr' and \
           tbl.endswith('tables/cloud_run_trigger') and rows > 0:
            query = create_agg()
            return "table created", 200
    except (KeyError, TypeError, ValueError):
        # if these fields are not in the JSON, ignore
        pass
    return "ok", 200
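To check the filtering logic without deploying anything, the same test can be pulled into a plain function and fed a sample payload. This is a sketch only: is_trigger_event is a hypothetical helper, the dataset name is the tmp_bq_to_cr dataset created earlier in this article, and the payload is a heavily trimmed, made-up stand-in for a real audit log entry.

```python
def is_trigger_event(content, dataset="tmp_bq_to_cr", table="cloud_run_trigger"):
    """Return True when an audit log payload describes rows inserted into our table."""
    try:
        ds = content["resource"]["labels"]["dataset_id"]
        tbl = content["protoPayload"]["resourceName"]
        rows = int(content["protoPayload"]["metadata"]["tableDataChange"]["insertedRowsCount"])
    except (KeyError, TypeError, ValueError):
        return False  # not the log entry we are looking for
    return ds == dataset and tbl.endswith("tables/" + table) and rows > 0


# Hypothetical, heavily trimmed payload for illustration only.
sample = {
    "resource": {"labels": {"dataset_id": "tmp_bq_to_cr", "project_id": "my-project"}},
    "protoPayload": {
        "resourceName": "projects/my-project/datasets/tmp_bq_to_cr/tables/cloud_run_trigger",
        "metadata": {"tableDataChange": {"insertedRowsCount": "1"}},
    },
}

print(is_trigger_event(sample))                  # matching insert event
print(is_trigger_event({"protoPayload": {}}))    # earlier log entry without table details
```

Keeping the check separate from the Flask route makes it easy to unit test against payloads copied from the audit log viewer.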

Now that we've found the event we want, let's execute the action we need. In this example, we'll aggregate and write out to a new table created_by_trigger:

from google.cloud import bigquery


def create_agg():
    client = bigquery.Client()
    # aggregate vaccinations per country into a new table
    query = """
    CREATE OR REPLACE TABLE tmp_bq_to_cr.created_by_trigger AS
    SELECT
      country_name, SUM(new_persons_vaccinated) AS n
    FROM tmp_bq_to_cr.cloud_run_trigger
    GROUP BY country_name
    """
    client.query(query)
    return query

The Dockerfile for the container is simply a basic Python container into which we install Flask and the BigQuery client library:

FROM python:3.9-slim
RUN pip install Flask==1.1.2 gunicorn==20.0.4 google-cloud-bigquery
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY *.py ./
CMD exec gunicorn --bind :$PORT main:app

Now we Cloud Run

Build the container and deploy it using a couple of gcloud commands:

SERVICE=bq-cloud-run
PROJECT=$(gcloud config get-value project)
CONTAINER="gcr.io/${PROJECT}/${SERVICE}"
gcloud builds submit --tag ${CONTAINER}
gcloud run deploy ${SERVICE} --image $CONTAINER --platform managed

I always forget about the permissions

In order for the trigger to work, the Cloud Run service account will need the following permissions:

gcloud projects add-iam-policy-binding $PROJECT \
  --member="serviceAccount:service-${PROJECT_NO}@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role='roles/iam.serviceAccountTokenCreator'

gcloud projects add-iam-policy-binding $PROJECT \
--member=serviceAccount:${SVC_ACCOUNT} \
--role='roles/eventarc.admin'

Finally, the event trigger

gcloud eventarc triggers create ${SERVICE}-trigger \
--location ${REGION} --service-account ${SVC_ACCOUNT} \
--destination-run-service ${SERVICE} \
--event-filters type=google.cloud.audit.log.v1.written \
--event-filters methodName=google.cloud.bigquery.v2.JobService.InsertJob \
--event-filters serviceName=bigquery.googleapis.com

It is important to note here that we're triggering on any Insert log created by BQ. That's why in this action we had to filter these events based on the payload.

Take it for a spin

Now, try out the BigQuery -> Cloud Run trigger and action. Go to the BigQuery console and insert a row or two:

INSERT INTO tmp_bq_to_cr.cloud_run_trigger
VALUES('2021-06-18', 'Australia', 5, 25000)

Watch as a new table called created_by_trigger gets created! You have successfully triggered a Cloud Run action on a database event in BigQuery.

Enjoy!

· 4 min read
Jeffrey Aven

cloudsql federated queries

This article demonstrates Cloud SQL federated queries for BigQuery, a neat and simple-to-use feature.

Connecting to Cloud SQL

One of the challenges presented when using Cloud SQL on a private network (VPC) is providing access to users. There are several ways to accomplish this which include:

  • open the database port on the VPC Firewall (5432 for example for Postgres) and let users access the database using a command line or locally installed GUI tool (may not be allowed in your environment)
  • provide a web based interface deployed on your VPC such as PGAdmin deployed on a GCE instance or GKE pod (adds security and management overhead)
  • use the Cloud SQL proxy (requires additional software to be installed and configured)

In addition, all of the above solutions require direct IP connectivity to the instance, which may not always be available. Furthermore, each of these options requires the user to present some form of authentication, in many cases the database user and password, which then must be managed at an individual level.

Enter Cloud SQL federated queries for BigQuery…

BigQuery Federated Queries for Cloud SQL

BigQuery allows you to query tables and views in Cloud SQL (currently MySQL and Postgres) using the Federated Queries feature. The queries could be authorized views in BigQuery datasets, for example.

This has the following advantages:

  • Allows users to authenticate and use the GCP console to query Cloud SQL
  • Does not require direct IP connectivity to the user or additional routes or firewall rules
  • Leverages Cloud IAM as the authorization mechanism, rather than unmanaged db user accounts and object level permissions
  • External queries can be executed against a read replica of the Cloud SQL instance to offload query IO from the master instance

Setting it up

Setting up BigQuery federated queries for Cloud SQL is exceptionally straightforward. A summary of the steps is provided below:

Step 1. Enable a Public IP on the Cloud SQL instance

This sounds bad, but it isn’t really that bad. You need to enable a public interface for BigQuery to be able to establish a connection to Cloud SQL; however, this is not accessed through the actual public internet. Rather, it is accessed through the Google network, using the back end of the front end if you will.

Furthermore, you configure an empty list of authorized networks which effectively shields the instance from the public network, this can be configured in Terraform as shown here:

This configuration change can be made to a running instance as well as during the initial provisioning of the instance.

As shown below you will get a warning dialog in the console saying that you have no authorized networks - this is by design.

Cloud SQL Public IP Enabled with No Authorized Networks

Step 2. Create a BigQuery dataset which will be used to execute the queries to Cloud SQL

Connections to Cloud SQL are defined in a BigQuery dataset. This can also be used to control access to Cloud SQL using authorized views controlled by IAM roles.

Step 3. Create a connection to Cloud SQL

To create a connection to Cloud SQL from BigQuery you must first enable the BigQuery Connection API. This is done at a project level.

As this is a fairly recent feature, there isn't great coverage with either the bq tool or any of the BigQuery client libraries to do this, so we will need to use the console for now...

Under the Resources -> Add Data link in the left hand panel of the BigQuery console UI, select Create Connection. You will see a side info panel with a form to enter connection details for your Cloud SQL instance.

In this example I will set up a connection to a Cloud SQL read replica instance I have created:

Creating a Big Query Connection to Cloud SQL

More information on the BigQuery Connection API can be found at: https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest

The following permissions are associated with connections in BigQuery:

bigquery.connections.create  
bigquery.connections.get
bigquery.connections.list
bigquery.connections.use
bigquery.connections.update
bigquery.connections.delete

These permissions are conveniently combined into the following predefined roles:

roles/bigquery.connectionAdmin    (BigQuery Connection Admin)
roles/bigquery.connectionUser     (BigQuery Connection User)

Step 4. Query away!

Now the connection to Cloud SQL can be accessed using the EXTERNAL_QUERY function in BigQuery, as shown here:

Querying Cloud SQL from Big Query