From ffd790b4ba3ec947cbe216667c36e6e341534eab Mon Sep 17 00:00:00 2001 From: amiry Date: Tue, 14 Apr 2015 10:35:00 +0300 Subject: [PATCH] Added async support" --- CHANGELOG.md | 3 + StatsdClient/AsyncLock.cs | 42 +++ StatsdClient/IOutputChannel.cs | 23 +- StatsdClient/IStatsd.cs | 33 +- StatsdClient/NullOutputChannel.cs | 16 +- StatsdClient/OutputChannelExtensions.cs | 10 + StatsdClient/Statsd.cs | 396 ++++++++++++------------ StatsdClient/StatsdClient.csproj | 2 + StatsdClient/StatsdClientExtensions.cs | 140 +++++++-- StatsdClient/TcpOutputChannel.cs | 138 +++++---- StatsdClient/UdpOutputChannel.cs | 56 ++-- 11 files changed, 493 insertions(+), 366 deletions(-) create mode 100644 StatsdClient/AsyncLock.cs create mode 100644 StatsdClient/OutputChannelExtensions.cs diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f50e64..f1fd86c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # statsd-csharp-client Changelog +## v1.4.0.0 +* Added async support + ## v1.3.0.0 * Added support for Calendargrams diff --git a/StatsdClient/AsyncLock.cs b/StatsdClient/AsyncLock.cs new file mode 100644 index 0000000..376bb50 --- /dev/null +++ b/StatsdClient/AsyncLock.cs @@ -0,0 +1,42 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace StatsdClient +{ + internal sealed class AsyncLock + { + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); + private readonly Task _releaser; + + public AsyncLock() + { + _releaser = Task.FromResult((IDisposable)new Releaser(this)); + } + + public Task LockAsync() + { + var wait = _semaphore.WaitAsync(); + return wait.IsCompleted ? + _releaser : + wait.ContinueWith((_, state) => (IDisposable)state, + _releaser.Result, CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + } + + private sealed class Releaser : IDisposable + { + private readonly AsyncLock _toRelease; + + internal Releaser(AsyncLock toRelease) + { + _toRelease = toRelease; + } + + public void Dispose() + { + _toRelease._semaphore.Release(); + } + } + } +} \ No newline at end of file diff --git a/StatsdClient/IOutputChannel.cs b/StatsdClient/IOutputChannel.cs index 85f333b..9d79586 100644 --- a/StatsdClient/IOutputChannel.cs +++ b/StatsdClient/IOutputChannel.cs @@ -1,18 +1,15 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System.Threading.Tasks; namespace StatsdClient { - /// - /// Contract for sending raw statds lines to the server - /// - public interface IOutputChannel - { /// - /// Sends a line of stats data to the server. + /// Contract for sending raw statds lines to the server /// - void Send(string line); - } -} + public interface IOutputChannel + { + /// + /// Sends a line of stats data to the server asynchronously. + /// + Task SendAsync(string line); + } +} \ No newline at end of file diff --git a/StatsdClient/IStatsd.cs b/StatsdClient/IStatsd.cs index 54c1629..a82d695 100644 --- a/StatsdClient/IStatsd.cs +++ b/StatsdClient/IStatsd.cs @@ -1,4 +1,5 @@ -using System; +using System.Threading.Tasks; + namespace StatsdClient { /// @@ -9,45 +10,47 @@ namespace StatsdClient /// /// Log a count for a metric /// - void LogCount(string name, int count = 1); + Task LogCountAsync(string name, long count = 1); + /// /// Log a gauge value /// - void LogGauge(string name, int value); + Task LogGaugeAsync(string name, long value); + /// /// Log a latency / Timing /// - void LogTiming(string name, int milliseconds); - /// - /// Log a latency / Timing - /// - void LogTiming(string name, long milliseconds); + Task LogTimingAsync(string name, long milliseconds); + /// /// Log the number of unique occurrances of something /// /// /// - void LogSet(string name, int value); + Task LogSetAsync(string name, long value); + /// - /// Log a calendargram metric + /// Log a calendargram metric /// /// The metric namespace /// The unique value to be counted in the time period /// The time period, can be one of h,d,dow,w,m - void LogCalendargram(string name, string value, string period); + Task LogCalendargramAsync(string name, string value, string period); + /// - /// Log a calendargram metric + /// Log a calendargram metric /// /// The metric namespace /// The unique value to be counted in the time period /// The time period, can be one of h,d,dow,w,m - void LogCalendargram(string name, int value, string period); + Task LogCalendargramAsync(string name, long value, string period); + /// /// Log a raw metric that will not get aggregated on the server. /// /// The metric name. /// The metric value. /// (optional) The epoch timestamp. Leave this blank to have the server assign an epoch for you. - void LogRaw(string name, int value, long? epoch = null); + Task LogRawAsync(string name, long value, long? epoch = null); } -} +} \ No newline at end of file diff --git a/StatsdClient/NullOutputChannel.cs b/StatsdClient/NullOutputChannel.cs index 713f30d..42d67be 100644 --- a/StatsdClient/NullOutputChannel.cs +++ b/StatsdClient/NullOutputChannel.cs @@ -1,15 +1,11 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System.Threading.Tasks; namespace StatsdClient { - internal sealed class NullOutputChannel : IOutputChannel - { - public void Send(string line) + internal sealed class NullOutputChannel : IOutputChannel { - // noop + public async Task SendAsync(string line) + { + } } - } -} +} \ No newline at end of file diff --git a/StatsdClient/OutputChannelExtensions.cs b/StatsdClient/OutputChannelExtensions.cs new file mode 100644 index 0000000..0ccfcf7 --- /dev/null +++ b/StatsdClient/OutputChannelExtensions.cs @@ -0,0 +1,10 @@ +namespace StatsdClient +{ + public static class OutputChannelExtensions + { + public static void Send(this IOutputChannel outputChannel, string line) + { + outputChannel.SendAsync(line).Wait(); + } + } +} \ No newline at end of file diff --git a/StatsdClient/Statsd.cs b/StatsdClient/Statsd.cs index 959084c..cd74e85 100644 --- a/StatsdClient/Statsd.cs +++ b/StatsdClient/Statsd.cs @@ -1,226 +1,216 @@ using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Linq; -using System.Text; +using System.Threading.Tasks; namespace StatsdClient { - /// - /// The statsd client library. - /// - public class Statsd : IStatsd - { - private string _prefix; - private IOutputChannel _outputChannel; - /// - /// Creates a new instance of the Statsd client. + /// The statsd client library. /// - /// The statsd or statsd.net server. - /// - public Statsd(string host, int port) + public class Statsd : IStatsd { - if ( String.IsNullOrEmpty( host ) ) - { - Trace.TraceWarning( "Statsd client initialised with empty host address. Dropping back to NullOutputChannel." ); - InitialiseInternal( () => new NullOutputChannel(), "", false ); - } - else - { - InitialiseInternal( () => new UdpOutputChannel( host, port ), "", false ); - } - } + private string _prefix; + private IOutputChannel _outputChannel; - /// - /// Creates a new instance of the Statsd client. - /// - /// The statsd or statsd.net server. - /// - /// A string prefix to prepend to every metric. - /// If True, rethrows any exceptions caught due to bad configuration. - /// Choose between a UDP (recommended) or TCP connection. - /// Retry the connection if it fails (TCP only). - /// Number of times to retry before giving up (TCP only). - public Statsd(string host, - int port, - ConnectionType connectionType = ConnectionType.Udp, - string prefix = null, - bool rethrowOnError = false, - bool retryOnDisconnect = true, - int retryAttempts = 3) - { - InitialiseInternal(() => + /// + /// Creates a new instance of the Statsd client. + /// + /// The statsd or statsd.net server. + /// + public Statsd(string host, int port) { - return connectionType == ConnectionType.Tcp - ? (IOutputChannel)new TcpOutputChannel(host, port, retryOnDisconnect, retryAttempts) - : (IOutputChannel)new UdpOutputChannel(host, port); - }, - prefix, - rethrowOnError); - } - - /// - /// Creates a new instance of the Statsd client. - /// - /// The statsd or statsd.net server. - /// - /// A string prefix to prepend to every metric. - /// If True, rethrows any exceptions caught due to bad configuration. - /// Optional output channel (useful for mocking / testing). - public Statsd(string host, int port, string prefix = null, bool rethrowOnError = false, IOutputChannel outputChannel = null) - { - if (outputChannel == null) - { - InitialiseInternal(() => new UdpOutputChannel(host, port), prefix, rethrowOnError); - } - else - { - InitialiseInternal(() => outputChannel, prefix, rethrowOnError); - } - } - - private void InitialiseInternal(Func createOutputChannel, string prefix, bool rethrowOnError) - { - _prefix = prefix; - if (_prefix != null && _prefix.EndsWith(".")) - { - _prefix = _prefix.Substring(0, _prefix.Length - 1); - } - try - { - _outputChannel = createOutputChannel(); - } - catch (Exception ex) - { - if (rethrowOnError) - { - throw; + if (String.IsNullOrEmpty(host)) + { + Trace.TraceWarning("Statsd client initialised with empty host address. Dropping back to NullOutputChannel."); + InitialiseInternal(() => new NullOutputChannel(), "", false); + } + else + { + InitialiseInternal(() => new UdpOutputChannel(host, port), "", false); + } } - Trace.TraceError("Could not initialise the Statsd client: {0} - falling back to NullOutputChannel.", ex.Message); - _outputChannel = new NullOutputChannel(); - } - } - /// - /// Log a counter. - /// - /// The metric name. - /// The counter value (defaults to 1). - public void LogCount(string name, int count = 1) - { - SendMetric(MetricType.COUNT, name, _prefix, count); - } + /// + /// Creates a new instance of the Statsd client. + /// + /// The statsd or statsd.net server. + /// + /// A string prefix to prepend to every metric. + /// If True, rethrows any exceptions caught due to bad configuration. + /// Choose between a UDP (recommended) or TCP connection. + /// Retry the connection if it fails (TCP only). + /// Number of times to retry before giving up (TCP only). + public Statsd(string host, + int port, + ConnectionType connectionType = ConnectionType.Udp, + string prefix = null, + bool rethrowOnError = false, + bool retryOnDisconnect = true, + int retryAttempts = 3) + { + InitialiseInternal(() => + { + return connectionType == ConnectionType.Tcp + ? (IOutputChannel)new TcpOutputChannel(host, port, retryOnDisconnect, retryAttempts) + : (IOutputChannel)new UdpOutputChannel(host, port); + }, + prefix, + rethrowOnError); + } - /// - /// Log a timing / latency - /// - /// The metric name. - /// The duration, in milliseconds, for this metric. - public void LogTiming(string name, int milliseconds) - { - SendMetric(MetricType.TIMING, name, _prefix, milliseconds); - } + /// + /// Creates a new instance of the Statsd client. + /// + /// The statsd or statsd.net server. + /// + /// A string prefix to prepend to every metric. + /// If True, rethrows any exceptions caught due to bad configuration. + /// Optional output channel (useful for mocking / testing). + public Statsd(string host, int port, string prefix = null, bool rethrowOnError = false, IOutputChannel outputChannel = null) + { + if (outputChannel == null) + { + InitialiseInternal(() => new UdpOutputChannel(host, port), prefix, rethrowOnError); + } + else + { + InitialiseInternal(() => outputChannel, prefix, rethrowOnError); + } + } - /// - /// Log a timing / latency - /// - /// The metric name. - /// The duration, in milliseconds, for this metric. - public void LogTiming(string name, long milliseconds) - { - LogTiming(name, (int)milliseconds); - } + private void InitialiseInternal(Func createOutputChannel, string prefix, bool rethrowOnError) + { + _prefix = prefix; + if (_prefix != null && _prefix.EndsWith(".")) + { + _prefix = _prefix.Substring(0, _prefix.Length - 1); + } + try + { + _outputChannel = createOutputChannel(); + } + catch (Exception ex) + { + if (rethrowOnError) + { + throw; + } + Trace.TraceError("Could not initialise the Statsd client: {0} - falling back to NullOutputChannel.", ex.Message); + _outputChannel = new NullOutputChannel(); + } + } - /// - /// Log a gauge. - /// - /// The metric name - /// The value for this gauge - public void LogGauge(string name, int value) - { - SendMetric(MetricType.GAUGE, name, _prefix, value); - } + /// + /// Log a counter. + /// + /// The metric name. + /// The counter value (defaults to 1). + public async Task LogCountAsync(string name, long count = 1) + { + await SendMetricAsync(MetricType.COUNT, name, _prefix, count); + } - /// - /// Log to a set - /// - /// The metric name. - /// The value to log. - /// Logging to a set is about counting the number - /// of occurrences of each event. - public void LogSet(string name, int value) - { - SendMetric(MetricType.SET, name, _prefix, value); - } + /// + /// Log a timing / latency + /// + /// The metric name. + /// The duration, in milliseconds, for this metric. + public async Task LogTimingAsync(string name, long milliseconds) + { + await SendMetricAsync(MetricType.TIMING, name, _prefix, milliseconds); + } - /// - /// Log a calendargram metric - /// - /// The metric namespace - /// The unique value to be counted in the time period - /// The time period, can be one of h,d,dow,w,m - public void LogCalendargram(string name, string value, string period) - { - SendMetric(MetricType.CALENDARGRAM, name, _prefix, value, period); - } + /// + /// Log a gauge. + /// + /// The metric name + /// The value for this gauge + public async Task LogGaugeAsync(string name, long value) + { + await SendMetricAsync(MetricType.GAUGE, name, _prefix, value); + } - /// - /// Log a calendargram metric - /// - /// The metric namespace - /// The unique value to be counted in the time period - /// The time period, can be one of h,d,dow,w,m - public void LogCalendargram(string name, int value, string period) - { - SendMetric(MetricType.CALENDARGRAM, name, _prefix, value, period); - } + /// + /// Log to a set + /// + /// The metric name. + /// The value to log. + /// + /// Logging to a set is about counting the number + /// of occurrences of each event. + /// + public async Task LogSetAsync(string name, long value) + { + await SendMetricAsync(MetricType.SET, name, _prefix, value); + } - /// - /// Log a raw metric that will not get aggregated on the server. - /// - /// The metric name. - /// The metric value. - /// (optional) The epoch timestamp. Leave this blank to have the server assign an epoch for you. - public void LogRaw(string name, int value, long? epoch = null) - { - SendMetric(MetricType.RAW, name, String.Empty, value, epoch.HasValue ? epoch.ToString() : (string)null); - } + /// + /// Log a raw metric that will not get aggregated on the server. + /// + /// The metric name. + /// The metric value. + /// (optional) The epoch timestamp. Leave this blank to have the server assign an epoch for you. + public async Task LogRawAsync(string name, long value, long? epoch = null) + { + await SendMetricAsync(MetricType.RAW, name, String.Empty, value, epoch.HasValue ? epoch.ToString() : null); + } - private void SendMetric(string metricType, string name, string prefix, int value, string postFix = null) - { - if (value < 0) - { - Trace.TraceWarning(String.Format("Metric value for {0} was less than zero: {1}. Not sending.", name, value)); - return; - } - SendMetric(metricType, name, prefix, value.ToString(), postFix); - } + /// + /// Log a calendargram metric + /// + /// The metric namespace + /// The unique value to be counted in the time period + /// The time period, can be one of h,d,dow,w,m + public async Task LogCalendargramAsync(string name, string value, string period) + { + await SendMetricAsync(MetricType.CALENDARGRAM, name, _prefix, value, period); + } - private void SendMetric(string metricType, string name, string prefix, string value, string postFix = null) - { - if (String.IsNullOrEmpty(name)) - { - throw new ArgumentNullException("name"); - } - _outputChannel.Send(PrepareMetric(metricType, name, prefix, value, postFix)); - } + /// + /// Log a calendargram metric + /// + /// The metric namespace + /// The unique value to be counted in the time period + /// The time period, can be one of h,d,dow,w,m + public async Task LogCalendargramAsync(string name, long value, string period) + { + await SendMetricAsync(MetricType.CALENDARGRAM, name, _prefix, value, period); + } - /// - /// Prepare a metric prior to sending it off ot the Graphite server. - /// - /// - /// - /// - /// - /// A value to append to the end of the line. - /// The formatted metric - protected virtual string PrepareMetric(string metricType, string name, string prefix, string value, string postFix = null) - { - return (String.IsNullOrEmpty(prefix) ? name : (prefix + "." + name)) - + ":" + value - + "|" + metricType - + (postFix == null ? String.Empty : "|" + postFix); + private async Task SendMetricAsync(string metricType, string name, string prefix, long value, string postFix = null) + { + if (value < 0) + { + Trace.TraceWarning("Metric value for {0} was less than zero: {1}. Not sending.", name, value); + return; + } + await SendMetricAsync(metricType, name, prefix, value.ToString(), postFix); + } + + private async Task SendMetricAsync(string metricType, string name, string prefix, string value, string postFix = null) + { + if (String.IsNullOrEmpty(name)) + { + throw new ArgumentNullException("name"); + } + await _outputChannel.SendAsync(PrepareMetric(metricType, name, prefix, value, postFix)); + } + + /// + /// Prepare a metric prior to sending it off ot the Graphite server. + /// + /// + /// + /// + /// + /// A value to append to the end of the line. + /// The formatted metric + protected virtual string PrepareMetric(string metricType, string name, string prefix, string value, string postFix = null) + { + return (String.IsNullOrEmpty(prefix) ? name : (prefix + "." + name)) + + ":" + value + + "|" + metricType + + (postFix == null ? String.Empty : "|" + postFix); + } } - } -} +} \ No newline at end of file diff --git a/StatsdClient/StatsdClient.csproj b/StatsdClient/StatsdClient.csproj index 3814309..10775e5 100644 --- a/StatsdClient/StatsdClient.csproj +++ b/StatsdClient/StatsdClient.csproj @@ -40,12 +40,14 @@ + + diff --git a/StatsdClient/StatsdClientExtensions.cs b/StatsdClient/StatsdClientExtensions.cs index 96615fa..542e21f 100644 --- a/StatsdClient/StatsdClientExtensions.cs +++ b/StatsdClient/StatsdClientExtensions.cs @@ -1,35 +1,111 @@ -using StatsdClient; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System; - /// - /// A set of extensions for use with the StatsdClient library. - /// -public static class StatsdClientExtensions +namespace StatsdClient { - /// - /// Log a timing metric - /// - /// The statsd client instance. - /// The namespace of the timing metric. - /// The duration to log (will be converted into milliseconds) - public static void LogTiming(this IStatsd client, string name, TimeSpan duration) - { - client.LogTiming(name, (int)duration.TotalMilliseconds); - } + /// + /// A set of extensions for use with the StatsdClient library. + /// + public static class StatsdClientExtensions + { + /// + /// Log a counter. + /// + /// The metric name. + /// The counter value (defaults to 1). + public static void LogCount(this IStatsd client, string name, int count = 1) + { + client.LogCountAsync(name, count).Wait(); + } - /// - /// Starts a timing metric that will be logged when the TimingToken is disposed. - /// - /// The statsd clien instance. - /// The namespace of the timing metric. - /// A timing token that has been initialised with a start datetime. - /// Wrap the code you want to measure in a using() {} block. The - /// TimingToken instance will log the duration when it is disposed. - public static TimingToken LogTiming(this IStatsd client, string name) - { - return new TimingToken(client, name); - } -} + /// + /// Log a timing / latency + /// + /// The metric name. + /// The duration, in milliseconds, for this metric. + public static void LogTiming(this IStatsd client, string name, long milliseconds) + { + client.LogTimingAsync(name, milliseconds).Wait(); + } + + /// + /// Log a gauge. + /// + /// The metric name + /// The value for this gauge + public static void LogGauge(this IStatsd client, string name, int value) + { + client.LogGaugeAsync(name, value).Wait(); + } + + /// + /// Log to a set + /// + /// The metric name. + /// The value to log. + /// + /// Logging to a set is about counting the number of occurrences of each event. + /// + public static void LogSet(this IStatsd client, string name, int value) + { + client.LogSetAsync(name, value).Wait(); + } + + /// + /// Log a raw metric that will not get aggregated on the server. + /// + /// The metric name. + /// The metric value. + /// (optional) The epoch timestamp. Leave this blank to have the server assign an epoch for you. + public static void LogRaw(this IStatsd client, string name, int value, long? epoch = null) + { + client.LogRawAsync(name, value, epoch).Wait(); + } + + /// + /// Log a calendargram metric + /// + /// The metric namespace + /// The unique value to be counted in the time period + /// The time period, can be one of h,d,dow,w,m + public static void LogCalendargram(this IStatsd client, string name, string value, string period) + { + client.LogCalendargramAsync(name, value, period).Wait(); + } + + /// + /// Log a calendargram metric + /// + /// The metric namespace + /// The unique value to be counted in the time period + /// The time period, can be one of h,d,dow,w,m + public static void LogCalendargram(this IStatsd client, string name, long value, string period) + { + client.LogCalendargramAsync(name, value, period).Wait(); + } + + /// + /// Log a timing metric + /// + /// The statsd client instance. + /// The namespace of the timing metric. + /// The duration to log (will be converted into milliseconds) + public static void LogTiming(this IStatsd client, string name, TimeSpan duration) + { + client.LogTiming(name, (long)duration.TotalMilliseconds); + } + + /// + /// Starts a timing metric that will be logged when the TimingToken is disposed. + /// + /// The statsd clien instance. + /// The namespace of the timing metric. + /// A timing token that has been initialised with a start datetime. + /// + /// Wrap the code you want to measure in a using() {} block. The TimingToken instance will log the duration when it is disposed. + /// + public static TimingToken LogTiming(this IStatsd client, string name) + { + return new TimingToken(client, name); + } + } +} \ No newline at end of file diff --git a/StatsdClient/TcpOutputChannel.cs b/StatsdClient/TcpOutputChannel.cs index 10f946c..d211d93 100644 --- a/StatsdClient/TcpOutputChannel.cs +++ b/StatsdClient/TcpOutputChannel.cs @@ -1,85 +1,87 @@ using System; -using System.Collections.Generic; using System.Diagnostics; using System.IO; -using System.Linq; using System.Net.Sockets; using System.Text; +using System.Threading.Tasks; namespace StatsdClient { - internal sealed class TcpOutputChannel : IOutputChannel - { - private TcpClient _tcpClient; - private NetworkStream _stream; - private object _reconnectLock; - private string _host; - private int _port; - private bool _reconnectEnabled; - private int _retryAttempts; - - public TcpOutputChannel(string host, int port, bool reconnectEnabled = true, int retryAttempts = 3) + internal sealed class TcpOutputChannel : IOutputChannel { - _host = host; - _port = port; - _reconnectEnabled = reconnectEnabled; - _retryAttempts = retryAttempts; - _tcpClient = new TcpClient(); - _reconnectLock = new object(); - } + private readonly TcpClient _tcpClient; + private NetworkStream _stream; + private readonly string _host; + private readonly int _port; + private readonly bool _reconnectEnabled; + private readonly int _retryAttempts; + private readonly AsyncLock _asyncLock; - public void Send(string line) - { - SendWithRetry(line, _reconnectEnabled ? _retryAttempts - 1 : 0); - } + public TcpOutputChannel(string host, int port, bool reconnectEnabled = true, int retryAttempts = 3) + { + _host = host; + _port = port; + _reconnectEnabled = reconnectEnabled; + _retryAttempts = retryAttempts; + _tcpClient = new TcpClient(); + _asyncLock = new AsyncLock(); + } - private void SendWithRetry(string line, int attemptsLeft) - { - try - { - if ( !_tcpClient.Connected ) + public async Task SendAsync(string line) { - RestoreConnection(); + await SendWithRetryAsync(line, _reconnectEnabled ? _retryAttempts - 1 : 0); } - var bytesToSend = Encoding.UTF8.GetBytes( line + Environment.NewLine ); - _stream.Write( bytesToSend, 0, bytesToSend.Length ); - } - catch ( IOException ex ) - { - if ( attemptsLeft > 0 ) - { - SendWithRetry( line, --attemptsLeft ); - } - else - { - // No more attempts left, so log it and continue - Trace.TraceWarning( "Sending metrics via TCP failed with an IOException: {0}", ex.Message ); - } - } - catch ( SocketException ex ) - { - if ( attemptsLeft > 0 ) - { - SendWithRetry( line, --attemptsLeft ); - } - else - { - // No more attempts left, so log it and continue - Trace.TraceWarning( "Sending metrics via TCP failed with a SocketException: {0}, code: {1}", ex.Message, ex.SocketErrorCode.ToString() ); - } - } - } - private void RestoreConnection() - { - lock (_reconnectLock) - { - if (!_tcpClient.Connected) + private async Task SendWithRetryAsync(string line, int attemptsLeft) { - _tcpClient.Connect(_host, _port); - _stream = _tcpClient.GetStream(); + string errorMessage = null; + try + { + if (!_tcpClient.Connected) + { + await RestoreConnectionAsync(); + } + + var bytesToSend = Encoding.UTF8.GetBytes(line + Environment.NewLine); + await _stream.WriteAsync(bytesToSend, 0, bytesToSend.Length); + } + catch (IOException ex) + { + errorMessage = string.Format("Sending metrics via TCP failed with an IOException: {0}", ex.Message); + } + catch (SocketException ex) + { + // No more attempts left, so log it and continue + errorMessage = string.Format("Sending metrics via TCP failed with a SocketException: {0}, code: {1}", ex.Message, ex.SocketErrorCode); + } + + if (errorMessage != null) + { + if (attemptsLeft > 0) + { + await SendWithRetryAsync(line, --attemptsLeft); + } + else + { + // No more attempts left, so log it and continue + Trace.TraceWarning(errorMessage); + } + } + } + + private async Task RestoreConnectionAsync() + { + if (!_tcpClient.Connected) + { + using (await _asyncLock.LockAsync()) + { + if (!_tcpClient.Connected) + { + await _tcpClient.ConnectAsync(_host, _port); + _stream = _tcpClient.GetStream(); + } + } + } } - } } - } -} +} \ No newline at end of file diff --git a/StatsdClient/UdpOutputChannel.cs b/StatsdClient/UdpOutputChannel.cs index 0fad195..1b212df 100644 --- a/StatsdClient/UdpOutputChannel.cs +++ b/StatsdClient/UdpOutputChannel.cs @@ -1,34 +1,40 @@ -using System; -using System.Collections.Generic; -using System.Linq; +using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; +using System.Threading.Tasks; namespace StatsdClient { - internal sealed class UdpOutputChannel : IOutputChannel - { - private UdpClient _udpClient; - public Socket ClientSocket { get { return _udpClient.Client; } } - - public UdpOutputChannel(string hostOrIPAddress, int port) + internal sealed class UdpOutputChannel : IOutputChannel { - IPAddress ipAddress; - // Is this an IP address already? - if (!IPAddress.TryParse(hostOrIPAddress, out ipAddress)) - { - // Convert to ipv4 address - ipAddress = Dns.GetHostAddresses(hostOrIPAddress).First(p => p.AddressFamily == AddressFamily.InterNetwork); - } - _udpClient = new UdpClient(); - _udpClient.Connect(ipAddress, port); - } + private readonly UdpClient _udpClient; - public void Send(string line) - { - byte[] payload = Encoding.UTF8.GetBytes(line); - _udpClient.Send(payload, payload.Length); + public Socket ClientSocket + { + get + { + return _udpClient.Client; + } + } + + public UdpOutputChannel(string hostOrIPAddress, int port) + { + IPAddress ipAddress; + // Is this an IP address already? + if (!IPAddress.TryParse(hostOrIPAddress, out ipAddress)) + { + // Convert to ipv4 address + ipAddress = Dns.GetHostAddresses(hostOrIPAddress).First(p => p.AddressFamily == AddressFamily.InterNetwork); + } + _udpClient = new UdpClient(); + _udpClient.Connect(ipAddress, port); + } + + public async Task SendAsync(string line) + { + var payload = Encoding.UTF8.GetBytes(line); + await _udpClient.SendAsync(payload, payload.Length); + } } - } -} +} \ No newline at end of file