This post is part of a series delving into the details of the JSR-352 (Java Batch) specification. Each post examines a very specific part of the specification and looks at how it works and how you might use it in a real batch application. To start at the beginning, follow the link to the first post. The next post in the series is here.
This series is also available as a podcast on iTunes, Google Play, Stitcher, or use the link to the RSS feed.
-----
The Collector/Analyzer pair can be used to communicate between the concurrent threads of a partition and the main step thread. The usual example is that the partitions are counting something, and the purpose of the step is to get a total count. Each partition needs to report its results to the main step thread, which will add up the results as the partitions complete.
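To make the flow concrete, here is a minimal sketch of the "count in the partitions, total on the main thread" idea. It does not use the batch runtime at all: plain java.util.concurrent stands in for it, and the class and method names (PartitionCountSketch, totalCount) are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only: a thread pool plays the partitions and a
// queue plays the path from each Collector to the Analyzer.
class PartitionCountSketch {

    static long totalCount(List<List<String>> partitions) {
        BlockingQueue<Long> reports = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            for (List<String> items : partitions) {
                // The Collector's role: report this partition's own count.
                pool.execute(() -> reports.add((long) items.size()));
            }
            long total = 0;
            for (int i = 0; i < partitions.size(); i++) {
                // The Analyzer's role: add up reports on the calling thread
                // as the partitions finish, in whatever order they arrive.
                total += reports.take();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for partitions", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real job the runtime does the hand-off for you; the only parts you write are the bit that produces the per-partition result and the bit that adds the results up.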
The Collector runs on the partition thread, but you need to figure out a way for the partition processing to communicate with it. An easy way is to use the StepContext’s Transient User Data. If the partitioned step is a batchlet, the Collector will get control when the batchlet finishes. If the partitioned step is chunking, then the Collector will get control after each chunk. Note that the Collector gets control after the transaction commits (as opposed to the Chunk Listener’s afterChunk method, which gets control after the Writer but before the transaction commits).
For our example we’ll just use a batchlet. We’ve re-used the SamplePartitionMapper from earlier to provide a list of .txt files found in an input directory, with one file name passed as a property to each partition. As before, our batchlet (UserDataPartitionedBatchlet) just logs the file name it received, but it also stores that file name as a String in the Transient User Data for the step. The batchlet also sets an Exit Status for the partition that includes the thread identifier (just to have something that probably differs from partition to partition, although it could repeat if you have a lot of partitions).
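Here is a rough sketch of what such a batchlet does. It is not the code from the samples repository. So that it compiles without a batch runtime, it declares a small stand-in StepContext with the same method shapes as javax.batch.runtime.context.StepContext, and it takes the context and the file name through a constructor; in a real application the class would implement javax.batch.api.Batchlet and have both injected. The names UserDataBatchletSketch and SimpleStepContext and the "Thread-" exit status format are assumptions made for illustration.

```java
// Stand-in for javax.batch.runtime.context.StepContext, trimmed to the
// four methods this sketch uses.
interface StepContext {
    Object getTransientUserData();
    void setTransientUserData(Object data);
    String getExitStatus();
    void setExitStatus(String status);
}

// Hypothetical in-memory context, only here so the sketch can be tried out.
class SimpleStepContext implements StepContext {
    private Object transientUserData;
    private String exitStatus;

    public Object getTransientUserData() { return transientUserData; }
    public void setTransientUserData(Object data) { this.transientUserData = data; }
    public String getExitStatus() { return exitStatus; }
    public void setExitStatus(String status) { this.exitStatus = status; }
}

// Sketch of a partitioned batchlet (assumed shape, not the author's class).
class UserDataBatchletSketch {
    private final StepContext stepContext; // injected in a real application
    private final String fileName;         // a partition property in a real application

    UserDataBatchletSketch(StepContext stepContext, String fileName) {
        this.stepContext = stepContext;
        this.fileName = fileName;
    }

    public String process() {
        System.out.println("Processing file " + fileName);
        // Leave the file name where a Collector on this thread can find it.
        stepContext.setTransientUserData(fileName);
        // The thread id makes the exit status likely to differ per partition.
        String exitStatus = "Thread-" + Thread.currentThread().getId();
        stepContext.setExitStatus(exitStatus);
        return exitStatus;
    }
}
```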
Having information in the Transient User Data allows our Collector (UserDataCollector) to retrieve it and return it from the collectPartitionData method. The returned value has to be Serializable, so you may need to create a separate Java class to contain the results. In our case it is just a String, so nothing special is needed.
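A Collector along these lines might look like the sketch below. Again, this is not the repository code. The two interfaces are stand-ins declared here so the example compiles on its own: PartitionCollector has the same single method as javax.batch.api.partition.PartitionCollector, and StepContext is trimmed down to the one getter the sketch needs. The class name UserDataCollectorSketch and the constructor are assumptions; a real Collector would have the StepContext injected.

```java
import java.io.Serializable;

// Stand-in with the same method shape as
// javax.batch.api.partition.PartitionCollector.
interface PartitionCollector {
    Serializable collectPartitionData() throws Exception;
}

// Stand-in trimmed from javax.batch.runtime.context.StepContext.
interface StepContext {
    Object getTransientUserData();
}

// Sketch of a Collector that returns whatever the partition left behind.
class UserDataCollectorSketch implements PartitionCollector {
    private final StepContext stepContext; // injected in a real application

    UserDataCollectorSketch(StepContext stepContext) {
        this.stepContext = stepContext;
    }

    @Override
    public Serializable collectPartitionData() {
        Object data = stepContext.getTransientUserData();
        // A String is already Serializable. Anything that is not Serializable
        // cannot be returned, so this sketch returns null in that case.
        return (data instanceof Serializable) ? (Serializable) data : null;
    }
}
```

If you wanted to send back more than one value (say a file name and a record count), this is where a small Serializable result class of your own would come in.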
Our Analyzer (UserDataAnalyzer) gets the Batch Status and Exit Status values from each partition in the analyzeStatus method. Our Exit Status values contain the thread id of the thread used to run each partition, so there should at least be some variety in the values. The analyzeCollectorData method will see all the data returned by the Collectors running for each partition. In our example it will contain the file name processed by each partition.
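The Analyzer side can be sketched the same way. The PartitionAnalyzer interface and BatchStatus enum below are stand-ins that mirror javax.batch.api.partition.PartitionAnalyzer and javax.batch.runtime.BatchStatus, so the example compiles without a batch runtime. UserDataAnalyzerSketch and its getters are made-up names, and keeping the results in lists is just one way to do something visible with them.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Stand-in mirroring javax.batch.runtime.BatchStatus.
enum BatchStatus { STARTING, STARTED, STOPPING, STOPPED, FAILED, COMPLETED, ABANDONED }

// Stand-in with the same method shapes as
// javax.batch.api.partition.PartitionAnalyzer.
interface PartitionAnalyzer {
    void analyzeCollectorData(Serializable data) throws Exception;
    void analyzeStatus(BatchStatus batchStatus, String exitStatus) throws Exception;
}

// Sketch of an Analyzer that remembers what each partition reported.
// Plain fields are used because the Analyzer is described as running on
// the main step thread.
class UserDataAnalyzerSketch implements PartitionAnalyzer {
    private final List<String> fileNames = new ArrayList<>();
    private final List<String> exitStatuses = new ArrayList<>();
    private int completedPartitions;

    @Override
    public void analyzeCollectorData(Serializable data) {
        // Called with each value a Collector returned; here, a file name.
        fileNames.add(String.valueOf(data));
    }

    @Override
    public void analyzeStatus(BatchStatus batchStatus, String exitStatus) {
        // Called with the final status of each partition.
        if (batchStatus == BatchStatus.COMPLETED) {
            completedPartitions++;
        }
        exitStatuses.add(exitStatus);
    }

    List<String> getFileNames() { return fileNames; }
    List<String> getExitStatuses() { return exitStatuses; }
    int getCompletedPartitions() { return completedPartitions; }
}
```

For the counting example, analyzeCollectorData is where you would add each partition's count to a running total instead of appending to a list.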
An important thing to remember about all this processing is that it is not transactional or hardened in any way. If the job fails or some other bad thing happens after a chunk or a batchlet completes, there is no guarantee the Collector will run or that data returned by a Collector will ever be seen by the Analyzer. Any in-flight information is lost, and a restart of the job won’t recover it.
As usual, the code is all at https://github.com/follisd/batch-samples.