Added Unit tests for batchCount and enhanced Elasticsearch error reporting a bit.

This commit is contained in:
Eric Fontana
2015-03-04 09:36:01 -05:00
parent d7fa582191
commit b7095471fb
5 changed files with 262 additions and 17 deletions

View File

@@ -0,0 +1,128 @@
using System;
using System.IO;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NUnit.Framework;
using TimberWinR.Parser;
using System.Text;
using System.Collections.Generic;
namespace TimberWinR.UnitTests
{
// Class which implements a Fake redis server for test purposes.
class FakeRediServer
{
private readonly System.Net.Sockets.TcpListener _tcpListenerV4;
private readonly System.Net.Sockets.TcpListener _tcpListenerV6;
private Thread _listenThreadV4;
private Thread _listenThreadV6;
private readonly int _port;
private CancellationToken _cancelToken;
private bool _shutdown;
public event Action<JObject> OnMessageRecieved;
public FakeRediServer(CancellationToken cancelToken, int port = 6379)
{
_port = port;
_cancelToken = cancelToken;
_shutdown = false;
_tcpListenerV6 = new System.Net.Sockets.TcpListener(IPAddress.IPv6Any, port);
_tcpListenerV4 = new System.Net.Sockets.TcpListener(IPAddress.Any, port);
_listenThreadV4 = new Thread(new ParameterizedThreadStart(ListenForClients));
_listenThreadV4.Start(_tcpListenerV4);
_listenThreadV6 = new Thread(new ParameterizedThreadStart(ListenForClients));
_listenThreadV6.Start(_tcpListenerV6);
}
public void Shutdown()
{
_shutdown = true;
this._tcpListenerV4.Stop();
this._tcpListenerV6.Stop();
}
private void ListenForClients(object olistener)
{
System.Net.Sockets.TcpListener listener = olistener as System.Net.Sockets.TcpListener;
listener.Start();
while (!_cancelToken.IsCancellationRequested && !_shutdown)
{
try
{
//blocks until a client has connected to the server
TcpClient client = listener.AcceptTcpClient();
// Wait for a client, spin up a thread.
var clientThread = new Thread(new ParameterizedThreadStart(HandleNewClient));
clientThread.Start(client);
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.Interrupted)
break;
}
}
}
private void HandleNewClient(object client)
{
var tcpClient = (TcpClient)client;
try
{
NetworkStream clientStream = tcpClient.GetStream();
int i;
Byte[] bytes = new Byte[16535];
String data = null;
do
{
try
{
// Loop to receive all the data sent by the client.
while ((i = clientStream.Read(bytes, 0, bytes.Length)) != 0)
{
// Translate data bytes to a ASCII string.
data = System.Text.Encoding.ASCII.GetString(bytes, 0, i);
//System.Diagnostics.Debug.WriteLine(String.Format("Received: {0}", data));
// Process the data sent by the client.
data = ":1000\r\n";
byte[] msg = System.Text.Encoding.ASCII.GetBytes(data);
// Send back a response.
clientStream.Write(msg, 0, msg.Length);
// System.Diagnostics.Debug.WriteLine(String.Format("Sent: {0}", data));
}
}
catch (IOException ioex)
{
}
} while (true);
}
catch (Exception ex)
{
System.Diagnostics.Debug.WriteLine(ex.ToString());
}
tcpClient.Close();
}
private void ProcessJson(JObject json)
{
Console.WriteLine(json.ToString());
}
}
}

View File

