spark通过kafka-appender指定日志输出到kafka引发的死锁问题

在采用log4j的kafka-appender收集spark任务运行日志时,发现提交到yarn上的任务始终ACCEPTED状态,无法进入RUNNING状态,并且会重试两次后超时。期初认为是yarn资源不足导致,但在确认yarn资源充裕的时候问题依旧,而且基本上能稳定复现。

起初是这么配置spark日志输出到kafka的:

log4j.rootCategory=INFO, console, kafka
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m%n

# Kafka appender
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Set Kafka topic and brokerList
log4j.appender.kafka.topic=yarn_spark_log
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=10
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m

这里用org.apache.kafka.log4jappender.KafkaLog4jAppender默认将所有日志都输出到kafka,这个appender已经被kafka官方维护,稳定性应该是可以保障的。

问题定位

发现问题后,尝试将输出到kafka的规则去掉,问题解除!于是把问题定位到跟日志输出到kafka有关。通过其他测试,证实目标kafka其实是正常的,这就非常奇怪了。

查看yarn的ResourceManager日志,发现有如下超时

2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.util.AbstractLivelinessMonitor: Expired:appattempt_1578970174552_3204_000002 Timed out after 600 secs
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: Updating application attempt appattempt_1578970174552_3204_000002 with final
 state: FAILED, and exit status: -1000
2020-05-07 21:49:48,231 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1578970174552_3204_000002 State change from LAUNCHED to FINAL_SAV
ING on event = EXPIRE

表明,yarn本身是接收任务的,但是发现任务迟迟没有启动。在spark的场景下其实是指只有driver启动了,但是没有启动executor。
而查看driver日志,发现日志输出到一个地方就卡住了,不往下继续了。通过对比成功运行和卡住的情况发现,日志卡在这条上:

2020/05/07 19:37:10.324 INFO SecurityManager: Changing view acls to: yarn,root
2020/05/07 19:37:10.344 INFO Metadata: Cluster ID: 6iG6WHA2SoK7FfgGgWHt_A

卡住的情况下,只会打出SecurityManager这行,而无法打出Metadata这行。
猜想Metadata这行是kafka-client本身打出来的,因为整个上下文只有yarn, spark, kafka-client可能会打出这个日志。

在kafka-client 2.2.0版本中找到这个日志是输出位置:

public synchronized void update(MetadataResponse metadataResponse, long now) {
  ...

  String newClusterId = cache.cluster().clusterResource().clusterId();
  if (!Objects.equals(previousClusterId, newClusterId)) {
    log.info("Cluster ID: {}", newClusterId);
  }
  ...
}

看到synchronized,高度怀疑死锁。于是考虑用jstack分析:

在yarn上运行spark任务的时候,driver进程叫ApplicationMaster,executor进程叫CoarseGrainedExecutorBackend。这里首先尝试再复现过程中找到drvier最终在哪个节点上运行,然后快速使用jstack -F <pid>打印堆栈

jstack果然不负众望,报告了死锁!这里我把结果贴的全一点

[root@node1 ~]# jstack 20136
20136: Unable to open socket file: target process not responding or HotSpot VM not loaded
The -F option can be used when the target process is not responding
[root@node1 ~]# jstack -F 20136
Attaching to process ID 20136, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.231-b11
Deadlock Detection:

Found one Java-level deadlock:
=============================

"kafka-producer-network-thread | producer-1":
 waiting to lock Monitor@0x00000000025fcc48 (Object@0x00000000ed680b60, a org/apache/kafka/log4jappender/KafkaLog4jAppender),
 which is held by "main"
"main":
 waiting to lock Monitor@0x00007fec9dbde038 (Object@0x00000000ee44de38, a org/apache/kafka/clients/Metadata),
 which is held by "kafka-producer-network-thread | producer-1"

Found a total of 1 deadlock.

Thread 20157: (state = BLOCKED)
 - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=0, line=231 (Interpreted frame)
 - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
 - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String, java.lang.Object) @bci=34, line=324 (Interpreted frame)
 - org.apache.kafka.clients.Metadata.update(org.apache.kafka.common.requests.MetadataResponse, long) @bci=317, line=365 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(org.apache.kafka.common.requests.RequestHeader, long, org.apache.kafka.common.requests.MetadataResponse) @bci=184, line=1031 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient.handleCompletedReceives(java.util.List, long) @bci=215, line=822 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient.poll(long, long) @bci=132, line=544 (Interpreted frame)
 - org.apache.kafka.clients.producer.internals.Sender.run(long) @bci=227, line=311 (Interpreted frame)
 - org.apache.kafka.clients.producer.internals.Sender.run() @bci=28, line=235 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)

Thread 20150: (state = BLOCKED)

Thread 20149: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove(long) @bci=59, line=144 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove() @bci=2, line=165 (Interpreted frame)
 - java.lang.ref.Finalizer$FinalizerThread.run() @bci=36, line=216 (Interpreted frame)

Thread 20148: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=502 (Interpreted frame)
 - java.lang.ref.Reference.tryHandlePending(boolean) @bci=54, line=191 (Interpreted frame)
 - java.lang.ref.Reference$ReferenceHandler.run() @bci=1, line=153 (Interpreted frame)

