[GitHub] twalthr commented on a change in pull request #7587: [FLINK-11064] [table] Setup a new flink-table module structure

2019-01-29 Thread GitBox
twalthr commented on a change in pull request #7587: [FLINK-11064] [table] 
Setup a new flink-table module structure
URL: https://github.com/apache/flink/pull/7587#discussion_r252147236
 
 

 ##
 File path: flink-table/flink-table-dist/pom.xml
 ##
 @@ -0,0 +1,103 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-table
+   1.8-SNAPSHOT
+   ..
+   
+
+   flink-table-dist_${scala.binary.version}
 
 Review comment:
   Sounds good to me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252145959
 
 

 ##
 File path: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
 ##
 @@ -101,45 +101,8 @@ class OptionSerializer[A](val elemSerializer: 
TypeSerializer[A])
   // Serializer configuration snapshotting & compatibility
   // 

 
-  override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] 
= {
-new ScalaOptionSerializerConfigSnapshot[A](elemSerializer)
-  }
-
-  override def ensureCompatibility(
-  configSnapshot: TypeSerializerConfigSnapshot[_]): 
CompatibilityResult[Option[A]] = {
-
-configSnapshot match {
-  case optionSerializerConfigSnapshot
-  : ScalaOptionSerializerConfigSnapshot[A] =>
-ensureCompatibilityInternal(optionSerializerConfigSnapshot)
-  case legacyOptionSerializerConfigSnapshot
-  : OptionSerializer.OptionSerializerConfigSnapshot[A] =>
 
 Review comment:
   Removing this path will lead to problems when restoring from Flink 1.3, 
because this snapshot class was used back in Flink 1.3.
   
   OTOH, it should be possible to redirect `OptionSerializerConfigSnapshot`'s 
compatibility check to the new `ScalaOptionSerializerSnapshot`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252144982
 
 

 ##
 File path: 
flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
 ##
 @@ -31,6 +33,7 @@
  * allow calling different base class constructors from subclasses, while we 
need that
  * for the default empty constructor.
 
 Review comment:
   nit: Add `@deprecated` message.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"

2019-01-29 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755785#comment-16755785
 ] 

Dian Fu commented on FLINK-11447:
-

Thanks [~dawidwys] for the sharing your thoughts. If we choose option 3, we 
need to add the following new interfaces to Table (maybe more in the future if 
more join type is supported):
{code:java}
join(udtf: String)
join(udtf: String, joinPredicate: String)
join(udtf: String, joinPredicate: Expression)

leftOuterJoin(udtf: String)
leftOuterJoin(udtf: String, joinPredicate: String)
leftOuterJoin(udtf: String, joinPredicate: Expression){code}
Actually there are such kinds of interfaces in Table previously and these 
interfaces are removed in FLINK-6334 as to left UDTF use table.join(table) 
interface. Personally I prefer the table.join(table) as it's more straight 
forward in the semantics. Thoughts?

 

> Deprecate "new Table(TableEnvironment, String)"
> ---
>
> Key: FLINK-11447
> URL: https://issues.apache.org/jira/browse/FLINK-11447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>Priority: Major
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Once table is an interface we can easily replace the underlying 
> implementation at any time. The constructor call prevents us from converting 
> it into an interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version

2019-01-29 Thread GitBox
wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version
URL: https://github.com/apache/flink/pull/7599#issuecomment-458840957
 
 
   @StefanRRichter Please help to review this PR, thanks :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252140553
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 ##
 @@ -618,63 +619,87 @@ public boolean canEqual(Object obj) {
}
 
@Override
-   public TypeSerializerConfigSnapshot> 
snapshotConfiguration() {
-   return new 
UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
-   }
-
-   @Override
-   public CompatibilityResult> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-   if (configSnapshot instanceof 
UnionSerializerConfigSnapshot) {
-   List, 
TypeSerializerSnapshot>> previousSerializersAndConfigs =
-   ((UnionSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
-
-   CompatibilityResult 
oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-   previousSerializersAndConfigs.get(0).f0,
-   UnloadableDummyTypeSerializer.class,
-   previousSerializersAndConfigs.get(0).f1,
-   oneSerializer);
-
-   CompatibilityResult 
twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-   previousSerializersAndConfigs.get(1).f0,
-   UnloadableDummyTypeSerializer.class,
-   previousSerializersAndConfigs.get(1).f1,
-   twoSerializer);
-
-   if 
(!oneSerializerCompatResult.isRequiresMigration() && 
!twoSerializerCompatResult.isRequiresMigration()) {
-   return CompatibilityResult.compatible();
-   } else if 
(oneSerializerCompatResult.getConvertDeserializer() != null && 
twoSerializerCompatResult.getConvertDeserializer() != null) {
-   return 
CompatibilityResult.requiresMigration(
-   new UnionSerializer<>(
-   new 
TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()),
-   new 
TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer(;
-   }
-   }
-
-   return CompatibilityResult.requiresMigration();
+   public TypeSerializerSnapshot> 
snapshotConfiguration() {
+   return new UnionSerializerSnapshot<>(this);
}
}
 
/**
 * The {@link TypeSerializerConfigSnapshot} for the {@link 
UnionSerializer}.
 
 Review comment:
   nit: Add `@deprecated` message


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] Myasuka removed a comment on issue #7595: [FLINK-11363][tests] Port TaskManagerConfigurationTest to new code base

2019-01-29 Thread GitBox
Myasuka removed a comment on issue #7595: [FLINK-11363][tests] Port 
TaskManagerConfigurationTest to new code base
URL: https://github.com/apache/flink/pull/7595#issuecomment-458839395
 
 
   LGTM +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] igalshilman commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
igalshilman commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252140569
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
 Review comment:
   But still, I'd actually suggest adding that, to enforce good practices.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] Myasuka commented on issue #7595: [FLINK-11363][tests] Port TaskManagerConfigurationTest to new code base

2019-01-29 Thread GitBox
Myasuka commented on issue #7595: [FLINK-11363][tests] Port 
TaskManagerConfigurationTest to new code base
URL: https://github.com/apache/flink/pull/7595#issuecomment-458839395
 
 
   LGTM +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] igalshilman commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
igalshilman commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252139996
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ##
 @@ -126,15 +128,15 @@ private IS createState() throws Exception {
@SuppressWarnings("unchecked")
private IS createValueState() throws Exception {
ValueStateDescriptor> ttlDescriptor = new 
ValueStateDescriptor<>(
-   stateDesc.getName(), new 
TtlSerializer<>(stateDesc.getSerializer()));
+   stateDesc.getName(), new 
TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
 
 Review comment:
   I disagree, same reasons as above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] igalshilman commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
igalshilman commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252139840
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
-   public Serializer(TypeSerializer userValueSerializer) {
-   super(true, userValueSerializer, 
LongSerializer.INSTANCE);
+   public Serializer(TypeSerializer valueSerializer, 
TypeSerializer timestampSerializer) {
 
 Review comment:
   As far as I can tell this is not a user facing serializer, rather used 
internally by the `TtlSerializer`,
   and I think it is important to make it explicit that this is a composite 
serializer and these are the nested serializers that define it.
   
   As a way to reduce verbosity, we can add a static factory method with an 
explicit name.
   Feel free to decide for yourself :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252139206
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ##
 @@ -260,73 +262,80 @@ private void readObject(ObjectInputStream in) throws 
IOException, ClassNotFoundE
}
 
// 

-   // Serializer configuration snapshotting & compatibility
+   // Serializer configuration snapshoting & compatibility
// 

 
@Override
-   public RowSerializerConfigSnapshot snapshotConfiguration() {
-   return new RowSerializerConfigSnapshot(fieldSerializers);
+   public TypeSerializerSnapshot snapshotConfiguration() {
+   return new RowSerializerSnapshot(this);
}
 
-   @Override
-   public CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-   if (configSnapshot instanceof RowSerializerConfigSnapshot) {
-   List, 
TypeSerializerSnapshot>> previousFieldSerializersAndConfigs =
-   ((RowSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
-
-   if (previousFieldSerializersAndConfigs.size() == 
fieldSerializers.length) {
-   boolean requireMigration = false;
-   TypeSerializer[] convertDeserializers = new 
TypeSerializer[fieldSerializers.length];
-
-   CompatibilityResult compatResult;
-   int i = 0;
-   for (Tuple2, 
TypeSerializerSnapshot> f : previousFieldSerializersAndConfigs) {
-   compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-   f.f0,
-   
UnloadableDummyTypeSerializer.class,
-   f.f1,
-   fieldSerializers[i]);
-
-   if (compatResult.isRequiresMigration()) 
{
-   requireMigration = true;
-
-   if 
(compatResult.getConvertDeserializer() == null) {
-   // one of the field 
serializers cannot provide a fallback deserializer
-   return 
CompatibilityResult.requiresMigration();
-   } else {
-   convertDeserializers[i] 
=
-   new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
-   }
-   }
+   /**
+* A snapshot for {@link RowSerializer}.
+*/
 
 Review comment:
   nit: Add `@deprecated` message.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252134315
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
 ##
 @@ -233,45 +247,25 @@ public int hashCode() {
}
 
@Override
-   public NullableSerializerConfigSnapshot snapshotConfiguration() {
-   return new 
NullableSerializerConfigSnapshot<>(originalSerializer);
+   public TypeSerializerSnapshot snapshotConfiguration() {
+   return new NullableSerializerSnapshot<>(this);
}
 
-   @Override
-   public CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-   if (configSnapshot instanceof NullableSerializerConfigSnapshot) 
{
-   List, 
TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
-   ((NullableSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
-
-   CompatibilityResult compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-   previousKvSerializersAndConfigs.get(0).f0,
-   UnloadableDummyTypeSerializer.class,
-   previousKvSerializersAndConfigs.get(0).f1,
-   originalSerializer);
-
-   if (!compatResult.isRequiresMigration()) {
-   return CompatibilityResult.compatible();
-   } else if (compatResult.getConvertDeserializer() != 
null) {
-   return CompatibilityResult.requiresMigration(
-   new NullableSerializer<>(
-   new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), 
padNullValue()));
-   }
-   }
-
-   return CompatibilityResult.requiresMigration();
-   }
 
/**
 * Configuration snapshot for serializers of nullable types, containing 
the
 * configuration snapshot of its original serializer.
 
 Review comment:
   nit: Add `@deprecated` message.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] Myasuka commented on issue #7598: [FLINK-11333][protobuf] First-class serializer support for Protobuf types

2019-01-29 Thread GitBox
Myasuka commented on issue #7598: [FLINK-11333][protobuf] First-class 
serializer support for Protobuf types
URL: https://github.com/apache/flink/pull/7598#issuecomment-458831260
 
 
   CC @tzulitai  would you please review this part of code?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252132629
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
 Review comment:
   Ah scratch that, just realized that this is only a serializer used in tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252130677
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
 Review comment:
   This serializer class previously did not have a `serialVersionUID` defined.
   Need to explicitly set it to what it was before, because I guess the serial 
version UID would have changed when adding the new constructors.
   
   OTOH, there seems to be missing a migration test for this serializer, 
because that would have caught this problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252130677
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
 Review comment:
   This serializer class previously did not have a `serialVersionUID` defined.
   Need to explicitly set it to what it was before, to be on the safe side here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11460) Remove useless AcknowledgeStreamMockEnvironment

2019-01-29 Thread zhijiang (JIRA)
zhijiang created FLINK-11460:


 Summary: Remove useless AcknowledgeStreamMockEnvironment
 Key: FLINK-11460
 URL: https://issues.apache.org/jira/browse/FLINK-11460
 Project: Flink
  Issue Type: Task
  Components: Tests
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.8.0


This class is not used any more in the code path, so delete it directly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128586
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -92,7 +93,58 @@ protected Object getField(@Nonnull ValueWithTs value, 
int index) {
protected CompositeSerializer> 
createSerializerInstance(
PrecomputedParameters precomputed,
TypeSerializer... originalSerializers) {
-   return new Serializer(precomputed, 
(TypeSerializer) originalSerializers[0]);
+
+   return new Serializer(precomputed, 
originalSerializers[0], originalSerializers[1]);
+   }
+
+   TypeSerializer getValueSerializer() {
+   return fieldSerializers[0];
+   }
+
+   @SuppressWarnings("unchecked")
+   TypeSerializer getTimestampSerializer() {
+   TypeSerializer fieldSerializer = fieldSerializers[1];
+   return (TypeSerializer) fieldSerializer;
+   }
+
+   @Override
+   public TypeSerializerSnapshot> 
snapshotConfiguration() {
+   return new ValueWithTsSerializerSnapshot(this);
+   }
+   }
+
+   /**
+* A {@link TypeSerializerSnapshot} for ValueWithTs Serializer.
+*/
+   public static final class ValueWithTsSerializerSnapshot extends 
CompositeTypeSerializerSnapshot, Serializer> {
+
+   private final static int VERSION = 2;
+
+   @SuppressWarnings("unused")
+   public ValueWithTsSerializerSnapshot() {
+   super(Serializer.class);
+   }
+
+   ValueWithTsSerializerSnapshot(Serializer serializerInstance) {
+   super(serializerInstance);
+   }
+
+   @Override
+   protected int getCurrentOuterSnapshotVersion() {
+   return VERSION;
+   }
+
+   @Override
+   protected TypeSerializer[] getNestedSerializers(Serializer 
outerSerializer) {
+   return new 
TypeSerializer[]{outerSerializer.getValueSerializer(), 
outerSerializer.getTimestampSerializer()};
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   protected Serializer 
createOuterSerializerWithNestedSerializers(TypeSerializer[] 
nestedSerializers) {
+   TypeSerializer valueSerializer = 
nestedSerializers[0];
+   TypeSerializer timeSerializer = 
(TypeSerializer) nestedSerializers[1];
 
 Review comment:
   nit: `time` --> `timestamp` for naming consistency


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128501
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
-   public Serializer(TypeSerializer userValueSerializer) {
-   super(true, userValueSerializer, 
LongSerializer.INSTANCE);
+   public Serializer(TypeSerializer valueSerializer, 
TypeSerializer timestampSerializer) {
 
 Review comment:
   I don't think we need a public constructor that accepts the timestamp 
serializer.
   This should be a private constructor used only by the snapshot class.
   
   We should still have a public constructor that accepts the user value 
serializer, and by default just uses `LongSerializer.INSTANCE` as the new 
timestamp serializer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128802
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ##
 @@ -126,15 +128,15 @@ private IS createState() throws Exception {
@SuppressWarnings("unchecked")
private IS createValueState() throws Exception {
ValueStateDescriptor> ttlDescriptor = new 
ValueStateDescriptor<>(
-   stateDesc.getName(), new 
TtlSerializer<>(stateDesc.getSerializer()));
+   stateDesc.getName(), new 
TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
 
 Review comment:
   As mentioned above, having to pass in a `LongSerializer.INSTANCE` every time 
we're instantiating a TtlSerializer seems very redundant.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252127263
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ##
 @@ -302,6 +299,7 @@ static PrecomputedParameters precompute(
}
 
/** Snapshot field serializers of composite type. */
 
 Review comment:
   nit: Add `@deprecated` message and direct to new snapshot class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai edited a comment on issue #7566: [FLINK-11328][core] Migrate NFAStateSerializer to new TypeSerializer interface

2019-01-29 Thread GitBox
tzulitai edited a comment on issue #7566: [FLINK-11328][core] Migrate 
NFAStateSerializer to new TypeSerializer interface
URL: https://github.com/apache/flink/pull/7566#issuecomment-458822350
 
 
   Thanks @dawidwys! I looked at all 3 commits; the refactoring part is also 
nice to have.
   The actual migration changes in the 3rd commit also looks good!
   
   LGTM. I'll merge this together with other composite serializer changes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on issue #7566: [FLINK-11328][core] Migrate NFAStateSerializer to new TypeSerializer interface

2019-01-29 Thread GitBox
tzulitai commented on issue #7566: [FLINK-11328][core] Migrate 
NFAStateSerializer to new TypeSerializer interface
URL: https://github.com/apache/flink/pull/7566#issuecomment-458822350
 
 
   Thanks @dawidwys! I looked at all 3 commits; the refactoring part is also 
nice to have.
   The actual migration changes in the 3rd commit also looks good!
   
   LGTM, merging this ..


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11450) Port and move TableSource and TableSink to flink-table-common

2019-01-29 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755713#comment-16755713
 ] 

Dian Fu commented on FLINK-11450:
-

 [~twalthr] Sounds great. Will submit a PR ASAP.

> Port and move TableSource and TableSink to flink-table-common
> -
>
> Key: FLINK-11450
> URL: https://issues.apache.org/jira/browse/FLINK-11450
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>Priority: Major
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> This step only unblockes the TableEnvironment interfaces task. 
> Stream/BatchTableSouce/Sink remain in flink-table-api-java-bridge for now 
> until they have been reworked.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version

2019-01-29 Thread GitBox
wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version
URL: https://github.com/apache/flink/pull/7599#issuecomment-458821842
 
 
   **Testing results:**
   
   [INFO] ---
   [INFO]  T E S T S
   [INFO] ---
   [INFO] Running org.apache.flink.fs.osshadoop.HadoopOSSFileSystemITCase
   [INFO] Running 
org.apache.flink.fs.osshadoop.HadoopOSSFileSystemBehaviorITCase
   [INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.055 
s - in org.apache.flink.fs.osshadoop.HadoopOSSFileSystemITCase
   [WARNING] Tests run: 8, Failures: 0, Errors: 0, Skipped: 2, Time elapsed: 
6.429 s - in org.apache.flink.fs.osshadoop.HadoopOSSFileSystemBehaviorITCase
   [INFO]
   [INFO] Results:
   [INFO]
   [WARNING] Tests run: 11, Failures: 0, Errors: 0, Skipped: 2


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11442) Upgrade OSS SDK Version

2019-01-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11442:
---
Labels: pull-request-available  (was: )

> Upgrade OSS SDK Version
> ---
>
> Key: FLINK-11442
> URL: https://issues.apache.org/jira/browse/FLINK-11442
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.8.0
>Reporter: wujinhu
>Assignee: wujinhu
>Priority: Major
>  Labels: pull-request-available
>
> Upgrade oss sdk version to exclude org.json dependency.
> [INFO] +- com.aliyun.oss:aliyun-sdk-oss:jar:3.1.0:compile
> [INFO] | +- org.apache.httpcomponents:httpclient:jar:4.5.3:compile
> [INFO] | | \- org.apache.httpcomponents:httpcore:jar:4.4.6:compile
> [INFO] | +- org.jdom:jdom:jar:1.1:compile
> [INFO] | +- com.sun.jersey:jersey-json:jar:1.9:compile
> [INFO] | | +- org.codehaus.jettison:jettison:jar:1.1:compile
> [INFO] | | | \- stax:stax-api:jar:1.0.1:compile
> [INFO] | | +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile
> [INFO] | | | \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
> [INFO] | | | +- javax.xml.stream:stax-api:jar:1.0-2:compile
> [INFO] | | | \- javax.activation:activation:jar:1.1:compile
> [INFO] | | +- org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile
> [INFO] | | \- org.codehaus.jackson:jackson-xc:jar:1.8.3:compile
> [INFO] | +- com.aliyun:aliyun-java-sdk-core:jar:3.4.0:compile
> [INFO] | | \- org.json:json:jar:20170516:compile
> [INFO] | +- com.aliyun:aliyun-java-sdk-ram:jar:3.0.0:compile
> [INFO] | +- com.aliyun:aliyun-java-sdk-sts:jar:3.0.0:compile
> [INFO] | \- com.aliyun:aliyun-java-sdk-ecs:jar:4.2.0:compile
>  
> The license of org.json:json:jar:20170516:compile is JSON License, which 
> cannot be included.
> [https://www.apache.org/legal/resolved.html#json]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] wujinhu opened a new pull request #7599: [FLINK-11442] upgrade oss sdk version

2019-01-29 Thread GitBox
wujinhu opened a new pull request #7599: [FLINK-11442] upgrade oss sdk version
URL: https://github.com/apache/flink/pull/7599
 
 
   
   
   ## What is the purpose of the change
   
   Upgrade oss sdk version to exclude org.json dependency
   
   https://issues.apache.org/jira/browse/FLINK-11442
   
   ## Brief change log
   
   Upgrade oss sdk version from 3.1.0 to 3.4.1
   
   ## Verifying this change
   
   This change does not add tests
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**yes** / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] XuQianJin-Stars commented on issue #7450: [FLINK-11296][Table API & SQL] Support truncate in TableAPI

2019-01-29 Thread GitBox
XuQianJin-Stars commented on issue #7450: [FLINK-11296][Table API & SQL] 
Support truncate in TableAPI
URL: https://github.com/apache/flink/pull/7450#issuecomment-458809680
 
 
   hi @wuchong Thank you very much. I know you are very busy.
   best 
   qianjin


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] wuchong commented on issue #7450: [FLINK-11296][Table API & SQL] Support truncate in TableAPI

2019-01-29 Thread GitBox
wuchong commented on issue #7450: [FLINK-11296][Table API & SQL] Support 
truncate in TableAPI
URL: https://github.com/apache/flink/pull/7450#issuecomment-458806492
 
 
   Thank @XuQianJin-Stars  and sorry for the late reply. The PR looks good to 
me now. 
   
   Will merge this soon.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11333) First-class support for Protobuf types with evolvable schema

2019-01-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11333:
---
Labels: pull-request-available  (was: )

> First-class support for Protobuf types with evolvable schema
> 
>
> Key: FLINK-11333
> URL: https://issues.apache.org/jira/browse/FLINK-11333
> Project: Flink
>  Issue Type: Sub-task
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
>
> I think we have more and more users who are thinking about using Protobuf for 
> their state types.
> Right now, Protobuf isn't supported directly in Flink. The only way to use 
> Protobuf for a type is to register it via Kryo: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html.
> Likewise for Avro types, we should be able to natively support Protobuf, 
> having a {{ProtobufSerializer}} that handles serialization of Protobuf types. 
> The serializer should also write necessary information in its snapshot, to 
> enable schema evolution for it in the future. For Protobuf, this should 
> almost work out-of-the-box.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Myasuka opened a new pull request #7598: [FLINK-11333][protobuf] First-class serializer support for Protobuf types

2019-01-29 Thread GitBox
Myasuka opened a new pull request #7598: [FLINK-11333][protobuf] First-class 
serializer support for Protobuf types
URL: https://github.com/apache/flink/pull/7598
 
 
   
   ## What is the purpose of the change
   
   Support Protobuf types directly in Flink. Unlike the built-in avro 
serializer of Flink, check whether schema evolvable left to Protobuf itself not 
checking before any code running currently. This is a known limitation and 
recorded in [FLINK-11333's 
comments](https://issues.apache.org/jira/browse/FLINK-11333?focusedCommentId=16755185=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16755185).
   
   
   ## Brief change log
   
 - Support to extract `ProtobufTypeInfo` within `TypeExtractor`.
 - Support to (de)serialize protobuf's message directly without introducing 
`chill-protobuf` and Kryo.
 - Support to migrate old savepoint data which serialized by kryo 
serializer to use current newly `ProtobufSeriazlier`
 - Also support to build protoc on travis.
   
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
 - Added unit tests for `ProtobufTypeInfo`, `ProtobufSeriazlier` and 
`ProtobufSeriazlierSnapshot`.
 - Extended integration test to verify whether could migrate old savepoint 
data with kryo serializer to use current newly `ProtobufSeriazlier`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive): no, should 
not.
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, savepoint data might 
need to be migarated.
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jinglining commented on a change in pull request #7567: [FLINK-11358][tests] Port LeaderChangeStateCleanupTest to new code base

2019-01-29 Thread GitBox
jinglining commented on a change in pull request #7567: [FLINK-11358][tests] 
Port LeaderChangeStateCleanupTest to new code base
URL: https://github.com/apache/flink/pull/7567#discussion_r252108778
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
 ##
 @@ -19,281 +19,156 @@
 package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import 
org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
-import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
-import org.junit.After;
+
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
-import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests which verify the cluster behaviour in case of leader changes.
+ */
 public class LeaderChangeStateCleanupTest extends TestLogger {
 
 Review comment:
   How about LeaderChangeJobRecoveryTest?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jinglining commented on a change in pull request #7567: [FLINK-11358][tests] Port LeaderChangeStateCleanupTest to new code base

2019-01-29 Thread GitBox
jinglining commented on a change in pull request #7567: [FLINK-11358][tests] 
Port LeaderChangeStateCleanupTest to new code base
URL: https://github.com/apache/flink/pull/7567#discussion_r252108778
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
 ##
 @@ -19,281 +19,156 @@
 package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import 
org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
-import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
-import org.junit.After;
+
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
-import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests which verify the cluster behaviour in case of leader changes.
+ */
 public class LeaderChangeStateCleanupTest extends TestLogger {
 
 Review comment:
   How about LeaderChangeJobRecoveryTest


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] chensi2017 opened a new pull request #7597: fix wrong format

2019-01-29 Thread GitBox
chensi2017 opened a new pull request #7597: fix wrong format
URL: https://github.com/apache/flink/pull/7597
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-29 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r252105308
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   Hmm, maybe I think it is safe to use the default accept method since there 
is nothing in `FlinkLogicalUpsertToRetraction` need to be processed by a 
`RexShuttle`.
   However, if we throw an exception in it, there are some cases would not 
working. For example,  a `RelShuttleImpl` may be used to process the whole plan 
and use `RexShuttle` to process RexNodes for a RelNode. 
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Issue Comment Deleted] (FLINK-9054) IllegalStateException: Buffer pool is destroyed

2019-01-29 Thread guchunhui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

guchunhui updated FLINK-9054:
-
Comment: was deleted

(was: Hi,I also have this problem  with Flink 1.6. Do you solve this problem?)

> IllegalStateException: Buffer pool is destroyed
> ---
>
> Key: FLINK-9054
> URL: https://issues.apache.org/jira/browse/FLINK-9054
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Configuration, Core
>Affects Versions: 1.4.2
>Reporter: dhiraj prajapati
>Priority: Major
> Fix For: 1.8.0
>
> Attachments: flink-conf.yaml
>
>
> Hi,
> I have a flink cluster running on 2 machines, say A and B.
> Job manager is running on A. There are 2 TaksManagers, one on each node.
> So effectively, A has a job manager and a task manager, while B has a task 
> manager.
> When I submit a job to the cluster, I see below exception and the job fails:
> 2018-03-22 17:16:52,205 WARN 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while 
> emitting latency marker.
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 10 more
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:604)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 14 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138)
>  ... 19 more
>  
> The exception does not come when I run only one JobManager (only on machine 
> B).
>  
> I 

[jira] [Comment Edited] (FLINK-9054) IllegalStateException: Buffer pool is destroyed

2019-01-29 Thread guchunhui (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755630#comment-16755630
 ] 

guchunhui edited comment on FLINK-9054 at 1/30/19 3:14 AM:
---

Hi,I also have this problem  with Flink 1.6. Do you solve this problem?


was (Author: guchunhui):
Hi,I also have this problem  with Flink 1.6.

> IllegalStateException: Buffer pool is destroyed
> ---
>
> Key: FLINK-9054
> URL: https://issues.apache.org/jira/browse/FLINK-9054
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Configuration, Core
>Affects Versions: 1.4.2
>Reporter: dhiraj prajapati
>Priority: Major
> Fix For: 1.8.0
>
> Attachments: flink-conf.yaml
>
>
> Hi,
> I have a flink cluster running on 2 machines, say A and B.
> Job manager is running on A. There are 2 TaksManagers, one on each node.
> So effectively, A has a job manager and a task manager, while B has a task 
> manager.
> When I submit a job to the cluster, I see below exception and the job fails:
> 2018-03-22 17:16:52,205 WARN 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while 
> emitting latency marker.
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 10 more
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:604)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 14 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102)
>  at 
> 

[jira] [Commented] (FLINK-9054) IllegalStateException: Buffer pool is destroyed

2019-01-29 Thread guchunhui (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755630#comment-16755630
 ] 

guchunhui commented on FLINK-9054:
--

Hi,I also have this problem  with Flink 1.6.

> IllegalStateException: Buffer pool is destroyed
> ---
>
> Key: FLINK-9054
> URL: https://issues.apache.org/jira/browse/FLINK-9054
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Configuration, Core
>Affects Versions: 1.4.2
>Reporter: dhiraj prajapati
>Priority: Major
> Fix For: 1.8.0
>
> Attachments: flink-conf.yaml
>
>
> Hi,
> I have a flink cluster running on 2 machines, say A and B.
> Job manager is running on A. There are 2 TaksManagers, one on each node.
> So effectively, A has a job manager and a task manager, while B has a task 
> manager.
> When I submit a job to the cluster, I see below exception and the job fails:
> 2018-03-22 17:16:52,205 WARN 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while 
> emitting latency marker.
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 10 more
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:604)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 14 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138)
>  ... 19 more
>  
> The exception does not come when I run only one JobManager (only on machine 
> B).
>  
> I am attaching 

