.. _program_listing_file_include_shad_data_structures_array.h:

Program Listing for File array.h
================================

|exhale_lsh| :ref:`Return to documentation for file <file_include_shad_data_structures_array.h>` (``include/shad/data_structures/array.h``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   //===------------------------------------------------------------*- C++ -*-===//
   //
   //                                     SHAD
   //
   //      The Scalable High-performance Algorithms and Data Structure Library
   //
   //===----------------------------------------------------------------------===//
   //
   // Copyright 2018 Battelle Memorial Institute
   //
   // Licensed under the Apache License, Version 2.0 (the "License"); you may not
   // use this file except in compliance with the License. You may obtain a copy
   // of the License at
   //
   //     http://www.apache.org/licenses/LICENSE-2.0
   //
   // Unless required by applicable law or agreed to in writing, software
   // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   // License for the specific language governing permissions and limitations
   // under the License.
// //===----------------------------------------------------------------------===// #ifndef INCLUDE_SHAD_DATA_STRUCTURES_ARRAY_H_ #define INCLUDE_SHAD_DATA_STRUCTURES_ARRAY_H_ #include #include #include #include #include #include #include #include #include "shad/data_structures/abstract_data_structure.h" #include "shad/data_structures/buffer.h" #include "shad/runtime/runtime.h" namespace shad { template class Array : public AbstractDataStructure> { template friend class AbstractDataStructure; public: constexpr static size_t kMaxChunkSize = constants::max(constants::kBufferNumBytes / sizeof(T), 1lu); using ObjectID = typename AbstractDataStructure>::ObjectID; using BuffersVector = impl::BuffersVector, Array>; using ShadArrayPtr = typename AbstractDataStructure>::SharedPtr; ObjectID GetGlobalID() const { return oid_; } size_t Size() const noexcept { return size_; } #ifdef DOXYGEN_IS_RUNNING static ShadArrayPtr Create(size_t size, const T &initValue); #endif void InsertAt(const size_t pos, const T &value); void InsertAt(const size_t pos, const T *values, const size_t numValues); void AsyncInsertAt(rt::Handle &handle, const size_t pos, const T &value); void AsyncInsertAt(rt::Handle &handle, const size_t pos, const T *values, const size_t numValues); void BufferedInsertAt(const size_t pos, const T &value); void BufferedAsyncInsertAt(rt::Handle &handle, const size_t pos, const T &value); void WaitForBufferedInsert() { buffers_.FlushAll(); } T At(const size_t pos); void AsyncAt(rt::Handle &handle, const size_t pos, T *result); template void Apply(const size_t pos, ApplyFunT &&function, Args &... args); template void AsyncApply(rt::Handle &handle, const size_t pos, ApplyFunT &&function, Args &... args); template void ForEachInRange(const size_t first, const size_t last, ApplyFunT &&function, Args &... args); template void AsyncForEachInRange(rt::Handle &handle, const size_t first, const size_t last, ApplyFunT &&function, Args &... 
args); template void ForEach(ApplyFunT &&function, Args &... args); template void AsyncForEach(rt::Handle &handle, ApplyFunT &&function, Args &... args); // FIXME it should be protected void BufferEntryInsert(const std::tuple entry) { data_[std::get<0>(entry)] = std::get<1>(entry); } protected: Array(ObjectID oid, size_t size, const T &initValue) : oid_(oid), size_(size), pivot_((size % rt::numLocalities() == 0) ? rt::numLocalities() : rt::numLocalities() - (size % rt::numLocalities())), data_(), dataDistribution_(), buffers_(oid) { rt::Locality pivot(pivot_); size_t start = 0; size_t chunkSize = size / rt::numLocalities(); auto localities = rt::allLocalities(); for (auto &locality : localities) { if (locality < pivot) { dataDistribution_.emplace_back( std::make_pair(start, start + chunkSize - 1)); } else { dataDistribution_.emplace_back( std::make_pair(start, start + chunkSize)); ++start; } start += chunkSize; } if (rt::thisLocality() < pivot) data_.resize(chunkSize, initValue); else data_.resize(chunkSize + 1, initValue); } private: ObjectID oid_; size_t size_; uint32_t pivot_; std::vector data_; std::vector> dataDistribution_; BuffersVector buffers_; struct InsertAtArgs { ObjectID oid; size_t pos; T value; }; struct AtArgs { ObjectID oid; size_t pos; }; static void InsertAtFun(const InsertAtArgs &args) { ShadArrayPtr ptr = Array::GetPtr(args.oid); ptr->data_[args.pos] = args.value; } static void RangedInsertAtFun(const uint8_t *args, const uint32_t size) { uint8_t *argsPtr = const_cast(args); ObjectID &oid = *reinterpret_cast(argsPtr); argsPtr += sizeof(oid); size_t pos = *reinterpret_cast(argsPtr); argsPtr += sizeof(size_t); size_t chunkSize = *reinterpret_cast(argsPtr); argsPtr += sizeof(size_t); ShadArrayPtr ptr = Array::GetPtr(oid); memcpy(&(ptr->data_[pos]), argsPtr, chunkSize * sizeof(T)); } static void AsyncRangedInsertAtFun(rt::Handle &, const uint8_t *args, const uint32_t size) { uint8_t *argsPtr = const_cast(args); ObjectID &oid = 
*reinterpret_cast(argsPtr); argsPtr += sizeof(oid); size_t pos = *reinterpret_cast(argsPtr); argsPtr += sizeof(size_t); size_t chunkSize = *reinterpret_cast(argsPtr); argsPtr += sizeof(size_t); ShadArrayPtr ptr = Array::GetPtr(oid); memcpy(&(ptr->data_[pos]), argsPtr, chunkSize * sizeof(T)); } static void AsyncInsertAtFun(shad::rt::Handle &, const InsertAtArgs &args) { ShadArrayPtr ptr = Array::GetPtr(args.oid); ptr->data_[args.pos] = args.value; } static void AtFun(const AtArgs &args, T *result) { ShadArrayPtr ptr = Array::GetPtr(args.oid); *result = ptr->data_[args.pos]; } static void AsyncAtFun(rt::Handle &handle, const AtArgs &args, T *result) { ShadArrayPtr ptr = Array::GetPtr(args.oid); *result = ptr->data_[args.pos]; } template static void CallApplyFun(ObjectID &oid, size_t pos, size_t loffset, ApplyFunT function, std::tuple &args, std::index_sequence) { // Get a local instance on the remote node. auto arrayPtr = Array::GetPtr(oid); T &element = arrayPtr->data_[loffset]; function(pos, element, std::get(args)...); } template static void ApplyFunWrapper(const Tuple &args) { constexpr auto Size = std::tuple_size< typename std::decay(args))>::type>::value; Tuple &tuple = const_cast(args); CallApplyFun(std::get<0>(tuple), std::get<1>(tuple), std::get<2>(tuple), std::get<3>(tuple), std::get<4>(tuple), std::make_index_sequence{}); } template static void AsyncCallApplyFun(rt::Handle &handle, ObjectID &oid, size_t pos, size_t loffset, ApplyFunT function, std::tuple &args, std::index_sequence) { // Get a local instance on the remote node. 
auto arrayPtr = Array::GetPtr(oid); T &element = arrayPtr->data_[loffset]; function(handle, pos, element, std::get(args)...); } template static void AsyncApplyFunWrapper(rt::Handle &handle, const Tuple &args) { constexpr auto Size = std::tuple_size< typename std::decay(args))>::type>::value; Tuple &tuple = const_cast(args); AsyncCallApplyFun(handle, std::get<0>(tuple), std::get<1>(tuple), std::get<2>(tuple), std::get<3>(tuple), std::get<4>(tuple), std::make_index_sequence{}); } template static void CallForEachInRangeFun(size_t i, ObjectID &oid, size_t pos, size_t lpos, ApplyFunT function, std::tuple &args, std::index_sequence) { // Get a local instance on the remote node. auto arrayPtr = Array::GetPtr(oid); T &element = arrayPtr->data_[i + lpos]; function(i + pos, element, std::get(args)...); } template static void ForEachInRangeFunWrapper(const Tuple &args, size_t i) { constexpr auto Size = std::tuple_size< typename std::decay(args))>::type>::value; Tuple &tuple = const_cast(args); CallForEachInRangeFun(i, std::get<0>(tuple), std::get<1>(tuple), std::get<2>(tuple), std::get<3>(tuple), std::get<4>(tuple), std::make_index_sequence{}); } template static void AsyncCallForEachInRangeFun(rt::Handle &handle, size_t i, ObjectID &oid, size_t pos, size_t lpos, ApplyFunT function, std::tuple &args, std::index_sequence) { // Get a local instance on the remote node. 
auto arrayPtr = Array::GetPtr(oid); T &element = arrayPtr->data_[i + lpos]; function(handle, i + pos, element, std::get(args)...); } template static void AsyncForEachInRangeFunWrapper(rt::Handle &handle, const Tuple &args, size_t i) { constexpr auto Size = std::tuple_size< typename std::decay(args))>::type>::value; Tuple &tuple = const_cast(args); AsyncCallForEachInRangeFun(handle, i, std::get<0>(tuple), std::get<1>(tuple), std::get<2>(tuple), std::get<3>(tuple), std::get<4>(tuple), std::make_index_sequence{}); } template static void AsyncCallForEachFun(rt::Handle &handle, const size_t i, T *arrayPtr, ApplyFunT function, size_t pos, std::tuple &args, std::index_sequence) { function(handle, pos + i, arrayPtr[i], std::get(args)...); } template static void AsyncForEachFunWrapper(rt::Handle &handle, const Tuple &args, size_t i) { constexpr auto Size = std::tuple_size< typename std::decay(args))>::type>::value; Tuple &tuple = const_cast(args); AsyncCallForEachFun(handle, i, std::get<0>(tuple), std::get<1>(tuple), std::get<2>(tuple), std::get<3>(tuple), std::make_index_sequence{}); } template static void CallForEachFun(size_t i, T *arrayPtr, ApplyFunT function, size_t pos, std::tuple &args, std::index_sequence) { function(i + pos, arrayPtr[i], std::get(args)...); } template static void ForEachFunWrapper(const Tuple &args, size_t i) { constexpr auto Size = std::tuple_size< typename std::decay(args))>::type>::value; Tuple &tuple = const_cast(args); CallForEachFun(i, std::get<0>(tuple), std::get<1>(tuple), std::get<2>(tuple), std::get<3>(tuple), std::make_index_sequence{}); } }; static std::pair getTargetLocalityFromTargePosition( const std::vector> &dataDistribution, size_t position) { auto itr = std::lower_bound( dataDistribution.begin(), dataDistribution.end(), position, [](const std::pair &lhs, const size_t position) -> bool { return lhs.second < position; }); rt::Locality dest(std::distance(dataDistribution.begin(), itr)); size_t off = position - itr->first; return 
std::make_pair(dest, off); } template void Array::InsertAt(const size_t pos, const T &value) { auto target = getTargetLocalityFromTargePosition(dataDistribution_, pos); if (target.first == rt::thisLocality()) { data_[target.second] = value; } else { InsertAtArgs args = {oid_, target.second, value}; rt::executeAt(target.first, InsertAtFun, args); } } template void Array::AsyncInsertAt(rt::Handle &handle, const size_t pos, const T &value) { auto target = getTargetLocalityFromTargePosition(dataDistribution_, pos); if (target.first == rt::thisLocality()) { data_[target.second] = value; } else { InsertAtArgs args = {oid_, target.second, value}; rt::asyncExecuteAt(handle, target.first, AsyncInsertAtFun, args); } } template void Array::BufferedInsertAt(const size_t pos, const T &value) { auto target = getTargetLocalityFromTargePosition(dataDistribution_, pos); if (target.first == rt::thisLocality()) { data_[target.second] = value; } else { buffers_.Insert(std::tuple(target.second, value), target.first); } } template void Array::InsertAt(const size_t pos, const T *values, const size_t numValues) { size_t tgtPos = 0, firstPos = pos; rt::Locality tgtLoc; size_t remainingValues = numValues; size_t chunkSize = 0; T *valuesPtr = const_cast(values); while ((remainingValues > 0) && (firstPos < pivot_ * (size_ / rt::numLocalities()))) { tgtLoc = rt::Locality(firstPos / (size_ / rt::numLocalities())); tgtPos = firstPos % (size_ / rt::numLocalities()); chunkSize = std::min((size_ / rt::numLocalities() - tgtPos), remainingValues); if (tgtLoc == rt::thisLocality()) { memcpy(&data_[tgtPos], valuesPtr, chunkSize * sizeof(T)); } else { chunkSize = constants::min(chunkSize, kMaxChunkSize); size_t argsSize = sizeof(oid_) + sizeof(size_t) * 2 + sizeof(T) * chunkSize; std::shared_ptr args(new uint8_t[argsSize], std::default_delete()); uint8_t *argsPtr = args.get(); memcpy(argsPtr, &oid_, sizeof(oid_)); argsPtr += sizeof(oid_); memcpy(argsPtr, &tgtPos, sizeof(size_t)); argsPtr += 
sizeof(size_t); memcpy(argsPtr, &chunkSize, sizeof(size_t)); argsPtr += sizeof(size_t); memcpy(argsPtr, valuesPtr, sizeof(T) * chunkSize); rt::executeAt(tgtLoc, RangedInsertAtFun, args, argsSize); } firstPos += chunkSize; remainingValues -= chunkSize; valuesPtr += chunkSize; } while (remainingValues > 0) { size_t newPos = firstPos - (pivot_ * (size_ / rt::numLocalities())); tgtLoc = rt::Locality(pivot_ + newPos / ((size_ / rt::numLocalities() + 1))); tgtPos = newPos % ((size_ / rt::numLocalities() + 1)); chunkSize = std::min((size_ / rt::numLocalities() + 1 - tgtPos), remainingValues); if (tgtLoc == rt::thisLocality()) { memcpy(&data_[tgtPos], valuesPtr, chunkSize * sizeof(T)); } else { chunkSize = constants::min(chunkSize, kMaxChunkSize); size_t argsSize = sizeof(oid_) + sizeof(size_t) * 2 + sizeof(T) * chunkSize; // FIXME(SHAD-125) std::shared_ptr args(new uint8_t[argsSize], std::default_delete()); uint8_t *argsPtr = args.get(); memcpy(argsPtr, &oid_, sizeof(oid_)); argsPtr += sizeof(oid_); memcpy(argsPtr, &tgtPos, sizeof(size_t)); argsPtr += sizeof(size_t); memcpy(argsPtr, &chunkSize, sizeof(size_t)); argsPtr += sizeof(size_t); memcpy(argsPtr, valuesPtr, sizeof(T) * chunkSize); rt::executeAt(tgtLoc, RangedInsertAtFun, args, argsSize); } firstPos += chunkSize; remainingValues -= chunkSize; valuesPtr += chunkSize; } } template void Array::AsyncInsertAt(rt::Handle &handle, const size_t pos, const T *values, const size_t numValues) { size_t tgtPos = 0, firstPos = pos; rt::Locality tgtLoc; size_t remainingValues = numValues; size_t chunkSize = 0; T *valuesPtr = const_cast(values); while (remainingValues > 0) { if (firstPos < pivot_ * (size_ / rt::numLocalities())) { tgtLoc = rt::Locality(firstPos / (size_ / rt::numLocalities())); tgtPos = firstPos % (size_ / rt::numLocalities()); chunkSize = std::min((size_ / rt::numLocalities() - tgtPos), remainingValues); } else { size_t newPos = firstPos - (pivot_ * (size_ / rt::numLocalities())); tgtLoc = rt::Locality(pivot_ + 
newPos / ((size_ / rt::numLocalities() + 1))); tgtPos = newPos % ((size_ / rt::numLocalities() + 1)); chunkSize = std::min((size_ / rt::numLocalities() + 1 - tgtPos), remainingValues); } if (tgtLoc == rt::thisLocality()) { memcpy(&data_[tgtPos], valuesPtr, chunkSize * sizeof(T)); } else { chunkSize = constants::min(chunkSize, kMaxChunkSize); size_t argsSize = sizeof(oid_) + sizeof(size_t) * 2 + sizeof(T) * chunkSize; // FIXME(SHAD-125) std::shared_ptr args(new uint8_t[argsSize], std::default_delete()); ; uint8_t *argsPtr = args.get(); memcpy(argsPtr, &oid_, sizeof(oid_)); argsPtr += sizeof(oid_); memcpy(argsPtr, &tgtPos, sizeof(size_t)); argsPtr += sizeof(size_t); memcpy(argsPtr, &chunkSize, sizeof(size_t)); argsPtr += sizeof(size_t); memcpy(argsPtr, valuesPtr, sizeof(T) * chunkSize); rt::asyncExecuteAt(handle, tgtLoc, AsyncRangedInsertAtFun, args, argsSize); } firstPos += chunkSize; remainingValues -= chunkSize; valuesPtr += chunkSize; } } template void Array::BufferedAsyncInsertAt(rt::Handle &handle, const size_t pos, const T &value) { auto target = getTargetLocalityFromTargePosition(dataDistribution_, pos); if (target.first == rt::thisLocality()) { data_[target.second] = value; } else { buffers_.AsyncInsert(handle, std::tuple(target.second, value), target.first); } } template T Array::At(const size_t pos) { if (rt::numLocalities() == 1) { return data_[pos]; } auto target = getTargetLocalityFromTargePosition(dataDistribution_, pos); T retValue; if (target.first == rt::thisLocality()) { retValue = data_[target.second]; } else { AtArgs args{oid_, target.second}; rt::executeAtWithRet(target.first, AtFun, args, &retValue); } return retValue; } template void Array::AsyncAt(rt::Handle &handle, const size_t pos, T *result) { auto target = getTargetLocalityFromTargePosition(dataDistribution_, pos); if (target.first == rt::thisLocality()) { *result = data_[target.second]; } else { AtArgs args = {oid_, target.second}; rt::asyncExecuteAtWithRet(handle, target.first, 
AsyncAtFun, args, result); } } template template void Array::Apply(const size_t pos, ApplyFunT &&function, Args &... args) { auto target = getTargetLocalityFromTargePosition(dataDistribution_, pos); if (target.first == rt::thisLocality()) { function(pos, data_[target.second], args...); return; } using FunctionTy = void (*)(size_t, T &, Args & ...); FunctionTy fn = std::forward(function); using ArgsTuple = std::tuple>; ArgsTuple argsTuple{oid_, pos, target.second, fn, std::tuple(args...)}; rt::executeAt(target.first, ApplyFunWrapper, argsTuple); } template template void Array::AsyncApply(rt::Handle &handle, const size_t pos, ApplyFunT &&function, Args &... args) { auto target = getTargetLocalityFromTargePosition(dataDistribution_, pos); using FunctionTy = void (*)(rt::Handle &, size_t, T &, Args & ...); FunctionTy fn = std::forward(function); using ArgsTuple = std::tuple>; ArgsTuple argsTuple{oid_, pos, target.second, fn, std::tuple(args...)}; rt::asyncExecuteAt(handle, target.first, AsyncApplyFunWrapper, argsTuple); } template template void Array::ForEachInRange(const size_t first, const size_t last, ApplyFunT &&function, Args &... 
args) { using FunctionTy = void (*)(size_t, T &, Args & ...); FunctionTy fn = std::forward(function); using ArgsTuple = std::tuple>; size_t tgtPos = 0, firstPos = first; rt::Locality tgtLoc; size_t remainingValues = last - first; size_t chunkSize = 0; ArgsTuple argsTuple{oid_, firstPos, tgtPos, fn, std::tuple(args...)}; while (remainingValues > 0) { if (firstPos < pivot_ * (size_ / rt::numLocalities())) { tgtLoc = rt::Locality(firstPos / (size_ / rt::numLocalities())); tgtPos = firstPos % (size_ / rt::numLocalities()); chunkSize = std::min((size_ / rt::numLocalities() - tgtPos), remainingValues); } else { size_t newPos = firstPos - (pivot_ * (size_ / rt::numLocalities())); tgtLoc = rt::Locality(pivot_ + newPos / ((size_ / rt::numLocalities() + 1))); tgtPos = newPos % ((size_ / rt::numLocalities() + 1)); chunkSize = std::min((size_ / rt::numLocalities() + 1 - tgtPos), remainingValues); } std::get<1>(argsTuple) = firstPos; std::get<2>(argsTuple) = tgtPos; rt::forEachAt(tgtLoc, ForEachInRangeFunWrapper, argsTuple, chunkSize); firstPos += chunkSize; remainingValues -= chunkSize; } } template template void Array::AsyncForEachInRange(rt::Handle &handle, const size_t first, const size_t last, ApplyFunT &&function, Args &... 
args) { using FunctionTy = void (*)(rt::Handle &, size_t, T &, Args & ...); FunctionTy fn = std::forward(function); using ArgsTuple = std::tuple>; size_t tgtPos = 0, firstPos = first; rt::Locality tgtLoc; size_t remainingValues = last - first; size_t chunkSize = 0; // first it ArgsTuple argsTuple{oid_, firstPos, tgtPos, fn, std::tuple(args...)}; while (remainingValues > 0) { if (firstPos < pivot_ * (size_ / rt::numLocalities())) { tgtLoc = rt::Locality(firstPos / (size_ / rt::numLocalities())); tgtPos = firstPos % (size_ / rt::numLocalities()); chunkSize = std::min((size_ / rt::numLocalities() - tgtPos), remainingValues); } else { size_t newPos = firstPos - (pivot_ * (size_ / rt::numLocalities())); tgtLoc = rt::Locality(pivot_ + newPos / ((size_ / rt::numLocalities() + 1))); tgtPos = newPos % ((size_ / rt::numLocalities() + 1)); chunkSize = std::min((size_ / rt::numLocalities() + 1 - tgtPos), remainingValues); } std::get<1>(argsTuple) = firstPos; std::get<2>(argsTuple) = tgtPos; rt::asyncForEachAt(handle, tgtLoc, AsyncForEachInRangeFunWrapper, argsTuple, chunkSize); firstPos += chunkSize; remainingValues -= chunkSize; } } template template void Array::AsyncForEach(rt::Handle &handle, ApplyFunT &&function, Args &... args) { using FunctionTy = void (*)(rt::Handle &, size_t, T &, Args & ...); FunctionTy fn = std::forward(function); using feArgs = std::tuple>; using ArgsTuple = std::tuple>; feArgs arguments{oid_, fn, std::tuple(args...)}; auto feLambda = [](rt::Handle &handle, const feArgs &args) { auto arrayPtr = Array::GetPtr(std::get<0>(args)); size_t currentLocality = static_cast(rt::thisLocality()); ArgsTuple argsTuple(arrayPtr->data_.data(), std::get<1>(args), arrayPtr->dataDistribution_[currentLocality].first, std::get<2>(args)); rt::asyncForEachAt(handle, rt::thisLocality(), AsyncForEachFunWrapper, argsTuple, arrayPtr->data_.size()); }; rt::asyncExecuteOnAll(handle, feLambda, arguments); } template template void Array::ForEach(ApplyFunT &&function, Args &... 
args) { using FunctionTy = void (*)(size_t, T &, Args & ...); FunctionTy fn = std::forward(function); using feArgs = std::tuple>; using ArgsTuple = std::tuple>; feArgs arguments{oid_, fn, std::tuple(args...)}; auto feLambda = [](const feArgs &args) { auto arrayPtr = Array::GetPtr(std::get<0>(args)); size_t currentLocality = static_cast(rt::thisLocality()); ArgsTuple argsTuple(arrayPtr->data_.data(), std::get<1>(args), arrayPtr->dataDistribution_[currentLocality].first, std::get<2>(args)); rt::forEachAt(rt::thisLocality(), ForEachFunWrapper, argsTuple, arrayPtr->data_.size()); }; rt::executeOnAll(feLambda, arguments); } } // namespace shad #endif // INCLUDE_SHAD_DATA_STRUCTURES_ARRAY_H_