Skip to content

Treat single task_ids in xcom_pull the same as multiple #49692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 26, 2025

Conversation

amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Apr 24, 2025

closes: #49540

Problem

There is no need to handle some cases of xcom retrieval seperately.

Expected behaviour in airflow 2:

  1. xcom_pull(task_ids=["task1"]) should not return a value but return a LazyXComSelectSequence object which was to avoid pulling everything into memory at once (unlike session.execute(...).fetchall()). But this is no longer relevant as we get the xcoms we need to from the api server (using get_one from BaseXcom class).

So we need to have a similar abstraction so that the user code doesn't have to change when they called xcom_pull using single task_id in a list, like ti.xcom_pull(task_ids=["task1"]).

  1. xcom_pull(task_ids="task1") should return a value and not a list of values

  2. xcom_pull with one task_id and one map_indexes should return a value again and not an iterable

Testing

Testing with various combinations of task_ids

DAG used for testing:

from __future__ import annotations

import logging
from datetime import datetime

from airflow.decorators import task

from airflow.models.dag import DAG


log = logging.getLogger(__name__)


with DAG(
    "xcom_test",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "sheets"],
) as dag:
    @task
    def xcom_dict():
        return {"a": "b"}

    @task
    def read_xcom(**kwargs):
        xcom_dict_task_value_ids = kwargs["task_instance"].xcom_pull(task_ids=["xcom_dict"], key="return_value")
        print("With task ids in a list", type(xcom_dict_task_value_ids), xcom_dict_task_value_ids)
        print(xcom_dict_task_value_ids[0])

        xcom_dict_task_value_ids1 = kwargs["task_instance"].xcom_pull(task_ids="xcom_dict", key="return_value")
        print("Without a list", type(xcom_dict_task_value_ids1), xcom_dict_task_value_ids1)

        xcom_dict_task_value_ids_2 = kwargs["task_instance"].xcom_pull(task_ids=None, key="return_value")

        print("With None task_ids", type(xcom_dict_task_value_ids_2), xcom_dict_task_value_ids_2)

    xcom_dict = xcom_dict()

    read_xcom_task = read_xcom()

    xcom_dict >> read_xcom_task


This DAG tests three things:

  1. xcom_pull with task_ids as list of one task_id, it should return an Iterable and not value. Verified by accessing using element access
  2. xcom_pull with a single task_id without a list, it should return a value
  3. xcom_pull with NONE task_id, it will return a value again but for the current task instance task_id, which is None.

image

Testing with map_indexes

DAG:

from airflow.decorators import dag, task
from airflow.utils.timezone import datetime


@dag(schedule=None, start_date=datetime(2021, 1, 1), catchup=False)
def mapped_xcom_pull_example():
    @task
    def push_value(value):
        return value

    @task
    def collect_values(**context):
        ti = context["ti"]
        r = ti.xcom_pull(task_ids="push_value", map_indexes=[0, 1, 2])
        print("Collected values from mapped task:", r)

        print("Values in results are", r[0], r[1], r[2])

        return r

    push_value.expand(value=["apple", "banana", "cherry"]) >> collect_values()

mapped_xcom_pull_example()

image

image

Observe that each value is pushed as a single value and we get it normally now.

Similar behaviour to AF2:

13167e4798c6
 ▶ Log message source details
[2025-04-25, 15:42:22 IST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2025-04-25, 15:42:22 IST] {logging_mixin.py:190} INFO - Collected values from mapped task: LazySelectSequence([3 items])
[2025-04-25, 15:42:22 IST] {logging_mixin.py:190} INFO - Values in results are apple banana cherry
[2025-04-25, 15:42:22 IST] {python.py:240} INFO - Done. Returned value was: LazySelectSequence([3 items])
[2025-04-25, 15:42:22 IST] {xcom.py:241} WARNING - Coercing mapped lazy proxy return value from task collect_values (DAG mapped_xcom_pull_example, run manual__2025-04-25T10:12:16.005448+00:00) to list, which may degrade performance. Review resource requirements for this operation, and call list() to suppress this message. See Dynamic Task Mapping documentation for more information about lazy proxy objects.
[2025-04-25, 15:42:22 IST] {taskinstance.py:341} ▶ Post task execution logs

We return a list instead of a LazySelectSequence though


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh amoghrajesh requested review from ashb and kaxil as code owners April 24, 2025 08:16
@amoghrajesh amoghrajesh added this to the Airflow 3.0.1 milestone Apr 24, 2025
@amoghrajesh
Copy link
Contributor Author

TODO: doc changes and test addition (theres no test that specifically tests this portion)

@kaxil
Copy link
Member

kaxil commented Apr 24, 2025

