1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
class RestInput(InputChannel):
"""A custom http input channel.
This implementation is the basis for a custom implementation of a chat
frontend. You can customize this to send messages to Rasa Core and
retrieve responses from the agent."""
@classmethod
def name(cls) -> Text:
return "rest"
@staticmethod
async def on_message_wrapper(
on_new_message: Callable[[UserMessage], Awaitable[Any]],
text: Text,
queue: Queue,
sender_id: Text,
input_channel: Text,
metadata: Optional[Dict[Text, Any]],
) -> None:
collector = QueueOutputChannel(queue)
message = UserMessage(
text, collector, sender_id, input_channel=input_channel, metadata=metadata
)
await on_new_message(message)
await queue.put("DONE") # pytype: disable=bad-return-type
async def _extract_sender(self, req: Request) -> Optional[Text]:
return req.json.get("sender", None)
# noinspection PyMethodMayBeStatic
def _extract_message(self, req: Request) -> Optional[Text]:
return req.json.get("message", None)
def _extract_input_channel(self, req: Request) -> Text:
return req.json.get("input_channel") or self.name()
def stream_response(
self,
on_new_message: Callable[[UserMessage], Awaitable[None]],
text: Text,
sender_id: Text,
input_channel: Text,
metadata: Optional[Dict[Text, Any]],
) -> Callable[[Any], Awaitable[None]]:
async def stream(resp: Any) -> None:
# 这里的Queue作用于生产者消费者模式
q = Queue()
task = asyncio.ensure_future(
self.on_message_wrapper(
on_new_message, text, q, sender_id, input_channel, metadata
)
)
result = None # declare variable up front to avoid pytype error
while True:
result = await q.get()
if result == "DONE":
break
else:
await resp.write(json.dumps(result) + "\n")
await task
return stream # pytype: disable=bad-return-type
# 这里的on_new_message就是上一个片段的代码中出现的:
# async def handler(*args, **kwargs):
# # 用来处理消息
# await app.agent.handle_message(*args, **kwargs)
def blueprint(
self, on_new_message: Callable[[UserMessage], Awaitable[None]]
) -> Blueprint:
custom_webhook = Blueprint(
"custom_webhook_{}".format(type(self).__name__),
inspect.getmodule(self).__name__,
)
# noinspection PyUnusedLocal
# 最后组合后的url为 ip:port/webhooks/rest/
@custom_webhook.route("/", methods=["GET"])
async def health(request: Request) -> HTTPResponse:
return response.json({"status": "ok"})
# 最后组合后的url为 ip:port/webhooks/rest/webhook
@custom_webhook.route("/webhook", methods=["POST"])
async def receive(request: Request) -> HTTPResponse:
sender_id = await self._extract_sender(request)
# 用户输入的消息
text = self._extract_message(request)
should_use_stream = rasa.utils.endpoints.bool_arg(
request, "stream", default=False
)
input_channel = self._extract_input_channel(request)
metadata = self.get_metadata(request)
if should_use_stream:
return response.stream(
self.stream_response(
on_new_message, text, sender_id, input_channel, metadata
),
content_type="text/event-stream",
)
else:
collector = CollectingOutputChannel()
# noinspection PyBroadException
try:
await on_new_message(
UserMessage(
text,
collector,
sender_id,
input_channel=input_channel,
metadata=metadata,
)
)
except CancelledError:
logger.error(
"Message handling timed out for "
"user message '{}'.".format(text)
)
except Exception:
logger.exception(
"An exception occured while handling "
"user message '{}'.".format(text)
)
return response.json(collector.messages)
return custom_webhook
|