Orchestrations

Created: May 20, 2021, Updated: March 27, 2023

Orchestrations are a way to schedule and automate data pipeline tasks.

Orchestrations Configuration

Orchestrations are configured in the orchestrations.json (or orchestrations.yaml) file, which contains an array of configuration objects describing orchestrations and each of their tasks.

Orchestrations Reference

| Key | Type | Description |
| --- | --- | --- |
| id | string | required, a unique orchestration name (max 33 characters) |
| schedule | string | a crontab notation of a schedule |
| tasks | list of task objects | see below |
| notify | bool | notify on error (defaults to true in orchestrations with a schedule) |
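A minimal scheduled orchestration using these keys might look like the following sketch (the orchestration and extractor ids, and the crontab expression, are illustrative):

```json
[
  {
    "id": "daily_load",
    "schedule": "0 3 * * *",
    "tasks": [
      {
        "type": "extractor",
        "id": "my_extractor"
      }
    ]
  }
]
```

The crontab expression 0 3 * * * would run the orchestration every day at 3:00 AM.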

Task Object

A task object describes which component should run and which configuration the component should use. Every task object must provide the type and id keys. While type is an enumeration with a finite set of possible values (listed below), the meaning of id differs between task types.

Task Type

| Type | Description | id explanation |
| --- | --- | --- |
| extractor | run an extractor configuration | extractor id (configuration file name without extension) |
| transformation | run a transformation | transformation id (as specified in transformations.json) |
| writer | run a writer configuration | writer id (configuration file name without extension) |
| datamart | run a datamart | datamart id (as specified in datamarts.json) |
| orchestration | trigger another orchestration and wait for it to complete | orchestration id (as specified in orchestrations.json), see Orchestration Chaining |
| group | a group of tasks | see below |
| dag | (added in 2.4.2) trigger another custom DAG and wait for it to complete | DAG id, as specified when instantiating the Airflow DAG class |

Common Reference

The following keys are applicable to all orchestration task objects.

| Key | Type | Description |
| --- | --- | --- |
| type | string | required, see above |
| id | string | required, see above |
| continue_on_error | bool | if true, this task's failure will not stop the orchestration (default: false) |
| notify | bool | if set, overrides the orchestration's notify setting for this specific task; see the notify parameter (default: null) |
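For example, a task that should neither stop the orchestration nor send a notification when it fails could combine these keys as follows (the extractor id is illustrative):

```json
{
  "type": "extractor",
  "id": "my_flaky_extractor",
  "continue_on_error": true,
  "notify": false
}
```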

Task groups

Quite often you do not need to wait for the previous task to finish before starting the next one; instead, you want to run several tasks in parallel, e.g. run all extractors at once. For such cases there is the task group.

A task group is actually a task itself. Standard parameters such as id or continue_on_error are not allowed; instead, there is another required key named tasks. tasks is an array of task objects with the same structure as the orchestration's tasks, except that a task group cannot contain another task group definition, only plain tasks.

All tasks in a group run in parallel until the maximum concurrent task count is reached. This limit can be set in the Airflow settings (defaults to 32) and applies to all running tasks across all DAGs at a time.

Two tasks with the same id cannot run at once, so even if a task is part of another orchestration, or the orchestration runs multiple times at the same time, you do not need to worry about conflicts. A task with a conflicting id will wait until the previous one finishes.
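A sketch of a group task that runs two extractors in parallel (the extractor ids are illustrative):

```json
{
  "type": "group",
  "tasks": [
    { "type": "extractor", "id": "extractor_a" },
    { "type": "extractor", "id": "extractor_b" }
  ]
}
```

Note that the group itself has no id; only the required tasks key.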

Orchestration Chaining

Orchestration chaining is a way to include an orchestration as a task within another orchestration. This way you can achieve a cleaner and more understandable project. Imagine a situation where you have common data shared by multiple transformations with multiple outputs. At one point or another, you would find yourself thinking:

How do I make sure my data is ready when running this orchestration without the need to run extractors and transformations multiple times?

If you include an orchestration in another orchestration's tasks, Bizzflow will first check whether the dependent orchestration has already run within the specified timeframe, and will run it only if it hasn't.

The following keys are specific to tasks with type = orchestration:

| Key | Type | Description |
| --- | --- | --- |
| data_age | integer | required, the maximum number of seconds since the dependent orchestration last ran; if it ran within this window, it is not run again |
| timeout | integer | how many seconds to wait for the dependent orchestration to finish before failing (default: 21600, i.e. 6 hours) |
| poke_interval | integer | how often (in seconds) to check whether the dependent orchestration has finished. Unless you have a very fast orchestration (in which case you may want a lower value), keep the default (default: 600, i.e. 10 minutes) |
| dependency_mode | string | must be either poke or reschedule, best kept at the default (default: reschedule) |
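As an illustrative sketch, a chaining task that reruns a "sales" orchestration only if it has not run in the last 10 hours, waiting at most 2 hours for it to finish, might look like this:

```json
{
  "type": "orchestration",
  "id": "sales",
  "data_age": 36000,
  "timeout": 7200,
  "poke_interval": 600
}
```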

notify parameter

When a task fails, you can get a notification via the email address set in the project's main configuration. There are two levels of notifications, warning and error; the difference is only in the text of the e-mail notification's message. The warning level is triggered whenever an orchestration task with continue_on_error enabled fails; all other failures are sent as error.

By default, all tasks in an orchestration trigger a notification unless set otherwise. Tasks triggered manually (not as part of an orchestration) do not send a notification on failure. Again, this behavior can be changed.

There are two ways to change the default notification behaviour. The first is setting the parameter "notify": true/false. This parameter can be set on the orchestration as well as on an individual task within it. Downstream takes precedence over upstream: if the orchestration is set not to trigger notifications but a single task is, the task triggers a notification. If the orchestration is set to trigger notifications and a single task is set not to, the task does not trigger a notification.

The second option is to override the settings using a DAG trigger config (which can be set when triggering the DAG from the Airflow UI). The DAG trigger config always takes precedence over everything else. Even if both the orchestration and the task are set to trigger notifications but the DAG was triggered with {"notify": false}, the notification will not be sent. With this option you can also send a notification for an individual task, if needed.
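For instance, triggering the DAG from the Airflow UI with the following trigger config would suppress notifications for that particular run, regardless of the orchestration and task settings:

```json
{
  "notify": false
}
```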

Example: A tandem of dependent orchestrations

Let’s put all we know together now.

orchestrations.json

[
  {
    "id": "sales",
    "notify": true,
    "tasks": [
      {
        "type": "group",
        "tasks": [
          {
            "type": "extractor",
            "id": "hubspot"
          },
          {
            "type": "extractor",
            "id": "db",
            // Ignore failures and do not send notification
            "continue_on_error": true,
            "notify": false
          }
        ]
      },
      {
        "type": "transformation",
        "id": "sales"
      }
    ]
  },
  {
    "id": "main",
    "notify": true,
    "tasks": [
      {
        "type": "orchestration",
        // trigger "sales" orchestration defined above only if it hasn't
        // run successfully in the last 10 hours
        "id": "sales",
        "data_age": 36000
      },
      {
        "type": "extractor",
        "id": "erp"
      },
      {
        "type": "transformation",
        "id": "main"
      },
      {
        "type": "dag",
        // it is necessary to specify the whole DAG name (visible in Airflow) as id
        "id": "XX_custom_dag",
        "data_age": 600
      }
    ]
  }
]

The same example in YAML format:

orchestrations.yaml

- id: sales
  notify: true
  tasks:
    - type: group
      tasks:
        - type: extractor
          id: hubspot
        - type: extractor
          id: db
          continue_on_error: true
          notify: false
    - type: transformation
      id: sales
- id: main
  notify: true
  tasks:
    - type: orchestration
      id: sales
      data_age: 36000
    - type: extractor
      id: erp
    - type: transformation
      id: main
    - type: dag
      id: XX_custom_dag
      data_age: 600