38 Commits

Author SHA1 Message Date
Eric Fontana
9ef79978a6 Undo bad commit 2015-03-16 08:34:07 -04:00
Eric Fontana
47f233f863 No functional changes - refactored to common code for Multiline codec 2015-03-16 08:32:54 -04:00
Eric Fontana
b937d6ef45 Fixed doc to match default maxQueueSize 2015-03-11 10:29:31 -04:00
Eric Fontana
c16331ac10 Incorporated changes from reviews 2015-03-11 10:28:13 -04:00
Eric Fontana
ac10640edf Disable test for AppVeyor builds 2015-03-06 11:57:00 -05:00
Eric Fontana
e9c27c8c19 Doc tweaks 2015-03-06 10:39:20 -05:00
Eric Fontana
7be95a976e Re-factored to use Elasticsearch NEST and Bulk API 2015-03-06 10:01:50 -05:00
Eric Fontana
f2b0f1a85d Merge pull request #31 from Cimpress-MCP/mitigate_flood
Added new max_batch_count parameter to mitigate flooding when large burs...
2015-03-05 06:40:20 -05:00
Eric Fontana
eaba99144e Fixed test comment 2015-03-05 06:39:26 -05:00
Eric Fontana
3208da6488 Final changes from PR reviews 2015-03-05 06:38:58 -05:00
Eric Fontana
fb473909e7 Removed all warnings. 2015-03-04 09:50:34 -05:00
Eric Fontana
b7095471fb Added Unit tests for batchCount and enhanced Elasticsearch error reporting a bit. 2015-03-04 09:36:01 -05:00
Eric Fontana
d7fa582191 Re-factored Batchcounter logic into separate class. 2015-03-03 14:07:53 -05:00
Eric Fontana
6ea3e581fd Merge branch 'mitigate_flood' of https://github.com/Cimpress-MCP/TimberWinR.git 2015-03-03 13:25:02 -05:00
Eric Fontana
99b51d240d Updated per PR feedback 2015-03-03 13:24:55 -05:00
Eric Fontana
fd123e3a86 Merge pull request #32 from Cimpress-MCP/fix_redisHostIndex_inc
Move redisHostIndex inc to a finally
2015-03-03 13:01:02 -05:00
Ryan Breen
42741fbe1e Move redisHostIndex inc to a finally
It looks like this will just retry connections to the same host if new RedisClient throws.  Move _redisHostIndex++ to a finally.

Should we also be logging when these happen?
2015-03-03 12:54:48 -05:00
Eric Fontana
cdc2d09150 Added new max_batch_count parameter to mitigate flooding when large bursts occur. Essentially the incoming rate could exceed the outgoing rate and this will mitigate this condition. 2015-03-03 09:55:08 -05:00
Eric Fontana
2e28a50222 Merge branch 'master' of https://github.com/Cimpress-MCP/TimberWinR.git 2015-03-03 08:23:31 -05:00
Eric Fontana
92a9adeca8 Handle quit condition gracefully. 2015-03-03 08:23:25 -05:00
Eric Fontana
91cf59612c Merge pull request #30 from Cimpress-MCP/chrisbaldauf-default-diag-port-readme
Fix default diagnostic port
2015-03-02 12:17:37 -05:00
Chris Baldauf
e0aac878ff Fix default diagnostic port
I noticed that the documented default diagnostics port and the actual were different. From what I could see, 5141 was the default diag port and 5142 was the default UDP port. Feel free to correct me if I'm mistaken.
2015-02-27 13:01:37 -05:00
Eric Fontana
dc89ac996a Added missing batch_count param to the docs. 2015-02-27 11:23:37 -05:00
Eric Fontana
57b29a5425 Fixed high CPU usage problem for non-existent log files. 2015-02-27 07:37:09 -05:00
Eric Fontana
e5237e8e59 Default to current directory 2015-02-16 06:45:11 -05:00
Eric Fontana
fb61a49fe5 Bumped up TailFile interval to 60 seconds and updated docs. 2015-01-29 10:49:02 -05:00
Eric Fontana
775935683f Fixed guid 2015-01-29 10:18:38 -05:00
Eric Fontana
024fa68e34 Removed Vistaprint references 2015-01-29 10:17:46 -05:00
Eric Fontana
4654d7dbc1 Added new TailFiles listener. 2015-01-29 09:36:22 -05:00
Eric Fontana
d1e5224ba3 Added missing logSource to the docs. 2015-01-20 09:57:28 -05:00
Eric Fontana
5a34b687bb Split database into separate file 2015-01-20 09:10:18 -05:00
Eric Fontana
2982482f25 More doc updates 2015-01-20 09:06:36 -05:00
Eric Fontana
4d9aa4fd54 Formatting changes 2015-01-20 08:03:52 -05:00
Eric Fontana
5b0d28ce16 Fixed typo 2015-01-20 06:59:28 -05:00
Eric Fontana
4b255bfd27 More doc updates 2015-01-20 06:59:05 -05:00
Eric Fontana
e7a8ff3eb7 Merge branch 'master' of https://github.com/Cimpress-MCP/TimberWinR.git 2015-01-20 06:56:20 -05:00
Eric Fontana
3f227e0914 Fixed codec doc (was missing negate) 2015-01-20 06:56:15 -05:00
Eric Fontana
42301b5c9f Merge pull request #26 from Cimpress-MCP/mutate_example_json_fix
Update MutateFilter.md
2015-01-20 06:51:23 -05:00
52 changed files with 55643 additions and 634 deletions

View File

@@ -23,6 +23,9 @@ Please use the TimberWinR Google Group for discussion and support:
https://groups.google.com/forum/#!forum/timberwinr
Latest Build:
![alt tag](https://ci.appveyor.com/api/projects/status/github/Cimpress-MCP/TimberWinR)
## Inputs
The current supported Input format sources are:
@@ -33,6 +36,11 @@ The current supported Input format sources are:
5. [Stdin](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/StdinInput.md) (Standard Input for Debugging)
6. [W3C](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/W3CInput.md)(Internet Information Services W3C Advanced/Custom Format)
7. [Udp](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/UdpInput.md) (listens for UDP on port for JSON messages)
8. [TailFiles](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/TailFiles.md) (Tails log files efficiently *New*)
## Codecs
The current list of supported codecs are:
1. [Multiline](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/Codec.md)
## Filters
The current list of supported filters are:
@@ -62,7 +70,7 @@ A single Json filter using the single tag (this is only provided as a convienien
]
```
Multiple Json filters must use the jsonFilters and array syntax
Multiple Json filters must use the jsonFilters and array syntax, also mutateFilters, grokFilters, dateFilters, geoipFilters.
```json
"Filters": [
{
@@ -200,8 +208,8 @@ Options:
-configFile: Specifies the path to the JSON config file, or directory which contains .json file(s).
Default is -configFile:default.json
-diagnosticPort: Specifies the diagnostic port which can be used to get a health check of the service.
Default Port is 5142, A value of 0 will disable it. Open a browser
http://localhost:5142
Default Port is 5141, A value of 0 will disable it. Open a browser
http://localhost:5141
```
#### -configFile
This may be a single .json file or a directory containing .json file(s). If it is a directory, all
@@ -213,7 +221,7 @@ If you really just want to try it out, grab the binary distribution, extract the
into a directory, e.g. C:\TimberWinR
Grab the [JSON example file](https://raw.githubusercontent.com/efontana/TimberWinR/master/TimberWinR.ServiceHost/default.json) and place it into C:\TimberWinR\default.json.
Edit the default.json file and change the Redis instance to match yours, replace 'tstlexiceapp006.vistaprint.svc' with the IP or DNS name
Edit the default.json file and change the Redis instance to match yours, replace 'tstlexiceapp006.mycompany.svc' with the IP or DNS name
of the machine running redis. Fire up the collector, enable the verbose debugging to see some Windows Events.
```

View File

@@ -99,9 +99,7 @@ namespace TimberWinR.ExtractID
Console.WriteLine("Updated {0} ProductID: {1}", args[2], productCode);
return 0;
}
Console.Error.WriteLine("Failed for some reason");
}
}
}
}

View File

@@ -38,7 +38,8 @@ namespace TimberWinR.ServiceHost
serviceConfigurator.WhenStarted(myService => myService.Start());
serviceConfigurator.WhenStopped(myService => myService.Stop());
});
hostConfigurator.AddCommandLineDefinition("liveMonitor", c => arguments.LiveMonitor = bool.Parse(c.ToString()));
hostConfigurator.AddCommandLineDefinition("configFile", c => arguments.ConfigFile = c);
hostConfigurator.AddCommandLineDefinition("logLevel", c => arguments.LogLevel = c);
hostConfigurator.AddCommandLineDefinition("logDir", c => arguments.LogfileDir = c);
@@ -60,6 +61,7 @@ namespace TimberWinR.ServiceHost
AddServiceParameter("-configFile", arguments.ConfigFile);
AddServiceParameter("-logLevel", arguments.LogLevel);
AddServiceParameter("-logDir", arguments.LogfileDir);
AddServiceParameter("-liveMonitor", arguments.LiveMonitor);
if (arguments.DiagnosticPort > 0)
AddServiceParameter("-diagnosticPort", arguments.DiagnosticPort);
}
@@ -68,8 +70,7 @@ namespace TimberWinR.ServiceHost
}
private static void AddServiceParameter(string paramName, string value)
{
{
string currentValue = Registry.GetValue(KeyPath, KeyName, "").ToString();
if (!string.IsNullOrEmpty(paramName) && !currentValue.Contains(string.Format("{0} ", paramName)))
@@ -80,8 +81,7 @@ namespace TimberWinR.ServiceHost
}
private static void AddServiceParameter(string paramName, int value)
{
{
string currentValue = Registry.GetValue(KeyPath, KeyName, "").ToString();
if (!string.IsNullOrEmpty(paramName) && !currentValue.Contains(string.Format("{0}:", paramName)))
@@ -91,6 +91,16 @@ namespace TimberWinR.ServiceHost
}
}
private static void AddServiceParameter(string paramName, bool value)
{
string currentValue = Registry.GetValue(KeyPath, KeyName, "").ToString();
if (!string.IsNullOrEmpty(paramName) && !currentValue.Contains(string.Format("{0}:", paramName)))
{
currentValue += string.Format(" {0} \"{1}\"", paramName, value.ToString());
Registry.SetValue(KeyPath, KeyName, currentValue);
}
}
}
internal class Arguments
@@ -99,9 +109,10 @@ namespace TimberWinR.ServiceHost
public string LogLevel { get; set; }
public string LogfileDir { get; set; }
public int DiagnosticPort { get; set; }
public bool LiveMonitor { get; set; }
public Arguments()
{
LiveMonitor = false;
DiagnosticPort = 5141;
ConfigFile = "default.json";
LogLevel = "Info";
@@ -147,7 +158,7 @@ namespace TimberWinR.ServiceHost
/// </summary>
private void RunService()
{
_manager = new TimberWinR.Manager(_args.ConfigFile, _args.LogLevel, _args.LogfileDir, _cancellationToken);
_manager = new TimberWinR.Manager(_args.ConfigFile, _args.LogLevel, _args.LogfileDir, _args.LiveMonitor, _cancellationToken);
if (_args.DiagnosticPort > 0)
_diags = new Diagnostics.Diagnostics(_manager, _cancellationToken, _args.DiagnosticPort);
}

View File

@@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.3.19.0")]
[assembly: AssemblyFileVersion("1.3.19.0")]
[assembly: AssemblyVersion("1.3.20.0")]
[assembly: AssemblyFileVersion("1.3.20.0")]

View File

@@ -90,7 +90,7 @@
"interval": 5000,
"batch_count": 500,
"host": [
"tstlexiceapp006.vistaprint.svc"
"tstlexiceapp006.mycompany.svc"
]
}
],
@@ -99,7 +99,7 @@
"threads": 1,
"interval": 5000,
"host": [
"tstlexiceapp003.vistaprint.svc"
"tstlexiceapp003.mycompany.svc"
]
}
]

View File

@@ -33,7 +33,7 @@
"_comment": "Change the host to your Redis instance",
"port": 6379,
"host": [
"logaggregator.vistaprint.svc"
"logaggregator.mycompany.svc"
]
}
]

View File

@@ -85,7 +85,7 @@ namespace TimberWinR.UnitTests
[{
""host"":
[
""logaggregator.vistaprint.svc""
""logaggregator.mycompany.svc""
]
}]
}
@@ -94,7 +94,7 @@ namespace TimberWinR.UnitTests
Configuration c = Configuration.FromString(redisJson);
RedisOutput redis = c.RedisOutputs.First() as RedisOutput;
RedisOutputParameters redis = c.RedisOutputs.First() as RedisOutputParameters;
Assert.IsTrue(redis.Host.Length >= 1);
}

View File

