Merge pull request #41 from thoean/MakeUdpClientAsync

Make UDP listener async
This commit is contained in:
Eric Fontana
2015-04-14 11:30:48 -04:00

View File

@@ -1,11 +1,9 @@
using System; using System;
using System.IO; using System.Collections.Concurrent;
using System.Text; using System.Text;
using System.Text.RegularExpressions;
using System.Threading; using System.Threading;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using NLog; using NLog;
@@ -14,35 +12,43 @@ namespace TimberWinR.Inputs
public class UdpInputListener : InputListener public class UdpInputListener : InputListener
{ {
private UdpClient _udpListenerV4; private UdpClient _udpListenerV4;
private readonly Thread _listenThreadV4; private IPEndPoint _udpEndpointV4;
private readonly BlockingCollection<byte[]> _unprocessedRawData;
private readonly Thread _rawDataProcessingThread;
private readonly int _port; private readonly int _port;
private long _receivedMessages; private long _receivedMessages;
private long _parsedErrors; private long _parseErrors;
private long _receiveErrors;
private long _parsedMessages;
public override JObject ToJson() public override JObject ToJson()
{ {
JObject json = new JObject( var json =
new JProperty("udp", new JObject(new JProperty("udp",
new JObject( new JObject(new JProperty("port", _port),
new JProperty("port", _port), new JProperty("receive_errors", _receiveErrors),
new JProperty("errors", _parsedErrors), new JProperty("parse_errors", _parseErrors),
new JProperty("messages", _receivedMessages) new JProperty("received_messages", _receivedMessages),
))); new JProperty("parsed_messages", _parsedMessages),
new JProperty("unprocessed_messages", _unprocessedRawData.Count))));
return json; return json;
} }
public UdpInputListener(CancellationToken cancelToken, int port = 5140) public UdpInputListener(CancellationToken cancelToken, int port = 5140) : base(cancelToken, "Win32-Udp")
: base(cancelToken, "Win32-Udp")
{ {
_port = port; _port = port;
_receivedMessages = 0; _receivedMessages = 0;
_listenThreadV4 = new Thread(StartListener); // setup raw data processor
_listenThreadV4.Start(); _unprocessedRawData = new BlockingCollection<byte[]>();
} _rawDataProcessingThread = new Thread(ProcessDataLoop) { Name = "Win32-Udp-DataProcessor"};
_rawDataProcessingThread.Start();
// start listing to udp port
StartListener();
}
public override void Shutdown() public override void Shutdown()
{ {
@@ -51,64 +57,109 @@ namespace TimberWinR.Inputs
// close UDP listeners, which will end the listener threads // close UDP listeners, which will end the listener threads
_udpListenerV4.Close(); _udpListenerV4.Close();
// wait for completion of the threads
_listenThreadV4.Join();
base.Shutdown(); base.Shutdown();
} }
private void StartListener() private void StartListener()
{ {
var groupV4 = new IPEndPoint(IPAddress.Any, _port); _udpEndpointV4 = new IPEndPoint(IPAddress.Any, _port);
// setup listener
_udpListenerV4 = new UdpClient(_port); _udpListenerV4 = new UdpClient(_port);
LogManager.GetCurrentClassLogger().Info("Udp Input on Port {0} Ready", groupV4); // start listening on UDP port
StartReceiving();
string lastMessage = ""; // all started; log details
try LogManager.GetCurrentClassLogger().Info("Udp Input on Port {0} Ready", _udpEndpointV4);
{
while (!CancelToken.IsCancellationRequested)
{
try
{
byte[] bytes = _udpListenerV4.Receive(ref groupV4);
var data = Encoding.UTF8.GetString(bytes, 0, bytes.Length);
lastMessage = data;
var json = JObject.Parse(data);
ProcessJson(json);
Interlocked.Increment(ref _receivedMessages);
} }
catch(ArgumentException aex)
private void StartReceiving()
{ {
LogManager.GetCurrentClassLogger().Error(aex); _udpListenerV4.BeginReceive(DataReceived, null);
break; }
private void DataReceived(IAsyncResult result)
{
if (CancelToken.IsCancellationRequested)
{
_unprocessedRawData.CompleteAdding();
return;
}
byte[] bytes;
try
{
bytes = _udpListenerV4.EndReceive(result, ref _udpEndpointV4);
Interlocked.Increment(ref _receivedMessages);
StartReceiving();
} }
catch (SocketException) catch (SocketException)
{ {
LogManager.GetCurrentClassLogger().Info("Socked exception. Ending UDP Listener.");
_unprocessedRawData.CompleteAdding();
return;
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Warn("Error while receiving data.", ex);
Interlocked.Increment(ref _receiveErrors);
StartReceiving();
return;
}
_unprocessedRawData.Add(bytes);
}
private void ProcessDataLoop()
{
while (!_unprocessedRawData.IsCompleted)
{
try
{
ProcessData(_unprocessedRawData.Take());
}
catch (InvalidOperationException)
{
// when the collection is marked as completed
break; break;
} }
catch (Exception ex) catch (Exception ex)
{ {
var jex1 = LogErrors.LogException(string.Format("Invalid JSON: {0}", lastMessage), ex); LogManager.GetCurrentClassLogger().ErrorException("Error while processing data", ex);
if (jex1 != null) Thread.Sleep(100);
ProcessJson(jex1);
LogManager.GetCurrentClassLogger().Warn("Bad JSON: {0}", lastMessage);
LogManager.GetCurrentClassLogger().Warn(ex);
Interlocked.Increment(ref _parsedErrors);
} }
} }
_udpListenerV4.Close();
}
catch (Exception ex)
{
if (!CancelToken.IsCancellationRequested)
LogManager.GetCurrentClassLogger().Error(ex);
}
Finished(); Finished();
} }
private void ProcessData(byte[] bytes)
{
var data = Encoding.UTF8.GetString(bytes, 0, bytes.Length);
try
{
var json = JObject.Parse(data);
ProcessJson(json);
_parsedMessages++;
}
catch (Exception ex)
{
var jex1 = LogErrors.LogException(string.Format("Invalid JSON: {0}", data), ex);
if (jex1 != null)
{
ProcessJson(jex1);
}
var msg = string.Format("Bad JSON: {0}", data);
LogManager.GetCurrentClassLogger().Warn(msg, ex);
_parseErrors++;
}
}
} }
} }