Program Listing for File numeric_ops.h

Return to documentation for file (include/shad/core/impl/numeric_ops.h)

//===------------------------------------------------------------*- C++ -*-===//
//
//                                     SHAD
//
//      The Scalable High-performance Algorithms and Data Structure Library
//
//===----------------------------------------------------------------------===//
//
// Copyright 2018 Battelle Memorial Institute
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy
// of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
//
//===----------------------------------------------------------------------===//

#ifndef INCLUDE_SHAD_CORE_IMPL_NUMERIC_OPS_H
#define INCLUDE_SHAD_CORE_IMPL_NUMERIC_OPS_H

#include <algorithm>
#include <functional>
#include <iterator>
#include <numeric>

#include "shad/core/execution.h"
#include "shad/distributed_iterator_traits.h"
#include "shad/runtime/runtime.h"

#include "impl_patterns.h"

namespace shad {
namespace impl {

template <typename ForwardIterator, typename T>
void iota(ForwardIterator first, ForwardIterator last, const T& value) {
  using itr_traits = distributed_iterator_traits<ForwardIterator>;

  distributed_folding_map(
      // range
      first, last,
      // kernel
      [](ForwardIterator first, ForwardIterator last, T res) {
        // local processing
        auto lrange = itr_traits::local_range(first, last);
        for (auto it = lrange.begin(); it != lrange.end(); ++it) {
          *it = res++;
        }
        // update the partial solution
        return res;
      },
      // initial solution
      value);
}

template <class InputIt, class T, class BinaryOperation>
T accumulate(InputIt first, InputIt last, T init, BinaryOperation op) {
  using itr_traits = distributed_iterator_traits<InputIt>;

  return distributed_folding_map(
      // range
      first, last,
      // kernel
      [](InputIt first, InputIt last, T res, BinaryOperation op) {
        // local processing
        auto lrange = itr_traits::local_range(first, last);
        res = std::accumulate(lrange.begin(), lrange.end(), res, op);
        // update the partial solution
        return res;
      },
      // initial solution
      init,
      // map arguments
      op);
}

template <class InputIt1, class InputIt2, class T>
T inner_product(InputIt1 first1, InputIt1 last1, InputIt2 first2, T init) {
  using itr_traits = distributed_iterator_traits<InputIt1>;
  auto localities = itr_traits::localities(first1, last1);
  auto args = std::make_pair(first2, init);
  for (auto locality = localities.begin(), end = localities.end();
       locality != end; ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<InputIt1, InputIt1, std::pair<InputIt2, T>>& args,
           std::pair<InputIt2, T>* result) {
          auto first2 = std::get<2>(args).first;
          auto init = std::get<2>(args).second;
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          while (begin != end) {
            init = std::move(init) + *begin * *first2;
            ++begin;
            ++first2;
          }
          *result = std::make_pair(first2, init);
        },
        std::make_tuple(first1, last1, args), &args);
  }
  return args.second;
}

template <class InputIt1, class InputIt2, class T, class BinaryOperation1,
          class BinaryOperation2>
T inner_product(InputIt1 first1, InputIt1 last1, InputIt2 first2, T init,
                BinaryOperation1 op1, BinaryOperation2 op2) {
  using itr_traits = distributed_iterator_traits<InputIt1>;
  auto localities = itr_traits::localities(first1, last1);
  auto args = std::make_pair(first2, init);
  for (auto locality = localities.begin(), end = localities.end();
       locality != end; ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<InputIt1, InputIt1, std::pair<InputIt2, T>,
                            BinaryOperation1, BinaryOperation2>& args,
           std::pair<InputIt2, T>* result) {
          auto first2 = std::get<2>(args).first;
          auto init = std::get<2>(args).second;
          auto op1 = std::get<3>(args);
          auto op2 = std::get<4>(args);
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          while (begin != end) {
            init = op1(std::move(init), op2(*begin, *first2));
            ++begin;
            ++first2;
          }
          *result = std::make_pair(first2, init);
        },
        std::make_tuple(first1, last1, args, op1, op2), &args);
  }
  return args.second;
}

template <class InputIt, class OutputIt, class BinaryOperation>
OutputIt adjacent_difference(distributed_sequential_tag&&, InputIt first,
                             InputIt last, OutputIt d_first,
                             BinaryOperation op) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  using value_t = typename itr_traits::value_type;
  auto startingLoc = localities.begin();
  value_t acc;
  auto res = std::make_pair(d_first, acc);
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<InputIt, InputIt, OutputIt, rt::Locality, value_t,
                            BinaryOperation>& args,
           std::pair<OutputIt, value_t>* result) {
          auto d_first = std::get<2>(args);
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          if (begin == end) {
            *result = std::make_pair(d_first, std::get<4>(args));
            return;
          }
          BinaryOperation op = std::get<5>(args);
          value_t acc = *begin;
          if (rt::thisLocality() == std::get<3>(args)) {
            *d_first = acc;
          } else {
            *d_first = op(acc, std::get<4>(args));
          }
          while (++begin != end) {
            value_t val = *begin;
            *++d_first = val - std::move(acc);
            acc = std::move(val);
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, res.first, startingLoc, res.second, op),
        &res);
  }
  return d_first;
}

