Skip to main content

8 posts tagged with "python"

View All Tags

· 8 min read
Chris Ottinger

Container images provide an ideal software packaging solution for DataOps and python based data pipeline workloads. Containers enable Data Scientists and Data Engineers to incorporate the latest packages and libraries without the issues associated with introducing breaking changes into shared environments. A Data Engineer or Data Scienctist can quickly release new functionality with the best tools available.

Container images provide safer developer environments but as the number of container images used for production workloads grow, a maintenance challenge can emerge. Whether using pip or poetry to manage python packages and dependencies, updating a container definition requires edits to the explicit package versions as well as to the pinned or locked versions of the package dependencies. This process can be error prone without automation and a repeatable CICD workflow.

A workflow pattern based on docker buildkit / moby buildkit multi-stage builds provides an approach that maintains all the build specifications in a single Dockerfile, while build tools like make provide a simple and consistent interface into the container build stages. The data pipeline challenges addresses with a multi-stage build pattern include:

  • automating lifecycle management of the Python packages used by data pipelines
  • integrating smoke testing of container images to weed out compatibility issues early
  • simplifying the developer experience with tools like make that can be used both locally and in CI/CD pipelines

The Dockerfile contains the definitions of the different target build stages and order of execution from one stage to the next. The Makefile wraps the Dockerfile build targets into a standard set of workflow activities, following a similar to $ config && make && make install

The DataOps Container Lifecycle Workflow

A typical dataops/gitops style workflow for maintaining container images includes actions in the local environment to define the required packages and produce the pinned dependency poetry.lock file or requirements.txt packages list containing the full set of pinned dependent packages.

Given and existing project in a remote git repository with a CI/CD pipeline defined, the following workflow would be used to update package versions and dependencies:

Multi-stage build workflow

The image maintainer selects the packages to update or refresh using a local development environment, working from a feature branch. This includes performing an image smoke-test to validate the changes within the container image.

Once refreshed image has been validated, the lock file or full pinned package list is commited back to the repository and pushed to the remote repository. The CI/CD pipeline performs a trial build and conducts smoke testing. On merge into the main branch, the target image is built, re-validated, and pushed to the container image registry.

The multi-stage build pattern can support both defining both the declared packages for an environment as well as the dependent packages, but poetry splits the two into distinct files, a pyproject.toml file containing the declated packages and a poetry.lock file that contains the full set of declared and dependent packages, including pinned versions. pip supports loading packages from different files, but requires a convention for which requirements file contains the declared packages and while contains the full set of pinned package versions produced by pip freeze. The example code repo contains examples using both pip and poetry.

The following example uses poetry in a python:3.8 base image to illustrate managing the dependencies and version pinning of python packages.

Multi-stage Dockerfile

The Dockerfile defines the build stages used for both local refresh and by the CICD pipelines to build the target image.

Dockerfile Stages

The Dockerfile makes use of the docker build arguments feature to pass in whether the build should refresh package versions or build the image from pinned packages.

Build Stage: base-pre-pkg

Any image setup and pre-python package installation steps. For poetry, this includes setting the config option to skip the creation of a virtual environment as the container already provides the required isolation.

FROM python:3.8 as base-pre-pkg

RUN install -d /src && \
pip install --no-cache-dir poetry==1.1.13 && \
poetry config virtualenvs.create false

Build Stage: python-pkg-refresh

The steps to generate a poetry.lock file containing the pinned package versions.

FROM base-pre-pkg as python-pkg-refresh
COPY pyproject.toml poetry.lock /src/
RUN poetry update && \
poetry install

Build Stage: python-pkg-pinned

The steps to install packages using the pinned package versions.

FROM base-pre-pkg as python-pkg-pinned
COPY pyproject.toml poetry.lock /src/
RUN poetry install

Build Stage: base-post-pkg

A consolidation build target that can refer to either the python-pkg-refresh or the python-pkg-pinned stages, depending on the docker build argument and includes any post-package installation steps.

FROM python-pkg-${PYTHON_PKG_VERSIONS} as base-post-pkg

Build Stage: smoke-test

