X-Git-Url: https://git.mdrn.pl/pylucene.git/blobdiff_plain/a2e61f0c04805cfcb8706176758d1283c7e3a55c..aaeed5504b982cf3545252ab528713250aa33eed:/lucene-java-3.5.0/lucene/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java?ds=sidebyside diff --git a/lucene-java-3.5.0/lucene/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene-java-3.5.0/lucene/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java new file mode 100644 index 0000000..98c5fb3 --- /dev/null +++ b/lucene-java-3.5.0/lucene/contrib/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -0,0 +1,528 @@ +package org.apache.lucene.benchmark.byTask.tasks; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.ArrayList; +import java.util.List; +import java.text.NumberFormat; + +import org.apache.lucene.benchmark.byTask.PerfRunData; +import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException; +import org.apache.lucene.benchmark.byTask.stats.TaskStats; +import org.apache.lucene.util.ArrayUtil; + +/** + * Sequence of parallel or sequential tasks. + */ +public class TaskSequence extends PerfTask { + public static int REPEAT_EXHAUST = -2; + private ArrayList tasks; + private int repetitions = 1; + private boolean parallel; + private TaskSequence parent; + private boolean letChildReport = true; + private int rate = 0; + private boolean perMin = false; // rate, if set, is, by default, be sec. + private String seqName; + private boolean exhausted = false; + private boolean resetExhausted = false; + private PerfTask[] tasksArray; + private boolean anyExhaustibleTasks; + private boolean collapsable = false; // to not collapse external sequence named in alg. + + private boolean fixedTime; // true if we run for fixed time + private double runTimeSec; // how long to run for + private final long logByTimeMsec; + + public TaskSequence (PerfRunData runData, String name, TaskSequence parent, boolean parallel) { + super(runData); + collapsable = (name == null); + name = (name!=null ? name : (parallel ? "Par" : "Seq")); + setName(name); + setSequenceName(); + this.parent = parent; + this.parallel = parallel; + tasks = new ArrayList(); + logByTimeMsec = runData.getConfig().get("report.time.step.msec", 0); + } + + @Override + public void close() throws Exception { + initTasksArray(); + for(int i=0;i 0) { + return doSerialTasksWithRate(); + } + + initTasksArray(); + int count = 0; + + final long runTime = (long) (runTimeSec*1000); + List bgTasks = null; + + final long t0 = System.currentTimeMillis(); + for (int k=0; fixedTime || (repetitions==REPEAT_EXHAUST && !exhausted) || k(); + } + RunBackgroundTask bgTask = new RunBackgroundTask(task, letChildReport); + bgTask.setPriority(task.getBackgroundDeltaPriority() + Thread.currentThread().getPriority()); + bgTask.start(); + bgTasks.add(bgTask); + } else { + try { + final int inc = task.runAndMaybeStats(letChildReport); + count += inc; + if (countsByTime != null) { + final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec); + if (slot >= countsByTime.length) { + countsByTime = ArrayUtil.grow(countsByTime, 1+slot); + } + countsByTime[slot] += inc; + } + if (anyExhaustibleTasks) + updateExhausted(task); + } catch (NoMoreDataException e) { + exhausted = true; + } + } + } + if (fixedTime && System.currentTimeMillis()-t0 > runTime) { + repetitions = k+1; + break; + } + } + + if (bgTasks != null) { + for(RunBackgroundTask bgTask : bgTasks) { + bgTask.stopNow(); + } + for(RunBackgroundTask bgTask : bgTasks) { + bgTask.join(); + count += bgTask.getCount(); + } + } + + if (countsByTime != null) { + getRunData().getPoints().getCurrentStats().setCountsByTime(countsByTime, logByTimeMsec); + } + + stopNow = false; + + return count; + } + + private int doSerialTasksWithRate() throws Exception { + initTasksArray(); + long delayStep = (perMin ? 60000 : 1000) /rate; + long nextStartTime = System.currentTimeMillis(); + int count = 0; + final long t0 = System.currentTimeMillis(); + for (int k=0; (repetitions==REPEAT_EXHAUST && !exhausted) || k 0) { + // TODO: better to use condition to notify + Thread.sleep(1); + } else { + break; + } + } + if (stopNow) { + break; + } + nextStartTime += delayStep; // this aims at avarage rate. + try { + final int inc = task.runAndMaybeStats(letChildReport); + count += inc; + if (countsByTime != null) { + final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec); + if (slot >= countsByTime.length) { + countsByTime = ArrayUtil.grow(countsByTime, 1+slot); + } + countsByTime[slot] += inc; + } + + if (anyExhaustibleTasks) + updateExhausted(task); + } catch (NoMoreDataException e) { + exhausted = true; + } + } + } + stopNow = false; + return count; + } + + // update state regarding exhaustion. + private void updateExhausted(PerfTask task) { + if (task instanceof ResetInputsTask) { + exhausted = false; + resetExhausted = true; + } else if (task instanceof TaskSequence) { + TaskSequence t = (TaskSequence) task; + if (t.resetExhausted) { + exhausted = false; + resetExhausted = true; + t.resetExhausted = false; + } else { + exhausted |= t.exhausted; + } + } + } + + private class ParallelTask extends Thread { + + public int count; + public final PerfTask task; + + public ParallelTask(PerfTask task) { + this.task = task; + } + + @SuppressWarnings("synthetic-access") + @Override + public void run() { + try { + int n = task.runAndMaybeStats(letChildReport); + if (anyExhaustibleTasks) { + updateExhausted(task); + } + count += n; + } catch (NoMoreDataException e) { + exhausted = true; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void stopNow() { + super.stopNow(); + // Forwards top request to children + if (runningParallelTasks != null) { + for(ParallelTask t : runningParallelTasks) { + t.task.stopNow(); + } + } + } + + ParallelTask[] runningParallelTasks; + + private int doParallelTasks() throws Exception { + + final TaskStats stats = getRunData().getPoints().getCurrentStats(); + + initTasksArray(); + ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()]; + // prepare threads + int index = 0; + for (int k=0; k 0) { + startlThreadsWithRate(t); + return; + } + for (int i = 0; i < t.length; i++) { + t[i].start(); + } + } + + // run threads with rate + private void startlThreadsWithRate(ParallelTask[] t) throws InterruptedException { + long delayStep = (perMin ? 60000 : 1000) /rate; + long nextStartTime = System.currentTimeMillis(); + for (int i = 0; i < t.length; i++) { + long waitMore = nextStartTime - System.currentTimeMillis(); + if (waitMore > 0) { + Thread.sleep(waitMore); + } + nextStartTime += delayStep; // this aims at average rate of starting threads. + t[i].start(); + } + } + + public void addTask(PerfTask task) { + tasks.add(task); + task.setDepth(getDepth()+1); + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + String padd = getPadding(); + StringBuilder sb = new StringBuilder(super.toString()); + sb.append(parallel ? " [" : " {"); + sb.append(NEW_LINE); + for (final PerfTask task : tasks) { + sb.append(task.toString()); + sb.append(NEW_LINE); + } + sb.append(padd); + sb.append(!letChildReport ? ">" : (parallel ? "]" : "}")); + if (fixedTime) { + sb.append(" " + NumberFormat.getNumberInstance().format(runTimeSec) + "s"); + } else if (repetitions>1) { + sb.append(" * " + repetitions); + } else if (repetitions==REPEAT_EXHAUST) { + sb.append(" * EXHAUST"); + } + if (rate>0) { + sb.append(", rate: " + rate+"/"+(perMin?"min":"sec")); + } + if (getRunInBackground()) { + sb.append(" &"); + int x = getBackgroundDeltaPriority(); + if (x != 0) { + sb.append(x); + } + } + return sb.toString(); + } + + /** + * Execute child tasks in a way that they do not report their time separately. + */ + public void setNoChildReport() { + letChildReport = false; + for (final PerfTask task : tasks) { + if (task instanceof TaskSequence) { + ((TaskSequence)task).setNoChildReport(); + } + } + } + + /** + * Returns the rate per minute: how many operations should be performed in a minute. + * If 0 this has no effect. + * @return the rate per min: how many operations should be performed in a minute. + */ + public int getRate() { + return (perMin ? rate : 60*rate); + } + + /** + * @param rate The rate to set. + */ + public void setRate(int rate, boolean perMin) { + this.rate = rate; + this.perMin = perMin; + setSequenceName(); + } + + private void setSequenceName() { + seqName = super.getName(); + if (repetitions==REPEAT_EXHAUST) { + seqName += "_Exhaust"; + } else if (repetitions>1) { + seqName += "_"+repetitions; + } + if (rate>0) { + seqName += "_" + rate + (perMin?"/min":"/sec"); + } + if (parallel && seqName.toLowerCase().indexOf("par")<0) { + seqName += "_Par"; + } + } + + @Override + public String getName() { + return seqName; // override to include more info + } + + /** + * @return Returns the tasks. + */ + public ArrayList getTasks() { + return tasks; + } + + /* (non-Javadoc) + * @see java.lang.Object#clone() + */ + @Override + protected Object clone() throws CloneNotSupportedException { + TaskSequence res = (TaskSequence) super.clone(); + res.tasks = new ArrayList(); + for (int i = 0; i < tasks.size(); i++) { + res.tasks.add((PerfTask)tasks.get(i).clone()); + } + return res; + } + + /** + * Return true if can be collapsed in case it is outermost sequence + */ + public boolean isCollapsable() { + return collapsable; + } + +}