|
|
|
|
@@ -343,24 +343,25 @@ func (obj *MapFunc) Stream(ctx context.Context) error {
|
|
|
|
|
|
|
|
|
|
func (obj *MapFunc) replaceSubGraph(subgraphInput interfaces.Func) error {
|
|
|
|
|
// Create a subgraph which splits the input list into 'n' nodes, applies
|
|
|
|
|
// 'newFuncValue' to each, then combines the 'n' outputs back into a list.
|
|
|
|
|
// 'newFuncValue' to each, then combines the 'n' outputs back into a
|
|
|
|
|
// list.
|
|
|
|
|
//
|
|
|
|
|
// Here is what the subgraph looks like:
|
|
|
|
|
//
|
|
|
|
|
// digraph {
|
|
|
|
|
// "subgraphInput" -> "inputElemFunc0"
|
|
|
|
|
// "subgraphInput" -> "inputElemFunc1"
|
|
|
|
|
// "subgraphInput" -> "inputElemFunc2"
|
|
|
|
|
// "subgraphInput" -> "inputElemFunc0"
|
|
|
|
|
// "subgraphInput" -> "inputElemFunc1"
|
|
|
|
|
// "subgraphInput" -> "inputElemFunc2"
|
|
|
|
|
//
|
|
|
|
|
// "inputElemFunc0" -> "outputElemFunc0"
|
|
|
|
|
// "inputElemFunc1" -> "outputElemFunc1"
|
|
|
|
|
// "inputElemFunc2" -> "outputElemFunc2"
|
|
|
|
|
// "inputElemFunc0" -> "outputElemFunc0"
|
|
|
|
|
// "inputElemFunc1" -> "outputElemFunc1"
|
|
|
|
|
// "inputElemFunc2" -> "outputElemFunc2"
|
|
|
|
|
//
|
|
|
|
|
// "outputElemFunc0" -> "outputListFunc"
|
|
|
|
|
// "outputElemFunc1" -> "outputListFunc"
|
|
|
|
|
// "outputElemFunc1" -> "outputListFunc"
|
|
|
|
|
// "outputElem0" -> "outputListFunc"
|
|
|
|
|
// "outputElem1" -> "outputListFunc"
|
|
|
|
|
// "outputElem2" -> "outputListFunc"
|
|
|
|
|
//
|
|
|
|
|
// "outputListFunc" -> "subgraphOutput"
|
|
|
|
|
// "outputListFunc" -> "subgraphOutput"
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
const channelBasedSinkFuncArgNameEdgeName = structs.ChannelBasedSinkFuncArgName // XXX: not sure if the specific name matters.
|
|
|
|
|
|