Simple smoke tests and validation commands to validate the built image.

FROM base-post-pkg as smoke-test
COPY tests/ ./tests
RUN poetry --version && \
python ./tests/

Build Stage: target-image

The final build target container image. Listing the target-image as the last stage in the Dockerfile has the effect of also making this the default build target.

FROM base-post-pkg as target-image

Multi-stage Makefile

The Makefile provides a workflow oriented wrapper over the Dockerfile build stage targets. The Makefile targets can be executed both in a local development environment as well as via a CICD pipeline. The Makefile includes several variables that can either be run using default values, or overridden by the CI/CD pipeline.

Makefile targets

Make Target: style-check

Linting and style checking of source code. Can include both application code as well as the Dockerfile itself using tools such as hadolint.

hadolint ./Dockerfile

Make Target: python-pkg-refresh

The python-pkg-refresh target builds a version of the target image with refreshed package versions. A temporary container instance is created from the target image and the poetry.lock file is copied into the local file system. The smoke-test docker build target is used to ensure image validation is also performed. The temporary container as well as the package refresh image are removed after the build.

@echo ">> Update python packages in container image"
docker build ${DOCKER_BUILD_ARGS} \
--target smoke-test \
--build-arg PYTHON_PKG_VERSIONS=refresh \
--tag ${TARGET_IMAGE_NAME}:$@ .
@echo ">> Copy the new poetry.lock file with updated package versions"
docker create --name ${TARGET_IMAGE_NAME}-$@ ${TARGET_IMAGE_NAME}:$@
docker cp ${TARGET_IMAGE_NAME}-$@:/src/poetry.lock .
@echo ">> Clean working container and refresh image"
docker rm ${TARGET_IMAGE_NAME}-$@
docker rmi ${TARGET_IMAGE_NAME}:$@

Make Target: build

The standard build target using pinned python package versions.

docker build ${DOCKER_BUILD_ARGS} \
--target target-image \

Make Target: smoke-test

Builds an image and peforms smoke testing. The smoke-testing image is removed after the build.

docker build ${DOCKER_BUILD_ARGS} \
--target smoke-test \
--tag ${TARGET_IMAGE_NAME}:$@ .
docker rmi ${TARGET_IMAGE_NAME}:$@


The toolchain combination of multi-stage container image builds with make provides a codified method for the lifecycle management of the containers used in data science and data engineering workloads.

The maintainer:

git checkout -b my-refresh-feature
make python-pkg-refresh
make smoke-test
git add pyproject.toml poetry.lock
git commit -m "python package versions updated"
git push

The CICD pipeline:

make build
make smoke-test
docker push <target-image>:<build-tag>

You can find the complete source code for this article at

· 6 min read
Mark Stella

Recently I found myself at a client that were using a third party tool to scan all their enterprise applications in order to collate their data lineage. They had spent two years onboarding applications to the tool, resulting in a large technical mess that was hard to debug and impossible to extend. As new applications were integrated onto the platform, developers were forced to think of new ways of connecting and tranforming the data so it could be consumed.

The general approach was: setup scanner -> scan application -> modify results -> upload results -> backup results -> cleanup workspace -> delete anything older than 'X' days

Each developer had their own style of doing this - involving shell scripts, python scripts, SQL and everything in between. Worse, there was slabs of code replicated across the entire repository, with variables and paths changed depending on the use case.

