Unsubscribe

2021-09-11 Thread qq
Unsubscribe


Re: Flink 1.9.2: why do checkpoints always expire?

2020-04-27 Thread qq
Hi Jiayi Liao,

  Thanks for your reply. I have added an attachment. I still can't get any useful messages from it.

 


> On Apr 27, 2020, at 12:40, Jiayi Liao wrote:
> 
> <Pasted Graphic-1.tiff>



Flink 1.9.2: why do checkpoints always expire?

2020-04-26 Thread qq
Hi all,

Why do my Flink checkpoints always expire? I use the RocksDB state backend for
checkpoints, and I can't find any useful messages about this. Could you help me?
Thanks very much.





Flink on Yarn resource arrangement

2019-11-13 Thread qq
Hi all,

   Could you explain in detail how resources are managed for a Flink job on YARN?

  I ran the Flink job with the options “-p 20 -yn 5 -ys 3 -yjm 2048m -ytm 2048m” and got:

  Containers  VCores  Task Managers  Total Task Slots
  8           22      7              21


I ran the Flink job with the options “-p 20 -yn 7 -ys 4 -yjm 2048m -ytm 2048m” and got:

  Containers  VCores  Task Managers  Total Task Slots
  6           21      5              20

Could you give me the exact formula for these resources? Thanks very much.



Alex Fu
2019/11/14
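For reference, both runs above are consistent with a simple rule, assuming Flink 1.9 defaults: -yn appears to have no effect (TaskManagers are started on demand until there are at least -p slots), each TaskManager container gets one vcore per slot (yarn.containers.vcores defaults to the number of slots), and the JobManager container gets one vcore (yarn.appmaster.vcores defaults to 1). The helper below is a hypothetical sketch of that arithmetic; YarnFootprint and YarnEstimate are invented names, not Flink APIs.

```scala
// Hypothetical helper, not a Flink API: reproduces the arithmetic observed above,
// assuming Flink 1.9 defaults (-yn ignored, one vcore per slot per TaskManager
// container, one vcore for the JobManager container).
final case class YarnFootprint(taskManagers: Int, totalSlots: Int, containers: Int, vcores: Int)

object YarnEstimate {
  def estimate(parallelism: Int, slotsPerTaskManager: Int): YarnFootprint = {
    require(parallelism > 0 && slotsPerTaskManager > 0, "arguments must be positive")
    // Enough TaskManagers to offer `parallelism` slots (ceiling division).
    val taskManagers = (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager
    val totalSlots = taskManagers * slotsPerTaskManager
    // One container per TaskManager plus one for the JobManager.
    val containers = taskManagers + 1
    // One vcore per slot in each TaskManager, plus one for the JobManager.
    val vcores = totalSlots + 1
    YarnFootprint(taskManagers, totalSlots, containers, vcores)
  }
}
```

With the numbers from the two runs, estimate(20, 3) gives 7 TaskManagers, 21 slots, 8 containers and 22 vcores, and estimate(20, 4) gives 5, 20, 6 and 21, which matches what YARN reported in both cases.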

Re: Flink savepoint (checkpoint) load API or debug

2019-11-07 Thread qq
Hi all,

   Thanks very much. I want to debug the checkpoint in code; my code is below.
Sorry, but I don't understand what you mean by a UT class.
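// Imports assume Flink 1.9.x and the Scala DataStream API.
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, OperatorStateStore}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackend}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

// STATE_BACKEND (the checkpoint directory URI) and streamSource (a SourceFunction[String])
// are defined elsewhere in my project.
def demo(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.enableCheckpointing(1) // interval in ms; the 5 s minimum pause below throttles it
  val checkpointConfig = env.getCheckpointConfig
  checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  checkpointConfig.setMinPauseBetweenCheckpoints(5000)
  checkpointConfig.setCheckpointTimeout(5000) // a checkpoint not finished within 5 s expires
  checkpointConfig.setMaxConcurrentCheckpoints(1)
  // Keep the externalized checkpoint when the job is cancelled.
  checkpointConfig.enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  val fsStateBackend: StateBackend = new FsStateBackend(STATE_BACKEND)
  env.setStateBackend(fsStateBackend)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3)) // 2 attempts, 3 ms delay

  // TODO: recover my checkpoint here, or run this job from my checkpoint.
  // How do I run this job with the checkpoint metadata? Use CheckpointCoordinator?
  val dataStream: DataStream[String] =
    env.addSource(streamSource).name("mysource")
  dataStream.addSink(new MySQLSink).uid("tesCheckpoint").name("mysink")
  env.execute()
}
MySQLSink:
class MySQLSink extends RichSinkFunction[String] with CheckpointedFunction {

  private val bufferSize = 50
  // Records not yet written to MySQL; this buffer is what gets checkpointed.
  private val cacheData: ListBuffer[String] = ListBuffer[String]()
  private var checkpointedState: ListState[(String, ListBuffer[String])] = _

  override def invoke(jsonData: String, context: SinkFunction.Context[_]): Unit = {
    val end: Long = System.currentTimeMillis()
    // Add a "fend" timestamp field before the closing brace of the JSON object.
    val result = jsonData.substring(0, jsonData.length - 1) + ",\"fend\":" + end + "}"
    cacheData += result
    // Flush based on the buffer's own size (instead of a separate counter that
    // is not restored), so the threshold still holds after recovery.
    if (cacheData.size >= bufferSize) {
      saveDataList()
      cacheData.clear()
    }
  }

  // Writes the buffered records to MySQL (not implemented yet).
  def saveDataList(): Unit = {

  }

  override def close(): Unit = {
    super.close()
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Replace the previous snapshot with a copy of the currently buffered records.
    checkpointedState.clear()
    checkpointedState.add(("nlcpTestData", cacheData.clone()))
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val listStateDesc = new ListStateDescriptor[(String, ListBuffer[String])](
      "nlcpTestData",
      TypeInformation.of(new TypeHint[(String, ListBuffer[String])]() {}))
    val stateStore: OperatorStateStore = context.getOperatorStateStore
    checkpointedState = stateStore.getListState(listStateDesc)
    // On recovery, refill the buffer with the records that were not yet flushed.
    if (context.isRestored) {
      checkpointedState.get().asScala.foreach { case (_, data) => cacheData ++= data }
    }
  }

}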
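Since the goal is to debug this state handling, one option that needs no cluster is to model the buffering in plain Scala first. The class below is a hypothetical, Flink-free stand-in for MySQLSink (BufferModel, flushed and snapshot are invented names): invoke mirrors the sink's invoke, snapshot mirrors what snapshotState stores, and constructing it with restored records mirrors initializeState after a restore.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, Flink-free model of MySQLSink's buffering for local debugging.
// `flushed` stands in for the MySQL writes that saveDataList() would perform.
final class BufferModel(bufferSize: Int, restored: Seq[String] = Nil) {
  private val cache = ListBuffer[String](restored: _*)
  val flushed = ListBuffer[List[String]]()

  // Mirrors invoke(): buffer the record and flush once the buffer is full.
  def invoke(record: String): Unit = {
    cache += record
    if (cache.size >= bufferSize) {
      flushed += cache.toList
      cache.clear()
    }
  }

  // Mirrors snapshotState(): the pending (unflushed) records are the state.
  def snapshot(): List[String] = cache.toList
}
```

For example, after 120 records with a buffer size of 50, two batches of 50 are flushed and 20 records remain in the snapshot; a BufferModel rebuilt from that snapshot flushes its next batch after 30 more records, because the threshold is checked against the buffer size rather than a separate counter.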


> On Nov 7, 2019, at 12:03, Congxian Qiu wrote:
> 
> Hi,
> If you just want to debug, maybe you can do this in a UT (unit test) class in
> the flink-runtime module :) so that you don't need to deal with dependency
> and access problems.
> 
> Best,
> Congxian
> 
> 
> Jark Wu wrote on Wed, Nov 6, 2019 at 3:39 PM:
> 
>> Btw, user questions should be asked on user@f.a.o or user-zh@f.a.o. The
>> dev ML is mainly used to discuss development.
>> 
>> Best,
>> Jark
>> 
>> On Wed, 6 Nov 2019 at 15:36, Jark Wu  wrote:
>> 
>>> Hi,
>>> 
>>> Savepoint.load(env, path) is in the State Processor API library, so you
>>> need to add the following dependency to your project.
>>> 
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-state-processor-api_2.11</artifactId>
>>>   <version>1.9.1</version>
>>> </dependency>
>>>
>>> 
>>> You can see the documentation for more detailed instructions [1].
>>> 
>>> Best,
>>> Jark
>>> 
>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
>>> 
>>> On Wed, 6 Nov 2019 at 09:21, qq <471237...@qq.com> wrote:
>>> 
>>>> Hi all,
>>>>   I want to load checkpoint or savepoint metadata in my dev environment,
>>>> because I want to debug the saved checkpoint metadata. I know Flink
>>>> provides an API, Savepoint.load(env, path), but I can't find it and can't
>>>> use it. Does anyone know about this? Could you help me? Thanks very much.
>>>> 
>>>> 
>> 
> 



Flink savepoint (checkpoint) load API or debug

2019-11-05 Thread qq
Hi all,
   I want to load checkpoint or savepoint metadata in my dev environment,
because I want to debug the saved checkpoint metadata. I know Flink provides an
API, Savepoint.load(env, path), but I can't find it and can't use it. Does
anyone know about this? Could you help me? Thanks very much.



[DISCUSS] Tolerate temporarily suspended ZooKeeper connections

2019-07-20 Thread QQ Mail
Hi All,

Desc
We deploy Flink streaming jobs on a Hadoop cluster in per-job mode and use
ZooKeeper as the HighAvailabilityService, but we found that Flink jobs restart
when the network connection between the JobManager and ZooKeeper is temporarily
lost. So we analyzed this problem in depth. The Flink JobManager uses Curator's
`LeaderLatch` to maintain leadership. When the network disconnects, the
`LeaderLatch` immediately sets its leadership to false. We think it is too
harsh that many long-running Flink jobs restart because of a brief network
glitch. Instead of revoking leadership as soon as the ZooKeeper connection is
SUSPENDED, it would be better to wait until the ZooKeeper connection is LOST.
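The difference between the two policies can be sketched as a small model. This is a hypothetical illustration, not Curator or Flink code: the state names mirror Curator's ConnectionState values, the current policy revokes leadership on SUSPENDED (and on LOST), and the proposed policy revokes it only on LOST. ConnState and LeadershipModel are invented names.

```scala
// Hypothetical model for illustration only: these names mirror Curator's
// ConnectionState values, but nothing here calls Curator or Flink.
sealed trait ConnState
case object Connected extends ConnState
case object Suspended extends ConnState
case object Reconnected extends ConnState
case object Lost extends ConnState

object LeadershipModel {
  type Policy = ConnState => Boolean

  // Current behavior: leadership is revoked as soon as the connection is
  // SUSPENDED, and also when it is LOST.
  val revokeOnSuspendOrLost: Policy = s => s == Suspended || s == Lost

  // Proposed behavior: tolerate SUSPENDED and revoke only once the session is LOST.
  val revokeOnLostOnly: Policy = s => s == Lost

  // True if the policy would revoke leadership at any point in the event sequence.
  def revokes(events: Seq[ConnState], policy: Policy): Boolean = events.exists(policy)
}
```

For a short network blip (CONNECTED, SUSPENDED, RECONNECTED) the current policy revokes leadership and the job restarts, while the proposed policy keeps it; for a real session loss (CONNECTED, SUSPENDED, LOST) both policies revoke leadership.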

There are two JIRA issues about this problem, FLINK-10052 and FLINK-13189, and
they are duplicates. Thanks to @Elias Levy for pointing this out; FLINK-13189 is
therefore closed.

Solution
Back to the problem: there are currently two ways to solve it. One is to rewrite
the LeaderLatch#handleStateChange method; the other is to upgrade to
curator-4.2.0. The first way is hacky but correct, while the second way requires
considering compatibility. For more details, please see FLINK-10052.

Hope
FLINK-10052 was reported on 2018-08-03 (about a year ago), so we hope this
problem can be fixed as soon as possible.
Btw, thanks to @TisonKun for discussing this problem and reviewing the PR.

Links
FLINK-10052 https://issues.apache.org/jira/browse/FLINK-10052 

FLINK-13189 https://issues.apache.org/jira/browse/FLINK-13189 


Any suggestions are welcome. What do you think?

Best, lamber-ken.