Hello, I noticed an issue with bulk inserts through MapReduce in Phoenix
4.4.0.2.3.0.0-2557, using the code outlined below.
Normally, inserting about 25 million rows completes in about 5 minutes; there
are 5 region servers and the Phoenix table has 32 buckets.
But sometimes (maybe after major compactions or region movement?) writes
slow down to 90 minutes. When I truncate the SYSTEM.STATS HBase table, the
inserts get a little faster (60 minutes), but when I truncate both the
SYSTEM.CATALOG and SYSTEM.STATS tables and recreate the Phoenix table
definition(s), the inserts go back to 5 minutes. The workaround of truncating
the SYSTEM tables is not sustainable for long. Can someone help and let me know
if there is a patch available for this? Thanks in advance.
// Driver fragment: configures and runs the job that feeds parsed input lines into Phoenix.
// `conf`, `NAME`, `tableName` and `inputPath` are defined outside this excerpt.
Job job = Job.getInstance(conf, NAME);
// Set the target Phoenix table and the columns
// The column list is passed as a single comma-separated string. Its first four entries
// appear in the same order as parameters 1-4 bound in
// WidgetPagesStatsWritable.write(PreparedStatement).
PhoenixMapReduceUtil.setOutput(job, tableName,
"WEB_ID,WEB_PAGE_LABEL,DEVICE_TYPE," +
"WIDGET_INSTANCE_ID,WIDGET_TYPE,WIDGET_VERSION,WIDGET_CONTEXT," +
"TOTAL_CLICKS,TOTAL_CLICK_VIEWS,TOTAL_HOVER_TIME_MS,TOTAL_TIME_ON_PAGE_MS,TOTAL_VIEWABLE_TIME_MS,"
+
"VIEW_COUNT,USER_SEGMENT,DIM_DATE_KEY,VIEW_DATE,VIEW_DATE_TIMESTAMP,ROW_NUMBER");
FileInputFormat.setInputPaths(job, inputPath);
// The mapper emits (NullWritable, WidgetPagesStatsWritable) pairs; the key carries no data.
job.setMapperClass(WidgetPhoenixMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(WidgetPagesStatsWritable.class);
job.setOutputFormatClass(PhoenixOutputFormat.class);
TableMapReduceUtil.addDependencyJars(job);
// Zero reduce tasks: this is a map-only job.
job.setNumReduceTasks(0);
// NOTE(review): the boolean returned by waitForCompletion is discarded here, so this
// fragment does not react to a failed job.
job.waitForCompletion(true);
/**
 * Mapper that parses each input line into a {@code PagesSegmentWidget}, copies its fields
 * into a {@code WidgetPagesStats}, and emits that wrapped in a
 * {@code WidgetPagesStatsWritable} under a {@code NullWritable} key.
 *
 * <p>Lines for which the parser returns {@code null} are skipped. Lines that throw while
 * being parsed or converted are skipped as well (best effort, as before), but are now also
 * counted in the job counter {@code WidgetPhoenixMapper / SKIPPED_RECORDS} so that dropped
 * input is visible in the job report.
 *
 * <p>Failures of {@code context.write} are no longer swallowed: the write happens outside
 * the catch-all block, so an {@code IOException} or {@code InterruptedException} from the
 * output path fails the task instead of silently losing the row.
 */
public static class WidgetPhoenixMapper
        extends Mapper<LongWritable, Text, NullWritable, WidgetPagesStatsWritable> {

    /**
     * Converts one input line into at most one output record.
     *
     * @param longWritable input key supplied by the input format (not used here)
     * @param text         the raw input line
     * @param context      task context used for configuration, counters and output
     * @throws IOException          if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable longWritable, Text text, Context context)
            throws IOException, InterruptedException {
        // Kept from the original: these may be used by the setters elided below.
        Configuration conf = context.getConfiguration();
        String rundateString = conf.get("rundate");
        // NOTE(review): a parser is created for every record; if
        // PagesSegmentWidgetLineParser is stateless it could be a field instead - confirm.
        PagesSegmentWidgetLineParser parser = new PagesSegmentWidgetLineParser();

        // Stays null when the line is skipped (parser returned null or conversion threw).
        WidgetPagesStatsWritable outgoing = null;
        try {
            PagesSegmentWidget pagesSegmentWidget = parser.parse(text.toString());
            if (pagesSegmentWidget != null) {
                WidgetPagesStatsWritable widgetPagesStatsWritable = new WidgetPagesStatsWritable();
                WidgetPagesStats widgetPagesStats = new WidgetPagesStats();
                widgetPagesStats.setWebId(pagesSegmentWidget.getWebId());
                widgetPagesStats.setWebPageLabel(pagesSegmentWidget.getWebPageLabel());
                widgetPagesStats.setWidgetInstanceId(pagesSegmentWidget.getWidgetInstanceId());
…..
                widgetPagesStatsWritable.setWidgetPagesStats(widgetPagesStats);
                outgoing = widgetPagesStatsWritable;
            }
        } catch (Exception e) {
            // Best effort: a bad line must not fail the whole task, but it is recorded.
            e.printStackTrace();
            context.getCounter("WidgetPhoenixMapper", "SKIPPED_RECORDS").increment(1);
        }

        // Deliberately outside the try block so output failures propagate to the framework.
        if (outgoing != null) {
            context.write(NullWritable.get(), outgoing);
        }
    }
}
/**
 * Value object holding the per-row fields that WidgetPhoenixMapper fills in and that
 * WidgetPagesStatsWritable reads and writes. Only some fields are shown in this excerpt;
 * the accessors used elsewhere (setWebId, getWebId, ...) are elided here.
 */
public final class WidgetPagesStats {
private String webId;
private String webPageLabel;
private long widgetInstanceId;
private String widgetType;
// (remaining fields and accessors elided in the original post)
…
// equals, hashCode and toString are overridden; their bodies are elided in the original post.
@Override
public boolean equals(Object o) {
..
}
@Override
public int hashCode() {
..
}
@Override
public String toString() {
return "WidgetPhoenix{“….
'}';
}
}
/**
 * Wraps a {@code WidgetPagesStats} so it can be passed through Hadoop ({@code Writable})
 * and bound to / read from JDBC statements ({@code DBWritable}).
 *
 * <p>Fixes relative to the original version:
 * <ul>
 *   <li>The binary form is now symmetric. Strings are written with {@code writeUTF} and
 *       read with {@code readUTF}. Previously they were written with {@code writeBytes}
 *       (no length prefix, no terminator) and read with {@code readLine}, so the fields
 *       could not be separated again when reading.</li>
 *   <li>Both {@code readFields} methods create the wrapped {@code WidgetPagesStats} when it
 *       is still {@code null}, instead of throwing {@code NullPointerException} on an
 *       instance created through the no-argument constructor.</li>
 *   <li>The commented-out {@code scn} statement is commented out on both of its lines; the
 *       second line used to be live code referencing an undefined variable.</li>
 * </ul>
 */
public class WidgetPagesStatsWritable implements DBWritable, Writable {
    private WidgetPagesStats widgetPagesStats;

    /**
     * Reads the fields in exactly the order and encoding used by {@link #write(DataOutput)}.
     *
     * @param input stream positioned at the start of a record produced by {@code write}
     * @throws IOException if the stream ends early or cannot be read
     */
    @Override
    public void readFields(DataInput input) throws IOException {
        WidgetPagesStats target = ensureStats();
        target.setWebId(input.readUTF());
        target.setWebPageLabel(input.readUTF());
        target.setWidgetInstanceId(input.readLong());
        target.setWidgetType(input.readUTF());
        // Remaining fields (elided in the original post) must mirror write(DataOutput):
        // readUTF for every writeUTF, readLong for every writeLong, in the same order.
…
    }

    /**
     * Writes the fields in a self-delimiting form that {@link #readFields(DataInput)} can
     * read back. {@code writeUTF} stores a two-byte length before the string, so a single
     * string is limited to 65535 encoded bytes.
     *
     * @param output destination stream
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeUTF(widgetPagesStats.getWebId());
        output.writeUTF(widgetPagesStats.getWebPageLabel());
        output.writeLong(widgetPagesStats.getWidgetInstanceId());
        output.writeUTF(widgetPagesStats.getWidgetType());
        // Remaining fields (elided in the original post) must use the same pairing as above.
..
    }

    /**
     * Populates the wrapped object from the current row of {@code rs}, by column name.
     *
     * @param rs result set positioned on the row to read
     * @throws SQLException if a column cannot be read
     */
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        WidgetPagesStats target = ensureStats();
        target.setWebId(rs.getString("WEB_ID"));
        target.setWebPageLabel(rs.getString("WEB_PAGE_LABEL"));
        target.setWidgetInstanceId(rs.getLong("WIDGET_INSTANCE_ID"));
        target.setWidgetType(rs.getString("WIDGET_TYPE"));
…
    }

    /**
     * Binds the wrapped object's fields to the statement's positional parameters.
     *
     * @param pstmt statement whose parameters are set; it is not executed here
     * @throws SQLException if a parameter cannot be set
     */
    @Override
    public void write(PreparedStatement pstmt) throws SQLException {
        // NOTE(review): these two locals are only referenced by the disabled scn code
        // below (and possibly by the elided part); the cast fails if the connection is
        // not a PhoenixConnection.
        Connection connection = pstmt.getConnection();
        PhoenixConnection phoenixConnection = (PhoenixConnection) connection;
        //connection.getClientInfo().setProperty("scn",
        //Long.toString(widgetPhoenix.getViewDateTimestamp()));
        pstmt.setString(1, widgetPagesStats.getWebId());
        pstmt.setString(2, widgetPagesStats.getWebPageLabel());
        pstmt.setString(3, widgetPagesStats.getDeviceType());
        pstmt.setLong(4, widgetPagesStats.getWidgetInstanceId());
…
    }

    /** Returns the wrapped value object, or {@code null} if none has been set or read. */
    public WidgetPagesStats getWidgetPagesStats() {
        return widgetPagesStats;
    }

    /** Replaces the wrapped value object. */
    public void setWidgetPagesStats(WidgetPagesStats widgetPagesStats) {
        this.widgetPagesStats = widgetPagesStats;
    }

    /** Returns the wrapped object, creating an empty one first if none exists yet. */
    private WidgetPagesStats ensureStats() {
        if (widgetPagesStats == null) {
            widgetPagesStats = new WidgetPagesStats();
        }
        return widgetPagesStats;
    }
}
----------------------------------------------------------------------
This message and any attachments are intended only for the use of the addressee
and may contain information that is privileged and confidential. If the reader
of the message is not the intended recipient or an authorized representative of
the intended recipient, you are hereby notified that any dissemination of this
communication is strictly prohibited. If you have received this communication
in error, notify the sender immediately by return email and delete the message
and any attachments from your system.