Java – Build custom join logic in Cascading to ensure only a MAP-SIDE join

I have three Cascading pipes (one joined against the other two), as described below:

  • LHSPipe – the larger dataset.

  • RHSPipes – smaller datasets that may fit in memory.

The pseudocode is as follows (in this case there are two such joins, one per lookup):

IF F1DecidingFactor = YES THEN
    Join LHSPipe with RHS Lookup#1 ON (LHSPipe.F1Input = RHS Lookup#1.Join#F1)
    SET LHSPipe.F1Output = Result#F1
ELSE
    SET LHSPipe.F1Output = N/A

The same logic applies to F2 calculations.

The expected output is LHSPipe with F1Output and F2Output populated according to the rules above.

This situation forces me to use a custom join operation, because an if-else condition decides whether a row should be joined at all.

With the above in mind, I want a MAP-SIDE join (keeping RHSPipe in memory on the map task nodes), and I’m considering the following possible solutions, each with its pros and cons. Your advice on them is needed.

Option #1:

CoGroup – We can use CoGroup with a BufferJoiner to build custom join logic, followed by the custom actions, but this does not guarantee a MAP-SIDE join.

Option #2:

HashJoin – It guarantees a MAP-SIDE join, but as far as I can tell there is no way to plug custom join logic into it.

Please correct my understanding where it is wrong, and suggest how to address this requirement.

Thanks in advance.

Solution

The best way I can think of to solve this is to modify the smaller dataset: add a new field (F1DecidingFactor) to it and pre-compute F1Result so that it looks like this:

Pseudocode

if F1DecidingFactor == "Yes" then
    F1Result = ACTUAL_VALUE
else
    F1Result = "N/A"

Results table:

| F1#DecidingFactor | F1#Join | F1#Result |
|-------------------|---------|-----------|
| Yes               | 0       | True      |
| Yes               | 1       | False     |
| No                | 0       | N/A       |
| No                | 1       | N/A       |

You can also do this within Cascading itself.
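For illustration only, here is one way that pre-processing step could be sketched in Cascading, assuming the reading of the table above is that every lookup row is emitted twice, once per deciding-factor value, with the result forced to "N/A" for the "No" copy. The ExpandLookupFunction class and the F1DecidingFactor_RHS field name (kept distinct so it does not clash with the LHS field when joining) are illustrative, not part of the original answer:

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

// Hypothetical helper: expands each (F1Join, F1Result) lookup row into two rows,
// one per deciding-factor value, pre-computing "N/A" for the "No" case.
public class ExpandLookupFunction extends BaseOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;

    public ExpandLookupFunction() {
        // declares the new deciding-factor column plus the (possibly overridden) result
        super(new Fields("F1DecidingFactor_RHS", "F1Result"));
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) {
        Object actualResult = call.getArguments().getObject("F1Result");

        // row that matches LHS rows whose deciding factor is "Yes"
        call.getOutputCollector().add(new Tuple("Yes", actualResult));
        // row that matches LHS rows whose deciding factor is "No"
        call.getOutputCollector().add(new Tuple("No", "N/A"));
    }
}

Applying it with new Each(rhsOne, new Fields("F1Result"), new ExpandLookupFunction(), Fields.SWAP) leaves the lookup pipe with the fields F1Join, F1DecidingFactor_RHS and F1Result, matching the table above.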

After that, you can do the map-side join (HashJoin).
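As a minimal sketch (assuming the modified lookup pipe described above, with the deciding-factor column named F1DecidingFactor_RHS; the class and pipe names here are illustrative), the map-side join itself would look roughly like this:

import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class MapSideLookupSketch {

    public MapSideLookupSketch() {
        // LHS: the large dataset with F1DecidingFactor and F1Input, streamed through the mappers.
        Pipe largePipe = new Pipe("large-pipe");

        // RHS: the modified lookup dataset with F1DecidingFactor_RHS, F1Join and F1Result,
        // small enough to be held in memory on each map task.
        Pipe lookupPipe = new Pipe("modified-small-pipe-1");

        // Join on the composite key (deciding factor + lookup key). HashJoin keeps the
        // RHS in memory, so the join runs map-side with no reduce phase.
        Fields lhsKeys = new Fields("F1DecidingFactor", "F1Input");
        Fields rhsKeys = new Fields("F1DecidingFactor_RHS", "F1Join");

        Pipe joined = new HashJoin(largePipe, lhsKeys, lookupPipe, rhsKeys, new LeftJoin());
    }
}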

If modifying the smaller dataset is not possible, then I see two options to solve the problem.

Option 1

Add a new field to your smaller pipe that mirrors your deciding factor (i.e. F1DecidingFactor_RHS = "Yes"), and include it in the join fields. After the join, only the rows that satisfy this criterion will carry lookup values; the rest will be empty/blank (the function shown later turns those into the N/A default). Sample code:

Main class

import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.assembly.Rename;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTestOption2 {

    public StackHashJoinTestOption2() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);

        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large pipe fields:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small pipe 1 fields:
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Add the new constant field to small pipe 1. Expected fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL);

        // Small pipe 2 fields:
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Add the new constant field to small pipe 2. Expected fields:
        // F2Join F2Result F2DecidingFactor_RHS
        rhsTwo = new Each(rhsTwo, new Insert(f2DecidingFactorRhs, "Yes"), Fields.ALL);

        // Join the first small pipe. Expected fields after the join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin());

        // Join the second small pipe. Expected fields after the join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        // F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin());

        // Rename the joined lookup results to the output field names expected by functionFields.
        Pipe result = new Rename(resultsTwo, new Fields("F1Result", "F2Result"),
                new Fields("F1Output", "F2Output"));

        result = new Each(result, functionFields, new TestFunction(), Fields.REPLACE);

        result = new Discard(result, f1DecidingFactorRhs);
        result = new Discard(result, f2DecidingFactorRhs);

        // The result pipe should now carry the expected output.
    }
}

Option 2

If you want a default value instead of null/blank, then I recommend doing a plain HashJoin first and then using a function to update the tuples with the appropriate value. Something like this:

Main class

import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Rename;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTest {

    public StackHashJoinTest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large pipe fields:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small pipe 1 fields:
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small pipe 2 fields:
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Join the first small pipe. Expected fields after the join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin());

        // Join the second small pipe. Expected fields after the join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin());

        // Rename the joined lookup results to the output field names expected by functionFields.
        Pipe result = new Rename(resultsTwo, new Fields("F1Result", "F2Result"),
                new Fields("F1Output", "F2Output"));

        result = new Each(result, functionFields, new TestFunction(), Fields.REPLACE);

        // The result pipe should now carry the expected output.
    }
}

The update function (TestFunction)

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class TestFunction extends BaseOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;

    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"
    public TestFunction() {
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) {
        TupleEntry arguments = call.getArguments();

        // Work on a modifiable copy of the incoming arguments.
        TupleEntry result = new TupleEntry(arguments);

        if (result.getString("F1DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F1Output", DEFAULT_VALUE);
        }

        if (result.getString("F2DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F2Output", DEFAULT_VALUE);
        }

        call.getOutputCollector().add(result);
    }
}
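
To actually run either assembly, the head and tail pipes still have to be bound to source and sink taps and handed to a flow connector. Below is a minimal, illustrative sketch only, assuming tab-delimited text files at placeholder paths and the Hadoop planner (HadoopFlowConnector); the class name, method, paths and schemes are assumptions to adjust to your environment:

import java.util.Properties;

import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class FlowWiringSketch {

    // largePipe, rhsOne and rhsTwo are the head pipes and "result" the tail pipe
    // of one of the assemblies above; the paths below are placeholders.
    @SuppressWarnings("rawtypes")
    public static void run(Pipe largePipe, Pipe rhsOne, Pipe rhsTwo, Pipe result) {
        Tap largeTap = new Hfs(new TextDelimited(
                new Fields("F1DecidingFactor", "F1Input", "F2DecidingFactor", "F2Input"), "\t"),
                "/data/large");
        Tap rhsOneTap = new Hfs(new TextDelimited(new Fields("F1Join", "F1Result"), "\t"),
                "/data/small-1");
        Tap rhsTwoTap = new Hfs(new TextDelimited(new Fields("F2Join", "F2Result"), "\t"),
                "/data/small-2");
        Tap sinkTap = new Hfs(new TextDelimited(true, "\t"),
                "/data/output", SinkMode.REPLACE);

        FlowDef flowDef = FlowDef.flowDef()
                .setName("map-side-lookup-flow")
                .addSource(largePipe, largeTap)
                .addSource(rhsOne, rhsOneTap)
                .addSource(rhsTwo, rhsTwoTap)
                .addTailSink(result, sinkTap);

        // Plan and run the flow; HashJoin keeps the small taps in memory on the map side.
        new HadoopFlowConnector(new Properties()).connect(flowDef).complete();
    }
}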

This should solve your problem. Please let me know if this helps.
