задача

This commit is contained in:
ALEXSim 2026-05-08 11:23:10 +03:00
parent e1519804fc
commit 70022c0081
9 changed files with 585 additions and 0 deletions

33
.gitignore vendored Normal file
View File

@ -0,0 +1,33 @@
# Виртуальное окружение PyCharm
venv/
venv_*/
env/
.env/
ENV/
# Файлы PyCharm
.idea/
*.iml
.DS_Store
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
# Логи и временные файлы
*.log
*.pcap
*.pcapng
# Секреты и конфиги
.env
token.txt
config.py
secrets.py
# Файлы отладки
.vscode/
*.pid

207
gitea_api_client.py Normal file
View File

@ -0,0 +1,207 @@
# gitea_api_client.py
import os
import requests
import json
import logging
from pathlib import Path
# Загружаем .env файл для PyCharm
try:
from dotenv import load_dotenv
# Ищем .env файл в текущей директории
env_path = Path('.') / '.env'
if env_path.exists():
load_dotenv()
logging.info("Загружен .env файл")
else:
logging.warning(".env файл не найден, используем системные переменные")
except ImportError:
logging.warning("python-dotenv не установлен, используем os.environ")
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
class GiteaClient:
"""Клиент для работы с Gitea API"""
def __init__(self):
self.base_url = os.getenv('GITEA_URL', 'https://git.vyatsu.ru')
self.token = os.getenv('GITEA_TOKEN')
if not self.token:
# Пробуем альтернативные имена переменных
self.token = os.getenv('GITEA_TOKEN') or os.getenv('TOKEN')
if not self.token:
raise ValueError(
"Токен не найден!\n"
"Создайте файл .env с содержимым:\n"
"GITEA_TOKEN=ваш_токен\n"
"Или установите переменную окружения GITEA_TOKEN"
)
self.headers = {
'Authorization': f'token {self.token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
}
logging.info(f"✓ Инициализирован клиент для {self.base_url}")
def get_user_info(self):
"""Получение информации о пользователе"""
try:
response = requests.get(
f"{self.base_url}/api/v1/user",
headers=self.headers,
timeout=10
)
response.raise_for_status()
user = response.json()
logging.info(f"\n👤 Информация о пользователе:")
logging.info(f" Логин: {user.get('login')}")
logging.info(f" Полное имя: {user.get('full_name', 'Не указано')}")
logging.info(f" Email: {user.get('email')}")
logging.info(f" ID: {user.get('id')}")
logging.info(f" Админ: {user.get('is_admin', False)}")
return user
except requests.exceptions.RequestException as e:
logging.error(f"Ошибка получения пользователя: {e}")
return None
def create_repository(self, name, description="", private=False):
"""Создание нового репозитория"""
try:
data = {
'name': name,
'description': description,
'private': private,
'auto_init': True
}
response = requests.post(
f"{self.base_url}/api/v1/user/repos",
headers=self.headers,
json=data,
timeout=30
)
response.raise_for_status()
repo = response.json()
logging.info(f"\n✅ Репозиторий создан: {repo['html_url']}")
logging.info(f" Название: {repo['name']}")
logging.info(f" Приватный: {repo['private']}")
return repo
except requests.exceptions.RequestException as e:
logging.error(f"Ошибка создания репозитория: {e}")
if hasattr(e, 'response') and e.response:
logging.error(f"Ответ сервера: {e.response.text}")
return None
def create_issue(self, owner, repo, title, body=""):
"""Создание issue"""
try:
data = {
'title': title,
'body': body or "Создано через API PyCharm"
}
response = requests.post(
f"{self.base_url}/api/v1/repos/{owner}/{repo}/issues",
headers=self.headers,
json=data,
timeout=30
)
response.raise_for_status()
issue = response.json()
logging.info(f"\n📝 Issue создан: {issue['html_url']}")
logging.info(f" Номер: #{issue['number']}")
logging.info(f" Заголовок: {issue['title']}")
return issue
except requests.exceptions.RequestException as e:
logging.error(f"Ошибка создания issue: {e}")
return None
def main():
"""Основная функция"""
print("=" * 50)
print("Gitea API Клиент")
print("=" * 50)
try:
client = GiteaClient()
# Получаем информацию о пользователе
user = client.get_user_info()
if not user:
return
username = user.get('login')
print("\nВыберите действие:")
print("1. Прочитать информацию")
print("2. Создать репозиторий")
print("3. Создать issue (в существующем репозитории)")
print("4. Создать репозиторий + issue (полный тест)")
choice = input("\nВаш выбор (1-4): ").strip()
if choice == '1':
client.get_user_info()
elif choice == '2':
name = input("Название репозитория: ")
desc = input("Описание: ")
private = input("Приватный? (y/n): ").lower() == 'y'
client.create_repository(name, desc, private)
elif choice == '3':
owner = input("Владелец репозитория: ")
repo = input("Название репозитория: ")
title = input("Заголовок issue: ")
body = input("Текст issue: ")
client.create_issue(owner, repo, title, body)
elif choice == '4':
repo_name = f"test-repo-{__import__('time').time()}"
print(f"\nСоздаю репозиторий {repo_name}...")
repo = client.create_repository(
name=repo_name,
description="Тестовый репозиторий из PyCharm"
)
if repo:
print("Создаю issue...")
client.create_issue(
owner=username,
repo=repo_name,
title="Тестовый issue от PyCharm",
body="Этот issue создан автоматически через Gitea API\n"
"Лабораторная работа: Работа с сетевыми соединениями"
)
except ValueError as e:
print(f"\n❌ Ошибка: {e}")
print("\nРешение:")
print("1. Создайте файл .env в корне проекта")
print("2. Добавьте в него: GITEA_TOKEN=ваш_токен")
print("3. Перезапустите программу")
except Exception as e:
print(f"\n❌ Неожиданная ошибка: {e}")
if __name__ == '__main__':
main()