template <class InputIt, class OutputIt, class BinaryOperation>
OutputIt adjacent_difference(distributed_parallel_tag&& policy, InputIt first,
                             InputIt last, OutputIt d_first,
                             BinaryOperation op) {
  if (first == last) return d_first;
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  using value_t = typename itr_traits::value_type;
  auto startingLoc = localities.begin();
  value_t acc;
  uint32_t numLoc = localities.size();
  std::vector<OutputIt> res(numLoc);
  rt::Handle h;
  size_t i = 0;
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality, ++i) {
    rt::asyncExecuteAtWithRet(
        h, locality,
        [](rt::Handle&,
           const std::tuple<InputIt, InputIt, OutputIt, rt::Locality,
                            BinaryOperation>& args,
           OutputIt* result) {
          auto gbegin = std::get<0>(args);
          auto gend = std::get<1>(args);
          auto local_range = itr_traits::local_range(gbegin, gend);
          auto d_first = std::get<2>(args);
          auto begin = local_range.begin();
          auto end = local_range.end();
          if (begin == end) {
            *result = d_first;
            return;
          }
          value_t acc = *begin;
          BinaryOperation op = std::get<4>(args);
          if (rt::thisLocality() == std::get<3>(args)) {
            *d_first = acc;
          } else {
            auto it = itr_traits::iterator_from_local(gbegin, gend, begin);
            --it;
            std::advance(d_first, (std::distance(gbegin, it)));
            value_t val = *d_first;
            *d_first = op(*begin, *it);
          }
          while (++begin != end) {
            value_t val = *begin;
            *++d_first = op(val, std::move(acc));
            acc = std::move(val);
          }
          *result = ++d_first;
        },
        std::make_tuple(first, last, d_first, startingLoc, op), &res[i]);
  }
  rt::waitForCompletion(h);
  return res[numLoc - 1];
}

template <class InputIt, class OutputIt, class BinaryOperation>
OutputIt partial_sum(InputIt first, InputIt last, OutputIt d_first,
                     BinaryOperation op) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  using value_t = typename itr_traits::value_type;
  auto startingLoc = localities.begin();
  value_t acc;
  auto res = std::make_pair(d_first, acc);
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<InputIt, InputIt, OutputIt, rt::Locality, value_t,
                            BinaryOperation>& args,
           std::pair<OutputIt, value_t>* result) {
          auto d_first = std::get<2>(args);
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          if (begin == end) {
            *result = std::make_pair(d_first, std::get<4>(args));
            return;
          }
          BinaryOperation op = std::get<5>(args);
          value_t acc = *begin;
          if (rt::thisLocality() != std::get<3>(args)) {
            acc = op(std::move(acc), std::get<4>(args));
          }
          *d_first = acc;
          while (++begin != end) {
            acc = op(std::move(acc), *begin);
            *++d_first = acc;
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, res.first, startingLoc, res.second, op),
        &res);
  }
  return d_first;
}

template <class InputIt, class T, class BinaryOperation>
T reduce(distributed_sequential_tag&& policy, InputIt first, InputIt last,
         T init, BinaryOperation op) {
  return impl::accumulate(first, last, init, op);
}

template <class InputIt, class T, class BinaryOperation>
T reduce(distributed_parallel_tag&& policy, InputIt first, InputIt last, T init,
         BinaryOperation op) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  static_assert(std::is_default_constructible<T>::value,
                "reduce requires DefaultConstructible value type");

  // distributed map
  auto map_res = distributed_map(
      // range
      first, last,
      // kernel
      [](InputIt first, InputIt last, BinaryOperation op) {
        using local_iterator_t = typename itr_traits::local_iterator_type;

        // local map
        auto lrange = itr_traits::local_range(first, last);
        auto map_res = local_map(
            // range
            lrange.begin(), lrange.end(),
            // kernel
            [&](local_iterator_t b, local_iterator_t e) {
              auto res = *b;
              while (++b != e) res = op(std::move(res), *b);
              return res;
            });

        // local reduce
        auto b = map_res.begin(), e = map_res.end();
        T res;
        if (b != e) {
          res = *b++;
          res = std::accumulate(b, e, std::move(res), op);
        }
        return res;
      },
      // map arguments
      op);

  // TODO parallel
  // reduce
  return std::accumulate(map_res.begin(), map_res.end(), init, op);
}

