summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/reflection.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/reflection.py
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/reflection.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/reflection.py677
1 files changed, 0 insertions, 677 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/reflection.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/reflection.py
deleted file mode 100644
index c764e8c..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/reflection.py
+++ /dev/null
@@ -1,677 +0,0 @@
-# dialects/mysql/reflection.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-import re
-
-from .enumerated import ENUM
-from .enumerated import SET
-from .types import DATETIME
-from .types import TIME
-from .types import TIMESTAMP
-from ... import log
-from ... import types as sqltypes
-from ... import util
-
-
-class ReflectedState:
- """Stores raw information about a SHOW CREATE TABLE statement."""
-
- def __init__(self):
- self.columns = []
- self.table_options = {}
- self.table_name = None
- self.keys = []
- self.fk_constraints = []
- self.ck_constraints = []
-
-
-@log.class_logger
-class MySQLTableDefinitionParser:
- """Parses the results of a SHOW CREATE TABLE statement."""
-
- def __init__(self, dialect, preparer):
- self.dialect = dialect
- self.preparer = preparer
- self._prep_regexes()
-
- def parse(self, show_create, charset):
- state = ReflectedState()
- state.charset = charset
- for line in re.split(r"\r?\n", show_create):
- if line.startswith(" " + self.preparer.initial_quote):
- self._parse_column(line, state)
- # a regular table options line
- elif line.startswith(") "):
- self._parse_table_options(line, state)
- # an ANSI-mode table options line
- elif line == ")":
- pass
- elif line.startswith("CREATE "):
- self._parse_table_name(line, state)
- elif "PARTITION" in line:
- self._parse_partition_options(line, state)
- # Not present in real reflection, but may be if
- # loading from a file.
- elif not line:
- pass
- else:
- type_, spec = self._parse_constraints(line)
- if type_ is None:
- util.warn("Unknown schema content: %r" % line)
- elif type_ == "key":
- state.keys.append(spec)
- elif type_ == "fk_constraint":
- state.fk_constraints.append(spec)
- elif type_ == "ck_constraint":
- state.ck_constraints.append(spec)
- else:
- pass
- return state
-
- def _check_view(self, sql: str) -> bool:
- return bool(self._re_is_view.match(sql))
-
- def _parse_constraints(self, line):
- """Parse a KEY or CONSTRAINT line.
-
- :param line: A line of SHOW CREATE TABLE output
- """
-
- # KEY
- m = self._re_key.match(line)
- if m:
- spec = m.groupdict()
- # convert columns into name, length pairs
- # NOTE: we may want to consider SHOW INDEX as the
- # format of indexes in MySQL becomes more complex
- spec["columns"] = self._parse_keyexprs(spec["columns"])
- if spec["version_sql"]:
- m2 = self._re_key_version_sql.match(spec["version_sql"])
- if m2 and m2.groupdict()["parser"]:
- spec["parser"] = m2.groupdict()["parser"]
- if spec["parser"]:
- spec["parser"] = self.preparer.unformat_identifiers(
- spec["parser"]
- )[0]
- return "key", spec
-
- # FOREIGN KEY CONSTRAINT
- m = self._re_fk_constraint.match(line)
- if m:
- spec = m.groupdict()
- spec["table"] = self.preparer.unformat_identifiers(spec["table"])
- spec["local"] = [c[0] for c in self._parse_keyexprs(spec["local"])]
- spec["foreign"] = [
- c[0] for c in self._parse_keyexprs(spec["foreign"])
- ]
- return "fk_constraint", spec
-
- # CHECK constraint
- m = self._re_ck_constraint.match(line)
- if m:
- spec = m.groupdict()
- return "ck_constraint", spec
-
- # PARTITION and SUBPARTITION
- m = self._re_partition.match(line)
- if m:
- # Punt!
- return "partition", line
-
- # No match.
- return (None, line)
-
- def _parse_table_name(self, line, state):
- """Extract the table name.
-
- :param line: The first line of SHOW CREATE TABLE
- """
-
- regex, cleanup = self._pr_name
- m = regex.match(line)
- if m:
- state.table_name = cleanup(m.group("name"))
-
- def _parse_table_options(self, line, state):
- """Build a dictionary of all reflected table-level options.
-
- :param line: The final line of SHOW CREATE TABLE output.
- """
-
- options = {}
-
- if line and line != ")":
- rest_of_line = line
- for regex, cleanup in self._pr_options:
- m = regex.search(rest_of_line)
- if not m:
- continue
- directive, value = m.group("directive"), m.group("val")
- if cleanup:
- value = cleanup(value)
- options[directive.lower()] = value
- rest_of_line = regex.sub("", rest_of_line)
-
- for nope in ("auto_increment", "data directory", "index directory"):
- options.pop(nope, None)
-
- for opt, val in options.items():
- state.table_options["%s_%s" % (self.dialect.name, opt)] = val
-
- def _parse_partition_options(self, line, state):
- options = {}
- new_line = line[:]
-
- while new_line.startswith("(") or new_line.startswith(" "):
- new_line = new_line[1:]
-
- for regex, cleanup in self._pr_options:
- m = regex.search(new_line)
- if not m or "PARTITION" not in regex.pattern:
- continue
-
- directive = m.group("directive")
- directive = directive.lower()
- is_subpartition = directive == "subpartition"
-
- if directive == "partition" or is_subpartition:
- new_line = new_line.replace(") */", "")
- new_line = new_line.replace(",", "")
- if is_subpartition and new_line.endswith(")"):
- new_line = new_line[:-1]
- if self.dialect.name == "mariadb" and new_line.endswith(")"):
- if (
- "MAXVALUE" in new_line
- or "MINVALUE" in new_line
- or "ENGINE" in new_line
- ):
- # final line of MariaDB partition endswith ")"
- new_line = new_line[:-1]
-
- defs = "%s_%s_definitions" % (self.dialect.name, directive)
- options[defs] = new_line
-
- else:
- directive = directive.replace(" ", "_")
- value = m.group("val")
- if cleanup:
- value = cleanup(value)
- options[directive] = value
- break
-
- for opt, val in options.items():
- part_def = "%s_partition_definitions" % (self.dialect.name)
- subpart_def = "%s_subpartition_definitions" % (self.dialect.name)
- if opt == part_def or opt == subpart_def:
- # builds a string of definitions
- if opt not in state.table_options:
- state.table_options[opt] = val
- else:
- state.table_options[opt] = "%s, %s" % (
- state.table_options[opt],
- val,
- )
- else:
- state.table_options["%s_%s" % (self.dialect.name, opt)] = val
-
- def _parse_column(self, line, state):
- """Extract column details.
-
- Falls back to a 'minimal support' variant if full parse fails.
-
- :param line: Any column-bearing line from SHOW CREATE TABLE
- """
-
- spec = None
- m = self._re_column.match(line)
- if m:
- spec = m.groupdict()
- spec["full"] = True
- else:
- m = self._re_column_loose.match(line)
- if m:
- spec = m.groupdict()
- spec["full"] = False
- if not spec:
- util.warn("Unknown column definition %r" % line)
- return
- if not spec["full"]:
- util.warn("Incomplete reflection of column definition %r" % line)
-
- name, type_, args = spec["name"], spec["coltype"], spec["arg"]
-
- try:
- col_type = self.dialect.ischema_names[type_]
- except KeyError:
- util.warn(
- "Did not recognize type '%s' of column '%s'" % (type_, name)
- )
- col_type = sqltypes.NullType
-
- # Column type positional arguments eg. varchar(32)
- if args is None or args == "":
- type_args = []
- elif args[0] == "'" and args[-1] == "'":
- type_args = self._re_csv_str.findall(args)
- else:
- type_args = [int(v) for v in self._re_csv_int.findall(args)]
-
- # Column type keyword options
- type_kw = {}
-
- if issubclass(col_type, (DATETIME, TIME, TIMESTAMP)):
- if type_args:
- type_kw["fsp"] = type_args.pop(0)
-
- for kw in ("unsigned", "zerofill"):
- if spec.get(kw, False):
- type_kw[kw] = True
- for kw in ("charset", "collate"):
- if spec.get(kw, False):
- type_kw[kw] = spec[kw]
- if issubclass(col_type, (ENUM, SET)):
- type_args = _strip_values(type_args)
-
- if issubclass(col_type, SET) and "" in type_args:
- type_kw["retrieve_as_bitwise"] = True
-
- type_instance = col_type(*type_args, **type_kw)
-
- col_kw = {}
-
- # NOT NULL
- col_kw["nullable"] = True
- # this can be "NULL" in the case of TIMESTAMP
- if spec.get("notnull", False) == "NOT NULL":
- col_kw["nullable"] = False
- # For generated columns, the nullability is marked in a different place
- if spec.get("notnull_generated", False) == "NOT NULL":
- col_kw["nullable"] = False
-
- # AUTO_INCREMENT
- if spec.get("autoincr", False):
- col_kw["autoincrement"] = True
- elif issubclass(col_type, sqltypes.Integer):
- col_kw["autoincrement"] = False
-
- # DEFAULT
- default = spec.get("default", None)
-
- if default == "NULL":
- # eliminates the need to deal with this later.
- default = None
-
- comment = spec.get("comment", None)
-
- if comment is not None:
- comment = cleanup_text(comment)
-
- sqltext = spec.get("generated")
- if sqltext is not None:
- computed = dict(sqltext=sqltext)
- persisted = spec.get("persistence")
- if persisted is not None:
- computed["persisted"] = persisted == "STORED"
- col_kw["computed"] = computed
-
- col_d = dict(
- name=name, type=type_instance, default=default, comment=comment
- )
- col_d.update(col_kw)
- state.columns.append(col_d)
-
- def _describe_to_create(self, table_name, columns):
- """Re-format DESCRIBE output as a SHOW CREATE TABLE string.
-
- DESCRIBE is a much simpler reflection and is sufficient for
- reflecting views for runtime use. This method formats DDL
- for columns only- keys are omitted.
-
- :param columns: A sequence of DESCRIBE or SHOW COLUMNS 6-tuples.
- SHOW FULL COLUMNS FROM rows must be rearranged for use with
- this function.
- """
-
- buffer = []
- for row in columns:
- (name, col_type, nullable, default, extra) = (
- row[i] for i in (0, 1, 2, 4, 5)
- )
-
- line = [" "]
- line.append(self.preparer.quote_identifier(name))
- line.append(col_type)
- if not nullable:
- line.append("NOT NULL")
- if default:
- if "auto_increment" in default:
- pass
- elif col_type.startswith("timestamp") and default.startswith(
- "C"
- ):
- line.append("DEFAULT")
- line.append(default)
- elif default == "NULL":
- line.append("DEFAULT")
- line.append(default)
- else:
- line.append("DEFAULT")
- line.append("'%s'" % default.replace("'", "''"))
- if extra:
- line.append(extra)
-
- buffer.append(" ".join(line))
-
- return "".join(
- [
- (
- "CREATE TABLE %s (\n"
- % self.preparer.quote_identifier(table_name)
- ),
- ",\n".join(buffer),
- "\n) ",
- ]
- )
-
- def _parse_keyexprs(self, identifiers):
- """Unpack '"col"(2),"col" ASC'-ish strings into components."""
-
- return [
- (colname, int(length) if length else None, modifiers)
- for colname, length, modifiers in self._re_keyexprs.findall(
- identifiers
- )
- ]
-
- def _prep_regexes(self):
- """Pre-compile regular expressions."""
-
- self._re_columns = []
- self._pr_options = []
-
- _final = self.preparer.final_quote
-
- quotes = dict(
- zip(
- ("iq", "fq", "esc_fq"),
- [
- re.escape(s)
- for s in (
- self.preparer.initial_quote,
- _final,
- self.preparer._escape_identifier(_final),
- )
- ],
- )
- )
-
- self._pr_name = _pr_compile(
- r"^CREATE (?:\w+ +)?TABLE +"
- r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +\($" % quotes,
- self.preparer._unescape_identifier,
- )
-
- self._re_is_view = _re_compile(r"^CREATE(?! TABLE)(\s.*)?\sVIEW")
-
- # `col`,`col2`(32),`col3`(15) DESC
- #
- self._re_keyexprs = _re_compile(
- r"(?:"
- r"(?:%(iq)s((?:%(esc_fq)s|[^%(fq)s])+)%(fq)s)"
- r"(?:\((\d+)\))?(?: +(ASC|DESC))?(?=\,|$))+" % quotes
- )
-
- # 'foo' or 'foo','bar' or 'fo,o','ba''a''r'
- self._re_csv_str = _re_compile(r"\x27(?:\x27\x27|[^\x27])*\x27")
-
- # 123 or 123,456
- self._re_csv_int = _re_compile(r"\d+")
-
- # `colname` <type> [type opts]
- # (NOT NULL | NULL)
- # DEFAULT ('value' | CURRENT_TIMESTAMP...)
- # COMMENT 'comment'
- # COLUMN_FORMAT (FIXED|DYNAMIC|DEFAULT)
- # STORAGE (DISK|MEMORY)
- self._re_column = _re_compile(
- r" "
- r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
- r"(?P<coltype>\w+)"
- r"(?:\((?P<arg>(?:\d+|\d+,\d+|"
- r"(?:'(?:''|[^'])*',?)+))\))?"
- r"(?: +(?P<unsigned>UNSIGNED))?"
- r"(?: +(?P<zerofill>ZEROFILL))?"
- r"(?: +CHARACTER SET +(?P<charset>[\w_]+))?"
- r"(?: +COLLATE +(?P<collate>[\w_]+))?"
- r"(?: +(?P<notnull>(?:NOT )?NULL))?"
- r"(?: +DEFAULT +(?P<default>"
- r"(?:NULL|'(?:''|[^'])*'|[\-\w\.\(\)]+"
- r"(?: +ON UPDATE [\-\w\.\(\)]+)?)"
- r"))?"
- r"(?: +(?:GENERATED ALWAYS)? ?AS +(?P<generated>\("
- r".*\))? ?(?P<persistence>VIRTUAL|STORED)?"
- r"(?: +(?P<notnull_generated>(?:NOT )?NULL))?"
- r")?"
- r"(?: +(?P<autoincr>AUTO_INCREMENT))?"
- r"(?: +COMMENT +'(?P<comment>(?:''|[^'])*)')?"
- r"(?: +COLUMN_FORMAT +(?P<colfmt>\w+))?"
- r"(?: +STORAGE +(?P<storage>\w+))?"
- r"(?: +(?P<extra>.*))?"
- r",?$" % quotes
- )
-
- # Fallback, try to parse as little as possible
- self._re_column_loose = _re_compile(
- r" "
- r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
- r"(?P<coltype>\w+)"
- r"(?:\((?P<arg>(?:\d+|\d+,\d+|\x27(?:\x27\x27|[^\x27])+\x27))\))?"
- r".*?(?P<notnull>(?:NOT )NULL)?" % quotes
- )
-
- # (PRIMARY|UNIQUE|FULLTEXT|SPATIAL) INDEX `name` (USING (BTREE|HASH))?
- # (`col` (ASC|DESC)?, `col` (ASC|DESC)?)
- # KEY_BLOCK_SIZE size | WITH PARSER name /*!50100 WITH PARSER name */
- self._re_key = _re_compile(
- r" "
- r"(?:(?P<type>\S+) )?KEY"
- r"(?: +%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s)?"
- r"(?: +USING +(?P<using_pre>\S+))?"
- r" +\((?P<columns>.+?)\)"
- r"(?: +USING +(?P<using_post>\S+))?"
- r"(?: +KEY_BLOCK_SIZE *[ =]? *(?P<keyblock>\S+))?"
- r"(?: +WITH PARSER +(?P<parser>\S+))?"
- r"(?: +COMMENT +(?P<comment>(\x27\x27|\x27([^\x27])*?\x27)+))?"
- r"(?: +/\*(?P<version_sql>.+)\*/ *)?"
- r",?$" % quotes
- )
-
- # https://forums.mysql.com/read.php?20,567102,567111#msg-567111
- # It means if the MySQL version >= \d+, execute what's in the comment
- self._re_key_version_sql = _re_compile(
- r"\!\d+ " r"(?: *WITH PARSER +(?P<parser>\S+) *)?"
- )
-
- # CONSTRAINT `name` FOREIGN KEY (`local_col`)
- # REFERENCES `remote` (`remote_col`)
- # MATCH FULL | MATCH PARTIAL | MATCH SIMPLE
- # ON DELETE CASCADE ON UPDATE RESTRICT
- #
- # unique constraints come back as KEYs
- kw = quotes.copy()
- kw["on"] = "RESTRICT|CASCADE|SET NULL|NO ACTION"
- self._re_fk_constraint = _re_compile(
- r" "
- r"CONSTRAINT +"
- r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
- r"FOREIGN KEY +"
- r"\((?P<local>[^\)]+?)\) REFERENCES +"
- r"(?P<table>%(iq)s[^%(fq)s]+%(fq)s"
- r"(?:\.%(iq)s[^%(fq)s]+%(fq)s)?) +"
- r"\((?P<foreign>(?:%(iq)s[^%(fq)s]+%(fq)s(?: *, *)?)+)\)"
- r"(?: +(?P<match>MATCH \w+))?"
- r"(?: +ON DELETE (?P<ondelete>%(on)s))?"
- r"(?: +ON UPDATE (?P<onupdate>%(on)s))?" % kw
- )
-
- # CONSTRAINT `CONSTRAINT_1` CHECK (`x` > 5)'
- # testing on MariaDB 10.2 shows that the CHECK constraint
- # is returned on a line by itself, so to match without worrying
- # about parenthesis in the expression we go to the end of the line
- self._re_ck_constraint = _re_compile(
- r" "
- r"CONSTRAINT +"
- r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
- r"CHECK +"
- r"\((?P<sqltext>.+)\),?" % kw
- )
-
- # PARTITION
- #
- # punt!
- self._re_partition = _re_compile(r"(?:.*)(?:SUB)?PARTITION(?:.*)")
-
- # Table-level options (COLLATE, ENGINE, etc.)
- # Do the string options first, since they have quoted
- # strings we need to get rid of.
- for option in _options_of_type_string:
- self._add_option_string(option)
-
- for option in (
- "ENGINE",
- "TYPE",
- "AUTO_INCREMENT",
- "AVG_ROW_LENGTH",
- "CHARACTER SET",
- "DEFAULT CHARSET",
- "CHECKSUM",
- "COLLATE",
- "DELAY_KEY_WRITE",
- "INSERT_METHOD",
- "MAX_ROWS",
- "MIN_ROWS",
- "PACK_KEYS",
- "ROW_FORMAT",
- "KEY_BLOCK_SIZE",
- "STATS_SAMPLE_PAGES",
- ):
- self._add_option_word(option)
-
- for option in (
- "PARTITION BY",
- "SUBPARTITION BY",
- "PARTITIONS",
- "SUBPARTITIONS",
- "PARTITION",
- "SUBPARTITION",
- ):
- self._add_partition_option_word(option)
-
- self._add_option_regex("UNION", r"\([^\)]+\)")
- self._add_option_regex("TABLESPACE", r".*? STORAGE DISK")
- self._add_option_regex(
- "RAID_TYPE",
- r"\w+\s+RAID_CHUNKS\s*\=\s*\w+RAID_CHUNKSIZE\s*=\s*\w+",
- )
-
- _optional_equals = r"(?:\s*(?:=\s*)|\s+)"
-
- def _add_option_string(self, directive):
- regex = r"(?P<directive>%s)%s" r"'(?P<val>(?:[^']|'')*?)'(?!')" % (
- re.escape(directive),
- self._optional_equals,
- )
- self._pr_options.append(_pr_compile(regex, cleanup_text))
-
- def _add_option_word(self, directive):
- regex = r"(?P<directive>%s)%s" r"(?P<val>\w+)" % (
- re.escape(directive),
- self._optional_equals,
- )
- self._pr_options.append(_pr_compile(regex))
-
- def _add_partition_option_word(self, directive):
- if directive == "PARTITION BY" or directive == "SUBPARTITION BY":
- regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\w+.*)" % (
- re.escape(directive),
- self._optional_equals,
- )
- elif directive == "SUBPARTITIONS" or directive == "PARTITIONS":
- regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\d+)" % (
- re.escape(directive),
- self._optional_equals,
- )
- else:
- regex = r"(?<!\S)(?P<directive>%s)(?!\S)" % (re.escape(directive),)
- self._pr_options.append(_pr_compile(regex))
-
- def _add_option_regex(self, directive, regex):
- regex = r"(?P<directive>%s)%s" r"(?P<val>%s)" % (
- re.escape(directive),
- self._optional_equals,
- regex,
- )
- self._pr_options.append(_pr_compile(regex))
-
-
-_options_of_type_string = (
- "COMMENT",
- "DATA DIRECTORY",
- "INDEX DIRECTORY",
- "PASSWORD",
- "CONNECTION",
-)
-
-
-def _pr_compile(regex, cleanup=None):
- """Prepare a 2-tuple of compiled regex and callable."""
-
- return (_re_compile(regex), cleanup)
-
-
-def _re_compile(regex):
- """Compile a string to regex, I and UNICODE."""
-
- return re.compile(regex, re.I | re.UNICODE)
-
-
-def _strip_values(values):
- "Strip reflected values quotes"
- strip_values = []
- for a in values:
- if a[0:1] == '"' or a[0:1] == "'":
- # strip enclosing quotes and unquote interior
- a = a[1:-1].replace(a[0] * 2, a[0])
- strip_values.append(a)
- return strip_values
-
-
-def cleanup_text(raw_text: str) -> str:
- if "\\" in raw_text:
- raw_text = re.sub(
- _control_char_regexp, lambda s: _control_char_map[s[0]], raw_text
- )
- return raw_text.replace("''", "'")
-
-
-_control_char_map = {
- "\\\\": "\\",
- "\\0": "\0",
- "\\a": "\a",
- "\\b": "\b",
- "\\t": "\t",
- "\\n": "\n",
- "\\v": "\v",
- "\\f": "\f",
- "\\r": "\r",
- # '\\e':'\e',
-}
-_control_char_regexp = re.compile(
- "|".join(re.escape(k) for k in _control_char_map)
-)