Class AbstractCorrelatingMessageHandler
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler
- All Implemented Interfaces:
- org.reactivestreams.Subscriber<Message<?>>,- Aware,- BeanFactoryAware,- BeanNameAware,- DisposableBean,- InitializingBean,- ApplicationContextAware,- ApplicationEventPublisherAware,- Lifecycle,- Ordered,- ComponentSourceAware,- ExpressionCapable,- Orderable,- MessageProducer,- DiscardingMessageHandler,- HeaderPropagationAware,- IntegrationPattern,- NamedComponent,- IntegrationManagement,- ManageableLifecycle,- TrackableComponent,- MessageHandler,- reactor.core.CoreSubscriber<Message<?>>
- Direct Known Subclasses:
- AggregatingMessageHandler,- ResequencingMessageHandler
public abstract class AbstractCorrelatingMessageHandler
extends AbstractMessageProducingHandler
implements DiscardingMessageHandler, ApplicationEventPublisherAware, ManageableLifecycle
Abstract Message handler that holds a buffer of correlated messages in a
 
MessageStore.
 This class takes care of correlated groups of messages
 that can be completed in batches. It is useful for custom implementation of
 MessageHandlers that require correlation and is used as a base class for Aggregator -
 AggregatingMessageHandler and Resequencer - ResequencingMessageHandler,
 or custom implementations requiring correlation.
 
 To customize this handler inject CorrelationStrategy,
 ReleaseStrategy, and MessageGroupProcessor implementations as
 you require.
 
 By default, the CorrelationStrategy will be a
 HeaderAttributeCorrelationStrategy and the ReleaseStrategy will be a
 SequenceSizeReleaseStrategy.
 
 Use proper CorrelationStrategy for cases when same
 MessageStore is used
 for multiple handlers to ensure uniqueness of message groups across handlers.
 
 When the expireTimeout is greater than 0, groups which are older than this timeout
 are purged from the store on start up (or when purgeOrphanedGroups() is called).
 If expireDuration is provided, the task is scheduled to perform
 purgeOrphanedGroups() periodically.