@@ -0,0 +1,89 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using NUnit.Framework;
using TimberWinR.Parser;
using Newtonsoft.Json.Linq;
using System.Threading;
namespace TimberWinR.UnitTests
{
[TestFixture]
public class TestDynamicBatchCount
{
[Test]
public void TestDynamicBatch()
{
var mgr = new Manager();
mgr.LogfileDir = ".";
mgr.Config = new Configuration();
CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
var cancelToken = cancelTokenSource.Token;
FakeRediServer fr = new FakeRediServer(cancelToken);
var redisParams = new RedisOutput();
redisParams.BatchCount = 10;
redisParams.MaxBatchCount = 40;
redisParams.Interval = 100;
var redisOutput = new Outputs.RedisOutput(mgr, redisParams, cancelToken);
// Message is irrelavant
JObject jsonMessage = new JObject
{
{"type", "Win32-FileLog"},
{"ComputerName", "dev.vistaprint.net"},
{"Text", "{\"Email\":\"james@example.com\",\"Active\":true,\"CreatedDate\":\"2013-01-20T00:00:00Z\",\"Roles\":[\"User\",\"Admin\"]}"}
};
// Send 1000 messages at max throttle
for (int i = 0; i < 1000; i++)
{
Thread.Sleep(10);
redisOutput.Startup(jsonMessage);
}
while (redisOutput.SentMessages < 1000)
{
System.Diagnostics.Debug.WriteLine(redisOutput.SentMessages);
Thread.Sleep(1000);
}
fr.Shutdown();
cancelTokenSource.Cancel();
System.Diagnostics.Debug.WriteLine(redisOutput.ToJson());
System.Diagnostics.Debug.WriteLine(redisOutput.QueueDepth);
JObject json = redisOutput.ToJson();
var mbc = json["redis"]["reachedMaxBatchCount"].Value<int>();
var sm = json["redis"]["sent_messages"].Value<int>();
var errs = json["redis"]["errors"].Value<int>();
var cbc = json["redis"]["currentBatchCount"].Value<int>();
// No errors
Assert.AreEqual(0, errs);
// Should have reached max at least 1 time
Assert.GreaterOrEqual(mbc, 1);
// Should have sent 1000 messages
Assert.AreEqual(1000, sm);
// Should reset back down to original
Assert.AreEqual(cbc, 10);
}
}
}

View File

@@ -60,6 +60,7 @@
<ItemGroup>
<Compile Include="Configuration.cs" />
<Compile Include="DateFilterTests.cs" />
<Compile Include="FakeRediServer.cs" />
<Compile Include="GeoIPFilterTests.cs" />
<Compile Include="Inputs\IisW3CRowReaderTests.cs" />
<Compile Include="JsonFilterTests.cs" />
@@ -69,6 +70,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TailFileTests.cs" />
<Compile Include="TestBase.cs" />
<Compile Include="TestDynamicBatchCount.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\TimberWinR\TimberWinR.csproj">

View File

@@ -129,7 +129,7 @@ namespace TimberWinR.Outputs
if (response.StatusCode != HttpStatusCode.Created)
{
LogManager.GetCurrentClassLogger()
.Error("Failed to send: {0}", response.ErrorMessage);
.Error("Failed to send: {0}, code: {1}, descr: {2}, resp: {3}", response.ErrorMessage, response.StatusCode, response.StatusDescription, response.ResponseStatus);
Interlocked.Increment(ref _errorCount);
}
else

View File

