Apollo: Messages are lost under a high sending frequency.

Created on 24 May 2019  ·  5 Comments  ·  Source: ApolloAuto/apollo

Describe the bug
When a publisher sends messages to a receiver at a high frequency,
messages (especially the first one) may be lost.

To Reproduce
Use the demo talker/listener example to reproduce this issue.
Steps to reproduce the behavior:

listener.cc

#include "cyber/cyber.h"
#include "cyber/examples/proto/examples.pb.h"

static int cnt = 0;

void MessageCallback(
    const std::shared_ptr<apollo::cyber::examples::proto::Chatter>& msg) {
  cnt += 1;
  if (cnt == 10) AINFO << "[PASS] receive all.";
}

int main(int argc, char* argv[]) {
  // init cyber framework
  apollo::cyber::Init(argv[0]);
  // create listener node
  auto listener_node = apollo::cyber::CreateNode("listener");

  apollo::cyber::ReaderConfig reader_config;
  reader_config.channel_name = "channel/chatter";
  reader_config.pending_queue_size = 100;
  // create listener
  auto listener =
      listener_node->CreateReader<apollo::cyber::examples::proto::Chatter>(
          reader_config, MessageCallback);

  apollo::cyber::WaitForShutdown();
  return 0;
}

talker.cc

#include <unistd.h>  // sleep

#include <cassert>   // assert

#include "cyber/cyber.h"
#include "cyber/examples/proto/examples.pb.h"
#include "cyber/time/rate.h"
#include "cyber/time/time.h"

using apollo::cyber::Rate;
using apollo::cyber::Time;
using apollo::cyber::examples::proto::Chatter;

int NUM = 10;

int main(int argc, char *argv[]) {
  // init cyber framework
  apollo::cyber::Init(argv[0]);
  // create talker node
  auto talker_node = apollo::cyber::CreateNode("talker");

  apollo::cyber::proto::RoleAttributes attr;
  attr.set_channel_name("channel/chatter");
  attr.mutable_qos_profile()->CopyFrom(
      apollo::cyber::transport::QosProfileConf::QOS_PROFILE_SYSTEM_DEFAULT);
  // create talker
  auto talker = talker_node->CreateWriter<Chatter>(attr);

  // busy-wait until the listener has been discovered, then wait 1 s more
  while (!talker->HasReader()) {}
  sleep(1);

  Rate rate(1.0);
  for (int i = 0; i < NUM; ++i) {
    assert(apollo::cyber::OK());
    auto msg = std::make_shared<Chatter>();
    msg->set_timestamp(Time::Now().ToNanosecond());
    msg->set_lidar_timestamp(Time::Now().ToNanosecond());
    msg->set_seq(i);
    msg->set_content("Hello, apollo!");
    talker->Write(msg);
    AINFO << "talker sent " << i << " message!";
    // rate.Sleep();  // disabled on purpose to emulate a high sending frequency
  }
  return 0;
}

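Note that "rate.Sleep()" is commented out to emulate a high sending frequency, and that QoS-related settings are configured explicitly: the writer uses QOS_PROFILE_SYSTEM_DEFAULT and the reader sets a pending queue size of 100.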

  1. Compile the talker and listener with Bazel.
  2. Run the listener first, then the talker.
  3. The listener may not receive all the messages (the "[PASS] receive all." line is never printed).

Expected behavior
The listener should receive all 10 messages.

Desktop:

  • OS: Ubuntu 16.04
  • Apollo master branch
  • git commit 14f8fc7de5
Labels: Cyber, Help wanted

All 5 comments

Hi, this problem occurs when your talker shuts down: the writer leaves the topology, and the SHM transport between the talker and the listener is broken. The SHMReceiver is then disabled (see cyber/transport/receiver/shm_receiver.h: Disable), so even if messages are still read from the SHM blocks, the callback will not be triggered.
You can resolve this by waiting a short time, e.g. calling sleep(1), before your talker exits.

@from-wind thank you so much for your timely responses. We really appreciate your support!

@from-wind Hi. Actually, we use RTPS mode for transport (sorry for not mentioning this before). We tried adding sleep(1) before returning, but it did not help. Judging from our logs, under RTPS transport the publisher sometimes fails to send the first few packets. We also tested the publisher/subscriber example from the fastrtps library, and the same problem did not occur there.

@guozhenwuai OK. Let me see.
Hi, your talker uses the default QoS profile; you can set the history depth to 1000 or more (choose the value based on your message size).

Please reopen if you have further questions @guoweiwan
