Python: Keep the watch action working all the time

Created on 13 Feb 2017 · 15 Comments · Source: kubernetes-client/python

Edited by @mbohlool:
This question led to an action item: add a retry mechanism to the watch class. It should be controlled by a flag and will keep the watch open all the time.

Original post:

Below is how I use client-python in list.py:

from kubernetes import client, config, watch

config.load_kube_config()
v1 = client.CoreV1Api()
w = watch.Watch()
# Print the type and name of every PVC event across all namespaces
for event in w.stream(v1.list_persistent_volume_claim_for_all_namespaces):
    print("Event: %s %s" % (event['type'], event['object'].metadata.name))

When I run the script with "python list.py", it shows events normally, but it exits on its own after several minutes.

Does anybody know how I can keep this watch running all the time?

Labels: help wanted, help-needed, lifecycle/rotten


All 15 comments

Do you get any error? Did you try setting the timeout_seconds parameter?

I see this happening with one particular cluster, but it works fine with another. So this might be a timeout in something in between, such as a load balancer. But it would be nice if the watch could simply retry when the connection is lost.

@mbohlool I don't set the timeout_seconds parameter; I use the default value. But how can I keep the watch working all the time, as @webwurst said?

+1.
I have several threads using the watcher to monitor different types of K8S resources, and I see their connections close every 30-35 minutes [0]. My application then resumes the watch, but when that happens all the items of a given type are returned again, which is undesirable. I'm not sure what is causing this 30-minute timeout; the API server runs in a pod on the same host as my watchers.

Is there a way to store a watch "version number" every time I receive an event that can be used whenever I resume the stream, so that I only get events subsequent to that point?

[0] I can tell by looking at the "closed" attribute of the list response.

Look at the resource_version parameter. The returned object's metadata should also have a resource_version. Pass the resource_version of the last received object to the new call.

Thanks for the answer.
I'm not sure how to use resource_version without doing a full list for that object type first. That makes sense when the streaming process has just started, but I want to avoid a full list call every time the connection breaks. When I get an event from the watch.stream function, I get something like this:

{'raw_object': {u'status': {u'phase': u'Active'}, u'kind': u'Namespace', u'spec': {u'finalizers': [u'kubernetes']}, u'apiVersion': u'v1', u'metadata': {u'name': u'demo', u'resourceVersion': u'921', u'creationTimestamp': u'2017-04-28T22:23:54Z', u'selfLink': u'/api/v1/namespacesdemo', u'uid': u'5d5cd386-2c61-11e7-9d39-84b261c2790e'}}, u'object': {u'status': {u'phase': u'Active'}, u'kind': u'Namespace', u'spec': {u'finalizers': [u'kubernetes']}, u'apiVersion': u'v1', u'metadata': {u'name': u'demo', u'resourceVersion': u'921', u'creationTimestamp': u'2017-04-28T22:23:54Z', u'selfLink': u'/api/v1/namespacesdemo', u'uid': u'5d5cd386-2c61-11e7-9d39-84b261c2790e'}}, u'type': u'ADDED'}

The only resourceVersion I see here is the one concerning the specific k8s object. Should I set that value as the resource_version argument?

Every time you get an event, store event['object'].metadata.resource_version in a variable (let's say last_seen_version), and when you want to reconnect, pass it like this: w.stream(v1.list_namespace, resource_version=last_seen_version)
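A minimal sketch of that advice, written against a generic `stream` callable so the loop can be tried without a cluster. It assumes `stream` behaves like `watch.Watch().stream`: calling `stream(list_func, **kwargs)` returns an iterator of event dicts. `resource_version_of`, `watch_forever` and `max_reconnects` are hypothetical names, not part of the client. The sketch does not handle a stored version that has expired (the API server then reports 410 Gone), which would need a fresh list.

```python
def resource_version_of(event):
    """Return the resourceVersion of the object carried by a watch event."""
    raw = event.get("raw_object")
    if isinstance(raw, dict):
        return raw["metadata"]["resourceVersion"]
    # Typed events: same value as event['object'].metadata.resource_version
    return event["object"].metadata.resource_version


def watch_forever(stream, list_func, resource_version=None,
                  max_reconnects=None, **kwargs):
    """Re-open the watch every time it ends, resuming from the last version seen.

    max_reconnects (hypothetical) bounds the loop; None means reconnect forever.
    """
    last_seen_version = resource_version
    reconnects = 0
    while True:
        if last_seen_version is not None:
            # Resume after the newest event seen instead of replaying everything
            kwargs["resource_version"] = last_seen_version
        for event in stream(list_func, **kwargs):
            last_seen_version = resource_version_of(event)
            yield event
        # The stream ended (e.g. a server-side timeout): decide whether to reconnect
        if max_reconnects is not None and reconnects >= max_reconnects:
            return
        reconnects += 1
```

With the real client this would be called as `watch_forever(watch.Watch().stream, v1.list_namespace)`. If the stream can end immediately and repeatedly, adding a short sleep before each reconnect avoids a busy loop.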

I believe the root cause of this issue is the default timeout setting in kube-apiserver:

https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/get.go#L247

        serveWatch(watcher, scope, req, w, timeout)

where the timeout is computed as:

https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/get.go#L231-L237

        timeout := time.Duration(0)
        if opts.TimeoutSeconds != nil {
            timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
        }
        if timeout == 0 && minRequestTimeout > 0 {
            timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
        }
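To make the arithmetic concrete, here is a Python transcription of the Go snippet above (a sketch, not kube-apiserver code). It assumes the kube-apiserver default `--min-request-timeout` of 1800 seconds; with that default and no `timeout_seconds`, a watch is closed after a random duration between 30 and 60 minutes, which would fit the 30-35 minute disconnects reported earlier in this thread. `watch_timeout_seconds` and `rng` are hypothetical names.

```python
import random


def watch_timeout_seconds(timeout_seconds=None, min_request_timeout=1800,
                          rng=random.random):
    """Mirror the Go logic: choose the server-side timeout for one watch call."""
    timeout = 0.0
    if timeout_seconds is not None:
        # The client asked for an explicit timeout via timeout_seconds
        timeout = float(timeout_seconds)
    if timeout == 0 and min_request_timeout > 0:
        # rand.Float64() is in [0, 1), so this lands in [min, 2 * min)
        timeout = min_request_timeout * (rng() + 1.0)
    return timeout
```

The randomization spreads reconnects out so that many watchers started together do not all reconnect at the same moment.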

@lichen2013 Nice catch, but regardless of this I think we should get your reconnect PR in.

@caesarxuchao, you looked at the timeout issue before. Does this mean the API server times out watch calls and our shared informers reconnect?

cc @roycaihw

Any update on this? I have this code to watch events:

import json
import os
from kubernetes import client, config, watch


if 'KUBERNETES_PORT' in os.environ:
    config.load_incluster_config()
else:
    config.load_kube_config()


v1 = client.CoreV1Api()
w = watch.Watch()

for event in w.stream(v1.list_event_for_all_namespaces, _request_timeout=60):
    print(json.dumps(event['raw_object']))

It runs, but if there are no events for an extended period, it dies with this:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 331, in _error_catcher
    yield
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 637, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 569, in _update_chunk_length
    line = self._fp.fp.readline()
  File "/usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/ssl.py", line 1052, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/ssl.py", line 911, in read
    return self._sslobj.read(len, buffer)
socket.timeout: The read operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "kubernetes/charts/common/event-logger/files/event-watcher.py", line 18, in <module>
    for event in w.stream(v1.list_event_for_all_namespaces, _request_timeout=60):
  File "/usr/local/lib/python3.7/site-packages/kubernetes/watch/watch.py", line 130, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.7/site-packages/kubernetes/watch/watch.py", line 45, in iter_resp_lines
    for seg in resp.read_chunked(decode_content=False):
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 665, in read_chunked
    self._original_response.close()
  File "/usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 336, in _error_catcher
    raise ReadTimeoutError(self._pool, None, 'Read timed out.')
urllib3.exceptions.ReadTimeoutError: HTTPSConnectionPool(host='xxxxxxxxxxxx.yl4.eu-west-1.eks.amazonaws.com', port=443): Read timed out.

Am I missing something?

Ah, I think my problem may be the _request_timeout=60 part. If I remove it, the watch runs indefinitely 😅
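If a client-side timeout is still wanted (for example, to notice a dead connection), one option is to re-open the stream when it fires. Per the traceback above, `_request_timeout` acts on the client's socket reads, so a quiet period longer than 60 seconds surfaces as `urllib3.exceptions.ReadTimeoutError`, whereas `timeout_seconds` asks the API server to end the watch. The sketch below is a hypothetical wrapper (`stream_with_retries` and `max_retries` are illustrative names, not client API) that retries on whichever exception types the caller lists.

```python
def stream_with_retries(open_stream, reconnect_on, max_retries=None):
    """Yield events from open_stream(), re-opening it after the listed errors.

    open_stream: zero-argument callable returning a fresh event iterator.
    reconnect_on: tuple of exception types that trigger a reconnect.
    max_retries: total reconnects allowed; None means unlimited.
    """
    retries = 0
    while True:
        try:
            for event in open_stream():
                yield event
            return  # the stream ended normally; leave that to the caller
        except reconnect_on:
            retries += 1
            if max_retries is not None and retries >= max_retries:
                raise  # give up and surface the last error
```

With the client, `open_stream` could be `lambda: w.stream(v1.list_event_for_all_namespaces, _request_timeout=60)` and `reconnect_on` could be `(ReadTimeoutError,)` from `urllib3.exceptions`. Each retry starts a fresh watch, so without a resource_version it may replay objects that were already seen.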

Issues go stale after 90d of inactivity.
Mark the issue as fresh with /remove-lifecycle stale.
Stale issues rot after an additional 30d of inactivity and eventually close.

If this issue is safe to close now please do so with /close.

Send feedback to sig-testing, kubernetes/test-infra and/or fejta.
/lifecycle stale

Stale issues rot after 30d of inactivity.
Mark the issue as fresh with /remove-lifecycle rotten.
Rotten issues close after an additional 30d of inactivity.

If this issue is safe to close now please do so with /close.

Send feedback to sig-testing, kubernetes/test-infra and/or fejta.
/lifecycle rotten

Rotten issues close after 30d of inactivity.
Reopen the issue with /reopen.
Mark the issue as fresh with /remove-lifecycle rotten.

Send feedback to sig-testing, kubernetes/test-infra and/or fejta.
/close

@fejta-bot: Closing this issue.