[GitHub] klion26 commented on issue #7596: Release 1.7 sortPartition bug

2019-01-29 Thread GitBox
klion26 commented on issue #7596: Release 1.7 sortPartition bug
URL: https://github.com/apache/flink/pull/7596#issuecomment-458785790
 
 
   @zhaijp You can report bug using 
[JIRA](https://issues.apache.org/jira/projects/FLINK). Maybe you could close 
this pr.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jinglining commented on issue #7582: [FLINK-11424][metrics]fix remove error type Gauge in DatadogHttpReporter

2019-01-29 Thread GitBox
jinglining commented on issue #7582: [FLINK-11424][metrics]fix remove error 
type Gauge in DatadogHttpReporter
URL: https://github.com/apache/flink/pull/7582#issuecomment-458785073
 
 
   The Travis CI build failed, because of YarnFlinkResourceManagerTest which 
case is always fail recently。


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10897) Support POJO state schema evolution

2019-01-29 Thread boshu Zheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16747409#comment-16747409
 ] 

boshu Zheng edited comment on FLINK-10897 at 1/30/19 2:13 AM:
--

I will try to follow the example that you presented at FlinkForwardChina. 
Additionally, I propose to add two fields to `PojoSerializer`, namely 
`previousFieldNames` and `previousFieldSerializers` which are snapshotted and 
restored by a new `PojoSerializerSnapshot`. When restoring states via 
`PojoSerializer#deserialize`, we should be able to figure out which fields have 
been removed/added or have a new type with these additional fields.
What do you think? [~tzulitai]


