1 package org.apache.lucene.benchmark.byTask.tasks;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.text.NumberFormat;
24 import org.apache.lucene.benchmark.byTask.PerfRunData;
25 import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
26 import org.apache.lucene.benchmark.byTask.stats.TaskStats;
27 import org.apache.lucene.util.ArrayUtil;
30 * Sequence of parallel or sequential tasks.
32 public class TaskSequence extends PerfTask {
// Sentinel for repetitions: the serial run loops keep iterating while
// repetitions==REPEAT_EXHAUST and this sequence is not yet exhausted.
33 public static int REPEAT_EXHAUST = -2;
34 private ArrayList<PerfTask> tasks;
35 private int repetitions = 1;
36 private boolean parallel;
37 private TaskSequence parent;
38 private boolean letChildReport = true;
40 private boolean perMin = false; // false (default): the rate is per second; true: the rate is per minute.
41 private String seqName;
42 private boolean exhausted = false;
43 private boolean resetExhausted = false;
// Array copy of the tasks list, built lazily by initTasksArray().
44 private PerfTask[] tasksArray;
// True when any child is a ResetInputsTask or a nested TaskSequence (computed in initTasksArray()).
45 private boolean anyExhaustibleTasks;
46 private boolean collapsable = false; // set from (name == null) in the constructor, so a sequence named in the alg is not collapsed.
48 private boolean fixedTime; // true if we run for fixed time
49 private double runTimeSec; // how long to run for, in seconds (multiplied by 1000 to get msec in doSerialTasks)
// Width, in msec, of one countsByTime slot; by-time counting is only set up when this is non-zero.
50 private final long logByTimeMsec;
// Creates a sequence of child tasks.
// runData  - run context; its config supplies the report.time.step.msec setting.
// name     - sequence name; null marks the sequence as collapsable and is replaced
//            by Par (parallel) or Seq (serial).
// parent   - enclosing sequence (NOTE(review): its assignment is not visible in this view).
// parallel - whether the children run in parallel rather than serially.
52 public TaskSequence (PerfRunData runData, String name, TaskSequence parent, boolean parallel) {
// only an unnamed sequence may be collapsed
54 collapsable = (name == null);
55 name = (name!=null ? name : (parallel ? "Par" : "Seq"));
59 this.parallel = parallel;
60 tasks = new ArrayList<PerfTask>();
// 0 is passed as the second argument - presumably the default when the property is unset; confirm against Config.get
61 logByTimeMsec = runData.getConfig().get("report.time.step.msec", 0);
// Closes every task held in tasksArray and closes the doc maker obtained from the run data.
// Any exception thrown by a close() call propagates to the caller.
65 public void close() throws Exception {
67 for(int i=0;i<tasksArray.length;i++) {
68 tasksArray[i].close();
70 getRunData().getDocMaker().close();
// Lazily builds tasksArray from the tasks list and derives the flags that depend on the children.
73 private void initTasksArray() {
// build only once: later calls find tasksArray already set
74 if (tasksArray == null) {
75 final int numTasks = tasks.size();
76 tasksArray = new PerfTask[numTasks];
77 for(int k=0;k<numTasks;k++) {
78 tasksArray[k] = tasks.get(k);
// a ResetInputsTask or a nested TaskSequence child makes exhaustion tracking necessary
79 anyExhaustibleTasks |= tasksArray[k] instanceof ResetInputsTask;
80 anyExhaustibleTasks |= tasksArray[k] instanceof TaskSequence;
// by-time counters are allocated only for a serial sequence with a non-zero
// log step whose children do not report on their own
83 if (!parallel && logByTimeMsec != 0 && !letChildReport) {
// starts with a single slot; the run loops grow the array as time advances
84 countsByTime = new int[1];
89 * @return Returns the parallel.
91 public boolean isParallel() {
96 * @return Returns the repetitions.
98 public int getRepetitions() {
102 private int[] countsByTime;
104 public void setRunTime(double sec) throws Exception {
110 * @param repetitions The repetitions to set.
// Stores the number of repetitions for this sequence.
// Throws Exception with a message saying REPEAT_EXHAUST is not allowed for parallel tasks.
113 public void setRepetitions(int repetitions) throws Exception {
115 this.repetitions = repetitions;
116 if (repetitions==REPEAT_EXHAUST) {
// NOTE(review): the guard around this throw is not visible in this view; per the message it applies to parallel sequences - confirm
118 throw new Exception("REPEAT_EXHAUST is not allowed for parallel tasks");
125 * @return Returns the parent.
127 public TaskSequence getParent() {
133 * @see org.apache.lucene.benchmark.byTask.tasks.PerfTask#doLogic()
// Runs the sequence: clears both exhaustion flags, then dispatches to the parallel
// or the serial runner and returns that runner's result.
136 public int doLogic() throws Exception {
// start every run from a non-exhausted state
137 exhausted = resetExhausted = false;
138 return ( parallel ? doParallelTasks() : doSerialTasks());
// Thread that runs a single PerfTask and keeps the value returned by
// runAndMaybeStats so that it can be queried later.
141 private static class RunBackgroundTask extends Thread {
142 private final PerfTask task;
143 private final boolean letChildReport;
// assigned from task.runAndMaybeStats(...) below; declared volatile
144 private volatile int count;
146 public RunBackgroundTask(PerfTask task, boolean letChildReport) {
148 this.letChildReport = letChildReport;
151 public void stopNow() throws InterruptedException {
155 public int getCount() {
162 count = task.runAndMaybeStats(letChildReport);
// any exception from the task is rethrown unchecked, with the original as the cause
163 } catch (Exception e) {
164 throw new RuntimeException(e);
// Runs the child tasks one after another, repeating the whole list for a fixed time,
// until exhaustion, or for a fixed number of repetitions. Children flagged to run in
// the background are wrapped in RunBackgroundTask threads and their counts are added
// to the total at the end.
169 private int doSerialTasks() throws Exception {
// delegates to the rate-limited variant (NOTE(review): the guard for this return is not visible in this view)
171 return doSerialTasksWithRate();
// run time converted from seconds to milliseconds
177 final long runTime = (long) (runTimeSec*1000);
178 List<RunBackgroundTask> bgTasks = null;
180 final long t0 = System.currentTimeMillis();
// keep looping when running for fixed time, or in exhaust mode while not exhausted, or while repetitions remain
181 for (int k=0; fixedTime || (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
185 for(int l=0;l<tasksArray.length;l++) {
186 final PerfTask task = tasksArray[l];
187 if (task.getRunInBackground()) {
// the background list is created on first use
188 if (bgTasks == null) {
189 bgTasks = new ArrayList<RunBackgroundTask>();
191 RunBackgroundTask bgTask = new RunBackgroundTask(task, letChildReport);
// background priority is the current thread priority shifted by the delta of the task
192 bgTask.setPriority(task.getBackgroundDeltaPriority() + Thread.currentThread().getPriority());
197 final int inc = task.runAndMaybeStats(letChildReport);
199 if (countsByTime != null) {
// slot index = elapsed msec since t0 divided by the log step
200 final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
201 if (slot >= countsByTime.length) {
202 countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
204 countsByTime[slot] += inc;
206 if (anyExhaustibleTasks)
207 updateExhausted(task);
208 } catch (NoMoreDataException e) {
// fixed-time mode: detect that the allotted run time has elapsed
213 if (fixedTime && System.currentTimeMillis()-t0 > runTime) {
219 if (bgTasks != null) {
220 for(RunBackgroundTask bgTask : bgTasks) {
223 for(RunBackgroundTask bgTask : bgTasks) {
// counts of the background tasks are added to the total
225 count += bgTask.getCount();
229 if (countsByTime != null) {
// hand the by-time counters, with their slot width, to the current stats object
230 getRunData().getPoints().getCurrentStats().setCountsByTime(countsByTime, logByTimeMsec);
// Serial runner that paces task starts according to rate (per second, or per minute when perMin is set).
238 private int doSerialTasksWithRate() throws Exception {
// msec between consecutive task starts (integer division)
240 long delayStep = (perMin ? 60000 : 1000) /rate;
241 long nextStartTime = System.currentTimeMillis();
243 final long t0 = System.currentTimeMillis();
// keep looping in exhaust mode while not exhausted, or while repetitions remain
244 for (int k=0; (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
248 for (int l=0;l<tasksArray.length;l++) {
249 final PerfTask task = tasksArray[l];
// time still left before this task is due to start
251 long waitMore = nextStartTime - System.currentTimeMillis();
253 // TODO: better to use condition to notify
// the schedule advances by a fixed step from the previous slot, not from the actual start time
262 nextStartTime += delayStep; // this aims at average rate.
264 final int inc = task.runAndMaybeStats(letChildReport);
266 if (countsByTime != null) {
// slot index = elapsed msec since t0 divided by the log step
267 final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
268 if (slot >= countsByTime.length) {
269 countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
271 countsByTime[slot] += inc;
274 if (anyExhaustibleTasks)
275 updateExhausted(task);
276 } catch (NoMoreDataException e) {
285 // update state regarding exhaustion.
// Updates the exhaustion flags of this sequence from the child task that was just run.
286 private void updateExhausted(PerfTask task) {
287 if (task instanceof ResetInputsTask) {
// a ResetInputsTask child marks this sequence as having had its inputs reset
289 resetExhausted = true;
290 } else if (task instanceof TaskSequence) {
291 TaskSequence t = (TaskSequence) task;
292 if (t.resetExhausted) {
// take over the reset flag from the nested sequence and clear it there
294 resetExhausted = true;
295 t.resetExhausted = false;
// OR in the exhausted flag of the nested sequence
297 exhausted |= t.exhausted;
// Thread wrapper that runs one PerfTask. It is a non-static inner class: it reads
// letChildReport and anyExhaustibleTasks and calls updateExhausted on the enclosing sequence.
302 private class ParallelTask extends Thread {
305 public final PerfTask task;
307 public ParallelTask(PerfTask task) {
314 int n = task.runAndMaybeStats(letChildReport);
315 if (anyExhaustibleTasks) {
316 updateExhausted(task);
// NoMoreDataException has its own handler, separate from the generic one below (handler body not visible in this view)
319 } catch (NoMoreDataException e) {
// any other exception is rethrown unchecked, with the original as the cause
321 } catch (Exception e) {
322 throw new RuntimeException(e);
// Handles a stop request for this sequence; when parallel child threads have been
// recorded in runningParallelTasks, each of them is visited.
328 public void stopNow() {
330 // Forwards stop request to children
// runningParallelTasks is assigned in doParallelTasks
331 if (runningParallelTasks != null) {
332 for(ParallelTask t : runningParallelTasks) {
338 ParallelTask[] runningParallelTasks;
// Runs the child tasks in parallel: one ParallelTask thread per (repetition, task) pair,
// then merges the by-time counters of nested sequences into this sequence.
340 private int doParallelTasks() throws Exception {
342 final TaskStats stats = getRunData().getPoints().getCurrentStats();
// the same array is also stored in runningParallelTasks, which stopNow() iterates
345 ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
348 for (int k=0; k<repetitions; k++) {
349 for (int i = 0; i < tasksArray.length; i++) {
// every thread gets its own clone of the task
350 final PerfTask task = (PerfTask) tasksArray[i].clone();
351 t[index++] = new ParallelTask(task);
357 // wait for all threads to complete
359 for (int i = 0; i < t.length; i++) {
// only nested sequences carry by-time counters to merge
362 if (t[i].task instanceof TaskSequence) {
363 TaskSequence sub = (TaskSequence) t[i].task;
364 if (sub.countsByTime != null) {
// allocate, or grow, so that the local array is at least as long as that of the child
365 if (countsByTime == null) {
366 countsByTime = new int[sub.countsByTime.length];
367 } else if (countsByTime.length < sub.countsByTime.length) {
368 countsByTime = ArrayUtil.grow(countsByTime, sub.countsByTime.length);
// element-wise sum of the child counters into the local ones
370 for(int j=0;j<sub.countsByTime.length;j++) {
371 countsByTime[j] += sub.countsByTime[j];
377 if (countsByTime != null) {
// hand the merged counters, with their slot width, to the stats object fetched at the top
378 stats.setCountsByTime(countsByTime, logByTimeMsec);
381 // return total count
// Starts the given threads, either through the rate-limited starter or through a
// plain loop over the array.
// NOTE(review): the condition selecting between the two paths is not visible in this view.
386 private void startThreads(ParallelTask[] t) throws InterruptedException {
388 startlThreadsWithRate(t);
391 for (int i = 0; i < t.length; i++) {
396 // run threads with rate
// Paces the start of the given threads according to rate (per second, or per minute when perMin is set).
397 private void startlThreadsWithRate(ParallelTask[] t) throws InterruptedException {
// msec between consecutive thread starts (integer division)
398 long delayStep = (perMin ? 60000 : 1000) /rate;
399 long nextStartTime = System.currentTimeMillis();
400 for (int i = 0; i < t.length; i++) {
// time still left before the next scheduled start
401 long waitMore = nextStartTime - System.currentTimeMillis();
403 Thread.sleep(waitMore);
// the schedule advances by a fixed step from the previous slot, not from the actual wake-up time
405 nextStartTime += delayStep; // this aims at average rate of starting threads.
// Adds a child task to this sequence.
410 public void addTask(PerfTask task) {
// the child sits one nesting level deeper than this sequence
412 task.setDepth(getDepth()+1);
416 * @see java.lang.Object#toString()
// Builds a textual form of the sequence: the PerfTask description, the child tasks
// inside brackets, then suffixes for run time or repetitions, rate and background settings.
419 public String toString() {
420 String padd = getPadding();
421 StringBuilder sb = new StringBuilder(super.toString());
// opening bracket: square for a parallel sequence, curly for a serial one
422 sb.append(parallel ? " [" : " {");
424 for (final PerfTask task : tasks) {
425 sb.append(task.toString());
// closing bracket: an angle bracket when children do not report, otherwise the match of the opening bracket
429 sb.append(!letChildReport ? ">" : (parallel ? "]" : "}"));
// run time in seconds, formatted with the default-locale number format
431 sb.append(" " + NumberFormat.getNumberInstance().format(runTimeSec) + "s");
432 } else if (repetitions>1) {
433 sb.append(" * " + repetitions);
434 } else if (repetitions==REPEAT_EXHAUST) {
435 sb.append(" * EXHAUST");
438 sb.append(", rate: " + rate+"/"+(perMin?"min":"sec"));
440 if (getRunInBackground()) {
442 int x = getBackgroundDeltaPriority();
447 return sb.toString();
451 * Execute child tasks in a way that they do not report their time separately.
// Turns off child reporting for this sequence and, recursively, for every nested TaskSequence.
453 public void setNoChildReport() {
454 letChildReport = false;
455 for (final PerfTask task : tasks) {
// only nested sequences have the setting; other tasks are left as they are
456 if (task instanceof TaskSequence) {
457 ((TaskSequence)task).setNoChildReport();
463 * Returns the rate per minute: how many operations should be performed in a minute.
464 * If 0 this has no effect.
465 * @return the rate per min: how many operations should be performed in a minute.
// Returns the rate normalized to operations per minute.
467 public int getRate() {
// a per-minute rate is returned as is; a per-second rate is multiplied by 60
468 return (perMin ? rate : 60*rate);
472 * @param rate The rate to set.
// Sets the rate and its unit.
// perMin - true when the given rate is per minute; false when it is per second.
// NOTE(review): the statement storing the rate value itself is not visible in this view.
474 public void setRate(int rate, boolean perMin) {
476 this.perMin = perMin;
// Rebuilds seqName from the PerfTask name plus suffixes describing repetitions and rate.
480 private void setSequenceName() {
481 seqName = super.getName();
482 if (repetitions==REPEAT_EXHAUST) {
483 seqName += "_Exhaust";
484 } else if (repetitions>1) {
485 seqName += "_"+repetitions;
// rate suffix with its unit
488 seqName += "_" + rate + (perMin?"/min":"/sec");
// a parallel sequence whose name does not already contain par (case-insensitive) gets special handling
// (the handling itself is not visible in this view)
490 if (parallel && seqName.toLowerCase().indexOf("par")<0) {
// Returns the sequence name built by setSequenceName().
496 public String getName() {
497 return seqName; // override to include more info
501 * @return Returns the tasks.
503 public ArrayList<PerfTask> getTasks() {
508 * @see java.lang.Object#clone()
// Clones this sequence and gives the copy its own task list, holding a clone of
// every child task, so that the copy does not share children with the original.
511 protected Object clone() throws CloneNotSupportedException {
512 TaskSequence res = (TaskSequence) super.clone();
// replace the list reference copied by super.clone() with a fresh list
513 res.tasks = new ArrayList<PerfTask>();
514 for (int i = 0; i < tasks.size(); i++) {
515 res.tasks.add((PerfTask)tasks.get(i).clone());
521 * Return true if can be collapsed in case it is outermost sequence
523 public boolean isCollapsable() {