Workers die suddenly due to bad scheduler RPC response to workers_to_close #950

Open
@kikemolina3

Description

Brief explanation
When I deploy a KubeCluster on Kubernetes with the latest versions of Dask for the scheduler, workers, and operator, and enable adaptive scaling, even a dummy computation (e.g., sleep functions) causes workers to die unexpectedly, losing tasks and preventing the job from completing.

Minimal Complete Verifiable Example
Versions:

  • Client: dask==2025.5.0, distributed==2025.5.0, dask-kubernetes==2025.4.0
  • Worker and scheduler: docker.io/daskdev/dask:2025.5.0-py3.12
  • Operator: ghcr.io/dask/dask-kubernetes-operator:2025.4.3
  • Kubernetes: v1.28.2

Cluster spec to deploy:

---
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: foo-cluster
  namespace: default
spec:
  idleTimeout: 0
  worker:
    replicas: 0
    spec:
      containers:
      - name: worker
        image: docker.io/daskdev/dask:2025.5.0-py3.12
        args:
        - dask-worker
        - "--name"
        - "$(DASK_WORKER_NAME)"
        - "--dashboard"
        - "--dashboard-address"
        - '8788'
        - "--nthreads"
        - "1"
        env:
        - name: "DASK_DISTRIBUTED__LOGGING__DISTRIBUTED"
          value: "debug"
        resources:
          requests:
            memory: "0.3Gi"
            cpu: "0.2"
          limits:
            memory: "0.3Gi"
            cpu: "0.2"
        ports:
        - name: http-dashboard
          containerPort: 8788
          protocol: TCP
  scheduler:
    spec:
      containers:
      - name: scheduler
        image: docker.io/daskdev/dask:2025.5.0-py3.12
        args:
        - dask-scheduler
        - "--host"
        - 0.0.0.0
        env:
        - name: "DASK_DISTRIBUTED__LOGGING__DISTRIBUTED"
          value: "debug"
        ports:
        - name: tcp-comm
          containerPort: 8786
          protocol: TCP
        - name: http-dashboard
          containerPort: 8787
          protocol: TCP
        readinessProbe:
          httpGet:
            port: http-dashboard
            path: "/health"
          initialDelaySeconds: 0
          periodSeconds: 1
          timeoutSeconds: 300
        livenessProbe:
          httpGet:
            port: http-dashboard
            path: "/health"
          initialDelaySeconds: 15
          periodSeconds: 20
    service:
      type: ClusterIP
      selector:
        dask.org/cluster-name: foo-cluster
        dask.org/component: scheduler
      ports:
      - name: tcp-comm
        protocol: TCP
        port: 8786
        targetPort: tcp-comm
      - name: http-dashboard
        protocol: TCP
        port: 8787
        targetPort: http-dashboard

Python script:

import dask.bag as db
from dask_kubernetes.operator import KubeCluster
import time
from dask.distributed import Client
import os

def sleepy(i):
    hostname = os.environ.get("HOSTNAME")
    t0 = time.time()
    print("Going to sleep...")
    time.sleep(60)
    t1 = time.time()
    return (i, hostname, t0, t1)

cluster = KubeCluster(name="foo-cluster", namespace="default", custom_cluster_spec="dask.yaml")
client = Client(cluster)

MAP_SIZE = MAX_WORKERS = 50

cluster.adapt(minimum=0, maximum=MAX_WORKERS)

t0 = time.time()
b = db.from_sequence([i for i in range(MAP_SIZE)], npartitions=MAP_SIZE).map(sleepy)
res = client.compute(b).result()
t1 = time.time()

print(f"Time to compute sleepy: {t1 - t0:.2f} seconds")

cluster.close()

How to reproduce

  1. Deploy a Kubernetes cluster (e.g., Minikube)
  2. Install the Dask K8s operator
  3. Apply the provided cluster manifest
  4. Run the Python script that executes sleeping tasks

Observed behavior
Occasionally, when the operator queries the scheduler via RPC (workers_to_close), the response is a tuple instead of a list.

This assertion then rejects the RPC response:

assert isinstance(workers_to_close, list)

and the failed check triggers the following fallback:

logger.warning(
    f"Scaling {worker_group_name} failed via the HTTP API and the Dask RPC, falling back to LIFO scaling. "
    "This can result in lost data, see https://kubernetes.dask.org/en/latest/operator_troubleshooting.html."
)

This fallback applies a LIFO policy that mistakenly deletes active workers, interrupting running tasks.

I added a debug line; this is the log output for the RPC response:

[2025-05-18 20:56:57,883] kopf.objects         [WARNING] [default/foo-cluster-default] Scheduler returned a tuple instead of a list of workers to close: ('foo-cluster-default-worker-95512f34cf')
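
For reference, the raw return type can also be inspected from a client, without patching the operator. The sketch below is diagnostic only: it reuses the client from the script above (an assumption), and the parameters mirror the signature of distributed's Scheduler.workers_to_close handler.

# Diagnostic sketch: call the scheduler's workers_to_close handler through the
# RPC proxy exposed by the client and print the raw, deserialized return value.
# Assumes `client` from the reproduction script above is still connected.
resp = client.sync(client.scheduler.workers_to_close, n=1, attribute="name")
print(type(resp), resp)  # observed: sometimes a list, sometimes a tuple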

This behavior is not consistent: sometimes the return type is a list, sometimes a tuple. It appears to be caused by a serialization quirk, possibly in distributed's RPC mechanism.
I don't know whether the problem lies on the distributed side (serialization) or elsewhere, but this behaviour prevents me from running Dask successfully on a Kubernetes cluster, and not only for this trivial sleep workload; other workloads hit it as well.
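
One plausible source of that quirk (an assumption on my part, not a confirmed root cause) is msgpack, which distributed uses for its wire protocol: the same serialized list deserializes as either a list or a tuple depending on the use_list option.

import msgpack

# Minimal illustration of the suspected quirk: a packed list comes back as a
# list by default, but as a tuple when use_list=False is passed to unpackb.
payload = msgpack.packb(["foo-cluster-default-worker-95512f34cf"])
print(msgpack.unpackb(payload))                  # ['foo-cluster-default-worker-95512f34cf']
print(msgpack.unpackb(payload, use_list=False))  # ('foo-cluster-default-worker-95512f34cf',)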

Expected behavior
The scheduler should always return a list of workers to close. Returning a tuple causes unexpected and fatal behavior in the operator logic.

Workaround
An immediate workaround is to cast the tuple to a list in the operator (a sketch is below). However, this only patches the symptom; ideally, the root cause should be identified and fixed.
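
A minimal sketch of that workaround, with illustrative names (scheduler_rpc, n_workers) rather than the operator's actual variables:

async def workers_to_close_safe(scheduler_rpc, n_workers):
    # Normalize the RPC response so that both tuples and lists pass the
    # isinstance(..., list) check in the operator's scaling logic.
    workers = await scheduler_rpc.workers_to_close(n=n_workers, attribute="name")
    if isinstance(workers, tuple):
        workers = list(workers)
    assert isinstance(workers, list)
    return workers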