@@ -20,11 +20,14 @@ namespace TimberWinR.Outputs
{
internal class BatchCounter
{
public int ReachedMaxBatchCount { get; set; }
private readonly int[] _sampleQueueDepths;
private int _sampleCountIndex;
private const int QUEUE_SAMPLE_SIZE = 30; // 30 samples over 2.5 minutes (default)
private object _locker = new object();
private bool _warnedReachedMax;
private readonly int _maxBatchCount;
private readonly int _batchCount;
private int _totalSamples;
@@ -41,6 +44,7 @@ namespace TimberWinR.Outputs
_sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE];
_sampleCountIndex = 0;
_totalSamples = 0;
ReachedMaxBatchCount = 0;
}
public void SampleQueueDepth(int queueDepth)
{
@@ -61,21 +65,28 @@ namespace TimberWinR.Outputs
{
lock (_locker)
{
var samples = _sampleQueueDepths.Take(_totalSamples);
return (int) samples.Average();
if (_totalSamples > 0)
{
var samples = _sampleQueueDepths.Take(_totalSamples);
int avg = (int) samples.Average();
return avg;
}
return 0;
}
}
// Sample the queue and adjust the batch count if needed (ramp up slowly)
public int UpdateCurrentBatchCount(int queueSize, int currentBatchCount)
{
if (currentBatchCount < _maxBatchCount && currentBatchCount < queueSize && AverageQueueDepth() > currentBatchCount)
{
currentBatchCount += _maxBatchCount / QUEUE_SAMPLE_SIZE;
currentBatchCount += Math.Max(_maxBatchCount/_batchCount, 1);
if (currentBatchCount >= _maxBatchCount && !_warnedReachedMax)
{
LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", currentBatchCount);
_warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset)
ReachedMaxBatchCount++;
currentBatchCount = _maxBatchCount;
}
}
else // Reset to default
@@ -91,6 +102,16 @@ namespace TimberWinR.Outputs
public class RedisOutput : OutputSender
{
public int QueueDepth
{
get { return _jsonQueue.Count; }
}
public long SentMessages
{
get { return _sentMessages; }
}
private readonly string _logstashIndexName;
private readonly int _port;
private readonly int _timeout;
@@ -107,10 +128,10 @@ namespace TimberWinR.Outputs
private long _sentMessages;
private long _errorCount;
private long _redisDepth;
private DateTime? _lastErrorTime;
private DateTime? _lastErrorTimeUTC;
private readonly int _maxQueueSize;
private readonly bool _queueOverflowDiscardOldest;
private bool _warnedReachedMax;
private bool _warnedReachedMax;
private BatchCounter _batchCounter;
public bool Stop { get; set; }
@@ -154,7 +175,7 @@ namespace TimberWinR.Outputs
new JObject(
new JProperty("host", string.Join(",", _redisHosts)),
new JProperty("errors", _errorCount),
new JProperty("lastErrorTime", _lastErrorTime),
new JProperty("lastErrorTimeUTC", _lastErrorTimeUTC),
new JProperty("redis_depth", _redisDepth),
new JProperty("sent_messages", _sentMessages),
new JProperty("queued_messages", _jsonQueue.Count),
@@ -165,6 +186,7 @@ namespace TimberWinR.Outputs
new JProperty("threads", _numThreads),
new JProperty("batchcount", _batchCount),
new JProperty("currentBatchCount", _currentBatchCount),
new JProperty("reachedMaxBatchCount", _batchCounter.ReachedMaxBatchCount),
new JProperty("maxBatchCount", _maxBatchCount),
new JProperty("averageQueueDepth", _batchCounter.AverageQueueDepth()),
new JProperty("queueSamples", new JArray(_batchCounter.Samples())),
@@ -186,8 +208,8 @@ namespace TimberWinR.Outputs
_maxBatchCount = ro.MaxBatchCount;
// Make sure maxBatchCount is larger than batchCount
if (_maxBatchCount < _batchCount)
_maxBatchCount = _batchCount*10;
_maxBatchCount = _batchCount*10;
_manager = manager;
_redisHostIndex = 0;
_redisHosts = ro.Host;
@@ -198,7 +220,7 @@ namespace TimberWinR.Outputs
_interval = ro.Interval;
_numThreads = ro.NumThreads;
_errorCount = 0;
_lastErrorTime = null;
_lastErrorTimeUTC = null;
_maxQueueSize = ro.MaxQueueSize;
_queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest;
_batchCounter = new BatchCounter(_batchCount, _maxBatchCount);
@@ -328,13 +350,13 @@ namespace TimberWinR.Outputs
{
LogManager.GetCurrentClassLogger().Warn(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTime = DateTime.UtcNow;
_lastErrorTimeUTC = DateTime.UtcNow;
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTime = DateTime.UtcNow;
_lastErrorTimeUTC = DateTime.UtcNow;
}
break;
}
@@ -344,7 +366,7 @@ namespace TimberWinR.Outputs
LogManager.GetCurrentClassLogger()
.Fatal("Unable to connect with any Redis hosts, {0}",
String.Join(",", _redisHosts));
_lastErrorTime = DateTime.UtcNow;
_lastErrorTimeUTC = DateTime.UtcNow;
}
}
}
@@ -352,7 +374,7 @@ namespace TimberWinR.Outputs
{
LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
_lastErrorTime = DateTime.UtcNow;
_lastErrorTimeUTC = DateTime.UtcNow;
}
} // No more hosts to try.
@@ -365,7 +387,7 @@ namespace TimberWinR.Outputs
}
}
}
GC.Collect();
// GC.Collect();
if (!Stop)
syncHandle.Wait(TimeSpan.FromMilliseconds(_interval), CancelToken);
}
@@ -373,9 +395,13 @@ namespace TimberWinR.Outputs
{
break;
}
catch(ThreadAbortException tex)
{
break;
}
catch (Exception ex)
{
_lastErrorTime = DateTime.UtcNow;
_lastErrorTimeUTC = DateTime.UtcNow;
Interlocked.Increment(ref _errorCount);
LogManager.GetCurrentClassLogger().Error(ex);
}