@@ -0,0 +1,126 @@
using System;
using System.IO;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NUnit.Framework;
using TimberWinR.Parser;
using System.Text;
using System.Collections.Generic;
namespace TimberWinR.UnitTests
{
// Class which implements a Fake redis server for test purposes.
class FakeRediServer
{
private readonly System.Net.Sockets.TcpListener _tcpListenerV4;
private readonly System.Net.Sockets.TcpListener _tcpListenerV6;
private Thread _listenThreadV4;
private Thread _listenThreadV6;
private readonly int _port;
private CancellationToken _cancelToken;
private bool _shutdown;
public FakeRediServer(CancellationToken cancelToken, int port = 6379)
{
_port = port;
_cancelToken = cancelToken;
_shutdown = false;
_tcpListenerV6 = new System.Net.Sockets.TcpListener(IPAddress.IPv6Any, port);
_tcpListenerV4 = new System.Net.Sockets.TcpListener(IPAddress.Any, port);
_listenThreadV4 = new Thread(new ParameterizedThreadStart(ListenForClients));
_listenThreadV4.Start(_tcpListenerV4);
_listenThreadV6 = new Thread(new ParameterizedThreadStart(ListenForClients));
_listenThreadV6.Start(_tcpListenerV6);
}
public void Shutdown()
{
_shutdown = true;
this._tcpListenerV4.Stop();
this._tcpListenerV6.Stop();
}
private void ListenForClients(object olistener)
{
System.Net.Sockets.TcpListener listener = olistener as System.Net.Sockets.TcpListener;
listener.Start();
while (!_cancelToken.IsCancellationRequested && !_shutdown)
{
try
{
//blocks until a client has connected to the server
TcpClient client = listener.AcceptTcpClient();
// Wait for a client, spin up a thread.
var clientThread = new Thread(new ParameterizedThreadStart(HandleNewClient));
clientThread.Start(client);
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.Interrupted)
break;
}
}
}
private void HandleNewClient(object client)
{
var tcpClient = (TcpClient)client;
try
{
NetworkStream clientStream = tcpClient.GetStream();
int i;
Byte[] bytes = new Byte[16535];
String data = null;
do
{
try
{
// Loop to receive all the data sent by the client.
while ((i = clientStream.Read(bytes, 0, bytes.Length)) != 0)
{
// Translate data bytes to a ASCII string.
data = System.Text.Encoding.ASCII.GetString(bytes, 0, i);
//System.Diagnostics.Debug.WriteLine(String.Format("Received: {0}", data));
// Process the data sent by the client.
data = ":1000\r\n";
byte[] msg = System.Text.Encoding.ASCII.GetBytes(data);
// Send back a response.
clientStream.Write(msg, 0, msg.Length);
// System.Diagnostics.Debug.WriteLine(String.Format("Sent: {0}", data));
}
}
catch (IOException)
{
}
} while (true);
}
catch (Exception ex)
{
System.Diagnostics.Debug.WriteLine(ex.ToString());
}
tcpClient.Close();
}
private void ProcessJson(JObject json)
{
Console.WriteLine(json.ToString());
}
}
}

View File

@@ -22,7 +22,7 @@ namespace TimberWinR.UnitTests
{"Index", 7},
{"Text", null},
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"}
{"ComputerName", "dev.mycompany.net"}
};
string grokJson = @"{
@@ -55,7 +55,7 @@ namespace TimberWinR.UnitTests
Assert.IsTrue(grok.Apply(json));
// Verify host field added
Assert.AreEqual(json["host"].ToString(), "dev.vistaprint.net");
Assert.AreEqual(json["host"].ToString(), "dev.mycompany.net");
// Verify two tags added
Assert.AreEqual(json["tags"][0].ToString(), "rn_7");
@@ -71,7 +71,7 @@ namespace TimberWinR.UnitTests
{"Index", 7},
{"Text", "crap"},
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"}
{"ComputerName", "dev.mycompany.net"}
};
string grokJson = @"{
@@ -123,7 +123,7 @@ namespace TimberWinR.UnitTests
}
},
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"}
{"ComputerName", "dev.mycompany.net"}
};
JObject json2 = new JObject
@@ -140,7 +140,7 @@ namespace TimberWinR.UnitTests
}
},
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"}
{"ComputerName", "dev.mycompany.net"}
};
@@ -223,7 +223,7 @@ namespace TimberWinR.UnitTests
}
},
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"}
{"ComputerName", "dev.mycompany.net"}
};
string grokJson1 = @"{
@@ -311,7 +311,7 @@ namespace TimberWinR.UnitTests
}
},
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"}
{"ComputerName", "dev.mycompany.net"}
};
string grokJson = @"{

View File

@@ -20,14 +20,14 @@ namespace TimberWinR.UnitTests
JObject jsonInputLine1 = new JObject
{
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"},
{"ComputerName", "dev.mycompany.net"},
{"Text", "{\"Email\":\"james@example.com\",\"Active\":true,\"CreatedDate\":\"2013-01-20T00:00:00Z\",\"Roles\":[\"User\",\"Admin\"]}"}
};
JObject jsonInputLine2 = new JObject
{
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"},
{"ComputerName", "dev.mycompany.net"},
{"Text", "{\"Email\":\"james@example.com\",\"Active\":true,\"CreatedDate\":\"2013-01-20T00:00:00Z\",\"Roles\":[\"User\",\"Admin\"]}"}
};

View File

@@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
@@ -18,7 +17,7 @@ namespace TimberWinR.UnitTests
[TestFixture]
public class MultilineTests
{
// [Test(Description = "Test using next")]
// [Test(Description = "Test using next")]
public void TestMultiline1()
{
using (StreamReader sr = new StreamReader("Multiline1.txt"))
@@ -29,10 +28,10 @@ namespace TimberWinR.UnitTests
Stdin sin = new Stdin();
sin.Codec = new Codec();
sin.Codec.Pattern = "\\\\$";
sin.Codec.What = Codec.WhatType.next;
sin.Codec.Type = Codec.CodecType.multiline;
sin.CodecArguments = new CodecArguments();
sin.CodecArguments.Pattern = "\\\\$";
sin.CodecArguments.What = CodecArguments.WhatType.next;
sin.CodecArguments.Type = CodecArguments.CodecType.multiline;
var cancelTokenSource = new CancellationTokenSource();
@@ -52,7 +51,7 @@ namespace TimberWinR.UnitTests
if (!cancelTokenSource.Token.IsCancellationRequested)
syncHandle.Wait(TimeSpan.FromSeconds(10000), cancelTokenSource.Token);
}
catch (OperationCanceledException oex)
catch (OperationCanceledException)
{
}
}
@@ -67,7 +66,7 @@ namespace TimberWinR.UnitTests
}
}
// [Test(Description = "Test using previous")]
// [Test(Description = "Test using previous")]
public void TestMultiline2()
{
using (StreamReader sr = new StreamReader("Multiline2.txt"))
@@ -78,11 +77,11 @@ namespace TimberWinR.UnitTests
Stdin sin = new Stdin();
sin.Codec = new Codec();
sin.Codec.Pattern = "^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2},\\d{3})(.*)$";
sin.Codec.What = Codec.WhatType.previous;
sin.Codec.Type = Codec.CodecType.multiline;
sin.Codec.Negate = true;
sin.CodecArguments = new CodecArguments();
sin.CodecArguments.Pattern = "^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2},\\d{3})(.*)$";
sin.CodecArguments.What = CodecArguments.WhatType.previous;
sin.CodecArguments.Type = CodecArguments.CodecType.multiline;
sin.CodecArguments.Negate = true;
var cancelTokenSource = new CancellationTokenSource();
@@ -102,7 +101,7 @@ namespace TimberWinR.UnitTests
if (!cancelTokenSource.Token.IsCancellationRequested)
syncHandle.Wait(TimeSpan.FromSeconds(10000), cancelTokenSource.Token);
}
catch (OperationCanceledException oex)
catch (OperationCanceledException)
{
}
}

View File

