| 1 | package org.springframework.batch.core.step.item; |
| 2 | |
| 3 | import java.util.ArrayList; |
| 4 | import java.util.Arrays; |
| 5 | import java.util.HashMap; |
| 6 | import java.util.List; |
| 7 | |
| 8 | import org.springframework.batch.core.SkipListener; |
| 9 | import org.springframework.batch.core.StepContribution; |
| 10 | import org.springframework.batch.core.listener.CompositeSkipListener; |
| 11 | import org.springframework.batch.core.step.skip.ItemSkipPolicy; |
| 12 | import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; |
| 13 | import org.springframework.batch.core.step.skip.NonSkippableException; |
| 14 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
| 15 | import org.springframework.batch.item.ItemKeyGenerator; |
| 16 | import org.springframework.batch.item.ItemReader; |
| 17 | import org.springframework.batch.item.ItemWriter; |
| 18 | import org.springframework.batch.retry.RecoveryCallback; |
| 19 | import org.springframework.batch.retry.RetryCallback; |
| 20 | import org.springframework.batch.retry.RetryContext; |
| 21 | import org.springframework.batch.retry.RetryException; |
| 22 | import org.springframework.batch.retry.RetryListener; |
| 23 | import org.springframework.batch.retry.RetryOperations; |
| 24 | import org.springframework.batch.retry.RetryPolicy; |
| 25 | import org.springframework.batch.retry.backoff.BackOffPolicy; |
| 26 | import org.springframework.batch.retry.callback.RecoveryRetryCallback; |
| 27 | import org.springframework.batch.retry.policy.ExceptionClassifierRetryPolicy; |
| 28 | import org.springframework.batch.retry.policy.MapRetryContextCache; |
| 29 | import org.springframework.batch.retry.policy.NeverRetryPolicy; |
| 30 | import org.springframework.batch.retry.policy.RecoveryCallbackRetryPolicy; |
| 31 | import org.springframework.batch.retry.policy.SimpleRetryPolicy; |
| 32 | import org.springframework.batch.retry.support.RetryTemplate; |
| 33 | import org.springframework.batch.support.SubclassExceptionClassifier; |
| 34 | import org.springframework.util.Assert; |
| 35 | |
| 36 | /** |
| 37 | * Factory bean for step that provides options for configuring skip behavior. |
| 38 | * User can set {@link #setSkipLimit(int)} to set how many exceptions of |
| 39 | * {@link #setSkippableExceptionClasses(Class[])} types are tolerated. |
| 40 | * {@link #setFatalExceptionClasses(Class[])} will cause immediate termination |
| 41 | * of job - they are treated as higher priority than |
| 42 | * {@link #setSkippableExceptionClasses(Class[])}, so the two lists don't need |
| 43 | * to be exclusive. |
| 44 | * |
| 45 | * Skippable exceptions on write will by default cause transaction rollback - to |
| 46 | * avoid rollback for specific exception class include it in the transaction |
| 47 | * attribute as "no rollback for". |
| 48 | * |
| 49 | * @see SimpleStepFactoryBean |
| 50 | * |
| 51 | * @author Dave Syer |
| 52 | * @author Robert Kasanicky |
| 53 | * |
| 54 | */ |
| 55 | public class SkipLimitStepFactoryBean extends SimpleStepFactoryBean { |
| 56 | |
| 57 | private int skipLimit = 0; |
| 58 | |
| 59 | private Class[] skippableExceptionClasses = new Class[] { Exception.class }; |
| 60 | |
| 61 | private Class[] fatalExceptionClasses = new Class[] { Error.class }; |
| 62 | |
| 63 | private ItemKeyGenerator itemKeyGenerator; |
| 64 | |
| 65 | private int cacheCapacity = 0; |
| 66 | |
| 67 | private int retryLimit = 1; |
| 68 | |
| 69 | private Class[] retryableExceptionClasses = new Class[] {}; |
| 70 | |
| 71 | private BackOffPolicy backOffPolicy; |
| 72 | |
| 73 | private RetryListener[] retryListeners; |
| 74 | |
| 75 | private RetryPolicy retryPolicy; |
| 76 | |
| 77 | /** |
| 78 | * Setter for the retry policy. If this is specified the other retry |
| 79 | * properties are ignored (retryLimit, backOffPolicy, |
| 80 | * retryableExceptionClasses). |
| 81 | * |
| 82 | * @param retryPolicy a stateless {@link RetryPolicy} |
| 83 | */ |
| 84 | public void setRetryPolicy(RetryPolicy retryPolicy) { |
| 85 | this.retryPolicy = retryPolicy; |
| 86 | } |
| 87 | |
| 88 | /** |
| 89 | * Public setter for the retry limit. Each item can be retried up to this |
| 90 | * limit. Note the limit includes the initial attempt, so it must be greater |
| 91 | * or equal to 1. |
| 92 | * |
| 93 | * @param retryLimit the retry limit to set |
| 94 | */ |
| 95 | public void setRetryLimit(int retryLimit) { |
| 96 | Assert.isTrue(retryLimit >= 1, "retry limit must be greater or equal to 1"); |
| 97 | this.retryLimit = retryLimit; |
| 98 | } |
| 99 | |
| 100 | /** |
| 101 | * Public setter for the capacity of the cache in the retry policy. If more |
| 102 | * items than this fail without being skipped or recovered an exception will |
| 103 | * be thrown. This is to guard against inadvertent infinite loops generated |
| 104 | * by item identity problems. If a large number of items are failing and not |
| 105 | * being recognized as skipped, it usually signals a problem with the key |
| 106 | * generation (often equals and hashCode in the item itself). So it is |
| 107 | * better to enforce a strict limit than have weird looking errors, where a |
| 108 | * skip limit is reached without anything being skipped.<br/> |
| 109 | * |
| 110 | * The default value should be high enough and more for most purposes. To |
| 111 | * breach the limit in a single-threaded step typically you have to have |
| 112 | * this many failures in a single transaction. Defaults to the value in the |
| 113 | * {@link MapRetryContextCache}. |
| 114 | * |
| 115 | * @param cacheCapacity the cacheCapacity to set |
| 116 | */ |
| 117 | public void setCacheCapacity(int cacheCapacity) { |
| 118 | this.cacheCapacity = cacheCapacity; |
| 119 | } |
| 120 | |
| 121 | /** |
| 122 | * Public setter for the Class[]. |
| 123 | * |
| 124 | * @param retryableExceptionClasses the retryableExceptionClasses to set |
| 125 | */ |
| 126 | public void setRetryableExceptionClasses(Class[] retryableExceptionClasses) { |
| 127 | this.retryableExceptionClasses = retryableExceptionClasses; |
| 128 | } |
| 129 | |
| 130 | /** |
| 131 | * Public setter for the {@link BackOffPolicy}. |
| 132 | * |
| 133 | * @param backOffPolicy the {@link BackOffPolicy} to set |
| 134 | */ |
| 135 | public void setBackOffPolicy(BackOffPolicy backOffPolicy) { |
| 136 | this.backOffPolicy = backOffPolicy; |
| 137 | } |
| 138 | |
| 139 | /** |
| 140 | * Public setter for the {@link RetryListener}s. |
| 141 | * |
| 142 | * @param retryListeners the {@link RetryListener}s to set |
| 143 | */ |
| 144 | public void setRetryListeners(RetryListener[] retryListeners) { |
| 145 | this.retryListeners = retryListeners; |
| 146 | } |
| 147 | |
| 148 | /** |
| 149 | * Public setter for a limit that determines skip policy. If this value is |
| 150 | * positive then an exception in chunk processing will cause the item to be |
| 151 | * skipped and no exception propagated until the limit is reached. If it is |
| 152 | * zero then all exceptions will be propagated from the chunk and cause the |
| 153 | * step to abort. |
| 154 | * |
| 155 | * Note that if chunks are executed concurrently the number of skips can |
| 156 | * potentially exceed the skip limit and step can still finish successfully. |
| 157 | * This is due to the fact that overall skip count is not being synchronized |
| 158 | * between concurrent chunks while they processing, only on chunk |
| 159 | * boundaries. |
| 160 | * |
| 161 | * @param skipLimit the value to set. Default is 0 (never skip). |
| 162 | */ |
| 163 | public void setSkipLimit(int skipLimit) { |
| 164 | this.skipLimit = skipLimit; |
| 165 | } |
| 166 | |
| 167 | /** |
| 168 | * Public setter for exception classes that when raised won't crash the job |
| 169 | * but will result in transaction rollback and the item which handling |
| 170 | * caused the exception will be skipped. |
| 171 | * |
| 172 | * @param exceptionClasses defaults to <code>Exception</code> |
| 173 | */ |
| 174 | public void setSkippableExceptionClasses(Class[] exceptionClasses) { |
| 175 | this.skippableExceptionClasses = exceptionClasses; |
| 176 | } |
| 177 | |
| 178 | /** |
| 179 | * Public setter for exception classes that should cause immediate failure. |
| 180 | * |
| 181 | * @param fatalExceptionClasses {@link Error} by default |
| 182 | */ |
| 183 | public void setFatalExceptionClasses(Class[] fatalExceptionClasses) { |
| 184 | this.fatalExceptionClasses = fatalExceptionClasses; |
| 185 | } |
| 186 | |
| 187 | /** |
| 188 | * Public setter for the {@link ItemKeyGenerator}. This is used to identify |
| 189 | * failed items so they can be skipped if encountered again, generally in |
| 190 | * another transaction. |
| 191 | * |
| 192 | * @param itemKeyGenerator the {@link ItemKeyGenerator} to set. |
| 193 | */ |
| 194 | public void setItemKeyGenerator(ItemKeyGenerator itemKeyGenerator) { |
| 195 | this.itemKeyGenerator = itemKeyGenerator; |
| 196 | } |
| 197 | |
| 198 | /** |
| 199 | * Uses the {@link #setSkipLimit(int)} value to configure item handler and |
| 200 | * and exception handler. |
| 201 | */ |
| 202 | protected void applyConfiguration(ItemOrientedStep step) { |
| 203 | super.applyConfiguration(step); |
| 204 | |
| 205 | if (retryLimit > 1 || skipLimit > 0 || retryPolicy != null) { |
| 206 | |
| 207 | addFatalExceptionIfMissing(SkipLimitExceededException.class); |
| 208 | addFatalExceptionIfMissing(NonSkippableException.class); |
| 209 | addFatalExceptionIfMissing(RetryException.class); |
| 210 | |
| 211 | if (retryPolicy == null) { |
| 212 | |
| 213 | SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(retryLimit); |
| 214 | if (retryableExceptionClasses.length > 0) { // otherwise we |
| 215 | // retry |
| 216 | // all exceptions |
| 217 | simpleRetryPolicy.setRetryableExceptionClasses(retryableExceptionClasses); |
| 218 | } |
| 219 | simpleRetryPolicy.setFatalExceptionClasses(fatalExceptionClasses); |
| 220 | |
| 221 | ExceptionClassifierRetryPolicy classifierRetryPolicy = new ExceptionClassifierRetryPolicy(); |
| 222 | SubclassExceptionClassifier exceptionClassifier = new SubclassExceptionClassifier(); |
| 223 | HashMap exceptionTypeMap = new HashMap(); |
| 224 | for (int i = 0; i < retryableExceptionClasses.length; i++) { |
| 225 | Class cls = retryableExceptionClasses[i]; |
| 226 | exceptionTypeMap.put(cls, "retry"); |
| 227 | } |
| 228 | exceptionClassifier.setTypeMap(exceptionTypeMap); |
| 229 | HashMap retryPolicyMap = new HashMap(); |
| 230 | retryPolicyMap.put("retry", simpleRetryPolicy); |
| 231 | retryPolicyMap.put("default", new NeverRetryPolicy()); |
| 232 | classifierRetryPolicy.setPolicyMap(retryPolicyMap); |
| 233 | classifierRetryPolicy.setExceptionClassifier(exceptionClassifier); |
| 234 | retryPolicy = classifierRetryPolicy; |
| 235 | |
| 236 | } |
| 237 | |
| 238 | // Co-ordinate the retry policy with the exception handler: |
| 239 | getStepOperations().setExceptionHandler( |
| 240 | new SimpleRetryExceptionHandler(retryPolicy, getExceptionHandler(), fatalExceptionClasses)); |
| 241 | |
| 242 | RecoveryCallbackRetryPolicy recoveryCallbackRetryPolicy = new RecoveryCallbackRetryPolicy(retryPolicy) { |
| 243 | protected boolean recoverForException(Throwable ex) { |
| 244 | return !getTransactionAttribute().rollbackOn(ex); |
| 245 | } |
| 246 | }; |
| 247 | if (cacheCapacity > 0) { |
| 248 | recoveryCallbackRetryPolicy.setRetryContextCache(new MapRetryContextCache(cacheCapacity)); |
| 249 | } |
| 250 | |
| 251 | RetryTemplate retryTemplate = new RetryTemplate(); |
| 252 | if (retryListeners != null) { |
| 253 | retryTemplate.setListeners(retryListeners); |
| 254 | } |
| 255 | retryTemplate.setRetryPolicy(recoveryCallbackRetryPolicy); |
| 256 | if (backOffPolicy != null) { |
| 257 | retryTemplate.setBackOffPolicy(backOffPolicy); |
| 258 | } |
| 259 | |
| 260 | List exceptions = new ArrayList(Arrays.asList(skippableExceptionClasses)); |
| 261 | ItemSkipPolicy readSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, exceptions, Arrays |
| 262 | .asList(fatalExceptionClasses)); |
| 263 | exceptions.addAll(Arrays.asList(retryableExceptionClasses)); |
| 264 | ItemSkipPolicy writeSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, exceptions, Arrays |
| 265 | .asList(fatalExceptionClasses)); |
| 266 | StatefulRetryItemHandler itemHandler = new StatefulRetryItemHandler(getItemReader(), getItemWriter(), |
| 267 | retryTemplate, itemKeyGenerator, readSkipPolicy, writeSkipPolicy); |
| 268 | itemHandler.setSkipListeners(BatchListenerFactoryHelper.getSkipListeners(getListeners())); |
| 269 | |
| 270 | step.setItemHandler(itemHandler); |
| 271 | |
| 272 | } |
| 273 | else { |
| 274 | // This is the default in ItemOrientedStep anyway... |
| 275 | step.setItemHandler(new SimpleItemHandler(getItemReader(), getItemWriter())); |
| 276 | } |
| 277 | |
| 278 | } |
| 279 | |
| 280 | public void addFatalExceptionIfMissing(Class cls) { |
| 281 | List fatalExceptionList = new ArrayList(Arrays.asList(fatalExceptionClasses)); |
| 282 | if (!fatalExceptionList.contains(cls)) { |
| 283 | fatalExceptionList.add(cls); |
| 284 | } |
| 285 | fatalExceptionClasses = (Class[]) fatalExceptionList.toArray(new Class[0]); |
| 286 | } |
| 287 | |
| 288 | /** |
| 289 | * If there is an exception on input it is skipped if allowed. If there is |
| 290 | * an exception on output, it will be re-thrown in any case, and the |
| 291 | * behaviour when the item is next encountered depends on the retryable and |
| 292 | * skippable exception configuration. If the exception is retryable the |
| 293 | * write will be attempted again up to the retry limit. When retry attempts |
| 294 | * are exhausted the skip listener is invoked and the skip count |
| 295 | * incremented. A retryable exception is thus also effectively also |
| 296 | * implicitly skippable. |
| 297 | * |
| 298 | * @author Dave Syer |
| 299 | * |
| 300 | */ |
| 301 | private static class StatefulRetryItemHandler extends SimpleItemHandler { |
| 302 | |
| 303 | final private RetryOperations retryOperations; |
| 304 | |
| 305 | final private ItemKeyGenerator itemKeyGenerator; |
| 306 | |
| 307 | final private CompositeSkipListener listener = new CompositeSkipListener(); |
| 308 | |
| 309 | final private ItemSkipPolicy readSkipPolicy; |
| 310 | |
| 311 | final private ItemSkipPolicy writeSkipPolicy; |
| 312 | |
| 313 | /** |
| 314 | * @param itemReader |
| 315 | * @param itemWriter |
| 316 | * @param retryTemplate |
| 317 | * @param itemKeyGenerator |
| 318 | */ |
| 319 | public StatefulRetryItemHandler(ItemReader itemReader, ItemWriter itemWriter, RetryOperations retryTemplate, |
| 320 | ItemKeyGenerator itemKeyGenerator, ItemSkipPolicy readSkipPolicy, ItemSkipPolicy writeSkipPolicy) { |
| 321 | super(itemReader, itemWriter); |
| 322 | this.retryOperations = retryTemplate; |
| 323 | this.itemKeyGenerator = itemKeyGenerator; |
| 324 | this.readSkipPolicy = readSkipPolicy; |
| 325 | this.writeSkipPolicy = writeSkipPolicy; |
| 326 | } |
| 327 | |
| 328 | /** |
| 329 | * Register some {@link SkipListener}s with the handler. Each will get |
| 330 | * the callbacks in the order specified at the correct stage if a skip |
| 331 | * occurs. |
| 332 | * |
| 333 | * @param listeners |
| 334 | */ |
| 335 | public void setSkipListeners(SkipListener[] listeners) { |
| 336 | for (int i = 0; i < listeners.length; i++) { |
| 337 | registerSkipListener(listeners[i]); |
| 338 | } |
| 339 | } |
| 340 | |
| 341 | /** |
| 342 | * Register a listener for callbacks at the appropriate stages in a skip |
| 343 | * process. |
| 344 | * |
| 345 | * @param listener a {@link SkipListener} |
| 346 | */ |
| 347 | public void registerSkipListener(SkipListener listener) { |
| 348 | this.listener.register(listener); |
| 349 | } |
| 350 | |
| 351 | /** |
| 352 | * Tries to read the item from the reader, in case of exception skip the |
| 353 | * item if the skip policy allows, otherwise re-throw. |
| 354 | * |
| 355 | * @param contribution current StepContribution holding skipped items |
| 356 | * count |
| 357 | * @return next item for processing |
| 358 | */ |
| 359 | protected Object read(StepContribution contribution) throws Exception { |
| 360 | |
| 361 | while (true) { |
| 362 | try { |
| 363 | return doRead(); |
| 364 | } |
| 365 | catch (Exception e) { |
| 366 | try { |
| 367 | if (readSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
| 368 | // increment skip count and try again |
| 369 | contribution.incrementTemporaryReadSkipCount(); |
| 370 | onSkipInRead(e); |
| 371 | logger.debug("Skipping failed input", e); |
| 372 | } |
| 373 | else { |
| 374 | throw new NonSkippableException("Non-skippable exception during read", e); |
| 375 | } |
| 376 | } |
| 377 | catch (SkipLimitExceededException ex) { |
| 378 | // we are headed for a abnormal ending so bake in the |
| 379 | // skip count |
| 380 | contribution.combineSkipCounts(); |
| 381 | throw ex; |
| 382 | } |
| 383 | } |
| 384 | } |
| 385 | |
| 386 | } |
| 387 | |
| 388 | /** |
| 389 | * Execute the business logic, delegating to the writer.<br/> |
| 390 | * |
| 391 | * Process the item with the {@link ItemWriter} in a stateful retry. Any |
| 392 | * {@link SkipListener} provided is called when retry attempts are |
| 393 | * exhausted. The listener callback (on write failure) will happen in |
| 394 | * the next transaction automatically.<br/> |
| 395 | * |
| 396 | * @see org.springframework.batch.core.step.item.SimpleItemHandler#write(java.lang.Object, |
| 397 | * org.springframework.batch.core.StepContribution) |
| 398 | */ |
| 399 | protected void write(final Object item, final StepContribution contribution) throws Exception { |
| 400 | RecoveryRetryCallback retryCallback = new RecoveryRetryCallback(item, new RetryCallback() { |
| 401 | public Object doWithRetry(RetryContext context) throws Throwable { |
| 402 | doWrite(item); |
| 403 | return null; |
| 404 | } |
| 405 | }, itemKeyGenerator != null ? itemKeyGenerator.getKey(item) : item); |
| 406 | retryCallback.setRecoveryCallback(new RecoveryCallback() { |
| 407 | public Object recover(RetryContext context) { |
| 408 | Throwable t = context.getLastThrowable(); |
| 409 | if (writeSkipPolicy.shouldSkip(t, contribution.getStepSkipCount())) { |
| 410 | listener.onSkipInWrite(item, t); |
| 411 | } |
| 412 | else { |
| 413 | throw new NonSkippableException("Non-skippable exception on write", t); |
| 414 | } |
| 415 | contribution.incrementWriteSkipCount(); |
| 416 | return null; |
| 417 | } |
| 418 | }); |
| 419 | retryOperations.execute(retryCallback); |
| 420 | } |
| 421 | |
| 422 | private void onSkipInRead(Exception e) { |
| 423 | |
| 424 | try { |
| 425 | listener.onSkipInRead(e); |
| 426 | } |
| 427 | catch (Exception ex) { |
| 428 | logger.debug("Error in SkipListener onSkipInReader encountered and ignored.", ex); |
| 429 | } |
| 430 | } |
| 431 | |
| 432 | } |
| 433 | |
| 434 | } |