template <class InputIt, class OutputIt, class BinaryOperation, class T>
OutputIt exclusive_scan(distributed_sequential_tag&& policy, InputIt first,
                        InputIt last, OutputIt d_first, BinaryOperation op,
                        T init) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  auto res = std::make_pair(d_first, init);
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<InputIt, InputIt, OutputIt, T, BinaryOperation>&
               args,
           std::pair<OutputIt, T>* result) {
          auto d_first = std::get<2>(args);
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          if (begin == end) {
            *result = std::make_pair(d_first, std::get<3>(args));
            return;
          }
          BinaryOperation op = std::get<4>(args);
          T acc = std::get<3>(args);
          *d_first = acc;
          acc = op(std::move(acc), *begin);
          while (++begin != end) {
            *++d_first = acc;
            acc = op(std::move(acc), *begin);
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, res.first, res.second, op), &res);
  }
  return d_first;
}

template <class InputIt, class OutputIt, class BinaryOperation, class T>
OutputIt exclusive_scan(distributed_parallel_tag&& policy, InputIt first,
                        InputIt last, OutputIt d_first, BinaryOperation op,
                        T init) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  uint32_t numLoc = localities.size();
  std::vector<std::pair<OutputIt, T>> res(numLoc);
  rt::Handle h;
  size_t i = 0;
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality, ++i) {
    rt::asyncExecuteAtWithRet(
        h, locality,
        [](rt::Handle&,
           const std::tuple<InputIt, InputIt, OutputIt, BinaryOperation>& args,
           std::pair<OutputIt, T>* result) {
          auto d_first = std::get<2>(args);
          auto df = d_first;
          auto gbegin = std::get<0>(args);
          auto gend = std::get<1>(args);
          auto local_range = itr_traits::local_range(gbegin, gend);
          auto begin = local_range.begin();
          auto end = local_range.end();
          auto it = itr_traits::iterator_from_local(gbegin, gend, begin);
          auto dist = std::distance(gbegin, it);
          std::advance(d_first, dist);
          if (begin == end) {
            *result = std::make_pair(d_first, T{});
            return;
          }
          BinaryOperation op = std::get<3>(args);
          T acc = *begin;
          *d_first = acc;
          while (++begin != end) {
            *++d_first = acc;
            acc = op(std::move(acc), *begin);
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, d_first, op), &res[i]);
  }
  rt::waitForCompletion(h);
  auto d_f = d_first;
  auto acc = init;
  OutputIt chunk_end = d_first;
  using outitr_traits = distributed_iterator_traits<OutputIt>;
  for (i = 0; i < numLoc; ++i) {
    chunk_end = res[i].first;
    auto d_localities = outitr_traits::localities(d_f, chunk_end);
    auto d_startingLoc = d_localities.begin();
    for (auto locality = d_startingLoc, end = d_localities.end();
         locality != end; ++locality) {
      rt::asyncExecuteAt(
          h, locality,
          [](rt::Handle&,
             const std::tuple<OutputIt, OutputIt, BinaryOperation, T>& args) {
            auto gbegin = std::get<0>(args);
            auto gend = std::get<1>(args);
            auto local_range = outitr_traits::local_range(std::get<0>(args),
                                                          std::get<1>(args));
            auto begin = local_range.begin();
            auto end = local_range.end();
            if (begin == end) return;
            BinaryOperation op = std::get<2>(args);
            auto acc = std::get<3>(args);
            *begin = acc;
            for (++begin; begin != end; ++begin) {
              *begin = op(std::move(acc), *begin);
            }
          },
          std::make_tuple(d_f, chunk_end, op, acc));
    }
    d_f = chunk_end;
    acc = op(std::move(acc), res[i].second);
  }
  rt::waitForCompletion(h);
  return chunk_end;
}

template <class InputIt, class OutputIt, class BinaryOperation>
OutputIt inclusive_scan(distributed_sequential_tag&& policy, InputIt first,
                        InputIt last, OutputIt d_first, BinaryOperation op) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  using value_t = typename itr_traits::value_type;
  value_t acc;
  auto res = std::make_pair(d_first, acc);
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<InputIt, InputIt, OutputIt, rt::Locality, value_t,
                            BinaryOperation>& args,
           std::pair<OutputIt, value_t>* result) {
          auto d_first = std::get<2>(args);
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          if (begin == end) {
            *result = std::make_pair(d_first, std::get<4>(args));
            return;
          }
          BinaryOperation op = std::get<5>(args);
          value_t acc = *begin;
          if (rt::thisLocality() == std::get<3>(args)) {
            *d_first = acc;
          } else {
            acc = op(std::move(acc), std::get<4>(args));
            *d_first = acc;
          }
          while (++begin != end) {
            acc = op(std::move(acc), *begin);
            *++d_first = acc;
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, res.first, startingLoc, res.second, op),
        &res);
  }
  return d_first;
}

