diff --git a/libfreerdp/core/gateway/rpc_client.c b/libfreerdp/core/gateway/rpc_client.c index 13a906213..ad265b188 100644 --- a/libfreerdp/core/gateway/rpc_client.c +++ b/libfreerdp/core/gateway/rpc_client.c @@ -426,6 +426,23 @@ RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc) return pdu; } +RPC_PDU* rpc_recv_peek_pdu(rdpRpc* rpc) +{ + RPC_PDU* pdu; + DWORD dwMilliseconds; + + pdu = NULL; + dwMilliseconds = rpc->client->SynchronousReceive ? INFINITE : 0; + + if (WaitForSingleObject(Queue_Event(rpc->client->ReceiveQueue), dwMilliseconds) == WAIT_OBJECT_0) + { + pdu = (RPC_PDU*) Queue_Peek(rpc->client->ReceiveQueue); + return pdu; + } + + return pdu; +} + static void* rpc_client_thread(void* arg) { rdpRpc* rpc; diff --git a/libfreerdp/core/gateway/rpc_client.h b/libfreerdp/core/gateway/rpc_client.h index 38c79be85..569973f83 100644 --- a/libfreerdp/core/gateway/rpc_client.h +++ b/libfreerdp/core/gateway/rpc_client.h @@ -40,6 +40,7 @@ int rpc_send_dequeue_pdu(rdpRpc* rpc); int rpc_recv_enqueue_pdu(rdpRpc* rpc); RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc); +RPC_PDU* rpc_recv_peek_pdu(rdpRpc* rpc); int rpc_client_new(rdpRpc* rpc); int rpc_client_start(rdpRpc* rpc); diff --git a/libfreerdp/core/gateway/tsg.c b/libfreerdp/core/gateway/tsg.c index 47a1f570d..708a3aa2a 100644 --- a/libfreerdp/core/gateway/tsg.c +++ b/libfreerdp/core/gateway/tsg.c @@ -1399,6 +1399,7 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length) if (tsg->BytesAvailable < 1) { tsg->PendingPdu = FALSE; + rpc_recv_dequeue_pdu(rpc); rpc_client_receive_pool_return(rpc, tsg->pdu); } @@ -1406,7 +1407,7 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length) } else { - tsg->pdu = rpc_recv_dequeue_pdu(rpc); + tsg->pdu = rpc_recv_peek_pdu(rpc); if (!tsg->pdu) { @@ -1429,6 +1430,7 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length) if (tsg->BytesAvailable < 1) { tsg->PendingPdu = FALSE; + rpc_recv_dequeue_pdu(rpc); rpc_client_receive_pool_return(rpc, tsg->pdu); } @@ -1446,6 +1448,8 @@ BOOL tsg_set_blocking_mode(rdpTsg* tsg, BOOL blocking) tsg->rpc->client->SynchronousSend = TRUE; tsg->rpc->client->SynchronousReceive = blocking; + tsg->transport->GatewayEvent = Queue_Event(tsg->rpc->client->ReceiveQueue); + return TRUE; } diff --git a/libfreerdp/core/transport.c b/libfreerdp/core/transport.c index ae4025a13..445d49ed4 100644 --- a/libfreerdp/core/transport.c +++ b/libfreerdp/core/transport.c @@ -599,6 +599,17 @@ void transport_get_fds(rdpTransport* transport, void** rfds, int* rcount) rfds[*rcount] = pfd; (*rcount)++; } + + if (transport->GatewayEvent) + { + pfd = GetEventWaitObject(transport->GatewayEvent); + + if (pfd) + { + rfds[*rcount] = pfd; + (*rcount)++; + } + } } int transport_check_fds(rdpTransport** ptransport) @@ -769,8 +780,6 @@ static void* transport_client_thread(void* arg) while (1) { - printf("transport_client_thread\n"); - status = WaitForMultipleObjects(nCount, events, FALSE, INFINITE); if (WaitForSingleObject(transport->stopEvent, 0) == WAIT_OBJECT_0) @@ -805,10 +814,11 @@ rdpTransport* transport_new(rdpSettings* settings) rdpTransport* transport; transport = (rdpTransport*) malloc(sizeof(rdpTransport)); - ZeroMemory(transport, sizeof(rdpTransport)); if (transport != NULL) { + ZeroMemory(transport, sizeof(rdpTransport)); + transport->TcpIn = tcp_new(settings); transport->settings = settings; diff --git a/libfreerdp/core/transport.h b/libfreerdp/core/transport.h index ee225bfe1..0dc04122a 100644 --- a/libfreerdp/core/transport.h +++ b/libfreerdp/core/transport.h @@ -66,6 +66,7 @@ struct rdp_transport wStream* ReceiveBuffer; TransportRecv ReceiveCallback; HANDLE ReceiveEvent; + HANDLE GatewayEvent; BOOL blocking; BOOL SplitInputOutput; wObjectPool* ReceivePool; diff --git a/winpr/include/winpr/collections.h b/winpr/include/winpr/collections.h index e8cafb01e..544d98a39 100644 --- a/winpr/include/winpr/collections.h +++ b/winpr/include/winpr/collections.h @@ -61,8 +61,10 @@ struct _wQueue typedef struct _wQueue wQueue; WINPR_API int Queue_Count(wQueue* queue); -WINPR_API BOOL Queue_IsSynchronized(wQueue* queue); -WINPR_API HANDLE Queue_SyncRoot(wQueue* queue); + +WINPR_API BOOL Queue_Lock(wQueue* queue); +WINPR_API BOOL Queue_Unlock(wQueue* queue); + WINPR_API HANDLE Queue_Event(wQueue* queue); #define Queue_Object(_queue) (&_queue->object) diff --git a/winpr/libwinpr/synch/event.c b/winpr/libwinpr/synch/event.c index 43103362d..0c1223ef1 100644 --- a/winpr/libwinpr/synch/event.c +++ b/winpr/libwinpr/synch/event.c @@ -64,6 +64,7 @@ HANDLE CreateEventW(LPSECURITY_ATTRIBUTES lpEventAttributes, BOOL bManualReset, #ifdef HAVE_EVENTFD_H event->pipe_fd[0] = eventfd(0, EFD_NONBLOCK); + if (event->pipe_fd[0] < 0) { printf("CreateEventW: failed to create event\n"); @@ -286,10 +287,13 @@ void* GetEventWaitObject(HANDLE hEvent) { #ifndef _WIN32 int fd; + void* obj; fd = GetEventFileDescriptor(hEvent); - return ((void*) (long) fd); + obj = ((void*) (long) fd); + + return obj; #else return hEvent; #endif diff --git a/winpr/libwinpr/utils/collections/Queue.c b/winpr/libwinpr/utils/collections/Queue.c index 79c286dbb..fa33df76a 100644 --- a/winpr/libwinpr/utils/collections/Queue.c +++ b/winpr/libwinpr/utils/collections/Queue.c @@ -44,21 +44,21 @@ int Queue_Count(wQueue* queue) } /** - * Gets a value indicating whether access to the Queue is synchronized (thread safe). + * Lock access to the ArrayList */ -BOOL Queue_IsSynchronized(wQueue* queue) +BOOL Queue_Lock(wQueue* queue) { - return queue->synchronized; + return (WaitForSingleObject(queue->mutex, INFINITE) == WAIT_OBJECT_0) ? TRUE : FALSE; } /** - * Gets an object that can be used to synchronize access to the Queue. + * Unlock access to the ArrayList */ -HANDLE Queue_SyncRoot(wQueue* queue) +BOOL Queue_Unlock(wQueue* queue) { - return queue->mutex; + return ReleaseMutex(queue->mutex); } /**