Describe the bug
When a publisher sends messages to a receiver at a high frequency,
messages, especially the first one, may be lost.
To Reproduce
Use the demo talker/listener example to reproduce this bug/issue.
Steps to reproduce the behavior:
listener.cc
#include "cyber/cyber.h"
#include "cyber/examples/proto/examples.pb.h"
static int cnt = 0;
void MessageCallback(
const std::shared_ptr<apollo::cyber::examples::proto::Chatter>& msg) {
cnt += 1;
if(cnt == 10) AINFO << "[PASS] receive all.";
}
int main(int argc, char* argv[]) {
// init cyber framework
apollo::cyber::Init(argv[0]);
// create listener node
auto listener_node = apollo::cyber::CreateNode("listener");
apollo::cyber::ReaderConfig reader_config;
reader_config.channel_name = "channel/chatter";
reader_config.pending_queue_size = 100;
// create listener
auto listener =
listener_node->CreateReader<apollo::cyber::examples::proto::Chatter>(
reader_config, MessageCallback);
apollo::cyber::WaitForShutdown();
return 0;
}
talker.cc
#include <unistd.h>  // sleep
#include <cassert>   // assert
#include <memory>    // std::make_shared
#include "cyber/cyber.h"
#include "cyber/examples/proto/examples.pb.h"
#include "cyber/time/rate.h"
#include "cyber/time/time.h"
using apollo::cyber::Rate;
using apollo::cyber::Time;
using apollo::cyber::examples::proto::Chatter;
int NUM = 10;
int main(int argc, char *argv[]) {
// init cyber framework
apollo::cyber::Init(argv[0]);
// create talker node
auto talker_node = apollo::cyber::CreateNode("talker");
apollo::cyber::proto::RoleAttributes attr;
attr.set_channel_name("channel/chatter");
attr.mutable_qos_profile()->CopyFrom(
apollo::cyber::transport::QosProfileConf::QOS_PROFILE_SYSTEM_DEFAULT);
// create talker
auto talker = talker_node->CreateWriter<Chatter>(attr);
while(talker->HasReader() == false) {}
sleep(1);
Rate rate(1.0);
for(int i = 0; i < NUM; ++i) {
assert(apollo::cyber::OK());
auto msg = std::make_shared<Chatter>();
msg->set_timestamp(Time::Now().ToNanosecond());
msg->set_lidar_timestamp(Time::Now().ToNanosecond());
msg->set_seq(i);
msg->set_content("Hello, apollo!");
talker->Write(msg);
AINFO << "talker sent " << i << " message!";
//rate.Sleep();
}
return 0;
}
Note that "rate.Sleep" is commented out to emulate a high sending frequency, and that both the reader and the writer are initialized with QoS settings.
Expected behavior
The listener should receive all 10 messages.
Hi. This problem occurs when your talker shuts down: the writer leaves the topology, and the shm transport between talker and listener is broken. The SHMReceiver is then disabled (see code: cyber/transport/receiver/shm_receiver.h:Disable). So even though the messages can still be read from the shm blocks, the callback will not be triggered.
You can resolve this problem by waiting a short time, e.g. sleep(1), before your talker exits.
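To make the reasoning above concrete, here is a deterministic toy model in plain C++. It is only a sketch: ToyReceiver, Store, Poll and DeliveredCount are hypothetical names invented for illustration and are not part of Cyber RT, and the real SHMReceiver works with shared-memory blocks and notifications rather than an in-process queue. The only point is that messages stored before the receiver is disabled never reach the callback unless the listener had a chance to run first.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Toy receiver (hypothetical, not the Cyber RT class): messages wait in
// pending_ until Poll() dispatches them, and Poll() is a no-op once the
// receiver has been disabled.
class ToyReceiver {
 public:
  explicit ToyReceiver(std::function<void(int)> cb)
      : callback_(std::move(cb)) {
    assert(callback_ != nullptr);
  }

  // The writer side stores a message; nothing is dispatched yet.
  void Store(int seq) { pending_.push(seq); }

  // Models the writer leaving the topology.
  void Disable() { enabled_ = false; }

  // Dispatches all pending messages, but only while still enabled.
  void Poll() {
    if (!enabled_) return;
    while (!pending_.empty()) {
      callback_(pending_.front());
      pending_.pop();
    }
  }

 private:
  std::function<void(int)> callback_;
  std::queue<int> pending_;
  bool enabled_ = true;
};

// Returns how many callbacks fire for a burst of `count` messages.
// wait_before_exit models the talker sleeping before it returns.
int DeliveredCount(int count, bool wait_before_exit) {
  int delivered = 0;
  ToyReceiver receiver([&delivered](int) { ++delivered; });
  for (int i = 0; i < count; ++i) receiver.Store(i);
  if (wait_before_exit) receiver.Poll();  // listener gets time to run
  receiver.Disable();                     // talker exits
  receiver.Poll();                        // too late: nothing is dispatched
  return delivered;
}
```

In this model, DeliveredCount(10, false) corresponds to the talker returning immediately after the send loop, and DeliveredCount(10, true) corresponds to adding sleep(1) before return 0.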
@from-wind thank you so much for your timely responses. We really appreciate your support!
@from-wind Hi. Actually, we use rtps mode for transport (sorry for not mentioning this before). We tried sleep(1) before return, but it did not help. Judging from our log, under rtps transport the publisher sometimes fails to send the first few packets. We also tested the publisher & subscriber example of the fastrtps library, and the same problem did not occur there.
@guozhenwuai OK. Let me see.
Hi. Your talker uses the default QoS; you can set the depth to 1000 or more (depending on your message size).
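Why a larger depth helps can be sketched with a small keep-last history queue in plain C++. This is a toy model, not Cyber RT or fastrtps code: KeepLastQueue and SurvivorsAfterBurst are hypothetical names, and it assumes the history simply evicts the oldest message once depth is reached. In the talker, the corresponding setting would presumably be the depth field of the QoS profile on attr, set before CreateWriter; check cyber/proto/qos_profile.proto for the exact field names.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Toy keep-last history (hypothetical): holds at most `depth` messages and
// discards the oldest one when a new message arrives while it is full.
class KeepLastQueue {
 public:
  explicit KeepLastQueue(std::size_t depth) : depth_(depth) {
    assert(depth_ > 0);
  }

  // Enqueues a sequence number, evicting the oldest entry if full.
  void Push(int seq) {
    if (queue_.size() == depth_) {
      queue_.pop_front();
      ++dropped_;
    }
    queue_.push_back(seq);
  }

  std::size_t Size() const { return queue_.size(); }
  std::size_t Dropped() const { return dropped_; }

 private:
  std::size_t depth_;
  std::size_t dropped_ = 0;
  std::deque<int> queue_;
};

// Pushes `count` messages back to back, with no consumer running in
// between, and returns how many are still available afterwards.
std::size_t SurvivorsAfterBurst(std::size_t depth, int count) {
  KeepLastQueue history(depth);
  for (int i = 0; i < count; ++i) history.Push(i);
  return history.Size();
}
```

With a burst of 10 messages, a depth of 1 keeps only the most recent message, while a depth of 1000 keeps all 10. That matches the symptom in the report, where the earliest messages are the ones that go missing.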
Please reopen if you have further questions @guoweiwan