@@ -10,12 +10,12 @@
public class ElasticsearchOutputTests
{
private ElasticsearchOutput parser;
private ElasticsearchOutputParameters parser;
[SetUp]
public void Setup()
{
this.parser = new ElasticsearchOutput();
this.parser = new ElasticsearchOutputParameters();
}
[Test]

View File

@@ -0,0 +1,89 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using NUnit.Framework;
using TimberWinR.Inputs;
using TimberWinR.Parser;
using Newtonsoft.Json.Linq;
using System.Diagnostics;
namespace TimberWinR.UnitTests
{
[TestFixture]
public class TailFileTests
{
[Test]
public void TestTailFile()
{
List<JObject> events = new List<JObject>();
if (File.Exists(".timberwinrdb"))
File.Delete(".timberwinrdb");
var mgr = new Manager();
mgr.LogfileDir = ".";
var tf = new TailFile();
var cancelTokenSource = new CancellationTokenSource();
tf.Location = "TestTailFile1.log";
if (File.Exists(tf.Location))
File.Delete(tf.Location);
try
{
var listener = new TailFileListener(tf, cancelTokenSource.Token);
listener.OnMessageRecieved += o =>
{
events.Add(o);
if (events.Count >= 100)
cancelTokenSource.Cancel();
};
GenerateLogFile(tf.Location);
bool createdFile = false;
while (!listener.Stop && !cancelTokenSource.IsCancellationRequested)
{
Thread.Sleep(100);
if (!createdFile)
{
GenerateLogFile(tf.Location);
createdFile = true;
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Done!");
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
finally
{
Assert.AreEqual(100, events.Count);
}
}
private static void GenerateLogFile(string fileName)
{
using (System.IO.StreamWriter file = new System.IO.StreamWriter(fileName))
{
for (int i = 0; i < 100; i++)
{
file.WriteLine("Log Line Number {0}", i);
}
}
}
}
}

View File

@@ -0,0 +1,89 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using NUnit.Framework;
using TimberWinR.Parser;
using Newtonsoft.Json.Linq;
using System.Threading;
namespace TimberWinR.UnitTests
{
[TestFixture]
public class TestDynamicBatchCount
{
// [Test]
public void TestDynamicBatch()
{
var mgr = new Manager();
mgr.LogfileDir = ".";
mgr.Config = new Configuration();
CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
var cancelToken = cancelTokenSource.Token;
FakeRediServer fr = new FakeRediServer(cancelToken);
var redisParams = new RedisOutputParameters();
redisParams.BatchCount = 10;
redisParams.MaxBatchCount = 40;
redisParams.Interval = 100;
var redisOutput = new Outputs.RedisOutput(mgr, redisParams, cancelToken);
// Message is irrelavant
JObject jsonMessage = new JObject
{
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"},
{"Text", "{\"Email\":\"james@example.com\",\"Active\":true,\"CreatedDate\":\"2013-01-20T00:00:00Z\",\"Roles\":[\"User\",\"Admin\"]}"}
};
// Send 1000 messages at max throttle
for (int i = 0; i < 1000; i++)
{
Thread.Sleep(10);
redisOutput.Startup(jsonMessage);
}
while (redisOutput.SentMessages < 1000)
{
System.Diagnostics.Debug.WriteLine(redisOutput.SentMessages);
Thread.Sleep(1000);
}
fr.Shutdown();
cancelTokenSource.Cancel();
System.Diagnostics.Debug.WriteLine(redisOutput.ToJson());
System.Diagnostics.Debug.WriteLine(redisOutput.QueueDepth);
JObject json = redisOutput.ToJson();
var mbc = json["redis"]["reachedMaxBatchCountTimes"].Value<int>();
var sm = json["redis"]["sentMessageCount"].Value<int>();
var errs = json["redis"]["errors"].Value<int>();
var cbc = json["redis"]["currentBatchCount"].Value<int>();
// No errors
Assert.AreEqual(0, errs);
// Should have reached max at least 1 time
Assert.GreaterOrEqual(mbc, 1);
// Should have sent 1000 messages
Assert.AreEqual(1000, sm);
// Should reset back down to original
Assert.AreEqual(cbc, 10);
}
}
}

View File

@@ -60,6 +60,7 @@
<ItemGroup>
<Compile Include="Configuration.cs" />
<Compile Include="DateFilterTests.cs" />
<Compile Include="FakeRediServer.cs" />
<Compile Include="GeoIPFilterTests.cs" />
<Compile Include="Inputs\IisW3CRowReaderTests.cs" />
<Compile Include="JsonFilterTests.cs" />
@@ -67,7 +68,9 @@
<Compile Include="MultilineTests.cs" />
<Compile Include="Parser\ElasticsearchOutputTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TailFileTests.cs" />
<Compile Include="TestBase.cs" />
<Compile Include="TestDynamicBatchCount.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\TimberWinR\TimberWinR.csproj">

View File

@@ -21,6 +21,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
chocolateyInstall.ps1.template = chocolateyInstall.ps1.template
chocolateyUninstall.ps1.guid = chocolateyUninstall.ps1.guid
chocolateyUninstall.ps1.template = chocolateyUninstall.ps1.template
chocolateyUninstall.ps1.template.orig = chocolateyUninstall.ps1.template.orig
LICENSE.txt = LICENSE.txt
Performance1.psess = Performance1.psess
README.md = README.md

View File

@@ -0,0 +1,90 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using Newtonsoft.Json.Linq;
using TimberWinR.Inputs;
using TimberWinR.Parser;
namespace TimberWinR.Codecs
{
public class Multiline : ICodec
{
private CodecArguments _codecArguments;
private List<string> _multiline { get; set; }
// return true to cancel codec
public Multiline(CodecArguments args)
{
_codecArguments = args;
}
public void Apply(string msg, InputListener listener)
{
if (_codecArguments.Re == null)
_codecArguments.Re = new Regex(_codecArguments.Pattern);
Match match = _codecArguments.Re.Match(msg);
bool isMatch = (match.Success && !_codecArguments.Negate) || (!match.Success && _codecArguments.Negate);
switch (_codecArguments.What)
{
case CodecArguments.WhatType.previous:
if (isMatch)
{
if (_multiline == null)
_multiline = new List<string>();
_multiline.Add(msg);
}
else // No Match
{
if (_multiline != null)
{
string single = string.Join("\n", _multiline.ToArray());
_multiline = null;
JObject jo = new JObject();
jo["message"] = single;
jo.Add("tags", new JArray(_codecArguments.MultilineTag));
listener.AddDefaultFields(jo);
listener.ProcessJson(jo);
}
_multiline = new List<string>();
_multiline.Add(msg);
}
break;
case CodecArguments.WhatType.next:
if (isMatch)
{
if (_multiline == null)
_multiline = new List<string>();
_multiline.Add(msg);
}
else // No match
{
if (_multiline != null)
{
_multiline.Add(msg);
string single = string.Join("\n", _multiline.ToArray());
_multiline = null;
JObject jo = new JObject();
jo["message"] = single;
jo.Add("tags", new JArray(_codecArguments.MultilineTag));
listener.AddDefaultFields(jo);
listener.ProcessJson(jo);
}
else
{
JObject jo = new JObject();
jo["message"] = msg;
listener.AddDefaultFields(jo);
listener.ProcessJson(jo);
}
}
break;
}
}
}
}

View File

@@ -4,6 +4,8 @@ using System.Data.Odbc;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
using System.Xml;
using System.Xml.Linq;
@@ -16,69 +18,80 @@ using Newtonsoft.Json.Linq;
using TimberWinR.Inputs;
using TimberWinR.Filters;
using NLog;
using TimberWinR.Parser;
using Topshelf.Configurators;
using IISW3CLog = TimberWinR.Parser.IISW3CLog;
using WindowsEvent = TimberWinR.Parser.WindowsEvent;
namespace TimberWinR
{
public class Configuration
{
{
private CancellationToken _cancelToken;
private FileSystemWatcher _dirWatcher;
private Manager _manager;
private List<WindowsEvent> _events = new List<WindowsEvent>();
public IEnumerable<WindowsEvent> Events
{
get { return _events; }
}
private List<RedisOutput> _redisOutputs = new List<RedisOutput>();
public IEnumerable<RedisOutput> RedisOutputs
private List<RedisOutputParameters> _redisOutputs = new List<RedisOutputParameters>();
public IEnumerable<RedisOutputParameters> RedisOutputs
{
get { return _redisOutputs; }
}
private List<ElasticsearchOutput> _elasticsearchOutputs = new List<ElasticsearchOutput>();
public IEnumerable<ElasticsearchOutput> ElasticsearchOutputs
private List<ElasticsearchOutputParameters> _elasticsearchOutputs = new List<ElasticsearchOutputParameters>();
public IEnumerable<ElasticsearchOutputParameters> ElasticsearchOutputs
{
get { return _elasticsearchOutputs; }
}
private List<StdoutOutput> _stdoutOutputs = new List<StdoutOutput>();
public IEnumerable<StdoutOutput> StdoutOutputs
private List<StdoutOutputParameters> _stdoutOutputs = new List<StdoutOutputParameters>();
public IEnumerable<StdoutOutputParameters> StdoutOutputs
{
get { return _stdoutOutputs; }
}
private List<Tcp> _tcps = new List<Tcp>();
public IEnumerable<Tcp> Tcps
private List<TcpParameters> _tcps = new List<TcpParameters>();
public IEnumerable<TcpParameters> Tcps
{
get { return _tcps; }
}
private List<Udp> _udps = new List<Udp>();
public IEnumerable<Udp> Udps
private List<UdpParameters> _udps = new List<UdpParameters>();
public IEnumerable<UdpParameters> Udps
{
get { return _udps; }
}
private List<Log> _logs = new List<Log>();
public IEnumerable<Log> Logs
private List<LogParameters> _logs = new List<LogParameters>();
public IEnumerable<LogParameters> Logs
{
get { return _logs; }
}
}
private List<IISW3CLog> _iisw3clogs = new List<IISW3CLog>();
private List<TailFile> _tails = new List<TailFile>();
public IEnumerable<TailFile> TailFiles
{
get { return _tails; }
}
public IEnumerable<IISW3CLog> IISW3C
private List<IISW3CLogParameters> _iisw3clogs = new List<IISW3CLogParameters>();
public IEnumerable<IISW3CLogParameters> IISW3C
{
get { return _iisw3clogs; }
}
private List<W3CLog> _w3clogs = new List<W3CLog>();
private List<W3CLogParameters> _w3clogs = new List<W3CLogParameters>();
public IEnumerable<W3CLog> W3C
public IEnumerable<W3CLogParameters> W3C
{
get { return _w3clogs; }
}
@@ -96,11 +109,87 @@ namespace TimberWinR
get { return _filters; }
}
private void MonitorDirectory(string directoryToWatch, CancellationToken cancelToken, Manager manager)
{
_manager = manager;
_cancelToken = cancelToken;
if (_dirWatcher == null)
{
_dirWatcher = new FileSystemWatcher();
_dirWatcher.Path = directoryToWatch;
_dirWatcher.NotifyFilter = NotifyFilters.LastAccess | NotifyFilters.LastWrite | NotifyFilters.FileName | NotifyFilters.DirectoryName;
// Only watch json files.
_dirWatcher.Filter = "*.json";
_dirWatcher.Created += DirWatcherOnCreated;
_dirWatcher.Changed += DirWatcherOnChanged;
_dirWatcher.Renamed += DirWatcherOnRenamed;
_dirWatcher.EnableRaisingEvents = true;
}
}
public static Configuration FromDirectory(string jsonDirectory)
private void DirWatcherOnRenamed(object sender, RenamedEventArgs e)
{
// The Renamed file could be a different name from .json
FileInfo fi = new FileInfo(e.FullPath);
if (fi.Extension == ".json")
{
LogManager.GetCurrentClassLogger().Info("File: OnRenamed " + e.FullPath + " " + e.ChangeType);
ProcessNewJson(e.FullPath);
}
}
private void DirWatcherOnCreated(object sender, FileSystemEventArgs e)
{
FileInfo fi = new FileInfo(e.FullPath);
if (fi.Extension == ".json")
{
LogManager.GetCurrentClassLogger().Info("File: OnCreated " + e.FullPath + " " + e.ChangeType);
ProcessNewJson(e.FullPath);
}
}
private void DirWatcherOnChanged(object sender, FileSystemEventArgs e)
{
FileInfo fi = new FileInfo(e.FullPath);
if (fi.Extension == ".json")
{
// Specify what is done when a file is changed, created, or deleted.
LogManager.GetCurrentClassLogger()
.Info("File: OnChanged " + e.ChangeType.ToString() + " " + e.FullPath + " " + e.ChangeType);
ProcessNewJson(e.FullPath);
}
}
private void ProcessNewJson(string fileName)
{
try
{
Configuration c = new Configuration();
var config = Configuration.FromFile(fileName, c);
_manager.ProcessConfiguration(_cancelToken, config);
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Error(ex);
}
}
private void ShutdownDirectoryMonitor()
{
_dirWatcher.EnableRaisingEvents = false;
LogManager.GetCurrentClassLogger().Info("Stopping Directory Monitor");
}
private void DirectoryWatcher(string directoryToWatch)
{
LogManager.GetCurrentClassLogger().Info("Starting Directory Monitor {0}", directoryToWatch);
}
public static Configuration FromDirectory(string jsonDirectory, CancellationToken cancelToken, Manager manager)
{
Configuration c = null;
foreach (string jsonConfFile in Directory.GetFiles(jsonDirectory, "*.json"))
{
if (!string.IsNullOrEmpty(jsonConfFile))
@@ -109,6 +198,10 @@ namespace TimberWinR
}
}
// Startup Directory Monitor
if (manager.LiveMonitor)
c.MonitorDirectory(jsonDirectory, cancelToken, manager);
return c;
}
@@ -148,6 +241,8 @@ namespace TimberWinR
c._stdins.AddRange(x.TimberWinR.Inputs.Stdins.ToList());
if (x.TimberWinR.Inputs.Logs != null)
c._logs.AddRange(x.TimberWinR.Inputs.Logs.ToList());
if (x.TimberWinR.Inputs.TailFiles != null)
c._tails.AddRange(x.TimberWinR.Inputs.TailFiles.ToList());
if (x.TimberWinR.Inputs.Tcps != null)
c._tcps.AddRange(x.TimberWinR.Inputs.Tcps.ToList());
if (x.TimberWinR.Inputs.Udps != null)
@@ -194,13 +289,13 @@ namespace TimberWinR
{
_filters = new List<LogstashFilter>();
_events = new List<WindowsEvent>();
_iisw3clogs = new List<IISW3CLog>();
_logs = new List<Log>();
_redisOutputs = new List<RedisOutput>();
_elasticsearchOutputs = new List<ElasticsearchOutput>();
_stdoutOutputs = new List<StdoutOutput>();
_tcps = new List<Tcp>();
_udps = new List<Udp>();
_iisw3clogs = new List<IISW3CLogParameters>();
_logs = new List<LogParameters>();
_redisOutputs = new List<RedisOutputParameters>();
_elasticsearchOutputs = new List<ElasticsearchOutputParameters>();
_stdoutOutputs = new List<StdoutOutputParameters>();
_tcps = new List<TcpParameters>();
_udps = new List<UdpParameters>();
}
public static Object GetPropValue(String name, Object obj)

13
TimberWinR/ICodec.cs Normal file
View File

@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TimberWinR.Inputs;
namespace TimberWinR
{
public interface ICodec
{
void Apply(string msg, InputListener listener);
}
}

View File

@@ -16,12 +16,12 @@ namespace TimberWinR.Inputs
public class IISW3CInputListener : InputListener
{
private readonly int _pollingIntervalInSeconds;
private readonly Parser.IISW3CLog _arguments;
private readonly Parser.IISW3CLogParameters _arguments;
private long _receivedMessages;
public bool Stop { get; set; }
private IisW3CRowReader rowReader;
public IISW3CInputListener(Parser.IISW3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
public IISW3CInputListener(Parser.IISW3CLogParameters arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
: base(cancelToken, "Win32-IISLog")
{
_arguments = arguments;
@@ -138,7 +138,7 @@ namespace TimberWinR.Inputs
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}

View File

@@ -80,7 +80,7 @@ namespace TimberWinR.Inputs
}
}
protected virtual void AddDefaultFields(JObject json)
public virtual void AddDefaultFields(JObject json)
{
if (json["type"] == null)
json.Add(new JProperty("type", _typeName));
@@ -100,7 +100,7 @@ namespace TimberWinR.Inputs
json.Add(new JProperty("UtcTimestamp", utc.ToString("o")));
}
protected void ProcessJson(JObject json)
public void ProcessJson(JObject json)
{
if (OnMessageRecieved != null)
{

View File

@@ -0,0 +1,207 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using Newtonsoft.Json;
using NLog;
using TimberWinR.Parser;
namespace TimberWinR.Inputs
{
//
// Maintain persistent state for Log files (to be used across restarts)
//
public class LogsFileDatabase
{
private static readonly object _locker = new object();
private List<LogsFileDatabaseEntry> Entries { get; set; }
private string DatabaseDirectory { get; set; }
public string DatabaseFileName
{
get { return Path.Combine(DatabaseDirectory, ".timberwinrdb"); }
}
public static Manager Manager { get; set; }
private static LogsFileDatabase instance;
private bool ExistingFile(string logName)
{
lock (_locker)
{
return ExistingFileTest(logName);
}
}
private LogsFileDatabaseEntry FindFile(string logName)
{
lock (_locker)
{
var existingEntry = (from e in Entries where e.FileName == logName select e).FirstOrDefault();
return existingEntry;
}
}
private bool ExistingFileTest(string logName)
{
var existingEntry = (from e in Entries where e.FileName == logName select e).FirstOrDefault();
return existingEntry != null;
}
private void RemoveFileEntry(string logName)
{
lock (_locker)
{
var existingEntry = (from e in Entries where e.FileName == logName select e).FirstOrDefault();
if (existingEntry != null)
{
Entries.Remove(existingEntry);
WriteDatabaseFileNoLock();
}
}
}
private LogsFileDatabaseEntry AddFileEntry(string logName)
{
var de = new LogsFileDatabaseEntry();
lock (_locker)
{
de.NewFile = true;
var fi = new FileInfo(logName);
de.FileName = logName;
de.Size = fi.Length;
de.SampleTime = DateTime.UtcNow;
de.CreationTimeUtc = fi.CreationTimeUtc;
Entries.Add(de);
WriteDatabaseFileNoLock();
}
return de;
}
public static LogsFileDatabaseEntry LookupLogFile(string logName)
{
LogsFileDatabaseEntry dbe = Instance.FindFile(logName);
if (dbe == null)
dbe = Instance.AddFileEntry(logName);
else
dbe.NewFile = false;
return dbe;
}
public static void Update(LogsFileDatabaseEntry dbe)
{
Instance.UpdateEntry(dbe);
}
private void UpdateEntry(LogsFileDatabaseEntry dbe)
{
lock(_locker)
{
var fi = new FileInfo(dbe.FileName);
dbe.CreationTimeUtc = fi.CreationTimeUtc;
dbe.SampleTime = DateTime.UtcNow;
dbe.Size = fi.Length;
WriteDatabaseFileNoLock();
}
}
public static LogsFileDatabase Instance
{
get
{
if (instance == null)
{
instance = new LogsFileDatabase(Manager.LogfileDir);
lock (_locker)
{
if (!Directory.Exists(instance.DatabaseDirectory))
Directory.CreateDirectory(instance.DatabaseDirectory);
// If it exists, read the current state, otherwise create an empty database.
if (File.Exists(instance.DatabaseFileName))
instance.ReadDatabaseNoLock();
else
instance.WriteDatabaseFileNoLock();
}
}
return instance;
}
}
private void ReadDatabaseNoLock()
{
try
{
var serializer = new JsonSerializer();
if (File.Exists(DatabaseFileName))
Entries =
JsonConvert.DeserializeObject<List<LogsFileDatabaseEntry>>(File.ReadAllText(DatabaseFileName));
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger()
.Error("Error reading database '{0}': {1}", DatabaseFileName, ex.ToString());
try
{
if (File.Exists(DatabaseFileName))
File.Delete(DatabaseFileName);
LogManager.GetCurrentClassLogger().Info("Creating New Database '{0}'", DatabaseFileName);
WriteDatabaseLock();
}
catch (Exception ex2)
{
LogManager.GetCurrentClassLogger().Info("Error Creating New Database '{0}': {1}", DatabaseFileName, ex2.ToString());
}
}
}
private void WriteDatabaseFileNoLock()
{
try
{
File.WriteAllText(DatabaseFileName, JsonConvert.SerializeObject(instance.Entries), Encoding.UTF8);
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger()
.Error("Error saving database '{0}': {1}", DatabaseFileName, ex.ToString());
}
}
private void ReadDatabaseLock()
{
lock (_locker)
{
ReadDatabaseNoLock();
}
}
private void WriteDatabaseLock()
{
lock (_locker)
{
WriteDatabaseFileNoLock();
}
}
private LogsFileDatabase(string databaseDirectory)
{
DatabaseDirectory = databaseDirectory;
Entries = new List<LogsFileDatabaseEntry>();
}
}
public class LogsFileDatabaseEntry
{
[JsonIgnore]
public bool NewFile { get; set; }
public string FileName { get; set; }
public Int64 MaxRecords { get; set; }
public DateTime CreationTimeUtc { get; set; }
public DateTime SampleTime { get; set; }
public long Size { get; set; }
}
}

View File

@@ -14,7 +14,7 @@ using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
using NLog;
using TimberWinR.Codecs;
using LogQuery = Interop.MSUtil.LogQueryClassClass;
using TextLineInputFormat = Interop.MSUtil.COMTextLineInputContextClass;
using LogRecordSet = Interop.MSUtil.ILogRecordset;
@@ -22,169 +22,34 @@ using TimberWinR.Parser;
namespace TimberWinR.Inputs
{
public class LogsFileDatabase
{
private static readonly object _locker = new object();
private List<LogsFileDatabaseEntry> Entries { get; set; }
private string DatabaseDirectory { get; set; }
public string DatabaseFileName
{
get { return Path.Combine(DatabaseDirectory, ".timberwinrdb"); }
}
public static Manager Manager { get; set; }
private static LogsFileDatabase instance;
private bool ExistingFile(string logName)
{
lock (_locker)
{
return ExistingFileTest(logName);
}
}
private bool ExistingFileTest(string logName)
{
var existingEntry = (from e in Entries where e.FileName == logName select e).FirstOrDefault();
return existingEntry != null;
}
private void RemoveFileEntry(string logName)
{
lock (_locker)
{
var existingEntry = (from e in Entries where e.FileName == logName select e).FirstOrDefault();
if (existingEntry != null)
{
Entries.Remove(existingEntry);
WriteDatabaseFileNoLock();
}
}
}
private LogsFileDatabaseEntry AddFileEntry(string logName, TextLineInputFormat fmt)
{
LogsFileDatabaseEntry de = new LogsFileDatabaseEntry();
lock (_locker)
{
var lq = new LogQuery();
FileInfo fi = new FileInfo(logName);
de.FileName = logName;
de.Size = fi.Length;
de.SampleTime = DateTime.UtcNow;
de.CreationTime = fi.CreationTimeUtc;
if (fi.Exists)
{
var qcount = string.Format("SELECT max(Index) as MaxRecordNumber FROM {0}", logName);
var rcount = lq.Execute(qcount, fmt);
var qr = rcount.getRecord();
var lrn = (Int64)qr.getValueEx("MaxRecordNumber");
de.MaxRecords = lrn;
}
Entries.Add(de);
WriteDatabaseFileNoLock();
}
return de;
}
public static LogsFileDatabaseEntry AddLogFile(string logName, TextLineInputFormat fmt)
{
Instance.RemoveFileEntry(logName); // Remove if already exists, otherwise ignores.
return Instance.AddFileEntry(logName, fmt);
}
public static LogsFileDatabase Instance
{
get
{
if (instance == null)
{
instance = new LogsFileDatabase(Manager.LogfileDir);
lock (_locker)
{
if (!Directory.Exists(instance.DatabaseDirectory))
{
Directory.CreateDirectory(instance.DatabaseDirectory);
}
if (File.Exists(instance.DatabaseFileName))
instance.ReadDatabaseNoLock();
else
instance.WriteDatabaseFileNoLock();
}
}
return instance;
}
}
private void ReadDatabaseNoLock()
{
JsonSerializer serializer = new JsonSerializer();
if (File.Exists(DatabaseFileName))
Entries = JsonConvert.DeserializeObject<List<LogsFileDatabaseEntry>>(File.ReadAllText(DatabaseFileName));
}
private void WriteDatabaseFileNoLock()
{
File.WriteAllText(DatabaseFileName, JsonConvert.SerializeObject(instance.Entries), Encoding.UTF8);
}
private void ReadDatabaseLock()
{
lock (_locker)
{
ReadDatabaseNoLock();
}
}
private void WriteDatabaseLock()
{
lock (_locker)
{
WriteDatabaseFileNoLock();
}
}
private LogsFileDatabase(string databaseDirectory)
{
DatabaseDirectory = databaseDirectory;
Entries = new List<LogsFileDatabaseEntry>();
}
}
public class LogsFileDatabaseEntry
{
public string FileName { get; set; }
public Int64 MaxRecords { get; set; }
public DateTime CreationTime { get; set; }
public DateTime SampleTime { get; set; }
public long Size { get; set; }
}
/// <summary>
/// Tail a file.
/// </summary>
public class LogsListener : InputListener
{
private int _pollingIntervalInSeconds;
private TimberWinR.Parser.Log _arguments;
private TimberWinR.Parser.LogParameters _arguments;
private long _receivedMessages;
private Dictionary<string, Int64> _logFileMaxRecords;
private Dictionary<string, DateTime> _logFileCreationTimes;
private Dictionary<string, DateTime> _logFileSampleTimes;
private Dictionary<string, long> _logFileSizes;
private Codec _codec;
private List<string> _multiline { get; set; }
private CodecArguments _codecArguments;
private ICodec _codec;
public bool Stop { get; set; }
public LogsListener(TimberWinR.Parser.Log arguments, CancellationToken cancelToken)
public LogsListener(TimberWinR.Parser.LogParameters arguments, CancellationToken cancelToken)
: base(cancelToken, "Win32-FileLog")
{
Stop = false;
_codec = arguments.Codec;
_codecArguments = arguments.CodecArguments;
_codecArguments = arguments.CodecArguments;
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
_codec = new Multiline(_codecArguments);
_logFileMaxRecords = new Dictionary<string, Int64>();
_logFileCreationTimes = new Dictionary<string, DateTime>();
_logFileSampleTimes = new Dictionary<string, DateTime>();
@@ -240,94 +105,22 @@ namespace TimberWinR.Inputs
)));
if (_codec != null)
if (_codecArguments != null)
{
var cp = new JProperty("codec",
new JArray(
new JObject(
new JProperty("type", _codec.Type.ToString()),
new JProperty("what", _codec.What.ToString()),
new JProperty("negate", _codec.Negate),
new JProperty("multilineTag", _codec.MultilineTag),
new JProperty("pattern", _codec.Pattern))));
new JProperty("type", _codecArguments.Type.ToString()),
new JProperty("what", _codecArguments.What.ToString()),
new JProperty("negate", _codecArguments.Negate),
new JProperty("multilineTag", _codecArguments.MultilineTag),
new JProperty("pattern", _codecArguments.Pattern))));
json.Add(cp);
}
return json;
}
// return true to cancel codec
private void applyMultilineCodec(string msg)
{
if (_codec.Re == null)
_codec.Re = new Regex(_codec.Pattern);
Match match = _codec.Re.Match(msg);
bool isMatch = (match.Success && !_codec.Negate) || (!match.Success && _codec.Negate);
switch (_codec.What)
{
case Codec.WhatType.previous:
if (isMatch)
{
if (_multiline == null)
_multiline = new List<string>();
_multiline.Add(msg);
}
else // No Match
{
if (_multiline != null)
{
string single = string.Join("\n", _multiline.ToArray());
_multiline = null;
JObject jo = new JObject();
jo["message"] = single;
jo.Add("tags", new JArray(_codec.MultilineTag));
AddDefaultFields(jo);
ProcessJson(jo);
_receivedMessages++;
}
_multiline = new List<string>();
_multiline.Add(msg);
}
break;
case Codec.WhatType.next:
if (isMatch)
{
if (_multiline == null)
_multiline = new List<string>();
_multiline.Add(msg);
}
else // No match
{
if (_multiline != null)
{
_multiline.Add(msg);
string single = string.Join("\n", _multiline.ToArray());
_multiline = null;
JObject jo = new JObject();
jo["message"] = single;
jo.Add("tags", new JArray(_codec.MultilineTag));
AddDefaultFields(jo);
ProcessJson(jo);
_receivedMessages++;
}
else
{
JObject jo = new JObject();
jo["message"] = msg;
AddDefaultFields(jo);
ProcessJson(jo);
_receivedMessages++;
}
}
break;
}
}
}
private void FileWatcher(string fileToWatch)
{
@@ -338,6 +131,8 @@ namespace TimberWinR.Inputs
recurse = _arguments.Recurse
};
Dictionary<string, string> _fnfmap = new Dictionary<string, string>();
using (var syncHandle = new ManualResetEventSlim())
{
// Execute the query
@@ -436,8 +231,12 @@ namespace TimberWinR.Inputs
string msg = json["Text"].ToString();
if (!string.IsNullOrEmpty(msg))
{
if (_codec != null && _codec.Type == Codec.CodecType.multiline)
applyMultilineCodec(msg);
if (_codecArguments != null &&
_codecArguments.Type == CodecArguments.CodecType.multiline)
{
_codec.Apply(msg, this);
_receivedMessages++;
}
else
{
ProcessJson(json);
@@ -456,15 +255,17 @@ namespace TimberWinR.Inputs
rs = null;
GC.Collect();
}
// Sleep
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
catch (FileNotFoundException fnfex)
{
LogManager.GetCurrentClassLogger().Warn(fnfex.Message);
string fn = fnfex.FileName;
if (!_fnfmap.ContainsKey(fn))
LogManager.GetCurrentClassLogger().Warn(fnfex.Message);
_fnfmap[fn] = fn;
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}
@@ -474,7 +275,20 @@ namespace TimberWinR.Inputs
}
finally
{
oLogQuery = null;
try
{
oLogQuery = null;
// Sleep
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
catch (OperationCanceledException)
{
}
catch (Exception ex1)
{
LogManager.GetCurrentClassLogger().Warn(ex1);
}
}
}
}

View File

@@ -8,6 +8,7 @@ using System.Threading;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NLog;
using TimberWinR.Codecs;
using TimberWinR.Parser;
namespace TimberWinR.Inputs
@@ -15,13 +16,16 @@ namespace TimberWinR.Inputs
public class StdinListener : InputListener
{
private Thread _listenThread;
private Codec _codec;
private List<string> _multiline { get; set; }
private CodecArguments _codecArguments;
private ICodec _codec;
public StdinListener(TimberWinR.Parser.Stdin arguments, CancellationToken cancelToken)
: base(cancelToken, "Win32-Console")
{
_codec = arguments.Codec;
_codecArguments = arguments.CodecArguments;
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
_codec = new Multiline(_codecArguments);
_listenThread = new Thread(new ThreadStart(ListenToStdin));
_listenThread.Start();
}
@@ -32,16 +36,16 @@ namespace TimberWinR.Inputs
new JProperty("stdin", "enabled"));
if (_codec != null)
if (_codecArguments != null)
{
var cp = new JProperty("codec",
new JArray(
new JObject(
new JProperty("type", _codec.Type.ToString()),
new JProperty("what", _codec.What.ToString()),
new JProperty("negate", _codec.Negate),
new JProperty("multilineTag", _codec.MultilineTag),
new JProperty("pattern", _codec.Pattern))));
new JProperty("type", _codecArguments.Type.ToString()),
new JProperty("what", _codecArguments.What.ToString()),
new JProperty("negate", _codecArguments.Negate),
new JProperty("multilineTag", _codecArguments.MultilineTag),
new JProperty("pattern", _codecArguments.Pattern))));
json.Add(cp);
}
@@ -65,8 +69,8 @@ namespace TimberWinR.Inputs
{
string msg = ToPrintable(line);
if (_codec != null && _codec.Type == Codec.CodecType.multiline)
applyMultilineCodec(msg);
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
_codec.Apply(msg, this);
else
{
JObject jo = new JObject();
@@ -78,73 +82,5 @@ namespace TimberWinR.Inputs
}
Finished();
}
// return true to cancel codec
private void applyMultilineCodec(string msg)
{
if (_codec.Re == null)
_codec.Re = new Regex(_codec.Pattern);
Match match = _codec.Re.Match(msg);
bool isMatch = (match.Success && !_codec.Negate) || (!match.Success && _codec.Negate);
switch (_codec.What)
{
case Codec.WhatType.previous:
if (isMatch)
{
if (_multiline == null)
_multiline = new List<string>();
_multiline.Add(msg);
}
else // No Match
{
if (_multiline != null)
{
string single = string.Join("\n", _multiline.ToArray());
_multiline = null;
JObject jo = new JObject();
jo["message"] = single;
jo.Add("tags", new JArray(_codec.MultilineTag));
AddDefaultFields(jo);
ProcessJson(jo);
}
_multiline = new List<string>();
_multiline.Add(msg);
}
break;
case Codec.WhatType.next:
if (isMatch)
{
if (_multiline == null)
_multiline = new List<string>();
_multiline.Add(msg);
}
else // No match
{
if (_multiline != null)
{
_multiline.Add(msg);
string single = string.Join("\n", _multiline.ToArray());
_multiline = null;
JObject jo = new JObject();
jo["message"] = single;
jo.Add("tags", new JArray(_codec.MultilineTag));
AddDefaultFields(jo);
ProcessJson(jo);
}
else
{
JObject jo = new JObject();
jo["message"] = msg;
AddDefaultFields(jo);
ProcessJson(jo);
}
}
break;
}
}
}
}