was (Author: kisimple):
I will try to follow the example that you presented at FlinkForwardChain. 
Additionally, I propose to add two fields to `PojoSerializer`, namely 
`previousFieldNames` and `previousFieldSerializers` which are snapshotted and 
restored by a new `PojoSerializerSnapshot`. When restoring states via 
`PojoSerializer#deserialize`, we should be able to figure out which fields have 
been removed/added or have a new type with these additional fields.
What do you think? [~tzulitai]

> Support POJO state schema evolution
> ---
>
> Key: FLINK-10897
> URL: https://issues.apache.org/jira/browse/FLINK-10897
> Project: Flink
>  Issue Type: Sub-task
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: boshu Zheng
>Priority: Major
>
> Main action point for this is to implement a separate POJO serializer that is 
> specifically used as the restore serializer.
> This restore POJO serializer should be able to read and dump values of fields 
> that no longer exists in the updated POJO schema, and assign default values 
> to newly added fields. Snapshot of the {{PojoSerializer}} should contain 
> sufficient information so that on restore, the information can be compared 
> with the adapted POJO class to figure out which fields have been removed / 
> added.
> Changing fields types is out of scope and should not be supported.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11380) YarnFlinkResourceManagerTest test case crashed

2019-01-29 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755576#comment-16755576
 ] 

lining commented on FLINK-11380:


I meet this too. https://api.travis-ci.org/v3/job/485210100/log.txt

> YarnFlinkResourceManagerTest test case crashed 
> ---
>
> Key: FLINK-11380
> URL: https://issues.apache.org/jira/browse/FLINK-11380
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: vinoyang
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.8.0
>
>
> context:
> {code:java}
> 17:18:44.415 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test (default-test) on 
> project flink-yarn_2.11: There are test failures.
> 17:18:44.415 [ERROR] 
> 17:18:44.415 [ERROR] Please refer to 
> /home/travis/build/apache/flink/flink-yarn/target/surefire-reports for the 
> individual test results.
> 17:18:44.415 [ERROR] Please refer to dump files (if any exist) [date].dump, 
> [date]-jvmRun[N].dump and [date].dumpstream.
> 17:18:44.415 [ERROR] ExecutionException The forked VM terminated without 
> properly saying goodbye. VM crash or System.exit called?
> 17:18:44.415 [ERROR] Command was /bin/sh -c cd 
> /home/travis/build/apache/flink/flink-yarn && 
> /usr/lib/jvm/java-8-oracle/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 
> -XX:+UseG1GC -jar 
> /home/travis/build/apache/flink/flink-yarn/target/surefire/surefirebooter3487840902331471745.jar
>  /home/travis/build/apache/flink/flink-yarn/target/surefire 
> 2019-01-16T17-02-23_939-jvmRun2 surefire3706271590182708448tmp 
> surefire_332496616764820906947tmp
> 17:18:44.416 [ERROR] Error occurred in starting fork, check output in log
> 17:18:44.416 [ERROR] Process Exit Code: 243
> 17:18:44.416 [ERROR] Crashed tests:
> 17:18:44.416 [ERROR] org.apache.flink.yarn.YarnFlinkResourceManagerTest
> 17:18:44.416 [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: 
> ExecutionException The forked VM terminated without properly saying goodbye. 
> VM crash or System.exit called?
> 17:18:44.416 [ERROR] Command was /bin/sh -c cd 
> /home/travis/build/apache/flink/flink-yarn && 
> /usr/lib/jvm/java-8-oracle/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 
> -XX:+UseG1GC -jar 
> /home/travis/build/apache/flink/flink-yarn/target/surefire/surefirebooter3487840902331471745.jar
>  /home/travis/build/apache/flink/flink-yarn/target/surefire 
> 2019-01-16T17-02-23_939-jvmRun2 surefire3706271590182708448tmp 
> surefire_332496616764820906947tmp
> 17:18:44.416 [ERROR] Error occurred in starting fork, check output in log
> 17:18:44.416 [ERROR] Process Exit Code: 243
> 17:18:44.416 [ERROR] Crashed tests:
> 17:18:44.416 [ERROR] org.apache.flink.yarn.YarnFlinkResourceManagerTest
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:510)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkOnceMultiple(ForkStarter.java:382)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:297)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:246)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355)
> 17:18:44.416 [ERROR] at 
> 

[jira] [Comment Edited] (FLINK-11380) YarnFlinkResourceManagerTest test case crashed

2019-01-29 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755576#comment-16755576
 ] 

lining edited comment on FLINK-11380 at 1/30/19 2:12 AM:
-

I met this too. [https://api.travis-ci.org/v3/job/485210100/log.txt]


was (Author: lining):
I meet this too. https://api.travis-ci.org/v3/job/485210100/log.txt

> YarnFlinkResourceManagerTest test case crashed 
> ---
>
> Key: FLINK-11380
> URL: https://issues.apache.org/jira/browse/FLINK-11380
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: vinoyang
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.8.0
>
>
> context:
> {code:java}
> 17:18:44.415 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test (default-test) on 
> project flink-yarn_2.11: There are test failures.
> 17:18:44.415 [ERROR] 
> 17:18:44.415 [ERROR] Please refer to 
> /home/travis/build/apache/flink/flink-yarn/target/surefire-reports for the 
> individual test results.
> 17:18:44.415 [ERROR] Please refer to dump files (if any exist) [date].dump, 
> [date]-jvmRun[N].dump and [date].dumpstream.
> 17:18:44.415 [ERROR] ExecutionException The forked VM terminated without 
> properly saying goodbye. VM crash or System.exit called?
> 17:18:44.415 [ERROR] Command was /bin/sh -c cd 
> /home/travis/build/apache/flink/flink-yarn && 
> /usr/lib/jvm/java-8-oracle/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 
> -XX:+UseG1GC -jar 
> /home/travis/build/apache/flink/flink-yarn/target/surefire/surefirebooter3487840902331471745.jar
>  /home/travis/build/apache/flink/flink-yarn/target/surefire 
> 2019-01-16T17-02-23_939-jvmRun2 surefire3706271590182708448tmp 
> surefire_332496616764820906947tmp
> 17:18:44.416 [ERROR] Error occurred in starting fork, check output in log
> 17:18:44.416 [ERROR] Process Exit Code: 243
> 17:18:44.416 [ERROR] Crashed tests:
> 17:18:44.416 [ERROR] org.apache.flink.yarn.YarnFlinkResourceManagerTest
> 17:18:44.416 [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: 
> ExecutionException The forked VM terminated without properly saying goodbye. 
> VM crash or System.exit called?
> 17:18:44.416 [ERROR] Command was /bin/sh -c cd 
> /home/travis/build/apache/flink/flink-yarn && 
> /usr/lib/jvm/java-8-oracle/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 
> -XX:+UseG1GC -jar 
> /home/travis/build/apache/flink/flink-yarn/target/surefire/surefirebooter3487840902331471745.jar
>  /home/travis/build/apache/flink/flink-yarn/target/surefire 
> 2019-01-16T17-02-23_939-jvmRun2 surefire3706271590182708448tmp 
> surefire_332496616764820906947tmp
> 17:18:44.416 [ERROR] Error occurred in starting fork, check output in log
> 17:18:44.416 [ERROR] Process Exit Code: 243
> 17:18:44.416 [ERROR] Crashed tests:
> 17:18:44.416 [ERROR] org.apache.flink.yarn.YarnFlinkResourceManagerTest
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:510)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkOnceMultiple(ForkStarter.java:382)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:297)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:246)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> 17:18:44.416 [ERROR] at 
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
> 17:18:44.416 

[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-29 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r252089234
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), 
keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
+val child = this.getInput
+val rowCnt = mq.getRowCount(child)
+// take rowCnt and fieldCnt into account, so that cost will be smaller 
when generate
+// UpsertToRetractionConverter after Calc.
+planner.getCostFactory.makeCost(rowCnt, rowCnt * 
child.getRowType.getFieldCount, 0)
 
 Review comment:
   Yes, I think so. I updated the code according to your suggestion. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijp opened a new pull request #7596: Release 1.7 sortPartition bug

2019-01-29 Thread GitBox
zhaijp opened a new pull request #7596: Release 1.7 sortPartition bug
URL: https://github.com/apache/flink/pull/7596
 
 
   ### I found a bug when I use sortPartition
   public class WordCountExample {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
DataSet text = env.fromElements("a b b b b c c");
DataSet> wordCounts = text.flatMap(new 
LineSPlitter())
.groupBy(0)
.sum(1)
.sortPartition(1, Order.DESCENDING);

wordCounts.print();

}
   }
   
   class LineSPlitter implements FlatMapFunction>{
   
@Override
public void flatMap(String line, Collector> 
out) throws Exception {
for(String word : line.split(" ")) {
out.collect(new Tuple2(word,1));
}
}

   }
   
   ### the console show this:
   (a,1)
   (b,4)
   (c,2)
   
   
   ### when i change the elements aa bb bb bb bb cc cc the result is right:
   (bb,4)
   (cc,2)
   (aa,1)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun commented on a change in pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…

2019-01-29 Thread GitBox
TisonKun commented on a change in pull request #7570: [FLINK-11422] Prefer 
testing class to mock StreamTask in AbstractStre…
URL: https://github.com/apache/flink/pull/7570#discussion_r252085826
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
 ##
 @@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.MockStreamStatusMaintainer;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * A builder of {@link MockStreamTask}.
+ */
+public class MockStreamTaskBuilder {
+   private final Environment environment;
+   private String name;
+   private Object checkpointLock;
+   private StreamConfig config;
+   private ExecutionConfig executionConfig;
+   private CloseableRegistry closableRegistry;
+   private StreamStatusMaintainer streamStatusMaintainer;
+   private CheckpointStorage checkpointStorage;
+   private ProcessingTimeService processingTimeService;
+   private StreamTaskStateInitializer streamTaskStateInitializer;
+   private BiConsumer handleAsyncException;
+   private Map> accumulatorMap;
+
+   public MockStreamTaskBuilder(Environment environment) throws Exception {
+   // obligatory parameters
+   this.environment = environment;
+
+   // default values
 
 Review comment:
   Sure! At first I initialize streamStatusMaintainer with an anonymous inner 
class inherited from `StreamStatusMaintainer`. I think it is a bit complex thus 
gather the initializations into constructor. But now the default values are 
simple, and you're definitely right :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-29 Thread GitBox
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] 
Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252039395
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -626,6 +686,44 @@ public void unRegisterInfoMessageListener(final String 
address) {
}
}
 
