1 package org.apache.lucene.util;
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with
6 * this work for additional information regarding copyright ownership.
7 * The ASF licenses this file to You under the Apache License, Version 2.0
8 * (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 import java.io.IOException;
21 import org.apache.lucene.store.DataInput;
22 import org.apache.lucene.store.IndexOutput;
24 public class ThrottledIndexOutput extends IndexOutput {
25 public static final int DEFAULT_MIN_WRITTEN_BYTES = 1024;
26 private final int bytesPerSecond;
27 private IndexOutput delegate;
28 private long flushDelayMillis;
29 private long closeDelayMillis;
30 private long seekDelayMillis;
31 private long pendingBytes;
32 private long minBytesWritten;
33 private long timeElapsed;
34 private final byte[] bytes = new byte[1];
36 public ThrottledIndexOutput newFromDelegate(IndexOutput output) {
37 return new ThrottledIndexOutput(bytesPerSecond, flushDelayMillis,
38 closeDelayMillis, seekDelayMillis, minBytesWritten, output);
41 public ThrottledIndexOutput(int bytesPerSecond, long delayInMillis,
42 IndexOutput delegate) {
43 this(bytesPerSecond, delayInMillis, delayInMillis, delayInMillis,
44 DEFAULT_MIN_WRITTEN_BYTES, delegate);
47 public ThrottledIndexOutput(int bytesPerSecond, long delays,
48 int minBytesWritten, IndexOutput delegate) {
49 this(bytesPerSecond, delays, delays, delays, minBytesWritten, delegate);
52 public static final int mBitsToBytes(int mbits) {
53 return mbits * 125000;
56 public ThrottledIndexOutput(int bytesPerSecond, long flushDelayMillis,
57 long closeDelayMillis, long seekDelayMillis, long minBytesWritten,
58 IndexOutput delegate) {
59 assert bytesPerSecond > 0;
60 this.delegate = delegate;
61 this.bytesPerSecond = bytesPerSecond;
62 this.flushDelayMillis = flushDelayMillis;
63 this.closeDelayMillis = closeDelayMillis;
64 this.seekDelayMillis = seekDelayMillis;
65 this.minBytesWritten = minBytesWritten;
69 public void flush() throws IOException {
70 sleep(flushDelayMillis);
75 public void close() throws IOException {
77 sleep(closeDelayMillis + getDelay(true));
84 public long getFilePointer() {
85 return delegate.getFilePointer();
89 public void seek(long pos) throws IOException {
90 sleep(seekDelayMillis);
95 public long length() throws IOException {
96 return delegate.length();
100 public void writeByte(byte b) throws IOException {
102 writeBytes(bytes, 0, 1);
106 public void writeBytes(byte[] b, int offset, int length) throws IOException {
107 final long before = System.nanoTime();
108 delegate.writeBytes(b, offset, length);
109 timeElapsed += System.nanoTime() - before;
110 pendingBytes += length;
111 sleep(getDelay(false));
115 protected long getDelay(boolean closing) {
116 if (pendingBytes > 0 && (closing || pendingBytes > minBytesWritten)) {
117 long actualBps = (timeElapsed / pendingBytes) * 1000000000l; // nano to sec
118 if (actualBps > bytesPerSecond) {
119 long expected = (pendingBytes * 1000l / bytesPerSecond) ;
120 final long delay = expected - (timeElapsed / 1000000l) ;
130 private static final void sleep(long ms) {
135 } catch (InterruptedException e) {
136 throw new ThreadInterruptedException(e);
141 public void setLength(long length) throws IOException {
142 delegate.setLength(length);
146 public void copyBytes(DataInput input, long numBytes) throws IOException {
147 delegate.copyBytes(input, numBytes);