xml-streaming-optimization #49
No reviewers · No labels · No milestone · No project · No assignees · 3 participants
Reference: cleverdatasets/dataset-uploader!49
No description provided.
This branch contains optimized code that skips description parsing during resume and instead only counts descriptions, which avoids the overhead of re-parsing the entire dataset. It also parallelizes description parsing across worker processes and updates the checkpoint through the tracker sheet.
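As a rough sketch of the resume path described above: the streamer can count rdf:Description elements with xml.etree.ElementTree.iterparse and discard their contents until the checkpoint is reached, rather than fully parsing each one. The helper name and namespace constant below are illustrative assumptions, not the PR's exact code.

    # Illustrative sketch only; fast_forward and RDF_NS are assumed names.
    import xml.etree.ElementTree as ET

    RDF_NS = "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}"

    def fast_forward(source, skip_descriptions: int):
        """Skip already-processed rdf:Description elements by counting them,
        then yield the remaining elements for full parsing."""
        seen = 0
        for _event, elem in ET.iterparse(source, events=("end",)):
            if elem.tag != RDF_NS + "Description":
                continue
            seen += 1
            if seen <= skip_descriptions:
                elem.clear()  # free memory; contents are never inspected
                continue
            yield elem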
@@ -370,0 +406,4 @@
    self.sheet.values().get(
        spreadsheetId=self.spreadsheet_id,
        range=f"{self.worksheet_name}!A{row_num}:Z{row_num}",

Hardcoding A and Z columns looks suspicious - is this just a conservative estimate, or do we know that the data we want is in this subset of the sheet?

Fixed!
When I run bandit features on this code, I get the following summary:

I can't pass this code until the tests pass.
@@ -114,2 +118,4 @@
    ["error", "error message"])
    self.last_shard_col = self._find_column_index(
        ["last completed shard", "last shard", "shard"]

Should this list include "last_completed_shard" and "last_shard"?

Fixed!
@@ -158,0 +183,4 @@
    # Use dynamic range: A to max_column (or Z if no columns found)
    if max_col >= 0:
        end_col = self._col_letter(max_col + 1)  # +1 because _col_letter is 1-indexed

ruff check reports:

Fixed!
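The diff's comment implies _col_letter maps a 1-indexed column number to its A1-notation letter; a minimal sketch of that conversion, assuming that behavior:

    # Sketch assuming _col_letter is 1-indexed: 1 -> "A", 26 -> "Z", 27 -> "AA".
    def _col_letter(n: int) -> str:
        letters = ""
        while n > 0:
            n, rem = divmod(n - 1, 26)
            letters = chr(ord("A") + rem) + letters
        return letters

    # With a 0-indexed max_col of 4, _col_letter(max_col + 1) gives "E",
    # so the range becomes "A{row}:E{row}" instead of a hardcoded "A:Z".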
@@ -370,0 +447,4 @@
    ).execute())
    values = result.get("values", [])

Aside from the choice of columns, lines 174-200 and lines 417-450 look extremely similar. You can turn them into a function.

Fixed!
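One way the duplication could be factored out, sketched under the assumption that both call sites read a single row via the Sheets API (attribute names mirror the diff, but the helper itself is hypothetical):

    # Hypothetical helper; self.sheet is a Google Sheets API values resource.
    def _get_row_values(self, row_num: int, end_col: str) -> list[str]:
        """Fetch one tracker-sheet row as a flat list of cell values."""
        result = (
            self.sheet.values()
            .get(
                spreadsheetId=self.spreadsheet_id,
                range=f"{self.worksheet_name}!A{row_num}:{end_col}{row_num}",
            )
            .execute()
        )
        values = result.get("values", [])
        return values[0] if values else []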
@@ -370,0 +433,4 @@
    # Use dynamic range: A to max_column (or Z if no columns found)
    if max_col >= 0:
        end_col = self._col_letter(max_col + 1)  # +1 because _col_letter is 1-indexed

ruff check reports:

Fixed!
@@ -370,0 +532,4 @@
    )
    if self.last_updated_col is not None:
        from datetime import datetime

It's usually best to put all imports at the top of the file. (I can't complain too hard; ruff check doesn't include this any longer.)

Fixed!
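The fix presumably just hoists the import to module scope, roughly:

    # Sketch of the suggested fix: import once at module top instead of
    # inside the checkpoint-update method. The helper name is hypothetical.
    from datetime import datetime

    def _timestamp() -> str:
        return datetime.now().isoformat()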
@@ -150,2 +176,4 @@
    console = Console()
    console.print("[yellow]Streaming Turtle from line iterator[/yellow]")
    if skip_triples > 0:
        console.print(f"[dim]Resume mode: will skip {skip_triples:,} already-processed triples[/dim]")

ruff check reports:

Fixed!
@@ -152,0 +179,4 @@
    console.print(f"[dim]Resume mode: will skip {skip_triples:,} already-processed triples[/dim]")
    if num_workers is None:
        num_workers = max(1, mp.cpu_count() - 1)

According to the multiprocessing.cpu_count() documentation, it might be better to measure the number of usable CPUs instead of the total number of CPUs.

Fixed!
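A sketch of the suggested measurement: os.sched_getaffinity(0) reports the CPUs the current process may actually run on, though it is not available on every platform, hence the fallback below.

    import multiprocessing as mp
    import os

    def usable_cpu_count() -> int:
        """Prefer the CPUs this process may use over the machine total."""
        try:
            return len(os.sched_getaffinity(0))
        except AttributeError:  # not available on e.g. macOS or Windows
            return mp.cpu_count()

    num_workers = max(1, usable_cpu_count() - 1)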
@@ -193,0 +232,4 @@
        console.print(f"[dim]Fast-forward: {total_triples:,} / {skip_triples:,} triples[/dim]")
        continue
    elif total_triples == skip_triples:
        console.print(f"[green]✓ Skipped {skip_triples:,} triples, resuming normal parsing[/green]")

ruff check reports:

Fixed!
@@ -199,1 +262,3 @@
    console.print(f"[dim]Processed {line_count:,} lines...[/dim]")
    if line_count % PROGRESS_LOG_INTERVAL == 0:
        if total_triples < skip_triples:
            console.print(f"[dim]Fast-forward: {total_triples:,} / {skip_triples:,} triples[/dim]")

ruff check reports:

Fixed!
@@ -159,0 +189,4 @@
    # Extract prefix declarations first
    buffered_first_line = None
    for line in lines:
        line_count += 1

It might be easier to use enumerate.

Fixed!
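That is, letting enumerate carry the counter instead of incrementing line_count by hand, roughly:

    # Sketch of the enumerate suggestion (the sample input is illustrative).
    lines = ["@prefix ex: <http://example.org/> .", "ex:a ex:b ex:c ."]
    buffered_first_line = None
    for line_count, line in enumerate(lines, start=1):
        if buffered_first_line is None:
            buffered_first_line = line  # mirrors the diff's first-line buffering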
@@ -190,0 +210,4 @@
    if buffered_first_line:
        yield buffered_first_line
    for line in lines:
        yield line

ruff check reports:

Fixed!
@@ -193,0 +229,4 @@
    if total_triples < skip_triples:
        if total_triples % PROGRESS_LOG_INTERVAL == 0:
            console.print(f"[dim]Fast-forward: {total_triples:,} / {skip_triples:,} triples[/dim]")

ruff check reports:

Fixed!
@@ -193,0 +238,4 @@
    triple_count += 1
    if triple_count >= chunk_size:
        chunk_text = prefix_text + "\n" + "".join(current_chunk) + "\n" + line

ruff check reports:

Fixed!
@@ -221,1 +365,4 @@
    """
    console = Console()
    if skip_triples > 0:
        console.print(f"[dim]Resume mode: will skip {skip_triples:,} already-processed triples[/dim]")

ruff check reports:

Fixed!
@@ -227,0 +379,4 @@
    if total_triples < skip_triples:
        if total_triples % PROGRESS_LOG_INTERVAL == 0:
            console.print(f"[dim]Fast-forward: {total_triples:,} / {skip_triples:,} triples[/dim]")

ruff check reports:

Fixed!
@@ -227,0 +382,4 @@
        console.print(f"[dim]Fast-forward: {total_triples:,} / {skip_triples:,} triples[/dim]")
        continue
    elif total_triples == skip_triples:
        console.print(f"[green]✓ Skipped {skip_triples:,} triples, resuming normal parsing[/green]")

ruff check reports:

Fixed!
@@ -211,0 +274,4 @@
        if triples:
            yield triples
    except Exception as e:
        logger.error(f"Error parsing final Turtle chunk: {e}")

Lines 242-252 and lines 270-277 are extremely similar; should they be one method?

Fixed!
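One possible unification, sketched with a stub standing in for the module's real chunk parser:

    import logging

    logger = logging.getLogger(__name__)

    def _parse_turtle_chunk(chunk_text: str) -> list[tuple]:
        return []  # stub for the module's real parser

    def _flush_chunk(chunk_text: str, label: str = "final Turtle chunk"):
        """Parse one assembled chunk, yield its triples, log any failure."""
        try:
            triples = _parse_turtle_chunk(chunk_text)
            if triples:
                yield triples
        except Exception as e:
            logger.error(f"Error parsing {label}: {e}")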
@@ -211,0 +303,4 @@
    if total_triples < skip_triples:
        if total_triples % PROGRESS_LOG_INTERVAL == 0:
            console.print(f"[dim]Fast-forward: {total_triples:,} / {skip_triples:,} triples[/dim]")

ruff check reports:

Fixed!
@@ -211,0 +306,4 @@
        console.print(f"[dim]Fast-forward: {total_triples:,} / {skip_triples:,} triples[/dim]")
        continue
    elif total_triples == skip_triples:
        console.print(f"[green]✓ Skipped {skip_triples:,} triples, resuming normal parsing[/green]")

ruff check reports:

Fixed!
@@ -211,0 +309,4 @@
        console.print(f"[green]✓ Skipped {skip_triples:,} triples, resuming normal parsing[/green]")
        continue
    sub_chunk_triple_count += 1

There are a lot of similarities between lines 220-238 and 294-312; can they be unified?

Fixed!
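The fast-forward bookkeeping repeated across several hunks above could plausibly collapse into one helper like the following sketch (the helper name is hypothetical, and PROGRESS_LOG_INTERVAL's value is a placeholder):

    from rich.console import Console

    PROGRESS_LOG_INTERVAL = 100_000  # placeholder value
    console = Console()

    def still_skipping(total_triples: int, skip_triples: int) -> bool:
        """True while triples from a previous run are being fast-forwarded."""
        if total_triples < skip_triples:
            if total_triples % PROGRESS_LOG_INTERVAL == 0:
                console.print(f"[dim]Fast-forward: {total_triples:,} / {skip_triples:,} triples[/dim]")
            return True
        if total_triples == skip_triples:
            console.print(f"[green]✓ Skipped {skip_triples:,} triples, resuming normal parsing[/green]")
            return True  # the boundary triple was processed in the previous run
        return False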
@@ -211,0 +319,4 @@
    sub_chunk_triple_count = 0
    if len(sub_chunks) >= batch_size:
        for idx, triples in pool.imap(_parse_turtle_chunk, sub_chunks):

ruff check reports:

Fixed!
@@ -319,0 +482,4 @@
        return []

    def _parse_description_batch(args: tuple[int, str, str | None]) -> tuple[int, list[dict]]:

ruff check reports:

Fixed!
@@ -211,0 +331,4 @@
    if line_count % PROGRESS_LOG_INTERVAL == 0:
        if total_triples < skip_triples:
            console.print(f"[dim]Fast-forward: {total_triples:,} / {skip_triples:,} triples[/dim]")

ruff check reports:

Fixed!
@@ -211,0 +342,4 @@
    # Process remaining batch
    if sub_chunks:
        for idx, triples in pool.imap(_parse_turtle_chunk, sub_chunks):

ruff check reports:

Fixed!
@@ -340,2 +523,4 @@
    console = Console()
    console.print("[yellow]Streaming RDF/XML with standard XML parser[/yellow]")
    if skip_descriptions > 0:
        console.print(f"[dim]Resuming: will skip {skip_descriptions:,} already-processed descriptions[/dim]")

ruff check reports:

Fixed!
@@ -359,0 +564,4 @@
    elem.clear()
    if total_descriptions % PROGRESS_LOG_INTERVAL == 0:
        console.print(f"[dim]Skipping: {total_descriptions:,} / {skip_descriptions:,}[/dim]"

ruff check reports:

Fixed!
@@ -376,2 +575,2 @@
    triple_batch = []
    batch_description_count = 0
    if len(parse_batch) >= parse_batch_size:
        for idx, triples in pool.imap(_parse_description_batch, parse_batch):

ruff check reports:

Fixed!
@@ -388,1 +596,4 @@
    logger.error("This may indicate malformed XML in the source file")
    finally:
        if parse_batch:
            for idx, triples in pool.imap(_parse_description_batch, parse_batch):

ruff check reports:

Fixed!
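The batch/pool pattern these hunks show, reduced to a runnable sketch; only the worker's signature comes from the diff, while the stub body and the tuple-field names are assumptions:

    import multiprocessing as mp

    def _parse_description_batch(args: tuple[int, str, str | None]) -> tuple[int, list[dict]]:
        idx, xml_text, base_uri = args  # field names are assumed
        return idx, []  # stub for the module's real description parser

    if __name__ == "__main__":
        parse_batch = [(0, "<rdf:Description/>", None), (1, "<rdf:Description/>", None)]
        with mp.Pool(processes=2) as pool:
            # imap preserves batch order while parsing in parallel
            for idx, triples in pool.imap(_parse_description_batch, parse_batch):
                print(idx, len(triples))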
@@ -440,2 +661,3 @@
     yield from stream_turtle_chunks_from_lines(lines, chunk_size, skip_triples, num_workers)
 elif format in ("xml", "rdf", "rdfxml"):
-    yield from stream_rdfxml_chunks_from_lines(lines, chunk_size=chunk_size)
+    yield from stream_rdfxml_chunks_from_lines(lines, chunk_size=chunk_size, skip_descriptions=skip_descriptions, num_workers=num_workers)

ruff check reports:

Fixed!
@@ -438,2 +659,3 @@
     yield from stream_ntriples_from_lines(lines, skip_triples)
 elif format in ("turtle", "ttl"):
-    yield from stream_turtle_chunks_from_lines(lines, chunk_size=chunk_size)
+    yield from stream_turtle_chunks_from_lines(lines, chunk_size, skip_triples, num_workers)

ruff check reports:

Fixed!
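Taken together, the two hunks above imply a dispatcher shaped roughly like this; the signature is inferred from the call sites, and stub generators stand in for the real per-format streamers:

    def stream_ntriples_from_lines(lines, skip_triples=0):
        yield from ()  # stub

    def stream_turtle_chunks_from_lines(lines, chunk_size, skip_triples=0, num_workers=None):
        yield from ()  # stub

    def stream_rdfxml_chunks_from_lines(lines, chunk_size, skip_descriptions=0, num_workers=None):
        yield from ()  # stub

    def stream_rdf_chunks_from_lines(lines, format, chunk_size,
                                     skip_descriptions=0, skip_triples=0, num_workers=None):
        """Route to the per-format streamer, forwarding the resume counters."""
        if format in ("nt", "ntriples"):
            yield from stream_ntriples_from_lines(lines, skip_triples)
        elif format in ("turtle", "ttl"):
            yield from stream_turtle_chunks_from_lines(lines, chunk_size, skip_triples, num_workers)
        elif format in ("xml", "rdf", "rdfxml"):
            yield from stream_rdfxml_chunks_from_lines(
                lines, chunk_size=chunk_size,
                skip_descriptions=skip_descriptions, num_workers=num_workers)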
@@ -638,3 +864,3 @@
     chunk_iter = stream_rdf_chunks_from_lines(
-        lines, format=rdf_format, chunk_size=DEFAULT_BATCH_SIZE
+        lines, rdf_format, DEFAULT_BATCH_SIZE, skip_descriptions, skip_triples, num_workers

ruff check reports:

Fixed!
@@ -1459,0 +1730,4 @@
    if checkpoint_source:
        console.print(f"[yellow]Resuming from shard {start_shard} (loaded from {checkpoint_source})[/yellow]")
    if skip_descriptions > 0:
        console.print(f"[yellow]Will skip {skip_descriptions:,} already-processed descriptions[/yellow]")

ruff check reports:

Fixed!
@@ -1459,0 +1728,4 @@
    checkpoint_source = "sheet"
    if checkpoint_source:
        console.print(f"[yellow]Resuming from shard {start_shard} (loaded from {checkpoint_source})[/yellow]")

ruff check reports:

Fixed!