- Since:
- 2.0
- Author:
- Iwein Fuld, Dave Syer, Oleg Zhurakousky, Gary Russell, Artem Bilan, David Liu, Enrique Rodriguez, Meherzad Lahewala, Jayadev Sirimamilla, Ngoc Nhan
- 
Nested Class SummaryNested ClassesModifier and TypeClassDescriptionprotected static classNested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagementIntegrationManagement.ManagementOverrides
- 
Field SummaryFields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandlermessagingTemplateFields inherited from class org.springframework.integration.context.IntegrationObjectSupportEXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagementMETER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAMEFields inherited from interface org.springframework.core.OrderedHIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
- 
Constructor SummaryConstructorsConstructorDescriptionAbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store) AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) 
- 
Method SummaryModifier and TypeMethodDescriptionprotected abstract voidafterRelease(MessageGroup group, Collection<Message<?>> completedMessages) Allows you to provide additional logic that needs to be performed after the MessageGroup was released.protected voidafterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout) Subclasses may override if special action is needed because the group was released or discarded due to a timeout.protected voidcompleteGroup(Object correlationKey, MessageGroup group, Lock lock) protected Collection<Message<?>> completeGroup(Message<?> message, Object correlationKey, MessageGroup group, Lock lock) voiddestroy()protected voidexpireGroup(Object correlationKey, MessageGroup group, Lock lock) protected intfindLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence) protected voidforceComplete(MessageGroup group) Subclasses may implement this method to provide component type information.protected CorrelationStrategyReturn the discard channel.protected Stringprotected EvaluationContextprotected Map<UUID, ScheduledFuture<?>> protected BiFunction<Message<?>, String, String> protected Expressionprotected LockRegistryprotected longReturn a configuredMessageGroupProcessor.protected ReleaseStrategyprotected voidhandleMessageInternal(Message<?> message) protected booleanprotected booleanprotected booleanprotected booleanbooleanprotected booleanprotected booleanprotected ObjectobtainGroupTimeout(MessageGroup group) protected voidonInit()Subclasses may implement this for initialization logic.voidPerform aMessageGroupStore.expireMessageGroups(long)with the providedexpireTimeout.protected voidremove(MessageGroup group) voidsetApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) voidsetCorrelationStrategy(CorrelationStrategy correlationStrategy) voidsetDiscardChannel(MessageChannel discardChannel) voidsetDiscardChannelName(String discardChannelName) voidsetDiscardIndividuallyOnExpiry(boolean discardIndividuallyOnExpiry) Set tofalseto send to discard channel a whole expired group as a single message.voidsetExpireDuration(Duration expireDuration) Configure aDurationhow often to clean up old orphaned groups from the store.voidsetExpireDurationMillis(long expireDuration) Configure aDuration(in millis) how often to clean up old orphaned groups from the store.voidsetExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) Expire (completely remove) a group if it is completed due to timeout.voidsetExpireTimeout(long expireTimeout) Configure a timeout in milliseconds for purging old orphaned groups from the store.voidsetForceReleaseAdviceChain(List<Advice> forceReleaseAdviceChain) voidsetGroupConditionSupplier(BiFunction<Message<?>, String, String> conditionSupplier) Configure aBiFunctionto supply a group condition from a message to be added to the group.voidsetGroupTimeoutExpression(Expression groupTimeoutExpression) voidsetLockRegistry(LockRegistry lockRegistry) final voidsetMessageStore(MessageGroupStore store) voidsetMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed.voidsetOutputProcessor(MessageGroupProcessor outputProcessor) Specify aMessageGroupProcessorfor the output function.voidsetPopSequence(boolean popSequence) Perform aBaseMessageBuilder.popSequenceDetails()for output message or not.voidsetReleaseLockBeforeSend(boolean releaseLockBeforeSend) Set to true to release the message group lock before sending any output.voidsetReleasePartialSequences(boolean releasePartialSequences) SetreleasePartialSequenceson an underlying defaultSequenceSizeReleaseStrategy.voidsetReleaseStrategy(ReleaseStrategy releaseStrategy) voidsetSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) voidstart()voidstop()protected MessageGroupprotected voidverifyResultCollectionConsistsOfMessages(Collection<?> elements) Deprecated, for removal: This API element is subject to removal in a future version.without replacement - out of use from now on.Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandleraddNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, setupMessageProcessor, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeadersMethods inherited from class org.springframework.integration.handler.AbstractMessageHandlerhandleMessage, onComplete, onError, onNext, onSubscribe, setObservationConventionMethods inherited from class org.springframework.integration.handler.MessageHandlerSupportbuildSendTimer, getIntegrationPatternType, getManagedName, getManagedType, getMetricsCaptor, getObservationRegistry, getOrder, getOverrides, isLoggingEnabled, isObserved, registerMetricsCaptor, registerObservationRegistry, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrackMethods inherited from class org.springframework.integration.context.IntegrationObjectSupportafterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringMethods inherited from class java.lang.Objectclone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface reactor.core.CoreSubscribercurrentContextMethods inherited from interface org.springframework.integration.support.management.IntegrationManagementgetThisAsMethods inherited from interface org.springframework.messaging.MessageHandlerhandleMessageMethods inherited from interface org.springframework.integration.support.context.NamedComponentgetBeanName, getComponentName
- 
Constructor Details- 
AbstractCorrelatingMessageHandlerpublic AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) 
- 
AbstractCorrelatingMessageHandler
- 
AbstractCorrelatingMessageHandler
 
- 
- 
Method Details- 
setLockRegistry
- 
setMessageStore
- 
setCorrelationStrategy
- 
setReleaseStrategy
- 
setGroupTimeoutExpression
- 
setForceReleaseAdviceChain
- 
setOutputProcessorSpecify aMessageGroupProcessorfor the output function.- Parameters:
- outputProcessor- the- MessageGroupProcessorto use
- Since:
- 5.0
 
