Program Listing for File PcBuffer.h

Program Listing for File PcBuffer.h#

Return to documentation for file (src/buffer/PcBuffer.h)

#pragma once

/* toolchain */
#include <functional>

/* internal */
#include "../ContextLock.h"
#include "CircularBuffer.h"
#include "PcBufferReader.h"
#include "PcBufferState.h"
#include "PcBufferWriter.h"

namespace Coral
{

template <std::size_t depth, typename element_t = std::byte,
          std::size_t alignment = sizeof(element_t), class Lock = NoopLock>
class PcBuffer
    : public PcBufferWriter<PcBuffer<depth, element_t, alignment, Lock>,
                            element_t>,
      public PcBufferReader<PcBuffer<depth, element_t, alignment, Lock>,
                            element_t>
{
  public:
    static constexpr std::size_t Depth = depth;

    using ServiceCallback =
        std::function<void(PcBuffer<depth, element_t, alignment, Lock> *)>;

    PcBuffer(bool _auto_service = false,
             ServiceCallback _space_available = nullptr,
             ServiceCallback _data_available = nullptr)
        : state(depth), buffer(), space_available(_space_available),
          data_available(_data_available), auto_service(_auto_service)
    {
    }

    void set_space_available(ServiceCallback _space_available = nullptr)
    {
        /* Don't allow double assignment. */
        assert(not _space_available or
               (_space_available and space_available == nullptr));
        space_available = _space_available;
    }

    void set_data_available(ServiceCallback _data_available = nullptr)
    {
        /* Don't allow double assignment. */
        assert(not _data_available or
               (_data_available and data_available == nullptr));
        data_available = _data_available;
    }

    inline bool empty(void)
    {
        return state.empty();
    }

    inline bool full(void)
    {
        return state.full();
    }

    inline void clear()
    {
        Lock lock;

        /* Reset state. */
        state.reset();

        uint32_t tmp;
        buffer.poll_metrics(tmp, tmp);
    }

    inline element_t peek()
    {
        return buffer.peek();
    }

    Result pop_impl(element_t &elem)
    {
        /* Allow a pop request to feed the buffer. */
        if (auto_service)
        {
            service_space();
        }

        bool result;
        {
            Lock lock;
            result = state.decrement_data();
            if (result)
            {
                buffer.read_single(elem);
            }
        }

        if (result)
        {
            service_space();
        }

        return ToResult(result);
    }

    Result pop_n_impl(element_t *elem_array, std::size_t count)
    {
        /* Allow a pop request to feed the buffer. */
        if (auto_service)
        {
            service_space();
        }

        bool result;
        {
            Lock lock;
            result = state.decrement_data(count);
            if (result)
            {
                buffer.read_n(elem_array, count);
            }
        }

        if (result)
        {
            service_space();
        }

        return ToResult(result);
    }

    std::size_t try_pop_n_impl(element_t *elem_array, std::size_t count)
    {
        count = std::min(count, state.data_available());

        if (count)
        {
            count = ToBool(pop_n_impl(elem_array, count)) ? count : 0;
        }

        return count;
    }

    std::size_t pop_all_impl(element_t *elem_array = nullptr)
    {
        std::size_t result = state.data_available();
        if (result)
        {
            result = ToBool(pop_n_impl(elem_array, result)) ? result : 0;
        }
        return result;
    }

    Result push_impl(const element_t elem, bool drop = false)
    {
        if (auto_service)
        {
            service_data();
        }

        bool result;
        {
            Lock lock;
            result = state.increment_data(drop);
            if (result)
            {
                buffer.write_single(elem);
            }
        }

        if (result)
        {
            service_data();
        }

        return ToResult(result);
    }

    void push_blocking_impl(const element_t elem)
    {
        while (full())
        {
            service_data(true);
        }
        push_impl(elem);
    }

    inline void flush(void)
    {
        while (!empty())
        {
            service_data(true);
        }
    }

    Result push_n_impl(const element_t *elem_array, std::size_t count,
                       bool drop = false)
    {
        if (auto_service)
        {
            service_data();
        }

        bool result;
        {
            Lock lock;
            result = state.increment_data(drop, count);
            if (result)
            {
                buffer.write_n(elem_array, count);
            }
        }

        if (result)
        {
            service_data();
        }

        return ToResult(result);
    }

    std::size_t try_push_n_impl(const element_t *elem_array, std::size_t count)
    {
        count = std::min(count, state.space_available());

        if (count)
        {
            count = ToBool(push_n_impl(elem_array, count)) ? count : 0;
        }

        return count;
    }

    void push_n_blocking_impl(const element_t *elem_array, std::size_t count)
    {
        std::size_t chunk;
        while (count)
        {
            chunk = std::min(depth, count);

            while (!state.has_enough_space(chunk))
            {
                service_data(true);
            }

            if (ToBool(push_n_impl(elem_array, chunk)))
            {
                elem_array += chunk;
                count -= chunk;
            }
        }
    }

    inline const element_t *head(void)
    {
        return buffer.head();
    }

    PcBufferState state;

  protected:
    CircularBuffer<depth, element_t, alignment> buffer;

    ServiceCallback space_available;
    ServiceCallback data_available;

    bool auto_service;

    inline void service_data(bool required = false)
    {
        (void)required;
        assert(data_available or not required);

        if (data_available)
        {
            data_available(this);
        }
    }

    inline void service_space(bool required = false)
    {
        (void)required;
        assert(space_available or not required);

        if (space_available)
        {
            space_available(this);
        }
    }
};

/* Convenient aliases. */
template <std::size_t depth, std::size_t alignment = sizeof(std::byte),
          class Lock = NoopLock>
using ByteBuffer = PcBuffer<depth, std::byte, alignment, Lock>;
template <std::size_t depth, std::size_t alignment = sizeof(char),
          class Lock = NoopLock>
using CharBuffer = PcBuffer<depth, char, alignment, Lock>;
template <std::size_t depth, std::size_t alignment = sizeof(wchar_t),
          class Lock = NoopLock>
using WcharBuffer = PcBuffer<depth, wchar_t, alignment, Lock>;

/*
 * Stream interfaces.
 */

template <std::size_t depth, typename element_t = std::byte,
          std::size_t alignment = sizeof(element_t), class Lock = NoopLock>
inline std::basic_istream<element_t> &operator>>(
    std::basic_istream<element_t> &stream,
    PcBuffer<depth, element_t, alignment, Lock> &instance)
{
    std::array<element_t, depth> elem_array;
    instance.push_n_blocking(elem_array.data(),
                             stream.readsome(elem_array.data(), depth));
    return stream;
}

template <std::size_t depth, typename element_t = std::byte,
          std::size_t alignment = sizeof(element_t), class Lock = NoopLock>
inline std::basic_ostream<element_t> &operator<<(
    std::basic_ostream<element_t> &stream,
    PcBuffer<depth, element_t, alignment, Lock> &instance)
{
    std::array<element_t, depth> elem_array;
    stream.write(elem_array.data(), instance.try_pop_n(elem_array));
    return stream;
}

}; // namespace Coral