pylucene 3.5.0-3
[pylucene.git] / lucene-java-3.5.0 / lucene / contrib / facet / src / test / org / apache / lucene / facet / search / TestTotalFacetCountsCache.java
1 package org.apache.lucene.facet.search;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.util.ArrayList;
6 import java.util.List;
7
8 import org.apache.lucene.analysis.MockAnalyzer;
9 import org.apache.lucene.analysis.MockTokenizer;
10 import org.apache.lucene.document.Document;
11 import org.apache.lucene.index.CorruptIndexException;
12 import org.apache.lucene.index.IndexReader;
13 import org.apache.lucene.index.IndexWriter;
14 import org.apache.lucene.index.IndexWriterConfig;
15 import org.apache.lucene.store.Directory;
16 import org.apache.lucene.store.MockDirectoryWrapper;
17 import org.junit.Before;
18 import org.junit.Test;
19
20 import org.apache.lucene.util.LuceneTestCase;
21 import org.apache.lucene.facet.FacetTestUtils;
22 import org.apache.lucene.facet.FacetTestUtils.IndexTaxonomyReaderPair;
23 import org.apache.lucene.facet.FacetTestUtils.IndexTaxonomyWriterPair;
24 import org.apache.lucene.facet.example.ExampleResult;
25 import org.apache.lucene.facet.example.TestMultiCLExample;
26 import org.apache.lucene.facet.example.multiCL.MultiCLIndexer;
27 import org.apache.lucene.facet.example.multiCL.MultiCLSearcher;
28 import org.apache.lucene.facet.index.CategoryDocumentBuilder;
29 import org.apache.lucene.facet.index.params.DefaultFacetIndexingParams;
30 import org.apache.lucene.facet.index.params.FacetIndexingParams;
31 import org.apache.lucene.facet.search.TotalFacetCounts.CreationType;
32 import org.apache.lucene.facet.search.results.FacetResult;
33 import org.apache.lucene.facet.search.results.FacetResultNode;
34 import org.apache.lucene.facet.taxonomy.CategoryPath;
35 import org.apache.lucene.facet.taxonomy.TaxonomyReader;
36 import org.apache.lucene.facet.taxonomy.TaxonomyWriter;
37 import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyReader;
38 import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
39 import org.apache.lucene.util.IOUtils;
40 import org.apache.lucene.util.SlowRAMDirectory;
41 import org.apache.lucene.util._TestUtil;
42
43 /**
44  * Licensed to the Apache Software Foundation (ASF) under one or more
45  * contributor license agreements.  See the NOTICE file distributed with
46  * this work for additional information regarding copyright ownership.
47  * The ASF licenses this file to You under the Apache License, Version 2.0
48  * (the "License"); you may not use this file except in compliance with
49  * the License.  You may obtain a copy of the License at
50  *
51  *     http://www.apache.org/licenses/LICENSE-2.0
52  *
53  * Unless required by applicable law or agreed to in writing, software
54  * distributed under the License is distributed on an "AS IS" BASIS,
55  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
56  * See the License for the specific language governing permissions and
57  * limitations under the License.
58  */
59
60 public class TestTotalFacetCountsCache extends LuceneTestCase {
61
62   static final TotalFacetCountsCache TFC = TotalFacetCountsCache.getSingleton();
63
64   /**
65    * Thread class to be used in tests for this method. This thread gets a TFC
66    * and records times.
67    */
68   private static class TFCThread extends Thread {
69     private final IndexReader r;
70     private final DirectoryTaxonomyReader tr;
71     private final FacetIndexingParams iParams;
72     
73     TotalFacetCounts tfc;
74
75     public TFCThread(IndexReader r, DirectoryTaxonomyReader tr, FacetIndexingParams iParams) {
76       this.r = r;
77       this.tr = tr;
78       this.iParams = iParams;
79     }
80     @Override
81     public void run() {
82       try {
83         tfc = TFC.getTotalCounts(r, tr, iParams, null);
84       } catch (Exception e) {
85         throw new RuntimeException(e);
86       }
87     }
88   }
89
90   /** Utility method to add a document and facets to an index/taxonomy. */
91   static void addFacets(FacetIndexingParams iParams, IndexWriter iw,
92                         TaxonomyWriter tw, String... strings) throws IOException {
93     ArrayList<CategoryPath> cps = new ArrayList<CategoryPath>();
94     cps.add(new CategoryPath(strings));
95     CategoryDocumentBuilder builder = new CategoryDocumentBuilder(tw, iParams);
96     iw.addDocument(builder.setCategoryPaths(cps).build(new Document()));
97   }
98
99   /** Clears the cache and sets its size to one. */
100   static void initCache() {
101     TFC.clear();
102     TFC.setCacheSize(1); // Set to keep one in memory
103   }
104
105   @Override
106   @Before
107   public void setUp() throws Exception {
108     super.setUp();
109     initCache();
110   }
111
112   /** runs a few instances of {@link MultiCLSearcher} in parallel */
113   public void testGeneralSynchronization() throws Exception {
114     int numIters = atLeast(2);
115     for (int i = 0; i < numIters; i++) {
116       doTestGeneralSynchronization(_TestUtil.nextInt(random, 2, 4),
117                                   random.nextBoolean() ? -1 : _TestUtil.nextInt(random, 1, 10),
118                                   _TestUtil.nextInt(random, 0, 3));
119     }
120   }
121
122   /**
123    * Run many instances of {@link MultiCLSearcher} in parallel, results should
124    * be sane. Each instance has a random delay for reading bytes, to ensure
125    * that threads finish in different order than started.
126    */
127   @Test @Nightly
128   public void testGeneralSynchronizationBig() throws Exception {
129     int[] numThreads = new int[] { 2, 3, 5, 8 };
130     int[] sleepMillis = new int[] { -1, 1, 20, 33 };
131     int[] cacheSize = new int[] { 0,1,2,3,5 };
132     for (int size : cacheSize) {
133       for (int sleep : sleepMillis) {
134         for (int nThreads : numThreads) {
135           doTestGeneralSynchronization(nThreads, sleep, size);
136         }
137       }
138     }
139   }
140
141   private void doTestGeneralSynchronization(int numThreads, int sleepMillis,
142       int cacheSize) throws Exception, CorruptIndexException, IOException,
143       InterruptedException {
144     TFC.setCacheSize(cacheSize);
145     SlowRAMDirectory slowIndexDir = new SlowRAMDirectory(-1, random);
146     MockDirectoryWrapper indexDir = new MockDirectoryWrapper(random, slowIndexDir);
147     SlowRAMDirectory slowTaxoDir = new SlowRAMDirectory(-1, random);
148     MockDirectoryWrapper taxoDir = new MockDirectoryWrapper(random, slowTaxoDir);
149     
150
151     // Index documents without the "slowness"
152     MultiCLIndexer.index(indexDir, taxoDir);
153
154     slowIndexDir.setSleepMillis(sleepMillis);
155     slowTaxoDir.setSleepMillis(sleepMillis);
156     
157     // Open the slow readers
158     IndexReader slowIndexReader = IndexReader.open(indexDir);
159     TaxonomyReader slowTaxoReader = new DirectoryTaxonomyReader(taxoDir);
160
161     // Class to perform search and return results as threads
162     class Multi extends Thread {
163       private List<FacetResult> results;
164       private FacetIndexingParams iParams;
165       private IndexReader indexReader;
166       private TaxonomyReader taxoReader;
167
168       public Multi(IndexReader indexReader, TaxonomyReader taxoReader,
169                     FacetIndexingParams iParams) {
170         this.indexReader = indexReader;
171         this.taxoReader = taxoReader;
172         this.iParams = iParams;
173       }
174
175       public ExampleResult getResults() {
176         ExampleResult exampleRes = new ExampleResult();
177         exampleRes.setFacetResults(results);
178         return exampleRes;
179       }
180
181       @Override
182       public void run() {
183         try {
184           results = MultiCLSearcher.searchWithFacets(indexReader, taxoReader, iParams);
185         } catch (Exception e) {
186           throw new RuntimeException(e);
187         }
188       }
189     }
190
191     // Instantiate threads, but do not start them
192     Multi[] multis = new Multi[numThreads];
193     for (int i = 0; i < numThreads - 1; i++) {
194       multis[i] = new Multi(slowIndexReader, slowTaxoReader, MultiCLIndexer.MULTI_IPARAMS);
195     }
196     // The last thread uses ONLY the DefaultFacetIndexingParams so that
197     // it references a different TFC cache. This will still result
198     // in valid results, but will only search one of the category lists
199     // instead of all of them.
200     multis[numThreads - 1] = new Multi(slowIndexReader, slowTaxoReader, new DefaultFacetIndexingParams());
201
202     // Gentleman, start your engines
203     for (Multi m : multis) {
204       m.start();
205     }
206
207     // Wait for threads and get results
208     ExampleResult[] multiResults = new ExampleResult[numThreads];
209     for (int i = 0; i < numThreads; i++) {
210       multis[i].join();
211       multiResults[i] = multis[i].getResults();
212     }
213
214     // Each of the (numThreads-1) should have the same predictable
215     // results, which we test for here.
216     for (int i = 0; i < numThreads - 1; i++) {
217       ExampleResult eResults = multiResults[i];
218       TestMultiCLExample.assertCorrectMultiResults(eResults);
219     }
220
221     // The last thread, which only searched over the
222     // DefaultFacetIndexingParams,
223     // has its own results
224     ExampleResult eResults = multiResults[numThreads - 1];
225     List<FacetResult> results = eResults.getFacetResults();
226     assertEquals(3, results.size());
227     String[] expLabels = new String[] { "5", "5/5", "6/2" };
228     double[] expValues = new double[] { 0.0, 0.0, 1.0 };
229     for (int i = 0; i < 3; i++) {
230       FacetResult result = results.get(i);
231       assertNotNull("Result should not be null", result);
232       FacetResultNode resNode = result.getFacetResultNode();
233       assertEquals("Invalid label", expLabels[i], resNode.getLabel().toString());
234       assertEquals("Invalid value", expValues[i], resNode.getValue(), 0.0);
235       assertEquals("Invalid number of subresults", 0, resNode.getNumSubResults());
236     }
237     // we're done, close the index reader and the taxonomy.
238     slowIndexReader.close();
239     slowTaxoReader.close();
240     indexDir.close();
241     taxoDir.close();
242   }
243
244   /**
245    * Simple test to make sure the TotalFacetCountsManager updates the
246    * TotalFacetCounts array only when it is supposed to, and whether it
247    * is recomputed or read from disk.
248    */
249   @Test
250   public void testGenerationalConsistency() throws Exception {
251     // Create temporary RAMDirectories
252     Directory[][] dirs = FacetTestUtils.createIndexTaxonomyDirs(1);
253
254     // Create our index/taxonomy writers
255     IndexTaxonomyWriterPair[] writers = FacetTestUtils.createIndexTaxonomyWriterPair(dirs);
256     DefaultFacetIndexingParams iParams = new DefaultFacetIndexingParams();
257
258     // Add a facet to the index
259     addFacets(iParams, writers[0].indexWriter, writers[0].taxWriter, "a", "b");
260
261     // Commit Changes
262     writers[0].indexWriter.commit();
263     writers[0].taxWriter.commit();
264
265     // Open readers
266     IndexTaxonomyReaderPair[] readers = FacetTestUtils.createIndexTaxonomyReaderPair(dirs);
267
268     // As this is the first time we have invoked the TotalFacetCountsManager, 
269     // we should expect to compute and not read from disk.
270     TotalFacetCounts totalCounts = 
271       TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null);
272     int prevGen = assertRecomputed(totalCounts, 0, "after first attempt to get it!");
273
274     // Repeating same operation should pull from the cache - not recomputed. 
275     assertTrue("Should be obtained from cache at 2nd attempt",totalCounts == 
276       TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null));
277
278     // Repeat the same operation as above. but clear first - now should recompute again
279     initCache();
280     totalCounts = TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null);
281     prevGen = assertRecomputed(totalCounts, prevGen, "after cache clear, 3rd attempt to get it!");
282     
283     //store to file
284     File outputFile = _TestUtil.createTempFile("test", "tmp", TEMP_DIR);
285     initCache();
286     TFC.store(outputFile, readers[0].indexReader, readers[0].taxReader, iParams, null);
287     totalCounts = TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null);
288     prevGen = assertRecomputed(totalCounts, prevGen, "after cache clear, 4th attempt to get it!");
289
290     //clear and load
291     initCache();
292     TFC.load(outputFile, readers[0].indexReader, readers[0].taxReader, iParams);
293     totalCounts = TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null);
294     prevGen = assertReadFromDisc(totalCounts, prevGen, "after 5th attempt to get it!");
295
296     // Add a new facet to the index, commit and refresh readers
297     addFacets(iParams, writers[0].indexWriter, writers[0].taxWriter, "c", "d");
298     writers[0].indexWriter.close();
299     writers[0].taxWriter.close();
300
301     readers[0].taxReader.refresh();
302     IndexReader r2 = readers[0].indexReader.reopen();
303     // Hold on to the 'original' reader so we can do some checks with it
304     IndexReader origReader = null;
305
306     assertTrue("Reader must be updated!", readers[0].indexReader != r2);
307     
308     // Set the 'original' reader
309     origReader = readers[0].indexReader;
310     // Set the new master index Reader
311     readers[0].indexReader = r2;
312
313     // Try to get total-counts the originalReader AGAIN, just for sanity. Should pull from the cache - not recomputed. 
314     assertTrue("Should be obtained from cache at 6th attempt",totalCounts == 
315       TFC.getTotalCounts(origReader, readers[0].taxReader, iParams, null));
316
317     // now use the new reader - should recompute
318     totalCounts = TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null);
319     prevGen = assertRecomputed(totalCounts, prevGen, "after updating the index - 7th attempt!");
320
321     // try again - should not recompute
322     assertTrue("Should be obtained from cache at 8th attempt",totalCounts == 
323       TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null));
324     
325     // delete a doc from the reader and commit - should recompute
326     origReader.close();
327     origReader = readers[0].indexReader;
328     readers[0].indexReader = IndexReader.open(origReader.directory(),false);
329     initCache();
330     totalCounts = TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null);
331     prevGen = assertRecomputed(totalCounts, prevGen, "after opening a writable reader - 9th attempt!");
332     // now do the delete
333     readers[0].indexReader.deleteDocument(1);
334     readers[0].indexReader.commit(null);
335     totalCounts = TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null);
336     prevGen = assertRecomputed(totalCounts, prevGen, "after deleting docs the index - 10th attempt!");
337     
338     origReader.close();
339     readers[0].close();
340     r2.close();
341     outputFile.delete();
342     IOUtils.close(dirs[0]);
343   }
344
345   private int assertReadFromDisc(TotalFacetCounts totalCounts, int prevGen, String errMsg) {
346     assertEquals("should read from disk "+errMsg, CreationType.Loaded, totalCounts.createType4test);
347     int gen4test = totalCounts.gen4test;
348     assertTrue("should read from disk "+errMsg, gen4test > prevGen);
349     return gen4test;
350   }
351   
352   private int assertRecomputed(TotalFacetCounts totalCounts, int prevGen, String errMsg) {
353     assertEquals("should recompute "+errMsg, CreationType.Computed, totalCounts.createType4test);
354     int gen4test = totalCounts.gen4test;
355     assertTrue("should recompute "+errMsg, gen4test > prevGen);
356     return gen4test;
357   }
358
359   /**
360    * This test is to address a bug in a previous version.  If a TFC cache is
361    * written to disk, and then the taxonomy grows (but the index does not change),
362    * and then the TFC cache is re-read from disk, there will be an exception
363    * thrown, as the integers are read off of the disk according to taxonomy
364    * size, which has changed.
365    */
366   @Test
367   public void testGrowingTaxonomy() throws Exception {
368     // Create temporary RAMDirectories
369     Directory[][] dirs = FacetTestUtils.createIndexTaxonomyDirs(1);
370     // Create our index/taxonomy writers
371     IndexTaxonomyWriterPair[] writers = FacetTestUtils
372     .createIndexTaxonomyWriterPair(dirs);
373     DefaultFacetIndexingParams iParams = new DefaultFacetIndexingParams() {
374       @Override
375       protected int fixedPartitionSize() {
376         return 2;
377       }
378     };
379     // Add a facet to the index
380     addFacets(iParams, writers[0].indexWriter, writers[0].taxWriter, "a", "b");
381     // Commit Changes
382     writers[0].indexWriter.commit();
383     writers[0].taxWriter.commit();
384
385     IndexTaxonomyReaderPair[] readers = FacetTestUtils.createIndexTaxonomyReaderPair(dirs);
386
387     // Create TFC and write cache to disk
388     File outputFile = _TestUtil.createTempFile("test", "tmp", TEMP_DIR);
389     TFC.store(outputFile, readers[0].indexReader, readers[0].taxReader, iParams, null);
390     
391     // Make the taxonomy grow without touching the index
392     for (int i = 0; i < 10; i++) {
393       writers[0].taxWriter.addCategory(new CategoryPath("foo", Integer.toString(i)));
394     }
395     writers[0].taxWriter.commit();
396     readers[0].taxReader.refresh();
397
398     initCache();
399
400     // With the bug, this next call should result in an exception
401     TFC.load(outputFile, readers[0].indexReader, readers[0].taxReader, iParams);
402     TotalFacetCounts totalCounts = TFC.getTotalCounts(
403         readers[0].indexReader, readers[0].taxReader, iParams, null);
404     assertReadFromDisc(totalCounts, 0, "after reading from disk.");
405     outputFile.delete();
406     writers[0].close();
407     readers[0].close();
408     IOUtils.close(dirs[0]);
409   }
410
411   /**
412    * Test that a new TFC is only calculated and placed in memory (by two
413    * threads who want it at the same time) only once.
414    */
415   @Test
416   public void testMemoryCacheSynchronization() throws Exception {
417     SlowRAMDirectory indexDir = new SlowRAMDirectory(-1, null);
418     SlowRAMDirectory taxoDir = new SlowRAMDirectory(-1, null);
419
420     // Write index using 'normal' directories
421     IndexWriter w = new IndexWriter(indexDir, new IndexWriterConfig(
422         TEST_VERSION_CURRENT, new MockAnalyzer(random, MockTokenizer.WHITESPACE, false)));
423     DirectoryTaxonomyWriter tw = new DirectoryTaxonomyWriter(taxoDir);
424     DefaultFacetIndexingParams iParams = new DefaultFacetIndexingParams();
425     // Add documents and facets
426     for (int i = 0; i < 1000; i++) {
427       addFacets(iParams, w, tw, "facet", Integer.toString(i));
428     }
429     w.close();
430     tw.close();
431
432     indexDir.setSleepMillis(1);
433     taxoDir.setSleepMillis(1);
434
435     IndexReader r = IndexReader.open(indexDir);
436     DirectoryTaxonomyReader tr = new DirectoryTaxonomyReader(taxoDir);
437
438     // Create and start threads. Thread1 should lock the cache and calculate
439     // the TFC array. The second thread should block until the first is
440     // done, then successfully retrieve from the cache without recalculating
441     // or reading from disk.
442     TFCThread tfcCalc1 = new TFCThread(r, tr, iParams);
443     TFCThread tfcCalc2 = new TFCThread(r, tr, iParams);
444     tfcCalc1.start();
445     // Give thread 1 a head start to ensure correct sequencing for testing
446     Thread.sleep(5);
447     tfcCalc2.start();
448
449     tfcCalc1.join();
450     tfcCalc2.join();
451
452     // Since this test ends up with references to the same TFC object, we
453     // can only test the times to make sure that they are the same.
454     assertRecomputed(tfcCalc1.tfc, 0, "thread 1 should recompute");
455     assertRecomputed(tfcCalc2.tfc, 0, "thread 2 should recompute");
456     assertTrue("Both results should be the same (as their inputs are the same objects)",
457         tfcCalc1.tfc == tfcCalc2.tfc);
458
459     r.close();
460     tr.close();
461   }
462
463   /**
464    * Simple test to make sure the TotalFacetCountsManager updates the
465    * TotalFacetCounts array only when it is supposed to, and whether it
466    * is recomputed or read from disk, but this time with TWO different
467    * TotalFacetCounts
468    */
469   @Test
470   public void testMultipleIndices() throws IOException {
471     // Create temporary RAMDirectories
472     Directory[][] dirs = FacetTestUtils.createIndexTaxonomyDirs(2);
473     // Create our index/taxonomy writers
474     IndexTaxonomyWriterPair[] writers = FacetTestUtils.createIndexTaxonomyWriterPair(dirs);
475     DefaultFacetIndexingParams iParams = new DefaultFacetIndexingParams();
476
477     // Add a facet to the index
478     addFacets(iParams, writers[0].indexWriter, writers[0].taxWriter, "a", "b");
479     addFacets(iParams, writers[1].indexWriter, writers[1].taxWriter, "d", "e");
480     // Commit Changes
481     writers[0].indexWriter.commit();
482     writers[0].taxWriter.commit();
483     writers[1].indexWriter.commit();
484     writers[1].taxWriter.commit();
485
486     // Open two readers
487     IndexTaxonomyReaderPair[] readers = FacetTestUtils.createIndexTaxonomyReaderPair(dirs);
488
489     // As this is the first time we have invoked the TotalFacetCountsManager, we
490     // should expect to compute.
491     TotalFacetCounts totalCounts0 = 
492       TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null);
493     int prevGen = -1;
494     prevGen = assertRecomputed(totalCounts0, prevGen, "after attempt 1");
495     assertTrue("attempt 1b for same input [0] shout find it in cache",
496         totalCounts0 == TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null));
497     
498     // 2nd Reader - As this is the first time we have invoked the
499     // TotalFacetCountsManager, we should expect a state of NEW to be returned.
500     TotalFacetCounts totalCounts1 = 
501       TFC.getTotalCounts(readers[1].indexReader, readers[1].taxReader, iParams, null);
502     prevGen = assertRecomputed(totalCounts1, prevGen, "after attempt 2");
503     assertTrue("attempt 2b for same input [1] shout find it in cache",
504         totalCounts1 == TFC.getTotalCounts(readers[1].indexReader, readers[1].taxReader, iParams, null));
505
506     // Right now cache size is one, so first TFC is gone and should be recomputed  
507     totalCounts0 = 
508       TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null);
509     prevGen = assertRecomputed(totalCounts0, prevGen, "after attempt 3");
510     
511     // Similarly will recompute the second result  
512     totalCounts1 = 
513       TFC.getTotalCounts(readers[1].indexReader, readers[1].taxReader, iParams, null);
514     prevGen = assertRecomputed(totalCounts1, prevGen, "after attempt 4");
515
516     // Now we set the cache size to two, meaning both should exist in the
517     // cache simultaneously
518     TFC.setCacheSize(2);
519
520     // Re-compute totalCounts0 (was evicted from the cache when the cache was smaller)
521     totalCounts0 = 
522       TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null);
523     prevGen = assertRecomputed(totalCounts0, prevGen, "after attempt 5");
524
525     // now both are in the larger cache and should not be recomputed 
526     totalCounts1 = TFC.getTotalCounts(readers[1].indexReader,
527         readers[1].taxReader, iParams, null);
528     assertTrue("with cache of size 2 res no. 0 should come from cache",
529         totalCounts0 == TFC.getTotalCounts(readers[0].indexReader, readers[0].taxReader, iParams, null));
530     assertTrue("with cache of size 2 res no. 1 should come from cache",
531         totalCounts1 == TFC.getTotalCounts(readers[1].indexReader, readers[1].taxReader, iParams, null));
532     
533     writers[0].close();
534     writers[1].close();
535     readers[0].close();
536     readers[1].close();
537     for (Directory[] dirset : dirs) {
538       IOUtils.close(dirset);
539     }
540   }
541
542 }