Added new TailFiles listener.

This commit is contained in:
Eric Fontana
2015-01-29 09:36:22 -05:00
parent d1e5224ba3
commit 4654d7dbc1
12 changed files with 490 additions and 29 deletions

View File

@@ -36,6 +36,7 @@ The current supported Input format sources are:
5. [Stdin](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/StdinInput.md) (Standard Input for Debugging)
6. [W3C](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/W3CInput.md)(Internet Information Services W3C Advanced/Custom Format)
7. [Udp](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/UdpInput.md) (listens for UDP on port for JSON messages)
8. [TailFiles](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/TailFiles.md) (Tails log files efficiently *New*)
## Codecs
The current list of supported codecs are:
@@ -69,7 +70,7 @@ A single Json filter using the single tag (this is only provided as a convienien
]
```
Multiple Json filters must use the jsonFilters and array syntax
Multiple Json filters must use the jsonFilters and array syntax, also mutateFilters, grokFilters, dateFilters, geoipFilters.
```json
"Filters": [
{

View File

@@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.3.19.0")]
[assembly: AssemblyFileVersion("1.3.19.0")]
[assembly: AssemblyVersion("1.3.20.0")]
[assembly: AssemblyFileVersion("1.3.20.0")]

View File

@@ -67,7 +67,13 @@ namespace TimberWinR
public IEnumerable<Log> Logs
{
get { return _logs; }
}
}
private List<TailFile> _tails = new List<TailFile>();
public IEnumerable<TailFile> TailFiles
{
get { return _tails; }
}
private List<IISW3CLog> _iisw3clogs = new List<IISW3CLog>();
@@ -148,6 +154,8 @@ namespace TimberWinR
c._stdins.AddRange(x.TimberWinR.Inputs.Stdins.ToList());
if (x.TimberWinR.Inputs.Logs != null)
c._logs.AddRange(x.TimberWinR.Inputs.Logs.ToList());
if (x.TimberWinR.Inputs.TailFiles != null)
c._tails.AddRange(x.TimberWinR.Inputs.TailFiles.ToList());
if (x.TimberWinR.Inputs.Tcps != null)
c._tcps.AddRange(x.TimberWinR.Inputs.Tcps.ToList());
if (x.TimberWinR.Inputs.Udps != null)

View File

@@ -5,10 +5,8 @@ using System.Linq;
using System.Text;
using Newtonsoft.Json;
using NLog;
using TimberWinR.Parser;
using LogQuery = Interop.MSUtil.LogQueryClassClass;
using TextLineInputFormat = Interop.MSUtil.COMTextLineInputContextClass;
using LogRecordSet = Interop.MSUtil.ILogRecordset;
namespace TimberWinR.Inputs
{
@@ -19,7 +17,7 @@ namespace TimberWinR.Inputs
{
private static readonly object _locker = new object();
private List<LogsFileDatabaseEntry> Entries { get; set; }
private string DatabaseDirectory { get; set; }
private string DatabaseDirectory { get; set; }
public string DatabaseFileName
{
get { return Path.Combine(DatabaseDirectory, ".timberwinrdb"); }
@@ -37,6 +35,14 @@ namespace TimberWinR.Inputs
}
}
private LogsFileDatabaseEntry FindFile(string logName)
{
lock (_locker)
{
var existingEntry = (from e in Entries where e.FileName == logName select e).FirstOrDefault();
return existingEntry;
}
}
private bool ExistingFileTest(string logName)
{
var existingEntry = (from e in Entries where e.FileName == logName select e).FirstOrDefault();
@@ -56,37 +62,51 @@ namespace TimberWinR.Inputs
}
}
private LogsFileDatabaseEntry AddFileEntry(string logName, TextLineInputFormat fmt)
private LogsFileDatabaseEntry AddFileEntry(string logName)
{
var de = new LogsFileDatabaseEntry();
lock (_locker)
{
var lq = new LogQuery();
de.NewFile = true;
var fi = new FileInfo(logName);
de.FileName = logName;
de.Size = fi.Length;
de.SampleTime = DateTime.UtcNow;
de.CreationTime = fi.CreationTimeUtc;
if (fi.Exists)
{
var qcount = string.Format("SELECT max(Index) as MaxRecordNumber FROM {0}", logName);
var rcount = lq.Execute(qcount, fmt);
var qr = rcount.getRecord();
var lrn = (Int64)qr.getValueEx("MaxRecordNumber");
de.MaxRecords = lrn;
}
de.CreationTimeUtc = fi.CreationTimeUtc;
Entries.Add(de);
WriteDatabaseFileNoLock();
}
return de;
}
public static LogsFileDatabaseEntry AddLogFile(string logName, TextLineInputFormat fmt)
public static LogsFileDatabaseEntry LookupLogFile(string logName)
{
Instance.RemoveFileEntry(logName); // Remove if already exists, otherwise ignores.
return Instance.AddFileEntry(logName, fmt);
LogsFileDatabaseEntry dbe = Instance.FindFile(logName);
if (dbe == null)
dbe = Instance.AddFileEntry(logName);
else
dbe.NewFile = false;
return dbe;
}
public static void Update(LogsFileDatabaseEntry dbe)
{
Instance.UpdateEntry(dbe);
}
private void UpdateEntry(LogsFileDatabaseEntry dbe)
{
lock(_locker)
{
var fi = new FileInfo(dbe.FileName);
dbe.CreationTimeUtc = fi.CreationTimeUtc;
dbe.SampleTime = DateTime.UtcNow;
dbe.Size = fi.Length;
WriteDatabaseFileNoLock();
}
}
public static LogsFileDatabase Instance
{
get
@@ -122,6 +142,17 @@ namespace TimberWinR.Inputs
{
LogManager.GetCurrentClassLogger()
.Error("Error reading database '{0}': {1}", DatabaseFileName, ex.ToString());
try
{
if (File.Exists(DatabaseFileName))
File.Delete(DatabaseFileName);
LogManager.GetCurrentClassLogger().Info("Creating New Database '{0}'", DatabaseFileName);
WriteDatabaseLock();
}
catch (Exception ex2)
{
LogManager.GetCurrentClassLogger().Info("Error Creating New Database '{0}': {1}", DatabaseFileName, ex2.ToString());
}
}
}
private void WriteDatabaseFileNoLock()
@@ -155,7 +186,7 @@ namespace TimberWinR.Inputs
}
private LogsFileDatabase(string databaseDirectory)
{
{
DatabaseDirectory = databaseDirectory;
Entries = new List<LogsFileDatabaseEntry>();
}
@@ -164,11 +195,13 @@ namespace TimberWinR.Inputs
public class LogsFileDatabaseEntry
{
[JsonIgnore]
public bool NewFile { get; set; }
public string FileName { get; set; }
public Int64 MaxRecords { get; set; }
public DateTime CreationTime { get; set; }
public DateTime CreationTimeUtc { get; set; }
public DateTime SampleTime { get; set; }
public long Size { get; set; }
public long Size { get; set; }
}
}

