Re: How to use ParameterTool.fromPropertiesFile() to get resource file inside my jar

2017-09-10 Thread Tony Wei
Hi Aljoscha,

I found the root cause of my problem from this reference:
https://stackoverflow.com/questions/18151072/cant-find-resource-file-after-exporting-to-a-runnable-jar
So I changed the way I use ParameterTool. I now read the configuration from an
InputStream, convert the entries into command-line argument format, and use
ParameterTool.fromArgs() to parse them together with the other arguments.
I'm not sure if this is a good solution. If you have a better one, please
let me know. Thanks for your help.
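
For reference, a minimal sketch of that workaround, assuming the properties file
is packaged as application.conf on the classpath; the class name and the exact
argument conversion below are only illustrative:

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.java.utils.ParameterTool;

// Load the resource from inside the jar via the classloader (works on the
// cluster, unlike a plain file path), then re-express it as --key value args.
Properties props = new Properties();
try (InputStream in = StreamJob.class.getClassLoader()
        .getResourceAsStream("application.conf")) {
    props.load(in);
}

List<String> argList = new ArrayList<>();
for (String name : props.stringPropertyNames()) {
    argList.add("--" + name);
    argList.add(props.getProperty(name));
}

ParameterTool params = ParameterTool.fromArgs(argList.toArray(new String[0]));

If I remember correctly, ParameterTool.fromMap(...) could avoid the argument
round-trip, but the fromArgs() route above is what I ended up using.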

Best Regards,
Tony Wei

2017-09-08 23:40 GMT+08:00 Tony Wei :

> Hi Aljoscha,
>
> I have tried 
> `StreamJob.class.getClassLoader().getResource("application.conf").getPath()`,
> but I got this exception.
>
> Caused by: java.io.FileNotFoundException: Properties file
> /home/tonywei/flink/file:/tmp/flink-web-24351e69-a261-45be-9503-087db8155a8f/d69a3ca9-bfa0-43ef-83e8-e15f38162a87_quickstart-0.1.jar!/application.conf
>
> Best Regards,
> Tony Wei
>
> 2017-09-08 23:24 GMT+08:00 Aljoscha Krettek :
>
>> Hi,
>>
>> How are you specifying the path for the properties file? Have you tried
>> reading the properties by using this.getClass().getClassLoader().getResource()?
>>
>> Best,
>> Aljoscha
>>
>> > On 8. Sep 2017, at 16:32, Tony Wei  wrote:
>> >
>> > Hi,
>> >
>> > I put my configuration file in `./src/main/resources/` and packed
>> it inside my jar.
>> > I want to run it on standalone cluster by using web UI to submit my job.
>> > No matter which way I tried, ParameterTool.fromPropertiesFile() couldn't
>> find the file and threw a `FileNotFoundException` instead.
>> > Is there any best practice for dealing with such a problem? Thanks for
>> your help.
>> >
>> > Best Regards,
>> > Tony Wei
>>
>>
>


RE: ETL with changing reference data

2017-09-10 Thread qinnchen
Hi Peter,

I think what you are referring to is a typical amendment process, where some or
all of the results need to be modified. It is definitely an interesting topic!
Here are my two cents.

In an ideal world, the reference data source would emit updated values as events
that are joined with buffered events in windows. (It's a bit counter-intuitive, but
imagine a magic function where we ingest all reference data as a stream
instead of doing on-demand RPC.)

Unfortunately, in a lot of use cases it is hard to know exactly how the
reference data source is used, and dumping the reference data costs too much. So a
replay pipeline might be the cheapest way to get things done in general.

In some cases, results are partitioned and bounded. That makes it possible to
recompute within bounded windows, although it may require a bit of work to customize
a window that is held longer than the point where the watermark passes its end time.
I remember there was a Jira ticket discussing retraction.
In other cases, results are derived from a long history, which makes them unreasonable
to keep around. A side pipeline that captures those events with late-arrival handling
might interact with external storage and amend the results.
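
To make the "ingest reference data as a stream" idea concrete, here is a minimal
sketch using connected streams. Event and RefUpdate (and their fields), the
broadcast partitioning, and the plain in-memory map are placeholders/assumptions
for the example, not a prescription:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Keeps a local view of the reference data (updated by flatMap2) and uses it
// to transform the main events (flatMap1).
public class ReferenceJoin implements CoFlatMapFunction<Event, RefUpdate, Event> {

    private final Map<String, String> reference = new HashMap<>();

    @Override
    public void flatMap1(Event event, Collector<Event> out) {
        // Transform/enrich the event with the current reference view; one could
        // also buffer it until the required reference entry has arrived.
        out.collect(event.withReference(reference.get(event.refKey)));
    }

    @Override
    public void flatMap2(RefUpdate update, Collector<Event> out) {
        // A reference update (or control record) replaces the affected entry.
        reference.put(update.key, update.value);
    }
}

// Wiring: broadcast the reference/control stream so that every parallel
// instance of the join sees all updates.
DataStream<Event> transformed = mainStream
    .connect(referenceUpdates.broadcast())
    .flatMap(new ReferenceJoin());

Note that the plain HashMap above is not part of Flink's checkpointed state; a
fault-tolerant variant would keep it in operator state or reload the reference
data in open().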

Thanks,
Chen
 

From: Peter Lappo
Sent: Sunday, September 10, 2017 3:00 PM
To: user@flink.apache.org
Subject: ETL with changing reference data

hi,
We are building an ETL style application in Flink that consumes records from a 
file or a message bus as a DataStream. We are transforming records using SQL 
and UDFs. The UDF loads reference data in the open method and currently the 
data loaded remains in memory until the job is cancelled. The eval method of 
the UDF is used to do the actual transformation on a particular field.
Of course, the reference data changes and data will need to be reprocessed. Let's
assume we can identify and resubmit records for reprocessing; what is the best
design that
* keeps the Flink job running
* reloads the changed reference data
so that records are reprocessed in a deterministic fashion

Two options spring to mind
1) send a control record to the stream that reloads reference data or part of 
it and ensure resubmitted records are processed after the reload message
2) use a separate thread to poll the reference data source and reload any 
changed data which will of course suffer from race conditions

Or is there a better way of solving this type of problem with Flink?

Thanks
Peter



ETL with changing reference data

2017-09-10 Thread Peter Lappo
hi,
We are building an ETL style application in Flink that consumes records from a 
file or a message bus as a DataStream. We are transforming records using SQL 
and UDFs. The UDF loads reference data in the open method and currently the 
data loaded remains in memory until the job is cancelled. The eval method of 
the UDF is used to do the actual transformation on a particular field.
Of course, the reference data changes and data will need to be reprocessed. Let's
assume we can identify and resubmit records for reprocessing; what is the best
design that
* keeps the Flink job running
* reloads the changed reference data
so that records are reprocessed in a deterministic fashion

Two options spring to mind
1) send a control record to the stream that reloads reference data or part of 
it and ensure resubmitted records are processed after the reload message
2) use a separate thread to poll the reference data source and reload any 
changed data which will of course suffer from race conditions

Or is there a better way of solving this type of problem with Flink?

Thanks
Peter

Best way to deriving streams from another one

2017-09-10 Thread AndreaKinn
Hi,
I have a data stream resulting from an operation executed on another data
stream.
Essentially, I want to obtain two different streams from it and send
them to different Cassandra tables.

I.e.:

datastream 0, composed of Tuple3 elements

I want to have:

a datastream 1, composed of every triple of datastream 0 where Val2 > X,
and
a datastream 2, composed of every couple.

This leads me to two datastreams with Tuples of different arity (3 and
2).

Currently I have implemented it by taking datastream 0 and then separately
calling a map function to obtain datastream 2 and a flatMap function to
obtain datastream 1. So I have two different Cassandra prepared statements,
one called on each of the two streams.
It works fine.
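
For context, a minimal sketch of that setup as I understand it. The element type
of stream0 (Tuple3<String, Double, Double>), the threshold, the table names, and
the Cassandra host are placeholders; the flatMap plays the role of a filter, as
described above:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.util.Collector;

final double threshold = 42.0;  // placeholder for X

// datastream 1: keep the full triple only where the third field exceeds X
DataStream<Tuple3<String, Double, Double>> stream1 = stream0
    .flatMap(new FlatMapFunction<Tuple3<String, Double, Double>, Tuple3<String, Double, Double>>() {
        @Override
        public void flatMap(Tuple3<String, Double, Double> t,
                            Collector<Tuple3<String, Double, Double>> out) {
            if (t.f2 > threshold) {
                out.collect(t);
            }
        }
    });

// datastream 2: project every triple down to a pair
DataStream<Tuple2<String, Double>> stream2 = stream0
    .map(new MapFunction<Tuple3<String, Double, Double>, Tuple2<String, Double>>() {
        @Override
        public Tuple2<String, Double> map(Tuple3<String, Double, Double> t) {
            return Tuple2.of(t.f0, t.f1);
        }
    });

// One sink (and therefore one prepared statement) per derived stream
CassandraSink.addSink(stream1)
    .setQuery("INSERT INTO ks.table_three (k, v1, v2) VALUES (?, ?, ?);")
    .setHost("127.0.0.1")
    .build();

CassandraSink.addSink(stream2)
    .setQuery("INSERT INTO ks.table_two (k, v1) VALUES (?, ?);")
    .setHost("127.0.0.1")
    .build();

Since datastream 1 keeps the same arity as datastream 0, a plain filter() would
express the same thing as the flatMap, but the structure above mirrors what was
described.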

However, this solution looks really awful and inefficient; is there a more
elegant alternative?

I also tried to send a datastream towards Cassandra and select just two
values in the statement (that way I would only need the flatMap operator),
but an exception was raised on it during execution.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


FLIP-17: Side Inputs

2017-09-10 Thread Elias Levy
A bit late to this discussion, but I wanted to reiterate something that
others also said. Side input readiness, and blocking until the side input is
ready, is an important feature. This is especially true when the side input
is used as a configuration stream. You don't want the main stream to be
processed until at least the minimal required configuration is loaded.

I'll also note that Kafka Streams has struggled with the same problem
(KAFKA-4113), but it has
the advantage that KS prioritizes consumption from sources based on
timestamp.  So with KS if your KTable config records have an earlier
timestamp than the stream records they are joined with, they will be
consumed first (although it does so on a best effort basis).


Is State access synchronized?

2017-09-10 Thread Federico D'Ambrosio
Hi,

as per the mail subject, I wanted to ask whether state access (read and
write) is synchronized.

I have the following stream:

val airtrafficEvents = stream
.keyBy(_.flightInfo.flight)
.map(new UpdateIdFunction())


where UpdateIdFunction is a RichMapFunction with a ValueState and a
MapState, with the following map method

def map(value: AirTrafficEvent): AirTrafficEventWithId = {

  val flight = value.flightInfo.flight
  val time = value.instantValues.time

  AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis))

}

private def createOrGetId(_key: String, _time: Long): Int = {

  val tmpId = valuestate.value

  //Remove from MapState entries older than one minute

  val entry = Option[(Int, Long)](lookupMap.get(_key))

  //update ValueState or MapState if needed

  //return current updated ValueState or corresponding ID from updated MapState

}

So, I'm using the MapState to track the integer IDs of the events in the
stream, retaining only the latest records inside the MapState, and I'm
using the ValueState to generate an incremental integer ID for those events.
Given all of this, I'm really not sure how the mapping is applied to the
input KeyedStream: is it guaranteed that each time the method is called
I'm getting the latest and updated value/map?

Thank you for your attention,
Federico


Queryable State

2017-09-10 Thread Navneeth Krishnan
Hi All,

I'm running a streaming job on Flink 1.3.2 with a few queryable states. There
are 3 task managers and a job manager. I'm getting a timeout exception when
trying to query a state, and also a warning message in the job manager log.

*Client:*
final Configuration config = new Configuration();

config.setString(JobManagerOptions.ADDRESS, jobMgrHost);
config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());

final HighAvailabilityServices highAvailabilityServices =
    HighAvailabilityServicesUtils.createHighAvailabilityServices(
        config,
        Executors.newSingleThreadScheduledExecutor(),
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

QueryableStateClient client =
    new QueryableStateClient(config, highAvailabilityServices);
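
For what it's worth, here is a minimal sketch of the lookup itself as I
understand the 1.3 queryable state client API; the job id, state name, key, and
key/value types below are placeholders, so treat this as an assumption rather
than a verified recipe:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

// Placeholder job id and state name; use the id of the running job and the
// name under which the state was registered as queryable.
JobID jobId = JobID.fromHexString("00000000000000000000000000000000");
String stateName = "myQueryableState";

// Assumed key/value types (String -> Long); replace with the real ones.
TypeSerializer<String> keySerializer =
    TypeInformation.of(new TypeHint<String>() {}).createSerializer(new ExecutionConfig());
TypeSerializer<Long> valueSerializer =
    TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());

// The server expects the key serialized together with the (void) namespace.
byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
    "someKey", keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

// Ask the job manager for the state location and fetch the bytes from the
// task manager that owns the key group (this is where the timeout shows up).
Future<byte[]> serializedResult =
    client.getKvState(jobId, stateName, "someKey".hashCode(), serializedKey);

byte[] serializedValue =
    Await.result(serializedResult, new FiniteDuration(10, TimeUnit.SECONDS));
Long value = KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);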


*Exception:*
Exception in thread "main" io.netty.channel.ConnectTimeoutException:
connection timed out: /172.31.18.170:43537
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
at
io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at
io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

*Job Manager:*
2017-09-10 06:55:41,599 WARN  akka.remote.ReliableDeliverySupervisor
 - Association with remote system [akka.tcp://
flink@127.0.0.1:64344] has failed, address is now gated for [5000] ms.
Reason: [Disassociated]

Thanks,
Navneeth


Re: State Issue

2017-09-10 Thread Navneeth Krishnan
Sorry, my bad. I figured out it was a change done on our end which created
different keys. Thanks.

On Fri, Sep 8, 2017 at 5:32 PM, Navneeth Krishnan wrote:

> Hi,
>
> I'm experiencing a weird issue where any data put into map state, when
> retrieved with the same key, is returned as null, and hence it puts the same
> value again and again. I used the RocksDB state backend, and tried the Memory
> state backend too, but the issue still exists.
>
> Each time I put the key and value into the MapState it creates a new map, and
> I can't access the previous value. But when I iterate over the MapState
> keys and values, I can see the same key added multiple times.
>
> Each put operation goes through the code branches marked below.
>
> *NestedMapsStateTable.java*
>
> S get(K key, int keyGroupIndex, N namespace) {
>
>    checkKeyNamespacePreconditions(key, namespace);
>
>    Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
>
>    if (namespaceMap == null) {   // <-- the put path goes through here
>       return null;
>    }
>
>    Map<K, S> keyedMap = namespaceMap.get(namespace);
>
>    if (keyedMap == null) {
>       return null;
>    }
>
>    return keyedMap.get(key);
> }
>
>
> *HeapMapState.java*
>
> @Override
> public void put(UK userKey, UV userValue) {
>
>    HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
>
>    if (userMap == null) {   // <-- the put path goes through here
>       userMap = new HashMap<>();
>       stateTable.put(currentNamespace, userMap);
>    }
>
>    userMap.put(userKey, userValue);
> }
>
>
> *My Code:*
>
> *open()*
>
> MapStateDescriptor testStateDescriptor = new MapStateDescriptor<>("test-state",
>     TypeInformation.of(new TypeHint() {}), TypeInformation.of(new TypeHint() {}));
>
> testState = getRuntimeContext().getMapState(testStateDescriptor);
>
>
> *flatMap:*
>
> if(testState.contains(user)){
> *// DO Something*
> } else {
> testState.put(user, userInfo);
> }
>
>
> streamEnv.setStateBackend(new MemoryStateBackend());
>
> streamEnv.setParallelism(1);
>
>
> Thanks
>
>


HeapInternalTimerService#advanceWatermark

2017-09-10 Thread aitozi
Hi,

I have read this snippet of code again and again, but I can't understand
what it does. Can anyone explain it to me? Thank you very much.

public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;

    InternalTimer<K, N> timer;

    // Pop every event-time timer whose timestamp is at or before the new watermark
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {

        // Remove the fired timer from its timer set and from the priority queue
        Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
        timerSet.remove(timer);
        eventTimeTimersQueue.remove();

        // Restore the timer's key as the current key and invoke the operator callback
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }
}



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: LatencyMarker

2017-09-10 Thread aitozi
Hi,

You are right, I have verified that backpressure increases the latency.
I have another question: the latency value is currently not visualized in
the dashboard. Is there any plan to do this? I have added it myself by
removing the other operators and keeping only the source -> end latency,
so that I can see the latency in the dashboard. Would the community accept
such a patch?

Thanks.

Tzu-Li (Gordon) Tai wrote
> Hi!
> 
> Yes, backpressure should also increase the latency value calculated from
> LatencyMarkers.
> LatencyMarkers are special events that flow along with the actual stream
> records, so they should also be affected by backpressure.
> 
> Are you asking because you observed otherwise?
> 
> Cheers,
> Gordon
> 
> 
> 
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/