WIP: feat: Unify 5 RDF converter scripts using Strategy Pattern #22
Unifies five separate RDF converter scripts into a single maintainable codebase (convert_rdf_to_hf_dataset_unified.py) using the Strategy Pattern to support standard, streaming, and parallel processing.
Implements an auto-selection mode that dynamically chooses the optimal conversion strategy based on file size and format, alongside improved Rich-based progress tracking.
Refactors the main upload_all_datasets.2.py script to simplify the pipeline by removing complex branching logic and delegating all conversions to the new unified tool.
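For orientation, here is a minimal sketch of the shape the Strategy Pattern takes in this PR. The names ConversionConfig, ConversionResult, and STRATEGIES come from the diff hunks quoted in the review below; the bodies are illustrative only, not the actual implementation:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class ConversionConfig:
    input_path: Path
    output_path: Path
    rdf_format: str = "turtle"
    chunk_size: int = 10000


@dataclass
class ConversionResult:
    success: bool
    total_triples: int = 0
    strategy_used: Optional[str] = None


class ConversionStrategy(ABC):
    """Interface every conversion strategy implements."""

    @abstractmethod
    def convert(self, config: ConversionConfig) -> ConversionResult: ...


class StandardStrategy(ConversionStrategy):
    def convert(self, config: ConversionConfig) -> ConversionResult:
        # Parse the whole file in memory (small inputs).
        return ConversionResult(success=True, strategy_used="standard")


class StreamingTurtleStrategy(ConversionStrategy):
    def convert(self, config: ConversionConfig) -> ConversionResult:
        # Stream the file in chunks (large Turtle inputs).
        return ConversionResult(success=True, strategy_used="streaming-turtle")


# Strategy name to class mapping, as in the diff's STRATEGIES dict.
STRATEGIES: dict[str, type] = {
    "standard": StandardStrategy,
    "streaming-turtle": StreamingTurtleStrategy,
}


def select_strategy(config: ConversionConfig) -> ConversionStrategy:
    """Auto-selection in the spirit of the PR: driven by size and format."""
    file_size_mb = config.input_path.stat().st_size / (1024 * 1024)
    if config.rdf_format in ("turtle", "ttl") and file_size_mb > 100:
        return STRATEGIES["streaming-turtle"]()
    return STRATEGIES["standard"]()
```

The calling script then only picks a strategy name (or lets auto-selection pick one) and calls convert(); the per-format branching stays inside the strategies.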
Are the files convert_rdf_to_hf_dataset.*.py other than convert_rdf_to_hf_dataset_unified.py supposed to go away? If so, then please ignore my comments inside them and delete those files instead of fixing them.

I have read about 800 lines of convert_rdf_to_hf_dataset_unified.py, and I know that I'll have more comments. But I need to study for the evening, so I'll pass you 72 notes.
@ -49,6 +49,8 @@
> import json
> import logging
> import multiprocessing as mp
> import shutil
> import shutil

Line 51 is import shutil. You don't need to duplicate it.

@ -577,0 +584,4 @@
> # Save final dataset
> console.print("[yellow]Saving final dataset...[/yellow]")
> dataset_dict.save_to_disk(str(output_path))

Lines 563-587 are almost identical to lines 589-613. This is really bad form; a future programmer will make a change to one and forget the other. Could you please rewrite these lines to remove duplicate code? (If you have any problems, I am glad to help.)

I like the use of line 561 to refer to the directory being written. You can use the same idea when clean_cache is off, with something similar to the first variant sketched below. That code would require clean-up at the end. If you don't want to do clean-up, you should be able to do something like the second variant, but I find that harder to understand. Your choice.
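A minimal sketch of what the two variants could look like (assuming the temp_chunks_dir, output_path, clean_cache, and DatasetDict names used elsewhere in the diff; these are not the reviewer's original snippets):

```python
import contextlib
import shutil
import tempfile
from pathlib import Path

from datasets import Dataset, DatasetDict


def save_with_explicit_cleanup(temp_chunks_dir: Path, output_path: Path) -> None:
    """Variant 1: name the cache directory yourself and clean it up at the end."""
    cache_dir = tempfile.mkdtemp(prefix="rdf_convert_cache_")
    try:
        dataset = Dataset.from_parquet(str(temp_chunks_dir / "*.parquet"), cache_dir=cache_dir)
        DatasetDict({"data": dataset}).save_to_disk(str(output_path))
    finally:
        shutil.rmtree(cache_dir, ignore_errors=True)


def save_without_cleanup(temp_chunks_dir: Path, output_path: Path, clean_cache: bool) -> None:
    """Variant 2: pick the cache context up front; nothing to clean up afterwards."""
    cache_ctx = tempfile.TemporaryDirectory() if clean_cache else contextlib.nullcontext(None)
    with cache_ctx as cache_dir:
        dataset = Dataset.from_parquet(str(temp_chunks_dir / "*.parquet"), cache_dir=cache_dir)
        DatasetDict({"data": dataset}).save_to_disk(str(output_path))
```

Either way, the near-duplicate save blocks collapse into a single call to one helper.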
@ -634,7 +668,9 @@ def main():
> parser.add_argument("--description", help="Dataset description")
> parser.add_argument("--citation", help="Dataset citation")
> parser.add_argument("--homepage", help="Dataset homepage URL")
> parser.add_argument("--homepage", help="Dataset homepage URL")

Lines 670 and 671 are duplicates.
@ -704,0 +727,4 @@
> console.print(f"[red]Error saving dataset: {e}[/red]")
> console.print(f"[yellow]Output path: {output_path}[/yellow]")
> console.print(f"[yellow]Check disk space and permissions[/yellow]")
> raise e

Lines 707-730 and lines 732-755 are almost duplicates. I don't know if you copy/pasted this or if an LLM generated this, but duplicate code is usually bad, especially when it's as easy as this to fix. Here's an explainer about code duplication: https://axify.io/blog/code-duplication
@ -407,0 +413,4 @@
> # Save final dataset
> console.print("[yellow]Saving final dataset...[/yellow]")
> dataset_dict.save_to_disk(str(output_path))

Lines 401-416 and lines 418-433 are fairly similar, except for where and how in the code dataset_dict is initialized. They should be easy to merge.

@ -308,3 +311,2 @@
> # Read all Parquet files as a single dataset
> dataset = Dataset.from_parquet(str(temp_chunks_dir / "*.parquet"))
> print(f"\nPROGRESS: 90", flush=True)
> # Read all Parquet files as a single dataset

Duplicate line.

@ -322,0 +330,4 @@
> # Save final dataset
> console.print("[yellow]Saving final dataset...[/yellow]")
> print(f"\nPROGRESS: 95", flush=True)
> dataset_dict.save_to_disk(str(output_path))

Lines 315-333 and lines 335-352 are almost duplicates.
@ -367,7 +394,9 @@ def main():
> )
> parser.add_argument("--description", type=str, help="Dataset description")
> parser.add_argument("--homepage", type=str, help="Dataset homepage")
> parser.add_argument("--homepage", type=str, help="Dataset homepage")

You don't need another --homepage argument.

@ -0,0 +1,2208 @@
> #!/usr/bin/env python3

When I run ruff check, it reports that the following lines are too long. I don't have the patience to copy and paste for each of these lines: 275, 413, 451, 460, 466, 485, 495, 518, 563, 594, 631, 647, 668, 676, 689, 716, 718, 720, 721, 729, 845, 855, 858, 865, 867, 875, 878, 884, 895, 914, 930, 944, 959, 971, 989, 1021, 1042, 1088, 1161, 1163, 1174, 1176, 1188, 1190, 1192, 1201, 1204, 1211, 1229, 1235, 1236, 1256, 1260, 1263, 1265, 1303, 1312, 1330, 1382, 1383, 1393, 1395, 1402, 1404, 1411, 1414, 1420, 1439, 1440, 1459, 1463, 1472, 1473, 1488, 1504, 1528, 1538, 1553, 1601, 1602, 1603, 1613, 1615, 1617, 1627, 1632, 1634, 1643, 1647, 1655, 1673, 1679, 1680, 1682, 1701, 1709, 1713, 1717, 1718, 1733, 1735, 1741, 1771, 1776, 1795, 1802, 1804, 1820, 1830, 1838, 1974, 1979, 1989, 1993, 2011, and 2055.
@ -0,0 +34,4 @@"object": "value","object_type": "uri|literal|blank_node","object_datatype": "xsd:string or None","object_language": "en or None"We DEFINITELY have data that's not in English.
Corrected the docstring
@ -0,0 +56,4 @@
> import time
> from abc import ABC, abstractmethod
> from contextlib import contextmanager
> from dataclasses import dataclass, field

field is never used. According to ruff check:

@ -0,0 +60,4 @@
> from io import BytesIO
> from multiprocessing import Manager, Pool
> from pathlib import Path
> from typing import Any, Iterator, Optional

According to ruff check:

@ -0,0 +73,4 @@
> SpinnerColumn,
> TextColumn,
> TimeElapsedColumn,
> TimeRemainingColumn,

TimeRemainingColumn is never used. According to ruff check:

@ -0,0 +91,4 @@
> output_path: Path
> rdf_format: str = "turtle"
> chunk_size: int = 10000
> num_workers: Optional[int] = None

According to ruff check:

(I find Optional easier to understand myself... Crap.)

@ -0,0 +92,4 @@
> rdf_format: str = "turtle"
> chunk_size: int = 10000
> num_workers: Optional[int] = None
> metadata: Optional[dict[str, Any]] = None

According to ruff check:

@ -0,0 +105,4 @@
> success: bool
> total_triples: int = 0
> processing_time_seconds: float = 0.0
> output_path: Optional[Path] = None

According to ruff check:

@ -0,0 +106,4 @@
> total_triples: int = 0
> processing_time_seconds: float = 0.0
> output_path: Optional[Path] = None
> error_message: Optional[str] = None

According to ruff check:

@ -0,0 +107,4 @@
> processing_time_seconds: float = 0.0
> output_path: Optional[Path] = None
> error_message: Optional[str] = None
> strategy_used: Optional[str] = None

According to ruff check:
ruff check:@ -0,0 +123,4 @@"""@staticmethoddef extract_triple(subject, predicate, obj) -> dict[str, Any]:It looks like you can use
at this time. But make sure that the
TripleExtractorhandles objects that are themselves triples!A few clarifications on the current implementation:
TripleExtractor class removed: We refactored extract_triple to be a module-level function (line 117) instead of a class method. This was necessary for multiprocessing pickling support, class methods can have serialization issues when passed to worker processes
Return type dict[str, Any] is correct: We can't use dict[str, str] because object_datatype and object_language can be None.
Triple terms (RDF-star/RDF 1.2): Currently, the code handles this by falling through to blank_node
@ -0,0 +141,4 @@object_type = "literal"object_datatype = str(obj.datatype) if obj.datatype else Noneobject_language = obj.language if obj.language else Noneelif isinstance(obj, URIRef):What happens if the object is an IRI but not a URI? See https://www.w3.org/TR/rdf12-concepts/#section-IRIs
The RDFLib documentation and implementation confirm that the URIRef class is designed to handle both URIs (Uniform Resource Identifiers) and IRIs (Internationalized Resource Identifiers), even though its name explicitly references only "URI"
@ -0,0 +145,4 @@object_type = "uri"object_datatype = Noneobject_language = Noneelse:What happens if the object is a triple term? (See https://www.w3.org/TR/rdf12-n-triples/#triple-terms )
Triple terms (RDF-star/RDF 1.2) currently fall through to the else branch and are classified as blank_node.
Do we have any RDF-star datasets that need explicit handling? If so, I can add explicit triple term support as a follow-up.
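To make the discussion concrete, a rough sketch of the module-level extract_triple described above (illustrative; the function in the PR may differ in detail):

```python
from typing import Any, Optional

from rdflib import BNode, Literal, URIRef


def extract_triple(subject, predicate, obj) -> dict[str, Any]:
    """Flatten one RDF triple into the dataset's row schema.

    Module-level (not a method) so it can be pickled and handed to
    multiprocessing workers.
    """
    object_datatype: Optional[str] = None
    object_language: Optional[str] = None

    if isinstance(obj, Literal):
        object_type = "literal"
        object_datatype = str(obj.datatype) if obj.datatype else None
        object_language = obj.language if obj.language else None
    elif isinstance(obj, URIRef):
        # RDFLib's URIRef also carries IRIs, despite the name.
        object_type = "uri"
    else:
        # BNode, and (for now) RDF-star triple terms fall through here too.
        object_type = "blank_node"

    return {
        "subject": str(subject),
        "predicate": str(predicate),
        "object": str(obj),
        "object_type": object_type,
        "object_datatype": object_datatype,
        "object_language": object_language,
    }
```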
@ -0,0 +214,4 @@"""Handle file I/O operations including compression."""@staticmethoddef open_file(file_path: Path, mode: str = "r"):I'm surprised that
pyrightdidn't point out that this method doesn't list what it returns...@ -0,0 +261,4 @@return Falsetry:with open(file_path, "r", encoding="utf-8") as f:According to
ruff check:@ -0,0 +264,4 @@with open(file_path, "r", encoding="utf-8") as f:first_line = f.readline().strip()return first_line.startswith("http://") or first_line.startswith("https://")except Exception:I think that all file errors descend from
OSError, so you should be able to use that instead ofException.@ -0,0 +328,4 @@yield progress, task@contextmanagerdef progress_bar(self, description: str, total: Optional[int] = None,According to
ruff check:@ -0,0 +329,4 @@@contextmanagerdef progress_bar(self, description: str, total: Optional[int] = None,show_count: bool = False, extra_fields: Optional[dict] = None):According to
ruff check:@ -0,0 +352,4 @@task_kwargs = {"total": total}if extra_fields:task_kwargs.update(extra_fields)task = progress.add_task(f"[yellow]{description}[/yellow]", **task_kwargs)pyrightgives the following error:@ -0,0 +402,4 @@"""passdef estimate_memory_usage_mb(self, file_size_mb: float) -> float:estimate_memory_usage_mbis never used in this code.removed the dead code
@ -0,0 +406,4 @@"""Estimate memory requirements in MB."""return file_size_mb * 3.0 # Default: 3x file sizedef supports_format(self, rdf_format: str) -> bool:supports_formatis never used in the code. Is the check missing?removed the dead code
@ -0,0 +414,4 @@"""Save dataset to disk with optional cache cleaning."""config.output_path.mkdir(parents=True, exist_ok=True)dataset_dict.save_to_disk(str(config.output_path))self.progress.print(f"Dataset saved to {config.output_path}", "green")The description says "with optional cache cleaning". Is the cache cleaning missing?
@ -0,0 +448,4 @@def supports_format(self, rdf_format: str) -> bool:"""Standard strategy supports all formats including TSV."""return rdf_format in ["turtle", "nt", "ntriples", "xml", "n3", "trig", "nquads", "tsv"]Although
supports_formatis never used, if it were used, should you includegeonameshere?@ -0,0 +517,4 @@with self.progress.progress_bar("Parsing TSV triples...", show_count=True) as (progress, task):# Count lineswith open(file_path, "r", encoding="utf-8") as f:According to
ruff check:@ -0,0 +521,4 @@total_lines = sum(1 for _ in f)progress.update(task, total=total_lines)with open(file_path, "r", encoding="utf-8") as f:According to
ruff check:@ -0,0 +532,4 @@continuesubject, predicate, obj = partstriples.append({You're duplicating code. You should use
TripleExtractor::extract_triplehere.The TSV parsing intentionally doesn't use extract_triple() because they handle different data types. There's no RDFLib parsing involved, we can't create Literal or URIRef objects from these strings because they're not valid IRIs or typed literals.
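Even if the TSV path keeps its own parsing, both paths could still share the record construction so the output schema is defined in exactly one place. A sketch (make_record is a hypothetical helper; the field names follow the docstring hunk quoted earlier):

```python
from typing import Any, Optional


def make_record(
    subject: str,
    predicate: str,
    obj: str,
    object_type: str,
    object_datatype: Optional[str] = None,
    object_language: Optional[str] = None,
) -> dict[str, Any]:
    """Single definition of the per-triple row schema, shared by the
    RDFLib and TSV code paths."""
    return {
        "subject": subject,
        "predicate": predicate,
        "object": obj,
        "object_type": object_type,
        "object_datatype": object_datatype,
        "object_language": object_language,
    }
```

extract_triple() would call make_record() after classifying the RDFLib term, and the TSV loop would call it directly with whatever object_type is appropriate for TSV rows.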
@ -0,0 +567,4 @@
> def update_progress():
> while not stop_updates.is_set():
> elapsed = time.time() - parse_start
> pct = min(95, (elapsed / estimated_parse_time) * 100)

Might this line be better as the following? The way the code is currently written, the last 5% of updates will show pct holding at 95% without change.

pct = min(95, (elapsed / estimated_parse_time) * 95). This ensures progress smoothly increases from 0% → 95% over the full estimated time, then jumps to 100% when parsing actually completes.
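For anyone who wants to sanity-check the scaling, a self-contained sketch of the updater thread with the * 95 factor (illustrative; the real code reports through Rich rather than print, and derives the estimate from file size):

```python
import threading
import time

estimated_parse_time = 10.0  # seconds; stands in for file_size_mb * <factor>
parse_start = time.time()
stop_updates = threading.Event()


def update_progress() -> None:
    # Scaling by 95 makes the bar climb steadily to 95% over the whole
    # estimate, instead of reaching 95% early and then stalling there.
    while not stop_updates.is_set():
        elapsed = time.time() - parse_start
        pct = min(95, (elapsed / estimated_parse_time) * 95)
        print(f"PROGRESS: {pct:.0f}", flush=True)
        time.sleep(0.5)


thread = threading.Thread(target=update_progress, daemon=True)
thread.start()
time.sleep(3)  # stands in for graph.parse(...)
stop_updates.set()
thread.join()
print("PROGRESS: 100", flush=True)
```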
@ -0,0 +589,4 @@
> file_content = f.read()
> self.progress.print("Parsing RDF content...", "dim")
> estimated_parse_time = file_size_mb * 2.5

I doubt that size. Look through my comments in CleverDatasets.

@ -0,0 +599,4 @@
> last_progress = 0
> while not stop_updates.is_set():
> elapsed = time.time() - parse_start
> pct = min(95, (elapsed / estimated_parse_time) * 100)

Same comment about the last 5 percent of the estimated time...

@ -0,0 +613,4 @@
> thread.start()
> try:
> graph.parse(BytesIO(file_content), format=config.rdf_format)

pyright gives the following error:

@ -0,0 +630,4 @@
> with self.progress.progress_bar("Converting triples...",
> total=len(graph_list), show_count=True) as (progress, task):
> for idx, (s, p, o) in enumerate(graph_list):
> triples.append(self.triple_extractor.extract_triple(s, p, o))

This looks like it's replicating the work of TripleExtractor::extract_triples_from_graph (except that extract_triples_from_graph doesn't have a progress bar). Should you unify that code?

These are intentionally different implementations serving different purposes:
- extract_triples_from_graph(graph): quick extraction without progress.
- Loop with progress bar: large files where the user needs visual feedback.

The key difference is progress tracking. Python's list comprehension doesn't allow mid-iteration callbacks; to show a progress bar, we need an explicit loop.
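If unifying them ever becomes worthwhile, one option is to pass an optional per-triple callback so both call sites share the same loop. A sketch (extract_all and on_progress are hypothetical names; progress.advance is Rich's standard task-advance call):

```python
from typing import Any, Callable, Iterable, Optional

Triple = tuple[Any, Any, Any]


def extract_all(
    triples_in: Iterable[Triple],
    extract_one: Callable[[Any, Any, Any], dict[str, Any]],
    on_progress: Optional[Callable[[int], None]] = None,
) -> list[dict[str, Any]]:
    """Run one extraction loop, with or without progress reporting."""
    rows: list[dict[str, Any]] = []
    for idx, (s, p, o) in enumerate(triples_in):
        rows.append(extract_one(s, p, o))
        if on_progress is not None:
            on_progress(idx)
    return rows


# Quick path:    extract_all(graph, extract_triple)
# Progress path: extract_all(graph, extract_triple,
#                            on_progress=lambda i: progress.advance(task))
```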
@ -0,0 +666,4 @@
> all_triples = []
> with self.progress.progress_bar("Processing GeoNames chunks...", total=100) as (progress, task):
> with Pool(processes=num_workers) as pool:

According to ruff check:

@ -0,0 +879,4 @@
> dataset_dict.save_to_disk(str(config.output_path))
> else:
> dataset = Dataset.from_parquet(str(data_dir / "*.parquet"))
> dataset_dict = self._create_dataset_dict(dataset, config)

pyright gives the following error:

@ -0,0 +892,4 @@
> self._save_dataset_info(config, total_triples, dataset_dict, elapsed)
> self.progress.print(f"✓ Successfully converted to HuggingFace dataset", "bold green")

According to ruff check:

@ -0,0 +948,4 @@
> with self.file_handler.open_file(config.input_path) as f:
> for line in f:
> line = line.strip()
> if not line or line.startswith("#"):

pyright reports the following:

@ -0,0 +973,4 @@
> graph = Graph()
> with self.file_handler.open_file(config.input_path) as f:
> graph.parse(f, format="turtle")

pyright reports:

@ -0,0 +1016,4 @@
> for worker_chunks in results:
> for chunk in worker_chunks:
> yield chunk

According to ruff check:

@ -0,0 +1199,4 @@
> if config.clean_cache:
> with tempfile.TemporaryDirectory() as temp_cache_dir:
> dataset = Dataset.from_parquet(str(temp_chunks_dir / "*.parquet"), cache_dir=temp_cache_dir)
> dataset_dict = DatasetDict({"data": dataset})

pyright reports:

@ -0,0 +1206,4 @@
> dataset_dict.save_to_disk(str(config.output_path))
> else:
> dataset = Dataset.from_parquet(str(temp_chunks_dir / "*.parquet"))
> dataset_dict = DatasetDict({"data": dataset})

pyright reports:

@ -0,0 +1232,4 @@
> with open(config.output_path / "dataset_info.json", "w") as f:
> json.dump(info, f, indent=2)
> self.progress.print(f"✓ Successfully converted to HuggingFace dataset", "bold green")

According to ruff check:

@ -0,0 +1255,4 @@
> def _stream_turtle_file(self, config: ConversionConfig, re) -> Iterator[list[dict[str, Any]]]:
> """Stream Turtle file with prefix handling."""
> # Detect compression

Why are you using this detection when you have the FileHandler class?

Fixed! _stream_turtle_file now uses FileHandler.open_file() instead of manual compression detection.
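For anyone following along, the shared helper could look roughly like this (a sketch based on the bz2/gzip branches quoted below; the actual FileHandler in the PR may differ):

```python
import bz2
import gzip
from pathlib import Path
from typing import IO


class FileHandler:
    """Handle file I/O operations including compression."""

    @staticmethod
    def open_file(file_path: Path, mode: str = "rt") -> IO[str]:
        # Dispatch on the suffix once, instead of repeating the detection
        # in every strategy.
        suffix = file_path.suffix
        if suffix == ".bz2":
            return bz2.open(file_path, mode, encoding="utf-8", errors="ignore")
        if suffix == ".gz":
            return gzip.open(file_path, mode, encoding="utf-8", errors="ignore")
        return open(file_path, mode, encoding="utf-8", errors="ignore")
```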
@ -0,0 +1260,4 @@
> is_bz2 = config.input_path.suffix == ".bz2" or str(config.input_path).endswith(".ttl.bz2")
> if is_bz2:
> file_obj = bz2.open(config.input_path, "rt", encoding="utf-8", errors="ignore")

According to ruff check:

@ -0,0 +1262,4 @@
> if is_bz2:
> file_obj = bz2.open(config.input_path, "rt", encoding="utf-8", errors="ignore")
> elif is_gzipped:
> file_obj = gzip.open(config.input_path, "rt", encoding="utf-8", errors="ignore")

According to ruff check:

@ -0,0 +1264,4 @@
> elif is_gzipped:
> file_obj = gzip.open(config.input_path, "rt", encoding="utf-8", errors="ignore")
> else:
> file_obj = open(config.input_path, "r", encoding="utf-8", errors="ignore")

According to ruff check:

@ -0,0 +1409,4 @@
> if config.clean_cache:
> with tempfile.TemporaryDirectory() as temp_cache_dir:
> dataset = Dataset.from_parquet(str(temp_chunks_dir / "*.parquet"), cache_dir=temp_cache_dir)
> dataset_dict = DatasetDict({"data": dataset})

pyright reports:

@ -0,0 +1415,4 @@
> dataset_dict.save_to_disk(str(config.output_path))
> else:
> dataset = Dataset.from_parquet(str(temp_chunks_dir / "*.parquet"))
> dataset_dict = DatasetDict({"data": dataset})

pyright reports:

@ -0,0 +1436,4 @@
> with open(config.output_path / "dataset_info.json", "w") as f:
> json.dump(info, f, indent=2)
> self.progress.print(f"✓ Successfully converted to HuggingFace dataset", "bold green")

According to ruff check:

@ -0,0 +1493,4 @@
> with self.file_handler.open_file(config.input_path) as f:
> for line in f:
> if line.startswith("http://") or line.startswith("https://"):

pyright reports:

@ -0,0 +1542,4 @@
> with self.file_handler.open_file(config.input_path) as f:
> for line in f:
> line = line.strip()
> if not line or line.startswith("#"):

pyright reports:

@ -0,0 +1642,4 @@
> with tempfile.TemporaryDirectory() as temp_cache_dir:
> dataset = Dataset.from_parquet(str(temp_chunks_dir / "*.parquet"), cache_dir=temp_cache_dir)
> self.progress.emit_progress(90)
> dataset_dict = DatasetDict({"data": dataset})

pyright reports:

@ -0,0 +1650,4 @@
> else:
> dataset = Dataset.from_parquet(str(temp_chunks_dir / "*.parquet"))
> self.progress.emit_progress(90)
> dataset_dict = DatasetDict({"data": dataset})

pyright reports:

@ -0,0 +1676,4 @@
> with open(config.output_path / "dataset_info.json", "w") as f:
> json.dump(info, f, indent=2)
> self.progress.print(f"✓ Successfully converted to HuggingFace dataset", "bold green")

According to ruff check:

@ -0,0 +1749,4 @@
> current_doc = []
> for line in f:
> if line.startswith("http://") or line.startswith("https://"):

pyright reports:

@ -0,0 +1794,4 @@
> def _stream_turtle_parallel(self, config: ConversionConfig) -> Iterator[list[dict[str, Any]]]:
> """Stream Turtle file with chunked parsing."""
> # Detect compression

Why are you detecting compression here when you have the FileHandler class?

@ -0,0 +1799,4 @@
> is_bz2 = config.input_path.suffix == ".bz2"
> if is_bz2:
> file_obj = bz2.open(config.input_path, "rt", encoding="utf-8", errors="ignore")

According to ruff check:

@ -0,0 +1801,4 @@
> if is_bz2:
> file_obj = bz2.open(config.input_path, "rt", encoding="utf-8", errors="ignore")
> elif is_gzipped:
> file_obj = gzip.open(config.input_path, "rt", encoding="utf-8", errors="ignore")

According to ruff check:

@ -0,0 +1803,4 @@
> elif is_gzipped:
> file_obj = gzip.open(config.input_path, "rt", encoding="utf-8", errors="ignore")
> else:
> file_obj = open(config.input_path, "r", encoding="utf-8", errors="ignore")

According to ruff check:

@ -0,0 +1939,4 @@
> """
> # Strategy name to class mapping
> STRATEGIES = {

According to ruff check:

@ -0,0 +1971,4 @@
> # Selection logic
> if is_geonames:
> # GeoNames always benefits from parallel processing
> self.progress.print(f"Auto-selected: streaming-parallel (GeoNames detected)", "cyan")

According to ruff check:

@ -0,0 +1976,4 @@
> if rdf_format in ("turtle", "ttl") and file_size_mb > 100:
> # Large Turtle files need special handling
> self.progress.print(f"Auto-selected: streaming-turtle (large Turtle file)", "cyan")

According to ruff check:

@ -0,0 +1981,4 @@
> if file_size_mb < 100:
> # Small files: use standard in-memory
> self.progress.print(f"Auto-selected: standard (file < 100MB)", "cyan")

According to ruff check:

@ -0,0 +2130,4 @@
> # Validate input file
> if not args.input.exists():
> print(f"Error: Input file not found: {args.input}")

I would move line 2137 above here, so that it could use progress.print. (This would be very useful when you want to print to something other than the screen; for example, when it's responding to a web request, or when it's running on a server without a screen and printing to a file.)

@ -984,3 +980,1 @@
> Path(__file__).parent / "convert_rdf_to_hf_dataset_streaming_parallel.py")
> # Use unified converter for all standard RDF datasets

Since you're changing how the software is run, could you also change the descriptions in lines 802-820?
GREAT work. Great simplification of multiple files into one. Thank you for your hard work!