From d7fa582191cdfa8abc5c6f1dfadcf98d327ccaf9 Mon Sep 17 00:00:00 2001 From: Eric Fontana Date: Tue, 3 Mar 2015 14:07:53 -0500 Subject: [PATCH] Re-factored Batchcounter logic into separate class. --- TimberWinR/Outputs/Redis.cs | 145 ++++++++++++++++++++++-------------- 1 file changed, 90 insertions(+), 55 deletions(-) diff --git a/TimberWinR/Outputs/Redis.cs b/TimberWinR/Outputs/Redis.cs index f11788e..8fcf8a2 100644 --- a/TimberWinR/Outputs/Redis.cs +++ b/TimberWinR/Outputs/Redis.cs @@ -18,6 +18,77 @@ using TimberWinR.Parser; namespace TimberWinR.Outputs { + internal class BatchCounter + { + private readonly int[] _sampleQueueDepths; + private int _sampleCountIndex; + private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default) + private object _locker = new object(); + private bool _warnedReachedMax; + private readonly int _maxBatchCount; + private readonly int _batchCount; + private int _totalSamples; + + public int[] Samples() + { + return _sampleQueueDepths; + } + + public BatchCounter(int batchCount, int maxBatchCount) + { + _batchCount = batchCount; + _maxBatchCount = maxBatchCount; + _sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE]; + _sampleCountIndex = 0; + _totalSamples = 0; + } + public void SampleQueueDepth(int queueDepth) + { + lock (_locker) + { + if (_totalSamples < QUEUE_SAMPLE_SIZE) + _totalSamples++; + + // Take a sample of the queue depth + if (_sampleCountIndex >= QUEUE_SAMPLE_SIZE) + _sampleCountIndex = 0; + + _sampleQueueDepths[_sampleCountIndex++] = queueDepth; + } + } + + public int AverageQueueDepth() + { + lock (_locker) + { + var samples = _sampleQueueDepths.Take(_totalSamples); + return (int) samples.Average(); + } + } + + // Sample the queue and adjust the batch count if needed (ramp up slowly) + public int UpdateCurrentBatchCount(int queueSize, int currentBatchCount) + { + if (currentBatchCount < _maxBatchCount && currentBatchCount < queueSize && AverageQueueDepth() > currentBatchCount) + { + currentBatchCount += _maxBatchCount / QUEUE_SAMPLE_SIZE; + if (currentBatchCount >= _maxBatchCount && !_warnedReachedMax) + { + LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", currentBatchCount); + _warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset) + } + } + else // Reset to default + { + currentBatchCount = _batchCount; + _warnedReachedMax = false; + } + + return currentBatchCount; + } + } + + public class RedisOutput : OutputSender { private readonly string _logstashIndexName; @@ -32,17 +103,15 @@ namespace TimberWinR.Outputs private int _currentBatchCount; private readonly int _maxBatchCount; private readonly int _interval; - private readonly int _numThreads; - private readonly int[] _sampleQueueDepths; - private int _sampleCountIndex; + private readonly int _numThreads; private long _sentMessages; private long _errorCount; private long _redisDepth; - private DateTime? _lastErrorTime; - private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default) + private DateTime? _lastErrorTime; private readonly int _maxQueueSize; private readonly bool _queueOverflowDiscardOldest; private bool _warnedReachedMax; + private BatchCounter _batchCounter; public bool Stop { get; set; } @@ -97,8 +166,8 @@ namespace TimberWinR.Outputs new JProperty("batchcount", _batchCount), new JProperty("currentBatchCount", _currentBatchCount), new JProperty("maxBatchCount", _maxBatchCount), - new JProperty("averageQueueDepth", AverageQueueDepth()), - new JProperty("queueSamples", new JArray(_sampleQueueDepths)), + new JProperty("averageQueueDepth", _batchCounter.AverageQueueDepth()), + new JProperty("queueSamples", new JArray(_batchCounter.Samples())), new JProperty("index", _logstashIndexName), new JProperty("hosts", new JArray( @@ -111,17 +180,14 @@ namespace TimberWinR.Outputs public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken) : base(cancelToken, "Redis") { - _warnedReachedMax = false; - // Last QUEUE_SAMPLE_SIZE queue samples (spans timestamp * 10) - _sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE]; - _sampleCountIndex = 0; + _warnedReachedMax = false; _redisDepth = 0; _batchCount = ro.BatchCount; _maxBatchCount = ro.MaxBatchCount; // Make sure maxBatchCount is larger than batchCount if (_maxBatchCount < _batchCount) - _maxBatchCount = _batchCount*10; - _currentBatchCount = _batchCount; + _maxBatchCount = _batchCount*10; + _manager = manager; _redisHostIndex = 0; _redisHosts = ro.Host; @@ -135,7 +201,9 @@ namespace TimberWinR.Outputs _lastErrorTime = null; _maxQueueSize = ro.MaxQueueSize; _queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest; - + _batchCounter = new BatchCounter(_batchCount, _maxBatchCount); + _currentBatchCount = _batchCount; + for (int i = 0; i < ro.NumThreads; i++) { var redisThread = new Task(RedisSender, cancelToken); @@ -202,20 +270,7 @@ namespace TimberWinR.Outputs } } return drop; - } - - - // - // What is average queue depth? - // - private int AverageQueueDepth() - { - lock(_locker) - { - return (int)_sampleQueueDepths.Average(); - } - } - + } // // Pull off messages from the Queue, batch them up and send them all across // @@ -233,15 +288,14 @@ namespace TimberWinR.Outputs string[] messages; // Exclusively lock (_locker) - { - // Take a sample of the queue depth - if (_sampleCountIndex >= QUEUE_SAMPLE_SIZE) - _sampleCountIndex = 0; - _sampleQueueDepths[_sampleCountIndex++] = _jsonQueue.Count; + { + _batchCounter.SampleQueueDepth(_jsonQueue.Count); + messages = _jsonQueue.Take(_currentBatchCount).ToArray(); - _jsonQueue.RemoveRange(0, messages.Length); + _jsonQueue.RemoveRange(0, messages.Length); + // Re-compute current batch size - updateCurrentBatchCount(); + _currentBatchCount = _batchCounter.UpdateCurrentBatchCount(_jsonQueue.Count, _currentBatchCount); } if (messages.Length > 0) @@ -328,25 +382,6 @@ namespace TimberWinR.Outputs } } } - } - - // Sample the queue and adjust the batch count if needed (ramp up slowly) - private void updateCurrentBatchCount() - { - if (_currentBatchCount < _maxBatchCount && AverageQueueDepth() > _currentBatchCount) - { - _currentBatchCount += _maxBatchCount/QUEUE_SAMPLE_SIZE; - if (_currentBatchCount >= _maxBatchCount && !_warnedReachedMax) - { - LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", _currentBatchCount); - _warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset) - } - } - else // Reset to default - { - _currentBatchCount = _batchCount; - _warnedReachedMax = false; - } - } + } } }