libwinpr-pool: start implementing scheduling of asynchronous calls

This commit is contained in:
Marc-André Moreau
2013-01-21 18:33:00 -05:00
parent ad9769dfe7
commit 025b5bab68
10 changed files with 308 additions and 63 deletions

View File

@@ -26,8 +26,6 @@
#include <winpr/synch.h>
#include <winpr/thread.h>
//#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
#ifndef _WIN32
typedef DWORD TP_VERSION, *PTP_VERSION;
@@ -176,8 +174,8 @@ VOID CloseThreadpoolCleanupGroup(PTP_CLEANUP_GROUP ptpcg);
WINPR_API PTP_POOL CreateThreadpool(PVOID reserved);
WINPR_API VOID CloseThreadpool(PTP_POOL ptpp);
WINPR_API VOID SetThreadpoolThreadMaximum(PTP_POOL ptpp, DWORD cthrdMost);
WINPR_API BOOL SetThreadpoolThreadMinimum(PTP_POOL ptpp, DWORD cthrdMic);
WINPR_API VOID SetThreadpoolThreadMaximum(PTP_POOL ptpp, DWORD cthrdMost);
/* Callback Environment */
@@ -203,8 +201,6 @@ WINPR_API VOID LeaveCriticalSectionWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci
WINPR_API VOID FreeLibraryWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, HMODULE mod);
WINPR_API VOID DisassociateCurrentThreadFromCallback(PTP_CALLBACK_INSTANCE pci);
//#endif
WINPR_API void winpr_pool_dummy();
#endif /* WINPR_POOL_H */

View File

@@ -25,6 +25,7 @@ set(${MODULE_PREFIX}_SRCS
io.c
cleanup_group.c
pool.c
pool.h
callback_environment.c
callback.c
callback_cleanup.c)
@@ -47,11 +48,14 @@ if(${CMAKE_SYSTEM_NAME} MATCHES SunOS)
set(${MODULE_PREFIX}_LIBS ${${MODULE_PREFIX}_LIBS} rt)
endif()
set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS
MONOLITHIC ${MONOLITHIC_BUILD} INTERNAL
MODULE winpr
MODULES winpr-thread winpr-synch winpr-utils)
if(MONOLITHIC_BUILD)
set(WINPR_LIBS ${WINPR_LIBS} ${${MODULE_PREFIX}_LIBS} PARENT_SCOPE)
else()
set(${MODULE_PREFIX}_LIBS ${${MODULE_PREFIX}_LIBS} winpr-thread winpr-synch)
target_link_libraries(${MODULE_NAME} ${${MODULE_PREFIX}_LIBS})
install(TARGETS ${MODULE_NAME} DESTINATION ${CMAKE_INSTALL_LIBDIR})
endif()

View File

@@ -24,11 +24,7 @@
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
BOOL CallbackMayRunLong(PTP_CALLBACK_INSTANCE pci)
{
return FALSE;
}
#endif

View File

