From af554b5a542b68116061ad8bc6c45ee0a4739ae8 Mon Sep 17 00:00:00 2001 From: akallabeth Date: Tue, 5 May 2020 13:07:15 +0200 Subject: [PATCH 1/2] Use dedicated sound decoder thread. --- channels/rdpsnd/client/rdpsnd_main.c | 106 +++++++++++++++++++++++---- 1 file changed, 92 insertions(+), 14 deletions(-) diff --git a/channels/rdpsnd/client/rdpsnd_main.c b/channels/rdpsnd/client/rdpsnd_main.c index 9644afadb..e01869024 100644 --- a/channels/rdpsnd/client/rdpsnd_main.c +++ b/channels/rdpsnd/client/rdpsnd_main.c @@ -122,6 +122,9 @@ struct rdpsnd_plugin rdpContext* rdpcontext; FREERDP_DSP_CONTEXT* dsp_context; + + HANDLE thread; + wMessageQueue* queue; }; static const char* rdpsnd_is_dyn_str(BOOL dynamic) @@ -1070,14 +1073,11 @@ static UINT rdpsnd_virtual_channel_event_data_received(rdpsndPlugin* plugin, voi if (dataFlags & CHANNEL_FLAG_LAST) { - UINT error; - Stream_SealLength(plugin->data_in); Stream_SetPosition(plugin->data_in, 0); - error = rdpsnd_recv_pdu(plugin, plugin->data_in); - if (error) - return error; + if (!MessageQueue_Post(plugin->queue, NULL, 0, plugin->data_in, NULL)) + return ERROR_INTERNAL_ERROR; plugin->data_in = NULL; } @@ -1222,10 +1222,75 @@ static UINT rdpsnd_virtual_channel_event_disconnected(rdpsndPlugin* rdpsnd) return CHANNEL_RC_OK; } +static void _queue_free(void* obj) +{ + wStream* s = obj; + Stream_Release(s); +} + +static DWORD WINAPI play_thread(LPVOID arg) +{ + UINT error = CHANNEL_RC_OK; + rdpsndPlugin* rdpsnd = arg; + + if (!rdpsnd || !rdpsnd->queue) + return ERROR_INVALID_PARAMETER; + + while (TRUE) + { + int rc; + wMessage message; + wStream* s; + HANDLE handle = MessageQueue_Event(rdpsnd->queue); + WaitForSingleObject(handle, INFINITE); + + rc = MessageQueue_Peek(rdpsnd->queue, &message, TRUE); + if (rc < 1) + continue; + + if (message.id == WMQ_QUIT) + break; + + s = message.wParam; + error = rdpsnd_recv_pdu(rdpsnd, s); + + if (error) + return error; + } + + return CHANNEL_RC_OK; +} + +static UINT rdpsnd_virtual_channel_event_initialized(rdpsndPlugin* rdpsnd) +{ + wObject obj = { 0 }; + + if (!rdpsnd) + return ERROR_INVALID_PARAMETER; + + obj.fnObjectFree = _queue_free; + rdpsnd->queue = MessageQueue_New(&obj); + if (!rdpsnd->queue) + return CHANNEL_RC_NO_MEMORY; + + rdpsnd->thread = CreateThread(NULL, 0, play_thread, rdpsnd, 0, NULL); + if (!rdpsnd->thread) + return CHANNEL_RC_INITIALIZATION_ERROR; + return CHANNEL_RC_OK; +} + static void rdpsnd_virtual_channel_event_terminated(rdpsndPlugin* rdpsnd) { if (rdpsnd) { + MessageQueue_PostQuit(rdpsnd->queue, 0); + if (rdpsnd->thread) + { + WaitForSingleObject(rdpsnd->thread, INFINITE); + CloseHandle(rdpsnd->thread); + } + MessageQueue_Free(rdpsnd->queue); + audio_formats_free(rdpsnd->fixed_format, 1); free(rdpsnd->subsystem); free(rdpsnd->device_name); @@ -1254,6 +1319,7 @@ static VOID VCAPITYPE rdpsnd_virtual_channel_init_event_ex(LPVOID lpUserParam, L switch (event) { case CHANNEL_EVENT_INITIALIZED: + error = rdpsnd_virtual_channel_event_initialized(plugin); break; case CHANNEL_EVENT_CONNECTED: @@ -1373,7 +1439,15 @@ fail: static UINT rdpsnd_on_data_received(IWTSVirtualChannelCallback* pChannelCallback, wStream* data) { RDPSND_CHANNEL_CALLBACK* callback = (RDPSND_CHANNEL_CALLBACK*)pChannelCallback; - return rdpsnd_recv_pdu((rdpsndPlugin*)callback->plugin, data); + rdpsndPlugin* plugin; + if (!callback || !callback->plugin) + return ERROR_INVALID_PARAMETER; + plugin = (rdpsndPlugin*)callback->plugin; + + if (!MessageQueue_Post(plugin->queue, NULL, 0, data, NULL)) + return ERROR_INTERNAL_ERROR; + + return CHANNEL_RC_OK; } static UINT rdpsnd_on_close(IWTSVirtualChannelCallback* pChannelCallback) @@ -1452,7 +1526,7 @@ static UINT rdpsnd_plugin_initialize(IWTSPlugin* pPlugin, IWTSVirtualChannelMana (IWTSListenerCallback*)rdpsnd->listener_callback, &(rdpsnd->listener)); rdpsnd->listener->pInterface = rdpsnd->iface.pInterface; - return status; + return rdpsnd_virtual_channel_event_initialized(rdpsnd); } /** @@ -1463,8 +1537,11 @@ static UINT rdpsnd_plugin_initialize(IWTSPlugin* pPlugin, IWTSVirtualChannelMana static UINT rdpsnd_plugin_terminated(IWTSPlugin* pPlugin) { rdpsndPlugin* rdpsnd = (rdpsndPlugin*)pPlugin; - free(rdpsnd->listener_callback); - free(rdpsnd->iface.pInterface); + if (rdpsnd) + { + free(rdpsnd->listener_callback); + free(rdpsnd->iface.pInterface); + } rdpsnd_virtual_channel_event_terminated(rdpsnd); return CHANNEL_RC_OK; } @@ -1496,10 +1573,8 @@ UINT rdpsnd_DVCPluginEntry(IDRDYNVC_ENTRY_POINTS* pEntryPoints) rdpsnd->dynamic = TRUE; rdpsnd->fixed_format = audio_format_new(); if (!rdpsnd->fixed_format) - { - free(rdpsnd); - return FALSE; - } + goto fail; + rdpsnd->log = WLog_Get("com.freerdp.channels.rdpsnd.client"); rdpsnd->channelEntryPoints.pExtendedData = pEntryPoints->GetPluginData(pEntryPoints); @@ -1507,9 +1582,12 @@ UINT rdpsnd_DVCPluginEntry(IDRDYNVC_ENTRY_POINTS* pEntryPoints) } else { - WLog_ERR(TAG, "%s could not get disp Plugin.", rdpsnd_is_dyn_str(TRUE)); + WLog_ERR(TAG, "%s could not get rdpsnd Plugin.", rdpsnd_is_dyn_str(TRUE)); return CHANNEL_RC_BAD_CHANNEL; } + return error; +fail: + rdpsnd_plugin_terminated(&rdpsnd->iface); return error; } From f016f1ec09056db856fb662876f934dbc754bc0c Mon Sep 17 00:00:00 2001 From: akallabeth Date: Tue, 5 May 2020 17:02:34 +0200 Subject: [PATCH 2/2] Create data copy in case of dynamic sound channel. --- channels/rdpsnd/client/rdpsnd_main.c | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/channels/rdpsnd/client/rdpsnd_main.c b/channels/rdpsnd/client/rdpsnd_main.c index e01869024..4b508ec90 100644 --- a/channels/rdpsnd/client/rdpsnd_main.c +++ b/channels/rdpsnd/client/rdpsnd_main.c @@ -1440,12 +1440,25 @@ static UINT rdpsnd_on_data_received(IWTSVirtualChannelCallback* pChannelCallback { RDPSND_CHANNEL_CALLBACK* callback = (RDPSND_CHANNEL_CALLBACK*)pChannelCallback; rdpsndPlugin* plugin; + wStream* copy; + size_t len = Stream_GetRemainingLength(data); + if (!callback || !callback->plugin) return ERROR_INVALID_PARAMETER; plugin = (rdpsndPlugin*)callback->plugin; - if (!MessageQueue_Post(plugin->queue, NULL, 0, data, NULL)) + copy = StreamPool_Take(plugin->pool, len); + if (!copy) + return ERROR_OUTOFMEMORY; + Stream_Copy(data, copy, len); + Stream_SealLength(copy); + Stream_SetPosition(copy, 0); + + if (!MessageQueue_Post(plugin->queue, NULL, 0, copy, NULL)) + { + Stream_Release(copy); return ERROR_INTERNAL_ERROR; + } return CHANNEL_RC_OK; }