Reading Files

A FileReadingMessageSource can be used to consume files from the filesystem. This is an implementation of MessageSource that creates messages from a file system directory. The following example shows how to configure a FileReadingMessageSource:

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:directory="${input.directory}"/>

To prevent creating messages for certain files, you can supply a FileListFilter. By default, we use the following filters:

  • IgnoreHiddenFileListFilter

  • AcceptOnceFileListFilter

The IgnoreHiddenFileListFilter ensures that hidden files are not processed. Note that the exact definition of hidden is system-dependent. For example, on UNIX-based systems, a file beginning with a period character is considered to be hidden. Microsoft Windows, on the other hand, has a dedicated file attribute to indicate hidden files.

Version 4.2 introduced the IgnoreHiddenFileListFilter. In prior versions, hidden files were included. With the default configuration, the IgnoreHiddenFileListFilter is triggered first, followed by the AcceptOnceFileListFilter.

The AcceptOnceFileListFilter ensures files are picked up only once from the directory.

The AcceptOnceFileListFilter stores its state in memory. If you wish the state to survive a system restart, you can use the FileSystemPersistentAcceptOnceFileListFilter. This filter stores the accepted file names in a MetadataStore implementation (see Metadata Store). This filter matches on the filename and modified time.

Since version 4.0, this filter requires a ConcurrentMetadataStore. When used with a shared data store (such as Redis with the RedisMetadataStore), it lets filter keys be shared across multiple application instances or across a network file share being used by multiple servers.

Since version 4.1.5, this filter has a new property (flushOnUpdate), which causes it to flush the metadata store on every update (if the store implements Flushable).

The persistent file list filters now have a boolean property forRecursion. Setting this property to true, also sets alwaysAcceptDirectories, which means that the recursive operation on the outbound gateways (ls and mget) will now always traverse the full directory tree each time. This is to solve a problem where changes deep in the directory tree were not detected. In addition, forRecursion=true causes the full path to files to be used as the metadata store keys; this solves a problem where the filter did not work properly if a file with the same name appears multiple times in different directories. IMPORTANT: This means that existing keys in a persistent metadata store will not be found for files beneath the top level directory. For this reason, the property is false by default; this may change in a future release.

The following example configures a FileReadingMessageSource with a filter:

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="customFilterBean"/>

A common problem with reading files is that a file may be detected before it is ready (that is, some other process may still be writing the file). The default AcceptOnceFileListFilter does not prevent this. In most cases, this can be prevented if the file-writing process renames each file as soon as it is ready for reading. A filename-pattern or filename-regex filter that accepts only files that are ready (perhaps based on a known suffix), composed with the default AcceptOnceFileListFilter, allows for this situation. The CompositeFileListFilter enables the composition, as the following example shows:

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="compositeFilter"/>

<bean id="compositeFilter"
    class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
            <bean class="o.s.i.file.filters.RegexPatternFileListFilter">
                <constructor-arg value="^test.*$"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

If it is not possible to create the file with a temporary name and rename to the final name, Spring Integration provides another alternative. Version 4.2 added the LastModifiedFileListFilter. This filter can be configured with an age property so that only files older than this value are passed by the filter. The age defaults to 60 seconds, but you should choose an age that is large enough to avoid picking up a file early (due to, say, network glitches). The following example shows how to configure a LastModifiedFileListFilter:

<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
    <property name="age" value="120" />
</bean>

Starting with version 4.3.7, a ChainFileListFilter (an extension of CompositeFileListFilter) has been introduced to allow scenarios when subsequent filters should only see the result of the previous filter. (With the CompositeFileListFilter, all filters see all the files, but it passes only files that have passed all filters). An example of where the new behavior is required is a combination of LastModifiedFileListFilter and AcceptOnceFileListFilter, when we do not wish to accept the file until some amount of time has elapsed. With the CompositeFileListFilter, since the AcceptOnceFileListFilter sees all the files on the first pass, it does not pass it later when the other filter does. The CompositeFileListFilter approach is useful when a pattern filter is combined with a custom filter that looks for a secondary file to indicate that file transfer is complete. The pattern filter might only pass the primary file (such as something.txt) but the “done” filter needs to see whether (for example) something.done is present.

