From b4cd108f033273fa26a9ef3bd89114dc0e158520 Mon Sep 17 00:00:00 2001 From: Pawel Kurowski Date: Mon, 9 Sep 2019 17:02:14 +0200 Subject: [PATCH] Allow multiple packets to be received from channel --- Src/Common/MWR/C3/Interfaces/Channels/Slack.h | 4 +- .../C3/Interfaces/Channels/UncShareFile.cpp | 33 ++++----- .../MWR/C3/Interfaces/Channels/UncShareFile.h | 4 +- .../MWR/C3/Internals/AutomaticRegistrator.h | 72 +++++++++++++++++++ Src/Common/MWR/C3/Internals/Interface.cpp | 2 +- Src/Common/MWR/C3/Internals/Interface.h | 8 +-- Src/Core/DeviceBridge.cpp | 4 +- 7 files changed, 100 insertions(+), 27 deletions(-) diff --git a/Src/Common/MWR/C3/Interfaces/Channels/Slack.h b/Src/Common/MWR/C3/Interfaces/Channels/Slack.h index 029f8a9..8c84925 100644 --- a/Src/Common/MWR/C3/Interfaces/Channels/Slack.h +++ b/Src/Common/MWR/C3/Interfaces/Channels/Slack.h @@ -13,11 +13,11 @@ namespace MWR::C3::Interfaces::Channels /// OnSend callback implementation. /// @param packet data to send to Channel. /// @returns size_t number of bytes successfully written. - size_t OnSendToChannel(ByteView packet) override; + size_t OnSendToChannel(ByteView packet); /// Reads a single C3 packet from Channel. /// @return packet retrieved from Channel. - ByteVector OnReceiveFromChannel() override; + ByteVector OnReceiveFromChannel(); /// Get channel capability. /// @returns ByteView view of channel capability. diff --git a/Src/Common/MWR/C3/Interfaces/Channels/UncShareFile.cpp b/Src/Common/MWR/C3/Interfaces/Channels/UncShareFile.cpp index 24422f8..7dcca9b 100644 --- a/Src/Common/MWR/C3/Interfaces/Channels/UncShareFile.cpp +++ b/Src/Common/MWR/C3/Interfaces/Channels/UncShareFile.cpp @@ -94,39 +94,40 @@ size_t MWR::C3::Interfaces::Channels::UncShareFile::OnSendToChannel(ByteView dat } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -MWR::ByteVector MWR::C3::Interfaces::Channels::UncShareFile::OnReceiveFromChannel() +std::vector MWR::C3::Interfaces::Channels::UncShareFile::OnReceiveFromChannel() { // Read a single packet from the oldest file that belongs to this channel std::vector channelFiles; for (auto&& directoryEntry : std::filesystem::directory_iterator(m_FilesystemPath)) - { if (BelongToChannel(directoryEntry.path())) channelFiles.emplace_back(directoryEntry.path()); - } - if (channelFiles.empty()) - return {}; - auto oldestFile = std::min_element(channelFiles.begin(), channelFiles.end(), [](auto const& a, auto const& b) -> bool { return std::filesystem::last_write_time(a) < std::filesystem::last_write_time(b); }); + std::sort(channelFiles.begin(), channelFiles.end(), [](auto const& a, auto const& b) -> bool { return std::filesystem::last_write_time(a) < std::filesystem::last_write_time(b); }); - // Get the contents of the file and pass on. Return the packet even if removing the file failed. - ByteVector packet; - try + std::vector ret; + ret.reserve(channelFiles.size()); + for (auto&& file : channelFiles) { + ByteVector packet; + try { - std::ifstream readFile(oldestFile->generic_string(), std::ios::binary); + auto readFile = std::ifstream(file, std::ios::binary); packet = ByteVector{ std::istreambuf_iterator{readFile}, {} }; + readFile.close(); + RemoveFile(file); + } + catch (std::exception& exception) + { + Log({ OBF("Caught a std::exception when processing contents of filename: ") + file.generic_string() + OBF(" : ") + exception.what(), LogMessage::Severity::Error }); + break; } - RemoveFile(*oldestFile); - } - catch (std::exception& exception) - { - Log({ OBF("Caught a std::exception when processing contents of filename: ") + oldestFile->generic_string() + OBF(" : ") + exception.what(), LogMessage::Severity::Error }); + ret.push_back(std::move(packet)); } - return packet; + return ret; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/Src/Common/MWR/C3/Interfaces/Channels/UncShareFile.h b/Src/Common/MWR/C3/Interfaces/Channels/UncShareFile.h index a0f61b1..7847987 100644 --- a/Src/Common/MWR/C3/Interfaces/Channels/UncShareFile.h +++ b/Src/Common/MWR/C3/Interfaces/Channels/UncShareFile.h @@ -13,11 +13,11 @@ namespace MWR::C3::Interfaces::Channels /// OnSend callback implementation. /// @param blob data to send to Channel. /// @returns size_t number of bytes successfully written. - size_t OnSendToChannel(ByteView blob) override; + size_t OnSendToChannel(ByteView blob); /// Reads a single C3 packet from Channel. /// @return packet retrieved from Channel. - ByteVector OnReceiveFromChannel() override; + std::vector OnReceiveFromChannel(); /// Get channel capability. /// @returns ByteView view of channel capability. diff --git a/Src/Common/MWR/C3/Internals/AutomaticRegistrator.h b/Src/Common/MWR/C3/Internals/AutomaticRegistrator.h index ef0e42e..3da229b 100644 --- a/Src/Common/MWR/C3/Internals/AutomaticRegistrator.h +++ b/Src/Common/MWR/C3/Internals/AutomaticRegistrator.h @@ -112,12 +112,84 @@ namespace MWR::C3 constexpr static std::chrono::milliseconds s_MinUpdateFrequency = 30ms; constexpr static std::chrono::milliseconds s_MaxUpdateFrequency = 30ms; + /// Constructor setting default update frequency for channel Channel() { static_assert(Iface::s_MinUpdateFrequency >= 30ms && Iface::s_MinUpdateFrequency <= Iface::s_MaxUpdateFrequency, "The frequency is set incorrectly"); m_MinUpdateFrequency = Iface::s_MinUpdateFrequency; m_MaxUpdateFrequency = Iface::s_MaxUpdateFrequency; } + + /// Callback that is periodically called for every Device to update itself. + /// This is point where dynamic polymorphisms is replaced by static one with recognition of returned value. + /// Types using Channel CRTP should implement MWR::ByteVector OnReceiveFromChannel(), or std::vector OnReceiveFromChannel() + /// @return std::vector that contains all packets retrieved from Channel. + std::vector OnReceiveFromChannelInternal() override final + { + static_assert(CanRecive<>::value, "OnReceiveFromChannel is not implemented"); + static_assert(std::is_same_v, ByteVector> || std::is_same_v, std::vector>, "OnReceiveFromChannel should return ByteVector or std::vector"); + return ReceiveWrapper(); + } + + /// Called every time Relay wants to send a packet through this Channel Device. + /// This is point where dynamic polymorphisms is replaced by static one. + /// Types using Channel CRTP should implement size_t OnSendToChannel(ByteView). + /// @param blob buffer containing data to send. + size_t OnSendToChannelInternal(ByteView packet) override final + { + static_assert(CanSend::value, "OnSendToChannel is not implemented"); + auto self = static_cast(this); + return self->OnSendToChannel(packet); + } + + private: + /// Alias to get result of OnReceiveFromChannel call. + /// Use in form ReceiveReturnType to obtain type. + /// Can fail if function is not implemented. + template + using ReceiveReturnType = decltype(std::declval().OnReceiveFromChannel(std::declval()...)); + + /// Alias to test if OnReceiveFromChannel is implemented. + /// Use in form CanRecive::value to obtain bool value with information. + template + using CanRecive = MWR::Utils::CanApply; + + /// Alias to get result of OnSendToChannel call. + /// Use in form SendReturnType to obtain type. + /// Can fail if function is not implemented. + template + using SendReturnType = decltype(std::declval().OnSendToChannel(std::declval()...)); + + /// Alias to test if OnSendToChannel is implemented. + /// Use in form CanSend::value to obtain bool value with information. + template + using CanSend = MWR::Utils::CanApply; + + /// Virtual OnSendToChannelInternal cannot be templated. + /// This function will be available for call if OnReceiveFromChannel returns ByteVector. + /// @returns std::vector one packet pushed on collection if it is not empty.. + template + std::enable_if_t, ByteVector>, std::vector> ReceiveWrapper() + { + auto self = static_cast(this); + std::vector ret; + if (auto packet = self->OnReceiveFromChannel(); !packet.empty()) + ret.push_back(std::move(packet)); + + return ret; + } + + /// Virtual OnSendToChannelInternal cannot be templated. + /// This function will be available for call if OnReceiveFromChannel returns std::vector. + /// @returns std::vector many packets that are not empty. + template + std::enable_if_t, std::vector>, std::vector> ReceiveWrapper() + { + auto self = static_cast(this); + auto ret = self->OnReceiveFromChannel(); + static_cast(std::remove_if(ret.begin(), ret.end(), [](auto&& e) { return e.empty(); })); + return ret; + } }; #ifdef C3_IS_GATEWAY diff --git a/Src/Common/MWR/C3/Internals/Interface.cpp b/Src/Common/MWR/C3/Internals/Interface.cpp index f860eec..2e08c85 100644 --- a/Src/Common/MWR/C3/Internals/Interface.cpp +++ b/Src/Common/MWR/C3/Internals/Interface.cpp @@ -17,7 +17,7 @@ void MWR::C3::AbstractPeripheral::OnReceive() void MWR::C3::AbstractChannel::OnReceive() { if (auto bridge = GetBridge(); bridge) - if (auto packet = OnReceiveFromChannel(); !packet.empty()) + for (auto&& packet : OnReceiveFromChannelInternal()) bridge->PassNetworkPacket(packet); } diff --git a/Src/Common/MWR/C3/Internals/Interface.h b/Src/Common/MWR/C3/Internals/Interface.h index 68981c4..a5a010a 100644 --- a/Src/Common/MWR/C3/Internals/Interface.h +++ b/Src/Common/MWR/C3/Internals/Interface.h @@ -46,7 +46,7 @@ namespace MWR::C3 /// Called every time Relay wants to send a packet through this Channel Device. Should always be called from the same thread for every sender (so it would be safe to use thread_local vars). /// @param blob buffer containing data to send. /// @remarks this method is used only to pass internal (C3) packets through the C3 network, thus it won't be called for any other types of Devices than Channels. - virtual size_t OnSendToChannel(ByteView packet) = 0; + virtual size_t OnSendToChannelInternal(ByteView packet) = 0; /// Fired by Relay to pass by provided Command from Connector. /// @param command full Command with arguments. @@ -85,8 +85,8 @@ namespace MWR::C3 struct AbstractChannel : Device { /// Callback that is periodically called for every Device to update itself. Might be called from a separate thread. The Device should perform all necessary actions and leave as soon as possible. - /// @return ByteVector that contains a single packet retrieved from Channel. - virtual ByteVector OnReceiveFromChannel() = 0; + /// @return std::vector that contains all packets retrieved from Channel. + virtual std::vector OnReceiveFromChannelInternal() = 0; /// Tells that this Device type is a Channel. bool IsChannel() const override { return true; } @@ -116,7 +116,7 @@ namespace MWR::C3 /// This method unconditionally throws std::logic_error as calling it is illegal (Peripherals are not a Channels). @see DeviceDevice::OnSendToChannel. /// @throw This method unconditionally throws std::logic_error. - size_t OnSendToChannel(ByteView) override final + size_t OnSendToChannelInternal(ByteView) override final { throw std::logic_error{ OBF("Tried to send a C3 packet through a Peripheral.") }; } diff --git a/Src/Core/DeviceBridge.cpp b/Src/Core/DeviceBridge.cpp index 56078a4..f87b4e4 100644 --- a/Src/Core/DeviceBridge.cpp +++ b/Src/Core/DeviceBridge.cpp @@ -56,7 +56,7 @@ void MWR::C3::Core::DeviceBridge::OnPassNetworkPacket(ByteView packet) if (m_IsNegotiationChannel) // negotiation channel does not support chunking. Just pass packet and leave. { - auto sent = GetDevice()->OnSendToChannel(packet); + auto sent = GetDevice()->OnSendToChannelInternal(packet); if (sent != packet.size()) throw std::runtime_error{OBF("Negotiation channel does not support chunking. Packet size: ") + std::to_string(packet.size()) + OBF(" Channel sent: ") + std::to_string(sent)}; @@ -69,7 +69,7 @@ void MWR::C3::Core::DeviceBridge::OnPassNetworkPacket(ByteView packet) while (!packet.empty()) { auto data = ByteVector{}.Write(messageId, chunkId, oryginalSize).Concat(packet); - auto sent = GetDevice()->OnSendToChannel(data); + auto sent = GetDevice()->OnSendToChannelInternal(data); if (sent >= QualityOfService::s_MinFrameSize || sent == data.size()) // if this condition were not channel must resend data. {