spark.driver.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/spark/hbi/logs/ -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC -XX:+PrintReferenceGC -XX:+PrintGCApplicationStoppedTime -Xloggc:/home/spark/hbi/logs/gc-%t.log
Spark Thrift Server reuses a large amount of HiveServer2 code.
In HiveServer2's architecture, ThriftCLIService listens on the service port and hands each incoming request to CLIService. CLIService delegates layer by layer until the request reaches OperationManager, which creates a concrete Operation implementation according to the request type. For example, Hive's Operation implementation for executing SQL is SQLOperation.
What Spark Thrift Server does is implement its own CLIService, SparkSQLCLIService, together with SparkSQLSessionManager and SparkSQLOperationManager. It also implements its own SQL-handling Operation, SparkExecuteStatementOperation. As a result, once Spark Thrift Server is running, SQL execution is ultimately handed to SparkExecuteStatementOperation.
In effect, Spark Thrift Server only rewrites the SQL-execution logic; all other request handling reuses HiveServer2's code unchanged. Operations such as creating tables, dropping tables, and creating views all run through Hive's code.
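To make that delegation concrete, here is a minimal sketch; every *Sketch type is an illustrative stand-in, not the real class from org.apache.hive.service.cli or org.apache.spark.sql.hive.thriftserver:
trait Operation { def run(): Unit }

// Hive's default path: SQL statements go to SQLOperation.
class SQLOperationSketch(sql: String) extends Operation {
  override def run(): Unit = println(s"Hive executes: $sql")
}

class OperationManagerSketch {
  def newExecuteStatementOperation(sql: String): Operation = new SQLOperationSketch(sql)
}

// Spark Thrift Server overrides only the factory for SQL statements; every other
// operation type (create table, drop table, create view, ...) stays on the Hive path.
class SparkSQLOperationManagerSketch extends OperationManagerSketch {
  override def newExecuteStatementOperation(sql: String): Operation = new Operation {
    override def run(): Unit = println(s"Spark SQL executes: $sql")
  }
}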
// Decompiled form of the anonymous Runnable created inside SparkExecuteStatementOperation
// (the decompiler renders the anonymous class name as "null").
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation;
import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$;
import scala.Option;
import scala.util.control.NonFatal$;
public final class null implements Runnable {
    // Synthetic reference to the enclosing SparkExecuteStatementOperation (SESO):
    // while this task is pending, the whole SESO instance stays reachable.
    private final SparkExecuteStatementOperation $outer;
    private final ScheduledExecutorService timeoutExecutor$1;

    public null(SparkExecuteStatementOperation $outer, ScheduledExecutorService timeoutExecutor$1) {}

    public void run() {
        try {
            this.$outer.timeoutCancel();
        } catch (Throwable throwable1) {
            Throwable throwable2 = throwable1;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
        } finally {
            this.timeoutExecutor$1.shutdown();
        }
    }
}
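That $outer field is not Spark-specific: a Scala anonymous class that touches any member of its enclosing instance is compiled with a synthetic $outer reference to that instance. A standalone illustration with hypothetical names, unrelated to Spark:
class Outer {
  private val payload = new Array[Byte](16 * 1024 * 1024) // big state, like a SESO

  def makeTask(): Runnable = new Runnable {
    // Referencing `payload` forces the compiler to capture Outer.this as $outer,
    // so the pending task keeps the whole Outer instance (and its payload) reachable.
    override def run(): Unit = println(payload.length)
  }
}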
# The core variable: timeout
// If a timeout value `queryTimeout` is specified by users and it is smaller than
// a global timeout value, we use the user-specified value.
// This code follows the Hive timeout behaviour (See #29933 for details).
private val timeout = {
  val globalTimeout = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT)
  if (globalTimeout > 0 && (queryTimeout <= 0 || globalTimeout < queryTimeout)) {
    globalTimeout
  } else {
    queryTimeout
  }
}
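A quick rehearsal of the selection logic with made-up values; effectiveTimeout is a hypothetical helper that just restates the val above:
def effectiveTimeout(globalTimeout: Long, queryTimeout: Long): Long =
  if (globalTimeout > 0 && (queryTimeout <= 0 || globalTimeout < queryTimeout)) globalTimeout
  else queryTimeout

assert(effectiveTimeout(0L, 60000L) == 60000L) // no global cap configured: client value used as-is
assert(effectiveTimeout(300L, 60000L) == 300L) // smaller global cap beats the client value
assert(effectiveTimeout(300L, 0L) == 300L)     // client sent no timeout: the global cap applies
The first case is the trap analyzed below: with no global cap configured, a client-supplied 60000 passes through unchanged, and its unit is seconds.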
# Intermediate code omitted...
# Inner-class creation logic
if (timeout > 0) {
  val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
  timeoutExecutor.schedule(new Runnable {
    override def run(): Unit = {
      try {
        timeoutCancel()
      } catch {
        case NonFatal(e) =>
          setOperationException(new HiveSQLException(e))
          logError(s"Error cancelling the query after timeout: $timeout seconds")
      } finally {
        timeoutExecutor.shutdown()
      }
    }
  }, timeout, TimeUnit.SECONDS)
}
1. By default (THRIFTSERVER_QUERY_TIMEOUT is 0), timeout equals the queryTimeout sent by the client.
2. When timeout > 0, a single-thread scheduled executor is created and a delayed task is submitted to it; the task runs once timeout elapses and shuts the executor down when it finishes.
a. Note 1: the delayed task is an inner class of SparkExecuteStatementOperation (SESO), so it holds a reference to the SESO instance.
b. Note 2: the unit of timeout is seconds.
3. Root cause: if timeout is very long while each SQL statement finishes quickly, a burst of queries in a short window leaves that many pending delayed tasks, each alive in its scheduler for the full timeout and each indirectly pinning a SESO instance. Those retained SESO objects are what overflow the heap; the sketch below reproduces the pattern.
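A minimal sketch that reproduces the retention pattern outside Spark; all numbers are made up, and the byte array stands in for one SESO and the state it pins:
import java.util.concurrent.{Executors, TimeUnit}

object TimeoutRetentionDemo {
  def main(args: Array[String]): Unit = {
    for (_ <- 1 to 1000) { // a burst of short "queries"
      val payload = new Array[Byte](1024 * 1024) // stands in for one SESO's state
      val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
      timeoutExecutor.schedule(new Runnable {
        // The anonymous class captures `payload` (just as the real task's $outer
        // captures the SESO), keeping it reachable until the task fires.
        override def run(): Unit = {
          println(payload.length)
          timeoutExecutor.shutdown()
        }
      }, 60000, TimeUnit.SECONDS) // 60000 s ≈ 16.7 h, matching the misconfigured client
      // Each "query" finished instantly, yet nothing here cancels its pending task.
    }
    // Roughly 1 GB of payloads plus 1000 idle scheduler threads stay pinned for
    // the next 16.7 hours: the OOM mechanism in miniature.
  }
}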
org.apache.hive.service.cli.CLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map<java.lang.String,java.lang.String>, long)
# arthas watch command: params[3] is the client-supplied queryTimeout (the long parameter above), so the condition fires only when a client actually sends a timeout
watch org.apache.hive.service.cli.CLIService executeStatementAsync '{params}' 'params[3]>0' -x 2
# Connection pool configuration (Druid)
engine.ds.hive.read.type = com.alibaba.druid.pool.DruidDataSource
# Unit: seconds, i.e. 16.7 hours (Druid passes this value when probing connections with "SELECT 1")
engine.ds.hive.read.pool.queryTimeout=60000
# Unit: milliseconds, i.e. 60 seconds (Druid does not use this setting)
engine.ds.hive.read.pool.validationTimeout=60000
# Connection pool configuration (HikariCP)
engine.ds.hive.read.type = com.zaxxer.hikari.HikariDataSource
# Unit: seconds, i.e. 5 minutes
engine.ds.hive.read.pool.queryTimeout=300
# Unit: milliseconds, i.e. 60 seconds (Hikari uses this value when probing connections with "SELECT 1")
engine.ds.hive.read.pool.validationTimeout=60000
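The seconds-versus-milliseconds trap above is rooted in the JDBC API itself: java.sql.Statement.setQueryTimeout is specified in seconds, so forwarding a millisecond-style value of 60000 silently requests a 16.7-hour timeout. A sketch with a placeholder connection URL:
import java.sql.DriverManager

object QueryTimeoutUnits {
  def main(args: Array[String]): Unit = {
    // Placeholder URL; the unit semantics below come from the JDBC spec, not the driver.
    val conn = DriverManager.getConnection("jdbc:hive2://thrift-server:10000/default")
    val stmt = conn.createStatement()
    stmt.setQueryTimeout(60000) // JDBC contract: SECONDS, so this asks for ~16.7 hours
    // With the Hive driver, this is the value the arthas watch above observes as
    // params[3] of CLIService#executeStatementAsync.
    val rs = stmt.executeQuery("SELECT 1")
    while (rs.next()) println(rs.getInt(1))
    conn.close()
  }
}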