Program Listing for File buffer.h
↰ Return to documentation for file (include/shad/data_structures/buffer.h)
//===------------------------------------------------------------*- C++ -*-===//
//
// SHAD
//
// The Scalable High-performance Algorithms and Data Structure Library
//
//===----------------------------------------------------------------------===//
//
// Copyright 2018 Battelle Memorial Institute
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
//
//===----------------------------------------------------------------------===//
#ifndef INCLUDE_SHAD_DATA_STRUCTURES_BUFFER_H_
#define INCLUDE_SHAD_DATA_STRUCTURES_BUFFER_H_
#include <algorithm>
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <stdexcept>
#include <vector>
#include "shad/data_structures/object_identifier.h"
#include "shad/runtime/runtime.h"
namespace shad {
namespace constants {
/// Byte budget of a single buffer (3 KiB). Buffer::kBufferSize is obtained by
/// dividing this budget by the size of one entry.
static const size_t kBufferNumBytes = 3 * 1024;
/// Compile-time maximum of two values of the same type.
///
/// @param lhs First value.
/// @param rhs Second value.
/// @return lhs when `lhs > rhs` holds, rhs otherwise (so rhs on a tie).
template <typename T>
constexpr static T const max(T const lhs, T const rhs) {
  if (lhs > rhs) {
    return lhs;
  }
  return rhs;
}
/// Compile-time minimum of two values of the same type.
///
/// @param lhs First value.
/// @param rhs Second value.
/// @return lhs when `lhs < rhs` holds, rhs otherwise (so rhs on a tie).
template <typename T>
constexpr static T const min(T const lhs, T const rhs) {
  if (lhs < rhs) {
    return lhs;
  }
  return rhs;
}
} // namespace constants
namespace impl {
/// @brief Fixed-capacity staging area for entries destined to one locality.
///
/// Entries are accumulated in a local array. When the array is full (or when
/// a flush is requested) a copy of the pending entries is sent to tgtLoc_,
/// where each entry is passed to BufferEntryInsert() on the object returned by
/// DataStructure::GetPtr(oid).
///
/// The Insert/AsyncInsert overloads serialize on an internal lock. The flush
/// functions do not acquire that lock themselves.
///
/// @tparam EntryType Type of the buffered entries. Bulk inserts copy entries
///   with memcpy.
/// @tparam DataStructure Type exposing a static GetPtr(oid) and a
///   BufferEntryInsert(entry) member.
template <typename EntryType, typename DataStructure>
class Buffer {
  template <typename, typename>
  friend class BuffersVector;

 public:
  /// Capacity in entries: as many as fit in constants::kBufferNumBytes, but
  /// never fewer than one.
  constexpr static size_t kBufferSize = constants::max(
      constants::kBufferNumBytes / sizeof(EntryType), static_cast<size_t>(1));
  using EntriesArray = std::array<EntryType, kBufferSize>;

  /// Copies pending entries, target locality and object id from rhs.
  /// The lock is not copied (a fresh one is constructed) and rhs is read
  /// without acquiring its lock.
  Buffer(const Buffer& rhs)
      : data_(rhs.data_),
        size_(rhs.size_),
        lock_(),
        oid_(rhs.oid_),
        tgtLoc_(rhs.tgtLoc_) {}

  /// Creates an empty buffer with a default-constructed target locality and
  /// the null object identifier.
  Buffer()
      : size_(0),
        lock_(),
        oid_(ObjectIdentifier<DataStructure>::kNullID),
        tgtLoc_() {}

  /// Creates an empty buffer that flushes to loc on behalf of object oid.
  Buffer(const rt::Locality& loc, const ObjectIdentifier<DataStructure>& oid)
      : size_(0), lock_(), oid_(oid), tgtLoc_(loc) {}

  enum State { INSERTED, FLUSH, WAIT_FOR_FLUSH };

  /// Payload shipped to the target locality on a flush.
  struct FlushArgs {
    EntriesArray data;                    ///< Copy of the buffer contents.
    size_t numEntries;                    ///< Number of valid entries in data.
    ObjectIdentifier<DataStructure> oid;  ///< Object receiving the entries.
  };