View File

@@ -198,6 +198,8 @@ namespace TimberWinR.Inputs
recurse = _arguments.Recurse
};
Dictionary<string, string> _fnfmap = new Dictionary<string, string>();
using (var syncHandle = new ManualResetEventSlim())
{
// Execute the query
@@ -322,7 +324,12 @@ namespace TimberWinR.Inputs
}
catch (FileNotFoundException fnfex)
{
LogManager.GetCurrentClassLogger().Warn(fnfex.Message);
string fn = fnfex.FileName;
if (!_fnfmap.ContainsKey(fn))
LogManager.GetCurrentClassLogger().Warn(fnfex.Message);
else
_fnfmap[fn] = fn;
}
catch (OperationCanceledException oce)
{

View File

@@ -0,0 +1,320 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Configuration;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using Interop.MSUtil;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
using NLog;
using NLog.LayoutRenderers;
using TimberWinR.Parser;
namespace TimberWinR.Inputs
{
/// <summary>
/// Tail a file.
/// </summary>
public class TailFileListener : InputListener
{
private int _pollingIntervalInSeconds;
private TimberWinR.Parser.TailFile _arguments;
private long _receivedMessages;
private Dictionary<string, Int64> _logFileMaxRecords;
private Dictionary<string, DateTime> _logFileCreationTimes;
private Dictionary<string, DateTime> _logFileSampleTimes;
private Dictionary<string, long> _logFileSizes;
private Codec _codec;
private List<string> _multiline { get; set; }
public bool Stop { get; set; }
public TailFileListener(TimberWinR.Parser.TailFile arguments, CancellationToken cancelToken)
: base(cancelToken, "Win32-TailLog")
{
Stop = false;
_codec = arguments.Codec;
_logFileMaxRecords = new Dictionary<string, Int64>();
_logFileCreationTimes = new Dictionary<string, DateTime>();
_logFileSampleTimes = new Dictionary<string, DateTime>();
_logFileSizes = new Dictionary<string, long>();
_receivedMessages = 0;
_arguments = arguments;
_pollingIntervalInSeconds = arguments.Interval;
foreach (string srcFile in _arguments.Location.Split(','))
{
string file = srcFile.Trim();
Task.Factory.StartNew(() => TailFileWatcher(file));
}
}
public override void Shutdown()
{
LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType);
Stop = true;
base.Shutdown();
}
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("log",
new JObject(
new JProperty("messages", _receivedMessages),
new JProperty("type", InputType),
new JProperty("location", _arguments.Location),
new JProperty("logSource", _arguments.LogSource),
new JProperty("recurse", _arguments.Recurse),
new JProperty("files",
new JArray(from f in _logFileMaxRecords.Keys
select new JValue(f))),
new JProperty("fileSampleTimes",
new JArray(from f in _logFileSampleTimes.Values
select new JValue(f))),
new JProperty("fileSizes",
new JArray(from f in _logFileSizes.Values
select new JValue(f))),
new JProperty("fileIndices",
new JArray(from f in _logFileMaxRecords.Values
select new JValue(f))),
new JProperty("fileCreationDates",
new JArray(from f in _logFileCreationTimes.Values
select new JValue(f)))
)));
if (_codec != null)
{
var cp = new JProperty("codec",
new JArray(
new JObject(
new JProperty("type", _codec.Type.ToString()),
new JProperty("what", _codec.What.ToString()),
new JProperty("negate", _codec.Negate),
new JProperty("multilineTag", _codec.MultilineTag),
new JProperty("pattern", _codec.Pattern))));
json.Add(cp);
}
return json;
}
// return true to cancel codec
private void applyMultilineCodec(string msg)
{
if (_codec.Re == null)
_codec.Re = new Regex(_codec.Pattern);
Match match = _codec.Re.Match(msg);
bool isMatch = (match.Success && !_codec.Negate) || (!match.Success && _codec.Negate);
switch (_codec.What)
{
case Codec.WhatType.previous:
if (isMatch)
{
if (_multiline == null)
_multiline = new List<string>();
_multiline.Add(msg);
}
else // No Match
{
if (_multiline != null)
{
string single = string.Join("\n", _multiline.ToArray());
_multiline = null;
JObject jo = new JObject();
jo["message"] = single;
jo.Add("tags", new JArray(_codec.MultilineTag));
AddDefaultFields(jo);
ProcessJson(jo);
_receivedMessages++;
}
_multiline = new List<string>();
_multiline.Add(msg);
}
break;
case Codec.WhatType.next:
if (isMatch)
{
if (_multiline == null)
_multiline = new List<string>();
_multiline.Add(msg);
}
else // No match
{
if (_multiline != null)
{
_multiline.Add(msg);
string single = string.Join("\n", _multiline.ToArray());
_multiline = null;
JObject jo = new JObject();
jo["message"] = single;
jo.Add("tags", new JArray(_codec.MultilineTag));
AddDefaultFields(jo);
ProcessJson(jo);
_receivedMessages++;
}
else
{
JObject jo = new JObject();
jo["message"] = msg;
AddDefaultFields(jo);
ProcessJson(jo);
_receivedMessages++;
}
}
break;
}
}
private void TailFileContents(string fileName, long offset)
{
using (StreamReader reader = new StreamReader(new FileStream(fileName,
FileMode.Open, FileAccess.Read, FileShare.ReadWrite)))
{
//start at the end of the file
long lastMaxOffset = offset;
//if the file size has not changed, idle
if (reader.BaseStream.Length == lastMaxOffset)
return;
//seek to the last max offset
reader.BaseStream.Seek(lastMaxOffset, SeekOrigin.Begin);
//read out of the file until the EOF
string line = "";
long lineOffset = 0;
while ((line = reader.ReadLine()) != null)
{
if (string.IsNullOrEmpty(line))
continue;
long index = lastMaxOffset + lineOffset;
string text = line;
string logFileName = fileName;
var json = new JObject();
if (json["logSource"] == null)
{
if (string.IsNullOrEmpty(_arguments.LogSource))
json.Add(new JProperty("logSource", fileName));
else
json.Add(new JProperty("logSource", _arguments.LogSource));
}
json["Text"] = line;
json["Index"] = index;
json["LogFileName"] = fileName;
if (_codec != null && _codec.Type == Codec.CodecType.multiline)
applyMultilineCodec(line);
else
{
ProcessJson(json);
Interlocked.Increment(ref _receivedMessages);
}
lineOffset += line.Length;
// Console.WriteLine("File: {0}:{1}: {2}", fileName, reader.BaseStream.Position, line);
}
//update the last max offset
lastMaxOffset = reader.BaseStream.Position;
}
}
// One thread for each kind of file to watch, i.e. "*.log,*.txt" would be two separate
// threads.
private void TailFileWatcher(string fileToWatch)
{
Dictionary<string, string> _fnfmap = new Dictionary<string, string>();
using (var syncHandle = new ManualResetEventSlim())
{
// Execute the query
while (!Stop && !CancelToken.IsCancellationRequested)
{
try
{
if (!CancelToken.IsCancellationRequested)
{
string path = Path.GetDirectoryName(fileToWatch);
string name = Path.GetFileName(fileToWatch);
// Ok, we have a potential file filter here as 'fileToWatch' could be foo.log or *.log
SearchOption so = SearchOption.TopDirectoryOnly;
if (_arguments.Recurse == -1)
so = SearchOption.AllDirectories;
foreach (string fileName in Directory.GetFiles(path, name, so))
{
var dbe = LogsFileDatabase.LookupLogFile(fileName);
FileInfo fi = new FileInfo(dbe.FileName);
//LogManager.GetCurrentClassLogger().Info("Located File: {0}, New: {1}", dbe.FileName, dbe.NewFile);
long length = fi.Length;
bool logHasRolled = false;
if (fi.Length < dbe.Size || fi.CreationTimeUtc != dbe.CreationTimeUtc)
{
LogManager.GetCurrentClassLogger().Info("Log has Rolled: {0}", dbe.FileName);
logHasRolled = true;
}
bool processWholeFile = logHasRolled || dbe.NewFile;
if (processWholeFile)
{
LogManager.GetCurrentClassLogger().Info("Process Whole File: {0}", dbe.FileName);
TailFileContents(dbe.FileName, 0);
}
else
{
TailFileContents(dbe.FileName, dbe.Size);
}
LogsFileDatabase.Update(dbe);
}
// LogManager.GetCurrentClassLogger().Info("Finished Scan...Sleeping");
if (!Stop)
syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken);
}
}
catch (FileNotFoundException fnfex)
{
string fn = fnfex.FileName;
if (!_fnfmap.ContainsKey(fn))
LogManager.GetCurrentClassLogger().Warn(fnfex.Message);
else
_fnfmap[fn] = fn;
}
catch (OperationCanceledException oce)
{
break;
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Error(ex);
}
finally
{
}
}
}
Finished();
}
}
}

