Transformations
A transformation is a manipulation of data that is already in Bizzflow storage (the analytical warehouse). Here you will read about how to create SQL and Python transformations in Bizzflow. Within your data pipeline, a transformation expects to receive some input data, and Bizzflow expects the transformation to produce some output data in return.
Once there are actual data in the warehouse, you are free to work with them as you want. There are, however, a few restrictions that will help you keep your project clean and accessible by enforcing some best practices.
Transformation Configuration
In order to create a transformation in your data pipeline, you need to specify the transformation metadata in transformations.json (or transformations.yaml) and then provide Bizzflow with the files that will process the actual data transformation. Think of it this way: in transformations.json you specify which data you want to transform and where to put the outputs. In the /transformations/ directory you specify what to do with the data being transformed.
In short, always specify the transformation in transformations.json (or YAML) and then put your SQL and Python files into the /transformations/ directory.
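For orientation, a minimal repository layout for one SQL transformation might look like the sketch below; the project-root location of transformations.json and the transformation name example are only assumptions for illustration.

/transformations.json
/transformations/
    example/
        00_init.sql
        10_process.sql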
SQL files in your source directory are always executed in alphabetical order. We recommend prefixing the file names with a fixed-length numeric prefix, such as:
- 00_init_deals.sql
- 10_init_accounts.sql
- 20_process_deals.sql
- 30_process_accounts.sql
Transformations Reference
No matter which warehouse your Bizzflow is running on, the following keys are always applicable:
Key | Type | Description |
---|---|---|
id | string | required, id for your transformation (must be unique, max 32 characters) |
type | string | required, possible values: sql or docker |
source | string | required, the name of the subdirectory containing transformation files |
out_kex | string | required, name of the kex that will hold transformation output |
input_tables* | array of strings | list of tables (including kex names) to be present in the transformation |
input_kexes* | array of strings | list of kexes to be present in the transformation |
* both input_tables and input_kexes may be present. In this case, a union of input_tables and the tables contained in input_kexes is loaded into the transformation.
Additionally, there are keys that are applicable only to some of the warehouses currently supported by Bizzflow:
Key | Supported Warehouses | Type | Description |
---|---|---|---|
query_timeout | bigquery, snowflake | integer | how long (in seconds) to wait for the SQL query to execute |
transformation_service_account | bigquery | string | deprecated, name of the service account to create for transformation |
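For illustration, a hypothetical configuration for a transformation running on BigQuery or Snowflake could combine the common keys with query_timeout; all values below are assumptions made up for the sketch.

[
    {
        "id": "example_slow",
        "type": "sql",
        "source": "example_slow",
        "out_kex": "out_example_slow",
        "input_kexes": ["in_mysql"],
        "query_timeout": 3600
    }
]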
Example 1: A simple SQL transformation
Let’s pretend we already have tables in two kexes:
- in_hubspot
  - accounts
  - deals
- in_mysql
  - invoices
We want to load tables accounts and invoices and transform them in an SQL transformation.
transformations.json
[
    {
        "id": "example",
        "type": "sql",
        "source": "example",
        "out_kex": "out_example",
        "input_tables": ["in_hubspot.accounts"],
        "input_kexes": ["in_mysql"]
    }
]
Notice that we used both input_tables and input_kexes here. We refer to our table accounts within kex in_hubspot by its full name in_hubspot.accounts. Since kex in_mysql only contains one table, we can load it whole.
The same configuration in YAML:
transformations.yaml
- id: example
  type: sql
  source: example
  out_kex: out_example
  input_tables:
    - in_hubspot.accounts
  input_kexes:
    - in_mysql
The transformation configuration is set; now we need to create the transformation’s content. Create a directory example under the /transformations/ directory and a single file 00_init.sql.
/transformations/example/00_init.sql
Let’s create a table that will contain the account name for each invoice, matched on the account’s VAT number. We are using the BigQuery SQL dialect here, but the implementation for any warehouse would be very similar.
CREATE TABLE `tr`.`out_invoices` AS
SELECT
    i.*,
    a.name AS account_name
FROM `tr`.`in_invoices` i
INNER JOIN `tr`.`in_accounts` a ON a.vat_number = i.vat_number
Avoid using * in SELECT statements. We only use * in examples so that they are more readable.
You may notice quite a few things here:
The tr namespace
Since BigQuery doesn’t allow selecting a default dataset, you need to specify tr when referring to any table. Bizzflow will translate the name tr to the temporary transformation dataset that the transformation is actually taking place within.
The in_ prefix
All tables that Bizzflow loads into the temporary transformation dataset for you will be prefixed with in_; do not forget that when referring to them.
The out_ prefix
All tables you want Bizzflow to output to the out_kex kex after the transformation need to have out_ as a prefix. This allows you to create arbitrary temporary tables without needing to take care of what happens to them.
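To illustrate the out_ prefix rule, a sketch of a possible follow-up SQL step (for example in a second file such as 10_process.sql) is shown below. The column names and the amount filter are assumptions made up for the example; only the out_-prefixed table would end up in out_example, while the tmp_ table stays in the temporary dataset and is discarded.

-- A helper table without the out_ prefix: it lives only in the temporary
-- transformation dataset and is not copied anywhere after the run.
CREATE TABLE `tr`.`tmp_large_invoices` AS
SELECT
    i.invoice_id,      -- hypothetical column
    i.vat_number,
    i.account_name
FROM `tr`.`out_invoices` i
WHERE i.amount > 1000; -- hypothetical column

-- Only tables prefixed with out_ are written to the kex given in out_kex.
CREATE TABLE `tr`.`out_large_invoices` AS
SELECT
    t.invoice_id,
    t.vat_number,
    t.account_name
FROM `tr`.`tmp_large_invoices` t;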
Example 2: A simple Python transformation
Let’s create a similar solution using Python instead.
transformations.json
[
    {
        "id": "example",
        "type": "docker",
        "source": "example",
        "out_kex": "out_example",
        "input_tables": ["in_hubspot.accounts"],
        "input_kexes": ["in_mysql"]
    }
]
Notice the only thing that changed here is type. It is the same in YAML:
transformations.yaml
- id: example
  type: docker
  source: example
  out_kex: out_example
  input_tables:
    - in_hubspot.accounts
  input_kexes:
    - in_mysql
Next, we need to create the example directory under /transformations/ again, but this time we will need to create two separate files:
/transformations/example/main.py
We will only copy the contents of the input table accounts into the output table copied_accounts.
import csv

# Read the input table and write it unchanged to the output table.
with open("/data/in/tables/accounts.csv", newline="") as finput:
    with open("/data/out/tables/copied_accounts.csv", "w", newline="") as foutput:
        reader = csv.DictReader(finput, dialect=csv.unix_dialect)
        writer = csv.DictWriter(foutput, fieldnames=reader.fieldnames, dialect=csv.unix_dialect)
        writer.writeheader()
        writer.writerows(reader)
In order to run this Python transformation, we need to tell Bizzflow how to make it into a container.
/transformations/example/Dockerfile
FROM python:3.9-slim
WORKDIR /code
COPY main.py .
CMD ["python", "-u", "main.py"]
Again, let’s summarize what we have just seen:
Input tables
All input tables are loaded into the /data/in/tables/ directory as individual CSV files conforming to the UNIX dialect.
Output tables
All CSV files you put into the /data/out/tables/ directory will be loaded into the analytical warehouse, into the kex specified in the out_kex configuration.
Transformation Dockerfile
Since the transformation is actually a Docker container, you may use any language that feels right for the use case. Just always remember to find your input tables in /data/in/tables and put your output tables into /data/out/tables.
Sandbox
If you need to test your SQL transformation in a safe environment, always use sandboxes. You can create one using the Sandbox console. You should always dry run your transformation before running it live.