[GitHub] twalthr commented on a change in pull request #7587: [FLINK-11064] [table] Setup a new flink-table module structure
twalthr commented on a change in pull request #7587: [FLINK-11064] [table] Setup a new flink-table module structure
URL: https://github.com/apache/flink/pull/7587#discussion_r252147236
File path: flink-table/flink-table-dist/pom.xml
@@ -0,0 +1,103 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table</artifactId>
		<version>1.8-SNAPSHOT</version>
		<relativePath>..</relativePath>
	</parent>

	<artifactId>flink-table-dist_${scala.binary.version}</artifactId>

Review comment: Sounds good to me.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252145959 ## File path: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala ## @@ -101,45 +101,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) // Serializer configuration snapshotting & compatibility // - override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = { -new ScalaOptionSerializerConfigSnapshot[A](elemSerializer) - } - - override def ensureCompatibility( - configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Option[A]] = { -configSnapshot match { - case optionSerializerConfigSnapshot - : ScalaOptionSerializerConfigSnapshot[A] => -ensureCompatibilityInternal(optionSerializerConfigSnapshot) - case legacyOptionSerializerConfigSnapshot - : OptionSerializer.OptionSerializerConfigSnapshot[A] => Review comment: Removing this path will lead to problems when restoring from Flink 1.3, because this snapshot class was used back in Flink 1.3. OTOH, it should be possible to redirect `OptionSerializerConfigSnapshot`'s compatibility check to the new `ScalaOptionSerializerSnapshot`.
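The redirect described above — keeping the deprecated snapshot class only as a restore path for old savepoints, and forwarding its compatibility check to the replacement snapshot — can be sketched with simplified stand-in types. Every name and interface below is illustrative only, not the actual Flink API:

```java
/** Minimal stand-ins for the serializer-snapshot machinery (illustrative, not Flink's API). */
enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

interface Serializer<T> { }

interface SerializerSnapshot<T> {
    Compatibility resolveCompatibility(Serializer<T> newSerializer);
}

class OptionSerializer<A> implements Serializer<A> { }

/** The new snapshot class: the single place where compatibility logic lives. */
class ScalaOptionSerializerSnapshotSketch<A> implements SerializerSnapshot<A> {
    @Override
    public Compatibility resolveCompatibility(Serializer<A> newSerializer) {
        // A real implementation would also check the nested element serializer.
        return (newSerializer instanceof OptionSerializer)
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.INCOMPATIBLE;
    }
}

/** Legacy snapshot kept only so old (Flink 1.3 era) savepoints still restore;
 *  it forwards the check instead of duplicating the logic. */
@Deprecated
class LegacyOptionSerializerConfigSnapshot<A> implements SerializerSnapshot<A> {
    @Override
    public Compatibility resolveCompatibility(Serializer<A> newSerializer) {
        return new ScalaOptionSerializerSnapshotSketch<A>().resolveCompatibility(newSerializer);
    }
}
```

With this shape, deleting the legacy class later only removes the restore path, never the compatibility logic itself.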
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252144982 ## File path: flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java ## @@ -31,6 +33,7 @@ * allow calling different base class constructors from subclasses, while we need that * for the default empty constructor. Review comment: nit: Add `@deprecated` message.
[jira] [Commented] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"
[ https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755785#comment-16755785 ] Dian Fu commented on FLINK-11447:
-
Thanks [~dawidwys] for sharing your thoughts. If we choose option 3, we need to add the following new interfaces to Table (maybe more in the future if more join types are supported):
{code:java}
join(udtf: String)
join(udtf: String, joinPredicate: String)
join(udtf: String, joinPredicate: Expression)
leftOuterJoin(udtf: String)
leftOuterJoin(udtf: String, joinPredicate: String)
leftOuterJoin(udtf: String, joinPredicate: Expression){code}
Actually, interfaces of this kind existed in Table previously and were removed in FLINK-6334 in favor of letting UDTF joins use the table.join(table) interface. Personally I prefer table.join(table) as it's more straightforward semantically. Thoughts?

> Deprecate "new Table(TableEnvironment, String)"
> ---
>
> Key: FLINK-11447
> URL: https://issues.apache.org/jira/browse/FLINK-11447
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: sunjincheng
> Priority: Major
>
> A more detailed description can be found in [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Once table is an interface we can easily replace the underlying implementation at any time. The constructor call prevents us from converting it into an interface.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version
wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version URL: https://github.com/apache/flink/pull/7599#issuecomment-458840957 @StefanRRichter Please help to review this PR, thanks :)
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252140553 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java ## @@ -618,63 +619,87 @@ public boolean canEqual(Object obj) { } @Override - public TypeSerializerConfigSnapshot> snapshotConfiguration() { - return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer); - } - - @Override - public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof UnionSerializerConfigSnapshot) { - List, TypeSerializerSnapshot>> previousSerializersAndConfigs = - ((UnionSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - CompatibilityResult oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousSerializersAndConfigs.get(0).f0, - UnloadableDummyTypeSerializer.class, - previousSerializersAndConfigs.get(0).f1, - oneSerializer); - - CompatibilityResult twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousSerializersAndConfigs.get(1).f0, - UnloadableDummyTypeSerializer.class, - previousSerializersAndConfigs.get(1).f1, - twoSerializer); - - if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new UnionSerializer<>( - new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()), - new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer(; - } - } - - return CompatibilityResult.requiresMigration(); + public TypeSerializerSnapshot> snapshotConfiguration() { + return new UnionSerializerSnapshot<>(this); } } /** * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}. Review comment: nit: Add `@deprecated` message
[GitHub] Myasuka removed a comment on issue #7595: [FLINK-11363][tests] Port TaskManagerConfigurationTest to new code base
Myasuka removed a comment on issue #7595: [FLINK-11363][tests] Port TaskManagerConfigurationTest to new code base URL: https://github.com/apache/flink/pull/7595#issuecomment-458839395 LGTM +1
[GitHub] igalshilman commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
igalshilman commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252140569 ## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java ## @@ -56,8 +57,8 @@ public String toString() { /** Serializer for Serializer. */ public static class Serializer extends CompositeSerializer> { Review comment: But still, I'd actually suggest adding that, to enforce good practices.
[GitHub] Myasuka commented on issue #7595: [FLINK-11363][tests] Port TaskManagerConfigurationTest to new code base
Myasuka commented on issue #7595: [FLINK-11363][tests] Port TaskManagerConfigurationTest to new code base URL: https://github.com/apache/flink/pull/7595#issuecomment-458839395 LGTM +1
[GitHub] igalshilman commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
igalshilman commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252139996 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ## @@ -126,15 +128,15 @@ private IS createState() throws Exception { @SuppressWarnings("unchecked") private IS createValueState() throws Exception { ValueStateDescriptor> ttlDescriptor = new ValueStateDescriptor<>( - stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer())); + stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); Review comment: I disagree, same reasons as above.
[GitHub] igalshilman commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
igalshilman commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252139840 ## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java ## @@ -56,8 +57,8 @@ public String toString() { /** Serializer for Serializer. */ public static class Serializer extends CompositeSerializer> { - public Serializer(TypeSerializer userValueSerializer) { - super(true, userValueSerializer, LongSerializer.INSTANCE); + public Serializer(TypeSerializer valueSerializer, TypeSerializer timestampSerializer) { Review comment: As far as I can tell this is not a user facing serializer, rather used internally by the `TtlSerializer`, and I think it is important to make it explicit that this is a composite serializer and these are the nested serializers that define it. As a way to reduce verbosity, we can add a static factory method with an explicit name. Feel free to decide for yourself :-)
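The "static factory method with an explicit name" idea suggested here can be sketched generically. All class and method names below are made up for illustration and are not Flink's real types:

```java
/** Stand-in nested serializer type; not a real Flink class. */
class FieldSerializer<T> {
    static final FieldSerializer<Long> LONG = new FieldSerializer<>();
}

/** The constructor stays fully explicit about its nested serializers, while a
 *  named factory keeps the common call site short and self-describing. */
class CompositeValueSerializer<T> {
    final FieldSerializer<T> valueSerializer;
    final FieldSerializer<Long> timestampSerializer;

    // Explicit composite constructor: every nested serializer is visible at the call site.
    CompositeValueSerializer(FieldSerializer<T> valueSerializer,
                             FieldSerializer<Long> timestampSerializer) {
        this.valueSerializer = valueSerializer;
        this.timestampSerializer = timestampSerializer;
    }

    // Explicitly named factory for the common case, reducing verbosity
    // without hiding that this is a composite of two serializers.
    static <T> CompositeValueSerializer<T> withLongTimestamp(FieldSerializer<T> valueSerializer) {
        return new CompositeValueSerializer<>(valueSerializer, FieldSerializer.LONG);
    }
}
```

Call sites then read `CompositeValueSerializer.withLongTimestamp(userSerializer)` instead of spelling out the timestamp serializer everywhere.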
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252139206 ## File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ## @@ -260,73 +262,80 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE } // - // Serializer configuration snapshotting & compatibility + // Serializer configuration snapshoting & compatibility // @Override - public RowSerializerConfigSnapshot snapshotConfiguration() { - return new RowSerializerConfigSnapshot(fieldSerializers); + public TypeSerializerSnapshot snapshotConfiguration() { + return new RowSerializerSnapshot(this); } - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof RowSerializerConfigSnapshot) { - List, TypeSerializerSnapshot>> previousFieldSerializersAndConfigs = - ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) { - boolean requireMigration = false; - TypeSerializer[] convertDeserializers = new TypeSerializer[fieldSerializers.length]; - - CompatibilityResult compatResult; - int i = 0; - for (Tuple2, TypeSerializerSnapshot> f : previousFieldSerializersAndConfigs) { - compatResult = CompatibilityUtil.resolveCompatibilityResult( - f.f0, - UnloadableDummyTypeSerializer.class, - f.f1, - fieldSerializers[i]); - - if (compatResult.isRequiresMigration()) { - requireMigration = true; - - if (compatResult.getConvertDeserializer() == null) { - // one of the field serializers cannot provide a fallback deserializer - return CompatibilityResult.requiresMigration(); - } else { - convertDeserializers[i] = - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()); - } - } + /** +* A snapshot for {@link RowSerializer}. +*/ Review comment: nit: Add `@deprecated` message.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252134315 ## File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java ## @@ -233,45 +247,25 @@ public int hashCode() { } @Override - public NullableSerializerConfigSnapshot snapshotConfiguration() { - return new NullableSerializerConfigSnapshot<>(originalSerializer); + public TypeSerializerSnapshot snapshotConfiguration() { + return new NullableSerializerSnapshot<>(this); } - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof NullableSerializerConfigSnapshot) { - List, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs = - ((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - CompatibilityResult compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousKvSerializersAndConfigs.get(0).f0, - UnloadableDummyTypeSerializer.class, - previousKvSerializersAndConfigs.get(0).f1, - originalSerializer); - - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new NullableSerializer<>( - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue())); - } - } - - return CompatibilityResult.requiresMigration(); - } /** * Configuration snapshot for serializers of nullable types, containing the * configuration snapshot of its original serializer. Review comment: nit: Add `@deprecated` message.
[GitHub] Myasuka commented on issue #7598: [FLINK-11333][protobuf] First-class serializer support for Protobuf types
Myasuka commented on issue #7598: [FLINK-11333][protobuf] First-class serializer support for Protobuf types URL: https://github.com/apache/flink/pull/7598#issuecomment-458831260 CC @tzulitai would you please review this part of code?
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252132629 ## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java ## @@ -56,8 +57,8 @@ public String toString() { /** Serializer for Serializer. */ public static class Serializer extends CompositeSerializer> { Review comment: Ah scratch that, just realized that this is only a serializer used in tests.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252130677 ## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java ## @@ -56,8 +57,8 @@ public String toString() { /** Serializer for Serializer. */ public static class Serializer extends CompositeSerializer> { Review comment: This serializer class previously did not have a `serialVersionUID` defined. It needs to be explicitly set to what it was before, because the serial version UID presumably changed when the new constructors were added. OTOH, a migration test for this serializer seems to be missing; it would have caught this problem.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252130677 ## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java ## @@ -56,8 +57,8 @@ public String toString() { /** Serializer for Serializer. */ public static class Serializer extends CompositeSerializer> { Review comment: This serializer class previously did not have a `serialVersionUID` defined. Need to explicitly set it to what it was before, to be on the safe side here.
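The `serialVersionUID` concern above comes from plain Java serialization: when no UID is pinned, the JVM derives one from the class's visible structure (including its constructors), so adding a constructor silently changes the computed UID and breaks restores of previously serialized instances. A minimal, Flink-free illustration, where the class name and the UID value `1L` are purely hypothetical:

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical stand-in for a serializer class that gets shipped in savepoints.
 *  Pinning serialVersionUID keeps Java deserialization stable even after new
 *  constructors or methods are added; without the pin, the default UID is
 *  recomputed from the class shape and old instances fail to restore. */
class PinnedUidSerializer implements Serializable {
    // Must match the value the class effectively had before it was changed;
    // 1L here is purely illustrative.
    private static final long serialVersionUID = 1L;

    PinnedUidSerializer() { }

    // A newly added constructor: harmless now that the UID is pinned.
    PinnedUidSerializer(String unusedExtraArg) { }
}

class UidCheck {
    /** ObjectStreamClass reports the UID that Java serialization will actually use. */
    static long effectiveUid() {
        return ObjectStreamClass.lookup(PinnedUidSerializer.class).getSerialVersionUID();
    }
}
```

A serializer migration test, as the reviewer notes, would catch exactly this: it restores bytes written by the old class version and fails if the effective UID drifted.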
[jira] [Created] (FLINK-11460) Remove useless AcknowledgeStreamMockEnvironment
zhijiang created FLINK-11460:
Summary: Remove useless AcknowledgeStreamMockEnvironment
Key: FLINK-11460
URL: https://issues.apache.org/jira/browse/FLINK-11460
Project: Flink
Issue Type: Task
Components: Tests
Reporter: zhijiang
Assignee: zhijiang
Fix For: 1.8.0

This class is no longer used anywhere in the code path, so it can be deleted directly.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252128586 ## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java ## @@ -92,7 +93,58 @@ protected Object getField(@Nonnull ValueWithTs value, int index) { protected CompositeSerializer> createSerializerInstance( PrecomputedParameters precomputed, TypeSerializer... originalSerializers) { - return new Serializer(precomputed, (TypeSerializer) originalSerializers[0]); + + return new Serializer(precomputed, originalSerializers[0], originalSerializers[1]); + } + + TypeSerializer getValueSerializer() { + return fieldSerializers[0]; + } + + @SuppressWarnings("unchecked") + TypeSerializer getTimestampSerializer() { + TypeSerializer fieldSerializer = fieldSerializers[1]; + return (TypeSerializer) fieldSerializer; + } + + @Override + public TypeSerializerSnapshot> snapshotConfiguration() { + return new ValueWithTsSerializerSnapshot(this); + } + } + + /** +* A {@link TypeSerializerSnapshot} for ValueWithTs Serializer. +*/ + public static final class ValueWithTsSerializerSnapshot extends CompositeTypeSerializerSnapshot, Serializer> { + + private final static int VERSION = 2; + + @SuppressWarnings("unused") + public ValueWithTsSerializerSnapshot() { + super(Serializer.class); + } + + ValueWithTsSerializerSnapshot(Serializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(Serializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.getValueSerializer(), outerSerializer.getTimestampSerializer()}; + } + + @SuppressWarnings("unchecked") + @Override + protected Serializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + TypeSerializer valueSerializer = nestedSerializers[0]; + TypeSerializer timeSerializer = (TypeSerializer) nestedSerializers[1]; Review comment: nit: `time` --> `timestamp` for naming consistency
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252128501 ## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java ## @@ -56,8 +57,8 @@ public String toString() { /** Serializer for Serializer. */ public static class Serializer extends CompositeSerializer> { - public Serializer(TypeSerializer userValueSerializer) { - super(true, userValueSerializer, LongSerializer.INSTANCE); + public Serializer(TypeSerializer valueSerializer, TypeSerializer timestampSerializer) { Review comment: I don't think we need a public constructor that accepts the timestamp serializer. This should be a private constructor used only by the snapshot class. We should still have a public constructor that accepts the user value serializer, and by default just uses `LongSerializer.INSTANCE` as the new timestamp serializer.
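The constructor layout suggested here — a public convenience constructor that defaults the timestamp serializer, plus a restricted constructor reserved for the snapshot/restore path — could look roughly like this. The types are simplified stand-ins, not Flink's real classes:

```java
/** Simplified stand-ins; none of these are the real Flink types. */
class StubSerializer<T> {
    static final StubSerializer<Long> LONG_INSTANCE = new StubSerializer<>();
}

class ValueWithTsSerializerSketch<T> {
    final StubSerializer<T> valueSerializer;
    final StubSerializer<Long> timestampSerializer;

    /** Public API: callers supply only the user value serializer; the
     *  timestamp serializer defaults to the Long serializer singleton. */
    public ValueWithTsSerializerSketch(StubSerializer<T> valueSerializer) {
        this(valueSerializer, StubSerializer.LONG_INSTANCE);
    }

    /** Package-private: only the snapshot class would call this on restore,
     *  passing the timestamp serializer read back from a checkpoint. */
    ValueWithTsSerializerSketch(StubSerializer<T> valueSerializer,
                                StubSerializer<Long> timestampSerializer) {
        this.valueSerializer = valueSerializer;
        this.timestampSerializer = timestampSerializer;
    }
}
```

This keeps user-facing call sites free of the redundant `LongSerializer.INSTANCE` argument while the restore path stays fully explicit.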
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252128802 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ## @@ -126,15 +128,15 @@ private IS createState() throws Exception { @SuppressWarnings("unchecked") private IS createValueState() throws Exception { ValueStateDescriptor> ttlDescriptor = new ValueStateDescriptor<>( - stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer())); + stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); Review comment: As mentioned above, having to pass in a `LongSerializer.INSTANCE` every time we're instantiating a TtlSerializer seems very redundant.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252127263 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ## @@ -302,6 +299,7 @@ static PrecomputedParameters precompute( } /** Snapshot field serializers of composite type. */ Review comment: nit: Add `@deprecated` message and direct to new snapshot class.
[GitHub] tzulitai edited a comment on issue #7566: [FLINK-11328][core] Migrate NFAStateSerializer to new TypeSerializer interface
tzulitai edited a comment on issue #7566: [FLINK-11328][core] Migrate NFAStateSerializer to new TypeSerializer interface URL: https://github.com/apache/flink/pull/7566#issuecomment-458822350 Thanks @dawidwys! I looked at all 3 commits; the refactoring part is also nice to have. The actual migration changes in the 3rd commit also look good! LGTM. I'll merge this together with other composite serializer changes.
[GitHub] tzulitai commented on issue #7566: [FLINK-11328][core] Migrate NFAStateSerializer to new TypeSerializer interface
tzulitai commented on issue #7566: [FLINK-11328][core] Migrate NFAStateSerializer to new TypeSerializer interface URL: https://github.com/apache/flink/pull/7566#issuecomment-458822350 Thanks @dawidwys! I looked at all 3 commits; the refactoring part is also nice to have. The actual migration changes in the 3rd commit also looks good! LGTM, merging this ..
[jira] [Commented] (FLINK-11450) Port and move TableSource and TableSink to flink-table-common
[ https://issues.apache.org/jira/browse/FLINK-11450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755713#comment-16755713 ] Dian Fu commented on FLINK-11450:
-
[~twalthr] Sounds great. Will submit a PR ASAP.

> Port and move TableSource and TableSink to flink-table-common
> -
>
> Key: FLINK-11450
> URL: https://issues.apache.org/jira/browse/FLINK-11450
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: sunjincheng
> Priority: Major
>
> A more detailed description can be found in [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> This step only unblocks the TableEnvironment interfaces task.
> Stream/BatchTableSource/Sink remain in flink-table-api-java-bridge for now until they have been reworked.
[GitHub] wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version
wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version URL: https://github.com/apache/flink/pull/7599#issuecomment-458821842
**Testing results:**
[INFO] ---
[INFO] T E S T S
[INFO] ---
[INFO] Running org.apache.flink.fs.osshadoop.HadoopOSSFileSystemITCase
[INFO] Running org.apache.flink.fs.osshadoop.HadoopOSSFileSystemBehaviorITCase
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.055 s - in org.apache.flink.fs.osshadoop.HadoopOSSFileSystemITCase
[WARNING] Tests run: 8, Failures: 0, Errors: 0, Skipped: 2, Time elapsed: 6.429 s - in org.apache.flink.fs.osshadoop.HadoopOSSFileSystemBehaviorITCase
[INFO]
[INFO] Results:
[INFO]
[WARNING] Tests run: 11, Failures: 0, Errors: 0, Skipped: 2
[jira] [Updated] (FLINK-11442) Upgrade OSS SDK Version
[ https://issues.apache.org/jira/browse/FLINK-11442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11442:
---
Labels: pull-request-available (was: )

> Upgrade OSS SDK Version
> ---
>
> Key: FLINK-11442
> URL: https://issues.apache.org/jira/browse/FLINK-11442
> Project: Flink
> Issue Type: Improvement
> Components: filesystem-connector
> Affects Versions: 1.8.0
> Reporter: wujinhu
> Assignee: wujinhu
> Priority: Major
> Labels: pull-request-available
>
> Upgrade oss sdk version to exclude org.json dependency.
> [INFO] +- com.aliyun.oss:aliyun-sdk-oss:jar:3.1.0:compile
> [INFO] | +- org.apache.httpcomponents:httpclient:jar:4.5.3:compile
> [INFO] | | \- org.apache.httpcomponents:httpcore:jar:4.4.6:compile
> [INFO] | +- org.jdom:jdom:jar:1.1:compile
> [INFO] | +- com.sun.jersey:jersey-json:jar:1.9:compile
> [INFO] | | +- org.codehaus.jettison:jettison:jar:1.1:compile
> [INFO] | | | \- stax:stax-api:jar:1.0.1:compile
> [INFO] | | +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile
> [INFO] | | | \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
> [INFO] | | | +- javax.xml.stream:stax-api:jar:1.0-2:compile
> [INFO] | | | \- javax.activation:activation:jar:1.1:compile
> [INFO] | | +- org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile
> [INFO] | | \- org.codehaus.jackson:jackson-xc:jar:1.8.3:compile
> [INFO] | +- com.aliyun:aliyun-java-sdk-core:jar:3.4.0:compile
> [INFO] | | \- org.json:json:jar:20170516:compile
> [INFO] | +- com.aliyun:aliyun-java-sdk-ram:jar:3.0.0:compile
> [INFO] | +- com.aliyun:aliyun-java-sdk-sts:jar:3.0.0:compile
> [INFO] | \- com.aliyun:aliyun-java-sdk-ecs:jar:4.2.0:compile
>
> The license of org.json:json:jar:20170516:compile is JSON License, which cannot be included.
> [https://www.apache.org/legal/resolved.html#json]
[GitHub] wujinhu opened a new pull request #7599: [FLINK-11442] upgrade oss sdk version
wujinhu opened a new pull request #7599: [FLINK-11442] upgrade oss sdk version URL: https://github.com/apache/flink/pull/7599

## What is the purpose of the change
Upgrade oss sdk version to exclude org.json dependency https://issues.apache.org/jira/browse/FLINK-11442

## Brief change log
Upgrade oss sdk version from 3.1.0 to 3.4.1

## Verifying this change
This change does not add tests

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (**yes** / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] XuQianJin-Stars commented on issue #7450: [FLINK-11296][Table API & SQL] Support truncate in TableAPI
XuQianJin-Stars commented on issue #7450: [FLINK-11296][Table API & SQL] Support truncate in TableAPI URL: https://github.com/apache/flink/pull/7450#issuecomment-458809680 hi @wuchong Thank you very much. I know you are very busy. best qianjin
[GitHub] wuchong commented on issue #7450: [FLINK-11296][Table API & SQL] Support truncate in TableAPI
wuchong commented on issue #7450: [FLINK-11296][Table API & SQL] Support truncate in TableAPI URL: https://github.com/apache/flink/pull/7450#issuecomment-458806492 Thanks @XuQianJin-Stars and sorry for the late reply. The PR looks good to me now. Will merge this soon.
[jira] [Updated] (FLINK-11333) First-class support for Protobuf types with evolvable schema
[ https://issues.apache.org/jira/browse/FLINK-11333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11333: --- Labels: pull-request-available (was: )
> First-class support for Protobuf types with evolvable schema
> ---
> Key: FLINK-11333
> URL: https://issues.apache.org/jira/browse/FLINK-11333
> Project: Flink
> Issue Type: Sub-task
> Components: Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Yun Tang
> Priority: Major
> Labels: pull-request-available
>
> I think we have more and more users who are thinking about using Protobuf for their state types.
> Right now, Protobuf isn't supported directly in Flink. The only way to use Protobuf for a type is to register it via Kryo: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html.
> As with Avro types, we should be able to natively support Protobuf, having a {{ProtobufSerializer}} that handles serialization of Protobuf types.
> The serializer should also write necessary information in its snapshot, to enable schema evolution for it in the future. For Protobuf, this should almost work out-of-the-box.
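Until such first-class support lands, the workaround the issue refers to — per the linked custom serializers documentation — is routing the type through Kryo, e.g. with the chill-protobuf serializer. A sketch against the Flink 1.7-era API (`MyProtoMessage` is a placeholder for a generated Protobuf message class, not a real Flink type):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.twitter.chill.protobuf.ProtobufSerializer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Route serialization of the generated Protobuf class through Kryo via chill-protobuf.
env.getConfig().registerTypeWithKryoSerializer(MyProtoMessage.class, ProtobufSerializer.class);
```

This requires adding the `com.twitter:chill-protobuf` dependency to the job; the downside, which this PR addresses, is that Kryo-serialized state does not participate in Flink's schema-evolution machinery.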
[GitHub] Myasuka opened a new pull request #7598: [FLINK-11333][protobuf] First-class serializer support for Protobuf types
Myasuka opened a new pull request #7598: [FLINK-11333][protobuf] First-class serializer support for Protobuf types URL: https://github.com/apache/flink/pull/7598

## What is the purpose of the change
Support Protobuf types directly in Flink. Unlike Flink's built-in Avro serializer, checking whether the schema is evolvable is currently left to Protobuf itself rather than being verified before any code runs. This is a known limitation and recorded in [FLINK-11333's comments](https://issues.apache.org/jira/browse/FLINK-11333?focusedCommentId=16755185=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16755185).

## Brief change log
- Support extracting `ProtobufTypeInfo` within `TypeExtractor`.
- Support (de)serializing Protobuf messages directly, without introducing `chill-protobuf` and Kryo.
- Support migrating old savepoint data that was serialized with the Kryo serializer to the new `ProtobufSerializer`.
- Also support building protoc on Travis.

## Verifying this change
This change added tests and can be verified as follows:
- Added unit tests for `ProtobufTypeInfo`, `ProtobufSerializer` and `ProtobufSerializerSnapshot`.
- Extended an integration test to verify that old savepoint data written with the Kryo serializer can be migrated to the new `ProtobufSerializer`.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): yes
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): no, it should not.
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, savepoint data might need to be migrated.
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs
[GitHub] jinglining commented on a change in pull request #7567: [FLINK-11358][tests] Port LeaderChangeStateCleanupTest to new code base
jinglining commented on a change in pull request #7567: [FLINK-11358][tests] Port LeaderChangeStateCleanupTest to new code base URL: https://github.com/apache/flink/pull/7567#discussion_r252108778 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java ## @@ -19,281 +19,156 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.Tasks; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.JobNotFinishedException; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import 
org.apache.flink.runtime.minicluster.TestingMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; -import org.junit.After; + +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +/** + * Tests which verify the cluster behaviour in case of leader changes. + */ public class LeaderChangeStateCleanupTest extends TestLogger { Review comment: How about LeaderChangeJobRecoveryTest?
[GitHub] chensi2017 opened a new pull request #7597: fix wrong format
chensi2017 opened a new pull request #7597: fix wrong format URL: https://github.com/apache/flink/pull/7597
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r252105308 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.logical + +import java.util.{List => JList} + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.{RelNode, SingleRel} +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction +import org.apache.flink.table.plan.nodes.FlinkConventions + +class FlinkLogicalUpsertToRetraction( Review comment: Hmm, I think it may be safe to use the default accept method, since there is nothing in `FlinkLogicalUpsertToRetraction` that needs to be processed by a `RexShuttle`. However, if we throw an exception in it, some cases would not work. For example, a `RelShuttleImpl` may be used to process the whole plan and use a `RexShuttle` to process the RexNodes of each RelNode. What do you think?
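The trade-off being discussed — a node keeping a harmless default `accept` versus throwing — can be illustrated with a plain-Java analogy (this is not the Calcite API; all names here are invented for illustration). A whole-plan rewrite visits every node, so a node with no expressions must still accept the shuttle gracefully:

```java
import java.util.ArrayList;
import java.util.List;

public class VisitorDefaultDemo {
    /** Stand-in for Calcite's RexShuttle: rewrites the expressions it is handed. */
    interface RexShuttleLike { String visitExpr(String expr); }

    static abstract class PlanNode {
        final List<PlanNode> inputs = new ArrayList<>();

        // Default accept(): nothing to rewrite here, just recurse into inputs.
        String accept(RexShuttleLike shuttle) {
            for (PlanNode input : inputs) {
                input.accept(shuttle);
            }
            return name();
        }

        abstract String name();
    }

    /** A node that does contain expressions, so it overrides accept(). */
    static class Filter extends PlanNode {
        String predicate = "a > 1";

        @Override
        String accept(RexShuttleLike shuttle) {
            predicate = shuttle.visitExpr(predicate);
            return super.accept(shuttle);
        }

        @Override
        String name() { return "Filter(" + predicate + ")"; }
    }

    /**
     * Analogue of a node with no RexNodes inside: the inherited default accept()
     * is safe. If it threw instead, the whole-plan rewrite below would fail even
     * though the shuttle never needs to touch this node.
     */
    static class UpsertToRetraction extends PlanNode {
        @Override
        String name() { return "UpsertToRetraction"; }
    }

    public static void main(String[] args) {
        Filter plan = new Filter();
        plan.inputs.add(new UpsertToRetraction());
        // A RelShuttleImpl-style whole-plan pass visits every node in the tree.
        System.out.println(plan.accept(expr -> expr.replace("a", "b")));
    }
}
```

Running it prints `Filter(b > 1)`: the expression-free node is traversed without incident, which is the argument for the default accept over an exception.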
[jira] [Issue Comment Deleted] (FLINK-9054) IllegalStateException: Buffer pool is destroyed
[ https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] guchunhui updated FLINK-9054: - Comment: was deleted (was: Hi, I also have this problem with Flink 1.6. Did you solve this problem?)
> IllegalStateException: Buffer pool is destroyed
> ---
> Key: FLINK-9054
> URL: https://issues.apache.org/jira/browse/FLINK-9054
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management, Configuration, Core
> Affects Versions: 1.4.2
> Reporter: dhiraj prajapati
> Priority: Major
> Fix For: 1.8.0
>
> Attachments: flink-conf.yaml
>
> Hi,
> I have a flink cluster running on 2 machines, say A and B.
> Job manager is running on A. There are 2 TaskManagers, one on each node.
> So effectively, A has a job manager and a task manager, while B has a task manager.
> When I submit a job to the cluster, I see the below exception and the job fails:
> 2018-03-22 17:16:52,205 WARN org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while emitting latency marker.
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
> at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150)
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
> ... 10 more
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:604)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
> ... 14 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
> at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138)
> ... 19 more
>
> The exception does not come when I run only one JobManager (only on machine B).
>
> I
[jira] [Comment Edited] (FLINK-9054) IllegalStateException: Buffer pool is destroyed
[ https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755630#comment-16755630 ] guchunhui edited comment on FLINK-9054 at 1/30/19 3:14 AM: --- Hi, I also have this problem with Flink 1.6. Did you solve this problem? was (Author: guchunhui): Hi, I also have this problem with Flink 1.6.
[jira] [Commented] (FLINK-9054) IllegalStateException: Buffer pool is destroyed
[ https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755630#comment-16755630 ] guchunhui commented on FLINK-9054: -- Hi, I also have this problem with Flink 1.6.
[GitHub] klion26 commented on issue #7596: Release 1.7 sortPartition bug
klion26 commented on issue #7596: Release 1.7 sortPartition bug URL: https://github.com/apache/flink/pull/7596#issuecomment-458785790 @zhaijp You can report bugs using [JIRA](https://issues.apache.org/jira/projects/FLINK). Maybe you could close this PR.
[GitHub] jinglining commented on issue #7582: [FLINK-11424][metrics]fix remove error type Gauge in DatadogHttpReporter
jinglining commented on issue #7582: [FLINK-11424][metrics]fix remove error type Gauge in DatadogHttpReporter URL: https://github.com/apache/flink/pull/7582#issuecomment-458785073 The Travis CI build failed because of YarnFlinkResourceManagerTest, which has been failing consistently recently.
[jira] [Comment Edited] (FLINK-10897) Support POJO state schema evolution
[ https://issues.apache.org/jira/browse/FLINK-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16747409#comment-16747409 ] boshu Zheng edited comment on FLINK-10897 at 1/30/19 2:13 AM: -- I will try to follow the example that you presented at FlinkForwardChina. Additionally, I propose to add two fields to `PojoSerializer`, namely `previousFieldNames` and `previousFieldSerializers` which are snapshotted and restored by a new `PojoSerializerSnapshot`. When restoring states via `PojoSerializer#deserialize`, we should be able to figure out which fields have been removed/added or have a new type with these additional fields. What do you think? [~tzulitai] was (Author: kisimple): I will try to follow the example that you presented at FlinkForwardChain. Additionally, I propose to add two fields to `PojoSerializer`, namely `previousFieldNames` and `previousFieldSerializers` which are snapshotted and restored by a new `PojoSerializerSnapshot`. When restoring states via `PojoSerializer#deserialize`, we should be able to figure out which fields have been removed/added or have a new type with these additional fields. What do you think? [~tzulitai] > Support POJO state schema evolution > --- > > Key: FLINK-10897 > URL: https://issues.apache.org/jira/browse/FLINK-10897 > Project: Flink > Issue Type: Sub-task > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: boshu Zheng >Priority: Major > > Main action point for this is to implement a separate POJO serializer that is > specifically used as the restore serializer. > This restore POJO serializer should be able to read and dump values of fields > that no longer exists in the updated POJO schema, and assign default values > to newly added fields. Snapshot of the {{PojoSerializer}} should contain > sufficient information so that on restore, the information can be compared > with the adapted POJO class to figure out which fields have been removed / > added. 
> Changing field types is out of scope and should not be supported. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
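The field comparison sketched in the comment above (snapshot the previous field names, then diff them against the adapted POJO class on restore) boils down to two set differences. A minimal stdlib sketch follows; the class and method names are hypothetical, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the restore-time field diff: given the field names
// captured in the serializer snapshot and the fields of the adapted POJO
// class, compute which fields were removed and which were added.
public class PojoFieldDiff {

    // Fields present in the snapshot but no longer in the class: their
    // serialized values must be read and dropped on restore.
    public static List<String> removedFields(List<String> previousFieldNames,
                                             List<String> currentFieldNames) {
        List<String> removed = new ArrayList<>(previousFieldNames);
        removed.removeAll(currentFieldNames);
        return removed;
    }

    // Fields present in the class but not in the snapshot: no bytes exist for
    // them, so they would receive default values on restore.
    public static List<String> addedFields(List<String> previousFieldNames,
                                           List<String> currentFieldNames) {
        List<String> added = new ArrayList<>(currentFieldNames);
        added.removeAll(previousFieldNames);
        return added;
    }
}
```

The real `PojoSerializerSnapshot` would additionally compare the serializers of the surviving fields; the sketch only shows the name-level diff.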
[jira] [Commented] (FLINK-11380) YarnFlinkResourceManagerTest test case crashed
[ https://issues.apache.org/jira/browse/FLINK-11380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755576#comment-16755576 ] lining commented on FLINK-11380: I meet this too. https://api.travis-ci.org/v3/job/485210100/log.txt > YarnFlinkResourceManagerTest test case crashed > --- > > Key: FLINK-11380 > URL: https://issues.apache.org/jira/browse/FLINK-11380 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: vinoyang >Priority: Critical > Labels: test-stability > Fix For: 1.8.0 > > > context: > {code:java} > 17:18:44.415 [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test (default-test) on > project flink-yarn_2.11: There are test failures. > 17:18:44.415 [ERROR] > 17:18:44.415 [ERROR] Please refer to > /home/travis/build/apache/flink/flink-yarn/target/surefire-reports for the > individual test results. > 17:18:44.415 [ERROR] Please refer to dump files (if any exist) [date].dump, > [date]-jvmRun[N].dump and [date].dumpstream. > 17:18:44.415 [ERROR] ExecutionException The forked VM terminated without > properly saying goodbye. VM crash or System.exit called? 
> 17:18:44.415 [ERROR] Command was /bin/sh -c cd > /home/travis/build/apache/flink/flink-yarn && > /usr/lib/jvm/java-8-oracle/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 > -XX:+UseG1GC -jar > /home/travis/build/apache/flink/flink-yarn/target/surefire/surefirebooter3487840902331471745.jar > /home/travis/build/apache/flink/flink-yarn/target/surefire > 2019-01-16T17-02-23_939-jvmRun2 surefire3706271590182708448tmp > surefire_332496616764820906947tmp > 17:18:44.416 [ERROR] Error occurred in starting fork, check output in log > 17:18:44.416 [ERROR] Process Exit Code: 243 > 17:18:44.416 [ERROR] Crashed tests: > 17:18:44.416 [ERROR] org.apache.flink.yarn.YarnFlinkResourceManagerTest > 17:18:44.416 [ERROR] > org.apache.maven.surefire.booter.SurefireBooterForkException: > ExecutionException The forked VM terminated without properly saying goodbye. > VM crash or System.exit called? > 17:18:44.416 [ERROR] Command was /bin/sh -c cd > /home/travis/build/apache/flink/flink-yarn && > /usr/lib/jvm/java-8-oracle/jre/bin/java -Xms256m -Xmx2048m -Dmvn.forkNumber=2 > -XX:+UseG1GC -jar > /home/travis/build/apache/flink/flink-yarn/target/surefire/surefirebooter3487840902331471745.jar > /home/travis/build/apache/flink/flink-yarn/target/surefire > 2019-01-16T17-02-23_939-jvmRun2 surefire3706271590182708448tmp > surefire_332496616764820906947tmp > 17:18:44.416 [ERROR] Error occurred in starting fork, check output in log > 17:18:44.416 [ERROR] Process Exit Code: 243 > 17:18:44.416 [ERROR] Crashed tests: > 17:18:44.416 [ERROR] org.apache.flink.yarn.YarnFlinkResourceManagerTest > 17:18:44.416 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:510) > 17:18:44.416 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkOnceMultiple(ForkStarter.java:382) > 17:18:44.416 [ERROR] at > org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:297) > 17:18:44.416 [ERROR] at > 
org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:246) > 17:18:44.416 [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183) > 17:18:44.416 [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011) > 17:18:44.416 [ERROR] at > org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857) > 17:18:44.416 [ERROR] at > org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132) > 17:18:44.416 [ERROR] at > org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) > 17:18:44.416 [ERROR] at > org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) > 17:18:44.416 [ERROR] at > org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) > 17:18:44.416 [ERROR] at > org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116) > 17:18:44.416 [ERROR] at > org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80) > 17:18:44.416 [ERROR] at > org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51) > 17:18:44.416 [ERROR] at > org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120) > 17:18:44.416 [ERROR] at > org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355) > 17:18:44.416 [ERROR] at >
[jira] [Comment Edited] (FLINK-11380) YarnFlinkResourceManagerTest test case crashed
[ https://issues.apache.org/jira/browse/FLINK-11380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755576#comment-16755576 ] lining edited comment on FLINK-11380 at 1/30/19 2:12 AM: - I met this too. [https://api.travis-ci.org/v3/job/485210100/log.txt] was (Author: lining): I meet this too. https://api.travis-ci.org/v3/job/485210100/log.txt > YarnFlinkResourceManagerTest test case crashed > --- > > Key: FLINK-11380 > URL: https://issues.apache.org/jira/browse/FLINK-11380 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: vinoyang >Priority: Critical > Labels: test-stability > Fix For: 1.8.0
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r252089234 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = mq.getRowCount(child)
+    // take rowCnt and fieldCnt into account, so that cost will be smaller when generate
+    // UpsertToRetractionConverter after Calc.
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0)

Review comment: Yes, I think so. I updated the code according to your suggestion.
[GitHub] zhaijp opened a new pull request #7596: Release 1.7 sortPartition bug
zhaijp opened a new pull request #7596: Release 1.7 sortPartition bug URL: https://github.com/apache/flink/pull/7596

### I found a bug when I use sortPartition

    public class WordCountExample {
        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> text = env.fromElements("a b b b b c c");

            DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSPlitter())
                .groupBy(0)
                .sum(1)
                .sortPartition(1, Order.DESCENDING);

            wordCounts.print();
        }
    }

    class LineSPlitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }

### the console shows this:

    (a,1)
    (b,4)
    (c,2)

### when I change the elements to "aa bb bb bb bb cc cc" the result is right:

    (bb,4)
    (cc,2)
    (aa,1)
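For context on the report above: `sortPartition` sorts records within each parallel partition only, so with parallelism greater than one the printed output has no global order. The following stdlib-only simulation (not Flink code; names are made up) shows the mechanism: records are hash-partitioned by key, then each partition is sorted independently.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simulation of sortPartition semantics: hash-partition the records by key
// (as groupBy(0) would), then sort each partition independently. Only the
// per-partition order is guaranteed, never a global one.
public class SortPartitionSim {

    public static List<List<Map.Entry<String, Integer>>> hashPartitionThenSort(
            List<Map.Entry<String, Integer>> records, int parallelism) {
        List<List<Map.Entry<String, Integer>>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        // distribute records across partitions by hash of the key
        for (Map.Entry<String, Integer> r : records) {
            partitions.get(Math.abs(r.getKey().hashCode()) % parallelism).add(r);
        }
        // sort each partition on the count field, descending
        for (List<Map.Entry<String, Integer>> p : partitions) {
            p.sort((a, b) -> b.getValue() - a.getValue());
        }
        return partitions;
    }

    // Convenience factory for a (word, count) record, purely for the demo.
    public static Map.Entry<String, Integer> rec(String word, int count) {
        return new SimpleEntry<>(word, count);
    }
}
```

With `parallelism = 1` all records land in one partition and the result is globally sorted, which is one plausible explanation of why the second input in the report "looks right".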
[GitHub] TisonKun commented on a change in pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…
TisonKun commented on a change in pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre… URL: https://github.com/apache/flink/pull/7570#discussion_r252085826 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.MockStreamStatusMaintainer;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * A builder of {@link MockStreamTask}.
+ */
+public class MockStreamTaskBuilder {
+	private final Environment environment;
+	private String name;
+	private Object checkpointLock;
+	private StreamConfig config;
+	private ExecutionConfig executionConfig;
+	private CloseableRegistry closableRegistry;
+	private StreamStatusMaintainer streamStatusMaintainer;
+	private CheckpointStorage checkpointStorage;
+	private ProcessingTimeService processingTimeService;
+	private StreamTaskStateInitializer streamTaskStateInitializer;
+	private BiConsumer<String, Throwable> handleAsyncException;
+	private Map<String, Accumulator<?, ?>> accumulatorMap;
+
+	public MockStreamTaskBuilder(Environment environment) throws Exception {
+		// obligatory parameters
+		this.environment = environment;
+
+		// default values

Review comment: Sure!
At first I initialized streamStatusMaintainer with an anonymous inner class inheriting from `StreamStatusMaintainer`. I thought that was a bit complex, so I gathered the initializations into the constructor. But now the default values are simple, and you're definitely right :-)
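The builder style under discussion (simple defaults at the field declarations, each setter overriding one of them) can be illustrated with a minimal stdlib sketch; the class and fields below are invented for the example, not part of Flink:

```java
// Minimal illustration of the builder pattern discussed above: simple
// default values live at the field declarations, and each setter
// overrides exactly one of them before build() is called.
public class MockTaskBuilder {
    private String name = "mock-task"; // simple default value
    private int parallelism = 1;       // simple default value

    public MockTaskBuilder setName(String name) {
        this.name = name;
        return this;
    }

    public MockTaskBuilder setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    // Builds a plain description string in place of a real task object.
    public String build() {
        return name + "[p=" + parallelism + "]";
    }
}
```

Keeping the defaults at the declarations, as suggested in the review, avoids a constructor that mixes obligatory parameters with default-value setup.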
[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers URL: https://github.com/apache/flink/pull/7356#discussion_r252039395 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -626,6 +686,44 @@ public void unRegisterInfoMessageListener(final String address) { } } + protected void rejectAllPendingSlotRequests(Exception e) { + slotManager.rejectAllPendingSlotRequests(e); + } + + protected synchronized void recordFailure() { + if (!checkFailureRate) { + return; + } + if (isFailureTimestampFull()) { + taskExecutorFailureTimestamps.remove(); + } + taskExecutorFailureTimestamps.add(System.currentTimeMillis()); + } + + protected boolean shouldRejectRequests() { Review comment: The rate calculation logic here shares a lot with FailureRateRestartStrategy. Can we refactor the rate calculation code into a common class?
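The shared rate-calculation logic this comment asks to factor out of ResourceManager and FailureRateRestartStrategy could look roughly like the following; the class name and API are illustrative assumptions, not Flink's:

```java
import java.util.ArrayDeque;

// Sketch of a reusable failure-rate tracker: keep the timestamps of the
// last maxFailuresPerInterval failures and report a breach once that many
// failures occurred within failureIntervalMillis of each other.
public class FailureRater {
    private final int maxFailuresPerInterval;
    private final long failureIntervalMillis;
    private final ArrayDeque<Long> failureTimestamps;

    public FailureRater(int maxFailuresPerInterval, long failureIntervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failureIntervalMillis = failureIntervalMillis;
        this.failureTimestamps = new ArrayDeque<>(Math.max(maxFailuresPerInterval, 1));
    }

    public synchronized void recordFailure(long nowMillis) {
        if (maxFailuresPerInterval <= 0) {
            return; // feature disabled, mirroring the -1 default discussed in this thread
        }
        if (failureTimestamps.size() >= maxFailuresPerInterval) {
            failureTimestamps.remove(); // drop the oldest timestamp
        }
        failureTimestamps.add(nowMillis);
    }

    public synchronized boolean exceedsRate(long nowMillis) {
        if (maxFailuresPerInterval <= 0 || failureTimestamps.size() < maxFailuresPerInterval) {
            return false;
        }
        // window is full: breach if the oldest recorded failure is still recent
        return nowMillis - failureTimestamps.peek() <= failureIntervalMillis;
    }
}
```

Timestamps are passed in rather than read from `System.currentTimeMillis()` so the class is testable; both the ResourceManager and a restart strategy could delegate to an instance of it.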
[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers URL: https://github.com/apache/flink/pull/7356#discussion_r252038865 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -192,7 +235,17 @@ public ResourceManager( this.jobManagerRegistrations = new HashMap<>(4); this.jmResourceIdRegistrations = new HashMap<>(4); this.taskExecutors = new HashMap<>(8); - infoMessageListeners = new ConcurrentHashMap<>(8); + this.infoMessageListeners = new ConcurrentHashMap<>(8); + this.failureInterval = failureInterval; + this.maximumFailureTaskExecutorPerInternal = maxFailurePerInterval; + + if (maximumFailureTaskExecutorPerInternal > 0) { + this.taskExecutorFailureTimestamps = new ArrayDeque<>(maximumFailureTaskExecutorPerInternal); Review comment: How about 0?
[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers URL: https://github.com/apache/flink/pull/7356#discussion_r252025408 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java ## @@ -83,8 +83,27 @@ */ public static final ConfigOption MAX_FAILED_CONTAINERS = key("yarn.maximum-failed-containers") - .noDefaultValue() - .withDescription("Maximum number of containers the system is going to reallocate in case of a failure."); + .noDefaultValue() + .withDescription("Maximum number of containers the system is going to reallocate in case of a failure."); + + /** +* The maximum number of failed YARN containers within an interval before entirely stopping +* the YARN session / job on YARN. +* By default, the value is -1. +*/ + public static final ConfigOption MAX_FAILED_CONTAINERS_PER_INTERVAL = + key("yarn.maximum-failed-containers-per-interval") + .defaultValue(-1) + .withDescription("Maximum number of containers the system is going to reallocate in case of a failure in an interval."); Review comment: Please document what -1 means.
[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers URL: https://github.com/apache/flink/pull/7356#discussion_r252025274 ## File path: docs/_includes/generated/mesos_configuration.html ## @@ -27,6 +27,11 @@ -1 The maximum number of failed workers before the cluster fails. May be set to -1 to disable this feature. This option is ignored unless Flink is in legacy mode. + +mesos.maximum-failed-workers-per-interval +-1 +Maximum number of workers the system is going to reallocate in case of a failure in an interval. Review comment: Please document what -1 and 0 mean.
[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers URL: https://github.com/apache/flink/pull/7356#discussion_r252024051 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java ## @@ -99,6 +99,25 @@ .withDescription("The config parameter defining the Mesos artifact server port to use. Setting the port to" + " 0 will let the OS choose an available port."); + /** +* The maximum number of failed Mesos worker within an interval before entirely stopping +* the Mesos session / job on Mesos. +* By default, the value is -1 +*/ + public static final ConfigOption MAX_FAILED_WORKERS_PER_INTERVAL = + key("mesos.maximum-failed-workers-per-interval") + .defaultValue(-1) + .withDescription("Maximum number of workers the system is going to reallocate in case of a failure in an interval."); + + /** +* The interval for measuring failure rate of containers in second unit. +* By default, the value is 5 minutes. +**/ + public static final ConfigOption WORKERS_FAILURE_RATE_INTERVAL = + key("mesos.workers-failure-rate-interval") + .defaultValue(300) + .withDeprecatedKeys("The interval for measuring failure rate of workers"); Review comment: withDescription here as well.
[GitHub] suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
suez1224 commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers URL: https://github.com/apache/flink/pull/7356#discussion_r252037753 ## File path: docs/_includes/generated/yarn_config_configuration.html ## @@ -42,6 +47,11 @@ (none) Maximum number of containers the system is going to reallocate in case of a failure. + +yarn.maximum-failed-containers-per-interval +-1 Review comment: Please document what -1 and 0 mean.
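One way to address the repeated documentation asks in this review thread is to spell the sentinel semantics out in the option description itself. The wording below is only a sketch: it assumes -1 disables the check, and what 0 means would have to match whatever the implementation actually does.

```java
// Illustrative only: the description text assumes -1 disables the
// failure-rate check entirely; adjust the wording for 0 to match the
// actual implementation before merging.
public static final ConfigOption<Integer> MAX_FAILED_CONTAINERS_PER_INTERVAL =
    key("yarn.maximum-failed-containers-per-interval")
        .defaultValue(-1)
        .withDescription("Maximum number of containers the system is going to reallocate" +
            " in case of a failure within the configured interval." +
            " A value of -1 disables this check.");
```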
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r252035267 ## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; + +import org.apache.parquet.schema.MessageType; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +/** + * Simple test case for conversion between Parquet schema and Flink date types. 
+ */ +public class ParquetSchemaConverterTest extends TestUtil { + private final RowTypeInfo simplyRowType = new RowTypeInfo( Review comment: There is a utility class `Types` that eases the creation of type information for all kinds of types (Row, Map, Array, Pojo, etc.).
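For reference, the `Types` utility mentioned in this review can build nested type information declaratively; the following is a sketch assuming Flink's `org.apache.flink.api.common.typeinfo.Types` factory methods (`ROW_NAMED`, `OBJECT_ARRAY`, `MAP`), with field names invented for the example:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

// Sketch: building a nested row type with the Types factory instead of
// instantiating RowTypeInfo/MapTypeInfo/ObjectArrayTypeInfo directly.
TypeInformation<Row> rowType = Types.ROW_NAMED(
    new String[] {"name", "scores", "attributes"},
    Types.STRING,
    Types.OBJECT_ARRAY(Types.LONG),
    Types.MAP(Types.STRING, Types.STRING));
```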
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r252016959 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java ## @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends {@link GroupConverter} to convert a nested Parquet record into Row.
+ */ +public class RowConverter extends GroupConverter implements ParentDataHolder { + private static final Logger LOGGER = LoggerFactory.getLogger(RowConverter.class); + private final Converter[] converters; + private final ParentDataHolder parentDataHolder; + private final TypeInformation typeInfo; + private Row currentRow; + private int posInParentRow; + + public RowConverter(MessageType messageType, TypeInformation typeInfo) { + this(messageType, typeInfo, null, 0); + } + + public RowConverter(GroupType schema, TypeInformation typeInfo, ParentDataHolder parent, int pos) { + this.typeInfo = typeInfo; + this.parentDataHolder = parent; + this.posInParentRow = pos; + this.converters = new Converter[schema.getFieldCount()]; + + int i = 0; + if (typeInfo.getArity() >= 1 && (typeInfo instanceof CompositeType)) { + for (Type field : schema.getFields()) { + converters[i] = createConverter(field, i, ((CompositeType) typeInfo).getTypeAt(i), this); + i++; + } + } + } + + private static Converter createConverter( + Type field, + int fieldPos, + TypeInformation typeInformation, + ParentDataHolder parentDataHolder) { + if (field.isPrimitive()) { + return new RowConverter.RowPrimitiveConverter(field, parentDataHolder, fieldPos); + } else if (typeInformation instanceof MapTypeInfo) { + return new RowConverter.MapConverter((GroupType) field, (MapTypeInfo) typeInformation, + parentDataHolder, fieldPos); + } else if (typeInformation instanceof BasicArrayTypeInfo) { + Type elementType = field.asGroupType().getFields().get(0); + Class typeClass = ((BasicArrayTypeInfo) typeInformation).getComponentInfo().getTypeClass(); + if (typeClass.equals(Character.class)) { + return new RowConverter.BasicArrayConverter((BasicArrayTypeInfo) typeInformation, elementType, + Character.class, parentDataHolder, fieldPos); + } else if (typeClass.equals(Boolean.class)) { + return new
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r251979596
## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
@@ -0,0 +1,446 @@
+public class RowConverter extends GroupConverter implements ParentDataHolder {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RowConverter.class);
Review comment: Logger is not used. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252025737
## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
@@ -0,0 +1,87 @@
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test cases for reading Map from Parquet files.
+ */
+public class ParquetMapInputFormatTest {
+    private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+
+    @ClassRule
+    public static TemporaryFolder temp = new TemporaryFolder();
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testReadMapFromNestedRecord() throws IOException {
Review comment: Add a test for projected reads, i.e., reading a subset of the fields contained in the file.
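The projected-read suggestion above can be illustrated without any Parquet machinery. The sketch below is plain Java with hypothetical names (`ProjectionSketch`, `project`) and deliberately uses no Flink or Parquet API; it only shows what a projected read means at the record level: materializing a chosen subset of a record's fields, in the requested order, instead of the full row.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of field projection (not the Flink/Parquet API):
// a projected read materializes only the requested fields of each record.
public class ProjectionSketch {

    // Copy the requested fields, in the requested order, into a narrower row.
    static Object[] project(Map<String, Object> fullRecord, List<String> projectedFields) {
        Object[] row = new Object[projectedFields.size()];
        for (int i = 0; i < projectedFields.size(); i++) {
            row[i] = fullRecord.get(projectedFields.get(i)); // absent fields stay null
        }
        return row;
    }

    public static void main(String[] args) {
        // Full record with the three fields used in these tests: foo, bar, arr.
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("foo", 32L);
        record.put("bar", "test");
        record.put("arr", new long[]{1L});

        // "Projected read": request only bar and foo out of the three fields.
        Object[] projected = project(record, Arrays.asList("bar", "foo"));
        System.out.println(projected.length + " " + projected[0] + " " + projected[1]);
    }
}
```

A test of the kind fhueske asks for would assert, against the actual input format, exactly what this sketch prints: the reduced arity and the values landing at their projected positions.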
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252016613
## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252036898
## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
@@ -0,0 +1,293 @@
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.generated.ArrayItem;
+import org.apache.flink.formats.parquet.generated.Bar;
+import org.apache.flink.formats.parquet.generated.MapItem;
+import org.apache.flink.formats.parquet.generated.NestedRecord;
+import org.apache.flink.formats.parquet.generated.SimpleRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.avro.Schema.Type.NULL;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Utilities for testing schema conversion and test parquet file creation.
+ */
+public class TestUtil {
+    @ClassRule
+    public static TemporaryFolder temp = new TemporaryFolder();
+    public static final Configuration TEST_CONFIGURATION = new Configuration();
+    public static final Schema NESTED_SCHEMA = getTestSchema("nested.avsc");
+    public static final Schema SIMPLE_SCHEMA = getTestSchema("simple.avsc");
+
+    protected static final Type[] SIMPLE_BACK_COMPTIBALE_TYPES = {
+        Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL).named("foo"),
+        Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+            .as(OriginalType.UTF8).named("bar"),
+        Types.optionalGroup()
+            .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REPEATED).named("array"))
+            .named("arr")
+    };
+
+    protected static final Type[] SIMPLE_STANDARD_TYPES = {
+        Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
+            .as(OriginalType.INT_64).named("foo"),
+        Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+            .as(OriginalType.UTF8).named("bar"),
+        Types.optionalGroup()
+            .addField(Types.repeatedGroup().addField(
+                Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+                    .as(OriginalType.INT_64).named("element")).named("list")).as(OriginalType.LIST)
+            .named("arr")};
+
+    protected static final Type[] NESTED_TYPES = {
+        Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
+            .as(OriginalType.INT_64).named("foo"),
+
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252018181
## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252018047
## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252033885
## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
@@ -0,0 +1,319 @@
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Simple test case for reading parquet records.
+ */
+public class ParquetRecordReaderTest extends TestUtil {
+
+    @Test
+    public void testReadSimpleGroup() throws IOException {
+        temp.create();
+
+        Long[] array = {1L};
+        GenericData.Record record = new GenericRecordBuilder(SIMPLE_SCHEMA)
+            .set("bar", "test")
+            .set("foo", 32L)
+            .set("arr", array).build();
+
+        Path path = createTempParquetFile(temp, SIMPLE_SCHEMA, Collections.singletonList(record));
+        MessageType readSchema = (new AvroSchemaConverter()).convert(SIMPLE_SCHEMA);
+        ParquetRecordReader rowReader = new ParquetRecordReader(new RowReadSupport(), readSchema);
+
+        InputFile inputFile =
+            HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
+        ParquetReadOptions options = ParquetReadOptions.builder().build();
+        ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+
+        rowReader.initialize(fileReader, TEST_CONFIGURATION);
+        assertEquals(true, rowReader.hasNextRecord());
+
+        Row row = rowReader.nextRecord();
+        assertEquals(3, row.getArity());
+        assertEquals(32L, row.getField(0));
+        assertEquals("test", row.getField(1));
+        assertArrayEquals(array, (Long[]) row.getField(2));
+        assertEquals(true, rowReader.reachEnd());
+    }
+
+    @Test
+    public void testReadMultipleSimpleGroup() throws IOException {
+        temp.create();
+
+        Long[] array = {1L};
+
+        List records = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            GenericData.Record record = new GenericRecordBuilder(SIMPLE_SCHEMA)
+                .set("bar", "test")
+                .set("foo", i)
+                .set("arr", array).build();
+            records.add(record);
+        }
+
+        Path path = createTempParquetFile(temp, SIMPLE_SCHEMA, records);
+        MessageType readSchema = (new AvroSchemaConverter()).convert(SIMPLE_SCHEMA);
+        ParquetRecordReader rowReader = new ParquetRecordReader(new RowReadSupport(), readSchema);
+
+        InputFile inputFile =
+            HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
+        ParquetReadOptions options =
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r252037449
## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r252026077 ## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.TestUtil; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.types.Row; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Test cases for reading Map from Parquet files. 
+ */ +public class ParquetMapInputFormatTest { + private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter(); + + @ClassRule + public static TemporaryFolder temp = new TemporaryFolder(); + + @Test + @SuppressWarnings("unchecked") + public void testReadMapFromNestedRecord() throws IOException { + temp.create(); + Tuple3, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData(); + Path path = TestUtil.createTempParquetFile(temp, TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1)); + MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA); + ParquetMapInputFormat mapInputFormat = new ParquetMapInputFormat(path, nestedType); + + RuntimeContext mockContext = Mockito.mock(RuntimeContext.class); + Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()) + .when(mockContext).getMetricGroup(); + mapInputFormat.setRuntimeContext(mockContext); + FileInputSplit[] splits = mapInputFormat.createInputSplits(1); + assertEquals(1, splits.length); + mapInputFormat.open(splits[0]); + + Map map = mapInputFormat.nextRecord(null); + assertNotNull(map); Review comment: check size of map This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r251984469 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowReadSupport.java ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.types.Row; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; + +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A Parquet {@link ReadSupport} implementation for reading Parquet record as {@link Row}. 
+ */ +public class RowReadSupport extends ReadSupport { + + private TypeInformation returnTypeInfo; + + @Override + public ReadContext init(InitContext initContext) { + checkNotNull(initContext, "initContext"); + returnTypeInfo = ParquetSchemaConverter.fromParquetType(initContext.getFileSchema()); + return new ReadContext(initContext.getFileSchema()); + } + + @Override + public RecordMaterializer prepareForRead( + Configuration configuration, Map keyValueMetaData, + MessageType fileSchema, ReadContext readContext) { + return new RowMaterializer(fileSchema, returnTypeInfo); Review comment: use `ReadContext.getSchemaForRead()` instead of `fileSchema`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
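The change the reviewer asks for might look like the sketch below. Note the accessor name: the comment says `ReadContext.getSchemaForRead()`, while parquet-mr exposes the negotiated schema as `ReadContext.getRequestedSchema()`; the sketch assumes the latter — adjust to the actual API of the Parquet version in use.

```java
@Override
public RecordMaterializer prepareForRead(
        Configuration configuration, Map keyValueMetaData,
        MessageType fileSchema, ReadContext readContext) {
    // Use the schema negotiated in init() (which may be a projection of the
    // full file schema) instead of the raw fileSchema passed in.
    return new RowMaterializer(readContext.getRequestedSchema(), returnTypeInfo);
}
```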
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r252013399 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java ## @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Extends from {@link GroupConverter} to convert an nested Parquet Record into Row. 
+ */ +public class RowConverter extends GroupConverter implements ParentDataHolder { + private static final Logger LOGGER = LoggerFactory.getLogger(RowConverter.class); + private final Converter[] converters; + private final ParentDataHolder parentDataHolder; + private final TypeInformation typeInfo; + private Row currentRow; + private int posInParentRow; + + public RowConverter(MessageType messageType, TypeInformation typeInfo) { + this(messageType, typeInfo, null, 0); + } + + public RowConverter(GroupType schema, TypeInformation typeInfo, ParentDataHolder parent, int pos) { + this.typeInfo = typeInfo; + this.parentDataHolder = parent; + this.posInParentRow = pos; + this.converters = new Converter[schema.getFieldCount()]; + + int i = 0; + if (typeInfo.getArity() >= 1 && (typeInfo instanceof CompositeType)) { + for (Type field : schema.getFields()) { + converters[i] = createConverter(field, i, ((CompositeType) typeInfo).getTypeAt(i), this); + i++; + } + } + } + + private static Converter createConverter( + Type field, + int fieldPos, + TypeInformation typeInformation, + ParentDataHolder parentDataHolder) { + if (field.isPrimitive()) { + return new RowConverter.RowPrimitiveConverter(field, parentDataHolder, fieldPos); + } else if (typeInformation instanceof MapTypeInfo) { + return new RowConverter.MapConverter((GroupType) field, (MapTypeInfo) typeInformation, + parentDataHolder, fieldPos); + } else if (typeInformation instanceof BasicArrayTypeInfo) { + Type elementType = field.asGroupType().getFields().get(0); + Class typeClass = ((BasicArrayTypeInfo) typeInformation).getComponentInfo().getTypeClass(); + if (typeClass.equals(Character.class)) { + return new RowConverter.BasicArrayConverter((BasicArrayTypeInfo) typeInformation, elementType, + Character.class, parentDataHolder, fieldPos); + } else if (typeClass.equals(Boolean.class)) { + return new
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r252027651 ## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java ## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.pojo.PojoSimpleRecord; +import org.apache.flink.formats.parquet.utils.TestUtil; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.types.Row; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Test cases for reading Pojo from Parquet files. 
+ */ +public class ParquetPojoInputFormatTest { + private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter(); + + @ClassRule + public static TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testReadPojoFromSimpleRecord() throws IOException, NoSuchFieldException { + temp.create(); + Tuple3, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData(); + MessageType messageType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA); + Path path = TestUtil.createTempParquetFile(temp, TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1)); + + List fieldList = new ArrayList<>(); + fieldList.add(new PojoField(PojoSimpleRecord.class.getField("foo"), BasicTypeInfo.LONG_TYPE_INFO)); + fieldList.add(new PojoField(PojoSimpleRecord.class.getField("bar"), BasicTypeInfo.STRING_TYPE_INFO)); + fieldList.add(new PojoField(PojoSimpleRecord.class.getField("arr"), + BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO)); + + ParquetPojoInputFormat pojoInputFormat = + new ParquetPojoInputFormat(path, messageType, new PojoTypeInfo( Review comment: A `PojoTypeInfo` can be created with `Types.POJO(PojoSimpleRecord.class)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r252032139 ## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java ## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.generated.SimpleRecord; +import org.apache.flink.formats.parquet.utils.TestUtil; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.types.Row; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificRecord; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Simple test case for reading {@link org.apache.flink.types.Row} from Parquet files. + */ +public class ParquetRowInputFormatTest { Review comment: I think it would be good to add tests for the following cases: * Reading multiple records and validating the correctness of the data (the fault tolerance test is reading multiple records but not checking the returned values) * Test multiple files, each file should be handled as a separate split * Test projection (`ParquetRowInputFormat.selectFields()`) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
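Of the three suggested cases, the projection test could be sketched roughly as below. The `selectFields` signature is taken from the reviewer's comment, the `Tuple3` type parameters are an assumption (the quoted diff stripped the generics), and field names come from the simple schema used elsewhere in this PR:

```java
@Test
public void testProjectedRead() throws IOException {
    Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple =
            TestUtil.getSimpleRecordTestData();
    Path path = TestUtil.createTempParquetFile(
            temp, TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
    MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);

    ParquetRowInputFormat rowInputFormat = new ParquetRowInputFormat(path, simpleType);
    rowInputFormat.selectFields(new String[]{"foo"}); // project down to a single column

    FileInputSplit[] splits = rowInputFormat.createInputSplits(1);
    rowInputFormat.open(splits[0]);

    Row row = rowInputFormat.nextRecord(null);
    assertNotNull(row);
    assertEquals(1, row.getArity()); // only the projected field should remain
    assertEquals(simple.f2.getField(0), row.getField(0));
}
```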
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r252028083 ## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java ## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.pojo.PojoSimpleRecord; +import org.apache.flink.formats.parquet.utils.TestUtil; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.types.Row; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Test cases for reading Pojo from Parquet files. + */ +public class ParquetPojoInputFormatTest { + private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter(); + + @ClassRule + public static TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testReadPojoFromSimpleRecord() throws IOException, NoSuchFieldException { Review comment: Add test with projection: `ParquetPojoInputFormat.selectFields(...)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r252026568 ## File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.TestUtil; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.types.Row; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Test cases for reading Map from Parquet files. 
+ */ +public class ParquetMapInputFormatTest { + private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter(); + + @ClassRule + public static TemporaryFolder temp = new TemporaryFolder(); + + @Test + @SuppressWarnings("unchecked") + public void testReadMapFromNestedRecord() throws IOException { + temp.create(); + Tuple3, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData(); + Path path = TestUtil.createTempParquetFile(temp, TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1)); + MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA); + ParquetMapInputFormat mapInputFormat = new ParquetMapInputFormat(path, nestedType); + + RuntimeContext mockContext = Mockito.mock(RuntimeContext.class); + Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()) + .when(mockContext).getMetricGroup(); + mapInputFormat.setRuntimeContext(mockContext); + FileInputSplit[] splits = mapInputFormat.createInputSplits(1); + assertEquals(1, splits.length); + mapInputFormat.open(splits[0]); + + Map map = mapInputFormat.nextRecord(null); + assertNotNull(map); + assertArrayEquals((Long[]) map.get("arr"), (Long[]) nested.f2.getField(3)); + assertArrayEquals((String[]) map.get("strArray"), (String[]) nested.f2.getField(4)); + + Map mapItem = (Map) ((Map) map.get("nestedMap")).get("mapItem"); Review comment: check size of mapItem
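The requested change amounts to asserting the entry count of `mapItem`, not only individual keys. A tiny stand-alone illustration (plain Java, not the actual test class) of why a size check makes such map assertions exhaustive:

```java
import java.util.HashMap;
import java.util.Map;

public class MapSizeCheck {
    // Verify a map has exactly the expected entries: per-key checks alone
    // would miss unexpected extra entries, so assert the size first.
    static void assertExactEntries(Map<String, Long> actual, Map<String, Long> expected) {
        if (actual.size() != expected.size()) {
            throw new AssertionError("expected " + expected.size() + " entries, got " + actual.size());
        }
        for (Map.Entry<String, Long> e : expected.entrySet()) {
            if (!e.getValue().equals(actual.get(e.getKey()))) {
                throw new AssertionError("mismatch for key " + e.getKey());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> mapItem = new HashMap<>();
        mapItem.put("type", 1L);
        mapItem.put("value", 2L);

        Map<String, Long> expected = new HashMap<>(mapItem);
        assertExactEntries(mapItem, expected); // passes

        mapItem.put("spurious", 3L); // an extra entry per-key checks would miss
        try {
            assertExactEntries(mapItem, expected);
        } catch (AssertionError e) {
            System.out.println("caught: " + e.getMessage()); // caught: expected 2 entries, got 3
        }
    }
}
```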
[jira] [Commented] (FLINK-2491) Checkpointing only works if all operators/tasks are still running
[ https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755385#comment-16755385 ] JC commented on FLINK-2491: --- I also came across this problem trying exactly the same as [~canhuang]. I want to expand the parallelism beyond my number of Kafka partitions; I am using Flink 1.5.4 with Apache Beam. Thanks > Checkpointing only works if all operators/tasks are still running > - > > Key: FLINK-2491 > URL: https://issues.apache.org/jira/browse/FLINK-2491 > Project: Flink > Issue Type: Bug > Components: DataStream API, State Backends, Checkpointing >Affects Versions: 0.10.0 >Reporter: Robert Metzger >Assignee: Márton Balassi >Priority: Critical > Attachments: fix_checkpoint_not_working_if_tasks_are_finished.patch > > > While implementing a test case for the Kafka Consumer, I came across the > following bug: > Consider the following topology, with the operator parallelism in parentheses: > Source (2) --> Sink (1). > In this setup, the {{snapshotState()}} method is called on the source, but > not on the Sink. > The sink receives the generated data. > Only one of the two sources is generating data. > I've implemented a test case for this, you can find it here: > https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] jgrier commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster
jgrier commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#discussion_r251966785 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -1000,6 +1005,21 @@ public void notifyAllocationFailure(AllocationID allocationID, Exception cause) internalFailAllocation(allocationID, cause); } + @Override + public CompletableFuture updateAggregate(String aggregateName, Object aggregand, byte[] serializedAggregateFunction) + throws IOException, ClassNotFoundException { + + AggregateFunction aggregateFunction = InstantiationUtil.deserializeObject(serializedAggregateFunction, userCodeLoader); + + Object accumulator = accumulators.get(aggregateName); Review comment: Also: https://github.com/jgrier/flink/blob/d9b28e817351eb2eb6b4cdd9597061713d9160e8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java#L47-L47
[jira] [Updated] (FLINK-11459) Presto S3 does not show errors due to missing credentials with minio
[ https://issues.apache.org/jira/browse/FLINK-11459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-11459: Description: It seems that, when using minio for S3-like storage with mis-configurations such as missing (maybe also wrong) credentials, Flink gets into a failing state but gives no reason for it: {code} ... 2019-01-29 15:43:27,676 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.heap.mb, 353 2019-01-29 15:43:27,738 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.heap.mb, 429 2019-01-29 15:43:27,758 INFO org.apache.flink.api.java.ExecutionEnvironment [] - The job has 0 registered types and 0 default Kryo serializers 2019-01-29 15:43:29,943 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2093.606], CredentialsRequestTime=[2092.961], 2019-01-29 15:43:29,956 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2115.551], 2019-01-29 15:43:31,946 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[3597.992], CredentialsRequestTime=[3597.788], 2019-01-29 15:43:31,958 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[3610.417], 2019-01-29 15:43:33,954 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2907.39], CredentialsRequestTime=[2906.853], 2019-01-29 15:43:33,963 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2917.786], 2019-01-29 15:43:36,133 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2005.692], CredentialsRequestTime=[2004.942], 2019-01-29 15:43:36,156 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2029.473], 2019-01-29 15:43:38,142 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2077.053], CredentialsRequestTime=[2076.05], 2019-01-29 15:43:38,164 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2092.878], 2019-01-29 15:43:42,181 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2005.91], CredentialsRequestTime=[2005.164], 2019-01-29 15:43:42,186 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2011.204], 2019-01-29 15:43:44,262 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2007.886], CredentialsRequestTime=[2007.165], 2019-01-29 15:43:44,276 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2024.312], 2019-01-29 15:43:44,585 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. 2019-01-29 15:43:44,628 INFO org.apache.flink.runtime.blob.TransientBlobCache [] - Shutting down BLOB cache 2019-01-29 15:43:44,661 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124 {code} With AWS S3, it is actually printing an exception instead: {code} 2019-01-29 19:24:39,968 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: rest.port, 8081 2019-01-29 19:24:39,990 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers 2019-01-29 19:24:43,117 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2047.535], CredentialsRequestTime=[2033.619], 2019-01-29 19:24:43,118 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2049.826], 2019-01-29 19:24:46,215 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2003.168], CredentialsRequestTime=[2002.836], 2019-01-29 19:24:46,216 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2004.182], 2019-01-29 
19:24:50,384 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2003.15], CredentialsRequestTime=[2002.803], 2019-01-29 19:24:50,384 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2004.308], 2019-01-29 19:24:56,691 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2002.596], CredentialsRequestTime=[2002.45], 2019-01-29 19:24:56,691 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2003.177], 2019-01-29 19:25:07,058 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2003.26],
[GitHub] jgrier commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster
jgrier commented on a change in pull request #7099: [FLINK-10887] [jobmaster] Add source watermark tracking to the JobMaster URL: https://github.com/apache/flink/pull/7099#discussion_r251964188 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -1000,6 +1005,21 @@ public void notifyAllocationFailure(AllocationID allocationID, Exception cause) internalFailAllocation(allocationID, cause); } + @Override + public CompletableFuture updateAggregate(String aggregateName, Object aggregand, byte[] serializedAggregateFunction) + throws IOException, ClassNotFoundException { + + AggregateFunction aggregateFunction = InstantiationUtil.deserializeObject(serializedAggregateFunction, userCodeLoader); + + Object accumulator = accumulators.get(aggregateName); Review comment: I believe it's already all synchronized. The RpcService is implemented as a single Akka actor and thus access is already serialized.
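The method under review deserializes an `AggregateFunction` and folds the incoming aggregand into a named accumulator. A self-contained sketch of that update path, using a simplified stand-in interface rather than Flink's real `AggregateFunction` (and skipping the deserialization step); per the thread, no extra locking is added on the assumption that the RPC endpoint already serializes calls through a single actor:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class AggregateUpdateSketch {
    // Simplified stand-in for org.apache.flink.api.common.functions.AggregateFunction.
    interface AggFn<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    private final Map<String, Object> accumulators = new HashMap<>();

    // Mirrors the shape of the JobMaster#updateAggregate path being reviewed:
    // look up the named accumulator, fold in the new value, store it back.
    @SuppressWarnings("unchecked")
    <IN, ACC, OUT> CompletableFuture<OUT> updateAggregate(
            String aggregateName, IN aggregand, AggFn<IN, ACC, OUT> fn) {
        ACC acc = (ACC) accumulators.get(aggregateName);
        if (acc == null) {
            acc = fn.createAccumulator();
        }
        acc = fn.add(aggregand, acc);
        accumulators.put(aggregateName, acc);
        return CompletableFuture.completedFuture(fn.getResult(acc));
    }

    public static void main(String[] args) {
        AggregateUpdateSketch master = new AggregateUpdateSketch();
        AggFn<Long, Long, Long> max = new AggFn<Long, Long, Long>() {
            public Long createAccumulator() { return Long.MIN_VALUE; }
            public Long add(Long v, Long acc) { return Math.max(v, acc); }
            public Long getResult(Long acc) { return acc; }
        };
        master.updateAggregate("watermark", 5L, max);
        System.out.println(master.updateAggregate("watermark", 3L, max).join()); // 5
    }
}
```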
[jira] [Created] (FLINK-11459) Presto S3 does not show errors due to missing credentials with minio
Nico Kruber created FLINK-11459: --- Summary: Presto S3 does not show errors due to missing credentials with minio Key: FLINK-11459 URL: https://issues.apache.org/jira/browse/FLINK-11459 Project: Flink Issue Type: Bug Components: filesystem-connector Affects Versions: 1.6.2 Reporter: Nico Kruber It seems that, when using minio for S3-like storage with mis-configurations such as missing (maybe also wrong) credentials, Flink gets into a failing state but gives no reason for it: {code} ... 2019-01-29 15:43:27,676 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.heap.mb, 353 2019-01-29 15:43:27,738 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.heap.mb, 429 2019-01-29 15:43:27,758 INFO org.apache.flink.api.java.ExecutionEnvironment [] - The job has 0 registered types and 0 default Kryo serializers 2019-01-29 15:43:29,943 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2093.606], CredentialsRequestTime=[2092.961], 2019-01-29 15:43:29,956 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2115.551], 2019-01-29 15:43:31,946 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[3597.992], CredentialsRequestTime=[3597.788], 2019-01-29 15:43:31,958 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[3610.417], 2019-01-29 15:43:33,954 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2907.39], CredentialsRequestTime=[2906.853], 2019-01-29 15:43:33,963 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2917.786], 2019-01-29 15:43:36,133 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2005.692], CredentialsRequestTime=[2004.942], 2019-01-29 15:43:36,156 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - 
ClientExecuteTime=[2029.473], 2019-01-29 15:43:38,142 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2077.053], CredentialsRequestTime=[2076.05], 2019-01-29 15:43:38,164 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2092.878], 2019-01-29 15:43:42,181 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2005.91], CredentialsRequestTime=[2005.164], 2019-01-29 15:43:42,186 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2011.204], 2019-01-29 15:43:44,262 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2007.886], CredentialsRequestTime=[2007.165], 2019-01-29 15:43:44,276 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency[] - ClientExecuteTime=[2024.312], 2019-01-29 15:43:44,585 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. 
2019-01-29 15:43:44,628 INFO org.apache.flink.runtime.blob.TransientBlobCache [] - Shutting down BLOB cache 2019-01-29 15:43:44,661 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124 {code} With AWS S3, it is actually printing an exception instead: {code} 2019-01-29 19:24:39,968 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: rest.port, 8081 2019-01-29 19:24:39,990 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers 2019-01-29 19:24:43,117 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2047.535], CredentialsRequestTime=[2033.619], 2019-01-29 19:24:43,118 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2049.826], 2019-01-29 19:24:46,215 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2003.168], CredentialsRequestTime=[2002.836], 2019-01-29 19:24:46,216 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2004.182], 2019-01-29 19:24:50,384 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2003.15], CredentialsRequestTime=[2002.803], 2019-01-29 19:24:50,384 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2004.308], 2019-01-29 19:24:56,691 INFO org.apache.flink.fs.s3presto.shaded.com.amazonaws.latency - ClientExecuteTime=[2002.596], CredentialsRequestTime=[2002.45], 2019-01-29 19:24:56,691 INFO
[GitHub] stevenzwu closed pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu closed pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020
[GitHub] stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#issuecomment-458650005 @tzulitai thx. let me close this PR then. @tillrohrmann can submit a diff PR.
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r251924624 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.logical + +import java.util.{List => JList} + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.{RelNode, SingleRel} +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction +import org.apache.flink.table.plan.nodes.FlinkConventions + +class FlinkLogicalUpsertToRetraction( Review comment: I think that this comment of mine might be partially invalid. The only optimisation rule that we support around this logical node is `CalcUpsertToRetractionTransposeRule` which doesn't use this accept. 
But I guess for the sake of consistency, it should be implemented one way or another, for example by simply `throw new UnsupportedOperationException()`. Otherwise this is a "silent" land mine waiting for someone to step on it? Or am I missing something?
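The fail-fast alternative suggested here, implementing the unused hook as an explicit `UnsupportedOperationException` rather than leaving a silent default, can be shown in a generic, self-contained form (illustrative classes, not the actual Calcite/Flink API):

```java
public class FailFastSketch {
    // Stand-in for a logical plan node whose visitor hook is not
    // supported by any current optimisation rule.
    static class UpsertToRetractionNode {
        Object accept(Object visitor) {
            // Fail fast: a silent default here would be a land mine for
            // whoever adds a rule that starts calling accept().
            throw new UnsupportedOperationException(
                "accept() is not supported by UpsertToRetractionNode");
        }
    }

    public static void main(String[] args) {
        try {
            new UpsertToRetractionNode().accept(new Object());
        } catch (UnsupportedOperationException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The explicit throw turns an eventual wrong-plan bug into an immediate, attributable failure at the call site.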
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r251922093 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.logical + +import java.util.{List => JList} + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.{RelNode, SingleRel} +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction +import org.apache.flink.table.plan.nodes.FlinkConventions + +class FlinkLogicalUpsertToRetraction( +cluster: RelOptCluster, +traitSet: RelTraitSet, +child: RelNode, +val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, child) + with FlinkLogicalRel { + + override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = { +new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), keyNames) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +val child = this.getInput +val rowCnt = mq.getRowCount(child) +// take rowCnt and fieldCnt into account, so that cost will be smaller when generate +// UpsertToRetractionConverter after Calc. +planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0) Review comment: As far as I can see, `FlinkLogicalNativeTableScan` is the same as what I proposed in the comment above, right? The only difference is `estimateRowSize(child.getRowType)` vs `estimateRowSize(getRowType)`, both of which are probably equivalent in this case.
[jira] [Comment Edited] (FLINK-11042) testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is an invalid test
[ https://issues.apache.org/jira/browse/FLINK-11042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755198#comment-16755198 ] Piotr Nowojski edited comment on FLINK-11042 at 1/29/19 4:48 PM: - I am waiting for someone to review my PR that drops this test. I asked [~tzulitai] but if you can do it, you could do it as well :) was (Author: pnowojski): I was waiting for someone to review my PR that drops this test. I asked [~tzulitai] but if you can do it, you could do it as well :) > testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is an invalid > test > --- > > Key: FLINK-11042 > URL: https://issues.apache.org/jira/browse/FLINK-11042 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.8.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > main point of testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is > to fail transaction coordinator (by using > {{kafkaProducer.getTransactionCoordinatorId();}} ) and we expect that this > will cause failure of Flink job. However that's not always the case. Maybe > because transaction coordinator can be re-elected before {{KafkaProducer}} > even notices it or for whatever the reason, sometimes the failure is not > happening. > Because of a bug in the test, if failure hasn't happened, the test will not > fail. > Generally speaking this test is invalid and should be dropped.
[jira] [Commented] (FLINK-11042) testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is an invalid test
[ https://issues.apache.org/jira/browse/FLINK-11042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755198#comment-16755198 ] Piotr Nowojski commented on FLINK-11042: I was waiting for someone to review my PR that drops this test. I asked [~tzulitai] but if you can do it, you could do it as well :) > testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is an invalid > test > --- > > Key: FLINK-11042 > URL: https://issues.apache.org/jira/browse/FLINK-11042 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.8.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > main point of testFlinkKafkaProducerFailTransactionCoordinatorBeforeNotify is > to fail transaction coordinator (by using > {{kafkaProducer.getTransactionCoordinatorId();}} ) and we expect that this > will cause failure of Flink job. However that's not always the case. Maybe > because transaction coordinator can be re-elected before {{KafkaProducer}} > even notices it or for whatever the reason, sometimes the failure is not > happening. > Because of a bug in the test, if failure hasn't happened, the test will not > fail. > Generally speaking this test is invalid and should be dropped.
[GitHub] pnowojski commented on a change in pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…
pnowojski commented on a change in pull request #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre… URL: https://github.com/apache/flink/pull/7570#discussion_r251916174 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.MockStreamStatusMaintainer; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; + +import java.util.Collections; +import java.util.Map; +import java.util.function.BiConsumer; + +/** + * A builder of {@link MockStreamTask}. 
+ */ +public class MockStreamTaskBuilder { + private final Environment environment; + private String name; + private Object checkpointLock; + private StreamConfig config; + private ExecutionConfig executionConfig; + private CloseableRegistry closableRegistry; + private StreamStatusMaintainer streamStatusMaintainer; + private CheckpointStorage checkpointStorage; + private ProcessingTimeService processingTimeService; + private StreamTaskStateInitializer streamTaskStateInitializer; + private BiConsumer handleAsyncException; + private Map> accumulatorMap; + + public MockStreamTaskBuilder(Environment environment) throws Exception { + // obligatory parameters + this.environment = environment; + + // default values Review comment: nit: you could inline the default values with the fields definitions like ``` private String name = "Mock Task" ``` it would simplify/shorten the code by ~15 lines.
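The nit amounts to initializing defaults at the field declarations instead of in a separate block in the constructor; on a trimmed-down, hypothetical version of the builder (illustrative fields, not the full Flink class):

```java
public class MockTaskBuilderSketch {
    private final String environment; // obligatory, set in the constructor

    // Defaults inlined with the field definitions, as suggested in review;
    // no separate "default values" block in the constructor is needed.
    private String name = "Mock Task";
    private int parallelism = 1;

    public MockTaskBuilderSketch(String environment) {
        this.environment = environment;
    }

    public MockTaskBuilderSketch setName(String name) {
        this.name = name;
        return this;
    }

    public String build() {
        return environment + "/" + name + " (parallelism=" + parallelism + ")";
    }

    public static void main(String[] args) {
        // Callers that override nothing get the inlined defaults.
        System.out.println(new MockTaskBuilderSketch("env").build()); // env/Mock Task (parallelism=1)
    }
}
```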
[jira] [Commented] (FLINK-11333) First-class support for Protobuf types with evolvable schema
[ https://issues.apache.org/jira/browse/FLINK-11333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755185#comment-16755185 ] Yun Tang commented on FLINK-11333: -- I planned to use avro-protobuf to extract a Protobuf message's schema to verify whether the schema is evolvable in avro's view. However, as [the comment|https://github.com/confluentinc/schema-registry/pull/672#issuecomment-393348920] describes, avro defines different rules for compatibility. For example, Long cannot be treated as compatible with Integer for avro, while protobuf could. I wrote a simple program to verify this, and there really is a difference. I'm afraid the idea of using avro to judge whether two protobuf messages are compatible is not correct. As far as I can see, checking protobuf message compatibility seems to be non-trivial work; we might have to leave the schema evolution check of protobuf to job runtime. > First-class support for Protobuf types with evolvable schema > > > Key: FLINK-11333 > URL: https://issues.apache.org/jira/browse/FLINK-11333 > Project: Flink > Issue Type: Sub-task > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Yun Tang >Priority: Major > > I think we have more and more users who are thinking about using Protobuf for > their state types. > Right now, Protobuf isn't supported directly in Flink. The only way to use > Protobuf for a type is to register it via Kryo: > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html. > Likewise for Avro types, we should be able to natively support Protobuf, > having a {{ProtobufSerializer}} that handles serialization of Protobuf types. > The serializer should also write necessary information in its snapshot, to > enable schema evolution for it in the future. For Protobuf, this should > almost work out-of-the-box.
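Part of the reason protobuf tolerates int32 to int64 widening is visible at the wire level: both types share the base-128 varint encoding, so the bytes written for an int32 field decode unchanged when the reader declares the field as int64. A dependency-free sketch of that encoding (illustrative only, not the actual protobuf runtime):

```java
import java.io.ByteArrayOutputStream;

public class VarintCompat {
    // Encode a non-negative value as a protobuf-style base-128 varint:
    // 7 payload bits per byte, high bit set on all but the last byte.
    static byte[] encodeVarint(long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
        return out.toByteArray();
    }

    // Decode a varint into a long, regardless of whether the writer
    // declared the field as int32 or int64.
    static long decodeVarint(byte[] bytes) {
        long result = 0;
        int shift = 0;
        for (byte b : bytes) {
            result |= (long) (b & 0x7F) << shift;
            shift += 7;
        }
        return result;
    }

    public static void main(String[] args) {
        // The same bytes a writer produced for an int32 field...
        byte[] wire = encodeVarint(300);
        // ...decode unchanged when the reader declares the field int64.
        System.out.println(decodeVarint(wire)); // 300
    }
}
```

Avro's schema-resolution rules are defined over its own type system rather than a shared wire encoding, which is why the two systems can disagree on what counts as a compatible change.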
[GitHub] pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer URL: https://github.com/apache/flink/pull/7405#issuecomment-458609356 Sorry, I forgot to respond to that part:
> I have started to work on the last test FlinkKafkaProducer011 --> FlinkKafkaProducer. But I'm quickly stuck with an exception when initializing the testHarness (created with FlinkKafkaProducer) with the savepoint of FlinkKafkaProducer011. Is this action feasible with the testHarness API, or is there a workaround to this issue?
I don't know. I'm not aware of any test that attempts to do that, so maybe this is a missing feature and maybe you are right that it has to be fixed somehow for the test. I have also thought more about the issue with timeouts on pending transactions: any pending transactions in a pre-made savepoint would have timed out long before a test runs against it. This means that the only way to test the 1.7 -> 1.8 migration with pending transactions would be some awful code that downloads the Flink 1.7 sources/binaries and creates a savepoint during the test execution. I think that would be extremely difficult to do, so we can ignore this issue for now... That would leave us with the following tests that we could theoretically implement:
1. migration of `FlinkKafkaProducer` from a pre-made 1.7 savepoint to master, without pending transactions
2. migration of `FlinkKafkaProducer011` from a pre-made 1.7 savepoint to master, without pending transactions
3. migration from a `FlinkKafkaProducer011` master savepoint to `FlinkKafkaProducer` master, without pending transactions
4. (optional) migration from a `FlinkKafkaProducer011` pre-made 1.7 savepoint to `FlinkKafkaProducer` master, without pending transactions
5. (optional) migration `FlinkKafkaProducer011` -> `FlinkKafkaProducer` from a savepoint created on demand (during the unit test), from master to master versions, with pending transactions
6. (optional) upgrading Kafka brokers when using `FlinkKafkaProducer`, from a savepoint created on demand (during the unit test), from master to master versions, with pending transactions
1, 2 and 3 are IMO must-haves. There is a chance that Flink 1.8 will support the "[stop with savepoint](https://issues.apache.org/jira/browse/FLINK-11458)" feature, which would mean that there would be no pending transactions in savepoints. With that guarantee, 1, 2 and 3 would essentially cover all of the required upgrade paths. The story of upgrading from 1.7 `FlinkKafkaProducer011` to 1.8 `FlinkKafkaProducer` would be covered first by step 2, then by step 3. 3, 4 and 5 have the issue that you reported, where the test harness probably needs to be adjusted. 6 would have yet another issue of handling two different Kafka broker versions. 4, 5 and 6 would be nice to have, but as long as "[stop with savepoint](https://issues.apache.org/jira/browse/FLINK-11458)" is there, they are not strictly required. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11458) Ensure sink commit side-effects when cancelling with savepoint.
Kostas Kloudas created FLINK-11458: -- Summary: Ensure sink commit side-effects when cancelling with savepoint. Key: FLINK-11458 URL: https://issues.apache.org/jira/browse/FLINK-11458 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing Affects Versions: 1.8.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas Fix For: 1.8.0 TBD.
[jira] [Commented] (FLINK-11419) StreamingFileSink fails to recover after taskmanager failure
[ https://issues.apache.org/jira/browse/FLINK-11419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755180#comment-16755180 ] Edward Rojas commented on FLINK-11419: -- [~kkl0u] I created the PR, you can review it now > StreamingFileSink fails to recover after taskmanager failure > > > Key: FLINK-11419 > URL: https://issues.apache.org/jira/browse/FLINK-11419 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.7.1 >Reporter: Edward Rojas >Assignee: Edward Rojas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2 > > Time Spent: 10m > Remaining Estimate: 0h > > If a job with a StreamingFileSink sending data to HDFS is running in a > cluster with multiple taskmanagers and the taskmanager executing the job goes > down (for some reason), when the other task manager start executing the job, > it fails saying that there is some "missing data in tmp file" because it's > not able to perform a truncate in the file. > Here the full stack trace: > {code:java} > java.io.IOException: Missing data in tmp file: > hdfs://path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191 > at > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:93) > at > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396) > at > org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177) > at > 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149) > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): > Failed to TRUNCATE_FILE > /path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191 > for DFSClient_NONMAPREDUCE_-2103482360_62 on x.xxx.xx.xx because this file > lease is currently owned by DFSClient_NONMAPREDUCE_1834204750_59 on x.xx.xx.xx > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3190) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2282) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2228) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2198) > at > 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1056) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:622) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351) > at
[GitHub] tvielgouarin commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
tvielgouarin commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer URL: https://github.com/apache/flink/pull/7405#issuecomment-458604699 I've managed to make the migration test from FlinkKafkaProducer011 --> FlinkKafkaProducer work. I will make the PR for those migration tests.
[jira] [Created] (FLINK-11457) PrometheusPushGatewayReporter either overwrites its own metrics or creates too many labels
Oscar Westra van Holthe - Kind created FLINK-11457: -- Summary: PrometheusPushGatewayReporter either overwrites its own metrics or creates too many labels Key: FLINK-11457 URL: https://issues.apache.org/jira/browse/FLINK-11457 Project: Flink Issue Type: Bug Reporter: Oscar Westra van Holthe - Kind
When using the PrometheusPushGatewayReporter, one has two options:
* Use a fixed job name, which causes the jobmanager and taskmanager to overwrite each other's metrics (i.e. last write wins, and you lose a lot of metrics)
* Use a random suffix for the job name, which creates a lot of labels that have to be cleaned up manually
The manual cleanup should not be necessary, but is needed nonetheless when using a YARN cluster. A fix could be to add a suffix to the job name, naming the nodes in a non-random manner like {{my_job_jm0}}, {{my_job_tm1}}, {{my_job_tm2}}, {{my_job_tm3}}, {{my_job_tm4}}, ..., using a counter (not sure if one is available) or some other stable (!) suffix. Related discussion: FLINK-9187 Any thoughts on a solution? I'm happy to implement it, but I'm not sure what the best solution would be.
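The stable-suffix proposal above can be sketched as a tiny naming helper; the function and the jm/tm scheme are hypothetical illustrations of the idea, not an existing Flink or reporter API:

```python
def stable_push_gateway_job_names(base, num_taskmanagers):
    """Derive deterministic per-process PushGateway job names, so a restarted
    process overwrites its own series instead of leaking a new random label.
    Assumes one jobmanager and `num_taskmanagers` taskmanagers."""
    names = [f"{base}_jm0"]  # the single jobmanager
    names += [f"{base}_tm{i}" for i in range(1, num_taskmanagers + 1)]
    return names
```

Each process would push under its own stable name, which avoids both failure modes: no shared name to overwrite, and no unbounded set of random names to garbage-collect.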
[jira] [Created] (FLINK-11456) Improve window operator with sliding window assigners
Rong Rong created FLINK-11456: - Summary: Improve window operator with sliding window assigners Key: FLINK-11456 URL: https://issues.apache.org/jira/browse/FLINK-11456 Project: Flink Issue Type: Sub-task Components: DataStream API Reporter: Rong Rong
With slicing and merging operators that expose the internals of window operators, the current sliding window can be improved by eliminating duplicate aggregations and duplicate element inserts into multiple panes (e.g. namespaces). The following sliding window operation
{code:java}
val resultStream: DataStream = inputStream
  .keyBy("key")
  .window(SlidingEventTimeWindows.of(Time.seconds(15L), Time.seconds(5L)))
  .sum("value")
{code}
can produce a job graph equivalent to
{code:java}
val resultStream: DataStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))
  .sum("value")
  .slideOver(Count.of(3))
{code}
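The optimization described above, aggregating each element once into a non-overlapping slice and then combining slices into sliding windows, can be shown with a small standalone sketch. The function names are made up for illustration and are not DataStream API code:

```python
def slice_sums(events, slice_len):
    """events: iterable of (timestamp, value). Aggregate each element exactly
    once into its non-overlapping slice; returns {slice_index: sum}."""
    slices = {}
    for ts, value in events:
        idx = ts // slice_len
        slices[idx] = slices.get(idx, 0) + value
    return slices

def slide_over(slices, count):
    """Build sliding-window results by combining `count` consecutive slice
    aggregates, instead of inserting every element into `count` panes."""
    return {idx: sum(slices.get(idx - i, 0) for i in range(count))
            for idx in sorted(slices)}
```

With `slice_len=5` and `count=3` this mirrors the example: a 15-unit window sliding by 5 units, but each element is aggregated once rather than three times.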
[jira] [Created] (FLINK-11455) Support evictor operations on slicing and merging operators
Rong Rong created FLINK-11455: - Summary: Support evictor operations on slicing and merging operators Key: FLINK-11455 URL: https://issues.apache.org/jira/browse/FLINK-11455 Project: Flink Issue Type: Sub-task Reporter: Rong Rong The original implementation POC of SliceStream and MergeStream does not consider evicting window operations. This support can be further expanded in order to cover session windows with multiple timeout durations. See [https://docs.google.com/document/d/1ziVsuW_HQnvJr_4a9yKwx_LEnhVkdlde2Z5l6sx5HlY/edit#heading=h.ihxm3alf3tk0.]
[jira] [Created] (FLINK-11454) Support MergedStream operation
Rong Rong created FLINK-11454: - Summary: Support MergedStream operation Key: FLINK-11454 URL: https://issues.apache.org/jira/browse/FLINK-11454 Project: Flink Issue Type: Sub-task Components: DataStream API Reporter: Rong Rong
Following SlicedStream, the MergedStream operator merges results from the sliced stream and produces windowing results.
{code:java}
val slicedStream: SlicedStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))   // new “slice window” concept: to combine
                                   // tumble results based on discrete,
                                   // non-overlapping windows.
  .aggregate(aggFunc)

val mergedStream1: MergedStream = slicedStream
  .slideOver(Time.seconds(10L))    // combine slice results with the same
                                   // windowing function; equivalent to a
                                   // WindowOperator with an aggregate state
                                   // and derived aggregate function.

val mergedStream2: MergedStream = slicedStream
  .slideOver(Count.of(5))
  .apply(windowFunction)           // apply a different window function over
                                   // the sliced results.
{code}
MergedStreams are produced by the MergeOperator.
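The key property of the two merges above is that they share one set of slice aggregates, so the per-element work happens only once. A rough standalone model of that reuse (hypothetical names, not the proposed API):

```python
def make_slices(events, slice_len):
    """One aggregation per element into its non-overlapping slice."""
    slices = {}
    for ts, value in events:
        idx = ts // slice_len
        slices[idx] = slices.get(idx, 0) + value
    return slices

def merge(slices, count, fn=sum):
    """Merge `count` consecutive slice results with `fn`. Different merged
    streams can apply different functions over the same shared slices,
    analogous to mergedStream1 (same function) vs mergedStream2 (apply)."""
    return {idx: fn([slices.get(idx - i, 0) for i in range(count)])
            for idx in sorted(slices)}
```

`merge(slices, 2)` and `merge(slices, 2, max)` both consume the pre-computed slices, so adding a second merged stream never re-reads the raw elements.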
[jira] [Updated] (FLINK-11453) Support SliceStream with forwardable pane info
[ https://issues.apache.org/jira/browse/FLINK-11453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-11453: -- Summary: Support SliceStream with forwardable pane info (was: Support SliceWindow with forwardable pane info)
> Support SliceStream with forwardable pane info
> ---
>
> Key: FLINK-11453
> URL: https://issues.apache.org/jira/browse/FLINK-11453
> Project: Flink
> Issue Type: Sub-task
> Components: DataStream API
> Reporter: Rong Rong
> Priority: Major
>
> Support a slicing operation that produces slices:
> {code:java}
> val slicedStream: SlicedStream = inputStream
>   .keyBy("key")
>   .sliceWindow(Time.seconds(5L))  // new “slice window” concept: to combine
>                                   // tumble results based on discrete,
>                                   // non-overlapping windows.
>   .aggregate(aggFunc)
> {code}
> {{SlicedStream}} will produce results that expose the current {{WindowOperator}} internal state {{InternalAppendingState}}, which can later be applied with a {{WindowFunction}} separately in another operator.
[jira] [Created] (FLINK-11453) Support SliceWindow with forwardable pane info
Rong Rong created FLINK-11453: - Summary: Support SliceWindow with forwardable pane info Key: FLINK-11453 URL: https://issues.apache.org/jira/browse/FLINK-11453 Project: Flink Issue Type: Sub-task Components: DataStream API Reporter: Rong Rong
Support a slicing operation that produces slices:
{code:java}
val slicedStream: SlicedStream = inputStream
  .keyBy("key")
  .sliceWindow(Time.seconds(5L))  // new “slice window” concept: to combine
                                  // tumble results based on discrete,
                                  // non-overlapping windows.
  .aggregate(aggFunc)
{code}
{{SlicedStream}} will produce results that expose the current {{WindowOperator}} internal state {{InternalAppendingState}}, which can later be applied with a {{WindowFunction}} separately in another operator.
[GitHub] TisonKun commented on issue #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre…
TisonKun commented on issue #7570: [FLINK-11422] Prefer testing class to mock StreamTask in AbstractStre… URL: https://github.com/apache/flink/pull/7570#issuecomment-458574322 @pnowojski thanks for your advice! Addressed your comments. Also extracted `MockStreamStatusMaintainer` to deduplicate code.
[jira] [Closed] (FLINK-11266) Only release hadoop-free Flink
[ https://issues.apache.org/jira/browse/FLINK-11266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-11266. Resolution: Fixed Release Note: Convenience binaries that include hadoop are no longer released. If a deployment relies on flink-shaded-hadoop2 being included in flink-dist, then it must be manually downloaded and copied into the /lib directory. Alternatively, a Flink distribution that includes hadoop can be built by packaging flink-dist and activating the "include-hadoop" maven profile. As hadoop is no longer included in flink-dist by default, specifying "-DwithoutHadoop" when packaging flink-dist no longer impacts the build.
> Only release hadoop-free Flink
> --
>
> Key: FLINK-11266
> URL: https://issues.apache.org/jira/browse/FLINK-11266
> Project: Flink
> Issue Type: Improvement
> Components: Release System
> Affects Versions: 1.8.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Fix For: 1.8.0
>
> Currently we release 10 different binary releases (2 scala versions * (4 hadoop versions + hadoop-free)), which has increased the size of our release to more than 2 GB.
> Naturally, building Flink 10 times also takes a while, slowing down the release process.
> However, the only difference between the hadoop versions is the bundled {{flink-shaded-hadoop2}} jar; the rest is completely identical.
> I propose to stop releasing hadoop-specific distributions, and instead have us release multiple versions of {{flink-shaded-hadoop2}} that users copy into the hadoop-free distribution if required.
[GitHub] GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest
GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest URL: https://github.com/apache/flink/pull/7525#discussion_r251869623 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java ## @@ -16,125 +16,97 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskmanager; +package org.apache.flink.runtime.taskexecutor; import net.jcip.annotations.NotThreadSafe; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.IOUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import scala.Tuple2; import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; import java.net.ServerSocket; import java.net.URI; -import java.util.Iterator; import static org.junit.Assert.*; /** - * Validates that the TaskManager startup properly obeys the configuration + * Validates that the TaskManagerRunner startup properly obeys the configuration * values. * * NOTE: at least {@link #testDefaultFsParameterLoading()} should not be run in parallel to other * tests in the same JVM as it modifies a static (private) member of the {@link FileSystem} class * and verifies its content. 
*/ @NotThreadSafe -public class TaskManagerConfigurationTest { +public class TaskManagerRunnerConfigurationTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test - public void testUsePreconfiguredNetworkInterface() throws Exception { + public void testUsePreconfiguredRpcService() throws Exception { final String TEST_HOST_NAME = "testhostname"; Configuration config = new Configuration(); config.setString(TaskManagerOptions.HOST, TEST_HOST_NAME); config.setString(JobManagerOptions.ADDRESS, "localhost"); config.setInteger(JobManagerOptions.PORT, 7891); - HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, - Executors.directExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); - - try { - - Tuple2> address = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices); - - // validate the configured test host name - assertEquals(TEST_HOST_NAME, address._1()); - } finally { - highAvailabilityServices.closeAndCleanupAllData(); - } - } - - @Test - public void testActorSystemPortConfig() throws Exception { - // config with pre-configured hostname to speed up tests (no interface selection) - Configuration config = new Configuration(); - config.setString(TaskManagerOptions.HOST, "localhost"); - config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setInteger(JobManagerOptions.PORT, 7891); - HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); try { // auto port - Iterator portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2(); - assertTrue(portsIter.hasNext()); - assertEquals(0, (int) portsIter.next()); + RpcService rpcService = TaskManagerRunner.createRpcService(config, highAvailabilityServices); + 
assertTrue(rpcService.getPort() >= 0); + // pre-defined host name + assertEquals(TEST_HOST_NAME, rpcService.getAddress()); // pre-defined port final int testPort = 22551; config.setString(TaskManagerOptions.RPC_PORT, String.valueOf(testPort)); - -
[jira] [Comment Edited] (FLINK-10867) Add a DataSet-based CacheOperator to reuse results between jobs
[ https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710007#comment-16710007 ] Miguel E. Coimbra edited comment on FLINK-10867 at 1/29/19 2:58 PM: [~StephanEwen], I've been thinking how to go about this on my [own fork of Apache Flink|https://github.com/mcoimbra/flink] on GitHub. After having invested quite some time in the last two years using this platform and opening bug issues now and then, I'm interested in contributing something which I believe would be of use to the community. A complex task such as this would likely require a design document or something along those lines. However, what I've done so far was to read the Flink source code to understand what is necessary to change, by tracing what the information flow would be, considering key moments:
* Programmer invoking the {{.cache()}} function;
* {{JobManager}} receiving a plan with a caching operator (the operator does not have a reference to a previous job at this time);
* {{TaskManager}}(s) receiving indication that the {{org.apache.flink.core.memory.MemorySegment}} instances associated with the caching operator are to be kept as they are (in the job where caching will occur, their data must have originated from a previous operator which produced the data in the same job, e.g. {{Filter->Map->Cache}}). The operator would work like a regular {{DataSink}}, but instead of writing data, its action is to not evict the segments;
* {{JobManager}} receiving a plan with a caching operator referencing previously-cached data.
Additional behavior properties:
* Adding a parameter to decide how long the cached data is to be stored in the cluster. Should it be the number of jobs for which the cached data is persisted, or an amount of time? What would be desirable?
* Would this imply that caching may only occur when executing in session mode, so that the Flink job manager knows that it is caching specifically for a user?
* The {{org.apache.flink.runtime.executiongraph.ExecutionJobVertex}} instances that depend on cached datasets could conceptually read their input from {{MemorySegment}} instances with the same logic as if reading from disk.
* Create a new COMPLETED_AND_CACHED job status to make this concept explicit as far as job management is concerned?
Besides the DataSet API (this part is the easier one), I've been thinking that perhaps it would be best to define an {{org.apache.flink.api.java.operators.MemorySinkOperator}} class to explicitly hold a reference to the previous job where caching occurred. The {{org.apache.flink.api.java.ExecutionEnvironment}} instance could note the references to this {{MemorySinkOperator}} instance and store in them the cluster job identification as an attribute. That way, when the client-side {{.execute()}} call finishes successfully, it would store the reference there so that the {{MemorySinkOperator}} operator reference can be reused in the next Flink job triggered by the user-level code. The major packages for this functionality are:
* {{org.apache.flink.optimizer}}
* {{org.apache.flink.runtime}}
* {{org.apache.flink.java}}
* {{org.apache.flink.client}}
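The client-side bookkeeping sketched in the comment can be modeled with a toy example: the environment hands out cache handles, and after a successful execute() it records which job produced (and pinned) each cached result, so the next job can reference it. All names here are hypothetical models of the proposal, not Flink API:

```python
class CachedDataset:
    """Model of a .cache() handle, analogous to the proposed MemorySinkOperator."""
    def __init__(self, name):
        self.name = name
        self.producing_job = None  # filled in once the producing job succeeds

class ExecutionEnvironmentModel:
    """Tracks pending cache handles and stamps them with the job id on execute()."""
    def __init__(self):
        self._pending = []
        self._jobs = 0

    def cache(self, name):
        handle = CachedDataset(name)
        self._pending.append(handle)
        return handle

    def execute(self):
        self._jobs += 1
        job_id = "job-%d" % self._jobs
        for handle in self._pending:
            # The cluster side would keep the MemorySegments of these sinks
            # alive; the client only remembers which job to reference later.
            handle.producing_job = job_id
        self._pending = []
        return job_id
```

A second execute() on the same environment can then submit a plan whose caching operator references `handle.producing_job`, which is the "caching operator referencing previously-cached data" case above.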