25

java容器中的几种计数方法浅谈

 3 years ago
source link: http://www.cnblogs.com/yougewe/p/13238124.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本文讨论java集合容器中的几种元素数量获取的方式,命题很小,但是也足以让我们思考一些东西。

所谓计数:即是给出所在容器的元素总数的方式。一般能想到的就是两种方式:一是使用某个字段直接存储该计数值,二是在请求计数值时临时去计算所有元素数量。貌似本文的答案已经出来了。好吧,那我们还是从源码的角度来验证下想法吧:

一般在java的集合容器中,可以分为普通容器和并发容器,我们就姑且以这种方式来划分下,验证下其实现计数的方式吧!

1. 普通容器 --HashMap

一般非并发容器在进行增删改时,都会同时维护一个count值,如hashmap中的实现:

    // HashMap 增加和修改都在此实现
    /**
     * Implements Map.put and related methods
     *
     * @param hash hash for key
     * @param key the key
     * @param value the value to put
     * @param onlyIfAbsent if true, don't change existing value
     * @param evict if false, the table is in creation mode.
     * @return previous value, or null if none
     */
    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        if ((tab = table) == null || (n = tab.length) == 0)
            n = (tab = resize()).length;
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);
        else {
            Node<K,V> e; K k;
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                e = p;
            else if (p instanceof TreeNode)
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            else {
                for (int binCount = 0; ; ++binCount) {
                    if ((e = p.next) == null) {
                        p.next = newNode(hash, key, value, null);
                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                            treeifyBin(tab, hash);
                        break;
                    }
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        break;
                    p = e;
                }
            }
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    e.value = value;
                afterNodeAccess(e);
                return oldValue;
            }
        }
        ++modCount;
        // 直接对size进行增加1即可, 如果是更新key的值,则不会运行到此处,即不会进行相加
        if (++size > threshold)
            resize();
        afterNodeInsertion(evict);
        return null;
    }
    // 删除元素的实现,同时维护 size 大小
    /**
     * Implements Map.remove and related methods
     *
     * @param hash hash for key
     * @param key the key
     * @param value the value to match if matchValue, else ignored
     * @param matchValue if true only remove if value is equal
     * @param movable if false do not move other nodes while removing
     * @return the node, or null if none
     */
    final Node<K,V> removeNode(int hash, Object key, Object value,
                               boolean matchValue, boolean movable) {
        Node<K,V>[] tab; Node<K,V> p; int n, index;
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (p = tab[index = (n - 1) & hash]) != null) {
            Node<K,V> node = null, e; K k; V v;
            // 先查找node所在的位置
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                node = p;
            else if ((e = p.next) != null) {
                if (p instanceof TreeNode)
                    node = ((TreeNode<K,V>)p).getTreeNode(hash, key);
                else {
                    do {
                        if (e.hash == hash &&
                            ((k = e.key) == key ||
                             (key != null && key.equals(k)))) {
                            node = e;
                            break;
                        }
                        p = e;
                    } while ((e = e.next) != null);
                }
            }
            if (node != null && (!matchValue || (v = node.value) == value ||
                                 (value != null && value.equals(v)))) {
                if (node instanceof TreeNode)
                    ((TreeNode<K,V>)node).removeTreeNode(this, tab, movable);
                else if (node == p)
                    tab[index] = node.next;
                else
                    p.next = node.next;
                ++modCount;
                // 直接减小size即可
                --size;
                afterNodeRemoval(node);
                return node;
            }
        }
        return null;
    }

因为有了增删改时对计数器的维护,所以在想要获取总数时,就容易许多了。只需把size字段返回即可。

    // HashMap.size()
    /**
     * Returns the number of key-value mappings in this map.
     *
     * @return the number of key-value mappings in this map
     */
    public int size() {
        return size;
    }

所以,在这种情况下,获取计数值的方式非常简单。但是不管怎么样,size字段对外部是不可见的,因为它是容器内部的一个实现逻辑,它完全在将来的某个时刻改变掉。即 size() != size .

2. 普通容器 --LinkedList