View File

@@ -0,0 +1,264 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Configuration;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using Interop.MSUtil;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
using NLog;
using NLog.LayoutRenderers;
using TimberWinR.Codecs;
using TimberWinR.Parser;
namespace TimberWinR.Inputs
{
/// <summary>
/// Tail a file.
/// </summary>
public class TailFileListener : InputListener
{
private int _pollingIntervalInSeconds;
private TimberWinR.Parser.TailFile _arguments;
private long _receivedMessages;
private Dictionary<string, Int64> _logFileMaxRecords;
private Dictionary<string, DateTime> _logFileCreationTimes;
private Dictionary<string, DateTime> _logFileSampleTimes;
private Dictionary<string, long> _logFileSizes;
private CodecArguments _codecArguments;
private ICodec _codec;
public bool Stop { get; set; }
public TailFileListener(TimberWinR.Parser.TailFile arguments, CancellationToken cancelToken)
: base(cancelToken, "Win32-TailLog")
{
Stop = false;
_codecArguments = arguments.CodecArguments;
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
_codec = new Multiline(_codecArguments);
_logFileMaxRecords = new Dictionary<string, Int64>();
_logFileCreationTimes = new Dictionary<string, DateTime>();
_logFileSampleTimes = new Dictionary<string, DateTime>();
_logFileSizes = new Dictionary<string, long>();
_receivedMessages = 0;
_arguments = arguments;
_pollingIntervalInSeconds = arguments.Interval;
foreach (string srcFile in _arguments.Location.Split(','))
{
string file = srcFile.Trim();
Task.Factory.StartNew(() => TailFileWatcher(file));
}
}
public override void Shutdown()
{
LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType);
Stop = true;
base.Shutdown();
}
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("log",
new JObject(
new JProperty("messages", _receivedMessages),
new JProperty("type", InputType),
new JProperty("location", _arguments.Location),
new JProperty("logSource", _arguments.LogSource),
new JProperty("recurse", _arguments.Recurse),
new JProperty("files",
new JArray(from f in _logFileMaxRecords.Keys
select new JValue(f))),
new JProperty("fileSampleTimes",
new JArray(from f in _logFileSampleTimes.Values
select new JValue(f))),
new JProperty("fileSizes",
new JArray(from f in _logFileSizes.Values
select new JValue(f))),
new JProperty("fileIndices",
new JArray(from f in _logFileMaxRecords.Values
select new JValue(f))),
new JProperty("fileCreationDates",
new JArray(from f in _logFileCreationTimes.Values
select new JValue(f)))
)));
if (_codecArguments != null)
{
var cp = new JProperty("codec",
new JArray(
new JObject(
new JProperty("type", _codecArguments.Type.ToString()),
new JProperty("what", _codecArguments.What.ToString()),
new JProperty("negate", _codecArguments.Negate),
new JProperty("multilineTag", _codecArguments.MultilineTag),
new JProperty("pattern", _codecArguments.Pattern))));
json.Add(cp);
}
return json;
}
private void TailFileContents(string fileName, long offset)
{
using (StreamReader reader = new StreamReader(new FileStream(fileName,
FileMode.Open, FileAccess.Read, FileShare.ReadWrite)))
{
//start at the end of the file
long lastMaxOffset = offset;
//if the file size has not changed, idle
if (reader.BaseStream.Length == lastMaxOffset)
return;
//seek to the last max offset
reader.BaseStream.Seek(lastMaxOffset, SeekOrigin.Begin);
//read out of the file until the EOF
string line = "";
long lineOffset = 0;
while ((line = reader.ReadLine()) != null)
{
if (string.IsNullOrEmpty(line))
continue;
long index = lastMaxOffset + lineOffset;
string text = line;
string logFileName = fileName;
var json = new JObject();
if (json["logSource"] == null)
{
if (string.IsNullOrEmpty(_arguments.LogSource))
json.Add(new JProperty("logSource", fileName));
else
json.Add(new JProperty("logSource", _arguments.LogSource));
}
json["Text"] = line;
json["Index"] = index;
json["LogFileName"] = fileName;
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
{
_codec.Apply(line, this);
Interlocked.Increment(ref _receivedMessages);
}
else
{
ProcessJson(json);
Interlocked.Increment(ref _receivedMessages);
}
lineOffset += line.Length;
}
//update the last max offset
lastMaxOffset = reader.BaseStream.Position;
}
}
// One thread for each kind of file to watch, i.e. "*.log,*.txt" would be two separate
// threads.
private void TailFileWatcher(string fileToWatch)
{
Dictionary<string, string> _fnfmap = new Dictionary<string, string>();
using (var syncHandle = new ManualResetEventSlim())
{
// Execute the query
while (!Stop && !CancelToken.IsCancellationRequested)
{
try
{
if (!CancelToken.IsCancellationRequested)
{
string path = Path.GetDirectoryName(fileToWatch);
string name = Path.GetFileName(fileToWatch);
if (string.IsNullOrEmpty(path))
path = ".";
// Ok, we have a potential file filter here as 'fileToWatch' could be foo.log or *.log
SearchOption so = SearchOption.TopDirectoryOnly;
if (_arguments.Recurse == -1)
so = SearchOption.AllDirectories;
foreach (string fileName in Directory.GetFiles(path, name, so))
{
var dbe = LogsFileDatabase.LookupLogFile(fileName);
FileInfo fi = new FileInfo(dbe.FileName);
//LogManager.GetCurrentClassLogger().Info("Located File: {0}, New: {1}", dbe.FileName, dbe.NewFile);
long length = fi.Length;
bool logHasRolled = false;
if (fi.Length < dbe.Size || fi.CreationTimeUtc != dbe.CreationTimeUtc)
{
LogManager.GetCurrentClassLogger().Info("Log has Rolled: {0}", dbe.FileName);
logHasRolled = true;
}
bool processWholeFile = logHasRolled || dbe.NewFile;
if (processWholeFile)
{
LogManager.GetCurrentClassLogger().Info("Process Whole File: {0}", dbe.FileName);
TailFileContents(dbe.FileName, 0);
}
else
{
TailFileContents(dbe.FileName, dbe.Size);
}
LogsFileDatabase.Update(dbe);
}
}
}
catch (FileNotFoundException fnfex)
{
string fn = fnfex.FileName;
if (!_fnfmap.ContainsKey(fn))
LogManager.GetCurrentClassLogger().Warn(fnfex.Message);
_fnfmap[fn] = fn;
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Error(ex);
}
finally
{
try
{
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
catch (OperationCanceledException)
{
}
catch (Exception ex1)
{
LogManager.GetCurrentClassLogger().Warn(ex1);
}
}
}
}
Finished();
}
}
}

