Program Listing for File modifyng_sequence_ops.h

Return to documentation for file (include/shad/core/impl/modifyng_sequence_ops.h)

//===------------------------------------------------------------*- C++ -*-===//
//
//                                     SHAD
//
//      The Scalable High-performance Algorithms and Data Structure Library
//
//===----------------------------------------------------------------------===//
//
// Copyright 2018 Battelle Memorial Institute
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
//
//===----------------------------------------------------------------------===//

#ifndef INCLUDE_SHAD_CORE_IMPL_MODIFYING_SEQUENCE_OPS_H
#define INCLUDE_SHAD_CORE_IMPL_MODIFYING_SEQUENCE_OPS_H

#include <algorithm>
#include <cstring>
#include <functional>
#include <iterator>
#include <tuple>

#include "shad/core/execution.h"
#include "shad/core/impl/impl_patterns.h"
#include "shad/core/impl/utils.h"
#include "shad/core/iterator.h"
#include "shad/distributed_iterator_traits.h"
#include "shad/runtime/runtime.h"

namespace shad {
namespace impl {

template <typename ForwardIt, typename T>
void fill(distributed_parallel_tag&& policy, ForwardIt first, ForwardIt last,
          const T& value) {
  using itr_traits = distributed_iterator_traits<ForwardIt>;

  // distributed map
  distributed_map_void(
      // range
      first, last,
      // kernel
      [](ForwardIt first, ForwardIt last, const T& value) {
        using local_iterator_t = typename itr_traits::local_iterator_type;

        // local map
        auto lrange = itr_traits::local_range(first, last);

        local_map_void(
            // range
            lrange.begin(), lrange.end(),
            // kernel
            [&](local_iterator_t b, local_iterator_t e) {
              std::fill(b, e, value);
            });
      },
      // map arguments
      value);
}

template <typename ForwardIt, typename T>
void fill(distributed_sequential_tag&& policy, ForwardIt first, ForwardIt last,
          const T& value) {
  using itr_traits = distributed_iterator_traits<ForwardIt>;

  distributed_folding_map_void(
      // range
      first, last,
      // kernel
      [](ForwardIt first, ForwardIt last, const T& value) {
        // local processing
        auto lrange = itr_traits::local_range(first, last);
        std::fill(lrange.begin(), lrange.end(), value);
      },
      // map arguments
      value);
}

namespace transform_impl {
//
// process a local input-portion into a local output-portion of a
// block-contiguous output-range
//
// sequential
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
void block_contiguous_local(ForwardIt1 first, ForwardIt1 last,
                            ForwardIt2 d_first, UnaryOperation op) {
  using itr_traits2 = distributed_iterator_traits<ForwardIt2>;
  auto size = std::distance(first, last);
  auto d_last = d_first;
  std::advance(d_last, size);

  // local assign
  auto local_d_range = itr_traits2::local_range(d_first, d_last);
  auto loc_res = std::transform(first, last, local_d_range.begin(), op);
}

// parallel
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
void block_contiguous_local_par(ForwardIt1 first, ForwardIt1 last,
                                ForwardIt2 d_first, UnaryOperation op) {
  using itr_traits1 = std::iterator_traits<ForwardIt1>;
  using itr_traits2 = distributed_iterator_traits<ForwardIt2>;
  auto size = std::distance(first, last);
  auto d_last = d_first;
  std::advance(d_last, size);

  // local map
  auto local_d_range = itr_traits2::local_range(d_first, d_last);
  using offset_t = typename std::iterator_traits<ForwardIt1>::difference_type;
  local_map_void_offset(
      // range
      first, last,
      // kernel
      [&](ForwardIt1 b, ForwardIt1 e, offset_t offset) {
        std::transform(b, e, local_d_range.begin() + offset, op);
      });
}

//
// process a local input-portion into a remote portion (i.e., located at a
// single locality) of a block-contiguous output iterator. Output is written in
// an RMA fashion.
//
// buffer for RMA arguments
template <typename ForwardIt>
struct gen_args_t {
  static constexpr size_t buf_size =
      (2 << 10) / sizeof(typename ForwardIt::value_type);
  typename ForwardIt::value_type buf[buf_size];
  ForwardIt w_first;
  size_t size;
};

// asynchronous, attached to a caller-provided handle.
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
void async_block_contiguous_remote(rt::Locality l, rt::Handle& h,
                                   ForwardIt1 first, ForwardIt1 last,
                                   ForwardIt2 d_first, UnaryOperation op) {
  using itr_traits2 = distributed_iterator_traits<ForwardIt2>;
  using args_t = gen_args_t<ForwardIt2>;

  // remote assign
  std::shared_ptr<uint8_t> args_buf(new uint8_t[sizeof(args_t)],
                                    std::default_delete<uint8_t[]>());
  auto typed_args_buf = reinterpret_cast<args_t*>(args_buf.get());
  auto block_last = first;
  while (first != last) {
    typed_args_buf->w_first = d_first;
    typed_args_buf->size = std::min(
        args_t::buf_size, static_cast<size_t>(std::distance(first, last)));
    std::advance(block_last, typed_args_buf->size);
    std::transform(first, block_last, typed_args_buf->buf, op);
    rt::asyncExecuteAt(
        h, l,
        [](rt::Handle&, const uint8_t* args_buf, const uint32_t) {
          const args_t& args = *reinterpret_cast<const args_t*>(args_buf);
          using val_t = typename ForwardIt2::value_type;
          ForwardIt2 w_last = args.w_first;
          std::advance(w_last, args.size);
          auto w_range = itr_traits2::local_range(args.w_first, w_last);
          std::memcpy(w_range.begin(), args.buf, sizeof(val_t) * args.size);
        },
        args_buf, sizeof(args_t));
    std::advance(first, typed_args_buf->size);
    std::advance(d_first, typed_args_buf->size);
  }
}

// synchronous
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
void block_contiguous_remote(rt::Locality l, ForwardIt1 first, ForwardIt1 last,
                             ForwardIt2 d_first, UnaryOperation op) {
  using itr_traits2 = distributed_iterator_traits<ForwardIt2>;
  using args_t = gen_args_t<ForwardIt2>;

  // remote assign
  std::shared_ptr<uint8_t> args_buf(new uint8_t[sizeof(args_t)],
                                    std::default_delete<uint8_t[]>());
  auto typed_args_buf = reinterpret_cast<args_t*>(args_buf.get());
  auto block_last = first;
  while (first != last) {
    typed_args_buf->w_first = d_first;
    typed_args_buf->size = std::min(
        args_t::buf_size, static_cast<size_t>(std::distance(first, last)));
    std::advance(block_last, typed_args_buf->size);
    std::transform(first, block_last, typed_args_buf->buf, op);
    rt::executeAt(
        l,
        [](const uint8_t* args_buf, const uint32_t) {
          const args_t& args = *reinterpret_cast<const args_t*>(args_buf);
          using val_t = typename ForwardIt2::value_type;
          ForwardIt2 w_last = args.w_first;
          std::advance(w_last, args.size);
          auto w_range = itr_traits2::local_range(args.w_first, w_last);
          std::memcpy(w_range.begin(), args.buf, sizeof(val_t) * args.size);
        },
        args_buf, sizeof(args_t));
    std::advance(first, typed_args_buf->size);
    std::advance(d_first, typed_args_buf->size);
  }
}

//
// kernels for block-contiguous output-iterators
//
// TODO(droccom) improve in-node parallelism with patterns
//
// sequential
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 dseq_kernel(std::true_type, ForwardIt1 first, ForwardIt1 last,
                       ForwardIt2 d_first, UnaryOperation op) {
  using itr_traits1 = distributed_iterator_traits<ForwardIt1>;
  using itr_traits2 = distributed_random_access_iterator_trait<ForwardIt2>;
  auto loc_range = itr_traits1::local_range(first, last);
  auto loc_first = loc_range.begin();
  auto d_last = d_first;
  std::advance(d_last, std::distance(loc_first, loc_range.end()));
  auto dmap = itr_traits2::distribution(d_first, d_last);
  auto loc_last = loc_first;
  for (auto i : dmap) {
    auto l = i.first;
    std::advance(loc_last, i.second);
    if (rt::thisLocality() == l)
      block_contiguous_local(loc_first, loc_last, d_first, op);
    else {
      block_contiguous_remote(l, loc_first, loc_last, d_first, op);
    }
    std::advance(loc_first, i.second);
    std::advance(d_first, i.second);
  }
  return d_last;
}

// parallel
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 dpar_kernel(std::true_type, ForwardIt1 first, ForwardIt1 last,
                       ForwardIt2 d_first, UnaryOperation op) {
  using itr_traits1 = distributed_iterator_traits<ForwardIt1>;
  using itr_traits2 = distributed_random_access_iterator_trait<ForwardIt2>;
  auto loc_range = itr_traits1::local_range(first, last);
  auto loc_first = loc_range.begin();
  auto first_ = itr_traits1::iterator_from_local(first, last, loc_first);
  std::advance(d_first, std::distance(first, first_));
  auto d_last = d_first;
  std::advance(d_last, std::distance(loc_first, loc_range.end()));
  auto dmap = itr_traits2::distribution(d_first, d_last);
  auto loc_last = loc_first;
  auto coloc_first = loc_first, coloc_last = loc_first;
  auto coloc_d_first = d_first;

  // create remote tasks
  rt::Handle h;
  for (auto i : dmap) {
    auto l = i.first;
    std::advance(loc_last, i.second);
    if (rt::thisLocality() == l) {
      coloc_first = loc_first;
      coloc_last = loc_last;
      coloc_d_first = d_first;
    } else
      async_block_contiguous_remote(l, h, loc_first, loc_last, d_first, op);
    std::advance(loc_first, i.second);
    std::advance(d_first, i.second);
  }

  // process local portion
  if (coloc_first != coloc_last)
    block_contiguous_local_par(coloc_first, coloc_last, coloc_d_first, op);

  // join
  rt::waitForCompletion(h);

  return d_last;
}

//
// kernels for non-block-contiguous output-iterators
//
// sequential
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 dseq_kernel(std::false_type, ForwardIt1 first, ForwardIt1 last,
                       ForwardIt2 d_first, UnaryOperation op) {
  using itr_traits1 = distributed_iterator_traits<ForwardIt1>;
  auto local_range = itr_traits1::local_range(first, last);
  auto begin = local_range.begin();
  auto end = local_range.end();
  auto res = std::transform(begin, end, d_first, op);
  wait_iterator(res);
  flush_iterator(res);
  return res;
}

// parallel
// TODO(droccom) in-node parallelism
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 dpar_kernel(std::false_type, ForwardIt1 first, ForwardIt1 last,
                       ForwardIt2 d_first, UnaryOperation op) {
  using itr_traits = distributed_iterator_traits<ForwardIt1>;
  using local_iterator_t = typename itr_traits::local_iterator_type;

  // local map
  auto lrange = itr_traits::local_range(first, last);

  auto map_res = local_map_init(
      // range
      lrange.begin(), lrange.end(),
      // kernel
      [&](local_iterator_t b, local_iterator_t e) {
        auto res = std::transform(b, e, d_first, op);
        wait_iterator(res);
        return res;
      },
      // init value
      d_first);

  // local reduce
  flush_iterator(map_res.back());
  return map_res.back();
}

//
// dispatchers based on whether the output-iterator is block-contiguous (e.g.,
// array) or not (e.g., set)
//
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 dseq_kernel(ForwardIt1 first, ForwardIt1 last, ForwardIt2 d_first,
                       UnaryOperation op) {
  return dseq_kernel(is_block_contiguous<ForwardIt2>::value, first, last,
                     d_first, op);
}

template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 dpar_kernel(ForwardIt1 first, ForwardIt1 last, ForwardIt2 d_first,
                       UnaryOperation op) {
  return dpar_kernel(is_block_contiguous<ForwardIt2>::value, first, last,
                     d_first, op);
}

}  // namespace transform_impl

template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 transform(distributed_parallel_tag&& policy, ForwardIt1 first1,
                     ForwardIt1 last1, ForwardIt2 d_first,
                     UnaryOperation unary_op) {
  using itr_traits = distributed_iterator_traits<ForwardIt1>;

  // distributed map
  auto map_res = distributed_map_init(
      // range
      first1, last1,
      // kernel
      [](ForwardIt1 first1, ForwardIt1 last1, ForwardIt2 d_first,
         UnaryOperation unary_op) {
        return transform_impl::dpar_kernel(first1, last1, d_first, unary_op);
      },
      // init value
      d_first,
      // map arguments
      d_first, unary_op);

  // reduce
  return map_res.back();
}

template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 transform(distributed_sequential_tag&& policy, ForwardIt1 first1,
                     ForwardIt1 last1, ForwardIt2 d_first,
                     UnaryOperation unary_op) {
  using itr_traits = distributed_iterator_traits<ForwardIt1>;

  return distributed_folding_map(
      // range
      first1, last1,
      // kernel
      [](ForwardIt1 first1, ForwardIt1 last1, ForwardIt2 d_first,
         UnaryOperation unary_op) {
        // local processing
        auto lrange = itr_traits::local_range(first1, last1);
        auto local_res =
            transform_impl::dseq_kernel(first1, last1, d_first, unary_op);
        // update the partial solution
        return local_res;
      },
      // initial solution
      d_first,
      // map arguments
      unary_op);
}

template <typename ForwardIt, typename Generator>
void generate(distributed_parallel_tag&& policy, ForwardIt first,
              ForwardIt last, Generator generator) {
  using itr_traits = distributed_iterator_traits<ForwardIt>;

  // distributed map
  distributed_map_void(
      // range
      first, last,
      // kernel
      [](ForwardIt first, ForwardIt last, Generator generator) {
        using local_iterator_t = typename itr_traits::local_iterator_type;

        // local map
        auto lrange = itr_traits::local_range(first, last);

        local_map_void(
            // range
            lrange.begin(), lrange.end(),
            // kernel
            [&](local_iterator_t b, local_iterator_t e) {
              std::generate(b, e, generator);
            });
      },
      // map arguments
      generator);
}

template <typename ForwardIt, typename Generator>
void generate(distributed_sequential_tag&& policy, ForwardIt first,
              ForwardIt last, Generator generator) {
  using itr_traits = distributed_iterator_traits<ForwardIt>;

  distributed_folding_map_void(
      // range
      first, last,
      // kernel
      [](ForwardIt first, ForwardIt last, Generator generator) {
        // local processing
        auto lrange = itr_traits::local_range(first, last);
        std::generate(lrange.begin(), lrange.end(), generator);
      },
      // map arguments
      generator);
}

template <typename ForwardIt, typename T>
void replace(distributed_parallel_tag&& policy, ForwardIt first, ForwardIt last,
             const T& old_value, const T& new_value) {
  using itr_traits = distributed_iterator_traits<ForwardIt>;

  // distributed map
  distributed_map_void(
      // range
      first, last,
      // kernel
      [](ForwardIt first, ForwardIt last, const T& old_value,
         const T& new_value) {
        using local_iterator_t = typename itr_traits::local_iterator_type;

        // local map
        auto lrange = itr_traits::local_range(first, last);

        local_map_void(
            // range
            lrange.begin(), lrange.end(),
            // kernel
            [&](local_iterator_t b, local_iterator_t e) {
              std::replace(b, e, old_value, new_value);
            });
      },
      // map arguments
      old_value, new_value);
}

template <typename ForwardIt, typename T>
void replace(distributed_sequential_tag&& policy, ForwardIt first,
             ForwardIt last, const T& old_value, const T& new_value) {
  using itr_traits = distributed_iterator_traits<ForwardIt>;

  distributed_folding_map_void(
      // range
      first, last,
      // kernel
      [](ForwardIt first, ForwardIt last, const T& old_value,
         const T& new_value) {
        // local processing
        auto lrange = itr_traits::local_range(first, last);
        std::replace(lrange.begin(), lrange.end(), old_value, new_value);
      },
      // map arguments
      old_value, new_value);
}

template <typename ForwardIt, typename UnaryPredicate, typename T>
void replace_if(distributed_parallel_tag&& policy, ForwardIt first,
                ForwardIt last, UnaryPredicate p, const T& new_value) {
  using itr_traits = distributed_iterator_traits<ForwardIt>;

  // distributed map
  distributed_map_void(
      // range
      first, last,
      // kernel
      [](ForwardIt first, ForwardIt last, UnaryPredicate p,
         const T& new_value) {
        using local_iterator_t = typename itr_traits::local_iterator_type;

        // local map
        auto lrange = itr_traits::local_range(first, last);

        local_map_void(
            // range
            lrange.begin(), lrange.end(),
            // kernel
            [&](local_iterator_t b, local_iterator_t e) {
              std::replace_if(b, e, p, new_value);
            });
      },
      // map arguments
      p, new_value);
}

template <typename ForwardIt, typename UnaryPredicate, typename T>
void replace_if(distributed_sequential_tag&& policy, ForwardIt first,
                ForwardIt last, UnaryPredicate p, const T& new_value) {
  using itr_traits = distributed_iterator_traits<ForwardIt>;

  distributed_folding_map_void(
      // range
      first, last,
      // kernel
      [](ForwardIt first, ForwardIt last, UnaryPredicate p,
         const T& new_value) {
        // local processing
        auto lrange = itr_traits::local_range(first, last);
        std::replace_if(lrange.begin(), lrange.end(), p, new_value);
      },
      // map arguments
      p, new_value);
}

}  // namespace impl
}  // namespace shad

#endif /* INCLUDE_SHAD_CORE_IMPL_MODIFYING_SEQUENCE_OPS_H */