看完hash类的计数实现,咱们再来看另外一个类型的容器LinkedList:

    // LinkedList.add(E)   添加一个元素
    public boolean add(E e) {
        linkLast(e);
        return true;
    }
    /**
     * Links e as last element.
     */
    void linkLast(E e) {
        final Node<E> l = last;
        final Node<E> newNode = new Node<>(l, e, null);
        last = newNode;
        if (l == null)
            first = newNode;
        else
            l.next = newNode;
        // 同样,直接使用一个 size 计数器统计即可
        size++;
        modCount++;
    }
    
    // 删除一个元素, 同时维护 size 字段
    public E removeFirst() {
        final Node<E> f = first;
        if (f == null)
            throw new NoSuchElementException();
        return unlinkFirst(f);
    }    
    /**
     * Unlinks non-null first node f.
     */
    private E unlinkFirst(Node<E> f) {
        // assert f == first && f != null;
        final E element = f.item;
        final Node<E> next = f.next;
        f.item = null;
        f.next = null; // help GC
        first = next;
        if (next == null)
            last = null;
        else
            next.prev = null;
        // 元素计数减1
        size--;
        modCount++;
        return element;
    }

    // 同样,统计元素数量时,直接返回size即可
    public int size() {
        return size;
    }

可见,LinkedList 也同样是简单地维护一个计数器字段,从而实现了高效地计数方法。而这简单地实现,则是基于单线程的访问的,它同时维护一个计数字段,基本没有多少开销,却给取值时带来了便利。

总结: 普通容器直接维护一个计数器字段,可以很方便地进行大小统计操作。

3. 并发容器 --ConcurrentHashMap

而对于并发容器,则可能会不一样些,但也有一些情况是一样的。比较,HashTable, 直接使用 synchronized 来保证线程安全,则它也同样可以直接使用一个size即可完成元素大小的统计。事实上,有些版本的HashTable仅仅是在HashMap的上面加上了synchronizd锁而已(有些版本则是 不一样的哦),细节咱们无需再看。