40
http_requests_client.py Normal file
View File

@ -0,0 +1,40 @@
# http_requests_client.py
import requests
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
def http_get_requests():
"""HTTP GET запрос через библиотеку requests"""
try:
url = "http://vyatsu.ru"
# Выполняем GET запрос
logging.info(f"Запрос к {url}")
response = requests.get(url, timeout=10)
# Информация об ответе
logging.info(f"Статус код: {response.status_code} {response.reason}")
logging.info(f"Время ответа: {response.elapsed.total_seconds():.3f} сек")
logging.info(f"Размер ответа: {len(response.content)} байт")
# Заголовки ответа
logging.info("Основные заголовки:")
for header in ['content-type', 'server', 'date']:
if header in response.headers:
logging.info(f" {header}: {response.headers[header]}")
# Тело ответа (первые 500 символов)
logging.info(f"Тело ответа:\n{response.text[:500]}")
return response
except requests.exceptions.RequestException as e:
logging.error(f"Ошибка запроса: {e}")
return None
if __name__ == '__main__':
http_get_requests()

61
http_socket_client.py Normal file
View File

@ -0,0 +1,61 @@
# http_socket_client.py
import socket
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
def http_get_socket():
"""HTTP GET запрос через низкоуровневый socket"""
HOST = 'vyatsu.ru'
PORT = 80
# HTTP запрос вручную
request = f"""GET / HTTP/1.1
Host: {HOST}
User-Agent: PyCharm-Socket-Client
Accept: text/html
Connection: close
"""
# Заменяем переносы строк на \r\n как требует HTTP
request = request.replace('\n', '\r\n')
try:
# Создаем TCP соединение
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
logging.info(f"Подключение к {HOST}:{PORT}")
client_socket.connect((HOST, PORT))
logging.info("Отправка HTTP запроса...")
client_socket.sendall(request.encode())
# Получаем ответ
response = b''
while True:
chunk = client_socket.recv(4096)
if not chunk:
break
response += chunk
# Разбираем ответ
response_str = response.decode('utf-8', errors='replace')
# Разделяем заголовки и тело
if '\r\n\r\n' in response_str:
headers, body = response_str.split('\r\n\r\n', 1)
else:
headers = response_str
body = ''
logging.info(f"Статус: {headers.split(chr(10))[0]}")
logging.info(f"Заголовков получено: {len(headers)} символов")
logging.info(f"Тело ответа (первые 300 символов):\n{body[:300]}")
except Exception as e:
logging.error(f"Ошибка: {e}")
if __name__ == '__main__':
http_get_socket()

56
run_all.py Normal file
View File

@ -0,0 +1,56 @@
# run_all.py - универсальный запуск для PyCharm
import subprocess
import sys
import time
import os
def run_script(script_name):
"""Запуск Python скрипта"""
print(f"\n{'=' * 50}")
print(f"Запуск {script_name}")
print(f"{'=' * 50}")
result = subprocess.run([sys.executable, script_name])
return result.returncode
def main():
print("🚀 Запуск всех скриптов лабораторной работы")
print(f"Python: {sys.executable}")
scripts = [
('tcp_server.py', False), # Сервер запускаем в фоне?
('tcp_client.py', True),
('udp_server.py', False),
('udp_client.py', True),
('http_socket_client.py', True),
('http_requests_client.py', True),
('gitea_api_client.py', True),
]
# Запускаем серверы
servers = []
for script, is_client in scripts:
if not is_client:
print(f"\nЗапуск сервера: {script}")
proc = subprocess.Popen([sys.executable, script])
servers.append(proc)
time.sleep(1) # Даем серверу время на запуск
# Запускаем клиенты
for script, is_client in scripts:
if is_client:
time.sleep(0.5)
run_script(script)
# Завершаем серверы
for proc in servers:
proc.terminate()
print(f"Сервер остановлен")
print("\nВсе тесты завершены!")
if __name__ == '__main__':
main()

45
tcp_client.py Normal file
View File

@ -0,0 +1,45 @@
import socket
import time
def run_tcp_client():
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 10000
try:
print(f"CLIENT: Подключение к {SERVER_HOST}:{SERVER_PORT}...")
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((SERVER_HOST, SERVER_PORT))
print("CLIENT: ✅ Соединение установлено\n")
messages = [
"Hello, TCP Server!",
"Как дела?",
"Это тестовое сообщение",
"EXIT"
]
for i, message in enumerate(messages, 1):
print(f"CLIENT: Отправка '{message}'")
client_socket.sendall(message.encode('utf-8'))
response = client_socket.recv(1024)
print(f"CLIENT: Ответ '{response.decode('utf-8')}'\n")
time.sleep(1)
client_socket.close()
print("CLIENT: Соединение закрыто")
except ConnectionRefusedError:
print("CLIENT: ❌ Ошибка - сервер не запущен!")
print("CLIENT: Сначала запустите 'python tcp_server_fixed.py'")
except ConnectionResetError:
print("CLIENT: ❌ Ошибка - соединение разорвано сервером")
except Exception as e:
print(f"CLIENT: ❌ Ошибка: {e}")
if __name__ == '__main__':
run_tcp_client()

58
tcp_server.py Normal file
View File

@ -0,0 +1,58 @@
import socket
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
def run_tcp_server():
HOST = '127.0.0.1'
PORT = 10000
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((HOST, PORT))
server_socket.listen(5)
print(f"SERVER: TCP сервер запущен на {HOST}:{PORT}")
print("SERVER: Ожидание подключений...")
print("SERVER: Нажмите Ctrl+C для остановки\n")
try:
while True:
client_socket, client_address = server_socket.accept()
print(f"SERVER: Клиент {client_address} подключился")
try:
while True:
data = client_socket.recv(1024)
if not data:
print(f"SERVER: Клиент {client_address} отключился")
break
received_text = data.decode('utf-8')
print(f"SERVER: Получено '{received_text}' от {client_address}")
modified_data = data.upper()
client_socket.sendall(modified_data)
print(f"SERVER: Отправлено '{modified_data.decode('utf-8')}'")
if modified_data == b'EXIT':
print(f"SERVER: Команда EXIT от {client_address}")
break
except ConnectionResetError:
print(f"SERVER: Клиент {client_address} разорвал соединение")
except Exception as e:
print(f"SERVER: Ошибка с {client_address}: {e}")
finally:
client_socket.close()
print(f"SERVER: Соединение с {client_address} закрыто\n")
except KeyboardInterrupt:
print("\nSERVER: Остановка сервера...")
if __name__ == '__main__':
run_tcp_server()

43
udp_client.py Normal file
View File

@ -0,0 +1,43 @@
# udp_client.py
import socket
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
def run_udp_client():
"""UDP клиент - подключение без установки соединения"""
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 10001
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client_socket:
# UDP не требует connect(), но можно вызвать для удобства
# client_socket.connect((SERVER_HOST, SERVER_PORT))
messages = [
"Hello UDP Server!",
"UDP быстрее, но ненадежнее",
"EXIT"
]
for message in messages:
logging.info(f"📤 Отправка UDP: {message}")
# Отправляем дейтаграмму
client_socket.sendto(message.encode('utf-8'), (SERVER_HOST, SERVER_PORT))
# Получаем ответ (с таймаутом)
client_socket.settimeout(2)
try:
response, server_address = client_socket.recvfrom(1024)
logging.info(f"📥 Ответ: {response.decode('utf-8')}")
except socket.timeout:
logging.warning("⚠️ Таймаут: ответ не получен")
import time
time.sleep(1)
if __name__ == '__main__':
run_udp_client()

42
udp_server.py Normal file
View File

@ -0,0 +1,42 @@
# udp_server.py
import socket
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
def run_udp_server():
"""UDP сервер без установки соединения"""
HOST = '127.0.0.1'
PORT = 10001
# Создаем UDP сокет
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server_socket:
server_socket.bind((HOST, PORT))
logging.info(f"🚀 UDP сервер запущен на {HOST}:{PORT}")
logging.info("UDP не требует установки соединения, просто ждет данные...")
try:
while True:
# Получаем данные и адрес отправителя
data, client_address = server_socket.recvfrom(1024)
received_text = data.decode('utf-8')
logging.info(f"📥 Получено от {client_address}: {received_text}")
# Отправляем обратно в верхнем регистре
modified_data = data.upper()
server_socket.sendto(modified_data, client_address)
logging.info(f"📤 Отправлено {client_address}: {modified_data.decode('utf-8')}")
if modified_data == b'EXIT':
logging.info("Получена команда выхода")
break
except KeyboardInterrupt:
logging.info("\nСервер остановлен")
if __name__ == '__main__':
run_udp_server()