SFTP Streaming Inbound Channel Adapter

Version 4.3 introduced the streaming inbound channel adapter. This adapter produces message with payloads of type InputStream, letting you fetch files without writing to the local file system. Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed. The session is provided in the closeableResource header (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE). Standard framework components, such as the FileSplitter and StreamTransformer, automatically close the session. See File Splitter and Stream Transformer for more information about these components. The following example shows how to configure an SFTP streaming inbound channel adapter:

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

You can use only one of filename-pattern, filename-regex, filter, or filter-expression.

Starting with version 5.0, by default, the SftpStreamingMessageSource adapter prevents duplicates for remote files by using SftpPersistentAcceptOnceFileListFilter based on the in-memory SimpleMetadataStore. By default, this filter is also applied together with the filename pattern (or regex) as well. If you need to allow duplicates, you can use the AcceptAllFileListFilter. You can handle any other use cases by using CompositeFileListFilter (or ChainFileListFilter). The Java configuration shown later shows one technique to remove the remote file after processing, avoiding duplicates.

For more information about the SftpPersistentAcceptOnceFileListFilter, and how it is used, see Remote Persistent File List Filters.

You can use the max-fetch-size attribute to limit the number of files fetched on each poll when a fetch is necessary. Set it to 1 and use a persistent filter when running in a clustered environment. See Inbound Channel Adapters: Controlling Remote File Fetching for more information.

The adapter puts the remote directory and the file name in headers (FileHeaders.REMOTE_DIRECTORY and FileHeaders.REMOTE_FILE, respectively). Starting with version 5.0, the FileHeaders.REMOTE_FILE_INFO header provides additional remote file information (in JSON). If you set the fileInfoJson property on the SftpStreamingMessageSource to false, the header contains an SftpFileInfo object. You can access the SftpClient.DirEntry object provided by the underlying SftpClient by using the SftpFileInfo.getFileInfo() method. The fileInfoJson property is not available when you use XML configuration, but you can set it by injecting the SftpStreamingMessageSource into one of your configuration classes. See also Remote File Information.

Configuring with Java Configuration

The following Spring Boot application shows an example of how to configure the inbound adapter with Java:

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + '/' +  headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

Notice that, in this example, the message handler downstream of the transformer has an advice that removes the remote file after processing.