zmc
2023-08-08 e792e9a60d958b93aef96050644f369feb25d61b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
 
# -*- coding: utf-8 -*-
 
"""
知识图谱
"""
 
import re
import sys
import math
import time
from .base import AipBase
from .base import base64
from .base import json
from .base import urlencode
from .base import quote
 
class AipKg(AipBase):
 
    """
    知识图谱
    """
 
    __createTaskUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_create'
 
    __updateTaskUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_update'
 
    __taskInfoUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_info'
 
    __taskQueryUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_query'
 
    __taskStartUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_start'
 
    __taskStatusUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_status'
 
    
    def createTask(self, name, template_content, input_mapping_file, output_file, url_pattern, options=None):
        """
            创建任务
        """
        options = options or {}
 
        data = {}
        data['name'] = name
        data['template_content'] = template_content
        data['input_mapping_file'] = input_mapping_file
        data['output_file'] = output_file
        data['url_pattern'] = url_pattern
 
        data.update(options)
 
        return self._request(self.__createTaskUrl, data)
    
    def updateTask(self, id, options=None):
        """
            更新任务
        """
        options = options or {}
 
        data = {}
        data['id'] = id
 
        data.update(options)
 
        return self._request(self.__updateTaskUrl, data)
    
    def getTaskInfo(self, id, options=None):
        """
            获取任务详情
        """
        options = options or {}
 
        data = {}
        data['id'] = id
 
        data.update(options)
 
        return self._request(self.__taskInfoUrl, data)
    
    def getUserTasks(self, options=None):
        """
            以分页的方式查询当前用户所有的任务信息
        """
        options = options or {}
 
        data = {}
 
        data.update(options)
 
        return self._request(self.__taskQueryUrl, data)
    
    def startTask(self, id, options=None):
        """
            启动任务
        """
        options = options or {}
 
        data = {}
        data['id'] = id
 
        data.update(options)
 
        return self._request(self.__taskStartUrl, data)
    
    def getTaskStatus(self, id, options=None):
        """
            查询任务状态
        """
        options = options or {}
 
        data = {}
        data['id'] = id
 
        data.update(options)
 
        return self._request(self.__taskStatusUrl, data)