Re: Do we have any mechanism to control requests per second for a Kafka connect sink?

2023-12-04 Thread Yeikel Santana
Apologies to everyone. I sent this to the wrong email list. Please discard.

Do we have any mechanism to control requests per second for a Kafka connect sink?

2023-12-04 Thread Yeikel Santana
Hello everyone,



Is there any mechanism to force Kafka Connect to ingest at a given rate per
second, as opposed to only controlling parallelism through tasks?



I am operating in a shared environment where the ingestion rate needs to be as 
low as possible (for example, 5 requests/second as an upper limit), and as far 
as I can tell, `tasks` are the main unit of work we can use.



My current understanding is that a task blocks while it processes one batch and
moves on to the next batch as soon as the previous request completes. This means
that if the target server can process requests at a higher rate, the sink will
keep sending at that rate.
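
For illustration only, here is a hypothetical sink task (not the Elasticsearch connector) sketching that behaviour: put() blocks on the request, and returning from it signals that the task is ready for the next batch immediately, so there is no built-in pacing beyond the target's own latency.

```java
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical sink task used only to illustrate the batching behaviour.
public class ExampleHttpSinkTask extends SinkTask {

    @Override
    public void start(Map<String, String> props) {
        // set up an HTTP client for the target server (omitted)
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Synchronously send the whole batch; once this returns, the framework
        // hands the task the next batch right away.
        sendBatchAndWait(records);
    }

    private void sendBatchAndWait(Collection<SinkRecord> records) {
        // placeholder for a blocking request to the target server
    }

    @Override
    public void stop() {
        // release the HTTP client (omitted)
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}
```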



However, in my scenario, what I need is to send at most n requests per second and
then sit idle for the rest of that second to avoid overloading the target server.



In this specific example, my best attempt to control the throughput was to
configure the connector with something like:



```json
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"max.retries": "10",
"retry.backoff.ms": "1000",
"max.buffered.records": "100",
"batch.size": "100",
"max.in.flight.requests": "1",
"flush.synchronously": "true",
```



Unfortunately, while that helps, it does not solve the underlying problem. I also
understand that this is very specific to the given sink connector, but my
question is more about a global override that could be applied, if any exists.



I also suppose that I could add a `Thread.sleep` call as an SMT, but that does 
not sound like a good solution. 
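
For what it's worth, the Thread.sleep-as-an-SMT idea would look roughly like the sketch below. It is only an illustration, not an existing transform: the class name and the delay.ms setting are made up, and it paces per record on the task thread rather than per request, which is part of why it feels like a hack.

```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical throttling SMT: sleeps before passing each record through unchanged.
public class ThrottleTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String DELAY_MS_CONFIG = "delay.ms"; // hypothetical setting

    private long delayMs;

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get(DELAY_MS_CONFIG);
        delayMs = value == null ? 0L : Long.parseLong(value.toString());
    }

    @Override
    public R apply(R record) {
        // Crude throttle: block the task thread for a fixed delay per record.
        if (delayMs > 0) {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef().define(DELAY_MS_CONFIG, ConfigDef.Type.LONG, 0L,
                ConfigDef.Importance.LOW, "Milliseconds to sleep per record (hypothetical)");
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}
```

It would then be wired in through the usual `transforms` and `transforms.<alias>.type` properties, with the delay passed as `transforms.<alias>.delay.ms`.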



Thank you!

RE: Fast Unit Tests

2018-05-01 Thread Yeikel Santana
Can you share a sample test case? How are you doing the unit tests? Are you 
creating the session in a beforeAll block or similar? 

 

As far as I know, if you use Spark you will end up with light integration tests 
rather than “real” unit tests (please correct me if I am wrong). 
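
For illustration, a minimal sketch of what creating the session once per suite could look like, assuming JUnit and a local-mode SparkSession; the class and test below are hypothetical:

```java
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

// Share one local SparkSession across the whole test class instead of
// creating and stopping it per test, which is usually the main source of slowness.
public class SharedSessionTest {

    private static SparkSession spark;

    @BeforeClass
    public static void setUp() {
        spark = SparkSession.builder()
                .master("local[2]")                           // local mode, no cluster needed
                .appName("unit-tests")
                .config("spark.ui.enabled", "false")          // skip the web UI
                .config("spark.sql.shuffle.partitions", "1")  // small shuffles for tiny test data
                .getOrCreate();
    }

    @AfterClass
    public static void tearDown() {
        spark.stop();
    }

    @Test
    public void countsRows() {
        long count = spark.range(10).count();
        assertEquals(10L, count);
    }
}
```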

 

From: marcos rebelo  
Sent: Tuesday, May 1, 2018 11:25 AM
To: user 
Subject: Fast Unit Tests

 

Hey all,

 

We are using Scala and SQL heavily, but I have a problem with VERY SLOW Unit 
Tests. 

 

Is there a way to do fast Unit Tests on Spark? 

 

How are you guys going around it?

 

Best Regards

Marcos Rebelo



What is the purpose of CoarseGrainedScheduler and how can I disable it?

2018-03-31 Thread Yeikel Santana
Hi,

This is probably not a Spark issue, but rather a configuration that I am
missing. Any help would be appreciated.

I am running Spark from a Docker template with the following configuration:

 

version: '2'

services:
  master:
    image: gettyimages/spark
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    environment:
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: localhost
    expose:
      - 7001
      - 7002
      - 7003
      - 7004
      - 7005
      - 7006
      - 7077
      - 6066
    ports:
      - 4040:4040
      - 6066:6066
      - 7077:7077
      - 8080:8080

  worker:
    image: gettyimages/spark
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_WORKER_PORT: 8881
      SPARK_WORKER_WEBUI_PORT: 8081
      SPARK_PUBLIC_DNS: localhost
    links:
      - master
    expose:
      - 7012
      - 7013
      - 7014
      - 7015
      - 7016
      - 8881
    ports:
      - 8081:8081

And I have the following simple Java program: 

 

SparkConf conf = new SparkConf()
        .setMaster("spark://localhost:7077")
        .setAppName("Word Count Sample App");

conf.set("spark.dynamicAllocation.enabled", "false");

String file = "test.txt";

JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> textFile = sc.textFile("src/main/resources/" + file);

JavaPairRDD<String, Integer> counts = textFile
        .flatMap(s -> Arrays.asList(s.split("[ ,]")).iterator())
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((a, b) -> a + b);

counts.foreach(p -> System.out.println(p));

System.out.println("Total words: " + counts.count());

counts.saveAsTextFile(file + "out.txt");

 

 

The problem that I am having is that, at runtime, the following Java command is
invoked:

 

Spark Executor Command: "/usr/jdk1.8.0_131/bin/java" "-cp"
"/conf:/usr/spark-2.3.0/jars/*:/usr/hadoop-2.8.3/etc/hadoop/:/usr/hadoop-2.8.3/etc/hadoop/*:/usr/hadoop-2.8.3/share/hadoop/common/lib/*:/usr/hadoop-2.8.3/share/hadoop/common/*:/usr/hadoop-2.8.3/share/hadoop/hdfs/*:/usr/hadoop-2.8.3/share/hadoop/hdfs/lib/*:/usr/hadoop-2.8.3/share/hadoop/yarn/lib/*:/usr/hadoop-2.8.3/share/hadoop/yarn/*:/usr/hadoop-2.8.3/share/hadoop/mapreduce/lib/*:/usr/hadoop-2.8.3/share/hadoop/mapreduce/*:/usr/hadoop-2.8.3/share/hadoop/tools/lib/*"
"-Xmx1024M" "-Dspark.driver.port=59906"
"org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url"
"spark://CoarseGrainedScheduler@yeikel-pc:59906" "--executor-id" "6"
"--hostname" "172.19.0.3" "--cores" "2" "--app-id" "app-20180401005243-"
"--worker-url" "spark://Worker@172.19.0.3:8881"

 

Which results in 

 

 

Caused by: java.io.IOException: Failed to connect to yeikel-pc:59906
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: yeikel-pc

 

 

 

Can I override the "--driver-url" from Java? Or how can I disable the
CoarseGrainedScheduler?

 

I tried to set spark.dynamicAllocation.enabled to false but that did not
work.
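
For illustration, a sketch of what overriding the advertised address from Java might look like, assuming the standard spark.driver.host, spark.driver.port and spark.driver.bindAddress settings; the IP and port below are placeholders rather than values from this setup.

```java
import org.apache.spark.SparkConf;

// Sketch only: spark.driver.host controls the address advertised in --driver-url,
// so pointing it at an address the worker containers can resolve avoids the
// yeikel-pc lookup. The IP and port are placeholders.
SparkConf conf = new SparkConf()
        .setMaster("spark://localhost:7077")
        .setAppName("Word Count Sample App")
        .set("spark.driver.host", "192.168.1.10")    // reachable from the containers (placeholder)
        .set("spark.driver.port", "51000")           // fixed port so it can be exposed if needed (placeholder)
        .set("spark.driver.bindAddress", "0.0.0.0"); // listen locally on all interfaces
```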