Re: REST API in an HA setup - must the leading JM be called?

2021-08-18 Thread Juha Mynttinen
Thank you, answers my questions.

--
Regards,
Juha

On Wed, Aug 18, 2021 at 2:28 PM Chesnay Schepler  wrote:

> You've pretty much answered the question yourself. *thumbs up*
>
> For the vast majority of cases you can call any JobManager.
> The exceptions are jar operations (because they are persisted in the
> JM-local filesystem, and other JMs don't know about them) and triggering
> savepoints (because metadata for on-going savepoint operations (i.e., the
> information returned when querying the savepoint operation status) is also
> kept locally in the JM).
>
> This does indeed imply that on JM failover all this information is lost.
>
> There are ideas to solve this, but no concrete timeline. See
> https://issues.apache.org/jira/browse/FLINK-18312
>
> On 18/08/2021 11:54, Juha Mynttinen wrote:
>
> I have questions related to REST API in the case of ZooKeeper HA and a
> standalone cluster. But I think the questions apply to other setups too
> such as YARN.
>
> Let's assume a standalone cluster with multiple JobManagers. The
> JobManagers elect the leader among themselves and register that to
> ZooKeeper. When using the Flink command line, AFAIK the code will go to
> ZooKeeper to find the host and port of the leading JobManager and send HTTP
> requests there.
>
> My question is: when accessing the REST API directly (e.g. curl) does one
> need to call the leading JobManager or will any up and running JobManager
> do? And if the leader needs to be called, why is it so?
>
> Behind the scenes the REST API will connect to the leading "JobManager"
> over RPC, making it irrelevant which JobManager receives the HTTP request.
>
> By experimenting, I found the Web UI works fine if all the JobManagers are
> behind a load balancer and leading and standby JobManagers are called. The
> only issue I found was that when a jar is submitted (/jars/upload), it is
> stored on the local disk of the JobManager that happens to handle that
> request. As a consequence, creating a job from that jar only succeeds if
> the HTTP request hits the JobManager that has the file. There might be a
> "hack" to overcome this limitation, set web.upload.dir to be in S3 / GCS or
> elsewhere accessible by all JobManagers. I didn't try this. Or in the case
> of uploading jars and creating jobs, ensure the same JobManager is called
> (bypass loadbalancer).
>
> But I wonder if there's something else why the leading JM should be called.
>
> A follow-up question arises. If the jars are stored only on the leading
> JobManager, doesn't that mean that if the leader changes, the new leader is
> not aware of the jars uploaded to the old leader? From the REST
> API's perspective this means that even in the JobManager HA setup and when
> always calling the leader, a simple "upload a jar and a deploy a job"-cycle
> is not guaranteed to work if the leader happens to change between the
> requests. Did I miss something?
>
> --
> Regards,
> Juha
>
>
>


failures during job start

2021-08-18 Thread Colletta, Edward
Any help with this would be appreciated.   Is it possible that this is a 
data/application issue or a flink config/resource issue?

Using Flink 1.11.2, Java 11, session cluster, 5 nodes with 32 cores each.

I have an issue where starting a job takes a long time, and sometimes fails 
with PartitionNotFoundException, but succeeds on restart.   The job has 10 
kafka sources (10 partitions for each topic) and parallelism 5.
The failure does not happen when the kafka logs are empty.

Note that during the scenario below, CPU usage on the task managers and job managers is low 
(below 30%).

The scenario we see

  *   run request to load and run a jar, job appears on dashboard with all 160 
subtasks in Deploying state
  *   after 2 minutes some subtasks start transitioning to running.
  *   after another 30 seconds failure occurs and job goes into Restarting state
  *   after another minute, the restart completes and all subtasks are running.

Exception history shows
2021-08-15 07:55:02
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
Partition 205a0867c6ef540009acd962d556f981#0@a6b547c5096f3c33eb9059cfe767a2ec 
not found.
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765)
at 
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
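
For reference, the window in which a downstream task keeps retrying a partition request before giving up with PartitionNotFoundException is bounded by the network request back-off options. A hedged flink-conf.yaml sketch (the keys are standard Flink options; the values are only illustrative, not taken from this cluster):

# back-off between partition requests of an input channel; once the maximum
# is exceeded the request fails with PartitionNotFoundException
taskmanager.network.request-backoff.initial: 100
taskmanager.network.request-backoff.max: 30000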






Pre shuffle aggregation in flink is not working

2021-08-18 Thread suman shil
I am trying to do pre-shuffle aggregation in Flink. Following is the
MapBundleFunction implementation.

public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {

    @Override
    public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
        if (value == null) {
            return input;
        }
        value.tip = value.tip + input.tip;
        return value;
    }

    @Override
    public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
        for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
            out.collect(entry.getValue());
        }
    }
}

I am using "CountBundleTrigger.java". But the pre-shuffle aggregation is
not working as the "count" variable is always 0. Please let me know if I
am missing something.

@Override
public void onElement(T element) throws Exception {
    count++;
    if (count >= maxCount) {
        callback.finishBundle();
        reset();
    }
}

Here is the main code.

MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
        new TaxiFareMapBundleFunction();
BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
    @Override
    public Long getKey(TaxiFare value) throws Exception {
        return value.driverId;
    }
};

DataStream hourlyTips =
//      fares.keyBy((TaxiFare fare) -> fare.driverId)
//              .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
        fares.transform("preshuffle",
                        TypeInformation.of(TaxiFare.class),
                        new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
                            @Override
                            public long extractTimestamp(TaxiFare element) {
                                return element.startTime.getEpochSecond();
                            }
                        })
                .keyBy((TaxiFare fare) -> fare.driverId)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .process(new AddTips());

DataStream hourlyMax =
        hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);

Thanks
Suman


Timer Service vs Custom Triggers

2021-08-18 Thread Aeden Jameson
My use case is that I'm producing a set of measurements by key every
60 seconds. Currently, this is handled with the usual pattern of
keyBy().window(Tumbling...(60)).process(...). I need to provide the same
output, but as a result of a timeout. The data needed for the timeout
summary will be in the global state for that key. This seems possible by
either using the timer service in the process function without a window
(e.g. keyBy(..).process(..)) or by using a custom trigger. Why choose one
or the other?
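
For reference, the timer-service variant would look roughly like the sketch below (a minimal sketch assuming the standard KeyedProcessFunction API; the Long measurements and per-key running sum are placeholders, not the actual use case):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits the per-key summary when no element has arrived for 60 seconds.
public class TimeoutSummaryFunction extends KeyedProcessFunction<String, Long, Long> {

    private transient ValueState<Long> sum;          // running measurement for the key
    private transient ValueState<Long> pendingTimer; // currently registered timeout, if any

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
        pendingTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        sum.update(sum.value() == null ? value : sum.value() + value);

        // re-arm the timeout: drop the previous timer and register a new one 60s from now
        if (pendingTimer.value() != null) {
            ctx.timerService().deleteProcessingTimeTimer(pendingTimer.value());
        }
        long timeout = ctx.timerService().currentProcessingTime() + 60_000L;
        ctx.timerService().registerProcessingTimeTimer(timeout);
        pendingTimer.update(timeout);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        if (sum.value() != null) {
            out.collect(sum.value()); // the timeout fired: emit the summary built so far
        }
        pendingTimer.clear();
    }
}

A custom trigger would instead keep the window(...) pipeline and fire the window from the Trigger's onElement/onProcessingTime callbacks.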

-- 
Thanks,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson


Error while deserializing the element

2021-08-18 Thread vijayakumar palaniappan
Setup Specifics:
Version: 1.6.2
RocksDB Map State
Timers stored in rocksdb

When we have this job running for long periods of time like > 30 days, if
for some reason the job restarts, we encounter "Error while deserializing
the element". Is this a known issue fixed in later versions? I see some
changes to the code for FLINK-10175, but we don't use any queryable state.

Below is the stack trace

org.apache.flink.util.FlinkRuntimeException: Error while deserializing the
element.

at
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)

at
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)

at
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)

at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)

at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)

at
org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)

at
org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)

at
org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)

at
org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)

at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.(KeyGroupPartitionedPriorityQueue.java:89)

at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)

at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)

at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)

at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)

at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)

at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.EOFException

at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)

at org.apache.flink.types.StringValue.readString(StringValue.java:769)

at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)

at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)

at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)

at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)

at
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)

at
org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)

at
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)

... 20 more

-- 
Thanks,
-Vijay


Re: Periodic output at end of stream

2021-08-18 Thread Matthias Broecheler
Hey JING,

thanks for getting back to me. I tried to produce the smallest,
self-contained example that produces the phenomenon:
https://gist.github.com/mbroecheler/fd27dd8a810b038ec463cdd5339d290f

If you run MainRepl you should see an infinite loop of re-processing the 5
integers. The offending process is BufferedLatestSelector - specifically
the event timer that is registered in it. Without the timer the process
will not emit an output.

The timer is set whenever the state is null. Is there a problem with how I
implemented that buffering process?
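
In case it helps readers of the archive, the pattern boils down to roughly this (a trimmed sketch with String as a placeholder element type, not the actual code from the gist):

public class BufferedLatestSketch extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> latest;

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(new ValueStateDescriptor<>("latest", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (latest.value() == null) {
            // "backup" timer: fires when the Long.MAX_VALUE watermark arrives at end of stream
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE - 1);
        }
        latest.update(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        if (latest.value() != null) {
            out.collect(latest.value());
            latest.clear();
        }
    }
}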
Thank you,
Matthias

On Sun, Aug 15, 2021 at 8:59 PM JING ZHANG  wrote:

> Hi Matthias,
> How often do you register the event-time timer?
> It is registered per input record, or re-registered a new timer after an
> event-time timer is triggered?
> Would you please provide your test case code, it would be very helpful for
> troubleshooting.
>
> Best wishes,
> JING ZHANG
>
> Matthias Broecheler wrote on Sat, Aug 14, 2021 at 3:44 AM:
>
>> Hey guys,
>>
>> I have a KeyedProcessFunction that gathers statistics on the events that
>> flow through and emits it periodically (every few seconds) to a SideOutput.
>> However, at the end of stream the last set of statistics don't get
>> emitted. I read on the mailing list that processing time timers that are
>> pending don't get triggered when Flink cleans up a stream, but that event
>> timers do get triggered because a watermark with Long.MAX_VALUE is sent
>> through the stream.
>> Hence, I thought that I could register a "backup" event timer for
>> Long.MAX_VALUE-1 to make sure that my process function gets notified when
>> the stream ends to emit the in-flight statistics.
>>
>> However, now my simple test case (with a data source fromCollection of 4
>> elements) keeps iterating over the same 4 elements in an infinite loop.
>>
>> I don't know how to make sense of this and would appreciate your help.
>> Is there a better way to set a timer that gets triggered at the end of
>> stream?
>> And for my education: Why does registering an event timer cause an
>> infinite loop over the source elements?
>>
>> Thanks a lot and have a wonderful weekend,
>> Matthias
>>
>


Setting S3 parameters in a K8 jobmanager deployment

2021-08-18 Thread Robert Cullen
I have a kubernetes jobmanager deployment that requires parameters be
passed as command line rather than retrieving values from the flink-config
map. Is there a way to do this?

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1 # Set the value to greater than 1 to start standby JobManagers
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:1.13.0-scala_2.11
        args: ["jobmanager", "-Ds3.endpoint=https://192.173.0.0:9000",
               "-Ds3.access-key=key", "-Ds3.secret-key=secret"]
        ports

Robert Cullen
240-475-4490


Re: Validating Flink SQL without registering with StreamTableEnvironment

2021-08-18 Thread Ingo Bürk
Hi Yuval,

I can expand a bit more on the technical side of validation, though as a
heads-up, I don't have a solution.

When validating entire pipelines on a logical level, you run into the
(maybe obvious) issue, that statements depend on previous statements. In
the simple case of a CREATE TABLE DDL followed by some query, ("full")
validation of the query depends on the table actually existing. On the
other hand, validating a CREATE TABLE DDL shouldn't actually execute that
DDL, creating a conflict.

Of course this is only a concern if during validation we care about the
table existing, but from the perspective of syntax this wouldn't matter.
However, Flink's parser (ParserImpl) under the hood calls
SqlToOperationConverter, which in some places does table lookups etc., so
it depends on the catalog manager. This prevents us from doing this kind of
validation. Ideally, SqlToOperationConverter would not have such a
dependency, but it takes some work to change that as operations would have
to be redesigned and "evaluated" later on.

I think, as of now, you'd have to actually use the CalciteParser directly
to bypass this call, but of course this is not accessible
(non-reflectively). I've also never tried this, so I don't know whether it
would actually work. It'd definitely be missing the ability to parse
anything handled in Flink's "extended parser" right now, but that is mostly
concerning SQL-client-specific syntax.
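
For anyone trying the parse-only route, a minimal sketch (this casts to the internal TableEnvironmentImpl; class and method names are as I recall them for Flink 1.13, so treat the exact signatures as assumptions):

import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.Operation;

public final class SqlSyntaxCheck {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // internal API, hence the cast
        Parser parser = ((TableEnvironmentImpl) tEnv).getParser();

        try {
            // throws on syntax errors; note that, as described above, some statements
            // still trigger catalog lookups during conversion
            List<Operation> ops = parser.parse("SELECT id, COUNT(*) FROM my_table GROUP BY id");
            System.out.println("parsed into " + ops.size() + " operation(s)");
        } catch (Exception e) {
            System.out.println("invalid statement: " + e.getMessage());
        }
    }
}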


Best
Ingo

On Wed, Aug 18, 2021 at 2:41 PM Yuval Itzchakov  wrote:

> Thanks Ingo!
> I just discovered this a short while before you posted :)
>
> Ideally, I'd like to validate that the entire pipeline is set up
> correctly. The problem is that I can't use methods like `tableEnv.sqlQuery`
> from multiple threads, and this is really limiting my ability to speed up
> the process (today it takes over an hour to complete, which isn't
> reasonable).
>
> If anyone has any suggestions on how I can still leverage the
> TableEnvironment in the processor to validate my SQL queries I'd be happy
> to know.
>
> On Wed, Aug 18, 2021 at 2:37 PM Ingo Bürk  wrote:
>
>> Hi Yuval,
>>
>> if syntactical correctness is all you care about, parsing the SQL should
>> suffice. You can get a hold of the parser from
>> TableEnvironmentImpl#getParser and then run #parse. This will require you
>> to cast your table environment to the (internal) implementation, but maybe
>> this works for you?
>>
>>
>> Best
>> Ingo
>>
>> On Wed, Aug 18, 2021 at 12:51 PM Yuval Itzchakov 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a use-case where I need to validate hundreds of Flink SQL
>>> queries. Ideally, I'd like to run these validations in parallel. But, given
>>> that there's an issue with Calcite and the use of thread-local storage, I
>>> can only interact with the table runtime via a single thread.
>>>
>>> Ideally, I don't really care about the overall registration process of
>>> sources, transformations and sinks, I just want to make sure the syntax is
>>> correct from Flinks perspective.
>>>
>>> Is there any straightforward way of doing this?
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Process suspend when get Hana connection in open method of sink function

2021-08-18 Thread Chenzhiyuan(HR)
Dear all:
I have a problem when I want to sink data to a Hana database.
The process is suspended when getting the Hana connection in the open method of the sink
function, as shown below.
My Flink version is 1.10.


public class HrrmPayValueSumToHana extends RichSinkFunction {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = HrrmUtils.getHanaConnection(); // process is suspended here
    }

    @Override
    public void invoke() {
       ...
    }

    @Override
    public void close() throws Exception {
       ..
    }
}



public static Connection getHanaConnection() {
Connection con = null;
try {
Class.forName(HrrmConstants.HANA_DRIVER_CLASS);
con = DriverManager.getConnection(HrrmConstants.HANA_SOURCE_DRIVER_URL,
HrrmConstants.HANA_SOURCE_USER, HrrmConstants.HANA_SOURCE_PASSWORD);
} catch (Exception e) {
LOG.error("---hana get connection has exception , msg = ", e);
}
return con;
}
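
For what it's worth, a hedged variant of the helper above that at least bounds how long the connect call can block (DriverManager.setLoginTimeout is plain JDBC; the 30-second value is only illustrative):

public static Connection getHanaConnection() {
    Connection con = null;
    try {
        Class.forName(HrrmConstants.HANA_DRIVER_CLASS);
        // fail after 30 seconds instead of blocking indefinitely if the database is unreachable
        DriverManager.setLoginTimeout(30);
        con = DriverManager.getConnection(HrrmConstants.HANA_SOURCE_DRIVER_URL,
                HrrmConstants.HANA_SOURCE_USER, HrrmConstants.HANA_SOURCE_PASSWORD);
    } catch (Exception e) {
        LOG.error("---hana get connection has exception , msg = ", e);
    }
    return con;
}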


Hana driver dependency as below:



<dependency>
    <groupId>com.sap.cloud.db.jdbc</groupId>
    <artifactId>ngdbc</artifactId>
    <version>2.3.62</version>
</dependency>







Re: flink Kinesis Consumer Connected but not consuming

2021-08-18 Thread Danny Cranmer
Hey Tarun,



Your application looks ok and should work. I did notice this, however I
cannot imagine it is an issue, unless you are not setting the region
correctly:

   - getKafkaConsumerProperties()



Make sure you are setting the correct region
(AWSConfigConstants.AWS_REGION) in the properties. If this is ok, please
check Flink dashboard to ensure the following metrics are flowing for this
operator:

   - millisBehindLatest
   - numberOfAggregatedRecords
   - numberOfDeaggregatedRecords
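
For completeness, the region property mentioned above is set roughly like this (stream name and region are placeholders; env is the StreamExecutionEnvironment from your snippet):

Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

FlinkKinesisConsumer<String> consumer =
        new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig);
env.addSource(consumer).print();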



Thanks,

On Wed, Aug 18, 2021 at 2:33 AM tarun joshi <1985.ta...@gmail.com> wrote:

> Hey All,
>
> I am running flink in docker containers (image Tag
> :flink:scala_2.11-java11) on EC2.
>
> I am able to connect to a Kinesis Connector but nothing is being consumed.
>
> My command to start Jobmanager and TaskManager :
>
>
> docker run \
>   --rm \
>   --volume /root/:/root/ \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=jobmanager \
>   --network flink-network \
>   --publish 8081:8081 \
>   flink:scala_2.11-java11 jobmanager &
>
> docker run \
>   --rm \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=taskmanager_0 \
>   --network flink-network \
>   flink:scala_2.11-java11 taskmanager &
>
> 2021-08-17 22:38:01,106 INFO
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] -
> Subtask 0 will be seeded with initial shard StreamShardHandle{streamName=' Stream Name>', shard='{ShardId: shardId-,HashKeyRange:
> {StartingHashKey: 0,EndingHashKey: 34028236692093846346337460743176821144}
> ,SequenceNumberRange: {StartingSequenceNumber:
> 49600280467722672235426674687631661510244124728928239618,}}'}, starting
> state set as sequence number LATEST_SEQUENCE_NUM
>
> &&& this for each shard Consumer
>
> 2021-08-17 22:38:01,107 INFO
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
> [] - Subtask 0 will start consuming seeded shard
> StreamShardHandle{streamName='web-clickstream', shard='{ShardId:
> shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
> 34028236692093846346337460743176821144},SequenceNumberRange:
> {StartingSequenceNumber:
> 49600280467722672235426674687631661510244124728928239618,}}'} from
> sequence number LATEST_SEQUENCE_NUM with ShardConsumer 0
>
> my program is simple to test out a DataStream from Kinesis
>
> FlinkKinesisConsumer<String> kinesisConsumer =
>         new FlinkKinesisConsumer<>(
>                 "", new SimpleStringSchema(),
>                 getKafkaConsumerProperties());
> env.addSource(kinesisConsumer).print();
>
> env.execute("Read files in streaming fashion");
>
> Other Facts:
>
>
>1. I can see data being flowing into our kinesis stream from the
>Monitoring Tab of AWS continuously.
>2. I was facing issues with Authorization of accessing the Kinesis in
>our AWS infra, but I resolved that by moving in the same security group of
>Kinesis deployment and creating a role with full access to Kinesis.
>
>
> Any pointers are really appreciated!
>
> Thanks,
> Tarun
>


Fixed-delay restart strategy - counting logic

2021-08-18 Thread much l
Hi everyone:
I would like to ask: when the restart strategy is restart-strategy: fixed-delay, is its parameter
restart-strategy.fixed-delay.attempts
counted globally (over the whole job lifetime)? Or is the retry count reset each time the job recovers through the HA failover mechanism, so that the next failure starts counting from 0 again?


Re: Validating Flink SQL without registering with StreamTableEnvironment

2021-08-18 Thread Yuval Itzchakov
Thanks Ingo!
I just discovered this a short while before you posted :)

Ideally, I'd like to validate that the entire pipeline is set up correctly.
The problem is that I can't use methods like `tableEnv.sqlQuery` from
multiple threads, and this is really limiting my ability to speed up the
process (today it takes over an hour to complete, which isn't reasonable).

If anyone has any suggestions on how I can still leverage the
TableEnvironment in the processor to validate my SQL queries I'd be happy
to know.

On Wed, Aug 18, 2021 at 2:37 PM Ingo Bürk  wrote:

> Hi Yuval,
>
> if syntactical correctness is all you care about, parsing the SQL should
> suffice. You can get a hold of the parser from
> TableEnvironmentImpl#getParser and then run #parse. This will require you
> to cast your table environment to the (internal) implementation, but maybe
> this works for you?
>
>
> Best
> Ingo
>
> On Wed, Aug 18, 2021 at 12:51 PM Yuval Itzchakov 
> wrote:
>
>> Hi,
>>
>> I have a use-case where I need to validate hundreds of Flink SQL queries.
>> Ideally, I'd like to run these validations in parallel. But, given that
>> there's an issue with Calcite and the use of thread-local storage, I can
>> only interact with the table runtime via a single thread.
>>
>> Ideally, I don't really care about the overall registration process of
>> sources, transformations and sinks, I just want to make sure the syntax is
>> correct from Flinks perspective.
>>
>> Is there any straightforward way of doing this?
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>

-- 
Best Regards,
Yuval Itzchakov.


Re: Flink 1.13.1, Hive dialect: INSERT OVERWRITE does not clear the existing table data when the inserted result is empty

2021-08-18 Thread Rui Li
Hi,

Feel free to open a JIRA ticket to track this.

On Tue, Aug 17, 2021 at 2:47 PM Asahi Lee <978466...@qq.com.invalid> wrote:

> Hi!
> 
> With the SQL below, when the SELECT returns 0 rows, the existing data in the target table is not cleared after the job finishes; when I run the same statement in the Hive client, the table is cleared!
> INSERT OVERWRITE target_table SELECT * from source_table where id  10;



-- 
Best regards!
Rui Li


Re: Validating Flink SQL without registering with StreamTableEnvironment

2021-08-18 Thread Ingo Bürk
Hi Yuval,

if syntactical correctness is all you care about, parsing the SQL should
suffice. You can get a hold of the parser from
TableEnvironmentImpl#getParser and then run #parse. This will require you
to cast your table environment to the (internal) implementation, but maybe
this works for you?


Best
Ingo

On Wed, Aug 18, 2021 at 12:51 PM Yuval Itzchakov  wrote:

> Hi,
>
> I have a use-case where I need to validate hundreds of Flink SQL queries.
> Ideally, I'd like to run these validations in parallel. But, given that
> there's an issue with Calcite and the use of thread-local storage, I can
> only interact with the table runtime via a single thread.
>
> Ideally, I don't really care about the overall registration process of
> sources, transformations and sinks, I just want to make sure the syntax is
> correct from Flinks perspective.
>
> Is there any straightforward way of doing this?
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: REST API in an HA setup - must the leading JM be called?

2021-08-18 Thread Chesnay Schepler

You've pretty much answered the question yourself. *thumbs up*

For the vast majority of cases you can call any JobManager.
The exceptions are jar operations (because they are persisted in the 
JM-local filesystem, and other JMs don't know about them) and triggering 
savepoints (because metadata for on-going savepoint operations (i.e., 
the information returned when querying the savepoint operation status) 
is also kept locally in the JM).


This does indeed imply that on JM failover all this information is lost.

There are ideas to solve this, but no concrete timeline. See 
https://issues.apache.org/jira/browse/FLINK-18312


On 18/08/2021 11:54, Juha Mynttinen wrote:
I have questions related to REST API in the case of ZooKeeper HA and a 
standalone cluster. But I think the questions apply to other setups 
too such as YARN.


Let's assume a standalone cluster with multiple JobManagers. The 
JobManagers elect the leader among themselves and register that to 
ZooKeeper. When using the Flink command line, AFAIK the code will go 
to ZooKeeper to find the host and port of the leading JobManager and 
send HTTP requests there.


My question is: when accessing the REST API directly (e.g. curl) does 
one need to call the leading JobManager or will any up and 
running JobManager do? And if the leader needs to be called, why is it so?


Behind the scenes the REST API will connect to the leading 
"JobManager" over RPC, making it irrelevant which JobManager receives 
the HTTP request.


By experimenting, I found the Web UI works fine if all the JobManagers 
are behind a load balancer and leading and standby JobManagers are 
called. The only issue I found was that when a jar is submitted 
(/jars/upload), it is stored on the local disk of the JobManager that 
happens to handle that request. As a consequence, creating a job from 
that jar only succeeds if the HTTP request hits the JobManager that 
has the file. There might be a "hack" to overcome this limitation, set 
web.upload.dir to be in S3 / GCS or elsewhere accessible by all 
JobManagers. I didn't try this. Or in the case of uploading jars and 
creating jobs, ensure the same JobManager is called (bypass loadbalancer).


But I wonder if there's something else why the leading JM should be 
called.


A follow-up question arises. If the jars are stored only on the 
leading JobManager, doesn't that mean that if the leader changes, the 
new leader is not aware of the jars uploaded to the old leader? From 
the REST API's perspective this means that even in the JobManager HA 
setup and when always calling the leader, a simple "upload a jar and a 
deploy a job"-cycle is not guaranteed to work if the leader happens to 
change between the requests. Did I miss something?


--
Regards,
Juha





Validating Flink SQL without registering with StreamTableEnvironment

2021-08-18 Thread Yuval Itzchakov
Hi,

I have a use-case where I need to validate hundreds of Flink SQL queries.
Ideally, I'd like to run these validations in parallel. But, given that
there's an issue with Calcite and the use of thread-local storage, I can
only interact with the table runtime via a single thread.

Ideally, I don't really care about the overall registration process of
sources, transformations and sinks, I just want to make sure the syntax is
correct from Flinks perspective.

Is there any straightforward way of doing this?

-- 
Best Regards,
Yuval Itzchakov.


REST API in an HA setup - must the leading JM be called?

2021-08-18 Thread Juha Mynttinen
I have questions related to REST API in the case of ZooKeeper HA and a
standalone cluster. But I think the questions apply to other setups too
such as YARN.

Let's assume a standalone cluster with multiple JobManagers. The
JobManagers elect the leader among themselves and register that to
ZooKeeper. When using the Flink command line, AFAIK the code will go to
ZooKeeper to find the host and port of the leading JobManager and send HTTP
requests there.

My question is: when accessing the REST API directly (e.g. curl) does one
need to call the leading JobManager or will any up and running JobManager
do? And if the leader needs to be called, why is it so?

Behind the scenes the REST API will connect to the leading "JobManager"
over RPC, making it irrelevant which JobManager receives the HTTP request.

By experimenting, I found the Web UI works fine if all the JobManagers are
behind a load balancer and leading and standby JobManagers are called. The
only issue I found was that when a jar is submitted (/jars/upload), it is
stored on the local disk of the JobManager that happens to handle that
request. As a consequence, creating a job from that jar only succeeds if
the HTTP request hits the JobManager that has the file. There might be a
"hack" to overcome this limitation, set web.upload.dir to be in S3 / GCS or
elsewhere accessible by all JobManagers. I didn't try this. Or in the case
of uploading jars and creating jobs, ensure the same JobManager is called
(bypass loadbalancer).

But I wonder if there's something else why the leading JM should be called.

A follow-up question arises. If the jars are stored only on the leading
JobManager, doesn't that mean that if the leader changes, the new leader is
not aware of the jars uploaded to the old leader? From the REST
API's perspective this means that even in the JobManager HA setup and when
always calling the leader, a simple "upload a jar and a deploy a job"-cycle
is not guaranteed to work if the leader happens to change between the
requests. Did I miss something?

--
Regards,
Juha


Re: Re: CUMULATE combined with a comparison function throws an error

2021-08-18 Thread 李航飞
I see, thank you. So comparison functions cannot be combined with it, while simple conditions (such as where a = '2') work. Will this be supported in a later release?
On 2021-08-18 16:21:20, "Caizhi Weng" wrote:
>Hi!
>
>At the moment a window TVF can only be used in two scenarios: window aggregation and window top-n. If the WHERE condition is meant to filter the result of the window
>aggregation, use HAVING instead of WHERE; if it filters the data before it enters the window, you can CREATE VIEW.
>
>李航飞 wrote on Wed, Aug 18, 2021 at 3:55 PM:
>
>> I am building a data pipeline with Flink SQL:
>> SELECT window_start,window_end,SUM(price)
>>
>> FROM TABLE(
>>
>> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10'
>> MINUTES))
>>
>> GROUP BY window_start,window_end;
>>
>> The statement is roughly as above. It runs fine when executed through the StreamTableEnvironment's env.sqlQuery(sql).
>> The failing step is the StatementSet's sta.execute(), which throws:
>> java.lang.UnsupportedOperationException:
>> Currently Flink doesn't support individual window table-valued function
>> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
>>  Please use window table-valued function with aggregate together using
>> window_start and window_end as group keys.
>> The environment is Flink 1.13.1. Without the WHERE condition it runs fine; with it, it fails.
>>
>>


Re: Can the Operator state API be used in a Flink SQL UDF?

2021-08-18 Thread Caizhi Weng
Hi!

SQL does not currently support stateful UDFs. You may need to implement this requirement with the DataStream API; see the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/process_function/

andrew <15021959...@163.com> wrote on Tue, Aug 17, 2021 at 7:04 PM:

>  Hi,
>  We read a Kafka stream with Flink SQL and want to monitor user records: whenever a user's information changes relative to its previously stored state, emit the latest user record.


Re: CUMULATE combined with a comparison function throws an error

2021-08-18 Thread Caizhi Weng
Hi!

At the moment a window TVF can only be used in two scenarios: window aggregation and window top-n. If the WHERE condition is meant to filter the result of the window
aggregation, use HAVING instead of WHERE; if it filters the data before it enters the window, you can CREATE VIEW.
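
For example (a sketch based on the Bid table from the quoted mail; the filter predicates are only illustrative):

CREATE VIEW filtered_bid AS
SELECT * FROM Bid WHERE price > 0;

SELECT window_start, window_end, SUM(price)
FROM TABLE(
  CUMULATE(TABLE filtered_bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;

-- or, to filter on the aggregated result instead of the input rows:
SELECT window_start, window_end, SUM(price)
FROM TABLE(
  CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end
HAVING SUM(price) > 100;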

李航飞 wrote on Wed, Aug 18, 2021 at 3:55 PM:

> I am building a data pipeline with Flink SQL:
> SELECT window_start,window_end,SUM(price)
>
> FROM TABLE(
>
> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10'
> MINUTES))
>
> GROUP BY window_start,window_end;
>
> The statement is roughly as above. It runs fine when executed through the StreamTableEnvironment's env.sqlQuery(sql).
> The failing step is the StatementSet's sta.execute(), which throws:
> java.lang.UnsupportedOperationException:
> Currently Flink doesn't support individual window table-valued function
> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
>  Please use window table-valued function with aggregate together using
> window_start and window_end as group keys.
> The environment is Flink 1.13.1. Without the WHERE condition it runs fine; with it, it fails.
>
>


CUMULATE combined with a comparison function throws an error

2021-08-18 Thread 李航飞
I am building a data pipeline with Flink SQL:
SELECT window_start,window_end,SUM(price)

FROM TABLE(

CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL '10' 
MINUTES))

GROUP BY window_start,window_end;

The statement is roughly as above. It runs fine when executed through the StreamTableEnvironment's env.sqlQuery(sql).
The failing step is the StatementSet's sta.execute(), which throws:
java.lang.UnsupportedOperationException:
Currently Flink doesn't support individual window table-valued function 
CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
 Please use window table-valued function with aggregate together using 
window_start and window_end as group keys.
The environment is Flink 1.13.1. Without the WHERE condition it runs fine; with it, it fails.



Re: Advertising use case: how to replace some Spark logic with Flink - is a ProcessFunction needed?

2021-08-18 Thread 东东


So this is about handling out-of-order data, i.e. if a record still cannot be joined after 10 retries you give up?


In Flink this is basically a two-stream interval join. The watermark setting needs some thought: if you are not latency-sensitive, just use 30 minutes; if you are, you can still implement tiered retries.


Just a guess.
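
A rough sketch of what the two-stream interval join could look like (AdLog is a hypothetical POJO with sessionId/adId fields and merge() is an application-specific helper; none of this is from the original mail):

DataStream<AdLog> bids = ...;   // the 'bid' log stream
DataStream<AdLog> others = ...; // show / click / ... logs, unioned

DataStream<AdLog> merged = bids
        .keyBy(b -> b.sessionId + "_" + b.adId)
        .intervalJoin(others.keyBy(o -> o.sessionId + "_" + o.adId))
        .between(Time.minutes(-30), Time.minutes(30)) // +/- 30 minutes, matching the 10-retry window
        .process(new ProcessJoinFunction<AdLog, AdLog, AdLog>() {
            @Override
            public void processElement(AdLog bid, AdLog other, Context ctx, Collector<AdLog> out) {
                out.collect(merge(bid, other));
            }
        });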






On 2021-08-18 10:25:49, "张锴" wrote:
>Requirement:
>We need to replace our current Spark program with Flink. While going through the logic there is one part I don't know how to implement in Flink; the Spark job runs in three-minute batches.
>Details:
>Ad logs arrive in the order ask -> bid -> show -> click. The requirement is to merge every other log type with the bid log to keep the bid data complete; the unique key is sessionid + Adid.
>Logic: Spark reads several log topics,
>including xxtopic, formats them, and after joinAll gets (string, pair). If the log type pair.logType is 'bid' it is written straight to bidtopic; for other types (show
>or click, ...) the record is matched against the bid table cached in HBase, and a match is merged and written to bidtopic.
>If there is no match, pair.n records the number of attempts and the record is written to xxtopic. If after n > 10 attempts (about 30 minutes of cycling) still no bid is found, the record is written to bidtopic as is; if n <= 10 and no bid is found,
>n is incremented and the record is written to xxtopic for the next batch.
>The 10-attempt limit comes from the business side, i.e. a 30-minute cache; without it too much data would end up in xxtopic. There is no computation involved, only merging, and no deduplication: if 3 identical records are found for a key,
>all 3 are merged.
>
>How can this be implemented in Flink?