+   protected void rejectAllPendingSlotRequests(Exception e) {
+   slotManager.rejectAllPendingSlotRequests(e);
+   }
+
+   protected synchronized void recordFailure() {
+   if (!checkFailureRate) {
+   return;
+   }
+   if (isFailureTimestampFull()) {
+   taskExecutorFailureTimestamps.remove();
+   }
+   taskExecutorFailureTimestamps.add(System.currentTimeMillis());
+   }
+
+   protected boolean shouldRejectRequests() {
 
 Review comment:
   the rate calculation logic here share a lot with FailureRateRestartStrategy. 
Can we refactor the rate calculation code to a common class?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-29 Thread GitBox
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] 
Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252038865
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -192,7 +235,17 @@ public ResourceManager(
this.jobManagerRegistrations = new HashMap<>(4);
this.jmResourceIdRegistrations = new HashMap<>(4);
this.taskExecutors = new HashMap<>(8);
-   infoMessageListeners = new ConcurrentHashMap<>(8);
+   this.infoMessageListeners = new ConcurrentHashMap<>(8);
+   this.failureInterval = failureInterval;
+   this.maximumFailureTaskExecutorPerInternal = 
maxFailurePerInterval;
+
+   if (maximumFailureTaskExecutorPerInternal > 0) {
+   this.taskExecutorFailureTimestamps = new 
ArrayDeque<>(maximumFailureTaskExecutorPerInternal);
 
 Review comment:
   How about 0? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-29 Thread GitBox
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] 
Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252025408
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 ##
 @@ -83,8 +83,27 @@
 */
public static final ConfigOption MAX_FAILED_CONTAINERS =
key("yarn.maximum-failed-containers")
-   .noDefaultValue()
-   .withDescription("Maximum number of containers the system is 
going to reallocate in case of a failure.");
+   .noDefaultValue()
+   .withDescription("Maximum number of containers the 
system is going to reallocate in case of a failure.");
+
+   /**
+* The maximum number of failed YARN containers within an interval 
before entirely stopping
+* the YARN session / job on YARN.
+* By default, the value is -1
+*/
+   public static final ConfigOption 
MAX_FAILED_CONTAINERS_PER_INTERVAL =
+   key("yarn.maximum-failed-containers-per-interval")
+   .defaultValue(-1)
+   .withDescription("Maximum number of containers the system is 
going to reallocate in case of a failure in an interval.");
 
 Review comment:
   Please document what does -1 mean.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-29 Thread GitBox
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] 
Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252025274
 
 

 ##
 File path: docs/_includes/generated/mesos_configuration.html
 ##
 @@ -27,6 +27,11 @@
 -1
 The maximum number of failed workers before the cluster fails. 
May be set to -1 to disable this feature. This option is ignored unless Flink 
is in legacy mode.
 
+
+mesos.maximum-failed-workers-per-interval
+-1
+Maximum number of workers the system is going to reallocate in 
case of a failure in an interval.
 
 Review comment:
   Please document what does -1 & 0 mean.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-29 Thread GitBox
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] 
Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252024051
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
 ##
 @@ -99,6 +99,25 @@
.withDescription("The config parameter defining the 
Mesos artifact server port to use. Setting the port to" +
" 0 will let the OS choose an available port.");
 
+   /**
+* The maximum number of failed Mesos worker within an interval before 
entirely stopping
+* the Mesos session / job on Mesos.
+* By default, the value is -1
+*/
+   public static final ConfigOption 
MAX_FAILED_WORKERS_PER_INTERVAL =
+   key("mesos.maximum-failed-workers-per-interval")
+   .defaultValue(-1)
+   .withDescription("Maximum number of workers the system 
is going to reallocate in case of a failure in an interval.");
+
+   /**
+* The interval for measuring failure rate of containers in second unit.
+* By default, the value is 5 minutes.
+**/
+   public static final ConfigOption WORKERS_FAILURE_RATE_INTERVAL 
=
+   key("mesos.workers-failure-rate-interval")
+   .defaultValue(300)
+   .withDeprecatedKeys("The interval for measuring failure 
rate of workers");
 
 Review comment:
   withDescription here as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-29 Thread GitBox
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] 
Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252037753
 
 

 ##
 File path: docs/_includes/generated/yarn_config_configuration.html
 ##
 @@ -42,6 +47,11 @@
 (none)
 Maximum number of containers the system is going to reallocate 
in case of a failure.
 
+
+yarn.maximum-failed-containers-per-interval
+-1
 
 Review comment:
   Please document what does -1 and 0 mean.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252035267
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.schema.MessageType;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Simple test case for conversion between Parquet schema and Flink date types.
+ */
+public class ParquetSchemaConverterTest extends TestUtil {
+   private final RowTypeInfo simplyRowType = new RowTypeInfo(
 
 Review comment:
   There is a utility class `Types` that eases the creation of type 
informations for all kinds of types (Row, Map, Array, Pojo, etc.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252016959
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
 ##
 @@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends from {@link GroupConverter} to convert an nested Parquet Record 
into Row.
+ */
+public class RowConverter extends GroupConverter implements ParentDataHolder {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(RowConverter.class);
+   private final Converter[] converters;
+   private final ParentDataHolder parentDataHolder;
+   private final TypeInformation typeInfo;
+   private Row currentRow;
+   private int posInParentRow;
+
+   public RowConverter(MessageType messageType, TypeInformation 
typeInfo) {
+   this(messageType, typeInfo, null, 0);
+   }
+
+   public RowConverter(GroupType schema, TypeInformation typeInfo, 
ParentDataHolder parent, int pos) {
+   this.typeInfo = typeInfo;
+   this.parentDataHolder = parent;
+   this.posInParentRow = pos;
+   this.converters = new Converter[schema.getFieldCount()];
+
+   int i = 0;
+   if (typeInfo.getArity() >= 1 && (typeInfo instanceof 
CompositeType)) {
+   for (Type field : schema.getFields()) {
+   converters[i] = createConverter(field, i, 
((CompositeType) typeInfo).getTypeAt(i), this);
+   i++;
+   }
+   }
+   }
+
+   private static Converter createConverter(
+   Type field,
+   int fieldPos,
+   TypeInformation typeInformation,
+   ParentDataHolder parentDataHolder) {
+   if (field.isPrimitive()) {
+   return new RowConverter.RowPrimitiveConverter(field, 
parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof MapTypeInfo) {
+   return new RowConverter.MapConverter((GroupType) field, 
(MapTypeInfo) typeInformation,
+   parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof BasicArrayTypeInfo) {
+   Type elementType = 
field.asGroupType().getFields().get(0);
+   Class typeClass = ((BasicArrayTypeInfo) 
typeInformation).getComponentInfo().getTypeClass();
+   if (typeClass.equals(Character.class)) {
+   return new 
RowConverter.BasicArrayConverter((BasicArrayTypeInfo) 
typeInformation, elementType,
+   Character.class, parentDataHolder, 
fieldPos);
+   } else if (typeClass.equals(Boolean.class)) {
+   return new 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r251979596
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
 ##
 @@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends from {@link GroupConverter} to convert an nested Parquet Record 
into Row.
+ */
+public class RowConverter extends GroupConverter implements ParentDataHolder {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(RowConverter.class);
 
 Review comment:
   Logger is not used.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252025737
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test cases for reading Map from Parquet files.
+ */
+public class ParquetMapInputFormatTest {
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new 
AvroSchemaConverter();
+
+   @ClassRule
+   public static TemporaryFolder temp = new TemporaryFolder();
+
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testReadMapFromNestedRecord() throws IOException {
 
 Review comment:
   Add a test for projected reads, i.e., reading a subset of the fields 
contained in the file


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252016613
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
 ##
 @@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends from {@link GroupConverter} to convert an nested Parquet Record 
into Row.
+ */
+public class RowConverter extends GroupConverter implements ParentDataHolder {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(RowConverter.class);
+   private final Converter[] converters;
+   private final ParentDataHolder parentDataHolder;
+   private final TypeInformation typeInfo;
+   private Row currentRow;
+   private int posInParentRow;
+
+   public RowConverter(MessageType messageType, TypeInformation 
typeInfo) {
+   this(messageType, typeInfo, null, 0);
+   }
+
+   public RowConverter(GroupType schema, TypeInformation typeInfo, 
ParentDataHolder parent, int pos) {
+   this.typeInfo = typeInfo;
+   this.parentDataHolder = parent;
+   this.posInParentRow = pos;
+   this.converters = new Converter[schema.getFieldCount()];
+
+   int i = 0;
+   if (typeInfo.getArity() >= 1 && (typeInfo instanceof 
CompositeType)) {
+   for (Type field : schema.getFields()) {
+   converters[i] = createConverter(field, i, 
((CompositeType) typeInfo).getTypeAt(i), this);
+   i++;
+   }
+   }
+   }
+
+   private static Converter createConverter(
+   Type field,
+   int fieldPos,
+   TypeInformation typeInformation,
+   ParentDataHolder parentDataHolder) {
+   if (field.isPrimitive()) {
+   return new RowConverter.RowPrimitiveConverter(field, 
parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof MapTypeInfo) {
+   return new RowConverter.MapConverter((GroupType) field, 
(MapTypeInfo) typeInformation,
+   parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof BasicArrayTypeInfo) {
+   Type elementType = 
field.asGroupType().getFields().get(0);
+   Class typeClass = ((BasicArrayTypeInfo) 
typeInformation).getComponentInfo().getTypeClass();
+   if (typeClass.equals(Character.class)) {
+   return new 
RowConverter.BasicArrayConverter((BasicArrayTypeInfo) 
typeInformation, elementType,
+   Character.class, parentDataHolder, 
fieldPos);
+   } else if (typeClass.equals(Boolean.class)) {
+   return new 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252036898
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
 ##
 @@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.generated.ArrayItem;
+import org.apache.flink.formats.parquet.generated.Bar;
+import org.apache.flink.formats.parquet.generated.MapItem;
+import org.apache.flink.formats.parquet.generated.NestedRecord;
+import org.apache.flink.formats.parquet.generated.SimpleRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.avro.Schema.Type.NULL;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Utilities for testing schema conversion and test parquet file creation.
+ */
+public class TestUtil {
+   @ClassRule
+   public static TemporaryFolder temp = new TemporaryFolder();
+   public static final Configuration TEST_CONFIGURATION = new 
Configuration();
+   public static final Schema NESTED_SCHEMA = getTestSchema("nested.avsc");
+   public static final Schema SIMPLE_SCHEMA = getTestSchema("simple.avsc");
+
+   protected static final Type[] SIMPLE_BACK_COMPTIBALE_TYPES = {
+   Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL).named("foo"),
+   Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.UTF8).named("bar"),
+   Types.optionalGroup()
+   
.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REPEATED).named("array"))
+   .named("arr")
+   };
+
+   protected static final Type[] SIMPLE_STANDARD_TYPES = {
+   Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.INT_64).named("foo"),
+   Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.UTF8).named("bar"),
+   Types.optionalGroup()
+   .addField(Types.repeatedGroup().addField(
+   
Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+   
.as(OriginalType.INT_64).named("element")).named("list")).as(OriginalType.LIST)
+   .named("arr")};
+
+   protected static final Type[] NESTED_TYPES = {
+   Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.INT_64).named("foo"),
+   

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252018181
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
 ##
 @@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends from {@link GroupConverter} to convert an nested Parquet Record 
into Row.
+ */
+public class RowConverter extends GroupConverter implements ParentDataHolder {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(RowConverter.class);
+   private final Converter[] converters;
+   private final ParentDataHolder parentDataHolder;
+   private final TypeInformation typeInfo;
+   private Row currentRow;
+   private int posInParentRow;
+
+   public RowConverter(MessageType messageType, TypeInformation 
typeInfo) {
+   this(messageType, typeInfo, null, 0);
+   }
+
+   public RowConverter(GroupType schema, TypeInformation typeInfo, 
ParentDataHolder parent, int pos) {
+   this.typeInfo = typeInfo;
+   this.parentDataHolder = parent;
+   this.posInParentRow = pos;
+   this.converters = new Converter[schema.getFieldCount()];
+
+   int i = 0;
+   if (typeInfo.getArity() >= 1 && (typeInfo instanceof 
CompositeType)) {
+   for (Type field : schema.getFields()) {
+   converters[i] = createConverter(field, i, 
((CompositeType) typeInfo).getTypeAt(i), this);
+   i++;
+   }
+   }
+   }
+
+   private static Converter createConverter(
+   Type field,
+   int fieldPos,
+   TypeInformation typeInformation,
+   ParentDataHolder parentDataHolder) {
+   if (field.isPrimitive()) {
+   return new RowConverter.RowPrimitiveConverter(field, 
parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof MapTypeInfo) {
+   return new RowConverter.MapConverter((GroupType) field, 
(MapTypeInfo) typeInformation,
+   parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof BasicArrayTypeInfo) {
+   Type elementType = 
field.asGroupType().getFields().get(0);
+   Class typeClass = ((BasicArrayTypeInfo) 
typeInformation).getComponentInfo().getTypeClass();
+   if (typeClass.equals(Character.class)) {
+   return new 
RowConverter.BasicArrayConverter((BasicArrayTypeInfo) 
typeInformation, elementType,
+   Character.class, parentDataHolder, 
fieldPos);
+   } else if (typeClass.equals(Boolean.class)) {
+   return new 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252018047
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
 ##
 @@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends from {@link GroupConverter} to convert an nested Parquet Record 
into Row.
+ */
+public class RowConverter extends GroupConverter implements ParentDataHolder {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(RowConverter.class);
+   private final Converter[] converters;
+   private final ParentDataHolder parentDataHolder;
+   private final TypeInformation typeInfo;
+   private Row currentRow;
+   private int posInParentRow;
+
+   public RowConverter(MessageType messageType, TypeInformation 
typeInfo) {
+   this(messageType, typeInfo, null, 0);
+   }
+
+   public RowConverter(GroupType schema, TypeInformation typeInfo, 
ParentDataHolder parent, int pos) {
+   this.typeInfo = typeInfo;
+   this.parentDataHolder = parent;
+   this.posInParentRow = pos;
+   this.converters = new Converter[schema.getFieldCount()];
+
+   int i = 0;
+   if (typeInfo.getArity() >= 1 && (typeInfo instanceof 
CompositeType)) {
+   for (Type field : schema.getFields()) {
+   converters[i] = createConverter(field, i, 
((CompositeType) typeInfo).getTypeAt(i), this);
+   i++;
+   }
+   }
+   }
+
+   private static Converter createConverter(
+   Type field,
+   int fieldPos,
+   TypeInformation typeInformation,
+   ParentDataHolder parentDataHolder) {
+   if (field.isPrimitive()) {
+   return new RowConverter.RowPrimitiveConverter(field, 
parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof MapTypeInfo) {
+   return new RowConverter.MapConverter((GroupType) field, 
(MapTypeInfo) typeInformation,
+   parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof BasicArrayTypeInfo) {
+   Type elementType = 
field.asGroupType().getFields().get(0);
+   Class typeClass = ((BasicArrayTypeInfo) 
typeInformation).getComponentInfo().getTypeClass();
+   if (typeClass.equals(Character.class)) {
+   return new 
RowConverter.BasicArrayConverter((BasicArrayTypeInfo) 
typeInformation, elementType,
+   Character.class, parentDataHolder, 
fieldPos);
+   } else if (typeClass.equals(Boolean.class)) {
+   return new 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252033885
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Simple test case for reading parquet records.
+ */
+public class ParquetRecordReaderTest extends TestUtil {
+
+   @Test
+   public void testReadSimpleGroup() throws IOException {
+   temp.create();
+
+   Long[] array = {1L};
+   GenericData.Record record = new 
GenericRecordBuilder(SIMPLE_SCHEMA)
+   .set("bar", "test")
+   .set("foo", 32L)
+   .set("arr", array).build();
+
+   Path path = createTempParquetFile(temp, SIMPLE_SCHEMA, 
Collections.singletonList(record));
+   MessageType readSchema = (new 
AvroSchemaConverter()).convert(SIMPLE_SCHEMA);
+   ParquetRecordReader rowReader = new 
ParquetRecordReader(new RowReadSupport(), readSchema);
+
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
+   ParquetReadOptions options = 
ParquetReadOptions.builder().build();
+   ParquetFileReader fileReader = new ParquetFileReader(inputFile, 
options);
+
+   rowReader.initialize(fileReader, TEST_CONFIGURATION);
+   assertEquals(true, rowReader.hasNextRecord());
+
+   Row row = rowReader.nextRecord();
+   assertEquals(3, row.getArity());
+   assertEquals(32L, row.getField(0));
+   assertEquals("test", row.getField(1));
+   assertArrayEquals(array, (Long[]) row.getField(2));
+   assertEquals(true, rowReader.reachEnd());
+   }
+
+   @Test
+   public void testReadMultipleSimpleGroup() throws IOException {
+   temp.create();
+
+   Long[] array = {1L};
+
+   List records = new ArrayList<>();
+   for (int i = 0; i < 100; i++) {
+   GenericData.Record record = new 
GenericRecordBuilder(SIMPLE_SCHEMA)
+   .set("bar", "test")
+   .set("foo", i)
+   .set("arr", array).build();
+   records.add(record);
+   }
+
+   Path path = createTempParquetFile(temp, SIMPLE_SCHEMA, records);
+   MessageType readSchema = (new 
AvroSchemaConverter()).convert(SIMPLE_SCHEMA);
+   ParquetRecordReader rowReader = new 
ParquetRecordReader(new RowReadSupport(), readSchema);
+
+   InputFile inputFile =
+   HadoopInputFile.fromPath(new 
org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
+   ParquetReadOptions options = 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252037449
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
 ##
 @@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.generated.ArrayItem;
+import org.apache.flink.formats.parquet.generated.Bar;
+import org.apache.flink.formats.parquet.generated.MapItem;
+import org.apache.flink.formats.parquet.generated.NestedRecord;
+import org.apache.flink.formats.parquet.generated.SimpleRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.avro.Schema.Type.NULL;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Utilities for testing schema conversion and test parquet file creation.
+ */
+public class TestUtil {
+   @ClassRule
+   public static TemporaryFolder temp = new TemporaryFolder();
+   public static final Configuration TEST_CONFIGURATION = new 
Configuration();
+   public static final Schema NESTED_SCHEMA = getTestSchema("nested.avsc");
+   public static final Schema SIMPLE_SCHEMA = getTestSchema("simple.avsc");
+
+   protected static final Type[] SIMPLE_BACK_COMPTIBALE_TYPES = {
+   Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL).named("foo"),
+   Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.UTF8).named("bar"),
+   Types.optionalGroup()
+   
.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REPEATED).named("array"))
+   .named("arr")
+   };
+
+   protected static final Type[] SIMPLE_STANDARD_TYPES = {
+   Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.INT_64).named("foo"),
+   Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.UTF8).named("bar"),
+   Types.optionalGroup()
+   .addField(Types.repeatedGroup().addField(
+   
Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+   
.as(OriginalType.INT_64).named("element")).named("list")).as(OriginalType.LIST)
+   .named("arr")};
+
+   protected static final Type[] NESTED_TYPES = {
+   Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.INT_64).named("foo"),
+   

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252036720
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
 ##
 @@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.generated.ArrayItem;
+import org.apache.flink.formats.parquet.generated.Bar;
+import org.apache.flink.formats.parquet.generated.MapItem;
+import org.apache.flink.formats.parquet.generated.NestedRecord;
+import org.apache.flink.formats.parquet.generated.SimpleRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.avro.Schema.Type.NULL;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Utilities for testing schema conversion and test parquet file creation.
+ */
+public class TestUtil {
+   @ClassRule
+   public static TemporaryFolder temp = new TemporaryFolder();
+   public static final Configuration TEST_CONFIGURATION = new 
Configuration();
+   public static final Schema NESTED_SCHEMA = getTestSchema("nested.avsc");
+   public static final Schema SIMPLE_SCHEMA = getTestSchema("simple.avsc");
+
+   protected static final Type[] SIMPLE_BACK_COMPTIBALE_TYPES = {
+   Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL).named("foo"),
+   Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.UTF8).named("bar"),
+   Types.optionalGroup()
+   
.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.REPEATED).named("array"))
+   .named("arr")
+   };
+
+   protected static final Type[] SIMPLE_STANDARD_TYPES = {
+   Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.INT_64).named("foo"),
+   Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.UTF8).named("bar"),
+   Types.optionalGroup()
+   .addField(Types.repeatedGroup().addField(
+   
Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+   
.as(OriginalType.INT_64).named("element")).named("list")).as(OriginalType.LIST)
+   .named("arr")};
+
+   protected static final Type[] NESTED_TYPES = {
+   Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
Type.Repetition.OPTIONAL)
+   .as(OriginalType.INT_64).named("foo"),
+   

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252026077
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test cases for reading Map from Parquet files.
+ */
+public class ParquetMapInputFormatTest {
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new 
AvroSchemaConverter();
+
+   @ClassRule
+   public static TemporaryFolder temp = new TemporaryFolder();
+
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testReadMapFromNestedRecord() throws IOException {
+   temp.create();
+   Tuple3, SpecificRecord, Row> 
nested = TestUtil.getNestedRecordTestData();
+   Path path = TestUtil.createTempParquetFile(temp, 
TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
+   MessageType nestedType = 
SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+   ParquetMapInputFormat mapInputFormat = new 
ParquetMapInputFormat(path, nestedType);
+
+   RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
+   
Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
+   .when(mockContext).getMetricGroup();
+   mapInputFormat.setRuntimeContext(mockContext);
+   FileInputSplit[] splits = mapInputFormat.createInputSplits(1);
+   assertEquals(1, splits.length);
+   mapInputFormat.open(splits[0]);
+
+   Map map = mapInputFormat.nextRecord(null);
+   assertNotNull(map);
 
 Review comment:
   check size of map


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r251984469
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowReadSupport.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Parquet {@link ReadSupport} implementation for reading Parquet record as 
{@link Row}.
+ */
+public class RowReadSupport extends ReadSupport {
+
+   private TypeInformation returnTypeInfo;
+
+   @Override
+   public ReadContext init(InitContext initContext) {
+   checkNotNull(initContext, "initContext");
+   returnTypeInfo = 
ParquetSchemaConverter.fromParquetType(initContext.getFileSchema());
+   return new ReadContext(initContext.getFileSchema());
+   }
+
+   @Override
+   public RecordMaterializer prepareForRead(
+   Configuration configuration, Map 
keyValueMetaData,
+   MessageType fileSchema, ReadContext readContext) {
+   return new RowMaterializer(fileSchema, returnTypeInfo);
 
 Review comment:
   use `ReadContext.getSchemaForRead()` instead of `fileSchema`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252013399
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
 ##
 @@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends from {@link GroupConverter} to convert an nested Parquet Record 
into Row.
+ */
+public class RowConverter extends GroupConverter implements ParentDataHolder {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(RowConverter.class);
+   private final Converter[] converters;
+   private final ParentDataHolder parentDataHolder;
+   private final TypeInformation typeInfo;
+   private Row currentRow;
+   private int posInParentRow;
+
+   public RowConverter(MessageType messageType, TypeInformation 
typeInfo) {
+   this(messageType, typeInfo, null, 0);
+   }
+
+   public RowConverter(GroupType schema, TypeInformation typeInfo, 
ParentDataHolder parent, int pos) {
+   this.typeInfo = typeInfo;
+   this.parentDataHolder = parent;
+   this.posInParentRow = pos;
+   this.converters = new Converter[schema.getFieldCount()];
+
+   int i = 0;
+   if (typeInfo.getArity() >= 1 && (typeInfo instanceof 
CompositeType)) {
+   for (Type field : schema.getFields()) {
+   converters[i] = createConverter(field, i, 
((CompositeType) typeInfo).getTypeAt(i), this);
+   i++;
+   }
+   }
+   }
+
+   private static Converter createConverter(
+   Type field,
+   int fieldPos,
+   TypeInformation typeInformation,
+   ParentDataHolder parentDataHolder) {
+   if (field.isPrimitive()) {
+   return new RowConverter.RowPrimitiveConverter(field, 
parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof MapTypeInfo) {
+   return new RowConverter.MapConverter((GroupType) field, 
(MapTypeInfo) typeInformation,
+   parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof BasicArrayTypeInfo) {
+   Type elementType = 
field.asGroupType().getFields().get(0);
+   Class typeClass = ((BasicArrayTypeInfo) 
typeInformation).getComponentInfo().getTypeClass();
+   if (typeClass.equals(Character.class)) {
+   return new 
RowConverter.BasicArrayConverter((BasicArrayTypeInfo) 
typeInformation, elementType,
+   Character.class, parentDataHolder, 
fieldPos);
+   } else if (typeClass.equals(Boolean.class)) {
+   return new 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252027651
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java
 ##
 @@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.pojo.PojoSimpleRecord;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test cases for reading Pojo from Parquet files.
+ */
+public class ParquetPojoInputFormatTest {
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new 
AvroSchemaConverter();
+
+   @ClassRule
+   public static TemporaryFolder temp = new TemporaryFolder();
+
+   @Test
+   public void testReadPojoFromSimpleRecord() throws IOException, 
NoSuchFieldException {
+   temp.create();
+   Tuple3, SpecificRecord, Row> 
simple = TestUtil.getSimpleRecordTestData();
+   MessageType messageType = 
SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+   Path path = TestUtil.createTempParquetFile(temp, 
TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
+
+   List fieldList = new ArrayList<>();
+   fieldList.add(new 
PojoField(PojoSimpleRecord.class.getField("foo"), 
BasicTypeInfo.LONG_TYPE_INFO));
+   fieldList.add(new 
PojoField(PojoSimpleRecord.class.getField("bar"), 
BasicTypeInfo.STRING_TYPE_INFO));
+   fieldList.add(new 
PojoField(PojoSimpleRecord.class.getField("arr"),
+   BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO));
+
+   ParquetPojoInputFormat pojoInputFormat =
+   new ParquetPojoInputFormat(path, 
messageType, new PojoTypeInfo(
 
 Review comment:
   A `PojoTypeInfo` can be created with `Types.POJO(PojoSimpleRecord.class)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252014383
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
 ##
 @@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends from {@link GroupConverter} to convert an nested Parquet Record 
into Row.
+ */
+public class RowConverter extends GroupConverter implements ParentDataHolder {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(RowConverter.class);
+   private final Converter[] converters;
+   private final ParentDataHolder parentDataHolder;
+   private final TypeInformation typeInfo;
+   private Row currentRow;
+   private int posInParentRow;
+
+   public RowConverter(MessageType messageType, TypeInformation 
typeInfo) {
+   this(messageType, typeInfo, null, 0);
+   }
+
+   public RowConverter(GroupType schema, TypeInformation typeInfo, 
ParentDataHolder parent, int pos) {
+   this.typeInfo = typeInfo;
+   this.parentDataHolder = parent;
+   this.posInParentRow = pos;
+   this.converters = new Converter[schema.getFieldCount()];
+
+   int i = 0;
+   if (typeInfo.getArity() >= 1 && (typeInfo instanceof 
CompositeType)) {
+   for (Type field : schema.getFields()) {
+   converters[i] = createConverter(field, i, 
((CompositeType) typeInfo).getTypeAt(i), this);
+   i++;
+   }
+   }
+   }
+
+   private static Converter createConverter(
+   Type field,
+   int fieldPos,
+   TypeInformation typeInformation,
+   ParentDataHolder parentDataHolder) {
+   if (field.isPrimitive()) {
+   return new RowConverter.RowPrimitiveConverter(field, 
parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof MapTypeInfo) {
+   return new RowConverter.MapConverter((GroupType) field, 
(MapTypeInfo) typeInformation,
+   parentDataHolder, fieldPos);
+   } else if (typeInformation instanceof BasicArrayTypeInfo) {
+   Type elementType = 
field.asGroupType().getFields().get(0);
+   Class typeClass = ((BasicArrayTypeInfo) 
typeInformation).getComponentInfo().getTypeClass();
+   if (typeClass.equals(Character.class)) {
+   return new 
RowConverter.BasicArrayConverter((BasicArrayTypeInfo) 
typeInformation, elementType,
+   Character.class, parentDataHolder, 
fieldPos);
+   } else if (typeClass.equals(Boolean.class)) {
+   return new 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252032139
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.generated.SimpleRecord;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Simple test case for reading {@link org.apache.flink.types.Row} from 
Parquet files.
+ */
+public class ParquetRowInputFormatTest {
 
 Review comment:
   I think it would be good to add tests for the following cases:
   * Reading multiple records and validating the correctness of the data (the 
fault tolerance test is reading multiple records but not checking the returned 
values)
   * Test multiple files, each file should be handled as a separate split
   * Test projection (`ParquetRowInputFormat.selectFields()`)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252028083
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java
 ##
 @@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.pojo.PojoSimpleRecord;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test cases for reading Pojo from Parquet files.
+ */
+public class ParquetPojoInputFormatTest {
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new 
AvroSchemaConverter();
+
+   @ClassRule
+   public static TemporaryFolder temp = new TemporaryFolder();
+
+   @Test
+   public void testReadPojoFromSimpleRecord() throws IOException, 
NoSuchFieldException {
 
 Review comment:
   Add test with projection: `ParquetPojoInputFormat.selectFields(...)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2019-01-29 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252026568
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test cases for reading Map from Parquet files.
+ */
+public class ParquetMapInputFormatTest {
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new 
AvroSchemaConverter();
+
+   @ClassRule
+   public static TemporaryFolder temp = new TemporaryFolder();
+
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testReadMapFromNestedRecord() throws IOException {
+   temp.create();
+   Tuple3, SpecificRecord, Row> 
nested = TestUtil.getNestedRecordTestData();
+   Path path = TestUtil.createTempParquetFile(temp, 
TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
+   MessageType nestedType = 
SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+   ParquetMapInputFormat mapInputFormat = new 
ParquetMapInputFormat(path, nestedType);
+
+   RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
+   
Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
+   .when(mockContext).getMetricGroup();
+   mapInputFormat.setRuntimeContext(mockContext);
+   FileInputSplit[] splits = mapInputFormat.createInputSplits(1);
+   assertEquals(1, splits.length);
+   mapInputFormat.open(splits[0]);
+
+   Map map = mapInputFormat.nextRecord(null);
+   assertNotNull(map);
+   assertArrayEquals((Long[]) map.get("arr"), (Long[]) 
nested.f2.getField(3));
+   assertArrayEquals((String[]) map.get("strArray"), (String[]) 
nested.f2.getField(4));
+
+   Map mapItem = (Map) ((Map) 
map.get("nestedMap")).get("mapItem");
 
 Review comment:
   check size of mapItem


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-2491) Checkpointing only works if all operators/tasks are still running

2019-01-29 Thread JC (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755385#comment-16755385
 ] 

JC commented on FLINK-2491:
---

I also came across this problem trying exactly the same as [~canhuang] , i want 
to expand the parallelism beyond my amount of kafka partitions, i am using 
Flink 1.5.4 with Apache Beam.

Thanks

> Checkpointing only works if all operators/tasks are still running
> -
>
> Key: FLINK-2491
> URL: https://issues.apache.org/jira/browse/FLINK-2491
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 0.10.0
>Reporter: Robert Metzger
>Assignee: Márton Balassi
>Priority: Critical
> Attachments: fix_checkpoint_not_working_if_tasks_are_finished.patch
>
>
> While implementing a test case for the Kafka Consumer, I came across the 
> following bug:
> Consider the following topology, with the operator parallelism in parentheses:
> Source (2) --> Sink (1).
> In this setup, the {{snapshotState()}} method is called on the source, but 
> not on the Sink.
> The sink receives the generated data.
> only one of the two sources is generating data.
> I've implemented a test case for this, you can find it here: 
> https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jgrier commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster

2019-01-29 Thread GitBox
jgrier commented on a change in pull request #7099: [FLINK-10887] [jobmaster] 
Add source watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#discussion_r251966785
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -1000,6 +1005,21 @@ public void notifyAllocationFailure(AllocationID 
allocationID, Exception cause)
internalFailAllocation(allocationID, cause);
}
 
+   @Override
+   public CompletableFuture updateAggregate(String aggregateName, 
Object aggregand, byte[] serializedAggregateFunction)
+   throws IOException, ClassNotFoundException {
+
+   AggregateFunction aggregateFunction = 
InstantiationUtil.deserializeObject(serializedAggregateFunction, 
userCodeLoader);
+
+   Object accumulator = accumulators.get(aggregateName);
 
 Review comment:
   Also:  
https://github.com/jgrier/flink/blob/d9b28e817351eb2eb6b4cdd9597061713d9160e8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java#L47-L47


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11459) Presto S3 does not show errors due to missing credentials with minio

2019-01-29 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-11459:

Description: 
It seems that when using minio for S3-like storage and with mis-configurations 
such as missing (maybe also wrong) credentials gets into a failing state but 
with no reason for it:
{code}
...
2019-01-29 15:43:27,676 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: taskmanager.heap.mb, 353
2019-01-29 15:43:27,738 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.heap.mb, 429
2019-01-29 15:43:27,758 INFO  org.apache.flink.api.java.ExecutionEnvironment
   [] - The job has 0 registered types and 0 default Kryo serializers
2019-01-29 15:43:29,943 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2093.606], CredentialsRequestTime=[2092.961], 
2019-01-29 15:43:29,956 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2115.551], 
2019-01-29 15:43:31,946 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[3597.992], CredentialsRequestTime=[3597.788], 
2019-01-29 15:43:31,958 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[3610.417], 
2019-01-29 15:43:33,954 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2907.39], CredentialsRequestTime=[2906.853], 
2019-01-29 15:43:33,963 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2917.786], 
2019-01-29 15:43:36,133 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2005.692], CredentialsRequestTime=[2004.942], 
2019-01-29 15:43:36,156 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2029.473], 
2019-01-29 15:43:38,142 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2077.053], CredentialsRequestTime=[2076.05], 
2019-01-29 15:43:38,164 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2092.878], 
2019-01-29 15:43:42,181 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2005.91], CredentialsRequestTime=[2005.164], 
2019-01-29 15:43:42,186 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2011.204], 
2019-01-29 15:43:44,262 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2007.886], CredentialsRequestTime=[2007.165], 
2019-01-29 15:43:44,276 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2024.312], 
2019-01-29 15:43:44,585 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED 
SIGNAL 15: SIGTERM. Shutting down as requested.
2019-01-29 15:43:44,628 INFO  org.apache.flink.runtime.blob.TransientBlobCache  
   [] - Shutting down BLOB cache
2019-01-29 15:43:44,661 INFO  org.apache.flink.runtime.blob.BlobServer  
   [] - Stopped BLOB server at 0.0.0.0:6124
{code}

With AWS S3, it is actually printing an exception instead:
{code}
2019-01-29 19:24:39,968 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: rest.port, 8081
2019-01-29 19:24:39,990 INFO  org.apache.flink.api.java.ExecutionEnvironment
- The job has 0 registered types and 0 default Kryo serializers
2019-01-29 19:24:43,117 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2047.535], CredentialsRequestTime=[2033.619], 
2019-01-29 19:24:43,118 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2049.826], 
2019-01-29 19:24:46,215 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2003.168], CredentialsRequestTime=[2002.836], 
2019-01-29 19:24:46,216 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2004.182], 
2019-01-29 19:24:50,384 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2003.15], CredentialsRequestTime=[2002.803], 
2019-01-29 19:24:50,384 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2004.308], 
2019-01-29 19:24:56,691 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2002.596], CredentialsRequestTime=[2002.45], 
2019-01-29 19:24:56,691 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2003.177], 
2019-01-29 19:25:07,058 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2003.26], 

[GitHub] jgrier commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster

2019-01-29 Thread GitBox
jgrier commented on a change in pull request #7099: [FLINK-10887] [jobmaster] 
Add source watermark tracking to the JobMaster
URL: https://github.com/apache/flink/pull/7099#discussion_r251964188
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -1000,6 +1005,21 @@ public void notifyAllocationFailure(AllocationID 
allocationID, Exception cause)
internalFailAllocation(allocationID, cause);
}
 
+   @Override
+   public CompletableFuture updateAggregate(String aggregateName, 
Object aggregand, byte[] serializedAggregateFunction)
+   throws IOException, ClassNotFoundException {
+
+   AggregateFunction aggregateFunction = 
InstantiationUtil.deserializeObject(serializedAggregateFunction, 
userCodeLoader);
+
+   Object accumulator = accumulators.get(aggregateName);
 
 Review comment:
   I believe it's already all synchronized.  The RpcService is implemented as 
as a single Akka actor and thus access is already serialized.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11459) Presto S3 does not show errors due to missing credentials with minio

2019-01-29 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-11459:
---

 Summary: Presto S3 does not show errors due to missing credentials 
with minio
 Key: FLINK-11459
 URL: https://issues.apache.org/jira/browse/FLINK-11459
 Project: Flink
  Issue Type: Bug
  Components: filesystem-connector
Affects Versions: 1.6.2
Reporter: Nico Kruber


It seems that when using minio for S3-like storage and with mis-configurations 
such as missing (maybe also wrong) credentials gets into a failing state but 
with no reason for it:
{code}
...
2019-01-29 15:43:27,676 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: taskmanager.heap.mb, 353
2019-01-29 15:43:27,738 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.heap.mb, 429
2019-01-29 15:43:27,758 INFO  org.apache.flink.api.java.ExecutionEnvironment
   [] - The job has 0 registered types and 0 default Kryo serializers
2019-01-29 15:43:29,943 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2093.606], CredentialsRequestTime=[2092.961], 
2019-01-29 15:43:29,956 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2115.551], 
2019-01-29 15:43:31,946 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[3597.992], CredentialsRequestTime=[3597.788], 
2019-01-29 15:43:31,958 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[3610.417], 
2019-01-29 15:43:33,954 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2907.39], CredentialsRequestTime=[2906.853], 
2019-01-29 15:43:33,963 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2917.786], 
2019-01-29 15:43:36,133 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2005.692], CredentialsRequestTime=[2004.942], 
2019-01-29 15:43:36,156 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2029.473], 
2019-01-29 15:43:38,142 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2077.053], CredentialsRequestTime=[2076.05], 
2019-01-29 15:43:38,164 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2092.878], 
2019-01-29 15:43:42,181 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2005.91], CredentialsRequestTime=[2005.164], 
2019-01-29 15:43:42,186 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2011.204], 
2019-01-29 15:43:44,262 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2007.886], CredentialsRequestTime=[2007.165], 
2019-01-29 15:43:44,276 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2024.312], 
2019-01-29 15:43:44,585 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED 
SIGNAL 15: SIGTERM. Shutting down as requested.
2019-01-29 15:43:44,628 INFO  org.apache.flink.runtime.blob.TransientBlobCache  
   [] - Shutting down BLOB cache
2019-01-29 15:43:44,661 INFO  org.apache.flink.runtime.blob.BlobServer  
   [] - Stopped BLOB server at 0.0.0.0:6124
{code}

With AWS S3, it is actually printing an exception instead:
{code}
2019-01-29 19:24:39,968 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: rest.port, 8081
2019-01-29 19:24:39,990 INFO  org.apache.flink.api.java.ExecutionEnvironment
- The job has 0 registered types and 0 default Kryo serializers
2019-01-29 19:24:43,117 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2047.535], CredentialsRequestTime=[2033.619], 
2019-01-29 19:24:43,118 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2049.826], 
2019-01-29 19:24:46,215 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2003.168], CredentialsRequestTime=[2002.836], 
2019-01-29 19:24:46,216 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2004.182], 
2019-01-29 19:24:50,384 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2003.15], CredentialsRequestTime=[2002.803], 
2019-01-29 19:24:50,384 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2004.308], 
2019-01-29 19:24:56,691 INFO  
org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - 
ClientExecuteTime=[2002.596], CredentialsRequestTime=[2002.45], 
2019-01-29 19:24:56,691 INFO  