@@ -24,42 +24,94 @@
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
#include "pool.h"
VOID InitializeThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe)
#ifdef _WIN32
#else
static TP_CALLBACK_ENVIRON DEFAULT_CALLBACK_ENVIRONMENT =
{
1, /* Version */
NULL, /* Pool */
NULL, /* CleanupGroup */
NULL, /* CleanupGroupCancelCallback */
NULL, /* RaceDll */
NULL, /* ActivationContext */
NULL, /* FinalizationCallback */
{ 0 } /* Flags */
};
}
VOID DestroyThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe)
PTP_CALLBACK_ENVIRON GetDefaultThreadpoolEnvironment()
{
PTP_CALLBACK_ENVIRON environment = &DEFAULT_CALLBACK_ENVIRONMENT;
}
VOID SetThreadpoolCallbackPool(PTP_CALLBACK_ENVIRON pcbe, PTP_POOL ptpp)
{
}
VOID SetThreadpoolCallbackCleanupGroup(PTP_CALLBACK_ENVIRON pcbe, PTP_CLEANUP_GROUP ptpcg, PTP_CLEANUP_GROUP_CANCEL_CALLBACK pfng)
{
}
VOID SetThreadpoolCallbackRunsLong(PTP_CALLBACK_ENVIRON pcbe)
{
}
VOID SetThreadpoolCallbackLibrary(PTP_CALLBACK_ENVIRON pcbe, PVOID mod)
{
}
VOID SetThreadpoolCallbackPriority(PTP_CALLBACK_ENVIRON pcbe, TP_CALLBACK_PRIORITY Priority)
{
environment->Pool = GetDefaultThreadpool();
return environment;
}
#endif
VOID InitializeThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe)
{
#ifdef _WIN32
#else
pcbe->Version = 1;
pcbe->Pool = NULL;
pcbe->CleanupGroup = NULL;
pcbe->CleanupGroupCancelCallback = NULL;
pcbe->RaceDll = NULL;
pcbe->ActivationContext = NULL;
pcbe->FinalizationCallback = NULL;
pcbe->u.s.LongFunction = FALSE;
pcbe->u.s.Persistent = FALSE;
pcbe->u.s.Private = 0;
#endif
}
VOID DestroyThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe)
{
#ifdef _WIN32
#else
#endif
}
VOID SetThreadpoolCallbackPool(PTP_CALLBACK_ENVIRON pcbe, PTP_POOL ptpp)
{
#ifdef _WIN32
#else
pcbe->Pool = ptpp;
#endif
}
VOID SetThreadpoolCallbackCleanupGroup(PTP_CALLBACK_ENVIRON pcbe, PTP_CLEANUP_GROUP ptpcg, PTP_CLEANUP_GROUP_CANCEL_CALLBACK pfng)
{
#ifdef _WIN32
#else
pcbe->CleanupGroup = ptpcg;
pcbe->CleanupGroupCancelCallback = pfng;
#endif
}
VOID SetThreadpoolCallbackRunsLong(PTP_CALLBACK_ENVIRON pcbe)
{
#ifdef _WIN32
#else
pcbe->u.s.LongFunction = TRUE;
#endif
}
VOID SetThreadpoolCallbackLibrary(PTP_CALLBACK_ENVIRON pcbe, PVOID mod)
{
#ifdef _WIN32
#else
#endif
}
VOID SetThreadpoolCallbackPriority(PTP_CALLBACK_ENVIRON pcbe, TP_CALLBACK_PRIORITY Priority)
{
#ifdef _WIN32
#else
#endif
}

View File

