Fixed shutdown slowness
This commit is contained in:
@@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
|
||||
// You can specify all the values or you can default the Build and Revision Numbers
|
||||
// by using the '*' as shown below:
|
||||
// [assembly: AssemblyVersion("1.0.*")]
|
||||
[assembly: AssemblyVersion("1.3.16.0")]
|
||||
[assembly: AssemblyFileVersion("1.3.16.0")]
|
||||
[assembly: AssemblyVersion("1.3.17.0")]
|
||||
[assembly: AssemblyFileVersion("1.3.17.0")]
|
||||
|
||||
@@ -18,12 +18,12 @@ namespace TimberWinR.Inputs
|
||||
private readonly int _pollingIntervalInSeconds;
|
||||
private readonly Parser.IISW3CLog _arguments;
|
||||
private long _receivedMessages;
|
||||
|
||||
private IisW3CRowReader rowReader;
|
||||
public bool Stop { get; set; }
|
||||
private IisW3CRowReader rowReader;
|
||||
|
||||
public IISW3CInputListener(Parser.IISW3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
|
||||
: base(cancelToken, "Win32-IISLog")
|
||||
{
|
||||
{
|
||||
_arguments = arguments;
|
||||
_receivedMessages = 0;
|
||||
_pollingIntervalInSeconds = pollingIntervalInSeconds;
|
||||
@@ -38,6 +38,7 @@ namespace TimberWinR.Inputs
|
||||
|
||||
public override void Shutdown()
|
||||
{
|
||||
Stop = true;
|
||||
LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType);
|
||||
base.Shutdown();
|
||||
}
|
||||
@@ -70,7 +71,7 @@ namespace TimberWinR.Inputs
|
||||
codepage = _arguments.CodePage,
|
||||
consolidateLogs = true,
|
||||
dirTime = _arguments.DirTime,
|
||||
dQuotes = _arguments.DoubleQuotes,
|
||||
dQuotes = _arguments.DoubleQuotes,
|
||||
recurse = _arguments.Recurse,
|
||||
useDoubleQuotes = _arguments.DoubleQuotes
|
||||
};
|
||||
@@ -79,61 +80,74 @@ namespace TimberWinR.Inputs
|
||||
iFmt.minDateMod = _arguments.MinDateMod.Value.ToString("yyyy-MM-dd hh:mm:ss");
|
||||
|
||||
Dictionary<string, Int64> logFileMaxRecords = new Dictionary<string, Int64>();
|
||||
|
||||
// Execute the query
|
||||
while (!CancelToken.IsCancellationRequested)
|
||||
|
||||
using (var syncHandle = new ManualResetEventSlim())
|
||||
{
|
||||
try
|
||||
// Execute the query
|
||||
while (!Stop)
|
||||
{
|
||||
oLogQuery = new LogQuery();
|
||||
|
||||
var qfiles = string.Format("SELECT Distinct [LogFilename] FROM {0}", location);
|
||||
var rsfiles = oLogQuery.Execute(qfiles, iFmt);
|
||||
for (; !rsfiles.atEnd(); rsfiles.moveNext())
|
||||
// Execute the query
|
||||
if (!CancelToken.IsCancellationRequested)
|
||||
{
|
||||
var record = rsfiles.getRecord();
|
||||
string fileName = record.getValue("LogFilename") as string;
|
||||
if (!logFileMaxRecords.ContainsKey(fileName))
|
||||
try
|
||||
{
|
||||
var qcount = string.Format("SELECT max(LogRow) as MaxRecordNumber FROM {0}", fileName);
|
||||
var rcount = oLogQuery.Execute(qcount, iFmt);
|
||||
var qr = rcount.getRecord();
|
||||
var lrn = (Int64)qr.getValueEx("MaxRecordNumber");
|
||||
logFileMaxRecords[fileName] = lrn;
|
||||
oLogQuery = new LogQuery();
|
||||
|
||||
var qfiles = string.Format("SELECT Distinct [LogFilename] FROM {0}", location);
|
||||
var rsfiles = oLogQuery.Execute(qfiles, iFmt);
|
||||
for (; !rsfiles.atEnd(); rsfiles.moveNext())
|
||||
{
|
||||
var record = rsfiles.getRecord();
|
||||
string fileName = record.getValue("LogFilename") as string;
|
||||
if (!logFileMaxRecords.ContainsKey(fileName))
|
||||
{
|
||||
var qcount = string.Format("SELECT max(LogRow) as MaxRecordNumber FROM {0}",
|
||||
fileName);
|
||||
var rcount = oLogQuery.Execute(qcount, iFmt);
|
||||
var qr = rcount.getRecord();
|
||||
var lrn = (Int64) qr.getValueEx("MaxRecordNumber");
|
||||
logFileMaxRecords[fileName] = lrn;
|
||||
}
|
||||
}
|
||||
|
||||
foreach (string fileName in logFileMaxRecords.Keys.ToList())
|
||||
{
|
||||
var lastRecordNumber = logFileMaxRecords[fileName];
|
||||
var query = string.Format("SELECT * FROM '{0}' Where LogRow > {1}", fileName,
|
||||
lastRecordNumber);
|
||||
|
||||
var rs = oLogQuery.Execute(query, iFmt);
|
||||
rowReader.ReadColumnMap(rs);
|
||||
|
||||
// Browse the recordset
|
||||
for (; !rs.atEnd(); rs.moveNext())
|
||||
{
|
||||
var record = rs.getRecord();
|
||||
var json = rowReader.ReadToJson(record);
|
||||
ProcessJson(json);
|
||||
_receivedMessages++;
|
||||
var lrn = (Int64) record.getValueEx("LogRow");
|
||||
logFileMaxRecords[fileName] = lrn;
|
||||
record = null;
|
||||
json = null;
|
||||
}
|
||||
// Close the recordset
|
||||
rs.close();
|
||||
GC.Collect();
|
||||
}
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (string fileName in logFileMaxRecords.Keys.ToList())
|
||||
{
|
||||
var lastRecordNumber = logFileMaxRecords[fileName];
|
||||
var query = string.Format("SELECT * FROM '{0}' Where LogRow > {1}", fileName, lastRecordNumber);
|
||||
|
||||
var rs = oLogQuery.Execute(query, iFmt);
|
||||
rowReader.ReadColumnMap(rs);
|
||||
|
||||
// Browse the recordset
|
||||
for (; !rs.atEnd(); rs.moveNext())
|
||||
catch (OperationCanceledException oce)
|
||||
{
|
||||
var record = rs.getRecord();
|
||||
var json = rowReader.ReadToJson(record);
|
||||
ProcessJson(json);
|
||||
_receivedMessages++;
|
||||
var lrn = (Int64)record.getValueEx("LogRow");
|
||||
logFileMaxRecords[fileName] = lrn;
|
||||
record = null;
|
||||
json = null;
|
||||
break;
|
||||
}
|
||||
// Close the recordset
|
||||
rs.close();
|
||||
GC.Collect();
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
|
||||
System.Threading.Thread.Sleep(_pollingIntervalInSeconds * 1000);
|
||||
}
|
||||
|
||||
Finished();
|
||||
|
||||
@@ -58,14 +58,17 @@ namespace TimberWinR.Inputs
|
||||
|
||||
public void Finished()
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Info("Signaling Event Shutdown {0}", InputType);
|
||||
FinishedEvent.Set();
|
||||
LogManager.GetCurrentClassLogger().Info("Finished Shutdown {0}", InputType);
|
||||
LogManager.GetCurrentClassLogger().Info("Finished signaling Shutdown {0}", InputType);
|
||||
}
|
||||
public virtual void Shutdown()
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType);
|
||||
|
||||
FinishedEvent.WaitOne();
|
||||
|
||||
LogManager.GetCurrentClassLogger().Info("Finished Wait For {0}", InputType);
|
||||
try
|
||||
{
|
||||
if (File.Exists(CheckpointFileName))
|
||||
|
||||
@@ -31,14 +31,14 @@ namespace TimberWinR.Inputs
|
||||
private Dictionary<string, DateTime> _logFileCreationTimes;
|
||||
private Dictionary<string, DateTime> _logFileSampleTimes;
|
||||
private Dictionary<string, long> _logFileSizes;
|
||||
private CancellationToken _cancelToken;
|
||||
|
||||
public bool Stop { get; set; }
|
||||
|
||||
public LogsListener(TimberWinR.Parser.Log arguments, CancellationToken cancelToken)
|
||||
: base(cancelToken, "Win32-FileLog")
|
||||
{
|
||||
Stop = false;
|
||||
_cancelToken = cancelToken;
|
||||
|
||||
_logFileMaxRecords = new Dictionary<string, Int64>();
|
||||
_logFileCreationTimes = new Dictionary<string, DateTime>();
|
||||
_logFileSampleTimes = new Dictionary<string, DateTime>();
|
||||
@@ -108,7 +108,7 @@ namespace TimberWinR.Inputs
|
||||
while (!Stop)
|
||||
{
|
||||
var oLogQuery = new LogQuery();
|
||||
if (!_cancelToken.IsCancellationRequested)
|
||||
if (!CancelToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -216,7 +216,8 @@ namespace TimberWinR.Inputs
|
||||
GC.Collect();
|
||||
}
|
||||
// Sleep
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), _cancelToken);
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
}
|
||||
catch (FileNotFoundException fnfex)
|
||||
{
|
||||
|
||||
@@ -25,6 +25,7 @@ namespace TimberWinR.Inputs
|
||||
private readonly int _pollingIntervalInSeconds;
|
||||
private readonly TimberWinR.Parser.W3CLog _arguments;
|
||||
private long _receivedMessages;
|
||||
public bool Stop { get; set; }
|
||||
|
||||
public W3CInputListener(TimberWinR.Parser.W3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5)
|
||||
: base(cancelToken, "Win32-W3CLog")
|
||||
@@ -41,6 +42,7 @@ namespace TimberWinR.Inputs
|
||||
|
||||
public override void Shutdown()
|
||||
{
|
||||
Stop = true;
|
||||
LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType);
|
||||
base.Shutdown();
|
||||
}
|
||||
@@ -55,7 +57,7 @@ namespace TimberWinR.Inputs
|
||||
new JProperty("codepage", _arguments.CodePage),
|
||||
new JProperty("separator", _arguments.Separator),
|
||||
new JProperty("dQuotes", _arguments.DoubleQuotes),
|
||||
new JProperty("dtLines", _arguments.DtLines)
|
||||
new JProperty("dtLines", _arguments.DtLines)
|
||||
)));
|
||||
return json;
|
||||
}
|
||||
@@ -73,81 +75,94 @@ namespace TimberWinR.Inputs
|
||||
iCodepage = _arguments.CodePage,
|
||||
doubleQuotedStrings = _arguments.DoubleQuotes,
|
||||
detectTypesLines = _arguments.DtLines,
|
||||
dQuotes = _arguments.DoubleQuotes,
|
||||
separator = _arguments.Separator
|
||||
dQuotes = _arguments.DoubleQuotes,
|
||||
separator = _arguments.Separator
|
||||
};
|
||||
|
||||
|
||||
Dictionary<string, Int64> logFileMaxRecords = new Dictionary<string, Int64>();
|
||||
|
||||
// Execute the query
|
||||
while (!CancelToken.IsCancellationRequested)
|
||||
using (var syncHandle = new ManualResetEventSlim())
|
||||
{
|
||||
try
|
||||
// Execute the query
|
||||
while (!Stop)
|
||||
{
|
||||
oLogQuery = new LogQuery();
|
||||
|
||||
var qfiles = string.Format("SELECT Distinct [LogFilename] FROM {0}", location);
|
||||
var rsfiles = oLogQuery.Execute(qfiles, iFmt);
|
||||
for (; !rsfiles.atEnd(); rsfiles.moveNext())
|
||||
// Execute the query
|
||||
if (!CancelToken.IsCancellationRequested)
|
||||
{
|
||||
var record = rsfiles.getRecord();
|
||||
string fileName = record.getValue("LogFilename") as string;
|
||||
if (!logFileMaxRecords.ContainsKey(fileName))
|
||||
try
|
||||
{
|
||||
var qcount = string.Format("SELECT max(RowNumber) as MaxRecordNumber FROM {0}", fileName);
|
||||
var rcount = oLogQuery.Execute(qcount, iFmt);
|
||||
var qr = rcount.getRecord();
|
||||
var lrn = (Int64)qr.getValueEx("MaxRecordNumber");
|
||||
logFileMaxRecords[fileName] = lrn;
|
||||
}
|
||||
}
|
||||
oLogQuery = new LogQuery();
|
||||
|
||||
|
||||
foreach (string fileName in logFileMaxRecords.Keys.ToList())
|
||||
{
|
||||
var lastRecordNumber = logFileMaxRecords[fileName];
|
||||
var query = string.Format("SELECT * FROM '{0}' Where RowNumber > {1} order by RowNumber", fileName, lastRecordNumber);
|
||||
var rs = oLogQuery.Execute(query, iFmt);
|
||||
var colMap = new Dictionary<string, int>();
|
||||
for (int col = 0; col < rs.getColumnCount(); col++)
|
||||
{
|
||||
string colName = rs.getColumnName(col);
|
||||
colMap[colName] = col;
|
||||
}
|
||||
|
||||
// Browse the recordset
|
||||
for (; !rs.atEnd(); rs.moveNext())
|
||||
{
|
||||
var record = rs.getRecord();
|
||||
var json = new JObject();
|
||||
foreach (var field in colMap.Keys)
|
||||
{
|
||||
object v = record.getValue(field);
|
||||
if (field == "date" || field == "time")
|
||||
var qfiles = string.Format("SELECT Distinct [LogFilename] FROM {0}", location);
|
||||
var rsfiles = oLogQuery.Execute(qfiles, iFmt);
|
||||
for (; !rsfiles.atEnd(); rsfiles.moveNext())
|
||||
{
|
||||
var record = rsfiles.getRecord();
|
||||
string fileName = record.getValue("LogFilename") as string;
|
||||
if (!logFileMaxRecords.ContainsKey(fileName))
|
||||
{
|
||||
DateTime dt = DateTime.Parse(v.ToString());
|
||||
json.Add(new JProperty(field, dt));
|
||||
}
|
||||
else
|
||||
json.Add(new JProperty(field, v));
|
||||
var qcount = string.Format("SELECT max(RowNumber) as MaxRecordNumber FROM {0}",
|
||||
fileName);
|
||||
var rcount = oLogQuery.Execute(qcount, iFmt);
|
||||
var qr = rcount.getRecord();
|
||||
var lrn = (Int64)qr.getValueEx("MaxRecordNumber");
|
||||
logFileMaxRecords[fileName] = lrn;
|
||||
}
|
||||
}
|
||||
ProcessJson(json);
|
||||
_receivedMessages++;
|
||||
var lrn = (Int64)record.getValueEx("RowNumber");
|
||||
logFileMaxRecords[fileName] = lrn;
|
||||
record = null;
|
||||
json = null;
|
||||
|
||||
|
||||
foreach (string fileName in logFileMaxRecords.Keys.ToList())
|
||||
{
|
||||
var lastRecordNumber = logFileMaxRecords[fileName];
|
||||
var query = string.Format(
|
||||
"SELECT * FROM '{0}' Where RowNumber > {1} order by RowNumber", fileName,
|
||||
lastRecordNumber);
|
||||
var rs = oLogQuery.Execute(query, iFmt);
|
||||
var colMap = new Dictionary<string, int>();
|
||||
for (int col = 0; col < rs.getColumnCount(); col++)
|
||||
{
|
||||
string colName = rs.getColumnName(col);
|
||||
colMap[colName] = col;
|
||||
}
|
||||
|
||||
// Browse the recordset
|
||||
for (; !rs.atEnd(); rs.moveNext())
|
||||
{
|
||||
var record = rs.getRecord();
|
||||
var json = new JObject();
|
||||
foreach (var field in colMap.Keys)
|
||||
{
|
||||
object v = record.getValue(field);
|
||||
if (field == "date" || field == "time")
|
||||
{
|
||||
DateTime dt = DateTime.Parse(v.ToString());
|
||||
json.Add(new JProperty(field, dt));
|
||||
}
|
||||
else
|
||||
json.Add(new JProperty(field, v));
|
||||
}
|
||||
ProcessJson(json);
|
||||
_receivedMessages++;
|
||||
var lrn = (Int64)record.getValueEx("RowNumber");
|
||||
logFileMaxRecords[fileName] = lrn;
|
||||
record = null;
|
||||
json = null;
|
||||
}
|
||||
// Close the recordset
|
||||
rs.close();
|
||||
}
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
// Close the recordset
|
||||
rs.close();
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
|
||||
System.Threading.Thread.Sleep(_pollingIntervalInSeconds * 1000);
|
||||
}
|
||||
|
||||
Finished();
|
||||
|
||||
@@ -27,6 +27,7 @@ namespace TimberWinR.Inputs
|
||||
private TimberWinR.Parser.WindowsEvent _arguments;
|
||||
private long _receivedMessages;
|
||||
private List<Thread> _tasks { get; set; }
|
||||
public bool Stop { get; set; }
|
||||
|
||||
public WindowsEvtInputListener(TimberWinR.Parser.WindowsEvent arguments, CancellationToken cancelToken)
|
||||
: base(cancelToken, "Win32-Eventlog")
|
||||
@@ -40,17 +41,14 @@ namespace TimberWinR.Inputs
|
||||
string hive = eventHive.Trim();
|
||||
var thread = new Thread(new ParameterizedThreadStart(EventWatcher));
|
||||
_tasks.Add(thread);
|
||||
thread.Start(eventHive);
|
||||
thread.Start(eventHive);
|
||||
}
|
||||
}
|
||||
|
||||
public override void Shutdown()
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType);
|
||||
foreach(Thread t in _tasks)
|
||||
{
|
||||
t.Abort();
|
||||
}
|
||||
Stop = true;
|
||||
LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType);
|
||||
base.Shutdown();
|
||||
}
|
||||
|
||||
@@ -78,9 +76,9 @@ namespace TimberWinR.Inputs
|
||||
{
|
||||
string location = ploc.ToString();
|
||||
|
||||
LogQuery oLogQuery = new LogQuery();
|
||||
LogQuery oLogQuery = new LogQuery();
|
||||
|
||||
LogManager.GetCurrentClassLogger().Info("WindowsEvent Input Listener Ready");
|
||||
LogManager.GetCurrentClassLogger().Info("WindowsEvent Input Listener Ready");
|
||||
|
||||
// Instantiate the Event Log Input Format object
|
||||
var iFmt = new EventLogInputFormat()
|
||||
@@ -94,101 +92,91 @@ namespace TimberWinR.Inputs
|
||||
stringsSep = _arguments.StringsSep,
|
||||
resolveSIDs = _arguments.ResolveSIDS
|
||||
};
|
||||
|
||||
|
||||
oLogQuery = null;
|
||||
|
||||
Dictionary<string, Int64> logFileMaxRecords = new Dictionary<string, Int64>();
|
||||
|
||||
// Execute the query
|
||||
while (!CancelToken.IsCancellationRequested)
|
||||
Dictionary<string, Int64> logFileMaxRecords = new Dictionary<string, Int64>();
|
||||
|
||||
using (var syncHandle = new ManualResetEventSlim())
|
||||
{
|
||||
try
|
||||
// Execute the query
|
||||
while (!Stop)
|
||||
{
|
||||
Thread.CurrentThread.Priority = ThreadPriority.BelowNormal;
|
||||
|
||||
oLogQuery = new LogQuery();
|
||||
|
||||
var qfiles = string.Format("SELECT Distinct [EventLog] FROM {0}", location);
|
||||
var rsfiles = oLogQuery.Execute(qfiles, iFmt);
|
||||
for (; !rsfiles.atEnd(); rsfiles.moveNext())
|
||||
// Execute the query
|
||||
if (!CancelToken.IsCancellationRequested)
|
||||
{
|
||||
var record = rsfiles.getRecord();
|
||||
string logName = record.getValue("EventLog") as string;
|
||||
if (!logFileMaxRecords.ContainsKey(logName))
|
||||
{
|
||||
var qcount = string.Format("SELECT max(RecordNumber) as MaxRecordNumber FROM {0}", logName);
|
||||
var rcount = oLogQuery.Execute(qcount, iFmt);
|
||||
var qr = rcount.getRecord();
|
||||
var lrn = (Int64)qr.getValueEx("MaxRecordNumber");
|
||||
logFileMaxRecords[logName] = lrn;
|
||||
}
|
||||
}
|
||||
try
|
||||
{
|
||||
oLogQuery = new LogQuery();
|
||||
|
||||
|
||||
foreach (string fileName in logFileMaxRecords.Keys.ToList())
|
||||
{
|
||||
var lastRecordNumber = logFileMaxRecords[fileName];
|
||||
var query = string.Format("SELECT * FROM {0} where RecordNumber > {1}", location, lastRecordNumber);
|
||||
|
||||
var rs = oLogQuery.Execute(query, iFmt);
|
||||
// Browse the recordset
|
||||
for (; !rs.atEnd(); rs.moveNext())
|
||||
{
|
||||
|
||||
var record = rs.getRecord();
|
||||
var json = new JObject();
|
||||
foreach (var field in _arguments.Fields)
|
||||
var qfiles = string.Format("SELECT Distinct [EventLog] FROM {0}", location);
|
||||
var rsfiles = oLogQuery.Execute(qfiles, iFmt);
|
||||
for (; !rsfiles.atEnd(); rsfiles.moveNext())
|
||||
{
|
||||
object v = record.getValue(field.Name);
|
||||
if (field.Name == "Data")
|
||||
v = ToPrintable(v.ToString());
|
||||
json.Add(new JProperty(field.Name, v));
|
||||
var record = rsfiles.getRecord();
|
||||
string logName = record.getValue("EventLog") as string;
|
||||
if (!logFileMaxRecords.ContainsKey(logName))
|
||||
{
|
||||
var qcount = string.Format("SELECT max(RecordNumber) as MaxRecordNumber FROM {0}",
|
||||
logName);
|
||||
var rcount = oLogQuery.Execute(qcount, iFmt);
|
||||
var qr = rcount.getRecord();
|
||||
var lrn = (Int64)qr.getValueEx("MaxRecordNumber");
|
||||
logFileMaxRecords[logName] = lrn;
|
||||
}
|
||||
}
|
||||
|
||||
var lrn = (Int64)record.getValueEx("RecordNumber");
|
||||
logFileMaxRecords[fileName] = lrn;
|
||||
|
||||
record = null;
|
||||
ProcessJson(json);
|
||||
_receivedMessages++;
|
||||
json = null;
|
||||
foreach (string fileName in logFileMaxRecords.Keys.ToList())
|
||||
{
|
||||
var lastRecordNumber = logFileMaxRecords[fileName];
|
||||
var query = string.Format("SELECT * FROM {0} where RecordNumber > {1}", location,
|
||||
lastRecordNumber);
|
||||
|
||||
var rs = oLogQuery.Execute(query, iFmt);
|
||||
// Browse the recordset
|
||||
for (; !rs.atEnd(); rs.moveNext())
|
||||
{
|
||||
|
||||
var record = rs.getRecord();
|
||||
var json = new JObject();
|
||||
foreach (var field in _arguments.Fields)
|
||||
{
|
||||
object v = record.getValue(field.Name);
|
||||
if (field.Name == "Data")
|
||||
v = ToPrintable(v.ToString());
|
||||
json.Add(new JProperty(field.Name, v));
|
||||
}
|
||||
|
||||
var lrn = (Int64)record.getValueEx("RecordNumber");
|
||||
logFileMaxRecords[fileName] = lrn;
|
||||
|
||||
record = null;
|
||||
ProcessJson(json);
|
||||
_receivedMessages++;
|
||||
json = null;
|
||||
|
||||
}
|
||||
// Close the recordset
|
||||
rs.close();
|
||||
rs = null;
|
||||
GC.Collect();
|
||||
}
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
// Close the recordset
|
||||
rs.close();
|
||||
rs = null;
|
||||
GC.Collect();
|
||||
}
|
||||
}
|
||||
catch (System.Threading.ThreadAbortException tex)
|
||||
{
|
||||
Thread.ResetAbort();
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
Thread.CurrentThread.Priority = ThreadPriority.Normal;
|
||||
System.Threading.Thread.Sleep(_pollingIntervalInSeconds * 1000);
|
||||
}
|
||||
catch (System.Threading.ThreadAbortException tex)
|
||||
{
|
||||
Thread.ResetAbort();
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
|
||||
|
||||
Finished();
|
||||
}
|
||||
|
||||
Finished();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ namespace TimberWinR.Outputs
|
||||
private long _sentMessages;
|
||||
private long _errorCount;
|
||||
private Parser.ElasticsearchOutput eo;
|
||||
public bool Stop { get; set; }
|
||||
|
||||
public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutput eo, CancellationToken cancelToken)
|
||||
: base(cancelToken, "Elasticsearch")
|
||||
@@ -46,10 +47,10 @@ namespace TimberWinR.Outputs
|
||||
_jsonQueue = new List<JObject>();
|
||||
_numThreads = eo.NumThreads;
|
||||
|
||||
for (int i = 0; i < eo.NumThreads; i++)
|
||||
{
|
||||
Task.Factory.StartNew(ElasticsearchSender, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
|
||||
}
|
||||
for (int i = 0; i < eo.NumThreads; i++)
|
||||
{
|
||||
Task.Factory.StartNew(ElasticsearchSender, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
|
||||
}
|
||||
}
|
||||
|
||||
public override JObject ToJson()
|
||||
@@ -58,12 +59,12 @@ namespace TimberWinR.Outputs
|
||||
new JProperty("elasticsearch",
|
||||
new JObject(
|
||||
new JProperty("host", string.Join(",", _host)),
|
||||
new JProperty("errors", _errorCount),
|
||||
new JProperty("errors", _errorCount),
|
||||
new JProperty("sent_messages", _sentMessages),
|
||||
new JProperty("queued_messages", _jsonQueue.Count),
|
||||
new JProperty("port", _port),
|
||||
new JProperty("interval", _interval),
|
||||
new JProperty("threads", _numThreads),
|
||||
new JProperty("threads", _numThreads),
|
||||
new JProperty("hosts",
|
||||
new JArray(
|
||||
from h in _host
|
||||
@@ -76,91 +77,111 @@ namespace TimberWinR.Outputs
|
||||
//
|
||||
private void ElasticsearchSender()
|
||||
{
|
||||
while (!CancelToken.IsCancellationRequested)
|
||||
using (var syncHandle = new ManualResetEventSlim())
|
||||
{
|
||||
JObject[] messages;
|
||||
lock (_locker)
|
||||
// Execute the query
|
||||
while (!Stop)
|
||||
{
|
||||
var count = _jsonQueue.Count;
|
||||
messages = _jsonQueue.Take(count).ToArray();
|
||||
_jsonQueue.RemoveRange(0, count);
|
||||
if (messages.Length > 0)
|
||||
_manager.IncrementMessageCount(messages.Length);
|
||||
}
|
||||
|
||||
if (messages.Length > 0)
|
||||
{
|
||||
int numHosts = _host.Length;
|
||||
while (numHosts-- > 0)
|
||||
if (!CancelToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Get the next client
|
||||
RestClient client = getClient();
|
||||
if (client != null)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Debug("Sending {0} Messages to {1}", messages.Length, client.BaseUrl);
|
||||
JObject[] messages;
|
||||
lock (_locker)
|
||||
{
|
||||
var count = _jsonQueue.Count;
|
||||
messages = _jsonQueue.Take(count).ToArray();
|
||||
_jsonQueue.RemoveRange(0, count);
|
||||
if (messages.Length > 0)
|
||||
_manager.IncrementMessageCount(messages.Length);
|
||||
}
|
||||
|
||||
foreach (JObject json in messages)
|
||||
if (messages.Length > 0)
|
||||
{
|
||||
int numHosts = _host.Length;
|
||||
while (numHosts-- > 0)
|
||||
{
|
||||
var typeName = this.eo.GetTypeName(json);
|
||||
var indexName = this.eo.GetIndexName(json);
|
||||
var req = new RestRequest(string.Format("/{0}/{1}/", indexName, typeName), Method.POST);
|
||||
|
||||
req.AddParameter("text/json", json.ToString(), ParameterType.RequestBody);
|
||||
|
||||
req.RequestFormat = DataFormat.Json;
|
||||
|
||||
try
|
||||
{
|
||||
client.ExecuteAsync(req, response =>
|
||||
// Get the next client
|
||||
RestClient client = getClient();
|
||||
if (client != null)
|
||||
{
|
||||
if (response.StatusCode != HttpStatusCode.Created)
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Debug("Sending {0} Messages to {1}", messages.Length, client.BaseUrl);
|
||||
|
||||
foreach (JObject json in messages)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Error("Failed to send: {0}", response.ErrorMessage);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
var typeName = this.eo.GetTypeName(json);
|
||||
var indexName = this.eo.GetIndexName(json);
|
||||
var req =
|
||||
new RestRequest(string.Format("/{0}/{1}/", indexName, typeName),
|
||||
Method.POST);
|
||||
|
||||
req.AddParameter("text/json", json.ToString(), ParameterType.RequestBody);
|
||||
|
||||
req.RequestFormat = DataFormat.Json;
|
||||
|
||||
try
|
||||
{
|
||||
client.ExecuteAsync(req, response =>
|
||||
{
|
||||
if (response.StatusCode != HttpStatusCode.Created)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Error("Failed to send: {0}", response.ErrorMessage);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
}
|
||||
else
|
||||
{
|
||||
_sentMessages++;
|
||||
GC.Collect();
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception error)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(error);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
_sentMessages++;
|
||||
GC.Collect();
|
||||
}
|
||||
});
|
||||
GC.Collect();
|
||||
}
|
||||
else
|
||||
{
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Fatal("Unable to connect with any Elasticsearch hosts, {0}",
|
||||
String.Join(",", _host));
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
}
|
||||
|
||||
}
|
||||
catch (Exception error)
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(error);
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
GC.Collect();
|
||||
}
|
||||
else
|
||||
{
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Fatal("Unable to connect with any Elasticsearch hosts, {0}",
|
||||
String.Join(",", _host));
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
}
|
||||
|
||||
GC.Collect();
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_interval), CancelToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
catch (OperationCanceledException oce)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
break;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
GC.Collect();
|
||||
System.Threading.Thread.Sleep(_interval);
|
||||
}
|
||||
}
|
||||
|
||||
private RestClient getClient()
|
||||
{
|
||||
if (_hostIndex >= _host.Length)
|
||||
{
|
||||
if (_hostIndex >= _host.Length)
|
||||
_hostIndex = 0;
|
||||
|
||||
int numTries = 0;
|
||||
@@ -168,9 +189,9 @@ namespace TimberWinR.Outputs
|
||||
{
|
||||
try
|
||||
{
|
||||
string url = string.Format("{0}://{1}:{2}", _protocol.Replace(":",""), _host[_hostIndex], _port);
|
||||
string url = string.Format("{0}://{1}:{2}", _protocol.Replace(":", ""), _host[_hostIndex], _port);
|
||||
var client = new RestClient(url);
|
||||
client.Timeout = _timeout;
|
||||
client.Timeout = _timeout;
|
||||
|
||||
_hostIndex++;
|
||||
if (_hostIndex >= _host.Length)
|
||||
|
||||
@@ -38,6 +38,8 @@ namespace TimberWinR.Outputs
|
||||
private int _maxQueueSize;
|
||||
private bool _queueOverflowDiscardOldest;
|
||||
|
||||
public bool Stop { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Get the next client
|
||||
/// </summary>
|
||||
@@ -185,67 +187,87 @@ namespace TimberWinR.Outputs
|
||||
//
|
||||
private void RedisSender()
|
||||
{
|
||||
while (!CancelToken.IsCancellationRequested)
|
||||
using (var syncHandle = new ManualResetEventSlim())
|
||||
{
|
||||
string[] messages;
|
||||
lock (_locker)
|
||||
// Execute the query
|
||||
while (!Stop)
|
||||
{
|
||||
messages = _jsonQueue.Take(_batchCount).ToArray();
|
||||
_jsonQueue.RemoveRange(0, messages.Length);
|
||||
if (messages.Length > 0)
|
||||
_manager.IncrementMessageCount(messages.Length);
|
||||
}
|
||||
|
||||
if (messages.Length > 0)
|
||||
{
|
||||
int numHosts = _redisHosts.Length;
|
||||
while (numHosts-- > 0)
|
||||
if (!CancelToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Get the next client
|
||||
using (RedisClient client = getClient())
|
||||
string[] messages;
|
||||
lock (_locker)
|
||||
{
|
||||
if (client != null)
|
||||
{
|
||||
client.StartPipe();
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Debug("Sending {0} Messages to {1}", messages.Length, client.Host);
|
||||
messages = _jsonQueue.Take(_batchCount).ToArray();
|
||||
_jsonQueue.RemoveRange(0, messages.Length);
|
||||
if (messages.Length > 0)
|
||||
_manager.IncrementMessageCount(messages.Length);
|
||||
}
|
||||
|
||||
if (messages.Length > 0)
|
||||
{
|
||||
int numHosts = _redisHosts.Length;
|
||||
while (numHosts-- > 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
_redisDepth = client.RPush(_logstashIndexName, messages);
|
||||
_sentMessages += messages.Length;
|
||||
// Get the next client
|
||||
using (RedisClient client = getClient())
|
||||
{
|
||||
if (client != null)
|
||||
{
|
||||
client.StartPipe();
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Debug("Sending {0} Messages to {1}", messages.Length, client.Host);
|
||||
|
||||
try
|
||||
{
|
||||
_redisDepth = client.RPush(_logstashIndexName, messages);
|
||||
_sentMessages += messages.Length;
|
||||
}
|
||||
catch (SocketException ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Warn(ex);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
}
|
||||
finally
|
||||
{
|
||||
client.EndPipe();
|
||||
}
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Fatal("Unable to connect with any Redis hosts, {0}",
|
||||
String.Join(",", _redisHosts));
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (SocketException ex)
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Warn(ex);
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
}
|
||||
finally
|
||||
{
|
||||
client.EndPipe();
|
||||
}
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
LogManager.GetCurrentClassLogger()
|
||||
.Fatal("Unable to connect with any Redis hosts, {0}",
|
||||
String.Join(",", _redisHosts));
|
||||
}
|
||||
}
|
||||
GC.Collect();
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_interval), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
Interlocked.Increment(ref _errorCount);
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
GC.Collect();
|
||||
System.Threading.Thread.Sleep(_interval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ namespace TimberWinR.Outputs
|
||||
private readonly object _locker = new object();
|
||||
private readonly List<JObject> _jsonQueue;
|
||||
private long _sentMessages;
|
||||
public bool Stop { get; set; }
|
||||
|
||||
public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutput eo, CancellationToken cancelToken)
|
||||
: base(cancelToken, "Stdout")
|
||||
@@ -34,7 +35,7 @@ namespace TimberWinR.Outputs
|
||||
new JProperty("stdout",
|
||||
new JObject(
|
||||
new JProperty("sent_messages", _sentMessages))));
|
||||
|
||||
|
||||
return json;
|
||||
}
|
||||
|
||||
@@ -43,31 +44,49 @@ namespace TimberWinR.Outputs
|
||||
//
|
||||
private void StdoutSender()
|
||||
{
|
||||
while (!CancelToken.IsCancellationRequested)
|
||||
using (var syncHandle = new ManualResetEventSlim())
|
||||
{
|
||||
JObject[] messages;
|
||||
lock (_locker)
|
||||
// Execute the query
|
||||
while (!Stop)
|
||||
{
|
||||
messages = _jsonQueue.Take(_jsonQueue.Count).ToArray();
|
||||
_jsonQueue.RemoveRange(0, messages.Length);
|
||||
}
|
||||
|
||||
if (messages.Length > 0)
|
||||
{
|
||||
try
|
||||
if (!CancelToken.IsCancellationRequested)
|
||||
{
|
||||
foreach (JObject obj in messages)
|
||||
try
|
||||
{
|
||||
JObject[] messages;
|
||||
lock (_locker)
|
||||
{
|
||||
messages = _jsonQueue.Take(_jsonQueue.Count).ToArray();
|
||||
_jsonQueue.RemoveRange(0, messages.Length);
|
||||
}
|
||||
|
||||
if (messages.Length > 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
foreach (JObject obj in messages)
|
||||
{
|
||||
Console.WriteLine(obj.ToString());
|
||||
_sentMessages++;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
}
|
||||
if (!Stop)
|
||||
syncHandle.Wait(TimeSpan.FromSeconds(_interval), CancelToken);
|
||||
}
|
||||
catch (OperationCanceledException oce)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
Console.WriteLine(obj.ToString());
|
||||
_sentMessages++;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LogManager.GetCurrentClassLogger().Error(ex);
|
||||
}
|
||||
}
|
||||
System.Threading.Thread.Sleep(_interval);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user