Example

Definition of Dag

The DagStream class converts your functions into dag nodes.

import dagstream

def funcA():
   print("funcA")
   return 10

def funcB(val_from_A: int):
   print(f"funcB, received {val_from_A} from A")

def funcC(val_from_A: int):
   print(f"funcC, received {val_from_A} from A")
   return 20

def funcD(val_from_C: int):
   print(f"funcD, received {val_from_C} from C")

def funcE():
   print("funcE")

def funcF():
   print("funcF")


stream = dagstream.DagStream()
# convert to functional nodes
A, B, C, D, E, F = stream.emplace(funcA, funcB, funcC, funcD, funcE, funcF)

# define relationships between functional nodes

# A executes before B and C
# output of A passed to B and C
A.precede(B, C, pipe=True)

# E executes after B, C and D
E.succeed(B, C, D)

# D executes after C
# D receives output of C
D.succeed(C, pipe=True)

F.succeed(E)
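
The relationships above amount to a map from each node to the nodes that must run before it. As a minimal sketch (using only the standard library's graphlib, not dagstream's own internals), the same dependencies can be written as a plain dictionary and checked for a valid execution order. The single-letter names and the `predecessors` dictionary are illustrative assumptions.

```python
from graphlib import TopologicalSorter

# Hypothetical plain-dict view of the example: node -> nodes that must run first.
predecessors = {
    "A": set(),
    "B": {"A"},            # A.precede(B, C, pipe=True)
    "C": {"A"},
    "D": {"C"},            # D.succeed(C, pipe=True)
    "E": {"B", "C", "D"},  # E.succeed(B, C, D)
    "F": {"E"},            # F.succeed(E)
}

# static_order() yields one valid order and raises graphlib.CycleError
# if the relationships accidentally form a cycle.
order = list(TopologicalSorter(predecessors).static_order())
print(order)
```

Any order in which every node appears after all of its predecessors is valid, so B and C may come out in either order.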

Execute Dag

After defining the relationships between functional nodes, execute them all.

from dagstream.executor import StreamExecutor

# construct functional dag
functional_dag = stream.construct()
executor = StreamExecutor(functional_dag)
executor.run()

The console shows the following output.

funcA
funcB, received 10 from A
funcC, received 10 from A
funcD, received 20 from C
funcE
funcF
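
The output above reflects two things: nodes run in dependency order, and a piped node receives the return value of its predecessor. The sketch below illustrates that idea with the standard library only. It is not how StreamExecutor is implemented; `run_in_order` and the `pipes` dictionary are hypothetical names introduced for illustration.

```python
from graphlib import TopologicalSorter

def run_in_order(funcs, predecessors, pipes):
    """Call each function once, in dependency order.

    funcs:        name -> callable
    predecessors: name -> set of names that must run first
    pipes:        name -> predecessor whose return value is passed as argument
    """
    results = {}
    for name in TopologicalSorter(predecessors).static_order():
        if name in pipes:
            results[name] = funcs[name](results[pipes[name]])
        else:
            results[name] = funcs[name]()
    return results

log = []  # collected instead of printed, so the result is easy to inspect

def func_a():
    log.append("funcA")
    return 10

def func_c(val_from_a):
    log.append(f"funcC, received {val_from_a} from A")
    return 20

def func_d(val_from_c):
    log.append(f"funcD, received {val_from_c} from C")

# The A -> C -> D chain of the example, with both edges piped.
results = run_in_order(
    funcs={"A": func_a, "C": func_c, "D": func_d},
    predecessors={"A": set(), "C": {"A"}, "D": {"C"}},
    pipes={"C": "A", "D": "C"},
)
print("\n".join(log))
```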

To execute the functions in parallel, use StreamParallelExecutor.

from dagstream.executor import StreamParallelExecutor

# construct functional dag
functional_dag = stream.construct()
# Run in parallel by using 4 processes
executor = StreamParallelExecutor(functional_dag, n_processes=4)
executor.run()
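
Only nodes whose predecessors have all finished can run at the same time. The sketch below groups the example dag into such batches with the standard library's graphlib. It shows which nodes are independent at each step; it does not describe how StreamParallelExecutor actually schedules its processes, and the `predecessors` dictionary is an illustrative assumption.

```python
from graphlib import TopologicalSorter

# Hypothetical plain-dict view of the example: node -> nodes that must run first.
predecessors = {
    "A": set(),
    "B": {"A"},
    "C": {"A"},
    "D": {"C"},
    "E": {"B", "C", "D"},
    "F": {"E"},
}

sorter = TopologicalSorter(predecessors)
sorter.prepare()

batches = []
while sorter.is_active():
    # Every node returned here has no unfinished predecessor.
    ready = sorted(sorter.get_ready())
    batches.append(ready)
    sorter.done(*ready)

print(batches)  # [['A'], ['B', 'C'], ['D'], ['E'], ['F']]
```

In this example only B and C can overlap, so extra processes help mainly when the dag is wide rather than deep.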

Draw mermaid object

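With MermaidDrawer, you can write the dag structure to a text file in Mermaid format.

from dagstream.viewers import MermaidDrawer

# create the drawer before calling output (assumes a no-argument constructor)
drawer = MermaidDrawer()
drawer.output(functional_dag, "workspace/sample.mmd")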

The content of the text file is shown below.

stateDiagram
    direction LR
    state "funcA" as state_0
    state "funcB" as state_1
    state "funcC" as state_2
    state "funcD" as state_3
    state "funcE" as state_4
    state "funcF" as state_5
    [*] --> state_0
    state_0 --> state_1: Pipe
    state_0 --> state_2: Pipe
    state_1 --> state_4
    state_2 --> state_4
    state_2 --> state_3: Pipe
    state_3 --> state_4
    state_4 --> state_5
    state_5 --> [*]

Rendering this file with Mermaid yields a left-to-right state diagram: funcA pipes into funcB and funcC, funcC pipes into funcD, funcB, funcC and funcD all precede funcE, and funcE precedes funcF.
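
To make the file format concrete, the sketch below builds the same kind of Mermaid state diagram text from a list of node names and edges. It is an illustration of the format only, not MermaidDrawer's implementation; `to_mermaid` and the edge tuples are hypothetical.

```python
def to_mermaid(names, edges):
    """Build Mermaid stateDiagram text.

    names: node labels, indexed by position
    edges: (source_index, target_index, is_pipe) tuples
    """
    lines = ["stateDiagram", "    direction LR"]
    for i, name in enumerate(names):
        lines.append(f'    state "{name}" as state_{i}')

    sources = {s for s, _, _ in edges}
    targets = {t for _, t, _ in edges}

    # Nodes without incoming edges are entry points.
    for i in range(len(names)):
        if i not in targets:
            lines.append(f"    [*] --> state_{i}")
    for s, t, is_pipe in edges:
        label = ": Pipe" if is_pipe else ""
        lines.append(f"    state_{s} --> state_{t}{label}")
    # Nodes without outgoing edges are exit points.
    for i in range(len(names)):
        if i not in sources:
            lines.append(f"    state_{i} --> [*]")
    return "\n".join(lines)

names = ["funcA", "funcB", "funcC", "funcD", "funcE", "funcF"]
edges = [
    (0, 1, True), (0, 2, True),   # A pipes into B and C
    (1, 4, False), (2, 4, False),
    (2, 3, True),                 # C pipes into D
    (3, 4, False), (4, 5, False),
]
text = to_mermaid(names, edges)
print(text)
```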

Extract sub graph

After defining the relationships between functions, you can extract a sub dag from the whole dag.

# construct functional dag
# extract the minimal sub dag required to execute B and D
functional_dag = stream.construct(mandatory_nodes=[B, D])

# execute in the same way as before
executor = StreamExecutor(functional_dag)
executor.run()

# output in the same way as before
drawer.output(functional_dag, "workspace/extract.mmd")

The console shows the following output.

funcA
funcB, received 10 from A
funcC, received 10 from A
funcD, received 20 from C

The Mermaid description of the sub dag is shown below.

stateDiagram
    direction LR
    state "funcB" as state_0
    state "funcA" as state_1
    state "funcD" as state_2
    state "funcC" as state_3
    state_0 --> [*]
    [*] --> state_1
    state_1 --> state_0: Pipe
    state_1 --> state_3: Pipe
    state_2 --> [*]
    state_3 --> state_2: Pipe
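
The minimal sub dag for a set of mandatory nodes consists of those nodes plus everything they depend on, directly or indirectly. The sketch below computes that set by walking the predecessor map backwards. It is an assumption about the idea behind `mandatory_nodes`, not dagstream's actual code; `required_nodes` and the `predecessors` dictionary are hypothetical.

```python
def required_nodes(predecessors, mandatory):
    """Return the mandatory nodes together with all of their ancestors."""
    needed = set()
    stack = list(mandatory)
    while stack:
        node = stack.pop()
        if node not in needed:
            needed.add(node)
            stack.extend(predecessors[node])
    return needed

# Hypothetical plain-dict view of the example: node -> nodes that must run first.
predecessors = {
    "A": set(),
    "B": {"A"},
    "C": {"A"},
    "D": {"C"},
    "E": {"B", "C", "D"},
    "F": {"E"},
}

sub_dag = required_nodes(predecessors, ["B", "D"])
print(sorted(sub_dag))  # ['A', 'B', 'C', 'D']
```

E and F are dropped because neither B nor D depends on them, which matches the console output above.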