Merge pull request #31 from Cimpress-MCP/mitigate_flood

Added new max_batch_count parameter to mitigate flooding when large burs...
This commit is contained in:
Eric Fontana
2015-03-05 06:40:20 -05:00
24 changed files with 493 additions and 133 deletions

View File

@@ -99,9 +99,7 @@ namespace TimberWinR.ExtractID
Console.WriteLine("Updated {0} ProductID: {1}", args[2], productCode);
return 0;
}
Console.Error.WriteLine("Failed for some reason");
}
}
}
}

View File

@@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.3.19.0")]
[assembly: AssemblyFileVersion("1.3.19.0")]
[assembly: AssemblyVersion("1.3.19.1")]
[assembly: AssemblyFileVersion("1.3.19.1")]

View File

@@ -94,7 +94,7 @@ namespace TimberWinR.UnitTests
Configuration c = Configuration.FromString(redisJson);
RedisOutput redis = c.RedisOutputs.First() as RedisOutput;
RedisOutputParameters redis = c.RedisOutputs.First() as RedisOutputParameters;
Assert.IsTrue(redis.Host.Length >= 1);
}

View File

@@ -0,0 +1,126 @@
using System;
using System.IO;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NUnit.Framework;
using TimberWinR.Parser;
using System.Text;
using System.Collections.Generic;
namespace TimberWinR.UnitTests
{
// Class which implements a Fake redis server for test purposes.
class FakeRediServer
{
private readonly System.Net.Sockets.TcpListener _tcpListenerV4;
private readonly System.Net.Sockets.TcpListener _tcpListenerV6;
private Thread _listenThreadV4;
private Thread _listenThreadV6;
private readonly int _port;
private CancellationToken _cancelToken;
private bool _shutdown;
public FakeRediServer(CancellationToken cancelToken, int port = 6379)
{
_port = port;
_cancelToken = cancelToken;
_shutdown = false;
_tcpListenerV6 = new System.Net.Sockets.TcpListener(IPAddress.IPv6Any, port);
_tcpListenerV4 = new System.Net.Sockets.TcpListener(IPAddress.Any, port);
_listenThreadV4 = new Thread(new ParameterizedThreadStart(ListenForClients));
_listenThreadV4.Start(_tcpListenerV4);
_listenThreadV6 = new Thread(new ParameterizedThreadStart(ListenForClients));
_listenThreadV6.Start(_tcpListenerV6);
}
public void Shutdown()
{
_shutdown = true;
this._tcpListenerV4.Stop();
this._tcpListenerV6.Stop();
}
private void ListenForClients(object olistener)
{
System.Net.Sockets.TcpListener listener = olistener as System.Net.Sockets.TcpListener;
listener.Start();
while (!_cancelToken.IsCancellationRequested && !_shutdown)
{
try
{
//blocks until a client has connected to the server
TcpClient client = listener.AcceptTcpClient();
// Wait for a client, spin up a thread.
var clientThread = new Thread(new ParameterizedThreadStart(HandleNewClient));
clientThread.Start(client);
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.Interrupted)
break;
}
}
}
private void HandleNewClient(object client)
{
var tcpClient = (TcpClient)client;
try
{
NetworkStream clientStream = tcpClient.GetStream();
int i;
Byte[] bytes = new Byte[16535];
String data = null;
do
{
try
{
// Loop to receive all the data sent by the client.
while ((i = clientStream.Read(bytes, 0, bytes.Length)) != 0)
{
// Translate data bytes to a ASCII string.
data = System.Text.Encoding.ASCII.GetString(bytes, 0, i);
//System.Diagnostics.Debug.WriteLine(String.Format("Received: {0}", data));
// Process the data sent by the client.
data = ":1000\r\n";
byte[] msg = System.Text.Encoding.ASCII.GetBytes(data);
// Send back a response.
clientStream.Write(msg, 0, msg.Length);
// System.Diagnostics.Debug.WriteLine(String.Format("Sent: {0}", data));
}
}
catch (IOException)
{
}
} while (true);
}
catch (Exception ex)
{
System.Diagnostics.Debug.WriteLine(ex.ToString());
}
tcpClient.Close();
}
private void ProcessJson(JObject json)
{
Console.WriteLine(json.ToString());
}
}
}

View File

@@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
@@ -52,7 +51,7 @@ namespace TimberWinR.UnitTests
if (!cancelTokenSource.Token.IsCancellationRequested)
syncHandle.Wait(TimeSpan.FromSeconds(10000), cancelTokenSource.Token);
}
catch (OperationCanceledException oex)
catch (OperationCanceledException)
{
}
}
@@ -102,7 +101,7 @@ namespace TimberWinR.UnitTests
if (!cancelTokenSource.Token.IsCancellationRequested)
syncHandle.Wait(TimeSpan.FromSeconds(10000), cancelTokenSource.Token);
}
catch (OperationCanceledException oex)
catch (OperationCanceledException)
{
}
}

