Example
Definition of DAG
The DagStream class converts your functions into DAG nodes.
import dagstream

def funcA():
    print("funcA")
    return 10

def funcB(val_from_A: int):
    print(f"funcB, received {val_from_A} from A")

def funcC(val_from_A: int):
    print(f"funcC, received {val_from_A} from A")
    return 20

def funcD(val_from_C: int):
    print(f"funcD, received {val_from_C} from C")

def funcE():
    print("funcE")

def funcF():
    print("funcF")
stream = dagstream.DagStream()
# convert to functional nodes
A, B, C, D, E, F = stream.emplace(funcA, funcB, funcC, funcD, funcE, funcF)
# define relationships between functional nodes
# A executes before B and C
# output of A passed to B and C
A.precede(B, C, pipe=True)
# E executes after B, C and D
E.succeed(B, C, D)
# D executes after C
# D receives output of C
D.succeed(C, pipe=True)
F.succeed(E)
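The relationships above amount to a set of directed edges: X.precede(Y) and Y.succeed(X) describe the same edge from X to Y. The following is a minimal sketch of that idea, assuming a hypothetical Node class and using the standard library's graphlib to derive an execution order. It is not dagstream's implementation, and the pipe flag is ignored here.

```python
from graphlib import TopologicalSorter


class Node:
    """Hypothetical stand-in for a functional node (not dagstream's class)."""

    def __init__(self, name):
        self.name = name
        self.predecessors = set()

    def precede(self, *others):
        # self runs before each node in others
        for other in others:
            other.predecessors.add(self.name)

    def succeed(self, *others):
        # self runs after each node in others
        for other in others:
            self.predecessors.add(other.name)


A, B, C, D, E, F = (Node(name) for name in "ABCDEF")
A.precede(B, C)
E.succeed(B, C, D)
D.succeed(C)
F.succeed(E)

# Map each node to the nodes that must finish before it starts.
dependencies = {n.name: n.predecessors for n in (A, B, C, D, E, F)}
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # one valid execution order; A comes first and F last
```

Several orders are valid for this graph (B may run before or after C and D), so only the constraints between connected nodes are guaranteed.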
Execute DAG
After defining the relationships between the functional nodes, execute them all.
from dagstream.executor import StreamExecutor
# construct functional dag
functional_dag = stream.construct()
executor = StreamExecutor(functional_dag)
executor.run()
The console shows the following output.
funcA
funcB, received 10 from A
funcC, received 10 from A
funcD, received 20 from C
funcE
funcF
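This output reflects two things: nodes run in dependency order, and an edge declared with pipe=True forwards the upstream return value as an argument. A minimal sequential sketch of that behavior follows, assuming a hand-written dependency table and a hypothetical run_all helper; dagstream's own executor may work differently.

```python
from graphlib import TopologicalSorter

received = {}  # records the argument each piped function was given


def func_a():
    return 10


def func_b(val_from_a):
    received["B"] = val_from_a


def func_c(val_from_a):
    received["C"] = val_from_a
    return 20


def func_d(val_from_c):
    received["D"] = val_from_c


def run_all(funcs, dependencies, piped):
    """Run funcs in dependency order, forwarding results along piped edges.

    funcs: name -> callable
    dependencies: name -> set of names that must run first
    piped: name -> upstream name whose return value becomes the argument
    """
    results = {}
    for name in TopologicalSorter(dependencies).static_order():
        if name in piped:
            results[name] = funcs[name](results[piped[name]])
        else:
            results[name] = funcs[name]()
    return results


funcs = {"A": func_a, "B": func_b, "C": func_c, "D": func_d}
dependencies = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"C"}}
piped = {"B": "A", "C": "A", "D": "C"}

results = run_all(funcs, dependencies, piped)
print(sorted(received.items()))  # [('B', 10), ('C', 10), ('D', 20)]
```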
To execute functions in parallel, use StreamParallelExecutor.
from dagstream.executor import StreamParallelExecutor
# construct functional dag
functional_dag = stream.construct()
# Run in parallel by using 4 processes
executor = StreamParallelExecutor(functional_dag, n_processes=4)
executor.run()
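Parallel execution is possible because nodes with no unfinished predecessors are independent of each other, for example B and C once A has finished. The sketch below illustrates that scheduling idea with the standard library's graphlib and a thread pool. It uses threads rather than processes for simplicity and is not how StreamParallelExecutor is implemented; the run_parallel helper and the task table are hypothetical.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_parallel(tasks, dependencies, max_workers=4):
    """Run each task as soon as all of its predecessors have finished."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    finished = []  # completion order, recorded in the main thread
    running = {}   # future -> node name
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Submit every node whose predecessors are all done.
            for name in sorter.get_ready():
                running[pool.submit(tasks[name])] = name
            # Block until at least one running node completes.
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                future.result()  # re-raise any exception from the task
                name = running.pop(future)
                finished.append(name)
                sorter.done(name)  # unlocks the node's successors
    return finished


dependencies = {
    "A": set(), "B": {"A"}, "C": {"A"},
    "D": {"C"}, "E": {"B", "C", "D"}, "F": {"E"},
}
tasks = {name: (lambda: None) for name in dependencies}
finished = run_parallel(tasks, dependencies)
print(finished)  # completion order; varies between runs for B, C and D
```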
Draw mermaid object
With MermaidDrawer, you can write the DAG structure to a text file in Mermaid syntax.
from dagstream.viewers import MermaidDrawer
# create the drawer (assumed here to take no constructor arguments)
drawer = MermaidDrawer()
drawer.output(functional_dag, "workspace/sample.mmd")
The content of the text file is shown below.
stateDiagram
    direction LR
    state "funcA" as state_0
    state "funcB" as state_1
    state "funcC" as state_2
    state "funcD" as state_3
    state "funcE" as state_4
    state "funcF" as state_5
    [*] --> state_0
    state_0 --> state_1: Pipe
    state_0 --> state_2: Pipe
    state_1 --> state_4
    state_2 --> state_4
    state_2 --> state_3: Pipe
    state_3 --> state_4
    state_4 --> state_5
    state_5 --> [*]
Rendered with Mermaid, this file produces a left-to-right state diagram of the six functions, in which the edges from funcA to funcB and funcC and from funcC to funcD are labeled Pipe.
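Because the file is plain text, the mapping from a DAG to Mermaid lines is easy to follow. As a rough illustration, the hypothetical to_mermaid function below builds similar stateDiagram text from a list of edges. It is not MermaidDrawer's implementation, and the state numbering and line order it produces are assumptions.

```python
def to_mermaid(names, edges):
    """Build Mermaid stateDiagram text.

    names: node names in display order
    edges: (source, target, piped) tuples
    """
    ids = {name: f"state_{i}" for i, name in enumerate(names)}
    sources = {src for src, _, _ in edges}
    targets = {dst for _, dst, _ in edges}

    lines = ["stateDiagram", "    direction LR"]
    lines += [f'    state "{name}" as {ids[name]}' for name in names]
    # Nodes without incoming edges are entry points.
    lines += [f"    [*] --> {ids[n]}" for n in names if n not in targets]
    for src, dst, piped in edges:
        label = ": Pipe" if piped else ""
        lines.append(f"    {ids[src]} --> {ids[dst]}{label}")
    # Nodes without outgoing edges are exit points.
    lines += [f"    {ids[n]} --> [*]" for n in names if n not in sources]
    return "\n".join(lines)


names = ["funcA", "funcB", "funcC", "funcD", "funcE", "funcF"]
edges = [
    ("funcA", "funcB", True),
    ("funcA", "funcC", True),
    ("funcB", "funcE", False),
    ("funcC", "funcE", False),
    ("funcC", "funcD", True),
    ("funcD", "funcE", False),
    ("funcE", "funcF", False),
]
text = to_mermaid(names, edges)
print(text)
```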
Extract sub graph
After defining the relationships between functions, you can extract a sub-DAG from the whole graph.
# construct the functional DAG, extracting the minimum
# sub-DAG that is necessary for executing B and D
functional_dag = stream.construct(mandatory_nodes=[B, D])
# execute it in the same way as before
executor = StreamExecutor(functional_dag)
executor.run()
# write the Mermaid file in the same way as before
drawer.output(functional_dag, "workspace/extract.mmd")
The console shows the following output.
funcA
funcB, received 10 from A
funcC, received 10 from A
funcD, received 20 from C
The Mermaid description of the sub-DAG is shown below.
stateDiagram
    direction LR
    state "funcB" as state_0
    state "funcA" as state_1
    state "funcD" as state_2
    state "funcC" as state_3
    state_0 --> [*]
    [*] --> state_1
    state_1 --> state_0: Pipe
    state_1 --> state_3: Pipe
    state_2 --> [*]
    state_3 --> state_2: Pipe
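The minimal sub-DAG needed for a set of mandatory nodes consists of those nodes plus everything they depend on, directly or transitively. That is why requesting B and D also brings in A and C but leaves out E and F. A minimal sketch of this ancestor search follows, using a hypothetical required_nodes helper over a hand-written dependency table; it is not dagstream's code.

```python
def required_nodes(dependencies, mandatory):
    """Return the mandatory nodes plus all of their transitive predecessors."""
    needed = set()
    stack = list(mandatory)
    while stack:
        name = stack.pop()
        if name in needed:
            continue  # already visited via another path
        needed.add(name)
        stack.extend(dependencies[name])
    return needed


# Each node maps to the nodes that must run before it.
dependencies = {
    "A": set(), "B": {"A"}, "C": {"A"},
    "D": {"C"}, "E": {"B", "C", "D"}, "F": {"E"},
}
needed = required_nodes(dependencies, ["B", "D"])
print(sorted(needed))  # ['A', 'B', 'C', 'D']
```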