The proper way to initialize the gson field is in setup():

public class HbaseTableUpdate<T> extends AbstractHBasePutOutputOperator<Tenant> { // Serializable not needed
...
  private transient Gson gson;
...
  @Override
  public void setup(...) {
    gson = new Gson();
  }
...
}
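
As a rough, self-contained illustration of this pattern (not Apex code): the sketch below uses Java's built-in serialization as a stand-in for Kryo, a hypothetical JsonHelper class in place of Gson, and a hypothetical no-argument setup() in place of the operator lifecycle call. Under those assumptions it shows that a transient field is not restored along with the rest of the object's state and has to be recreated after the object is restored. How Kryo instantiates objects depends on its configuration, so this does not claim to reproduce Kryo's behaviour.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SetupInitDemo {
    // Hypothetical stand-in for Gson: a stateless helper that is not serializable.
    static class JsonHelper {
        String toJson(String value) {
            return "\"" + value + "\"";
        }
    }

    // Hypothetical stand-in for an operator with checkpointed state.
    static class DemoOperator implements Serializable {
        private static final long serialVersionUID = 1L;

        private int batchSize = 1000;        // regular field: part of the saved state
        private transient JsonHelper helper; // transient field: skipped when saving

        // Hypothetical lifecycle hook, called after the object is created or restored.
        void setup() {
            helper = new JsonHelper();
        }

        boolean hasHelper() {
            return helper != null;
        }

        int getBatchSize() {
            return batchSize;
        }

        String process(String value) {
            return helper.toJson(value);
        }
    }

    // Serializes the operator to bytes and reads it back, like a save/restore cycle.
    static DemoOperator roundTrip(DemoOperator op) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(op);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoOperator) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoOperator original = new DemoOperator();
        original.setup();

        DemoOperator restored = roundTrip(original);
        System.out.println("helper present after restore: " + restored.hasHelper());

        restored.setup(); // recreate the transient helper before processing
        System.out.println(restored.process("tenant"));
    }
}
```

In this sketch the restored object keeps batchSize but loses the helper until setup() is called again, which is why a lifecycle method is a convenient place to create such objects.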

Vlad


On 10/20/16 22:19, Tushar Gosavi wrote:
Hi Jaspal,

You can change the following line

Gson gson = new Gson();

to

transient Gson gson = new Gson();

This should work.

Operators are serialised using Kryo during initial deployment as well
as during checkpoints. Kryo is throwing an exception while
trying to serialize the gson field in your operator, as Gson does not
seem to be serializable with Kryo. A Gson instance does not
maintain any useful state, so you can safely make it transient and
initialise it in the constructor of the operator.
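
To illustrate the point about transient fields being skipped by serialization, here is a minimal sketch. It assumes Java's built-in serialization instead of Kryo (so the exception type differs from the one in the trace), and JsonHelper is a hypothetical stand-in for Gson. The two operator classes are made up for the comparison.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {
    // Hypothetical stand-in for Gson: a helper that is not serializable.
    static class JsonHelper {
        String toJson(String value) {
            return "\"" + value + "\"";
        }
    }

    // Mirrors the failing operator: the helper is part of the serialized state.
    static class OperatorWithPlainField implements Serializable {
        private static final long serialVersionUID = 1L;
        JsonHelper helper = new JsonHelper();
    }

    // Mirrors the suggested change: the helper is transient and therefore skipped.
    static class OperatorWithTransientField implements Serializable {
        private static final long serialVersionUID = 1L;
        transient JsonHelper helper = new JsonHelper();
    }

    // Returns true if the object can be written with Java serialization.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // A non-transient field refers to an object that cannot be serialized.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("plain field serializable: "
            + canSerialize(new OperatorWithPlainField()));
        System.out.println("transient field serializable: "
            + canSerialize(new OperatorWithTransientField()));
    }
}
```

Under these assumptions, only the class with the transient helper can be written out, because the serializer never visits that field.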

- Tushar.



On Fri, Oct 21, 2016 at 9:52 AM, Jaspal Singh
<jaspal.singh1...@gmail.com> wrote:
Thomas,

Can you please show how Gson has to be made a transient member? Below
is our operator code.

public class HbaseTableUpdate<T> extends AbstractHBasePutOutputOperator<Tenant> implements Serializable {
  private static final transient Logger logger = LoggerFactory.getLogger(HbaseTableUpdate.class);
  public static final int DEFAULT_BATCH_SIZE = 1000;
  private int batchSize = DEFAULT_BATCH_SIZE;
  protected int unCommittedSize = 0;
  public static final byte[] AUDIT_CF_LOG = "cf".getBytes();
  public static final byte[] AUDIT_MESSAGE = "msg".getBytes();
  public static final byte[] AUDIT_STATUS = "sts".getBytes();
  public static final byte[] AUDIT_RETRY_CNT = "rtc".getBytes();
  //Configuration conf = HBaseConfiguration.create();
  //AuditLogDAO auditLogDAO = new AuditLogDAO(conf);
  Gson gson = new Gson();
  //ObjectMapper mapper = new ObjectMapper();

  public HbaseTableUpdate() {
      store = new HBaseStore();
      store.setTableName("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/tenant");
  }



  @Override
  public void processTuple(Tenant tenant) {
      HTable table = store.getTable();
      Put put = operationPut(tenant);
      try {
          table.put(put);
          if( ++unCommittedSize >= batchSize )
          {
              table.flushCommits();
              unCommittedSize = 0;
          }
      } catch (RetriesExhaustedWithDetailsException e) {
          logger.error("Could not output tuple", e);
          DTThrowable.rethrow(e);
      } catch (InterruptedIOException e) {
          logger.error("Could not output tuple", e);
          DTThrowable.rethrow(e);
      }
      catch (IOException io) {

      }

  }

  @Override
  public void endWindow()
  {
      try
      {
          if( unCommittedSize > 0 ) {
              store.getTable().flushCommits();
              unCommittedSize = 0;
          }
      }
      catch (RetriesExhaustedWithDetailsException e) {
          logger.error("Could not output tuple", e);
          DTThrowable.rethrow(e);
      } catch (InterruptedIOException e) {
          logger.error("Could not output tuple", e);
          DTThrowable.rethrow(e);
      }
      catch (IOException io) {

      }
  }

  @Override
  public Put operationPut(Tenant tenant) {
      String rowKey = tenant.getVolumeName() + System.currentTimeMillis();
      AuditLog log;
      String msgjson = gson.toJson(tenant);

      if (StringUtils.isNotEmpty(tenant.getGl())) {
          log = new AuditLog(msgjson, Status.VALIDATION_SUCCESS.toString());
      } else {
          log = new AuditLog(msgjson, Status.VALIDATION_FAILED.toString());
      }

      Put p = new Put(Bytes.toBytes(rowKey));
      p.addColumn(AUDIT_CF_LOG, AUDIT_MESSAGE, Bytes.toBytes(log.getMessage()));
      p.addColumn(AUDIT_CF_LOG, AUDIT_STATUS, Bytes.toBytes(log.getStatus()));
      p.addColumn(AUDIT_CF_LOG, AUDIT_RETRY_CNT, Bytes.toBytes(log.getUpdateCount()));
      return p;
  }



Thanks!!



On Thu, Oct 20, 2016 at 4:53 PM, Thomas Weise <t...@apache.org> wrote:
Please make sure that your Gson parser is a transient member of the
operator.

On Thu, Oct 20, 2016 at 2:33 PM, Bandaru, Srinivas
<srinivas.band...@optum.com> wrote:
Hi,

We are building an Apex (DataTorrent) application to write into HBase.
We are getting the error below while launching the application. Has anyone
had a similar issue?



2016-10-20 16:05:50,675 INFO org.apache.hadoop.service.AbstractService: Service com.datatorrent.stram.StreamingAppMasterService failed in state INITED; cause: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 95
Serialization trace:
systemMap (com.google.gson.internal.ParameterizedTypeHandlerMap)
instanceCreators (com.google.gson.internal.ConstructorConstructor)
constructorConstructor (com.google.gson.Gson)
gson (com.example.datatorrent.HbaseTableUpdate)
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 95
Serialization trace:
systemMap (com.google.gson.internal.ParameterizedTypeHandlerMap)
instanceCreators (com.google.gson.internal.ConstructorConstructor)
constructorConstructor (com.google.gson.Gson)
gson (com.example.datatorrent.HbaseTableUpdate)





Thanks,

Srinivas Bandaru



