Hi Kyle,

Thank you for your response.
> There are a few working Beam+Flink+k8s configurations that have been
> published, such as [1] [2] and [3]. If these meet your requirements, I would
> recommend starting from one of them before reinventing your own. Otherwise,
> you can look to them for clues, since they’ve likely had to solve many of the
> same problems.

Of course I designed my deployment after those resources, and for the most part it is working (I’m not using the operator at the moment, because its Helm chart is broken and the whole project seems barely maintained). The problems lie with what is neither documented nor solved by any of those example deployments, and with bugs in Beam, in Flink, or in the interaction between the two.

> I’m not sure what progress you have made at this point; did you get
> everything to work, aside from the options issue? Did setting
> --artifact_endpoint resolve the “client cancelled” issue?

No, that problem is unsolved. It occurs randomly, but fortunately not very often (and usually within the first few minutes after submission). Unfortunately, there is another, more common problem that keeps cropping up: what looks like a totally random Flink failure with the error “Partition not found”, followed by a long partition ID hash. I think it occurs after some other failure from which Flink is unable to reschedule a task properly (I’m not sure what that failure is, though). It may have to do with the fact that the Beam Python SDK harness (a sidecar container inside the task manager pod) is persistent for the lifetime of the task manager itself. At least I had issues with that earlier, when I submitted a new version of my job before Flink terminated the old task manager (which happens about a minute after a job has finished). The result was that, among other things, the submitted Python wheel wasn’t reinstalled, because the SDK container still had the old version (definitely a Beam bug; pip should be called with --force-reinstall). I don’t know how to trigger the problem exactly, nor how to solve it, but it usually crashes my job after a few hours of processing time.

> If I understand correctly, the “Discarding invalid overrides” warning is a
> red herring; the option should still be passed on. So I think there may be an
> issue elsewhere. If you could share as much of your Flink/Beam configuration
> as possible, it may help to debug the problem.

No, it’s not passed on; I tried. This is only an issue when I submit uber JARs from the client machine. It works with the FlinkRunner without uber JARs, as well as with the PortableRunner plus job server.
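For context, this is roughly the shape of my submission code. The addresses are placeholders and the option list is reconstructed from memory, so take it as a sketch of my setup rather than the exact configuration:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Uber JAR submission straight from the client machine: the pipeline is
    # packaged into a Flink uber JAR and uploaded through the Flink REST API,
    # without a separate job server.
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=flink-jobmanager:8081",   # placeholder address
        "--flink_submit_uber_jar",
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000",   # SDK harness sidecar
    ])

    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | beam.Create([1, 2, 3])
         | beam.Map(print))

With --flink_submit_uber_jar removed, or with the PortableRunner plus a job server, the same options arrive as expected.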
> I recommend starting a separate thread regarding the incorrect Python
> documentation, since I fear it might get buried in this thread. The more
> specific incorrect examples you can point out, the better. I’d also be happy
> to review PRs if you’re willing to update the documentation yourself (it is
> all open source [4]).

That’d be quite a few places. One very annoying thing is that most of the time the imports are missing from the examples, so I have to grep the Python sources for the correct imports or guess them from the Java API. I’ll see if I find the time. That will also depend on whether I can solve these issues, or whether I have to scrap Beam and use Flink directly (or revert to Spark, yuck!).

> Regarding stateful processing, please provide code snippets so we can
> reproduce the issue(s). Again, it may be better to start a separate thread
> since stateful processing should be mostly orthogonal to the Flink deployment
> architecture.

I stopped using it, because I couldn’t get it to run. Perhaps I am missing some sort of configuration for persisting the state, but I am neither getting an error nor finding any documentation about this part. The error message thrown by the DirectRunner is also totally non-descriptive; a plain “not supported” error would have saved some time here. The FlinkRunner doesn’t throw errors, but shows the behaviour I described earlier: timers triggered after each process() call and no state persistence.
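Since you asked for snippets, here is a minimal sketch of the pattern I was running, reconstructed from memory with names and types simplified. On the FlinkRunner, the flush timer appears to fire after every process() call instead of once at the window end, and the bag state comes back empty:

    import apache_beam as beam
    from apache_beam.coders import VarIntCoder
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

    class BufferingDoFn(beam.DoFn):
        # Per-key buffer plus an event-time timer that should flush it
        # once, at the end of the window.
        BUFFER = BagStateSpec('buffer', VarIntCoder())
        FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

        def process(self,
                    element,  # stateful DoFns require keyed input: (key, value)
                    window=beam.DoFn.WindowParam,
                    buffer=beam.DoFn.StateParam(BUFFER),
                    flush=beam.DoFn.TimerParam(FLUSH)):
            _, value = element
            buffer.add(value)
            flush.set(window.end)

        @on_timer(FLUSH)
        def on_flush(self, buffer=beam.DoFn.StateParam(BUFFER)):
            yield sum(buffer.read())
            buffer.clear()

I apply it to a windowed, keyed stream, e.g. ... | beam.Map(lambda v: (v % 10, v)) | beam.ParDo(BufferingDoFn()).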
Janek