From 64979df0121578572382e56445ad045140483575 Mon Sep 17 00:00:00 2001 From: Markus Thurner Date: Fri, 10 Apr 2015 10:23:22 +0200 Subject: [PATCH] Use Begin/End Receive for data, and immediately start receiving data again. Processing of result is done outside of the UDP receive method in a separate thread, allowing to process more messages in peak situations. --- TimberWinR/Inputs/UdpInputListener.cs | 181 +++++++++++++++++--------- 1 file changed, 116 insertions(+), 65 deletions(-) diff --git a/TimberWinR/Inputs/UdpInputListener.cs b/TimberWinR/Inputs/UdpInputListener.cs index 728e387..e7be8e4 100644 --- a/TimberWinR/Inputs/UdpInputListener.cs +++ b/TimberWinR/Inputs/UdpInputListener.cs @@ -1,48 +1,54 @@ using System; -using System.IO; +using System.Collections.Concurrent; using System.Text; -using System.Text.RegularExpressions; using System.Threading; using System.Net; using System.Net.Sockets; -using Newtonsoft.Json; using Newtonsoft.Json.Linq; using NLog; namespace TimberWinR.Inputs { public class UdpInputListener : InputListener - { - private UdpClient _udpListenerV4; - private readonly Thread _listenThreadV4; + { + private UdpClient _udpListenerV4; + private IPEndPoint _udpEndpointV4; + private readonly BlockingCollection _unprocessedRawData; + private readonly Thread _rawDataProcessingThread; private readonly int _port; private long _receivedMessages; - private long _parsedErrors; - + private long _parseErrors; + private long _receiveErrors; + private long _parsedMessages; + public override JObject ToJson() { - JObject json = new JObject( - new JProperty("udp", - new JObject( - new JProperty("port", _port), - new JProperty("errors", _parsedErrors), - new JProperty("messages", _receivedMessages) - ))); + var json = + new JObject(new JProperty("udp", + new JObject(new JProperty("port", _port), + new JProperty("receive_errors", _receiveErrors), + new JProperty("parse_errors", _parseErrors), + new JProperty("received_messages", _receivedMessages), + new JProperty("parsed_messages", _parsedMessages), + new JProperty("unprocessed_messages", _unprocessedRawData.Count)))); return json; } - public UdpInputListener(CancellationToken cancelToken, int port = 5140) - : base(cancelToken, "Win32-Udp") + public UdpInputListener(CancellationToken cancelToken, int port = 5140) : base(cancelToken, "Win32-Udp") { - _port = port; + _port = port; _receivedMessages = 0; - _listenThreadV4 = new Thread(StartListener); - _listenThreadV4.Start(); - } + // setup raw data processor + _unprocessedRawData = new BlockingCollection(); + _rawDataProcessingThread = new Thread(ProcessDataLoop) { Name = "Win32-Udp-DataProcessor"}; + _rawDataProcessingThread.Start(); + // start listing to udp port + StartListener(); + } public override void Shutdown() { @@ -51,64 +57,109 @@ namespace TimberWinR.Inputs // close UDP listeners, which will end the listener threads _udpListenerV4.Close(); - // wait for completion of the threads - _listenThreadV4.Join(); - base.Shutdown(); } private void StartListener() - { - var groupV4 = new IPEndPoint(IPAddress.Any, _port); + { + _udpEndpointV4 = new IPEndPoint(IPAddress.Any, _port); - _udpListenerV4 = new UdpClient(_port); - - LogManager.GetCurrentClassLogger().Info("Udp Input on Port {0} Ready", groupV4); + // setup listener + _udpListenerV4 = new UdpClient(_port); - string lastMessage = ""; + // start listening on UDP port + StartReceiving(); + + // all started; log details + LogManager.GetCurrentClassLogger().Info("Udp Input on Port {0} Ready", _udpEndpointV4); + } + + private void StartReceiving() + { + _udpListenerV4.BeginReceive(DataReceived, null); + } + + private void DataReceived(IAsyncResult result) + { + if (CancelToken.IsCancellationRequested) + { + _unprocessedRawData.CompleteAdding(); + return; + } + + byte[] bytes; try { - while (!CancelToken.IsCancellationRequested) - { - try - { - byte[] bytes = _udpListenerV4.Receive(ref groupV4); - var data = Encoding.UTF8.GetString(bytes, 0, bytes.Length); - lastMessage = data; - var json = JObject.Parse(data); - ProcessJson(json); - Interlocked.Increment(ref _receivedMessages); - } - catch(ArgumentException aex) - { - LogManager.GetCurrentClassLogger().Error(aex); - break; - } - catch(SocketException) - { - break; - } - catch (Exception ex) - { - var jex1 = LogErrors.LogException(string.Format("Invalid JSON: {0}", lastMessage), ex); - if (jex1 != null) - ProcessJson(jex1); - - LogManager.GetCurrentClassLogger().Warn("Bad JSON: {0}", lastMessage); - LogManager.GetCurrentClassLogger().Warn(ex); - - Interlocked.Increment(ref _parsedErrors); - } - } - _udpListenerV4.Close(); + bytes = _udpListenerV4.EndReceive(result, ref _udpEndpointV4); + Interlocked.Increment(ref _receivedMessages); + StartReceiving(); + } + catch (SocketException) + { + LogManager.GetCurrentClassLogger().Info("Socked exception. Ending UDP Listener."); + _unprocessedRawData.CompleteAdding(); + return; } catch (Exception ex) { - if (!CancelToken.IsCancellationRequested) - LogManager.GetCurrentClassLogger().Error(ex); + LogManager.GetCurrentClassLogger().Warn("Error while receiving data.", ex); + + Interlocked.Increment(ref _receiveErrors); + StartReceiving(); + + return; + } + + _unprocessedRawData.Add(bytes); + } + + private void ProcessDataLoop() + { + while (!_unprocessedRawData.IsCompleted) + { + try + { + ProcessData(_unprocessedRawData.Take()); + } + catch (InvalidOperationException) + { + // when the collection is marked as completed + break; + } + catch (Exception ex) + { + LogManager.GetCurrentClassLogger().ErrorException("Error while processing data", ex); + Thread.Sleep(100); + } } Finished(); } + + private void ProcessData(byte[] bytes) + { + var data = Encoding.UTF8.GetString(bytes, 0, bytes.Length); + + try + { + var json = JObject.Parse(data); + ProcessJson(json); + + _parsedMessages++; + } + catch (Exception ex) + { + var jex1 = LogErrors.LogException(string.Format("Invalid JSON: {0}", data), ex); + if (jex1 != null) + { + ProcessJson(jex1); + } + + var msg = string.Format("Bad JSON: {0}", data); + LogManager.GetCurrentClassLogger().Warn(msg, ex); + + _parseErrors++; + } + } } }