Merge pull request #50 from Cimpress-MCP/add_input_generator

All updates for 1.3.24.0
This commit is contained in:
Eric Fontana
2015-04-29 10:47:41 -04:00
20 changed files with 383 additions and 94 deletions

View File

@@ -37,7 +37,8 @@ The current supported Input format sources are:
5. [Stdin](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/StdinInput.md) (Standard Input for Debugging)
6. [W3C](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/W3CInput.md)(Internet Information Services W3C Advanced/Custom Format)
7. [Udp](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/UdpInput.md) (listens for UDP on port for JSON messages)
8. [TailFiles](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/TailFiles.md) (Tails log files efficiently *New*)
8. [TailFiles](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/TailFiles.md) (Tails log files efficiently)
8. [Generator](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/Generator.md) (Generate logs for testing *New*)
## Codecs
The current list of supported codecs are:

View File

@@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.3.23.0")]
[assembly: AssemblyFileVersion("1.3.23.0")]
[assembly: AssemblyVersion("1.3.24.0")]
[assembly: AssemblyFileVersion("1.3.24.0")]

View File

@@ -63,8 +63,7 @@ namespace TimberWinR.TestGenerator
// This text is always added, making the file longer over time
// if it is not deleted.
using (StreamWriter sw = File.AppendText(logFilePath))
{
sw.AutoFlush = true;
{
for (int i = 0; i < parms.NumMessages; i++)
{
JObject o = new JObject

View File

@@ -64,8 +64,7 @@ namespace TimberWinR.TestGenerator
// This text is always added, making the file longer over time
// if it is not deleted.
using (StreamWriter sw = File.AppendText(logFilePath))
{
sw.AutoFlush = true;
{
for (int i = 0; i < parms.NumMessages; i++)
{
JObject o = new JObject

View File

@@ -5,14 +5,14 @@
"taillog": {
"test1: message sent count": "[messages] == 7404",
"test2: average cpu": "[avgCpuUsage] <= 30",
"test3: maximum memory": "[maxMemUsage] <= 20"
"test3: maximum memory": "[maxMemUsage] <= 30"
}
},
{
"udp": {
"test1: message sent count": "[messages] == 1234",
"test2: average cpu": "[avgCpuUsage] <= 30",
"test3: maximum memory": "[maxMemUsage] <= 20"
"test3: maximum memory": "[maxMemUsage] <= 30"
}
}
]

View File

@@ -5,14 +5,14 @@
"taillog": {
"test1: message sent count": "[messages] == 7404",
"test2: average cpu": "[avgCpuUsage] <= 30",
"test3: maximum memory": "[maxMemUsage] <= 15"
"test3: maximum memory": "[maxMemUsage] <= 20"
}
},
{
"udp": {
"test1: message sent count": "[messages] == 1234",
"test2: average cpu": "[avgCpuUsage] <= 30",
"test3: maximum memory": "[maxMemUsage] <= 15"
"test3: maximum memory": "[maxMemUsage] <= 20"
}
}
]

View File

@@ -8,7 +8,7 @@
"--numMessages": 1234,
"--logLevel": "debug",
"--udp-host": "localhost",
"--udp": "5140",
"--udp": "5140",
"--jroll": ["r1.jlog", "r2.jlog"],
"--json": ["1.jlog", "2.jlog", "3.jlog", "4.jlog"],
"--resultsFile": "results1.json"

View File

@@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Newtonsoft.Json.Linq;
using TimberWinR.Parser;
namespace TimberWinR.Codecs
{
class JsonCodec : ICodec
{
private CodecArguments _codecArguments;
public void Apply(string msg, Inputs.InputListener listener)
{
JObject jobject = JObject.Parse(msg);
listener.AddDefaultFields(jobject);
listener.ProcessJson(jobject);
}
public JsonCodec(CodecArguments args)
{
_codecArguments = args;
}
}
}

View File

@@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Newtonsoft.Json.Linq;
using TimberWinR.Parser;
namespace TimberWinR.Codecs
{
public class PlainCodec : ICodec
{
private CodecArguments _codecArguments;
public void Apply(string msg, Inputs.InputListener listener)
{
JObject json = new JObject();
listener.AddDefaultFields(json);
json["message"] = ExpandField(msg, json);
listener.ProcessJson(json);
}
protected string ExpandField(string fieldName, JObject json)
{
foreach (var token in json.Children())
{
string replaceString = "%{" + token.Path + "}";
fieldName = fieldName.Replace(replaceString, json[token.Path].ToString());
}
return fieldName;
}
public PlainCodec(CodecArguments args)
{
_codecArguments = args;
}
}
}

View File

@@ -102,6 +102,12 @@ namespace TimberWinR
get { return _stdins; }
}
private List<GeneratorParameters> _generators = new List<GeneratorParameters>();
public IEnumerable<GeneratorParameters> Generators
{
get { return _generators; }
}
private List<LogstashFilter> _filters = new List<LogstashFilter>();
public IEnumerable<LogstashFilter> Filters
@@ -239,6 +245,8 @@ namespace TimberWinR
c._iisw3clogs.AddRange(x.TimberWinR.Inputs.IISW3CLogs.ToList());
if (x.TimberWinR.Inputs.Stdins != null)
c._stdins.AddRange(x.TimberWinR.Inputs.Stdins.ToList());
if (x.TimberWinR.Inputs.Generators != null)
c._generators.AddRange(x.TimberWinR.Inputs.Generators.ToList());
if (x.TimberWinR.Inputs.Logs != null)
c._logs.AddRange(x.TimberWinR.Inputs.Logs.ToList());
if (x.TimberWinR.Inputs.TailFilesArguments != null)

View File

@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Xml.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
@@ -12,6 +13,8 @@ namespace TimberWinR.Parser
{
public partial class Json : LogstashFilter
{
private long _errorCount;
public Json()
{
RemoveSource = true;
@@ -22,6 +25,7 @@ namespace TimberWinR.Parser
new JProperty("json",
new JObject(
new JProperty("condition", Condition),
new JProperty("errors", _errorCount),
new JProperty("source", Source),
new JProperty("promote", Source),
new JProperty("target", Target),
@@ -102,6 +106,7 @@ namespace TimberWinR.Parser
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
return true;
}
}

View File

@@ -0,0 +1,88 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NLog;
using RestSharp.Extensions;
using TimberWinR.Codecs;
using TimberWinR.Parser;
namespace TimberWinR.Inputs
{
public class GeneratorInput : InputListener
{
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("message", _params.Message),
new JProperty("messages", _sentMessages),
new JProperty("generator", "enabled"));
return json;
}
private TimberWinR.Parser.GeneratorParameters _params;
private Thread _listenThread;
private ICodec _codec;
private int _sentMessages;
public GeneratorInput(TimberWinR.Parser.GeneratorParameters parameters, CancellationToken cancelToken)
: base(cancelToken, "Win32-InputGen")
{
_params = parameters;
if (_params.CodecArguments != null)
{
switch (_params.CodecArguments.Type)
{
case CodecArguments.CodecType.json:
_codec = new JsonCodec(_params.CodecArguments);
break;
case CodecArguments.CodecType.multiline:
_codec = new Multiline(_params.CodecArguments);
break;
case CodecArguments.CodecType.plain:
_codec = new PlainCodec(_params.CodecArguments);
break;
}
}
_listenThread = new Thread(new ThreadStart(GenerateData));
_listenThread.Start();
}
private void GenerateData()
{
LogManager.GetCurrentClassLogger().Info("Generator Creating {0} Lines", _params.Count);
int numMessages = _params.Count;
// Infinite or until done.
for (int i = 0; (_params.Count == 0 || i < numMessages); i++)
{
if (CancelToken.IsCancellationRequested)
break;
string msg = ToPrintable(_params.Message);
if (_codec != null)
_codec.Apply(msg, this);
else
{
JObject jo = new JObject();
jo["Message"] = msg;
AddDefaultFields(jo);
ProcessJson(jo);
}
Thread.Sleep(_params.Rate);
}
Finished();
}
}
}

View File

@@ -77,6 +77,7 @@ namespace TimberWinR.Inputs
var fi = new FileInfo(logName);
de.FileName = logName;
de.LogFileExists = fi.Exists;
de.Previous = "";
de.NewFile = true;
de.ProcessedFile = false;
de.LastPosition = fi.Length;
@@ -101,8 +102,10 @@ namespace TimberWinR.Inputs
var creationTime = fi.CreationTimeUtc;
if (dbe.LogFileExists && creationTime != dbe.CreationTimeUtc)
dbe.NewFile = true;
{
dbe.NewFile = true;
dbe.Previous = "";
}
dbe.CreationTimeUtc = creationTime;
return dbe;
@@ -133,7 +136,8 @@ namespace TimberWinR.Inputs
public static void Roll(LogsFileDatabaseEntry dbe)
{
dbe.ProcessedFile = false;
dbe.LastPosition = 0;
dbe.LastPosition = 0;
dbe.Previous = "";
Instance.UpdateEntry(dbe, 0);
dbe.NewFile = true;
}
@@ -268,6 +272,7 @@ namespace TimberWinR.Inputs
{
Interlocked.Increment(ref _linesProcessed);
}
public string Previous { get; set; }
}
}