template <class InputIt, class OutputIt, class BinaryOperation>
OutputIt inclusive_scan(distributed_parallel_tag&& policy, InputIt first,
                        InputIt last, OutputIt d_first, BinaryOperation op) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  using value_t = typename itr_traits::value_type;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  uint32_t numLoc = localities.size();
  if (numLoc == 0) {
    return d_first;
  }
  std::vector<std::pair<OutputIt, value_t>> res(numLoc);
  rt::Handle h;
  size_t i = 0;
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality, ++i) {
    rt::asyncExecuteAtWithRet(
        h, locality,
        [](rt::Handle&,
           const std::tuple<InputIt, InputIt, OutputIt, BinaryOperation>& args,
           std::pair<OutputIt, value_t>* result) {
          auto d_first = std::get<2>(args);
          auto df = d_first;
          auto gbegin = std::get<0>(args);
          auto gend = std::get<1>(args);
          auto local_range = itr_traits::local_range(gbegin, gend);
          auto begin = local_range.begin();
          auto end = local_range.end();
          auto it = itr_traits::iterator_from_local(gbegin, gend, begin);
          auto dist = std::distance(gbegin, it);
          std::advance(d_first, dist);
          if (begin == end) {
            *result = std::make_pair(d_first, value_t{});
            return;
          }
          BinaryOperation op = std::get<3>(args);
          value_t acc = *begin;
          *d_first = acc;
          while (++begin != end) {
            acc = op(std::move(acc), *begin);
            *++d_first = acc;
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, d_first, op), &res[i]);
  }
  rt::waitForCompletion(h);
  auto d_f = res[0].first;
  value_t acc = res[0].second;
  OutputIt chunk_end = d_first;
  for (i = 1; i < numLoc; ++i) {
    chunk_end = res[i].first;
    auto d_localities = itr_traits::localities(d_f, chunk_end);
    auto d_startingLoc = d_localities.begin();
    for (auto locality = d_startingLoc, end = d_localities.end();
         locality != end; ++locality) {
      rt::asyncExecuteAt(
          h, locality,
          [](rt::Handle&, const std::tuple<OutputIt, OutputIt, BinaryOperation,
                                           value_t>& args) {
            auto gbegin = std::get<0>(args);
            auto gend = std::get<1>(args);
            auto local_range =
                itr_traits::local_range(std::get<0>(args), std::get<1>(args));
            auto begin = local_range.begin();
            auto end = local_range.end();
            BinaryOperation op = std::get<2>(args);
            auto acc = std::get<3>(args);
            for (auto it = begin; it != end; ++it) {
              *it = op(*it, std::move(acc));
            }
          },
          std::make_tuple(d_f, chunk_end, op, acc));
    }
    d_f = chunk_end;
    acc = op(std::move(acc), res[i].second);
  }
  rt::waitForCompletion(h);
  return chunk_end;
}

template <class InputIt, class OutputIt, class BinaryOperation, class T>
OutputIt inclusive_scan(distributed_sequential_tag&& policy, InputIt first,
                        InputIt last, OutputIt d_first, BinaryOperation op,
                        T init) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  auto res = std::make_pair(d_first, init);
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<InputIt, InputIt, OutputIt, T, BinaryOperation>&
               args,
           std::pair<OutputIt, T>* result) {
          auto d_first = std::get<2>(args);
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          if (begin == end) {
            *result = std::make_pair(d_first, std::get<3>(args));
            return;
          }
          BinaryOperation op = std::get<4>(args);
          T acc = op(std::get<3>(args), *begin);
          *d_first = acc;
          while (++begin != end) {
            acc = op(std::move(acc), *begin);
            *++d_first = acc;
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, res.first, res.second, op), &res);
  }
  return d_first;
}

template <class InputIt, class OutputIt, class BinaryOperation, class T>
OutputIt inclusive_scan(distributed_parallel_tag&& policy, InputIt first,
                        InputIt last, OutputIt d_first, BinaryOperation op,
                        T init) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  uint32_t numLoc = localities.size();
  std::vector<std::pair<OutputIt, T>> res(numLoc);
  rt::Handle h;
  size_t i = 0;
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality, ++i) {
    rt::asyncExecuteAtWithRet(
        h, locality,
        [](rt::Handle&,
           const std::tuple<InputIt, InputIt, OutputIt, BinaryOperation>& args,
           std::pair<OutputIt, T>* result) {
          auto d_first = std::get<2>(args);
          auto df = d_first;
          auto gbegin = std::get<0>(args);
          auto gend = std::get<1>(args);
          auto local_range = itr_traits::local_range(gbegin, gend);
          auto begin = local_range.begin();
          auto end = local_range.end();
          auto it = itr_traits::iterator_from_local(gbegin, gend, begin);
          auto dist = std::distance(gbegin, it);
          std::advance(d_first, dist);
          if (begin == end) {
            *result = std::make_pair(d_first, T{});
            return;
          }
          BinaryOperation op = std::get<3>(args);
          T acc = *begin;
          *d_first = acc;
          while (++begin != end) {
            acc = op(std::move(acc), *begin);
            *++d_first = acc;
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, d_first, op), &res[i]);
  }
  rt::waitForCompletion(h);
  auto d_f = d_first;
  auto acc = init;
  OutputIt chunk_end = d_first;
  using outitr_traits = distributed_iterator_traits<OutputIt>;
  for (i = 0; i < numLoc; ++i) {
    chunk_end = res[i].first;
    auto d_localities = outitr_traits::localities(d_f, chunk_end);
    auto d_startingLoc = d_localities.begin();
    for (auto locality = d_startingLoc, end = d_localities.end();
         locality != end; ++locality) {
      rt::asyncExecuteAt(
          h, locality,
          [](rt::Handle&,
             const std::tuple<OutputIt, OutputIt, BinaryOperation, T>& args) {
            auto gbegin = std::get<0>(args);
            auto gend = std::get<1>(args);
            auto local_range =
                itr_traits::local_range(std::get<0>(args), std::get<1>(args));
            auto begin = local_range.begin();
            auto end = local_range.end();
            BinaryOperation op = std::get<2>(args);
            auto acc = std::get<3>(args);
            for (auto it = begin; it != end; ++it) {
              *it = op(std::move(acc), *it);
            }
          },
          std::make_tuple(d_f, chunk_end, op, acc));
    }
    d_f = chunk_end;
    acc = op(std::move(acc), res[i].second);
  }
  rt::waitForCompletion(h);
  return chunk_end;
}

