
Jeffrey Aven · 2 min read

This is a simple routine to generate random data with a configurable number of records, key fields and non-key fields, to be used to create synthetic data for source change data capture (CDC) processing. The output includes an initial directory containing CSV files representing an initial data load, and an incremental directory containing CSV files representing incremental data.

Spark Training Courses from the AlphaZetta Academy

Data Transformation and Analysis Using Apache Spark
Stream and Event Processing using Apache Spark
Advanced Analytics Using Apache Spark

Arguments (by position) include:

  • no_init_recs : the number of initial records to generate
  • no_incr_recs : the number of incremental records to generate on the second run – should be >= no_init_recs
  • no_keys : number of key columns in the dataset – keys are generated as UUIDs
  • no_nonkeys : number of non-key columns in the dataset – non-key values are generated as random numbers
  • pct_del : percentage of initial records deleted on the second run – between 0.0 and 1.0
  • pct_upd : percentage of initial records updated on the second run – between 0.0 and 1.0
  • pct_unchanged : percentage of records unchanged on the second run – between 0.0 and 1.0
  • initial_output : folder for initial output in CSV format
  • incremental_output : folder for incremental output in CSV format

NOTE : pct_del + pct_upd + pct_unchanged must equal 1.0
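
As a rough sketch of how the positional arguments and this constraint might be handled (the variable names simply mirror the list above; the actual script's parsing may differ):

import sys

# positional arguments, in the order listed above (illustrative parsing only)
no_init_recs, no_incr_recs, no_keys, no_nonkeys = [int(a) for a in sys.argv[1:5]]
pct_del, pct_upd, pct_unchanged = [float(a) for a in sys.argv[5:8]]
initial_output, incremental_output = sys.argv[8:10]

# pct_del + pct_upd + pct_unchanged must equal 1.0
if abs((pct_del + pct_upd + pct_unchanged) - 1.0) > 1e-9:
    raise ValueError("pct_del + pct_upd + pct_unchanged must equal 1.0")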

Example usage:

$ spark-submit synthetic-cdc-data-generator.py 100000 100000 2 3 0.2 0.4 0.4 data/day1 data/day2

Example output from the day1 run for the above configuration would contain two UUID key columns and three random-number non-key columns for each record.

Note that this routine can be run multiple times, producing different key and non-key values each time, as the keys are UUIDs and the values are random numbers.
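
A minimal sketch of how rows like this can be produced with PySpark is shown below – the column names are illustrative rather than the generator's actual names, and Spark's built-in uuid() and rand() functions are non-deterministic, which is why every run yields new values:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, rand

spark = SparkSession.builder.appName("synthetic-data-sketch").getOrCreate()

# 2 UUID key columns and 3 random-number non-key columns,
# matching the example run above
df = (spark.range(100000)
      .withColumn("key_1", expr("uuid()"))
      .withColumn("key_2", expr("uuid()"))
      .withColumn("val_1", rand())
      .withColumn("val_2", rand())
      .withColumn("val_3", rand())
      .drop("id"))

df.write.mode("overwrite").option("header", "true").csv("data/day1")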

We will use this application to generate random input data to demonstrate CDC using Spark in a subsequent post – see you soon!

Full source code can be found at: https://github.com/avensolutions/synthetic-cdc-data-generator

If you have enjoyed this post, please consider buying me a coffee ☕ to help me keep writing!

Jeffrey Aven · 3 min read

Spark SQL ETL Framework

Most traditional data warehouse or data mart ETL routines consist of multi-stage SQL transformations, often a series of CTAS (CREATE TABLE AS SELECT) statements creating transient or temporary tables (such as volatile tables in Teradata), or Common Table Expressions (CTEs).

The initial challenge when moving from a SQL/MPP-based ETL framework built on Oracle, Teradata, SQL Server, etc. to a Spark-based ETL framework is what to do with this…

Multi Stage SQL Based ETL

One approach is to use the lightweight, configuration-driven, multi-stage Spark SQL based ETL framework described in this post.

This framework is driven by a YAML configuration document. YAML was preferred over JSON as a document format because it allows for multi-line statements (SQL statements), as well as comments – which are very useful, as SQL can sometimes be undecipherable even to the person who wrote it.

The YAML config document has three main sections: sources, transforms and targets.

Sources

The sources section is used to configure the input data source(s), including optional column and row filters. In this case the data sources are tables available in the Spark catalog (for instance the AWS Glue Catalog or a Hive Metastore); this could easily be extended to read from other data sources using the Spark DataFrameReader API.
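
For illustration, a sources section might look something like the following – the key names here are assumptions for this sketch, not necessarily the exact schema used by the framework:

sources:
  customers:                              # registered as a temporary view named "customers"
    table: sales_db.customers             # table in the Spark catalog (e.g. Glue Catalog or Hive Metastore)
    columns: [customer_id, name, region]  # optional column filter
    filter: "region = 'APAC'"             # optional row filter
  orders:
    table: sales_db.orders
    columns: [order_id, customer_id, amount]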

Transforms

The transforms section contains the multiple SQL statements to be run in sequence, where each statement creates a temporary view using objects created by preceding statements.
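
Continuing the illustrative config above, each transform could pair a view name with a multi-line SQL statement – YAML block scalars handle the multi-line SQL and comments mentioned earlier:

transforms:
  - name: iv_customer_orders
    sql: |
      -- join the source views registered from the sources section
      SELECT c.customer_id, c.region, o.order_id, o.amount
      FROM customers c
      JOIN orders o ON o.customer_id = c.customer_id
  - name: fv_orders_by_region
    sql: |
      -- aggregate the intermediate view created by the previous statement
      SELECT region, SUM(amount) AS total_amount
      FROM iv_customer_orders
      GROUP BY region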

Targets

Finally, the targets section writes out the final object or objects to a specified destination (S3, HDFS, etc.).
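
A matching targets section might write the final view out as Parquet (again, the key names are illustrative):

targets:
  - input: fv_orders_by_region            # final view produced by the transforms
    format: parquet
    path: s3://my-bucket/curated/orders_by_region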

Process SQL Statements

The process_sql_statements.py script that is used to execute the framework is very simple (30 lines of code, not including comments). It loads the sources into Spark DataFrames and creates temporary views so these datasets can be referenced in the transforms section, then sequentially executes the SQL statements in the list of transforms. Lastly, the script writes out the final view or views to the desired destination – in this case Parquet files stored in S3 were used as the target.
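
The following is a rough sketch of that kind of logic, written against the illustrative config structure shown above (the config keys are assumptions for this sketch, not a copy of the script in the repository):

import sys

import yaml
from pyspark.sql import SparkSession

with open(sys.argv[1]) as f:
    config = yaml.safe_load(f)

spark = SparkSession.builder.appName("process_sql_statements").getOrCreate()

# sources: load each catalog table, apply optional filters, register a temp view
for view_name, src in config["sources"].items():
    df = spark.table(src["table"])
    if "columns" in src:
        df = df.select(*src["columns"])
    if "filter" in src:
        df = df.filter(src["filter"])
    df.createOrReplaceTempView(view_name)

# transforms: run each SQL statement in sequence, registering its result
# as a temporary view so later statements can reference it
for transform in config["transforms"]:
    spark.sql(transform["sql"]).createOrReplaceTempView(transform["name"])

# targets: write the final view(s) to the specified destination
for target in config["targets"]:
    spark.table(target["input"]).write.mode("overwrite") \
        .format(target["format"]).save(target["path"])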

You could implement an object naming convention such as prefixing object names with sv_, iv_ and fv_ (for source view, intermediate view and final view respectively) if this helps you distinguish between the different objects.

To use this framework you would simply use spark-submit as follows:

spark-submit process_sql_statements.py config.yml

Full source code can be found at: https://github.com/avensolutions/spark-sql-etl-framework

If you have enjoyed this post, please consider buying me a coffee ☕ to help me keep writing!