Adding stdout output

This commit is contained in:
Jonathan Preddy
2014-08-04 14:07:16 -04:00
parent 7ec68f4e52
commit 94ca9b40a2
8 changed files with 154 additions and 3 deletions

View File

@@ -59,6 +59,9 @@
<Content Include="default.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
<None Include="loopback.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="packages.config">
<SubType>Designer</SubType>
</None>

View File

@@ -0,0 +1,29 @@
{
"TimberWinR": {
"Inputs": {
"Stdin": [
{
}
]
},
"Outputs": {
"Stdout": [
{
}
]
},
"Filters": [
{
"grok": {
"match": [
"message",
""
],
"add_field": [
"ComputerName", "test"
]
}
}
]
}
}

View File

@@ -45,6 +45,11 @@ namespace TimberWinR
get { return _elasticsearchOutputs; }
}
private List<StdoutOutput> _stdoutOutputs = new List<StdoutOutput>();
public IEnumerable<StdoutOutput> StdoutOutputs
{
get { return _stdoutOutputs; }
}
private List<Tcp> _tcps = new List<Tcp>();
public IEnumerable<Tcp> Tcps
@@ -136,9 +141,10 @@ namespace TimberWinR
{
if (x.TimberWinR.Outputs.Redis != null)
c._redisOutputs = x.TimberWinR.Outputs.Redis.ToList();
if (x.TimberWinR.Outputs.Elasticsearch != null)
c._elasticsearchOutputs = x.TimberWinR.Outputs.Elasticsearch.ToList();
if (x.TimberWinR.Outputs.Stdout != null)
c._stdoutOutputs = x.TimberWinR.Outputs.Stdout.ToList();
}
if (x.TimberWinR.Filters != null)
@@ -175,6 +181,7 @@ namespace TimberWinR
_logs = new List<Log>();
_redisOutputs = new List<RedisOutput>();
_elasticsearchOutputs = new List<ElasticsearchOutput>();
_stdoutOutputs = new List<StdoutOutput>();
_tcps = new List<Tcp>();
}

View File

@@ -95,6 +95,14 @@ namespace TimberWinR
Outputs.Add(els);
}
}
if (Config.StdoutOutputs != null)
{
foreach (var ro in Config.StdoutOutputs)
{
var stdout = new StdoutOutput(this, ro, cancelToken);
Outputs.Add(stdout);
}
}
foreach (Parser.IISW3CLog iisw3cConfig in Config.IISW3C)
{

View File

@@ -31,6 +31,7 @@ namespace TimberWinR.Outputs
_timeout = eo.Timeout;
_manager = manager;
_port = eo.Port;
_interval = eo.Interval;
_host = eo.Host;
_index = eo.Index;
_hostIndex = 0;

View File

@@ -0,0 +1,85 @@
using Newtonsoft.Json.Linq;
using NLog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace TimberWinR.Outputs
{
public class StdoutOutput : OutputSender
{
private TimberWinR.Manager _manager;
private readonly int _interval;
private readonly object _locker = new object();
private readonly List<JObject> _jsonQueue;
public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutput eo, CancellationToken cancelToken)
: base(cancelToken)
{
_manager = manager;
_interval = eo.Interval;
_jsonQueue = new List<JObject>();
var elsThread = new Task(StdoutSender, cancelToken);
elsThread.Start();
}
//
// Pull off messages from the Queue, batch them up and send them all across
//
private void StdoutSender()
{
while (!CancelToken.IsCancellationRequested)
{
JObject[] messages;
lock (_locker)
{
messages = _jsonQueue.Take(1).ToArray();
_jsonQueue.RemoveRange(0, messages.Length);
}
if (messages.Length > 0)
{
try
{
foreach (JObject obj in messages)
{
Console.WriteLine(obj.ToString());
}
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Error(ex);
}
}
System.Threading.Thread.Sleep(_interval);
}
}
protected override void MessageReceivedHandler(Newtonsoft.Json.Linq.JObject jsonMessage)
{
if (_manager.Config.Filters != null)
ApplyFilters(jsonMessage);
var message = jsonMessage.ToString();
LogManager.GetCurrentClassLogger().Debug(message);
lock (_locker)
{
_jsonQueue.Add(jsonMessage);
}
}
private void ApplyFilters(JObject json)
{
foreach (var filter in _manager.Config.Filters)
{
filter.Apply(json);
}
}
}
}

View File

@@ -406,6 +406,8 @@ namespace TimberWinR.Parser
public int NumThreads { get; set; }
[JsonProperty(PropertyName = "protocol")]
public string Protocol { get; set; }
[JsonProperty(PropertyName = "interval")]
public int Interval { get; set; }
public ElasticsearchOutput()
{
@@ -415,6 +417,7 @@ namespace TimberWinR.Parser
Host = new string[] { "localhost" };
Timeout = 10000;
NumThreads = 1;
Interval = 1000;
}
}
@@ -447,6 +450,17 @@ namespace TimberWinR.Parser
}
}
public class StdoutOutput
{
[JsonProperty(PropertyName = "interval")]
public int Interval { get; set; }
public StdoutOutput()
{
Interval = 1000;
}
}
public class OutputTargets
{
[JsonProperty("Redis")]
@@ -454,6 +468,9 @@ namespace TimberWinR.Parser
[JsonProperty("Elasticsearch")]
public ElasticsearchOutput[] Elasticsearch { get; set; }
[JsonProperty("Stdout")]
public StdoutOutput[] Stdout { get; set; }
}
public class InputSources

View File

@@ -84,6 +84,7 @@
<Compile Include="Outputs\Elasticsearch.cs" />
<Compile Include="Outputs\OutputSender.cs" />
<Compile Include="Outputs\Redis.cs" />
<Compile Include="Outputs\Stdout.cs" />
<Compile Include="Parser.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Properties\Resources.Designer.cs">