//
// transform_reduce
//
// single range - sequential
template <class ForwardIt, class T, class BinaryOp, class UnaryOp>
T transform_reduce(distributed_sequential_tag&& policy, ForwardIt first,
                   ForwardIt last, T init, BinaryOp op, UnaryOp uop) {
  using itr_traits = distributed_iterator_traits<ForwardIt>;
  return distributed_folding_map(
      // range
      first, last,
      // kernel
      [](ForwardIt first, ForwardIt last, T res, BinaryOp op, UnaryOp uop) {
        // local processing
        auto lrange = itr_traits::local_range(first, last);
        for (auto b = lrange.begin(); b != lrange.end(); ++b) {
          res = op(std::move(res), uop(*b));
        }
        // update the partial solution
        return res;
      },
      // initial solution
      init,
      // map arguments
      op, uop);
}

// single range - parallel
template <class ForwardIt, class T, class BinaryOp, class UnaryOp>
T transform_reduce(distributed_parallel_tag&& policy, ForwardIt first,
                   ForwardIt last, T init, BinaryOp op, UnaryOp uop) {
  using itr_traits = distributed_iterator_traits<ForwardIt>;
  static_assert(
      std::is_default_constructible<T>::value,
      "transform_reduce requires DefaultConstructible transformed value type");

  // distributed map
  auto map_res = distributed_map(
      // range
      first, last,
      // kernel
      [](ForwardIt first, ForwardIt last, BinaryOp op, UnaryOp uop) {
        using local_iterator_t = typename itr_traits::local_iterator_type;

        // local map
        auto lrange = itr_traits::local_range(first, last);
        auto map_res = local_map(
            // range
            lrange.begin(), lrange.end(),
            // kernel
            [&](local_iterator_t b, local_iterator_t e) {
              auto res = uop(*b++);
              for (; b != e; b++) res = op(std::move(res), uop(*b));
              return res;
            });

        // local reduce
        auto b = map_res.begin(), e = map_res.end();
        T res{};
        if (b != e) {
          res = *b++;
          res = std::accumulate(b, e, std::move(res), op);
        }
        return res;
      },
      // map arguments
      op, uop);

  // TODO parallel
  // reduce
  return std::accumulate(map_res.begin(), map_res.end(), init, op);
}

// two ranges - sequential
template <class ForwardIt1, class ForwardIt2, class T, class BinaryOp1,
          class BinaryOp2>
T transform_reduce(distributed_sequential_tag&& policy, ForwardIt1 first1,
                   ForwardIt1 last1, ForwardIt2 first2, T init, BinaryOp1 op1,
                   BinaryOp2 op2) {
  using itr_traits = distributed_iterator_traits<ForwardIt1>;
  auto localities = itr_traits::localities(first1, last1);
  std::pair<ForwardIt2, T> res = std::make_pair(first2, init);
  for (auto locality = localities.begin(), end = localities.end();
       locality != end; ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<ForwardIt1, ForwardIt1, ForwardIt2, T, BinaryOp1,
                            BinaryOp2>& args,
           std::pair<ForwardIt2, T>* result) {
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          auto first2 = std::get<2>(args);
          auto init = std::get<3>(args);
          auto op = std::get<4>(args);
          auto op2 = std::get<5>(args);
          for (; begin != end; ++begin, ++first2) {
            init = op(std::move(init), op2(*begin, *first2));
          }
          *result = std::make_pair(first2, init);
        },
        std::make_tuple(first1, last1, res.first, res.second, op1, op2), &res);
  }
  return res.second;
}

// two ranges - parallel
template <class ForwardIt1, class ForwardIt2, class T, class BinaryOp1,
          class BinaryOp2>
