first commit

2025-08-07 13:15:31 +01:00
commit d903893b4c
21854 changed files with 4461308 additions and 0 deletions

View File

@@ -0,0 +1 @@
# This exists to let mypy find modules here

View File

@@ -0,0 +1,26 @@
import ast
import sys
import time

from pegen.testutil import print_memstats


def main() -> None:
    t0 = time.time()
    for filename in sys.argv[1:]:
        print(filename, end="\r")
        try:
            with open(filename) as file:
                source = file.read()
            tree = ast.parse(source, filename)
        except Exception as err:
            print(f"{filename}: {err.__class__.__name__}: {err}", file=sys.stderr)
    t1 = time.time()
    dt = t1 - t0
    print(f"Parsed in {dt:.3f} secs", file=sys.stderr)
    print_memstats()


if __name__ == "__main__":
    main()

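The script above times ast.parse over whatever filenames are passed on the command line. The core measurement, reduced to a single hard-coded source string (a minimal sketch, not part of the commit):

import ast
import time

source = "x = 1\nprint(x)\n"
t0 = time.time()
tree = ast.parse(source, "<example>")
dt = time.time() - t0
print(f"Parsed in {dt:.3f} secs")
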
View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3

import argparse
import ast
import sys
import os
from time import time

import _peg_parser

try:
    import memory_profiler
except ModuleNotFoundError:
    print(
        "Please run `make venv` to create a virtual environment and install"
        " all the dependencies, before running this script."
    )
    sys.exit(1)

sys.path.insert(0, os.getcwd())
from scripts.test_parse_directory import parse_directory

argparser = argparse.ArgumentParser(
    prog="benchmark", description="Reproduce the various pegen benchmarks"
)
argparser.add_argument(
    "--parser",
    action="store",
    choices=["new", "old"],
    default="new",
    help="Which parser to benchmark (default is the new parser)",
)
argparser.add_argument(
    "--target",
    action="store",
    choices=["xxl", "stdlib"],
    default="xxl",
    help="Which target to use for the benchmark (default is xxl.py)",
)

subcommands = argparser.add_subparsers(title="Benchmarks", dest="subcommand")
command_compile = subcommands.add_parser(
    "compile", help="Benchmark parsing and compiling to bytecode"
)
command_parse = subcommands.add_parser(
    "parse", help="Benchmark parsing and generating an ast.AST"
)
command_notree = subcommands.add_parser(
    "notree", help="Benchmark parsing and dumping the tree"
)


def benchmark(func):
    def wrapper(*args):
        times = list()
        for _ in range(3):
            start = time()
            result = func(*args)
            end = time()
            times.append(end - start)
        memory = memory_profiler.memory_usage((func, args))
        print(f"{func.__name__}")
        print(f"\tTime: {sum(times) / 3:.3f} seconds, averaged over 3 runs")
        print(f"\tMemory: {max(memory)} MiB peak usage")
        return result

    return wrapper


@benchmark
def time_compile(source, parser):
    if parser == "old":
        return _peg_parser.compile_string(
            source,
            oldparser=True,
        )
    else:
        return _peg_parser.compile_string(source)


@benchmark
def time_parse(source, parser):
    if parser == "old":
        return _peg_parser.parse_string(source, oldparser=True)
    else:
        return _peg_parser.parse_string(source)


@benchmark
def time_notree(source, parser):
    if parser == "old":
        return _peg_parser.parse_string(source, oldparser=True, ast=False)
    else:
        return _peg_parser.parse_string(source, ast=False)


def run_benchmark_xxl(subcommand, parser, source):
    if subcommand == "compile":
        time_compile(source, parser)
    elif subcommand == "parse":
        time_parse(source, parser)
    elif subcommand == "notree":
        time_notree(source, parser)


def run_benchmark_stdlib(subcommand, parser):
    modes = {"compile": 2, "parse": 1, "notree": 0}
    for _ in range(3):
        parse_directory(
            "../../Lib",
            verbose=False,
            excluded_files=["*/bad*", "*/lib2to3/tests/data/*"],
            tree_arg=0,
            short=True,
            mode=modes[subcommand],
            oldparser=(parser == "old"),
        )


def main():
    args = argparser.parse_args()
    subcommand = args.subcommand
    parser = args.parser
    target = args.target

    if subcommand is None:
        argparser.error("A benchmark to run is required")

    if target == "xxl":
        with open(os.path.join("data", "xxl.py"), "r") as f:
            source = f.read()
            run_benchmark_xxl(subcommand, parser, source)
    elif target == "stdlib":
        run_benchmark_stdlib(subcommand, parser)


if __name__ == "__main__":
    main()

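The benchmark decorator in this file pairs wall-clock timing with memory_profiler's sampling API, which accepts a (callable, args) tuple and returns a list of memory readings in MiB. A minimal standalone sketch of the same pattern, assuming memory_profiler is installed; the workload function here is made up for illustration:

import time

import memory_profiler


def workload(n):
    # Stand-in for the parse/compile calls benchmarked above
    return sum(i * i for i in range(n))


start = time.time()
workload(1_000_000)
elapsed = time.time() - start

# memory_usage runs the callable once and samples the process's memory
samples = memory_profiler.memory_usage((workload, (1_000_000,)))
print(f"time: {elapsed:.3f}s, peak memory: {max(samples):.1f} MiB")
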
View File

@@ -0,0 +1,86 @@
#!/usr/bin/env python3.8

import argparse
import os
import json

from typing import Dict, Any
from urllib.request import urlretrieve

argparser = argparse.ArgumentParser(
    prog="download_pypi_packages", description="Helper program to download PyPI packages",
)
argparser.add_argument(
    "-n", "--number", type=int, default=100, help="Number of packages to download"
)
argparser.add_argument(
    "-a", "--all", action="store_true", help="Download all packages listed in the json file"
)


def load_json(filename: str) -> Dict[Any, Any]:
    with open(os.path.join("data", f"{filename}.json"), "r") as f:
        j = json.loads(f.read())
    return j


def remove_json(filename: str) -> None:
    path = os.path.join("data", f"{filename}.json")
    os.remove(path)


def download_package_json(package_name: str) -> None:
    url = f"https://pypi.org/pypi/{package_name}/json"
    urlretrieve(url, os.path.join("data", f"{package_name}.json"))


def download_package_code(name: str, package_json: Dict[Any, Any]) -> None:
    source_index = -1
    for idx, url_info in enumerate(package_json["urls"]):
        if url_info["python_version"] == "source":
            source_index = idx
            break
    filename = package_json["urls"][source_index]["filename"]
    url = package_json["urls"][source_index]["url"]
    urlretrieve(url, os.path.join("data", "pypi", filename))


def main() -> None:
    args = argparser.parse_args()
    number_packages = args.number
    all_packages = args.all

    top_pypi_packages = load_json("top-pypi-packages-365-days")
    if all_packages:
        top_pypi_packages = top_pypi_packages["rows"]
    elif number_packages >= 0 and number_packages <= 4000:
        top_pypi_packages = top_pypi_packages["rows"][:number_packages]
    else:
        raise AssertionError("Unknown value for NUMBER_OF_PACKAGES")

    try:
        os.mkdir(os.path.join("data", "pypi"))
    except FileExistsError:
        pass

    for package in top_pypi_packages:
        package_name = package["project"]

        print(f"Downloading JSON data for {package_name}... ", end="")
        download_package_json(package_name)
        print("Done")

        package_json = load_json(package_name)
        try:
            print(f"Downloading package {package_name} ... ", end="")
            download_package_code(package_name, package_json)
            print("Done")
        except (IndexError, KeyError):
            print(f"Could not locate source for {package_name}")
            continue
        finally:
            remove_json(package_name)


if __name__ == "__main__":
    main()

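download_package_code relies on the layout of PyPI's per-package JSON: each entry under "urls" carries a "python_version" field, and the sdist is the entry whose value is "source". A trimmed-down illustration with made-up data:

# package_json is a made-up, heavily trimmed example of the PyPI JSON layout
package_json = {
    "urls": [
        {"python_version": "py3", "filename": "pkg-1.0-py3-none-any.whl", "url": "https://example.invalid/wheel"},
        {"python_version": "source", "filename": "pkg-1.0.tar.gz", "url": "https://example.invalid/sdist"},
    ]
}

source_urls = [u for u in package_json["urls"] if u["python_version"] == "source"]
if source_urls:
    print(source_urls[0]["filename"])  # pkg-1.0.tar.gz
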
View File

@@ -0,0 +1,57 @@
#!/usr/bin/env python3.8
"""Find the maximum amount of nesting for an expression that can be parsed
without causing a parse error.

Starting at INITIAL_NESTING_DEPTH, an expression containing n parentheses
around a 0 is generated and parsed with _peg_parser.parse_string. On each
success the number of parentheses is increased by NESTED_INCR_AMT, and the
test stops at the first failure.

The grammar file, initial nesting size, and amount by which the nested size is
incremented on each success can be controlled by changing the GRAMMAR_FILE,
INITIAL_NESTING_DEPTH, or NESTED_INCR_AMT variables.

Usage: python -m scripts.find_max_nesting
"""
import sys

from _peg_parser import parse_string

GRAMMAR_FILE = "data/python.gram"
INITIAL_NESTING_DEPTH = 10
NESTED_INCR_AMT = 10

FAIL = "\033[91m"
ENDC = "\033[0m"


def check_nested_expr(nesting_depth: int) -> bool:
    expr = f"{'(' * nesting_depth}0{')' * nesting_depth}"
    try:
        parse_string(expr)
        print(f"Nesting depth of {nesting_depth} is successful")
        return True
    except Exception as err:
        print(f"{FAIL}Failed with nesting depth of {nesting_depth}{ENDC}")
        print(f"{FAIL}\t{err}{ENDC}")
        return False


def main() -> None:
    print(f"Testing {GRAMMAR_FILE} starting at nesting depth of {INITIAL_NESTING_DEPTH}...")

    nesting_depth = INITIAL_NESTING_DEPTH
    succeeded = True
    while succeeded:
        succeeded = check_nested_expr(nesting_depth)
        nesting_depth += NESTED_INCR_AMT

    sys.exit(1)


if __name__ == "__main__":
    main()

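For reference, the probe expression that check_nested_expr builds for a nesting depth of 3 looks like this:

nesting_depth = 3
expr = f"{'(' * nesting_depth}0{')' * nesting_depth}"
print(expr)  # (((0)))
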
View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python3.8
"""Convert a grammar into a dot-file suitable for use with GraphViz

For example:
  Generate the GraphViz file:
  # scripts/grammar_grapher.py data/python.gram > python.gv

  Then generate the graph...

  # twopi python.gv -Tpng > python_twopi.png

  or

  # dot python.gv -Tpng > python_dot.png

  NOTE: The _dot_ and _twopi_ tools seem to produce the most useful results.
        The _circo_ tool is the worst of the bunch. Don't even bother.
"""

import argparse
import sys

from typing import Any, List

sys.path.insert(0, ".")

from pegen.build import build_parser
from pegen.grammar import (
    Alt,
    Cut,
    Grammar,
    Group,
    Leaf,
    Lookahead,
    Rule,
    NameLeaf,
    NamedItem,
    Opt,
    Repeat,
    Rhs,
)

argparser = argparse.ArgumentParser(prog="graph_grammar", description="Graph a grammar tree")
argparser.add_argument(
    "-s",
    "--start",
    choices=["exec", "eval", "single"],
    default="exec",
    help="Choose the grammar's start rule (exec, eval or single)",
)
argparser.add_argument("grammar_file", help="The grammar file to graph")


def references_for_item(item: Any) -> List[Any]:
    if isinstance(item, Alt):
        return [_ref for _item in item.items for _ref in references_for_item(_item)]
    elif isinstance(item, Cut):
        return []
    elif isinstance(item, Group):
        return references_for_item(item.rhs)
    elif isinstance(item, Lookahead):
        return references_for_item(item.node)
    elif isinstance(item, NamedItem):
        return references_for_item(item.item)
    # NOTE: NameLeaf must be checked before Leaf
    elif isinstance(item, NameLeaf):
        if item.value == "ENDMARKER":
            return []
        return [item.value]
    elif isinstance(item, Leaf):
        return []
    elif isinstance(item, Opt):
        return references_for_item(item.node)
    elif isinstance(item, Repeat):
        return references_for_item(item.node)
    elif isinstance(item, Rhs):
        return [_ref for alt in item.alts for _ref in references_for_item(alt)]
    elif isinstance(item, Rule):
        return references_for_item(item.rhs)
    else:
        raise RuntimeError(f"Unknown item: {type(item)}")


def main() -> None:
    args = argparser.parse_args()

    try:
        grammar, parser, tokenizer = build_parser(args.grammar_file)
    except Exception as err:
        print(f"ERROR: Failed to parse grammar file: {err}", file=sys.stderr)
        sys.exit(1)

    references = {}
    for name, rule in grammar.rules.items():
        references[name] = set(references_for_item(rule))

    # Pick the root node corresponding to the selected start rule
    root_node = {"exec": "file", "eval": "eval", "single": "interactive"}[args.start]

    print("digraph g1 {")
    print('\toverlap="scale";')  # Force twopi to scale the graph to avoid overlaps
    print(f'\troot="{root_node}";')
    print(f"\t{root_node} [color=green, shape=circle];")
    for name, refs in references.items():
        for ref in refs:
            print(f"\t{name} -> {ref};")
    print("}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,66 @@
#!/usr/bin/env python3.8
"""Produce a report about the most-memoable types.

Reads a list of statistics from the file given as the first command-line
argument. Each line must be two numbers: a type and a count. We then read
the generated peg_extension/parse.c to map type numbers to names and print
a list sorted by most frequent type.

There should also be something to recognize left-recursive rules.
"""
import os
import re
import sys

from typing import Dict

reporoot = os.path.dirname(os.path.dirname(__file__))
parse_c = os.path.join(reporoot, "peg_extension", "parse.c")


class TypeMapper:
    """State used to map types to names."""

    def __init__(self, filename: str) -> None:
        self.table: Dict[int, str] = {}
        with open(filename) as f:
            for line in f:
                match = re.match(r"#define (\w+)_type (\d+)", line)
                if match:
                    name, type = match.groups()
                    if "left" in line.lower():
                        name += " // Left-recursive"
                    self.table[int(type)] = name

    def lookup(self, type: int) -> str:
        return self.table.get(type, str(type))


def main() -> None:
    mapper = TypeMapper(parse_c)
    table = []
    filename = sys.argv[1]
    with open(filename) as f:
        for lineno, line in enumerate(f, 1):
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            parts = line.split()
            # Extra fields are ignored
            if len(parts) < 2:
                print(f"{lineno}: bad input ({line!r})")
                continue
            try:
                type, count = map(int, parts[:2])
            except ValueError:
                print(f"{lineno}: non-integer input ({line!r})")
                continue
            table.append((type, count))
    table.sort(key=lambda values: -values[1])
    for type, count in table:
        print(f"{type:4d} {count:9d} {mapper.lookup(type)}")


if __name__ == "__main__":
    main()

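TypeMapper keys off lines of the form "#define <name>_type <number>" in the generated parse.c. A small self-contained check of that pattern; the define lines here are made-up examples, not taken from the real file:

import re

lines = [
    "#define start_type 1000",
    "#define expr_type 1001  // Left-recursive",
]
table = {}
for line in lines:
    match = re.match(r"#define (\w+)_type (\d+)", line)
    if match:
        name, type_ = match.groups()
        if "left" in line.lower():
            name += " // Left-recursive"
        table[int(type_)] = name
print(table)  # {1000: 'start', 1001: 'expr // Left-recursive'}
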
View File

@@ -0,0 +1,121 @@
#!/usr/bin/env python3.8
"""Show the parse tree for a given program, nicely formatted.

Example:

$ scripts/show_parse.py a+b
Module(
    body=[
        Expr(
            value=BinOp(
                left=Name(id="a", ctx=Load()), op=Add(), right=Name(id="b", ctx=Load())
            )
        )
    ],
    type_ignores=[],
)
$

Use -v to show line numbers and column offsets.

The formatting is done using black. You can also import this module
and call one of its functions.
"""

import argparse
import ast
import difflib
import os
import sys
import tempfile

import _peg_parser

from typing import List

sys.path.insert(0, os.getcwd())
from pegen.ast_dump import ast_dump
parser = argparse.ArgumentParser()
parser.add_argument(
    "-d", "--diff", action="store_true", help="show the diff between the old and new parsers' trees"
)
parser.add_argument(
    "-p",
    "--parser",
    choices=["new", "old"],
    default="new",
    help="choose the parser to use",
)
parser.add_argument(
    "-m",
    "--multiline",
    action="store_true",
    help="concatenate program arguments using newline instead of space",
)
parser.add_argument("-v", "--verbose", action="store_true", help="show line/column numbers")
parser.add_argument("program", nargs="+", help="program to parse (will be concatenated)")


def format_tree(tree: ast.AST, verbose: bool = False) -> str:
    with tempfile.NamedTemporaryFile("w+") as tf:
        tf.write(ast_dump(tree, include_attributes=verbose))
        tf.write("\n")
        tf.flush()
        cmd = f"black -q {tf.name}"
        sts = os.system(cmd)
        if sts:
            raise RuntimeError(f"Command {cmd!r} failed with status 0x{sts:x}")
        tf.seek(0)
        return tf.read()


def diff_trees(a: ast.AST, b: ast.AST, verbose: bool = False) -> List[str]:
    sa = format_tree(a, verbose)
    sb = format_tree(b, verbose)
    la = sa.splitlines()
    lb = sb.splitlines()
    return list(difflib.unified_diff(la, lb, "a", "b", lineterm=""))


def show_parse(source: str, verbose: bool = False) -> str:
    tree = _peg_parser.parse_string(source, oldparser=True)
    return format_tree(tree, verbose).rstrip("\n")


def print_parse(source: str, verbose: bool = False) -> None:
    print(show_parse(source, verbose))


def main() -> None:
    args = parser.parse_args()
    new_parser = args.parser == "new"
    if args.multiline:
        sep = "\n"
    else:
        sep = " "
    program = sep.join(args.program)
    if new_parser:
        tree = _peg_parser.parse_string(program)
        if args.diff:
            a = _peg_parser.parse_string(program, oldparser=True)
            b = tree
            diff = diff_trees(a, b, args.verbose)
            if diff:
                for line in diff:
                    print(line)
            else:
                print("# Trees are the same")
        else:
            print("# Parsed using the new parser")
            print(format_tree(tree, args.verbose))
    else:
        tree = _peg_parser.parse_string(program, oldparser=True)
        print("# Parsed using the old parser")
        print(format_tree(tree, args.verbose))


if __name__ == "__main__":
    main()

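diff_trees works by comparing the black-formatted dumps line by line with difflib. The same idea using only the standard library, with ast.dump standing in for the black-formatted ast_dump (a minimal sketch):

import ast
import difflib

a = ast.dump(ast.parse("a + b"))
b = ast.dump(ast.parse("a - b"))
for line in difflib.unified_diff(a.splitlines(), b.splitlines(), "a", "b", lineterm=""):
    print(line)
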
View File

@@ -0,0 +1,254 @@
#!/usr/bin/env python3.8

import argparse
import ast
import os
import sys
import time
import tokenize
import _peg_parser

from glob import glob, escape
from pathlib import PurePath

from typing import List, Optional, Any, Tuple

sys.path.insert(0, os.getcwd())
from pegen.ast_dump import ast_dump
from pegen.testutil import print_memstats
from scripts import show_parse

SUCCESS = "\033[92m"
FAIL = "\033[91m"
ENDC = "\033[0m"

COMPILE = 2
PARSE = 1
NOTREE = 0

argparser = argparse.ArgumentParser(
    prog="test_parse_directory",
    description="Helper program to test directories or files for pegen",
)
argparser.add_argument("-d", "--directory", help="Directory path containing files to test")
argparser.add_argument(
    "-e", "--exclude", action="append", default=[], help="Glob(s) for matching files to exclude"
)
argparser.add_argument(
    "-s", "--short", action="store_true", help="Only show errors, in a more Emacs-friendly format"
)
argparser.add_argument(
    "-v", "--verbose", action="store_true", help="Display detailed errors for failures"
)
argparser.add_argument(
    "-t", "--tree", action="count", help="Compare parse tree to official AST", default=0
)
def report_status(
    succeeded: bool,
    file: str,
    verbose: bool,
    error: Optional[Exception] = None,
    short: bool = False,
) -> None:
    if short and succeeded:
        return

    if succeeded:
        status = "OK"
        COLOR = SUCCESS
    else:
        status = "Fail"
        COLOR = FAIL

    if short:
        lineno = 0
        offset = 0
        if isinstance(error, SyntaxError):
            lineno = error.lineno or 1
            offset = error.offset or 1
            message = error.args[0]
        else:
            message = f"{error.__class__.__name__}: {error}"
        print(f"{file}:{lineno}:{offset}: {message}")
    else:
        print(f"{COLOR}{file:60} {status}{ENDC}")
        if error and verbose:
            print(f"  {str(error.__class__.__name__)}: {error}")
def compare_trees(
    actual_tree: ast.AST, file: str, verbose: bool, include_attributes: bool = False,
) -> int:
    with open(file) as f:
        expected_tree = _peg_parser.parse_string(f.read(), oldparser=True)

    expected_text = ast_dump(expected_tree, include_attributes=include_attributes)
    actual_text = ast_dump(actual_tree, include_attributes=include_attributes)
    if actual_text == expected_text:
        if verbose:
            print(f"Tree for {file}:")
            print(show_parse.format_tree(actual_tree, include_attributes))
        return 0

    print(f"Diffing ASTs for {file} ...")

    expected = show_parse.format_tree(expected_tree, include_attributes)
    actual = show_parse.format_tree(actual_tree, include_attributes)

    if verbose:
        print(f"Expected for {file}:")
        print(expected)
        print(f"Actual for {file}:")
        print(actual)
        print(f"Diff for {file}:")

    diff = show_parse.diff_trees(expected_tree, actual_tree, include_attributes)
    for line in diff:
        print(line)

    return 1
def parse_file(source: str, file: str, mode: int, oldparser: bool) -> Tuple[Any, float]:
    t0 = time.time()
    if mode == COMPILE:
        result = _peg_parser.compile_string(
            source,
            filename=file,
            oldparser=oldparser,
        )
    else:
        result = _peg_parser.parse_string(
            source,
            filename=file,
            oldparser=oldparser,
            ast=(mode == PARSE),
        )
    t1 = time.time()
    return result, t1 - t0


def is_parsing_failure(source: str) -> bool:
    # A file only counts as a genuine parsing failure if the old parser
    # rejects it as well.
    try:
        _peg_parser.parse_string(source, mode="exec", oldparser=True)
    except SyntaxError:
        return True
    return False
def generate_time_stats(files, total_seconds) -> None:
    total_files = len(files)
    total_bytes = 0
    total_lines = 0
    for file in files:
        # Count lines and bytes separately
        with open(file, "rb") as f:
            total_lines += sum(1 for _ in f)
            total_bytes += f.tell()

    print(
        f"Checked {total_files:,} files, {total_lines:,} lines,",
        f"{total_bytes:,} bytes in {total_seconds:,.3f} seconds.",
    )
    if total_seconds > 0:
        print(
            f"That's {total_lines / total_seconds :,.0f} lines/sec,",
            f"or {total_bytes / total_seconds :,.0f} bytes/sec.",
        )
def parse_directory(
    directory: str,
    verbose: bool,
    excluded_files: List[str],
    tree_arg: int,
    short: bool,
    mode: int,
    oldparser: bool,
) -> int:
    if tree_arg:
        assert mode == PARSE, "Mode should be 1 (parse), when comparing the generated trees"

    if oldparser and tree_arg:
        print("Cannot specify tree argument with the cpython parser.", file=sys.stderr)
        return 1

    # For a given directory, traverse files and attempt to parse each one
    # - Output success/failure for each file
    errors = 0
    files = []
    trees = {}  # Trees to compare (after everything else is done)
    total_seconds = 0

    for file in sorted(glob(os.path.join(escape(directory), "**/*.py"), recursive=True)):
        # Only attempt to parse Python files and files that are not excluded
        if any(PurePath(file).match(pattern) for pattern in excluded_files):
            continue

        with tokenize.open(file) as f:
            source = f.read()

        try:
            result, dt = parse_file(source, file, mode, oldparser)
            total_seconds += dt
            if tree_arg:
                trees[file] = result
            report_status(succeeded=True, file=file, verbose=verbose, short=short)
        except SyntaxError as error:
            if is_parsing_failure(source):
                print(f"File {file} cannot be parsed by either parser.")
            else:
                report_status(
                    succeeded=False, file=file, verbose=verbose, error=error, short=short
                )
                errors += 1
        files.append(file)

    generate_time_stats(files, total_seconds)
    if short:
        print_memstats()

    if errors:
        print(f"Encountered {errors} failures.", file=sys.stderr)

    # Compare trees (the dict is empty unless -t is given)
    compare_trees_errors = 0
    for file, tree in trees.items():
        if not short:
            print("Comparing ASTs for", file)
        if compare_trees(tree, file, verbose, tree_arg >= 2) == 1:
            compare_trees_errors += 1

    if errors or compare_trees_errors:
        return 1

    return 0
def main() -> None:
    args = argparser.parse_args()
    directory = args.directory
    verbose = args.verbose
    excluded_files = args.exclude
    tree = args.tree
    short = args.short
    mode = 1 if args.tree else 2
    sys.exit(
        parse_directory(
            directory,
            verbose,
            excluded_files,
            tree,
            short,
            mode,
            oldparser=False,
        )
    )


if __name__ == "__main__":
    main()

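parse_directory is also usable as a library function; benchmark.py earlier in this commit calls it exactly this way. A minimal programmatic invocation, assuming the working directory contains the scripts package and that ../../Lib points at a CPython Lib/ checkout:

import os
import sys

sys.path.insert(0, os.getcwd())
from scripts.test_parse_directory import parse_directory

status = parse_directory(
    "../../Lib",
    verbose=False,
    excluded_files=["*/bad*", "*/lib2to3/tests/data/*"],
    tree_arg=0,
    short=True,
    mode=1,  # PARSE: produce an ast.AST for each file
    oldparser=False,
)
sys.exit(status)
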
View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python3.8

import argparse
import os
import glob
import tarfile
import zipfile
import shutil
import pathlib
import sys

from typing import Generator, Any

sys.path.insert(0, ".")

from pegen import build
from scripts import test_parse_directory

HERE = pathlib.Path(__file__).resolve().parent

argparser = argparse.ArgumentParser(
    prog="test_pypi_packages", description="Helper program to test parsing PyPI packages",
)
argparser.add_argument(
    "-t", "--tree", action="count", help="Compare parse tree to official AST", default=0
)


def get_packages() -> Generator[str, None, None]:
    all_packages = (
        glob.glob("./data/pypi/*.tar.gz")
        + glob.glob("./data/pypi/*.zip")
        + glob.glob("./data/pypi/*.tgz")
    )
    for package in all_packages:
        yield package


def extract_files(filename: str) -> None:
    savedir = os.path.join("data", "pypi")
    if tarfile.is_tarfile(filename):
        tarfile.open(filename).extractall(savedir)
    elif zipfile.is_zipfile(filename):
        zipfile.ZipFile(filename).extractall(savedir)
    else:
        raise ValueError(f"Could not identify type of compressed file {filename}")


def find_dirname(package_name: str) -> str:
    for name in os.listdir(os.path.join("data", "pypi")):
        full_path = os.path.join("data", "pypi", name)
        if os.path.isdir(full_path) and name in package_name:
            return full_path
    assert False  # This is to fix mypy, should never be reached


def run_tests(dirname: str, tree: int) -> int:
    return test_parse_directory.parse_directory(
        dirname,
        verbose=False,
        excluded_files=[],
        tree_arg=tree,
        short=True,
        mode=1 if tree else 0,
        oldparser=False,
    )


def main() -> None:
    args = argparser.parse_args()
    tree = args.tree

    for package in get_packages():
        print(f"Extracting files from {package}... ", end="")
        try:
            extract_files(package)
            print("Done")
        except ValueError as e:
            print(e)
            continue

        print("Trying to parse all Python files ... ")
        dirname = find_dirname(package)
        status = run_tests(dirname, tree)
        if status == 0:
            shutil.rmtree(dirname)
        else:
            print(f"Failed to parse {dirname}")


if __name__ == "__main__":
    main()