Hello,

I have been trying to send JSON string data to AWS Firehose using PyFlink, but
because of serialization issues the JSON input is not being parsed properly.

This is the logic I am using:

redis_output_str = """
    {
        "entities": [
            {
                "metadata": {
                    "entityType": ["KMAP"]
                },
                "entityName": "artificial intelligence",
                "timestamp": 1747647770938
            }
        ],
        "is_entity_present": true,
        "user_id": 935884520,
        "timestamp": 1747647770938,
        "status": "success"
    }"""
    redis_output = json.loads(redis_output_str)


    entities_list = []
    for entity in redis_output["entities"]:
        metadata_row = Row(entity["metadata"]["entityType"])
        entity_row = Row(metadata_row, entity["entityName"], 
entity["timestamp"])
        entities_list.append(entity_row)


    firehose_input = Row(
        entities_list,
        redis_output["is_entity_present"],
        redis_output["user_id"],
        redis_output["timestamp"],
        redis_output["status"],
    )

## below not working so changed with above

    #     metadata_row = Row(
    #         entityType=entity["metadata"]["entityType"]
    #     )  # entityType: [str]
    #     print("metadata_row")
    #     print(metadata_row)
    #     print(type(metadata_row.entityType))
    #     print(type(metadata_row.entityType[0]))
    #     entity_row = Row(metadata_row, entity["entityName"], 
entity["timestamp"])
    #     print("entity_row")
    #     print(entity_row)
    #     entities_list.append(entity_row)
    #     print("entities_list")
    #     print(entities_list)

    # firehose_input = Row(
    #     entities=entities_list,
    #     is_entity_present=redis_output["is_entity_present"],
    #     user_id=redis_output["user_id"],
    #     timestamp=redis_output["timestamp"],
    #     status=redis_output["status"],
    # )
    print("firehose_input")
    print(firehose_input)
    # print(firehose_input.entities[0].metadata.entityType)
    # print(type(firehose_input.entities[0].metadata.entityType))
    # print(type(firehose_input.entities[0].metadata.entityType[0]))

    # Type schema for the outgoing record. The field order declared here MUST
    # match the positional order used when the Row objects are created, and the
    # overall structure must match the JSON to be produced.
    metadata_type_info_for_firehose = Types.ROW_NAMED(
        ["entityType"], [Types.LIST(Types.STRING())]
    )

    entity_type_info_for_firehose = Types.ROW_NAMED(
        ["metadata", "entityName", "timestamp"],
        [metadata_type_info_for_firehose, Types.STRING(), Types.LONG()],
    )

    # NOTE(review): if the JSON serializer chokes on the list field, try
    # Types.OBJECT_ARRAY(entity_type_info_for_firehose) instead of
    # Types.LIST(...) — JsonRowSerializationSchema tends to handle array type
    # info more reliably. TODO confirm against the PyFlink typeinfo docs.
    firehose_output_row_type = Types.ROW_NAMED(
        ["entities", "is_entity_present", "user_id", "timestamp", "status"],
        [
            Types.LIST(entity_type_info_for_firehose),
            Types.BOOLEAN(),
            Types.LONG(),
            Types.LONG(),
            Types.STRING(),
        ],
    )

    # Types.ROW_NAMED already returns a RowTypeInfo (it exposes
    # get_java_type_info()), so re-wrapping it in RowTypeInfo(...) is
    # redundant; keep the downstream name as a plain alias.
    firehose_output_row_type_info = firehose_output_row_type

    print("firehose_output_row_type_info")
    print(firehose_output_row_type_info)

    # JSON serializer for the Firehose sink, driven by the row type info
    # declared above.
    schema_builder = JsonRowSerializationSchema.Builder()
    schema_builder = schema_builder.with_type_info(firehose_output_row_type_info)
    firehose_serialization_schema = schema_builder.build()

    # AWS client configuration for the Firehose sink.
    sink_properties = {"aws.region": "us-east-1"}

    # Kinesis Firehose sink: JSON-serialized rows, batched writes, best-effort
    # delivery (fail_on_error=False drops/logs bad records instead of failing
    # the job).
    kdf_sink = (
        KinesisFirehoseSink.builder()
        .set_firehose_client_properties(sink_properties)
        .set_serialization_schema(firehose_serialization_schema)
        .set_delivery_stream_name(firehose_stream_name)
        .set_fail_on_error(False)
        .set_max_batch_size(200)  # records per Firehose batch
        .set_max_in_flight_requests(100)
        .set_max_buffered_requests(20000)
        .set_max_batch_size_in_bytes(2 * 1024 * 1024)  # 2 MiB per batch
        .set_max_time_in_buffer_ms(200)
        .set_max_record_size_in_bytes(1024 * 1024)  # 1 MiB per record
        .build()
    )

    try:
        env.from_collection(
            collection=[firehose_input], type_info=firehose_output_row_type_info
        ).sink_to(kdf_sink)
        env.execute(job_name)


However, I have been running into many issues with JsonRowSerializationSchema.
Do you have a possible solution or suggestion to fix these issues so I can
move ahead with it?



Looking forward to getting a response at your earliest convenience.

Regards,
Sanchay

________________________________

If you are not the intended recipient or have received this message in error, 
please notify the sender and permanently delete this message and any 
attachments.

Reply via email to