Hey,

Our team is trying to use Beam with the Kafka connector and the Flink runner
to gather information and process data. We use the Python SDK and have added
Java 11 to the Python 3.7 SDK image as the Java runtime for the Kafka
expansion service.

So our setup is:
image: Beam Python 3.7 Docker image + self-built Java 11
connector: Kafka
runner: Flink
container: Kubernetes


We encounter a "docker not found" error when running the following command
(logs below):
 python3 -m kafka_test --runner=FlinkRunner
--flink_master=flink-job-manager:8081 --flink_submit_uber_jar
--environment_type=EXTERNAL --environment_config=localhost:50000
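
For reference, here are the same flags written as a Python list, which is the form that could be handed to apache_beam's PipelineOptions. This is only a sketch: the helper name flink_pipeline_args is hypothetical and is not taken from our kafka_test.py, and the default values are simply the ones from the command above.

```python
def flink_pipeline_args(flink_master="flink-job-manager:8081",
                        worker_pool="localhost:50000"):
    """Return the flags from the command above as a list of strings.

    Hypothetical helper for illustration only. The list could be passed
    to apache_beam.options.pipeline_options.PipelineOptions(...).
    """
    return [
        "--runner=FlinkRunner",
        f"--flink_master={flink_master}",
        "--flink_submit_uber_jar",
        # EXTERNAL points the runner at an already running worker pool
        # at the given address instead of starting one itself.
        "--environment_type=EXTERNAL",
        f"--environment_config={worker_pool}",
    ]


if __name__ == "__main__":
    print(" ".join(flink_pipeline_args()))
```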

We noticed that https://beam.apache.org/roadmap/portability/ lists Docker
among the prerequisites. What is Docker used for here? Is there a suggested
way to make Docker available inside a Kubernetes container (perhaps
something like Sysbox for Docker-in-Docker)?

Or is there a recommended way or best practice for using the Beam Python
SDK (with Kafka) and the Flink runner in a Kubernetes/container environment?

Here are our logs:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmp/kafka_test.py", line 25, in <module>
    run_pipeline()
  File "/tmp/kafka_test.py", line 21, in run_pipeline
    | 'Write to text' >> WriteToText("/tmp/output2")
  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 583, in __exit__
    self.result.wait_until_finish()
  File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
line 581, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-93bbf6ce-f666-4d46-803d-320983fca3f1 failed in
state FAILED:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.io.IOException: Cannot run program "docker": error=2, No such file or
directory
        at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
        at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
        at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
        at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
        at
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
        at
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
        at
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
        at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot run program "docker": error=2, No
such file or directory
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
        at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
        at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
        at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
        at
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
        at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
        at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
        at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
        at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
        at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
        at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
        at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
        at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
        at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
        at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
        ... 14 more
Caused by: java.io.IOException: error=2, No such file or directory
        at java.lang.UNIXProcess.forkAndExec(Native Method)
        at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
        at java.lang.ProcessImpl.start(ProcessImpl.java:134)
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
        ... 28 more
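
The error=2 in the trace above is the operating system's "No such file or directory" code, raised when the Java process tried to start a program named docker. As a rough cross-check, the sketch below looks for an executable on PATH from Python. It is an approximation of what the Java side does, the helper name executable_on_path is hypothetical, and it would need to be run inside the Flink task manager container to be meaningful.

```python
import shutil


def executable_on_path(name):
    """Return True if an executable called `name` can be found on PATH."""
    # shutil.which returns the full path of the executable, or None
    # when nothing with that name is found on PATH.
    return shutil.which(name) is not None


if __name__ == "__main__":
    print("docker on PATH:", executable_on_path("docker"))
```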

We really appreciate your time and help!

Thanks,
Yilun
