From cdc2d091504f73124cf94786adc96aeb7e3ebe64 Mon Sep 17 00:00:00 2001 From: Eric Fontana Date: Tue, 3 Mar 2015 09:55:08 -0500 Subject: [PATCH] Added new max_batch_count parameter to mitigate flooding when large bursts occur. Essentially the incoming rate could exceed the outgoing rate and this will mitigate this condition. --- .../Properties/AssemblyInfo.cs | 4 +- TimberWinR/Outputs/Redis.cs | 101 +++++++++++++++--- TimberWinR/Parser.cs | 3 + TimberWinR/ReleaseNotes.md | 4 + TimberWinR/mdocs/RedisOutput.md | 1 + 5 files changed, 95 insertions(+), 18 deletions(-) diff --git a/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs b/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs index ee7828b..86c588b 100644 --- a/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs +++ b/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs @@ -32,5 +32,5 @@ using System.Runtime.InteropServices; // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.3.19.0")] -[assembly: AssemblyFileVersion("1.3.19.0")] +[assembly: AssemblyVersion("1.3.20.0")] +[assembly: AssemblyFileVersion("1.3.20.0")] diff --git a/TimberWinR/Outputs/Redis.cs b/TimberWinR/Outputs/Redis.cs index a501057..0e98118 100644 --- a/TimberWinR/Outputs/Redis.cs +++ b/TimberWinR/Outputs/Redis.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics.Eventing.Reader; using System.Linq; using System.Linq.Expressions; using System.Net.Sockets; @@ -23,26 +24,29 @@ namespace TimberWinR.Outputs private readonly int _port; private readonly int _timeout; private readonly object _locker = new object(); - private readonly List _jsonQueue; - // readonly Task _consumerTask; + private readonly List _jsonQueue; private readonly string[] _redisHosts; private int _redisHostIndex; private TimberWinR.Manager _manager; private readonly int _batchCount; + private int _currentBatchCount; + private readonly int _maxBatchCount; private readonly int _interval; private readonly int _numThreads; - + private readonly long[] _sampleQueueDepths; + private int _sampleCountIndex; private long _sentMessages; private long _errorCount; private long _redisDepth; - - private int _maxQueueSize; - private bool _queueOverflowDiscardOldest; + private DateTime? _lastErrorTime; + private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default) + private readonly int _maxQueueSize; + private readonly bool _queueOverflowDiscardOldest; public bool Stop { get; set; } /// - /// Get the next client + /// Get the next client from the list of hosts. /// /// private RedisClient getClient() @@ -79,6 +83,7 @@ namespace TimberWinR.Outputs new JObject( new JProperty("host", string.Join(",", _redisHosts)), new JProperty("errors", _errorCount), + new JProperty("lastErrorTime", _lastErrorTime), new JProperty("redis_depth", _redisDepth), new JProperty("sent_messages", _sentMessages), new JProperty("queued_messages", _jsonQueue.Count), @@ -88,6 +93,10 @@ namespace TimberWinR.Outputs new JProperty("interval", _interval), new JProperty("threads", _numThreads), new JProperty("batchcount", _batchCount), + new JProperty("currentBatchCount", _currentBatchCount), + new JProperty("maxBatchCount", _maxBatchCount), + new JProperty("averageQueueDepth", AverageQueueDepth()), + new JProperty("queueSamples", new JArray(_sampleQueueDepths)), new JProperty("index", _logstashIndexName), new JProperty("hosts", new JArray( @@ -100,11 +109,19 @@ namespace TimberWinR.Outputs public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken) : base(cancelToken, "Redis") { + // Last QUEUE_SAMPLE_SIZE queue samples (spans timestamp * 10) + _sampleQueueDepths = new long[QUEUE_SAMPLE_SIZE]; + _sampleCountIndex = 0; _redisDepth = 0; _batchCount = ro.BatchCount; + _maxBatchCount = ro.MaxBatchCount; + // Make sure maxBatchCount is larger than batchCount + if (_maxBatchCount < _batchCount) + _maxBatchCount = _batchCount*10; + _currentBatchCount = _batchCount; _manager = manager; _redisHostIndex = 0; - _redisHosts = ro.Host; + _redisHosts = ro.Host; _jsonQueue = new List(); _port = ro.Port; _timeout = ro.Timeout; @@ -112,6 +129,7 @@ namespace TimberWinR.Outputs _interval = ro.Interval; _numThreads = ro.NumThreads; _errorCount = 0; + _lastErrorTime = null; _maxQueueSize = ro.MaxQueueSize; _queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest; @@ -183,6 +201,18 @@ namespace TimberWinR.Outputs return drop; } + + // + // What is average queue depth? + // + private int AverageQueueDepth() + { + lock(_locker) + { + return (int)_sampleQueueDepths.Average(); + } + } + // // Pull off messages from the Queue, batch them up and send them all across // @@ -198,10 +228,16 @@ namespace TimberWinR.Outputs try { string[] messages; + // Exclusively lock (_locker) - { - messages = _jsonQueue.Take(_batchCount).ToArray(); + { + // Take a sample of the queue depth + if (_sampleCountIndex >= QUEUE_SAMPLE_SIZE) + _sampleCountIndex = 0; + _sampleQueueDepths[_sampleCountIndex++] = _jsonQueue.Count; + messages = _jsonQueue.Take(_currentBatchCount).ToArray(); _jsonQueue.RemoveRange(0, messages.Length); + var remainingCount = _jsonQueue.Count; if (messages.Length > 0) _manager.IncrementMessageCount(messages.Length); } @@ -209,6 +245,7 @@ namespace TimberWinR.Outputs if (messages.Length > 0) { int numHosts = _redisHosts.Length; + bool sentSuccessfully = false; while (numHosts-- > 0) { try @@ -225,17 +262,22 @@ namespace TimberWinR.Outputs try { _redisDepth = client.RPush(_logstashIndexName, messages); - _sentMessages += messages.Length; + _sentMessages += messages.Length; + client.EndPipe(); + sentSuccessfully = true; } catch (SocketException ex) { LogManager.GetCurrentClassLogger().Warn(ex); Interlocked.Increment(ref _errorCount); + _lastErrorTime = DateTime.UtcNow; } - finally + catch (Exception ex) { - client.EndPipe(); - } + LogManager.GetCurrentClassLogger().Error(ex); + Interlocked.Increment(ref _errorCount); + _lastErrorTime = DateTime.UtcNow; + } break; } else @@ -244,6 +286,7 @@ namespace TimberWinR.Outputs LogManager.GetCurrentClassLogger() .Fatal("Unable to connect with any Redis hosts, {0}", String.Join(",", _redisHosts)); + _lastErrorTime = DateTime.UtcNow; } } } @@ -251,6 +294,18 @@ namespace TimberWinR.Outputs { LogManager.GetCurrentClassLogger().Error(ex); Interlocked.Increment(ref _errorCount); + _lastErrorTime = DateTime.UtcNow; + } + } // No more hosts to try. + + // Re-compute current batch size + updateCurrentBatchCount(); + + if (!sentSuccessfully) + { + lock (_locker) + { + _jsonQueue.InsertRange(0, messages); } } } @@ -264,12 +319,26 @@ namespace TimberWinR.Outputs } catch (Exception ex) { - - throw; + _lastErrorTime = DateTime.UtcNow; + Interlocked.Increment(ref _errorCount); + LogManager.GetCurrentClassLogger().Error(ex); } } } } } + + // Sample the queue and adjust the batch count if needed (ramp up slowly) + private void updateCurrentBatchCount() + { + if (_currentBatchCount < _maxBatchCount && AverageQueueDepth() > _currentBatchCount) + { + _currentBatchCount += _maxBatchCount/QUEUE_SAMPLE_SIZE; + } + else // Reset to default + { + _currentBatchCount = _batchCount; + } + } } } diff --git a/TimberWinR/Parser.cs b/TimberWinR/Parser.cs index 59d2379..c163114 100644 --- a/TimberWinR/Parser.cs +++ b/TimberWinR/Parser.cs @@ -576,6 +576,8 @@ namespace TimberWinR.Parser public int Timeout { get; set; } [JsonProperty(PropertyName = "batch_count")] public int BatchCount { get; set; } + [JsonProperty(PropertyName = "max_batch_count")] + public int MaxBatchCount { get; set; } [JsonProperty(PropertyName = "threads")] public int NumThreads { get; set; } [JsonProperty(PropertyName = "interval")] @@ -592,6 +594,7 @@ namespace TimberWinR.Parser Host = new string[] { "localhost" }; Timeout = 10000; BatchCount = 10; + MaxBatchCount = BatchCount*10; NumThreads = 1; Interval = 5000; QueueOverflowDiscardOldest = true; diff --git a/TimberWinR/ReleaseNotes.md b/TimberWinR/ReleaseNotes.md index c787235..20f8cca 100644 --- a/TimberWinR/ReleaseNotes.md +++ b/TimberWinR/ReleaseNotes.md @@ -3,6 +3,10 @@ A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service. Version History +### 1.3.20.0 - 03/03/2015 +1. Added new Redis parameter _max\_batch\_count_ which increases the _batch\_count_ dynamically over time + to handle input flooding. Default is _batch\_count_ * 10 + ### 1.3.19.0 - 02/26/2015 1. Added support for Multiline codecs for Stdin and Logs listeners, closes issue [#23](https://github.com/Cimpress-MCP/TimberWinR/issues/23) diff --git a/TimberWinR/mdocs/RedisOutput.md b/TimberWinR/mdocs/RedisOutput.md index f541898..b7251d0 100644 --- a/TimberWinR/mdocs/RedisOutput.md +++ b/TimberWinR/mdocs/RedisOutput.md @@ -9,6 +9,7 @@ The following parameters are allowed when configuring the Redis output. | :-------------|:---------|:------------------------------------------------------------| :--------------------------- | :-- | | *threads* | string | Location of log files(s) to monitor | Number of worker theads to send messages | 1 | | *batch_count* | integer | Sent as a single message | Number of messages to aggregate | 10 | +| *max_batch_count* | integer | Dynamically adjusted count maximum | Increases over time | batch_count*10 | | *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 | | *index* | string | The name of the redis list | logstash index name | logstash | | *host* | [string] | The hostname(s) of your Redis server(s) | IP or DNS name | |