1 package org.apache.lucene.benchmark.byTask.tasks;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.text.NumberFormat;
24 import org.apache.lucene.benchmark.byTask.PerfRunData;
25 import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
26 import org.apache.lucene.benchmark.byTask.stats.TaskStats;
27 import org.apache.lucene.util.ArrayUtil;
30 * Sequence of parallel or sequential tasks.
32 public class TaskSequence extends PerfTask {
33 public static int REPEAT_EXHAUST = -2;
34 private ArrayList<PerfTask> tasks;
35 private int repetitions = 1;
36 private boolean parallel;
37 private TaskSequence parent;
38 private boolean letChildReport = true;
40 private boolean perMin = false; // rate, if set, is, by default, be sec.
41 private String seqName;
42 private boolean exhausted = false;
43 private boolean resetExhausted = false;
44 private PerfTask[] tasksArray;
45 private boolean anyExhaustibleTasks;
46 private boolean collapsable = false; // to not collapse external sequence named in alg.
48 private boolean fixedTime; // true if we run for fixed time
49 private double runTimeSec; // how long to run for
50 private final long logByTimeMsec;
52 public TaskSequence (PerfRunData runData, String name, TaskSequence parent, boolean parallel) {
54 collapsable = (name == null);
55 name = (name!=null ? name : (parallel ? "Par" : "Seq"));
59 this.parallel = parallel;
60 tasks = new ArrayList<PerfTask>();
61 logByTimeMsec = runData.getConfig().get("report.time.step.msec", 0);
65 public void close() throws Exception {
67 for(int i=0;i<tasksArray.length;i++) {
68 tasksArray[i].close();
70 getRunData().getDocMaker().close();
73 private void initTasksArray() {
74 if (tasksArray == null) {
75 final int numTasks = tasks.size();
76 tasksArray = new PerfTask[numTasks];
77 for(int k=0;k<numTasks;k++) {
78 tasksArray[k] = tasks.get(k);
79 anyExhaustibleTasks |= tasksArray[k] instanceof ResetInputsTask;
80 anyExhaustibleTasks |= tasksArray[k] instanceof TaskSequence;
83 if (!parallel && logByTimeMsec != 0 && !letChildReport) {
84 countsByTime = new int[1];
89 * @return Returns the parallel.
91 public boolean isParallel() {
96 * @return Returns the repetitions.
98 public int getRepetitions() {
102 private int[] countsByTime;
104 public void setRunTime(double sec) throws Exception {
110 * @param repetitions The repetitions to set.
113 public void setRepetitions(int repetitions) throws Exception {
115 this.repetitions = repetitions;
116 if (repetitions==REPEAT_EXHAUST) {
118 throw new Exception("REPEAT_EXHAUST is not allowed for parallel tasks");
125 * @return Returns the parent.
127 public TaskSequence getParent() {
133 * @see org.apache.lucene.benchmark.byTask.tasks.PerfTask#doLogic()
136 public int doLogic() throws Exception {
137 exhausted = resetExhausted = false;
138 return ( parallel ? doParallelTasks() : doSerialTasks());
141 private static class RunBackgroundTask extends Thread {
142 private final PerfTask task;
143 private final boolean letChildReport;
144 private volatile int count;
146 public RunBackgroundTask(PerfTask task, boolean letChildReport) {
148 this.letChildReport = letChildReport;
151 public void stopNow() throws InterruptedException {
155 public int getCount() {
162 count = task.runAndMaybeStats(letChildReport);
163 } catch (Exception e) {
164 throw new RuntimeException(e);
169 private int doSerialTasks() throws Exception {
171 return doSerialTasksWithRate();
177 final long runTime = (long) (runTimeSec*1000);
178 List<RunBackgroundTask> bgTasks = null;
180 final long t0 = System.currentTimeMillis();
181 for (int k=0; fixedTime || (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
185 for(int l=0;l<tasksArray.length;l++) {
186 final PerfTask task = tasksArray[l];
187 if (task.getRunInBackground()) {
188 if (bgTasks == null) {
189 bgTasks = new ArrayList<RunBackgroundTask>();
191 RunBackgroundTask bgTask = new RunBackgroundTask(task, letChildReport);
192 bgTask.setPriority(task.getBackgroundDeltaPriority() + Thread.currentThread().getPriority());
197 final int inc = task.runAndMaybeStats(letChildReport);
199 if (countsByTime != null) {
200 final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
201 if (slot >= countsByTime.length) {
202 countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
204 countsByTime[slot] += inc;
206 if (anyExhaustibleTasks)
207 updateExhausted(task);
208 } catch (NoMoreDataException e) {
213 if (fixedTime && System.currentTimeMillis()-t0 > runTime) {
219 if (bgTasks != null) {
220 for(RunBackgroundTask bgTask : bgTasks) {
223 for(RunBackgroundTask bgTask : bgTasks) {
225 count += bgTask.getCount();
229 if (countsByTime != null) {
230 getRunData().getPoints().getCurrentStats().setCountsByTime(countsByTime, logByTimeMsec);
238 private int doSerialTasksWithRate() throws Exception {
240 long delayStep = (perMin ? 60000 : 1000) /rate;
241 long nextStartTime = System.currentTimeMillis();
243 final long t0 = System.currentTimeMillis();
244 for (int k=0; (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
248 for (int l=0;l<tasksArray.length;l++) {
249 final PerfTask task = tasksArray[l];
251 long waitMore = nextStartTime - System.currentTimeMillis();
253 // TODO: better to use condition to notify
262 nextStartTime += delayStep; // this aims at avarage rate.
264 final int inc = task.runAndMaybeStats(letChildReport);
266 if (countsByTime != null) {
267 final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
268 if (slot >= countsByTime.length) {
269 countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
271 countsByTime[slot] += inc;
274 if (anyExhaustibleTasks)
275 updateExhausted(task);
276 } catch (NoMoreDataException e) {
285 // update state regarding exhaustion.
286 private void updateExhausted(PerfTask task) {
287 if (task instanceof ResetInputsTask) {
289 resetExhausted = true;
290 } else if (task instanceof TaskSequence) {
291 TaskSequence t = (TaskSequence) task;
292 if (t.resetExhausted) {
294 resetExhausted = true;
295 t.resetExhausted = false;
297 exhausted |= t.exhausted;
302 private class ParallelTask extends Thread {
305 public final PerfTask task;
307 public ParallelTask(PerfTask task) {
311 @SuppressWarnings("synthetic-access")
315 int n = task.runAndMaybeStats(letChildReport);
316 if (anyExhaustibleTasks) {
317 updateExhausted(task);
320 } catch (NoMoreDataException e) {
322 } catch (Exception e) {
323 throw new RuntimeException(e);
329 public void stopNow() {
331 // Forwards top request to children
332 if (runningParallelTasks != null) {
333 for(ParallelTask t : runningParallelTasks) {
339 ParallelTask[] runningParallelTasks;
341 private int doParallelTasks() throws Exception {
343 final TaskStats stats = getRunData().getPoints().getCurrentStats();
346 ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
349 for (int k=0; k<repetitions; k++) {
350 for (int i = 0; i < tasksArray.length; i++) {
351 final PerfTask task = (PerfTask) tasksArray[i].clone();
352 t[index++] = new ParallelTask(task);
358 // wait for all threads to complete
360 for (int i = 0; i < t.length; i++) {
363 if (t[i].task instanceof TaskSequence) {
364 TaskSequence sub = (TaskSequence) t[i].task;
365 if (sub.countsByTime != null) {
366 if (countsByTime == null) {
367 countsByTime = new int[sub.countsByTime.length];
368 } else if (countsByTime.length < sub.countsByTime.length) {
369 countsByTime = ArrayUtil.grow(countsByTime, sub.countsByTime.length);
371 for(int j=0;j<sub.countsByTime.length;j++) {
372 countsByTime[j] += sub.countsByTime[j];
378 if (countsByTime != null) {
379 stats.setCountsByTime(countsByTime, logByTimeMsec);
382 // return total count
387 private void startThreads(ParallelTask[] t) throws InterruptedException {
389 startlThreadsWithRate(t);
392 for (int i = 0; i < t.length; i++) {
397 // run threads with rate
398 private void startlThreadsWithRate(ParallelTask[] t) throws InterruptedException {
399 long delayStep = (perMin ? 60000 : 1000) /rate;
400 long nextStartTime = System.currentTimeMillis();
401 for (int i = 0; i < t.length; i++) {
402 long waitMore = nextStartTime - System.currentTimeMillis();
404 Thread.sleep(waitMore);
406 nextStartTime += delayStep; // this aims at average rate of starting threads.
411 public void addTask(PerfTask task) {
413 task.setDepth(getDepth()+1);
417 * @see java.lang.Object#toString()
420 public String toString() {
421 String padd = getPadding();
422 StringBuilder sb = new StringBuilder(super.toString());
423 sb.append(parallel ? " [" : " {");
425 for (final PerfTask task : tasks) {
426 sb.append(task.toString());
430 sb.append(!letChildReport ? ">" : (parallel ? "]" : "}"));
432 sb.append(" " + NumberFormat.getNumberInstance().format(runTimeSec) + "s");
433 } else if (repetitions>1) {
434 sb.append(" * " + repetitions);
435 } else if (repetitions==REPEAT_EXHAUST) {
436 sb.append(" * EXHAUST");
439 sb.append(", rate: " + rate+"/"+(perMin?"min":"sec"));
441 if (getRunInBackground()) {
443 int x = getBackgroundDeltaPriority();
448 return sb.toString();
452 * Execute child tasks in a way that they do not report their time separately.
454 public void setNoChildReport() {
455 letChildReport = false;
456 for (final PerfTask task : tasks) {
457 if (task instanceof TaskSequence) {
458 ((TaskSequence)task).setNoChildReport();
464 * Returns the rate per minute: how many operations should be performed in a minute.
465 * If 0 this has no effect.
466 * @return the rate per min: how many operations should be performed in a minute.
468 public int getRate() {
469 return (perMin ? rate : 60*rate);
473 * @param rate The rate to set.
475 public void setRate(int rate, boolean perMin) {
477 this.perMin = perMin;
481 private void setSequenceName() {
482 seqName = super.getName();
483 if (repetitions==REPEAT_EXHAUST) {
484 seqName += "_Exhaust";
485 } else if (repetitions>1) {
486 seqName += "_"+repetitions;
489 seqName += "_" + rate + (perMin?"/min":"/sec");
491 if (parallel && seqName.toLowerCase().indexOf("par")<0) {
497 public String getName() {
498 return seqName; // override to include more info
502 * @return Returns the tasks.
504 public ArrayList<PerfTask> getTasks() {
509 * @see java.lang.Object#clone()
512 protected Object clone() throws CloneNotSupportedException {
513 TaskSequence res = (TaskSequence) super.clone();
514 res.tasks = new ArrayList<PerfTask>();
515 for (int i = 0; i < tasks.size(); i++) {
516 res.tasks.add((PerfTask)tasks.get(i).clone());
522 * Return true if can be collapsed in case it is outermost sequence
524 public boolean isCollapsable() {