diff --git a/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs b/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs index 86c588b..ca35740 100644 --- a/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs +++ b/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs @@ -32,5 +32,5 @@ using System.Runtime.InteropServices; // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.3.20.0")] -[assembly: AssemblyFileVersion("1.3.20.0")] +[assembly: AssemblyVersion("1.3.19.1")] +[assembly: AssemblyFileVersion("1.3.19.1")] diff --git a/TimberWinR/Outputs/Redis.cs b/TimberWinR/Outputs/Redis.cs index 0e98118..e46818f 100644 --- a/TimberWinR/Outputs/Redis.cs +++ b/TimberWinR/Outputs/Redis.cs @@ -33,7 +33,7 @@ namespace TimberWinR.Outputs private readonly int _maxBatchCount; private readonly int _interval; private readonly int _numThreads; - private readonly long[] _sampleQueueDepths; + private readonly int[] _sampleQueueDepths; private int _sampleCountIndex; private long _sentMessages; private long _errorCount; @@ -42,6 +42,7 @@ namespace TimberWinR.Outputs private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default) private readonly int _maxQueueSize; private readonly bool _queueOverflowDiscardOldest; + private bool _warnedReachedMax; public bool Stop { get; set; } @@ -109,8 +110,9 @@ namespace TimberWinR.Outputs public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken) : base(cancelToken, "Redis") { + _warnedReachedMax = false; // Last QUEUE_SAMPLE_SIZE queue samples (spans timestamp * 10) - _sampleQueueDepths = new long[QUEUE_SAMPLE_SIZE]; + _sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE]; _sampleCountIndex = 0; _redisDepth = 0; _batchCount = ro.BatchCount; @@ -236,10 +238,9 @@ namespace TimberWinR.Outputs _sampleCountIndex = 0; _sampleQueueDepths[_sampleCountIndex++] = _jsonQueue.Count; messages = _jsonQueue.Take(_currentBatchCount).ToArray(); - _jsonQueue.RemoveRange(0, messages.Length); - var remainingCount = _jsonQueue.Count; - if (messages.Length > 0) - _manager.IncrementMessageCount(messages.Length); + _jsonQueue.RemoveRange(0, messages.Length); + // Re-compute current batch size + updateCurrentBatchCount(); } if (messages.Length > 0) @@ -265,6 +266,8 @@ namespace TimberWinR.Outputs _sentMessages += messages.Length; client.EndPipe(); sentSuccessfully = true; + if (messages.Length > 0) + _manager.IncrementMessageCount(messages.Length); } catch (SocketException ex) { @@ -298,9 +301,7 @@ namespace TimberWinR.Outputs } } // No more hosts to try. - // Re-compute current batch size - updateCurrentBatchCount(); - + if (!sentSuccessfully) { lock (_locker) @@ -334,10 +335,16 @@ namespace TimberWinR.Outputs if (_currentBatchCount < _maxBatchCount && AverageQueueDepth() > _currentBatchCount) { _currentBatchCount += _maxBatchCount/QUEUE_SAMPLE_SIZE; + if (_currentBatchCount >= _maxBatchCount && !_warnedReachedMax) + { + LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", _currentBatchCount); + _warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset) + } } else // Reset to default { _currentBatchCount = _batchCount; + _warnedReachedMax = false; } } } diff --git a/TimberWinR/ReleaseNotes.md b/TimberWinR/ReleaseNotes.md index 20f8cca..b7c76f4 100644 --- a/TimberWinR/ReleaseNotes.md +++ b/TimberWinR/ReleaseNotes.md @@ -3,7 +3,7 @@ A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service. Version History -### 1.3.20.0 - 03/03/2015 +### 1.3.19.1 - 03/03/2015 1. Added new Redis parameter _max\_batch\_count_ which increases the _batch\_count_ dynamically over time to handle input flooding. Default is _batch\_count_ * 10