T transform_reduce(distributed_parallel_tag&& policy, ForwardIt1 first1,
                   ForwardIt1 last1, ForwardIt2 first2, T init, BinaryOp1 op1,
                   BinaryOp2 op2) {
  using itr_traits = distributed_iterator_traits<ForwardIt1>;
  auto localities = itr_traits::localities(first1, last1);
  rt::Handle h;
  std::vector<T> results(localities.size());
  size_t i = 0;
  for (auto locality = localities.begin(), end = localities.end();
       locality != end; ++locality, ++i) {
    rt::asyncExecuteAtWithRet(
        h, locality,
        [](rt::Handle& h,
           const std::tuple<ForwardIt1, ForwardIt1, ForwardIt2, BinaryOp1,
                            BinaryOp2>& args,
           T* result) {
          auto gbegin = std::get<0>(args);
          auto gend = std::get<1>(args);
          auto local_range = itr_traits::local_range(gbegin, gend);
          auto begin = local_range.begin();
          auto end = local_range.end();
          auto it = itr_traits::iterator_from_local(gbegin, gend, begin);
          auto first2 = std::get<2>(args);
          size_t dist = std::distance(gbegin, it);
          std::advance(first2, dist);
          auto op1 = std::get<3>(args);
          auto op2 = std::get<4>(args);
          T acc = op2(*begin, *first2);
          while (++begin != end) {
            acc = op1(std::move(acc), op2(*begin, *(++first2)));
          }
          *result = acc;
        },
        std::make_tuple(first1, last1, first2, op1, op2), &results[i]);
  }
  rt::waitForCompletion(h);
  for (auto lval : results) {
    init = op1(std::move(init), lval);
  }
  return init;
}

template <class InputIt, class OutputIt, class T, class BinaryOperation,
          class UnaryOperation>
OutputIt transform_exclusive_scan(distributed_sequential_tag&& policy,
                                  InputIt first, InputIt last, OutputIt d_first,
                                  T init, BinaryOperation op,
                                  UnaryOperation uop) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  auto res = std::make_pair(d_first, init);
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<InputIt, InputIt, OutputIt, T, BinaryOperation,
                            UnaryOperation>& args,
           std::pair<OutputIt, T>* result) {
          auto d_first = std::get<2>(args);
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          if (begin == end) {
            *result = std::make_pair(d_first, std::get<3>(args));
            return;
          }
          T acc = std::get<3>(args);
          auto op = std::get<4>(args);
          auto uop = std::get<5>(args);
          *d_first = acc;
          acc = op(std::move(acc), uop(*begin));
          while (++begin != end) {
            *++d_first = acc;
            acc = op(std::move(acc), uop(*begin));
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, res.first, res.second, op, uop), &res);
  }
  return res.first;
}

template <class InputIt, class OutputIt, class T, class BinaryOperation,
          class UnaryOperation>
OutputIt transform_exclusive_scan(distributed_parallel_tag&& policy,
                                  InputIt first, InputIt last, OutputIt d_first,
                                  T init, BinaryOperation op,
                                  UnaryOperation uop) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  uint32_t numLoc = localities.size();
  std::vector<std::pair<OutputIt, T>> res(numLoc);
  rt::Handle h;
  size_t i = 0;
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality, ++i) {
    rt::asyncExecuteAtWithRet(
        h, locality,
        [](rt::Handle&,
           const std::tuple<InputIt, InputIt, OutputIt, BinaryOperation,
                            UnaryOperation>& args,
           std::pair<OutputIt, T>* result) {
          auto d_first = std::get<2>(args);
          auto df = d_first;
          auto gbegin = std::get<0>(args);
          auto gend = std::get<1>(args);
          auto local_range = itr_traits::local_range(gbegin, gend);
          auto begin = local_range.begin();
          auto end = local_range.end();
          auto it = itr_traits::iterator_from_local(gbegin, gend, begin);
          auto dist = std::distance(gbegin, it);
          std::advance(d_first, dist);
          if (begin == end) {
            *result = std::make_pair(d_first, T{});
            return;
          }
          auto op = std::get<3>(args);
          auto uop = std::get<4>(args);
          T acc = *begin;
          *d_first = acc;
          acc = uop(acc);
          while (++begin != end) {
            *++d_first = acc;
            acc = op(std::move(acc), uop(*begin));
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, d_first, op, uop), &res[i]);
  }
  rt::waitForCompletion(h);
  auto d_f = d_first;
  auto acc = init;
  OutputIt chunk_end = d_first;
  using outitr_traits = distributed_iterator_traits<OutputIt>;
  for (i = 0; i < numLoc; ++i) {
    chunk_end = res[i].first;
    auto d_localities = outitr_traits::localities(d_f, chunk_end);
    auto d_startingLoc = d_localities.begin();
    for (auto locality = d_startingLoc, end = d_localities.end();
         locality != end; ++locality) {
      rt::asyncExecuteAt(
          h, locality,
          [](rt::Handle&,
             const std::tuple<OutputIt, OutputIt, BinaryOperation, T>& args) {
            auto gbegin = std::get<0>(args);
            auto gend = std::get<1>(args);
            auto local_range =
                itr_traits::local_range(std::get<0>(args), std::get<1>(args));
            auto begin = local_range.begin();
            auto end = local_range.end();
            if (begin == end) return;
            BinaryOperation op = std::get<2>(args);
            auto acc = std::get<3>(args);
            *begin = acc;
            for (++begin; begin != end; ++begin) {
              *begin = op(std::move(acc), *begin);
            }
          },
          std::make_tuple(d_f, chunk_end, op, acc));
    }
    d_f = chunk_end;
    acc = op(std::move(acc), res[i].second);
  }
  rt::waitForCompletion(h);
  return chunk_end;
}

