123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
import os
import re
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import TypeVar

import nbformat
from traitlets.config import LoggingConfigurable
# Type variable bound to FileReader; the bound is given as a string because the class is
# defined further down. Set up as a forward reference for the type hint on the return
# from a class factory method. See
# https://stackoverflow.com/questions/39205527/can-you-annotate-return-type-when-value-is-instance-of-cls/39205612#39205612
# NOTE(review): no factory method using F is visible in this part of the file - confirm it is still needed.
F = TypeVar("F", bound="FileReader")
class FileReader(LoggingConfigurable):
    """
    Reads a script file line by line and reports the language of its content based on
    the file extension. Subclasses may override `language` and `read_next_code_chunk()`
    when the file format requires different handling, e.g. with Notebooks.
    """

    # Maps a lower-cased file extension to the language name reported by `language`
    _extension_languages = {".py": "python", ".r": "r"}

    def __init__(self, filepath: str, **kwargs):
        """
        :param filepath: path of the file to read
        :param kwargs: passed through unchanged to the parent class constructor
        """
        super().__init__(**kwargs)
        self._filepath = filepath

    @property
    def filepath(self) -> str:
        """The path this reader was created with."""
        return self._filepath

    @property
    def language(self) -> Optional[str]:
        """
        The language implied by the file extension: "python" for .py, "r" for .r,
        or None for any other extension.
        """
        # Compare case-insensitively: R scripts are conventionally saved with an
        # upper-case ".R" extension, which an exact match against ".r" would miss.
        file_extension = os.path.splitext(self._filepath)[-1].lower()
        return self._extension_languages.get(file_extension)

    def read_next_code_chunk(self) -> Iterator[List[str]]:
        """
        Implements a generator for lines of code in the specified filepath. Each chunk
        yielded is a single-element list holding one line stripped of surrounding
        whitespace. Subclasses may override if explicit line-by-line parsing is not
        feasible, e.g. with Notebooks.
        """
        # Decode as UTF-8 instead of the platform default so results do not depend on
        # the host locale; undecodable bytes are replaced rather than aborting the
        # scan, since the lines are only searched with regular expressions.
        with open(self._filepath, encoding="utf-8", errors="replace") as f:
            for line in f:
                yield [line.strip()]
class NotebookReader(FileReader):
    """
    Reads a notebook file. The language is taken from the notebook's kernelspec
    metadata and code is returned one code cell at a time.
    """

    def __init__(self, filepath: str, **kwargs):
        """
        Load the notebook at `filepath` (as notebook format version 4) and determine
        its language.

        :param filepath: path of the notebook file to read
        :param kwargs: passed through unchanged to the parent class constructor
        """
        super().__init__(filepath, **kwargs)

        # Open explicitly as UTF-8 so decoding does not depend on the platform's
        # default encoding.
        with open(self._filepath, encoding="utf-8") as f:
            self._notebook = nbformat.read(f, as_version=4)

        self._language = None
        try:
            self._language = self._notebook["metadata"]["kernelspec"]["language"].lower()
        except (KeyError, AttributeError, TypeError):
            # KeyError: one of the metadata keys is absent.
            # AttributeError / TypeError: a key is present but holds a value that
            # cannot be indexed or lower-cased (e.g. null). In each case the notebook
            # has no usable language metadata, so only warn and leave it as None.
            self.log.warning(f"No language metadata found in {self._filepath}")

    @property
    def language(self) -> Optional[str]:
        """The lower-cased kernelspec language, or None if the notebook does not provide one."""
        return self._language

    def read_next_code_chunk(self) -> Iterator[List[str]]:
        """
        Implements a generator over the notebook's code cells. Each chunk yielded is
        the list of source lines of one non-empty code cell.
        """
        for cell in self._notebook.cells:
            if cell.source and cell.cell_type == "code":
                yield cell.source.split("\n")
