celery -A proj report
in the issue.master
branch of Celery.1 .deploy_tomcat2.py
`from .AnsibleApi import CallApi
def django_process(jira_num):
server = '10.10.51.30'
name = 'abc'
port = 11011
code = 'efs'
jdk = '1.12.13'
jvm = 'xxxx'
if str.isdigit(jira_num):
# import pdb
# pdb.set_trace()
call = CallApi(server,name,port,code,jdk,jvm)
return call.run_task()
`
import logging
from .Logger import Logger
from django.conf import settings
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO)
class ResultCallback(CallbackBase):
def __init__(self, args, *kwargs):
super(ResultCallback ,self).__init__(args, *kwargs)
self.host_ok = {}
self.host_unreachable = {}
self.host_failed = {}
def v2_runner_on_unreachable(self, result):
self.host_unreachable[result._host.get_name()] = result
def v2_runner_on_ok(self, result, *args, **kwargs):
self.host_ok[result._host.get_name()] = result
def v2_runner_on_failed(self, result, *args, **kwargs):
self.host_failed[result._host.get_name()] = result
class CallApi(object):
user = settings.SSH_USER
ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
results_callback = ResultCallback()
Options = namedtuple('Options',
['connection', 'module_path', 'private_key_file', 'forks', 'become', 'become_method',
'become_user', 'check'])
def __init__(self,ip,name,port,code,jdk,jvm):
self.ip = ip
self.name = name
self.port = port
self.code = code
self.jdk = jdk
self.jvm = jvm
self.results_callback = ResultCallback()
self.results_raw = {}
def _gen_user_task(self):
tasks = []
deploy_script = 'autodeploy/abc.sh'
dst_script = '/tmp/abc.sh'
cargs = dict(src=deploy_script, dest=dst_script, owner=self.user, group=self.user, mode='0755')
args = "%s %s %d %s %s '%s'" % (dst_script, self.name, self.port, self.code, self.jdk, self.jvm)
tasks.append(dict(action=dict(module='copy', args=cargs),register='shell_out'))
tasks.append(dict(action=dict(module='debug', args=dict(msg='{{shell_out}}'))))
# tasks.append(dict(action=dict(module='command', args=args)))
# tasks.append(dict(action=dict(module='command', args=args), register='result'))
# tasks.append(dict(action=dict(module='debug', args=dict(msg='{{result.stdout}}'))))
self.tasks = tasks
def _set_option(self):
self._gen_user_task()
self.variable_manager = VariableManager()
self.loader = DataLoader()
self.options = self.Options(connection='smart', module_path=None, private_key_file=self.ssh_private_key_file, forks=None,
become=True, become_method='sudo', become_user='root', check=False)
self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=[self.ip])
self.variable_manager.set_inventory(self.inventory)
play_source = dict(
name = "auto deploy tomcat",
hosts = self.ip,
remote_user = self.user,
gather_facts='no',
tasks = self.tasks
)
self.play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)
def run_task(self):
self.results_raw = {'success':{}, 'failed':{}, 'unreachable':{}}
tqm = None
from celery.contrib import rdb
rdb.set_trace()
self._set_option()
try:
tqm = TaskQueueManager(
inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader,
options=self.options,
passwords=None,
stdout_callback=self.results_callback,
)
result = tqm.run(self.play)
finally:
if tqm is not None:
tqm.cleanup()
for host, result in self.results_callback.host_ok.items():
self.results_raw['success'][host] = result._result
for host, result in self.results_callback.host_failed.items():
self.results_raw['failed'][host] = result._result
for host, result in self.results_callback.host_unreachable.items():
self.results_raw['unreachable'][host]= result._result
Log.info("result is :%s" % self.results_raw)
return self.results_raw
3.tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from .deploy_tomcat2 import django_process
@shared_task
def deploy(num):
#return 'hello world {0}'.format(num)
#rdb.set_trace()
return django_process(num)`
In django console :python manage.py shell , I can use deploy('123') ,code call ansibleApi to copy abc.sh to dst server sucessfully.
But I didn't work when I used 'deploy.apply_async(args=['1334'], queue='queue.ops.deploy', routing_key='ops.deploy')'
I searched a few day ,still doesn't work out .
AnsibleApi return empty dict :{'success': {}, 'failed': {}, 'unreachable': {}}
there are two method to solve this problem ,disable assert:
1.where celery starts set export PYTHONOPTIMIZE=1 OR start celery with this parameter -O OPTIMIZATION
2.disable python packet multiprocessing process.py line 102:
assert not _current_process._config.get('daemon'), \
'daemonic processes are not allowed to have children'
there are two method to solve this problem ,disable assert:
1.where celery starts set export PYTHONOPTIMIZE=1 OR start celery with this parameter -O OPTIMIZATION
2.disable python packet multiprocessing process.py line 102:
assert not _current_process._config.get('daemon'), \ 'daemonic processes are not allowed to have children'
@Xuexiang825 Sorry, it doesn't work, I use Python 3.6.3 and ansible 2.4.2.0, celery 4.1.0. My playbook executer display message but do nothing.Furthermore, can you tell me something about parameter -O OPTIMIZATION?
@Xuexiang825 我刚刚看了你在这篇文章中的解答,我发现你也是中国人,那我就用中文了,我用了export 变量好使了! 但是-O 参数好像并没有效果,celery worker -A celery_worker.celery --loglevel=info -O OPTIMIZATION 这是我的启动命令。 感觉没什么不对的地方,对吧
Most helpful comment
there are two method to solve this problem ,disable assert:
1.where celery starts set export PYTHONOPTIMIZE=1 OR start celery with this parameter -O OPTIMIZATION
2.disable python packet multiprocessing process.py line 102:
assert not _current_process._config.get('daemon'), \ 'daemonic processes are not allowed to have children'