From dcfdf738425176ce6ed932097585254d2af62451 Mon Sep 17 00:00:00 2001 From: Markus Thurner Date: Mon, 20 Apr 2015 10:39:50 +0200 Subject: [PATCH] UdpListener: Simplify shutdown procedure and wait until all messages are processed. --- TimberWinR/Inputs/UdpInputListener.cs | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/TimberWinR/Inputs/UdpInputListener.cs b/TimberWinR/Inputs/UdpInputListener.cs index f5d683d..2063344 100644 --- a/TimberWinR/Inputs/UdpInputListener.cs +++ b/TimberWinR/Inputs/UdpInputListener.cs @@ -15,7 +15,6 @@ namespace TimberWinR.Inputs private IPEndPoint _udpEndpointV4; private readonly BlockingCollection _unprocessedRawData; private readonly Thread _rawDataProcessingThread; - public bool Stop { get; set; } private readonly int _port; private long _receivedMessages; private long _parseErrors; @@ -52,15 +51,11 @@ namespace TimberWinR.Inputs public override void Shutdown() { - Stop = true; - LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType); // close UDP listeners, which will end the listener threads _udpListenerV4.Close(); - Finished(); - base.Shutdown(); } @@ -92,18 +87,22 @@ namespace TimberWinR.Inputs return; } - byte[] bytes; try { - bytes = _udpListenerV4.EndReceive(result, ref _udpEndpointV4); + byte[] bytes = _udpListenerV4.EndReceive(result, ref _udpEndpointV4); Interlocked.Increment(ref _receivedMessages); StartReceiving(); + _unprocessedRawData.Add(bytes); } catch (SocketException) { LogManager.GetCurrentClassLogger().Info("Socked exception. Ending UDP Listener."); _unprocessedRawData.CompleteAdding(); - return; + } + catch (ObjectDisposedException) + { + LogManager.GetCurrentClassLogger().Info("Object disposed. Ending UDP Listener"); + _unprocessedRawData.CompleteAdding(); } catch (Exception ex) { @@ -111,22 +110,18 @@ namespace TimberWinR.Inputs Interlocked.Increment(ref _receiveErrors); StartReceiving(); - - return; } - - _unprocessedRawData.Add(bytes); } private void ProcessDataLoop() { - while (!Stop && !_unprocessedRawData.IsCompleted) + while (!_unprocessedRawData.IsCompleted) { try { ProcessData(_unprocessedRawData.Take()); } - catch(OperationCanceledException) + catch (OperationCanceledException) { // we are shutting down. break; @@ -141,7 +136,9 @@ namespace TimberWinR.Inputs LogManager.GetCurrentClassLogger().ErrorException("Error while processing data", ex); Thread.Sleep(100); } - } + } + + Finished(); } private void ProcessData(byte[] bytes)