class ScriptParser(object):
    """
    Base class for parsing individual lines of code. Subclasses implement a search_expressions()
    function that returns language-specific regexes to match against code lines.
    """

    # Marker that starts a comment running to the end of the line
    _comment_char = "#"
    # Characters that delimit a string literal
    _quote_chars = ('"', "'")

    def search_expressions(self) -> Dict[str, List]:
        """
        Return a dictionary mapping a resource key (e.g. "env_vars") to the list of
        regular expression patterns used to find that resource in a line of code.

        :raises NotImplementedError: always; subclasses must provide the patterns
        """
        raise NotImplementedError(f"{type(self).__name__} must implement search_expressions()")

    def _find_comment_start(self, line: str) -> int:
        """
        Return the index at which the trailing comment of `line` begins, or -1 if the
        line has no comment. A comment marker inside a quoted string is not treated
        as the start of a comment.
        """
        open_quote = None  # the quote character of the string being scanned, if any
        escaped = False  # True when the previous character inside a string was a backslash
        for index, char in enumerate(line):
            if open_quote is not None:
                if escaped:
                    escaped = False
                elif char == "\\":
                    escaped = True
                elif char == open_quote:
                    open_quote = None
            elif char in self._quote_chars:
                open_quote = char
            elif line.startswith(self._comment_char, index):
                return index

        if open_quote is not None:
            # The quotes on this line are unbalanced (e.g. the line belongs to a
            # multi-line string), so the scan above is unreliable. Fall back to
            # cutting at the first comment marker found anywhere on the line.
            return line.find(self._comment_char)
        return -1

    def _get_line_without_comments(self, line):
        """Return `line` with any trailing comment removed and surrounding whitespace stripped."""
        index = self._find_comment_start(line)
        if index >= 0:
            line = line[:index]
        return line.strip()

    def parse_environment_variables(self, line):
        """
        Match a line of code against every pattern returned by search_expressions().

        :param line: a single line of code
        :return: a list of (key, match) tuples, where key is the search_expressions()
                 key whose pattern produced the match object; empty if the line is
                 blank or consists only of a comment
        """
        line = self._get_line_without_comments(line)
        if not line:
            return []

        matches = []
        for key, patterns in self.search_expressions().items():
            for pattern in patterns:
                # re caches compiled patterns, so repeated calls do not recompile
                for match in re.finditer(pattern, line):
                    matches.append((key, match))
        return matches
class PythonScriptParser(ScriptParser):
    """Provides the regular expressions used to find environment variables in Python code."""

    def search_expressions(self) -> Dict[str, List]:
        """
        Return the patterns to search for, keyed by resource type. In every pattern
        group 1 captures the variable name and group 2 captures the quoted value,
        when one is present.
        """
        # TODO: add more key:list-of-regex pairs to parse for additional resources

        # Matches envvar assignments of the form os.environ["name"] = "value",
        # with or without a value provided
        environ_item = (
            r"os\.environ\[[\"']([a-zA-Z_]+[A-Za-z0-9_]*)[\"']\](?:\s*=(?:\s*[\"'](.[^\"']*)?[\"'])?)*"
        )
        # Matches envvar lookups that use os.getenv("name", "value"),
        # with or without a default provided
        getenv_call = r"os\.getenv\([\"']([a-zA-Z_]+[A-Za-z0-9_]*)[\"'](?:\s*\,\s*[\"'](.[^\"']*)?[\"'])?"
        # Matches envvar lookups that use os.environ.get("name", "value"),
        # with or without a default provided
        environ_get_call = (
            r"os\.environ\.get\([\"']([a-zA-Z_]+[A-Za-z0-9_]*)[\"'](?:\s*\,(?:\s*[\"'](.[^\"']*)?[\"'])?)*"
        )

        return {"env_vars": [environ_item, getenv_call, environ_get_call]}
