Mypal/devtools/shared/webconsole/throttle.js

418 lines
14 KiB
JavaScript

/* -*- indent-tabs-mode: nil; js-indent-level: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
const {CC, Ci, Cu, Cc} = require("chrome");
const ArrayBufferInputStream = CC("@mozilla.org/io/arraybuffer-input-stream;1",
"nsIArrayBufferInputStream");
const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
"nsIBinaryInputStream", "setInputStream");
loader.lazyServiceGetter(this, "gActivityDistributor",
"@mozilla.org/network/http-activity-distributor;1",
"nsIHttpActivityDistributor");
const {XPCOMUtils} = require("resource://gre/modules/XPCOMUtils.jsm");
const {setTimeout} = Cu.import("resource://gre/modules/Timer.jsm", {});
/**
 * An nsIStreamListener that holds on to incoming data and only hands
 * it to another listener when told to.  This is used to throttle
 * network data on a per-channel basis.
 *
 * A freshly constructed object is not complete: @see setOriginalListener
 * must be called on it before it receives any requests.
 *
 * @param {NetworkThrottleQueue} queue the NetworkThrottleQueue to
 *        which status changes should be reported
 */
function NetworkThrottleListener(queue) {
  Object.assign(this, {
    // The queue that is told about new requests and new data.
    queue,
    // Buffered data / stop notifications, oldest first.
    pendingData: [],
    // An exception raised by the original listener while data was being
    // forwarded; it is rethrown from the next onDataAvailable.
    pendingException: null,
    // Number of bytes forwarded to the original listener so far.
    offset: 0,
    // Set once the queue calls responseStart on this object.
    responseStarted: false,
    // Delayed http activity events, keyed by activity subtype.
    activities: {},
  });
}
NetworkThrottleListener.prototype = {
  QueryInterface: XPCOMUtils.generateQI([
    Ci.nsIStreamListener,
    Ci.nsIInterfaceRequestor,
    Ci.nsISupports,
  ]),

  /**
   * Remember the listener that originally belonged to the channel.
   * Everything this object receives is eventually passed on to it,
   * at the pace the queue allows.
   *
   * @param {nsIStreamListener} originalListener the original listener
   *        for the channel, to which all requests will be sent
   */
  setOriginalListener(originalListener) {
    this.originalListener = originalListener;
  },

  /**
   * @see nsIStreamListener.onStartRequest.
   *
   * The start notification is forwarded right away; afterwards the
   * queue is told that this listener exists.
   */
  onStartRequest(request, context) {
    this.originalListener.onStartRequest(request, context);
    this.queue.start(this);
  },

  /**
   * @see nsIStreamListener.onStopRequest.
   *
   * The stop notification is buffered behind any data that is still
   * pending, so that it is delivered in order.
   */
  onStopRequest(request, context, statusCode) {
    const stopRecord = {request, context, statusCode};
    this.pendingData.push(stopRecord);
    this.queue.dataAvailable(this);
  },

  /**
   * @see nsIStreamListener.onDataAvailable.
   *
   * Copies |count| bytes out of |inputStream| into a stream of our own
   * and buffers that copy until the queue lets it through.
   */
  onDataAvailable(request, context, inputStream, offset, count) {
    // Surface a failure that the original listener reported while
    // earlier data was being forwarded.
    if (this.pendingException) {
      throw this.pendingException;
    }

    const reader = new BinaryInputStream(inputStream);
    const buffer = new ArrayBuffer(count);
    reader.readArrayBuffer(count, buffer);

    const copy = new ArrayBufferInputStream();
    copy.setData(buffer, 0, count);

    this.pendingData.push({request, context, stream: copy, count});
    this.queue.dataAvailable(this);
  },

  /**
   * Forward part of the buffered data to the original listener.
   *
   * @param {Number} bytesPermitted The maximum number of bytes
   *        permitted to be sent.
   * @return {Object} an object of the form {length, done}, where
   *         |length| is the number of bytes actually forwarded, and
   *         |done| is a boolean indicating whether the oldest buffered
   *         entry has been completed.  (A NetworkThrottleListener
   *         may be queued multiple times, so |done| does not mean that
   *         all available data has been sent.)
   */
  sendSomeData(bytesPermitted) {
    if (!this.pendingData.length) {
      // Shouldn't happen.
      return {length: 0, done: true};
    }

    const {
      request,
      context,
      stream,
      count: available,
      statusCode: stopStatus,
    } = this.pendingData[0];

    // Only entries buffered by onStopRequest carry a status code.
    if (stopStatus !== undefined) {
      this.pendingData.shift();
      this.originalListener.onStopRequest(request, context, stopStatus);
      return {length: 0, done: true};
    }

    // Never hand over more than this entry still holds.
    const allowed = bytesPermitted > available ? available : bytesPermitted;

    try {
      this.originalListener.onDataAvailable(request, context, stream,
                                            this.offset, allowed);
    } catch (ex) {
      // Keep the failure so that the next onDataAvailable can rethrow it.
      this.pendingException = ex;
    }

    const done = allowed === available;
    if (done) {
      this.pendingData.shift();
    } else {
      // Partially sent: the same entry stays at the head with a
      // smaller remaining count.
      this.pendingData[0].count -= allowed;
    }
    this.offset += allowed;

    // Maybe our state has changed enough to emit an event.
    this.maybeEmitEvents();

    return {length: allowed, done};
  },

  /**
   * @return {Number} how many buffered entries (data or stop
   *         notifications) are waiting in this listener.
   */
  pendingCount() {
    return this.pendingData.length;
  },

  /**
   * Called when an http activity event is delivered.  The event is
   * stored, keyed by its subtype, and only emitted once this object's
   * state says the moment has come.  The |timestamp| that came with
   * the event is not kept; a new one is computed when emitting.
   */
  addActivityCallback(callback, httpActivity, channel, activityType,
                      activitySubtype, timestamp, extraSizeData,
                      extraStringData) {
    this.activities[activitySubtype] = {
      callback,
      httpActivity,
      channel,
      activityType,
      activitySubtype,
      extraSizeData,
      extraStringData,
    };

    const isComplete = activitySubtype ===
      gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE;
    if (isComplete) {
      // Remember the size carried by the completion event; it is what
      // |offset| is compared against in maybeEmitEvents.
      this.totalSize = extraSizeData;
    }

    this.maybeEmitEvents();
  },

  /**
   * Called for a download throttler once the latency timeout has
   * ended.
   */
  responseStart() {
    this.responseStarted = true;
    this.maybeEmitEvents();
  },

  /**
   * Look at our internal state and emit whichever stored http activity
   * events are now due.  An event is only emitted when both our state
   * has reached the right point and the real event has arrived from
   * the platform; that way the original event data is passed along
   * while the reported time reflects the delay we introduce.
   */
  maybeEmitEvents() {
    if (this.responseStarted) {
      this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_START);
      this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_HEADER);
    }

    const everythingForwarded =
      this.totalSize !== undefined && this.offset >= this.totalSize;
    if (everythingForwarded) {
      this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE);
      this.maybeEmit(gActivityDistributor.ACTIVITY_SUBTYPE_TRANSACTION_CLOSE);
    }
  },

  /**
   * Emit the stored event for |code|, if there is one, and forget it
   * afterwards so it is not emitted twice.
   */
  maybeEmit(code) {
    const stored = this.activities[code];
    if (stored === undefined) {
      return;
    }

    const {
      callback,
      httpActivity,
      channel,
      activityType,
      activitySubtype,
      extraSizeData,
      extraStringData,
    } = stored;

    // Current time in milliseconds scaled by 1000 (presumably the
    // microsecond unit the activity callback expects -- confirm).
    const timestamp = Date.now() * 1000;
    callback(httpActivity, channel, activityType, activitySubtype,
             timestamp, extraSizeData, extraStringData);

    this.activities[code] = undefined;
  },
};
/**
 * Construct a new queue that can be used to throttle the network for
 * a group of related network requests.
 *
 * @param {Number} meanBPS Mean bytes per second.
 * @param {Number} maxBPS Maximum bytes per second.
 * @param {Number} roundTripTimeMean Mean round trip time in milliseconds.
 * @param {Number} roundTripTimeMax Maximum round trip time in milliseconds.
 */
