From 21796ad73d080477d8440f215403c84ae61b1330 Mon Sep 17 00:00:00 2001 From: Vic Lee Date: Sun, 4 Aug 2013 17:23:32 +0800 Subject: [PATCH 1/4] libfreerdp-core/fastpath: fix memory leak when sending large packet. --- libfreerdp/core/fastpath.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libfreerdp/core/fastpath.c b/libfreerdp/core/fastpath.c index 872b57e51..844816024 100644 --- a/libfreerdp/core/fastpath.c +++ b/libfreerdp/core/fastpath.c @@ -856,6 +856,8 @@ BOOL fastpath_send_update_pdu(rdpFastPath* fastpath, BYTE updateCode, wStream* s comp_flags = FASTPATH_OUTPUT_COMPRESSION_USED; header_bytes = 7 + sec_bytes; bm = (BYTE*) (rdp->mppc_enc->outputBuffer - header_bytes); + if (comp_update) + Stream_Free(comp_update, FALSE); comp_update = Stream_New(bm, pdu_data_bytes + header_bytes); ls = comp_update; } @@ -902,6 +904,8 @@ BOOL fastpath_send_update_pdu(rdpFastPath* fastpath, BYTE updateCode, wStream* s Stream_Write_UINT16(ls, pdu_data_bytes); + if (update) + Stream_Free(update, FALSE); update = Stream_New(bm, pduLength); Stream_Seek(update, pduLength); From 2b25b4a52014d160aea370a7afba9aa1cff18c3f Mon Sep 17 00:00:00 2001 From: Norbert Federa Date: Wed, 7 Aug 2013 10:20:04 +0200 Subject: [PATCH 2/4] libwinpr-sync: New complete critical section code - Complete implementation including recursion support - Added an intensive ctest (TestSynchCritical) - Struct members are used exactly as Windows does it internally: LockCount starts at -1, RecursionCount at 0 - Same performance optimizations as internally on Windows: - Fast lock acquisition path using CAS -> SpinCount -> wait - SpinCount automatically disabled on uniprocessor systems - On Linux SpinCount is disabled because it provided no advantage over NPTL/futex in all tests Support for CRITICAL_SECTION's DebugInfo is not yet included (but trivial to add). 
--- winpr/include/winpr/synch.h | 16 +- winpr/libwinpr/interlocked/CMakeLists.txt | 2 +- winpr/libwinpr/synch/CMakeLists.txt | 5 +- winpr/libwinpr/synch/critical.c | 197 +++++++++-- winpr/libwinpr/synch/test/.gitignore | 3 + winpr/libwinpr/synch/test/CMakeLists.txt | 31 ++ winpr/libwinpr/synch/test/TestSynchCritical.c | 329 ++++++++++++++++++ 7 files changed, 539 insertions(+), 44 deletions(-) create mode 100644 winpr/libwinpr/synch/test/.gitignore create mode 100644 winpr/libwinpr/synch/test/CMakeLists.txt create mode 100644 winpr/libwinpr/synch/test/TestSynchCritical.c diff --git a/winpr/include/winpr/synch.h b/winpr/include/winpr/synch.h index c9bb0ad39..fd083ae71 100644 --- a/winpr/include/winpr/synch.h +++ b/winpr/include/winpr/synch.h @@ -139,14 +139,22 @@ typedef RTL_CONDITION_VARIABLE CONDITION_VARIABLE, *PCONDITION_VARIABLE; /* Critical Section */ +#if defined(__linux__) +/** + * Linux NPTL thread synchronization primitives are implemented using + * the futex system calls ... we can't beat futex with a spin loop. 
+ */ +#define WINPR_CRITICAL_SECTION_DISABLE_SPINCOUNT +#endif + typedef struct _RTL_CRITICAL_SECTION { - void* DebugInfo; + PVOID DebugInfo; LONG LockCount; LONG RecursionCount; - PVOID OwningThread; - PVOID LockSemaphore; - ULONG SpinCount; + HANDLE OwningThread; + HANDLE LockSemaphore; + ULONG_PTR SpinCount; } RTL_CRITICAL_SECTION, *PRTL_CRITICAL_SECTION; typedef RTL_CRITICAL_SECTION CRITICAL_SECTION; diff --git a/winpr/libwinpr/interlocked/CMakeLists.txt b/winpr/libwinpr/interlocked/CMakeLists.txt index 9a5278112..b944c5d7c 100644 --- a/winpr/libwinpr/interlocked/CMakeLists.txt +++ b/winpr/libwinpr/interlocked/CMakeLists.txt @@ -34,7 +34,7 @@ set_target_properties(${MODULE_NAME} PROPERTIES VERSION ${WINPR_VERSION_FULL} SO set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS MONOLITHIC ${MONOLITHIC_BUILD} INTERNAL MODULE winpr - MODULES winpr-crt winpr-handle winpr-synch) + MODULES winpr-crt winpr-handle) if(MONOLITHIC_BUILD) diff --git a/winpr/libwinpr/synch/CMakeLists.txt b/winpr/libwinpr/synch/CMakeLists.txt index 1554511a6..cd93e84f8 100644 --- a/winpr/libwinpr/synch/CMakeLists.txt +++ b/winpr/libwinpr/synch/CMakeLists.txt @@ -53,7 +53,7 @@ endif() set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS MONOLITHIC ${MONOLITHIC_BUILD} INTERNAL MODULE winpr - MODULES winpr-handle) + MODULES winpr-handle winpr-interlocked winpr-thread) if(MONOLITHIC_BUILD) set(WINPR_LIBS ${WINPR_LIBS} ${${MODULE_PREFIX}_LIBS} PARENT_SCOPE) @@ -64,3 +64,6 @@ endif() set_property(TARGET ${MODULE_NAME} PROPERTY FOLDER "WinPR") +if(BUILD_TESTING) + add_subdirectory(test) +endif() diff --git a/winpr/libwinpr/synch/critical.c b/winpr/libwinpr/synch/critical.c index 3466a6672..a547bd24e 100644 --- a/winpr/libwinpr/synch/critical.c +++ b/winpr/libwinpr/synch/critical.c @@ -3,6 +3,7 @@ * Synchronization Functions * * Copyright 2012 Marc-Andre Moreau + * Copyright 2013 Norbert Federa * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file 
except in compliance with the License. @@ -22,6 +23,9 @@ #endif #include +#include +#include +#include #include "synch.h" @@ -31,84 +35,201 @@ #ifndef _WIN32 -/** - * TODO: EnterCriticalSection allows recursive calls from the same thread. - * Implement this using pthreads (see PTHREAD_MUTEX_RECURSIVE_NP) - * http://msdn.microsoft.com/en-us/library/windows/desktop/ms682608%28v=vs.85%29.aspx - */ - VOID InitializeCriticalSection(LPCRITICAL_SECTION lpCriticalSection) { - if (lpCriticalSection) - { - lpCriticalSection->DebugInfo = NULL; - - lpCriticalSection->LockCount = 0; - lpCriticalSection->RecursionCount = 0; - lpCriticalSection->SpinCount = 0; - - lpCriticalSection->OwningThread = NULL; - lpCriticalSection->LockSemaphore = NULL; - - lpCriticalSection->LockSemaphore = malloc(sizeof(pthread_mutex_t)); - pthread_mutex_init(lpCriticalSection->LockSemaphore, NULL); - } + InitializeCriticalSectionEx(lpCriticalSection, 0, 0); } BOOL InitializeCriticalSectionEx(LPCRITICAL_SECTION lpCriticalSection, DWORD dwSpinCount, DWORD Flags) { + /** + * See http://msdn.microsoft.com/en-us/library/ff541979(v=vs.85).aspx + * - The LockCount field indicates the number of times that any thread has + * called the EnterCriticalSection routine for this critical section, + * minus one. This field starts at -1 for an unlocked critical section. + * Each call of EnterCriticalSection increments this value; each call of + * LeaveCriticalSection decrements it. + * - The RecursionCount field indicates the number of times that the owning + * thread has called EnterCriticalSection for this critical section. 
+ */ + if (Flags != 0) { fprintf(stderr, "warning: InitializeCriticalSectionEx Flags unimplemented\n"); } - return InitializeCriticalSectionAndSpinCount(lpCriticalSection, dwSpinCount); + + lpCriticalSection->DebugInfo = NULL; + lpCriticalSection->LockCount = -1; + lpCriticalSection->SpinCount = 0; + lpCriticalSection->RecursionCount = 0; + lpCriticalSection->OwningThread = NULL; + + lpCriticalSection->LockSemaphore = (winpr_sem_t*) malloc(sizeof(winpr_sem_t)); +#if defined(__APPLE__) + semaphore_create(mach_task_self(), lpCriticalSection->LockSemaphore, SYNC_POLICY_FIFO, 0); +#else + sem_init(lpCriticalSection->LockSemaphore, 0, 0); +#endif + + SetCriticalSectionSpinCount(lpCriticalSection, dwSpinCount); + + return TRUE; } BOOL InitializeCriticalSectionAndSpinCount(LPCRITICAL_SECTION lpCriticalSection, DWORD dwSpinCount) { - InitializeCriticalSection(lpCriticalSection); - SetCriticalSectionSpinCount(lpCriticalSection, dwSpinCount); - return TRUE; + return InitializeCriticalSectionEx(lpCriticalSection, dwSpinCount, 0); } DWORD SetCriticalSectionSpinCount(LPCRITICAL_SECTION lpCriticalSection, DWORD dwSpinCount) { +#if !defined(WINPR_CRITICAL_SECTION_DISABLE_SPINCOUNT) + SYSTEM_INFO sysinfo; DWORD dwPreviousSpinCount = lpCriticalSection->SpinCount; + + if (dwSpinCount) + { + /* Don't spin on uniprocessor systems! 
 */ + GetNativeSystemInfo(&sysinfo); + if (sysinfo.dwNumberOfProcessors < 2) + dwSpinCount = 0; + } lpCriticalSection->SpinCount = dwSpinCount; return dwPreviousSpinCount; +#else + return 0; +#endif +} + +VOID _WaitForCriticalSection(LPCRITICAL_SECTION lpCriticalSection) +{ +#if defined(__APPLE__) + semaphore_wait(*((winpr_sem_t*) lpCriticalSection->LockSemaphore)); +#else + sem_wait((winpr_sem_t*) lpCriticalSection->LockSemaphore); +#endif +} + +VOID _UnWaitCriticalSection(LPCRITICAL_SECTION lpCriticalSection) +{ +#if defined __APPLE__ + semaphore_signal(*((winpr_sem_t*) lpCriticalSection->LockSemaphore)); +#else + sem_post((winpr_sem_t*) lpCriticalSection->LockSemaphore); +#endif } VOID EnterCriticalSection(LPCRITICAL_SECTION lpCriticalSection) { - /** - * Linux NPTL thread synchronization primitives are implemented using - * the futex system calls ... no need for performing a trylock loop. - */ -#if !defined(__linux__) - ULONG spin = lpCriticalSection->SpinCount; - while (spin--) +#if !defined(WINPR_CRITICAL_SECTION_DISABLE_SPINCOUNT) + ULONG SpinCount = lpCriticalSection->SpinCount; + + /* If we're lucky or if the current thread is already owner we can return early */ + if (SpinCount && TryEnterCriticalSection(lpCriticalSection)) + return; + + /* Spin requested times but don't compete with another waiting thread */ + while (SpinCount-- && lpCriticalSection->LockCount < 1) { - if (pthread_mutex_trylock((pthread_mutex_t*)lpCriticalSection->LockSemaphore) == 0) + /* Atomically try to acquire and check if the section is free. */ + if (InterlockedCompareExchange(&lpCriticalSection->LockCount, 0, -1) == -1) + { + lpCriticalSection->RecursionCount = 1; + lpCriticalSection->OwningThread = (HANDLE)GetCurrentThreadId(); return; - pthread_yield(); + } + /* Failed to get the lock. Let the scheduler know that we're spinning. */ + if (sched_yield()!=0) + { + /** + * On some operating systems sched_yield is a stub. 
