69

log4j异步那些事(1)--AsyncAppender

 5 years ago
source link: https://bryantchang.github.io/2018/11/18/log4j-async/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

简介Logger和Appender的异步化配置和基本原理

前面的博客里,我简单介绍过了Log4j2的简单配置和其中基本组件LogManager,LoggerContext以及Configuration的机制和流程。而还有两个关键的组件Logger和Appender,他们是对msg做真正处理的关键组件,在众多类型的Logger和Appender中,我主要想把目光集中在其中比较特定的,异步的Logger和Appender。如果大家平时只是简单使用Log4j,可能对异步的Logger和Appender有些分不清,或者并未配置过这类的异步化组件。我将用两篇文章分别介绍异步Appender和异步Logger。

Logger和Appender的关系

前面的博客中,我介绍过,记录日志的代码,这里为了讲述方便,我重新把代码放上来

Logger logger = LogManager.getLogger(loggerName);
logger.info(msg);

这篇文章主要聚焦于异步性的实现,所以对于获取logger和调用info操作的通用流程我直接用两张流程图给出:

eqIZBv2.png!webyQveyuu.png!web

了解了通用流程后,我将分别介绍异步Appender和异步Logger的原理

异步Appender AsyncAppender

配置

我们以官方文档的两个配置为例来介绍异步Appender的配置,配置文件如下

<Configuration status="warn" name="MyApp" packages="">
  <Appenders>
    <File name="MyFile" fileName="logs/app.log">
      <PatternLayout>
        <Pattern>%d %p %c{1.} [%t] %m%n</Pattern>
      </PatternLayout>
    </File>
    <Async name="Async">
      <AppenderRef ref="MyFile"/>
    </Async>
  </Appenders>
  <Loggers>
    <Root level="error">
      <AppenderRef ref="Async"/>
    </Root>
  </Loggers>
</Configuration>
<Configuration name="LinkedTransferQueueExample">
  <Appenders>
    <List name="List"/>
    <Async name="Async" bufferSize="262144">
      <AppenderRef ref="List"/>
      <LinkedTransferQueue/>
    </Async>
  </Appenders>
  <Loggers>
    <Root>
      <AppenderRef ref="Async"/>
    </Root>
  </Loggers>
</Configuration>

配置中配置了两个Appender,一个为FileAppender另一个则是异步Appender,使用 <Async> 标签进行声明,在AsyncAppender中引用了FileAppender,那么存在疑问,一般的Appender都会有一个明确的输出位置,而对于这个异步Appender都需要引用一个其他的Appender才将msg最终输出。下面的介绍将会彻底解开这个疑问。

架构

下图展示了AsyncAppender的架构

YfQniyQ.png!web

AsyncAppender的核心部件是一个阻塞队列,logger将数据通过append方法放入到阻塞队列中,随后后台线程从队列中取出数据然后进行后续的操作

机制

上文简单介绍了架构,下面从源码角度来详细阐述AsyncAppender的流程

成员变量

public final class AsyncAppender extends AbstractAppender {

    private static final int DEFAULT_QUEUE_SIZE = 128;
    private static final LogEvent SHUTDOWN = new AbstractLogEvent() {
    };

    private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);

    private final BlockingQueue<LogEvent> queue;
    private final int queueSize;
    private final boolean blocking;
    private final long shutdownTimeout;
    private final Configuration config;
    private final AppenderRef[] appenderRefs;
    private final String errorRef;
    private final boolean includeLocation;
    private AppenderControl errorAppender;
    private AsyncThread thread;
    private AsyncQueueFullPolicy asyncQueueFullPolicy;
}

可以看到其中的一些关键属性有一个阻塞队列queue,一个后台线程thread,一个AppenderRef的数组appenderRefs以及一个关键属性blocking。对于一个任何一个Appender对象,我们都应该关注他的append()而方法,对于一个后台线程,重要的方法则是run方法,对于AsyncAppender,这两个方法刚好对应了AsyncAppender的两个核心步骤,即放入消息以及处理消息,下面将分别说明。

append()方法

废话不多说,线上代码:

@Override
public void append(final LogEvent logEvent) {
    if (!isStarted()) {
        throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
    }
    if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose
        logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters

    }
    final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
    if (!transfer(memento)) {
        if (blocking) {
            // delegate to the event router (which may discard, enqueue and block, or log in current thread)
            final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
            route.logMessage(this, memento);
        } else {
            error("Appender " + getName() + " is unable to write primary appenders. queue is full");
            logToErrorAppenderIfNecessary(false, memento);
        }
    }
}

