Batch send messages to redis
Add a cap on the message backlog in the redis outputter. Defaults to 50k.
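The new settings surface in the configuration through the parser properties added at the bottom of this diff, under the JSON names max_queue_size and queue_overflow_discard_oldest. Below is a minimal sketch of a Redis output block using them; the surrounding structure and the host key are assumptions based on typical TimberWinR configs, while the two new keys and their default values (50000 and true) come from this commit:

    "Outputs": {
        "Redis": [
            {
                "host": [ "redis.example.com" ],
                "max_queue_size": 50000,
                "queue_overflow_discard_oldest": true
            }
        ]
    }

With queue_overflow_discard_oldest left at its default of true, the outputter discards the oldest queued messages once the cap is reached; set it to false to drop the incoming message instead.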
@@ -35,6 +35,9 @@ namespace TimberWinR.Outputs
         private long _errorCount;
         private long _redisDepth;

+        private int _maxQueueSize;
+        private bool _queueOverflowDiscardOldest;
+
         /// <summary>
         /// Get the next client
         /// </summary>
@@ -104,6 +107,8 @@ namespace TimberWinR.Outputs
             _interval = ro.Interval;
             _numThreads = ro.NumThreads;
             _errorCount = 0;
+            _maxQueueSize = ro.MaxQueueSize;
+            _queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest;

             for (int i = 0; i < ro.NumThreads; i++)
             {
@@ -134,6 +139,24 @@ namespace TimberWinR.Outputs

             lock (_locker)
             {
+                if (_jsonQueue.Count >= _maxQueueSize)
+                {
+                    // If we've exceeded our queue size, and we're supposed to throw out the oldest objects first,
+                    // then remove as many as necessary to get us under our limit
+                    if (_queueOverflowDiscardOldest)
+                    {
+                        for (int i = 0; i <= (_jsonQueue.Count - _maxQueueSize); i++)
+                        {
+                            _jsonQueue.RemoveAt(0);
+                        }
+                    }
+                    // Otherwise we're in a "discard newest" mode, and this is the newest message, so just ignore it
+                    else
+                    {
+                        return;
+                    }
+                }
+
                 _jsonQueue.Add(message);
             }
         }
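A quick check of the trim loop above: with _maxQueueSize at 50,000 and the queue already holding 50,000 entries, the condition i <= (_jsonQueue.Count - _maxQueueSize) holds only for i = 0, so the single oldest entry is removed, the queue drops to 49,999, and the _jsonQueue.Add(message) inside the lock brings the backlog back to exactly the 50,000 cap. Note that _jsonQueue.Count is re-evaluated on every iteration, so the exit condition tightens as entries are removed.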
@@ -184,21 +207,20 @@ namespace TimberWinR.Outputs
                 LogManager.GetCurrentClassLogger()
                     .Debug("Sending {0} Messages to {1}", messages.Length, client.Host);

-                foreach (string jsonMessage in messages)
-                {
                 try
                 {
-                    _redisDepth = client.RPush(_logstashIndexName, jsonMessage);
-                    _sentMessages++;
+                    _redisDepth = client.RPush(_logstashIndexName, messages);
+                    _sentMessages += messages.Length;
                 }
                 catch (SocketException ex)
                 {
                     LogManager.GetCurrentClassLogger().Warn(ex);
                     Interlocked.Increment(ref _errorCount);
                 }
             }
             finally
             {
                 client.EndPipe();
                 GC.Collect();
             }
             break;
         }
         else
@@ -512,6 +512,10 @@ namespace TimberWinR.Parser
         public int NumThreads { get; set; }
         [JsonProperty(PropertyName = "interval")]
         public int Interval { get; set; }
+        [JsonProperty(PropertyName = "max_queue_size")]
+        public int MaxQueueSize { get; set; }
+        [JsonProperty(PropertyName = "queue_overflow_discard_oldest")]
+        public bool QueueOverflowDiscardOldest { get; set; }

         public RedisOutput()
         {
@@ -522,6 +526,8 @@ namespace TimberWinR.Parser
             BatchCount = 10;
             NumThreads = 1;
             Interval = 5000;
+            QueueOverflowDiscardOldest = true;
+            MaxQueueSize = 50000;
         }
     }