Telegraf: Telegraf MQTT input exits if Broker is not available on startup

Created on 24 Aug 2017 · 8 Comments · Source: influxdata/telegraf

Bug report

Relevant telegraf.conf:

[[inputs.mqtt_consumer]]
  servers = ["192.168.178.46:1883"]
  ## MQTT QoS, must be 0, 1, or 2
  qos = 0

  ## Topics to subscribe to
  topics = [
    "/home/+/metrics"
  ]
  persistent_session = false
  # If empty, a random client ID will be generated.
  client_id = ""
  data_format = "influx"

[[outputs.influxdb]]
  ## The full HTTP or UDP endpoint URL for your InfluxDB instance.
  ## Multiple urls can be specified as part of the same cluster,
  ## this means that only ONE of the urls will be written to each interval.
  # urls = ["udp://localhost:8089"] # UDP endpoint example
  urls = ["http://influxdb:8086"] # required
  ## The target database for metrics (telegraf will create it if not exists).
  database = "home" # required

  ## Retention policy to write to. Empty string writes to the default rp.
  retention_policy = ""
  ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
  write_consistency = "any"

  ## Write timeout (for the InfluxDB client), formatted as a string.
  ## If not provided, will default to 5s. 0s means no timeout (not recommended).
  timeout = "30s"

System info:

latest docker container pulled from docker hub.

Steps to reproduce:

docker run -v "$(pwd)/telegraf.conf:/etc/telegraf/telegraf.conf:ro" telegraf:latest

Expected behavior:

telegraf tries to reconnect until it reaches a configurable timeout.

Actual behavior:

Telegraf exits after a couple of seconds. This is not enough time for an MQTT broker to start up in a docker-compose scenario. The timeout cannot be extended through the configuration options defined in mqtt_consumer.go.
I believe only one connection attempt is made. I'd like to launch my whole stack, consisting of InfluxDB, Telegraf, the MQTT broker, and Grafana, from a single docker-compose file. As of now this is unfortunately not possible.

Additional info:

% docker run -it -v "$(pwd)/telegraf.conf:/etc/telegraf/telegraf.conf:ro" telegraf:latest                                                          
2017/08/24 12:26:31 I! Using config file: /etc/telegraf/telegraf.conf
2017-08-24T12:26:37Z I! Database creation failed: Post http://influxdb:8086/query?q=CREATE+DATABASE+%22home%22: dial tcp: lookup influxdb on 192.168.3.9:53: server misbehaving
2017-08-24T12:26:37Z I! Starting Telegraf (version 1.3.5)
2017-08-24T12:26:37Z I! Loaded outputs: influxdb
2017-08-24T12:26:37Z I! Loaded inputs: inputs.mqtt_consumer
2017-08-24T12:26:37Z I! Tags enabled: host=639a81d8ad20
2017-08-24T12:26:37Z I! Agent Config: Interval:10s, Quiet:false, Hostname:"639a81d8ad20", Flush Interval:10s 
2017-08-24T12:27:07Z E! Service for input inputs.mqtt_consumer failed to start, exiting
Network Error : dial tcp 192.168.178.46:1883: i/o timeout

bug

All 8 comments

Hi @asciijungle,

To connect, the paho.mqtt.golang package uses the net.DialTimeout method.

It looks like the timeout defaults to 30s (ConnectTimeout: (time.Duration) 30s), and this behavior is visible in the additional info that you provided:

2017-08-24T12:26:37Z I! Agent Config: Interval:10s, Quiet:false, Hostname:"639a81d8ad20", Flush Interval:10s 

30s later:

2017-08-24T12:27:07Z E! Service for input inputs.mqtt_consumer failed to start, exiting
Network Error : dial tcp 192.168.178.46:1883: i/o timeout

IMHO the best thing we can do is to expose this variable as a configurable option in the plugin's configuration section.

@danielnelson please let me know what you think and I will prepare an MR.

Thanks

I agree we probably should expose a connection_timeout option.

Having a long timeout may not always be sufficient here though, since other errors are possible on the socket such as connection refused if it is not listening yet.

So I also think we should remove the requirement that the initial connection be available when Telegraf starts. I think this is in line with expectations, since we don't have the same requirement for non-service inputs.

Hi @danielnelson,

could you please give me more details about how to remove the requirement for the initial connection? As I understand it (correct me if I'm wrong), we should provide some kind of "retry connection loop" with a predefined counter and a backoff mechanism, to avoid situations where Telegraf quits because the endpoint is not ready.

Do you have something already implemented in some plugin?

Thanks!

I think in this case retry is provided by the client library, though I don't know the details of how it works. I am basing this on this bit of code:

https://github.com/influxdata/telegraf/blob/052e88ad5ea5fa5872fba8998f1bf819ce418acb/plugins/inputs/mqtt_consumer/mqtt_consumer.go#L154

So hopefully we only need to remove the part where we wait for the Connect function to complete, and the client code will do the backoff and retry loop for us, though this will need to be verified.

There is only one function (SetConnectionLostHandler), and it handles the situation where the connection to the endpoint is lost after it was established. I looked into the client code and there is no retry logic in it. Because Start() is blocking, I can't easily implement an infinite "connection wait" loop inside it without redesigning the whole plugin to establish the connection in a separate goroutine.
If you have some better idea, please let me know.

So the library handles reconnects if the initial Connect succeeds, but not otherwise?

Perhaps if we introduce a connected boolean to the plugin we can attempt to Connect if we haven't connected yet in the Gather function?

As far as I know it is like this.

I'm working on it in https://github.com/influxdata/telegraf/pull/3202 to accommodate what you have mentioned. Could you please take a look?

Thanks @DanKans, I'm going to add this to the 1.4.1 release