View File

@@ -13,14 +13,15 @@ namespace TimberWinR.Inputs
public class UdpInputListener : InputListener
{
private readonly System.Net.Sockets.UdpClient _udpListener;
private IPEndPoint groupV4;
private IPEndPoint groupV6;
private readonly IPEndPoint groupV4;
private readonly IPEndPoint groupV6;
private Thread _listenThreadV4;
private Thread _listenThreadV6;
private readonly int _port;
private long _receivedMessages;
private long _parsedErrors;
private struct listenProfile
{
@@ -34,6 +35,7 @@ namespace TimberWinR.Inputs
new JProperty("udp",
new JObject(
new JProperty("port", _port),
new JProperty("errors", _parsedErrors),
new JProperty("messages", _receivedMessages)
)));
@@ -45,6 +47,9 @@ namespace TimberWinR.Inputs
{
_port = port;
groupV4 = new IPEndPoint(IPAddress.Any, 0);
groupV6 = new IPEndPoint(IPAddress.IPv6Any, 0);
LogManager.GetCurrentClassLogger().Info("Udp Input on Port {0} Ready", _port);
_receivedMessages = 0;
@@ -71,22 +76,25 @@ namespace TimberWinR.Inputs
private void StartListener(object useProfile)
{
var profile = (listenProfile)useProfile;
string lastMessage = "";
try
{
while (!CancelToken.IsCancellationRequested)
{
byte[] bytes = profile.client.Receive(ref profile.endPoint);
try
{
var data = Encoding.ASCII.GetString(bytes, 0, bytes.Length);
byte[] bytes = profile.client.Receive(ref profile.endPoint);
var data = Encoding.UTF8.GetString(bytes, 0, bytes.Length);
lastMessage = data;
JObject json = JObject.Parse(data);
ProcessJson(json);
_receivedMessages++;
}
catch (Exception ex1)
{
LogManager.GetCurrentClassLogger().Warn("Bad JSON: {0}", lastMessage);
LogManager.GetCurrentClassLogger().Warn(ex1);
_parsedErrors++;
}
}
_udpListener.Close();

View File

@@ -23,11 +23,11 @@ namespace TimberWinR.Inputs
public class W3CInputListener : InputListener
{
private readonly int _pollingIntervalInSeconds;
private readonly TimberWinR.Parser.W3CLog _arguments;
private readonly TimberWinR.Parser.W3CLogParameters _arguments;
private long _receivedMessages;
public bool Stop { get; set; }
public W3CInputListener(TimberWinR.Parser.W3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
public W3CInputListener(TimberWinR.Parser.W3CLogParameters arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
: base(cancelToken, "Win32-W3CLog")
{
_arguments = arguments;
@@ -153,7 +153,7 @@ namespace TimberWinR.Inputs
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}

View File

@@ -165,7 +165,7 @@ namespace TimberWinR.Inputs
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}

View File

@@ -26,6 +26,8 @@ namespace TimberWinR
public List<TcpInputListener> Tcps { get; set; }
public List<TcpInputListener> Udps { get; set; }
public List<InputListener> Listeners { get; set; }
public bool LiveMonitor { get; set; }
public DateTime StartedOn { get; set; }
public string JsonConfig { get; set; }
public string LogfileDir { get; set; }
@@ -60,11 +62,17 @@ namespace TimberWinR
Interlocked.Add(ref numMessages, count);
}
public Manager(string jsonConfigFile, string logLevel, string logfileDir, CancellationToken cancelToken)
public Manager()
{
LogsFileDatabase.Manager = this;
}
public Manager(string jsonConfigFile, string logLevel, string logfileDir, bool liveMonitor, CancellationToken cancelToken)
{
LogsFileDatabase.Manager = this;
StartedOn = DateTime.UtcNow;
LiveMonitor = liveMonitor;
var vfi = new FileInfo(jsonConfigFile);
@@ -105,7 +113,6 @@ namespace TimberWinR
LogManager.GetCurrentClassLogger()
.Info("Database Directory: {0}", LogsFileDatabase.Instance.DatabaseFileName);
try
{
// Is it a directory?
@@ -113,7 +120,7 @@ namespace TimberWinR
{
DirectoryInfo di = new DirectoryInfo(jsonConfigFile);
LogManager.GetCurrentClassLogger().Info("Initialized, Reading Configurations From {0}", di.FullName);
Config = Configuration.FromDirectory(jsonConfigFile);
Config = Configuration.FromDirectory(jsonConfigFile, cancelToken, this);
}
else
{
@@ -139,36 +146,40 @@ namespace TimberWinR
LogManager.GetCurrentClassLogger().Info("Log Directory {0}", logfileDir);
LogManager.GetCurrentClassLogger().Info("Logging Level: {0}", LogManager.GlobalThreshold);
// Read the Configuration file
if (Config != null)
ProcessConfiguration(cancelToken, Config);
}
public void ProcessConfiguration(CancellationToken cancelToken, Configuration config)
{
// Read the Configuration file
if (config != null)
{
if (Config.RedisOutputs != null)
if (config.RedisOutputs != null)
{
foreach (var ro in Config.RedisOutputs)
foreach (var ro in config.RedisOutputs)
{
var redis = new RedisOutput(this, ro, cancelToken);
Outputs.Add(redis);
}
}
if (Config.ElasticsearchOutputs != null)
if (config.ElasticsearchOutputs != null)
{
foreach (var ro in Config.ElasticsearchOutputs)
foreach (var ro in config.ElasticsearchOutputs)
{
var els = new ElasticsearchOutput(this, ro, cancelToken);
Outputs.Add(els);
}
}
if (Config.StdoutOutputs != null)
if (config.StdoutOutputs != null)
{
foreach (var ro in Config.StdoutOutputs)
foreach (var ro in config.StdoutOutputs)
{
var stdout = new StdoutOutput(this, ro, cancelToken);
Outputs.Add(stdout);
}
}
foreach (Parser.IISW3CLog iisw3cConfig in Config.IISW3C)
foreach (Parser.IISW3CLogParameters iisw3cConfig in config.IISW3C)
{
var elistner = new IISW3CInputListener(iisw3cConfig, cancelToken);
Listeners.Add(elistner);
@@ -176,7 +187,7 @@ namespace TimberWinR
output.Connect(elistner);
}
foreach (Parser.W3CLog iisw3cConfig in Config.W3C)
foreach (Parser.W3CLogParameters iisw3cConfig in config.W3C)
{
var elistner = new W3CInputListener(iisw3cConfig, cancelToken);
Listeners.Add(elistner);
@@ -184,7 +195,7 @@ namespace TimberWinR
output.Connect(elistner);
}
foreach (Parser.WindowsEvent eventConfig in Config.Events)
foreach (Parser.WindowsEvent eventConfig in config.Events)
{
var elistner = new WindowsEvtInputListener(eventConfig, cancelToken);
Listeners.Add(elistner);
@@ -192,7 +203,7 @@ namespace TimberWinR
output.Connect(elistner);
}
foreach (var logConfig in Config.Logs)
foreach (var logConfig in config.Logs)
{
var elistner = new LogsListener(logConfig, cancelToken);
Listeners.Add(elistner);
@@ -200,7 +211,15 @@ namespace TimberWinR
output.Connect(elistner);
}
foreach (var tcp in Config.Tcps)
foreach (var logConfig in config.TailFiles)
{
var elistner = new TailFileListener(logConfig, cancelToken);
Listeners.Add(elistner);
foreach (var output in Outputs)
output.Connect(elistner);
}
foreach (var tcp in config.Tcps)
{
var elistner = new TcpInputListener(cancelToken, tcp.Port);
Listeners.Add(elistner);
@@ -208,7 +227,7 @@ namespace TimberWinR
output.Connect(elistner);
}
foreach (var udp in Config.Udps)
foreach (var udp in config.Udps)
{
var elistner = new UdpInputListener(cancelToken, udp.Port);
Listeners.Add(elistner);
@@ -216,7 +235,7 @@ namespace TimberWinR
output.Connect(elistner);
}
foreach (var stdin in Config.Stdins)
foreach (var stdin in config.Stdins)
{
var elistner = new StdinListener(stdin, cancelToken);
Listeners.Add(elistner);
@@ -225,28 +244,28 @@ namespace TimberWinR
}
var computerName = System.Environment.MachineName + "." +
Microsoft.Win32.Registry.LocalMachine.OpenSubKey(
@"SYSTEM\CurrentControlSet\services\Tcpip\Parameters")
.GetValue("Domain", "")
.ToString();
Microsoft.Win32.Registry.LocalMachine.OpenSubKey(
@"SYSTEM\CurrentControlSet\services\Tcpip\Parameters")
.GetValue("Domain", "")
.ToString();
foreach (var output in Outputs)
{
var name = Assembly.GetExecutingAssembly().GetName();
JObject json = new JObject(
new JProperty("TimberWinR",
new JObject(
new JProperty("version", GetAssemblyByName("TimberWinR.ServiceHost").GetName().Version.ToString()),
new JProperty("host", computerName),
new JProperty("output", output.Name),
new JProperty("initialized", DateTime.UtcNow)
)));
new JProperty("TimberWinR",
new JObject(
new JProperty("version",
GetAssemblyByName("TimberWinR.ServiceHost").GetName().Version.ToString()),
new JProperty("host", computerName),
new JProperty("output", output.Name),
new JProperty("initialized", DateTime.UtcNow)
)));
json.Add(new JProperty("type", "Win32-TimberWinR"));
json.Add(new JProperty("host", computerName));
output.Startup(json);
}
}
}

View File

@@ -2,52 +2,91 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Elasticsearch.Net.ConnectionPool;
using Nest;
using Newtonsoft.Json.Linq;
using NLog;
using RapidRegex.Core;
using RestSharp;
using System.Text.RegularExpressions;
using Elasticsearch.Net.Serialization;
using Newtonsoft.Json;
namespace TimberWinR.Outputs
{
using System.Text.RegularExpressions;
public class Person
{
public string Firstname { get; set; }
public string Lastname { get; set; }
}
public partial class ElasticsearchOutput : OutputSender
{
private TimberWinR.Manager _manager;
private readonly int _port;
private readonly int _interval;
private readonly string[] _host;
private readonly string _protocol;
private int _hostIndex;
private readonly int _flushSize;
private readonly int _idleFlushTimeSeconds;
private readonly string[] _hosts;
private readonly string _protocol;
private readonly int _timeout;
private readonly object _locker = new object();
private readonly List<JObject> _jsonQueue;
private readonly int _numThreads;
private long _sentMessages;
private long _errorCount;
private Parser.ElasticsearchOutput eo;
private readonly int _maxQueueSize;
private readonly bool _queueOverflowDiscardOldest;
private Parser.ElasticsearchOutputParameters _parameters;
public bool Stop { get; set; }
/// <summary>
/// Get the bulk connection pool of hosts
/// </summary>
/// <returns></returns>
private ElasticClient getClient()
{
var nodes = new List<Uri>();
foreach (var host in _hosts)
{
var url = string.Format("http://{0}:{1}", host, _port);
nodes.Add(new Uri(url));
}
var pool = new StaticConnectionPool(nodes.ToArray());
var settings = new ConnectionSettings(pool)
.ExposeRawResponse();
public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutput eo, CancellationToken cancelToken)
var client = new ElasticClient(settings);
return client;
}
public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutputParameters parameters, CancellationToken cancelToken)
: base(cancelToken, "Elasticsearch")
{
_sentMessages = 0;
_errorCount = 0;
this.eo = eo;
_protocol = eo.Protocol;
_timeout = eo.Timeout;
_parameters = parameters;
_flushSize = parameters.FlushSize;
_idleFlushTimeSeconds = parameters.IdleFlushTimeInSeconds;
_protocol = parameters.Protocol;
_timeout = parameters.Timeout;
_manager = manager;
_port = eo.Port;
_interval = eo.Interval;
_host = eo.Host;
_hostIndex = 0;
_port = parameters.Port;
_interval = parameters.Interval;
_hosts = parameters.Host;
_jsonQueue = new List<JObject>();
_numThreads = eo.NumThreads;
_numThreads = parameters.NumThreads;
_maxQueueSize = parameters.MaxQueueSize;
_queueOverflowDiscardOldest = parameters.QueueOverflowDiscardOldest;
for (int i = 0; i < eo.NumThreads; i++)
for (int i = 0; i < parameters.NumThreads; i++)
{
Task.Factory.StartNew(ElasticsearchSender, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}
@@ -58,16 +97,20 @@ namespace TimberWinR.Outputs
JObject json = new JObject(
new JProperty("elasticsearch",
new JObject(
new JProperty("host", string.Join(",", _host)),
new JProperty("host", string.Join(",", _hosts)),
new JProperty("errors", _errorCount),
new JProperty("sent_messages", _sentMessages),
new JProperty("queued_messages", _jsonQueue.Count),
new JProperty("sentMmessageCount", _sentMessages),
new JProperty("queuedMessageCount", _jsonQueue.Count),
new JProperty("port", _port),
new JProperty("flushSize", _flushSize),
new JProperty("idleFlushTime", _idleFlushTimeSeconds),
new JProperty("interval", _interval),
new JProperty("threads", _numThreads),
new JProperty("maxQueueSize", _maxQueueSize),
new JProperty("overflowDiscardOldest", _queueOverflowDiscardOldest),
new JProperty("hosts",
new JArray(
from h in _host
from h in _hosts
select new JObject(
new JProperty("host", h)))))));
return json;
@@ -77,137 +120,145 @@ namespace TimberWinR.Outputs
//
private void ElasticsearchSender()
{
// Force an inital flush
DateTime lastFlushTime = DateTime.MinValue;
using (var syncHandle = new ManualResetEventSlim())
{
{
// Execute the query
while (!Stop)
{
{
if (!CancelToken.IsCancellationRequested)
{
try
{
JObject[] messages;
{
int messageCount = 0;
List<JObject> messages = new List<JObject>();
// Lets get whats in the queue
lock (_locker)
{
var count = _jsonQueue.Count;
messages = _jsonQueue.Take(count).ToArray();
_jsonQueue.RemoveRange(0, count);
if (messages.Length > 0)
_manager.IncrementMessageCount(messages.Length);
messageCount = _jsonQueue.Count;
// Time to flush?
if (messageCount >= _flushSize || (DateTime.UtcNow - lastFlushTime).Seconds >= _idleFlushTimeSeconds)
{
messages = _jsonQueue.Take(messageCount).ToList();
_jsonQueue.RemoveRange(0, messageCount);
if (messages.Count > 0)
_manager.IncrementMessageCount(messages.Count);
}
}
if (messages.Length > 0)
// We have some messages to work with
if (messages.Count > 0)
{
int numHosts = _host.Length;
while (numHosts-- > 0)
var client = getClient();
LogManager.GetCurrentClassLogger()
.Debug("Sending {0} Messages to {1}", messages.Count, string.Join(",", _hosts));
// This loop will process all messages we've taken from the queue
// that have the same index and type (an elasticsearch requirement)
do
{
try
{
// Get the next client
RestClient client = getClient();
if (client != null)
{
LogManager.GetCurrentClassLogger()
.Debug("Sending {0} Messages to {1}", messages.Length, client.BaseUrl);
// Grab all messages with same index and type (this is the whole point, group the same ones)
var bulkTypeName = this._parameters.GetTypeName(messages[0]);
var bulkIndexName = this._parameters.GetIndexName(messages[0]);
foreach (JObject json in messages)
{
var typeName = this.eo.GetTypeName(json);
var indexName = this.eo.GetIndexName(json);
var req =
new RestRequest(string.Format("/{0}/{1}/", indexName, typeName),
Method.POST);
req.AddParameter("text/json", json.ToString(), ParameterType.RequestBody);
req.RequestFormat = DataFormat.Json;
try
{
client.ExecuteAsync(req, response =>
{
if (response.StatusCode != HttpStatusCode.Created)
{
LogManager.GetCurrentClassLogger()
.Error("Failed to send: {0}", response.ErrorMessage);
Interlocked.Increment(ref _errorCount);
}
else
{
_sentMessages++;
GC.Collect();
}
});
}
catch (Exception error)
{
LogManager.GetCurrentClassLogger().Error(error);
Interlocked.Increment(ref _errorCount);
}
}
GC.Collect();
}
else
{
LogManager.GetCurrentClassLogger()
.Fatal("Unable to connect with any Elasticsearch hosts, {0}",
String.Join(",", _host));
Interlocked.Increment(ref _errorCount);
}
IEnumerable<JObject> bulkItems =
messages.TakeWhile(
message =>
String.Compare(bulkTypeName, _parameters.GetTypeName(message), false) == 0 &&
String.Compare(bulkIndexName, _parameters.GetIndexName(message), false) == 0);
// Send the message(s), if the are successfully sent, they
// are removed from the queue
lastFlushTime = transmitBulkData(bulkItems, bulkIndexName, bulkTypeName, client, lastFlushTime, messages);
GC.Collect();
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
break;
}
}
} while (messages.Count > 0);
}
GC.Collect();
if (!Stop)
{
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
{
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
}
}
catch (OperationCanceledException oce)
}
catch (OperationCanceledException)
{
break;
}
catch (Exception)
catch (Exception ex)
{
throw;
LogManager.GetCurrentClassLogger().Error(ex);
}
}
}
}
}
private RestClient getClient()
//
// Send the messages to Elasticsearch (bulk)
//
private DateTime transmitBulkData(IEnumerable<JObject> bulkItems, string bulkIndexName, string bulkTypeName,
ElasticClient client, DateTime lastFlushTime, List<JObject> messages)
{
if (_hostIndex >= _host.Length)
_hostIndex = 0;
int numTries = 0;
while (numTries < _host.Length)
var bulkRequest = new BulkRequest() {Refresh = true};
bulkRequest.Operations = new List<IBulkOperation>();
foreach (var json in bulkItems)
{
try
{
string url = string.Format("{0}://{1}:{2}", _protocol.Replace(":", ""), _host[_hostIndex], _port);
var client = new RestClient(url);
client.Timeout = _timeout;
_hostIndex++;
if (_hostIndex >= _host.Length)
_hostIndex = 0;
return client;
}
catch (Exception)
{
}
numTries++;
// ES requires a timestamp, add one if not present
var ts = json["@timestamp"];
if (ts == null)
json["@timestamp"] = DateTime.UtcNow;
var bi = new BulkIndexOperation<JObject>(json);
bi.Index = bulkIndexName;
bi.Type = bulkTypeName;
bulkRequest.Operations.Add(bi);
}
return null;
// The total messages processed for this operation.
int numMessages = bulkItems.Count();
var response = client.Bulk(bulkRequest);
if (!response.IsValid)
{
LogManager.GetCurrentClassLogger().Error("Failed to send: {0}", response);
Interlocked.Increment(ref _errorCount);
interlockedInsert(messages); // Put the messages back into the queue
}
else // Success!
{
lastFlushTime = DateTime.UtcNow;
LogManager.GetCurrentClassLogger()
.Info("Successfully sent {0} messages in a single bulk request", numMessages);
Interlocked.Add(ref _sentMessages, numMessages);
}
// Remove them from the working list
messages.RemoveRange(0, numMessages);
return lastFlushTime;
}
// Places messages back into the queue (for a future attempt)
private void interlockedInsert(List<JObject> messages)
{
lock (_locker)
{
_jsonQueue.InsertRange(0, messages);
if (_jsonQueue.Count > _maxQueueSize)
{
LogManager.GetCurrentClassLogger().Warn("Exceeded maximum queue depth");
}
}
}
@@ -221,6 +272,26 @@ namespace TimberWinR.Outputs
lock (_locker)
{
if (_jsonQueue.Count >= _maxQueueSize)
{
// If we've exceeded our queue size, and we're supposed to throw out the oldest objects first,
// then remove as many as necessary to get us under our limit
if (_queueOverflowDiscardOldest)
{
LogManager.GetCurrentClassLogger()
.Warn("Overflow discarding oldest {0} messages", _jsonQueue.Count - _maxQueueSize + 1);
_jsonQueue.RemoveRange(0, (_jsonQueue.Count - _maxQueueSize) + 1);
}
// Otherwise we're in a "discard newest" mode, and this is the newest message, so just ignore it
else
{
LogManager.GetCurrentClassLogger()
.Warn("Overflow discarding newest message: {0}", message);
return;
}
}
_jsonQueue.Add(jsonMessage);
}
}

View File

@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.Eventing.Reader;
using System.Linq;
using System.Linq.Expressions;
using System.Net.Sockets;
@@ -17,32 +18,126 @@ using TimberWinR.Parser;
namespace TimberWinR.Outputs
{
internal class BatchCounter
{
// Total number of times reached max batch count (indicates we are under pressure)
public int ReachedMaxBatchCountTimes { get; set; }
private readonly int[] _sampleQueueDepths;
private int _sampleCountIndex;
private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default)
private object _locker = new object();
private bool _warnedReachedMax;
private readonly int _maxBatchCount;
private readonly int _batchCount;
private int _totalSamples;
public int[] Samples()
{
return _sampleQueueDepths;
}
public BatchCounter(int batchCount, int maxBatchCount)
{
_batchCount = batchCount;
_maxBatchCount = maxBatchCount;
_sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE];
_sampleCountIndex = 0;
_totalSamples = 0;
ReachedMaxBatchCountTimes = 0;
}
public void SampleQueueDepth(int queueDepth)
{
lock (_locker)
{
if (_totalSamples < QUEUE_SAMPLE_SIZE)
_totalSamples++;
// Take a sample of the queue depth
if (_sampleCountIndex >= QUEUE_SAMPLE_SIZE)
_sampleCountIndex = 0;
_sampleQueueDepths[_sampleCountIndex++] = queueDepth;
}
}
public int AverageQueueDepth()
{
lock (_locker)
{
if (_totalSamples > 0)
{
var samples = _sampleQueueDepths.Take(_totalSamples);
int avg = (int) samples.Average();
return avg;
}
return 0;
}
}
// Sample the queue and adjust the batch count if needed (ramp up slowly)
public int UpdateCurrentBatchCount(int queueSize, int currentBatchCount)
{
if (currentBatchCount < _maxBatchCount && currentBatchCount < queueSize && AverageQueueDepth() > currentBatchCount)
{
currentBatchCount += Math.Max(_maxBatchCount/_batchCount, 1);
if (currentBatchCount >= _maxBatchCount && !_warnedReachedMax)
{
LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", currentBatchCount);
_warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset)
ReachedMaxBatchCountTimes++;
currentBatchCount = _maxBatchCount;
}
}
else // Reset to default
{
currentBatchCount = _batchCount;
_warnedReachedMax = false;
}
return currentBatchCount;
}
}
public class RedisOutput : OutputSender
{
public int QueueDepth
{
get { return _jsonQueue.Count; }
}
public long SentMessages
{
get { return _sentMessages; }
}
private readonly string _logstashIndexName;
private readonly int _port;
private readonly int _timeout;
private readonly object _locker = new object();
private readonly List<string> _jsonQueue;
// readonly Task _consumerTask;
private readonly List<string> _jsonQueue;
private readonly string[] _redisHosts;
private int _redisHostIndex;
private TimberWinR.Manager _manager;
private readonly int _batchCount;
private int _currentBatchCount;
private readonly int _maxBatchCount;
private readonly int _interval;
private readonly int _numThreads;
private readonly int _numThreads;
private long _sentMessages;
private long _errorCount;
private long _redisDepth;
private int _maxQueueSize;
private bool _queueOverflowDiscardOldest;
private DateTime? _lastErrorTimeUTC;
private readonly int _maxQueueSize;
private readonly bool _queueOverflowDiscardOldest;
private BatchCounter _batchCounter;
public bool Stop { get; set; }
/// <summary>
/// Get the next client
/// Get the next client from the list of hosts.
/// </summary>
/// <returns></returns>
private RedisClient getClient()
@@ -56,16 +151,17 @@ namespace TimberWinR.Outputs
try
{
RedisClient client = new RedisClient(_redisHosts[_redisHostIndex], _port, _timeout);
_redisHostIndex++;
if (_redisHostIndex >= _redisHosts.Length)
_redisHostIndex = 0;
return client;
}
catch (Exception)
{
}
finally
{
_redisHostIndex++;
if (_redisHostIndex >= _redisHosts.Length)
_redisHostIndex = 0;
}
numTries++;
}
@@ -79,15 +175,21 @@ namespace TimberWinR.Outputs
new JObject(
new JProperty("host", string.Join(",", _redisHosts)),
new JProperty("errors", _errorCount),
new JProperty("redis_depth", _redisDepth),
new JProperty("sent_messages", _sentMessages),
new JProperty("queued_messages", _jsonQueue.Count),
new JProperty("lastErrorTimeUTC", _lastErrorTimeUTC),
new JProperty("redisQueueDepth", _redisDepth),
new JProperty("sentMessageCount", _sentMessages),
new JProperty("queuedMessageCount", _jsonQueue.Count),
new JProperty("port", _port),
new JProperty("maxQueueSize", _maxQueueSize),
new JProperty("overflowDiscardOldest", _queueOverflowDiscardOldest),
new JProperty("interval", _interval),
new JProperty("threads", _numThreads),
new JProperty("batchcount", _batchCount),
new JProperty("currentBatchCount", _currentBatchCount),
new JProperty("reachedMaxBatchCountTimes", _batchCounter.ReachedMaxBatchCountTimes),
new JProperty("maxBatchCount", _maxBatchCount),
new JProperty("averageQueueDepth", _batchCounter.AverageQueueDepth()),
new JProperty("queueSamples", new JArray(_batchCounter.Samples())),
new JProperty("index", _logstashIndexName),
new JProperty("hosts",
new JArray(
@@ -97,25 +199,33 @@ namespace TimberWinR.Outputs
return json;
}
public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken)
public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutputParameters parameters, CancellationToken cancelToken)
: base(cancelToken, "Redis")
{
{
_redisDepth = 0;
_batchCount = ro.BatchCount;
_batchCount = parameters.BatchCount;
_maxBatchCount = parameters.MaxBatchCount;
// Make sure maxBatchCount is larger than batchCount
if (_maxBatchCount < _batchCount)
_maxBatchCount = _batchCount*10;
_manager = manager;
_redisHostIndex = 0;
_redisHosts = ro.Host;
_redisHosts = parameters.Host;
_jsonQueue = new List<string>();
_port = ro.Port;
_timeout = ro.Timeout;
_logstashIndexName = ro.Index;
_interval = ro.Interval;
_numThreads = ro.NumThreads;
_port = parameters.Port;
_timeout = parameters.Timeout;
_logstashIndexName = parameters.Index;
_interval = parameters.Interval;
_numThreads = parameters.NumThreads;
_errorCount = 0;
_maxQueueSize = ro.MaxQueueSize;
_queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest;
for (int i = 0; i < ro.NumThreads; i++)
_lastErrorTimeUTC = null;
_maxQueueSize = parameters.MaxQueueSize;
_queueOverflowDiscardOldest = parameters.QueueOverflowDiscardOldest;
_batchCounter = new BatchCounter(_batchCount, _maxBatchCount);
_currentBatchCount = _batchCount;
for (int i = 0; i < parameters.NumThreads; i++)
{
var redisThread = new Task(RedisSender, cancelToken);
redisThread.Start();
@@ -181,8 +291,7 @@ namespace TimberWinR.Outputs
}
}
return drop;
}
}
//
// Pull off messages from the Queue, batch them up and send them all across
//
@@ -198,17 +307,23 @@ namespace TimberWinR.Outputs
try
{
string[] messages;
// Exclusively
lock (_locker)
{
messages = _jsonQueue.Take(_batchCount).ToArray();
_batchCounter.SampleQueueDepth(_jsonQueue.Count);
// Re-compute current batch size
_currentBatchCount = _batchCounter.UpdateCurrentBatchCount(_jsonQueue.Count, _currentBatchCount);
messages = _jsonQueue.Take(_currentBatchCount).ToArray();
_jsonQueue.RemoveRange(0, messages.Length);
if (messages.Length > 0)
_manager.IncrementMessageCount(messages.Length);
}
if (messages.Length > 0)
{
int numHosts = _redisHosts.Length;
bool sentSuccessfully = false;
while (numHosts-- > 0)
{
try
@@ -225,17 +340,24 @@ namespace TimberWinR.Outputs
try
{
_redisDepth = client.RPush(_logstashIndexName, messages);
_sentMessages += messages.Length;
_sentMessages += messages.Length;
client.EndPipe();
sentSuccessfully = true;
if (messages.Length > 0)
_manager.IncrementMessageCount(messages.Length);
}
catch (SocketException ex)
{
LogManager.GetCurrentClassLogger().Warn(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTimeUTC = DateTime.UtcNow;
}
finally
catch (Exception ex)
{
client.EndPipe();
}
LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTimeUTC = DateTime.UtcNow;
}
break;
}
else
@@ -244,6 +366,7 @@ namespace TimberWinR.Outputs
LogManager.GetCurrentClassLogger()
.Fatal("Unable to connect with any Redis hosts, {0}",
String.Join(",", _redisHosts));
_lastErrorTimeUTC = DateTime.UtcNow;
}
}
}
@@ -251,25 +374,40 @@ namespace TimberWinR.Outputs
{
LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTimeUTC = DateTime.UtcNow;
}
} // No more hosts to try.
if (!sentSuccessfully)
{
lock (_locker)
{
_jsonQueue.InsertRange(0, messages);
}
}
}
GC.Collect();
// GC.Collect();
if (!Stop)
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}
catch(ThreadAbortException)
{
break;
}
catch (Exception ex)
{
throw;
_lastErrorTimeUTC = DateTime.UtcNow;
Interlocked.Increment(ref _errorCount);
LogManager.GetCurrentClassLogger().Error(ex);
}
}
}
}
}
}
}
}

