Re: Flink/Scala contract positions ?

2022-06-03 Thread sri hari kali charan Tummala
Hi Jing,

Thanks for reaching back. Are you offering a contract job to fix the Scala
API for Flink? Yes, I would be interested; we can talk if you want to
schedule some time.

Thanks
Sri

On Fri, Jun 3, 2022 at 9:00 AM Jing Ge  wrote:

> Hi,
>
> Currently, the Flink Scala API is not in good shape. Would you like to
> start from there?
>
> Best regards,
> Jing
>
> On Fri, Jun 3, 2022 at 4:29 PM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Hi Folks,
>>
>> Is anyone hiring for Flink or Scala Akka contract corp-to-corp positions?
>> I am open in the market, looking for work in the Scala Spark, Flink Scala,
>> or Scala Akka world.
>>
>> Thanks
>> Sri
>>
>

-- 
Thanks & Regards
Sri Tummala


Re: Flink/Scala contract positions ?

2022-06-03 Thread Jing Ge
Hi,

Currently, the Flink Scala API is not in good shape. Would you like to
start from there?

Best regards,
Jing

On Fri, Jun 3, 2022 at 4:29 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi Folks,
>
> Is anyone hiring for Flink or Scala Akka contract corp-to-corp positions?
> I am open in the market, looking for work in the Scala Spark, Flink Scala,
> or Scala Akka world.
>
> Thanks
> Sri
>


Flink/Scala contract positions ?

2022-06-03 Thread sri hari kali charan Tummala
Hi Folks,

Is anyone hiring for Flink or Scala Akka contract corp-to-corp positions?
I am open in the market, looking for work in the Scala Spark, Flink Scala,
or Scala Akka world.

Thanks
Sri


Unable to retrieve savepoint status from non-leader/standby in HA with Flink 1.15

2022-06-03 Thread Nick Birnberg
Hello everyone!

Our current setup has us running Flink on Kubernetes in HA mode (ZooKeeper)
with multiple JobManagers. Retrieving savepoint status from a standby
JobManager fails in 1.15; this appears to be a regression from 1.14.

We can reproduce this with the flink CLI talking to the REST API. We
directly target a standby JobManager (using `kubectl port-forward
$STANDBY_JM 8081`) and then run `flink savepoint -m localhost:8081
$JOB_ID`. This command triggers the savepoint via the REST API and polls
for it using the triggerId.
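
For anyone who wants to reproduce this without the CLI, the same
trigger-and-poll flow can be scripted against the REST API directly. This
is a minimal sketch using `requests`; the endpoint paths and response
fields follow Flink's documented REST API, and the job id is just the one
from this report:

import time
import requests

# Hedged sketch: script the trigger-and-poll flow the CLI performs,
# directly against the JobManager REST API.
BASE = "http://localhost:8081"  # standby JM, reachable via kubectl port-forward
JOB_ID = "488f4846310e2763dd1c338d7d7f55bb"  # job id from this report

# Trigger the savepoint: POST /jobs/:jobid/savepoints -> {"request-id": ...}
resp = requests.post(f"{BASE}/jobs/{JOB_ID}/savepoints", json={"cancel-job": False})
trigger_id = resp.json()["request-id"]

# Poll the async operation: GET /jobs/:jobid/savepoints/:triggerid
# Against a standby JM, this is where the 500 / NotSerializableException appears.
while True:
    status = requests.get(f"{BASE}/jobs/{JOB_ID}/savepoints/{trigger_id}")
    print(status.status_code, status.text)
    if status.ok and status.json()["status"]["id"] == "COMPLETED":
        break
    time.sleep(1)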

Relevant stack trace:

org.apache.flink.runtime.rest.util.RestClientException:
[org.apache.flink.runtime.rest.handler.RestHandlerException: Internal
server error while retrieving status of savepoint operation with
triggerId=10e6bb05749f572cf4ee5eee9b4959c7 for job
488f4846310e2763dd1c338d7d7f55bb.
    at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.createInternalServerError(SavepointHandlers.java:352)
    at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers.access$000(SavepointHandlers.java:115)
    at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.lambda$null$0(SavepointHandlers.java:311)
    ...
Caused by: org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException:
Failed to serialize the result for RPC call : getTriggeredSavepointStatus.
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:405)
    ...
Caused by: java.io.NotSerializableException:
org.apache.flink.runtime.rest.handler.async.OperationResult
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:388)
    ... 30 more
]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:532)

The savepoint itself is successful, and there is no problem if we target
the leader JobManager. This seems similar to
https://issues.apache.org/jira/browse/FLINK-26779, and I would think the
solution is to have
org.apache.flink.runtime.rest.handler.async.OperationResult implement
Serializable, but I wanted a quick sanity check that this is reproducible
outside of our environment before moving forward.

Thank you!


Re: [ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-03 Thread Yuan Mei
Thanks, Xintong and Jark, for the great effort driving this, and thanks to
everyone for making this possible.

I've also tweeted this announcement from our Apache Flink Twitter account.

Best

Yuan



On Fri, Jun 3, 2022 at 12:54 AM Jing Ge  wrote:

> Thanks everyone for your effort!
>
> Best regards,
> Jing
>
> On Thu, Jun 2, 2022 at 4:17 PM Martijn Visser 
> wrote:
>
>> Thanks everyone for joining! It's good to see so many have joined in such
>> a short time already. I've just refreshed the link, which you can always
>> find on the project website [1].
>>
>> Best regards, Martijn
>>
>> [1] https://flink.apache.org/community.html#slack
>>
>> Op do 2 jun. 2022 om 11:42 schreef Jingsong Li :
>>
>>> Thanks Xintong, Jark, Martijn and Robert for making this possible!
>>>
>>> Best,
>>> Jingsong
>>>
>>>
>>> On Thu, Jun 2, 2022 at 5:32 PM Jark Wu  wrote:
>>>
 Thank you, Xintong, for making this possible!

 Cheers,
 Jark Wu

 On Thu, 2 Jun 2022 at 15:31, Xintong Song 
 wrote:

 > Hi everyone,
 >
 > I'm very happy to announce that the Apache Flink community has created a
 > dedicated Slack workspace [1]. Welcome to join us on Slack.
 >
 > ## Join the Slack workspace
 >
 > You can join the Slack workspace in either of the following two ways:
 > 1. Click the invitation link posted on the project website [2].
 > 2. Ask anyone who has already joined the Slack workspace to invite you.
 >
 > We recommend 2), if available. Due to Slack limitations, the invitation
 > link in 1) expires and needs manual updating after every 100 invites. If
 > it has expired, please reach out to the dev / user mailing lists.
 >
 > ## Community rules
 >
 > When using the community Slack workspace, please follow these community
 > rules:
 > * *Be respectful* - This is the most important rule!
 > * All important decisions and conclusions *must be reflected back to the
 > mailing lists*. "If it didn’t happen on a mailing list, it didn’t
 > happen." - The Apache Mottos [3]
 > * Use *Slack threads* to keep parallel conversations from overwhelming a
 > channel.
 > * Please *do not direct message* people for troubleshooting, Jira
 > assigning and PR review. These should be picked up voluntarily.
 >
 > ## Maintenance
 >
 > Committers can refer to this wiki page [4] for information needed for
 > maintaining the Slack workspace.
 >
 > Thanks Jark, Martijn and Robert for helping set up the Slack workspace.
 >
 > Best,
 >
 > Xintong
 >
 > [1] https://apache-flink.slack.com/
 >
 > [2] https://flink.apache.org/community.html#slack
 >
 > [3] http://theapacheway.com/on-list/
 >
 > [4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management
 >

>>>


Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-06-03 Thread Gorjan Todorovski
Hi Jan,

This is a batch job, so there are no windows. It is basically a job
launched by a TFX component, so I don't have control over the Beam code
being executed. I conclude that the job is stuck because the number of
bytes and processed rows does not change for a long time on one specific
task and subtask (always the same one).

Thanks,
Gorjan


On Thu, Jun 2, 2022 at 4:45 PM Jan Lukavský  wrote:

> -user@flink  as this looks like a purely Beam issue
>
> Could you please elaborate more about what "stuck" means? Does the
> watermark stop progressing? Does that happen at any specific instant (e.g.
> end of window or end of window + allowed lateness)?
> On 6/1/22 15:43, Gorjan Todorovski wrote:
>
> Hi Jan,
>
> I had not checked the harness log. I have now checked it (the Apache Beam
> worker log) and found this, but I'm currently not sure what it means:
>
> 2022/06/01 13:34:40 Python exited: 
> 2022/06/01 13:34:41 Python exited: 
> Exception in thread read_grpc_client_inputs:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
>     self.run()
>   File "/usr/local/lib/python3.7/threading.py", line 870, in run
>     self._target(*self._args, **self._kwargs)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 587, in 
>     target=lambda: self._read_inputs(elements_iterator),
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 570, in _read_inputs
>     for elements in elements_iterator:
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
>     return self._next()
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
>     raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
>     status = StatusCode.CANCELLED
>     details = "Multiplexer hanging up"
>     debug_error_string = "{"created":"@1654090485.252525992","description":"Error received from peer ipv4:127.0.0.1:44439","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
> >
>
> 2022/06/01 13:34:45 Python exited: 
> 2022/06/01 13:34:46 Python exited: 
> 2022/06/01 13:34:46 Python exited: 
> 2022/06/01 13:34:47 Python exited: 
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-1', '--logging_endpoint=localhost:44267', '--artifact_endpoint=localhost:36413', '--provision_endpoint=localhost:42179', '--control_endpoint=localhost:38825']
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-3', '--logging_endpoint=localhost:38683', '--artifact_endpoint=localhost:44867', '--provision_endpoint=localhost:34833', '--control_endpoint=localhost:44351']
> Starting worker with command ['/opt/apache/beam/boot', '--id=3-2', '--logging_endpoint=localhost:35391', '--artifact_endpoint=localhost:46571', '--provision_endpoint=localhost:44073', '--control_endpoint=localhost:44133']
> Starting work...
>
> On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský  wrote:
>
>> Hi Gorjan,
>>
>> +user@beam 
>>
>> The trace you posted is just waiting for a bundle to finish in the SDK
>> harness. I would suspect the actual problem shows up in the logs of the
>> harness. Did you look for possible errors there?
>>
>>  Jan
>> On 5/31/22 13:54, Gorjan Todorovski wrote:
>>
>> Hi,
>>
>> I am running a TensorFlow Extended (TFX) pipeline which uses Apache Beam
>> for data processing, which in turn uses the Flink runner (basically a
>> batch job on a Flink Session Cluster on Kubernetes), version 1.13.6, but
>> the job (for gathering stats) gets stuck.
>>
>> There is nothing significant in the Job Manager or Task Manager logs. The
>> only thing that possibly might tell why the task is stuck seems to be a
>> thread dump:
>>
>> "MapPartition (MapPartition at [14]{TFXIORead[train],
>> GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
>> java.util.concurrent.CompletableFuture$Signaller@6f078632
>> at sun.misc.Unsafe.park(Native Method)
>> - waiting on java.util.concurrent.CompletableFuture$Signaller@
>> 6f078632
>> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>> at java.util.concurrent.CompletableFuture$Signaller.block(
>> CompletableFuture.java:1707)
>> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:
>> 3323)
>> at java.util.concurrent.CompletableFuture.waitingGet(
>> CompletableFuture.java:1742)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:
>> 1908)
>> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>> at org.apache.beam.runners.fnexecution.control.
>> SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient
>> .java:504)
>> ...
>> I use a parallelism of 32. Task managers are set up so that each TM runs
>> in one container with 1 CPU and a total process memory of 20 GB. Each TM
>> runs 
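
Since the discussion above hinges on reading thread dumps from specific
TaskManagers, here is one way to collect them programmatically. This is a
sketch against Flink's REST API (available in Flink 1.13+); the host,
port, and JSON field names follow the REST API docs and may need adjusting
for your cluster:

import requests

# Hedged sketch: fetch thread dumps from every TaskManager over the REST API.
BASE = "http://localhost:8081"  # JobManager REST address (assumed)

# List TaskManagers, then fetch each one's thread dump
# (GET /taskmanagers and GET /taskmanagers/:tmid/thread-dump).
for tm in requests.get(f"{BASE}/taskmanagers").json()["taskmanagers"]:
    dump = requests.get(f"{BASE}/taskmanagers/{tm['id']}/thread-dump").json()
    for t in dump["threadInfos"]:
        # Print only the MapPartition threads discussed above.
        if "MapPartition" in t["threadName"]:
            print(tm["id"], t["stringifiedThreadInfo"])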

Re: flink-ml algorithms

2022-06-03 Thread Jing Ge
Hi,

It seems like an evaluation with a small dataset. In that case, would you
like to share your data sample and code? In addition, have you tried
KMeans with the same dataset and gotten inconsistent results too?

Best regards,
Jing

On Fri, Jun 3, 2022 at 4:29 AM Natia Chachkhiani <
natia.chachkhia...@gmail.com> wrote:

> Hi,
>
> I am running OnlineKMeans from the flink-ml repo on a small dataset. I've
> noticed that I don't get consistent results (assignments to clusters)
> across different runs. I have set both parallelism and globalBatchSize to
> 1. I am doing a simple fit and transform on each data point ingested. Is
> the order of processing not guaranteed? Or am I missing something?
>
> Thanks,
> Natia
>
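
Jing's suggestion to cross-check with the offline KMeans can be scripted
in a few lines. The sketch below follows the Flink ML 2.x Python examples;
the module paths (`pyflink.ml.core.linalg`, `pyflink.ml.lib.clustering.kmeans`)
and setter names are assumptions from those docs and may differ in your
version:

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.lib.clustering.kmeans import KMeans
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

# A tiny stand-in dataset; replace with the sample being evaluated.
points = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([0.3, 3.0]),),
          (Vectors.dense([9.0, 0.0]),), (Vectors.dense([9.6, 0.0]),)]
input_table = t_env.from_data_stream(
    env.from_collection(
        points, type_info=Types.ROW_NAMED(['features'], [DenseVectorTypeInfo()])))

def cluster_assignments():
    # Fixed seed, so any run-to-run difference points at execution order.
    model = KMeans().set_k(2).set_seed(1).fit(input_table)
    output = model.transform(input_table)[0]
    return list(t_env.to_data_stream(output).execute_and_collect())

# With a fixed seed and parallelism 1, differing assignments across these
# two runs would suggest a nondeterministic processing order.
print(cluster_assignments())
print(cluster_assignments())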


Re: Issue Facing While Using EmbeddedRocksDbCheckpointing FlinkVersion(1.15.0)

2022-06-03 Thread Shuiqiang Chen
Hi,

I suspect that the traceback you provided does not show the root cause of
the failure. Could you please provide the complete log of the TaskManager?

Best,
Shuiqiang

harshit.varsh...@iktara.ai  wrote on Thu, Jun 2, 2022
at 22:04:

> Dear Team,
>
>
>
> I am new to PyFlink and would appreciate your support with an issue I am
> facing. I am using PyFlink version 1.15.0 and working from the PyFlink
> reference code.
>
> The error I am getting is:
>
> Traceback (most recent call last):
>   File "E:\pythonProject16\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 470, in input_elements
>     element = received.get(timeout=1)
>   File "C:\Users\Admin\AppData\Local\Programs\Python\Python38\lib\queue.py", line 178, in get
>     raise Empty
> _queue.Empty
>
> RuntimeError: Channel closed prematurely.
>
> My code is:
>
> import json
> import os
> import time
> from datetime import datetime
>
> from pyflink.common import SimpleStringSchema, JsonRowDeserializationSchema, Types, JsonRowSerializationSchema
> from pyflink.datastream import StreamExecutionEnvironment, WindowFunction, HashMapStateBackend, CheckpointingMode, \
>     FileSystemCheckpointStorage, KeyedProcessFunction, RuntimeContext, EmbeddedRocksDBStateBackend, RocksDBStateBackend
> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
> from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, ListStateDescriptor
> from sklearn.preprocessing import LabelEncoder
> import pickle
> import pandas as pd
> from pyflink.common import Row
>
> import argparse
> from typing import Iterable
>
> from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
>
> from pyflink.common import Types, WatermarkStrategy, Time, Encoder
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
> from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows
>
>
> class MyTimestampAssigner(TimestampAssigner):
>     def extract_timestamp(self, value, record_timestamp) -> int:
>         return int(value[0])
>
>
> class CountWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
>     def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
>         # return [(key, result)]
>         return [(key, len([e for e in inputs]))]
>
>
> class Storage(KeyedProcessFunction):
>
>     def __init__(self):
>         self.state = None
>
>     def open(self, runtime_context: RuntimeContext):
>         state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
>         state_ttl_config = StateTtlConfig \
>             .new_builder(Time.days(7)) \
>             .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
>             .disable_cleanup_in_background() \
>             .build()
>         state_descriptor.enable_time_to_live(state_ttl_config)
>         self.state = runtime_context.get_state(state_descriptor)
>
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         # retrieve the current count
>         current = self.state.value()
>         if current is None:
>             current = 0
>         current = value[1]
>         self.state.update(current)
>
>         yield current, time.time()
>
>
> def write_to_kafka():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     env.enable_checkpointing(1000)
>     env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
>     env.set_state_backend(EmbeddedRocksDBStateBackend())
>     env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
>     # env.get_checkpoint_config().enable_unaligned_checkpoints()
>     check = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                          'checkpoint-dir11')
>     env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///{}".format(check)))
>     kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                              'flink-sql-connector-kafka_2.11-1.14.4.jar')
>     env.add_jars("file:///{}".format(kafka_jar))
>     rocksdb_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                                'flink-statebackend-rocksdb_2.11-1.0.0.jar')
>     env.add_jars("file:///{}".format(rocksdb_jar))
>     # deserialization_schema = SimpleStringSchema()
>     deserialization_schema = JsonRowDeserializationSchema.builder() \
>         .type_info(type_info=Types.ROW_NAMED(["time_stamp",
>                                               "Bill_number", "Store_Code", "itemdescription", "Item_code",
>                                               "Gross_Price",