Re: Xcom_push is broken - probably after the timezone awareness

2017-12-06 Thread Boris Tyukin
:) you rock as always Bolke!!

On Wed, Dec 6, 2017 at 11:17 AM, Bolke de Bruin  wrote:

> 1. Naive dates are supported for initialization of DAGs and will be, after
> the patch of Sumit, for directly instantiated TaskInstances. XCom can store
> arbitrary information so it is not affected.
> 2. Any arithmetic you do on dates will need to happen with a timezone
> associated with it. This is rare; you normally do date + timedelta. However, if
> you want to compare dates you will need to make them timezone aware.
> 3. Timezone support will not be in 1.9.0
> 4. Logging.info will be supported in 1.9.0 final
>
>
> So no reason to panic I would say ;-)
>
> Bolke
>
> Verstuurd vanaf mijn iPad
>
> > Op 6 dec. 2017 om 16:35 heeft Boris Tyukin  het
> volgende geschreven:
> >
> > hm...can you elaborate why we cannot use "from datetime import
> datetime"?
> > it worked before just fine :) i was under the impression that any pickle-
> > serializable type can be used with xcom.
> >
> > Along with a recent change in logs (logging.info does not write to log
> > files anymore), this makes upgrade to 1.9 quite difficult.
> >
> >> On Wed, Dec 6, 2017 at 9:19 AM, Bolke de Bruin 
> wrote:
> >>
> >> You should not use a naive datetime, as the error says. So instead of
> >> using “from datetime import datetime” use “from airflow.utils.timezone
> >> import datetime”. This sets the required timezone information to the
> >> default configured in airflow.cfg.
> >>
> >> A patch that sets the default timezone for TaskInstance is also helpful in
> >> this case and will be welcomed. You can find how to do it in the DAG class.
> >>
> >> Bolke.
> >>
> >>> On 6 Dec 2017, at 15:13, Sumit Maheshwari 
> >> wrote:
> >>>
> >>> Folks,
> >>>
> >>> I've some tests, which used to pass earlier, but recently they started
> >> failing. When I looked closely it appeared that xcom_push is behaving
> >> differently.
> >>>
> >>>
> >>> So this is the current code, which throws the error mentioned below
> >>> DEFAULT_DATE = datetime(2017, 1, 1)
> >>> ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
> >>> ti.xcom_push('abcd', 12345)
> >>>
> >>>  File "build/bdist.macosx-10.11-intel/egg/sqlalchemy_utc.py", line 31,
> >> in process_bind_param
> >>>raise ValueError('naive datetime is disallowed')
> >>> StatementError: (exceptions.ValueError) naive datetime is disallowed
> >> [SQL: u'DELETE FROM xcom WHERE xcom."key" = ? AND xcom.execution_date =
> ?
> >> AND xcom.task_id = ? AND xcom.dag_id = ?'] [parameters: [{}]]
> >>>
> >>> When I changed the execution date param to a string, it fails with
> >> another error.
> >>> ti = TaskInstance(task=task, execution_date=DEFAULT_DATE.isoformat())
> >>> ti.xcom_push('abcd', 12345)
> >>>
> >>>  File "build/bdist.macosx-10.11-intel/egg/sqlalchemy_utc.py", line 29,
> >> in process_bind_param
> >>>repr(value))
> >>> StatementError: (exceptions.TypeError) expected datetime.datetime, not
> >> '2017-01-01T00:00:00' [SQL: u'DELETE FROM xcom WHERE xcom."key" = ? AND
> >> xcom.execution_date = ? AND xcom.task_id = ? AND xcom.dag_id = ?']
> >> [parameters: [{}]]
> >>>
> >>> Somehow I feel that it is linked with sqlalchemy_utc library, though I
> >> may be wrong and it's just missing some timezone-related settings.
> >>> Please let me know if anyone else has faced this issue as well.
> >>>
> >>> Thanks,
> >>> Sumit
> >>>
> >>>
> >>
> >>
>
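A minimal sketch of the fix discussed above, for reference; it assumes the timezone-aware helper in airflow.utils.timezone that Bolke mentions, and `task` is the operator instance from Sumit's test (not defined here):

# Sketch only: use an aware datetime so sqlalchemy_utc accepts it.
# airflow.utils.timezone.datetime applies the default timezone from airflow.cfg.
from airflow.utils.timezone import datetime  # instead of: from datetime import datetime
from airflow.models import TaskInstance

DEFAULT_DATE = datetime(2017, 1, 1)  # timezone-aware, not naive
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.xcom_push('abcd', 12345)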


Re: Xcom_push is broken - probably after the timezone awareness

2017-12-06 Thread Boris Tyukin
hm...can you elaborate why we cannot use "from datetime import datetime"?
it worked before just fine :) i was under the impression that any pickle-
serializable type can be used with xcom.

Along with a recent change in logs (logging.info does not write to log
files anymore), this makes upgrade to 1.9 quite difficult.

On Wed, Dec 6, 2017 at 9:19 AM, Bolke de Bruin  wrote:

> You should not use a naive datetime, as the error says. So instead of
> using “from datetime import datetime” use “from airflow.utils.timezone
> import datetime”. This sets the required timezone information to the
> default configured in airflow.cfg.
>
> A patch that sets the default timezone for TaskInstance is also helpful in
> this case and will be welcomed. You can find how to do it in the DAG class.
>
> Bolke.
>
> > On 6 Dec 2017, at 15:13, Sumit Maheshwari 
> wrote:
> >
> > Folks,
> >
> > I've some tests, which used to pass earlier, but recently they started
> failing. When I looked closely it appeared that xcom_push is behaving
> differently.
> >
> >
> > So this is the current code, which throws the error mentioned below
> > DEFAULT_DATE = datetime(2017, 1, 1)
> > ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
> > ti.xcom_push('abcd', 12345)
> >
> >   File "build/bdist.macosx-10.11-intel/egg/sqlalchemy_utc.py", line 31,
> in process_bind_param
> > raise ValueError('naive datetime is disallowed')
> > StatementError: (exceptions.ValueError) naive datetime is disallowed
> [SQL: u'DELETE FROM xcom WHERE xcom."key" = ? AND xcom.execution_date = ?
> AND xcom.task_id = ? AND xcom.dag_id = ?'] [parameters: [{}]]
> >
> > When I changed the execution date param to a string, it fails with another
> error.
> > ti = TaskInstance(task=task, execution_date=DEFAULT_DATE.isoformat())
> > ti.xcom_push('abcd', 12345)
> >
> >   File "build/bdist.macosx-10.11-intel/egg/sqlalchemy_utc.py", line 29,
> in process_bind_param
> > repr(value))
> > StatementError: (exceptions.TypeError) expected datetime.datetime, not
> '2017-01-01T00:00:00' [SQL: u'DELETE FROM xcom WHERE xcom."key" = ? AND
> xcom.execution_date = ? AND xcom.task_id = ? AND xcom.dag_id = ?']
> [parameters: [{}]]
> >
> > Somehow I feel that it is linked with sqlalchemy_utc library, though I
> may be wrong and it's just missing some timezone-related settings.
> > Please let me know if anyone else has faced this issue as well.
> >
> > Thanks,
> > Sumit
> >
> >
>
>


Re: DAG logging

2017-10-31 Thread Boris Tyukin
Chris,

see my post "new logging" - apparently we cannot use logging any more and
have to init log handler.

On Tue, Oct 31, 2017 at 1:54 PM, Chris Riccomini 
wrote:

> Correction:
>
> import logging
>
> class DqRowCheckOperator(BaseOperator):
>   ...
>   def execute(...):
> logging.info('foo')
>   ...
>
> It's an operator that we're using. The 'foo' doesn't show up in the logs in
> the UI or file.
>
> On Tue, Oct 31, 2017 at 10:47 AM, Chris Riccomini 
> wrote:
>
> > Hey all,
> >
> > Just noticed when we upgraded to 1.9.0 that logging from our custom
> > operators are no longer visible in the file. Assuming this is due to all
> > the log changes that were made in 1.9.0.
> >
> > Our custom operators just have:
> >
> > import logging
> >
> > class DbDagBuilder(object):
> >   ...
> >   logging.info('foo')
> >   ...
> >
> > This was working fine in 1.8.2. What is the suggested way to make this
> > work?
> >
> > Cheers,
> > Chris
> >
>


Re: new logging

2017-10-17 Thread Boris Tyukin
thank you both! so my understanding is that instead of a one-liner like
logging.info("something")

I need to init a logger first in a python operator and only after that I can
invoke logger.info like below. This is a bit unexpected and a bit more work
but it does work now - thanks. Maybe I was not doing it right in the first
place but hey it did work in 1.8 :)

logger = logging.getLogger('airflow.python_test_logging')
logger.info('hi from loggin info')

On Tue, Oct 17, 2017 at 6:35 AM, Driesprong, Fokko 
wrote:

> Hi Boris,
>
> This is interesting.
>
> When I run the example you provide with the latest Airflow release, I get:
> root@9e3cf03c0544:~# airflow test python_test_logging print_the_context
> 2017-01-01
> [2017-10-17 10:27:38,321] {__init__.py:45} INFO - Using executor
> SequentialExecutor
> [2017-10-17 10:27:38,359] {models.py:186} INFO - Filling up the DagBag from
> /root/airflow/dags
> [2017-10-17 10:27:38,406] {dag.py:30} WARNING - test warn
> [2017-10-17 10:27:38,406] {dag.py:31} INFO - test info
> [2017-10-17 10:27:38,429] {models.py:1165} INFO - Dependencies all met for
>  [None]>
> [2017-10-17 10:27:38,432] {models.py:1165} INFO - Dependencies all met for
>  [None]>
> [2017-10-17 10:27:38,432] {models.py:1375} INFO -
> 
> 
> Starting attempt 1 of 1
> 
> 
>
> [2017-10-17 10:27:38,433] {models.py:1396} INFO - Executing
>  on 2017-01-01 00:00:00
> {'END_DATE': '2017-01-01',
>  'conf':  '/usr/local/lib/python3.6/site-packages/airflow/configuration.py'>,
>  'dag': ,
>  'dag_run': None,
>  'ds_nodash': '20170101',
>  'end_date': '2017-01-01',
>  'execution_date': datetime.datetime(2017, 1, 1, 0, 0),
>  'latest_date': '2017-01-01',
>  'macros':  '/usr/local/lib/python3.6/site-packages/airflow/macros/__init__.py'>,
>  'next_execution_date': None,
>  'params': {},
>  'prev_execution_date': None,
>  'run_id': None,
>  'tables': None,
>  'task': ,
>  'task_instance':  2017-01-01 00:00:00 [None]>,
>  'task_instance_key_str':
> 'python_test_logging__print_the_context__20170101',
>  'templates_dict': None,
>  'test_mode': True,
>  'ti':  00:00:00 [None]>,
>  'tomorrow_ds': '2017-01-02',
>  'tomorrow_ds_nodash': '20170102',
>  'ts': '2017-01-01T00:00:00',
>  'ts_nodash': '20170101T00',
>  'var': {'json': None, 'value': None},
>  'yesterday_ds': '2016-12-31',
>  'yesterday_ds_nodash': '20161231'}
> hi from print
> [2017-10-17 10:27:38,441] {python_operator.py:90} INFO - Done. Returned
> value was: None
>
> When I change it to `warn`, I do get:
> WARNING:root:hi from loggin info
>
> By giving an explicit logger, as Daniel suggested, that starts with
> airflow%:
> def print_context(ds, **kwargs):
>     pprint(kwargs)
>     print('hi from print')
>     logger = logging.getLogger('airflow.python_test_logging')
>     logger.info('hi from loggin info')
>
> Then it is picked up by the Airflow logging context:
> [2017-10-17 10:31:05,639] {dag.py:23} INFO - hi from loggin info
>
> This is how the current logging is set:
> https://github.com/apache/incubator-airflow/blob/master/
> airflow/config_templates/airflow_local_settings.py
>
> Currently I'm investigating why it used to work in Airflow 1.8. This is not
> yet clear to me.
>
> Cheers, Fokko
>
>
> 2017-10-17 7:58 GMT+02:00 Daniel Lamblin [Data Science & Platform Center] <
> lamb...@coupang.com>:
>
> > Boris, I don't see where you configured the default logger
> > <https://docs.python.org/2/howto/logging.html#configuring-logging>. I
> > think
> > you'd have to at least throw in:
> > logger = logging.getLogger('python_test_logging')
> > and then use the logger.info(…) instead of logging.info(…)
> >
> > On Tue, Oct 17, 2017 at 3:00 AM, Boris Tyukin 
> > wrote:
> >
> > > sorry i was not clear.
> > >
> > > In DAG definition file, I would normally import logging and when use
> with
> > > python operator like below (see print_context).
> > >
> > > In 1.8.2 i would see in Airflow log file (for that task) both 'hi from
> > > print' and 'hi from logging'.

Re: Return results optionally from spark_sql_hook

2017-10-16 Thread Boris Tyukin
great, this is what I expected to hear but wanted to double check. thanks
for all your help, Fokko

On Mon, Oct 16, 2017 at 1:08 PM, Driesprong, Fokko 
wrote:

> Hi Boris,
>
> When kicking off Spark jobs using Airflow, cluster mode is highly
> recommended since the workload of the driver is on the Hadoop cluster, and
> not on the Airflow machine itself. Personally I prefer the spark-submit
> operator since it will pull all the connection variables directly from
> Airflow, and you'll end up with a central place (Airflow connections) where
> all the configuration is kept. Otherwise you'll end up with configuration
> within your Airflow logic.
>
> Cheers, Fokko
>
> 2017-10-15 17:16 GMT+02:00 Boris :
>
> > Thanks Fokko. Do you know if it is better to use pyspark directly within
> > python operator or invoke submit-job instead? My understanding in both
> > cases airflow uses yarn-client deployment mode, not yarn-cluster and
> spark
> > driver always runs on the same node with airflow worker. Not sure it is
> the
> > best practice...
> >
> > On Oct 15, 2017 05:04, "Driesprong, Fokko"  wrote:
> >
> > > Hi Boris,
> > >
> > > Instead of writing it to a file, you can also write it to xcom, this
> will
> > > keep everything inside of Airflow. My personal opinion on this;
> spark-sql
> > > is a bit limited by nature, it only support SQL. If you want to do more
> > > dynamic stuff, you will eventually have to move to spark-submit anyway.
> > >
> > > Cheers, Fokko
> > >
> > > 2017-10-14 14:45 GMT+02:00 Boris :
> > >
> > > > Thanks Fokko, I think it will do it but my concern that in this case
> my
> > > dag
> > > > will initiate two separate spark sessions and it takes about 20
> seconds
> > > in
> > > > our yarn environment to create it. I need to run 600 dags like that
> > every
> > > > morning.
> > > >
> > > > I am thinking now to create a pyspark job that will do insert and
> count
> > > and
> > > > write it to a temp file. Still not ideal... I wish I could just parse
> > > spark
> > > > SQL instead..
> > > >
> > > > On Oct 14, 2017 8:05 AM, "Driesprong, Fokko" 
> > > wrote:
> > > >
> > > > > Hi Boris,
> > > > >
> > > > > That sounds like a nice DAG.
> > > > >
> > > > > This is how I would do it: First run the long running query in a
> > > > spark-sql
> > > > > operator like you have now. Create a python function that builds a
> > > > > SparkSession within Python (using the Spark pyspark api) and
> fetches
> > > the
> > > > > count from the spark partition that you've just created. Create a
> > > > > BranchPythonOperator that will invoke this function, and based on,
> if
> > > the
> > > > > count is ok or not, branch:
> > > > >
> > > > >- If the count is okay, branch downstream and continue with the
> > > normal
> > > > >execution.
> > > > >- If the count is off, terminate and send you and email/slack
> that
> > > the
> > > > >count is not as expected.
> > > > >
> > > > > This will look something like this:
> > > > > [image: inline image 1]
> > > > >
> > > > > Would this solve your problem?
> > > > >
> > > > > Cheers, Fokko
> > > > >
> > > > >
> > > > >
> > > > > 2017-10-14 13:42 GMT+02:00 Boris Tyukin :
> > > > >
> > > > >> Hi Fokko, thanks for your response, really appreciate it!
> > > > >>
> > > > >> Basically in my case I have two Spark SQL queries:
> > > > >>
> > > > >> 1) the first query does INSERT OVERWRITE to a partition and may
> > > > >> take a while
> > > > >> 2) then I run a second query right after it to get count of rows
> of
> > > that
> > > > >> partition.
> > > > >> 3) I need to pass that count back to airflow dag and this count
> will
> > > be
> > > > >> used by the next task in the DAG to make a decision if this
> > partition
> > > > >> should be safely exchanged (using ALTER TABLE EXCHANGE PARTITION)
> > > with a
> > > > >> production table partition.

Re: new logging

2017-10-16 Thread Boris Tyukin
sorry i was not clear.

In the DAG definition file, I would normally import logging and use it with a
python operator like below (see print_context).

In 1.8.2 i would see in Airflow log file (for that task) both 'hi from
print' and 'hi from logging'.

now I can only see 'hi from print'.

I installed from master (pip install), and used newer airflow.cfg file.
Only updated it to use local executor and mysql db. Did not change any
other settings.

So my question is whether it is supposed to work like that, and how do I log now
from python operators in a dag.


from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
import logging
import time
from pprint import pprint

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='python_test_logging', default_args=args,
    schedule_interval=None)

def print_context(ds, **kwargs):
    pprint(kwargs)
    print('hi from print')
    logging.info('hi from loggin info')

run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)


On Mon, Oct 16, 2017 at 1:09 PM, Driesprong, Fokko 
wrote:

> Hi Boris,
>
> Can you please elaborate? Where did the output used to end up? Did you
> provide any custom logging config, or are you using the default config?
> Maybe drop some example code.
>
> Cheers, Fokko
>
> 2017-10-16 19:02 GMT+02:00 Boris :
>
> > Hi guys,
> >
> > I used to do logging.info("somthing") from airflow python operators so i
> > can see output in airflow logs. Worked fine in 1.8.2.
> >
> > Looks like this is no longer the case with the master branch. I did look at
> > updating.md but still not clear how to log custom messages from python
> > operators.
> >
> > please advise.
> >
>


Re: spark sql hook with multiple queries

2017-10-14 Thread Boris Tyukin
Hi Fokko,

looks like you've fixed the issue that was causing it :)
[AIRFLOW-1562] Spark-sql logging contains deadlock

This is exactly what I was seeing - the process would just freeze on the
second query, I guess waiting for the lock on the log file.

Thanks!

On Sat, Oct 14, 2017 at 5:07 AM, Driesprong, Fokko 
wrote:

> Hi Boris,
>
> Interesting. Multiple queries are supported by the spark-sql operator and
> this should work using Airflow. Executing SQL from a file:
>
> Fokkos-MBP:~ fokkodriesprong$ spark-sql --driver-java-options
> "-Dlog4j.configuration=file:///tmp/log4j.properties" -f query.sql
> 1
> Time taken: 1.976 seconds, Fetched 1 row(s)
> 1
> Time taken: 0.034 seconds, Fetched 1 row(s)
>
> Executing SQL from the command-line:
>
> Fokkos-MBP:~ fokkodriesprong$ spark-sql --driver-java-options
> "-Dlog4j.configuration=file:///tmp/log4j.properties" -e "SELECT 1; SELECT
> 1;"
> 1
> Time taken: 1.947 seconds, Fetched 1 row(s)
> 1
> Time taken: 0.032 seconds, Fetched 1 row(s)
>
> Can you share the exception that you are seeing? What version of Spark are
> you using?
>
> Cheers, Fokko
>
>
>
>
>
>
>
> 2017-10-11 18:01 GMT+02:00 Boris Tyukin :
>
> > hi guys,
> >
> > tried spark_sql_hook to run a multi-statement query (two queries separated
> > by a semicolon) and it hangs forever. If i comment out the second query,
> > it runs fine.
> >
> > Anyone had the same issue? i do not see anything in the code preventing
> > more than one statement.
> >
> > sql = """
> > select * from  ;
> > select * from  ;
> > """
> >
> > spark = SparkSqlHook(sql, conn_id='spark_default', master='yarn',
> > num_executors=4)
> > spark.run_query()
> >
> > Boris
> >
>


Re: Return results optionally from spark_sql_hook

2017-10-14 Thread Boris Tyukin
Hi Fokko, thanks for your response, really appreciate it!

Basically in my case I have two Spark SQL queries:

1) the first query does INSERT OVERWRITE to a partition and may take a
while
2) then I run a second query right after it to get count of rows of that
partition.
3) I need to pass that count back to airflow dag and this count will be
used by the next task in the DAG to make a decision if this partition
should be safely exchanged (using ALTER TABLE EXCHANGE PARTITION) with a
production table partition.

So I need somehow to get that count of rows. My initial thought was to parse
the log and extract that count but looks like even if i do regex it does
not quite work - spark sql writes query output to stdout which airflow
spark sql hook does not capture right now.

if you can suggest a better solution for me it would be great!

Also initially I wanted to count rows and then do ALTER TABLE EXCHANGE
PARTITION in the same pyspark job but I found out that spark does not
support this statement yet and I have to use Hive.
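For reference, a rough sketch of the pattern being discussed - push the count to
XCom from one task and branch on it in the next. The helper get_partition_count,
the task ids, and the assumption that `dag` is already defined are all hypothetical:

# Sketch only: the returned value of a PythonOperator callable is pushed to XCom.
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator

def push_count(**context):
    return get_partition_count()  # hypothetical helper, e.g. a small pyspark job

def decide(**context):
    count = context['ti'].xcom_pull(task_ids='get_count')
    # follow the exchange-partition branch only if the count looks sane
    return 'exchange_partition' if count and count > 0 else 'notify_failure'

get_count = PythonOperator(task_id='get_count', python_callable=push_count,
                           provide_context=True, dag=dag)
check_count = BranchPythonOperator(task_id='check_count', python_callable=decide,
                                   provide_context=True, dag=dag)
check_count.set_upstream(get_count)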

On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko 
wrote:

> Hi Boris,
>
> Thank you for your question and excuse me for the late response, currently
> I'm on holiday.
>
> The solution that you suggest, would not be my preferred choice. Extracting
> results from a log using a regex is expensive in terms of computational
> costs, and error prone. My question is, what are you trying to accomplish?
> For me there are two ways of using the Spark-sql operator:
>
>   1. ETL Using Spark: Instead of returning the results, write the results
>      back to a new table, or a new partition within the table. This data can be
>      used downstream in the dag. Also, this will write the data to hdfs which is
>      nice for persistence.
>   2. Write the data in a simple and widely supported format (such as csv)
>      onto hdfs. Now you can get the data from hdfs using `hdfs dfs -get` to your
>      local file-system. Or use `hdfs dfs -cat ... | application.py` to pipe it
>      to your application directly.
>
> What you are trying to accomplish, looks for me something that would fit
> the spark-submit job, where you can submit pyspark applications where you
> can directly fetch the results from Spark:
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
>       /_/
>
> Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
> SparkSession available as 'spark'.
> >>> spark.sql("SELECT 1 as count").first()
> Row(count=1)
>
> Most of the time we use the Spark-sql to transform the data, then use sqoop
> to get the data from hdfs to a rdbms to expose the data to the business.
> These examples are for Spark using hdfs, but for s3 it is somewhat the
> same.
>
> Does this answer your question, if not, could you elaborate the problem
> that you are facing?
>
> Ciao, Fokko
>
>
>
>
> 2017-10-13 15:54 GMT+02:00 Boris :
>
> > hi guys,
> >
> > I opened JIRA on this and will be working on PR
> > https://issues.apache.org/jira/browse/AIRFLOW-1713
> >
> > any objections/suggestions conceptually?
> >
> > Fokko, I see you have been actively contributing to spark hooks and
> > operators so I could use your opinion before I implement this.
> >
> > Boris
> >
>


spark sql hook with multiple queries

2017-10-11 Thread Boris Tyukin
hi guys,

tried spark_sql_hook to run a multi-statement query (two queries separated
by a semicolon) and it hangs forever. If i comment out the second query,
it runs fine.

Anyone had the same issue? i do not see anything in the code preventing
more than one statement.

sql = """
select * from  ;
select * from  ;
"""

spark = SparkSqlHook(sql, conn_id='spark_default', master='yarn',
num_executors=4)
spark.run_query()

Boris


Re: Apache Airflow welcome new committer/PMC member : Fokko Driespong (a.k.a. fokko)

2017-10-04 Thread Boris Tyukin
great news!

On Wed, Oct 4, 2017 at 6:03 PM, Arthur Wiedmer 
wrote:

> Welcome Fokko!
>
> On Wed, Oct 4, 2017 at 12:51 PM, Sid Anand  wrote:
>
> > Folks,
> > Please join the Apache Airflow PMC in welcoming its newest member and
> > co-committer, Fokko Driesprong (a.k.a. fokko).
> >
> > https://cwiki.apache.org/confluence/display/AIRFLOW/
> > Announcements#Announcements-Oct1,2017
> >
> >
> > -s
> >
>


Re: Some random fun

2017-09-25 Thread Boris Tyukin
this is nerdy!! :) thanks for sharing!

On Mon, Sep 25, 2017 at 3:00 PM, Dan Davydov  wrote:

> Haha, this is great.
>
> On Mon, Sep 25, 2017 at 11:37 AM Shah Altaf  wrote:
>
> > **CupcakeSensor activated**
> >
> >
> >
> > On Mon, Sep 25, 2017 at 7:31 PM Laura Lorenz 
> > wrote:
> >
> > > Just thought everyone here would appreciate the nerdy party our data
> team
> > > threw ourselves for completing a milestone on a difficult DAG recently.
> > We
> > > played Pin the Task on the DAG and ate Task State cupcakes: see pics at
> > > https://twitter.com/lalorenz6/status/912383049354096641
> > >
> >
>


Re: Info needed regarding upgrading to 1.8.2

2017-08-18 Thread Boris Tyukin
see how much time it takes to parse your dag, especially if you do any heavy
operations like database calls, file pulls, etc. I remember someone had an
issue with the web server because it took a while to parse one dag. By default
airflow will check the dag folder very frequently (unless you changed the
timing in config). You can start both the scheduler and web server manually and
just watch the logs.
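A quick way to check that is to time how long the DAG file itself takes to
execute; a rough sketch (the path is just an example):

# Rough parse-time check for a single DAG file.
import runpy
import time

start = time.time()
runpy.run_path('/home/airflow/dags/my_dag.py')  # hypothetical path
print('DAG file parsed in {:.2f}s'.format(time.time() - start))

If this takes more than a second or two, the scheduler and webserver will spend
most of their time re-parsing that file.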

On Fri, Aug 18, 2017 at 9:03 AM, Sumit Maheshwari 
wrote:

> Thanks Boris,
>
> Seems like there is some issue in my Scheduler, which sometimes picks up
> dag modifications immediately and sometimes doesn't pick up at all. And when
> the Scheduler doesn't pick up a new dag, the UI also doesn't show it, i.e. no refresh
> button would be available. I am trying to debug that issue with Scheduler,
> but not sure if it's a real issue at all or something else.
>
>
>
> On Fri, Aug 18, 2017 at 6:16 PM, Boris Tyukin 
> wrote:
>
> > we run 1.8.1 with no debug mode. New dags are picked up just fine without
> > restarts, as well as modifications (adding new tasks or changing
> > definitions of new dags). When I want to see dag updated on UI, I hit
> > refresh button but this is just for UI to show it properly. No restarts
> > needed in our experience with 1.8.1
> >
> > On Fri, Aug 18, 2017 at 6:40 AM, Sumit Maheshwari <
> sumeet.ma...@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > We are trying to upgrade to Airflow ver 1.8.2 from 1.7.0 and found a
> > couple
> > > of issues, major ones are:
> > >
> > > 1. We used to run the webserver in debug mode (-d) so we don't need to
> > > restart whenever we add/modify any dag. But with 1.8.2 debug mode
> doesn't
> > > have any effect and we need to restart it after any change in dags.
> > >
> > > 2. Scheduler used to pick up any new dag landed in dags folder, but
> that
> > is
> > > not happening anymore and we required to restart the scheduler as well.
> > >
> > > Any help would be highly appreciated.
> > >
> > > Thanks,
> > > Sumit
> > >
> >
>


Re: Info needed regarding upgrading to 1.8.2

2017-08-18 Thread Boris Tyukin
we run 1.8.1 with no debug mode. New dags are picked up just fine without
restarts, as well as modifications (adding new tasks or changing
definitions of new dags). When I want to see dag updated on UI, I hit
refresh button but this is just for UI to show it properly. No restarts
needed in our experience with 1.8.1

On Fri, Aug 18, 2017 at 6:40 AM, Sumit Maheshwari 
wrote:

> Hi All,
>
> We are trying to upgrade to Airflow ver 1.8.2 from 1.7.0 and found a couple
> of issues, major ones are:
>
> 1. We used to run the webserver in debug mode (-d) so we don't need to
> restart whenever we add/modify any dag. But with 1.8.2 debug mode doesn't
> have any effect and we need to restart it after any change in dags.
>
> 2. Scheduler used to pick up any new dag landed in dags folder, but that is
> not happening anymore and we required to restart the scheduler as well.
>
> Any help would be highly appreciated.
>
> Thanks,
> Sumit
>


Re: Airflow dependency won't change

2017-08-17 Thread Boris Tyukin
hopefully someone chimes in and explains. I just remember I've read to
change the dag name if the schedule needs to be changed - that's why I suggested
it. I've tried to swap the order of tasks, but I often add/delete tasks from
dags as I develop and that seems to be working fine without dag renaming.

Conceptually, while it is okay to generate DAGs dynamically, it is not okay
to change a DAG from time to time - tasks should be pretty much static, as Max
explained in another thread.

On Thu, Aug 17, 2017 at 6:11 PM, Weiwei Zhang  wrote:

> - Boris, You are right. After I change the dag id to something else, the
> dependency holds. I am very curious why I cannot just switch the order
> without needing to change the dag id. Thanks a lot!
>
>
>
> On Wed, Aug 9, 2017 at 10:21 AM, Boris Tyukin 
> wrote:
>
> > Hit refresh button from UI to make sure it shows the proper order before
> > you run. you might also try to restart scheduler.
> >
> > if it does not help, try to rename your dag_id to something like
> mydag_v2.
> >
> > On Wed, Aug 9, 2017 at 12:48 PM, Weiwei Zhang 
> > wrote:
> >
> > > Hi guys,
> > >
> > > I have two tasks in a DAG, t1 and t2. It used to be t2.set_upstream(t1)
> > and
> > > now I want to refactor the logic by setting t1.set_upstream(t2).
> However,
> > > when I try to run this DAG, it either will try to run two tasks
> > > simultaneously or it will try to run t2 first and towards the end, it
> > will
> > > also run t1 before t2 finishes. I am very confused about this behavior.
> > Am
> > > I missing something here? I am using Airflow 1.8.1.
> > >
> > > Thanks,
> > > -Weiwei
> > >
> >
>


Re: Airflow dependency won't change

2017-08-09 Thread Boris Tyukin
Hit the refresh button in the UI to make sure it shows the proper order before
you run. You might also try to restart the scheduler.

If it does not help, try to rename your dag_id to something like mydag_v2.

On Wed, Aug 9, 2017 at 12:48 PM, Weiwei Zhang  wrote:

> Hi guys,
>
> I have two tasks in a DAG, t1 and t2. It used to be t2.set_upstream(t1) and
> now I want to refactor the logic by setting t1.set_upstream(t2). However,
> when I try to run this DAG, it either will try to run two tasks
> simultaneously or it will try to run t2 first and towards the end, it will
> also run t1 before t2 finishes. I am very confused about this behavior. Am
> I missing something here? I am using Airflow 1.8.1.
>
> Thanks,
> -Weiwei
>


Re: Apache airflow usage survey

2017-06-10 Thread Boris Tyukin
you are asking some really good questions, Gerard. If we do not use it in
production yet but are going to, are we eligible to take it? :)
Boris

On Sat, Jun 10, 2017 at 6:40 AM, Gerard Toonstra 
wrote:

> Hi all,
>
> I'm curious how others are using and deploying airflow. Rather than
> inviting people to reply to this email on a dev list, I created a survey
> with 10 questions (all visible on first page):
>
> https://goo.gl/forms/FeSBMfI7O8oe8wZu2
>
> I'm going to close and share the results of that survey in 2 weeks, but on
> completion you can already see the survey responses up to that point in
> time.
>
> Rgds,
>
> G>
>


Re: airflow upgrade from v1.7.1.3 to 1.8

2017-06-09 Thread Boris Tyukin
Max, do you think it would be a good idea to document the update process
under the installation section? I know it is basic commands, but you now have a
lot of users introduced to Python thanks to Airflow! (me!)

I can create jira and PR myself..

Boris

On Thu, Jun 8, 2017 at 6:46 PM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> This should help:
> https://github.com/apache/incubator-airflow/blob/master/UPDATING.md
>
> Maxime
>
> On Thu, Jun 8, 2017 at 3:38 PM, manish ranjan 
> wrote:
>
> > Hello Everyone,
> >
> > I am planning to move our production from 1.7.3 to 1.8. Is there a *NOT To
> > Do/To Do* list one should be aware of? Any help/pointing to links/blogs
> > will be really helpful.
> >
> > ~Manish
> >
>


Re: issue fetching master repo

2017-04-20 Thread Boris Tyukin
thanks Gael, this was it! man I spent 3 hours trying to figure this out!

my .gitconfig has core.autocrlf=input, as recommended by github (the
article you referenced) and by the git documentation. Apparently this was
causing issues with the airflow repo and specifically this file (which I did
not change):

 airflow/www/static/nv.d3.js

This file has mixed line endings (both linux and windows style) and my git
was correcting this when i cloned the repo.

I wonder why other people do not have the same issue, especially
because core.autocrlf=input is normally the way to go.

Should we fix that file? Or maybe add a .gitattributes file to the airflow repo,
as github suggests?
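For what it's worth, a minimal .gitattributes along the lines GitHub suggests
might look like this (just a sketch, not something the repo has today):

# Normalize text files on commit, but leave this vendored JS file's line endings alone
* text=auto
airflow/www/static/nv.d3.js -text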



On Thu, Apr 20, 2017 at 7:57 AM, Gael Magnan  wrote:

> It's probably due to your git settings having to do with line endings.
> The file has mixed line endings; if your git is set up to change the line endings at
> pull (core.autocrlf),
> you might experience that. I suggest reading an article about the subject
> like: https://help.github.com/articles/dealing-with-line-endings/
> Hope this helps.
>
> Regards
> Gael
>
> Le jeu. 20 avr. 2017 à 13:44, Bolke de Bruin  a écrit :
>
> > Hi Boris,
> >
> > To be honest this is not an airflow question, but a git question.
> >
> > If you haven't made any changes to the code, why don’t you delete the test
> > folder and clone again?
> >
> > B.
> >
> > > On 20 Apr 2017, at 13:42, Boris Tyukin  wrote:
> > >
> > > I just did this
> > >
> > > $ git clone g...@github.com:apache/incubator-airflow.git test
> > > $ cd test
> > > $ git status
> > >
> > > and getting this right away -
> > > # On branch master
> > > # Changed but not updated:
> > > #   (use "git add ..." to update what will be committed)
> > > #   (use "git checkout -- ..." to discard changes in working
> > > directory)
> > > #
> > > # modified:   airflow/www/static/nv.d3.js
> > >
> > > but I did not touch that file. I cannot do rebase or commit:
> > >
> > > cannot rebase: you have unstaged changes
> > > D airflow/www/static/nv.d3.js
> > >
> > >
> > > This is really weird, please help
> > >
> > >
> > >
> > >
> > > On Wed, Apr 19, 2017 at 11:19 PM, Boris Tyukin 
> > > wrote:
> > >
> > >> hey guys,
> > >>
> > >> want to submit my first tiny PR and once I fork airflow and clone my
> > repo
> > >> get this message below:
> > >>
> > >> I cannot commit / rebase and I cannot find a way to remove this file.
> Is
> > >> it only me who has this issue?
> > >>
> > >> git status
> > >> # On branch master
> > >> # Changed but not updated:
> > >> #   (use "git add ..." to update what will be committed)
> > >> #   (use "git checkout -- ..." to discard changes in working
> > >> directory)
> > >> #
> > >> # modified:   airflow/www/static/nv.d3.js
> > >>
> > >>
> >
> >
>


Re: issue fetching master repo

2017-04-20 Thread Boris Tyukin
I just did this

$ git clone g...@github.com:apache/incubator-airflow.git test
$ cd test
$ git status

and getting this right away -
# On branch master
# Changed but not updated:
#   (use "git add ..." to update what will be committed)
#   (use "git checkout -- ..." to discard changes in working
directory)
#
# modified:   airflow/www/static/nv.d3.js

but I did not touch that file. I cannot do rebase or commit:

cannot rebase: you have unstaged changes
D airflow/www/static/nv.d3.js


This is really weird, please help




On Wed, Apr 19, 2017 at 11:19 PM, Boris Tyukin 
wrote:

> hey guys,
>
> want to submit my first tiny PR and once I fork airflow and clone my repo
> get this message below:
>
> I cannot commit / rebase and I cannot find a way to remove this file. Is
> it only me who has this issue?
>
> git status
> # On branch master
> # Changed but not updated:
> #   (use "git add ..." to update what will be committed)
> #   (use "git checkout -- ..." to discard changes in working
> directory)
> #
> # modified:   airflow/www/static/nv.d3.js
>
>


issue fetching master repo

2017-04-19 Thread Boris Tyukin
hey guys,

want to submit my first tiny PR and once I fork airflow and clone my repo
I get this message below:

I cannot commit / rebase and I cannot find a way to remove this file. Is it
only me who has this issue?

git status
# On branch master
# Changed but not updated:
#   (use "git add ..." to update what will be committed)
#   (use "git checkout -- ..." to discard changes in working
directory)
#
# modified:   airflow/www/static/nv.d3.js


Re: Cleanup

2017-04-06 Thread Boris Tyukin
another related thing is cleanup of logs, which was discussed a few days
ago. Airflow generates an enormous amount of logs, which I like because it is very
easy to troubleshoot, but one dag with 5 tasks i have been running a few times a
day for a few weeks generated 2Gb of logs! I can probably switch the
logging mode to less detailed, but what i really want is an automatic archiving
capability. For now I can just use another airflow dag to do this cleanup,
but it would be nice to have this feature.
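For reference, a sketch of such a cleanup dag - the log path, retention period,
and schedule are made up:

# Sketch only: delete task log files older than 30 days under the default logs folder.
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='airflow_log_cleanup',
    start_date=datetime(2017, 1, 1),
    schedule_interval='@daily')

cleanup = BashOperator(
    task_id='delete_old_task_logs',
    bash_command='find ~/airflow/logs -type f -mtime +30 -delete',
    dag=dag)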

On Wed, Apr 5, 2017 at 11:23 PM, Vijay Krishna Ramesh <
vijay.krishna.ram...@gmail.com> wrote:

> To add to Siddharth's pretty extensive list (in particular, the "delete a
> DAG from the code that makes up the dag bag folder, but now it shows up
> with a ! icon and you have to manually set it to is_active = f" issue that
> I didn't see in 1.8.0-RC4 but started seeing in 1.8.0-RC5 that became
> 1.8.0) -
>
> how does XCOM data get cleaned up? would be nice to either let tasks
> consume the data (and then it goes away from the backing db, after an ack
> or something) - or at the very least, TTL after a set interval.
>
>
>
> On Wed, Apr 5, 2017 at 7:46 PM, siddharth anand  wrote:
>
> > Edgardo,
> > This is a great question and something that requires functionality to
> > address. As Airflow starts getting used for bigger workloads, we need a
> way
> > to clean up defunct resources.
> >
> >- How do we delete a dag and its related resources?
> >   - Until the recent release, the way that I stopped having a defunct
> >   (retired) dag show up in the UI was to move the DAG file out of the
> >   dag_folder or just deleting it from Git. Our dag folders are
> > just symlinks
> >   to tagged Git repos.
> >   - This no longer works -- the UI will display the dag list based on
> >   entries in the dag table in the airflow metadata db -- but will no
> > longer
> >   have code to back that dag table entry. I currently manually delete
> > a row
> >   from the dag table, but that is surely not the right thing to do.
> >   - How do we retire entries from the *task_instance, job, log,
> xcom,
> >   sla_miss, dag_stats, *and *dag_run* tables for dags that are
> deleted?
> >   (I can surely clean these up manually as well, but we need a UI
> >   control).
> >  -  *task_instance, job, log, &* *dag_run *tables grow faster
> than
> >  the others
> >  - How does one track if variables, connections, or pools are no
> >   longer referenced because all of the DAGs that use them are gone?
> >  - It would be nice here to have reference counts & links to DAGs
> >  that reference a Pool, Connection, or Variable. The reference
> > counts can be
> >  broken down into paused & unpaused.
> >
> > It's time we added some functionality to the API/CLI/UI to address these
> > functionality gaps.
> >
> > -s
> >
> > On Tue, Apr 4, 2017 at 10:25 AM, Edgardo Vega 
> > wrote:
> >
> > > Max,
> > >
> > > Thanks for the reply, it is much appreciated.  I am currently running
> > ~10k
> > > task a day in our test environment.
> > >
> > > It is good to know where the archive point is and that I shouldn't have
> > any
> > > issues for a long time.
> > >
> > > I was just thinking ahead as we get airflow into production
> environment.
> > > Maybe in this case maybe way too far ahead.
> > >
> > >
> > > Cheers,
> > >
> > > Edgardo
> > >
> > > On Tue, Apr 4, 2017 at 11:58 AM, Maxime Beauchemin <
> > > maximebeauche...@gmail.com> wrote:
> > >
> > > > We run ~50k tasks a day at Airbnb. How many tasks/day are you
> planning
> > on
> > > > running?
> > > >
> > > > Though you can archive the `task_instance` and `job` table down the
> > line,
> > > > but that shouldn't be a concern until you hit tens of millions of
> > > entries.
> > > > Then you can setup a daily Airflow job that archives some of these
> > > entries.
> > > > I believe we do it based on `start_date` and move rows to some other
> > > table
> > > > in the same db.
> > > >
> > > > Max
> > > >
> > > > On Mon, Apr 3, 2017 at 5:30 PM, Edgardo Vega  >
> > > > wrote:
> > > >
> > > > > I have been playing with airflow for a few days and it's not
> obvious
> > > what
> > > > > will happen down the road when we have lots of dags over a long
> > period
> > > of
> > > > > time. I set a fake dag to run once a minute for a few days and
> > > everything
> > > > > seems okay except the graph view dropdown which works but take a
> few
> > > > > seconds to show up.
> > > > >
> > > > > Is there a way roll older data out of the system in order to clean
> > > things
> > > > > visually and keep the database at a smallish size?
> > > > >
> > > > > --
> > > > > Cheers,
> > > > >
> > > > > Edgardo
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Cheers,
> > >
> > > Edgardo
> > >
> >
>


Re: Article - Get started developing workflows with Apache Airflow

2017-04-04 Thread Boris Tyukin
Hi Michał, really enjoyed your post - great job! What is the reason to use
virtualenv? Would you use it for a production setup?

On Tue, Apr 4, 2017 at 4:25 PM, Michal K 
wrote:

> Hi everyone,
>
> I would just like to let you know that I wrote an introductory article for
> new users who are getting started with Airflow.
>
> You can find it here: https://bitly.com/2nGRgzj
>
> I would be happy to hear your comments or suggestions.
> Feel free to link to the article if you find it useful.
>
> Thanks for all your work on Airflow,
>
> Michal
>


Re: variable scope with dynamic dags

2017-03-23 Thread Boris Tyukin
well I did more testing today using guppy to measure memory consumption.
Also was watching processes and memory with htop while kicking off dags. My
test python object was defined like that:

payload = [1] * (2 * 10 ** 7) # 152 Mb

As Jeremiah said, the entire python code that generates dags is loaded FOR
every task instance. Actually from airflow logs, it looks like it is
executed twice for some reason during task instance run (i wonder why).

To make things worse, the airflow webserver runs 4 gunicorn processes (by
default) and every gunicorn process runs my dag generator file every 15
seconds or something like that, constantly loading that large structure to
RAM 4 times and offloading it and then starting over after 15 seconds.

My test 150Mb python object will blow up RAM utilization by at least 200
times if I kick off all 200 dags at once, assuming only a single task running
at a time per dag.

So lesson learned - do not use large objects with Airflow!

This is code for test dag generator with a large python object:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from pprint import pprint
import logging
import sys
from guppy import hpy
import time


def test_memory(payload, **kwargs):
    logging.info('test_memory: size of payload {} Mb'.format(
        sys.getsizeof(payload) / 1024 / 1024))
    logging.info(hpy().heap())
    time.sleep(15)


def get_dag(i):
    """
    Returns generated DAG object with tasks
    """

    logging.info('get_dag({}): size of payload {} Mb'.format(
        i, sys.getsizeof(payload) / 1024 / 1024))
    logging.info(hpy().heap())

    dag_schedule_interval = None
    dag_start_date = datetime.now()

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': dag_start_date,
        'email': ['airf...@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'catchup': False
    }

    # Change version number if schedule needs to be changed:
    dag = DAG(
        'memory_tester_{}'.format(i),
        default_args=default_args,
        schedule_interval=dag_schedule_interval,
        max_active_runs=1
    )
    # max_active_runs set to 1, which means only one active DAG run at a time.
    # This works only with the scheduler; it does not prevent someone from
    # running a DAG manually concurrently.

    t1 = PythonOperator(
        task_id="t1",
        python_callable=test_memory,
        provide_context=True,
        op_kwargs={'payload': payload},
        dag=dag
    )

    t2 = PythonOperator(
        task_id="t2",
        python_callable=test_memory,
        provide_context=True,
        op_kwargs={'payload': payload},
        dag=dag
    )

    t2.set_upstream(t1)

    logging.info('end of get_dag({})'.format(i))
    logging.info(hpy().heap())

    return dag


# payload = [1,2,3] * (2 * 10 ** 7) # 457 Mb
payload = [1] * (2 * 10 ** 7)  # 152 Mb

for i in range(3):
    dag = get_dag(i + 1)
    globals()[dag.dag_id] = dag


On Wed, Mar 22, 2017 at 3:50 PM, Boris Tyukin  wrote:

> thanks Jeremiah, this is exactly what was bugging me. I am going to
> rewrite that code and look at persistent storage. your explanation helped,
> thanks!
>
> On Wed, Mar 22, 2017 at 2:29 PM, Jeremiah Lowin  wrote:
>
>> In vanilla Python, your DAGs will all reference the same object, so when
>> your DAG file is parsed and 200 DAGs are created, there will still only be
>> 1 60MB dict object created (I say vanilla because there are obviously ways
>> to create copies of the object).
>>
>> HOWEVER, you should assume that each Airflow TASK is being run in a
>> different process, and each process is going to load your DAG file when it
>> runs. If resource use is a concern, I suggest you look at out-of-core or
>> persistent storage for the object so you don't need to load the whole
>> thing
>> every time.
>>
>> On Wed, Mar 22, 2017 at 11:20 AM Boris Tyukin 
>> wrote:
>>
>> > hi Jeremiah, thanks for the explanation!
>> >
>> > i am very new to Python so was surprised that it works and my external
>> > dictionary object was still accessible to all dags generated. I think it
>> > makes sense but I would like to confirm one thing and I do not know how
>> to
>> > test it myself.
>> >
>> > do you think that large dictionary object will still be loaded to memory
>> > only once even if I generate 200 dags that will be accessing it? 

Re: variable scope with dynamic dags

2017-03-22 Thread Boris Tyukin
thanks Jeremiah, this is exactly what was bugging me. I am going to rewrite
that code and look at persistent storage. your explanation helped, thanks!
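For reference, a sketch of the persistent-storage idea - keep only a path at
module level and load the large dict lazily inside the task callable, so the DAG
file stays cheap to parse (the pickle path and helper names are hypothetical):

# Sketch only: avoid building a 60Mb dict at DAG-parse time; load it on demand.
import pickle

BIG_DICT_PATH = '/data/airflow/my_outer_dict.pkl'  # cheap to keep at module level

def load_big_dict():
    with open(BIG_DICT_PATH, 'rb') as f:
        return pickle.load(f)

def my_task(**kwargs):
    big_dict = load_big_dict()  # loaded only in the worker process running the task
    # ... use big_dict here ...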

On Wed, Mar 22, 2017 at 2:29 PM, Jeremiah Lowin  wrote:

> In vanilla Python, your DAGs will all reference the same object, so when
> your DAG file is parsed and 200 DAGs are created, there will still only be
> 1 60MB dict object created (I say vanilla because there are obviously ways
> to create copies of the object).
>
> HOWEVER, you should assume that each Airflow TASK is being run in a
> different process, and each process is going to load your DAG file when it
> runs. If resource use is a concern, I suggest you look at out-of-core or
> persistent storage for the object so you don't need to load the whole thing
> every time.
>
> On Wed, Mar 22, 2017 at 11:20 AM Boris Tyukin 
> wrote:
>
> > hi Jeremiah, thanks for the explanation!
> >
> > i am very new to Python so was surprised that it works and my external
> > dictionary object was still accessible to all dags generated. I think it
> > makes sense but I would like to confirm one thing and I do not know how
> to
> > test it myself.
> >
> > do you think that large dictionary object will still be loaded to memory
> > only once even if I generate 200 dags that will be accessing it? so
> > basically they will just use a reference to it or they would create a
> copy
> > of the same 60Mb structure.
> >
> > I hope my question makes sense :)
> >
> > On Wed, Mar 22, 2017 at 10:54 AM, Jeremiah Lowin 
> > wrote:
> >
> > > At the risk of oversimplifying things, your DAG definition file is
> loaded
> > > *every* time a DAG (or any task in that DAG) is run. Think of it as a
> > > literal Python import of your dag-defining module: any variables are
> > loaded
> > > along with the DAGs, which are then executed. That's why your dict is
> > > always available. This will work with Celery since it follows the same
> > > approach, parsing your DAG file to run each task.
> > >
> > > (By the way, this is why it's critical that all parts of your Airflow
> > > infrastructure have access to the same DAGS_FOLDER)
> > >
> > > Now it is true that the DagBag loads DAG objects but think of it as
> more
> > of
> > > an "index" so that the scheduler/webserver know what DAGs are
> available.
> > > When it's time to actually run one of those DAGs, the executor loads it
> > > from the underlying source file.
> > >
> > > Jeremiah
> > >
> > > On Wed, Mar 22, 2017 at 8:45 AM Boris Tyukin 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have a weird question but it bugs my mind. I have some like below
> to
> > > > generate dags dynamically, using Max's example code from FAQ.
> > > >
> > > > It works fine but I have one large dict (let's call it my_outer_dict)
> > > that
> > > > takes over 60Mb in memory and I need to access it from all generated
> > > dags.
> > > > Needless to say, i do not want to recreate that dict for every dag
> as I
> > > > want to load it to memory only once.
> > > >
> > > > To my surprise, if i define that dag outside of my dag definition
> > code, I
> > > > can still access it.
> > > >
> > > > Can someone explain why and where is it stored? I thought only dag
> > > > definitions are loaded to dagbag and not the variables outside it.
> > > >
> > > > Is it even a good practice and will it work still if I switch to
> celery
> > > > executor?
> > > >
> > > >
> > > > def get_dag(i):
> > > > dag_id = 'foo_{}'.format(i)
> > > > dag = DAG(dag_id)
> > > > 
> > > > print my_outer_dict
> > > >
> > > > my_outer_dict = {}
> > > > for i in range(10):
> > > > dag = get_dag(i)
> > > > globals()[dag.dag_id] = dag
> > > >
> > >
> >
>


Re: variable scope with dynamic dags

2017-03-22 Thread Boris Tyukin
hi Jeremiah, thanks for the explanation!

i am very new to Python so was surprised that it works and my external
dictionary object was still accessible to all dags generated. I think it
makes sense but I would like to confirm one thing and I do not know how to
test it myself.

do you think that large dictionary object will still be loaded to memory
only once even if I generate 200 dags that will be accessing it? so
basically they will just use a reference to it or they would create a copy
of the same 60Mb structure.

I hope my question makes sense :)

On Wed, Mar 22, 2017 at 10:54 AM, Jeremiah Lowin  wrote:

> At the risk of oversimplifying things, your DAG definition file is loaded
> *every* time a DAG (or any task in that DAG) is run. Think of it as a
> literal Python import of your dag-defining module: any variables are loaded
> along with the DAGs, which are then executed. That's why your dict is
> always available. This will work with Celery since it follows the same
> approach, parsing your DAG file to run each task.
>
> (By the way, this is why it's critical that all parts of your Airflow
> infrastructure have access to the same DAGS_FOLDER)
>
> Now it is true that the DagBag loads DAG objects but think of it as more of
> an "index" so that the scheduler/webserver know what DAGs are available.
> When it's time to actually run one of those DAGs, the executor loads it
> from the underlying source file.
>
> Jeremiah
>
> On Wed, Mar 22, 2017 at 8:45 AM Boris Tyukin 
> wrote:
>
> > Hi,
> >
> > I have a weird question but it bugs my mind. I have some like below to
> > generate dags dynamically, using Max's example code from FAQ.
> >
> > It works fine but I have one large dict (let's call it my_outer_dict)
> that
> > takes over 60Mb in memory and I need to access it from all generated
> dags.
> > Needless to say, i do not want to recreate that dict for every dag as I
> > want to load it to memory only once.
> >
> > To my surprise, if i define that dag outside of my dag definition code, I
> > can still access it.
> >
> > Can someone explain why and where is it stored? I thought only dag
> > definitions are loaded to dagbag and not the variables outside it.
> >
> > Is it even a good practice and will it work still if I switch to celery
> > executor?
> >
> >
> > def get_dag(i):
> > dag_id = 'foo_{}'.format(i)
> > dag = DAG(dag_id)
> > 
> > print my_outer_dict
> >
> > my_outer_dict = {}
> > for i in range(10):
> > dag = get_dag(i)
> > globals()[dag.dag_id] = dag
> >
>


variable scope with dynamic dags

2017-03-22 Thread Boris Tyukin
Hi,

I have a weird question that bugs my mind. I have something like the code below to
generate dags dynamically, using Max's example code from the FAQ.

It works fine but I have one large dict (let's call it my_outer_dict) that
takes over 60Mb in memory and I need to access it from all generated dags.
Needless to say, i do not want to recreate that dict for every dag as I
want to load it to memory only once.

To my surprise, if i define that dict outside of my dag definition code, I
can still access it.

Can someone explain why, and where it is stored? I thought only dag
definitions are loaded to dagbag and not the variables outside it.

Is it even a good practice, and will it still work if I switch to the celery
executor?


def get_dag(i):
    dag_id = 'foo_{}'.format(i)
    dag = DAG(dag_id)

    print my_outer_dict

my_outer_dict = {}
for i in range(10):
    dag = get_dag(i)
    globals()[dag.dag_id] = dag


Re: Reminder : LatestOnlyOperator

2017-03-20 Thread Boris Tyukin
depends_on_past is looking at previous task instance which sounds the same
as "latestonly" but the difference becomes apparent if you look at this
example.

Let's say you have a dag, scheduled to run every day and it has been
failing for the past 3 days. The whole purpose of that dag is to populate a
snapshot table or do a daily backup. If you use depends_on_past, you would
have to rerun all missed runs or mark them as successful, eventually doing
useless work (3 daily snapshots or backups for the same data).

LatestOnly allows you to bypass missed runs and just do it once for the most
recent instance.

Another difference: depends_on_past is tricky if you use BranchOperator
because some branches may not run one day and run another - it will really
mess up your logic.
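For reference, a sketch of the snapshot example using the 1.8 operator (it
assumes `dag` and a take_daily_snapshot callable already exist; only the
chaining matters here):

# Sketch only: older, backlogged DAG runs skip the snapshot task.
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
take_snapshot = PythonOperator(task_id='take_snapshot',
                               python_callable=take_daily_snapshot,  # assumed callable
                               dag=dag)
take_snapshot.set_upstream(latest_only)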

On Mon, Mar 20, 2017 at 12:45 PM, Ruslan Dautkhanov 
wrote:

> Thanks Boris. It does make sense.
> Although how it's different from depends_on_past task-level parameter?
> In both cases, a task will be skipped if there is another TI of this task
> is still running (from a previous dagrun), right?
>
>
> Thanks,
> Ruslan
>
>
> On Sat, Mar 18, 2017 at 7:11 PM, Boris Tyukin 
> wrote:
>
> > you would just chain them - there is an example that came with airflow
> 1.8
> > https://github.com/apache/incubator-airflow/blob/master/
> > airflow/example_dags/example_latest_only.py
> >
> > so in your case, instead of dummy operator, you would use your Oracle
> > operator.
> >
> > Does it make sense?
> >
> >
> > On Sat, Mar 18, 2017 at 7:12 PM, Ruslan Dautkhanov  >
> > wrote:
> >
> > > Is there is a way to combine scheduling behavior operators  (like this
> > > LatestOnlyOperator)
> > > with a functional operator (like Oracle_Operator)? I was thinking
> > multiple
> > > inheritance would do,like
> > >
> > > > class Oracle_LatestOnly_Operator (Oracle_Operator,
> LatestOnlyOperator):
> > > > ...
> > >
> > > I might be overthinking this and there could be a simpler way?
> > > Sorry, I am still learning Airflow concepts...
> > >
> > > Thanks.
> > >
> > >
> > >
> > > --
> > > Ruslan Dautkhanov
> > >
> > > On Sat, Mar 18, 2017 at 2:15 PM, Boris Tyukin 
> > > wrote:
> > >
> > > > Thanks George for that feature!
> > > >
> > > > sure, just created a jira on this
> > > > https://issues.apache.org/jira/browse/AIRFLOW-1008
> > > >
> > > >
> > > > On Sat, Mar 18, 2017 at 12:05 PM, siddharth anand  >
> > > > wrote:
> > > >
> > > > > Thx Boris . Credit goes to George (gwax) for the implementation of
> > the
> > > > > LatestOnlyOperator.
> > > > >
> > > > > Boris,
> > > > > Can you describe what you mean in a Jira?
> > > > > -s
> > > > >
> > > > > On Fri, Mar 17, 2017 at 6:02 PM, Boris Tyukin <
> bo...@boristyukin.com
> > >
> > > > > wrote:
> > > > >
> > > > > > this is nice indeed along with the new catchup option
> > > > > > https://airflow.incubator.apache.org/scheduler.html#
> > > > backfill-and-catchup
> > > > > >
> > > > > > Thanks Sid and Ben for adding these new options!
> > > > > >
> > > > > > for a complete picture, it would be nice to force only one dag
> run
> > at
> > > > the
> > > > > > time.
> > > > > >
> > > > > > On Fri, Mar 17, 2017 at 7:33 PM, siddharth anand <
> > san...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > With the Apache Airflow 1.8 release imminent, you may want to
> try
> > > out
> > > > > the
> > > > > > >
> > > > > > > *LatestOnlyOperator.*
> > > > > > >
> > > > > > > If you want your DAG to only run on the most recent scheduled
> > slot,
> > > > > > > regardless of backlog, this operator will skip running
> downstream
> > > > tasks
> > > > > > for
> > > > > > > all DAG Runs prior to the current time slot.
> > > > > > >
> > > > > > > For example, I might have a DAG that takes a DB snapshot once a
> > > day.
> > > > It
> > > > > might be that I paused that DAG for 2 weeks or that I had set the
> > > > > start date to a fixed date 2 weeks in the past.

Re: Reminder : LatestOnlyOperator

2017-03-18 Thread Boris Tyukin
you would just chain them - there is an example that came with airflow 1.8
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_latest_only.py

so in your case, instead of dummy operator, you would use your Oracle
operator.

Does it make sense?


On Sat, Mar 18, 2017 at 7:12 PM, Ruslan Dautkhanov 
wrote:

> Is there a way to combine scheduling behavior operators (like this
> LatestOnlyOperator)
> with a functional operator (like Oracle_Operator)? I was thinking multiple
> inheritance would do, like
>
> > class Oracle_LatestOnly_Operator (Oracle_Operator, LatestOnlyOperator):
> > ...
>
> I might be overthinking this and there could be a simpler way?
> Sorry, I am still learning Airflow concepts...
>
> Thanks.
>
>
>
> --
> Ruslan Dautkhanov
>
> On Sat, Mar 18, 2017 at 2:15 PM, Boris Tyukin 
> wrote:
>
> > Thanks George for that feature!
> >
> > sure, just created a jira on this
> > https://issues.apache.org/jira/browse/AIRFLOW-1008
> >
> >
> > On Sat, Mar 18, 2017 at 12:05 PM, siddharth anand 
> > wrote:
> >
> > > Thx Boris . Credit goes to George (gwax) for the implementation of the
> > > LatestOnlyOperator.
> > >
> > > Boris,
> > > Can you describe what you mean in a Jira?
> > > -s
> > >
> > > On Fri, Mar 17, 2017 at 6:02 PM, Boris Tyukin 
> > > wrote:
> > >
> > > > this is nice indeed along with the new catchup option
> > > > https://airflow.incubator.apache.org/scheduler.html#
> > backfill-and-catchup
> > > >
> > > > Thanks Sid and Ben for adding these new options!
> > > >
> > > > for a complete picture, it would be nice to force only one dag run at
> > the
> > > > time.
> > > >
> > > > On Fri, Mar 17, 2017 at 7:33 PM, siddharth anand 
> > > > wrote:
> > > >
> > > > > With the Apache Airflow 1.8 release imminent, you may want to try
> out
> > > the
> > > > >
> > > > > *LatestOnlyOperator.*
> > > > >
> > > > > If you want your DAG to only run on the most recent scheduled slot,
> > > > > regardless of backlog, this operator will skip running downstream
> > tasks
> > > > for
> > > > > all DAG Runs prior to the current time slot.
> > > > >
> > > > > For example, I might have a DAG that takes a DB snapshot once a
> day.
> > It
> > > > > might be that I paused that DAG for 2 weeks or that I had set the
> > start
> > > > > date to a fixed data 2 weeks in the past. When I enable my DAG, I
> > don't
> > > > > want it to run 14 days' worth of snapshots for the current state of
> > the
> > > > DB
> > > > > -- that's unnecessary work.
> > > > >
> > > > > The LatestOnlyOperator avoids that work.
> > > > >
> > > > > https://github.com/apache/incubator-airflow/commit/
> > > > > edf033be65b575f44aa221d5d0ec9ecb6b32c67a
> > > > >
> > > > > With it, you can simply use
> > > > > latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
> > > > >
> > > > > instead of
> > > > > def skip_to_current_job(ds, **kwargs):
> > > > > now = datetime.now()
> > > > > left_window = kwargs['dag'].following_
> > schedule(kwargs['execution_
> > > > > date'])
> > > > > right_window = kwargs['dag'].following_schedule(left_window)
> > > > > logging.info(('Left Window {}, Now {}, Right Window
> > > > > {}').format(left_window,now,right_window))
> > > > > if not now <= right_window:
> > > > > logging.info('Not latest execution, skipping downstream.')
> > > > > return False
> > > > > return True
> > > > >
> > > > > short_circuit = ShortCircuitOperator(
> > > > >   task_id = 'short_circuit_if_not_current_job',
> > > > >   provide_context = True,
> > > > >   python_callable = skip_to_current_job,
> > > > >   dag = dag
> > > > > )
> > > > >
> > > > > -s
> > > > >
> > > >
> > >
> >
>


Re: Reminder : LatestOnlyOperator

2017-03-18 Thread Boris Tyukin
Thanks George for that feature!

sure, just created a jira on this
https://issues.apache.org/jira/browse/AIRFLOW-1008


On Sat, Mar 18, 2017 at 12:05 PM, siddharth anand  wrote:

> Thx Boris . Credit goes to George (gwax) for the implementation of the
> LatestOnlyOperator.
>
> Boris,
> Can you describe what you mean in a Jira?
> -s
>
> On Fri, Mar 17, 2017 at 6:02 PM, Boris Tyukin 
> wrote:
>
> > this is nice indeed along with the new catchup option
> > https://airflow.incubator.apache.org/scheduler.html#backfill-and-catchup
> >
> > Thanks Sid and Ben for adding these new options!
> >
> > for a complete picture, it would be nice to force only one dag run at the
> > time.
> >
> > On Fri, Mar 17, 2017 at 7:33 PM, siddharth anand 
> > wrote:
> >
> > > With the Apache Airflow 1.8 release imminent, you may want to try out
> the
> > >
> > > *LatestOnlyOperator.*
> > >
> > > If you want your DAG to only run on the most recent scheduled slot,
> > > regardless of backlog, this operator will skip running downstream tasks
> > for
> > > all DAG Runs prior to the current time slot.
> > >
> > > For example, I might have a DAG that takes a DB snapshot once a day. It
> > > might be that I paused that DAG for 2 weeks or that I had set the start
> > > date to a fixed data 2 weeks in the past. When I enable my DAG, I don't
> > > want it to run 14 days' worth of snapshots for the current state of the
> > DB
> > > -- that's unnecessary work.
> > >
> > > The LatestOnlyOperator avoids that work.
> > >
> > > https://github.com/apache/incubator-airflow/commit/
> > > edf033be65b575f44aa221d5d0ec9ecb6b32c67a
> > >
> > > With it, you can simply use
> > > latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
> > >
> > > instead of
> > > def skip_to_current_job(ds, **kwargs):
> > > now = datetime.now()
> > > left_window = kwargs['dag'].following_schedule(kwargs['execution_
> > > date'])
> > > right_window = kwargs['dag'].following_schedule(left_window)
> > > logging.info(('Left Window {}, Now {}, Right Window
> > > {}').format(left_window,now,right_window))
> > > if not now <= right_window:
> > > logging.info('Not latest execution, skipping downstream.')
> > > return False
> > > return True
> > >
> > > short_circuit = ShortCircuitOperator(
> > >   task_id = 'short_circuit_if_not_current_job',
> > >   provide_context = True,
> > >   python_callable = skip_to_current_job,
> > >   dag = dag
> > > )
> > >
> > > -s
> > >
> >
>


Re: SQLOperator?

2017-03-18 Thread Boris Tyukin
>>I guess I will have to call OracleHook from PythonOperator?

that's what I do! works fine. do not forget to install the cx_Oracle library
https://pypi.python.org/pypi/cx_Oracle/5.2.1

you can also create your own operator, using one of the existing ones as an
example

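if it helps, roughly what I do (a sketch only - the conn id, SQL and task name
below are placeholders, and `dag` is your DAG object):

from airflow.hooks.oracle_hook import OracleHook
from airflow.operators.python_operator import PythonOperator

def run_oracle_query(**kwargs):
    # assumes an 'oracle_default' connection defined under Admin -> Connections
    hook = OracleHook(oracle_conn_id='oracle_default')
    rows = hook.get_records("SELECT count(*) FROM some_table")  # placeholder SQL
    return rows[0][0]  # the return value gets pushed to XCom

t_oracle_query = PythonOperator(
    task_id='t_oracle_query',
    python_callable=run_oracle_query,
    provide_context=True,
    dag=dag)
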
On Fri, Mar 17, 2017 at 11:49 PM, Ruslan Dautkhanov 
wrote:

> Thanks Alex
>
> I did notice MySQL one
>
> We are working with Oracle
>
> I guess I will have to call OracleHook from PythonOperator?
>
>
>
> On Fri, Mar 17, 2017 at 9:25 PM Alex Guziel  invalid>
> wrote:
>
> > I'm not sure if that one went away but there are different SQL operators,
> > like MySqlOperator, MsSqlOperator, etc. that I see.
> >
> > Best,
> > Alex
> >
> > On Fri, Mar 17, 2017 at 7:56 PM, Ruslan Dautkhanov  >
> > wrote:
> >
> > > I can't find references to SQLOperator neither in the source code nor
> in
> > > the API Reference.
> > >
> > > Although it is mentioned in Concepts page :
> > >
> > > https://github.com/apache/incubator-airflow/blob/master/
> > > docs/concepts.rst#operators
> > >
> > >
> > >
> > >- SqlOperator - executes a SQL command
> > >
> > > Sorry for basic questions - just started using Airflow this week.
> > > Did it got replaced with something else? If so what that is?
> > >
> > >
> > >
> > > Thanks,
> > > Ruslan Dautkhanov
> > >
> >
>


Re: Reminder : LatestOnlyOperator

2017-03-17 Thread Boris Tyukin
this is nice indeed along with the new catchup option
https://airflow.incubator.apache.org/scheduler.html#backfill-and-catchup

Thanks Sid and Ben for adding these new options!

for a complete picture, it would be nice to force only one dag run at a
time.

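for reference, here is roughly what the two DAG-level knobs look like together
(just a sketch with made-up names; catchup is the new 1.8 argument):

from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id='my_daily_load',              # placeholder name
    start_date=datetime(2017, 1, 1),
    schedule_interval='@daily',
    catchup=False,       # new in 1.8: skip backfilling missed schedule slots
    max_active_runs=1)   # limits concurrent scheduled runs (not manual triggers)
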
On Fri, Mar 17, 2017 at 7:33 PM, siddharth anand  wrote:

> With the Apache Airflow 1.8 release imminent, you may want to try out the
>
> *LatestOnlyOperator.*
>
> If you want your DAG to only run on the most recent scheduled slot,
> regardless of backlog, this operator will skip running downstream tasks for
> all DAG Runs prior to the current time slot.
>
> For example, I might have a DAG that takes a DB snapshot once a day. It
> might be that I paused that DAG for 2 weeks or that I had set the start
> date to a fixed data 2 weeks in the past. When I enable my DAG, I don't
> want it to run 14 days' worth of snapshots for the current state of the DB
> -- that's unnecessary work.
>
> The LatestOnlyOperator avoids that work.
>
> https://github.com/apache/incubator-airflow/commit/
> edf033be65b575f44aa221d5d0ec9ecb6b32c67a
>
> With it, you can simply use
> latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
>
> instead of
> def skip_to_current_job(ds, **kwargs):
>     now = datetime.now()
>     left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
>     right_window = kwargs['dag'].following_schedule(left_window)
>     logging.info(('Left Window {}, Now {}, Right Window {}').format(left_window, now, right_window))
>     if not now <= right_window:
>         logging.info('Not latest execution, skipping downstream.')
>         return False
>     return True
>
> short_circuit = ShortCircuitOperator(
>   task_id = 'short_circuit_if_not_current_job',
>   provide_context = True,
>   python_callable = skip_to_current_job,
>   dag = dag
> )
>
> -s
>


Re: Adding Variables and Connections via script

2017-03-07 Thread Boris Tyukin
To add to Ali's reply, there was a PR for connections and the CLI:
https://github.com/apache/incubator-airflow/pull/1802

hopefully it will make it into 1.8

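in the meantime, a quick and dirty way to do it from a Python script against
the metadata DB looks roughly like this (the conn id and values are made up):

from airflow import settings
from airflow.models import Connection, Variable

# Variables have a simple API
Variable.set("my_env", "dev")  # placeholder key/value

# Connections can be added through a SQLAlchemy session
session = settings.Session()
session.add(Connection(
    conn_id='my_oracle',          # placeholder connection
    conn_type='oracle',
    host='dbhost.example.com',
    login='etl_user',
    password='changeme',
    port=1521))
session.commit()
session.close()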

On Tue, Mar 7, 2017 at 4:49 PM, Nicholas Hodgkinson <
nik.hodgkin...@collectivehealth.com> wrote:

> I would like to be able to create a script to assist local development
> which would populate several Connections and Variables that are used across
> our organization; is there a way that I can add those from the command line
> or Python script without having to manually enter them via the UI?
>
> Thanks,
> -Nik
> nik.hodgkin...@collectivehealth.com
>
> --
>
>
> Read our founder's story.
> 
>
> *This message may contain confidential, proprietary, or protected
> information.  If you are not the intended recipient, you may not review,
> copy, or distribute this message. If you received this message in error,
> please notify the sender by reply email and delete this message.*
>


dev@airflow.incubator.apache.org

2017-02-28 Thread Boris Tyukin
thanks for sharing, was a good read! go Airflow go!

On Tue, Feb 28, 2017 at 1:00 PM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> Jointly publishing this short Q&A with Taylor D. Edmiston of Atronomer.io
>
> https://medium.com/the-astronomer-journey/airflow-and-the-future-of-data-
> engineering-a-q-a-266f68d956a9#.cpast6vjd
>


Re: Using the Sqoop Hook on Airflow

2017-02-21 Thread Boris Tyukin
I just use the bash operator and call the sqoop CLI. IMHO the sqoop operator
does not add any functionality; in fact it is limiting:

1) you cannot use all sqoop commands / options
2) more importantly, I did not want to pass the connection password in clear
text to sqoop (if you do, it will be stored in the log files)

So I just used a bash operator templatized with jinja, and
hadoop.security.credential.provider with a jceks keystore to store passwords:
http://www.ericlin.me/securely-managing-passwords-in-sqoop

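roughly what that looks like in the DAG (a sketch only - the sqoop command,
jceks path and table names are made up, and `dag` is your DAG object):

from airflow.operators.bash_operator import BashOperator

sqoop_cmd = """
sqoop import \
  -Dhadoop.security.credential.provider.path=jceks://hdfs/user/etl/pw.jceks \
  --connect jdbc:oracle:thin:@dbhost:1521/ORCL \
  --username etl_user --password-alias oracle.pw \
  --table SRC_TABLE --target-dir /data/staging/src_table/{{ ds_nodash }} \
  -m 4
"""

t_sqoop_import = BashOperator(
    task_id='t_sqoop_import',
    bash_command=sqoop_cmd,  # bash_command is jinja-templated, so {{ ds_nodash }} renders
    dag=dag)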




On Mon, Feb 20, 2017 at 3:36 PM, Vijay Bhat  wrote:

> How do folks configure a Sqoop hook for the Sqoop operator? I don't see a
> 'sqoop' operator type in the list of connection types under Admin -> Create
> -> Conn Type.
>
> Do you just configure as an SSH hook and add all the fields under Extra? Or
> is there a cleaner way?
>
> Thanks!
> Vijay
>


Re: Determine if a DAG is running

2017-02-16 Thread Boris Tyukin
Hi Nik, it looks like you are experiencing the same problem I had:
https://lists.apache.org/thread.html/%3ccanrt7t1te7ei82nvry-yeubu2-g4ax3gsdtl3rnxyhj2q-q...@mail.gmail.com%3E

Bolke suggested using a pool to ensure only one instance is running.

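The pool trick is basically: create a pool with 1 slot under Admin -> Pools,
then assign it to the task(s) you want serialized. A rough sketch (the pool
name and callable are made up, and `dag` is your DAG object):

from airflow.operators.python_operator import PythonOperator

def run_load(**kwargs):
    pass  # hypothetical load logic

t_load = PythonOperator(
    task_id='do_the_load',
    python_callable=run_load,
    provide_context=True,
    pool='single_load_pool',  # pool created with 1 slot, so only one runs at a time
    dag=dag)
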
max_active_runs only affects scheduled runs, so runs that are triggered or
kicked off manually won't respect this setting for some reason (it looks like
it is by design, judging from the source code)

Boris

On Wed, Feb 15, 2017 at 6:51 PM, Nicholas Hodgkinson <
nik.hodgkin...@collectivehealth.com> wrote:

> I'm using the TriggerDagRunOperator within a Sensor DAG to automatically
> sense and start loads, however the problem I'm coming across, given that I
> want to run this Sensor DAG often (every 5 minutes), is the same run can be
> triggered multiple times. Is there a way to avoid this? I've set
> max_active_runs to 1, but that doesn't seem to be respected when doing
> manual triggers. I suppose I could write out a file when I trigger the job,
> have the job clean it up, and then not trigger if that file exists, but I
> feel like there should be a better way to accomplish this; not to mention a
> failure in the dag would break this process.
>
> Thoughts? I've been trying to figure this out for a while and Gitter and
> the web haven't been much help.
>
> -N
> nik.hodgkin...@collectivehealth.com
> (913) 927-4891
>
> --
>
>
> Read our founder's story.
> 
>
> *This message may contain confidential, proprietary, or protected
> information.  If you are not the intended recipient, you may not review,
> copy, or distribute this message. If you received this message in error,
> please notify the sender by reply email and delete this message.*
>


Re: [VOTE] Release Airflow 1.8.0 based on Airflow 1.8.0rc3

2017-02-11 Thread Boris Tyukin
awesome! thanks Jeremiah

On Sat, Feb 11, 2017 at 12:53 PM, Jeremiah Lowin  wrote:

> Boris, I submitted a PR to address your second point --
> https://github.com/apache/incubator-airflow/pull/2068. Thanks!
>
> On Sat, Feb 11, 2017 at 10:42 AM Boris Tyukin 
> wrote:
>
> > I am running LocalExecutor and not doing crazy things but use DAG
> > generation heavily - everything runs fine as before. As I mentioned in
> > other threads only had a few issues:
> >
> > 1) had to upgrade MySQL which was a PAIN. Cloudera CDH is running old
> > version of MySQL which was compatible with 1.7.1 but not compatible now
> > with 1.8 because of fractional seconds support PR.
> >
> > 2) when you install airflow, there are two new example DAGs
> > (last_task_only) which are going back very far in the past and scheduled
> to
> > run every hour - a bunch of dags triggered on the first start of
> scheduler
> > and hosed my CPU
> >
> > Everything else was fine and I LOVE lots of small UI changes, which
> reduced
> > a lot my use of cli.
> >
> > Thanks again for the amazing work and an awesome project!
> >
> >
> > On Sat, Feb 11, 2017 at 9:17 AM, Jeremiah Lowin 
> wrote:
> >
> > > I was able to deploy successfully. +1 (binding)
> > >
> > > On Fri, Feb 10, 2017 at 7:37 PM Maxime Beauchemin <
> > > maximebeauche...@gmail.com> wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > On Fri, Feb 10, 2017 at 3:44 PM, Arthur Wiedmer <
> > > arthur.wied...@gmail.com>
> > > > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > On Feb 10, 2017 3:13 PM, "Dan Davydov"  > > invalid>
> > > > > wrote:
> > > > >
> > > > > > Our staging looks good, all the DAGs there pass.
> > > > > > +1 (binding)
> > > > > >
> > > > > > On Fri, Feb 10, 2017 at 10:21 AM, Chris Riccomini <
> > > > criccom...@apache.org
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Running in all environments. Will vote after the weekend to
> make
> > > sure
> > > > > > > things are working properly, but so far so good.
> > > > > > >
> > > > > > > On Fri, Feb 10, 2017 at 6:05 AM, Bolke de Bruin <
> > bdbr...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Dear All,
> > > > > > > >
> > > > > > > > Let’s try again!
> > > > > > > >
> > > > > > > > I have made the THIRD RELEASE CANDIDATE of Airflow 1.8.0
> > > available
> > > > > at:
> > > > > > > > https://dist.apache.org/repos/dist/dev/incubator/airflow/ <
> > > > > > > > https://dist.apache.org/repos/dist/dev/incubator/airflow/> ,
> > > > public
> > > > > > keys
> > > > > > > > are available at https://dist.apache.org/repos/
> > > > > dist/release/incubator/
> > > > > > > > airflow/ <
> > https://dist.apache.org/repos/dist/release/incubator/
> > > > > > airflow/>
> > > > > > > > . It is tagged with a local version “apache.incubating” so it
> > > > allows
> > > > > > > > upgrading from earlier releases.
> > > > > > > >
> > > > > > > > Two issues have been fixed since release candidate 2:
> > > > > > > >
> > > > > > > > * trigger_dag could create dags with fractional seconds, not
> > > > > supported
> > > > > > by
> > > > > > > > logging and UI at the moment
> > > > > > > > * local api client trigger_dag had hardcoded execution of
> None
> > > > > > > >
> > > > > > > > Known issue:
> > > > > > > > * Airflow on kubernetes and num_runs -1 (default) can expose
> > > import
> > > > > > > issues.
> > > > > > > >
> > > > > > > > I have extensively discussed this with Alex (reporter) and we
> > > > > consider
> > > > > > > > this a known issue with a workaround available as we are
> unable
> > > to
> > > > > > > > replicate this in a different environment. UPDATING.md has
> been
> > > > > updated
> > > > > > > > with the work around.
> > > > > > > >
> > > > > > > > As these issues are confined to a very specific area and full
> > > unit
> > > > > > tests
> > > > > > > > were added I would also like to raise a VOTE for releasing
> > 1.8.0
> > > > > based
> > > > > > on
> > > > > > > > release candidate 3, i.e. just renaming release candidate 3
> to
> > > > 1.8.0
> > > > > > > > release.
> > > > > > > >
> > > > > > > > Please respond to this email by:
> > > > > > > >
> > > > > > > > +1,0,-1 with *binding* if you are a PMC member or
> *non-binding*
> > > if
> > > > > you
> > > > > > > are
> > > > > > > > not.
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > > > Bolke
> > > > > > > >
> > > > > > > > My VOTE: +1 (binding)
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] Release Airflow 1.8.0 based on Airflow 1.8.0rc3

2017-02-11 Thread Boris Tyukin
I am running LocalExecutor and not doing crazy things, but I use DAG
generation heavily - everything runs fine as before. As I mentioned in
other threads, I only had a few issues:

1) I had to upgrade MySQL, which was a PAIN. Cloudera CDH is running an old
version of MySQL which was compatible with 1.7.1 but is not compatible with
1.8 because of the fractional seconds support PR.

2) when you install airflow, there are two new example DAGs
(the latest_only examples) which go back very far in the past and are scheduled
to run every hour - a bunch of DAG runs triggered on the first start of the
scheduler and hosed my CPU

Everything else was fine and I LOVE the many small UI changes, which reduced
my use of the CLI a lot.

Thanks again for the amazing work and an awesome project!


On Sat, Feb 11, 2017 at 9:17 AM, Jeremiah Lowin  wrote:

> I was able to deploy successfully. +1 (binding)
>
> On Fri, Feb 10, 2017 at 7:37 PM Maxime Beauchemin <
> maximebeauche...@gmail.com> wrote:
>
> > +1 (binding)
> >
> > On Fri, Feb 10, 2017 at 3:44 PM, Arthur Wiedmer <
> arthur.wied...@gmail.com>
> > wrote:
> >
> > > +1 (binding)
> > >
> > > On Feb 10, 2017 3:13 PM, "Dan Davydov"  invalid>
> > > wrote:
> > >
> > > > Our staging looks good, all the DAGs there pass.
> > > > +1 (binding)
> > > >
> > > > On Fri, Feb 10, 2017 at 10:21 AM, Chris Riccomini <
> > criccom...@apache.org
> > > >
> > > > wrote:
> > > >
> > > > > Running in all environments. Will vote after the weekend to make
> sure
> > > > > things are working properly, but so far so good.
> > > > >
> > > > > On Fri, Feb 10, 2017 at 6:05 AM, Bolke de Bruin  >
> > > > wrote:
> > > > >
> > > > > > Dear All,
> > > > > >
> > > > > > Let’s try again!
> > > > > >
> > > > > > I have made the THIRD RELEASE CANDIDATE of Airflow 1.8.0
> available
> > > at:
> > > > > > https://dist.apache.org/repos/dist/dev/incubator/airflow/ <
> > > > > > https://dist.apache.org/repos/dist/dev/incubator/airflow/> ,
> > public
> > > > keys
> > > > > > are available at https://dist.apache.org/repos/
> > > dist/release/incubator/
> > > > > > airflow/  > > > airflow/>
> > > > > > . It is tagged with a local version “apache.incubating” so it
> > allows
> > > > > > upgrading from earlier releases.
> > > > > >
> > > > > > Two issues have been fixed since release candidate 2:
> > > > > >
> > > > > > * trigger_dag could create dags with fractional seconds, not
> > > supported
> > > > by
> > > > > > logging and UI at the moment
> > > > > > * local api client trigger_dag had hardcoded execution of None
> > > > > >
> > > > > > Known issue:
> > > > > > * Airflow on kubernetes and num_runs -1 (default) can expose
> import
> > > > > issues.
> > > > > >
> > > > > > I have extensively discussed this with Alex (reporter) and we
> > > consider
> > > > > > this a known issue with a workaround available as we are unable
> to
> > > > > > replicate this in a different environment. UPDATING.md has been
> > > updated
> > > > > > with the work around.
> > > > > >
> > > > > > As these issues are confined to a very specific area and full
> unit
> > > > tests
> > > > > > were added I would also like to raise a VOTE for releasing 1.8.0
> > > based
> > > > on
> > > > > > release candidate 3, i.e. just renaming release candidate 3 to
> > 1.8.0
> > > > > > release.
> > > > > >
> > > > > > Please respond to this email by:
> > > > > >
> > > > > > +1,0,-1 with *binding* if you are a PMC member or *non-binding*
> if
> > > you
> > > > > are
> > > > > > not.
> > > > > >
> > > > > > Thanks!
> > > > > > Bolke
> > > > > >
> > > > > > My VOTE: +1 (binding)
> > > > >
> > > >
> > >
> >
>


Re: parsing task instance log files

2017-02-10 Thread Boris Tyukin
Hi Max, I was hoping you would chime in - thanks!

this is exactly why I am doing it - I want to check row counts before I
swap production table partitions with staging ones.

Wow, I did not realize that run_cli returns the log - and I thought I had
checked the source code! This definitely makes it easier for me to parse and
looks like the best option for me at the moment.

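Something like this is what I have in mind - a minimal sketch, with a made-up
regex for the row count line:

import re
from airflow.hooks.hive_hooks import HiveCliHook

def load_and_count(**kwargs):
    hook = HiveCliHook()
    # placeholder HQL; run_cli returns the accumulated stdout of the hive CLI
    out = hook.run_cli(hql="INSERT OVERWRITE TABLE stg.t SELECT * FROM src.t")
    # the pattern below is illustrative only - adjust to whatever the log prints
    m = re.search(r'numRows=(\d+)', out)
    return int(m.group(1)) if m else -1
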
`return_stats=False` in `HiveCliHook.run_cli` - I like this idea but I am still
learning Python. Let me see if I can contribute to this! The only issue I
see is that one can run multiple queries at a time and then it would be
hard to figure out the stats. Also, some queries might launch more than one
job, so the question would be which job log to capture (or just the very last
one, which would make it much easier).

Great idea about using Presto for the counts check - I don't have it, but I
have Impala, so I will give it a try. With Impala though, I have to invalidate
metadata for the table to get a proper count, which sometimes might be a time
consuming thing.

I love the idea about stats collection bot and can see how valuable it can
be - I hope we will see some advances soon!

Oh and I just noticed you are the key contributor to Superset, I need to
check it out asap then :) I hope it supports Impala.




On Fri, Feb 10, 2017 at 1:00 PM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> A few related thoughts:
>
> * it sucks to have to parse Hive log files, though it seems like there are
> no clean ways around it sometimes. We're doing similar for Superset to get
> an idea of % of progress of the query
> * really the HiveServer2 thrift client should allow to return structured
> stats about the job after its execution, but that's not the case and
> probably never will be...
> * the `HiveCliHook.run_cli` does return the log after it's done (now I'm
> noticing it could yield as it goes instead btw...), but that means you
> don't need the `on_success_callback`, you can do what you need to do in
> your PythonOperator which will simplify things a little
> * knowing that parsing Hive logs is a common necessary pattern, I wouldn't
> be opposed to adding some of the key elements in HiveCliHook. Maybe there's
> a new arg `return_stats=False` in `HiveCliHook.run_cli` that returns a dict
> with info about the query that just ran (row processed, cpu time, mappers /
> reducers, number of phases, ...)
> * if you have Presto, you could issue a quick COUNT(*) query right after
> your job. At Airbnb we have this common subdag pattern that stages the data
> in Hive, runs a set of data quality checks in Presto, and exchanges the
> partition in Hive when it passes the tests
>
> A total side note is we have a stats collection bot at Airbnb that finds
> all the tables/partitions that have changed recently in the metastore
> (every 5 minutes), and issues a single scan Presto query that is
> dynamically generated to get tons of stats for each column (% of null, min,
> max, avg, count distinct, number of characters, ...) and stores results in
> MySQL. This is super useful for capacity planning, debugging, data quality
> checks, anomaly detection, ... I've been talking with Carlos Bueno from
> Lyft yesterday who might be interested in taking this code (and perhaps
> other Airbnb projects), generalizing the code, cleaning it up,  documenting
> it and open sourcing it. Most of these little projects are stack specific
> and only useful to companies that happen to be running on the same stack as
> we are.
>
> Max
>
>
> On Fri, Feb 10, 2017 at 7:43 AM, Laura Lorenz 
> wrote:
>
> > I don't use the HiveCliHook so I'm not sure how it works, but is the only
> > place you can retrieve these counts the logfiles? If you have them at
> time
> > of query in your python callable, you could push them anywhere you like
> > inline at the conclusion of the task. Or, you may prefer to have your
> > PythonOperator `return` some data structure with those counts, which will
> > be stored by default in the airflow metadata database per the XCom system
> > <https://airflow.incubator.apache.org/concepts.html#xcoms>. Then,
> > depending
> > what you want to do with that, you could query those out of the metadata
> > database with the ad-hoc querying or charting UIs right within Airflow,
> or
> > a later task altogether.
> >
> > On Fri, Feb 10, 2017 at 8:58 AM, Boris Tyukin 
> > wrote:
> >
> > > please...?
> > >
> > > On Thu, Feb 9, 2017 at 8:35 AM, Boris Tyukin 
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > I am using HiveCliHook called from PythonOperator to run a series of
> > > > queries and want to capture r

Re: parsing task instance log files

2017-02-10 Thread Boris Tyukin
thanks for responding, Laura. I am using XComs, but my problem is that the Hive
query would not return the value I need (a MapReduce counter), so I thought I
would parse the Hive output, extract the MapReduce job id, and then use the
mapred CLI to get that counter for that job_id.

another option I found is to gather statistics on my refreshed table and
then use DESCRIBE EXTENDED to pull the number of rows - but this will take
more time, because of that first step, than just grabbing the MapReduce counter
from Hadoop.

does it make any sense?


On Fri, Feb 10, 2017 at 10:43 AM, Laura Lorenz 
wrote:

> I don't use the HiveCliHook so I'm not sure how it works, but is the only
> place you can retrieve these counts the logfiles? If you have them at time
> of query in your python callable, you could push them anywhere you like
> inline at the conclusion of the task. Or, you may prefer to have your
> PythonOperator `return` some data structure with those counts, which will
> be stored by default in the airflow metadata database per the XCom system
> <https://airflow.incubator.apache.org/concepts.html#xcoms>. Then,
> depending
> what you want to do with that, you could query those out of the metadata
> database with the ad-hoc querying or charting UIs right within Airflow, or
> a later task altogether.
>
> On Fri, Feb 10, 2017 at 8:58 AM, Boris Tyukin 
> wrote:
>
> > please...?
> >
> > On Thu, Feb 9, 2017 at 8:35 AM, Boris Tyukin 
> > wrote:
> >
> > > Hello,
> > >
> > > I am using HiveCliHook called from PythonOperator to run a series of
> > > queries and want to capture record counts for auditing and validation
> > > purposes.
> > >
> > > *I am thinking to use on_success_callback to invoke python function
> that
> > > will read the log file, produced by airflow and then parse it out using
> > > regex. *
> > >
> > > *I am going to use this method from models to get to the file log:*
> > >
> > > def log_filepath(self):
> > >     iso = self.execution_date.isoformat()
> > >     log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
> > >     return ("{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals()))
> > > Is this a good strategy or there is an easier way? I wondering if
> someone
> > > did something similar.
> > >
> > > Another challenge is that the same log file contains multiple attempts
> > and
> > > reruns of the same task so I guess I need to parse the file backwards.
> > >
> > > thanks,
> > > Boris
> > >
> >
>


Re: parsing task instance log files

2017-02-10 Thread Boris Tyukin
please...?

On Thu, Feb 9, 2017 at 8:35 AM, Boris Tyukin  wrote:

> Hello,
>
> I am using HiveCliHook called from PythonOperator to run a series of
> queries and want to capture record counts for auditing and validation
> purposes.
>
> *I am thinking to use on_success_callback to invoke python function that
> will read the log file, produced by airflow and then parse it out using
> regex. *
>
> *I am going to use this method from models to get to the file log:*
>
> def log_filepath(self):
>     iso = self.execution_date.isoformat()
>     log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
>     return ("{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals()))
> Is this a good strategy or there is an easier way? I wondering if someone
> did something similar.
>
> Another challenge is that the same log file contains multiple attempts and
> reruns of the same task so I guess I need to parse the file backwards.
>
> thanks,
> Boris
>


parsing task instance log files

2017-02-09 Thread Boris Tyukin
Hello,

I am using HiveCliHook called from PythonOperator to run a series of
queries and want to capture record counts for auditing and validation
purposes.

I am thinking of using on_success_callback to invoke a Python function that
will read the log file produced by Airflow and then parse it out using a
regex.

I am going to use this method from models to get to the log file:

def log_filepath(self):
    iso = self.execution_date.isoformat()
    log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
    return ("{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals()))

Is this a good strategy or is there an easier way? I am wondering if someone
has done something similar.

Another challenge is that the same log file contains multiple attempts and
reruns of the same task so I guess I need to parse the file backwards.

thanks,
Boris


Re: Airflow 1.8.0 Release Candidate 1

2017-02-02 Thread Boris Tyukin
LOL awesome!

On Thu, Feb 2, 2017 at 4:00 PM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> The Apache mailing doesn't support images so here's a link:
>
> http://i.imgur.com/DUkpjZu.png
> ​
>
> On Thu, Feb 2, 2017 at 12:52 PM, Boris Tyukin 
> wrote:
>
> > Bolke, you are our hero! I am sure you put a lot of your time to make it
> > happen
> >
> > On Thu, Feb 2, 2017 at 2:50 PM, Bolke de Bruin 
> wrote:
> >
> > > Hi All,
> > >
> > > I have made the (first) RELEASE CANDIDATE of Airflow 1.8.0 available
> at:
> > > https://dist.apache.org/repos/dist/dev/incubator/airflow/ , public
> keys
> > > are available at https://dist.apache.org/repos/dist/release/incubator/
> > > airflow/ . It is tagged with a local version “apache.incubating” so it
> > > allows upgrading from earlier releases. This should be considered of
> > > release quality, but not yet officially vetted as a release yet.
> > >
> > > Issues fixed:
> > > * Use static nvd3 and d3
> > > * Python 3 incompatibilities
> > > * CLI API trigger dag issue
> > >
> > > As the difference between beta 5 and the release candidate is
> relatively
> > > small I hope to start the VOTE for releasing 1.8.0 quite soon (2
> days?),
> > if
> > > the vote passes also a vote needs to happen at the IPMC mailinglist. As
> > > this is our first Apache release I expect some comments and required
> > > changes and probably a RC 2.
> > >
> > > Furthermore, we now have a “v1-8-stable” branch. This has version
> > > “1.8.0rc1” and will graduate to “1.8.0” when we release. The
> “v1-8-test”
> > > branch now has version “1.8.1alpha0” as version and “master” has
> version
> > > “1.9.0dev0”. Note that “v1-8-stable” is now closed. This means that,
> per
> > > release guidelines, patches accompanied with an ASSIGNED Jira and a
> > > sign-off from a committer. Only then the release manager applies the
> > patch
> > > to stable (In this case that would be me). The release manager then
> > closes
> > > the bug when the patches have landed in the appropriate branches. For
> > more
> > > information please see: https://cwiki.apache.org/
> > > confluence/display/AIRFLOW/Airflow+Release+Planning+and+
> > > Supported+Release+Lifetime <https://cwiki.apache.org/
> > > confluence/display/AIRFLOW/Airflow+Release+Planning+and+
> > > Supported+Release+Lifetime> .
> > >
> > > Any questions or suggestions don’t hesitate to ask!
> > >
> > > Cheers
> > > Bolke
> >
>


Re: Airflow 1.8.0 Release Candidate 1

2017-02-02 Thread Boris Tyukin
Bolke, you are our hero! I am sure you put a lot of your time to make it
happen

On Thu, Feb 2, 2017 at 2:50 PM, Bolke de Bruin  wrote:

> Hi All,
>
> I have made the (first) RELEASE CANDIDATE of Airflow 1.8.0 available at:
> https://dist.apache.org/repos/dist/dev/incubator/airflow/ , public keys
> are available at https://dist.apache.org/repos/dist/release/incubator/
> airflow/ . It is tagged with a local version “apache.incubating” so it
> allows upgrading from earlier releases. This should be considered of
> release quality, but not yet officially vetted as a release yet.
>
> Issues fixed:
> * Use static nvd3 and d3
> * Python 3 incompatibilities
> * CLI API trigger dag issue
>
> As the difference between beta 5 and the release candidate is relatively
> small I hope to start the VOTE for releasing 1.8.0 quite soon (2 days?), if
> the vote passes also a vote needs to happen at the IPMC mailinglist. As
> this is our first Apache release I expect some comments and required
> changes and probably a RC 2.
>
> Furthermore, we now have a “v1-8-stable” branch. This has version
> “1.8.0rc1” and will graduate to “1.8.0” when we release. The “v1-8-test”
> branch now has version “1.8.1alpha0” as version and “master” has version
> “1.9.0dev0”. Note that “v1-8-stable” is now closed. This means that, per
> release guidelines, patches accompanied with an ASSIGNED Jira and a
> sign-off from a committer. Only then the release manager applies the patch
> to stable (In this case that would be me). The release manager then closes
> the bug when the patches have landed in the appropriate branches. For more
> information please see: https://cwiki.apache.org/
> confluence/display/AIRFLOW/Airflow+Release+Planning+and+
> Supported+Release+Lifetime  confluence/display/AIRFLOW/Airflow+Release+Planning+and+
> Supported+Release+Lifetime> .
>
> Any questions or suggestions don’t hesitate to ask!
>
> Cheers
> Bolke


Re: Airflow Meetup @ Paypal (San Jose)

2017-01-31 Thread Boris Tyukin
yay awesome! thanks Jayesh!

On Tue, Jan 31, 2017 at 12:19 PM, Jayesh Senjaliya 
wrote:

> HI All,
>
> √ I have approval from Paypal to host Airflow meetup.  How about March 14th
> ? Please vote.
>
> √ we will have food and drinks.
> Please let me know if anybody has any special request, I will try to
> accommodate :)
>
> For presentations:
>  1) Disk recommission using airflow with overall automation of "Hadoop Node
> and Disk Remediation". - Jayesh Senjaliya ( Paypal )
>  2) Predictive Analytics with Airflow and PySpark - ( Russell Jurney )
>
>
> Please send request to present to this email thread if you are interested
> in presenting.
>
> Thanks
> Jayesh
>
>
>
>
> On Thu, Jan 26, 2017 at 4:08 PM, Russell Jurney 
> wrote:
>
> > Cool!
> >
> > On Wed, Jan 25, 2017 at 11:23 PM Jayesh Senjaliya 
> > wrote:
> >
> > > Hi Russell,
> > >
> > > yes, I will be presenting from Paypal side.
> > > Once i have official approval from Paypal, I will sent out email.
> > > I am basically going by the steps what Siddharth outlined earlier in
> the
> > > thread.
> > >
> > > Thanks
> > > Jayesh
> > >
> > > On Wed, Jan 25, 2017 at 7:50 PM, Russell Jurney <
> > russell.jur...@gmail.com>
> > > wrote:
> > >
> > > > Is someone from Paypal likely to speak? Should we start a new thread
> > > with a
> > > > call for another speaker? There was mention of three being needed.
> > > >
> > > > On Wed, Jan 25, 2017 at 5:33 PM Jayesh Senjaliya <
> jhsonl...@gmail.com>
> > > > wrote:
> > > >
> > > > > Yes I am waiting for response from facilities about it, most likely
> > by
> > > > > early next week.
> > > > >
> > > > > Thanks
> > > > > Jayesh
> > > > >
> > > > > On Wed, Jan 25, 2017 at 4:52 PM, Russell Jurney <
> > > > russell.jur...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Boris, would you be able to attend an evening meetup on the
> nights
> > of
> > > > > 3/15
> > > > > > or 3/16? I think attendance would be better on one of those days,
> > as
> > > > many
> > > > > > people don't attend the tutorial days.
> > > > > >
> > > > > > Paypal sounds awesome as a venue. Would they handle food and
> drink
> > as
> > > > > well?
> > > > > >
> > > > > > On Wed, Jan 25, 2017 at 11:28 AM, Boris Tyukin <
> > > bo...@boristyukin.com>
> > > > > > wrote:
> > > > > >
> > > > > > > it would be great!
> > > > > > >
> > > > > > > On Wed, Jan 25, 2017 at 1:26 PM, siddharth anand <
> > > san...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Paypal is quite close (11 minute drive on local streets per
> > > google
> > > > > > Maps :
> > > > > > > > https://goo.gl/maps/otUpve9StxJ2) to the Strata venue, so it
> > > would
> > > > > > make
> > > > > > > > sense to hold the meet-up at Paypal during Strata week.
> > > > > > > >
> > > > > > > > -s
> > > > > > > >
> > > > > > > > On Wed, Jan 25, 2017 at 5:48 AM, Boris Tyukin <
> > > > bo...@boristyukin.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > any way to schedule it during Strata week? would love to
> > attend
> > > > one
> > > > > > of
> > > > > > > > > airflow meetups but I am in Florida. 03/13 or 03/14 would
> > work
> > > > the
> > > > > > best
> > > > > > > > > because first two days of Strata are training days and not
> > very
> > > > > busy
> > > > > > > > >
> > > > > > > > > On Tue, Jan 24, 2017 at 10:33 PM, Russell Jurney <
> > > > > > > > russell.jur...@gmail.com
> > > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > &

Re: Airflow Meetup in NYC @ Blue Apron

2017-01-30 Thread Boris Tyukin
i hope you guys can share presentation slides at least for all of us who
are not in NYC

On Mon, Jan 30, 2017 at 7:33 PM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> I'd love to watch, is there any way you guys can livecast or share a video
> after the event?
>
> Looking forward to it!
>
> Max
>
> On Mon, Jan 30, 2017 at 1:56 PM, Joseph Napolitano <
> joseph.napolit...@blueapron.com.invalid> wrote:
>
> > Hi All!
> >
> > We are excited to host an Airflow Meetup in NYC.  We will have a guest
> > speaker from Spotify!
> >
> > The Meetup is in 2 days, on Feb 1st @ 6:30pm at Blue Apron's
> headquarters.
> >
> > In Summary:
> > Date: Feb 1st
> > Time 6:30 - 9pm EST
> > Location: 40 W 23rd St. New York, NY 10010
> > https://www.google.com/maps/place/40+W+23rd+St,+New+York,+NY
> > +10010/@40.7420885,-73.9938457,17z/data=!3m1!4b1!4m5!3m4!
> > 1s0x89c259a46471d2a1:0xc2517d92b1b68bba!8m2!3d40.
> > 7420845!4d-73.9916517?hl=en
> >
> > Schedule:
> > 6:30 - 7:15 Meet and greet
> > 7:15 - ? Presentations from Blue Apron and Spotify
> >
> > It's not too late to signup for a presentation.  We will stick around as
> > late as 9pm.
> >
> > We don't have an official Meetup page, so please sign up here :)
> > The signup sheet is available here:
> > https://docs.google.com/spreadsheets/d/1WmfgZeExSVdLf-u1uh3I
> > leeHy8QTwaJ4BkkSkVM-X1E/edit?usp=sharing
> >
> > Feel free to share the signup sheet with other parties.
> >
> > As mentioned, we're on the 5th floor.  You need to check in with security
> > in the building lobby, and again when you reach the fifth floor to get a
> > name tag.
> >
> > Thanks, and looking forward to meeting everyone!
> >
> > Cheers,
> > Joe Nap
> >
> >
> >
> > On Fri, Jan 20, 2017 at 1:37 PM, Joseph Napolitano <
> > joseph.napolit...@blueapron.com> wrote:
> >
> > > Hi all!
> > >
> > > I want to officially announce a Meetup for Airflow in NYC!  I'm looking
> > > forward to meeting other community members to share knowledge and
> > network.
> > >
> > > We may create an official Meetup page, but in the meantime please
> signup
> > > here:
> > > https://docs.google.com/spreadsheets/d/1WmfgZeExSVdLf-u1uh3I
> > > leeHy8QTwaJ4BkkSkVM-X1E/edit?usp=sharing
> > >
> > > I have a confirmed date of February 1st @ 6:30 at Blue Apron's
> > > headquarters.
> > >
> > > In Summary:
> > > Date: Feb 1st
> > > Time 6:30 - 9pm EST
> > > Location: 40 W 23rd St. New York, NY 10010
> > > https://www.google.com/maps/place/40+W+23rd+St,+New+York,+NY
> > > +10010/@40.7420885,-73.9938457,17z/data=!3m1!4b1!4m5!3m4!
> > > 1s0x89c259a46471d2a1:0xc2517d92b1b68bba!8m2!3d40.7420845!4d-
> > > 73.9916517?hl=en
> > >
> > > We're on the 5th floor.  You need to check in with security in the
> > > building lobby, and again when you reach the fifth floor to get a name
> > tag.
> > >
> > > Food & drink will be provided!
> > >
> > > Let me know if you would like to present.  We'd love to hear about your
> > > architecture and war stories.  We will have a large projector and PA
> > system
> > > setup.
> > >
> > > Sorry about the short notice, but it took a while to get approved over
> > the
> > > holidays and new year.  If we can't generate enough interest we can
> > > certainly push it back a month.
> > >
> > > Thanks, and Bon Appétite!
> > >
> > > --
> > > *Joe Napolitano *| Sr. Data Engineer
> > > www.blueapron.com | 5 Crosby Street, New York, NY 10013
> > >
> >
> >
> >
> > --
> > *Joe Napolitano *| Sr. Data Engineer
> > www.blueapron.com | 5 Crosby Street, New York, NY 10013
> >
>


Re: Airflow 1.8.0 BETA 5

2017-01-29 Thread Boris Tyukin
I am not sure if it is my config or something, but it looks like after the
upgrade and start of the scheduler, airflow totally hoses the CPU. The reason
is two new examples that start running right away - latest only and latest
with trigger. Once I pause them, the CPU goes back to idle. Is this because
dags are no longer paused by default, like they were before?

As I mentioned before, I also had to upgrade mysql to 5.7 - if someone
needs step-by-step instructions, make sure to follow all the steps precisely
here for an in-place upgrade or you will have a heck of a time (like me):
https://dev.mysql.com/doc/refman/5.7/en/upgrading.html#upgrade-procedure-inplace

BTW, the official Oracle repository for Oracle Linux only has MySQL 5.6 - for
5.7 you have to use the MySQL Community repo.

On Sat, Jan 28, 2017 at 10:07 AM, Bolke de Bruin  wrote:

> Hi All,
>
> I have made the FIFTH beta of Airflow 1.8.0 available at:
> https://dist.apache.org/repos/dist/dev/incubator/airflow/ <
> https://dist.apache.org/repos/dist/dev/incubator/airflow/> , public keys
> are available at https://dist.apache.org/repos/dist/release/incubator/
> airflow/ 
> . It is tagged with a local version “apache.incubating” so it allows
> upgrading from earlier releases.
>
> Issues fixed:
> * Parsing errors not showing up in UI fixing a regression**
> * Scheduler would terminate immediately if no dag files present
>
> ** As this touches the scheduler logic I though it warranted another beta.
>
> This should be the last beta in my opinion and we can prepare changelog,
> upgrade notes and release notes for the RC (Feb 2).
>
> Cheers
> Bolke


Re: Airflow 1.8.0 BETA 4

2017-01-27 Thread Boris Tyukin
yeah, I hear you, but you know how these big enterprise companies are behind on
everything. The official EPEL repo only has 5.1.73, same with the Oracle Linux
repo. We normally have to use only officially approved repos. It looks like
airflow 1.7.1 worked fine with 5.1.73 and now it does not.

Not a big deal, but I still do not see where it was mentioned in the
documentation - pip installed 1.8.0 without any complaints as well.

On Fri, Jan 27, 2017 at 3:41 PM, Bolke de Bruin  wrote:

> BTW:
>
> * MySQL 5.6.10 (5.6.4 was a beta I just figured out - released in 2011)
> was released in 2013
> * MySQL 5.1.73 was released on 2013-12-04
>
> We are really not ahead of the pack with our requirements.
>
> Bolke
>
> > On 27 Jan 2017, at 21:32, Boris Tyukin  wrote:
> >
> > thanks for confirmation Bolke
> >
> > On Fri, Jan 27, 2017 at 2:45 PM, Bolke de Bruin 
> wrote:
> >
> >> 5.6.4 is the minimum. Yes it was required. updating.md was required.
> >> 5.6.4 is already really really old. Most distro's replaced it with
> mariadb
> >> which is compatible.
> >>
> >> Bolke
> >>
> >> Sent from my iPhone
> >>
> >>> On 27 Jan 2017, at 20:19, Boris Tyukin  wrote:
> >>>
> >>> sorry mysql version i have is mysql  Ver 14.14 Distrib 5.1.73
> >>>
> >>> I see that DATETIME(6) is only supported on  MySQL 5.7 and later.
> >>>
> >>> Was it really necessary to do use DATETIME(6) not just DATETIME?
> >>>
> >>> Probably should be mention in change doc that this requires an upgrade
> of
> >>> mysql which in my opinion is not cool. epel repo only has 5.1.73 as
> >> latest
> >>> version and CDH distro includes 5.1.73 as well.
> >>>
> >>> Some companies might not allow an upgrade of mysql unless it is in one
> of
> >>> the official linux repos.
> >>>
> >>>
> >>>> On Fri, Jan 27, 2017 at 2:14 PM, Boris Tyukin 
> >> wrote:
> >>>>
> >>>> was requirement to MySQL database version changes with 1.8.0? I run
> >> mysql
> >>>> 14.14 and it worked fine with 1.7.
> >>>>
> >>>> I just installed 1.8.4 beta 4 and got this error below when i ran
> >> airflow
> >>>> upgradedb command
> >>>>
> >>>> File "/usr/local/lib/python2.7/site-packages/MySQLdb/cursors.py",
> line
> >>>> 205, in execute
> >>>>   self.errorhandler(self, exc, value)
> >>>> File "/usr/local/lib/python2.7/site-packages/MySQLdb/connections.py",
> >>>> line 36, in defaulterrorhandler
> >>>>   raise errorclass, errorvalue
> >>>> sqlalchemy.exc.ProgrammingError: (_mysql_exceptions.ProgrammingError)
> >>>> (1064, "You have an error in your SQL syntax; check the manual that
> >>>> corresponds to your MySQL server version for the right syntax to use
> >> near
> >>>> '(6) NULL' at line 1") [SQL: u'ALTER TABLE dag MODIFY
> last_scheduler_run
> >>>> DATETIME(6) NULL']
> >>>>
> >>>>
> >>>>
> >>>>> On Thu, Jan 26, 2017 at 1:49 PM, Bolke de Bruin 
> >> wrote:
> >>>>>
> >>>>> Hi All,
> >>>>>
> >>>>> I have made the FOURTH beta of Airflow 1.8.0 available at:
> >>>>> https://dist.apache.org/repos/dist/dev/incubator/airflow/ <
> >>>>> https://dist.apache.org/repos/dist/dev/incubator/airflow/> , public
> >> keys
> >>>>> are available at https://dist.apache.org/repos/
> >>>>> dist/release/incubator/airflow/ <https://dist.apache.org/repos
> >>>>> /dist/release/incubator/airflow/> . It is tagged with a local
> version
> >>>>> “apache.incubating” so it allows upgrading from earlier releases.
> This
> >> beta
> >>>>> is available for testing in a more production like setting
> (acceptance
> >>>>> environment?).
> >>>>>
> >>>>> I would like to encourage everyone  to try it out, to report back any
> >>>>> issues so we get to a rock solid release of 1.8.0. When reporting
> >> issues a
> >>>>> test case or even a fix is highly appreciated.
> >>>>>
> >>>>> Issues fixed:
> >>>>> * Incorrect Alembic reference due to revert (initdb/upgradedb/resetdb
> >>>>> should work again)
> >>>>> * Py3 incompatibility in base_taskrunner.
> >>>>>
> >>>>> Under investigation:
> >>>>> * DAG marked success, with half of the Tasks never scheduled (Alex)
> >>>>>
> >>>>> Kind regards,
> >>>>> Bolke
> >>>>
> >>>>
> >>>>
> >>
>
>


Re: Airflow 1.8.0 BETA 4

2017-01-27 Thread Boris Tyukin
thanks for confirmation Bolke

On Fri, Jan 27, 2017 at 2:45 PM, Bolke de Bruin  wrote:

> 5.6.4 is the minimum. Yes it was required. updating.md was required.
> 5.6.4 is already really really old. Most distro's replaced it with mariadb
> which is compatible.
>
> Bolke
>
> Sent from my iPhone
>
> > On 27 Jan 2017, at 20:19, Boris Tyukin  wrote:
> >
> > sorry mysql version i have is mysql  Ver 14.14 Distrib 5.1.73
> >
> > I see that DATETIME(6) is only supported on  MySQL 5.7 and later.
> >
> > Was it really necessary to do use DATETIME(6) not just DATETIME?
> >
> > Probably should be mention in change doc that this requires an upgrade of
> > mysql which in my opinion is not cool. epel repo only has 5.1.73 as
> latest
> > version and CDH distro includes 5.1.73 as well.
> >
> > Some companies might not allow an upgrade of mysql unless it is in one of
> > the official linux repos.
> >
> >
> >> On Fri, Jan 27, 2017 at 2:14 PM, Boris Tyukin 
> wrote:
> >>
> >> was requirement to MySQL database version changes with 1.8.0? I run
> mysql
> >> 14.14 and it worked fine with 1.7.
> >>
> >> I just installed 1.8.4 beta 4 and got this error below when i ran
> airflow
> >> upgradedb command
> >>
> >>  File "/usr/local/lib/python2.7/site-packages/MySQLdb/cursors.py", line
> >> 205, in execute
> >>self.errorhandler(self, exc, value)
> >>  File "/usr/local/lib/python2.7/site-packages/MySQLdb/connections.py",
> >> line 36, in defaulterrorhandler
> >>raise errorclass, errorvalue
> >> sqlalchemy.exc.ProgrammingError: (_mysql_exceptions.ProgrammingError)
> >> (1064, "You have an error in your SQL syntax; check the manual that
> >> corresponds to your MySQL server version for the right syntax to use
> near
> >> '(6) NULL' at line 1") [SQL: u'ALTER TABLE dag MODIFY last_scheduler_run
> >> DATETIME(6) NULL']
> >>
> >>
> >>
> >>> On Thu, Jan 26, 2017 at 1:49 PM, Bolke de Bruin 
> wrote:
> >>>
> >>> Hi All,
> >>>
> >>> I have made the FOURTH beta of Airflow 1.8.0 available at:
> >>> https://dist.apache.org/repos/dist/dev/incubator/airflow/ <
> >>> https://dist.apache.org/repos/dist/dev/incubator/airflow/> , public
> keys
> >>> are available at https://dist.apache.org/repos/
> >>> dist/release/incubator/airflow/ <https://dist.apache.org/repos
> >>> /dist/release/incubator/airflow/> . It is tagged with a local version
> >>> “apache.incubating” so it allows upgrading from earlier releases. This
> beta
> >>> is available for testing in a more production like setting (acceptance
> >>> environment?).
> >>>
> >>> I would like to encourage everyone  to try it out, to report back any
> >>> issues so we get to a rock solid release of 1.8.0. When reporting
> issues a
> >>> test case or even a fix is highly appreciated.
> >>>
> >>> Issues fixed:
> >>> * Incorrect Alembic reference due to revert (initdb/upgradedb/resetdb
> >>> should work again)
> >>> * Py3 incompatibility in base_taskrunner.
> >>>
> >>> Under investigation:
> >>> * DAG marked success, with half of the Tasks never scheduled (Alex)
> >>>
> >>> Kind regards,
> >>> Bolke
> >>
> >>
> >>
>


Re: Airflow 1.8.0 BETA 4

2017-01-27 Thread Boris Tyukin
sorry, the mysql version I have is mysql Ver 14.14 Distrib 5.1.73

I see that DATETIME(6) is only supported on MySQL 5.7 and later.

Was it really necessary to use DATETIME(6) and not just DATETIME?

It probably should be mentioned in the change doc that this requires an upgrade
of mysql, which in my opinion is not cool. The EPEL repo only has 5.1.73 as the
latest version, and the CDH distro includes 5.1.73 as well.

Some companies might not allow an upgrade of mysql unless it is in one of
the official linux repos.


On Fri, Jan 27, 2017 at 2:14 PM, Boris Tyukin  wrote:

> was requirement to MySQL database version changes with 1.8.0? I run mysql
> 14.14 and it worked fine with 1.7.
>
> I just installed 1.8.4 beta 4 and got this error below when i ran airflow
> upgradedb command
>
>   File "/usr/local/lib/python2.7/site-packages/MySQLdb/cursors.py", line
> 205, in execute
> self.errorhandler(self, exc, value)
>   File "/usr/local/lib/python2.7/site-packages/MySQLdb/connections.py",
> line 36, in defaulterrorhandler
> raise errorclass, errorvalue
> sqlalchemy.exc.ProgrammingError: (_mysql_exceptions.ProgrammingError)
> (1064, "You have an error in your SQL syntax; check the manual that
> corresponds to your MySQL server version for the right syntax to use near
> '(6) NULL' at line 1") [SQL: u'ALTER TABLE dag MODIFY last_scheduler_run
> DATETIME(6) NULL']
>
>
>
> On Thu, Jan 26, 2017 at 1:49 PM, Bolke de Bruin  wrote:
>
>> Hi All,
>>
>> I have made the FOURTH beta of Airflow 1.8.0 available at:
>> https://dist.apache.org/repos/dist/dev/incubator/airflow/ <
>> https://dist.apache.org/repos/dist/dev/incubator/airflow/> , public keys
>> are available at https://dist.apache.org/repos/
>> dist/release/incubator/airflow/ <https://dist.apache.org/repos
>> /dist/release/incubator/airflow/> . It is tagged with a local version
>> “apache.incubating” so it allows upgrading from earlier releases. This beta
>> is available for testing in a more production like setting (acceptance
>> environment?).
>>
>> I would like to encourage everyone  to try it out, to report back any
>> issues so we get to a rock solid release of 1.8.0. When reporting issues a
>> test case or even a fix is highly appreciated.
>>
>> Issues fixed:
>> * Incorrect Alembic reference due to revert (initdb/upgradedb/resetdb
>> should work again)
>> * Py3 incompatibility in base_taskrunner.
>>
>> Under investigation:
>> * DAG marked success, with half of the Tasks never scheduled (Alex)
>>
>> Kind regards,
>> Bolke
>
>
>


Re: Airflow 1.8.0 BETA 4

2017-01-27 Thread Boris Tyukin
was the MySQL database version requirement changed with 1.8.0? I run mysql
14.14 and it worked fine with 1.7.

I just installed the 1.8.0 beta 4 and got the error below when I ran the
airflow upgradedb command:

  File "/usr/local/lib/python2.7/site-packages/MySQLdb/cursors.py", line
205, in execute
self.errorhandler(self, exc, value)
  File "/usr/local/lib/python2.7/site-packages/MySQLdb/connections.py",
line 36, in defaulterrorhandler
raise errorclass, errorvalue
sqlalchemy.exc.ProgrammingError: (_mysql_exceptions.ProgrammingError)
(1064, "You have an error in your SQL syntax; check the manual that
corresponds to your MySQL server version for the right syntax to use near
'(6) NULL' at line 1") [SQL: u'ALTER TABLE dag MODIFY last_scheduler_run
DATETIME(6) NULL']



On Thu, Jan 26, 2017 at 1:49 PM, Bolke de Bruin  wrote:

> Hi All,
>
> I have made the FOURTH beta of Airflow 1.8.0 available at:
> https://dist.apache.org/repos/dist/dev/incubator/airflow/ <
> https://dist.apache.org/repos/dist/dev/incubator/airflow/> , public keys
> are available at https://dist.apache.org/repos/dist/release/incubator/
> airflow/ 
> . It is tagged with a local version “apache.incubating” so it allows
> upgrading from earlier releases. This beta is available for testing in a
> more production like setting (acceptance environment?).
>
> I would like to encourage everyone  to try it out, to report back any
> issues so we get to a rock solid release of 1.8.0. When reporting issues a
> test case or even a fix is highly appreciated.
>
> Issues fixed:
> * Incorrect Alembic reference due to revert (initdb/upgradedb/resetdb
> should work again)
> * Py3 incompatibility in base_taskrunner.
>
> Under investigation:
> * DAG marked success, with half of the Tasks never scheduled (Alex)
>
> Kind regards,
> Bolke


Re: how to capture sqoop mapreduce counters

2017-01-26 Thread Boris Tyukin
thanks Jayesh, replied via github

On Thu, Jan 26, 2017 at 7:29 PM, Jayesh Senjaliya 
wrote:

> Hi Boris,
>
> looks like bash_operator has same bug that ssh_execute_operator has, which
> is it does not capture multi line output
>
> I have put up the fix for bash_oeprator as well :
> https://github.com/apache/incubator-airflow/pull/2026
>
> please take a look.
>
> Thanks
> Jayesh
>
>
>
>
>
>
> On Wed, Jan 25, 2017 at 1:25 PM, Boris Tyukin 
> wrote:
>
> > I figured that luckily for me, the number of rows loaded by sqoop is
> > reported to stdout as the very last line. So I just used BashOperator and
> > set xcom_push=True. Then I did something like that:
> >
> > # Log row_count ingested
> > try:
> > row_count = int(re.search('Retrieved (\d+) records',
> >   kwargs['ti'].xcom_pull(task_
> > ids='t_sqoop_from_cerner')).group(1))
> > write_job_audit(get_job_audit_id_from_context(kwargs),
> > "rows_ingested_sqoop", row_count)
> > except ValueError:
> > write_job_audit(get_job_audit_id_from_context(kwargs),
> > "rows_ingested_sqoop", -1)
> >
> > The alternative I was considering is to get mapreduce jobid and then use
> > mapred command to get the needed counter - here is an example:
> >
> > mapred job -counter job_1484574566480_0002
> > org.apache.hadoop.mapreduce.TaskCounter
> > MAP_OUTPUT_RECORDS
> >
> > But I could not figure out an easy way to get job_id from BashOperator /
> > sqoop output. I guess I could create my own operator that would capture
> all
> > stdout lines not only the last one.
> >
> > On Tue, Jan 24, 2017 at 9:07 AM, Boris Tyukin 
> > wrote:
> >
> > > Hello all,
> > >
> > > is there a way to capture sqoop counters either using bash or sqoop
> > > operator? Specifically I need to pull a total number of rows loaded.
> > >
> > > By looking at bash operator, I think there is an option to push the
> last
> > > line of output to xcom but sqoop and mapreduce output is a bit more
> > > complicated.
> > >
> > > Thanks!
> > >
> >
>


Re: how to capture sqoop mapreduce counters

2017-01-25 Thread Boris Tyukin
I figured that luckily for me, the number of rows loaded by sqoop is
reported to stdout as the very last line. So I just used BashOperator and
set xcom_push=True. Then I did something like that:

# Log row_count ingested
try:
    row_count = int(re.search('Retrieved (\d+) records',
                              kwargs['ti'].xcom_pull(task_ids='t_sqoop_from_cerner')).group(1))
    write_job_audit(get_job_audit_id_from_context(kwargs),
                    "rows_ingested_sqoop", row_count)
except ValueError:
    write_job_audit(get_job_audit_id_from_context(kwargs),
                    "rows_ingested_sqoop", -1)

The alternative I was considering is to get mapreduce jobid and then use
mapred command to get the needed counter - here is an example:

mapred job -counter job_1484574566480_0002 org.apache.hadoop.mapreduce.TaskCounter MAP_OUTPUT_RECORDS

But I could not figure out an easy way to get job_id from BashOperator /
sqoop output. I guess I could create my own operator that would capture all
stdout lines not only the last one.
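
For reference, here is a rough, untested sketch of that idea - running sqoop
from a PythonOperator callable (with provide_context=True) so the whole stdout
is available for parsing. The sqoop command, regex patterns and the job-id
handling below are placeholders, not code from this thread:

import re
import subprocess


def run_sqoop_and_capture(**kwargs):
    # Placeholder command - replace with the real sqoop invocation
    cmd = 'sqoop import --connect jdbc:... --table my_table'
    # Capture the whole output (sqoop logs go to stderr, so merge it in)
    output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)

    # Total rows loaded, e.g. "Retrieved 12345 records."
    rows = re.search(r'Retrieved (\d+) records', output)
    # MapReduce job id, e.g. "Running job: job_1484574566480_0002"
    job_id = re.search(r'(job_\d+_\d+)', output)

    if job_id:
        kwargs['ti'].xcom_push(key='sqoop_job_id', value=job_id.group(1))
    # PythonOperator pushes the return value to XCom automatically
    return int(rows.group(1)) if rows else -1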

On Tue, Jan 24, 2017 at 9:07 AM, Boris Tyukin  wrote:

> Hello all,
>
> is there a way to capture sqoop counters either using bash or sqoop
> operator? Specifically I need to pull a total number of rows loaded.
>
> By looking at bash operator, I think there is an option to push the last
> line of output to xcom but sqoop and mapreduce output is a bit more
> complicated.
>
> Thanks!
>


Re: Airflow Meetup @ Paypal (San Jose)

2017-01-25 Thread Boris Tyukin
it would be great!

On Wed, Jan 25, 2017 at 1:26 PM, siddharth anand  wrote:

> Paypal is quite close (11 minute drive on local streets per google Maps :
> https://goo.gl/maps/otUpve9StxJ2) to the Strata venue, so it would make
> sense to hold the meet-up at Paypal during Strata week.
>
> -s
>
> On Wed, Jan 25, 2017 at 5:48 AM, Boris Tyukin 
> wrote:
>
> > any way to schedule it during Strata week? would love to attend one of
> > airflow meetups but I am in Florida. 03/13 or 03/14 would work the best
> > because first two days of Strata are training days and not very busy
> >
> > On Tue, Jan 24, 2017 at 10:33 PM, Russell Jurney <
> russell.jur...@gmail.com
> > >
> > wrote:
> >
> > > Unfortunately, Strata has no room for us :( Paypal sounds like a great
> > > option.
> > >
> > > Jayesh, sounds like you're driving? :)
> > >
> > > On Tue, Jan 24, 2017 at 12:04 PM, siddharth anand 
> > > wrote:
> > >
> > > > Russell,
> > > > Let us know what you learn about Strata.
> > > >
> > > > Even if Strata offers up rooms to communities for free (based on
> > > > information such as community size, etc...), I'm doubtful they would
> > > cover
> > > > food and drinks. That cost would need to be carried by a sponsor --
> > i.e.
> > > > you'd need to find a sponsor for it. We considered something similar
> > for
> > > > QCon -- however, our venue costs were fairly high so the catering
> cost
> > > for
> > > > most budding communities and their sponsors were a turn-off. Given
> that
> > > > Strata is a large conference hosted at a largish (i.e. expensive)
> > hotel,
> > > > I'd expect some of the same cost issues, unless Strata co-sponsored
> it.
> > > >
> > > > I'm all for something at Strata, but just wanted to share my $0.02.
> > Since
> > > > this topic came up on Jayesh's thread, I'd like to time-bound it. If
> > you
> > > > don't hear back by say Friday with specifics from Strata, I'd say
> that
> > > > Jayesh's wins by first-mover privilege.
> > > >
> > > > Jayesh,
> > > > If we don't hear from Strata by Friday, I'd say we continue with your
> > > idea.
> > > > I've already promoted your user to Event Organizer on
> > > > https://www.meetup.com/Bay-Area-Apache-Airflow-Incubating-Meetup/
> > > >
> > > > You'd need to follow the steps below:
> > > >
> > > >- Get approval from Paypal to host it
> > > >- Ping this list for 2 more speakers - I'd imagine someone from
> > PayPal
> > > >will also speak about PayPal's use of Airflow.
> > > >- Create the meet-up event (ideally once you have all 3 speakers)
> > > >- Update this list with a link to this event (and ping me if I
> don't
> > > see
> > > >it) -- I'll then promote it on our twitter channel, etc...
> > > >
> > > > -s
> > > >
> > > > On Mon, Jan 23, 2017 at 4:42 PM, Jayesh Senjaliya <
> jhsonl...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > I am actually up for both, Paypal can host after Strata.
> > > > >
> > > > > waiting for community to comment as well.
> > > > >
> > > > > Thanks
> > > > > Jayesh
> > > > >
> > > > >
> > > > > On Mon, Jan 23, 2017 at 3:45 PM, Russell Jurney <
> > > > russell.jur...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > I reached out and am awaiting to hear if they have space. They
> did
> > > say
> > > > > that
> > > > > > attendees of meetups in the evening do NOT need to have a Strata
> > > pass.
> > > > > >
> > > > > > I'm new here, so I don't want to hijack your meetup. If you guys
> > want
> > > > > > Paypal, lets have Paypal host. I'm sure it will be great either
> > way.
> > > > > >
> > > > > > On Fri, Jan 20, 2017 at 1:10 PM, Russell Jurney <
> > > > > russell.jur...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > I think if we hold it in the evening, there is no requirement
> to

Re: Article: The Rise of the Data Engineer

2017-01-25 Thread Boris Tyukin
Max, really really nice post and I like your style of writing - please
continue sharing your experience and inspire many of us working in more
traditional environments ;) I shared your post with our leadership and
hopefully we will have data engineers on our team soon! As far as UI vs.
coding, I am not sure I fully agree - if we look at software development
history, we will see times when programming was the only answer and
required hardcore professionals like you, but then came commercial
applications which were very visual and lowered the skillset requirements.
Informatica, SSIS and others became hugely popular, and many people swear
they save time once you know how to use them. I am pretty sure we will see
new tools in the Big Data arena as well (AtScale is one example) that make
things easier for less skilled developers and users.

It is also good timing for me, as my company is evaluating the Informatica Big
Data Management add-on (which competes with Talend Big Data) - I am not sold yet
on why we would need it if we can do much more with Python and Spark and
Hive. But the key point the Informatica folks make is to lower the skill
requirements for developers and to leverage existing skills with
Informatica and SQL. I think this is important because this is exactly why
SQL is still a huge player in the Big Data world - people love SQL, they can do
a lot with SQL and they want to use the SQL experience they've built over
their career.

The dimensional modelling question you raised is also very interesting but
very debatable. I was thinking about it before and still have not come to
believe that flat tables are the way to go. You said it yourself that there is
still a place for a highly accurate (certified) enterprise-wide warehouse, and
one still needs to spend a lot of time thinking about use cases and design
to support them. I am not sure I like the abundance of de-normalized tables
in the Big Data world, but I do see your point about SCDs and all the pain of
maintaining a traditional star schema DW. But dimensional modelling is not
really about maintenance or making life easier for ETL developers - IMHO it
is about structuring data to simplify business and data analytics. It is
about a rigorous process to conform data from multiple source systems. It is
about data quality and trust. Finally, it is about a better performing DW (by
the nature of RDBMSs, which are very good at joining tables by foreign keys) -
that last benefit, though, is not relevant in Hadoop since we can reprocess or
query data more efficiently.

Gerard, why would you do that? If you already have the skills with SQL
Server and your DWH is tiny (I run a 500GB DWH in SQL Server on a weak
machine), you should be fine with SQL Server. The only issue is that you
cannot support fast BI queries. But if you have an enterprise license, you can
easily dump your tables into a Tabular in-memory cube and most of your queries
will run in under 2 seconds. Vertica is cool, but the learning curve is
pretty steep and it really shines on big de-normalized tables, as join
performance is not that good. I work with a large healthcare vendor
and they have TB-size tables in their Vertica db - most of them are flattened
out but they still have dimensions and facts, just fewer than you would
normally have with a traditional star schema design.



On Wed, Jan 25, 2017 at 5:57 AM, Gerard Toonstra 
wrote:

> You mentioned Vertica and Parquet. Is it recommended to use these newer
> tools even when the DWH is not BigData
> size (about 150G in size) ?
>
> So there are a couple of good benefits, but are there any downsides and
> disadvantages you have to take into account
> comparing Vertica vs. SQL Server for example?
>
> If you really recommend Vertica over SQL Server, I'm looking at doing a PoC
> here to see where it goes...
>
> Rgds,
>
> Gerard
>
>
> On Wed, Jan 25, 2017 at 12:39 AM, Rob Goretsky 
> wrote:
>
> > Maxime,
> > Just wanted to thank you for writing this article - much like the
> original
> > articles by Jeff Hammerbacher and DJ Patil coining the term "Data
> > Scientist", I feel this article stands as a great explanation of what the
> > title of "Data Engineer" means today..  As someone who has been working
> in
> > this role before the title existed, many of the points here rang true
> about
> > how the technology and tools have evolved..
> >
> > I started my career working with graphical ETL tools (Informatica) and
> > could never shake the feeling that I could get a lot more done, with a
> more
> > maintainable set of processes, if I could just write reusable functions
> in
> > any programming language and then keep them in a shared library.
> Instead,
> > what the GUI tools forced upon us were massive Wiki documents laying out
> > 'the 9 steps you need to follow perfectly in order to build a proper
> > Informatica workflow' , that developers would painfully need to follow
> > along with, rather than being able to encapsulate the things that didn't
> > change in one central 'function' to pass in parameters

Re: Airflow Meetup @ Paypal (San Jose)

2017-01-25 Thread Boris Tyukin
any way to schedule it during Strata week? would love to attend one of
airflow meetups but I am in Florida. 03/13 or 03/14 would work the best
because first two days of Strata are training days and not very busy

On Tue, Jan 24, 2017 at 10:33 PM, Russell Jurney 
wrote:

> Unfortunately, Strata has no room for us :( Paypal sounds like a great
> option.
>
> Jayesh, sounds like you're driving? :)
>
> On Tue, Jan 24, 2017 at 12:04 PM, siddharth anand 
> wrote:
>
> > Russell,
> > Let us know what you learn about Strata.
> >
> > Even if Strata offers up rooms to communities for free (based on
> > information such as community size, etc...), I'm doubtful they would
> cover
> > food and drinks. That cost would need to be carried by a sponsor -- i.e.
> > you'd need to find a sponsor for it. We considered something similar for
> > QCon -- however, our venue costs were fairly high so the catering cost
> for
> > most budding communities and their sponsors were a turn-off. Given that
> > Strata is a large conference hosted at a largish (i.e. expensive) hotel,
> > I'd expect some of the same cost issues, unless Strata co-sponsored it.
> >
> > I'm all for something at Strata, but just wanted to share my $0.02. Since
> > this topic came up on Jayesh's thread, I'd like to time-bound it. If you
> > don't hear back by say Friday with specifics from Strata, I'd say that
> > Jayesh's wins by first-mover privilege.
> >
> > Jayesh,
> > If we don't hear from Strata by Friday, I'd say we continue with your
> idea.
> > I've already promoted your user to Event Organizer on
> > https://www.meetup.com/Bay-Area-Apache-Airflow-Incubating-Meetup/
> >
> > You'd need to follow the steps below:
> >
> >- Get approval from Paypal to host it
> >- Ping this list for 2 more speakers - I'd imagine someone from PayPal
> >will also speak about PayPal's use of Airflow.
> >- Create the meet-up event (ideally once you have all 3 speakers)
> >- Update this list with a link to this event (and ping me if I don't
> see
> >it) -- I'll then promote it on our twitter channel, etc...
> >
> > -s
> >
> > On Mon, Jan 23, 2017 at 4:42 PM, Jayesh Senjaliya 
> > wrote:
> >
> > > I am actually up for both, Paypal can host after Strata.
> > >
> > > waiting for community to comment as well.
> > >
> > > Thanks
> > > Jayesh
> > >
> > >
> > > On Mon, Jan 23, 2017 at 3:45 PM, Russell Jurney <
> > russell.jur...@gmail.com>
> > > wrote:
> > >
> > > > I reached out and am awaiting to hear if they have space. They did
> say
> > > that
> > > > attendees of meetups in the evening do NOT need to have a Strata
> pass.
> > > >
> > > > I'm new here, so I don't want to hijack your meetup. If you guys want
> > > > Paypal, lets have Paypal host. I'm sure it will be great either way.
> > > >
> > > > On Fri, Jan 20, 2017 at 1:10 PM, Russell Jurney <
> > > russell.jur...@gmail.com>
> > > > wrote:
> > > >
> > > > > I think if we hold it in the evening, there is no requirement to
> buy
> > a
> > > > > ticket to come to the meetup. Let me verify.
> > > > >
> > > > > On Fri, Jan 20, 2017 at 12:45 PM, Jayesh Senjaliya <
> > > jhsonl...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Russell,
> > > > >>
> > > > >> Sure, Strata will have its own flavor of visitors, but the tickets
> > are
> > > > >> kind of expensive too for everybody to join.
> > > > >>
> > > > >> I agree on turnouts though, so we can try for Strata first and
> > > fallback
> > > > to
> > > > >> regular
> > > > >> meetup in March end or even April if we dont get space in Strata.
> > > > >>
> > > > >> or we can just do both since there will be different group of
> people
> > > at
> > > > >> both places.
> > > > >>
> > > > >> - Jayesh
> > > > >>
> > > > >>
> > > > >> On Fri, Jan 20, 2017 at 12:35 PM, Russell Jurney <
> > > > >> russell.jur...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > As I mentioned in the other thread, I am available to speak on
> > > > >> Predictive
> > > > >> > Analytics with Airflow and PySpark.
> > > > >> >
> > > > >> > Mid march has been suggested. What about the evening of Tuesday,
> > > 3/14
> > > > -
> > > > >> the
> > > > >> > first day of sessions at Strata? We could promote the meetup
> with
> > > the
> > > > >> > conference, get it listed as an evening event. Alternative day
> > could
> > > > be
> > > > >> > Wednesday 3/15, 2nd day of Strata sessions.
> > > > >> >
> > > > >> > This brings up the question... should we maybe have the meetup
> at
> > > > >> Strata?
> > > > >> > Just a thought, we might get better turnout if we get a room
> from
> > > > >> Strata.
> > > > >> > I'm sure they would agree. I'm new here; just an idea.
> > > > >> >
> > > > >> > Russ
> > > > >> >
> > > > >> > On Fri, Jan 20, 2017 at 11:36 AM, Jacky 
> > > wrote:
> > > > >> >
> > > > >> > > Hello Airflow community !
> > > > >> > >
> > > > >> > > I am Jayesh from Paypal, and at last meetup we briefly talked
> > > about
> > > > >> > > hosting next one and I offered to host at Paypal office i

how to capture sqoop mapreduce counters

2017-01-24 Thread Boris Tyukin
Hello all,

is there a way to capture sqoop counters either using bash or sqoop
operator? Specifically I need to pull a total number of rows loaded.

By looking at bash operator, I think there is an option to push the last
line of output to xcom but sqoop and mapreduce output is a bit more
complicated.

Thanks!


Re: Flow-based Airflow?

2017-01-23 Thread Boris Tyukin
this is a good discussion. Most traditional ETL tools (SSIS,
Informatica, DataStage etc.) have both - control flow (or task dependency)
and data flow. Some tools like SSIS make a clear distinction between them -
you create a control flow that calls data flows as part of the overall
control flow. Some tools like Alteryx were data flow only, which is also
very limiting.

I am pretty sure the same concepts will come to the hadoop world, as we saw
with SQL (just look how many SQL-on-hadoop engines are on the market now and
how everyone tried not to do SQL 5 years ago). Same thing with UI
drag-and-drop based tools - just give it some time and we will see a new wave
of tools that do not require Python, Java or Scala knowledge and lower the
skillset requirements.

I do agree that limiting Airflow to a task dependency tool is not a good
strategy in the long run, just like I never liked the concept of backfills in
Airflow - ideally the tool should give you a choice and support many design
patterns.

At this very moment, though, I find Airflow to be the best tool for the job,
and the fact that it does not support data flows in the hadoop world is not a
deal breaker. Most hadoop pipelines are about gluing together different
steps and tools, which Airflow is extremely good at!

On Mon, Jan 23, 2017 at 11:05 AM, Bolke de Bruin  wrote:

> Hi All,
>
> I came by a write up of some of the downsides in current workflow
> management systems like Airflow and Luigi (http://bionics.it/posts/
> workflows-dataflow-not-task-deps) where they argue dependencies should be
> between inputs and outputs of tasks rather than between tasks
> (inlets/outlets).
>
> They extended Luigi (https://github.com/pharmbio/sciluigi) to do this and
> even published a scientific paper on it: http://jcheminf.springeropen.
> com/articles/10.1186/s13321-016-0179-6 .
>
> I kind of like the idea, has anyone played with it, any thoughts? I might
> want to try it in Airflow.
>
> Bolke


Re: Airflow 1.8.0 BETA 2

2017-01-20 Thread Boris Tyukin
just to make sure this is the latest one, right?
https://dist.apache.org/repos/dist/dev/incubator/airflow/airflow-1.8.0b2+apache.incubating.tar.gz

On Fri, Jan 20, 2017 at 10:57 AM, Bolke de Bruin  wrote:

> Hi All,
>
> I have made the SECOND beta of Airflow 1.8.0 available at:
> https://dist.apache.org/repos/dist/dev/incubator/airflow/ <
> https://dist.apache.org/repos/dist/dev/incubator/airflow/> , public keys
> are available at https://dist.apache.org/repos/dist/release/incubator/
> airflow/ 
> . It is tagged with a local version “apache.incubating” so it allows
> upgrading from earlier releases. This beta is available for testing in a
> more production like setting (acceptance environment?).
>
> I would like to encourage everyone  to try it out, to report back any
> issues so we get to a rock solid release of 1.8.0. When reporting issues a
> test case or even a fix is highly appreciated.
>
> Cgroups+impersonation are now in. This means we are feature complete for
> 1.8.0!
>
> Cheers
> Bolke


Re: Airflow 1.8.0 BETA 1

2017-01-19 Thread Boris Tyukin
I'd like to test it on my VM with the code I am working on, but I do not
know how to upgrade from 1.7. Can I use pip to pull it from github? Maybe
someone can give me directions - I am very new to Python. Also, will it mess
up my airflow.cfg or anything else I need to back up?

On Wed, Jan 18, 2017 at 4:38 PM, Chris Riccomini 
wrote:

> We are switching to 1.8.0b1 this week--both dev and prod. Will keep you
> posted.
>
> On Wed, Jan 18, 2017 at 11:51 AM, Alex Van Boxel  wrote:
>
> > Hey Max,
> >
> > As I'm missing the 1.7.2 labels I compared to the 172 branch. Can you
> have
> > a look at PR 2000. Its also sanitised, removing some of the commits that
> > doesn't bring value to the users.
> >
> > On Wed, Jan 18, 2017, 02:51 Maxime Beauchemin <
> maximebeauche...@gmail.com>
> > wrote:
> >
> > > Alex, for the CHANGELOG.md, I've been using `github-changes`, a js app
> > that
> > > make changelog generation flexible and easy.
> > >
> > > https://www.npmjs.com/package/github-changes
> > >
> > > Command looks something like:
> > > `github-changes -o apache -r incubator-airflow --token  > TOKEN>
> > > --between-tags 1.7.2...1.8.0beta` (tags may differ, it's easy to get a
> > > token on your GH profile page)
> > >
> > > This will write a `CHANGELOG.md` in your cwd that you can just add on
> top
> > > of the existing one
> > >
> > > Max
> > >
> > > On Jan 17, 2017 3:37 PM, "Dan Davydov"  invalid>
> > > wrote:
> > >
> > > > So it is, my bad. Bad skills with ctrl-f :).
> > > >
> > > > On Tue, Jan 17, 2017 at 3:31 PM, Bolke de Bruin 
> > > wrote:
> > > >
> > > > > Arthur's change is already in!
> > > > >
> > > > > B.
> > > > >
> > > > > Sent from my iPhone
> > > > >
> > > > > > On 17 Jan 2017, at 22:20, Dan Davydov  > > .INVALID>
> > > > > wrote:
> > > > > >
> > > > > > Would be good to cherrypick Arthur's fix into here if possible:
> > > > > > https://github.com/apache/incubator-airflow/pull/1973/files
> > (commit
> > > > > > 43bf89d)
> > > > > >
> > > > > > The impersonation stuff should be wrapping up shortly pending
> > Bolke's
> > > > > > comments.
> > > > > >
> > > > > > Also agreed with Max on the thanks. Thanks Alex too for the
> change
> > > log!
> > > > > >
> > > > > > On Tue, Jan 17, 2017 at 10:05 AM, Maxime Beauchemin <
> > > > > > maximebeauche...@gmail.com> wrote:
> > > > > >
> > > > > >> Bolke, I couldn't thank you enough for driving the release
> > process!
> > > > > >>
> > > > > >> I'll coordinate with the Airbnb team around
> impersonation/CGROUPs
> > > and
> > > > on
> > > > > >> making sure we put this release in our staging ASAP. We have our
> > > > > employee
> > > > > >> conference this week so things are slower, but we'll be back at
> > full
> > > > > speed
> > > > > >> Friday.
> > > > > >>
> > > > > >> Max
> > > > > >>
> > > > > >>> On Mon, Jan 16, 2017 at 3:51 PM, Alex Van Boxel <
> > a...@vanboxel.be>
> > > > > wrote:
> > > > > >>>
> > > > > >>> Hey Bolke, thanks great wotk. I'll handle the CHANGELOG, and
> add
> > > some
> > > > > >>> documentation about triggers with branching operators.
> > > > > >>>
> > > > > >>> About the Google Cloud Operators: I wouldn't call it feature
> > > > > complete...
> > > > > >> it
> > > > > >>> never is.
> > > > > >>>
> > > > > >>>
> > > > > >>> On Mon, Jan 16, 2017 at 11:24 PM Bolke de Bruin <
> > bdbr...@gmail.com
> > > >
> > > > > >> wrote:
> > > > > >>>
> > > > >  Dear All,
> > > > > 
> > > > >  I have made the first BETA of Airflow 1.8.0 available at:
> > > > >  https://dist.apache.org/repos/dist/dev/incubator/airflow/ <
> > > > >  https://dist.apache.org/repos/dist/dev/incubator/airflow/> ,
> > > public
> > > > > >> keys
> > > > >  are available at
> > > > >  https://dist.apache.org/repos/dist/release/incubator/airflow/
> <
> > > > >  https://dist.apache.org/repos/dist/release/incubator/airflow/
> >
> > .
> > > It
> > > > > is
> > > > >  tagged with a local version “apache.incubating” so it allows
> > > > upgrading
> > > > > >>> from
> > > > >  earlier releases. This beta is available for testing in a more
> > > > > >> production
> > > > >  like setting (acceptance environment?).
> > > > > 
> > > > >  I would like to encourage everyone  to try it out, to report
> > back
> > > > any
> > > > >  issues so we get to a rock solid release of 1.8.0. When
> > reporting
> > > > > >> issues
> > > > > >>> a
> > > > >  test case or even a fix is highly appreciated.
> > > > > 
> > > > >  By moving to beta, we are also in feature freeze mode. Meaning
> > no
> > > > > major
> > > > >  adjustments or additions can be made to the v1-8-test branch.
> > > There
> > > > is
> > > > > >>> one
> > > > >  exception: the cgroups+impersonation patch. I was assured that
> > > > before
> > > > > >> we
> > > > >  merge that it will be thoroughly tested, so its can still
> enter
> > > 1.8
> > > > if
> > > > >  within a reasonable time frame. A lot of work has gone into it
> > and
> > > > it
> > > > > >>> 

Re: Good jinja templating in airflow examples

2017-01-19 Thread Boris Tyukin
Max, this is awesome! Here is the working example if someone needs it - you
almost typed it right :)


def foo(templates_dict, *args, **kwargs):
    return templates_dict['ds']


# All of the values in the `templates_dict` should get templated by the Airflow
# engine, meaning that in this context, `foo` should return the value of `ds`.

t4 = PythonOperator(
    task_id='t4',
    python_callable=foo,
    provide_context=True,
    templates_dict={
        'ds': '{{ ds }}'
    },
    dag=dag)

On Thu, Jan 19, 2017 at 11:58 AM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> I didn't test the code bellow, but hopefully you get the idea:
> --
>
> def foo(templates_dict, *args, **kwargs):
> print(templates_dict['ds'])
>
> op = PythonOperator('a_task_id', python_callable=foo, templates_dict={'ds':
> '{{ ds }}'})
> # All of the values in the `templates_dict` should get templated by the
> Airflow engine, meaning that in this context, `foo` should print the value
> of the `ds`.
>
> Max
>
> On Thu, Jan 19, 2017 at 6:22 AM, Boris Tyukin 
> wrote:
>
> > Maxime, I have a related question. Can you explain how template_dict work
> > with PythonOperator? Documentation is very vague about it. Example would
> be
> > nice. I could not find a way to use jinja template with Python operator
> > i.e. passing templated parameters to a pythonoperator callable. Someone
> > told me the only way to do it is to create a wrapper function and use
> xcom
> > or macro from there
> >
> > On Wed, Jan 18, 2017 at 10:09 PM, Maxime Beauchemin <
> > maximebeauche...@gmail.com> wrote:
> >
> > > Here's the list of variables and macros exposed in the jinja context:
> > > https://airflow.apache.org/code.html#macros
> > >
> > > Those are all exposed by the framework, meaning they are usable for any
> > > templated field without doing anything special.
> > >
> > > There are ways to pass your own variables and methods using the
> `params`
> > > attribute of any operator as shown in the tutorial here
> > > <https://airflow.apache.org/tutorial.html#example-pipeline-definition
> >,
> > > and
> > > there's also a `user_defined_macros` parameter as a dict you can pass
> > while
> > > creating the DAG object. Every key in that dictionary is made available
> > in
> > > the global jinja namespace.
> > >
> > > https://airflow.apache.org/code.html#airflow.models.DAG
> > >
> > > Max
> > >
> > > On Wed, Jan 18, 2017 at 11:31 AM, Boris Tyukin 
> > > wrote:
> > >
> > > > Hi Guilherme,
> > > >
> > > > I guess it depends what exactly you want to do as not everything
> works
> > > with
> > > > jinja.
> > > >
> > > > From documentation:
> > > >
> > > > https://pythonhosted.org/airflow/concepts.html#jinja-templating
> > > >
> > > > You can use Jinja templating with every parameter that is marked as
> > > > “templated” in the documentation.
> > > >
> > > > You can open source code for operators and see what parameters are
> > > actually
> > > > templated.
> > > >
> > > > For example, if you open source code for BashOperator
> > > > https://pythonhosted.org/airflow/_modules/bash_
> > > operator.html#BashOperator
> > > > you will see this line of code:
> > > > template_fields = ('bash_command', 'env')
> > > >
> > > > It means only bash_command and env will work with jinja templates.
> > > >
> > > > On Wed, Jan 18, 2017 at 12:45 PM, Guilherme Marthe <
> > > > guilherme.mar...@enjoei.com.br> wrote:
> > > >
> > > > > Hey folks!
> > > > >
> > > > > I am trying to write a dag that works well with jinja templating,
> > since
> > > > > through my study of the documentation, is the best way to ensure
> > > > > compatibility with the back-fill function.
> > > > >
> > > > > Are you guys aware of any examples online with this functionality
> > > > working?
> > > > > The, documentation is still a bit dry on insightful examples, and I
> > am
> > > a
> > > > > noobie developer, so I am trying to make the best use of the tool
> in
> > > the
> > > > > "correct way".
> > > > >
> > > > > Thank you in advance for any resources you guys can share!
> > > > >
> > > > > Gui
> > > > >
> > > > > Ps: I ve been told I can email you guys with questions like these
> :)
> > > Hope
> > > > > this is not a hassle ^_^
> > > > >
> > > >
> > >
> >
>


Re: Good jinja templating in airflow examples

2017-01-19 Thread Boris Tyukin
Maxime, I have a related question. Can you explain how templates_dict works
with PythonOperator? The documentation is very vague about it, and an example
would be nice. I could not find a way to use a jinja template with the Python
operator, i.e. passing templated parameters to a PythonOperator callable.
Someone told me the only way to do it is to create a wrapper function and use
xcom or a macro from there.

On Wed, Jan 18, 2017 at 10:09 PM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> Here's the list of variables and macros exposed in the jinja context:
> https://airflow.apache.org/code.html#macros
>
> Those are all exposed by the framework, meaning they are usable for any
> templated field without doing anything special.
>
> There are ways to pass your own variables and methods using the `params`
> attribute of any operator as shown in the tutorial here
> <https://airflow.apache.org/tutorial.html#example-pipeline-definition>,
> and
> there's also a `user_defined_macros` parameter as a dict you can pass while
> creating the DAG object. Every key in that dictionary is made available in
> the global jinja namespace.
>
> https://airflow.apache.org/code.html#airflow.models.DAG
>
> Max
>
> On Wed, Jan 18, 2017 at 11:31 AM, Boris Tyukin 
> wrote:
>
> > Hi Guilherme,
> >
> > I guess it depends what exactly you want to do as not everything works
> with
> > jinja.
> >
> > From documentation:
> >
> > https://pythonhosted.org/airflow/concepts.html#jinja-templating
> >
> > You can use Jinja templating with every parameter that is marked as
> > “templated” in the documentation.
> >
> > You can open source code for operators and see what parameters are
> actually
> > templated.
> >
> > For example, if you open source code for BashOperator
> > https://pythonhosted.org/airflow/_modules/bash_
> operator.html#BashOperator
> > you will see this line of code:
> > template_fields = ('bash_command', 'env')
> >
> > It means only bash_command and env will work with jinja templates.
> >
> > On Wed, Jan 18, 2017 at 12:45 PM, Guilherme Marthe <
> > guilherme.mar...@enjoei.com.br> wrote:
> >
> > > Hey folks!
> > >
> > > I am trying to write a dag that works well with jinja templating, since
> > > through my study of the documentation, is the best way to ensure
> > > compatibility with the back-fill function.
> > >
> > > Are you guys aware of any examples online with this functionality
> > working?
> > > The, documentation is still a bit dry on insightful examples, and I am
> a
> > > noobie developer, so I am trying to make the best use of the tool in
> the
> > > "correct way".
> > >
> > > Thank you in advance for any resources you guys can share!
> > >
> > > Gui
> > >
> > > Ps: I ve been told I can email you guys with questions like these :)
> Hope
> > > this is not a hassle ^_^
> > >
> >
>


Re: Good jinja templating in airflow examples

2017-01-18 Thread Boris Tyukin
Hi Guilherme,

I guess it depends what exactly you want to do as not everything works with
jinja.

From the documentation:

https://pythonhosted.org/airflow/concepts.html#jinja-templating

You can use Jinja templating with every parameter that is marked as
“templated” in the documentation.

You can open the source code for an operator and see which parameters are
actually templated.

For example, if you open the source code for BashOperator
https://pythonhosted.org/airflow/_modules/bash_operator.html#BashOperator
you will see this line of code:
template_fields = ('bash_command', 'env')

It means only bash_command and env will work with jinja templates.
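
If you write your own operator, you control that list yourself. A minimal,
untested sketch below - the class name and my_param are made up, and import
paths can differ a bit between Airflow versions:

import logging

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class MyTemplatedOperator(BaseOperator):
    # Only the fields listed here are rendered by jinja before execute() runs
    template_fields = ('my_param',)

    @apply_defaults
    def __init__(self, my_param, *args, **kwargs):
        super(MyTemplatedOperator, self).__init__(*args, **kwargs)
        self.my_param = my_param

    def execute(self, context):
        # By now a value like '{{ ds }}' has already been rendered
        logging.info('my_param rendered to: %s', self.my_param)


# t = MyTemplatedOperator(task_id='demo', my_param='{{ ds }}', dag=dag)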

On Wed, Jan 18, 2017 at 12:45 PM, Guilherme Marthe <
guilherme.mar...@enjoei.com.br> wrote:

> Hey folks!
>
> I am trying to write a dag that works well with jinja templating, since
> through my study of the documentation, is the best way to ensure
> compatibility with the back-fill function.
>
> Are you guys aware of any examples online with this functionality working?
> The, documentation is still a bit dry on insightful examples, and I am a
> noobie developer, so I am trying to make the best use of the tool in the
> "correct way".
>
> Thank you in advance for any resources you guys can share!
>
> Gui
>
> Ps: I ve been told I can email you guys with questions like these :) Hope
> this is not a hassle ^_^
>


Re: question about max_active_runs or how to force only one Dag run at the time

2017-01-17 Thread Boris Tyukin
interesting, thanks Bolke for giving me this idea!
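
For reference, a rough, untested sketch of what that could look like - creating
the pool from code so each generated DAG can make sure its own pool exists. It
assumes the Pool model fields (pool, slots, description) and settings.Session
of the 1.7/1.8 line:

from airflow import settings
from airflow.models import Pool


def ensure_pool(name, slots=1, description=''):
    # Create the pool only if it is not already in the metadata db
    session = settings.Session()
    try:
        if not session.query(Pool).filter(Pool.pool == name).first():
            session.add(Pool(pool=name, slots=slots, description=description))
            session.commit()
    finally:
        session.close()


# In each generated DAG file:
# ensure_pool('my_table_pool', slots=1)
# then pass pool='my_table_pool' to the tasks that must not run concurrently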

On Tue, Jan 17, 2017 at 11:29 AM, Bolke de Bruin  wrote:

> Well yes I guess so. You can actually create pools from a DAG definition
> as well (I personally don’t like that for operational security reasons),
> but yes this works. So could verify if the pool exists for a certain DAG
> and create it if it doesn’t.
>
> Bolke
>
> > On 17 Jan 2017, at 17:26, Boris Tyukin  wrote:
> >
> > interesting but I have 200 DAGs like that - I generate them
> > programmatically. Does it mean I need a different pool for each one of
> > them? Can I define pools not from UI? that would be painful :)
> >
> > On Tue, Jan 17, 2017 at 10:22 AM, Bolke de Bruin 
> wrote:
> >
> >> You could use a pool, if you are fine with the “second” dag run being
> run
> >> later.
> >>
> >>
> >>> On 17 Jan 2017, at 16:18, Boris Tyukin  wrote:
> >>>
> >>> hello,
> >>>
> >>> I would like to prevent a DAG from scheduling / running concurrently
> >>> whether by scheduler or if triggered externally (via airflow
> trigger_dag
> >> or
> >>> UI).
> >>>
> >>> I tried to set max_active_runs to 1 but i did not work for externally
> >>> triggered case - airflow would still go ahead and execute multiple dag
> >> runs.
> >>>
> >>> I see this is done intentionally in dagrun_exists_dep.py:
> >>>
> >>> running_dagruns = DagRun.find(
> >>> dag_id=dag.dag_id,
> >>> state=State.RUNNING,
> >>> *external_trigger=False,*
> >>> session=session
> >>> )
> >>> Is there any other way of forcing only one DAG run at the time?
> >>>
> >>> I am aware of depends_on_past=True but I use PythonBranch operators and
> >>> that setting does not make sense in my case.
> >>
> >>
>
>


Re: question about max_active_runs or how to force only one Dag run at the time

2017-01-17 Thread Boris Tyukin
interesting but I have 200 DAGs like that - I generate them
programmatically. Does it mean I need a different pool for each one of
them? Can I define pools not from UI? that would be painful :)

On Tue, Jan 17, 2017 at 10:22 AM, Bolke de Bruin  wrote:

> You could use a pool, if you are fine with the “second” dag run being run
> later.
>
>
> > On 17 Jan 2017, at 16:18, Boris Tyukin  wrote:
> >
> > hello,
> >
> > I would like to prevent a DAG from scheduling / running concurrently
> > whether by scheduler or if triggered externally (via airflow trigger_dag
> or
> > UI).
> >
> > I tried to set max_active_runs to 1 but i did not work for externally
> > triggered case - airflow would still go ahead and execute multiple dag
> runs.
> >
> > I see this is done intentionally in dagrun_exists_dep.py:
> >
> > running_dagruns = DagRun.find(
> > dag_id=dag.dag_id,
> > state=State.RUNNING,
> > *external_trigger=False,*
> > session=session
> > )
> > Is there any other way of forcing only one DAG run at the time?
> >
> > I am aware of depends_on_past=True but I use PythonBranch operators and
> > that setting does not make sense in my case.
>
>


question about max_active_runs or how to force only one Dag run at the time

2017-01-17 Thread Boris Tyukin
hello,

I would like to prevent a DAG from scheduling / running concurrently
whether by scheduler or if triggered externally (via airflow trigger_dag or
UI).

I tried to set max_active_runs to 1, but it did not work for the externally
triggered case - airflow would still go ahead and execute multiple dag runs.

I see this is done intentionally in dagrun_exists_dep.py:

running_dagruns = DagRun.find(
    dag_id=dag.dag_id,
    state=State.RUNNING,
    *external_trigger=False,*
    session=session
)
Is there any other way of forcing only one DAG run at the time?

I am aware of depends_on_past=True but I use PythonBranch operators and
that setting does not make sense in my case.


Strata San Jose

2017-01-16 Thread Boris Tyukin
Hello everyone,

is anyone talking about Airflow at Strata in March? Or maybe we can plan a
meetup there - that would be awesome!


Re: Airflow 2.0

2016-11-21 Thread Boris Tyukin
I am still deciding between Airflow and oozie for our brand new Hadoop
project, but here are a few things that I did not like during my limited
testing:

1) pain with scheduler/webserver restarts - things magically begin working
after a restart, or disappear (like DAG tasks that are no longer part of the DAG)
2) no security - a big deal for enterprise companies like the one I
work for (a large healthcare organization).
3) the backfill concept is a bit weird to me. I think Gerard put it pretty well
- backfills should be run for the entire missing window, not day by day.
Logging for backfills should be consistent with normal DAG Runs.
4) confusion around execution time and start time - I wish the UI would clearly
distinguish them. Execution time only covers the interval back to the previous
DAG run - I wish it went back to the LAST successful DAG run. That way I could
rely on it as a watermark for incremental processes.
5) UTC confusion - not all companies have the luxury of running all their
systems on UTC.


On Mon, Nov 21, 2016 at 5:26 PM, siddharth anand  wrote:

> Also, a survey will be a little less noisy and easier to summarize than +1s
> in this email thread.
> -s (Sid)
>
> On Mon, Nov 21, 2016 at 2:25 PM, siddharth anand 
> wrote:
>
> > Sergei,
> > These are some great ideas -- I would classify at least half of them as
> > pain points.
> >
> > Folks!
> > I suggest people (on the dev list) keep feeding this thread at least for
> > the next 2 days. I can then float a survey based on these ideas and give
> > the community a chance to vote so we can prioritize the wish list.
> >
> > -s
> >
> > On Mon, Nov 21, 2016 at 5:22 AM, Sergei Iakhnin 
> wrote:
> >
> >> I've been running Airflow on 1500 cores in the context of scientific
> >> workflows for the past year and a half. Features that would be important
> >> to
> >> me for 2.0:
> >>
> >> - Add FK to dag_run to the task_instance table on Postgres so that
> >> task_instances can be uniquely attributed to dag runs.
> >> - Ensure scheduler can be run continuously without needing restarts.
> Right
> >> now it gets into some ill-determined bad state forcing me to restart it
> >> every 20 minutes.
> >> - Ensure scheduler can handle tens of thousands of active workflows.
> Right
> >> now this results in extremely long scheduling times and inconsistent
> >> scheduling even at 2 thousand active workflows.
> >> - Add more flexible task scheduling prioritization. The default
> >> prioritization is the opposite of the behaviour I want. I would prefer
> >> that
> >> downstream tasks always have higher priority than upstream tasks to
> cause
> >> entire workflows to tend to complete sooner, rather than scheduling
> tasks
> >> from other workflows. Having a few scheduling prioritization strategies
> >> would be beneficial here.
> >> - Provide better support for manually-triggered DAGs on the UI i.e. by
> >> showing them as queued.
> >> - Provide some resource management capabilities via something like slots
> >> that can be defined on workers and occupied by tasks. Using celery's
> >> concurrency parameter at the airflow server level is too coarse-grained
> as
> >> it forces all workers to be the same, and does not allow proper resource
> >> management when different workflow tasks have different resource
> >> requirements thus hurting utilization (a worker could run 8 parallel
> tasks
> >> with small memory footprint, but only 1 task with large memory footprint
> >> for instance).
> >>
> >> With best regards,
> >>
> >> Sergei.
> >>
> >>
> >> On Mon, Nov 21, 2016 at 2:00 PM Ryabchuk, Pavlo <
> >> ext-pavlo.ryabc...@here.com>
> >> wrote:
> >>
> >> > -1. We extremely rely on data profiling, as a pipeline health
> monitoring
> >> > tool
> >> >
> >> > -Original Message-
> >> > From: Chris Riccomini [mailto:criccom...@apache.org]
> >> > Sent: Saturday, November 19, 2016 1:57 AM
> >> > To: dev@airflow.incubator.apache.org
> >> > Subject: Re: Airflow 2.0
> >> >
> >> > > RIP out the charting application and the data profiler
> >> >
> >> > Yes please! +1
> >> >
> >> > On Fri, Nov 18, 2016 at 2:41 PM, Maxime Beauchemin <
> >> > maximebeauche...@gmail.com> wrote:
> >> > > Another point that may be controversial for Airflow 2.0: RIP out the
> >> > > charting application and the data profiler. Even though it's nice to
> >> > > have it there, it's just out of scope and has major security
> >> > issues/implications.
> >> > >
> >> > > I'm not sure how popular it actually is. We may need to run a survey
> >> > > at some point around this kind of questions.
> >> > >
> >> > > Max
> >> > >
> >> > > On Fri, Nov 18, 2016 at 2:39 PM, Maxime Beauchemin <
> >> > > maximebeauche...@gmail.com> wrote:
> >> > >
> >> > >> Using FAB's Model, we get pretty much all of that (REST API,
> >> > >> auth/perms,
> >> > >> CRUD) for free:
> >> > >> http://flask-appbuilder.readthedocs.io/en/latest/

Re: ETL best practices for airflow

2016-10-25 Thread Boris Tyukin
ets/datablocks systematically as sources to your computations, in
> >> ways
> >> that any task instance sources from immutable datasets that are
> persisted
> >> in your backend. That allows to satisfy the guarantee that re-running
> any
> >> chunk of ETL at different point in time should lead to the exact same
> >> result. It also usually means that you need to 1-do incremental loads,
> and
> >> 2- "snapshot" your dimension/referential/small tables in time to make
> sure
> >> that running the ETL from 26 days ago sources from the dimension
> snapshot
> >> as it was back then and yields the exact same result.
> >>
> >> Anyhow, it's a complex and important subject I should probably write
> about
> >> in a structured way sometime.
> >>
> >> Max
> >>
> >> On Mon, Oct 17, 2016 at 6:12 PM, Boris Tyukin 
> >> wrote:
> >>
> >>
> >>
> >
>


Re: Changing the crontab for a DAG

2016-10-25 Thread Boris Tyukin
yep this is what was recommended in
https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls


   - When needing to change your start_date and schedule interval, change
   the name of the dag (a.k.a. dag_id) - I follow the convention: my_dag_v1,
   my_dag_v2, my_dag_v3, my_dag_v4, etc...


Needless to say this is not the best way, but it looks like this is what others
are doing. I actually prefer not to rename the dag so I can keep the history
and logs.
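
For anyone who does go the renaming route, a tiny, untested sketch of that
convention - just a version suffix you bump (along with start_date) whenever
the schedule changes, while pausing the old dag_id:

from datetime import datetime

from airflow import DAG

DAG_VERSION = 'v2'  # bump when start_date / schedule_interval change

dag = DAG(
    dag_id='my_dag_{}'.format(DAG_VERSION),
    start_date=datetime(2016, 10, 25),   # placeholder date
    schedule_interval='0 6 * * *')       # the new crontab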

On Tue, Oct 25, 2016 at 6:29 AM, Gerard Toonstra 
wrote:

> What some people do is give the new dag a new version in the name, like _v1
> or _v2 at the end. Then it's treated like another dag and you can disable
> the old one. IF you make changes to dags, it's possible that old
> operators/tasks are no longer visible in the UI and you no longer have
> access to the log files.
>
> Obviously, it's important to use the correct start_date in that case and
> probably turn off the old version.
>
> Rgds,
>
> Gerard
>
>
> On Tue, Oct 25, 2016 at 12:09 PM,  wrote:
>
> > Hi David,
> >
> > For me worked:
> > 1. Disable the DAG
> > 2. Refresh
> > 3. Edit the schedule_interval
> > 4. Refresh the DAG
> > 5. Enable
> >
> > You also should edit the start_date to avoid backfilled runs.
> >
> > --alex
> >
> > --
> > B: mapredit.blogspot.com
> >
> > From: David Batista
> > Sent: Tuesday, October 25, 2016 11:48 AM
> > To: dev@airflow.incubator.apache.org
> > Subject: Changing the crontab for a DAG
> >
> > Hi everyone,
> >
> > Just a quick question, if you have a running DAG (i.e., runs daily for
> > instance), and at a certain point you want to update the crontab, for
> > instance, change the hour on which it runs.
> >
> > Usually, want I do, is to clear all the past tasks with the old crontab,
> > and update the the start_date of the DAG, to be equal to the day when the
> > the hour changed.
> >
> > Nevertheless, sometimes, it happens that the DAG starts running with the
> > old hour (before the change)
> >
> > What is the best way to deal with this, changing, let's say the hour,
> for a
> > running a DAG?
> >
> > Best regards,
> > David Batista
> >
> >
>


Re: Best practices for dynamically generated tasks and dags

2016-10-21 Thread Boris Tyukin
thanks Laura, it helps! I was hoping you would reply :) Very good points
about UI / logs / restarts - I think at this point I really like option #2
myself.

I still wonder if people do something creative to generate complex DAGs
outside of the DAG folder - this would be for cases where it takes
significant time to poll metadata/databases to generate all the tasks. I do
not know if it is possible, as I am not strong with Python (actually I have
been learning Python as I am learning Airflow!). The idea is to have an
outside .py script generate static .py files for the DAG(s) and place these
generated files under the airflow dag_folder once a day or on some schedule.
Is anyone doing this, or am I over-complicating things and #2 should just
work?

I think in my case it might take a good minute to parse out metadata files
and some database tables to actually generate the DAG tasks. Also I imagine it
will produce a heck of a lot of log records, since the scheduler polls the dag
folder every minute and this process will repeat itself again a minute later -
so it will be running non-stop unless I change the airflow scheduler settings.
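
For what it's worth, a bare-bones, untested sketch of option #2 inside a single
definition file - get_tables() is a made-up stand-in for the metadata lookup,
and in practice you would want it to read from something cheap (e.g. a cached
YAML/JSON file) since the scheduler re-parses the file so often:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def get_tables():
    # Placeholder for the real metadata lookup (YAML, database, etc.)
    return ['table_a', 'table_b']


for table in get_tables():
    dag_id = 'load_{}'.format(table)
    dag = DAG(dag_id,
              start_date=datetime(2016, 10, 1),
              schedule_interval='@daily')

    BashOperator(task_id='extract',
                 bash_command='echo extracting {}'.format(table),
                 dag=dag)

    # The module-level name is what lets the scheduler discover each DAG
    globals()[dag_id] = dag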



On Fri, Oct 21, 2016 at 11:39 AM, Laura Lorenz 
wrote:

> We've been evolving from type 1 you describe to a pull/poll version of the
> type 2 you describe. For type 1, it is really hard to tell what's going on
> (all the UI views become useless because they are so huge). Having one big
> dag also means you can't turn off the scheduler for individual parts, and
> the whole DAG fails if one task does, so if you can functionally separate
> them I think that gives you more configuration options. Our biggest DAG now
> is more like 22*10 tasks, which is still too big in our opinions. We
> leverage ExternalTaskSensors to link dags together which is more of a
> pull/poll paradigm, but you could use a TriggerDagRunOperator if you wanted
> more of a push/trigger paradigm which is what I hea ryou saying in type 2.
>
> To your second question, our DAGs are dynamic based on the results of an
> API call we embed in the DAG and our scheduler is on a 5-second timelapse
> for each attemp to refill the DagBag. I think because of the frequency of
> the scheduler polling the files, because our API call is relatively fast,
> we are working with DAGs that are on a 24 hour schedule_interval, and the
> resultant DAG structure is not too large or complicated, we haven't had any
> issues with that or done anything special. I think it's just the fact of
> the matter that if you give the scheduler a lot of work to do to determine
> the DAG shape, it will take a while.
>
> Laura
>
> On Fri, Oct 21, 2016 at 10:52 AM, Boris Tyukin 
> wrote:
>
> > Guys, would you mind to chime in and share your experience?
> >
>


Re: Best practices for dynamically generated tasks and dags

2016-10-21 Thread Boris Tyukin
Guys, would you mind to chime in and share your experience?


Re: Backfill for tasks to be scheduled @once

2016-10-20 Thread Boris Tyukin
hello Tamara,

I just created a quick example below and it worked like you would expect it
to. It ran only once. Also, when I cleared the task instance execution
via the UI (or alternatively, by deleting it and setting the DagRun back to the
running status), it would rerun again - this would be your on-demand scenario.

Make sure your start date is actually set in the past.



from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Determine schedule:
dag_schedule_interval = "@once"
dag_start_date = datetime.now() - timedelta(seconds=60)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': dag_start_date,
    'email': ['airf...@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

# Change version number if schedule needs to be changed:
dag = DAG(
    'once', default_args=default_args,
    schedule_interval=dag_schedule_interval)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='t1',
    bash_command='echo execution ts {{ ts }} & echo 1',
    # bash_command='exit 1',
    dag=dag)


On Thu, Oct 20, 2016 at 10:38 AM, Tamara Mendt  wrote:

> Hi,
>
> I have tried to use the "@once" scheduling option for the first time for a
> DAG that I only wish to run once. I would possibly want to run it again at
> some random point in the future, but not on regular intervals.
>
> I thought that if I set a start date to the DAG, it would be scheduled only
> once, with the start date. However it seemed to be scheduled once with the
> start date, and another time with the time point when I unpause the DAG.
> Also, when I tried clearing all executions of the DAG and running a
> backfill, nothing was scheduled.
>
> Maybe I am confused as to how this type of one time scheduling works. Can
> somebody please provide some insight?
>
> Thank you,
>
> Cheers,
>
> Tamara.
>
>


Re: Usage of "on_failure_callback" ?

2016-10-18 Thread Boris Tyukin
hi Jason,

here is an example below - in task0_python_callable I did 1/0 to raise an
error, and Airflow then called task0_failure_cb. I could see FAIL in the
task log, so my task0_failure_cb was definitely invoked.

The task would still have failed status if you look at the UI. Are you
expecting a different behavior?


def task0_python_callable(ds, **kwargs):
    print ds
    print 1/0
    return 'Success!!! Execution time = {}'.format(kwargs['execution_date'])


def task0_failure_cb(ds, **kwargs):
    logging.info("FAIL")


task0 = PythonOperator(
    task_id='task0',
    python_callable=task0_python_callable,
    on_failure_callback=task0_failure_cb,
    provide_context=True,
    dag=dag)



[2016-10-18 16:07:02,631] {models.py:154} INFO - Filling up the DagBag from
/home/oracle/airflow/dags/fail_callback.py
[2016-10-18 16:07:03,239] {models.py:154} INFO - Filling up the DagBag from
/home/oracle/airflow/dags/fail_callback.py
[2016-10-18 16:07:03,312] {models.py:1196} INFO -

Starting attempt 1 of 1


[2016-10-18 16:07:03,317] {models.py:1219} INFO - Executing
 on 2016-10-18 16:06:02
[2016-10-18 16:07:03,322] {models.py:1286} ERROR - integer division or
modulo by zero
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line
1245, in run
result = task_copy.execute(context=context)
  File
"/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py",
line 66, in execute
return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/oracle/airflow/dags/fail_callback.py", line 33, in
task0_python_callable
print 1/0
ZeroDivisionError: integer division or modulo by zero
[2016-10-18 16:07:03,324] {models.py:1306} INFO - Marking task as FAILED.
[2016-10-18 16:07:03,324] {fail_callback.py:38} INFO - FAIL
[2016-10-18 16:07:03,327] {models.py:1327} ERROR - integer division or
modulo by zero


On Tue, Oct 18, 2016 at 2:59 PM, Jason Chen 
wrote:

> Hi airflow team,
>
> Is there any sample code to use "on_failure_callback" ?
> I tried to use that as a callback to "post-process" when a task fails.
> However, I cannot make it work.
>
> The definition of my task is as below (just one task in my dag).
>
> It invokes "task0_python_callable" which executes a command-line script and
> it could drops exception. It in turns making task0 failed.
> In that case, I am thinking it should trigger the "on_failure_callback"
> function "task0_failure_cb", but it does not happen.
>
> Any suggestion?
> Thanks.
>
> Jason
>
> =
> task0 = PythonOperator(
> task_id='task0',
> python_callable=task0_python_callable,
> on_failure_callback=task0_failure_cb,
> dag=dag)
>


Best practices for dynamically generated tasks and dags

2016-10-18 Thread Boris Tyukin
Sorry again for posting in the DEV group since it is a user-type question, but
I do not think we have a user group list and I do not feel that gitter is
appropriate for this sort of discussion.

I am actively testing Airflow for a specific use case, which is the generation
of workflows/tasks for 200-300 tables in my source database. Every table
will have a set of pretty standard tasks (~6-8) and tasks will have some
conditions (e.g. on some runs some of the tasks will be skipped).

After looking at Oozie, Luigi, Pinball and Airflow and watching numerous
presentations, I thought Airflow was a perfect match for that. I am really
blown away by features and potential and use cases.

I know many of the committers are doing something similar, and I'd love to hear
the details and some guidance in terms of best practices. I think I have heard
of 3 options:

#1 Create one big DAG (in my case it would be a DAG with 300x8 tasks)

#2 Create one DAG that will generate smaller DAGs (as described here
https://wecode.wepay.com/posts/airflow-wepay)

#3 A combination of #1 and #2? like external .py file to generate "static"
DAG on demand (e.g. adding a new table or removing one)

Second question, related to this concept. The Airflow webserver and scheduler
poll the DAG folder to fill up the DagBag, and they do that very frequently by
default (every minute?). The problem is that it takes time to generate a DAG
dynamically - in my case I will be using some metadata from YAML or a
database, and this process might well take a minute or two. How do you deal
with this?

Thanks again for such an amazing project!


Re: Fwd: Issue with Dynamically created tasks in a DAG

2016-10-18 Thread Boris Tyukin
thanks Laura, this confirms exactly what I am seeing. Ben Tallman picked up the
Jira request, so hopefully he can come up with something that won't be that
confusing.

On Tue, Oct 18, 2016 at 10:47 AM, Laura Lorenz 
wrote:

> I think the source of the confusion you're experiencing is that the UI is
> based off of the DAG definition file at time of webserver load, which I
> believe is on the one hand defensible since the scheduler operates in a
> somewhat similar way, but on the other hand rightfully confusing (and
> doesn't TOTALLY mimic scheduler activities so it's really the worst of both
> worlds IMHO). When you change the DAG definition file, you have to kick the
> webserver to pick up new graph/tree drawings. In the case of removing
> tasks, I think that if you queried the underlying metadata database
> directly, your 'table2' task instances would still exist, but the UI
> doesn't know that it should show them based on the DAG definition files it
> has on hand during webserver process reload.
>
> I have non-dynamic DAGs that when the DAG shape is changed dramatically by
> me (including removing tasks) I usually create an entirely new DAG (in
> practice this is changing the dag_id of the DAG object in the DAG
> definition file, for example 'my_dag' becomes 'my_dag_v2') so that there is
> no confusion of it being tied to previous history. If you choose to keep
> your previous DAG definition file ('my_dag') but have the scheduler for
> that DAG in the off state, and add in the new DAG in the on state
> ('my_dag_v2') the UI will render both as different DAGs and you can
> navigate through history with the UI as normal.
>
> This has been discussed as the preferred workaround for several different
> types of major DAG configuration changes (such as a start_date further in
> the past than the original version was configured for), but I'm not sure if
> anything has been going on (yet?) to redesign it in any way. I believe as
> mentioned it is basically a side effect of depending on the DAG definition
> files to draw graphs and trees as opposed to history.
>
> Sort of an aside but relevant if you are changing DAG shape with any
> frequency: We also see that when we add tasks to an existing DAG, what I
> will see is that the tree view/graph view will fill in the added task for
> all of history with the state 'no status'. If that DAG is set to have
> depends_on_past=True, this will actually clog up a new DagRun unless I do
> something to force the new task in the new DagRun that has no previous task
> instance history to execute regardless.
>
> On Sun, Oct 16, 2016 at 7:04 PM, Boris Tyukin 
> wrote:
>
> > I opened a JIRA - looks like based on comments in other threads, it does
> > not work properly right now.
> >
> > [AIRFLOW-574] Show Graph/Tree view and Task Instance logs using executed
> > DagRun, not current
> >
> >
> >
> >
>


Re: ETL best practices for airflow

2016-10-17 Thread Boris Tyukin
Thanks for sharing your slides, Laura! I think I've watched all the airflow
related slides I could find and you did a very good job - adding your
slides to my collection :)  I especially liked how you were explaining the
execution date concept, but I wish you could elaborate on the backfill concept
and running the same dag in parallel (if you guys do this sort of thing) -
I think this is the most confusing thing in Airflow and it needs a good
explanation / examples.

On Mon, Oct 17, 2016 at 5:19 PM, Laura Lorenz 
wrote:

> Same! I actually recently gave a talk about how my company uses airflow at
> PyData DC. The video isn't live yet, but the slides are here
> <http://www.slideshare.net/LauraLorenz4/how-i-learned-to-
> time-travel-or-data-pipelining-and-scheduling-with-airflow>.
> In substance it's actually very similar to what you've written.
>
> I have some airflow-specific ideas about ways to write custom sensors that
> poll job apis (pretty common for us). We do dynamic generation of tasks
> using external metadata by embedding an API call in the DAG definition
> file, which I'm not sure is a best practice or not...
>
> Anyways, if it makes sense to contribute these case studies for
> consideration as a 'best practice', if this is the place or way to do it,
> I'm game. I agree that the resources and thought leadership on ETL design
> is fragmented, and think the Airflow community is fertile ground to provide
> discussion about it.
>
> On Sun, Oct 16, 2016 at 6:40 PM, Boris Tyukin 
> wrote:
>
> > I really look forward to it, Gerard! I've read what you wrote so far
> > and I really liked it - please keep up the great job!
> >
> > I am hoping to see some best practices for the design of incremental
> loads
> > and using timestamps from source database systems (not being on UTC so
> > still confused about it in Airflow). Also practical use of subdags and
> > dynamic generation of tasks using some external metadata (maybe describe
> in
> > details something similar that wepay did
> > https://wecode.wepay.com/posts/airflow-wepay)
> >
> >
> > On Sun, Oct 16, 2016 at 5:23 PM, Gerard Toonstra 
> > wrote:
> >
> > > Hi all,
> > >
> > > About a year ago, I contributed the HTTPOperator/Sensor and I've been
> > > tracking airflow since. Right now it looks like we're going to adopt
> > > airflow at the company I'm currently working at.
> > >
> > > In preparation for that, I've done a bit of research work how airflow
> > > pipelines should fit together, how important ETL principles are covered
> > and
> > > decided to write this up on a documentation site. The airflow
> > documentation
> > > site contains everything on how all airflow works and the constructs
> that
> > > you have available to build pipelines, but it can still be a challenge
> > for
> > > newcomers to figure out how to put those constructs together to use it
> > > effectively.
> > >
> > > The articles I found online don't go into a lot of detail either.
> Airflow
> > > is built around an important philosophy towards ETL and there's a risk
> > that
> > > newcomers simply pick up a really great tool and start off in the wrong
> > way
> > > when using it.
> > >
> > >
> > > This weekend, I set off to write some documentation to try to fill this
> > > gap. It starts off with a generic understanding of important ETL
> > principles
> > > and I'm currently working on a practical step-by-step example that
> > adheres
> > > to these principles with DAG implementations in airflow; i.e. showing
> how
> > > it can all fit together.
> > >
> > > You can find the current version here:
> > >
> > > https://gtoonstra.github.io/etl-with-airflow/index.html
> > >
> > >
> > > Looking forward to your comments. If you have better ideas how I can
> make
> > > this contribution, don't hesitate to contact me with your suggestions.
> > >
> > > Best regards,
> > >
> > > Gerard
> > >
> >
>


Re: Fwd: Issue with Dynamically created tasks in a DAG

2016-10-16 Thread Boris Tyukin
I opened a JIRA - looks like based on comments in other threads, it does not 
work properly right now.

[AIRFLOW-574] Show Graph/Tree view and Task Instance logs using executed 
DagRun, not current





Re: ETL best practices for airflow

2016-10-16 Thread Boris Tyukin
I really look forward to it, Gerard! I've read what you wrote so far
and I really liked it - please keep up the great job!

I am hoping to see some best practices for the design of incremental loads
and the use of timestamps from source database systems (ours are not on UTC, so I am
still confused about how to handle that in Airflow). Also the practical use of
subdags and dynamic generation of tasks using some external metadata (maybe
describe in detail something similar to what wepay did:
https://wecode.wepay.com/posts/airflow-wepay)


On Sun, Oct 16, 2016 at 5:23 PM, Gerard Toonstra 
wrote:

> Hi all,
>
> About a year ago, I contributed the HTTPOperator/Sensor and I've been
> tracking airflow since. Right now it looks like we're going to adopt
> airflow at the company I'm currently working at.
>
> In preparation for that, I've done a bit of research into how airflow
> pipelines should fit together and how important ETL principles are covered, and
> decided to write this up on a documentation site. The airflow documentation
> site contains everything about how airflow works and the constructs that
> you have available to build pipelines, but it can still be a challenge for
> newcomers to figure out how to put those constructs together to use it
> effectively.
>
> The articles I found online don't go into a lot of detail either. Airflow
> is built around an important philosophy towards ETL and there's a risk that
> newcomers simply pick up a really great tool and start off in the wrong way
> when using it.
>
>
> This weekend, I set off to write some documentation to try to fill this
> gap. It starts off with a generic understanding of important ETL principles
> and I'm currently working on a practical step-by-step example that adheres
> to these principles with DAG implementations in airflow; i.e. showing how
> it can all fit together.
>
> You can find the current version here:
>
> https://gtoonstra.github.io/etl-with-airflow/index.html
>
>
> Looking forward to your comments. If you have better ideas how I can make
> this contribution, don't hesitate to contact me with your suggestions.
>
> Best regards,
>
> Gerard
>


Re: A question/poll on the TaskInstance data model...

2016-10-15 Thread Boris Tyukin
thanks Ben for the explanation. Is there a Jira for this or do you want me to
open one? I think it is a pretty important thing, as all the public talks
mentioned generating tasks programmatically (and dynamically) as one of
the main features of Airflow. If we cannot see what was really generated in
the past and get to every task even if it does not exist anymore, this
feature is incomplete.

Also I am (very) concerned at this point that a lot of things require a
restart of the airflow scheduler and webserver - it does not look like a good
strategy to me. I realize most of this is happening because of python caching,
but as an end user, I do not really care :)

I will also be looking at Luigi and Oozie (leaving the latter for last because
I get dizzy looking at its xml). My use case is to generate tasks every
day for hundreds of tables, and some tables will come and go.

On Sat, Oct 15, 2016 at 3:01 PM, Ben Tallman  wrote:

> That is part of it. In this case, we aren't planning to store the contents
> of the DagBag, as it was when the DagRun was created (that was the pickling
> stuff that is deprecated), but it solves HALF of the problem. It allows us
> to begin at least drawing the graph as it was when it was run. Storing the
> DagBag Dag would begin to solve your problem as well.
>
> I would dearly love to have tasks generated at schedule time (not during
> the run), not every time the dag file is evaluated (every 3 minutes or so).
>
> There is disagreement as to the best way to handle this, however based on
> conversations that I've heard and participated in, the current preferred
> solution is to head down the path of a "git time machine". However that
> doesn't actually solve the problem that we see. Basically, we want to have
> the evaluation of the dag python file interrogate outside systems to
> generate the tasks and have them run. The problem with the git time machine
> solution is that those outside systems are not static. They change over
> time. In the past, an effort was made to pickle the dag, and run from that,
> but pickling has its own issues.
>
> To be clear, at the time, I think the goal of the pickling was to
> distribute the dag to distributed workers, not freeze it in time. I think
> that storing the pickled dag in the dagrun could probably solve this, but
> it is a major issue/change. It is one that I am beginning to work on for us
> though.
>
>
> Thanks,
> Ben
>
> *--*
> *ben tallman* | *apigee* | m: +1.503.680.5709 | o: +1.503.608.7552 | twitter @anonymousmanage | @apigee
>
> On Sat, Oct 15, 2016 at 11:35 AM, Boris Tyukin 
> wrote:
>
> > Hi Ben,
> >
> > is it to address the issue I just described yesterday "Issue with
> > Dynamically created tasks in a DAG"?
> >
> > I was hoping someone can confirm this as a bug and if there is a JIRA to
> > address that - otherwise I would be happy to open one. To me it is a
> pretty
> > major issue and a very misleading one especially because Airflow's key
> > feature is to generate/update DAGs programmatically
> >
>


Re: A question/poll on the TaskInstance data model...

2016-10-15 Thread Boris Tyukin
Hi Ben,

is it to address the issue I just described yesterday "Issue with Dynamically 
created tasks in a DAG"?

I was hoping someone can confirm this as a bug and if there is a JIRA to 
address that - otherwise I would be happy to open one. To me it is a pretty 
major issue and a very misleading one especially because Airflow's key feature 
is to generate/update DAGs programmatically 


Re: scheduler questions

2016-10-14 Thread Boris Tyukin
Done. It makes sense to me, Ben, as the backfill concept is very confusing to me. I
think it should even be off by default.

On 2016-10-13 12:05 (-0400), Ben Tallman  wrote: 
> Boris -
> 
> We have a pull request in, which causes the scheduler to not backfill on a
> per-dag basis. This is designed for exactly this situation. Basically, the
> scheduler will skip intervals and jump to the last one in the list if this
> flag is set. If this is important to you, please vote for it.
> 
> https://github.com/apache/incubator-airflow/pull/1830
> 
> For instance:
> dag = DAG(
> "test_dag_id_here",
> "backfill": False
> , ...
> )
> 
> 
> 
> Thanks,
> Ben
> 
> *--*
> *ben tallman* | *apigee* | m: +1.503.680.5709 | o: +1.503.608.7552 | twitter @anonymousmanage | @apigee
> 
> On Thu, Oct 13, 2016 at 8:07 AM, Joseph Napolitano <
> joseph.napolit...@blueapron.com.invalid> wrote:
> 
> > Hi Boris,
> >
> > To answer the first question, the backfill command has a flag to mark jobs
> > as successful without running them.  Take care to align the start and end
> > times precisely as needed.  As an example, for a job that runs daily at
> > 7am:
> >
> > airflow backfill -s 2016-10-07T07 -e 2016-10-10T07 my-dag-name -m
> >
> > The "-m" parameter tells Airflow to mark it successful without running it.
> >
> > On Thu, Oct 13, 2016 at 10:46 AM, Boris Tyukin 
> > wrote:
> >
> > > Hello all and thanks for such an amazing project! I have been evaluating
> > > Airflow and spent a few days reading about it and playing with it and I
> > > have a few questions that I struggle to understand.
> > >
> > > Let's say I have a simple DAG that runs once a day and it is doing a full
> > > reload of tables from the source database so the process is not
> > > incremental.
> > >
> > > Let's consider this scenario:
> > >
> > > Day 1 - OK
> > >
> > > Day 2 - airflow scheduler or server with airflow is down for some reason
> > > ((or
> > > DAG is paused)
> > >
> > > Day 3 - still down(or DAG is paused)
> > >
> > > Day 4 - server is up and now needs to run missing jobs.
> > >
> > >
> > > How can I make airflow to run only Day 4 job and not backfill Day 2 and
> > 3?
> > >
> > >
> > > I tried to do depend_on_past = True but it does not seem to do this
> > trick.
> > >
> > >
> > > I also found in a roadmap doc this but seems it is not made to the
> > release
> > > yet:
> > >
> > >
> > >  Only Run Latest - Champion : Sid
> > >
> > > • For cases where we need to only run the latest in a series of task
> > > instance runs and mark the others as skipped. For example, we may have
> > job
> > > to execute a DB snapshot every day. If the DAG is paused for 5 days and
> > > then unpaused, we don’t want to run all 5, just the latest. With this
> > > feature, we will provide “cron” functionality for task scheduling 
> > > that is
> > > not related to ETL
> > >
> > >
> > > My second question, what if I have another DAG that does incremental
> > loads
> > > from a source table:
> > >
> > >
> > > Day 1 - OK, loaded new/changed data for previous day
> > >
> > > Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
> > >
> > > Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
> > >
> > > Day 4 - source system is up, Airflow Dagrun succeeded
> > >
> > >
> > > My problem (unless I am missing something), Airflow on Day 4 would use
> > > execution time from Day 3, so the interval for incremental load would be
> > > since the last run (which was Failed). My hope it would use the last
> > > _successful_ run so on Day 4 it would go back to Day 1. Is it possible to
> > > achieve this?
> > >
> > > I am aware of a manual backfill command via CLI but I am not sure I want
> > to
> > > use due to all the issues and inconsistencies I've read about it.
> > >
> > > Thanks!
> > >
> >
> >
> >
> > --
> > *Joe Napolitano *| Sr. Data Engineer
> > www.blueapron.com | 5 Crosby Street, New York, NY 10013
> >
> 


Fwd: Issue with Dynamically created tasks in a DAG

2016-10-14 Thread Boris Tyukin
One of the selling points for me to use airflow for our new project is the
ability to create tasks programmatically on every run. People mentioned in
various talks that they would generate tasks on every run, pulling a list of
files or using some external configs (YAML), etc.

I also found this example https://gist.github.com/mtustin-handy/
ecd80c1cc9dcad1c4cf7

I created a quick dag similar to the one above and I am observing some weird
issues (using Airflow v1.7.1.3 and the Sequential Executor).

My Dag has a list like
tables_list = ['table1','table2']

Then I would create a first (dummy) task and then generate bash operators
for every table in the list, using the first dummy task as the upstream task -
roughly as in the sketch below.
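
(Stripped-down sketch of that DAG - the start_date, schedule and bash_command
are placeholders, not my real jobs:)

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

tables_list = ['table1', 'table2']

dag = DAG('dynamic_job_proto_v1',
          start_date=datetime(2016, 10, 1),
          schedule_interval='@daily')

# The first task is just a dummy anchor that every table task hangs off of.
start = DummyOperator(task_id='start', dag=dag)

for table in tables_list:
    t = BashOperator(
        task_id='t_{0}'.format(table),
        bash_command='echo processing {0}'.format(table),
        dag=dag)
    t.set_upstream(start)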

It works great on a first run - all tasks created properly.

Then I change the list to add a new table3:
tables_list = ['table1','table2','table3']

The DAG runs again but I do not see table3 in the Graph or Tree view. I do see
the table3 task under the Task Instance view, so it was generated. But if I
click on it, I get an error like "Task [dynamic_job_proto_v1.t_table3] doesn't
seem to exist at the moment".

Then I restarted the scheduler - same thing. New DAG runs would not show that
task.

Then I restarted the airflow webserver - this time I was able to see the table3
task in the views.

After that I removed table2 from my list and the DAG ran again - same issue.
Table2 was still in the views until I restarted the webserver. After the
restart, table2 disappeared from previously run Dags, which is bad because
now I cannot go back in history, cannot compare execution times, etc.

Is this a known bug?


Re: scheduler questions

2016-10-13 Thread Boris Tyukin
you rock, Sid! thanks for taking the time to explain it to me

On Thu, Oct 13, 2016 at 6:10 PM, siddharth anand  wrote:

> I can't see an image.
>
> We run most of our dags with depends_on_past=True.
>
> If you want to chain your dag runs, such as not starting the first task of
> your dag run until the last task of your previous dag run completes,
> you can use an external task sensor. The external task sensor would be the
> first task in the dag and would depend on the last task in the same dag
> from the previous dag run. This is strict dag chaining.
>
> If you just don't want the same task in the subsequent dag run to get
> scheduled unless the first task completes, depends_on_past=True helps
> there. This is more a cascading effect in the tree view.
> -s
>
> On Thu, Oct 13, 2016 at 12:41 PM, Boris Tyukin 
> wrote:
>
> > This is not what I see actually. I posted below my test DAG and a
> > screenshot.
> >
> > It does create DAGRuns on subsequent runs - I modeled that scenario by
> > commenting one bash command and uncommenting another one with Exit 1.
> >
> > it does not create Task Instances on subsequent failed DAGs but it does
> > create DAGRuns and the first successful run after failed ones would not
> > have execution timestamp from last successful run
> >
> >
> > [image: Inline image 1]
> >
> >
> > here is my test DAG
> >
> >
> >
> > from datetime import datetime, timedelta
> >
> > # Determine schedule:
> > dag_schedule_interval = timedelta(seconds=60)
> > dag_start_date = datetime.now() - dag_schedule_interval
> >
> >
> > default_args = {
> > 'owner': 'airflow',
> > 'depends_on_past': True,
> > 'start_date': dag_start_date,
> > # 'start_date': datetime(2016, 10, 11, 17, 0),
> > 'email': ['airf...@airflow.com'],
> > 'email_on_failure': False,
> > 'email_on_retry': False,
> > 'retries': 0,
> > 'retry_delay': timedelta(seconds=20),
> > 'only_run_latest': True,
> > # 'queue': 'bash_queue',
> > # 'pool': 'backfill',
> > # 'priority_weight': 10,
> > # 'end_date': datetime(2016, 1, 1),
> > }
> >
> > # Change version number if schedule needs to be changed:
> > dag = DAG(
> > 'pipeline1_v8', default_args=default_args, schedule_interval=dag_
> > schedule_interval)
> >
> > dag.doc_md = __doc__
> >
> > # t1, t2 and t3 are examples of tasks created by instantiating operators
> > t1 = BashOperator(
> > task_id='t1',
> > bash_command='echo execution ts {{ ts }} & echo 1',
> > # bash_command='exit 1',
> > dag=dag)
> >
> > On Thu, Oct 13, 2016 at 1:37 PM, siddharth anand 
> > wrote:
> >
> >> If you use depends_on_past=True, it won't proceed to the next DAG Run if
> >> the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
> >>
> >> -s
> >>
> >> On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand 
> >> wrote:
> >>
> >> > Yes! It does work with Depends_on_past=True.
> >> > -s
> >> >
> >> > On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin  >
> >> > wrote:
> >> >
> >> >> thanks so much, Sid! just a follow up question on "Only_Run_Latest" -
> >> will
> >> >> it work with depend_on_past = True? or it will assume that DAG is
> used
> >> >> False?
> >> >>
> >> >> On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand 
> >> >> wrote:
> >> >>
> >> >> > Boris,
> >> >> >
> >> >> > *Question 1*
> >> >> > Only_Run_Latest is in master -
> >> >> > https://github.com/apache/incubator-airflow/commit/
> >> >> > edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
> >> >> > That will solve your problem.
> >> >> >
> >> >> > Releases come out one a quarter sometimes once every 2 quarters,
> so I
> >> >> would
> >> >> > recommend that you run off master or off your own fork.
> >> >> >
> >> >> > You could also achieve this yourself with the following code
> >> snippet. It
> >> >>

Re: scheduler questions

2016-10-13 Thread Boris Tyukin
This is not what I see actually. I posted below my test DAG and a
screenshot.

It does create DAGRuns on subsequent runs - I modeled that scenario by
commenting one bash command and uncommenting another one with Exit 1.

It does not create Task Instances for subsequent failed DAG runs, but it does
create DAGRuns, and the first successful run after the failed ones would not
have the execution timestamp of the last successful run.


[image: Inline image 1]


here is my test DAG



from datetime import datetime, timedelta

# Imports needed to run this snippet:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Determine schedule:
dag_schedule_interval = timedelta(seconds=60)
dag_start_date = datetime.now() - dag_schedule_interval


default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': dag_start_date,
    # 'start_date': datetime(2016, 10, 11, 17, 0),
    'email': ['airf...@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'only_run_latest': True,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# Change version number if schedule needs to be changed:
dag = DAG(
    'pipeline1_v8', default_args=default_args,
    schedule_interval=dag_schedule_interval)

dag.doc_md = __doc__

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='t1',
    bash_command='echo execution ts {{ ts }} & echo 1',
    # bash_command='exit 1',
    dag=dag)

On Thu, Oct 13, 2016 at 1:37 PM, siddharth anand  wrote:

> If you use depends_on_past=True, it won't proceed to the next DAG Run if
> the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
>
> -s
>
> On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand 
> wrote:
>
> > Yes! It does work with Depends_on_past=True.
> > -s
> >
> > On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin 
> > wrote:
> >
> >> thanks so much, Sid! just a follow up question on "Only_Run_Latest" -
> will
> >> it work with depend_on_past = True? or it will assume that DAG is used
> >> False?
> >>
> >> On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand 
> >> wrote:
> >>
> >> > Boris,
> >> >
> >> > *Question 1*
> >> > Only_Run_Latest is in master -
> >> > https://github.com/apache/incubator-airflow/commit/
> >> > edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
> >> > That will solve your problem.
> >> >
> >> > Releases come out one a quarter sometimes once every 2 quarters, so I
> >> would
> >> > recommend that you run off master or off your own fork.
> >> >
> >> > You could also achieve this yourself with the following code snippet.
> It
> >> > uses a ShortCircuitOperator that will skip downstream tasks if the
> >> DagRun
> >> > being executed is not the current one. It will work for any schedule.
> >> The
> >> > code below has essentially been implemented in the LatestOnlyOperator
> >> above
> >> > for convenience.
> >> >
> >> > def skip_to_current_job(ds, **kwargs):
> >> >
> >> >     now = datetime.now()
> >> >
> >> >     left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
> >> >
> >> >     right_window = kwargs['dag'].following_schedule(left_window)
> >> >
> >> >     logging.info(('Left Window {}, Now {}, Right Window {}'
> >> >                   ).format(left_window, now, right_window))
> >> >
> >> >     if not now <= right_window:
> >> >
> >> >         logging.info('Not latest execution, skipping downstream.')
> >> >
> >> >         return False
> >> >
> >> >     return True
> >> >
> >> >
> >> > t0 = ShortCircuitOperator(
> >> >
> >> >     task_id='short_circuit_if_not_current',
> >> >
> >> >     provide_context=True,
> >> >
> >> >     python_callable=skip_to_current_job,
> >> >
> >> >     dag=dag
> >> >
> >> > )
> >> >
> >> >
> >> > -s
> >> >
> >> >
> >> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin 
> >> > wrote:
> >> >

Re: scheduler questions

2016-10-13 Thread Boris Tyukin
so for my second scenario, I think I would still need to run the missing days'
jobs one by one (by clearing the failed ones), and I understand this is the
recommended approach, as I figured from Maxime's video.

But sometimes it is more efficient to combine all the missing day runs into
one, so I would use a window for the incremental process spanning from the last
successful job (Day 1) to the current run (Day 4) - that way there is only one
DagRun to catch up instead of 3. Does that make sense? Is it possible?
Something like the sketch below is what I have in mind.
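
Just to sketch the idea (the names are made up and this is untested) - a
callable run from a PythonOperator with provide_context=True could query
Airflow's metadata DB for the last successful DagRun of the same DAG and use
its execution_date as the left edge of the incremental window:

from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State


def incremental_window(ds, **kwargs):
    # Look up the most recent successful DagRun of this DAG.
    session = settings.Session()
    last_success = (session.query(DagRun)
                    .filter(DagRun.dag_id == kwargs['dag'].dag_id,
                            DagRun.state == State.SUCCESS)
                    .order_by(DagRun.execution_date.desc())
                    .first())
    session.close()
    end = kwargs['execution_date']
    # If there is no prior success, fall back to the current execution_date.
    start = last_success.execution_date if last_success else end
    return start, end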

On Thu, Oct 13, 2016 at 1:16 PM, siddharth anand  wrote:

> *Question 2*
> You can use depend_on_past=True. Then, future dag runs won't be scheduled
> until past one succeed, which I specify as shown below:
>
> default_args = {
>     'owner': 'sanand',
>     'depends_on_past': True,
>     'pool': 'ep_data_pipeline',
>     'start_date': START_DATE,
>     'email': [import_ep_pipeline_alert_email_dl],
>     'email_on_failure': import_airflow_enable_notifications,
>     'email_on_retry': import_airflow_enable_notifications,
>     'retries': 3,
>     'retry_delay': timedelta(seconds=30),
>     'priority_weight': import_airflow_priority_weight}
>
> dag = DAG(DAG_NAME, schedule_interval='@hourly', default_args=default_args,
>           sla_miss_callback=sla_alert_func)
>
>
>
> I also use retries to sidestep intermittent issues. If you need to retry a
> failed dag run, you can clear that dag run in the UI (or CLI) and the
> scheduler will rerun it.
>
> On Thu, Oct 13, 2016 at 10:11 AM, siddharth anand 
> wrote:
>
> > Boris,
> >
> > *Question 1*
> > Only_Run_Latest is in master - https://github.com/apache/
> > incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a. That
> > will solve your problem.
> >
> > Releases come out once a quarter, sometimes once every 2 quarters, so I
> > would recommend that you run off master or off your own fork.
> >
> > You could also achieve this yourself with the following code snippet. It
> > uses a ShortCircuitOperator that will skip downstream tasks if the DagRun
> > being executed is not the current one. It will work for any schedule. The
> > code below has essentially been implemented in the LatestOnlyOperator
> above
> > for convenience.
> >
> > def skip_to_current_job(ds, **kwargs):
> >
> >     now = datetime.now()
> >
> >     left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
> >
> >     right_window = kwargs['dag'].following_schedule(left_window)
> >
> >     logging.info(('Left Window {}, Now {}, Right Window {}'
> >                   ).format(left_window, now, right_window))
> >
> >     if not now <= right_window:
> >
> >         logging.info('Not latest execution, skipping downstream.')
> >
> >         return False
> >
> >     return True
> >
> >
> > t0 = ShortCircuitOperator(
> >
> >     task_id='short_circuit_if_not_current',
> >
> >     provide_context=True,
> >
> >     python_callable=skip_to_current_job,
> >
> >     dag=dag
> >
> > )
> >
> >
> > -s
> >
> >
> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin 
> > wrote:
> >
> >> Hello all and thanks for such an amazing project! I have been evaluating
> >> Airflow and spent a few days reading about it and playing with it and I
> >> have a few questions that I struggle to understand.
> >>
> >> Let's say I have a simple DAG that runs once a day and it is doing a
> full
> >> reload of tables from the source database so the process is not
> >> incremental.
> >>
> >> Let's consider this scenario:
> >>
> >> Day 1 - OK
> >>
> >> Day 2 - airflow scheduler or server with airflow is down for some reason
> >> ((or
> >> DAG is paused)
> >>
> >> Day 3 - still down(or DAG is paused)
> >>
> >> Day 4 - server is up and now needs to run missing jobs.
> >>
> >>
> >> How can I make airflow to run only Day 4 job and not backfill Day 2 and
> 3?
> >>
> >>
> >> I tried to do depend_on_past = True but it does not seem to do this
> trick.
> >>
> >>
> >> I also found in a roadmap doc this but seems it is not made to the
> release
> >> yet:
> >>
> >>
> >>  Only Run Latest - Champion : Sid
> >>
> >> • For cases where we need t

Re: scheduler questions

2016-10-13 Thread Boris Tyukin
thanks so much, Sid! just a follow-up question on "Only_Run_Latest" - will
it work with depends_on_past = True, or does it assume that the DAG uses
False?
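
For the archive, here is my reading of how the new operator from that commit
would be used - untested on my side, and the import path may differ once it is
actually released:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# Path as of the master commit referenced in Sid's reply below; may change.
from airflow.operators.latest_only_operator import LatestOnlyOperator

dag = DAG('db_snapshot_daily',
          start_date=datetime(2016, 10, 1),
          schedule_interval='@daily')

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

snapshot = BashOperator(task_id='db_snapshot',
                        bash_command='echo taking snapshot',
                        dag=dag)

# Anything downstream of latest_only gets skipped for non-latest DagRuns.
snapshot.set_upstream(latest_only)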

On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand  wrote:

> Boris,
>
> *Question 1*
> Only_Run_Latest is in master -
> https://github.com/apache/incubator-airflow/commit/
> edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
> That will solve your problem.
>
> Releases come out once a quarter, sometimes once every 2 quarters, so I would
> recommend that you run off master or off your own fork.
>
> You could also achieve this yourself with the following code snippet. It
> uses a ShortCircuitOperator that will skip downstream tasks if the DagRun
> being executed is not the current one. It will work for any schedule. The
> code below has essentially been implemented in the LatestOnlyOperator above
> for convenience.
>
> def skip_to_current_job(ds, **kwargs):
>
>     now = datetime.now()
>
>     left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
>
>     right_window = kwargs['dag'].following_schedule(left_window)
>
>     logging.info(('Left Window {}, Now {}, Right Window {}'
>                   ).format(left_window, now, right_window))
>
>     if not now <= right_window:
>
>         logging.info('Not latest execution, skipping downstream.')
>
>         return False
>
>     return True
>
>
> t0 = ShortCircuitOperator(
>
>     task_id='short_circuit_if_not_current',
>
>     provide_context=True,
>
>     python_callable=skip_to_current_job,
>
>     dag=dag
>
> )
>
>
> -s
>
>
> On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin 
> wrote:
>
> > Hello all and thanks for such an amazing project! I have been evaluating
> > Airflow and spent a few days reading about it and playing with it and I
> > have a few questions that I struggle to understand.
> >
> > Let's say I have a simple DAG that runs once a day and it is doing a full
> > reload of tables from the source database so the process is not
> > incremental.
> >
> > Let's consider this scenario:
> >
> > Day 1 - OK
> >
> > Day 2 - airflow scheduler or server with airflow is down for some reason
> > ((or
> > DAG is paused)
> >
> > Day 3 - still down(or DAG is paused)
> >
> > Day 4 - server is up and now needs to run missing jobs.
> >
> >
> > How can I make airflow to run only Day 4 job and not backfill Day 2 and
> 3?
> >
> >
> > I tried to do depend_on_past = True but it does not seem to do this
> trick.
> >
> >
> > I also found in a roadmap doc this but seems it is not made to the
> release
> > yet:
> >
> >
> >  Only Run Latest - Champion : Sid
> >
> > • For cases where we need to only run the latest in a series of task
> > instance runs and mark the others as skipped. For example, we may have
> job
> > to execute a DB snapshot every day. If the DAG is paused for 5 days and
> > then unpaused, we don’t want to run all 5, just the latest. With this
> > feature, we will provide “cron” functionality for task scheduling that is
> > not related to ETL
> >
> >
> > My second question, what if I have another DAG that does incremental
> loads
> > from a source table:
> >
> >
> > Day 1 - OK, loaded new/changed data for previous day
> >
> > Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
> >
> > Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
> >
> > Day 4 - source system is up, Airflow Dagrun succeeded
> >
> >
> > My problem (unless I am missing something), Airflow on Day 4 would use
> > execution time from Day 3, so the interval for incremental load would be
> > since the last run (which was Failed). My hope it would use the last
> > _successful_ run so on Day 4 it would go back to Day 1. Is it possible to
> > achieve this?
> >
> > I am aware of a manual backfill command via CLI but I am not sure I want
> to
> > use due to all the issues and inconsistencies I've read about it.
> >
> > Thanks!
> >
>


scheduler questions

2016-10-13 Thread Boris Tyukin
Hello all and thanks for such an amazing project! I have been evaluating
Airflow and spent a few days reading about it and playing with it and I
have a few questions that I struggle to understand.

Let's say I have a simple DAG that runs once a day and it is doing a full
reload of tables from the source database so the process is not incremental.

Let's consider this scenario:

Day 1 - OK

Day 2 - airflow scheduler or server with airflow is down for some reason (or
DAG is paused)

Day 3 - still down (or DAG is paused)

Day 4 - server is up and now needs to run missing jobs.


How can I make airflow run only the Day 4 job and not backfill Day 2 and 3?


I tried to set depends_on_past = True but it does not seem to do the trick.


I also found this in a roadmap doc, but it seems it has not made it into a
release yet:


 Only Run Latest - Champion : Sid

• For cases where we need to only run the latest in a series of task
instance runs and mark the others as skipped. For example, we may have job
to execute a DB snapshot every day. If the DAG is paused for 5 days and
then unpaused, we don’t want to run all 5, just the latest. With this
feature, we will provide “cron” functionality for task scheduling that is
not related to ETL


My second question, what if I have another DAG that does incremental loads
from a source table:


Day 1 - OK, loaded new/changed data for previous day

Day 2 - source system is down (or DAG is paused), Airflow DagRun failed

Day 3 - source system is down (or DAG is paused), Airflow DagRun failed

Day 4 - source system is up, Airflow Dagrun succeeded


My problem (unless I am missing something) is that Airflow on Day 4 would use
the execution time from Day 3, so the interval for the incremental load would be
since the last run (which failed). My hope is that it would use the last
_successful_ run, so on Day 4 it would go back to Day 1. Is it possible to
achieve this?

I am aware of a manual backfill command via the CLI, but I am not sure I want
to use it due to all the issues and inconsistencies I've read about.

Thanks!