View File

@@ -30,18 +30,19 @@ namespace TimberWinR.Inputs
private int _pollingIntervalInSeconds;
private TimberWinR.Parser.TailFileArguments _arguments;
private long _receivedMessages;
private long _errorCount;
private CodecArguments _codecArguments;
private ICodec _codec;
public bool Stop { get; set; }
public TailFileListener(TimberWinR.Parser.TailFileArguments arguments, CancellationToken cancelToken)
public TailFileListener(TimberWinR.Parser.TailFileArguments arguments,
CancellationToken cancelToken)
: base(cancelToken, "Win32-TailLog")
{
{
Stop = false;
EnsureRollingCaught();
EnsureRollingCaught();
_codecArguments = arguments.CodecArguments;
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
@@ -60,7 +61,9 @@ namespace TimberWinR.Inputs
public override void Shutdown()
{
LogManager.GetCurrentClassLogger().Info("{0}: Shutting Down {1} for {2}", Thread.CurrentThread.ManagedThreadId, InputType, _arguments.Location);
LogManager.GetCurrentClassLogger()
.Info("{0}: Shutting Down {1} for {2}", Thread.CurrentThread.ManagedThreadId, InputType,
_arguments.Location);
Stop = true;
base.Shutdown();
}
@@ -71,6 +74,7 @@ namespace TimberWinR.Inputs
new JProperty("taillog",
new JObject(
new JProperty("messages", _receivedMessages),
new JProperty("errors", _errorCount),
new JProperty("type", InputType),
new JProperty("location", _arguments.Location),
new JProperty("logSource", _arguments.LogSource),
@@ -103,73 +107,100 @@ namespace TimberWinR.Inputs
private void TailFileContents(string fileName, long offset, LogsFileDatabaseEntry dbe)
{
using (StreamReader reader = new StreamReader(new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)))
const int bufSize = 16535;
long prevLen = offset;
FileInfo fi = new FileInfo(fileName);
if (!fi.Exists)
return;
LogManager.GetCurrentClassLogger().Trace(":{0} Tailing File: {1} as Pos: {2}", Thread.CurrentThread.ManagedThreadId, fileName, prevLen);
using (var stream = new FileStream(fi.FullName, FileMode.Open, FileAccess.Read, FileShare.Delete | FileShare.ReadWrite))
{
//start at the end of the file
long lastMaxOffset = offset;
stream.Seek(prevLen, SeekOrigin.Begin);
//if the file size has not changed, idle
if (reader.BaseStream.Length == lastMaxOffset)
return;
//seek to the last max offset
LogManager.GetCurrentClassLogger().Trace("{0}: File: {1} Seek to: {2}", Thread.CurrentThread.ManagedThreadId, fileName, lastMaxOffset);
var seekedTo = reader.BaseStream.Seek(lastMaxOffset, SeekOrigin.Begin);
// We couldn't seek to the position, so remember what we have seeked to.
if (seekedTo != lastMaxOffset)
char[] buffer = new char[bufSize];
StringBuilder current = new StringBuilder();
using (StreamReader sr = new StreamReader(stream))
{
lastMaxOffset = seekedTo;
LogsFileDatabase.Update(dbe, true, lastMaxOffset);
}
//read out of the file until the EOF
string line = "";
long lineOffset = 0;
while ((line = reader.ReadLine()) != null)
{
if (string.IsNullOrEmpty(line))
continue;
long index = lastMaxOffset + lineOffset;
string text = line;
string logFileName = fileName;
var json = new JObject();
if (json["logSource"] == null)
int nRead;
do
{
if (string.IsNullOrEmpty(_arguments.LogSource))
json.Add(new JProperty("logSource", fileName));
else
json.Add(new JProperty("logSource", _arguments.LogSource));
// Read a buffered amount
nRead = sr.ReadBlock(buffer, 0, bufSize);
for (int i = 0; i < nRead; ++i)
{
// We need the terminator!
if (buffer[i] == '\n' || buffer[i] == '\r')
{
if (current.Length > 0)
{
string line = string.Concat(dbe.Previous, current);
var json = new JObject();
if (json["logSource"] == null)
{
if (string.IsNullOrEmpty(_arguments.LogSource))
json.Add(new JProperty("logSource", fileName));
else
json.Add(new JProperty("logSource", _arguments.LogSource));
}
//LogManager.GetCurrentClassLogger().Debug(":{0} File: {1}:{2} {3}", Thread.CurrentThread.ManagedThreadId, fileName, dbe.LinesProcessed, line);
// We've processed the partial input
dbe.Previous = "";
json["Text"] = line;
json["Index"] = dbe.LinesProcessed;
json["LogFileName"] = fileName;
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
{
try
{
_codec.Apply(line, this);
Interlocked.Increment(ref _receivedMessages);
dbe.IncrementLineCount();
}
catch (Exception ex)
{
Interlocked.Increment(ref _errorCount);
LogManager.GetCurrentClassLogger().ErrorException("Filter Error", ex);
}
}
else
{
try
{
ProcessJson(json);
dbe.IncrementLineCount();
Interlocked.Increment(ref _receivedMessages);
LogsFileDatabase.Update(dbe, true, sr.BaseStream.Position);
}
catch (Exception ex)
{
Interlocked.Increment(ref _errorCount);
LogManager.GetCurrentClassLogger().ErrorException("Process Error", ex);
}
}
}
current = new StringBuilder();
}
else // Copy character into the buffer
{
current.Append(buffer[i]);
}
}
} while (nRead > 0);
// We didn't encounter the newline, so save it.
if (current.Length > 0)
{
dbe.Previous = current.ToString();
}
json["Text"] = line;
json["Index"] = index;
json["LogFileName"] = fileName;
if (_codecArguments != null && _codecArguments.Type == CodecArguments.CodecType.multiline)
{
_codec.Apply(line, this);
Interlocked.Increment(ref _receivedMessages);
dbe.IncrementLineCount();
}
else
{
ProcessJson(json);
Interlocked.Increment(ref _receivedMessages);
dbe.IncrementLineCount();
//LogManager.GetCurrentClassLogger().Info("{0}: File: {1} {2} {3}", Thread.CurrentThread.ManagedThreadId, fileName, dbe.LinesProcessed, line);
}
lineOffset += line.Length;
}
//update the last max offset
lastMaxOffset = reader.BaseStream.Position;
LogsFileDatabase.Update(dbe, true, lastMaxOffset);
}
}
}
// One thread for each kind of file to watch, i.e. "*.log,*.txt" would be two separate
// threads.
@@ -203,13 +234,13 @@ namespace TimberWinR.Inputs
foreach (string fileName in Directory.GetFiles(path, name, so))
{
var dbe = LogsFileDatabase.LookupLogFile(fileName);
// We only spin up 1 thread for a file we haven't yet seen.
if (isWildcardPattern && !HaveSeenFile(fileName) && dbe.NewFile)
{
LogManager.GetCurrentClassLogger().Debug(":{0} Starting Thread Tailing File: {1}", Thread.CurrentThread.ManagedThreadId, dbe.FileName);
LogsFileDatabase.Update(dbe, false, dbe.LastPosition);
Task.Factory.StartNew(() => TailFileWatcher(fileName));
}
else if (!isWildcardPattern)
@@ -237,7 +268,7 @@ namespace TimberWinR.Inputs
else
{
TailFileContents(dbe.FileName, dbe.LastPosition, dbe);
}
}
}
}
}
@@ -281,7 +312,7 @@ namespace TimberWinR.Inputs
}
}
Finished();
}
}
}
}

