A Timer optimized for approximated I/O timeout scheduling.
Tick Duration
As described with 'approximated', this timer does not execute the scheduled
TimerTask on time. HashedWheelTimer, on every tick, will
check if there are any TimerTasks behind the schedule and execute
them.
You can increase or decrease the accuracy of the execution timing by
specifying smaller or larger tick duration in the constructor. In most
network applications, I/O timeout does not need to be accurate. Therefore,
the default tick duration is 100 milliseconds and you will not need to try
different configurations in most cases.
Ticks per Wheel (Wheel Size)
HashedWheelTimer maintains a data structure called 'wheel'.
Put simply, a wheel is a hash table of TimerTasks whose hash
function is 'deadline of the task'. The default number of ticks per wheel
(i.e. the size of the wheel) is 512. You could specify a larger value
if you are going to schedule a lot of timeouts.
Do not create many instances.
HashedWheelTimer creates a new thread whenever it is instantiated and
started. Therefore, you should make sure to create only one instance and
share it across your application. One of the common mistakes that makes
your application unresponsive is to create a new instance for every connection.
/** * A handle associated with a {@link TimerTask} that is returned by a * {@link Timer}. */ publicinterfaceTimeout {
/** * Returns the {@link Timer} that created this handle. */ Timer timer();
/** * Returns the {@link TimerTask} which is associated with this handle. */ TimerTask task();
/** * Returns {@code true} if and only if the {@link TimerTask} associated * with this handle has been expired. */ booleanisExpired();
/** * Returns {@code true} if and only if the {@link TimerTask} associated * with this handle has been cancelled. */ booleanisCancelled();
/** * Attempts to cancel the {@link TimerTask} associated with this handle. * If the task has been executed or cancelled already, it will return with * no side effect. * * @return True if the cancellation completed successfully, otherwise false */ booleancancel(); }
TimerTask 的接口定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/** * A task which is executed after the delay specified with * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}. */ publicinterfaceTimerTask {
/** * Executed after the delay specified with * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}. * * @param timeout a handle which is associated with this task */ voidrun(Timeout timeout)throws Exception; }
/** * Schedules {@link TimerTask}s for one-time future execution in a background * thread. */ publicinterfaceTimer {
/** * Schedules the specified {@link TimerTask} for one-time execution after * the specified delay. * * @return a handle which is associated with the specified task * * @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout * can cause instability in the system. */ Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/** * Releases all resources acquired by this {@link Timer} and cancels all * tasks which were scheduled but not executed yet. * * @return the handles associated with the tasks which were canceled by * this method */ Set<Timeout> stop(); }
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); thrownewRejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); }
start();
// Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. longdeadline= System.nanoTime() + unit.toNanos(delay) - startTime;
publicvoidstart() { switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: thrownewIllegalStateException("cannot be started once stopped"); default: thrownewError("Invalid WorkerState"); }
// Wait until the startTime is initialized by the worker. while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
@Override publicvoidrun() { // Initialize the startTime. startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; }
// Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown();
do { finallongdeadline= waitForNextTick(); if (deadline > 0) { intidx= (int) (tick & mask); processCancelledTasks(); HashedWheelBucketbucket= wheel[idx]; transferTimeoutsToBuckets(); bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { HashedWheelTimeouttimeout= timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } processCancelledTasks(); }
正常运行时程序一直都处在 while 循环中:
1 2 3 4 5 6 7 8 9 10 11 12
do { finallongdeadline= waitForNextTick(); if (deadline > 0) { intidx= (int) (tick & mask); processCancelledTasks(); HashedWheelBucketbucket= wheel[idx]; transferTimeoutsToBuckets(); bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// process all timeouts while (timeout != null) { HashedWheelTimeoutnext= timeout.next; if (timeout.remainingRounds <= 0) { next = remove(timeout); if (timeout.deadline <= deadline) { timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. thrownewIllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } elseif (timeout.isCancelled()) { next = remove(timeout); } else { timeout.remainingRounds --; } timeout = next; } }
privatevoidtransferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. for (inti=0; i < 100000; i++) { HashedWheelTimeouttimeout= timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; }