View File

@@ -10,12 +10,12 @@
public class ElasticsearchOutputTests
{
private ElasticsearchOutput parser;
private ElasticsearchOutputParameters parser;
[SetUp]
public void Setup()
{
this.parser = new ElasticsearchOutput();
this.parser = new ElasticsearchOutputParameters();
}
[Test]

View File

@@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
@@ -61,7 +60,7 @@ namespace TimberWinR.UnitTests
}
}
}
catch (OperationCanceledException oex)
catch (OperationCanceledException)
{
Console.WriteLine("Done!");
}

View File

@@ -0,0 +1,89 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using NUnit.Framework;
using TimberWinR.Parser;
using Newtonsoft.Json.Linq;
using System.Threading;
namespace TimberWinR.UnitTests
{
// [TestFixture]
public class TestDynamicBatchCount
{
[Test]
public void TestDynamicBatch()
{
var mgr = new Manager();
mgr.LogfileDir = ".";
mgr.Config = new Configuration();
CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
var cancelToken = cancelTokenSource.Token;
FakeRediServer fr = new FakeRediServer(cancelToken);
var redisParams = new RedisOutputParameters();
redisParams.BatchCount = 10;
redisParams.MaxBatchCount = 40;
redisParams.Interval = 100;
var redisOutput = new Outputs.RedisOutput(mgr, redisParams, cancelToken);
// Message is irrelavant
JObject jsonMessage = new JObject
{
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"},
{"Text", "{\"Email\":\"james@example.com\",\"Active\":true,\"CreatedDate\":\"2013-01-20T00:00:00Z\",\"Roles\":[\"User\",\"Admin\"]}"}
};
// Send 1000 messages at max throttle
for (int i = 0; i < 1000; i++)
{
Thread.Sleep(10);
redisOutput.Startup(jsonMessage);
}
while (redisOutput.SentMessages < 1000)
{
System.Diagnostics.Debug.WriteLine(redisOutput.SentMessages);
Thread.Sleep(1000);
}
fr.Shutdown();
cancelTokenSource.Cancel();
System.Diagnostics.Debug.WriteLine(redisOutput.ToJson());
System.Diagnostics.Debug.WriteLine(redisOutput.QueueDepth);
JObject json = redisOutput.ToJson();
var mbc = json["redis"]["reachedMaxBatchCountTimes"].Value<int>();
var sm = json["redis"]["sentMessageCount"].Value<int>();
var errs = json["redis"]["errors"].Value<int>();
var cbc = json["redis"]["currentBatchCount"].Value<int>();
// No errors
Assert.AreEqual(0, errs);
// Should have reached max at least 1 time
Assert.GreaterOrEqual(mbc, 1);
// Should have sent 1000 messages
Assert.AreEqual(1000, sm);
// Should reset back down to original
Assert.AreEqual(cbc, 10);
}
}
}

View File

@@ -60,6 +60,7 @@
<ItemGroup>
<Compile Include="Configuration.cs" />
<Compile Include="DateFilterTests.cs" />
<Compile Include="FakeRediServer.cs" />
<Compile Include="GeoIPFilterTests.cs" />
<Compile Include="Inputs\IisW3CRowReaderTests.cs" />
<Compile Include="JsonFilterTests.cs" />
@@ -69,6 +70,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TailFileTests.cs" />
<Compile Include="TestBase.cs" />
<Compile Include="TestDynamicBatchCount.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\TimberWinR\TimberWinR.csproj">

View File

