Class MemoryBoundBlockingQueue<T extends Measurable>
- java.lang.Object
-
- com.linkedin.davinci.kafka.consumer.MemoryBoundBlockingQueue<T>
-
- Type Parameters:
T-
- All Implemented Interfaces:
java.lang.Iterable<T>, java.util.Collection<T>, java.util.concurrent.BlockingQueue<T>, java.util.Queue<T>
public class MemoryBoundBlockingQueue<T extends Measurable> extends java.lang.Object implements java.util.concurrent.BlockingQueue<T>
This class is a generic implementation of a memory-bound blocking queue. The queue is bounded by the memory usage of each Measurable object buffered inside it. To guarantee some degree of fairness, choose a suitable notifyDeltaInByte according to the maximum size of the messages that could be buffered.
The reason behind this design: threads may put messages of various sizes into the shared queue, so MemoryBoundBlockingQueue does not notify a waiting thread (a 'put' thread) as soon as a message is processed. It notifies only once the freed memory reaches the configured notifyDeltaInByte. Without this threshold, when the buffer is full and the processing thread keeps processing small messages, a bigger message would never get a chance to be queued, because the memory freed by each small message is not enough to fit it. With this delta config, MemoryBoundBlockingQueue guarantees some degree of fairness among messages of various sizes when the buffer is full.
When tuning this config, consider the following tradeoffs:
1. notifyDeltaInByte must be smaller than memoryCapacityInByte.
2. If the delta is too big, some buffer space is wasted, since waiting threads are not notified even when some memory (less than the delta) is available.
3. If the delta is too small, a big message may not get a chance to be buffered when the queue is full.
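The delta-based notification described above can be illustrated with a minimal sketch. This is not the actual MemoryBoundBlockingQueue code: the SizedRecord interface is a hypothetical stand-in for Measurable (whose methods are not shown on this page), the class and field names are invented for illustration, and waking producers when the queue drains is an extra safeguard assumed by this sketch.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for Measurable; the real interface's methods are not shown here. */
interface SizedRecord {
  int sizeInBytes();
}

/** Simplified illustration only; not the actual MemoryBoundBlockingQueue implementation. */
final class DeltaNotifyingQueueSketch<T extends SizedRecord> {
  private final Queue<T> queue = new ArrayDeque<>();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition enoughMemory = lock.newCondition();
  private final Condition notEmpty = lock.newCondition();
  private final long notifyDeltaInByte;
  private long remainingMemoryInByte;
  private long freedSinceLastNotifyInByte = 0;

  DeltaNotifyingQueueSketch(long memoryCapacityInByte, long notifyDeltaInByte) {
    if (notifyDeltaInByte <= 0 || notifyDeltaInByte >= memoryCapacityInByte) {
      throw new IllegalArgumentException(
          "notifyDeltaInByte must be positive and smaller than memoryCapacityInByte");
    }
    this.notifyDeltaInByte = notifyDeltaInByte;
    this.remainingMemoryInByte = memoryCapacityInByte;
  }

  /** Blocks until the record fits. A record larger than the total capacity would block forever. */
  void put(T record) throws InterruptedException {
    long size = record.sizeInBytes();
    lock.lockInterruptibly();
    try {
      while (remainingMemoryInByte < size) {
        enoughMemory.await();
      }
      queue.add(record);
      remainingMemoryInByte -= size;
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /** Removes the head record and wakes producers only when enough memory has been freed. */
  T take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      while (queue.isEmpty()) {
        notEmpty.await();
      }
      T record = queue.remove();
      long size = record.sizeInBytes();
      remainingMemoryInByte += size;
      freedSinceLastNotifyInByte += size;
      // Wake producers only after at least notifyDeltaInByte has been freed.
      // Assumption of this sketch: also wake them when the queue drains,
      // so that a waiting producer is never stranded next to an empty queue.
      if (freedSinceLastNotifyInByte >= notifyDeltaInByte || queue.isEmpty()) {
        freedSinceLastNotifyInByte = 0;
        enoughMemory.signalAll();
      }
      return record;
    } finally {
      lock.unlock();
    }
  }

  long remainingMemoryCapacityInByte() {
    lock.lock();
    try {
      return remainingMemoryInByte;
    } finally {
      lock.unlock();
    }
  }
}
```

In this sketch, a producer blocked on a large record is woken only after consumers have freed a whole delta's worth of memory, which gives the large record a realistic chance of fitting instead of losing every small slice of freed memory to smaller records.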
-
-
Field Summary
Modifier and Type    Field    Description
static int    LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE    Considering the node implementation (LinkedList.Node), the overhead is three references, which could be about 24 bytes, and the 'Node' object itself could take 24 bytes.
-
Constructor Summary
Constructor    Description
MemoryBoundBlockingQueue(long memoryCapacityInByte, long notifyDeltaInByte)
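The two constructor arguments are the values whose tradeoffs are listed in the class description. The helper below is a hypothetical pre-flight check a caller might run before constructing the queue. Its names, and the heuristic that the delta should be at least as large as the biggest expected message, are assumptions made for illustration. Whether the real constructor validates its arguments is not stated on this page.

```java
/** Hypothetical tuning helper; not part of MemoryBoundBlockingQueue. */
final class NotifyDeltaTuningSketch {
  enum Verdict { INVALID, MAY_STARVE_LARGE_MESSAGES, OK }

  /**
   * Classifies a (capacity, delta) pair against the documented tradeoffs.
   * maxMessageSizeInByte is the caller's own estimate of the largest message.
   */
  static Verdict check(long memoryCapacityInByte, long notifyDeltaInByte, long maxMessageSizeInByte) {
    // Tradeoff 1: the delta must be smaller than the capacity.
    if (memoryCapacityInByte <= 0 || notifyDeltaInByte <= 0
        || notifyDeltaInByte >= memoryCapacityInByte) {
      return Verdict.INVALID;
    }
    // Tradeoff 3 (heuristic assumed here): a delta below the largest message
    // may wake producers before that message can fit.
    if (notifyDeltaInByte < maxMessageSizeInByte) {
      return Verdict.MAY_STARVE_LARGE_MESSAGES;
    }
    return Verdict.OK;
  }

  /** Tradeoff 2: upper bound on the share of the buffer that can sit free without a notification. */
  static double worstCaseIdleFraction(long memoryCapacityInByte, long notifyDeltaInByte) {
    return (double) notifyDeltaInByte / memoryCapacityInByte;
  }
}
```

For example, under these assumptions a 1000-byte capacity with a 250-byte delta and messages of up to 200 bytes is classified as OK, at the cost of up to a quarter of the buffer sitting idle before producers are notified.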
-
Method Summary
Modifier and Type    Method
boolean    add(T t)
boolean    addAll(java.util.Collection<? extends T> c)
void    clear()
boolean    contains(java.lang.Object o)
boolean    containsAll(java.util.Collection<?> c)
int    drainTo(java.util.Collection<? super T> c)
int    drainTo(java.util.Collection<? super T> c, int maxElements)
T    element()
long    getMemoryUsage()
boolean    isEmpty()
java.util.Iterator<T>    iterator()
boolean    offer(T t)
boolean    offer(T t, long timeout, java.util.concurrent.TimeUnit unit)
T    peek()
T    poll()
T    poll(long timeout, java.util.concurrent.TimeUnit unit)
void    put(T record)
int    remainingCapacity()
long    remainingMemoryCapacityInByte()
T    remove()
boolean    remove(java.lang.Object o)
boolean    removeAll(java.util.Collection<?> c)
boolean    retainAll(java.util.Collection<?> c)
int    size()
T    take()
java.lang.Object[]    toArray()
<T1> T1[]    toArray(T1[] a)
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
-
-
-
Field Detail
-
LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE
public static final int LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE
Considering the node implementation (LinkedList.Node), the overhead is three references, which could be about 24 bytes, and the 'Node' object itself could take 24 bytes. We can adjust this value later if necessary.
- See Also:
- Constant Field Values
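The arithmetic behind this estimate can be written out as a short sketch. The 8-byte reference size, the assumption that the two 24-byte figures are simply added, and the idea that each buffered record is charged its payload size plus this overhead are all assumptions made for illustration. The actual constant is the one listed under Constant Field Values.

```java
/** Hypothetical worked example of the per-node overhead estimate; not the library's code. */
final class NodeOverheadSketch {
  // Assumption: 8-byte object references.
  static final int REFERENCE_SIZE_IN_BYTE = 8;
  // LinkedList.Node holds three references: item, next and prev.
  static final int NODE_REFERENCE_COUNT = 3;
  // Figure quoted in the field description for the 'Node' object itself.
  static final int NODE_OBJECT_SIZE_IN_BYTE = 24;
  // Assumed total: 3 * 8 + 24 = 48 bytes per queued element.
  static final int ASSUMED_NODE_OVERHEAD_IN_BYTE =
      NODE_REFERENCE_COUNT * REFERENCE_SIZE_IN_BYTE + NODE_OBJECT_SIZE_IN_BYTE;

  /** Assumed accounting: memory charged for one buffered record of the given payload size. */
  static long chargedSizeInByte(long payloadSizeInByte) {
    return payloadSizeInByte + ASSUMED_NODE_OVERHEAD_IN_BYTE;
  }
}
```

Under these assumptions the fixed overhead dominates for very small records: a 100-byte payload would be charged 148 bytes, so a stream of tiny messages fills the buffer faster than their payload sizes alone suggest.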
-
-
Method Detail
-
getMemoryUsage
public long getMemoryUsage()
-
remainingMemoryCapacityInByte
public long remainingMemoryCapacityInByte()
-
put
public void put(T record) throws java.lang.InterruptedException
- Specified by:
put in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Throws:
java.lang.InterruptedException
-
take
public T take() throws java.lang.InterruptedException
- Specified by:
take in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Throws:
java.lang.InterruptedException
-
offer
public boolean offer(T t, long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
- Specified by:
offer in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Throws:
java.lang.InterruptedException
-
add
public boolean add(T t)
- Specified by:
add in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Specified by:
add in interface java.util.Collection<T extends Measurable>
- Specified by:
add in interface java.util.Queue<T extends Measurable>
-
offer
public boolean offer(T t)
- Specified by:
offer in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Specified by:
offer in interface java.util.Queue<T extends Measurable>
-
remove
public T remove()
- Specified by:
remove in interface java.util.Queue<T extends Measurable>
-
poll
public T poll()
- Specified by:
poll in interface java.util.Queue<T extends Measurable>
-
element
public T element()
- Specified by:
element in interface java.util.Queue<T extends Measurable>
-
peek
public T peek()
- Specified by:
peek in interface java.util.Queue<T extends Measurable>
-
poll
public T poll(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
- Specified by:
poll in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Throws:
java.lang.InterruptedException
-
remainingCapacity
public int remainingCapacity()
- Specified by:
remainingCapacity in interface java.util.concurrent.BlockingQueue<T extends Measurable>
-
remove
public boolean remove(java.lang.Object o)
- Specified by:
remove in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Specified by:
remove in interface java.util.Collection<T extends Measurable>
-
containsAll
public boolean containsAll(java.util.Collection<?> c)
- Specified by:
containsAll in interface java.util.Collection<T extends Measurable>
-
addAll
public boolean addAll(java.util.Collection<? extends T> c)
- Specified by:
addAll in interface java.util.Collection<T extends Measurable>
-
removeAll
public boolean removeAll(java.util.Collection<?> c)
- Specified by:
removeAll in interface java.util.Collection<T extends Measurable>
-
retainAll
public boolean retainAll(java.util.Collection<?> c)
- Specified by:
retainAll in interface java.util.Collection<T extends Measurable>
-
clear
public void clear()
- Specified by:
clear in interface java.util.Collection<T extends Measurable>
-
size
public int size()
- Specified by:
size in interface java.util.Collection<T extends Measurable>
-
isEmpty
public boolean isEmpty()
- Specified by:
isEmpty in interface java.util.Collection<T extends Measurable>
-
contains
public boolean contains(java.lang.Object o)
- Specified by:
contains in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Specified by:
contains in interface java.util.Collection<T extends Measurable>
-
iterator
public java.util.Iterator<T> iterator()
- Specified by:
iterator in interface java.util.Collection<T extends Measurable>
- Specified by:
iterator in interface java.lang.Iterable<T extends Measurable>
-
toArray
public java.lang.Object[] toArray()
- Specified by:
toArray in interface java.util.Collection<T extends Measurable>
-
toArray
public <T1> T1[] toArray(T1[] a)
- Specified by:
toArray in interface java.util.Collection<T extends Measurable>
-
drainTo
public int drainTo(java.util.Collection<? super T> c)
- Specified by:
drainTo in interface java.util.concurrent.BlockingQueue<T extends Measurable>
-
drainTo
public int drainTo(java.util.Collection<? super T> c, int maxElements)
- Specified by:
drainTo in interface java.util.concurrent.BlockingQueue<T extends Measurable>
-
-