jedoist-client/src/main.py

157 lines
5.2 KiB
Python
Raw Normal View History

2024-06-05 23:22:58 +03:00
import falcon
import json
2024-06-06 00:08:10 +03:00
import os
# https://falcon.readthedocs.io/en/stable/user/quickstart.html#learning-by-example
class AuthMiddleware:
def process_request(self, req, resp):
token = req.get_header('Authorization')
account_id = req.get_header('Account-ID')
challenges = ['Token type="Fernet"']
if token is None:
description = 'Please provide an auth token as part of the request.'
raise falcon.HTTPUnauthorized(
title='Auth token required',
description=description,
challenges=challenges,
href='http://docs.example.com/auth',
)
if not self._token_is_valid(token, account_id):
description = (
'The provided auth token is not valid. '
'Please request a new token and try again.'
)
raise falcon.HTTPUnauthorized(
title='Authentication required',
description=description,
challenges=challenges,
href='http://docs.example.com/auth',
)
def _token_is_valid(self, token, account_id):
return os.environ["TOKEN"] # Suuuuuure it's valid...
2024-06-05 23:22:58 +03:00
class RequireJSON:
def process_request(self, req, resp):
if not req.client_accepts_json:
raise falcon.HTTPNotAcceptable(
description='This API only supports responses encoded as JSON.',
href='http://docs.examples.com/api/json',
)
if req.method in ('POST', 'PUT'):
if req.content_type == None or 'application/json' not in req.content_type:
raise falcon.HTTPUnsupportedMediaType(
title='This API only supports requests encoded as JSON.',
href='http://docs.examples.com/api/json',
)
2024-06-06 00:08:10 +03:00
2024-06-05 23:22:58 +03:00
class JSONTranslator:
# NOTE: Normally you would simply use req.media and resp.media for
# this particular use case; this example serves only to illustrate
# what is possible.
def process_request(self, req, resp):
# req.stream corresponds to the WSGI wsgi.input environ variable,
# and allows you to read bytes from the request body.
#
# See also: PEP 3333
if req.content_length in (None, 0):
# Nothing to do
return
body = req.stream.read()
if not body:
raise falcon.HTTPBadRequest(
title='Empty request body',
description='A valid JSON document is required.',
)
try:
req.context.doc = json.loads(body.decode('utf-8'))
except (ValueError, UnicodeDecodeError):
description = (
'Could not decode the request body. The '
'JSON was incorrect or not encoded as '
'UTF-8.'
)
raise falcon.HTTPBadRequest(title='Malformed JSON', description=description)
def process_response(self, req, resp, resource, req_succeeded):
if not hasattr(resp.context, 'result'):
return
resp.text = json.dumps(resp.context.result)
def max_body(limit):
def hook(req, resp, resource, params):
length = req.content_length
if length is not None and length > limit:
msg = (
'The size of the request is too large. The body must not '
'exceed ' + str(limit) + ' bytes in length.'
)
raise falcon.HTTPPayloadTooLarge(
title='Request body is too large', description=msg
)
return hook
2024-06-06 00:08:10 +03:00
########################################################################################################
2024-06-05 23:22:58 +03:00
class CreateTaskResource:
2024-06-06 00:08:10 +03:00
TODO_HEADER = '# ToDo'
def __init__(self) -> None:
self.__todo_path = "{0}/{1}".format(os.environ['TODO_PATH'], os.environ['TODO_FILE'])
2024-06-05 23:22:58 +03:00
def on_post(self, req, resp):
2024-06-06 00:08:10 +03:00
task = req.context.doc['task']
self.__create_task(task)
2024-06-05 23:22:58 +03:00
quote = {
'title': 'Получена задача',
2024-06-06 00:08:10 +03:00
'description': task
2024-06-05 23:22:58 +03:00
}
resp.media = quote
2024-06-06 00:08:10 +03:00
def __create_task(self, text: str) -> None:
content = []
section_found = False
is_waiting_empty_line = False
with open(self.__todo_path, 'r', encoding='UTF-8') as file:
while line := file.readline():
if section_found == False:
if line.strip() == CreateTaskResource.TODO_HEADER:
section_found = True
is_waiting_empty_line = True
content.append(line)
if section_found and is_waiting_empty_line == True and line.strip() == '':
content.append("- [ ] {0}\n".format(text))
is_waiting_empty_line = False
with open(self.__todo_path, 'w', encoding='utf-8') as file:
file.writelines(content)
########################################################################################################
2024-06-05 23:22:58 +03:00
app = falcon.App(
middleware=[
2024-06-06 00:08:10 +03:00
AuthMiddleware(),
2024-06-05 23:22:58 +03:00
RequireJSON(),
JSONTranslator()
]
)
app.add_route('/create-task', CreateTaskResource())