[ https://issues.apache.org/jira/browse/SPARK-26703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-26703.
----------------------------------
    Resolution: Duplicate

> Hive record writer will always depend on the parquet-1.6 writer; we should fix it
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-26703
>                 URL: https://issues.apache.org/jira/browse/SPARK-26703
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.2, 2.4.0
>            Reporter: zhoukang
>            Priority: Major
>
> Currently, when we use an insert-into-Hive-table related command, the Parquet files generated will always be written by the parquet 1.6 writer. The reason is below:
> 1. We rely on hive-exec's HiveFileFormatUtils to get the record writer:
> {code:scala}
> private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
>   jobConf,
>   tableDesc,
>   serializer.getSerializedClass,
>   fileSinkConf,
>   new Path(path),
>   Reporter.NULL)
> {code}
> 2. We will call:
> {code:java}
> public static RecordWriter getHiveRecordWriter(JobConf jc,
>     TableDesc tableInfo, Class<? extends Writable> outputClass,
>     FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
>   HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
>   try {
>     boolean isCompressed = conf.getCompressed();
>     JobConf jc_output = jc;
>     if (isCompressed) {
>       jc_output = new JobConf(jc);
>       String codecStr = conf.getCompressCodec();
>       if (codecStr != null && !codecStr.trim().equals("")) {
>         Class<? extends CompressionCodec> codec =
>             (Class<? extends CompressionCodec>) JavaUtils.loadClass(codecStr);
>         FileOutputFormat.setOutputCompressorClass(jc_output, codec);
>       }
>       String type = conf.getCompressType();
>       if (type != null && !type.trim().equals("")) {
>         CompressionType style = CompressionType.valueOf(type);
>         SequenceFileOutputFormat.setOutputCompressionType(jc, style);
>       }
>     }
>     return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
>         isCompressed, tableInfo.getProperties(), outPath, reporter);
>   } catch (Exception e) {
>     throw new HiveException(e);
>   }
> }
>
> public static RecordWriter getRecordWriter(JobConf jc,
>     OutputFormat<?, ?> outputFormat,
>     Class<? extends Writable> valueClass, boolean isCompressed,
>     Properties tableProp, Path outPath, Reporter reporter
>     ) throws IOException, HiveException {
>   if (!(outputFormat instanceof HiveOutputFormat)) {
>     outputFormat = new HivePassThroughOutputFormat(outputFormat);
>   }
>   return ((HiveOutputFormat) outputFormat).getHiveRecordWriter(
>       jc, outPath, valueClass, isCompressed, tableProp, reporter);
> }
> {code}
> 3. Then, in MapredParquetOutputFormat:
> {code:java}
> public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
>     final JobConf jobConf,
>     final Path finalOutPath,
>     final Class<? extends Writable> valueClass,
>     final boolean isCompressed,
>     final Properties tableProperties,
>     final Progressable progress) throws IOException {
>   LOG.info("creating new record writer..."
>       + this);
>   final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
>   final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
>   List<String> columnNames;
>   List<TypeInfo> columnTypes;
>   if (columnNameProperty.length() == 0) {
>     columnNames = new ArrayList<String>();
>   } else {
>     columnNames = Arrays.asList(columnNameProperty.split(","));
>   }
>   if (columnTypeProperty.length() == 0) {
>     columnTypes = new ArrayList<TypeInfo>();
>   } else {
>     columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
>   }
>   DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
>   return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
>       progress, tableProperties);
> }
> {code}
> 4. Which then calls:
> {code:java}
> public ParquetRecordWriterWrapper(
>     final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
>     final JobConf jobConf,
>     final String name,
>     final Progressable progress, Properties tableProperties) throws IOException {
>   try {
>     // create a TaskInputOutputContext
>     TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
>     if (taskAttemptID == null) {
>       taskAttemptID = new TaskAttemptID();
>     }
>     taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
>     LOG.info("initialize serde with table properties.");
>     initializeSerProperties(taskContext, tableProperties);
>     LOG.info("creating real writer to write at " + name);
>     // realOutputFormat is the ParquetOutputFormat bundled with hive-exec (parquet 1.6)
>     realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
>     LOG.info("real writer: " + realWriter);
>   } catch (final InterruptedException e) {
>     throw new IOException(e);
>   }
> }
> {code}
> And this ParquetOutputFormat is version 1.6.
> As a result, all files generated this way will miss some useful statistics, such as min/max for string columns.
> We should fix this issue so that we can use the newer features of Parquet.
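> For illustration only, here is a minimal spark-shell (Scala) sketch of how one could check which writer produced the output files and whether their column chunks carry min/max statistics. The table location {{/user/hive/warehouse/t}} is just a placeholder (it is not part of this issue), and the exact ParquetFileReader API available depends on the parquet-mr version on the classpath:
> {code:scala}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.parquet.format.converter.ParquetMetadataConverter
> import org.apache.parquet.hadoop.ParquetFileReader
> import scala.collection.JavaConverters._
>
> val conf = new Configuration()
> // Placeholder path: point this at the actual output directory of the INSERT INTO command.
> val dir = new Path("/user/hive/warehouse/t")
> val fs = FileSystem.get(conf)
>
> // Read the footer of every data file in the table directory (non-Parquet files will fail here).
> fs.listStatus(dir).filter(_.isFile).map(_.getPath).foreach { file =>
>   val footer = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER)
>   // created_by records the writer version, e.g. "parquet-mr version 1.6.0" on the Hive SerDe path.
>   println(s"${file.getName}: created_by = ${footer.getFileMetaData.getCreatedBy}")
>   footer.getBlocks.asScala.foreach { block =>
>     block.getColumns.asScala.foreach { col =>
>       // Files written by the parquet 1.6 writer lack the useful min/max statistics described above.
>       println(s"  ${col.getPath}: statistics = ${col.getStatistics}")
>     }
>   }
> }
> {code}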