[GitHub] stevenzwu closed pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-29 Thread GitBox
stevenzwu closed pull request #7020: [FLINK-10774] [Kafka] connection leak when 
partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-29 Thread GitBox
stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when 
partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#issuecomment-458650005
 
 
   @tzulitai thx. let me close this PR then. @tillrohrmann  can submit a diff 
PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-29 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r251924624
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   I think that this comment of mine might be partially invalid. The only 
optimisation rule that we support around this logical node is 
`CalcUpsertToRetractionTransposeRule` which doesn't use this accept. But I 
guess for the sake of the consistency, it should be implemented one way or 
another, for example by simply `throw new UnsupportedOperationException()`. 
Otherwise this is a "silent" land mine waiting for someone to step in?
   
   Or am I missing something?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-29 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r251922093
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), 
keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
+val child = this.getInput
+val rowCnt = mq.getRowCount(child)
+// take rowCnt and fieldCnt into account, so that cost will be smaller 
when generate
+// UpsertToRetractionConverter after Calc.
+planner.getCostFactory.makeCost(rowCnt, rowCnt * 
child.getRowType.getFieldCount, 0)
 
 Review comment:
   As far as I can see `FlinkLogicalNativeTableScan` is the same to what I 
proposed in the comment above, right? The only difference is 
`estimateRowSize(child.getRowType)` vs `estimateRowSize(getRowType)`, which 
probably both are equivalent in this case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-11042) testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is an invalid test

