From 929047d5eef58f238e2162ac6a2fddb55d2b985b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Zavark=C3=B3?= Date: Thu, 18 May 2023 15:51:37 +0200 Subject: [PATCH 01/11] Change the type of Exchange and Routing key to ROM --- .../AsyncBasicConsumerFake.cs | 4 ++-- .../ConsumerDispatching/ConsumerDispatcher.cs | 12 ++++++----- .../WireFormatting/MethodSerialization.cs | 2 +- .../client/api/AsyncDefaultBasicConsumer.cs | 6 +++--- .../client/api/DefaultBasicConsumer.cs | 4 ++-- .../client/api/IAsyncBasicConsumer.cs | 4 ++-- .../client/api/IBasicConsumer.cs | 4 ++-- .../events/AsyncEventingBasicConsumer.cs | 2 +- .../client/events/BasicDeliverEventArgs.cs | 8 +++---- .../client/events/EventingBasicConsumer.cs | 2 +- .../client/framing/BasicDeliver.cs | 16 +++++++------- .../client/impl/ChannelBase.cs | 4 ++-- .../AsyncConsumerDispatcher.cs | 5 +++++ .../ConsumerDispatching/ConsumerDispatcher.cs | 5 +++++ .../ConsumerDispatcherChannelBase.cs | 12 ++++++----- .../ConsumerDispatching/FallbackConsumer.cs | 4 ++-- .../IConsumerDispatcher.cs | 5 +++-- .../client/impl/IncomingCommand.cs | 5 +++++ .../client/impl/WireFormatting.Read.cs | 21 +++++++++++++++++++ .../Unit/APIApproval.Approve.verified.txt | 16 +++++++------- projects/Unit/TestAsyncConsumerExceptions.cs | 4 ++-- projects/Unit/TestConnectionRecovery.cs | 4 ++-- projects/Unit/TestConsumerExceptions.cs | 4 ++-- .../Unit/TestConsumerOperationDispatch.cs | 2 +- projects/Unit/TestMainLoop.cs | 4 ++-- 25 files changed, 100 insertions(+), 59 deletions(-) diff --git a/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs b/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs index 573db2a277..2d211762cd 100644 --- a/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs +++ b/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs @@ -18,7 +18,7 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent) _autoResetEvent = autoResetEvent; } - public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) + public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { if (Interlocked.Increment(ref _current) == Count) { @@ -28,7 +28,7 @@ public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel return Task.CompletedTask; } - void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, + void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { if (Interlocked.Increment(ref _current) == Count) diff --git a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs index b751091c4d..028396007e 100644 --- a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System.Text; +using System.Threading; using BenchmarkDotNet.Attributes; using RabbitMQ.Client; using RabbitMQ.Client.ConsumerDispatching; @@ -15,9 +16,10 @@ public class ConsumerDispatcherBase private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent); protected readonly string _consumerTag = "ConsumerTag"; protected readonly ulong _deliveryTag = 500UL; - protected readonly string _exchange = "Exchange"; - protected readonly string _routingKey = "RoutingKey"; + protected static readonly byte[] _exchange = Encoding.UTF8.GetBytes("Exchange"); + protected static readonly byte[] _routingKey = Encoding.UTF8.GetBytes("RoutingKey"); protected readonly ReadOnlyBasicProperties _properties = new ReadOnlyBasicProperties(); + protected readonly byte[] _method = new byte[512]; protected readonly byte[] _body = new byte[512]; } @@ -41,7 +43,7 @@ public void AsyncConsumerDispatcher() { for (int i = 0; i < Count; i++) { - _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body); + _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); } _autoResetEvent.Wait(); _autoResetEvent.Reset(); @@ -59,7 +61,7 @@ public void ConsumerDispatcher() { for (int i = 0; i < Count; i++) { - _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body); + _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); } _autoResetEvent.Wait(); _autoResetEvent.Reset(); diff --git a/projects/Benchmarks/WireFormatting/MethodSerialization.cs b/projects/Benchmarks/WireFormatting/MethodSerialization.cs index 1a67657c80..eb406e98d1 100644 --- a/projects/Benchmarks/WireFormatting/MethodSerialization.cs +++ b/projects/Benchmarks/WireFormatting/MethodSerialization.cs @@ -45,7 +45,7 @@ public override void SetUp() } [Benchmark] - public object BasicDeliverRead() => new BasicDeliver(_buffer.Span)._consumerTag; // return one property to not box when returning an object instead + public object BasicDeliverRead() => new BasicDeliver(_buffer)._consumerTag; // return one property to not box when returning an object instead [Benchmark] public int BasicPublishWrite() => _basicPublish.WriteTo(_buffer.Span); diff --git a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs index a0c0af161a..8d9d9b2e11 100644 --- a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs @@ -110,8 +110,8 @@ public virtual Task HandleBasicConsumeOk(string consumerTag) public virtual Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { @@ -165,7 +165,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag) throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); } - void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) + void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'."); } diff --git a/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs index 15b40ea808..15dd570a24 100644 --- a/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs @@ -150,8 +150,8 @@ public virtual void HandleBasicConsumeOk(string consumerTag) public virtual void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { diff --git a/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs index 291eda1293..364189747a 100644 --- a/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs @@ -49,8 +49,8 @@ public interface IAsyncBasicConsumer Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body); diff --git a/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs index d5244e973e..ca1a22c706 100644 --- a/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/IBasicConsumer.cs @@ -92,8 +92,8 @@ public interface IBasicConsumer void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body); diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index 2dc046badc..d3fec5296a 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -71,7 +71,7 @@ public override async Task HandleBasicConsumeOk(string consumerTag) } ///Fires the Received event. - public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) + public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { // No need to call base, it's empty. return _receivedWrapper.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)); diff --git a/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs b/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs index fbd497bb0f..fccf38e051 100644 --- a/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs @@ -47,8 +47,8 @@ public BasicDeliverEventArgs() public BasicDeliverEventArgs(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { @@ -77,13 +77,13 @@ public BasicDeliverEventArgs(string consumerTag, ///The exchange the message was originally published ///to. - public string Exchange { get; set; } + public ReadOnlyMemory Exchange { get; set; } ///The AMQP "redelivered" flag. public bool Redelivered { get; set; } ///The routing key used when the message was ///originally published. - public string RoutingKey { get; set; } + public ReadOnlyMemory RoutingKey { get; set; } } } diff --git a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs index 681247ea56..d06d5c3c1f 100644 --- a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs @@ -84,7 +84,7 @@ public override void HandleBasicConsumeOk(string consumerTag) /// Accessing the body at a later point is unsafe as its memory can /// be already released. /// - public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) + public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); Received?.Invoke( diff --git a/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs b/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs index ebc49cd4da..67dfc38493 100644 --- a/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs +++ b/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs @@ -41,16 +41,16 @@ namespace RabbitMQ.Client.Framing.Impl public readonly string _consumerTag; public readonly ulong _deliveryTag; public readonly bool _redelivered; - public readonly string _exchange; - public readonly string _routingKey; + public readonly ReadOnlyMemory _exchange; + public readonly ReadOnlyMemory _routingKey; - public BasicDeliver(ReadOnlySpan span) + public BasicDeliver(ReadOnlyMemory data) { - int offset = WireFormatting.ReadShortstr(span, out _consumerTag); - offset += WireFormatting.ReadLonglong(span.Slice(offset), out _deliveryTag); - offset += WireFormatting.ReadBits(span.Slice(offset), out _redelivered); - offset += WireFormatting.ReadShortstr(span.Slice(offset), out _exchange); - WireFormatting.ReadShortstr(span.Slice(offset), out _routingKey); + int offset = WireFormatting.ReadShortstr(data.Span, out _consumerTag); + offset += WireFormatting.ReadLonglong(data.Span.Slice(offset), out _deliveryTag); + offset += WireFormatting.ReadBits(data.Span.Slice(offset), out _redelivered); + offset += WireFormatting.ReadShortMemory(data.Slice(offset), out _exchange); + WireFormatting.ReadShortMemory(data.Slice(offset), out _routingKey); } public ProtocolCommandId ProtocolCommandId => ProtocolCommandId.BasicDeliver; diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index decf6ac84f..ab6973fc73 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -600,8 +600,7 @@ protected void HandleBasicConsumeOk(in IncomingCommand cmd) protected void HandleBasicDeliver(in IncomingCommand cmd) { - var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodBytes.Span); - cmd.ReturnMethodBuffer(); + var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodBytes); var header = new ReadOnlyBasicProperties(cmd.HeaderBytes.Span); cmd.ReturnHeaderBuffer(); @@ -613,6 +612,7 @@ protected void HandleBasicDeliver(in IncomingCommand cmd) method._routingKey, header, cmd.Body, + cmd.TakeoverMethod(), cmd.TakeoverBody()); } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs index b1f80582dc..d6bf1c2c08 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -39,6 +39,11 @@ protected override async Task ProcessChannelAsync() } finally { + if (work.RentedMethodArray != null) + { + ArrayPool.Shared.Return(work.RentedMethodArray); + } + if (work.RentedArray != null) { ArrayPool.Shared.Return(work.RentedArray); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs index 09aa5cc627..9b47f58f50 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs @@ -49,6 +49,11 @@ protected override async Task ProcessChannelAsync() } finally { + if (work.RentedMethodArray != null) + { + ArrayPool.Shared.Return(work.RentedMethodArray); + } + if (work.RentedArray != null) { ArrayPool.Shared.Return(work.RentedArray); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index f53218e6bd..3d1a7174d4 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -54,11 +54,11 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag) } public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, byte[] rentedArray) + ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, byte[] rentedMethodArray, byte[] rentedArray) { if (!IsShutdown) { - _writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray)); + _writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray)); } } @@ -108,10 +108,11 @@ protected readonly struct WorkStruct public readonly string? ConsumerTag; public readonly ulong DeliveryTag; public readonly bool Redelivered; - public readonly string? Exchange; - public readonly string? RoutingKey; + public readonly ReadOnlyMemory Exchange; + public readonly ReadOnlyMemory RoutingKey; public readonly ReadOnlyBasicProperties BasicProperties; public readonly ReadOnlyMemory Body; + public readonly byte[]? RentedMethodArray; public readonly byte[]? RentedArray; public readonly ShutdownEventArgs? Reason; public readonly WorkType WorkType; @@ -133,7 +134,7 @@ public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason) } public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, byte[] rentedArray) + ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, byte[] rentedMethodArray, byte[] rentedArray) { WorkType = WorkType.Deliver; Consumer = consumer; @@ -144,6 +145,7 @@ public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag RoutingKey = routingKey; BasicProperties = basicProperties; Body = body; + RentedMethodArray = rentedMethodArray; RentedArray = rentedArray; Reason = default; } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs index 3510be5e5d..73b99bd8a2 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs @@ -37,7 +37,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag) ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicConsumeOk)} for tag {consumerTag}"); } - void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, + void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicDeliver)} for tag {consumerTag}"); @@ -66,7 +66,7 @@ Task IAsyncBasicConsumer.HandleBasicConsumeOk(string consumerTag) return Task.CompletedTask; } - Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, + Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { ((IBasicConsumer)this).HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs index ba7c462529..dda93e7577 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs @@ -48,10 +48,11 @@ internal interface IConsumerDispatcher void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, + byte[] rentedMethodArray, byte[] rentedArray); void HandleBasicCancelOk(string consumerTag); diff --git a/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs b/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs index 5f3aa7fbb8..b8d33ec2a4 100644 --- a/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs +++ b/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs @@ -32,6 +32,11 @@ public IncomingCommand(ProtocolCommandId commandId, ReadOnlyMemory methodB _rentedBodyArray = rentedBodyArray; } + public byte[] TakeoverMethod() + { + return _rentedMethodBytes; + } + public byte[] TakeoverBody() { return _rentedBodyArray; diff --git a/projects/RabbitMQ.Client/client/impl/WireFormatting.Read.cs b/projects/RabbitMQ.Client/client/impl/WireFormatting.Read.cs index cd5aef51d4..2d40f85462 100644 --- a/projects/RabbitMQ.Client/client/impl/WireFormatting.Read.cs +++ b/projects/RabbitMQ.Client/client/impl/WireFormatting.Read.cs @@ -192,6 +192,27 @@ public static int ReadShortstr(ReadOnlySpan span, out string value) return ThrowArgumentOutOfRangeException(span.Length, byteCount + 1); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int ReadShortMemory(ReadOnlyMemory data, out ReadOnlyMemory value) + { + int byteCount = data.Span[0]; + if (byteCount == 0) + { + value = default; + return 1; + } + + // equals data.Length >= byteCount + 1 + if (data.Length > byteCount) + { + value = data.Slice(1, byteCount); + return 1 + byteCount; + } + + value = default; + return ThrowArgumentOutOfRangeException(data.Length, byteCount + 1); + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int ReadBits(ReadOnlySpan span, out bool val) { diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt index cd3dda3e7e..9ab0a422c7 100644 --- a/projects/Unit/APIApproval.Approve.verified.txt +++ b/projects/Unit/APIApproval.Approve.verified.txt @@ -49,7 +49,7 @@ namespace RabbitMQ.Client public virtual System.Threading.Tasks.Task HandleBasicCancel(string consumerTag) { } public virtual System.Threading.Tasks.Task HandleBasicCancelOk(string consumerTag) { } public virtual System.Threading.Tasks.Task HandleBasicConsumeOk(string consumerTag) { } - public virtual System.Threading.Tasks.Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) { } + public virtual System.Threading.Tasks.Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, System.ReadOnlyMemory exchange, System.ReadOnlyMemory routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) { } public virtual System.Threading.Tasks.Task HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason) { } public virtual System.Threading.Tasks.Task OnCancel(params string[] consumerTags) { } } @@ -248,7 +248,7 @@ namespace RabbitMQ.Client public virtual void HandleBasicCancel(string consumerTag) { } public virtual void HandleBasicCancelOk(string consumerTag) { } public virtual void HandleBasicConsumeOk(string consumerTag) { } - public virtual void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) { } + public virtual void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, System.ReadOnlyMemory exchange, System.ReadOnlyMemory routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) { } public virtual void HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason) { } public virtual void OnCancel(params string[] consumerTags) { } } @@ -321,7 +321,7 @@ namespace RabbitMQ.Client System.Threading.Tasks.Task HandleBasicCancel(string consumerTag); System.Threading.Tasks.Task HandleBasicCancelOk(string consumerTag); System.Threading.Tasks.Task HandleBasicConsumeOk(string consumerTag); - System.Threading.Tasks.Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body); + System.Threading.Tasks.Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, System.ReadOnlyMemory exchange, System.ReadOnlyMemory routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body); System.Threading.Tasks.Task HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason); } public interface IAuthMechanism @@ -340,7 +340,7 @@ namespace RabbitMQ.Client void HandleBasicCancel(string consumerTag); void HandleBasicCancelOk(string consumerTag); void HandleBasicConsumeOk(string consumerTag); - void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body); + void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, System.ReadOnlyMemory exchange, System.ReadOnlyMemory routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body); void HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason); } public interface IBasicProperties : RabbitMQ.Client.IReadOnlyBasicProperties @@ -785,14 +785,14 @@ namespace RabbitMQ.Client.Events public class BasicDeliverEventArgs : System.EventArgs { public BasicDeliverEventArgs() { } - public BasicDeliverEventArgs(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) { } + public BasicDeliverEventArgs(string consumerTag, ulong deliveryTag, bool redelivered, System.ReadOnlyMemory exchange, System.ReadOnlyMemory routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) { } public RabbitMQ.Client.ReadOnlyBasicProperties BasicProperties { get; set; } public System.ReadOnlyMemory Body { get; set; } public string ConsumerTag { get; set; } public ulong DeliveryTag { get; set; } - public string Exchange { get; set; } + public System.ReadOnlyMemory Exchange { get; set; } public bool Redelivered { get; set; } - public string RoutingKey { get; set; } + public System.ReadOnlyMemory RoutingKey { get; set; } } public class BasicNackEventArgs : System.EventArgs { @@ -847,7 +847,7 @@ namespace RabbitMQ.Client.Events public event System.EventHandler Unregistered; public override void HandleBasicCancelOk(string consumerTag) { } public override void HandleBasicConsumeOk(string consumerTag) { } - public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) { } + public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, System.ReadOnlyMemory exchange, System.ReadOnlyMemory routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) { } public override void HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason) { } } public class FlowControlEventArgs : System.EventArgs diff --git a/projects/Unit/TestAsyncConsumerExceptions.cs b/projects/Unit/TestAsyncConsumerExceptions.cs index d4f1a35798..99a1c60198 100644 --- a/projects/Unit/TestAsyncConsumerExceptions.cs +++ b/projects/Unit/TestAsyncConsumerExceptions.cs @@ -125,8 +125,8 @@ public ConsumerFailingOnDelivery(IChannel channel) : base(channel) public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { diff --git a/projects/Unit/TestConnectionRecovery.cs b/projects/Unit/TestConnectionRecovery.cs index e6f770a71b..9d01cacc61 100644 --- a/projects/Unit/TestConnectionRecovery.cs +++ b/projects/Unit/TestConnectionRecovery.cs @@ -1785,8 +1785,8 @@ public TestBasicConsumer(IChannel channel, ushort totalMessageCount, ManualReset public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { diff --git a/projects/Unit/TestConsumerExceptions.cs b/projects/Unit/TestConsumerExceptions.cs index efd19bc237..099a23522f 100644 --- a/projects/Unit/TestConsumerExceptions.cs +++ b/projects/Unit/TestConsumerExceptions.cs @@ -49,8 +49,8 @@ public ConsumerFailingOnDelivery(IChannel channel) : base(channel) public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { diff --git a/projects/Unit/TestConsumerOperationDispatch.cs b/projects/Unit/TestConsumerOperationDispatch.cs index d8f75a1c76..49714f0182 100644 --- a/projects/Unit/TestConsumerOperationDispatch.cs +++ b/projects/Unit/TestConsumerOperationDispatch.cs @@ -87,7 +87,7 @@ public CollectingConsumer(IChannel channel) } public override void HandleBasicDeliver(string consumerTag, - ulong deliveryTag, bool redelivered, string exchange, string routingKey, + ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { // we test concurrent dispatch from the moment basic.delivery is returned. diff --git a/projects/Unit/TestMainLoop.cs b/projects/Unit/TestMainLoop.cs index a2fa183a6f..50194d34d6 100644 --- a/projects/Unit/TestMainLoop.cs +++ b/projects/Unit/TestMainLoop.cs @@ -53,8 +53,8 @@ public FaultyConsumer(IChannel channel) : base(channel) { } public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { From 7c034d0c91a66c6cc2e2a0819bd9adba402bc707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Zavark=C3=B3?= Date: Thu, 18 May 2023 15:54:21 +0200 Subject: [PATCH 02/11] Do not create consumerTag string in HandleBasicDeliver when conumer exists in the dictionary --- .../ConsumerDispatching/ConsumerDispatcher.cs | 5 +- .../client/framing/BasicDeliver.cs | 4 +- .../ConsumerDispatcherBase.cs | 52 +++++++++++++++---- .../ConsumerDispatcherChannelBase.cs | 5 +- .../IConsumerDispatcher.cs | 2 +- .../util/MemoryOfByteEqualityComparer.cs | 41 +++++++++++++++ 6 files changed, 93 insertions(+), 16 deletions(-) create mode 100644 projects/RabbitMQ.Client/util/MemoryOfByteEqualityComparer.cs diff --git a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs index 028396007e..d25d7ba106 100644 --- a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs @@ -15,6 +15,7 @@ public class ConsumerDispatcherBase private protected IConsumerDispatcher _dispatcher; private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent); protected readonly string _consumerTag = "ConsumerTag"; + protected static readonly byte[] _consumerTagBytes = Encoding.UTF8.GetBytes("ConsumerTag"); protected readonly ulong _deliveryTag = 500UL; protected static readonly byte[] _exchange = Encoding.UTF8.GetBytes("Exchange"); protected static readonly byte[] _routingKey = Encoding.UTF8.GetBytes("RoutingKey"); @@ -43,7 +44,7 @@ public void AsyncConsumerDispatcher() { for (int i = 0; i < Count; i++) { - _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); + _dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); } _autoResetEvent.Wait(); _autoResetEvent.Reset(); @@ -61,7 +62,7 @@ public void ConsumerDispatcher() { for (int i = 0; i < Count; i++) { - _dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); + _dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _method, _body); } _autoResetEvent.Wait(); _autoResetEvent.Reset(); diff --git a/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs b/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs index 67dfc38493..b0664d97fa 100644 --- a/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs +++ b/projects/RabbitMQ.Client/client/framing/BasicDeliver.cs @@ -38,7 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl { internal readonly struct BasicDeliver : IAmqpMethod { - public readonly string _consumerTag; + public readonly ReadOnlyMemory _consumerTag; public readonly ulong _deliveryTag; public readonly bool _redelivered; public readonly ReadOnlyMemory _exchange; @@ -46,7 +46,7 @@ namespace RabbitMQ.Client.Framing.Impl public BasicDeliver(ReadOnlyMemory data) { - int offset = WireFormatting.ReadShortstr(data.Span, out _consumerTag); + int offset = WireFormatting.ReadShortMemory(data, out _consumerTag); offset += WireFormatting.ReadLonglong(data.Span.Slice(offset), out _deliveryTag); offset += WireFormatting.ReadBits(data.Span.Slice(offset), out _redelivered); offset += WireFormatting.ReadShortMemory(data.Slice(offset), out _exchange); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs index c6b90f4287..87ba94ff5f 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs @@ -1,6 +1,10 @@ -using System.Collections.Generic; +using System; +using System.Buffers; +using System.Collections.Generic; using System.Runtime.CompilerServices; +using System.Text; using System.Threading.Tasks; +using RabbitMQ.Util; namespace RabbitMQ.Client.ConsumerDispatching { @@ -8,28 +12,47 @@ namespace RabbitMQ.Client.ConsumerDispatching internal abstract class ConsumerDispatcherBase { private static readonly FallbackConsumer fallbackConsumer = new FallbackConsumer(); - private readonly Dictionary _consumers; + private readonly Dictionary, (IBasicConsumer consumer, string consumerTag)> _consumers; public IBasicConsumer? DefaultConsumer { get; set; } protected ConsumerDispatcherBase() { - _consumers = new Dictionary(); + _consumers = new Dictionary, (IBasicConsumer, string)>(MemoryOfByteEqualityComparer.Instance); } protected void AddConsumer(IBasicConsumer consumer, string tag) { lock (_consumers) { - _consumers[tag] = consumer; + var tagBytes = Encoding.UTF8.GetBytes(tag); + _consumers[tagBytes] = (consumer, tag); } } - protected IBasicConsumer GetConsumerOrDefault(string tag) + protected (IBasicConsumer consumer, string consumerTag) GetConsumerOrDefault(ReadOnlyMemory tag) { lock (_consumers) { - return _consumers.TryGetValue(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer(); + if (_consumers.TryGetValue(tag, out var consumerPair)) + { + return consumerPair; + } + +#if !NETSTANDARD + var consumerTag = Encoding.UTF8.GetString(tag.Span); +#else + string consumerTag; + unsafe + { + fixed (byte* bytes = tag.Span) + { + consumerTag = Encoding.UTF8.GetString(bytes, tag.Length); + } + } +#endif + + return (GetDefaultOrFallbackConsumer(), consumerTag); } } @@ -37,7 +60,18 @@ public IBasicConsumer GetAndRemoveConsumer(string tag) { lock (_consumers) { - return _consumers.Remove(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer(); + var utf8 = Encoding.UTF8; + var pool = ArrayPool.Shared; + var buf = pool.Rent(utf8.GetMaxByteCount(tag.Length)); +#if NETSTANDARD + int count = utf8.GetBytes(tag, 0, tag.Length, buf, 0); +#else + int count = utf8.GetBytes(tag, buf); +#endif + var memory = buf.AsMemory(0, count); + var result = _consumers.Remove(memory, out var consumerPair) ? consumerPair.consumer : GetDefaultOrFallbackConsumer(); + pool.Return(buf); + return result; } } @@ -45,9 +79,9 @@ public Task ShutdownAsync(ShutdownEventArgs reason) { lock (_consumers) { - foreach (KeyValuePair pair in _consumers) + foreach (KeyValuePair, (IBasicConsumer consumer, string consumerTag)> pair in _consumers) { - ShutdownConsumer(pair.Value, reason); + ShutdownConsumer(pair.Value.consumer, reason); } _consumers.Clear(); } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 3d1a7174d4..9c0e5926ff 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -53,12 +53,13 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag) } } - public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, + public void HandleBasicDeliver(ReadOnlyMemory consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, ReadOnlyMemory routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory body, byte[] rentedMethodArray, byte[] rentedArray) { if (!IsShutdown) { - _writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray)); + var consumerPair = GetConsumerOrDefault(consumerTag); + _writer.TryWrite(new WorkStruct(consumerPair.consumer, consumerPair.consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedMethodArray, rentedArray)); } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs index dda93e7577..43c6b3e03c 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs @@ -45,7 +45,7 @@ internal interface IConsumerDispatcher void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag); - void HandleBasicDeliver(string consumerTag, + void HandleBasicDeliver(ReadOnlyMemory consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory exchange, diff --git a/projects/RabbitMQ.Client/util/MemoryOfByteEqualityComparer.cs b/projects/RabbitMQ.Client/util/MemoryOfByteEqualityComparer.cs new file mode 100644 index 0000000000..4504980c3f --- /dev/null +++ b/projects/RabbitMQ.Client/util/MemoryOfByteEqualityComparer.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; + +namespace RabbitMQ.Util; + +internal sealed class MemoryOfByteEqualityComparer : IEqualityComparer> +{ + public static MemoryOfByteEqualityComparer Instance { get; } = new MemoryOfByteEqualityComparer(); + + public bool Equals(ReadOnlyMemory left, ReadOnlyMemory right) + { + return left.Span.SequenceEqual(right.Span); + } + + public int GetHashCode(ReadOnlyMemory value) + { +#if NETSTANDARD + unchecked + { + int hashCode = 0; + var longPart = MemoryMarshal.Cast(value.Span); + foreach (long item in longPart) + { + hashCode = (hashCode * 397) ^ item.GetHashCode(); + } + + foreach (int item in value.Span.Slice(longPart.Length * 8)) + { + hashCode = (hashCode * 397) ^ item.GetHashCode(); + } + + return hashCode; + } +#else + HashCode result = default; + result.AddBytes(value.Span); + return result.ToHashCode(); +#endif + } +} From c1fb7bf51a366860788f0c797b532360fabec6db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Zavark=C3=B3?= Date: Thu, 18 May 2023 18:38:11 +0200 Subject: [PATCH 03/11] APIApproval fix --- projects/Unit/APIApproval.Approve.verified.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt index 9ab0a422c7..7c51486ef6 100644 --- a/projects/Unit/APIApproval.Approve.verified.txt +++ b/projects/Unit/APIApproval.Approve.verified.txt @@ -767,7 +767,7 @@ namespace RabbitMQ.Client.Events public event RabbitMQ.Client.Events.AsyncEventHandler Unregistered; public override System.Threading.Tasks.Task HandleBasicCancelOk(string consumerTag) { } public override System.Threading.Tasks.Task HandleBasicConsumeOk(string consumerTag) { } - public override System.Threading.Tasks.Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) { } + public override System.Threading.Tasks.Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, System.ReadOnlyMemory exchange, System.ReadOnlyMemory routingKey, in RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) { } public override System.Threading.Tasks.Task HandleChannelShutdown(object channel, RabbitMQ.Client.ShutdownEventArgs reason) { } } public abstract class BaseExceptionEventArgs : System.EventArgs From 23f381dcccfbc6c9afb6b72f9cc0e585825e4acc Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 10 May 2024 09:07:18 -0700 Subject: [PATCH 04/11] * Post-merge fixes. --- .../ConsumerDispatching/ConsumerDispatcher.cs | 8 ++++---- projects/RabbitMQ.Client/client/impl/ChannelBase.cs | 2 +- .../RabbitMQ.Client/client/impl/IncomingCommand.cs | 8 ++++++++ .../client/impl/RabbitMQActivitySource.cs | 13 ++++++++++--- projects/Test/Common/TestConnectionRecoveryBase.cs | 4 ++-- 5 files changed, 25 insertions(+), 10 deletions(-) diff --git a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs index cd75980cde..5d657be8d9 100644 --- a/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs @@ -55,8 +55,8 @@ public async Task AsyncConsumerDispatcher() { for (int i = 0; i < Count; i++) { - await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body, - CancellationToken.None); + await _dispatcher.HandleBasicDeliverAsync(_consumerTagBytes, _deliveryTag, + false, _exchange, _routingKey, _properties, body, CancellationToken.None); } _autoResetEvent.Wait(); _autoResetEvent.Reset(); @@ -78,8 +78,8 @@ public async Task ConsumerDispatcher() { for (int i = 0; i < Count; i++) { - await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body, - CancellationToken.None); + await _dispatcher.HandleBasicDeliverAsync(_consumerTagBytes, _deliveryTag, + false, _exchange, _routingKey, _properties, body, CancellationToken.None); } _autoResetEvent.Wait(); _autoResetEvent.Reset(); diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index fd6f4d4e71..e791bca41b 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -670,7 +670,7 @@ protected async Task HandleBasicDeliverAsync(IncomingCommand cmd, Cancella { try { - var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodSpan); + var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodMemory); var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); await ConsumerDispatcher.HandleBasicDeliverAsync( method._consumerTag, diff --git a/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs b/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs index d5fb43d016..3e16aa1823 100644 --- a/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs +++ b/projects/RabbitMQ.Client/client/impl/IncomingCommand.cs @@ -55,6 +55,14 @@ public IncomingCommand(ProtocolCommandId commandId, Body = body; } + public ReadOnlyMemory MethodMemory + { + get + { + return Method.Memory; + } + } + public ReadOnlySpan MethodSpan { get diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs index 5d76f10f37..0887b189b0 100644 --- a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs @@ -117,18 +117,25 @@ internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs) return null; } + // TODO + string routingKey = Encoding.UTF8.GetString(deliverEventArgs.RoutingKey.ToArray()); + string exchange = Encoding.UTF8.GetString(deliverEventArgs.Exchange.ToArray()); + // Extract the PropagationContext of the upstream parent from the message headers. DistributedContextPropagator.Current.ExtractTraceIdAndState(deliverEventArgs.BasicProperties.Headers, ExtractTraceIdAndState, out string traceparent, out string traceState); + ActivityContext.TryParse(traceparent, traceState, out ActivityContext parentContext); + Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity( - UseRoutingKeyAsOperationName ? $"{deliverEventArgs.RoutingKey} deliver" : "deliver", + UseRoutingKeyAsOperationName ? $"{routingKey} deliver" : "deliver", ActivityKind.Consumer, parentContext); + if (activity != null && activity.IsAllDataRequested) { PopulateMessagingTags("deliver", - deliverEventArgs.RoutingKey, - deliverEventArgs.Exchange, + routingKey, + exchange, deliverEventArgs.DeliveryTag, deliverEventArgs.BasicProperties, deliverEventArgs.Body.Length, diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index e8eb2884e7..a381c1c346 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -318,8 +318,8 @@ public TestBasicConsumer(IChannel channel, ushort totalMessageCount, TaskComplet public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, - string routingKey, + ReadOnlyMemory exchange, + ReadOnlyMemory routingKey, ReadOnlyBasicProperties properties, ReadOnlyMemory body) { From 67a85e751075dd2392a9721ce41a57cbf223ecc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Zavark=C3=B3?= Date: Fri, 10 May 2024 19:30:59 +0200 Subject: [PATCH 05/11] create string only when neede in RabbitMQActivitySource --- .../client/impl/RabbitMQActivitySource.cs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs index 0887b189b0..a5cbfec2d6 100644 --- a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs @@ -117,22 +117,25 @@ internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs) return null; } - // TODO - string routingKey = Encoding.UTF8.GetString(deliverEventArgs.RoutingKey.ToArray()); - string exchange = Encoding.UTF8.GetString(deliverEventArgs.Exchange.ToArray()); - // Extract the PropagationContext of the upstream parent from the message headers. DistributedContextPropagator.Current.ExtractTraceIdAndState(deliverEventArgs.BasicProperties.Headers, ExtractTraceIdAndState, out string traceparent, out string traceState); ActivityContext.TryParse(traceparent, traceState, out ActivityContext parentContext); + string routingKey = UseRoutingKeyAsOperationName ? Encoding.UTF8.GetString(deliverEventArgs.RoutingKey.Span) : null; Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity( UseRoutingKeyAsOperationName ? $"{routingKey} deliver" : "deliver", ActivityKind.Consumer, parentContext); if (activity != null && activity.IsAllDataRequested) { + string exchange = Encoding.UTF8.GetString(deliverEventArgs.Exchange.Span); + if (routingKey == null) + { + routingKey = Encoding.UTF8.GetString(deliverEventArgs.RoutingKey.Span); + } + PopulateMessagingTags("deliver", routingKey, exchange, From 31433b98625c8f572bbf9b563b7d5f25e7ed2b50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Zavark=C3=B3?= Date: Sat, 11 May 2024 18:08:00 +0200 Subject: [PATCH 06/11] netstandard 2.0 fix --- .../client/impl/RabbitMQActivitySource.cs | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs index a5cbfec2d6..155065ca65 100644 --- a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs @@ -1,8 +1,10 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Diagnostics; using System.Net; using System.Net.Sockets; using System.Reflection; +using System.Runtime.CompilerServices; using System.Text; using RabbitMQ.Client.Events; using RabbitMQ.Client.Impl; @@ -110,6 +112,23 @@ internal static Activity Receive(string routingKey, string exchange, ulong deliv return activity; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static string GetString(ReadOnlySpan span) + { +#if NETSTANDARD + unsafe + { + fixed (byte* bytesPtr = span) + { + return Encoding.UTF8.GetString(bytesPtr, span.Length); + } + } +#else + return Encoding.UTF8.GetString(span); +#endif + } + + internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs) { if (!s_subscriberSource.HasListeners()) @@ -123,17 +142,17 @@ internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs) ActivityContext.TryParse(traceparent, traceState, out ActivityContext parentContext); - string routingKey = UseRoutingKeyAsOperationName ? Encoding.UTF8.GetString(deliverEventArgs.RoutingKey.Span) : null; + string routingKey = UseRoutingKeyAsOperationName ? GetString(deliverEventArgs.RoutingKey.Span) : null; Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity( UseRoutingKeyAsOperationName ? $"{routingKey} deliver" : "deliver", ActivityKind.Consumer, parentContext); if (activity != null && activity.IsAllDataRequested) { - string exchange = Encoding.UTF8.GetString(deliverEventArgs.Exchange.Span); + string exchange = GetString(deliverEventArgs.Exchange.Span); if (routingKey == null) { - routingKey = Encoding.UTF8.GetString(deliverEventArgs.RoutingKey.Span); + routingKey = GetString(deliverEventArgs.RoutingKey.Span); } PopulateMessagingTags("deliver", From 3db0f40c4eb752a23c129061213fd601d93adc26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Zavark=C3=B3?= Date: Sun, 12 May 2024 11:07:37 +0200 Subject: [PATCH 07/11] fix for empty routing keys --- .../RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs index 155065ca65..99cd4c766f 100644 --- a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs @@ -116,6 +116,11 @@ internal static Activity Receive(string routingKey, string exchange, ulong deliv private static string GetString(ReadOnlySpan span) { #if NETSTANDARD + if (span.Length == 0) + { + return string.Empty; + } + unsafe { fixed (byte* bytesPtr = span) From 547c778f44801a0d9a3090ff40ddb5c18c133007 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 13 May 2024 08:42:18 -0700 Subject: [PATCH 08/11] Remove DEBUG from two exception messages --- projects/RabbitMQ.Client/client/impl/MainSession.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/MainSession.cs b/projects/RabbitMQ.Client/client/impl/MainSession.cs index 3aa3bef75c..c64b2ca60c 100644 --- a/projects/RabbitMQ.Client/client/impl/MainSession.cs +++ b/projects/RabbitMQ.Client/client/impl/MainSession.cs @@ -106,7 +106,7 @@ public void SetSessionClosing(bool closeIsServerInitiated) } else { - throw new InvalidOperationException("[DEBUG] couldn't enter semaphore"); + throw new InvalidOperationException("couldn't enter semaphore"); } } @@ -129,7 +129,7 @@ public async Task SetSessionClosingAsync(bool closeIsServerInitiated) } else { - throw new InvalidOperationException("[DEBUG] couldn't async enter semaphore"); + throw new InvalidOperationException("couldn't async enter semaphore"); } } From 4e1156bc33e594e49d70b57d4789eca344c2689d Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 13 May 2024 09:02:54 -0700 Subject: [PATCH 09/11] Catch all exceptions when disposing ToxiproxyManager --- projects/Test/Integration/ToxiproxyManager.cs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/projects/Test/Integration/ToxiproxyManager.cs b/projects/Test/Integration/ToxiproxyManager.cs index b1d581623e..cb58ca1749 100644 --- a/projects/Test/Integration/ToxiproxyManager.cs +++ b/projects/Test/Integration/ToxiproxyManager.cs @@ -116,8 +116,14 @@ protected virtual void Dispose(bool disposing) { if (disposing) { - _proxyClient.DeleteAsync(_proxy).GetAwaiter().GetResult(); - _proxyConnection.Dispose(); + try + { + _proxyClient.DeleteAsync(_proxy).GetAwaiter().GetResult(); + _proxyConnection.Dispose(); + } + catch + { + } } _disposedValue = true; From c502374ef3c4a31f351ef2a33eb50d831954441b Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 21 May 2024 16:13:03 -0700 Subject: [PATCH 10/11] Fix ShutdownInitiator in `CloseAsync` `AutorecoveringChannel` `CloseAsync` should just call `_innerChannel.CloseAsync` --- projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs | 3 +-- projects/Test/Applications/MassPublish/Program.cs | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 19d60e7479..f0c6e09628 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -204,8 +204,7 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort, CancellationToken cancellationToken) { ThrowIfDisposed(); - var args = new ShutdownEventArgs(ShutdownInitiator.Library, replyCode, replyText); - return CloseAsync(args, abort, cancellationToken); + return _innerChannel.CloseAsync(replyCode, replyText, abort, cancellationToken); } public async Task CloseAsync(ShutdownEventArgs args, bool abort, diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index a3938b69a8..57bcdd9b72 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -156,6 +156,7 @@ await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: Routi } await consumeChannel.CloseAsync(); + await consumeConnection.CloseAsync(); } private static void PublishChannel_BasicNacks(object sender, BasicNackEventArgs e) From f445f2d363734154d71f6a126ae4b36bbecba7dd Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 21 May 2024 16:40:49 -0700 Subject: [PATCH 11/11] fixup --- .../client/impl/AutorecoveringChannel.cs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index f0c6e09628..317099a078 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -200,11 +200,21 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph _innerChannel.RunRecoveryEventHandlers(this); } - public Task CloseAsync(ushort replyCode, string replyText, bool abort, + public async Task CloseAsync(ushort replyCode, string replyText, bool abort, CancellationToken cancellationToken) { ThrowIfDisposed(); - return _innerChannel.CloseAsync(replyCode, replyText, abort, cancellationToken); + try + { + await _innerChannel.CloseAsync(replyCode, replyText, abort, cancellationToken) + .ConfigureAwait(false); + } + finally + { + await _connection.DeleteRecordedChannelAsync(this, + channelsSemaphoreHeld: false, recordedEntitiesSemaphoreHeld: false) + .ConfigureAwait(false); + } } public async Task CloseAsync(ShutdownEventArgs args, bool abort,