@@ -22,7 +22,6 @@ using TimberWinR.Filters;
using NLog;
using TimberWinR.Parser;
using Topshelf.Configurators;
using IISW3CLog = TimberWinR.Parser.IISW3CLog;
using WindowsEvent = TimberWinR.Parser.WindowsEvent;
namespace TimberWinR
@@ -30,7 +29,7 @@ namespace TimberWinR
public class Configuration
{
private CancellationToken _cancelToken;
private bool _stopService;
private FileSystemWatcher _dirWatcher;
private Manager _manager;
@@ -40,39 +39,39 @@ namespace TimberWinR
get { return _events; }
}
private List<RedisOutput> _redisOutputs = new List<RedisOutput>();
public IEnumerable<RedisOutput> RedisOutputs
private List<RedisOutputParameters> _redisOutputs = new List<RedisOutputParameters>();
public IEnumerable<RedisOutputParameters> RedisOutputs
{
get { return _redisOutputs; }
}
private List<ElasticsearchOutput> _elasticsearchOutputs = new List<ElasticsearchOutput>();
public IEnumerable<ElasticsearchOutput> ElasticsearchOutputs
private List<ElasticsearchOutputParameters> _elasticsearchOutputs = new List<ElasticsearchOutputParameters>();
public IEnumerable<ElasticsearchOutputParameters> ElasticsearchOutputs
{
get { return _elasticsearchOutputs; }
}
private List<StdoutOutput> _stdoutOutputs = new List<StdoutOutput>();
public IEnumerable<StdoutOutput> StdoutOutputs
private List<StdoutOutputParameters> _stdoutOutputs = new List<StdoutOutputParameters>();
public IEnumerable<StdoutOutputParameters> StdoutOutputs
{
get { return _stdoutOutputs; }
}
private List<Tcp> _tcps = new List<Tcp>();
public IEnumerable<Tcp> Tcps
private List<TcpParameters> _tcps = new List<TcpParameters>();
public IEnumerable<TcpParameters> Tcps
{
get { return _tcps; }
}
private List<Udp> _udps = new List<Udp>();
public IEnumerable<Udp> Udps
private List<UdpParameters> _udps = new List<UdpParameters>();
public IEnumerable<UdpParameters> Udps
{
get { return _udps; }
}
private List<Log> _logs = new List<Log>();
public IEnumerable<Log> Logs
private List<LogParameters> _logs = new List<LogParameters>();
public IEnumerable<LogParameters> Logs
{
get { return _logs; }
}
@@ -83,16 +82,16 @@ namespace TimberWinR
get { return _tails; }
}
private List<IISW3CLog> _iisw3clogs = new List<IISW3CLog>();
private List<IISW3CLogParameters> _iisw3clogs = new List<IISW3CLogParameters>();
public IEnumerable<IISW3CLog> IISW3C
public IEnumerable<IISW3CLogParameters> IISW3C
{
get { return _iisw3clogs; }
}
private List<W3CLog> _w3clogs = new List<W3CLog>();
private List<W3CLogParameters> _w3clogs = new List<W3CLogParameters>();
public IEnumerable<W3CLog> W3C
public IEnumerable<W3CLogParameters> W3C
{
get { return _w3clogs; }
}
@@ -177,8 +176,7 @@ namespace TimberWinR
}
private void ShutdownDirectoryMonitor()
{
_stopService = true;
{
_dirWatcher.EnableRaisingEvents = false;
LogManager.GetCurrentClassLogger().Info("Stopping Directory Monitor");
}
@@ -291,13 +289,13 @@ namespace TimberWinR
{
_filters = new List<LogstashFilter>();
_events = new List<WindowsEvent>();
_iisw3clogs = new List<IISW3CLog>();
_logs = new List<Log>();
_redisOutputs = new List<RedisOutput>();
_elasticsearchOutputs = new List<ElasticsearchOutput>();
_stdoutOutputs = new List<StdoutOutput>();
_tcps = new List<Tcp>();
_udps = new List<Udp>();
_iisw3clogs = new List<IISW3CLogParameters>();
_logs = new List<LogParameters>();
_redisOutputs = new List<RedisOutputParameters>();
_elasticsearchOutputs = new List<ElasticsearchOutputParameters>();
_stdoutOutputs = new List<StdoutOutputParameters>();
_tcps = new List<TcpParameters>();
_udps = new List<UdpParameters>();
}
public static Object GetPropValue(String name, Object obj)

View File

@@ -16,12 +16,12 @@ namespace TimberWinR.Inputs
public class IISW3CInputListener : InputListener
{
private readonly int _pollingIntervalInSeconds;
private readonly Parser.IISW3CLog _arguments;
private readonly Parser.IISW3CLogParameters _arguments;
private long _receivedMessages;
public bool Stop { get; set; }
private IisW3CRowReader rowReader;
public IISW3CInputListener(Parser.IISW3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
public IISW3CInputListener(Parser.IISW3CLogParameters arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
: base(cancelToken, "Win32-IISLog")
{
_arguments = arguments;
@@ -138,7 +138,7 @@ namespace TimberWinR.Inputs
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}

View File

@@ -28,7 +28,7 @@ namespace TimberWinR.Inputs
public class LogsListener : InputListener
{
private int _pollingIntervalInSeconds;
private TimberWinR.Parser.Log _arguments;
private TimberWinR.Parser.LogParameters _arguments;
private long _receivedMessages;
private Dictionary<string, Int64> _logFileMaxRecords;
private Dictionary<string, DateTime> _logFileCreationTimes;
@@ -39,7 +39,7 @@ namespace TimberWinR.Inputs
public bool Stop { get; set; }
public LogsListener(TimberWinR.Parser.Log arguments, CancellationToken cancelToken)
public LogsListener(TimberWinR.Parser.LogParameters arguments, CancellationToken cancelToken)
: base(cancelToken, "Win32-FileLog")
{
Stop = false;
@@ -328,7 +328,7 @@ namespace TimberWinR.Inputs
_fnfmap[fn] = fn;
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}
@@ -345,7 +345,7 @@ namespace TimberWinR.Inputs
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
}
catch (Exception ex1)

View File

@@ -296,7 +296,7 @@ namespace TimberWinR.Inputs
LogManager.GetCurrentClassLogger().Warn(fnfex.Message);
_fnfmap[fn] = fn;
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}

View File

@@ -13,8 +13,8 @@ namespace TimberWinR.Inputs
public class UdpInputListener : InputListener
{
private readonly System.Net.Sockets.UdpClient _udpListener;
private IPEndPoint groupV4;
private IPEndPoint groupV6;
private readonly IPEndPoint groupV4;
private readonly IPEndPoint groupV6;
private Thread _listenThreadV4;
private Thread _listenThreadV6;
@@ -47,6 +47,9 @@ namespace TimberWinR.Inputs
{
_port = port;
groupV4 = new IPEndPoint(IPAddress.Any, 0);
groupV6 = new IPEndPoint(IPAddress.IPv6Any, 0);
LogManager.GetCurrentClassLogger().Info("Udp Input on Port {0} Ready", _port);
_receivedMessages = 0;

View File

@@ -23,11 +23,11 @@ namespace TimberWinR.Inputs
public class W3CInputListener : InputListener
{
private readonly int _pollingIntervalInSeconds;
private readonly TimberWinR.Parser.W3CLog _arguments;
private readonly TimberWinR.Parser.W3CLogParameters _arguments;
private long _receivedMessages;
public bool Stop { get; set; }
public W3CInputListener(TimberWinR.Parser.W3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
public W3CInputListener(TimberWinR.Parser.W3CLogParameters arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
: base(cancelToken, "Win32-W3CLog")
{
_arguments = arguments;
@@ -153,7 +153,7 @@ namespace TimberWinR.Inputs
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}

View File

@@ -165,7 +165,7 @@ namespace TimberWinR.Inputs
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}

View File

@@ -179,7 +179,7 @@ namespace TimberWinR
}
}
foreach (Parser.IISW3CLog iisw3cConfig in config.IISW3C)
foreach (Parser.IISW3CLogParameters iisw3cConfig in config.IISW3C)
{
var elistner = new IISW3CInputListener(iisw3cConfig, cancelToken);
Listeners.Add(elistner);
@@ -187,7 +187,7 @@ namespace TimberWinR
output.Connect(elistner);
}
foreach (Parser.W3CLog iisw3cConfig in config.W3C)
foreach (Parser.W3CLogParameters iisw3cConfig in config.W3C)
{
var elistner = new W3CInputListener(iisw3cConfig, cancelToken);
Listeners.Add(elistner);

View File

@@ -27,10 +27,10 @@ namespace TimberWinR.Outputs
private readonly int _numThreads;
private long _sentMessages;
private long _errorCount;
private Parser.ElasticsearchOutput eo;
private Parser.ElasticsearchOutputParameters eo;
public bool Stop { get; set; }
public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutput eo, CancellationToken cancelToken)
public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutputParameters eo, CancellationToken cancelToken)
: base(cancelToken, "Elasticsearch")
{
_sentMessages = 0;
@@ -60,8 +60,8 @@ namespace TimberWinR.Outputs
new JObject(
new JProperty("host", string.Join(",", _host)),
new JProperty("errors", _errorCount),
new JProperty("sent_messages", _sentMessages),
new JProperty("queued_messages", _jsonQueue.Count),
new JProperty("sentMmessageCount", _sentMessages),
new JProperty("queuedMessageCount", _jsonQueue.Count),
new JProperty("port", _port),
new JProperty("interval", _interval),
new JProperty("threads", _numThreads),
@@ -129,7 +129,7 @@ namespace TimberWinR.Outputs
if (response.StatusCode != HttpStatusCode.Created)
{
LogManager.GetCurrentClassLogger()
.Error("Failed to send: {0}", response.ErrorMessage);
.Error("Failed to send: {0}, code: {1}, descr: {2}, resp: {3}", response.ErrorMessage, response.StatusCode, response.StatusDescription, response.ResponseStatus);
Interlocked.Increment(ref _errorCount);
}
else
@@ -169,7 +169,7 @@ namespace TimberWinR.Outputs
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
}
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}

View File

@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.Eventing.Reader;
using System.Linq;
using System.Linq.Expressions;
using System.Net.Sockets;
@@ -17,32 +18,126 @@ using TimberWinR.Parser;
namespace TimberWinR.Outputs
{
internal class BatchCounter
{
// Total number of times reached max batch count (indicates we are under pressure)
public int ReachedMaxBatchCountTimes { get; set; }
private readonly int[] _sampleQueueDepths;
private int _sampleCountIndex;
private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default)
private object _locker = new object();
private bool _warnedReachedMax;
private readonly int _maxBatchCount;
private readonly int _batchCount;
private int _totalSamples;
public int[] Samples()
{
return _sampleQueueDepths;
}
public BatchCounter(int batchCount, int maxBatchCount)
{
_batchCount = batchCount;
_maxBatchCount = maxBatchCount;
_sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE];
_sampleCountIndex = 0;
_totalSamples = 0;
ReachedMaxBatchCountTimes = 0;
}
public void SampleQueueDepth(int queueDepth)
{
lock (_locker)
{
if (_totalSamples < QUEUE_SAMPLE_SIZE)
_totalSamples++;
// Take a sample of the queue depth
if (_sampleCountIndex >= QUEUE_SAMPLE_SIZE)
_sampleCountIndex = 0;
_sampleQueueDepths[_sampleCountIndex++] = queueDepth;
}
}
public int AverageQueueDepth()
{
lock (_locker)
{
if (_totalSamples > 0)
{
var samples = _sampleQueueDepths.Take(_totalSamples);
int avg = (int) samples.Average();
return avg;
}
return 0;
}
}
// Sample the queue and adjust the batch count if needed (ramp up slowly)
public int UpdateCurrentBatchCount(int queueSize, int currentBatchCount)
{
if (currentBatchCount < _maxBatchCount && currentBatchCount < queueSize && AverageQueueDepth() > currentBatchCount)
{
currentBatchCount += Math.Max(_maxBatchCount/_batchCount, 1);
if (currentBatchCount >= _maxBatchCount && !_warnedReachedMax)
{
LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", currentBatchCount);
_warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset)
ReachedMaxBatchCountTimes++;
currentBatchCount = _maxBatchCount;
}
}
else // Reset to default
{
currentBatchCount = _batchCount;
_warnedReachedMax = false;
}
return currentBatchCount;
}
}
public class RedisOutput : OutputSender
{
public int QueueDepth
{
get { return _jsonQueue.Count; }
}
public long SentMessages
{
get { return _sentMessages; }
}
private readonly string _logstashIndexName;
private readonly int _port;
private readonly int _timeout;
private readonly object _locker = new object();
private readonly List<string> _jsonQueue;
// readonly Task _consumerTask;
private readonly List<string> _jsonQueue;
private readonly string[] _redisHosts;
private int _redisHostIndex;
private TimberWinR.Manager _manager;
private readonly int _batchCount;
private int _currentBatchCount;
private readonly int _maxBatchCount;
private readonly int _interval;
private readonly int _numThreads;
private readonly int _numThreads;
private long _sentMessages;
private long _errorCount;
private long _redisDepth;
private int _maxQueueSize;
private bool _queueOverflowDiscardOldest;
private DateTime? _lastErrorTimeUTC;
private readonly int _maxQueueSize;
private readonly bool _queueOverflowDiscardOldest;
private BatchCounter _batchCounter;
public bool Stop { get; set; }
/// <summary>
/// Get the next client
/// Get the next client from the list of hosts.
/// </summary>
/// <returns></returns>
private RedisClient getClient()
@@ -56,16 +151,17 @@ namespace TimberWinR.Outputs
try
{
RedisClient client = new RedisClient(_redisHosts[_redisHostIndex], _port, _timeout);
_redisHostIndex++;
if (_redisHostIndex >= _redisHosts.Length)
_redisHostIndex = 0;
return client;
}
catch (Exception)
{
}
finally
{
_redisHostIndex++;
if (_redisHostIndex >= _redisHosts.Length)
_redisHostIndex = 0;
}
numTries++;
}
@@ -79,15 +175,21 @@ namespace TimberWinR.Outputs
new JObject(
new JProperty("host", string.Join(",", _redisHosts)),
new JProperty("errors", _errorCount),
new JProperty("redis_depth", _redisDepth),
new JProperty("sent_messages", _sentMessages),
new JProperty("queued_messages", _jsonQueue.Count),
new JProperty("lastErrorTimeUTC", _lastErrorTimeUTC),
new JProperty("redisQueueDepth", _redisDepth),
new JProperty("sentMessageCount", _sentMessages),
new JProperty("queuedMessageCount", _jsonQueue.Count),
new JProperty("port", _port),
new JProperty("maxQueueSize", _maxQueueSize),
new JProperty("overflowDiscardOldest", _queueOverflowDiscardOldest),
new JProperty("interval", _interval),
new JProperty("threads", _numThreads),
new JProperty("batchcount", _batchCount),
new JProperty("currentBatchCount", _currentBatchCount),
new JProperty("reachedMaxBatchCountTimes", _batchCounter.ReachedMaxBatchCountTimes),
new JProperty("maxBatchCount", _maxBatchCount),
new JProperty("averageQueueDepth", _batchCounter.AverageQueueDepth()),
new JProperty("queueSamples", new JArray(_batchCounter.Samples())),
new JProperty("index", _logstashIndexName),
new JProperty("hosts",
new JArray(
@@ -97,25 +199,33 @@ namespace TimberWinR.Outputs
return json;
}
public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken)
public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutputParameters parameters, CancellationToken cancelToken)
: base(cancelToken, "Redis")
{
{
_redisDepth = 0;
_batchCount = ro.BatchCount;
_batchCount = parameters.BatchCount;
_maxBatchCount = parameters.MaxBatchCount;
// Make sure maxBatchCount is larger than batchCount
if (_maxBatchCount < _batchCount)
_maxBatchCount = _batchCount*10;
_manager = manager;
_redisHostIndex = 0;
_redisHosts = ro.Host;
_redisHosts = parameters.Host;
_jsonQueue = new List<string>();
_port = ro.Port;
_timeout = ro.Timeout;
_logstashIndexName = ro.Index;
_interval = ro.Interval;
_numThreads = ro.NumThreads;
_port = parameters.Port;
_timeout = parameters.Timeout;
_logstashIndexName = parameters.Index;
_interval = parameters.Interval;
_numThreads = parameters.NumThreads;
_errorCount = 0;
_maxQueueSize = ro.MaxQueueSize;
_queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest;
for (int i = 0; i < ro.NumThreads; i++)
_lastErrorTimeUTC = null;
_maxQueueSize = parameters.MaxQueueSize;
_queueOverflowDiscardOldest = parameters.QueueOverflowDiscardOldest;
_batchCounter = new BatchCounter(_batchCount, _maxBatchCount);
_currentBatchCount = _batchCount;
for (int i = 0; i < parameters.NumThreads; i++)
{
var redisThread = new Task(RedisSender, cancelToken);
redisThread.Start();
@@ -181,8 +291,7 @@ namespace TimberWinR.Outputs
}
}
return drop;
}
}
//
// Pull off messages from the Queue, batch them up and send them all across
//
@@ -198,17 +307,23 @@ namespace TimberWinR.Outputs
try
{
string[] messages;
// Exclusively
lock (_locker)
{
messages = _jsonQueue.Take(_batchCount).ToArray();
_batchCounter.SampleQueueDepth(_jsonQueue.Count);
// Re-compute current batch size
_currentBatchCount = _batchCounter.UpdateCurrentBatchCount(_jsonQueue.Count, _currentBatchCount);
messages = _jsonQueue.Take(_currentBatchCount).ToArray();
_jsonQueue.RemoveRange(0, messages.Length);
if (messages.Length > 0)
_manager.IncrementMessageCount(messages.Length);
}
if (messages.Length > 0)
{
int numHosts = _redisHosts.Length;
bool sentSuccessfully = false;
while (numHosts-- > 0)
{
try
@@ -225,17 +340,24 @@ namespace TimberWinR.Outputs
try
{
_redisDepth = client.RPush(_logstashIndexName, messages);
_sentMessages += messages.Length;
_sentMessages += messages.Length;
client.EndPipe();
sentSuccessfully = true;
if (messages.Length > 0)
_manager.IncrementMessageCount(messages.Length);
}
catch (SocketException ex)
{
LogManager.GetCurrentClassLogger().Warn(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTimeUTC = DateTime.UtcNow;
}
finally
catch (Exception ex)
{
client.EndPipe();
}
LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTimeUTC = DateTime.UtcNow;
}
break;
}
else
@@ -244,6 +366,7 @@ namespace TimberWinR.Outputs
LogManager.GetCurrentClassLogger()
.Fatal("Unable to connect with any Redis hosts, {0}",
String.Join(",", _redisHosts));
_lastErrorTimeUTC = DateTime.UtcNow;
}
}
}
@@ -251,25 +374,40 @@ namespace TimberWinR.Outputs
{
LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTimeUTC = DateTime.UtcNow;
}
} // No more hosts to try.
if (!sentSuccessfully)
{
lock (_locker)
{
_jsonQueue.InsertRange(0, messages);
}
}
}
GC.Collect();
// GC.Collect();
if (!Stop)
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}
catch(ThreadAbortException)
{
break;
}
catch (Exception ex)
{
throw;
_lastErrorTimeUTC = DateTime.UtcNow;
Interlocked.Increment(ref _errorCount);
LogManager.GetCurrentClassLogger().Error(ex);
}
}
}
}
}
}
}
}