Say we have files a.txt, a.done, and b.txt.

The pattern filter passes only a.txt and b.txt, while the “done” filter sees all three files and passes only a.txt. The final result of the composite filter is that only a.txt is released.

With the ChainFileListFilter, if any filter in the chain returns an empty list, the remaining filters are not invoked.

Version 5.0 introduced an ExpressionFileListFilter to execute SpEL expression against a file as a context evaluation root object. For this purpose, all the XML components for file handling (local and remote), along with an existing filter attribute, have been supplied with the filter-expression option, as the following example shows:

<int-file:inbound-channel-adapter
        directory="${inputdir}"
        filter-expression="name matches '.text'"
        auto-startup="false"/>

Version 5.0.5 introduced the DiscardAwareFileListFilter implementations that have an interest in rejected files. For this purpose, such a filter implementation should be supplied with a callback through addDiscardCallback(Consumer<File>). In the framework, this functionality is used from the FileReadingMessageSource.WatchServiceDirectoryScanner, in combination with LastModifiedFileListFilter. Unlike the regular DirectoryScanner, the WatchService provides files for processing according to the events on the target file system. At the moment of polling an internal queue with those files, the LastModifiedFileListFilter may discard them because they are too young relative to its configured age. Therefore, we lose the file for future possible considerations. The discard callback hook lets us retain the file in the internal queue so that it is available to be checked against the age in subsequent polls. The CompositeFileListFilter also implements a DiscardAwareFileListFilter and populates a discard callback to all its DiscardAwareFileListFilter delegates.

Since CompositeFileListFilter matches the files against all delegates, the discardCallback may be called several times for the same file.

Starting with version 5.1, the FileReadingMessageSource doesn’t check a directory for existence and doesn’t create it until its start() is called (typically via wrapping SourcePollingChannelAdapter). Previously, there was no simple way to prevent an operation system permissions error when referencing the directory, for example from tests, or when permissions are applied later.

Message Headers

