On the sideline I also validated the Json format

{"schema": { "type": "struct", "fields": [ { "field": "rowkey", "type":
"string", "optional": true}],"optional": false,"name": "BQ"}, "payload":
{"rowkey": "c0224abd-a6c4-4743-ac01-55e7b6980062"}}

[image: image.png]

Thanks


LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 17 Mar 2021 at 09:56, Mich Talebzadeh <[email protected]>
wrote:

>
> This is what is termed as fun and game.
>
> Trying to write a single column (for the sake of test) in this case to
> BigQuery from Kafka. I am sending the schema and payload as per docs
>
> This is message sent that I can get it from console
>
> $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server rhes75:9092 
> --from-beginning --topic md --property print.key=true
>
> Note that it also prints kafka key
>
>
> 9485818a-e6c5-434d-9096-29c6e3f55148    {"schema": { "type": "struct", 
> "fields": [ { "field": "rowkey", "type": "string", "optional": 
> true}],"optional": false,"name": "BQ"}, "payload": {"rowkey": 
> "9485818a-e6c5-434d-9096-29c6e3f55148"}}
>
> The error thrown is
>
>
> [2021-03-17 09:29:16,655] ERROR WorkerSinkTask{id=bigquery-sink-0} Task threw 
> an uncaught and unrecoverable exception. Task is being killed and will not 
> recover until manually restarted. Error: Top-level Kafka Connect schema must 
> be of type 'struct' (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
>
> This is the the standalone properties file
>
>
> bootstrap.servers=rhes75:9092
> key.converter=org.apache.kafka.connect.storage.StringConverter
> #key.converter=org.apache.kafka.connect.json.JsonConverter
> key.converter.schemas.enable=true
> value.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter.schemas.enable=true
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
> offset.storage.file.filename=/tmp/connect_bq.offsetsoffset.flush.interval.ms=10000
>
> and this is the sink properties file
>
>
> name=bigquery-sink
> connector.type=bigquery-connector
> connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> defaultDataset=test
> project=project_name
> topics=md
> autoCreateTables=false
> gcsBucketName=tmp_storage_bucket
> queueSize=-1
> bigQueryRetry=0
> bigQueryRetryWait=1000
> bigQueryMessageTimePartitioning=false
> bigQueryPartitionDecorator=true
> timePartitioningType=DAY
> keySource=FILE
> keyfile=xxx.json
> sanitizeTopics=false
> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
> threadPoolSize=10
> allBQFieldsNullable=false
> avroDataCacheSize=100
> batchLoadIntervalSec=120
> convertDoubleSpecialValues=false
> enableBatchLoad=false
> upsertEnabled=false
> deleteEnabled=false
> mergeIntervalMs=60000
> mergeRecordsThreshold=-1
> autoCreateBucket=true
> allowNewBigQueryFields=false
> allowBigQueryRequiredFieldRelaxation=false
> allowSchemaUnionization=false
> kafkaDataFieldName=null
> kafkaKeyFieldName=null
>
> I am sure someone should be able to spot the error here.
>
>
> Many thanks
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>

Reply via email to