Example

Definition of Dag

The DagStream class converts your functions into DAG nodes.

import dagstream

def funcA():
   print("funcA")
   return 10

def funcB(val_from_A: int):
   print(f"funcB, received {val_from_A} from A")

def funcC(val_from_A: int):
   print(f"funcC, received {val_from_A} from A")
   return 20

def funcD(val_from_C: int):
   print(f"funcD, received {val_from_C} from C")

def funcE():
   print("funcE")

def funcF():
   print("funcF")


stream = dagstream.DagStream()
# convert to functional nodes
A, B, C, D, E, F = stream.emplace(funcA, funcB, funcC, funcD, funcE, funcF)

# define relationships between functional nodes

# A executes before B and C
# output of A passed to B and C
A.precede(B, C, pipe=True)

# E executes after B, C and D
E.succeed(B, C, D)

# D executes after C
# D receives output of C
D.succeed(C, pipe=True)

F.succeed(E)
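
The relationships declared above can be sanity-checked without dagstream. The following is a minimal sketch using only Python's standard-library graphlib. The `predecessors` mapping and the single-letter node names are a hand-written restatement of the precede/succeed calls, introduced here for illustration only, and are not something dagstream exposes.

```python
from graphlib import TopologicalSorter

# Hand-written restatement of the relationships declared above (hypothetical
# names): each key maps to the nodes that must finish before it starts.
predecessors = {
    "A": set(),
    "B": {"A"},
    "C": {"A"},
    "D": {"C"},
    "E": {"B", "C", "D"},
    "F": {"E"},
}

# static_order() yields one valid execution order and raises
# graphlib.CycleError if the relationships contain a cycle.
order = list(TopologicalSorter(predecessors).static_order())
print(order)
```

Several orders are valid for this graph (for example, B may come before or after C), so the exact list printed is not fixed; only the before/after constraints are.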

Execute Dag

After defining the relationships between functional nodes, execute them all.

from dagstream.executor import StreamExecutor

# construct functional dag
functional_dag = stream.construct()
executor = StreamExecutor(functional_dag)
executor.run()

The console shows the following output.

funcA
funcB, received 10 from A
funcC, received 10 from A
funcD, received 20 from C
funcE
funcF
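
To illustrate what `pipe=True` means in the run above, here is a minimal plain-Python sketch of sequential execution with piping. It is not dagstream's implementation: `run_sequentially`, the `edges` list, and the `log` list (used in place of `print`) are assumptions made for this illustration.

```python
from graphlib import TopologicalSorter

log = []  # collects the messages that the original functions print

def funcA():
    log.append("funcA")
    return 10

def funcB(val_from_A):
    log.append(f"funcB, received {val_from_A} from A")

def funcC(val_from_A):
    log.append(f"funcC, received {val_from_A} from A")
    return 20

def funcD(val_from_C):
    log.append(f"funcD, received {val_from_C} from C")

def funcE():
    log.append("funcE")

def funcF():
    log.append("funcF")

# (predecessor, successor, pipe) triples restating the relationships above.
edges = [
    (funcA, funcB, True), (funcA, funcC, True),
    (funcB, funcE, False), (funcC, funcE, False), (funcD, funcE, False),
    (funcC, funcD, True), (funcE, funcF, False),
]

def run_sequentially(edges):
    """Hypothetical helper: run every function once, in dependency order."""
    sorter = TopologicalSorter()
    piped = {}  # successor -> predecessors whose return value it receives
    for src, dst, pipe in edges:
        sorter.add(dst, src)
        if pipe:
            piped.setdefault(dst, []).append(src)
    results = {}
    for func in sorter.static_order():
        # Only piped edges forward a value; plain edges only impose ordering.
        args = [results[src] for src in piped.get(func, [])]
        results[func] = func(*args)
    return results

results = run_sequentially(edges)
print("\n".join(log))
```

The relative order of funcB and funcC is not constrained by the graph, so this sketch may interleave them differently from the output shown above.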

To execute the functions in parallel, use StreamParallelExecutor.

from dagstream.executor import StreamParallelExecutor

# construct functional dag
functional_dag = stream.construct()
# Run in parallel by using 4 processes
executor = StreamParallelExecutor(functional_dag, n_processes=4)
executor.run()
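
The idea behind parallel execution is that nodes whose predecessors have all finished (such as B and C once A is done) can run at the same time. Below is a rough sketch of that scheduling idea using the standard library only. It uses threads rather than processes to stay self-contained, and `run_parallel` and `predecessors` are hypothetical names; it makes no claim about how StreamParallelExecutor works internally.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter

# Hand-written restatement of the example graph (hypothetical names).
predecessors = {
    "A": set(), "B": {"A"}, "C": {"A"},
    "D": {"C"}, "E": {"B", "C", "D"}, "F": {"E"},
}

def run_parallel(predecessors, task, max_workers=4):
    """Hypothetical helper: submit each node as soon as its predecessors are done."""
    sorter = TopologicalSorter(predecessors)
    sorter.prepare()  # raises graphlib.CycleError on cyclic input
    finished = []
    running = {}  # future -> node name
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Every node returned here has no unfinished predecessor.
            for node in sorter.get_ready():
                running[pool.submit(task, node)] = node
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                future.result()  # re-raise any exception from the task
                node = running.pop(future)
                finished.append(node)
                sorter.done(node)  # may unlock successors
    return finished

finished = run_parallel(predecessors, lambda node: node)
print(finished)
```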

Draw mermaid object

With MermaidDrawer, you can write the DAG structure to a text file in Mermaid syntax.

from dagstream.viewers import MermaidDrawer

# create the drawer before using it (assumes a no-argument constructor)
drawer = MermaidDrawer()
drawer.output(functional_dag, "workspace/sample.mmd")

The content of the text file is shown below.

stateDiagram
    direction LR
    state "funcA" as state_0
    state "funcB" as state_1
    state "funcC" as state_2
    state "funcD" as state_3
    state "funcE" as state_4
    state "funcF" as state_5
    [*] --> state_0
    state_0 --> state_1: Pipe
    state_0 --> state_2: Pipe
    state_1 --> state_4
    state_2 --> state_4
    state_2 --> state_3: Pipe
    state_3 --> state_4
    state_4 --> state_5
    state_5 --> [*]
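
To make the file format concrete, the following sketch builds the same kind of stateDiagram text from a list of node names and edges. It is only an illustration of the format: `to_mermaid`, `names`, and `edges` are hypothetical, and MermaidDrawer may order or generate its lines differently.

```python
names = ["funcA", "funcB", "funcC", "funcD", "funcE", "funcF"]

# (source index, target index, pipe) for each relationship in the example.
edges = [
    (0, 1, True), (0, 2, True),
    (1, 4, False), (2, 4, False), (2, 3, True),
    (3, 4, False), (4, 5, False),
]

def to_mermaid(names, edges):
    """Hypothetical helper: render nodes and edges as a Mermaid stateDiagram."""
    lines = ["stateDiagram", "    direction LR"]
    lines += [f'    state "{name}" as state_{i}' for i, name in enumerate(names)]
    has_pred = {dst for _, dst, _ in edges}
    has_succ = {src for src, _, _ in edges}
    # Nodes without predecessors are entered from the start marker [*].
    lines += [f"    [*] --> state_{i}" for i in range(len(names)) if i not in has_pred]
    for src, dst, pipe in edges:
        label = ": Pipe" if pipe else ""  # piped edges carry a label
        lines.append(f"    state_{src} --> state_{dst}{label}")
    # Nodes without successors lead to the end marker [*].
    lines += [f"    state_{i} --> [*]" for i in range(len(names)) if i not in has_succ]
    return "\n".join(lines)

text = to_mermaid(names, edges)
print(text)
```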

Rendering workspace/sample.mmd with Mermaid produces a left-to-right state diagram with the same nodes and edges.


Extract sub graph

You can extract a sub-DAG from the whole DAG after defining the relationships between functions.

# construct functional dag
# Extract the minimal sub-DAG that is necessary for executing B and D
functional_dag = stream.construct(mandatory_nodes=[B, D])

# execute in the same way as before
executor = StreamExecutor(functional_dag)
executor.run()

# write the Mermaid file in the same way as before
drawer.output(functional_dag, "workspace/extract.mmd")

The console shows the following output.

funcA
funcB, received 10 from A
funcC, received 10 from A
funcD, received 20 from C

The Mermaid description of the sub-DAG is shown below.

stateDiagram
    direction LR
    state "funcB" as state_0
    state "funcA" as state_1
    state "funcD" as state_2
    state "funcC" as state_3
    state_0 --> [*]
    [*] --> state_1
    state_1 --> state_0: Pipe
    state_1 --> state_3: Pipe
    state_2 --> [*]
    state_3 --> state_2: Pipe
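
The extraction above keeps B and D plus everything they transitively depend on, which is why E and F drop out. As a rough illustration of that idea (not dagstream's actual algorithm), the sketch below collects the ancestors of the mandatory nodes; `extract_required` and `predecessors` are hypothetical names.

```python
# Hand-written restatement of the example graph (hypothetical names):
# each key maps to the nodes that must finish before it starts.
predecessors = {
    "A": set(), "B": {"A"}, "C": {"A"},
    "D": {"C"}, "E": {"B", "C", "D"}, "F": {"E"},
}

def extract_required(predecessors, mandatory):
    """Hypothetical helper: keep the mandatory nodes and all their ancestors."""
    required = set()
    stack = list(mandatory)
    while stack:
        node = stack.pop()
        if node in required:
            continue
        required.add(node)
        # Walk backwards along the dependency edges.
        stack.extend(predecessors[node])
    return {node: predecessors[node] for node in predecessors if node in required}

sub = extract_required(predecessors, ["B", "D"])
print(sorted(sub))  # ['A', 'B', 'C', 'D']
```

Because every predecessor of a kept node is itself kept, the returned mapping is a complete graph on its own and can be scheduled without reference to the dropped nodes.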