NEST/Elasticsearch.Net version: 7.11.1
Elasticsearch version: 7.11.1
Description of the problem including expected versus actual behavior:
Diagnostic events coming through RequestPipelineDiagnosticObserver do not have their Audit.Ended values set.
Steps to reproduce:
1. Subscribe to the DiagnosticSources.RequestPipeline.SourceName event using RequestPipelineDiagnosticObserver with an Action for onNextEnd.
2. Create an ElasticLowLevelClient and make a request to Elastic, e.g. IndexAsync.
3. Inspect the incoming IApiCallDetails payloads and their AuditTrail objects.

Expected behavior:
Audit.Ended value should be set correctly.
I'm the maintainer of https://github.com/romansp/MiniProfiler.Elasticsearch. My little plugin pushes audit events from ElasticClient (or its low-level version) into MiniProfiler as custom timings.
I would like to provide one more way of integrating, via diagnostic listeners. It works fine, but incoming AuditTrail events don't have a correct Ended value, so I can't compute how long the request took.
I did some digging into the transport implementation here: https://github.com/elastic/elastic-transport-net/blob/85c235850a126a161560d4754941db377ca1c7e9/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs#L173. It seems that IApiCallDetails is pushed into the diagnostics observables _before_ the Auditable gets disposed, so its Ended isn't set yet.
Though events that are coming through ConnectionSettings.OnRequestCompleted are correct.
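To illustrate the ordering I think is happening, here is a stand-alone sketch. `DemoAudit`, `DemoAuditable` and `OrderingDemo` are hypothetical stand-ins I made up for this example, not the real transport types. The only assumption is that the end time gets assigned on disposal, so anything that reads it inside the `using` scope sees an unset value.

```c#
using System;

// Hypothetical stand-in for an audit entry; NOT the real transport type.
public sealed class DemoAudit {
    public DateTime Started { get; set; }
    public DateTime Ended { get; set; } // stays default(DateTime) until assigned
}

// Hypothetical wrapper that only records the end time when it is disposed.
public sealed class DemoAuditable : IDisposable {
    private readonly DemoAudit _audit;

    public DemoAuditable(DemoAudit audit) {
        _audit = audit;
        _audit.Started = DateTime.UtcNow;
    }

    public void Dispose() => _audit.Ended = DateTime.UtcNow;
}

public static class OrderingDemo {
    // Returns the Ended value an observer would see if it were
    // notified while the wrapper is still in scope.
    public static DateTime EndedSeenByObserver(DemoAudit audit) {
        DateTime seen;
        using (new DemoAuditable(audit)) {
            // The observer is notified here, before Dispose has run.
            seen = audit.Ended;
        }
        return seen;
    }
}
```

Reading `audit.Ended` after the `using` block gives a real timestamp, which would match what I see through OnRequestCompleted.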
Silly repro
```c#
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Elasticsearch.Net.Diagnostics;
using Xunit;

// Subscribes to the request pipeline diagnostic source and forwards "end" events to OnEnded.
internal class TestDiagnosticListener : IObserver<DiagnosticListener>, IDisposable {
    // Subscriptions created in TrySubscribe, released in Dispose.
    private ConcurrentBag<IDisposable> Disposables { get; } = new ConcurrentBag<IDisposable>();

    public Action<IApiCallDetails> OnEnded { get; }

    public TestDiagnosticListener(Action<IApiCallDetails> onEnded) {
        OnEnded = onEnded;
    }

    public void OnError(Exception error) { }

    public void OnCompleted() { }

    public void OnNext(DiagnosticListener value) {
        TrySubscribe(DiagnosticSources.RequestPipeline.SourceName,
            () => new RequestPipelineDiagnosticObserver(null, v => OnEnded(v.Value)), value);
    }

    private void TrySubscribe(string sourceName, Func<IObserver<KeyValuePair<string, object>>> listener, DiagnosticListener value) {
        // Only attach to the diagnostic source we are interested in.
        if (value.Name != sourceName) return;
        var d = value.Subscribe(listener());
        Disposables.Add(d);
    }

    public void Dispose() {
        foreach (var d in Disposables) {
            d.Dispose();
        }
    }
}

public class DiagnosticListenerTests {
    [Fact]
    public async Task DiagnosticListener_AuditTrailIsValid() {
        // Arrange
        using var listener = new TestDiagnosticListener((data) => {
            var auditTrailEvent = data.AuditTrail[0];
            // Fails while Ended is still unset at the time the event is published.
            Assert.True((auditTrailEvent.Ended - auditTrailEvent.Started) >= TimeSpan.Zero);
        });
        using var foo = DiagnosticListener.AllListeners.Subscribe(listener);
        var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
        var settings = new ConnectionConfiguration(connectionPool, new InMemoryConnection());
        var client = new ElasticLowLevelClient(settings);
        var person = new { Id = "1" };

        // Act: the assertion runs inside the listener callback.
        await client.IndexAsync<BytesResponse>("test-index", PostData.Serializable(person));
    }
}
```
Apologies for the delay @romansp. I'll try to carve out some time to look into this during the week. Thanks for reporting it.
@romansp I have a PR in, which will resolve this. It makes sense that our Auditable wrapper expose a way to stop the Audit and set the end time more explicitly, rather than just on disposal.
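For readers following along, the general shape of an explicit stop could look something like the sketch below. This is not the code from the PR: `TimedAudit`, `StoppableAuditable` and `StopDemo` are hypothetical names used only for illustration, and the real change may be structured differently.

```c#
using System;

// Hypothetical audit entry; NOT the real transport type.
public sealed class TimedAudit {
    public DateTime Started { get; set; }
    public DateTime Ended { get; set; }
}

// Hypothetical wrapper that can be stopped explicitly, with disposal as a fallback.
public sealed class StoppableAuditable : IDisposable {
    private readonly TimedAudit _audit;
    private bool _stopped;

    public StoppableAuditable(TimedAudit audit) {
        _audit = audit;
        _audit.Started = DateTime.UtcNow;
    }

    // Records the end time once; later calls leave it unchanged.
    public void Stop() {
        if (_stopped) return;
        _audit.Ended = DateTime.UtcNow;
        _stopped = true;
    }

    public void Dispose() => Stop();
}

public static class StopDemo {
    // Returns the duration an observer would compute when it is
    // notified after Stop() but before the wrapper is disposed.
    public static TimeSpan DurationSeenByObserver(TimedAudit audit) {
        using (var auditable = new StoppableAuditable(audit)) {
            auditable.Stop();
            // The observer is notified here and already sees Ended.
            return audit.Ended - audit.Started;
        }
    }
}
```

Keeping `Dispose` as a fallback means call sites that never call `Stop()` behave as before, while paths that publish diagnostics can stop the audit first.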
PR is merged for 7.12.0 release.