Mypal/netwerk/socket/nsNamedPipeIOLayer.cpp

953 lines
26 KiB
C++

/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include <algorithm>
#include <utility>
#include "mozilla/Atomics.h"
#include "mozilla/DebugOnly.h"
#include "mozilla/Logging.h"
#include "mozilla/Move.h"
#include "mozilla/net/DNS.h"
#include "mozilla/RefPtr.h"
#include "mozilla/Unused.h"
#include "nsINamedPipeService.h"
#include "nsISupportsImpl.h"
#include "nsIThread.h"
#include "nsNamedPipeIOLayer.h"
#include "nsNetCID.h"
#include "nspr.h"
#include "nsServiceManagerUtils.h"
#include "nsSocketTransportService2.h"
#include "nsString.h"
#include "nsThreadUtils.h"
#include "private/pprio.h"
namespace mozilla {
namespace net {
static mozilla::LazyLogModule gNamedPipeLog("NamedPipeWin");
#define LOG_NPIO_DEBUG(...) MOZ_LOG(gNamedPipeLog, mozilla::LogLevel::Debug, \
(__VA_ARGS__))
#define LOG_NPIO_ERROR(...) MOZ_LOG(gNamedPipeLog, mozilla::LogLevel::Error, \
(__VA_ARGS__))
PRDescIdentity nsNamedPipeLayerIdentity;
static PRIOMethods nsNamedPipeLayerMethods;
class NamedPipeInfo final : public nsINamedPipeDataObserver
{
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSINAMEDPIPEDATAOBSERVER
explicit NamedPipeInfo();
nsresult Connect(const nsACString& aPath);
nsresult Disconnect();
/**
* Both blocking/non-blocking mode are supported in this class.
* The default mode is non-blocking mode, however, the client may change its
* mode to blocking mode during hand-shaking (e.g. nsSOCKSSocketInfo).
*
* In non-blocking mode, |Read| and |Write| should be called by clients only
* when |GetPollFlags| reports data availability. That is, the client calls
* |GetPollFlags| with |PR_POLL_READ| and/or |PR_POLL_WRITE| set, and
* according to the flags that set, |GetPollFlags| will check buffers status
* and decide corresponding actions:
*
* -------------------------------------------------------------------
* | | data in buffer | empty buffer |
* |---------------+-------------------------+-----------------------|
* | PR_POLL_READ | out: PR_POLL_READ | DoRead/DoReadContinue |
* |---------------+-------------------------+-----------------------|
* | PR_POLL_WRITE | DoWrite/DoWriteContinue | out: PR_POLL_WRITE |
* ------------------------------------------+------------------------
*
* |DoRead| and |DoWrite| initiate read/write operations asynchronously, and
* the |DoReadContinue| and |DoWriteContinue| are used to check the amount
* of the data are read/written to/from buffers.
*
* The output parameter and the return value of |GetPollFlags| are identical
* because we don't rely on the low-level select function to wait for data
* availability, we instead use nsNamedPipeService to poll I/O completeness.
*
* When client get |PR_POLL_READ| or |PR_POLL_WRITE| from |GetPollFlags|,
* they are able to use |Read| or |Write| to access the data in the buffer,
* and this is supposed to be very fast because no network traffic is involved.
*
* In blocking mode, the flow is quite similar to non-blocking mode, but
* |DoReadContinue| and |DoWriteContinue| are never been used since the
* operations are done synchronously, which could lead to slow responses.
*/
int32_t Read(void* aBuffer, int32_t aSize);
int32_t Write(const void* aBuffer, int32_t aSize);
// Like Read, but doesn't remove data in internal buffer.
uint32_t Peek(void* aBuffer, int32_t aSize);
// Number of bytes available to read in internal buffer.
int32_t Available() const;
// Flush write buffer
//
// @return whether the buffer has been flushed
bool Sync(uint32_t aTimeout);
void SetNonblocking(bool nonblocking);
bool IsConnected() const;
bool IsNonblocking() const;
HANDLE GetHandle() const;
// Initiate and check current status for read/write operations.
int16_t GetPollFlags(int16_t aInFlags, int16_t* aOutFlags);
private:
virtual ~NamedPipeInfo();
/**
* DoRead/DoWrite starts a read/write call synchronously or asynchronously
* depending on |mNonblocking|. In blocking mode, they return when the action
* has been done and in non-blocking mode it returns the number of bytes that
* were read/written if the operation is done immediately. If it takes some
* time to finish the operation, zero is returned and
* DoReadContinue/DoWriteContinue must be called to get async I/O result.
*/
int32_t DoRead();
int32_t DoReadContinue();
int32_t DoWrite();
int32_t DoWriteContinue();
/**
* There was a write size limitation of named pipe,
* see https://support.microsoft.com/en-us/kb/119218 for more information.
* The limitation no longer exists, so feel free to change the value.
*/
static const uint32_t kBufferSize = 65536;
nsCOMPtr<nsINamedPipeService> mNamedPipeService;
HANDLE mPipe; // the handle to the named pipe.
OVERLAPPED mReadOverlapped; // used for asynchronous read operations.
OVERLAPPED mWriteOverlapped; // used for asynchronous write operations.
uint8_t mReadBuffer[kBufferSize]; // octets read from pipe.
/**
* These indicates the [begin, end) position of the data in the buffer.
*/
DWORD mReadBegin;
DWORD mReadEnd;
bool mHasPendingRead; // previous asynchronous read is not finished yet.
uint8_t mWriteBuffer[kBufferSize]; // octets to be written to pipe.
/**
* These indicates the [begin, end) position of the data in the buffer.
*/
DWORD mWriteBegin; // how many bytes are already written.
DWORD mWriteEnd; // valid amount of data in the buffer.
bool mHasPendingWrite; // previous asynchronous write is not finished yet.
/**
* current blocking mode is non-blocking or not, accessed only in socket
* thread.
*/
bool mNonblocking;
Atomic<DWORD> mErrorCode; // error code from Named Pipe Service.
};
NS_IMPL_ISUPPORTS(NamedPipeInfo,
nsINamedPipeDataObserver)
NamedPipeInfo::NamedPipeInfo()
: mNamedPipeService(do_GetService(NS_NAMEDPIPESERVICE_CONTRACTID))
, mPipe(INVALID_HANDLE_VALUE)
, mReadBegin(0)
, mReadEnd(0)
, mHasPendingRead(false)
, mWriteBegin(0)
, mWriteEnd(0)
, mHasPendingWrite(false)
, mNonblocking(true)
, mErrorCode(0)
{
MOZ_ASSERT(mNamedPipeService);
ZeroMemory(&mReadOverlapped, sizeof(OVERLAPPED));
ZeroMemory(&mWriteOverlapped, sizeof(OVERLAPPED));
}
NamedPipeInfo::~NamedPipeInfo()
{
MOZ_ASSERT(!mPipe);
}
// nsINamedPipeDataObserver
NS_IMETHODIMP
NamedPipeInfo::OnDataAvailable(uint32_t aBytesTransferred,
void* aOverlapped)
{
DebugOnly<bool> isOnPipeServiceThread;
MOZ_ASSERT(NS_SUCCEEDED(mNamedPipeService->IsOnCurrentThread(&isOnPipeServiceThread)) &&
isOnPipeServiceThread);
if (aOverlapped == &mReadOverlapped) {
LOG_NPIO_DEBUG("[%s] %p read %d bytes", __func__, this, aBytesTransferred);
} else if (aOverlapped == &mWriteOverlapped) {
LOG_NPIO_DEBUG("[%s] %p write %d bytes", __func__, this, aBytesTransferred);
} else {
MOZ_ASSERT(false, "invalid callback");
mErrorCode = ERROR_INVALID_DATA;
return NS_ERROR_FAILURE;
}
mErrorCode = ERROR_SUCCESS;
// dispatch an empty event to trigger STS thread
gSocketTransportService->Dispatch(NS_NewRunnableFunction([]{}),
NS_DISPATCH_NORMAL);
return NS_OK;
}
NS_IMETHODIMP
NamedPipeInfo::OnError(uint32_t aError,
void* aOverlapped)
{
DebugOnly<bool> isOnPipeServiceThread;
MOZ_ASSERT(NS_SUCCEEDED(mNamedPipeService->IsOnCurrentThread(&isOnPipeServiceThread)) &&
isOnPipeServiceThread);
LOG_NPIO_ERROR("[%s] error code=%d", __func__, aError);
mErrorCode = aError;
// dispatch an empty event to trigger STS thread
gSocketTransportService->Dispatch(NS_NewRunnableFunction([]{}),
NS_DISPATCH_NORMAL);
return NS_OK;
}
// Named pipe operations
nsresult
NamedPipeInfo::Connect(const nsACString& aPath)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
HANDLE pipe;
nsAutoCString path(aPath);
pipe = CreateFileA(path.get(),
GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE,
nullptr,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
nullptr);
if (pipe == INVALID_HANDLE_VALUE) {
LOG_NPIO_ERROR("[%p] CreateFile error (%d)", this, GetLastError());
return NS_ERROR_FAILURE;
}
DWORD pipeMode = PIPE_READMODE_MESSAGE;
if (!SetNamedPipeHandleState(pipe, &pipeMode, nullptr, nullptr)) {
LOG_NPIO_ERROR("[%p] SetNamedPipeHandleState error (%d)",
this,
GetLastError());
CloseHandle(pipe);
return NS_ERROR_FAILURE;
}
nsresult rv = mNamedPipeService->AddDataObserver(pipe, this);
if (NS_WARN_IF(NS_FAILED(rv))) {
CloseHandle(pipe);
return rv;
}
HANDLE readEvent = CreateEventA(nullptr, TRUE, TRUE, "NamedPipeRead");
if (NS_WARN_IF(!readEvent || readEvent == INVALID_HANDLE_VALUE)) {
CloseHandle(pipe);
return NS_ERROR_FAILURE;
}
HANDLE writeEvent = CreateEventA(nullptr, TRUE, TRUE, "NamedPipeWrite");
if (NS_WARN_IF(!writeEvent || writeEvent == INVALID_HANDLE_VALUE)) {
CloseHandle(pipe);
CloseHandle(readEvent);
return NS_ERROR_FAILURE;
}
mPipe = pipe;
mReadOverlapped.hEvent = readEvent;
mWriteOverlapped.hEvent = writeEvent;
return NS_OK;
}
nsresult
NamedPipeInfo::Disconnect()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
nsresult rv = mNamedPipeService->RemoveDataObserver(mPipe, this);
NS_WARN_IF(NS_FAILED(rv));
mPipe = nullptr;
if (mReadOverlapped.hEvent &&
mReadOverlapped.hEvent != INVALID_HANDLE_VALUE) {
CloseHandle(mReadOverlapped.hEvent);
mReadOverlapped.hEvent = nullptr;
}
if (mWriteOverlapped.hEvent &&
mWriteOverlapped.hEvent != INVALID_HANDLE_VALUE) {
CloseHandle(mWriteOverlapped.hEvent);
mWriteOverlapped.hEvent = nullptr;
}
return NS_OK;
}
int32_t
NamedPipeInfo::Read(void* aBuffer, int32_t aSize)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
int32_t bytesRead = Peek(aBuffer, aSize);
if (bytesRead > 0) {
mReadBegin += bytesRead;
}
return bytesRead;
}
int32_t
NamedPipeInfo::Write(const void* aBuffer, int32_t aSize)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(mWriteBegin <= mWriteEnd);
if (!IsConnected()) {
// pipe unconnected
PR_SetError(PR_NOT_CONNECTED_ERROR, 0);
return -1;
}
if (mWriteBegin == mWriteEnd) {
mWriteBegin = mWriteEnd = 0;
}
int32_t bytesToWrite = std::min<int32_t>(aSize,
sizeof(mWriteBuffer) - mWriteEnd);
MOZ_ASSERT(bytesToWrite >= 0);
if (bytesToWrite == 0) {
PR_SetError(IsNonblocking() ? PR_WOULD_BLOCK_ERROR
: PR_IO_PENDING_ERROR,
0);
return -1;
}
memcpy(&mWriteBuffer[mWriteEnd], aBuffer, bytesToWrite);
mWriteEnd += bytesToWrite;
/**
* Triggers internal write operation by calling |GetPollFlags|.
* This is required for callers that use blocking I/O because they don't call
* |GetPollFlags| to write data, but this also works for non-blocking I/O.
*/
int16_t outFlag;
GetPollFlags(PR_POLL_WRITE, &outFlag);
return bytesToWrite;
}
uint32_t
NamedPipeInfo::Peek(void* aBuffer, int32_t aSize)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(mReadBegin <= mReadEnd);
if (!IsConnected()) {
// pipe unconnected
PR_SetError(PR_NOT_CONNECTED_ERROR, 0);
return -1;
}
/**
* If there's nothing in the read buffer, try to trigger internal read
* operation by calling |GetPollFlags|. This is required for callers that
* use blocking I/O because they don't call |GetPollFlags| to read data,
* but this also works for non-blocking I/O.
*/
if (!Available()) {
int16_t outFlag;
GetPollFlags(PR_POLL_READ, &outFlag);
if (!(outFlag & PR_POLL_READ)) {
PR_SetError(IsNonblocking() ? PR_WOULD_BLOCK_ERROR
: PR_IO_PENDING_ERROR,
0);
return -1;
}
}
// Available() can't return more than what fits to the buffer at the read offset.
int32_t bytesRead = std::min<int32_t>(aSize, Available());
MOZ_ASSERT(bytesRead >= 0);
MOZ_ASSERT(mReadBegin + bytesRead <= mReadEnd);
memcpy(aBuffer, &mReadBuffer[mReadBegin], bytesRead);
return bytesRead;
}
int32_t
NamedPipeInfo::Available() const
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(mReadBegin <= mReadEnd);
MOZ_ASSERT(mReadEnd - mReadBegin <= 0x7FFFFFFF); // no more than int32_max
return mReadEnd - mReadBegin;
}
bool
NamedPipeInfo::Sync(uint32_t aTimeout)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
if (!mHasPendingWrite) {
return true;
}
return WaitForSingleObject(mWriteOverlapped.hEvent, aTimeout) == WAIT_OBJECT_0;
}
void
NamedPipeInfo::SetNonblocking(bool nonblocking)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
mNonblocking = nonblocking;
}
bool
NamedPipeInfo::IsConnected() const
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
return mPipe && mPipe != INVALID_HANDLE_VALUE;
}
bool
NamedPipeInfo::IsNonblocking() const
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
return mNonblocking;
}
HANDLE
NamedPipeInfo::GetHandle() const
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
return mPipe;
}
int16_t
NamedPipeInfo::GetPollFlags(int16_t aInFlags, int16_t* aOutFlags)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
*aOutFlags = 0;
if (aInFlags & PR_POLL_READ) {
int32_t bytesToRead = 0;
if (mReadBegin < mReadEnd) { // data in buffer and is ready to be read
bytesToRead = Available();
} else if (mHasPendingRead) { // nonblocking I/O and has pending task
bytesToRead = DoReadContinue();
} else { // read bufer is empty.
bytesToRead = DoRead();
}
if (bytesToRead > 0) {
*aOutFlags |= PR_POLL_READ;
} else if (bytesToRead < 0) {
*aOutFlags |= PR_POLL_ERR;
}
}
if (aInFlags & PR_POLL_WRITE) {
int32_t bytesWritten = 0;
if (mHasPendingWrite) { // nonblocking I/O and has pending task.
bytesWritten = DoWriteContinue();
} else if (mWriteBegin < mWriteEnd) { // data in buffer, ready to write
bytesWritten = DoWrite();
} else { // write buffer is empty.
*aOutFlags |= PR_POLL_WRITE;
}
if (bytesWritten < 0) {
*aOutFlags |= PR_POLL_ERR;
} else if (bytesWritten &&
!mHasPendingWrite &&
mWriteBegin == mWriteEnd) {
*aOutFlags |= PR_POLL_WRITE;
}
}
return *aOutFlags;
}
// @return: data has been read and is available
int32_t
NamedPipeInfo::DoRead()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(!mHasPendingRead);
MOZ_ASSERT(mReadBegin == mReadEnd); // the buffer should be empty
mReadBegin = 0;
mReadEnd = 0;
BOOL success = ReadFile(mPipe,
mReadBuffer,
sizeof(mReadBuffer),
&mReadEnd,
IsNonblocking() ? &mReadOverlapped : nullptr);
if (success) {
LOG_NPIO_DEBUG("[%s][%p] %d bytes read", __func__, this, mReadEnd);
return mReadEnd;
}
switch (GetLastError()) {
case ERROR_MORE_DATA: // has more data to read
mHasPendingRead = true;
return DoReadContinue();
case ERROR_IO_PENDING: // read is pending
mHasPendingRead = true;
break;
default:
LOG_NPIO_ERROR("[%s] ReadFile failed (%d)", __func__, GetLastError());
Disconnect();
PR_SetError(PR_IO_ERROR, 0);
return -1;
}
return 0;
}
int32_t
NamedPipeInfo::DoReadContinue()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(mHasPendingRead);
MOZ_ASSERT(mReadBegin == 0 && mReadEnd == 0);
BOOL success;
success = GetOverlappedResult(mPipe,
&mReadOverlapped,
&mReadEnd,
FALSE);
if (success) {
mHasPendingRead = false;
if (mReadEnd == 0) {
Disconnect();
PR_SetError(PR_NOT_CONNECTED_ERROR, 0);
return -1;
}
LOG_NPIO_DEBUG("[%s][%p] %d bytes read", __func__, this, mReadEnd);
return mReadEnd;
}
switch (GetLastError()) {
case ERROR_MORE_DATA:
mHasPendingRead = false;
LOG_NPIO_DEBUG("[%s][%p] %d bytes read", __func__, this, mReadEnd);
return mReadEnd;
case ERROR_IO_INCOMPLETE: // still in progress
break;
default:
LOG_NPIO_ERROR("[%s]: GetOverlappedResult failed (%d)",
__func__,
GetLastError());
Disconnect();
PR_SetError(PR_IO_ERROR, 0);
return -1;
}
return 0;
}
int32_t
NamedPipeInfo::DoWrite()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(!mHasPendingWrite);
MOZ_ASSERT(mWriteBegin < mWriteEnd);
DWORD bytesWritten = 0;
BOOL success = WriteFile(mPipe,
&mWriteBuffer[mWriteBegin],
mWriteEnd - mWriteBegin,
&bytesWritten,
IsNonblocking() ? &mWriteOverlapped : nullptr);
if (success) {
mWriteBegin += bytesWritten;
LOG_NPIO_DEBUG("[%s][%p] %d bytes written", __func__, this, bytesWritten);
return bytesWritten;
}
if (GetLastError() != ERROR_IO_PENDING) {
LOG_NPIO_ERROR("[%s] WriteFile failed (%d)", __func__, GetLastError());
Disconnect();
PR_SetError(PR_IO_ERROR, 0);
return -1;
}
mHasPendingWrite = true;
return 0;
}
int32_t
NamedPipeInfo::DoWriteContinue()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(mHasPendingWrite);
DWORD bytesWritten = 0;
BOOL success = GetOverlappedResult(mPipe,
&mWriteOverlapped,
&bytesWritten,
FALSE);
if (!success) {
if (GetLastError() == ERROR_IO_INCOMPLETE) {
// still in progress
return 0;
}
LOG_NPIO_ERROR("[%s] GetOverlappedResult failed (%d)",
__func__,
GetLastError());
Disconnect();
PR_SetError(PR_IO_ERROR, 0);
return -1;
}
mHasPendingWrite = false;
mWriteBegin += bytesWritten;
LOG_NPIO_DEBUG("[%s][%p] %d bytes written", __func__, this, bytesWritten);
return bytesWritten;
}
static inline NamedPipeInfo*
GetNamedPipeInfo(PRFileDesc* aFd)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_DIAGNOSTIC_ASSERT(aFd);
MOZ_DIAGNOSTIC_ASSERT(aFd->secret);
MOZ_DIAGNOSTIC_ASSERT(PR_GetLayersIdentity(aFd) == nsNamedPipeLayerIdentity);
if (!aFd ||
!aFd->secret ||
PR_GetLayersIdentity(aFd) != nsNamedPipeLayerIdentity) {
LOG_NPIO_ERROR("cannot get named pipe info");
return nullptr;
}
return reinterpret_cast<NamedPipeInfo*>(aFd->secret);
}
static PRStatus
nsNamedPipeConnect(PRFileDesc* aFd,
const PRNetAddr* aAddr,
PRIntervalTime aTimeout)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
NamedPipeInfo* info = GetNamedPipeInfo(aFd);
if (!info) {
PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
return PR_FAILURE;
}
if (NS_WARN_IF(NS_FAILED(info->Connect(
nsDependentCString(aAddr->local.path))))) {
return PR_FAILURE;
}
return PR_SUCCESS;
}
static PRStatus
nsNamedPipeConnectContinue(PRFileDesc* aFd, PRInt16 aOutFlags)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
return PR_SUCCESS;
}
static PRStatus
nsNamedPipeClose(PRFileDesc* aFd)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
if (aFd->secret && PR_GetLayersIdentity(aFd) == nsNamedPipeLayerIdentity) {
RefPtr<NamedPipeInfo> info = dont_AddRef(GetNamedPipeInfo(aFd));
info->Disconnect();
aFd->secret = nullptr;
aFd->identity = PR_INVALID_IO_LAYER;
}
MOZ_ASSERT(!aFd->lower);
PR_DELETE(aFd);
return PR_SUCCESS;
}
static PRInt32
nsNamedPipeSend(PRFileDesc* aFd,
const void* aBuffer,
PRInt32 aAmount,
PRIntn aFlags,
PRIntervalTime aTimeout)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
Unused << aFlags;
Unused << aTimeout;
NamedPipeInfo* info = GetNamedPipeInfo(aFd);
if (!info) {
PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
return -1;
}
return info->Write(aBuffer, aAmount);
}
static PRInt32
nsNamedPipeRecv(PRFileDesc* aFd,
void* aBuffer,
PRInt32 aAmount,
PRIntn aFlags,
PRIntervalTime aTimeout)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
Unused << aTimeout;
NamedPipeInfo* info = GetNamedPipeInfo(aFd);
if (!info) {
PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
return -1;
}
if (aFlags) {
if (aFlags != PR_MSG_PEEK) {
PR_SetError(PR_UNKNOWN_ERROR, 0);
return -1;
}
return info->Peek(aBuffer, aAmount);
}
return info->Read(aBuffer, aAmount);
}
static inline PRInt32
nsNamedPipeRead(PRFileDesc* aFd, void* aBuffer, PRInt32 aAmount)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
NamedPipeInfo* info = GetNamedPipeInfo(aFd);
if (!info) {
PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
return -1;
}
return info->Read(aBuffer, aAmount);
}
static inline PRInt32
nsNamedPipeWrite(PRFileDesc* aFd, const void* aBuffer, PRInt32 aAmount)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
NamedPipeInfo* info = GetNamedPipeInfo(aFd);
if (!info) {
PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
return -1;
}
return info->Write(aBuffer, aAmount);
}
static PRInt32
nsNamedPipeAvailable(PRFileDesc* aFd)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
NamedPipeInfo* info = GetNamedPipeInfo(aFd);
if (!info) {
PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
return -1;
}
return static_cast<PRInt32>(info->Available());
}
static PRInt64
nsNamedPipeAvailable64(PRFileDesc* aFd)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
NamedPipeInfo* info = GetNamedPipeInfo(aFd);
if (!info) {
PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
return -1;
}
return static_cast<PRInt64>(info->Available());
}
static PRStatus
nsNamedPipeSync(PRFileDesc* aFd)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
NamedPipeInfo* info = GetNamedPipeInfo(aFd);
if (!info) {
PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
return PR_FAILURE;
}
return info->Sync(0) ? PR_SUCCESS : PR_FAILURE;
}
static PRInt16
nsNamedPipePoll(PRFileDesc* aFd, PRInt16 aInFlags, PRInt16* aOutFlags)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
NamedPipeInfo* info = GetNamedPipeInfo(aFd);
if (!info) {
PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0);
return 0;
}
return info->GetPollFlags(aInFlags, aOutFlags);
}
// FIXME: remove socket option functions?
static PRStatus
nsNamedPipeGetSocketOption(PRFileDesc* aFd, PRSocketOptionData* aData)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(aFd);
MOZ_ASSERT(aData);
switch (aData->option) {
case PR_SockOpt_Nonblocking:
aData->value.non_blocking = GetNamedPipeInfo(aFd)->IsNonblocking()
? PR_TRUE
: PR_FALSE;
break;
case PR_SockOpt_Keepalive:
aData->value.keep_alive = PR_TRUE;
break;
case PR_SockOpt_NoDelay:
aData->value.no_delay = PR_TRUE;
break;
default:
PR_SetError(PR_INVALID_METHOD_ERROR, 0);
return PR_FAILURE;
}
return PR_SUCCESS;
}
static PRStatus
nsNamedPipeSetSocketOption(PRFileDesc* aFd, const PRSocketOptionData* aData)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(aFd);
MOZ_ASSERT(aData);
switch (aData->option) {
case PR_SockOpt_Nonblocking:
GetNamedPipeInfo(aFd)->SetNonblocking(aData->value.non_blocking);
break;
case PR_SockOpt_Keepalive:
case PR_SockOpt_NoDelay:
break;
default:
PR_SetError(PR_INVALID_METHOD_ERROR, 0);
return PR_FAILURE;
}
return PR_SUCCESS;
}
static void
Initialize()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
static bool initialized = false;
if (initialized) {
return;
}
nsNamedPipeLayerIdentity = PR_GetUniqueIdentity("Named Pipe layer");
nsNamedPipeLayerMethods = *PR_GetDefaultIOMethods();
nsNamedPipeLayerMethods.close = nsNamedPipeClose;
nsNamedPipeLayerMethods.read = nsNamedPipeRead;
nsNamedPipeLayerMethods.write = nsNamedPipeWrite;
nsNamedPipeLayerMethods.available = nsNamedPipeAvailable;
nsNamedPipeLayerMethods.available64 = nsNamedPipeAvailable64;
nsNamedPipeLayerMethods.fsync = nsNamedPipeSync;
nsNamedPipeLayerMethods.connect = nsNamedPipeConnect;
nsNamedPipeLayerMethods.recv = nsNamedPipeRecv;
nsNamedPipeLayerMethods.send = nsNamedPipeSend;
nsNamedPipeLayerMethods.poll = nsNamedPipePoll;
nsNamedPipeLayerMethods.getsocketoption = nsNamedPipeGetSocketOption;
nsNamedPipeLayerMethods.setsocketoption = nsNamedPipeSetSocketOption;
nsNamedPipeLayerMethods.connectcontinue = nsNamedPipeConnectContinue;
initialized = true;
}
bool
IsNamedPipePath(const nsACString& aPath)
{
return StringBeginsWith(aPath, NS_LITERAL_CSTRING("\\\\.\\pipe\\"));
}
PRFileDesc*
CreateNamedPipeLayer()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
Initialize();
PRFileDesc* layer = PR_CreateIOLayerStub(nsNamedPipeLayerIdentity,
&nsNamedPipeLayerMethods);
if (NS_WARN_IF(!layer)) {
LOG_NPIO_ERROR("CreateNamedPipeLayer() failed.");
return nullptr;
}
RefPtr<NamedPipeInfo> info = new NamedPipeInfo();
layer->secret = reinterpret_cast<PRFilePrivate*>(info.forget().take());
return layer;
}
} // namespace net
} // namespace mozilla