Compare commits
21 Commits
chrisbalda
...
bulk_elast
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9ef79978a6 | ||
|
|
47f233f863 | ||
|
|
b937d6ef45 | ||
|
|
c16331ac10 | ||
|
|
ac10640edf | ||
|
|
e9c27c8c19 | ||
|
|
7be95a976e | ||
|
|
f2b0f1a85d | ||
|
|
eaba99144e | ||
|
|
3208da6488 | ||
|
|
fb473909e7 | ||
|
|
b7095471fb | ||
|
|
d7fa582191 | ||
|
|
6ea3e581fd | ||
|
|
99b51d240d | ||
|
|
fd123e3a86 | ||
|
|
42741fbe1e | ||
|
|
cdc2d09150 | ||
|
|
2e28a50222 | ||
|
|
92a9adeca8 | ||
|
|
91cf59612c |
@@ -99,9 +99,7 @@ namespace TimberWinR.ExtractID
|
||||
Console.WriteLine("Updated {0} ProductID: {1}", args[2], productCode);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
Console.Error.WriteLine("Failed for some reason");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
|
||||
// You can specify all the values or you can default the Build and Revision Numbers
|
||||
// by using the '*' as shown below:
|
||||
// [assembly: AssemblyVersion("1.0.*")]
|
||||
[assembly: AssemblyVersion("1.3.19.0")]
|
||||
[assembly: AssemblyFileVersion("1.3.19.0")]
|
||||
[assembly: AssemblyVersion("1.3.20.0")]
|
||||
[assembly: AssemblyFileVersion("1.3.20.0")]
|
||||
|
||||
@@ -94,7 +94,7 @@ namespace TimberWinR.UnitTests
|
||||
|
||||
|
||||
Configuration c = Configuration.FromString(redisJson);
|
||||
RedisOutput redis = c.RedisOutputs.First() as RedisOutput;
|
||||
RedisOutputParameters redis = c.RedisOutputs.First() as RedisOutputParameters;
|
||||
Assert.IsTrue(redis.Host.Length >= 1);
|
||||
}
|
||||
|
||||
|
||||
126
TimberWinR.UnitTests/FakeRediServer.cs
Normal file
126
TimberWinR.UnitTests/FakeRediServer.cs
Normal file
@@ -0,0 +1,126 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using NUnit.Framework;
|
||||
using TimberWinR.Parser;
|
||||
using System.Text;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace TimberWinR.UnitTests
|
||||
{
|
||||
// Class which implements a Fake redis server for test purposes.
|
||||
class FakeRediServer
|
||||
{
|
||||
private readonly System.Net.Sockets.TcpListener _tcpListenerV4;
|
||||
private readonly System.Net.Sockets.TcpListener _tcpListenerV6;
|
||||
private Thread _listenThreadV4;
|
||||
private Thread _listenThreadV6;
|
||||
private readonly int _port;
|
||||
private CancellationToken _cancelToken;
|
||||
private bool _shutdown;
|
||||
|
||||
public FakeRediServer(CancellationToken cancelToken, int port = 6379)
|
||||
{
|
||||
_port = port;
|
||||
_cancelToken = cancelToken;
|
||||
_shutdown = false;
|
||||
|
||||
_tcpListenerV6 = new System.Net.Sockets.TcpListener(IPAddress.IPv6Any, port);
|
||||
_tcpListenerV4 = new System.Net.Sockets.TcpListener(IPAddress.Any, port);
|
||||
|
||||
_listenThreadV4 = new Thread(new ParameterizedThreadStart(ListenForClients));
|
||||
_listenThreadV4.Start(_tcpListenerV4);
|
||||
|
||||
_listenThreadV6 = new Thread(new ParameterizedThreadStart(ListenForClients));
|
||||
_listenThreadV6.Start(_tcpListenerV6);
|
||||
}
|
||||
|
||||
public void Shutdown()
|
||||
{
|
||||
_shutdown = true;
|
||||
this._tcpListenerV4.Stop();
|
||||
this._tcpListenerV6.Stop();
|
||||
}
|
||||
|
||||
|
||||
private void ListenForClients(object olistener)
|
||||
{
|
||||
System.Net.Sockets.TcpListener listener = olistener as System.Net.Sockets.TcpListener;
|
||||
|
||||
listener.Start();
|
||||
|
||||
|
||||
while (!_cancelToken.IsCancellationRequested && !_shutdown)
|
||||
{
|
||||
try
|
||||
{
|
||||
//blocks until a client has connected to the server
|
||||
TcpClient client = listener.AcceptTcpClient();
|
||||
|
||||
// Wait for a client, spin up a thread.
|
||||
var clientThread = new Thread(new ParameterizedThreadStart(HandleNewClient));
|
||||
clientThread.Start(client);
|
||||
}
|
||||
catch (SocketException ex)
|
||||
{
|
||||
if (ex.SocketErrorCode == SocketError.Interrupted)
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void HandleNewClient(object client)
|
||||
{
|
||||
var tcpClient = (TcpClient)client;
|
||||
|
||||
try
|
||||
{
|
||||
NetworkStream clientStream = tcpClient.GetStream();
|
||||
int i;
|
||||
Byte[] bytes = new Byte[16535];
|
||||
String data = null;
|
||||
|
||||
do
|
||||
{
|
||||
try
|
||||
{
|
||||
// Loop to receive all the data sent by the client.
|
||||
while ((i = clientStream.Read(bytes, 0, bytes.Length)) != 0)
|
||||
{
|
||||
// Translate data bytes to a ASCII string.
|
||||
data = System.Text.Encoding.ASCII.GetString(bytes, 0, i);
|
||||
//System.Diagnostics.Debug.WriteLine(String.Format("Received: {0}", data));
|
||||
|
||||
// Process the data sent by the client.
|
||||
data = ":1000\r\n";
|
||||
|
||||
byte[] msg = System.Text.Encoding.ASCII.GetBytes(data);
|
||||
|
||||
// Send back a response.
|
||||
clientStream.Write(msg, 0, msg.Length);
|
||||
// System.Diagnostics.Debug.WriteLine(String.Format("Sent: {0}", data));
|
||||
}
|
||||
}
|
||||
catch (IOException)
|
||||
{
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
System.Diagnostics.Debug.WriteLine(ex.ToString());
|
||||
}
|
||||
tcpClient.Close();
|
||||
}
|
||||
|
||||
private void ProcessJson(JObject json)
|
||||
{
|
||||
Console.WriteLine(json.ToString());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,6 @@ using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Newtonsoft.Json;
|
||||
@@ -18,7 +17,7 @@ namespace TimberWinR.UnitTests
|
||||
[TestFixture]
|
||||
public class MultilineTests
|
||||
{
|
||||
// [Test(Description = "Test using next")]
|
||||
// [Test(Description = "Test using next")]
|
||||
public void TestMultiline1()
|
||||
{
|
||||
using (StreamReader sr = new StreamReader("Multiline1.txt"))
|
||||
@@ -29,10 +28,10 @@ namespace TimberWinR.UnitTests
|
||||
|
||||
Stdin sin = new Stdin();
|
||||
|
||||
sin.Codec = new Codec();
|
||||
sin.Codec.Pattern = "\\\\$";
|
||||
sin.Codec.What = Codec.WhatType.next;
|
||||
sin.Codec.Type = Codec.CodecType.multiline;
|
||||
sin.CodecArguments = new CodecArguments();
|
||||
sin.CodecArguments.Pattern = "\\\\$";
|
||||
sin.CodecArguments.What = CodecArguments.WhatType.next;
|
||||
sin.CodecArguments.Type = CodecArguments.CodecType.multiline;
|
||||
|
||||
var cancelTokenSource = new CancellationTokenSource();
|
||||
|
||||
@@ -52,7 +51,7 @@ namespace TimberWinR.UnitTests
|
||||
if (!cancelTokenSource.Token.IsCancellationRequested)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(10000), cancelTokenSource.Token);
|
||||
}
|
||||
catch (OperationCanceledException oex)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
}
|
||||
}
|
||||
@@ -67,7 +66,7 @@ namespace TimberWinR.UnitTests
|
||||
}
|
||||
}
|
||||
|
||||
// [Test(Description = "Test using previous")]
|
||||
// [Test(Description = "Test using previous")]
|
||||
public void TestMultiline2()
|
||||
{
|
||||
using (StreamReader sr = new StreamReader("Multiline2.txt"))
|
||||
@@ -78,11 +77,11 @@ namespace TimberWinR.UnitTests
|
||||
|
||||
Stdin sin = new Stdin();
|
||||
|
||||
sin.Codec = new Codec();
|
||||
sin.Codec.Pattern = "^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2},\\d{3})(.*)$";
|
||||
sin.Codec.What = Codec.WhatType.previous;
|
||||
sin.Codec.Type = Codec.CodecType.multiline;
|
||||
sin.Codec.Negate = true;
|
||||
sin.CodecArguments = new CodecArguments();
|
||||
sin.CodecArguments.Pattern = "^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2},\\d{3})(.*)$";
|
||||
sin.CodecArguments.What = CodecArguments.WhatType.previous;
|
||||
sin.CodecArguments.Type = CodecArguments.CodecType.multiline;
|
||||
sin.CodecArguments.Negate = true;
|
||||
|
||||
var cancelTokenSource = new CancellationTokenSource();
|
||||
|
||||
@@ -102,7 +101,7 @@ namespace TimberWinR.UnitTests
|
||||
if (!cancelTokenSource.Token.IsCancellationRequested)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(10000), cancelTokenSource.Token);
|
||||
}
|
||||
catch (OperationCanceledException oex)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,12 +10,12 @@
|
||||
|
||||
public class ElasticsearchOutputTests
|
||||
{
|
||||
private ElasticsearchOutput parser;
|
||||
private ElasticsearchOutputParameters parser;
|
||||
|
||||
[SetUp]
|
||||
public void Setup()
|
||||
{
|
||||
this.parser = new ElasticsearchOutput();
|
||||
this.parser = new ElasticsearchOutputParameters();
|
||||
}
|
||||
|
||||
[Test]
|
||||
|
||||
@@ -3,7 +3,6 @@ using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Newtonsoft.Json;
|
||||
@@ -61,7 +60,7 @@ namespace TimberWinR.UnitTests
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException oex)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Console.WriteLine("Done!");
|
||||
}
|
||||
|
||||
89
TimberWinR.UnitTests/TestDynamicBatchCount.cs
Normal file
89
TimberWinR.UnitTests/TestDynamicBatchCount.cs
Normal file
@@ -0,0 +1,89 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Newtonsoft.Json;
|
||||
using NUnit.Framework;
|
||||
using TimberWinR.Parser;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using System.Threading;
|
||||
|
||||
|
||||
namespace TimberWinR.UnitTests
|
||||
{
|
||||
[TestFixture]
|
||||
public class TestDynamicBatchCount
|
||||
{
|
||||
// [Test]
|
||||
public void TestDynamicBatch()
|
||||
{
|
||||
var mgr = new Manager();
|
||||
mgr.LogfileDir = ".";
|
||||
|
||||
mgr.Config = new Configuration();
|
||||
|
||||
CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
|
||||
|
||||
var cancelToken = cancelTokenSource.Token;
|
||||
|
||||
FakeRediServer fr = new FakeRediServer(cancelToken);
|
||||
|
||||
var redisParams = new RedisOutputParameters();
|
||||
redisParams.BatchCount = 10;
|
||||
redisParams.MaxBatchCount = 40;
|
||||
redisParams.Interval = 100;
|
||||
|
||||
var redisOutput = new Outputs.RedisOutput(mgr, redisParams, cancelToken);
|
||||
|
||||
|
||||
// Message is irrelavant
|
||||
JObject jsonMessage = new JObject
|
||||
{
|
||||
{"type", "Win32-FileLog"},
|
||||
{"ComputerName", "dev.vistaprint.net"},
|
||||
{"Text", "{\"Email\":\"james@example.com\",\"Active\":true,\"CreatedDate\":\"2013-01-20T00:00:00Z\",\"Roles\":[\"User\",\"Admin\"]}"}
|
||||
};
|
||||
|
||||
|
||||
// Send 1000 messages at max throttle
|
||||
for (int i = 0; i < 1000; i++)
|
||||
{
|
||||
Thread.Sleep(10);
|
||||
redisOutput.Startup(jsonMessage);
|
||||
}
|
||||
|
||||
while (redisOutput.SentMessages < 1000)
|
||||
{
|
||||
System.Diagnostics.Debug.WriteLine(redisOutput.SentMessages);
|
||||
Thread.Sleep(1000);
|
||||
}
|
||||
|
||||
fr.Shutdown();
|
||||
|
||||
cancelTokenSource.Cancel();
|
||||
|
||||
System.Diagnostics.Debug.WriteLine(redisOutput.ToJson());
|
||||
System.Diagnostics.Debug.WriteLine(redisOutput.QueueDepth);
|
||||
|
||||
JObject json = redisOutput.ToJson();
|
||||
var mbc = json["redis"]["reachedMaxBatchCountTimes"].Value<int>();
|
||||
var sm = json["redis"]["sentMessageCount"].Value<int>();
|
||||
var errs = json["redis"]["errors"].Value<int>();
|
||||
var cbc = json["redis"]["currentBatchCount"].Value<int>();
|
||||
|
||||
// No errors
|
||||
Assert.AreEqual(0, errs);
|
||||
|
||||
// Should have reached max at least 1 time
|
||||
Assert.GreaterOrEqual(mbc, 1);
|
||||
|
||||
// Should have sent 1000 messages
|
||||
Assert.AreEqual(1000, sm);
|
||||
|
||||
// Should reset back down to original
|
||||
Assert.AreEqual(cbc, 10);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -60,6 +60,7 @@
|
||||
<ItemGroup>
|
||||
<Compile Include="Configuration.cs" />
|
||||
<Compile Include="DateFilterTests.cs" />
|
||||
<Compile Include="FakeRediServer.cs" />
|
||||
<Compile Include="GeoIPFilterTests.cs" />
|
||||
<Compile Include="Inputs\IisW3CRowReaderTests.cs" />
|
||||
<Compile Include="JsonFilterTests.cs" />
|
||||
@@ -69,6 +70,7 @@
|
||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||
<Compile Include="TailFileTests.cs" />
|
||||
<Compile Include="TestBase.cs" />
|
||||
<Compile Include="TestDynamicBatchCount.cs" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\TimberWinR\TimberWinR.csproj">
|
||||
|
||||
90
TimberWinR/Codecs/Multiline.cs
Normal file
90
TimberWinR/Codecs/Multiline.cs
Normal file
@@ -0,0 +1,90 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Text.RegularExpressions;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using TimberWinR.Inputs;
|
||||
using TimberWinR.Parser;
|
||||
|
||||
namespace TimberWinR.Codecs
|
||||
{
|
||||
public class Multiline : ICodec
|
||||
{
|
||||
private CodecArguments _codecArguments;
|
||||
private List<string> _multiline { get; set; }
|
||||
|
||||
// return true to cancel codec
|
||||
public Multiline(CodecArguments args)
|
||||
{
|
||||
_codecArguments = args;
|
||||
}
|
||||
|
||||
public void Apply(string msg, InputListener listener)
|
||||
{
|
||||
if (_codecArguments.Re == null)
|
||||
_codecArguments.Re = new Regex(_codecArguments.Pattern);
|
||||
|
||||
Match match = _codecArguments.Re.Match(msg);
|
||||
|
||||
bool isMatch = (match.Success && !_codecArguments.Negate) || (!match.Success && _codecArguments.Negate);
|
||||
|
||||
switch (_codecArguments.What)
|
||||
{
|
||||
case CodecArguments.WhatType.previous:
|
||||
if (isMatch)
|
||||
{
|
||||
if (_multiline == null)
|
||||
_multiline = new List<string>();
|
||||
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
else // No Match
|
||||
{
|
||||
if (_multiline != null)
|
||||
{
|
||||
string single = string.Join("\n", _multiline.ToArray());
|
||||
_multiline = null;
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = single;
|
||||
jo.Add("tags", new JArray(_codecArguments.MultilineTag));
|
||||
listener.AddDefaultFields(jo);
|
||||
listener.ProcessJson(jo);
|
||||
}
|
||||
_multiline = new List<string>();
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
break;
|
||||
case CodecArguments.WhatType.next:
|
||||
if (isMatch)
|
||||
{
|
||||
if (_multiline == null)
|
||||
_multiline = new List<string>();
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
else // No match
|
||||
{
|
||||
if (_multiline != null)
|
||||
{
|
||||
_multiline.Add(msg);
|
||||
string single = string.Join("\n", _multiline.ToArray());
|
||||
_multiline = null;
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = single;
|
||||
jo.Add("tags", new JArray(_codecArguments.MultilineTag));
|
||||
listener.AddDefaultFields(jo);
|
||||
listener.ProcessJson(jo);
|
||||
}
|
||||
else
|
||||
{
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = msg;
|
||||
listener.AddDefaultFields(jo);
|
||||
listener.ProcessJson(jo);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -22,7 +22,6 @@ using TimberWinR.Filters;
|
||||
using NLog;
|
||||
using TimberWinR.Parser;
|
||||
using Topshelf.Configurators;
|
||||
using IISW3CLog = TimberWinR.Parser.IISW3CLog;
|
||||
using WindowsEvent = TimberWinR.Parser.WindowsEvent;
|
||||
|
||||
namespace TimberWinR
|
||||
@@ -30,7 +29,7 @@ namespace TimberWinR
|
||||
public class Configuration
|
||||
{
|
||||
private CancellationToken _cancelToken;
|
||||
private bool _stopService;
|
||||
|
||||
private FileSystemWatcher _dirWatcher;
|
||||
private Manager _manager;
|
||||
|
||||
@@ -40,39 +39,39 @@ namespace TimberWinR
|
||||
get { return _events; }
|
||||
}
|
||||
|
||||
private List<RedisOutput> _redisOutputs = new List<RedisOutput>();
|
||||
public IEnumerable<RedisOutput> RedisOutputs
|
||||
private List<RedisOutputParameters> _redisOutputs = new List<RedisOutputParameters>();
|
||||
public IEnumerable<RedisOutputParameters> RedisOutputs
|
||||
{
|
||||
get { return _redisOutputs; }
|
||||
}
|
||||
|
||||
|
||||
private List<ElasticsearchOutput> _elasticsearchOutputs = new List<ElasticsearchOutput>();
|
||||
public IEnumerable<ElasticsearchOutput> ElasticsearchOutputs
|
||||
private List<ElasticsearchOutputParameters> _elasticsearchOutputs = new List<ElasticsearchOutputParameters>();
|
||||
public IEnumerable<ElasticsearchOutputParameters> ElasticsearchOutputs
|
||||
{
|
||||
get { return _elasticsearchOutputs; }
|
||||
}
|
||||
|
||||
private List<StdoutOutput> _stdoutOutputs = new List<StdoutOutput>();
|
||||
public IEnumerable<StdoutOutput> StdoutOutputs
|
||||
private List<StdoutOutputParameters> _stdoutOutputs = new List<StdoutOutputParameters>();
|
||||
public IEnumerable<StdoutOutputParameters> StdoutOutputs
|
||||
{
|
||||
get { return _stdoutOutputs; }
|
||||
}
|
||||
|
||||
private List<Tcp> _tcps = new List<Tcp>();
|
||||
public IEnumerable<Tcp> Tcps
|
||||
private List<TcpParameters> _tcps = new List<TcpParameters>();
|
||||
public IEnumerable<TcpParameters> Tcps
|
||||
{
|
||||
get { return _tcps; }
|
||||
}
|
||||
|
||||
private List<Udp> _udps = new List<Udp>();
|
||||
public IEnumerable<Udp> Udps
|
||||
private List<UdpParameters> _udps = new List<UdpParameters>();
|
||||
public IEnumerable<UdpParameters> Udps
|
||||
{
|
||||
get { return _udps; }
|
||||
}
|
||||
|
||||
private List<Log> _logs = new List<Log>();
|
||||
public IEnumerable<Log> Logs
|
||||
private List<LogParameters> _logs = new List<LogParameters>();
|
||||
public IEnumerable<LogParameters> Logs
|
||||
{
|
||||
get { return _logs; }
|
||||
}
|
||||
@@ -83,16 +82,16 @@ namespace TimberWinR
|
||||
get { return _tails; }
|
||||
}
|
||||
|
||||
private List<IISW3CLog> _iisw3clogs = new List<IISW3CLog>();
|
||||
private List<IISW3CLogParameters> _iisw3clogs = new List<IISW3CLogParameters>();
|
||||
|
||||
public IEnumerable<IISW3CLog> IISW3C
|
||||
public IEnumerable<IISW3CLogParameters> IISW3C
|
||||
{
|
||||
get { return _iisw3clogs; }
|
||||
}
|
||||
|
||||
private List<W3CLog> _w3clogs = new List<W3CLog>();
|
||||
private List<W3CLogParameters> _w3clogs = new List<W3CLogParameters>();
|
||||
|
||||
public IEnumerable<W3CLog> W3C
|
||||
public IEnumerable<W3CLogParameters> W3C
|
||||
{
|
||||
get { return _w3clogs; }
|
||||
}
|
||||
@@ -177,8 +176,7 @@ namespace TimberWinR
|
||||
}
|
||||
|
||||
private void ShutdownDirectoryMonitor()
|
||||
{
|
||||
_stopService = true;
|
||||
{
|
||||
_dirWatcher.EnableRaisingEvents = false;
|
||||
LogManager.GetCurrentClassLogger().Info("Stopping Directory Monitor");
|
||||
}
|
||||
@@ -291,13 +289,13 @@ namespace TimberWinR
|
||||
{
|
||||
_filters = new List<LogstashFilter>();
|
||||
_events = new List<WindowsEvent>();
|
||||
_iisw3clogs = new List<IISW3CLog>();
|
||||
_logs = new List<Log>();
|
||||
_redisOutputs = new List<RedisOutput>();
|
||||
_elasticsearchOutputs = new List<ElasticsearchOutput>();
|
||||
_stdoutOutputs = new List<StdoutOutput>();
|
||||
_tcps = new List<Tcp>();
|
||||
_udps = new List<Udp>();
|
||||
_iisw3clogs = new List<IISW3CLogParameters>();
|
||||
_logs = new List<LogParameters>();
|
||||
_redisOutputs = new List<RedisOutputParameters>();
|
||||
_elasticsearchOutputs = new List<ElasticsearchOutputParameters>();
|
||||
_stdoutOutputs = new List<StdoutOutputParameters>();
|
||||
_tcps = new List<TcpParameters>();
|
||||
_udps = new List<UdpParameters>();
|
||||
}
|
||||
|
||||
public static Object GetPropValue(String name, Object obj)
|
||||
|
||||
13
TimberWinR/ICodec.cs
Normal file
13
TimberWinR/ICodec.cs
Normal file
@@ -0,0 +1,13 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using TimberWinR.Inputs;
|
||||
|
||||
namespace TimberWinR
|
||||
{
|
||||
public interface ICodec
|
||||
{
|
||||
void Apply(string msg, InputListener listener);
|
||||
}
|
||||
}
|
||||
@@ -16,12 +16,12 @@ namespace TimberWinR.Inputs
|
||||
public class IISW3CInputListener : InputListener
|
||||
{
|
||||
private readonly int _pollingIntervalInSeconds;
|
||||
private readonly Parser.IISW3CLog _arguments;
|
||||
private readonly Parser.IISW3CLogParameters _arguments;
|
||||
private long _receivedMessages;
|
||||
public bool Stop { get; set; }
|
||||
private IisW3CRowReader rowReader;
|
||||
|
||||
public IISW3CInputListener(Parser.IISW3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
|
||||
public IISW3CInputListener(Parser.IISW3CLogParameters arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
|
||||
: base(cancelToken, "Win32-IISLog")
|
||||
{
|
||||
_arguments = arguments;
|
||||
@@ -138,7 +138,7 @@ namespace TimberWinR.Inputs
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ namespace TimberWinR.Inputs
|
||||
}
|
||||
}
|
||||
|
||||
protected virtual void AddDefaultFields(JObject json)
|
||||
public virtual void AddDefaultFields(JObject json)
|
||||
{
|
||||
if (json["type"] == null)
|
||||
json.Add(new JProperty("type", _typeName));
|
||||
@@ -100,7 +100,7 @@ namespace TimberWinR.Inputs
|
||||
json.Add(new JProperty("UtcTimestamp", utc.ToString("o")));
|
||||
}
|
||||
|
||||
protected void ProcessJson(JObject json)
|
||||
public void ProcessJson(JObject json)
|
||||
{
|
||||
if (OnMessageRecieved != null)
|
||||
{
|
||||
|
||||
@@ -14,37 +14,42 @@ using Newtonsoft.Json.Linq;
|
||||
using Newtonsoft.Json.Serialization;
|
||||
|
||||
using NLog;
|
||||
|
||||
using TimberWinR.Codecs;
|
||||
using LogQuery = Interop.MSUtil.LogQueryClassClass;
|
||||
using TextLineInputFormat = Interop.MSUtil.COMTextLineInputContextClass;
|
||||
using LogRecordSet = Interop.MSUtil.ILogRecordset;
|
||||
using TimberWinR.Parser;
|
||||
|
||||
namespace TimberWinR.Inputs
|
||||
{
|
||||
{
|
||||
/// <summary>
|
||||
/// Tail a file.
|
||||
/// </summary>
|
||||
public class LogsListener : InputListener
|
||||
{
|
||||
private int _pollingIntervalInSeconds;
|
||||
private TimberWinR.Parser.Log _arguments;
|
||||
private TimberWinR.Parser.LogParameters _arguments;
|
||||
private long _receivedMessages;
|
||||
private Dictionary<string, Int64> _logFileMaxRecords;
|
||||
private Dictionary<string, DateTime> _logFileCreationTimes;
|
||||
private Dictionary<string, DateTime> _logFileSampleTimes;
|
||||
private Dictionary<string, long> _logFileSizes;
|
||||
private Codec _codec;
|
||||
private List<string> _multiline { get; set; }
|
||||
private CodecArguments _codecArguments;
|
||||
private ICodec _codec;
|
||||
|
||||
public bool Stop { get; set; }
|
||||
|
||||
public LogsListener(TimberWinR.Parser.Log arguments, CancellationToken cancelToken)
|
||||
public LogsListener(TimberWinR.Parser.LogParameters arguments, CancellationToken cancelToken)
|
||||
: base(cancelToken, "Win32-FileLog")
|
||||
{
|
||||
Stop = false;
|
||||
|
||||
_codec = arguments.Codec;
|
||||
_codecArguments = arguments.CodecArguments;
|
||||
|
||||
_codecArguments = arguments.CodecArguments;
|
||||
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
|
||||
_codec = new Multiline(_codecArguments);
|
||||
|
||||
_logFileMaxRecords = new Dictionary<string, Int64>();
|
||||
_logFileCreationTimes = new Dictionary<string, DateTime>();
|
||||
_logFileSampleTimes = new Dictionary<string, DateTime>();
|
||||
@@ -100,94 +105,22 @@ namespace TimberWinR.Inputs
|
||||
)));
|
||||
|
||||
|
||||
if (_codec != null)
|
||||
if (_codecArguments != null)
|
||||
{
|
||||
var cp = new JProperty("codec",
|
||||
new JArray(
|
||||
new JObject(
|
||||
new JProperty("type", _codec.Type.ToString()),
|
||||
new JProperty("what", _codec.What.ToString()),
|
||||
new JProperty("negate", _codec.Negate),
|
||||
new JProperty("multilineTag", _codec.MultilineTag),
|
||||
new JProperty("pattern", _codec.Pattern))));
|
||||
new JProperty("type", _codecArguments.Type.ToString()),
|
||||
new JProperty("what", _codecArguments.What.ToString()),
|
||||
new JProperty("negate", _codecArguments.Negate),
|
||||
new JProperty("multilineTag", _codecArguments.MultilineTag),
|
||||
new JProperty("pattern", _codecArguments.Pattern))));
|
||||
json.Add(cp);
|
||||
}
|
||||
|
||||
|
||||
return json;
|
||||
}
|
||||
|
||||
// return true to cancel codec
|
||||
private void applyMultilineCodec(string msg)
|
||||
{
|
||||
if (_codec.Re == null)
|
||||
_codec.Re = new Regex(_codec.Pattern);
|
||||
|
||||
Match match = _codec.Re.Match(msg);
|
||||
|
||||
bool isMatch = (match.Success && !_codec.Negate) || (!match.Success && _codec.Negate);
|
||||
|
||||
switch (_codec.What)
|
||||
{
|
||||
case Codec.WhatType.previous:
|
||||
if (isMatch)
|
||||
{
|
||||
if (_multiline == null)
|
||||
_multiline = new List<string>();
|
||||
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
else // No Match
|
||||
{
|
||||
if (_multiline != null)
|
||||
{
|
||||
string single = string.Join("\n", _multiline.ToArray());
|
||||
_multiline = null;
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = single;
|
||||
jo.Add("tags", new JArray(_codec.MultilineTag));
|
||||
AddDefaultFields(jo);
|
||||
ProcessJson(jo);
|
||||
_receivedMessages++;
|
||||
}
|
||||
_multiline = new List<string>();
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
break;
|
||||
case Codec.WhatType.next:
|
||||
if (isMatch)
|
||||
{
|
||||
if (_multiline == null)
|
||||
_multiline = new List<string>();
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
else // No match
|
||||
{
|
||||
if (_multiline != null)
|
||||
{
|
||||
_multiline.Add(msg);
|
||||
string single = string.Join("\n", _multiline.ToArray());
|
||||
_multiline = null;
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = single;
|
||||
jo.Add("tags", new JArray(_codec.MultilineTag));
|
||||
AddDefaultFields(jo);
|
||||
ProcessJson(jo);
|
||||
_receivedMessages++;
|
||||
}
|
||||
else
|
||||
{
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = msg;
|
||||
AddDefaultFields(jo);
|
||||
ProcessJson(jo);
|
||||
_receivedMessages++;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void FileWatcher(string fileToWatch)
|
||||
{
|
||||
@@ -298,8 +231,12 @@ namespace TimberWinR.Inputs
|
||||
string msg = json["Text"].ToString();
|
||||
if (!string.IsNullOrEmpty(msg))
|
||||
{
|
||||
if (_codec != null && _codec.Type == Codec.CodecType.multiline)
|
||||
applyMultilineCodec(msg);
|
||||
if (_codecArguments != null &&
|
||||
_codecArguments.Type == CodecArguments.CodecType.multiline)
|
||||
{
|
||||
_codec.Apply(msg, this);
|
||||
_receivedMessages++;
|
||||
}
|
||||
else
|
||||
{
|
||||
ProcessJson(json);
|
||||
@@ -317,18 +254,18 @@ namespace TimberWinR.Inputs
|
||||
rs.close();
|
||||
rs = null;
|
||||
GC.Collect();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (FileNotFoundException fnfex)
|
||||
{
|
||||
string fn = fnfex.FileName;
|
||||
|
||||
if (!_fnfmap.ContainsKey(fn))
|
||||
LogManager.GetCurrentClassLogger().Warn(fnfex.Message);
|
||||
|
||||
LogManager.GetCurrentClassLogger().Warn(fnfex.Message);
|
||||
|
||||
_fnfmap[fn] = fn;
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
@@ -338,10 +275,20 @@ namespace TimberWinR.Inputs
|
||||
}
|
||||
finally
|
||||
{
|
||||
oLogQuery = null;
|
||||
// Sleep
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
try
|
||||
{
|
||||
oLogQuery = null;
|
||||
// Sleep
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
}
|
||||
catch (Exception ex1)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Warn(ex1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ using System.Threading;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using NLog;
|
||||
using TimberWinR.Codecs;
|
||||
using TimberWinR.Parser;
|
||||
|
||||
namespace TimberWinR.Inputs
|
||||
@@ -15,13 +16,16 @@ namespace TimberWinR.Inputs
|
||||
public class StdinListener : InputListener
|
||||
{
|
||||
private Thread _listenThread;
|
||||
private Codec _codec;
|
||||
private List<string> _multiline { get; set; }
|
||||
|
||||
private CodecArguments _codecArguments;
|
||||
private ICodec _codec;
|
||||
|
||||
public StdinListener(TimberWinR.Parser.Stdin arguments, CancellationToken cancelToken)
|
||||
: base(cancelToken, "Win32-Console")
|
||||
{
|
||||
_codec = arguments.Codec;
|
||||
_codecArguments = arguments.CodecArguments;
|
||||
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
|
||||
_codec = new Multiline(_codecArguments);
|
||||
|
||||
_listenThread = new Thread(new ThreadStart(ListenToStdin));
|
||||
_listenThread.Start();
|
||||
}
|
||||
@@ -32,16 +36,16 @@ namespace TimberWinR.Inputs
|
||||
new JProperty("stdin", "enabled"));
|
||||
|
||||
|
||||
if (_codec != null)
|
||||
if (_codecArguments != null)
|
||||
{
|
||||
var cp = new JProperty("codec",
|
||||
new JArray(
|
||||
new JObject(
|
||||
new JProperty("type", _codec.Type.ToString()),
|
||||
new JProperty("what", _codec.What.ToString()),
|
||||
new JProperty("negate", _codec.Negate),
|
||||
new JProperty("multilineTag", _codec.MultilineTag),
|
||||
new JProperty("pattern", _codec.Pattern))));
|
||||
new JProperty("type", _codecArguments.Type.ToString()),
|
||||
new JProperty("what", _codecArguments.What.ToString()),
|
||||
new JProperty("negate", _codecArguments.Negate),
|
||||
new JProperty("multilineTag", _codecArguments.MultilineTag),
|
||||
new JProperty("pattern", _codecArguments.Pattern))));
|
||||
json.Add(cp);
|
||||
}
|
||||
|
||||
@@ -65,8 +69,8 @@ namespace TimberWinR.Inputs
|
||||
{
|
||||
string msg = ToPrintable(line);
|
||||
|
||||
if (_codec != null && _codec.Type == Codec.CodecType.multiline)
|
||||
applyMultilineCodec(msg);
|
||||
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
|
||||
_codec.Apply(msg, this);
|
||||
else
|
||||
{
|
||||
JObject jo = new JObject();
|
||||
@@ -78,73 +82,5 @@ namespace TimberWinR.Inputs
|
||||
}
|
||||
Finished();
|
||||
}
|
||||
|
||||
// return true to cancel codec
|
||||
private void applyMultilineCodec(string msg)
|
||||
{
|
||||
if (_codec.Re == null)
|
||||
_codec.Re = new Regex(_codec.Pattern);
|
||||
|
||||
Match match = _codec.Re.Match(msg);
|
||||
|
||||
bool isMatch = (match.Success && !_codec.Negate) || (!match.Success && _codec.Negate);
|
||||
|
||||
switch (_codec.What)
|
||||
{
|
||||
case Codec.WhatType.previous:
|
||||
if (isMatch)
|
||||
{
|
||||
if (_multiline == null)
|
||||
_multiline = new List<string>();
|
||||
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
else // No Match
|
||||
{
|
||||
if (_multiline != null)
|
||||
{
|
||||
string single = string.Join("\n", _multiline.ToArray());
|
||||
_multiline = null;
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = single;
|
||||
jo.Add("tags", new JArray(_codec.MultilineTag));
|
||||
AddDefaultFields(jo);
|
||||
ProcessJson(jo);
|
||||
}
|
||||
_multiline = new List<string>();
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
break;
|
||||
case Codec.WhatType.next:
|
||||
if (isMatch)
|
||||
{
|
||||
if (_multiline == null)
|
||||
_multiline = new List<string>();
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
else // No match
|
||||
{
|
||||
if (_multiline != null)
|
||||
{
|
||||
_multiline.Add(msg);
|
||||
string single = string.Join("\n", _multiline.ToArray());
|
||||
_multiline = null;
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = single;
|
||||
jo.Add("tags", new JArray(_codec.MultilineTag));
|
||||
AddDefaultFields(jo);
|
||||
ProcessJson(jo);
|
||||
}
|
||||
else
|
||||
{
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = msg;
|
||||
AddDefaultFields(jo);
|
||||
ProcessJson(jo);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ using Newtonsoft.Json.Serialization;
|
||||
|
||||
using NLog;
|
||||
using NLog.LayoutRenderers;
|
||||
using TimberWinR.Codecs;
|
||||
using TimberWinR.Parser;
|
||||
|
||||
namespace TimberWinR.Inputs
|
||||
@@ -31,9 +32,9 @@ namespace TimberWinR.Inputs
|
||||
private Dictionary<string, Int64> _logFileMaxRecords;
|
||||
private Dictionary<string, DateTime> _logFileCreationTimes;
|
||||
private Dictionary<string, DateTime> _logFileSampleTimes;
|
||||
private Dictionary<string, long> _logFileSizes;
|
||||
private Codec _codec;
|
||||
private List<string> _multiline { get; set; }
|
||||
private Dictionary<string, long> _logFileSizes;
|
||||
private CodecArguments _codecArguments;
|
||||
private ICodec _codec;
|
||||
|
||||
public bool Stop { get; set; }
|
||||
|
||||
@@ -42,7 +43,11 @@ namespace TimberWinR.Inputs
|
||||
{
|
||||
Stop = false;
|
||||
|
||||
_codec = arguments.Codec;
|
||||
_codecArguments = arguments.CodecArguments;
|
||||
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
|
||||
_codec = new Multiline(_codecArguments);
|
||||
|
||||
|
||||
_logFileMaxRecords = new Dictionary<string, Int64>();
|
||||
_logFileCreationTimes = new Dictionary<string, DateTime>();
|
||||
_logFileSampleTimes = new Dictionary<string, DateTime>();
|
||||
@@ -95,16 +100,16 @@ namespace TimberWinR.Inputs
|
||||
)));
|
||||
|
||||
|
||||
if (_codec != null)
|
||||
if (_codecArguments != null)
|
||||
{
|
||||
var cp = new JProperty("codec",
|
||||
new JArray(
|
||||
new JObject(
|
||||
new JProperty("type", _codec.Type.ToString()),
|
||||
new JProperty("what", _codec.What.ToString()),
|
||||
new JProperty("negate", _codec.Negate),
|
||||
new JProperty("multilineTag", _codec.MultilineTag),
|
||||
new JProperty("pattern", _codec.Pattern))));
|
||||
new JProperty("type", _codecArguments.Type.ToString()),
|
||||
new JProperty("what", _codecArguments.What.ToString()),
|
||||
new JProperty("negate", _codecArguments.Negate),
|
||||
new JProperty("multilineTag", _codecArguments.MultilineTag),
|
||||
new JProperty("pattern", _codecArguments.Pattern))));
|
||||
json.Add(cp);
|
||||
}
|
||||
|
||||
@@ -112,77 +117,6 @@ namespace TimberWinR.Inputs
|
||||
return json;
|
||||
}
|
||||
|
||||
// return true to cancel codec
|
||||
private void applyMultilineCodec(string msg)
|
||||
{
|
||||
if (_codec.Re == null)
|
||||
_codec.Re = new Regex(_codec.Pattern);
|
||||
|
||||
Match match = _codec.Re.Match(msg);
|
||||
|
||||
bool isMatch = (match.Success && !_codec.Negate) || (!match.Success && _codec.Negate);
|
||||
|
||||
switch (_codec.What)
|
||||
{
|
||||
case Codec.WhatType.previous:
|
||||
if (isMatch)
|
||||
{
|
||||
if (_multiline == null)
|
||||
_multiline = new List<string>();
|
||||
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
else // No Match
|
||||
{
|
||||
if (_multiline != null)
|
||||
{
|
||||
string single = string.Join("\n", _multiline.ToArray());
|
||||
_multiline = null;
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = single;
|
||||
jo.Add("tags", new JArray(_codec.MultilineTag));
|
||||
AddDefaultFields(jo);
|
||||
ProcessJson(jo);
|
||||
_receivedMessages++;
|
||||
}
|
||||
_multiline = new List<string>();
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
break;
|
||||
case Codec.WhatType.next:
|
||||
if (isMatch)
|
||||
{
|
||||
if (_multiline == null)
|
||||
_multiline = new List<string>();
|
||||
_multiline.Add(msg);
|
||||
}
|
||||
else // No match
|
||||
{
|
||||
if (_multiline != null)
|
||||
{
|
||||
_multiline.Add(msg);
|
||||
string single = string.Join("\n", _multiline.ToArray());
|
||||
_multiline = null;
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = single;
|
||||
jo.Add("tags", new JArray(_codec.MultilineTag));
|
||||
AddDefaultFields(jo);
|
||||
ProcessJson(jo);
|
||||
_receivedMessages++;
|
||||
}
|
||||
else
|
||||
{
|
||||
JObject jo = new JObject();
|
||||
jo["message"] = msg;
|
||||
AddDefaultFields(jo);
|
||||
ProcessJson(jo);
|
||||
_receivedMessages++;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private void TailFileContents(string fileName, long offset)
|
||||
{
|
||||
using (StreamReader reader = new StreamReader(new FileStream(fileName,
|
||||
@@ -222,15 +156,17 @@ namespace TimberWinR.Inputs
|
||||
json["Index"] = index;
|
||||
json["LogFileName"] = fileName;
|
||||
|
||||
if (_codec != null && _codec.Type == Codec.CodecType.multiline)
|
||||
applyMultilineCodec(line);
|
||||
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
|
||||
{
|
||||
_codec.Apply(line, this);
|
||||
Interlocked.Increment(ref _receivedMessages);
|
||||
}
|
||||
else
|
||||
{
|
||||
ProcessJson(json);
|
||||
Interlocked.Increment(ref _receivedMessages);
|
||||
}
|
||||
lineOffset += line.Length;
|
||||
// Console.WriteLine("File: {0}:{1}: {2}", fileName, reader.BaseStream.Position, line);
|
||||
lineOffset += line.Length;
|
||||
}
|
||||
//update the last max offset
|
||||
lastMaxOffset = reader.BaseStream.Position;
|
||||
@@ -296,7 +232,7 @@ namespace TimberWinR.Inputs
|
||||
LogManager.GetCurrentClassLogger().Warn(fnfex.Message);
|
||||
_fnfmap[fn] = fn;
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
@@ -306,8 +242,18 @@ namespace TimberWinR.Inputs
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
try
|
||||
{
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
}
|
||||
catch (Exception ex1)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Warn(ex1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,8 +13,8 @@ namespace TimberWinR.Inputs
|
||||
public class UdpInputListener : InputListener
|
||||
{
|
||||
private readonly System.Net.Sockets.UdpClient _udpListener;
|
||||
private IPEndPoint groupV4;
|
||||
private IPEndPoint groupV6;
|
||||
private readonly IPEndPoint groupV4;
|
||||
private readonly IPEndPoint groupV6;
|
||||
|
||||
private Thread _listenThreadV4;
|
||||
private Thread _listenThreadV6;
|
||||
@@ -47,6 +47,9 @@ namespace TimberWinR.Inputs
|
||||
{
|
||||
_port = port;
|
||||
|
||||
groupV4 = new IPEndPoint(IPAddress.Any, 0);
|
||||
groupV6 = new IPEndPoint(IPAddress.IPv6Any, 0);
|
||||
|
||||
LogManager.GetCurrentClassLogger().Info("Udp Input on Port {0} Ready", _port);
|
||||
|
||||
_receivedMessages = 0;
|
||||
|
||||
@@ -23,11 +23,11 @@ namespace TimberWinR.Inputs
|
||||
public class W3CInputListener : InputListener
|
||||
{
|
||||
private readonly int _pollingIntervalInSeconds;
|
||||
private readonly TimberWinR.Parser.W3CLog _arguments;
|
||||
private readonly TimberWinR.Parser.W3CLogParameters _arguments;
|
||||
private long _receivedMessages;
|
||||
public bool Stop { get; set; }
|
||||
|
||||
public W3CInputListener(TimberWinR.Parser.W3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
|
||||
public W3CInputListener(TimberWinR.Parser.W3CLogParameters arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
|
||||
: base(cancelToken, "Win32-W3CLog")
|
||||
{
|
||||
_arguments = arguments;
|
||||
@@ -153,7 +153,7 @@ namespace TimberWinR.Inputs
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -165,7 +165,7 @@ namespace TimberWinR.Inputs
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -179,7 +179,7 @@ namespace TimberWinR
|
||||
}
|
||||
}
|
||||
|
||||
foreach (Parser.IISW3CLog iisw3cConfig in config.IISW3C)
|
||||
foreach (Parser.IISW3CLogParameters iisw3cConfig in config.IISW3C)
|
||||
{
|
||||
var elistner = new IISW3CInputListener(iisw3cConfig, cancelToken);
|
||||
Listeners.Add(elistner);
|
||||
@@ -187,7 +187,7 @@ namespace TimberWinR
|
||||
output.Connect(elistner);
|
||||
}
|
||||
|
||||
foreach (Parser.W3CLog iisw3cConfig in config.W3C)
|
||||
foreach (Parser.W3CLogParameters iisw3cConfig in config.W3C)
|
||||
{
|
||||
var elistner = new W3CInputListener(iisw3cConfig, cancelToken);
|
||||
Listeners.Add(elistner);
|
||||
|
||||
@@ -2,52 +2,91 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Elasticsearch.Net;
|
||||
using Elasticsearch.Net.ConnectionPool;
|
||||
using Nest;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using NLog;
|
||||
using RapidRegex.Core;
|
||||
using RestSharp;
|
||||
using System.Text.RegularExpressions;
|
||||
using Elasticsearch.Net.Serialization;
|
||||
using Newtonsoft.Json;
|
||||
|
||||
namespace TimberWinR.Outputs
|
||||
{
|
||||
using System.Text.RegularExpressions;
|
||||
public class Person
|
||||
{
|
||||
public string Firstname { get; set; }
|
||||
public string Lastname { get; set; }
|
||||
}
|
||||
|
||||
public partial class ElasticsearchOutput : OutputSender
|
||||
{
|
||||
private TimberWinR.Manager _manager;
|
||||
private readonly int _port;
|
||||
private readonly int _interval;
|
||||
private readonly string[] _host;
|
||||
private readonly string _protocol;
|
||||
private int _hostIndex;
|
||||
private readonly int _flushSize;
|
||||
private readonly int _idleFlushTimeSeconds;
|
||||
private readonly string[] _hosts;
|
||||
private readonly string _protocol;
|
||||
private readonly int _timeout;
|
||||
private readonly object _locker = new object();
|
||||
private readonly List<JObject> _jsonQueue;
|
||||
private readonly int _numThreads;
|
||||
private long _sentMessages;
|
||||
private long _errorCount;
|
||||
private Parser.ElasticsearchOutput eo;
|
||||
private readonly int _maxQueueSize;
|
||||
private readonly bool _queueOverflowDiscardOldest;
|
||||
private Parser.ElasticsearchOutputParameters _parameters;
|
||||
public bool Stop { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Get the bulk connection pool of hosts
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
private ElasticClient getClient()
|
||||
{
|
||||
var nodes = new List<Uri>();
|
||||
foreach (var host in _hosts)
|
||||
{
|
||||
var url = string.Format("http://{0}:{1}", host, _port);
|
||||
nodes.Add(new Uri(url));
|
||||
}
|
||||
var pool = new StaticConnectionPool(nodes.ToArray());
|
||||
var settings = new ConnectionSettings(pool)
|
||||
.ExposeRawResponse();
|
||||
|
||||
public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutput eo, CancellationToken cancelToken)
|
||||
var client = new ElasticClient(settings);
|
||||
return client;
|
||||
}
|
||||
|
||||
public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutputParameters parameters, CancellationToken cancelToken)
|
||||
: base(cancelToken, "Elasticsearch")
|
||||
{
|
||||
_sentMessages = 0;
|
||||
_errorCount = 0;
|
||||
|
||||
this.eo = eo;
|
||||
_protocol = eo.Protocol;
|
||||
_timeout = eo.Timeout;
|
||||
_parameters = parameters;
|
||||
_flushSize = parameters.FlushSize;
|
||||
_idleFlushTimeSeconds = parameters.IdleFlushTimeInSeconds;
|
||||
_protocol = parameters.Protocol;
|
||||
_timeout = parameters.Timeout;
|
||||
_manager = manager;
|
||||
_port = eo.Port;
|
||||
_interval = eo.Interval;
|
||||
_host = eo.Host;
|
||||
_hostIndex = 0;
|
||||
_port = parameters.Port;
|
||||
_interval = parameters.Interval;
|
||||
_hosts = parameters.Host;
|
||||
_jsonQueue = new List<JObject>();
|
||||
_numThreads = eo.NumThreads;
|
||||
_numThreads = parameters.NumThreads;
|
||||
_maxQueueSize = parameters.MaxQueueSize;
|
||||
_queueOverflowDiscardOldest = parameters.QueueOverflowDiscardOldest;
|
||||
|
||||
|
||||
for (int i = 0; i < eo.NumThreads; i++)
|
||||
for (int i = 0; i < parameters.NumThreads; i++)
|
||||
{
|
||||
Task.Factory.StartNew(ElasticsearchSender, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
|
||||
}
|
||||
@@ -58,16 +97,20 @@ namespace TimberWinR.Outputs
|
||||
JObject json = new JObject(
|
||||
new JProperty("elasticsearch",
|
||||
new JObject(
|
||||
new JProperty("host", string.Join(",", _host)),
|
||||
new JProperty("host", string.Join(",", _hosts)),
|
||||
new JProperty("errors", _errorCount),
|
||||
new JProperty("sent_messages", _sentMessages),
|
||||
new JProperty("queued_messages", _jsonQueue.Count),
|
||||
new JProperty("sentMmessageCount", _sentMessages),
|
||||
new JProperty("queuedMessageCount", _jsonQueue.Count),
|
||||
new JProperty("port", _port),
|
||||
new JProperty("flushSize", _flushSize),
|
||||
new JProperty("idleFlushTime", _idleFlushTimeSeconds),
|
||||
new JProperty("interval", _interval),
|
||||
new JProperty("threads", _numThreads),
|
||||
new JProperty("maxQueueSize", _maxQueueSize),
|
||||
new JProperty("overflowDiscardOldest", _queueOverflowDiscardOldest),
|
||||
new JProperty("hosts",
|
||||
new JArray(
|
||||
from h in _host
|
||||
from h in _hosts
|
||||
select new JObject(
|
||||
new JProperty("host", h)))))));
|
||||
return json;
|
||||
@@ -77,137 +120,145 @@ namespace TimberWinR.Outputs
|
||||
//
|
||||
private void ElasticsearchSender()
|
||||
{
|
||||
// Force an inital flush
|
||||
DateTime lastFlushTime = DateTime.MinValue;
|
||||
|
||||
using (var syncHandle = new ManualResetEventSlim())
|
||||
{
|
||||
{
|
||||
// Execute the query
|
||||
while (!Stop)
|
||||
{
|
||||
{
|
||||
if (!CancelToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
JObject[] messages;
|
||||
{
|
||||
int messageCount = 0;
|
||||
List<JObject> messages = new List<JObject>();
|
||||
|
||||
// Lets get whats in the queue
|
||||
lock (_locker)
|
||||
{
|
||||
var count = _jsonQueue.Count;
|
||||
messages = _jsonQueue.Take(count).ToArray();
|
||||
_jsonQueue.RemoveRange(0, count);
|
||||
if (messages.Length > 0)
|
||||
_manager.IncrementMessageCount(messages.Length);
|
||||
messageCount = _jsonQueue.Count;
|
||||
|
||||
// Time to flush?
|
||||
if (messageCount >= _flushSize || (DateTime.UtcNow - lastFlushTime).Seconds >= _idleFlushTimeSeconds)
|
||||
{
|
||||
messages = _jsonQueue.Take(messageCount).ToList();
|
||||
_jsonQueue.RemoveRange(0, messageCount);
|
||||
if (messages.Count > 0)
|
||||
_manager.IncrementMessageCount(messages.Count);
|
||||
}
|
||||
}
|
||||
|
||||
if (messages.Length > 0)
|
||||
// We have some messages to work with
|
||||
if (messages.Count > 0)
|
||||
{
|
||||
int numHosts = _host.Length;
|
||||
while (numHosts-- > 0)
|
||||
var client = getClient();
|
||||
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Debug("Sending {0} Messages to {1}", messages.Count, string.Join(",", _hosts));
|
||||
// This loop will process all messages we've taken from the queue
|
||||
// that have the same index and type (an elasticsearch requirement)
|
||||
do
|
||||
{
|
||||
try
|
||||
{
|
||||
// Get the next client
|
||||
RestClient client = getClient();
|
||||
if (client != null)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Debug("Sending {0} Messages to {1}", messages.Length, client.BaseUrl);
|
||||
// Grab all messages with same index and type (this is the whole point, group the same ones)
|
||||
var bulkTypeName = this._parameters.GetTypeName(messages[0]);
|
||||
var bulkIndexName = this._parameters.GetIndexName(messages[0]);
|
||||
|
||||
foreach (JObject json in messages)
|
||||
{
|
||||
var typeName = this.eo.GetTypeName(json);
|
||||
var indexName = this.eo.GetIndexName(json);
|
||||
var req =
|
||||
new RestRequest(string.Format("/{0}/{1}/", indexName, typeName),
|
||||
Method.POST);
|
||||
|
||||
req.AddParameter("text/json", json.ToString(), ParameterType.RequestBody);
|
||||
|
||||
req.RequestFormat = DataFormat.Json;
|
||||
|
||||
try
|
||||
{
|
||||
client.ExecuteAsync(req, response =>
|
||||
{
|
||||
if (response.StatusCode != HttpStatusCode.Created)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Error("Failed to send: {0}", response.ErrorMessage);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
}
|
||||
else
|
||||
{
|
||||
_sentMessages++;
|
||||
GC.Collect();
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception error)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(error);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
}
|
||||
}
|
||||
GC.Collect();
|
||||
}
|
||||
else
|
||||
{
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Fatal("Unable to connect with any Elasticsearch hosts, {0}",
|
||||
String.Join(",", _host));
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
}
|
||||
IEnumerable<JObject> bulkItems =
|
||||
messages.TakeWhile(
|
||||
message =>
|
||||
String.Compare(bulkTypeName, _parameters.GetTypeName(message), false) == 0 &&
|
||||
String.Compare(bulkIndexName, _parameters.GetIndexName(message), false) == 0);
|
||||
|
||||
// Send the message(s), if the are successfully sent, they
|
||||
// are removed from the queue
|
||||
lastFlushTime = transmitBulkData(bulkItems, bulkIndexName, bulkTypeName, client, lastFlushTime, messages);
|
||||
|
||||
GC.Collect();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} while (messages.Count > 0);
|
||||
}
|
||||
GC.Collect();
|
||||
if (!Stop)
|
||||
{
|
||||
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
|
||||
{
|
||||
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception)
|
||||
catch (Exception ex)
|
||||
{
|
||||
throw;
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
private RestClient getClient()
|
||||
|
||||
//
|
||||
// Send the messages to Elasticsearch (bulk)
|
||||
//
|
||||
private DateTime transmitBulkData(IEnumerable<JObject> bulkItems, string bulkIndexName, string bulkTypeName,
|
||||
ElasticClient client, DateTime lastFlushTime, List<JObject> messages)
|
||||
{
|
||||
if (_hostIndex >= _host.Length)
|
||||
_hostIndex = 0;
|
||||
|
||||
int numTries = 0;
|
||||
while (numTries < _host.Length)
|
||||
var bulkRequest = new BulkRequest() {Refresh = true};
|
||||
bulkRequest.Operations = new List<IBulkOperation>();
|
||||
foreach (var json in bulkItems)
|
||||
{
|
||||
try
|
||||
{
|
||||
string url = string.Format("{0}://{1}:{2}", _protocol.Replace(":", ""), _host[_hostIndex], _port);
|
||||
var client = new RestClient(url);
|
||||
client.Timeout = _timeout;
|
||||
|
||||
_hostIndex++;
|
||||
if (_hostIndex >= _host.Length)
|
||||
_hostIndex = 0;
|
||||
|
||||
return client;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
}
|
||||
numTries++;
|
||||
// ES requires a timestamp, add one if not present
|
||||
var ts = json["@timestamp"];
|
||||
if (ts == null)
|
||||
json["@timestamp"] = DateTime.UtcNow;
|
||||
var bi = new BulkIndexOperation<JObject>(json);
|
||||
bi.Index = bulkIndexName;
|
||||
bi.Type = bulkTypeName;
|
||||
bulkRequest.Operations.Add(bi);
|
||||
}
|
||||
|
||||
return null;
|
||||
// The total messages processed for this operation.
|
||||
int numMessages = bulkItems.Count();
|
||||
|
||||
var response = client.Bulk(bulkRequest);
|
||||
if (!response.IsValid)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error("Failed to send: {0}", response);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
interlockedInsert(messages); // Put the messages back into the queue
|
||||
}
|
||||
else // Success!
|
||||
{
|
||||
lastFlushTime = DateTime.UtcNow;
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Info("Successfully sent {0} messages in a single bulk request", numMessages);
|
||||
Interlocked.Add(ref _sentMessages, numMessages);
|
||||
}
|
||||
|
||||
// Remove them from the working list
|
||||
messages.RemoveRange(0, numMessages);
|
||||
return lastFlushTime;
|
||||
}
|
||||
|
||||
// Places messages back into the queue (for a future attempt)
|
||||
private void interlockedInsert(List<JObject> messages)
|
||||
{
|
||||
lock (_locker)
|
||||
{
|
||||
_jsonQueue.InsertRange(0, messages);
|
||||
if (_jsonQueue.Count > _maxQueueSize)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Warn("Exceeded maximum queue depth");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -221,6 +272,26 @@ namespace TimberWinR.Outputs
|
||||
|
||||
lock (_locker)
|
||||
{
|
||||
if (_jsonQueue.Count >= _maxQueueSize)
|
||||
{
|
||||
// If we've exceeded our queue size, and we're supposed to throw out the oldest objects first,
|
||||
// then remove as many as necessary to get us under our limit
|
||||
if (_queueOverflowDiscardOldest)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Warn("Overflow discarding oldest {0} messages", _jsonQueue.Count - _maxQueueSize + 1);
|
||||
|
||||
_jsonQueue.RemoveRange(0, (_jsonQueue.Count - _maxQueueSize) + 1);
|
||||
}
|
||||
// Otherwise we're in a "discard newest" mode, and this is the newest message, so just ignore it
|
||||
else
|
||||
{
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Warn("Overflow discarding newest message: {0}", message);
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
_jsonQueue.Add(jsonMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics.Eventing.Reader;
|
||||
using System.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using System.Net.Sockets;
|
||||
@@ -17,32 +18,126 @@ using TimberWinR.Parser;
|
||||
|
||||
namespace TimberWinR.Outputs
|
||||
{
|
||||
internal class BatchCounter
|
||||
{
|
||||
// Total number of times reached max batch count (indicates we are under pressure)
|
||||
public int ReachedMaxBatchCountTimes { get; set; }
|
||||
|
||||
private readonly int[] _sampleQueueDepths;
|
||||
private int _sampleCountIndex;
|
||||
private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default)
|
||||
private object _locker = new object();
|
||||
private bool _warnedReachedMax;
|
||||
|
||||
private readonly int _maxBatchCount;
|
||||
private readonly int _batchCount;
|
||||
private int _totalSamples;
|
||||
|
||||
public int[] Samples()
|
||||
{
|
||||
return _sampleQueueDepths;
|
||||
}
|
||||
|
||||
public BatchCounter(int batchCount, int maxBatchCount)
|
||||
{
|
||||
_batchCount = batchCount;
|
||||
_maxBatchCount = maxBatchCount;
|
||||
_sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE];
|
||||
_sampleCountIndex = 0;
|
||||
_totalSamples = 0;
|
||||
ReachedMaxBatchCountTimes = 0;
|
||||
}
|
||||
public void SampleQueueDepth(int queueDepth)
|
||||
{
|
||||
lock (_locker)
|
||||
{
|
||||
if (_totalSamples < QUEUE_SAMPLE_SIZE)
|
||||
_totalSamples++;
|
||||
|
||||
// Take a sample of the queue depth
|
||||
if (_sampleCountIndex >= QUEUE_SAMPLE_SIZE)
|
||||
_sampleCountIndex = 0;
|
||||
|
||||
_sampleQueueDepths[_sampleCountIndex++] = queueDepth;
|
||||
}
|
||||
}
|
||||
|
||||
public int AverageQueueDepth()
|
||||
{
|
||||
lock (_locker)
|
||||
{
|
||||
if (_totalSamples > 0)
|
||||
{
|
||||
var samples = _sampleQueueDepths.Take(_totalSamples);
|
||||
int avg = (int) samples.Average();
|
||||
return avg;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Sample the queue and adjust the batch count if needed (ramp up slowly)
|
||||
public int UpdateCurrentBatchCount(int queueSize, int currentBatchCount)
|
||||
{
|
||||
if (currentBatchCount < _maxBatchCount && currentBatchCount < queueSize && AverageQueueDepth() > currentBatchCount)
|
||||
{
|
||||
currentBatchCount += Math.Max(_maxBatchCount/_batchCount, 1);
|
||||
if (currentBatchCount >= _maxBatchCount && !_warnedReachedMax)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", currentBatchCount);
|
||||
_warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset)
|
||||
ReachedMaxBatchCountTimes++;
|
||||
currentBatchCount = _maxBatchCount;
|
||||
}
|
||||
}
|
||||
else // Reset to default
|
||||
{
|
||||
currentBatchCount = _batchCount;
|
||||
_warnedReachedMax = false;
|
||||
}
|
||||
|
||||
return currentBatchCount;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public class RedisOutput : OutputSender
|
||||
{
|
||||
public int QueueDepth
|
||||
{
|
||||
get { return _jsonQueue.Count; }
|
||||
}
|
||||
|
||||
public long SentMessages
|
||||
{
|
||||
get { return _sentMessages; }
|
||||
}
|
||||
|
||||
private readonly string _logstashIndexName;
|
||||
private readonly int _port;
|
||||
private readonly int _timeout;
|
||||
private readonly object _locker = new object();
|
||||
private readonly List<string> _jsonQueue;
|
||||
// readonly Task _consumerTask;
|
||||
private readonly List<string> _jsonQueue;
|
||||
private readonly string[] _redisHosts;
|
||||
private int _redisHostIndex;
|
||||
private TimberWinR.Manager _manager;
|
||||
private readonly int _batchCount;
|
||||
private int _currentBatchCount;
|
||||
private readonly int _maxBatchCount;
|
||||
private readonly int _interval;
|
||||
private readonly int _numThreads;
|
||||
|
||||
private readonly int _numThreads;
|
||||
private long _sentMessages;
|
||||
private long _errorCount;
|
||||
private long _redisDepth;
|
||||
|
||||
private int _maxQueueSize;
|
||||
private bool _queueOverflowDiscardOldest;
|
||||
private DateTime? _lastErrorTimeUTC;
|
||||
private readonly int _maxQueueSize;
|
||||
private readonly bool _queueOverflowDiscardOldest;
|
||||
private BatchCounter _batchCounter;
|
||||
|
||||
public bool Stop { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Get the next client
|
||||
/// Get the next client from the list of hosts.
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
private RedisClient getClient()
|
||||
@@ -56,16 +151,17 @@ namespace TimberWinR.Outputs
|
||||
try
|
||||
{
|
||||
RedisClient client = new RedisClient(_redisHosts[_redisHostIndex], _port, _timeout);
|
||||
|
||||
_redisHostIndex++;
|
||||
if (_redisHostIndex >= _redisHosts.Length)
|
||||
_redisHostIndex = 0;
|
||||
|
||||
return client;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
}
|
||||
finally
|
||||
{
|
||||
_redisHostIndex++;
|
||||
if (_redisHostIndex >= _redisHosts.Length)
|
||||
_redisHostIndex = 0;
|
||||
}
|
||||
numTries++;
|
||||
}
|
||||
|
||||
@@ -79,15 +175,21 @@ namespace TimberWinR.Outputs
|
||||
new JObject(
|
||||
new JProperty("host", string.Join(",", _redisHosts)),
|
||||
new JProperty("errors", _errorCount),
|
||||
new JProperty("redis_depth", _redisDepth),
|
||||
new JProperty("sent_messages", _sentMessages),
|
||||
new JProperty("queued_messages", _jsonQueue.Count),
|
||||
new JProperty("lastErrorTimeUTC", _lastErrorTimeUTC),
|
||||
new JProperty("redisQueueDepth", _redisDepth),
|
||||
new JProperty("sentMessageCount", _sentMessages),
|
||||
new JProperty("queuedMessageCount", _jsonQueue.Count),
|
||||
new JProperty("port", _port),
|
||||
new JProperty("maxQueueSize", _maxQueueSize),
|
||||
new JProperty("overflowDiscardOldest", _queueOverflowDiscardOldest),
|
||||
new JProperty("interval", _interval),
|
||||
new JProperty("threads", _numThreads),
|
||||
new JProperty("batchcount", _batchCount),
|
||||
new JProperty("currentBatchCount", _currentBatchCount),
|
||||
new JProperty("reachedMaxBatchCountTimes", _batchCounter.ReachedMaxBatchCountTimes),
|
||||
new JProperty("maxBatchCount", _maxBatchCount),
|
||||
new JProperty("averageQueueDepth", _batchCounter.AverageQueueDepth()),
|
||||
new JProperty("queueSamples", new JArray(_batchCounter.Samples())),
|
||||
new JProperty("index", _logstashIndexName),
|
||||
new JProperty("hosts",
|
||||
new JArray(
|
||||
@@ -97,25 +199,33 @@ namespace TimberWinR.Outputs
|
||||
return json;
|
||||
}
|
||||
|
||||
public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken)
|
||||
public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutputParameters parameters, CancellationToken cancelToken)
|
||||
: base(cancelToken, "Redis")
|
||||
{
|
||||
{
|
||||
_redisDepth = 0;
|
||||
_batchCount = ro.BatchCount;
|
||||
_batchCount = parameters.BatchCount;
|
||||
_maxBatchCount = parameters.MaxBatchCount;
|
||||
// Make sure maxBatchCount is larger than batchCount
|
||||
if (_maxBatchCount < _batchCount)
|
||||
_maxBatchCount = _batchCount*10;
|
||||
|
||||
_manager = manager;
|
||||
_redisHostIndex = 0;
|
||||
_redisHosts = ro.Host;
|
||||
_redisHosts = parameters.Host;
|
||||
_jsonQueue = new List<string>();
|
||||
_port = ro.Port;
|
||||
_timeout = ro.Timeout;
|
||||
_logstashIndexName = ro.Index;
|
||||
_interval = ro.Interval;
|
||||
_numThreads = ro.NumThreads;
|
||||
_port = parameters.Port;
|
||||
_timeout = parameters.Timeout;
|
||||
_logstashIndexName = parameters.Index;
|
||||
_interval = parameters.Interval;
|
||||
_numThreads = parameters.NumThreads;
|
||||
_errorCount = 0;
|
||||
_maxQueueSize = ro.MaxQueueSize;
|
||||
_queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest;
|
||||
|
||||
for (int i = 0; i < ro.NumThreads; i++)
|
||||
_lastErrorTimeUTC = null;
|
||||
_maxQueueSize = parameters.MaxQueueSize;
|
||||
_queueOverflowDiscardOldest = parameters.QueueOverflowDiscardOldest;
|
||||
_batchCounter = new BatchCounter(_batchCount, _maxBatchCount);
|
||||
_currentBatchCount = _batchCount;
|
||||
|
||||
for (int i = 0; i < parameters.NumThreads; i++)
|
||||
{
|
||||
var redisThread = new Task(RedisSender, cancelToken);
|
||||
redisThread.Start();
|
||||
@@ -181,8 +291,7 @@ namespace TimberWinR.Outputs
|
||||
}
|
||||
}
|
||||
return drop;
|
||||
}
|
||||
|
||||
}
|
||||
//
|
||||
// Pull off messages from the Queue, batch them up and send them all across
|
||||
//
|
||||
@@ -198,17 +307,23 @@ namespace TimberWinR.Outputs
|
||||
try
|
||||
{
|
||||
string[] messages;
|
||||
// Exclusively
|
||||
lock (_locker)
|
||||
{
|
||||
messages = _jsonQueue.Take(_batchCount).ToArray();
|
||||
_batchCounter.SampleQueueDepth(_jsonQueue.Count);
|
||||
// Re-compute current batch size
|
||||
_currentBatchCount = _batchCounter.UpdateCurrentBatchCount(_jsonQueue.Count, _currentBatchCount);
|
||||
|
||||
messages = _jsonQueue.Take(_currentBatchCount).ToArray();
|
||||
_jsonQueue.RemoveRange(0, messages.Length);
|
||||
if (messages.Length > 0)
|
||||
_manager.IncrementMessageCount(messages.Length);
|
||||
|
||||
|
||||
}
|
||||
|
||||
if (messages.Length > 0)
|
||||
{
|
||||
int numHosts = _redisHosts.Length;
|
||||
bool sentSuccessfully = false;
|
||||
while (numHosts-- > 0)
|
||||
{
|
||||
try
|
||||
@@ -225,17 +340,24 @@ namespace TimberWinR.Outputs
|
||||
try
|
||||
{
|
||||
_redisDepth = client.RPush(_logstashIndexName, messages);
|
||||
_sentMessages += messages.Length;
|
||||
_sentMessages += messages.Length;
|
||||
client.EndPipe();
|
||||
sentSuccessfully = true;
|
||||
if (messages.Length > 0)
|
||||
_manager.IncrementMessageCount(messages.Length);
|
||||
}
|
||||
catch (SocketException ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Warn(ex);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
_lastErrorTimeUTC = DateTime.UtcNow;
|
||||
}
|
||||
finally
|
||||
catch (Exception ex)
|
||||
{
|
||||
client.EndPipe();
|
||||
}
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
_lastErrorTimeUTC = DateTime.UtcNow;
|
||||
}
|
||||
break;
|
||||
}
|
||||
else
|
||||
@@ -244,6 +366,7 @@ namespace TimberWinR.Outputs
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Fatal("Unable to connect with any Redis hosts, {0}",
|
||||
String.Join(",", _redisHosts));
|
||||
_lastErrorTimeUTC = DateTime.UtcNow;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -251,25 +374,40 @@ namespace TimberWinR.Outputs
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
_lastErrorTimeUTC = DateTime.UtcNow;
|
||||
}
|
||||
} // No more hosts to try.
|
||||
|
||||
|
||||
if (!sentSuccessfully)
|
||||
{
|
||||
lock (_locker)
|
||||
{
|
||||
_jsonQueue.InsertRange(0, messages);
|
||||
}
|
||||
}
|
||||
}
|
||||
GC.Collect();
|
||||
// GC.Collect();
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch(ThreadAbortException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
throw;
|
||||
_lastErrorTimeUTC = DateTime.UtcNow;
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ namespace TimberWinR.Outputs
|
||||
private long _sentMessages;
|
||||
public bool Stop { get; set; }
|
||||
|
||||
public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutput eo, CancellationToken cancelToken)
|
||||
public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutputParameters eo, CancellationToken cancelToken)
|
||||
: base(cancelToken, "Stdout")
|
||||
{
|
||||
_sentMessages = 0;
|
||||
@@ -34,7 +34,7 @@ namespace TimberWinR.Outputs
|
||||
JObject json = new JObject(
|
||||
new JProperty("stdout",
|
||||
new JObject(
|
||||
new JProperty("sent_messages", _sentMessages))));
|
||||
new JProperty("sentMessageCount", _sentMessages))));
|
||||
|
||||
return json;
|
||||
}
|
||||
@@ -78,7 +78,7 @@ namespace TimberWinR.Outputs
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -255,7 +255,7 @@ namespace TimberWinR.Parser
|
||||
public class Stdin : IValidateSchema
|
||||
{
|
||||
[JsonProperty(PropertyName = "codec")]
|
||||
public Codec Codec { get; set; }
|
||||
public CodecArguments CodecArguments { get; set; }
|
||||
|
||||
public void Validate()
|
||||
{
|
||||
@@ -263,7 +263,7 @@ namespace TimberWinR.Parser
|
||||
}
|
||||
}
|
||||
|
||||
public class Codec
|
||||
public class CodecArguments
|
||||
{
|
||||
public enum CodecType
|
||||
{
|
||||
@@ -290,7 +290,7 @@ namespace TimberWinR.Parser
|
||||
|
||||
public Regex Re { get; set; }
|
||||
|
||||
public Codec()
|
||||
public CodecArguments()
|
||||
{
|
||||
Negate = false;
|
||||
MultilineTag = "multiline";
|
||||
@@ -310,7 +310,7 @@ namespace TimberWinR.Parser
|
||||
[JsonProperty(PropertyName = "logSource")]
|
||||
public string LogSource { get; set; }
|
||||
[JsonProperty(PropertyName = "codec")]
|
||||
public Codec Codec { get; set; }
|
||||
public CodecArguments CodecArguments { get; set; }
|
||||
|
||||
public TailFile()
|
||||
{
|
||||
@@ -327,7 +327,7 @@ namespace TimberWinR.Parser
|
||||
}
|
||||
}
|
||||
|
||||
public class Log : IValidateSchema
|
||||
public class LogParameters : IValidateSchema
|
||||
{
|
||||
[JsonProperty(PropertyName = "location")]
|
||||
public string Location { get; set; }
|
||||
@@ -344,9 +344,9 @@ namespace TimberWinR.Parser
|
||||
[JsonProperty(PropertyName = "logSource")]
|
||||
public string LogSource { get; set; }
|
||||
[JsonProperty(PropertyName = "codec")]
|
||||
public Codec Codec { get; set; }
|
||||
public CodecArguments CodecArguments { get; set; }
|
||||
|
||||
public Log()
|
||||
public LogParameters()
|
||||
{
|
||||
Fields = new List<Field>();
|
||||
Fields.Add(new Field("LogFilename", "string"));
|
||||
@@ -361,12 +361,12 @@ namespace TimberWinR.Parser
|
||||
}
|
||||
}
|
||||
|
||||
public class Tcp : IValidateSchema
|
||||
public class TcpParameters : IValidateSchema
|
||||
{
|
||||
[JsonProperty(PropertyName = "port")]
|
||||
public int Port { get; set; }
|
||||
|
||||
public Tcp()
|
||||
public TcpParameters()
|
||||
{
|
||||
Port = 5140;
|
||||
}
|
||||
@@ -378,12 +378,12 @@ namespace TimberWinR.Parser
|
||||
}
|
||||
|
||||
|
||||
public class Udp : IValidateSchema
|
||||
public class UdpParameters : IValidateSchema
|
||||
{
|
||||
[JsonProperty(PropertyName = "port")]
|
||||
public int Port { get; set; }
|
||||
|
||||
public Udp()
|
||||
public UdpParameters()
|
||||
{
|
||||
Port = 5142;
|
||||
}
|
||||
@@ -393,7 +393,7 @@ namespace TimberWinR.Parser
|
||||
|
||||
}
|
||||
}
|
||||
public class W3CLog : IValidateSchema
|
||||
public class W3CLogParameters : IValidateSchema
|
||||
{
|
||||
[JsonProperty(PropertyName = "location")]
|
||||
public string Location { get; set; }
|
||||
@@ -410,7 +410,7 @@ namespace TimberWinR.Parser
|
||||
[JsonProperty(PropertyName = "fields")]
|
||||
public List<Field> Fields { get; set; }
|
||||
|
||||
public W3CLog()
|
||||
public W3CLogParameters()
|
||||
{
|
||||
CodePage = 0;
|
||||
DtLines = 10;
|
||||
@@ -428,7 +428,7 @@ namespace TimberWinR.Parser
|
||||
}
|
||||
|
||||
|
||||
public class IISW3CLog : IValidateSchema
|
||||
public class IISW3CLogParameters : IValidateSchema
|
||||
{
|
||||
[JsonProperty(PropertyName = "location")]
|
||||
public string Location { get; set; }
|
||||
@@ -448,7 +448,7 @@ namespace TimberWinR.Parser
|
||||
[JsonProperty(PropertyName = "fields")]
|
||||
public List<Field> Fields { get; set; }
|
||||
|
||||
public IISW3CLog()
|
||||
public IISW3CLogParameters()
|
||||
{
|
||||
CodePage = -2;
|
||||
Recurse = 0;
|
||||
@@ -494,7 +494,7 @@ namespace TimberWinR.Parser
|
||||
}
|
||||
}
|
||||
|
||||
public class ElasticsearchOutput
|
||||
public class ElasticsearchOutputParameters
|
||||
{
|
||||
const string IndexDatePattern = "(%\\{(?<format>[^\\}]+)\\})";
|
||||
|
||||
@@ -512,9 +512,19 @@ namespace TimberWinR.Parser
|
||||
public string Protocol { get; set; }
|
||||
[JsonProperty(PropertyName = "interval")]
|
||||
public int Interval { get; set; }
|
||||
[JsonProperty(PropertyName = "flush_size")]
|
||||
public int FlushSize { get; set; }
|
||||
[JsonProperty(PropertyName = "idle_flush_time")]
|
||||
public int IdleFlushTimeInSeconds { get; set; }
|
||||
[JsonProperty(PropertyName = "max_queue_size")]
|
||||
public int MaxQueueSize { get; set; }
|
||||
[JsonProperty(PropertyName = "queue_overflow_discard_oldest")]
|
||||
public bool QueueOverflowDiscardOldest { get; set; }
|
||||
|
||||
public ElasticsearchOutput()
|
||||
public ElasticsearchOutputParameters()
|
||||
{
|
||||
FlushSize = 5000;
|
||||
IdleFlushTimeInSeconds = 10;
|
||||
Protocol = "http";
|
||||
Port = 9200;
|
||||
Index = "";
|
||||
@@ -522,6 +532,8 @@ namespace TimberWinR.Parser
|
||||
Timeout = 10000;
|
||||
NumThreads = 1;
|
||||
Interval = 1000;
|
||||
QueueOverflowDiscardOldest = true;
|
||||
MaxQueueSize = 50000;
|
||||
}
|
||||
|
||||
public string GetIndexName(JObject json)
|
||||
@@ -564,7 +576,7 @@ namespace TimberWinR.Parser
|
||||
|
||||
}
|
||||
|
||||
public class RedisOutput
|
||||
public class RedisOutputParameters
|
||||
{
|
||||
[JsonProperty(PropertyName = "host")]
|
||||
public string[] Host { get; set; }
|
||||
@@ -576,6 +588,8 @@ namespace TimberWinR.Parser
|
||||
public int Timeout { get; set; }
|
||||
[JsonProperty(PropertyName = "batch_count")]
|
||||
public int BatchCount { get; set; }
|
||||
[JsonProperty(PropertyName = "max_batch_count")]
|
||||
public int MaxBatchCount { get; set; }
|
||||
[JsonProperty(PropertyName = "threads")]
|
||||
public int NumThreads { get; set; }
|
||||
[JsonProperty(PropertyName = "interval")]
|
||||
@@ -585,13 +599,14 @@ namespace TimberWinR.Parser
|
||||
[JsonProperty(PropertyName = "queue_overflow_discard_oldest")]
|
||||
public bool QueueOverflowDiscardOldest { get; set; }
|
||||
|
||||
public RedisOutput()
|
||||
public RedisOutputParameters()
|
||||
{
|
||||
Port = 6379;
|
||||
Index = "logstash";
|
||||
Host = new string[] { "localhost" };
|
||||
Timeout = 10000;
|
||||
BatchCount = 10;
|
||||
MaxBatchCount = BatchCount*10;
|
||||
NumThreads = 1;
|
||||
Interval = 5000;
|
||||
QueueOverflowDiscardOldest = true;
|
||||
@@ -599,12 +614,12 @@ namespace TimberWinR.Parser
|
||||
}
|
||||
}
|
||||
|
||||
public class StdoutOutput
|
||||
public class StdoutOutputParameters
|
||||
{
|
||||
[JsonProperty(PropertyName = "interval")]
|
||||
public int Interval { get; set; }
|
||||
|
||||
public StdoutOutput()
|
||||
public StdoutOutputParameters()
|
||||
{
|
||||
Interval = 1000;
|
||||
}
|
||||
@@ -613,13 +628,13 @@ namespace TimberWinR.Parser
|
||||
public class OutputTargets
|
||||
{
|
||||
[JsonProperty("Redis")]
|
||||
public RedisOutput[] Redis { get; set; }
|
||||
public RedisOutputParameters[] Redis { get; set; }
|
||||
|
||||
[JsonProperty("Elasticsearch")]
|
||||
public ElasticsearchOutput[] Elasticsearch { get; set; }
|
||||
public ElasticsearchOutputParameters[] Elasticsearch { get; set; }
|
||||
|
||||
[JsonProperty("Stdout")]
|
||||
public StdoutOutput[] Stdout { get; set; }
|
||||
public StdoutOutputParameters[] Stdout { get; set; }
|
||||
}
|
||||
|
||||
public class InputSources
|
||||
@@ -628,22 +643,22 @@ namespace TimberWinR.Parser
|
||||
public WindowsEvent[] WindowsEvents { get; set; }
|
||||
|
||||
[JsonProperty("Logs")]
|
||||
public Log[] Logs { get; set; }
|
||||
public LogParameters[] Logs { get; set; }
|
||||
|
||||
[JsonProperty("TailFiles")]
|
||||
public TailFile[] TailFiles { get; set; }
|
||||
|
||||
[JsonProperty("Tcp")]
|
||||
public Tcp[] Tcps { get; set; }
|
||||
public TcpParameters[] Tcps { get; set; }
|
||||
|
||||
[JsonProperty("Udp")]
|
||||
public Udp[] Udps { get; set; }
|
||||
public UdpParameters[] Udps { get; set; }
|
||||
|
||||
[JsonProperty("IISW3CLogs")]
|
||||
public IISW3CLog[] IISW3CLogs { get; set; }
|
||||
public IISW3CLogParameters[] IISW3CLogs { get; set; }
|
||||
|
||||
[JsonProperty("W3CLogs")]
|
||||
public W3CLog[] W3CLogs { get; set; }
|
||||
public W3CLogParameters[] W3CLogs { get; set; }
|
||||
|
||||
[JsonProperty("Stdin")]
|
||||
public Stdin[] Stdins { get; set; }
|
||||
|
||||
@@ -3,6 +3,10 @@
|
||||
A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service.
|
||||
|
||||
Version History
|
||||
### 1.3.19.1 - 03/03/2015
|
||||
1. Added new Redis parameter _max\_batch\_count_ which increases the _batch\_count_ dynamically over time
|
||||
to handle input flooding. Default is _batch\_count_ * 10
|
||||
|
||||
### 1.3.19.0 - 02/26/2015
|
||||
|
||||
1. Added support for Multiline codecs for Stdin and Logs listeners, closes issue [#23](https://github.com/Cimpress-MCP/TimberWinR/issues/23)
|
||||
|
||||
@@ -34,6 +34,9 @@
|
||||
<Reference Include="csredis">
|
||||
<HintPath>..\packages\csredis.1.4.7.1\lib\net40\csredis.dll</HintPath>
|
||||
</Reference>
|
||||
<Reference Include="Elasticsearch.Net">
|
||||
<HintPath>..\packages\Elasticsearch.Net.1.3.1\lib\Elasticsearch.Net.dll</HintPath>
|
||||
</Reference>
|
||||
<Reference Include="Interop.MSUtil, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
|
||||
<SpecificVersion>False</SpecificVersion>
|
||||
<EmbedInteropTypes>False</EmbedInteropTypes>
|
||||
@@ -47,6 +50,9 @@
|
||||
<HintPath>..\packages\MaxMind.GeoIP2.0.4.0.0\lib\net40\MaxMind.GeoIP2.dll</HintPath>
|
||||
<Private>True</Private>
|
||||
</Reference>
|
||||
<Reference Include="Nest">
|
||||
<HintPath>..\packages\NEST.1.3.1\lib\Nest.dll</HintPath>
|
||||
</Reference>
|
||||
<Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
|
||||
<SpecificVersion>False</SpecificVersion>
|
||||
<HintPath>..\packages\Newtonsoft.Json.6.0.4\lib\net40\Newtonsoft.Json.dll</HintPath>
|
||||
@@ -76,6 +82,7 @@
|
||||
</Reference>
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<Compile Include="Codecs\Multiline.cs" />
|
||||
<Compile Include="Configuration.cs" />
|
||||
<Compile Include="ConfigurationErrors.cs" />
|
||||
<Compile Include="Diagnostics\Diagnostics.cs" />
|
||||
@@ -85,6 +92,7 @@
|
||||
<Compile Include="Filters\GeoIPFilter.cs" />
|
||||
<Compile Include="Filters\JsonFilter.cs" />
|
||||
<Compile Include="Filters\MutateFilter.cs" />
|
||||
<Compile Include="ICodec.cs" />
|
||||
<Compile Include="Inputs\FieldDefinitions.cs" />
|
||||
<Compile Include="Inputs\IISW3CRowReader.cs" />
|
||||
<Compile Include="Inputs\LogsFileDatabase.cs" />
|
||||
|
||||
@@ -7,11 +7,15 @@ The following parameters are allowed when configuring the Redis output.
|
||||
|
||||
| Parameter | Type | Description | Details | Default |
|
||||
| :-------------|:---------|:------------------------------------------------------------| :--------------------------- | :-- |
|
||||
| *threads* | string | Location of log files(s) to monitor | Number of worker theads to send messages | 1 |
|
||||
| *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 |
|
||||
| *index* | string | The index name to use | index used/created | logstash-yyyy.dd.mm |
|
||||
| *host* | [string] | The hostname(s) of your Elasticsearch server(s) | IP or DNS name | |
|
||||
| *port* | integer | Redis port number | This port must be open | 9200 |
|
||||
| *flush_size* | integer | Maximum number of messages before flushing | | 50000 |
|
||||
| *host* | [string] | Array of hostname(s) of your Elasticsearch server(s) | IP or DNS name | |
|
||||
| *idle_flush_time* | integer | Maximum number of seconds elapsed before triggering a flush | | 10 |
|
||||
| *index* | [string] | The index name to use | index used/created | logstash-yyyy.dd.mm |
|
||||
| *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 |
|
||||
| *max_queue_size* | integer | Maximum Elasticsearch queue depth | | 50000 |
|
||||
| *port* | integer | Elasticsearch port number | This port must be open | 9200 |
|
||||
| *queue_overflow_discard_oldest* | bool | If true, discard oldest messages when max_queue_size reached otherwise discard newest | | true |
|
||||
| *threads* | [string] | Number of Threads | Number of worker threads processing messages | 1 |
|
||||
|
||||
### Index parameter
|
||||
If you want to output your data everyday to a new index, use following index format: "index-%{yyyy.MM.dd}". Here date format could be any forwat which you need.
|
||||
|
||||
@@ -9,6 +9,7 @@ The following parameters are allowed when configuring the Redis output.
|
||||
| :-------------|:---------|:------------------------------------------------------------| :--------------------------- | :-- |
|
||||
| *threads* | string | Location of log files(s) to monitor | Number of worker theads to send messages | 1 |
|
||||
| *batch_count* | integer | Sent as a single message | Number of messages to aggregate | 10 |
|
||||
| *max_batch_count* | integer | Dynamically adjusted count maximum | Increases over time | batch_count*10 |
|
||||
| *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 |
|
||||
| *index* | string | The name of the redis list | logstash index name | logstash |
|
||||
| *host* | [string] | The hostname(s) of your Redis server(s) | IP or DNS name | |
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<packages>
|
||||
<package id="csredis" version="1.4.7.1" targetFramework="net40" />
|
||||
<package id="Elasticsearch.Net" version="1.3.1" targetFramework="net40" />
|
||||
<package id="MaxMind.Db" version="0.2.3.0" targetFramework="net40" />
|
||||
<package id="MaxMind.GeoIP2" version="0.4.0.0" targetFramework="net40" />
|
||||
<package id="NEST" version="1.3.1" targetFramework="net40" />
|
||||
<package id="Newtonsoft.Json" version="6.0.4" targetFramework="net40" />
|
||||
<package id="NLog" version="3.1.0.0" targetFramework="net40" />
|
||||
<package id="RapidRegex.Core" version="1.0.0.2" targetFramework="net40" />
|
||||
|
||||
BIN
packages/Elasticsearch.Net.1.3.1/Elasticsearch.Net.1.3.1.nupkg
vendored
Normal file
BIN
packages/Elasticsearch.Net.1.3.1/Elasticsearch.Net.1.3.1.nupkg
vendored
Normal file
Binary file not shown.
41449
packages/Elasticsearch.Net.1.3.1/lib/Elasticsearch.Net.XML
vendored
Normal file
41449
packages/Elasticsearch.Net.1.3.1/lib/Elasticsearch.Net.XML
vendored
Normal file
File diff suppressed because it is too large
Load Diff
BIN
packages/Elasticsearch.Net.1.3.1/lib/Elasticsearch.Net.dll
vendored
Normal file
BIN
packages/Elasticsearch.Net.1.3.1/lib/Elasticsearch.Net.dll
vendored
Normal file
Binary file not shown.
BIN
packages/Elasticsearch.Net.1.3.1/lib/Elasticsearch.Net.pdb
vendored
Normal file
BIN
packages/Elasticsearch.Net.1.3.1/lib/Elasticsearch.Net.pdb
vendored
Normal file
Binary file not shown.
BIN
packages/NEST.1.3.1/NEST.1.3.1.nupkg
vendored
Normal file
BIN
packages/NEST.1.3.1/NEST.1.3.1.nupkg
vendored
Normal file
Binary file not shown.
12447
packages/NEST.1.3.1/lib/Nest.XML
vendored
Normal file
12447
packages/NEST.1.3.1/lib/Nest.XML
vendored
Normal file
File diff suppressed because it is too large
Load Diff
BIN
packages/NEST.1.3.1/lib/Nest.dll
vendored
Normal file
BIN
packages/NEST.1.3.1/lib/Nest.dll
vendored
Normal file
Binary file not shown.
BIN
packages/NEST.1.3.1/lib/Nest.pdb
vendored
Normal file
BIN
packages/NEST.1.3.1/lib/Nest.pdb
vendored
Normal file
Binary file not shown.
Reference in New Issue
Block a user