Currently, the Spark Connect client for Rust is highly experimental and should
not be used in any production setting. It is a proof of concept to identify the methods
of interacting with a Spark cluster from Rust.
spark-connect-rs aims to provide an entrypoint to Spark Connect and a DataFrame API similar to the one exposed by Spark.
Project Layout
├── core <- core implementation in Rust
│ └─ spark <- git submodule for apache/spark
├── rust <- shim for 'spark-connect-rs' from core
├── examples <- examples of using different aspects of the crate
├── datasets <- sample files from the main spark repo
The future state would be to have additional bindings for other languages alongside the top-level rust folder.
Getting Started
This section explains how to run Spark Connect for Rust locally, starting from scratch.
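As a quick orientation, a minimal program might look like the sketch below. This assumes a Spark Connect server is already running locally on the default port 15002; the exact builder and method signatures (for example, the arguments to show) may differ between releases of the crate.

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a locally running Spark Connect server (default port 15002).
    // The connection-string format and builder signature are assumptions here.
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Run a simple query and print the result to stdout.
    let df = spark.sql("SELECT 'hello' AS greeting").await?;
    df.show(Some(5), None, None).await?;

    Ok(())
}
```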
Start a streaming job and return a StreamingQuery object to handle the stream operations.
DataStreamWriter
| API | Comment |
|-----|---------|
| foreach | |
| foreachBatch | |
| format | |
| option | |
| options | |
| outputMode | Uses an Enum for OutputMode |
| partitionBy | |
| queryName | |
| start | |
| toTable | |
| trigger | Uses an Enum for TriggerMode |
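As a rough illustration of how these methods fit together, the hedged sketch below uses the names from the table above; the writeStream accessor on DataFrame, the OutputMode variant, and the start signature are assumptions and may not match the crate exactly.

```rust
// Hedged sketch: `df` is assumed to be a streaming DataFrame created from a
// streaming source. Method names follow the table above; exact signatures
// and enum variants are assumptions.
let query = df
    .writeStream()
    .format("parquet")
    .option("checkpointLocation", "/tmp/checkpoint")
    .outputMode(OutputMode::Append)
    .queryName("example_stream")
    .start(Some("/tmp/stream_output"))
    .await?;

// `query` is the returned StreamingQuery handle used to manage the stream.
```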
StreamingQueryListener
StreamingQueryListener
API
Comment
onQueryIdle
onQueryProgress
onQueryStarted
onQueryTerminated
UdfRegistration (may not be possible)
UDFRegistration
API
Comment
register
registerJavaFunction
registerJavaUDAF
UdtfRegistration (may not be possible)
UDTFRegistration
API
Comment
register
RuntimeConfig
RuntimeConfig
API
Comment
get
isModifiable
set
unset
Catalog
Catalog
API
Comment
cacheTable
clearCache
createExternalTable
createTable
currentCatalog
currentDatabase
databaseExists
dropGlobalTempView
dropTempView
functionExists
getDatabase
getFunction
getTable
isCached
listCatalogs
listDatabases
listFunctions
listTables
recoverPartitions
refreshByPath
refreshTable
registerFunction
setCurrentCatalog
setCurrentDatabase
tableExists
uncacheTable
DataFrame
Spark DataFrame type object and its implemented traits.
DataFrame
| API | Comment |
|-----|---------|
| agg | |
| alias | |
| approxQuantile | |
| cache | |
| checkpoint | |
| coalesce | |
| colRegex | |
| collect | |
| columns | |
| corr | |
| count | |
| cov | |
| createGlobalTempView | |
| createOrReplaceGlobalTempView | |
| createOrReplaceTempView | |
| createTempView | |
| crossJoin | |
| crosstab | |
| cube | |
| describe | |
| distinct | |
| drop | |
| dropDuplicates | |
| dropDuplicatesWithinWatermark | Windowing functions are currently in progress |
| drop_duplicates | |
| dropna | |
| dtypes | |
| exceptAll | |
| explain | |
| fillna | |
| filter | |
| first | |
| foreach | |
| foreachPartition | |
| freqItems | |
| groupBy | |
| head | |
| hint | |
| inputFiles | |
| intersect | |
| intersectAll | |
| isEmpty | |
| isLocal | |
| isStreaming | |
| join | |
| limit | |
| localCheckpoint | |
| mapInPandas | TBD on this exact implementation |
| mapInArrow | TBD on this exact implementation |
| melt | |
| na | |
| observe | |
| offset | |
| orderBy | |
| persist | |
| printSchema | |
| randomSplit | |
| registerTempTable | |
| repartition | |
| repartitionByRange | |
| replace | |
| rollup | |
| sameSemantics | |
| sample | |
| sampleBy | |
| schema | |
| select | |
| selectExpr | |
| semanticHash | |
| show | |
| sort | |
| sortWithinPartitions | |
| sparkSession | |
| stat | |
| storageLevel | |
| subtract | |
| summary | |
| tail | |
| take | |
| to | |
| toDF | |
| toJSON | Does not return an RDD but a long JSON-formatted String |
| toLocalIterator | |
| toPandas (implemented as to_polars & toPolars) | Convert to a polars::frame::DataFrame |
| to_datafusion & toDataFusion | New; convert to a datafusion::dataframe::DataFrame |
| transform | |
| union | |
| unionAll | |
| unionByName | |
| unpersist | |
| unpivot | |
| where | Use filter instead; where is a reserved keyword in Rust |
| withColumn | |
| withColumns | |
| withColumnRenamed | |
| withColumnsRenamed | |
| withMetadata | |
| withWatermark | |
| write | |
| writeStream | |
| writeTo | |
DataFrameWriter
Spark Connect should respect the requested format as long as your cluster supports the specified type and has the
required JARs.
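For example, a batch write might look like the hedged sketch below; the SaveMode type and the exact method signatures are assumptions based on the names in the table that follows.

```rust
// Hedged sketch of a batch write using the DataFrameWriter methods listed below;
// how `mode` is specified (enum vs. string) is an assumption.
df.write()
    .format("parquet")
    .mode(SaveMode::Overwrite)
    .option("compression", "snappy")
    .save("/tmp/output/parquet_table")
    .await?;
```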
DataFrameWriter
API
Comment
bucketBy
csv
format
insertInto
jdbc
json
mode
option
options
orc
parquet
partitionBy
save
saveAsTable
sortBy
text
DataFrameWriterV2
DataFrameWriterV2
API
Comment
append
create
createOrReplace
option
options
overwrite
overwritePartitions
partitionedBy
replace
tableProperty
using
Column
Spark Column type object and its implemented traits
Column
| API | Comment |
|-----|---------|
| alias | |
| asc | |
| asc_nulls_first | |
| asc_nulls_last | |
| astype | |
| between | |
| cast | |
| contains | |
| desc | |
| desc_nulls_first | |
| desc_nulls_last | |
| dropFields | |
| endswith | |
| eqNullSafe | |
| getField | This is deprecated but will need to be implemented |
| getItem | This is deprecated but will need to be implemented |
| ilike | |
| isNotNull | |
| isNull | |
| isin | |
| like | |
| name | |
| otherwise | |
| over | Refer to Window for creating window specifications |
| rlike | |
| startswith | |
| substr | |
| when | |
| withField | |
| eq (==) | Rust does not allow overloading == to return anything other than a bool, so column equality is currently implemented as col('name').eq(col('id')). Not the best, but it works for now (see the sketch below this table) |
| addition (+) | |
| subtraction (-) | |
| multiplication (*) | |
| division (/) | |
| OR (\|) | |
| AND (&) | |
| XOR (^) | |
| Negate (~) | |
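A short sketch of how the comparison and operator rows above are used today; col and alias come from the crate's functions, while the exact argument type accepted by select is an assumption.

```rust
// Equality goes through an explicit .eq() method because overloading `==`
// in Rust must return a plain bool rather than a Column.
let filtered = df.filter(col("name").eq(col("id")));

// Arithmetic and bitwise operators are overloaded directly on Column.
let combined = df.select([
    (col("price") * col("quantity")).alias("total"),
    (col("is_new") & col("in_stock")).alias("available"),
]);
```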
Functions
Only a few of the functions are covered by unit tests.
Functions
API
Comment
abs
acos
acosh
add_months
aggregate
approxCountDistinct
approx_count_distinct
array
array_append
array_compact
array_contains
array_distinct
array_except
array_insert
array_intersect
array_join
array_max
array_min
array_position
array_remove
array_repeat
array_sort
array_union
arrays_overlap
arrays_zip
asc
asc_nulls_first
asc_nulls_last
ascii
asin
asinh
assert_true
atan
atan2
atanh
avg
base64
bin
bit_length
bitwiseNOT
bitwise_not
broadcast
bround
bucket
call_udf
cbrt
ceil
coalesce
col
collect_list
collect_set
column
concat
concat_ws
conv
corr
cos
cosh
cot
count
countDistinct
count_distinct
covar_pop
covar_samp
crc32
create_map
csc
cume_dist
current_date
current_timestamp
date_add
date_format
date_sub
date_trunc
datediff
dayofmonth
dayofweek
dayofyear
days
decode
degrees
dense_rank
desc
desc_nulls_first
desc_nulls_last
element_at
encode
exists
exp
explode
explode_outer
expm1
expr
factorial
filter
first
flatten
floor
forall
format_number
format_string
from_csv
from_json
from_unixtime
from_utc_timestamp
functools
get
get_active_spark_context
get_json_object
greatest
grouping
grouping_id
has_numpy
hash
hex
hour
hours
hypot
initcap
inline
inline_outer
input_file_name
inspect
instr
isnan
isnull
json_tuple
kurtosis
lag
last
last_day
lead
least
length
levenshtein
lit
localtimestamp
locate
log
log10
log1p
log2
lower
lpad
ltrim
make_date
map_concat
map_contains_key
map_entries
map_filter
map_from_arrays
map_from_entries
map_keys
map_values
map_zip_with
max
max_by
md5
mean
median
min
min_by
minute
mode
monotonically_increasing_id
month
months
months_between
nanvl
next_day
np
nth_value
ntile
octet_length
overlay
overload
pandas_udf
percent_rank
percentile_approx
pmod
posexplode
posexplode_outer
pow
product
quarter
radians
raise_error
rand
randn
rank
regexp_extract
regexp_replace
repeat
reverse
rint
round
row_number
rpad
rtrim
schema_of_csv
schema_of_json
sec
second
sentences
sequence
session_window
sha1
sha2
shiftLeft
shiftRight
shiftRightUnsigned
shiftleft
shiftright
shiftrightunsigned
shuffle
signum
sin
sinh
size
skewness
slice
sort_array
soundex
spark_partition_id
split
sqrt
stddev
stddev_pop
stddev_samp
struct
substring
substring_index
sum
sumDistinct
sum_distinct
sys
tan
tanh
timestamp_seconds
toDegrees
toRadians
to_csv
to_date
to_json
to_str
to_timestamp
to_utc_timestamp
transform
transform_keys
transform_values
translate
trim
trunc
try_remote_functions
udf
unbase64
unhex
unix_timestamp
unwrap_udt
upper
var_pop
var_samp
variance
warnings
weekofyear
when
window
window_time
xxhash64
year
years
zip_with
Data Types
Data types are used for creating schemas and for casting columns to specific types
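As a small illustration, casting a column might look like the hedged sketch below; whether cast accepts a type-name string, a DataType value, or both is an assumption here.

```rust
// Hedged sketch: cast a string column to an integer type before using it numerically.
let typed = df.withColumn("age_int", col("age").cast("int"));
```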
Data Types
API
Comment
ArrayType
BinaryType
BooleanType
ByteType
DateType
DecimalType
DoubleType
FloatType
IntegerType
LongType
MapType
NullType
ShortType
StringType
CharType
VarcharType
StructField
StructType
TimestampType
TimestampNTZType
DayTimeIntervalType
YearMonthIntervalType
Literal Types
Create Spark literal types from these Rust types. For example, lit(1_i64) becomes a LongType() in the schema, and an array such as lit([1_i16, 2_i16, 3_i16]) results in an ArrayType(Short), since every value in the slice can be translated into a literal type.
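A brief sketch of the mapping in practice (hedged; how select accepts its arguments is an assumption):

```rust
let projected = df.select([
    lit(1_i64).alias("a_long"),                         // LongType
    lit("hello").alias("a_string"),                     // StringType
    lit([1_i16, 2_i16, 3_i16]).alias("a_short_array"),  // ArrayType(Short)
]);
```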
| Spark Literal Type | Rust Type | Status |
|--------------------|-----------|--------|
| Null | | |
| Binary | &[u8] | |
| Boolean | bool | |
| Byte | | |
| Short | i16 | |
| Integer | i32 | |
| Long | i64 | |
| Float | f32 | |
| Double | f64 | |
| Decimal | | |
| String | &str / String | |
| Date | chrono::NaiveDate | |
| Timestamp | chrono::DateTime\<Tz\> | |
| TimestampNtz | chrono::NaiveDateTime | |
| CalendarInterval | | |
| YearMonthInterval | | |
| DayTimeInterval | | |
| Array | slice / Vec | |
| Map | Create with the function create_map | |
| Struct | Create with the function struct_col or named_struct | |
Window & WindowSpec
For ease of use, it's recommended to use Window to create the WindowSpec.
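A hedged sketch of creating a WindowSpec and applying it with over; the constructor, the method casing (camelCase vs. snake_case), and the argument types are assumptions that mirror the PySpark Window API rather than confirmed crate signatures.

```rust
// Hedged sketch: partition by `name`, order by `date`, and compute a running sum.
let window = Window::new()
    .partitionBy(col("name"))
    .orderBy([col("date")]);

let with_total = df.withColumn("running_total", sum(col("amount")).over(window));
```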
I already have a fix in mind for this issue, and I can make a PR if this issue is reviewed and approved, thanks!
Description
The current spark-connect-rs library configures all endpoints with the https scheme. When the tls feature is enabled, a connection cannot be made to a server without TLS configured, for example when connecting to a Spark cluster set up at localhost.
Expected Behavior
When the tls feature is enabled in the spark-connect-rs crate, connections to servers both with and without TLS configured should succeed.
Proposed Fix
Default the endpoint scheme to http, and set it to https only when use_ssl=true is specified in the connection string.
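A hedged illustration of the proposed behavior (not the actual session.rs code); the helper name and the connection-string parsing shown here are hypothetical.

```rust
// Hypothetical helper: default to http and only upgrade to https when the
// connection string explicitly sets use_ssl=true.
fn endpoint_scheme(conn_str: &str) -> &'static str {
    let use_ssl = conn_str
        .split(';')
        .filter_map(|kv| kv.split_once('='))
        .any(|(k, v)| k.trim() == "use_ssl" && v.trim().eq_ignore_ascii_case("true"));

    if use_ssl { "https" } else { "http" }
}

// e.g. "sc://localhost:15002/;use_ssl=true" -> "https"
//      "sc://localhost:15002/"              -> "http"
```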
```text
thread 'main' panicked at /home/sjrusso/Documents/code/projects/rust-projects/spark-connect-rs/src/session.rs:191:30:
called `Result::unwrap()` on an `Err` value: IpcError("Not expecting a schema when messages are read")
stack backtrace:
   0: rust_begin_unwind
             at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/panicking.rs:72:14
   2: core::result::unwrap_failed
             at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/result.rs:1649:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/result.rs:1073:23
   4: spark_connect_rs::session::SparkSession::consume_plan::{{closure}}
             at ./src/session.rs:191:12
   5: spark_connect_rs::dataframe::DataFrame::collect::{{closure}}
             at ./src/dataframe.rs:99:14
   6: sql::main::{{closure}}
             at ./examples/sql.rs:21:10
   7: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
             at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:63
   8: tokio::runtime::coop::with_budget
             at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:107:5
   9: tokio::runtime::coop::budget
             at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/coop.rs:73:5
  10: tokio::runtime::park::CachedParkThread::block_on
             at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/park.rs:282:31
  11: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
             at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/blocking.rs:66:9
  12: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
             at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:87:13
  13: tokio::runtime::context::runtime::enter_runtime
             at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/context/runtime.rs:65:16
  14: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
             at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/scheduler/multi_thread/mod.rs:86:9
  15: tokio::runtime::runtime::Runtime::block_on
             at /home/sjrusso/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/runtime.rs:349:45
  16: sql::main
             at ./examples/sql.rs:46:5
  17: core::ops::function::FnOnce::call_once
             at /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
```
Create similar bindings as with Rust but available in server side js (node, deno, bun, ...). The sdk should closely resemble the rust one, and only deviate when either necessary due to napi limitations, or when it is unidiomatic in JS.
napi.rs seems to be a good crate to leverage and is relatively easy to use.
Early Experiment
The branch feat/napi contains a super quick pass at creating the bindings. The experiment only covers these areas:
Create a remote SparkSession
Create a dataframe with .sql
Modify the dataframe with select, and filter
Perform "action" with count()
Perform "action" with show()
There is a lot of use of clone() and some less-than-great implementations that create a new empty DataFrame to satisfy the napi requirements. The polars JS interop is a good example of how the bindings might function.
Implement the ability to use positional/keyword args with sql. Because of the differences between Python and Rust, the function arguments need to be clearly defined.
The PySpark sql method allows literals and DataFrames to be mixed in one argument. However, Rust probably won't take too kindly to that input arg. If a user passes in a DataFrame, it will need to be handled with a SubqueryAlias; if it's a literal, it will be passed in as either a positional or a keyword argument.
We might want to only allow two input parameters, both as Options. Something like this?

```rust
// no parameters
spark.sql("SELECT * FROM table", None, None).await?;

// DataFrame parameter
let df = spark.range(...);
// create the hashmap
spark.sql("SELECT * FROM {df}", None, Some(df_hashmap)).await?;

// literal parameter
let col = "name";
// create the hashmap
spark.sql("SELECT {col} FROM {df}", Some(col_hashmap), Some(df_hashmap)).await?;
```
Or should positional SQL be a completely different method altogether, like sql_params, so that a user doesn't need to fuss with adding None twice to all their sql statements?
Being able to compile the Rust bindings into wasm32-unknown-unknown and/or wasm32-wasi would be interesting. This could allow some interesting interactions between a browser and Spark. A wasm32-wasi target would allow Spark programs to run on any runtime.
Early Experiments
A feature flag for wasm already exists under the core bindings and compiles successfully to the targets mentioned above. The issue arises when trying to send an HTTP/1.1 request with grpc-web to the Spark Connect server, which only accepts normal HTTP/2 gRPC requests. There are ways of standing up a proxy server with Envoy to forward the gRPC browser request to the backend server, but this feels like a lot of effort for the client to do.
The branch feat/wasm contains the early experiment, including an attempt to run the wasm build with wasmtime. The issue arises with using async code in wasm. There is probably a way to code it correctly, but I don't have time to finish the experiment.
Git submodules are annoying, and the current submodule is only ever checked out at a specific release tag. It would be easier to just have a folder containing the copied protobufs.
I think it might look something like this in the repo:
├── core <- core implementation in Rust
│   ├─ spark4_0 <- protobuf for Spark 4.0.0
│   └─ spark3_5 <- protobuf for Spark 3.5.1
There are many functions for Spark, and most of them are created via a macro. However, not all of them have unit test coverage. Create additional unit tests based on similar test conditions from the existing Spark API test cases.
I have been mirroring the docstring tests from the PySpark API for reference.
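For example, a docstring-style test for upper might look like the hedged sketch below; the server address, the select/collect signatures, and the Arrow RecordBatch handling are assumptions rather than the crate's confirmed test setup.

```rust
// Hedged sketch of a docstring-style unit test mirroring the PySpark example
// for `upper`; setup and exact signatures are assumptions.
#[tokio::test]
async fn test_func_upper() -> Result<(), Box<dyn std::error::Error>> {
    let spark = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark
        .sql("SELECT * FROM VALUES ('Spark'), ('Connect') AS tab(word)")
        .await?
        .select([upper(col("word")).alias("upper_word")]);

    // `collect` is assumed to return an Arrow record batch
    let batch = df.collect().await?;
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```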
I'd like to report a deadlock issue when a spark session is cloned and used concurrently.
#46 demonstrates a possible workflow leading to a deadlock.
The gist is that everywhere #[allow(clippy::await_holding_lock)] is used, there is a possibility of a deadlock when a spark session is cloned and used concurrently.
-> When a task is suspended while holding a lock, another task will wait for the lock to be released without yielding the executor.
-> This is a very common "dangerous" asynchronous programming pattern that should always be avoided at all costs.
Therefore, I would suggest that we either remove the clone method from SparkSession or replace the RwLock with an asynchronous lock. What do you think?
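To illustrate the second option, here is a hedged sketch of the difference; the Client type and execute_plan method are hypothetical stand-ins, not the actual SparkSession internals.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the session internals; not the crate's actual types.
struct Client;
impl Client {
    async fn execute_plan(&mut self) { /* send a request over gRPC */ }
}

// Dangerous: a synchronous RwLock guard held across an .await. A suspended task
// keeps the lock, and a cloned session on another task can block on it without
// yielding, which can deadlock the executor.
async fn dangerous(client: Arc<std::sync::RwLock<Client>>) {
    let mut guard = client.write().unwrap();
    guard.execute_plan().await; // guard is still held across this await
}

// Safer: an asynchronous lock, so waiting tasks yield instead of blocking.
async fn safer(client: Arc<tokio::sync::RwLock<Client>>) {
    let mut guard = client.write().await;
    guard.execute_plan().await;
}
```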
Lots of other programming languages can access gRPC and Arrow, which means many different languages will end up recreating the core client logic for request and response handling.
The idea is that there could be one kernel client that all other programming languages use, and each language is then left to implement the specifics of the core Spark objects.
What does this involve?
Isolate the existing client.rs into core and move all other Rust-specific implementations into rust
Remove any dependencies in client.rs that are only for the Rust library
Update error handling in client.rs to create a new ConnectClientError error type (it currently leverages SparkError)
Lots more research and analysis into the feasibility of this kernel :)
The initial DataFrameWriter is created, but there is also another way to write data via DataFrameWriterV2. This writer has a slightly different implementation and leverages a different proto command message.
I think the methods should mirror the ones found in the Spark API guide, and a new writeTo method should be added onto the DataFrame.
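A hedged sketch of what the proposed flow might look like; the writeTo method and the V2 builder chain shown here are the proposal, not existing crate code.

```rust
// Hypothetical sketch of the proposed DataFrameWriterV2 usage; none of these
// methods are confirmed in the crate yet.
df.writeTo("catalog.schema.events")
    .using("delta")
    .tableProperty("write.format.default", "delta")
    .partitionedBy(col("event_date"))
    .createOrReplace()
    .await?;
```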
The overall documentation needs to be reviewed and matched against the Spark Core Classes and Functions. For instance, the README should accurately reflect which functions and methods are currently implemented compared to the existing Spark API.
However, there are probably a few misses: items that are implemented but still marked as open, or that were accidentally excluded. Consider adding sections for other classes like StreamingQueryManager, DataFrameNaFunctions, DataFrameStatFunctions, etc.