Thread 20137: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - org.apache.kafka.clients.Metadata.awaitUpdate(int, long) @bci=63, line=261 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(java.lang.String, java.lang.Integer, long) @bci=160, line=983 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=19, line=860 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=12, line=840 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord) @bci=3, line=727 (Interpreted frame)
 - org.apache.kafka.log4jappender.KafkaLog4jAppender.append(org.apache.log4j.spi.LoggingEvent) @bci=69, line=283 (Interpreted frame)
 - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=106, line=251 (Interpreted frame)
 - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
 - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String) @bci=12, line=305 (Interpreted frame)
 - org.apache.spark.internal.Logging$class.logInfo(org.apache.spark.internal.Logging, scala.Function0) @bci=29, line=54 (Interpreted frame)
 - org.apache.spark.SecurityManager.logInfo(scala.Function0) @bci=2, line=44 (Interpreted frame)
 - org.apache.spark.SecurityManager.setViewAcls(scala.collection.immutable.Set, java.lang.String) @bci=36, line=139 (Interpreted frame)
 - org.apache.spark.SecurityManager.<init>(org.apache.spark.SparkConf, scala.Option) @bci=158, line=81 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster.<init>(org.apache.spark.deploy.yarn.ApplicationMasterArguments) @bci=85, line=70 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster$.main(java.lang.String[]) @bci=25, line=802 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster.main(java.lang.String[]) @bci=4 (Interpreted frame)

到这里,已经确定是死锁,导致driver一开始就运行停滞,那么当然无法提交executor执行。
具体的死锁稍后分析,先考虑如何解决。从感性认识看,似乎只要不让kafka-client的日志也输出到kafka即可。实验后,发现果然如此:如果只输出org.apache.spark的日志就可以正常执行。

根因分析

从stack的结果看,造成死锁的是如下两个线程:

  • kafka-client内部的网络线程spark
  • 主入口线程

两个线程其实都是卡在打日志上了,观察堆栈可以发现,两个线程同时持有了同一个log对象。而这个log对象实际上是kafka-appender。而kafka-appender本质上持有kafka-client,及其内部的Metadata对象。log4j的doAppend为了保证线程安全也用synchronized修饰了:

public
 synchronized
 void doAppend(LoggingEvent event) {
  if(closed) {
   LogLog.error("Attempted to append to closed appender named ["+name+"].");
   return;
  }

  if(!isAsSevereAsThreshold(event.level)) {
   return;
  }

  Filter f = this.headFilter;

  FILTER_LOOP:
  while(f != null) {
   switch(f.decide(event)) {
   case Filter.DENY: return;
   case Filter.ACCEPT: break FILTER_LOOP;
   case Filter.NEUTRAL: f = f.next;
   }
  }

  this.append(event);
 }