View File

@@ -22,9 +22,7 @@ namespace TimberWinR
public class Manager
{
public Configuration Config { get; set; }
public List<OutputSender> Outputs { get; set; }
public List<TcpInputListener> Tcps { get; set; }
public List<TcpInputListener> Udps { get; set; }
public List<OutputSender> Outputs { get; set; }
public List<InputListener> Listeners { get; set; }
public bool LiveMonitor { get; set; }
@@ -256,6 +254,15 @@ namespace TimberWinR
output.Connect(elistner);
}
foreach (var stdin in config.Generators)
{
var elistner = new GeneratorInput(stdin, cancelToken);
Listeners.Add(elistner);
foreach (var output in Outputs)
output.Connect(elistner);
}
var computerName = System.Environment.MachineName + "." +
Microsoft.Win32.Registry.LocalMachine.OpenSubKey(
@"SYSTEM\CurrentControlSet\services\Tcpip\Parameters")

View File

@@ -263,12 +263,46 @@ namespace TimberWinR.Parser
}
}
public class GeneratorParameters : IValidateSchema
{
[JsonProperty(PropertyName = "type")]
public string Type { get; set; }
[JsonProperty(PropertyName = "codec")]
public CodecArguments CodecArguments { get; set; }
[JsonProperty(PropertyName = "message")]
public string Message { get; set; }
[JsonProperty(PropertyName = "count")]
public int Count { get; set; }
[JsonProperty(PropertyName = "rate")]
public int Rate { get; set; }
public void Validate()
{
}
public GeneratorParameters()
{
Count = 0; // Infinity messages
Rate = 10; // Milliseconds
Message = "Hello, world!";
CodecArguments = new CodecArguments();
CodecArguments.Type = CodecArguments.CodecType.plain;
}
}
public class CodecArguments
{
public enum CodecType
{
singleline,
multiline
multiline,
json,
plain
};
public enum WhatType
@@ -668,6 +702,9 @@ namespace TimberWinR.Parser
[JsonProperty("Stdin")]
public Stdin[] Stdins { get; set; }
[JsonProperty("Generator")]
public GeneratorParameters[] Generators { get; set; }
}
public partial class Grok : LogstashFilter, IValidateSchema

