[jira] [Created] (FLINK-12024) Bump universal Kafka connector Kafka dependency to 2.2.0

2019-03-26 Thread Elias Levy (JIRA)
Elias Levy created FLINK-12024:
--

 Summary: Bump universal Kafka connector Kafka dependency to 2.2.0
 Key: FLINK-12024
 URL: https://issues.apache.org/jira/browse/FLINK-12024
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.7.2
Reporter: Elias Levy


Update the Kafka client dependency to version 2.2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11794) Allow compression of row format files created by StreamingFileSink

2019-03-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-11794:
--

 Summary: Allow compression of row format files created by 
StreamingFileSink
 Key: FLINK-11794
 URL: https://issues.apache.org/jira/browse/FLINK-11794
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Affects Versions: 1.7.2
Reporter: Elias Levy


Currently, there is no mechanism to compress files created using a 
StreamingFileSink.  Compression is highly desirable when the output is a 
text-based row format such as JSON.

Possible alternatives are the introduction of a callback that gets passed the 
local file before it is uploaded to the DFS, so that it can be compressed, or a 
factory method that returns an OutputStream, such as a GZIPOutputStream, which 
wraps and compresses a passed-in output stream and is then used by the Encoder.
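
As a rough illustration of the second alternative, here is a minimal sketch of 
what such a factory could look like.  The {{CompressedStreamFactory}} interface 
and its name are hypothetical; only {{OutputStream}} and {{GZIPOutputStream}} 
are existing types, and the sink would still need to be taught to apply the 
factory before invoking the Encoder.

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.zip.GZIPOutputStream;

/**
 * Hypothetical factory the StreamingFileSink could apply to the raw part-file
 * stream before handing it to the Encoder; not an existing Flink interface.
 */
public interface CompressedStreamFactory extends Serializable {

    OutputStream wrap(OutputStream raw) throws IOException;

    /** A GZIP-based implementation of the hypothetical factory. */
    static CompressedStreamFactory gzip() {
        return GZIPOutputStream::new;
    }
}
{code}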



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11520) Triggers should be provided the window state

2019-02-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-11520:
--

 Summary: Triggers should be provided the window state
 Key: FLINK-11520
 URL: https://issues.apache.org/jira/browse/FLINK-11520
 Project: Flink
  Issue Type: Improvement
Reporter: Elias Levy


Some triggers may require access to the window state to perform their job.  
Consider a window computing a count using an aggregate function.  It may be 
desirable to fire the window when the count is 1 and then again at the end of 
the window.  The early firing can provide feedback to external systems that a 
key has been observed, while waiting for the final count.

The same problem can be observed in 
org.apache.flink.streaming.api.windowing.triggers.CountTrigger, which must 
maintain an internal count instead of being able to make use of the window 
state.
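
For illustration, below is a minimal sketch of the early-firing trigger 
described above, written against the current {{Trigger}} API (the class and 
state names are made up).  Because the window's aggregated count is not exposed 
to the trigger, it has to keep a duplicate count in its own partitioned state, 
which is exactly the problem this issue describes.

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EarlyFirstElementTrigger<T> extends Trigger<T, TimeWindow> {

    // Duplicate of the count the window's aggregate function already maintains.
    private final ValueStateDescriptor<Long> countDesc =
            new ValueStateDescriptor<>("trigger-count", Long.class);

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ValueState<Long> count = ctx.getPartitionedState(countDesc);
        long seen = count.value() == null ? 0L : count.value();
        count.update(seen + 1);
        // Fire early on the first element so external systems learn the key was observed.
        return seen == 0 ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Final firing at the end of the window.
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
{code}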



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11517) Inefficient window state access when using RocksDB state backend

2019-02-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-11517:
--

 Summary: Inefficient window state access when using RocksDB state 
backend
 Key: FLINK-11517
 URL: https://issues.apache.org/jira/browse/FLINK-11517
 Project: Flink
  Issue Type: Bug
Reporter: Elias Levy


When using an aggregate function on a window with a process function and the 
RocksDB state backend, state access is inefficient.

The WindowOperator calls windowState.add to merge the new element using the 
aggregate function.  The add method of RocksDBAggregatingState will read the 
state, deserialize the state, call the aggregate function, serialize the 
state, and write it out.

If the trigger decides the window must be fired, then, because windowState.add 
does not return the state, the WindowOperator must call windowState.get to 
fetch it and pass it to the window process function, resulting in another read 
and deserialization.

Finally, in some cases the trigger may need access to the window state.  That 
is our case.  As the state is not passed to the trigger, we must read and 
deserialize the state once more from within the trigger.

Thus, state must be read and deserialized three times to process a single 
element.  If the state is large, this can be quite costly.

 

Ideally, windowState.add would return the state, so that the WindowOperator 
could pass it to the process function without having to read it again.  
Additionally, the state should be made available to the trigger to enable more 
use cases without having to go through the state descriptor again.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10617) Restoring job fails because of slot allocation timeout

2018-10-19 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10617:
--

 Summary: Restoring job fails because of slot allocation timeout
 Key: FLINK-10617
 URL: https://issues.apache.org/jira/browse/FLINK-10617
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager, TaskManager
Affects Versions: 1.6.1
Reporter: Elias Levy


The following may be related to FLINK-9932, but I am unsure.  If you believe it 
is, go ahead and close this issue as a duplicate.

While trying to test local state recovery on a job with large state, the job 
failed to be restored because slot allocation timed out.

The job is running on a standalone cluster with 12 nodes and 96 task slots (8 
per node).  The job has parallelism of 96, so it consumes all of the slots, and 
has ~200 GB of state in RocksDB.  

To test local state recovery I decided to kill one of the TMs.  The TM 
immediately restarted and re-registered with the JM.  I confirmed the JM showed 
96 registered task slots.
{noformat}
21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Resolved ResourceManager address, beginning registration
21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Registration at ResourceManager attempt 1 (timeout=100ms)
21:35:44,640 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Successful registration at resource manager 
akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration id 
302988dea6afbd613bb2f96429b65d18.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,668 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,671 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Try to register at job manager 
akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,681 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,681 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,683 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Try to register at job manager 
akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Resolved JobManager address, beginning registration
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Resolved JobManager address, beginning registration
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Registration at JobManager attempt 1 (timeout=100ms)
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}.
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,688 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,688 INFO  

[jira] [Created] (FLINK-10520) Job save points REST API fails unless parameters are specified

2018-10-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10520:
--

 Summary: Job save points REST API fails unless parameters are 
specified
 Key: FLINK-10520
 URL: https://issues.apache.org/jira/browse/FLINK-10520
 Project: Flink
  Issue Type: Bug
  Components: REST
Affects Versions: 1.6.1
Reporter: Elias Levy


The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error 
unless the request includes a body with all parameters ({{target-directory}} 
and {{cancel-job}}), even though the 
[documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html]
 suggests these are optional.

If a POST request with no data is made, the response is a 400 status code with 
the error message "Bad request received."

If the POST request submits an empty JSON object ( {} ), the response is a 400 
status code with the error message "Request did not match expected format 
SavepointTriggerRequestBody."  The same is true if only the 
{{target-directory}} or {{cancel-job}} parameter is included.

As the system is configured with a default savepoint location, there shouldn't 
be a need to include the parameter in the request.
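
For reference, below is a sketch of a trigger request of the form the handler 
currently insists on, built with plain {{java.net.HttpURLConnection}}; the 
host, port, job id, and target directory are placeholders.

{code:java}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TriggerSavepoint {
    public static void main(String[] args) throws Exception {
        // Placeholder JobManager address and job id.
        URL url = new URL(
                "http://jobmanager:8081/jobs/00000000000000000000000000000000/savepoints");

        // Both parameters must currently be present, even though they should be optional.
        String body = "{\"target-directory\": \"s3://bucket/savepoints\", \"cancel-job\": false}";

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("HTTP " + conn.getResponseCode());
    }
}
{code}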



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-10-04 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10493:
--

 Summary: Macro generated CaseClassSerializer considered harmful
 Key: FLINK-10493
 URL: https://issues.apache.org/jira/browse/FLINK-10493
 Project: Flink
  Issue Type: Bug
  Components: Scala API, State Backends, Checkpointing, Type 
Serialization System
Affects Versions: 1.5.4, 1.6.1, 1.6.0, 1.5.3, 1.5.2, 1.5.1, 1.4.2, 1.4.1, 
1.4.0
Reporter: Elias Levy


The Flink Scala API uses implicits and macros to generate {{TypeInformation}} 
and {{TypeSerializer}} objects for types.  In the case of Scala tuples and case 
classes, the macro generates an [anonymous {{CaseClassSerializer}} 
class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161].
  

The Scala compiler will generate a name for the anonymous class that depends on 
the position of the macro invocation in the code relative to other anonymous 
classes.  If the code is changed such that the anonymous class' relative 
position changes, even if the overall logic of the code or the type in question 
does not change, the name of the serializer class will change.

That will result in errors, such as the one below, if the job is restored from 
a savepoint, because the serializer needed to read the data in the savepoint 
will no longer be found, as its name will have changed.

At the very least, there should be a prominent warning in the documentation 
about this issue.  Minor code changes can result in jobs that can't restore 
previous state.  Ideally, the use of anonymous classes should be deprecated if 
possible.

{noformat}
WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - 
Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
at 
org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.InvalidClassException: failed to read class descriptor
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
... 14 more
Caused by: java.lang.ClassNotFoundException: 
com.somewhere.TestJob$$anon$13$$anon$3
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at 

[jira] [Created] (FLINK-10483) Can't restore from a savepoint even with Allow Non Restored State enabled

2018-10-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10483:
--

 Summary: Can't restore from a savepoint even with Allow Non 
Restored State enabled
 Key: FLINK-10483
 URL: https://issues.apache.org/jira/browse/FLINK-10483
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Type Serialization System
Affects Versions: 1.4.2
Reporter: Elias Levy


A trimmed streaming job fails to restore from a savepoint with an "Unloadable 
class for type serializer" error, even though the case class in question has 
been eliminated from the job and Allow Non Restored State is enabled.

We have a job running on a Flink 1.4.2 cluster with two Kafka input streams.  
One of the streams is processed by an async function, and the output of the 
async function and the other original stream are consumed by a 
CoProcessOperator, which in turn emits Scala case class instances that go into 
a stateful ProcessFunction filter and then into a sink.  I.e.

{code:java}
source 1 --> async function --\
                               |--> co process --> process --> sink
source 2 ----------------------/
{code}

I eliminated most of the DAG, leaving only the source 1 --> async function 
portion of it.  This removed the case class in question from the processing 
graph.  When I try to restore from the savepoint, even if Allow Non Restored 
State is selected, the job fails to restore with the error "Deserialization of 
serializer errored".

This is the error being generated:


{noformat}
WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - 
Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
at 
org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.InvalidClassException: failed to read class descriptor
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
... 14 more
Caused by: java.lang.ClassNotFoundException: 
com.somewhere.TestJob$$anon$13$$anon$3
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Class.forName0(Native Method)
at 

[jira] [Created] (FLINK-10460) DataDog reporter JsonMappingException

2018-09-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10460:
--

 Summary: DataDog reporter JsonMappingException
 Key: FLINK-10460
 URL: https://issues.apache.org/jira/browse/FLINK-10460
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.4.2
Reporter: Elias Levy


Observed the following error in the TM logs this morning:


{code:java}
WARN  org.apache.flink.metrics.datadog.DatadogHttpReporter  - Failed 
reporting metrics to Datadog.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
 (was java.util.ConcurrentModificationException) (through reference chain: 
org.apache.flink.metrics.datadog.DSeries["series"]->
java.util.ArrayList[88]->org.apache.flink.metrics.datadog.DGauge["points"])
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:379)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:339)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:342)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:686)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:672)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2998)
   at 
org.apache.flink.metrics.datadog.DatadogHttpClient.serialize(DatadogHttpClient.java:90)
   at 
org.apache.flink.metrics.datadog.DatadogHttpClient.send(DatadogHttpClient.java:79)
   at 
org.apache.flink.metrics.datadog.DatadogHttpReporter.report(DatadogHttpReporter.java:143)
  at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
   at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
   at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown
 Source)
   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
   at java.lang.Thread.run(Unknown Source)
 Caused by: java.util.ConcurrentModificationException
   at java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source)
   at java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source)
   at java.util.AbstractCollection.addAll(Unknown Source)
   at java.util.HashSet.<init>(Unknown Source)
   at 
org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
   at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
   at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)
   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
   at 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
  at 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
  at org.apache.flink.metrics.datadog.DGauge.getMetricValue(DGauge.java:42)
  at 

[jira] [Created] (FLINK-10390) DataDog metric reporter leak warning

2018-09-21 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10390:
--

 Summary: DataDog metric reporter leak warning
 Key: FLINK-10390
 URL: https://issues.apache.org/jira/browse/FLINK-10390
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.6.1
Reporter: Elias Levy


After upgrading from 1.4.2 to 1.6.1 we started observing warnings in the log 
associated with the DataDog metrics reporter:
{quote}Sep 21, 2018 9:43:20 PM 
org.apache.flink.shaded.okhttp3.internal.platform.Platform log WARNING: A 
connection to https://app.datadoghq.com/ was leaked. Did you forget to close a 
response body? To see where this was allocated, set the OkHttpClient logger 
level to FINE: 
Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE);
{quote}
The metric reporter's okhttp dependency version (3.7.0) has not changed, so 
that does not appear to be the source of the warning.

I believe the issue is the change made in 
[FLINK-8553|https://github.com/apache/flink/commit/ae3d547afe7ec44d37b38222a3ea40d9181e#diff-fc396ba6772815fc05efc1310760cd4b].
  The HTTP calls were made async.  The previous code called 
{{client.newCall(r).execute().close()}}.  The new code does nothing in the 
callback, even though the [Callback.onResponse 
documentation|https://square.github.io/okhttp/3.x/okhttp/okhttp3/Callback.html#onResponse-okhttp3.Call-okhttp3.Response-]
 states:

bq. Called when the HTTP response was successfully returned by the remote 
server. The callback may proceed to read the response body with Response.body. 
The response is still live until its response body is closed. 
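
For illustration, here is a minimal sketch of an async OkHttp call whose 
callback closes the response body, which is what the cited documentation asks 
for; the endpoint and client setup are placeholders, and the real reporter uses 
the shaded {{org.apache.flink.shaded.okhttp3}} packages.

{code:java}
import java.io.IOException;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class ClosingCallbackExample {
    public static void main(String[] args) {
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder()
                .url("https://app.datadoghq.com/")   // placeholder endpoint
                .build();

        client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                // Nothing to release on failure; a real reporter would log here.
            }

            @Override
            public void onResponse(Call call, Response response) {
                // Closing the response releases the connection back to the pool
                // and avoids the "connection was leaked" warning.
                response.close();
            }
        });
    }
}
{code}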



 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10372) There is no API to configure the timer state backend

2018-09-19 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10372:
--

 Summary: There is no API to configure the timer state backend
 Key: FLINK-10372
 URL: https://issues.apache.org/jira/browse/FLINK-10372
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Elias Levy


Flink 1.6.0, via FLINK-9485, introduced the option to store timers in RocksDB 
instead of the heap.  Alas, this can only be configured via the 
{{state.backend.rocksdb.timer-service.factory}} config file option.  That means 
that the choice of state backend to use for timers can't be made on a per-job 
basis on a shared cluster.

There is a need for an API in {{RocksDBStateBackend}} to configure the backend 
per job.
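
For comparison, a sketch of what per-job configuration could look like next to 
the existing API; the {{setTimerServiceFactory}} call is hypothetical (and 
therefore commented out), while the rest is the existing 
{{RocksDBStateBackend}} API.

{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerJobTimerBackend {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        RocksDBStateBackend backend = new RocksDBStateBackend("file:///tmp/checkpoints");

        // Hypothetical per-job knob proposed by this issue; today the choice can
        // only be made cluster-wide via state.backend.rocksdb.timer-service.factory.
        // backend.setTimerServiceFactory("rocksdb");

        env.setStateBackend(backend);
        env.fromElements(1, 2, 3).print();
        env.execute("per-job-timer-backend");
    }
}
{code}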



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10118) Queryable state MapState entry query

2018-08-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10118:
--

 Summary: Queryable state MapState entry query
 Key: FLINK-10118
 URL: https://issues.apache.org/jira/browse/FLINK-10118
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Affects Versions: 1.6.0
Reporter: Elias Levy


Queryable state allows querying of keyed MapState, but such a query returns all 
MapState entries for the given key.  In some cases, such MapState may include a 
substantial number of entries (in the millions), while the user may only be 
interested in one entry.

I propose we allow queries for MapState to provide one or more map entry keys, 
in addition to the state key, and to only return entries for the given map keys.
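
For context, a sketch of a query against keyed MapState with the existing 
client (assuming flink-queryable-state-client-java is on the classpath; the 
host, port, job id, state name, and keys are placeholders).  The returned 
future resolves to the entire map for the key, which is what this issue 
proposes to narrow.

{code:java}
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class MapStateQuery {
    public static void main(String[] args) throws Exception {
        QueryableStateClient client = new QueryableStateClient("proxy-host", 9069);

        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<>("per-key-map", String.class, Long.class);

        // Today this returns every entry of the MapState for the key, even when
        // only a single map entry is of interest.
        CompletableFuture<MapState<String, Long>> result =
                client.getKvState(
                        JobID.fromHexString("00000000000000000000000000000000"),
                        "queryable-map",   // queryable state name (placeholder)
                        "some-key",
                        BasicTypeInfo.STRING_TYPE_INFO,
                        descriptor);

        System.out.println(result.join().get("entry-of-interest"));
    }
}
{code}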



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10117) REST API for Queryable State

2018-08-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10117:
--

 Summary: REST API for Queryable State
 Key: FLINK-10117
 URL: https://issues.apache.org/jira/browse/FLINK-10117
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State, REST
Affects Versions: 1.6.0
Reporter: Elias Levy


At the moment, queryable state requires a JVM-based client that can make use of 
the Java queryable state client API in the flink-queryable-state-client 
artifact.  In addition, the client requires a state descriptor matching the 
queried state, which tightly couples the Flink job and queryable state clients.

I propose that queryable state become accessible via a REST API.  FLINK-7040 
mentions this possibility, but does not specify work towards that goal.

I suggest that to enable queryable state over REST, users define JSON 
serializers via the state descriptors.  

This would allow queryable state clients to be developed in any language, not 
require them to use a Flink client library, and permit them to be loosely 
coupled with the job, as they could generically parse the returned JSON.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10098) Programmatically select timer storage backend

2018-08-07 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10098:
--

 Summary: Programmatically select timer storage backend
 Key: FLINK-10098
 URL: https://issues.apache.org/jira/browse/FLINK-10098
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing, Streaming, TaskManager
Affects Versions: 1.6.0, 1.7.0
Reporter: Elias Levy


FLINK-9486 introduced timer storage on the RocksDB storage backend.  Right now 
it is only possible to configure RocksDB as the storage for timers by setting 
the {{state.backend.rocksdb.timer-service.factory}} value in the configuration 
file for Flink.

As the state storage backend can be programmatically selected by jobs via 
{{env.setStateBackend(...)}}, the timer backend should also be configurable 
programmatically.

Different jobs should be able to store their timers in different storage 
backends.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10037) Document details event time behavior in a single location

2018-08-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10037:
--

 Summary: Document details event time behavior in a single location
 Key: FLINK-10037
 URL: https://issues.apache.org/jira/browse/FLINK-10037
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.5.2
Reporter: Elias Levy
Assignee: Elias Levy


A description of event time and watermarks, and how they are generated, 
assigned, and handled, is spread across many pages in the documentation.  It 
would be useful to have it all in a single place and to include missing 
information, such as how Flink assigns timestamps to new records generated by 
operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10011) Old job resurrected during HA failover

2018-07-31 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10011:
--

 Summary: Old job resurrected during HA failover
 Key: FLINK-10011
 URL: https://issues.apache.org/jira/browse/FLINK-10011
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.4.2
Reporter: Elias Levy


For the second time we've observed Flink resurrect an old job during JobManager 
high-availability failover.
h4. Configuration
 * AWS environment
 * Flink 1.4.2 standalone cluster in HA mode
 * 2 JMs, 3 TMs
 * 3 node ZK ensemble
 * 1 job consuming to/from Kafka
 * Checkpoints in S3 using the Presto file system adaptor

h4. Timeline 
 * 15:18:10 JM 2 completes checkpoint 69256.
 * 15:19:10 JM 2 completes checkpoint 69257.
 * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
SocketTimeoutException
 * 15:19:57 ZK 1 closes connection to JM 2 (leader)
 * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
 * 15:19:57 JM 2 reports it can't read data from ZK
 ** {{Unable to read additional data from server sessionid 0x3003f4a0003, 
likely server has closed socket, closing socket connection and attempting 
reconnect)}}
 ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
 * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
 ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.}}
 ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs 
are not monitored (temporarily).}}
 ** {{Connection to ZooKeeper suspended. The contender 
akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the 
leader election}}{{ }}
 ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.}}
 * 15:19:57 JM 2 gives up leadership
 ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}

 * 15:19:57 JM 2 changes job 
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
 ** {{Stopping checkpoint coordinator for job 
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}

 * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages 
because there is no leader
 ** {{Discard message 
LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
 TaskManager akka://flink/user/taskmanager is disassociating)) because there is 
currently no valid leader id known.}}

 * 15:19:57 JM 2 connects to ZK 2 and renews its session
 ** {{Opening socket connection to server 
ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
 ** {{Socket connection established to 
ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
 ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.}}
 ** {{Session establishment complete on server 
ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 
0x3003f4a0003, negotiated timeout = 4}}
 ** {{Connection to ZooKeeper was reconnected. Leader election can be 
restarted.}}
 ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are 
monitored again.}}
 ** {{State change: RECONNECTED}}

 * 15:19:57: JM 1 reports JM 1 has been granted leadership:
 ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted 
leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}

 * 15:19:57 JM 2 reports the job has been suspended
 ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting 
down.}}
 ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}

 * 15:19:57 JM 2 reports it has lost leadership:
 ** {{Associated JobManager 
Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
 ** {{Received leader address but not running in leader ActorSystem. Cancelling 
registration.}}

 * 15:19:57 TMs register with JM 1

 * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
 ** {{Attempting to recover all jobs.}}
 ** {{There are 2 jobs to recover. Starting the job recovery.}}
 ** {{Attempting to recover job 
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
 ** {{Attempting to recover job 
{color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}

 * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
 ** {{Got user-level KeeperException when processing 
sessionid:0x201d2330001 type:create cxid:0x4211 zxid:0x60009dc70 txntype:-1 
reqpath:n/a Error 
Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 
Error:KeeperErrorCode = NodeExists for 
/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
 ** {{Got user-level KeeperException when processing 
sessionid:0x201d2330001 type:create cxid:0x4230 zxid:0x60009dc78 txntype:-1 
reqpath:n/a Error 

[jira] [Created] (FLINK-9731) Kafka source subtask begins to consume from earliest offset

2018-07-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9731:
-

 Summary: Kafka source subtask begins to consume from earliest 
offset
 Key: FLINK-9731
 URL: https://issues.apache.org/jira/browse/FLINK-9731
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.2
Reporter: Elias Levy


On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job 
instance began consuming records from the earliest offsets available in Kafka 
for the partitions assigned to it. Other subtasks did not exhibit this behavior 
and continued operating normally.

Prior to the event the job exhibited no Kafka lag. The job showed no failed 
checkpoints and the job did not restore or restart. Flink logs show no 
indication of anything amiss. There were no errors or Kafka-related messages of 
concern in the Flink logs.

The job is configured with checkpoints at 1 minute intervals. The Kafka 
connector consumer is configured to start from group offsets if it is not 
started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka 
consumer is configured to fall back to the earliest offsets if no group offsets 
are committed by setting `auto.offset.reset` to `earliest` in the Kafka 
consumer config.
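
For reference, a sketch of the consumer configuration described above; the 
topic, bootstrap servers, and group id are placeholders.

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ConsumerConfigSketch {
    public static FlinkKafkaConsumer010<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092");  // placeholder
        props.setProperty("group.id", "my-flink-job");                // placeholder
        // Fall back to the earliest offsets only when no group offsets are committed.
        props.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
        // Start from committed group offsets when not restoring from a savepoint.
        consumer.setStartFromGroupOffsets();
        return consumer;
    }
}
{code}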

Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of 
its partitions for around 30 seconds as a result of losing its connection to 
ZooKeeper.

 
{noformat}
[2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
partition [cloud_ioc_events,32] to broker 
1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
{noformat}

The broker immediately reconnected to ZK after a few tries:

{noformat}
[2018-06-30 09:34:55,462] INFO Opening socket connection to server 
10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,463] INFO Socket connection established to 
10.210.48.187/10.210.48.187:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,465] INFO Initiating client connection, 
connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
(org.apache.zookeeper.ZooKeeper)
[2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,468] INFO Opening socket connection to server 
10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,468] INFO Socket connection established to 
10.210.43.200/10.210.43.200:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO Session establishment complete on server 
10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 
with addresses: 
EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT)
 (kafka.utils.ZkUtils)
[2018-06-30 09:34:55,476] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
{noformat}

By 9:35:02 partitions had returned to the broker.

It appears this 

[jira] [Created] (FLINK-9682) Add setDescription to execution environment and display it in the UI

2018-06-27 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9682:
-

 Summary: Add setDescription to execution environment and display 
it in the UI
 Key: FLINK-9682
 URL: https://issues.apache.org/jira/browse/FLINK-9682
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Webfrontend
Affects Versions: 1.5.0
Reporter: Elias Levy


Currently you can provide a job name to {{execute}} in the execution 
environment.  In an environment where many versions of a job may be executing, 
such as a development or test environment, identifying which running job is of 
a specific version via the UI can be difficult unless the version is embedded 
into the job name given to {{execute}}.  But the job name is used for other 
purposes, such as for namespacing metrics.  Thus, it is not ideal to modify the 
job name, as that could require modifying metric dashboards and monitors each 
time versions change.

I propose a new method be added to the execution environment, 
{{setDescription}}, that would allow a user to pass in an arbitrary description 
that would be displayed in the dashboard, allowing users to distinguish jobs.
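
A sketch of the proposed usage is below; {{setDescription}} is the hypothetical 
addition (commented out), everything else is the existing API, and the 
description string is just an example.

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DescribedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical API proposed by this issue; not available today.
        // env.setDescription("my-job build 142, commit 3f9c2a1");

        env.fromElements(1, 2, 3).print();

        // The job name stays stable, so metric namespaces are unaffected.
        env.execute("my-job");
    }
}
{code}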



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9600) Add DataStream transformation variants that pass timestamp to the user function

2018-06-15 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9600:
-

 Summary: Add DataStream transformation variants that pass 
timestamp to the user function
 Key: FLINK-9600
 URL: https://issues.apache.org/jira/browse/FLINK-9600
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: Elias Levy


It is often necessary to access the timestamp assigned to records within user 
functions.  At the moment this is only possible from {{RichFunction}}. 
Implementing a {{RichFunction}} just to access the timestamp is burdensome, so 
most jobs carry a duplicate of the timestamp within the record.

It would be useful if {{DataStream}} provided transformation methods that 
accepted user functions that could be passed the record's timestamp as an 
additional argument, similar to how there are two variants of {{flatMap}}, one 
with an extra parameter that gives the user function access to the output 
{{Collector}}.

Along similar lines, it may be useful to have variants that pass the record's 
key as an additional parameter.
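
As a rough sketch of what such a variant could look like, everything below is 
hypothetical; no such interface or timestamp-aware {{map}} overload exists in 
the DataStream API today.

{code:java}
import java.io.Serializable;

// Hypothetical functional interface for a timestamp-aware map, analogous to the
// flatMap variant that exposes the Collector; not part of Flink today.
@FunctionalInterface
public interface TimestampedMapFunction<T, O> extends Serializable {
    O map(T value, long timestamp) throws Exception;
}

// Hypothetical usage on a DataStream<Event>:
//   DataStream<Enriched> out = events.map((event, ts) -> new Enriched(event, ts));
{code}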

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9495) Implement ResourceManager for Kubernetes

2018-06-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9495:
-

 Summary: Implement ResourceManager for Kubernetes
 Key: FLINK-9495
 URL: https://issues.apache.org/jira/browse/FLINK-9495
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager
Affects Versions: 1.5.0
Reporter: Elias Levy


I noticed there is no issue for developing a Kubernetes-specific 
ResourceManager under FLIP-6, so I am creating this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9450) Job hangs if S3 access is denied during checkpoints

2018-05-26 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9450:
-

 Summary: Job hangs if S3 access is denied during checkpoints
 Key: FLINK-9450
 URL: https://issues.apache.org/jira/browse/FLINK-9450
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Elias Levy


We have a streaming job that consumes from and writes to Kafka.  The job is 
configured to checkpoint to S3.  If we deny access to S3 by using iptables on 
the TM host to deny all outgoing connections to ports 80 and 443, whether using 
DROP or REJECT, and whether using REJECT with --reject-with tcp-reset or 
--reject-with icmp-port-unreachable, the job soon stops publishing to Kafka.

This happens whether the Kafka sources have {{setCommitOffsetsOnCheckpoints}} 
set to true or false.

The system is configured to use Presto for the S3 file system.  The job has a 
small amount of state, so it is configured to use {{FsStateBackend}} with 
asynchronous snapshots.

If the iptables rules are removed, the job continues to function.

I would expect the job to either fail or continue running if a checkpoint fails.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9440) Allow cancelation and reset of timers

2018-05-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9440:
-

 Summary: Allow cancelation and reset of timers
 Key: FLINK-9440
 URL: https://issues.apache.org/jira/browse/FLINK-9440
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.2
Reporter: Elias Levy


Currently the {{TimerService}} allows one to register timers, but it is not 
possible to delete a timer or to reset a timer to a new value.  If one wishes 
to reset a timer, one must also handle the previously inserted timer callbacks 
and ignore them.

It would be useful if the API allowed one to remove and reset timers.
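
For illustration, the workaround described above can be sketched with a 
{{ProcessFunction}} that remembers only the most recently registered deadline 
in state and ignores callbacks for superseded timers; the class name, String 
element type, and 60 second delay are made up for the example, and the function 
must run on a keyed stream.

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ResettableTimerFunction extends ProcessFunction<String, String> {

    private transient ValueState<Long> activeDeadline;

    @Override
    public void open(Configuration parameters) {
        activeDeadline = getRuntimeContext().getState(
                new ValueStateDescriptor<>("active-deadline", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long deadline = ctx.timerService().currentProcessingTime() + 60_000L;
        activeDeadline.update(deadline);                          // "reset" the timer
        ctx.timerService().registerProcessingTimeTimer(deadline); // older timers still fire
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long deadline = activeDeadline.value();
        if (deadline == null || timestamp != deadline) {
            return; // callback from a superseded registration; ignore it
        }
        activeDeadline.clear();
        out.collect("timer fired at " + timestamp);
    }
}
{code}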



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9403) Documentation continues to refer to removed methods

2018-05-20 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9403:
-

 Summary: Documentation continues to refer to removed methods
 Key: FLINK-9403
 URL: https://issues.apache.org/jira/browse/FLINK-9403
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.6.0
Reporter: Elias Levy


{{org.apache.flink.api.common.ExecutionConfig}} no longer has the 
{{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} 
methods.  They were removed in [this 
commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e].
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9272) DataDog API "counter" metric type is deprecated

2018-04-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9272:
-

 Summary: DataDog API "counter" metric type is deprecated 
 Key: FLINK-9272
 URL: https://issues.apache.org/jira/browse/FLINK-9272
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.4.2
Reporter: Elias Levy


It appears to have been replaced by the "count" metric type.

https://docs.datadoghq.com/developers/metrics/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8886) Job isolation via scheduling in shared cluster

2018-03-06 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8886:
-

 Summary: Job isolation via scheduling in shared cluster
 Key: FLINK-8886
 URL: https://issues.apache.org/jira/browse/FLINK-8886
 Project: Flink
  Issue Type: Improvement
  Components: Scheduler
Affects Versions: 1.5.0
Reporter: Elias Levy


Flink's TaskManager executes tasks from different jobs within the same JVM as 
threads.  We prefer to isolate different jobs in their own JVMs.  Thus, we must 
use different TMs for different jobs.  As currently the scheduler will allocate 
task slots within a TM to tasks from different jobs, that means we must stand 
up one cluster per job.  This is wasteful, as it requires at least two 
JobManagers per cluster for high-availability, and the JMs have low utilization.

Additionally, different jobs may require different resources.  Some jobs are 
compute heavy.  Some are IO heavy (lots of state in RocksDB).  At the moment 
the scheduler treats all TMs as equivalent, except possibly in their number 
of available task slots.  Thus, one is required to stand up multiple clusters 
if there is a need for different types of TMs.

 

It would be useful if one could specify requirements on jobs, such that they 
are only scheduled on a subset of TMs.  Properly configured, that would permit 
isolation of jobs in a shared cluster and scheduling of jobs with specific 
resource needs.

 

One possible implementation is to specify a set of tags in the TM config file, 
which the TMs would use when registering with the JM, and another set of tags 
configured within the job or supplied when submitting the job.  The scheduler 
could then match the tags in the job with the tags in the TMs.  In a 
restrictive mode the scheduler would assign a job task to a TM only if all tags 
match.  In a relaxed mode the scheduler could assign a job task to a TM if 
there is a partial match, while giving preference to a more accurate match.

 

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8844) Export job jar file name or job version property via REST API

2018-03-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8844:
-

 Summary: Export job jar file name or job version property via REST 
API
 Key: FLINK-8844
 URL: https://issues.apache.org/jira/browse/FLINK-8844
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.4.3
Reporter: Elias Levy


To aid automated deployment of jobs, it would be useful if the REST API exposed 
either a running job's jar filename or a version property the job could set, 
similar to how it sets the job name.

As it is now there is no standard mechanism to determine what version of a job 
is running in a cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8752) ClassNotFoundException when using the user code class loader

2018-02-22 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8752:
-

 Summary: ClassNotFoundException when using the user code class 
loader
 Key: FLINK-8752
 URL: https://issues.apache.org/jira/browse/FLINK-8752
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.4.1
Reporter: Elias Levy


Attempting to submit a job results in the job failing while it is being started 
in the JMs with a ClassNotFoundException error: 
{code:java}
java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Unknown Source)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
at 
org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
{code}

If I drop the job's jar into the lib folder on the JM and set the JM's 
classloader.resolve-order config to parent-first, the job starts successfully.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8751) Canceling a job results in a InterruptedException in the JM

2018-02-22 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8751:
-

 Summary: Canceling a job results in a InterruptedException in the 
JM
 Key: FLINK-8751
 URL: https://issues.apache.org/jira/browse/FLINK-8751
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.4.1
Reporter: Elias Levy


Canceling a job results in the following exception reported by the JM: 
{code:java}
ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Could not 
shut down timer service java.lang.InterruptedException 
  at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown
 Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown 
Source) 
  at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
 
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) 
  at java.lang.Thread.run(Unknown Source){code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8358) Hostname used by DataDog metric reporter is not configurable

2018-01-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8358:
-

 Summary: Hostname used by DataDog metric reporter is not 
configurable
 Key: FLINK-8358
 URL: https://issues.apache.org/jira/browse/FLINK-8358
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.4.0
Reporter: Elias Levy


The hostname used by the DataDog metric reporter to report metrics is not 
configurable.  This can be problematic if the hostname that Flink uses is 
different from the hostname used by the system's DataDog agent.

For instance, in our environment we use Chef, and using the DataDog Chef 
Handler, certain metadata such as host roles is associated with the hostname in 
the DataDog service.  The hostname used to submit this metadata is the name we 
have given the host.  But as Flink picks up the default name given by EC2 to 
the instance, metrics submitted by Flink to DataDog using that hostname are not 
associated with the tags derived from Chef.

In the Job Manager we can avoid this issue by explicitly setting the config 
{{jobmanager.rpc.address}} to the hostname we desire.  I attempted to do the 
same on the Task Manager by setting the {{taskmanager.hostname}} config, but 
DataDog does not seem to pick up that value.

Digging through the code it seems the DD metric reporter gets the hostname from 
the {{TaskManagerMetricGroup}} host variable, which seems to be set from 
{{taskManagerLocation.getHostname}}.  That in turn seems to be set by calling 
{{this.inetAddress.getCanonicalHostName()}}, which merely performs a reverse 
lookup on the IP address, and then calling {{NetUtils.getHostnameFromFQDN}} on 
the result.  The latter is further problematic because its result is a 
non-fully-qualified hostname.

More generally, there seems to be a need to specify the hostname of a JM or TM 
node that can be reused across Flink components.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8352) Flink UI Reports No Error on Job Submission Failures

2018-01-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8352:
-

 Summary: Flink UI Reports No Error on Job Submission Failures
 Key: FLINK-8352
 URL: https://issues.apache.org/jira/browse/FLINK-8352
 Project: Flink
  Issue Type: Bug
  Components: Web Client
Affects Versions: 1.4.0
Reporter: Elias Levy


If you submit a job jar via the web UI and it raises an exception when started, 
the UI will report no error and will continue to show the animated image that 
makes it seem as if it is working.  In addition, no error is printed in the 
logs, unless the level is increased to at least DEBUG:

{noformat}
@40005a4c399202b87ebc DEBUG 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while 
handling request.
@40005a4c399202b8868c java.util.concurrent.CompletionException: 
org.apache.flink.client.program.ProgramInvocationException: The program caused 
an error: 
@40005a4c399202b88a74   at 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:68)
@40005a4c399202b88e5c   at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
@40005a4c399202b8e44c   at 
java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
@40005a4c399202b8e44c   at java.util.concurrent.FutureTask.run(Unknown 
Source)
@40005a4c399202b8e834   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown
 Source)
@40005a4c399202b8e834   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source)
@40005a4c399202b8f3ec   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
@40005a4c399202b8f7d4   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
@40005a4c399202b8f7d4   at java.lang.Thread.run(Unknown Source)
@40005a4c399202b8fbbc Caused by: 
org.apache.flink.client.program.ProgramInvocationException: The program caused 
an error: 
@40005a4c399202b90b5c   at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
@40005a4c399202b90f44   at 
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
@40005a4c399202b90f44   at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)
@40005a4c399202b91afc   at 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:57)
@40005a4c399202b91afc   ... 8 more
@40005a4c399202b91ee4 Caused by: java.lang.ExceptionInInitializerError
@40005a4c399202b91ee4   at 
com.cisco.sbg.amp.flink.ioc_engine.IocEngine.main(IocEngine.scala)
@40005a4c399202b922cc   at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
@40005a4c399202b92a9c   at 
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
@40005a4c399202b92a9c   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
@40005a4c399202b92e84   at java.lang.reflect.Method.invoke(Unknown 
Source)
@40005a4c399202b92e84   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
@40005a4c399202b9326c   at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
@40005a4c399202b93a3c   at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
@40005a4c399202b949dc   ... 11 more
@40005a4c399202b949dc Caused by: java.io.FileNotFoundException: 
/data/jenkins/jobs/XXX/workspace/target/scala-2.11/scoverage-data/scoverage.measurements.55
 (No such file or directory)
@40005a4c399202b951ac   at java.io.FileOutputStream.open0(Native Method)
@40005a4c399202b951ac   at java.io.FileOutputStream.open(Unknown Source)
@40005a4c399202b9597c   at java.io.FileOutputStream.<init>(Unknown Source)
@40005a4c399202b9597c   at java.io.FileWriter.<init>(Unknown Source)
@40005a4c399202b95d64   at 
scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
@40005a4c399202b95d64   at 
scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
@40005a4c399202b9614c   at 
scala.collection.concurrent.TrieMap.getOrElseUpdate(TrieMap.scala:901)
@40005a4c399202b9614c   at scoverage.Invoker$.invoked(Invoker.scala:42)
@40005a4c399202b9691c   at com.XXX$.<init>(IocEngine.scala:28)
@40005a4c399202b9691c   at com.XXX$.<clinit>(IocEngine.scala)
{noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8311) Flink needs documentation for network access control

2017-12-22 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8311:
-

 Summary: Flink needs documentation for network access control
 Key: FLINK-8311
 URL: https://issues.apache.org/jira/browse/FLINK-8311
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.4.0
Reporter: Elias Levy


There is a need for better documentation on what connects to what over which 
ports in a Flink cluster to allow users to configure network access control 
rules.

E.g. I was under the impression that in a ZK HA configuration the Job Managers 
were essentially independent and only coordinated via ZK.  But starting 
multiple JMs in HA with the JM RPC port blocked between JMs shows that the 
second JM's Akka subsystem is trying to connect to the leading JM:

INFO  akka.remote.transport.ProtocolStateActor  - No 
response from remote for outbound association. Associate timed out after [2 
ms].
WARN  akka.remote.ReliableDeliverySupervisor- 
Association with remote system [akka.tcp://flink@10.210.210.127:6123] has 
failed, address is now gated for [5000] ms. Reason: [Association failed with 
[akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for 
outbound association. Associate timed out after [2 ms].]
WARN  akka.remote.transport.netty.NettyTransport- Remote 
connection to [null] failed with 
org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: 
connection timed out: /10.210.210.127:6123



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7935) Metrics with user supplied scope variables

2017-10-26 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7935:
-

 Summary: Metrics with user supplied scope variables
 Key: FLINK-7935
 URL: https://issues.apache.org/jira/browse/FLINK-7935
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.3.2
Reporter: Elias Levy


We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
metrics.

Flink names and scopes metrics together, at least by default.  E.g. by default 
the system scope for operator metrics is 
{{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}.  The 
scope variables become part of the metric's full name.

In DD the metric would be named something generic, e.g. 
{{taskmanager.job.operator}}, and they would be distinguished by their tag 
values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.

Flink allows you to configure the format string for system scopes, so it is 
possible to set the operator scope format to {{taskmanager.job.operator}}.  We 
do this for all scopes:

{code}
metrics.scope.jm: jobmanager
metrics.scope.jm.job: jobmanager.job
metrics.scope.tm: taskmanager
metrics.scope.tm.job: taskmanager.job
metrics.scope.task: taskmanager.job.task
metrics.scope.operator: taskmanager.job.operator
{code}

This seems to work.  The DataDog Flink metric's plugin submits all scope 
variables as tags, even if they are not used within the scope format.  And it 
appears internally this does not lead to metrics conflicting with each other.

We would like to extend this to user-defined metrics, but it is not currently 
possible to define variables/scopes when adding a metric group or metric with 
the user API.  Being able to do so would let us have, in DD, a single metric 
with a tag taking many different values, rather than hundreds of distinct 
metrics for the one value we want to measure across different event types.
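
As an illustration, here is a minimal sketch (Scala, user metrics API) of what we do today versus what we would like to do.  The {{addGroup(key, value)}} overload in the commented-out portion is hypothetical and only shows the intent:

{code}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

class CountEvents extends RichMapFunction[String, String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    // today: the event type must be baked into the group name, which in DD
    // shows up as a separate metric per event type
    counter = getRuntimeContext.getMetricGroup
      .addGroup("event_type_login")
      .counter("events")

    // desired (hypothetical API): a user-supplied scope variable the DataDog
    // reporter could submit as a tag, e.g. event_type:login
    // counter = getRuntimeContext.getMetricGroup
    //   .addGroup("event_type", "login")
    //   .counter("events")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}
{code}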




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7722) MiniCluster does not appear to honor Log4j settings

2017-09-27 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7722:
-

 Summary: MiniCluster does not appear to honor Log4j settings
 Key: FLINK-7722
 URL: https://issues.apache.org/jira/browse/FLINK-7722
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


When executing a job from the command line for testing, it will output logs 
like:

{noformat}
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] 
with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014.
09/27/2017 13:15:13 Job execution switched to status RUNNING.
09/27/2017 13:15:13 Source: Custom File Source(1/1) switched to SCHEDULED 
09/27/2017 13:15:13 Source: Collect
{noformat}

It will do so even if the log4j.properties file contains:

{code}
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.flink=ERROR
log4j.logger.akka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.hadoop=ERROR
log4j.logger.org.apache.zookeeper=ERROR
{code}

It seems that the MiniCluster does not honor Log4j settings, or at least that 
is my guess.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7687) Clarify the master and slaves files are not necessary unless using the cluster start/stop scripts

2017-09-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7687:
-

 Summary: Clarify the master and slaves files are not necessary 
unless using the cluster start/stop scripts
 Key: FLINK-7687
 URL: https://issues.apache.org/jira/browse/FLINK-7687
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


It would be helpful if the documentation were clearer on the fact that the 
master/slaves config files are not needed when configured in high-availability 
mode unless you are using the provided scripts to start and shut down the 
cluster over SSH.  If you are using some other mechanism to manage Flink 
instances (configuration management tools such as Chef or Ansible, or container 
management frameworks like Docker Compose or Kubernetes), these files are 
unnecessary.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7646) Restart failed jobs with configurable parallelism range

2017-09-19 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7646:
-

 Summary: Restart failed jobs with configurable parallelism range
 Key: FLINK-7646
 URL: https://issues.apache.org/jira/browse/FLINK-7646
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.3.2
Reporter: Elias Levy


Currently, if a TaskManager fails, the whole job is terminated and then, 
depending on the restart policy, a restart may be attempted.  If the 
failed TaskManager has not been replaced, and there are no spare task slots in 
the cluster, the job will fail to be restarted.

There are situations where restoring or adding a new TaskManager may take a 
while.  For instance, in AWS an Auto Scaling Group can only be used to manage a 
group of instances in a single availability zone.  If you have a cluster of 
TaskManagers that spans AZs, managed by one ASG per AZ, and an AZ goes dark, 
the other ASGs won't scale automatically to make up for the lost TaskManagers.  
To resolve the situation the healthy ASGs will need to be modified manually or 
by systems external to AWS.

With that in mind, it would be useful if you could specify a range for the 
parallelism parameter.  Under normal circumstances the job would execute with 
the maximum parallelism of the range.  But if TaskManagers were lost and not 
replaced after some time, the job would accept being executed with some lower 
parallelism within the range.
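
A purely hypothetical sketch of how this could surface in the API (none of these names exist today):

{code}
// hypothetical API, for illustration only
case class ParallelismRange(min: Int, max: Int)

// env.setRestartParallelismRange(ParallelismRange(min = 4, max = 16))
//
// Normal execution would use max (16).  If, after the restart delay, only enough
// task slots remain for a smaller value, the job would be restarted at the
// largest parallelism within [min, max] that fits the surviving TaskManagers.
{code}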

I understand that this may not be feasible with checkpoints, as savepoints are 
supposed to be the mechanism used to change parallelism of a stateful job.  
Therefore, this proposal may need to wait until the implementation of the 
periodic savepoint feature (FLINK-4511).

This feature would aid the availability of Flink jobs.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7641) Loss of JobManager in HA mode should not cause jobs to fail

2017-09-18 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7641:
-

 Summary: Loss of JobManager in HA mode should not cause jobs to 
fail
 Key: FLINK-7641
 URL: https://issues.apache.org/jira/browse/FLINK-7641
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 1.3.2
Reporter: Elias Levy


Currently, if a standalone cluster of JobManagers is configured in 
high-availability mode and the master JM is lost, the jobs executing in the 
cluster will be restarted.  This is less than ideal.  It would be best if the 
jobs could continue to execute without restarting while one of the spare JMs 
becomes the new master, or, in the worst case, if the jobs were paused while the 
JM election takes place.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7640) Dashboard should display information about JobManager cluster in HA mode

2017-09-18 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7640:
-

 Summary: Dashboard should display information about JobManager 
cluster in HA mode
 Key: FLINK-7640
 URL: https://issues.apache.org/jira/browse/FLINK-7640
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Affects Versions: 1.3.2
Reporter: Elias Levy


Currently the dashboard provides no information about the status of a cluster 
of JobManagers configured in high-availability mode.  

The dashboard should display the status and membership of a JM cluster in the 
Overview and Job Manager sections.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7634) Add option to create a savepoint while canceling a job in the dashboard

2017-09-16 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7634:
-

 Summary: Add option to create a savepoint while canceling a job in 
the dashboard 
 Key: FLINK-7634
 URL: https://issues.apache.org/jira/browse/FLINK-7634
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


Currently there appears to be no way to trigger a savepoint in the dashboard, 
to cancel a job while taking a savepoint, to list savepoints, or to list 
external checkpoints.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7547) o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable

2017-08-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7547:
-

 Summary: o.a.f.s.api.scala.async.AsyncFunction is not declared 
Serializable
 Key: FLINK-7547
 URL: https://issues.apache.org/jira/browse/FLINK-7547
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


{{org.apache.flink.streaming.api.scala.async.AsyncFunction}} is not declared 
{{Serializable}}, whereas 
{{org.apache.flink.streaming.api.functions.async.AsyncFunction}} is.  This 
leads to the job not starting, as the async function can't be serialized 
during initialization.
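
A minimal sketch of the workaround, assuming the 1.3 Scala async API (the collector type and method names may differ in other versions): explicitly mix in {{Serializable}}, since the trait itself does not extend it.

{code}
import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class LengthLookup extends AsyncFunction[String, (String, Int)] with Serializable {
  override def asyncInvoke(input: String, collector: AsyncCollector[(String, Int)]): Unit = {
    Future {
      // stand-in for a real asynchronous client call
      collector.collect(Iterable((input, input.length)))
    }
  }
}
{code}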



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7364) Log exceptions from user code in streaming jobs

2017-08-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7364:
-

 Summary: Log exceptions from user code in streaming jobs
 Key: FLINK-7364
 URL: https://issues.apache.org/jira/browse/FLINK-7364
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.1
Reporter: Elias Levy


Currently, if an exception arises in user-supplied code within an operator in a 
streaming job, Flink terminates the job, but it fails to record the reason for 
the termination.  The logs do not record that there was an exception at all, 
much less the type of exception and where it occurred.  This makes it difficult 
to debug jobs without adding exception-recording code to every user-supplied 
operator.
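
A sketch of the sort of wrapper we currently have to add by hand to get the failure into the logs (the class here is our own, not a Flink API):

{code}
import org.apache.flink.api.common.functions.MapFunction
import org.slf4j.LoggerFactory

class LoggingMap[I, O](f: I => O) extends MapFunction[I, O] {
  @transient private lazy val log = LoggerFactory.getLogger(classOf[LoggingMap[_, _]])

  override def map(value: I): O =
    try f(value)
    catch {
      case e: Exception =>
        // record the failure before Flink tears the job down, then rethrow
        log.error(s"User function failed on element $value", e)
        throw e
    }
}
{code}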



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7286) Flink Dashboard fails to display bytes/records received by sources

2017-07-27 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7286:
-

 Summary: Flink Dashboard fails to display bytes/records received 
by sources
 Key: FLINK-7286
 URL: https://issues.apache.org/jira/browse/FLINK-7286
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.3.1
Reporter: Elias Levy


It appears Flink can't measure the number of bytes read or records produced by 
a source (e.g. Kafka source). This is particularly problematic for simple jobs 
where the job pipeline is chained, and in which there are no measurements 
between operators. Thus, in the UI it appears that the job is not consuming any 
data.
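
A possible workaround sketch (Scala DataStream API): break the chain right after the source so the source's outgoing edge is measured and the dashboard shows some traffic.  {{fromElements}} below is just a stand-in for the real Kafka source.

{code}
import org.apache.flink.streaming.api.scala._

object ChainBreakExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements("a", "b", "c")   // stand-in for the Kafka source
      .map(x => x)                   // no-op pass-through operator
      .disableChaining()             // forces a measurable edge after the source
      .print()
    env.execute("chain-break example")
  }
}
{code}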



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7178) Datadog Metric Reporter Jar is Lacking Dependencies

2017-07-13 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7178:
-

 Summary: Datadog Metric Reporter Jar is Lacking Dependencies
 Key: FLINK-7178
 URL: https://issues.apache.org/jira/browse/FLINK-7178
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.3.1
Reporter: Elias Levy


The Datadog metric reporter has dependencies on {{com.squareup.okhttp3}} and 
{{com.squareup.okio}}.  It appears there was an attempt to use the Maven Shade 
plug-in to relocate these classes to {{org.apache.flink.shaded.okhttp3}} and 
{{org.apache.flink.shaded.okio}} during packaging.  Alas, the shaded classes 
are not included in the {{flink-metrics-datadog-1.3.1.jar}} released to Maven 
Central.  Using the jar results in an error when the JobManager or TaskManager 
starts up because of the missing dependencies.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-6472) BoundedOutOfOrdernessTimestampExtractor does not bound out of orderliness

2017-05-06 Thread Elias Levy (JIRA)
Elias Levy created FLINK-6472:
-

 Summary: BoundedOutOfOrdernessTimestampExtractor does not bound 
out of orderliness
 Key: FLINK-6472
 URL: https://issues.apache.org/jira/browse/FLINK-6472
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: Elias Levy


{{BoundedOutOfOrdernessTimestampExtractor}} attempts to emit watermarks that 
lag behind the largest observed timestamp by a configurable time delta.  It 
fails to do so in some circumstances.

The class extends {{AssignerWithPeriodicWatermarks}}, which generates 
watermarks at periodic intervals.  The timer for these intervals is a 
processing-time timer.

In circumstances where there is a rush of events (restarting Flink, unpausing 
an upstream producer, loading events from a file, etc.), many events with 
timestamps much larger than what the configured bound would normally allow will 
be sent downstream without a watermark.  This can have negative effects 
downstream, as operators may buffer the events while waiting for a watermark to 
process them, leading to memory growth and possible out-of-memory conditions.

It is probably best to have a bounded out-of-orderness extractor based on the 
punctuated timestamp assigner, so we can ensure that watermarks are generated 
in a timely fashion in event time, with the addition of a processing-time timer 
to generate a watermark if there has been a lull in events, thus also bounding 
the delay of generating a watermark in processing time.
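
A minimal sketch of the event-time half of that idea, using the punctuated assigner ({{extractTs}} is a placeholder for the user's timestamp accessor; in practice one would also throttle how often a watermark is emitted):

{code}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class EagerBoundedOutOfOrdernessAssigner[T](maxOutOfOrderness: Long, extractTs: T => Long)
    extends AssignerWithPunctuatedWatermarks[T] {

  // start low enough that the first watermark cannot underflow
  private var currentMax = Long.MinValue + maxOutOfOrderness

  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
    val ts = extractTs(element)
    currentMax = math.max(currentMax, ts)
    ts
  }

  // called after extractTimestamp for each element, so the watermark advances
  // with the data, in event time, rather than on a processing-time interval
  override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: Long): Watermark =
    new Watermark(currentMax - maxOutOfOrderness)
}
{code}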



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6420) Cleaner CEP API to specify conditions between events

2017-04-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-6420:
-

 Summary: Cleaner CEP API to specify conditions between events
 Key: FLINK-6420
 URL: https://issues.apache.org/jira/browse/FLINK-6420
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Affects Versions: 1.3.0
Reporter: Elias Levy
Priority: Minor


Flink 1.3 will introduce so-called iterative conditions, which allow the 
predicate to look up events already matched by conditions in the pattern.  This 
permits specifying conditions between matched events, similar to a conditional 
join between tables in SQL.  Alas, the API could be simplified to specify such 
conditions more declaratively.

At the moment you have to do something like
{code}
Pattern
  .begin[Foo]("first")
    .where( first => first.baz == 1 )
  .followedBy("next")
    .where({ (next, ctx) =>
      val first = ctx.getEventsForPattern("first").next
      first.bar == next.bar && next.boo == "x"
    })
{code}
which is not very clean.  It would be friendlier if you could do something like:
{code}
Pattern
  .begin[Foo]("first")
    .where( first => first.baz == 1 )
  .followedBy("next")
    .relatedTo("first", { (first, next) => first.bar == next.bar })
    .where( next => next.boo == "x" )
{code}
Something along these lines would work well when the condition being tested 
against matches a single event (single quantifier).  

If the condition being tested can accept multiple events (e.g. times 
quantifier), two other methods could be used, {{relatedToAny}} and 
{{relatedToAll}}, each of which takes a predicate function.  In both cases each 
previously accepted element of the requested condition is evaluated against the 
predicate.  In the former case, if any evaluation returns true the condition is 
satisfied.  In the latter case, all evaluations must return true for the 
condition to be satisfied.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6419) Better support for CEP quantified conditions in PatternSelect.select

2017-04-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-6419:
-

 Summary: Better support for CEP quantified conditions in 
PatternSelect.select
 Key: FLINK-6419
 URL: https://issues.apache.org/jira/browse/FLINK-6419
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Affects Versions: 1.3.0
Reporter: Elias Levy
Priority: Minor


Flink 1.3 introduces to the API quantifier methods, which allow one to 
declaratively specify how many times a condition must be matched before there 
is a state change.

The pre-existing {{PatternSelect.select}} method does not account for this 
change very well.  The selection function passed to {{select}} receives a 
{{Map[String,T]}} as an argument that permits the function to look up the 
matched events by the condition's name.  

To support the new functionality that permits a condition to match multiple 
elements, when a quantifier is greater than one, the matched events are stored 
in the map by appending the condition's name with an underscore and an index 
value.

While functional, this is less than ideal.  It would be best if conditions with 
a quantifier greater than one returned the matched events in an array 
accessible via the condition's name, without having to construct keys from the 
condition's name and an index and iterating over the map until no more are 
found.
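
For illustration, a helper that collects the matches under the current scheme versus what a cleaner API could return directly.  The exact {{name_index}} key format below is an assumption based on the description above:

{code}
object QuantifiedMatches {
  // today (assumed key scheme): probe "<name>", "<name>_2", "<name>_3", ...
  // until a key is missing
  def collectMatches[T](matched: Map[String, T], name: String): Seq[T] = {
    val indexed = Iterator.from(2)
      .map(i => matched.get(s"${name}_$i"))
      .takeWhile(_.isDefined)
      .map(_.get)
    matched.get(name).toSeq ++ indexed
  }
}

// proposed: the select function would instead receive the events already grouped,
// e.g. a Map[String, Iterable[T]], so matched("next") yields every "next" event
{code}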



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6243) Continuous Joins: True Sliding Window Joins

2017-04-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-6243:
-

 Summary: Continuous Joins:  True Sliding Window Joins
 Key: FLINK-6243
 URL: https://issues.apache.org/jira/browse/FLINK-6243
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 1.1.4
Reporter: Elias Levy


Flink defines sliding window joins as the join of elements of two streams that 
share a window of time, where the windows are defined by advancing them forward 
some amount of time that is less than the window time span.  More generally, 
such windows are just overlapping hopping windows. 

Other systems, such as Kafka Streams, support a different notion of sliding 
window joins.  In these systems, two elements of a stream are joined if the 
absolute time difference between them is less than or equal to the time window 
length.

This alternate notion of sliding window joins has some advantages in some 
applications over the current implementation.  

Elements to be joined may both fall within multiple overlapping sliding 
windows, leading them to be joined multiple times, when we only wish them to be 
joined once.

The implementation need not instantiate window objects to keep track of stream 
elements, which becomes problematic in the current implementation if the window 
size is very large and the slide is very small.

It allows for asymmetric time joins.  E.g. join if elements from stream A are 
no more than X time behind and Y time ahead of an element from stream B.

It is currently possible to implement a join with these semantics using 
{{CoProcessFunction}}, but the capability should be a first class feature, such 
as it is in Kafka Streams.
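
A much simplified sketch of such a join on top of {{CoProcessFunction}} (Scala): the streams are assumed to be connected and keyed, e.g. via something like streamA.connect(streamB).keyBy(...).process(new IntervalJoin(before, after)), and to carry event-time timestamps.  State clean-up timers, late data, and efficiency concerns are all omitted.

{code}
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// join a and b when timestamp(b) lies within [timestamp(a) - before, timestamp(a) + after]
class IntervalJoin[A, B](before: Long, after: Long) extends CoProcessFunction[A, B, (A, B)] {

  @transient private lazy val leftBuffer: MapState[java.lang.Long, List[A]] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor("left-buffer", classOf[java.lang.Long], classOf[List[A]]))

  @transient private lazy val rightBuffer: MapState[java.lang.Long, List[B]] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor("right-buffer", classOf[java.lang.Long], classOf[List[B]]))

  override def processElement1(a: A, ctx: CoProcessFunction[A, B, (A, B)]#Context,
                               out: Collector[(A, B)]): Unit = {
    val ts: Long = ctx.timestamp()  // assumes timestamps have been assigned upstream
    leftBuffer.put(ts, a :: Option(leftBuffer.get(ts)).getOrElse(Nil))
    for {
      entry <- rightBuffer.entries().asScala
      if entry.getKey >= ts - before && entry.getKey <= ts + after
      b <- entry.getValue
    } out.collect((a, b))
  }

  override def processElement2(b: B, ctx: CoProcessFunction[A, B, (A, B)]#Context,
                               out: Collector[(A, B)]): Unit = {
    val ts: Long = ctx.timestamp()
    rightBuffer.put(ts, b :: Option(rightBuffer.get(ts)).getOrElse(Nil))
    for {
      entry <- leftBuffer.entries().asScala
      if entry.getKey >= ts - after && entry.getKey <= ts + before
      a <- entry.getValue
    } out.collect((a, b))
  }
}
{code}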

To perform the join, elements of each stream must be buffered for at least the 
window time length.  To allow for large window sizes and high volume of 
elements, the state should, possibly optionally, be stored such that it can 
spill to disk (e.g. by using RocksDB).

The same stream may be joined multiple times in a complex topology.  As an 
optimization, it may be wise to reuse any element buffer among colocated join 
operators.  Otherwise, there may be write amplification and increased state that 
must be snapshotted.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6239) Sharing of State Across Operators

2017-04-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-6239:
-

 Summary: Sharing of State Across Operators
 Key: FLINK-6239
 URL: https://issues.apache.org/jira/browse/FLINK-6239
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.1.4
Reporter: Elias Levy


Currently state cannot be shared across operators.  On a keyed stream, the 
state is implicitly keyed by the operator id, in addition to the stream key.  
This can make it more difficult and inefficient to implement complex 
topologies, where multiple operator may need to access the same state.  It 
would be value to be able to access keyed value and map stated across operators.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-4558) Add support for synchronizing streams

2016-09-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4558:
-

 Summary: Add support for synchronizing streams
 Key: FLINK-4558
 URL: https://issues.apache.org/jira/browse/FLINK-4558
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.1.0
Reporter: Elias Levy


As mentioned on the [mailing 
list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html],
 there are use cases that require synchronizing two streams via their timestamps, 
and where it is not practical to buffer all messages from one stream while 
waiting for the other to synchronize.  Flink should add functionality to enable 
such use cases.

This could be implemented by modifying TwoInputStreamOperator so that calls to 
processElement1 and processElement2 could return a value indicating that the 
element can't yet be processed, having the framework then pause processing for 
some time, potentially using exponential back off with a hard maximum, and then 
allowing the back pressure system to do its work and pause the stream.

Alternatively, an API could be added to explicitly pause/unpause a stream.

For ease of use, either of these mechanisms should be used to create a 
SynchronizedTwoInputStreamOperator that end users can utilize by passing a 
configurable time delta to use as a synchronization threshold.
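
None of the following exists in Flink; it is only a hypothetical sketch of the shape of the return-value-based contract described above:

{code}
// hypothetical, for illustration only
sealed trait ProcessResult
case object Accepted extends ProcessResult
case class RetryAfter(delayMillis: Long) extends ProcessResult

trait SynchronizingTwoInputOperator[IN1, IN2, OUT] {
  // returning RetryAfter tells the framework to pause this input for a while
  // (e.g. with exponential back off up to a hard maximum) and let back
  // pressure slow the faster stream
  def processElement1(element: IN1): ProcessResult
  def processElement2(element: IN2): ProcessResult
}
{code}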
 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4502) Cassandra connector documentation has misleading consistency guarantees

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4502:
-

 Summary: Cassandra connector documentation has misleading 
consistency guarantees
 Key: FLINK-4502
 URL: https://issues.apache.org/jira/browse/FLINK-4502
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.1.0
Reporter: Elias Levy


The Cassandra connector documentation states that  "enableWriteAheadLog() is an 
optional method, that allows exactly-once processing for non-deterministic 
algorithms."  This claim appears to be false.

From what I gather, the write-ahead log feature of the connector works as follows:
- The sink is replaced with a stateful operator that writes incoming messages 
to the state backend based on the checkpoint they belong to.
- When the operator is notified that a Flink checkpoint has been completed, it, 
for each checkpoint older than and including the committed one:
  * reads its messages from the state backend
  * writes them to Cassandra
  * records that it has committed them to Cassandra for the specific checkpoint 
and operator instance
  * and erases them from the state backend.

This process attempts to avoid resubmitting queries to Cassandra that would 
otherwise occur when recovering a job from a checkpoint and having messages 
replayed.

Alas, this does not guarantee exactly-once semantics at the sink.  The writes 
to Cassandra that occur when the operator is notified that a checkpoint is 
completed are not atomic, and they are potentially non-idempotent.  If the job 
dies while writing to Cassandra, or before committing the checkpoint via the 
committer, queries will be replayed when the job recovers.  Thus the 
documentation appears to be incorrect in stating this provides exactly-once 
semantics.

There also seems to be an issue in GenericWriteAheadSink's 
notifyOfCompletedCheckpoint which may result in incorrect output.  If 
sendValues returns false because a write failed, instead of bailing, it simply 
moves on to the next checkpoint to commit if there is one, keeping the previous 
one around to try again later.  But that can result in newer data being 
overwritten with older data when the previous checkpoint is retried.  Moreover, 
given that CassandraCommitter implements isCheckpointCommitted as checkpointID 
<= this.lastCommittedCheckpointID, when it goes back to retry the uncommitted 
older checkpoint it will consider it committed, even though some of its data 
may not have been written out, and that data will be discarded.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4501) Cassandra sink can lose messages

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4501:
-

 Summary: Cassandra sink can lose messages
 Key: FLINK-4501
 URL: https://issues.apache.org/jira/browse/FLINK-4501
 Project: Flink
  Issue Type: Bug
  Components: Cassandra Connector
Affects Versions: 1.1.0
Reporter: Elias Levy


The problem is the same as I pointed out with the Kafka producer sink 
(FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
both send data asynchronously to Cassandra and record whether an error occurs 
via a future callback.  But CassandraSinkBase does not implement Checkpointed, 
so it can't stop a checkpoint from completing even though there are Cassandra 
queries in flight from the checkpoint that may fail.  If they do fail, they 
would subsequently not be replayed when the job recovered, and would thus be 
lost.

In addition, 
CassandraSinkBase's close should check whether there is a pending exception and 
throw it, rather than silently closing.  It should also wait for any pending 
async queries to complete and check their status before closing.
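
A rough sketch of the idea (not the actual CassandraSinkBase code; the names are made up): count in-flight writes, capture asynchronous failures, and drain both before a checkpoint is allowed to complete and on close.

{code}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

class PendingWriteTracker {
  private val pending = new AtomicInteger(0)
  private val failure = new AtomicReference[Throwable](null)

  // call right before issuing an asynchronous write
  def beforeSend(): Unit = pending.incrementAndGet()

  // call from the driver's completion callback, with null on success
  def onComplete(error: Throwable): Unit = {
    if (error != null) failure.compareAndSet(null, error)
    pending.decrementAndGet()
  }

  // call from the checkpoint hook and from close(): block until every pending
  // write is acknowledged and surface any asynchronous failure
  def awaitPendingAndRethrow(): Unit = {
    while (pending.get() > 0) Thread.sleep(10)
    val t = failure.get()
    if (t != null) throw new RuntimeException("asynchronous Cassandra write failed", t)
  }
}
{code}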



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4500) Cassandra sink can lose messages

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4500:
-

 Summary: Cassandra sink can lose messages
 Key: FLINK-4500
 URL: https://issues.apache.org/jira/browse/FLINK-4500
 Project: Flink
  Issue Type: Bug
  Components: Cassandra Connector
Affects Versions: 1.1.0
Reporter: Elias Levy


The problem is the same as I pointed out with the Kafka producer sink 
(FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
both send data asynchronously to Cassandra and record whether an error occurs 
via a future callback.  But CassandraSinkBase does not implement Checkpointed, 
so it can't stop a checkpoint from completing even though there are Cassandra 
queries in flight from the checkpoint that may fail.  If they do fail, they 
would subsequently not be replayed when the job recovered, and would thus be 
lost.

In addition, 
CassandraSinkBase's close should check whether there is a pending exception and 
throw it, rather than silently closing.  It should also wait for any pending 
async queries to complete and check their status before closing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4498) Better Cassandra sink documentation

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4498:
-

 Summary: Better Cassandra sink documentation
 Key: FLINK-4498
 URL: https://issues.apache.org/jira/browse/FLINK-4498
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.1.0
Reporter: Elias Levy


The Cassandra sink documentation is somewhat muddled and could be improved.  
For instance, the fact that it only supports tuples and POJOs that use 
DataStax Mapper annotations is only mentioned in passing, and it is not clear 
that the reference to tuples only applies to Flink Java tuples and not Scala 
tuples.  

The documentation also does not mention that setQuery() is only necessary for 
tuple streams.  It would be good to have an example of a POJO stream with the 
DataStax annotations.

The explanation of the write ahead log could use some cleaning up to clarify 
when it is appropriate to use, ideally with an example.  Maybe this would be 
best as a blog post to expand on the type of non-deterministic streams this 
applies to.

It would also be useful to mention that tuple elements will be mapped to 
Cassandra columns using the Datastax Java driver's default encoders, which are 
somewhat limited (e.g. to write to a blob column the type in the tuple must be 
a java.nio.ByteBuffer and not just a byte[]).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4497:
-

 Summary: Add support for Scala tuples and case classes to 
Cassandra sink
 Key: FLINK-4497
 URL: https://issues.apache.org/jira/browse/FLINK-4497
 Project: Flink
  Issue Type: Improvement
  Components: Cassandra Connector
Affects Versions: 1.1.0
Reporter: Elias Levy


The new Cassandra sink only supports streams of Flink Java tuples and Java 
POJOs that have been annotated for use by Datastax Mapper.  The sink should be 
extended to support Scala types and case classes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4326) Flink start-up scripts should optionally start services on the foreground

2016-08-05 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4326:
-

 Summary: Flink start-up scripts should optionally start services 
on the foreground
 Key: FLINK-4326
 URL: https://issues.apache.org/jira/browse/FLINK-4326
 Project: Flink
  Issue Type: Improvement
  Components: Startup Shell Scripts
Affects Versions: 1.0.3
Reporter: Elias Levy


This has previously been mentioned in the mailing list, but has not been 
addressed.  Flink start-up scripts start the job and task managers in the 
background.  This makes it difficult to integrate Flink with most process 
supervisory tools and init systems, including Docker.  One can get around this 
via hacking the scripts or manually starting the right classes via Java, but it 
is a brittle solution.

In addition to starting the daemons in the foreground, the start-up scripts 
should use exec instead of running the commands, so as to avoid forks.  Many 
supervisory tools assume the PID of the process to be monitored is that of the 
process it first executes, and fork chains make it difficult for the supervisor 
to figure out what process to monitor.  Specifically, jobmanager.sh and 
taskmanager.sh should exec flink-daemon.sh, and flink-daemon.sh should exec 
java.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4050) FlinkKafkaProducer API Refactor

2016-06-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4050:
-

 Summary: FlinkKafkaProducer API Refactor
 Key: FLINK-4050
 URL: https://issues.apache.org/jira/browse/FLINK-4050
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.0.3
Reporter: Elias Levy


The FlinkKafkaProducer API seems more difficult to use than it should be.  

The API requires you pass it a SerializationSchema or a 
KeyedSerializationSchema, but the Kafka producer already has a serialization 
API.  Requiring a serializer in the Flink API precludes the use of the Kafka 
serializers.  For instance, they preclude the use of the Confluent 
KafkaAvroSerializer class that makes use of the Confluent Schema Registry.  
Ideally, the serializer would be optional, so as to allow the Kafka producer 
serializers to handle the task.

In addition, the KeyedSerializationSchema conflates message key extraction with 
key serialization.  If the serializer were optional, to allow the Kafka 
producer serializers to take over, you'd still need to extract a key from the 
message.

And given that the key may not be part of the message you want to write to 
Kafka, an upstream step may have to package the key with the message to make 
both available to the sink, for instance in a tuple. That means you also need 
to define a method to extract the message to write to Kafka from the element 
passed into the sink by Flink.  

In summary, extraction of the key and message from the element passed into the 
sink should be separated from serialization, and the serialization step should 
be optional.
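
A hypothetical sketch of that separation (all names invented here); serialization is left to the producer's configured key.serializer / value.serializer, e.g. Confluent's KafkaAvroSerializer:

{code}
// hypothetical interfaces, for illustration only
trait KafkaRecordExtractor[IN, K, V] extends Serializable {
  def topic(element: IN): String
  def key(element: IN): K       // may return null for unkeyed records
  def value(element: IN): V
}

// example: an upstream step packaged the key with the message in a tuple
class TupleExtractor(defaultTopic: String)
    extends KafkaRecordExtractor[(String, String), String, String] {
  override def topic(element: (String, String)): String = defaultTopic
  override def key(element: (String, String)): String = element._1
  override def value(element: (String, String)): String = element._2
}
{code}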



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-06 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4027:
-

 Summary: FlinkKafkaProducer09 sink can lose messages
 Key: FLINK-4027
 URL: https://issues.apache.org/jira/browse/FLINK-4027
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.0.3
Reporter: Elias Levy
Priority: Critical


The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.

The producer is publishing messages asynchronously.  A callback can record 
publishing errors, which will be raised when detected.  But as far as I can 
tell, there is no barrier to wait for async errors from the sink when 
checkpointing or to track the event time of acked messages to inform the 
checkpointing process.

If a checkpoint occurs while there are pending publish requests, and the 
requests return a failure after the checkpoint occurred, those messages will be 
lost as the checkpoint will consider them processed by the sink.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3984) Event time of stream transformations is undocumented

2016-05-27 Thread Elias Levy (JIRA)
Elias Levy created FLINK-3984:
-

 Summary: Event time of stream transformations is undocumented
 Key: FLINK-3984
 URL: https://issues.apache.org/jira/browse/FLINK-3984
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.0.3
Reporter: Elias Levy


The Event Time, Windowing, and DataStream Transformation documentation sections 
fail to state what event time, if any, the output of transformations has on a 
stream that is configured to use event time and that has timestamp assigners.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3710) ScalaDocs for org.apache.flink.streaming.scala are missing from the web site

2016-04-06 Thread Elias Levy (JIRA)
Elias Levy created FLINK-3710:
-

 Summary: ScalaDocs for org.apache.flink.streaming.scala are 
missing from the web site
 Key: FLINK-3710
 URL: https://issues.apache.org/jira/browse/FLINK-3710
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.0.1
Reporter: Elias Levy


The ScalaDocs only include docs for org.apache.flink.scala and sub-packages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3692) Develop a Kafka state backend

2016-04-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-3692:
-

 Summary: Develop a Kafka state backend
 Key: FLINK-3692
 URL: https://issues.apache.org/jira/browse/FLINK-3692
 Project: Flink
  Issue Type: New Feature
  Components: Core
Reporter: Elias Levy


Flink clusters usually consume from a Kafka cluster.  It would simplify 
operations if Flink could store its state checkpoints in Kafka.  This should be 
possible by using different topics to write to, partitioning appropriately, and using 
compacted topics.  This would avoid the need to run an HDFS cluster just to 
store Flink checkpoints.

For inspiration you may want to take a look at how Samza checkpoints a task's 
local state to a Kafka topic, and how the newer Kafka consumers checkpoint 
their offsets to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)