From afbc1e229448fd8684992534467a8d750b4ada52 Mon Sep 17 00:00:00 2001 From: gschorer Date: Fri, 19 Dec 2014 11:08:37 -0500 Subject: [PATCH] Batch send messages to redis Add a cap on the message backlog in the redis outputter. Defaults to 50k. --- TimberWinR/Outputs/Redis.cs | 48 +++++++++++++++++++++++++++---------- TimberWinR/Parser.cs | 6 +++++ 2 files changed, 41 insertions(+), 13 deletions(-) diff --git a/TimberWinR/Outputs/Redis.cs b/TimberWinR/Outputs/Redis.cs index 0217c84..4528b10 100644 --- a/TimberWinR/Outputs/Redis.cs +++ b/TimberWinR/Outputs/Redis.cs @@ -35,6 +35,9 @@ namespace TimberWinR.Outputs private long _errorCount; private long _redisDepth; + private int _maxQueueSize; + private bool _queueOverflowDiscardOldest; + /// /// Get the next client /// @@ -104,6 +107,8 @@ namespace TimberWinR.Outputs _interval = ro.Interval; _numThreads = ro.NumThreads; _errorCount = 0; + _maxQueueSize = ro.MaxQueueSize; + _queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest; for (int i = 0; i < ro.NumThreads; i++) { @@ -134,6 +139,24 @@ namespace TimberWinR.Outputs lock (_locker) { + if (_jsonQueue.Count >= _maxQueueSize) + { + // If we've exceeded our queue size, and we're supposed to throw out the oldest objects first, + // then remove as many as necessary to get us under our limit + if (_queueOverflowDiscardOldest) + { + for (int i = 0; i <= (_jsonQueue.Count - _maxQueueSize); i++) + { + _jsonQueue.RemoveAt(0); + } + } + // Otherwise we're in a "discard newest" mode, and this is the newest message, so just ignore it + else + { + return; + } + } + _jsonQueue.Add(message); } } @@ -184,21 +207,20 @@ namespace TimberWinR.Outputs LogManager.GetCurrentClassLogger() .Debug("Sending {0} Messages to {1}", messages.Length, client.Host); - foreach (string jsonMessage in messages) + try { - try - { - _redisDepth = client.RPush(_logstashIndexName, jsonMessage); - _sentMessages++; - } - catch (SocketException ex) - { - LogManager.GetCurrentClassLogger().Warn(ex); - Interlocked.Increment(ref _errorCount); - } + _redisDepth = client.RPush(_logstashIndexName, messages); + _sentMessages += messages.Length; + } + catch (SocketException ex) + { + LogManager.GetCurrentClassLogger().Warn(ex); + Interlocked.Increment(ref _errorCount); + } + finally + { + client.EndPipe(); } - client.EndPipe(); - GC.Collect(); break; } else diff --git a/TimberWinR/Parser.cs b/TimberWinR/Parser.cs index 051f0ad..3ebfd3d 100644 --- a/TimberWinR/Parser.cs +++ b/TimberWinR/Parser.cs @@ -512,6 +512,10 @@ namespace TimberWinR.Parser public int NumThreads { get; set; } [JsonProperty(PropertyName = "interval")] public int Interval { get; set; } + [JsonProperty(PropertyName = "max_queue_size")] + public int MaxQueueSize { get; set; } + [JsonProperty(PropertyName = "queue_overflow_discard_oldest")] + public bool QueueOverflowDiscardOldest { get; set; } public RedisOutput() { @@ -522,6 +526,8 @@ namespace TimberWinR.Parser BatchCount = 10; NumThreads = 1; Interval = 5000; + QueueOverflowDiscardOldest = true; + MaxQueueSize = 50000; } }