Added a new max_batch_count parameter to mitigate flooding when large bursts occur. When the incoming message rate exceeds the outgoing rate, the batch size now ramps up dynamically to keep pace.

This commit is contained in:
Eric Fontana
2015-03-03 09:55:08 -05:00
parent 2e28a50222
commit cdc2d09150
5 changed files with 95 additions and 18 deletions

View File

@@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.3.19.0")]
[assembly: AssemblyFileVersion("1.3.19.0")]
[assembly: AssemblyVersion("1.3.20.0")]
[assembly: AssemblyFileVersion("1.3.20.0")]

View File

@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.Eventing.Reader;
using System.Linq;
using System.Linq.Expressions;
using System.Net.Sockets;
@@ -24,25 +25,28 @@ namespace TimberWinR.Outputs
private readonly int _timeout;
private readonly object _locker = new object();
private readonly List<string> _jsonQueue;
// readonly Task _consumerTask;
private readonly string[] _redisHosts;
private int _redisHostIndex;
private TimberWinR.Manager _manager;
private readonly int _batchCount;
private int _currentBatchCount;
private readonly int _maxBatchCount;
private readonly int _interval;
private readonly int _numThreads;
private readonly long[] _sampleQueueDepths;
private int _sampleCountIndex;
private long _sentMessages;
private long _errorCount;
private long _redisDepth;
private int _maxQueueSize;
private bool _queueOverflowDiscardOldest;
private DateTime? _lastErrorTime;
private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default)
private readonly int _maxQueueSize;
private readonly bool _queueOverflowDiscardOldest;
public bool Stop { get; set; }
/// <summary>
/// Get the next client
/// Get the next client from the list of hosts.
/// </summary>
/// <returns></returns>
private RedisClient getClient()
@@ -79,6 +83,7 @@ namespace TimberWinR.Outputs
new JObject(
new JProperty("host", string.Join(",", _redisHosts)),
new JProperty("errors", _errorCount),
new JProperty("lastErrorTime", _lastErrorTime),
new JProperty("redis_depth", _redisDepth),
new JProperty("sent_messages", _sentMessages),
new JProperty("queued_messages", _jsonQueue.Count),
@@ -88,6 +93,10 @@ namespace TimberWinR.Outputs
new JProperty("interval", _interval),
new JProperty("threads", _numThreads),
new JProperty("batchcount", _batchCount),
new JProperty("currentBatchCount", _currentBatchCount),
new JProperty("maxBatchCount", _maxBatchCount),
new JProperty("averageQueueDepth", AverageQueueDepth()),
new JProperty("queueSamples", new JArray(_sampleQueueDepths)),
new JProperty("index", _logstashIndexName),
new JProperty("hosts",
new JArray(
@@ -100,8 +109,16 @@ namespace TimberWinR.Outputs
public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken)
: base(cancelToken, "Redis")
{
// Last QUEUE_SAMPLE_SIZE queue samples (spans timestamp * 10)
_sampleQueueDepths = new long[QUEUE_SAMPLE_SIZE];
_sampleCountIndex = 0;
_redisDepth = 0;
_batchCount = ro.BatchCount;
_maxBatchCount = ro.MaxBatchCount;
// Make sure maxBatchCount is larger than batchCount
if (_maxBatchCount < _batchCount)
_maxBatchCount = _batchCount*10;
_currentBatchCount = _batchCount;
_manager = manager;
_redisHostIndex = 0;
_redisHosts = ro.Host;
@@ -112,6 +129,7 @@ namespace TimberWinR.Outputs
_interval = ro.Interval;
_numThreads = ro.NumThreads;
_errorCount = 0;
_lastErrorTime = null;
_maxQueueSize = ro.MaxQueueSize;
_queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest;
@@ -183,6 +201,18 @@ namespace TimberWinR.Outputs
return drop;
}
//
// What is average queue depth?
//
/// <summary>
/// Computes the mean of the recorded queue-depth samples, truncated to an int.
/// Takes the queue lock so the sample buffer is not mutated mid-read.
/// </summary>
/// <returns>The average sampled queue depth.</returns>
private int AverageQueueDepth()
{
    lock (_locker)
    {
        double meanDepth = _sampleQueueDepths.Average();
        return (int)meanDepth;
    }
}
//
// Pull off messages from the Queue, batch them up and send them all across
//
@@ -198,10 +228,16 @@ namespace TimberWinR.Outputs
try
{
string[] messages;
// Exclusively
lock (_locker)
{
messages = _jsonQueue.Take(_batchCount).ToArray();
// Take a sample of the queue depth
if (_sampleCountIndex >= QUEUE_SAMPLE_SIZE)
_sampleCountIndex = 0;
_sampleQueueDepths[_sampleCountIndex++] = _jsonQueue.Count;
messages = _jsonQueue.Take(_currentBatchCount).ToArray();
_jsonQueue.RemoveRange(0, messages.Length);
var remainingCount = _jsonQueue.Count;
if (messages.Length > 0)
_manager.IncrementMessageCount(messages.Length);
}
@@ -209,6 +245,7 @@ namespace TimberWinR.Outputs
if (messages.Length > 0)
{
int numHosts = _redisHosts.Length;
bool sentSuccessfully = false;
while (numHosts-- > 0)
{
try
@@ -226,15 +263,20 @@ namespace TimberWinR.Outputs
{
_redisDepth = client.RPush(_logstashIndexName, messages);
_sentMessages += messages.Length;
client.EndPipe();
sentSuccessfully = true;
}
catch (SocketException ex)
{
LogManager.GetCurrentClassLogger().Warn(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTime = DateTime.UtcNow;
}
finally
catch (Exception ex)
{
client.EndPipe();
LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTime = DateTime.UtcNow;
}
break;
}
@@ -244,6 +286,7 @@ namespace TimberWinR.Outputs
LogManager.GetCurrentClassLogger()
.Fatal("Unable to connect with any Redis hosts, {0}",
String.Join(",", _redisHosts));
_lastErrorTime = DateTime.UtcNow;
}
}
}
@@ -251,6 +294,18 @@ namespace TimberWinR.Outputs
{
LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTime = DateTime.UtcNow;
}
} // No more hosts to try.
// Re-compute current batch size
updateCurrentBatchCount();
if (!sentSuccessfully)
{
lock (_locker)
{
_jsonQueue.InsertRange(0, messages);
}
}
}
@@ -264,12 +319,26 @@ namespace TimberWinR.Outputs
}
catch (Exception ex)
{
throw;
_lastErrorTime = DateTime.UtcNow;
Interlocked.Increment(ref _errorCount);
LogManager.GetCurrentClassLogger().Error(ex);
}
}
}
}
}
//
// Sample the queue and adjust the batch count if needed (ramp up slowly).
// If the average backlog exceeds the current batch size, grow the batch;
// once the backlog drains, fall back to the configured default batch_count.
//
private void updateCurrentBatchCount()
{
    if (_currentBatchCount < _maxBatchCount && AverageQueueDepth() > _currentBatchCount)
    {
        // Ramp by a fraction of the max per pass. Math.Max(1, ...) guards the
        // case where _maxBatchCount < QUEUE_SAMPLE_SIZE, which would otherwise
        // truncate the step to 0 and prevent any ramp-up at all.
        int step = Math.Max(1, _maxBatchCount / QUEUE_SAMPLE_SIZE);
        // Clamp so we never exceed the configured max_batch_count.
        _currentBatchCount = Math.Min(_maxBatchCount, _currentBatchCount + step);
    }
    else // Backlog is under control: reset to the configured default.
    {
        _currentBatchCount = _batchCount;
    }
}
}
}