@@ -24,6 +24,8 @@
#include <winpr/crt.h>
#include <winpr/pool.h>
#include "pool.h"
#ifdef _WIN32
static BOOL module_initialized = FALSE;
@@ -32,8 +34,8 @@ static HMODULE kernel32_module = NULL;
static PTP_POOL (WINAPI * pCreateThreadpool)(PVOID reserved);
static VOID (WINAPI * pCloseThreadpool)(PTP_POOL ptpp);
static VOID (WINAPI * pSetThreadpoolThreadMaximum)(PTP_POOL ptpp, DWORD cthrdMost);
static BOOL (WINAPI * pSetThreadpoolThreadMinimum)(PTP_POOL ptpp, DWORD cthrdMic);
static VOID (WINAPI * pSetThreadpoolThreadMaximum)(PTP_POOL ptpp, DWORD cthrdMost);
static void module_init()
{
@@ -49,22 +51,90 @@ static void module_init()
pCreateThreadpool = (void*) GetProcAddress(kernel32_module, "CreateThreadpool");
pCloseThreadpool = (void*) GetProcAddress(kernel32_module, "CloseThreadpool");
pSetThreadpoolThreadMaximum = (void*) GetProcAddress(kernel32_module, "SetThreadpoolThreadMaximum");
pSetThreadpoolThreadMinimum = (void*) GetProcAddress(kernel32_module, "SetThreadpoolThreadMinimum");
pSetThreadpoolThreadMaximum = (void*) GetProcAddress(kernel32_module, "SetThreadpoolThreadMaximum");
}
#else
static TP_POOL DEFAULT_POOL =
{
0, /* Minimum */
500, /* Maximum */
NULL, /* Threads */
0, /* ThreadCount */
};
static void* thread_pool_work_func(void* arg)
{
PTP_POOL pool;
PTP_WORK work;
PTP_CALLBACK_INSTANCE callbackInstance;
pool = (PTP_POOL) arg;
while (WaitForSingleObject(Queue_Event(pool->PendingQueue), INFINITE) == WAIT_OBJECT_0)
{
callbackInstance = (PTP_CALLBACK_INSTANCE) Queue_Dequeue(pool->PendingQueue);
if (callbackInstance)
{
work = callbackInstance->Work;
work->WorkCallback(callbackInstance, work->CallbackParameter, work);
free(callbackInstance);
}
}
return NULL;
}
PTP_POOL GetDefaultThreadpool()
{
int index;
PTP_POOL pool = NULL;
pool = &DEFAULT_POOL;
if (!pool->Threads)
{
pool->ThreadCount = 4;
pool->Threads = (HANDLE*) malloc(pool->ThreadCount * sizeof(HANDLE));
pool->PendingQueue = Queue_New(TRUE, -1, -1);
for (index = 0; index < pool->ThreadCount; index++)
{
pool->Threads[index] = CreateThread(NULL, 0,
(LPTHREAD_START_ROUTINE) thread_pool_work_func,
(void*) pool, 0, NULL);
}
}
return pool;
}
#endif
PTP_POOL CreateThreadpool(PVOID reserved)
{
PTP_POOL pool = NULL;
#ifdef _WIN32
module_init();
if (pCreateThreadpool)
return pCreateThreadpool(reserved);
#else
pool = (PTP_POOL) malloc(sizeof(TP_POOL));
if (pool)
{
pool->Minimum = 0;
pool->Maximum = 500;
}
#endif
return NULL;
return pool;
}
VOID CloseThreadpool(PTP_POOL ptpp)
@@ -74,16 +144,8 @@ VOID CloseThreadpool(PTP_POOL ptpp)
if (pCloseThreadpool)
pCloseThreadpool(ptpp);
#endif
}
VOID SetThreadpoolThreadMaximum(PTP_POOL ptpp, DWORD cthrdMost)
{
#ifdef _WIN32
module_init();
if (pSetThreadpoolThreadMaximum)
pSetThreadpoolThreadMaximum(ptpp, cthrdMost);
#else
free(ptpp);
#endif
}
@@ -93,10 +155,23 @@ BOOL SetThreadpoolThreadMinimum(PTP_POOL ptpp, DWORD cthrdMic)
module_init();
if (pSetThreadpoolThreadMinimum)
pSetThreadpoolThreadMinimum(ptpp, cthrdMic);
return pSetThreadpoolThreadMinimum(ptpp, cthrdMic);
#else
ptpp->Minimum = cthrdMic;
#endif
return TRUE;
}
return FALSE;
VOID SetThreadpoolThreadMaximum(PTP_POOL ptpp, DWORD cthrdMost)
{
#ifdef _WIN32
module_init();
if (pSetThreadpoolThreadMaximum)
pSetThreadpoolThreadMaximum(ptpp, cthrdMost);
#else
ptpp->Maximum = cthrdMost;
#endif
}
/* dummy */

View File

@@ -0,0 +1,72 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API (Pool)
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef WINPR_POOL_PRIVATE_H
#define WINPR_POOL_PRIVATE_H
#include <winpr/pool.h>
#include <winpr/synch.h>
#include <winpr/thread.h>
#include <winpr/collections.h>
struct _TP_CALLBACK_INSTANCE
{
PTP_WORK Work;
};
struct _TP_POOL
{
DWORD Minimum;
DWORD Maximum;
HANDLE* Threads;
DWORD ThreadCount;
wQueue* PendingQueue;
};
struct _TP_WORK
{
PVOID CallbackParameter;
PTP_WORK_CALLBACK WorkCallback;
PTP_CALLBACK_ENVIRON CallbackEnvironment;
};
struct _TP_TIMER
{
void* dummy;
};
struct _TP_WAIT
{
void* dummy;
};
struct _TP_IO
{
void* dummy;
};
#ifndef _WIN32
PTP_POOL GetDefaultThreadpool();
PTP_CALLBACK_ENVIRON GetDefaultThreadpoolEnvironment();
#endif
#endif /* WINPR_POOL_PRIVATE_H */

View File

@@ -6,6 +6,9 @@
* Improve Scalability With New Thread Pool APIs:
* http://msdn.microsoft.com/en-us/magazine/cc16332.aspx
*
* Developing with Thread Pool Enhancements:
* http://msdn.microsoft.com/en-us/library/cc308561.aspx
*
* Introduction to the Windows Threadpool:
* http://blogs.msdn.com/b/harip/archive/2010/10/11/introduction-to-the-windows-threadpool-part-1.aspx
* http://blogs.msdn.com/b/harip/archive/2010/10/12/introduction-to-the-windows-threadpool-part-2.aspx

