Merge pull request #20 from gcschorer/FixRedisOutputLeaks

Batch send messages to redis
This commit is contained in:
Eric Fontana
2014-12-19 11:22:53 -05:00
2 changed files with 41 additions and 13 deletions

View File

@@ -35,6 +35,9 @@ namespace TimberWinR.Outputs
private long _errorCount;
private long _redisDepth;
private int _maxQueueSize;
private bool _queueOverflowDiscardOldest;
/// <summary>
/// Get the next client
/// </summary>
@@ -104,6 +107,8 @@ namespace TimberWinR.Outputs
_interval = ro.Interval;
_numThreads = ro.NumThreads;
_errorCount = 0;
_maxQueueSize = ro.MaxQueueSize;
_queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest;
for (int i = 0; i < ro.NumThreads; i++)
{
@@ -134,6 +139,24 @@ namespace TimberWinR.Outputs
lock (_locker)
{
if (_jsonQueue.Count >= _maxQueueSize)
{
// If we've exceeded our queue size, and we're supposed to throw out the oldest objects first,
// then remove as many as necessary to get us under our limit
if (_queueOverflowDiscardOldest)
{
for (int i = 0; i <= (_jsonQueue.Count - _maxQueueSize); i++)
{
_jsonQueue.RemoveAt(0);
}
}
// Otherwise we're in a "discard newest" mode, and this is the newest message, so just ignore it
else
{
return;
}
}
_jsonQueue.Add(message);
}
}
@@ -184,21 +207,20 @@ namespace TimberWinR.Outputs
LogManager.GetCurrentClassLogger()
.Debug("Sending {0} Messages to {1}", messages.Length, client.Host);
foreach (string jsonMessage in messages)
try
{
try
{
_redisDepth = client.RPush(_logstashIndexName, jsonMessage);
_sentMessages++;
}
catch (SocketException ex)
{
LogManager.GetCurrentClassLogger().Warn(ex);
Interlocked.Increment(ref _errorCount);
}
_redisDepth = client.RPush(_logstashIndexName, messages);
_sentMessages += messages.Length;
}
catch (SocketException ex)
{
LogManager.GetCurrentClassLogger().Warn(ex);
Interlocked.Increment(ref _errorCount);
}
finally
{
client.EndPipe();
}
client.EndPipe();
GC.Collect();
break;
}
else

View File

@@ -512,6 +512,10 @@ namespace TimberWinR.Parser
public int NumThreads { get; set; }
[JsonProperty(PropertyName = "interval")]
public int Interval { get; set; }
[JsonProperty(PropertyName = "max_queue_size")]
public int MaxQueueSize { get; set; }
[JsonProperty(PropertyName = "queue_overflow_discard_oldest")]
public bool QueueOverflowDiscardOldest { get; set; }
public RedisOutput()
{
@@ -522,6 +526,8 @@ namespace TimberWinR.Parser
BatchCount = 10;
NumThreads = 1;
Interval = 5000;
QueueOverflowDiscardOldest = true;
MaxQueueSize = 50000;
}
}