Dynamic TaskGroup Scalability in Airflow 2.0 - Handle big DAGs

In the previous article, I showed you how to instantiate TaskGroups dynamically. We will now see how to face the challenge of using them at a larger scale. This is not an easy task: you can either write a long script, which works perfectly fine, or try to factorize your code!

Below you will find some insights about scaling up DAG instantiation, and a solution to handle it.

Note: This also works on the version of Airflow used by Composer (Google Cloud Platform’s managed version).

Scaling up: how to face this challenge?

While scaling up and adding Tasks and TaskGroups to your DAG, you will face some challenges. First, let’s take a glance at what is coming, with a few simple questions!


Why is it a challenge?

This may not be obvious, and it is a fair question to ask yourself. You could indeed just write a 1500-line DAG to handle your operations. So why is this article relevant?

The main question we all face while building a DAG is: « How do I keep my dependencies up-to-date without requiring heavy maintenance or business knowledge? »

This is the « Why? ». A long DAG is hard to maintain, and you will soon run into conflicts in your code. Even if you manage to wrap everything into functions, you will only move the problem elsewhere.

Another good practice in Airflow is to override stock Operators with Operators of your own. I will cover how to properly create a new Operator in another article.


What should I know?

But first, there are some things you need to understand:

  • You should never link a Task to a TaskGroup: it can cause bugs and seemingly random dependencies. Only link Tasks with other Tasks, and TaskGroups with TaskGroups; this is very important.
  • Beware of the new names of Tasks and TaskGroups: they are the concatenation of all parent elements (« my_bigger_group.my_group.my_task » for example). This changes the « task_id » seen in the context, but not the « real » task_id.
  • « task_id » and « group_id » uniqueness rules are fairly complex:
    • Task ID uniqueness is still based on the « task_id » itself, without taking your TaskGroups into account.
    • TaskGroup uniqueness is checked within its parent group. You can have two TaskGroups named « CALCULATION » if they are located in different parent TaskGroups. It works like folders!
    • You can’t repeat a group ID inside a fully qualified group_id, although a task_id may reuse the name of a parent group:
      • « main_group.stage_1.my_group.stage_1 » is a valid combination of group_id (main_group.stage_1.my_group) and task_id (stage_1), because there is no conflict between the group_id and the task_id
      • « stage_1.finance_group.stage_1.my_task » is not valid, because its group_id (stage_1.finance_group.stage_1) contains the same group name twice in its hierarchy!
  • You are not forced to wait for the « main » TaskGroup to finish in order to continue your path: you can cut through nested TaskGroups!
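To make these naming rules concrete, here is a minimal pure-Python sketch (it does not import Airflow) that builds a fully qualified ID by joining parent IDs with dots, and rejects a group hierarchy that repeats a group_id. The helpers `qualified_id` and `check_group_path` are hypothetical; they encode the rules as stated above, not Airflow’s own validation code.

```python
from typing import Sequence


def qualified_id(*parts: str) -> str:
    """Join parent group IDs and an element's own ID with dots."""
    return ".".join(parts)


def check_group_path(group_path: Sequence[str]) -> None:
    """Raise ValueError if the same group_id appears twice in one hierarchy.

    Only group IDs are checked: following the rules above, a task_id
    may reuse the name of one of its parent groups.
    """
    seen = set()
    for group_id in group_path:
        if group_id in seen:
            raise ValueError(
                f"group_id '{group_id}' is repeated in '{qualified_id(*group_path)}'"
            )
        seen.add(group_id)


# Valid: the task 'stage_1' reuses a group name, but no group repeats.
valid_groups = ["main_group", "stage_1", "my_group"]
check_group_path(valid_groups)
print(qualified_id(*valid_groups, "stage_1"))  # main_group.stage_1.my_group.stage_1

# Invalid: 'stage_1' appears twice in the group hierarchy.
try:
    check_group_path(["stage_1", "finance_group", "stage_1"])
except ValueError as err:
    print(err)  # group_id 'stage_1' is repeated in 'stage_1.finance_group.stage_1'
```

Such a check can run on the configuration before Airflow ever parses the DAG file.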

Below is a more complex DAG showing these situations.

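from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

# TaskGroups must be attached to a DAG (placeholder DAG settings)
with DAG(dag_id='nested_groups_example', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Create first main group
    tg1 = TaskGroup(group_id='group1')
    # Create subgroups
    tg11 = TaskGroup(group_id='group11', parent_group=tg1)
    tg12 = TaskGroup(group_id='group12', parent_group=tg1)

    # Create second main group
    tg2 = TaskGroup(group_id='group2')
    # Create subgroups
    tg21 = TaskGroup(group_id='group21', parent_group=tg2)
    tg22 = TaskGroup(group_id='group22', parent_group=tg2)

    # Create all Tasks, each one inside its subgroup
    task1 = DummyOperator(task_id='task1', task_group=tg11)
    task2 = DummyOperator(task_id='task2', task_group=tg12)
    task3 = DummyOperator(task_id='task3', task_group=tg21)
    task4 = DummyOperator(task_id='task4', task_group=tg22)

    # Create dependencies between subgroups of different main groups
    tg11 >> tg21
    tg12 >> tg22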

Here you can see some of my previous warnings in action:

  • I only linked TaskGroups with TaskGroups
  • I cut through the TaskGroup hierarchy to link my groups
  • I made sure to avoid duplicate names inside a group hierarchy
  • Task ID uniqueness is respected

This is « What should I know? ».


How to face it?

You now know why you want to improve your code and what is coming at you. Now we need to focus on « How to face it? »!

To answer this with the new TaskGroups, you first need to identify the concrete problems ahead:

  • how to automate TaskGroup/Tasks/Dependencies generation
  • how to store them
  • how to apply them

To sum up what we are trying to achieve, here are the typical inputs of the problem you will have to solve in common cases:

  • you will have hundreds of Tasks (different kinds of Operators) to schedule
  • you will have complex dependencies
  • Airflow Operator content does not matter, so we will use DummyOperator for our example
  • you want to group them in many nested TaskGroups in order to create logical groups

So basically, you have to instantiate hundreds of DummyOperators, link them in complex ways, and place each of them in the right nested TaskGroup.

With only the static approach presented in the documentation, this is going to be a nightmare to maintain.

With the method I just presented, you can create a dependency tree easily, without any static group creation. The last lever is how to store and apply tasks and groups easily and dynamically.

The answer to this is JSON!

Automation solution:

With simple tools to populate a JSON file, you can describe your structure in a configuration stored elsewhere.
Then you need to store the created objects (Tasks and TaskGroups), and for this you can simply use a Python dictionary:

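from datetime import datetime
from airflow import DAG
from airflow.utils.task_group import TaskGroup

my_dag = DAG(dag_id='my_dag', start_date=datetime(2021, 1, 1), schedule_interval=None)  # placeholder DAG settings
# Create a dictionary
groups_lvl1 = {}
# Add an entry for your TaskGroup
groups_lvl1['my_group'] = TaskGroup(group_id='my_group', dag=my_dag)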

This approach can then be driven by a JSON configuration:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

# DAG that receives the generated structure (placeholder DAG settings)
my_dag = DAG(dag_id='dynamic_taskgroups', start_date=datetime(2021, 1, 1), schedule_interval=None)

# Configuration: in practice, loaded from your JSON source
groups_lvl1 = {"START": {"group_id": "START"}, "Main": {"group_id": "Main"}}
groups_lvl2 = {
    "Main.Action_1": {"group_id": "Action_1", "parent": "Main"},
    "Main.Action_2": {"group_id": "Action_2", "parent": "Main"},
    "Main.Action_3": {"group_id": "Action_3", "parent": "Main"},
}
tasks_config = {
    "starting_task": {"task_id": "start_task", "parent": "START"},
    "loading_data": {"task_id": "loading_data", "parent": "Main.Action_1"},
    "cleaning_data": {"task_id": "cleaning_data", "parent": "Main.Action_1"},
    "archive_source": {"task_id": "archive_source", "parent": "Main.Action_1"},
    "datawarehouse_loading": {"task_id": "datawarehouse_loading", "parent": "Main.Action_2"},
    "building_dashboards": {"task_id": "building_dashboards", "parent": "Main.Action_3"},
    "mailing_dashboards": {"task_id": "mailing_dashboards", "parent": "Main.Action_3"},
}
# Each link: the key is the downstream element, "parent" is its upstream element
links = {
    "cleaning_data": {"type": "task", "parent": "loading_data"},
    "mailing_dashboards": {"type": "task", "parent": "building_dashboards"},
    "Main.Action_1": {"type": "group", "parent": "START"},
    "Main.Action_2": {"type": "group", "parent": "Main.Action_1"},
    "Main.Action_3": {"type": "group", "parent": "Main.Action_2"},
}

# Created Airflow objects, indexed by their configuration key
groups = {}
tasks = {}

## Creating Lvl1 groups (at the root of the DAG)
for key, conf in groups_lvl1.items():
    groups[key] = TaskGroup(group_id=conf['group_id'], dag=my_dag)

## Creating Lvl2 groups (nested in their Lvl1 parent)
for key, conf in groups_lvl2.items():
    groups[key] = TaskGroup(group_id=conf['group_id'], parent_group=groups[conf['parent']], dag=my_dag)

## Creating Tasks inside their parent group
for key, conf in tasks_config.items():
    tasks[key] = DummyOperator(task_id=conf['task_id'], task_group=groups[conf['parent']], dag=my_dag)

## Creating Dependencies (upstream >> downstream)
for key, conf in links.items():
    if conf['type'] == "task":
        tasks[conf['parent']] >> tasks[key]
    elif conf['type'] == "group":
        groups[conf['parent']] >> groups[key]

You just created a dynamic DAG, with groups in the right order and your tasks inside them! The tooling that produces the JSON (code, tools or other) is not hard to build, and can take a slightly different form to ease production requirements.
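Before deploying, it can help to check the configuration outside Airflow. The sketch below assumes the configuration dictionaries described above (`groups_lvl1`, `groups_lvl2`, the task configuration and `links`), shown here as a reduced subset. It verifies every `parent` reference and returns the dependency edges as fully qualified IDs, assuming Airflow’s default prefixing of group IDs. `resolve_config` is a hypothetical helper and does not import Airflow.

```python
from typing import Dict, List, Tuple

# Shape of each configuration section: key -> {field: value}
Config = Dict[str, Dict[str, str]]


def resolve_config(groups_lvl1: Config, groups_lvl2: Config,
                   tasks: Config, links: Config) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) pairs of fully qualified IDs.

    Raises KeyError or ValueError, naming the faulty entry, when the
    configuration references something that does not exist.
    """
    # Level 1 groups sit at the DAG root, so their ID is already complete.
    group_ids = {key: conf["group_id"] for key, conf in groups_lvl1.items()}

    # Level 2 groups: qualified ID of the parent + own group_id.
    for key, conf in groups_lvl2.items():
        if conf["parent"] not in group_ids:
            raise KeyError(f"group '{key}': unknown parent '{conf['parent']}'")
        group_ids[key] = f"{group_ids[conf['parent']]}.{conf['group_id']}"

    # Tasks: qualified ID of the parent group + task_id.
    task_ids = {}
    for key, conf in tasks.items():
        if conf["parent"] not in group_ids:
            raise KeyError(f"task '{key}': unknown parent '{conf['parent']}'")
        task_ids[key] = f"{group_ids[conf['parent']]}.{conf['task_id']}"

    # Links: 'parent' is upstream, the link key is downstream.
    edges = []
    for key, conf in links.items():
        if conf["type"] == "task":
            known = task_ids
        elif conf["type"] == "group":
            known = group_ids
        else:
            raise ValueError(f"link '{key}': unknown type '{conf['type']}'")
        for name in (conf["parent"], key):
            if name not in known:
                raise KeyError(f"link '{key}': unknown {conf['type']} '{name}'")
        edges.append((known[conf["parent"]], known[key]))
    return edges


# Reduced version of the configuration used above
groups_lvl1 = {"START": {"group_id": "START"}, "Main": {"group_id": "Main"}}
groups_lvl2 = {"Main.Action_1": {"group_id": "Action_1", "parent": "Main"}}
tasks = {
    "starting_task": {"task_id": "start_task", "parent": "START"},
    "loading_data": {"task_id": "loading_data", "parent": "Main.Action_1"},
    "cleaning_data": {"task_id": "cleaning_data", "parent": "Main.Action_1"},
}
links = {
    "cleaning_data": {"type": "task", "parent": "loading_data"},
    "Main.Action_1": {"type": "group", "parent": "START"},
}

for upstream, downstream in resolve_config(groups_lvl1, groups_lvl2, tasks, links):
    print(upstream, ">>", downstream)
# Main.Action_1.loading_data >> Main.Action_1.cleaning_data
# START >> Main.Action_1
```

A typo in a `parent` field then fails with a message naming the faulty entry, instead of surfacing deep inside the DAG file.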

You can maintain this JSON with tools like Sheets (or Excel), a database, a flat file or a custom program. To synchronize it with the DAG, you can deliver it by copy-pasting your JSON, or by linking the DAG to a file that is refreshed frequently.
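For the spreadsheet route, one option is to export the sheet as CSV and convert it into the JSON structure used above. This minimal sketch assumes a hypothetical export of the links table with `key`, `type` and `parent` columns; the column names and the `links_from_csv` helper are illustrative. It relies only on the standard `csv` and `json` modules.

```python
import csv
import io
import json
from typing import Dict, TextIO


def links_from_csv(source: TextIO) -> Dict[str, Dict[str, str]]:
    """Build the 'links' dictionary from CSV rows with key, type and parent columns."""
    links = {}
    for row in csv.DictReader(source):
        key = row["key"].strip()
        if key in links:
            # The link key is the downstream element, so it can appear only once.
            raise ValueError(f"duplicate link key '{key}'")
        links[key] = {"type": row["type"].strip(), "parent": row["parent"].strip()}
    return links


# Example export, as it could come out of a spreadsheet
export = io.StringIO(
    "key,type,parent\n"
    "cleaning_data,task,loading_data\n"
    "Main.Action_1,group,START\n"
)
links = links_from_csv(export)
print(json.dumps(links, indent=2))
```

Because `links` is keyed by the downstream element, this format allows only one upstream element per Task or TaskGroup; the duplicate check turns that limit into an explicit error instead of silently keeping the last row.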

 

With this technique, you can maintain very large DAGs, and dozens of them!

Conclusion

TaskGroup is a powerful tool when it comes to organizing things. You can nest groups as much as you want (but do so wisely), and make your 500-task DAG look like a charm.
You can easily manage those groups with a configuration file and simplify DAG creation, instead of writing everything down in a file manually.

A better approach is to store the Task and TaskGroup hierarchy separately, and only care about assembling it in your DAG.

However, this feature is not well documented, and we had to look for answers in order to make it even more powerful.
As I write this, I’m using my simple tool, with Sheets and a database (any will work) as storage for my DAG configuration. I use it to populate around thirty DAGs, totalling about 2000 tasks and even more dependencies, by copy-pasting JSON (an automated step). I (nearly) don’t code a single line of Python besides the 130 lines (7000 characters) of code that structure groups and tasks dynamically.

Changing a dependency takes me around 30 seconds of editing my Sheet; the rest of the process is automated.

Currently, my biggest DAG consists of:

  • 217 lines of static code
  • 280k characters of JSON structure
  • 600 Tasks
  • 750 TaskGroups
  • 1200 dependency links

The code is the same across all my DAGs; only the JSON changes. So I’m using a template, which is easy to maintain, debug or upgrade!
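A minimal sketch of that template idea, assuming one JSON file per DAG in a configuration directory, each file holding the four sections used in this article (`groups_lvl1`, `groups_lvl2`, `tasks`, `links`). The directory layout and the `load_dag_configs` helper are assumptions; the loader only uses the standard library, and the Airflow part is left as a comment.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict

# Sections every DAG configuration file is expected to contain (assumed layout)
REQUIRED_SECTIONS = ("groups_lvl1", "groups_lvl2", "tasks", "links")


def load_dag_configs(config_dir: Path) -> Dict[str, Dict[str, Any]]:
    """Map each DAG id (JSON file name without extension) to its configuration."""
    configs = {}
    for path in sorted(config_dir.glob("*.json")):
        with path.open(encoding="utf-8") as handle:
            config = json.load(handle)
        missing = [name for name in REQUIRED_SECTIONS if name not in config]
        if missing:
            raise ValueError(f"{path.name}: missing sections {missing}")
        configs[path.stem] = config
    return configs


if __name__ == "__main__":
    # Demo with a throw-away directory holding a single configuration file
    with tempfile.TemporaryDirectory() as tmp:
        sample = {
            "groups_lvl1": {"START": {"group_id": "START"}},
            "groups_lvl2": {},
            "tasks": {"starting_task": {"task_id": "start_task", "parent": "START"}},
            "links": {},
        }
        (Path(tmp) / "finance_dag.json").write_text(json.dumps(sample), encoding="utf-8")
        for dag_id, config in load_dag_configs(Path(tmp)).items():
            # Here the same template code would build one DAG per configuration
            print(dag_id, sorted(config["tasks"]))
```

In the DAG file itself, each configuration would then go through the same group, task and link loops shown earlier, and each resulting DAG object is usually assigned to a module-level name (for example `globals()[dag_id] = dag`), since Airflow only picks up DAG objects it finds at the top level of the file.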