| 1 | /* |
| 2 | * Copyright 2006-2007 the original author or authors. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package org.springframework.batch.item.file; |
| 18 | |
| 19 | import org.apache.commons.logging.Log; |
| 20 | import org.apache.commons.logging.LogFactory; |
| 21 | import org.springframework.batch.item.ExecutionContext; |
| 22 | import org.springframework.batch.item.ExecutionContextUserSupport; |
| 23 | import org.springframework.batch.item.ItemReader; |
| 24 | import org.springframework.batch.item.ItemReaderException; |
| 25 | import org.springframework.batch.item.ItemStream; |
| 26 | import org.springframework.batch.item.ItemStreamException; |
| 27 | import org.springframework.batch.item.ReaderNotOpenException; |
| 28 | import org.springframework.batch.item.file.mapping.FieldSet; |
| 29 | import org.springframework.batch.item.file.mapping.FieldSetMapper; |
| 30 | import org.springframework.batch.item.file.separator.LineReader; |
| 31 | import org.springframework.batch.item.file.separator.RecordSeparatorPolicy; |
| 32 | import org.springframework.batch.item.file.separator.ResourceLineReader; |
| 33 | import org.springframework.batch.item.file.transform.AbstractLineTokenizer; |
| 34 | import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; |
| 35 | import org.springframework.batch.item.file.transform.LineTokenizer; |
| 36 | import org.springframework.beans.factory.InitializingBean; |
| 37 | import org.springframework.core.io.Resource; |
| 38 | import org.springframework.util.Assert; |
| 39 | import org.springframework.util.ClassUtils; |
| 40 | |
| 41 | /** |
 * This class represents an {@link ItemReader} that reads lines from a text
 * file, tokenizes them into structured tuples ({@link FieldSet} instances) and
 * maps the {@link FieldSet}s to domain objects. The location of the file is
 * defined by the resource property. A {@link LineTokenizer} is used to parse
 * each line of data obtained from the file. <br/>
| 47 | * |
| 48 | * A {@link FlatFileItemReader} is not thread safe because it maintains state in |
| 49 | * the form of a {@link ResourceLineReader}. Be careful to configure a |
| 50 | * {@link FlatFileItemReader} using an appropriate factory or scope so that it |
| 51 | * is not shared between threads.<br/> |
| 52 | * |
| 53 | * <p> |
 * This class supports restart, skipping invalid lines and storing statistics.
 * It can be configured to set up {@link FieldSet} column names from the file
 * header, and to skip a given number of lines at the beginning of the file.
| 57 | * </p> |
| 58 | * |
| 59 | * @author Waseem Malik |
| 60 | * @author Tomas Slanina |
| 61 | * @author Robert Kasanicky |
| 62 | * @author Dave Syer |
| 63 | */ |
| 64 | public class FlatFileItemReader extends ExecutionContextUserSupport implements ItemReader, ItemStream, InitializingBean { |
| 65 | |
| 66 | private static Log log = LogFactory.getLog(FlatFileItemReader.class); |
| 67 | |
| 68 | private static final String LINES_READ_COUNT = "lines.read.count"; |
| 69 | |
| 70 | // default encoding for input files |
| 71 | public static final String DEFAULT_CHARSET = "ISO-8859-1"; |
| 72 | |
| 73 | private String encoding = DEFAULT_CHARSET; |
| 74 | |
| 75 | private Resource resource; |
| 76 | |
| 77 | private RecordSeparatorPolicy recordSeparatorPolicy; |
| 78 | |
| 79 | private String[] comments; |
| 80 | |
| 81 | private int linesToSkip = 0; |
| 82 | |
| 83 | private boolean firstLineIsHeader = false; |
| 84 | |
| 85 | private LineTokenizer tokenizer = new DelimitedLineTokenizer(); |
| 86 | |
| 87 | private FieldSetMapper fieldSetMapper; |
| 88 | |
| 89 | private boolean saveState = false; |
| 90 | |
| 91 | /** |
| 92 | * Encapsulates the state of the input source. If it is null then we are |
| 93 | * uninitialized. |
| 94 | */ |
| 95 | private LineReader reader; |
| 96 | |
| 97 | public FlatFileItemReader() { |
| 98 | setName(ClassUtils.getShortName(FlatFileItemReader.class)); |
| 99 | } |
| 100 | |
| 101 | /** |
| 102 | * Initialize the reader if necessary. |
| 103 | * |
| 104 | * @throws IllegalStateException if the resource cannot be opened |
| 105 | */ |
| 106 | public void open(ExecutionContext executionContext) throws ItemStreamException { |
| 107 | |
| 108 | Assert.state(resource.exists(), "Resource must exist: [" + resource + "]"); |
| 109 | |
| 110 | log.debug("Opening flat file for reading: " + resource); |
| 111 | |
| 112 | if (this.reader == null) { |
| 113 | ResourceLineReader reader = new ResourceLineReader(resource, encoding); |
| 114 | if (recordSeparatorPolicy != null) { |
| 115 | reader.setRecordSeparatorPolicy(recordSeparatorPolicy); |
| 116 | } |
| 117 | if (comments != null) { |
| 118 | reader.setComments(comments); |
| 119 | } |
| 120 | reader.open(); |
| 121 | this.reader = reader; |
| 122 | } |
| 123 | |
| 124 | for (int i = 0; i < linesToSkip; i++) { |
| 125 | readLine(); |
| 126 | } |
| 127 | |
| 128 | if (firstLineIsHeader) { |
| 129 | // skip the header |
| 130 | String firstLine = readLine(); |
| 131 | // set names in tokenizer if they haven't been set already |
| 132 | if (tokenizer instanceof AbstractLineTokenizer && !((AbstractLineTokenizer) tokenizer).hasNames()) { |
| 133 | String[] names = tokenizer.tokenize(firstLine).getValues(); |
| 134 | ((AbstractLineTokenizer) tokenizer).setNames(names); |
| 135 | } |
| 136 | } |
| 137 | |
| 138 | if (executionContext.containsKey(getKey(LINES_READ_COUNT))) { |
| 139 | log.debug("Initializing for restart. Restart data is: " + executionContext); |
| 140 | |
| 141 | long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT)); |
| 142 | |
| 143 | LineReader reader = getReader(); |
| 144 | |
| 145 | Object record = ""; |
| 146 | while (reader.getPosition() < lineCount && record != null) { |
| 147 | record = readLine(); |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | } |
| 152 | |
| 153 | /** |
| 154 | * Close and null out the reader. |
| 155 | * |
| 156 | * @throws ItemStreamException |
| 157 | */ |
| 158 | public void close(ExecutionContext executionContext) throws ItemStreamException { |
| 159 | try { |
| 160 | if (reader != null) { |
| 161 | log.debug("Closing flat file for reading: " + resource); |
| 162 | reader.close(null); |
| 163 | } |
| 164 | } |
| 165 | finally { |
| 166 | reader = null; |
| 167 | } |
| 168 | } |
| 169 | |
| 170 | /** |
| 171 | * Reads a line from input, tokenizes is it using the |
| 172 | * {@link #setLineTokenizer(LineTokenizer)} and maps to domain object using |
| 173 | * {@link #setFieldSetMapper(FieldSetMapper)}. |
| 174 | * |
| 175 | * @see org.springframework.batch.item.ItemReader#read() |
| 176 | */ |
| 177 | public Object read() throws Exception { |
| 178 | String line = readLine(); |
| 179 | |
| 180 | if (line != null) { |
| 181 | try { |
| 182 | FieldSet tokenizedLine = tokenizer.tokenize(line); |
| 183 | return fieldSetMapper.mapLine(tokenizedLine); |
| 184 | } |
| 185 | catch (RuntimeException ex) { |
| 186 | // add current line count to message and re-throw |
| 187 | int lineCount = getReader().getPosition(); |
| 188 | throw new FlatFileParseException("Parsing error at line: " + lineCount + " in resource=" |
| 189 | + resource.getDescription() + ", input=[" + line + "]", ex, line, lineCount); |
| 190 | } |
| 191 | } |
| 192 | return null; |
| 193 | } |
| 194 | |
| 195 | /** |
| 196 | * This method returns the execution attributes for the reader. It returns |
| 197 | * the current Line Count which can be used to reinitialise the batch job in |
| 198 | * case of restart. |
| 199 | */ |
| 200 | public void update(ExecutionContext executionContext) { |
| 201 | if (reader == null) { |
| 202 | throw new ItemStreamException("ItemStream not open or already closed."); |
| 203 | } |
| 204 | |
| 205 | if (saveState) { |
| 206 | Assert.notNull(executionContext, "ExecutionContext must not be null"); |
| 207 | executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition()); |
| 208 | } |
| 209 | } |
| 210 | |
| 211 | /** |
| 212 | * Mark is supported as long as this {@link ItemStream} is used in a |
| 213 | * single-threaded environment. The state backing the mark is a single |
| 214 | * counter, keeping track of the current position, so multiple threads |
| 215 | * cannot be accommodated. |
| 216 | * |
| 217 | * @see org.springframework.batch.item.ItemReader#mark() |
| 218 | */ |
| 219 | public void mark() { |
| 220 | getReader().mark(); |
| 221 | } |
| 222 | |
| 223 | /* |
| 224 | * (non-Javadoc) |
| 225 | * |
| 226 | * @see org.springframework.batch.item.ItemStream#reset(org.springframework.batch.item.ExecutionContext) |
| 227 | */ |
| 228 | public void reset() { |
| 229 | getReader().reset(); |
| 230 | } |
| 231 | |
| 232 | /** |
| 233 | * @return next line to be tokenized and mapped. |
| 234 | */ |
| 235 | private String readLine() { |
| 236 | try { |
| 237 | return (String) getReader().read(); |
| 238 | } |
| 239 | catch (ItemStreamException e) { |
| 240 | throw e; |
| 241 | } |
| 242 | catch (ItemReaderException e) { |
| 243 | throw e; |
| 244 | } |
| 245 | catch (Exception e) { |
| 246 | throw new IllegalStateException(); |
| 247 | } |
| 248 | } |
| 249 | |
| 250 | /** |
| 251 | * @return line reader used to read input file |
| 252 | */ |
| 253 | protected LineReader getReader() { |
| 254 | if (reader == null) { |
| 255 | throw new ReaderNotOpenException("Reader must be open before it can be read."); |
| 256 | // reader is now not null, or else an exception is thrown |
| 257 | } |
| 258 | return reader; |
| 259 | } |
| 260 | |
| 261 | /** |
| 262 | * Setter for resource property. The location of an input stream that can be |
| 263 | * read. |
| 264 | * |
| 265 | * @param resource |
| 266 | */ |
| 267 | public void setResource(Resource resource) { |
| 268 | this.resource = resource; |
| 269 | } |
| 270 | |
| 271 | /** |
| 272 | * Public setter for the recordSeparatorPolicy. Used to determine where the |
| 273 | * line endings are and do things like continue over a line ending if inside |
| 274 | * a quoted string. |
| 275 | * |
| 276 | * @param recordSeparatorPolicy the recordSeparatorPolicy to set |
| 277 | */ |
| 278 | public void setRecordSeparatorPolicy(RecordSeparatorPolicy recordSeparatorPolicy) { |
| 279 | this.recordSeparatorPolicy = recordSeparatorPolicy; |
| 280 | } |
| 281 | |
| 282 | /** |
| 283 | * Setter for comment prefixes. Can be used to ignore header lines as well |
| 284 | * by using e.g. the first couple of column names as a prefix. |
| 285 | * |
| 286 | * @param comments an array of comment line prefixes. |
| 287 | */ |
| 288 | public void setComments(String[] comments) { |
| 289 | this.comments = new String[comments.length]; |
| 290 | System.arraycopy(comments, 0, this.comments, 0, comments.length); |
| 291 | } |
| 292 | |
| 293 | /** |
| 294 | * Indicates whether first line is a header. If the tokenizer is an |
| 295 | * {@link AbstractLineTokenizer} and the column names haven't been set |
| 296 | * already then the header will be used to setup column names. Default is |
| 297 | * <code>false</code>. |
| 298 | */ |
| 299 | public void setFirstLineIsHeader(boolean firstLineIsHeader) { |
| 300 | this.firstLineIsHeader = firstLineIsHeader; |
| 301 | } |
| 302 | |
| 303 | /** |
| 304 | * @param lineTokenizer tokenizes each line from file into {@link FieldSet}. |
| 305 | */ |
| 306 | public void setLineTokenizer(LineTokenizer lineTokenizer) { |
| 307 | this.tokenizer = lineTokenizer; |
| 308 | } |
| 309 | |
| 310 | /** |
| 311 | * Set the FieldSetMapper to be used for each line. |
| 312 | * |
| 313 | * @param fieldSetMapper |
| 314 | */ |
| 315 | public void setFieldSetMapper(FieldSetMapper fieldSetMapper) { |
| 316 | this.fieldSetMapper = fieldSetMapper; |
| 317 | } |
| 318 | |
| 319 | /** |
| 320 | * Public setter for the number of lines to skip at the start of a file. Can |
| 321 | * be used if the file contains a header without useful (column name) |
| 322 | * information, and without a comment delimiter at the beginning of the |
| 323 | * lines. |
| 324 | * |
| 325 | * @param linesToSkip the number of lines to skip |
| 326 | */ |
| 327 | public void setLinesToSkip(int linesToSkip) { |
| 328 | this.linesToSkip = linesToSkip; |
| 329 | } |
| 330 | |
| 331 | /** |
| 332 | * Setter for the encoding for this input source. Default value is |
| 333 | * {@link #DEFAULT_CHARSET}. |
| 334 | * |
| 335 | * @param encoding a properties object which possibly contains the encoding |
| 336 | * for this input file; |
| 337 | */ |
| 338 | public void setEncoding(String encoding) { |
| 339 | this.encoding = encoding; |
| 340 | } |
| 341 | |
| 342 | public void afterPropertiesSet() throws Exception { |
| 343 | Assert.notNull(resource, "Input resource must not be null"); |
| 344 | Assert.notNull(fieldSetMapper, "FieldSetMapper must not be null."); |
| 345 | } |
| 346 | |
| 347 | /** |
| 348 | * Set the boolean indicating whether or not state should be saved in the |
| 349 | * provided {@link ExecutionContext} during the {@link ItemStream} call to |
| 350 | * update. Setting this to false means that it will always start at the |
| 351 | * beginning. |
| 352 | * |
| 353 | * @param saveState |
| 354 | */ |
| 355 | public void setSaveState(boolean saveState) { |
| 356 | this.saveState = saveState; |
| 357 | } |
| 358 | |
| 359 | } |