0%

利用树莓派制作天气闹钟

Note

讯飞API升级,旧的语音合成接口已不可用,新的实现方式直接扔github了。

前言

为了能让刚买的树莓派有效的利用起来(避免吃灰),今天分享一下如何用树莓派做天气闹钟。

环境及工具

树莓派3B+、IDE、XShell、FileZilla(FTP文件上传)、小音箱。

查询天气

准备

既然要做天气闹钟,那肯定先要知道今天的天气是什么,查询天气服务还是很大众的一种服务,很多网站都可以提供了查询天气的API接口,搜索一下。
做数据服务的网站就那么几家,简单浏览之后,选择了阿里云市场里的墨迹天气API(免费是重点,免费的可用1000次,最近有0元/10000次的活动)。云市场-免费版气象服务(cityid)-墨迹天气
这个API提供一个根据城市Id查询三天精简天气预报的接口,深得我心,买。

购买成功后,需要从阿里云控制台-产品与服务-API网关-调用API-已购API中查到请求Token。详细查询过程

image

点击操作中的详情可以看到Token, 之后需要点击授权按钮,为你要调取的接口生成一个授权码(阿里云API网关需要这个)。

接口详情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
url: http://freecityid.market.alicloudapi.com/whapi/json/alicityweather/briefforecast3days
method: POST
body: {
"cityid": "城市id",
“token”: "API详情页查询到的"
}

resp: {
"code": 0,
"data": {
"city": {
"cityId": 284609,
"counname": "中国",
"name": "东城区",
"pname": "北京市"
},
"forecast": [
{
"conditionDay": "多云",
"conditionIdDay": "1",
"conditionIdNight": "31",
"conditionNight": "多云",
"predictDate": "2016-09-01",
"tempDay": "27",
"tempNight": "18",
"updatetime": "2016-09-01 09:07:08",
"windDirDay": "西北风",
"windDirNight": "西北风",
"windLevelDay": "3",
"windLevelNight": "2"
},
...省略两个...
]
},
"msg": "success",
"rc": {
"c": 0,
"p": "success"
}
}

拿到接口,接下来肯定就是写代码调用接口了,调用过程中涉及的部分问题,都写在代码注释里了(这里及后面的所有请求都是用的python的requests库,本身娱乐项目,也就没有生成requirement文件)。

Num2Word.py (将数字转为中文字符串)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import math

class Num2Word:
words = {
0: '零',
1: '一',
2: '二',
3: '三',
4: '四',
5: '五',
6: '六',
7: '七',
8: '八',
9: '九',
10: '十',
100: '百',
1000: '千',
10000: '万',
}

# TODO 1024 -> 一千二十四 ==> 1024 -> 一千零二十四
# TODO 1004 -> 一千四 ==> 1024 -> 一千零四
# TODO 1024.2 小数点
@staticmethod
def to_word(num):
if isinstance(num, int):
pass
elif isinstance(num, str):
num = int(num)
else:
raise TypeError('num must be int or str')
if num < 0:
return '负' + Num2Word.to_word(-num)
else:
quotient = num
remainder = 0
s = ""
ten_num = 0
while quotient > 0:
quotient = int(num / 10)
remainder = num % 10
if remainder > 0:
if ten_num > 0:
s = Num2Word.words[remainder] + Num2Word.words[int(math.pow(10, ten_num))] + s
else:
s = Num2Word.words[remainder] + s
num = int(num / 10)
ten_num += 1
return s

MoJiWeather.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import requests
import json
import logging
import sys
import os
from sys import path
from Num2Word import Num2Word
from VoicePlayer import VoicePlayer
from XunFeiTTS import XunFeiTTS

logging.basicConfig(
level=logging.DEBUG,
handlers=[logging.StreamHandler()],
format='%(levelname)s:%(asctime)s:%(message)s'
)

