Class ChunkMessageChannelItemWriter<T>

java.lang.Object
org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter<T>
All Implemented Interfaces:
StepExecutionListener, StepListener, StepContributionSource, ItemStream, ItemWriter<T>

public class ChunkMessageChannelItemWriter<T> extends Object implements StepExecutionListener, ItemWriter<T>, ItemStream, StepContributionSource
  • Field Details

    • DEFAULT_THROTTLE_LIMIT

      protected static final long DEFAULT_THROTTLE_LIMIT
    • messagingGateway

      protected org.springframework.integration.core.MessagingTemplate messagingGateway
    • localState

      protected final ChunkMessageChannelItemWriter.LocalState localState
    • throttleLimit

      protected long throttleLimit
    • DEFAULT_MAX_WAIT_TIMEOUTS

      protected final int DEFAULT_MAX_WAIT_TIMEOUTS
    • maxWaitTimeouts

      protected int maxWaitTimeouts
    • replyChannel

      protected org.springframework.messaging.PollableChannel replyChannel
  • Constructor Details

    • ChunkMessageChannelItemWriter

      public ChunkMessageChannelItemWriter()
  • Method Details

    • setMaxWaitTimeouts

      public void setMaxWaitTimeouts(int maxWaitTimeouts)
      The maximum number of times to wait at the end of a step for a non-null result from the remote workers. This is a multiplier on the receive timeout set separately on the gateway. The ideal value balances giving slow workers time to finish against responding promptly when a worker is dead. Defaults to 40.
      Parameters:
      maxWaitTimeouts - the maximum number of wait timeouts
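      Because this setting multiplies the gateway's receive timeout, the longest possible wait at the end of a step can be estimated as maxWaitTimeouts times that timeout. The sketch below only illustrates the arithmetic. The class WaitBudgetSketch and the 1-second receive timeout are assumptions for illustration and are not part of Spring Batch.

```java
import java.time.Duration;

/** Hypothetical helper, not part of Spring Batch: estimates the end-of-step wait budget. */
final class WaitBudgetSketch {

    /** Upper bound on the wait: one receive timeout per permitted attempt. */
    static Duration worstCaseWait(int maxWaitTimeouts, Duration receiveTimeout) {
        return receiveTimeout.multipliedBy(maxWaitTimeouts);
    }

    public static void main(String[] args) {
        // 40 is the documented default; the 1-second receive timeout is an assumed value.
        Duration budget = worstCaseWait(40, Duration.ofSeconds(1));
        System.out.println("Worst-case wait: " + budget.getSeconds() + " s"); // prints "Worst-case wait: 40 s"
    }
}
```

      With a longer receive timeout on the gateway, the same maxWaitTimeouts value yields a proportionally longer budget, so the two settings are best tuned together.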
    • setThrottleLimit

      public void setThrottleLimit(long throttleLimit)
      Public setter for the throttle limit. This limits the number of pending requests for chunk processing to avoid overwhelming the receivers.
      Parameters:
      throttleLimit - the throttle limit to set
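      The general idea of capping pending requests can be sketched as follows. This is a single-threaded illustration of the technique, not the code this class uses: ThrottleSketch, its queue of chunk ids, and the exact boundary condition (waiting once the number of pending requests reaches the limit) are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical illustration of bounding in-flight requests; not Spring Batch code. */
final class ThrottleSketch {

    private final long throttleLimit;
    // Stands in for chunk requests that have been sent but not yet answered.
    private final Queue<Integer> pending = new ArrayDeque<>();
    private long peakPending = 0;

    ThrottleSketch(long throttleLimit) {
        if (throttleLimit < 1) {
            throw new IllegalArgumentException("throttleLimit must be at least 1");
        }
        this.throttleLimit = throttleLimit;
    }

    /** Sends a request, first consuming replies until there is room under the limit. */
    void send(int chunkId) {
        while (pending.size() >= throttleLimit) {
            pending.remove(); // simulate receiving the reply for the oldest request
        }
        pending.add(chunkId);
        peakPending = Math.max(peakPending, pending.size());
    }

    long peakPending() {
        return peakPending;
    }

    public static void main(String[] args) {
        ThrottleSketch sketch = new ThrottleSketch(3);
        for (int i = 0; i < 10; i++) {
            sketch.send(i);
        }
        System.out.println("Peak pending requests: " + sketch.peakPending()); // prints 3
    }
}
```

      A lower limit protects slow receivers at the cost of throughput, while a higher limit keeps more workers busy but lets more unacknowledged chunks accumulate.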
    • setMessagingOperations

      public void setMessagingOperations(org.springframework.integration.core.MessagingTemplate messagingGateway)
    • setReplyChannel

      public void setReplyChannel(org.springframework.messaging.PollableChannel replyChannel)
    • write

      public void write(Chunk<? extends T> items) throws Exception
      Description copied from interface: ItemWriter
      Process the supplied data element. Will not be called with any null items in normal operation.
      Specified by:
      write in interface ItemWriter<T>
      Parameters:
      items - the chunk of items to be written. Must not be null.
      Throws:
      Exception - if there are errors. The framework will catch the exception and convert or rethrow it as appropriate.
    • beforeStep

      public void beforeStep(StepExecution stepExecution)
      Description copied from interface: StepExecutionListener
      Initialize the state of the listener with the StepExecution from the current scope.
      Specified by:
      beforeStep in interface StepExecutionListener
      Parameters:
      stepExecution - instance of StepExecution.
    • afterStep

      @Nullable public ExitStatus afterStep(StepExecution stepExecution)
      Description copied from interface: StepExecutionListener
      Give a listener a chance to modify the exit status from a step. The value returned is combined with the normal exit status by using ExitStatus.and(ExitStatus).

      Called after execution of the step's processing logic (whether successful or failed). Throwing an exception in this method has no effect, as it is only logged.

      Specified by:
      afterStep in interface StepExecutionListener
      Parameters:
      stepExecution - a StepExecution instance.
      Returns:
      an ExitStatus to combine with the normal value. Return null (the default) to leave the old value unchanged.
    • close

      public void close() throws ItemStreamException
      Description copied from interface: ItemStream
      If any resources are needed for the stream to operate, they need to be destroyed here. Once this method has been called, all other methods (except open) may throw an exception.
      Specified by:
      close in interface ItemStream
      Throws:
      ItemStreamException
    • open

      public void open(ExecutionContext executionContext) throws ItemStreamException
      Description copied from interface: ItemStream
      Open the stream for the provided ExecutionContext.
      Specified by:
      open in interface ItemStream
      Parameters:
      executionContext - current step's ExecutionContext. Will be the executionContext from the last run of the step on a restart.
      Throws:
      ItemStreamException
    • update

      public void update(ExecutionContext executionContext) throws ItemStreamException
      Description copied from interface: ItemStream
      Indicates that the execution context provided during open is about to be saved. If any state is remaining, but has not been put in the context, it should be added here.
      Specified by:
      update in interface ItemStream
      Parameters:
      executionContext - to be updated
      Throws:
      ItemStreamException
    • getStepContributions

      public Collection<StepContribution> getStepContributions()
      Description copied from interface: StepContributionSource
      Get the currently available contributions and drain the source. The next call would return an empty collection, unless new contributions have arrived.
      Specified by:
      getStepContributions in interface StepContributionSource
      Returns:
      a collection of StepContribution instances
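      The "get and drain" contract described above can be illustrated with a plain queue. This is a minimal sketch of the semantics only. DrainingSourceSketch is a hypothetical class, and String elements stand in for StepContribution instances.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical illustration of a source that is emptied each time it is read. */
final class DrainingSourceSketch<E> {

    private final Queue<E> arrived = new ConcurrentLinkedQueue<>();

    /** Records a newly arrived element. */
    void add(E element) {
        arrived.add(element);
    }

    /** Returns everything that has arrived so far and removes it from the source. */
    Collection<E> drain() {
        List<E> drained = new ArrayList<>();
        for (E e = arrived.poll(); e != null; e = arrived.poll()) {
            drained.add(e);
        }
        return drained;
    }

    public static void main(String[] args) {
        DrainingSourceSketch<String> source = new DrainingSourceSketch<>();
        source.add("contribution-1");
        source.add("contribution-2");
        System.out.println(source.drain().size()); // prints 2
        System.out.println(source.drain().size()); // prints 0, nothing new has arrived
    }
}
```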
    • waitForResults

      protected boolean waitForResults() throws AsynchronousFailureException
      Wait until all the results that are in the pipeline come back to the reply channel.
      Returns:
      true if successfully received a result, false if timed out
      Throws:
      AsynchronousFailureException
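      A bounded wait of this kind can be sketched with a standard BlockingQueue. The example is an illustration of the pattern under stated assumptions, not this class's implementation: BoundedWaitSketch, the expected-reply count, and the use of a BlockingQueue in place of the reply channel are all hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of waiting for outstanding replies with a bounded number of timeouts. */
final class BoundedWaitSketch {

    /**
     * Polls until all expected replies have arrived or the number of timed-out polls
     * reaches maxWaitTimeouts. Returns true only if every expected reply was received.
     */
    static boolean waitForAll(BlockingQueue<String> replies, int expected,
            int maxWaitTimeouts, long receiveTimeoutMillis) {
        int received = 0;
        int timeouts = 0;
        try {
            while (received < expected && timeouts < maxWaitTimeouts) {
                String reply = replies.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
                if (reply == null) {
                    timeouts++; // nothing arrived within one receive timeout
                }
                else {
                    received++;
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        return received == expected;
    }

    public static void main(String[] args) {
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();
        replies.add("reply-1");
        replies.add("reply-2");
        System.out.println(waitForAll(replies, 2, 3, 10)); // prints true
        System.out.println(waitForAll(replies, 1, 3, 10)); // prints false, the queue is now empty
    }
}
```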
    • getNextResult

      protected void getNextResult() throws AsynchronousFailureException
      Get the next result if it is available (within the timeout specified in the gateway), otherwise do nothing.
      Throws:
      AsynchronousFailureException - If there is a response and it contains a failed chunk response.
      IllegalStateException - if the result contains the wrong job instance id (maybe we are sharing a channel and we shouldn't be)
    • wrapIfNecessary

      protected static AsynchronousFailureException wrapIfNecessary(Throwable throwable)
      Re-throws the original throwable if it is unchecked; wraps a checked exception in an AsynchronousFailureException and returns it.
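      The behaviour described above follows a common "rethrow or wrap" pattern, sketched here from the description alone. WrapSketch and its nested AsyncFailure type are hypothetical stand-ins. The real method and AsynchronousFailureException may differ in detail.

```java
import java.io.IOException;

/** Hypothetical illustration of the rethrow-or-wrap pattern; not Spring Batch code. */
final class WrapSketch {

    /** Stand-in for AsynchronousFailureException. */
    static final class AsyncFailure extends Exception {
        AsyncFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Rethrows unchecked throwables unchanged; wraps anything else and returns the wrapper. */
    static AsyncFailure wrapIfNecessary(Throwable throwable) {
        if (throwable instanceof Error) {
            throw (Error) throwable;
        }
        if (throwable instanceof RuntimeException) {
            throw (RuntimeException) throwable;
        }
        return new AsyncFailure("Wrapped checked exception", throwable);
    }

    public static void main(String[] args) {
        AsyncFailure wrapped = wrapIfNecessary(new IOException("disk full"));
        System.out.println(wrapped.getCause().getMessage()); // prints "disk full"
        try {
            wrapIfNecessary(new IllegalStateException("bad state"));
        }
        catch (IllegalStateException e) {
            System.out.println("rethrown: " + e.getMessage()); // prints "rethrown: bad state"
        }
    }
}
```

      Returning the wrapper rather than throwing it lets a caller write `throw wrapIfNecessary(failure);`, which keeps the compiler's flow analysis aware that the statement never completes normally.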