View File

@@ -576,6 +576,8 @@ namespace TimberWinR.Parser
public int Timeout { get; set; }
[JsonProperty(PropertyName = "batch_count")]
public int BatchCount { get; set; }
[JsonProperty(PropertyName = "max_batch_count")]
public int MaxBatchCount { get; set; }
[JsonProperty(PropertyName = "threads")]
public int NumThreads { get; set; }
[JsonProperty(PropertyName = "interval")]
@@ -592,6 +594,7 @@ namespace TimberWinR.Parser
Host = new string[] { "localhost" };
Timeout = 10000;
BatchCount = 10;
MaxBatchCount = BatchCount*10;
NumThreads = 1;
Interval = 5000;
QueueOverflowDiscardOldest = true;

View File

@@ -3,6 +3,10 @@
A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service.
Version History
### 1.3.20.0 - 03/03/2015
1. Added new Redis parameter _max\_batch\_count_ which increases the _batch\_count_ dynamically over time
to handle input flooding. Default is _batch\_count_ * 10
### 1.3.19.0 - 02/26/2015
1. Added support for Multiline codecs for Stdin and Logs listeners, closes issue [#23](https://github.com/Cimpress-MCP/TimberWinR/issues/23)

View File

@@ -9,6 +9,7 @@ The following parameters are allowed when configuring the Redis output.
| :-------------|:---------|:------------------------------------------------------------| :--------------------------- | :-- |
| *threads* | string | Number of worker threads used to send messages | Number of worker threads to send messages | 1 |
| *batch_count* | integer | Sent as a single message | Number of messages to aggregate | 10 |
| *max_batch_count* | integer | Dynamically adjusted count maximum | Increases over time | batch_count*10 |
| *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 |
| *index* | string | The name of the redis list | logstash index name | logstash |
| *host* | [string] | The hostname(s) of your Redis server(s) | IP or DNS name | |