View File

@@ -2,13 +2,16 @@
#include <winpr/crt.h>
#include <winpr/pool.h>
static int count = 0;
void test_WorkCallback(PTP_CALLBACK_INSTANCE instance, void* context, PTP_WORK work)
{
printf("Hello %s", context);
printf("Hello %s: %d\n", context, count++);
}
int TestPoolWork(int argc, char* argv[])
{
int index;
TP_WORK * work;
work = CreateThreadpoolWork((PTP_WORK_CALLBACK) test_WorkCallback, "world", NULL);
@@ -19,10 +22,16 @@ int TestPoolWork(int argc, char* argv[])
return -1;
}
SubmitThreadpoolWork(work);
/**
* You can post a work object one or more times (up to MAXULONG) without waiting for prior callbacks to complete.
* The callbacks will execute in parallel. To improve efficiency, the thread pool may throttle the threads.
*/
for (index = 0; index < 10; index++)
SubmitThreadpoolWork(work);
WaitForThreadpoolWorkCallbacks(work, FALSE);
CloseThreadpoolWork(work);
return 0;
}

View File

@@ -24,6 +24,8 @@
#include <winpr/crt.h>
#include <winpr/pool.h>
#include "pool.h"
#ifdef _WIN32
static BOOL module_initialized = FALSE;
@@ -59,14 +61,29 @@ static void module_init()
PTP_WORK CreateThreadpoolWork(PTP_WORK_CALLBACK pfnwk, PVOID pv, PTP_CALLBACK_ENVIRON pcbe)
{
PTP_WORK work = NULL;
#ifdef _WIN32
module_init();
if (pCreateThreadpoolWork)
return pCreateThreadpoolWork(pfnwk, pv, pcbe);
#else
work = (PTP_WORK) malloc(sizeof(TP_WORK));
if (work)
{
work->WorkCallback = pfnwk;
work->CallbackParameter = pv;
if (!pcbe)
pcbe = GetDefaultThreadpoolEnvironment();
work->CallbackEnvironment = pcbe;
}
#endif
return NULL;
return work;
}
VOID CloseThreadpoolWork(PTP_WORK pwk)
@@ -76,6 +93,8 @@ VOID CloseThreadpoolWork(PTP_WORK pwk)
if (pCloseThreadpoolWork)
pCloseThreadpoolWork(pwk);
#else
free(pwk);
#endif
}
@@ -86,6 +105,19 @@ VOID SubmitThreadpoolWork(PTP_WORK pwk)
if (pSubmitThreadpoolWork)
pSubmitThreadpoolWork(pwk);
#else
PTP_POOL pool;
PTP_CALLBACK_INSTANCE callbackInstance;
pool = pwk->CallbackEnvironment->Pool;
callbackInstance = (PTP_CALLBACK_INSTANCE) malloc(sizeof(TP_CALLBACK_INSTANCE));
if (callbackInstance)
{
callbackInstance->Work = pwk;
Queue_Enqueue(pool->PendingQueue, callbackInstance);
}
#endif
}
@@ -96,8 +128,8 @@ BOOL TrySubmitThreadpoolCallback(PTP_SIMPLE_CALLBACK pfns, PVOID pv, PTP_CALLBAC
if (pTrySubmitThreadpoolCallback)
return pTrySubmitThreadpoolCallback(pfns, pv, pcbe);
#else
#endif
return FALSE;
}
@@ -108,7 +140,11 @@ VOID WaitForThreadpoolWorkCallbacks(PTP_WORK pwk, BOOL fCancelPendingCallbacks)
if (pWaitForThreadpoolWorkCallbacks)
pWaitForThreadpoolWorkCallbacks(pwk, fCancelPendingCallbacks);
#else
PTP_POOL pool;
pool = pwk->CallbackEnvironment->Pool;
#endif
}

View File

@@ -135,7 +135,9 @@ HANDLE _GetCurrentThread(VOID)
DWORD GetCurrentThreadId(VOID)
{
return 0;
pthread_t tid;
tid = pthread_self();
return (DWORD) tid;
}
DWORD ResumeThread(HANDLE hThread)