class RespBody():
# data = "{\"code\":0,\"data\":{\"city\":{\"cityId\":50,\"counname\":\"中国\",\"name\":\"闵行区\",\"pname\":\"上海市\",\"timezone\":\"8\"},\"forecast\":[{\"conditionDay\":\"多云\",\"conditionIdDay\":\"1\",\"conditionIdNight\":\"31\",\"conditionNight\":\"多云\",\"predictDate\":\"2018-10-17\",\"tempDay\":\"23\",\"tempNight\":\"14\",\"updatetime\":\"2018-10-17 22:09:00\",\"windDirDay\":\"北风\",\"windDirNight\":\"北风\",\"windLevelDay\":\"3-4\",\"windLevelNight\":\"3-4\"},{\"conditionDay\":\"多云\",\"conditionIdDay\":\"1\",\"conditionIdNight\":\"31\",\"conditionNight\":\"多云\",\"predictDate\":\"2018-10-18\",\"tempDay\":\"21\",\"tempNight\":\"12\",\"updatetime\":\"2018-10-17 22:09:00\",\"windDirDay\":\"北风\",\"windDirNight\":\"北风\",\"windLevelDay\":\"5-6\",\"windLevelNight\":\"3-4\"},{\"conditionDay\":\"多云\",\"conditionIdDay\":\"1\",\"conditionIdNight\":\"31\",\"conditionNight\":\"多云\",\"predictDate\":\"2018-10-19\",\"tempDay\":\"22\",\"tempNight\":\"13\",\"updatetime\":\"2018-10-17 22:09:00\",\"windDirDay\":\"东北风\",\"windDirNight\":\"东北风\",\"windLevelDay\":\"3-4\",\"windLevelNight\":\"3\"}]},\"msg\":\"success\",\"rc\":{\"c\":0,\"p\":\"success\"}}"
def __init__(self, d) -> None:
self.__dict__ = d

class Forecast():

def __init__(self, d) -> None:
self.prdict_date = d.predictDate # yyyy-MM-dd
self.update_time = d.updatetime # yyyy-MM-dd HH:mm:ss
self.condition_day = d.conditionDay # 多云
self.condition_night = d.conditionNight # 多云
self.temp_day = d.tempDay # 23
self.temp_night = d.tempNight # 14
self.wind_dir_day = d.windDirDay # 北风
self.wind_dir_night = d.windDirNight # 北风
self.wind_level_day = d.windLevelDay # 3-4
self.wind_level_night = d.windLevelNight # 4

def wind_level_to_word(self, wind_level):
wind_level = str(wind_level)
if not wind_level.__contains__('-'):
return Num2Word.to_word(wind_level)
return Num2Word.to_word(wind_level.split('-')[0]) + '至' + Num2Word.to_word(wind_level.split('-')[1])

def to_chinese(self):
# date转为文字
month = self.prdict_date.split('-')[1]
day = self.prdict_date.split('-')[2]
date_word = Num2Word.to_word(month) + '月' + Num2Word.to_word(day) + '日'
return "%s, 白天天气%s, 温度%s度, %s%s级, 夜间天气%s, 温度%s度, %s%s级" % \
(date_word, self.condition_day, Num2Word.to_word(self.temp_day), self.wind_dir_day, self.wind_level_to_word(self.wind_level_day),
self.condition_night, Num2Word.to_word(self.temp_night), self.wind_dir_night, self.wind_level_to_word(self.wind_level_night))

class MoJiWeather():
def __init__(self) -> None:
self.config = {
"baseURL": "http://freecityid.market.alicloudapi.com",
"forecastURL": "/whapi/json/alicityweather/briefforecast3days",
"AppCode": "阿里云的授权码",
"headers": {
"Host":"freecityid.market.alicloudapi.com",
"gateway_channel":"http",
"Content-Type":"application/x-www-form-urlencoded; charset=utf-8",
"Authorization":"APPCODE 阿里云的授权码"
},
"token": "墨迹天气token"
}
self.city_codes = {
"BeiJing": "2",
"ShangHaiMinHang": "50" # 国内城市地区id见末尾附录
}

def fetch_forecast(self, cityId):
req_body = {
"cityId": str(cityId),
"token": self.config["token"]
}
json_str = json.dumps(req_body)
url = self.config["baseURL"] + self.config["forecastURL"]
# print(url)
# print(self.config["headers"])
resp = requests.post(url=url, data=req_body, headers=self.config["headers"])
resp_json = resp.content.decode('utf8')
# print(resp_json)
# print(resp.headers)
logging.debug("[MoJiWeather.fetch_forecast] - status = %s" % resp.status_code)
logging.debug("[MoJiWeather.fetch_forecast] - resp json = %s" % resp_json)