- 
getOutputProcessorReturn a configuredMessageGroupProcessor.- Returns:
- the configured MessageGroupProcessor
- Since:
- 5.2
 
- 
setDiscardChannel
- 
setDiscardChannelName
- 
setSendPartialResultOnExpirypublic void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) 
- 
setDiscardIndividuallyOnExpirypublic void setDiscardIndividuallyOnExpiry(boolean discardIndividuallyOnExpiry) Set tofalseto send to discard channel a whole expired group as a single message. This option makes sense only ifsendPartialResultOnExpiryis set tofalse(default). And also ifdiscardChannelis injected.- Parameters:
- discardIndividuallyOnExpiry- false to discard the whole group as one message.
- Since:
- 6.5
- See Also:
 
- 
setMinimumTimeoutForEmptyGroupspublic void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed. Empty groups exist after a group is released normally. This is to enable the detection and discarding of late-arriving messages. If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property. Empty groups will then not be removed from the MessageStore until they have not been modified for at least this number of milliseconds.- Parameters:
- minimumTimeoutForEmptyGroups- The minimum timeout.
 
- 
setReleasePartialSequencespublic void setReleasePartialSequences(boolean releasePartialSequences) SetreleasePartialSequenceson an underlying defaultSequenceSizeReleaseStrategy. Ignored for other release strategies.- Parameters:
- releasePartialSequences- true to allow release.
 
- 
setExpireGroupsUponTimeoutpublic void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) Expire (completely remove) a group if it is completed due to timeout. Default true- Parameters:
- expireGroupsUponTimeout- the expireGroupsUponTimeout to set
- Since:
- 4.1
 
- 
setPopSequencepublic void setPopSequence(boolean popSequence) Perform aBaseMessageBuilder.popSequenceDetails()for output message or not. Default to true. This option removes the sequence information added by the nearest upstream component withapplySequence=true(for example splitter).- Parameters:
- popSequence- the boolean flag to use.
- Since:
- 5.1
 
- 
isReleaseLockBeforeSendprotected boolean isReleaseLockBeforeSend()
- 
setReleaseLockBeforeSendpublic void setReleaseLockBeforeSend(boolean releaseLockBeforeSend) Set to true to release the message group lock before sending any output. See "Avoiding Deadlocks" in the Aggregator section of the reference manual for more information as to why this might be needed.- Parameters:
- releaseLockBeforeSend- true to release the lock.
- Since:
- 5.1.1
 
- 
setExpireTimeoutpublic void setExpireTimeout(long expireTimeout) Configure a timeout in milliseconds for purging old orphaned groups from the store. Used on startup and when anexpireDurationis provided, the task for runningpurgeOrphanedGroups()is scheduled with that period. TheforceReleaseProcessoris used to process those expired groups according the "force complete" options. A group can be orphaned if a persistent message group store is used and no new messages arrive for that group after a restart.- Parameters:
- expireTimeout- the number of milliseconds to determine old orphaned groups in the store to purge.
- Since:
- 5.4
- See Also:
 
- 
setExpireDurationMillispublic void setExpireDurationMillis(long expireDuration) Configure aDuration(in millis) how often to clean up old orphaned groups from the store.- Parameters:
- expireDuration- the delay how often to call- purgeOrphanedGroups().
- Since:
- 5.4
- See Also:
 
- 
setExpireDurationConfigure aDurationhow often to clean up old orphaned groups from the store.- Parameters:
- expireDuration- the delay how often to call- purgeOrphanedGroups().
- Since:
- 5.4
- See Also:
 
- 
setGroupConditionSupplierConfigure aBiFunctionto supply a group condition from a message to be added to the group. Thenullresult from the function will reset a condition set before.- Parameters:
- conditionSupplier- the function to supply a group condition from a message to be added to the group.
- Since:
- 5.5
- See Also:
 
