A Look at Elasticsearch's AtomicArray - code-craft - SegmentFault

 4 years ago
source link: https://segmentfault.com/a/1190000019388911?

This article takes a look at Elasticsearch's AtomicArray.

AtomicArray

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java

public class AtomicArray<E> {
    private final AtomicReferenceArray<E> array;
    private volatile List<E> nonNullList;

    public AtomicArray(int size) {
        array = new AtomicReferenceArray<>(size);
    }

    /**
     * The size of the expected results, including potential null values.
     */
    public int length() {
        return array.length();
    }

    /**
     * Sets the element at position {@code i} to the given value.
     *
     * @param i     the index
     * @param value the new value
     */
    public void set(int i, E value) {
        array.set(i, value);
        if (nonNullList != null) { // read first, lighter, and most times it will be null...
            nonNullList = null;
        }
    }

    public final void setOnce(int i, E value) {
        if (array.compareAndSet(i, null, value) == false) {
            throw new IllegalStateException("index [" + i + "] has already been set");
        }
        if (nonNullList != null) { // read first, lighter, and most times it will be null...
            nonNullList = null;
        }
    }

    /**
     * Gets the current value at position {@code i}.
     *
     * @param i the index
     * @return the current value
     */
    public E get(int i) {
        return array.get(i);
    }

    /**
     * Returns the it as a non null list.
     */
    public List<E> asList() {
        if (nonNullList == null) {
            if (array == null || array.length() == 0) {
                nonNullList = Collections.emptyList();
            } else {
                List<E> list = new ArrayList<>(array.length());
                for (int i = 0; i < array.length(); i++) {
                    E e = array.get(i);
                    if (e != null) {
                        list.add(e);
                    }
                }
                nonNullList = list;
            }
        }
        return nonNullList;
    }

    /**
     * Copies the content of the underlying atomic array to a normal one.
     */
    public E[] toArray(E[] a) {
        if (a.length != array.length()) {
            throw new ElasticsearchGenerationException("AtomicArrays can only be copied to arrays of the same size");
        }
        for (int i = 0; i < array.length(); i++) {
            a[i] = array.get(i);
        }
        return a;
    }
}
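  • AtomicArray wraps an AtomicReferenceArray and adds a nonNullList field. asList() builds an ArrayList containing only the non-null elements and caches it in nonNullList. setOnce() relies on AtomicReferenceArray.compareAndSet and throws IllegalStateException if the slot has already been set. Both set() and setOnce() check nonNullList and reset it to null when it is non-null, which invalidates the cached list.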
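
The write-once behaviour and the non-null view can be tried out with the JDK alone. The following is a minimal sketch, not the Elasticsearch class itself: AtomicArraySketch and its two static helpers are hypothetical names introduced here, and the caching done by nonNullList is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical demo class (not part of Elasticsearch); it uses only the JDK.
final class AtomicArraySketch {

    // Write-once assignment: succeeds only while the slot is still null.
    static <E> void setOnce(AtomicReferenceArray<E> array, int i, E value) {
        if (!array.compareAndSet(i, null, value)) {
            throw new IllegalStateException("index [" + i + "] has already been set");
        }
    }

    // Collects the non-null slots in index order (no caching, unlike asList()).
    static <E> List<E> nonNull(AtomicReferenceArray<E> array) {
        List<E> list = new ArrayList<>(array.length());
        for (int i = 0; i < array.length(); i++) {
            E e = array.get(i);
            if (e != null) {
                list.add(e);
            }
        }
        return list;
    }

    public static void main(String[] args) {
        AtomicReferenceArray<String> slots = new AtomicReferenceArray<>(3);
        setOnce(slots, 0, "a");
        setOnce(slots, 2, "c");
        System.out.println(nonNull(slots)); // prints [a, c]; slot 1 is null and is skipped
        try {
            setOnce(slots, 0, "again");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints index [0] has already been set
        }
    }
}
```

Because the list returned by nonNull() skips empty slots, its indices do not necessarily match the slot indices of the underlying array; the same holds for asList() in the listing above.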

GroupedActionListener

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java

public final class GroupedActionListener<T> implements ActionListener<T> {
    private final CountDown countDown;
    private final AtomicInteger pos = new AtomicInteger();
    private final AtomicArray<T> results;
    private final ActionListener<Collection<T>> delegate;
    private final Collection<T> defaults;
    private final AtomicReference<Exception> failure = new AtomicReference<>();

    /**
     * Creates a new listener
     * @param delegate the delegate listener
     * @param groupSize the group size
     */
    public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSize,
                                 Collection<T> defaults) {
        results = new AtomicArray<>(groupSize);
        countDown = new CountDown(groupSize);
        this.delegate = delegate;
        this.defaults = defaults;
    }

    @Override
    public void onResponse(T element) {
        results.setOnce(pos.incrementAndGet() - 1, element);
        if (countDown.countDown()) {
            if (failure.get() != null) {
                delegate.onFailure(failure.get());
            } else {
                List<T> collect = this.results.asList();
                collect.addAll(defaults);
                delegate.onResponse(Collections.unmodifiableList(collect));
            }
        }
    }

    @Override
    public void onFailure(Exception e) {
        if (failure.compareAndSet(null, e) == false) {
            failure.accumulateAndGet(e, (previous, current) -> {
                previous.addSuppressed(current);
                return previous;
            });
        }
        if (countDown.countDown()) {
            delegate.onFailure(failure.get());
        }
    }
}
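  • The GroupedActionListener constructor creates an AtomicArray and a CountDown, both sized by groupSize.
  • onResponse claims the next slot via pos.incrementAndGet() - 1 and stores the result with AtomicArray.setOnce. It then calls countDown.countDown(); if this call completed the count-down, it checks for a recorded failure. If there is one, it calls delegate.onFailure; otherwise it fetches the results as a list with AtomicArray.asList, appends defaults, and calls delegate.onResponse.
  • onFailure records the failure: the first exception is stored with compareAndSet, and if that fails, accumulateAndGet attaches the new exception to the stored one via addSuppressed. It then calls countDown.countDown(), and if this call completed the count-down, it calls delegate.onFailure.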
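
The "collect N responses, then notify exactly once" pattern described above can be sketched with JDK classes only. This is an illustration under simplifying assumptions, not the Elasticsearch implementation: GroupedCollectorSketch is a hypothetical name, the delegate is reduced to a java.util.function.Consumer, and failure handling and the defaults collection are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

// Hypothetical, JDK-only sketch of the grouping pattern (success path only).
final class GroupedCollectorSketch<T> {
    private final AtomicReferenceArray<T> results;
    private final AtomicInteger pos = new AtomicInteger(); // next free slot
    private final AtomicInteger remaining;                 // responses still expected
    private final Consumer<List<T>> onAllDone;

    GroupedCollectorSketch(int groupSize, Consumer<List<T>> onAllDone) {
        this.results = new AtomicReferenceArray<>(groupSize);
        this.remaining = new AtomicInteger(groupSize);
        this.onAllDone = onAllDone;
    }

    void onResponse(T element) {
        // Each caller claims a distinct slot, so concurrent writes never collide.
        results.set(pos.getAndIncrement(), element);
        // Only the caller that brings the counter to zero fires the callback.
        if (remaining.decrementAndGet() == 0) {
            List<T> collected = new ArrayList<>(results.length());
            for (int i = 0; i < results.length(); i++) {
                collected.add(results.get(i));
            }
            onAllDone.accept(collected);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GroupedCollectorSketch<Integer> collector = new GroupedCollectorSketch<>(
            4, list -> System.out.println("all done: " + list.size() + " results"));
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            final int value = i;
            threads[i] = new Thread(() -> collector.onResponse(value));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }
}
```

Results land in arrival order rather than in request order, because the slot index comes from a shared counter. The same is true of pos in the GroupedActionListener listing.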

CountDown

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/CountDown.java

public final class CountDown {

    private final AtomicInteger countDown;
    private final int originalCount;

    public CountDown(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must be greater or equal to 0 but was: " + count);
        }
        this.originalCount = count;
        this.countDown = new AtomicInteger(count);
    }

    /**
     * Decrements the count-down and returns <code>true</code> iff this call
     * reached zero otherwise <code>false</code>
     */
    public boolean countDown() {
        assert originalCount > 0;
        for (;;) {
            final int current = countDown.get();
            assert current >= 0;
            if (current == 0) {
                return false;
            }
            if (countDown.compareAndSet(current, current - 1)) {
                return current == 1;
            }
        }
    }

    /**
     * Fast forwards the count-down to zero and returns <code>true</code> iff
     * the count down reached zero with this fast forward call otherwise
     * <code>false</code>
     */
    public boolean fastForward() {
        assert originalCount > 0;
        assert countDown.get() >= 0;
        return countDown.getAndSet(0) > 0;
    }
    
    /**
     * Returns <code>true</code> iff the count-down has reached zero. Otherwise <code>false</code>
     */
    public boolean isCountedDown() {
        assert countDown.get() >= 0;
        return countDown.get() == 0;
    }
}
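  • CountDown is a simple, thread-safe counter that resembles CountDownLatch but never blocks: it has no await method. countDown() decrements the value in a compareAndSet loop and returns true only for the single call that moves the counter from 1 to 0 (current == 1); calls made after the counter has reached zero return false. isCountedDown() reports whether the counter has reached zero, and fastForward() sets the counter straight to 0, returning true only if it was still above zero.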
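
The return value of countDown() is easy to misread, so here is a trimmed-down restatement of the same compareAndSet loop together with a short trace. CountDownSketch is a hypothetical name, and the assertions and the other methods of the original class are left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, trimmed-down restatement of the CAS loop in CountDown.
final class CountDownSketch {
    private final AtomicInteger remaining;

    CountDownSketch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must be greater or equal to 0 but was: " + count);
        }
        this.remaining = new AtomicInteger(count);
    }

    // Returns true only for the single call that moves the counter from 1 to 0.
    boolean countDown() {
        for (;;) {
            int current = remaining.get();
            if (current == 0) {
                return false; // already finished; later calls never report true
            }
            if (remaining.compareAndSet(current, current - 1)) {
                return current == 1;
            }
            // CAS lost a race with another thread: re-read and retry.
        }
    }

    public static void main(String[] args) {
        CountDownSketch counter = new CountDownSketch(2);
        System.out.println(counter.countDown()); // false: 2 -> 1
        System.out.println(counter.countDown()); // true:  1 -> 0
        System.out.println(counter.countDown()); // false: already 0
    }
}
```

This "exactly one caller sees true" property is what lets GroupedActionListener invoke its delegate once, no matter how many threads report in concurrently.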

Summary

  • AtomicArray wraps an AtomicReferenceArray, caches a list of its non-null elements in nonNullList (built by asList), and offers a write-once setOnce based on compareAndSet. Both set and setOnce reset the cached list to null.
  • GroupedActionListener sizes an AtomicArray and a CountDown by groupSize. onResponse stores each result with setOnce, and the call that completes the count-down either reports the recorded failure through delegate.onFailure or passes the collected list to delegate.onResponse. onFailure records or accumulates the exception, and calls delegate.onFailure if its call completes the count-down.
  • CountDown is a simple, thread-safe, non-blocking counterpart to CountDownLatch: countDown() decrements with compareAndSet and returns true only for the call that reaches zero, isCountedDown() reports whether zero has been reached, and fastForward() sets the counter straight to 0.