Starting with version 5.0, the FileReadingMessageSource (in addition to the payload as a polled File) populates the following headers to the outbound Message:

  • FileHeaders.FILENAME: The File.getName() of the file to send. Can be used for subsequent rename or copy logic.

  • FileHeaders.ORIGINAL_FILE: The File object itself. Typically, this header is populated automatically by framework components (such as splitters or transformers) when we lose the original File object. However, for consistency and convenience with any other custom use cases, this header can be useful to get access to the original file.

  • FileHeaders.RELATIVE_PATH: A new header introduced to represent the part of file path relative to the root directory for the scan. This header can be useful when the requirement is to restore a source directory hierarchy in the other places. For this purpose, the DefaultFileNameGenerator (see "`Generating File Names) can be configured to use this header.

Directory Scanning and Polling

The FileReadingMessageSource does not produce messages for files from the directory immediately. It uses an internal queue for 'eligible files' returned by the scanner. The scanEachPoll option is used to ensure that the internal queue is refreshed with the latest input directory content on each poll. By default (scanEachPoll = false), the FileReadingMessageSource empties its queue before scanning the directory again. This default behavior is particularly useful to reduce scans of large numbers of files in a directory. However, in cases where custom ordering is required, it is important to consider the effects of setting this flag to true. The order in which files are processed may not be as expected. By default, files in the queue are processed in their natural (path) order. New files added by a scan, even when the queue already has files, are inserted in the appropriate position to maintain that natural order. To customize the order, the FileReadingMessageSource can accept a Comparator<File> as a constructor argument. It is used by the internal (PriorityBlockingQueue) to reorder its content according to the business requirements. Therefore, to process files in a specific order, you should provide a comparator to the FileReadingMessageSource rather than ordering the list produced by a custom DirectoryScanner.

Version 5.0 introduced RecursiveDirectoryScanner to perform file tree visiting. The implementation is based on the Files.walk(Path start, int maxDepth, FileVisitOption…​ options) functionality. The root directory (DirectoryScanner.listFiles(File)) argument is excluded from the result. All other sub-directories inclusions and exclusions are based on the target FileListFilter implementation. For example, the SimplePatternFileListFilter filters out directories by default. See AbstractDirectoryAwareFileListFilter and its implementations for more information.

Starting with version 5.5, the FileInboundChannelAdapterSpec of the Java DSL has a convenient recursive(boolean) option to use a RecursiveDirectoryScanner in the target FileReadingMessageSource instead of the default one.

Namespace Support

The configuration for file reading can be simplified by using the file-specific namespace. To do so, use the following template:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-file="http://www.springframework.org/schema/integration/file"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>

Within this namespace, you can reduce the FileReadingMessageSource and wrap it in an inbound Channel Adapter, as follows:

<int-file:inbound-channel-adapter id="filesIn1"
    directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>

<int-file:inbound-channel-adapter id="filesIn2"
    directory="file:${input.directory}"
    filter="customFilterBean" />

<int-file:inbound-channel-adapter id="filesIn3"
    directory="file:${input.directory}"
    filename-pattern="test*" />

<int-file:inbound-channel-adapter id="filesIn4"
    directory="file:${input.directory}"
    filename-regex="test[0-9]+\.txt" />

The first channel adapter example relies on the default FileListFilter implementations:

  • IgnoreHiddenFileListFilter (do not process hidden files)

  • AcceptOnceFileListFilter (prevent duplication)

Therefore, you can also leave off the prevent-duplicates and ignore-hidden attributes, as they are true by default.

Spring Integration 4.2 introduced the ignore-hidden attribute. In prior versions, hidden files were included.

The second channel adapter example uses a custom filter, the third uses the filename-pattern attribute to add an AntPathMatcher based filter, and the fourth uses the filename-regex attribute to add a regular expression pattern-based filter to the FileReadingMessageSource. The filename-pattern and filename-regex attributes are each mutually exclusive with the regular filter reference attribute. However, you can use the filter attribute to reference an instance of CompositeFileListFilter that combines any number of filters, including one or more pattern-based filters to fit your particular needs.

When multiple processes read from the same directory, you may want to lock files to prevent them from being picked up concurrently. To do so, you can use a FileLocker. There is a java.nio-based implementation available, but it is also possible to implement your own locking scheme. The nio locker can be injected as follows:

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:nio-locker/>
</int-file:inbound-channel-adapter>

You can configure a custom locker as follows:

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
When a file inbound adapter is configured with a locker, it takes responsibility for acquiring a lock before the file is allowed to be received. It does not assume the responsibility to unlock the file. If you have processed the file and keep the locks hanging around, you have a memory leak. If this is a problem, you should call FileLocker.unlock(File file) yourself at the appropriate time.

When filtering and locking files is not enough, you might need to control the way files are listed entirely. To implement this type of requirement, you can use an implementation of DirectoryScanner. This scanner lets you determine exactly what files are listed in each poll. This is also the interface that Spring Integration uses internally to wire FileListFilter instances and FileLocker to the FileReadingMessageSource. You can inject a custom DirectoryScanner into the <int-file:inbound-channel-adapter/> on the scanner attribute, as the following example shows:

<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
     scanner="customDirectoryScanner"/>

Doing so gives you full freedom to choose the ordering, listing, and locking strategies.

It is also important to understand that filters (including patterns, regex, prevent-duplicates, and others) and locker instances are actually used by the scanner. Any of these attributes set on the adapter are subsequently injected into the internal scanner. For the case of an external scanner, all filter and locker attributes are prohibited on the FileReadingMessageSource. They must be specified (if required) on that custom DirectoryScanner. In other words, if you inject a scanner into the FileReadingMessageSource, you should supply filter and locker on that scanner, not on the FileReadingMessageSource.

By default, the DefaultDirectoryScanner uses an IgnoreHiddenFileListFilter and an AcceptOnceFileListFilter. To prevent their use, you can configure your own filter (such as AcceptAllFileListFilter) or even set it to null.

WatchServiceDirectoryScanner

The FileReadingMessageSource.WatchServiceDirectoryScanner relies on file-system events when new files are added to the directory. During initialization, the directory is registered to generate events. The initial file list is also built during initialization. While walking the directory tree, any subdirectories encountered are also registered to generate events. On the first poll, the initial file list from walking the directory is returned. On subsequent polls, files from new creation events are returned. If a new subdirectory is added, its creation event is used to walk the new subtree to find existing files and register any new subdirectories found.

There is an issue with WatchKey when its internal events queue is not drained by the program as quickly as the directory modification events occur. If the queue size is exceeded, a StandardWatchEventKinds.OVERFLOW is emitted to indicate that some file system events may be lost. In this case, the root directory is re-scanned completely. To avoid duplicates, consider using an appropriate FileListFilter (such as the AcceptOnceFileListFilter) or removing files when processing is complete.

The WatchServiceDirectoryScanner can be enabled through the FileReadingMessageSource.use-watch-service option, which is mutually exclusive with the scanner option. An internal FileReadingMessageSource.WatchServiceDirectoryScanner instance is populated for the provided directory.

In addition, now the WatchService polling logic can track the StandardWatchEventKinds.ENTRY_MODIFY and StandardWatchEventKinds.ENTRY_DELETE.

If you need to track the modification of existing files as well as new files, you should implement the ENTRY_MODIFY events logic in the FileListFilter. Otherwise, the files from those events are treated the same way.

The ResettableFileListFilter implementations pick up the ENTRY_DELETE events. Consequently, their files are provided for the remove() operation. When this event is enabled, filters such as the AcceptOnceFileListFilter have the file removed. As a result, if a file with the same name appears, it passes the filter and is sent as a message.

For this purpose, the watch-events property (FileReadingMessageSource.setWatchEvents(WatchEventType…​ watchEvents)) has been introduced. (WatchEventType is a public inner enumeration in FileReadingMessageSource.) With such an option, we can use one downstream flow logic for new files and use some other logic for modified files. The following example shows how to configure different logic for create and modify events in the same directory:

It is worth mentioning that the ENTRY_DELETE event is involved in the rename operation of sub-directory of the watched directory. More specifically, ENTRY_DELETE event, which is related to the previous directory name, precedes ENTRY_CREATE event which notifies about the new (renamed) directory. On some operating systems (like Windows), the ENTRY_DELETE event has to be registered to deal with that situation. Otherwise, renaming watched sub-directory in the File Explorer could result in the new files not being detected in that sub-directory.

<int-file:inbound-channel-adapter id="newFiles"
     directory="${input.directory}"
     use-watch-service="true"/>

<int-file:inbound-channel-adapter id="modifiedFiles"
     directory="${input.directory}"
     use-watch-service="true"
     filter="acceptAllFilter"
     watch-events="MODIFY"/> <!-- The default is CREATE. -->

Starting with version 6.1, the FileReadingMessageSource exposes two new WatchService-related options:

  • watchMaxDepth - an argument for the Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor) API;

  • watchDirPredicate - a Predicate<Path> to test if a directory in the scanned tree should be walked and registered with the WatchService and the configured watch event kinds.