View File

@@ -17,7 +17,7 @@ namespace TimberWinR.Outputs
private long _sentMessages;
public bool Stop { get; set; }
public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutput eo, CancellationToken cancelToken)
public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutputParameters eo, CancellationToken cancelToken)
: base(cancelToken, "Stdout")
{
_sentMessages = 0;
@@ -34,7 +34,7 @@ namespace TimberWinR.Outputs
JObject json = new JObject(
new JProperty("stdout",
new JObject(
new JProperty("sent_messages", _sentMessages))));
new JProperty("sentMessageCount", _sentMessages))));
return json;
}
@@ -78,7 +78,7 @@ namespace TimberWinR.Outputs
if (!Stop)
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}

View File

@@ -255,7 +255,7 @@ namespace TimberWinR.Parser
public class Stdin : IValidateSchema
{
[JsonProperty(PropertyName = "codec")]
public Codec Codec { get; set; }
public CodecArguments CodecArguments { get; set; }
public void Validate()
{
@@ -263,7 +263,7 @@ namespace TimberWinR.Parser
}
}
public class Codec
public class CodecArguments
{
public enum CodecType
{
@@ -290,14 +290,44 @@ namespace TimberWinR.Parser
public Regex Re { get; set; }
public Codec()
public CodecArguments()
{
Negate = false;
MultilineTag = "multiline";
}
}
public class Log : IValidateSchema
public class TailFile : IValidateSchema
{
[JsonProperty(PropertyName = "location")]
public string Location { get; set; }
[JsonProperty(PropertyName = "recurse")]
public int Recurse { get; set; }
[JsonProperty(PropertyName = "fields")]
public List<Field> Fields { get; set; }
[JsonProperty(PropertyName = "interval")]
public int Interval { get; set; }
[JsonProperty(PropertyName = "logSource")]
public string LogSource { get; set; }
[JsonProperty(PropertyName = "codec")]
public CodecArguments CodecArguments { get; set; }
public TailFile()
{
Fields = new List<Field>();
Fields.Add(new Field("LogFilename", "string"));
Fields.Add(new Field("Index", "integer"));
Fields.Add(new Field("Text", "string"));
Interval = 30;
}
public void Validate()
{
}
}
public class LogParameters : IValidateSchema
{
[JsonProperty(PropertyName = "location")]
public string Location { get; set; }
@@ -314,9 +344,9 @@ namespace TimberWinR.Parser
[JsonProperty(PropertyName = "logSource")]
public string LogSource { get; set; }
[JsonProperty(PropertyName = "codec")]
public Codec Codec { get; set; }
public CodecArguments CodecArguments { get; set; }
public Log()
public LogParameters()
{
Fields = new List<Field>();
Fields.Add(new Field("LogFilename", "string"));
@@ -331,12 +361,12 @@ namespace TimberWinR.Parser
}
}
public class Tcp : IValidateSchema
public class TcpParameters : IValidateSchema
{
[JsonProperty(PropertyName = "port")]
public int Port { get; set; }
public Tcp()
public TcpParameters()
{
Port = 5140;
}
@@ -348,12 +378,12 @@ namespace TimberWinR.Parser
}
public class Udp : IValidateSchema
public class UdpParameters : IValidateSchema
{
[JsonProperty(PropertyName = "port")]
public int Port { get; set; }
public Udp()
public UdpParameters()
{
Port = 5142;
}
@@ -363,7 +393,7 @@ namespace TimberWinR.Parser
}
}
public class W3CLog : IValidateSchema
public class W3CLogParameters : IValidateSchema
{
[JsonProperty(PropertyName = "location")]
public string Location { get; set; }
@@ -380,7 +410,7 @@ namespace TimberWinR.Parser
[JsonProperty(PropertyName = "fields")]
public List<Field> Fields { get; set; }
public W3CLog()
public W3CLogParameters()
{
CodePage = 0;
DtLines = 10;
@@ -398,7 +428,7 @@ namespace TimberWinR.Parser
}
public class IISW3CLog : IValidateSchema
public class IISW3CLogParameters : IValidateSchema
{
[JsonProperty(PropertyName = "location")]
public string Location { get; set; }
@@ -418,7 +448,7 @@ namespace TimberWinR.Parser
[JsonProperty(PropertyName = "fields")]
public List<Field> Fields { get; set; }
public IISW3CLog()
public IISW3CLogParameters()
{
CodePage = -2;
Recurse = 0;
@@ -464,7 +494,7 @@ namespace TimberWinR.Parser
}
}
public class ElasticsearchOutput
public class ElasticsearchOutputParameters
{
const string IndexDatePattern = "(%\\{(?<format>[^\\}]+)\\})";
@@ -482,9 +512,19 @@ namespace TimberWinR.Parser
public string Protocol { get; set; }
[JsonProperty(PropertyName = "interval")]
public int Interval { get; set; }
[JsonProperty(PropertyName = "flush_size")]
public int FlushSize { get; set; }
[JsonProperty(PropertyName = "idle_flush_time")]
public int IdleFlushTimeInSeconds { get; set; }
[JsonProperty(PropertyName = "max_queue_size")]
public int MaxQueueSize { get; set; }
[JsonProperty(PropertyName = "queue_overflow_discard_oldest")]
public bool QueueOverflowDiscardOldest { get; set; }
public ElasticsearchOutput()
public ElasticsearchOutputParameters()
{
FlushSize = 5000;
IdleFlushTimeInSeconds = 10;
Protocol = "http";
Port = 9200;
Index = "";
@@ -492,6 +532,8 @@ namespace TimberWinR.Parser
Timeout = 10000;
NumThreads = 1;
Interval = 1000;
QueueOverflowDiscardOldest = true;
MaxQueueSize = 50000;
}
public string GetIndexName(JObject json)
@@ -534,7 +576,7 @@ namespace TimberWinR.Parser
}
public class RedisOutput
public class RedisOutputParameters
{
[JsonProperty(PropertyName = "host")]
public string[] Host { get; set; }
@@ -546,6 +588,8 @@ namespace TimberWinR.Parser
public int Timeout { get; set; }
[JsonProperty(PropertyName = "batch_count")]
public int BatchCount { get; set; }
[JsonProperty(PropertyName = "max_batch_count")]
public int MaxBatchCount { get; set; }
[JsonProperty(PropertyName = "threads")]
public int NumThreads { get; set; }
[JsonProperty(PropertyName = "interval")]
@@ -555,13 +599,14 @@ namespace TimberWinR.Parser
[JsonProperty(PropertyName = "queue_overflow_discard_oldest")]
public bool QueueOverflowDiscardOldest { get; set; }
public RedisOutput()
public RedisOutputParameters()
{
Port = 6379;
Index = "logstash";
Host = new string[] { "localhost" };
Timeout = 10000;
BatchCount = 10;
MaxBatchCount = BatchCount*10;
NumThreads = 1;
Interval = 5000;
QueueOverflowDiscardOldest = true;
@@ -569,12 +614,12 @@ namespace TimberWinR.Parser
}
}
public class StdoutOutput
public class StdoutOutputParameters
{
[JsonProperty(PropertyName = "interval")]
public int Interval { get; set; }
public StdoutOutput()
public StdoutOutputParameters()
{
Interval = 1000;
}
@@ -583,13 +628,13 @@ namespace TimberWinR.Parser
public class OutputTargets
{
[JsonProperty("Redis")]
public RedisOutput[] Redis { get; set; }
public RedisOutputParameters[] Redis { get; set; }
[JsonProperty("Elasticsearch")]
public ElasticsearchOutput[] Elasticsearch { get; set; }
public ElasticsearchOutputParameters[] Elasticsearch { get; set; }
[JsonProperty("Stdout")]
public StdoutOutput[] Stdout { get; set; }
public StdoutOutputParameters[] Stdout { get; set; }
}
public class InputSources
@@ -598,19 +643,22 @@ namespace TimberWinR.Parser
public WindowsEvent[] WindowsEvents { get; set; }
[JsonProperty("Logs")]
public Log[] Logs { get; set; }
public LogParameters[] Logs { get; set; }
[JsonProperty("TailFiles")]
public TailFile[] TailFiles { get; set; }
[JsonProperty("Tcp")]
public Tcp[] Tcps { get; set; }
public TcpParameters[] Tcps { get; set; }
[JsonProperty("Udp")]
public Udp[] Udps { get; set; }
public UdpParameters[] Udps { get; set; }
[JsonProperty("IISW3CLogs")]
public IISW3CLog[] IISW3CLogs { get; set; }
public IISW3CLogParameters[] IISW3CLogs { get; set; }
[JsonProperty("W3CLogs")]
public W3CLog[] W3CLogs { get; set; }
public W3CLogParameters[] W3CLogs { get; set; }
[JsonProperty("Stdin")]
public Stdin[] Stdins { get; set; }

View File

@@ -3,10 +3,18 @@
A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service.
Version History
### 1.3.19.1 - 03/03/2015
1. Added new Redis parameter _max\_batch\_count_ which increases the _batch\_count_ dynamically over time
to handle input flooding. Default is _batch\_count_ * 10
### 1.3.19.0 - 01/12/2015
### 1.3.19.0 - 02/26/2015
1. Added support for Multiline codecs for Stdin and Logs listeners, addresses issue #23
1. Added support for Multiline codecs for Stdin and Logs listeners, closes issue [#23](https://github.com/Cimpress-MCP/TimberWinR/issues/23)
2. Added new TailFiles input type which uses a native implementation (more-efficient) than using LogParser's Log
3. Updated Udp input listner to use UTF8 Encoding rather than ASCII
4. Reduced noisy complaint about missing log files for Logs listener
5. Fixed bug when tailing non-existent log files which resulted in high cpu-usage.
6. Added feature to watch the configuration directory
### 1.3.18.0 - 12/22/2014

View File

@@ -34,6 +34,9 @@
<Reference Include="csredis">
<HintPath>..\packages\csredis.1.4.7.1\lib\net40\csredis.dll</HintPath>
</Reference>
<Reference Include="Elasticsearch.Net">
<HintPath>..\packages\Elasticsearch.Net.1.3.1\lib\Elasticsearch.Net.dll</HintPath>
</Reference>
<Reference Include="Interop.MSUtil, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<EmbedInteropTypes>False</EmbedInteropTypes>
@@ -47,6 +50,9 @@
<HintPath>..\packages\MaxMind.GeoIP2.0.4.0.0\lib\net40\MaxMind.GeoIP2.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Nest">
<HintPath>..\packages\NEST.1.3.1\lib\Nest.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Newtonsoft.Json.6.0.4\lib\net40\Newtonsoft.Json.dll</HintPath>
@@ -76,6 +82,7 @@
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Codecs\Multiline.cs" />
<Compile Include="Configuration.cs" />
<Compile Include="ConfigurationErrors.cs" />
<Compile Include="Diagnostics\Diagnostics.cs" />
@@ -85,8 +92,11 @@
<Compile Include="Filters\GeoIPFilter.cs" />
<Compile Include="Filters\JsonFilter.cs" />
<Compile Include="Filters\MutateFilter.cs" />
<Compile Include="ICodec.cs" />
<Compile Include="Inputs\FieldDefinitions.cs" />
<Compile Include="Inputs\IISW3CRowReader.cs" />
<Compile Include="Inputs\LogsFileDatabase.cs" />
<Compile Include="Inputs\TailFileListener.cs" />
<Compile Include="Inputs\UdpInputListener.cs" />
<Compile Include="Inputs\W3CInputListener.cs" />
<Compile Include="Inputs\IISW3CInputListener.cs" />
@@ -126,6 +136,7 @@
<None Include="mdocs\DateFilter.md" />
<None Include="mdocs\Filters.md" />
<None Include="mdocs\GeoIPFilter.md" />
<None Include="mdocs\TailFiles.md" />
<None Include="mdocs\UdpInput.md" />
<None Include="mdocs\W3CInput.md" />
<None Include="mdocs\JsonFilter.md" />

View File

@@ -8,6 +8,10 @@ The following parameters are allowed when configuring the Codec.
| *type* | enum |Codec type 'multiline' | Must be 'multiline' | |
| *pattern* | regex |Regular expression to be matched | Must be legal .NET Regex | |
| *what* | enum |Value can be previous or next | If the pattern matched, does event belong to the next or previous event? | |
| *negate* | bool |Inverts the pattern sense | If true, a message not matching the pattern will constitute a match of the multiline filter and the what will be applied. (vice-versa is also true) | false |
| *multiline_tag* | string |Tag to be added when multiline conversion is applied | | multiline |
This codec applies to [Logs](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/Logs.md) and [Stdin](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/StdinInput.md) only.
Example Input: Mutliline input log file
@@ -20,9 +24,10 @@ Example Input: Mutliline input log file
"location": "C:\\Logs1\\multiline.log",
"recurse": -1,
"codec": {
"type": "multiline",
"pattern": "(^.+Exception: .+)|(^\\s+at .+)|(^\\s+... \\d+ more)|(^\\s*Caused by:.+)",
"what": "previous"
"negate": false,
"type": "multiline",
"pattern": "(^.+Exception: .+)|(^\\s+at .+)|(^\\s+... \\d+ more)|(^\\s*Caused by:.+)",
"what": "previous"
}
}
}

View File

@@ -7,11 +7,15 @@ The following parameters are allowed when configuring the Redis output.
| Parameter | Type | Description | Details | Default |
| :-------------|:---------|:------------------------------------------------------------| :--------------------------- | :-- |
| *threads* | string | Location of log files(s) to monitor | Number of worker theads to send messages | 1 |
| *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 |
| *index* | string | The index name to use | index used/created | logstash-yyyy.dd.mm |
| *host* | [string] | The hostname(s) of your Elasticsearch server(s) | IP or DNS name | |
| *port* | integer | Redis port number | This port must be open | 9200 |
| *flush_size* | integer | Maximum number of messages before flushing | | 50000 |
| *host* | [string] | Array of hostname(s) of your Elasticsearch server(s) | IP or DNS name | |
| *idle_flush_time* | integer | Maximum number of seconds elapsed before triggering a flush | | 10 |
| *index* | [string] | The index name to use | index used/created | logstash-yyyy.dd.mm |
| *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 |
| *max_queue_size* | integer | Maximum Elasticsearch queue depth | | 50000 |
| *port* | integer | Elasticsearch port number | This port must be open | 9200 |
| *queue_overflow_discard_oldest* | bool | If true, discard oldest messages when max_queue_size reached otherwise discard newest | | true |
| *threads* | [string] | Number of Threads | Number of worker threads processing messages | 1 |
### Index parameter
If you want to output your data everyday to a new index, use following index format: "index-%{yyyy.MM.dd}". Here date format could be any forwat which you need.
@@ -26,7 +30,7 @@ Example Input:
"threads": 1,
"interval": 5000,
"host": [
"tstlexiceapp006.vistaprint.svc"
"tstlexiceapp006.mycompany.svc"
]
}
]

