import org.apache.spark.{SparkConf, SparkContext}

object ExitDis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(ExitDis.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // Load the three input files (1 TB, 225 GB, 225 GB)
    val rdd1 = sc.textFile("file1")
    val rdd2 = sc.textFile("file2")
    val rdd3 = sc.textFile("file3")
    rdd1.union(rdd2).union(rdd3)
      .distinct() // deduplicate across all three files (triggers a full shuffle)
      .filter { line =>
        val fields = line.split(",", -1)
        // the yyyyMMdd date embedded at offsets 21-28 of the second field
        val time = fields(1).substring(21, 29).toLong
        time <= 20180704L
      }
      .saveAsTextFile("outputPath")
    sc.stop()
  }
}
The code is shown above.
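For reference, one variant I have been considering but have not validated: distinct() accepts an explicit partition count, and with roughly 1.45 TB of input, more and smaller shuffle partitions should lower the amount of data each task holds at once. A minimal sketch of the changed pipeline (the value 4000 is purely illustrative, not a tested number):

rdd1.union(rdd2).union(rdd3)
  .distinct(numPartitions = 4000) // more, smaller partitions => less data per task during the shuffle
  .filter { line =>
    val fields = line.split(",", -1)
    fields(1).substring(21, 29).toLong <= 20180704L
  }
  .saveAsTextFile("outputPath")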
The submit command is as follows:
spark-submit --class com.xxx.distinct.ExitDis \
  --master yarn --deploy-mode client \
  --num-executors 10 \
  --driver-memory 2g \
  --executor-memory 18g \
  --executor-cores 18 \
  --queue default \
  --conf spark.kryoserializer.buffer.max=2024m \
  xxx
The three files are 1 TB, 225 GB, and 225 GB respectively.
Two of the error messages are:
1.ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container marked as failed: container_e108_1536921006225_2386_02_000007 on host: zzcywhadoop03. Exit status: 143. Diagnostics: Container [pid=40865,containerID=container_e108_1536921006225_2386_02_000007] is running beyond physical memory limits. Current usage: 20.2 GB of 20 GB physical memory used; 23.0 GB of 42 GB virtual memory used. Killing container.
2. ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 20.4 GB of 20 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
As the errors show, the job is exceeding the memory limit, but the 20 GB memory cap cannot be changed.
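If I read the numbers right, YARN sizes each container as --executor-memory plus spark.yarn.executor.memoryOverhead (default max(384 MB, 10% of executor memory), so about 1.8 GB here), which is why 18g of heap plus overhead already lands right at the 20 GB cap. The second error suggests boosting the overhead; keeping the container total at 20 GB, that would look something like the following (the 15g heap / 5 GB overhead split is only an illustrative guess, and the overhead value is given in MB):

spark-submit --class com.xxx.distinct.ExitDis \
  --master yarn --deploy-mode client \
  --num-executors 10 \
  --driver-memory 2g \
  --executor-memory 15g \
  --executor-cores 18 \
  --queue default \
  --conf spark.yarn.executor.memoryOverhead=5120 \
  --conf spark.kryoserializer.buffer.max=2024m \
  xxx

Lowering --executor-cores might also help, since it reduces the number of tasks concurrently sharing one executor's heap, but I have not confirmed either change.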
I have tried many answers found on Baidu, but none of them work. Could someone please take a look and suggest what else I can try?