  /// Sends the pending entries to tgtLoc_ through rt::executeAt and empties
  /// the buffer. Does nothing when the buffer is empty.
  ///
  /// This function does not acquire lock_; the Insert overloads call it while
  /// already holding the lock.
  void FlushBuffer() {
    if (size_ == 0) return;
    FlushArgs args = {data_, size_, oid_};
    auto InsertBufferLambda = [](const FlushArgs& args) {
      auto dsPtr = DataStructure::GetPtr(args.oid);
      for (size_t i = 0; i < args.numEntries; i++) {
        dsPtr->BufferEntryInsert(args.data[i]);
      }
    };
    rt::executeAt(tgtLoc_, InsertBufferLambda, args);
    size_ = 0;
  }

  /// Same as FlushBuffer(), but issues the remote insertion through
  /// rt::asyncExecuteAt using handle.
  ///
  /// This function does not acquire lock_; the AsyncInsert overloads call it
  /// while already holding the lock.
  ///
  /// @param handle Handle passed to rt::asyncExecuteAt.
  void AsyncFlushBuffer(rt::Handle& handle) {
    if (size_ == 0) return;
    // NOTE(review): args is a local; this assumes rt::asyncExecuteAt copies it
    // before returning — confirm against the runtime documentation.
    FlushArgs args = {data_, size_, oid_};
    auto AsyncInsertLambda = [](rt::Handle&, const FlushArgs& args) {
      auto dsPtr = DataStructure::GetPtr(args.oid);
      for (size_t i = 0; i < args.numEntries; i++) {
        dsPtr->BufferEntryInsert(args.data[i]);
      }
    };
    rt::asyncExecuteAt(handle, tgtLoc_, AsyncInsertLambda, args);
    size_ = 0;
  }

  /// Appends one entry, flushing synchronously once the buffer is full.
  ///
  /// @param entry Entry to append.
  void Insert(const EntryType entry) {
    // RAII guard: the lock is released even if the flush throws.
    std::lock_guard<rt::Lock> guard(lock_);
    data_[size_++] = entry;
    if (size_ == kBufferSize) {
      FlushBuffer();
    }
  }

  /// Appends num_entries entries, flushing synchronously as needed.
  ///
  /// @param entry Pointer to the first entry to append.
  /// @param num_entries Number of entries to append; at most kBufferSize.
  /// @throws std::invalid_argument if entry is null or num_entries exceeds
  ///   kBufferSize.
  void Insert(const EntryType* entry, const size_t num_entries) {
    if (entry == nullptr) throw std::invalid_argument("elem is null");
    if (num_entries > kBufferSize)
      throw std::invalid_argument("num_entries greater than buffer_size");
    std::lock_guard<rt::Lock> guard(lock_);
    // size_ is always below kBufferSize here, so the subtraction cannot wrap.
    // If the batch does not fit in the free space, ship what is pending first
    // instead of writing past the end of data_.
    if (num_entries > kBufferSize - size_) {
      FlushBuffer();
    }
    std::memcpy(data_.data() + size_, entry, num_entries * sizeof(EntryType));
    size_ += num_entries;
    if (size_ == kBufferSize) {
      FlushBuffer();
    }
  }

  /// Appends one entry, flushing asynchronously once the buffer is full.
  ///
  /// @param handle Handle used for the asynchronous flush, if one is needed.
  /// @param entry Entry to append.
  void AsyncInsert(rt::Handle& handle, const EntryType& entry) {
    std::lock_guard<rt::Lock> guard(lock_);
    data_[size_++] = entry;
    if (size_ == kBufferSize) {
      AsyncFlushBuffer(handle);
    }
  }

