Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 41 additions & 15 deletions be/src/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
#include "common/status.h"
#include "core/block/block.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/data_type/data_type_factory.hpp"
#include "core/data_type_serde/data_type_string_serde.h"
#include "exec/scan/scanner.h"
#include "format/file_reader/new_plain_binary_line_reader.h"
#include "format/file_reader/new_plain_text_line_reader.h"
Expand Down Expand Up @@ -451,6 +454,7 @@ Status CsvReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof)
} else {
auto columns_guard = block->mutate_columns_scoped();
auto& columns = columns_guard.mutable_columns();
_reserve_nullable_string_columns(columns, batch_size);
while (rows < batch_size && !_line_reader_eof &&
(columns_byte_size(columns) < max_block_bytes)) {
const uint8_t* ptr = nullptr;
Expand Down Expand Up @@ -559,28 +563,31 @@ Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
}

Status CsvReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
auto& null_column = assert_cast<ColumnNullable&>(column);
if (_empty_field_as_null) {
if (slice.size == 0) {
null_column.insert_data(nullptr, 0);
return Status::OK();
}
// This is the per-row per-column hot path of CSV load (load reads every column as
// nullable string). The column type was already verified by the checked assert_cast
// in _reserve_nullable_string_columns at the beginning of the batch, so the casts
// here can skip the release-build typeid check.
auto& null_column = assert_cast<ColumnNullable&, TypeCheckOnRelease::DISABLE>(column);
Comment thread
liaoxin01 marked this conversation as resolved.
auto& string_column = assert_cast<ColumnString&, TypeCheckOnRelease::DISABLE>(
null_column.get_nested_column());
if (_empty_field_as_null && slice.size == 0) {
string_column.insert_default();
null_column.get_null_map_data().push_back(1);
return Status::OK();
}
if (_options.null_len > 0 && !(_options.converted_from_string && slice.trim_double_quotes())) {
if (slice.compare(Slice(_options.null_format, _options.null_len)) == 0) {
null_column.insert_data(nullptr, 0);
string_column.insert_default();
null_column.get_null_map_data().push_back(1);
return Status::OK();
}
}
static DataTypeStringSerDe stringSerDe(TYPE_STRING);
auto st = stringSerDe.deserialize_one_cell_from_csv(null_column.get_nested_column(), slice,
_options);
if (!st.ok()) {
// fill null if fail
null_column.insert_data(nullptr, 0); // 0 is meaningless here
return Status::OK();
// Same as DataTypeStringSerDe::deserialize_one_cell_from_csv (which never fails),
// written out here to skip the SerDe layer and its per-cell assert_cast.
if (_options.escape_char != 0) {
escape_string_for_csv(slice.data, &slice.size, _options.escape_char, _options.quote_char);
}
// fill not null if success
string_column.insert_data(slice.data, slice.size);
null_column.get_null_map_data().push_back(0);
return Status::OK();
}
Expand Down Expand Up @@ -770,6 +777,25 @@ Status CsvReader::_fill_dest_columns(const Slice& line, std::vector<MutableColum
return Status::OK();
}

void CsvReader::_reserve_nullable_string_columns(std::vector<MutableColumnPtr>& columns,
size_t batch_size) {
for (int i = 0; i < _file_slot_descs.size(); ++i) {
if (!_use_nullable_string_opt[i]) {
continue;
}
IColumn* col_ptr = _is_load ? columns[i].get() : columns[_file_slot_idx_map[i]].get();
// The checked casts here (once per batch) guarantee the column types for the
// unchecked per-row casts in _deserialize_nullable_string.
auto& null_column = assert_cast<ColumnNullable&>(*col_ptr);
auto& string_column = assert_cast<ColumnString&>(null_column.get_nested_column());
// Reserve up front so the per-row loop does not pay for incremental growth.
// The string chars are not reserved because their total size is unpredictable.
string_column.get_offsets().reserve(string_column.size() + batch_size);
null_column.get_null_map_data().reserve(null_column.get_null_map_data().size() +
batch_size);
}
}

Status CsvReader::_fill_empty_line(std::vector<MutableColumnPtr>& columns, size_t* rows) {
for (int i = 0; i < _file_slot_descs.size(); ++i) {
IColumn* col_ptr = columns[i].get();
Expand Down
5 changes: 5 additions & 0 deletions be/src/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ class CsvReader : public TableFormatReader {
Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns,
size_t* rows);
Status _fill_empty_line(std::vector<MutableColumnPtr>& columns, size_t* rows);
// Called once per batch: verifies the nullable string columns' concrete types (so the
// per-row casts in _deserialize_nullable_string can skip the release-build type check)
// and reserves their offsets/null_map capacity to avoid incremental growth per row.
void _reserve_nullable_string_columns(std::vector<MutableColumnPtr>& columns,
size_t batch_size);
Status _line_split_to_values(const Slice& line, bool* success);
void _split_line(const Slice& line);
void _init_system_properties();
Expand Down
26 changes: 16 additions & 10 deletions be/src/format/text/text_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "core/block/block.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/data_type_serde/data_type_string_serde.h"
#include "exec/scan/scanner.h"
#include "format/csv/csv_reader.h"
#include "format/file_reader/new_plain_text_line_reader.h"
Expand Down Expand Up @@ -166,20 +169,23 @@ Status TextReader::_validate_line(const Slice& line, bool* success) {
}

Status TextReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
auto& null_column = assert_cast<ColumnNullable&>(column);
// Hot path of hive text load, see CsvReader::_deserialize_nullable_string. The
// column type was verified by the checked assert_cast in
// _reserve_nullable_string_columns at the beginning of the batch.
auto& null_column = assert_cast<ColumnNullable&, TypeCheckOnRelease::DISABLE>(column);
auto& string_column = assert_cast<ColumnString&, TypeCheckOnRelease::DISABLE>(
Comment thread
liaoxin01 marked this conversation as resolved.
null_column.get_nested_column());
if (slice.compare(Slice(_options.null_format, _options.null_len)) == 0) {
null_column.insert_data(nullptr, 0);
string_column.insert_default();
null_column.get_null_map_data().push_back(1);
return Status::OK();
}
static DataTypeStringSerDe stringSerDe(TYPE_STRING);
auto st = stringSerDe.deserialize_one_cell_from_hive_text(null_column.get_nested_column(),
slice, _options);
if (!st.ok()) {
// fill null if fail
null_column.insert_data(nullptr, 0); // 0 is meaningless here
return Status::OK();
// Same as DataTypeStringSerDe::deserialize_one_cell_from_hive_text (which never
// fails), written out here to skip the SerDe layer and its per-cell assert_cast.
if (_options.escape_char != 0) {
escape_string(slice.data, &slice.size, _options.escape_char);
}
// fill not null if success
string_column.insert_data(slice.data, slice.size);
null_column.get_null_map_data().push_back(0);
return Status::OK();
}
Expand Down
Loading