TaskGroup feature in Airflow 2.0 - Dynamic creation

In this article, we will look at how to use TaskGroup, a feature introduced in Airflow 2.0 that lets you manage your dependencies dynamically. Many articles show you how to use TaskGroups in a static way. This one is all about flexible DAG creation: using the feature's full capabilities to create TaskGroups dynamically in Airflow.

This feature opens up a lot of opportunities, but it doesn't scale well if you stick to what the official documentation shows. The approach described here lets you automate DAG creation by using Airflow's internal way of linking Tasks and TaskGroups.

Note: this also works with the version of Airflow used by Cloud Composer (Google Cloud Platform's managed Airflow service).

The TaskGroup feature

Airflow TaskGroup official usage:

TaskGroup is not really an execution feature; it is more of an organizational alias, even if you have to write a little extra code. Your Tasks still run as individual Tasks.
How it works is quite simple: it wraps your Tasks into TaskGroups, which groups them in the UI and lets you declare dependencies on the whole group at once.

Here is the official way to declare Tasks and a TaskGroup, according to the documentation:

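from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id='taskgroup_example', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Declare a starting Task
    start_task = DummyOperator(task_id='start_task')

    # Create a TaskGroup with 5 Tasks
    with TaskGroup(group_id='group1') as group1:
        task1 = DummyOperator(task_id='task1')
        task2 = DummyOperator(task_id='task2')
        task3 = DummyOperator(task_id='task3')
        task4 = DummyOperator(task_id='task4')
        task5 = DummyOperator(task_id='task5')

    # Create the dependency: start_task runs before all 5 Tasks of group1
    start_task >> group1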

You have 6 Tasks here, but you only have to declare 1 dependency! Five of them belong to the TaskGroup « group1 », so that single statement reaches all of them.

Without the TaskGroup feature, you would have needed this:

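from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(dag_id='no_taskgroup_example', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Declare a starting Task
    start_task = DummyOperator(task_id='start_task')

    # Create 5 Tasks
    task1 = DummyOperator(task_id='task1')
    task2 = DummyOperator(task_id='task2')
    task3 = DummyOperator(task_id='task3')
    task4 = DummyOperator(task_id='task4')
    task5 = DummyOperator(task_id='task5')

    # Declare relationships without TaskGroup: every Task must be listed explicitly
    start_task >> [task1, task2, task3, task4, task5]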

That's not heavy, but TaskGroup is a real advantage for factoring code: the dependency is declared once for the whole group instead of listing every Task, which keeps both your code and your DAG clean!

Airflow's main goal with TaskGroup is to replace SubDAGs. TaskGroups let you group your Tasks logically in a much simpler way.

Nested TaskGroup:

If you go deeper into TaskGroup creation, you'll run into some difficulties.
First catch: the documentation only shows Tasks and TaskGroups created inside a « with » block, so it looks as if there is no other way to put a Task into a group! This style also pushes you to name all your variables « task », which makes the code unclear (as in the documentation's own example).

At least, the alternative is not documented properly! What the documentation tells you to do is this:

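from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id='nested_taskgroup_example', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Create a TaskGroup as the main group
    with TaskGroup(group_id='main_group') as main_group:
        # Create a starting Task
        start_task = DummyOperator(task_id='start_task')
        # Create a nested group inside the main group and create 5 Tasks inside it
        with TaskGroup(group_id='group1') as group1:
            task1 = DummyOperator(task_id='task1')
            task2 = DummyOperator(task_id='task2')
            task3 = DummyOperator(task_id='task3')
            task4 = DummyOperator(task_id='task4')
            task5 = DummyOperator(task_id='task5')

    # Declare dependencies
    start_task >> group1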

If you search the Internet for a way to build a dynamic grouping structure, you will find basically nothing. The best I found was a heavy and complex piece of code (Stackoverflow solution).
That is not the right way to build a structure like this one!

How to do it: GitHub gives the answer!

If you want to create TaskGroups dynamically in Airflow, the answer lies deep inside the Airflow GitHub repository, in the source code behind TaskGroup.
The « with » keyword is quite static and doesn't scale. Instead, pass the « task_group » parameter to the operator to define the group that contains your Task.

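from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

dag = DAG(dag_id='dynamic_taskgroup', start_date=datetime(2021, 1, 1), schedule_interval=None)
# Create a TaskGroup attached to the DAG (no « with » block needed)
group = TaskGroup(group_id='my_group', dag=dag)
# Assign the TaskGroup to the Task
task = DummyOperator(task_id='my_task', task_group=group, dag=dag)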

It's as simple as that, and you don't need any particular ordering or complex code (just create the group before you create the Task). The « task_group » argument acts like a pointer to the group, and it must be a TaskGroup object.

To add a TaskGroup to an existing TaskGroup (nesting them), the equivalent argument is « parent_group ».

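from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

dag = DAG(dag_id='nested_dynamic_taskgroup', start_date=datetime(2021, 1, 1), schedule_interval=None)

# Create a top TaskGroup
group1 = TaskGroup(group_id='my_bigger_group', dag=dag)

# Create another TaskGroup and assign it to the previous one
group2 = TaskGroup(group_id='my_group', parent_group=group1, dag=dag)

# Create a Task and add it to the last TaskGroup (task_id: 'my_bigger_group.my_group.my_task')
task = DummyOperator(task_id='my_task', task_group=group2, dag=dag)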

Here you have a TaskGroup that contains another TaskGroup, which itself contains a Task. That's it: you've created proper dynamic TaskGroups that you can create and assign whenever you want.

This is the proper way to create a hierarchy of Tasks and TaskGroups.

But, as you may expect, you will run into some problems if you want to do this at a larger scale! We will cover this in the next article.

Conclusion

TaskGroup is a powerful tool when it comes to organizing things. You can use TaskGroups as logical units of Tasks and avoid a cluttered graph in the UI.

But it's not easy to maintain if you follow the documentation to the letter, and maintenance will quickly slow you down. Creating TaskGroups dynamically lets you organize your code properly and avoid deeply nested indentation.

In the next article, we will see how to scale up to a more common DAG size (dozens or hundreds of Tasks). Even if you don't have that many Tasks, it's interesting to see how you can handle Task creation (and TaskGroup creation).