From 025b5bab682bbdcb24838e6730e939f277b031eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Moreau?= Date: Mon, 21 Jan 2013 18:33:00 -0500 Subject: [PATCH] libwinpr-pool: start implementing scheduling of asynchronous calls --- winpr/include/winpr/pool.h | 6 +- winpr/libwinpr/pool/CMakeLists.txt | 8 +- winpr/libwinpr/pool/callback.c | 4 - winpr/libwinpr/pool/callback_environment.c | 110 +++++++++++++++------ winpr/libwinpr/pool/pool.c | 105 +++++++++++++++++--- winpr/libwinpr/pool/pool.h | 72 ++++++++++++++ winpr/libwinpr/pool/test/TestPoolThread.c | 3 + winpr/libwinpr/pool/test/TestPoolWork.c | 15 ++- winpr/libwinpr/pool/work.c | 44 ++++++++- winpr/libwinpr/thread/thread.c | 4 +- 10 files changed, 308 insertions(+), 63 deletions(-) create mode 100644 winpr/libwinpr/pool/pool.h diff --git a/winpr/include/winpr/pool.h b/winpr/include/winpr/pool.h index eda92e773..0d35efc1e 100644 --- a/winpr/include/winpr/pool.h +++ b/winpr/include/winpr/pool.h @@ -26,8 +26,6 @@ #include #include -//#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600))) - #ifndef _WIN32 typedef DWORD TP_VERSION, *PTP_VERSION; @@ -176,8 +174,8 @@ VOID CloseThreadpoolCleanupGroup(PTP_CLEANUP_GROUP ptpcg); WINPR_API PTP_POOL CreateThreadpool(PVOID reserved); WINPR_API VOID CloseThreadpool(PTP_POOL ptpp); -WINPR_API VOID SetThreadpoolThreadMaximum(PTP_POOL ptpp, DWORD cthrdMost); WINPR_API BOOL SetThreadpoolThreadMinimum(PTP_POOL ptpp, DWORD cthrdMic); +WINPR_API VOID SetThreadpoolThreadMaximum(PTP_POOL ptpp, DWORD cthrdMost); /* Callback Environment */ @@ -203,8 +201,6 @@ WINPR_API VOID LeaveCriticalSectionWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci WINPR_API VOID FreeLibraryWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, HMODULE mod); WINPR_API VOID DisassociateCurrentThreadFromCallback(PTP_CALLBACK_INSTANCE pci); -//#endif - WINPR_API void winpr_pool_dummy(); #endif /* WINPR_POOL_H */ diff --git a/winpr/libwinpr/pool/CMakeLists.txt b/winpr/libwinpr/pool/CMakeLists.txt index c42596513..e14033681 100644 --- a/winpr/libwinpr/pool/CMakeLists.txt +++ b/winpr/libwinpr/pool/CMakeLists.txt @@ -25,6 +25,7 @@ set(${MODULE_PREFIX}_SRCS io.c cleanup_group.c pool.c + pool.h callback_environment.c callback.c callback_cleanup.c) @@ -47,11 +48,14 @@ if(${CMAKE_SYSTEM_NAME} MATCHES SunOS) set(${MODULE_PREFIX}_LIBS ${${MODULE_PREFIX}_LIBS} rt) endif() +set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS + MONOLITHIC ${MONOLITHIC_BUILD} INTERNAL + MODULE winpr + MODULES winpr-thread winpr-synch winpr-utils) + if(MONOLITHIC_BUILD) set(WINPR_LIBS ${WINPR_LIBS} ${${MODULE_PREFIX}_LIBS} PARENT_SCOPE) else() - set(${MODULE_PREFIX}_LIBS ${${MODULE_PREFIX}_LIBS} winpr-thread winpr-synch) - target_link_libraries(${MODULE_NAME} ${${MODULE_PREFIX}_LIBS}) install(TARGETS ${MODULE_NAME} DESTINATION ${CMAKE_INSTALL_LIBDIR}) endif() diff --git a/winpr/libwinpr/pool/callback.c b/winpr/libwinpr/pool/callback.c index 11e7b812c..1ae619dd8 100644 --- a/winpr/libwinpr/pool/callback.c +++ b/winpr/libwinpr/pool/callback.c @@ -24,11 +24,7 @@ #include #include -#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600))) - BOOL CallbackMayRunLong(PTP_CALLBACK_INSTANCE pci) { return FALSE; } - -#endif diff --git a/winpr/libwinpr/pool/callback_environment.c b/winpr/libwinpr/pool/callback_environment.c index d3f85a690..19010f087 100644 --- a/winpr/libwinpr/pool/callback_environment.c +++ b/winpr/libwinpr/pool/callback_environment.c @@ -24,42 +24,94 @@ #include #include -#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600))) +#include "pool.h" -VOID InitializeThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe) +#ifdef _WIN32 + +#else + +static TP_CALLBACK_ENVIRON DEFAULT_CALLBACK_ENVIRONMENT = { + 1, /* Version */ + NULL, /* Pool */ + NULL, /* CleanupGroup */ + NULL, /* CleanupGroupCancelCallback */ + NULL, /* RaceDll */ + NULL, /* ActivationContext */ + NULL, /* FinalizationCallback */ + { 0 } /* Flags */ +}; -} - -VOID DestroyThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe) +PTP_CALLBACK_ENVIRON GetDefaultThreadpoolEnvironment() { + PTP_CALLBACK_ENVIRON environment = &DEFAULT_CALLBACK_ENVIRONMENT; -} - -VOID SetThreadpoolCallbackPool(PTP_CALLBACK_ENVIRON pcbe, PTP_POOL ptpp) -{ - -} - -VOID SetThreadpoolCallbackCleanupGroup(PTP_CALLBACK_ENVIRON pcbe, PTP_CLEANUP_GROUP ptpcg, PTP_CLEANUP_GROUP_CANCEL_CALLBACK pfng) -{ - -} - -VOID SetThreadpoolCallbackRunsLong(PTP_CALLBACK_ENVIRON pcbe) -{ - -} - -VOID SetThreadpoolCallbackLibrary(PTP_CALLBACK_ENVIRON pcbe, PVOID mod) -{ - -} - -VOID SetThreadpoolCallbackPriority(PTP_CALLBACK_ENVIRON pcbe, TP_CALLBACK_PRIORITY Priority) -{ + environment->Pool = GetDefaultThreadpool(); + return environment; } #endif +VOID InitializeThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe) +{ +#ifdef _WIN32 +#else + pcbe->Version = 1; + pcbe->Pool = NULL; + pcbe->CleanupGroup = NULL; + pcbe->CleanupGroupCancelCallback = NULL; + pcbe->RaceDll = NULL; + pcbe->ActivationContext = NULL; + pcbe->FinalizationCallback = NULL; + pcbe->u.s.LongFunction = FALSE; + pcbe->u.s.Persistent = FALSE; + pcbe->u.s.Private = 0; +#endif +} + +VOID DestroyThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe) +{ +#ifdef _WIN32 +#else +#endif +} + +VOID SetThreadpoolCallbackPool(PTP_CALLBACK_ENVIRON pcbe, PTP_POOL ptpp) +{ +#ifdef _WIN32 +#else + pcbe->Pool = ptpp; +#endif +} + +VOID SetThreadpoolCallbackCleanupGroup(PTP_CALLBACK_ENVIRON pcbe, PTP_CLEANUP_GROUP ptpcg, PTP_CLEANUP_GROUP_CANCEL_CALLBACK pfng) +{ +#ifdef _WIN32 +#else + pcbe->CleanupGroup = ptpcg; + pcbe->CleanupGroupCancelCallback = pfng; +#endif +} + +VOID SetThreadpoolCallbackRunsLong(PTP_CALLBACK_ENVIRON pcbe) +{ +#ifdef _WIN32 +#else + pcbe->u.s.LongFunction = TRUE; +#endif +} + +VOID SetThreadpoolCallbackLibrary(PTP_CALLBACK_ENVIRON pcbe, PVOID mod) +{ +#ifdef _WIN32 +#else +#endif +} + +VOID SetThreadpoolCallbackPriority(PTP_CALLBACK_ENVIRON pcbe, TP_CALLBACK_PRIORITY Priority) +{ +#ifdef _WIN32 +#else +#endif +} diff --git a/winpr/libwinpr/pool/pool.c b/winpr/libwinpr/pool/pool.c index 504866dc0..ee3f8edbf 100644 --- a/winpr/libwinpr/pool/pool.c +++ b/winpr/libwinpr/pool/pool.c @@ -24,6 +24,8 @@ #include #include +#include "pool.h" + #ifdef _WIN32 static BOOL module_initialized = FALSE; @@ -32,8 +34,8 @@ static HMODULE kernel32_module = NULL; static PTP_POOL (WINAPI * pCreateThreadpool)(PVOID reserved); static VOID (WINAPI * pCloseThreadpool)(PTP_POOL ptpp); -static VOID (WINAPI * pSetThreadpoolThreadMaximum)(PTP_POOL ptpp, DWORD cthrdMost); static BOOL (WINAPI * pSetThreadpoolThreadMinimum)(PTP_POOL ptpp, DWORD cthrdMic); +static VOID (WINAPI * pSetThreadpoolThreadMaximum)(PTP_POOL ptpp, DWORD cthrdMost); static void module_init() { @@ -49,22 +51,90 @@ static void module_init() pCreateThreadpool = (void*) GetProcAddress(kernel32_module, "CreateThreadpool"); pCloseThreadpool = (void*) GetProcAddress(kernel32_module, "CloseThreadpool"); - pSetThreadpoolThreadMaximum = (void*) GetProcAddress(kernel32_module, "SetThreadpoolThreadMaximum"); pSetThreadpoolThreadMinimum = (void*) GetProcAddress(kernel32_module, "SetThreadpoolThreadMinimum"); + pSetThreadpoolThreadMaximum = (void*) GetProcAddress(kernel32_module, "SetThreadpoolThreadMaximum"); +} + +#else + +static TP_POOL DEFAULT_POOL = +{ + 0, /* Minimum */ + 500, /* Maximum */ + NULL, /* Threads */ + 0, /* ThreadCount */ +}; + +static void* thread_pool_work_func(void* arg) +{ + PTP_POOL pool; + PTP_WORK work; + PTP_CALLBACK_INSTANCE callbackInstance; + + pool = (PTP_POOL) arg; + + while (WaitForSingleObject(Queue_Event(pool->PendingQueue), INFINITE) == WAIT_OBJECT_0) + { + callbackInstance = (PTP_CALLBACK_INSTANCE) Queue_Dequeue(pool->PendingQueue); + + if (callbackInstance) + { + work = callbackInstance->Work; + work->WorkCallback(callbackInstance, work->CallbackParameter, work); + free(callbackInstance); + } + } + + return NULL; +} + +PTP_POOL GetDefaultThreadpool() +{ + int index; + PTP_POOL pool = NULL; + + pool = &DEFAULT_POOL; + + if (!pool->Threads) + { + pool->ThreadCount = 4; + pool->Threads = (HANDLE*) malloc(pool->ThreadCount * sizeof(HANDLE)); + + pool->PendingQueue = Queue_New(TRUE, -1, -1); + + for (index = 0; index < pool->ThreadCount; index++) + { + pool->Threads[index] = CreateThread(NULL, 0, + (LPTHREAD_START_ROUTINE) thread_pool_work_func, + (void*) pool, 0, NULL); + } + } + + return pool; } #endif PTP_POOL CreateThreadpool(PVOID reserved) { + PTP_POOL pool = NULL; + #ifdef _WIN32 module_init(); if (pCreateThreadpool) return pCreateThreadpool(reserved); +#else + pool = (PTP_POOL) malloc(sizeof(TP_POOL)); + + if (pool) + { + pool->Minimum = 0; + pool->Maximum = 500; + } #endif - return NULL; + return pool; } VOID CloseThreadpool(PTP_POOL ptpp) @@ -74,16 +144,8 @@ VOID CloseThreadpool(PTP_POOL ptpp) if (pCloseThreadpool) pCloseThreadpool(ptpp); -#endif -} - -VOID SetThreadpoolThreadMaximum(PTP_POOL ptpp, DWORD cthrdMost) -{ -#ifdef _WIN32 - module_init(); - - if (pSetThreadpoolThreadMaximum) - pSetThreadpoolThreadMaximum(ptpp, cthrdMost); +#else + free(ptpp); #endif } @@ -93,10 +155,23 @@ BOOL SetThreadpoolThreadMinimum(PTP_POOL ptpp, DWORD cthrdMic) module_init(); if (pSetThreadpoolThreadMinimum) - pSetThreadpoolThreadMinimum(ptpp, cthrdMic); + return pSetThreadpoolThreadMinimum(ptpp, cthrdMic); +#else + ptpp->Minimum = cthrdMic; #endif + return TRUE; +} - return FALSE; +VOID SetThreadpoolThreadMaximum(PTP_POOL ptpp, DWORD cthrdMost) +{ +#ifdef _WIN32 + module_init(); + + if (pSetThreadpoolThreadMaximum) + pSetThreadpoolThreadMaximum(ptpp, cthrdMost); +#else + ptpp->Maximum = cthrdMost; +#endif } /* dummy */ diff --git a/winpr/libwinpr/pool/pool.h b/winpr/libwinpr/pool/pool.h new file mode 100644 index 000000000..789070070 --- /dev/null +++ b/winpr/libwinpr/pool/pool.h @@ -0,0 +1,72 @@ +/** + * WinPR: Windows Portable Runtime + * Thread Pool API (Pool) + * + * Copyright 2012 Marc-Andre Moreau + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef WINPR_POOL_PRIVATE_H +#define WINPR_POOL_PRIVATE_H + +#include +#include +#include +#include + +struct _TP_CALLBACK_INSTANCE +{ + PTP_WORK Work; +}; + +struct _TP_POOL +{ + DWORD Minimum; + DWORD Maximum; + HANDLE* Threads; + DWORD ThreadCount; + wQueue* PendingQueue; +}; + +struct _TP_WORK +{ + PVOID CallbackParameter; + PTP_WORK_CALLBACK WorkCallback; + PTP_CALLBACK_ENVIRON CallbackEnvironment; +}; + +struct _TP_TIMER +{ + void* dummy; +}; + +struct _TP_WAIT +{ + void* dummy; +}; + +struct _TP_IO +{ + void* dummy; +}; + +#ifndef _WIN32 + +PTP_POOL GetDefaultThreadpool(); +PTP_CALLBACK_ENVIRON GetDefaultThreadpoolEnvironment(); + +#endif + +#endif /* WINPR_POOL_PRIVATE_H */ + diff --git a/winpr/libwinpr/pool/test/TestPoolThread.c b/winpr/libwinpr/pool/test/TestPoolThread.c index 532cd55db..d842beaed 100644 --- a/winpr/libwinpr/pool/test/TestPoolThread.c +++ b/winpr/libwinpr/pool/test/TestPoolThread.c @@ -6,6 +6,9 @@ * Improve Scalability With New Thread Pool APIs: * http://msdn.microsoft.com/en-us/magazine/cc16332.aspx * + * Developing with Thread Pool Enhancements: + * http://msdn.microsoft.com/en-us/library/cc308561.aspx + * * Introduction to the Windows Threadpool: * http://blogs.msdn.com/b/harip/archive/2010/10/11/introduction-to-the-windows-threadpool-part-1.aspx * http://blogs.msdn.com/b/harip/archive/2010/10/12/introduction-to-the-windows-threadpool-part-2.aspx diff --git a/winpr/libwinpr/pool/test/TestPoolWork.c b/winpr/libwinpr/pool/test/TestPoolWork.c index 1ba2ed8b0..91aa8347c 100644 --- a/winpr/libwinpr/pool/test/TestPoolWork.c +++ b/winpr/libwinpr/pool/test/TestPoolWork.c @@ -2,13 +2,16 @@ #include #include +static int count = 0; + void test_WorkCallback(PTP_CALLBACK_INSTANCE instance, void* context, PTP_WORK work) { - printf("Hello %s", context); + printf("Hello %s: %d\n", context, count++); } int TestPoolWork(int argc, char* argv[]) { + int index; TP_WORK * work; work = CreateThreadpoolWork((PTP_WORK_CALLBACK) test_WorkCallback, "world", NULL); @@ -19,10 +22,16 @@ int TestPoolWork(int argc, char* argv[]) return -1; } - SubmitThreadpoolWork(work); + /** + * You can post a work object one or more times (up to MAXULONG) without waiting for prior callbacks to complete. + * The callbacks will execute in parallel. To improve efficiency, the thread pool may throttle the threads. + */ + + for (index = 0; index < 10; index++) + SubmitThreadpoolWork(work); + WaitForThreadpoolWorkCallbacks(work, FALSE); CloseThreadpoolWork(work); return 0; } - diff --git a/winpr/libwinpr/pool/work.c b/winpr/libwinpr/pool/work.c index 435e2f887..27c1ec30d 100644 --- a/winpr/libwinpr/pool/work.c +++ b/winpr/libwinpr/pool/work.c @@ -24,6 +24,8 @@ #include #include +#include "pool.h" + #ifdef _WIN32 static BOOL module_initialized = FALSE; @@ -59,14 +61,29 @@ static void module_init() PTP_WORK CreateThreadpoolWork(PTP_WORK_CALLBACK pfnwk, PVOID pv, PTP_CALLBACK_ENVIRON pcbe) { + PTP_WORK work = NULL; + #ifdef _WIN32 module_init(); if (pCreateThreadpoolWork) return pCreateThreadpoolWork(pfnwk, pv, pcbe); +#else + work = (PTP_WORK) malloc(sizeof(TP_WORK)); + + if (work) + { + work->WorkCallback = pfnwk; + work->CallbackParameter = pv; + + if (!pcbe) + pcbe = GetDefaultThreadpoolEnvironment(); + + work->CallbackEnvironment = pcbe; + } #endif - return NULL; + return work; } VOID CloseThreadpoolWork(PTP_WORK pwk) @@ -76,6 +93,8 @@ VOID CloseThreadpoolWork(PTP_WORK pwk) if (pCloseThreadpoolWork) pCloseThreadpoolWork(pwk); +#else + free(pwk); #endif } @@ -86,6 +105,19 @@ VOID SubmitThreadpoolWork(PTP_WORK pwk) if (pSubmitThreadpoolWork) pSubmitThreadpoolWork(pwk); +#else + PTP_POOL pool; + PTP_CALLBACK_INSTANCE callbackInstance; + + pool = pwk->CallbackEnvironment->Pool; + + callbackInstance = (PTP_CALLBACK_INSTANCE) malloc(sizeof(TP_CALLBACK_INSTANCE)); + + if (callbackInstance) + { + callbackInstance->Work = pwk; + Queue_Enqueue(pool->PendingQueue, callbackInstance); + } #endif } @@ -96,8 +128,8 @@ BOOL TrySubmitThreadpoolCallback(PTP_SIMPLE_CALLBACK pfns, PVOID pv, PTP_CALLBAC if (pTrySubmitThreadpoolCallback) return pTrySubmitThreadpoolCallback(pfns, pv, pcbe); +#else #endif - return FALSE; } @@ -108,7 +140,11 @@ VOID WaitForThreadpoolWorkCallbacks(PTP_WORK pwk, BOOL fCancelPendingCallbacks) if (pWaitForThreadpoolWorkCallbacks) pWaitForThreadpoolWorkCallbacks(pwk, fCancelPendingCallbacks); +#else + PTP_POOL pool; + + pool = pwk->CallbackEnvironment->Pool; + + #endif } - - diff --git a/winpr/libwinpr/thread/thread.c b/winpr/libwinpr/thread/thread.c index 0755bae6d..b00a82ea0 100644 --- a/winpr/libwinpr/thread/thread.c +++ b/winpr/libwinpr/thread/thread.c @@ -135,7 +135,9 @@ HANDLE _GetCurrentThread(VOID) DWORD GetCurrentThreadId(VOID) { - return 0; + pthread_t tid; + tid = pthread_self(); + return (DWORD) tid; } DWORD ResumeThread(HANDLE hThread)