Accumulators¶
Accumulators are a way of passing data from within a semaphore group to its controlling funnel.
Accumulators are defined within pipelines as URLs. These must have the accu_name key, which indicates the name of the funnel's parameter that will hold the data. These data come from the dataflow event, specifically from the parameter that has the name of the accu_name key. This can be overridden with the accu_input_variable key.
There are five types of Accumulators, all described below: scalar, pile, multiset, array and hash. For each of them we show how to initialise them and the equivalent Perl code to build the same structure.
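In a PipeConfig file, the Accumulator URL typically appears as a dataflow target of the fan Analysis inside a semaphore group. A minimal sketch of where the URL goes (all Analysis and parameter names here are purely illustrative, not taken from a real pipeline):

```perl
# Hypothetical excerpt from a PipeConfig's pipeline_analyses() list
{   -logic_name => 'make_fan_jobs',
    -flow_into  => {
        '2->A' => [ 'fan_analysis' ],     # the fan: Jobs in the semaphore group
        'A->1' => [ 'funnel_analysis' ],  # the funnel: runs once the group completes
    },
},
{   -logic_name => 'fan_analysis',
    -flow_into  => {
        1 => [ '?accu_name=pile_name&accu_address=[]' ],  # the Accumulator URL
    },
},
```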
Scalar¶
- Basic syntax: ?accu_name=scalar_name
- Extended syntax: ?accu_name=scalar_name&accu_input_variable=output_parameter_name
- Retrieval: my $scalar_value = $self->param('scalar_name');
This is the simplest type of Accumulator. The basic syntax example passes the value of the scalar_name parameter from the fan to the funnel. The extended syntax example makes the value of the output_parameter_name parameter from the fan available to the funnel as if it were a parameter named scalar_name. If there are multiple Jobs in the fan, eHive will arbitrarily select one of them to define the Accumulator.
In Perl, this is equivalent to doing this:
- Accumulator initialisation:
my $scalar_name;
- Accumulator extension:
$scalar_name = $scalar_name;            # Basic syntax
$scalar_name = $output_parameter_name;  # Extended syntax
- Accumulator retrieval:
say "Value: $scalar_name";
Pile¶
- Basic syntax:
?accu_name=pile_name&accu_address=[]
- Extended syntax:
?accu_name=pile_name&accu_address=[]&accu_input_variable=pile_component
- Retrieval:
my $pile_ref = $self->param('pile_name');
foreach my $pile_element (@{$pile_ref}) {
    # do something with $pile_element
}
A pile is an unordered list. All the pile_name (or pile_component in the second form) values that are dataflown into the Accumulator are aggregated, in a random order, into a list named pile_name.
In Perl, this is similar to doing this:
- Accumulator initialisation:
my @pile_name;
- Accumulator extension:
push @pile_name, $pile_name;       # Basic syntax
push @pile_name, $pile_component;  # Extended syntax
- Accumulator retrieval:
foreach my $v (@pile_name) {
    say "Value: $v";
}
Multiset¶
- Basic syntax:
?accu_name=multiset_name&accu_address={}
- Extended syntax:
?accu_name=multiset_name&accu_address={}&accu_input_variable=multiset_component
- Retrieval:
my $multiset_ref = $self->param('multiset_name');
foreach my $multiset_key (keys(%{$multiset_ref})) {
    my $count = $multiset_ref->{$multiset_key};
}
A multiset is a set that allows multiple instances of the same element (see Wikipedia). It is implemented in eHive as a hash that maps each element to its multiplicity (a positive integer). The above URLs define a multiset named multiset_name, filling it with either the multiset_name or the multiset_component parameter.
In Perl, this is equivalent to doing this:
- Accumulator initialisation:
my %multiset_name;
- Accumulator extension:
$multiset_name{$multiset_name} += 1;       # Basic syntax
$multiset_name{$multiset_component} += 1;  # Extended syntax
- Accumulator retrieval:
foreach my $key (keys %multiset_name) {
    say "Value $key is present ".$multiset_name{$key}." times";
}
Array¶
- Basic syntax:
?accu_name=array_name&accu_address=[index_name]
- Extended syntax:
?accu_name=array_name&accu_address=[index_name]&accu_input_variable=array_item
- Retrieval:
my $array_arrayref = $self->param('array_name');
foreach my $array_element (@{$array_arrayref}) {
    # do something with $array_element
}
Here the emitting Job must flow both the value of the array item (via either the array_name or the array_item parameter) and its index index_name. eHive puts together the items at the requested positions, filling the gaps with undef, in an array named array_name.
In Perl, this is equivalent to doing this:
- Accumulator initialisation:
my @array_name;
- Accumulator extension:
$array_name[$index_name] = $array_name;  # Basic syntax
$array_name[$index_name] = $array_item;  # Extended syntax
- Accumulator retrieval:
foreach my $v (@array_name) {
    say "Value: $v";
}
Hash¶
- Basic syntax:
?accu_name=hash_name&accu_address={key_name}
- Extended syntax:
?accu_name=hash_name&accu_address={key_name}&accu_input_variable=hash_item
- Retrieval:
my $hash_hashref = $self->param('hash_name');
foreach my $key (keys(%{$hash_hashref})) {
    my $value = $hash_hashref->{$key};
}
Here the emitting Job must flow both the value of the hash item (via either the hash_name or the hash_item parameter) and the key name key_name. eHive puts together the items in a hash named hash_name.
In Perl, this is equivalent to doing this:
- Accumulator initialisation:
my %hash_name;
- Accumulator extension:
$hash_name{$key_name} = $hash_name;  # Basic syntax
$hash_name{$key_name} = $hash_item;  # Extended syntax
- Accumulator retrieval:
foreach my $key (keys %hash_name) {
    say "Value $key is mapped to ".$hash_name{$key};
}
Advanced data structures¶
The accu_address key can define more complex data structures by chaining the simple address types shown above. For instance, the following Accumulator definition will create a multi-level hash that stores the list of all the genes found on each (species, chromosome, strand) triplet:
?accu_name=gene_lists&accu_address={species}{chromosome}{strand}[]&accu_input_variable=gene_name
Traversing the resulting hash can be done this way in Perl:
my $gene_lists = $self->param('gene_lists');
foreach my $species (keys %{$gene_lists}) {
    say "$species has ".scalar(keys %{$gene_lists->{$species}})." chromosomes";
    foreach my $chromosome (keys %{$gene_lists->{$species}}) {
        my $pos_strand_genes = $gene_lists->{$species}->{$chromosome}->{1};
        my $neg_strand_genes = $gene_lists->{$species}->{$chromosome}->{-1};
        say "Chrom. $chromosome of $species has "
           .scalar(@$pos_strand_genes)." genes on the positive strand and "
           .scalar(@$neg_strand_genes)." genes on the negative strand";
    }
}
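For such an Accumulator to be filled, each fan Job must make both the address variables (species, chromosome and strand) and the input variable (gene_name) available as parameters of the dataflow event. One possible way is an explicit dataflow call; the values below are purely illustrative:

```perl
# Sketch: inside a fan runnable's write_output() method,
# one event per gene, on the branch wired to the Accumulator
$self->dataflow_output_id( {
    'species'    => 'homo_sapiens',
    'chromosome' => '12',
    'strand'     => 1,
    'gene_name'  => 'KRAS',
}, 1 );
```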
K-mer pipeline¶
There are further examples in the Kmer example pipelines. These three pipelines all perform the same workflow (computing the distribution of k-mers in a given set of input sequences), but accomplish the task in different ways, using various Accumulator patterns.
The first Analyses of the pipeline break up the input sequences into chunks that can be efficiently processed in parallel. The processing and the dataflow of each chunk are done exactly the same way in all flavours, but because of the different Accumulator syntaxes, the funnel (the “compile_count” Analysis, which does the final summation) has to use the resulting data structure in different ways.
The “count_kmers” Analysis dataflows on two branches:
- On branch #3, a hash that contains the name of the file (sequence_file key) and the counts per k-mer (as a hash under the counts key).
- On branch #4, a series of hashes, each containing the name of the file (sequence_file key), a k-mer (kmer key) and its count in that file (count key).
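The dataflow calls in the “count_kmers” runnable would look roughly like this (a sketch only; the variable names are illustrative, not taken from the actual runnable):

```perl
# Sketch of the two dataflow events (branch numbers as described above)
$self->dataflow_output_id( { 'sequence_file' => $sequence_file,
                             'counts'        => \%counts }, 3 );

foreach my $kmer (keys %counts) {
    $self->dataflow_output_id( { 'sequence_file' => $sequence_file,
                                 'kmer'          => $kmer,
                                 'count'         => $counts{$kmer} }, 4 );
}
```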
KmerPipelineAoH_conf – Array of Hashes:
In this mode, the Accumulator is connected to branch #3 and aggregates all the counts fields in a pile. The information about the initial file name is not tracked in the Accumulator. The Accumulator syntax is given in the corresponding PipeConfig file.
KmerPipelineHoH_conf – Hash of Hashes:
In this mode, the Accumulator is connected to branch #3 and aggregates all the counts fields in a hash indexed by the name of the chunk (sequence_file). The Accumulator syntax is given in the corresponding PipeConfig file.
KmerPipelineHoA_conf – Hash of Arrays:
In this mode, the Accumulator is connected to branch #4 and aggregates all the counts in one array per k-mer. The signature {kmer}[] indicates that the final structure is a hash indexed by each kmer, whose values are piles of the Accumulator’s input variable, i.e. count. The Accumulator syntax is given in the corresponding PipeConfig file.