[jira] [Updated] (FLINK-10915) clojure context.collectWithTimestamp Will be blocked.

2018-11-17 Thread fengge (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fengge updated FLINK-10915:
---
Description: 
{code:java}
(deftype Cflatmapfunction []
  FlatMapFunction
  (flatMap [this value collector]
    (log/info "value:" (type value) value)
    (let [tomap (into {} value)
          {:keys [shopid shopname]} (ym/readstring (get tomap "body"))]
      ;(.collect collector (Tuple5. msgid "orgidid" "toporgid" "(ym/readstring body)" 1))
      (.collect collector (Tuple3. shopid shopname (int 1))))))

;;; The problem is here: this FlatMapFunction implemented in Clojure blocks
;;; when the job runs on a cluster, but it works fine in a local JVM.

{code}
{code:java}
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    (prn "开始有状态的流式计算1" flink-env) ; "Starting stateful stream computation 1"
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))

{code}
 

  was:
{code:java}
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    (prn "开始有状态的流式计算1" flink-env) ; "Starting stateful stream computation 1"
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))

{code}
 


> clojure context.collectWithTimestamp Will be blocked.
>
> Key: FLINK-10915
> URL: https://issues.apache.org/jira/browse/FLINK-10915
> Project: Flink
> Issue Type: Improvement
> Components: Client
> Affects Versions: 1.6.2
> Reporter: fengge
> Priority: Minor
>
> {code:java}
> (deftype Cflatmapfunction []
>   FlatMapFunction
>   (flatMap [this value collector]
>     (log/info "value:" (type value) value)
>     (let [tomap (into {} value)
>           {:keys [shopid shopname]} (ym/readstring (get tomap "body"))]
>       ;(.collect collector (Tuple5. msgid "orgidid" "toporgid" "(ym/readstring body)" 1))
>       (.collect collector (Tuple3. shopid shopname (int 1))))))
>
> ;;; The problem is here: this FlatMapFunction implemented in Clojure blocks
> ;;; when the job runs on a cluster, but it works fine in a local JVM.
> {code}
> {code:java}
> (defn -main [& args]
>   (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
>         _ (.enableCheckpointing flink-env 13000)
>         sources (.addSource flink-env
>                             (RocketMQSource.
>                               (SimpleKeyValueDeserializationSchema. "msgid" "body")
>                               (gen-consumer-properties)))
>         _ (.name sources "ririri")
>         _ (.setParallelism sources 1)
>         ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
>         _ (.name ednds "ccc")
>         _ (.setParallelism ednds 1)
>         _ (.print ednds)
>         ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
>         ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
>         ]
>     (prn "开始有状态的流式计算1" flink-env) ; "Starting stateful stream computation 1"
>     ;(.setParallelism ds 1)
>     ;(.setParallelism ednds 1)
>     ;(.print counts)
>     ;(.print secondcounts)
>     (.execute flink-env "rocketmq-flink-feng2")))
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10915) clojure context.collectWithTimestamp Will be blocked.

2018-11-17 Thread fengge (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fengge updated FLINK-10915:
---
Description: 
```

(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    (prn "开始有状态的流式计算1" flink-env) ; "Starting stateful stream computation 1"
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))

```






[jira] [Updated] (FLINK-10915) clojure context.collectWithTimestamp Will be blocked.

2018-11-17 Thread fengge (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fengge updated FLINK-10915:
---
Description: 
{code:java}
// code placeholder
{code}
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    (prn "开始有状态的流式计算1" flink-env) ; "Starting stateful stream computation 1"
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))

  was:
```

(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    (prn "开始有状态的流式计算1" flink-env) ; "Starting stateful stream computation 1"
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))

```







[jira] [Updated] (FLINK-10915) clojure context.collectWithTimestamp Will be blocked.

2018-11-17 Thread fengge (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fengge updated FLINK-10915:
---
Description: 
{code:java}
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    (prn "开始有状态的流式计算1" flink-env) ; "Starting stateful stream computation 1"
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))

{code}
 

  was:
{code:java}
// code placeholder
{code}
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    (prn "开始有状态的流式计算1" flink-env) ; "Starting stateful stream computation 1"
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))