Limiting Memory Consumption

You can use a HeadDirectoryScanner to limit the number of files retained in memory. This can be useful when scanning large directories. With XML configuration, this is enabled by setting the queue-size property on the inbound channel adapter.

Prior to version 4.2, this setting was incompatible with the use of any other filters. Any other filters (including prevent-duplicates="true") overwrote the filter used to limit the size.

The use of a HeadDirectoryScanner is incompatible with an AcceptOnceFileListFilter. Since all filters are consulted during the poll decision, the AcceptOnceFileListFilter does not know that other filters might be temporarily filtering files. Even if files that were previously filtered by the HeadDirectoryScanner.HeadFilter are now available, the AcceptOnceFileListFilter filters them.

Generally, instead of using an AcceptOnceFileListFilter in this case, you should remove the processed files so that the previously filtered files are available on a future poll.

Configuring with Java Configuration

The following Spring Boot application shows an example of how to configure the outbound adapter with Java configuration:

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
         FileReadingMessageSource source = new FileReadingMessageSource();
         source.setDirectory(new File(INBOUND_PATH));
         source.setFilter(new SimplePatternFileListFilter("*.txt"));
         return source;
    }

    @Bean
    @Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

}

Configuring with the Java DSL

The following Spring Boot application shows an example of how to configure the outbound adapter with the Java DSL:

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
         return IntegrationFlow
                  .from(Files.inboundAdapter(new File(INBOUND_PATH))
                              .patternFilter("*.txt"),
                          e -> e.poller(Pollers.fixedDelay(1000)))
                  .transform(Files.toStringTransformer())
                  .channel("processFileChannel")
                  .get();
    }

}

