UdpListener: Simplify shutdown procedure and wait until all messages are processed.

This commit is contained in:
Markus Thurner
2015-04-20 10:39:50 +02:00
parent dcd104e4f4
commit dcfdf73842

View File

@@ -15,7 +15,6 @@ namespace TimberWinR.Inputs
private IPEndPoint _udpEndpointV4; private IPEndPoint _udpEndpointV4;
private readonly BlockingCollection<byte[]> _unprocessedRawData; private readonly BlockingCollection<byte[]> _unprocessedRawData;
private readonly Thread _rawDataProcessingThread; private readonly Thread _rawDataProcessingThread;
public bool Stop { get; set; }
private readonly int _port; private readonly int _port;
private long _receivedMessages; private long _receivedMessages;
private long _parseErrors; private long _parseErrors;
@@ -52,15 +51,11 @@ namespace TimberWinR.Inputs
public override void Shutdown() public override void Shutdown()
{ {
Stop = true;
LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType); LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType);
// close UDP listeners, which will end the listener threads // close UDP listeners, which will end the listener threads
_udpListenerV4.Close(); _udpListenerV4.Close();
Finished();
base.Shutdown(); base.Shutdown();
} }
@@ -92,18 +87,22 @@ namespace TimberWinR.Inputs
return; return;
} }
byte[] bytes;
try try
{ {
bytes = _udpListenerV4.EndReceive(result, ref _udpEndpointV4); byte[] bytes = _udpListenerV4.EndReceive(result, ref _udpEndpointV4);
Interlocked.Increment(ref _receivedMessages); Interlocked.Increment(ref _receivedMessages);
StartReceiving(); StartReceiving();
_unprocessedRawData.Add(bytes);
} }
catch (SocketException) catch (SocketException)
{ {
LogManager.GetCurrentClassLogger().Info("Socked exception. Ending UDP Listener."); LogManager.GetCurrentClassLogger().Info("Socked exception. Ending UDP Listener.");
_unprocessedRawData.CompleteAdding(); _unprocessedRawData.CompleteAdding();
return; }
catch (ObjectDisposedException)
{
LogManager.GetCurrentClassLogger().Info("Object disposed. Ending UDP Listener");
_unprocessedRawData.CompleteAdding();
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -111,22 +110,18 @@ namespace TimberWinR.Inputs
Interlocked.Increment(ref _receiveErrors); Interlocked.Increment(ref _receiveErrors);
StartReceiving(); StartReceiving();
return;
} }
_unprocessedRawData.Add(bytes);
} }
private void ProcessDataLoop() private void ProcessDataLoop()
{ {
while (!Stop && !_unprocessedRawData.IsCompleted) while (!_unprocessedRawData.IsCompleted)
{ {
try try
{ {
ProcessData(_unprocessedRawData.Take()); ProcessData(_unprocessedRawData.Take());
} }
catch(OperationCanceledException) catch (OperationCanceledException)
{ {
// we are shutting down. // we are shutting down.
break; break;
@@ -141,7 +136,9 @@ namespace TimberWinR.Inputs
LogManager.GetCurrentClassLogger().ErrorException("Error while processing data", ex); LogManager.GetCurrentClassLogger().ErrorException("Error while processing data", ex);
Thread.Sleep(100); Thread.Sleep(100);
} }
} }
Finished();
} }
private void ProcessData(byte[] bytes) private void ProcessData(byte[] bytes)