2019-01-29 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755198#comment-16755198
 ] 

Piotr Nowojski edited comment on FLINK-11042 at 1/29/19 4:48 PM:
-

I am waiting for someone to review my PR that drops this test. I asked 
[~tzulitai] but if you can do it, you could do it as well :)


was (Author: pnowojski):
I was waiting for someone to review my PR that drops this test. I asked 
[~tzulitai] but if you can do it, you could do it as well :)

> testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is an invalid 
> test
> ---
>
> Key: FLINK-11042
> URL: https://issues.apache.org/jira/browse/FLINK-11042
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.8.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> main point of testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is 
> to fail transaction coordinator (by using 
> {{kafkaProducer.getTransactionCoordinatorId();}} ) and we expect that this 
> will cause failure of Flink job. However that's not always the case. Maybe 
> because transaction coordinator can be re-elected before {{KafkaProducer}} 
> even notices it or for whatever the reason, sometimes the failure is not 
> happening.
> Because of a bug in the test, if failure hasn't happened, the test will not 
> fail.
> Generally speaking this test is invalid and should be dropped.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11042) testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is an invalid test

2019-01-29 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755198#comment-16755198
 ] 

Piotr Nowojski commented on FLINK-11042:


I was waiting for someone to review my PR that drops this test. I asked 
[~tzulitai] but if you can do it, you could do it as well :)

> testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is an invalid 
> test
> ---
>
> Key: FLINK-11042
> URL: https://issues.apache.org/jira/browse/FLINK-11042
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.8.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> main point of testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is 
> to fail transaction coordinator (by using 
> {{kafkaProducer.getTransactionCoordinatorId();}} ) and we expect that this 
> will cause failure of Flink job. However that's not always the case. Maybe 
> because transaction coordinator can be re-elected before {{KafkaProducer}} 
> even notices it or for whatever the reason, sometimes the failure is not 
> happening.
> Because of a bug in the test, if failure hasn't happened, the test will not 
> fail.
> Generally speaking this test is invalid and should be dropped.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…

2019-01-29 Thread GitBox
pnowojski commented on a change in pull request #7570: [FLINK-11422] Prefer 
testing class to mock StreamTask in AbstractStre…
URL: https://github.com/apache/flink/pull/7570#discussion_r251916174
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
 ##
 @@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.MockStreamStatusMaintainer;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * A builder of {@link MockStreamTask}.
+ */
+public class MockStreamTaskBuilder {
+   private final Environment environment;
+   private String name;
+   private Object checkpointLock;
+   private StreamConfig config;
+   private ExecutionConfig executionConfig;
+   private CloseableRegistry closableRegistry;
+   private StreamStatusMaintainer streamStatusMaintainer;
+   private CheckpointStorage checkpointStorage;
+   private ProcessingTimeService processingTimeService;
+   private StreamTaskStateInitializer streamTaskStateInitializer;
+   private BiConsumer handleAsyncException;
+   private Map> accumulatorMap;
+
+   public MockStreamTaskBuilder(Environment environment) throws Exception {
+   // obligatory parameters
+   this.environment = environment;
+
+   // default values
 
 Review comment:
   nit: you could inline the default values with the fields definitions like
   ```
   private String name = "Mock Task"
   ```
   it would simplify/shorten the code by ~15 lines.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11333) First-class support for Protobuf types with evolvable schema

2019-01-29 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755185#comment-16755185
 ] 

Yun Tang commented on FLINK-11333:
--

I planed to use avro-protobuf to extract Protobuf message's schema to verify 
whether schema evolvable in avro's view. However,  from [the 
comment|https://github.com/confluentinc/schema-registry/pull/672#issuecomment-393348920]
 described, avro defines different rules for compatibility. For example, Long 
cannot be treated as compatibly with Integer for avro, while protobuf could. 
And I write a simple program to verify this, there really exists difference.

I'm afraid the idea to use avro to judge whether two protobuf message 
compatible is not correct. As far as I could see, check protobuf message 
compatibility seems a non-trivial work, we might have to leave schema evolution 
check of protobuf during job runtime.

> First-class support for Protobuf types with evolvable schema
> 
>
> Key: FLINK-11333
> URL: https://issues.apache.org/jira/browse/FLINK-11333
> Project: Flink
>  Issue Type: Sub-task
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Yun Tang
>Priority: Major
>
> I think we have more and more users who are thinking about using Protobuf for 
> their state types.
> Right now, Protobuf isn't supported directly in Flink. The only way to use 
> Protobuf for a type is to register it via Kryo: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html.
> Likewise for Avro types, we should be able to natively support Protobuf, 
> having a {{ProtobufSerializer}} that handles serialization of Protobuf types. 
> The serializer should also write necessary information in its snapshot, to 
> enable schema evolution for it in the future. For Protobuf, this should 
> almost work out-of-the-box.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-29 Thread GitBox
pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 
can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-458609356
 
 
   Sorry, I forgot to respond to that part:
   > I have started to work on the last test FlinkKafkaProducer011 --> 
FlinkKafkaProducer  But I'm quickly stuck with an exception when initializing 
the testHarness ( created with FlinkKafkaProducer ) with the savepoint of 
FlinkKakfaProducer011. Is this action feasible with the testHarness API or 
there is a turn around to this issue ?
   
   I don't know. I'm not aware of any test that attempts to do that, so maybe 
this is a missing feature and maybe you are right that it has to be fixed 
somehow for the test.
   
   I have also thought more about the issue with timeouts on pending 
transactions. This means that the only way to test for 1.7 -> 1.8 migration 
with pending transactions, would be some awful code that downloads the Flink 
1.7 sources/binaries and creates a savepoint during the test execution. I think 
that would be extremely difficult to do so we can ignore this issue for now... 
That would leave us with the following tests that theoretically we could 
implement:
   1. migration of `FlinkKafkaProducer` from pre-made 1.7 savepoint to master 
without pending transactions
   2. migration of `FlinkKafkaProducer011` from pre-made 1.7 savepoint to 
master without pending transactions
   3. migration from  `FlinkKafkaProducer011` master savepoint to 
`FlinkKafkaProducer` master without pending transactions
   4. (optional) migration from  `FlinkKafkaProducer011` pre-made 1.7 savepoint 
to `FlinkKafkaProducer` master without pending transactions
   5. (optional) migration `FlinkKafkaProducer011` -> `FlinkKafkaProducer` from 
savepoint created on demand (during unit test) from master to master versions, 
with pending transactions 
   6. (optional) upgrading Kafka brokers when using `FlinkKafkaProducer` from 
savepoint created on demand (during unit test) from master to master versions, 
with pending transactions
   
   1, 2 and 3 are IMO must have. There is a chance that Flink 1.8 will support 
"[stop with savepoint](https://issues.apache.org/jira/browse/FLINK-11458)" 
feature, which would mean that there would be no pending transactions in 
savepoints. With that guarantee, 1, 2 and 3 would essentially cover all of the 
required upgrade paths. Story of upgrading from 1.7 `FlinkKafkaProducer011` to 
1.8 `FlinkKafkaProducer` would be covered first by step 1, then by step 3. 
   3, 4 and 5. have the issue that you reported that probably test harness 
needs to be adjusted
   6. will have yet another issue of handling two different.
   4, 5 and 6. would be nice to have, but as long as "[stop with 
savepoint](https://issues.apache.org/jira/browse/FLINK-11458)" is there, they 
are not strictly required.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11458) Ensure sink commit side-effects when cancelling with savepoint.

2019-01-29 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-11458:
--

 Summary: Ensure sink commit side-effects when cancelling with 
savepoint.
 Key: FLINK-11458
 URL: https://issues.apache.org/jira/browse/FLINK-11458
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.8.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.8.0


TBD.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11419) StreamingFileSink fails to recover after taskmanager failure

2019-01-29 Thread Edward Rojas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755180#comment-16755180
 ] 

Edward Rojas commented on FLINK-11419:
--

[~kkl0u] I created the PR, you can review it now

> StreamingFileSink fails to recover after taskmanager failure
> 
>
> Key: FLINK-11419
> URL: https://issues.apache.org/jira/browse/FLINK-11419
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.7.1
>Reporter: Edward Rojas
>Assignee: Edward Rojas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.2
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> If a job with a StreamingFileSink sending data to HDFS is running in a 
> cluster with multiple taskmanagers and the taskmanager executing the job goes 
> down (for some reason), when the other task manager start executing the job, 
> it fails saying that there is some "missing data in tmp file" because it's 
> not able to perform a truncate in the file.
>  Here the full stack trace:
> {code:java}
> java.io.IOException: Missing data in tmp file: 
> hdfs://path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:93)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  Failed to TRUNCATE_FILE 
> /path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191
>  for DFSClient_NONMAPREDUCE_-2103482360_62 on x.xxx.xx.xx because this file 
> lease is currently owned by DFSClient_NONMAPREDUCE_1834204750_59 on x.xx.xx.xx
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3190)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2282)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2228)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2198)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1056)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
>   at 

[GitHub] pnowojski commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-29 Thread GitBox
pnowojski commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not 
be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-458609356
 
 
   Sorry, I forgot to respond to that part:
   > I have started to work on the last test FlinkKafkaProducer011 --> 
FlinkKafkaProducer  But I'm quickly stuck with an exception when initializing 
the testHarness ( created with FlinkKafkaProducer ) with the savepoint of 
FlinkKakfaProducer011. Is this action feasible with the testHarness API or 
there is a turn around to this issue ?
   
   I don't know. I'm not aware of any test that attempts to do that, so maybe 
this is a missing feature and maybe you are right that it has to be fixed 
somehow for the test.
   
   I have also thought more about the issue with timeouts on pending 
transactions. This means that the only way to test for 1.7 -> 1.8 migration 
with pending transactions, would be some awful code that downloads the Flink 
1.7 sources/binaries and creates a savepoint during the test execution. I think 
that would be extremely difficult to do so we can ignore this issue for now... 
That would leave us with the following tests that theoretically we could 
implement:
   1. migration of `FlinkKafkaProducer` from pre-made 1.7 savepoint to master 
without pending transactions
   2. migration of `FlinkKafkaProducer011` from pre-made 1.7 savepoint to 
master without pending transactions
   3. migration from  `FlinkKafkaProducer011` master savepoint to 
`FlinkKafkaProducer` master without pending transactions
   4. (optional) migration from  `FlinkKafkaProducer011` pre-made 1.7 savepoint 
to `FlinkKafkaProducer` master without pending transactions
   5. (optional) migration `FlinkKafkaProducer011` -> `FlinkKafkaProducer` from 
savepoint created on demand (during unit test) from master to master versions, 
with pending transactions 
   6. (optional) upgrading Kafka brokers when using `FlinkKafkaProducer` from 
savepoint created on demand (during unit test) from master to master versions, 
with pending transactions
   
   1, 2 and 3 are IMO must have. There is a chance that Flink 1.8 will support 
"stop with savepoint" feature, which would mean that there would be no pending 
transactions in savepoints. With that guarantee, 1, 2 and 3 would essentially 
cover all of the required upgrade paths. Story of upgrading from 1.7 
`FlinkKafkaProducer011` to 1.8 `FlinkKafkaProducer` would be covered first by 
step 1, then by step 3. 
   3, 4 and 5. have the issue that you reported that probably test harness 
needs to be adjusted
   6. will have yet another issue of handling two different.
   4, 5 and 6. would be nice to have, but as long as "stop with savepoint" is 
there, they are not strictly required.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tvielgouarin commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-29 Thread GitBox
tvielgouarin commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can 
not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-458604699
 
 
   I've manage to make the migration test from FlinkKakfaProducer011 --> 
FlinkKafkaProducer work. I will make the PR for those migration test. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11457) PrometheusPushGatewayReporter either overwrites its own metrics or creates too may labels

2019-01-29 Thread Oscar Westra van Holthe - Kind (JIRA)
Oscar Westra van Holthe - Kind created FLINK-11457:
--

 Summary: PrometheusPushGatewayReporter either overwrites its own 
metrics or creates too may labels
 Key: FLINK-11457
 URL: https://issues.apache.org/jira/browse/FLINK-11457
 Project: Flink
  Issue Type: Bug
Reporter: Oscar Westra van Holthe - Kind


When using the PrometheusPushGatewayReporter, one has two options:
 * Use a fixed job name, which causes the jobmanager and taskmanager to 
overwrite each others metrics (i.e. last write wins, and you lose a lot of 
metrics)
 * Use a random suffix for the job name, which creates a lot of labels that 
have to be cleaned up manually