View File

@@ -17,7 +17,7 @@ namespace TimberWinR.Outputs
private long _sentMessages;
public bool Stop { get; set; }
public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutput eo, CancellationToken cancelToken)
public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutputParameters eo, CancellationToken cancelToken)
: base(cancelToken, "Stdout")
{
_sentMessages = 0;
@@ -34,7 +34,7 @@ namespace TimberWinR.Outputs
JObject json = new JObject(
new JProperty("stdout",
new JObject(
new JProperty("sent_messages", _sentMessages))));
new JProperty("sentMessageCount", _sentMessages))));
return json;
}
@@ -78,7 +78,7 @@ namespace TimberWinR.Outputs
if (!Stop)
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
}
catch (OperationCanceledException oce)
catch (OperationCanceledException)
{
break;
}

View File

@@ -327,7 +327,7 @@ namespace TimberWinR.Parser
}
}
public class Log : IValidateSchema
public class LogParameters : IValidateSchema
{
[JsonProperty(PropertyName = "location")]
public string Location { get; set; }
@@ -346,7 +346,7 @@ namespace TimberWinR.Parser
[JsonProperty(PropertyName = "codec")]
public Codec Codec { get; set; }
public Log()
public LogParameters()
{
Fields = new List<Field>();
Fields.Add(new Field("LogFilename", "string"));
@@ -361,12 +361,12 @@ namespace TimberWinR.Parser
}
}
public class Tcp : IValidateSchema
public class TcpParameters : IValidateSchema
{
[JsonProperty(PropertyName = "port")]
public int Port { get; set; }
public Tcp()
public TcpParameters()
{
Port = 5140;
}
@@ -378,12 +378,12 @@ namespace TimberWinR.Parser
}
public class Udp : IValidateSchema
public class UdpParameters : IValidateSchema
{
[JsonProperty(PropertyName = "port")]
public int Port { get; set; }
public Udp()
public UdpParameters()
{
Port = 5142;
}
@@ -393,7 +393,7 @@ namespace TimberWinR.Parser
}
}
public class W3CLog : IValidateSchema
public class W3CLogParameters : IValidateSchema
{
[JsonProperty(PropertyName = "location")]
public string Location { get; set; }
@@ -410,7 +410,7 @@ namespace TimberWinR.Parser
[JsonProperty(PropertyName = "fields")]
public List<Field> Fields { get; set; }
public W3CLog()
public W3CLogParameters()
{
CodePage = 0;
DtLines = 10;
@@ -428,7 +428,7 @@ namespace TimberWinR.Parser
}
public class IISW3CLog : IValidateSchema
public class IISW3CLogParameters : IValidateSchema
{
[JsonProperty(PropertyName = "location")]
public string Location { get; set; }
@@ -448,7 +448,7 @@ namespace TimberWinR.Parser
[JsonProperty(PropertyName = "fields")]
public List<Field> Fields { get; set; }
public IISW3CLog()
public IISW3CLogParameters()
{
CodePage = -2;
Recurse = 0;
@@ -494,7 +494,7 @@ namespace TimberWinR.Parser
}
}
public class ElasticsearchOutput
public class ElasticsearchOutputParameters
{
const string IndexDatePattern = "(%\\{(?<format>[^\\}]+)\\})";
@@ -513,7 +513,7 @@ namespace TimberWinR.Parser
[JsonProperty(PropertyName = "interval")]
public int Interval { get; set; }
public ElasticsearchOutput()
public ElasticsearchOutputParameters()
{
Protocol = "http";
Port = 9200;
@@ -564,7 +564,7 @@ namespace TimberWinR.Parser
}
public class RedisOutput
public class RedisOutputParameters
{
[JsonProperty(PropertyName = "host")]
public string[] Host { get; set; }
@@ -576,6 +576,8 @@ namespace TimberWinR.Parser
public int Timeout { get; set; }
[JsonProperty(PropertyName = "batch_count")]
public int BatchCount { get; set; }
[JsonProperty(PropertyName = "max_batch_count")]
public int MaxBatchCount { get; set; }
[JsonProperty(PropertyName = "threads")]
public int NumThreads { get; set; }
[JsonProperty(PropertyName = "interval")]
@@ -585,13 +587,14 @@ namespace TimberWinR.Parser
[JsonProperty(PropertyName = "queue_overflow_discard_oldest")]
public bool QueueOverflowDiscardOldest { get; set; }
public RedisOutput()
public RedisOutputParameters()
{
Port = 6379;
Index = "logstash";
Host = new string[] { "localhost" };
Timeout = 10000;
BatchCount = 10;
MaxBatchCount = BatchCount*10;
NumThreads = 1;
Interval = 5000;
QueueOverflowDiscardOldest = true;
@@ -599,12 +602,12 @@ namespace TimberWinR.Parser
}
}
public class StdoutOutput
public class StdoutOutputParameters
{
[JsonProperty(PropertyName = "interval")]
public int Interval { get; set; }
public StdoutOutput()
public StdoutOutputParameters()
{
Interval = 1000;
}
@@ -613,13 +616,13 @@ namespace TimberWinR.Parser
public class OutputTargets
{
[JsonProperty("Redis")]
public RedisOutput[] Redis { get; set; }
public RedisOutputParameters[] Redis { get; set; }
[JsonProperty("Elasticsearch")]
public ElasticsearchOutput[] Elasticsearch { get; set; }
public ElasticsearchOutputParameters[] Elasticsearch { get; set; }
[JsonProperty("Stdout")]
public StdoutOutput[] Stdout { get; set; }
public StdoutOutputParameters[] Stdout { get; set; }
}
public class InputSources
@@ -628,22 +631,22 @@ namespace TimberWinR.Parser
public WindowsEvent[] WindowsEvents { get; set; }
[JsonProperty("Logs")]
public Log[] Logs { get; set; }
public LogParameters[] Logs { get; set; }
[JsonProperty("TailFiles")]
public TailFile[] TailFiles { get; set; }
[JsonProperty("Tcp")]
public Tcp[] Tcps { get; set; }
public TcpParameters[] Tcps { get; set; }
[JsonProperty("Udp")]
public Udp[] Udps { get; set; }
public UdpParameters[] Udps { get; set; }
[JsonProperty("IISW3CLogs")]
public IISW3CLog[] IISW3CLogs { get; set; }
public IISW3CLogParameters[] IISW3CLogs { get; set; }
[JsonProperty("W3CLogs")]
public W3CLog[] W3CLogs { get; set; }
public W3CLogParameters[] W3CLogs { get; set; }
[JsonProperty("Stdin")]
public Stdin[] Stdins { get; set; }

View File

@@ -3,6 +3,10 @@
A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service.
Version History
### 1.3.19.1 - 03/03/2015
1. Added new Redis parameter _max\_batch\_count_ which increases the _batch\_count_ dynamically over time
to handle input flooding. Default is _batch\_count_ * 10
### 1.3.19.0 - 02/26/2015
1. Added support for Multiline codecs for Stdin and Logs listeners, closes issue [#23](https://github.com/Cimpress-MCP/TimberWinR/issues/23)

View File

@@ -9,6 +9,7 @@ The following parameters are allowed when configuring the Redis output.
| :-------------|:---------|:------------------------------------------------------------| :--------------------------- | :-- |
| *threads* | string | Location of log files(s) to monitor | Number of worker theads to send messages | 1 |
| *batch_count* | integer | Sent as a single message | Number of messages to aggregate | 10 |
| *max_batch_count* | integer | Dynamically adjusted count maximum | Increases over time | batch_count*10 |
| *interval* | integer | Interval in milliseconds to sleep during batch sends | Interval | 5000 |
| *index* | string | The name of the redis list | logstash index name | logstash |
| *host* | [string] | The hostname(s) of your Redis server(s) | IP or DNS name | |

View File

@@ -1,7 +1,7 @@
$packageName = 'TimberWinR-${version}' # arbitrary name for the package, used in messages
$installerType = 'msi' #only one of these: exe, msi, msu
$url = 'http://www.ericfontana.com/TimberWinR/TimberWinR-${version}.0.msi' # download url
$silentArgs = '{593EF0C4-54E0-40D5-A3E3-922CD1C25B9E} /quiet'
$silentArgs = '${PROJECTGUID} /quiet'
$validExitCodes = @(0) #please insert other valid exit codes here, exit codes for ms http://msdn.microsoft.com/en-us/library/aa368542(VS.85).aspx
UnInstall-ChocolateyPackage "$packageName" "$installerType" "$silentArgs" "$url" -validExitCodes $validExitCodes