  /// Appends num_entries entries, flushing asynchronously as needed.
  ///
  /// @param handle Handle used for the asynchronous flushes, if any.
  /// @param entry Pointer to the first entry to append.
  /// @param num_entries Number of entries to append; at most kBufferSize.
  /// @throws std::invalid_argument if entry is null or num_entries exceeds
  ///   kBufferSize.
  void AsyncInsert(rt::Handle& handle, const EntryType* entry,
                   const size_t num_entries) {
    if (entry == nullptr) throw std::invalid_argument("elem is null");
    if (num_entries > kBufferSize)
      throw std::invalid_argument("num_entries greater than buffer_size");
    std::lock_guard<rt::Lock> guard(lock_);
    // Same overflow guard as the synchronous bulk Insert.
    if (num_entries > kBufferSize - size_) {
      AsyncFlushBuffer(handle);
    }
    std::memcpy(data_.data() + size_, entry, num_entries * sizeof(EntryType));
    size_ += num_entries;
    if (size_ == kBufferSize) {
      AsyncFlushBuffer(handle);
    }
  }

 private:
  EntriesArray data_;  // Pending entries; only the first size_ are valid.
  size_t size_;        // Number of pending entries.
  rt::Lock lock_;      // Serializes the Insert/AsyncInsert overloads.
  ObjectIdentifier<DataStructure> oid_;  // Object the entries belong to.

 protected:
  rt::Locality tgtLoc_;  // Locality the entries are flushed to.

  /// Creates an empty buffer for oid with a default-constructed target
  /// locality; BuffersVector assigns tgtLoc_ afterwards.
  explicit Buffer(const ObjectIdentifier<DataStructure>& oid)
      : size_(0), lock_(), oid_(oid), tgtLoc_() {}
};
/// @brief Collection of Buffer objects, one per locality, indexed by the
/// integer value of the target locality.
///
/// @tparam EntryType Type of the buffered entries.
/// @tparam DataStructure Data structure type the entries are inserted into.
template <typename EntryType, typename DataStructure>
class BuffersVector {
 public:
  using BufferType = Buffer<EntryType, DataStructure>;

  /// Creates rt::numLocalities() buffers for object oid and points the i-th
  /// buffer at locality i.
  explicit BuffersVector(ObjectIdentifier<DataStructure> oid)
      : buffers_(rt::numLocalities(), BufferType(oid)) {
    // All buffers start as copies of one prototype; give each its own target.
    for (size_t i = 0; i < buffers_.size(); i++) {
      buffers_[i].tgtLoc_ = rt::Locality(i);
    }
  }

  /// Appends entry to the buffer of tgtLoc (synchronous flush when full).
  ///
  /// @param entry Entry to buffer.
  /// @param tgtLoc Locality the entry is destined to.
  /// @throws std::out_of_range if tgtLoc does not index an existing buffer.
  void Insert(const EntryType& entry, const rt::Locality& tgtLoc) {
    uint32_t tgtId = static_cast<uint32_t>(tgtLoc);
    // Bounds-checked access, consistent with AsyncInsert.
    buffers_.at(tgtId).Insert(entry);
  }

  /// Appends entry to the buffer of tgtLoc (asynchronous flush when full).
  ///
  /// @param handle Handle used if the insertion triggers a flush.
  /// @param entry Entry to buffer.
  /// @param tgtLoc Locality the entry is destined to.
  /// @throws std::out_of_range if tgtLoc does not index an existing buffer.
  void AsyncInsert(rt::Handle& handle, const EntryType& entry,
                   const rt::Locality& tgtLoc) {
    uint32_t tgtId = static_cast<uint32_t>(tgtLoc);
    buffers_.at(tgtId).AsyncInsert(handle, entry);
  }

  /// Synchronously flushes every buffer, in locality order.
  void FlushAll() {
    for (auto& buffer : buffers_) {
      buffer.FlushBuffer();
    }
  }

  /// Asynchronously flushes every buffer using handle.
  ///
  /// @param handle Handle passed to each buffer's asynchronous flush.
  void AsyncFlushAll(rt::Handle& handle) {
    for (auto& buffer : buffers_) {
      buffer.AsyncFlushBuffer(handle);
    }
  }

 private:
  std::vector<BufferType> buffers_;  // buffers_[i] targets locality i.
};
} // namespace impl
} // namespace shad
#endif // INCLUDE_SHAD_DATA_STRUCTURES_BUFFER_H_