于是事情开始了:

  • main线程尝试打日志,首先进入了synchronized的doAppend,即获取了kafka-appender的锁
  • kafka-appender内部需要调用kafka-client发送日志到kafka,最终调用到Thread 20137展示的,运行到Metadata.awaitUpdate(也是个synchronized方法),内部的wait会尝试获取metadata的锁。(详见https://github.com/apache/kaf...)
  • 但此时,kafka-producer-network-thread线程刚好进入了上文提到的打Cluster ID这个日志的这个阶段(update方法也是synchronized的),也就是说kafka-producer-network-thread线程获得了metadata对象的锁
  • kafka-producer-network-thread线程要打印日志同样执行synchronized的doAppend,即获取了kafka-appender的锁

上图main-thread持有了log对象锁,要求获取metadata对象锁;kafka-producer-network-thread持有了metadata对象锁,要求获取log对象锁于是造成了死锁。

总结

到此这篇关于spark通过kafka-appender指定日志输出到kafka引发的死锁的文章就介绍到这了,更多相关spark指定日志输出内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详解如何使用Spark和Scala分析Apache访问日志

    安装 首先需要安装好Java和Scala,然后下载Spark安装,确保PATH 和JAVA_HOME 已经设置,然后需要使用Scala的SBT构建Spark如下: $ sbt/sbt assembly 构建时间比较长.构建完成后,通过运行下面命令确证安装成功: $ ./bin/spark-shell scala> val textFile = sc.textFile("README.md") // 创建一个指向 README.md 引用 scala> textFile.co

  • spark通过kafka-appender指定日志输出到kafka引发的死锁问题

    在采用log4j的kafka-appender收集spark任务运行日志时,发现提交到yarn上的任务始终ACCEPTED状态,无法进入RUNNING状态,并且会重试两次后超时.期初认为是yarn资源不足导致,但在确认yarn资源充裕的时候问题依旧,而且基本上能稳定复现. 起初是这么配置spark日志输出到kafka的: log4j.rootCategory=INFO, console, kafka log4j.appender.console=org.apache.log4j.ConsoleA

  • Golang如何将日志以Json格式输出到Kafka

    目录 格式化接口 普通文本格式化器 Json文本格式化器 写日志接口 写日志到文件 写日志到Kafka 接口的组装 如何提高日志处理的吞吐量 在上一篇文章中我实现了一个支持Debug.Info.Error等多个级别的日志库,并将日志写到了磁盘文件中,代码比较简单,适合练手.有兴趣的可以通过这个链接前往:https://github.com/bosima/ylog/releases/tag/v1.0.1 工程实践中,我们往往还需要对日志进行采集,将日志归集到一起,然后用于各种处理分析,比如生产环境

  • 使用Log4j为项目配置日志输出应用详解以及示例演示的实现分析

    Log4j组件构成  Log4j由三个重要的组件构成: 1.日志信息的优先级(Logger) 2.日志信息的输出目的地(Appender) 3.日志信息的输出格式(Layout). 概要: 日志信息的优先级从高到低有ERROR.WARN. INFO.DEBUG,分别用来指定这条日志信息的重要程度: 日志信息的输出目的地指定了日志将打印到控制台还是文件中: 而输出格式则控制了日志信息的显示内容. Log4j介绍 Log4j是 Apache的一个开放源代码项目,通过使用Log4j,我们可以控制日志信

  • springboot使用Logback把日志输出到控制台或输出到文件

    一:日志: 1.配置日志级别 日志记录器(Logger)的行为是分等级的.如下表所示: 分为:OFF.FATAL.ERROR.WARN.INFO.DEBUG.ALL 默认情况下,spring boot从控制台打印出来的日志级别只有INFO及以上级别,可以配置日志级别 设置日志级别 logging.level.root=WARN 这种方式只能将日志打印在控制台上 二.Logback日志 spring boot内部使用Logback作为日志实现的框架. Logback和log4j非常相似,如果你对l

  • logback过滤部分日志输出的操作

    目录 logback过滤部分日志输出 场景 日志过滤 Logback 自定义灵活的日志过滤规则 下面是一个只记录日志级别为ERROR的例子 下面举一个简单的例子 logback过滤部分日志输出 场景 使用监控异常日志进行告警时,部分异常日志可能只是不需要告警,但无法通过编码去除时,可以通过不输出这类异常日志达到忽略告警的目的. 比如在系统中经常会出现断开的管道的相关问题,异常如下 org.apache.catalina.connector.ClientAbortException: java.i

  • Java Fluent Mybatis 分页查询与sql日志输出详解流程篇

    目录 前言 准备数据 Sql日志配置 官方分页查询 PageHelper分页查询 总结 前言 接着我上一章:Java Fluent Mybatis 项目工程化与常规操作详解流程篇 下 上一章我把项目做了一部分工程化包装,主要还是想要之后的调试能够方便一些.那么这一章接着上一章的内容,做一下查询分页,并且将每次请求所调用的sql语句写在日志里面,便于我们观察定位问题.代码之后还是会上传到github. GitHub代码仓库地址:GitHub仓库 准备数据 简单的准备了一些数据. Sql日志配置 之

  • Springboot异常日志输出方式

    目录 lombok插件使用 统一异常处理 统一日志输出 配置日志级别↓ Logback日志↓ 配置logback日志↓ 安装idea彩色日志插件:grep-console 复制粘贴即可 lombok插件使用 引入依赖,在项目中使用Lombok可以减少很多重复代码的书写.比如说getter/setter/toString等方法的编写 ↓ <!--lombok用来简化实体类--> <dependency> <groupId>org.projectlombok</gro

  • C# log4net 日志输出的实现示例

    目录 第一步:安装log4net 第二步:添加log4net.config配置文件 第三步:添加日志配置 第四步:AssemblyInfo.cs中配置 Watch = true 思路: 1.安装插件:安装log4net 2.使用配置:添加log4net.config配置文件 3.输出日志文件格式:添加日志配置 4.AssemblyInfo.cs中配置 第一步:安装log4net 第二步:添加log4net.config配置文件 新增log4net.config配置文件,内容如下,与Program

  • Python向日志输出中添加上下文信息

    除了传递给日志记录函数的参数(如msg)外,有时候我们还想在日志输出中包含一些额外的上下文信息.比如,在一个网络应用中,可能希望在日志中记录客户端的特定信息,如:远程客户端的IP地址和用户名.这里我们来介绍以下几种实现方式: 通过向日志记录函数传递一个extra参数引入上下文信息 使用LoggerAdapters引入上下文信息 使用Filters引入上下文信息 一.通过向日志记录函数传递一个extra参数引入上下文信息 前面我们提到过,可以通过向日志记录函数传递一个extra参数来实现向日志输出

  • 利用Log4j将不同Package的日志输出到不同文件的方法

    前言 随着项目规模的越来越大,会不断的引入新的模块,不同的模块都会打印自己的日志,最后就造成日志根本没法查看,比如我自己的项目中,就存在以下这些日志: 接收外界消息的日志.对外发送消息的日志: 后台常驻线程的处理日志: 外部接口访问的参数.返回结果等接口日志: Service访问数据库产生的SQL日志: 这其中,消息日志和后台线程的日志数据量非常庞大,如果所有日志打印在一个文件中,使用tail -f log.log文件,会发现日志在快速的滚动,根本无法查看甚至定位某一个具体的SQL或者Servi

随机推荐