Hi,

It looks like you are creating two subscriptions to the same external data stream. This is currently not supported out of the box. I think the second subscription is overriding the first one. You should be able to verify that by using the CLI tools to display the state of the cluster and streams.

Ways around that could be to:
- use different streams for different kinds of events
- use 1 PE prototype to subscribe to the "LinearRoad" stream and handle processing and possible re-dispatch there
- modify the S4 platform code to handle this case

Hope this helps.

By the way, are you implementing the linear road benchmark http://www.cs.brandeis.edu/~linearroad/ ?

Regards,

Matthieu

On Mar 17, 2014, at 20:53, Martin Schneider <another.martin.schnei...@gmail.com> wrote:

> Hi,
>
> I would like to forward an event to multiple PEs. Therefore, I tried to use
> the following code in the InputAdapter:
>
>     PositionReportEvent positionReport = new PositionReportEvent(line);
>
>     String vehicleId = Integer.valueOf(positionReport.getVehicleId()).toString();
>     positionReport.put("vehicleId", String.class, vehicleId);
>     positionReport.put("uniqueSegment", String.class, positionReport.getUniqueSegment());
>     getRemoteStream().put(positionReport);
>
>     AddCarToSegmentEvent addCar = new AddCarToSegmentEvent(positionReport);
>     addCar.put("uniqueSegment", String.class, addCar.getUniqueSegment());
>     addCar.put("vehicleId", String.class, vehicleId);
>     getRemoteStream().put(addCar);
>
>     RemoveCarFromSegmentEvent removeCar = new RemoveCarFromSegmentEvent(positionReport);
>     removeCar.put("uniqueSegment", String.class, removeCar.getPreviousUniqueSegment());
>     removeCar.put("vehicleId", String.class, vehicleId);
>     getRemoteStream().put(removeCar);
>     }
>
> and the following code in the App
>
>     @Override
>     protected void onInit() {
>         CarsInSegmentCounterPE carSegmentCounterPE = createPE(CarsInSegmentCounterPE.class);
>         VehicleSpeedPE vehSpeedPE = createPE(VehicleSpeedPE.class);
>
>         // Create a stream that listens to the "names" stream and passes events to the helloPE instance.
>         createInputStream("LinearRoad", new KeyFinder<PositionReportEvent>() {
>             @Override
>             public List<String> get(PositionReportEvent event) {
>                 return Arrays.asList(new String[] { event.get("vehicleId") });
>             }
>         }, vehSpeedPE);
>
>         createInputStream("LinearRoad", new KeyFinder<PositionReportEvent>() {
>             @Override
>             public List<String> get(PositionReportEvent event) {
>                 return Arrays.asList(new String[] { event.get("uniqueSegment") });
>             }
>         }, carSegmentCounterPE);
>     }
>
> However, just the second PE (here: carSegmentCounterPE) receives and
> processes event. What I am doing wrong?
>
> Thanks in advance.
>
> Best, Martin
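
To make the second workaround from the reply more concrete (a single subscriber on "LinearRoad" that re-dispatches), here is a rough sketch in plain Java. It deliberately does not use the S4 API: `SimpleEvent`, `KeyedConsumer` and `Dispatcher` are hypothetical stand-ins invented for illustration. In a real S4 app the dispatcher role would presumably be played by one PE prototype and the consumers by downstream PEs on separate internal streams, but that mapping is an assumption and has not been checked against the platform.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration of "subscribe once, then re-dispatch".
// None of these types are S4 classes; all names are hypothetical.
class FanOutSketch {

    /** Minimal stand-in for an event that carries string attributes. */
    static final class SimpleEvent {
        private final Map<String, String> attributes = new HashMap<>();

        SimpleEvent with(String name, String value) {
            attributes.put(name, value);
            return this;
        }

        String get(String name) {
            return attributes.get(name);
        }
    }

    /** Downstream consumer that groups the events it receives by its own key. */
    static final class KeyedConsumer {
        private final Function<SimpleEvent, String> keyFinder;
        private final Map<String, List<SimpleEvent>> received = new HashMap<>();

        KeyedConsumer(Function<SimpleEvent, String> keyFinder) {
            this.keyFinder = keyFinder;
        }

        void accept(SimpleEvent event) {
            String key = keyFinder.apply(event);
            received.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        }

        int countFor(String key) {
            List<SimpleEvent> events = received.get(key);
            return events == null ? 0 : events.size();
        }
    }

    /** The only subscriber to the external stream; forwards each event to every target. */
    static final class Dispatcher {
        private final List<KeyedConsumer> targets = new ArrayList<>();

        void register(KeyedConsumer target) {
            targets.add(target);
        }

        void onEvent(SimpleEvent event) {
            for (KeyedConsumer target : targets) {
                target.accept(event);
            }
        }
    }

    public static void main(String[] args) {
        KeyedConsumer byVehicle = new KeyedConsumer(e -> e.get("vehicleId"));
        KeyedConsumer bySegment = new KeyedConsumer(e -> e.get("uniqueSegment"));

        Dispatcher dispatcher = new Dispatcher();
        dispatcher.register(byVehicle);
        dispatcher.register(bySegment);

        // Two position reports for the same vehicle in different segments.
        dispatcher.onEvent(new SimpleEvent().with("vehicleId", "17").with("uniqueSegment", "A-1"));
        dispatcher.onEvent(new SimpleEvent().with("vehicleId", "17").with("uniqueSegment", "A-2"));

        System.out.println("vehicle 17: " + byVehicle.countFor("17"));
        System.out.println("segment A-1: " + bySegment.countFor("A-1"));
    }
}
```

Both consumers see every event, each under its own key, because only one component is attached to the incoming stream and the fan-out happens after it. The first workaround (a different stream per event kind) would avoid the extra hop, at the cost of the adapter having to know about each stream.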