fix: fix path too long issue; fix pickling issue in unified converter script #42

Merged
aditya merged 8 commits from path-too-long-fix into stream-download-convert-upload-merge-15-refactor-rdf-converters 2026-01-02 10:12:23 +00:00
Member

The branch fixes a crash caused by hitting a Linux operating system limit: the file path used for interprocess communication (the AF_UNIX socket path created by multiprocessing) became too long. This made the saving process crash, so the dataset was never written to disk and the upload failed as a result. The branch also fixes the pickling issue in the unified converter script.
brent.edwards left a comment
Member

I found a few ways to speed up the code or simplify it, but overall, good work! Approved!
@ -377,0 +389,4 @@
try:
with mp.Pool(processes=num_workers) as pool:
for triples in pool.imap_unordered(process_geonames_lines, batches, chunksize=1):
Member

According to https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap :

For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.

(Also on line 414.)
Author
Member

I now set it equal to 10
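
For context, a minimal sketch of the call shape being discussed; `process_batch` is a hypothetical stand-in for `process_geonames_lines`:

```python
import multiprocessing as mp

def process_batch(lines):
    # Hypothetical stand-in for process_geonames_lines: parse one batch
    # of input lines into a list of records.
    return [line.upper() for line in lines]

if __name__ == "__main__":
    # Fake workload: many small batches, as the converter produces.
    batches = [[f"line-{i}-{j}" for j in range(100)] for i in range(1000)]

    with mp.Pool(processes=4) as pool:
        # chunksize=10 hands each worker ten batches per dispatch,
        # reducing the per-task IPC overhead of the default chunksize=1.
        for result in pool.imap_unordered(process_batch, batches, chunksize=10):
            pass  # consume results as they complete, in arbitrary order
```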
@ -377,0 +424,4 @@
Module-level function for use in picklable generators.
"""
is_gzipped = file_path.suffix == ".gz"
Member

This can be simplified somewhat. The only time `is_gzipped` is used is line 433, so you don't need to create a variable. Just put the condition in line 433.
Author
Member

Fixed!
@ -377,0 +425,4 @@
Module-level function for use in picklable generators.
"""
is_gzipped = file_path.suffix == ".gz"
is_bz2 = file_path.suffix == ".bz2" or str(file_path).endswith(".ttl.bz2")
Member

The second condition is not needed. For example:

>>> from pathlib import Path
>>> p = Path("/mydir/foo.ttl.bz2")
>>> p.suffix
'.bz2'
Member

Same comment about not needing to create the variable `is_bz2`.
Author
Member

Fixed!
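
For context, a minimal sketch of how both review suggestions combine: rely on `Path.suffix` alone and use the conditions directly instead of storing `is_gzipped` / `is_bz2`. `open_rdf_file` is a hypothetical name, assuming the converter only needs a text-mode file handle; the real code differs.

```python
import bz2
import gzip
from pathlib import Path

def open_rdf_file(file_path: Path):
    # Path.suffix already yields ".bz2" for "foo.ttl.bz2", so one suffix
    # check per format is enough, and the checks can be used inline.
    if file_path.suffix == ".gz":
        return gzip.open(file_path, "rt", encoding="utf-8")
    if file_path.suffix == ".bz2":
        return bz2.open(file_path, "rt", encoding="utf-8")
    return open(file_path, "rt", encoding="utf-8")

print(Path("/mydir/foo.ttl.bz2").suffix)  # '.bz2'
```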
@ -377,0 +465,4 @@
triples = []
for s, p, o in graph:
if isinstance(o, Literal):
Member

This code already exists in the `extract_triple` method in lines 214-235.
Author
Member

Fixed!
@ -377,0 +510,4 @@
triples = []
for s, p, o in graph:
if isinstance(o, Literal):
Member

This code already exists in the `extract_triple` code in lines 214-235.
Author
Member

Fixed!
@ -377,0 +553,4 @@
current_chunk = []
for s, p, o in graph:
if isinstance(o, Literal):
Member

This code already exists in the `extract_triple` code of lines 214-235.
Author
Member

Fixed!
@ -1022,3 +1148,3 @@
rate = triple_count / elapsed if elapsed > 0 else 0
progress_pct = 10 + min(60, int((chunk_count / estimated_chunks) * 60))
self.progress.print(
print(
Member

Why isn't this using `self.progress`?
Author
Member

`self.progress` contains a Rich Console object, which is not picklable. Capturing `self.progress` in the generator closure would make it non-picklable.
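
For context, a minimal sketch of the constraint being described. The `Progress` class is a stand-in: Rich's Console holds a thread lock internally, modeled here with `threading.RLock`, which is exactly the kind of state the pickle module refuses to serialize.

```python
import pickle
import threading

class Progress:
    def __init__(self):
        self._lock = threading.RLock()  # not picklable, like a Rich Console

    def emit_progress(self, pct):
        print(f"PROGRESS: {pct}", flush=True)

progress = Progress()

try:
    pickle.dumps(progress)
except TypeError as exc:
    print(f"pickling failed: {exc}")  # cannot pickle '_thread.RLock' object

# Anything captured by code shipped to a multiprocessing worker gets
# pickled, so the worker-side generator falls back to a bare print:
print("PROGRESS: 42", flush=True)
```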
@ -1027,2 +1153,3 @@
flush=True,
)
self.progress.emit_progress(progress_pct)
print(f"PROGRESS: {progress_pct}", flush=True)
Member

(Same question.)
Author
Member

`self.progress` contains a Rich Console object, which is not picklable. Capturing `self.progress` in the generator closure would make it non-picklable.
@ -1205,1 +1206,4 @@
# Fix for "AF_UNIX path too long" error in multiprocessing
# This forces the temporary directory to be /tmp (short path) instead of a potentially deep workspace path
os.environ["TMPDIR"] = "/tmp"
Member

Superb choice. Thanks.
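
For context, a minimal sketch of why the override helps and why it has to run early: `tempfile.gettempdir()` caches its answer on first use, and multiprocessing places its AF_UNIX sockets under that directory, whose paths Linux caps at roughly 108 bytes.

```python
import os

# Must run before tempfile/multiprocessing first resolve the temp
# directory, because tempfile.gettempdir() caches its result.
os.environ["TMPDIR"] = "/tmp"

import multiprocessing as mp
import tempfile

if __name__ == "__main__":
    print(tempfile.gettempdir())  # -> /tmp

    # Manager listens on an AF_UNIX socket created under the temp
    # directory; a deep workspace path can push it past the limit.
    with mp.Manager() as manager:
        shared = manager.list([1, 2, 3])
        print(list(shared))
```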
aditya merged commit 76de466d99 into stream-download-convert-upload-merge-15-refactor-rdf-converters 2026-01-02 10:12:23 +00:00