Dask Parallel Task Scheduling

fansichao 2021-10-23 16:16:35

Dask Overview

Dask is a flexible library for parallel computing in Python.

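Dask is composed of two parts:

1. Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
2. "Big Data" collections such as parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.

Dask emphasizes the following virtues:

- Familiar: provides parallelized NumPy array and Pandas DataFrame objects.
- Flexible: provides a task scheduling interface for more custom workloads and integration with other projects.
- Native: enables distributed computing in pure Python with access to the PyData stack.
- Fast: operates with low overhead, low latency, and minimal serialization, as needed for fast numerical algorithms.
- Scales up: runs resiliently on clusters with thousands of cores.
- Scales down: trivial to set up and run on a laptop in a single process.
- Responsive: designed with interactive computing in mind, providing rapid feedback and diagnostics.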

[Figure: Dask architecture diagram (20191205_Dask_架构图.png)]
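The "dynamic task scheduling" half works on task graphs. In Dask's graph specification a graph is a plain dict mapping keys to values or tasks, where a task is a tuple whose first element is a callable and whose remaining elements are arguments (literals, other keys, or lists of them). The sketch below is a hypothetical, single-threaded toy evaluator (the names `get`, `is_task`, and `graph` are illustrative): it resolves dependencies recursively and computes each key once, but it is not Dask's actual scheduler, which runs independent tasks in parallel.

```python
from operator import add, mul


def is_task(value):
    """A task is a tuple whose first element is callable (Dask's graph convention)."""
    return isinstance(value, tuple) and len(value) > 0 and callable(value[0])


def _evaluate(graph, value, cache):
    """Resolve one graph value: run tasks, recurse into lists, follow key references."""
    if is_task(value):
        func, *args = value
        return func(*[_evaluate(graph, arg, cache) for arg in args])
    if isinstance(value, list):
        return [_evaluate(graph, item, cache) for item in value]
    if isinstance(value, str) and value in graph:
        return get(graph, value, cache)  # a string naming another key is a dependency
    return value  # plain literal


def get(graph, key, cache=None):
    """Toy evaluator: compute `key` from a Dask-style graph, each key at most once."""
    if cache is None:
        cache = {}
    if key not in cache:
        cache[key] = _evaluate(graph, graph[key], cache)
    return cache[key]


# Hypothetical example graph: z depends on x and y, w depends on z.
graph = {
    'x': 1,
    'y': 2,
    'z': (add, 'x', 'y'),                  # z = x + y
    'w': (mul, 'z', 10),                   # w = z * 10
    'total': (sum, ['x', 'y', 'z', 'w']),  # a list of keys passed as one argument
}

print(get(graph, 'total'))  # 36
```

With Dask installed, the same `graph` dict can be handed to Dask's threaded scheduler via `dask.threaded.get(graph, 'total')`, which is where parallel execution of independent keys such as `x` and `y` comes in.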

Dask Analysis

(env36) [scfan@fdm tools]$ dask-scheduler
(env36) [scfan@fdm ~]$ dask-worker 10.0.2.14:8786

python3
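With the scheduler listening at the address used by the `dask-worker` command above (`10.0.2.14:8786`) and at least one worker registered, a `python3` session can connect as a client. This sketch follows the `Client.map` / `Client.submit` pattern from the `dask.distributed` quickstart; `square` and `run` are hypothetical names, and it assumes the `distributed` package is installed in the same environment as the workers.

```python
def square(x):
    """Unit of work executed on a worker."""
    return x * x


def run(address, n=10):
    """Hypothetical helper: connect to a running dask-scheduler and sum squares on its workers."""
    # Imported lazily so this module can be loaded without `distributed` installed.
    from dask.distributed import Client

    with Client(address) as client:
        futures = client.map(square, range(n))  # one task per input, executed on the workers
        total = client.submit(sum, futures)     # futures passed as arguments are resolved first
        return total.result()                   # block until the final value is returned
```

Calling `run('tcp://10.0.2.14:8786')` should return 285 (0² + 1² + ... + 9²), and the tasks appear on the scheduler dashboard (port 8787 in the scheduler log) while they run.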

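Dask-Resource Analysis
Dask-Task Management

Dask Pros and Cons

Pros

Cons

Dask Deployment

Appendix

Performance Testing

Pandas and Dask performance was tested using the field-processing node of the self-service modeling module.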
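The original test setup (the field-processing node) is not reproduced here. As a hedged sketch of how such a comparison can be timed, the harness below uses a hypothetical column derivation (`c = a * 2 + b`) as a stand-in for field processing; `benchmark`, `pandas_field_op`, and `dask_field_op` are illustrative names. One point matters for fairness: Dask builds a lazy task graph, so the Dask side must call `.compute()`, otherwise the timing only measures graph construction.

```python
import time

import pandas as pd


def benchmark(fn, *args, repeat=3):
    """Return the best wall-clock time (seconds) of fn(*args) over `repeat` runs."""
    best = float('inf')
    for _ in range(repeat):
        start = time.perf_counter()
        fn(*args)
        best = min(best, time.perf_counter() - start)
    return best


def pandas_field_op(df):
    # Hypothetical "field processing" step: derive a new column from two others.
    return df.assign(c=df['a'] * 2 + df['b'])


def dask_field_op(ddf):
    # Same logic on a Dask DataFrame; .compute() forces the lazy graph to execute.
    return ddf.assign(c=ddf['a'] * 2 + ddf['b']).compute()


if __name__ == '__main__':
    n = 200_000
    df = pd.DataFrame({'a': range(n), 'b': range(n)})
    print('pandas: %.4f s' % benchmark(pandas_field_op, df))
    try:
        import dask.dataframe as dd
    except ImportError:
        print('dask is not installed; skipping the Dask run')
    else:
        ddf = dd.from_pandas(df, npartitions=4)
        print('dask:   %.4f s' % benchmark(dask_field_op, ddf))
```

On small, in-memory data pandas often wins because Dask adds per-task scheduling overhead; Dask's advantage tends to show on data larger than memory or when work is spread across a cluster.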

References

Dask & Pandas Syntax Differences

GitHub - Dask Collections API compatibility

Examples

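import pandas

# Dask has no pandas.core.series.Series: a Dask column is a dask.dataframe.Series,
# so the single-column type check only applies in pandas mode
if data_mode.upper() != 'DASK':
    if varname.startswith('df') and not isinstance(argls[index], pandas.core.series.Series):
        raise RuntimeError('Argument %s must be a single column' % (index + 1))

# Dask DataFrame.replace has no inplace parameter, so assign the returned object
if data_mode.upper() == 'DASK':
    data = data.replace(to_replace='nan', value='')
else:
    data.replace(to_replace='nan', value='', inplace=True)

# Dask DataFrame.to_csv
# data.to_csv('a1.csv')    treats 'a1.csv' as a directory and writes one file per partition into it
# data.to_csv(['a1.csv'])  writes the listed file(s); the list needs one name per partition
# data.to_csv('a-*.csv')   writes one file per partition, replacing * with the partition number
if data_mode.upper() == 'DASK':
    data.to_csv(['a1.csv'], index=False)
else:
    data.to_csv('a.csv', index=False)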

Dask & Pandas: Detailed Syntax and Performance Differences

Starting the Programs

Dask-scheduler

Start dask-scheduler

(env36) [scfan@fdm tools]$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.dashboard.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: pip install jupyter-server-proxy
distributed.scheduler - INFO - Local Directory: /tmp/scheduler-bdk4b7li
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.0.2.14:8786
distributed.scheduler - INFO - dashboard at: :8787
distributed.scheduler - INFO - Register tcp://10.0.2.14:30547
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.2.14:30547
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://10.0.2.14:9190
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.2.14:9190
distributed.core - INFO - Starting established connection

Dask-Scheduler web dashboard (port 8787 in the log above)
[Figure: 20191205_Dask_Scheduler_可视化界面.png]

Dask-Worker

Start a Worker

(env36) [scfan@fdm tools]$ dask-worker 10.0.2.14:8786
distributed.nanny - INFO - Start Nanny at: 'tcp://10.0.2.14:12075'
distributed.diskutils - INFO - Found stale lock file and directory '/home/scfan/project/FISAMS/branches/branch_scfan/src/server/fdm/tools/worker-yyz2l21f', purging
distributed.dashboard.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: pip install jupyter-server-proxy
distributed.worker - INFO - Start worker at: tcp://10.0.2.14:17181
distributed.worker - INFO - Listening to: tcp://10.0.2.14:17181
distributed.worker - INFO - dashboard at: 10.0.2.14:36300
distributed.worker - INFO - Waiting to connect to: tcp://10.0.2.14:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 4
distributed.worker - INFO - Memory: 10.32 GB
distributed.worker - INFO - Local Directory: /home/scfan/project/FISAMS/branches/branch_scfan/src/server/fdm/tools/worker-5304u4tp
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://10.0.2.14:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
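The worker log reports the threads and memory it was given (here 4 threads and 10.32 GB); `dask-worker` also accepts `--nthreads` and `--memory-limit` to set these explicitly. To check the same numbers from the client side, a hedged sketch can read `Client.scheduler_info()`. It assumes each worker entry carries `nthreads` and `memory_limit` (in bytes) keys, as in recent `distributed` releases; `summarize_workers` is a hypothetical helper.

```python
def summarize_workers(info):
    """Map worker address -> (threads, memory limit in GB) from Client.scheduler_info()."""
    summary = {}
    for address, worker in info['workers'].items():
        limit = worker.get('memory_limit') or 0  # bytes; treat a missing limit as 0
        summary[address] = (worker['nthreads'], round(limit / 1e9, 2))
    return summary
```

With a client connected to `tcp://10.0.2.14:8786`, `summarize_workers(client.scheduler_info())` returns one entry per registered worker.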

Dask-Worker web dashboard
[Figure: 20191205_Dask_Worker_可视化界面.png]

Dask Comparison

Dask Cons

Dask Pros

Dask Supported Features