Re: Do we have any mechanism to control requests per second for a Kafka connect sink?
Apologies to everyone. I sent this to the wrong email list. Please discard.

On Mon, 04 Dec 2023 10:48:11 -0500 Yeikel Santana wrote ---
Do we have any mechanism to control requests per second for a Kafka connect sink?
Hello everyone,

Is there any mechanism to force Kafka Connect to ingest at a given rate per second, as opposed to controlling throughput only through tasks? I am operating in a shared environment where the ingestion rate needs to be as low as possible (for example, an upper limit of 5 requests/second), and as far as I can tell, `tasks` are the main unit of work we can tune.

My current understanding is that a task blocks while it processes one batch and moves on to the next batch as soon as the previous request completes. That means that if the target server can handle requests at a higher rate, the sink will keep sending at that rate. In my scenario, however, I need to send n requests per second and then sit idle until that second has elapsed, to avoid overloading the target server.

In this specific example, my best attempt at controlling the throughput was a configuration along these lines:

```json
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"max.retries": "10",
"retry.backoff.ms": "1000",
"max.buffered.records": "100",
"batch.size": "100",
"max.in.flight.requests": "1",
"flush.synchronously": "true",
```

Unfortunately, while that helps, it does not solve the underlying problem. I also understand that this is very specific to the given sink connector, but my question is more about a global override that could be applied, if any exists.

I also suppose that I could add a `Thread.sleep` call in an SMT, but that does not sound like a good solution.

Thank you!
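P.S. For illustration only, here is the rough shape of the `Thread.sleep` SMT hack I mentioned. The class name and the `max.records.per.second` config key are made up for this sketch, and it simply blocks the task thread between records, which is exactly why it does not feel like a good solution:

```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Hypothetical throttling SMT (sketch only): sleeps between records so the
 * task never forwards more than max.records.per.second records per second.
 */
public class ThrottleRecords<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String MAX_PER_SECOND = "max.records.per.second";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(MAX_PER_SECOND, ConfigDef.Type.INT, 5,
                    ConfigDef.Importance.HIGH, "Upper bound of records per second");

    private long minIntervalMs;
    private long lastEmitMs = 0L;

    @Override
    public void configure(Map<String, ?> configs) {
        int maxPerSecond = (Integer) CONFIG_DEF.parse(configs).get(MAX_PER_SECOND);
        minIntervalMs = 1000L / Math.max(1, maxPerSecond);
    }

    @Override
    public R apply(R record) {
        // Block the task thread until enough time has passed since the last record
        long waitMs = minIntervalMs - (System.currentTimeMillis() - lastEmitMs);
        if (waitMs > 0) {
            try {
                Thread.sleep(waitMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        lastEmitMs = System.currentTimeMillis();
        return record;
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }
}
```

It would be wired in through the usual `transforms` properties of the connector, but it still ties the throttling to each individual task rather than giving a global rate limit, which is what I am really after.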
RE: Fast Unit Tests
Can you share a sample test case? How are you doing the unit tests? Are you creating the session in a beforeAll block or similar?

As far as I know, if you use Spark you will end up with lightweight integration tests rather than "real" unit tests (please correct me if I am wrong).

From: marcos rebelo
Sent: Tuesday, May 1, 2018 11:25 AM
To: user
Subject: Fast Unit Tests

Hey all,

We are using Scala and SQL heavily, but I have a problem with VERY SLOW unit tests.

Is there a way to do fast unit tests on Spark? How are you guys working around it?

Best Regards
Marcos Rebelo
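For reference, the shared-session setup I am asking about looks roughly like the sketch below (Java with JUnit 4 assumed; with ScalaTest the equivalent would live in beforeAll/afterAll). The point is that the session is created once per suite and the shuffle-partition count is kept tiny, so individual tests do not pay the startup cost:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class SharedSessionTest {

    private static SparkSession spark;

    @BeforeClass
    public static void startSpark() {
        // Built once for the whole suite, not once per test
        spark = SparkSession.builder()
                .master("local[*]")
                .appName("fast-unit-tests")
                // Tiny test datasets do not need 200 shuffle partitions
                .config("spark.sql.shuffle.partitions", "1")
                .config("spark.ui.enabled", "false")
                .getOrCreate();
    }

    @AfterClass
    public static void stopSpark() {
        spark.stop();
    }

    @Test
    public void countsRows() {
        Dataset<Row> df = spark.sql("SELECT 1 AS id UNION ALL SELECT 2");
        assertEquals(2L, df.count());
    }
}
```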
What is the purpose of CoarseGrainedScheduler and how can I disable it?
Hi,

This is probably not a Spark issue, but rather a configuration that I am missing. Any help would be appreciated.

I am running Spark from a Docker template with the following configuration:

```yaml
version: '2'
services:
  master:
    image: gettyimages/spark
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    environment:
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: localhost
    expose:
      - 7001
      - 7002
      - 7003
      - 7004
      - 7005
      - 7006
      - 7077
      - 6066
    ports:
      - 4040:4040
      - 6066:6066
      - 7077:7077
      - 8080:8080
  worker:
    image: gettyimages/spark
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_WORKER_PORT: 8881
      SPARK_WORKER_WEBUI_PORT: 8081
      SPARK_PUBLIC_DNS: localhost
    links:
      - master
    expose:
      - 7012
      - 7013
      - 7014
      - 7015
      - 7016
      - 8881
    ports:
      - 8081:8081
```

And I have the following simple Java program:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// Connects to the standalone master exposed by the Docker setup above
SparkConf conf = new SparkConf()
        .setMaster("spark://localhost:7077")
        .setAppName("Word Count Sample App");
conf.set("spark.dynamicAllocation.enabled", "false");

String file = "test.txt";
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> textFile = sc.textFile("src/main/resources/" + file);
JavaPairRDD<String, Integer> counts = textFile
        .flatMap(s -> Arrays.asList(s.split("[ ,]")).iterator())
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((a, b) -> a + b);

counts.foreach(p -> System.out.println(p));
System.out.println("Total words: " + counts.count());
counts.saveAsTextFile(file + "out.txt");
```

The problem that I am having is that at runtime, the worker launches the executor with the following command:

```
Spark Executor Command: "/usr/jdk1.8.0_131/bin/java" "-cp" "/conf:/usr/spark-2.3.0/jars/*:/usr/hadoop-2.8.3/etc/hadoop/:/usr/hadoop-2.8.3/etc/hadoop/*:/usr/hadoop-2.8.3/share/hadoop/common/lib/*:/usr/hadoop-2.8.3/share/hadoop/common/*:/usr/hadoop-2.8.3/share/hadoop/hdfs/*:/usr/hadoop-2.8.3/share/hadoop/hdfs/lib/*:/usr/hadoop-2.8.3/share/hadoop/yarn/lib/*:/usr/hadoop-2.8.3/share/hadoop/yarn/*:/usr/hadoop-2.8.3/share/hadoop/mapreduce/lib/*:/usr/hadoop-2.8.3/share/hadoop/mapreduce/*:/usr/hadoop-2.8.3/share/hadoop/tools/lib/*" "-Xmx1024M" "-Dspark.driver.port=59906" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@yeikel-pc:59906" "--executor-id" "6" "--hostname" "172.19.0.3" "--cores" "2" "--app-id" "app-20180401005243-" "--worker-url" "spark://Worker@172.19.0.3:8881"
```

Which results in:

```
Caused by: java.io.IOException: Failed to connect to yeikel-pc:59906
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: yeikel-pc
```

Can I override the "--driver-url" from Java? Or how can I disable CoarseGrainedScheduler? I tried setting spark.dynamicAllocation.enabled to false, but that did not work.
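For completeness, the workaround I was planning to try is to pin the driver's advertised address and port in the SparkConf instead of letting them default to my machine's hostname (yeikel-pc), which the containers cannot resolve. This is an untested sketch, and 192.168.1.10 is only a placeholder for an address the containers can actually reach:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Untested sketch: advertise an address the workers can resolve instead of the
// local machine hostname, and fix the driver port so it can be reached/exposed.
SparkConf conf = new SparkConf()
        .setMaster("spark://localhost:7077")
        .setAppName("Word Count Sample App");
conf.set("spark.driver.host", "192.168.1.10");   // placeholder: address reachable from the workers
conf.set("spark.driver.port", "59906");          // fixed port instead of a random one
conf.set("spark.driver.bindAddress", "0.0.0.0"); // bind locally on all interfaces

JavaSparkContext sc = new JavaSparkContext(conf);
```

The fixed spark.driver.port would presumably also need to be reachable from inside the containers, but I have not verified any of this yet.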