Added ability to ship directly to elasticsearch.

This commit is contained in:
Eric Fontana
2014-08-04 08:25:28 -04:00
parent af7e240c3d
commit 693c74d8d0
25 changed files with 17648 additions and 12 deletions

View File

@@ -1,10 +1,11 @@
TimberWinR
==========
A Native Windows to Redis Logstash Agent which runs as a service.
A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service.
## Why have TimberWinR?
TimberWinR is a native .NET implementation utilizing Microsoft's [LogParser](http://technet.microsoft.com/en-us/scriptcenter/dd919274.aspx). This means
no JVM/JRuby is required, and LogParser does all the heavy lifting. TimberWinR collects
the data from LogParser and ships it to Logstash via Redis.
the data from LogParser and ships it to Logstash via Redis (or can ship direcly to Elasticsearch)
## Basics
TimberWinR uses a configuration file to control how the logs are collected, filtered and shipped off.
@@ -33,6 +34,7 @@ represented as a JSON Property or Array.
## Supported Output Formats
1. [Redis](https://github.com/efontana/TimberWinR/blob/master/TimberWinR/mdocs/RedisOutput.md)
1. [Elasticsearch](https://github.com/efontana/TimberWinR/blob/master/TimberWinR/mdocs/ElasticsearchOutput.md)
## Sample Configuration
TimberWinR reads a JSON configuration file, an example file is shown here:

View File

@@ -63,7 +63,7 @@
},
{
"date": {
"condition": "json[\"timestamp\"] != null && [type] == \"Win32-FileLog\"",
"condition": "[type] == \"Win32-FileLog\"",
"match": [
"timestamp",
"MMM d HH:mm:sss",
@@ -78,9 +78,6 @@
"_comment": "Orion Rules",
"rename": [
"ComputerName", "Host",
"host", "Host",
"message","Message",
"type","Type",
"SID", "Username"
]
}
@@ -96,6 +93,15 @@
"tstlexiceapp006.vistaprint.svc"
]
}
],
"Elasticsearch": [
{
"threads": 1,
"interval": 5000,
"host": [
"tstlexiceapp003.vistaprint.svc"
]
}
]
}
}

View File

@@ -39,6 +39,13 @@ namespace TimberWinR
}
private List<ElasticsearchOutput> _elasticsearchOutputs = new List<ElasticsearchOutput>();
public IEnumerable<ElasticsearchOutput> ElasticsearchOutputs
{
get { return _elasticsearchOutputs; }
}
private List<Tcp> _tcps = new List<Tcp>();
public IEnumerable<Tcp> Tcps
{
@@ -111,7 +118,14 @@ namespace TimberWinR
}
if (x.TimberWinR.Outputs != null)
c._redisOutputs = x.TimberWinR.Outputs.Redis.ToList();
{
if (x.TimberWinR.Outputs.Redis != null)
c._redisOutputs = x.TimberWinR.Outputs.Redis.ToList();
if (x.TimberWinR.Outputs.Elasticsearch != null)
c._elasticsearchOutputs = x.TimberWinR.Outputs.Elasticsearch.ToList();
}
if (x.TimberWinR.Filters != null)
c._filters = x.TimberWinR.AllFilters.ToList();
@@ -146,6 +160,7 @@ namespace TimberWinR
_iisw3clogs = new List<IISW3CLog>();
_logs = new List<Log>();
_redisOutputs = new List<RedisOutput>();
_elasticsearchOutputs = new List<ElasticsearchOutput>();
_tcps = new List<Tcp>();
}

View File

@@ -37,6 +37,7 @@ namespace TimberWinR.Inputs
string msg = ToPrintable(line);
JObject jo = new JObject();
jo["message"] = msg;
AddDefaultFields(jo);
ProcessJson(jo);
}
else

View File

@@ -74,6 +74,15 @@ namespace TimberWinR
var redis = new RedisOutput(this, ro, cancelToken);
Outputs.Add(redis);
}
}
if (Config.ElasticsearchOutputs != null)
{
foreach (var ro in Config.ElasticsearchOutputs)
{
var els = new ElasticsearchOutput(this, ro, cancelToken);
Outputs.Add(els);
}
}
foreach (Parser.IISW3CLog iisw3cConfig in Config.IISW3C)

View File

@@ -0,0 +1,178 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using NLog;
using RestSharp;
namespace TimberWinR.Outputs
{
public partial class ElasticsearchOutput : OutputSender
{
private TimberWinR.Manager _manager;
private readonly int _port;
private readonly int _interval;
private readonly string[] _host;
private readonly string _protocol;
private readonly string _index;
private int _hostIndex;
private readonly int _timeout;
private readonly object _locker = new object();
private readonly List<JObject> _jsonQueue;
public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutput eo, CancellationToken cancelToken)
: base(cancelToken)
{
_protocol = eo.Protocol;
_timeout = eo.Timeout;
_manager = manager;
_port = eo.Port;
_host = eo.Host;
_index = eo.Index;
_hostIndex = 0;
_jsonQueue = new List<JObject>();
for (int i = 0; i < eo.NumThreads; i++)
{
var elsThread = new Task(ElasticsearchSender, cancelToken);
elsThread.Start();
}
}
//
// Pull off messages from the Queue, batch them up and send them all across
//
private void ElasticsearchSender()
{
while (!CancelToken.IsCancellationRequested)
{
JObject[] messages;
lock (_locker)
{
messages = _jsonQueue.Take(1).ToArray();
_jsonQueue.RemoveRange(0, messages.Length);
}
if (messages.Length > 0)
{
int numHosts = _host.Length;
while (numHosts-- > 0)
{
try
{
// Get the next client
RestClient client = getClient();
if (client != null)
{
LogManager.GetCurrentClassLogger()
.Debug("Sending {0} Messages to {1}", messages.Length, client.BaseUrl);
foreach (JObject json in messages)
{
string typeName = "Win32-Elasticsearch";
if (json["type"] != null)
typeName = json["type"].ToString();
string indexName = _index;
if (string.IsNullOrEmpty(indexName))
{
DateTime now = DateTime.UtcNow;
indexName = string.Format("logstash-{0}", DateTime.UtcNow.ToString("yyyy.MM.dd"));
}
var req = new RestRequest(string.Format("/{0}/{1}/", indexName, typeName), Method.POST);
req.AddParameter("text/json", json.ToString(), ParameterType.RequestBody);
req.RequestFormat = DataFormat.Json;
try
{
client.ExecuteAsync(req, response =>
{
if (response.StatusCode != HttpStatusCode.Created)
{
LogManager.GetCurrentClassLogger()
.Error("Failed to send: {0}", response.ErrorMessage);
}
});
}
catch (Exception error)
{
LogManager.GetCurrentClassLogger().Error(error);
}
}
}
else
{
LogManager.GetCurrentClassLogger()
.Fatal("Unable to connect with any Elasticsearch hosts, {0}",
String.Join(",", _host));
}
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Error(ex);
}
}
}
System.Threading.Thread.Sleep(_interval);
}
}
private RestClient getClient()
{
if (_hostIndex >= _host.Length)
_hostIndex = 0;
int numTries = 0;
while (numTries < _host.Length)
{
try
{
string url = string.Format("{0}://{1}:{2}", _protocol.Replace(":",""), _host[_hostIndex], _port);
var client = new RestClient(url);
client.Timeout = _timeout;
_hostIndex++;
if (_hostIndex >= _host.Length)
_hostIndex = 0;
return client;
}
catch (Exception)
{
}
numTries++;
}
return null;
}
protected override void MessageReceivedHandler(Newtonsoft.Json.Linq.JObject jsonMessage)
{
if (_manager.Config.Filters != null)
ApplyFilters(jsonMessage);
var message = jsonMessage.ToString();
LogManager.GetCurrentClassLogger().Debug(message);
lock (_locker)
{
_jsonQueue.Add(jsonMessage);
}
}
private void ApplyFilters(JObject json)
{
foreach (var filter in _manager.Config.Filters)
{
filter.Apply(json);
}
}
}
}

View File

@@ -81,7 +81,6 @@ namespace TimberWinR.Outputs
}
}
/// <summary>
/// Forward on Json message to Redis Logstash queue
/// </summary>

View File

@@ -392,7 +392,33 @@ namespace TimberWinR.Parser
}
}
public partial class RedisOutput
public class ElasticsearchOutput
{
[JsonProperty(PropertyName = "host")]
public string[] Host { get; set; }
[JsonProperty(PropertyName = "index")]
public string Index { get; set; }
[JsonProperty(PropertyName = "port")]
public int Port { get; set; }
[JsonProperty(PropertyName = "timeout")]
public int Timeout { get; set; }
[JsonProperty(PropertyName = "threads")]
public int NumThreads { get; set; }
[JsonProperty(PropertyName = "protocol")]
public string Protocol { get; set; }
public ElasticsearchOutput()
{
Protocol = "http";
Port = 9200;
Index = "";
Host = new string[] { "localhost" };
Timeout = 10000;
NumThreads = 1;
}
}
public class RedisOutput
{
[JsonProperty(PropertyName = "host")]
public string[] Host { get; set; }
@@ -425,6 +451,9 @@ namespace TimberWinR.Parser
{
[JsonProperty("Redis")]
public RedisOutput[] Redis { get; set; }
[JsonProperty("Elasticsearch")]
public ElasticsearchOutput[] Elasticsearch { get; set; }
}
public class InputSources

View File

@@ -49,6 +49,9 @@
<Reference Include="RapidRegex.Core">
<HintPath>..\packages\RapidRegex.Core.1.0.0.2\lib\net40\RapidRegex.Core.dll</HintPath>
</Reference>
<Reference Include="RestSharp">
<HintPath>..\packages\RestSharp.104.4.0\lib\net4\RestSharp.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
@@ -78,6 +81,7 @@
<Compile Include="Inputs\LogsListener.cs" />
<Compile Include="Inputs\WindowsEvtInputListener.cs" />
<Compile Include="Manager.cs" />
<Compile Include="Outputs\Elasticsearch.cs" />
<Compile Include="Outputs\OutputSender.cs" />
<Compile Include="Outputs\Redis.cs" />
<Compile Include="Parser.cs" />

View File

@@ -0,0 +1,33 @@
# Output: Elasticsearch
The Elasticsearch output passes on data directly to Elasticsearch.
## Parameters
The following parameters are allowed when configuring the Redis output.
| Parameter | Type | Description | Details | Default |
| :-------------|:---------|:------------------------------------------------------------| :--------------------------- | :-- |
| *threads* | string | Location of log files(s) to monitor | Number of worker theads to send messages | 1 |
| *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 |
| *index* | string | The index name to use | index used/created | logstash-yyyy.dd.mm |
| *host* | [string] | The hostname(s) of your Elasticsearch server(s) | IP or DNS name | |
| *port* | integer | Redis port number | This port must be open | 6379 |
Example Input:
```json
{
"TimberWinR": {
"Outputs": {
"Elasticsearch": [
{
"threads": 1,
"interval": 5000,
"host": [
"tstlexiceapp006.vistaprint.svc"
]
}
]
}
}
}
```

View File

@@ -4,4 +4,5 @@
<package id="Newtonsoft.Json" version="6.0.3" targetFramework="net40" />
<package id="NLog" version="3.1.0.0" targetFramework="net40" />
<package id="RapidRegex.Core" version="1.0.0.2" targetFramework="net40" />
<package id="RestSharp" version="104.4.0" targetFramework="net40" />
</packages>

Binary file not shown.

Binary file not shown.

File diff suppressed because it is too large Load Diff

Binary file not shown.

File diff suppressed because it is too large Load Diff

Binary file not shown.

File diff suppressed because it is too large Load Diff

Binary file not shown.

File diff suppressed because it is too large Load Diff

Binary file not shown.

File diff suppressed because it is too large Load Diff

Binary file not shown.

File diff suppressed because it is too large Load Diff

22
packages/RestSharp.104.4.0/readme.txt vendored Normal file
View File

@@ -0,0 +1,22 @@
*** IMPORTANT CHANGE IN RESTSHARP VERSION 103 ***
In 103.0, JSON.NET was removed as a dependency.
If this is still installed in your project and no other libraries depend on
it you may remove it from your installed packages.
There is one breaking change: the default Json*Serializer* is no longer
compatible with Json.NET. To use Json.NET for serialization, copy the code
from https://github.com/restsharp/RestSharp/blob/86b31f9adf049d7fb821de8279154f41a17b36f7/RestSharp/Serializers/JsonSerializer.cs
and register it with your client:
var client = new RestClient();
client.JsonSerializer = new YourCustomSerializer();
The default Json*Deserializer* is mostly compatible, but it does not support
all features which Json.NET has (like the ability to support a custom [JsonConverter]
by decorating a certain property with an attribute). If you need these features, you
must take care of the deserialization yourself to get it working.
If you run into any compatibility issues with deserialization,
please report it to http://groups.google.com/group/restsharp