sodadata / soda-spark

Soda Spark is a PySpark library that helps you with testing your data in Spark DataFrames.
Home Page: https://docs.soda.io
License: Apache License 2.0
No need for it anymore, as we do not have to specify the database.
Explain the API for the scan functionality in the README.
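A minimal sketch of what such a README example could look like, based on the scan.execute call used elsewhere in these issues (the table name, metrics, and tests in the YAML string are illustrative and assume soda-sql's scan YAML keys):

from pyspark.sql import SparkSession
from sodaspark import scan

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "shipped"), (2, None)], ["id", "status"])

# The scan definition is a soda-sql scan YAML, here given as a string.
scan_definition = """
table_name: demo
metrics:
- row_count
- missing_count
tests:
- row_count > 0
"""

scan_result = scan.execute(scan_definition, df)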
I get an issue when running a scan that has SQL metrics configured while passing the Soda Cloud client to the execute call. It works fine when running the scan without the client. The execution just hangs; when I cancel it, I see this error message:
^CERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/py4j/clientserver.py", line 475, in send_command
answer = smart_decode(self.stream.readline()[:-1])
RuntimeError: reentrant call inside <_io.BufferedReader name=3>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/py4j/clientserver.py", line 503, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/py4j/clientserver.py", line 475, in send_command
answer = smart_decode(self.stream.readline()[:-1])
File "/opt/homebrew/Cellar/[email protected]/3.9.9/Frameworks/Python.framework/Versions/3.9/lib/python3.9/socket.py", line 704, in readinto
return self._sock.recv_into(b)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/pyspark/context.py", line 292, in signal_handler
self.cancelAllJobs()
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/pyspark/context.py", line 1195, in cancelAllJobs
self._jsc.sc().cancelAllJobs()
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/py4j/java_gateway.py", line 1309, in __call__
return_value = get_return_value(
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/py4j/protocol.py", line 334, in get_return_value
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o14.sc
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/py4j/clientserver.py", line 503, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
--- Logging error ---
Traceback (most recent call last):
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/sodasql/scan/scan.py", line 682, in _run_sql_metric_failed_rows
self.sampler.save_sample_to_local_file_with_limit(resolved_sql, temp_file, failed_limit)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/sodasql/scan/sampler.py", line 207, in save_sample_to_local_file_with_limit
row = cursor.fetchone()
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/sodaspark/scan.py", line 150, in fetchone
row = self._df.first()
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 1617, in first
return self.head()
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 1603, in head
rs = self.head(1)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 1605, in head
return self.take(n)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 744, in take
return self.limit(num).collect()
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 693, in collect
sock_info = self._jdf.collectToPython()
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/py4j/java_gateway.py", line 1309, in __call__
return_value = get_return_value(
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/py4j/protocol.py", line 334, in get_return_value
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o3900.collectToPython
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/homebrew/Cellar/[email protected]/3.9.9/Frameworks/Python.framework/Versions/3.9/lib/python3.9/logging/__init__.py", line 1083, in emit
msg = self.format(record)
File "/opt/homebrew/Cellar/[email protected]/3.9.9/Frameworks/Python.framework/Versions/3.9/lib/python3.9/logging/__init__.py", line 927, in format
return fmt.format(record)
File "/opt/homebrew/Cellar/[email protected]/3.9.9/Frameworks/Python.framework/Versions/3.9/lib/python3.9/logging/__init__.py", line 663, in format
record.message = record.getMessage()
File "/opt/homebrew/Cellar/[email protected]/3.9.9/Frameworks/Python.framework/Versions/3.9/lib/python3.9/logging/__init__.py", line 367, in getMessage
msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
File "/Users/albin/dev/soda/pipeline-demo/soda-spark-test/soda-spark-tests.py", line 21, in <module>
scan_result = scan.execute(scan_definition, df_ecommerce, soda_server_client=soda_server_client)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/sodaspark/scan.py", line 294, in execute
scan.execute()
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/sodasql/scan/scan.py", line 94, in execute
self._query_sql_metrics_and_run_tests()
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/sodasql/scan/scan.py", line 497, in _query_sql_metrics_and_run_tests
self._query_sql_metrics_and_run_tests_base(self.scan_yml.sql_metric_ymls)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/sodasql/scan/scan.py", line 515, in _query_sql_metrics_and_run_tests_base
self._run_sql_metric_failed_rows(sql_metric, resolved_sql, scan_column)
File "/Users/albin/dev/py/env/2.1.0b20/lib/python3.9/site-packages/sodasql/scan/scan.py", line 736, in _run_sql_metric_failed_rows
logger.exception(f'Could not perform sql metric failed rows \n{resolved_sql}', e)
Message: 'Could not perform sql metric failed rows \nselect order_id as failed_orders\nfrom orders\nwhere ship_date < order_date;\n'
Arguments: (Py4JError('An error occurred while calling o3900.collectToPython'),)
Maybe with a tool like towncrier.
Use setup.cfg instead
The GitHub workflow we have now shows the release jobs (as cancelled) in the PR. By creating a separate workflow for the release, we eliminate this clutter. Still, the release workflow should only run when the tests have passed.
In the YAML you can define which columns to exclude. Test if this feature works.
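A sketch of such a test, assuming the scan YAML accepts soda-sql's excluded_columns key (the key and the column name are assumptions, not verified against this repo):

from sodaspark import scan

# Hypothetical scan definition that excludes one column from the scan.
scan_definition = """
table_name: demo
excluded_columns:
- secret_column
metrics:
- row_count
- missing_count
"""

scan_result = scan.execute(scan_definition, df)
# Expected: no measurements for secret_column in the result.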
We have pinned versions for soda-sql; we should make this a version range.
In the column metrics, the metric-groups configuration can be set. Test if this works.
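A sketch of a scan definition exercising this, assuming soda-sql's metric_groups key under a column (the key names and group names are assumptions):

from sodaspark import scan

scan_definition = """
table_name: demo
columns:
  id:
    metric_groups:
    - missing
    - duplicates
"""

scan_result = scan.execute(scan_definition, df)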
It would be great if we could also implement the failed rows processor for Soda Spark:
https://github.com/sodadata/soda-sql/blob/main/core/sodasql/scan/failed_rows_processor.py
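A rough sketch of what a user-defined processor might look like, assuming the interface is the single process method from the linked file (the class name and the context keys used below are assumptions):

from sodasql.scan.failed_rows_processor import FailedRowsProcessor

class LogFailedRowsProcessor(FailedRowsProcessor):
    def process(self, context: dict) -> dict:
        # Inspect the failed rows handed over by the scan and return a
        # message that ends up in the scan results.
        sample_name = context.get("sample_name")
        return {"message": f"processed failed rows for {sample_name}"}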
Test if column metrics work. Testing for a random selection should suffice.
The version is retrieved here. I do not think this works, since setup.py does not work.
The execute function should return the ScanResults. We now do a conversion to a data frame, for the measurements only. Since this is not complete - e.g. test results are missing - we would like to return the scan results.
Still keeping #23 open for discussion from the community about the preferred behavior of the execute.
Register data frame as temporary view and compute metric using a SQL statement.
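In plain PySpark this boils down to something like the following (the view name and the metric are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,), (None,)], ["id"])

# Register the data frame as a temporary view ...
df.createOrReplaceTempView("soda_scan_input")

# ... and compute a metric with a SQL statement against that view.
row = spark.sql(
    "SELECT COUNT(*) AS row_count, COUNT(id) AS id_values FROM soda_scan_input"
).first()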
Use the one from GDD kick-start-python
When a data set is large and has a big number of columns, the scan function scan.execute(scan_definition, df) fails with a Spark OOM issue in the master due to the collection part of the metrics. A more meaningful message here would help to avoid misleading the developer and let them know that the final result is too large and should be either filtered or split.
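A hypothetical sketch of such a guard (the helper name is made up, and whether the OOM surfaces as a Py4JJavaError depends on the setup):

from py4j.protocol import Py4JJavaError

def collect_measurements(df):
    try:
        return df.collect()
    except Py4JJavaError as exception:
        # Translate a driver-side OOM into a message that tells the
        # developer what to do about it.
        if "OutOfMemoryError" in str(exception):
            raise MemoryError(
                "The metric results are too large to collect; "
                "filter or split the data frame before scanning."
            ) from exception
        raise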
Generate the docs using a GitHub workflow. Preferably similar to Soda data.
Add a test that validates that the scan results are an empty list.
Use setup.cfg instead of setup.py to define installation.
ERROR: Failed building wheel for sasl
ERROR: Command errored out with exit status 1:
command: /databricks/python3/bin/python -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-hk_a28h0/sasl_22bdc11526b24a309f12b898eb2ce262/setup.py'"'"'; __file__='"'"'/tmp/pip-install-hk_a28h0/sasl_22bdc11526b24a309f12b898eb2ce262/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-_6sr1coa/install-record.txt --single-version-externally-managed --compile --install-headers /databricks/python3/include/site/python3.8/sasl
cwd: /tmp/pip-install-hk_a28h0/sasl_22bdc11526b24a309f12b898eb2ce262/
Complete output (29 lines):
running install
running build
running build_py
creating build
creating build/lib.linux-x86_64-3.8
creating build/lib.linux-x86_64-3.8/sasl
copying sasl/__init__.py -> build/lib.linux-x86_64-3.8/sasl
running egg_info
writing sasl.egg-info/PKG-INFO
writing dependency_links to sasl.egg-info/dependency_links.txt
writing requirements to sasl.egg-info/requires.txt
writing top-level names to sasl.egg-info/top_level.txt
reading manifest file 'sasl.egg-info/SOURCES.txt'
reading manifest template 'MANIFEST.in'
writing manifest file 'sasl.egg-info/SOURCES.txt'
copying sasl/saslwrapper.cpp -> build/lib.linux-x86_64-3.8/sasl
copying sasl/saslwrapper.h -> build/lib.linux-x86_64-3.8/sasl
copying sasl/saslwrapper.pyx -> build/lib.linux-x86_64-3.8/sasl
running build_ext
building 'sasl.saslwrapper' extension
creating build/temp.linux-x86_64-3.8
creating build/temp.linux-x86_64-3.8/sasl
x86_64-linux-gnu-gcc -pthread -Wno-unused-result -Wsign-compare -DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat -Werror=format-security -g -fwrapv -O2 -g -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -fPIC -Isasl -I/databricks/python3/include -I/usr/include/python3.8 -c sasl/saslwrapper.cpp -o build/temp.linux-x86_64-3.8/sasl/saslwrapper.o
In file included from sasl/saslwrapper.cpp:629:
sasl/saslwrapper.h:22:10: fatal error: sasl/sasl.h: No such file or directory
22 | #include <sasl/sasl.h>
| ^~~~~~~~~~~~~
compilation terminated.
error: command 'x86_64-linux-gnu-gcc' failed with exit status 1
----------------------------------------
ERROR: Command errored out with exit status 1: /databricks/python3/bin/python -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-hk_a28h0/sasl_22bdc11526b24a309f12b898eb2ce262/setup.py'"'"'; __file__='"'"'/tmp/pip-install-hk_a28h0/sasl_22bdc11526b24a309f12b898eb2ce262/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-_6sr1coa/install-record.txt --single-version-externally-managed --compile --install-headers /databricks/python3/include/site/python3.8/sasl Check the logs for full command output.
As discussed in this soda-sql issue, we prefer to use the Warehouse methods to execute SQL statements, since we have control over this API. However, to unblock sodadata/soda-sql#240 we implemented the Connection and Cursor in sodadata/soda-sql#239. After issue 479 is resolved, we can replace the Connection and Cursor with a Warehouse implementation for Spark.
See this page for connection details
First investigate what the schema of the data frame would look like.
It is not needed due to setup.cfg
Given a scan YAML file, apply a scan to a data frame.
def scan(df: DataFrame, scan_yaml: Union[str, Path]):
    ...
TBD:
Use doctest to test examples in the docstring.
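A self-contained example of the pattern, using a toy helper rather than the real scan function; running python -m doctest on the module would then execute the example:

def row_count(df) -> int:
    """Return the number of rows in the data frame.

    >>> from pyspark.sql import SparkSession
    >>> spark = SparkSession.builder.getOrCreate()
    >>> row_count(spark.createDataFrame([(1,), (2,)], ["id"]))
    2
    """
    return df.count()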
We have implemented the sql_fetchone method, but the others will not work for Spark.
After a measurement is calculated - given a certain metric - the response to the user should be given as a Spark data frame.
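A minimal sketch of that conversion, assuming a measurement carries a metric name, an optional column name and a value (the field names are assumptions):

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

measurements = [
    Row(metric="row_count", column_name=None, value=2),
    Row(metric="missing_count", column_name="id", value=0),
]
measurements_df = spark.createDataFrame(measurements)
measurements_df.show()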
This bug is due to the new release of markupsafe (2.1.0), in which soft_unicode has been removed. The dependencies need to be adapted (e.g. by pinning markupsafe<2.1.0).
What is the preferred behavior for the scan.execute from a user perspective?
At this moment the execute returns a Spark data frame. The data frame contains the measurements of the scan_results.
Maybe it does not make sense to return this as a Spark data frame: we could return the scan result object. However, that is maybe not an object the user expects, as it is Soda internal.
We will use soda-sql to generate the SQL that computes a metric.
A user reported a failure after reading CSVs in Spark and running the scan on them:
df = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.load(file_location)
scan.execute(scan_definition, df)
The solution was to explicitly add the schema. This issue asks to investigate whether we could also use schema inference.
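A sketch of that workaround, reusing the reader options from the snippet above (the column names are taken from the SQL metric earlier in this list and are illustrative):

from pyspark.sql.types import DateType, IntegerType, StructField, StructType

schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("ship_date", DateType(), True),
])

df = spark.read.format(file_type) \
    .schema(schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .load(file_location)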
When running a scan on a table that is in another database than default, the temporary view in scan.execute cannot be created because of the prefix in the table name. The scan runs fine without creating this temporary view.
I'm wondering if the creation of the temporary view in the scan.execute function is necessary. To the relatively naked eye it seems unnecessary. Why is it here? Are there situations in which the table is only readable when a temporary view is created in this way?
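A minimal reproduction of the naming clash (the database and table names are hypothetical); Spark rejects qualified names for temporary views:

df = spark.table("analytics.orders")

# Fails: a temporary view name cannot contain a database prefix.
df.createOrReplaceTempView("analytics.orders")

# Works: an unqualified view name.
df.createOrReplaceTempView("orders")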
See the validity formats.
Similar to the soda-sql repo, have a job that publishes the package to PyPI.
After being able to compute one measurement from one metric we would like to do the same for any given set of metrics.
Test if custom metrics work
With tox we have the flexibility to easily test against multiple Python versions.
After being able to compute a measurement, implement the API for running a test
Soda Spark scan never finishes when Samples are enabled:
samples:
  table_limit: 50
  failed_limit: 50
When removed, it works.
Title says it all