'tail’ing Files

Another popular use case is to get 'lines' from the end (or tail) of a file, capturing new lines when they are added. Two implementations are provided. The first, OSDelegatingFileTailingMessageProducer, uses the native tail command (on operating systems that have one). This is generally the most efficient implementation on those platforms. For operating systems that do not have a tail command, the second implementation, ApacheCommonsFileTailingMessageProducer, uses the Apache commons-io Tailer class.

In both cases, file system events, such as files being unavailable and other events, are published as ApplicationEvent instances by using the normal Spring event publishing mechanism. Examples of such events include the following:

[message=tail: cannot open '/tmp/somefile' for reading:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become inaccessible:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has appeared;
               following end of new file, file=/tmp/somefile]

The sequence of events shown in the preceding example might occur, for example, when a file is rotated.

Starting with version 5.0, a FileTailingIdleEvent is emitted when there is no data in the file during idleEventInterval. The following example shows what such an event looks like:

[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
Not all platforms that support a tail command provide these status messages.

Messages emitted from these endpoints have the following headers:

  • FileHeaders.ORIGINAL_FILE: The File object

  • FileHeaders.FILENAME: The file name (File.getName())

In versions prior to version 5.0, the FileHeaders.FILENAME header contained a string representation of the file’s absolute path. You can now obtain that string representation by calling getAbsolutePath() on the original file header.

The following example creates a native adapter with the default options ('-F -n 0', meaning to follow the file name from the current end).

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	task-executor="exec"
	file="/tmp/foo"/>

The following example creates a native adapter with '-F -n +0' options (meaning follow the file name, emitting all existing lines).

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	native-options="-F -n +0"
	task-executor="exec"
	file-delay=10000
	file="/tmp/foo"/>

If the tail command fails (on some platforms, a missing file causes the tail to fail, even with -F specified), the command is retried every 10 seconds.

By default, native adapters capture from standard output and send the content as messages. They also capture from standard error to raise events. Starting with version 4.3.6, you can discard the standard error events by setting the enable-status-reader to false, as the following example shows:

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	enable-status-reader="false"
	task-executor="exec"
	file="/tmp/foo"/>

In the following example, IdleEventInterval is set to 5000, meaning that, if no lines are written for five seconds, FileTailingIdleEvent is triggered every five seconds:

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	idle-event-interval="5000"
	task-executor="exec"
	file="/tmp/somefile"/>

This can be useful when you need to stop the adapter.

The following example creates an Apache commons-io Tailer adapter that examines the file for new lines every two seconds and checks for existence of a missing file every ten seconds:

<int-file:tail-inbound-channel-adapter id="apache"
	channel="input"
	task-executor="exec"
	file="/tmp/bar"
	delay="2000"
	end="false"             (1)
	reopen="true"           (2)
	file-delay="10000"/>
1 The file is tailed from the beginning (end="false") instead of the end (which is the default).
2 The file is reopened for each chunk (the default is to keep the file open).
Specifying the delay, end or reopen attributes forces the use of the Apache commons-io adapter and makes the native-options attribute unavailable.

Dealing With Incomplete Data

A common problem in file-transfer scenarios is how to determine that the transfer is complete so that you do not start reading an incomplete file. A common technique to solve this problem is to write the file with a temporary name and then atomically rename it to the final name. This technique, together with a filter that masks the temporary file from being picked up by the consumer, provides a robust solution. This technique is used by Spring Integration components that write files (locally or remotely). By default, they append .writing to the file name and remove it when the transfer is complete.

Another common technique is to write a second “marker” file to indicate that the file transfer is complete. In this scenario, you should not consider somefile.txt (for example) to be available for use until somefile.txt.complete is also present. Spring Integration version 5.0 introduced new filters to support this mechanism. Implementations are provided for the file system (FileSystemMarkerFilePresentFileListFilter), FTP and SFTP. They are configurable such that the marker file can have any name, although it is usually related to the file being transferred. See the Javadoc for more information.