
· 10 min read
Tom Klimovski

Metadata Hub (MDH) is intended to be the source of truth for metadata around the Company’s platform. It can load metadata configuration from YAML and serve that information via an API. It will also store pipeline information while files are ingested into the platform.

Key philosophies:

Config-Driven. Anyone who is authorized to do so should be able to add another ‘table-info.yaml’ to MDH without updating any code in the system.

Here’s how table information makes its way into MDH:

[Figure: Metadata Hub]

Paths

/tables:
  get:
    summary: All tables in MDH
    description: get the title of all tables that exist in MDH
  post:
    summary: Creates a new table in MDH
    description: Creates a new table in MDH
/tables/{id}:
  get:
    summary: Obtain information about specific table
/tables/{id}/columns:
  get:
    summary: All columns for a particular table
    description: Obtain information on columns for a particular table
/run:
  get:
    summary: All information about a particular end-to-end batch run of file ingestion
  post:
    summary: Update metadata on a batch load
    description: Update metadata on a batch load
/calendar:
  get:
    summary: Use this to save on calculation of business days.
    description: This base response gives you today's date in a string
/calendar/previousBusinessDay:
  get:
    summary: Will return a string of the previous business day
    description: Will return a string of the previous business day, based on the date on when it's called
/calendar/nextBusinessDay:
  get:
    summary: Will return a string of the next business day
    description: Will return a string of the next business day, based on the date on when it's called
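
The calendar endpoints exist so that callers do not have to recompute business days themselves. As a rough sketch of the calculation behind them, assuming a business day is simply Monday to Friday (public holidays are ignored, and the function names are illustrative rather than MDH's actual implementation):

```python
from datetime import date, timedelta


def previous_business_day(today: date) -> str:
    """Return the closest earlier weekday as an ISO date string."""
    day = today - timedelta(days=1)
    while day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        day -= timedelta(days=1)
    return day.isoformat()


def next_business_day(today: date) -> str:
    """Return the closest later weekday as an ISO date string."""
    day = today + timedelta(days=1)
    while day.weekday() >= 5:
        day += timedelta(days=1)
    return day.isoformat()


# Monday 8 January 2024 looks back over the weekend to Friday.
friday = previous_business_day(date(2024, 1, 8))
```

A real service would also need a holiday calendar, which is why centralising this behind one endpoint is attractive.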

Yaml to Datastore - Entity/Kind design

Datastore Primer

Before we jump right into Entity Groups in Datastore, it is important to first go over the basics and establish a common vocabulary. Datastore holds entities, which are objects, that can contain various key/value pairs, called properties. Each entity must contain a unique identifier, known as a key. When creating an entity, a user can choose to specify a custom key or let Datastore create a key. If a user decides to specify a custom key, it will contain two fields: a kind, which represents a category such as ‘Toy’ or ‘Marital Status’, and a name, which is the identifying value. If a user decides to only specify a kind when creating a key, and does not specify a unique identifier, Datastore automatically generates an ID behind the scenes. Below is an example of a Python3 script which illustrates this identifier concept.

from google.cloud import datastore

client = datastore.Client()
# Custom key: specify kind="table" and a name of "broker"
custom_key_entry = datastore.Entity(client.key("table","broker"))
client.put(custom_key_entry)

# Only specify kind="table" and let Datastore generate the ID
datastore_gen_key_entry = datastore.Entity(client.key("table"))
client.put(datastore_gen_key_entry)

In your GCP Console under Datastore, you will then see your two entities of kind “table”. One will contain your custom key and one will contain the automatically generated key.

Ancestors and Entity Groups

For highly related or hierarchical data, Datastore allows entities to be stored in a parent/child relationship. This is known as an entity group or ancestor/descendant relationship.

[Figure: Entity Group ERD]

This is an example of an entity group with three kinds: table, column, and classification. The ‘Grandparent’ in this relationship is the ‘table’. To configure this, one must first create the table entity. Then, a user can create a column and specify that its parent is a table key. To create the grandchild, a user then creates a classification and sets its parent to be a column key. To add further customizable attributes, a user can specify additional key-value pairs such as pii and data_type. These key-value pairs are stored as properties. We model this diagram in Datastore in our working example below.

One can create entity groups by setting the ‘parent’ parameter while creating an entity key for a child. This adds the parent key to the child entity key. The child’s key is represented as a tuple (‘parent_key’, ‘child_key’), such that the parent’s key is the prefix of the key, followed by the child’s own unique identifier. For example, following the diagram above:

table_key = datastore_client.key("table","broker")
column_key = datastore_client.key("column","broker_legal_name", parent=table_key)

Printing column_key.flat_path will display: ('table', 'broker', 'column', 'broker_legal_name')
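
To make the prefix idea concrete, here is a small plain-Python model of flat key paths. It does not use the Datastore client at all; the helper names child_path and is_descendant are made up for illustration, and the point is only to show how a descendant's path starts with its ancestor's path.

```python
def child_path(parent_path: tuple, kind: str, name: str) -> tuple:
    """Append a (kind, name) pair to the parent's flat path."""
    return parent_path + (kind, name)


def is_descendant(path: tuple, ancestor_path: tuple) -> bool:
    """A path descends from an ancestor when the ancestor's path is a strict prefix."""
    return (
        len(path) > len(ancestor_path)
        and path[: len(ancestor_path)] == ancestor_path
    )


table_path = ("table", "broker")
column_path = child_path(table_path, "column", "broker_legal_name")
classification_path = child_path(
    column_path, "classification", "classif_id_REQ_01"
)

print(column_path)
print(is_descendant(classification_path, table_path))
```

This prefix relationship is what makes ancestor queries cheap to express: everything below a parent shares the same leading path.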

Datastore also supports chaining of parents, which can lead to very large keys for descendants with a long lineage of ancestors. Additionally, parents can have multiple children (representing a one-to-many relationship). However, there is no native support for entities to have multiple parents (representing a many-to-many relationship). Once you have configured this ancestral hierarchy, it is easy to retrieve all descendants for a given parent. You can do this by querying on the parent key using the ‘ancestor’ parameter. For example, given the table_key created above, I can query for all of the table’s columns:

my_query = client.query(kind="column", ancestor=table_key)

A Full Working Example for MDH

As per our Key Philosophies - Config-Driven - anyone should be able to add a new table to be processed and landed in a target-table somewhere within MDH with our yaml syntax. Below is a full working python3 example of the table/column/classification hierarchical model described above.

from google.cloud import datastore

datastore_client = datastore.Client()

# Entities with kinds- table, column, classification
my_entities = [
{"kind": "table", "table_id": "broker", "table_type": "snapshot",
"notes": "describes mortgage brokers"},
{"kind": "column", "column_id": "broker_legal_name", "table_id": "broker",
"data_type": "string", "size": 20, "nullable": 1},
{"kind": "column", "column_id": "broker_short_code", "table_id": "broker",
"data_type": "string", "size": 3, "nullable": 1},
{"kind": "classification", "classification_id":"classif_id_REQ_01",
"restriction_level": "public", "pii": 0, "if": "greater than 90 days",
"column_id": "broker_legal_name", "table_id": "broker"},
{"kind": "classification", "classification_id":"classif_id_REQ_03",
"restriction_level": "restricted", "pii": 0, "if": "less than 90 days",
"column_id": "broker_legal_name", "table_id": "broker"},
{"kind": "classification", "classification_id":"classif_id_REQ_214",
"restriction_level": "public", "pii": 0, "column_id": "broker_short_code",
"table_id": "broker"},
]


# traverse my_entities, set parents and add those to datastore
for entity in my_entities:
    kind = entity['kind']
    parent_key = None
    if kind == "column":
        parent_key = datastore_client.key("table", entity["table_id"])
    elif kind == "classification":
        parent_key = datastore_client.key("table", entity["table_id"],
                                          "column", entity["column_id"])

    key = datastore_client.key(kind, entity[kind+"_id"],
                               parent=parent_key)
    datastore_entry = datastore.Entity(key)
    datastore_entry.update(entity)

    print("Saving: {}".format(entity))

    datastore_client.put(datastore_entry)

The code above assumes that you’ve set yourself up with a working Service Account or authorised yourself in, and that your GCP project has been set.
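
The my_entities list above is written by hand. In a config-driven setup it would be derived from a table-info.yaml file instead. The post does not show that file's schema, so the nested layout below (a table with a columns list, each column with a classifications list) is purely an assumption, as is the flatten_table_info helper. The dict stands in for whatever a YAML parser would return.

```python
from typing import Any, Dict, List


def flatten_table_info(info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten one nested table definition into table/column/classification records."""
    table_id = info["table_id"]
    table_props = {
        k: v for k, v in info.items() if k not in ("table_id", "columns")
    }
    entities = [{"kind": "table", "table_id": table_id, **table_props}]

    for column in info.get("columns", []):
        column_id = column["column_id"]
        column_props = {
            k: v for k, v in column.items()
            if k not in ("column_id", "classifications")
        }
        entities.append({"kind": "column", "column_id": column_id,
                         "table_id": table_id, **column_props})
        for classification in column.get("classifications", []):
            # Each classification carries the ids needed to build its parent key.
            entities.append({"kind": "classification", **classification,
                             "column_id": column_id, "table_id": table_id})
    return entities


# Hypothetical result of parsing a table-info.yaml file into a dict.
table_info = {
    "table_id": "broker",
    "table_type": "snapshot",
    "columns": [
        {
            "column_id": "broker_short_code",
            "data_type": "string",
            "size": 3,
            "classifications": [
                {"classification_id": "classif_id_REQ_214",
                 "restriction_level": "public", "pii": 0},
            ],
        },
    ],
}

entities = flatten_table_info(table_info)
```

The records come out parents-first, in the same flat shape as my_entities, so the loop above could consume them unchanged.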

Now let’s do some digging around our newly minted Datastore model. Let’s grab the column ‘broker_legal_name’

query1 = datastore_client.query(kind="column")
query1.add_filter("column_id", "=", "broker_legal_name")

Now that we have the column entity, let’s locate its parent id.

column = list(query1.fetch())[0]
print("This column belongs to: " +str(column.key.parent.id_or_name))

Further to this, we can also get all data classification elements attributed to a single column using the ancestor clause query.

query2 = datastore_client.query(kind="classification", ancestor=column.key)
for classification in list(query2.fetch()):
    print(classification.key)
    print(classification["restriction_level"])

For more complex queries, Datastore has the concept of indexes, usually set via its index.yaml configuration. The following is an example of an index.yaml file:

indexes:
- kind: Cat
  ancestor: no
  properties:
  - name: name
  - name: age
    direction: desc

- kind: Cat
  properties:
  - name: name
    direction: asc
  - name: whiskers
    direction: desc

- kind: Store
  ancestor: yes
  properties:
  - name: business
    direction: asc
  - name: owner
    direction: asc

Indexes are important when attempting to add filters on more than one particular attribute within a Datastore entity. For example, the following code will fail:

# Adding a '>' filter will cause this to fail. Sidenote; it will work
# without an index if you add another '=' filter.
query2 = datastore_client.query(kind="classification", ancestor=column.key)
query2.add_filter("pii", ">", 0)
for classification in list(query2.fetch()):
    print(classification.key)
    print(classification["classification_id"])

To rectify this issue, you need to create an index.yaml that looks like the following:

indexes:
- kind: classification
  ancestor: yes
  properties:
  - name: pii

You would usually upload the yaml file using the gcloud commands:

gcloud datastore indexes create path/to/index.yaml

However, let’s do this programmatically.

The official pypi package for google-cloud-datastore can be found here: https://pypi.org/project/google-cloud-datastore/. At the time of writing, Firestore in Datastore-mode will be the way forward, as per the release note from January 31, 2019.

Cloud Firestore is now Generally Available. Cloud Firestore is the new version of Cloud Datastore and includes a backwards-compatible Datastore mode.

If you intend to use the Cloud Datastore API in a new project, use Cloud Firestore in Datastore mode. Existing Cloud Datastore databases will be automatically upgraded to Cloud Firestore in Datastore mode.

Except where noted, the Cloud Datastore documentation now describes behavior for Cloud Firestore in Datastore mode.

We’ve purposefully created MDH in Datastore to show you how it was done originally, and we’ll be migrating the Datastore code to Firestore in an upcoming post.

Creating and deleting indexes within Datastore will need to be done through the REST API via googleapiclient.discovery, as this function doesn’t exist via the google-cloud-datastore API. Working with the discovery api client can be a bit daunting for a first-time user, so here’s the code to add an index on Datastore:

import os
from google.oauth2 import service_account
from googleapiclient.discovery import build
from google.cloud import datastore


SCOPES = ['https://www.googleapis.com/auth/cloud-platform']

SERVICE_ACCOUNT_FILE = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
PROJECT_ID = os.getenv("PROJECT_ID")

# Build credentials from the service account key file
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE, scopes=SCOPES
)

datastore_api = build('datastore', 'v1', credentials=credentials)

body = {
'ancestor': 'ALL_ANCESTORS',
'kind': 'classification',
'properties': [{
'name': 'pii',
'direction': 'DESCENDING'
}]
}

# Parentheses let the chained call span several lines
response = (
    datastore_api.projects()
    .indexes()
    .create(projectId=PROJECT_ID, body=body)
    .execute()
)

How did we craft this API request? We can use the Google API Discovery Service to build client libraries, IDE plugins, and other tools that interact with Google APIs. The Discovery API provides a list of Google APIs and a machine-readable "Discovery Document" for each API. Features of the Discovery API:

  • A directory of supported APIs schemas based on JSON Schema.
  • A machine-readable "Discovery Document" for each of the supported APIs. Each document contains:
      • A list of API methods and available parameters for each method.
      • A list of available OAuth 2.0 scopes.
      • Inline documentation of methods, parameters, and available parameter values.

Navigating to the API reference page for Datastore and going to the ‘Datastore Admin’ API page, we can see references to the Indexes and the RESTful endpoints we can hit for those Indexes. The Discovery document for Datastore is available at:

https://datastore.googleapis.com/$discovery/rest?version=v1

From this, we can build out our instantiation for the google api discovery object build('datastore', 'v1', credentials=credentials)

With respect to building out the body aspect of the request, I’ve found crafting that part within the ‘Try this API’ section of https://cloud.google.com/datastore/docs/reference/admin/rest/v1/projects.indexes/create pretty valuable.

With this code, your index should show up in your Datastore console! You can also retrieve them within gcloud with gcloud datastore indexes list if you’d like to verify the indexes outside our python code. So there you have it: a working example of entity groups, ancestors, indexes and Metadata within Datastore. Have fun coding!

· 2 min read
Jeffrey Aven

Big fan of GitLab (and GitLab CI in particular). I had a recent requirement to push changes to a wiki repo associated with a GitLab project through a GitLab CI pipeline (using the SaaS version of GitLab) and ran into a conundrum…

On the SaaS version of GitLab, deploy tokens can’t have write API access, so the next best solution is to use deploy keys. Adding your public key as a deploy key and granting this key write access to repositories is relatively straightforward.

The issue is that when you attempt to create a masked GitLab CI variable using the private key from your keypair, you get this…

I was a bit astonished to see this to be honest… Looks like it has been raised as an issue several times over the last few years but never resolved (the root cause of which is something to do with newline characters or base64 encoding or the overall length of the string).

I came up with a solution! Not pretty but effective, masks the variable so that it cannot be printed in CI logs as shown here:

Setup

Add a masked and protected GitLab variable for each line in the private key, for example:

The Code

Add the following block to your .gitlab-ci.yml file:

Now, within jobs in your pipeline, you can simply do this to clone, push or pull from a remote GitLab repo:

As mentioned, not pretty, but effective, and there were no cleaner options that I could see…

if you have enjoyed this post, please consider buying me a coffee ☕ to help me keep writing!

· 6 min read
Mark Stella

Recently I found myself at a client that was using a third party tool to scan all their enterprise applications in order to collate their data lineage. They had spent two years onboarding applications to the tool, resulting in a large technical mess that was hard to debug and impossible to extend. As new applications were integrated onto the platform, developers were forced to think of new ways of connecting and transforming the data so it could be consumed.

The general approach was: setup scanner -> scan application -> modify results -> upload results -> backup results -> cleanup workspace -> delete anything older than 'X' days

Each developer had their own style of doing this - involving shell scripts, python scripts, SQL and everything in between. Worse, there were slabs of code replicated across the entire repository, with variables and paths changed depending on the use case.

My task was to create a framework that could orchestrate the scanning and adhere to the following philosophies:

  • DRY (Don't Repeat Yourself)
  • Config driven
  • Version controlled
  • Simple to extend
  • Idempotent

It also had to be written in Python as that was all the client was skilled in.

After looking at what was on the market (Airflow and Prefect being the main contenders) I decided to roll my own simplified orchestrator that required as little actual coding as possible and could be setup by configuration.

In choosing a configuration format, I settled on HOCON as it closely resembles JSON but has advanced features such as interpolation, substitutions and the ability to include other HOCON files - this would drastically reduce the amount of boilerplate configuration required.

Because I had focused so heavily on being configuration driven, I also needed the following characteristics to be delivered:

  • Self discovery of task types (more on this later)
  • Configuration validation at startup

Tasks and self discovery

As I wanted anyone to be able to rapidly extend the framework by adding tasks, I needed to reduce as much repetition and boilerplate as possible. Ideally, I wanted a developer to just have to think about writing code and not have to deal with how to integrate this.

To achieve this, we needed a way of registering new 'tasks' that would become available to the framework. I wanted a developer to simply have to subclass the main Task class and implement a run function - the rest would be taken care of.

from typing import List


class TaskRegistry:

    def __init__(self) -> None:
        self._registry = {}

    def register(self, cls: type) -> None:
        # Use an explicit task_name if the class defines one, else the class name
        n = getattr(cls, 'task_name', cls.__name__).lower()
        self._registry[n] = cls

    def registered(self) -> List[str]:
        return list(self._registry.keys())

    def has(self, name: str) -> bool:
        return name in self._registry

    def get(self, name: str) -> type:
        return self._registry[name]

    def create(self, name: str, *args, **kwargs) -> object:
        try:
            return self._registry[name](*args, **kwargs)
        except KeyError:
            raise ClassNotRegisteredException(name)


registry = TaskRegistry()

Once the registry was instantiated, any new Tasks that inherited from 'Task' would automatically be added to the registry. We could then use the create(name) function to instantiate any class - essentially a pythonic Factory Method

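import logging
from abc import ABC, abstractmethod


class Task(ABC):

    def __init__(self) -> None:
        self.logger = logging.getLogger(self.__class__.__name__)

    def __init_subclass__(cls) -> None:
        # Called once for every subclass definition, so registration is automatic
        registry.register(cls)

    @abstractmethod
    def run(self, **kwargs) -> bool:
        raise NotImplementedError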

For the framework to automatically register the classes, it was important to follow the project structure. As long as the task resided in the 'tasks' module, we could scan this at runtime and register each task.

└── simple_tasker
    ├── __init__.py
    ├── cli.py
    └── tasks
        ├── __init__.py
        ├── archive.py
        └── shell_script.py

This was achieved with a simple dynamic module importer

import glob
from os.path import basename, dirname, isfile, join

modules = glob.glob(join(dirname(__file__), "*.py"))

for f in modules:
    if isfile(f) and not f.endswith("__init__.py"):
        __import__(f"{Task.__module__}.{basename(f)[:-3]}")
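
The same discovery step can also be written with the standard library's pkgutil and importlib instead of globbing for files. This is an alternative sketch, not the framework's code: import_submodules and the throwaway demo_tasks package are invented here so the example runs on its own.

```python
import importlib
import pkgutil
import sys
import tempfile
from pathlib import Path


def import_submodules(package_name: str) -> list:
    """Import every direct submodule of a package and return their names."""
    package = importlib.import_module(package_name)
    imported = []
    for module_info in pkgutil.iter_modules(package.__path__):
        full_name = f"{package_name}.{module_info.name}"
        importlib.import_module(full_name)
        imported.append(full_name)
    return sorted(imported)


# Build a throwaway package on disk so the sketch is self-contained.
workdir = tempfile.mkdtemp()
package_dir = Path(workdir) / "demo_tasks"
package_dir.mkdir()
for filename in ("__init__.py", "archive.py", "shell_script.py"):
    (package_dir / filename).write_text("")
sys.path.insert(0, workdir)
importlib.invalidate_caches()

found = import_submodules("demo_tasks")
print(found)
```

Importing each module is what triggers the subclass registration, so the choice between glob and pkgutil is mostly a matter of taste.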

The configuration

In designing how the configuration would bind to the task, I needed to capture the name (what object to instantiate) and what args to pass to the instantiated object's run function. I decided to model it as below, with everything under a 'tasks' array:

tasks: [
  {
    name: shell_script
    args: {
      script_path: uname
      script_args: -a
    }
  },
  {
    name: shell_script
    args: {
      script_path: find
      script_args: [${CWD}/simple_tasker/tasks, -name, "*.py"]
    }
  },
  {
    name: archive
    args: {
      input_directory_path: ${CWD}/simple_tasker/tasks
      target_file_path: /tmp/${PLATFORM}_${TODAY}.tar.gz
    }
  }
]

Orchestration and validation

As mentioned previously, one of the goals was to ensure the configuration was valid prior to any execution. This meant that the framework needed to validate that the task name referred to a registered task, and that all mandatory arguments were addressed in the configuration. Determining whether the task was registered was a simple key check. Validating the arguments to run required some inspection: I needed to get all args for the run function and filter out 'self' and any asterisk args (*args, **kwargs).

def get_mandatory_args(func) -> List[str]:

    mandatory_args = []
    for k, v in inspect.signature(func).parameters.items():
        if (
            k != "self"
            and v.default is inspect.Parameter.empty
            and not str(v).startswith("*")
        ):
            mandatory_args.append(k)

    return mandatory_args
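
A small variation on the same idea, shown as a self-contained sketch: instead of checking the parameter's string form for a leading asterisk, it compares Parameter.kind against VAR_POSITIONAL and VAR_KEYWORD. The Example class is made up just to exercise the function.

```python
import inspect
from typing import List


def get_mandatory_args(func) -> List[str]:
    """Names of parameters that have no default, skipping self, *args and **kwargs."""
    skip_kinds = (
        inspect.Parameter.VAR_POSITIONAL,  # *args
        inspect.Parameter.VAR_KEYWORD,     # **kwargs
    )
    return [
        name
        for name, param in inspect.signature(func).parameters.items()
        if name != "self"
        and param.default is inspect.Parameter.empty
        and param.kind not in skip_kinds
    ]


class Example:

    def run(self, script_path: str, script_args=None, *extra, **options) -> bool:
        return True


print(get_mandatory_args(Example.run))  # ['script_path']
```

Both versions give the same answer for ordinary signatures; the kind-based check just avoids relying on how a parameter happens to be rendered as text.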

And finally onto the actual execution bit. The main functionality required here is to validate that the config was defined correctly, then loop through all tasks and execute them - passing in any args.

class Tasker:

    def __init__(self, path: Path, env: Dict[str, str] = None) -> None:

        self.logger = logging.getLogger(self.__class__.__name__)
        self._tasks = []

        with wrap_environment(env):
            self._config = ConfigFactory.parse_file(path)


    def __validate_config(self) -> bool:

        error_count = 0

        for task in self._config.get("tasks", []):
            name, args = task["name"].lower(), task.get("args", {})

            if registry.has(name):
                for arg in get_mandatory_args(registry.get(name).run):
                    if arg not in args:
                        print(f"Missing arg '{arg}' for task '{name}'")
                        error_count += 1
            else:
                print(f"Unknown tasks '{name}'")
                error_count += 1

            self._tasks.append((name, args))

        return error_count == 0

    def run(self) -> bool:

        if self.__validate_config():

            for name, args in self._tasks:
                exe = registry.create(name)
                self.logger.info(f"About to execute: '{name}'")
                if not exe.run(**args):
                    self.logger.error(f"Failed tasks '{name}'")
                    return False

            return True
        return False
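
The constructor relies on a wrap_environment helper that is not shown in this post. One plausible shape for it, offered only as an assumption about what it does, is a context manager that temporarily overlays extra variables onto os.environ so that substitutions such as ${TODAY} resolve while the config is parsed, and then restores the original environment:

```python
import os
from contextlib import contextmanager
from typing import Dict, Iterator, Optional


@contextmanager
def wrap_environment(env: Optional[Dict[str, str]] = None) -> Iterator[None]:
    """Temporarily add variables to os.environ, restoring the original on exit."""
    saved = dict(os.environ)
    os.environ.update(env or {})
    try:
        yield
    finally:
        # Put the environment back exactly as it was, even if parsing failed
        os.environ.clear()
        os.environ.update(saved)


with wrap_environment({"SIMPLE_TASKER_DEMO_TODAY": "2020-01-01"}):
    inside = os.environ.get("SIMPLE_TASKER_DEMO_TODAY")

after = os.environ.get("SIMPLE_TASKER_DEMO_TODAY")
```

The variable name SIMPLE_TASKER_DEMO_TODAY is hypothetical and chosen only to avoid clashing with anything real in the environment.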

Putting it together - sample tasks

Below are two examples of how easy it is to configure the framework. We have a simple folder archiver that will tar/gz a directory based on 2 input parameters.

class Archive(Task):

    def __init__(self) -> None:
        super().__init__()

    def run(self, input_directory_path: str, target_file_path: str) -> bool:

        self.logger.info(f"Archiving '{input_directory_path}' to '{target_file_path}'")

        with tarfile.open(target_file_path, "w:gz") as tar:
            tar.add(
                input_directory_path,
                arcname=os.path.basename(input_directory_path)
            )
        return True

A more complex example would be the ability to execute shell scripts (or os functions) by passing in some optional variables and variables that can either be a string or list.

class ShellScript(Task):

    task_name = "shell_script"

    def __init__(self) -> None:
        super().__init__()

    def run(
        self,
        script_path: str,
        script_args: Union[str, List[str]] = None,
        working_directory_path: str = None
    ) -> bool:

        cmd = [script_path]

        if isinstance(script_args, str):
            cmd.append(script_args)
        elif script_args:
            # script_args defaults to None, so only extend when a list was given
            cmd += script_args

        try:

            result = subprocess.check_output(
                cmd,
                stderr=subprocess.STDOUT,
                cwd=working_directory_path
            ).decode("utf-8").splitlines()

            for o in result:
                self.logger.info(o)

        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            self.logger.error(e)
            return False

        return True

You can view the entire implementation here

· 2 min read
Jeffrey Aven

Okta Admin CLI

Identity and Access Management is a critical component of any application or SaaS architecture. I’m currently doing a spike of the Okta solution for an application development project I am on. Okta is a comprehensive solution built on the open OAuth2 and OIDC protocols, as well as supporting more conventional identity federation approaches such as SAML.

Okta has a clean and easy to use web-based Admin interface which can be used to create applications, users, claims, identity providers and more.

During my spike, which was done in a crash and burn test Okta organisation, I had associated my user account with a Microsoft Identity Provider for SSO, and subsequently had issues accessing the Microsoft Account my user was associated with. As a result, I managed to lock myself (the super admin) out of the Okta Admin Console.

Fortunately, prior to doing this I had created an API token for my user. So, I went about looking at ways I could interact with Okta programmatically. My first inclination was to use a simple CLI for Okta to get me out of jail… but I found there wasn’t one that suited. There are, however, a wealth of SDKs for Okta across multiple front-end and back-end oriented programming languages (such as JavaScript, Golang, Python and more).

Being in lockdown and having some free time on my hands, I decided to create a simple open source command line tool which could be used to administer an Okta organisation. The result of this weekend lockdown is okta-admin

[Figure: okta-admin cli]

For this project I used the Golang SDK for Okta, along with the Cobra and Viper Golang packages (used by docker, kubectl and other popular command line utilities). To provide a query interface to JSON response payloads I use GJson.

Will keep adding to this so stay tuned...

Complete source code for this project is available at https://github.com/gammastudios/okta-admin


· 2 min read
Jeffrey Aven

Snowflake

Snowflake allows roles to be assigned to other roles, so when a user is assigned to a role, they may inherit the ability to use countless other roles.

Challenge: recursively enumerate all roles for a given user

One solution would be to create a complex query on the "SNOWFLAKE"."ACCOUNT_USAGE"."GRANTS_TO_ROLES" object.

An easier solution is to use a stored procedure to recurse through grants for a given user and return an ARRAY of roles for that user.
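
Setting Snowflake aside for a moment, the recursion itself can be sketched in plain Python over in-memory grant mappings. The two dictionaries and the roles_for_user function are hypothetical stand-ins for the grants data, not the stored procedure, and the seen set is there so that circular grants cannot cause infinite recursion.

```python
from typing import Dict, List, Set


def roles_for_user(
    user: str,
    user_grants: Dict[str, List[str]],
    role_grants: Dict[str, List[str]],
) -> List[str]:
    """Return every role a user holds, directly or through other roles."""
    seen: Set[str] = set()

    def visit(role: str) -> None:
        if role in seen:
            return
        seen.add(role)
        # Recurse into the roles that have been granted to this role
        for inherited in role_grants.get(role, []):
            visit(inherited)

    for role in user_grants.get(user, []):
        visit(role)
    return sorted(seen)


# Made-up grants, including a cycle between analyst and reader
user_grants = {"alice": ["analyst"]}
role_grants = {
    "analyst": ["reader"],
    "reader": ["public_data", "analyst"],
}

roles = roles_for_user("alice", user_grants, role_grants)
print(roles)  # ['analyst', 'public_data', 'reader']
```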

This is a good programming exercise in tail call recursion (sort of) in JavaScript. Here is the code:

To call the stored proc, execute:

One drawback of stored procedures in Snowflake is that they can only have scalar or array return types and cannot be used directly in a SQL query. However, you can use the table(result_scan(last_query_id())) trick to get around this, as shown below, where we pivot the ARRAY into a record set with the array elements as rows:

IMPORTANT

This query must be the next statement run immediately after the CALL statement and cannot be run again until you run another CALL statement.

More adventures with Snowflake soon!