View File

@@ -21,6 +21,7 @@ namespace TimberWinR.Inputs
private readonly int _port;
private long _receivedMessages;
private long _parsedErrors;
private struct listenProfile
{
@@ -34,6 +35,7 @@ namespace TimberWinR.Inputs
new JProperty("udp",
new JObject(
new JProperty("port", _port),
new JProperty("errors", _parsedErrors),
new JProperty("messages", _receivedMessages)
)));
@@ -71,22 +73,25 @@ namespace TimberWinR.Inputs
private void StartListener(object useProfile)
{
var profile = (listenProfile)useProfile;
string lastMessage = "";
try
{
while (!CancelToken.IsCancellationRequested)
{
byte[] bytes = profile.client.Receive(ref profile.endPoint);
try
{
var data = Encoding.ASCII.GetString(bytes, 0, bytes.Length);
byte[] bytes = profile.client.Receive(ref profile.endPoint);
var data = Encoding.UTF8.GetString(bytes, 0, bytes.Length);
lastMessage = data;
JObject json = JObject.Parse(data);
ProcessJson(json);
_receivedMessages++;
}
catch (Exception ex1)
{
LogManager.GetCurrentClassLogger().Warn("Bad JSON: {0}", lastMessage);
LogManager.GetCurrentClassLogger().Warn(ex1);
_parsedErrors++;
}
}
_udpListener.Close();

View File

@@ -200,6 +200,14 @@ namespace TimberWinR
output.Connect(elistner);
}
foreach (var logConfig in Config.TailFiles)
{
var elistner = new TailFileListener(logConfig, cancelToken);
Listeners.Add(elistner);
foreach (var output in Outputs)
output.Connect(elistner);
}
foreach (var tcp in Config.Tcps)
{
var elistner = new TcpInputListener(cancelToken, tcp.Port);

View File

@@ -297,6 +297,36 @@ namespace TimberWinR.Parser
}
}
public class TailFile : IValidateSchema
{
[JsonProperty(PropertyName = "location")]
public string Location { get; set; }
[JsonProperty(PropertyName = "recurse")]
public int Recurse { get; set; }
[JsonProperty(PropertyName = "fields")]
public List<Field> Fields { get; set; }
[JsonProperty(PropertyName = "interval")]
public int Interval { get; set; }
[JsonProperty(PropertyName = "logSource")]
public string LogSource { get; set; }
[JsonProperty(PropertyName = "codec")]
public Codec Codec { get; set; }
public TailFile()
{
Fields = new List<Field>();
Fields.Add(new Field("LogFilename", "string"));
Fields.Add(new Field("Index", "integer"));
Fields.Add(new Field("Text", "string"));
Interval = 5;
}
public void Validate()
{
}
}
public class Log : IValidateSchema
{
[JsonProperty(PropertyName = "location")]
@@ -600,6 +630,9 @@ namespace TimberWinR.Parser
[JsonProperty("Logs")]
public Log[] Logs { get; set; }
[JsonProperty("TailFiles")]
public TailFile[] TailFiles { get; set; }
[JsonProperty("Tcp")]
public Tcp[] Tcps { get; set; }

View File

@@ -3,6 +3,10 @@
A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service.
Version History
### 1.3.20.0 - 01/29/2015
1. Added new TailFiles input type which uses a native implementation (more-efficient) than using LogParser's Log
2. Updated Udp input listner to use UTF8 Encoding rather than ASCII
3. Reduced noisy complaint about missing log files for Logs listener
### 1.3.19.0 - 01/12/2015

View File

@@ -88,6 +88,7 @@
<Compile Include="Inputs\FieldDefinitions.cs" />
<Compile Include="Inputs\IISW3CRowReader.cs" />
<Compile Include="Inputs\LogsFileDatabase.cs" />
<Compile Include="Inputs\TailFileListener.cs" />
<Compile Include="Inputs\UdpInputListener.cs" />
<Compile Include="Inputs\W3CInputListener.cs" />
<Compile Include="Inputs\IISW3CInputListener.cs" />
@@ -127,6 +128,7 @@
<None Include="mdocs\DateFilter.md" />
<None Include="mdocs\Filters.md" />
<None Include="mdocs\GeoIPFilter.md" />
<None Include="mdocs\TailFiles.md" />
<None Include="mdocs\UdpInput.md" />
<None Include="mdocs\W3CInput.md" />
<None Include="mdocs\JsonFilter.md" />

View File

@@ -0,0 +1,40 @@
# Input: TailFiles
The TailFiles input will monitor a log (text) file similar to how a Linux "tail -f" command works. This uses
a native implementation rather than uses LogParser
## Parameters
The following parameters are allowed when configuring WindowsEvents.
| Parameter | Type | Description | Details | Default |
| :---------------- |:---------------| :----------------------------------------------------------------------- | :--------------------------- | :-- |
| *location* | string |Location of file(s) to monitor | Path to text file(s) including wildcards. | |
| *logSource* | string |Source name | Used for conditions | |
| *recurse* | integer |Max subdirectory recursion level. | 0 disables subdirectory recursion; -1 enables unlimited recursion. | 0 |
| [codec](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/Codec.md) | object | Codec to use |
Example Input: Monitors all files (recursively) located at C:\Logs1\ matching *.log as a pattern. I.e. C:\Logs1\foo.log, C:\Logs1\Subdir\Log2.log, etc.
```json
{
"TimberWinR": {
"Inputs": {
"TailFiles": [
{
"logSource": "log files",
"location": "C:\\Logs1\\*.log",
"recurse": -1
}
]
}
}
}
```
## Fields
After a successful parse of an event, the following fields are added:
| Name | Type | Description |
| ---- |:-----| :-----------|
| LogFilename | STRING |Full path of the file containing this line |
| Index | INTEGER | Line number |
| Text | STRING | Text line content |