- 
setApplicationEventPublisher- Specified by:
- setApplicationEventPublisherin interface- ApplicationEventPublisherAware
 
- 
onInitprotected void onInit()Description copied from class:IntegrationObjectSupportSubclasses may implement this for initialization logic.- Overrides:
- onInitin class- AbstractMessageProducingHandler
 
- 
getComponentTypeDescription copied from class:IntegrationObjectSupportSubclasses may implement this method to provide component type information.- Specified by:
- getComponentTypein interface- NamedComponent
- Overrides:
- getComponentTypein class- MessageHandlerSupport
 
- 
getMessageStore
- 
getExpireGroupScheduledFutures
- 
getCorrelationStrategy
- 
getReleaseStrategy
- 
getGroupConditionSupplier
- 
getDiscardChannelDescription copied from interface:DiscardingMessageHandlerReturn the discard channel.- Specified by:
- getDiscardChannelin interface- DiscardingMessageHandler
- Returns:
- the channel.
 
- 
getDiscardChannelName
- 
isSendPartialResultOnExpiryprotected boolean isSendPartialResultOnExpiry()
- 
isSequenceAwareprotected boolean isSequenceAware()
- 
getLockRegistry
- 
isLockRegistrySetprotected boolean isLockRegistrySet()
- 
getMinimumTimeoutForEmptyGroupsprotected long getMinimumTimeoutForEmptyGroups()
- 
isReleasePartialSequencesprotected boolean isReleasePartialSequences()
- 
getGroupTimeoutExpression
- 
getEvaluationContext
- 
handleMessageInternal- Specified by:
- handleMessageInternalin class- AbstractMessageHandler
 
- 
isExpireGroupsUponCompletionprotected boolean isExpireGroupsUponCompletion()
- 
afterReleaseAllows you to provide additional logic that needs to be performed after the MessageGroup was released.- Parameters:
- group- The group.
- completedMessages- The completed messages.
 
- 
afterReleaseprotected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout) Subclasses may override if special action is needed because the group was released or discarded due to a timeout. By default,afterRelease(MessageGroup, Collection)is invoked.- Parameters:
- group- The group.
- completedMessages- The completed messages.
- timeout- True if the release/discard was due to a timeout.
 
- 
forceComplete
- 
remove
- 
findLastReleasedSequenceNumberprotected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence) 
- 
store
- 
expireGroup
- 
completeGroup
- 
completeGroupprotected Collection<Message<?>> completeGroup(Message<?> message, Object correlationKey, MessageGroup group, Lock lock) 
- 
verifyResultCollectionConsistsOfMessages@Deprecated(since="6.5", forRemoval=true) protected void verifyResultCollectionConsistsOfMessages(Collection<?> elements) Deprecated, for removal: This API element is subject to removal in a future version.without replacement - out of use from now on.Probably the method isprotectedby mistake.- Parameters:
- elements- the group processor result.
 
- 
obtainGroupTimeout
- 
destroypublic void destroy()- Specified by:
- destroyin interface- DisposableBean
- Specified by:
- destroyin interface- IntegrationManagement
- Overrides:
- destroyin class- MessageHandlerSupport
 
- 
startpublic void start()- Specified by:
- startin interface- Lifecycle
- Specified by:
- startin interface- ManageableLifecycle
 
- 
stoppublic void stop()- Specified by:
- stopin interface- Lifecycle
- Specified by:
- stopin interface- ManageableLifecycle
 
- 
isRunningpublic boolean isRunning()- Specified by:
- isRunningin interface- Lifecycle
- Specified by:
- isRunningin interface- ManageableLifecycle
 
- 
purgeOrphanedGroupspublic void purgeOrphanedGroups()Perform aMessageGroupStore.expireMessageGroups(long)with the providedexpireTimeout. Can be called externally at any time. Internally it is called from the scheduled task with the configuredexpireDuration.- Since:
- 5.4
 
 
-