Transformations

A transformation is a manipulation of data already stored in the Bizzflow storage (the analytical warehouse). Here you will read about how to create SQL and Python transformations in Bizzflow. Within your data pipeline, a transformation expects to receive some input data, and Bizzflow expects the transformation to produce some output data in return.

Once there is actual data in the warehouse, you are free to work with it however you want. However, there are a few restrictions that will help you keep your project clean and accessible by enforcing some best practices.

Transformation Configuration

In order to create a transformation in your data pipeline, you need to specify the transformation metadata in transformations.json (or transformations.yaml) and then provide Bizzflow with the files that will perform the actual data transformation. Think of it this way: in transformations.json you specify which data you want to transform and where to put the outputs. In the /transformations/ directory you specify what to do with the data being transformed.

In short, always specify the transformation in transformations.json (or YAML) and then put your SQL and Python files to /transformations/ directory.
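For instance, a project with a single SQL transformation named example might be laid out like this (a minimal sketch; we assume transformations.json lives in the project root next to the /transformations/ directory):

transformations.json
transformations/
  example/
    00_init.sql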

Transformations Reference

No matter which warehouse your Bizzflow project runs on, the following keys are always applicable:

| Key | Type | Description |
| --- | --- | --- |
| id | string | required, ID for your transformation (must be unique, max 32 characters) |
| type | string | required, possible values: sql or docker |
| source | string | required, the name of the subdirectory containing transformation files |
| out_kex | string | required, name of the kex that will hold the transformation output |
| input_tables* | array of strings | list of tables (including kex names) to be present in the transformation |
| input_kexes* | array of strings | list of kexes to be present in the transformation |

* both input_tables and input_kexes may be present. In this case, a union of input_tables and tables contained in input_kexes is loaded into the transformation.

Additionally, there are keys that are applicable only to some of the warehouses currently supported by Bizzflow:

| Key | Supported Warehouses | Type | Description |
| --- | --- | --- | --- |
| query_timeout | bigquery, snowflake | integer | how long (in seconds) to wait for an SQL query to execute |
| transformation_service_account | bigquery | string | deprecated, name of the service account to create for the transformation |
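For example, to abort a long-running query after ten minutes on BigQuery or Snowflake, you could add query_timeout to the transformation entry (a sketch; the 600-second value is purely illustrative):

[
  {
    "id": "example",
    "type": "sql",
    "source": "example",
    "out_kex": "out_example",
    "input_kexes": ["in_mysql"],
    "query_timeout": 600
  }
]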

Example 1: A simple SQL transformation

Let’s pretend we already have tables in two kexes:

  • in_hubspot
    • accounts
    • deals
  • in_mysql
    • invoices

We want to load tables accounts and invoices and transform them in an SQL transformation.

transformations.json

[
  {
    "id": "example",
    "type": "sql",
    "source": "example",
    "out_kex": "out_example",
    "input_tables": ["in_hubspot.accounts"],
    "input_kexes": ["in_mysql"]
  }
]

Notice that we used both input_tables and input_kexes here. We refer to our table accounts within the kex in_hubspot by its full name in_hubspot.accounts. Since the kex in_mysql only contains one table, we can load it whole.

The same configuration in YAML:

transformations.yaml

- id: example
  type: sql
  source: example
  out_kex: out_example
  input_tables:
    - in_hubspot.accounts
  input_kexes:
    - in_mysql

The transformation configuration is set; now we need to create the transformation’s content. Create a directory example under the /transformations/ directory and a single file 00_init.sql inside it.

/transformations/example/00_init.sql

Let’s create a table that will contain an account name for each invoice based on the account’s VAT number. We are using the BigQuery SQL dialect here, but the implementation for any warehouse would be very similar.

CREATE TABLE `tr`.`out_invoices` AS
SELECT
  i.*,
  a.name as account_name
FROM `tr`.`in_invoices` i
INNER JOIN `tr`.`in_accounts` a ON a.vat_number = i.vat_number

You may notice quite a few things here:

The tr namespace

Since BigQuery doesn’t allow selecting a default dataset, you need to specify tr when referring to any table. Bizzflow will translate the name tr to the temporary transformation dataset in which the transformation actually takes place.

The in_ prefix

All tables that Bizzflow loads into the temporary transformation dataset for you will be prefixed with in_; do not forget that when referring to them.

The out_ prefix

All tables you want Bizzflow to output to the kex specified in out_kex after the transformation need to have the out_ prefix. This allows you to create arbitrary temporary tables without needing to take care of what happens to them.
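For instance, any staging table you create without the out_ prefix is simply discarded when the transformation finishes (a sketch in the BigQuery dialect used above; the table names are illustrative):

-- tmp_valid has no out_ prefix, so Bizzflow discards it after the run
CREATE TABLE `tr`.`tmp_valid` AS
SELECT * FROM `tr`.`in_invoices` WHERE vat_number IS NOT NULL;

-- only out_valid_invoices is exported to the kex given in out_kex
CREATE TABLE `tr`.`out_valid_invoices` AS
SELECT * FROM `tr`.`tmp_valid`;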

Example 2: A simple Python transformation

Let’s create a similar solution using Python instead.

transformations.json

[
  {
    "id": "example",
    "type": "docker",
    "source": "example",
    "out_kex": "out_example",
    "input_tables": ["in_hubspot.accounts"],
    "input_kexes": ["in_mysql"]
  }
]

Notice that the only thing that changed here is type. The same configuration in YAML:

transformations.yaml

- id: example
  type: docker
  source: example
  out_kex: out_example
  input_tables:
    - in_hubspot.accounts
  input_kexes:
    - in_mysql

Next, we need to create the example directory under /transformations/ again, but this time we need two separate files:

/transformations/example/main.py

We will only copy the contents of the input table accounts into an output table called copied_accounts.

import csv

# input tables arrive as UNIX-dialect CSV files in /data/in/tables/
with open("/data/in/tables/accounts.csv", newline="") as finput:
    # the output file must be opened for writing; anything placed in
    # /data/out/tables/ will be loaded into the out_kex after the run
    with open("/data/out/tables/copied_accounts.csv", "w", newline="") as foutput:
        reader = csv.DictReader(finput, dialect=csv.unix_dialect)
        writer = csv.DictWriter(foutput, fieldnames=reader.fieldnames, dialect=csv.unix_dialect)
        writer.writeheader()
        writer.writerows(reader)

In order to run this Python transformation, we need to tell Bizzflow how to build it into a container.

/transformations/example/Dockerfile

FROM python:3.9-slim
WORKDIR /code
COPY main.py .
CMD ["python", "-u", "main.py"]
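If your transformation needs third-party packages, they can be installed while the image is built. A sketch, assuming you keep a requirements.txt next to main.py (neither the file nor its contents are part of the example above):

FROM python:3.9-slim
WORKDIR /code
# install dependencies first so Docker can cache this layer between builds
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
CMD ["python", "-u", "main.py"]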

Again, let’s summarize what we have just seen:

Input tables

All input tables are loaded into the /data/in/tables/ directory as individual CSV files conforming to the UNIX dialect.
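Because this layout is fixed, a script can discover its inputs without any extra configuration. A minimal sketch that lists every input table and its columns:

import csv
from pathlib import Path

# every input table is one UNIX-dialect CSV file in /data/in/tables/
for path in Path("/data/in/tables").glob("*.csv"):
    with open(path, newline="") as f:
        reader = csv.DictReader(f, dialect=csv.unix_dialect)
        print(path.stem, reader.fieldnames)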

Output tables

All CSV files you put into the /data/out/tables/ directory will be loaded into the analytical warehouse, into the kex specified in the out_kex configuration.

Transformation Dockerfile

Since the transformation is actually a Docker container, you may use any language that feels right for the use case. Just always remember to read your input tables from /data/in/tables and write your output tables to /data/out/tables.

Sandbox

If you need to test your SQL transformation in a safe environment, always use sandboxes. You can create one using the Sandbox console. You should always dry run your transformation before running it live.