TODO: doc changes and test addition (theres no test that specifically tests this portion)

Cool, yeah we should remove the entry we added for 3.0.0 https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#behaviour-change-in-xcom-pull -- when this PR is ready like you mentioned

Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think we need this code duplication, the following diff should work:

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index a34a4c78ff..706a72b378 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -325,6 +325,9 @@ class RuntimeTaskInstance(TaskInstance):
         if run_id is None:
             run_id = self.run_id
 
+        single_task_requested = isinstance(task_ids, (str, type(None)))
+        single_map_index_requested = isinstance(map_indexes, (int, type(None), ArgNotSet))
+
         if task_ids is None:
             # default to the current task if not provided
             task_ids = [self.task_id]
@@ -363,7 +366,7 @@ class RuntimeTaskInstance(TaskInstance):
             else:
                 xcoms.append(value)
 
-        if len(xcoms) == 1:
+        if single_task_requested and single_map_index_requested:
             return xcoms[0]
         return xcoms

@amoghrajesh
Copy link
Contributor Author

Umm yeah we can do that, i didnt want to overly simplify it that much, but seemingly it looks good!

@amoghrajesh amoghrajesh requested a review from kaxil April 25, 2025 19:15
@amoghrajesh
Copy link
Contributor Author

@kaxil I updated it, it passes all my tests too, so we are good.

@kaxil
Copy link
Member

kaxil commented Apr 25, 2025

Umm yeah we can do that, i didnt want to overly simplify it that much, but seemingly it looks good!

Cool, yeah the code duplication in my opinion made it look complex and having too follow two entirely separate branches too. This keep the flow same.

@potiuk
Copy link
Member

potiuk commented Apr 25, 2025

Nice!

@kaxil kaxil force-pushed the xcom-single-value-bugfix branch from efc7385 to 55038ed Compare April 26, 2025 09:40
@kaxil
Copy link
Member

kaxil commented Apr 26, 2025

Fixed doc change in 55038ed

@amoghrajesh
Copy link
Contributor Author

Thank you! @kaxil

I didnt get time to do this earlier

@amoghrajesh amoghrajesh added the backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch label Apr 26, 2025
@potiuk potiuk enabled auto-merge (squash) April 26, 2025 12:19
@potiuk potiuk disabled auto-merge April 26, 2025 12:23
@potiuk potiuk enabled auto-merge (squash) April 26, 2025 12:23
@amoghrajesh amoghrajesh disabled auto-merge April 26, 2025 12:50
@amoghrajesh amoghrajesh enabled auto-merge (squash) April 26, 2025 12:50
@amoghrajesh amoghrajesh disabled auto-merge April 26, 2025 12:59
@amoghrajesh amoghrajesh enabled auto-merge (squash) April 26, 2025 13:00
@amoghrajesh amoghrajesh disabled auto-merge April 26, 2025 13:00
@potiuk potiuk merged commit 517b29d into apache:main Apr 26, 2025
71 checks passed
github-actions bot pushed a commit that referenced this pull request Apr 26, 2025
…49692)

* Treat single task_ids in xcom_pull the same as multiple

closes: #49540

* fixup! Treat single task_ids in xcom_pull the same as multiple

---------
(cherry picked from commit 517b29d)

Co-authored-by: Amogh Desai <[email protected]>
Co-authored-by: Kaxil Naik <[email protected]>
Copy link

Backport successfully created: v3-0-test

Status Branch Result
v3-0-test PR Link

github-actions bot pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Apr 26, 2025
…pache#49692)

* Treat single task_ids in xcom_pull the same as multiple

closes: apache#49540

* fixup! Treat single task_ids in xcom_pull the same as multiple

---------
(cherry picked from commit 517b29d)

Co-authored-by: Amogh Desai <[email protected]>
Co-authored-by: Kaxil Naik <[email protected]>
potiuk pushed a commit that referenced this pull request Apr 26, 2025
…49692) (#49820)

* Treat single task_ids in xcom_pull the same as multiple

closes: #49540

* fixup! Treat single task_ids in xcom_pull the same as multiple

---------
(cherry picked from commit 517b29d)

Co-authored-by: Amogh Desai <[email protected]>
Co-authored-by: Kaxil Naik <[email protected]>
jroachgolf84 pushed a commit to jroachgolf84/airflow that referenced this pull request Apr 30, 2025
* Treat single task_ids in xcom_pull the same as multiple

closes: apache#49540

* fixup! Treat single task_ids in xcom_pull the same as multiple

---------

Co-authored-by: Kaxil Naik <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:task-sdk backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Airflow 3. xcom_pull different behavior between Airflow 2 and 3
3 participants