Hi,
I'm still testing Airflow and still can't get all the examples to run correctly. The tutorial example runs, but now I tried the following:
airflow backfill example_python_operator -s 2015-07-25
Note: this is the basic install. I updated models.py according to (https://github.com/airbnb/airflow/commit/97c98b12f517b06c21f834841cb5cf4c0b59255d), then dropped the database and ran airflow initdb.
The worker (started with "airflow worker") gave the following errors:
[2015-07-31 02:43:26,993: WARNING/MainProcess] celery@BNA001082-SRV01 ready.
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 10, in <module>
args.func(args)
File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 100, in run
DagPickle).filter(DagPickle.id == args.pickle).first()
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2445, in first
ret = list(self[0:1])
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2281, in __getitem__
return list(res)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 86, in instances
util.raise_from_cause(err)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 71, in instances
rows = [proc(row) for row in fetch]
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 428, in _instance
loaded_instance, populate_existing, populators)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 486, in _populate_full
dict_[key] = getter(row)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/sqltypes.py", line 1261, in process
return loads(value)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 209, in loads
return load(file)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 199, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
klass = self.find_class(module, name)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 353, in find_class
return StockUnpickler.find_class(self, module, name)
File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
__import__(module)
ImportError: No module named unusual_prefix_example_python_operator
[2015-07-31 02:26:16,474: ERROR/Worker-2] 1
[2015-07-31 02:26:16,502: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[c788e876-86bf-4be3-87b3-7276b7a4b4c8] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 42, in execute_command
raise AirflowException('Celery command failed')
AirflowException: Celery command failed
What I also see is that the message broker is not really cleaned up after such errors; messages keep piling up.
Can anyone help me out here? I changed the database to SQLite and got the same problem: I cannot get this example running (I use Celery and RabbitMQ). Any ideas? Which configuration does work?
Discovered a cause. When I run the backfill, the Celery executor tries to run this command: airflow run example_python_operator print_the_context 2015-08-10T00:00:00 --pickle 6 --local -s 2015-08-10T00:00:00
The --pickle 6 argument seems to cause the error. I have no idea what it means, but when I leave it out the command runs correctly. I found this out by adding some print statements to celery_executor.py, in case that helps to understand the problem.
Hope this helps to identify the problem.
I don't think SQLite supports any sort of concurrency; I'd be surprised if it worked along with Celery.
Do you have a homogeneous configuration (all nodes talking to the same database)?
The backfill logs the commands it puts on the message queue. To debug, you can take one of those commands and try to run it on a worker node; it should run properly. Do all of this while recreating the environmental conditions under which your worker operates (same box, same env vars, same user, ...)
Everything runs on one machine, with Postgres as the backend database, on Ubuntu Server. When I manually run the command that the backfill generates, I get the same error:
airflow run example_python_operator sleep_for_3 2015-08-10T00:00:00 --pickle 9 --local -s 2015-08-10T00:00:00
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 10, in <module>
args.func(args)
File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 100, in run
DagPickle).filter(DagPickle.id == args.pickle).first()
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2445, in first
ret = list(self[0:1])
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2281, in __getitem__
return list(res)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 86, in instances
util.raise_from_cause(err)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 71, in instances
rows = [proc(row) for row in fetch]
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 428, in _instance
loaded_instance, populate_existing, populators)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 486, in _populate_full
dict_[key] = getter(row)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/sqltypes.py", line 1261, in process
return loads(value)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 209, in loads
return load(file)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 199, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
klass = self.find_class(module, name)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 353, in find_class
return StockUnpickler.find_class(self, module, name)
File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
__import__(module)
ImportError: No module named unusual_prefix_example_python_operator
The worker generates this error:
[2015-08-21 06:33:30,947: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[b3ef0e9a-0e11-4892-8ece-1ff3fb946077] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 45, in execute_command
raise AirflowException('Celery command failed')
AirflowException: Celery command failed
Additionally, I have now tried MySQL as the database backend and get the identical error. Server version: 5.5.44-0ubuntu0.14.04.1 (Ubuntu)
If you run airflow list_dags on the worker, do you see example_python_operator in the list?
Oh btw, github trick: use triple backticks "```" to enclose code or logs in comments, makes it much more readable.
Yes, it's there. What I don't understand is this error message:
ImportError: No module named unusual_prefix_example_python_operator
Where does that 'unusual_prefix_' come from...?
When I run this, it works:
airflow run example_python_operator sleep_for_3 2015-08-10T00:00:00 --local -s 2015-08-10T00:00:00
i.e. without the pickle option; I don't know what that option means.
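A minimal sketch of the manual workaround described above: take the command logged by the backfill and drop the `--pickle <id>` pair before re-running it by hand. The helper name `drop_pickle_option` is hypothetical and not part of Airflow; only the flags that appear in the logged command are assumed.

```python
import shlex


def drop_pickle_option(command):
    """Return the command with the '--pickle <id>' pair removed."""
    kept = []
    skip_next = False
    for token in shlex.split(command):
        if skip_next:
            # This token is the pickle id that followed '--pickle'.
            skip_next = False
            continue
        if token == "--pickle":
            skip_next = True
            continue
        kept.append(token)
    return " ".join(shlex.quote(token) for token in kept)


if __name__ == "__main__":
    logged = (
        "airflow run example_python_operator sleep_for_3 "
        "2015-08-10T00:00:00 --pickle 9 --local -s 2015-08-10T00:00:00"
    )
    print(drop_pickle_option(logged))
```

This only rewrites the command string; it does not run Airflow, so the result still has to be pasted into a shell on the worker.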
The pickle is the mechanism by which your local DAG gets serialized and shipped to the worker. Pickling is the common term for serializing in Python.
Airflow attempts to pickle your DAG, ships it to the metadata database, and gets an id associated with it. This id is passed around to the workers so that they can pick up the pickle, deserialize it, and run it.
unusual_prefix_ is a hack to avoid conflicts in sys.modules as Airflow attempts to refresh your DagBag from the underlying changing code.
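To illustrate why the traceback ends in an ImportError, here is a small sketch using the standard library `pickle` rather than dill or Airflow itself. It assumes the pickled object refers to its class by module name, so the process that unpickles must be able to import that module. The module name `unusual_prefix_demo_dag` and the `Task` class are made up for the demonstration.

```python
import pickle
import sys
import types

# Hypothetical module name, mimicking the prefix seen in the traceback.
MODULE_NAME = "unusual_prefix_demo_dag"

CLASS_SOURCE = (
    "class Task(object):\n"
    "    def __init__(self, task_id):\n"
    "        self.task_id = task_id\n"
)


def register_module():
    """Create the module in memory and register it in sys.modules."""
    module = types.ModuleType(MODULE_NAME)
    exec(CLASS_SOURCE, module.__dict__)
    sys.modules[MODULE_NAME] = module
    return module


def make_payload():
    """Pickle a Task; the payload stores only the module and class names."""
    module = register_module()
    return pickle.dumps(module.Task("print_the_context"))


def unpickle_on_worker(payload, module_available):
    """Simulate a worker that may or may not know the prefixed module."""
    sys.modules.pop(MODULE_NAME, None)
    if module_available:
        register_module()
    try:
        return pickle.loads(payload).task_id
    except ImportError as error:
        return "ImportError: %s" % error


if __name__ == "__main__":
    payload = make_payload()
    print(unpickle_on_worker(payload, module_available=True))
    print(unpickle_on_worker(payload, module_available=False))
```

In this sketch the second call fails in the same way as the worker in the logs: the pickle names a module that the unpickling process cannot import.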
For now and until I manage to recreate the bug and fix it, I'd say use --local when running backfills. Is the scheduler working ok for you?
It does seem to work with --local. Is there a chat where I can pose smaller questions? Airflow keeps on confusing me. For example, when I run another example with this command:
airflow backfill example_branch_operator --local -a -s 2015-08-10
I would expect it to process all sub-items, but it never processes branch_d, for example. Any idea?
Hi guys,
After some digging, I think this issue is related to the pickler used in Airflow, dill. Generally, dill is not able to unpickle modules.
In short, if you want Airflow to be able to execute a specific task on a remote box, you have to put the 'unusual_prefix_...' module on PYTHONPATH. There are two ways to do that:
1) Copy the whole $AIRFLOW_HOME/dags directory to the remote box and start the task instance with the '-sd' argument, namely 'airflow run example_dag .. -sd '. Airflow then scans the provided subdir and imports the DAG modules under 'unusual_prefix_**' names.
Note that this way we are not actually using Airflow's pickling feature.
2) Ask the author of dill to add module serializing support.
The related issue is here.
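Option 1 can be imitated outside Airflow with a short sketch. It assumes that the DAG file is present on the worker and is loaded from disk under a prefixed module name; `load_dag_file`, the file name `example_dag.py`, and the `Task` class are all hypothetical, and this is not Airflow's actual loader code. Once the file has been loaded under that name, a pickle referring to the prefixed module can be resolved.

```python
import importlib.util
import os
import pickle
import sys
import tempfile

PREFIX = "unusual_prefix_"

DAG_SOURCE = (
    "class Task(object):\n"
    "    def __init__(self, task_id):\n"
    "        self.task_id = task_id\n"
)


def load_dag_file(path):
    """Import a file from disk under a prefixed module name."""
    stem = os.path.splitext(os.path.basename(path))[0]
    name = PREFIX + stem
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module


def demo():
    """Return (result before the file is loaded, result after)."""
    with tempfile.TemporaryDirectory() as dags_dir:
        path = os.path.join(dags_dir, "example_dag.py")
        with open(path, "w") as handle:
            handle.write(DAG_SOURCE)

        # "Scheduler side": load the file and pickle an object from it.
        module = load_dag_file(path)
        payload = pickle.dumps(module.Task("sleep_for_3"))

        # "Worker side", before scanning the dags directory.
        del sys.modules[module.__name__]
        try:
            before = pickle.loads(payload).task_id
        except ImportError:
            before = "ImportError"

        # "Worker side", after loading the same file from its own disk.
        load_dag_file(path)
        after = pickle.loads(payload).task_id
        del sys.modules[module.__name__]
    return before, after


if __name__ == "__main__":
    print(demo())
```

The sketch also shows why the files must exist on the remote disk: the prefixed module can only be recreated by loading the source file again.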
Hi @mistercrunch, could you please give any hints on this?
I also want to +1 this issue. Pickling hasn't worked for me when using Celery to send tasks to worker nodes. We're using Airflow 1.5. As a side note (it didn't solve the problem): we upgraded Airflow from a previous version, and the size of the pickle_hash column was increased in between. The column type was initially Int and was changed to BigInt, which at first explained why things couldn't be unpickled.
ALTER TABLE dag_pickle MODIFY pickle_hash BIGINT;
But after fixing that, we still couldn't get unpickling to work. I'm planning to dig into this more when I've got more bandwidth.
+1 Any update on this issue?
Can someone confirm that this is still a problem with the recent changes in trunk?
If so I'll try to find time to reproduce the bug and dig in. In the meantime running the commands with --local should work without problems.
@mistercrunch The PyPI version still has this issue, but the master branch no longer does.
I tested it by starting workers on the local machine as well as on a remote one. I didn't see any problem running the example DAG.
Is there a timeline on when this could be pushed up to pypi?
I'm able to do a task "run" without a problem, but if I try to do a DAG "backfill" it fails with the same pickle error as above. @wxiang7 I also tried using -sd, but that gives the same error.
if you want airflow being able to execute specific task on a remote box . . .
I'm a bit confused by this, and maybe I am missing something, but isn't this the main point of Celery execution? Is there some other technique of using celery workers which avoids this? How does Airbnb kick off jobs on remote workers?
@bryan0
When you use the '-sd' parameter (plus '--local'), you should ensure that the DAG definition files exist on the remote executor's disk.
To make "real remote execution" happen, I forked the underlying pickler used by Airflow (dill) and tweaked Airflow a little to make remote execution work. I have no time to merge it upstream for now, though.
If you're interested in what's going on, you can check the following repos:
https://github.com/wxiang7/dill
https://github.com/wxiang7/airflow