[GitHub] flinkbot commented on issue #7872: [FLINK-11420][core] Fixed duplicate method of TraversableSerializer
flinkbot commented on issue #7872: [FLINK-11420][core] Fixed duplicate method of TraversableSerializer URL: https://github.com/apache/flink/pull/7872#issuecomment-468577664 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from a reviewer. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dawidwys opened a new pull request #7872: [FLINK-11420][core] Fixed duplicate method of TraversableSerializer
dawidwys opened a new pull request #7872: [FLINK-11420][core] Fixed duplicate method of TraversableSerializer URL: https://github.com/apache/flink/pull/7872 ## What is the purpose of the change The duplicate method of TypeSerializer used an equality check rather than a reference check of the element serializer to decide whether we need a deep copy. This commit uses a proper reference comparison. ## Brief change log - enabled additional tests in SerializerTestInstance - fixed duplicate method of TraversableSerializer ## Verifying this change * enabled additional tests (including a `duplicate` method test) in `SerializerTestInstance` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (**yes** / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
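The distinction this PR relies on, equality versus identity of the element serializer, can be illustrated with a small self-contained sketch (all class and method names are invented for illustration; this is not the actual TraversableSerializer code):

```java
// Hypothetical sketch of the bug class the PR describes: two serializers can
// compare equal() yet be distinct, stateful instances, so the decision to
// return `this` from duplicate() must use a reference check (==) on the
// duplicated element serializer, not an equality check.
class ElementSerializer {
    private final boolean stateless;

    ElementSerializer(boolean stateless) {
        this.stateless = stateless;
    }

    // A stateless serializer may return itself from duplicate(); a stateful
    // one must not be shared across threads and returns a fresh instance.
    ElementSerializer duplicate() {
        return stateless ? this : new ElementSerializer(false);
    }

    @Override
    public boolean equals(Object o) {
        // Serializers of the same configuration compare equal -- which is
        // exactly why equals() cannot tell a fresh copy from the original.
        return o instanceof ElementSerializer;
    }

    @Override
    public int hashCode() {
        return 1;
    }
}

class CollectionSerializer {
    final ElementSerializer element;

    CollectionSerializer(ElementSerializer element) {
        this.element = element;
    }

    CollectionSerializer duplicate() {
        ElementSerializer dup = element.duplicate();
        // Reference comparison: reuse `this` only if the element serializer
        // really handed back the same instance. An equals() check here would
        // wrongly skip the deep copy for stateful element serializers.
        return dup == element ? this : new CollectionSerializer(dup);
    }
}

class DuplicateCheckDemo {
    public static void main(String[] args) {
        CollectionSerializer stateful =
                new CollectionSerializer(new ElementSerializer(false));
        // A deep copy happens even though the serializers are equal().
        System.out.println(stateful.duplicate() != stateful); // true
    }
}
```

With the equality check, a stateful element serializer would wrongly be treated as shareable; the reference check distinguishes the two cases.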
[jira] [Updated] (FLINK-9685) Flink should support hostname-substitution for security.kerberos.login.principal
[ https://issues.apache.org/jira/browse/FLINK-9685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-9685: -- Component/s: Runtime / Coordination > Flink should support hostname-substitution for > security.kerberos.login.principal > > > Key: FLINK-9685 > URL: https://issues.apache.org/jira/browse/FLINK-9685 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Ethan Li >Assignee: Aleksandr Salatich >Priority: Major > Labels: pull-request-available > Time Spent: 40m > Remaining Estimate: 0h > > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java#L83] > > We can have something like this > {code:java} > String rawPrincipal = > flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); > if (rawPrincipal != null) { >try { > rawPrincipal = rawPrincipal.replace("HOSTNAME", > InetAddress.getLocalHost().getCanonicalHostName()); >} catch (UnknownHostException e) { > LOG.error("Failed to replace HOSTNAME with localhost because {}", e); >} > } > this.principal = rawPrincipal; > {code} > So it will be easier to deploy flink to cluster. Instead of setting different > principal on every node, we can have the same principal > headless_user/HOSTNAME@DOMAIN . -- This message was sent by Atlassian JIRA (v7.6.3#76005)
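The proposal above boils down to a single token substitution. A self-contained sketch of the idea (class and method names invented for illustration; the real change would live in Flink's SecurityConfiguration):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

class PrincipalSubstitution {
    // Expand a literal "HOSTNAME" token in the configured principal to the
    // local canonical host name, so the same principal pattern, e.g.
    // headless_user/HOSTNAME@DOMAIN, works unchanged on every node.
    static String substituteHostname(String rawPrincipal) {
        if (rawPrincipal == null) {
            return null;
        }
        try {
            String host = InetAddress.getLocalHost().getCanonicalHostName();
            return rawPrincipal.replace("HOSTNAME", host);
        } catch (UnknownHostException e) {
            // Keep the raw value if the local host name cannot be resolved.
            return rawPrincipal;
        }
    }

    public static void main(String[] args) {
        System.out.println(substituteHostname("headless_user/HOSTNAME@DOMAIN"));
    }
}
```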
[jira] [Updated] (FLINK-11783) Deadlock during Join operation
[ https://issues.apache.org/jira/browse/FLINK-11783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-11783: --- Component/s: API / DataSet > Deadlock during Join operation > -- > > Key: FLINK-11783 > URL: https://issues.apache.org/jira/browse/FLINK-11783 > Project: Flink > Issue Type: Bug > Components: API / DataSet >Affects Versions: 1.7.2 >Reporter: Julien Nioche >Priority: Major > Attachments: flink_is_stuck.png > > > I am running a filtering job on a large dataset with Flink running in > distributed mode. Most tasks in the Join operation have completed a while ago > and only the tasks from a particular TaskManager are still running. These > tasks make progress but extremely slowly. > When logging onto the machine running this TM I can see that all threads are > TIMED_WAITING . > Could there be a synchronization problem? > See attachment for a screenshot of the Flink UI and the stack below. > > *{{$ jstack 9183 | grep -A 15 "DataSetFilterJob"}}* > {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at > (DataSetFilterJob.java:67)) (66/150)" #155 prio=5 os_prio=0 > tid=0x7faa5c01c000 nid=0x248c waiting on condition [0x7fa9d15d5000]}} > {{ java.lang.Thread.State: TIMED_WAITING (parking)}} > {{ at sun.misc.Unsafe.park(Native Method)}} > {{ - parking to wait for <0x0007bfa89578> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}} > {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}} > {{ at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}} > {{ at > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}} > {{ at > 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:228)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}} > {{ at org.apache.flink.types.StringValue.readString(StringValue.java:746)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}} > {{--}} > {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at > (DataSetFilterJob.java:67)) (65/150)" #154 prio=5 os_prio=0 > tid=0x7faa5c01b000 nid=0x248b waiting on condition [0x7fa9d14d4000]}} > {{ java.lang.Thread.State: TIMED_WAITING (parking)}} > {{ at sun.misc.Unsafe.park(Native Method)}} > {{ - parking to wait for <0x0007b8e0eb50> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}} > {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}} > {{ at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}} > {{ at > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}} > {{ at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}} > {{ at > 
org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:228)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}} > {{ at > org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}} > {{ at org.apache.flink.types.StringValue.readString(StringValue.java:746)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}} > {{ at > org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}} > {{--}} > {{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at > (DataSetFilterJob.java:67)) (68/150)" #153 prio=5 os_prio=0 > tid=0x7faa5c019800 nid=0x248a waiting on condition [0x7fa981df6000]}} > {{ java.lang.Thread.State:
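The frames in the jstack output above all bottom out in `LinkedBlockingQueue.poll` with a timeout, which is why every thread shows as TIMED_WAITING (parked waiting for a spilled block to come back from disk) rather than BLOCKED on a monitor. A minimal stdlib-only sketch, not Flink code, reproducing that thread state:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedWaitingDemo {
    // A thread polling an empty queue with a timeout parks via
    // LockSupport.parkNanos and reports TIMED_WAITING, matching the stack
    // frames above (parkNanos <- awaitNanos <- LinkedBlockingQueue.poll).
    static Thread.State stateWhilePolling() throws InterruptedException {
        LinkedBlockingQueue<byte[]> returnedBlocks = new LinkedBlockingQueue<>();
        Thread reader = new Thread(() -> {
            try {
                // Mirrors the shape of getNextReturnedBlock's timed poll.
                returnedBlocks.poll(10, TimeUnit.SECONDS);
            } catch (InterruptedException ignored) {
                // Interrupted by the caller once the state has been sampled.
            }
        });
        reader.start();
        Thread.sleep(500); // give the reader time to park
        Thread.State state = reader.getState();
        reader.interrupt();
        reader.join();
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stateWhilePolling());
    }
}
```

So TIMED_WAITING here does not by itself prove a lock-ordering deadlock; it shows the threads are starved of data from the asynchronous block reader.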
[GitHub] libenchao commented on issue #7867: [hotfix][datastream] Fix typos in DataStream
libenchao commented on issue #7867: [hotfix][datastream] Fix typos in DataStream URL: https://github.com/apache/flink/pull/7867#issuecomment-468576409 Thanks for the review @zentol , will do the merge. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-4387) Instability in KvStateClientTest.testClientServerIntegration()
[ https://issues.apache.org/jira/browse/FLINK-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4387: -- Release Note: (was: Thank you for your assessment. I've closed it.) Thank you for your assessment. I've closed it. > Instability in KvStateClientTest.testClientServerIntegration() > -- > > Key: FLINK-4387 > URL: https://issues.apache.org/jira/browse/FLINK-4387 > Project: Flink > Issue Type: Bug > Components: Runtime / Queryable State >Affects Versions: 1.1.0, 1.5.0, 1.6.0 >Reporter: Robert Metzger >Assignee: Nico Kruber >Priority: Major > Labels: test-stability > Fix For: 1.2.0, 1.8.0 > > > According to this log: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/151491745/log.txt > the {{KvStateClientTest}} didn't complete. > {code} > "main" #1 prio=5 os_prio=0 tid=0x7fb2b400a000 nid=0x29dc in Object.wait() > [0x7fb2bcb3b000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0xf7c049a0> (a > io.netty.util.concurrent.DefaultPromise) > at java.lang.Object.wait(Object.java:502) > at > io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254) > - locked <0xf7c049a0> (a > io.netty.util.concurrent.DefaultPromise) > at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:32) > at > org.apache.flink.runtime.query.netty.KvStateServer.shutDown(KvStateServer.java:185) > at > org.apache.flink.runtime.query.netty.KvStateClientTest.testClientServerIntegration(KvStateClientTest.java:680) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > {code} > and > {code} > Exception in thread "globalEventExecutor-1-3" java.lang.AssertionError > at > io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83) > at > io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110) > at > 
io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95) > at > io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-4387) Instability in KvStateClientTest.testClientServerIntegration()
[ https://issues.apache.org/jira/browse/FLINK-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger closed FLINK-4387. - Resolution: Fixed Release Note: Thank you for your assessment. I've closed it. > Instability in KvStateClientTest.testClientServerIntegration() > -- > > Key: FLINK-4387 > URL: https://issues.apache.org/jira/browse/FLINK-4387 > Project: Flink > Issue Type: Bug > Components: Runtime / Queryable State >Affects Versions: 1.1.0, 1.5.0, 1.6.0 >Reporter: Robert Metzger >Assignee: Nico Kruber >Priority: Major > Labels: test-stability > Fix For: 1.8.0, 1.2.0 > > > According to this log: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/151491745/log.txt > the {{KvStateClientTest}} didn't complete. > {code} > "main" #1 prio=5 os_prio=0 tid=0x7fb2b400a000 nid=0x29dc in Object.wait() > [0x7fb2bcb3b000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0xf7c049a0> (a > io.netty.util.concurrent.DefaultPromise) > at java.lang.Object.wait(Object.java:502) > at > io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254) > - locked <0xf7c049a0> (a > io.netty.util.concurrent.DefaultPromise) > at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:32) > at > org.apache.flink.runtime.query.netty.KvStateServer.shutDown(KvStateServer.java:185) > at > org.apache.flink.runtime.query.netty.KvStateClientTest.testClientServerIntegration(KvStateClientTest.java:680) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > {code} > and > {code} > Exception in thread "globalEventExecutor-1-3" java.lang.AssertionError > at > io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83) > at > io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110) > at > io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95) > at > 
io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4387) Instability in KvStateClientTest.testClientServerIntegration()
[ https://issues.apache.org/jira/browse/FLINK-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781394#comment-16781394 ] Ufuk Celebi commented on FLINK-4387: I think as [~yunta] says, this should have been fixed when Netty was upgraded. Furthermore, the referenced test class does not exist anymore. [~rmetzger] I think we can close this. > Instability in KvStateClientTest.testClientServerIntegration() > -- > > Key: FLINK-4387 > URL: https://issues.apache.org/jira/browse/FLINK-4387 > Project: Flink > Issue Type: Bug > Components: Runtime / Queryable State >Affects Versions: 1.1.0, 1.5.0, 1.6.0 >Reporter: Robert Metzger >Assignee: Nico Kruber >Priority: Major > Labels: test-stability > Fix For: 1.2.0, 1.8.0 > > > According to this log: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/151491745/log.txt > the {{KvStateClientTest}} didn't complete. > {code} > "main" #1 prio=5 os_prio=0 tid=0x7fb2b400a000 nid=0x29dc in Object.wait() > [0x7fb2bcb3b000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0xf7c049a0> (a > io.netty.util.concurrent.DefaultPromise) > at java.lang.Object.wait(Object.java:502) > at > io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254) > - locked <0xf7c049a0> (a > io.netty.util.concurrent.DefaultPromise) > at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:32) > at > org.apache.flink.runtime.query.netty.KvStateServer.shutDown(KvStateServer.java:185) > at > org.apache.flink.runtime.query.netty.KvStateClientTest.testClientServerIntegration(KvStateClientTest.java:680) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > {code} > and > {code} > Exception in thread "globalEventExecutor-1-3" java.lang.AssertionError > at > io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83) > at > 
io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110) > at > io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95) > at > io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-1833) Refactor partition availability notification in ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-1833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-1833. -- Resolution: Won't Do > Refactor partition availability notification in ExecutionGraph > -- > > Key: FLINK-1833 > URL: https://issues.apache.org/jira/browse/FLINK-1833 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 0.10.0 >Reporter: Ufuk Celebi >Priority: Major > > The mechanism to notify the JobManager about available result partitions is > hard to understand. There are two parts to this: > 1) JobManager > - The deployment of receivers happens in the Execution class although it is > by now totally unrelated to the state of a specific execution. I propose to > move this to the respective IntermediateResultPartition. > - The deployment information for a receiver is spread across different > components: when creating the TaskDeploymentDescriptor and the "caching" of > partition infos at the consuming vertex. This is very hard to follow and > results in unnecessary messages being sent (which are discarded at the TM). > 2) TaskManager > - Pipelined results notify where you would expect it, in the ResultPartition, > but blocking results don't have an extra message and are implicitly > piggy-backed onto the final state transition, after which the job manager > deploys receivers if all blocking partitions of a result have been produced. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11533) Retrieve job class name from JAR manifest in ClassPathJobGraphRetriever
[ https://issues.apache.org/jira/browse/FLINK-11533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-11533: Fix Version/s: 1.9.0 > Retrieve job class name from JAR manifest in ClassPathJobGraphRetriever > --- > > Key: FLINK-11533 > URL: https://issues.apache.org/jira/browse/FLINK-11533 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Affects Versions: 1.7.0 >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Users running job clusters distribute their user code as part of the shared > classpath of all cluster components. We currently require users running > {{StandaloneClusterEntryPoint}} to manually specify the job class name. JAR > manifest entries that specify the main class of a JAR are ignored since they > are simply part of the classpath. > I propose to add another optional command line argument to the > {{StandaloneClusterEntryPoint}} that specifies the location of a JAR file > (such as {{lib/usercode.jar}}) and whose Manifest is respected. > Arguments: > {code} > --job-jar > --job-classname name > {code} > Each argument is optional, but at least one of the two is required. The > job-classname has precedence over job-jar. > Implementation-wise, we should be able to simply create the PackagedProgram > from the jar file path in ClassPathJobGraphRetriever. > If there is agreement to have this feature, I would provide the > implementation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
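The manifest lookup the issue proposes can be sketched with the standard `java.util.jar` API (class and method names here are invented for illustration; this is not the actual ClassPathJobGraphRetriever implementation):

```java
import java.io.IOException;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

class ManifestMainClass {
    // Given a job JAR such as lib/usercode.jar, read its Main-Class manifest
    // attribute so that --job-classname can become optional.
    static String mainClassFromJar(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest manifest = jar.getManifest();
            // A JAR without a manifest (or without Main-Class) yields null,
            // in which case --job-classname would still be required.
            return manifest == null
                    ? null
                    : manifest.getMainAttributes().getValue("Main-Class");
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 1) {
            System.out.println(mainClassFromJar(args[0]));
        }
    }
}
```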
[GitHub] uce merged pull request #7717: [FLINK-11533] [container] Add option parse JAR manifest for jobClassName
uce merged pull request #7717: [FLINK-11533] [container] Add option parse JAR manifest for jobClassName URL: https://github.com/apache/flink/pull/7717 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on issue #7867: [hotfix][datastream] Fix typos in DataStream
zentol commented on issue #7867: [hotfix][datastream] Fix typos in DataStream URL: https://github.com/apache/flink/pull/7867#issuecomment-468570565 please merge all of your PRs into a single one. #7867, #7868, #7869, #7870, #7871 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] JingsongLi commented on a change in pull request #7816: [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats
JingsongLi commented on a change in pull request #7816: [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261499804 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.util.BinaryStringUtil; +import org.apache.flink.table.util.SegmentsUtil; + +import java.util.Arrays; + +/** + * Use the special format to write data to a {@link MemorySegment} (its capacity grows + * automatically). + * + * If write a format binary: + * 1. New a writer. + * 2. Write each field by writeXX or setNullAt. (Variable length fields can not be written + * repeatedly.) + * 3. Invoke {@link #complete()}. + * + * If want to reuse this writer, please invoke {@link #reset()} first. + */ +public abstract class BinaryWriter { Review comment: BinaryArray has a fixed-length part too. The only difference is that the fixed-length part of BinaryRow must be in a single MemorySegment. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11335) Kafka consumer can not commit offset at checkpoint
[ https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781360#comment-16781360 ] Dawid Wysakowicz commented on FLINK-11335: -- Unfortunately the {{repeated.log}} is not very helpful: it does not show when the checkpoint was completed or whether there were any exceptions in between. Could you share full logs (not just slices) showing the problem? Some other suggestions for what you could check: * Kafka offsets committed to Kafka (not via Flink) * the connection between the Kafka and task manager nodes * increase the checkpoint interval to see whether committing offsets simply takes too much time I would also recommend posting this on the user mailing list; maybe somebody has had a similar problem before. I really don't believe there is anything wrong with Flink; this is rather a problem with some configuration of your environment. If you still think there is a bug in Flink, could you provide us with a minimal reproducible example, which we could use to verify it? > Kafka consumer can not commit offset at checkpoint > -- > > Key: FLINK-11335 > URL: https://issues.apache.org/jira/browse/FLINK-11335 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.6.2 > Environment: AWS EMR 5.20: hadoop, flink plugin > flink: 1.62 > run under yarn-cluster > Kafka cluster: 1.0 > >Reporter: andy hoang >Priority: Critical > Attachments: repeated.log > > > When trying to commit offset to kafka, I always get this warning > {noformat} > 2019-01-15 11:18:55,405 WARN > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - > Committing offsets to Kafka takes longer than the checkpoint interval. > Skipping commit of previous offsets because newer complete checkpoint offsets > are available. This does not compromise Flink's checkpoint integrity. 
> {noformat} > The result is that no messages are committed to kafka > The code was simplified by removing the business logic > {code:java} > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint")) > env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE) > env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) > env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) > env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) > > env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) > val properties = new Properties() > properties.setProperty("group.id", "my_groupid") > //properties.setProperty("enable.auto.commit", "false") > val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic", > new JSONKeyValueDeserializationSchema(true), > > properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true) > val stream = env.addSource(consumer) > > stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), > (Int, ujson.Value)]] { > override def map(node:ObjectNode): scala.Either[(Exception, > ObjectNode), (Int, ujson.Value)] = { > logger.info("## > %s".format(node.get("metadata").toString)) > Thread.sleep(3000) > return Right(200, writeJs(node.toString)) > } > }).print() > env.execute("pp_convoy_flink") > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10585) RestServerEndpoint responds with wrong Content-Type in Integration Test.
[ https://issues.apache.org/jira/browse/FLINK-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-10585. Resolution: Fixed Fixed via 1.8: {{a6902cef6281f06bc54592d9a3bd6e4362dd0dc3}} {{cd8857ba20e725a2c1b226488e9c88459674acc8}} {{eed09094a7d241ecede75034b42a54245027145c}} {{6d29ea2e11011596fbe7deaf0727184beb62c5c6}} master: {{eb58bf7113267da35b021a3682ca3966b74ddf84}} {{33e0ca7d7c052dde404f0cabc4e514a7be284501}} {{9b66985fe0589b7b8239f3c3d48eb8719ec6e8b2}} {{f8d0855949c24c8c283d1ace6350175d837d9cd2}} > RestServerEndpoint responds with wrong Content-Type in Integration Test. > > > Key: FLINK-10585 > URL: https://issues.apache.org/jira/browse/FLINK-10585 > Project: Flink > Issue Type: Bug > Components: Runtime / REST, Tests >Affects Versions: 1.6.1, 1.7.0 > Environment: *Rev:* 3566bb8987872b29aa88b8f7d5b0f122e74bd518 > *OS:* macOS High Sierra 10.13.6 (17G65) > *Maven:* 3.2.5 > *Java:* > openjdk version "1.8.0_181" > OpenJDK Runtime Environment (Zulu 8.31.0.1-macosx) (build 1.8.0_181-b02) > OpenJDK 64-Bit Server VM (Zulu 8.31.0.1-macosx) (build 25.181-b02, mixed mode) >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Attachments: loop_test.patch > > Time Spent: 0.5h > Remaining Estimate: 0h > > Running the test {{RestServerEndpointITCase#testRequestInterleaving}} in a > loop may trigger the HTTP server to send a response with {{Content-Type: > application/octet-stream}}, which causes the test to fail. The expected > {{Content-Type}} is {{application/json}}. Note that the REST handler used for > testing, can only return json responses. The failure can likely be triggered > for other tests inside {{RestServerEndpointITCase}} as well. The behavior has > not been observed on Linux so far. 
> To run the test in a loop, apply the git patch in the attachment, and execute > the following command: > {code} > mvn clean integration-test -pl flink-runtime -am > -Dtest=RestServerEndpointITCase -Dfast -DfailIfNoTests=false > -Dsurefire.skipAfterFailureCount=1 -Dlog.dir=/path/to/log-dir > -Dlog4j.configuration=file:///path/to/flink/tools/log4j-travis.properties > {code} > After a while you may see the following stacktrace in the test's log file: > {noformat} > > 15:25:45,619 INFO org.apache.flink.runtime.rest.RestServerEndpointITCase >- > > Test > testRequestInterleaving[5253](org.apache.flink.runtime.rest.RestServerEndpointITCase) > is running. > > 15:25:45,620 WARN > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Upload directory > /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload > does not exist, or has been deleted externally. Previously uploaded files > are no longer available. > 15:25:45,620 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Created directory > /private/var/folders/43/ghpk0br14m99tnl9b86lwfsmgn/T/junit7127204190502548819/junit6384422818182277795/flink-web-upload > for file uploads. > 15:25:45,620 INFO org.apache.flink.runtime.rest.RestClient >- Rest client endpoint started. > 15:25:45,620 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Starting rest endpoint. > 15:25:45,620 INFO > org.apache.flink.runtime.rest.RestServerEndpointITCase$TestRestServerEndpoint > - Rest endpoint listening at localhost:52841 > 15:25:45,631 ERROR org.apache.flink.runtime.rest.RestClient >- Response was not valid JSON. 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: > Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, > \t) is allowed between tokens > at [Source: > org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@7823f1cb; > line: 1, column: 2] > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:472) > at >
[GitHub] asfgit closed pull request #7839: [FLINK-10585][tests] Fix RestServerEndpointITCase instability
asfgit closed pull request #7839: [FLINK-10585][tests] Fix RestServerEndpointITCase instability URL: https://github.com/apache/flink/pull/7839 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] libenchao opened a new pull request #7871: [hotfix][datastream] Fix wrong javadoc for TwoInputTransformation
libenchao opened a new pull request #7871: [hotfix][datastream] Fix wrong javadoc for TwoInputTransformation URL: https://github.com/apache/flink/pull/7871 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] flinkbot commented on issue #7871: [hotfix][datastream] Fix wrong javadoc for TwoInputTransformation
flinkbot commented on issue #7871: [hotfix][datastream] Fix wrong javadoc for TwoInputTransformation URL: https://github.com/apache/flink/pull/7871#issuecomment-468562164 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] flinkbot commented on issue #7870: [hotfix][datastream] Fix wrong javadoc for OneInputTransformation.getOperator()
flinkbot commented on issue #7870: [hotfix][datastream] Fix wrong javadoc for OneInputTransformation.getOperator() URL: https://github.com/apache/flink/pull/7870#issuecomment-468561090 [Standard @flinkbot review-progress comment, identical to the one on #7872 above.]
[GitHub] libenchao opened a new pull request #7870: [hotfix][datastream] Fix wrong javadoc for OneInputTransformation.getOperator()
libenchao opened a new pull request #7870: [hotfix][datastream] Fix wrong javadoc for OneInputTransformation.getOperator() URL: https://github.com/apache/flink/pull/7870 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] flinkbot commented on issue #7869: [hotfix][datastream] Fix typo in WindowedStream
flinkbot commented on issue #7869: [hotfix][datastream] Fix typo in WindowedStream URL: https://github.com/apache/flink/pull/7869#issuecomment-468560143 [Standard @flinkbot review-progress comment, identical to the one on #7872 above.]
[GitHub] libenchao opened a new pull request #7869: [hotfix][datastream] Fix typo in WindowedStream
libenchao opened a new pull request #7869: [hotfix][datastream] Fix typo in WindowedStream URL: https://github.com/apache/flink/pull/7869 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] flinkbot commented on issue #7868: [hotfix][datastream] Fix typo in JoinedStreams
flinkbot commented on issue #7868: [hotfix][datastream] Fix typo in JoinedStreams URL: https://github.com/apache/flink/pull/7868#issuecomment-468559798 [Standard @flinkbot review-progress comment, identical to the one on #7872 above.]
[GitHub] libenchao opened a new pull request #7868: [hotfix][datastream] Fix typo in JoinedStreams
libenchao opened a new pull request #7868: [hotfix][datastream] Fix typo in JoinedStreams URL: https://github.com/apache/flink/pull/7868 ## What is the purpose of the change *Fix typo* ## Brief change log - *Fix typo* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] flinkbot commented on issue #7867: [hotfix][datastream] Fix typos in DataStream
flinkbot commented on issue #7867: [hotfix][datastream] Fix typos in DataStream URL: https://github.com/apache/flink/pull/7867#issuecomment-468559265 [Standard @flinkbot review-progress comment, identical to the one on #7872 above.]
[GitHub] libenchao opened a new pull request #7867: [hotfix][datastream] Fix typos in DataStream
libenchao opened a new pull request #7867: [hotfix][datastream] Fix typos in DataStream URL: https://github.com/apache/flink/pull/7867 ## What is the purpose of the change *Fix typos and outdated javadocs in DataStream* ## Brief change log - *Fix a few typos* - *Fix outdated javadoc for `windowAll`* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] godfreyhe commented on a change in pull request #7642: [FLINK-11516][table] Port and move some Descriptor classes to flink-table-common
godfreyhe commented on a change in pull request #7642: [FLINK-11516][table] Port and move some Descriptor classes to flink-table-common URL: https://github.com/apache/flink/pull/7642#discussion_r261476403 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.stats; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Column statistics. + */ +@PublicEvolving +public final class ColumnStats { + + /** +* number of distinct values. +*/ + private final Long ndv; + + /** +* number of nulls. +*/ + private final Long nullCount; + + /** +* average length of column values. +*/ + private final Double avgLen; + + /** +* max length of column values. +*/ + private final Integer maxLen; + + /** +* max value of column values. +*/ + private final Number max; + + /** +* min value of column values. 
+*/ + private final Number min; + + public ColumnStats( + Long ndv, + Long nullCount, + Double avgLen, + Integer maxLen, + Number max, + Number min) { + this.ndv = ndv; + this.nullCount = nullCount; + this.avgLen = avgLen; + this.maxLen = maxLen; + this.max = max; + this.min = min; + } + + public Long getNdv() { + return ndv; + } + + public Long getNullCount() { + return nullCount; + } + + public Double getAvgLen() { + return avgLen; + } + + public Integer getMaxLen() { + return maxLen; + } + + public Number getMaxValue() { + return max; + } + + public Number getMinValue() { + return min; + } + + public String toString() { + List<String> columnStats = new ArrayList<>(); + if (ndv != null) { + columnStats.add("ndv=" + ndv); + } + if (nullCount != null) { + columnStats.add("nullCount=" + nullCount); + } + if (avgLen != null) { + columnStats.add("avgLen=" + avgLen); + } + if (maxLen != null) { + columnStats.add("maxLen=" + maxLen); + } + if (max != null) { + columnStats.add("max=" + max); + } + if (min != null) { + columnStats.add("min=" + min); + } + String columnStatsStr = columnStats.stream().collect(Collectors.joining(", ")); Review comment: use `String.join(", ", columnStats)` instead of `columnStats.stream().collect(Collectors.joining(", "))`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
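The reviewer's suggestion can be checked in isolation. A minimal sketch (class and variable names are illustrative, not from the patch) showing that `String.join` produces the same result as the stream-based version:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JoinComparison {
    public static void main(String[] args) {
        List<String> columnStats = Arrays.asList("ndv=42", "nullCount=0", "maxLen=16");

        // Stream-based joining, as in the patch under review.
        String viaStream = columnStats.stream().collect(Collectors.joining(", "));

        // The simpler equivalent suggested in the review.
        String viaJoin = String.join(", ", columnStats);

        System.out.println(viaStream);                 // ndv=42, nullCount=0, maxLen=16
        System.out.println(viaStream.equals(viaJoin)); // true
    }
}
```

Both forms skip no elements and use the same separator, so the suggestion is a pure readability change.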
[GitHub] godfreyhe commented on a change in pull request #7642: [FLINK-11516][table] Port and move some Descriptor classes to flink-table-common
godfreyhe commented on a change in pull request #7642: [FLINK-11516][table] Port and move some Descriptor classes to flink-table-common URL: https://github.com/apache/flink/pull/7642#discussion_r261487835 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StatisticsValidator.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.plan.stats.ColumnStats; + +import java.util.HashMap; +import java.util.Map; + +/** + * Validator for {@link Statistics}. 
+ */ +@Internal +public class StatisticsValidator implements DescriptorValidator { + + public static final String STATISTICS_PROPERTY_VERSION = "statistics.property-version"; + public static final String STATISTICS_ROW_COUNT = "statistics.row-count"; + public static final String STATISTICS_COLUMNS = "statistics.columns"; + + // per column properties + public static final String NAME = "name"; + public static final String DISTINCT_COUNT = "distinct-count"; + public static final String NULL_COUNT = "null-count"; + public static final String AVG_LENGTH = "avg-length"; + public static final String MAX_LENGTH = "max-length"; + public static final String MAX_VALUE = "max-value"; + public static final String MIN_VALUE = "min-value"; + + @Override + public void validate(DescriptorProperties properties) { + properties.validateInt(STATISTICS_PROPERTY_VERSION, true, 0, Integer.MAX_VALUE); + properties.validateLong(STATISTICS_ROW_COUNT, true, 0); + validateColumnStats(properties, STATISTICS_COLUMNS); + } + + // utilities + + public static Map normalizeColumnStats(ColumnStats columnStats) { + Map stats = new HashMap<>(); + if (columnStats.getNdv() != null) { + stats.put(DISTINCT_COUNT, String.valueOf(columnStats.getNdv())); + } + if (columnStats.getNullCount() != null) { + stats.put(NULL_COUNT, String.valueOf(columnStats.getNullCount())); + } + if (columnStats.getAvgLen() != null) { + stats.put(AVG_LENGTH, String.valueOf(columnStats.getAvgLen())); + } + if (columnStats.getMaxLen() != null) { + stats.put(MAX_LENGTH, String.valueOf(columnStats.getMaxLen())); + } + if (columnStats.getMaxValue() != null) { + stats.put(MAX_VALUE, String.valueOf(columnStats.getMaxValue())); + } + if (columnStats.getMinValue() != null) { + stats.put(MIN_VALUE, String.valueOf(columnStats.getMinValue())); + } + return stats; + } + + public static void validateColumnStats(DescriptorProperties properties, String key) { + + // filter for number of columns + int columnCount = 
properties.getIndexedProperty(key, NAME).size(); + + for (int i = 0; i < columnCount; i++) { + properties.validateString(key + "." + i + "." + NAME, false, 1); + properties.validateLong(key + "." + i + "." + DISTINCT_COUNT, true, 0L); + properties.validateLong(key + "." + i + "." + NULL_COUNT, true, 0L); + properties.validateDouble(key + "." + i + "." + AVG_LENGTH, true, 0.0); + properties.validateInt(key + "." + i + "." + MAX_LENGTH, true, 0); + properties.validateDouble(key + "." + i + "." + MAX_VALUE, true, 0.0); + properties.validateDouble(key + "." + i + "." + MIN_VALUE, true, 0.0); + } + } + + public static Map readColumnStats(DescriptorProperties properties, String key) { + + // filter for number of columns + int columnCount = properties.getIndexedProperty(key, NAME).size(); + + Map stats = new HashMap<>();
[GitHub] godfreyhe commented on a change in pull request #7642: [FLINK-11516][table] Port and move some Descriptor classes to flink-table-common
godfreyhe commented on a change in pull request #7642: [FLINK-11516][table] Port and move some Descriptor classes to flink-table-common URL: https://github.com/apache/flink/pull/7642#discussion_r261485410 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StatisticsValidator.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.plan.stats.ColumnStats; + +import java.util.HashMap; +import java.util.Map; + +/** + * Validator for {@link Statistics}. 
+ */ +@Internal +public class StatisticsValidator implements DescriptorValidator { + + public static final String STATISTICS_PROPERTY_VERSION = "statistics.property-version"; + public static final String STATISTICS_ROW_COUNT = "statistics.row-count"; + public static final String STATISTICS_COLUMNS = "statistics.columns"; + + // per column properties + public static final String NAME = "name"; + public static final String DISTINCT_COUNT = "distinct-count"; + public static final String NULL_COUNT = "null-count"; + public static final String AVG_LENGTH = "avg-length"; + public static final String MAX_LENGTH = "max-length"; + public static final String MAX_VALUE = "max-value"; + public static final String MIN_VALUE = "min-value"; + + @Override + public void validate(DescriptorProperties properties) { + properties.validateInt(STATISTICS_PROPERTY_VERSION, true, 0, Integer.MAX_VALUE); + properties.validateLong(STATISTICS_ROW_COUNT, true, 0); + validateColumnStats(properties, STATISTICS_COLUMNS); + } + + // utilities + + public static Map normalizeColumnStats(ColumnStats columnStats) { + Map stats = new HashMap<>(); + if (columnStats.getNdv() != null) { + stats.put(DISTINCT_COUNT, String.valueOf(columnStats.getNdv())); + } + if (columnStats.getNullCount() != null) { + stats.put(NULL_COUNT, String.valueOf(columnStats.getNullCount())); + } + if (columnStats.getAvgLen() != null) { + stats.put(AVG_LENGTH, String.valueOf(columnStats.getAvgLen())); + } + if (columnStats.getMaxLen() != null) { + stats.put(MAX_LENGTH, String.valueOf(columnStats.getMaxLen())); + } + if (columnStats.getMaxValue() != null) { + stats.put(MAX_VALUE, String.valueOf(columnStats.getMaxValue())); + } + if (columnStats.getMinValue() != null) { + stats.put(MIN_VALUE, String.valueOf(columnStats.getMinValue())); + } + return stats; + } + + public static void validateColumnStats(DescriptorProperties properties, String key) { + + // filter for number of columns + int columnCount = 
properties.getIndexedProperty(key, NAME).size(); + + for (int i = 0; i < columnCount; i++) { + properties.validateString(key + "." + i + "." + NAME, false, 1); + properties.validateLong(key + "." + i + "." + DISTINCT_COUNT, true, 0L); + properties.validateLong(key + "." + i + "." + NULL_COUNT, true, 0L); + properties.validateDouble(key + "." + i + "." + AVG_LENGTH, true, 0.0); + properties.validateInt(key + "." + i + "." + MAX_LENGTH, true, 0); + properties.validateDouble(key + "." + i + "." + MAX_VALUE, true, 0.0); + properties.validateDouble(key + "." + i + "." + MIN_VALUE, true, 0.0); Review comment: 1. Negative numbers are also valid for `max` or `min` here. 2. `BigDecimal` and `BigInteger` also inherit from `Number`, so `validateBigDecimal` should be used instead of `validateDouble` here. 3. It should also be validated that `max` is greater than or equal to `min`.
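The third review point can be illustrated with a plain-Java sketch of the bound check. The class and method names below are made up for illustration; an actual fix would go through Flink's `DescriptorProperties` validators rather than this standalone helper:

```java
import java.math.BigDecimal;

public class MinMaxCheck {
    // Negative values are legal for both bounds, and BigDecimal keeps
    // arbitrary precision, matching the review's point that max/min are
    // Numbers, not necessarily doubles.
    static void validateMinMax(BigDecimal min, BigDecimal max) {
        if (min != null && max != null && max.compareTo(min) < 0) {
            throw new IllegalArgumentException(
                    "max-value (" + max + ") must be >= min-value (" + min + ")");
        }
    }

    public static void main(String[] args) {
        // Negative bounds pass: no lower limit of 0.0 is imposed.
        validateMinMax(new BigDecimal("-10.5"), new BigDecimal("-1"));
        try {
            validateMinMax(new BigDecimal("1"), new BigDecimal("0"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The check only fires when both bounds are present, since each statistic is optional in `ColumnStats`.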
[GitHub] zhijiangW commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions
zhijiangW commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/7186#issuecomment-468555786 @tillrohrmann also mentioned this issue in the discussion of the proposed shuffle manager. The life cycles of `Task` and `ResultPartition` should be decoupled, and both of them occupy slot resources. If a `Task` finishes but its `ResultPartition` is not yet completely consumed, the slot resource should not be released, which results in an active `TaskExecutor`. Regarding when to release the `ResultPartition`, I think there are three possible levels: First level: up to the `ResultPartition` itself, as in the current approach, triggered by finishing the data transport on the producer side. Second level: up to the consumer side, triggered by finishing processing all the data on the consumer side. This can avoid restarting the producer to re-produce data in scenarios where the consumer fails during processing. Third level: up to the `ShuffleMaster` side, as proposed for the `ShuffleManager`. The `ShuffleMaster` manages partitions globally. Even if a partition is completely consumed by the downstream side, the `ShuffleMaster` can still decide not to release it, or to delay releasing it, for other reasons. So I think the third level has the mechanism to support all kinds of possibilities. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-11560) Translate "Flink Applications" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YE QIAN reassigned FLINK-11560: --- Assignee: YE QIAN (was: xueyu) > Translate "Flink Applications" page into Chinese > > > Key: FLINK-11560 > URL: https://issues.apache.org/jira/browse/FLINK-11560 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Jark Wu >Assignee: YE QIAN >Priority: Major > > Translate "Flink Applications" page into Chinese. > The markdown file is located in: flink-web/flink-applications.zh.md > The url link is: https://flink.apache.org/zh/flink-applications.html > Please adjust the links in the page to Chinese pages when translating. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10388) RestClientTest sometimes fails with AssertionError
[ https://issues.apache.org/jira/browse/FLINK-10388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781312#comment-16781312 ] Chunhui Shi commented on FLINK-10388: - Since this could fail in a valid network environment (I am seeing the same test failure, except that I got a different exception, "ConnectionClosedException"), I would suggest we ignore this test case for now. > RestClientTest sometimes fails with AssertionError > -- > > Key: FLINK-10388 > URL: https://issues.apache.org/jira/browse/FLINK-10388 > Project: Flink > Issue Type: Test > Components: Runtime / Web Frontend >Reporter: Ted Yu >Priority: Minor > > Running the test on Linux I got: > {code} > testConnectionTimeout(org.apache.flink.runtime.rest.RestClientTest) Time elapsed: 1.918 sec <<< FAILURE! > java.lang.AssertionError: > Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException > but: Network is unreachable: /10.255.255.1:80> is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedSocketException > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.junit.Assert.assertThat(Assert.java:956) > at org.junit.Assert.assertThat(Assert.java:923) > at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:69) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11560) Translate "Flink Applications" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781311#comment-16781311 ] Stephen commented on FLINK-11560: - I'll start the translation in the next few days. > Translate "Flink Applications" page into Chinese > > > Key: FLINK-11560 > URL: https://issues.apache.org/jira/browse/FLINK-11560 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Jark Wu >Assignee: Stephen >Priority: Major > > Translate "Flink Applications" page into Chinese. > The markdown file is located in: flink-web/flink-applications.zh.md > The url link is: https://flink.apache.org/zh/flink-applications.html > Please adjust the links in the page to Chinese pages when translating. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10658) org.apache.flink.util.FlinkException: Releasing shared slot parent.
[ https://issues.apache.org/jira/browse/FLINK-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781305#comment-16781305 ] Xingbin Sun commented on FLINK-10658: - I have two jobs, jobA and jobB. When I cancel jobA, jobB throws this exception. Is this a bug or something? Version 1.6.0 > org.apache.flink.util.FlinkException: Releasing shared slot parent. > --- > > Key: FLINK-10658 > URL: https://issues.apache.org/jira/browse/FLINK-10658 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.5.4 >Reporter: chauncy >Priority: Major > > I don't know why this exception is thrown; can someone tell me? Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11764) Update BucketingSinkMigrationTest for Flink 1.8
[ https://issues.apache.org/jira/browse/FLINK-11764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11764: - Component/s: Connectors / FileSystem > Update BucketingSinkMigrationTest for Flink 1.8 > --- > > Key: FLINK-11764 > URL: https://issues.apache.org/jira/browse/FLINK-11764 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update {{BucketingSinkMigrationTest}} so that it covers restoring from 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11767) Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11767: - Component/s: API / Type Serialization System > Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8 > - > > Key: FLINK-11767 > URL: https://issues.apache.org/jira/browse/FLINK-11767 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update {{TypeSerializerSnapshotMigrationTestBase}} and subclasses to cover > restoring from Flink 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11756) Update migration tests for Flink 1.8
[ https://issues.apache.org/jira/browse/FLINK-11756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781290#comment-16781290 ] vinoyang commented on FLINK-11756: -- [~rmetzger] I have tried to specify the component for each sub-task. > Update migration tests for Flink 1.8 > > > Key: FLINK-11756 > URL: https://issues.apache.org/jira/browse/FLINK-11756 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Critical > Fix For: 1.9.0 > > > Once 1.8.0 is released we should update existing migration tests to cover > restoring from 1.8 savepoints. > Each independent migration test will be tracked as a separate sub-task. > All of these sub-tasks will be started *after* releasing Flink 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] desiam commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing
desiam commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing URL: https://github.com/apache/flink/pull/7861#issuecomment-468543014 @tzulitai From https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/_usage.html, you generally need a reference to the low-level rest client when initializing the sniffer, which can be obtained from the high level rest client. Some additional context in [FLINK-11747](https://issues.apache.org/jira/browse/FLINK-11747) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11770) Update FlinkKinesisConsumerMigrationTest for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11770: - Component/s: Connectors / Kinesis > Update FlinkKinesisConsumerMigrationTest for 1.8 > > > Key: FLINK-11770 > URL: https://issues.apache.org/jira/browse/FLINK-11770 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kinesis, Tests >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update {{FlinkKinesisConsumerMigrationTest}} so that it covers restoring from > 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11768) Update TypeSerializerSnapshotMigrationITCase for Flink 1.8
[ https://issues.apache.org/jira/browse/FLINK-11768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11768: - Component/s: API / Type Serialization System > Update TypeSerializerSnapshotMigrationITCase for Flink 1.8 > -- > > Key: FLINK-11768 > URL: https://issues.apache.org/jira/browse/FLINK-11768 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update {{TypeSerializerSnapshotMigrationITCase}} to cover restoring from > Flink 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11766) Update Java / Scala StatefulJobSavepointMigrationITCase for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11766: - Component/s: Runtime / Checkpointing > Update Java / Scala StatefulJobSavepointMigrationITCase for 1.8 > --- > > Key: FLINK-11766 > URL: https://issues.apache.org/jira/browse/FLINK-11766 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update Java / Scala {{StatefulJobSavepointMigrationITCase}} so that it covers > recovering from Flink 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11765) Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11765: - Component/s: API / DataStream > Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.8 > - > > Key: FLINK-11765 > URL: https://issues.apache.org/jira/browse/FLINK-11765 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update Java / Scala {{StatefulJobWBroadcastStateMigrationITCase}} so that it > covers restoring from Flink 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11763) Update AbstractKeyedOperatorRestoreTestBase for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11763: - Component/s: API / DataStream > Update AbstractKeyedOperatorRestoreTestBase for 1.8 > --- > > Key: FLINK-11763 > URL: https://issues.apache.org/jira/browse/FLINK-11763 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update {{AbstractKeyedOperatorRestoreTestBase}} and subclasses so that it > covers restoring from 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11761) Update FlinkKafkaConsumerBaseMigrationTest for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11761: - Component/s: Connectors / Kafka > Update FlinkKafkaConsumerBaseMigrationTest for 1.8 > -- > > Key: FLINK-11761 > URL: https://issues.apache.org/jira/browse/FLINK-11761 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update {{FlinkKafkaConsumerBaseMigrationTest}} so that it covers restoring > from 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11762) Update WindowOperatorMigrationTest for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11762: - Component/s: API / DataStream > Update WindowOperatorMigrationTest for 1.8 > -- > > Key: FLINK-11762 > URL: https://issues.apache.org/jira/browse/FLINK-11762 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update {{WindowOperatorMigrationTest}} so that it covers restoring from 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11485) Migrate PojoSerializer to use new serialization compatibility abstractions
[ https://issues.apache.org/jira/browse/FLINK-11485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-11485. --- Resolution: Fixed > Migrate PojoSerializer to use new serialization compatibility abstractions > -- > > Key: FLINK-11485 > URL: https://issues.apache.org/jira/browse/FLINK-11485 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 1h > Remaining Estimate: 0h > > This subtask covers migration of the {{PojoSerializer}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11760) Update CEPMigrationTest for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11760: - Component/s: Library / CEP > Update CEPMigrationTest for 1.8 > --- > > Key: FLINK-11760 > URL: https://issues.apache.org/jira/browse/FLINK-11760 > Project: Flink > Issue Type: Sub-task > Components: Library / CEP, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update {{CEPMigrationTest}} so that it covers restoring from 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11758) Update ContinuousFileProcessingMigrationTest for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-11758: - Component/s: Connectors / FileSystem > Update ContinuousFileProcessingMigrationTest for 1.8 > > > Key: FLINK-11758 > URL: https://issues.apache.org/jira/browse/FLINK-11758 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Update {{ContinuousFileProcessingMigrationTest}} so that it covers restoring > from 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] carp84 commented on issue #7866: [FLINK-11730] [State Backends] Make HeapKeyedStateBackend follow the builder pattern
carp84 commented on issue #7866: [FLINK-11730] [State Backends] Make HeapKeyedStateBackend follow the builder pattern URL: https://github.com/apache/flink/pull/7866#issuecomment-468540097 Note: restore is now part of the backend building/constructing process, so we need to make sure each stream constructed during restore can also be closed if the task is cancelled, for example the data input stream opened for (de)serialization during restore. This was exposed by the failure of `InterruptSensitiveRestoreTest` with the refactoring here. @StefanRRichter IMHO this is also a potential issue for the already-completed `RocksDBKeyedStateBackend` refactoring; do you think we should make this change a separate PR and include it in the 1.8 release? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Tom-Goong commented on a change in pull request #7820: [FLINK-11742][Metrics]Push metrics to Pushgateway without "instance"
Tom-Goong commented on a change in pull request #7820: [FLINK-11742][Metrics]Push metrics to Pushgateway without "instance" URL: https://github.com/apache/flink/pull/7820#discussion_r261476961 ## File path: flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java ## @@ -73,7 +77,7 @@ public void open(MetricConfig config) { @Override public void report() { try { - pushGateway.push(CollectorRegistry.defaultRegistry, jobName); + pushGateway.push(CollectorRegistry.defaultRegistry, jobName, instance); Review comment: First of all, thank you very much for the information you provided, which helps me understand Flink better. This question depends on how we view the Flink cluster. First, we can treat the cluster as a black box: the JM plays the same role as the dispatcher in Spring MVC, and the whole cluster is one Prometheus job. Second, we can compare the JM to a feature-rich nginx, so that different Flink jobs correspond to different Prometheus jobs. Third, we can treat each Flink job as a microservice cluster, where each Task (or even Sub-Task) is a specific microservice and a series of associated microservices completes the whole business. The first two views share a problem: different TMs run tasks of different Flink jobs, and the same task can run on different TMs. This means a TM's identifier cannot cleanly separate the tasks of different Flink jobs, and metrics with the same name still conflict. JM and TM are just the containers the jobs run in. So in theory we need to be precise down to the Task or even Sub-Task level, and then group them with the job prefix. To solve this problem properly we need a comprehensive understanding of the whole metric system. Maybe we should close this PR and decide how to deal with it after a thorough discussion in the community. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
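The collision the reviewer describes comes down to how the Pushgateway identifies a push: the job name plus any grouping labels are encoded in the URL path, and pushes with the same path replace each other. The sketch below is a hypothetical helper (not Flink or Prometheus client code) illustrating why adding an `instance` grouping label keeps two TaskManagers' pushes from overwriting one another:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of how a Pushgateway push path is derived from the job name plus
// grouping labels. Pushes that share the same path replace one another, so
// omitting an "instance" label makes all TaskManagers clobber each other.
public class PushUrlSketch {

    static String pushPath(String job, Map<String, String> groupingKey) {
        StringBuilder sb = new StringBuilder("/metrics/job/").append(job);
        for (Map.Entry<String, String> e : groupingKey.entrySet()) {
            sb.append('/').append(e.getKey()).append('/').append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> tm1 = new LinkedHashMap<>();
        tm1.put("instance", "taskmanager-1");
        Map<String, String> tm2 = new LinkedHashMap<>();
        tm2.put("instance", "taskmanager-2");
        // Distinct paths: the two TaskManagers no longer overwrite each other.
        System.out.println(pushPath("flink", tm1));
        System.out.println(pushPath("flink", tm2));
    }
}
```

This mirrors the diff above, where the three-argument `push` overload passes a grouping key in addition to the job name.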
[GitHub] zhijiangW commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions
zhijiangW commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/7186#issuecomment-468538365 @azagrebin , you pointed out a very good question which I had not thought about before. :) Currently Flink does not provide an explicit ack mechanism to confirm that a message was received over the network; we only rely on TCP's internal ack mechanism. For data message (`BufferResponse`) transfer, we add a sequence number on the sender side to verify that no messages are lost in the network, otherwise it would cause failover. For the other protocol messages (`PartitionRequest`, `CancelRequest`, `CloseRequest`, etc.) we have no such mechanism to confirm delivery if the network is unreliable. But for `CloseRequest`, I think it would not cause problems currently, because the consumer closes the connection after all, and the producer then notices the inactive channel and releases all the resources. But your reminder made me think of potential problems for the other messages. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
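The sequence-number mechanism mentioned above can be illustrated with a small sketch. This is a hypothetical class, not Flink's actual `BufferResponse` handling: the receiver tracks the next expected sequence number, and any gap signals a lost message, which in the real system would trigger failover.

```java
// Hypothetical receiver-side sequence check in the spirit of the
// BufferResponse mechanism described above. A gap in the sequence means
// a data message was lost in the network.
public class SequenceChecker {

    private long expected = 0;

    /** Returns true if the message is in order; false signals a lost message. */
    public boolean accept(long sequenceNumber) {
        if (sequenceNumber != expected) {
            return false; // gap detected; the real system would fail over here
        }
        expected++;
        return true;
    }
}
```

As the comment notes, the protocol messages (`PartitionRequest` and friends) have no such check, which is exactly the gap being discussed.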
[jira] [Commented] (FLINK-11143) AskTimeoutException is thrown during job submission and completion
[ https://issues.apache.org/jira/browse/FLINK-11143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781273#comment-16781273 ] Alex Vinnik commented on FLINK-11143: - [~Tony0421] tried {code:java} $ java -version java version "1.8.0_202" Java(TM) SE Runtime Environment (build 1.8.0_202-b08) Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed mode){code} still got {noformat} Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult. at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626) at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:235) at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91) at com.sailpoint.ida.data.jobs.peergrouptransform.PeerGroupTransformJob.main(PeerGroupTransformJob.java:107) Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcherc93ba4e3-4eb1-440a-9e99-5e8a89a2808b#1617967892]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". 
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) at java.lang.Thread.run(Thread.java:748){noformat} will try upgrading to 1.7.2 and see > AskTimeoutException is thrown during job submission and completion > -- > > Key: FLINK-11143 > URL: https://issues.apache.org/jira/browse/FLINK-11143 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.6.2 >Reporter: Alex Vinnik >Priority: Major > > For more details please see the thread > [http://mail-archives.apache.org/mod_mbox/flink-user/201812.mbox/%3cc2fb26f9-1410-4333-80f4-34807481b...@gmail.com%3E] > On submission > 2018-12-12 02:28:31 ERROR JobsOverviewHandler:92 - Implementation error: > Unhandled exception. > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#225683351|#225683351]] after [1 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". 
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at java.lang.Thread.run(Thread.java:748) > > On completion > > {"errors":["Internal server error."," side:\njava.util.concurrent.CompletionException: > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms]. > Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\". > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772) > at akka.dispatch.OnComplete.internal(Future.scala:258) > at akka.dispatch.OnComplete.internal(Future.scala:256) >
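For readers hitting the same `AskTimeoutException` during submission, a commonly suggested first step (an assumption based on standard Flink configuration, not advice given in this thread) is to raise the relevant timeouts in `flink-conf.yaml`:

```yaml
# Illustrative values only; raise the ask timeout that appears in the
# stack traces above, and the web frontend timeout for REST submission.
akka.ask.timeout: 60 s
web.timeout: 60000
```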
[GitHub] KurtYoung closed pull request #7834: [FLINK-11715][table-planner-blink] Add optimize program to organize optimization phases
KurtYoung closed pull request #7834: [FLINK-11715][table-planner-blink] Add optimize program to organize optimization phases URL: https://github.com/apache/flink/pull/7834 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-11715) Add optimize program to organize optimization phases
[ https://issues.apache.org/jira/browse/FLINK-11715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-11715. -- Resolution: Implemented Fix Version/s: 1.9.0 > Add optimize program to organize optimization phases > > > Key: FLINK-11715 > URL: https://issues.apache.org/jira/browse/FLINK-11715 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Currently, Flink organizes the optimization phases by different methods in > Batch(Stream)TableEnvironment#optimize. However this is not easy to extend > especially there are more than ten optimization stages in Blink. On the other > hand, the methods are very similar, except the match order and rule sets for > hep optimization phases, target traits and rule sets for volcano optimization > phases. > Abstracts each optimization stage into a {{FlinkOptimizeProgram}} in Blink, > defined as following: > {code} > /** > * Likes [[org.apache.calcite.tools.Program]], FlinkOptimizeProgram > transforms a relational > * expression into another relational expression. > */ > trait FlinkOptimizeProgram[OC <: OptimizeContext] { > def optimize(input: RelNode, context: OC): RelNode > } > {code} > {{FlinkOptimizeProgram}}'s subclasses include > 1. {{FlinkRuleSetProgram}}, an abstract program can add/remove {{RuleSet}}, > set target traits. > 2. {{FlinkHepRuleSetProgram}}, a subclass of {{FlinkRuleSetProgram}} which > runs with {{HepPlanner}}. > 3. {{FlinkVolcanoProgram}}, a subclass of {{FlinkRuleSetProgram}} which runs > with {{VolcanoPlanner}}. > 4. {{FlinkGroupProgram}}, a program contains a sequence of sub-programs as a > group, programs in the group will be executed in sequence, and the group can > be executed `iterations` times. > .. 
> {{FlinkChainedPrograms}} is responsible for organizing all the programs, each > program's optimize method will be called in sequence when > {{FlinkChainedPrograms}}#optimize is called. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11715) Add optimize program to organize optimization phases
[ https://issues.apache.org/jira/browse/FLINK-11715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781267#comment-16781267 ] Kurt Young commented on FLINK-11715: implemented in 2328ff3a45b889f5bf2c4e8873944980cd904721 > Add optimize program to organize optimization phases > > > Key: FLINK-11715 > URL: https://issues.apache.org/jira/browse/FLINK-11715 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, Flink organizes the optimization phases by different methods in > Batch(Stream)TableEnvironment#optimize. However this is not easy to extend > especially there are more than ten optimization stages in Blink. On the other > hand, the methods are very similar, except the match order and rule sets for > hep optimization phases, target traits and rule sets for volcano optimization > phases. > Abstracts each optimization stage into a {{FlinkOptimizeProgram}} in Blink, > defined as following: > {code} > /** > * Likes [[org.apache.calcite.tools.Program]], FlinkOptimizeProgram > transforms a relational > * expression into another relational expression. > */ > trait FlinkOptimizeProgram[OC <: OptimizeContext] { > def optimize(input: RelNode, context: OC): RelNode > } > {code} > {{FlinkOptimizeProgram}}'s subclasses include > 1. {{FlinkRuleSetProgram}}, an abstract program can add/remove {{RuleSet}}, > set target traits. > 2. {{FlinkHepRuleSetProgram}}, a subclass of {{FlinkRuleSetProgram}} which > runs with {{HepPlanner}}. > 3. {{FlinkVolcanoProgram}}, a subclass of {{FlinkRuleSetProgram}} which runs > with {{VolcanoPlanner}}. > 4. {{FlinkGroupProgram}}, a program contains a sequence of sub-programs as a > group, programs in the group will be executed in sequence, and the group can > be executed `iterations` times. > .. 
> {{FlinkChainedPrograms}} is responsible for organizing all the programs, each > program's optimize method will be called in sequence when > {{FlinkChainedPrograms}}#optimize is called. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
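The chaining described in the quoted design can be sketched in a few lines. This is a hypothetical Java rendering of the Scala `FlinkOptimizeProgram` trait, for illustration only: each program rewrites the plan, and the chained container applies them in sequence, feeding each program's output to the next.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical rendering of the FlinkOptimizeProgram / FlinkChainedPrograms
// idea quoted above: each program transforms the plan, and the chain is a
// left fold over the list of programs.
public class ChainedPrograms<T> {

    /** Analogue of the quoted FlinkOptimizeProgram trait. */
    public interface Program<T> {
        T optimize(T input);
    }

    private final List<Program<T>> programs = new ArrayList<>();

    public ChainedPrograms<T> add(Program<T> program) {
        programs.add(program);
        return this;
    }

    /** Applies every program in order, as FlinkChainedPrograms#optimize does. */
    public T optimize(T input) {
        T current = input;
        for (Program<T> p : programs) {
            current = p.optimize(current);
        }
        return current;
    }
}
```

A hep phase followed by a volcano phase would then just be two entries in the chain, which is the extensibility win the issue describes.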
[GitHub] KurtYoung commented on issue #7834: [FLINK-11715][table-planner-blink] Add optimize program to organize optimization phases
KurtYoung commented on issue #7834: [FLINK-11715][table-planner-blink] Add optimize program to organize optimization phases URL: https://github.com/apache/flink/pull/7834#issuecomment-468536342 merging this... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261473138 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.util.BinaryStringUtil; +import org.apache.flink.table.util.SegmentsUtil; + +import java.util.Arrays; + +/** + * Use the special format to write data to a {@link MemorySegment} (its capacity grows + * automatically). + * + * If write a format binary: + * 1. New a writer. + * 2. Write each field by writeXX or setNullAt. (Variable length fields can not be written + * repeatedly.) + * 3. Invoke {@link #complete()}. + * + * If want to reuse this writer, please invoke {@link #reset()} first. + */ +public abstract class BinaryWriter { Review comment: I think we need a better abstraction for `BinaryWriter`. There are 2 issues from my point: 1. 
We don't have a clear interface for `BinaryWriter`; maybe we should make `BinaryWriter` an interface and move the common implementation to an `AbstractBinaryWriter`. 2. Some of the implementation here seems BinaryRow-specific, like `writeBytesToFixLenPart`; we shouldn't need to know what the fixed-length part and the variable-length part are here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
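The split the reviewer proposes, a narrow interface with shared bookkeeping in an abstract base and format-specific layout in concrete subclasses, can be sketched as follows. These are hypothetical shapes, not the actual Flink classes:

```java
// Hypothetical sketch of the proposed refactoring: a minimal BinaryWriter
// interface, format-agnostic logic in AbstractBinaryWriter, and the
// row-specific layout details confined to a concrete subclass.
public class WriterSketch {

    interface BinaryWriter {
        void writeInt(int pos, int value);
        void setNullAt(int pos);
        void complete();
    }

    static abstract class AbstractBinaryWriter implements BinaryWriter {
        // Stand-in for shared buffer management; a log keeps the sketch testable.
        protected final StringBuilder log = new StringBuilder();

        @Override
        public void setNullAt(int pos) {
            log.append("null@").append(pos).append(';');
        }

        @Override
        public void complete() {
            log.append("done");
        }
    }

    static final class RowWriter extends AbstractBinaryWriter {
        @Override
        public void writeInt(int pos, int value) {
            // Row-specific layout (e.g. the fixed-length part) would live here.
            log.append("int@").append(pos).append('=').append(value).append(';');
        }

        String result() {
            return log.toString();
        }
    }
}
```

Callers then program against `BinaryWriter` only, while layout concerns like `writeBytesToFixLenPart` stay inside the row-specific subclass.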
[GitHub] Myasuka commented on issue #7281: [FLINK-11107][state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured
Myasuka commented on issue #7281: [FLINK-11107][state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured URL: https://github.com/apache/flink/pull/7281#issuecomment-468532245 Since Flink-1.8 is about to release, @StephanEwen @StefanRRichter could anyone take a look at this problem? I submitted the same job with the same configuration (no checkpoint path but HA configured) on released Flink-1.3.2 (which does not yet contain the code where `MemoryStateBackend` creates random checkpoint paths, and so should be treated as the `old behavior`) and on Flink-1.7.2 (which already contains that code). As you can see, `Flink-1.3.2` has a blob service folder, a completed checkpoint file and a submitted job graph file. I think this is the `old behavior`. https://user-images.githubusercontent.com/1709104/53614879-67353c80-3c16-11e9-8fac-0dee85b676d4.png However, `Flink-1.7.2` has many checkpoint paths created by `MemoryStateBackend` from the task side; as you might guess, `41a7c8b8e62d81225868d2a5a60846f7` is the actual job id of this job. These created checkpoint paths are actually useless, and might lead to `MaxDirectoryItemsExceededException` under the high-availability folder. https://user-images.githubusercontent.com/1709104/53614943-9e0b5280-3c16-11e9-81c4-868c3187a09b.png Moreover, I don't think this keeps supporting the old behavior, given the great difference in directory structure. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] godfreyhe commented on a change in pull request #7834: [FLINK-11715][table-planner-blink] Add optimize program to organize optimization phases
godfreyhe commented on a change in pull request #7834: [FLINK-11715][table-planner-blink] Add optimize program to organize optimization phases URL: https://github.com/apache/flink/pull/7834#discussion_r261471649 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/OptimizeContext.scala ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.optimize.program + +import org.apache.calcite.plan.{Context, RelOptPlanner} + +/** + * A OptimizeContext allows to obtain table environment information when optimizing. + */ +trait OptimizeContext { + + /** +* Gets the Calcite [[Context]] defined in [[org.apache.flink.table.api.TableEnvironment]]. +*/ + def getContext: Context Review comment: Yes, it's more clear than before that `OptimizeContext` extends Calcite's `Context` and exposes meaningful APIs. `BatchOptimizeContext` and `StreamOptimizeContext` can be unified after table environments have been unified. We can do this later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9650) Support Protocol Buffers formats
[ https://issues.apache.org/jira/browse/FLINK-9650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781250#comment-16781250 ] vinoyang commented on FLINK-9650: - [~amalakar] In fact, I have not applied the Flink code style to my IDE; I usually fix checkstyle problems manually. Before a PR can be merged, it needs to address all the problems (including checkstyle issues, and field problems such as the enum). So, first I need to know: do you plan to keep working on this until it can be merged, or would you rather someone else continue the work you haven't finished? > Support Protocol Buffers formats > > > Key: FLINK-9650 > URL: https://issues.apache.org/jira/browse/FLINK-9650 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: zhangminglei >Assignee: zhangminglei >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > We need to generate a {{TypeInformation}} from a standard [Protobuf > schema|https://github.com/google/protobuf] (and maybe vice versa). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] JingsongLi commented on a change in pull request #7816: [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats
JingsongLi commented on a change in pull request #7816: [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261471313 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.util.SegmentsUtil; + +import java.nio.ByteOrder; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A special row which is backed by {@link MemorySegment} instead of Object. It can significantly + * reduce the serialization/deserialization of Java objects. + * + * A Row has two parts: a fixed-length part and a variable-length part. + * + * The fixed-length part contains a 1-byte header, the null bit set and the field values. The null bit set is + * used for null tracking and is aligned to 8-byte word boundaries. `Field values` holds + * fixed-length primitive types and variable-length values that can be stored inside 8 bytes. 
+ * If a variable-length value does not fit inside 8 bytes, the length and offset of the variable-length + * part are stored instead. + * + * The fixed-length part always falls into a single MemorySegment, which speeds up reading + * and writing fields. During the write phase, if the target memory segment has less space than + * the fixed-length part size, we skip that space. The number of fields in a single Row therefore cannot + * exceed the capacity of a single MemorySegment; if there are too many fields, we suggest that + * the user set a bigger MemorySegment pageSize. + * + * The variable-length part may fall into multiple MemorySegments. + * + * {@code BinaryRow} is influenced by Apache Spark UnsafeRow from project Tungsten. + * The difference is that BinaryRow can be placed on discontinuous memory, and a variable-length + * type can also be placed in the fixed-length area (if it is short enough). + */ +public final class BinaryRow extends BinaryFormat implements BaseRow { + + public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); + public static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? 0xFFF0 : 0x0FFF; + + public static int calculateBitSetWidthInBytes(int arity) { + // add 8 bit header + return ((arity + 63 + 8) / 64) * 8; + } + + private final int arity; + private final int nullBitsSizeInBytes; + + public BinaryRow(int arity) { + checkArgument(arity >= 0); + this.arity = arity; + this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity); + } + + private int getFieldOffset(int pos) { + return offset + nullBitsSizeInBytes + pos * 8; + } + + private void assertIndexIsValid(int index) { + assert index >= 0 : "index (" + index + ") should >= 0"; + assert index < arity : "index (" + index + ") should < " + arity; + } + + public int getFixedLengthPartSize() { + return nullBitsSizeInBytes + 8 * arity; + } + + @Override + public int getArity() { + return arity; + } + + @Override + public byte getHeader() { + // the first byte of the fixed-length part is the header. 
+ return segments[0].get(offset); + } + + @Override + public void setHeader(byte header) { + segments[0].put(offset, header); + } + + public void pointTo(MemorySegment segment, int offset, int sizeInBytes) { + this.segments = new MemorySegment[] {segment}; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) { + this.segments = segments; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public void
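The null-bitmap sizing described in the quoted Javadoc can be checked with a small standalone sketch. This is not Flink code; it is a self-contained Java class that mirrors the `calculateBitSetWidthInBytes` and `getFixedLengthPartSize` logic quoted above (the class name and the `main` harness are illustrative):

```java
public class BinaryRowLayoutSketch {

    // Mirrors the quoted method: 8 header bits plus one null bit per field,
    // rounded up to whole 8-byte (64-bit) words.
    static int calculateBitSetWidthInBytes(int arity) {
        return ((arity + 63 + 8) / 64) * 8;
    }

    // Fixed-length part = header + null bit set, followed by 8 bytes per field.
    static int getFixedLengthPartSize(int arity) {
        return calculateBitSetWidthInBytes(arity) + 8 * arity;
    }

    public static void main(String[] args) {
        // Up to 56 fields (56 null bits + 8 header bits = 64 bits) fit in one 8-byte word.
        if (calculateBitSetWidthInBytes(56) != 8) throw new AssertionError();
        // The 57th field spills into a second word.
        if (calculateBitSetWidthInBytes(57) != 16) throw new AssertionError();
        // A 3-field row: 8 bytes of header/bitmap plus 3 * 8 bytes of field slots.
        if (getFixedLengthPartSize(3) != 32) throw new AssertionError();
        System.out.println("ok");
    }
}
```

This also makes the reviewers' point concrete: every field occupies a fixed 8-byte slot in the fixed-length part, so the fixed-length part grows linearly with arity and must fit in one MemorySegment.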
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261470974 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.util.SegmentsUtil; + +import java.nio.ByteOrder; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A special row which is backed by {@link MemorySegment} instead of Object. It can significantly + * reduce the serialization/deserialization of Java objects. + * + * A Row has two parts: a fixed-length part and a variable-length part. + * + * The fixed-length part contains a 1-byte header, the null bit set and the field values. The null bit set is + * used for null tracking and is aligned to 8-byte word boundaries. `Field values` holds + * fixed-length primitive types and variable-length values that can be stored inside 8 bytes. 
+ * If a variable-length value does not fit inside 8 bytes, the length and offset of the variable-length + * part are stored instead. + * + * The fixed-length part always falls into a single MemorySegment, which speeds up reading + * and writing fields. During the write phase, if the target memory segment has less space than + * the fixed-length part size, we skip that space. The number of fields in a single Row therefore cannot + * exceed the capacity of a single MemorySegment; if there are too many fields, we suggest that + * the user set a bigger MemorySegment pageSize. + * + * The variable-length part may fall into multiple MemorySegments. + * + * {@code BinaryRow} is influenced by Apache Spark UnsafeRow from project Tungsten. + * The difference is that BinaryRow can be placed on discontinuous memory, and a variable-length + * type can also be placed in the fixed-length area (if it is short enough). + */ +public final class BinaryRow extends BinaryFormat implements BaseRow { + + public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); + public static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? 0xFFF0 : 0x0FFF; + + public static int calculateBitSetWidthInBytes(int arity) { + // add 8 bit header + return ((arity + 63 + 8) / 64) * 8; + } + + private final int arity; + private final int nullBitsSizeInBytes; + + public BinaryRow(int arity) { + checkArgument(arity >= 0); + this.arity = arity; + this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity); + } + + private int getFieldOffset(int pos) { + return offset + nullBitsSizeInBytes + pos * 8; + } + + private void assertIndexIsValid(int index) { + assert index >= 0 : "index (" + index + ") should >= 0"; + assert index < arity : "index (" + index + ") should < " + arity; + } + + public int getFixedLengthPartSize() { + return nullBitsSizeInBytes + 8 * arity; + } + + @Override + public int getArity() { + return arity; + } + + @Override + public byte getHeader() { + // the first byte of the fixed-length part is the header. 
+ return segments[0].get(offset); + } + + @Override + public void setHeader(byte header) { + segments[0].put(offset, header); + } + + public void pointTo(MemorySegment segment, int offset, int sizeInBytes) { + this.segments = new MemorySegment[] {segment}; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) { + this.segments = segments; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public void
[jira] [Commented] (FLINK-9650) Support Protocol Buffers formats
[ https://issues.apache.org/jira/browse/FLINK-9650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781237#comment-16781237 ] Arup Malakar commented on FLINK-9650: - Hey [~yanghua], in the current PR I will see if I can get the coding style in (I am having a hard time teaching my IDE to arrange the imports in Flink style), address any other small feedback, and get it merged. I would definitely appreciate it if you want to spend time on the enum support and on whitelisting protobuf objects in PojoField, so that we can refer to proto fields by their original names rather than their underscored forms.
[GitHub] JingsongLi commented on issue #7816: [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats
JingsongLi commented on issue #7816: [FLINK-11701][table-runtime-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#issuecomment-468528599 Fix comments
[GitHub] tzulitai edited a comment on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing
tzulitai edited a comment on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing URL: https://github.com/apache/flink/pull/7861#issuecomment-468524319 This would not work. The client is only created in the `open` method, and is therefore always `null` pre-flight on the client side. That's why the connector allows the `RestClient` to be configured via a `RestClientFactory`. @Salatich can you briefly describe how the client is used to allow for custom sniffing?
[GitHub] tzulitai commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing
tzulitai commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing URL: https://github.com/apache/flink/pull/7861#issuecomment-468524553 @flinkbot disapprove architecture
[GitHub] tzulitai commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing
tzulitai commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing URL: https://github.com/apache/flink/pull/7861#issuecomment-468524319 @flinkbot disapprove architecture This would not work. The client is only created in the `open` method, and is therefore always `null` pre-flight on the client side. That's why the connector allows the `RestClient` to be configured via a `RestClientFactory`. @Salatich can you briefly describe how the client is used to allow for custom sniffing?
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261458610 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +/** + * Util for {@link BinaryString}. + */ +public class BinaryStringUtil { + + private static final int MAX_BYTES_LENGTH = 1024 * 64; + private static final ThreadLocal<byte[]> BYTES_LOCAL = new ThreadLocal<>(); + + public static byte[] allocateBytes(int length) { Review comment: I think you should remind people that this byte[] is only for temporary use; it should not be stored anywhere else. Also, you could explain what the thread-local cache is used for and what kind of scenarios it is meant to improve here.
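The reuse pattern under review can be sketched in isolation. This is not the actual Flink implementation; it is a minimal, hypothetical `allocateBytes` in a standalone class (the `MAX_BYTES_LENGTH` and `BYTES_LOCAL` names come from the quoted diff, the body is an assumption about the intended behavior):

```java
public class ScratchBytes {

    private static final int MAX_BYTES_LENGTH = 1024 * 64;
    private static final ThreadLocal<byte[]> BYTES_LOCAL = new ThreadLocal<>();

    // Returns a scratch byte[] of at least `length` bytes. Small buffers are
    // cached per thread to avoid a fresh allocation on every call in hot
    // per-record paths; requests above the cap fall back to a new array.
    // The returned array is for temporary use only and must not be stored.
    public static byte[] allocateBytes(int length) {
        if (length > MAX_BYTES_LENGTH) {
            return new byte[length];
        }
        byte[] cached = BYTES_LOCAL.get();
        if (cached == null || cached.length < length) {
            cached = new byte[length];
            BYTES_LOCAL.set(cached);
        }
        return cached;
    }

    public static void main(String[] args) {
        byte[] a = allocateBytes(16);
        byte[] b = allocateBytes(8);
        // Requests that fit are served from the same per-thread buffer.
        if (a != b) throw new AssertionError();
        // Oversized requests always get a dedicated array of the exact size.
        if (allocateBytes(MAX_BYTES_LENGTH + 1).length != MAX_BYTES_LENGTH + 1) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```

Note the trade-off KurtYoung's comment is driving at: callers get aliasing (two "allocations" may return the same array), which is only safe for short-lived scratch use within one thread.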
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261457655 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +/** + * Util for {@link BinaryString}. + */ +public class BinaryStringUtil { Review comment: Move this class to the `util` package?
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261457262 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java ## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.util.SegmentsUtil; + +import java.nio.ByteOrder; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A special row which is backed by {@link MemorySegment} instead of Object. It can significantly reduce the + * serialization/deserialization of Java objects. + * + * A Row has two parts: a fixed-length part and a variable-length part. + * + * The fixed-length part contains the null bit set and the field values. The null bit set is used for null tracking and is + * aligned to 8-byte word boundaries. `Field values` holds fixed-length primitive types and variable-length + * values that can be stored inside 8 bytes. 
If a variable-length value does not fit inside 8 bytes, then the + * length and offset of the variable-length part are stored instead. The fixed-length part always falls into a single MemorySegment, + * which speeds up reading and writing fields. + * + * The variable-length part may fall into multiple MemorySegments. + * + * {@code BinaryRow} is influenced by Apache Spark UnsafeRow from project Tungsten. + * The difference is that BinaryRow can be placed on discontinuous memory, and a variable-length type can + * also be placed in the fixed-length area (if it is short enough). + */ +public final class BinaryRow extends BinaryFormat implements BaseRow { + + public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); + + public static int calculateBitSetWidthInBytes(int arity) { + // add 8 bit header + return ((arity + 63 + 8) / 64) * 8; + } + + private final int arity; + private final int nullBitsSizeInBytes; + + public BinaryRow(int arity) { + checkArgument(arity >= 0); + this.arity = arity; + this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity); + } + + private int getFieldOffset(int pos) { + return offset + nullBitsSizeInBytes + pos * 8; + } + + private void assertIndexIsValid(int index) { + assert index >= 0 : "index (" + index + ") should >= 0"; + assert index < arity : "index (" + index + ") should < " + arity; + } + + public int getFixedLengthPartSize() { + return nullBitsSizeInBytes + 8 * arity; + } + + @Override + public int getArity() { + return arity; + } + + @Override + public byte getHeader() { + // the first byte of the fixed-length part is the header. 
+ return segments[0].get(offset); + } + + @Override + public void setHeader(byte header) { + segments[0].put(offset, header); + } + + public void pointTo(MemorySegment segment, int offset, int sizeInBytes) { + this.segments = new MemorySegment[] {segment}; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) { + this.segments = segments; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public void setTotalSize(int sizeInBytes) { + this.sizeInBytes = sizeInBytes; + } + + private void setNotNullAt(int i) { + assertIndexIsValid(i); + // need add header 8 bit. + SegmentsUtil.bitUnSet(segments[0], offset, i + 8); + } + + @Override + public void setNullAt(int i) { + assertIndexIsValid(i); + // need add header 8 bit. +
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261457344 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java ## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.util.SegmentsUtil; + +import java.nio.ByteOrder; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A special row which is backed by {@link MemorySegment} instead of Object. It can significantly reduce the + * serialization/deserialization of Java objects. + * + * A Row has two parts: a fixed-length part and a variable-length part. + * + * The fixed-length part contains the null bit set and the field values. The null bit set is used for null tracking and is + * aligned to 8-byte word boundaries. `Field values` holds fixed-length primitive types and variable-length + * values that can be stored inside 8 bytes. 
If a variable-length value does not fit inside 8 bytes, then the + * length and offset of the variable-length part are stored instead. The fixed-length part always falls into a single MemorySegment, + * which speeds up reading and writing fields. + * + * The variable-length part may fall into multiple MemorySegments. + * + * {@code BinaryRow} is influenced by Apache Spark UnsafeRow from project Tungsten. + * The difference is that BinaryRow can be placed on discontinuous memory, and a variable-length type can + * also be placed in the fixed-length area (if it is short enough). + */ +public final class BinaryRow extends BinaryFormat implements BaseRow { + + public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN); + + public static int calculateBitSetWidthInBytes(int arity) { + // add 8 bit header + return ((arity + 63 + 8) / 64) * 8; + } + + private final int arity; + private final int nullBitsSizeInBytes; + + public BinaryRow(int arity) { + checkArgument(arity >= 0); + this.arity = arity; + this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity); + } + + private int getFieldOffset(int pos) { + return offset + nullBitsSizeInBytes + pos * 8; + } + + private void assertIndexIsValid(int index) { + assert index >= 0 : "index (" + index + ") should >= 0"; + assert index < arity : "index (" + index + ") should < " + arity; + } + + public int getFixedLengthPartSize() { + return nullBitsSizeInBytes + 8 * arity; + } + + @Override + public int getArity() { + return arity; + } + + @Override + public byte getHeader() { + // the first byte of the fixed-length part is the header. 
+ return segments[0].get(offset); + } + + @Override + public void setHeader(byte header) { + segments[0].put(offset, header); + } + + public void pointTo(MemorySegment segment, int offset, int sizeInBytes) { + this.segments = new MemorySegment[] {segment}; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) { + this.segments = segments; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public void setTotalSize(int sizeInBytes) { + this.sizeInBytes = sizeInBytes; + } + + private void setNotNullAt(int i) { + assertIndexIsValid(i); + // need add header 8 bit. + SegmentsUtil.bitUnSet(segments[0], offset, i + 8); + } + + @Override + public void setNullAt(int i) { + assertIndexIsValid(i); + // need add header 8 bit. +
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261457026 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java ## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.util.SegmentsUtil; + +import java.nio.ByteOrder; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A special row which is backed by {@link MemorySegment} instead of Object. It can significantly reduce the + * serialization/deserialization of Java objects. + * + * A Row has two parts: a fixed-length part and a variable-length part. + * + * The fixed-length part contains the null bit set and the field values. The null bit set is used for null tracking and is Review comment: The header is also in the fixed-length part.
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261456634 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java ## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.util.SegmentsUtil; + +import java.nio.ByteOrder; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A special row which is backed by {@link MemorySegment} instead of Object. It can significantly reduce the + * serialization/deserialization of Java objects. + * + * A Row has two parts: a fixed-length part and a variable-length part. + * + * The fixed-length part contains the null bit set and the field values. The null bit set is used for null tracking and is + * aligned to 8-byte word boundaries. `Field values` holds fixed-length primitive types and variable-length + * values that can be stored inside 8 bytes. 
If a variable-length value does not fit inside 8 bytes, then the + * length and offset of the variable-length part are stored instead. The fixed-length part always falls into a single MemorySegment, Review comment: Add some more explanation about the fixed-length part behavior, e.g. during the write phase, if the target memory segment has less space than the fixed-length part size, we will skip that space.
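The write-phase behavior this review comment asks to document can be illustrated with a toy calculation. This is not Flink code: it is a hypothetical helper that decides where a fixed-length part of `fixedSize` bytes lands when segments are `pageSize` bytes, skipping a segment tail that is too small to hold it (all names are illustrative):

```java
public class FixedPartPlacement {

    // Returns the offset at which a fixed-length part of `fixedSize` bytes
    // starts, given the current write position. If the remaining space in the
    // current segment is smaller than the fixed part, that tail is skipped and
    // writing starts at the next segment boundary. This is also why a row's
    // fixed-length part must never exceed a single segment's pageSize.
    static int placeFixedPart(int writePos, int fixedSize, int pageSize) {
        if (fixedSize > pageSize) {
            throw new IllegalArgumentException("row has too many fields for this pageSize");
        }
        int remaining = pageSize - (writePos % pageSize);
        return remaining < fixedSize ? writePos + remaining : writePos;
    }

    public static void main(String[] args) {
        // 40 bytes left in a 64-byte page: a 32-byte fixed part fits in place.
        if (placeFixedPart(24, 32, 64) != 24) throw new AssertionError();
        // Only 8 bytes left: skip them and start at the next page boundary.
        if (placeFixedPart(56, 32, 64) != 64) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The skipped tail is wasted space, which is the trade-off for guaranteeing that every fixed-length part can be read and written within one contiguous MemorySegment.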
[jira] [Commented] (FLINK-9650) Support Protocol Buffers formats
[ https://issues.apache.org/jira/browse/FLINK-9650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781186#comment-16781186 ] vinoyang commented on FLINK-9650: - [~amalakar] Thank you for the excellent work you have done. If you don't mind, my colleague [~winipanda] (or I) can continue to help with the follow-up work.
[GitHub] dianfu commented on issue #7848: [FLINK-10755][table] Port external catalogs in Table API extension points to flink-table-common
dianfu commented on issue #7848: [FLINK-10755][table] Port external catalogs in Table API extension points to flink-table-common URL: https://github.com/apache/flink/pull/7848#issuecomment-468511247 @sunjincheng121 Thanks a lot for the review. I will rebase the PR after all the commits of FLINK-11449 are merged. :)
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261456020 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.table.util.SegmentsUtil; + +/** + * Binary format that in {@link MemorySegment}s. 
+ */ +public abstract class BinaryFormat { + + private static final long HIGHEST_FIRST_BIT = Long.MIN_VALUE; + private static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56; + + protected MemorySegment[] segments; + protected int offset; + protected int sizeInBytes; + + public BinaryFormat() {} + + public BinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) { + this.segments = segments; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public MemorySegment[] getSegments() { + return segments; + } + + public int getOffset() { + return offset; + } + + public int getSizeInBytes() { + return sizeInBytes; + } + + @Override + public boolean equals(Object o) { + return this == o || o != null && + getClass() == o.getClass() && + binaryEquals((BinaryFormat) o); + } + + public boolean binaryEquals(BinaryFormat that) { + return sizeInBytes == that.sizeInBytes && + SegmentsUtil.equals(segments, offset, that.segments, that.offset, sizeInBytes); + } + + /** +* Get a binary string; if its length is less than 8, it will be included in variablePartOffsetAndSize. +* +* If the length is less than 8, its binary format is: +* 1-bit mark (1), 7-bit length, and 7 bytes of data. Review comment: Could you explain what the mark bit is for?
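The reviewer's question about the mark bit can be illustrated with a sketch of the dual encoding the Javadoc describes (hypothetical helper names, my own code; the real implementation also has to consider ByteOrder, which this sketch ignores): the highest bit of the 8-byte field marks whether the value is embedded inline or points into the variable-length part.

```java
// Sketch of the dual encoding of an 8-byte field (assumed names, not Flink
// code). Mark bit set: 1 bit mark, 7 bits length, 7 bytes inline data.
// Mark bit clear: 4 bytes offset into the variable-length part, 4 bytes length.
// Note: a real implementation must account for ByteOrder; this sketch does not.
class VarLenFieldCodec {
    private static final long MARK_BIT = Long.MIN_VALUE; // highest bit of a long

    // Embed a short payload (< 8 bytes) directly into the 8-byte field.
    public static long encodeCompact(byte[] data) {
        if (data.length >= 8) throw new IllegalArgumentException("too long");
        long field = MARK_BIT | ((long) data.length << 56);
        for (int i = 0; i < data.length; i++) {
            field |= (data[i] & 0xFFL) << (8 * (6 - i)); // data in the low 7 bytes
        }
        return field;
    }

    public static boolean isCompact(long field) {
        return (field & MARK_BIT) != 0;
    }

    public static int compactLength(long field) {
        return (int) ((field >>> 56) & 0x7F); // 7 length bits below the mark
    }

    // Point into the variable-length part: offsets are non-negative ints,
    // so the mark bit is naturally zero.
    public static long encodePointer(int offset, int length) {
        return ((long) offset << 32) | (length & 0xFFFFFFFFL);
    }

    public static int pointerOffset(long field) { return (int) (field >>> 32); }

    public static int pointerLength(long field) { return (int) field; }
}
```

The mark bit is what lets a reader distinguish the two cases from the field value alone, without any extra metadata.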
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261455698 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.table.util.SegmentsUtil; + +/** + * Binary format that in {@link MemorySegment}s. 
+ */ +public abstract class BinaryFormat { + + private static final long HIGHEST_FIRST_BIT = Long.MIN_VALUE; + private static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56; + + protected MemorySegment[] segments; + protected int offset; + protected int sizeInBytes; + + public BinaryFormat() {} + + public BinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) { + this.segments = segments; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public MemorySegment[] getSegments() { + return segments; + } + + public int getOffset() { + return offset; + } + + public int getSizeInBytes() { + return sizeInBytes; + } + + @Override + public boolean equals(Object o) { + return this == o || o != null && + getClass() == o.getClass() && + binaryEquals((BinaryFormat) o); + } + + public boolean binaryEquals(BinaryFormat that) { + return sizeInBytes == that.sizeInBytes && + SegmentsUtil.equals(segments, offset, that.segments, that.offset, sizeInBytes); + } + + /** +* Get a binary string; if its length is less than 8, it will be included in variablePartOffsetAndSize. +* +* If the length is less than 8, its binary format is: +* 1-bit mark (1), 7-bit length, and 7 bytes of data. +* +* If the length is greater than or equal to 8, its binary format is: +* 4 bytes of variable part offset and 4 bytes of length. Data is stored in the variable-length part. +* +* Note: Need to consider the ByteOrder. +* +* @param baseOffset base offset of the composite binary format. +* @param fieldOffset absolute start offset of 'variablePartOffsetAndSize'. +* @param variablePartOffsetAndSize a long value, 4 bytes of variable part offset and 4 bytes of length. +*/ + static BinaryString readBinaryStringFieldFromSegments( Review comment: Seems this method should be placed in `BinaryString`?
[GitHub] KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats
KurtYoung commented on a change in pull request #7816: [FLINK-11701][table-planner-blink] Introduce an abstract set of data formats URL: https://github.com/apache/flink/pull/7816#discussion_r261455452 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/util/SegmentsUtil.java ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.util; + +import org.apache.flink.core.memory.MemorySegment; + +import java.nio.ByteOrder; + +/** + * Utility methods for calculations over data format segments. + */ +public class SegmentsUtil { Review comment: Could you add some comments to each util method this class provides, just like MemorySegment does?
[jira] [Updated] (FLINK-11679) Create Blink SQL planner and runtime modules
[ https://issues.apache.org/jira/browse/FLINK-11679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11679: --- Component/s: (was: API / Table SQL) SQL / Planner > Create Blink SQL planner and runtime modules > > > Key: FLINK-11679 > URL: https://issues.apache.org/jira/browse/FLINK-11679 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > As mentioned in FLIP-32, we will create separate modules while performing the > Blink SQL merge. As part of this issue we will create > \{{flink-table-planner-blink}} and \{{flink-table-runtime-blink}} modules. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11724) Add copyToUnsafe, method to MemorySegment
[ https://issues.apache.org/jira/browse/FLINK-11724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11724: --- Summary: Add copyToUnsafe, method to MemorySegment (was: Add util method to MemorySegment) > Add copyToUnsafe, method to MemorySegment > -- > > Key: FLINK-11724 > URL: https://issues.apache.org/jira/browse/FLINK-11724 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Blink's new binary format is based on MemorySegment. > Improve MemorySegment to provide copyToUnsafe, copyFromUnsafe and equalTo. > 1. copyFromUnsafe: If we want to copy int[] data into a MemorySegment, we can currently only > set the ints one by one, which is inefficient. With copyFromUnsafe we can copy the array > directly. > 2. equalTo: Provide an efficient equalTo.
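The motivation of this ticket — one bulk transfer instead of one write per int — can be illustrated without Unsafe using plain NIO buffers (my own demo, not the Flink patch; the actual change adds copyToUnsafe/copyFromUnsafe/equalTo to MemorySegment):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Demo of per-element vs. bulk copying of an int[] into a byte buffer.
// The JIRA's copyFromUnsafe achieves the bulk path via a single memory copy.
class BulkCopyDemo {

    // Slow path: one putInt call per value.
    public static byte[] copyOneByOne(int[] values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.nativeOrder());
        for (int v : values) {
            buf.putInt(v);
        }
        return buf.array();
    }

    // Bulk path: a single transfer of the whole array through an int view.
    public static byte[] copyBulk(int[] values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.nativeOrder());
        buf.asIntBuffer().put(values);
        return buf.array();
    }
}
```

Both produce identical bytes; the bulk variant avoids the per-element call overhead, which is the same saving copyFromUnsafe targets inside MemorySegment.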
[jira] [Updated] (FLINK-11702) Introduce a new table type system: InternalType
[ https://issues.apache.org/jira/browse/FLINK-11702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11702: --- Component/s: (was: API / Table SQL) SQL / Planner > Introduce a new table type system: InternalType > --- > > Key: FLINK-11702 > URL: https://issues.apache.org/jira/browse/FLINK-11702 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > After FLINK-11701, a type will correspond to two data formats. Continuing > to use TypeInformation to represent types in the Table layer's calculations is not > suitable, because TypeInformation is bound to the physical format. > We introduce InternalType, a lightweight form that the table layer uses as > enumerations for CodeGen and calculations.
[jira] [Updated] (FLINK-11714) Add cost model for both batch and streaming
[ https://issues.apache.org/jira/browse/FLINK-11714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11714: --- Component/s: (was: API / Table SQL) SQL / Planner > Add cost model for both batch and streaming > --- > > Key: FLINK-11714 > URL: https://issues.apache.org/jira/browse/FLINK-11714 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Calcite's default cost model only contains ROWS, IO and CPU, and does not > take IO and CPU into account when costs are compared. > There are two improvements: > 1. Add NETWORK and MEMORY to represent distribution cost and memory usage. > 2. The optimization goal is to use minimal resources, so the comparison > order of factors is: > (1). First compare CPU. Every operator uses CPU, so we consider it the > most important factor. > (2). Then compare MEMORY, NETWORK and IO as a normalized value. > Their comparison order is not easy to decide, so convert them to CPU cost > by different ratios. > (3). Finally compare ROWS. ROWS has already been counted when calculating the other > factors. > e.g. CPU of Sort = nLogN(ROWS) * number of sort keys, CPU of Filter > = ROWS * condition cost on a row.
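The three-step comparison order described in the ticket can be sketched as a comparator (my own sketch with made-up normalization ratios; the actual Flink cost classes and ratios may differ):

```java
// Sketch of the cost comparison order described above: CPU first, then
// MEMORY/NETWORK/IO normalized to a single value, then ROWS. The ratios
// below are hypothetical placeholders, not Flink's real ones.
class CostSketch implements Comparable<CostSketch> {
    static final double MEMORY_RATIO = 1.0, NETWORK_RATIO = 2.0, IO_RATIO = 4.0;

    final double rows, cpu, io, network, memory;

    CostSketch(double rows, double cpu, double io, double network, double memory) {
        this.rows = rows; this.cpu = cpu; this.io = io;
        this.network = network; this.memory = memory;
    }

    // Convert MEMORY, NETWORK and IO to one comparable value via ratios.
    double normalized() {
        return memory * MEMORY_RATIO + network * NETWORK_RATIO + io * IO_RATIO;
    }

    @Override
    public int compareTo(CostSketch that) {
        int byCpu = Double.compare(this.cpu, that.cpu);                    // (1) CPU first
        if (byCpu != 0) return byCpu;
        int byNorm = Double.compare(this.normalized(), that.normalized()); // (2) normalized next
        if (byNorm != 0) return byNorm;
        return Double.compare(this.rows, that.rows);                       // (3) ROWS last
    }
}
```

A plan with lower CPU wins even if it produces more rows, which is exactly the "minimal resources" goal the ticket states.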
[jira] [Updated] (FLINK-11711) Add table and column stats
[ https://issues.apache.org/jira/browse/FLINK-11711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11711: --- Component/s: (was: API / Table SQL) SQL / Planner > Add table and column stats > -- > > Key: FLINK-11711 > URL: https://issues.apache.org/jira/browse/FLINK-11711 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > We define two structures to hold statistics: > 1. TableStats: statistics at table level, containing 2 elements: > rowCount: Long // the number of rows of the table > colStats: Map[String, ColumnStats] // maps each column to its ColumnStats > 2. ColumnStats: statistics at column level, containing 6 elements: > ndv: Long // number of distinct values > nullCount: Long // number of null values > avgLen: Double // average length of column values > maxLen: Integer // max length of column values > max: Any // max value of column values > min: Any // min value of column values
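The two structures above transcribe directly into code (a Java rendering of the ticket's description; field names follow the ticket, and Flink's actual classes may differ in detail):

```java
import java.util.Map;

// Java sketch of the TableStats / ColumnStats structures described above.
class StatsSketch {

    // Column-level statistics: 6 elements, as in the ticket.
    static class ColumnStats {
        final Long ndv;        // number of distinct values
        final Long nullCount;  // number of null values
        final Double avgLen;   // average length of column values
        final Integer maxLen;  // max length of column values
        final Object max;      // max value of column values
        final Object min;      // min value of column values

        ColumnStats(Long ndv, Long nullCount, Double avgLen,
                    Integer maxLen, Object max, Object min) {
            this.ndv = ndv; this.nullCount = nullCount; this.avgLen = avgLen;
            this.maxLen = maxLen; this.max = max; this.min = min;
        }
    }

    // Table-level statistics: row count plus per-column stats.
    static class TableStats {
        final Long rowCount;
        final Map<String, ColumnStats> colStats;

        TableStats(Long rowCount, Map<String, ColumnStats> colStats) {
            this.rowCount = rowCount; this.colStats = colStats;
        }
    }
}
```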
[jira] [Updated] (FLINK-11715) Add optimize program to organize optimization phases
[ https://issues.apache.org/jira/browse/FLINK-11715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11715: --- Component/s: (was: API / Table SQL) SQL / Planner > Add optimize program to organize optimization phases > > > Key: FLINK-11715 > URL: https://issues.apache.org/jira/browse/FLINK-11715 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, Flink organizes the optimization phases via different methods in > Batch(Stream)TableEnvironment#optimize. However, this is not easy to extend, > especially as there are more than ten optimization stages in Blink. On the other > hand, the methods are very similar, except for the match order and rule sets of the > hep optimization phases, and the target traits and rule sets of the volcano optimization > phases. > Blink abstracts each optimization stage into a {{FlinkOptimizeProgram}}, > defined as follows: > {code} > /** > * Like [[org.apache.calcite.tools.Program]], FlinkOptimizeProgram > transforms a relational > * expression into another relational expression. > */ > trait FlinkOptimizeProgram[OC <: OptimizeContext] { > def optimize(input: RelNode, context: OC): RelNode > } > {code} > {{FlinkOptimizeProgram}}'s subclasses include > 1. {{FlinkRuleSetProgram}}, an abstract program that can add/remove {{RuleSet}}s and > set target traits. > 2. {{FlinkHepRuleSetProgram}}, a subclass of {{FlinkRuleSetProgram}} which > runs with {{HepPlanner}}. > 3. {{FlinkVolcanoProgram}}, a subclass of {{FlinkRuleSetProgram}} which runs > with {{VolcanoPlanner}}. > 4. {{FlinkGroupProgram}}, a program containing a sequence of sub-programs as a > group; programs in the group will be executed in sequence, and the group can > be executed `iterations` times. > ..
> {{FlinkChainedPrograms}} is responsible for organizing all the programs; each > program's optimize method will be called in sequence when > {{FlinkChainedPrograms}}#optimize is called.
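The chaining behavior described above is essentially a pipeline fold, which can be sketched in a few lines (a Java rendering for illustration; the real classes are Scala and operate on Calcite RelNodes, modeled here by a type parameter):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Sketch of FlinkChainedPrograms: each registered program's optimize step
// is invoked in sequence, the output of one stage feeding the next.
// T stands in for RelNode in this illustration.
class ChainedPrograms<T> {
    private final List<UnaryOperator<T>> programs = new ArrayList<>();

    public ChainedPrograms<T> addLast(UnaryOperator<T> program) {
        programs.add(program);
        return this;
    }

    public T optimize(T input) {
        T result = input;
        for (UnaryOperator<T> program : programs) {
            result = program.apply(result); // run stages in registration order
        }
        return result;
    }
}
```

This shape makes the extensibility point of the ticket concrete: adding an eleventh optimization stage is one `addLast` call rather than a new method in the TableEnvironment.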
[jira] [Updated] (FLINK-11703) Add a basic framework for Blink SQL
[ https://issues.apache.org/jira/browse/FLINK-11703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11703: --- Component/s: (was: API / Table SQL) SQL / Planner > Add a basic framework for Blink SQL > --- > > Key: FLINK-11703 > URL: https://issues.apache.org/jira/browse/FLINK-11703 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > We need an initial commit to add something like Table, TableEnvironment, > Expressions, ExpressionParser, etc.. > > They can be copied from flink-table-planner, and add changes to them in the > future when merging Blink features. There are no big differences in these > classes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11680) Make query plan easier to read
[ https://issues.apache.org/jira/browse/FLINK-11680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11680: --- Component/s: (was: API / Table SQL) SQL / Planner > Make query plan easier to read > -- > > Key: FLINK-11680 > URL: https://issues.apache.org/jira/browse/FLINK-11680 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Attachments: image-2019-02-20-20-05-06-506.png, screenshot-1.png > > > The query plan generated by RelOptUtil#toString is hard to read, especially when > the query is very complex (multiple joins or unions). > Here is a query plan of tpcds q25.sql generated by RelOptUtil#toString: > !image-2019-02-20-20-05-06-506.png! > We can improve the utility method to make the query plan more readable, like: > !screenshot-1.png!
[jira] [Updated] (FLINK-11685) Add plan test infrastructure
[ https://issues.apache.org/jira/browse/FLINK-11685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11685: --- Component/s: (was: API / Table SQL) SQL / Planner > Add plan test infrastructure > > > Key: FLINK-11685 > URL: https://issues.apache.org/jira/browse/FLINK-11685 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner, Tests >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > > 1. Add test base for logical unit testing > 2. Add test utility for easy testing, e.g. {{register table}}, {{register > function}}, {{verify plan}} > 3. Verify plan base on > [DiffRepository|https://github.com/apache/calcite/blob/2f33a0c57b7b7e77b8193d0fff1e3531119aee0a/core/src/test/java/org/apache/calcite/test/DiffRepository.java] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11701) Introduce an abstract set of data formats
[ https://issues.apache.org/jira/browse/FLINK-11701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11701: --- Component/s: (was: API / Table SQL) Runtime / Operators > Introduce an abstract set of data formats > - > > Key: FLINK-11701 > URL: https://issues.apache.org/jira/browse/FLINK-11701 > Project: Flink > Issue Type: New Feature > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Blink uses an abstract set of data formats to make internal calculations use > the binary format as much as possible. This minimizes the serialization > overhead and Java object overhead. > It includes: > BaseRow <=> Row > BaseMap <=> Java Map > BaseArray <=> Java array > BaseString <=> Java String > Decimal <=> BigDecimal // The scale of this object is specified by the user, not > automatically determined (as with BigDecimal). > int <=> Date // Flink used to use int in the calculation, but what remains in > Row is still Date; we will change it completely. > int <=> Time > long <=> Timestamp > byte[] <=> byte[] > BaseGeneric <=> T (GenericRelDataType; we don't know the type, so we let the user define the > serializer) > Primitive types stay the same, but fewer boxed types are used.
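The `int <=> Date` mapping in the list above is the familiar "days since epoch" representation; a minimal sketch (my own helper names, assuming a time-zone-free conversion, which the actual Flink utilities may refine):

```java
import java.time.LocalDate;

// Sketch of the int <=> Date mapping: a SQL DATE held internally as the
// number of days since 1970-01-01, avoiding a java.sql.Date object per record.
// Assumed conversion; real implementations must decide how to treat time zones.
class DateConversions {

    public static int dateToInternal(LocalDate date) {
        return (int) date.toEpochDay();
    }

    public static LocalDate internalToDate(int days) {
        return LocalDate.ofEpochDay(days);
    }
}
```

Keeping the value as a primitive int in per-record code paths is what yields the "less boxed type" benefit the ticket mentions.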
[jira] [Updated] (FLINK-11488) Add a basic Blink planner framework
[ https://issues.apache.org/jira/browse/FLINK-11488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11488: --- Component/s: (was: API / Table SQL) SQL / Planner > Add a basic Blink planner framework > --- > > Key: FLINK-11488 > URL: https://issues.apache.org/jira/browse/FLINK-11488 > Project: Flink > Issue Type: Sub-task > Components: SQL / Planner >Reporter: Timo Walther >Assignee: godfrey he >Priority: Major > > A more detailed description can be found in > [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions]. > This issue is an umbrella issue for tasks related to the setup of a basic, > unified planner framework for introducing new Blink features (in both batch > and streaming). > It includes tasks such as: > - Introduction of {{flink-table-planner-blink}} and > {{flink-table-runtime-blink}} modules > - Addition of a planner implementation that matches the interfaces > introduced in FLINK-11452 > - Addition of basic planner components such as > ** optimizer > ** statistics > ** costs > ** code generator for expressions > Types are converted between Blink's and Flink's current type system until the > type system has been reworked. Unsupported operators and calls will throw > hard exceptions for now. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11724) Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment
[ https://issues.apache.org/jira/browse/FLINK-11724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781173#comment-16781173 ] Kurt Young commented on FLINK-11724: implemented in 492d1d40100b1297281408bf0d83b6db5378b9cb > Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment > - > > Key: FLINK-11724 > URL: https://issues.apache.org/jira/browse/FLINK-11724 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Blink new binary format is based on MemorySegment. > Improve MemorySegment to provide copyToUnsafe copyFromUnsafe and equalTo. > 1.copyUnsafe: If we want to copy int[] data into MemorySegment, we can only > use int to set Int one by one, which is inefficient. We can copyFromUnsafe > directly. > 2.equalTo: Provide an efficient equalTo. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11724) Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment
[ https://issues.apache.org/jira/browse/FLINK-11724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11724: --- Summary: Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment (was: Add copyToUnsafe, copyFromUnsafe and equalTo method to MemorySegment) > Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment > - > > Key: FLINK-11724 > URL: https://issues.apache.org/jira/browse/FLINK-11724 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Blink new binary format is based on MemorySegment. > Improve MemorySegment to provide copyToUnsafe copyFromUnsafe and equalTo. > 1.copyUnsafe: If we want to copy int[] data into MemorySegment, we can only > use int to set Int one by one, which is inefficient. We can copyFromUnsafe > directly. > 2.equalTo: Provide an efficient equalTo. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-11724) Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment
[ https://issues.apache.org/jira/browse/FLINK-11724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young resolved FLINK-11724. Resolution: Implemented Fix Version/s: 1.9.0 > Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment > - > > Key: FLINK-11724 > URL: https://issues.apache.org/jira/browse/FLINK-11724 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Blink new binary format is based on MemorySegment. > Improve MemorySegment to provide copyToUnsafe copyFromUnsafe and equalTo. > 1.copyUnsafe: If we want to copy int[] data into MemorySegment, we can only > use int to set Int one by one, which is inefficient. We can copyFromUnsafe > directly. > 2.equalTo: Provide an efficient equalTo. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11724) Add copyToUnsafe, copyFromUnsafe and equalTo method to MemorySegment
[ https://issues.apache.org/jira/browse/FLINK-11724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11724: --- Summary: Add copyToUnsafe, copyFromUnsafe and equalTo method to MemorySegment (was: Add copyToUnsafe, method to MemorySegment) > Add copyToUnsafe, copyFromUnsafe and equalTo method to MemorySegment > > > Key: FLINK-11724 > URL: https://issues.apache.org/jira/browse/FLINK-11724 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Blink new binary format is based on MemorySegment. > Improve MemorySegment to provide copyToUnsafe copyFromUnsafe and equalTo. > 1.copyUnsafe: If we want to copy int[] data into MemorySegment, we can only > use int to set Int one by one, which is inefficient. We can copyFromUnsafe > directly. > 2.equalTo: Provide an efficient equalTo. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] KurtYoung closed pull request #7847: [FLINK-11724][core] Add util method to MemorySegment
KurtYoung closed pull request #7847: [FLINK-11724][core] Add util method to MemorySegment URL: https://github.com/apache/flink/pull/7847
[GitHub] KurtYoung commented on issue #7847: [FLINK-11724][core] Add util method to MemorySegment
KurtYoung commented on issue #7847: [FLINK-11724][core] Add util method to MemorySegment URL: https://github.com/apache/flink/pull/7847#issuecomment-468506183 merging this...
[GitHub] flinkbot edited a comment on issue #7848: [FLINK-10755][table] Port external catalogs in Table API extension points to flink-table-common
flinkbot edited a comment on issue #7848: [FLINK-10755][table] Port external catalogs in Table API extension points to flink-table-common URL: https://github.com/apache/flink/pull/7848#issuecomment-467855282 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @sunjincheng121 [committer] * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] sunjincheng121 commented on issue #7848: [FLINK-10755][table] Port external catalogs in Table API extension points to flink-table-common
sunjincheng121 commented on issue #7848: [FLINK-10755][table] Port external catalogs in Table API extension points to flink-table-common URL: https://github.com/apache/flink/pull/7848#issuecomment-468505635 @flinkbot approve description @flinkbot approve consensus This should be rebased after all commits of FLINK-11449 are merged.
[jira] [Closed] (FLINK-11379) "java.lang.OutOfMemoryError: Direct buffer memory" when TM loads a large size TDD
[ https://issues.apache.org/jira/browse/FLINK-11379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-11379. --- Resolution: Fixed Fixed in master: 1f5359a5259987b3c9d506d559b6421af961ab0a Fixed in release-1.8: c2bc493cb38258281a98a80848370b3a5b5c01e8 > "java.lang.OutOfMemoryError: Direct buffer memory" when TM loads a large size > TDD > - > > Key: FLINK-11379 > URL: https://issues.apache.org/jira/browse/FLINK-11379 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.7.0, 1.7.1 >Reporter: Haibo Sun >Assignee: Haibo Sun >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > When the TM loads an offloaded TDD of large size, it may throw a > "java.lang.OutOfMemoryError: Direct buffer memory" error. The loading uses > nio's _Files.readAllBytes()_ to read the serialized TDD. In the call stack of > _Files.readAllBytes()_, a direct memory buffer is allocated whose size > equals the length of the file. This causes an OutOfMemoryError when > direct memory is not sufficient. > If the length of a file is larger than the maximum buffer size, a direct > buffer of at most the maximum size should be used to read the bytes of the file, to avoid a direct > memory OutOfMemoryError. The maximum buffer size can be 8K or similar. > The exception stack is as follows (this exception stack is from an old Flink > version, but the master branch has the same problem).
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>     at java.nio.Bits.reserveMemory(Bits.java:706)
>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>     at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:182)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>     at java.nio.file.Files.read(Files.java:3105)
>     at java.nio.file.Files.readAllBytes(Files.java:3158)
>     at org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.loadBigData(TaskDeploymentDescriptor.java:338)
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:397)
>     at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:211)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:155)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:133)
>     at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     ... 9 more
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
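The remedy described in the issue — reading the file through a bounded buffer so the NIO layer never needs a temporary direct buffer as large as the whole file — can be sketched as follows. This is a minimal illustration, not the actual Flink patch; the `ChunkedFileRead` class name and the 8K chunk size are assumptions taken from the issue's suggestion.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

class ChunkedFileRead {
    // Bounded read size; the issue suggests 8K or similar.
    private static final int CHUNK_SIZE = 8 * 1024;

    // Reads all bytes of a file in CHUNK_SIZE slices. Each InputStream.read
    // call requests at most CHUNK_SIZE bytes, so the temporary direct buffer
    // allocated inside the channel layer stays small, unlike
    // Files.readAllBytes(), which reads the whole file in one call.
    static byte[] readAllBytesChunked(Path path) throws IOException {
        long size = Files.size(path);
        byte[] result = new byte[(int) size]; // assumes the file fits in a byte[]
        try (InputStream in = Files.newInputStream(path)) {
            int offset = 0;
            while (offset < result.length) {
                int toRead = Math.min(CHUNK_SIZE, result.length - offset);
                int read = in.read(result, offset, toRead);
                if (read < 0) {
                    throw new IOException("Unexpected end of file: " + path);
                }
                offset += read;
            }
        }
        return result;
    }
}
```

The key point is that the size of the temporary direct buffer used by the channel read path is driven by the size of the requested read, so capping each read caps the direct-memory footprint regardless of file size.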
[jira] [Updated] (FLINK-11788) Support Code Generation for RexNode
[ https://issues.apache.org/jira/browse/FLINK-11788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11788: --- Component/s: (was: API / Table SQL) SQL / Planner
> Support Code Generation for RexNode
>
> Key: FLINK-11788
> URL: https://issues.apache.org/jira/browse/FLINK-11788
> Project: Flink
> Issue Type: New Feature
> Components: SQL / Planner
> Reporter: Jark Wu
> Assignee: Jark Wu
> Priority: Major
>
> Introduce {{CodeGeneratorContext}} and {{ExprCodeGenerator}} to support generating code from RexNode.
> The {{CodeGeneratorContext}} will keep all the reusable statements and will be the basic class for code generation. In the future, we will introduce {{FunctionCodeGeneration}}, {{AggregateCodeGeneration}}, etc., and they will depend on the {{CodeGeneratorContext}} to store reusable statements.
> The {{ExprCodeGenerator}} will only generate code from RexNode.
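The reusable-statement pattern the issue describes can be sketched minimally: a context object deduplicates member declarations while expression generators contribute to it. Only the `CodeGeneratorContext` name comes from the issue; the method names and class body here are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: a shared context that collects reusable member
// declarations emitted during code generation, deduplicating them so the
// same helper field or statement is declared only once in the generated class.
class CodeGeneratorContext {
    // LinkedHashSet keeps insertion order and drops exact duplicates.
    private final Set<String> reusableMemberStatements = new LinkedHashSet<>();

    // Called by expression generators whenever they need a reusable member.
    void addReusableMember(String declaration) {
        reusableMemberStatements.add(declaration);
    }

    // Emitted once into the generated class body.
    String reuseMemberCode() {
        return String.join("\n", reusableMemberStatements);
    }
}
```

This shows why a shared context matters: several generators (expression, function, aggregate) can each request the same member, and the generated class still declares it exactly once.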
[GitHub] asfgit closed pull request #7797: [FLINK-11379] Fix OutOfMemoryError caused by Files.readAllBytes() when TM loads a large size TDD
asfgit closed pull request #7797: [FLINK-11379] Fix OutOfMemoryError caused by Files.readAllBytes() when TM loads a large size TDD URL: https://github.com/apache/flink/pull/7797