add --shared
[pylucene.git] / lucene-java-3.4.0 / lucene / contrib / benchmark / src / java / org / apache / lucene / benchmark / byTask / tasks / TaskSequence.java
1 package org.apache.lucene.benchmark.byTask.tasks;
2
3 /**
4  * Licensed to the Apache Software Foundation (ASF) under one or more
5  * contributor license agreements.  See the NOTICE file distributed with
6  * this work for additional information regarding copyright ownership.
7  * The ASF licenses this file to You under the Apache License, Version 2.0
8  * (the "License"); you may not use this file except in compliance with
9  * the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.text.NumberFormat;
23
24 import org.apache.lucene.benchmark.byTask.PerfRunData;
25 import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
26 import org.apache.lucene.benchmark.byTask.stats.TaskStats;
27 import org.apache.lucene.util.ArrayUtil;
28
29 /**
30  * Sequence of parallel or sequential tasks.
31  */
32 public class TaskSequence extends PerfTask {
33   public static int REPEAT_EXHAUST = -2; 
34   private ArrayList<PerfTask> tasks;
35   private int repetitions = 1;
36   private boolean parallel;
37   private TaskSequence parent;
38   private boolean letChildReport = true;
39   private int rate = 0;
40   private boolean perMin = false; // rate, if set, is, by default, be sec.
41   private String seqName;
42   private boolean exhausted = false;
43   private boolean resetExhausted = false;
44   private PerfTask[] tasksArray;
45   private boolean anyExhaustibleTasks;
46   private boolean collapsable = false; // to not collapse external sequence named in alg.  
47   
48   private boolean fixedTime;                      // true if we run for fixed time
49   private double runTimeSec;                      // how long to run for
50   private final long logByTimeMsec;
51
52   public TaskSequence (PerfRunData runData, String name, TaskSequence parent, boolean parallel) {
53     super(runData);
54     collapsable = (name == null);
55     name = (name!=null ? name : (parallel ? "Par" : "Seq"));
56     setName(name);
57     setSequenceName();
58     this.parent = parent;
59     this.parallel = parallel;
60     tasks = new ArrayList<PerfTask>();
61     logByTimeMsec = runData.getConfig().get("report.time.step.msec", 0);
62   }
63
64   @Override
65   public void close() throws Exception {
66     initTasksArray();
67     for(int i=0;i<tasksArray.length;i++) {
68       tasksArray[i].close();
69     }
70     getRunData().getDocMaker().close();
71   }
72
73   private void initTasksArray() {
74     if (tasksArray == null) {
75       final int numTasks = tasks.size();
76       tasksArray = new PerfTask[numTasks];
77       for(int k=0;k<numTasks;k++) {
78         tasksArray[k] = tasks.get(k);
79         anyExhaustibleTasks |= tasksArray[k] instanceof ResetInputsTask;
80         anyExhaustibleTasks |= tasksArray[k] instanceof TaskSequence;
81       }
82     }
83     if (!parallel && logByTimeMsec != 0 && !letChildReport) {
84       countsByTime = new int[1];
85     }
86   }
87
88   /**
89    * @return Returns the parallel.
90    */
91   public boolean isParallel() {
92     return parallel;
93   }
94
95   /**
96    * @return Returns the repetitions.
97    */
98   public int getRepetitions() {
99     return repetitions;
100   }
101
102   private int[] countsByTime;
103
104   public void setRunTime(double sec) throws Exception {
105     runTimeSec = sec;
106     fixedTime = true;
107   }
108
109   /**
110    * @param repetitions The repetitions to set.
111    * @throws Exception 
112    */
113   public void setRepetitions(int repetitions) throws Exception {
114     fixedTime = false;
115     this.repetitions = repetitions;
116     if (repetitions==REPEAT_EXHAUST) {
117       if (isParallel()) {
118         throw new Exception("REPEAT_EXHAUST is not allowed for parallel tasks");
119       }
120     }
121     setSequenceName();
122   }
123
124   /**
125    * @return Returns the parent.
126    */
127   public TaskSequence getParent() {
128     return parent;
129   }
130
131   /*
132    * (non-Javadoc)
133    * @see org.apache.lucene.benchmark.byTask.tasks.PerfTask#doLogic()
134    */
135   @Override
136   public int doLogic() throws Exception {
137     exhausted = resetExhausted = false;
138     return ( parallel ? doParallelTasks() : doSerialTasks());
139   }
140
141   private static class RunBackgroundTask extends Thread {
142     private final PerfTask task;
143     private final boolean letChildReport;
144     private volatile int count;
145
146     public RunBackgroundTask(PerfTask task, boolean letChildReport) {
147       this.task = task;
148       this.letChildReport = letChildReport;
149     }
150
151     public void stopNow() throws InterruptedException {
152       task.stopNow();
153     }
154
155     public int getCount() {
156       return count;
157     }
158
159     @Override
160     public void run() {
161       try {
162         count = task.runAndMaybeStats(letChildReport);
163       } catch (Exception e) {
164         throw new RuntimeException(e);
165       }
166     }
167   }
168
169   private int doSerialTasks() throws Exception {
170     if (rate > 0) {
171       return doSerialTasksWithRate();
172     }
173     
174     initTasksArray();
175     int count = 0;
176
177     final long runTime = (long) (runTimeSec*1000);
178     List<RunBackgroundTask> bgTasks = null;
179
180     final long t0 = System.currentTimeMillis();
181     for (int k=0; fixedTime || (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
182       if (stopNow) {
183         break;
184       }
185       for(int l=0;l<tasksArray.length;l++) {
186         final PerfTask task = tasksArray[l];
187         if (task.getRunInBackground()) {
188           if (bgTasks == null) {
189             bgTasks = new ArrayList<RunBackgroundTask>();
190           }
191           RunBackgroundTask bgTask = new RunBackgroundTask(task, letChildReport);
192           bgTask.setPriority(task.getBackgroundDeltaPriority() + Thread.currentThread().getPriority());
193           bgTask.start();
194           bgTasks.add(bgTask);
195         } else {
196           try {
197             final int inc = task.runAndMaybeStats(letChildReport);
198             count += inc;
199             if (countsByTime != null) {
200               final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
201               if (slot >= countsByTime.length) {
202                 countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
203               }
204               countsByTime[slot] += inc;
205             }
206             if (anyExhaustibleTasks)
207               updateExhausted(task);
208           } catch (NoMoreDataException e) {
209             exhausted = true;
210           }
211         }
212       }
213       if (fixedTime && System.currentTimeMillis()-t0 > runTime) {
214         repetitions = k+1;
215         break;
216       }
217     }
218
219     if (bgTasks != null) {
220       for(RunBackgroundTask bgTask : bgTasks) {
221         bgTask.stopNow();
222       }
223       for(RunBackgroundTask bgTask : bgTasks) {
224         bgTask.join();
225         count += bgTask.getCount();
226       }
227     }
228
229     if (countsByTime != null) {
230       getRunData().getPoints().getCurrentStats().setCountsByTime(countsByTime, logByTimeMsec);
231     }
232
233     stopNow = false;
234
235     return count;
236   }
237
238   private int doSerialTasksWithRate() throws Exception {
239     initTasksArray();
240     long delayStep = (perMin ? 60000 : 1000) /rate;
241     long nextStartTime = System.currentTimeMillis();
242     int count = 0;
243     final long t0 = System.currentTimeMillis();
244     for (int k=0; (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
245       if (stopNow) {
246         break;
247       }
248       for (int l=0;l<tasksArray.length;l++) {
249         final PerfTask task = tasksArray[l];
250         while(!stopNow) {
251           long waitMore = nextStartTime - System.currentTimeMillis();
252           if (waitMore > 0) {
253             // TODO: better to use condition to notify
254             Thread.sleep(1);
255           } else {
256             break;
257           }
258         }
259         if (stopNow) {
260           break;
261         }
262         nextStartTime += delayStep; // this aims at avarage rate. 
263         try {
264           final int inc = task.runAndMaybeStats(letChildReport);
265           count += inc;
266           if (countsByTime != null) {
267             final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
268             if (slot >= countsByTime.length) {
269               countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
270             }
271             countsByTime[slot] += inc;
272           }
273
274           if (anyExhaustibleTasks)
275             updateExhausted(task);
276         } catch (NoMoreDataException e) {
277           exhausted = true;
278         }
279       }
280     }
281     stopNow = false;
282     return count;
283   }
284
285   // update state regarding exhaustion.
286   private void updateExhausted(PerfTask task) {
287     if (task instanceof ResetInputsTask) {
288       exhausted = false;
289       resetExhausted = true;
290     } else if (task instanceof TaskSequence) {
291       TaskSequence t = (TaskSequence) task;
292       if (t.resetExhausted) {
293         exhausted = false;
294         resetExhausted = true;
295         t.resetExhausted = false;
296       } else {
297         exhausted |= t.exhausted;
298       }
299     }
300   }
301
302   private class ParallelTask extends Thread {
303
304     public int count;
305     public final PerfTask task;
306
307     public ParallelTask(PerfTask task) {
308       this.task = task;
309     }
310
311     @Override
312     public void run() {
313       try {
314         int n = task.runAndMaybeStats(letChildReport);
315         if (anyExhaustibleTasks) {
316           updateExhausted(task);
317         }
318         count += n;
319       } catch (NoMoreDataException e) {
320         exhausted = true;
321       } catch (Exception e) {
322         throw new RuntimeException(e);
323       }
324     }
325   }
326
327   @Override
328   public void stopNow() {
329     super.stopNow();
330     // Forwards top request to children
331     if (runningParallelTasks != null) {
332       for(ParallelTask t : runningParallelTasks) {
333         t.task.stopNow();
334       }
335     }
336   }
337
338   ParallelTask[] runningParallelTasks;
339
340   private int doParallelTasks() throws Exception {
341
342     final TaskStats stats = getRunData().getPoints().getCurrentStats();
343
344     initTasksArray();
345     ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
346     // prepare threads
347     int index = 0;
348     for (int k=0; k<repetitions; k++) {
349       for (int i = 0; i < tasksArray.length; i++) {
350         final PerfTask task = (PerfTask) tasksArray[i].clone();
351         t[index++] = new ParallelTask(task);
352       }
353     }
354     // run threads
355     startThreads(t);
356
357     // wait for all threads to complete
358     int count = 0;
359     for (int i = 0; i < t.length; i++) {
360       t[i].join();
361       count += t[i].count;
362       if (t[i].task instanceof TaskSequence) {
363         TaskSequence sub = (TaskSequence) t[i].task;
364         if (sub.countsByTime != null) {
365           if (countsByTime == null) {
366             countsByTime = new int[sub.countsByTime.length];
367           } else if (countsByTime.length < sub.countsByTime.length) {
368             countsByTime = ArrayUtil.grow(countsByTime, sub.countsByTime.length);
369           }
370           for(int j=0;j<sub.countsByTime.length;j++) {
371             countsByTime[j] += sub.countsByTime[j];
372           }
373         }
374       }
375     }
376
377     if (countsByTime != null) {
378       stats.setCountsByTime(countsByTime, logByTimeMsec);
379     }
380
381     // return total count
382     return count;
383   }
384
385   // run threads
386   private void startThreads(ParallelTask[] t) throws InterruptedException {
387     if (rate > 0) {
388       startlThreadsWithRate(t);
389       return;
390     }
391     for (int i = 0; i < t.length; i++) {
392       t[i].start();
393     }
394   }
395
396   // run threads with rate
397   private void startlThreadsWithRate(ParallelTask[] t) throws InterruptedException {
398     long delayStep = (perMin ? 60000 : 1000) /rate;
399     long nextStartTime = System.currentTimeMillis();
400     for (int i = 0; i < t.length; i++) {
401       long waitMore = nextStartTime - System.currentTimeMillis();
402       if (waitMore > 0) {
403         Thread.sleep(waitMore);
404       }
405       nextStartTime += delayStep; // this aims at average rate of starting threads. 
406       t[i].start();
407     }
408   }
409
410   public void addTask(PerfTask task) {
411     tasks.add(task);
412     task.setDepth(getDepth()+1);
413   }
414   
415   /* (non-Javadoc)
416    * @see java.lang.Object#toString()
417    */
418   @Override
419   public String toString() {
420     String padd = getPadding();
421     StringBuilder sb = new StringBuilder(super.toString());
422     sb.append(parallel ? " [" : " {");
423     sb.append(NEW_LINE);
424     for (final PerfTask task : tasks) {
425       sb.append(task.toString());
426       sb.append(NEW_LINE);
427     }
428     sb.append(padd);
429     sb.append(!letChildReport ? ">" : (parallel ? "]" : "}"));
430     if (fixedTime) {
431       sb.append(" " + NumberFormat.getNumberInstance().format(runTimeSec) + "s");
432     } else if (repetitions>1) {
433       sb.append(" * " + repetitions);
434     } else if (repetitions==REPEAT_EXHAUST) {
435       sb.append(" * EXHAUST");
436     }
437     if (rate>0) {
438       sb.append(",  rate: " + rate+"/"+(perMin?"min":"sec"));
439     }
440     if (getRunInBackground()) {
441       sb.append(" &");
442       int x = getBackgroundDeltaPriority();
443       if (x != 0) {
444         sb.append(x);
445       }
446     }
447     return sb.toString();
448   }
449
450   /**
451    * Execute child tasks in a way that they do not report their time separately.
452    */
453   public void setNoChildReport() {
454     letChildReport  = false;
455     for (final PerfTask task : tasks) {
456       if (task instanceof TaskSequence) {
457         ((TaskSequence)task).setNoChildReport();
458   }
459     }
460   }
461
462   /**
463    * Returns the rate per minute: how many operations should be performed in a minute.
464    * If 0 this has no effect.
465    * @return the rate per min: how many operations should be performed in a minute.
466    */
467   public int getRate() {
468     return (perMin ? rate : 60*rate);
469   }
470
471   /**
472    * @param rate The rate to set.
473    */
474   public void setRate(int rate, boolean perMin) {
475     this.rate = rate;
476     this.perMin = perMin;
477     setSequenceName();
478   }
479
480   private void setSequenceName() {
481     seqName = super.getName();
482     if (repetitions==REPEAT_EXHAUST) {
483       seqName += "_Exhaust";
484     } else if (repetitions>1) {
485       seqName += "_"+repetitions;
486     }
487     if (rate>0) {
488       seqName += "_" + rate + (perMin?"/min":"/sec"); 
489     }
490     if (parallel && seqName.toLowerCase().indexOf("par")<0) {
491       seqName += "_Par";
492     }
493   }
494
495   @Override
496   public String getName() {
497     return seqName; // override to include more info 
498   }
499
500   /**
501    * @return Returns the tasks.
502    */
503   public ArrayList<PerfTask> getTasks() {
504     return tasks;
505   }
506
507   /* (non-Javadoc)
508    * @see java.lang.Object#clone()
509    */
510   @Override
511   protected Object clone() throws CloneNotSupportedException {
512     TaskSequence res = (TaskSequence) super.clone();
513     res.tasks = new ArrayList<PerfTask>();
514     for (int i = 0; i < tasks.size(); i++) {
515       res.tasks.add((PerfTask)tasks.get(i).clone());
516     }
517     return res;
518   }
519
520   /**
521    * Return true if can be collapsed in case it is outermost sequence
522    */
523   public boolean isCollapsable() {
524     return collapsable;
525   }
526   
527 }