jt2594838 commented on code in PR #409: URL: https://github.com/apache/tsfile/pull/409#discussion_r1955619045
########## cpp/src/common/cache/lru_cache.h: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef COMMON_CACHE_LRU_CACHE_H +#define COMMON_CACHE_LRU_CACHE_H + +#include <unordered_map> +#include <list> +#include <mutex> +#include <algorithm> + +#include "utils/errno_define.h" + +namespace common { +/* + * a noop lockable concept that can be used in place of std::mutex + */ +class NullLock { + public: + void lock() {} + void unlock() {} + bool try_lock() { return true; } +}; + +template <typename K, typename V> +struct KeyValuePair { + public: + K key; + V value; + + KeyValuePair(K k, V v) : key(std::move(k)), value(std::move(v)) {} +}; + +/** + * The LRU Cache class templated by + * Key - key type + * Value - value type + * MapType - an associative container like std::unordered_map + * LockType - a lock type derived from the Lock class (default: + *NullLock = no synchronization) + * + * The default NullLock based template is not thread-safe, however passing + *Lock=std::mutex will make it + * thread-safe + */ +template <class Key, class Value, class Lock = NullLock, + class Map = std::unordered_map< + Key, typename std::list<KeyValuePair<Key, Value>>::iterator>> +class Cache 
{ + public: + typedef KeyValuePair<Key, Value> node_type; + typedef std::list<KeyValuePair<Key, Value>> list_type; + typedef Map map_type; + typedef Lock lock_type; + using Guard = std::lock_guard<lock_type>; + /** + * the maxSize is the soft limit of keys and (maxSize + elasticity) is the + * hard limit + * the cache is allowed to grow till (maxSize + elasticity) and is pruned + * back to maxSize keys set maxSize = 0 for an unbounded cache (but in that + * case, you're better off using a std::unordered_map directly anyway! :) + */ + explicit Cache(size_t maxSize = 64, size_t elasticity = 10) + : maxSize_(maxSize), elasticity_(elasticity) {} + virtual ~Cache() = default; + size_t size() const { + Guard g(lock_); + return cache_.size(); + } + bool empty() const { + Guard g(lock_); + return cache_.empty(); + } + void clear() { + Guard g(lock_); + cache_.clear(); + keys_.clear(); + } + void insert(const Key& k, Value v) { + Guard g(lock_); + const auto iter = cache_.find(k); + if (iter != cache_.end()) { + iter->second->value = v; + keys_.splice(keys_.begin(), keys_, iter->second); + return; + } + + keys_.emplace_front(k, std::move(v)); + cache_[k] = keys_.begin(); + prune(); + } + void emplace(const Key& k, Value&& v) { + Guard g(lock_); + keys_.emplace_front(k, std::move(v)); + cache_[k] = keys_.begin(); + prune(); + } + /** + for backward compatibity. 
redirects to tryGetCopy() + */ + bool tryGet(const Key& kIn, Value& vOut) { return tryGetCopy(kIn, vOut); } + + bool tryGetCopy(const Key& kIn, Value& vOut) { + Guard g(lock_); + Value tmp; + if (!tryGetRef_nolock(kIn, tmp)) { + return false; + } + vOut = tmp; + return true; + } + + bool tryGetRef(const Key& kIn, Value& vOut) { + Guard g(lock_); + return tryGetRef_nolock(kIn, vOut); + } + /** + * The const reference returned here is only + * guaranteed to be valid till the next insert/delete + * in multi-threaded apps use getCopy() to be threadsafe + */ + const Value& getRef(const Key& k) { + Guard g(lock_); + return get_nolock(k); + } + + /** + added for backward compatibility + */ + Value get(const Key& k) { return getCopy(k); } + /** + * returns a copy of the stored object (if found) + * safe to use/recommended in multi-threaded apps + */ + Value getCopy(const Key& k) { + Guard g(lock_); + return get_nolock(k); + } + + bool remove(const Key& k) { + Guard g(lock_); + auto iter = cache_.find(k); + if (iter == cache_.end()) { + return false; + } + keys_.erase(iter->second); + cache_.erase(iter); + return true; + } + bool contains(const Key& k) const { + Guard g(lock_); + return cache_.find(k) != cache_.end(); + } + + size_t getMaxSize() const { return maxSize_; } + size_t getElasticity() const { return elasticity_; } + size_t getMaxAllowedSize() const { return maxSize_ + elasticity_; } + template <typename F> + void cwalk(F& f) const { + Guard g(lock_); + std::for_each(keys_.begin(), keys_.end(), f); + } + + protected: + const int get_nolock(const Key& k, Value& vOut) { + const auto iter = cache_.find(k); + if (iter == cache_.end()) { + return E_NOT_EXIST; + } + keys_.splice(keys_.begin(), keys_, iter->second); + vOut = iter->second->value; + return E_OK; + } + bool tryGetRef_nolock(const Key& kIn, Value& vOut) { + const auto iter = cache_.find(kIn); + if (iter == cache_.end()) { + return false; + } + keys_.splice(keys_.begin(), keys_, iter->second); + vOut = 
iter->second->value; + return true; + } + size_t prune() { + size_t maxAllowed = maxSize_ + elasticity_; + if (maxSize_ == 0 || cache_.size() < maxAllowed) { + return 0; + } + size_t count = 0; + while (cache_.size() > maxSize_) { + cache_.erase(keys_.back().key); + keys_.pop_back(); + ++count; + } + return count; + } + + private: + // Disallow copying. + Cache(const Cache&) = delete; + Cache& operator=(const Cache&) = delete; + + mutable Lock lock_; + Map cache_; + list_type keys_; Review Comment: keys_ -> entries_ ########## cpp/src/common/cache/lru_cache.h: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#ifndef COMMON_CACHE_LRU_CACHE_H +#define COMMON_CACHE_LRU_CACHE_H + +#include <unordered_map> +#include <list> +#include <mutex> +#include <algorithm> + +#include "utils/errno_define.h" + +namespace common { +/* + * a noop lockable concept that can be used in place of std::mutex + */ +class NullLock { + public: + void lock() {} + void unlock() {} + bool try_lock() { return true; } +}; + +template <typename K, typename V> +struct KeyValuePair { + public: + K key; + V value; + + KeyValuePair(K k, V v) : key(std::move(k)), value(std::move(v)) {} +}; + +/** + * The LRU Cache class templated by + * Key - key type + * Value - value type + * MapType - an associative container like std::unordered_map + * LockType - a lock type derived from the Lock class (default: + *NullLock = no synchronization) + * + * The default NullLock based template is not thread-safe, however passing + *Lock=std::mutex will make it + * thread-safe + */ +template <class Key, class Value, class Lock = NullLock, + class Map = std::unordered_map< + Key, typename std::list<KeyValuePair<Key, Value>>::iterator>> +class Cache { + public: + typedef KeyValuePair<Key, Value> node_type; + typedef std::list<KeyValuePair<Key, Value>> list_type; + typedef Map map_type; + typedef Lock lock_type; + using Guard = std::lock_guard<lock_type>; + /** + * the maxSize is the soft limit of keys and (maxSize + elasticity) is the + * hard limit + * the cache is allowed to grow till (maxSize + elasticity) and is pruned + * back to maxSize keys set maxSize = 0 for an unbounded cache (but in that + * case, you're better off using a std::unordered_map directly anyway! 
:) + */ + explicit Cache(size_t maxSize = 64, size_t elasticity = 10) + : maxSize_(maxSize), elasticity_(elasticity) {} + virtual ~Cache() = default; + size_t size() const { + Guard g(lock_); + return cache_.size(); + } + bool empty() const { + Guard g(lock_); + return cache_.empty(); + } + void clear() { + Guard g(lock_); + cache_.clear(); + keys_.clear(); + } + void insert(const Key& k, Value v) { + Guard g(lock_); + const auto iter = cache_.find(k); + if (iter != cache_.end()) { + iter->second->value = v; + keys_.splice(keys_.begin(), keys_, iter->second); + return; + } + + keys_.emplace_front(k, std::move(v)); + cache_[k] = keys_.begin(); + prune(); + } + void emplace(const Key& k, Value&& v) { + Guard g(lock_); + keys_.emplace_front(k, std::move(v)); + cache_[k] = keys_.begin(); + prune(); + } + /** + for backward compatibity. redirects to tryGetCopy() + */ + bool tryGet(const Key& kIn, Value& vOut) { return tryGetCopy(kIn, vOut); } + + bool tryGetCopy(const Key& kIn, Value& vOut) { + Guard g(lock_); + Value tmp; + if (!tryGetRef_nolock(kIn, tmp)) { + return false; + } + vOut = tmp; + return true; + } + + bool tryGetRef(const Key& kIn, Value& vOut) { + Guard g(lock_); + return tryGetRef_nolock(kIn, vOut); + } + /** + * The const reference returned here is only + * guaranteed to be valid till the next insert/delete + * in multi-threaded apps use getCopy() to be threadsafe + */ + const Value& getRef(const Key& k) { + Guard g(lock_); + return get_nolock(k); + } + + /** + added for backward compatibility + */ + Value get(const Key& k) { return getCopy(k); } + /** + * returns a copy of the stored object (if found) + * safe to use/recommended in multi-threaded apps + */ + Value getCopy(const Key& k) { + Guard g(lock_); + return get_nolock(k); + } + + bool remove(const Key& k) { + Guard g(lock_); + auto iter = cache_.find(k); + if (iter == cache_.end()) { + return false; + } + keys_.erase(iter->second); + cache_.erase(iter); + return true; + } + bool contains(const 
Key& k) const { + Guard g(lock_); + return cache_.find(k) != cache_.end(); + } + + size_t getMaxSize() const { return maxSize_; } + size_t getElasticity() const { return elasticity_; } + size_t getMaxAllowedSize() const { return maxSize_ + elasticity_; } + template <typename F> + void cwalk(F& f) const { + Guard g(lock_); + std::for_each(keys_.begin(), keys_.end(), f); + } + + protected: + const int get_nolock(const Key& k, Value& vOut) { + const auto iter = cache_.find(k); + if (iter == cache_.end()) { + return E_NOT_EXIST; + } + keys_.splice(keys_.begin(), keys_, iter->second); + vOut = iter->second->value; + return E_OK; + } + bool tryGetRef_nolock(const Key& kIn, Value& vOut) { + const auto iter = cache_.find(kIn); + if (iter == cache_.end()) { + return false; + } + keys_.splice(keys_.begin(), keys_, iter->second); + vOut = iter->second->value; + return true; + } Review Comment: The two methods are nearly identical; they differ only in returning an error code (`E_NOT_EXIST`/`E_OK`) versus a bool. Are both of them necessary? ########## cpp/src/common/cache/lru_cache.h: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#ifndef COMMON_CACHE_LRU_CACHE_H +#define COMMON_CACHE_LRU_CACHE_H + +#include <unordered_map> +#include <list> +#include <mutex> +#include <algorithm> + +#include "utils/errno_define.h" + +namespace common { +/* + * a noop lockable concept that can be used in place of std::mutex + */ +class NullLock { + public: + void lock() {} + void unlock() {} + bool try_lock() { return true; } +}; + +template <typename K, typename V> +struct KeyValuePair { + public: + K key; + V value; + + KeyValuePair(K k, V v) : key(std::move(k)), value(std::move(v)) {} +}; + +/** + * The LRU Cache class templated by + * Key - key type + * Value - value type + * MapType - an associative container like std::unordered_map + * LockType - a lock type derived from the Lock class (default: + *NullLock = no synchronization) + * + * The default NullLock based template is not thread-safe, however passing + *Lock=std::mutex will make it + * thread-safe + */ +template <class Key, class Value, class Lock = NullLock, + class Map = std::unordered_map< + Key, typename std::list<KeyValuePair<Key, Value>>::iterator>> +class Cache { + public: + typedef KeyValuePair<Key, Value> node_type; + typedef std::list<KeyValuePair<Key, Value>> list_type; + typedef Map map_type; + typedef Lock lock_type; + using Guard = std::lock_guard<lock_type>; + /** + * the maxSize is the soft limit of keys and (maxSize + elasticity) is the + * hard limit + * the cache is allowed to grow till (maxSize + elasticity) and is pruned + * back to maxSize keys set maxSize = 0 for an unbounded cache (but in that Review Comment: In "back to maxSize keys set maxSize = 0", add a full stop between "maxSize keys" and "set", or start a new line. ########## cpp/src/common/cache/lru_cache.h: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef COMMON_CACHE_LRU_CACHE_H +#define COMMON_CACHE_LRU_CACHE_H + +#include <unordered_map> +#include <list> +#include <mutex> +#include <algorithm> + +#include "utils/errno_define.h" + +namespace common { +/* + * a noop lockable concept that can be used in place of std::mutex + */ +class NullLock { + public: + void lock() {} + void unlock() {} + bool try_lock() { return true; } +}; + +template <typename K, typename V> +struct KeyValuePair { + public: + K key; + V value; + + KeyValuePair(K k, V v) : key(std::move(k)), value(std::move(v)) {} +}; + +/** + * The LRU Cache class templated by + * Key - key type + * Value - value type + * MapType - an associative container like std::unordered_map + * LockType - a lock type derived from the Lock class (default: + *NullLock = no synchronization) + * + * The default NullLock based template is not thread-safe, however passing + *Lock=std::mutex will make it + * thread-safe + */ +template <class Key, class Value, class Lock = NullLock, + class Map = std::unordered_map< + Key, typename std::list<KeyValuePair<Key, Value>>::iterator>> +class Cache { + public: + typedef KeyValuePair<Key, Value> node_type; + typedef std::list<KeyValuePair<Key, Value>> list_type; + typedef Map map_type; + typedef Lock lock_type; + using Guard = std::lock_guard<lock_type>; + /** + * the maxSize is the soft limit of keys and (maxSize + elasticity) is the + * 
hard limit + * the cache is allowed to grow till (maxSize + elasticity) and is pruned + * back to maxSize keys set maxSize = 0 for an unbounded cache (but in that + * case, you're better off using a std::unordered_map directly anyway! :) + */ + explicit Cache(size_t maxSize = 64, size_t elasticity = 10) + : maxSize_(maxSize), elasticity_(elasticity) {} + virtual ~Cache() = default; + size_t size() const { + Guard g(lock_); + return cache_.size(); + } + bool empty() const { + Guard g(lock_); + return cache_.empty(); + } + void clear() { + Guard g(lock_); + cache_.clear(); + keys_.clear(); + } + void insert(const Key& k, Value v) { + Guard g(lock_); + const auto iter = cache_.find(k); + if (iter != cache_.end()) { + iter->second->value = v; + keys_.splice(keys_.begin(), keys_, iter->second); + return; + } + + keys_.emplace_front(k, std::move(v)); + cache_[k] = keys_.begin(); + prune(); + } + void emplace(const Key& k, Value&& v) { + Guard g(lock_); + keys_.emplace_front(k, std::move(v)); + cache_[k] = keys_.begin(); + prune(); + } Review Comment: Is this method necessary? `emplace` does not check whether the key already exists, so if it is called twice for the same key, or after `insert` for that key, it pushes a second list node and repoints the map entry to it, orphaning the old node and compromising the integrity of the cache. ########## cpp/src/common/tsblock/tsblock.h: ########## @@ -206,6 +234,15 @@ class RowIterator { FORCE_INLINE bool end() { return row_id_ >= tsblock_->row_count_; } + FORCE_INLINE bool has_next() { return row_id_ < tsblock_->row_count_; } + + FORCE_INLINE uint32_t get_column_count() { return column_count_; } + + FORCE_INLINE TSDataType get_data_type(uint32_t slot_index) { + ASSERT(slot_index < column_count_); + return tsblock_->vectors_[slot_index]->get_vector_type(); + } Review Comment: slot_index -> column_index ########## cpp/src/reader/block/single_device_tsblock_reader.cc: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "single_device_tsblock_reader.h" + +namespace storage { + +SingleDeviceTsBlockReader::SingleDeviceTsBlockReader( + DeviceQueryTask* device_query_task, uint32_t block_size, + IMetadataQuerier* metadata_querier, TsFileIOReader* tsfile_io_reader, + Filter* time_filter, Filter* field_filter) + : device_query_task_(device_query_task), + field_filter_(field_filter), + block_size_(block_size), + tuple_desc_(), + tsfile_io_reader_(tsfile_io_reader) { + pa_.init(512, common::AllocModID::MOD_TSFILE_READER); + tuple_desc_.reset(); + common::init_common(); + tuple_desc_.push_back(common::g_time_column_desc); + auto table_schema = device_query_task->get_table_schema(); + for (const auto& column_name : device_query_task_->get_column_names()) { + common::ColumnDesc column_desc( + table_schema->get_column_desc(column_name)); + tuple_desc_.push_back(column_desc); + } + current_block_ = common::TsBlock::create_tsblock(&tuple_desc_, block_size); + col_appenders_.resize(tuple_desc_.get_column_count()); + for (int i = 0; i < tuple_desc_.get_column_count(); i++) { + col_appenders_[i] = new common::ColAppender(i, current_block_); + } + row_appender_ = new common::RowAppender(current_block_); + std::vector<ITimeseriesIndex*> time_series_indexs( + 
device_query_task_->get_column_names().size()); + tsfile_io_reader_->get_timeseries_indexes( + device_query_task->get_device_id(), + device_query_task->get_column_names(), time_series_indexs, pa_); + for (const auto& time_series_index : time_series_indexs) { + construct_column_context(time_series_index, time_filter); + } + + for (const auto& id_column : + device_query_task->get_column_mapping()->get_id_columns()) { + const auto& column_pos_in_result = + device_query_task->get_column_mapping()->get_column_pos(id_column); + int column_pos_in_id = + table_schema->find_id_column_order(id_column) + 1; + id_column_contexts_.insert(std::make_pair( + id_column, + IdColumnContext(column_pos_in_result, column_pos_in_id))); + } +} + +bool SingleDeviceTsBlockReader::has_next() { + if (!last_block_returned_) { + return true; + } + + if (field_column_contexts_.empty()) { + return false; + } + current_block_->reset(); + + next_time_ = -1; + + std::vector<MeasurementColumnContext*> min_time_columns; + while (current_block_->get_row_count() < block_size_) { + for (auto& column_context : field_column_contexts_) { + int64_t time; + if (IS_FAIL(column_context.second->get_current_time(time))) { + continue; + } + if (next_time_ == -1 || time < next_time_) { + next_time_ = time; + min_time_columns.clear(); + min_time_columns.push_back(column_context.second); + } else if (time == next_time_) { + min_time_columns.push_back(column_context.second); + } + } + + if (IS_FAIL(fill_measurements(min_time_columns))) { + return false; + } else { + next_time_ = -1; + } + + if (field_column_contexts_.empty()) { + break; + } + } + if (current_block_->get_row_count() > 0) { + fill_ids(); + current_block_->fill_trailling_nulls(); + last_block_returned_ = false; + return true; + } + return false; +} + +int SingleDeviceTsBlockReader::fill_measurements( + std::vector<MeasurementColumnContext*>& column_contexts) { + int ret = common::E_OK; + if (field_filter_ == + nullptr /*TODO: || 
field_filter_->satisfy(column_contexts)*/) { + if (!col_appenders_[0]->add_row()) { + assert(false); + } + // std::cout << col_appenders_[0]->tsblock_->debug_string() << std::endl; + col_appenders_[0]->append((char*)&next_time_, sizeof(next_time_)); + for (uint32_t i = 0; i < column_contexts.size(); i++) { + column_contexts[i]->fill_into(col_appenders_); + advance_column(column_contexts[i]); + } + // for (auto& column_contest : column_contexts) { + // column_contest->fill_into(col_appenders_); + // advance_column(column_contest); + // } + row_appender_->add_row(); + } + return ret; +} + +void SingleDeviceTsBlockReader::advance_column( + MeasurementColumnContext* column_context) { + if (column_context->move_iter() == common::E_NO_MORE_DATA) { + column_context->remove_from(field_column_contexts_); + } +} + +void SingleMeasurementColumnContext::remove_from( + std::map<std::string, MeasurementColumnContext*>& column_context_map) { + auto iter = column_context_map.find(column_name_); + if (iter != column_context_map.end()) { + delete iter->second; + column_context_map.erase(iter); + } +} + +void SingleDeviceTsBlockReader::fill_ids() { + for (const auto& entry : id_column_contexts_) { + const auto& id_column_context = entry.second; + for (int32_t pos : id_column_context.pos_in_result_) { + common::String device_id( + device_query_task_->get_device_id()->get_segments().at( + id_column_context.pos_in_device_id_)); + col_appenders_[pos]->fill((char*)&device_id, sizeof(device_id), + current_block_->get_row_count()); + } + } +} + +int SingleDeviceTsBlockReader::next(common::TsBlock*& ret_block) { + if (!has_next()) { + return common::E_NO_MORE_DATA; + } + last_block_returned_ = true; + ret_block = current_block_; + return common::E_OK; +} + +void SingleDeviceTsBlockReader::close() { + for (auto& column_context : field_column_contexts_) { + delete column_context.second; + } + if (current_block_) { + delete current_block_; + current_block_ = nullptr; + } + for (auto& 
col_appender : col_appenders_) { + if (col_appender) { + delete col_appender; + col_appender = nullptr; + } + } + if (row_appender_) { + delete row_appender_; + row_appender_ = nullptr; + } +} + +void SingleDeviceTsBlockReader::construct_column_context( + const ITimeseriesIndex* time_series_index, Filter* time_filter) { + // TODO: judge whether the time_series_index is aligned and jump empty chunk + SingleMeasurementColumnContext* column_context = + new SingleMeasurementColumnContext(tsfile_io_reader_); + column_context->init(device_query_task_, time_series_index, time_filter, + pa_); + field_column_contexts_.insert(std::make_pair( + time_series_index->get_measurement_name().to_std_string(), + column_context)); +} + +int SingleMeasurementColumnContext::init( + DeviceQueryTask* device_query_task, + const ITimeseriesIndex* time_series_index, Filter* time_filter, + common::PageArena& pa) { + int ret = common::E_OK; + column_name_ = time_series_index->get_measurement_name().to_std_string(); + if (RET_FAIL(tsfile_io_reader_->alloc_ssi( + device_query_task->get_device_id()->get_device_name(), + time_series_index->get_measurement_name().to_std_string(), ssi_, pa, + time_filter))) { + } else if (RET_FAIL(get_next_tsblock(true))) { + } + return ret; +} + +int SingleMeasurementColumnContext::get_next_tsblock(bool alloc_mem) { + int ret = common::E_OK; + if (tsblock_ != nullptr) { + if (time_iter_) { + delete time_iter_; + time_iter_ = nullptr; + } + if (value_iter_) { + delete value_iter_; + value_iter_ = nullptr; + } + tsblock_->reset(); + } + if (RET_FAIL(ssi_->get_next(tsblock_, alloc_mem))) { + if (time_iter_) { + delete time_iter_; + time_iter_ = nullptr; + } + if (value_iter_) { + delete value_iter_; + value_iter_ = nullptr; + } + if (tsblock_) { + ssi_->destroy(); + tsblock_ = nullptr; + } + } else { + std::cout << "debug: \n"; + std::cout << tsblock_->debug_string() << std::endl; Review Comment: Guard this debug output with a debug macro, or remove it. ########## cpp/src/common/schema.h: ##########
@@ -20,319 +20,342 @@ #ifndef COMMON_SCHEMA_H #define COMMON_SCHEMA_H +#include <algorithm> #include <map> // use unordered_map instead #include <memory> #include <string> +#include <unordered_map> #include "common/db_common.h" #include "writer/time_chunk_writer.h" #include "writer/value_chunk_writer.h" namespace storage { - class ChunkWriter; -} +class ChunkWriter; +class ValueChunkWriter; +class TimeChunkWriter; +} // namespace storage namespace storage { - /* schema information for one measurement */ - struct MeasurementSchema { - std::string measurement_name_; // for example: "s1" - common::TSDataType data_type_; - common::TSEncoding encoding_; - common::CompressionType compression_type_; - storage::ChunkWriter *chunk_writer_; - ValueChunkWriter *value_chunk_writer_; - std::map<std::string, std::string> props_; - - MeasurementSchema() - : measurement_name_(), - data_type_(common::INVALID_DATATYPE), - encoding_(common::INVALID_ENCODING), - compression_type_(common::INVALID_COMPRESSION), - chunk_writer_(nullptr), - value_chunk_writer_(nullptr) { - } - - MeasurementSchema(const std::string &measurement_name, - common::TSDataType data_type) - : measurement_name_(measurement_name), - data_type_(data_type), - encoding_(get_default_encoding_for_type(data_type)), - compression_type_(common::UNCOMPRESSED), - chunk_writer_(nullptr), - value_chunk_writer_(nullptr) { - } - - MeasurementSchema(const std::string &measurement_name, - common::TSDataType data_type, common::TSEncoding encoding, - common::CompressionType compression_type) - : measurement_name_(measurement_name), - data_type_(data_type), - encoding_(encoding), - compression_type_(compression_type), - chunk_writer_(nullptr), - value_chunk_writer_(nullptr) { - } - - int serialize_to(common::ByteStream &out) { - int ret = common::E_OK; - if (RET_FAIL( +/* schema information for one measurement */ +struct MeasurementSchema { + std::string measurement_name_; // for example: "s1" + common::TSDataType data_type_; + 
common::TSEncoding encoding_; + common::CompressionType compression_type_; + storage::ChunkWriter *chunk_writer_; + ValueChunkWriter *value_chunk_writer_; + std::map<std::string, std::string> props_; + + MeasurementSchema() + : measurement_name_(), + data_type_(common::INVALID_DATATYPE), + encoding_(common::INVALID_ENCODING), + compression_type_(common::INVALID_COMPRESSION), + chunk_writer_(nullptr), + value_chunk_writer_(nullptr) {} + + MeasurementSchema(const std::string &measurement_name, + common::TSDataType data_type) + : measurement_name_(measurement_name), + data_type_(data_type), + encoding_(get_default_encoding_for_type(data_type)), + compression_type_(common::UNCOMPRESSED), + chunk_writer_(nullptr), + value_chunk_writer_(nullptr) {} + + MeasurementSchema(const std::string &measurement_name, + common::TSDataType data_type, common::TSEncoding encoding, + common::CompressionType compression_type) + : measurement_name_(measurement_name), + data_type_(data_type), + encoding_(encoding), + compression_type_(compression_type), + chunk_writer_(nullptr), + value_chunk_writer_(nullptr) {} + + int serialize_to(common::ByteStream &out) { + int ret = common::E_OK; + if (RET_FAIL( common::SerializationUtil::write_str(measurement_name_, out))) { - } else if (RET_FAIL( - common::SerializationUtil::write_ui8(data_type_, out))) { - } else if (RET_FAIL( - common::SerializationUtil::write_ui8(encoding_, out))) { - } else if (RET_FAIL(common::SerializationUtil::write_ui8( - compression_type_, out))) { - } - if (ret == common::E_OK) { - if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(), - out))) { - for (const auto &prop: props_) { - if (RET_FAIL(common::SerializationUtil::write_str( + } else if (RET_FAIL( + common::SerializationUtil::write_ui8(data_type_, out))) { + } else if (RET_FAIL( + common::SerializationUtil::write_ui8(encoding_, out))) { + } else if (RET_FAIL(common::SerializationUtil::write_ui8( + compression_type_, out))) { + } + if (ret == 
common::E_OK) { + if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(), + out))) { + for (const auto &prop : props_) { + if (RET_FAIL(common::SerializationUtil::write_str( prop.first, out))) { - } else if (RET_FAIL(common::SerializationUtil::write_str( - prop.second, out))) { - } - if (IS_FAIL(ret)) break; + } else if (RET_FAIL(common::SerializationUtil::write_str( + prop.second, out))) { } + if (IS_FAIL(ret)) break; } } - return ret; } + return ret; + } - int deserialize_from(common::ByteStream &in) { - int ret = common::E_OK; - uint8_t data_type = common::TSDataType::INVALID_DATATYPE, + int deserialize_from(common::ByteStream &in) { + int ret = common::E_OK; + uint8_t data_type = common::TSDataType::INVALID_DATATYPE, encoding = common::TSEncoding::INVALID_ENCODING, compression_type = common::CompressionType::INVALID_COMPRESSION; - if (RET_FAIL( + if (RET_FAIL( common::SerializationUtil::read_str(measurement_name_, in))) { - } else if (RET_FAIL( - common::SerializationUtil::read_ui8(data_type, in))) { - } else if (RET_FAIL( - common::SerializationUtil::read_ui8(encoding, in))) { - } else if (RET_FAIL(common::SerializationUtil::read_ui8( - compression_type, in))) { - } - data_type_ = static_cast<common::TSDataType>(data_type); - encoding_ = static_cast<common::TSEncoding>(encoding); - compression_type_ = static_cast<common::CompressionType>(compression_type); - uint32_t props_size; - if (ret == common::E_OK) { - if (RET_FAIL(common::SerializationUtil::read_ui32(props_size, - in))) { - for (uint32_t i = 0; i < props_.size(); ++i) { - std::string key, value; - if (RET_FAIL(common::SerializationUtil::read_str( - key, in))) { - } else if (RET_FAIL(common::SerializationUtil::read_str( - value, in))) { - } - props_.insert(std::make_pair(key, value)); - if (IS_FAIL(ret)) break; + } else if (RET_FAIL( + common::SerializationUtil::read_ui8(data_type, in))) { + } else if (RET_FAIL( + common::SerializationUtil::read_ui8(encoding, in))) { + } else if 
(RET_FAIL(common::SerializationUtil::read_ui8( + compression_type, in))) { + } + data_type_ = static_cast<common::TSDataType>(data_type); + encoding_ = static_cast<common::TSEncoding>(encoding); + compression_type_ = + static_cast<common::CompressionType>(compression_type); + uint32_t props_size; + if (ret == common::E_OK) { + if (RET_FAIL( + common::SerializationUtil::read_ui32(props_size, in))) { + for (uint32_t i = 0; i < props_.size(); ++i) { + std::string key, value; + if (RET_FAIL( + common::SerializationUtil::read_str(key, in))) { + } else if (RET_FAIL(common::SerializationUtil::read_str( + value, in))) { } + props_.insert(std::make_pair(key, value)); + if (IS_FAIL(ret)) break; } } - return ret; } - }; + return ret; + } +}; - typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap; - typedef std::map<std::string, MeasurementSchema *>::iterator +typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap; +typedef std::map<std::string, MeasurementSchema *>::iterator MeasurementSchemaMapIter; - typedef std::pair<MeasurementSchemaMapIter, bool> +typedef std::pair<MeasurementSchemaMapIter, bool> MeasurementSchemaMapInsertResult; - /* schema information for a device */ - struct MeasurementSchemaGroup { - // measurement_name -> MeasurementSchema - MeasurementSchemaMap measurement_schema_map_; - bool is_aligned_ = false; - TimeChunkWriter *time_chunk_writer_ = nullptr; - }; - - enum class ColumnCategory { TAG = 0, FIELD = 1 }; - - class TableSchema { - public: - static void to_lowercase_inplace(std::string &str) { - std::transform(str.begin(), str.end(), str.begin(), - [](unsigned char c) -> unsigned char { return std::tolower(c); }); - } - - TableSchema() = default; - - TableSchema(const std::string &table_name, - const std::vector<MeasurementSchema*> - &column_schemas, - const std::vector<ColumnCategory> &column_categories) - : table_name_(table_name), - column_categories_(column_categories) { - to_lowercase_inplace(table_name_); - for 
(const auto column_schema : column_schemas) { - if (column_schema != nullptr) { - column_schemas_.emplace_back(std::shared_ptr<MeasurementSchema>(column_schema)); - } - } - int idx = 0; - for (const auto &measurement_schema: column_schemas_) { - to_lowercase_inplace(measurement_schema->measurement_name_); - column_pos_index_.insert( - std::make_pair(measurement_schema->measurement_name_, idx++)); +/* schema information for a device */ +struct MeasurementSchemaGroup { + // measurement_name -> MeasurementSchema + MeasurementSchemaMap measurement_schema_map_; + bool is_aligned_ = false; + TimeChunkWriter *time_chunk_writer_ = nullptr; +}; + +enum class ColumnCategory { TAG = 0, FIELD = 1 }; + +class TableSchema { + public: + static void to_lowercase_inplace(std::string &str) { + std::transform( + str.begin(), str.end(), str.begin(), + [](unsigned char c) -> unsigned char { return std::tolower(c); }); + } + + TableSchema() = default; + + TableSchema(const std::string &table_name, + const std::vector<MeasurementSchema *> &column_schemas, + const std::vector<ColumnCategory> &column_categories) + : table_name_(table_name), column_categories_(column_categories) { + to_lowercase_inplace(table_name_); + for (const auto column_schema : column_schemas) { + if (column_schema != nullptr) { + column_schemas_.emplace_back( + std::shared_ptr<MeasurementSchema>(column_schema)); } } - - TableSchema(TableSchema &&other) noexcept - : table_name_(std::move(other.table_name_)), - column_schemas_(std::move(other.column_schemas_)), - column_categories_(std::move(other.column_categories_)) { + int idx = 0; + for (const auto &measurement_schema : column_schemas_) { + to_lowercase_inplace(measurement_schema->measurement_name_); + column_pos_index_.insert( + std::make_pair(measurement_schema->measurement_name_, idx++)); } + } - TableSchema(const TableSchema &other) = default; + TableSchema(TableSchema &&other) noexcept + : table_name_(std::move(other.table_name_)), + 
column_schemas_(std::move(other.column_schemas_)), + column_categories_(std::move(other.column_categories_)) {} - int serialize_to(common::ByteStream &out) { - int ret = common::E_OK; - if (RET_FAIL(common::SerializationUtil::write_var_uint( + int serialize_to(common::ByteStream &out) { + int ret = common::E_OK; + if (RET_FAIL(common::SerializationUtil::write_var_uint( column_schemas_.size(), out))) { - } else { - for (size_t i = 0; IS_SUCC(ret) && i < column_schemas_.size(); - i++) { - auto column_schema = column_schemas_[i]; - auto column_category = column_categories_[i]; - if (RET_FAIL(column_schema->serialize_to(out))) { - } else if (RET_FAIL(common::SerializationUtil::write_i8( - static_cast<int8_t>(column_category), out))) { - } + } else { + for (size_t i = 0; IS_SUCC(ret) && i < column_schemas_.size(); + i++) { + auto column_schema = column_schemas_[i]; + auto column_category = column_categories_[i]; + if (RET_FAIL(column_schema->serialize_to(out))) { + } else if (RET_FAIL(common::SerializationUtil::write_i8( + static_cast<int8_t>(column_category), out))) { } } - return ret; } - - int deserialize(common::ByteStream &in) { - int ret = common::E_OK; - uint32_t num_columns; - if (RET_FAIL(common::SerializationUtil::read_var_uint( - num_columns, in))) { - } else { - for (size_t i = 0; IS_SUCC(ret) && i < num_columns; - i++) { - auto column_schema = std::make_shared<MeasurementSchema>(); - int8_t column_category = 0; - if (RET_FAIL(column_schema->deserialize_from(in))) { - } else if (RET_FAIL(common::SerializationUtil::read_i8( - column_category, in))) { - } - column_schemas_.emplace_back(column_schema); - column_categories_.emplace_back(static_cast<ColumnCategory>(column_category)); + return ret; + } + + int deserialize(common::ByteStream &in) { + int ret = common::E_OK; + uint32_t num_columns; + if (RET_FAIL( + common::SerializationUtil::read_var_uint(num_columns, in))) { + } else { + for (size_t i = 0; IS_SUCC(ret) && i < num_columns; i++) { + auto 
column_schema = std::make_shared<MeasurementSchema>(); + int8_t column_category = 0; + if (RET_FAIL(column_schema->deserialize_from(in))) { + } else if (RET_FAIL(common::SerializationUtil::read_i8( + column_category, in))) { } + column_schemas_.emplace_back(column_schema); + column_categories_.emplace_back( + static_cast<ColumnCategory>(column_category)); } - return ret; } + return ret; + } - ~TableSchema() { - column_schemas_.clear(); - } + ~TableSchema() { column_schemas_.clear(); } - const std::string &get_table_name() { return table_name_; } + const std::string &get_table_name() { return table_name_; } - std::vector<std::string> get_measurement_names() const { - std::vector<std::string> ret(column_schemas_.size()); - for (size_t i = 0; i < column_schemas_.size(); i++) { - ret[i] = column_schemas_[i]->measurement_name_; - } - return ret; + std::vector<std::string> get_measurement_names() const { + std::vector<std::string> ret(column_schemas_.size()); + for (size_t i = 0; i < column_schemas_.size(); i++) { + ret[i] = column_schemas_[i]->measurement_name_; } - - int find_column_index(const std::string &column_name) { - std::string lower_case_column_name = to_lower(column_name); - auto it = column_pos_index_.find(lower_case_column_name); - if (it != column_pos_index_.end()) { - return it->second; - } else { - int index = -1; - for (size_t i = 0; i < column_schemas_.size(); ++i) { - if (to_lower(column_schemas_[i]->measurement_name_) == - lower_case_column_name) { - index = static_cast<int>(i); - break; - } + return ret; + } + + int find_column_index(const std::string &column_name) { + std::string lower_case_column_name = to_lower(column_name); + auto it = column_pos_index_.find(lower_case_column_name); + if (it != column_pos_index_.end()) { + return it->second; + } else { + int index = -1; + for (size_t i = 0; i < column_schemas_.size(); ++i) { + if (to_lower(column_schemas_[i]->measurement_name_) == + lower_case_column_name) { + index = static_cast<int>(i); + 
break; } - column_pos_index_[lower_case_column_name] = index; - return index; } + column_pos_index_[lower_case_column_name] = index; + return index; } - - void update(ChunkGroupMeta *chunk_group_meta) { - for (auto iter = chunk_group_meta->chunk_meta_list_.begin(); - iter != chunk_group_meta->chunk_meta_list_.end(); iter++) { - auto &chunk_meta = iter.get(); - int column_idx = - find_column_index(chunk_meta->measurement_name_.to_std_string()); - if (column_idx == -1) { - auto measurement_schema = std::make_shared<MeasurementSchema>( - chunk_meta->measurement_name_.to_std_string(), - chunk_meta->data_type_, chunk_meta->encoding_, - chunk_meta->compression_type_); - column_schemas_.emplace_back(measurement_schema); - column_categories_.emplace_back(ColumnCategory::FIELD); - column_pos_index_.insert( - std::make_pair(chunk_meta->measurement_name_.to_std_string(), - column_schemas_.size() - 1)); - } else { - auto origin_measurement_schema = column_schemas_.at(column_idx); - if (origin_measurement_schema->data_type_ != - chunk_meta->data_type_) { - origin_measurement_schema->data_type_ = - common::TSDataType::STRING; - } + } + + void update(ChunkGroupMeta *chunk_group_meta) { + for (auto iter = chunk_group_meta->chunk_meta_list_.begin(); + iter != chunk_group_meta->chunk_meta_list_.end(); iter++) { + auto &chunk_meta = iter.get(); + int column_idx = find_column_index( + chunk_meta->measurement_name_.to_std_string()); + if (column_idx == -1) { + auto measurement_schema = std::make_shared<MeasurementSchema>( + chunk_meta->measurement_name_.to_std_string(), + chunk_meta->data_type_, chunk_meta->encoding_, + chunk_meta->compression_type_); + column_schemas_.emplace_back(measurement_schema); + column_categories_.emplace_back(ColumnCategory::FIELD); + column_pos_index_.insert(std::make_pair( + chunk_meta->measurement_name_.to_std_string(), + column_schemas_.size() - 1)); + } else { + auto origin_measurement_schema = column_schemas_.at(column_idx); + if 
(origin_measurement_schema->data_type_ != + chunk_meta->data_type_) { + origin_measurement_schema->data_type_ = + common::TSDataType::STRING; } } } + } - std::vector<common::TSDataType> get_data_types() const { - std::vector<common::TSDataType> ret; - for (const auto &measurement_schema: column_schemas_) { - ret.emplace_back(measurement_schema->data_type_); - } - return ret; + std::vector<common::TSDataType> get_data_types() const { + std::vector<common::TSDataType> ret; + for (const auto &measurement_schema : column_schemas_) { + ret.emplace_back(measurement_schema->data_type_); } + return ret; + } - std::vector<ColumnCategory> get_column_categories() const { - return column_categories_; - } + std::vector<ColumnCategory> get_column_categories() const { + return column_categories_; + } - std::vector<std::shared_ptr<MeasurementSchema> > get_measurement_schemas() + std::vector<std::shared_ptr<MeasurementSchema> > get_measurement_schemas() const { - return column_schemas_; - } - - private: - static std::string to_lower(const std::string &str) { - std::string result; - std::transform(str.begin(), str.end(), std::back_inserter(result), - [](unsigned char c) -> unsigned char { return std::tolower(c); }); - return result; + return column_schemas_; + } + + common::ColumnDesc get_column_desc(const std::string &column_name) { + int column_idx = find_column_index(column_name); + return common::ColumnDesc( + column_schemas_[column_idx]->data_type_, + column_schemas_[column_idx]->encoding_, + column_schemas_[column_idx]->compression_type_, INVALID_TTL, + column_name, common::TsID()); + } Review Comment: ColumnDesc is no longer feasible for the current TsFile. There is no TTL for TsFile and TsID is deprecated. 
ColumnSchema should be used instead; see https://timechor.feishu.cn/docx/WszvdVt1polnbIxRI23c1i2Xnxe for details ########## cpp/src/reader/block/single_device_tsblock_reader.cc: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "single_device_tsblock_reader.h" + +namespace storage { + +SingleDeviceTsBlockReader::SingleDeviceTsBlockReader( + DeviceQueryTask* device_query_task, uint32_t block_size, + IMetadataQuerier* metadata_querier, TsFileIOReader* tsfile_io_reader, + Filter* time_filter, Filter* field_filter) + : device_query_task_(device_query_task), + field_filter_(field_filter), + block_size_(block_size), + tuple_desc_(), + tsfile_io_reader_(tsfile_io_reader) { + pa_.init(512, common::AllocModID::MOD_TSFILE_READER); + tuple_desc_.reset(); + common::init_common(); + tuple_desc_.push_back(common::g_time_column_desc); + auto table_schema = device_query_task->get_table_schema(); + for (const auto& column_name : device_query_task_->get_column_names()) { + common::ColumnDesc column_desc( + table_schema->get_column_desc(column_name)); + tuple_desc_.push_back(column_desc); + } + current_block_ = common::TsBlock::create_tsblock(&tuple_desc_, block_size); + col_appenders_.resize(tuple_desc_.get_column_count()); + for (int i = 0; i < tuple_desc_.get_column_count(); i++) { + col_appenders_[i] = new common::ColAppender(i, current_block_); + } + row_appender_ = new common::RowAppender(current_block_); + std::vector<ITimeseriesIndex*> time_series_indexs( + device_query_task_->get_column_names().size()); + tsfile_io_reader_->get_timeseries_indexes( + device_query_task->get_device_id(), + device_query_task->get_column_names(), time_series_indexs, pa_); + for (const auto& time_series_index : time_series_indexs) { + construct_column_context(time_series_index, time_filter); + } + + for (const auto& id_column : + device_query_task->get_column_mapping()->get_id_columns()) { + const auto& column_pos_in_result = + device_query_task->get_column_mapping()->get_column_pos(id_column); + int column_pos_in_id = + table_schema->find_id_column_order(id_column) + 1; + id_column_contexts_.insert(std::make_pair( + id_column, + IdColumnContext(column_pos_in_result, column_pos_in_id))); + } +} + +bool 
SingleDeviceTsBlockReader::has_next() { + if (!last_block_returned_) { + return true; + } + + if (field_column_contexts_.empty()) { + return false; + } + current_block_->reset(); + + next_time_ = -1; + + std::vector<MeasurementColumnContext*> min_time_columns; + while (current_block_->get_row_count() < block_size_) { + for (auto& column_context : field_column_contexts_) { + int64_t time; + if (IS_FAIL(column_context.second->get_current_time(time))) { + continue; + } + if (next_time_ == -1 || time < next_time_) { + next_time_ = time; + min_time_columns.clear(); + min_time_columns.push_back(column_context.second); + } else if (time == next_time_) { + min_time_columns.push_back(column_context.second); + } + } + + if (IS_FAIL(fill_measurements(min_time_columns))) { + return false; + } else { + next_time_ = -1; + } + + if (field_column_contexts_.empty()) { + break; + } + } + if (current_block_->get_row_count() > 0) { + fill_ids(); + current_block_->fill_trailling_nulls(); + last_block_returned_ = false; + return true; + } + return false; +} + +int SingleDeviceTsBlockReader::fill_measurements( + std::vector<MeasurementColumnContext*>& column_contexts) { + int ret = common::E_OK; + if (field_filter_ == + nullptr /*TODO: || field_filter_->satisfy(column_contexts)*/) { + if (!col_appenders_[0]->add_row()) { + assert(false); + } + // std::cout << col_appenders_[0]->tsblock_->debug_string() << std::endl; + col_appenders_[0]->append((char*)&next_time_, sizeof(next_time_)); + for (uint32_t i = 0; i < column_contexts.size(); i++) { + column_contexts[i]->fill_into(col_appenders_); + advance_column(column_contexts[i]); + } + // for (auto& column_contest : column_contexts) { + // column_contest->fill_into(col_appenders_); + // advance_column(column_contest); + // } + row_appender_->add_row(); + } + return ret; +} + +void SingleDeviceTsBlockReader::advance_column( + MeasurementColumnContext* column_context) { + if (column_context->move_iter() == common::E_NO_MORE_DATA) { + 
column_context->remove_from(field_column_contexts_); + } +} + +void SingleMeasurementColumnContext::remove_from( + std::map<std::string, MeasurementColumnContext*>& column_context_map) { + auto iter = column_context_map.find(column_name_); + if (iter != column_context_map.end()) { + delete iter->second; + column_context_map.erase(iter); + } +} + +void SingleDeviceTsBlockReader::fill_ids() { + for (const auto& entry : id_column_contexts_) { + const auto& id_column_context = entry.second; + for (int32_t pos : id_column_context.pos_in_result_) { + common::String device_id( + device_query_task_->get_device_id()->get_segments().at( + id_column_context.pos_in_device_id_)); + col_appenders_[pos]->fill((char*)&device_id, sizeof(device_id), + current_block_->get_row_count()); + } + } +} + +int SingleDeviceTsBlockReader::next(common::TsBlock*& ret_block) { + if (!has_next()) { + return common::E_NO_MORE_DATA; + } + last_block_returned_ = true; + ret_block = current_block_; + return common::E_OK; +} + +void SingleDeviceTsBlockReader::close() { + for (auto& column_context : field_column_contexts_) { + delete column_context.second; + } + if (current_block_) { + delete current_block_; + current_block_ = nullptr; + } + for (auto& col_appender : col_appenders_) { + if (col_appender) { + delete col_appender; + col_appender = nullptr; + } + } + if (row_appender_) { + delete row_appender_; + row_appender_ = nullptr; + } +} + +void SingleDeviceTsBlockReader::construct_column_context( + const ITimeseriesIndex* time_series_index, Filter* time_filter) { + // TODO: judge whether the time_series_index is aligned and jump empty chunk + SingleMeasurementColumnContext* column_context = + new SingleMeasurementColumnContext(tsfile_io_reader_); + column_context->init(device_query_task_, time_series_index, time_filter, + pa_); + field_column_contexts_.insert(std::make_pair( + time_series_index->get_measurement_name().to_std_string(), + column_context)); +} + +int 
SingleMeasurementColumnContext::init( + DeviceQueryTask* device_query_task, + const ITimeseriesIndex* time_series_index, Filter* time_filter, + common::PageArena& pa) { + int ret = common::E_OK; + column_name_ = time_series_index->get_measurement_name().to_std_string(); + if (RET_FAIL(tsfile_io_reader_->alloc_ssi( + device_query_task->get_device_id()->get_device_name(), + time_series_index->get_measurement_name().to_std_string(), ssi_, pa, + time_filter))) { + } else if (RET_FAIL(get_next_tsblock(true))) { + } + return ret; +} + +int SingleMeasurementColumnContext::get_next_tsblock(bool alloc_mem) { + int ret = common::E_OK; + if (tsblock_ != nullptr) { + if (time_iter_) { + delete time_iter_; + time_iter_ = nullptr; + } + if (value_iter_) { + delete value_iter_; + value_iter_ = nullptr; + } + tsblock_->reset(); + } + if (RET_FAIL(ssi_->get_next(tsblock_, alloc_mem))) { + if (time_iter_) { + delete time_iter_; + time_iter_ = nullptr; + } + if (value_iter_) { + delete value_iter_; + value_iter_ = nullptr; + } + if (tsblock_) { + ssi_->destroy(); + tsblock_ = nullptr; + } + } else { + std::cout << "debug: \n"; + std::cout << tsblock_->debug_string() << std::endl; + time_iter_ = new common::ColIterator(0, tsblock_); + value_iter_ = new common::ColIterator(1, tsblock_); + } + return ret; +} + +int SingleMeasurementColumnContext::get_current_time(int64_t& time) { + if (time_iter_->end()) { + return common::E_NO_MORE_DATA; + } + uint32_t len = 0; + time = *(int64_t*)(time_iter_->read(&len)); + return common::E_OK; +} + +int SingleMeasurementColumnContext::get_current_value(char* value) { + if (value_iter_->end()) { + return common::E_NO_MORE_DATA; + } + uint32_t len = 0; + value = value_iter_->read(&len); + return common::E_OK; +} Review Comment: How can the caller know the size of value? And should value be `char* &value`? 
########## cpp/src/reader/table_result_set.cc: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "reader/table_result_set.h" + +namespace storage { +void TableResultSet::init() { + row_record_ = new RowRecord(column_names_.size()); + pa_.reset(); + pa_.init(512, common::MOD_TSFILE_READER); + index_lookup_.reserve(column_names_.size()); + for (uint32_t i = 0; i < column_names_.size(); ++i) { + index_lookup_.insert({column_names_[i], i}); + } +} + +TableResultSet::~TableResultSet() { + close(); +} + +bool TableResultSet::next() { + while((row_iterator_ == nullptr || !row_iterator_->has_next()) && tsblock_reader_->has_next()) { + if (tsblock_reader_->next(tsblock_) != common::E_OK) { + break; + } + if (row_iterator_) { + delete row_iterator_; + row_iterator_ = nullptr; + } + row_iterator_ = new common::RowIterator(tsblock_); + } + if (row_iterator_ == nullptr || !row_iterator_->has_next()) { + return false; + } + row_iterator_->next(); + uint32_t len = 0; + bool null = false; + for (uint32_t i = 0; i < row_iterator_->get_column_count(); ++i) { + std::cout << "TableResultSet::next" << std::endl; + std::cout << "data_types_[i]:" << 
static_cast<int>(row_iterator_->get_data_type(i)) << std::endl; Review Comment: Wrap this debug output in a debug macro ########## cpp/src/reader/block/single_device_tsblock_reader.cc: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "single_device_tsblock_reader.h" + +namespace storage { + +SingleDeviceTsBlockReader::SingleDeviceTsBlockReader( + DeviceQueryTask* device_query_task, uint32_t block_size, + IMetadataQuerier* metadata_querier, TsFileIOReader* tsfile_io_reader, + Filter* time_filter, Filter* field_filter) + : device_query_task_(device_query_task), + field_filter_(field_filter), + block_size_(block_size), + tuple_desc_(), + tsfile_io_reader_(tsfile_io_reader) { + pa_.init(512, common::AllocModID::MOD_TSFILE_READER); + tuple_desc_.reset(); + common::init_common(); + tuple_desc_.push_back(common::g_time_column_desc); + auto table_schema = device_query_task->get_table_schema(); + for (const auto& column_name : device_query_task_->get_column_names()) { + common::ColumnDesc column_desc( + table_schema->get_column_desc(column_name)); + tuple_desc_.push_back(column_desc); + } + current_block_ = common::TsBlock::create_tsblock(&tuple_desc_, block_size); + col_appenders_.resize(tuple_desc_.get_column_count()); + for (int i = 0; i < tuple_desc_.get_column_count(); i++) { + col_appenders_[i] = new common::ColAppender(i, current_block_); + } + row_appender_ = new common::RowAppender(current_block_); + std::vector<ITimeseriesIndex*> time_series_indexs( + device_query_task_->get_column_names().size()); + tsfile_io_reader_->get_timeseries_indexes( + device_query_task->get_device_id(), + device_query_task->get_column_names(), time_series_indexs, pa_); + for (const auto& time_series_index : time_series_indexs) { + construct_column_context(time_series_index, time_filter); + } + + for (const auto& id_column : + device_query_task->get_column_mapping()->get_id_columns()) { + const auto& column_pos_in_result = + device_query_task->get_column_mapping()->get_column_pos(id_column); + int column_pos_in_id = + table_schema->find_id_column_order(id_column) + 1; + id_column_contexts_.insert(std::make_pair( + id_column, + IdColumnContext(column_pos_in_result, column_pos_in_id))); + } +} + +bool 
SingleDeviceTsBlockReader::has_next() { + if (!last_block_returned_) { + return true; + } + + if (field_column_contexts_.empty()) { + return false; + } + current_block_->reset(); + + next_time_ = -1; + + std::vector<MeasurementColumnContext*> min_time_columns; + while (current_block_->get_row_count() < block_size_) { + for (auto& column_context : field_column_contexts_) { + int64_t time; + if (IS_FAIL(column_context.second->get_current_time(time))) { + continue; + } + if (next_time_ == -1 || time < next_time_) { + next_time_ = time; + min_time_columns.clear(); + min_time_columns.push_back(column_context.second); + } else if (time == next_time_) { + min_time_columns.push_back(column_context.second); + } + } + + if (IS_FAIL(fill_measurements(min_time_columns))) { + return false; + } else { + next_time_ = -1; + } + + if (field_column_contexts_.empty()) { + break; + } + } + if (current_block_->get_row_count() > 0) { + fill_ids(); + current_block_->fill_trailling_nulls(); + last_block_returned_ = false; + return true; + } + return false; +} + +int SingleDeviceTsBlockReader::fill_measurements( + std::vector<MeasurementColumnContext*>& column_contexts) { + int ret = common::E_OK; + if (field_filter_ == + nullptr /*TODO: || field_filter_->satisfy(column_contexts)*/) { + if (!col_appenders_[0]->add_row()) { + assert(false); + } + // std::cout << col_appenders_[0]->tsblock_->debug_string() << std::endl; + col_appenders_[0]->append((char*)&next_time_, sizeof(next_time_)); + for (uint32_t i = 0; i < column_contexts.size(); i++) { + column_contexts[i]->fill_into(col_appenders_); + advance_column(column_contexts[i]); + } + // for (auto& column_contest : column_contexts) { + // column_contest->fill_into(col_appenders_); + // advance_column(column_contest); + // } + row_appender_->add_row(); + } + return ret; +} + +void SingleDeviceTsBlockReader::advance_column( + MeasurementColumnContext* column_context) { + if (column_context->move_iter() == common::E_NO_MORE_DATA) { + 
column_context->remove_from(field_column_contexts_); + } +} + +void SingleMeasurementColumnContext::remove_from( + std::map<std::string, MeasurementColumnContext*>& column_context_map) { + auto iter = column_context_map.find(column_name_); + if (iter != column_context_map.end()) { + delete iter->second; + column_context_map.erase(iter); + } +} + +void SingleDeviceTsBlockReader::fill_ids() { + for (const auto& entry : id_column_contexts_) { + const auto& id_column_context = entry.second; + for (int32_t pos : id_column_context.pos_in_result_) { + common::String device_id( + device_query_task_->get_device_id()->get_segments().at( + id_column_context.pos_in_device_id_)); + col_appenders_[pos]->fill((char*)&device_id, sizeof(device_id), + current_block_->get_row_count()); + } + } +} + +int SingleDeviceTsBlockReader::next(common::TsBlock*& ret_block) { + if (!has_next()) { + return common::E_NO_MORE_DATA; + } + last_block_returned_ = true; + ret_block = current_block_; + return common::E_OK; +} + +void SingleDeviceTsBlockReader::close() { + for (auto& column_context : field_column_contexts_) { + delete column_context.second; + } + if (current_block_) { + delete current_block_; + current_block_ = nullptr; + } + for (auto& col_appender : col_appenders_) { + if (col_appender) { + delete col_appender; + col_appender = nullptr; + } + } + if (row_appender_) { + delete row_appender_; + row_appender_ = nullptr; + } +} + +void SingleDeviceTsBlockReader::construct_column_context( + const ITimeseriesIndex* time_series_index, Filter* time_filter) { + // TODO: judge whether the time_series_index is aligned and jump empty chunk + SingleMeasurementColumnContext* column_context = + new SingleMeasurementColumnContext(tsfile_io_reader_); + column_context->init(device_query_task_, time_series_index, time_filter, + pa_); + field_column_contexts_.insert(std::make_pair( + time_series_index->get_measurement_name().to_std_string(), + column_context)); Review Comment: This todo is necessary 
########## cpp/src/reader/block/single_device_tsblock_reader.cc: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "single_device_tsblock_reader.h" + +namespace storage { + +SingleDeviceTsBlockReader::SingleDeviceTsBlockReader( + DeviceQueryTask* device_query_task, uint32_t block_size, + IMetadataQuerier* metadata_querier, TsFileIOReader* tsfile_io_reader, + Filter* time_filter, Filter* field_filter) + : device_query_task_(device_query_task), + field_filter_(field_filter), + block_size_(block_size), + tuple_desc_(), + tsfile_io_reader_(tsfile_io_reader) { + pa_.init(512, common::AllocModID::MOD_TSFILE_READER); + tuple_desc_.reset(); + common::init_common(); + tuple_desc_.push_back(common::g_time_column_desc); + auto table_schema = device_query_task->get_table_schema(); + for (const auto& column_name : device_query_task_->get_column_names()) { + common::ColumnDesc column_desc( + table_schema->get_column_desc(column_name)); + tuple_desc_.push_back(column_desc); + } + current_block_ = common::TsBlock::create_tsblock(&tuple_desc_, block_size); + col_appenders_.resize(tuple_desc_.get_column_count()); + for (int i = 0; i < tuple_desc_.get_column_count(); i++) { + 
col_appenders_[i] = new common::ColAppender(i, current_block_); + } + row_appender_ = new common::RowAppender(current_block_); + std::vector<ITimeseriesIndex*> time_series_indexs( + device_query_task_->get_column_names().size()); + tsfile_io_reader_->get_timeseries_indexes( + device_query_task->get_device_id(), + device_query_task->get_column_names(), time_series_indexs, pa_); + for (const auto& time_series_index : time_series_indexs) { + construct_column_context(time_series_index, time_filter); + } + + for (const auto& id_column : + device_query_task->get_column_mapping()->get_id_columns()) { + const auto& column_pos_in_result = + device_query_task->get_column_mapping()->get_column_pos(id_column); + int column_pos_in_id = + table_schema->find_id_column_order(id_column) + 1; + id_column_contexts_.insert(std::make_pair( + id_column, + IdColumnContext(column_pos_in_result, column_pos_in_id))); + } +} + +bool SingleDeviceTsBlockReader::has_next() { + if (!last_block_returned_) { + return true; + } + + if (field_column_contexts_.empty()) { + return false; + } + current_block_->reset(); + + next_time_ = -1; Review Comment: Note that timestamps can be negative, and Int64.MIN_VALUE is also a valid timestamp, so -1 (or any other int64 value) cannot serve as an "unset" sentinel. Therefore, you'd better use a separate flag to indicate whether the first timestamp has been set.
########## cpp/src/common/schema.h: ########## @@ -20,319 +20,342 @@ #ifndef COMMON_SCHEMA_H #define COMMON_SCHEMA_H +#include <algorithm> #include <map> // use unordered_map instead #include <memory> #include <string> +#include <unordered_map> #include "common/db_common.h" #include "writer/time_chunk_writer.h" #include "writer/value_chunk_writer.h" namespace storage { - class ChunkWriter; -} +class ChunkWriter; +class ValueChunkWriter; +class TimeChunkWriter; +} // namespace storage namespace storage { - /* schema information for one measurement */ - struct MeasurementSchema { - std::string measurement_name_; // for example: "s1" - common::TSDataType data_type_; - common::TSEncoding encoding_; - common::CompressionType compression_type_; - storage::ChunkWriter *chunk_writer_; - ValueChunkWriter *value_chunk_writer_; - std::map<std::string, std::string> props_; - - MeasurementSchema() - : measurement_name_(), - data_type_(common::INVALID_DATATYPE), - encoding_(common::INVALID_ENCODING), - compression_type_(common::INVALID_COMPRESSION), - chunk_writer_(nullptr), - value_chunk_writer_(nullptr) { - } - - MeasurementSchema(const std::string &measurement_name, - common::TSDataType data_type) - : measurement_name_(measurement_name), - data_type_(data_type), - encoding_(get_default_encoding_for_type(data_type)), - compression_type_(common::UNCOMPRESSED), - chunk_writer_(nullptr), - value_chunk_writer_(nullptr) { - } - - MeasurementSchema(const std::string &measurement_name, - common::TSDataType data_type, common::TSEncoding encoding, - common::CompressionType compression_type) - : measurement_name_(measurement_name), - data_type_(data_type), - encoding_(encoding), - compression_type_(compression_type), - chunk_writer_(nullptr), - value_chunk_writer_(nullptr) { - } - - int serialize_to(common::ByteStream &out) { - int ret = common::E_OK; - if (RET_FAIL( +/* schema information for one measurement */ +struct MeasurementSchema { + std::string measurement_name_; // for 
example: "s1" + common::TSDataType data_type_; + common::TSEncoding encoding_; + common::CompressionType compression_type_; + storage::ChunkWriter *chunk_writer_; + ValueChunkWriter *value_chunk_writer_; + std::map<std::string, std::string> props_; + + MeasurementSchema() + : measurement_name_(), + data_type_(common::INVALID_DATATYPE), + encoding_(common::INVALID_ENCODING), + compression_type_(common::INVALID_COMPRESSION), + chunk_writer_(nullptr), + value_chunk_writer_(nullptr) {} + + MeasurementSchema(const std::string &measurement_name, + common::TSDataType data_type) + : measurement_name_(measurement_name), + data_type_(data_type), + encoding_(get_default_encoding_for_type(data_type)), + compression_type_(common::UNCOMPRESSED), + chunk_writer_(nullptr), + value_chunk_writer_(nullptr) {} + + MeasurementSchema(const std::string &measurement_name, + common::TSDataType data_type, common::TSEncoding encoding, + common::CompressionType compression_type) + : measurement_name_(measurement_name), + data_type_(data_type), + encoding_(encoding), + compression_type_(compression_type), + chunk_writer_(nullptr), + value_chunk_writer_(nullptr) {} + + int serialize_to(common::ByteStream &out) { + int ret = common::E_OK; + if (RET_FAIL( common::SerializationUtil::write_str(measurement_name_, out))) { - } else if (RET_FAIL( - common::SerializationUtil::write_ui8(data_type_, out))) { - } else if (RET_FAIL( - common::SerializationUtil::write_ui8(encoding_, out))) { - } else if (RET_FAIL(common::SerializationUtil::write_ui8( - compression_type_, out))) { - } - if (ret == common::E_OK) { - if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(), - out))) { - for (const auto &prop: props_) { - if (RET_FAIL(common::SerializationUtil::write_str( + } else if (RET_FAIL( + common::SerializationUtil::write_ui8(data_type_, out))) { + } else if (RET_FAIL( + common::SerializationUtil::write_ui8(encoding_, out))) { + } else if (RET_FAIL(common::SerializationUtil::write_ui8( + 
compression_type_, out))) { + } + if (ret == common::E_OK) { + if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(), + out))) { + for (const auto &prop : props_) { + if (RET_FAIL(common::SerializationUtil::write_str( prop.first, out))) { - } else if (RET_FAIL(common::SerializationUtil::write_str( - prop.second, out))) { - } - if (IS_FAIL(ret)) break; + } else if (RET_FAIL(common::SerializationUtil::write_str( + prop.second, out))) { } + if (IS_FAIL(ret)) break; } } - return ret; } + return ret; + } - int deserialize_from(common::ByteStream &in) { - int ret = common::E_OK; - uint8_t data_type = common::TSDataType::INVALID_DATATYPE, + int deserialize_from(common::ByteStream &in) { + int ret = common::E_OK; + uint8_t data_type = common::TSDataType::INVALID_DATATYPE, encoding = common::TSEncoding::INVALID_ENCODING, compression_type = common::CompressionType::INVALID_COMPRESSION; - if (RET_FAIL( + if (RET_FAIL( common::SerializationUtil::read_str(measurement_name_, in))) { - } else if (RET_FAIL( - common::SerializationUtil::read_ui8(data_type, in))) { - } else if (RET_FAIL( - common::SerializationUtil::read_ui8(encoding, in))) { - } else if (RET_FAIL(common::SerializationUtil::read_ui8( - compression_type, in))) { - } - data_type_ = static_cast<common::TSDataType>(data_type); - encoding_ = static_cast<common::TSEncoding>(encoding); - compression_type_ = static_cast<common::CompressionType>(compression_type); - uint32_t props_size; - if (ret == common::E_OK) { - if (RET_FAIL(common::SerializationUtil::read_ui32(props_size, - in))) { - for (uint32_t i = 0; i < props_.size(); ++i) { - std::string key, value; - if (RET_FAIL(common::SerializationUtil::read_str( - key, in))) { - } else if (RET_FAIL(common::SerializationUtil::read_str( - value, in))) { - } - props_.insert(std::make_pair(key, value)); - if (IS_FAIL(ret)) break; + } else if (RET_FAIL( + common::SerializationUtil::read_ui8(data_type, in))) { + } else if (RET_FAIL( + 
common::SerializationUtil::read_ui8(encoding, in))) { + } else if (RET_FAIL(common::SerializationUtil::read_ui8( + compression_type, in))) { + } + data_type_ = static_cast<common::TSDataType>(data_type); + encoding_ = static_cast<common::TSEncoding>(encoding); + compression_type_ = + static_cast<common::CompressionType>(compression_type); + uint32_t props_size; + if (ret == common::E_OK) { + if (RET_FAIL( + common::SerializationUtil::read_ui32(props_size, in))) { + for (uint32_t i = 0; i < props_.size(); ++i) { + std::string key, value; + if (RET_FAIL( + common::SerializationUtil::read_str(key, in))) { + } else if (RET_FAIL(common::SerializationUtil::read_str( + value, in))) { } + props_.insert(std::make_pair(key, value)); + if (IS_FAIL(ret)) break; } } - return ret; } - }; + return ret; + } +}; - typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap; - typedef std::map<std::string, MeasurementSchema *>::iterator +typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap; +typedef std::map<std::string, MeasurementSchema *>::iterator MeasurementSchemaMapIter; - typedef std::pair<MeasurementSchemaMapIter, bool> +typedef std::pair<MeasurementSchemaMapIter, bool> MeasurementSchemaMapInsertResult; - /* schema information for a device */ - struct MeasurementSchemaGroup { - // measurement_name -> MeasurementSchema - MeasurementSchemaMap measurement_schema_map_; - bool is_aligned_ = false; - TimeChunkWriter *time_chunk_writer_ = nullptr; - }; - - enum class ColumnCategory { TAG = 0, FIELD = 1 }; - - class TableSchema { - public: - static void to_lowercase_inplace(std::string &str) { - std::transform(str.begin(), str.end(), str.begin(), - [](unsigned char c) -> unsigned char { return std::tolower(c); }); - } - - TableSchema() = default; - - TableSchema(const std::string &table_name, - const std::vector<MeasurementSchema*> - &column_schemas, - const std::vector<ColumnCategory> &column_categories) - : table_name_(table_name), - 
column_categories_(column_categories) { - to_lowercase_inplace(table_name_); - for (const auto column_schema : column_schemas) { - if (column_schema != nullptr) { - column_schemas_.emplace_back(std::shared_ptr<MeasurementSchema>(column_schema)); - } - } - int idx = 0; - for (const auto &measurement_schema: column_schemas_) { - to_lowercase_inplace(measurement_schema->measurement_name_); - column_pos_index_.insert( - std::make_pair(measurement_schema->measurement_name_, idx++)); +/* schema information for a device */ +struct MeasurementSchemaGroup { + // measurement_name -> MeasurementSchema + MeasurementSchemaMap measurement_schema_map_; + bool is_aligned_ = false; + TimeChunkWriter *time_chunk_writer_ = nullptr; +}; + +enum class ColumnCategory { TAG = 0, FIELD = 1 }; + +class TableSchema { + public: + static void to_lowercase_inplace(std::string &str) { + std::transform( + str.begin(), str.end(), str.begin(), + [](unsigned char c) -> unsigned char { return std::tolower(c); }); + } + + TableSchema() = default; + + TableSchema(const std::string &table_name, + const std::vector<MeasurementSchema *> &column_schemas, + const std::vector<ColumnCategory> &column_categories) + : table_name_(table_name), column_categories_(column_categories) { + to_lowercase_inplace(table_name_); + for (const auto column_schema : column_schemas) { + if (column_schema != nullptr) { + column_schemas_.emplace_back( + std::shared_ptr<MeasurementSchema>(column_schema)); } } - - TableSchema(TableSchema &&other) noexcept - : table_name_(std::move(other.table_name_)), - column_schemas_(std::move(other.column_schemas_)), - column_categories_(std::move(other.column_categories_)) { + int idx = 0; + for (const auto &measurement_schema : column_schemas_) { + to_lowercase_inplace(measurement_schema->measurement_name_); + column_pos_index_.insert( + std::make_pair(measurement_schema->measurement_name_, idx++)); } + } - TableSchema(const TableSchema &other) = default; + TableSchema(TableSchema &&other) 
noexcept + : table_name_(std::move(other.table_name_)), + column_schemas_(std::move(other.column_schemas_)), + column_categories_(std::move(other.column_categories_)) {} - int serialize_to(common::ByteStream &out) { - int ret = common::E_OK; - if (RET_FAIL(common::SerializationUtil::write_var_uint( + int serialize_to(common::ByteStream &out) { + int ret = common::E_OK; + if (RET_FAIL(common::SerializationUtil::write_var_uint( column_schemas_.size(), out))) { - } else { - for (size_t i = 0; IS_SUCC(ret) && i < column_schemas_.size(); - i++) { - auto column_schema = column_schemas_[i]; - auto column_category = column_categories_[i]; - if (RET_FAIL(column_schema->serialize_to(out))) { - } else if (RET_FAIL(common::SerializationUtil::write_i8( - static_cast<int8_t>(column_category), out))) { - } + } else { + for (size_t i = 0; IS_SUCC(ret) && i < column_schemas_.size(); + i++) { + auto column_schema = column_schemas_[i]; + auto column_category = column_categories_[i]; + if (RET_FAIL(column_schema->serialize_to(out))) { + } else if (RET_FAIL(common::SerializationUtil::write_i8( + static_cast<int8_t>(column_category), out))) { } } - return ret; } - - int deserialize(common::ByteStream &in) { - int ret = common::E_OK; - uint32_t num_columns; - if (RET_FAIL(common::SerializationUtil::read_var_uint( - num_columns, in))) { - } else { - for (size_t i = 0; IS_SUCC(ret) && i < num_columns; - i++) { - auto column_schema = std::make_shared<MeasurementSchema>(); - int8_t column_category = 0; - if (RET_FAIL(column_schema->deserialize_from(in))) { - } else if (RET_FAIL(common::SerializationUtil::read_i8( - column_category, in))) { - } - column_schemas_.emplace_back(column_schema); - column_categories_.emplace_back(static_cast<ColumnCategory>(column_category)); + return ret; + } + + int deserialize(common::ByteStream &in) { + int ret = common::E_OK; + uint32_t num_columns; + if (RET_FAIL( + common::SerializationUtil::read_var_uint(num_columns, in))) { + } else { + for (size_t i = 0; 
IS_SUCC(ret) && i < num_columns; i++) { + auto column_schema = std::make_shared<MeasurementSchema>(); + int8_t column_category = 0; + if (RET_FAIL(column_schema->deserialize_from(in))) { + } else if (RET_FAIL(common::SerializationUtil::read_i8( + column_category, in))) { } + column_schemas_.emplace_back(column_schema); + column_categories_.emplace_back( + static_cast<ColumnCategory>(column_category)); } - return ret; } + return ret; + } - ~TableSchema() { - column_schemas_.clear(); - } + ~TableSchema() { column_schemas_.clear(); } - const std::string &get_table_name() { return table_name_; } + const std::string &get_table_name() { return table_name_; } - std::vector<std::string> get_measurement_names() const { - std::vector<std::string> ret(column_schemas_.size()); - for (size_t i = 0; i < column_schemas_.size(); i++) { - ret[i] = column_schemas_[i]->measurement_name_; - } - return ret; + std::vector<std::string> get_measurement_names() const { + std::vector<std::string> ret(column_schemas_.size()); + for (size_t i = 0; i < column_schemas_.size(); i++) { + ret[i] = column_schemas_[i]->measurement_name_; } - - int find_column_index(const std::string &column_name) { - std::string lower_case_column_name = to_lower(column_name); - auto it = column_pos_index_.find(lower_case_column_name); - if (it != column_pos_index_.end()) { - return it->second; - } else { - int index = -1; - for (size_t i = 0; i < column_schemas_.size(); ++i) { - if (to_lower(column_schemas_[i]->measurement_name_) == - lower_case_column_name) { - index = static_cast<int>(i); - break; - } + return ret; + } + + int find_column_index(const std::string &column_name) { + std::string lower_case_column_name = to_lower(column_name); + auto it = column_pos_index_.find(lower_case_column_name); + if (it != column_pos_index_.end()) { + return it->second; + } else { + int index = -1; + for (size_t i = 0; i < column_schemas_.size(); ++i) { + if (to_lower(column_schemas_[i]->measurement_name_) == + 
lower_case_column_name) { + index = static_cast<int>(i); + break; Review Comment: Measurement names should be converted to lower case when they are put into the schema, so it is unnecessary to call to_lower here. ########## cpp/src/common/tsfile_common.h: ########## @@ -907,233 +910,251 @@ static const char *meta_index_node_type_names[5] = { #endif struct MetaIndexNode { - // TODO use vector to support binary search - // common::SimpleList<MetaIndexEntry*> children_; - std::vector<std::shared_ptr<IMetaIndexEntry>> children_; - int64_t end_offset_; - MetaIndexNodeType node_type_; - common::PageArena *pa_; - - explicit MetaIndexNode(common::PageArena *pa) - : children_(), end_offset_(0), node_type_(), pa_(pa) {} - - std::shared_ptr<IMetaIndexEntry> peek() { - if (children_.empty()) { - return nullptr; + // TODO use vector to support binary search + // common::SimpleList<MetaIndexEntry*> children_; + std::vector<std::shared_ptr<IMetaIndexEntry>> children_; + int64_t end_offset_; + MetaIndexNodeType node_type_; + common::PageArena *pa_; + + explicit MetaIndexNode(common::PageArena *pa) + : children_(), end_offset_(0), node_type_(), pa_(pa) {} + + std::shared_ptr<IMetaIndexEntry> peek() { + if (children_.empty()) { + return nullptr; + } + return children_[0]; } - return children_[0]; - } - ~MetaIndexNode() {} + ~MetaIndexNode() {} - static void self_deleter(MetaIndexNode* ptr) { - if (ptr) { - ptr->~MetaIndexNode(); + static void self_deleter(MetaIndexNode *ptr) { + if (ptr) { + ptr->~MetaIndexNode(); + } } - } - int binary_search_children(std::shared_ptr<IComparable> key, - bool exact_search, - IMetaIndexEntry &ret_index_entry, - int64_t &ret_end_offset); + int binary_search_children(std::shared_ptr<IComparable> key, + bool exact_search, + IMetaIndexEntry &ret_index_entry, + int64_t &ret_end_offset); - int serialize_to(common::ByteStream &out) { - int ret = common::E_OK; + int serialize_to(common::ByteStream &out) { + int ret = common::E_OK; #if DEBUG_SE - int64_t start_pos =
out.total_size(); + int64_t start_pos = out.total_size(); #endif - if (RET_FAIL(common::SerializationUtil::write_var_uint(children_.size(), - out))) { - } else { - for (size_t i = 0; IS_SUCC(ret) && i < children_.size(); i++) { - auto entry = children_[i]; - if (RET_FAIL(entry->serialize_to(out))) { - } - } - if (IS_SUCC(ret)) { - if (RET_FAIL(common::SerializationUtil::write_i64(end_offset_, - out))) { - } else if (RET_FAIL(common::SerializationUtil::write_char( - node_type_, out))) { + if (RET_FAIL(common::SerializationUtil::write_var_uint(children_.size(), + out))) { + } else { + for (size_t i = 0; IS_SUCC(ret) && i < children_.size(); i++) { + auto entry = children_[i]; + if (RET_FAIL(entry->serialize_to(out))) { + } + } + if (IS_SUCC(ret)) { + if (RET_FAIL(common::SerializationUtil::write_i64(end_offset_, + out))) { + } else if (RET_FAIL(common::SerializationUtil::write_char( + node_type_, out))) { + } + } } - } - } #if DEBUG_SE - std::cout << "MetaIndexNode serialize_to. this=" << *this - << " at file pos: " << start_pos << " to " << out.total_size() - << std::endl; + std::cout << "MetaIndexNode serialize_to. 
this=" << *this + << " at file pos: " << start_pos << " to " << out.total_size() + << std::endl; #endif - return ret; - } - - int deserialize_from(const char *buf, int len) { - common::ByteStream bs; - bs.wrap_from(buf, len); - return deserialize_from(bs); - } - int deserialize_from(common::ByteStream &in) { - int ret = common::E_OK; - uint32_t children_size = 0; - if (RET_FAIL( - common::SerializationUtil::read_var_uint(children_size, in))) { - return ret; - } - for (uint32_t i = 0; i < children_size && IS_SUCC(ret); i++) { - void *entry_buf = pa_->alloc(sizeof(MeasurementMetaIndexEntry)); - if (IS_NULL(entry_buf)) { - return common::E_OOM; - } - auto entry = new(entry_buf) MeasurementMetaIndexEntry; - - if (RET_FAIL(entry->deserialize_from(in, pa_))) { - } else { - children_.push_back(std::shared_ptr<IMetaIndexEntry>( - entry, IMetaIndexEntry::self_destructor)); - } - } // end for - if (IS_SUCC(ret)) { - char node_type_ch = 0; - if (RET_FAIL( - common::SerializationUtil::read_i64(end_offset_, in))) { - } else if (RET_FAIL(common::SerializationUtil::read_char( - node_type_ch, in))) { - } else { - node_type_ = (MetaIndexNodeType) node_type_ch; - } + return ret; + } + + int deserialize_from(const char *buf, int len) { + common::ByteStream bs; + bs.wrap_from(buf, len); + return deserialize_from(bs); } + int deserialize_from(common::ByteStream &in) { + int ret = common::E_OK; + uint32_t children_size = 0; + if (RET_FAIL( + common::SerializationUtil::read_var_uint(children_size, in))) { + return ret; + } + for (uint32_t i = 0; i < children_size && IS_SUCC(ret); i++) { + void *entry_buf = pa_->alloc(sizeof(MeasurementMetaIndexEntry)); + if (IS_NULL(entry_buf)) { + return common::E_OOM; + } + auto entry = new (entry_buf) MeasurementMetaIndexEntry; + + if (RET_FAIL(entry->deserialize_from(in, pa_))) { + } else { + children_.push_back(std::shared_ptr<IMetaIndexEntry>( + entry, IMetaIndexEntry::self_destructor)); + } + } // end for + if (IS_SUCC(ret)) { + char node_type_ch 
= 0; + if (RET_FAIL( + common::SerializationUtil::read_i64(end_offset_, in))) { + } else if (RET_FAIL(common::SerializationUtil::read_char( + node_type_ch, in))) { + } else { + node_type_ = (MetaIndexNodeType)node_type_ch; + } + } #if DEBUG_SE - std::cout << "MetaIndexNode deserialize_from. this=" << *this - << std::endl; + std::cout << "MetaIndexNode deserialize_from. this=" << *this + << std::endl; #endif - return ret; - } - int device_deserialize_from(const char *buf, int len) { - common::ByteStream bs; - bs.wrap_from(buf, len); - return device_deserialize_from(bs); - } - int device_deserialize_from(common::ByteStream &in) { - int ret = common::E_OK; - uint32_t children_size = 0; - if (RET_FAIL( - common::SerializationUtil::read_var_uint(children_size, in))) { - return ret; - } - for (uint32_t i = 0; i < children_size && IS_SUCC(ret); i++) { - void *entry_buf = pa_->alloc(sizeof(DeviceMetaIndexEntry)); - if (IS_NULL(entry_buf)) { - return common::E_OOM; - } - // auto entry = new (entry_buf) DeviceMetaIndexEntry; - auto entry = std::make_shared<DeviceMetaIndexEntry>(); - - if (RET_FAIL(entry->deserialize_from(in, pa_))) { - } else { - children_.push_back(entry); - } - } // end for - if (IS_SUCC(ret)) { - char node_type_ch = 0; - if (RET_FAIL( - common::SerializationUtil::read_i64(end_offset_, in))) { - } else if (RET_FAIL(common::SerializationUtil::read_char( - node_type_ch, in))) { - } else { - node_type_ = (MetaIndexNodeType) node_type_ch; - } + return ret; } + int device_deserialize_from(const char *buf, int len) { + common::ByteStream bs; + bs.wrap_from(buf, len); + return device_deserialize_from(bs); + } + int device_deserialize_from(common::ByteStream &in) { + int ret = common::E_OK; + uint32_t children_size = 0; + if (RET_FAIL( + common::SerializationUtil::read_var_uint(children_size, in))) { + return ret; + } + for (uint32_t i = 0; i < children_size && IS_SUCC(ret); i++) { + void *entry_buf = pa_->alloc(sizeof(DeviceMetaIndexEntry)); + if 
(IS_NULL(entry_buf)) { + return common::E_OOM; + } + // auto entry = new (entry_buf) DeviceMetaIndexEntry; + auto entry = std::make_shared<DeviceMetaIndexEntry>(); + + if (RET_FAIL(entry->deserialize_from(in, pa_))) { + } else { + children_.push_back(entry); + } + } // end for + if (IS_SUCC(ret)) { + char node_type_ch = 0; + if (RET_FAIL( + common::SerializationUtil::read_i64(end_offset_, in))) { + } else if (RET_FAIL(common::SerializationUtil::read_char( + node_type_ch, in))) { + } else { + node_type_ = (MetaIndexNodeType)node_type_ch; + } + } #if DEBUG_SE - std::cout << "MetaIndexNode deserialize_from. this=" << *this - << std::endl; + std::cout << "MetaIndexNode deserialize_from. this=" << *this + << std::endl; #endif - return ret; - } + return ret; + } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, - const MetaIndexNode &node) { - os << "end_offset=" << node.end_offset_ - << ", node_type=" << meta_index_node_type_names[node.node_type_]; - - os << ", MetaIndexEntry children={"; - for (size_t i = 0; i < node.children_.size(); i++) { - os << (i == 0 ? "" : ", ") << "[" << i << "]={" - << *node.children_[i] << "}"; - } - os << "}"; - return os; - } + friend std::ostream &operator<<(std::ostream &os, + const MetaIndexNode &node) { + os << "end_offset=" << node.end_offset_ + << ", node_type=" << meta_index_node_type_names[node.node_type_]; + + os << ", MetaIndexEntry children={"; + for (size_t i = 0; i < node.children_.size(); i++) { + os << (i == 0 ? 
"" : ", ") << "[" << i << "]={" + << *node.children_[i] << "}"; + } + os << "}"; + return os; + } #endif - FORCE_INLINE bool is_full() const { - return children_.size() >= - common::g_config_value_.max_degree_of_index_node_; - } + FORCE_INLINE bool is_full() const { + return children_.size() >= + common::g_config_value_.max_degree_of_index_node_; + } - FORCE_INLINE bool is_empty() const { return children_.size() == 0; } + FORCE_INLINE bool is_empty() const { return children_.size() == 0; } - FORCE_INLINE int push_entry(std::shared_ptr<IMetaIndexEntry> entry) { + FORCE_INLINE int push_entry(std::shared_ptr<IMetaIndexEntry> entry) { #if DEBUG_SE - std::cout << "MetaIndexNode.push_entry(" << *entry << ")" << std::endl; + std::cout << "MetaIndexNode.push_entry(" << *entry << ")" << std::endl; #endif - children_.push_back(entry); - return common::E_OK; - } - FORCE_INLINE void destroy() { - // std::vector<MetaIndexEntry*>().swap(children_); - children_.~vector(); - } + children_.push_back(entry); + return common::E_OK; + } + FORCE_INLINE void destroy() { + // std::vector<MetaIndexEntry*>().swap(children_); + children_.~vector(); + } }; class TableSchema; struct TsFileMeta { - typedef std::map<std::shared_ptr<IDeviceID>, std::shared_ptr<MetaIndexNode>, - IDeviceIDComparator> - DeviceNodeMap; - std::map<std::string, std::shared_ptr<MetaIndexNode>> table_metadata_index_node_map_; - std::unordered_map<std::string, std::string> tsfile_properties_; - typedef std::unordered_map<std::string, std::shared_ptr<TableSchema>> - TableSchemasMap; - TableSchemasMap table_schemas_; - int64_t meta_offset_; - BloomFilter *bloom_filter_; - common::PageArena *page_arena_; - - TsFileMeta() - : meta_offset_(0), - bloom_filter_(nullptr), - page_arena_(nullptr) {} - - explicit TsFileMeta(common::PageArena *pa) - : meta_offset_(0), - bloom_filter_(nullptr), - page_arena_(pa) {} - ~TsFileMeta() { - if (bloom_filter_ != nullptr) { - bloom_filter_->destroy(); - } - 
table_metadata_index_node_map_.clear(); - table_schemas_.clear(); - } - - int serialize_to(common::ByteStream &out); - - int deserialize_from(common::ByteStream &in); + typedef std::map<std::shared_ptr<IDeviceID>, std::shared_ptr<MetaIndexNode>, + IDeviceIDComparator> + DeviceNodeMap; + std::map<std::string, std::shared_ptr<MetaIndexNode>> + table_metadata_index_node_map_; + std::unordered_map<std::string, std::string> tsfile_properties_; + typedef std::unordered_map<std::string, std::shared_ptr<TableSchema>> + TableSchemasMap; + TableSchemasMap table_schemas_; + int64_t meta_offset_; + BloomFilter *bloom_filter_; + common::PageArena *page_arena_; + + int get_table_metaindex_node(common::String &table_name, + MetaIndexNode *&ret_node) { // TODO Review Comment: TODO what ########## cpp/src/reader/block/single_device_tsblock_reader.cc: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "single_device_tsblock_reader.h" + +namespace storage { + +SingleDeviceTsBlockReader::SingleDeviceTsBlockReader( + DeviceQueryTask* device_query_task, uint32_t block_size, + IMetadataQuerier* metadata_querier, TsFileIOReader* tsfile_io_reader, + Filter* time_filter, Filter* field_filter) + : device_query_task_(device_query_task), + field_filter_(field_filter), + block_size_(block_size), + tuple_desc_(), + tsfile_io_reader_(tsfile_io_reader) { + pa_.init(512, common::AllocModID::MOD_TSFILE_READER); + tuple_desc_.reset(); + common::init_common(); + tuple_desc_.push_back(common::g_time_column_desc); + auto table_schema = device_query_task->get_table_schema(); + for (const auto& column_name : device_query_task_->get_column_names()) { + common::ColumnDesc column_desc( + table_schema->get_column_desc(column_name)); + tuple_desc_.push_back(column_desc); + } + current_block_ = common::TsBlock::create_tsblock(&tuple_desc_, block_size); + col_appenders_.resize(tuple_desc_.get_column_count()); + for (int i = 0; i < tuple_desc_.get_column_count(); i++) { + col_appenders_[i] = new common::ColAppender(i, current_block_); + } + row_appender_ = new common::RowAppender(current_block_); + std::vector<ITimeseriesIndex*> time_series_indexs( + device_query_task_->get_column_names().size()); + tsfile_io_reader_->get_timeseries_indexes( + device_query_task->get_device_id(), + device_query_task->get_column_names(), time_series_indexs, pa_); + for (const auto& time_series_index : time_series_indexs) { + construct_column_context(time_series_index, time_filter); + } + + for (const auto& id_column : + device_query_task->get_column_mapping()->get_id_columns()) { + const auto& column_pos_in_result = + device_query_task->get_column_mapping()->get_column_pos(id_column); + int column_pos_in_id = + table_schema->find_id_column_order(id_column) + 1; + id_column_contexts_.insert(std::make_pair( + id_column, + IdColumnContext(column_pos_in_result, column_pos_in_id))); + } +} + +bool 
SingleDeviceTsBlockReader::has_next() { + if (!last_block_returned_) { + return true; + } + + if (field_column_contexts_.empty()) { + return false; + } + current_block_->reset(); + + next_time_ = -1; + + std::vector<MeasurementColumnContext*> min_time_columns; + while (current_block_->get_row_count() < block_size_) { + for (auto& column_context : field_column_contexts_) { + int64_t time; + if (IS_FAIL(column_context.second->get_current_time(time))) { + continue; + } + if (next_time_ == -1 || time < next_time_) { + next_time_ = time; + min_time_columns.clear(); + min_time_columns.push_back(column_context.second); + } else if (time == next_time_) { + min_time_columns.push_back(column_context.second); + } + } + + if (IS_FAIL(fill_measurements(min_time_columns))) { + return false; + } else { + next_time_ = -1; + } + + if (field_column_contexts_.empty()) { + break; + } + } + if (current_block_->get_row_count() > 0) { + fill_ids(); + current_block_->fill_trailling_nulls(); + last_block_returned_ = false; + return true; + } + return false; +} + +int SingleDeviceTsBlockReader::fill_measurements( + std::vector<MeasurementColumnContext*>& column_contexts) { + int ret = common::E_OK; + if (field_filter_ == + nullptr /*TODO: || field_filter_->satisfy(column_contexts)*/) { + if (!col_appenders_[0]->add_row()) { + assert(false); + } + // std::cout << col_appenders_[0]->tsblock_->debug_string() << std::endl; + col_appenders_[0]->append((char*)&next_time_, sizeof(next_time_)); + for (uint32_t i = 0; i < column_contexts.size(); i++) { + column_contexts[i]->fill_into(col_appenders_); + advance_column(column_contexts[i]); + } + // for (auto& column_contest : column_contexts) { + // column_contest->fill_into(col_appenders_); + // advance_column(column_contest); + // } + row_appender_->add_row(); + } + return ret; +} + +void SingleDeviceTsBlockReader::advance_column( + MeasurementColumnContext* column_context) { + if (column_context->move_iter() == common::E_NO_MORE_DATA) { + 
column_context->remove_from(field_column_contexts_); + } +} + +void SingleMeasurementColumnContext::remove_from( + std::map<std::string, MeasurementColumnContext*>& column_context_map) { + auto iter = column_context_map.find(column_name_); + if (iter != column_context_map.end()) { + delete iter->second; + column_context_map.erase(iter); + } +} + +void SingleDeviceTsBlockReader::fill_ids() { + for (const auto& entry : id_column_contexts_) { + const auto& id_column_context = entry.second; + for (int32_t pos : id_column_context.pos_in_result_) { + common::String device_id( + device_query_task_->get_device_id()->get_segments().at( + id_column_context.pos_in_device_id_)); + col_appenders_[pos]->fill((char*)&device_id, sizeof(device_id), + current_block_->get_row_count()); + } + } +} + +int SingleDeviceTsBlockReader::next(common::TsBlock*& ret_block) { + if (!has_next()) { + return common::E_NO_MORE_DATA; + } + last_block_returned_ = true; + ret_block = current_block_; + return common::E_OK; +} + +void SingleDeviceTsBlockReader::close() { + for (auto& column_context : field_column_contexts_) { + delete column_context.second; + } + if (current_block_) { + delete current_block_; + current_block_ = nullptr; + } + for (auto& col_appender : col_appenders_) { + if (col_appender) { + delete col_appender; + col_appender = nullptr; + } + } + if (row_appender_) { + delete row_appender_; + row_appender_ = nullptr; + } +} + +void SingleDeviceTsBlockReader::construct_column_context( + const ITimeseriesIndex* time_series_index, Filter* time_filter) { + // TODO: judge whether the time_series_index is aligned and jump empty chunk + SingleMeasurementColumnContext* column_context = + new SingleMeasurementColumnContext(tsfile_io_reader_); + column_context->init(device_query_task_, time_series_index, time_filter, + pa_); + field_column_contexts_.insert(std::make_pair( + time_series_index->get_measurement_name().to_std_string(), + column_context)); +} + +int 
SingleMeasurementColumnContext::init( + DeviceQueryTask* device_query_task, + const ITimeseriesIndex* time_series_index, Filter* time_filter, + common::PageArena& pa) { + int ret = common::E_OK; + column_name_ = time_series_index->get_measurement_name().to_std_string(); + if (RET_FAIL(tsfile_io_reader_->alloc_ssi( + device_query_task->get_device_id()->get_device_name(), + time_series_index->get_measurement_name().to_std_string(), ssi_, pa, + time_filter))) { + } else if (RET_FAIL(get_next_tsblock(true))) { + } + return ret; +} + +int SingleMeasurementColumnContext::get_next_tsblock(bool alloc_mem) { + int ret = common::E_OK; + if (tsblock_ != nullptr) { + if (time_iter_) { + delete time_iter_; + time_iter_ = nullptr; + } + if (value_iter_) { + delete value_iter_; + value_iter_ = nullptr; + } + tsblock_->reset(); + } + if (RET_FAIL(ssi_->get_next(tsblock_, alloc_mem))) { + if (time_iter_) { + delete time_iter_; + time_iter_ = nullptr; + } + if (value_iter_) { + delete value_iter_; + value_iter_ = nullptr; + } + if (tsblock_) { + ssi_->destroy(); + tsblock_ = nullptr; + } + } else { + std::cout << "debug: \n"; + std::cout << tsblock_->debug_string() << std::endl; + time_iter_ = new common::ColIterator(0, tsblock_); + value_iter_ = new common::ColIterator(1, tsblock_); + } + return ret; +} + +int SingleMeasurementColumnContext::get_current_time(int64_t& time) { + if (time_iter_->end()) { + return common::E_NO_MORE_DATA; + } + uint32_t len = 0; + time = *(int64_t*)(time_iter_->read(&len)); + return common::E_OK; +} + +int SingleMeasurementColumnContext::get_current_value(char* value) { + if (value_iter_->end()) { + return common::E_NO_MORE_DATA; + } + uint32_t len = 0; + value = value_iter_->read(&len); + return common::E_OK; +} + +int SingleMeasurementColumnContext::move_iter() { + int ret = common::E_OK; + if (time_iter_->end()) { + if (RET_FAIL(get_next_tsblock(false))) { + return ret; + } + } else { + time_iter_->next(); + value_iter_->next(); + } + return ret; +} 
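+ +void SingleMeasurementColumnContext::fill_into( + std::vector<common::ColAppender*>& col_appenders) { + char* val = nullptr; + if (!get_current_value(val)) { + return; + } + for (int32_t pos : pos_in_result_) { + int len = 0; + if (get_current_value(val)) { + col_appenders[pos]->append(val, len); + } + } +} Review Comment: len is not set? `len` is initialized to 0 and never assigned, so `append(val, len)` always appends zero bytes. Note that `get_current_value(char* value)` also receives the pointer by value, so `val` here is never updated either; it would need to hand back both the pointer and the length through reference out-parameters (e.g. `char*&` and `uint32_t&`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]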