function NetworkThrottleQueue(meanBPS, maxBPS,
                              roundTripTimeMean, roundTripTimeMax) {
  this.meanBPS = meanBPS;
  this.maxBPS = maxBPS;
  this.roundTripTimeMean = roundTripTimeMean;
  this.roundTripTimeMax = roundTripTimeMax;

  // Listeners whose simulated latency has not elapsed yet.
  this.pendingRequests = new Set();
  // Listeners with data to send; a listener appears once per buffered entry.
  this.downloadQueue = [];
  // Records of the form {when, numBytes} for recent pump() runs.
  this.previousReads = [];
  // Re-entrancy guard for pump().
  this.pumping = false;
}

NetworkThrottleQueue.prototype = {
  /**
   * A helper function that, given a mean and a maximum, returns a
   * random value in the half-open range
   * [mean - (max - mean), max).  The result is an integer when both
   * arguments are integers.  Note that the result can be zero or
   * negative whenever mean <= max / 2; callers must cope with that.
   */
  random: function (mean, max) {
    return mean - (max - mean) + Math.floor(2 * (max - mean) * Math.random());
  },

  /**
   * A helper function that lets the indicating listener start sending
   * data.  This is called after the initial round trip time for the
   * listener has elapsed.
   */
  allowDataFrom: function (throttleListener) {
    throttleListener.responseStart();
    this.pendingRequests.delete(throttleListener);
    // Data that arrived while the listener was pending was not queued
    // by dataAvailable(); queue the listener once per buffered entry now.
    const count = throttleListener.pendingCount();
    for (let i = 0; i < count; ++i) {
      this.downloadQueue.push(throttleListener);
    }
    this.pump();
  },

  /**
   * Notice a new listener object.  This is called by the
   * NetworkThrottleListener when the request has started.  Initially
   * a new listener object is put into a "pending" state, until the
   * round-trip time has elapsed.  This is used to simulate latency.
   *
   * @param {NetworkThrottleListener} throttleListener the new listener
   */
  start: function (throttleListener) {
    this.pendingRequests.add(throttleListener);
    const delay = this.random(this.roundTripTimeMean, this.roundTripTimeMax);
    if (delay > 0) {
      setTimeout(() => this.allowDataFrom(throttleListener), delay);
    } else {
      // A non-positive draw means no simulated latency at all.
      this.allowDataFrom(throttleListener);
    }
  },

  /**
   * Note that new data is available for a given listener.  Each time
   * data is available, the listener will be re-queued.  Listeners that
   * are still pending are ignored here; allowDataFrom() queues them
   * once their latency has elapsed.
   *
   * @param {NetworkThrottleListener} throttleListener the listener
   *                                  which has data available.
   */
  dataAvailable: function (throttleListener) {
    if (!this.pendingRequests.has(throttleListener)) {
      this.downloadQueue.push(throttleListener);
      this.pump();
    }
  },

  /**
   * An internal function that permits individual listeners to send
   * data.  A random byte budget is drawn for the current one-second
   * window; whatever part of it has not been used by reads in the last
   * second is handed out to the queued listeners in order.  If data
   * remains queued afterwards, another run is scheduled.
   *
   * An exception thrown by a listener propagates to the caller, but
   * the queue stays usable: the next call to pump() runs normally.
   */
  pump: function () {
    // A redirect will cause two NetworkThrottleListeners to be on a
    // listener chain.  In this case, we might recursively call into
    // this method.  Avoid infinite recursion here.
    if (this.pumping) {
      return;
    }
    this.pumping = true;

    try {
      const now = Date.now();
      const oneSecondAgo = now - 1000;

      // Forget reads that fell out of the one-second window.
      while (this.previousReads.length &&
             this.previousReads[0].when < oneSecondAgo) {
        this.previousReads.shift();
      }

      const totalBytes = this.previousReads.reduce((sum, elt) => {
        return sum + elt.numBytes;
      }, 0);

      let thisSliceBytes = this.random(this.meanBPS, this.maxBPS);
      if (totalBytes < thisSliceBytes) {
        thisSliceBytes -= totalBytes;
        let readThisTime = 0;
        while (thisSliceBytes > 0 && this.downloadQueue.length) {
          const {length, done} =
            this.downloadQueue[0].sendSomeData(thisSliceBytes);
          thisSliceBytes -= length;
          readThisTime += length;
          if (done) {
            this.downloadQueue.shift();
          }
        }
        this.previousReads.push({when: now, numBytes: readThisTime});
      }

      // If there is more data to download, then schedule ourselves for
      // one second after the oldest previous read.  There may be no
      // previous read at all (the budget drawn above was not positive
      // and every earlier read has expired); in that case simply try
      // again in one second.
      if (this.downloadQueue.length) {
        const oldestRead = this.previousReads.length
          ? this.previousReads[0].when
          : now;
        setTimeout(this.pump.bind(this), oldestRead + 1000 - now);
      }
    } finally {
      // Always release the guard, even when a listener threw; otherwise
      // every later pump() would return immediately and all throttled
      // requests would stall forever.
      this.pumping = false;
    }
  },
};
/**
 * Construct a new object that can be used to throttle the network for
 * a group of related network requests.
 *
 * @param {Object} An object with the following attributes:
 * roundTripTimeMean {Number} Mean round trip time in milliseconds.
 * roundTripTimeMax {Number} Maximum round trip time in milliseconds.
 * downloadBPSMean {Number} Mean bytes per second for downloads.
 * downloadBPSMax {Number} Maximum bytes per second for downloads.
 * uploadBPSMean {Number} Mean bytes per second for uploads.
 * uploadBPSMax {Number} Maximum bytes per second for uploads.
 *
 * Download throttling is skipped (|downloadQueue| is null) when both
 * downloadBPSMean and downloadBPSMax are <= 0.  Likewise, upload
 * throttling is skipped (|uploadQueue| is null) when both uploadBPSMean
 * and uploadBPSMax are <= 0.
 */
