Dynamic TaskGroup Scalability in Airflow 2.0 - Handle big DAGs
In the previous article, I showed you how to instantiate TaskGroups dynamically. We will now see how to face the challenge of using them at a larger scale. This is not an easy task: you can either write one long script, which works perfectly fine, or try to factor your code!
You will find below some insights about scaling up DAG instantiation, and a solution to handle it.
Note: this also works on the version of Airflow used by Cloud Composer (the Google Cloud Platform managed version).
Scaling up: how to face this challenge?
While scaling up and adding Tasks and TaskGroups to your DAG, you will face some challenges. First, let's have a glance at what is coming at you, with a few simple questions!

Why is it a challenge?
This may not be absolutely clear, but ask yourself this question. It's fair to think you could just write a 1500-line DAG to handle your operations. So why is this article relevant?
The main question we all face while building a DAG is: "How do I keep my dependencies up to date, without requiring heavy maintenance or business knowledge?"
This is the "Why?". A long DAG can be hard to maintain, and you will soon have conflicts in your code. Even if you manage to wrap everything up into functions, you will just move the problem around.
Another good practice in Airflow is to subclass stock Operators into ones of your own. I will cover how to properly create a new Operator in another article.
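To give you a first taste, here is a minimal sketch of the idea; the operator name, its parameter and its behaviour are purely illustrative, not a real recipe:

from airflow.models.baseoperator import BaseOperator

class MyCompanyOperator(BaseOperator):
    """Hypothetical in-house operator wrapping shared behaviour."""

    def __init__(self, my_param: str, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        # Shared behaviour (logging, auditing, common checks...) goes here
        self.log.info("Running with my_param=%s", self.my_param)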

What should I know?
But first, there are some things you need to understand:
- You should never link a Task to a TaskGroup: it can cause bugs and randomly generated dependencies. Only link Tasks with other Tasks and TaskGroups with TaskGroups; this is very important.
- Beware of the new name of your Tasks and TaskGroups: it is the concatenation of all parent elements ("my_bigger_group.my_group.my_task" for example). This changes the "task_id" you see in the context, but not the "real" task_id.
- "task_id" and "group_id" uniqueness is fairly complex:
  - Task naming uniqueness is still based on the "task_id" itself, not taking your TaskGroup into account.
  - TaskGroup uniqueness is evaluated within its parent group. You can have two TaskGroups named "CALCULATION" if they are located in different parent TaskGroups. It works like groups!
- You can't have an ID repeated inside a fully qualified task_id or group_id:
  - "main_group.stage_1.my_group.stage_1" is a valid combination of group_id (main_group.stage_1.my_group) and task_id (stage_1), because there is no conflict between the group_id and the task_id.
  - "stage_1.finance_group.stage_1.my_task" is not a valid combination, because two groups in the hierarchy (stage_1.finance_group.stage_1) have the same name!
- You are not forced to wait for the "main" TaskGroup to finish in order to continue your path. You can cut through nested TaskGroups!
You will find below a more complex DAG creation showing these situations.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id='my_dag', start_date=datetime(2021, 1, 1)) as my_dag:
    # Create first main group
    tg1 = TaskGroup(group_id='group1')
    # Create its subgroups
    tg11 = TaskGroup(group_id='group11', parent_group=tg1)
    tg12 = TaskGroup(group_id='group12', parent_group=tg1)
    # Create second main group
    tg2 = TaskGroup(group_id='group2')
    # Create its subgroups
    tg21 = TaskGroup(group_id='group21', parent_group=tg2)
    tg22 = TaskGroup(group_id='group22', parent_group=tg2)
    # Create all Tasks
    task1 = DummyOperator(task_id='task1', task_group=tg11)
    task2 = DummyOperator(task_id='task2', task_group=tg12)
    task3 = DummyOperator(task_id='task3', task_group=tg21)
    task4 = DummyOperator(task_id='task4', task_group=tg22)
    # Create dependencies, cutting through the group hierarchy
    tg11 >> tg21
    tg12 >> tg22
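As a quick check of the naming rules above (with Airflow 2.0's default prefix_group_id=True), you can print a fully qualified name and create two same-named groups under different parents; the CALCULATION groups here are only for illustration:

# The displayed task_id is prefixed with every parent group
print(task1.task_id)  # -> "group1.group11.task1"

# Two groups may share a group_id as long as their parents differ
calc_a = TaskGroup(group_id='CALCULATION', parent_group=tg11)
calc_b = TaskGroup(group_id='CALCULATION', parent_group=tg12)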

Here you can see some of my previous warnings applied:
- I only linked TaskGroups with TaskGroups
- I cut through the TaskGroup hierarchy to link my groups in this example
- I made sure to avoid duplicate naming inside a group hierarchy
- My Task naming uniqueness is respected
This is the "What should I know?".

How to face it?
You now know why you should enhance your code and what is coming at you. Now we need to focus on "How to face it?"!
To answer this with these new TaskGroups, you need to look at the concrete problems ahead:
- how to automate the generation of TaskGroups, Tasks and dependencies
- how to store them
- how to apply them
To sum up what we are trying to achieve here, these are the constraints you will typically have to deal with:
- you will have hundreds of Tasks (different kinds of Operators) to schedule
- you will have complex dependencies
- the content of the Operators does not matter, so we will use DummyOperator for our example
- you want to group them into many nested TaskGroups in order to create logical groups
So basically, you have to instantiate hundreds of DummyOperators, link them in some complex way, and place them so they fit in the right nested groups.
Given what is presented in the documentation, it’s going to be a nightmare to maintain.
With the method I just presented, you can create a dependency tree easily and without any static group creation. The remaining question is how to store and apply tasks and groups easily and dynamically.
The answer to this is JSON!
Automation solution
With simple tools to populate a JSON file, you can build your structure from a configuration stored elsewhere.
Then you'll need to store them (Tasks and TaskGroups), and for that you can simply use a Python dictionary:
# Create a dictionary
groups_lvl1 = {}
# Add an entry for your TaskGroup
groups_lvl1.update({'my_group': TaskGroup(group_id='my_group', dag=my_dag)})
which can be upgraded using JSON:
groups = {}

# Structure definition (this is what your JSON would contain)
groups_lvl1 = {
    "START": {"group_id": "START"},
    "Main": {"group_id": "Main"},
}
groups_lvl2 = {
    "Main.Action_1": {"group_id": "Action_1", "parent": "Main"},
    "Main.Action_2": {"group_id": "Action_2", "parent": "Main"},
    "Main.Action_3": {"group_id": "Action_3", "parent": "Main"},
}
tasks = {
    "starting_task": {"task_id": "start_task", "parent": "START"},
    "loading_data": {"task_id": "loading_data", "parent": "Main.Action_1"},
    "cleaning_data": {"task_id": "cleaning_data", "parent": "Main.Action_1"},
    "archive_source": {"task_id": "archive_source", "parent": "Main.Action_1"},
    "datawarehouse_loading": {"task_id": "datawarehouse_loading", "parent": "Main.Action_2"},
    "building_dashboards": {"task_id": "building_dashboards", "parent": "Main.Action_3"},
    "mailing_dashboards": {"task_id": "mailing_dashboards", "parent": "Main.Action_3"},
}
links = {
    "cleaning_data": {"type": "task", "parent": "loading_data"},
    "mailing_dashboards": {"type": "task", "parent": "building_dashboards"},
    "Main.Action_1": {"type": "group", "parent": "START"},
    "Main.Action_2": {"type": "group", "parent": "Main.Action_1"},
    "Main.Action_3": {"type": "group", "parent": "Main.Action_2"},
}

## Creating Lvl1 groups
for key, conf in groups_lvl1.items():
    groups[key] = TaskGroup(group_id=conf['group_id'], dag=my_dag)

## Creating Lvl2 groups, attached to their parent group
for key, conf in groups_lvl2.items():
    groups[key] = TaskGroup(group_id=conf['group_id'], parent_group=groups[conf['parent']], dag=my_dag)

## Creating Tasks (each configuration entry is replaced by its operator)
for key, conf in list(tasks.items()):
    tasks[key] = DummyOperator(task_id=conf['task_id'], task_group=groups[conf['parent']], dag=my_dag)

## Creating Dependencies ('parent' runs before the keyed element)
for key, conf in links.items():
    if conf['type'] == "task":
        tasks[conf['parent']] >> tasks[key]
    elif conf['type'] == "group":
        groups[conf['parent']] >> groups[key]
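In practice, you would not hard-code these dictionaries: they would be generated as JSON and loaded at the top of the DAG file, before the loops above. A minimal sketch, assuming a dag_structure.json file shipped next to the DAG file (the file name and layout are assumptions):

import json
from pathlib import Path

# Hypothetical file holding the four dictionaries above as one JSON document
config_path = Path(__file__).parent / "dag_structure.json"
with open(config_path) as config_file:
    config = json.load(config_file)

groups_lvl1 = config["groups_lvl1"]
groups_lvl2 = config["groups_lvl2"]
tasks = config["tasks"]
links = config["links"]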

You just created your groups dynamically, in the right order, and added your tasks into them! The piece of software that produces the JSON (code, tool or other) is not hard to create, and it can take a slightly different form to fit production requirements.
You can maintain this JSON with tools like Sheets (or Excel), a database, a flat file or a custom program. To synchronize it with the DAG, you can deliver it by copy-pasting your JSON, or by linking the DAG to a file refreshed frequently; a sketch of the copy-paste route follows.
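If you go the copy-paste route, an Airflow Variable is a convenient place to paste the JSON; a minimal sketch, assuming a Variable named dag_structure:

from airflow.models import Variable

# deserialize_json=True parses the pasted JSON straight into a dict
# (note: this reads the metadata database on every DAG parse)
config = Variable.get("dag_structure", deserialize_json=True)
groups_lvl1 = config["groups_lvl1"]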
With this technique, you can maintain very heavy DAGs, and dozens of them!
Conclusion
TaskGroup is a powerful tool when it comes to organising things. You can nest groups as much as you want (but use this wisely), and make your 500-task DAG look like a charm.
You can easily manage those groups with a configuration file and ease your DAG creation, instead of writing everything down in a file manually.
A better approach is to store the Task and TaskGroup hierarchy separately and only care about assembling it in your DAG.
But this feature is not well documented, and we had to dig for answers in order to make it even more powerful.
As I write this, I'm using my own simple tool, with Sheets and a database (any will do) as storage for my DAG configuration. By copy-pasting the (automatically generated) JSON, I populate around thirty DAGs, for a total of about 2000 tasks and even more dependencies, while writing (nearly) no Python besides the 130 lines of code, about 7000 characters, that structure groups and tasks dynamically.
To change a dependency, it takes me around 30 seconds in my Sheets; the rest of the process is automated.
Currently, my biggest DAG is:
- 217 lines long (static code)
- 280k characters (JSON structure)
- 600 Tasks
- 750 TaskGroups
- 1200 dependency links
The code is the same across all my DAGs; only the JSON changes. So I'm using a template, which is easy to maintain, debug and upgrade!
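As an illustration, such a template can look like the sketch below; the structures/ folder, the file layout and the dag_id naming are assumptions, not my exact setup:

import json
from datetime import datetime
from pathlib import Path

from airflow import DAG

# Hypothetical layout: one JSON structure file per DAG, next to this script
for config_path in Path(__file__).parent.glob("structures/*.json"):
    with open(config_path) as config_file:
        config = json.load(config_file)

    dag = DAG(
        dag_id=config_path.stem,  # "finance_daily.json" -> dag_id "finance_daily"
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
    )
    # ...build groups, tasks and links from config, as shown earlier...

    # Expose each DAG at module level so the scheduler can discover it
    globals()[dag.dag_id] = dag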