diff --git a/TimberWinR.UnitTests/FakeRediServer.cs b/TimberWinR.UnitTests/FakeRediServer.cs new file mode 100644 index 0000000..a69ce1f --- /dev/null +++ b/TimberWinR.UnitTests/FakeRediServer.cs @@ -0,0 +1,128 @@ +using System; +using System.IO; +using System.Threading; +using System.Net; +using System.Net.Sockets; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using NUnit.Framework; +using TimberWinR.Parser; +using System.Text; +using System.Collections.Generic; + +namespace TimberWinR.UnitTests +{ + // Class which implements a Fake redis server for test purposes. + class FakeRediServer + { + private readonly System.Net.Sockets.TcpListener _tcpListenerV4; + private readonly System.Net.Sockets.TcpListener _tcpListenerV6; + private Thread _listenThreadV4; + private Thread _listenThreadV6; + private readonly int _port; + private CancellationToken _cancelToken; + private bool _shutdown; + public event Action OnMessageRecieved; + + + public FakeRediServer(CancellationToken cancelToken, int port = 6379) + { + _port = port; + _cancelToken = cancelToken; + _shutdown = false; + + _tcpListenerV6 = new System.Net.Sockets.TcpListener(IPAddress.IPv6Any, port); + _tcpListenerV4 = new System.Net.Sockets.TcpListener(IPAddress.Any, port); + + _listenThreadV4 = new Thread(new ParameterizedThreadStart(ListenForClients)); + _listenThreadV4.Start(_tcpListenerV4); + + _listenThreadV6 = new Thread(new ParameterizedThreadStart(ListenForClients)); + _listenThreadV6.Start(_tcpListenerV6); + } + + public void Shutdown() + { + _shutdown = true; + this._tcpListenerV4.Stop(); + this._tcpListenerV6.Stop(); + } + + + private void ListenForClients(object olistener) + { + System.Net.Sockets.TcpListener listener = olistener as System.Net.Sockets.TcpListener; + + listener.Start(); + + + while (!_cancelToken.IsCancellationRequested && !_shutdown) + { + try + { + //blocks until a client has connected to the server + TcpClient client = listener.AcceptTcpClient(); + + // Wait for a client, spin up a thread. + var clientThread = new Thread(new ParameterizedThreadStart(HandleNewClient)); + clientThread.Start(client); + } + catch (SocketException ex) + { + if (ex.SocketErrorCode == SocketError.Interrupted) + break; + } + } + } + + private void HandleNewClient(object client) + { + var tcpClient = (TcpClient)client; + + try + { + NetworkStream clientStream = tcpClient.GetStream(); + int i; + Byte[] bytes = new Byte[16535]; + String data = null; + + do + { + try + { + // Loop to receive all the data sent by the client. + while ((i = clientStream.Read(bytes, 0, bytes.Length)) != 0) + { + // Translate data bytes to a ASCII string. + data = System.Text.Encoding.ASCII.GetString(bytes, 0, i); + //System.Diagnostics.Debug.WriteLine(String.Format("Received: {0}", data)); + + // Process the data sent by the client. + data = ":1000\r\n"; + + byte[] msg = System.Text.Encoding.ASCII.GetBytes(data); + + // Send back a response. + clientStream.Write(msg, 0, msg.Length); + // System.Diagnostics.Debug.WriteLine(String.Format("Sent: {0}", data)); + } + } + catch (IOException ioex) + { + } + } while (true); + } + catch (Exception ex) + { + System.Diagnostics.Debug.WriteLine(ex.ToString()); + } + tcpClient.Close(); + } + + private void ProcessJson(JObject json) + { + Console.WriteLine(json.ToString()); + } + + } +} diff --git a/TimberWinR.UnitTests/TestDynamicBatchCount.cs b/TimberWinR.UnitTests/TestDynamicBatchCount.cs new file mode 100644 index 0000000..ae449a1 --- /dev/null +++ b/TimberWinR.UnitTests/TestDynamicBatchCount.cs @@ -0,0 +1,89 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Newtonsoft.Json; +using NUnit.Framework; +using TimberWinR.Parser; +using Newtonsoft.Json.Linq; +using System.Threading; + + +namespace TimberWinR.UnitTests +{ + [TestFixture] + public class TestDynamicBatchCount + { + [Test] + public void TestDynamicBatch() + { + var mgr = new Manager(); + mgr.LogfileDir = "."; + + mgr.Config = new Configuration(); + + CancellationTokenSource cancelTokenSource = new CancellationTokenSource(); + + var cancelToken = cancelTokenSource.Token; + + FakeRediServer fr = new FakeRediServer(cancelToken); + + var redisParams = new RedisOutput(); + redisParams.BatchCount = 10; + redisParams.MaxBatchCount = 40; + redisParams.Interval = 100; + + var redisOutput = new Outputs.RedisOutput(mgr, redisParams, cancelToken); + + + // Message is irrelavant + JObject jsonMessage = new JObject + { + {"type", "Win32-FileLog"}, + {"ComputerName", "dev.vistaprint.net"}, + {"Text", "{\"Email\":\"james@example.com\",\"Active\":true,\"CreatedDate\":\"2013-01-20T00:00:00Z\",\"Roles\":[\"User\",\"Admin\"]}"} + }; + + + // Send 1000 messages at max throttle + for (int i = 0; i < 1000; i++) + { + Thread.Sleep(10); + redisOutput.Startup(jsonMessage); + } + + while (redisOutput.SentMessages < 1000) + { + System.Diagnostics.Debug.WriteLine(redisOutput.SentMessages); + Thread.Sleep(1000); + } + + fr.Shutdown(); + + cancelTokenSource.Cancel(); + + System.Diagnostics.Debug.WriteLine(redisOutput.ToJson()); + System.Diagnostics.Debug.WriteLine(redisOutput.QueueDepth); + + JObject json = redisOutput.ToJson(); + var mbc = json["redis"]["reachedMaxBatchCount"].Value(); + var sm = json["redis"]["sent_messages"].Value(); + var errs = json["redis"]["errors"].Value(); + var cbc = json["redis"]["currentBatchCount"].Value(); + + // No errors + Assert.AreEqual(0, errs); + + // Should have reached max at least 1 time + Assert.GreaterOrEqual(mbc, 1); + + // Should have sent 1000 messages + Assert.AreEqual(1000, sm); + + // Should reset back down to original + Assert.AreEqual(cbc, 10); + } + } +} diff --git a/TimberWinR.UnitTests/TimberWinR.UnitTests.csproj b/TimberWinR.UnitTests/TimberWinR.UnitTests.csproj index 577242f..b927454 100644 --- a/TimberWinR.UnitTests/TimberWinR.UnitTests.csproj +++ b/TimberWinR.UnitTests/TimberWinR.UnitTests.csproj @@ -60,6 +60,7 @@ + @@ -69,6 +70,7 @@ + diff --git a/TimberWinR/Outputs/Elasticsearch.cs b/TimberWinR/Outputs/Elasticsearch.cs index 50a1c53..ceaf446 100644 --- a/TimberWinR/Outputs/Elasticsearch.cs +++ b/TimberWinR/Outputs/Elasticsearch.cs @@ -129,7 +129,7 @@ namespace TimberWinR.Outputs if (response.StatusCode != HttpStatusCode.Created) { LogManager.GetCurrentClassLogger() - .Error("Failed to send: {0}", response.ErrorMessage); + .Error("Failed to send: {0}, code: {1}, descr: {2}, resp: {3}", response.ErrorMessage, response.StatusCode, response.StatusDescription, response.ResponseStatus); Interlocked.Increment(ref _errorCount); } else diff --git a/TimberWinR/Outputs/Redis.cs b/TimberWinR/Outputs/Redis.cs index 8fcf8a2..cbee023 100644 --- a/TimberWinR/Outputs/Redis.cs +++ b/TimberWinR/Outputs/Redis.cs @@ -20,11 +20,14 @@ namespace TimberWinR.Outputs { internal class BatchCounter { + public int ReachedMaxBatchCount { get; set; } + private readonly int[] _sampleQueueDepths; private int _sampleCountIndex; private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default) private object _locker = new object(); private bool _warnedReachedMax; + private readonly int _maxBatchCount; private readonly int _batchCount; private int _totalSamples; @@ -41,6 +44,7 @@ namespace TimberWinR.Outputs _sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE]; _sampleCountIndex = 0; _totalSamples = 0; + ReachedMaxBatchCount = 0; } public void SampleQueueDepth(int queueDepth) { @@ -61,21 +65,28 @@ namespace TimberWinR.Outputs { lock (_locker) { - var samples = _sampleQueueDepths.Take(_totalSamples); - return (int) samples.Average(); + if (_totalSamples > 0) + { + var samples = _sampleQueueDepths.Take(_totalSamples); + int avg = (int) samples.Average(); + return avg; + } + return 0; } } - + // Sample the queue and adjust the batch count if needed (ramp up slowly) public int UpdateCurrentBatchCount(int queueSize, int currentBatchCount) { if (currentBatchCount < _maxBatchCount && currentBatchCount < queueSize && AverageQueueDepth() > currentBatchCount) { - currentBatchCount += _maxBatchCount / QUEUE_SAMPLE_SIZE; + currentBatchCount += Math.Max(_maxBatchCount/_batchCount, 1); if (currentBatchCount >= _maxBatchCount && !_warnedReachedMax) { LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", currentBatchCount); _warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset) + ReachedMaxBatchCount++; + currentBatchCount = _maxBatchCount; } } else // Reset to default @@ -91,6 +102,16 @@ namespace TimberWinR.Outputs public class RedisOutput : OutputSender { + public int QueueDepth + { + get { return _jsonQueue.Count; } + } + + public long SentMessages + { + get { return _sentMessages; } + } + private readonly string _logstashIndexName; private readonly int _port; private readonly int _timeout; @@ -107,10 +128,10 @@ namespace TimberWinR.Outputs private long _sentMessages; private long _errorCount; private long _redisDepth; - private DateTime? _lastErrorTime; + private DateTime? _lastErrorTimeUTC; private readonly int _maxQueueSize; private readonly bool _queueOverflowDiscardOldest; - private bool _warnedReachedMax; + private bool _warnedReachedMax; private BatchCounter _batchCounter; public bool Stop { get; set; } @@ -154,7 +175,7 @@ namespace TimberWinR.Outputs new JObject( new JProperty("host", string.Join(",", _redisHosts)), new JProperty("errors", _errorCount), - new JProperty("lastErrorTime", _lastErrorTime), + new JProperty("lastErrorTimeUTC", _lastErrorTimeUTC), new JProperty("redis_depth", _redisDepth), new JProperty("sent_messages", _sentMessages), new JProperty("queued_messages", _jsonQueue.Count), @@ -165,6 +186,7 @@ namespace TimberWinR.Outputs new JProperty("threads", _numThreads), new JProperty("batchcount", _batchCount), new JProperty("currentBatchCount", _currentBatchCount), + new JProperty("reachedMaxBatchCount", _batchCounter.ReachedMaxBatchCount), new JProperty("maxBatchCount", _maxBatchCount), new JProperty("averageQueueDepth", _batchCounter.AverageQueueDepth()), new JProperty("queueSamples", new JArray(_batchCounter.Samples())), @@ -186,8 +208,8 @@ namespace TimberWinR.Outputs _maxBatchCount = ro.MaxBatchCount; // Make sure maxBatchCount is larger than batchCount if (_maxBatchCount < _batchCount) - _maxBatchCount = _batchCount*10; - + _maxBatchCount = _batchCount*10; + _manager = manager; _redisHostIndex = 0; _redisHosts = ro.Host; @@ -198,7 +220,7 @@ namespace TimberWinR.Outputs _interval = ro.Interval; _numThreads = ro.NumThreads; _errorCount = 0; - _lastErrorTime = null; + _lastErrorTimeUTC = null; _maxQueueSize = ro.MaxQueueSize; _queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest; _batchCounter = new BatchCounter(_batchCount, _maxBatchCount); @@ -328,13 +350,13 @@ namespace TimberWinR.Outputs { LogManager.GetCurrentClassLogger().Warn(ex); Interlocked.Increment(ref _errorCount); - _lastErrorTime = DateTime.UtcNow; + _lastErrorTimeUTC = DateTime.UtcNow; } catch (Exception ex) { LogManager.GetCurrentClassLogger().Error(ex); Interlocked.Increment(ref _errorCount); - _lastErrorTime = DateTime.UtcNow; + _lastErrorTimeUTC = DateTime.UtcNow; } break; } @@ -344,7 +366,7 @@ namespace TimberWinR.Outputs LogManager.GetCurrentClassLogger() .Fatal("Unable to connect with any Redis hosts, {0}", String.Join(",", _redisHosts)); - _lastErrorTime = DateTime.UtcNow; + _lastErrorTimeUTC = DateTime.UtcNow; } } } @@ -352,7 +374,7 @@ namespace TimberWinR.Outputs { LogManager.GetCurrentClassLogger().Error(ex); Interlocked.Increment(ref _errorCount); - _lastErrorTime = DateTime.UtcNow; + _lastErrorTimeUTC = DateTime.UtcNow; } } // No more hosts to try. @@ -365,7 +387,7 @@ namespace TimberWinR.Outputs } } } - GC.Collect(); + // GC.Collect(); if (!Stop) syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken); } @@ -373,9 +395,13 @@ namespace TimberWinR.Outputs { break; } + catch(ThreadAbortException tex) + { + break; + } catch (Exception ex) { - _lastErrorTime = DateTime.UtcNow; + _lastErrorTimeUTC = DateTime.UtcNow; Interlocked.Increment(ref _errorCount); LogManager.GetCurrentClassLogger().Error(ex); }