My tasks was to create a framework that could orchestrate the scanning and adhered to the following philosophies:

  • DRY (Don't Repeat Yourself)
  • Config driven
  • Version controlled
  • Simple to extend
  • Idempotent

It also had to be written in Python as that was all the client was skilled in.

After looking at what was on the market (Airflow and Prefect being the main contenders) I decided to roll my own simplified orchestrator that required as little actual coding as possible and could be setup by configuration.

In choosing a configuration format, I settled on HOCON as it closely resembled JSON but has advanced features such as interpolation, substitions and the ability to include other hocon files - this would drastically reduce the amount of boilerplate configuration required.

Because I had focused so heavily on being configuration driven, I also needed the following charecteristics to be delivered:

  • Self discovery of task types (more on this later)
  • Configuration validation at startup

Tasks and self discovery

As I wanted anyone to be able to rapidly extend the framework by adding tasks, I needed to reduce as much repetition and boilerplate as possible. Ideally, I wanted a developer to just have to think about writing code and not have to deal with how to integrate this.

To achieve this, we needed a way of registering new 'tasks' that would become available to the framework. I wanted a developer to simply have to subclass the main Task class and implement a run function - the rest would be taken care of.

class TaskRegistry:

def __init__(self) -> None:
self._registry = {}

def register(self, cls: type) -> None:
n = getattr(cls, 'task_name', cls.__name__).lower()
self._registry[n] = cls

def registered(self) -> List[str]:
return list(self._registry.keys())

def has(self, name: str) -> bool:
return name in self._registry

def get(self, name: str) -> type:
return self._registry[name]

def create(self, name: str, *args, **kwargs) -> object:
return self._registry[name](*args, **kwargs)
except KeyError:
raise ClassNotRegisteredException(name)

registry = TaskRegistry()

Once the registry was instantiated, any new Tasks that inherited from 'Task' would automatically be added to the registry. We could then use the create(name) function to instantiate any class - essentially a pythonic Factory Method

class Task(ABC):

def __init__(self) -> None:
self.logger = logging.getLogger(self.__class__.__name__)

def __init_subclass__(cls) -> None:

def run(self, **kwargs) -> bool:
raise NotImplementedError

For the framework to automatically register the classes, it was important to follow the project structure. As long as the task resided in the 'tasks' module, we could scan this at runtime and register each task.

└── simple_tasker
└── tasks

This was achieved with a simple dynamic module importer

modules = glob.glob(join(dirname(__file__), "*.py"))

for f in modules:
if isfile(f) and not f.endswith(""):

The configuration

In designing how the configuration would bind to the task, I needed to capture the name (what object to instanticate) and what args to pass to the instantiated run function. I decided to model it as below with everything under a 'tasks' array

tasks: [
name: shell_script
args: {
script_path: uname
script_args: -a
name: shell_script
args: {
script_path: find
script_args: [${CWD}/simple_tasker/tasks, -name, "*.py"]
name: archive
args: {
input_directory_path: ${CWD}/simple_tasker/tasks
target_file_path: /tmp/${PLATFORM}_${TODAY}.tar.gz

Orchestration and validation

As mentioned previously, one of the goals was to ensure the configuration was valid prior to any execution. This meant that the framework needed to validate whether tha task name referred to a registered task, and that all mandatory arguments were addressed in the configuration. Determining whether the task was registered was just a simple key check, however to validate the arguments to the run required some inspection - I needed to get all args for the run function and filter out 'self' and any asterisk args (*args, **kwargs)

def get_mandatory_args(func) -> List[str]:

mandatory_args = []
for k, v in inspect.signature(func).parameters.items():
if (
k != "self"
and v.default is inspect.Parameter.empty
and not str(v).startswith("*")

return mandatory_args

And finally onto the actual execution bit. The main functionality required here is to validate that the config was defined correctly, then loop through all tasks and execute them - passing in any args.

class Tasker:

def __init__(self, path: Path, env: Dict[str, str] = None) -> None:

self.logger = logging.getLogger(self.__class__.__name__)
self._tasks = []

with wrap_environment(env):
self._config = ConfigFactory.parse_file(path)

def __validate_config(self) -> bool:

error_count = 0

for task in self._config.get("tasks", []):
name, args = task["name"].lower(), task.get("args", {})

if registry.has(name):
for arg in get_mandatory_args(registry.get(name).run):
if arg not in args:
print(f"Missing arg '{arg}' for task '{name}'")
error_count += 1
print(f"Unknown tasks '{name}'")
error_count += 1

self._tasks.append((name, args))

return error_count == 0

def run(self) -> bool:

if self.__validate_config():

for name, args in self._tasks:
exe = registry.create(name)"About to execute: '{name}'")
if not**args):
self.logger.error(f"Failed tasks '{name}'")
return False

return True
return False

Putting it together - sample tasks

Below are two examples of how easy it is to configure the framework. We have a simple folder archiver that will tar/gz a directory based on 2 input parameters.

class Archive(Task):

def __init__(self) -> None:

def run(self, input_directory_path: str, target_file_path: str) -> bool:"Archiving '{input_directory_path}' to '{target_file_path}'")

with, "w:gz") as tar:
return True

A more complex example would be the ability to execute shell scripts (or os functions) by passing in some optional variables and variables that can either be a string or list.

class ShellScript(Task):

task_name = "shell_script"

def __init__(self) -> None:

def run(
script_path: str,
script_args: Union[str, List[str]] = None,
working_directory_path: str = None
) -> bool:

cmd = [script_path]

if isinstance(script_args, str):
cmd += script_args


result = subprocess.check_output(

for o in result:

except (subprocess.CalledProcessError, FileNotFoundError) as e:
return False

return True

You can view the entire implementation here

· 3 min read
Jeffrey Aven


This is a follow up to a previous blog, Google Cloud Storage Object Notifications using Slack in which we used Slack to notify us of new objects being uploaded to GCS.

In this article we will take things a step further, where uploading an object to a GCS bucket will trigger a DLP inspection of the object and if any preconfigured info types (such as credit card numbers or API credentials) are present in the object, a Slack notification will be generated.

As DLP scans are “jobs”, meaning they run asynchronously, we will need to trigger scans and inspect results using two separate Cloud Functions (one for triggering a scan [gcs-dlp-scan-trigger] and one for inspecting the results of the scan [gcs-dlp-evaluate-results]) and a Cloud PubSub topic [dlp-scan-topic] which is used to hold the reference to the DLP job.

The process is described using the sequence diagram below:

The Code

The gcs-dlp-scan-trigger Cloud Function fires when a new object is created in a specified GCS bucket. This function configures the DLP scan to be executed, including the DLP info types (for instance CREDIT_CARD_NUMBER, EMAIL_ADDRESS, ETHNIC_GROUP, PHONE_NUMBER, etc) a and likelihood of that info type existing (for instance LIKELY). DLP scans determine the probability of an info type occurring in the data, they do not scan every object in its entirety as this would be too expensive.

The primary function executed in the gcs-dlp-scan-trigger Cloud Function is named inspect_gcs_file. This function configures and submits the DLP job, supplying a PubSub topic to which the DLP Job Name will be written, the code for the inspect_gcs_file is shown here:

At this stage the DLP job is created an running asynchronously, the next Cloud Function, gcs-dlp-evaluate-results, fires when a message is sent to the PubSub topic defined in the DLP job. The gcs-dlp-evaluate-results reads the DLP Job Name from the PubSub topic, connects to the DLP service and queries the job status, when the job is complete, this function checks the results of the scan, if the min_likliehood threshold is met for any of the specified info types, a Slack message is generated. The code for the main method in the gcs-dlp-evaluate-results function is shown here:

Finally, a Slack webhook is used to send the message to a specified Slack channel in a workspace, this is done using the send_slack_notification function shown here:

An example Slack message is shown here:

Slack Notification for Sensitive Data Detected in a Newly Created GCS Object

Full source code can be found at:

· 9 min read
Jeffrey Aven

CDC using Spark

Change Data Capture (CDC) is one of the most challenging processing patterns to implement at scale. I personally have had several cracks at this using various different frameworks and approaches, the most recent of which was implemented using Spark – and I think I have finally found the best approach. Even though the code examples referenced use Spark, the pattern is language agnostic – the focus is on the approach not the specific implementation (as this could be applied to any framework or runtime).

The first challenge you are faced with, is to compare a very large dataset (representing the current state of an object) with another potentially very large dataset (representing new or incoming data). Ideally, you would like the process to be configuration driven and accommodate such things as composite primary keys, or operational columns which you would like to restrict from change detection. You may also want to implement a pattern to segregate sensitive attributes from non-sensitive attributes.


This pattern (and all my other recent attempts) is fundamentally based upon calculating a deterministic hash of the key and non-key attribute(s), and then using this hash as the basis for comparison. The difference between this pattern and my other attempts is in the distillation and reconstitution of data during the process, as well as breaking the pattern into discrete stages (designed to minimize the impact to other applications). This pattern can be used to process delta or full datasets.

A high-level flowchart representing the basic pattern is shown here:

CDC Flowchart

The Example

The example provided uses the Synthetic CDC Data Generator application, configuring an incoming set with 5 uuid columns acting as a composite key, and 10 random number columns acting as non key values. The initial days payload consists of 10,000 records, the subsequent days payload consists of another 10,000 records. From the initial dataset, a DELETE operation was performed at the source system for 20% of records, an UPDATE was performed on 40% of the records and the remaining 40% of records were unchanged. In this case the 20% of records that were deleted at the source, were replaced by new INSERT operations creating new keys.

After creating the synthesized day 1 and day 2 datasets, the files are processed as follows:

$ spark-submit config.yaml data/day1 2019-06-18
$ spark-submit config.yaml data/day2 2019-06-19

Where config.yaml is the configuration for the dataset, data/day1 and data/day2 represent the different data files, and 2019-06-18 and 2019-06-19 represent a business effective date.

The Results

You should see the following output from running the preceding commands for day 1 and day 2 respectively:

Day 1:

Day 2:

A summary analysis of the resultant dataset should show:

Pattern Details

Details about the pattern and its implementation follow.

Current and Historical Datasets

The output of each operation will yield a current dataset (that is the current stateful representation of a give object) and a historical dataset partition (capturing the net changes from the previous state in an appended partition).

This is useful, because often consumers will primarily query the latest state of an object. The change sets (or historical dataset partitions) can be used for more advanced analysis by sophisticated users.

Type 2 SCDs (sort of)

Two operational columns are added to each current and historical object:

  • OPERATION : Represents the last known operation to the record, valid values include :
    • I (INSERT)
    • U (UPDATE)
    • D (DELETE – hard DELETEs, applies to full datasets only)
    • X (Not supplied, applies to delta processing only)
    • N (No change)

Since data structures on most big data or cloud storage platforms are immutable, we only store the effective start date for each record, this is changed as needed with each coarse-grained operation on the current object. The effective end date is inferred by the presence of a new effective start date (or change in the EFF_START_DATE value for a given record).

The Configuration

I am using a YAML document to store the configuration for the pattern. Important attributes to include in your configuration are a list of keys and non keys and their datatype (this implementation does type casting as well). Other important attributes include the table names and file paths for the current and historical data structures.

The configuration is read at the beginning of a routine as an input along with the path of an incoming data file (a CSV file in this case) and a business effective date (which will be used as the EFF_START_DATE for new or updated records).

Processing is performed using the specified key and non key attributes and the output datasets (current and historical) are written to columnar storage files (parquet in this case). This is designed to make subsequent access and processing more efficient.

The Algorithm

I have broken the process into stages as follows:

Stage 1 – Type Cast and Hash Incoming Data

The first step is to create deterministic hashes of the configured key and non key values for incoming data. The hashes are calculated based upon a list of elements representing the key and non key values using the MD5 algorithm. The hashes for each record are then stored with the respective record. Furthermore, the fields are casted their target datatype as specified in the configuration. Both of these operations can be performed in a single pass of each row using a map() operation.

Importantly we only calculate hashes once upon arrival of new data, as the hashes are persisted for the life of the data – and the data structures are immutable – the hashes should never change or be invalidated.

Stage 2 – Determine INSERTs

We now compare Incoming Hashes with previously calculated hash values for the (previous day’s) current object. If no current object exists for the dataset, then it can be assumed this is a first run. In this case every record is considered as an INSERT with an EFF_START_DATE of the business effective date supplied.

If there is a current object, then the key and non key hash values (only the hash values) are read from the current object. These are then compared to the respective hashes of the incoming data (which should still be in memory).

Given the full outer join:

incoming_data(keyhash, nonkeyhash) FULL OUTER JOIN
current_data(keyhash, nonkeyhash) ON keyhash

Keys which exist in the left entity which do not exist in the right entity must be the results of an INSERT operation.

Tag these records with an operation of I with an EFF_START_DATE of the business effective date, then rejoin only these records with their full attribute payload from the incoming dataset. Finally, write out these records to the current and historical partition in overwrite mode.

Stage 3 - Determine DELETEs or Missing Records

Referring the previous full outer join operation, keys which exist in the right entity (current object) which do not appear in the left entity (incoming data) will be the result of a (hard) DELETE operation if you are processing full snapshots, otherwise if you are processing deltas these would be missing records (possibly because there were no changes at the source).

Tag these records as D or X respectively with an EFF_START_DATE of the business effective date, rejoin these records with their full attribute payload from the current dataset, then write out these records to the current and historical partition in append mode.

Stage 4 - Determine UPDATEs or Unchanged Records

Again, referring to the previous full outer join, keys which exist in both the incoming and current datasets must be either the result of an UPDATE or they could be unchanged. To determine which case they fall under, compare the non key hashes. If the non key hashes differ, it must have been a result of an UPDATE operation at the source, otherwise the record would be unchanged.

Tag these records as U or N respectively with an EFF_START_DATE of the business effective date (in the case of an update - otherwise maintain the current EFF_START_DATE), rejoin these records with their full attribute payload from the incoming dataset, then write out these records to the current and historical partition in append mode.

Key Callouts

A summary of the key callouts from this pattern are:

  • Use the RDD API for iterative record operations (such as type casting and hashing)
  • Persist hashes with the records
  • Use Dataframes for JOIN operations
  • Only perform JOINs with the keyhash and nonkeyhash columns – this minimizes the amount of data shuffled across the network
  • Write output data in columnar (Parquet) format
  • Break the routine into stages, covering each operation, culminating with a saveAsParquet() action – this may seem expensive but for large datsets it is more efficient to break down DAGs for each operation
  • Use caching for objects which will be reused between actions

Metastore Integration

Although I did not include this in my example, you could easily integrate this pattern with a metastore (such as a Hive metastore or AWS Glue Catalog), by using table objects and ALTER TABLE statements to add historical partitions.

Further optimisations

If the incoming data is known to be relatively small (in the case of delta processing for instance), you could consider a broadcast join where the smaller incoming data is distributed to all of the different Executors hosting partitions from the current dataset.

Also you could add a key to the column config to configure a column to be nullable or not.

Happy CDCing!

Full source code for this article can be found at:

· 2 min read
Jeffrey Aven

This is a simple routine to generate random data with a configurable number or records, key fields and non key fields to be used to create synthetic data for source change data capture (CDC) processing. The output includes an initial directory containing CSV files representing an initial data load, and an incremental directory containing CSV files representing incremental data.

Spark Training Courses from the AlphaZetta Academy

Data Transformation and Analysis Using Apache Spark
Stream and Event Processing using Apache Spark
Advanced Analytics Using Apache Spark

Arguments (by position) include:

  • no_init_recs : the number of initial records to generate
  • no_incr_recs : the number of incremental records on the second run - should be >= no_init_recs
  • no_keys : number of key columns in the dataset – keys are generated as UUIDs
  • no_nonkeys : number of non-key columns in the dataset – nonkey values are generated as random numbers
  • pct_del : percentage of initial records deleted on the second run - between 0.0 and 1.0
  • pct_upd : percentage of initial records updated on the second run - between 0.0 and 1.0
  • pct_unchanged : percentage of records unchanged on the second run - between 0.0 and 1.0
  • initial_output : folder for initial output in CSV format
  • incremental_output : folder for incremental output in CSV format

NOTE : pct_del + pct_upd + pct_unchanged must equal 1.0

Example usage:

$ spark-submit 100000 100000 2 3 0.2 0.4 0.4 data/day1 data/day2

Example output from the day1 run for the above configuration would look like this:

Note that this routine can be run subsequent times producing different key and non key values each time, as the keys are UUIDs and the values are random numbers.

We will use this application to generate random input data to demonstrate CDC using Spark in a subsequent post, see you soon!

Full source code can be found at: