Elasticsearch-net: `Audit.Ended` value is not set for events coming through `RequestPipelineDiagnosticObserver`

Created on 1 Mar 2021 · 3 Comments · Source: elastic/elasticsearch-net

NEST/Elasticsearch.Net version: 7.11.1

Elasticsearch version: 7.11.1

Description of the problem including expected versus actual behavior:
Diagnostic events delivered through RequestPipelineDiagnosticObserver do not have their Audit.Ended values set.

Steps to reproduce:

  1. Provide a diagnostic listener and subscribe to the DiagnosticSources.RequestPipeline.SourceName source using RequestPipelineDiagnosticObserver with an Action for onNextEnd.
  2. Create a new instance of ElasticLowLevelClient and make a request to Elasticsearch, e.g. IndexAsync.
  3. Check the incoming IApiCallDetails payloads and their AuditTrail objects.

Expected behavior
Audit.Ended value should be set correctly.


I'm the maintainer of https://github.com/romansp/MiniProfiler.Elasticsearch. My little plugin pushes audit events from ElasticClient (or its low-level version) into MiniProfiler as custom timings.

I would like to provide one more way of integrating, via diagnostic listeners. It works fine, but incoming AuditTrail events don't have a correct Ended value, so I can't compute how long the request took.

I did some digging into the transport implementation here: https://github.com/elastic/elastic-transport-net/blob/85c235850a126a161560d4754941db377ca1c7e9/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs#L173. It seems that IApiCallDetails is pushed into the diagnostics observables _before_ the Auditable gets disposed, so its Ended isn't set yet.

However, events coming through ConnectionSettings.OnRequestCompleted are correct.
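
The ordering problem described above can be illustrated with a small, self-contained sketch. `FakeAudit`, `FakeAuditable` and `OrderingDemo` are hypothetical stand-ins, not the real transport types; the only behaviour carried over from the report is the assumption that the end time is written on disposal, while the observer is notified before that.

```c#
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an audit entry; not the real Elasticsearch.Net type.
public sealed class FakeAudit
{
    public DateTime Started { get; set; }
    public DateTime Ended { get; set; } // stays default(DateTime) until set
}

// Hypothetical wrapper that sets Ended only when it is disposed.
public sealed class FakeAuditable : IDisposable
{
    public FakeAudit Audit { get; }

    public FakeAuditable(List<FakeAudit> trail)
    {
        Audit = new FakeAudit { Started = DateTime.UtcNow };
        trail.Add(Audit);
    }

    public void Dispose() => Audit.Ended = DateTime.UtcNow;
}

public static class OrderingDemo
{
    // Returns the Ended value seen by a callback fired inside the using block,
    // and the Ended value seen once the wrapper has been disposed.
    public static (DateTime seenByObserver, DateTime seenAfterDispose) Run()
    {
        var trail = new List<FakeAudit>();
        DateTime seenByObserver;

        using (new FakeAuditable(trail))
        {
            // The "observer" is notified here, before Dispose has run.
            seenByObserver = trail[0].Ended;
        }

        return (seenByObserver, trail[0].Ended);
    }
}
```

In this sketch the value captured inside the `using` block is still `default(DateTime)`, whereas the value read after the block is populated, which matches the difference between the diagnostic observer and OnRequestCompleted reported here.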


Silly repro

```c#
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Elasticsearch.Net.Diagnostics;
using Xunit;

internal class TestDiagnosticListener : IObserver<DiagnosticListener>, IDisposable {
    // Subscriptions created in TrySubscribe, released in Dispose.
    private ConcurrentBag<IDisposable> Disposables { get; } = new ConcurrentBag<IDisposable>();

    public Action<IApiCallDetails> OnEnded { get; }

    public TestDiagnosticListener(Action<IApiCallDetails> onEnded) {
        OnEnded = onEnded;
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }

    // Invoked for each DiagnosticListener; only the request pipeline source is observed.
    public void OnNext(DiagnosticListener value) {
        TrySubscribe(DiagnosticSources.RequestPipeline.SourceName,
            () => new RequestPipelineDiagnosticObserver(null, v => OnEnded(v.Value)), value);
    }

    private void TrySubscribe(string sourceName, Func<IObserver<KeyValuePair<string, object>>> listener, DiagnosticListener value) {
        if (value.Name != sourceName) return;
        var d = value.Subscribe(listener());

        Disposables.Add(d);
    }

    public void Dispose() {
        foreach (var d in Disposables) {
            d.Dispose();
        }
    }
}

public class DiagnosticListenerTests {
    [Fact]
    public async Task DiagnosticListener_AuditTrailIsValid() {
        // Arrange: the callback fails when Ended is unset, because Ended - Started is then negative.
        using var listener = new TestDiagnosticListener((data) => {
            var auditTrailEvent = data.AuditTrail[0];
            Assert.True((auditTrailEvent.Ended - auditTrailEvent.Started) >= TimeSpan.Zero);
        });
        using var foo = DiagnosticListener.AllListeners.Subscribe(listener);
        var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
        var settings = new ConnectionConfiguration(connectionPool, new InMemoryConnection());

        var client = new ElasticLowLevelClient(settings);
        var person = new { Id = "1" };

        // Act: InMemoryConnection means no real cluster is needed.
        await client.IndexAsync<BytesResponse>("test-index", PostData.Serializable(person));
    }
}
```

bug

All 3 comments

Apologies for the delay @romansp. I'll try to carve out some time to look into this during the week. Thanks for reporting it.

@romansp I have a PR in, which will resolve this. It makes sense for our Auditable wrapper to expose a way to stop the audit and set the end time explicitly, rather than only on disposal.
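
As a rough illustration of that idea (not the code from the PR), the sketch below gives a wrapper an explicit stop that records the end time before any observer is notified. `SketchAudit`, `SketchAuditable`, `StopDemo` and the method name `Stop` are all hypothetical; the real change may be shaped differently.

```c#
using System;

// Hypothetical audit entry used only for this sketch.
public sealed class SketchAudit
{
    public DateTime Started { get; set; }
    public DateTime Ended { get; set; }
}

// Hypothetical wrapper with an explicit stop in addition to disposal.
public sealed class SketchAuditable : IDisposable
{
    private bool _stopped;

    public SketchAudit Audit { get; } = new SketchAudit { Started = DateTime.UtcNow };

    // Records the end time once; later calls (including Dispose) are no-ops.
    public void Stop()
    {
        if (_stopped) return;
        _stopped = true;
        Audit.Ended = DateTime.UtcNow;
    }

    public void Dispose() => Stop();
}

public static class StopDemo
{
    // Stops the audit, then notifies the observer, and returns the elapsed time.
    public static TimeSpan Run(Action<SketchAudit> onEnd)
    {
        using (var auditable = new SketchAuditable())
        {
            auditable.Stop();        // end time is set first...
            onEnd(auditable.Audit);  // ...so the observer sees a populated Ended
            return auditable.Audit.Ended - auditable.Audit.Started;
        }
    }
}
```

Making `Stop` idempotent keeps disposal safe as a fallback, so code paths that never call it explicitly still get an end time.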

The PR is merged for the 7.12.0 release.