View File

@@ -61,7 +61,7 @@ The resulting output would be:
```
{
"type": "Win32-FileLog",
"ComputerName": "dev.vistaprint.net",
"ComputerName": "dev.mycompany.net",
"Text": "{\"Email\":\"james@example.com\",\"Active\":true,\"CreatedDate\":\"2013-01-20T00:00:00Z\",\"Roles\":[\"User\",\"Admin\"]}",
"stuff": {
"Email": "james@example.com",

View File

@@ -8,6 +8,7 @@ The following parameters are allowed when configuring WindowsEvents.
| Parameter | Type | Description | Details | Default |
| :---------------- |:---------------| :----------------------------------------------------------------------- | :--------------------------- | :-- |
| *location* | string |Location of file(s) to monitor | Path to text file(s) including wildcards. | |
| *logSource* | string |Source name | Used for conditions | |
| *recurse* | integer |Max subdirectory recursion level. | 0 disables subdirectory recursion; -1 enables unlimited recursion. | 0 |
| *splitLongLines* | boolean |Behavior when event messages or event category names cannot be resolved. |When a text line is longer than 128K characters, the format truncates the line and either discards the remaining of the line (when this parameter is set to "false"), or processes the remainder of the line as a new line (when this parameter is set to "true").| false |
| *iCodepage* | integer |Codepage of the text file. | 0 is the system codepage, -1 is UNICODE. | 0 |
@@ -21,6 +22,7 @@ Example Input: Monitors all files (recursively) located at C:\Logs1\ matching *.
"Inputs": {
"Logs": [
{
"logSource": "log files",
"location": "C:\\Logs1\\*.log",
"recurse": -1
}

View File

@@ -8,6 +8,8 @@ The following parameters are allowed when configuring the Redis output.
| Parameter | Type | Description | Details | Default |
| :-------------|:---------|:------------------------------------------------------------| :--------------------------- | :-- |
| *threads* | string | Location of log files(s) to monitor | Number of worker theads to send messages | 1 |
| *batch_count* | integer | Sent as a single message | Number of messages to aggregate | 10 |
| *max_batch_count* | integer | Dynamically adjusted count maximum | Increases over time | batch_count*10 |
| *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 |
| *index* | string | The name of the redis list | logstash index name | logstash |
| *host* | [string] | The hostname(s) of your Redis server(s) | IP or DNS name | |
@@ -26,7 +28,7 @@ Example Input:
"interval": 5000,
"batch_count": 500,
"host": [
"tstlexiceapp006.vistaprint.svc"
"tstlexiceapp006.mycompany.svc"
]
}
]

View File

@@ -3,7 +3,12 @@
The Stdin Input will read from the console (Console.ReadLine) and build a simple message for testing.
## Parameters
There are no Parameters at this time.
The following parameters are allowed when configuring WindowsEvents.
| Parameter | Type | Description | Details | Default |
| :---------------- |:---------------| :----------------------------------------------------------------------- | :--------------------------- | :-- |
| [codec](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/Codec.md) | object | Codec to use |
```json
{
@@ -26,5 +31,3 @@ A field: "type": "Win32-Stdin" is automatically appended, and the entire JSON is
| ---- |:-----| :-----------------------------------------------------------------------|
| type | STRING |Win32-Stdin |
| message | STRING | The message typed in |
| [codec](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/Codec.md) | object | Codec to use |

View File

@@ -0,0 +1,41 @@
# Input: TailFiles
The TailFiles input will monitor a log (text) file similar to how a Linux "tail -f" command works. This uses
a native implementation rather than uses LogParser
## Parameters
The following parameters are allowed when configuring WindowsEvents.
| Parameter | Type | Description | Details | Default |
| :---------------- |:---------------| :----------------------------------------------------------------------- | :--------------------------- | :-- |
| *location* | string |Location of file(s) to monitor | Path to text file(s) including wildcards. | |
| *logSource* | string |Source name | Used for conditions | |
| *recurse* | integer |Max subdirectory recursion level. | 0 disables subdirectory recursion; -1 enables unlimited recursion. | 0 |
| *interval* | integer |Polling interval in seconds | Defaults every 60 seconds | 60 |
| [codec](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/Codec.md) | object | Codec to use |
Example Input: Monitors all files (recursively) located at C:\Logs1\ matching *.log as a pattern. I.e. C:\Logs1\foo.log, C:\Logs1\Subdir\Log2.log, etc.
```json
{
"TimberWinR": {
"Inputs": {
"TailFiles": [
{
"logSource": "log files",
"location": "C:\\Logs1\\*.log",
"recurse": -1
}
]
}
}
}
```
## Fields
After a successful parse of an event, the following fields are added:
| Name | Type | Description |
| ---- |:-----| :-----------|
| LogFilename | STRING |Full path of the file containing this line |
| Index | INTEGER | Line number |
| Text | STRING | Text line content |

View File

@@ -1,8 +1,10 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="csredis" version="1.4.7.1" targetFramework="net40" />
<package id="Elasticsearch.Net" version="1.3.1" targetFramework="net40" />
<package id="MaxMind.Db" version="0.2.3.0" targetFramework="net40" />
<package id="MaxMind.GeoIP2" version="0.4.0.0" targetFramework="net40" />
<package id="NEST" version="1.3.1" targetFramework="net40" />
<package id="Newtonsoft.Json" version="6.0.4" targetFramework="net40" />
<package id="NLog" version="3.1.0.0" targetFramework="net40" />
<package id="RapidRegex.Core" version="1.0.0.2" targetFramework="net40" />

View File

@@ -0,0 +1,8 @@
$packageName = 'TimberWinR-${version}' # arbitrary name for the package, used in messages
$installerType = 'msi' #only one of these: exe, msi, msu
$url = 'http://www.ericfontana.com/TimberWinR/TimberWinR-${version}.0.msi' # download url
$silentArgs = '${PROJECTGUID} /quiet'
$validExitCodes = @(0) #please insert other valid exit codes here, exit codes for ms http://msdn.microsoft.com/en-us/library/aa368542(VS.85).aspx
UnInstall-ChocolateyPackage "$packageName" "$installerType" "$silentArgs" "$url" -validExitCodes $validExitCodes

Binary file not shown.

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

BIN
packages/NEST.1.3.1/NEST.1.3.1.nupkg vendored Normal file

Binary file not shown.

12447
packages/NEST.1.3.1/lib/Nest.XML vendored Normal file

File diff suppressed because it is too large Load Diff

BIN
packages/NEST.1.3.1/lib/Nest.dll vendored Normal file

Binary file not shown.

BIN
packages/NEST.1.3.1/lib/Nest.pdb vendored Normal file

Binary file not shown.