Updated per PR feedback
This commit is contained in:
@@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
|
||||
// You can specify all the values or you can default the Build and Revision Numbers
|
||||
// by using the '*' as shown below:
|
||||
// [assembly: AssemblyVersion("1.0.*")]
|
||||
[assembly: AssemblyVersion("1.3.20.0")]
|
||||
[assembly: AssemblyFileVersion("1.3.20.0")]
|
||||
[assembly: AssemblyVersion("1.3.19.1")]
|
||||
[assembly: AssemblyFileVersion("1.3.19.1")]
|
||||
|
||||
@@ -33,7 +33,7 @@ namespace TimberWinR.Outputs
|
||||
private readonly int _maxBatchCount;
|
||||
private readonly int _interval;
|
||||
private readonly int _numThreads;
|
||||
private readonly long[] _sampleQueueDepths;
|
||||
private readonly int[] _sampleQueueDepths;
|
||||
private int _sampleCountIndex;
|
||||
private long _sentMessages;
|
||||
private long _errorCount;
|
||||
@@ -42,6 +42,7 @@ namespace TimberWinR.Outputs
|
||||
private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default)
|
||||
private readonly int _maxQueueSize;
|
||||
private readonly bool _queueOverflowDiscardOldest;
|
||||
private bool _warnedReachedMax;
|
||||
|
||||
public bool Stop { get; set; }
|
||||
|
||||
@@ -109,8 +110,9 @@ namespace TimberWinR.Outputs
|
||||
public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken)
|
||||
: base(cancelToken, "Redis")
|
||||
{
|
||||
_warnedReachedMax = false;
|
||||
// Last QUEUE_SAMPLE_SIZE queue samples (spans timestamp * 10)
|
||||
_sampleQueueDepths = new long[QUEUE_SAMPLE_SIZE];
|
||||
_sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE];
|
||||
_sampleCountIndex = 0;
|
||||
_redisDepth = 0;
|
||||
_batchCount = ro.BatchCount;
|
||||
@@ -236,10 +238,9 @@ namespace TimberWinR.Outputs
|
||||
_sampleCountIndex = 0;
|
||||
_sampleQueueDepths[_sampleCountIndex++] = _jsonQueue.Count;
|
||||
messages = _jsonQueue.Take(_currentBatchCount).ToArray();
|
||||
_jsonQueue.RemoveRange(0, messages.Length);
|
||||
var remainingCount = _jsonQueue.Count;
|
||||
if (messages.Length > 0)
|
||||
_manager.IncrementMessageCount(messages.Length);
|
||||
_jsonQueue.RemoveRange(0, messages.Length);
|
||||
// Re-compute current batch size
|
||||
updateCurrentBatchCount();
|
||||
}
|
||||
|
||||
if (messages.Length > 0)
|
||||
@@ -265,6 +266,8 @@ namespace TimberWinR.Outputs
|
||||
_sentMessages += messages.Length;
|
||||
client.EndPipe();
|
||||
sentSuccessfully = true;
|
||||
if (messages.Length > 0)
|
||||
_manager.IncrementMessageCount(messages.Length);
|
||||
}
|
||||
catch (SocketException ex)
|
||||
{
|
||||
@@ -298,9 +301,7 @@ namespace TimberWinR.Outputs
|
||||
}
|
||||
} // No more hosts to try.
|
||||
|
||||
// Re-compute current batch size
|
||||
updateCurrentBatchCount();
|
||||
|
||||
|
||||
if (!sentSuccessfully)
|
||||
{
|
||||
lock (_locker)
|
||||
@@ -334,10 +335,16 @@ namespace TimberWinR.Outputs
|
||||
if (_currentBatchCount < _maxBatchCount && AverageQueueDepth() > _currentBatchCount)
|
||||
{
|
||||
_currentBatchCount += _maxBatchCount/QUEUE_SAMPLE_SIZE;
|
||||
if (_currentBatchCount >= _maxBatchCount && !_warnedReachedMax)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", _currentBatchCount);
|
||||
_warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset)
|
||||
}
|
||||
}
|
||||
else // Reset to default
|
||||
{
|
||||
_currentBatchCount = _batchCount;
|
||||
_warnedReachedMax = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service.
|
||||
|
||||
Version History
|
||||
### 1.3.20.0 - 03/03/2015
|
||||
### 1.3.19.1 - 03/03/2015
|
||||
1. Added new Redis parameter _max\_batch\_count_ which increases the _batch\_count_ dynamically over time
|
||||
to handle input flooding. Default is _batch\_count_ * 10
|
||||
|
||||
|
||||
Reference in New Issue
Block a user