Hi Niclas,

Glad that you got it working!
Thanks for sharing the problem and solution.

Best, Fabian

2018-02-19 9:29 GMT+01:00 Niclas Hedhman <nic...@apache.org>:

>
> (Sorry for the incoherent order and ramblings. I am writing this as I am
> trying to sort out what is going on...)
>
> 1. It is the first message to be processed in the Kafka topic. If I set
> the offset manually, it will pick up the message at that point, process it,
> and ignore all following messages.
>
> 2. Yes, the Kafka console consumer tool is spitting out the messages
> without problem. Btw, they are plain Strings, well, serialized JSON objects.
>
> 3. Code is a bit messy, but I have copied out the relevant parts below.
>
> I also noticed that a LOT of exceptions are thrown ("breakpoint on any
> exception"), mostly ClassCastException, classes not found and
> NoSuchMethodException, but nothing that bubbles up out of the internals. Is
> this just Scala exercising the JVM heavily, or the normal JVM class-loading
> sequence (no wonder it is so slow)? Is that expected?
>
> I have tried to use both the ObjectMapper from Jackson proper, as well as
> the shaded ObjectMapper in Flink. No difference.
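>
> For reference, by the two ObjectMapper variants I mean roughly these import
> paths; the shaded package prefix is an assumption based on Flink 1.4's
> flink-shaded-jackson layout and may differ between versions:
>
>     // plain Jackson
>     import com.fasterxml.jackson.databind.ObjectMapper;
>     // Flink's relocated (shaded) copy
>     import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;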
>
> Recap: positioning the Kafka consumer at the 8th message from the end. Only
> that message is consumed; the remaining 7 are ignored/swallowed.
>
>
> Ok, so I think I have traced this down to something happening in the
> CassandraSink. There is an Exception being thrown somewhere, which I can see
> because the Kafka09Fetcher.runFetchLoop()'s finally clause is called.
>
> Found it (hours of debugging later), on this line (Flink 1.4.1):
>
> org/apache/flink/cassandra/shaded/com/google/common/util/concurrent/Futures.class:258
>
> which contains
>
>     future.addListener(callbackListener, executor);  // IDEA says 'future' is of type DefaultResultSetFuture
>
> This line throws an Exception without stepping into the addListener() method.
> There is nothing catching the Exception (and I don't want to go down the rabbit
> hole of building from source), so I can't really say what Exception is being
> thrown. IDEA doesn't seem to report it, and the catch clauses in
> OperatorChain.pushToOperator() (ClassCastException and Exception) are in the
> call stack but don't catch it, which could suggest a java.lang.Error;
> NoClassDefFoundError comes to mind, since there are SO MANY class-loading
> exceptions going on all the time.
>
> Hold on a second... There are TWO com.datastax.driver.core.DefaultResultSetFuture
> types on the classpath: one from the Cassandra client dependency that I declared,
> and one from inside the flink-connector-cassandra_2.11 artifact...
>
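> (In hindsight, a quick way to confirm a duplicate like this is to list every
> copy of the class file visible on the classpath. A minimal sketch, runnable
> from any main() on the same classpath as the job:)
>
>     import java.net.URL;
>     import java.util.Enumeration;
>
>     public class FindDuplicateClasses
>     {
>         public static void main( String[] args ) throws Exception
>         {
>             // Each printed URL is a separate copy of the class; more than one
>             // line means two artifacts ship the same driver class.
>             Enumeration<URL> copies = FindDuplicateClasses.class.getClassLoader()
>                 .getResources( "com/datastax/driver/core/DefaultResultSetFuture.class" );
>             while( copies.hasMoreElements() )
>             {
>                 System.out.println( copies.nextElement() );
>             }
>         }
>     }
>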
> So will it work if I remove my own dependency declaration and that's it?
>
>
> YEEEEESSSSS!!! Finally.....
>
>
> SOLVED!
>
> -o-o-o-o-o-
>
> public static void main( String[] args )
>     throws Exception
> {
>     cli = CommandLine.populateCommand( new ServerCliOptions(), args );
>     initializeCassandra( cli );
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment().setMaxParallelism( 32768 );
> //    createPollDocPipeline( env );
>     createAdminPipeline( env );
>     env.execute( "schedule.poll" );
> }
>
>
>
> private static void createAdminPipeline( StreamExecutionEnvironment env )
> {
>     try
>     {
>         FlinkKafkaConsumer011<String> adminSource = createKafkaAdminSource();
>         SplitStream<AdminCommand> adminStream =
>             env.addSource( adminSource )
>                .name( "scheduler.admin" )
>                .map( value -> {
>                    try
>                    {
>                        return mapper.readValue( value, AdminCommand.class );
>                    }
>                    catch( Throwable e )
>                    {
>                        LOG.error( "Unexpected error deserializing AdminCommand", e );
>                        return null;
>                    }
>                } )
>                .name( "admin.command.read" )
>                .split( value -> singletonList( value.action() ) );
>
>         SingleOutputStreamOperator<Tuple3<List<String>, String, String>> insertStream =
>             adminStream.select( AdminCommand.CMD_SCHEDULE_INSERT )
>                        .map( new GetPollDeclaration() )
>                        .name( "scheduler.admin.insert" )
>                        .map( new PollDeclarationToTuple3Map() )
>                        .name( "scheduler.pollDeclToTuple3" )
>                        .filter( tuple -> tuple != null );
>
>         SingleOutputStreamOperator<Tuple3<List<String>, String, String>> deleteStream =
>             adminStream.select( AdminCommand.CMD_SCHEDULE_DELETE )
>                        .map( new GetPollDeclaration() )
>                        .name( "scheduler.admin.delete" )
>                        .map( new PollDeclarationToTuple3Map() )
>                        .name( "scheduler.pollDeclToTuple3" )
>                        .filter( tuple -> tuple != null );
>
>         CassandraSink.addSink( insertStream )
>                      .setHost( cli.primaryCassandraHost(), cli.primaryCassandraPort() )
>                      .setQuery( String.format( INSERT_SCHEDULE, cli.cassandraKeyspace ) )
>                      .build();
>
>         CassandraSink.addSink( deleteStream )
>                      .setHost( cli.primaryCassandraHost(), cli.primaryCassandraPort() )
>                      .setQuery( String.format( DELETE_SCHEDULE, cli.cassandraKeyspace ) )
>                      .build();
>     }
>     catch( Throwable e )
>     {
>         String message = "Unable to start Scheduling Admin";
>         LOG.error( message );
>         throw new RuntimeException( message, e );
>     }
> }
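>
> (createKafkaAdminSource() is not shown above. For completeness, a minimal
> sketch of the shape of such a method, assuming the usual imports for
> Properties, SimpleStringSchema and FlinkKafkaConsumer011; the topic name and
> Kafka properties here are placeholders, not the real values:)
>
> private static FlinkKafkaConsumer011<String> createKafkaAdminSource()
> {
>     // Placeholder connection settings; the real values come from the CLI options.
>     Properties props = new Properties();
>     props.setProperty( "bootstrap.servers", "localhost:9092" );
>     props.setProperty( "group.id", "scheduler-admin" );
>     // Messages are plain Strings (serialized JSON), so a simple String schema suffices.
>     return new FlinkKafkaConsumer011<>( "schedule.admin", new SimpleStringSchema(), props );
> }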
>
>
> private static class GetPollDeclaration
>     implements MapFunction<AdminCommand, PollDeclaration>
> {
>     private static final Logger LOG = LoggerFactory.getLogger( GetPollDeclaration.class );
>
>     @Override
>     public PollDeclaration map( AdminCommand command )
>         throws Exception
>     {
>         try
>         {
>             if( command == null )
>             {
>                 return null;
>             }
>             return (PollDeclaration) command.value();
>         }
>         catch( Throwable e )
>         {
>             LOG.error( "Unable to cast command data to PollDeclaration", e );
>             return null;
>         }
>     }
> }
>
>
> private static class PollDeclarationToTuple3Map
>     implements MapFunction<PollDeclaration, Tuple3<List<String>, String, String>>
> {
>     @Override
>     public Tuple3<List<String>, String, String> map( PollDeclaration decl )
>         throws Exception
>     {
>         try
>         {
>             if( decl == null )
>             {
>                 return null;
>             }
>             return new Tuple3<>( singletonList( mapper.writeValueAsString( decl ) ),
>                                  decl.zoneId + ":" + decl.schedule, decl.url );
>         }
>         catch( Throwable e )
>         {
>             LOG.error( "Unable to convert PollDeclaration to Tuple3", e );
>             return null;
>         }
>     }
> }
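>
> (INSERT_SCHEDULE and DELETE_SCHEDULE are not shown above. They are CQL strings
> in which the keyspace is substituted via String.format and the Tuple3 fields
> bind positionally to the ? markers. A hypothetical sketch of the insert query;
> the table and column names here are made up:)
>
> private static final String INSERT_SCHEDULE =
>     "INSERT INTO %s.schedule ( payload, schedule_key, url ) VALUES ( ?, ?, ? );";
>
> // DELETE_SCHEDULE follows the same pattern, with the keyspace substituted as %s.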
>
> Flink Dependencies;
>
> flink         : [
>         [group: "org.apache.flink", name: "flink-core", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-java", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-connector-cassandra_2.11", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-connector-kafka-0.11_2.11", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-queryable-state-runtime_2.11", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-streaming-java_2.11", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-streaming-scala_2.11", version: flinkVersion]
> ],
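>
> (The fix was removing my own explicit Datastax driver declaration from this
> list, since flink-connector-cassandra_2.11 already bundles the driver classes.
> The removed entry is not shown above; it was roughly of this shape, with the
> exact coordinates and version only illustrative:)
>
>         // removed: flink-connector-cassandra_2.11 bundles its own copy of the driver
>         // [group: "com.datastax.cassandra", name: "cassandra-driver-core", version: "3.x"],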
>
>
>
>
>
> On Sun, Feb 18, 2018 at 8:11 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>
>> Hi Niclas,
>>
>> About the second point you mentioned, was the processed message a random
>> one or a fixed one?
>>
>> The default startup mode for FlinkKafkaConsumer is
>> StartupMode.GROUP_OFFSETS, maybe you could try StartupMode.EARLIEST while
>> debugging. Also, before that, you may try fetching the messages with the
>> Kafka console consumer tool to see whether they can be consumed completely.
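>>
>> (For reference, a minimal sketch of overriding the startup mode on the
>> consumer while debugging, assuming a FlinkKafkaConsumer011<String> instance
>> named consumer:)
>>
>>     // Ignore committed group offsets and always read the topic from the
>>     // beginning; the default behaviour is setStartFromGroupOffsets().
>>     consumer.setStartFromEarliest();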
>>
>> Besides, I wonder if you could provide the code for your Flink pipeline.
>> That’ll be helpful.
>>
>> Best,
>> Xingcan
>>
>>
>>
>> On 18 Feb 2018, at 7:52 PM, Niclas Hedhman <nic...@apache.org> wrote:
>>
>>
>> So, the producer is run (at the moment) manually (command-line) one
>> message at a time.
>> Kafka's tooling (different consumer group) shows that a message is added
>> each time.
>>
>> Since my last post, I have also added a UUID as the key, and that didn't
>> make a difference, so you are likely correct about de-dup.
>>
>>
>> There is only a single partition on the topic, so it shouldn't be a
>> partitioning issue.
>>
>> I also noticed;
>> 1. If I send a message while the consumer topology is running (after the
>> first message), that message will only be processed after a restart.
>>
>> 2. If I send many messages while the consumer is running and then do many
>> restarts, only a single one of them gets processed. No idea what happens to
>> the others.
>>
>> I am utterly confused.
>>
>> And digging into the internals is not for the faint-hearted, but
>> kafka.poll() frequently returns with empty records.
>>
>> Will continue debugging that tomorrow...
>>
>>
>> Niclas
>>
>> On Feb 18, 2018 18:50, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>
>>> Hi Niclas,
>>>
>>> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
>>> "feature" is not implemented.
>>> Do you produce into the topic that you want to read or is the data in
>>> the topic static?
>>> If you do not produce in the topic while the consuming application is
>>> running, this might be an issue with the start position of the consumer
>>> [1].
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>
>>> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman <nic...@apache.org>:
>>>
>>>> Hi,
>>>> I am pretty new to Flink, and I like what I see and have started to
>>>> build my first application using it.
>>>> I must be missing something very fundamental. I have a
>>>> FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
>>>> functions and terminated with the standard CassandraSink. I have try..catch
>>>> on all my own maps/filters and the first message in the queue is processed
>>>> after start-up, but any additional messages are ignored, i.e. they never
>>>> reach the first map(). They are swallowed (i.e. consumed but not forwarded).
>>>>
>>>> I suspect that there is some type of de-duplication going on, related to the
>>>> (test) producer of these messages. The producer provides different values for
>>>> each message, but there is no "key" being passed to the KafkaProducer.
>>>>
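>>>> (For reference, on the producer side this boils down to whether the record
>>>> is created with or without a key; both are valid Kafka API. The topic name
>>>> and variables here are placeholders, since the actual test producer is not
>>>> shown:)
>>>>
>>>>     // Without a key:
>>>>     producer.send( new ProducerRecord<>( "schedule.admin", jsonValue ) );
>>>>     // With an explicit key:
>>>>     producer.send( new ProducerRecord<>( "schedule.admin", key, jsonValue ) );
>>>>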
>>>> Is that required? And if so, why? Can I tell Flink or Flink's
>>>> KafkaConsumer to ingest all messages, and not try to de-duplicate them?
>>>>
>>>> Thanks
>>>>
>>>> --
>>>> Niclas Hedhman, Software Developer
>>>> http://zest.apache.org - New Energy for Java
>>>>
>>>
>>>
>>
>
>
> --
> Niclas Hedhman, Software Developer
> http://zest.apache.org - New Energy for Java
>
