argo workflow笔记(四)

argo python adls支持库分析

Posted by odin on April 17, 2020

写在前面

如果您不熟悉Argo,建议您以纯YAML格式查看示例. 该语言是描述性的,Argo 示例提供了详尽的解释. 对于经验丰富的受众,此DSL授予您使用Python编程定义Argo工作流的能力,然后将其转换为Argo YAML规范. DSL使用在Argo Python客户端存储库中定义的Argo模型. 结合这两种方法,我们得到了对Argo Workflows的整个底层控制.

1
2
3
4
# 首先安装python-argo客户端
pip install argo-workflows
# 再安装argo-workflow-dsl
pip install argo-workflow-dsl

argo-client-python

与Kubernetes client很类似,这里不做过多分析,参考github即可。argo-client-python

argo-python-dsl

argo-python-dsl 将argo主要的几种template模式实现在argo.workflows.dsl下的template.py中,通过引用template,添加相应的模板装饰器即可,函数对象式实现argo adsl。 下面是几种argo-python-dsl与argo adsl的对比

容器模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from argo.workflows.dsl import Workflow
from argo.workflows.dsl import template
 
from argo.workflows.dsl.templates import V1Container
 
 
class HelloWorld(Workflow):
 
    entrypoint = "whalesay"
 
    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"]
        )
 
        return container

转换为adsl如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# @file: hello-world.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: hello-world
  generateName: hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      name: whalesay
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["hello world"]

DAG模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from argo.workflows.dsl import Workflow
 
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *
 
 
class DagDiamond(Workflow):
 
    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)
 
    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)
 
    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)
 
    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)
 
    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", ""],
        )
 
        return container

转换为adsl如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# @file: dag-diamond.yaml
# The following workflow executes a diamond workflow
#
#   A
#  / \
# B   C
#  \ /
#   D
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: dag-diamond
  generateName: dag-diamond-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]
 
  # @task: [A, B, C, D]
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      name: echo
      image: alpine:3.7
      command: [echo, ""]

传递参数模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from argo.workflows.dsl import Workflow
 
 
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *
 
class ArtifactPassing(Workflow):
 
    @task
    def generate_artifact(self) -> V1alpha1Template:
        return self.whalesay()
 
    @task
    @artifact(
        name="message",
        _from=""
    )
    def consume_artifact(self, message: V1alpha1Artifact) -> V1alpha1Template:
        return self.print_message(message=message)
 
    @template
    @outputs.artifact(name="hello-art", path="/tmp/hello_world.txt")
    def whalesay(self) -> V1Container:
        container = V1Container(
            name="whalesay",
            image="docker/whalesay:latest",
            command=["sh", "-c"],
            args=["cowsay hello world | tee /tmp/hello_world.txt"]
        )
 
        return container
 
    @template
    @inputs.artifact(name="message", path="/tmp/message")
    def print_message(self, message: V1alpha1Artifact) -> V1Container:
        container = V1Container(
            name="print-message",
            image="alpine:latest",
            command=["sh", "-c"],
            args=["cat", "/tmp/message"],
        )
 
        return container

转换为adsl如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# @file: artifacts.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: artifact-passing
  generateName: artifact-passing-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: generate-artifact
        template: whalesay
      - name: consume-artifact
        template: print-message
        arguments:
          artifacts:
          # bind message to the hello-art artifact
          # generated by the generate-artifact step
          - name: message
            from: ""
 
  - name: whalesay
    container:
      name: "whalesay"
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay hello world | tee /tmp/hello_world.txt"]
    outputs:
      artifacts:
      # generate hello-art artifact from /tmp/hello_world.txt
      # artifacts can be directories as well as files
      - name: hello-art
        path: /tmp/hello_world.txt
 
  - name: print-message
    inputs:
      artifacts:
      # unpack the message input artifact
      # and put it at /tmp/message
      - name: message
        path: /tmp/message
    container:
      name: "print-message"
      image: alpine:latest
      command: [sh, -c]
      args: ["cat", "/tmp/message"]

脚本模板

最有趣的部分,在argo adsl中支持脚本的形式是在yaml文件中source关键字处以文本形式编写代码,argo-python-dsl提供@closure和@scope两种装饰器,提供两种不同的对script风格的支持,后者更加符合pythonic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# @template形式,以字符串形式赋值给source
import textwrap
 
class ScriptsPython(Workflow):
 
    ...
 
    @template
    def gen_random_int(self) -> V1alpha1ScriptTemplate:
        source = textwrap.dedent("""\
          import random
          i = random.randint(1, 100)
          print(i)
        """)
 
        template = V1alpha1ScriptTemplate(
            image="python:alpine3.6",
            name="gen-random-int",
            command=["python"],
            source=source
        )
 
        return template
1
2
3
4
5
6
7
8
9
10
11
12
13
# @closure形式,符合我们的风格
class ScriptsPython(Workflow):
 
    ...
 
    @closure(
      image="python:alpine3.6"
    )
    def gen_random_int() -> V1alpha1ScriptTemplate:
          import random
 
          i = random.randint(1, 100)
          print(i)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 使用@scope可更进一步包装,支持避免单个函数过于复杂,可以将部分逻辑拆分至@scope中
  ...
 
 @closure(
   scope="main",
   image="python:alpine3.6"
 )
 def gen_random_int(main) -> V1alpha1ScriptTemplate:
       import random
 
       main.init_logging()
 
       i = random.randint(1, 100)
       print(i)
 
 @scope(name="main")
 def init_logging(level="DEBUG"):
     import logging
 
     logging_level = getattr(logging, level, "INFO")
     logging.getLogger("__main__").setLevel(logging_level)