resp_body = json.loads(resp_json, object_hook=RespBody)

code = resp_body.code
if code == 0:
data = resp_body.data
city = data.city
province_name = city.pname
city_name = city.name
logging.info("[MoJiWeather.fetch_forecast] - %s, %s" % (province_name, city_name))

three_days_forecast_list = data.forecast
return three_days_forecast_list
else:
logging.info("[MoJiWeather.fetch_forecast] - Resp Not Success")
return []

现在可以测试一下代码运行效果了,MojiWeather.fetch_forecast方法返回的是天气预报数组(字典数组)。为了方便测试,就直接在MoJiWeather中创建一个main方法来获取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if __name__ == '__main__':
mo_ji_weather = MoJiWeather()
# 天气预报的数组
forecast_list = mo_ji_weather.fetch_forecast(mo_ji_weather.city_codes["ShangHaiMinHang"])

print(forecast_list)
forecast_words = []
for forecast in forecast_list:
# 将dict转为Forecast对象
f = Forecast(forecast)
# 将天气预报转为中文文字
forecast_words.append(f.to_chinese())
print(f.to_chinese())
# 将三个预报文本拼接成一个字符串
s = ",".join(forecast_words)

运行结果
image

语音合成

到目前为止,我们已经能够拿到最近三天内的天气预报了,既然是做天气闹钟,那就要让程序会“说话”,也就是把文字转为语音(语音合成)。国内做语音合成,第一个想到的就是讯飞了,而且讯飞语音合成也有免费版的(每日500次限额,只有一个发音人可选),讯飞TTS介绍页也可以体验语音合成,经测试,讯飞的TTS还是挺清晰的。

然后按照下面的步骤去获取讯飞的API-KEY,完成准备工作。
image

剩下的工作就是按文档下代码

XunFeiTTS.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/python3
# -*- coding:utf-8 -*-

import hashlib
import base64
import time
import json
import requests
import os
import logging

logging.basicConfig(
level=logging.DEBUG,
handlers=[logging.StreamHandler()],
format='%(levelname)s:%(asctime)s:%(message)s'
)

class XunFeiTTS:
def __init__(self) -> None:
self.app_id = "讯飞App id" # 讯飞的应用id
self.app_key = "讯飞TOKEN" # 讯飞的token
self.tts_url = "http://api.xfyun.cn/v1/service/v1/tts"

def __gen_sig(self, req_params_base64, time_now):
"""
授权认证,生成认证信息
:param req_params_base64: 请求参数的base64串
:param time_now: 当前时间
:return:
"""
s = self.app_key + time_now + req_params_base64
hl = hashlib.md5()
hl.update(s.encode(encoding='utf8'))
return hl.hexdigest()

def __gen_req_header(self, time_now, req_params_base64, sig):
"""
生成请求头
:param time_now: 当前时间
:param req_params_base64: 请求参数的base64串
:param sig:
:return:
"""
header = {
"X-Appid": self.app_id,
"X-CurTime": time_now,
"X-Param": req_params_base64,
"X-CheckSum": sig,
"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
}
return header

def fetch_voice(self, text):
"""
根据传入text生成语音
:param text:
:return:
"""
req_params = {
"auf": "audio/L16;rate=16000",
"aue": "raw", # 返回的语音格式 raw为wav格式语音, lame为MP3格式语音
"voice_name": "xiaoyan",
"speed": "50",
"volume": "50",
"pitch": "50",
"engine_type": "intp65",
"text_type": "text",
"text": text + " 噻"
}

time_now = str(time.time()).split('.')[0]
req_params_json = json.dumps(req_params)
req_params_base64 = str(base64.b64encode(req_params_json.encode('ascii')).decode('ascii'))
header = self.__gen_req_header(time_now, req_params_base64, self.__gen_sig(req_params_base64, time_now))

resp = requests.post(url=self.tts_url, data=req_params, headers=header)
content_type = resp.headers['Content-type']