+ * usleep should at least trigger a context switch if any thread is waiting. + * A ThreadYield() would be nice in winpr ... + */ + usleep(1); + } } + #endif - pthread_mutex_lock((pthread_mutex_t*)lpCriticalSection->LockSemaphore); + + /* First try the fastest possible path to get the lock. */ + if (InterlockedIncrement(&lpCriticalSection->LockCount)) + { + /* Section is already locked. Check if it is owned by the current thread. */ + if (lpCriticalSection->OwningThread == (HANDLE)GetCurrentThreadId()) + { + /* Recursion. No need to wait. */ + lpCriticalSection->RecursionCount++; + return; + } + + /* Section is locked by another thread. We have to wait. */ + _WaitForCriticalSection(lpCriticalSection); + } + /* We got the lock. Own it ... */ + lpCriticalSection->RecursionCount = 1; + lpCriticalSection->OwningThread = (HANDLE)GetCurrentThreadId(); } BOOL TryEnterCriticalSection(LPCRITICAL_SECTION lpCriticalSection) { - return (pthread_mutex_trylock((pthread_mutex_t*)lpCriticalSection->LockSemaphore) == 0 ? TRUE : FALSE); + HANDLE current_thread = (HANDLE)GetCurrentThreadId(); + + /* Atomically acquire the lock if the section is free. */ + if (InterlockedCompareExchange(&lpCriticalSection->LockCount, 0, -1 ) == -1) + { + lpCriticalSection->RecursionCount = 1; + lpCriticalSection->OwningThread = current_thread; + return TRUE; + } + + /* Section is already locked. Check if it is owned by the current thread. 
*/ + if (lpCriticalSection->OwningThread == current_thread) + { + /* Recursion, return success */ + lpCriticalSection->RecursionCount++; + InterlockedIncrement(&lpCriticalSection->LockCount); + return TRUE; + } + + return FALSE; } VOID LeaveCriticalSection(LPCRITICAL_SECTION lpCriticalSection) { - pthread_mutex_unlock((pthread_mutex_t*)lpCriticalSection->LockSemaphore); + /* Decrement RecursionCount and check if this is the last LeaveCriticalSection call ...*/ + if (--lpCriticalSection->RecursionCount < 1) + { + /* Last recursion, clear owner, unlock and if there are other waiting threads ... */ + lpCriticalSection->OwningThread = NULL; + if (InterlockedDecrement(&lpCriticalSection->LockCount) >= 0) + { + /* ...signal the semaphore to unblock the next waiting thread */ + _UnWaitCriticalSection(lpCriticalSection); + } + } + else + { + InterlockedDecrement(&lpCriticalSection->LockCount); + } } VOID DeleteCriticalSection(LPCRITICAL_SECTION lpCriticalSection) { - pthread_mutex_destroy((pthread_mutex_t*)lpCriticalSection->LockSemaphore); - free(lpCriticalSection->LockSemaphore); + lpCriticalSection->LockCount = -1; + lpCriticalSection->SpinCount = 0; + lpCriticalSection->RecursionCount = 0; + lpCriticalSection->OwningThread = NULL; + + if (lpCriticalSection->LockSemaphore != NULL) + { +#if defined __APPLE__ + semaphore_destroy(mach_task_self(), *((winpr_sem_t*) lpCriticalSection->LockSemaphore)); +#else + sem_destroy((winpr_sem_t*) lpCriticalSection->LockSemaphore); +#endif + free(lpCriticalSection->LockSemaphore); + lpCriticalSection->LockSemaphore = NULL; + } } #endif diff --git a/winpr/libwinpr/synch/test/.gitignore b/winpr/libwinpr/synch/test/.gitignore new file mode 100644 index 000000000..7107fa9df --- /dev/null +++ b/winpr/libwinpr/synch/test/.gitignore @@ -0,0 +1,3 @@ +TestSynch +TestSynch.c + diff --git a/winpr/libwinpr/synch/test/CMakeLists.txt b/winpr/libwinpr/synch/test/CMakeLists.txt new file mode 100644 index 000000000..d26d32935 --- /dev/null +++ 
b/winpr/libwinpr/synch/test/CMakeLists.txt @@ -0,0 +1,31 @@ + +set(MODULE_NAME "TestSynch") +set(MODULE_PREFIX "TEST_SYNCH") + +set(${MODULE_PREFIX}_DRIVER ${MODULE_NAME}.c) + +set(${MODULE_PREFIX}_TESTS + TestSynchCritical.c) + +create_test_sourcelist(${MODULE_PREFIX}_SRCS + ${${MODULE_PREFIX}_DRIVER} + ${${MODULE_PREFIX}_TESTS}) + +add_executable(${MODULE_NAME} ${${MODULE_PREFIX}_SRCS}) + +set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS + MONOLITHIC ${MONOLITHIC_BUILD} + MODULE winpr + MODULES winpr-synch winpr-sysinfo) + +target_link_libraries(${MODULE_NAME} ${${MODULE_PREFIX}_LIBS}) + +set_target_properties(${MODULE_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${TESTING_OUTPUT_DIRECTORY}") + +foreach(test ${${MODULE_PREFIX}_TESTS}) + get_filename_component(TestName ${test} NAME_WE) + add_test(${TestName} ${TESTING_OUTPUT_DIRECTORY}/${MODULE_NAME} ${TestName}) +endforeach() + +set_property(TARGET ${MODULE_NAME} PROPERTY FOLDER "WinPR/Test") + diff --git a/winpr/libwinpr/synch/test/TestSynchCritical.c b/winpr/libwinpr/synch/test/TestSynchCritical.c new file mode 100644 index 000000000..6ec5ca284 --- /dev/null +++ b/winpr/libwinpr/synch/test/TestSynchCritical.c @@ -0,0 +1,329 @@ + +#include +#include +#include +#include +#include +#include +#include + + +#define TEST_SYNC_CRITICAL_TEST1_RUNTIME_MS 500 +#define TEST_SYNC_CRITICAL_TEST1_RUNS 4 + +CRITICAL_SECTION critical; +LONG gTestValueVulnerable = 0; +LONG gTestValueSerialized = 0; + +BOOL TestSynchCritical_TriggerAndCheckRaceCondition(HANDLE OwningThread, LONG RecursionCount) +{ + /* if called unprotected this will hopefully trigger a race condition ... */ + gTestValueVulnerable++; + + if (critical.OwningThread != OwningThread) + { + printf("CriticalSection failure: OwningThread is invalid\n"); + return FALSE; + } + if (critical.RecursionCount != RecursionCount) + { + printf("CriticalSection failure: RecursionCount is invalid\n"); + return FALSE; + } + + /* ... 
which we try to detect using the serialized counter */ + if (gTestValueVulnerable != InterlockedIncrement(&gTestValueSerialized)) + { + printf("CriticalSection failure: Data corruption detected\n"); + return FALSE; + } + + return TRUE; +} + +/* this thread function shall increment the global dwTestValue until the PBOOL passsed in arg is FALSE */ +static PVOID TestSynchCritical_Test1(PVOID arg) +{ + int i, j, rc; + HANDLE hThread = (HANDLE)GetCurrentThreadId(); + + PBOOL pbContinueRunning = (PBOOL)arg; + + while(*pbContinueRunning) + { + EnterCriticalSection(&critical); + + rc = 1; + + if (!TestSynchCritical_TriggerAndCheckRaceCondition(hThread, rc)) + return (PVOID)1; + + /* add some random recursion level */ + j = rand()%5; + for (i=0; i 1) + dwSpinCountExpected = dwSpinCount+1; +#endif + if (dwPreviousSpinCount != dwSpinCountExpected) + { + printf("CriticalSection failure: SetCriticalSectionSpinCount returned %lu (expected: %lu)\n", dwPreviousSpinCount, dwSpinCountExpected); + goto fail; + } + + DeleteCriticalSection(&critical); + + if (dwSpinCount%2==0) + InitializeCriticalSectionAndSpinCount(&critical, dwSpinCount); + else + InitializeCriticalSectionEx(&critical, dwSpinCount, 0); + } + DeleteCriticalSection(&critical); + + + /** + * Test single-threaded recursive TryEnterCriticalSection/EnterCriticalSection/LeaveCriticalSection + * + */ + + InitializeCriticalSection(&critical); + + for (i=0; i<1000; i++) + { + if (critical.RecursionCount != i) + { + printf("CriticalSection failure: RecursionCount field is %ld instead of %d.\n", critical.RecursionCount, i); + goto fail; + } + if (i%2==0) + { + EnterCriticalSection(&critical); + } + else + { + if (TryEnterCriticalSection(&critical) == FALSE) + { + printf("CriticalSection failure: TryEnterCriticalSection failed where it should not.\n"); + goto fail; + } + } + if (critical.OwningThread != hMainThread) + { + printf("CriticalSection failure: Could not verify section ownership (loop index=%d).\n", i); + goto fail; + } 
+ } + while (--i >= 0) + { + LeaveCriticalSection(&critical); + if (critical.RecursionCount != i) + { + printf("CriticalSection failure: RecursionCount field is %ld instead of %d.\n", critical.RecursionCount, i); + goto fail; + } + if (critical.OwningThread != (HANDLE)(i ? hMainThread : NULL)) + { + printf("CriticalSection failure: Could not verify section ownership (loop index=%d).\n", i); + goto fail; + } + } + DeleteCriticalSection(&critical); + + + /** + * Test using multiple threads modifying the same value + */ + + dwThreadCount = sysinfo.dwNumberOfProcessors > 1 ? sysinfo.dwNumberOfProcessors : 2; + + hThreads = (HANDLE*)calloc(dwThreadCount, sizeof(HANDLE)); + + for (j=0; j < TEST_SYNC_CRITICAL_TEST1_RUNS; j++) + { + dwSpinCount = j * 1000; + InitializeCriticalSectionAndSpinCount(&critical, dwSpinCount); + + gTestValueVulnerable = 0; + gTestValueSerialized = 0; + + /* the TestSynchCritical_Test1 threads shall run until bTest1Running is FALSE */ + bTest1Running = TRUE; + for (i=0; i Date: Wed, 7 Aug 2013 17:10:43 +0200 Subject: [PATCH 3/4] libwinpr-utils: Use criticalsection with spincount Use InitializeCriticalSectionAndSpinCount instead of IntializeCriticalSection. Using spin counts for critical sections of short duration enables the calling thread to avoid the wait operation in most situations which can dramatically improve the overall performance on multiprocessor systems. On Linux this change has no effect because the new winpr critical section implementation does not use the SpinCount field under Linux because the NPTL synchronization primitives are implemented using the extremely performant futex system calls which have this magic already built in. However, on Mac OS X this change improved the overall performance of the multithreaded RemoteFX decoder by 25 percent. I've used a SpinCount of 4000 which avoided 99 percent of the wait calls. This value is also used by Microsoft's heap manager for its per-heap critical sections. 
Note: This change requires pull request #1397 to be merged. --- winpr/libwinpr/utils/collections/ArrayList.c | 2 +- winpr/libwinpr/utils/collections/BufferPool.c | 2 +- winpr/libwinpr/utils/collections/CountdownEvent.c | 2 +- winpr/libwinpr/utils/collections/MessageQueue.c | 2 +- winpr/libwinpr/utils/collections/ObjectPool.c | 2 +- winpr/libwinpr/utils/collections/PubSub.c | 2 +- winpr/libwinpr/utils/collections/Queue.c | 2 +- winpr/libwinpr/utils/collections/Reference.c | 2 +- winpr/libwinpr/utils/collections/Stack.c | 2 +- winpr/libwinpr/utils/collections/StreamPool.c | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/winpr/libwinpr/utils/collections/ArrayList.c b/winpr/libwinpr/utils/collections/ArrayList.c index 40fec723d..a337e2169 100644 --- a/winpr/libwinpr/utils/collections/ArrayList.c +++ b/winpr/libwinpr/utils/collections/ArrayList.c @@ -390,7 +390,7 @@ wArrayList* ArrayList_New(BOOL synchronized) arrayList->array = (void**) malloc(sizeof(void*) * arrayList->capacity); - InitializeCriticalSection(&arrayList->lock); + InitializeCriticalSectionAndSpinCount(&arrayList->lock, 4000); ZeroMemory(&arrayList->object, sizeof(wObject)); } diff --git a/winpr/libwinpr/utils/collections/BufferPool.c b/winpr/libwinpr/utils/collections/BufferPool.c index 3a8c317e9..b4f22b712 100644 --- a/winpr/libwinpr/utils/collections/BufferPool.c +++ b/winpr/libwinpr/utils/collections/BufferPool.c @@ -134,7 +134,7 @@ wBufferPool* BufferPool_New(BOOL synchronized, int fixedSize, DWORD alignment) pool->synchronized = synchronized; if (pool->synchronized) - InitializeCriticalSection(&pool->lock); + InitializeCriticalSectionAndSpinCount(&pool->lock, 4000); if (!pool->fixedSize) { diff --git a/winpr/libwinpr/utils/collections/CountdownEvent.c b/winpr/libwinpr/utils/collections/CountdownEvent.c index 0f11bb2ec..ea50f4219 100644 --- a/winpr/libwinpr/utils/collections/CountdownEvent.c +++ b/winpr/libwinpr/utils/collections/CountdownEvent.c @@ -158,7 +158,7 @@ 
wCountdownEvent* CountdownEvent_New(DWORD initialCount) { countdown->count = initialCount; countdown->initialCount = initialCount; - InitializeCriticalSection(&countdown->lock); + InitializeCriticalSectionAndSpinCount(&countdown->lock, 4000); countdown->event = CreateEvent(NULL, TRUE, FALSE, NULL); if (countdown->count == 0) diff --git a/winpr/libwinpr/utils/collections/MessageQueue.c b/winpr/libwinpr/utils/collections/MessageQueue.c index 871316ab0..e7b8a5e84 100644 --- a/winpr/libwinpr/utils/collections/MessageQueue.c +++ b/winpr/libwinpr/utils/collections/MessageQueue.c @@ -194,7 +194,7 @@ wMessageQueue* MessageQueue_New() queue->array = (wMessage*) malloc(sizeof(wMessage) * queue->capacity); ZeroMemory(queue->array, sizeof(wMessage) * queue->capacity); - InitializeCriticalSection(&queue->lock); + InitializeCriticalSectionAndSpinCount(&queue->lock, 4000); queue->event = CreateEvent(NULL, TRUE, FALSE, NULL); } diff --git a/winpr/libwinpr/utils/collections/ObjectPool.c b/winpr/libwinpr/utils/collections/ObjectPool.c index 488559085..c0e36266f 100644 --- a/winpr/libwinpr/utils/collections/ObjectPool.c +++ b/winpr/libwinpr/utils/collections/ObjectPool.c @@ -119,7 +119,7 @@ wObjectPool* ObjectPool_New(BOOL synchronized) pool->synchronized = synchronized; if (pool->synchronized) - InitializeCriticalSection(&pool->lock); + InitializeCriticalSectionAndSpinCount(&pool->lock, 4000); pool->size = 0; pool->capacity = 32; diff --git a/winpr/libwinpr/utils/collections/PubSub.c b/winpr/libwinpr/utils/collections/PubSub.c index 26c89968a..abd29098c 100644 --- a/winpr/libwinpr/utils/collections/PubSub.c +++ b/winpr/libwinpr/utils/collections/PubSub.c @@ -202,7 +202,7 @@ wPubSub* PubSub_New(BOOL synchronized) pubSub->synchronized = synchronized; if (pubSub->synchronized) - InitializeCriticalSection(&pubSub->lock); + InitializeCriticalSectionAndSpinCount(&pubSub->lock, 4000); pubSub->count = 0; pubSub->size = 64; diff --git a/winpr/libwinpr/utils/collections/Queue.c 
b/winpr/libwinpr/utils/collections/Queue.c index 95210649d..7a729622e 100644 --- a/winpr/libwinpr/utils/collections/Queue.c +++ b/winpr/libwinpr/utils/collections/Queue.c @@ -243,7 +243,7 @@ wQueue* Queue_New(BOOL synchronized, int capacity, int growthFactor) queue->array = (void**) malloc(sizeof(void*) * queue->capacity); ZeroMemory(queue->array, sizeof(void*) * queue->capacity); - InitializeCriticalSection(&queue->lock); + InitializeCriticalSectionAndSpinCount(&queue->lock, 4000); queue->event = CreateEvent(NULL, TRUE, FALSE, NULL); ZeroMemory(&queue->object, sizeof(wObject)); diff --git a/winpr/libwinpr/utils/collections/Reference.c b/winpr/libwinpr/utils/collections/Reference.c index 6c32ffeac..171123661 100644 --- a/winpr/libwinpr/utils/collections/Reference.c +++ b/winpr/libwinpr/utils/collections/Reference.c @@ -154,7 +154,7 @@ wReferenceTable* ReferenceTable_New(BOOL synchronized, void* context, REFERENCE_ ZeroMemory(referenceTable->array, sizeof(wReference) * referenceTable->size); referenceTable->synchronized = synchronized; - InitializeCriticalSection(&referenceTable->lock); + InitializeCriticalSectionAndSpinCount(&referenceTable->lock, 4000); } return referenceTable; diff --git a/winpr/libwinpr/utils/collections/Stack.c b/winpr/libwinpr/utils/collections/Stack.c index a0d063eee..5ddb18088 100644 --- a/winpr/libwinpr/utils/collections/Stack.c +++ b/winpr/libwinpr/utils/collections/Stack.c @@ -153,7 +153,7 @@ wStack* Stack_New(BOOL synchronized) stack->synchronized = synchronized; if (stack->synchronized) - InitializeCriticalSection(&stack->lock); + InitializeCriticalSectionAndSpinCount(&stack->lock, 4000); stack->size = 0; stack->capacity = 32; diff --git a/winpr/libwinpr/utils/collections/StreamPool.c b/winpr/libwinpr/utils/collections/StreamPool.c index 418681c61..4df2fe8b3 100644 --- a/winpr/libwinpr/utils/collections/StreamPool.c +++ b/winpr/libwinpr/utils/collections/StreamPool.c @@ -323,7 +323,7 @@ wStreamPool* StreamPool_New(BOOL synchronized, 
size_t defaultSize) pool->synchronized = synchronized; pool->defaultSize = defaultSize; - InitializeCriticalSection(&pool->lock); + InitializeCriticalSectionAndSpinCount(&pool->lock, 4000); pool->aSize = 0; pool->aCapacity = 32; From 0d916527bccc72f44443bf756982ae3f69f77a27 Mon Sep 17 00:00:00 2001 From: Norbert Federa Date: Sun, 4 Aug 2013 12:07:53 +0200 Subject: [PATCH 4/4] codec/rfx: added multithreaded encoder --- libfreerdp/codec/rfx.c | 102 +++++++++++++++++++++++++++++++---- libfreerdp/codec/rfx_types.h | 1 + 2 files changed, 93 insertions(+), 10 deletions(-) diff --git a/libfreerdp/codec/rfx.c b/libfreerdp/codec/rfx.c index f09029e3d..1a4480f13 100644 --- a/libfreerdp/codec/rfx.c +++ b/libfreerdp/codec/rfx.c @@ -227,6 +227,8 @@ RFX_CONTEXT* rfx_context_new(void) if (context->priv->MaxThreadCount) SetThreadpoolThreadMaximum(context->priv->ThreadPool, context->priv->MaxThreadCount); + + context->priv->EncoderStreamPool = StreamPool_New(TRUE, 64*64*3+19); } /* initialize the default pixel format */ @@ -261,6 +263,7 @@ void rfx_context_free(RFX_CONTEXT* context) { CloseThreadpool(context->priv->ThreadPool); DestroyThreadpoolEnvironment(&context->priv->ThreadPoolEnv); + StreamPool_Free(context->priv->EncoderStreamPool); #ifdef WITH_PROFILER fprintf(stderr, "\nWARNING: Profiling results probably unusable with multithreaded RemoteFX codec!\n"); #endif @@ -582,17 +585,17 @@ static BOOL rfx_process_message_tile(RFX_CONTEXT* context, RFX_TILE* tile, wStre tile->data, 64 * 4); } -struct _RFX_TILE_WORK_PARAM +struct _RFX_TILE_PROCESS_WORK_PARAM { wStream s; RFX_TILE* tile; RFX_CONTEXT* context; }; -typedef struct _RFX_TILE_WORK_PARAM RFX_TILE_WORK_PARAM; +typedef struct _RFX_TILE_PROCESS_WORK_PARAM RFX_TILE_PROCESS_WORK_PARAM; void CALLBACK rfx_process_message_tile_work_callback(PTP_CALLBACK_INSTANCE instance, void* context, PTP_WORK work) { - RFX_TILE_WORK_PARAM* param = (RFX_TILE_WORK_PARAM*) context; + RFX_TILE_PROCESS_WORK_PARAM* param = 
(RFX_TILE_PROCESS_WORK_PARAM*) context; rfx_process_message_tile(param->context, param->tile, &(param->s)); } @@ -607,7 +610,7 @@ static BOOL rfx_process_message_tileset(RFX_CONTEXT* context, RFX_MESSAGE* messa UINT32 blockType; UINT32 tilesDataSize; PTP_WORK* work_objects = NULL; - RFX_TILE_WORK_PARAM* params = NULL; + RFX_TILE_PROCESS_WORK_PARAM* params = NULL; if (Stream_GetRemainingLength(s) < 14) { @@ -691,7 +694,7 @@ static BOOL rfx_process_message_tileset(RFX_CONTEXT* context, RFX_MESSAGE* messa if (context->priv->UseThreads) { work_objects = (PTP_WORK*) malloc(sizeof(PTP_WORK) * message->num_tiles); - params = (RFX_TILE_WORK_PARAM*) malloc(sizeof(RFX_TILE_WORK_PARAM) * message->num_tiles); + params = (RFX_TILE_PROCESS_WORK_PARAM*) malloc(sizeof(RFX_TILE_PROCESS_WORK_PARAM) * message->num_tiles); } /* tiles */ @@ -1043,6 +1046,33 @@ static void rfx_compose_message_tile(RFX_CONTEXT* context, wStream* s, Stream_SetPosition(s, end_pos); } + +struct _RFX_TILE_COMPOSE_WORK_PARAM +{ + RFX_CONTEXT* context; + wStream *s; + BYTE* tile_data; + int tile_width; + int tile_height; + int rowstride; + UINT32* quantVals; + int quantIdxY; + int quantIdxCb; + int quantIdxCr; + int xIdx; + int yIdx; +}; +typedef struct _RFX_TILE_COMPOSE_WORK_PARAM RFX_TILE_COMPOSE_WORK_PARAM; + +void CALLBACK rfx_compose_message_tile_work_callback(PTP_CALLBACK_INSTANCE instance, void* context, PTP_WORK work) +{ + RFX_TILE_COMPOSE_WORK_PARAM* param = (RFX_TILE_COMPOSE_WORK_PARAM*) context; + + rfx_compose_message_tile(param->context, param->s, + param->tile_data, param->tile_width, param->tile_height, param->rowstride, + param->quantVals, param->quantIdxY, param->quantIdxCb, param->quantIdxCr, param->xIdx, param->yIdx); +} + static void rfx_compose_message_tileset(RFX_CONTEXT* context, wStream* s, BYTE* image_data, int width, int height, int rowstride) { @@ -1061,6 +1091,11 @@ static void rfx_compose_message_tileset(RFX_CONTEXT* context, wStream* s, int xIdx; int yIdx; int tilesDataSize; + 
BYTE* tileData; + int tileWidth; + int tileHeight; + PTP_WORK* work_objects = NULL; + RFX_TILE_COMPOSE_WORK_PARAM* params = NULL; if (context->num_quants == 0) { @@ -1109,17 +1144,64 @@ static void rfx_compose_message_tileset(RFX_CONTEXT* context, wStream* s, DEBUG_RFX("width:%d height:%d rowstride:%d", width, height, rowstride); end_pos = Stream_GetPosition(s); + + if (context->priv->UseThreads) + { + work_objects = (PTP_WORK*) malloc(sizeof(PTP_WORK) * numTiles); + params = (RFX_TILE_COMPOSE_WORK_PARAM*) malloc(sizeof(RFX_TILE_COMPOSE_WORK_PARAM) * numTiles); + } + for (yIdx = 0; yIdx < numTilesY; yIdx++) { for (xIdx = 0; xIdx < numTilesX; xIdx++) { - rfx_compose_message_tile(context, s, - image_data + yIdx * 64 * rowstride + xIdx * 8 * context->bits_per_pixel, - (xIdx < numTilesX - 1) ? 64 : width - xIdx * 64, - (yIdx < numTilesY - 1) ? 64 : height - yIdx * 64, - rowstride, quantVals, quantIdxY, quantIdxCb, quantIdxCr, xIdx, yIdx); + tileData = image_data + yIdx * 64 * rowstride + xIdx * 8 * context->bits_per_pixel; + tileWidth = (xIdx < numTilesX - 1) ? 64 : width - xIdx * 64; + tileHeight = (yIdx < numTilesY - 1) ? 
64 : height - yIdx * 64; + + if (context->priv->UseThreads) + { + i = yIdx * numTilesX + xIdx; + + params[i].context = context; + params[i].s = StreamPool_Take(context->priv->EncoderStreamPool, 0); + params[i].tile_data = tileData; + params[i].tile_width = tileWidth; + params[i].tile_height = tileHeight; + params[i].rowstride = rowstride; + params[i].quantVals = (UINT32*)quantVals; + params[i].quantIdxY = quantIdxY; + params[i].quantIdxCb = quantIdxCb; + params[i].quantIdxCr = quantIdxCr; + params[i].xIdx = xIdx; + params[i].yIdx = yIdx; + + work_objects[i] = CreateThreadpoolWork((PTP_WORK_CALLBACK) rfx_compose_message_tile_work_callback, + (void*) ¶ms[i], &context->priv->ThreadPoolEnv); + + SubmitThreadpoolWork(work_objects[i]); + } + else + { + rfx_compose_message_tile(context, s, tileData, tileWidth, tileHeight, + rowstride, quantVals, quantIdxY, quantIdxCb, quantIdxCr, xIdx, yIdx); + } } } + + if (context->priv->UseThreads) + { + for (i = 0; i < numTiles; i++) + { + WaitForThreadpoolWorkCallbacks(work_objects[i], FALSE); + CloseThreadpoolWork(work_objects[i]); + Stream_Write(s, Stream_Buffer(params[i].s), Stream_GetPosition(params[i].s)); + StreamPool_Return(context->priv->EncoderStreamPool, params[i].s); + } + free(work_objects); + free(params); + } + tilesDataSize = Stream_GetPosition(s) - end_pos; size += tilesDataSize; end_pos = Stream_GetPosition(s); diff --git a/libfreerdp/codec/rfx_types.h b/libfreerdp/codec/rfx_types.h index f87dba7f6..5b875bdba 100644 --- a/libfreerdp/codec/rfx_types.h +++ b/libfreerdp/codec/rfx_types.h @@ -49,6 +49,7 @@ struct _RFX_CONTEXT_PRIV TP_CALLBACK_ENVIRON ThreadPoolEnv; wBufferPool* BufferPool; + wStreamPool* EncoderStreamPool; /* profilers */ PROFILER_DEFINE(prof_rfx_decode_rgb);