View File

@@ -3,6 +3,11 @@
A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service.
Version / Date
### 1.3.24.0 - 2015-04-29
1. Fixed potential bug in TailFiles when tailing log files which are partially flushed
to disk, it now will not process the line until the \r\n has been seen.
2. Added Generator input.
### 1.3.23.0 - 2015-04-23
1. Fixed bug with parsing a single json config file, rather than reading
JSON files from a directory.

View File

@@ -82,7 +82,9 @@
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Codecs\JsonCodec.cs" />
<Compile Include="Codecs\Multiline.cs" />
<Compile Include="Codecs\PlainCodec.cs" />
<Compile Include="Configuration.cs" />
<Compile Include="ConfigurationErrors.cs" />
<Compile Include="Diagnostics\Diagnostics.cs" />
@@ -94,6 +96,7 @@
<Compile Include="Filters\MutateFilter.cs" />
<Compile Include="ICodec.cs" />
<Compile Include="Inputs\FieldDefinitions.cs" />
<Compile Include="Inputs\GeneratorInput.cs" />
<Compile Include="Inputs\IISW3CRowReader.cs" />
<Compile Include="Inputs\LogsFileDatabase.cs" />
<Compile Include="Inputs\TailFileListener.cs" />
@@ -137,6 +140,7 @@
<None Include="mdocs\DateFilter.md" />
<None Include="mdocs\Filters.md" />
<None Include="mdocs\GeoIPFilter.md" />
<None Include="mdocs\Generator.md" />
<None Include="mdocs\TailFiles.md" />
<None Include="mdocs\UdpInput.md" />
<None Include="mdocs\W3CInput.md" />