# 请求成功时, contentType为audio.mpeg, 失败时,contentType为text/plain, 返回异常信息
if content_type == 'audio/mpeg':
# 将语音写入文件voice.wav
f = open('voice.wav', 'wb')
f.write(resp.content)
f.close()

logging.info("[XunFeiTTS.fetch_voice] - Fetch Voice Success! Save As %s" % f.name)
else:
resp_json = resp.content.decode('utf-8')
logging.info("[XunFeiTTs.fetch_voice] - %s" % resp_json)
resp_dict = json.loads(resp_json)
logging.error("[XunFeiTTS.fetch_voice] - ErrCode = %s, Desc = %s" % (resp_dict['code'], resp_dict['desc']))

现在我们需要重新修改MoJiWeather中的main方法,调用讯飞TTS将天气预报的字符串转变为语音。
接下来就是使用pyaudio库来播放天气预报语音(python上有很多库可以播放音频,试了几个之后,感觉还是pyaudio更适合这个例子)。

如果在树莓派上使用pip安装pyaudio时出现Pyaudio installation error - 'command 'gcc' failed with exit status 1'错误,请在树莓派上执行下面的命令

1
2
3
4
5
sudo apt-get install python-dev
sudo apt-get install portaudio19-dev
sudo apt-get install libportaudio0 libportaudio2 libportaudiocpp0 portaudio19-dev

pip3 install pyaudio

播放wav语音的工具类

VoicePlayer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import pyaudio
import wave
import os
import logging

logging.basicConfig(
level=logging.DEBUG,
handlers=[logging.StreamHandler()],
format='%(levelname)s:%(asctime)s:%(message)s'
)

class VoicePlayer:

def __init__(self) -> None:
self.chunk = 1024

def play(self, filename):
logging.debug("[VoicePlayer.play] - load file %s" % filename)
chunk = 1024
wf = wave.open('voice.wav', 'rb')
p = pyaudio.PyAudio()

stream = p.open(
format=p.get_format_from_width(wf.getsampwidth()),
channels=wf.getnchannels(),
rate=wf.getframerate(),
output=True)
data = wf.readframes(chunk)

while data != '':
stream.write(data)
data = wf.readframes(chunk)
if data == b'':
break

stream.close()
p.terminate()
logging.debug("[VoicePlayer.play] - Voice Play Finish")

整合

到这里,就可以把上面的代码整合到一起了,这里要注意一下Python的包引入问题,在MoJiWeather.py中会引入Num2Word.py,VoicePlayer.py,XunFeiTTS.py,在运行程序时请保证这几个文件都在同一目录下

MoJiWeather.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if __name__ == '__main__':
mo_ji_weather = MoJiWeather()
forecast_list = mo_ji_weather.fetch_forecast(mo_ji_weather.city_codes["ShangHaiMinHang"])

print(forecast_list)

xun_fei_tts = XunFeiTTS()
s = ""
forecast_words = []
for forecast in forecast_list:
f = Forecast(forecast)
forecast_words.append(f.to_chinese())
print(f.to_chinese())
s = ",".join(forecast_words)
logging.debug("[MojiWeather.main] - %s" % s)
xun_fei_tts.fetch_voice(s)

voice_player = VoicePlayer()
voice_player.play('voice.wav')

部署

程序编写完成后,将代码通过FileZilla上传工具把代码上传到树莓派。现在距离天气闹钟只差最后一步(闹钟)。借助linux的crond定时任务就可以很容易的实现闹钟这一功能。在树莓派上执行crontab -e就可以编辑crond任务了,第一次打开时会提示让你选择一个编辑器,按个人喜好选择即可。

使用crontab -e命令时要注意不要手滑按成crontab -r(这两个键挨着很近),后者是【清空corntab配置】。

crond 配置说明 image

附上本人树莓派上的crontab设置 image

实际效果

image
左边的就是淘宝上几十块买的小音响,分蓝牙和有线两种链接模式,颜值还是很高的。文章里的代码贴的比较多(娱乐项目,代码写的比较糟,想吐槽就吐吧orz),请各位看客海涵。

附录