generated from unai/python_boilerplate
Dev #12
@ -2,6 +2,7 @@
|
||||
|
||||
import logging
|
||||
import time
|
||||
import xml.etree.ElementTree as ET
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
@ -24,7 +25,7 @@ class PlaylistManager:
|
||||
PUBLIC_DIR.mkdir(exist_ok=True)
|
||||
|
||||
def fetch_and_generate(self):
|
||||
"""Descarga datos y regenera el archivo M3U."""
|
||||
"""Descarga datos y regenera el archivo M3U y EPG."""
|
||||
logger.info("Iniciando actualización de playlist...")
|
||||
|
||||
url = f"{settings.host}/player_api.php"
|
||||
@ -44,11 +45,95 @@ class PlaylistManager:
|
||||
f"Playlist actualizada exitosamente. Total canales: {len(data)}"
|
||||
)
|
||||
|
||||
# Fetch and write EPG data
|
||||
self.fetch_and_write_epg()
|
||||
|
||||
except requests.RequestException as e:
|
||||
logger.error(f"Error de red al obtener playlist: {e}")
|
||||
except Exception as e:
|
||||
logger.exception(f"Error inesperado actualizando playlist: {e}")
|
||||
|
||||
def fetch_and_write_epg(self):
|
||||
"""Descarga y guarda los datos EPG desde el servidor XMLTV."""
|
||||
logger.info("Iniciando descarga de EPG...")
|
||||
|
||||
url = f"{settings.host}/xmltv.php"
|
||||
params = {
|
||||
"username": settings.username,
|
||||
"password": settings.password,
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.get(url, params=params, timeout=60)
|
||||
response.raise_for_status()
|
||||
|
||||
# Parse and filter EPG XML
|
||||
filtered_epg = self._filter_epg(response.content)
|
||||
self._write_epg(filtered_epg)
|
||||
|
||||
logger.info("EPG actualizado exitosamente.")
|
||||
|
||||
except requests.RequestException as e:
|
||||
logger.error(f"Error de red al obtener EPG: {e}")
|
||||
except ET.ParseError as e:
|
||||
logger.error(f"Error parseando XML de EPG: {e}")
|
||||
except Exception as e:
|
||||
logger.exception(f"Error inesperado actualizando EPG: {e}")
|
||||
|
||||
def _filter_epg(self, xml_content: bytes) -> ET.Element:
|
||||
"""Filtra el EPG para incluir solo canales que pasan los filtros."""
|
||||
root = ET.fromstring(xml_content)
|
||||
|
||||
# Get list of channel IDs to keep based on filters
|
||||
channels_to_remove = []
|
||||
for channel in root.findall("channel"):
|
||||
display_name = channel.find("display-name")
|
||||
name = display_name.text if display_name is not None else ""
|
||||
|
||||
should_remove = False
|
||||
|
||||
# Apply include filter (startswith)
|
||||
if settings.include_text:
|
||||
if not any(
|
||||
name.lower().startswith(inc_text.lower())
|
||||
for inc_text in settings.include_text
|
||||
):
|
||||
should_remove = True
|
||||
|
||||
# Apply exclude filter (contains)
|
||||
if not should_remove and settings.exclude_text:
|
||||
if any(
|
||||
exc_text.lower() in name.lower()
|
||||
for exc_text in settings.exclude_text
|
||||
):
|
||||
should_remove = True
|
||||
|
||||
if should_remove:
|
||||
channels_to_remove.append(channel.get("id"))
|
||||
|
||||
# Remove filtered channels
|
||||
for channel in root.findall("channel"):
|
||||
if channel.get("id") in channels_to_remove:
|
||||
root.remove(channel)
|
||||
|
||||
# Remove programmes for filtered channels
|
||||
for programme in root.findall("programme"):
|
||||
if programme.get("channel") in channels_to_remove:
|
||||
root.remove(programme)
|
||||
|
||||
return root
|
||||
|
||||
def _write_epg(self, epg_root: ET.Element):
|
||||
"""Escribe el archivo EPG en disco de forma atómica."""
|
||||
temp_file = PUBLIC_DIR / "epg.xml.tmp"
|
||||
final_file = PUBLIC_DIR / "epg.xml"
|
||||
|
||||
tree = ET.ElementTree(epg_root)
|
||||
tree.write(temp_file, encoding="utf-8", xml_declaration=True)
|
||||
|
||||
# Reemplazo atómico
|
||||
temp_file.replace(final_file)
|
||||
|
||||
def _write_m3u(self, channels: list):
|
||||
"""Escribe el archivo M3U en disco de forma atómica."""
|
||||
temp_file = PUBLIC_DIR / f"{settings.output_file}.tmp"
|
||||
@ -61,6 +146,9 @@ class PlaylistManager:
|
||||
stream_id = channel.get("stream_id")
|
||||
icon = channel.get("stream_icon", "")
|
||||
cat_id = channel.get("category_id", "")
|
||||
epg_id = channel.get("epg_channel_id", "")
|
||||
if not epg_id:
|
||||
epg_id = name
|
||||
# Filtros de inclusión/exclusión por prefijo de nombre
|
||||
if settings.include_text:
|
||||
if not any(
|
||||
@ -81,7 +169,7 @@ class PlaylistManager:
|
||||
)
|
||||
|
||||
extinf_line = (
|
||||
f'#EXTINF:-1 tvg-id="{name}" tvg-logo="{icon}" '
|
||||
f'#EXTINF:-1 tvg-id="{epg_id}" tvg-logo="{icon}" '
|
||||
f'group-title="Cat_{cat_id}",{name}\n'
|
||||
)
|
||||
f.write(extinf_line)
|
||||
|
||||
@ -66,10 +66,11 @@ class TestPlaylistManager:
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
with patch.object(manager, "_write_m3u") as mock_write:
|
||||
manager.fetch_and_generate()
|
||||
with patch.object(manager, "fetch_and_write_epg"):
|
||||
manager.fetch_and_generate()
|
||||
|
||||
mock_get.assert_called_once()
|
||||
mock_write.assert_called_once_with(sample_channels)
|
||||
mock_get.assert_called_once()
|
||||
mock_write.assert_called_once_with(sample_channels)
|
||||
|
||||
def test_fetch_and_generate_correct_api_call(self):
|
||||
"""Test: fetch_and_generate llama a la API correctamente."""
|
||||
@ -89,17 +90,18 @@ class TestPlaylistManager:
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
with patch.object(manager, "_write_m3u"):
|
||||
manager.fetch_and_generate()
|
||||
with patch.object(manager, "fetch_and_write_epg"):
|
||||
manager.fetch_and_generate()
|
||||
|
||||
mock_get.assert_called_once_with(
|
||||
"http://test-iptv.com/player_api.php",
|
||||
params={
|
||||
"username": "testuser",
|
||||
"password": "testpass",
|
||||
"action": "get_live_streams",
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
mock_get.assert_called_once_with(
|
||||
"http://test-iptv.com/player_api.php",
|
||||
params={
|
||||
"username": "testuser",
|
||||
"password": "testpass",
|
||||
"action": "get_live_streams",
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
def test_fetch_and_generate_request_exception(self, caplog):
|
||||
"""Test: fetch_and_generate maneja RequestException sin crash."""
|
||||
@ -830,3 +832,419 @@ class TestPlaylistManager:
|
||||
# "News ABC" y "ZABC Channel" NO deben estar
|
||||
assert "News ABC" not in content
|
||||
assert "ZABC Channel" not in content
|
||||
|
||||
# ========================================
|
||||
# Tests para fetch_and_write_epg - EPG Functionality
|
||||
# ========================================
|
||||
|
||||
def test_fetch_and_write_epg_success(self, temp_dir):
|
||||
"""Test: fetch_and_write_epg descarga y guarda EPG correctamente."""
|
||||
sample_epg = b'''<?xml version="1.0" encoding="UTF-8"?>
|
||||
<tv>
|
||||
<channel id="ch1">
|
||||
<display-name>ESPN Sports</display-name>
|
||||
</channel>
|
||||
<programme channel="ch1" start="20260203" stop="20260203">
|
||||
<title>Sports Event</title>
|
||||
</programme>
|
||||
</tv>'''
|
||||
|
||||
with patch("m3u_list_builder.playlist.PUBLIC_DIR", temp_dir):
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
mock_settings.include_text = []
|
||||
mock_settings.exclude_text = []
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
|
||||
manager = PlaylistManager()
|
||||
|
||||
with patch("m3u_list_builder.playlist.requests.get") as mock_get:
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = sample_epg
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
manager.fetch_and_write_epg()
|
||||
|
||||
# Verify EPG file was written
|
||||
epg_file = temp_dir / "epg.xml"
|
||||
assert epg_file.exists()
|
||||
content = epg_file.read_text()
|
||||
assert "ESPN Sports" in content
|
||||
|
||||
def test_fetch_and_write_epg_correct_api_call(self, temp_dir):
|
||||
"""Test: fetch_and_write_epg llama a la API correctamente."""
|
||||
sample_epg = b'<?xml version="1.0"?><tv></tv>'
|
||||
|
||||
with patch("m3u_list_builder.playlist.PUBLIC_DIR", temp_dir):
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
mock_settings.include_text = []
|
||||
mock_settings.exclude_text = []
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
|
||||
manager = PlaylistManager()
|
||||
|
||||
with patch("m3u_list_builder.playlist.requests.get") as mock_get:
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = sample_epg
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
manager.fetch_and_write_epg()
|
||||
|
||||
mock_get.assert_called_once_with(
|
||||
"http://test-iptv.com/xmltv.php",
|
||||
params={
|
||||
"username": "testuser",
|
||||
"password": "testpass",
|
||||
},
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
def test_fetch_and_write_epg_request_exception(self, caplog):
|
||||
"""Test: fetch_and_write_epg maneja RequestException sin crash."""
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
|
||||
manager = PlaylistManager()
|
||||
|
||||
with patch("m3u_list_builder.playlist.requests.get") as mock_get:
|
||||
mock_get.side_effect = requests.RequestException("Connection failed")
|
||||
|
||||
manager.fetch_and_write_epg()
|
||||
|
||||
assert "Error de red al obtener EPG" in caplog.text
|
||||
|
||||
def test_fetch_and_write_epg_parse_error(self, caplog):
|
||||
"""Test: fetch_and_write_epg maneja error de parsing XML."""
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
|
||||
manager = PlaylistManager()
|
||||
|
||||
with patch("m3u_list_builder.playlist.requests.get") as mock_get:
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = b"invalid xml <not closed"
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
manager.fetch_and_write_epg()
|
||||
|
||||
assert "Error parseando XML de EPG" in caplog.text
|
||||
|
||||
def test_fetch_and_write_epg_unexpected_exception(self, caplog):
|
||||
"""Test: fetch_and_write_epg maneja excepciones inesperadas."""
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
mock_settings.include_text = []
|
||||
mock_settings.exclude_text = []
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
|
||||
manager = PlaylistManager()
|
||||
|
||||
with patch("m3u_list_builder.playlist.requests.get") as mock_get:
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = b'<?xml version="1.0"?><tv></tv>'
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
with patch.object(
|
||||
manager, "_write_epg", side_effect=RuntimeError("Disk full")
|
||||
):
|
||||
manager.fetch_and_write_epg()
|
||||
|
||||
assert "Error inesperado actualizando EPG" in caplog.text
|
||||
|
||||
# ========================================
|
||||
# Tests para _filter_epg - Filtrado de EPG
|
||||
# ========================================
|
||||
|
||||
def test_filter_epg_include_text_filters_by_prefix(self):
|
||||
"""Test: _filter_epg filtra canales que EMPIEZAN con el texto."""
|
||||
sample_epg = b'''<?xml version="1.0" encoding="UTF-8"?>
|
||||
<tv>
|
||||
<channel id="ch1">
|
||||
<display-name>ESPN Sports</display-name>
|
||||
</channel>
|
||||
<channel id="ch2">
|
||||
<display-name>Sports Center ESPN</display-name>
|
||||
</channel>
|
||||
<channel id="ch3">
|
||||
<display-name>HBO Movies</display-name>
|
||||
</channel>
|
||||
<programme channel="ch1" start="20260203" stop="20260203">
|
||||
<title>Sports Event</title>
|
||||
</programme>
|
||||
<programme channel="ch2" start="20260203" stop="20260203">
|
||||
<title>Center Show</title>
|
||||
</programme>
|
||||
<programme channel="ch3" start="20260203" stop="20260203">
|
||||
<title>Movie</title>
|
||||
</programme>
|
||||
</tv>'''
|
||||
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
mock_settings.include_text = ["ESPN"]
|
||||
mock_settings.exclude_text = []
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
manager = PlaylistManager()
|
||||
result = manager._filter_epg(sample_epg)
|
||||
|
||||
# Convert to string for easier assertion
|
||||
result_str = ET.tostring(result, encoding="unicode")
|
||||
|
||||
# ESPN Sports starts with ESPN - should be included
|
||||
assert "ESPN Sports" in result_str
|
||||
# Sports Center ESPN does NOT start with ESPN - should be filtered
|
||||
assert "Sports Center ESPN" not in result_str
|
||||
# HBO Movies - should be filtered
|
||||
assert "HBO Movies" not in result_str
|
||||
# Programme for ch1 should remain
|
||||
assert "Sports Event" in result_str
|
||||
# Programmes for ch2 and ch3 should be removed
|
||||
assert "Center Show" not in result_str
|
||||
assert "Movie" not in result_str
|
||||
|
||||
def test_filter_epg_exclude_text_filters_by_contains(self):
|
||||
"""Test: _filter_epg excluye canales que CONTIENEN el texto."""
|
||||
sample_epg = b'''<?xml version="1.0" encoding="UTF-8"?>
|
||||
<tv>
|
||||
<channel id="ch1">
|
||||
<display-name>ESPN Sports</display-name>
|
||||
</channel>
|
||||
<channel id="ch2">
|
||||
<display-name>Adult Content</display-name>
|
||||
</channel>
|
||||
<programme channel="ch1" start="20260203" stop="20260203">
|
||||
<title>Sports Event</title>
|
||||
</programme>
|
||||
<programme channel="ch2" start="20260203" stop="20260203">
|
||||
<title>Adult Show</title>
|
||||
</programme>
|
||||
</tv>'''
|
||||
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
mock_settings.include_text = []
|
||||
mock_settings.exclude_text = ["Adult"]
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
manager = PlaylistManager()
|
||||
result = manager._filter_epg(sample_epg)
|
||||
|
||||
result_str = ET.tostring(result, encoding="unicode")
|
||||
|
||||
# ESPN Sports - should remain
|
||||
assert "ESPN Sports" in result_str
|
||||
# Adult Content - should be filtered
|
||||
assert "Adult Content" not in result_str
|
||||
# Programme for ch1 should remain
|
||||
assert "Sports Event" in result_str
|
||||
# Programme for ch2 should be removed
|
||||
assert "Adult Show" not in result_str
|
||||
|
||||
def test_filter_epg_no_filters_keeps_all(self):
|
||||
"""Test: _filter_epg sin filtros mantiene todos los canales."""
|
||||
sample_epg = b'''<?xml version="1.0" encoding="UTF-8"?>
|
||||
<tv>
|
||||
<channel id="ch1">
|
||||
<display-name>Channel 1</display-name>
|
||||
</channel>
|
||||
<channel id="ch2">
|
||||
<display-name>Channel 2</display-name>
|
||||
</channel>
|
||||
</tv>'''
|
||||
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
mock_settings.include_text = []
|
||||
mock_settings.exclude_text = []
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
manager = PlaylistManager()
|
||||
result = manager._filter_epg(sample_epg)
|
||||
|
||||
result_str = ET.tostring(result, encoding="unicode")
|
||||
|
||||
assert "Channel 1" in result_str
|
||||
assert "Channel 2" in result_str
|
||||
|
||||
def test_filter_epg_include_and_exclude_combined(self):
|
||||
"""Test: _filter_epg aplica include y exclude juntos."""
|
||||
sample_epg = b'''<?xml version="1.0" encoding="UTF-8"?>
|
||||
<tv>
|
||||
<channel id="ch1">
|
||||
<display-name>ESPN Sports</display-name>
|
||||
</channel>
|
||||
<channel id="ch2">
|
||||
<display-name>ESPN Adult</display-name>
|
||||
</channel>
|
||||
<channel id="ch3">
|
||||
<display-name>HBO Movies</display-name>
|
||||
</channel>
|
||||
</tv>'''
|
||||
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
mock_settings.include_text = ["ESPN"]
|
||||
mock_settings.exclude_text = ["Adult"]
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
manager = PlaylistManager()
|
||||
result = manager._filter_epg(sample_epg)
|
||||
|
||||
result_str = ET.tostring(result, encoding="unicode")
|
||||
|
||||
# ESPN Sports - starts with ESPN, no Adult - should remain
|
||||
assert "ESPN Sports" in result_str
|
||||
# ESPN Adult - starts with ESPN but contains Adult - should be filtered
|
||||
assert "ESPN Adult" not in result_str
|
||||
# HBO Movies - doesn't start with ESPN - should be filtered
|
||||
assert "HBO Movies" not in result_str
|
||||
|
||||
def test_filter_epg_case_insensitive(self):
|
||||
"""Test: _filter_epg es case-insensitive."""
|
||||
sample_epg = b'''<?xml version="1.0" encoding="UTF-8"?>
|
||||
<tv>
|
||||
<channel id="ch1">
|
||||
<display-name>ESPN Sports</display-name>
|
||||
</channel>
|
||||
</tv>'''
|
||||
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
mock_settings.include_text = ["espn"] # lowercase
|
||||
mock_settings.exclude_text = []
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
manager = PlaylistManager()
|
||||
result = manager._filter_epg(sample_epg)
|
||||
|
||||
result_str = ET.tostring(result, encoding="unicode")
|
||||
|
||||
# Should still match despite case difference
|
||||
assert "ESPN Sports" in result_str
|
||||
|
||||
# ========================================
|
||||
# Tests para _write_epg
|
||||
# ========================================
|
||||
|
||||
def test_write_epg_creates_file(self, temp_dir):
|
||||
"""Test: _write_epg crea el archivo EPG correctamente."""
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
with patch("m3u_list_builder.playlist.PUBLIC_DIR", temp_dir):
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
|
||||
manager = PlaylistManager()
|
||||
|
||||
root = ET.Element("tv")
|
||||
channel = ET.SubElement(root, "channel", id="ch1")
|
||||
name = ET.SubElement(channel, "display-name")
|
||||
name.text = "Test Channel"
|
||||
|
||||
manager._write_epg(root)
|
||||
|
||||
epg_file = temp_dir / "epg.xml"
|
||||
assert epg_file.exists()
|
||||
content = epg_file.read_text()
|
||||
assert "Test Channel" in content
|
||||
|
||||
def test_write_epg_atomic_replacement(self, temp_dir):
|
||||
"""Test: _write_epg reemplaza atómicamente."""
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
epg_file = temp_dir / "epg.xml"
|
||||
temp_file = temp_dir / "epg.xml.tmp"
|
||||
|
||||
# Create existing file
|
||||
epg_file.write_text("OLD CONTENT")
|
||||
|
||||
with patch("m3u_list_builder.playlist.PUBLIC_DIR", temp_dir):
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
|
||||
manager = PlaylistManager()
|
||||
|
||||
root = ET.Element("tv")
|
||||
manager._write_epg(root)
|
||||
|
||||
# Temp file should not exist after
|
||||
assert not temp_file.exists()
|
||||
# Final file should have new content
|
||||
assert "OLD CONTENT" not in epg_file.read_text()
|
||||
|
||||
def test_fetch_and_generate_calls_epg(self, sample_channels):
|
||||
"""Test: fetch_and_generate también llama a fetch_and_write_epg."""
|
||||
with patch("m3u_list_builder.playlist.settings") as mock_settings:
|
||||
mock_settings.host = "http://test-iptv.com"
|
||||
mock_settings.username = "testuser"
|
||||
mock_settings.password = "testpass"
|
||||
mock_settings.output_file = "test_playlist.m3u"
|
||||
|
||||
from m3u_list_builder.playlist import PlaylistManager
|
||||
|
||||
manager = PlaylistManager()
|
||||
|
||||
with patch("m3u_list_builder.playlist.requests.get") as mock_get:
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = sample_channels
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
with patch.object(manager, "_write_m3u"):
|
||||
with patch.object(
|
||||
manager, "fetch_and_write_epg"
|
||||
) as mock_epg:
|
||||
manager.fetch_and_generate()
|
||||
|
||||
mock_epg.assert_called_once()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user