Re-factored Batchcounter logic into separate class.

This commit is contained in:
Eric Fontana
2015-03-03 14:07:53 -05:00
parent 6ea3e581fd
commit d7fa582191

View File

@@ -18,6 +18,77 @@ using TimberWinR.Parser;
namespace TimberWinR.Outputs
{
internal class BatchCounter
{
private readonly int[] _sampleQueueDepths;
private int _sampleCountIndex;
private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default)
private object _locker = new object();
private bool _warnedReachedMax;
private readonly int _maxBatchCount;
private readonly int _batchCount;
private int _totalSamples;
public int[] Samples()
{
return _sampleQueueDepths;
}
public BatchCounter(int batchCount, int maxBatchCount)
{
_batchCount = batchCount;
_maxBatchCount = maxBatchCount;
_sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE];
_sampleCountIndex = 0;
_totalSamples = 0;
}
public void SampleQueueDepth(int queueDepth)
{
lock (_locker)
{
if (_totalSamples < QUEUE_SAMPLE_SIZE)
_totalSamples++;
// Take a sample of the queue depth
if (_sampleCountIndex >= QUEUE_SAMPLE_SIZE)
_sampleCountIndex = 0;
_sampleQueueDepths[_sampleCountIndex++] = queueDepth;
}
}
public int AverageQueueDepth()
{
lock (_locker)
{
var samples = _sampleQueueDepths.Take(_totalSamples);
return (int) samples.Average();
}
}
// Sample the queue and adjust the batch count if needed (ramp up slowly)
public int UpdateCurrentBatchCount(int queueSize, int currentBatchCount)
{
if (currentBatchCount < _maxBatchCount && currentBatchCount < queueSize && AverageQueueDepth() > currentBatchCount)
{
currentBatchCount += _maxBatchCount / QUEUE_SAMPLE_SIZE;
if (currentBatchCount >= _maxBatchCount && !_warnedReachedMax)
{
LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", currentBatchCount);
_warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset)
}
}
else // Reset to default
{
currentBatchCount = _batchCount;
_warnedReachedMax = false;
}
return currentBatchCount;
}
}
public class RedisOutput : OutputSender
{
private readonly string _logstashIndexName;
@@ -32,17 +103,15 @@ namespace TimberWinR.Outputs
private int _currentBatchCount;
private readonly int _maxBatchCount;
private readonly int _interval;
private readonly int _numThreads;
private readonly int[] _sampleQueueDepths;
private int _sampleCountIndex;
private readonly int _numThreads;
private long _sentMessages;
private long _errorCount;
private long _redisDepth;
private DateTime? _lastErrorTime;
private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default)
private DateTime? _lastErrorTime;
private readonly int _maxQueueSize;
private readonly bool _queueOverflowDiscardOldest;
private bool _warnedReachedMax;
private BatchCounter _batchCounter;
public bool Stop { get; set; }
@@ -97,8 +166,8 @@ namespace TimberWinR.Outputs
new JProperty("batchcount", _batchCount),
new JProperty("currentBatchCount", _currentBatchCount),
new JProperty("maxBatchCount", _maxBatchCount),
new JProperty("averageQueueDepth", AverageQueueDepth()),
new JProperty("queueSamples", new JArray(_sampleQueueDepths)),
new JProperty("averageQueueDepth", _batchCounter.AverageQueueDepth()),
new JProperty("queueSamples", new JArray(_batchCounter.Samples())),
new JProperty("index", _logstashIndexName),
new JProperty("hosts",
new JArray(
@@ -111,17 +180,14 @@ namespace TimberWinR.Outputs
public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken)
: base(cancelToken, "Redis")
{
_warnedReachedMax = false;
// Last QUEUE_SAMPLE_SIZE queue samples (spans timestamp * 10)
_sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE];
_sampleCountIndex = 0;
_warnedReachedMax = false;
_redisDepth = 0;
_batchCount = ro.BatchCount;
_maxBatchCount = ro.MaxBatchCount;
// Make sure maxBatchCount is larger than batchCount
if (_maxBatchCount < _batchCount)
_maxBatchCount = _batchCount*10;
_currentBatchCount = _batchCount;
_maxBatchCount = _batchCount*10;
_manager = manager;
_redisHostIndex = 0;
_redisHosts = ro.Host;
@@ -135,7 +201,9 @@ namespace TimberWinR.Outputs
_lastErrorTime = null;
_maxQueueSize = ro.MaxQueueSize;
_queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest;
_batchCounter = new BatchCounter(_batchCount, _maxBatchCount);
_currentBatchCount = _batchCount;
for (int i = 0; i < ro.NumThreads; i++)
{
var redisThread = new Task(RedisSender, cancelToken);
@@ -202,20 +270,7 @@ namespace TimberWinR.Outputs
}
}
return drop;
}
//
// What is average queue depth?
//
private int AverageQueueDepth()
{
lock(_locker)
{
return (int)_sampleQueueDepths.Average();
}
}
}
//
// Pull off messages from the Queue, batch them up and send them all across
//
@@ -233,15 +288,14 @@ namespace TimberWinR.Outputs
string[] messages;
// Exclusively
lock (_locker)
{
// Take a sample of the queue depth
if (_sampleCountIndex >= QUEUE_SAMPLE_SIZE)
_sampleCountIndex = 0;
_sampleQueueDepths[_sampleCountIndex++] = _jsonQueue.Count;
{
_batchCounter.SampleQueueDepth(_jsonQueue.Count);
messages = _jsonQueue.Take(_currentBatchCount).ToArray();
_jsonQueue.RemoveRange(0, messages.Length);
_jsonQueue.RemoveRange(0, messages.Length);
// Re-compute current batch size
updateCurrentBatchCount();
_currentBatchCount = _batchCounter.UpdateCurrentBatchCount(_jsonQueue.Count, _currentBatchCount);
}
if (messages.Length > 0)
@@ -328,25 +382,6 @@ namespace TimberWinR.Outputs
}
}
}
}
// Sample the queue and adjust the batch count if needed (ramp up slowly)
private void updateCurrentBatchCount()
{
if (_currentBatchCount < _maxBatchCount && AverageQueueDepth() > _currentBatchCount)
{
_currentBatchCount += _maxBatchCount/QUEUE_SAMPLE_SIZE;
if (_currentBatchCount >= _maxBatchCount && !_warnedReachedMax)
{
LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", _currentBatchCount);
_warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset)
}
}
else // Reset to default
{
_currentBatchCount = _batchCount;
_warnedReachedMax = false;
}
}
}
}
}