The manual cleanup should not be necessary, but happens nonetheless when using 
a yarn cluster.

A fix could be to add a suffix the job name, naming the nodes in a non-random 
manner like: {{myjob_jm0}}, {{my_job_tm1}}, {{my_job_tm1}}, {{my_job_tm2}}, 
{{my_job_tm3}}, {{my_job_tm4}}, ..., using a counter (not sure if such is 
available), or some other stable (!) suffix.

Related discussion: FLINK-9187

 

Any thoughts on a solution? I'm happy to implement it, but Im not sure what the 
best solution would be.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11456) Improve window operator with sliding window assigners

2019-01-29 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11456:
-

 Summary: Improve window operator with sliding window assigners
 Key: FLINK-11456
 URL: https://issues.apache.org/jira/browse/FLINK-11456
 Project: Flink
  Issue Type: Sub-task
  Components: DataStream API
Reporter: Rong Rong


With Slicing and merging operators that exposes the internals of window 
operators. current sliding window can be improved by eliminating duplicate 
aggregations or duplicate element insert into multiple panes (e.g. namespaces). 

The following sliding window operation
{code:java}
val resultStream: DataStream = inputStream
  .keyBy("key")
  .window(SlidingEventTimeWindow.of(Time.seconds(5L), Time.seconds(15L)))
  .sum("value")
{code}
can produce job graph equivalent to
{code:java}
val resultStream: DataStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))
  .sum("value")
  .slideOver(Count.of(3))
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11455) Support evictor operations on slicing and merging operators

2019-01-29 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11455:
-

 Summary: Support evictor operations on slicing and merging 
operators
 Key: FLINK-11455
 URL: https://issues.apache.org/jira/browse/FLINK-11455
 Project: Flink
  Issue Type: Sub-task
Reporter: Rong Rong


The original implementation POC of SliceStream and MergeStream does not 
considere evicting window operations. this support can be further expanded in 
order to cover multiple timeout duration session windows. See 
[https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit#heading=h.ihxm3alf3tk0.]
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11454) Support MergedStream operation

2019-01-29 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11454:
-

 Summary: Support MergedStream operation
 Key: FLINK-11454
 URL: https://issues.apache.org/jira/browse/FLINK-11454
 Project: Flink
  Issue Type: Sub-task
  Components: DataStream API
Reporter: Rong Rong


Following SlicedStream, the mergedStream operator merges results from sliced 
stream and produces windowing results.
{code:java}
val slicedStream: SlicedStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))   // new “slice window” concept: to 
combine 
   // tumble results based on discrete
   // non-overlapping windows.
  .aggregate(aggFunc)

val mergedStream1: MergedStream = slicedStream
  .slideOver(Time.second(10L)) // combine slice results with same   
 
   // windowing function, equivalent to 
   // WindowOperator with an aggregate 
state 
   // and derived aggregate function.

val mergedStream2: MergedStream = slicedStream
  .slideOver(Count.of(5))
  .apply(windowFunction)   // apply a different window function 
over  
   // the sliced results.{code}
MergedStream are produced by MergeOperator. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11453) Support SliceStream with forwardable pane info

2019-01-29 Thread Rong Rong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-11453:
--
Summary: Support SliceStream with forwardable pane  info  (was: Support 
SliceWindow with forwardable pane  info)

> Support SliceStream with forwardable pane  info
> ---
>
> Key: FLINK-11453
> URL: https://issues.apache.org/jira/browse/FLINK-11453
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Rong Rong
>Priority: Major
>
> Support slicing operation that produces slicing:
> {code:java}
> val slicedStream: SlicedStream = inputStream
>   .keyBy("key")
>   .sliceWindow(Time.seconds(5L))   // new “slice window” concept: to 
> combine 
>// tumble results based on discrete
>// non-overlapping windows.
>   .aggregate(aggFunc)
> {code}
> {{SlicedStream}} will produce results that exposes current {{WindowOperator}} 
> internal state {{InternalAppendingState}}, which can be 
> later applied with {{WindowFunction}} separately in another operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11453) Support SliceWindow with forwardable pane info

2019-01-29 Thread Rong Rong (JIRA)
Rong Rong created FLINK-11453:
-

 Summary: Support SliceWindow with forwardable pane  info
 Key: FLINK-11453
 URL: https://issues.apache.org/jira/browse/FLINK-11453
 Project: Flink
  Issue Type: Sub-task
  Components: DataStream API
Reporter: Rong Rong


Support slicing operation that produces slicing:
{code:java}
val slicedStream: SlicedStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))   // new “slice window” concept: to 
combine 
   // tumble results based on discrete
   // non-overlapping windows.
  .aggregate(aggFunc)
{code}
{{SlicedStream}} will produce results that exposes current {{WindowOperator}} 
internal state {{InternalAppendingState}}, which can be 
later applied with {{WindowFunction}} separately in another operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…

2019-01-29 Thread GitBox
TisonKun commented on issue #7570: [FLINK-11422] Prefer testing class to mock 
StreamTask in AbstractStre…
URL: https://github.com/apache/flink/pull/7570#issuecomment-458574322
 
 
   @pnowojski thanks for your advice! Address your comments. Also extract 
`MockStreamStatusMaintainer` to deduplicate codes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11266) Only release hadoop-free Flink

2019-01-29 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-11266.

  Resolution: Fixed
Release Note: 
Convenience binaries that include hadoop are no longer released.

If a deployment relies on flink-shaded-hadoop2 being included in flink-dist, 
then it must be manually downloaded and copied into the /lib directory.
Alternatively, a Flink distribution that includes hadoop can be built by 
packaging flink-dist and activating the "include-hadoop" maven profile.

As hadoop is no longer included in flink-dist by default, specifying 
"-DwithoutHadoop" when packaging flink-dist no longer impacts the build.

> Only release hadoop-free Flink
> --
>
> Key: FLINK-11266
> URL: https://issues.apache.org/jira/browse/FLINK-11266
> Project: Flink
>  Issue Type: Improvement
>  Components: Release System
>Affects Versions: 1.8.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.8.0
>
>
> Currently we release 10 different binary releases (2 scala versions * (4 
> hadoop version + hadoop-free)), which has increased the size of our release 
> to more than 2 GB.
> Naturally, building Flink 10 times also takes a while, slowing down the 
> release process.
> However, the only difference between the hadoop versions is the bundled 
> {{flink-shaded-hadoop2}} jar; the rest is completely identical.
> I propose to stop releasing hadoop-specific distributions, and instead have 
> us release multiple versions of {{flink-shaded-hadoop2}} that users copy into 
> the hadoop-free distribution if required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest

2019-01-29 Thread GitBox
GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and 
remove TaskManagerConfigurationTest
URL: https://github.com/apache/flink/pull/7525#discussion_r251869623
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
 ##
 @@ -16,125 +16,97 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.taskmanager;
+package org.apache.flink.runtime.taskexecutor;
 
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.IOUtils;
 import org.junit.Rule;
 import org.junit.Test;
 
 import org.junit.rules.TemporaryFolder;
-import scala.Tuple2;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.URI;
-import java.util.Iterator;
 
 import static org.junit.Assert.*;
 
 /**
- * Validates that the TaskManager startup properly obeys the configuration
+ * Validates that the TaskManagerRunner startup properly obeys the 
configuration
  * values.
  *
  * NOTE: at least {@link #testDefaultFsParameterLoading()} should not be run 
in parallel to other
  * tests in the same JVM as it modifies a static (private) member of the 
{@link FileSystem} class
  * and verifies its content.
  */
 @NotThreadSafe
-public class TaskManagerConfigurationTest {
+public class TaskManagerRunnerConfigurationTest {
 
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@Test
-   public void testUsePreconfiguredNetworkInterface() throws Exception {
+   public void testUsePreconfiguredRpcService() throws Exception {
final String TEST_HOST_NAME = "testhostname";
 
Configuration config = new Configuration();
config.setString(TaskManagerOptions.HOST, TEST_HOST_NAME);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 7891);
 
-   HighAvailabilityServices highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-   config,
-   Executors.directExecutor(),
-   
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-   try {
-
-   Tuple2> address = 
TaskManager.selectNetworkInterfaceAndPortRange(config, 
highAvailabilityServices);
-
-   // validate the configured test host name
-   assertEquals(TEST_HOST_NAME, address._1());
-   } finally {
-   highAvailabilityServices.closeAndCleanupAllData();
-   }
-   }
-
-   @Test
-   public void testActorSystemPortConfig() throws Exception {
-   // config with pre-configured hostname to speed up tests (no 
interface selection)
-   Configuration config = new Configuration();
-   config.setString(TaskManagerOptions.HOST, "localhost");
-   config.setString(JobManagerOptions.ADDRESS, "localhost");
-   config.setInteger(JobManagerOptions.PORT, 7891);
-
HighAvailabilityServices highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Executors.directExecutor(),

HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
try {
// auto port
-   Iterator portsIter = 
TaskManager.selectNetworkInterfaceAndPortRange(config, 
highAvailabilityServices)._2();
-   assertTrue(portsIter.hasNext());
-   assertEquals(0, (int) portsIter.next());
+   RpcService rpcService = 
TaskManagerRunner.createRpcService(config, highAvailabilityServices);
+   assertTrue(rpcService.getPort() >= 0);
+   // pre-defined host name
+   assertEquals(TEST_HOST_NAME, rpcService.getAddress());
 
// pre-defined port
final int testPort = 22551;
config.setString(TaskManagerOptions.RPC_PORT, 
String.valueOf(testPort));
-
-   

[jira] [Comment Edited] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs

2019-01-29 Thread Miguel E. Coimbra (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710007#comment-16710007
 ] 

Miguel E. Coimbra edited comment on FLINK-10867 at 1/29/19 2:58 PM:


[~StephanEwen], I've been thinking how to go about this on my [own fork of 
Apache Flink|https://github.com/mcoimbra/flink] on GitHub. After having 
invested quite some time in the last two years using this platform and opening 
bug issues now and then, I'm interested in contributing something which I 
believe would be of use to the community

A complex task such as this would likely require a design document or something 
along those lines. However, what I've done so far was to read the Flink source 
code to understand what is necessary to change by tracing what the information 
flow would be, considering key moments:
 * Programmer invoking the {{.cache()}} function;
 * {{JobManager}} receiving a plan with a caching operator (the operator does 
not have a reference to a previous job at this time);
 * {{TaskManager}}(s) receiving indication that the 
{{org.apache.flink.core.memory.MemorySegment }}instances associated to the 
caching operator are to be kept as they are (in the job where caching will 
occur, their data must have originated from a previous operator which produced 
the data in the same job, e.g. {{Filter->Map->Cache}}). The operator would work 
like a regular {{DataSink}}, but instead of writing data, its action is to not 
evict the segments;
 * {{JobManager}} receiving a plan with a caching operator referencing 
previously-cached data.

 

Additional behavior properties:
 * Adding a parameter to decide how long the cached data is to be stored in the 
cluster. Number of jobs that the cached data should be persisted or amount of 
time? What would be desirable?
 * Would this imply that caching may only occur when executing in session mode 
so that the Flink job manager knows that it is caching specifically for a user?
 * The {{org.apache.flink.runtime.executiongraph.ExecutionJobVertex}} instances 
that depend on cached datasets could conceptually read their input from 
{{MemorySegment}} instances with the same logic as if reading  from disk.
 * Create a new COMPLETED_AND_CACHED job status to make this concept explicit 
as far as job management is concerned?

 

Besides the DataSet API (this part is the easier one), I've been thinking and 
perhaps it would be best to define an 
{{org.apache.flink.api.java.operators.MemorySinkOperator }}class to explicitly 
hold a reference to the previous job where caching occurred. The 
{{org.apache.flink.api.java.ExecutionEnvironment}} instance could note the 
references to this {{MemorySinkOperator}} instance and store in them the 
cluster job identification as an attribute. That way, when the client-side 
{{.execute()}} call finishes successfully, it would store the reference there 
so that the {{MemorySinkOperator}} operator reference can be reused in the next 
Flink job triggered by the user-level code.

The major packages for this functionality are:
 * {{org.apache.flink.optimizer}}
 * {{org.apache.flink.runtime}}
 * {{org.apache.flink.java:}}
 * {{org.apache.flink.client}}

 

 


was (Author: mcoimbra):
[~StephanEwen], I've been thinking how to go about this on my [own fork of 
Apache Flink|https://github.com/mcoimbra/flink] on GitHub. After having 
invested quite some time in the last two years using this platform and opening 
bug issues now and then, I'm interested in contributing something which I 
believe would be of use to the community

A complex task such as this would likely require a design document or something 
along those lines. However, what I've done so far was to read the Flink source 
code to understand what is necessary to change by tracing what the information 
flow would be, considering key moments:
 * Programmer invoking the {{.cache()}} function;
 * {{JobManager}} receiving a plan with a caching operator (the operator does 
not have a reference to a previous job at this time);
 * {{TaskManager}}(s) receiving indication that the 
{{org.apache.flink.core.memory.MemorySegment }}instances associated to the 
caching operator are to be kept as they are (in the job where caching will 
occur, their data must have originated from a previous operator which produced 
the data in the same job, e.g. {{Filter->Map->Cache}}). The operator would work 
like a regular {{DataSink}}, but instead of writing data, its action is to not 
evict the segments;
 * {{JobManager}} receiving a plan with a caching operator referencing 
previously-cached data.

 

Additional behavior properties:
 * Adding a parameter to decide how long the cached data is to be stored in the 
cluster. Number of jobs that the cached data should be persisted or amount of 
time? What would be desirable?
 * Would this imply that caching may only occur when executing in session mode 
so that the 

  1   2   3   >