--- /dev/null
+package org.apache.lucene.benchmark.byTask.tasks;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.text.NumberFormat;
+
+import org.apache.lucene.benchmark.byTask.PerfRunData;
+import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
+import org.apache.lucene.benchmark.byTask.stats.TaskStats;
+import org.apache.lucene.util.ArrayUtil;
+
+/**
+ * Sequence of parallel or sequential tasks.
+ */
+public class TaskSequence extends PerfTask {
+ public static int REPEAT_EXHAUST = -2;
+ private ArrayList<PerfTask> tasks;
+ private int repetitions = 1;
+ private boolean parallel;
+ private TaskSequence parent;
+ private boolean letChildReport = true;
+ private int rate = 0;
+ private boolean perMin = false; // rate, if set, is, by default, be sec.
+ private String seqName;
+ private boolean exhausted = false;
+ private boolean resetExhausted = false;
+ private PerfTask[] tasksArray;
+ private boolean anyExhaustibleTasks;
+ private boolean collapsable = false; // to not collapse external sequence named in alg.
+
+ private boolean fixedTime; // true if we run for fixed time
+ private double runTimeSec; // how long to run for
+ private final long logByTimeMsec;
+
+ public TaskSequence (PerfRunData runData, String name, TaskSequence parent, boolean parallel) {
+ super(runData);
+ collapsable = (name == null);
+ name = (name!=null ? name : (parallel ? "Par" : "Seq"));
+ setName(name);
+ setSequenceName();
+ this.parent = parent;
+ this.parallel = parallel;
+ tasks = new ArrayList<PerfTask>();
+ logByTimeMsec = runData.getConfig().get("report.time.step.msec", 0);
+ }
+
+ @Override
+ public void close() throws Exception {
+ initTasksArray();
+ for(int i=0;i<tasksArray.length;i++) {
+ tasksArray[i].close();
+ }
+ getRunData().getDocMaker().close();
+ }
+
+ private void initTasksArray() {
+ if (tasksArray == null) {
+ final int numTasks = tasks.size();
+ tasksArray = new PerfTask[numTasks];
+ for(int k=0;k<numTasks;k++) {
+ tasksArray[k] = tasks.get(k);
+ anyExhaustibleTasks |= tasksArray[k] instanceof ResetInputsTask;
+ anyExhaustibleTasks |= tasksArray[k] instanceof TaskSequence;
+ }
+ }
+ if (!parallel && logByTimeMsec != 0 && !letChildReport) {
+ countsByTime = new int[1];
+ }
+ }
+
+ /**
+ * @return Returns the parallel.
+ */
+ public boolean isParallel() {
+ return parallel;
+ }
+
+ /**
+ * @return Returns the repetitions.
+ */
+ public int getRepetitions() {
+ return repetitions;
+ }
+
+ private int[] countsByTime;
+
+ public void setRunTime(double sec) throws Exception {
+ runTimeSec = sec;
+ fixedTime = true;
+ }
+
+ /**
+ * @param repetitions The repetitions to set.
+ * @throws Exception
+ */
+ public void setRepetitions(int repetitions) throws Exception {
+ fixedTime = false;
+ this.repetitions = repetitions;
+ if (repetitions==REPEAT_EXHAUST) {
+ if (isParallel()) {
+ throw new Exception("REPEAT_EXHAUST is not allowed for parallel tasks");
+ }
+ }
+ setSequenceName();
+ }
+
+ /**
+ * @return Returns the parent.
+ */
+ public TaskSequence getParent() {
+ return parent;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.lucene.benchmark.byTask.tasks.PerfTask#doLogic()
+ */
+ @Override
+ public int doLogic() throws Exception {
+ exhausted = resetExhausted = false;
+ return ( parallel ? doParallelTasks() : doSerialTasks());
+ }
+
+ private static class RunBackgroundTask extends Thread {
+ private final PerfTask task;
+ private final boolean letChildReport;
+ private volatile int count;
+
+ public RunBackgroundTask(PerfTask task, boolean letChildReport) {
+ this.task = task;
+ this.letChildReport = letChildReport;
+ }
+
+ public void stopNow() throws InterruptedException {
+ task.stopNow();
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public void run() {
+ try {
+ count = task.runAndMaybeStats(letChildReport);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private int doSerialTasks() throws Exception {
+ if (rate > 0) {
+ return doSerialTasksWithRate();
+ }
+
+ initTasksArray();
+ int count = 0;
+
+ final long runTime = (long) (runTimeSec*1000);
+ List<RunBackgroundTask> bgTasks = null;
+
+ final long t0 = System.currentTimeMillis();
+ for (int k=0; fixedTime || (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
+ if (stopNow) {
+ break;
+ }
+ for(int l=0;l<tasksArray.length;l++) {
+ final PerfTask task = tasksArray[l];
+ if (task.getRunInBackground()) {
+ if (bgTasks == null) {
+ bgTasks = new ArrayList<RunBackgroundTask>();
+ }
+ RunBackgroundTask bgTask = new RunBackgroundTask(task, letChildReport);
+ bgTask.setPriority(task.getBackgroundDeltaPriority() + Thread.currentThread().getPriority());
+ bgTask.start();
+ bgTasks.add(bgTask);
+ } else {
+ try {
+ final int inc = task.runAndMaybeStats(letChildReport);
+ count += inc;
+ if (countsByTime != null) {
+ final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
+ if (slot >= countsByTime.length) {
+ countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
+ }
+ countsByTime[slot] += inc;
+ }
+ if (anyExhaustibleTasks)
+ updateExhausted(task);
+ } catch (NoMoreDataException e) {
+ exhausted = true;
+ }
+ }
+ }
+ if (fixedTime && System.currentTimeMillis()-t0 > runTime) {
+ repetitions = k+1;
+ break;
+ }
+ }
+
+ if (bgTasks != null) {
+ for(RunBackgroundTask bgTask : bgTasks) {
+ bgTask.stopNow();
+ }
+ for(RunBackgroundTask bgTask : bgTasks) {
+ bgTask.join();
+ count += bgTask.getCount();
+ }
+ }
+
+ if (countsByTime != null) {
+ getRunData().getPoints().getCurrentStats().setCountsByTime(countsByTime, logByTimeMsec);
+ }
+
+ stopNow = false;
+
+ return count;
+ }
+
+ private int doSerialTasksWithRate() throws Exception {
+ initTasksArray();
+ long delayStep = (perMin ? 60000 : 1000) /rate;
+ long nextStartTime = System.currentTimeMillis();
+ int count = 0;
+ final long t0 = System.currentTimeMillis();
+ for (int k=0; (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
+ if (stopNow) {
+ break;
+ }
+ for (int l=0;l<tasksArray.length;l++) {
+ final PerfTask task = tasksArray[l];
+ while(!stopNow) {
+ long waitMore = nextStartTime - System.currentTimeMillis();
+ if (waitMore > 0) {
+ // TODO: better to use condition to notify
+ Thread.sleep(1);
+ } else {
+ break;
+ }
+ }
+ if (stopNow) {
+ break;
+ }
+ nextStartTime += delayStep; // this aims at avarage rate.
+ try {
+ final int inc = task.runAndMaybeStats(letChildReport);
+ count += inc;
+ if (countsByTime != null) {
+ final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
+ if (slot >= countsByTime.length) {
+ countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
+ }
+ countsByTime[slot] += inc;
+ }
+
+ if (anyExhaustibleTasks)
+ updateExhausted(task);
+ } catch (NoMoreDataException e) {
+ exhausted = true;
+ }
+ }
+ }
+ stopNow = false;
+ return count;
+ }
+
+ // update state regarding exhaustion.
+ private void updateExhausted(PerfTask task) {
+ if (task instanceof ResetInputsTask) {
+ exhausted = false;
+ resetExhausted = true;
+ } else if (task instanceof TaskSequence) {
+ TaskSequence t = (TaskSequence) task;
+ if (t.resetExhausted) {
+ exhausted = false;
+ resetExhausted = true;
+ t.resetExhausted = false;
+ } else {
+ exhausted |= t.exhausted;
+ }
+ }
+ }
+
+ private class ParallelTask extends Thread {
+
+ public int count;
+ public final PerfTask task;
+
+ public ParallelTask(PerfTask task) {
+ this.task = task;
+ }
+
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public void run() {
+ try {
+ int n = task.runAndMaybeStats(letChildReport);
+ if (anyExhaustibleTasks) {
+ updateExhausted(task);
+ }
+ count += n;
+ } catch (NoMoreDataException e) {
+ exhausted = true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void stopNow() {
+ super.stopNow();
+ // Forwards top request to children
+ if (runningParallelTasks != null) {
+ for(ParallelTask t : runningParallelTasks) {
+ t.task.stopNow();
+ }
+ }
+ }
+
+ ParallelTask[] runningParallelTasks;
+
+ private int doParallelTasks() throws Exception {
+
+ final TaskStats stats = getRunData().getPoints().getCurrentStats();
+
+ initTasksArray();
+ ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
+ // prepare threads
+ int index = 0;
+ for (int k=0; k<repetitions; k++) {
+ for (int i = 0; i < tasksArray.length; i++) {
+ final PerfTask task = (PerfTask) tasksArray[i].clone();
+ t[index++] = new ParallelTask(task);
+ }
+ }
+ // run threads
+ startThreads(t);
+
+ // wait for all threads to complete
+ int count = 0;
+ for (int i = 0; i < t.length; i++) {
+ t[i].join();
+ count += t[i].count;
+ if (t[i].task instanceof TaskSequence) {
+ TaskSequence sub = (TaskSequence) t[i].task;
+ if (sub.countsByTime != null) {
+ if (countsByTime == null) {
+ countsByTime = new int[sub.countsByTime.length];
+ } else if (countsByTime.length < sub.countsByTime.length) {
+ countsByTime = ArrayUtil.grow(countsByTime, sub.countsByTime.length);
+ }
+ for(int j=0;j<sub.countsByTime.length;j++) {
+ countsByTime[j] += sub.countsByTime[j];
+ }
+ }
+ }
+ }
+
+ if (countsByTime != null) {
+ stats.setCountsByTime(countsByTime, logByTimeMsec);
+ }
+
+ // return total count
+ return count;
+ }
+
+ // run threads
+ private void startThreads(ParallelTask[] t) throws InterruptedException {
+ if (rate > 0) {
+ startlThreadsWithRate(t);
+ return;
+ }
+ for (int i = 0; i < t.length; i++) {
+ t[i].start();
+ }
+ }
+
+ // run threads with rate
+ private void startlThreadsWithRate(ParallelTask[] t) throws InterruptedException {
+ long delayStep = (perMin ? 60000 : 1000) /rate;
+ long nextStartTime = System.currentTimeMillis();
+ for (int i = 0; i < t.length; i++) {
+ long waitMore = nextStartTime - System.currentTimeMillis();
+ if (waitMore > 0) {
+ Thread.sleep(waitMore);
+ }
+ nextStartTime += delayStep; // this aims at average rate of starting threads.
+ t[i].start();
+ }
+ }
+
+ public void addTask(PerfTask task) {
+ tasks.add(task);
+ task.setDepth(getDepth()+1);
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ String padd = getPadding();
+ StringBuilder sb = new StringBuilder(super.toString());
+ sb.append(parallel ? " [" : " {");
+ sb.append(NEW_LINE);
+ for (final PerfTask task : tasks) {
+ sb.append(task.toString());
+ sb.append(NEW_LINE);
+ }
+ sb.append(padd);
+ sb.append(!letChildReport ? ">" : (parallel ? "]" : "}"));
+ if (fixedTime) {
+ sb.append(" " + NumberFormat.getNumberInstance().format(runTimeSec) + "s");
+ } else if (repetitions>1) {
+ sb.append(" * " + repetitions);
+ } else if (repetitions==REPEAT_EXHAUST) {
+ sb.append(" * EXHAUST");
+ }
+ if (rate>0) {
+ sb.append(", rate: " + rate+"/"+(perMin?"min":"sec"));
+ }
+ if (getRunInBackground()) {
+ sb.append(" &");
+ int x = getBackgroundDeltaPriority();
+ if (x != 0) {
+ sb.append(x);
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Execute child tasks in a way that they do not report their time separately.
+ */
+ public void setNoChildReport() {
+ letChildReport = false;
+ for (final PerfTask task : tasks) {
+ if (task instanceof TaskSequence) {
+ ((TaskSequence)task).setNoChildReport();
+ }
+ }
+ }
+
+ /**
+ * Returns the rate per minute: how many operations should be performed in a minute.
+ * If 0 this has no effect.
+ * @return the rate per min: how many operations should be performed in a minute.
+ */
+ public int getRate() {
+ return (perMin ? rate : 60*rate);
+ }
+
+ /**
+ * @param rate The rate to set.
+ */
+ public void setRate(int rate, boolean perMin) {
+ this.rate = rate;
+ this.perMin = perMin;
+ setSequenceName();
+ }
+
+ private void setSequenceName() {
+ seqName = super.getName();
+ if (repetitions==REPEAT_EXHAUST) {
+ seqName += "_Exhaust";
+ } else if (repetitions>1) {
+ seqName += "_"+repetitions;
+ }
+ if (rate>0) {
+ seqName += "_" + rate + (perMin?"/min":"/sec");
+ }
+ if (parallel && seqName.toLowerCase().indexOf("par")<0) {
+ seqName += "_Par";
+ }
+ }
+
+ @Override
+ public String getName() {
+ return seqName; // override to include more info
+ }
+
+ /**
+ * @return Returns the tasks.
+ */
+ public ArrayList<PerfTask> getTasks() {
+ return tasks;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#clone()
+ */
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ TaskSequence res = (TaskSequence) super.clone();
+ res.tasks = new ArrayList<PerfTask>();
+ for (int i = 0; i < tasks.size(); i++) {
+ res.tasks.add((PerfTask)tasks.get(i).clone());
+ }
+ return res;
+ }
+
+ /**
+ * Return true if can be collapsed in case it is outermost sequence
+ */
+ public boolean isCollapsable() {
+ return collapsable;
+ }
+
+}