mirror of
https://github.com/morgan9e/FreeRDP
synced 2026-04-15 00:44:19 +09:00
Merge pull request #10911 from akallabeth/assert-assert-assert
Assert assert assert
This commit is contained in:
@@ -26,3 +26,4 @@ set(BUILD_WITH_CLANG_TIDY OFF CACHE BOOL "oss fuzz")
|
||||
set(OSS_FUZZ ON CACHE BOOL "oss fuzz")
|
||||
set(BUILD_FUZZERS ON CACHE BOOL "oss fuzz")
|
||||
set(BUILD_TESTING_INTERNAL ON CACHE BOOL "oss fuzz")
|
||||
set(WITH_STREAMPOOL_DEBUG ON CACHE BOOL "oss fuzz")
|
||||
|
||||
@@ -3,6 +3,7 @@ set(CMAKE_VERBOSE_MAKEFILE ON CACHE BOOL "preload")
|
||||
set(WITH_SERVER ON CACHE BOOL "qa default")
|
||||
set(WITH_SAMPLE ON CACHE BOOL "qa default")
|
||||
set(WITH_SIMD ON CACHE BOOL "qa default")
|
||||
set(WITH_STREAMPOOL_DEBUG ON CACHE BOOL "preload")
|
||||
set(WITH_VERBOSE_WINPR_ASSERT OFF CACHE BOOL "qa default")
|
||||
set(ENABLE_WARNING_VERBOSE ON CACHE BOOL "preload")
|
||||
set(BUILD_SHARED_LIBS OFF CACHE BOOL "qa default")
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
set(BUILD_TESTING_INTERNAL ON CACHE BOOL "qa default")
|
||||
set(WITH_STREAMPOOL_DEBUG ON CACHE BOOL "preload")
|
||||
set(CMAKE_VERBOSE_MAKEFILE ON CACHE BOOL "preload")
|
||||
set(ENABLE_WARNING_VERBOSE ON CACHE BOOL "preload")
|
||||
set(WITH_MANPAGES ON CACHE BOOL "qa default")
|
||||
|
||||
@@ -491,6 +491,9 @@ BOOL rdp_client_disconnect(rdpRdp* rdp)
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
if (!transport_disconnect(rdp->transport))
|
||||
return FALSE;
|
||||
|
||||
if (!rdp_reset(rdp))
|
||||
return FALSE;
|
||||
|
||||
|
||||
@@ -785,7 +785,10 @@ static BOOL license_send(rdpLicense* license, wStream* s, BYTE type)
|
||||
flags |= EXTENDED_ERROR_MSG_SUPPORTED;
|
||||
|
||||
if (!license_write_preamble(s, type, flags, wMsgSize))
|
||||
{
|
||||
Stream_Release(s);
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
#ifdef WITH_DEBUG_LICENSE
|
||||
WLog_DBG(TAG, "Sending %s Packet, length %" PRIu16 "", license_request_type_string(type),
|
||||
@@ -1742,7 +1745,7 @@ BOOL license_send_license_info(rdpLicense* license, const LICENSE_BLOB* calBlob,
|
||||
|
||||
if (!license_check_stream_capacity(s, 8 + sizeof(license->ClientRandom),
|
||||
"license info::ClientRandom"))
|
||||
return FALSE;
|
||||
goto error;
|
||||
|
||||
Stream_Write_UINT32(s,
|
||||
license->PreferredKeyExchangeAlg); /* PreferredKeyExchangeAlg (4 bytes) */
|
||||
@@ -2479,7 +2482,6 @@ BOOL license_answer_license_request(rdpLicense* license)
|
||||
|
||||
BOOL license_send_platform_challenge_response(rdpLicense* license)
|
||||
{
|
||||
wStream* s = license_send_stream_init(license);
|
||||
wStream* challengeRespData = NULL;
|
||||
BYTE* buffer = NULL;
|
||||
BOOL status = 0;
|
||||
@@ -2552,6 +2554,10 @@ BOOL license_send_platform_challenge_response(rdpLicense* license)
|
||||
winpr_HexDump(TAG, WLOG_DEBUG, license->EncryptedHardwareId->data,
|
||||
license->EncryptedHardwareId->length);
|
||||
#endif
|
||||
wStream* s = license_send_stream_init(license);
|
||||
if (!s)
|
||||
return FALSE;
|
||||
|
||||
if (license_write_client_platform_challenge_response(license, s))
|
||||
return license_send(license, s, PLATFORM_CHALLENGE_RESPONSE);
|
||||
|
||||
|
||||
@@ -2400,6 +2400,7 @@ static void rdp_reset_free(rdpRdp* rdp)
|
||||
{
|
||||
WINPR_ASSERT(rdp);
|
||||
|
||||
(void)security_lock(rdp);
|
||||
rdp_free_rc4_decrypt_keys(rdp);
|
||||
rdp_free_rc4_encrypt_keys(rdp);
|
||||
|
||||
@@ -2407,6 +2408,7 @@ static void rdp_reset_free(rdpRdp* rdp)
|
||||
winpr_Cipher_Free(rdp->fips_decrypt);
|
||||
rdp->fips_encrypt = NULL;
|
||||
rdp->fips_decrypt = NULL;
|
||||
(void)security_unlock(rdp);
|
||||
|
||||
mcs_free(rdp->mcs);
|
||||
nego_free(rdp->nego);
|
||||
@@ -2502,7 +2504,6 @@ void rdp_free(rdpRdp* rdp)
|
||||
{
|
||||
if (rdp)
|
||||
{
|
||||
DeleteCriticalSection(&rdp->critical);
|
||||
rdp_reset_free(rdp);
|
||||
|
||||
freerdp_settings_free(rdp->settings);
|
||||
@@ -2522,6 +2523,7 @@ void rdp_free(rdpRdp* rdp)
|
||||
if (rdp->abortEvent)
|
||||
(void)CloseHandle(rdp->abortEvent);
|
||||
aad_free(rdp->aad);
|
||||
DeleteCriticalSection(&rdp->critical);
|
||||
free(rdp);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -478,6 +478,8 @@ static int transport_bio_buffered_write(BIO* bio, const char* buf, int num)
|
||||
|
||||
WINPR_ASSERT(bio);
|
||||
WINPR_ASSERT(ptr);
|
||||
if (num < 0)
|
||||
return num;
|
||||
|
||||
ptr->writeBlocked = FALSE;
|
||||
BIO_clear_flags(bio, BIO_FLAGS_WRITE);
|
||||
@@ -485,7 +487,7 @@ static int transport_bio_buffered_write(BIO* bio, const char* buf, int num)
|
||||
/* we directly append extra bytes in the xmit buffer, this could be prevented
|
||||
* but for now it makes the code more simple.
|
||||
*/
|
||||
if (buf && num && !ringbuffer_write(&ptr->xmitBuffer, (const BYTE*)buf, num))
|
||||
if (buf && (num > 0) && !ringbuffer_write(&ptr->xmitBuffer, (const BYTE*)buf, (size_t)num))
|
||||
{
|
||||
WLog_ERR(TAG, "an error occurred when writing (num: %d)", num);
|
||||
return -1;
|
||||
|
||||
@@ -149,10 +149,13 @@ static void transport_ssl_cb(const SSL* ssl, int where, int ret)
|
||||
|
||||
wStream* transport_send_stream_init(rdpTransport* transport, size_t size)
|
||||
{
|
||||
wStream* s = NULL;
|
||||
WINPR_ASSERT(transport);
|
||||
|
||||
if (!(s = StreamPool_Take(transport->ReceivePool, size)))
|
||||
if (!transport->frontBio)
|
||||
return NULL;
|
||||
|
||||
wStream* s = StreamPool_Take(transport->ReceivePool, size);
|
||||
if (!s)
|
||||
return NULL;
|
||||
|
||||
if (!Stream_EnsureCapacity(s, size))
|
||||
@@ -216,7 +219,12 @@ static BOOL transport_default_attach(rdpTransport* transport, int sockfd)
|
||||
*/
|
||||
BIO_set_fd(socketBio, sockfd, BIO_CLOSE);
|
||||
}
|
||||
EnterCriticalSection(&(transport->ReadLock));
|
||||
EnterCriticalSection(&(transport->WriteLock));
|
||||
transport->frontBio = bufferedBio;
|
||||
LeaveCriticalSection(&(transport->WriteLock));
|
||||
LeaveCriticalSection(&(transport->ReadLock));
|
||||
|
||||
return TRUE;
|
||||
fail:
|
||||
|
||||
@@ -1592,6 +1600,8 @@ static BOOL transport_default_disconnect(rdpTransport* transport)
|
||||
if (!transport)
|
||||
return FALSE;
|
||||
|
||||
EnterCriticalSection(&(transport->ReadLock));
|
||||
EnterCriticalSection(&(transport->WriteLock));
|
||||
if (transport->tls)
|
||||
{
|
||||
freerdp_tls_free(transport->tls);
|
||||
@@ -1624,6 +1634,8 @@ static BOOL transport_default_disconnect(rdpTransport* transport)
|
||||
transport->frontBio = NULL;
|
||||
transport->layer = TRANSPORT_LAYER_TCP;
|
||||
transport->earlyUserAuth = FALSE;
|
||||
LeaveCriticalSection(&(transport->WriteLock));
|
||||
LeaveCriticalSection(&(transport->ReadLock));
|
||||
return status;
|
||||
}
|
||||
|
||||
@@ -1708,15 +1720,26 @@ void transport_free(rdpTransport* transport)
|
||||
|
||||
transport_disconnect(transport);
|
||||
|
||||
EnterCriticalSection(&(transport->ReadLock));
|
||||
if (transport->ReceiveBuffer)
|
||||
Stream_Release(transport->ReceiveBuffer);
|
||||
LeaveCriticalSection(&(transport->ReadLock));
|
||||
|
||||
(void)StreamPool_WaitForReturn(transport->ReceivePool, INFINITE);
|
||||
|
||||
EnterCriticalSection(&(transport->ReadLock));
|
||||
EnterCriticalSection(&(transport->WriteLock));
|
||||
|
||||
nla_free(transport->nla);
|
||||
StreamPool_Free(transport->ReceivePool);
|
||||
(void)CloseHandle(transport->connectedEvent);
|
||||
(void)CloseHandle(transport->rereadEvent);
|
||||
(void)CloseHandle(transport->ioEvent);
|
||||
|
||||
LeaveCriticalSection(&(transport->ReadLock));
|
||||
DeleteCriticalSection(&(transport->ReadLock));
|
||||
|
||||
LeaveCriticalSection(&(transport->WriteLock));
|
||||
DeleteCriticalSection(&(transport->WriteLock));
|
||||
free(transport);
|
||||
}
|
||||
@@ -1795,6 +1818,8 @@ rdpTsg* transport_get_tsg(rdpTransport* transport)
|
||||
wStream* transport_take_from_pool(rdpTransport* transport, size_t size)
|
||||
{
|
||||
WINPR_ASSERT(transport);
|
||||
if (!transport->frontBio)
|
||||
return NULL;
|
||||
return StreamPool_Take(transport->ReceivePool, size);
|
||||
}
|
||||
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
|
||||
BOOL ringbuffer_init(RingBuffer* rb, size_t initialSize)
|
||||
{
|
||||
WINPR_ASSERT(rb);
|
||||
rb->buffer = malloc(initialSize);
|
||||
|
||||
if (!rb->buffer)
|
||||
@@ -54,23 +55,29 @@ BOOL ringbuffer_init(RingBuffer* rb, size_t initialSize)
|
||||
|
||||
size_t ringbuffer_used(const RingBuffer* rb)
|
||||
{
|
||||
WINPR_ASSERT(rb);
|
||||
return rb->size - rb->freeSize;
|
||||
}
|
||||
|
||||
size_t ringbuffer_capacity(const RingBuffer* rb)
|
||||
{
|
||||
WINPR_ASSERT(rb);
|
||||
return rb->size;
|
||||
}
|
||||
|
||||
void ringbuffer_destroy(RingBuffer* rb)
|
||||
{
|
||||
DEBUG_RINGBUFFER("ringbuffer_destroy(%p)", (void*)rb);
|
||||
if (!rb)
|
||||
return;
|
||||
|
||||
free(rb->buffer);
|
||||
rb->buffer = NULL;
|
||||
}
|
||||
|
||||
static BOOL ringbuffer_realloc(RingBuffer* rb, size_t targetSize)
|
||||
{
|
||||
WINPR_ASSERT(rb);
|
||||
BYTE* newData = NULL;
|
||||
DEBUG_RINGBUFFER("ringbuffer_realloc(%p): targetSize: %" PRIdz "", (void*)rb, targetSize);
|
||||
|
||||
@@ -162,6 +169,8 @@ BOOL ringbuffer_write(RingBuffer* rb, const BYTE* ptr, size_t sz)
|
||||
{
|
||||
size_t toWrite = 0;
|
||||
size_t remaining = 0;
|
||||
|
||||
WINPR_ASSERT(rb);
|
||||
DEBUG_RINGBUFFER("ringbuffer_write(%p): sz: %" PRIdz "", (void*)rb, sz);
|
||||
|
||||
if ((rb->freeSize <= sz) && !ringbuffer_realloc(rb, rb->size + sz))
|
||||
@@ -198,6 +207,7 @@ BYTE* ringbuffer_ensure_linear_write(RingBuffer* rb, size_t sz)
|
||||
{
|
||||
DEBUG_RINGBUFFER("ringbuffer_ensure_linear_write(%p): sz: %" PRIdz "", (void*)rb, sz);
|
||||
|
||||
WINPR_ASSERT(rb);
|
||||
if (rb->freeSize < sz)
|
||||
{
|
||||
if (!ringbuffer_realloc(rb, rb->size + sz - rb->freeSize + 32))
|
||||
@@ -229,6 +239,7 @@ BOOL ringbuffer_commit_written_bytes(RingBuffer* rb, size_t sz)
|
||||
{
|
||||
DEBUG_RINGBUFFER("ringbuffer_commit_written_bytes(%p): sz: %" PRIdz "", (void*)rb, sz);
|
||||
|
||||
WINPR_ASSERT(rb);
|
||||
if (sz < 1)
|
||||
return TRUE;
|
||||
|
||||
@@ -248,6 +259,7 @@ int ringbuffer_peek(const RingBuffer* rb, DataChunk chunks[2], size_t sz)
|
||||
int status = 0;
|
||||
DEBUG_RINGBUFFER("ringbuffer_peek(%p): sz: %" PRIdz "", (const void*)rb, sz);
|
||||
|
||||
WINPR_ASSERT(rb);
|
||||
if (sz < 1)
|
||||
return 0;
|
||||
|
||||
@@ -282,6 +294,7 @@ void ringbuffer_commit_read_bytes(RingBuffer* rb, size_t sz)
|
||||
{
|
||||
DEBUG_RINGBUFFER("ringbuffer_commit_read_bytes(%p): sz: %" PRIdz "", (void*)rb, sz);
|
||||
|
||||
WINPR_ASSERT(rb);
|
||||
if (sz < 1)
|
||||
return;
|
||||
|
||||
|
||||
@@ -1370,13 +1370,36 @@ extern "C"
|
||||
|
||||
WINPR_API void StreamPool_Return(wStreamPool* pool, wStream* s);
|
||||
|
||||
WINPR_API wStream* StreamPool_Take(wStreamPool* pool, size_t size);
|
||||
|
||||
WINPR_API void Stream_AddRef(wStream* s);
|
||||
WINPR_API void Stream_Release(wStream* s);
|
||||
|
||||
WINPR_ATTR_MALLOC(Stream_Release, 1)
|
||||
WINPR_API wStream* StreamPool_Take(wStreamPool* pool, size_t size);
|
||||
|
||||
WINPR_API wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr);
|
||||
|
||||
/** Return the number of streams still not returned to the pool
|
||||
*
|
||||
* @param pool The pool to query, must not be \b NULL
|
||||
*
|
||||
* @return the number of streams still in use
|
||||
*
|
||||
* @since version 3.10.0
|
||||
*/
|
||||
WINPR_API size_t StreamPool_UsedCount(wStreamPool* pool);
|
||||
|
||||
/** Wait up to \b timeoutMS milliseconds for streams to be returned to the pool.
|
||||
* Use \b INFINITE for an infinite timeout
|
||||
*
|
||||
* @param pool The pool to query, must not be \b NULL
|
||||
* @param timeoutMS Milliseconds to wait at most, use \b INFINITE for no timeout.
|
||||
*
|
||||
* @return \b TRUE in case all streams were returned, \b FALSE otherwise.
|
||||
*
|
||||
* @since version 3.10.0
|
||||
*/
|
||||
WINPR_API BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS);
|
||||
|
||||
WINPR_API void StreamPool_Clear(wStreamPool* pool);
|
||||
|
||||
WINPR_API void StreamPool_Free(wStreamPool* pool);
|
||||
|
||||
@@ -20,6 +20,11 @@ include(CMakeDependentOption)
|
||||
|
||||
winpr_include_directory_add(${CMAKE_CURRENT_SOURCE_DIR})
|
||||
|
||||
option(WITH_STREAMPOOL_DEBUG "build with extensive streampool logging" OFF)
|
||||
if(WITH_STREAMPOOL_DEBUG)
|
||||
winpr_definition_add(-DWITH_STREAMPOOL_DEBUG)
|
||||
endif()
|
||||
|
||||
option(WITH_LODEPNG "build WinPR with PNG support" OFF)
|
||||
if(WITH_LODEPNG)
|
||||
find_package(lodepng REQUIRED)
|
||||
|
||||
@@ -25,22 +25,64 @@
|
||||
#include <winpr/collections.h>
|
||||
|
||||
#include "../stream.h"
|
||||
#include "../log.h"
|
||||
#define TAG WINPR_TAG("utils.streampool")
|
||||
|
||||
struct s_StreamPoolEntry
|
||||
{
|
||||
#if defined(WITH_STREAMPOOL_DEBUG)
|
||||
char** msg;
|
||||
size_t lines;
|
||||
#endif
|
||||
wStream* s;
|
||||
};
|
||||
|
||||
struct s_wStreamPool
|
||||
{
|
||||
size_t aSize;
|
||||
size_t aCapacity;
|
||||
wStream** aArray;
|
||||
struct s_StreamPoolEntry* aArray;
|
||||
|
||||
size_t uSize;
|
||||
size_t uCapacity;
|
||||
wStream** uArray;
|
||||
struct s_StreamPoolEntry* uArray;
|
||||
|
||||
CRITICAL_SECTION lock;
|
||||
BOOL synchronized;
|
||||
size_t defaultSize;
|
||||
};
|
||||
|
||||
static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
|
||||
{
|
||||
if (!entry)
|
||||
return;
|
||||
|
||||
#if defined(WITH_STREAMPOOL_DEBUG)
|
||||
free((void*)entry->msg);
|
||||
#endif
|
||||
|
||||
if (discardStream && entry->s)
|
||||
Stream_Free(entry->s, entry->s->isAllocatedStream);
|
||||
|
||||
const struct s_StreamPoolEntry empty = { 0 };
|
||||
*entry = empty;
|
||||
}
|
||||
|
||||
static struct s_StreamPoolEntry add_entry(wStream* s)
|
||||
{
|
||||
struct s_StreamPoolEntry entry = { 0 };
|
||||
|
||||
#if defined(WITH_STREAMPOOL_DEBUG)
|
||||
void* stack = winpr_backtrace(20);
|
||||
if (stack)
|
||||
entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
|
||||
winpr_backtrace_free(stack);
|
||||
#endif
|
||||
|
||||
entry.s = s;
|
||||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Lock the stream pool
|
||||
*/
|
||||
@@ -65,31 +107,29 @@ static INLINE void StreamPool_Unlock(wStreamPool* pool)
|
||||
|
||||
static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
|
||||
{
|
||||
size_t new_cap = 0;
|
||||
size_t* cap = NULL;
|
||||
size_t* size = NULL;
|
||||
wStream*** array = NULL;
|
||||
|
||||
WINPR_ASSERT(pool);
|
||||
|
||||
cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
|
||||
size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
|
||||
array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
|
||||
size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
|
||||
size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
|
||||
struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
|
||||
|
||||
size_t new_cap = 0;
|
||||
if (*cap == 0)
|
||||
new_cap = *size + count;
|
||||
else if (*size + count > *cap)
|
||||
new_cap = *cap * 2;
|
||||
new_cap = (*size + count + 2) / 2 * 3;
|
||||
else if ((*size + count) < *cap / 3)
|
||||
new_cap = *cap / 2;
|
||||
|
||||
if (new_cap > 0)
|
||||
{
|
||||
wStream** new_arr = NULL;
|
||||
struct s_StreamPoolEntry* new_arr = NULL;
|
||||
|
||||
if (*cap < *size + count)
|
||||
*cap += count;
|
||||
|
||||
new_arr = (wStream**)realloc((void*)*array, sizeof(wStream*) * new_cap);
|
||||
new_arr =
|
||||
(struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
|
||||
if (!new_arr)
|
||||
return FALSE;
|
||||
*cap = new_cap;
|
||||
@@ -102,28 +142,21 @@ static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL used
|
||||
* Methods
|
||||
*/
|
||||
|
||||
static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index, INT64 count)
|
||||
static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
|
||||
{
|
||||
WINPR_ASSERT(pool);
|
||||
if (count > 0)
|
||||
{
|
||||
const size_t pcount = (size_t)count;
|
||||
StreamPool_EnsureCapacity(pool, pcount, TRUE);
|
||||
|
||||
MoveMemory((void*)&pool->uArray[index + pcount], (void*)&pool->uArray[index],
|
||||
(pool->uSize - index) * sizeof(wStream*));
|
||||
pool->uSize += pcount;
|
||||
}
|
||||
else if (count < 0)
|
||||
const size_t pcount = 1;
|
||||
const size_t off = index + pcount;
|
||||
if (pool->uSize >= off)
|
||||
{
|
||||
const size_t pcount = (size_t)-count;
|
||||
const size_t off = index + pcount;
|
||||
if (pool->uSize > off)
|
||||
for (size_t x = 0; x < pcount; x++)
|
||||
{
|
||||
MoveMemory((void*)&pool->uArray[index], (void*)&pool->uArray[index + pcount],
|
||||
(pool->uSize - index - pcount) * sizeof(wStream*));
|
||||
struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
|
||||
discard_entry(cur, FALSE);
|
||||
}
|
||||
|
||||
MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
|
||||
(pool->uSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
|
||||
pool->uSize -= pcount;
|
||||
}
|
||||
}
|
||||
@@ -135,7 +168,8 @@ static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index, INT64 count)
|
||||
static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
|
||||
{
|
||||
StreamPool_EnsureCapacity(pool, 1, TRUE);
|
||||
pool->uArray[(pool->uSize)++] = s;
|
||||
pool->uArray[pool->uSize] = add_entry(s);
|
||||
pool->uSize++;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -147,36 +181,31 @@ static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
|
||||
WINPR_ASSERT(pool);
|
||||
for (size_t index = 0; index < pool->uSize; index++)
|
||||
{
|
||||
if (pool->uArray[index] == s)
|
||||
struct s_StreamPoolEntry* cur = &pool->uArray[index];
|
||||
if (cur->s == s)
|
||||
{
|
||||
StreamPool_ShiftUsed(pool, index, -1);
|
||||
StreamPool_ShiftUsed(pool, index);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index, INT64 count)
|
||||
static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
|
||||
{
|
||||
WINPR_ASSERT(pool);
|
||||
if (count > 0)
|
||||
{
|
||||
const size_t pcount = (size_t)count;
|
||||
|
||||
StreamPool_EnsureCapacity(pool, pcount, FALSE);
|
||||
MoveMemory((void*)&pool->aArray[index + pcount], (void*)&pool->aArray[index],
|
||||
(pool->aSize - index) * sizeof(wStream*));
|
||||
pool->aSize += pcount;
|
||||
}
|
||||
else if (count < 0)
|
||||
const size_t pcount = 1;
|
||||
const size_t off = index + pcount;
|
||||
if (pool->aSize >= off)
|
||||
{
|
||||
const size_t pcount = (size_t)-count;
|
||||
const size_t off = index + pcount;
|
||||
if (pool->aSize > off)
|
||||
for (size_t x = 0; x < pcount; x++)
|
||||
{
|
||||
MoveMemory((void*)&pool->aArray[index], (void*)&pool->aArray[index + pcount],
|
||||
(pool->aSize - index - pcount) * sizeof(wStream*));
|
||||
struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
|
||||
discard_entry(cur, FALSE);
|
||||
}
|
||||
|
||||
MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
|
||||
(pool->aSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
|
||||
pool->aSize -= pcount;
|
||||
}
|
||||
}
|
||||
@@ -198,7 +227,8 @@ wStream* StreamPool_Take(wStreamPool* pool, size_t size)
|
||||
|
||||
for (size_t index = 0; index < pool->aSize; index++)
|
||||
{
|
||||
s = pool->aArray[index];
|
||||
struct s_StreamPoolEntry* cur = &pool->aArray[index];
|
||||
s = cur->s;
|
||||
|
||||
if (Stream_Capacity(s) >= size)
|
||||
{
|
||||
@@ -218,7 +248,7 @@ wStream* StreamPool_Take(wStreamPool* pool, size_t size)
|
||||
{
|
||||
Stream_SetPosition(s, 0);
|
||||
Stream_SetLength(s, Stream_Capacity(s));
|
||||
StreamPool_ShiftAvailable(pool, foundIndex, -1);
|
||||
StreamPool_ShiftAvailable(pool, foundIndex);
|
||||
}
|
||||
|
||||
if (s)
|
||||
@@ -244,11 +274,11 @@ static void StreamPool_Remove(wStreamPool* pool, wStream* s)
|
||||
Stream_EnsureValidity(s);
|
||||
for (size_t x = 0; x < pool->aSize; x++)
|
||||
{
|
||||
wStream* cs = pool->aArray[x];
|
||||
wStream* cs = pool->aArray[x].s;
|
||||
if (cs == s)
|
||||
return;
|
||||
}
|
||||
pool->aArray[(pool->aSize)++] = s;
|
||||
pool->aArray[(pool->aSize)++] = add_entry(s);
|
||||
StreamPool_RemoveUsed(pool, s);
|
||||
}
|
||||
|
||||
@@ -311,11 +341,12 @@ wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
|
||||
|
||||
for (size_t index = 0; index < pool->uSize; index++)
|
||||
{
|
||||
wStream* cur = pool->uArray[index];
|
||||
struct s_StreamPoolEntry* cur = &pool->uArray[index];
|
||||
|
||||
if ((ptr >= Stream_Buffer(cur)) && (ptr < (Stream_Buffer(cur) + Stream_Capacity(cur))))
|
||||
if ((ptr >= Stream_Buffer(cur->s)) &&
|
||||
(ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
|
||||
{
|
||||
s = cur;
|
||||
s = cur->s;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -333,21 +364,34 @@ void StreamPool_Clear(wStreamPool* pool)
|
||||
{
|
||||
StreamPool_Lock(pool);
|
||||
|
||||
while (pool->aSize > 0)
|
||||
for (size_t x = 0; x < pool->aSize; x++)
|
||||
{
|
||||
wStream* s = pool->aArray[--pool->aSize];
|
||||
Stream_Free(s, s->isAllocatedStream);
|
||||
struct s_StreamPoolEntry* cur = &pool->aArray[x];
|
||||
discard_entry(cur, TRUE);
|
||||
}
|
||||
|
||||
while (pool->uSize > 0)
|
||||
if (pool->uSize > 0)
|
||||
{
|
||||
wStream* s = pool->uArray[--pool->uSize];
|
||||
Stream_Free(s, s->isAllocatedStream);
|
||||
WLog_WARN(TAG, "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
|
||||
pool->uSize);
|
||||
for (size_t x = 0; x < pool->uSize; x++)
|
||||
{
|
||||
struct s_StreamPoolEntry* cur = &pool->uArray[x];
|
||||
discard_entry(cur, TRUE);
|
||||
}
|
||||
}
|
||||
|
||||
StreamPool_Unlock(pool);
|
||||
}
|
||||
|
||||
size_t StreamPool_UsedCount(wStreamPool* pool)
|
||||
{
|
||||
StreamPool_Lock(pool);
|
||||
size_t usize = pool->uSize;
|
||||
StreamPool_Unlock(pool);
|
||||
return usize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Construction, Destruction
|
||||
*/
|
||||
@@ -388,8 +432,8 @@ void StreamPool_Free(wStreamPool* pool)
|
||||
|
||||
DeleteCriticalSection(&pool->lock);
|
||||
|
||||
free((void*)pool->aArray);
|
||||
free((void*)pool->uArray);
|
||||
free(pool->aArray);
|
||||
free(pool->uArray);
|
||||
|
||||
free(pool);
|
||||
}
|
||||
@@ -401,10 +445,84 @@ char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
|
||||
|
||||
if (!buffer || (size < 1))
|
||||
return NULL;
|
||||
(void)_snprintf(buffer, size - 1,
|
||||
"aSize =%" PRIuz ", uSize =%" PRIuz "aCapacity=%" PRIuz
|
||||
", uCapacity=%" PRIuz,
|
||||
pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
|
||||
buffer[size - 1] = '\0';
|
||||
|
||||
size_t used = 0;
|
||||
int offset = _snprintf(buffer, size - 1,
|
||||
"aSize =%" PRIuz ", uSize =%" PRIuz ", aCapacity=%" PRIuz
|
||||
", uCapacity=%" PRIuz,
|
||||
pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
|
||||
if ((offset > 0) && (offset < size))
|
||||
used += (size_t)offset;
|
||||
|
||||
#if defined(WITH_STREAMPOOL_DEBUG)
|
||||
StreamPool_Lock(pool);
|
||||
|
||||
offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
|
||||
if ((offset > 0) && (offset < size - used))
|
||||
used += (size_t)offset;
|
||||
for (size_t x = 0; x < pool->uSize; x++)
|
||||
{
|
||||
const struct s_StreamPoolEntry* cur = &pool->uArray[x];
|
||||
WINPR_ASSERT(cur->msg || (cur->lines == 0));
|
||||
|
||||
for (size_t y = 0; y < cur->lines; y++)
|
||||
{
|
||||
offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
|
||||
y, cur->msg[y]);
|
||||
if ((offset > 0) && (offset < size - used))
|
||||
used += (size_t)offset;
|
||||
}
|
||||
}
|
||||
|
||||
offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
|
||||
if ((offset > 0) && (offset < size - used))
|
||||
used += (size_t)offset;
|
||||
|
||||
struct s_StreamPoolEntry entry = { 0 };
|
||||
void* stack = winpr_backtrace(20);
|
||||
if (stack)
|
||||
entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
|
||||
winpr_backtrace_free(stack);
|
||||
|
||||
for (size_t x = 0; x < entry.lines; x++)
|
||||
{
|
||||
const char* msg = entry.msg[x];
|
||||
offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
|
||||
if ((offset > 0) && (offset < size - used))
|
||||
used += (size_t)offset;
|
||||
}
|
||||
free((void*)entry.msg);
|
||||
StreamPool_Unlock(pool);
|
||||
#endif
|
||||
buffer[used] = '\0';
|
||||
return buffer;
|
||||
}
|
||||
|
||||
BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
|
||||
{
|
||||
wLog* log = WLog_Get(TAG);
|
||||
|
||||
/* HACK: We disconnected the transport above, now wait without a read or write lock until all
|
||||
* streams in use have been returned to the pool. */
|
||||
while (timeoutMS > 0)
|
||||
{
|
||||
const size_t used = StreamPool_UsedCount(pool);
|
||||
if (used == 0)
|
||||
return TRUE;
|
||||
WLog_Print(log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);
|
||||
|
||||
char buffer[4096] = { 0 };
|
||||
StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
|
||||
WLog_Print(log, WLOG_TRACE, "Pool statistics: %s", buffer);
|
||||
|
||||
UINT32 diff = 10;
|
||||
if (timeoutMS != INFINITE)
|
||||
{
|
||||
diff = timeoutMS > 10 ? 10 : timeoutMS;
|
||||
timeoutMS -= diff;
|
||||
}
|
||||
Sleep(diff);
|
||||
}
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
@@ -72,6 +72,10 @@ int TestStreamPool(int argc, char* argv[])
|
||||
|
||||
printf("%s\n", StreamPool_GetStatistics(pool, buffer, sizeof(buffer)));
|
||||
|
||||
Stream_Release(s[2]);
|
||||
Stream_Release(s[3]);
|
||||
Stream_Release(s[4]);
|
||||
|
||||
StreamPool_Free(pool);
|
||||
|
||||
return 0;
|
||||
|
||||
Reference in New Issue
Block a user