Spark Usage Guide

fansichao 2021-10-23 16:25:17

1.1. About Spark

Apache Spark is a big data processing framework built around speed, ease of use, and sophisticated analytics. It was originally developed in 2009 by the AMPLab at UC Berkeley and became an Apache open-source project in 2010. Compared with other big data and MapReduce technologies such as Hadoop and Storm, Spark offers several advantages.

Spark official website: https://spark.apache.org/

1.2. The Spark Framework

Spark execution flow diagram (figure omitted)

Spark architecture diagram (figure omitted)

For details, see: https://blog.csdn.net/swing2008/article/details/60869183

1.3. About Hadoop

Hadoop is a distributed system infrastructure developed by the Apache Foundation.

Hadoop implements a distributed file system, the Hadoop Distributed File System (HDFS).

The core of the Hadoop framework is HDFS and MapReduce: HDFS provides storage for massive amounts of data, and MapReduce provides the computation over that data.

Hadoop entry on Baidu Baike

Advantages of Hadoop:

Hadoop framework
Hadoop has two core modules: the distributed storage module HDFS and the distributed computing module MapReduce.

1.4. About the YARN Framework

The original JobTracker/TaskTracker framework would have needed large-scale changes to fix its shortcomings in scalability, memory consumption, threading model, reliability, and performance, so the YARN framework was introduced instead.

The core of YARN is the separation of resource management from job scheduling/monitoring.

Hadoop's new MapReduce framework: YARN

2. Spark Environment Deployment

Deployment modes

Version notes:

2.1. Installing Scala

Spark supports Scala, Java, Python, and other languages, but Spark itself is written in Scala, so Scala must be installed first.

Step 1: Download
Scala 2.12.7 download:

wget https://downloads.lightbend.com/scala/2.12.7/scala-2.12.7.tgz

Step 2: Extract

# Extract, move into place, and fix ownership
tar -zxvf scala-2.12.7.tgz
sudo mv scala-2.12.7 /usr/local/scala
sudo chown scfan:scfan -R /usr/local/scala

Step 3: Configure environment variables

# Open the file
sudo vim /etc/profile
# Append the following
export SCALA_HOME=/usr/local/scala
export PATH=$SCALA_HOME/bin:$PATH

Step 4: Apply and verify

(env) [scfan@WOM ~]$ source /etc/profile
(env) [scfan@WOM ~]$ scala
Welcome to Scala 2.12.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171).
Type in expressions for evaluation. Or try :help.

scala>

2.2. Installing Spark

Step 1: Download

wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz

Step 2: Extract

tar -zxvf spark-2.4.0-bin-hadoop2.7.tgz
sudo mv spark-2.4.0-bin-hadoop2.7 /usr/local/spark
sudo chown -R scfan:scfan /usr/local/spark

Step 3: Configure environment variables

# Open the file
sudo vim /etc/profile
# Append the following
# Spark path
export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Step 4: Apply and verify

(env) [scfan@WOM ~]$ source /etc/profile
(env) [scfan@WOM spark]$ source /etc/profile
(env) [scfan@WOM spark]$ pyspark
Python 2.7.11 (default, Apr 10 2018, 16:42:22)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-18)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
2018-12-06 15:37:54 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/

Using Python version 2.7.11 (default, Apr 10 2018 16:42:22)
SparkSession available as 'spark'.
>>>

Step 5: Start Spark

(env) [scfan@WOM spark]$ ./bin/spark-shell --master local[2]
2018-12-06 15:49:10 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://WOM:4040
Spark context available as 'sc' (master = local[2], app id = local-1544082590634).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Web UI: http://WOM:4040

2.3. Installing Hadoop (Local Single-Node)

Installation steps:

2.3.1. Download and Install Hadoop

Official site: https://hadoop.apache.org/releases.html

# Download
wget http://mirror.bit.edu.cn/apache/hadoop/common/hadoop-2.7.7/hadoop-2.7.7.tar.gz
# Extract
tar -zxvf hadoop-2.7.7.tar.gz
# Move into place
sudo mv hadoop-2.7.7 /usr/local/hadoop
# Fix ownership
sudo chown scfan:scfan -R /usr/local/hadoop

2.3.2. Set Environment Variables

File: /etc/profile

## hadoop home
export HADOOP_HOME=/usr/local/hadoop
# hadoop path
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
# hadoop else env
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_PREFIX=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
# hadoop lib
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native:$JAVA_LIBRARY_PATH

2.3.3. Edit the Hadoop Configuration Files

Configuration file: /usr/local/hadoop/etc/hadoop/hadoop-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_171

Default HDFS name: /usr/local/hadoop/etc/hadoop/core-site.xml

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

YARN configuration: /usr/local/hadoop/etc/hadoop/yarn-site.xml

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <!-- If the spark-on-yarn deployment fails later, uncomment this block
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>master:8032</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>master:8030</value>
  </property>
  <property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>master:8031</value>
  </property>
  -->
</configuration>

MapReduce job configuration: /usr/local/hadoop/etc/hadoop/mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

HDFS configuration: /usr/local/hadoop/etc/hadoop/hdfs-site.xml

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/usr/local/hadoop/hadoop_data/hdfs/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/usr/local/hadoop/hadoop_data/hdfs/datanode</value>
  </property>
</configuration>

2.3.4. Format the Storage Directories

# Create the storage directories
mkdir -p /usr/local/hadoop/hadoop_data/hdfs/namenode/
mkdir -p /usr/local/hadoop/hadoop_data/hdfs/datanode/
# Format the namenode (if this fails, delete the "current" folder under the namenode directory)
hadoop namenode -format  # WARNING: this wipes existing HDFS data

2.3.5. Check the Web UIs

# Start HDFS
start-dfs.sh
# Start YARN
start-yarn.sh

Hadoop (YARN) web UI: http://localhost:8088
HDFS web UI: http://localhost:50070

2.4. Deploying Spark in Standalone Mode

Reference: http://spark.apache.org/docs/latest/spark-standalone.html

Local single-machine mode

# Start the master node (web UI defaults to port 8080)
./sbin/start-master.sh -h localhost --webui-port 8080
# Start a worker node
./sbin/start-slave.sh <master-spark-URL>
# Example: the <master-spark-URL> can be found on the web UI at localhost:8080
./sbin/start-slave.sh spark://localhost:7077

2.5. Deploying Spark on Mesos

Reference: http://spark.apache.org/docs/latest/running-on-mesos.html

Installing Mesos
Reference: https://open.mesosphere.com/downloads/mesos/

# Download the rpm package matching your system
wget http://repos.mesosphere.com/el/6/x86_64/RPMS/mesos-1.7.0-2.0.1.el6.x86_64.rpm
rpm -ivh mesos-1.7.0-2.0.1.el6.x86_64.rpm

Command to start the master (which serves the web UI):

mesos master --ip=localhost  --work_dir=/var/lib/mesos

Web UI address:
http://localhost:5050/#/

2.6. Deploying Spark on YARN

Reference: http://spark.apache.org/docs/latest/running-on-yarn.html

Command syntax:
./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
Example:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
examples/jars/spark-examples*.jar \
10

2.6.1. Issues Encountered

Problem

# Command executed
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
examples/jars/spark-examples*.jar \
10
# Error output
2018-12-07 16:19:07 INFO Client:871 - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)

Solution
Add the following to yarn-site.xml:

<property>
  <name>yarn.resourcemanager.address</name>
  <value>master:8032</value>
</property>
<property>
  <name>yarn.resourcemanager.scheduler.address</name>
  <value>master:8030</value>
</property>
<property>
  <name>yarn.resourcemanager.resource-tracker.address</name>
  <value>master:8031</value>
</property>

2.7. Deploying Spark on Kubernetes

Spark on Kubernetes: https://spark.apache.org/docs/latest/running-on-kubernetes.html

Kubernetes official site: https://kubernetes.io/

TODO

3. Data Processing with Spark

3.1. Using Spark RDDs

RDD: Resilient Distributed Dataset

An RDD is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create an RDD: parallelize an existing collection in the driver program, or reference a dataset in an external storage system.

Official RDD programming guide:
http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

Start Spark:
bin/pyspark

Initialize Spark

>>> from pyspark import SparkContext, SparkConf
>>> appName="fdm"
>>> master="mesos://localhost:5050"
>>> conf = SparkConf().setAppName(appName).setMaster(master)
>>> sc = SparkContext(conf=conf)
>>> sc
<SparkContext master=local[*] appName=PySparkShell>

Parallelized collections

>>> data = [1, 2, 3, 4, 5]
>>> distData = sc.parallelize(data)
>>> print distData
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

External datasets
Local files, hdfs:// paths, and other storage systems are supported.

>>> distFile = sc.textFile("data.txt")
>>> distFile
data.txt MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:0

Writable types (SequenceFiles):

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
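
To round out the RDD basics above, here is a minimal PySpark sketch of the usual transformation/action pattern (map, filter, reduce); the numbers and lambdas are illustrative only and are not part of the original examples.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()            # reuse the shell's context if one already exists

data = sc.parallelize([1, 2, 3, 4, 5])     # parallelized collection, as above
squares = data.map(lambda x: x * x)        # transformation: lazily defines a new RDD
evens = squares.filter(lambda x: x % 2 == 0)

print(evens.collect())                     # action: returns [4, 16] to the driver
print(squares.reduce(lambda a, b: a + b))  # action: 55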

3.2. Using Spark DataFrames

Official DataFrame/SQL guide:
http://spark.apache.org/docs/latest/sql-programming-guide.html

Initialize a SparkSession

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

Create a DataFrame

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
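
After a DataFrame has been created, the typical next steps are column selection, filtering, and aggregation. A minimal sketch continuing the people.json example from the official guide (the age/name columns are the ones shown in the output above):

# Untyped operations on the df created above
df.printSchema()                             # print column names and types
df.select(df["name"], df["age"] + 1).show()  # project columns, with an expression
df.filter(df["age"] > 21).show()             # keep only rows matching a predicate
df.groupBy("age").count().show()             # aggregate by column value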

3.3. Using Spark SQL

Official reference:
http://spark.apache.org/docs/latest/sql-distributed-sql-engine.html#running-the-thrift-jdbcodbc-server

Start the Thrift JDBC/ODBC server

./sbin/start-thriftserver.sh

Open the web UI

http://localhost:4042/SQL/

Test the Thrift JDBC/ODBC server with beeline:

./bin/beeline
beeline> !connect jdbc:hive2://localhost:10000
# Enter your username and a blank password

Start spark-sql

./bin/spark-sql
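
Besides the Thrift server and the spark-sql shell, SQL can also be run directly from PySpark by registering a DataFrame as a temporary view. A minimal sketch, reusing the df and spark objects from the people.json example above:

# Register the DataFrame as a SQL temporary view and query it
df.createOrReplaceTempView("people")

sql_df = spark.sql("SELECT name, age FROM people WHERE age IS NOT NULL")
sql_df.show()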

Spark Troubleshooting

Service ‘SparkUI’ could not bind on port 4040. Attempting port 4041.

Problem: running a Spark script produces the warning at the following line


self.spark = SparkSession.builder.master(self.MASTER).appName(self.APPNAME).getOrCreate()

Cause:
When a Spark context starts, the Spark UI binds to port 4040 by default; if 4040 is already taken, it tries the next port.

Fix:
Close the other Spark shell (or application) that is holding the port.

Error log:


2018-12-27 09:38:59 WARN Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
I1227 09:39:02.612689 26652 sched.cpp:232] Version: 1.7.0
I1227 09:39:02.619974 26650 sched.cpp:336] New master detected at master@192.168.172.70:5050
I1227 09:39:02.620997 26650 sched.cpp:356] No credentials provided. Attempting to register without authentication
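
If closing the other shell is not practical, the UI port can also be set explicitly (or the UI disabled) when the session is built. A minimal PySpark sketch, assuming port 4050 is free on the machine:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("ui-port-example")
         .config("spark.ui.port", "4050")        # bind the Spark UI to a known free port
         # .config("spark.ui.enabled", "false")  # or switch the UI off entirely
         .getOrCreate())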

Spark Operational Notes

Using Mesos

Mesos installation & configuration

Official Mesos deployment documentation

Starting & stopping Mesos

cd /usr/local/spark
./bin/spark-shell --master mesos://192.168.172.70:5050

/etc/mesos-master
/etc/mesos-slave

# Stop mesos-master
[root@WOM mesos-master]# netstat -lntp | grep 5050
[root@WOM mesos-master]# kill -9 XXXX
# Start mesos-master
mesos-master --work_dir=/usr/local/mesos/master_data --log_dir=/usr/local/mesos/master_logs --no-hostname_lookup --ip=192.168.172.70 --cluster=wom
# Start mesos-slave
mesos-slave --work_dir=/usr/local/mesos/salves_data --log_dir=/usr/local/mesos/salves_logs --master=192.168.172.70:5050 --no-hostname_lookup --ip=192.168.172.70 --port=5052
# Start Spark
./sbin/start-master.sh -h localhost --webui-port 8080
(env) [scfan@WOM spark]$ bin/spark-shell --master mesos://192.168.172.70:5050 --total-executor-cores 1 --driver-memory 512M --executor-memory 512M

Deleting the Mesos work directory

If you need a fresh Mesos cluster, the master needs a clean work directory; clearing every work_dir got rid of the problem.

How to clean the work directory:

Find the mesos-master work directory:

$ cat /etc/mesos-master/work_dir
/var/lib/mesos

Remove it:

$ rm -rf /var/lib/mesos

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

The cluster does not currently have enough free resources to satisfy what the application has requested.

Resources come in two kinds: cores and RAM.
Cores represent the executor slots available for execution.
RAM represents the free memory each worker needs in order to run your application.
Fixes:
Do not request more than the resources that are actually free.
Shut down applications that have already finished running.

Further fixes:

  1. Reduce the memory sizes passed as submit parameters.
  2. Free up memory, or add more memory.

export SPARK_WORKER_MEMORY=512M
export SPARK_DAEMON_MEMORY=256M

Try specifying smaller values for --executor-memory and --driver-memory first (for example 50M or 100M); a per-application example is sketched below.

1. The submitting node may be unable to talk to the worker nodes: after submission a process starts on the submitting node (usually on port 4044) to display job progress, and the workers must report progress back to that port, so this fails if the hostname or IP is configured incorrectly in the hosts file. Check that the hostnames and IPs are configured correctly.

2. It can also be caused by insufficient memory; adjust the memory settings as needed. Also check the web UI to make sure the worker nodes are in the alive state.
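
The same limits can also be set per application rather than through environment variables. A minimal PySpark sketch; the master URL and the 512m/1-core values mirror the suggestions above and are examples only:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("spark://localhost:7077")     # standalone master, as deployed in section 2.4
        .setAppName("small-footprint-job")
        .set("spark.executor.memory", "512m")    # keep executors within the workers' free memory
        .set("spark.cores.max", "1"))            # cap the total cores requested
# Note: driver memory must be set before the driver JVM starts,
# e.g. with --driver-memory on spark-submit, not from inside the program.
sc = SparkContext(conf=conf)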

Spark

tags: 202101, big data

Common Commands

# Inspect the partition directory
hdfs://offline-cluster/user/hive/warehouse/db_dwd_test.db/dwd_ei_basic_tsc_tax_illegal_ds/p_date=20200916/part-00109,
# After deleting the data under a partition directory, the partition itself must also be dropped
alter table db_dwd_test.dwd_ei_basic_tsc_tax_illegal_ds drop partition (p_date='20200917');

# Start an interactive debugging session
~/spark/bin/pyspark --master ${master_ip} --total-executor-cores 5
# Submit a job
~/spark/bin/spark-sql --master {master_ip} --driver-memory 1g --executor-memory 1g --executor-cores 1 --total-executor-cores 2 xx.py

Issue Log

Too many partitions

With "a union all b", the resulting partition count becomes the sum of a's partitions and b's partitions.

20/09/18 14:12:13 INFO TaskSetManager: Finished task 17432.0 in stage 3.0 (TID 17437) in 503 ms on 192.168.201.8 (executor 1) (17435/17436)
20/09/18 14:12:13 INFO TaskSetManager: Finished task 17426.0 in stage 3.0 (TID 17431) in 1312 ms on 192.168.207.96 (executor 0) (17436/17436)
20/09/18 14:12:13 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool

spark-submit reports a "No module" error

Problem: when the job uses external dependency packages, it fails with "No module named xxxx".

Possible cause: https://segmentfault.com/q/1010000004569365

Error: No module named tools.utils

[An error occurred while calling o35.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 4 times, most recent failure: Lost task 5.3 in stage 0.0 (TID 43, 192.168.201.40, executor 4): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/dfs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 159, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
File "/home/dfs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 93, in read_udfs
arg_offsets, udf = read_single_udf(pickleSer, infile)
File "/home/dfs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 79, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/home/dfs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/home/dfs/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/home/dfs/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
return pickle.loads(obj)
File "./scripts.zip/scripts/zhengfujigou.py", line 19, in <module>
ImportError: No module named tools.utils

Fix step 1: pass the archives in a single --py-files option

    bash /home/scrapyer/spark/bin/spark-submit --master ${spark_ip} \
--executor-memory 1g \
--total-executor-cores 20 \
--py-files /home/scrapyer/workspace/fansichao/workspace/${frame_dir}/conf.zip \
--py-files /home/scrapyer/workspace/fansichao/workspace/${frame_dir}/tools.zip \
--py-files /home/scrapyer/workspace/fansichao/workspace/${frame_dir}/scripts.zip \
${code_path} ${conf_path} ${p_date} > logs/${app_name}_${p_date}.log 2>&1

Change it to:

bash /home/scrapyer/spark/bin/spark-submit --master ${spark_ip} \
--executor-memory 1g \
--total-executor-cores 5 \
--py-files conf.zip,tools.zip,scripts.zip \
${code_path} ${conf_path} ${p_date}
# > logs/${app_name}_${p_date}.log 2>&1

Use a single --py-files option with a comma-separated list, not multiple --py-files options!

With multiple --py-files options, jobs on the Thanos platform cannot actually be killed.
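
An alternative to --py-files is to ship the archives from inside the driver program with SparkContext.addPyFile. A minimal sketch; conf.zip, tools.zip, and scripts.zip are the archives from the command above and must be reachable from the driver:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Distribute each dependency archive to the executors; roughly the
# equivalent of a single comma-separated --py-files list.
for archive in ("conf.zip", "tools.zip", "scripts.zip"):
    sc.addPyFile(archive)

# After this, imports such as `from tools.utils import ...` also resolve on executors.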

Resources

Spark official documentation

Technical notes

Tips

VS Code: set the line-length ruler via editor.rulers (default 79; 160 recommended).

Three ways to submit Spark jobs:
https://www.cnblogs.com/itboys/p/9998666.html

References

Recommended blogs

Spark technical knowledge

Spark terminology

Differences between Spark partitioning methods

Spark learning index

https://blog.csdn.net/a544258023/article/details/94635807

Common commands

Spark repartitioning

https://blog.csdn.net/u010720408/article/details/90229461

# Inspect the partition directory
hdfs://offline-cluster/user/hive/warehouse/db_dwd_test.db/dwd_ei_basic_tsc_tax_illegal_ds/p_date=20200916/part-00109,

# Run in Hive: after deleting the data under a partition directory, the partition itself must also be dropped
alter table db_dwd_test.dwd_ei_basic_tsc_tax_illegal_ds drop partition (p_date='20200917');

Using multiple py-files

Reference: https://www.jianshu.com/p/92be93cfbb97

Hive data types: https://www.cnblogs.com/dangjf/p/10071683.html

Spark data type conversion: https://blog.csdn.net/an1090239782/article/details/102541024

ByteType: a 1-byte integer; range -128 to 127.
ShortType: a 2-byte integer; range -32768 to 32767.
IntegerType: a 4-byte integer; range -2147483648 to 2147483647.
LongType: an 8-byte integer; range -9223372036854775808 to 9223372036854775807.
FloatType: a 4-byte single-precision float. DoubleType: an 8-byte double-precision float.
DecimalType: arbitrary-precision decimal numbers, backed by java.math.BigDecimal (an arbitrary-precision unscaled integer value plus a 32-bit integer scale).
StringType: a string value.
BinaryType: a byte-sequence value.
BooleanType: a boolean value.
Datetime types:

TimestampType: a value with year, month, day, hour, minute, and second fields.
DateType: a value with year, month, and day fields.
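
A short PySpark sketch of converting between these types with cast; the DataFrame and column names are hypothetical and only illustrate the mechanism:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("1", "2020-09-17")], ["amount", "p_date"])

converted = (df
             .withColumn("amount_int", F.col("amount").cast(IntegerType()))  # StringType -> IntegerType
             .withColumn("p_date_dt", F.col("p_date").cast("date")))         # StringType -> DateType
converted.printSchema()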

Spark Issue Log

Too many TaskSetManager tasks / too many partitions

# Background
sql = """insert overwrite table {dwd_table} partition(p_date='{p_date}')
{dwd_table} union {ods_table}"""

# Symptom
When this SQL runs, the TaskSetManager schedules far too many tasks and produces far too many part files.

# Fix
1. Set a fixed number of partitions when reading from HDFS, i.e. configure the partition count when data enters the ODS layer:
   rd = spark_tools.spark_context.textFile(hdfs_path).repartition(200)
2. Change how the SQL is executed: save the result as a DataFrame first, repartition it with df.repartition(200, "p_date"), and only then write it out.

# Other notes
Dropping the union and using "insert overwrite table {dwd_table}" together with "insert into table {dwd_table}"
produces *_copy_* partition files, so that approach does not apply here:

db_dwd_test.db/dwd_ei_basic_tsc_tax_illegal_ds/p_date=20200917/part-00199
db_dwd_test.db/dwd_ei_basic_tsc_tax_illegal_ds/p_date=20200917/part-00199_copy_1
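
A minimal PySpark sketch of fix 2, repartitioning the combined result before writing it back to the partitioned table; the ODS table name is hypothetical, and 200 is only an example target partition count:

# Build the combined result as a DataFrame, fix its partition count, then write it out
combined = spark.sql("SELECT * FROM db_ods_test.some_ods_table")  # hypothetical ODS source table

(combined
 .repartition(200, "p_date")   # cap the number of part files written per partition
 .write
 .mode("overwrite")
 .insertInto("db_dwd_test.dwd_ei_basic_tsc_tax_illegal_ds"))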

Detailed log

20/09/18 14:12:13 INFO TaskSetManager: Finished task 17432.0 in stage 3.0 (TID 17437) in 503 ms on 192.168.201.8 (executor 1) (17435/17436)
20/09/18 14:12:13 INFO TaskSetManager: Finished task 17426.0 in stage 3.0 (TID 17431) in 1312 ms on 192.168.207.96 (executor 0) (17436/17436)
20/09/18 14:12:13 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
20/09/18 14:12:13 INFO DAGScheduler: ResultStage 3 (sql at NativeMethodAccessorImpl.java:0) finished in 1248.035 s
20/09/18 14:12:13 INFO DAGScheduler: Job 2 finished: sql at NativeMethodAccessorImpl.java:0, took 1250.992204 s

-06729, dest: hdfs://offline-cluster/user/hive/warehouse/db_dwd_test.db/dwd_ei_basic_tsc_tax_illegal_ds/p_date=20200917/part-06729, Status:true
20/09/18 14:20:15 INFO Hive: Renaming src: hdfs://offline-cluster/user/hive/warehouse/db_dwd_test.db/dwd_ei_basic_tsc_tax_illegal_ds/.hive-staging_hive_2020-09-18_13-51-12_491_7780753504198483023-1/-ext-10000/part-06730, dest: hdfs://offline-cluster/user/hive/warehouse/db_dwd_test.db/dwd_ei_basic_tsc_tax_illegal_ds/p_date=20200917/part-06730, Status:true
20/09/18 14:20:15 INFO Hive: Renaming src: hdfs://offline-cluster/user/hive/warehouse/db_dwd_test.db/dwd_ei_basic_tsc_tax_illegal_ds/.hive-staging_hive_2020-09-18_13-51-12_491_7780753504198483023-1/-ext-10000/part-06731, dest: hdfs://offline-cluster/user/hive/warehouse/db_dwd_test.db/dwd_ei_basic_tsc_tax_illegal_ds/p_date=20200917/part-06731, Status:true