Orchestrations
Created: May 20, 2021, Updated: April 17, 2024
Orchestrations are a way to schedule and automate data pipeline tasks.
Orchestrations Configuration
Orchestrations are configured in the orchestrations.json (or orchestrations.yaml) file, which contains an array of configuration objects describing orchestrations and each of their tasks.
Orchestrations Reference
Key | Type | Description |
---|---|---|
id | string | required, a unique orchestration name (max 33 characters) |
schedule | string | See below |
tasks | list of task objects | See below |
notify | bool | Notify on error (defaults to true in orchestrations with schedule) |
Scheduling
Cron notation
The schedule key is a string in crontab notation and is used to schedule the orchestration to run periodically. If the schedule key is not present, the orchestration will not be scheduled and can only be triggered manually.
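For example, a schedule that runs the orchestration every day at 03:00 would look like this (the cron expression itself is illustrative):
"schedule": "0 3 * * *"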
Airflow Dataset
The schedule key can also be a JSON object with a list of names of datasets that need to be refreshed before the orchestration runs. This allows the orchestration to run once all required datasets are ready. For more information, see Airflow dataset.
All used datasets must be defined in the datasets config.
"schedule": {
"datasets": [
"sales-data",
"marketing-data"
]
}
Task Object
A task object describes which component should run and which configuration the component should use. Every task object must provide the type and id keys. While type is an enumeration with a finite set of possible values (listed below), the meaning of id differs between task types.
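A minimal task object therefore contains just these two keys, e.g. for the hubspot extractor used in the example at the end of this page:
{
  "type": "extractor",
  "id": "hubspot"
}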
Task Type
Type | Description | id explanation |
---|---|---|
extractor | run an extractor configuration | extractor id (configuration file name without extension) |
transformation | run a transformation | transformation id (as specified in transformations.json) |
writer | run a writer configuration | writer id (configuration file name without extension) |
datamart | run a datamart | datamart id (as specified in datamarts.json) |
notification | (added in 2.6.0) run a notification | notification id (as specified in notifications.json) |
dataset | (added in 2.6.0) refresh a dataset | dataset id (as specified in datasets.json) |
orchestration | trigger another orchestration and wait for it to complete | orchestration id (as specified in orchestrations.json), see Orchestration Chaining |
group | a group of tasks | see below |
dag | (added in 2.4.2) trigger another custom DAG and wait for it to complete | dag id as specified when initiating the Airflow DAG class |
Common Reference
The following keys are applicable to all orchestration task objects.
Key | Type | Description |
---|---|---|
type | string | required, see above |
id | string | required, see above |
continue_on_error | bool | if true, this task’s failure will not stop the orchestration (default: false) |
notify | bool | if set, overrides the orchestration’s notify setting for this specific task. See the notify parameter. (default: null) |
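For instance, a task that may fail without stopping the orchestration or sending a notification combines both keys (the db extractor id is taken from the example at the end of this page):
{
  "type": "extractor",
  "id": "db",
  "continue_on_error": true,
  "notify": false
}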
Task groups
Quite often you do not need to wait until the previous task finishes before starting the next one; instead, you would like to run a group of tasks in parallel, e.g. run all extractors at once. For such cases there is the task group.
A task group is actually a task itself. The standard parameters such as id or continue_on_error are not allowed; instead, there is another required key named tasks. tasks is an array of task objects with the same structure as the orchestration tasks, except that a task group cannot contain another task group definition, just plain tasks. A minimal group is sketched below.
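For example, a group running two extractors in parallel (ids reused from the example at the end of this page):
{
  "type": "group",
  "tasks": [
    {
      "type": "extractor",
      "id": "hubspot"
    },
    {
      "type": "extractor",
      "id": "db"
    }
  ]
}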
All tasks in a group run in parallel until the maximum concurrent task count is reached; it can be set in the Airflow settings (defaults to 32). This number limits the number of tasks running at a time across all DAGs.
Two tasks with the same id cannot run multiple times at once, so even if a task is part of another orchestration, or the orchestration runs multiple times at the same time, you do not need to worry about conflicts. Tasks with a conflicting id will wait until the previous one finishes.
Orchestration Chaining
Orchestration chaining is a way to include an orchestration as a task within another orchestration. This way, you can achieve a cleaner and more understandable project. Imagine a situation where you have common data feeding multiple transformations with multiple outputs. At one point or another, you would find yourself thinking:
How do I make sure my data is ready when running this orchestration, without running the extractors and transformations multiple times?
If you include an orchestration in another orchestration’s tasks, Bizzflow will first check whether the dependant orchestration has already run within the specified timeframe and will run it only if it hasn’t.
The following keys are specific to tasks with type = orchestration:
Key | Type | Description |
---|---|---|
data_age | integer | required, the maximum number of seconds since the dependant orchestration’s last run; if the last run is older than this, the dependant orchestration runs again |
timeout | integer | how many seconds to wait for the dependant orchestration to finish before failing (default: 21600, i.e. 6 hours) |
poke_interval | integer | how often (in seconds) to check whether the dependant orchestration has finished. Unless you have a super fast orchestration (in which case you would want this number to be lower), you can keep the default (default: 600, i.e. 10 minutes) |
dependency_mode | string | must be either poke or reschedule, best kept at the default (default: reschedule) |
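Putting these keys together, a chaining task might look like the following sketch (the timeout and poke_interval values shown are simply the defaults made explicit):
{
  "type": "orchestration",
  "id": "sales",
  "data_age": 36000,
  "timeout": 21600,
  "poke_interval": 600
}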
notify parameter
When a task fails, you can get a notification via the email set in the project’s main configuration. There are two levels of notifications - warning and error - the difference is only in the text of the e-mail notification’s message.
The warning level is triggered whenever an orchestration task with continue_on_error enabled fails; all other failures are sent as errors.
By default, all tasks in an orchestration trigger a notification unless set differently. Tasks triggered manually (not as part of an orchestration) do not send a notification on failure. Again, this behavior can be changed.
There are two ways to change the default notification behaviour. The first is setting the "notify": true/false parameter. This parameter can be set on the orchestration as well as on an individual task in the orchestration. Downstream takes precedence over upstream: if the orchestration is set not to trigger notifications but a single task is, the task triggers a notification. If the orchestration is set to trigger notifications and a single task is set not to, the task does not trigger a notification. The snippet below illustrates this.
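For example, in the following sketch the db task stays silent on failure even though the orchestration as a whole notifies (ids reused from the full example below):
{
  "id": "sales",
  "notify": true,
  "tasks": [
    {
      "type": "extractor",
      "id": "hubspot"
    },
    {
      "type": "extractor",
      "id": "db",
      "notify": false
    }
  ]
}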
The second option is to override the settings using a DAG trigger config (which can be set when triggering the DAG from the Airflow UI). The DAG trigger config always takes precedence over everything else. Even if both the orchestration and the task are set to trigger a notification but the DAG was triggered with {"notify": false}, the notification will not be sent. With this option you can also send a notification for an individual task, if needed.
Example: A tandem of dependent orchestrations
Let’s put everything we know together now.
orchestrations.json
[
  {
    "id": "sales",
    "notify": true,
    "tasks": [
      {
        "type": "group",
        "tasks": [
          {
            "type": "extractor",
            "id": "hubspot"
          },
          {
            "type": "extractor",
            "id": "db",
            // Ignore failures and do not send notification
            "continue_on_error": true,
            "notify": false
          }
        ]
      },
      {
        "type": "transformation",
        "id": "sales"
      }
    ]
  },
  {
    "id": "main",
    "notify": true,
    "tasks": [
      {
        "type": "orchestration",
        // trigger "sales" orchestration defined above only if it hasn't
        // run successfully in the last 10 hours
        "id": "sales",
        "data_age": 36000
      },
      {
        "type": "extractor",
        "id": "erp"
      },
      {
        "type": "transformation",
        "id": "main"
      },
      {
        "type": "dag",
        // it is necessary to specify the whole DAG name (visible in Airflow) as id
        "id": "XX_custom_dag",
        "data_age": 600
      }
    ]
  }
]
The same example in YAML format:
orchestrations.yaml
- id: sales
  notify: true
  tasks:
    - type: group
      tasks:
        - type: extractor
          id: hubspot
        - type: extractor
          id: db
          continue_on_error: true
          notify: false
    - type: transformation
      id: sales
- id: main
  notify: true
  tasks:
    - type: orchestration
      id: sales
      data_age: 36000
    - type: extractor
      id: erp
    - type: transformation
      id: main
    - type: dag
      id: XX_custom_dag
      data_age: 600