So my byte strings are Kafka-generated protobuf messages stored in Hadoop sequence files, compressed as ".gz" files. I am now reading them and trying to create a Spark RDD to run analytics on them. When I print the values I get, they are raw bytes and not the "usual" strings.
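To make the point in Jisi's quoted reply concrete, here is a minimal Python 3 sketch (the snippet quoted below is Python 2). It does not use the protobuf library: `read_varint`, `first_tag`, and `decode_varint_fields` are hypothetical helpers written only for this illustration, and the input is the six sample bytes reported further down in the quoted thread, not my own data. It shows that joining `hex()` output produces ASCII text whose bytes a parser would read as unrelated tags, whereas `bytes(...)` on the bytearray keeps the wire data intact.

```python
def read_varint(buf, pos):
    """Decode one base-128 varint starting at buf[pos]; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: this was the last byte of the varint
            return result, pos
        shift += 7


def first_tag(buf):
    """Return (field_number, wire_type) of the first tag in the buffer."""
    tag, _ = read_varint(buf, 0)
    return tag >> 3, tag & 0x07


def decode_varint_fields(buf):
    """Decode a message consisting only of varint fields into (field, value) pairs."""
    fields = []
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type != 0:
            # This illustration handles varints only; anything else is rejected.
            raise ValueError("unsupported wire type %d at field %d"
                             % (wire_type, field_number))
        value, pos = read_varint(buf, pos)
        fields.append((field_number, value))
    return fields


# Sample bytes taken from the quoted thread (what the C++ server serialized).
raw_value = bytearray([0x08, 0x02, 0x10, 0x01, 0x18, 0x00])

# The same bytes, unchanged, as an immutable bytes object.
wire_bytes = bytes(raw_value)

# What the quoted snippet builds instead: ASCII text such as "0x80x20x10...".
hex_text = ''.join(hex(b) for b in raw_value).encode('ascii')

print(decode_varint_fields(wire_bytes))  # [(1, 2), (2, 1), (3, 0)]
print(first_tag(wire_bytes))             # (1, 0)
print(first_tag(hex_text))               # (6, 0): the tag is read from the character '0'
print(len(wire_bytes), len(hex_text))    # the hex text is also much longer
```

With the real library the equivalent call would presumably be `mymessageobj.ParseFromString(bytes(firstvaluebytearray))`. That is an assumption about the fix, and it does not by itself explain the 'Invalid wire tag' error, which may mean the stored value is not a bare serialized message.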
On Sat, Jul 1, 2017 at 3:24 AM, Jisi Liu <[email protected]> wrote:
> ParseFromString only takes the binary string that SerializeToString()
> generates. I don't know how you get the input bytes, but you probably would
> have to decode it first.
>
> On Fri, Jun 30, 2017 at 6:45 AM Tanmay Saha <[email protected]> wrote:
>
>> This is what I have done.
>>
>> mymessageobj = mymessageproto.MyMessage()
>> myrdd = mysparkcontext.sequenceFile(filename1,
>>     'org.apache.hadoop.io.Text', 'org.apache.hadoop.io.BytesWritable')
>> firstvaluebytearray = myrdd.first()[1]
>>
>> myhexstring = ''.join(hex(eachvalue) for eachvalue in firstvaluebytearray)
>> print mymessageobj.ParseFromString(myhexstring)
>>
>> But I get the error '*Unexpected end-group tag.*'
>>
>> When I try to send a byte string instead of a hexstring, it throws an
>> error stating '*Invalid wire tag.*'
>>
>> Any help would be appreciated.
>> Thanks,
>> Tanmay.
>>
>> On Wednesday, July 15, 2015 at 3:02:12 AM UTC+5:30, Krystian Sakowski
>> wrote:
>>>
>>> Finally I have found buggy code.
>>> I had an error in this line in C++ server:
>>>
>>> memcpy((void *)reply.data(), &response_string, response_string.length());
>>>
>>> Instead of the buggy code above it should be:
>>>
>>> memcpy((void *)reply.data(), (void *)response_string.data(),
>>>        response_string.length());
>>>
>>> I understood how to convert C++ string into ZMQ string because I've
>>> found this function on the web:
>>>
>>> // Convert string to 0MQ string and send to socket
>>> static bool s_send (zmq::socket_t & socket, const std::string & string) {
>>>
>>>     zmq::message_t message(string.size());
>>>     memcpy (message.data(), string.data(), string.size());
>>>
>>>     bool rc = socket.send (message);
>>>     return (rc);
>>> }
>>>
>>> Below is the link to *zhelpers.hpp* header file which contains the
>>> function pasted above and many other useful functions for C++ ZMQ based
>>> applications:
>>> https://github.com/imatix/zguide/blob/master/examples/C%2B%2B/zhelpers.hpp
>>>
>>> On Tuesday, July 14, 2015 at 7:46:00 PM UTC+2, [email protected] wrote:
>>>>
>>>> The length is the same on each side.
>>>> However, in binary it is something else on each side.
>>>>
>>>> First of all I noted in python client that first byte of received
>>>> message sporadically changes,
>>>> Secondly
>>>>
>>>> This is serialized protobuf message to string in C++ server application
>>>> 0x08 0x02 0x10 0x01 0x18 0x00
>>>> This is received packet in ZMQ client written in python
>>>> 0xe4 0x1f 0x02 0x00 0x90 0xf6
>>>>
>>>> So it is totally different..
>>>>
>>>> On Tuesday, July 14, 2015 at 1:21:19 AM UTC+2, Ilia Mirkin wrote:
>>>>>
>>>>> Is what you're sending the same thing as what you're receiving? Do the
>>>>> lengths match up? Pretty easy to buggily truncate at the first null
>>>>> byte...
>>>>>
>>>>> On Mon, Jul 13, 2015 at 5:22 PM, <[email protected]> wrote:
>>>>> > I'm developing zmq/protobuf application and I have a problem with
>>>>> > deserialization of messages sent from C++ to python.
>>>>> > I easily handle messages from python to C++ however in the other
>>>>> > direction I have a problem.
>>>>> >
>>>>> > Protobuf library in python client application complains that it
>>>>> > detected 'Unexpected end-group tag.'
>>>>> >
>>>>> > I presume there is a problem between C++ serialization and python
>>>>> > deserialization. I'm wondering if there is some problem with null
>>>>> > terminator in C/C++ :(.
>>>>> >
>>>>> > This is my C++ serialization code..
>>>>> >
>>>>> > // Test Code.
>>>>> > // Try to send some 'demo' response back
>>>>> > RPiProtocol::Message response;
>>>>> > std::string response_string;
>>>>> > response.set_type(RPiProtocol::Message::RESPONSE);
>>>>> > response.set_command(RPiProtocol::Message::GET_SYS_INFO);
>>>>> > response.set_version(0);
>>>>> >
>>>>> > // Serialize ZMQ message to string.
>>>>> > if (response.SerializeToString(&response_string))
>>>>> > {
>>>>> >     // Debug prints.
>>>>> >     printf("%#010x\n", response_string.c_str());
>>>>> >     cout << "Response string length= " << response_string.length() << endl;
>>>>> >
>>>>> >     // Send response message back to the client.
>>>>> >     zmq::message_t reply(response_string.length());
>>>>> >     memcpy((void *)reply.data(), &response_string,
>>>>> >            response_string.length());
>>>>> >     socket.send(reply);
>>>>> > }
>>>>> >
>>>>> > This is my python deserialization code..
>>>>> >
>>>>> > # Get the reply.
>>>>> > message = socket.recv()
>>>>> > print len(message)
>>>>> > print ':'.join(x.encode('hex') for x in str(message))
>>>>> > response = rpi_protocol_pb2.Message()
>>>>> >
>>>>> > # This line fails
>>>>> > response.ParseFromString(message)
>>>>> >
>>>>> > I debugged that deserialization fails in this function
>>>>> > \google\protobuf\internal\python_message.py
>>>>> >
>>>>> > def InternalParse(self, buffer, pos, end):
>>>>> >   self._Modified()
>>>>> >   field_dict = self._fields
>>>>> >   unknown_field_list = self._unknown_fields
>>>>> >   while pos != end:
>>>>> >     (tag_bytes, new_pos) = local_ReadTag(buffer, pos)
>>>>> >     field_decoder, field_desc = decoders_by_tag.get(tag_bytes, (None, None))
>>>>> >     if field_decoder is None:
>>>>> >       value_start_pos = new_pos
>>>>> >       new_pos = local_SkipField(buffer, new_pos, end, tag_bytes)
>>>>> >       if new_pos == -1:  # HERE I HAVE -1 !!!
>>>>> >         return pos
>>>>> >       if not unknown_field_list:
>>>>> >         unknown_field_list = self._unknown_fields = []
>>>>> >       unknown_field_list.append((tag_bytes, buffer[value_start_pos:new_pos]))
>>>>> >       pos = new_pos
>>>>> >     else:
>>>>> >       pos = field_decoder(buffer, new_pos, end, self, field_dict)
>>>>> >       if field_desc:
>>>>> >         self._UpdateOneofState(field_desc)
>>>>> >   return pos
>>>>> > cls._InternalParse = InternalParse
>>>>> >
>>>>> > C++ (ZMQ SERVER - REP): http://pastebin.com/ACaXk8Vz
>>>>> >
>>>>> > PYTHON (ZMQ CLIENT - REQ): http://pastebin.com/X9DR8ue9
>>>>> >
>>>>> > Could you help me with enabling my application?
>>>>> >
>>>>> > --
>>>>> > You received this message because you are subscribed to the Google Groups
>>>>> > "Protocol Buffers" group.
>>>>> > To unsubscribe from this group and stop receiving emails from it, send an
>>>>> > email to [email protected].
>>>>> > To post to this group, send email to [email protected].
>>>>> > Visit this group at http://groups.google.com/group/protobuf.
>>>>> > For more options, visit https://groups.google.com/d/optout.
--
*With Due Regards,*
*Tanmay Saha*
