kubectl cp is able to
Behind the scenes, kubectl cp seems to use "exec" with a "POST" request:
POST https://www.mymaster.com/api/v1/namespaces/default/pods/pod-XXXXXXXX-l2b4x/exec?command=tar&command=cf&command=-&command=%2Ftmp%filename.jar&container=containerId&container=containerId&stderr=true&stdout=true
I was trying to use exec() to do it, but exec() always seems to send a `GET` request.
Please help.
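For anyone comparing requests: the query string in the call above is just the exec arguments repeated as `command` parameters. Here is a minimal sketch of how such a URL can be assembled with the Python standard library. The host, pod and container names are the placeholders from the request above, and `build_exec_url` is a hypothetical helper, not part of any Kubernetes client.

```python
from urllib.parse import quote, urlencode


def build_exec_url(host, namespace, pod, container, command):
    """Build a pod exec URL with one `command` parameter per argument."""
    # A list of pairs keeps the repeated "command" keys in order.
    params = [("command", arg) for arg in command]
    params += [("container", container), ("stderr", "true"), ("stdout", "true")]
    path = f"/api/v1/namespaces/{quote(namespace)}/pods/{quote(pod)}/exec"
    return f"https://{host}{path}?{urlencode(params)}"


url = build_exec_url(
    "www.mymaster.com", "default", "pod-XXXXXXXX-l2b4x", "containerId",
    ["tar", "cf", "-", "/tmp/filename.jar"],
)
print(url)
```

Each slash in the file path is percent-encoded as `%2F`, and the order of the `command` parameters is the order of the arguments passed to the process in the container.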
I see the same thing. Can anyone help?
Same here.
ExecWatch watch = client.pods()
    .inNamespace(namespace)
    .withName(podName)
    .inContainer(containerName)
    .redirectingInput()
    .redirectingError()
    .exec("tar", "xf", "-", "-C", "/");
TarArchiveEntry entry = new TarArchiveEntry(desFilePath);
try (TarArchiveOutputStream tarOut = new TarArchiveOutputStream(watch.getInput());
     ByteArrayOutputStream byteOut = new ByteArrayOutputStream()) {
    byte[] bytes = new byte[4096];
    int count;
    while ((count = inputStream.read(bytes)) > 0) {
        byteOut.write(bytes, 0, count);
    }
    entry.setSize(byteOut.size());
    tarOut.putArchiveEntry(entry);
    byteOut.writeTo(tarOut);
    tarOut.closeArchiveEntry();
}
Verified, @Mrwangjinxin's code works.
Note: it seems we need to call tarOut.close() manually to tell okhttp3 that we are done sending data; otherwise it gets stuck most of the time.
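A possible reason why closing matters: a tar archive is only complete once its end-of-archive marker (zero-filled blocks) has been written, and that happens on close. The sketch below shows this with Python's `tarfile` module rather than the Java classes used above, so treat it as an illustration of the tar format, not of the fabric8 client. `tar_single_file` is a hypothetical helper.

```python
import io
import tarfile


def tar_single_file(dest_path, data):
    """Return the archive bytes before and after close() for a one-entry tar."""
    sink = io.BytesIO()
    tar = tarfile.open(fileobj=sink, mode="w")
    info = tarfile.TarInfo(name=dest_path)
    info.size = len(data)  # the entry header carries the size, so set it first
    tar.addfile(info, io.BytesIO(data))
    before_close = sink.getvalue()
    tar.close()  # appends the zero-filled end-of-archive blocks
    return before_close, sink.getvalue()


before, after = tar_single_file("tmp/test.file", b"hello")
print(len(before), len(after))
```

The archive grows on close(), which matches the observation that the receiving side seems to keep waiting until the sender finishes the stream. It also shows why the entry size has to be known before any data is written.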
Can someone else confirm that this snippet of code works? It doesn't work in my case, unfortunately.
@ashofthephoenix : Could you please share what error you are facing?
@rohanKanojia I don't get any errors in return. The program keeps running, but nothing is written to the pod.
@zoujinhe : Which OpenShift/Kubernetes version did you use to test this? Is it still working on the latest versions?
@pk044 : Were you able to resolve this issue? Did you try calling tarOut.close() at the end? I feel we should add this to the DSL as well.
@rohanKanojia hey, I came up with this a few months ago:
try (KubernetesClient client = new DefaultKubernetesClient()) {
    final ExecWatch watch = client
        .pods()
        .inNamespace("default")
        .withName("laughing-pug-jenkins-856f744489-tzjkl")
        .redirectingInput()
        .redirectingError()
        .exec("tar", "xf", "-", "-C", "/");
    // Reflection via jOOR: replace the internal pipe buffer with an 8 MiB one
    on(watch.getInput()).field("sink").set("buffer", new byte[8192 * 1024]);
    try (final FileInputStream stream = new FileInputStream("/home/x/Development/kubernetes-client/backup/test.file")) {
        final TarArchiveEntry entry = new TarArchiveEntry("/test.file");
        try (TarArchiveOutputStream tarOut = new TarArchiveOutputStream(watch.getInput());
             ByteArrayOutputStream byteOut = new ByteArrayOutputStream()) {
            // The whole file is buffered first so the entry size is known
            IOUtils.copyLarge(stream, byteOut);
            entry.setSize(byteOut.size());
            tarOut.putArchiveEntry(entry);
            byteOut.writeTo(tarOut);
            tarOut.closeArchiveEntry();
        }
    }
}
(jOOR is needed to run this.)
There might be a bug (or a few) in this snippet: when I checked the md5sum of the copied file, it was different than expected. It might also randomly throw exceptions about the pipe getting closed, 'pipe write end dead', etc.
A similar implementation was written. I haven't checked the following solution, but it should be worth analyzing:
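For anyone who wants to repeat the checksum comparison, here is a small sketch that computes the MD5 of a stream in chunks, so the result can be compared with the output of `md5sum` run inside the pod. `md5_of_stream` is a hypothetical helper; in practice the stream would be the local file opened in binary mode.

```python
import hashlib
import io


def md5_of_stream(stream, chunk_size=8192):
    """Hash a binary stream chunk by chunk and return the hex digest."""
    digest = hashlib.md5()
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


# In-memory stand-in for the local file being uploaded
local_digest = md5_of_stream(io.BytesIO(b"hello"))
print(local_digest)
```

A mismatch between this digest and the one from the pod would point to bytes being lost or altered in transit.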
https://github.com/fabric8io/kubernetes-client/issues/1008#issuecomment-448106408
By the way, thanks for looking into this issue!
I followed the implementations above but found both to be extremely slow, and the streams would often break. I originally thought it was the cluster I was working on, but kubectl cp worked fine, and I then pinpointed it to the use of ByteArrayOutputStream.
I then did some digging on how kubectl cp was implemented and it turns out it uses the same internal functionality as kubectl exec with the use of tar. https://github.com/kubernetes/kubernetes/blob/5b496fe8f5933de9b53e35780a486c8f7736d02b/pkg/kubectl/cmd/cp/cp.go#L226
I then replicated kubectl cp with kubectl exec in the following statement:
tar -czvf <folder>.tar.gz <folder> && cat <folder>.tar.gz | kubectl exec -i <podname> -- tar -xzmf - -C <path_to_target>
Turning this into Java:
// Runtime.exec(String) does not go through a shell, so "&&" needs an explicit sh -c.
// -v is dropped so that tar's file listing does not end up in the stream sent to the pod.
Process p = Runtime.getRuntime().exec(new String[] {
    "sh", "-c", "tar -czf <folder>.tar.gz <folder> && cat <folder>.tar.gz"});
try (ExecWatch watch = client.pods()
        .inNamespace(namespace)
        .withName(podname)
        .readingInput(p.getInputStream())
        .redirectingError()
        .exec("tar", "-xzmf", "-", "-C", "<path_to_target>")) {
    // Sleep for adequate time to ensure full copy
    Thread.sleep(XXXXX);
} catch (Exception e) {
    // handle errors
}
It's a bit hacky and requires you to have tar and cat on the host you're executing this on, but I found this to be much more performant and resilient.
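If having tar and cat on the host is a concern, the same kind of stream can be produced in-process. The sketch below uses Python's `tarfile` in stream mode (`w|gz`) to write a folder as a gzip-compressed tar into any writable binary object, without a temporary archive and without buffering whole files in memory. `stream_folder_as_targz` is a hypothetical helper, and wiring the sink to the stdin of the exec call is left out.

```python
import io
import os
import tarfile
import tempfile


def stream_folder_as_targz(folder, sink):
    """Write `folder` to `sink` as a gzip-compressed tar stream."""
    # "w|gz" writes sequentially and never seeks, so `sink` can be a pipe.
    with tarfile.open(fileobj=sink, mode="w|gz") as tar:
        tar.add(folder, arcname=os.path.basename(os.path.normpath(folder)))


# Demo with a temporary folder and an in-memory sink
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "payload")
    os.mkdir(src)
    with open(os.path.join(src, "a.txt"), "w") as f:
        f.write("hi")
    buf = io.BytesIO()
    stream_folder_as_targz(src, buf)

buf.seek(0)
with tarfile.open(fileobj=buf, mode="r:gz") as archive:
    names = archive.getnames()
print(names)
```

The receiving side would still be `tar -xzmf - -C <path_to_target>` as in the statement above.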
@msb217 : We added pod read/copy operations. Please have a look at https://github.com/fabric8io/kubernetes-client/blob/b2136a092e9023368ade1a8814a68d27c03fb749/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java#L168-L200
@rohanKanojia While those operations are useful (and thank you for your work!), those read/copy methods deal with _downloading from_ a pod. The process the folks here are referring to deals with _uploading to_ a pod.
We've had to implement our own solution following what other people in this thread have done, but it would be super great to have an officially supported operation for this! 🙏 😃
Can someone turn this into Python code, please?
@xaoo : Sorry, but I couldn't, since I have never coded in Python. Plus, the fabric8 client's syntax is very different from the official client's.
@orendain : Sorry, I missed your comment somehow. I think it's a valid use case and it would be great if we support it. I'm not sure we should reopen this issue and continue from here. Could you please file a new issue as a feature request for uploading data to a pod (so that this idea doesn't get lost :smile:)?
Np. I did it a few hours later but forgot to share :(
import base64
from pprint import pprint

from kubernetes.stream import stream

# api (a CoreV1Api instance), pod, ns and event are defined elsewhere
exec_command = ['/bin/sh']
resp = stream(api.connect_get_namespaced_pod_exec, pod, ns[0],
              container='airflow-worker',
              command=exec_command,
              stderr=True, stdin=True,
              stdout=True, tty=False,
              _preload_content=False)
# Encode file
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
# if json
# buffer = json.dumps(pubsub_message, indent=2).encode('utf-8')
# if str, ex.: PubSub
pubsub_message = bytes(pubsub_message, 'utf-8')
destination_file = '/home/airflow/gcs/data/var.json'
# Shell input: a heredoc that writes the message to the destination file
commands = [bytes("cat <<'EOF' >" + destination_file + "\n", 'utf-8'), pubsub_message,
            bytes("\n" + "EOF\n", 'utf-8')]
while resp.is_open():
    resp.update(timeout=1)
    if resp.peek_stdout():
        pprint("STDOUT: %s" % resp.read_stdout())
    if resp.peek_stderr():
        pprint("STDERR: %s" % resp.read_stderr())
    if commands:
        c = commands.pop(0)
        # pprint("Running command... %s\n" % c)
        resp.write_stdin(str(c, encoding='utf-8'))
    else:
        break
resp.close()
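One caveat with the heredoc approach above: if the content itself contains a line that is exactly `EOF`, the shell stops writing at that line. A possible workaround is sketched below. `heredoc_commands` is a hypothetical helper that picks a delimiter not present in the content and quotes the destination path; it only builds the strings to send and does not talk to the cluster.

```python
import shlex
import uuid


def heredoc_commands(destination_file, content):
    """Build stdin chunks for a quoted heredoc whose delimiter is not a line of content."""
    delimiter = "EOF"
    while delimiter in content.splitlines():
        delimiter = "EOF_" + uuid.uuid4().hex
    return [
        f"cat <<'{delimiter}' > {shlex.quote(destination_file)}\n",
        content + "\n",
        delimiter + "\n",
    ]


plain = heredoc_commands("/home/airflow/gcs/data/var.json", '{"key": "value"}')
tricky = heredoc_commands("/home/airflow/gcs/data/var.json", "a\nEOF\nb")
print(plain[0], tricky[0])
```

The three chunks can be sent one after another in place of the `commands` list in the snippet above.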
what is pubsub_message for?
@zffocussss : I think we are supporting it via DSL too. Check out : https://github.com/fabric8io/kubernetes-client/blob/3e4518a6181dd8b3abb4329ca59f910351128d5a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java#L264
Is pubsub_message a text command that will be executed?
I got a traceback error running the code (python.py
Python 3.6.8
Traceback (most recent call last):
File "repl.py", line 2, in <module>
resp = stream(api.connect_get_namespaced_pod_exec, pod, ns[0],
NameError: name 'stream' is not defined
You can ignore it; it's a Google Cloud API.
PubSub is an asynchronous messaging service.