Hello,
I am currently working on a Java project in which I want to use Zipkin spans (or Sleuth spans) in an Apache Spark application to monitor the performance of microservices. So far, I have managed to instrument the microservices to write the spans to a Kafka topic (either as objects or as JSON), and the zipkin-server implementation consumes those entries without any problems. However, I cannot figure out how to extract the span objects in an Apache Spark application. I am able to extract the bare JSON string, but there seem to be some issues with characters (e.g. "squares" where there should most likely be curly braces), and the additional encoding information in the string prevents parsing it into a JSON object. I feel a bit lost, as I cannot figure out where all those serializations and deserializations happen, and I could not find any helpful resources on how to approach the problem. I would really appreciate help pointing me in the right direction or towards the right resources to solve it.
I think we'll need to see some code to help you. Can you post a sample somewhere?
I am using the spring-cloud-sleuth-stream (for JSON spans) or the spring-cloud-sleuth-zipkin-stream (for object spans) dependency in the microservices, combined with the spring-cloud-stream-binder-kafka dependency. I tried both variants, and both write to Kafka as expected.
In the Spark application (not a Spring context), I try the following to read from Kafka:
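import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", ByteArrayDeserializer.class); // for objects, ByteArrayDeserializer should most likely be used
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("group.id", "test");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("sleuth");
final JavaInputDStream<ConsumerRecord<byte[], byte[]>> stream = KafkaUtils.createDirectStream(sc, // sc is the JavaStreamingContext
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<byte[], byte[]>Subscribe(topics, kafkaParams));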
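Getting a String from the byte array works with:
if (record.value() != null) {
    // StandardCharsets.UTF_8 (java.nio.charset) avoids the checked UnsupportedEncodingException
    value = new String(record.value(), StandardCharsets.UTF_8);
}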
However, the String does not seem to be proper JSON.
For the variant that submits the span objects to the Kafka topic, I have no clue how to extract them.
I hope this information helps you get a better understanding of my problem.
@mbogoevici @sobychacko any ideas? In Sleuth we set spring.cloud.stream.bindings.sleuth.content-type=application/json, so it should be parsed as JSON, shouldn't it?
@lsteigerwald can you print exactly what the String looks like?
BTW, this is what the POJO looks like:
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class Spans {
    private Host host;
    private List<Span> spans = Collections.emptyList();

    @SuppressWarnings("unused")
    private Spans() {
    }

    public Spans(Host host, List<Span> spans) {
        this.host = host;
        this.spans = spans;
    }

    public Host getHost() {
        return this.host;
    }

    public List<Span> getSpans() {
        return this.spans;
    }

    public void setHost(Host host) {
        this.host = host;
    }

    public void setSpans(List<Span> spans) {
        this.spans = spans;
    }
}
This is how the Kafka command-line consumer displays the JSON string, and this is what I receive in the application when I extract the string from the byte array as in my second code snippet above:

I assume there is nothing wrong with the content in the Kafka topic, as the Zipkin-Server implementation is able to handle it without any issue. I just cannot figure out how to extract the Spans from Kafka in my own code.
The Zipkin server reads the stored spans after converting them from Sleuth's format to Zipkin's format. I suspect that if you used the same code, you could get them in Zipkin format, too.
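As one concrete piece of such a conversion: the Sleuth JSON carries IDs as signed 64-bit longs (e.g. "traceId":-179599583673909797), while Zipkin and the B3 headers use lower-case hex strings. The sketch below is a hypothetical helper (`IdToHex.toHex` is not a Sleuth or Zipkin API, and this is not the code Zipkin itself uses) that renders such a long as a 16-character two's-complement hex string.

```java
class IdToHex {

    // Hypothetical helper: formats a signed 64-bit ID as 16 lower-case hex digits.
    // For a long argument, %x treats negative values as unsigned (two's complement),
    // and %016 pads with leading zeros so small IDs still yield 16 characters.
    static String toHex(long id) {
        return String.format("%016x", id);
    }

    public static void main(String[] args) {
        // traceId and spanId values taken from the span JSON in this thread
        System.out.println(toHex(-179599583673909797L));
        System.out.println(toHex(1618686251472792219L));
    }
}
```

Zero-padding keeps every ID at a fixed width, which is the usual form for 64-bit B3 IDs.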
Yeah, but actually what is flying around in Kafka is the aforementioned POJO. I thought @lsteigerwald had a problem deserializing the text into the Sleuth Stream format.
@marcingrzejszczak That is exactly my problem. I do not really care what kind of object I get after deserializing from Kafka, as long as it is something reliably structured (JSON, an object, whatever). I have to process the data anyway before running my machine learning on it. Deserializing to a Sleuth Span would be perfectly sufficient to work with.
Yeah, sure; that's why I wonder whether the Stream folks @mbogoevici @ilayaperumalg @sobychacko can help you deserialize the thing. Actually, come to think of it, remember that at the beginning of your message you have headers. Kafka doesn't support headers out of the box, which is why in Spring Cloud Stream (SC-Stream) the initial part of the payload contains the headers. You need to start parsing from {"host"...
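A minimal sketch of that advice, assuming the JSON body always starts with {"host" and that this byte sequence never occurs inside the embedded headers. `StripToJson.jsonFrom` is a hypothetical helper, not part of Sleuth or Spring Cloud Stream; in the Spark job it would be applied to `record.value()` instead of decoding the whole array.

```java
import java.nio.charset.StandardCharsets;

class StripToJson {

    // Hypothetical helper: returns the payload decoded as UTF-8, starting at the first
    // occurrence of `marker` (e.g. {"host"), or null if the marker is not present.
    // Naive: assumes the marker bytes never occur inside the embedded headers.
    static String jsonFrom(byte[] payload, String marker) {
        byte[] m = marker.getBytes(StandardCharsets.UTF_8);
        outer:
        for (int i = 0; i + m.length <= payload.length; i++) {
            for (int j = 0; j < m.length; j++) {
                if (payload[i + j] != m[j]) {
                    continue outer;
                }
            }
            return new String(payload, i, payload.length - i, StandardCharsets.UTF_8);
        }
        return null;
    }

    public static void main(String[] args) {
        // Simulated record value: a few non-printable header bytes, then the JSON body.
        byte[] header = {(byte) 0xff, 0x01, 0x0b};
        byte[] body = "{\"host\":{\"port\":6001},\"spans\":[]}".getBytes(StandardCharsets.UTF_8);
        byte[] payload = new byte[header.length + body.length];
        System.arraycopy(header, 0, payload, 0, header.length);
        System.arraycopy(body, 0, payload, header.length, body.length);
        System.out.println(jsonFrom(payload, "{\"host\""));
    }
}
```

Searching on bytes rather than on the decoded String avoids having to reason about how the non-UTF-8 header bytes get decoded.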
The thing is, however, that the opening curly braces somehow get lost in serializing/deserializing (see the squares in the screenshot I attached earlier). Only the closing curly braces seem to be displayed/encoded correctly. Debugging in my IDE showed the same squares in the String as the command line in the screenshot.
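One way to see what the "squares" actually are is to print the non-printable bytes as hex instead of decoding them. The helper below (`ShowControlBytes.escape`) is hypothetical. Its sample imitates the start of a Spring Cloud Stream 1.x embedded-header payload, under the assumption that it begins with a 0xFF marker, a header count (5 here, matching the five headers in the dump further down) and a name length. If the real payload follows that layout, such bytes are not valid printable UTF-8 text, which would explain the squares.

```java
import java.nio.charset.StandardCharsets;

class ShowControlBytes {

    // Hypothetical debugging helper: keeps printable ASCII characters and renders every
    // other byte as <XX> in hex, so bytes a console shows as "squares" become visible.
    static String escape(byte[] data, int limit) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < Math.min(limit, data.length); i++) {
            int b = data[i] & 0xff;
            if (b >= 0x20 && b < 0x7f) {
                sb.append((char) b);
            } else {
                sb.append(String.format("<%02X>", b));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Imitates the assumed start of an embedded-header payload: marker, count, name length, name.
        byte[] name = "contentType".getBytes(StandardCharsets.US_ASCII);
        byte[] sample = new byte[3 + name.length];
        sample[0] = (byte) 0xff;
        sample[1] = 0x05;
        sample[2] = (byte) name.length;
        System.arraycopy(name, 0, sample, 3, name.length);
        System.out.println(escape(sample, 64)); // prints <FF><05><0B>contentType
    }
}
```

Calling `escape(record.value(), 64)` on a real record would show the leading bytes without any charset guesswork.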
Can you paste the content of the screenshot here?
Here is a span copied from the command prompt. I am not sure whether it is the same as the one in the screenshot, but I guess that doesn't matter too much:
contentType "text/plain"originalContentType "application/json;charset=UTF-8"X-B3-SpanId "8196bfd0fbc091d9"X-B3-TraceId "8196bfd0fbc091d9"X-B3-Sampled "0"{"host":{"serviceName":"drivenow-mobility-service","address":"192.168.178.47","port":6001},"spans":[{"begin":1496752071311,"end":1496752071360,"name":"find-distance","traceId":-179599583673909797,"parents":[-9070718407819085451],"spanId":1618686251472792219,"exportable":true,"tags":{"mvc.controller.method":"findDistance","mvc.controller.class":"DriveNowController"},"logs":[],"durationMicros":48396},{"begin":1496752071300,"end":1496752071361,"name":"http:/getroutes","traceId":-179599583673909797,"parents":[5935189279474955391],"spanId":-9070718407819085451,"remote":true,"exportable":true,"logs":[{"timestamp":1496752071301,"event":"sr"},{"timestamp":1496752071361,"event":"ss"}],"durationMicros":61053}]}
So this is your JSON. It's a valid structure... What you have before it are headers that you should ignore if you're doing manual parsing.
{
"host":{
"serviceName":"drivenow-mobility-service",
"address":"192.168.178.47",
"port":6001
},
"spans":[
{
"begin":1496752071311,
"end":1496752071360,
"name":"find-distance",
"traceId":-179599583673909797,
"parents":[
-9070718407819085451
],
"spanId":1618686251472792219,
"exportable":true,
"tags":{
"mvc.controller.method":"findDistance",
"mvc.controller.class":"DriveNowController"
},
"logs":[
],
"durationMicros":48396
},
{
"begin":1496752071300,
"end":1496752071361,
"name":"http:/getroutes",
"traceId":-179599583673909797,
"parents":[
5935189279474955391
],
"spanId":-9070718407819085451,
"remote":true,
"exportable":true,
"logs":[
{
"timestamp":1496752071301,
"event":"sr"
},
{
"timestamp":1496752071361,
"event":"ss"
}
],
"durationMicros":61053
}
]
}
OK, then I will most likely try the manual approach, unless there is an existing solution inside the Sleuth framework (I think it would be a bit weird if there weren't). Thank you very much :-)
We use Spring Cloud Stream, which does the work for us. We don't do deserialization manually; the framework does it for us. That's why maybe someone from the Stream team that I mentioned will be able to help you more with this.
For anything outside of a Spring Cloud Stream application (here, an Apache Spark application), you need to use spring.cloud.stream.bindings.<channelName>.producer.headerMode=raw or spring.cloud.stream.bindings.<channelName>.consumer.headerMode=raw, depending on whether you write to or read from the Kafka topic.
@lsteigerwald What happens when you set this property?
Also, you can read this for some context on this.
But he's not using Spring or Spring Cloud Stream from what I understand...
@marcingrzejszczak What I meant was reading messages from a Kafka topic that were not published by a Spring Cloud Stream application. I saw you mentioned:
We're setting in Sleuth the spring.cloud.stream.bindings.sleuth.content-type=application/json
and this made me think that the Spring Cloud Stream application is trying to read the messages from the Kafka topic (using the SCSt Kafka binder), but the messages were originally published by a non-Spring Cloud Stream application. In that case, I believe we need to set spring.cloud.stream.bindings.sleuth.consumer.headerMode=raw.
@ilayaperumalg It's the other way around. The application publishes messages via SCSt and one recipient is a Spark application. It can be tried, but I'm not sure that the Zipkin Stream server (which is the primary recipient of the spans) will be able to read the raw message, without the application/json header embedded, as it relies on the @StreamListener conversion.
It seems we are not supporting this use case (publishing to a Stream app and a non-Stream app simultaneously) very well right now. As a workaround, the Spark app can manually extract the payload using something similar to https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/EmbeddedHeaderUtils.java#L118.
Starting with Kafka 0.11 this will go away, as header support will eliminate embedding.
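A minimal sketch of that workaround, stripping the embedded headers by their layout rather than by searching for {"host". It assumes the Spring Cloud Stream 1.x embedded-header layout: a 0xFF marker byte, a one-byte header count, then per header a one-byte name length, the name, a four-byte big-endian value length and the value, followed by the payload. Verify this against the linked EmbeddedHeaderUtils source for your version. `EmbeddedHeaderSketch` and `extract` are hypothetical names. Unlike the Spring code, this sketch does not JSON-decode the header values, so they keep their quotes (as in the "text/plain" visible in the dump above).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class EmbeddedHeaderSketch {

    // Result holder: header name -> raw (still JSON-encoded) value, plus the remaining payload.
    static final class Extracted {
        final Map<String, String> headers;
        final byte[] payload;

        Extracted(Map<String, String> headers, byte[] payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    // Parses the assumed layout; malformed input will throw a runtime exception.
    static Extracted extract(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (message.length == 0 || (buf.get() & 0xff) != 0xff) {
            // No marker: treat the whole message as payload.
            return new Extracted(new LinkedHashMap<>(), message);
        }
        int count = buf.get() & 0xff;
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.get() & 0xff];
            buf.get(name);
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            headers.put(new String(name, StandardCharsets.UTF_8), new String(value, StandardCharsets.UTF_8));
        }
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new Extracted(headers, payload);
    }

    public static void main(String[] args) {
        // Build a sample message in the assumed layout with a single header.
        byte[] name = "contentType".getBytes(StandardCharsets.UTF_8);
        byte[] value = "\"text/plain\"".getBytes(StandardCharsets.UTF_8);
        byte[] body = "{\"host\":{},\"spans\":[]}".getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(2 + 1 + name.length + 4 + value.length + body.length);
        out.put((byte) 0xff).put((byte) 1);
        out.put((byte) name.length).put(name);
        out.putInt(value.length).put(value);
        out.put(body);

        Extracted e = extract(out.array());
        System.out.println(e.headers);
        System.out.println(new String(e.payload, StandardCharsets.UTF_8));
    }
}
```

In the Spark job, `extract(record.value()).payload` would then hold only the JSON body. Once headers are carried natively (Kafka 0.11+), this layout would presumably no longer be used.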
Thank you all for your help. I have now found a working solution for me to extract the Spans from Kafka.
Just one minor thing. I tried to use the ConvertToZipkinSpanList class to get a Zipkin span from the Sleuth span, as @adriancole suggested earlier. However, I do not seem to be able to import that class after adding the spring-cloud-sleuth-zipkin-stream dependency to my pom. Is that intentional, or have I messed up my configuration and it should be visible once that dependency is included?
Yeah, that's intentional. As you can see, the class is final and package-scoped.