function NetworkThrottleManager({roundTripTimeMean, roundTripTimeMax,
                                 downloadBPSMean, downloadBPSMax,
                                 uploadBPSMean, uploadBPSMax}) {
  // Downloads are throttled by our own JS queue, which also simulates
  // the round trip time.
  const downloadDisabled = downloadBPSMax <= 0 && downloadBPSMean <= 0;
  this.downloadQueue = downloadDisabled
    ? null
    : new NetworkThrottleQueue(downloadBPSMean, downloadBPSMax,
                               roundTripTimeMean, roundTripTimeMax);

  if (uploadBPSMax <= 0 && uploadBPSMean <= 0) {
    this.uploadQueue = null;
    return;
  }

  // Uploads are throttled by a platform-provided throttle queue.
  const platformQueue = Cc["@mozilla.org/network/throttlequeue;1"]
    .createInstance(Ci.nsIInputChannelThrottleQueue);
  this.uploadQueue = platformQueue;
  platformQueue.init(uploadBPSMean, uploadBPSMax);
}
// Make the manager constructor available to code that loads this module.
exports.NetworkThrottleManager = NetworkThrottleManager;
NetworkThrottleManager.prototype = {
  /**
   * Put a new NetworkThrottleListener in front of the given channel's
   * listener, using |setNewListener|, so that the channel's downloads
   * go through the download queue.
   *
   * @param {nsITraceableChannel} channel the channel to manage
   * @return {NetworkThrottleListener} the new listener, or null if
   *         download throttling is not being done.
   */
  manage(channel) {
    if (!this.downloadQueue) {
      return null;
    }

    const throttled = new NetworkThrottleListener(this.downloadQueue);
    // setNewListener hands back the listener that was installed before;
    // the throttling listener forwards to it.
    const previous = channel.setNewListener(throttled);
    throttled.setOriginalListener(previous);
    return throttled;
  },

  /**
   * Throttle uploads taking place on the given channel.  Does nothing
   * if upload throttling is not being done.
   *
   * @param {nsITraceableChannel} channel the channel to manage
   */
  manageUpload(channel) {
    if (!this.uploadQueue) {
      return;
    }

    const throttledChannel =
      channel.QueryInterface(Ci.nsIThrottledInputChannel);
    throttledChannel.throttleQueue = this.uploadQueue;
  },
};