23

Asop 之 消息处理机制

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzA3NDcyMTQyNQ%3D%3D&%3Bmid=2649265427&%3Bidx=1&%3Bsn=b6a7d003579529552b9c9626d6a8c946
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

点击蓝字

关注我们

作者简介

amueM3q.png!mobile

葛泽续

2012年2月加入去哪儿网,多次参与 Android 客户端的重构,插件化开发,客户端安全攻防,现负责国际酒店抓取系统架构升级维护。

Android 应用程序是通过消息来驱动的,系统为每一个应用程序维护一个消息队例,应用程序的主线程不断地从这个消息队例中获取消息(Looper),然后对这些消息进行处理(Handler),这样就实现了通过消息来驱动应用程序的执行。先了解一下涉及到的几个概念:

  • Message

消息(Message)代表一个行为(what)或者一串动作(Runnable),每一个消息在加入消息队列时,都有明确的目标(Handler)。

  • MessageQueue

以队列的形式存放消息对象,其内部结构是以链表的形式存储消息。对外提供插入和删除操作。

  • Looper

Looper 是循环的意思,它负责从 MessageQueue 中循环的取出 Message 然后交给目标(Handler)处理。

  • Handler

消息的真正处理者,具备获取消息、发送消息、处理消息、移除消息等功能。

  • ThreadLocal

作用是为了线程隔离,内部实现相当于Map以当前线程为key,存入的值作为 value。

Looper 不断从 MessageQueue 中取出一个 Message,然后交给其对应的 Handler 处理。

vQJfiiB.jpg!mobile

我们平时接触到的 Looper、Message、Handler 都是用 JAVA 实现的,Android 是一个基于 Linux 的系统,底层用C、C++实现的,而且还有 NDK 的存在,Android 消息驱动的模型为了消息的及时性、高效性,在 Native 层也设计了 Java 层对应的类如 Looper、MessageQueue 等。

在 ActivityThread 的 main 函数里面调用主线程的 loop 方法开启消息循环监听,这个 loop 方法会一直运行,伴随应用的整个生命周期。

以下是 ActitivyThread 的 main 的实现:

public static void main(String[] args) {
Trace.traceBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER, "ActivityThreadMain");


// CloseGuard defaults to true and can be quite spammy. We
// disable it here, but selectively enable it later (via
// StrictMode) on debug builds, but using DropBox, not logs.
CloseGuard.setEnabled(false);


Environment.initForCurrentUser();


// Set the reporter for event logging in libcore
EventLogger.setReporter(new EventLoggingReporter());


// Make sure TrustedCertificateStore looks in the right place for CA certificates
final File configDir = Environment.getUserConfigDirectory(UserHandle.myUserId());
TrustedCertificateStore.setDefaultUserDirectory(configDir);


Process.setArgV0("<pre-initialized>");


Looper.prepareMainLooper();


ActivityThread thread = new ActivityThread();
thread.attach(false);


if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}


if (false) {
Looper.myLooper().setMessageLogging(new
LogPrinter(Log.DEBUG, "ActivityThread"));
}


// End of event ActivityThreadMain.
Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
Looper.loop();


throw new RuntimeException("Main thread loop unexpectedly exited");
}

prepareMainLooper 做的事情其实就是在线程中创建一个 Looper 对象:

/**
* Initialize the current thread as a looper, marking it as an
* application's main looper. The main looper for your application
* is created by the Android environment, so you should never need
* to call this function yourself. See also: {@link #prepare()}
*/
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}

先调用 prepare 来进行主要成员变量的初始化,传传入参数 false 最终会传到 MessageQueue 的构造函数中。初始化完成后,接着调用 myLooper 方法将返回值赋给成员变量 sMainLooper,它也是一个 Looper 类型的成员变量,接着再来看一下 prepare 方法的实现,源码如下:

private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}

这个 Looper 对象是存放在 sThreadLocal 成员变量里面的。线程创建 Looper 对象的工作是由 prepare 函数来完成的,而在创建 Looper 对象的时候,会同时创建一个消息队列 MessageQueue,保存在 Looper 的成员变量 mQueue 中,后续消息就是存放在这个队列中去。消息队列在 Android 应用程序消息处理机制中最重要的组件,以下是它的创建过程:

public class MessageQueue {
......

// True if the message queue can be quit.
private final boolean mQuitAllowed;

private int mPtr; // used by native code

private native void nativeInit();

MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
......
}

它的初始化工作都交给 JNI 方法 nativeInit 实现:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}


nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}

在 JNI 中,也相应地创建了一个消息队列 NativeMessageQueue,接着把 C++ 里面的这个指针转成 jlong 类型返回给 java 层,赋值给前面我们在 Java 层创建的 MessageQueue 对象的 mPtr 成员变量。继续看 NativeMessageQueue 的创建过程:

NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}

它主要就是在内部创建了一个 Looper 对象,这里的 Looper 跟 java 层的是对应的。继续看 Looper 对象的创建过程:

Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
strerror(errno));


AutoMutex _l(mLock);
rebuildEpollLocked();
}

该方法中首先调用 eventfd 系统函数,该函数返回一个文件描述符,与打开的其他文件一样,可以进行读写操作。然后调用 rebuildEpollLocked 函数继续进行后续的初始化,继续看 rebuildEpollLocked:

void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
close(mEpollFd);
}


// Allocate the new epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));


struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));


for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);


int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}

为我们的主线程创建 Epoll 循环的结构体。该方法执行完,我们的 epoll 节点添加进去之后,那么初始化的工作就结束了。framework 中为我们创建好的 java 层的 Looper、MessageQueue 和 native 层的 Looper、NativeMessageQueue 都已经准备好了,epoll 机制相应的节点也注册好了。

下面我们接着来分析 ActivityThread 类的 main 方法中的 Looper.loop()的实现。先调用 myLooper 方法来判断前面的准备工作是否完成,如果准备工作都出错,那就直接抛出运行时异常。接着一个 for (;;) 无限循环取消息。queue.next()取下一个消息,该方法可能会阻塞,如果取到的 msg 为空,则说明消息循环要退出了,则直接 return。取到下一个消息 msg 之后,就调用 msg.target.dispatchMessage(msg) 将它分发给目标进行处理,msg 的成员变量 target 的类型为 Handler,它是在我们往当前的 MessageQueue 消息队列上发送消息时指定的,分发完成后调用 recycleUnchecked() 来将当前的 msg 回收掉。Message 对象的构建也是使用了一个缓存池,因为消息循环是非常频繁的,所以使用缓存池可以有效的减少无用内存的分配,非常必要。接下来重点看一下 queue.next() 是如何取到下一条消息的,该方法的实现在 MessageQueue 类中,方法的源码如下:

Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}


int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}


nativePollOnce(ptr, nextPollTimeoutMillis);


synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}


// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}


// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}


if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}


// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler


boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}


if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}


// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;


// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}

主要是一个 for (;;) 无限循环,所有发送过来的消息最终都会存储在成员变量 mMessages 上,它的类型为 Message,Message 类又有一个类型为 Message 的成员变量 next,相当于 Message 类就是单向链表,所以我们发送过来的消息会不断的往上挂,从 mMessages 上取下一个消息 msg,如果当前消息时间未到,那么就需要休眠,休眠的时间长短取决于 nextPollTimeoutMillis;否则处理该消息,则将该消息返回给 Looper 类的 loop 方法中进行处理。下面看一下 nativePollOnce 的实现:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

取到在 native 层创建的 NativeMessageQueue,然后调用它的 pollOnce 继续处理,pollOnce 方法的源码如下:

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;


if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}

调用 native 层的 Looper 类的 pollOnce 继续处理,源码如下:

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}


if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}


result = pollInner(timeoutMillis);
}
}

调用 pollInner 进一步处理,pollInner 方法的源码如下:

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif


// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}


// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;


// We are about to idle.
mPolling = true;


struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);


// No longer idling.
mPolling = false;


// Acquire lock.
mLock.lock();


// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}


// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}


// Check for poll timeout.
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}


// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif


for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;


// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();


#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);
} // release handler


mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}


// Release lock.
mLock.unlock();


// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}


// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}

该方法的参数 timeoutMillis 就是下一个消息的等待时间,在调用 epoll wait 系统函数时,就会将当前的线程休眠。休眠时间到之后,epoll wait 就会返回,再次检查消息队列时,就会有符合要求的消息了。

END

BZFRriZ.jpg!mobile

6jemIbu.jpg!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK