Merge pull request #33 from Cimpress-MCP/bulk_elasticsearch

Re-factored to use Elasticsearch Bulk API
This commit is contained in:
Eric Fontana
2015-03-11 11:58:28 -04:00
15 changed files with 54109 additions and 118 deletions
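For context, the heart of this change is replacing one HTTP POST per message (via RestSharp) with a single NEST bulk call per batch. Below is a minimal sketch of that pattern, not the commit's code: it assumes NEST 1.3.x and a reachable local node, and the host, index, and type names are placeholders.

```csharp
using System;
using System.Collections.Generic;
using Elasticsearch.Net.ConnectionPool;
using Nest;
using Newtonsoft.Json.Linq;

class BulkSketch
{
    static void Main()
    {
        // One client over a static pool of nodes, as the new getClient() does.
        var pool = new StaticConnectionPool(new[] { new Uri("http://localhost:9200") });
        var client = new ElasticClient(new ConnectionSettings(pool));

        // Group documents into one bulk request instead of one POST each.
        var bulkRequest = new BulkRequest { Refresh = true, Operations = new List<IBulkOperation>() };
        foreach (var doc in new[] { JObject.Parse(@"{""msg"":""hello""}"), JObject.Parse(@"{""msg"":""world""}") })
        {
            bulkRequest.Operations.Add(new BulkIndexOperation<JObject>(doc)
            {
                Index = "logstash-2015.03.11",   // placeholder index
                Type = "Win32-FileLog"           // placeholder type
            });
        }

        // One round trip for the whole batch; response.IsValid gates retry logic.
        var response = client.Bulk(bulkRequest);
        Console.WriteLine("Bulk ok: {0}", response.IsValid);
    }
}
```

Batching this way trades a little latency (messages wait for a flush) for far fewer round trips, which is what the flush_size/idle_flush_time parameters introduced below control.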


@@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
 // You can specify all the values or you can default the Build and Revision Numbers
 // by using the '*' as shown below:
 // [assembly: AssemblyVersion("1.0.*")]
-[assembly: AssemblyVersion("1.3.19.1")]
-[assembly: AssemblyFileVersion("1.3.19.1")]
+[assembly: AssemblyVersion("1.3.20.0")]
+[assembly: AssemblyFileVersion("1.3.20.0")]


@@ -13,10 +13,10 @@ using System.Threading;
 namespace TimberWinR.UnitTests
 {
-    // [TestFixture]
+    [TestFixture]
     public class TestDynamicBatchCount
     {
-        [Test]
+        // [Test]
         public void TestDynamicBatch()
         {
             var mgr = new Manager();


@@ -2,52 +2,91 @@
 using System.Collections.Generic;
 using System.Linq;
 using System.Net;
 using System.Net.Sockets;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using Elasticsearch.Net;
+using Elasticsearch.Net.ConnectionPool;
+using Nest;
 using Newtonsoft.Json.Linq;
 using NLog;
 using RapidRegex.Core;
-using RestSharp;
-using System.Text.RegularExpressions;
+using Elasticsearch.Net.Serialization;
+using Newtonsoft.Json;

 namespace TimberWinR.Outputs
 {
+    using System.Text.RegularExpressions;
+
+    public class Person
+    {
+        public string Firstname { get; set; }
+        public string Lastname { get; set; }
+    }
+
     public partial class ElasticsearchOutput : OutputSender
     {
         private TimberWinR.Manager _manager;
         private readonly int _port;
         private readonly int _interval;
-        private readonly string[] _host;
-        private readonly string _protocol;
-        private int _hostIndex;
+        private readonly int _flushSize;
+        private readonly int _idleFlushTimeSeconds;
+        private readonly string[] _hosts;
+        private readonly string _protocol;
         private readonly int _timeout;
         private readonly object _locker = new object();
         private readonly List<JObject> _jsonQueue;
         private readonly int _numThreads;
         private long _sentMessages;
         private long _errorCount;
-        private Parser.ElasticsearchOutputParameters eo;
+        private readonly int _maxQueueSize;
+        private readonly bool _queueOverflowDiscardOldest;
+        private Parser.ElasticsearchOutputParameters _parameters;

         public bool Stop { get; set; }

+        /// <summary>
+        /// Get the bulk connection pool of hosts
+        /// </summary>
+        /// <returns></returns>
+        private ElasticClient getClient()
+        {
+            var nodes = new List<Uri>();
+            foreach (var host in _hosts)
+            {
+                var url = string.Format("http://{0}:{1}", host, _port);
+                nodes.Add(new Uri(url));
+            }
+            var pool = new StaticConnectionPool(nodes.ToArray());
+            var settings = new ConnectionSettings(pool)
+                .ExposeRawResponse();
+            var client = new ElasticClient(settings);
+            return client;
+        }
+
-        public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutputParameters eo, CancellationToken cancelToken)
+        public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutputParameters parameters, CancellationToken cancelToken)
             : base(cancelToken, "Elasticsearch")
         {
             _sentMessages = 0;
             _errorCount = 0;
-            this.eo = eo;
-            _protocol = eo.Protocol;
-            _timeout = eo.Timeout;
+            _parameters = parameters;
+            _flushSize = parameters.FlushSize;
+            _idleFlushTimeSeconds = parameters.IdleFlushTimeInSeconds;
+            _protocol = parameters.Protocol;
+            _timeout = parameters.Timeout;
             _manager = manager;
-            _port = eo.Port;
-            _interval = eo.Interval;
-            _host = eo.Host;
-            _hostIndex = 0;
+            _port = parameters.Port;
+            _interval = parameters.Interval;
+            _hosts = parameters.Host;
             _jsonQueue = new List<JObject>();
-            _numThreads = eo.NumThreads;
+            _numThreads = parameters.NumThreads;
+            _maxQueueSize = parameters.MaxQueueSize;
+            _queueOverflowDiscardOldest = parameters.QueueOverflowDiscardOldest;

-            for (int i = 0; i < eo.NumThreads; i++)
+            for (int i = 0; i < parameters.NumThreads; i++)
             {
                 Task.Factory.StartNew(ElasticsearchSender, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
             }
@@ -58,16 +97,20 @@ namespace TimberWinR.Outputs
             JObject json = new JObject(
                 new JProperty("elasticsearch",
                     new JObject(
-                        new JProperty("host", string.Join(",", _host)),
+                        new JProperty("host", string.Join(",", _hosts)),
                         new JProperty("errors", _errorCount),
                         new JProperty("sentMmessageCount", _sentMessages),
                         new JProperty("queuedMessageCount", _jsonQueue.Count),
                         new JProperty("port", _port),
+                        new JProperty("flushSize", _flushSize),
+                        new JProperty("idleFlushTime", _idleFlushTimeSeconds),
                         new JProperty("interval", _interval),
                         new JProperty("threads", _numThreads),
+                        new JProperty("maxQueueSize", _maxQueueSize),
+                        new JProperty("overflowDiscardOldest", _queueOverflowDiscardOldest),
                         new JProperty("hosts",
                             new JArray(
-                                from h in _host
+                                from h in _hosts
                                 select new JObject(
                                     new JProperty("host", h)))))));
             return json;
@@ -77,137 +120,145 @@ namespace TimberWinR.Outputs
         //
         private void ElasticsearchSender()
         {
+            // Force an inital flush
+            DateTime lastFlushTime = DateTime.MinValue;
+
             using (var syncHandle = new ManualResetEventSlim())
             {
                 // Execute the query
                 while (!Stop)
                 {
                     if (!CancelToken.IsCancellationRequested)
                     {
                         try
                         {
-                            JObject[] messages;
+                            int messageCount = 0;
+                            List<JObject> messages = new List<JObject>();
+
                             // Lets get whats in the queue
                             lock (_locker)
                             {
-                                var count = _jsonQueue.Count;
-                                messages = _jsonQueue.Take(count).ToArray();
-                                _jsonQueue.RemoveRange(0, count);
-                                if (messages.Length > 0)
-                                    _manager.IncrementMessageCount(messages.Length);
+                                messageCount = _jsonQueue.Count;
+                                // Time to flush?
+                                if (messageCount >= _flushSize || (DateTime.UtcNow - lastFlushTime).Seconds >= _idleFlushTimeSeconds)
+                                {
+                                    messages = _jsonQueue.Take(messageCount).ToList();
+                                    _jsonQueue.RemoveRange(0, messageCount);
+                                    if (messages.Count > 0)
+                                        _manager.IncrementMessageCount(messages.Count);
+                                }
                             }

-                            if (messages.Length > 0)
+                            // We have some messages to work with
+                            if (messages.Count > 0)
                             {
-                                int numHosts = _host.Length;
-                                while (numHosts-- > 0)
+                                var client = getClient();
+                                LogManager.GetCurrentClassLogger()
+                                    .Debug("Sending {0} Messages to {1}", messages.Count, string.Join(",", _hosts));
+
+                                // This loop will process all messages we've taken from the queue
+                                // that have the same index and type (an elasticsearch requirement)
+                                do
                                 {
                                     try
                                     {
-                                        // Get the next client
-                                        RestClient client = getClient();
-                                        if (client != null)
-                                        {
-                                            LogManager.GetCurrentClassLogger()
-                                                .Debug("Sending {0} Messages to {1}", messages.Length, client.BaseUrl);
-                                            foreach (JObject json in messages)
-                                            {
-                                                var typeName = this.eo.GetTypeName(json);
-                                                var indexName = this.eo.GetIndexName(json);
-                                                var req =
-                                                    new RestRequest(string.Format("/{0}/{1}/", indexName, typeName),
-                                                        Method.POST);
-                                                req.AddParameter("text/json", json.ToString(), ParameterType.RequestBody);
-                                                req.RequestFormat = DataFormat.Json;
-                                                try
-                                                {
-                                                    client.ExecuteAsync(req, response =>
-                                                    {
-                                                        if (response.StatusCode != HttpStatusCode.Created)
-                                                        {
-                                                            LogManager.GetCurrentClassLogger()
-                                                                .Error("Failed to send: {0}, code: {1}, descr: {2}, resp: {3}", response.ErrorMessage, response.StatusCode, response.StatusDescription, response.ResponseStatus);
-                                                            Interlocked.Increment(ref _errorCount);
-                                                        }
-                                                        else
-                                                        {
-                                                            _sentMessages++;
-                                                            GC.Collect();
-                                                        }
-                                                    });
-                                                }
-                                                catch (Exception error)
-                                                {
-                                                    LogManager.GetCurrentClassLogger().Error(error);
-                                                    Interlocked.Increment(ref _errorCount);
-                                                }
-                                            }
-                                            GC.Collect();
-                                        }
-                                        else
-                                        {
-                                            LogManager.GetCurrentClassLogger()
-                                                .Fatal("Unable to connect with any Elasticsearch hosts, {0}",
-                                                    String.Join(",", _host));
-                                            Interlocked.Increment(ref _errorCount);
-                                        }
+                                        // Grab all messages with same index and type (this is the whole point, group the same ones)
+                                        var bulkTypeName = this._parameters.GetTypeName(messages[0]);
+                                        var bulkIndexName = this._parameters.GetIndexName(messages[0]);
+                                        IEnumerable<JObject> bulkItems =
+                                            messages.TakeWhile(
+                                                message =>
+                                                    String.Compare(bulkTypeName, _parameters.GetTypeName(message), false) == 0 &&
+                                                    String.Compare(bulkIndexName, _parameters.GetIndexName(message), false) == 0);
+
+                                        // Send the message(s), if the are successfully sent, they
+                                        // are removed from the queue
+                                        lastFlushTime = transmitBulkData(bulkItems, bulkIndexName, bulkTypeName, client, lastFlushTime, messages);
                                         GC.Collect();
                                     }
                                     catch (Exception ex)
                                     {
                                         LogManager.GetCurrentClassLogger().Error(ex);
                                         Interlocked.Increment(ref _errorCount);
                                         break;
                                     }
-                                }
+                                } while (messages.Count > 0);
                             }

                             GC.Collect();

                             if (!Stop)
                             {
                                 syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
                             }
                         }
                         catch (OperationCanceledException)
                         {
                             break;
                         }
-                        catch (Exception)
+                        catch (Exception ex)
                         {
-                            throw;
+                            LogManager.GetCurrentClassLogger().Error(ex);
                         }
                     }
                 }
             }
         }

-        private RestClient getClient()
-        {
-            if (_hostIndex >= _host.Length)
-                _hostIndex = 0;
-            int numTries = 0;
-            while (numTries < _host.Length)
-            {
-                try
-                {
-                    string url = string.Format("{0}://{1}:{2}", _protocol.Replace(":", ""), _host[_hostIndex], _port);
-                    var client = new RestClient(url);
-                    client.Timeout = _timeout;
-                    _hostIndex++;
-                    if (_hostIndex >= _host.Length)
-                        _hostIndex = 0;
-                    return client;
-                }
-                catch (Exception)
-                {
-                }
-                numTries++;
-            }
-            return null;
-        }
+        //
+        // Send the messages to Elasticsearch (bulk)
+        //
+        private DateTime transmitBulkData(IEnumerable<JObject> bulkItems, string bulkIndexName, string bulkTypeName,
+            ElasticClient client, DateTime lastFlushTime, List<JObject> messages)
+        {
+            var bulkRequest = new BulkRequest() { Refresh = true };
+            bulkRequest.Operations = new List<IBulkOperation>();
+            foreach (var json in bulkItems)
+            {
+                // ES requires a timestamp, add one if not present
+                var ts = json["@timestamp"];
+                if (ts == null)
+                    json["@timestamp"] = DateTime.UtcNow;
+                var bi = new BulkIndexOperation<JObject>(json);
+                bi.Index = bulkIndexName;
+                bi.Type = bulkTypeName;
+                bulkRequest.Operations.Add(bi);
+            }
+
+            // The total messages processed for this operation.
+            int numMessages = bulkItems.Count();
+            var response = client.Bulk(bulkRequest);
+            if (!response.IsValid)
+            {
+                LogManager.GetCurrentClassLogger().Error("Failed to send: {0}", response);
+                Interlocked.Increment(ref _errorCount);
+                interlockedInsert(messages); // Put the messages back into the queue
+            }
+            else // Success!
+            {
+                lastFlushTime = DateTime.UtcNow;
+                LogManager.GetCurrentClassLogger()
+                    .Info("Successfully sent {0} messages in a single bulk request", numMessages);
+                Interlocked.Add(ref _sentMessages, numMessages);
+            }
+            // Remove them from the working list
+            messages.RemoveRange(0, numMessages);
+            return lastFlushTime;
+        }
+
+        // Places messages back into the queue (for a future attempt)
+        private void interlockedInsert(List<JObject> messages)
+        {
+            lock (_locker)
+            {
+                _jsonQueue.InsertRange(0, messages);
+                if (_jsonQueue.Count > _maxQueueSize)
+                {
+                    LogManager.GetCurrentClassLogger().Warn("Exceeded maximum queue depth");
+                }
+            }
+        }
@@ -221,6 +272,26 @@ namespace TimberWinR.Outputs
             lock (_locker)
             {
+                if (_jsonQueue.Count >= _maxQueueSize)
+                {
+                    // If we've exceeded our queue size, and we're supposed to throw out the oldest objects first,
+                    // then remove as many as necessary to get us under our limit
+                    if (_queueOverflowDiscardOldest)
+                    {
+                        LogManager.GetCurrentClassLogger()
+                            .Warn("Overflow discarding oldest {0} messages", _jsonQueue.Count - _maxQueueSize + 1);
+                        _jsonQueue.RemoveRange(0, (_jsonQueue.Count - _maxQueueSize) + 1);
+                    }
+                    // Otherwise we're in a "discard newest" mode, and this is the newest message, so just ignore it
+                    else
+                    {
+                        LogManager.GetCurrentClassLogger()
+                            .Warn("Overflow discarding newest message: {0}", message);
+                        return;
+                    }
+                }
                 _jsonQueue.Add(jsonMessage);
             }
         }
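The new sender loop above wakes every *interval* milliseconds and drains the queue only when a size or an idle-time threshold is crossed. A condensed, illustrative restatement of that trigger follows; the names are not the commit's. Note that the committed comparison uses TimeSpan.Seconds, which is the 0-59 seconds component of the elapsed span, so the sketch uses TotalSeconds, which an idle timeout longer than 59 seconds would require.

```csharp
using System;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;

static class FlushTriggerSketch
{
    // Returns the batch to send, or an empty list if neither threshold is met.
    public static List<JObject> TakeBatch(List<JObject> queue, int flushSize,
        int idleFlushTimeSeconds, DateTime lastFlushTimeUtc)
    {
        var batch = new List<JObject>();
        bool sizeReached = queue.Count >= flushSize;
        bool idleElapsed = (DateTime.UtcNow - lastFlushTimeUtc).TotalSeconds >= idleFlushTimeSeconds;
        if (sizeReached || idleElapsed)
        {
            batch.AddRange(queue);   // take everything that is queued
            queue.Clear();
        }
        return batch;
    }
}
```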


@@ -512,9 +512,19 @@ namespace TimberWinR.Parser
             public string Protocol { get; set; }
             [JsonProperty(PropertyName = "interval")]
             public int Interval { get; set; }
+            [JsonProperty(PropertyName = "flush_size")]
+            public int FlushSize { get; set; }
+            [JsonProperty(PropertyName = "idle_flush_time")]
+            public int IdleFlushTimeInSeconds { get; set; }
+            [JsonProperty(PropertyName = "max_queue_size")]
+            public int MaxQueueSize { get; set; }
+            [JsonProperty(PropertyName = "queue_overflow_discard_oldest")]
+            public bool QueueOverflowDiscardOldest { get; set; }

             public ElasticsearchOutputParameters()
             {
+                FlushSize = 5000;
+                IdleFlushTimeInSeconds = 10;
                 Protocol = "http";
                 Port = 9200;
                 Index = "";
@@ -522,6 +532,8 @@ namespace TimberWinR.Parser
                 Timeout = 10000;
                 NumThreads = 1;
                 Interval = 1000;
+                QueueOverflowDiscardOldest = true;
+                MaxQueueSize = 50000;
             }

             public string GetIndexName(JObject json)


@@ -34,6 +34,9 @@
<Reference Include="csredis">
<HintPath>..\packages\csredis.1.4.7.1\lib\net40\csredis.dll</HintPath>
</Reference>
<Reference Include="Elasticsearch.Net">
<HintPath>..\packages\Elasticsearch.Net.1.3.1\lib\Elasticsearch.Net.dll</HintPath>
</Reference>
<Reference Include="Interop.MSUtil, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<EmbedInteropTypes>False</EmbedInteropTypes>
@@ -47,6 +50,9 @@
       <HintPath>..\packages\MaxMind.GeoIP2.0.4.0.0\lib\net40\MaxMind.GeoIP2.dll</HintPath>
       <Private>True</Private>
     </Reference>
+    <Reference Include="Nest">
+      <HintPath>..\packages\NEST.1.3.1\lib\Nest.dll</HintPath>
+    </Reference>
     <Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>..\packages\Newtonsoft.Json.6.0.4\lib\net40\Newtonsoft.Json.dll</HintPath>


@@ -7,11 +7,15 @@ The following parameters are allowed when configuring the Elasticsearch output.
 | Parameter | Type | Description | Details | Default |
 | :-------------|:---------|:------------------------------------------------------------| :--------------------------- | :-- |
-| *threads* | string | Location of log files(s) to monitor | Number of worker theads to send messages | 1 |
-| *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 |
-| *index* | string | The index name to use | index used/created | logstash-yyyy.dd.mm |
-| *host* | [string] | The hostname(s) of your Elasticsearch server(s) | IP or DNS name | |
-| *port* | integer | Redis port number | This port must be open | 9200 |
+| *flush_size* | integer | Maximum number of queued messages before a flush is triggered | | 5000 |
+| *host* | [string] | Array of hostname(s) of your Elasticsearch server(s) | IP or DNS name | |
+| *idle_flush_time* | integer | Maximum number of seconds elapsed before triggering a flush | | 10 |
+| *index* | string | The index name to use | index used/created | logstash-yyyy.dd.mm |
+| *interval* | integer | Interval in milliseconds to sleep between batch sends | | 1000 |
+| *max_queue_size* | integer | Maximum Elasticsearch queue depth | | 50000 |
+| *port* | integer | Elasticsearch port number | This port must be open | 9200 |
+| *queue_overflow_discard_oldest* | bool | If true, discard the oldest messages when max_queue_size is reached; otherwise discard the newest | | true |
+| *threads* | integer | Number of worker threads sending messages | | 1 |

 ### Index parameter
 If you want to output your data to a new index every day, use the following index format: "index-%{yyyy.MM.dd}". The date format can be any format you need.
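Taken together, here is a hedged example of an Elasticsearch output section in a TimberWinR configuration file. The enclosing TimberWinR/Outputs structure is assumed from this project's JSON configuration style rather than documented here, and the host names are placeholders:

```json
{
  "TimberWinR": {
    "Outputs": {
      "Elasticsearch": [
        {
          "host": [ "es1.example.com", "es2.example.com" ],
          "port": 9200,
          "index": "logstash-%{yyyy.MM.dd}",
          "threads": 1,
          "interval": 1000,
          "flush_size": 5000,
          "idle_flush_time": 10,
          "max_queue_size": 50000,
          "queue_overflow_discard_oldest": true
        }
      ]
    }
  }
}
```

With these values, a flush fires when 5000 messages are queued or 10 seconds have passed since the last flush, and once 50000 messages are waiting the queue discards its oldest entries first.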


@@ -1,8 +1,10 @@
 <?xml version="1.0" encoding="utf-8"?>
 <packages>
   <package id="csredis" version="1.4.7.1" targetFramework="net40" />
+  <package id="Elasticsearch.Net" version="1.3.1" targetFramework="net40" />
   <package id="MaxMind.Db" version="0.2.3.0" targetFramework="net40" />
   <package id="MaxMind.GeoIP2" version="0.4.0.0" targetFramework="net40" />
+  <package id="NEST" version="1.3.1" targetFramework="net40" />
   <package id="Newtonsoft.Json" version="6.0.4" targetFramework="net40" />
   <package id="NLog" version="3.1.0.0" targetFramework="net40" />
   <package id="RapidRegex.Core" version="1.0.0.2" targetFramework="net40" />

Binary file not shown.

File diff suppressed because it is too large.

Binary file not shown.

Binary file not shown.

packages/NEST.1.3.1/NEST.1.3.1.nupkg (vendored, new binary file; not shown)

packages/NEST.1.3.1/lib/Nest.XML (vendored, new file, 12447 lines; diff suppressed because it is too large)

packages/NEST.1.3.1/lib/Nest.dll (vendored, new binary file; not shown)

packages/NEST.1.3.1/lib/Nest.pdb (vendored, new binary file; not shown)