Once again: one step further.

The "Client cancelled" error seems to stem from a Python interpreter crash caused by an error in a native library. The crash itself is not logged at all, so all I get is this totally misleading gRPC error instead. Could this get a better error message, perhaps?

Another big issue that is still unsolved is that with --setup-file, the dependencies get installed globally in the SDK container and stay there for all following jobs. That includes the wheel built from my local Python module, and when I resubmit the job, it doesn't get reinstalled, because pip figures it's already there. Only the main Python file gets resubmitted. This is a big issue, and at the moment I can only work around it by killing the SDK container after each job execution. It would be much better if dependencies got installed only into a temporary venv that is discarded upon job completion or failure.
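The kind of per-job isolation I mean could be sketched like this (a minimal illustration using only the Python standard library; it is not how Beam currently works, and the wheel-install step is shown only as a comment with a placeholder name):

```python
import os
import shutil
import subprocess
import sys
import tempfile

# Create a throwaway venv for one job's dependencies instead of installing
# them into the container's global site-packages.
venv_dir = tempfile.mkdtemp(prefix="beam-job-venv-")
try:
    # --without-pip keeps this sketch fast; a real harness would keep pip
    # and run `pip install <job wheel>` against this environment, so a
    # resubmitted wheel is always installed fresh.
    subprocess.run([sys.executable, "-m", "venv", "--without-pip", venv_dir],
                   check=True)
    print(os.path.isfile(os.path.join(venv_dir, "pyvenv.cfg")))  # True
finally:
    # Discard the environment on completion or failure.
    shutil.rmtree(venv_dir, ignore_errors=True)
print(os.path.isdir(venv_dir))  # False
```

With this lifecycle, nothing from one job can leak into the next, and restarting the SDK container would no longer be necessary.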

Janek


On 09/12/2021 19:45, Janek Bevendorff wrote:
Hi Kyle,

Thank you for your response.


There are a few working Beam+Flink+k8s configurations that have been published, such as [1] [2] and [3]. If these meet your requirements, I would recommend starting from one of them before reinventing your own. Otherwise, you can look to them for clues, since they’ve likely had to solve many of the same problems.

Of course I designed my deployment after those resources, and for the most part it is working (I'm not using the operator at the moment, because the Helm chart is broken and the whole project seems barely maintained). The problems lie with what is neither documented nor solved by any of those example deployments, and with bugs in Beam, Flink, or the interaction between the two.

I’m not sure what progress you have made at this point; did you get everything to work, aside from the options issue? Did setting --artifact_endpoint resolve the “client cancelled” issue?

No, that problem is unsolved. It occurs randomly, but fortunately not very often (and usually within the first few minutes after submission). Unfortunately, there is another, more common problem that keeps cropping up, which looks like some totally random Flink failure with the error “Partition not found” followed by a long partition ID hash. I think it occurs after some other failure from which Flink is unable to reschedule a task properly (not sure what, though). It may have to do with the fact that the Beam Python SDK harness (a sidecar container inside the task manager pod) is persistent for the lifetime of the task manager itself. At least I had issues with that earlier when I submitted a new version of my job before Flink terminated the old task manager (which happens about a minute after a job has finished). The result was that, among other things, the submitted Python wheel wasn’t reinstalled, because the SDK container still had the old version (definitely a Beam bug; pip should be called with --force-reinstall).

I don’t know how to trigger the problem exactly, nor how to solve it. But it usually crashes my job after a few hours of processing time.

If I understand correctly, the “Discarding invalid overrides” warning is a red herring; the option should still be passed on. So I think there may be an issue elsewhere. If you could share as much of your Flink/Beam configuration as possible, it may help to debug the problem.

No, it’s not passed on. I tried. This is only an issue when I submit uber JARs from the client machine. It works with the FlinkRunner without uber JARs, as well as with PortableRunner + Job Server.

I recommend starting a separate thread regarding the incorrect Python documentation, since I fear it might get buried in this thread. The more specific incorrect examples you can point out, the better. I’d also be happy to review PRs if you’re willing to update the documentation yourself (it is all open source [4]).

That’d be quite a few places. One very annoying thing is that most of the time the imports are missing, so I have to grep the Python sources to find the correct imports or guess them from the Java API. I’ll see if I find the time. That would also depend on whether I can solve these issues or whether I have to scrap Beam and use Flink directly (or revert to Spark, yuck!).

Regarding stateful processing, please provide code snippets so we can reproduce the issue(s). Again, it may be better to start a separate thread since stateful processing should be mostly orthogonal to the Flink deployment architecture.

I stopped using it, because I couldn’t get it to run. Perhaps I am missing some sort of configuration for persisting the state, but I am neither getting an error nor can I find any documentation about this part. The error message thrown by the DirectRunner is also totally non-descriptive; a “Not supported” error would have saved some time here. The FlinkRunner doesn’t throw errors, but shows the behaviour I described (timers triggered after each process() call and no state persistence).
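For reference, the pattern I was attempting looks roughly like this (a sketch of Beam’s Python stateful API, not my actual pipeline; state/timer names and the counting logic are made up for illustration):

```python
import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec, TimerSpec, on_timer)


class CountPerKeyFn(beam.DoFn):
    """Stateful DoFn; requires a keyed (KV) input PCollection."""

    COUNT = ReadModifyWriteStateSpec('count', VarIntCoder())
    FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

    def process(self, element,
                count=beam.DoFn.StateParam(COUNT),
                flush=beam.DoFn.TimerParam(FLUSH),
                window=beam.DoFn.WindowParam):
        # Accumulate state across elements of the same key...
        count.write((count.read() or 0) + 1)
        # ...and expect the timer to fire once, at the window end --
        # not after every process() call, as I am seeing.
        flush.set(window.end)

    @on_timer(FLUSH)
    def on_flush(self, count=beam.DoFn.StateParam(COUNT)):
        yield count.read()
```

The behaviour I observe is that the timer callback fires after each process() call and the state read in on_flush does not reflect earlier elements, as if nothing is persisted between bundles.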

Janek