template <class InputIt, class OutputIt, class BinaryOperation,
          class UnaryOperation>
OutputIt transform_inclusive_scan(distributed_sequential_tag&& policy,
                                  InputIt first, InputIt last, OutputIt d_first,
                                  BinaryOperation op, UnaryOperation uop) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  using value_t = typename itr_traits::value_type;
  value_t acc;
  auto res = std::make_pair(d_first, acc);
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<InputIt, InputIt, OutputIt, rt::Locality, value_t,
                            BinaryOperation, UnaryOperation>& args,
           std::pair<OutputIt, value_t>* result) {
          auto d_first = std::get<2>(args);
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          if (begin == end) {
            *result = std::make_pair(d_first, std::get<4>(args));
            return;
          }
          BinaryOperation op = std::get<5>(args);
          UnaryOperation uop = std::get<6>(args);
          value_t acc = uop(*begin);
          if (rt::thisLocality() == std::get<3>(args)) {
            *d_first = acc;
          } else {
            acc = op(std::move(acc), std::get<4>(args));
            *d_first = acc;
          }
          while (++begin != end) {
            acc = op(std::move(acc), uop(*begin));
            *++d_first = acc;
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, res.first, startingLoc, res.second, op,
                        uop),
        &res);
  }
  return res.first;
}

template <class InputIt, class OutputIt, class BinaryOperation,
          class UnaryOperation>
OutputIt transform_inclusive_scan(distributed_parallel_tag&& policy,
                                  InputIt first, InputIt last, OutputIt d_first,
                                  BinaryOperation op, UnaryOperation uop) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  using value_t = typename itr_traits::value_type;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  uint32_t numLoc = localities.size();
  if (numLoc == 0) {
    return d_first;
  }
  std::vector<std::pair<OutputIt, value_t>> res(numLoc);
  rt::Handle h;
  size_t i = 0;
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality, ++i) {
    rt::asyncExecuteAtWithRet(
        h, locality,
        [](rt::Handle&,
           const std::tuple<InputIt, InputIt, OutputIt, BinaryOperation,
                            UnaryOperation>& args,
           std::pair<OutputIt, value_t>* result) {
          auto d_first = std::get<2>(args);
          auto df = d_first;
          auto gbegin = std::get<0>(args);
          auto gend = std::get<1>(args);
          auto local_range = itr_traits::local_range(gbegin, gend);
          auto begin = local_range.begin();
          auto end = local_range.end();
          auto it = itr_traits::iterator_from_local(gbegin, gend, begin);
          auto dist = std::distance(gbegin, it);
          std::advance(d_first, dist);
          if (begin == end) {
            *result = std::make_pair(d_first, value_t{});
            return;
          }
          BinaryOperation op = std::get<3>(args);
          UnaryOperation uop = std::get<4>(args);
          value_t acc = uop(*begin);
          *d_first = acc;
          while (++begin != end) {
            acc = op(std::move(acc), uop(*begin));
            *++d_first = acc;
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, d_first, op, uop), &res[i]);
  }
  rt::waitForCompletion(h);
  auto d_f = res[0].first;
  value_t acc = res[0].second;
  OutputIt chunk_end = d_first;
  using outitr_traits = distributed_iterator_traits<OutputIt>;
  for (i = 1; i < numLoc; ++i) {
    chunk_end = res[i].first;
    auto d_localities = outitr_traits::localities(d_f, chunk_end);
    auto d_startingLoc = d_localities.begin();
    for (auto locality = d_startingLoc, end = d_localities.end();
         locality != end; ++locality) {
      rt::asyncExecuteAt(
          h, locality,
          [](rt::Handle&, const std::tuple<OutputIt, OutputIt, BinaryOperation,
                                           value_t>& args) {
            auto gbegin = std::get<0>(args);
            auto gend = std::get<1>(args);
            auto local_range =
                itr_traits::local_range(std::get<0>(args), std::get<1>(args));
            auto begin = local_range.begin();
            auto end = local_range.end();
            BinaryOperation op = std::get<2>(args);
            auto acc = std::get<3>(args);
            for (auto it = begin; it != end; ++it) {
              *it = op(std::move(acc), *it);
            }
          },
          std::make_tuple(d_f, chunk_end, op, acc));
    }
    d_f = chunk_end;
    acc = op(std::move(acc), res[i].second);
  }
  rt::waitForCompletion(h);
  return chunk_end;
}

template <class InputIt, class OutputIt, class BinaryOperation,
          class UnaryOperation, class T>
OutputIt transform_inclusive_scan(distributed_sequential_tag&& policy,
                                  InputIt first, InputIt last, OutputIt d_first,
                                  BinaryOperation op, UnaryOperation uop,
                                  T init) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  auto res = std::make_pair(d_first, init);
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality) {
    rt::executeAtWithRet(
        locality,
        [](const std::tuple<InputIt, InputIt, OutputIt, T, BinaryOperation,
                            UnaryOperation>& args,
           std::pair<OutputIt, T>* result) {
          auto d_first = std::get<2>(args);
          auto local_range =
              itr_traits::local_range(std::get<0>(args), std::get<1>(args));
          auto begin = local_range.begin();
          auto end = local_range.end();
          if (begin == end) {
            *result = std::make_pair(d_first, std::get<3>(args));
            return;
          }
          BinaryOperation op = std::get<4>(args);
          UnaryOperation uop = std::get<5>(args);
          T acc = op(std::get<3>(args), uop(*begin));
          *d_first = acc;
          while (++begin != end) {
            acc = op(std::move(acc), uop(*begin));
            *++d_first = acc;
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, res.first, res.second, op, uop), &res);
  }
  return res.first;
}

template <class InputIt, class OutputIt, class BinaryOperation,
          class UnaryOperation, class T>
OutputIt transform_inclusive_scan(distributed_parallel_tag&& policy,
                                  InputIt first, InputIt last, OutputIt d_first,
                                  BinaryOperation op, UnaryOperation uop,
                                  T init) {
  using itr_traits = distributed_iterator_traits<InputIt>;
  auto localities = itr_traits::localities(first, last);
  auto startingLoc = localities.begin();
  uint32_t numLoc = localities.size();
  if (numLoc == 0) {
    return d_first;
  }
  std::vector<std::pair<OutputIt, T>> res(numLoc);
  rt::Handle h;
  size_t i = 0;
  for (auto locality = startingLoc, end = localities.end(); locality != end;
       ++locality, ++i) {
    rt::asyncExecuteAtWithRet(
        h, locality,
        [](rt::Handle&,
           const std::tuple<InputIt, InputIt, OutputIt, BinaryOperation,
                            UnaryOperation>& args,
           std::pair<OutputIt, T>* result) {
          auto d_first = std::get<2>(args);
          auto df = d_first;
          auto gbegin = std::get<0>(args);
          auto gend = std::get<1>(args);
          auto local_range = itr_traits::local_range(gbegin, gend);
          auto begin = local_range.begin();
          auto end = local_range.end();
          auto it = itr_traits::iterator_from_local(gbegin, gend, begin);
          auto dist = std::distance(gbegin, it);
          std::advance(d_first, dist);
          if (begin == end) {
            *result = std::make_pair(d_first, T{});
            return;
          }
          BinaryOperation op = std::get<3>(args);
          UnaryOperation uop = std::get<4>(args);
          T acc = uop(*begin);
          *d_first = acc;
          while (++begin != end) {
            acc = op(std::move(acc), uop(*begin));
            *++d_first = acc;
          }
          *result = std::make_pair(++d_first, acc);
        },
        std::make_tuple(first, last, d_first, op, uop), &res[i]);
  }
  rt::waitForCompletion(h);
  auto d_f = res[0].first;
  auto acc = res[0].second;
  OutputIt chunk_end = d_first;
  using outitr_traits = distributed_iterator_traits<OutputIt>;
  for (i = 1; i < numLoc; ++i) {
    chunk_end = res[i].first;
    auto d_localities = outitr_traits::localities(d_f, chunk_end);
    auto d_startingLoc = d_localities.begin();
    for (auto locality = d_startingLoc, end = d_localities.end();
         locality != end; ++locality) {
      rt::asyncExecuteAt(
          h, locality,
          [](rt::Handle&,
             const std::tuple<OutputIt, OutputIt, BinaryOperation, T>& args) {
            auto gbegin = std::get<0>(args);
            auto gend = std::get<1>(args);
            auto local_range =
                itr_traits::local_range(std::get<0>(args), std::get<1>(args));
            auto begin = local_range.begin();
            auto end = local_range.end();
            BinaryOperation op = std::get<2>(args);
            auto acc = std::get<3>(args);
            for (auto it = begin; it != end; ++it) {
              *it = op(std::move(acc), *it);
            }
          },
          std::make_tuple(d_f, chunk_end, op, acc));
    }
    d_f = chunk_end;
    acc = op(std::move(acc), res[i].second);
  }
  rt::waitForCompletion(h);
  return chunk_end;
}

}  // namespace impl
}  // namespace shad

#endif /* INCLUDE_SHAD_CORE_IMPL_NUMERIC_OPS_H */