View File

@@ -0,0 +1,37 @@
# Input: Generator
The Generator input can be used to Generate log files for test purposes.
## Parameters
The following parameters are allowed when configuring the test log Generator.
| Parameter | Type | Description | Details | Default |
| :---------------- |:---------------| :----------------------------------------------------------------------- | :--------------------------- | :-- |
| *type* | string |Message type | | Win32-InputGen |
| *message* | string |Message format to send | | Hello, World! |
| *count* | integer |Number of messages to generate | 0 - Infinite, otherwise that number | 0 |
| *rate* | integer |Sleep time between generated messages | Milliseconds | 10 |
| [codec](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/Codec.md) | object | Codec to use |
Example: Generate 100000 "Hello Win32-InputGen" messages
```json
{
"TimberWinR": {
"Inputs": {
"Generator": [
{
"message": "Hello %{type}",
"count": 100000
}
]
}
}
}
```
## Fields
After a successful parse of the generated line, the following fields are added:
| Name | Type | Description |
| ---- |:-----| :-----------|
| Message | STRING | Text line content |

View File

@@ -2,8 +2,6 @@ $packageName = 'TimberWinR-${version}' # arbitrary name for the package, used in
$installerType = 'msi' #only one of these: exe, msi, msu
$scriptPath = $(Split-Path $MyInvocation.MyCommand.Path)
$fileFullPath = Join-Path $scriptPath 'TimberWinR-${version}.0.msi'
$silentArgs = '${PROJECTGUID} /quiet'
$silentArgs = '{CC4DF908-07C4-4BD8-A9FA-6E6AC315E30B} /quiet'
$validExitCodes = @(0) #please insert other valid exit codes here, exit codes for ms http://msdn.microsoft.com/en-us/library/aa368542(VS.85).aspx
UnInstall-ChocolateyPackage "$packageName" "$installerType" "$silentArgs" "fileFullPath" -validExitCodes $validExitCodes