Re: Beam spark 2.x runner status

2017-03-22 Thread Jean-Baptiste Onofré

Hi guys,

Ismaël summarized well what I have in mind.

I'm a bit late on the PoC around that (I started a branch already).
I will move forward over the weekend.

Regards
JB

On 03/22/2017 11:42 PM, Ismaël Mejía wrote:

Amit, I suppose JB is talking about the RDD based version, so no need
to worry about SparkSession or different incompatible APIs.

Remember the idea we are discussing is to have in master both the
spark 1 and spark 2 runners using the RDD based translation. At the
same time we can have a feature branch to evolve the DataSet based
translator (this one will replace the RDD based translator for spark 2
once it is mature).

The advantages have already been discussed, as well as the possible
issues, so I think we now have to see whether JB's idea is feasible and
how hard it would be to live with this while the DataSet version evolves.

I think what we are trying to avoid is having a long-living branch
for a spark 2 runner based on RDDs, because the maintenance burden
would be even worse. We would have to fight not only the double
merge of fixes (in case the profile idea does not work), but also
the continued evolution of Beam, and we would end up in the
long-living-branch mess that other runners have dealt with (e.g. the Apex runner):

https://lists.apache.org/thread.html/12cc086f5ffe331cc70b89322ce5416c3112b87efc3393e3e16032a2@%3Cdev.beam.apache.org%3E

What do you think about this, Amit? Would you be OK to go with it if
JB's profile idea proves to help with the maintenance issues?

Ismaël



On Wed, Mar 22, 2017 at 5:53 PM, Ted Yu  wrote:

The hbase-spark module doesn't use SparkSession, so the situation there is
simpler :-)

On Wed, Mar 22, 2017 at 5:35 AM, Amit Sela  wrote:


I'm still wondering how we'll do this - it's not just different
implementations of the same class, but completely different concepts, such
as using SparkSession in Spark 2 instead of SparkContext/StreamingContext
in Spark 1.

On Tue, Mar 21, 2017 at 7:25 PM Ted Yu  wrote:


I have done some work over in HBASE-16179 where compatibility modules are
created to isolate changes in Spark 2.x API so that code in hbase-spark
module can be reused.

FYI
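
To make the compatibility-module idea concrete, here is a minimal sketch,
assuming a small version-agnostic interface with one implementation per
Spark major version, each compiled only under its build profile. All names
are illustrative, not the actual HBASE-16179 or Beam code:

    // Shared module: a version-agnostic entry point (illustrative names).
    interface SparkCompat {
      void start(String master, String appName);
      void stop();
    }

    // spark1 module, compiled only when the spark1 profile is active.
    class Spark1Compat implements SparkCompat {
      private org.apache.spark.api.java.JavaSparkContext jsc;

      @Override
      public void start(String master, String appName) {
        org.apache.spark.SparkConf conf =
            new org.apache.spark.SparkConf().setMaster(master).setAppName(appName);
        jsc = new org.apache.spark.api.java.JavaSparkContext(conf);
      }

      @Override
      public void stop() {
        jsc.stop();
      }
    }

    // spark2 module, compiled only when the spark2 profile is active.
    class Spark2Compat implements SparkCompat {
      private org.apache.spark.sql.SparkSession session;

      @Override
      public void start(String master, String appName) {
        session = org.apache.spark.sql.SparkSession.builder()
            .master(master).appName(appName).getOrCreate();
      }

      @Override
      public void stop() {
        session.stop();
      }
    }

The runner core would code against SparkCompat only, and the active profile
decides which implementation lands on the classpath; this is roughly how the
hbase-spark compatibility modules isolate the 1.x/2.x API differences.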





--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Docker image dependencies

2017-03-22 Thread Stephen Sisk
hey Ismael,
I appreciate you asking questions to make sure we're doing the right things
to help developers out and making sure it's easy to add more IO ITs.

I strongly agree that we need to make sure we have documentation that
clearly lays out how to get it working.

The setup & teardown scripts for postgres in jdbc's
src/test/resources/kubernetes directory *should* work on a vanilla
kubernetes cluster (it's how I set them up) - I deliberately did not do
anything fancy when creating my kubernetes cluster. Probably the only thing
that I know of that might be tricky is that the script is currently set up
so that it only exposes the postgres service on Node (VM) IPs - that probably
needs documentation on how to use it with the tests. (Basically, you should
be able to take the IP address of any of the k8 VMs and use that as the IP
address of postgres - k8 will proxy that over to the correct container.)
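
To make that concrete, here is a hedged sketch of pointing the JdbcIO test
configuration at a node IP; the driver is the stock postgres JDBC driver,
and the IP, port, and credentials are placeholders, not values from any
real cluster:

    import org.apache.beam.sdk.io.jdbc.JdbcIO;

    // Point the test's data source at any Kubernetes node (VM) IP; the
    // service proxies the port to the postgres container. Placeholders only.
    JdbcIO.DataSourceConfiguration config =
        JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver",
                "jdbc:postgresql://NODE_VM_IP:NODE_PORT/postgres")
            .withUsername("postgres")
            .withPassword("YOUR_PASSWORD");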

I added a few rough notes here in the testing doc:
https://docs.google.com/document/d/153J9jPQhMCNi_eBzJfhAg-NprQ7vbf1jNVRgdqeEE8I/edit#heading=h.l06g9u1ejw4l


I'm definitely interested to hear what questions you have.

S

On Wed, Mar 22, 2017 at 3:56 PM Ismaël Mejía  wrote:

> You have really good points, and I agree 100%: docker is easier if it is
> local; once we talk about distributions, all of them have their
> pros/cons. I don’t intend to re-open the discussion, and of course it
> would be silly to go back and redo all the work you have already
> done.
>
> We already agreed on kubernetes and this is it. My point in mentioning
> docker-compose was more that we need to make the life of IT test
> contributors easier; maybe adding an extra tool is not the way,
> but at least we will need better documentation or references to help
> developers bootstrap their Kubernetes so they can contribute and
> validate the tests on their own.
>
> On Wed, Mar 22, 2017 at 12:14 AM, Stephen Sisk 
> wrote:
> > Hey Ismael,
> >
> > I definitely agree with you that we want something that developers will
> > actually be able to/want to use.
> >
> > In my experience *all* the container orchestration engines are
> > non-trivial to set up. When I started examining solutions for beam
> > hosting, I did installs of mesos, kubernetes and docker. Docker is
> > easier in the "run only on my local machine" case if devs have it set
> > up, but to do anything interesting (i.e., interact with machines that
> > aren't already yours), they all involve work to get them set up on
> > each machine you want to use[4].
> >
> > Kubernetes has some options that make it extremely simple to set up - both
> > AWS[2] and GCE[3] seem to be straightforward to set up for simple dev
> > clusters, with scripts to automate the process (I'm assuming docker has
> > similar setups.)
> >
> > Once kubernetes is set up, it's also a simple yaml file + command to
> > set up multiple machines. The kubernetes setup for postgres[5] shows a
> > simple one-machine example, and the kubernetes setups for HIFIO[6] show
> > multi-machine examples.
> >
> > We've spent a lot of time discussing the various options - when we talked
> > about this earlier [1] we decided we would move forward with investigating
> > kubernetes, so that's what I used for the IO ITs work I've been doing,
> > which we've now gotten working.
> >
> > Do you feel the advantages of docker are such that we should re-open the
> > discussion and potentially re-do the work we've done so far to get k8
> > working?
> >
> > I took a genuine look at docker earlier in the process and it didn't seem
> > like it was better than the other options in any dimension (other than
> > "developers usually have it installed already"), and kubernetes/mesos
> > seemed to be more stable/have more of the features discussed in [1].
> > Perhaps that's changed?
> >
> > I think we are just starting to use container orchestration engines, and
> > so while I don't want to throw away the work we've done so far, I also
> > don't want to have to redo it later for reasons we already know about
> > now. :)
> >
> > S
> >
> > [1]
> > https://lists.apache.org/thread.html/9fd3c51cb679706efa4d0df2111a6ac438b851818b639aba644607af@%3Cdev.beam.apache.org%3E
> >
> > [2] k8 AWS - https://kubernetes.io/docs/getting-started-guides/aws/
> > [3] k8 GKE - https://cloud.google.com/container-engine/docs/quickstart or
> > https://kubernetes.io/docs/getting-started-guides/gce/
> > [4] docker swarm on GCE -
> > https://rominirani.com/docker-swarm-on-google-compute-engine-364765b400ed#.gzvruzis9
> >
> > [5] postgres k8 script -
> > https://github.com/apache/beam/tree/master/sdks/java/io/jdbc/src/test/resources/kubernetes
> >
> > [6]
> > https://github.com/diptikul/incubator-beam/tree/HIFIO-CS-ES/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes
> >
> >
> > On Mon, Mar 20, 2017 at 3:25 PM Ismaël Mejía  wrote:
> >
> > I have somehow forgotten this one.
> >
> >> Basically - I'm trying to keep number of tools at a minimum while still
> >> providing good support for the functionality we need.

Re: First IO IT Running!

2017-03-22 Thread Ismaël Mejía
Excellent news, I am eager to see more IOs/Runners being included in
the Integration Tests, and I will be glad to contribute in any way I
can.

Congratulations on this important milestone.
Ismaël

ps. I will try to reproduce the Kubernetes setup, so I will eventually
be annoying you with questions.

On Wed, Mar 22, 2017 at 11:28 AM, Aljoscha Krettek  wrote:
> Great news! I can’t wait to also have support for this for the Flink Runner,
> which is partially blocked by me or others working on the Flink Runner, I
> guess… :-(
>> On 22 Mar 2017, at 05:15, Jean-Baptiste Onofré  wrote:
>>
>> Awesome !!! Great news !
>>
>> Thanks guys for that !
>>
>> I started to implement ITs in the JMS, MQTT, Redis, and Cassandra IOs. I'll
>> keep you posted.
>>
>> Regards
>> JB
>>
>> On 03/21/2017 11:01 PM, Stephen Sisk wrote:
>>> I'm really excited to see these tests are running!
>>>
>>> These Jdbc tests are testing against a postgres instance - that instance is
>>> running on the kubernetes cluster I've set up for beam IO ITs as discussed
>>> in the "Hosting data stores for IO transform testing" thread[0]. I set up
>>> that postgres instance using the kubernetes scripts for Jdbc[1]. Anyone can
>>> run their own kubernetes cluster and do the same thing for themselves to
>>> run the ITs. (I'd actually love to hear about it if anyone does.)
>>>
>>> I'm excited to get a few more ITs using this infrastructure so we can test
>>> it out/smooth out the remaining rough edges in creating ITs. I'm happy to
>>> answer questions about that on the mailing list, but we obviously have to
>>> have the process written down - the Testing IO Transforms in Apache Beam
>>> doc [2] covers how to do this, but is still rough. I'm working on getting
>>> that up on the website and ironing out the rough edges [3], but generally
>>> reading that doc plus checking out how the JdbcIO or ElasticsearchIO tests
>>> work should give you a sense of how to get it working. I'm also thinking we
>>> might want to simplify the way we do data loading, so I don't consider this
>>> process fully stabilized, but I'll port code written according to the
>>> current standards to the new standards if we make changes.
>>>
>>> ElasticsearchIO has all the prerequisites, so I'd like to get them going in
>>> the near future. I know JB has started on this in his RedisIO PR, and the
>>> HadoopInputFormatIO also has ITs & k8 scripts, so there's more in the pipe.
>>> For now, each datastore has to be manually set up, but I'd like to automate
>>> that process - I'll file a JIRA ticket shortly for that.
>>>
>>> Thanks,
>>> Stephen
>>> [0] Hosting data stores for IO transform testing -
>>> https://lists.apache.org/thread.html/9fd3c51cb679706efa4d0df2111a6ac438b851818b639aba644607af@%3Cdev.beam.apache.org%3E
>>> [1] Postgres k8 scripts -
>>> https://github.com/apache/beam/tree/master/sdks/java/io/jdbc/src/test/resources/kubernetes
>>> [2] IO testing guide -
>>> https://docs.google.com/document/d/153J9jPQhMCNi_eBzJfhAg-NprQ7vbf1jNVRgdqeEE8I/edit?usp=sharing
>>> [3] Jira for IO guide - https://issues.apache.org/jira/browse/BEAM-1025
>>>
>>> On Tue, Mar 21, 2017 at 2:28 PM Jason Kuster wrote:
>>>
 Hi all,

 Exciting news! As of yesterday, we have checked in the Jenkins
 configuration for our first continuously running IO Integration Test! You
 can check it out in Jenkins here[1]. We’re also publishing results to a
 database, and we’ve turned up a basic dashboarding system where you can see
 the results here[2]. Caveat: there are only two runs, and we’ll be tweaking
 the underlying system still, so don’t panic that we’re up and to the right
 currently. ;)

 This is the first test running continuously on top of the performance / IO
 testing infrastructure described in this doc[3].  Initial support for Beam
 is now present in PerfKit Benchmarker; given what they had already, it was
 easiest to add support for Dataflow and Java. We need your help to add
 additional support! The doc lists a number of JIRA issues to build out
 support for other systems. I’m happy to work with people to help them
 understand what is necessary for these tasks; just send an email to the
 list if you need help and I’ll help you move forwards.

 Looking forward to it!

 Jason

 [1] https://builds.apache.org/job/beam_PerformanceTests_JDBC/
 [2]
 https://apache-beam-testing.appspot.com/explore?dashboard=5714163003293696
 [3]

 https://docs.google.com/document/d/1PsjGPSN6FuorEEPrKEP3u3m16tyOzph5FnL2DhaRDz0/edit?ts=58a78e73

 --
 ---
 Jason Kuster
 Apache Beam / Google Cloud Dataflow

>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>


Re: Docker image dependencies

2017-03-22 Thread Ismaël Mejía
You have really good points, and I agree 100%: docker is easier if it is
local; once we talk about distributions, all of them have their
pros/cons. I don’t intend to re-open the discussion, and of course it
would be silly to go back and redo all the work you have already
done.

We already agreed on kubernetes and this is it. My point in mentioning
docker-compose was more that we need to make the life of IT test
contributors easier; maybe adding an extra tool is not the way,
but at least we will need better documentation or references to help
developers bootstrap their Kubernetes so they can contribute and
validate the tests on their own.

On Wed, Mar 22, 2017 at 12:14 AM, Stephen Sisk  wrote:
> Hey Ismael,
>
> I definitely agree with you that we want something that developers will
> actually be able to/want to use.
>
> In my experience *all* the container orchestration engines are non-trivial
> to set up. When I started examining solutions for beam hosting, I did
> installs of mesos, kubernetes and docker. Docker is easier in the "run only
> on my local machine" case if devs have it set up, but to do anything
> interesting (i.e., interact with machines that aren't already yours), they
> all involve work to get them set up on each machine you want to use[4].
>
> Kubernetes has some options that make it extremely simple to set up - both
> AWS[2] and GCE[3] seem to be straightforward to set up for simple dev
> clusters, with scripts to automate the process (I'm assuming docker has
> similar setups.)
>
> Once kubernetes is set up, it's also a simple yaml file + command to set up
> multiple machines. The kubernetes setup for postgres[5] shows a simple one
> machine example, and the kubernetes setups for HIFIO[6] show multi-machine
> examples.
>
> We've spent a lot of time discussing the various options - when we talked
> about this earlier [1] we decided we would move forward with investigating
> kubernetes, so that's what I used for the IO ITs work I've been doing,
> which we've now gotten working.
>
> Do you feel the advantages of docker are such that we should re-open the
> discussion and potentially re-do the work we've done so far to get k8
> working?
>
> I took a genuine look at docker earlier in the process and it didn't seem
> like it was better than the other options in any dimension (other than
> "developers usually have it installed already"), and kubernetes/mesos
> seemed to be more stable/have more of the features discussed in [1].
> Perhaps that's changed?
>
> I think we are just starting to use container orchestration engines, and so
> while I don't want to throw away the work we've done so far, I also don't
> want to have to redo it later for reasons we already know about now. :)
>
> S
>
> [1]
> https://lists.apache.org/thread.html/9fd3c51cb679706efa4d0df2111a6ac438b851818b639aba644607af@%3Cdev.beam.apache.org%3E
>
> [2] k8 AWS - https://kubernetes.io/docs/getting-started-guides/aws/
> [3] k8 GKE - https://cloud.google.com/container-engine/docs/quickstart or
> https://kubernetes.io/docs/getting-started-guides/gce/
> [4] docker swarm on GCE -
> https://rominirani.com/docker-swarm-on-google-compute-engine-364765b400ed#.gzvruzis9
>
> [5] postgres k8 script -
> https://github.com/apache/beam/tree/master/sdks/java/io/jdbc/src/test/resources/kubernetes
>
> [6]
> https://github.com/diptikul/incubator-beam/tree/HIFIO-CS-ES/sdks/java/io/hadoop/jdk1.8-tests/src/test/resources/kubernetes
>
>
> On Mon, Mar 20, 2017 at 3:25 PM Ismaël Mejía  wrote:
>
> I have somehow forgotten this one.
>
>> Basically - I'm trying to keep number of tools at a minimum while still
>> providing good support for the functionality we need. Does docker-compose
>> provide something beyond the functionality that k8 does? I'm not familiar
>> with docker-compose, but looking at
>> https://docs.docker.com/ it doesn't
>> seem to provide anything that k8 doesn't already.
>
> I agree we should have a minimal set of tools. I mentioned
> docker-compose because its installation is trivial compared to
> kubernetes (or even minikube for a local install); apart from being
> easier to install/use, docker-compose does not have any significant
> advantage over kubernetes.
>
> But well, better to be consistent and go all-in with kubernetes. However,
> we need to find a way to help IO authors bootstrap this, because
> from my experience creating a cluster with docker-compose is a yaml
> file + a command; I'm not sure the basic installation and run of
> kubernetes is that easy.
>
> Ismaël
>
> On Wed, Mar 15, 2017 at 8:09 PM, Stephen Sisk 
> wrote:
>> thanks for the discussion! In general, I agree with the sentiments
>> expressed here. I updated
>> https://docs.google.com/document/d/153J9jPQhMCNi_eBzJfhAg-NprQ7vbf1jNVRgdqeEE8I/edit#heading=h.hlirex1vus1a
>> to reflect this discussion. (The plan is still that I will put that on the
>> website.)
>>
>> Apache Docker Repository - are you talking about
>> https://hub.dock

Re: Beam spark 2.x runner status

2017-03-22 Thread Ismaël Mejía
Amit, I suppose JB is talking about the RDD based version, so no need
to worry about SparkSession or different incompatible APIs.

Remember the idea we are discussing is to have in master both the
spark 1 and spark 2 runners using the RDD based translation. At the
same time we can have a feature branch to evolve the DataSet based
translator (this one will replace the RDD based translator for spark 2
once it is mature).

The advantages have already been discussed, as well as the possible
issues, so I think we now have to see whether JB's idea is feasible and
how hard it would be to live with this while the DataSet version evolves.

I think what we are trying to avoid is having a long-living branch
for a spark 2 runner based on RDDs, because the maintenance burden
would be even worse. We would have to fight not only the double
merge of fixes (in case the profile idea does not work), but also
the continued evolution of Beam, and we would end up in the
long-living-branch mess that other runners have dealt with (e.g. the Apex runner):

https://lists.apache.org/thread.html/12cc086f5ffe331cc70b89322ce5416c3112b87efc3393e3e16032a2@%3Cdev.beam.apache.org%3E

What do you think about this, Amit? Would you be OK to go with it if
JB's profile idea proves to help with the maintenance issues?

Ismaël



On Wed, Mar 22, 2017 at 5:53 PM, Ted Yu  wrote:
> The hbase-spark module doesn't use SparkSession, so the situation there is
> simpler :-)
>
> On Wed, Mar 22, 2017 at 5:35 AM, Amit Sela  wrote:
>
>> I'm still wondering how we'll do this - it's not just different
>> implementations of the same class, but completely different concepts, such
>> as using SparkSession in Spark 2 instead of SparkContext/StreamingContext
>> in Spark 1.
>>
>> On Tue, Mar 21, 2017 at 7:25 PM Ted Yu  wrote:
>>
>> > I have done some work over in HBASE-16179 where compatibility modules are
>> > created to isolate changes in Spark 2.x API so that code in hbase-spark
>> > module can be reused.
>> >
>> > FYI
>> >
>>


Re: Kafka Offset handling for Restart/failure scenarios.

2017-03-22 Thread Jins George


Thanks Aljoscha for the clarification. Savepoints work fine in the case of
a controlled stop and restart. In the case of a failure (say the entire job
failed due to a node crash or an application software bug), is there a way
to resume from the checkpoint on restarting the application? The checkpoint
location is configured with HDFS.


Thanks,
Jins George

On 03/22/2017 03:23 AM, Aljoscha Krettek wrote:

As I mentioned before, when you run a Flink Job and simply cancel it, all
state about that job is discarded (with some exceptions, such as externalised
checkpoints). If you want the state of a Job to survive a cancellation you have
to perform a savepoint [1], and then when restarting the Job you have to specify
a savepoint from which you want to restore.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html


On 22 Mar 2017, at 01:43, Raghu Angadi  wrote:

Expanding a bit more on what Dan wrote:

   - In Dataflow, there are two modes of restarting a job: a regular stop
   and then start, & an *update*. The checkpoint is carried over only in the
   case of an update.
   - Update is the only way to keep 'exactly-once' semantics.
   - If the requirements are not very strict, you can enable offset commits
   in Kafka itself. KafkaIO lets you configure this. Here the pipeline would
   start reading from approximately where it left off in the previous run.
  - When offset commits are enabled, KafkaIO could do this by
  implementing the 'finalize()' API on KafkaCheckpointMark [1] (a sketch
  follows the link below).
  - This is runner independent.
  - The compromise is that this might skip a few records or read a few
  old records when the pipeline is restarted.
  - This does not override 'resume from checkpoint' support when the runner
  provides KafkaCheckpointMark. Externally committed offsets are used only
  when KafkaIO's own CheckpointMark is not available.

[1]:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
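
A rough sketch of that bullet, for illustration only: the field names below
are assumptions, not KafkaCheckpointMark's actual internals. The runner
calls finalizeCheckpoint() once a checkpoint is durably committed, and that
is the natural hook for committing the consumed offsets back to Kafka:

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative checkpoint mark: finalizeCheckpoint() commits offsets to
    // Kafka's consumer-group storage, so a later run that has no runner
    // checkpoint can resume approximately where this one left off.
    class CommittingCheckpointMark
        implements UnboundedSource.CheckpointMark, Serializable {
      private final Map<TopicPartition, Long> nextOffsets;
      // The reader's consumer; transient, so null on a deserialized copy.
      private final transient KafkaConsumer<?, ?> consumer;

      CommittingCheckpointMark(
          Map<TopicPartition, Long> nextOffsets, KafkaConsumer<?, ?> consumer) {
        this.nextOffsets = new HashMap<>(nextOffsets);
        this.consumer = consumer;
      }

      @Override
      public void finalizeCheckpoint() throws IOException {
        if (consumer == null) {
          return; // restored copy: nothing to commit against
        }
        Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : nextOffsets.entrySet()) {
          commits.put(e.getKey(), new OffsetAndMetadata(e.getValue()));
        }
        consumer.commitSync(commits);
      }
    }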

On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin  wrote:


[We should keep user list involved if that's where the discussion
originally was :)]

Jins George's original question was a good one. The right way to resume
from the previous offset here is what we're already doing – use the
KafkaCheckpointMark. In Beam, the runner maintains the state and not the
external system. Beam runners are responsible for maintaining the
checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If
a user disables checkpointing, then they are explicitly opting into "redo
all work" on restart.

--> If checkpointing is enabled but the KafkaCheckpointMark is not being
provided, then I'm inclined to agree with Amit that there may simply be a
bug in the FlinkRunner. (+aljoscha)

For what Mingmin Xu asked about: presumably if the Kafka source is
initially configured to "read from latest offset", when it restarts with no
checkpoint this will automatically go find the latest offset. That would
mimic at-most-once semantics in a buggy runner that did not provide
checkpointing.

Dan

On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu  wrote:


In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
Can it restore during job restart? (I have not tested the runner in
streaming for some time.)

Regarding data completeness, I would use at-most-once when a little missing
data (mostly from task-node failure) is tolerated, compared to the
performance cost introduced by 'state'/'checkpoint'.

On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela  wrote:


On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu  wrote:

> Move discuss to dev-list
>
> Savepoint in Flink, also checkpoint in Spark, should be good enough to
> handle this case.
>
> When people don't enable these features, for example only need
> at-most-once

The Spark runner forces checkpointing on any streaming (Beam) application,
mostly because it uses mapWithState for reading from UnboundedSource and
updateStateByKey from GroupByKey - so by design, Spark runner is
at-least-once. Generally, I always thought that applications that require
at-most-once are more focused on processing time only, as they only care
about whatever gets ingested into the pipeline at a specific time and
don't care (up to the point of losing data) about correctness.
I would be happy to hear more about your use case.

> semantic, each unbounded IO should try its best to restore from last
> offset, although CheckpointMark is null. Any ideas?
>
> Mingmin

On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin  wrote:

hey,

The native Beam UnboundedSource API supports resuming from checkpoint --
that specifically happens here
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
when the KafkaCheckpointMark is non-null.

The FlinkRunner should be providing the KafkaCheckpointMark from the most
recent savepoint upon restore.

There shouldn't be any "special" Flink runner support needed, nor is the
State API involved.

Dan
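
In UnboundedSource terms, the pattern Dan describes looks roughly like the
sketch below; the reader and mark names are illustrative, and the real
logic lives at the KafkaIO link above. The method sits in a subclass of
UnboundedSource<MyRecord, MyCheckpointMark>:

    // The runner hands createReader() the checkpoint mark it restored
    // (e.g. from a Flink savepoint), or null on a fresh start.
    @Override
    public UnboundedReader<MyRecord> createReader(
        PipelineOptions options, MyCheckpointMark checkpointMark) {
      if (checkpointMark != null) {
        // Resume from the runner-restored offsets.
        return new MyReader(this, checkpointMark.getOffsets());
      }
      // No checkpoint available: fall back to the configured start position
      // (earliest or latest, per the source's configuration).
      return new MyReader(this, configuredStartOffsets());
    }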

Re: [DISCUSSION] Consistent use of loggers

2017-03-22 Thread Ted Yu
+1 to what JB said. 

> On Mar 21, 2017, at 9:46 PM, Aviem Zur  wrote:
> 
> +1 to what JB said.
> 
> Will just have to be documented well, as if we provide no binding there will
> be no logging out of the box unless the user adds a binding.
> 
> On Wed, Mar 22, 2017 at 6:24 AM Jean-Baptiste Onofré 
> wrote:
> 
>> Hi Aviem,
>> 
>> Good point.
>> 
>> I think, in our dependency set, we should just depend on slf4j-api and
>> let the user provide the binding they want (slf4j-log4j12, slf4j-simple,
>> whatever).
>> 
>> We define a binding only with test scope in our modules.
>> 
>> Regards
>> JB
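
A minimal sketch of that convention from a module author's side (the class
name is hypothetical): the module compiles against slf4j-api only, and
whichever binding the user puts on the runtime classpath receives the calls.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Depends only on slf4j-api; without a user-provided binding
    // (slf4j-log4j12, logback, slf4j-simple, ...) these calls are no-ops.
    public class ElementLogger {
      private static final Logger LOG =
          LoggerFactory.getLogger(ElementLogger.class);

      void logElement(String element) {
        LOG.debug("Processing element: {}", element);
      }
    }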
>> 
>>> On 03/22/2017 04:58 AM, Aviem Zur wrote:
>>> Hi all,
>>> 
>>> There have been a few reports lately (on JIRA [1] and on Slack) from
>>> users regarding inconsistent loggers used across Beam's modules.
>>> 
>>> While we use SLF4J, different modules use a different logger behind it
>>> (JUL, log4j, etc)
>>> So when people add a log4j.properties file to their classpath for
>>> instance, they expect this to affect all of their dependencies on Beam
>>> modules, but it doesn’t and they miss out on some logs they thought they
>>> would see.
>>> 
>>> I think we should strive for consistency in which logger is used behind
>>> SLF4J, and try to enforce this in our modules.
>>> I for one think it should be slf4j-log4j. However, if performance of
>>> logging is critical we might want to consider logback.
>>> 
>>> Note: SLF4J will still be the facade for logging across the project. The
>>> only change would be the logger SLF4J delegates to.
>>> 
>>> Once we have something like this it would also be useful to add
>>> documentation on logging in Beam to the website.
>>> 
>>> [1] https://issues.apache.org/jira/browse/BEAM-1757
>> 
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>> 


Re: [DISCUSSION] Consistent use of loggers

2017-03-22 Thread Tibor Kiss
This is a great idea!

I believe Python-SDK's logging could also be enhanced (a bit differently):
currently we are not instantiating a logger, just using the root logger the
logging package provides. The shortcoming of this approach is that the user
cannot set the log level on a per-module basis, as all log messages
end up at the root level.

On 3/22/17, 5:46 AM, "Aviem Zur"  wrote:

+1 to what JB said.

Will just have to be documented well, as if we provide no binding there will
be no logging out of the box unless the user adds a binding.

On Wed, Mar 22, 2017 at 6:24 AM Jean-Baptiste Onofré 
wrote:

> Hi Aviem,
>
> Good point.
>
> I think, in our dependency set, we should just depend on slf4j-api and
> let the user provide the binding they want (slf4j-log4j12, slf4j-simple,
> whatever).
>
> We define a binding only with test scope in our modules.
>
> Regards
> JB
>
> On 03/22/2017 04:58 AM, Aviem Zur wrote:
> > Hi all,
> >
> > There have been a few reports lately (on JIRA [1] and on Slack) from
> > users regarding inconsistent loggers used across Beam's modules.
> >
> > While we use SLF4J, different modules use a different logger behind it
> > (JUL, log4j, etc)
> > So when people add a log4j.properties file to their classpath for
> > instance, they expect this to affect all of their dependencies on Beam
> > modules, but it doesn’t and they miss out on some logs they thought they
> > would see.
> >
> > I think we should strive for consistency in which logger is used behind
> > SLF4J, and try to enforce this in our modules.
> > I for one think it should be slf4j-log4j. However, if performance of
> > logging is critical we might want to consider logback.
> >
> > Note: SLF4J will still be the facade for logging across the project. The
> > only change would be the logger SLF4J delegates to.
> >
> > Once we have something like this it would also be useful to add
> > documentation on logging in Beam to the website.
> >
> > [1] https://issues.apache.org/jira/browse/BEAM-1757
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>




Re: Beam spark 2.x runner status

2017-03-22 Thread Ted Yu
The hbase-spark module doesn't use SparkSession, so the situation there is
simpler :-)

On Wed, Mar 22, 2017 at 5:35 AM, Amit Sela  wrote:

> I'm still wondering how we'll do this - it's not just different
> implementations of the same class, but completely different concepts, such
> as using SparkSession in Spark 2 instead of SparkContext/StreamingContext
> in Spark 1.
>
> On Tue, Mar 21, 2017 at 7:25 PM Ted Yu  wrote:
>
> > I have done some work over in HBASE-16179 where compatibility modules are
> > created to isolate changes in Spark 2.x API so that code in hbase-spark
> > module can be reused.
> >
> > FYI
> >
>


Re: Beam spark 2.x runner status

2017-03-22 Thread Amit Sela
I'm still wondering how we'll do this - it's not just different
implementations of the same class, but completely different concepts, such
as using SparkSession in Spark 2 instead of SparkContext/StreamingContext
in Spark 1.

On Tue, Mar 21, 2017 at 7:25 PM Ted Yu  wrote:

> I have done some work over in HBASE-16179 where compatibility modules are
> created to isolate changes in Spark 2.x API so that code in hbase-spark
> module can be reused.
>
> FYI
>


Re: Kafka Offset handling for Restart/failure scenarios.

2017-03-22 Thread Amit Sela
On Tue, Mar 21, 2017 at 11:59 PM Mingmin Xu  wrote:

> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
> Can it restore during job restart? (I have not tested the runner in
> streaming for some time.)
>
TmpCheckpointDirFactory simply points to a "default" directory. The Spark
runner, just like Spark, requires you to specify a path on a resilient FS
(gs, s3, hdfs) to save the checkpoint. Starting the same application with an
existing checkpoint would pick up where you left off.
If you're using a "non-resilient" FS (such as your local FS) both Spark and
the Spark runner would WARN - it's OK to do this, but you should take into
account that if the checkpoint is lost, so is your ability to resume.
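
As a hedged sketch (assuming the Spark runner's checkpointDir pipeline
option, for which TmpCheckpointDirFactory supplies the default value),
pointing the checkpoint at a resilient FS would look like:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Use a resilient FS so a restarted application can pick up where it
    // left off; the HDFS path is a placeholder.
    SparkPipelineOptions options =
        PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setCheckpointDir("hdfs://namenode:8020/beam/checkpoints/my-app");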

>
> Regarding data completeness, I would use at-most-once when a little
> missing data (mostly from task-node failure) is tolerated, compared to the
> performance cost introduced by 'state'/'checkpoint'.
>
> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela  wrote:
>
> > On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu  wrote:
> >
> > > Move discuss to dev-list
> > >
> > > Savepoint in Flink, also checkpoint in Spark, should be good enough to
> > > handle this case.
> > >
> > > When people don't enable these features, for example only need
> > at-most-once
> > >
> > The Spark runner forces checkpointing on any streaming (Beam)
> > application, mostly because it uses mapWithState for reading from
> > UnboundedSource and updateStateByKey from GroupByKey - so by design,
> > Spark runner is at-least-once. Generally, I always thought that
> > applications that require at-most-once are more focused on processing
> > time only, as they only care about whatever gets ingested into the
> > pipeline at a specific time and don't care (up to the point of losing
> > data) about correctness.
> > I would be happy to hear more about your use case.
> >
> > > semantic, each unbounded IO should try its best to restore from last
> > > offset, although CheckpointMark is null. Any ideas?
> > >
> > > Mingmin
> > >
> > > On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin  wrote:
> > >
> > > > hey,
> > > >
> > > > The native Beam UnboundedSource API supports resuming from checkpoint --
> > > > that specifically happens here
> > > > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
> > > > when the KafkaCheckpointMark is non-null.
> > > >
> > > > The FlinkRunner should be providing the KafkaCheckpointMark from the
> > > > most recent savepoint upon restore.
> > > >
> > > > There shouldn't be any "special" Flink runner support needed, nor is
> > > > the State API involved.
> > > >
> > > > Dan
> > > >
> > > > On Tue, Mar 21, 2017 at 9:01 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > > >
> > > >> Would it not be Flink runner specific ?
> > > >>
> > > >> Maybe the State API could do the same in a runner agnostic way (just
> > > >> thinking out loud) ?
> > > >>
> > > >> Regards
> > > >> JB
> > > >>
> > > >> On 03/21/2017 04:56 PM, Mingmin Xu wrote:
> > > >>
> > > >>> From KafkaIO itself, looks like it either start_from_beginning or
> > > >>> start_from_latest. It's designed to leverage
> > > >>> `UnboundedSource.CheckpointMark` during initialization, but so far
> > > >>> I don't see it's provided by runners. At the moment Flink
> > > >>> savepoints is a good option; I created a JIRA (BEAM-1775) to
> > > >>> handle it in KafkaIO.
> > > >>>
> > > >>> Mingmin
> > > >>>
> > > >>> On Tue, Mar 21, 2017 at 3:40 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> > > >>>
> > > >>> Hi,
> > > >>> Are you using Flink savepoints [1] when restoring your application?
> > > >>> If you use this the Kafka offset should be stored in state and it
> > > >>> should restart from the correct position.
> > > >>>
> > > >>> Best,
> > > >>> Aljoscha
> > > >>>
> > > >>> [1]
> > > >>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
> > > >>>
> > > >>> > On 21 Mar 2017, at 01:50, Jins George wrote:
> > > >>> >
> > > >>> > Hello,
> > > >>> >
> > > >>> > I am writing a Beam pipeline (streaming) with Flink runner to
> > > >>> > consume data from Kafka, apply some transformations, and persist
> > > >>> > to Hbase.
> > > >>> >
> > > >>> > If I restart the application (due to failure/manual restart), the
> > > >>> > consumer does not resume from the offset where it was prior to
> > > >>> > restart. It always resumes from the latest offset.
> > > >>> >
> > > >>> > If I enable Flink checkpointing with the hdfs state back-end, the
> > > >>> > system appears

Re: First IO IT Running!

2017-03-22 Thread Aljoscha Krettek
Great news! I can’t wait to also have support for this for the Flink Runner,
which is partially blocked by me or others working on the Flink Runner, I
guess… :-(
> On 22 Mar 2017, at 05:15, Jean-Baptiste Onofré  wrote:
> 
> Awesome !!! Great news !
> 
> Thanks guys for that !
> 
> I started to implement ITs in the JMS, MQTT, Redis, and Cassandra IOs. I'll
> keep you posted.
> 
> Regards
> JB
> 
> On 03/21/2017 11:01 PM, Stephen Sisk wrote:
>> I'm really excited to see these tests are running!
>> 
>> These Jdbc tests are testing against a postgres instance - that instance is
>> running on the kubernetes cluster I've set up for beam IO ITs as discussed
>> in the "Hosting data stores for IO transform testing" thread[0]. I set up
>> that postgres instance using the kubernetes scripts for Jdbc[1]. Anyone can
>> run their own kubernetes cluster and do the same thing for themselves to
>> run the ITs. (I'd actually love to hear about it if anyone does.)
>> 
>> I'm excited to get a few more ITs using this infrastructure so we can test
>> it out/smooth out the remaining rough edges in creating ITs. I'm happy to
>> answer questions about that on the mailing list, but we obviously have to
>> have the process written down - the Testing IO Transforms in Apache Beam
>> doc [2] covers how to do this, but is still rough. I'm working on getting
>> that up on the website and ironing out the rough edges [3], but generally
>> reading that doc plus checking out how the JdbcIO or ElasticsearchIO tests
>> work should give you a sense of how to get it working. I'm also thinking we
>> might want to simplify the way we do data loading, so I don't consider this
>> process fully stabilized, but I'll port code written according to the
>> current standards to the new standards if we make changes.
>> 
>> ElasticsearchIO has all the prerequisites, so I'd like to get them going in
>> the near future. I know JB has started on this in his RedisIO PR, and the
>> HadoopInputFormatIO also has ITs & k8 scripts, so there's more in the pipe.
>> For now, each datastore has to be manually set up, but I'd like to automate
>> that process - I'll file a JIRA ticket shortly for that.
>> 
>> Thanks,
>> Stephen
>> [0] Hosting data stores for IO transform testing -
>> https://lists.apache.org/thread.html/9fd3c51cb679706efa4d0df2111a6ac438b851818b639aba644607af@%3Cdev.beam.apache.org%3E
>> [1] Postgres k8 scripts -
>> https://github.com/apache/beam/tree/master/sdks/java/io/jdbc/src/test/resources/kubernetes
>> [2] IO testing guide -
>> https://docs.google.com/document/d/153J9jPQhMCNi_eBzJfhAg-NprQ7vbf1jNVRgdqeEE8I/edit?usp=sharing
>> [3] Jira for IO guide - https://issues.apache.org/jira/browse/BEAM-1025
>> 
>> On Tue, Mar 21, 2017 at 2:28 PM Jason Kuster 
>> wrote:
>> 
>>> Hi all,
>>> 
>>> Exciting news! As of yesterday, we have checked in the Jenkins
>>> configuration for our first continuously running IO Integration Test! You
>>> can check it out in Jenkins here[1]. We’re also publishing results to a
>>> database, and we’ve turned up a basic dashboarding system where you can see
>>> the results here[2]. Caveat: there are only two runs, and we’ll be tweaking
>>> the underlying system still, so don’t panic that we’re up and to the right
>>> currently. ;)
>>> 
>>> This is the first test running continuously on top of the performance / IO
>>> testing infrastructure described in this doc[3].  Initial support for Beam
>>> is now present in PerfKit Benchmarker; given what they had already, it was
>>> easiest to add support for Dataflow and Java. We need your help to add
>>> additional support! The doc lists a number of JIRA issues to build out
>>> support for other systems. I’m happy to work with people to help them
>>> understand what is necessary for these tasks; just send an email to the
>>> list if you need help and I’ll help you move forwards.
>>> 
>>> Looking forward to it!
>>> 
>>> Jason
>>> 
>>> [1] https://builds.apache.org/job/beam_PerformanceTests_JDBC/
>>> [2]
>>> https://apache-beam-testing.appspot.com/explore?dashboard=5714163003293696
>>> [3]
>>> 
>>> https://docs.google.com/document/d/1PsjGPSN6FuorEEPrKEP3u3m16tyOzph5FnL2DhaRDz0/edit?ts=58a78e73
>>> 
>>> --
>>> ---
>>> Jason Kuster
>>> Apache Beam / Google Cloud Dataflow
>>> 
>> 
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com



Re: Kafka Offset handling for Restart/failure scenarios.

2017-03-22 Thread Aljoscha Krettek
As I mentioned before, when you run a Flink Job and simply cancel it, all
state about that job is discarded (with some exceptions, such as externalised
checkpoints). If you want the state of a Job to survive a cancellation you have
to perform a savepoint [1], and then when restarting the Job you have to specify
a savepoint from which you want to restore.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html

> On 22 Mar 2017, at 01:43, Raghu Angadi  wrote:
> 
> Expanding a bit more on what Dan wrote:
> 
>   - In Dataflow, there are two modes of restarting a job: a regular stop
>   and then start, & an *update*. The checkpoint is carried over only in the
>   case of an update.
>   - Update is the only way to keep 'exactly-once' semantics.
>   - If the requirements are not very strict, you can enable offset commits
>   in Kafka itself. KafkaIO lets you configure this. Here the pipeline would
>   start reading from approximately where it left off in the previous run.
>  - When offset commits are enabled, KafkaIO could do this by
>  implementing the 'finalize()' API on KafkaCheckpointMark [1].
>  - This is runner independent.
>  - The compromise is that this might skip a few records or read a few
>  old records when the pipeline is restarted.
>  - This does not override 'resume from checkpoint' support when the runner
>  provides KafkaCheckpointMark. Externally committed offsets are used only
>  when KafkaIO's own CheckpointMark is not available.
> 
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java#L50
> 
> On Tue, Mar 21, 2017 at 5:28 PM, Dan Halperin  wrote:
> 
>> [We should keep user list involved if that's where the discussion
>> originally was :)]
>> 
>> Jins George's original question was a good one. The right way to resume
>> from the previous offset here is what we're already doing – use the
>> KafkaCheckpointMark. In Beam, the runner maintains the state and not the
>> external system. Beam runners are responsible for maintaining the
>> checkpoint marks, and for redoing all uncommitted (uncheckpointed) work. If
>> a user disables checkpointing, then they are explicitly opting into "redo
>> all work" on restart.
>> 
>> --> If checkpointing is enabled but the KafkaCheckpointMark is not being
>> provided, then I'm inclined to agree with Amit that there may simply be a
>> bug in the FlinkRunner. (+aljoscha)
>> 
>> For what Mingmin Xu asked about: presumably if the Kafka source is
>> initially configured to "read from latest offset", when it restarts with no
>> checkpoint this will automatically go find the latest offset. That would
>> mimic at-most-once semantics in a buggy runner that did not provide
>> checkpointing.
>> 
>> Dan
>> 
>> On Tue, Mar 21, 2017 at 2:59 PM, Mingmin Xu  wrote:
>> 
>>> In SparkRunner, the default checkpoint storage is TmpCheckpointDirFactory.
>>> Can it restore during job restart? (I have not tested the runner in
>>> streaming for some time.)
>>> 
>>> Regarding data completeness, I would use at-most-once when a little
>>> missing data (mostly from task-node failure) is tolerated, compared to the
>>> performance cost introduced by 'state'/'checkpoint'.
>>> 
>>> On Tue, Mar 21, 2017 at 1:36 PM, Amit Sela  wrote:
>>> 
 On Tue, Mar 21, 2017 at 7:26 PM Mingmin Xu  wrote:
 
> Move discuss to dev-list
> 
> Savepoint in Flink, also checkpoint in Spark, should be good enough to
> handle this case.
> 
> When people don't enable these features, for example only need
 at-most-once
> 
 The Spark runner forces checkpointing on any streaming (Beam) application,
 mostly because it uses mapWithState for reading from UnboundedSource and
 updateStateByKey from GroupByKey - so by design, Spark runner is
 at-least-once. Generally, I always thought that applications that require
 at-most-once are more focused on processing time only, as they only care
 about whatever gets ingested into the pipeline at a specific time and
 don't care (up to the point of losing data) about correctness.
 I would be happy to hear more about your use case.
 
> semantic, each unbounded IO should try its best to restore from last
> offset, although CheckpointMark is null. Any ideas?
> 
> Mingmin
> 
> On Tue, Mar 21, 2017 at 9:39 AM, Dan Halperin 
 wrote:
> 
>> hey,
>> 
>> The native Beam UnboundedSource API supports resuming from checkpoint --
>> that specifically happens here
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L674>
>> when the KafkaCheckpointMark is non-null.
>> 
>> The FlinkRunner should be providing the KafkaCheckpointMark from the
 most
>> recent savepoint upon restore.
>> 
>> There shouldn't be any "special" Flink runner support needed, nor is the
>> State API involved.