diff --git a/libfreerdp/codec/progressive.c b/libfreerdp/codec/progressive.c index f026fb814..03ad76677 100644 --- a/libfreerdp/codec/progressive.c +++ b/libfreerdp/codec/progressive.c @@ -37,6 +37,7 @@ #include "rfx_quantization.h" #include "rfx_dwt.h" #include "rfx_rlgr.h" +#include "rfx_types.h" #include "progressive.h" #define TAG FREERDP_TAG("codec.progressive") @@ -1621,18 +1622,55 @@ static INLINE BOOL progressive_tile_read(PROGRESSIVE_CONTEXT* progressive, BOOL return progressive_surface_tile_replace(surface, region, &tile, FALSE); } +struct _PROGRESSIVE_TILE_PROCESS_WORK_PARAM +{ + PROGRESSIVE_CONTEXT* progressive; + PROGRESSIVE_BLOCK_REGION* region; + PROGRESSIVE_BLOCK_CONTEXT* context; + RFX_PROGRESSIVE_TILE* tile; +}; +typedef struct _PROGRESSIVE_TILE_PROCESS_WORK_PARAM PROGRESSIVE_TILE_PROCESS_WORK_PARAM; + +static void CALLBACK progressive_process_tiles_tile_work_callback(PTP_CALLBACK_INSTANCE instance, + void* context, PTP_WORK work) +{ + PROGRESSIVE_TILE_PROCESS_WORK_PARAM* param = (PROGRESSIVE_TILE_PROCESS_WORK_PARAM*)context; + + switch (param->tile->blockType) + { + case PROGRESSIVE_WBT_TILE_SIMPLE: + case PROGRESSIVE_WBT_TILE_FIRST: + progressive_decompress_tile_first(param->progressive, param->tile, param->region, + param->context); + break; + + case PROGRESSIVE_WBT_TILE_UPGRADE: + progressive_decompress_tile_upgrade(param->progressive, param->tile, param->region, + param->context); + break; + default: + WLog_Print(param->progressive->log, WLOG_ERROR, "Invalid block type %04 (%s)" PRIx16, + param->tile->blockType, + progressive_get_block_type_string(param->tile->blockType)); + break; + } +} + static INLINE int progressive_process_tiles(PROGRESSIVE_CONTEXT* progressive, wStream* s, PROGRESSIVE_BLOCK_REGION* region, PROGRESSIVE_SURFACE_CONTEXT* surface, const PROGRESSIVE_BLOCK_CONTEXT* context) { - int status = -1; + int status = 0; size_t end; const size_t start = Stream_GetPosition(s); UINT16 index; UINT16 blockType; UINT32 blockLen; UINT32 count = 0; + PTP_WORK* work_objects = NULL; + PROGRESSIVE_TILE_PROCESS_WORK_PARAM* params = NULL; + UINT16 close_cnt = 0; if (Stream_GetRemainingLength(s) < region->tileDataSize) { @@ -1714,24 +1752,46 @@ static INLINE int progressive_process_tiles(PROGRESSIVE_CONTEXT* progressive, wS return -1044; } + if (progressive->rfx_context->priv->UseThreads) + { + work_objects = (PTP_WORK*)calloc(region->numTiles, sizeof(PTP_WORK)); + if (!work_objects) + return -1; + + params = (PROGRESSIVE_TILE_PROCESS_WORK_PARAM*)calloc( + region->numTiles, sizeof(PROGRESSIVE_TILE_PROCESS_WORK_PARAM)); + if (!params) + { + free(work_objects); + return -1; + } + } + for (index = 0; index < region->numTiles; index++) { RFX_PROGRESSIVE_TILE* tile = region->tiles[index]; + params[index].progressive = progressive; + params[index].region = region; + params[index].context = context; + params[index].tile = tile; - switch (tile->blockType) + if (progressive->rfx_context->priv->UseThreads) { - case PROGRESSIVE_WBT_TILE_SIMPLE: - case PROGRESSIVE_WBT_TILE_FIRST: - status = progressive_decompress_tile_first(progressive, tile, region, context); + if (!(work_objects[index] = CreateThreadpoolWork( + progressive_process_tiles_tile_work_callback, (void*)¶ms[index], + &progressive->rfx_context->priv->ThreadPoolEnv))) + { + WLog_ERR(TAG, "CreateThreadpoolWork failed."); + status = -1; break; + } - case PROGRESSIVE_WBT_TILE_UPGRADE: - status = progressive_decompress_tile_upgrade(progressive, tile, region, context); - break; - default: - WLog_Print(progressive->log, WLOG_ERROR, "Invalid block type %04 (%s)" PRIx16, - tile->blockType, progressive_get_block_type_string(tile->blockType)); - return -42; + SubmitThreadpoolWork(work_objects[index]); + close_cnt = index + 1; + } + else + { + progressive_process_tiles_tile_work_callback(0, ¶ms[index], 0); } if (status < 0) @@ -1742,6 +1802,25 @@ static INLINE int progressive_process_tiles(PROGRESSIVE_CONTEXT* progressive, wS } } + if (status < 0) + WLog_Print(progressive->log, WLOG_ERROR, + "Failed to create ThreadpoolWork for tile %" PRIu16, index); + + if (progressive->rfx_context->priv->UseThreads) + { + for (index = 0; index < close_cnt; index++) + { + WaitForThreadpoolWorkCallbacks(work_objects[index], FALSE); + CloseThreadpoolWork(work_objects[index]); + } + } + + free(work_objects); + free(params); + + if (status < 0) + return -1; + return (int)(end - start); }