
Dask

Dask enhances Python performance through parallel and distributed computing. To do so, it offers a collection of libraries that mimic popular data science tools such as NumPy and pandas. For instance, a Dask array is composed of many NumPy arrays: the data is broken into chunks so that computation can run on the chunks in parallel. As a result, large datasets can be processed across multiple nodes instead of being limited to a single node's resources. This article describes the observed behaviour, in terms of speed and CPU utilization, when using Dask arrays with different chunk sizes.
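As a minimal sketch of this idea (not part of the experiments below; the shape and chunk size here are arbitrary), a Dask array can wrap an existing NumPy array and expose the familiar API while computing chunk by chunk:

import numpy as np
import dask.array as da

# Wrap an in-memory NumPy array and split it into 1000x1000 chunks.
np_data = np.random.random((4000, 4000))
dask_data = da.from_array(np_data, chunks=(1000, 1000))

# The NumPy-style API builds a lazy task graph...
lazy_mean = dask_data.mean()

# ...and compute() executes the chunked tasks in parallel.
print(lazy_mean.compute())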

The following experiments are carried out in a JupyterLab notebook in Cloudera Machine Learning (CML) on a Kubernetes platform powered by OpenShift 4.8, with the hardware specification described below. Here’s the link to download the complete notebook.

CPU: Intel(R) Xeon(R) Gold 5220R CPU @ 2.20GHz
Memory: DIMM DDR4 Synchronous Registered (Buffered) 2933 MHz (0.3 ns)
Disk: SSD P4610 1.6TB SFF

Dask with Single Worker Pod

  • Create a JupyterLab session with a 2 CPU / 16 GiB memory profile.

  • Create a simple Dask array with 10,000 chunks; the chunk layout can be confirmed with the array attributes sketched after the output below.
import dask.array as da
arrayshape = (200000, 200000)
chunksize = (2000, 2000)
x = da.ones(arrayshape, chunks=chunksize)
x
        Array               Chunk
Bytes   298.02 GiB          30.52 MiB
Shape   (200000, 200000)    (2000, 2000)
Count   10000 Tasks         10000 Chunks
Type    float64             numpy.ndarray
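A few array attributes make these numbers easy to verify programmatically (a quick sketch using the x created above):

# Inspect how Dask partitioned the array.
print(x.chunksize)                      # (2000, 2000)
print(x.numblocks)                      # (100, 100) -> 100 * 100 = 10000 blocks
print(x.npartitions)                    # 10000
print(f"{x.nbytes / 2**30:.2f} GiB")    # ~298.02 GiB for the full array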
  • Add the values of the array and take note of the completion time.
from dask.diagnostics import ProgressBar
big_calc = (x * x[::-1, ::-1]).sum()
with ProgressBar():
    result = big_calc.compute()
print(f"Total size: {result}")
[########################################] | 100% Completed | 42.8s
Total size: 40000000000.0
  • Restart the kernel. Create the same array shape with a smaller number of chunks.
# Restart kernel
# Takes a shorter time to complete with a smaller number (400) of chunks.

import dask.array as da
arrayshape = (200000, 200000)
chunksize = (10000, 10000)
x = da.ones(arrayshape, chunks=chunksize)
x
        Array               Chunk
Bytes   298.02 GiB          762.94 MiB
Shape   (200000, 200000)    (10000, 10000)
Count   400 Tasks           400 Chunks
Type    float64             numpy.ndarray
  • Add the values of the array and take note of the completion time.
from dask.diagnostics import ProgressBar
big_calc = (x * x[::-1, ::-1]).sum()
with ProgressBar():
    result = big_calc.compute()
print(f"Total size: {result}")
[########################################] | 100% Completed | 26.9s
Total size: 40000000000.0
  • By default, Dask does automatic chunking. Find out the chunk size created by the system.
# Restart kernel
# Allow automatic assignment of chunks. The system assigns 2500 tasks in this example.

import dask.array as da
arrayshape = (200000, 200000)
x = da.ones(arrayshape)
x
        Array               Chunk
Bytes   298.02 GiB          122.07 MiB
Shape   (200000, 200000)    (4000, 4000)
Count   2500 Tasks          2500 Chunks
Type    float64             numpy.ndarray
  • Add the values of the array based on the chunk size created by Dask. Take note of the completion time.
from dask.diagnostics import ProgressBar
big_calc = (x * x[::-1, ::-1]).sum()
with ProgressBar():
    result = big_calc.compute()
print(f"Total size: {result}")
[########################################] | 100% Completed | 35.2s
Total size: 40000000000.0
  • Restart the kernel. This time, create the same array shape with a larger chunk size.
import dask.array as da
arrayshape = (200000, 200000)
chunksize = (20000, 20000)
x = da.ones(arrayshape, chunks=chunksize)
x
        Array               Chunk
Bytes   298.02 GiB          2.98 GiB
Shape   (200000, 200000)    (20000, 20000)
Count   100 Tasks           100 Chunks
Type    float64             numpy.ndarray
  • Perform the same computation, but this time the kernel crashes due to insufficient memory (more than 16 GiB is needed for chunks of this size); see the quick per-chunk estimate after the output below.

from dask.diagnostics import ProgressBar
big_calc = (x * x[::-1, ::-1]).sum()
with ProgressBar():
    result = big_calc.compute()
print(f"Total size: {result}")
[###                                     ] | 9% Completed |  1.0s
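A quick back-of-the-envelope estimate shows why: each 20000 x 20000 float64 chunk alone occupies roughly 3 GiB, and the elementwise multiply needs two input chunks plus an output chunk, with several tasks in flight at once (a rough sketch, not an exact accounting of Dask's memory use):

# Memory footprint of a single chunk: elements * 8 bytes per float64.
chunk_bytes = 20000 * 20000 * 8
print(f"{chunk_bytes / 2**30:.2f} GiB per chunk")            # ~2.98 GiB

# Two inputs plus one output per multiply task, with several tasks
# running concurrently, quickly exhausts the 16 GiB session.
print(f"{3 * chunk_bytes / 2**30:.2f} GiB for one in-flight task")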
  • Restart the kernel. Create the same array shape and allow Dask to assign the chunk size automatically.
import dask.array as da
arrayshape = (200000, 200000)
x = da.ones(arrayshape)
x
        Array               Chunk
Bytes   298.02 GiB          122.07 MiB
Shape   (200000, 200000)    (4000, 4000)
Count   2500 Tasks          2500 Chunks
Type    float64             numpy.ndarray
  • Automatic assignment of chunks allows the computation to complete without running into the insufficient-memory problem. The default target behind this behaviour can be tuned, as sketched after the output below.
from dask.diagnostics import ProgressBar
big_calc = (x * x[::-1, ::-1]).sum()
with ProgressBar():
    result = big_calc.compute()
print(f"Total size: {result}")
[########################################] | 100% Completed | 35.8s
Total size: 40000000000.0
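Automatic chunking is driven by Dask's array.chunk-size configuration option (roughly 128 MiB by default, which is why the chunks above land around 122 MiB). A minimal sketch of overriding it, assuming the standard dask.config API; the 256MiB value is only an illustration:

import dask
import dask.array as da

# Raise the target size used by automatic ("auto") chunking.
with dask.config.set({"array.chunk-size": "256MiB"}):
    x = da.ones((200000, 200000))

# Chunks were chosen against the larger 256 MiB target at creation time.
print(x.chunksize)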
  • Restart the kernel. Create a distributed Dask cluster by launching a task scheduler pod.
import os
import time
import cdsw
import dask

dask_scheduler = cdsw.launch_workers(
    n=1,
    cpu=2,
    memory=2,
    code=f"!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:8090",
)

# Wait for the scheduler to start.
time.sleep(10)
  • Obtain the Dask URL and access the Dask UI portal.
print("//".join(dask_scheduler[0]["app_url"].split("//")))
http://zusu97y7pconcrlc.ml-76cf996d-8f4.apps.apps.ocp4.cdpkvm.cldr/
  • Take note of the Dask scheduler URL.
scheduler_workers = cdsw.list_workers()
scheduler_id = dask_scheduler[0]["id"]
scheduler_ip = [
    worker["ip_address"] for worker in scheduler_workers if worker["id"] == scheduler_id
][0]

scheduler_url = f"tcp://{scheduler_ip}:8786"
scheduler_url

'tcp://10.254.0.98:8786'

  • Open the terminal of the CML session pod and start the dask-worker command, connecting it to the above Dask scheduler.
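For example, in the pod's terminal (using the scheduler address obtained above; the address will differ in each run):

dask-worker tcp://10.254.0.98:8786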

  • Verify that 1 worker node is currently attached to the Dask cluster.
from dask.distributed import Client
client = Client(scheduler_url)
client

Client

Client-52dd6bb3-545b-11ed-8245-0a580afe03f4

Connection method: Direct
Dashboard: http://10.254.0.39:8090/status

Scheduler Info

Scheduler

Scheduler-fb942c81-40ea-48f1-a0fa-02aaee940ac1

Comm: tcp://10.254.0.39:8786 Workers: 1
Dashboard: http://10.254.0.39:8090/status Total threads: 16
Started: 2 minutes ago Total memory: 14.81 GiB

Workers

Worker: tcp://10.254.3.244:46363

Comm: tcp://10.254.3.244:46363 Total threads: 16
Dashboard: http://10.254.3.244:39351/status Memory: 14.81 GiB
Nanny: tcp://10.254.3.244:37459
Local directory: /home/cdsw/dask-worker-space/worker-ndw3ef2a
GPU: NVIDIA A100-PCIE-40GB GPU memory: 39.59 GiB
Tasks executing: 0 Tasks in memory: 0
Tasks ready: 0 Tasks in flight: 0
CPU usage: 8.0% Last seen: Just now
Memory usage: 130.37 MiB Spilled bytes: 0 B
Read bytes: 33.51 kiB Write bytes: 48.61 kiB
  • Dask UI shows that 1 worker node is currently connected to the Dask cluster.

  • The OpenShift dashboard shows that 1 Dask scheduler pod and 1 Dask worker pod (the CML session pod) are currently up and running.

  • Create the Dask array without specifying the chunk size.
import dask.array as da
arrayshape = (200000, 200000)
x = da.ones(arrayshape)
x
        Array               Chunk
Bytes   298.02 GiB          122.07 MiB
Shape   (200000, 200000)    (4000, 4000)
Count   2500 Tasks          2500 Chunks
Type    float64             numpy.ndarray
  • Run the same array sum() computation and check the completion time.
%%time
big_calc = (x * x[::-1, ::-1]).sum()
result = big_calc.compute()
print(f"Total size: {result}")
Total size: 40000000000.0
CPU times: user 2.17 s, sys: 641 ms, total: 2.81 s
Wall time: 9min 3s
  • Dask UI displays the execution of the tasks.

  • Take note of the CPU utilization (using bpytop) when the above computation takes place.

  • The Dask UI is unable to render the task graph because there are too many Dask tasks/chunks.

  • Next, create the same array shape with a larger chunk size.
import dask.array as da
arrayshape = (200000, 200000)
chunksize = (10000, 10000)
x = da.ones(arrayshape, chunks=chunksize)
x
        Array               Chunk
Bytes   298.02 GiB          762.94 MiB
Shape   (200000, 200000)    (10000, 10000)
Count   400 Tasks           400 Chunks
Type    float64             numpy.ndarray
  • Run the same sum() computation and check the completion time.
%%time
big_calc = (x * x[::-1, ::-1]).sum()
result = big_calc.compute()
print(f"Total size: {result}")
Total size: 40000000000.0
CPU times: user 685 ms, sys: 253 ms, total: 937 ms
Wall time: 3min 55s
  • This time, the Dask UI is able to render the graph with the larger chunk size (smaller number of chunks).

  • Dask allows the scheduler to be specified explicitly. Let’s assign the ‘single-threaded’ scheduler and run the same computation.
# Assigning the "single-threaded" scheduler does not dispatch work to the Dask workers.
%time big_calc = (x * x[::-1, ::-1]).sum().compute(scheduler='single-threaded')
print(f"Total size: {big_calc}")
CPU times: user 1min 23s, sys: 1min 54s, total: 3min 18s
Wall time: 3min 17s
Total size: 40000000000.0
  • The CPU utilization graph shows that the ‘single-threaded’ scheduler uses a single CPU core and does not use the Dask workers.

  • Now let’s assign the ‘threads’ scheduler and run the same computation.
%%time
big_calc = (x * x[::-1, ::-1]).sum().compute(scheduler='threads')
print(f"Total size: {big_calc}")
Total size: 40000000000.0
CPU times: user 3min 2s, sys: 2min 47s, total: 5min 50s
Wall time: 26.9 s
  • The CPU utilization graph shows that the ‘threads’ scheduler uses all the available CPU cores on the hosting node to complete the tasks. It does not use the Dask workers either; to run the computation on the distributed cluster instead, hand it to the Client as sketched below.
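A minimal sketch of explicitly sending the computation to the distributed workers through the connected Client (once a Client is created it also becomes the default scheduler, which is why the earlier plain compute() calls ran on the worker pod):

# Submit the task graph to the distributed cluster via the Client.
future = client.compute((x * x[::-1, ::-1]).sum())

# result() blocks until the workers finish and returns the value.
print(f"Total size: {future.result()}")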

Dask with Multiple Worker Pods

  • Restart the kernel and create a new Dask cluster.
import os
import time
import cdsw
import dask

dask_scheduler = cdsw.launch_workers(
    n=1,
    cpu=2,
    memory=2,
    code=f"!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:8090",
)

# Wait for the scheduler to start.
time.sleep(10)
# Obtain the Dask UI address.
print("//".join(dask_scheduler[0]["app_url"].split("//")))
http://agfeuy4lgg17kf6f.ml-76cf996d-8f4.apps.apps.ocp4.cdpkvm.cldr/
scheduler_workers = cdsw.list_workers()
scheduler_id = dask_scheduler[0]["id"]
scheduler_ip = [
    worker["ip_address"] for worker in scheduler_workers if worker["id"] == scheduler_id
][0]

scheduler_url = f"tcp://{scheduler_ip}:8786"
scheduler_url
'tcp://10.254.0.116:8786'
  • Create 3 new CML worker pods and attach them to the Dask cluster.
# Assign 3 worker nodes to the Dask cluster.
more_worker = 3
dask_workers = cdsw.launch_workers(
    n=more_worker,
    cpu=2,
    memory=32,
    code=f"!dask-worker {scheduler_url}",
)

# Wait for the workers to start.
time.sleep(10)
from dask.distributed import Client
client = Client(scheduler_url)
client

Client

Client-107eecec-5466-11ed-80f9-0a580afe03f7

Connection method: Direct
Dashboard: http://10.254.0.116:8090/status

Scheduler Info

Scheduler

Scheduler-0f7f401a-d799-4e03-9eff-f06bcb06b243

Comm: tcp://10.254.0.116:8786 Workers: 3
Dashboard: http://10.254.0.116:8090/status Total threads: 72
Started: Just now Total memory: 89.13 GiB

Workers

Worker: tcp://10.254.0.117:34289

Comm: tcp://10.254.0.117:34289 Total threads: 24
Dashboard: http://10.254.0.117:34401/status Memory: 29.71 GiB
Nanny: tcp://10.254.0.117:38833
Local directory: /home/cdsw/dask-worker-space/worker-9d3afyzk
Tasks executing: 0 Tasks in memory: 0
Tasks ready: 0 Tasks in flight: 0
CPU usage: 2.0% Last seen: Just now
Memory usage: 137.63 MiB Spilled bytes: 0 B
Read bytes: 2.15 kiB Write bytes: 2.82 kiB

Worker: tcp://10.254.0.119:36463

Comm: tcp://10.254.0.119:36463 Total threads: 24
Dashboard: http://10.254.0.119:45335/status Memory: 29.71 GiB
Nanny: tcp://10.254.0.119:38037
Local directory: /home/cdsw/dask-worker-space/worker-kqs0vmco
Tasks executing: 0 Tasks in memory: 0
Tasks ready: 0 Tasks in flight: 0
CPU usage: 2.0% Last seen: Just now
Memory usage: 131.52 MiB Spilled bytes: 0 B
Read bytes: 417.9661186980349 B Write bytes: 131.9893006414847 B

Worker: tcp://10.254.2.119:43099

Comm: tcp://10.254.2.119:43099 Total threads: 24
Dashboard: http://10.254.2.119:33821/status Memory: 29.71 GiB
Nanny: tcp://10.254.2.119:40791
Local directory: /home/cdsw/dask-worker-space/worker-_rg8214f
Tasks executing: 0 Tasks in memory: 0
Tasks ready: 0 Tasks in flight: 0
CPU usage: 2.0% Last seen: Just now
Memory usage: 127.04 MiB Spilled bytes: 0 B
Read bytes: 2.15 kiB Write bytes: 3.64 kiB
  • The OpenShift dashboard shows 3 Dask worker pods along with 1 Dask scheduler pod.

  • Run the same computation and check the completion time.
import dask.array as da
arrayshape = (200000, 200000)
chunksize = (10000, 10000)
x = da.ones(arrayshape, chunks=chunksize)
x
        Array               Chunk
Bytes   298.02 GiB          762.94 MiB
Shape   (200000, 200000)    (10000, 10000)
Count   400 Tasks           400 Chunks
Type    float64             numpy.ndarray
%%time
big_calc = (x * x[::-1, ::-1]).sum()
result = big_calc.compute()
print(f"Total size: {result}")
Total size: 40000000000.0
CPU times: user 329 ms, sys: 113 ms, total: 443 ms
Wall time: 22.9 s
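Once the measurements are done, the scheduler and worker pods can be cleaned up from the session. A short sketch, assuming CML's Workers API (the same cdsw.launch_workers/list_workers family used above, plus stop_workers):

# Stop all worker pods launched from this session
# (the Dask scheduler pod and the three Dask worker pods).
worker_ids = [w["id"] for w in cdsw.list_workers()]
cdsw.stop_workers(*worker_ids)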

Dask with NVIDIA GPU

  • Restart the kernel. Use the GPU to compute over the same array shape by using the CuPy (CUDA) library on the pod with an NVIDIA GPU card attached. Take note of the completion time.
import cupy as cpcupy
import dask.array as dacupy

arrayshape = (200000, 200000)
chunksize = (10000, 10000)
y = dacupy.ones_like(cpcupy.array(()), shape=arrayshape, chunks=chunksize)
y
        Array               Chunk
Bytes   298.02 GiB          762.94 MiB
Shape   (200000, 200000)    (10000, 10000)
Count   400 Tasks           400 Chunks
Type    float64             cupy._core.core.ndarray
%time array_sumcupy = dacupy.sum(y).compute()
print(f"Total size: {array_sumcupy}")
CPU times: user 1.09 s, sys: 2.34 s, total: 3.43 s
Wall time: 2.5 s
Total size: 40000000000.0

  • The following graph displays the GPU utilization during the execution of the above tasks.

Conclusion:

  • Dask performance depends heavily on the chunk size. A larger chunk size results in a smaller number of tasks and allows execution to complete faster, and it also drives higher CPU utilization while the tasks run. However, a larger chunk size requires more memory as well: the kernel crashes with insufficient memory when the chunks are too large for the session's memory profile.
  • Using more Dask worker nodes does not necessarily result in a shorter completion time, because scheduling and communication overhead applies. Dask makes sense for processing huge and complex datasets, but it is generally not worthwhile for small, simple machine learning tasks.
  • Using a GPU with Dask completes the tasks much faster than using CPUs for this otherwise CPU-bound computation.



