Connecting to an AWS S3 Bucket with Python

AWS Configuration

Accessing S3 requires an aws_access_key_id and an aws_secret_access_key. Both values can be passed directly through the API that boto3 provides, but putting security-sensitive information like this into code is not a good choice. Alternatively, they can be configured via awscli and stored in ~/.aws/credentials; when boto3 accesses S3, it reads them from that file automatically. awscli can be installed with pip install awscli.
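
For reference, running aws configure (after installing awscli) prompts for the keys and writes them to that file; a typical ~/.aws/credentials looks like the following, with placeholder values:

[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY

With that file in place, the client in the script below could also be created without passing the keys explicitly, e.g. boto3.client('s3', region_name=CN_REGION_NAME).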

The code example is as follows:

import os
import boto3
from loguru import logger


BUCKET_NAME = "your-bucket-name"  # bucket name

# aws_access_key_id and aws_secret_access_key
CN_S3_AKI = 'your_aws_access_key_id'
CN_S3_SAK = 'your_aws_secret_access_key'

CN_REGION_NAME = 'your-cn-region-name'  # region

# S3 client instance
s3 = boto3.client('s3', region_name=CN_REGION_NAME,
                  aws_access_key_id=CN_S3_AKI,
                  aws_secret_access_key=CN_S3_SAK)


def upload_files(path_local, path_s3):
    """
    Upload (uploading again overwrites the file with the same name).
    :param path_local: local path
    :param path_s3: S3 path
    """
    logger.info('Start upload files.')

    if not upload_single_file(path_local, path_s3):
        logger.error('Upload files failed.')
        return

    logger.info('Upload files successful.')


def upload_single_file(src_local_path, dest_s3_path):
    """
    Upload a single file.
    :param src_local_path: local source path
    :param dest_s3_path: S3 destination key
    :return: True on success, False on failure
    """
    try:
        with open(src_local_path, 'rb') as f:
            s3.upload_fileobj(f, BUCKET_NAME, dest_s3_path)
    except Exception as e:
        logger.error(f'Upload data failed. | src: {src_local_path} | dest: {dest_s3_path} | Exception: {e}')
        return False
    logger.info(f'Uploading file successful. | src: {src_local_path} | dest: {dest_s3_path}')
    return True
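
# Note: instead of the explicit open() + upload_fileobj above, boto3 also
# provides a convenience wrapper that opens the file itself:
#   s3.upload_file(src_local_path, BUCKET_NAME, dest_s3_path)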


def download_zip(path_s3, path_local):
    """
    Download.
    :param path_s3: S3 key to download
    :param path_local: local destination path
    """
    retry = 0
    while retry < 3:  # retry up to 3 times on download errors
        logger.info(f'Start downloading files. | path_s3: {path_s3} | path_local: {path_local}')
        try:
            s3.download_file(BUCKET_NAME, path_s3, path_local)
            file_size = os.path.getsize(path_local)
            logger.info(f'Downloading completed. | size: {round(file_size / 1048576, 2)} MB')
            break  # exit the retry loop once the download has finished
        except Exception as e:
            logger.error(f'Download zip failed. | Exception: {e}')
            retry += 1

    if retry >= 3:
        logger.error('Download zip failed after max retry.')
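
# Note: download_file writes to path_local but does not create missing parent
# directories; when path_local includes a directory, creating it first avoids
# an error:
#   os.makedirs(os.path.dirname(path_local), exist_ok=True)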


def delete_s3_zip(path_s3, file_name=''):
    """
    Delete.
    :param path_s3: S3 key to delete
    :param file_name: optional file name, used by the commented-out copy step
    """
    try:
        # copy before deleting, if a backup is wanted:
        # copy_source = {'Bucket': BUCKET_NAME, 'Key': path_s3}
        # s3.copy_object(CopySource=copy_source, Bucket=BUCKET_NAME, Key='is-zips-cache/' + file_name)
        s3.delete_object(Bucket=BUCKET_NAME, Key=path_s3)
    except Exception as e:
        logger.error(f'Delete s3 file failed. | Exception: {e}')
        return
    logger.info(f'Delete s3 file successful. | path_s3 = {path_s3}')


def batch_delete_s3(delete_key_list):
    """
    Batch delete.
    :param delete_key_list: e.g. [
        {'Key': "test-01/虎式03的副本.jpeg"},
        {'Key': "test-01/tank001.png"},
    ]
    """
    try:
        s3.delete_objects(
            Bucket=BUCKET_NAME,
            Delete={'Objects': delete_key_list}
        )
    except Exception as e:
        logger.error(f"Batch delete file failed. | Exception: {e}")
        return
    logger.info("Batch delete file success.")



def get_files_list(Prefix=None):
    """
    Query the object list.
    :param Prefix: optional key prefix to filter by
    :return: list of object keys, or None on failure
    """
    logger.info('Start getting files from s3.')
    try:
        if Prefix is not None:
            all_obj = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=Prefix)

            # fetch the head metadata of a single object:
            # obj = s3.head_object(Bucket=BUCKET_NAME, Key=Prefix)
            # logger.info(f"obj = {obj}")
        else:
            all_obj = s3.list_objects_v2(Bucket=BUCKET_NAME)

    except Exception as e:
        logger.error(f'Get files list failed. | Exception: {e}')
        return

    contents = all_obj.get('Contents')
    logger.info(f"--- contents = {contents}")
    if not contents:
        return

    file_name_list = []
    for zip_obj in contents:
        # logger.info(f"zip_obj = {zip_obj}")
        file_size = round(zip_obj['Size'] / 1024 / 1024, 3)  # size in MB
        # logger.info(f"file_path = {zip_obj['Key']}")
        # logger.info(f"LastModified = {zip_obj['LastModified']}")
        # logger.info(f"file_size = {file_size} MB")
        # zip_name = zip_obj['Key'][len(start_after):]
        zip_name = zip_obj['Key']

        file_name_list.append(zip_name)

    logger.info('Get file list successful.')

    return file_name_list
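
# Note: list_objects_v2 returns at most 1000 keys per call. For larger buckets
# a paginator can be used instead (a sketch, not in the original script):
#   paginator = s3.get_paginator('list_objects_v2')
#   for page in paginator.paginate(Bucket=BUCKET_NAME):
#       for obj in page.get('Contents', []):
#           ...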


# TODO existence check (check_exists): one option is to call get_files_list
# with the target path as Prefix and look for the file name in the result;
# a sketch based on list_objects_v2 appears after the script below.
# def check_exists(bucket_name, path_name):
#     s3 = boto3.resource('s3')
#     bucket = s3.Bucket(bucket_name)
#     counter = 0
#     for _ in bucket.objects.filter(Prefix=path_name):
#         counter = counter + 1
#     if counter > 0:
#         return True
#     return False


if __name__ == "__main__":
    # TODO test query / upload / download / delete
    # query
    # file_name_list = get_files_list(Prefix='test-01/tank001.png')
    file_name_list = get_files_list()
    logger.info(f"file_name_list = {file_name_list}")

    # upload
    # path_local = '/Users/Desktop/test/local_file_path/虎式02的副本.jpeg'
    # path_s3 = 'test-02/虎式02的副本.jpeg'  # a nonexistent S3 path is created automatically
    # upload_files(path_local, path_s3)

    # download
    # path_s3 = 'test-01/虎式02的副本.jpeg'
    # path_local = '/Users/Desktop/test/download_from_s3/虎式02的副本.jpeg'
    # download_zip(path_s3, path_local)

    # delete
    # path_s3 = 'test-01/虎式02的副本.jpeg'
    # delete_s3_zip(path_s3)

    # batch delete - multiple files in the bucket
    # delete_key_list = [
    #     {'Key': "test-01/虎式02.jpeg"},
    #     {'Key': "test-01/虎式03.jpeg"},
    #     {'Key': "test-01/tank001.png"},
    #     {'Key': "test-01/face.png"},
    #     {'Key': "test-01/114.png"},
    #     {'Key': "test-01/xiaox.JPG"},
    #     {'Key': "test-01/face.jpg"},
    # ]
    # batch_delete_s3(delete_key_list)
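
As a follow-up to the check_exists TODO above, the following is a minimal sketch based on list_objects_v2 (it reuses the module-level s3 client, BUCKET_NAME, and logger from the script; the function itself is an assumption, not part of the original code):

def check_exists(path_s3):
    """Return True if at least one object key starts with the given path."""
    try:
        resp = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=path_s3, MaxKeys=1)
    except Exception as e:
        logger.error(f'Check exists failed. | Exception: {e}')
        return False
    # KeyCount reports how many keys this response actually contains
    return resp.get('KeyCount', 0) > 0

With MaxKeys=1 the call returns after the first matching key, so it is cheaper than listing an entire prefix. Note that this is a prefix match; for an exact-key check, head_object on the key would be the stricter option.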
