Transformation

We already have our data in the storage; now it’s time to transform it.

As stated in the Project design chapter, transformations are configured in the file transformations.json, while the SQL queries live under transformations/{some-folder}/*.sql. Let’s take a look at transformations.json first.

Transformation configuration

This is what a basic transformations.json looks like:


transformations.json

[
  {
    "type": "sql",
    "source": "{some_folder}",
    "id": "{transformation_name}",
    "in_kex": [
      /* List of input kexes*/
    ],
    "out_kex": "{output_kex_name}",
    "in_tables": [
      /* List of input tables */
    ],
    "query_timeout": 600
  }
]

As you should know, [] brackets in JSON denote an array, which means it can hold more than one item. To specify more transformations, simply add another one after the first:


transformations.json

[
  {
    "type": "sql",
    "source": "{some_folder}",
    "id": "{transformation_name}",
    "in_kex": [
      /* List of input kexes*/
    ],
    "out_kex": "{output_kex_name}",
    "in_tables": [
      /* List of input tables */
    ],
    "query_timeout": 600
  },
  {
    "type": "sql",
    "source": "{another_folder}",
    "id": "{another_transformation_name}",
    "in_kex": [
      /* List of input kexes*/
    ],
    "out_kex": "{output_kex_name}",
    "in_tables": [
      /* List of input tables */
    ],
    "query_timeout": 600
  }
]

However, for the sake of this guide, let’s stick with just one transformation.
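
Before committing, the structure described above can be checked with a few lines of Python. This is only a sketch: the helper name check_transformations is hypothetical, and treating the keys from the example as required is an assumption, not a documented Bizzflow schema.

```python
import json

# Keys used in the example above; treating all of them as required is an
# assumption made for this sketch, not a documented Bizzflow rule.
REQUIRED_KEYS = {"type", "source", "id", "in_kex", "in_tables", "out_kex"}


def check_transformations(text):
    """Parse transformations.json content and return a list of problems."""
    config = json.loads(text)
    if not isinstance(config, list):
        return ["top level must be an array of transformations"]
    problems = []
    for index, item in enumerate(config):
        if not isinstance(item, dict):
            problems.append(f"transformation #{index}: must be an object")
            continue
        missing = REQUIRED_KEYS - set(item)
        if missing:
            problems.append(f"transformation #{index}: missing {sorted(missing)}")
    return problems


example = """
[
  {
    "type": "sql",
    "source": "main",
    "id": "main",
    "in_kex": [],
    "in_tables": ["in_ex_mysql_classicmodels.orders"],
    "out_kex": "out_main",
    "query_timeout": 600
  }
]
"""
print(check_transformations(example))  # prints []
```

Note that json.loads rejects the /* ... */ placeholders used in the listings above, so they have to be replaced with real values first.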

Parameters' explanation

type

For the sake of this guide, the transformation type will always be sql. The other possibility is docker, but Docker transformations are not covered by this guide.

source

The transformation source is the name of the sub-directory within the transformations/ directory where the SQL files will be found.

├── transformations
│   └── main
│       ├── 01_orders.sql
│       └── 02_account.sql
└── transformations.json

In case of the file structure above, the name of source would be main.
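
To illustrate how a source name maps to SQL files, here is a minimal sketch that lists the files of a given source. The helper sql_files is hypothetical, and sorting by file name is an assumption suggested by the numeric prefixes, not something this guide states about Bizzflow itself.

```python
from pathlib import Path
import tempfile


def sql_files(project_root, source):
    """Return the *.sql file names in transformations/<source>, sorted by name."""
    folder = Path(project_root) / "transformations" / source
    return sorted(path.name for path in folder.glob("*.sql"))


# Recreate the example layout in a temporary directory.
with tempfile.TemporaryDirectory() as root:
    main = Path(root) / "transformations" / "main"
    main.mkdir(parents=True)
    for name in ("02_account.sql", "01_orders.sql"):
        (main / name).write_text("-- placeholder\n")
    found = sql_files(root, "main")

print(found)  # prints ['01_orders.sql', '02_account.sql']
```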

id

This is the name of the transformation. Please stick to numbers, letters of the English alphabet and underscores. Also, as stated above, it is good practice to keep the name related to the source, and vice versa.
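
The naming recommendation can be expressed as a simple pattern check. This is a sketch of the recommendation only; the function name is_valid_id is hypothetical and Bizzflow itself may accept or reject other names.

```python
import re

# Numbers, English alphabet letters and underscores, at least one character.
ID_PATTERN = re.compile(r"[A-Za-z0-9_]+")


def is_valid_id(name):
    """Return True if the whole name follows the recommended character set."""
    return ID_PATTERN.fullmatch(name) is not None


print(is_valid_id("main"))         # prints True
print(is_valid_id("my-transform")) # prints False
```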

Input

Keys in_kex and in_tables are both arrays that accept an arbitrary list of kexes and tables respectively. You can specify kexes by their names and tables in the form of kex_name.table_name. If you specify a kex, all tables from the kex will be available to the transformation.
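
The way the two keys combine can be sketched as follows. The storage dictionary is a hypothetical stand-in for the real storage (its table list is made up for illustration), and resolve_inputs is not a Bizzflow function.

```python
def resolve_inputs(in_kex, in_tables, storage):
    """Return the sorted list of kex_name.table_name inputs.

    storage maps a kex name to the list of its table names.
    """
    selected = set(in_tables)
    for kex in in_kex:
        # A whole kex contributes every table it contains.
        selected.update(f"{kex}.{table}" for table in storage[kex])
    return sorted(selected)


# Hypothetical storage content, for illustration only.
storage = {"in_ex_mysql_classicmodels": ["customers", "orderdetails", "orders"]}

whole_kex = resolve_inputs(["in_ex_mysql_classicmodels"], [], storage)
two_tables = resolve_inputs(
    [],
    ["in_ex_mysql_classicmodels.orders", "in_ex_mysql_classicmodels.orderdetails"],
    storage,
)
print(len(whole_kex), len(two_tables))  # prints 3 2
```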

Output

The out_kex parameter specifies the name of the output kex. All tables prefixed with out_ will be copied to this kex after the transformation ends.
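
A small sketch of the selection rule: only tables whose names start with out_ are picked. Dropping the prefix in the output name is based on the result shown at the end of this chapter, where out_order_lines ends up as order_lines; the helper output_tables is hypothetical.

```python
def output_tables(tables, prefix="out_"):
    """Map each out_-prefixed table to its name in the output kex."""
    return {name: name[len(prefix):] for name in tables if name.startswith(prefix)}


tables = ["in_orders", "cln_orders", "out_order_lines"]
print(output_tables(tables))  # prints {'out_order_lines': 'order_lines'}
```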

Query timeout

So far, this is a BigQuery-specific setting that specifies the maximum number of seconds to wait for a query result. You can ignore it for now; the default of 600 means a query may run for at most 10 minutes.

Putting it together

Let’s call our transformation main, since there will be only one, and name our source main as well to keep things clean. As input, let’s take only the tables orders and orderdetails from our classicmodels extractor. Third time is the charm, so let’s base the name of our output kex on main as well: out_main.


transformations.json

[
  {
    "type": "sql",
    "source": "main",
    "id": "main",
    "in_kex": [],
    "in_tables": [
      "in_ex_mysql_classicmodels.orders",
      "in_ex_mysql_classicmodels.orderdetails"
    ],
    "out_kex": "out_main",
    "query_timeout": 600
  }
]

To actually see the transformation, do not forget to run the 90_update_project DAG again.

Airflow transformation

Writing the SQLs

We cannot run the transformation yet, because we haven’t actually written any SQL files (the transformations/main/ directory is still empty).

Setting the sandbox up

The best way to develop SQL queries on your data is to load the transformation into a sandbox. A sandbox is an exact replica of the transformation’s input data with accessible credentials, so you can connect to it and run queries before committing them to the repository. Let’s open the Sandbox console (Consoles -> Sandbox console). You should see this message showing your e-mail address:

Sandbox console access

We need to declare that we are allowed to use the sandbox. Open sandbox.json in your Bizzflow project repository. It should contain only an empty array, [].


sandbox.json

[]

Let’s put our e-mail in it.


sandbox.json

["tomas.votava@bizztreat.com"]

Commit the changes and run 90_update_project again. You should now see two more DAGs in your Airflow UI:

Airflow with sandbox DAGs

By default, sandbox names consist of the dev_ prefix followed by the first three letters of your first name and the first three letters of your last name (dev_tomvot for me). The 80_Sandbox_main_dev_tomvot DAG runs your transformation in sandbox mode (it loads the input data and runs the SQL files, but does not write to the output kex). The 80_Sandbox_main_dev_tomvot_load DAG only loads your data into the sandbox, leaving them in their original state. As we currently have no SQL files, the first one is not what we need; the second one is. Trigger the load DAG right now.
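
The naming rule can be sketched in a few lines. Both helpers are hypothetical; lowercasing the letters and the exact DAG name pattern are assumptions inferred from the dev_tomvot example, and how names with diacritics or fewer than three letters are handled is not covered here.

```python
def sandbox_name(first_name, last_name):
    """Build the default sandbox name: dev_ + 3 letters + 3 letters."""
    return "dev_" + first_name[:3].lower() + last_name[:3].lower()


def sandbox_dags(transformation_id, sandbox):
    """Return the (run, load) DAG names following the pattern seen above."""
    run = f"80_Sandbox_{transformation_id}_{sandbox}"
    return run, run + "_load"


name = sandbox_name("Tomas", "Votava")
print(name)  # prints dev_tomvot
for dag in sandbox_dags("main", name):
    print(dag)
```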

Sandbox credentials

You should now be able to access the Sandbox console. Try it once again using Consoles -> Sandbox.

Sandbox console

Click the Download credentials... button. A file named dev_your_sandbox.json should be downloaded to your computer.


dev_your_sandbox.json

{
  "type": "service_account",
  "project_id": "bizzflow-r2-training",
  "private_key_id": "private_key_id",
  "private_key": "private_key",
  "client_email": "dev-tomvot@bizzflow-r2-training.iam.gserviceaccount.com",
  "client_id": "client_id",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/dev-tomvot%40bizzflow-r2-training.iam.gserviceaccount.com"
}

Never share your sandbox credentials with anyone. They can be used to access project data.

Let’s connect to the sandbox.

Connecting to sandbox

In DBeaver, select Create connection and pick Google BigQuery from the list of databases.

DBeaver BigQuery

As the Project, specify the project_id from the credentials file downloaded above. In Service account, specify the client_email. Lastly, browse for the key path and select the downloaded credentials file.

DBeaver Bigquery settings

Confirm everything using the Finish button.
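
If you prefer not to open the credentials file by hand, the two values DBeaver asks for can be read with a short script. This is a sketch: dbeaver_fields is a hypothetical helper, and the sample below contains only the two relevant keys with the values from the listing above. With a real file, pass the file content instead and keep it private.

```python
import json


def dbeaver_fields(credentials_text):
    """Pick the values for DBeaver's Project and Service account fields."""
    creds = json.loads(credentials_text)
    return {
        "Project": creds["project_id"],
        "Service account": creds["client_email"],
    }


sample = """
{
  "type": "service_account",
  "project_id": "bizzflow-r2-training",
  "client_email": "dev-tomvot@bizzflow-r2-training.iam.gserviceaccount.com"
}
"""
for label, value in dbeaver_fields(sample).items():
    print(f"{label}: {value}")
```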

Looking around

You should now be able to display tables you have in your sandbox.

Sandbox tables

Just like we specified in in_tables in the transformations.json file, there are only two tables in our transformation sandbox. Both tables have the in_ prefix, so it is easy to tell the tables that came in as input for our transformation from the ones created within the transformation itself. Press F3 to open a new SQL file and type this:

SELECT * FROM `dev_tomvot`.`in_orders`;

Replace dev_tomvot with the name of your sandbox.

Now press Alt + X and you should see the content of the in_orders table.

DBeaver result

The queries

We could spend days cleaning the data and getting them ready for processing, but this is not the goal of this guide. Let’s just say we do not like the null on the tenth line in shippeddate column, so let’s correct it:

SELECT
    `ordernumber`,
    `orderdate`,
    `requireddate`,
    CASE
        WHEN `shippeddate` = '' THEN '1970-01-01'
        ELSE `shippeddate`
    END as `shippeddate`,
    `status`
FROM
    `dev_tomvot`.`in_orders`;

In order to actually have the result of the query in our transformation, let’s persist the SELECT as a table:

CREATE OR REPLACE TABLE `dev_tomvot`.`cln_orders` AS
SELECT
    `ordernumber`,
    `orderdate`,
    `requireddate`,
    CASE
        WHEN `shippeddate` = '' THEN '1970-01-01'
        ELSE `shippeddate`
    END as `shippeddate`,
    `status`
FROM
    `dev_tomvot`.`in_orders`;

From now on, we should have a cln_orders table containing our cleaned input data. Let’s do the same with orderdetails:

CREATE OR REPLACE TABLE `dev_tomvot`.`cln_orderdetails` AS
SELECT
    `ordernumber`,
    `productcode`,
    `quantityordered`,
    `priceeach`,
    `orderlinenumber`,
    CAST(`quantityordered` as INT64) * CAST(`priceeach` AS FLOAT64) as `total_price`
FROM
    `dev_tomvot`.`in_orderdetails`
;
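
To make the total_price arithmetic concrete, here is the same computation in Python. It assumes, as the CAST calls suggest, that both columns arrive as text; the input values are illustrative and total_price is a hypothetical helper.

```python
def total_price(quantityordered, priceeach):
    """Mirror CAST(quantityordered AS INT64) * CAST(priceeach AS FLOAT64)."""
    return int(quantityordered) * float(priceeach)


print(total_price("30", "136.00"))  # prints 4080.0
```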

Now let’s produce a single table containing all of our desired data:

CREATE OR REPLACE TABLE `dev_tomvot`.`out_order_lines` AS
SELECT
    d.`orderlinenumber`,
    o.`ordernumber`,
    o.`orderdate`,
    o.`shippeddate`,
    d.`productcode`,
    d.`quantityordered`,
    d.`total_price`,
    d.`priceeach`
FROM `dev_tomvot`.`cln_orders` o
LEFT JOIN `dev_tomvot`.`cln_orderdetails` d ON o.`ordernumber` = d.`ordernumber`
;

Saving the queries

Let’s put our resulting queries into our Bizzflow project repository in two files. Note that the saved queries reference tr instead of the sandbox name dev_tomvot:


transformations/main/01_clean.sql

CREATE OR REPLACE TABLE `tr`.`cln_orders` AS
SELECT
    `ordernumber`,
    `orderdate`,
    `requireddate`,
    CASE
        WHEN `shippeddate` = '' THEN '1970-01-01'
        ELSE `shippeddate`
    END as `shippeddate`,
    `status`
FROM
    `tr`.`in_orders`;

CREATE OR REPLACE TABLE `tr`.`cln_orderdetails` AS
SELECT
    `ordernumber`,
    `productcode`,
    `quantityordered`,
    `priceeach`,
    `orderlinenumber`,
    CAST(`quantityordered` as INT64) * CAST(`priceeach` AS FLOAT64) as `total_price`
FROM
    `tr`.`in_orderdetails`
;

transformations/main/02_process.sql

CREATE OR REPLACE TABLE `tr`.`out_order_lines` AS
SELECT
    d.`orderlinenumber`,
    o.`ordernumber`,
    o.`orderdate`,
    o.`shippeddate`,
    d.`productcode`,
    d.`quantityordered`,
    d.`total_price`,
    d.`priceeach`
FROM `tr`.`cln_orders` o
LEFT JOIN `tr`.`cln_orderdetails` d ON o.`ordernumber` = d.`ordernumber`
;
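
The only difference between the sandbox queries and the saved files is the dataset name. A minimal sketch of that substitution, assuming every reference is written as a backtick-quoted name followed by a dot (the helper to_repository_sql is hypothetical):

```python
def to_repository_sql(query, sandbox):
    """Replace backtick-quoted references to the sandbox with `tr`."""
    return query.replace(f"`{sandbox}`.", "`tr`.")


sandbox_query = "SELECT * FROM `dev_tomvot`.`in_orders`;"
print(to_repository_sql(sandbox_query, "dev_tomvot"))
# prints SELECT * FROM `tr`.`in_orders`;
```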

Now you can commit all changes and run 90_update_project DAG to refresh our queries. It’s time to run the transformation.

Running the transformation

Our queries are ready and the project has been updated, so let’s run the transformation.

Airflow transformation

Run the 40_Transformation_main DAG. After it has finished, look into the Storage console. After clicking the Refresh kexes button, you should see a new kex named out_main appear, containing a single table, order_lines.

Storage output

Easy-peasy, am I right?

So that was our first transformation. Go get yourself a steaming hot chocolate ☕ and then continue to the next chapter.