Pass execution date as a datetime object to MongoToS3Operator

2018-10-22 Thread Kyle Hamlin
Hi,

I'm having an issue where I want to pass the DAG's execution_date to the
query parameter of the MongoToS3Operator via templating. The templating
works properly; however, it appears that pymongo will only filter date
fields when passed a datetime object, and while the underlying object behind
the template '{{ execution_date }}' is a datetime, once rendered it is only
a string, so the MongoToS3Operator is unable to filter. Furthermore, I
cannot parse the template into a datetime object in the DAG file itself,
since DAGs are processed by the scheduler at a configured interval and doing
so results in an error. Is there a way around this that I don't know about
or am overlooking?
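
For context, the kind of thing I've been sketching (untested; I'm assuming
the contrib import path airflow.contrib.operators.mongo_to_s3, that the
rendered query ends up on self.mongo_query, and I'm leaning on dateutil for
the string-to-datetime conversion) looks like this:

from dateutil import parser as date_parser
from airflow.contrib.operators.mongo_to_s3 import MongoToS3Operator


class DatetimeAwareMongoToS3Operator(MongoToS3Operator):
    """Re-parses ISO-8601 strings in the rendered query into datetimes."""

    def execute(self, context):
        # mongo_query is a template field, so by the time execute() runs the
        # '{{ execution_date }}' placeholder has been rendered into a string
        self.mongo_query = self._parse_dates(self.mongo_query)
        return super(DatetimeAwareMongoToS3Operator, self).execute(context)

    def _parse_dates(self, obj):
        if isinstance(obj, dict):
            return {k: self._parse_dates(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._parse_dates(v) for v in obj]
        if isinstance(obj, str):
            try:
                # naive: any string that parses as a date becomes a datetime
                return date_parser.parse(obj)
            except (ValueError, OverflowError):
                return obj
        return obj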

-- 
Kyle Hamlin


Re: Flask-AppBuilder has pinned versions of Click & Flask-Login in 1.10.0

2018-10-05 Thread Kyle Hamlin
Thanks for this, Ash. Pipenv works very well in 99% of cases and is vastly
better than managing requirements files. Also, the PyPA (Python Packaging
Authority) officially recommends Pipenv.

I gave poetry a try and it seems like it has a lot of potential. I did run
into two errors: a max recursion depth error when installing Airflow with a
lot of extras, and an issue installing lxml when installing Airflow without
any extras.

On Fri, Oct 5, 2018 at 4:29 AM Ash Berlin-Taylor  wrote:

> Oh I meant FAB 1.11.1.
>
> And it looks like the Jinja issue is a bug in pip-tools, where it treats a
> dep of "jina" as actually being "jinja>=CURRENT"
> https://github.com/pypa/pipenv/issues/2596
>
> In short: pip-env isn't ready for real-world use yet? (I'm guessing and
> extrapolating, but I haven't used it myself so don't trust my word on this)
>
> -ash
> > On 4 Oct 2018, at 16:38, Kyle Hamlin  wrote:
> >
> > If I remove the Flask-AppBuild pinning to 1.11.0 then it uncovers a
> Jinja2
> > conflict which is baffling because I don't see anywhere in the graph that
> > jinja2 >=2.10 is required.
> >
> > Could not find a version that matches
> > jinja2<2.9.0,>=2.10,>=2.4,>=2.5,>=2.7.3,>=2.8
> > Tried: 2.0, 2.1, 2.1.1, 2.2, 2.2.1, 2.3, 2.3.1, 2.4, 2.4.1, 2.5, 2.5.1,
> > 2.5.2, 2.5.3, 2.5.4, 2.5.5, 2.6, 2.7, 2.7.1, 2.7.2, 2.7.3, 2.8, 2.8,
> 2.8.1,
> > 2.8.1, 2.9, 2.9, 2.9.1, 2.9.1, 2.9.2, 2.9.2, 2.9.3, 2.9.3, 2.9.4, 2.9.4,
> > 2.9.5, 2.9.5, 2.9.6, 2.9.6, 2.10, 2.10
> >
> > I highlighted why the dep fails: there is one dep that requires Jinja2
> > <2.9.0, but I still have no idea where the 2.10.0 comes from.
> >
> > apache-airflow==2.0.0.dev0+incubating
> >  - alembic [required: >=0.9,<1.0, installed: 0.9.10]
> >- Mako [required: Any, installed: 1.0.7]
> >  - MarkupSafe [required: >=0.9.2, installed: 1.0]
> >- python-dateutil [required: Any, installed: 2.7.3]
> >  - six [required: >=1.5, installed: 1.11.0]
> >- python-editor [required: >=0.3, installed: 1.0.3]
> >- SQLAlchemy [required: >=0.7.6, installed: 1.1.18]
> >  - bleach [required: ~=2.1.3, installed: 2.1.4]
> >- html5lib [required:
> >>
> =0.pre,!=1.0b8,!=1.0b7,!=1.0b6,!=1.0b5,!=1.0b4,!=1.0b3,!=1.0b2,!=1.0b1,
> > installed: 1.0.1]
> >  - six [required: >=1.9, installed: 1.11.0]
> >  - webencodings [required: Any, installed: 0.5.1]
> >- six [required: Any, installed: 1.11.0]
> >  - configparser [required: >=3.5.0,<3.6.0, installed: 3.5.0]
> >  - croniter [required: >=0.3.17,<0.4, installed: 0.3.25]
> >- python-dateutil [required: Any, installed: 2.7.3]
> >  - six [required: >=1.5, installed: 1.11.0]
> >  - dill [required: >=0.2.2,<0.3, installed: 0.2.8.2]
> >  - flask [required: >=0.12.4,<0.13, installed: 0.12.4]
> >- click [required: >=2.0, installed: 7.0]
> >- itsdangerous [required: >=0.21, installed: 0.24]
> >- Jinja2 [required: >=2.4, installed: 2.8.1]
> >  - MarkupSafe [required: Any, installed: 1.0]
> >- Werkzeug [required: >=0.7, installed: 0.14.1]
> >  - flask-admin [required: ==1.4.1, installed: 1.4.1]
> >- Flask [required: >=0.7, installed: 0.12.4]
> >  - click [required: >=2.0, installed: 7.0]
> >  - itsdangerous [required: >=0.21, installed: 0.24]
> >  - Jinja2 [required: >=2.4, installed: 2.8.1]
> >- MarkupSafe [required: Any, installed: 1.0]
> >  - Werkzeug [required: >=0.7, installed: 0.14.1]
> >- wtforms [required: Any, installed: 2.2.1]
> >  - flask-appbuilder [required: >=1.12,<2.0.0, installed: 1.12.0]
> >- click [required: ==6.7, installed: 7.0]
> >- colorama [required: ==0.3.9, installed: 0.3.9]
> >- Flask [required: >=0.10.0,<0.12.99, installed: 0.12.4]
> >  - click [required: >=2.0, installed: 7.0]
> >  - itsdangerous [required: >=0.21, installed: 0.24]
> >  - Jinja2 [required: >=2.4, installed: 2.8.1]
> >- MarkupSafe [required: Any, installed: 1.0]
> >  - Werkzeug [required: >=0.7, installed: 0.14.1]
> >- Flask-Babel [required: ==0.11.1, installed: 0.11.1]
> >  - Babel [required: >=2.3, installed: 2.6.0]
> >- pytz [required: >=0a, installed: 2018.5]
> >  - Flask [required: Any, installed: 0.12.4]
> >- click [required: >=2.0, installed: 7.0]
> >- itsdangerous [required: >=0.21, installed: 0.24]
> >- Jinja2 [required: >=2.4

Re: Flask-AppBuilder has pinned versions of Click & Flask-Login in 1.10.0

2018-10-04 Thread Kyle Hamlin
-WTF [required: ==0.14.2, installed: 0.14.2]
  - Flask [required: Any, installed: 0.12.4]
- click [required: >=2.0, installed: 7.0]
- itsdangerous [required: >=0.21, installed: 0.24]
- Jinja2 [required: >=2.4, installed: 2.8.1]
  - MarkupSafe [required: Any, installed: 1.0]
- Werkzeug [required: >=0.7, installed: 0.14.1]
  - WTForms [required: Any, installed: 2.2.1]
- python-dateutil [required: >=2.3,<3, installed: 2.7.3]
  - six [required: >=1.5, installed: 1.11.0]
  - flask-caching [required: >=1.3.3,<1.4.0, installed: 1.3.3]
- Flask [required: Any, installed: 0.12.4]
  - click [required: >=2.0, installed: 7.0]
  - itsdangerous [required: >=0.21, installed: 0.24]
  - Jinja2 [required: >=2.4, installed: 2.8.1]
- MarkupSafe [required: Any, installed: 1.0]
  - Werkzeug [required: >=0.7, installed: 0.14.1]
- Werkzeug [required: >=0.12, installed: 0.14.1]
  - flask-login [required: >=0.3,<0.5, installed: 0.4.1]
- Flask [required: Any, installed: 0.12.4]
  - click [required: >=2.0, installed: 7.0]
  - itsdangerous [required: >=0.21, installed: 0.24]
  - Jinja2 [required: >=2.4, installed: 2.8.1]
- MarkupSafe [required: Any, installed: 1.0]
  - Werkzeug [required: >=0.7, installed: 0.14.1]
  - flask-swagger [required: ==0.2.13, installed: 0.2.13]
- Flask [required: >=0.10, installed: 0.12.4]
  - click [required: >=2.0, installed: 7.0]
  - itsdangerous [required: >=0.21, installed: 0.24]
  - Jinja2 [required: >=2.4, installed: 2.8.1]
- MarkupSafe [required: Any, installed: 1.0]
  - Werkzeug [required: >=0.7, installed: 0.14.1]
- PyYAML [required: >=3.0, installed: 3.13]
  - flask-wtf [required: >=0.14.2,<0.15, installed: 0.14.2]
- Flask [required: Any, installed: 0.12.4]
  - click [required: >=2.0, installed: 7.0]
  - itsdangerous [required: >=0.21, installed: 0.24]
  - Jinja2 [required: >=2.4, installed: 2.8.1]
- MarkupSafe [required: Any, installed: 1.0]
  - Werkzeug [required: >=0.7, installed: 0.14.1]
- WTForms [required: Any, installed: 2.2.1]
  - funcsigs [required: ==1.0.0, installed: 1.0.0]
- ordereddict [required: Any, installed: 1.1]
  - future [required: >=0.16.0,<0.17, installed: 0.16.0]
  - gitpython [required: >=2.0.2, installed: 2.1.11]
- gitdb2 [required: >=2.0.0, installed: 2.0.4]
  - smmap2 [required: >=2.0.0, installed: 2.0.4]
  - gunicorn [required: >=19.4.0,<20.0, installed: 19.9.0]
  - iso8601 [required: >=0.1.12, installed: 0.1.12]
  - jinja2 [required: >=2.7.3,<2.9.0, installed: 2.8.1]
- MarkupSafe [required: Any, installed: 1.0]
  - lxml [required: >=4.0.0, installed: 4.2.5]
  - markdown [required: >=2.5.2,<3.0, installed: 2.6.11]
  - pandas [required: >=0.17.1,<1.0.0, installed: 0.23.4]
- numpy [required: >=1.9.0, installed: 1.15.2]
- python-dateutil [required: >=2.5.0, installed: 2.7.3]
  - six [required: >=1.5, installed: 1.11.0]
- pytz [required: >=2011k, installed: 2018.5]
  - pendulum [required: ==1.4.4, installed: 1.4.4]
- python-dateutil [required: >=2.6.0.0,<3.0.0.0, installed: 2.7.3]
  - six [required: >=1.5, installed: 1.11.0]
- pytzdata [required: >=2018.3.0.0, installed: 2018.5]
- tzlocal [required: >=1.5.0.0,<2.0.0.0, installed: 1.5.1]
  - pytz [required: Any, installed: 2018.5]
  - psutil [required: >=4.2.0,<6.0.0, installed: 5.4.7]
  - pygments [required: >=2.0.1,<3.0, installed: 2.2.0]
  - python-daemon [required: >=2.1.1,<2.2, installed: 2.1.2]
- docutils [required: Any, installed: 0.14]
- lockfile [required: >=0.10, installed: 0.12.2]
- setuptools [required: Any, installed: 40.4.3]
  - python-dateutil [required: >=2.3,<3, installed: 2.7.3]
- six [required: >=1.5, installed: 1.11.0]
  - python-nvd3 [required: ==0.15.0, installed: 0.15.0]
- Jinja2 [required: >=2.8, installed: 2.8.1]
  - MarkupSafe [required: Any, installed: 1.0]
- python-slugify [required: >=1.2.5, installed: 1.2.6]
  - Unidecode [required: >=0.04.16, installed: 1.0.22]
  - requests [required: >=2.5.1,<3, installed: 2.19.1]
- certifi [required: >=2017.4.17, installed: 2018.8.24]
- chardet [required: >=3.0.2,<3.1.0, installed: 3.0.4]
- idna [required: >=2.5,<2.8, installed: 2.7]
- urllib3 [required: >=1.21.1,<1.24, installed: 1.23]
  - setproctitle [required: >=1.1.8,<2, installed: 1.1.10]
  - sqlalchemy [required: >=1.1.15,<1.2.0, installed: 1.1.18]
  - tabulate [required: >=0.7.5,<=0.8.2, installed: 0.8.2]
  - tenacity [required: ==4.8.0, installed: 4.8.0]
- monotonic [required: >=0.6, installed: 1.5]
- six [required: >=1.9.0, installed: 1.11.0]
  - thrift [required: &

Re: Flask-AppBuilder has pinned versions of Click & Flask-Login in 1.10.0

2018-10-04 Thread Kyle Hamlin
Thank you for the response Ash.

Even with your suggestion, there appear to be version conflicts all over
the place. Can you get this Pipfile to install? I cannot.

*Pipfile:*

[[source]]
url = "https://pypi.python.org/simple; [[source]]
verify_ssl = true
name = "pypi"

[packages]
apache-airflow = {editable = true, ref =
"fb5ffd146a5a33820cfa7541e5ce09098f3d541a", git = "
https://github.com/apache/incubator-airflow.git", extras = ["s3", "slack",
"kubernetes", "celery", "postgres", "mongo", "crypto"]}
Flask-AppBuilder="==1.11.0"

[requires]
python_version = "3.6"






On Thu, Oct 4, 2018 at 10:50 AM Ash Berlin-Taylor  wrote:

> We've committed a fix for this to master and will include it in a 1.10.1
> https://github.com/apache/incubator-airflow/commit/fb5ffd146a5a33820cfa7541e5ce09098f3d541a
>
>
> For installing in the meantime pin `Flask-AppBuilder=1.11.0`
>
> > On 4 Oct 2018, at 00:41, Kyle Hamlin  wrote:
> >
> > Hi,
> >
> > Today I was trying to upgrade Airflow to 1.10.0 and it appears that there
> > are some version conflicts with click and flask-login. I uncovered these
> > because I use Pipenv to manage our project's dependencies. You can see
> here
> > that Flask-AppBuilder pins click==6.7 and flask-login>=0.3,<0.5
> >
> >
> https://github.com/dpgaspar/Flask-AppBuilder/blob/master/setup.py#L37-L47
> >
> > I'm able to force pipenv to install click==6.7 because that is not pinned
> > in Airflow's setup.py, but I can do nothing about flask-login because
> > Airflow pins the flask-login version:
> > https://github.com/apache/incubator-airflow/blob/master/setup.py#L304
> >
> > This prevents me from being able to upgrade to 1.10.0.
> >
> > *Pipenv's Graphed project dependencies (conflicts highlighted):*
> >
> > apache-airflow==1.10.0
> >  - alembic [required: >=0.8.3,<0.9, installed: 0.8.10]
> >- Mako [required: Any, installed: 1.0.7]
> >  - MarkupSafe [required: >=0.9.2, installed: 1.0]
> >- python-editor [required: >=0.3, installed: 1.0.3]
> >- SQLAlchemy [required: >=0.7.6, installed: 1.2.12]
> >  - bleach [required: ==2.1.2, installed: 2.1.2]
> >- html5lib [required:
> >>
> =0.pre,!=1.0b8,!=1.0b7,!=1.0b6,!=1.0b5,!=1.0b4,!=1.0b3,!=1.0b2,!=1.0b1,
> > installed: 1.0.1]
> >  - six [required: >=1.9, installed: 1.11.0]
> >  - webencodings [required: Any, installed: 0.5.1]
> >- six [required: Any, installed: 1.11.0]
> >  - configparser [required: >=3.5.0,<3.6.0, installed: 3.5.0]
> >  - croniter [required: >=0.3.17,<0.4, installed: 0.3.25]
> >- python-dateutil [required: Any, installed: 2.7.3]
> >  - six [required: >=1.5, installed: 1.11.0]
> >  - dill [required: >=0.2.2,<0.3, installed: 0.2.8.2]
> >  - flask [required: >=0.12.4,<0.13, installed: 0.12.4]
> >- click [required: >=2.0, installed: 7.0]
> >- itsdangerous [required: >=0.21, installed: 0.24]
> >- Jinja2 [required: >=2.4, installed: 2.8.1]
> >  - MarkupSafe [required: Any, installed: 1.0]
> >- Werkzeug [required: >=0.7, installed: 0.14.1]
> >  - flask-admin [required: ==1.4.1, installed: 1.4.1]
> >- Flask [required: >=0.7, installed: 0.12.4]
> >  - click [required: >=2.0, installed: 7.0]
> >  - itsdangerous [required: >=0.21, installed: 0.24]
> >  - Jinja2 [required: >=2.4, installed: 2.8.1]
> >- MarkupSafe [required: Any, installed: 1.0]
> >  - Werkzeug [required: >=0.7, installed: 0.14.1]
> >- wtforms [required: Any, installed: 2.2.1]
> >  - flask-appbuilder [required: >=1.11.1,<2.0.0, installed: 1.12.0]
> >- click [required: ==6.7, installed: 7.0]
> >- colorama [required: ==0.3.9, installed: 0.3.9]
> >- Flask [required: >=0.10.0,<0.12.99, installed: 0.12.4]
> >  - click [required: >=2.0, installed: 7.0]
> >  - itsdangerous [required: >=0.21, installed: 0.24]
> >  - Jinja2 [required: >=2.4, installed: 2.8.1]
> >- MarkupSafe [required: Any, installed: 1.0]
> >  - Werkzeug [required: >=0.7, installed: 0.14.1]
> >- Flask-Babel [required: ==0.11.1, installed: 0.11.1]
> >  - Babel [required: >=2.3, installed: 2.6.0]
> >- pytz [required: >=0a, installed: 2018.5]
> >  - Flask [required: Any, installed: 0.12.4]
> >- click [required: >=2.0, installed: 7.0]
> >- itsdangerous [required: >=0.21, installed: 0.24]
> > 

Re: Flask-AppBuilder has pinned versions of Click & Flask-Login in 1.10.0

2018-10-04 Thread Kyle Hamlin
Whoops, remove the [[source]] at the end of the url =
"https://pypi.python.org/simple" line; that is a typo.

On Thu, Oct 4, 2018 at 11:26 AM Kyle Hamlin  wrote:

> Thank you for the response Ash.
>
> Even with your suggestion, there appear to be version conflicts all over
> the place. Can you get this Pipfile to install because I cannot?
>
> *Pipfile:*
>
> [[source]]
> url = "https://pypi.python.org/simple; [[source]]
> verify_ssl = true
> name = "pypi"
>
> [packages]
> apache-airflow = {editable = true, ref =
> "fb5ffd146a5a33820cfa7541e5ce09098f3d541a", git = "
> https://github.com/apache/incubator-airflow.git", extras = ["s3",
> "slack", "kubernetes", "celery", "postgres", "mongo", "crypto"]}
> Flask-AppBuilder="==1.11.0"
>
> [requires]
> python_version = "3.6"
>
>
>
>
>
>
> On Thu, Oct 4, 2018 at 10:50 AM Ash Berlin-Taylor  wrote:
>
>> We've committed a fix for this to master and will include it in a 1.10.1
>> https://github.com/apache/incubator-airflow/commit/fb5ffd146a5a33820cfa7541e5ce09098f3d541a
>>
>>
>> For installing in the meantime pin `Flask-AppBuilder=1.11.0`
>>
>> > On 4 Oct 2018, at 00:41, Kyle Hamlin  wrote:
>> >
>> > Hi,
>> >
>> > Today I was trying to upgrade Airflow to 1.10.0 and it appears that
>> there
>> > are some version conflicts with click and flask-login. I uncovered these
>> > because I use Pipenv to manage our project's dependencies. You can see
>> here
>> > that Flask-AppBuilder pins click==6.7 and flask-login>=0.3,<0.5
>> >
>> >
>> https://github.com/dpgaspar/Flask-AppBuilder/blob/master/setup.py#L37-L47
>> >
>> > I'm able to force pipenv to install click==6.7 because that is not
>> pinned
>> > in Airflow's setup.py, but I can do nothing about flask-login because
>> > Airflow pins the flask-login version:
>> > https://github.com/apache/incubator-airflow/blob/master/setup.py#L304
>> >
>> > This prevents me from being able to upgrade to 1.10.0.
>> >
>> > *Pipenv's Graphed project dependencies (conflicts highlighted):*
>> >
>> > apache-airflow==1.10.0
>> >  - alembic [required: >=0.8.3,<0.9, installed: 0.8.10]
>> >- Mako [required: Any, installed: 1.0.7]
>> >  - MarkupSafe [required: >=0.9.2, installed: 1.0]
>> >- python-editor [required: >=0.3, installed: 1.0.3]
>> >- SQLAlchemy [required: >=0.7.6, installed: 1.2.12]
>> >  - bleach [required: ==2.1.2, installed: 2.1.2]
>> >- html5lib [required:
>> >>
>> =0.pre,!=1.0b8,!=1.0b7,!=1.0b6,!=1.0b5,!=1.0b4,!=1.0b3,!=1.0b2,!=1.0b1,
>> > installed: 1.0.1]
>> >  - six [required: >=1.9, installed: 1.11.0]
>> >  - webencodings [required: Any, installed: 0.5.1]
>> >- six [required: Any, installed: 1.11.0]
>> >  - configparser [required: >=3.5.0,<3.6.0, installed: 3.5.0]
>> >  - croniter [required: >=0.3.17,<0.4, installed: 0.3.25]
>> >- python-dateutil [required: Any, installed: 2.7.3]
>> >  - six [required: >=1.5, installed: 1.11.0]
>> >  - dill [required: >=0.2.2,<0.3, installed: 0.2.8.2]
>> >  - flask [required: >=0.12.4,<0.13, installed: 0.12.4]
>> >- click [required: >=2.0, installed: 7.0]
>> >- itsdangerous [required: >=0.21, installed: 0.24]
>> >- Jinja2 [required: >=2.4, installed: 2.8.1]
>> >  - MarkupSafe [required: Any, installed: 1.0]
>> >- Werkzeug [required: >=0.7, installed: 0.14.1]
>> >  - flask-admin [required: ==1.4.1, installed: 1.4.1]
>> >- Flask [required: >=0.7, installed: 0.12.4]
>> >  - click [required: >=2.0, installed: 7.0]
>> >  - itsdangerous [required: >=0.21, installed: 0.24]
>> >  - Jinja2 [required: >=2.4, installed: 2.8.1]
>> >- MarkupSafe [required: Any, installed: 1.0]
>> >  - Werkzeug [required: >=0.7, installed: 0.14.1]
>> >- wtforms [required: Any, installed: 2.2.1]
>> >  - flask-appbuilder [required: >=1.11.1,<2.0.0, installed: 1.12.0]
>> >- click [required: ==6.7, installed: 7.0]
>> >- colorama [required: ==0.3.9, installed: 0.3.9]
>> >- Flask [required: >=0.10.0,<0.12.99, installed: 0.12.4]
>> >  - click [required: >=2.0, installed: 7.0]
>> >  - itsdangerous [required: >=0.21, installed: 0.24]
>>

Flask-AppBuilder has pinned versions of Click & Flask-Login in 1.10.0

2018-10-03 Thread Kyle Hamlin
  - Flask [required: Any, installed: 0.12.4]
- click [required: >=2.0, installed: 7.0]
- itsdangerous [required: >=0.21, installed: 0.24]
- Jinja2 [required: >=2.4, installed: 2.8.1]
  - MarkupSafe [required: Any, installed: 1.0]
- Werkzeug [required: >=0.7, installed: 0.14.1]
  - WTForms [required: Any, installed: 2.2.1]
- python-dateutil [required: >=2.3,<3, installed: 2.7.3]
  - six [required: >=1.5, installed: 1.11.0]
  - flask-caching [required: >=1.3.3,<1.4.0, installed: 1.3.3]
- Flask [required: Any, installed: 0.12.4]
  - click [required: >=2.0, installed: 7.0]
  - itsdangerous [required: >=0.21, installed: 0.24]
  - Jinja2 [required: >=2.4, installed: 2.8.1]
- MarkupSafe [required: Any, installed: 1.0]
  - Werkzeug [required: >=0.7, installed: 0.14.1]
- Werkzeug [required: >=0.12, installed: 0.14.1]
  - flask-login [required: ==0.2.11, installed: 0.2.11]
- Flask [required: Any, installed: 0.12.4]
  - click [required: >=2.0, installed: 7.0]
  - itsdangerous [required: >=0.21, installed: 0.24]
  - Jinja2 [required: >=2.4, installed: 2.8.1]
- MarkupSafe [required: Any, installed: 1.0]
  - Werkzeug [required: >=0.7, installed: 0.14.1]
  - flask-swagger [required: ==0.2.13, installed: 0.2.13]
- Flask [required: >=0.10, installed: 0.12.4]
  - click [required: >=2.0, installed: 7.0]
  - itsdangerous [required: >=0.21, installed: 0.24]
  - Jinja2 [required: >=2.4, installed: 2.8.1]
- MarkupSafe [required: Any, installed: 1.0]
  - Werkzeug [required: >=0.7, installed: 0.14.1]
- PyYAML [required: >=3.0, installed: 3.13]
  - flask-wtf [required: >=0.14.2,<0.15, installed: 0.14.2]
- Flask [required: Any, installed: 0.12.4]
  - click [required: >=2.0, installed: 7.0]
  - itsdangerous [required: >=0.21, installed: 0.24]
  - Jinja2 [required: >=2.4, installed: 2.8.1]
- MarkupSafe [required: Any, installed: 1.0]
  - Werkzeug [required: >=0.7, installed: 0.14.1]
- WTForms [required: Any, installed: 2.2.1]
  - funcsigs [required: ==1.0.0, installed: 1.0.0]
- ordereddict [required: Any, installed: 1.1]
  - future [required: >=0.16.0,<0.17, installed: 0.16.0]
  - gitpython [required: >=2.0.2, installed: 2.1.11]
- gitdb2 [required: >=2.0.0, installed: 2.0.4]
  - smmap2 [required: >=2.0.0, installed: 2.0.4]
  - gunicorn [required: >=19.4.0,<20.0, installed: 19.9.0]
  - iso8601 [required: >=0.1.12, installed: 0.1.12]
  - jinja2 [required: >=2.7.3,<2.9.0, installed: 2.8.1]
- MarkupSafe [required: Any, installed: 1.0]
  - lxml [required: >=3.6.0,<4.0, installed: 3.8.0]
  - markdown [required: >=2.5.2,<3.0, installed: 2.6.11]
  - pandas [required: >=0.17.1,<1.0.0, installed: 0.23.4]
- numpy [required: >=1.9.0, installed: 1.15.2]
- python-dateutil [required: >=2.5.0, installed: 2.7.3]
  - six [required: >=1.5, installed: 1.11.0]
- pytz [required: >=2011k, installed: 2018.5]
  - pendulum [required: ==1.4.4, installed: 1.4.4]
- python-dateutil [required: >=2.6.0.0,<3.0.0.0, installed: 2.7.3]
  - six [required: >=1.5, installed: 1.11.0]
- pytzdata [required: >=2018.3.0.0, installed: 2018.5]
- tzlocal [required: >=1.5.0.0,<2.0.0.0, installed: 1.5.1]
  - pytz [required: Any, installed: 2018.5]
  - psutil [required: >=4.2.0,<5.0.0, installed: 4.4.2]
  - pygments [required: >=2.0.1,<3.0, installed: 2.2.0]
  - python-daemon [required: >=2.1.1,<2.2, installed: 2.1.2]
- docutils [required: Any, installed: 0.14]
- lockfile [required: >=0.10, installed: 0.12.2]
- setuptools [required: Any, installed: 40.4.3]
  - python-dateutil [required: >=2.3,<3, installed: 2.7.3]
- six [required: >=1.5, installed: 1.11.0]
  - python-nvd3 [required: ==0.15.0, installed: 0.15.0]
- Jinja2 [required: >=2.8, installed: 2.8.1]
  - MarkupSafe [required: Any, installed: 1.0]
- python-slugify [required: >=1.2.5, installed: 1.2.6]
  - Unidecode [required: >=0.04.16, installed: 1.0.22]
  - requests [required: >=2.5.1,<3, installed: 2.19.1]
- certifi [required: >=2017.4.17, installed: 2018.8.24]
- chardet [required: >=3.0.2,<3.1.0, installed: 3.0.4]
- idna [required: >=2.5,<2.8, installed: 2.7]
- urllib3 [required: >=1.21.1,<1.24, installed: 1.23]
  - setproctitle [required: >=1.1.8,<2, installed: 1.1.10]
  - sqlalchemy [required: >=1.1.15,<1.2.0, installed: 1.2.12]
  - tabulate [required: >=0.7.5,<0.8.0, installed: 0.7.7]
  - tenacity [required: ==4.8.0, installed: 4.8.0]
- monotonic [required: >=0.6, installed: 1.5]
- six [required: >=1.9.0, installed: 1.11.0]
  - thrift [required: >=0.9.2, installed: 0.11.0]
- six [required: >=1.7.2, installed: 1.11.0]
  - tzlocal [required: >=1.4, installed: 1.5.1]
- pytz [required: Any, installed: 2018.5]
  - unicodecsv [required: >=0.14.1, installed: 0.14.1]
  - werkzeug [required: >=0.14.1,<0.15.0, installed: 0.14.1]
  - zope.deprecation [required: >=4.0,<5.0, installed: 4.3.0]
- setuptools [required: Any, installed: 40.4.3]

--
Kyle Hamlin


Re: S3 logging not working with docker

2018-09-16 Thread Kyle Hamlin
Are you using the DockerOperator? If so, the issue is here in the
create_container method:

https://github.com/apache/incubator-airflow/blob/master/airflow/operators/docker_operator.py#L207

create_container takes a host_config:

https://docker-py.readthedocs.io/en/1.2.3/api/#create_container

That host_config takes a log_config:

https://docker-py.readthedocs.io/en/1.2.3/hostconfig/

The log_config needs to be set to: {'type': 'json-file'}

So the create_container call should look like this:

self.container = self.cli.create_container(
    command=self.get_command(),
    cpu_shares=cpu_shares,
    environment=self.environment,
    host_config=self.cli.create_host_config(
        binds=self.volumes,
        network_mode=self.network_mode,
        shm_size=self.shm_size,
        log_config={'type': 'json-file'}),
    image=image,
    mem_limit=self.mem_limit,
    user=self.user,
    working_dir=self.working_dir)

Unfortunately, log_config is not a param in the DockerOperator so you'll
need to make a pull request to fix it.
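
For reference, here's roughly how log_config plugs in when talking to
docker-py directly, outside of Airflow (just a standalone sketch; I'm
assuming docker-py 2.x, where the low-level client is docker.APIClient, a
local Docker daemon, and that the image is already pulled):

import docker

cli = docker.APIClient(base_url='unix://var/run/docker.sock')

container = cli.create_container(
    image='python:3.6-slim',
    command='python -c "print(42)"',
    host_config=cli.create_host_config(
        # force the json-file logging driver so cli.logs() can read output
        log_config={'type': 'json-file'},
    ),
)
cli.start(container['Id'])
cli.wait(container['Id'])
print(cli.logs(container['Id']).decode())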

On Sun, Sep 16, 2018 at 2:12 AM Bhavani Ramasamy 
wrote:

> Hello Team,
> I am trying to setup S3 logging with docker & CeleryExecutor. Files are not
> written to S3. I have configured in airflow.cfg like below,
>
> remote_logging = True
>
> remote_log_conn_id = s3_connection_mine
>
> remote_base_log_folder = s3:// mybucket/airflow/logs/
>
>
> I have tried with *logging_config_class* as empty as well as custom
> log_config.py using airflow_local_settings.py file. It also doesnt work.
> Can you please help me.
>
>
> Thanks,
>
> Bhavani
>


-- 
Kyle Hamlin


Re: Will redeploying webserver and scheduler in Kubernetes cluster kill running tasks

2018-08-30 Thread Kyle Hamlin
Thanks for your responses! Glad to hear that tasks can run independently if
something happens.

On Thu, Aug 30, 2018 at 1:13 PM Eamon Keane  wrote:

> Adding to Greg's point, if you're using the k8s executor and for some
> reason the k8s executor worker pod fails to launch within 120 seconds (e.g.
> pending due to scaling up a new node), this counts as a task failure. Also,
> if the k8s executor pod has already launched a pod operator but is killed
> (e.g. manually or due to node upgrade), the  pod operator it launched is
> not killed and runs to completion so if using retries, you need to ensure
> idempotency. The worker pods update the db per my understanding, with each
> requiring a separate connection to the db, this can tax your connection
> budget (100-300 for small postgres instances on gcp or aws).
>
> On Thu, Aug 30, 2018 at 6:04 PM Greg Neiheisel  wrote:
>
> > Hey Kyle, the task pods will continue to run even if you reboot the
> > scheduler and webserver and the status does get updated in the airflow
> db,
> > which is great.
> >
> > I know the scheduler subscribes to the Kubernetes watch API to get an
> event
> > stream of pods completing and it keeps a checkpoint so it can resubscribe
> > when it comes back up.
> >
> > I forget if the worker pods update the db or if the scheduler is doing
> > that, but it should work out.
> >
> > On Thu, Aug 30, 2018, 9:54 AM Kyle Hamlin  wrote:
> >
> > > gentle bump
> > >
> > > On Wed, Aug 22, 2018 at 5:12 PM Kyle Hamlin 
> wrote:
> > >
> > > > I'm about to make the switch to Kubernetes with Airflow, but am
> > wondering
> > > > what happens when my CI/CD pipeline redeploys the webserver and
> > scheduler
> > > > and there are still long-running tasks (pods). My intuition is that
> > since
> > > > the database hold all state and the tasks are in charge of updating
> > their
> > > > own state, and the UI only renders what it sees in the database that
> > this
> > > > is not so much of a problem. To be sure, however, here are my
> > questions:
> > > >
> > > > Will task pods continue to run?
> > > > Can task pods continue to poll the external system they are running
> > tasks
> > > > on while being "headless"?
> > > > Can the tasks pods change/update state in the database while being
> > > > "headless"?
> > > > Will the UI/Scheduler still be aware of the tasks (pods) once they
> are
> > > > live again?
> > > >
> > > > Is there anything else the might cause issues when deploying while
> > tasks
> > > > (pods) are running that I'm not thinking of here?
> > > >
> > > > Kyle Hamlin
> > > >
> > >
> > >
> > > --
> > > Kyle Hamlin
> > >
> >
>


-- 
Kyle Hamlin


Re: Will redeploying webserver and scheduler in Kubernetes cluster kill running tasks

2018-08-30 Thread Kyle Hamlin
gentle bump

On Wed, Aug 22, 2018 at 5:12 PM Kyle Hamlin  wrote:

> I'm about to make the switch to Kubernetes with Airflow, but am wondering
> what happens when my CI/CD pipeline redeploys the webserver and scheduler
> and there are still long-running tasks (pods). My intuition is that since
> the database hold all state and the tasks are in charge of updating their
> own state, and the UI only renders what it sees in the database that this
> is not so much of a problem. To be sure, however, here are my questions:
>
> Will task pods continue to run?
> Can task pods continue to poll the external system they are running tasks
> on while being "headless"?
> Can the tasks pods change/update state in the database while being
> "headless"?
> Will the UI/Scheduler still be aware of the tasks (pods) once they are
> live again?
>
> Is there anything else the might cause issues when deploying while tasks
> (pods) are running that I'm not thinking of here?
>
> Kyle Hamlin
>


-- 
Kyle Hamlin


Will redeploying webserver and scheduler in Kubernetes cluster kill running tasks

2018-08-22 Thread Kyle Hamlin
I'm about to make the switch to Kubernetes with Airflow, but am wondering
what happens when my CI/CD pipeline redeploys the webserver and scheduler
while there are still long-running tasks (pods). My intuition is that since
the database holds all state, the tasks are in charge of updating their own
state, and the UI only renders what it sees in the database, this is not so
much of a problem. To be sure, however, here are my questions:

Will task pods continue to run?
Can task pods continue to poll the external system they are running tasks
on while being "headless"?
Can the task pods change/update state in the database while being
"headless"?
Will the UI/Scheduler still be aware of the tasks (pods) once they are live
again?

Is there anything else that might cause issues when deploying while tasks
(pods) are running that I'm not thinking of here?

Kyle Hamlin


Re: SparkSubmitOperator with Azure HDInsight

2018-06-22 Thread Kyle Hamlin
I haven't used any Azure products, but I did build a Livy hook and operator
so I could submit concurrent Spark jobs to EMR clusters. I was planning on
contributing the code, but it's kind of a pain haha. If you're interested, I
can take another stab at getting the Livy hook and operator contributed.
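
At its core it's just a couple of REST calls against Livy's batches API,
something like the bare-bones sketch below (not the actual hook code; the
host and the application file path are placeholders):

import time

import requests

LIVY_URL = "http://livy.example.com:8998"  # placeholder endpoint


def submit_and_wait(application_file, poll_seconds=30):
    """POST a batch to Livy, then poll its state until it finishes."""
    resp = requests.post(
        LIVY_URL + "/batches",
        json={"file": application_file},
        headers={"Content-Type": "application/json"},
    )
    resp.raise_for_status()
    batch_id = resp.json()["id"]

    while True:
        state = requests.get(
            "{}/batches/{}/state".format(LIVY_URL, batch_id)
        ).json()["state"]
        if state in ("success", "dead", "killed"):
            return state
        time.sleep(poll_seconds)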

On Fri, Jun 22, 2018 at 8:18 AM Naik Kaxil  wrote:

> Hi all,
>
>
>
> Has anyone used the SparkSubmitOperator to submit Spark jobs on Azure
> HDInsight cluster? Are you using Livy or spark-submit to run remote Spark
> jobs?
>
>
>
> Regards,
>
> Kaxil
>
>
> Kaxil Naik
>
> Data Reply
> 2nd Floor, Nova South
> 160 Victoria Street, Westminster
> London SW1E 5LB - UK
> phone: +44 (0)20 7730 6000
> k.n...@reply.com
> www.reply.com
>
>
-- 
Kyle Hamlin


Re: Airflow cli to remote host

2018-05-23 Thread Kyle Hamlin
I'd suggest using something like Fabric <http://www.fabfile.org/> for this.
This is how I accomplish the same task.
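
Something along these lines works for me (a Fabric 1.x sketch; the host name
is a placeholder, and it assumes the airflow CLI is on the remote PATH):

from fabric.api import env, run, task

env.hosts = ["airflow-prod.example.com"]  # placeholder production host


@task
def backfill(dag_id, start_date):
    # runs the backfill on the remote box over SSH
    run("airflow backfill {} -s {}".format(dag_id, start_date))

From there it's just: fab backfill:job_name,2018-05-01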

On Wed, May 23, 2018 at 2:19 PM Frank Maritato <fmarit...@opentable.com>
wrote:

> Hi All,
>
> I need to be able to run backfill for my jobs against our production
> airflow server. Is there a way to run
>
> airflow backfill job_name -s 2018-05-01
>
> against a remote server? I didn’t see a -h option to specify a hostname.
>
> If not, is there a way through the ui to do this? I'd rather not have to
> ssh into the production server to run these jobs.
>
> Thanks!
> --
> Frank Maritato
>
>

-- 
Kyle Hamlin


Re: Use KubernetesExecutor to launch tasks into a Dask cluster in Kubernetes

2018-04-29 Thread Kyle Hamlin
Hi Fokko,

So it's always been my intention to use the KubernetesExecutor. What I'm
trying to figure out is how to pair the KubernetesExecutor with a Dask
cluster, since Dask clusters have many optimizations for ML-type tasks.

On Sat, Apr 28, 2018 at 2:29 PM Driesprong, Fokko <fo...@driesprong.frl>
wrote:

> Also one of the main benefits of the Kubernetes Executor is having a Docker
> image that contains all the dependencies that you need for your job.
> Personally I would switch to Kubernetes when it leaves the experimental
> stage.
>
> Cheers, Fokko
>
> 2018-04-28 16:27 GMT+02:00 Kyle Hamlin <hamlin...@gmail.com>:
>
> > I don't have a Dask cluster yet, but I'm interested in taking advantage
> of
> > it for ML tasks. My use case would be bursting a lot of ML jobs into a
> > Dask cluster all at once.
> > From what I understand, Dask clusters utilize caching to help speed up
> jobs
> > so I don't know if it makes sense to launch a Dask cluster for every
> single
> > ML job. Conceivably, I could just have a single Dask worker running 24/7
> > and when its time to burst k8 could autoscale the Dask workers as more ML
> > jobs are launched into the Dask cluster?
> >
> > On Fri, Apr 27, 2018 at 10:35 PM Daniel Imberman <
> > daniel.imber...@gmail.com>
> > wrote:
> >
> > > Hi Kyle,
> > >
> > > So you have a static Dask cluster running your k8s cluster? Is there
> any
> > > reason you wouldn't just launch the Dask cluster for the job you're
> > running
> > > and then tear it down? I feel like with k8s the elasticity is one of
> the
> > > main benefits.
> > >
> > > On Fri, Apr 27, 2018 at 12:32 PM Kyle Hamlin <hamlin...@gmail.com>
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > If I have a Kubernetes cluster running in DCOC and a Dask cluster
> > running
> > > > in that same Kubernetes cluster is it possible/does it makes sense to
> > use
> > > > the KubernetesExecutor to launch tasks into the Dask cluster (these
> are
> > > ML
> > > > jobs with sklearn)? I feel like there is a bit of inception going on
> > here
> > > > in my mind and I just want to make sure a setup like this makes
> sense?
> > > > Thanks in advance for anyone's input!
> > > >
> > >
> >
> >
> > --
> > Kyle Hamlin
> >
>


-- 
Kyle Hamlin


Re: Use KubernetesExecutor to launch tasks into a Dask cluster in Kubernetes

2018-04-28 Thread Kyle Hamlin
I don't have a Dask cluster yet, but I'm interested in taking advantage of
it for ML tasks. My use case would be bursting a lot of ML jobs into a
Dask cluster all at once.
From what I understand, Dask clusters utilize caching to help speed up
jobs, so I don't know if it makes sense to launch a Dask cluster for every
single ML job. Conceivably, I could just have a single Dask worker running
24/7, and when it's time to burst, k8s could autoscale the Dask workers as
more ML jobs are launched into the Dask cluster?

On Fri, Apr 27, 2018 at 10:35 PM Daniel Imberman <daniel.imber...@gmail.com>
wrote:

> Hi Kyle,
>
> So you have a static Dask cluster running your k8s cluster? Is there any
> reason you wouldn't just launch the Dask cluster for the job you're running
> and then tear it down? I feel like with k8s the elasticity is one of the
> main benefits.
>
> On Fri, Apr 27, 2018 at 12:32 PM Kyle Hamlin <hamlin...@gmail.com> wrote:
>
> > Hi all,
> >
> > If I have a Kubernetes cluster running in DCOC and a Dask cluster running
> > in that same Kubernetes cluster is it possible/does it makes sense to use
> > the KubernetesExecutor to launch tasks into the Dask cluster (these are
> ML
> > jobs with sklearn)? I feel like there is a bit of inception going on here
> > in my mind and I just want to make sure a setup like this makes sense?
> > Thanks in advance for anyone's input!
> >
>


-- 
Kyle Hamlin


Use KubernetesExecutor to launch tasks into a Dask cluster in Kubernetes

2018-04-27 Thread Kyle Hamlin
Hi all,

If I have a Kubernetes cluster running in DCOC, and a Dask cluster running
in that same Kubernetes cluster, is it possible/does it make sense to use
the KubernetesExecutor to launch tasks into the Dask cluster (these are ML
jobs with sklearn)? I feel like there is a bit of inception going on here in
my mind, and I just want to make sure a setup like this makes sense. Thanks
in advance for anyone's input!


Bit confused about start_date and schedule_interval related to daily/weekly DAG

2018-04-18 Thread Kyle Hamlin
I'm a bit confused about how the scheduler catches up in relation to
start_date and schedule_interval. I have one DAG that runs hourly:

dag = DAG(
    dag_id='hourly_dag',
    start_date=days_ago(1),
    schedule_interval='@hourly',
    default_args=ARGS)

When I start this DAG fresh it will catch up 24 hours + however many hours
have passed in the current day all the way up to the most recent hour. This
makes sense to me.

Now if I have a daily DAG:

dag = DAG(
    dag_id='daily_dag',
    start_date=days_ago(1),
    schedule_interval='0 5 * * *',
    default_args=ARGS)

Starting this DAG fresh will run yesterday's execution. This is fine since
I use the execution_date (ds_nodash) to have the task be lagged by one day.
What I can't seem to wrap my head around is how I would get this DAG to run
for the current day. I've tried passing in days_ago(0), but the tasks never
seem to start.
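
To make that concrete, here's a stripped-down version of the daily DAG with
a task that just echoes the templated dates it receives (the BashOperator is
only there for illustration):

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='daily_dag_dates',
    start_date=days_ago(1),
    schedule_interval='0 5 * * *')

BashOperator(
    task_id='show_dates',
    # for the run that actually executes today, ds renders as yesterday
    bash_command='echo "execution_date={{ ds }} ds_nodash={{ ds_nodash }}"',
    dag=dag)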

In addition to all that, I have a weekly DAG that must also use the
execution_date, but it needs the current week's execution_date.

*How do I get a DAG that is not hourly to have an execution_date of the
current day or week?*


Trouble with remote s3 logging

2018-04-16 Thread Kyle Hamlin
This morning I tried to upgrade to the newer version of the logging config
file, but I keep getting the following TypeError for my database session. I
know my credentials are correct, so I'm confused why this is happening now.

Has anyone experienced this? Note that I'm installing Airflow from master.

*Config*
AIRFLOW__CORE__LOGGING_LEVEL=WARN
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_logger
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://airflow/logs
AIRFLOW__CORE__LOGGING_CONFIG_CLASS=config.log_config.DEFAULT_LOGGING_CONFIG
AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=s3://airflow/logs

*Session NoneType error*
 Traceback (most recent call last):
   File
"/usr/local/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py",
line 171, in s3_write
 encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
   File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py",
line 274, in load_string
 encrypt=encrypt)
   File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py",
line 313, in load_bytes
 client = self.get_conn()
   File "/usr/local/lib/python3.6/site-packages/airflow/hooks/S3_hook.py",
line 34, in get_conn
 return self.get_client_type('s3')
   File
"/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py",
line 151, in get_client_type
 session, endpoint_url = self._get_credentials(region_name)
   File
"/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/aws_hook.py",
line 97, in _get_credentials
 connection_object = self.get_connection(self.aws_conn_id)
   File
"/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", line
82, in get_connection
 conn = random.choice(cls.get_connections(conn_id))
   File
"/usr/local/lib/python3.6/site-packages/airflow/hooks/base_hook.py", line
77, in get_connections
 conns = cls._get_connections_from_db(conn_id)
   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line
72, in wrapper
 with create_session() as session:
   File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
 return next(self.gen)
   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line
41, in create_session
 session = settings.Session()
 TypeError: 'NoneType' object is not callable

*TypeError must be str not tuple*
 [2018-04-16 18:37:28,200] ERROR in app: Exception on
/admin/airflow/get_logs_with_metadata [GET]
 Traceback (most recent call last):
   File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982,
in wsgi_app
 response = self.full_dispatch_request()
   File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614,
in full_dispatch_request
 rv = self.handle_user_exception(e)
   File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517,
in handle_user_exception
 reraise(exc_type, exc_value, tb)
   File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33,
in reraise
 raise value
   File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612,
in full_dispatch_request
 rv = self.dispatch_request()
   File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598,
in dispatch_request
 return self.view_functions[rule.endpoint](**req.view_args)
   File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line
69, in inner
 return self._run_view(f, *args, **kwargs)
   File "/usr/local/lib/python3.6/site-packages/flask_admin/base.py", line
368, in _run_view
 return fn(self, *args, **kwargs)
   File "/usr/local/lib/python3.6/site-packages/flask_login.py", line 755,
in decorated_view
 return func(*args, **kwargs)
   File "/usr/local/lib/python3.6/site-packages/airflow/www/utils.py", line
269, in wrapper
 return f(*args, **kwargs)
   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line
74, in wrapper
 return func(*args, **kwargs)
   File "/usr/local/lib/python3.6/site-packages/airflow/www/views.py", line
770, in get_logs_with_metadata
 logs, metadatas = handler.read(ti, try_number, metadata=metadata)
   File
"/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py",
line 165, in read
 logs[i] += log
 TypeError: must be str, not tuple


Re: What are the advantages of plugins, not sure I see any?

2018-03-30 Thread Kyle Hamlin
Thanks for the responses! I think my conclusion was similar: they seem good
for redistribution, but if you're only working with operators and hooks and
aren't sharing that code, then it might not make too much sense to use them.

On Fri, Mar 30, 2018 at 4:23 PM Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> The original intent was to use plugins as a way to share sets of objects
> and applications build on top of Airflow.
>
> For instance it'd be possible to ship the things listed bellow as Airflow
> plugin:
>
> * "validate-and-schedule my query" UI
> * a set of ML-related hooks and operators that match a specific workflow
> * a Hive metastore-browser UI
> * drag and drop UI
> * ...
>
> As far as I know it's not a super popular feature. Maybe the scope of
> Airflow is already large enough without having all that stuff sitting on
> top of it. :)
>
> As George pointed out it could also allow to accelerate the release cadence
> of sets of things that are currently inside Airflow. Things like Google
> Cloud-related operators and hooks could ship as a plugin on their own
> release schedule.
>
> Max
>
> On Thu, Mar 29, 2018 at 11:07 PM, Alex Tronchin-James <
> alex.n.ja...@gmail.com> wrote:
>
> > At Netflix we've put our plugin inside the DAGs folder and pointed the
> > config to it there so we can both import directly in DAGs AND update the
> > plugin as we go. This makes it easy to test changes to operators needed
> for
> > ongoing DAG development in the same PR.
> >
> > The two plugin features I've used which don't translate to the direct
> > import approach are custom macros (we provide some internal libraries)
> and
> > UI menu links, which we use for linking local docs describing our
> > deployment and custom operators, server/worker monitoring with atlas, and
> > genie job monitoring.
> >
> > On Thu, Mar 29, 2018 at 4:56 PM George Leslie-Waksman
> > <geo...@cloverhealth.com.invalid> wrote:
> >
> > > It's presumably useful if you want to package your plugins for other
> > people
> > > to use but it seems like everyone just adds those directly to the
> Airflow
> > > codebase these days.
> > >
> > > On Thu, Mar 29, 2018 at 4:27 PM Kyle Hamlin <hamlin...@gmail.com>
> wrote:
> > >
> > > > Yeah so far I have only written hooks and operators so maybe the
> > benefit
> > > > only  kicks in for other airflow abstractions.
> > > >
> > > > > On Mar 29, 2018, at 7:15 PM, George Leslie-Waksman <
> > > > geo...@cloverhealth.com.INVALID> wrote:
> > > > >
> > > > > We also import our operators and sensors directly.
> > > > >
> > > > > However, executors and some other pieces are a little bit harder to
> > > deal
> > > > > with as non-plugins
> > > > >
> > > > >> On Thu, Mar 29, 2018 at 3:56 PM Kyle Hamlin <hamlin...@gmail.com>
> > > > wrote:
> > > > >>
> > > > >> Hello,
> > > > >>
> > > > >> I just got done writing a few plugins, and the process has left me
> > > > >> wondering what the real benefits are? As far as I can tell, it
> makes
> > > > >> testing more difficult since you cannot import from the created
> > > module,
> > > > you
> > > > >> have to import directly from the plugin. Additionally, your code
> > > editor
> > > > >> isn't aware of these new plugin modules since they are created
> when
> > > you
> > > > >> start the app up, this makes it seem like there are errors when
> > there
> > > > >> aren't. Why not just create a lib/ dir with hooks, operators etc..
> > > dirs
> > > > >> inside and be done with it? Very curious what peoples thoughts
> are,
> > > who
> > > > >> knows I could be testing wrong or writing the plugins wrong.
> Thanks
> > in
> > > > >> advance!
> > > > >>
> > > >
> > >
> >
>


Re: What are the advantages of plugins, not sure I see any?

2018-03-29 Thread Kyle Hamlin
Yeah, so far I have only written hooks and operators, so maybe the benefit
only kicks in for other Airflow abstractions.

> On Mar 29, 2018, at 7:15 PM, George Leslie-Waksman 
> <geo...@cloverhealth.com.INVALID> wrote:
> 
> We also import our operators and sensors directly.
> 
> However, executors and some other pieces are a little bit harder to deal
> with as non-plugins
> 
>> On Thu, Mar 29, 2018 at 3:56 PM Kyle Hamlin <hamlin...@gmail.com> wrote:
>> 
>> Hello,
>> 
>> I just got done writing a few plugins, and the process has left me
>> wondering what the real benefits are? As far as I can tell, it makes
>> testing more difficult since you cannot import from the created module, you
>> have to import directly from the plugin. Additionally, your code editor
>> isn't aware of these new plugin modules since they are created when you
>> start the app up, this makes it seem like there are errors when there
>> aren't. Why not just create a lib/ dir with hooks, operators etc.. dirs
>> inside and be done with it? Very curious what peoples thoughts are, who
>> knows I could be testing wrong or writing the plugins wrong. Thanks in
>> advance!
>> 


What are the advantages of plugins, not sure I see any?

2018-03-29 Thread Kyle Hamlin
Hello,

I just got done writing a few plugins, and the process has left me
wondering what the real benefits are. As far as I can tell, it makes testing
more difficult since you cannot import from the created module; you have to
import directly from the plugin. Additionally, your code editor isn't aware
of these new plugin modules since they are created when you start the app
up, which makes it seem like there are errors when there aren't. Why not
just create a lib/ dir with hooks, operators, etc. dirs inside and be done
with it? Very curious what people's thoughts are; who knows, I could be
testing wrong or writing the plugins wrong. Thanks in advance!
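
For concreteness, here's the shape of what I'm comparing (a minimal sketch;
the hook and operator names are made up):

from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin


class MyServiceHook(BaseHook):
    def __init__(self, conn_id='my_service_default'):
        self.conn_id = conn_id


class MyServiceOperator(BaseOperator):
    def execute(self, context):
        self.log.info('running against %s', MyServiceHook().conn_id)


class MyServicePlugin(AirflowPlugin):
    name = 'my_service_plugin'
    hooks = [MyServiceHook]
    operators = [MyServiceOperator]

# as a plugin, DAGs import from the module generated at startup:
#   from airflow.operators.my_service_plugin import MyServiceOperator
# as a plain lib/ package, the editor-friendly import would just be:
#   from lib.operators.my_service import MyServiceOperator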


Re: Submitting 1000+ tasks to airflow programatically

2018-03-22 Thread Kyle Hamlin
@Chris @Taylor
Thank you guys very much for your explanations! Your strategy makes a lot
of sense to me. Generating a DAG for each client, I'm going to have a ton of
DAGs on the front page, but at least that is searchable haha. I'm going to
give this implementation a shot and I'll try to report back with the
outcome.
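
Roughly what I'm planning to try (just a sketch; fetch_client_ids() stands
in for our real API call, and the BashOperator is a placeholder for the
actual model-fitting task):

import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator


def fetch_client_ids():
    # placeholder for the real API call
    return ['client_a', 'client_b']


def get_cached_client_ids(ttl=timedelta(hours=1)):
    # cache the id list in a Variable so the scheduler's frequent DAG-file
    # parses don't hit the API every ~30 seconds
    cache = Variable.get('client_ids', default_var={}, deserialize_json=True)
    if cache and datetime.utcnow().isoformat() < cache['expires']:
        return cache['ids']
    ids = fetch_client_ids()
    Variable.set('client_ids', json.dumps({
        'ids': ids,
        'expires': (datetime.utcnow() + ttl).isoformat(),
    }))
    return ids


def create_client_dag(client_id):
    dag = DAG(
        dag_id='fit_model_{}'.format(client_id),
        start_date=datetime(2018, 3, 1),
        schedule_interval='@daily')
    BashOperator(
        task_id='fit_model',
        bash_command='echo fitting model for {}'.format(client_id),
        dag=dag)
    return dag


# one small DAG per client, hoisted into the global namespace so the
# scheduler picks each of them up as a separate DAG
for client_id in get_cached_client_ids():
    globals()['fit_model_{}'.format(client_id)] = create_client_dag(client_id)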

Can anyone comment on future work to support data science workflows like
these, or is Airflow fundamentally the wrong tool?

On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston <tedmis...@gmail.com>
wrote:

> We're not using SubDagOperator.  Our approach is using 1 DAG file to
> generate a separate DAG class instance for each similar config, which gets
> hoisted into global namespace.  In simplified pseudo-Python, it looks like:
>
> # sources --> {'configs': [{...}, {...}], 'expire': ''}
> cache = Variable.get('sources', default_var={}, deserialize_json=True)
> sources = fetch_configs() if is_empty(cache) or is_expired(cache) else
> cache['configs']
> for source in sources:
>   dag = DAG(...)
>   globals()[source._id] = dag
>   # ...create tasks and set dependencies for each DAG (some config pulled
> from source object for each)...
>
> We added the cache part for the same reason you pointed out, because the
> DAG processing loop was hitting the API a lot.  Btw, you can also turn down
> how much the processing loop runs with scheduler_heartbeat_sec under the
> scheduler group in config.
>
> We also considered the route Chris mentioned of updating cache via a
> separate DAG but weren't crazy about having a DAG scheduled once per
> minute.
>
> *Taylor Edmiston*
> TEdmiston.com <https://www.tedmiston.com/> | Blog
> <http://blog.tedmiston.com>
> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor>
>
>
> On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <dcapw...@gmail.com> wrote:
>
> > For us we compile down to Python rather than do the logic in Python, that
> > makes it so the load doesn't do real work.
> >
> > We have our own DSL that is just a simplified compiler; parse, analyze,
> > optimize, code gen.  In code gen we just generate the Python code.  Our
> > build then packages it up and have airflow fetch it (very hacky fetch
> right
> > now)
> >
> > This does make it so loading is simple and fast, but means you can't use
> > the Python api directly
> >
> > On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <andrewm4...@gmail.com>
> > wrote:
> >
> > > I've had similar issues with large dags being slow to render on ui and
> > > crashing chrome.
> > >
> > > I got around it by changing the default tree view from 25 to just 5.
> > >
> > > Involves a couple changes to source files though, would be great if
> some
> > of
> > > the ui defaults could go into airflow.cfg.
> > >
> > > https://stackoverflow.com/a/48665734/1919374
> > >
> > > On Thu, 22 Mar 2018, 01:26 Chris Fei, <cfe...@gmail.com> wrote:
> > >
> > > > @Kyle, I do something similar and have run into the problems you've
> > > > mentioned. In my case, I access data from S3 and then generate
> separate
> > > > DAGs (of different structures) based on the data that's pulled. I've
> > > > also found that the UI for accessing a single large DAG is slow so I
> > > > prefer to keep many separate DAGs. What I'd try is to define a DAG
> > > > that's responsible for accessing your API and caching the client IDs
> > > > somewhere locally, maybe just to a file on disk or as an Airflow
> > > > Variable. You can run this DAG on whatever schedule is appropriate
> for
> > > > you. From there, build a function that creates a DAG and then for
> each
> > > > client ID, register a DAG built by that function to the global
> context.
> > > > Like this:
> > > > def create_client_dag(client_id):
> > > > # build dag here
> > > >
> > > > def get_client_ids_locally():
> > > > # access the data that was pulled from the API
> > > >
> > > > client_ids = get_client_ids_locally()
> > > > for client in client_ids:
> > > > dag = create_client_dag(client)
> > > > globals()[dag.dag_id] = dag
> > > >
> > > > This approach also handles removing client IDs somewhat gracefully.
> > DAGs
> > > > for removed clients will still appear in the UI (you can build a
> > > > maintenance DA

Re: Submitting 1000+ tasks to airflow programatically

2018-03-21 Thread Kyle Hamlin
Thanks for all the responses; let me try to address the main themes.

@Ace @Nicholas @Taylor
I originally started with a loop over my list of client ids and created a
SparkSubmitOperator for each client. The pseudocode would look something
like this:

dag = DAG(...)

client_ids = get_client_ids()
for client_id in client_ids:
    SparkSubmitOperator(
        ...
        dag=dag
    )

I found this approach kind of clunky for a few reasons. First, the
get_client_ids() function was hitting our API every time the DAG was parsed
by the scheduler, which seemed excessive (every 30 seconds or so?). Second,
it seemed like a single task failure marked the whole DAG as a failure,
though I guess retrying until the task worked could solve this? Third, the
UI gets really clunky and slow, basically unusable when it tries to render
the graph view for that many tasks. Finally, Airflow doesn't seem very happy
when client_ids are removed, i.e. when get_client_ids() no longer returns a
specific client_id; it really seems to want a static DAG.

Do I really have to poll an API or database every 30 seconds for this
dynamic client_id data?

@Ace
I have been limiting concurrency so as to not blast the cluster

@Nicholas
Thank you for the noise suggestion; I will definitely implement that if I
continue with the same methodology.

@Taylor
Are you using a SubDagOperator? Or is your process similar to the
pseudocode I wrote above?


On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston <tedmis...@gmail.com> wrote:

> We also use a similar approach to generate dynamic DAGs based on a common
> template DAG file.  We pull in the list of config objects, one per DAG,
> from an internal API lightly wrapping the database, then we cache that
> response in a Airflow Variable that gets updated once a minute.  The
> dynamic DAGs are generated from that variable.
>
> *Taylor Edmiston*
> TEdmiston.com <https://www.tedmiston.com/> | Blog
> <http://blog.tedmiston.com>
> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor>
>
>
> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> nicolas.ki...@weightwatchers.com> wrote:
>
> > Kyle,
> >
> > We have a similar approach but on a much, much smaller scale. We now have
> > <100 “things to process” but expect it to grow to under ~200.  Each
> “thing
> > to process” has the same workflow so we have a single DAG definition that
> > does about 20 tasks per, then we loop over the list of items and produce
> a
> > dag object for each one adding it to the global definition.
> >
> > One of the things we quickly ran into was crushing the scheduler as
> > everything was running with the same start time.  To get around this we
> add
> > noise to the start time minute and seconds. Simply index % 60.  This
> > spreads out the load so that the scheduler isn’t trying to run everything
> > at the exact same moment.  I would suggest if you do go this route, to
> also
> > stagger your hours if you can because of how many you plan to run.
> Perhaps
> > your DAGs are smaller and aren’t as CPU intensive as ours.
> >
> > On 3/21/18, 1:35 PM, "Kyle Hamlin" <hamlin...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'm currently using Airflow for some ETL tasks where I submit a spark
> > job
> > to a cluster and poll till it is complete. This workflow is nice
> > because it
> > is typically a single Dag. I'm now starting to do more machine
> learning
> > tasks and need to build a model per client which is 1000+ clients. My
> > spark cluster is capable of handling this workload, however, it
> doesn't
> > seem scalable to write 1000+ dags to fit models for each client. I
> want
> > each client to have its own task instance so it can be retried if it
> > fails without having to run all 1000+ tasks over again. How do I
> handle
> > this type of workflow in Airflow?
> >
> >
> >
>


Submitting 1000+ tasks to airflow programatically

2018-03-21 Thread Kyle Hamlin
Hello,

I'm currently using Airflow for some ETL tasks where I submit a Spark job
to a cluster and poll till it is complete. This workflow is nice because it
is typically a single DAG. I'm now starting to do more machine learning
tasks and need to build a model per client, which means 1000+ clients. My
Spark cluster is capable of handling this workload; however, it doesn't seem
scalable to write 1000+ DAGs to fit a model for each client. I want each
client to have its own task instance so it can be retried if it fails,
without having to run all 1000+ tasks over again. How do I handle this type
of workflow in Airflow?