CHG TCP listeners now accepts continous streams with multiline json snippets.

FIX   Default configuration template had Outputs on wrong (root) level, thus ignoring them.
This commit is contained in:
Philipp Sumi
2014-10-30 18:50:27 +01:00
parent 17a9e382fe
commit 7583a586fd
2 changed files with 25 additions and 35 deletions

View File

@@ -26,17 +26,17 @@
"drop": "true" "drop": "true"
} }
} }
] ],
}, "Outputs": {
"Outputs": { "Redis": [
"Redis": [ {
{ "_comment": "Change the host to your Redis instance",
"_comment": "Change the host to your Redis instance", "port": 6379,
"port": 6379, "host": [
"host": [ "logaggregator.vistaprint.svc"
"logaggregator.vistaprint.svc" ]
] }
} ]
] }
} }
} }

View File

@@ -1,11 +1,9 @@
using System; using System;
using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq;
using System.Text;
using System.Threading; using System.Threading;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using NLog; using NLog;
@@ -36,7 +34,7 @@ namespace TimberWinR.Inputs
: base(cancelToken, "Win32-Tcp") : base(cancelToken, "Win32-Tcp")
{ {
_port = port; _port = port;
LogManager.GetCurrentClassLogger().Info("Tcp Input(v4/v6) on Port {0} Ready", _port); LogManager.GetCurrentClassLogger().Info("Tcp Input(v4/v6) on Port {0} Ready", _port);
@@ -67,7 +65,7 @@ namespace TimberWinR.Inputs
listener.Start(); listener.Start();
while (!CancelToken.IsCancellationRequested) while (!CancelToken.IsCancellationRequested)
{ {
try try
@@ -92,27 +90,22 @@ namespace TimberWinR.Inputs
private void HandleNewClient(object client) private void HandleNewClient(object client)
{ {
var tcpClient = (TcpClient)client; var tcpClient = (TcpClient)client;
NetworkStream clientStream = null;
try try
{ {
clientStream = tcpClient.GetStream(); NetworkStream clientStream = tcpClient.GetStream();
var stream = new StreamReader(clientStream); using (var stream = new StreamReader(clientStream))
string line;
while ((line = stream.ReadLine()) != null)
{ {
try //assume a continuous stream of JSON objects
using (var reader = new JsonTextReader(stream) { SupportMultipleContent = true })
{ {
JObject json = JObject.Parse(line); while (reader.Read())
ProcessJson(json); {
_receivedMessages++; if (CancelToken.IsCancellationRequested) break;
JObject json = JObject.Load(reader);
ProcessJson(json);
}
} }
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Error(ex);
}
if (CancelToken.IsCancellationRequested)
break;
} }
} }
catch (Exception ex) catch (Exception ex)
@@ -120,9 +113,6 @@ namespace TimberWinR.Inputs
LogManager.GetCurrentClassLogger().Error(ex); LogManager.GetCurrentClassLogger().Error(ex);
} }
if (clientStream != null)
clientStream.Close();
tcpClient.Close(); tcpClient.Close();
Finished(); Finished();
} }