class RScriptParser(ScriptParser):
    """Provides the regular expressions used to find environment variables in R code."""

    def search_expressions(self) -> Dict[str, List]:
        """
        Return the patterns to search for, keyed by resource type. In every pattern
        group 1 captures the variable name and group 2 captures the value, when one
        is present.
        """
        # TODO: add more key:list-of-regex pairs to parse for additional resources
        regex_dict = dict()

        envs = [
            # Matches assignments of the form Sys.setenv("key" = "value")
            r"Sys\.setenv\([\"']*([a-zA-Z_]+[A-Za-z0-9_]*)[\"']*\s*=\s*[\"']*(.[^\"']*)?[\"']*\)",
            # Matches lookups of the form Sys.getenv("key"), Sys.getenv("key", "default")
            # and Sys.getenv("key", unset = "default"). Group 2 captures only a quoted
            # default; it must not capture arbitrary text following the call, because
            # callers record group 2 as the variable's value. The final [,)] requires
            # the name to be a complete argument.
            r"Sys\.getenv\([\"']*([a-zA-Z_]+[A-Za-z0-9_]*)[\"']*"
            r"(?:\s*,\s*(?:unset\s*=\s*)?[\"'](.[^\"']*)?[\"'])?\s*[,)]",
        ]
        regex_dict["env_vars"] = envs
        return regex_dict
class ContentParser(LoggingConfigurable):
    """
    Parses a Python script, R script or notebook file for the resources (currently
    environment variables) that its code references.
    """

    # Line parsers keyed by the language name reported by a reader
    parsers = {"python": PythonScriptParser(), "r": RScriptParser()}

    def parse(self, filepath: str) -> dict:
        """
        Returns a model dictionary of all the regex matches for each key in the regex dictionary

        :param filepath: path of the file to parse
        :return: a dictionary with the keys "env_vars" (variable name -> value or None),
                 "inputs" and "outputs" (lists); the collections are empty if no
                 parser is available for the file's language
        :raises FileNotFoundError: if filepath does not exist
        :raises IsADirectoryError: if filepath is not a file
        :raises ValueError: if the file extension is not supported
        """
        properties = {"env_vars": {}, "inputs": [], "outputs": []}
        reader = self._get_reader(filepath)
        parser = self._get_parser(reader.language)

        if not parser:
            return properties

        for chunk in reader.read_next_code_chunk():
            if not chunk:
                continue
            for line in chunk:
                for key, match in parser.parse_environment_variables(line):
                    if key == "env_vars":
                        # group 1 is the variable name, group 2 its value (None if absent);
                        # a later match for the same name replaces an earlier one
                        properties[key][match.group(1)] = match.group(2)
                    else:
                        properties[key].append(match.group(1))

        return properties

    def _validate_file(self, filepath: str):
        """
        Validate file exists and is file (e.g. not a directory)

        :raises FileNotFoundError: if filepath does not exist
        :raises IsADirectoryError: if filepath exists but is not a file
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"No such file or directory: {filepath}")
        if not os.path.isfile(filepath):
            raise IsADirectoryError(f"Is a directory: {filepath}")

    def _get_reader(self, filepath: str):
        """
        Find the proper reader based on the file extension

        :raises ValueError: if the extension is not .ipynb, .py or .r (in any letter case)
        """
        file_extension = os.path.splitext(filepath)[-1]
        self._validate_file(filepath)

        # Match case-insensitively so that e.g. the conventional upper-case ".R"
        # extension of R scripts is accepted; the extension is reported in its
        # original form in the error message.
        normalized_extension = file_extension.lower()
        if normalized_extension == ".ipynb":
            return NotebookReader(filepath)
        elif normalized_extension in (".py", ".r"):
            return FileReader(filepath)
        else:
            raise ValueError(f"File type {file_extension} is not supported.")

    def _get_parser(self, language: Optional[str]):
        """
        Find the proper parser based on content language

        :param language: language name reported by the reader, may be None
        :return: the matching parser, or None (after logging a warning) if there is none
        """
        parser = None
        if language:
            parser = self.parsers.get(language)

        if not parser:
            self.log.warning(f"Content parser for {language} is not available.")

        return parser
|