而稍微有点不一样的如: ConcurrentHashMap.size(), 早期的 ConcurrentHashMap 使用分段锁,则需要统计各segement的元素,相加起来然后得到整体元素大小. 而jdk1.8中,已经放弃使用分段锁来实现高性能安全的hash容器了,而是直接使用 synchronized + CAS + 红黑树 实现. 那么,我们来看看其实现元素统计这一功能的实现有何不同吧!

    // ConcurrentHashMap.putVal()  新增或修改一个元素
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 主要是在进行新增成功时,再进行计数器的操作, 看起来不是 ++size 这么简单了
        addCount(1L, binCount);
        return null;
    }

    // 这个计数的相加看起来相当复杂
    /**
     * Adds to count, and if table is too small and not already
     * resizing, initiates transfer. If already resizing, helps
     * perform transfer if work is available.  Rechecks occupancy
     * after a transfer to see if another resize is already needed
     * because resizings are lagging additions.
     *
     * @param x the count to add
     * @param check if <0, don't check resize, if <= 1 only check if uncontended
     */
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 使用 CounterCell 来实现计数操作
        // 使用 CAS 保证更新计数时只会有一个线程成功
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                // 使用一个类似随机负载均衡的方式,将计数值随机添加到 CounterCell 的某个值下面,减少多线程竞争的可能性
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                // 通过cas将计数值x添加到 CounterCell 的 value 字段中
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // 如果上面添加失败,则使用 fullAddCount 进行重新添加该计数
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            // 基于 CounterCell 做一此汇总操作
            s = sumCount();
        }
        // 在进行put值时, check的值都是大于等于0的
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // rehash 处理
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    // 辅助进行hash扩容
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    // fullAddCount 比较复杂, 它的目的是为了保证多线程可以快速进行添加完成, 目标很简单, 即向数组 CounterCell 中添加一个值 x
    // See LongAdder version for explanation
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            if ((as = counterCells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }
    // ConcurrentHashMap.remove 删除元素
    /**
     * Implementation for the four public remove/replace methods:
     * Replaces node value with v, conditional upon match of cv if
     * non-null.  If resulting value is null, delete.
     */
    final V replaceNode(Object key, V value, Object cv) {
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            validated = true;
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        if (value != null)
                                            e.val = value;
                                        // 删除元素
                                        else if (pred != null)
                                            pred.next = e.next;
                                        else
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    // 删除元素
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                if (validated) {
                    if (oldVal != null) {
                        // value = null, 代表需要将元素删除,所以需要对计数器做减1操作
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }

同样是由于在增删时,维护一个计数器(CounterCell数组), 所以对于返回计数值操作则会比较简单化:

    // ConcurrentHashMap.size()
    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    // 直接将 CounterCell 中的值相加起来即可
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

虽然ConcurrentHash的元素本身没有使用分段式存储了,但是其计数值还是存在了多个 CounterCell 中,目的自然是为了减少多线程竞争对计数器的更新成性能瓶颈。在进行 size() 计数时,并未有上锁操作,整个 CounterCell 使用 volatile 修饰,保证其可见性,但是整个size 却是不保证绝对准确的哦。

4. 并发容器 --ArrayBlockingQueue

下面我们再来看看另一各类型的并发容器: ArrayBlockingQueue

    // ArrayBlockingQueue.offer()
    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 直接上锁操作
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                // 进行入队操作
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        // 同样,它还是通过一个  count 的计数器完成统计工作
        count++;
        notEmpty.signal();
    }
    // 移除动作时,也需要维护 count 的值
    /**
     * Deletes item at array index removeIndex.
     * Utility for remove(Object) and iterator.remove.
     * Call only when holding lock.
     */
    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            // 移除成功, 将计数器减1
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            // slide over all others up through putIndex.
            // 通过轮询的方式, 必然有一个元素被删除
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            // 计数器相减
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }

同样是维护了一个计数器,但是因为有上锁机制的保证,整个过程看起来就简单了许多。在获取元素大小时,自然也就简单了.

    // ArrayBlockingQueue.size()
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

但是它为了保证结果的准确性,在计数时,同样进行了上锁操作。可见,并发容器的实现思路也基本一致.并无太多奇淫技巧. 咱们再来看一下并发容器的实现: CopyOnWriteArrayList

5. 并发容器 --CopyOnWriteArrayList

顾名思义,是在写操作的时候,使用复制方式进行实现。

    // CopyOnWriteArrayList.add()
    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        // 同样上锁保证线程安全
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // 将元素copy出来, 但其并非维护一个len字段
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
    // CopyOnWriteArrayList, 删除一个字段, 同其名称一样, 还是使用写时复制实现 
    public E remove(int index) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            E oldValue = get(elements, index);
            int numMoved = len - index - 1;
            if (numMoved == 0)
                setArray(Arrays.copyOf(elements, len - 1));
            else {
                // 找到移除的字段位置, 依次复制其前后元素到新数组中,完成功能
                Object[] newElements = new Object[len - 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index + 1, newElements, index,
                                 numMoved);
                setArray(newElements);
            }
            return oldValue;
        } finally {
            lock.unlock();
        }
    }

    
    // CopyOnWriteArrayList.size(), 直接使用数组长度字段
    /**
     * Returns the number of elements in this list.
     *
     * @return the number of elements in this list
     */
    public int size() {
        // 获取元素大小时,直接获取所有元素,取数组的长度即可. 借用jvm提供的数组长度元信息实现
        return getArray().length;
    }
    /**
     * Gets the array.  Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    final Object[] getArray() {
        // 该array字段一定是要保证可见性的, 即至少得是  volatile 修饰的数据
        return array;
    }

CopyOnWriteArrayList, 因为其语义决定,其在一定程度上是线程安全的,所以,在读操作时,就不需要上锁,从而性能在某些场景会比较好。

根据功能特性的不同, CopyOnWriteArrayList 采用了一个不同实现方式, 实现了元素的统计功能. 另外像 SynchronousQueue#size, 则永久返回0, 因为它的定义是当被放一个元素后,必须等到有线程消费之后才可返回,而其本身并不存储元素. 所以, 虽然元素计数道理比较简单通用, 但是还是要按照具体的场景进行相应的实现, 才能满足具体的需求. 即不可脱离场景谈技术.

6. 更多计数

类似数据库类的产品,同样的这样的计数刚性需求,各自实现方式也有不同,但大体思路也差不多。比如 redis 的计数使用在计数时临时遍历元素实现,mysql myisam 引擎使用一个表级的计数器等等。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK