[GitHub] KarmaGYZ opened a new pull request #7211: [hotfix][docs] Add and update description of local-recovery config op…

2018-12-02 Thread GitBox
KarmaGYZ opened a new pull request #7211: [hotfix][docs] Add and update 
description of local-recovery config op…
URL: https://github.com/apache/flink/pull/7211
 
 
   …tions
   
   ## What is the purpose of the change
   
   This PR updates the javadoc of "state.backend.local-recovery" and 
"taskmanager.state.local.root-dirs" and adds the missing descriptions that caused 
empty entries in checkpointing_configuration.html.
   
   ## Brief change log
   
   - Update the javadoc of "state.backend.local-recovery" and 
"taskmanager.state.local.root-dirs", and append the limitations in the current version.
   - Add descriptions to these options and regenerate 
checkpointing_configuration.html following [this 
guide](https://github.com/apache/flink/blob/master/flink-docs/README.md).
   - Also change the indentation of these two options to match the other 
options; I think it makes the code style cleaner.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   
   cc @zentol 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10775) Quarantined address [akka.tcp://flink@flink-jobmanager:6123] is still unreachable or has not been restarted. Keeping it quarantined.

2018-12-02 Thread miki haiat (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706252#comment-16706252
 ] 

miki haiat commented on FLINK-10775:


I had this issue as well on 1.4.x.

I can confirm that on 1.5.5 and 1.6.x this issue no longer exists.

> Quarantined address [akka.tcp://flink@flink-jobmanager:6123] is still 
> unreachable or has not been restarted. Keeping it quarantined.
> 
>
> Key: FLINK-10775
> URL: https://issues.apache.org/jira/browse/FLINK-10775
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.4.2
> Environment: k8s+docker 
> standalone (1jobmanager + 5taskmanager)
> taskmanager.slotnum=4
>Reporter: ChuanHaiTan
>Priority: Blocker
>  Labels: k8s+docker, usability
> Attachments: 
> logs-from-flink-jobmanager-in-flink-jobmanager-65c8d85f4f-5fm2d.txt, 
> logs-from-flink-taskmanager-in-flink-taskmanager-758575577d-7lw82.txt, 
> logs-from-flink-taskmanager-in-flink-taskmanager-758575577d-qbj9g.txt, 
> 微信图片_20181031171312.png, 微信图片_20181031171316.png
>
>
> In a k8s+docker environment, 1 jobmanager container and 5 taskmanager 
> containers run in standalone cluster mode.
> But for some reason the jobmanager was rebooted, two of the taskmanagers were 
> also rebooted, and two of the remaining three taskmanagers did not connect to 
> the jobmanager, resulting in "insufficient slot resources" errors.
> The attachments are the jobmanager log, the logs of the two disconnected 
> taskmanagers, and screenshots of all available and unavailable taskmanagers in 
> Flink at the time.
> It is strange that the two rebooted taskmanagers can connect to the 
> jobmanager, while only one of the three unrebooted taskmanagers can connect.
> Why? Can the cause of the restart be determined from the logs? Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] KarmaGYZ opened a new pull request #7212: [hotfix][docs] Fix invalid link in schema_evolution doc

2018-12-02 Thread GitBox
KarmaGYZ opened a new pull request #7212: [hotfix][docs] Fix invalid link in 
schema_evolution doc
URL: https://github.com/apache/flink/pull/7212
 
 
   ## What is the purpose of the change
   
   Fix an invalid link in schema_evolution doc
   
   ## Brief change log
   
   - correct link to custom_serialization
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-02 Thread GitBox
walterddr commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-443522584
 
 
   Hi @lamber-ken. The minimum reproduction result should be a good starting 
point for adding a test/ITCase to justify your change. I also commented on the 
JIRA ticket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-02 Thread miki haiat (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706243#comment-16706243
 ] 

miki haiat commented on FLINK-11046:


Hi,
Can you please share the Elasticsearch connection properties?



 

 

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Priority: Major
>
> When using the ES6 sink to index into Elasticsearch, the bulk process caught 
> an exception, and I tried to reindex the document by calling 
> `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` 
> method, but things went wrong: the calling thread got stuck there, and in a 
> thread dump I saw the `bulkprocessor` object was locked by another thread.
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
> RequestIndexer indexer) throws Throwable;
> }
> {code}
> After reading the implementation behind `indexer.add(action)`, I found that 
> each add operation is `synchronized`.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable 
> Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is locked by the bulk process 
> thread as well.
> The bulk process operation is the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
> Runnable toRelease = () -> {};
> boolean bulkRequestSetupSuccessful = false;
> try {
> listener.beforeBulk(executionId, bulkRequest);
> semaphore.acquire();
> toRelease = semaphore::release;
> CountDownLatch latch = new CountDownLatch(1);
> retry.withBackoff(consumer, bulkRequest, new 
> ActionListener() {
> @Override
> public void onResponse(BulkResponse response) {
> try {
> listener.afterBulk(executionId, bulkRequest, response);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> @Override
> public void onFailure(Exception e) {
> try {
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> }, Settings.EMPTY);
> bulkRequestSetupSuccessful = true;
> if (concurrentRequests == 0) {
> latch.await();
> }
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
> cancelled.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } catch (Exception e) {
> logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
> request {}.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> if (bulkRequestSetupSuccessful == false) {  // if we fail on 
> client.bulk() release the semaphore
> toRelease.run();
> }
> }
> }
> {code}
> Because of the line I marked above (the `latch.await()` under the 
> `concurrentRequests == 0` check), I think that is the reason the retry 
> operation thread was blocked: the bulk process thread never releases the lock 
> on `bulkprocessor`. I also tried to figure out why the field 
> `concurrentRequests` was set to zero, and I found the initialization of the 
> bulk processor in class `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>  ...
>  BulkProcessor.Builder bulkProcessorBuilder =  
> callBridge.createBulkProcessorBuilder(client, listener);
>  // This makes flush() blocking
>  bulkProcessorBuilder.setConcurrentRequests(0);
>  
>  ...
>  return bulkProcessorBuilder.build();
> }
> {code}
> This field value is set to zero explicitly. So everything seems to make 
> sense, but I still wondered why the retry operation is not on the same thread 
> as the bulk process execution; after reading the code, the `bulkAsync` method 
> might be the last piece of the puzzle.
> {code:java}
> @Override
> public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient 
> client, BulkProcessor.Listener listener) {
>  return BulkProcessor.builder(client::bulkAsync, listener);
> }
> {code}
> So, I hope someone can help 

[jira] [Created] (FLINK-11047) CoGroupGroupSortTranslationTest does not compile with scala 2.12

2018-12-02 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11047:


 Summary: CoGroupGroupSortTranslationTest does not compile with 
scala 2.12
 Key: FLINK-11047
 URL: https://issues.apache.org/jira/browse/FLINK-11047
 Project: Flink
  Issue Type: Bug
  Components: Machine Learning Library, Scala API
Affects Versions: 1.8.0
Reporter: Chesnay Schepler
 Fix For: 1.8.0


{code:java}
11:47:18.576 [ERROR] 
/home/travis/build/apache/flink/flink/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala:131:
 error: value foreach is not a member of Object
11:47:18.576 [ERROR] .apply((a, b, c: Collector[(Long, Long)]) => 
a.foreach(e => c.collect(e))){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706400#comment-16706400
 ] 

ASF GitHub Bot commented on FLINK-11010:


walterddr edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL 
timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-443522584
 
 
   Hi @lamber-ken. The minimum reproduction example should be a good starting 
point for adding a test/ITCase to justify your change. I also commented on the 
JIRA ticket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
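To make the inconsistency above concrete, here is a minimal, self-contained sketch using only JDK calls (no Flink code); on a machine whose default time zone is not GMT the two printed values differ by the zone offset:
{code:java}
import java.sql.Timestamp;
import java.util.TimeZone;

public class ProctimeOffsetDemo {
    public static void main(String[] args) {
        long time = System.currentTimeMillis();               // what currentProcessingTime() returns
        long offset = TimeZone.getDefault().getOffset(time);  // e.g. +8h on a GMT+8 machine

        Timestamp raw = new Timestamp(time);                  // wall-clock value
        Timestamp shifted = new Timestamp(time - offset);     // value produced by the statement above

        System.out.println("raw     = " + raw);
        System.out.println("shifted = " + shifted);
    }
}
{code}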



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-02 Thread GitBox
walterddr edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL 
timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-443522584
 
 
   Hi @lamber-ken. The minimum reproduction example should be a good starting 
point for adding a test/ITCase to justify your change. I also commented on the 
JIRA ticket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-02 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706398#comment-16706398
 ] 

Rong Rong commented on FLINK-11010:
---

Based on the discussion in the mailing list: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-about-Timestamp-in-Flink-SQL-td16928.html,
 I think Flink's internal time service using GMT as default would require a 
more sophisticated solution. 

Currently I think the best solution is to use DATE_FORMAT or another SQL 
timezone-based operation to convert proctime, and to always treat proctime as GMT 
in queries like {{select proctime from tbl}}; a sketch follows below.

I've also linked https://issues.apache.org/jira/browse/FLINK-8353 for the 
reference.
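
As a rough sketch of that workaround (written against the 1.6-era Java Table API; the table, field names, and pipeline below are made up for illustration):
{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ProctimeFormatExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<String> words = env.fromElements("a", "b", "c");
        // Register the stream with an appended processing-time attribute.
        tEnv.registerDataStream("tbl", words, "word, proctime.proctime");

        // proctime is GMT-based internally; DATE_FORMAT only controls how it is rendered.
        Table result = tEnv.sqlQuery(
            "SELECT word, DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:ss') AS ts FROM tbl");

        tEnv.toAppendStream(result, Row.class).print();
        env.execute("proctime-format-example");
    }
}
{code}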

> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706399#comment-16706399
 ] 

ASF GitHub Bot commented on FLINK-11010:


walterddr commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-443522584
 
 
   Hi @lamber-ken. The minimum reproduction result should be a good starting 
point for adding a test/ITCase to justify your change. I also commented on the 
JIRA ticket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-02 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706517#comment-16706517
 ] 

Thomas Weise commented on FLINK-11048:
--

Related: 
http://mail-archives.apache.org/mod_mbox/flink-user/201702.mbox/%3CCAO_f5ND=0f+ubbresfthmbi-bny4bjgbozo3fzeszujiovb...@mail.gmail.com%3E

> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-02 Thread Thomas Weise (JIRA)
Thomas Weise created FLINK-11048:


 Summary: Ability to programmatically execute streaming pipeline 
with savepoint restore  
 Key: FLINK-11048
 URL: https://issues.apache.org/jira/browse/FLINK-11048
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.7.0
Reporter: Thomas Weise
Assignee: Thomas Weise


RemoteStreamEnvironment.execute doesn't support restore from savepoint, though 
the underlying ClusterClient does. Add an explicit "execute remotely" that can 
be used by downstream projects.

[https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
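
A hedged sketch of how this can be approximated today with the lower-level building blocks (JobGraph plus SavepointRestoreSettings); the client construction and class-loader handling are assumptions, not the API proposed here:
{code:java}
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class RemoteExecuteWithSavepoint {

    /** Sketch: submit the current pipeline remotely, restoring from a savepoint. */
    public static JobSubmissionResult execute(
            StreamExecutionEnvironment env,
            ClusterClient<?> client,
            String savepointPath,
            ClassLoader userCodeClassLoader) throws Exception {

        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        // Attach the restore settings that RemoteStreamEnvironment.execute() does not expose.
        jobGraph.setSavepointRestoreSettings(
            SavepointRestoreSettings.forPath(savepointPath, /* allowNonRestoredState */ true));
        return client.submitJob(jobGraph, userCodeClassLoader);
    }
}
{code}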

 

  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins

2018-12-02 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706580#comment-16706580
 ] 

Hequn Cheng commented on FLINK-11020:
-

Maybe we can implement the cross join with a parallelism of 1? For example, call 
forceNonParallel() on the connectOperator in `DataStreamJoin` when it is a cross 
join. We can do some optimizations later, such as broadcasting the smaller side, 
etc.

> Reorder joins only to eliminate cross joins 
> 
>
> Key: FLINK-11020
> URL: https://issues.apache.org/jira/browse/FLINK-11020
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Priority: Major
>
> Currently, we don't reorder joins and rely on the order provided by the user. 
> This is fine for most cases; however, it limits the set of supported SQL 
> queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help but should only be applied 
> if a cross join is the only alternative.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] liu-zhaokun commented on issue #7193: [FLINK-11024]Overwrite yarn configuration by setting configuration started with "flink.yarn." in flink-conf.yaml

2018-12-02 Thread GitBox
liu-zhaokun commented on issue #7193: [FLINK-11024]Overwrite yarn configuration 
by setting configuration started with "flink.yarn." in flink-conf.yaml
URL: https://github.com/apache/flink/pull/7193#issuecomment-443566938
 
 
   I think this failing check is not related to my PR. Could someone help me 
re-trigger CI, please?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11024) Overwrite yarn configuration by setting configuration started with "flink.yarn." in flink-conf.yaml

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706591#comment-16706591
 ] 

ASF GitHub Bot commented on FLINK-11024:


liu-zhaokun commented on issue #7193: [FLINK-11024]Overwrite yarn configuration 
by setting configuration started with "flink.yarn." in flink-conf.yaml
URL: https://github.com/apache/flink/pull/7193#issuecomment-443566938
 
 
   I think this failing check is not related to my PR. Could someone help me 
re-trigger CI, please?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Overwrite yarn configuration by setting configuration started with 
> "flink.yarn." in flink-conf.yaml
> ---
>
> Key: FLINK-11024
> URL: https://issues.apache.org/jira/browse/FLINK-11024
> Project: Flink
>  Issue Type: New Feature
>  Components: YARN
>Affects Versions: 1.6.2
>Reporter: liuzhaokun
>Priority: Major
>  Labels: pull-request-available
>
> Overwrite the YARN configuration with entries prefixed with "flink.yarn." in 
> flink-conf.yaml, similar to how Spark implements this feature with the 
> "spark.hadoop." prefix in spark-defaults.conf. It is convenient for users to 
> set YARN config because they only need to work with Flink configuration 
> files.
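
A minimal sketch of the prefix-forwarding idea described above, assuming the prefix constant and integration point shown here (the real change would live in Flink's YARN client code):
{code:java}
import org.apache.flink.configuration.Configuration;

public final class YarnConfigForwarder {

    private static final String PREFIX = "flink.yarn.";

    /** Copies every "flink.yarn.*" entry from flink-conf.yaml into the Hadoop/YARN configuration. */
    public static void forward(Configuration flinkConfig,
                               org.apache.hadoop.conf.Configuration yarnConfig) {
        for (String key : flinkConfig.keySet()) {
            if (key.startsWith(PREFIX)) {
                String yarnKey = key.substring(PREFIX.length());
                yarnConfig.set(yarnKey, flinkConfig.getString(key, null));
            }
        }
    }
}
{code}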



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7208) Refactor build-in agg(MaxWithRetractAccumulator and MinWithRetractAccumulator) using the DataView

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706592#comment-16706592
 ] 

ASF GitHub Bot commented on FLINK-7208:
---

dianfu commented on issue #7201: [FLINK-7208] [table] Optimize 
Min/MaxWithRetractAggFunction with DataView
URL: https://github.com/apache/flink/pull/7201#issuecomment-443566996
 
 
   Hi @walterddr, thanks a lot for your review. That makes sense to me, and I 
have added an IT case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor build-in agg(MaxWithRetractAccumulator and 
> MinWithRetractAccumulator) using the DataView
> -
>
> Key: FLINK-7208
> URL: https://issues.apache.org/jira/browse/FLINK-7208
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: kaibo.zhou
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Refactor build-in agg(MaxWithRetractAccumulator and 
> MinWithRetractAccumulator) using the DataView.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu commented on issue #7201: [FLINK-7208] [table] Optimize Min/MaxWithRetractAggFunction with DataView

2018-12-02 Thread GitBox
dianfu commented on issue #7201: [FLINK-7208] [table] Optimize 
Min/MaxWithRetractAggFunction with DataView
URL: https://github.com/apache/flink/pull/7201#issuecomment-443566996
 
 
   Hi @walterddr, thanks a lot for your review. That makes sense to me, and I 
have added an IT case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-02 Thread luoguohao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoguohao updated FLINK-11046:
--
Description: 
When using the ES6 sink to index into Elasticsearch, the bulk process caught an 
exception, and I tried to reindex the document by calling `indexer.add(action)` 
in the `ActionRequestFailureHandler.onFailure()` method, but things went wrong: 
the calling thread got stuck there, and in a thread dump I saw the 
`bulkprocessor` object was locked by another thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After reading the implementation behind `indexer.add(action)`, I found that each 
add operation is `synchronized`.
{code:java}
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object 
payload) {
  ensureOpen();
  bulkRequest.add(request, payload);
  executeIfNeeded();
}
{code}
I also noticed that the `bulkprocessor` object is locked by the bulk process 
thread as well.

The bulk process operation is the following code:
{code:java}
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new 
ActionListener() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}

@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
}, Settings.EMPTY);
bulkRequestSetupSuccessful = true;
    if (concurrentRequests == 0) {   // <-- line highlighted in red by the reporter
        latch.await();
    }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) {  // if we fail on 
client.bulk() release the semaphore
toRelease.run();
}
}
}
{code}
Given the line I highlighted in red above, I think that is the reason the retry 
operation thread was blocked: the bulk process thread never releases the lock on 
`bulkprocessor`. I also tried to figure out why the field `concurrentRequests` 
was set to zero, and I found the initialization of the bulk processor in class 
`ElasticsearchSinkBase`:
{code:java}
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
 ...
 BulkProcessor.Builder bulkProcessorBuilder =  
callBridge.createBulkProcessorBuilder(client, listener);

 // This makes flush() blocking
 bulkProcessorBuilder.setConcurrentRequests(0);
 
 ...

 return bulkProcessorBuilder.build();
}
{code}
This field value is set to zero explicitly. So everything seems to make sense, 
but I still wondered why the retry operation is not on the same thread as the 
bulk process execution; after reading the code, the `bulkAsync` method might be 
the last piece of the puzzle.
{code:java}
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient 
client, BulkProcessor.Listener listener) {
 return BulkProcessor.builder(client::bulkAsync, listener);
}
{code}
So I hope someone can help fix this problem or give some suggestions; I can also 
give it a try myself.
Thanks a lot!

  was:
When i'm using es6 sink to index into es, bulk process with some exception 
catched, and  i trying to reindex the document with the call 
`indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` method, 
but things goes incorrect. The call thread stuck there, and with the thread 
dump, i saw the `bulkprocessor` object was locked by other thread. 
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After i read the code implemented in the 

[jira] [Updated] (FLINK-11045) UserCodeClassLoader has not been set correctly for RuntimeUDFContext in CollectionExecutor

2018-12-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11045:
---
Labels: pull-request-available  (was: )

> UserCodeClassLoader has not been set correctly for RuntimeUDFContext in 
> CollectionExecutor
> --
>
> Key: FLINK-11045
> URL: https://issues.apache.org/jira/browse/FLINK-11045
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> We should use the {{UserCodeClassLoader}} when creating a new {{RuntimeUDFContext}}.
> i.e., Change the code from
> {code:java}
> this.classLoader = getClass().getClassLoader();
> new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
> accumulators, metrics)
> {code}
> to
> {code:java}
> new RuntimeUDFContext(taskInfo, Thread.currentThread().getContextClassLoader, 
> executionConfig, cachedFiles, accumulators, metrics)
> {code}
> in {{CollectionExecutor}}.
> FYI. This is a problem reported from the [user mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-API-example-Table-program-cannot-be-compiled-This-is-a-bug-Please-file-an-issue-td24852.html].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11045) UserCodeClassLoader has not been set correctly for RuntimeUDFContext in CollectionExecutor

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706601#comment-16706601
 ] 

ASF GitHub Bot commented on FLINK-11045:


hequn8128 opened a new pull request #7213: [FLINK-11045][table] Set correct 
UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
URL: https://github.com/apache/flink/pull/7213
 
 
   
   ## What is the purpose of the change
   
   Currently, the system classloader is used when creating a 
`RuntimeUDFContext`. This pull request sets the correct `UserCodeClassLoader` for 
`RuntimeUDFContext` in `CollectionExecutor`.
   
   
   ## Brief change log
   
 - Set correct UserCodeClassLoader for RuntimeUDFContext in 
CollectionExecutor
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UserCodeClassLoader has not been set correctly for RuntimeUDFContext in 
> CollectionExecutor
> --
>
> Key: FLINK-11045
> URL: https://issues.apache.org/jira/browse/FLINK-11045
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> We should use the {{UserCodeClassLoader}} when creating a new {{RuntimeUDFContext}}.
> i.e., Change the code from
> {code:java}
> this.classLoader = getClass().getClassLoader();
> new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
> accumulators, metrics)
> {code}
> to
> {code:java}
> new RuntimeUDFContext(taskInfo, Thread.currentThread().getContextClassLoader, 
> executionConfig, cachedFiles, accumulators, metrics)
> {code}
> in {{CollectionExecutor}}.
> FYI. This is a problem reported from the [user mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-API-example-Table-program-cannot-be-compiled-This-is-a-bug-Please-file-an-issue-td24852.html].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 opened a new pull request #7213: [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor

2018-12-02 Thread GitBox
hequn8128 opened a new pull request #7213: [FLINK-11045][table] Set correct 
UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
URL: https://github.com/apache/flink/pull/7213
 
 
   
   ## What is the purpose of the change
   
   Currently, the system classloader is used when creating a 
`RuntimeUDFContext`. This pull request sets the correct `UserCodeClassLoader` for 
`RuntimeUDFContext` in `CollectionExecutor`.
   
   
   ## Brief change log
   
 - Set correct UserCodeClassLoader for RuntimeUDFContext in 
CollectionExecutor
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on issue #7026: [FLINK-10798] Add the version number of Flink 1.7 to MigrationVersion

2018-12-02 Thread GitBox
tzulitai commented on issue #7026: [FLINK-10798] Add the version number of 
Flink 1.7 to MigrationVersion
URL: https://github.com/apache/flink/pull/7026#issuecomment-443592846
 
 
   +1 LGTM, will merge this now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10798) Add the version number of Flink 1.7 to MigrationVersion

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706670#comment-16706670
 ] 

ASF GitHub Bot commented on FLINK-10798:


tzulitai commented on issue #7026: [FLINK-10798] Add the version number of 
Flink 1.7 to MigrationVersion
URL: https://github.com/apache/flink/pull/7026#issuecomment-443592846
 
 
   +1 LGTM, will merge this now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add the version number of Flink 1.7 to MigrationVersion
> ---
>
> Key: FLINK-10798
> URL: https://issues.apache.org/jira/browse/FLINK-10798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11050) When IntervalJoin, get left or right buffer's entries more quickly by assigning lowerBound

2018-12-02 Thread Liu (JIRA)
Liu created FLINK-11050:
---

 Summary: When IntervalJoin, get left or right buffer's entries 
more quickly by assigning lowerBound
 Key: FLINK-11050
 URL: https://issues.apache.org/jira/browse/FLINK-11050
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.7.0, 1.6.2
Reporter: Liu
 Fix For: 1.7.1


In IntervalJoin it is very slow to get the left or right buffer's entries, 
because we have to scan all of the buffer's values, including deleted values that 
are out of the time range. Processing these deleted values consumes too much time 
in RocksDB's level 0. Since the lowerBound is known, this can be optimized by 
seeking from the timestamp of lowerBound.
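
A conceptual sketch of the proposed seek, expressed directly against the RocksDB Java API; the key encoding and range check are simplified assumptions, and the real change would live in the interval join's state access code:
{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

public final class SeekFromLowerBound {

    /** Visit only keys in [lowerBoundKey, upperBoundKey] instead of scanning from the first entry. */
    static void scanRange(RocksDB db, byte[] lowerBoundKey, byte[] upperBoundKey) {
        RocksIterator it = db.newIterator();
        try {
            // seek() positions the iterator at the first key >= lowerBoundKey, so entries
            // (including deleted ones) below the time range are never visited.
            for (it.seek(lowerBoundKey); it.isValid(); it.next()) {
                if (unsignedCompare(it.key(), upperBoundKey) > 0) {
                    break; // past the interval's upper bound
                }
                // ... handle it.key() / it.value() (one buffer entry) here ...
            }
        } finally {
            it.close();
        }
    }

    private static int unsignedCompare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = (a[i] & 0xff) - (b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    }
}
{code}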



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10798) Add the version number of Flink 1.7 to MigrationVersion

2018-12-02 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-10798.
---
Resolution: Fixed

Merged to master for 1.8.0: f08adb721d1a4b373ef328529313443f4af923b3

> Add the version number of Flink 1.7 to MigrationVersion
> ---
>
> Key: FLINK-10798
> URL: https://issues.apache.org/jira/browse/FLINK-10798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #7026: [FLINK-10798] Add the version number of Flink 1.7 to MigrationVersion

2018-12-02 Thread GitBox
asfgit closed pull request #7026: [FLINK-10798] Add the version number of Flink 
1.7 to MigrationVersion
URL: https://github.com/apache/flink/pull/7026
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java
index 3eb29d4cf74..5ce24ed6835 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java
@@ -30,7 +30,8 @@
v1_3("1.3"),
v1_4("1.4"),
v1_5("1.5"),
-   v1_6("1.6");
+   v1_6("1.6"),
+   v1_7("1.7");
 
private String versionStr;
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10776) Update migration tests for Flink 1.7

2018-12-02 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706678#comment-16706678
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-10776:
-

[~yanghua] I've merged FLINK-10798 to unblock the remaining work. Thanks for 
pushing this forward.

> Update migration tests for Flink 1.7
> 
>
> Key: FLINK-10776
> URL: https://issues.apache.org/jira/browse/FLINK-10776
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.8.0
>
>
> Now that the feature branch is cut for Flink 1.7, we should now update 
> existing migration tests to cover restoring from 1.7 savepoints.
> Each independent migration test will be tracked as a separate sub-task.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10798) Add the version number of Flink 1.7 to MigrationVersion

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706676#comment-16706676
 ] 

ASF GitHub Bot commented on FLINK-10798:


asfgit closed pull request #7026: [FLINK-10798] Add the version number of Flink 
1.7 to MigrationVersion
URL: https://github.com/apache/flink/pull/7026
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java
index 3eb29d4cf74..5ce24ed6835 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java
@@ -30,7 +30,8 @@
v1_3("1.3"),
v1_4("1.4"),
v1_5("1.5"),
-   v1_6("1.6");
+   v1_6("1.6"),
+   v1_7("1.7");
 
private String versionStr;
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add the version number of Flink 1.7 to MigrationVersion
> ---
>
> Key: FLINK-10798
> URL: https://issues.apache.org/jira/browse/FLINK-10798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10776) Update migration tests for Flink 1.7

2018-12-02 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706678#comment-16706678
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-10776 at 12/3/18 5:30 AM:
--

[~yanghua] I've merged FLINK-10798 to unblock the remaining work. Thanks for 
driving the effort.


was (Author: tzulitai):
[~yanghua] I've merged FLINK-10798 to unblock the remaining work. Thanks for 
pushing this forward.

> Update migration tests for Flink 1.7
> 
>
> Key: FLINK-10776
> URL: https://issues.apache.org/jira/browse/FLINK-10776
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.8.0
>
>
> Now that the feature branch is cut for Flink 1.7, we should now update 
> existing migration tests to cover restoring from 1.7 savepoints.
> Each independent migration test will be tracked as a separate sub-task.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11050) When IntervalJoin, get left or right buffer's entries more quickly by assigning lowerBound

2018-12-02 Thread Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liu updated FLINK-11050:

Description: 
    When IntervalJoin, it is very slow to get left or right buffer's entries. 
Because we have to scan all buffer's values, including the deleted values which 
are out of time range. These deleted values's processing consumes too much time 
in RocksDB's level 0. Since lowerBound is known, it can be optimized by seek 
from the timestamp of lowerBound.

    Our usage is like below:
{code:java}
labelStream.keyBy(uuid).intervalJoin(adLogStream.keyBy(uuid))
   .between(Time.milliseconds(0), Time.milliseconds(60))
   .process(new processFunction())
   .sink(kafkaProducer)
{code}
    Our data is huge. The job always runs for an hour and is stuck by RocksDB's 
seek when get buffer's entries. We use rocksDB's data to simulate the problem 
RocksDB. 

  was:When IntervalJoin, it is very slow to get left or right buffer's entries. 
Because we have to scan all buffer's values, including the deleted values which 
are out of time range. These deleted values's processing consumes too much time 
in RocksDB's level 0. Since lowerBound is known, it can be optimized by seek 
from the timestamp of lowerBound.


> When IntervalJoin, get left or right buffer's entries more quickly by 
> assigning lowerBound
> --
>
> Key: FLINK-11050
> URL: https://issues.apache.org/jira/browse/FLINK-11050
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Liu
>Priority: Major
>  Labels: performance
> Fix For: 1.7.1
>
>
> In IntervalJoin it is very slow to get the left or right buffer's entries, 
> because we have to scan all of the buffer's values, including deleted values 
> that are out of the time range. Processing these deleted values consumes too 
> much time in RocksDB's level 0. Since the lowerBound is known, this can be 
> optimized by seeking from the timestamp of lowerBound.
>     Our usage is like below:
> {code:java}
> labelStream.keyBy(uuid).intervalJoin(adLogStream.keyBy(uuid))
>    .between(Time.milliseconds(0), Time.milliseconds(60))
>    .process(new processFunction())
>    .addSink(kafkaProducer)
> {code}
>     Our data is huge. The job always runs for about an hour and then gets stuck 
> in RocksDB's seek when fetching the buffer's entries. We used RocksDB data to 
> simulate the problem in RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10798) Add the version number of Flink 1.7 to MigrationVersion

2018-12-02 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10798:

Fix Version/s: 1.8.0

> Add the version number of Flink 1.7 to MigrationVersion
> ---
>
> Key: FLINK-10798
> URL: https://issues.apache.org/jira/browse/FLINK-10798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10996) Enable state ttl in cep

2018-12-02 Thread aitozi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706685#comment-16706685
 ] 

aitozi commented on FLINK-10996:


Hi, [~StephanEwen]

I think it can be resolved by adding an API to set the related expiration TTL 
time in Flink CEP. If it is set, the CEP operator can initialize the state with a 
TTL when creating the NFA/SharedBuffer related state. I will work on it this week.
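
A minimal sketch of what that could look like with the existing StateTtlConfig API; the descriptor name and value type below are placeholders rather than the actual CEP internals:
{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public final class CepTtlSketch {

    static ValueStateDescriptor<Integer> createTtlDescriptor(Time ttl) {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(ttl)                                               // expire entries this long after the last write
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

        // Placeholder descriptor; the CEP operator would do this for its NFA / SharedBuffer state.
        ValueStateDescriptor<Integer> descriptor =
            new ValueStateDescriptor<>("cep-state-placeholder", Integer.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
{code}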

> Enable state ttl in cep
> ---
>
> Key: FLINK-10996
> URL: https://issues.apache.org/jira/browse/FLINK-10996
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.7.1
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
> Fix For: 1.7.1
>
>
> In Flink CEP the within clause can clean up data when it expires. But if data 
> for a specific key never arrives again, the partially matched data will never 
> be deleted, which leads to a state leak. With the state TTL feature supported 
> by the Flink runtime, we have the ability to expire the related data 
> asynchronously.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] sunjincheng121 commented on issue #7181: [FLINK-11013] [table] Fix distinct aggregates for group window in Table API

2018-12-02 Thread GitBox
sunjincheng121 commented on issue #7181: [FLINK-11013] [table] Fix distinct 
aggregates for group window in Table API
URL: https://github.com/apache/flink/pull/7181#issuecomment-443595531
 
 
   @dianfu @walterddr Thanks for the review and update! Will merge now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11013) Fix distinct aggregates for group window in Table API

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706688#comment-16706688
 ] 

ASF GitHub Bot commented on FLINK-11013:


sunjincheng121 commented on issue #7181: [FLINK-11013] [table] Fix distinct 
aggregates for group window in Table API
URL: https://github.com/apache/flink/pull/7181#issuecomment-443595531
 
 
   @dianfu @walterddr Thanks for the review and update! Will merge now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix distinct aggregates for group window in Table API
> -
>
> Key: FLINK-11013
> URL: https://issues.apache.org/jira/browse/FLINK-11013
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently, distinct aggregates do not work on group windows in the Table API.
> For the following query:
> {code:java}
> val table = util.addTable[(Int, Long, String)](
>   "MyTable",
>   'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
> val result = table
>   .window(Tumble over 15.minute on 'rowtime as 'w)
>   .groupBy('w)
>   .select('a.count.distinct, 'a.sum)
> {code}
> The following exception will be thrown:
> {code:java}
> org.apache.flink.table.api.ValidationException: It's not allowed to use an 
> aggregate function as input of another aggregate function
> at 
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:643)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1$$anonfun$apply$3.apply(operators.scala:641)
> at org.apache.flink.table.plan.TreeNode.preOrderVisit(TreeNode.scala:82)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:641)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$1.apply(operators.scala:640)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:640)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2$4.apply(operators.scala:654)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.org$apache$flink$table$plan$logical$WindowAggregate$$validateAggregateExpression$2(operators.scala:654)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate$$anonfun$validate$12.apply(operators.scala:628)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:628)
> at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1206)
> at 
> org.apache.flink.table.api.stream.table.DistinctAggregateTest.testDistinctAggregateOnTumbleWindow(DistinctAggregateTest.scala:60)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)

[GitHub] zjffdu opened a new pull request #7214: FLINK-11409

2018-12-02 Thread GitBox
zjffdu opened a new pull request #7214: FLINK-11409
URL: https://github.com/apache/flink/pull/7214
 
 
   ## What is the purpose of the change
   
   This PR allows the user to run the partial DAG associated with certain sinks 
instead of all sinks.
   
   ## Brief change log
   
   It adds a new API to `ExecutionEnvironment` that lets users specify the target 
sinks they want to run.
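
   For illustration only, a minimal sketch of how such an API could be used; the 
method name `executeSinks` is an assumption, since the PR description does not 
show the exact signature added:

   ```java
   import org.apache.flink.api.java.DataSet;
   import org.apache.flink.api.java.ExecutionEnvironment;
   import org.apache.flink.api.java.io.PrintingOutputFormat;
   import org.apache.flink.api.java.operators.DataSink;
   import org.apache.flink.core.fs.FileSystem;

   public class PartialDagSketch {
       public static void main(String[] args) throws Exception {
           ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
           DataSet<String> data = env.fromElements("hello world", "hello flink", "hello hadoop");

           // Two independent sinks hanging off the same source.
           DataSink<String> fileSink = data.writeAsText("/tmp/words.txt", FileSystem.WriteMode.OVERWRITE);
           DataSink<String> printSink = data.output(new PrintingOutputFormat<>());

           // Hypothetical call: run only the part of the DAG that feeds printSink.
           // "executeSinks" is an assumed name, not necessarily the method added by this PR.
           env.executeSinks("partial-job", printSink);
       }
   }
   ```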
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added a test in `ExecutionEnvironmentITCase.java`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no) 
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11039) LogicalExchange and HashPartitioner realization

2018-12-02 Thread ambition (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706717#comment-16706717
 ] 

ambition commented on FLINK-11039:
--

This was originally implemented via the RelTimeIndicatorConverter.visit(exchange: 
LogicalExchange) function, but that function cannot be used directly from SQL 
statements. Any distributed engine has Shuffle/Hash/Range/Broadcast operators to 
shuffle data between daemons in the runtime system, so this change helps 
implement support for those operations.
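
As a rough illustration of the hash partitioning described here, a minimal sketch 
(the class name is made up; it only shows the Object.hashCode-based partition 
assignment mentioned in this issue, not the actual Flink/Calcite classes):
{code:java}
// Sketch only: spreads keys over parallel partitions with Java's Object.hashCode,
// which is what the HASH_DISTRIBUTED distribution type ultimately needs at runtime.
public final class HashPartitionerSketch {
    private final int numPartitions;

    public HashPartitionerSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    public int partition(Object key) {
        // mask the sign bit so negative hash codes still map to a valid partition
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
{code}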

> LogicalExchange and HashPartitioner realization
> ---
>
> Key: FLINK-11039
> URL: https://issues.apache.org/jira/browse/FLINK-11039
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0
>Reporter: ambition
>Priority: Minor
> Fix For: 1.7.0
>
>
> FlinkLogicalExchange realization 
> org.apache.calcite.rel.logical.LogicalExchange.
> HashPartitioner is Partitioner that implements hash-based partitioning using 
> Java's `Object.hashCode`. support org.apache.calcite.rel.RelDistribution.Type 
> HASH_DISTRIBUTED



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11050) When IntervalJoin, get left or right buffer's entries more quickly by assigning lowerBound

2018-12-02 Thread Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liu updated FLINK-11050:

Description: 
    In IntervalJoin, getting the left or right buffer's entries is very slow, 
because we have to scan all of the buffer's values, including deleted values that 
are out of the time range. Processing these deleted values consumes too much time 
in RocksDB's level 0. Since lowerBound is known, this can be optimized by seeking 
from the timestamp of lowerBound.

    Our usage is like below:
{code:java}
labelStream.keyBy(uuid).intervalJoin(adLogStream.keyBy(uuid))
   .between(Time.milliseconds(0), Time.milliseconds(60))
   .process(new processFunction())
   .sink(kafkaProducer)
{code}
    Our data is huge. The job always runs for about an hour and then gets stuck 
in RocksDB's seek when fetching the buffer's entries. We used RocksDB's data to 
simulate the problem and found that too much time is spent on deleted values, so 
we decided to optimize it by seeking from the lowerBound instead of doing a 
global scan.
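
A minimal sketch of the proposed optimization at the RocksDB level (assuming 
entries are keyed by a big-endian timestamp prefix; the names are illustrative, 
not the actual Flink state internals):
{code:java}
import org.rocksdb.RocksIterator;

import java.nio.ByteBuffer;

final class SeekFromLowerBoundSketch {
    static void scanRange(RocksIterator it, long lowerBound, long upperBound) {
        byte[] seekKey = ByteBuffer.allocate(Long.BYTES).putLong(lowerBound).array();
        // seek() jumps past entries (and their tombstones) below the lower bound
        // instead of iterating over them from the beginning of the key range
        for (it.seek(seekKey); it.isValid(); it.next()) {
            long timestamp = ByteBuffer.wrap(it.key()).getLong();
            if (timestamp > upperBound) {
                break;
            }
            // process it.value() for entries within [lowerBound, upperBound]
        }
    }
}
{code}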

  was:
    When IntervalJoin, it is very slow to get left or right buffer's entries. 
Because we have to scan all buffer's values, including the deleted values which 
are out of time range. These deleted values's processing consumes too much time 
in RocksDB's level 0. Since lowerBound is known, it can be optimized by seek 
from the timestamp of lowerBound.

    Our usage is like below:
{code:java}
labelStream.keyBy(uuid).intervalJoin(adLogStream.keyBy(uuid))
   .between(Time.milliseconds(0), Time.milliseconds(60))
   .process(new processFunction())
   .sink(kafkaProducer)
{code}
    Our data is huge. The job always runs for an hour and is stuck by RocksDB's 
seek when get buffer's entries. We use rocksDB's data to simulate the problem 
RocksDB. 


> When IntervalJoin, get left or right buffer's entries more quickly by 
> assigning lowerBound
> --
>
> Key: FLINK-11050
> URL: https://issues.apache.org/jira/browse/FLINK-11050
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Liu
>Priority: Major
>  Labels: performance
> Fix For: 1.7.1
>
>
>     In IntervalJoin, getting the left or right buffer's entries is very slow, 
> because we have to scan all of the buffer's values, including deleted values 
> that are out of the time range. Processing these deleted values consumes too 
> much time in RocksDB's level 0. Since lowerBound is known, this can be 
> optimized by seeking from the timestamp of lowerBound.
>     Our usage is like below:
> {code:java}
> labelStream.keyBy(uuid).intervalJoin(adLogStream.keyBy(uuid))
>.between(Time.milliseconds(0), Time.milliseconds(60))
>.process(new processFunction())
>.sink(kafkaProducer)
> {code}
>     Our data is huge. The job always runs for about an hour and then gets stuck 
> in RocksDB's seek when fetching the buffer's entries. We used RocksDB's data to 
> simulate the problem and found that too much time is spent on deleted values, 
> so we decided to optimize it by seeking from the lowerBound instead of doing a 
> global scan.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zhijiangW commented on a change in pull request #7185: [FLINK-10884] [yarn/mesos] adjust container memory param to set a safe margin from offheap memory

2018-12-02 Thread GitBox
zhijiangW commented on a change in pull request #7185: [FLINK-10884] 
[yarn/mesos]  adjust  container memory param  to set a safe margin from offheap 
memory
URL: https://github.com/apache/flink/pull/7185#discussion_r238154920
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 ##
 @@ -158,8 +158,10 @@ public static ContaineredTaskManagerParameters create(
 
// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = 
TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
-   // use the cut-off memory for off-heap (that was its intention)
-   final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
+   // (3) try to compute the offHeapMemory from a safe margin
+   final long restMemoryMB = containerMemoryMB - heapSizeMB;
+   final long offHeapCutoffMemory = 
calculateOffHeapCutoffMB(config, restMemoryMB);
 
 Review comment:
   I agree that both ways can work well. If we introduce the parameter 
`containerized.offheap-cutoff-ratio`, do you think we should also introduce 
`containerized.offheap-cutoff-min` to keep the same behavior as the previous 
parameters?
   
   I suggest renaming the current `containerized.heap-cutoff-ratio` to 
`containerized.memory-cutoff-ratio` to cover all of the memory overhead in one 
option, for the reasons below:
   
   1. Fewer parameters seem better sometimes, but not always. If you want to cut 
off 100 for heap and 200 for off-heap memory, you can just cut off 300 directly. 
It does not matter, and there is no control over, how that 300 is split between 
heap and off-heap; it can effectively be consumed by any memory usage as long as 
the container's physical memory is not exceeded.
   
   2. Minimal change: only the existing parameter name is refactored.
   
   Of course I can also accept the separate parameters if you insist on it. :)
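
   A hedged sketch of the cutoff arithmetic under discussion. The max(min, ratio) 
pattern follows the existing `containerized.heap-cutoff-ratio`/`-min` options; 
treating the cutoff as one shared "memory cutoff" for heap and off-heap overhead 
is only the proposal in this comment, not current behaviour:

   ```java
   public class MemoryCutoffSketch {
       public static void main(String[] args) {
           long containerMemoryMB = 3072;
           double cutoffRatio = 0.25;   // containerized.heap-cutoff-ratio
           long cutoffMinMB = 600;      // containerized.heap-cutoff-min

           long cutoffMB = Math.max(cutoffMinMB, (long) (containerMemoryMB * cutoffRatio));
           long jvmMemoryMB = containerMemoryMB - cutoffMB;  // later split into heap / off-heap

           // prints: cutoff=768MB, remaining JVM memory=2304MB
           System.out.println("cutoff=" + cutoffMB + "MB, remaining JVM memory=" + jvmMemoryMB + "MB");
       }
   }
   ```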


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706723#comment-16706723
 ] 

ASF GitHub Bot commented on FLINK-10884:


zhijiangW commented on a change in pull request #7185: [FLINK-10884] 
[yarn/mesos]  adjust  container memory param  to set a safe margin from offheap 
memory
URL: https://github.com/apache/flink/pull/7185#discussion_r238154920
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 ##
 @@ -158,8 +158,10 @@ public static ContaineredTaskManagerParameters create(
 
// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = 
TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
-   // use the cut-off memory for off-heap (that was its intention)
-   final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
+   // (3) try to compute the offHeapMemory from a safe margin
+   final long restMemoryMB = containerMemoryMB - heapSizeMB;
+   final long offHeapCutoffMemory = 
calculateOffHeapCutoffMB(config, restMemoryMB);
 
 Review comment:
   I agree that both ways can work well. If we introduce the parameter 
`containerized.offheap-cutoff-ratio`, do you think we should also introduce 
`containerized.offheap-cutoff-min` to keep the same behavior as the previous 
parameters?
   
   I suggest renaming the current `containerized.heap-cutoff-ratio` to 
`containerized.memory-cutoff-ratio` to cover all of the memory overhead in one 
option, for the reasons below:
   
   1. Fewer parameters seem better sometimes, but not always. If you want to cut 
off 100 for heap and 200 for off-heap memory, you can just cut off 300 directly. 
It does not matter, and there is no control over, how that 300 is split between 
heap and off-heap; it can effectively be consumed by any memory usage as long as 
the container's physical memory is not exceeded.
   
   2. Minimal change: only the existing parameter name is refactored.
   
   Of course I can also accept the separate parameters if you insist on it. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink on yarn  TM container will be killed by nodemanager because of  the 
> exceeded  physical memory.
> 
>
> Key: FLINK-10884
> URL: https://issues.apache.org/jira/browse/FLINK-10884
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Core
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Environment: version  : 1.6.2 
> module : flink on yarn
> centos  jdk1.8
> hadoop 2.7
>Reporter: wgcn
>Assignee: wgcn
>Priority: Major
>  Labels: pull-request-available, yarn
>
> The TM container will be killed by the NodeManager because the physical memory 
> limit is exceeded. I found that the launch context launching the TM container 
> uses "container memory = heap memory + offHeapSizeMB" in the class 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters 
> (lines 160 to 166). I set a safety margin for the whole memory the container 
> uses: for example, if the container limit is 3g of memory, the sum 
> "heap memory + offHeapSizeMB" is set to 2.4g to prevent the container from 
> being killed. Do we have a ready-made solution, or can I commit my solution?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6441) Improve the UDTF

2018-12-02 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706574#comment-16706574
 ] 

Hequn Cheng commented on FLINK-6441:


I tested it in an ITCase. It seems the issue has not been fixed. [~RuidongLi], 
what's your plan for it?

> Improve the UDTF
> 
>
> Key: FLINK-6441
> URL: https://issues.apache.org/jira/browse/FLINK-6441
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> According to [FLINK-6334], UDTF's apply method returns an unbounded Table which 
> consists of a LogicalTableFunctionCall and only supports Alias 
> transformation; this issue focuses on adding expression evaluation in Select, e.g. 
> table.join(split('c) as ('a, 'b) select ('a * 2 as 'a, 'b + 1 as 'b))



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8739) Optimize runtime support for distinct filter

2018-12-02 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706595#comment-16706595
 ] 

Dian Fu commented on FLINK-8739:


[~walterddr] Thanks a lot. I'll provide a PR ASAP and look forward to your 
review.

> Optimize runtime support for distinct filter
> 
>
> Key: FLINK-8739
> URL: https://issues.apache.org/jira/browse/FLINK-8739
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Rong Rong
>Priority: Major
>
> Possible optimizations:
> 1. Decouple distinct map and actual accumulator so that they can separately 
> be created in codegen.
> 2. Reuse same distinct accumulator for filtering, e.g. `SELECT 
> COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10149) Fink Mesos allocates extra port when not configured to do so.

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706765#comment-16706765
 ] 

ASF GitHub Bot commented on FLINK-10149:


asfgit closed pull request #7203: [FLINK-10149[mesos] Don't allocate extra 
mesos port for TM unless configured to do so
URL: https://github.com/apache/flink/pull/7203
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
index 426a891e814..0c4e1f6bcba 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -136,8 +136,9 @@
/**
 * Config parameter to configure which configuration keys will 
dynamically get a port assigned through Mesos.
 */
-   public static final ConfigOption PORT_ASSIGNMENTS = 
key("mesos.resourcemanager.tasks.port-assignments")
-   .defaultValue("")
+   public static final ConfigOption PORT_ASSIGNMENTS =
+   key("mesos.resourcemanager.tasks.port-assignments")
+   .noDefaultValue()
.withDescription(Description.builder()
.text("Comma-separated list of configuration keys which 
represent a configurable port. " +
"All port keys will dynamically get a port 
assigned through Mesos.")
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 84ec2229a2a..637442c899d 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -41,6 +41,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -64,12 +65,13 @@
 public class LaunchableMesosWorker implements LaunchableTask {
 
protected static final Logger LOG = 
LoggerFactory.getLogger(LaunchableMesosWorker.class);
+
/**
 * The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
 */
-   static final String[] TM_PORT_KEYS = {
+   static final Set TM_PORT_KEYS = Collections.unmodifiableSet(new 
HashSet<>(Arrays.asList(
"taskmanager.rpc.port",
-   "taskmanager.data.port"};
+   "taskmanager.data.port")));
 
private final MesosArtifactResolver resolver;
private final ContainerSpecification containerSpec;
@@ -342,16 +344,18 @@ public String toString() {
 * @return A deterministically ordered Set of port keys to expose from 
the TM container
 */
static Set extractPortKeys(Configuration config) {
-   final LinkedHashSet tmPortKeys = new 
LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
+   final LinkedHashSet tmPortKeys = new 
LinkedHashSet<>(TM_PORT_KEYS);
 
final String portKeys = config.getString(PORT_ASSIGNMENTS);
 
-   Arrays.stream(portKeys.split(","))
-   .map(String::trim)
-   .peek(key -> LOG.debug("Adding port key " + key + " to 
mesos request"))
-   .forEach(tmPortKeys::add);
+   if (portKeys != null) {
+   Arrays.stream(portKeys.split(","))
+   .map(String::trim)
+   .peek(key -> LOG.debug("Adding port key {} to 
mesos request"))
+   .forEach(tmPortKeys::add);
+   }
 
-   return tmPortKeys;
+   return Collections.unmodifiableSet(tmPortKeys);
}
 
@Override
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 6784e427c1f..48a436cb995 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -23,11 +23,14 @@
 
 import org.junit.Test;
 
-import java.util.Iterator;
+import java.util.Arrays;
+import 

[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706598#comment-16706598
 ] 

ASF GitHub Bot commented on FLINK-10134:


XuQianJin-Stars closed pull request #7157: [FLINK-10134] UTF-16 support for 
TextInputFormat bug
URL: https://github.com/apache/flink/pull/7157
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..e13560d4823 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -28,6 +28,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.LRUCache;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +37,7 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Map;
 
 /**
  * Base implementation for input formats that split the input at a delimiter 
into records.
@@ -62,6 +64,23 @@
// Charset is not serializable
private transient Charset charset;
 
+   /**
+* The charset of bom in the file to process.
+*/
+   private transient Charset bomIdentifiedCharset;
+   /**
+* This is the charset that is configured via setCharset().
+*/
+   private transient Charset configuredCharset;
+   /**
+* The Map to record the BOM encoding of all files.
+*/
+   private transient final Map fileBomCharsetMap;
+   /**
+* The bytes to BOM check.
+*/
+   byte[] bomBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, 
(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+
/**
 * The default read buffer size = 1MB.
 */
@@ -184,6 +203,7 @@ protected DelimitedInputFormat(Path filePath, Configuration 
configuration) {
configuration = GlobalConfiguration.loadConfiguration();
}
loadConfigParameters(configuration);
+   this.fileBomCharsetMap = new LRUCache<>(1024);
}
 
/**
@@ -195,12 +215,25 @@ protected DelimitedInputFormat(Path filePath, 
Configuration configuration) {
 */
@PublicEvolving
public Charset getCharset() {
-   if (this.charset == null) {
+   if (this.configuredCharset != null) {
+   this.charset = this.configuredCharset;
+   } else if (this.bomIdentifiedCharset != null) {
+   this.charset = this.bomIdentifiedCharset;
+   } else {
this.charset = Charset.forName(charsetName);
}
return this.charset;
}
 
+   /**
+* get the charsetName.
+*
+* @return the charsetName
+*/
+   public String getCharsetName() {
+   return charsetName;
+   }
+
/**
 * Set the name of the character set used for the row delimiter. This is
 * also used by subclasses to interpret field delimiters, comment 
strings,
@@ -214,7 +247,7 @@ public Charset getCharset() {
@PublicEvolving
public void setCharset(String charset) {
this.charsetName = Preconditions.checkNotNull(charset);
-   this.charset = null;
+   this.configuredCharset = getSpecialCharset(charset);
 
if (this.delimiterString != null) {
this.delimiter = delimiterString.getBytes(getCharset());
@@ -472,6 +505,7 @@ public void open(FileInputSplit split) throws IOException {
 
this.offset = splitStart;
if (this.splitStart != 0) {
+   setBomFileCharset(split);
this.stream.seek(offset);
readLine();
// if the first partial record already pushes the 
stream over
@@ -481,6 +515,7 @@ public void open(FileInputSplit split) throws IOException {
}
} else {
fillBuffer(0);
+   setBomFileCharset(split);
}
}
 
@@ -536,6 +571,71 @@ public void close() throws IOException {
super.close();
}
 
+   /**
+* Special default processing for utf-16 and utf-32 is performed.
+*
+* @param charsetName

[GitHub] XuQianJin-Stars closed pull request #7157: [FLINK-10134] UTF-16 support for TextInputFormat bug

2018-12-02 Thread GitBox
XuQianJin-Stars closed pull request #7157: [FLINK-10134] UTF-16 support for 
TextInputFormat bug
URL: https://github.com/apache/flink/pull/7157
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..e13560d4823 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -28,6 +28,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.LRUCache;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +37,7 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Map;
 
 /**
  * Base implementation for input formats that split the input at a delimiter 
into records.
@@ -62,6 +64,23 @@
// Charset is not serializable
private transient Charset charset;
 
+   /**
+* The charset of bom in the file to process.
+*/
+   private transient Charset bomIdentifiedCharset;
+   /**
+* This is the charset that is configured via setCharset().
+*/
+   private transient Charset configuredCharset;
+   /**
+* The Map to record the BOM encoding of all files.
+*/
+   private transient final Map fileBomCharsetMap;
+   /**
+* The bytes to BOM check.
+*/
+   byte[] bomBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, 
(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+
/**
 * The default read buffer size = 1MB.
 */
@@ -184,6 +203,7 @@ protected DelimitedInputFormat(Path filePath, Configuration 
configuration) {
configuration = GlobalConfiguration.loadConfiguration();
}
loadConfigParameters(configuration);
+   this.fileBomCharsetMap = new LRUCache<>(1024);
}
 
/**
@@ -195,12 +215,25 @@ protected DelimitedInputFormat(Path filePath, 
Configuration configuration) {
 */
@PublicEvolving
public Charset getCharset() {
-   if (this.charset == null) {
+   if (this.configuredCharset != null) {
+   this.charset = this.configuredCharset;
+   } else if (this.bomIdentifiedCharset != null) {
+   this.charset = this.bomIdentifiedCharset;
+   } else {
this.charset = Charset.forName(charsetName);
}
return this.charset;
}
 
+   /**
+* get the charsetName.
+*
+* @return the charsetName
+*/
+   public String getCharsetName() {
+   return charsetName;
+   }
+
/**
 * Set the name of the character set used for the row delimiter. This is
 * also used by subclasses to interpret field delimiters, comment 
strings,
@@ -214,7 +247,7 @@ public Charset getCharset() {
@PublicEvolving
public void setCharset(String charset) {
this.charsetName = Preconditions.checkNotNull(charset);
-   this.charset = null;
+   this.configuredCharset = getSpecialCharset(charset);
 
if (this.delimiterString != null) {
this.delimiter = delimiterString.getBytes(getCharset());
@@ -472,6 +505,7 @@ public void open(FileInputSplit split) throws IOException {
 
this.offset = splitStart;
if (this.splitStart != 0) {
+   setBomFileCharset(split);
this.stream.seek(offset);
readLine();
// if the first partial record already pushes the 
stream over
@@ -481,6 +515,7 @@ public void open(FileInputSplit split) throws IOException {
}
} else {
fillBuffer(0);
+   setBomFileCharset(split);
}
}
 
@@ -536,6 +571,71 @@ public void close() throws IOException {
super.close();
}
 
+   /**
+* Special default processing for utf-16 and utf-32 is performed.
+*
+* @param charsetName
+* @return
+*/
+   private Charset getSpecialCharset(String charsetName) {
+   Charset charset;
+   switch (charsetName.toUpperCase()) {
+   case "UTF-16":
+  

[jira] [Updated] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-02 Thread luoguohao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoguohao updated FLINK-11046:
--
Description: 
When i'm using es6 sink to index into es, bulk process with some exception 
catched, and  i trying to reindex the document with the call 
`indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` method, 
but things goes incorrect. The call thread stuck there, and with the thread 
dump, i saw the `bulkprocessor` object was locked by other thread. 
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After reading the code behind `indexer.add(action)`, I found that 
`synchronized` is required on each add operation.
{code:java}
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object 
payload) {
  ensureOpen();
  bulkRequest.add(request, payload);
  executeIfNeeded();
}
{code}
I also noticed that the `bulkprocessor` object is likewise locked by the bulk 
process thread.

The bulk process operation is in the following code:
{code:java}
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new 
ActionListener() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}

@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
}, Settings.EMPTY);
bulkRequestSetupSuccessful = true;
   if (concurrentRequests == 0) {
   latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) {  // if we fail on 
client.bulk() release the semaphore
toRelease.run();
}
}
}
{code}
As for the line I marked above, I think that is the reason the retry thread was 
blocked: the bulk process thread never releases the lock on `bulkprocessor`. I 
also tried to figure out why the field `concurrentRequests` was set to zero, and 
I saw the initialization of the bulk processor in the class 
`ElasticsearchSinkBase`:
{code:java}
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
 ...
 BulkProcessor.Builder bulkProcessorBuilder =  
callBridge.createBulkProcessorBuilder(client, listener);

 // This makes flush() blocking
 bulkProcessorBuilder.setConcurrentRequests(0);
 
 ...

 return bulkProcessorBuilder.build();
}
{code}
 This field value is set to zero explicitly. So everything seems to make sense, 
but I still wonder why the retry operation is not in the same thread as the bulk 
process execution; after reading the code, the `bulkAsync` method might be the 
last piece of the puzzle.
{code:java}
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient 
client, BulkProcessor.Listener listener) {
 return BulkProcessor.builder(client::bulkAsync, listener);
}
{code}
So I hope someone can help fix this problem or give some suggestions; I can also 
take a try at it myself.
 Thanks a lot!
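
For reference, a very rough sketch of the lock cycle described above; this is an 
assumed simplification with made-up class and method names, not the real 
Elasticsearch BulkProcessor or Flink sink code:
{code:java}
// Assumed simplification of the deadlock: thread A holds the processor monitor
// while waiting for the in-flight bulk to complete, and the failure handler
// that would complete it blocks on the same monitor when re-adding the request.
class BulkDeadlockSketch {
    private final Object monitor = new Object();

    // Bulk-flush thread (concurrentRequests == 0 makes the flush wait inline).
    void flush() {
        synchronized (monitor) {
            awaitAfterBulkCallback();   // afterBulk() -> onFailure() -> add() never returns
        }
    }

    // Failure-handler thread: ActionRequestFailureHandler.onFailure() -> indexer.add().
    void add(Object request) {
        synchronized (monitor) {        // blocks forever: flush() still holds the monitor
            // enqueue the retried request
        }
    }

    private void awaitAfterBulkCallback() {
        // stands in for latch.await() in the execute() method quoted above
    }
}
{code}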

  was:
When i'm using es6 sink to index into es, bulk process with some exception 
catched, and  i trying to reindex the document with the call 
`indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` method, 
but things goes incorrect. The call thread stuck there, and with the thread 
dump, i saw the `bulkprocessor` object was locked by other thread. 
{code:java}
public interface ActionRequestFailureHandler extends Serializable {

 void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
RequestIndexer indexer) throws Throwable;

}
{code}
After i read the code implemented in the `indexer.add(action)`, i find 

[jira] [Commented] (FLINK-6441) Improve the UDTF

2018-12-02 Thread Ruidong Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706587#comment-16706587
 ] 

Ruidong Li commented on FLINK-6441:
---

Thanks [~twalthr] and [~hequn8128]. This issue has not been fixed; I'll fix it 
in the next few days.

> Improve the UDTF
> 
>
> Key: FLINK-6441
> URL: https://issues.apache.org/jira/browse/FLINK-6441
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> According to [FLINK-6334], UDTF's apply method returns an unbounded Table which 
> consists of a LogicalTableFunctionCall and only supports Alias 
> transformation; this issue focuses on adding expression evaluation in Select, e.g. 
> table.join(split('c) as ('a, 'b) select ('a * 2 as 'a, 'b + 1 as 'b))



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8739) Optimize runtime support for distinct filter

2018-12-02 Thread Dian Fu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-8739:
--

Assignee: Dian Fu

> Optimize runtime support for distinct filter
> 
>
> Key: FLINK-8739
> URL: https://issues.apache.org/jira/browse/FLINK-8739
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Rong Rong
>Assignee: Dian Fu
>Priority: Major
>
> Possible optimizations:
> 1. Decouple distinct map and actual accumulator so that they can separately 
> be created in codegen.
> 2. Reuse same distinct accumulator for filtering, e.g. `SELECT 
> COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-02 Thread luoguohao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706611#comment-16706611
 ] 

luoguohao commented on FLINK-11046:
---

Sorry for missing that; here are all my settings for the ES sink:
 * bulk.flush.max.actions: 1000
 * bulk.flush.interval.ms: 10s
 * bulk.flush.max.size.mb: 10M
 * bulk.flush.backoff.enable: true
 * bulk.flush.backoff.retries: 3
 * bulk.flush.backoff.type: EXPONENTIAL
 * bulk.flush.backoff.delay: 1minute

 

 

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Priority: Major
>
> When I use the ES6 sink to index into Elasticsearch and the bulk process 
> catches an exception, I try to re-index the document by calling 
> `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` 
> method, but things go wrong. The calling thread gets stuck there, and from the 
> thread dump I saw that the `bulkprocessor` object was locked by another thread. 
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
> RequestIndexer indexer) throws Throwable;
> }
> {code}
> After reading the code behind `indexer.add(action)`, I found that 
> `synchronized` is required on each add operation.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable 
> Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> I also noticed that the `bulkprocessor` object is likewise locked by the bulk 
> process thread. 
> The bulk process operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
> Runnable toRelease = () -> {};
> boolean bulkRequestSetupSuccessful = false;
> try {
> listener.beforeBulk(executionId, bulkRequest);
> semaphore.acquire();
> toRelease = semaphore::release;
> CountDownLatch latch = new CountDownLatch(1);
> retry.withBackoff(consumer, bulkRequest, new 
> ActionListener() {
> @Override
> public void onResponse(BulkResponse response) {
> try {
> listener.afterBulk(executionId, bulkRequest, response);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> @Override
> public void onFailure(Exception e) {
> try {
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> }, Settings.EMPTY);
> bulkRequestSetupSuccessful = true;
>if (concurrentRequests == 0) {
>latch.await();
> }
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
> cancelled.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } catch (Exception e) {
> logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
> request {}.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> if (bulkRequestSetupSuccessful == false) {  // if we fail on 
> client.bulk() release the semaphore
> toRelease.run();
> }
> }
> }
> {code}
> As for the line I marked above, I think that is the reason the retry thread 
> was blocked: the bulk process thread never releases the lock on `bulkprocessor`. 
> I also tried to figure out why the field `concurrentRequests` was set to zero, 
> and I saw the initialization of the bulk processor in the class 
> `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>  ...
>  BulkProcessor.Builder bulkProcessorBuilder =  
> callBridge.createBulkProcessorBuilder(client, listener);
>  // This makes flush() blocking
>  bulkProcessorBuilder.setConcurrentRequests(0);
>  
>  ...
>  return bulkProcessorBuilder.build();
> }
> {code}
> This field value is set to zero explicitly. So everything seems to make sense, 
> but I still wonder why the retry operation is not in the same thread as the 
> bulk process execution; after reading the code, the `bulkAsync` method might 
> be the last piece of the puzzle.
> {code:java}
> 

[jira] [Created] (FLINK-11049) Unable to execute partial DAG

2018-12-02 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-11049:
--

 Summary: Unable to execute partial DAG
 Key: FLINK-11049
 URL: https://issues.apache.org/jira/browse/FLINK-11049
 Project: Flink
  Issue Type: Bug
  Components: Job-Submission
Affects Versions: 1.7.0
Reporter: Jeff Zhang


{code}
val benv = ExecutionEnvironment.getExecutionEnvironment
val btEnv = TableEnvironment.getTableEnvironment(benv)
val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
data.writeAsText("/Users/jzhang/a.txt", FileSystem.WriteMode.OVERWRITE);
val table = data.flatMap(line=>line.split("\\s")).
  map(w => (w, 1)).
  toTable(btEnv, 'word, 'number)
btEnv.registerTable("wc", table)
btEnv.sqlQuery("select word, count(1) from wc group by word").
  toDataSet[Row].print()
{code}

In the above example, the last statement triggers the execution of two jobs 
(writeAsText and print), but what the user expects is only the print job. The 
root cause is that Flink is currently unable to submit a partial DAG.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11049) Unable to execute partial DAG

2018-12-02 Thread Jeff Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang reassigned FLINK-11049:
--

Assignee: Jeff Zhang

> Unable to execute partial DAG
> -
>
> Key: FLINK-11049
> URL: https://issues.apache.org/jira/browse/FLINK-11049
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission
>Affects Versions: 1.7.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>
> {code}
> val benv = ExecutionEnvironment.getExecutionEnvironment
> val btEnv = TableEnvironment.getTableEnvironment(benv)
> val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
> data.writeAsText("/Users/jzhang/a.txt", FileSystem.WriteMode.OVERWRITE);
> val table = data.flatMap(line=>line.split("\\s")).
>   map(w => (w, 1)).
>   toTable(btEnv, 'word, 'number)
> btEnv.registerTable("wc", table)
> btEnv.sqlQuery("select word, count(1) from wc group by word").
>   toDataSet[Row].print()
> {code}
> In the above example, the last statement triggers the execution of two jobs 
> (writeAsText and print), but what the user expects is only the print job. The 
> root cause is that Flink is currently unable to submit a partial DAG.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #7203: [FLINK-10149[mesos] Don't allocate extra mesos port for TM unless configured to do so

2018-12-02 Thread GitBox
asfgit closed pull request #7203: [FLINK-10149[mesos] Don't allocate extra 
mesos port for TM unless configured to do so
URL: https://github.com/apache/flink/pull/7203
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
index 426a891e814..0c4e1f6bcba 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -136,8 +136,9 @@
/**
 * Config parameter to configure which configuration keys will 
dynamically get a port assigned through Mesos.
 */
-   public static final ConfigOption PORT_ASSIGNMENTS = 
key("mesos.resourcemanager.tasks.port-assignments")
-   .defaultValue("")
+   public static final ConfigOption PORT_ASSIGNMENTS =
+   key("mesos.resourcemanager.tasks.port-assignments")
+   .noDefaultValue()
.withDescription(Description.builder()
.text("Comma-separated list of configuration keys which 
represent a configurable port. " +
"All port keys will dynamically get a port 
assigned through Mesos.")
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 84ec2229a2a..637442c899d 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -41,6 +41,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -64,12 +65,13 @@
 public class LaunchableMesosWorker implements LaunchableTask {
 
protected static final Logger LOG = 
LoggerFactory.getLogger(LaunchableMesosWorker.class);
+
/**
 * The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
 */
-   static final String[] TM_PORT_KEYS = {
+   static final Set TM_PORT_KEYS = Collections.unmodifiableSet(new 
HashSet<>(Arrays.asList(
"taskmanager.rpc.port",
-   "taskmanager.data.port"};
+   "taskmanager.data.port")));
 
private final MesosArtifactResolver resolver;
private final ContainerSpecification containerSpec;
@@ -342,16 +344,18 @@ public String toString() {
 * @return A deterministically ordered Set of port keys to expose from 
the TM container
 */
static Set extractPortKeys(Configuration config) {
-   final LinkedHashSet tmPortKeys = new 
LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
+   final LinkedHashSet tmPortKeys = new 
LinkedHashSet<>(TM_PORT_KEYS);
 
final String portKeys = config.getString(PORT_ASSIGNMENTS);
 
-   Arrays.stream(portKeys.split(","))
-   .map(String::trim)
-   .peek(key -> LOG.debug("Adding port key " + key + " to 
mesos request"))
-   .forEach(tmPortKeys::add);
+   if (portKeys != null) {
+   Arrays.stream(portKeys.split(","))
+   .map(String::trim)
+   .peek(key -> LOG.debug("Adding port key {} to 
mesos request"))
+   .forEach(tmPortKeys::add);
+   }
 
-   return tmPortKeys;
+   return Collections.unmodifiableSet(tmPortKeys);
}
 
@Override
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 6784e427c1f..48a436cb995 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -23,11 +23,14 @@
 
 import org.junit.Test;
 
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Set;
 
 import static 
org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static 

[GitHub] kisimple opened a new pull request #7215: [hotfix][test][streaming] Fix invalid testNotSideOutputXXX in WindowOperatorTest

2018-12-02 Thread GitBox
kisimple opened a new pull request #7215: [hotfix][test][streaming] Fix invalid 
testNotSideOutputXXX in WindowOperatorTest
URL: https://github.com/apache/flink/pull/7215
 
 
   `lateOutputTag` should not be `null` when testing side output.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzanko-matev commented on issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for Iterative DataStream Jobs

2018-12-02 Thread GitBox
tzanko-matev commented on issue #1668: [FLINK-3257] Add Exactly-Once Processing 
Guarantees for Iterative DataStream Jobs
URL: https://github.com/apache/flink/pull/1668#issuecomment-443545153
 
 
   Hi, are there any updates on this feature?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2018-12-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706487#comment-16706487
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

tzanko-matev commented on issue #1668: [FLINK-3257] Add Exactly-Once Processing 
Guarantees for Iterative DataStream Jobs
URL: https://github.com/apache/flink/pull/1668#issuecomment-443545153
 
 
   Hi, are there any updates on this feature?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>Priority: Major
>  Labels: pull-request-available
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work roughly 
> as follows:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> output and start an upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations can be potentially achieved but 
> this can be the initial implementation take.
> [1] http://arxiv.org/abs/1506.08603
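
A hedged pseudocode sketch of steps 1)-3) above; the class and method names are 
illustrative only (they do not correspond to Flink's actual iteration task 
classes), and the head/sink/source roles simply follow the quoted description:
{code:java}
// Illustrative pseudocode only, not the real snapshotting implementation.
class AbsIterationSketch {
    void onBarrierAtIterationHead(long checkpointId) {
        blockRegularOutput();                          // 1) stop forwarding records downstream
        startBackupOfBackEdgeRecords(checkpointId);    //    log records coming from the IterationSink
    }

    void onBarrierAtIterationSink(long checkpointId) {
        forwardBarrierToIterationSource(checkpointId); // 2) pass the snapshot epoch barrier on
    }

    void onBarrierAtIterationSource(long checkpointId) {
        finalizeSnapshot(checkpointId);                // 3) persist the logged in-transit records
        unblockOutput();
        emitLoggedRecordsInFifoOrder();                //    replay back-edge records, then resume
    }

    // Stubs standing in for the real runtime machinery.
    private void blockRegularOutput() {}
    private void startBackupOfBackEdgeRecords(long id) {}
    private void forwardBarrierToIterationSource(long id) {}
    private void finalizeSnapshot(long id) {}
    private void unblockOutput() {}
    private void emitLoggedRecordsInFifoOrder() {}
}
{code}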



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2018-12-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-3257:
--
Labels: pull-request-available  (was: )

> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>Priority: Major
>  Labels: pull-request-available
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work roughly 
> as follows:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> output and start an upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations can be potentially achieved but 
> this can be the initial implementation take.
> [1] http://arxiv.org/abs/1506.08603



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10149) Fink Mesos allocates extra port when not configured to do so.

2018-12-02 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-10149.

Resolution: Fixed

fixed via
1.8: 
529288dfac1fb82b3c15d5acaeec2e54676a930d
ffa0a9e453b8f770a8fe8db08f8324de330db569
419a3535e8aa1688828c03ef022c7aee3ba56700
8ba8337941fd1f53f217d0fe1dfe749467db9a82

1.7:
e82df4f07f0a4e82410db0d30013b8df32e66720
e3017db7a54edea22b546d5f427b3f82ade56b25
3108567f886177cba7ae72ad1dd5124c57987860
0ff5f86f78a71ae77ccbe0e9abf5a387e339f67a

1.6:
b909d4d5bd96e492fe88db7a954b307f99d5c81d
9b7b6af5fc50cacc5c0b4ab11c78f1046d1e010d
bec8d7f5c0255dbdcee25cc9a023bb4cf86048be
dbba9b072c001d6ebc852d4bf36424f9353916bb

> Fink Mesos allocates extra port when not configured to do so.
> -
>
> Key: FLINK-10149
> URL: https://issues.apache.org/jira/browse/FLINK-10149
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Rune Skou Larsen
>Assignee: Gary Yao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.8.0, 1.7.1
>
>
> Internal testing has revealed a small bug in the way LaunchableMesosWorker 
> handles the absense of the new *mesos.resourcemanager.tasks.port-assignments* 
> config option.
> It allocates an extra mesos port even when the option is not set.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)