Nextflow: Unclear how to send poison pill over non-output channel?

Created on 25 Apr 2017 · 11 Comments · Source: nextflow-io/nextflow

This may be veering into support, but I've got a hang which I believe is occurring because no poison-pill is being sent over a channel which is not an output channel for a task.

https://github.com/uiuc-cgm/dnhs-chipimputation/blob/dnhs-chipimputation/impute2.nf is the script, and the last process which produces a poison pill is

 Apr-24 19:07:32.695 [Actor Thread 5] DEBUG nextflow.processor.TaskProcessor - <duplicatePedMapByChr> Sending poison pills and terminating process

Is there some way to associate a channel which I'm bind()ing to with a specific process? Or should I be doing this in some other way?

All 11 comments

The script looks fine. Can you please include the .nextflow.log file?

It seems that it completes correctly

Apr-24 19:30:01.186 [main] DEBUG nextflow.script.ScriptRunner - > Execution complete -- Goodby

Did it hang?

Yeah; it never actually submits any of the imputeCombine processes.

Are you sure the script you are running is the same as the one published at this link?

For example, I don't understand where this variable is coming from. This should raise an error.

Sorry; I gave you the wrong branch there. This is the correct branch: https://github.com/uiuc-cgm/dnhs-chipimputation/blob/dnhs-imputation/impute2.nf (I've edited the initial post to have the correct link now).

OK. Now it makes more sense. When you create a channel and bind values to it "manually", you need to terminate it with the close operator, which sends the poison pill you were referring to.
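As a rough illustration of that point, here is a minimal sketch. It assumes a DSL1-era Nextflow (as used at the time of this issue), where Channel.create() is available; the channel name is borrowed from the thread and the bound values are placeholders, not the contents of the real script.

```groovy
// Assumption: DSL1-era Nextflow, where Channel.create() exists.
ped_maps_per_chr = Channel.create()

// Manually bind a few values; these chromosome names are placeholders.
ped_maps_per_chr.bind('chr1')
ped_maps_per_chr.bind('chr2')

// Explicitly send the termination signal (the "poison pill").
// Without this, anything consuming the channel keeps waiting for more items.
ped_maps_per_chr.close()

// A consumer: onComplete only fires once the channel has been closed.
ped_maps_per_chr.subscribe onNext: { println "got ${it}" },
                           onComplete: { println 'channel closed' }
```

Channels that are declared as process outputs or produced by operators are closed by Nextflow itself, which is why the explicit close() is only needed in the manual case.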

That said, channels should not be used inside the body of a process. I would suggest refactoring this process into a chain of operators. I didn't get all the logic, but I think splitText would be a good start. That should save you from handling the channel creation/bind/termination manually.
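A sketch of what such an operator chain could look like is below. The file name and the output channel name are hypothetical and only stand in for whatever the real script uses; it is meant to show the shape of the approach, not a drop-in replacement.

```groovy
// Assumption: 'chromosomes.txt' is a hypothetical file with one region per line.
Channel
    .fromPath('chromosomes.txt')
    .splitText()              // emit each line of the file as a separate item
    .map { it.trim() }        // strip the trailing newline from each line
    .set { regions }          // hypothetical name for the resulting channel

// 'regions' can now be used as a process input. Nextflow terminates it
// automatically once the source file has been fully consumed.
```

Because the channel is produced entirely by operators, there is no manual bind() or close() step to forget.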

Ah, OK. So I should implement this with map, I guess. I naively thought that something like this would work too:

 chromosomes.subscribe onComplete: {  ped_maps_per_chr.close() };

but that complains because both a process and an operator use it.

I've now re-implemented this as two maps which cleared up the issue. [Although the code is a bit ugly; my groovy-fu is weak.]

Thanks for the pointers! I'll try to think about how to clarify the documentation now that I understand better how the processes are actually handled.

Much better now. You can even refactor it as shown below:

chromosomes.flatMap { ped, map, chroms -> 
  def res = [];
  for (chrminmax in chroms.readLines()) {
  .. 
  } 
  res
}.set { ped_maps_per_chr }

I'm closing this issue. Feel free to comment if needed.