可以看到这个appende方法流程并不复杂,只有以下两步:

1、创建LogEvent的复制对象memento
2、将event放入队列

虽然只有简单两步,也需要注意一个边界情况,那就是当阻塞队列满时Appender的处理,这里我首先给出流程,然后结合代码进行简要说明。

MfmQNn3.png!web

如流程图所示,首先会判断用户是否设置了blocking选项,如果未选择blocking选项,则Appender直接会将msg放入errorAppender中,如果用户没有配置这些Appender,则会直接丢弃这些消息,如果设置了这个属性,则会按照一定的策略来处理这些消息。策略可以分为3种,他们分别为:

1、Default---等待直到队列有空闲,退化为同步操作
2、Discard---按照日志级别丢弃一部分日志
3、用户自定义(需要实现AsyncQueueFullPolicy接口)

run()方法

当使用append方法将消息放入阻塞队列后,后台的线程将会一步的进行处理,这也就是异步线程的run方法的功能所在,首先简单看其数据结构

private class AsyncThread extends Log4jThread {
    private volatile boolean shutdown = false;
    private final List<AppenderControl> appenders;
    private final BlockingQueue<LogEvent> queue;
}

AsyncThread这是一个内部类,其中包含一个外部Appender类的阻塞队列,还有对应的AsyncAppender所引用的Appender。接下来我们详细看其中的run方法

public void run() {
     while (!shutdown) {
         LogEvent event;
         try {
             event = queue.take();
             if (event == SHUTDOWN) {
                 shutdown = true;
                 continue;
             }
         } catch (final InterruptedException ex) {
             break; // LOG4J2-830
         }
         event.setEndOfBatch(queue.isEmpty());
         final boolean success = callAppenders(event);
         if (!success && errorAppender != null) {
             try {
                 errorAppender.callAppender(event);
             } catch (final Exception ex) {
                 // Silently accept the error.
             }
         }
     }
     // Process any remaining items in the queue.
     LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
         queue.size());
     int count = 0;
     int ignored = 0;
     while (!queue.isEmpty()) {
         try {
             final LogEvent event = queue.take();
             if (event instanceof Log4jLogEvent) {
                 final Log4jLogEvent logEvent = (Log4jLogEvent) event;
                 logEvent.setEndOfBatch(queue.isEmpty());
                 callAppenders(logEvent);
                 count++;
             } else {
                 ignored++;
                 LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
             }
         } catch (final InterruptedException ex) {
             // May have been interrupted to shut down.
             // Here we ignore interrupts and try to process all remaining events.
         }
     }
     LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
         + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
}

可以看到,异步线程的逻辑比较简单,该线程会一直尝试从阻塞队列中获取logEvent数据,如果能够成功获取数据,则会调用AppenderRef所引用Appender的append方法,通过这个方法,我们可以看到,实际上,AsyncAppender可以看做一个中转站,其作用仅仅将消息的处理异步化,当消息放入阻塞队列后,info方法就能返回成功,这样能够大幅提高日志记录的吞吐,同时,用户可以自行权衡性能与日志收集质量上进行权衡(设置blocking选项),此外,用户还可以设置不同类型的阻塞队列已到达更好的日志记录吞吐。

再回配置

最后,让我们整体来看AsyncAppender所支持的所有配置项以及其中每个配置项的作用

名称 类型 描述 默认值 AppenderRef String 引用的Appender blocking boolean 是否阻塞等待(这里指队列满后的处理) true shutdownTimeout integer appender关闭时等待的超时时间 0(立刻关闭) bufferSize integer 阻塞队列的最大容量 1024 errorRef String 队列满后如果不阻塞时配置的errorAppender filter Filter 过滤器 name String 名称 ignoreExceptions boolean 用于决定是否需要记录在日志事件处理过程中出现的异常 true BlockingQueueFactory BlockingQueueFactory Buffer的种类(默认ArrayBlockingQueue,能够支持DisruptorBlockingQueue,JCToolsBlockingQueue,LinkedTransferQueue) ArrayBlockingQueueFactory

至此,我结合源码简单介绍了AsyncAppnder的使用配置以及基本原理,在下一篇文章中,我将介绍另一个异步化组件AsyncLogger

谢谢你请我吃糖果


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK