Allow multiple packets to be received from channel

dependabot/npm_and_yarn/Src/WebController/UI/websocket-extensions-0.1.4
Pawel Kurowski 2019-09-09 17:02:14 +02:00
parent 408ee1da67
commit b4cd108f03
7 changed files with 100 additions and 27 deletions

View File

@ -13,11 +13,11 @@ namespace MWR::C3::Interfaces::Channels
/// OnSend callback implementation.
/// @param packet data to send to Channel.
/// @returns size_t number of bytes successfully written.
size_t OnSendToChannel(ByteView packet) override;
size_t OnSendToChannel(ByteView packet);
/// Reads a single C3 packet from Channel.
/// @return packet retrieved from Channel.
ByteVector OnReceiveFromChannel() override;
ByteVector OnReceiveFromChannel();
/// Get channel capability.
/// @returns ByteView view of channel capability.

View File

@ -94,39 +94,40 @@ size_t MWR::C3::Interfaces::Channels::UncShareFile::OnSendToChannel(ByteView dat
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
MWR::ByteVector MWR::C3::Interfaces::Channels::UncShareFile::OnReceiveFromChannel()
std::vector<MWR::ByteVector> MWR::C3::Interfaces::Channels::UncShareFile::OnReceiveFromChannel()
{
// Read a single packet from the oldest file that belongs to this channel
std::vector<std::filesystem::path> channelFiles;
for (auto&& directoryEntry : std::filesystem::directory_iterator(m_FilesystemPath))
{
if (BelongToChannel(directoryEntry.path()))
channelFiles.emplace_back(directoryEntry.path());
}
if (channelFiles.empty())
return {};
auto oldestFile = std::min_element(channelFiles.begin(), channelFiles.end(), [](auto const& a, auto const& b) -> bool { return std::filesystem::last_write_time(a) < std::filesystem::last_write_time(b); });
std::sort(channelFiles.begin(), channelFiles.end(), [](auto const& a, auto const& b) -> bool { return std::filesystem::last_write_time(a) < std::filesystem::last_write_time(b); });
// Get the contents of the file and pass on. Return the packet even if removing the file failed.
ByteVector packet;
try
std::vector<ByteVector> ret;
ret.reserve(channelFiles.size());
for (auto&& file : channelFiles)
{
ByteVector packet;
try
{
std::ifstream readFile(oldestFile->generic_string(), std::ios::binary);
auto readFile = std::ifstream(file, std::ios::binary);
packet = ByteVector{ std::istreambuf_iterator<char>{readFile}, {} };
readFile.close();
RemoveFile(file);
}
catch (std::exception& exception)
{
Log({ OBF("Caught a std::exception when processing contents of filename: ") + file.generic_string() + OBF(" : ") + exception.what(), LogMessage::Severity::Error });
break;
}
RemoveFile(*oldestFile);
}
catch (std::exception& exception)
{
Log({ OBF("Caught a std::exception when processing contents of filename: ") + oldestFile->generic_string() + OBF(" : ") + exception.what(), LogMessage::Severity::Error });
ret.push_back(std::move(packet));
}
return packet;
return ret;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -13,11 +13,11 @@ namespace MWR::C3::Interfaces::Channels
/// OnSend callback implementation.
/// @param blob data to send to Channel.
/// @returns size_t number of bytes successfully written.
size_t OnSendToChannel(ByteView blob) override;
size_t OnSendToChannel(ByteView blob);
/// Reads a single C3 packet from Channel.
/// @return packet retrieved from Channel.
ByteVector OnReceiveFromChannel() override;
std::vector<ByteVector> OnReceiveFromChannel();
/// Get channel capability.
/// @returns ByteView view of channel capability.

View File

@ -112,12 +112,84 @@ namespace MWR::C3
constexpr static std::chrono::milliseconds s_MinUpdateFrequency = 30ms;
constexpr static std::chrono::milliseconds s_MaxUpdateFrequency = 30ms;
/// Constructor setting default update frequency for channel
Channel()
{
static_assert(Iface::s_MinUpdateFrequency >= 30ms && Iface::s_MinUpdateFrequency <= Iface::s_MaxUpdateFrequency, "The frequency is set incorrectly");
m_MinUpdateFrequency = Iface::s_MinUpdateFrequency;
m_MaxUpdateFrequency = Iface::s_MaxUpdateFrequency;
}
/// Callback that is periodically called for every Device to update itself.
/// This is point where dynamic polymorphisms is replaced by static one with recognition of returned value.
/// Types using Channel CRTP should implement MWR::ByteVector OnReceiveFromChannel(), or std::vector<MWR::ByteVector> OnReceiveFromChannel()
/// @return std::vector<ByteVector> that contains all packets retrieved from Channel.
std::vector<ByteVector> OnReceiveFromChannelInternal() override final
{
static_assert(CanRecive<>::value, "OnReceiveFromChannel is not implemented");
static_assert(std::is_same_v<ReceiveReturnType<Iface>, ByteVector> || std::is_same_v<ReceiveReturnType<Iface>, std::vector<ByteVector>>, "OnReceiveFromChannel should return ByteVector or std::vector<ByteVector>");
return ReceiveWrapper<Iface>();
}
/// Called every time Relay wants to send a packet through this Channel Device.
/// This is point where dynamic polymorphisms is replaced by static one.
/// Types using Channel CRTP should implement size_t OnSendToChannel(ByteView).
/// @param blob buffer containing data to send.
size_t OnSendToChannelInternal(ByteView packet) override final
{
static_assert(CanSend<ByteView>::value, "OnSendToChannel is not implemented");
auto self = static_cast<Iface*>(this);
return self->OnSendToChannel(packet);
}
private:
/// Alias to get result of OnReceiveFromChannel call.
/// Use in form ReceiveReturnType<Iface> to obtain type.
/// Can fail if function is not implemented.
template<class T, class...Ts>
using ReceiveReturnType = decltype(std::declval<T>().OnReceiveFromChannel(std::declval<Ts>()...));
/// Alias to test if OnReceiveFromChannel is implemented.
/// Use in form CanRecive<Iface>::value to obtain bool value with information.
template<class...Ts>
using CanRecive = MWR::Utils::CanApply<ReceiveReturnType, Iface, Ts...>;
/// Alias to get result of OnSendToChannel call.
/// Use in form SendReturnType<Iface, ByteView> to obtain type.
/// Can fail if function is not implemented.
template<class T, class...Ts>
using SendReturnType = decltype(std::declval<T>().OnSendToChannel(std::declval<Ts>()...));
/// Alias to test if OnSendToChannel is implemented.
/// Use in form CanSend<Iface>::value to obtain bool value with information.
template<class...Ts>
using CanSend = MWR::Utils::CanApply<SendReturnType, Iface, Ts...>;
/// Virtual OnSendToChannelInternal cannot be templated.
/// This function will be available for call if OnReceiveFromChannel returns ByteVector.
/// @returns std::vector<ByteVector> one packet pushed on collection if it is not empty..
template <typename T>
std::enable_if_t<std::is_same_v<ReceiveReturnType<T>, ByteVector>, std::vector<ByteVector>> ReceiveWrapper()
{
auto self = static_cast<Iface*>(this);
std::vector<ByteVector> ret;
if (auto packet = self->OnReceiveFromChannel(); !packet.empty())
ret.push_back(std::move(packet));
return ret;
}
/// Virtual OnSendToChannelInternal cannot be templated.
/// This function will be available for call if OnReceiveFromChannel returns std::vector<ByteVector>.
/// @returns std::vector<ByteVector> many packets that are not empty.
template <typename T>
std::enable_if_t<std::is_same_v<ReceiveReturnType<T>, std::vector<ByteVector>>, std::vector<ByteVector>> ReceiveWrapper()
{
auto self = static_cast<Iface*>(this);
auto ret = self->OnReceiveFromChannel();
static_cast<void>(std::remove_if(ret.begin(), ret.end(), [](auto&& e) { return e.empty(); }));
return ret;
}
};
#ifdef C3_IS_GATEWAY

View File

@ -17,7 +17,7 @@ void MWR::C3::AbstractPeripheral::OnReceive()
void MWR::C3::AbstractChannel::OnReceive()
{
if (auto bridge = GetBridge(); bridge)
if (auto packet = OnReceiveFromChannel(); !packet.empty())
for (auto&& packet : OnReceiveFromChannelInternal())
bridge->PassNetworkPacket(packet);
}

View File

@ -46,7 +46,7 @@ namespace MWR::C3
/// Called every time Relay wants to send a packet through this Channel Device. Should always be called from the same thread for every sender (so it would be safe to use thread_local vars).
/// @param blob buffer containing data to send.
/// @remarks this method is used only to pass internal (C3) packets through the C3 network, thus it won't be called for any other types of Devices than Channels.
virtual size_t OnSendToChannel(ByteView packet) = 0;
virtual size_t OnSendToChannelInternal(ByteView packet) = 0;
/// Fired by Relay to pass by provided Command from Connector.
/// @param command full Command with arguments.
@ -85,8 +85,8 @@ namespace MWR::C3
struct AbstractChannel : Device
{
/// Callback that is periodically called for every Device to update itself. Might be called from a separate thread. The Device should perform all necessary actions and leave as soon as possible.
/// @return ByteVector that contains a single packet retrieved from Channel.
virtual ByteVector OnReceiveFromChannel() = 0;
/// @return std::vector<ByteVector> that contains all packets retrieved from Channel.
virtual std::vector<ByteVector> OnReceiveFromChannelInternal() = 0;
/// Tells that this Device type is a Channel.
bool IsChannel() const override { return true; }
@ -116,7 +116,7 @@ namespace MWR::C3
/// This method unconditionally throws std::logic_error as calling it is illegal (Peripherals are not a Channels). @see DeviceDevice::OnSendToChannel.
/// @throw This method unconditionally throws std::logic_error.
size_t OnSendToChannel(ByteView) override final
size_t OnSendToChannelInternal(ByteView) override final
{
throw std::logic_error{ OBF("Tried to send a C3 packet through a Peripheral.") };
}

View File

@ -56,7 +56,7 @@ void MWR::C3::Core::DeviceBridge::OnPassNetworkPacket(ByteView packet)
if (m_IsNegotiationChannel) // negotiation channel does not support chunking. Just pass packet and leave.
{
auto sent = GetDevice()->OnSendToChannel(packet);
auto sent = GetDevice()->OnSendToChannelInternal(packet);
if (sent != packet.size())
throw std::runtime_error{OBF("Negotiation channel does not support chunking. Packet size: ") + std::to_string(packet.size()) + OBF(" Channel sent: ") + std::to_string(sent)};
@ -69,7 +69,7 @@ void MWR::C3::Core::DeviceBridge::OnPassNetworkPacket(ByteView packet)
while (!packet.empty())
{
auto data = ByteVector{}.Write(messageId, chunkId, oryginalSize).Concat(packet);
auto sent = GetDevice()->OnSendToChannel(data);
auto sent = GetDevice()->OnSendToChannelInternal(data);
if (sent >= QualityOfService::s_MinFrameSize || sent == data.size()) // if this condition were not channel must resend data.
{