1 /*
2 * Copyright 2009-2010 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.springframework.batch.admin.util;
17
18 import org.springframework.core.task.SyncTaskExecutor;
19 import org.springframework.core.task.TaskExecutor;
20 import org.springframework.core.task.TaskRejectedException;
21
22 import java.util.concurrent.FutureTask;
23 import java.util.concurrent.Semaphore;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 /**
27 * <p>
28 * A {@link TaskExecutor} with a throttle limit which works by delegating to an
29 * existing task executor and limiting the number of tasks submitted.
30 * </p>
31 * <p>
32 * A throttle limit is provided to limit the number of pending requests over and
33 * above the features provided by the other task executors. The submit method
34 * blocks until there are results available to retrieve. This limit is different
35 * (and orthogonal) to any queue size imposed by the delegate
36 * {@link TaskExecutor}: such queues normally do not throttle, in the sense that
37 * they always accept more work, until they fill up, at which point they reject.
38 * The point of a throttle is to not reject any work, but to still limit the
39 * number of concurrent tasks.
40 * </p>
41 * @author Dave Syer
42 *
43 */
44 public class ThrottledTaskExecutor implements TaskExecutor {
45
46 private Semaphore semaphore;
47
48 private volatile AtomicInteger count = new AtomicInteger(0);
49
50 private TaskExecutor taskExecutor = new SyncTaskExecutor();
51
52 /**
53 * Create a {@link ThrottledTaskExecutor} with infinite
54 * (Integer.MAX_VALUE) throttle limit. A task can always be submitted.
55 */
56 public ThrottledTaskExecutor() {
57 this(null, Integer.MAX_VALUE);
58 }
59
60 /**
61 * Create a {@link ThrottledTaskExecutor} with infinite
62 * (Integer.MAX_VALUE) throttle limit. A task can always be submitted.
63 *
64 * @param taskExecutor the {@link TaskExecutor} to use
65 */
66 public ThrottledTaskExecutor(TaskExecutor taskExecutor) {
67 this(taskExecutor, Integer.MAX_VALUE);
68 }
69
70 /**
71 * Create a {@link ThrottledTaskExecutor} with finite throttle
72 * limit. The submit method will block when this limit is reached until one
73 * of the tasks has finished.
74 *
75 * @param taskExecutor the {@link TaskExecutor} to use
76 * @param throttleLimit the throttle limit
77 */
78 public ThrottledTaskExecutor(TaskExecutor taskExecutor, int throttleLimit) {
79 super();
80 if (taskExecutor != null) {
81 this.taskExecutor = taskExecutor;
82 }
83 this.semaphore = new Semaphore(throttleLimit);
84 }
85
86 /**
87 * Limits the number of concurrent executions on the enclosed task executor.
88 * Do not call this after initialization (for configuration purposes only).
89 *
90 * @param throttleLimit the throttle limit to apply
91 */
92 public void setThrottleLimit(int throttleLimit) {
93 this.semaphore = new Semaphore(throttleLimit);
94 }
95
96 /**
97 * Public setter for the {@link TaskExecutor} to be used to execute the
98 * tasks submitted. The default is synchronous, executing tasks on the
99 * calling thread. In this case the throttle limit is irrelevant as there
100 * will always be at most one task pending.
101 *
102 * @param taskExecutor {@link org.springframework.core.task.TaskExecutor}
103 */
104 public void setTaskExecutor(TaskExecutor taskExecutor) {
105 this.taskExecutor = taskExecutor;
106 }
107
108 /**
109 * Submit a task for execution by the delegate task executor, blocking if
110 * the throttleLimit is exceeded.
111 *
112 * @see TaskExecutor#execute(Runnable)
113 */
114 public void execute(Runnable task) {
115 if (task == null) {
116 throw new NullPointerException("Task is null in ThrottledTaskExecutor.");
117 }
118 doSubmit(task);
119 }
120
121 /**
122 * Get an estimate of the number of pending requests.
123 *
124 * @return the estimate
125 */
126 public int size() {
127 return count.get();
128 }
129
130 private Runnable doSubmit(final Runnable task) {
131
132 try {
133 semaphore.acquire();
134 count.incrementAndGet();
135 }
136 catch (InterruptedException e) {
137 Thread.currentThread().interrupt();
138 throw new TaskRejectedException("Task could not be submitted because of a thread interruption.");
139 }
140
141 try {
142 taskExecutor.execute(new FutureTask<Object>(task, null) {
143 @Override
144 protected void done() {
145 semaphore.release();
146 count.decrementAndGet();
147 }
148 });
149 }
150 catch (TaskRejectedException e) {
151 semaphore.release();
152 count.decrementAndGet();
153 throw e;
154 }
155
156 return task;
157 }
158 }