Mypal/netwerk/base/ThrottleQueue.cpp

394 lines
8.9 KiB
C++

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "ThrottleQueue.h"
#include "nsISeekableStream.h"
#include "nsIEventTarget.h"
#include "nsIAsyncInputStream.h"
#include "nsSocketTransportService2.h"
#include "nsStreamUtils.h"
#include "nsNetUtil.h"
namespace mozilla {
namespace net {
//-----------------------------------------------------------------------------
class ThrottleInputStream final
: public nsIAsyncInputStream
, public nsISeekableStream
{
public:
ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIINPUTSTREAM
NS_DECL_NSISEEKABLESTREAM
NS_DECL_NSIASYNCINPUTSTREAM
void AllowInput();
private:
~ThrottleInputStream();
nsCOMPtr<nsIInputStream> mStream;
RefPtr<ThrottleQueue> mQueue;
nsresult mClosedStatus;
nsCOMPtr<nsIInputStreamCallback> mCallback;
nsCOMPtr<nsIEventTarget> mEventTarget;
};
NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream, nsISeekableStream)
ThrottleInputStream::ThrottleInputStream(nsIInputStream *aStream, ThrottleQueue* aQueue)
: mStream(aStream)
, mQueue(aQueue)
, mClosedStatus(NS_OK)
{
MOZ_ASSERT(aQueue != nullptr);
}
ThrottleInputStream::~ThrottleInputStream()
{
Close();
}
NS_IMETHODIMP
ThrottleInputStream::Close()
{
if (NS_FAILED(mClosedStatus)) {
return mClosedStatus;
}
if (mQueue) {
mQueue->DequeueStream(this);
mQueue = nullptr;
mClosedStatus = NS_BASE_STREAM_CLOSED;
}
return mStream->Close();
}
NS_IMETHODIMP
ThrottleInputStream::Available(uint64_t* aResult)
{
if (NS_FAILED(mClosedStatus)) {
return mClosedStatus;
}
return mStream->Available(aResult);
}
NS_IMETHODIMP
ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
{
if (NS_FAILED(mClosedStatus)) {
return mClosedStatus;
}
uint32_t realCount;
nsresult rv = mQueue->Available(aCount, &realCount);
if (NS_FAILED(rv)) {
return rv;
}
if (realCount == 0) {
return NS_BASE_STREAM_WOULD_BLOCK;
}
rv = mStream->Read(aBuf, realCount, aResult);
if (NS_SUCCEEDED(rv) && *aResult > 0) {
mQueue->RecordRead(*aResult);
}
return rv;
}
NS_IMETHODIMP
ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
uint32_t aCount, uint32_t* aResult)
{
if (NS_FAILED(mClosedStatus)) {
return mClosedStatus;
}
uint32_t realCount;
nsresult rv = mQueue->Available(aCount, &realCount);
if (NS_FAILED(rv)) {
return rv;
}
if (realCount == 0) {
return NS_BASE_STREAM_WOULD_BLOCK;
}
rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
if (NS_SUCCEEDED(rv) && *aResult > 0) {
mQueue->RecordRead(*aResult);
}
return rv;
}
NS_IMETHODIMP
ThrottleInputStream::IsNonBlocking(bool* aNonBlocking)
{
*aNonBlocking = true;
return NS_OK;
}
NS_IMETHODIMP
ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset)
{
if (NS_FAILED(mClosedStatus)) {
return mClosedStatus;
}
nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
if (!sstream) {
return NS_ERROR_FAILURE;
}
return sstream->Seek(aWhence, aOffset);
}
NS_IMETHODIMP
ThrottleInputStream::Tell(int64_t* aResult)
{
if (NS_FAILED(mClosedStatus)) {
return mClosedStatus;
}
nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
if (!sstream) {
return NS_ERROR_FAILURE;
}
return sstream->Tell(aResult);
}
NS_IMETHODIMP
ThrottleInputStream::SetEOF()
{
if (NS_FAILED(mClosedStatus)) {
return mClosedStatus;
}
nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
if (!sstream) {
return NS_ERROR_FAILURE;
}
return sstream->SetEOF();
}
NS_IMETHODIMP
ThrottleInputStream::CloseWithStatus(nsresult aStatus)
{
if (NS_FAILED(mClosedStatus)) {
// Already closed, ignore.
return NS_OK;
}
if (NS_SUCCEEDED(aStatus)) {
aStatus = NS_BASE_STREAM_CLOSED;
}
mClosedStatus = Close();
if (NS_SUCCEEDED(mClosedStatus)) {
mClosedStatus = aStatus;
}
return NS_OK;
}
NS_IMETHODIMP
ThrottleInputStream::AsyncWait(nsIInputStreamCallback *aCallback,
uint32_t aFlags,
uint32_t aRequestedCount,
nsIEventTarget *aEventTarget)
{
if (aFlags != 0) {
return NS_ERROR_ILLEGAL_VALUE;
}
mCallback = aCallback;
mEventTarget = aEventTarget;
if (mCallback) {
mQueue->QueueStream(this);
} else {
mQueue->DequeueStream(this);
}
return NS_OK;
}
void
ThrottleInputStream::AllowInput()
{
MOZ_ASSERT(mCallback);
nsCOMPtr<nsIInputStreamCallback> callbackEvent =
NS_NewInputStreamReadyEvent(mCallback, mEventTarget);
mCallback = nullptr;
mEventTarget = nullptr;
callbackEvent->OnInputStreamReady(this);
}
//-----------------------------------------------------------------------------
NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback)
ThrottleQueue::ThrottleQueue()
: mMeanBytesPerSecond(0)
, mMaxBytesPerSecond(0)
, mBytesProcessed(0)
, mTimerArmed(false)
{
nsresult rv;
nsCOMPtr<nsIEventTarget> sts;
nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
if (NS_SUCCEEDED(rv))
sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
if (NS_SUCCEEDED(rv))
mTimer = do_CreateInstance("@mozilla.org/timer;1");
if (mTimer)
mTimer->SetTarget(sts);
}
ThrottleQueue::~ThrottleQueue()
{
if (mTimer && mTimerArmed) {
mTimer->Cancel();
}
mTimer = nullptr;
}
NS_IMETHODIMP
ThrottleQueue::RecordRead(uint32_t aBytesRead)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
ThrottleEntry entry;
entry.mTime = TimeStamp::Now();
entry.mBytesRead = aBytesRead;
mReadEvents.AppendElement(entry);
mBytesProcessed += aBytesRead;
return NS_OK;
}
NS_IMETHODIMP
ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
TimeStamp now = TimeStamp::Now();
TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
size_t i;
// Remove all stale events.
for (i = 0; i < mReadEvents.Length(); ++i) {
if (mReadEvents[i].mTime >= oneSecondAgo) {
break;
}
}
mReadEvents.RemoveElementsAt(0, i);
uint32_t totalBytes = 0;
for (i = 0; i < mReadEvents.Length(); ++i) {
totalBytes += mReadEvents[i].mBytesRead;
}
uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
double prob = static_cast<double>(rand()) / RAND_MAX;
uint32_t thisSliceBytes = mMeanBytesPerSecond - spread +
static_cast<uint32_t>(2 * spread * prob);
if (totalBytes >= thisSliceBytes) {
*aAvailable = 0;
} else {
*aAvailable = thisSliceBytes;
}
return NS_OK;
}
NS_IMETHODIMP
ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond)
{
// Can be called on any thread.
if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 || aMaxBytesPerSecond < aMeanBytesPerSecond) {
return NS_ERROR_ILLEGAL_VALUE;
}
mMeanBytesPerSecond = aMeanBytesPerSecond;
mMaxBytesPerSecond = aMaxBytesPerSecond;
return NS_OK;
}
NS_IMETHODIMP
ThrottleQueue::BytesProcessed(uint64_t* aResult)
{
*aResult = mBytesProcessed;
return NS_OK;
}
NS_IMETHODIMP
ThrottleQueue::WrapStream(nsIInputStream* aInputStream, nsIAsyncInputStream** aResult)
{
nsCOMPtr<nsIAsyncInputStream> result = new ThrottleInputStream(aInputStream, this);
result.forget(aResult);
return NS_OK;
}
NS_IMETHODIMP
ThrottleQueue::Notify(nsITimer* aTimer)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
// A notified reader may need to push itself back on the queue.
// Swap out the list of readers so that this works properly.
nsTArray<RefPtr<ThrottleInputStream>> events;
events.SwapElements(mAsyncEvents);
// Optimistically notify all the waiting readers, and then let them
// requeue if there isn't enough bandwidth.
for (size_t i = 0; i < events.Length(); ++i) {
events[i]->AllowInput();
}
mTimerArmed = false;
return NS_OK;
}
void
ThrottleQueue::QueueStream(ThrottleInputStream* aStream)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
if (mAsyncEvents.IndexOf(aStream) == mAsyncEvents.NoIndex) {
mAsyncEvents.AppendElement(aStream);
if (!mTimerArmed) {
uint32_t ms = 1000;
if (mReadEvents.Length() > 0) {
TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
TimeStamp now = TimeStamp::Now();
if (t > now) {
ms = static_cast<uint32_t>((t - now).ToMilliseconds());
} else {
ms = 1;
}
}
if (NS_SUCCEEDED(mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
mTimerArmed = true;
}
}
}
}
void
ThrottleQueue::DequeueStream(ThrottleInputStream* aStream)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
mAsyncEvents.RemoveElement(aStream);
}
}
}